0a948ca955efa1e487b1f15b09bb1ff5cf97294b
[vuplus_bitbake] / lib / bb / runqueue.py
1                 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8
9 Copyright (C) 2006  Richard Purdie
10
11 This program is free software; you can redistribute it and/or modify it under
12 the terms of the GNU General Public License version 2 as published by the Free 
13 Software Foundation
14
15 This program is distributed in the hope that it will be useful, but WITHOUT
16 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License along with this program.
20 """
21
22 from bb import msg, data, fetch, event, mkdirhier, utils
23 from sets import Set 
24 import bb, os, sys
25
class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails

    The exception arguments are the triple (fnid, fn, taskname) naming the
    failed task, so handlers can read them back from the 'args' attribute.
    """

    def __init__(self, fnid, fn, taskname):
        # Let the base class store the triple as self.args
        Exception.__init__(self, fnid, fn, taskname)
31
32 class RunQueue:
33     """
34     BitBake Run Queue implementation
35     """
    def __init__(self):
        """
        Create a run queue with all of its per-task lists empty
        """
        # All state lives in the lists set up by reset_runqueue()
        self.reset_runqueue()
38
39     def reset_runqueue(self):
40         self.runq_fnid = []
41         self.runq_task = []
42         self.runq_depends = []
43         self.runq_revdeps = []
44         self.runq_weight = []
45         self.prio_map = []
46
47     def get_user_idstring(self, task, taskData):
48         fn = taskData.fn_index[self.runq_fnid[task]]
49         taskname = self.runq_task[task]
50         return "%s, %s" % (fn, taskname)
51
52     def prepare_runqueue(self, cfgData, dataCache, taskData, targets):
53         """
54         Turn a set of taskData into a RunQueue and compute data needed 
55         to optimise the execution order.
56         targets is list of paired values - a provider name and the task to run
57         """
58
59         depends = []
60         runq_weight1 = []
61         runq_build = []
62         runq_done = []
63
64         bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing Runqueue")
65
66         for task in range(len(taskData.tasks_name)):
67             fnid = taskData.tasks_fnid[task]
68             fn = taskData.fn_index[fnid]
69             task_deps = dataCache.task_deps[fn]
70
71             if fnid not in taskData.failed_fnids:
72
73                 depends = taskData.tasks_tdepends[task]
74
75                 # Resolve Depends
76                 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
77                     taskname = task_deps['deptask'][taskData.tasks_name[task]]
78                     for depid in taskData.depids[fnid]:
79                         if depid in taskData.build_targets:
80                             depdata = taskData.build_targets[depid][0]
81                             if depdata:
82                                 dep = taskData.fn_index[depdata]
83                                 depends.append(taskData.gettask_id(dep, taskname))
84
85                 # Resolve Runtime Depends
86                 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
87                     taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
88                     for depid in taskData.rdepids[fnid]:
89                         if depid in taskData.run_targets:
90                             depdata = taskData.run_targets[depid][0]
91                             if depdata:
92                                 dep = taskData.fn_index[depdata]
93                                 depends.append(taskData.gettask_id(dep, taskname))
94
95                 def add_recursive_build(depid):
96                     """
97                     Add build depends of depid to depends
98                     (if we've not see it before)
99                     (calls itself recursively)
100                     """
101                     if str(depid) in dep_seen:
102                         return
103                     dep_seen.append(depid)
104                     if depid in taskData.build_targets:
105                         depdata = taskData.build_targets[depid][0]
106                         if depdata:
107                             dep = taskData.fn_index[depdata]
108                             taskid = taskData.gettask_id(dep, taskname)
109                             depends.append(taskid)
110                             fnid = taskData.tasks_fnid[taskid]
111                             for nextdepid in taskData.depids[fnid]:
112                                 if nextdepid not in dep_seen:
113                                     add_recursive_build(nextdepid)
114                             for nextdepid in taskData.rdepids[fnid]:
115                                 if nextdepid not in rdep_seen:
116                                     add_recursive_run(nextdepid)
117
118                 def add_recursive_run(rdepid):
119                     """
120                     Add runtime depends of rdepid to depends
121                     (if we've not see it before)
122                     (calls itself recursively)
123                     """
124                     if str(rdepid) in rdep_seen:
125                         return
126                     rdep_seen.append(rdepid)
127                     if rdepid in taskData.run_targets:
128                         depdata = taskData.run_targets[rdepid][0]
129                         if depdata:
130                             dep = taskData.fn_index[depdata]
131                             taskid = taskData.gettask_id(dep, taskname)
132                             depends.append(taskid)
133                             fnid = taskData.tasks_fnid[taskid]
134                             for nextdepid in taskData.depids[fnid]:
135                                 if nextdepid not in dep_seen:
136                                     add_recursive_build(nextdepid)
137                             for nextdepid in taskData.rdepids[fnid]:
138                                 if nextdepid not in rdep_seen:
139                                     add_recursive_run(nextdepid)
140
141
142                 # Resolve Recursive Runtime Depends
143                 # Also includes all Build Depends (and their runtime depends)
144                 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
145                     dep_seen = []
146                     rdep_seen = []
147                     taskname = task_deps['recrdeptask'][taskData.tasks_name[task]]
148                     for depid in taskData.depids[fnid]:
149                         add_recursive_build(depid)
150                     for rdepid in taskData.rdepids[fnid]:
151                         add_recursive_run(rdepid)
152
153                 #Prune self references
154                 if task in depends:
155                     newdep = []
156                     bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
157                     for dep in depends:
158                        if task != dep:
159                           newdep.append(dep)
160                     depends = newdep
161
162
163             self.runq_fnid.append(taskData.tasks_fnid[task])
164             self.runq_task.append(taskData.tasks_name[task])
165             self.runq_depends.append(Set(depends))
166             self.runq_revdeps.append(Set())
167             self.runq_weight.append(0)
168
169             runq_weight1.append(0)
170             runq_build.append(0)
171             runq_done.append(0)
172
173         bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")
174
175         def mark_active(listid, depth):
176             """
177             Mark an item as active along with its depends
178             (calls itself recursively)
179             """
180
181             if runq_build[listid] == 1:
182                 return
183
184             runq_build[listid] = 1
185
186             depends = self.runq_depends[listid]
187             for depend in depends:
188                 mark_active(depend, depth+1)
189
190         for target in targets:
191             targetid = taskData.getbuild_id(target[0])
192             if targetid in taskData.failed_deps:
193                 continue
194
195             if targetid not in taskData.build_targets:
196                 continue
197
198             fnid = taskData.build_targets[targetid][0]
199             if fnid in taskData.failed_fnids:
200                 continue
201
202             listid = taskData.tasks_lookup[fnid][target[1]]
203
204             mark_active(listid, 1)
205
206         # Prune inactive tasks
207         maps = []
208         delcount = 0
209         for listid in range(len(self.runq_fnid)):
210             if runq_build[listid-delcount] == 1:
211                 maps.append(listid-delcount)
212             else:
213                 del self.runq_fnid[listid-delcount]
214                 del self.runq_task[listid-delcount]
215                 del self.runq_depends[listid-delcount]
216                 del self.runq_weight[listid-delcount]
217                 del runq_weight1[listid-delcount]
218                 del runq_build[listid-delcount]
219                 del runq_done[listid-delcount]
220                 del self.runq_revdeps[listid-delcount]
221                 delcount = delcount + 1
222                 maps.append(-1)
223
224         if len(self.runq_fnid) == 0:
225             if not taskData.abort:
226                 bb.msg.note(1, bb.msg.domain.RunQueue, "All possible tasks have been run but build incomplete (--continue mode). See errors above for incomplete tasks.")
227                 return
228             bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
229
230         bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))
231
232         for listid in range(len(self.runq_fnid)):
233             newdeps = []
234             origdeps = self.runq_depends[listid]
235             for origdep in origdeps:
236                 if maps[origdep] == -1:
237                     bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
238                 newdeps.append(maps[origdep])
239             self.runq_depends[listid] = Set(newdeps)
240
241         bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")
242
243         for listid in range(len(self.runq_fnid)):
244             for dep in self.runq_depends[listid]:
245                 self.runq_revdeps[dep].add(listid)
246
247         endpoints = []
248         for listid in range(len(self.runq_fnid)):
249             revdeps = self.runq_revdeps[listid]
250             if len(revdeps) == 0:
251                 runq_done[listid] = 1
252                 self.runq_weight[listid] = 1
253                 endpoints.append(listid)
254             for dep in revdeps:
255                 if dep in self.runq_depends[listid]:
256                     #self.dump_data(taskData)
257                     bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
258             runq_weight1[listid] = len(revdeps)
259
260         bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
261
262         while 1:
263             next_points = []
264             for listid in endpoints:
265                 for revdep in self.runq_depends[listid]:
266                     self.runq_weight[revdep] = self.runq_weight[revdep] + self.runq_weight[listid]
267                     runq_weight1[revdep] = runq_weight1[revdep] - 1
268                     if runq_weight1[revdep] == 0:
269                         next_points.append(revdep)
270                         runq_done[revdep] = 1
271             endpoints = next_points
272             if len(next_points) == 0:
273                 break           
274
275         # Sanity Checks
276         for task in range(len(self.runq_fnid)):
277             if runq_done[task] == 0:
278                 seen = []
279                 deps_seen = []
280                 def print_chain(taskid, finish):
281                     seen.append(taskid)
282                     for revdep in self.runq_revdeps[taskid]:
283                         if runq_done[revdep] == 0 and revdep not in seen and not finish:
284                             bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) (depends: %s)" % (revdep, self.get_user_idstring(revdep, taskData), self.runq_depends[revdep]))
285                             if revdep in deps_seen:
286                                 bb.msg.error(bb.msg.domain.RunQueue, "Chain ends at Task %s (%s)" % (revdep, self.get_user_idstring(revdep, taskData)))
287                                 finish = True
288                                 return
289                             for dep in self.runq_depends[revdep]:
290                                 deps_seen.append(dep)
291                             print_chain(revdep, finish)
292                 print_chain(task, False)
293                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) not processed!\nThis is probably a circular dependency (the chain might be printed above)." % (task, self.get_user_idstring(task, taskData)))
294             if runq_weight1[task] != 0:
295                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) count not zero!" % (task, self.get_user_idstring(task, taskData)))
296
297         # Make a weight sorted map
298         from copy import deepcopy
299
300         sortweight = deepcopy(self.runq_weight)
301         sortweight.sort()
302         copyweight = deepcopy(self.runq_weight)
303         self.prio_map = []
304
305         for weight in sortweight:
306             idx = copyweight.index(weight)
307             self.prio_map.append(idx)
308             copyweight[idx] = -1
309         self.prio_map.reverse()
310
311         #self.dump_data(taskData)
312
313     def execute_runqueue(self, cooker, cfgData, dataCache, taskData, runlist):
314         """
315         Run the tasks in a queue prepared by prepare_runqueue
316         Upon failure, optionally try to recover the build using any alternate providers
317         (if the abort on failure configuration option isn't set)
318         """
319
320         failures = 0
321         while 1:
322             try:
323                 self.execute_runqueue_internal(cooker, cfgData, dataCache, taskData)
324                 return failures
325             except bb.runqueue.TaskFailure, (fnid, taskData.fn_index[fnid], taskname):
326                 if taskData.abort:
327                     raise
328                 taskData.fail_fnid(fnid)
329                 self.reset_runqueue()
330                 self.prepare_runqueue(cfgData, dataCache, taskData, runlist)
331                 failures = failures + 1
332
333     def execute_runqueue_internal(self, cooker, cfgData, dataCache, taskData):
334         """
335         Run the tasks in a queue prepared by prepare_runqueue
336         """
337
338         bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
339
340         runq_buildable = []
341         runq_running = []
342         runq_complete = []
343         active_builds = 0
344         build_pids = {}
345
346         if len(self.runq_fnid) == 0:
347             # nothing to do
348             return
349
350         def get_next_task(data):
351             """
352             Return the id of the highest priority task that is buildable
353             """
354             for task1 in range(len(data.runq_fnid)):
355                 task = data.prio_map[task1]
356                 if runq_running[task] == 1:
357                     continue
358                 if runq_buildable[task] == 1:
359                     return task
360             return None
361
362         def task_complete(data, task):
363             """
364             Mark a task as completed
365             Look at the reverse dependencies and mark any task with 
366             completed dependencies as buildable
367             """
368             runq_complete[task] = 1
369             for revdep in data.runq_revdeps[task]:
370                 if runq_running[revdep] == 1:
371                     continue
372                 if runq_buildable[revdep] == 1:
373                     continue
374                 alldeps = 1
375                 for dep in data.runq_depends[revdep]:
376                     if runq_complete[dep] != 1:
377                         alldeps = 0
378                 if alldeps == 1:
379                     runq_buildable[revdep] = 1
380                     fn = taskData.fn_index[self.runq_fnid[revdep]]
381                     taskname = self.runq_task[revdep]
382                     bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
383
384         # Mark initial buildable tasks
385         for task in range(len(self.runq_fnid)):
386             runq_running.append(0)
387             runq_complete.append(0)
388             if len(self.runq_depends[task]) == 0:
389                 runq_buildable.append(1)
390             else:
391                 runq_buildable.append(0)
392
393
394         number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData) or 1)
395
396         try:
397             while 1:
398                 task = get_next_task(self)
399                 if task is not None:
400                     fn = taskData.fn_index[self.runq_fnid[task]]
401                     taskname = self.runq_task[task]
402
403                     if bb.build.stamp_is_current_cache(dataCache, fn, taskname):
404                         targetid = taskData.gettask_id(fn, taskname)
405                         if not (targetid in taskData.external_targets and cooker.configuration.force):
406                             bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task, taskData)))
407                             runq_running[task] = 1
408                             task_complete(self, task)
409                             continue
410
411                     bb.msg.debug(1, bb.msg.domain.RunQueue, "Running task %s (%s)" % (task, self.get_user_idstring(task, taskData)))
412                     try: 
413                         pid = os.fork() 
414                     except OSError, e: 
415                         bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
416                     if pid == 0:
417                         cooker.configuration.cmd = taskname[3:]
418                         try: 
419                             cooker.tryBuild(fn, False)
420                         except bb.build.EventException:
421                             bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
422                             sys.exit(1)
423                         except:
424                             bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
425                             raise
426                         sys.exit(0)
427                     build_pids[pid] = task
428                     runq_running[task] = 1
429                     active_builds = active_builds + 1
430                     if active_builds < number_tasks:
431                         continue
432                 if active_builds > 0:
433                     result = os.waitpid(-1, 0)
434                     active_builds = active_builds - 1
435                     if result[1] != 0:
436                         bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (build_pids[result[0]], self.get_user_idstring(build_pids[result[0]], taskData)))
437                         raise bb.runqueue.TaskFailure(self.runq_fnid[build_pids[result[0]]], taskData.fn_index[self.runq_fnid[build_pids[result[0]]]], self.runq_task[build_pids[result[0]]])
438                     task_complete(self, build_pids[result[0]])
439                     del build_pids[result[0]]
440                     continue
441                 break
442         except SystemExit:
443             raise
444         except:
445             bb.msg.error(bb.msg.domain.RunQueue, "Exception received")
446             if active_builds > 0:
447                 while active_builds > 0:
448                     bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % active_builds)
449                     tasknum = 1
450                     for k, v in build_pids.iteritems():
451                         bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v, taskData), k))
452                         tasknum = tasknum + 1
453                     result = os.waitpid(-1, 0)
454                     del build_pids[result[0]]               
455                     active_builds = active_builds - 1
456             raise
457
458         # Sanity Checks
459         for task in range(len(self.runq_fnid)):
460             if runq_buildable[task] == 0:
461                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
462             if runq_running[task] == 0:
463                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
464             if runq_complete[task] == 0:
465                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
466
467         return 0
468
469     def dump_data(self, taskQueue):
470         """
471         Dump some debug information on the internal data structures
472         """
473         bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
474         for task in range(len(self.runq_fnid)):
475                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
476                         taskQueue.fn_index[self.runq_fnid[task]], 
477                         self.runq_task[task], 
478                         self.runq_weight[task], 
479                         self.runq_depends[task], 
480                         self.runq_revdeps[task]))
481
482         bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
483         for task1 in range(len(self.runq_fnid)):
484             if task1 in self.prio_map:
485                 task = self.prio_map[task1]
486                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
487                         taskQueue.fn_index[self.runq_fnid[task]], 
488                         self.runq_task[task], 
489                         self.runq_weight[task], 
490                         self.runq_depends[task], 
491                         self.runq_revdeps[task]))