2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
5 BitBake 'RunQueue' implementation
7 Handles preparation and execution of a queue of tasks
10 # Copyright (C) 2006-2007 Richard Purdie
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
25 from bb import msg, data, event, mkdirhier, utils
30 class TaskFailure(Exception):
31 """Exception raised when a task in a runqueue fails"""
32 def __init__(self, x):
38 Holds statistics on the tasks handled by the associated runQueue
46 self.failed = self.failed + 1
48 def taskCompleted(self):
49 self.completed = self.completed + 1
51 def taskSkipped(self):
52 self.skipped = self.skipped + 1
56 BitBake Run Queue implementation
58 def __init__(self, cooker, cfgData, dataCache, taskData, targets):
61 self.dataCache = dataCache
62 self.taskData = taskData
63 self.targets = targets
65 self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData) or 1)
66 self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData) or "").split()
68 def reset_runqueue(self):
72 self.runq_depends = []
73 self.runq_revdeps = []
77 def get_user_idstring(self, task):
78 fn = self.taskData.fn_index[self.runq_fnid[task]]
79 taskname = self.runq_task[task]
80 return "%s, %s" % (fn, taskname)
82 def prepare_runqueue(self):
84 Turn a set of taskData into a RunQueue and compute data needed
85 to optimise the execution order.
93 taskData = self.taskData
95 if len(taskData.tasks_name) == 0:
99 bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing runqueue")
101 for task in range(len(taskData.tasks_name)):
102 fnid = taskData.tasks_fnid[task]
103 fn = taskData.fn_index[fnid]
104 task_deps = self.dataCache.task_deps[fn]
106 if fnid not in taskData.failed_fnids:
108 depends = taskData.tasks_tdepends[task]
111 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
112 tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
113 for depid in taskData.depids[fnid]:
114 # Won't be in build_targets if ASSUME_PROVIDED
115 if depid in taskData.build_targets:
116 depdata = taskData.build_targets[depid][0]
117 if depdata is not None:
118 dep = taskData.fn_index[depdata]
119 for taskname in tasknames:
120 depends.append(taskData.gettask_id(dep, taskname))
122 # Resolve Runtime Depends
123 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
124 taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
125 for depid in taskData.rdepids[fnid]:
126 if depid in taskData.run_targets:
127 depdata = taskData.run_targets[depid][0]
128 if depdata is not None:
129 dep = taskData.fn_index[depdata]
130 depends.append(taskData.gettask_id(dep, taskname))
132 idepends = taskData.tasks_idepends[task]
133 for idepend in idepends:
134 depid = int(idepend.split(":")[0])
135 if depid in taskData.build_targets:
136 # Won't be in build_targets if ASSUME_PROVIDED
137 depdata = taskData.build_targets[depid][0]
138 if depdata is not None:
139 dep = taskData.fn_index[depdata]
140 depends.append(taskData.gettask_id(dep, idepend.split(":")[1]))
142 def add_recursive_build(depid, depfnid):
144 Add build depends of depid to depends
145 (if we've not see it before)
146 (calls itself recursively)
148 if str(depid) in dep_seen:
150 dep_seen.append(depid)
151 if depid in taskData.build_targets:
152 depdata = taskData.build_targets[depid][0]
153 if depdata is not None:
154 dep = taskData.fn_index[depdata]
156 # Need to avoid creating new tasks here
157 taskid = taskData.gettask_id(dep, taskname, False)
158 if taskid is not None:
159 depends.append(taskid)
160 fnid = taskData.tasks_fnid[taskid]
161 idepends = taskData.tasks_idepends[taskid]
162 #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
164 fnid = taskData.getfn_id(dep)
165 for nextdepid in taskData.depids[fnid]:
166 if nextdepid not in dep_seen:
167 add_recursive_build(nextdepid, fnid)
168 for nextdepid in taskData.rdepids[fnid]:
169 if nextdepid not in rdep_seen:
170 add_recursive_run(nextdepid, fnid)
171 for idepend in idepends:
172 nextdepid = int(idepend.split(":")[0])
173 if nextdepid not in dep_seen:
174 add_recursive_build(nextdepid, fnid)
176 def add_recursive_run(rdepid, depfnid):
178 Add runtime depends of rdepid to depends
179 (if we've not see it before)
180 (calls itself recursively)
182 if str(rdepid) in rdep_seen:
184 rdep_seen.append(rdepid)
185 if rdepid in taskData.run_targets:
186 depdata = taskData.run_targets[rdepid][0]
187 if depdata is not None:
188 dep = taskData.fn_index[depdata]
190 # Need to avoid creating new tasks here
191 taskid = taskData.gettask_id(dep, taskname, False)
192 if taskid is not None:
193 depends.append(taskid)
194 fnid = taskData.tasks_fnid[taskid]
195 idepends = taskData.tasks_idepends[taskid]
196 #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
198 fnid = taskData.getfn_id(dep)
199 for nextdepid in taskData.depids[fnid]:
200 if nextdepid not in dep_seen:
201 add_recursive_build(nextdepid, fnid)
202 for nextdepid in taskData.rdepids[fnid]:
203 if nextdepid not in rdep_seen:
204 add_recursive_run(nextdepid, fnid)
205 for idepend in idepends:
206 nextdepid = int(idepend.split(":")[0])
207 if nextdepid not in dep_seen:
208 add_recursive_build(nextdepid, fnid)
211 # Resolve Recursive Runtime Depends
212 # Also includes all thier build depends, intertask depends and runtime depends
213 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
214 for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
218 for depid in taskData.depids[fnid]:
219 add_recursive_build(depid, fnid)
220 for rdepid in taskData.rdepids[fnid]:
221 add_recursive_run(rdepid, fnid)
222 for idepend in idepends:
223 depid = int(idepend.split(":")[0])
224 add_recursive_build(depid, fnid)
226 #Prune self references
229 bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
236 self.runq_fnid.append(taskData.tasks_fnid[task])
237 self.runq_task.append(taskData.tasks_name[task])
238 self.runq_depends.append(Set(depends))
239 self.runq_revdeps.append(Set())
240 self.runq_weight.append(0)
242 runq_weight1.append(0)
246 bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")
248 def mark_active(listid, depth):
250 Mark an item as active along with its depends
251 (calls itself recursively)
254 if runq_build[listid] == 1:
257 runq_build[listid] = 1
259 depends = self.runq_depends[listid]
260 for depend in depends:
261 mark_active(depend, depth+1)
263 for target in self.targets:
264 targetid = taskData.getbuild_id(target[0])
266 if targetid not in taskData.build_targets:
269 if targetid in taskData.failed_deps:
272 fnid = taskData.build_targets[targetid][0]
274 # Remove stamps for targets if force mode active
275 if self.cooker.configuration.force:
276 fn = taskData.fn_index[fnid]
277 bb.msg.note(2, bb.msg.domain.RunQueue, "Remove stamp %s, %s" % (target[1], fn))
278 bb.build.del_stamp(target[1], self.dataCache, fn)
280 if fnid in taskData.failed_fnids:
283 listid = taskData.tasks_lookup[fnid][target[1]]
285 mark_active(listid, 1)
287 # Prune inactive tasks
290 for listid in range(len(self.runq_fnid)):
291 if runq_build[listid-delcount] == 1:
292 maps.append(listid-delcount)
294 del self.runq_fnid[listid-delcount]
295 del self.runq_task[listid-delcount]
296 del self.runq_depends[listid-delcount]
297 del self.runq_weight[listid-delcount]
298 del runq_weight1[listid-delcount]
299 del runq_build[listid-delcount]
300 del runq_done[listid-delcount]
301 del self.runq_revdeps[listid-delcount]
302 delcount = delcount + 1
305 if len(self.runq_fnid) == 0:
306 if not taskData.abort:
307 bb.msg.note(1, bb.msg.domain.RunQueue, "All possible tasks have been run but build incomplete (--continue mode). See errors above for incomplete tasks.")
309 bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
311 bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))
313 for listid in range(len(self.runq_fnid)):
315 origdeps = self.runq_depends[listid]
316 for origdep in origdeps:
317 if maps[origdep] == -1:
318 bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
319 newdeps.append(maps[origdep])
320 self.runq_depends[listid] = Set(newdeps)
322 bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")
324 for listid in range(len(self.runq_fnid)):
325 for dep in self.runq_depends[listid]:
326 self.runq_revdeps[dep].add(listid)
329 for listid in range(len(self.runq_fnid)):
330 revdeps = self.runq_revdeps[listid]
331 if len(revdeps) == 0:
332 runq_done[listid] = 1
333 self.runq_weight[listid] = 1
334 endpoints.append(listid)
336 if dep in self.runq_depends[listid]:
337 #self.dump_data(taskData)
338 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
339 runq_weight1[listid] = len(revdeps)
341 bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
345 for listid in endpoints:
346 for revdep in self.runq_depends[listid]:
347 self.runq_weight[revdep] = self.runq_weight[revdep] + self.runq_weight[listid]
348 runq_weight1[revdep] = runq_weight1[revdep] - 1
349 if runq_weight1[revdep] == 0:
350 next_points.append(revdep)
351 runq_done[revdep] = 1
352 endpoints = next_points
353 if len(next_points) == 0:
357 for task in range(len(self.runq_fnid)):
358 if runq_done[task] == 0:
361 def print_chain(taskid, finish):
363 for revdep in self.runq_revdeps[taskid]:
364 if runq_done[revdep] == 0 and revdep not in seen and not finish:
365 bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) (depends: %s)" % (revdep, self.get_user_idstring(revdep), self.runq_depends[revdep]))
366 if revdep in deps_seen:
367 bb.msg.error(bb.msg.domain.RunQueue, "Chain ends at Task %s (%s)" % (revdep, self.get_user_idstring(revdep)))
370 for dep in self.runq_depends[revdep]:
371 deps_seen.append(dep)
372 print_chain(revdep, finish)
373 print_chain(task, False)
374 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) not processed!\nThis is probably a circular dependency (the chain might be printed above)." % (task, self.get_user_idstring(task)))
375 if runq_weight1[task] != 0:
376 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) count not zero!" % (task, self.get_user_idstring(task)))
379 # Check for multiple tasks building the same provider
382 for task in range(len(self.runq_fnid)):
383 fn = taskData.fn_index[self.runq_fnid[task]]
387 for prov in self.dataCache.fn_provides[fn]:
388 if prov not in prov_list:
389 prov_list[prov] = [fn]
390 elif fn not in prov_list[prov]:
391 prov_list[prov].append(fn)
393 for prov in prov_list:
394 if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
396 bb.msg.error(bb.msg.domain.RunQueue, "Multiple files due to be built which all provide %s (%s)" % (prov, " ".join(prov_list[prov])))
398 # bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...")
401 # Make a weight sorted map
402 from copy import deepcopy
404 sortweight = deepcopy(self.runq_weight)
406 copyweight = deepcopy(self.runq_weight)
409 for weight in sortweight:
410 idx = copyweight.index(weight)
411 self.prio_map.append(idx)
413 self.prio_map.reverse()
415 #self.dump_data(taskData)
417 def execute_runqueue(self):
419 Run the tasks in a queue prepared by prepare_runqueue
420 Upon failure, optionally try to recover the build using any alternate providers
421 (if the abort on failure configuration option isn't set)
428 self.execute_runqueue_internal()
430 if self.master_process:
431 failed_fnids = self.finish_runqueue()
432 if len(failed_fnids) == 0:
434 if self.taskData.abort:
435 raise bb.runqueue.TaskFailure(failed_fnids)
436 for fnid in failed_fnids:
437 #print "Failure: %s %s %s" % (fnid, self.taskData.fn_index[fnid], self.runq_task[fnid])
438 self.taskData.fail_fnid(fnid)
439 failures = failures + 1
440 self.reset_runqueue()
441 self.prepare_runqueue()
443 def execute_runqueue_initVars(self):
445 self.stats = RunQueueStats()
447 self.active_builds = 0
448 self.runq_buildable = []
449 self.runq_running = []
450 self.runq_complete = []
452 self.failed_fnids = []
453 self.master_process = True
455 # Mark initial buildable tasks
456 for task in range(len(self.runq_fnid)):
457 self.runq_running.append(0)
458 self.runq_complete.append(0)
459 if len(self.runq_depends[task]) == 0:
460 self.runq_buildable.append(1)
462 self.runq_buildable.append(0)
464 def task_complete(self, task):
466 Mark a task as completed
467 Look at the reverse dependencies and mark any task with
468 completed dependencies as buildable
470 self.runq_complete[task] = 1
471 for revdep in self.runq_revdeps[task]:
472 if self.runq_running[revdep] == 1:
474 if self.runq_buildable[revdep] == 1:
477 for dep in self.runq_depends[revdep]:
478 if self.runq_complete[dep] != 1:
481 self.runq_buildable[revdep] = 1
482 fn = self.taskData.fn_index[self.runq_fnid[revdep]]
483 taskname = self.runq_task[revdep]
484 bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
486 def get_next_task(self):
488 Return the id of the highest priority task that is buildable
490 for task1 in range(len(self.runq_fnid)):
491 task = self.prio_map[task1]
492 if self.runq_running[task] == 1:
494 if self.runq_buildable[task] == 1:
498 def execute_runqueue_internal(self):
500 Run the tasks in a queue prepared by prepare_runqueue
503 bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
505 self.execute_runqueue_initVars()
507 if len(self.runq_fnid) == 0:
511 def sigint_handler(signum, frame):
512 raise KeyboardInterrupt
514 # Find any tasks with current stamps and remove them from the queue
515 for task1 in range(len(self.runq_fnid)):
516 task = self.prio_map[task1]
517 fn = self.taskData.fn_index[self.runq_fnid[task]]
518 taskname = self.runq_task[task]
519 if bb.build.stamp_is_current(taskname, self.dataCache, fn):
520 bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
521 self.runq_running[task] = 1
522 self.task_complete(task)
523 self.stats.taskCompleted()
524 self.stats.taskSkipped()
527 task = self.get_next_task()
529 fn = self.taskData.fn_index[self.runq_fnid[task]]
531 taskname = self.runq_task[task]
532 if bb.build.stamp_is_current(taskname, self.dataCache, fn):
533 bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
534 self.runq_running[task] = 1
535 self.task_complete(task)
536 self.stats.taskCompleted()
537 self.stats.taskSkipped()
540 bb.msg.note(1, bb.msg.domain.RunQueue, "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.active_builds + 1, len(self.runq_fnid), task, self.get_user_idstring(task)))
544 bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
546 # Bypass master process' handling
547 self.master_process = False
548 # Stop Ctrl+C being sent to children
549 # signal.signal(signal.SIGINT, signal.SIG_IGN)
550 # Make the child the process group leader
552 newsi = os.open('/dev/null', os.O_RDWR)
553 os.dup2(newsi, sys.stdin.fileno())
554 self.cooker.configuration.cmd = taskname[3:]
556 self.cooker.tryBuild(fn, False)
557 except bb.build.EventException:
558 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
561 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
564 self.build_pids[pid] = task
565 self.runq_running[task] = 1
566 self.active_builds = self.active_builds + 1
567 if self.active_builds < self.number_tasks:
569 if self.active_builds > 0:
570 result = os.waitpid(-1, 0)
571 self.active_builds = self.active_builds - 1
572 task = self.build_pids[result[0]]
574 del self.build_pids[result[0]]
575 bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
576 self.failed_fnids.append(self.runq_fnid[task])
577 self.stats.taskFailed()
579 self.task_complete(task)
580 self.stats.taskCompleted()
581 del self.build_pids[result[0]]
585 def finish_runqueue(self):
587 while self.active_builds > 0:
588 bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.active_builds)
590 for k, v in self.build_pids.iteritems():
591 bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k))
592 tasknum = tasknum + 1
593 result = os.waitpid(-1, 0)
594 task = self.build_pids[result[0]]
596 bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
597 self.failed_fnids.append(self.runq_fnid[task])
598 self.stats.taskFailed()
599 del self.build_pids[result[0]]
600 self.active_builds = self.active_builds - 1
601 bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
602 return self.failed_fnids
603 except KeyboardInterrupt:
604 bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.active_builds)
605 for k, v in self.build_pids.iteritems():
607 os.kill(-k, signal.SIGINT)
613 for task in range(len(self.runq_fnid)):
614 if self.runq_buildable[task] == 0:
615 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
616 if self.runq_running[task] == 0:
617 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
618 if self.runq_complete[task] == 0:
619 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
621 bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
623 return self.failed_fnids
625 def dump_data(self, taskQueue):
627 Dump some debug information on the internal data structures
629 bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
630 for task in range(len(self.runq_fnid)):
631 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
632 taskQueue.fn_index[self.runq_fnid[task]],
633 self.runq_task[task],
634 self.runq_weight[task],
635 self.runq_depends[task],
636 self.runq_revdeps[task]))
638 bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
639 for task1 in range(len(self.runq_fnid)):
640 if task1 in self.prio_map:
641 task = self.prio_map[task1]
642 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
643 taskQueue.fn_index[self.runq_fnid[task]],
644 self.runq_task[task],
645 self.runq_weight[task],
646 self.runq_depends[task],
647 self.runq_revdeps[task]))