Merge pull request #4676 from jmarshallnz/dont_set_scraper_on_tvshow_on_nfo
[vuplus_xbmc] / xbmc / utils / JobManager.cpp
1 /*
2  *      Copyright (C) 2005-2013 Team XBMC
3  *      http://xbmc.org
4  *
5  *  This Program is free software; you can redistribute it and/or modify
6  *  it under the terms of the GNU General Public License as published by
7  *  the Free Software Foundation; either version 2, or (at your option)
8  *  any later version.
9  *
10  *  This Program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with XBMC; see the file COPYING.  If not, see
17  *  <http://www.gnu.org/licenses/>.
18  *
19  */
20
21 #include "JobManager.h"
22 #include <algorithm>
23 #include <stdexcept>
24 #include "threads/SingleLock.h"
25 #include "utils/log.h"
26
27 #include "system.h"
28
29
30 using namespace std;
31
32 bool CJob::ShouldCancel(unsigned int progress, unsigned int total) const
33 {
34   if (m_callback)
35     return m_callback->OnJobProgress(progress, total, this);
36   return false;
37 }
38
39 CJobWorker::CJobWorker(CJobManager *manager) : CThread("JobWorker")
40 {
41   m_jobManager = manager;
42   Create(true); // start work immediately, and kill ourselves when we're done
43 }
44
45 CJobWorker::~CJobWorker()
46 {
47   // while we should already be removed from the job manager, if an exception
48   // occurs during processing that we haven't caught, we may skip over that step.
49   // Thus, before we go out of scope, ensure the job manager knows we're gone.
50   m_jobManager->RemoveWorker(this);
51   if(!IsAutoDelete())
52     StopThread();
53 }
54
55 void CJobWorker::Process()
56 {
57   SetPriority( GetMinPriority() );
58   while (true)
59   {
60     // request an item from our manager (this call is blocking)
61     CJob *job = m_jobManager->GetNextJob(this);
62     if (!job)
63       break;
64
65     bool success = false;
66     try
67     {
68       success = job->DoWork();
69     }
70     catch (...)
71     {
72       CLog::Log(LOGERROR, "%s error processing job %s", __FUNCTION__, job->GetType());
73     }
74     m_jobManager->OnJobComplete(success, job);
75   }
76 }
77
78 void CJobQueue::CJobPointer::CancelJob()
79 {
80   CJobManager::GetInstance().CancelJob(m_id);
81   m_id = 0;
82 }
83
84 CJobQueue::CJobQueue(bool lifo, unsigned int jobsAtOnce, CJob::PRIORITY priority)
85 : m_jobsAtOnce(jobsAtOnce), m_priority(priority), m_lifo(lifo)
86 {
87 }
88
89 CJobQueue::~CJobQueue()
90 {
91   CancelJobs();
92 }
93
94 void CJobQueue::OnJobComplete(unsigned int jobID, bool success, CJob *job)
95 {
96   CSingleLock lock(m_section);
97   // check if this job is in our processing list
98   Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
99   if (i != m_processing.end())
100     m_processing.erase(i);
101   // request a new job be queued
102   QueueNextJob();
103 }
104
105 void CJobQueue::CancelJob(const CJob *job)
106 {
107   CSingleLock lock(m_section);
108   Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
109   if (i != m_processing.end())
110   {
111     i->CancelJob();
112     m_processing.erase(i);
113     return;
114   }
115   Queue::iterator j = find(m_jobQueue.begin(), m_jobQueue.end(), job);
116   if (j != m_jobQueue.end())
117   {
118     j->FreeJob();
119     m_jobQueue.erase(j);
120   }
121 }
122
123 void CJobQueue::AddJob(CJob *job)
124 {
125   CSingleLock lock(m_section);
126   // check if we have this job already.  If so, we're done.
127   if (find(m_jobQueue.begin(), m_jobQueue.end(), job) != m_jobQueue.end() ||
128       find(m_processing.begin(), m_processing.end(), job) != m_processing.end())
129   {
130     delete job;
131     return;
132   }
133
134   if (m_lifo)
135     m_jobQueue.push_back(CJobPointer(job));
136   else
137     m_jobQueue.push_front(CJobPointer(job));
138   QueueNextJob();
139 }
140
141 void CJobQueue::QueueNextJob()
142 {
143   CSingleLock lock(m_section);
144   if (m_jobQueue.size() && m_processing.size() < m_jobsAtOnce)
145   {
146     CJobPointer &job = m_jobQueue.back();
147     job.m_id = CJobManager::GetInstance().AddJob(job.m_job, this, m_priority);
148     m_processing.push_back(job);
149     m_jobQueue.pop_back();
150   }
151 }
152
153 void CJobQueue::CancelJobs()
154 {
155   CSingleLock lock(m_section);
156   for_each(m_processing.begin(), m_processing.end(), mem_fun_ref(&CJobPointer::CancelJob));
157   for_each(m_jobQueue.begin(), m_jobQueue.end(), mem_fun_ref(&CJobPointer::FreeJob));
158   m_jobQueue.clear();
159   m_processing.clear();
160 }
161
162
163 bool CJobQueue::QueueEmpty() const
164 {
165   CSingleLock lock(m_section);
166   return m_jobQueue.empty();
167 }
168
169 CJobManager &CJobManager::GetInstance()
170 {
171   static CJobManager sJobManager;
172   return sJobManager;
173 }
174
175 CJobManager::CJobManager()
176 {
177   m_jobCounter = 0;
178   m_running = true;
179   m_pauseJobs = false;
180 }
181
182 void CJobManager::Restart()
183 {
184   CSingleLock lock(m_section);
185
186   if (m_running)
187     throw std::logic_error("CJobManager already running");
188   m_running = true;
189 }
190
191 void CJobManager::CancelJobs()
192 {
193   CSingleLock lock(m_section);
194   m_running = false;
195
196   // clear any pending jobs
197   for (unsigned int priority = CJob::PRIORITY_LOW_PAUSABLE; priority <= CJob::PRIORITY_HIGH; ++priority)
198   {
199     for_each(m_jobQueue[priority].begin(), m_jobQueue[priority].end(), mem_fun_ref(&CWorkItem::FreeJob));
200     m_jobQueue[priority].clear();
201   }
202
203   // cancel any callbacks on jobs still processing
204   for_each(m_processing.begin(), m_processing.end(), mem_fun_ref(&CWorkItem::Cancel));
205
206   // tell our workers to finish
207   while (m_workers.size())
208   {
209     lock.Leave();
210     m_jobEvent.Set();
211     Sleep(0); // yield after setting the event to give the workers some time to die
212     lock.Enter();
213   }
214 }
215
216 CJobManager::~CJobManager()
217 {
218 }
219
220 unsigned int CJobManager::AddJob(CJob *job, IJobCallback *callback, CJob::PRIORITY priority)
221 {
222   CSingleLock lock(m_section);
223
224   if (!m_running)
225     return 0;
226
227   // increment the job counter, ensuring 0 (invalid job) is never hit
228   m_jobCounter++;
229   if (m_jobCounter == 0)
230     m_jobCounter++;
231
232   // create a work item for this job
233   CWorkItem work(job, m_jobCounter, priority, callback);
234   m_jobQueue[priority].push_back(work);
235
236   StartWorkers(priority);
237   return work.m_id;
238 }
239
240 void CJobManager::CancelJob(unsigned int jobID)
241 {
242   CSingleLock lock(m_section);
243
244   // check whether we have this job in the queue
245   for (unsigned int priority = CJob::PRIORITY_LOW_PAUSABLE; priority <= CJob::PRIORITY_HIGH; ++priority)
246   {
247     JobQueue::iterator i = find(m_jobQueue[priority].begin(), m_jobQueue[priority].end(), jobID);
248     if (i != m_jobQueue[priority].end())
249     {
250       delete i->m_job;
251       m_jobQueue[priority].erase(i);
252       return;
253     }
254   }
255   // or if we're processing it
256   Processing::iterator it = find(m_processing.begin(), m_processing.end(), jobID);
257   if (it != m_processing.end())
258     it->m_callback = NULL; // job is in progress, so only thing to do is to remove callback
259 }
260
261 void CJobManager::StartWorkers(CJob::PRIORITY priority)
262 {
263   CSingleLock lock(m_section);
264
265   // check how many free threads we have
266   if (m_processing.size() >= GetMaxWorkers(priority))
267     return;
268
269   // do we have any sleeping threads?
270   if (m_processing.size() < m_workers.size())
271   {
272     m_jobEvent.Set();
273     return;
274   }
275
276   // everyone is busy - we need more workers
277   m_workers.push_back(new CJobWorker(this));
278 }
279
280 CJob *CJobManager::PopJob()
281 {
282   CSingleLock lock(m_section);
283   for (int priority = CJob::PRIORITY_HIGH; priority >= CJob::PRIORITY_LOW_PAUSABLE; --priority)
284   {
285     // Check whether we're pausing pausable jobs
286     if (priority == CJob::PRIORITY_LOW_PAUSABLE && m_pauseJobs)
287       continue;
288
289     if (m_jobQueue[priority].size() && m_processing.size() < GetMaxWorkers(CJob::PRIORITY(priority)))
290     {
291       // pop the job off the queue
292       CWorkItem job = m_jobQueue[priority].front();
293       m_jobQueue[priority].pop_front();
294
295       // add to the processing vector
296       m_processing.push_back(job);
297       job.m_job->m_callback = this;
298       return job.m_job;
299     }
300   }
301   return NULL;
302 }
303
304 void CJobManager::PauseJobs()
305 {
306   CSingleLock lock(m_section);
307   m_pauseJobs = true;
308 }
309
310 void CJobManager::UnPauseJobs()
311 {
312   CSingleLock lock(m_section);
313   m_pauseJobs = false;
314 }
315
316 bool CJobManager::IsProcessing(const CJob::PRIORITY &priority) const
317 {
318   CSingleLock lock(m_section);
319
320   if (m_pauseJobs)
321     return false;
322
323   for(Processing::const_iterator it = m_processing.begin(); it < m_processing.end(); it++)
324   {
325     if (priority == it->m_priority)
326       return true;
327   }
328   return false;
329 }
330
331 int CJobManager::IsProcessing(const std::string &type) const
332 {
333   int jobsMatched = 0;
334   CSingleLock lock(m_section);
335
336   if (m_pauseJobs)
337     return 0;
338
339   for(Processing::const_iterator it = m_processing.begin(); it < m_processing.end(); it++)
340   {
341     if (type == std::string(it->m_job->GetType()))
342       jobsMatched++;
343   }
344   return jobsMatched;
345 }
346
347 CJob *CJobManager::GetNextJob(const CJobWorker *worker)
348 {
349   CSingleLock lock(m_section);
350   while (m_running)
351   {
352     // grab a job off the queue if we have one
353     CJob *job = PopJob();
354     if (job)
355       return job;
356     // no jobs are left - sleep for 30 seconds to allow new jobs to come in
357     lock.Leave();
358     bool newJob = m_jobEvent.WaitMSec(30000);
359     lock.Enter();
360     if (!newJob)
361       break;
362   }
363   // ensure no jobs have come in during the period after
364   // timeout and before we held the lock
365   CJob *job = PopJob();
366   if (job)
367     return job;
368   // have no jobs
369   RemoveWorker(worker);
370   return NULL;
371 }
372
373 bool CJobManager::OnJobProgress(unsigned int progress, unsigned int total, const CJob *job) const
374 {
375   CSingleLock lock(m_section);
376   // find the job in the processing queue, and check whether it's cancelled (no callback)
377   Processing::const_iterator i = find(m_processing.begin(), m_processing.end(), job);
378   if (i != m_processing.end())
379   {
380     CWorkItem item(*i);
381     lock.Leave(); // leave section prior to call
382     if (item.m_callback)
383     {
384       item.m_callback->OnJobProgress(item.m_id, progress, total, job);
385       return false;
386     }
387   }
388   return true; // couldn't find the job, or it's been cancelled
389 }
390
391 void CJobManager::OnJobComplete(bool success, CJob *job)
392 {
393   CSingleLock lock(m_section);
394   // remove the job from the processing queue
395   Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
396   if (i != m_processing.end())
397   {
398     // tell any listeners we're done with the job, then delete it
399     CWorkItem item(*i);
400     lock.Leave();
401     try
402     {
403       if (item.m_callback)
404         item.m_callback->OnJobComplete(item.m_id, success, item.m_job);
405     }
406     catch (...)
407     {
408       CLog::Log(LOGERROR, "%s error processing job %s", __FUNCTION__, item.m_job->GetType());
409     }
410     lock.Enter();
411     Processing::iterator j = find(m_processing.begin(), m_processing.end(), job);
412     if (j != m_processing.end())
413       m_processing.erase(j);
414     lock.Leave();
415     item.FreeJob();
416   }
417 }
418
419 void CJobManager::RemoveWorker(const CJobWorker *worker)
420 {
421   CSingleLock lock(m_section);
422   // remove our worker
423   Workers::iterator i = find(m_workers.begin(), m_workers.end(), worker);
424   if (i != m_workers.end())
425     m_workers.erase(i); // workers auto-delete
426 }
427
428 unsigned int CJobManager::GetMaxWorkers(CJob::PRIORITY priority) const
429 {
430   static const unsigned int max_workers = 5;
431   return max_workers - (CJob::PRIORITY_HIGH - priority);
432 }