1 # A Job consists of many "Tasks".
2 # A task is the run of an external tool, with proper methods for failure handling
4 from Tools.CList import CList
# Status codes, numbered 0..3 in this order; self.status is set to one of them.
7 NOT_STARTED, IN_PROGRESS, FINISHED, FAILED = range(4)
# Constructor. Only part of the body is visible in this listing (the embedded
# line numbers skip), so further attributes are presumably set on lines that
# are not shown.
8 def __init__(self, name):
# Used elsewhere in this class as a list of indices into self.tasks.
10 self.resident_tasks = [ ]
# Default workspace path; presumably the value the workspace/diskspace
# preconditions read as task.job.workspace.
11 self.workspace = "/tmp"
# CList comes from Tools.CList; its semantics are not shown here. It is
# presumably a callable list of listeners notified on state changes.
20 self.state_changed = CList()
22 self.status = self.NOT_STARTED
24 # description is a dict
# The body is not visible in this listing. The commented-out HDDInitJob
# example near the end of the file shows an override that reads a key from
# the dict and adds tasks accordingly.
25 def fromDescription(self, description):
# The body is not visible in this listing. The commented-out HDDInitJob
# example near the end of the file overrides this to return a dict.
28 def createDescription(self):
# Compute the overall progress from the per-task weightings.
# NOTE(review): the listing skips the line after the `if` below, so the value
# returned once current_task has moved past the last task is not visible here.
31 def getProgress(self):
32 if self.current_task == len(self.tasks):
34 t = self.tasks[self.current_task]
# Weighted share of the current task (weighting * progress / end) plus the
# full weighting of every task that precedes it in self.tasks.
35 jobprogress = t.weighting * t.progress / float(t.end) + sum([task.weighting for task in self.tasks[:self.current_task]])
# Scale by weightScale and truncate to an int.
36 return int(jobprogress*self.weightScale)
# Expose getProgress as a read-only `progress` attribute.
38 progress = property(getProgress)
# Handed to Task.run as its task_progress_changed argument (see the run(...)
# call on self.tasks[self.current_task]). The body is not visible in this
# listing.
40 def task_progress_changed_CB(self):
# Append a task to the end of this object's task list.
# NOTE(review): the listing skips a line between the def and the append, so
# any additional setup done on `task` is not visible here.
43 def addTask(self, task):
45 self.tasks.append(task)
# Store the callback that is later invoked as callback(job, task, problems).
# NOTE(review): the listing skips the line(s) after the assignment, so the
# statement that actually kicks off execution is not visible here.
47 def start(self, callback):
# Starting is only allowed while no callback is pending.
48 assert self.callback is None
49 self.callback = callback
# NOTE(review): the `def` line owning the statements below is not visible in
# this listing (the embedded numbering jumps from 49 to 53), and lines between
# the statements are skipped as well.
53 self.status = self.IN_PROGRESS
# Derive the factor that maps the sum of all task weightings onto self.end.
# NOTE(review): this raises ZeroDivisionError when the task list is empty or
# all weightings are 0 -- confirm callers always add a weighted task first.
56 sumTaskWeightings = sum([t.weighting for t in self.tasks])
57 self.weightScale = self.end / float(sumTaskWeightings)
# NOTE(review): the `def` line and some branch lines are not visible in this
# listing; the comments describe only the statements shown.
# current_task has moved past the last task ...
60 if self.current_task == len(self.tasks):
# ... and no resident task index is outstanding.
61 if len(self.resident_tasks) == 0:
64 self.status = self.FINISHED
68 print "still waiting for %d resident task(s) %s to finish" % (len(self.resident_tasks), str(self.resident_tasks))
# Run the current task, handing it the completion callback and the
# progress-changed callback (the two parameters of Task.run).
70 self.tasks[self.current_task].run(self.taskCallback, self.task_progress_changed_CB)
# Completion callback handed to Task.run. Task.run and Task.finish call it as
# callback(task, not_met), so `res` is a list of conditions that were not met.
# NOTE(review): the listing skips the branch lines between several of the
# statements below, so the exact conditions guarding them are not visible.
73 def taskCallback(self, task, res, stay_resident = False):
# Position of the reporting task in self.tasks.
74 cb_idx = self.tasks.index(task)
76 if cb_idx not in self.resident_tasks:
# NOTE(review): this records self.current_task, while the membership test
# above and the removal below use cb_idx -- confirm the two are always equal
# at this point.
77 self.resident_tasks.append(self.current_task)
78 print "task going resident:", task
80 print "task keeps staying resident:", task
83 print ">>> Error:", res
84 self.status = self.FAILED
# Invoke the stored callback with the task and its result.
86 self.callback(self, task, res)
# The callback came from a task other than the current one.
87 if cb_idx != self.current_task:
88 if cb_idx in self.resident_tasks:
89 print "resident task finished:", task
90 self.resident_tasks.remove(cb_idx)
# Advance to the next task.
93 self.current_task += 1
# NOTE(review): the `def` line owning this statement is not visible in this
# listing. errorCB further down calls self.active_job.retry(); this assert
# presumably guards that path -- confirm.
97 assert self.status == self.FAILED
# NOTE(review): the `def` line owning the statements below is not visible in
# this listing.
# Abort the current task, provided current_task still points at one ...
101 if self.current_task < len(self.tasks):
102 self.tasks[self.current_task].abort()
# ... and every task whose index is recorded in resident_tasks.
103 for i in self.resident_tasks:
104 self.tasks[i].abort()
107 # some Jobs might have a better idea of how to cancel a job
# Constructor. Several lines of the body are skipped by this listing, so only
# some of the attributes are visible.
111 def __init__(self, job, name):
# Lists of condition objects; each is evaluated via its check(task) method.
113 self.immediate_preconditions = [ ]
114 self.global_preconditions = [ ]
115 self.postconditions = [ ]
# Overwritten by processFinished with the value it receives.
116 self.returncode = None
# If set, run() writes this to the process right after starting it.
117 self.initial_input = None
# Progress callback; stays None until run() assigns it.
126 self.task_progress_changed = None
# Buffer for output that has not yet been split into lines.
127 self.output_line = ""
# The body is not visible in this listing. run() later executes self.cmd with
# self.args, which are presumably stored here -- confirm.
130 def setCommandline(self, cmd, args):
# Register the default checks for running an external tool: a
# ToolExistsPrecondition (command must be executable) before the run and a
# ReturncodePostcondition (return code must be 0) after it.
# NOTE(review): the listing skips the lines between the def and the appends,
# so how `tool` itself is stored is not visible here.
134 def setTool(self, tool):
137 self.global_preconditions.append(ToolExistsPrecondition())
138 self.postconditions.append(ReturncodePostcondition())
# Evaluate one of the two precondition lists against this task.
# NOTE(review): the branch lines that choose between the lists, the
# initialisation of not_met, and the return statement are not visible in this
# listing. run() concatenates two results with `+`, so a list is presumably
# returned.
140 def checkPreconditions(self, immediate = False):
143 preconditions = self.immediate_preconditions
145 preconditions = self.global_preconditions
# Collect every precondition whose check fails.
146 for precondition in preconditions:
147 if not precondition.check(self):
148 not_met.append(precondition)
# Start the external command in an eConsoleAppContainer.
# callback is invoked as callback(task, not_met_conditions): here when a
# precondition fails, and from finish() afterwards.
# task_progress_changed is called without arguments by setProgress.
151 def run(self, callback, task_progress_changed):
# Both precondition lists are evaluated up front.
152 failed_preconditions = self.checkPreconditions(True) + self.checkPreconditions(False)
153 if len(failed_preconditions):
154 callback(self, failed_preconditions)
# NOTE(review): the listing skips lines here, so whether the method returns
# after reporting failed preconditions is not visible.
158 self.callback = callback
159 self.task_progress_changed = task_progress_changed
# Imported inside the method rather than at module level.
160 from enigma import eConsoleAppContainer
161 self.container = eConsoleAppContainer()
# Route process exit and both output streams to this task's handlers.
162 self.container.appClosed.get().append(self.processFinished)
163 self.container.stdoutAvail.get().append(self.processStdout)
164 self.container.stderrAvail.get().append(self.processStderr)
# A command and at least one argument must have been configured.
166 assert self.cmd is not None
167 assert len(self.args) >= 1
169 if self.cwd is not None:
170 self.container.setCWD(self.cwd)
# Python 2 print statement: starts the command and logs the value returned by
# execute() followed by the command line.
172 print "execute:", self.container.execute(self.cmd, self.args), self.cmd, " ".join(self.args)
173 if self.initial_input:
174 self.writeInput(self.initial_input)
# Called by finish() with the list of unmet conditions, just before the
# completion callback fires. The body is not visible in this listing.
179 def cleanup(self, failed):
# Handler registered on the container's stdoutAvail in run(); forwards the
# data to processOutput, the same sink processStderr uses.
182 def processStdout(self, data):
183 self.processOutput(data)
# Handler registered on the container's stderrAvail in run(); forwards the
# data to processOutput, the same sink processStdout uses.
185 def processStderr(self, data):
186 self.processOutput(data)
# Append raw output to the buffer and hand it on to processOutputLine.
# NOTE(review): the listing skips the lines around the statements below
# (presumably a loop and its termination test on `i`), so the control flow is
# not visible here.
188 def processOutput(self, data):
189 self.output_line += data
# Position of the first newline in the buffered text (-1 if there is none).
191 i = self.output_line.find('\n')
# Pass on the text up to and including the newline, then drop it from the
# buffer.
194 self.processOutputLine(self.output_line[:i+1])
195 self.output_line = self.output_line[i+1:]
# Called by processOutput with a slice of the output buffer that ends just
# after a newline position. The body is not visible in this listing.
197 def processOutputLine(self, line):
# Handler registered on the container's appClosed in run(); records the value
# it receives, which ReturncodePostcondition later compares with 0.
# NOTE(review): the listing skips the line(s) after the assignment.
200 def processFinished(self, returncode):
201 self.returncode = returncode
# NOTE(review): the `def` line owning the statements below is not visible in
# this listing.
# Kill the container's process and run finish() with aborted=True.
205 self.container.kill()
206 self.finish(aborted = True)
# Collect the conditions that were not met and report them.
# NOTE(review): the listing skips lines here; the initialisation of not_met
# and the condition guarding the AbortedPostcondition line are not visible.
208 def finish(self, aborted = False):
212 not_met.append(AbortedPostcondition())
# Collect every postcondition whose check fails.
214 for postcondition in self.postconditions:
215 if not postcondition.check(self):
216 not_met.append(postcondition)
# Let the task clean up first, then hand the result to the stored callback.
217 self.cleanup(not_met)
218 self.callback(self, not_met)
# Pass `input` to the container's write(); run() uses this for initial_input.
# NOTE: the parameter name shadows the builtin input(); it is part of the
# method's interface and is therefore left as is.
223 def writeInput(self, input):
224 self.container.write(input)
# Return the stored progress value. The double-underscore attribute is
# name-mangled by Python to the enclosing class.
226 def getProgress(self):
227 return self.__progress
# Store a new progress value and call the progress-changed callback.
# NOTE(review): the listing skips the lines following the `if`, so how values
# above self.end are handled is not visible here.
229 def setProgress(self, progress):
230 if progress > self.end:
234 self.__progress = progress
# NOTE(review): task_progress_changed is initialised to None and only assigned
# in run(), so setting progress before run() would raise TypeError -- confirm
# that no caller does this.
235 self.task_progress_changed()
# Read/write `progress` attribute backed by the two accessors above.
237 progress = property(getProgress, setProgress)
239 # The jobmanager will execute multiple jobs, each after another.
240 # later, it will also support suspending jobs (and continuing them after reboot etc)
241 # It also supports a notification when some error occurred, and possibly a retry.
# NOTE(review): the `class` and `def` lines owning the statements below are
# not visible in this listing.
# Queue of jobs waiting to run; entries are appended at the end and taken
# from the front with pop(0).
244 self.active_jobs = [ ]
# Jobs that errorCB moved here instead of retrying.
245 self.failed_jobs = [ ]
246 self.job_classes = [ ]
# The job currently running, or None when idle.
247 self.active_job = None
# Append a job to the end of the waiting queue.
# NOTE(review): the listing skips the line(s) after the append, so whether the
# queue is processed immediately is not visible here.
249 def AddJob(self, job):
250 self.active_jobs.append(job)
# NOTE(review): the `def` line owning the statements below is not visible in
# this listing.
# When no job is running and the queue is non-empty, take the job at the front
# of the queue and start it with jobDone as its completion callback.
254 if self.active_job is None:
255 if len(self.active_jobs):
256 self.active_job = self.active_jobs.pop(0)
257 self.active_job.start(self.jobDone)
# Completion callback passed to a job's start(). problems[0] must provide
# getErrorMessage(task) whenever the notification below is raised.
# NOTE(review): the listing skips the branch lines between the statements, so
# the conditions guarding them are not visible.
259 def jobDone(self, job, task, problems):
260 print "job", job, "completed with", problems, "in", task
# Imported inside the method rather than at module level.
262 from Tools import Notifications
263 from Screens.MessageBox import MessageBox
# Show the first problem's message with a retry question; the answer is
# delivered to errorCB. `_` is not defined in the visible part of this file
# (presumably a translation function provided globally).
264 Notifications.AddNotificationWithCallback(self.errorCB, MessageBox, _("Error: %s\nRetry?") % (problems[0].getErrorMessage(task)))
266 #self.failed_jobs.append(self.active_job)
268 self.active_job = None
# Callback for the retry notification raised in jobDone.
# NOTE(review): the listing skips the branch lines that test `answer`, so
# which value selects which path is not visible here.
271 def errorCB(self, answer):
274 self.active_job.retry()
276 print "not retrying job."
# Record the job as failed and mark the manager idle.
277 self.failed_jobs.append(self.active_job)
278 self.active_job = None
282 #class PartitionExistsPostcondition:
283 # def __init__(self, device):
284 # self.device = device
286 # def check(self, task):
288 # return os.access(self.device + "part1", os.F_OK)
290 #class CreatePartitionTask(Task):
291 # def __init__(self, device):
292 # Task.__init__(self, _("Create Partition"))
293 # self.device = device
294 # self.setTool("/sbin/sfdisk")
295 # self.args += ["-f", self.device + "disc"]
296 # self.initial_input = "0,\n;\n;\n;\ny\n"
297 # self.postconditions.append(PartitionExistsPostcondition(self.device))
299 #class CreateFilesystemTask(Task):
300 # def __init__(self, device, partition = 1, largefile = True):
301 # Task.__init__(self, _("Create Filesystem"))
302 # self.setTool("/sbin/mkfs.ext")
304 # self.args += ["-T", "largefile"]
305 # self.args.append("-m0")
306 # self.args.append(device + "part%d" % partition)
308 #class FilesystemMountTask(Task):
309 # def __init__(self, device, partition = 1, filesystem = "ext3"):
310 # Task.__init__(self, _("Mounting Filesystem"))
311 # self.setTool("/bin/mount")
312 # if filesystem is not None:
313 # self.args += ["-t", filesystem]
314 # self.args.append(device + "part%d" % partition)
# Generic error text that names the concrete class of this condition object.
# NOTE(review): the enclosing `class` line is not visible in this listing
# (presumably the Condition base class the classes below derive from).
319 def getErrorMessage(self, task):
320 return _("An error has occured. (%s)") % (self.__class__.__name__)
class WorkspaceExistsPrecondition(Condition):
    """Precondition that passes when the job's workspace is writable."""

    def check(self, task):
        """Return True if os.access reports task.job.workspace as writable.

        task: object whose ``job.workspace`` attribute holds the path to test.
        """
        # Imported locally because no module-level ``import os`` is visible in
        # this file; without it the bare ``os`` reference raises NameError.
        import os
        return os.access(task.job.workspace, os.W_OK)
# Precondition comparing the space available on the file system holding the
# job's workspace with a required amount.
326 class DiskspacePrecondition(Condition):
327 def __init__(self, diskspace_required):
328 self.diskspace_required = diskspace_required
# Filled in by check(); reported by getErrorMessage().
329 self.diskspace_available = 0
331 def check(self, task):
# NOTE(review): the listing skips lines before and after the three statements
# below (presumably an import of os and error handling), so they are not
# visible here.
334 s = os.statvfs(task.job.workspace)
# Block size multiplied by the available block count.
335 self.diskspace_available = s.f_bsize * s.f_bavail
336 return self.diskspace_available >= self.diskspace_required
340 def getErrorMessage(self, task):
# Both figures are divided by 1024 twice, i.e. shown in MB.
341 return _("Not enough diskspace. Please free up some diskspace and try again. (%d MB required, %d MB available)") % (self.diskspace_required / 1024 / 1024, self.diskspace_available / 1024 / 1024)
# Precondition that passes when the task's command is executable.
343 class ToolExistsPrecondition(Condition):
344 def check(self, task):
# NOTE(review): the listing skips lines here; only the statement that resolves
# the command relative to task.cwd is visible.
349 realpath = task.cwd + '/' + task.cmd
# Remembered so that getErrorMessage can report the path.
350 self.realpath = realpath
351 return os.access(realpath, os.X_OK)
353 def getErrorMessage(self, task):
354 return _("A required tool (%s) was not found.") % (self.realpath)
# Condition that finish() appends to its list of unmet conditions. The class
# body is not visible in this listing.
356 class AbortedPostcondition(Condition):
# Postcondition that passes when the task's recorded return code equals 0.
# The code stays None until processFinished stores it, so the check fails
# before that point.
359 class ReturncodePostcondition(Condition):
360 def check(self, task):
361 return task.returncode == 0
363 #class HDDInitJob(Job):
364 # def __init__(self, device):
365 # Job.__init__(self, _("Initialize Harddisk"))
366 # self.device = device
367 # self.fromDescription(self.createDescription())
369 # def fromDescription(self, description):
370 # self.device = description["device"]
371 # self.addTask(CreatePartitionTask(self.device))
372 # self.addTask(CreateFilesystemTask(self.device))
373 # self.addTask(FilesystemMountTask(self.device))
375 # def createDescription(self):
376 # return {"device": self.device}
# Module-level JobManager instance, created when this module is imported.
378 job_manager = JobManager()