JobView/HTTPProgressDownloader/NFIFlash correctly handle cancelling downloads
[vuplus_dvbapp] / lib / python / Components / Task.py
1 # A Job consists of many "Tasks".
2 # A task is the run of an external tool, with proper methods for failure handling
3
4 from Tools.CList import CList
5
6 class Job(object):
7         NOT_STARTED, IN_PROGRESS, FINISHED, FAILED = range(4)
8         def __init__(self, name):
9                 self.tasks = [ ]
10                 self.resident_tasks = [ ]
11                 self.workspace = "/tmp"
12                 self.current_task = 0
13                 self.callback = None
14                 self.name = name
15                 self.finished = False
16                 self.end = 100
17                 self.__progress = 0
18                 self.weightScale = 1
19                 self.afterEvent = None
20
21                 self.state_changed = CList()
22
23                 self.status = self.NOT_STARTED
24
25         # description is a dict
26         def fromDescription(self, description):
27                 pass
28
29         def createDescription(self):
30                 return None
31
32         def getProgress(self):
33                 if self.current_task == len(self.tasks):
34                         return self.end
35                 t = self.tasks[self.current_task]
36                 jobprogress = t.weighting * t.progress / float(t.end) + sum([task.weighting for task in self.tasks[:self.current_task]])
37                 return int(jobprogress*self.weightScale)
38
39         progress = property(getProgress)
40
41         def getStatustext(self):
42                 return { self.NOT_STARTED: _("Waiting"), self.IN_PROGRESS: _("In Progress"), self.FINISHED: _("Finished"), self.FAILED: _("Failed") }[self.status]
43
44         def task_progress_changed_CB(self):
45                 self.state_changed()
46
47         def addTask(self, task):
48                 task.job = self
49                 task.task_progress_changed = self.task_progress_changed_CB
50                 self.tasks.append(task)
51
52         def start(self, callback):
53                 assert self.callback is None
54                 self.callback = callback
55                 self.restart()
56
57         def restart(self):
58                 self.status = self.IN_PROGRESS
59                 self.state_changed()
60                 self.runNext()
61                 sumTaskWeightings = sum([t.weighting for t in self.tasks]) or 1
62                 self.weightScale = self.end / float(sumTaskWeightings)
63
64         def runNext(self):
65                 if self.current_task == len(self.tasks):
66                         if len(self.resident_tasks) == 0:
67                                 self.status = self.FINISHED
68                                 self.state_changed()
69                                 self.callback(self, None, [])
70                                 self.callback = None
71                         else:
72                                 print "still waiting for %d resident task(s) %s to finish" % (len(self.resident_tasks), str(self.resident_tasks))
73                 else:
74                         self.tasks[self.current_task].run(self.taskCallback)
75                         self.state_changed()
76
77         def taskCallback(self, task, res, stay_resident = False):
78                 cb_idx = self.tasks.index(task)
79                 if stay_resident:
80                         if cb_idx not in self.resident_tasks:
81                                 self.resident_tasks.append(self.current_task)
82                                 print "task going resident:", task
83                         else:
84                                 print "task keeps staying resident:", task
85                                 return
86                 if len(res):
87                         print ">>> Error:", res
88                         self.status = self.FAILED
89                         self.state_changed()
90                         self.callback(self, task, res)
91                 if cb_idx != self.current_task:
92                         if cb_idx in self.resident_tasks:
93                                 print "resident task finished:", task
94                                 self.resident_tasks.remove(cb_idx)
95                 if res == []:
96                         self.state_changed()
97                         self.current_task += 1
98                         self.runNext()
99
100         def retry(self):
101                 assert self.status == self.FAILED
102                 self.restart()
103
104         def abort(self):
105                 if self.current_task < len(self.tasks):
106                         self.tasks[self.current_task].abort()
107                 for i in self.resident_tasks:
108                         self.tasks[i].abort()
109
110         def cancel(self):
111                 # some Jobs might have a better idea of how to cancel a job
112                 self.abort()
113
114 class Task(object):
115         def __init__(self, job, name):
116                 self.name = name
117                 self.immediate_preconditions = [ ]
118                 self.global_preconditions = [ ]
119                 self.postconditions = [ ]
120                 self.returncode = None
121                 self.initial_input = None
122                 self.job = None
123
124                 self.end = 100
125                 self.weighting = 100
126                 self.__progress = 0
127                 self.cmd = None
128                 self.cwd = "/tmp"
129                 self.args = [ ]
130                 self.cmdline = None
131                 self.task_progress_changed = None
132                 self.output_line = ""
133                 job.addTask(self)
134                 self.container = None
135
136         def setCommandline(self, cmd, args):
137                 self.cmd = cmd
138                 self.args = args
139
140         def setTool(self, tool):
141                 self.cmd = tool
142                 self.args = [tool]
143                 self.global_preconditions.append(ToolExistsPrecondition())
144                 self.postconditions.append(ReturncodePostcondition())
145
146         def setCmdline(self, cmdline):
147                 self.cmdline = cmdline
148
149         def checkPreconditions(self, immediate = False):
150                 not_met = [ ]
151                 if immediate:
152                         preconditions = self.immediate_preconditions
153                 else:
154                         preconditions = self.global_preconditions
155                 for precondition in preconditions:
156                         if not precondition.check(self):
157                                 not_met.append(precondition)
158                 return not_met
159
160         def run(self, callback):
161                 failed_preconditions = self.checkPreconditions(True) + self.checkPreconditions(False)
162                 if len(failed_preconditions):
163                         callback(self, failed_preconditions)
164                         return
165                 self.prepare()
166
167                 self.callback = callback
168                 from enigma import eConsoleAppContainer
169                 self.container = eConsoleAppContainer()
170                 self.container.appClosed.append(self.processFinished)
171                 self.container.stdoutAvail.append(self.processStdout)
172                 self.container.stderrAvail.append(self.processStderr)
173
174                 if self.cwd is not None:
175                         self.container.setCWD(self.cwd)
176
177                 if not self.cmd and self.cmdline:
178                         print "execute:", self.container.execute(self.cmdline), self.cmdline
179                 else:
180                         assert self.cmd is not None
181                         assert len(self.args) >= 1
182                         print "execute:", self.container.execute(self.cmd, *self.args), ' '.join(self.args)
183                 if self.initial_input:
184                         self.writeInput(self.initial_input)
185
186         def prepare(self):
187                 pass
188
189         def cleanup(self, failed):
190                 pass
191         
192         def processStdout(self, data):
193                 self.processOutput(data)
194                 
195         def processStderr(self, data):
196                 self.processOutput(data)
197
198         def processOutput(self, data):
199                 self.output_line += data
200                 while True:
201                         i = self.output_line.find('\n')
202                         if i == -1:
203                                 break
204                         self.processOutputLine(self.output_line[:i+1])
205                         self.output_line = self.output_line[i+1:]
206
207         def processOutputLine(self, line):
208                 pass
209
210         def processFinished(self, returncode):
211                 self.returncode = returncode
212                 self.finish()
213
214         def abort(self):
215                 if self.container:
216                         self.container.kill()
217                 self.finish(aborted = True)
218
219         def finish(self, aborted = False):
220                 self.afterRun()
221                 not_met = [ ]
222                 if aborted:
223                         not_met.append(AbortedPostcondition())
224                 else:
225                         for postcondition in self.postconditions:
226                                 if not postcondition.check(self):
227                                         not_met.append(postcondition)
228                 self.cleanup(not_met)
229                 self.callback(self, not_met)
230
231         def afterRun(self):
232                 pass
233
234         def writeInput(self, input):
235                 self.container.write(input)
236
237         def getProgress(self):
238                 return self.__progress
239
240         def setProgress(self, progress):
241                 if progress > self.end:
242                         progress = self.end
243                 if progress < 0:
244                         progress = 0
245                 self.__progress = progress
246                 if self.task_progress_changed:
247                         self.task_progress_changed()
248
249         progress = property(getProgress, setProgress)
250
251 # The jobmanager will execute multiple jobs, each after another.
252 # later, it will also support suspending jobs (and continuing them after reboot etc)
253 # It also supports a notification when some error occured, and possibly a retry.
254 class JobManager:
255         def __init__(self):
256                 self.active_jobs = [ ]
257                 self.failed_jobs = [ ]
258                 self.job_classes = [ ]
259                 self.in_background = False
260                 self.active_job = None
261
262         def AddJob(self, job):
263                 self.active_jobs.append(job)
264                 self.kick()
265
266         def kick(self):
267                 if self.active_job is None:
268                         if len(self.active_jobs):
269                                 self.active_job = self.active_jobs.pop(0)
270                                 self.active_job.start(self.jobDone)
271
272         def jobDone(self, job, task, problems):
273                 print "job", job, "completed with", problems, "in", task
274                 from Tools import Notifications
275                 if self.in_background:
276                         from Screens.TaskView import JobView
277                         self.in_background = False
278                         Notifications.AddNotification(JobView, self.active_job)
279                 if problems:
280                         from Screens.MessageBox import MessageBox
281                         if problems[0].RECOVERABLE:
282                                 Notifications.AddNotificationWithCallback(self.errorCB, MessageBox, _("Error: %s\nRetry?") % (problems[0].getErrorMessage(task)))
283                         else:
284                                 Notifications.AddNotification(MessageBox, _("Error") + (': %s') % (problems[0].getErrorMessage(task)), type = MessageBox.TYPE_ERROR )
285                                 self.errorCB(False)
286                         return
287                         #self.failed_jobs.append(self.active_job)
288
289                 self.active_job = None
290                 self.kick()
291
292         def errorCB(self, answer):
293                 if answer:
294                         print "retrying job"
295                         self.active_job.retry()
296                 else:
297                         print "not retrying job."
298                         self.failed_jobs.append(self.active_job)
299                         self.active_job = None
300                         self.kick()
301
302         def getPendingJobs(self):
303                 list = [ ]
304                 if self.active_job:
305                         list.append(self.active_job)
306                 list += self.active_jobs
307                 return list
308 # some examples:
309 #class PartitionExistsPostcondition:
310 #       def __init__(self, device):
311 #               self.device = device
312 #
313 #       def check(self, task):
314 #               import os
315 #               return os.access(self.device + "part1", os.F_OK)
316 #
317 #class CreatePartitionTask(Task):
318 #       def __init__(self, device):
319 #               Task.__init__(self, _("Create Partition"))
320 #               self.device = device
321 #               self.setTool("/sbin/sfdisk")
322 #               self.args += ["-f", self.device + "disc"]
323 #               self.initial_input = "0,\n;\n;\n;\ny\n"
324 #               self.postconditions.append(PartitionExistsPostcondition(self.device))
325 #
326 #class CreateFilesystemTask(Task):
327 #       def __init__(self, device, partition = 1, largefile = True):
328 #               Task.__init__(self, _("Create Filesystem"))
329 #               self.setTool("/sbin/mkfs.ext")
330 #               if largefile:
331 #                       self.args += ["-T", "largefile"]
332 #               self.args.append("-m0")
333 #               self.args.append(device + "part%d" % partition)
334 #
335 #class FilesystemMountTask(Task):
336 #       def __init__(self, device, partition = 1, filesystem = "ext3"):
337 #               Task.__init__(self, _("Mounting Filesystem"))
338 #               self.setTool("/bin/mount")
339 #               if filesystem is not None:
340 #                       self.args += ["-t", filesystem]
341 #               self.args.append(device + "part%d" % partition)
342
343 class Condition:
344         RECOVERABLE = False
345
346         def getErrorMessage(self, task):
347                 return _("An unknown error occured!") + " (%s @ task %s)" % (self.__class__.__name__, task.__class__.__name__)
348
349 class WorkspaceExistsPrecondition(Condition):
350         def check(self, task):
351                 return os.access(task.job.workspace, os.W_OK)
352
353 class DiskspacePrecondition(Condition):
354         def __init__(self, diskspace_required):
355                 self.diskspace_required = diskspace_required
356                 self.diskspace_available = 0
357
358         def check(self, task):
359                 import os
360                 try:
361                         s = os.statvfs(task.job.workspace)
362                         self.diskspace_available = s.f_bsize * s.f_bavail
363                         return self.diskspace_available >= self.diskspace_required
364                 except OSError:
365                         return False
366
367         def getErrorMessage(self, task):
368                 return _("Not enough diskspace. Please free up some diskspace and try again. (%d MB required, %d MB available)") % (self.diskspace_required / 1024 / 1024, self.diskspace_available / 1024 / 1024)
369
370 class ToolExistsPrecondition(Condition):
371         def check(self, task):
372                 import os
373                 
374                 if task.cmd[0]=='/':
375                         self.realpath = task.cmd
376                         print "[Task.py][ToolExistsPrecondition] WARNING: usage of absolute paths for tasks should be avoided!" 
377                         return os.access(self.realpath, os.X_OK)
378                 else:
379                         self.realpath = task.cmd
380                         path = os.environ.get('PATH', '').split(os.pathsep)
381                         path.append(task.cwd + '/')
382                         absolutes = filter(lambda file: os.access(file, os.X_OK), map(lambda directory, file = task.cmd: os.path.join(directory, file), path))
383                         if len(absolutes) > 0:
384                                 self.realpath = task.cmd[0]
385                                 return True
386                 return False 
387
388         def getErrorMessage(self, task):
389                 return _("A required tool (%s) was not found.") % (self.realpath)
390
391 class AbortedPostcondition(Condition):
392         def getErrorMessage(self, task):
393                 return "Cancelled upon user request"
394
395 class ReturncodePostcondition(Condition):
396         def check(self, task):
397                 return task.returncode == 0
398
399 #class HDDInitJob(Job):
400 #       def __init__(self, device):
401 #               Job.__init__(self, _("Initialize Harddisk"))
402 #               self.device = device
403 #               self.fromDescription(self.createDescription())
404 #
405 #       def fromDescription(self, description):
406 #               self.device = description["device"]
407 #               self.addTask(CreatePartitionTask(self.device))
408 #               self.addTask(CreateFilesystemTask(self.device))
409 #               self.addTask(FilesystemMountTask(self.device))
410 #
411 #       def createDescription(self):
412 #               return {"device": self.device}
413
414 job_manager = JobManager()