2 * Copyright (C) 2005-2013 Team XBMC
5 * This Program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2, or (at your option)
10 * This Program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with XBMC; see the file COPYING. If not, see
17 * <http://www.gnu.org/licenses/>.
21 #include "JobManager.h"
24 #include "threads/SingleLock.h"
25 #include "utils/log.h"
32 bool CJob::ShouldCancel(unsigned int progress, unsigned int total) const
35 return m_callback->OnJobProgress(progress, total, this);
39 CJobWorker::CJobWorker(CJobManager *manager) : CThread("JobWorker")
41 m_jobManager = manager;
42 Create(true); // start work immediately, and kill ourselves when we're done
45 CJobWorker::~CJobWorker()
47 // while we should already be removed from the job manager, if an exception
48 // occurs during processing that we haven't caught, we may skip over that step.
49 // Thus, before we go out of scope, ensure the job manager knows we're gone.
50 m_jobManager->RemoveWorker(this);
55 void CJobWorker::Process()
57 SetPriority( GetMinPriority() );
60 // request an item from our manager (this call is blocking)
61 CJob *job = m_jobManager->GetNextJob(this);
68 success = job->DoWork();
72 CLog::Log(LOGERROR, "%s error processing job %s", __FUNCTION__, job->GetType());
74 m_jobManager->OnJobComplete(success, job);
78 void CJobQueue::CJobPointer::CancelJob()
80 CJobManager::GetInstance().CancelJob(m_id);
84 CJobQueue::CJobQueue(bool lifo, unsigned int jobsAtOnce, CJob::PRIORITY priority)
85 : m_jobsAtOnce(jobsAtOnce), m_priority(priority), m_lifo(lifo)
89 CJobQueue::~CJobQueue()
94 void CJobQueue::OnJobComplete(unsigned int jobID, bool success, CJob *job)
96 CSingleLock lock(m_section);
97 // check if this job is in our processing list
98 Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
99 if (i != m_processing.end())
100 m_processing.erase(i);
101 // request a new job be queued
105 void CJobQueue::CancelJob(const CJob *job)
107 CSingleLock lock(m_section);
108 Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
109 if (i != m_processing.end())
112 m_processing.erase(i);
115 Queue::iterator j = find(m_jobQueue.begin(), m_jobQueue.end(), job);
116 if (j != m_jobQueue.end())
123 void CJobQueue::AddJob(CJob *job)
125 CSingleLock lock(m_section);
126 // check if we have this job already. If so, we're done.
127 if (find(m_jobQueue.begin(), m_jobQueue.end(), job) != m_jobQueue.end() ||
128 find(m_processing.begin(), m_processing.end(), job) != m_processing.end())
135 m_jobQueue.push_back(CJobPointer(job));
137 m_jobQueue.push_front(CJobPointer(job));
141 void CJobQueue::QueueNextJob()
143 CSingleLock lock(m_section);
144 if (m_jobQueue.size() && m_processing.size() < m_jobsAtOnce)
146 CJobPointer &job = m_jobQueue.back();
147 job.m_id = CJobManager::GetInstance().AddJob(job.m_job, this, m_priority);
148 m_processing.push_back(job);
149 m_jobQueue.pop_back();
153 void CJobQueue::CancelJobs()
155 CSingleLock lock(m_section);
156 for_each(m_processing.begin(), m_processing.end(), mem_fun_ref(&CJobPointer::CancelJob));
157 for_each(m_jobQueue.begin(), m_jobQueue.end(), mem_fun_ref(&CJobPointer::FreeJob));
159 m_processing.clear();
163 bool CJobQueue::QueueEmpty() const
165 CSingleLock lock(m_section);
166 return m_jobQueue.empty();
169 CJobManager &CJobManager::GetInstance()
171 static CJobManager sJobManager;
175 CJobManager::CJobManager()
182 void CJobManager::Restart()
184 CSingleLock lock(m_section);
187 throw std::logic_error("CJobManager already running");
191 void CJobManager::CancelJobs()
193 CSingleLock lock(m_section);
196 // clear any pending jobs
197 for (unsigned int priority = CJob::PRIORITY_LOW_PAUSABLE; priority <= CJob::PRIORITY_HIGH; ++priority)
199 for_each(m_jobQueue[priority].begin(), m_jobQueue[priority].end(), mem_fun_ref(&CWorkItem::FreeJob));
200 m_jobQueue[priority].clear();
203 // cancel any callbacks on jobs still processing
204 for_each(m_processing.begin(), m_processing.end(), mem_fun_ref(&CWorkItem::Cancel));
206 // tell our workers to finish
207 while (m_workers.size())
211 Sleep(0); // yield after setting the event to give the workers some time to die
216 CJobManager::~CJobManager()
220 unsigned int CJobManager::AddJob(CJob *job, IJobCallback *callback, CJob::PRIORITY priority)
222 CSingleLock lock(m_section);
227 // increment the job counter, ensuring 0 (invalid job) is never hit
229 if (m_jobCounter == 0)
232 // create a work item for this job
233 CWorkItem work(job, m_jobCounter, priority, callback);
234 m_jobQueue[priority].push_back(work);
236 StartWorkers(priority);
240 void CJobManager::CancelJob(unsigned int jobID)
242 CSingleLock lock(m_section);
244 // check whether we have this job in the queue
245 for (unsigned int priority = CJob::PRIORITY_LOW_PAUSABLE; priority <= CJob::PRIORITY_HIGH; ++priority)
247 JobQueue::iterator i = find(m_jobQueue[priority].begin(), m_jobQueue[priority].end(), jobID);
248 if (i != m_jobQueue[priority].end())
251 m_jobQueue[priority].erase(i);
255 // or if we're processing it
256 Processing::iterator it = find(m_processing.begin(), m_processing.end(), jobID);
257 if (it != m_processing.end())
258 it->m_callback = NULL; // job is in progress, so only thing to do is to remove callback
261 void CJobManager::StartWorkers(CJob::PRIORITY priority)
263 CSingleLock lock(m_section);
265 // check how many free threads we have
266 if (m_processing.size() >= GetMaxWorkers(priority))
269 // do we have any sleeping threads?
270 if (m_processing.size() < m_workers.size())
276 // everyone is busy - we need more workers
277 m_workers.push_back(new CJobWorker(this));
280 CJob *CJobManager::PopJob()
282 CSingleLock lock(m_section);
283 for (int priority = CJob::PRIORITY_HIGH; priority >= CJob::PRIORITY_LOW_PAUSABLE; --priority)
285 // Check whether we're pausing pausable jobs
286 if (priority == CJob::PRIORITY_LOW_PAUSABLE && m_pauseJobs)
289 if (m_jobQueue[priority].size() && m_processing.size() < GetMaxWorkers(CJob::PRIORITY(priority)))
291 // pop the job off the queue
292 CWorkItem job = m_jobQueue[priority].front();
293 m_jobQueue[priority].pop_front();
295 // add to the processing vector
296 m_processing.push_back(job);
297 job.m_job->m_callback = this;
304 void CJobManager::PauseJobs()
306 CSingleLock lock(m_section);
310 void CJobManager::UnPauseJobs()
312 CSingleLock lock(m_section);
316 bool CJobManager::IsProcessing(const CJob::PRIORITY &priority) const
318 CSingleLock lock(m_section);
323 for(Processing::const_iterator it = m_processing.begin(); it < m_processing.end(); it++)
325 if (priority == it->m_priority)
331 int CJobManager::IsProcessing(const std::string &type) const
334 CSingleLock lock(m_section);
339 for(Processing::const_iterator it = m_processing.begin(); it < m_processing.end(); it++)
341 if (type == std::string(it->m_job->GetType()))
347 CJob *CJobManager::GetNextJob(const CJobWorker *worker)
349 CSingleLock lock(m_section);
352 // grab a job off the queue if we have one
353 CJob *job = PopJob();
356 // no jobs are left - sleep for 30 seconds to allow new jobs to come in
358 bool newJob = m_jobEvent.WaitMSec(30000);
363 // ensure no jobs have come in during the period after
364 // timeout and before we held the lock
365 CJob *job = PopJob();
369 RemoveWorker(worker);
373 bool CJobManager::OnJobProgress(unsigned int progress, unsigned int total, const CJob *job) const
375 CSingleLock lock(m_section);
376 // find the job in the processing queue, and check whether it's cancelled (no callback)
377 Processing::const_iterator i = find(m_processing.begin(), m_processing.end(), job);
378 if (i != m_processing.end())
381 lock.Leave(); // leave section prior to call
384 item.m_callback->OnJobProgress(item.m_id, progress, total, job);
388 return true; // couldn't find the job, or it's been cancelled
391 void CJobManager::OnJobComplete(bool success, CJob *job)
393 CSingleLock lock(m_section);
394 // remove the job from the processing queue
395 Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
396 if (i != m_processing.end())
398 // tell any listeners we're done with the job, then delete it
404 item.m_callback->OnJobComplete(item.m_id, success, item.m_job);
408 CLog::Log(LOGERROR, "%s error processing job %s", __FUNCTION__, item.m_job->GetType());
411 Processing::iterator j = find(m_processing.begin(), m_processing.end(), job);
412 if (j != m_processing.end())
413 m_processing.erase(j);
419 void CJobManager::RemoveWorker(const CJobWorker *worker)
421 CSingleLock lock(m_section);
423 Workers::iterator i = find(m_workers.begin(), m_workers.end(), worker);
424 if (i != m_workers.end())
425 m_workers.erase(i); // workers auto-delete
428 unsigned int CJobManager::GetMaxWorkers(CJob::PRIORITY priority) const
430 static const unsigned int max_workers = 5;
431 return max_workers - (CJob::PRIORITY_HIGH - priority);