2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
5 BitBake 'RunQueue' implementation
7 Handles preparation and execution of a queue of tasks
10 # Copyright (C) 2006-2007 Richard Purdie
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # GNU General Public License for more details.
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
25 from bb import msg, data, event, mkdirhier, utils
30 class TaskFailure(Exception):
31 """Exception raised when a task in a runqueue fails"""
32 def __init__(self, x):
38 Holds statistics on the tasks handled by the associated runQueue
46 self.failed = self.failed + 1
48 def taskCompleted(self):
49 self.completed = self.completed + 1
51 def taskSkipped(self):
52 self.skipped = self.skipped + 1
56 BitBake Run Queue implementation
58 def __init__(self, cooker, cfgData, dataCache, taskData, targets):
61 self.dataCache = dataCache
62 self.taskData = taskData
63 self.targets = targets
65 self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData) or 1)
66 self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData) or "").split()
68 def reset_runqueue(self):
72 self.runq_depends = []
73 self.runq_revdeps = []
77 def get_user_idstring(self, task):
78 fn = self.taskData.fn_index[self.runq_fnid[task]]
79 taskname = self.runq_task[task]
80 return "%s, %s" % (fn, taskname)
82 def prepare_runqueue(self):
84 Turn a set of taskData into a RunQueue and compute data needed
85 to optimise the execution order.
93 taskData = self.taskData
95 if len(taskData.tasks_name) == 0:
99 bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing runqueue")
101 for task in range(len(taskData.tasks_name)):
102 fnid = taskData.tasks_fnid[task]
103 fn = taskData.fn_index[fnid]
104 task_deps = self.dataCache.task_deps[fn]
106 if fnid not in taskData.failed_fnids:
108 depends = taskData.tasks_tdepends[task]
111 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
112 taskname = task_deps['deptask'][taskData.tasks_name[task]]
113 for depid in taskData.depids[fnid]:
114 # Won't be in build_targets if ASSUME_PROVIDED
115 if depid in taskData.build_targets:
116 depdata = taskData.build_targets[depid][0]
117 if depdata is not None:
118 dep = taskData.fn_index[depdata]
119 depends.append(taskData.gettask_id(dep, taskname))
121 # Resolve Runtime Depends
122 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
123 taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
124 for depid in taskData.rdepids[fnid]:
125 if depid in taskData.run_targets:
126 depdata = taskData.run_targets[depid][0]
127 if depdata is not None:
128 dep = taskData.fn_index[depdata]
129 depends.append(taskData.gettask_id(dep, taskname))
131 idepends = taskData.tasks_idepends[task]
132 for idepend in idepends:
133 depid = int(idepend.split(":")[0])
134 if depid in taskData.build_targets:
135 # Won't be in build_targets if ASSUME_PROVIDED
136 depdata = taskData.build_targets[depid][0]
137 if depdata is not None:
138 dep = taskData.fn_index[depdata]
139 depends.append(taskData.gettask_id(dep, idepend.split(":")[1]))
141 def add_recursive_build(depid, depfnid):
143 Add build depends of depid to depends
144 (if we've not see it before)
145 (calls itself recursively)
147 if str(depid) in dep_seen:
149 dep_seen.append(depid)
150 if depid in taskData.build_targets:
151 depdata = taskData.build_targets[depid][0]
152 if depdata is not None:
153 dep = taskData.fn_index[depdata]
155 # Need to avoid creating new tasks here
156 taskid = taskData.gettask_id(dep, taskname, False)
157 if taskid is not None:
158 depends.append(taskid)
159 fnid = taskData.tasks_fnid[taskid]
160 idepends = taskData.tasks_idepends[taskid]
161 #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
163 fnid = taskData.getfn_id(dep)
164 for nextdepid in taskData.depids[fnid]:
165 if nextdepid not in dep_seen:
166 add_recursive_build(nextdepid, fnid)
167 for nextdepid in taskData.rdepids[fnid]:
168 if nextdepid not in rdep_seen:
169 add_recursive_run(nextdepid, fnid)
170 for idepend in idepends:
171 nextdepid = int(idepend.split(":")[0])
172 if nextdepid not in dep_seen:
173 add_recursive_build(nextdepid, fnid)
175 def add_recursive_run(rdepid, depfnid):
177 Add runtime depends of rdepid to depends
178 (if we've not see it before)
179 (calls itself recursively)
181 if str(rdepid) in rdep_seen:
183 rdep_seen.append(rdepid)
184 if rdepid in taskData.run_targets:
185 depdata = taskData.run_targets[rdepid][0]
186 if depdata is not None:
187 dep = taskData.fn_index[depdata]
189 # Need to avoid creating new tasks here
190 taskid = taskData.gettask_id(dep, taskname, False)
191 if taskid is not None:
192 depends.append(taskid)
193 fnid = taskData.tasks_fnid[taskid]
194 idepends = taskData.tasks_idepends[taskid]
195 #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
197 fnid = taskData.getfn_id(dep)
198 for nextdepid in taskData.depids[fnid]:
199 if nextdepid not in dep_seen:
200 add_recursive_build(nextdepid, fnid)
201 for nextdepid in taskData.rdepids[fnid]:
202 if nextdepid not in rdep_seen:
203 add_recursive_run(nextdepid, fnid)
204 for idepend in idepends:
205 nextdepid = int(idepend.split(":")[0])
206 if nextdepid not in dep_seen:
207 add_recursive_build(nextdepid, fnid)
210 # Resolve Recursive Runtime Depends
211 # Also includes all thier build depends, intertask depends and runtime depends
212 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
213 for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
217 for depid in taskData.depids[fnid]:
218 add_recursive_build(depid, fnid)
219 for rdepid in taskData.rdepids[fnid]:
220 add_recursive_run(rdepid, fnid)
221 for idepend in idepends:
222 depid = int(idepend.split(":")[0])
223 add_recursive_build(depid, fnid)
225 #Prune self references
228 bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
235 self.runq_fnid.append(taskData.tasks_fnid[task])
236 self.runq_task.append(taskData.tasks_name[task])
237 self.runq_depends.append(Set(depends))
238 self.runq_revdeps.append(Set())
239 self.runq_weight.append(0)
241 runq_weight1.append(0)
245 bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")
247 def mark_active(listid, depth):
249 Mark an item as active along with its depends
250 (calls itself recursively)
253 if runq_build[listid] == 1:
256 runq_build[listid] = 1
258 depends = self.runq_depends[listid]
259 for depend in depends:
260 mark_active(depend, depth+1)
262 for target in self.targets:
263 targetid = taskData.getbuild_id(target[0])
265 if targetid not in taskData.build_targets:
268 if targetid in taskData.failed_deps:
271 fnid = taskData.build_targets[targetid][0]
273 # Remove stamps for targets if force mode active
274 if self.cooker.configuration.force:
275 fn = taskData.fn_index[fnid]
276 bb.msg.note(2, bb.msg.domain.RunQueue, "Remove stamp %s, %s" % (target[1], fn))
277 bb.build.del_stamp(target[1], self.dataCache, fn)
279 if fnid in taskData.failed_fnids:
282 listid = taskData.tasks_lookup[fnid][target[1]]
284 mark_active(listid, 1)
286 # Prune inactive tasks
289 for listid in range(len(self.runq_fnid)):
290 if runq_build[listid-delcount] == 1:
291 maps.append(listid-delcount)
293 del self.runq_fnid[listid-delcount]
294 del self.runq_task[listid-delcount]
295 del self.runq_depends[listid-delcount]
296 del self.runq_weight[listid-delcount]
297 del runq_weight1[listid-delcount]
298 del runq_build[listid-delcount]
299 del runq_done[listid-delcount]
300 del self.runq_revdeps[listid-delcount]
301 delcount = delcount + 1
304 if len(self.runq_fnid) == 0:
305 if not taskData.abort:
306 bb.msg.note(1, bb.msg.domain.RunQueue, "All possible tasks have been run but build incomplete (--continue mode). See errors above for incomplete tasks.")
308 bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
310 bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))
312 for listid in range(len(self.runq_fnid)):
314 origdeps = self.runq_depends[listid]
315 for origdep in origdeps:
316 if maps[origdep] == -1:
317 bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
318 newdeps.append(maps[origdep])
319 self.runq_depends[listid] = Set(newdeps)
321 bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")
323 for listid in range(len(self.runq_fnid)):
324 for dep in self.runq_depends[listid]:
325 self.runq_revdeps[dep].add(listid)
328 for listid in range(len(self.runq_fnid)):
329 revdeps = self.runq_revdeps[listid]
330 if len(revdeps) == 0:
331 runq_done[listid] = 1
332 self.runq_weight[listid] = 1
333 endpoints.append(listid)
335 if dep in self.runq_depends[listid]:
336 #self.dump_data(taskData)
337 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
338 runq_weight1[listid] = len(revdeps)
340 bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
344 for listid in endpoints:
345 for revdep in self.runq_depends[listid]:
346 self.runq_weight[revdep] = self.runq_weight[revdep] + self.runq_weight[listid]
347 runq_weight1[revdep] = runq_weight1[revdep] - 1
348 if runq_weight1[revdep] == 0:
349 next_points.append(revdep)
350 runq_done[revdep] = 1
351 endpoints = next_points
352 if len(next_points) == 0:
356 for task in range(len(self.runq_fnid)):
357 if runq_done[task] == 0:
360 def print_chain(taskid, finish):
362 for revdep in self.runq_revdeps[taskid]:
363 if runq_done[revdep] == 0 and revdep not in seen and not finish:
364 bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) (depends: %s)" % (revdep, self.get_user_idstring(revdep), self.runq_depends[revdep]))
365 if revdep in deps_seen:
366 bb.msg.error(bb.msg.domain.RunQueue, "Chain ends at Task %s (%s)" % (revdep, self.get_user_idstring(revdep)))
369 for dep in self.runq_depends[revdep]:
370 deps_seen.append(dep)
371 print_chain(revdep, finish)
372 print_chain(task, False)
373 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) not processed!\nThis is probably a circular dependency (the chain might be printed above)." % (task, self.get_user_idstring(task)))
374 if runq_weight1[task] != 0:
375 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) count not zero!" % (task, self.get_user_idstring(task)))
378 # Check for mulitple taska building the same provider
381 for task in range(len(self.runq_fnid)):
382 fn = taskData.fn_index[self.runq_fnid[task]]
386 for prov in self.dataCache.fn_provides[fn]:
387 if prov not in prov_list:
388 prov_list[prov] = [fn]
389 elif fn not in prov_list[prov]:
390 prov_list[prov].append(fn)
392 for prov in prov_list:
393 if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
395 bb.msg.error(bb.msg.domain.RunQueue, "Multiple files due to be built which all provide %s (%s)" % (prov, " ".join(prov_list[prov])))
397 # bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...")
400 # Make a weight sorted map
401 from copy import deepcopy
403 sortweight = deepcopy(self.runq_weight)
405 copyweight = deepcopy(self.runq_weight)
408 for weight in sortweight:
409 idx = copyweight.index(weight)
410 self.prio_map.append(idx)
412 self.prio_map.reverse()
414 #self.dump_data(taskData)
416 def execute_runqueue(self):
418 Run the tasks in a queue prepared by prepare_runqueue
419 Upon failure, optionally try to recover the build using any alternate providers
420 (if the abort on failure configuration option isn't set)
427 self.execute_runqueue_internal()
429 if self.master_process:
430 failed_fnids = self.finish_runqueue()
431 if len(failed_fnids) == 0:
433 if self.taskData.abort:
434 raise bb.runqueue.TaskFailure(failed_fnids)
435 for fnid in failed_fnids:
436 #print "Failure: %s %s %s" % (fnid, self.taskData.fn_index[fnid], self.runq_task[fnid])
437 self.taskData.fail_fnid(fnid)
438 failures = failures + 1
439 self.reset_runqueue()
440 self.prepare_runqueue()
442 def execute_runqueue_initVars(self):
444 self.stats = RunQueueStats()
446 self.active_builds = 0
447 self.runq_buildable = []
448 self.runq_running = []
449 self.runq_complete = []
451 self.failed_fnids = []
452 self.master_process = True
454 # Mark initial buildable tasks
455 for task in range(len(self.runq_fnid)):
456 self.runq_running.append(0)
457 self.runq_complete.append(0)
458 if len(self.runq_depends[task]) == 0:
459 self.runq_buildable.append(1)
461 self.runq_buildable.append(0)
463 def task_complete(self, task):
465 Mark a task as completed
466 Look at the reverse dependencies and mark any task with
467 completed dependencies as buildable
469 self.runq_complete[task] = 1
470 for revdep in self.runq_revdeps[task]:
471 if self.runq_running[revdep] == 1:
473 if self.runq_buildable[revdep] == 1:
476 for dep in self.runq_depends[revdep]:
477 if self.runq_complete[dep] != 1:
480 self.runq_buildable[revdep] = 1
481 fn = self.taskData.fn_index[self.runq_fnid[revdep]]
482 taskname = self.runq_task[revdep]
483 bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
485 def get_next_task(self):
487 Return the id of the highest priority task that is buildable
489 for task1 in range(len(self.runq_fnid)):
490 task = self.prio_map[task1]
491 if self.runq_running[task] == 1:
493 if self.runq_buildable[task] == 1:
497 def execute_runqueue_internal(self):
499 Run the tasks in a queue prepared by prepare_runqueue
502 bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
504 self.execute_runqueue_initVars()
506 if len(self.runq_fnid) == 0:
510 def sigint_handler(signum, frame):
511 raise KeyboardInterrupt
513 # Find any tasks with current stamps and remove them from the queue
514 for task1 in range(len(self.runq_fnid)):
515 task = self.prio_map[task1]
516 fn = self.taskData.fn_index[self.runq_fnid[task]]
517 taskname = self.runq_task[task]
518 if bb.build.stamp_is_current(taskname, self.dataCache, fn):
519 bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
520 self.runq_running[task] = 1
521 self.task_complete(task)
522 self.stats.taskCompleted()
523 self.stats.taskSkipped()
526 task = self.get_next_task()
528 fn = self.taskData.fn_index[self.runq_fnid[task]]
530 taskname = self.runq_task[task]
531 if bb.build.stamp_is_current(taskname, self.dataCache, fn):
532 bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
533 self.runq_running[task] = 1
534 self.task_complete(task)
535 self.stats.taskCompleted()
536 self.stats.taskSkipped()
539 bb.msg.note(1, bb.msg.domain.RunQueue, "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.active_builds + 1, len(self.runq_fnid), task, self.get_user_idstring(task)))
543 bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
545 # Bypass master process' handling
546 self.master_process = False
547 # Stop Ctrl+C being sent to children
548 # signal.signal(signal.SIGINT, signal.SIG_IGN)
549 # Make the child the process group leader
551 newsi = os.open('/dev/null', os.O_RDWR)
552 os.dup2(newsi, sys.stdin.fileno())
553 self.cooker.configuration.cmd = taskname[3:]
555 self.cooker.tryBuild(fn, False)
556 except bb.build.EventException:
557 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
560 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
563 self.build_pids[pid] = task
564 self.runq_running[task] = 1
565 self.active_builds = self.active_builds + 1
566 if self.active_builds < self.number_tasks:
568 if self.active_builds > 0:
569 result = os.waitpid(-1, 0)
570 self.active_builds = self.active_builds - 1
571 task = self.build_pids[result[0]]
573 del self.build_pids[result[0]]
574 bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
575 self.failed_fnids.append(self.runq_fnid[task])
576 self.stats.taskFailed()
578 self.task_complete(task)
579 self.stats.taskCompleted()
580 del self.build_pids[result[0]]
584 def finish_runqueue(self):
586 while self.active_builds > 0:
587 bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.active_builds)
589 for k, v in self.build_pids.iteritems():
590 bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k))
591 tasknum = tasknum + 1
592 result = os.waitpid(-1, 0)
593 task = self.build_pids[result[0]]
595 bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
596 self.failed_fnids.append(self.runq_fnid[task])
597 self.stats.taskFailed()
598 del self.build_pids[result[0]]
599 self.active_builds = self.active_builds - 1
600 bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
601 return self.failed_fnids
602 except KeyboardInterrupt:
603 bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.active_builds)
604 for k, v in self.build_pids.iteritems():
606 os.kill(-k, signal.SIGINT)
612 for task in range(len(self.runq_fnid)):
613 if self.runq_buildable[task] == 0:
614 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
615 if self.runq_running[task] == 0:
616 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
617 if self.runq_complete[task] == 0:
618 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
620 bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
622 return self.failed_fnids
624 def dump_data(self, taskQueue):
626 Dump some debug information on the internal data structures
628 bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
629 for task in range(len(self.runq_fnid)):
630 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
631 taskQueue.fn_index[self.runq_fnid[task]],
632 self.runq_task[task],
633 self.runq_weight[task],
634 self.runq_depends[task],
635 self.runq_revdeps[task]))
637 bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
638 for task1 in range(len(self.runq_fnid)):
639 if task1 in self.prio_map:
640 task = self.prio_map[task1]
641 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
642 taskQueue.fn_index[self.runq_fnid[task]],
643 self.runq_task[task],
644 self.runq_weight[task],
645 self.runq_depends[task],
646 self.runq_revdeps[task]))