d69d9167288aec9e6a8d820462dc90277661dfee
[vuplus_bitbake] / lib / bb / runqueue.py
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8 """
9
10 # Copyright (C) 2006  Richard Purdie
11 #
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
15 #
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20 #
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25 from bb import msg, data, fetch, event, mkdirhier, utils
26 from sets import Set 
27 import bb, os, sys
28
class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails

    RunQueue.execute_runqueue raises bb.runqueue.TaskFailure with the list
    of failed fnids when taskData.abort is set.
    """
    def __init__(self, x):
        # x is assigned straight to the exception's args attribute; the
        # base class __init__ is not invoked.
        self.args = x
33
34 class RunQueue:
35     """
36     BitBake Run Queue implementation
37     """
38     def __init__(self):
39         self.reset_runqueue()
40
41     def reset_runqueue(self):
42         self.runq_fnid = []
43         self.runq_task = []
44         self.runq_depends = []
45         self.runq_revdeps = []
46         self.runq_weight = []
47         self.prio_map = []
48
49     def get_user_idstring(self, task, taskData):
50         fn = taskData.fn_index[self.runq_fnid[task]]
51         taskname = self.runq_task[task]
52         return "%s, %s" % (fn, taskname)
53
54     def prepare_runqueue(self, cfgData, dataCache, taskData, targets):
55         """
56         Turn a set of taskData into a RunQueue and compute data needed 
57         to optimise the execution order.
58         targets is list of paired values - a provider name and the task to run
59         """
60
61         depends = []
62         runq_weight1 = []
63         runq_build = []
64         runq_done = []
65
66         bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing Runqueue")
67
68         for task in range(len(taskData.tasks_name)):
69             fnid = taskData.tasks_fnid[task]
70             fn = taskData.fn_index[fnid]
71             task_deps = dataCache.task_deps[fn]
72
73             if fnid not in taskData.failed_fnids:
74
75                 depends = taskData.tasks_tdepends[task]
76
77                 # Resolve Depends
78                 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
79                     taskname = task_deps['deptask'][taskData.tasks_name[task]]
80                     for depid in taskData.depids[fnid]:
81                         if depid in taskData.build_targets:
82                             depdata = taskData.build_targets[depid][0]
83                             if depdata:
84                                 dep = taskData.fn_index[depdata]
85                                 depends.append(taskData.gettask_id(dep, taskname))
86
87                 # Resolve Runtime Depends
88                 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
89                     taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
90                     for depid in taskData.rdepids[fnid]:
91                         if depid in taskData.run_targets:
92                             depdata = taskData.run_targets[depid][0]
93                             if depdata:
94                                 dep = taskData.fn_index[depdata]
95                                 depends.append(taskData.gettask_id(dep, taskname))
96
97                 def add_recursive_build(depid):
98                     """
99                     Add build depends of depid to depends
100                     (if we've not see it before)
101                     (calls itself recursively)
102                     """
103                     if str(depid) in dep_seen:
104                         return
105                     dep_seen.append(depid)
106                     if depid in taskData.build_targets:
107                         depdata = taskData.build_targets[depid][0]
108                         if depdata:
109                             dep = taskData.fn_index[depdata]
110                             taskid = taskData.gettask_id(dep, taskname)
111                             depends.append(taskid)
112                             fnid = taskData.tasks_fnid[taskid]
113                             for nextdepid in taskData.depids[fnid]:
114                                 if nextdepid not in dep_seen:
115                                     add_recursive_build(nextdepid)
116                             for nextdepid in taskData.rdepids[fnid]:
117                                 if nextdepid not in rdep_seen:
118                                     add_recursive_run(nextdepid)
119
120                 def add_recursive_run(rdepid):
121                     """
122                     Add runtime depends of rdepid to depends
123                     (if we've not see it before)
124                     (calls itself recursively)
125                     """
126                     if str(rdepid) in rdep_seen:
127                         return
128                     rdep_seen.append(rdepid)
129                     if rdepid in taskData.run_targets:
130                         depdata = taskData.run_targets[rdepid][0]
131                         if depdata:
132                             dep = taskData.fn_index[depdata]
133                             taskid = taskData.gettask_id(dep, taskname)
134                             depends.append(taskid)
135                             fnid = taskData.tasks_fnid[taskid]
136                             for nextdepid in taskData.depids[fnid]:
137                                 if nextdepid not in dep_seen:
138                                     add_recursive_build(nextdepid)
139                             for nextdepid in taskData.rdepids[fnid]:
140                                 if nextdepid not in rdep_seen:
141                                     add_recursive_run(nextdepid)
142
143
144                 # Resolve Recursive Runtime Depends
145                 # Also includes all Build Depends (and their runtime depends)
146                 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
147                     dep_seen = []
148                     rdep_seen = []
149                     taskname = task_deps['recrdeptask'][taskData.tasks_name[task]]
150                     for depid in taskData.depids[fnid]:
151                         add_recursive_build(depid)
152                     for rdepid in taskData.rdepids[fnid]:
153                         add_recursive_run(rdepid)
154
155                 #Prune self references
156                 if task in depends:
157                     newdep = []
158                     bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
159                     for dep in depends:
160                        if task != dep:
161                           newdep.append(dep)
162                     depends = newdep
163
164
165             self.runq_fnid.append(taskData.tasks_fnid[task])
166             self.runq_task.append(taskData.tasks_name[task])
167             self.runq_depends.append(Set(depends))
168             self.runq_revdeps.append(Set())
169             self.runq_weight.append(0)
170
171             runq_weight1.append(0)
172             runq_build.append(0)
173             runq_done.append(0)
174
175         bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")
176
177         def mark_active(listid, depth):
178             """
179             Mark an item as active along with its depends
180             (calls itself recursively)
181             """
182
183             if runq_build[listid] == 1:
184                 return
185
186             runq_build[listid] = 1
187
188             depends = self.runq_depends[listid]
189             for depend in depends:
190                 mark_active(depend, depth+1)
191
192         for target in targets:
193             targetid = taskData.getbuild_id(target[0])
194             if targetid in taskData.failed_deps:
195                 continue
196
197             if targetid not in taskData.build_targets:
198                 continue
199
200             fnid = taskData.build_targets[targetid][0]
201             if fnid in taskData.failed_fnids:
202                 continue
203
204             listid = taskData.tasks_lookup[fnid][target[1]]
205
206             mark_active(listid, 1)
207
208         # Prune inactive tasks
209         maps = []
210         delcount = 0
211         for listid in range(len(self.runq_fnid)):
212             if runq_build[listid-delcount] == 1:
213                 maps.append(listid-delcount)
214             else:
215                 del self.runq_fnid[listid-delcount]
216                 del self.runq_task[listid-delcount]
217                 del self.runq_depends[listid-delcount]
218                 del self.runq_weight[listid-delcount]
219                 del runq_weight1[listid-delcount]
220                 del runq_build[listid-delcount]
221                 del runq_done[listid-delcount]
222                 del self.runq_revdeps[listid-delcount]
223                 delcount = delcount + 1
224                 maps.append(-1)
225
226         if len(self.runq_fnid) == 0:
227             if not taskData.abort:
228                 bb.msg.note(1, bb.msg.domain.RunQueue, "All possible tasks have been run but build incomplete (--continue mode). See errors above for incomplete tasks.")
229                 return
230             bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
231
232         bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))
233
234         for listid in range(len(self.runq_fnid)):
235             newdeps = []
236             origdeps = self.runq_depends[listid]
237             for origdep in origdeps:
238                 if maps[origdep] == -1:
239                     bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
240                 newdeps.append(maps[origdep])
241             self.runq_depends[listid] = Set(newdeps)
242
243         bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")
244
245         for listid in range(len(self.runq_fnid)):
246             for dep in self.runq_depends[listid]:
247                 self.runq_revdeps[dep].add(listid)
248
249         endpoints = []
250         for listid in range(len(self.runq_fnid)):
251             revdeps = self.runq_revdeps[listid]
252             if len(revdeps) == 0:
253                 runq_done[listid] = 1
254                 self.runq_weight[listid] = 1
255                 endpoints.append(listid)
256             for dep in revdeps:
257                 if dep in self.runq_depends[listid]:
258                     #self.dump_data(taskData)
259                     bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
260             runq_weight1[listid] = len(revdeps)
261
262         bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
263
264         while 1:
265             next_points = []
266             for listid in endpoints:
267                 for revdep in self.runq_depends[listid]:
268                     self.runq_weight[revdep] = self.runq_weight[revdep] + self.runq_weight[listid]
269                     runq_weight1[revdep] = runq_weight1[revdep] - 1
270                     if runq_weight1[revdep] == 0:
271                         next_points.append(revdep)
272                         runq_done[revdep] = 1
273             endpoints = next_points
274             if len(next_points) == 0:
275                 break           
276
277         # Sanity Checks
278         for task in range(len(self.runq_fnid)):
279             if runq_done[task] == 0:
280                 seen = []
281                 deps_seen = []
282                 def print_chain(taskid, finish):
283                     seen.append(taskid)
284                     for revdep in self.runq_revdeps[taskid]:
285                         if runq_done[revdep] == 0 and revdep not in seen and not finish:
286                             bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) (depends: %s)" % (revdep, self.get_user_idstring(revdep, taskData), self.runq_depends[revdep]))
287                             if revdep in deps_seen:
288                                 bb.msg.error(bb.msg.domain.RunQueue, "Chain ends at Task %s (%s)" % (revdep, self.get_user_idstring(revdep, taskData)))
289                                 finish = True
290                                 return
291                             for dep in self.runq_depends[revdep]:
292                                 deps_seen.append(dep)
293                             print_chain(revdep, finish)
294                 print_chain(task, False)
295                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) not processed!\nThis is probably a circular dependency (the chain might be printed above)." % (task, self.get_user_idstring(task, taskData)))
296             if runq_weight1[task] != 0:
297                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) count not zero!" % (task, self.get_user_idstring(task, taskData)))
298
299         # Make a weight sorted map
300         from copy import deepcopy
301
302         sortweight = deepcopy(self.runq_weight)
303         sortweight.sort()
304         copyweight = deepcopy(self.runq_weight)
305         self.prio_map = []
306
307         for weight in sortweight:
308             idx = copyweight.index(weight)
309             self.prio_map.append(idx)
310             copyweight[idx] = -1
311         self.prio_map.reverse()
312
313         #self.dump_data(taskData)
314
315     def execute_runqueue(self, cooker, cfgData, dataCache, taskData, runlist):
316         """
317         Run the tasks in a queue prepared by prepare_runqueue
318         Upon failure, optionally try to recover the build using any alternate providers
319         (if the abort on failure configuration option isn't set)
320         """
321
322         failures = 0
323         while 1:
324             failed_fnids = self.execute_runqueue_internal(cooker, cfgData, dataCache, taskData)
325             if len(failed_fnids) == 0:
326                 return failures
327             if taskData.abort:
328                 raise bb.runqueue.TaskFailure(failed_fnids)
329             for fnid in failed_fnids:
330                 #print "Failure: %s %s %s" % (fnid, taskData.fn_index[fnid],  self.runq_task[fnid])
331                 taskData.fail_fnid(fnid)
332                 failures = failures + 1
333             self.reset_runqueue()
334             self.prepare_runqueue(cfgData, dataCache, taskData, runlist)
335
336     def execute_runqueue_internal(self, cooker, cfgData, dataCache, taskData):
337         """
338         Run the tasks in a queue prepared by prepare_runqueue
339         """
340         import signal
341
342         bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
343
344         runq_buildable = []
345         runq_running = []
346         runq_complete = []
347         active_builds = 0
348         build_pids = {}
349         failed_fnids = []
350
351         if len(self.runq_fnid) == 0:
352             # nothing to do
353             return
354
355         def sigint_handler(signum, frame):
356             raise KeyboardInterrupt
357
358         def get_next_task(data):
359             """
360             Return the id of the highest priority task that is buildable
361             """
362             for task1 in range(len(data.runq_fnid)):
363                 task = data.prio_map[task1]
364                 if runq_running[task] == 1:
365                     continue
366                 if runq_buildable[task] == 1:
367                     return task
368             return None
369
370         def task_complete(data, task):
371             """
372             Mark a task as completed
373             Look at the reverse dependencies and mark any task with 
374             completed dependencies as buildable
375             """
376             runq_complete[task] = 1
377             for revdep in data.runq_revdeps[task]:
378                 if runq_running[revdep] == 1:
379                     continue
380                 if runq_buildable[revdep] == 1:
381                     continue
382                 alldeps = 1
383                 for dep in data.runq_depends[revdep]:
384                     if runq_complete[dep] != 1:
385                         alldeps = 0
386                 if alldeps == 1:
387                     runq_buildable[revdep] = 1
388                     fn = taskData.fn_index[self.runq_fnid[revdep]]
389                     taskname = self.runq_task[revdep]
390                     bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
391
392         # Mark initial buildable tasks
393         for task in range(len(self.runq_fnid)):
394             runq_running.append(0)
395             runq_complete.append(0)
396             if len(self.runq_depends[task]) == 0:
397                 runq_buildable.append(1)
398             else:
399                 runq_buildable.append(0)
400
401
402         number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData) or 1)
403
404         try:
405             while 1:
406                 task = get_next_task(self)
407                 if task is not None:
408                     fn = taskData.fn_index[self.runq_fnid[task]]
409                     taskname = self.runq_task[task]
410
411                     if bb.build.stamp_is_current_cache(dataCache, fn, taskname):
412                         targetid = taskData.gettask_id(fn, taskname)
413                         if not (targetid in taskData.external_targets and cooker.configuration.force):
414                             bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task, taskData)))
415                             runq_running[task] = 1
416                             task_complete(self, task)
417                             continue
418
419                     bb.msg.debug(1, bb.msg.domain.RunQueue, "Running task %s (%s)" % (task, self.get_user_idstring(task, taskData)))
420                     try: 
421                         pid = os.fork() 
422                     except OSError, e: 
423                         bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
424                     if pid == 0:
425                         # Bypass finally below
426                         active_builds = 0 
427                         # Stop Ctrl+C being sent to children
428                         # signal.signal(signal.SIGINT, signal.SIG_IGN)
429                         # Make the child the process group leader
430                         os.setpgid(0, 0)
431                         sys.stdin = open('/dev/null', 'r')
432                         cooker.configuration.cmd = taskname[3:]
433                         try: 
434                             cooker.tryBuild(fn, False)
435                         except bb.build.EventException:
436                             bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
437                             sys.exit(1)
438                         except:
439                             bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
440                             raise
441                         sys.exit(0)
442                     build_pids[pid] = task
443                     runq_running[task] = 1
444                     active_builds = active_builds + 1
445                     if active_builds < number_tasks:
446                         continue
447                 if active_builds > 0:
448                     result = os.waitpid(-1, 0)
449                     active_builds = active_builds - 1
450                     task = build_pids[result[0]]
451                     if result[1] != 0:
452                         del build_pids[result[0]]
453                         bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task, taskData)))
454                         failed_fnids.append(self.runq_fnid[task])
455                         break
456                     task_complete(self, task)
457                     del build_pids[result[0]]
458                     continue
459                 break
460         finally:
461             try:
462                 while active_builds > 0:
463                     bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % active_builds)
464                     tasknum = 1
465                     for k, v in build_pids.iteritems():
466                          bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v, taskData), k))
467                          tasknum = tasknum + 1
468                     result = os.waitpid(-1, 0)
469                     task = build_pids[result[0]]
470                     if result[1] != 0:
471                          bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task, taskData)))
472                          failed_fnids.append(self.runq_fnid[task])
473                     del build_pids[result[0]]
474                     active_builds = active_builds - 1
475                 if len(failed_fnids) > 0:
476                     return failed_fnids
477             except:
478                 bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % active_builds)
479                 for k, v in build_pids.iteritems():
480                      os.kill(-k, signal.SIGINT)
481                 raise
482
483         # Sanity Checks
484         for task in range(len(self.runq_fnid)):
485             if runq_buildable[task] == 0:
486                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
487             if runq_running[task] == 0:
488                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
489             if runq_complete[task] == 0:
490                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
491
492         return failed_fnids
493
494     def dump_data(self, taskQueue):
495         """
496         Dump some debug information on the internal data structures
497         """
498         bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
499         for task in range(len(self.runq_fnid)):
500                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
501                         taskQueue.fn_index[self.runq_fnid[task]], 
502                         self.runq_task[task], 
503                         self.runq_weight[task], 
504                         self.runq_depends[task], 
505                         self.runq_revdeps[task]))
506
507         bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
508         for task1 in range(len(self.runq_fnid)):
509             if task1 in self.prio_map:
510                 task = self.prio_map[task1]
511                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
512                         taskQueue.fn_index[self.runq_fnid[task]], 
513                         self.runq_task[task], 
514                         self.runq_weight[task], 
515                         self.runq_depends[task], 
516                         self.runq_revdeps[task]))