895092d94460aacffe97df6b09c82d23960ff474
[vuplus_bitbake] / lib / bb / runqueue.py
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8 """
9
10 # Copyright (C) 2006-2007  Richard Purdie
11 #
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
15 #
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20 #
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25 from bb import msg, data, event, mkdirhier, utils
26 from sets import Set 
27 import bb, os, sys
28 import signal
29
class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails

    The single constructor argument is stored as the exception's 'args'
    attribute, replacing whatever the base class put there.
    """
    def __init__(self, x):
        # NOTE(review): x is presumably the list of failed fnids handed over
        # by RunQueue.execute_runqueue - confirm against other callers.
        self.args = x
34
35
class RunQueueStats:
    """
    Running totals of how the tasks handled by the associated runQueue
    ended up: completed, skipped or failed.
    """
    def __init__(self):
        # All three counters start from zero
        self.completed = self.skipped = self.failed = 0

    def taskCompleted(self):
        """Count one more completed task"""
        self.completed += 1

    def taskSkipped(self):
        """Count one more skipped task"""
        self.skipped += 1

    def taskFailed(self):
        """Count one more failed task"""
        self.failed += 1
53
class RunQueueScheduler:
    """
    Control the order tasks are scheduled in.
    """
    def __init__(self, runqueue):
        """
        The default scheduler just returns the first buildable task (the
        priority map is simply the task numbers in ascending order)
        """
        self.rq = runqueue
        self.prio_map = list(range(len(self.rq.runq_fnid)))

    def next(self):
        """
        Walk the priority map from the front and return the id of the first
        task which is buildable and not already running. Returns None when
        no such task exists.
        """
        rq = self.rq
        for position in range(len(rq.runq_fnid)):
            candidate = self.prio_map[position]
            if rq.runq_running[candidate] != 1 and rq.runq_buildable[candidate] == 1:
                return candidate
        return None
79
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    def __init__(self, runqueue):
        """
        The priority map is sorted by task weight, heaviest first. Tasks of
        equal weight are ordered by descending task id.
        """
        self.rq = runqueue

        # Pair each weight with its task id and sort the pairs once. This
        # gives the same order as repeatedly searching the weight list for
        # each sorted value, without the quadratic cost and without needing
        # a sentinel value to mark weights that were already consumed.
        weighted = [(weight, task) for task, weight in enumerate(self.rq.runq_weight)]
        weighted.sort()
        weighted.reverse()

        self.prio_map = [task for (weight, task) in weighted]
104
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so once a given
    .bb file starts to build, it's completed as quickly as possible. This works
    well where disk space is at a premium and classes like OE's rm_work are in
    force.
    """
    def __init__(self, runqueue):
        RunQueueSchedulerSpeed.__init__(self, runqueue)

        #FIXME - whilst this groups all fnids together it does not reorder the
        #fnid groups optimally.

        # Collect the tasks of each fnid in a single pass. fnid_order records
        # the order in which fnids are first met in the weight sorted map and
        # each group keeps its tasks in their weight sorted order.
        fnid_order = []
        tasks_by_fnid = {}
        for task in self.prio_map:
            fnid = self.rq.runq_fnid[task]
            if fnid not in tasks_by_fnid:
                tasks_by_fnid[fnid] = []
                fnid_order.append(fnid)
            tasks_by_fnid[fnid].append(task)

        # Emit the groups back to back so all tasks of a file are adjacent
        self.prio_map = []
        for fnid in fnid_order:
            self.prio_map.extend(tasks_by_fnid[fnid])
135
136 class RunQueue:
137     """
138     BitBake Run Queue implementation
139     """
140     def __init__(self, cooker, cfgData, dataCache, taskData, targets):
141         self.reset_runqueue()
142         self.cooker = cooker
143         self.dataCache = dataCache
144         self.taskData = taskData
145         self.targets = targets
146
147         self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData) or 1)
148         self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData) or "").split()
149
150     def reset_runqueue(self):
151
152         self.runq_fnid = []
153         self.runq_task = []
154         self.runq_depends = []
155         self.runq_revdeps = []
156
157     def get_user_idstring(self, task):
158         fn = self.taskData.fn_index[self.runq_fnid[task]]
159         taskname = self.runq_task[task]
160         return "%s, %s" % (fn, taskname)
161
162     def circular_depchains_handler(self, tasks):
163         """
164         Some tasks aren't buildable, likely due to circular dependency issues.
165         Identify the circular dependencies and print them in a user readable format.
166         """
167         from copy import deepcopy
168
169         valid_chains = []
170         explored_deps = {}
171         msgs = []
172
173         def chain_reorder(chain):
174             """
175             Reorder a dependency chain so the lowest task id is first
176             """
177             lowest = 0
178             new_chain = []
179             for entry in range(len(chain)):
180                 if chain[entry] < chain[lowest]:
181                     lowest = entry
182             new_chain.extend(chain[lowest:])
183             new_chain.extend(chain[:lowest])
184             return new_chain
185
186         def chain_compare_equal(chain1, chain2):
187             """
188             Compare two dependency chains and see if they're the same
189             """
190             if len(chain1) != len(chain2):
191                 return False
192             for index in range(len(chain1)):
193                 if chain1[index] != chain2[index]:
194                     return False
195             return True
196             
197         def chain_array_contains(chain, chain_array):
198             """
199             Return True if chain_array contains chain
200             """
201             for ch in chain_array:
202                 if chain_compare_equal(ch, chain):
203                     return True
204             return False
205
206         def find_chains(taskid, prev_chain):
207             prev_chain.append(taskid)
208             total_deps = []
209             total_deps.extend(self.runq_revdeps[taskid])
210             for revdep in self.runq_revdeps[taskid]:
211                 if revdep in prev_chain:
212                     idx = prev_chain.index(revdep)
213                     # To prevent duplicates, reorder the chain to start with the lowest taskid
214                     # and search through an array of those we've already printed
215                     chain = prev_chain[idx:]
216                     new_chain = chain_reorder(chain)
217                     if not chain_array_contains(new_chain, valid_chains):
218                         valid_chains.append(new_chain)
219                         msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
220                         for dep in new_chain:
221                             msgs.append("  Task %s (%s) (depends: %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends[dep]))
222                         msgs.append("\n")
223                     if len(valid_chains) > 10:
224                         msgs.append("Aborted dependency loops search after 10 matches.\n")
225                         return msgs
226                     continue
227                 scan = False
228                 if revdep not in explored_deps:
229                     scan = True
230                 elif revdep in explored_deps[revdep]:
231                     scan = True
232                 else:
233                     for dep in prev_chain:
234                         if dep in explored_deps[revdep]:
235                             scan = True
236                 if scan:
237                     find_chains(revdep, deepcopy(prev_chain))
238                 for dep in explored_deps[revdep]:
239                     if dep not in total_deps:
240                         total_deps.append(dep)
241
242             explored_deps[taskid] = total_deps
243
244         for task in tasks:
245             find_chains(task, [])
246
247         return msgs
248
249     def calculate_task_weights(self, endpoints):
250         """
251         Calculate a number representing the "weight" of each task. Heavier weighted tasks 
252         have more dependencies and hence should be executed sooner for maximum speed.
253
254         This function also sanity checks the task list finding tasks that its not
255         possible to execute due to circular dependencies.
256         """
257
258         numTasks = len(self.runq_fnid)
259         weight = []
260         deps_left = []
261         task_done = []
262
263         for listid in range(numTasks):
264             task_done.append(False)
265             weight.append(0)
266             deps_left.append(len(self.runq_revdeps[listid]))
267
268         for listid in endpoints:
269             weight[listid] = 1
270             task_done[listid] = True
271
272         while 1:
273             next_points = []
274             for listid in endpoints:
275                 for revdep in self.runq_depends[listid]:
276                     weight[revdep] = weight[revdep] + weight[listid]
277                     deps_left[revdep] = deps_left[revdep] - 1
278                     if deps_left[revdep] == 0:
279                         next_points.append(revdep)
280                         task_done[revdep] = True
281             endpoints = next_points
282             if len(next_points) == 0:
283                 break      
284
285         # Circular dependency sanity check
286         problem_tasks = []
287         for task in range(numTasks):
288             if task_done[task] is False or deps_left[task] != 0:
289                 problem_tasks.append(task)
290                 bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s) is not buildable\n" % (task, self.get_user_idstring(task)))
291                 bb.msg.debug(2, bb.msg.domain.RunQueue, "(Complete marker was %s and the remaining dependency count was %s)\n\n" % (task_done[task], deps_left[task]))
292
293         if problem_tasks:
294             message = "Unbuildable tasks were found.\n"
295             message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
296             message = message + "Identifying dependency loops (this may take a short while)...\n"
297             bb.msg.error(bb.msg.domain.RunQueue, message)
298
299             msgs = self.circular_depchains_handler(problem_tasks)
300
301             message = "\n"
302             for msg in msgs:
303                 message = message + msg
304             bb.msg.fatal(bb.msg.domain.RunQueue, message)
305
306         return weight
307
308     def prepare_runqueue(self):
309         """
310         Turn a set of taskData into a RunQueue and compute data needed 
311         to optimise the execution order.
312         """
313
314         depends = []
315         runq_build = []
316
317         taskData = self.taskData
318
319         if len(taskData.tasks_name) == 0:
320             # Nothing to do
321             return
322
323         bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing runqueue")
324
325         # Step A - Work out a list of tasks to run
326         #
327         # Taskdata gives us a list of possible providers for a every target 
328         # ordered by priority (build_targets, run_targets). It also gives
329         # information on each of those providers.
330         #
331         # To create the actual list of tasks to execute we fix the list of 
332         # providers and then resolve the dependencies into task IDs. This 
333         # process is repeated for each type of dependency (tdepends, deptask, 
334         # rdeptast, recrdeptask, idepends).
335
336         for task in range(len(taskData.tasks_name)):
337             fnid = taskData.tasks_fnid[task]
338             fn = taskData.fn_index[fnid]
339             task_deps = self.dataCache.task_deps[fn]
340
341             if fnid not in taskData.failed_fnids:
342
343                 # Resolve task internal dependencies 
344                 #
345                 # e.g. addtask before X after Y
346                 depends = taskData.tasks_tdepends[task]
347
348                 # Resolve 'deptask' dependencies 
349                 #
350                 # e.g. do_sometask[deptask] = "do_someothertask"
351                 # (makes sure sometask runs after someothertask of all DEPENDS)
352                 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
353                     tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
354                     for depid in taskData.depids[fnid]:
355                         # Won't be in build_targets if ASSUME_PROVIDED
356                         if depid in taskData.build_targets:
357                             depdata = taskData.build_targets[depid][0]
358                             if depdata is not None:
359                                 dep = taskData.fn_index[depdata]
360                                 for taskname in tasknames:
361                                     depends.append(taskData.gettask_id(dep, taskname))
362
363                 # Resolve 'rdeptask' dependencies 
364                 #
365                 # e.g. do_sometask[rdeptask] = "do_someothertask"
366                 # (makes sure sometask runs after someothertask of all RDEPENDS)
367                 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
368                     taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
369                     for depid in taskData.rdepids[fnid]:
370                         if depid in taskData.run_targets:
371                             depdata = taskData.run_targets[depid][0]
372                             if depdata is not None:
373                                 dep = taskData.fn_index[depdata]
374                                 depends.append(taskData.gettask_id(dep, taskname))
375
376                 # Resolve inter-task dependencies 
377                 #
378                 # e.g. do_sometask[depends] = "targetname:do_someothertask"
379                 # (makes sure sometask runs after targetname's someothertask)
380                 idepends = taskData.tasks_idepends[task]
381                 for idepend in idepends:
382                     depid = int(idepend.split(":")[0])
383                     if depid in taskData.build_targets:
384                         # Won't be in build_targets if ASSUME_PROVIDED
385                         depdata = taskData.build_targets[depid][0]
386                         if depdata is not None:
387                             dep = taskData.fn_index[depdata]
388                             depends.append(taskData.gettask_id(dep, idepend.split(":")[1]))
389
390                 def add_recursive_build(depid, depfnid):
391                     """
392                     Add build depends of depid to depends
393                     (if we've not see it before)
394                     (calls itself recursively)
395                     """
396                     if str(depid) in dep_seen:
397                         return
398                     dep_seen.append(depid)
399                     if depid in taskData.build_targets:
400                         depdata = taskData.build_targets[depid][0]
401                         if depdata is not None:
402                             dep = taskData.fn_index[depdata]
403                             idepends = []
404                             # Need to avoid creating new tasks here
405                             taskid = taskData.gettask_id(dep, taskname, False)
406                             if taskid is not None:
407                                 depends.append(taskid)
408                                 fnid = taskData.tasks_fnid[taskid]
409                                 idepends = taskData.tasks_idepends[taskid]
410                                 #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
411                             else:
412                                 fnid = taskData.getfn_id(dep)
413                             for nextdepid in taskData.depids[fnid]:
414                                 if nextdepid not in dep_seen:
415                                     add_recursive_build(nextdepid, fnid)
416                             for nextdepid in taskData.rdepids[fnid]:
417                                 if nextdepid not in rdep_seen:
418                                     add_recursive_run(nextdepid, fnid)
419                             for idepend in idepends:
420                                 nextdepid = int(idepend.split(":")[0])
421                                 if nextdepid not in dep_seen:
422                                     add_recursive_build(nextdepid, fnid)
423
424                 def add_recursive_run(rdepid, depfnid):
425                     """
426                     Add runtime depends of rdepid to depends
427                     (if we've not see it before)
428                     (calls itself recursively)
429                     """
430                     if str(rdepid) in rdep_seen:
431                         return
432                     rdep_seen.append(rdepid)
433                     if rdepid in taskData.run_targets:
434                         depdata = taskData.run_targets[rdepid][0]
435                         if depdata is not None:
436                             dep = taskData.fn_index[depdata]
437                             idepends = []
438                             # Need to avoid creating new tasks here
439                             taskid = taskData.gettask_id(dep, taskname, False)
440                             if taskid is not None:
441                                 depends.append(taskid)
442                                 fnid = taskData.tasks_fnid[taskid]
443                                 idepends = taskData.tasks_idepends[taskid]
444                                 #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
445                             else:
446                                 fnid = taskData.getfn_id(dep)
447                             for nextdepid in taskData.depids[fnid]:
448                                 if nextdepid not in dep_seen:
449                                     add_recursive_build(nextdepid, fnid)
450                             for nextdepid in taskData.rdepids[fnid]:
451                                 if nextdepid not in rdep_seen:
452                                     add_recursive_run(nextdepid, fnid)
453                             for idepend in idepends:
454                                 nextdepid = int(idepend.split(":")[0])
455                                 if nextdepid not in dep_seen:
456                                     add_recursive_build(nextdepid, fnid)
457
458                 # Resolve recursive 'recrdeptask' dependencies 
459                 #
460                 # e.g. do_sometask[recrdeptask] = "do_someothertask"
461                 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
462                 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
463                     for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
464                         dep_seen = []
465                         rdep_seen = []
466                         idep_seen = []
467                         for depid in taskData.depids[fnid]:
468                             add_recursive_build(depid, fnid)
469                         for rdepid in taskData.rdepids[fnid]:
470                             add_recursive_run(rdepid, fnid)
471                         for idepend in idepends:
472                             depid = int(idepend.split(":")[0])
473                             add_recursive_build(depid, fnid)
474
475                 # Rmove all self references
476                 if task in depends:
477                     newdep = []
478                     bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
479                     for dep in depends:
480                        if task != dep:
481                           newdep.append(dep)
482                     depends = newdep
483
484
485             self.runq_fnid.append(taskData.tasks_fnid[task])
486             self.runq_task.append(taskData.tasks_name[task])
487             self.runq_depends.append(Set(depends))
488             self.runq_revdeps.append(Set())
489
490             runq_build.append(0)
491
492         # Step B - Mark all active tasks
493         #
494         # Start with the tasks we were asked to run and mark all dependencies
495         # as active too. If the task is to be 'forced', clear its stamp. Once
496         # all active tasks are marked, prune the ones we don't need.
497
498         bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")
499
500         def mark_active(listid, depth):
501             """
502             Mark an item as active along with its depends
503             (calls itself recursively)
504             """
505
506             if runq_build[listid] == 1:
507                 return
508
509             runq_build[listid] = 1
510
511             depends = self.runq_depends[listid]
512             for depend in depends:
513                 mark_active(depend, depth+1)
514
515         for target in self.targets:
516             targetid = taskData.getbuild_id(target[0])
517
518             if targetid not in taskData.build_targets:
519                 continue
520
521             if targetid in taskData.failed_deps:
522                 continue
523
524             fnid = taskData.build_targets[targetid][0]
525
526             # Remove stamps for targets if force mode active
527             if self.cooker.configuration.force:
528                 fn = taskData.fn_index[fnid]
529                 bb.msg.note(2, bb.msg.domain.RunQueue, "Remove stamp %s, %s" % (target[1], fn))
530                 bb.build.del_stamp(target[1], self.dataCache, fn)
531
532             if fnid in taskData.failed_fnids:
533                 continue
534
535             listid = taskData.tasks_lookup[fnid][target[1]]
536
537             mark_active(listid, 1)
538
539         # Step C - Prune all inactive tasks
540         #
541         # Once all active tasks are marked, prune the ones we don't need.
542
543         maps = []
544         delcount = 0
545         for listid in range(len(self.runq_fnid)):
546             if runq_build[listid-delcount] == 1:
547                 maps.append(listid-delcount)
548             else:
549                 del self.runq_fnid[listid-delcount]
550                 del self.runq_task[listid-delcount]
551                 del self.runq_depends[listid-delcount]
552                 del runq_build[listid-delcount]
553                 del self.runq_revdeps[listid-delcount]
554                 delcount = delcount + 1
555                 maps.append(-1)
556
557         #
558         # Step D - Sanity checks and computation
559         #
560
561         # Check to make sure we still have tasks to run
562         if len(self.runq_fnid) == 0:
563             if not taskData.abort:
564                 bb.msg.note(1, bb.msg.domain.RunQueue, "All possible tasks have been run but build incomplete (--continue mode). See errors above for incomplete tasks.")
565                 return
566             bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
567
568         bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))
569
570         # Remap the dependencies to account for the deleted tasks
571         # Check we didn't delete a task we depend on
572         for listid in range(len(self.runq_fnid)):
573             newdeps = []
574             origdeps = self.runq_depends[listid]
575             for origdep in origdeps:
576                 if maps[origdep] == -1:
577                     bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
578                 newdeps.append(maps[origdep])
579             self.runq_depends[listid] = Set(newdeps)
580
581         bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")
582
583         # Generate a list of reverse dependencies to ease future calculations
584         for listid in range(len(self.runq_fnid)):
585             for dep in self.runq_depends[listid]:
586                 self.runq_revdeps[dep].add(listid)
587
588         # Identify tasks at the end of dependency chains
589         # Error on circular dependency loops (length two)
590         endpoints = []
591         for listid in range(len(self.runq_fnid)):
592             revdeps = self.runq_revdeps[listid]
593             if len(revdeps) == 0:
594                 endpoints.append(listid)
595             for dep in revdeps:
596                 if dep in self.runq_depends[listid]:
597                     #self.dump_data(taskData)
598                     bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
599
600         bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
601
602
603         # Calculate task weights 
604         # Check of higher length circular dependencies
605         self.runq_weight = self.calculate_task_weights(endpoints)
606
607         # Decide what order to execute the tasks in, pick a scheduler
608         # FIXME - Allow user selection
609         #self.sched = RunQueueScheduler(self)
610         self.sched = RunQueueSchedulerSpeed(self)
611         #self.sched = RunQueueSchedulerCompletion(self)
612
613         # Sanity Check - Check for multiple tasks building the same provider
614         prov_list = {}
615         seen_fn = []
616         for task in range(len(self.runq_fnid)):
617             fn = taskData.fn_index[self.runq_fnid[task]]
618             if fn in seen_fn:
619                 continue
620             seen_fn.append(fn)
621             for prov in self.dataCache.fn_provides[fn]:
622                 if prov not in prov_list:
623                     prov_list[prov] = [fn]
624                 elif fn not in prov_list[prov]: 
625                     prov_list[prov].append(fn)
626         error = False
627         for prov in prov_list:
628             if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
629                 error = True
630                 bb.msg.error(bb.msg.domain.RunQueue, "Multiple files due to be built which all provide %s (%s)" % (prov, " ".join(prov_list[prov])))
631         #if error:
632         #    bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...")
633
634         #self.dump_data(taskData)
635
636     def execute_runqueue(self):
637         """
638         Run the tasks in a queue prepared by prepare_runqueue
639         Upon failure, optionally try to recover the build using any alternate providers
640         (if the abort on failure configuration option isn't set)
641         """
642
643         failures = 0
644         while 1:
645             failed_fnids = []
646             try:
647                 self.execute_runqueue_internal()
648             finally:
649                 if self.master_process:
650                     failed_fnids = self.finish_runqueue()
651             if len(failed_fnids) == 0:
652                 return failures
653             if self.taskData.abort:
654                 raise bb.runqueue.TaskFailure(failed_fnids)
655             for fnid in failed_fnids:
656                 #print "Failure: %s %s %s" % (fnid, self.taskData.fn_index[fnid],  self.runq_task[fnid])
657                 self.taskData.fail_fnid(fnid)
658                 failures = failures + 1
659             self.reset_runqueue()
660             self.prepare_runqueue()
661
662     def execute_runqueue_initVars(self):
663
664         self.stats = RunQueueStats()
665
666         self.active_builds = 0
667         self.runq_buildable = []
668         self.runq_running = []
669         self.runq_complete = []
670         self.build_pids = {}
671         self.failed_fnids = []
672         self.master_process = True
673
674         # Mark initial buildable tasks
675         for task in range(len(self.runq_fnid)):
676             self.runq_running.append(0)
677             self.runq_complete.append(0)
678             if len(self.runq_depends[task]) == 0:
679                 self.runq_buildable.append(1)
680             else:
681                 self.runq_buildable.append(0)
682
683     def task_complete(self, task):
684         """
685         Mark a task as completed
686         Look at the reverse dependencies and mark any task with 
687         completed dependencies as buildable
688         """
689         self.runq_complete[task] = 1
690         for revdep in self.runq_revdeps[task]:
691             if self.runq_running[revdep] == 1:
692                 continue
693             if self.runq_buildable[revdep] == 1:
694                 continue
695             alldeps = 1
696             for dep in self.runq_depends[revdep]:
697                 if self.runq_complete[dep] != 1:
698                     alldeps = 0
699             if alldeps == 1:
700                 self.runq_buildable[revdep] = 1
701                 fn = self.taskData.fn_index[self.runq_fnid[revdep]]
702                 taskname = self.runq_task[revdep]
703                 bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
704
705     def execute_runqueue_internal(self):
706         """
707         Run the tasks in a queue prepared by prepare_runqueue
708         """
709
710         bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
711
712         self.execute_runqueue_initVars()
713
714         if len(self.runq_fnid) == 0:
715             # nothing to do
716             return []
717
718         def sigint_handler(signum, frame):
719             raise KeyboardInterrupt
720
721         # RP - this code allows tasks to run out of the correct order - disabled, FIXME
722         # Find any tasks with current stamps and remove them from the queue
723         #for task1 in range(len(self.runq_fnid)):
724         #    task = self.prio_map[task1]
725         #    fn = self.taskData.fn_index[self.runq_fnid[task]]
726         #    taskname = self.runq_task[task]
727         #    if bb.build.stamp_is_current(taskname, self.dataCache, fn):
728         #        bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
729         #        self.runq_running[task] = 1
730         #        self.task_complete(task)
731         #        self.stats.taskCompleted()
732         #        self.stats.taskSkipped()
733
734         while True:
735             task = self.sched.next()
736             if task is not None:
737                 fn = self.taskData.fn_index[self.runq_fnid[task]]
738
739                 taskname = self.runq_task[task]
740                 if bb.build.stamp_is_current(taskname, self.dataCache, fn):
741                     bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
742                     self.runq_running[task] = 1
743                     self.task_complete(task)
744                     self.stats.taskCompleted()
745                     self.stats.taskSkipped()
746                     continue
747
748                 bb.msg.note(1, bb.msg.domain.RunQueue, "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.active_builds + 1, len(self.runq_fnid), task, self.get_user_idstring(task)))
749                 try: 
750                     pid = os.fork() 
751                 except OSError, e: 
752                     bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
753                 if pid == 0:
754                     # Bypass master process' handling
755                     self.master_process = False
756                     # Stop Ctrl+C being sent to children
757                     # signal.signal(signal.SIGINT, signal.SIG_IGN)
758                     # Make the child the process group leader
759                     os.setpgid(0, 0)
760                     newsi = os.open('/dev/null', os.O_RDWR)
761                     os.dup2(newsi, sys.stdin.fileno())
762                     self.cooker.configuration.cmd = taskname[3:]
763                     try: 
764                         self.cooker.tryBuild(fn, False)
765                     except bb.build.EventException:
766                         bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
767                         sys.exit(1)
768                     except:
769                         bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
770                         raise
771                     sys.exit(0)
772                 self.build_pids[pid] = task
773                 self.runq_running[task] = 1
774                 self.active_builds = self.active_builds + 1
775                 if self.active_builds < self.number_tasks:
776                     continue
777             if self.active_builds > 0:
778                 result = os.waitpid(-1, 0)
779                 self.active_builds = self.active_builds - 1
780                 task = self.build_pids[result[0]]
781                 if result[1] != 0:
782                     del self.build_pids[result[0]]
783                     bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
784                     self.failed_fnids.append(self.runq_fnid[task])
785                     self.stats.taskFailed()
786                     break
787                 self.task_complete(task)
788                 self.stats.taskCompleted()
789                 del self.build_pids[result[0]]
790                 continue
791             return
792
793     def finish_runqueue(self):
794         try:
795             while self.active_builds > 0:
796                 bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.active_builds)
797                 tasknum = 1
798                 for k, v in self.build_pids.iteritems():
799                      bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k))
800                      tasknum = tasknum + 1
801                 result = os.waitpid(-1, 0)
802                 task = self.build_pids[result[0]]
803                 if result[1] != 0:
804                      bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
805                      self.failed_fnids.append(self.runq_fnid[task])
806                      self.stats.taskFailed()
807                 del self.build_pids[result[0]]
808                 self.active_builds = self.active_builds - 1
809             bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
810             return self.failed_fnids
811         except KeyboardInterrupt:
812             bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.active_builds)
813             for k, v in self.build_pids.iteritems():
814                  try:
815                      os.kill(-k, signal.SIGINT)
816                  except:
817                      pass
818             raise
819
820         # Sanity Checks
821         for task in range(len(self.runq_fnid)):
822             if self.runq_buildable[task] == 0:
823                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
824             if self.runq_running[task] == 0:
825                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
826             if self.runq_complete[task] == 0:
827                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
828
829         bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
830
831         return self.failed_fnids
832
833     def dump_data(self, taskQueue):
834         """
835         Dump some debug information on the internal data structures
836         """
837         bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
838         for task in range(len(self.runq_fnid)):
839                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
840                         taskQueue.fn_index[self.runq_fnid[task]], 
841                         self.runq_task[task], 
842                         self.runq_weight[task], 
843                         self.runq_depends[task], 
844                         self.runq_revdeps[task]))
845
846         bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
847         for task1 in range(len(self.runq_fnid)):
848             if task1 in self.prio_map:
849                 task = self.prio_map[task1]
850                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
851                         taskQueue.fn_index[self.runq_fnid[task]], 
852                         self.runq_task[task], 
853                         self.runq_weight[task], 
854                         self.runq_depends[task], 
855                         self.runq_revdeps[task]))