62bd10ae24dd1397759395fe471b3717089757d1
[vuplus_bitbake] / lib / bb / runqueue.py
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8 """
9
10 # Copyright (C) 2006-2007  Richard Purdie
11 #
12 # This program is free software; you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License version 2 as
14 # published by the Free Software Foundation.
15 #
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20 #
21 # You should have received a copy of the GNU General Public License along
22 # with this program; if not, write to the Free Software Foundation, Inc.,
23 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25 from bb import msg, data, event, mkdirhier, utils
26 from sets import Set 
27 import bb, os, sys
28 import signal
29 import stat
30
31 class TaskFailure(Exception):
32     """Exception raised when a task in a runqueue fails"""
33     def __init__(self, x): 
34         self.args = x
35
36
37 class RunQueueStats:
38     """
39     Holds statistics on the tasks handled by the associated runQueue
40     """
41     def __init__(self):
42         self.completed = 0
43         self.skipped = 0
44         self.failed = 0
45
46     def taskFailed(self):
47         self.failed = self.failed + 1
48
49     def taskCompleted(self, number = 1):
50         self.completed = self.completed + number
51
52     def taskSkipped(self, number = 1):
53         self.skipped = self.skipped + number
54
55 class RunQueueScheduler:
56     """
57     Control the order tasks are scheduled in.
58     """
59     def __init__(self, runqueue):
60         """
61         The default scheduler just returns the first buildable task (the 
62         priority map is sorted by task numer)
63         """
64         self.rq = runqueue
65         numTasks = len(self.rq.runq_fnid)
66
67         self.prio_map = []
68         self.prio_map.extend(range(numTasks))
69
70     def next(self):
71         """
72         Return the id of the first task we find that is buildable
73         """
74         for task1 in range(len(self.rq.runq_fnid)):
75             task = self.prio_map[task1]
76             if self.rq.runq_running[task] == 1:
77                 continue
78             if self.rq.runq_buildable[task] == 1:
79                 return task
80
81 class RunQueueSchedulerSpeed(RunQueueScheduler):
82     """
83     A scheduler optimised for speed. The priority map is sorted by task weight,
84     heavier weighted tasks (tasks needed by the most other tasks) are run first.
85     """
86     def __init__(self, runqueue):
87         """
88         The priority map is sorted by task weight.
89         """
90         from copy import deepcopy
91
92         self.rq = runqueue
93
94         sortweight = deepcopy(self.rq.runq_weight)
95         sortweight.sort()
96         copyweight = deepcopy(self.rq.runq_weight)
97         self.prio_map = []
98
99         for weight in sortweight:
100             idx = copyweight.index(weight)
101             self.prio_map.append(idx)
102             copyweight[idx] = -1
103
104         self.prio_map.reverse()
105
106 class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
107     """
108     A scheduler optimised to complete .bb files are quickly as possible. The 
109     priority map is sorted by task weight, but then reordered so once a given 
110     .bb file starts to build, its completed as quickly as possible. This works
111     well where disk space is at a premium and classes like OE's rm_work are in 
112     force.
113     """
114     def __init__(self, runqueue):
115         RunQueueSchedulerSpeed.__init__(self, runqueue)
116         from copy import deepcopy
117
118         #FIXME - whilst this groups all fnids together it does not reorder the
119         #fnid groups optimally.
120  
121         basemap = deepcopy(self.prio_map)
122         self.prio_map = []
123         while (len(basemap) > 0):
124             entry = basemap.pop(0)
125             self.prio_map.append(entry)
126             fnid = self.rq.runq_fnid[entry]
127             todel = []
128             for entry in basemap:
129                 entry_fnid = self.rq.runq_fnid[entry]
130                 if entry_fnid == fnid:
131                     todel.append(basemap.index(entry))
132                     self.prio_map.append(entry)
133             todel.reverse()
134             for idx in todel:
135                 del basemap[idx]
136
137 class RunQueue:
138     """
139     BitBake Run Queue implementation
140     """
141     def __init__(self, cooker, cfgData, dataCache, taskData, targets):
142         self.reset_runqueue()
143         self.cooker = cooker
144         self.dataCache = dataCache
145         self.taskData = taskData
146         self.targets = targets
147
148         self.cfgdata = cfgData
149         self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1)
150         self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
151         self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed"
152         self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile"
153         self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
154
155     def reset_runqueue(self):
156
157         self.runq_fnid = []
158         self.runq_task = []
159         self.runq_depends = []
160         self.runq_revdeps = []
161
162     def get_user_idstring(self, task):
163         fn = self.taskData.fn_index[self.runq_fnid[task]]
164         taskname = self.runq_task[task]
165         return "%s, %s" % (fn, taskname)
166
167     def get_task_id(self, fnid, taskname):
168         for listid in range(len(self.runq_fnid)):
169             if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
170                 return listid
171         return None
172
173     def circular_depchains_handler(self, tasks):
174         """
175         Some tasks aren't buildable, likely due to circular dependency issues.
176         Identify the circular dependencies and print them in a user readable format.
177         """
178         from copy import deepcopy
179
180         valid_chains = []
181         explored_deps = {}
182         msgs = []
183
184         def chain_reorder(chain):
185             """
186             Reorder a dependency chain so the lowest task id is first
187             """
188             lowest = 0
189             new_chain = []
190             for entry in range(len(chain)):
191                 if chain[entry] < chain[lowest]:
192                     lowest = entry
193             new_chain.extend(chain[lowest:])
194             new_chain.extend(chain[:lowest])
195             return new_chain
196
197         def chain_compare_equal(chain1, chain2):
198             """
199             Compare two dependency chains and see if they're the same
200             """
201             if len(chain1) != len(chain2):
202                 return False
203             for index in range(len(chain1)):
204                 if chain1[index] != chain2[index]:
205                     return False
206             return True
207             
208         def chain_array_contains(chain, chain_array):
209             """
210             Return True if chain_array contains chain
211             """
212             for ch in chain_array:
213                 if chain_compare_equal(ch, chain):
214                     return True
215             return False
216
217         def find_chains(taskid, prev_chain):
218             prev_chain.append(taskid)
219             total_deps = []
220             total_deps.extend(self.runq_revdeps[taskid])
221             for revdep in self.runq_revdeps[taskid]:
222                 if revdep in prev_chain:
223                     idx = prev_chain.index(revdep)
224                     # To prevent duplicates, reorder the chain to start with the lowest taskid
225                     # and search through an array of those we've already printed
226                     chain = prev_chain[idx:]
227                     new_chain = chain_reorder(chain)
228                     if not chain_array_contains(new_chain, valid_chains):
229                         valid_chains.append(new_chain)
230                         msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
231                         for dep in new_chain:
232                             msgs.append("  Task %s (%s) (depends: %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends[dep]))
233                         msgs.append("\n")
234                     if len(valid_chains) > 10:
235                         msgs.append("Aborted dependency loops search after 10 matches.\n")
236                         return msgs
237                     continue
238                 scan = False
239                 if revdep not in explored_deps:
240                     scan = True
241                 elif revdep in explored_deps[revdep]:
242                     scan = True
243                 else:
244                     for dep in prev_chain:
245                         if dep in explored_deps[revdep]:
246                             scan = True
247                 if scan:
248                     find_chains(revdep, deepcopy(prev_chain))
249                 for dep in explored_deps[revdep]:
250                     if dep not in total_deps:
251                         total_deps.append(dep)
252
253             explored_deps[taskid] = total_deps
254
255         for task in tasks:
256             find_chains(task, [])
257
258         return msgs
259
260     def calculate_task_weights(self, endpoints):
261         """
262         Calculate a number representing the "weight" of each task. Heavier weighted tasks 
263         have more dependencies and hence should be executed sooner for maximum speed.
264
265         This function also sanity checks the task list finding tasks that its not
266         possible to execute due to circular dependencies.
267         """
268
269         numTasks = len(self.runq_fnid)
270         weight = []
271         deps_left = []
272         task_done = []
273
274         for listid in range(numTasks):
275             task_done.append(False)
276             weight.append(0)
277             deps_left.append(len(self.runq_revdeps[listid]))
278
279         for listid in endpoints:
280             weight[listid] = 1
281             task_done[listid] = True
282
283         while 1:
284             next_points = []
285             for listid in endpoints:
286                 for revdep in self.runq_depends[listid]:
287                     weight[revdep] = weight[revdep] + weight[listid]
288                     deps_left[revdep] = deps_left[revdep] - 1
289                     if deps_left[revdep] == 0:
290                         next_points.append(revdep)
291                         task_done[revdep] = True
292             endpoints = next_points
293             if len(next_points) == 0:
294                 break      
295
296         # Circular dependency sanity check
297         problem_tasks = []
298         for task in range(numTasks):
299             if task_done[task] is False or deps_left[task] != 0:
300                 problem_tasks.append(task)
301                 bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s) is not buildable\n" % (task, self.get_user_idstring(task)))
302                 bb.msg.debug(2, bb.msg.domain.RunQueue, "(Complete marker was %s and the remaining dependency count was %s)\n\n" % (task_done[task], deps_left[task]))
303
304         if problem_tasks:
305             message = "Unbuildable tasks were found.\n"
306             message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
307             message = message + "Identifying dependency loops (this may take a short while)...\n"
308             bb.msg.error(bb.msg.domain.RunQueue, message)
309
310             msgs = self.circular_depchains_handler(problem_tasks)
311
312             message = "\n"
313             for msg in msgs:
314                 message = message + msg
315             bb.msg.fatal(bb.msg.domain.RunQueue, message)
316
317         return weight
318
319     def prepare_runqueue(self):
320         """
321         Turn a set of taskData into a RunQueue and compute data needed 
322         to optimise the execution order.
323         """
324
325         depends = []
326         runq_build = []
327         recursive_tdepends = {}
328
329         taskData = self.taskData
330
331         if len(taskData.tasks_name) == 0:
332             # Nothing to do
333             return
334
335         bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing runqueue")
336
337         # Step A - Work out a list of tasks to run
338         #
339         # Taskdata gives us a list of possible providers for a every target 
340         # ordered by priority (build_targets, run_targets). It also gives
341         # information on each of those providers.
342         #
343         # To create the actual list of tasks to execute we fix the list of 
344         # providers and then resolve the dependencies into task IDs. This 
345         # process is repeated for each type of dependency (tdepends, deptask, 
346         # rdeptast, recrdeptask, idepends).
347
348         for task in range(len(taskData.tasks_name)):
349             fnid = taskData.tasks_fnid[task]
350             fn = taskData.fn_index[fnid]
351             task_deps = self.dataCache.task_deps[fn]
352
353             if fnid not in taskData.failed_fnids:
354
355                 # Resolve task internal dependencies 
356                 #
357                 # e.g. addtask before X after Y
358                 depends = taskData.tasks_tdepends[task]
359
360                 # Resolve 'deptask' dependencies 
361                 #
362                 # e.g. do_sometask[deptask] = "do_someothertask"
363                 # (makes sure sometask runs after someothertask of all DEPENDS)
364                 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
365                     tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
366                     for depid in taskData.depids[fnid]:
367                         # Won't be in build_targets if ASSUME_PROVIDED
368                         if depid in taskData.build_targets:
369                             depdata = taskData.build_targets[depid][0]
370                             if depdata is not None:
371                                 dep = taskData.fn_index[depdata]
372                                 for taskname in tasknames:
373                                     depends.append(taskData.gettask_id(dep, taskname))
374
375                 # Resolve 'rdeptask' dependencies 
376                 #
377                 # e.g. do_sometask[rdeptask] = "do_someothertask"
378                 # (makes sure sometask runs after someothertask of all RDEPENDS)
379                 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
380                     taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
381                     for depid in taskData.rdepids[fnid]:
382                         if depid in taskData.run_targets:
383                             depdata = taskData.run_targets[depid][0]
384                             if depdata is not None:
385                                 dep = taskData.fn_index[depdata]
386                                 depends.append(taskData.gettask_id(dep, taskname))
387
388                 # Resolve inter-task dependencies 
389                 #
390                 # e.g. do_sometask[depends] = "targetname:do_someothertask"
391                 # (makes sure sometask runs after targetname's someothertask)
392                 idepends = taskData.tasks_idepends[task]
393                 for (depid, idependtask) in idepends:
394                     if depid in taskData.build_targets:
395                         # Won't be in build_targets if ASSUME_PROVIDED
396                         depdata = taskData.build_targets[depid][0]
397                         if depdata is not None:
398                             dep = taskData.fn_index[depdata]
399                             depends.append(taskData.gettask_id(dep, idependtask))
400
401                 # Create a list of recursive dependent tasks (from tdepends) and cache
402                 def get_recursive_tdepends(task):
403                     if not task:
404                         return []
405                     if task in recursive_tdepends:
406                         return recursive_tdepends[task]
407
408                     fnid = taskData.tasks_fnid[task]
409                     taskids = taskData.gettask_ids(fnid)
410
411                     rectdepends = taskids
412                     nextdeps = taskids
413                     while len(nextdeps) != 0:
414                         newdeps = []
415                         for nextdep in nextdeps:
416                             for tdepend in taskData.tasks_tdepends[nextdep]:
417                                 if tdepend not in rectdepends:
418                                     rectdepends.append(tdepend)
419                                     newdeps.append(tdepend)
420                         nextdeps = newdeps
421                     recursive_tdepends[task] = rectdepends
422                     return rectdepends
423
424                 # Using the list of tdepends for this task create a list of 
425                 # the recursive idepends we have
426                 def get_recursive_idepends(task):
427                     if not task:
428                         return []
429                     rectdepends = get_recursive_tdepends(task)
430
431                     recidepends = []
432                     for tdepend in rectdepends:
433                         for idepend in taskData.tasks_idepends[tdepend]:
434                             recidepends.append(idepend)
435                     return recidepends
436
437                 def add_recursive_build(depid, depfnid):
438                     """
439                     Add build depends of depid to depends
440                     (if we've not see it before)
441                     (calls itself recursively)
442                     """
443                     if str(depid) in dep_seen:
444                         return
445                     dep_seen.append(depid)
446                     if depid in taskData.build_targets:
447                         depdata = taskData.build_targets[depid][0]
448                         if depdata is not None:
449                             dep = taskData.fn_index[depdata]
450                             # Need to avoid creating new tasks here
451                             taskid = taskData.gettask_id(dep, taskname, False)
452                             if taskid is not None:
453                                 depends.append(taskid)
454                                 fnid = taskData.tasks_fnid[taskid]
455                                 #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
456                             else:
457                                 fnid = taskData.getfn_id(dep)
458                             for nextdepid in taskData.depids[fnid]:
459                                 if nextdepid not in dep_seen:
460                                     add_recursive_build(nextdepid, fnid)
461                             for nextdepid in taskData.rdepids[fnid]:
462                                 if nextdepid not in rdep_seen:
463                                     add_recursive_run(nextdepid, fnid)
464                             for (idependid, idependtask) in get_recursive_idepends(taskid):
465                                 if idependid not in dep_seen:
466                                     add_recursive_build(idependid, fnid)
467
468                 def add_recursive_run(rdepid, depfnid):
469                     """
470                     Add runtime depends of rdepid to depends
471                     (if we've not see it before)
472                     (calls itself recursively)
473                     """
474                     if str(rdepid) in rdep_seen:
475                         return
476                     rdep_seen.append(rdepid)
477                     if rdepid in taskData.run_targets:
478                         depdata = taskData.run_targets[rdepid][0]
479                         if depdata is not None:
480                             dep = taskData.fn_index[depdata]
481                             # Need to avoid creating new tasks here
482                             taskid = taskData.gettask_id(dep, taskname, False)
483                             if taskid is not None:
484                                 depends.append(taskid)
485                                 fnid = taskData.tasks_fnid[taskid]
486                                 #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
487                             else:
488                                 fnid = taskData.getfn_id(dep)
489                             for nextdepid in taskData.depids[fnid]:
490                                 if nextdepid not in dep_seen:
491                                     add_recursive_build(nextdepid, fnid)
492                             for nextdepid in taskData.rdepids[fnid]:
493                                 if nextdepid not in rdep_seen:
494                                     add_recursive_run(nextdepid, fnid)
495                             for (idependid, idependtask) in get_recursive_idepends(taskid):
496                                 if idependid not in dep_seen:
497                                     add_recursive_build(idependid, fnid)
498
499                 # Resolve recursive 'recrdeptask' dependencies 
500                 #
501                 # e.g. do_sometask[recrdeptask] = "do_someothertask"
502                 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
503                 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
504                     for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
505                         dep_seen = []
506                         rdep_seen = []
507                         idep_seen = []
508                         for depid in taskData.depids[fnid]:
509                             add_recursive_build(depid, fnid)
510                         for rdepid in taskData.rdepids[fnid]:
511                             add_recursive_run(rdepid, fnid)
512                         deptaskid = taskData.gettask_id(fn, taskname, False)
513                         for (idependid, idependtask) in get_recursive_idepends(deptaskid):
514                             add_recursive_build(idependid, fnid)
515
516                 # Rmove all self references
517                 if task in depends:
518                     newdep = []
519                     bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
520                     for dep in depends:
521                        if task != dep:
522                           newdep.append(dep)
523                     depends = newdep
524
525
526             self.runq_fnid.append(taskData.tasks_fnid[task])
527             self.runq_task.append(taskData.tasks_name[task])
528             self.runq_depends.append(Set(depends))
529             self.runq_revdeps.append(Set())
530
531             runq_build.append(0)
532
533         # Step B - Mark all active tasks
534         #
535         # Start with the tasks we were asked to run and mark all dependencies
536         # as active too. If the task is to be 'forced', clear its stamp. Once
537         # all active tasks are marked, prune the ones we don't need.
538
539         bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")
540
541         def mark_active(listid, depth):
542             """
543             Mark an item as active along with its depends
544             (calls itself recursively)
545             """
546
547             if runq_build[listid] == 1:
548                 return
549
550             runq_build[listid] = 1
551
552             depends = self.runq_depends[listid]
553             for depend in depends:
554                 mark_active(depend, depth+1)
555
556         self.target_pairs = []
557         for target in self.targets:
558             targetid = taskData.getbuild_id(target[0])
559
560             if targetid not in taskData.build_targets:
561                 continue
562
563             if targetid in taskData.failed_deps:
564                 continue
565
566             fnid = taskData.build_targets[targetid][0]
567             fn = taskData.fn_index[fnid]
568             self.target_pairs.append((fn, target[1]))
569
570             # Remove stamps for targets if force mode active
571             if self.cooker.configuration.force:
572                 bb.msg.note(2, bb.msg.domain.RunQueue, "Remove stamp %s, %s" % (target[1], fn))
573                 bb.build.del_stamp(target[1], self.dataCache, fn)
574
575             if fnid in taskData.failed_fnids:
576                 continue
577
578             if target[1] not in taskData.tasks_lookup[fnid]:
579                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s does not exist for target %s" % (target[1], target[0]))
580
581             listid = taskData.tasks_lookup[fnid][target[1]]
582
583             mark_active(listid, 1)
584
585         # Step C - Prune all inactive tasks
586         #
587         # Once all active tasks are marked, prune the ones we don't need.
588
589         maps = []
590         delcount = 0
591         for listid in range(len(self.runq_fnid)):
592             if runq_build[listid-delcount] == 1:
593                 maps.append(listid-delcount)
594             else:
595                 del self.runq_fnid[listid-delcount]
596                 del self.runq_task[listid-delcount]
597                 del self.runq_depends[listid-delcount]
598                 del runq_build[listid-delcount]
599                 del self.runq_revdeps[listid-delcount]
600                 delcount = delcount + 1
601                 maps.append(-1)
602
603         #
604         # Step D - Sanity checks and computation
605         #
606
607         # Check to make sure we still have tasks to run
608         if len(self.runq_fnid) == 0:
609             if not taskData.abort:
610                 bb.msg.fatal(bb.msg.domain.RunQueue, "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
611             else:               
612                 bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
613
614         bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))
615
616         # Remap the dependencies to account for the deleted tasks
617         # Check we didn't delete a task we depend on
618         for listid in range(len(self.runq_fnid)):
619             newdeps = []
620             origdeps = self.runq_depends[listid]
621             for origdep in origdeps:
622                 if maps[origdep] == -1:
623                     bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
624                 newdeps.append(maps[origdep])
625             self.runq_depends[listid] = Set(newdeps)
626
627         bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")
628
629         # Generate a list of reverse dependencies to ease future calculations
630         for listid in range(len(self.runq_fnid)):
631             for dep in self.runq_depends[listid]:
632                 self.runq_revdeps[dep].add(listid)
633
634         # Identify tasks at the end of dependency chains
635         # Error on circular dependency loops (length two)
636         endpoints = []
637         for listid in range(len(self.runq_fnid)):
638             revdeps = self.runq_revdeps[listid]
639             if len(revdeps) == 0:
640                 endpoints.append(listid)
641             for dep in revdeps:
642                 if dep in self.runq_depends[listid]:
643                     #self.dump_data(taskData)
644                     bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
645
646         bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
647
648
649         # Calculate task weights 
650         # Check of higher length circular dependencies
651         self.runq_weight = self.calculate_task_weights(endpoints)
652
653         # Decide what order to execute the tasks in, pick a scheduler
654         #self.sched = RunQueueScheduler(self)
655         if self.scheduler == "completion":
656             self.sched = RunQueueSchedulerCompletion(self)
657         else:
658             self.sched = RunQueueSchedulerSpeed(self)
659
660         # Sanity Check - Check for multiple tasks building the same provider
661         prov_list = {}
662         seen_fn = []
663         for task in range(len(self.runq_fnid)):
664             fn = taskData.fn_index[self.runq_fnid[task]]
665             if fn in seen_fn:
666                 continue
667             seen_fn.append(fn)
668             for prov in self.dataCache.fn_provides[fn]:
669                 if prov not in prov_list:
670                     prov_list[prov] = [fn]
671                 elif fn not in prov_list[prov]: 
672                     prov_list[prov].append(fn)
673         error = False
674         for prov in prov_list:
675             if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
676                 error = True
677                 bb.msg.error(bb.msg.domain.RunQueue, "Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should." % (prov, " ".join(prov_list[prov])))
678         #if error:
679         #    bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...")
680
681
682         # Create a whitelist usable by the stamp checks
683         stampfnwhitelist = []
684         for entry in self.stampwhitelist.split():
685             entryid = self.taskData.getbuild_id(entry)
686             if entryid not in self.taskData.build_targets:
687                 continue
688             fnid = self.taskData.build_targets[entryid][0]
689             fn = self.taskData.fn_index[fnid]
690             stampfnwhitelist.append(fn)
691         self.stampfnwhitelist = stampfnwhitelist
692
693         #self.dump_data(taskData)
694
695     def check_stamps(self):
696         unchecked = {}
697         current = []
698         notcurrent = []
699         buildable = []
700
701         if self.stamppolicy == "perfile":
702             fulldeptree = False
703         else:
704             fulldeptree = True
705             stampwhitelist = []
706             if self.stamppolicy == "whitelist":
707                 stampwhitelist = self.self.stampfnwhitelist
708
709         for task in range(len(self.runq_fnid)):
710             unchecked[task] = ""
711             if len(self.runq_depends[task]) == 0:
712                 buildable.append(task)
713
714         def check_buildable(self, task, buildable):
715              for revdep in self.runq_revdeps[task]:
716                 alldeps = 1
717                 for dep in self.runq_depends[revdep]:
718                     if dep in unchecked:
719                         alldeps = 0
720                 if alldeps == 1:
721                     if revdep in unchecked:
722                         buildable.append(revdep)
723
724         for task in range(len(self.runq_fnid)):
725             if task not in unchecked:
726                 continue
727             fn = self.taskData.fn_index[self.runq_fnid[task]]
728             taskname = self.runq_task[task]
729             stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
730             # If the stamp is missing its not current
731             if not os.access(stampfile, os.F_OK):
732                 del unchecked[task]
733                 notcurrent.append(task)
734                 check_buildable(self, task, buildable)
735                 continue
736             # If its a 'nostamp' task, it's not current
737             taskdep = self.dataCache.task_deps[fn]
738             if 'nostamp' in taskdep and task in taskdep['nostamp']:
739                 del unchecked[task]
740                 notcurrent.append(task)
741                 check_buildable(self, task, buildable)
742                 continue
743
744         while (len(buildable) > 0):
745             nextbuildable = []
746             for task in buildable:
747                 if task in unchecked:
748                     fn = self.taskData.fn_index[self.runq_fnid[task]]
749                     taskname = self.runq_task[task]
750                     stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
751                     iscurrent = True
752
753                     t1 = os.stat(stampfile)[stat.ST_MTIME]
754                     for dep in self.runq_depends[task]:
755                         if iscurrent:
756                             fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
757                             taskname2 = self.runq_task[dep]
758                             stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
759                             if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
760                                 if dep in notcurrent:
761                                     iscurrent = False
762                                 else:
763                                     t2 = os.stat(stampfile2)[stat.ST_MTIME]
764                                     if t1 < t2:
765                                         iscurrent = False
766                     del unchecked[task]
767                     if iscurrent:
768                         current.append(task)
769                     else:
770                         notcurrent.append(task)
771
772                 check_buildable(self, task, nextbuildable)
773
774             buildable = nextbuildable
775
776         #for task in range(len(self.runq_fnid)):
777         #    fn = self.taskData.fn_index[self.runq_fnid[task]]
778         #    taskname = self.runq_task[task]
779         #    print "%s %s.%s" % (task, taskname, fn)
780
781         #print "Unchecked: %s" % unchecked
782         #print "Current: %s" % current
783         #print "Not current: %s" % notcurrent
784
785         if len(unchecked) > 0:
786             bb.fatal("check_stamps fatal internal error")
787         return current
788
789     def check_stamp_task(self, task):
790
791         if self.stamppolicy == "perfile":
792             fulldeptree = False
793         else:
794             fulldeptree = True
795             stampwhitelist = []
796             if self.stamppolicy == "whitelist":
797                 stampwhitelist = self.stampfnwhitelist
798
799         fn = self.taskData.fn_index[self.runq_fnid[task]]
800         taskname = self.runq_task[task]
801         stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
802         # If the stamp is missing its not current
803         if not os.access(stampfile, os.F_OK):
804             bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s not available\n" % stampfile)
805             return False
806         # If its a 'nostamp' task, it's not current
807         taskdep = self.dataCache.task_deps[fn]
808         if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
809             bb.msg.debug(2, bb.msg.domain.RunQueue, "%s.%s is nostamp\n" % (fn, taskname))
810             return False
811
812         iscurrent = True
813         t1 =  os.stat(stampfile)[stat.ST_MTIME]
814         for dep in self.runq_depends[task]:
815             if iscurrent:
816                 fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
817                 taskname2 = self.runq_task[dep]
818                 stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
819                 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
820                     try:
821                         t2 = os.stat(stampfile2)[stat.ST_MTIME]
822                         if t1 < t2:
823                             bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s < %s" % (stampfile,stampfile2))
824                             iscurrent = False
825                     except:
826                         bb.msg.debug(2, bb.msg.domain.RunQueue, "Exception reading %s for %s" % (stampfile2 ,stampfile))
827                         iscurrent = False
828
829         return iscurrent
830
831     def execute_runqueue(self):
832         """
833         Run the tasks in a queue prepared by prepare_runqueue
834         Upon failure, optionally try to recover the build using any alternate providers
835         (if the abort on failure configuration option isn't set)
836         """
837
838         failures = 0
839         while 1:
840             failed_fnids = []
841             try:
842                 self.execute_runqueue_internal()
843             finally:
844                 if self.master_process:
845                     failed_fnids = self.finish_runqueue()
846             if len(failed_fnids) == 0:
847                 return failures
848             if self.taskData.abort:
849                 raise bb.runqueue.TaskFailure(failed_fnids)
850             for fnid in failed_fnids:
851                 #print "Failure: %s %s %s" % (fnid, self.taskData.fn_index[fnid],  self.runq_task[fnid])
852                 self.taskData.fail_fnid(fnid)
853                 failures = failures + 1
854             self.reset_runqueue()
855             self.prepare_runqueue()
856
857     def execute_runqueue_initVars(self):
858
859         self.stats = RunQueueStats()
860
861         self.active_builds = 0
862         self.runq_buildable = []
863         self.runq_running = []
864         self.runq_complete = []
865         self.build_pids = {}
866         self.failed_fnids = []
867         self.master_process = True
868
869         # Mark initial buildable tasks
870         for task in range(len(self.runq_fnid)):
871             self.runq_running.append(0)
872             self.runq_complete.append(0)
873             if len(self.runq_depends[task]) == 0:
874                 self.runq_buildable.append(1)
875             else:
876                 self.runq_buildable.append(0)
877
878     def task_complete(self, task):
879         """
880         Mark a task as completed
881         Look at the reverse dependencies and mark any task with 
882         completed dependencies as buildable
883         """
884         self.runq_complete[task] = 1
885         for revdep in self.runq_revdeps[task]:
886             if self.runq_running[revdep] == 1:
887                 continue
888             if self.runq_buildable[revdep] == 1:
889                 continue
890             alldeps = 1
891             for dep in self.runq_depends[revdep]:
892                 if self.runq_complete[dep] != 1:
893                     alldeps = 0
894             if alldeps == 1:
895                 self.runq_buildable[revdep] = 1
896                 fn = self.taskData.fn_index[self.runq_fnid[revdep]]
897                 taskname = self.runq_task[revdep]
898                 bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
899
900     def execute_runqueue_internal(self):
901         """
902         Run the tasks in a queue prepared by prepare_runqueue
903         """
904
905         bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
906
907         self.execute_runqueue_initVars()
908
909         if len(self.runq_fnid) == 0:
910             # nothing to do
911             return []
912
913         def sigint_handler(signum, frame):
914             raise KeyboardInterrupt
915
916         event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp, self.cfgdata))
917
918         while True:
919             task = self.sched.next()
920             if task is not None:
921                 fn = self.taskData.fn_index[self.runq_fnid[task]]
922
923                 taskname = self.runq_task[task]
924                 if self.check_stamp_task(task):
925                     bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
926                     self.runq_running[task] = 1
927                     self.task_complete(task)
928                     self.stats.taskCompleted()
929                     self.stats.taskSkipped()
930                     continue
931
932                 bb.msg.note(1, bb.msg.domain.RunQueue, "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.active_builds + 1, len(self.runq_fnid), task, self.get_user_idstring(task)))
933                 sys.stdout.flush()
934                 sys.stderr.flush()
935                 try: 
936                     pid = os.fork() 
937                 except OSError, e: 
938                     bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
939                 if pid == 0:
940                     # Bypass master process' handling
941                     self.master_process = False
942                     # Stop Ctrl+C being sent to children
943                     # signal.signal(signal.SIGINT, signal.SIG_IGN)
944                     # Make the child the process group leader
945                     os.setpgid(0, 0)
946                     newsi = os.open('/dev/null', os.O_RDWR)
947                     os.dup2(newsi, sys.stdin.fileno())
948                     self.cooker.configuration.cmd = taskname[3:]
949                     bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
950                     try:
951                         self.cooker.tryBuild(fn)
952                     except bb.build.EventException:
953                         bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
954                         sys.exit(1)
955                     except:
956                         bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
957                         raise
958                     sys.exit(0)
959                 self.build_pids[pid] = task
960                 self.runq_running[task] = 1
961                 self.active_builds = self.active_builds + 1
962                 if self.active_builds < self.number_tasks:
963                     continue
964             if self.active_builds > 0:
965                 result = os.waitpid(-1, 0)
966                 self.active_builds = self.active_builds - 1
967                 task = self.build_pids[result[0]]
968                 if result[1] != 0:
969                     del self.build_pids[result[0]]
970                     bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
971                     self.failed_fnids.append(self.runq_fnid[task])
972                     self.stats.taskFailed()
973                     break
974                 self.task_complete(task)
975                 self.stats.taskCompleted()
976                 del self.build_pids[result[0]]
977                 continue
978             return
979
980     def finish_runqueue(self):
981         try:
982             while self.active_builds > 0:
983                 bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.active_builds)
984                 tasknum = 1
985                 for k, v in self.build_pids.iteritems():
986                      bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k))
987                      tasknum = tasknum + 1
988                 result = os.waitpid(-1, 0)
989                 task = self.build_pids[result[0]]
990                 if result[1] != 0:
991                      bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
992                      self.failed_fnids.append(self.runq_fnid[task])
993                      self.stats.taskFailed()
994                 del self.build_pids[result[0]]
995                 self.active_builds = self.active_builds - 1
996             bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
997             return self.failed_fnids
998         except KeyboardInterrupt:
999             bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.active_builds)
1000             for k, v in self.build_pids.iteritems():
1001                  try:
1002                      os.kill(-k, signal.SIGINT)
1003                  except:
1004                      pass
1005             raise
1006
1007         # Sanity Checks
1008         for task in range(len(self.runq_fnid)):
1009             if self.runq_buildable[task] == 0:
1010                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
1011             if self.runq_running[task] == 0:
1012                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
1013             if self.runq_complete[task] == 0:
1014                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
1015
1016         bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
1017
1018         return self.failed_fnids
1019
1020     def dump_data(self, taskQueue):
1021         """
1022         Dump some debug information on the internal data structures
1023         """
1024         bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
1025         for task in range(len(self.runq_fnid)):
1026                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
1027                         taskQueue.fn_index[self.runq_fnid[task]], 
1028                         self.runq_task[task], 
1029                         self.runq_weight[task], 
1030                         self.runq_depends[task], 
1031                         self.runq_revdeps[task]))
1032
1033         bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
1034         for task1 in range(len(self.runq_fnid)):
1035             if task1 in self.prio_map:
1036                 task = self.prio_map[task1]
1037                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
1038                         taskQueue.fn_index[self.runq_fnid[task]], 
1039                         self.runq_task[task], 
1040                         self.runq_weight[task], 
1041                         self.runq_depends[task], 
1042                         self.runq_revdeps[task]))
1043
1044
1045 def check_stamp_fn(fn, taskname, d):
1046     rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
1047     fnid = rq.taskData.getfn_id(fn)
1048     taskid = rq.get_task_id(fnid, taskname)
1049     if taskid is not None:
1050         return rq.check_stamp_task(taskid)
1051     return None
1052