05027c9d4569b4d7a790b1a2576929540fa45ce4
[vuplus_bitbake] / lib / bb / runqueue.py
1 #!/usr/bin/env python
2 # ex:ts=4:sw=4:sts=4:et
3 # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4 """
5 BitBake 'RunQueue' implementation
6
7 Handles preparation and execution of a queue of tasks
8
9 Copyright (C) 2006  Richard Purdie
10
11 This program is free software; you can redistribute it and/or modify it under
12 the terms of the GNU General Public License version 2 as published by the Free 
13 Software Foundation
14
15 This program is distributed in the hope that it will be useful, but WITHOUT
16 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
18
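19 You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.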
20 """
21
22 from bb import msg, data, fetch, event, mkdirhier, utils
23 from sets import Set 
24 import bb, os, sys
25
class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails

    The single constructor argument is stored as the exception's ``args``
    attribute.  RunQueue.execute_runqueue raises it with the list of failed
    file ids.
    """
    def __init__(self, x):
        # The argument is assigned to args directly; Exception.__init__ is
        # intentionally not called, so x itself (not a 1-tuple wrapping it)
        # is what ends up in args.
        self.args = x
30
31 class RunQueue:
32     """
33     BitBake Run Queue implementation
34     """
35     def __init__(self):
36         self.reset_runqueue()
37
38     def reset_runqueue(self):
39         self.runq_fnid = []
40         self.runq_task = []
41         self.runq_depends = []
42         self.runq_revdeps = []
43         self.runq_weight = []
44         self.prio_map = []
45
46     def get_user_idstring(self, task, taskData):
47         fn = taskData.fn_index[self.runq_fnid[task]]
48         taskname = self.runq_task[task]
49         return "%s, %s" % (fn, taskname)
50
51     def prepare_runqueue(self, cfgData, dataCache, taskData, targets):
52         """
53         Turn a set of taskData into a RunQueue and compute data needed 
54         to optimise the execution order.
55         targets is list of paired values - a provider name and the task to run
56         """
57
58         depends = []
59         runq_weight1 = []
60         runq_build = []
61         runq_done = []
62
63         bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing Runqueue")
64
65         for task in range(len(taskData.tasks_name)):
66             fnid = taskData.tasks_fnid[task]
67             fn = taskData.fn_index[fnid]
68             task_deps = dataCache.task_deps[fn]
69
70             if fnid not in taskData.failed_fnids:
71
72                 depends = taskData.tasks_tdepends[task]
73
74                 # Resolve Depends
75                 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
76                     taskname = task_deps['deptask'][taskData.tasks_name[task]]
77                     for depid in taskData.depids[fnid]:
78                         if depid in taskData.build_targets:
79                             depdata = taskData.build_targets[depid][0]
80                             if depdata:
81                                 dep = taskData.fn_index[depdata]
82                                 depends.append(taskData.gettask_id(dep, taskname))
83
84                 # Resolve Runtime Depends
85                 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
86                     taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
87                     for depid in taskData.rdepids[fnid]:
88                         if depid in taskData.run_targets:
89                             depdata = taskData.run_targets[depid][0]
90                             if depdata:
91                                 dep = taskData.fn_index[depdata]
92                                 depends.append(taskData.gettask_id(dep, taskname))
93
94                 def add_recursive_build(depid):
95                     """
96                     Add build depends of depid to depends
97                     (if we've not see it before)
98                     (calls itself recursively)
99                     """
100                     if str(depid) in dep_seen:
101                         return
102                     dep_seen.append(depid)
103                     if depid in taskData.build_targets:
104                         depdata = taskData.build_targets[depid][0]
105                         if depdata:
106                             dep = taskData.fn_index[depdata]
107                             taskid = taskData.gettask_id(dep, taskname)
108                             depends.append(taskid)
109                             fnid = taskData.tasks_fnid[taskid]
110                             for nextdepid in taskData.depids[fnid]:
111                                 if nextdepid not in dep_seen:
112                                     add_recursive_build(nextdepid)
113                             for nextdepid in taskData.rdepids[fnid]:
114                                 if nextdepid not in rdep_seen:
115                                     add_recursive_run(nextdepid)
116
117                 def add_recursive_run(rdepid):
118                     """
119                     Add runtime depends of rdepid to depends
120                     (if we've not see it before)
121                     (calls itself recursively)
122                     """
123                     if str(rdepid) in rdep_seen:
124                         return
125                     rdep_seen.append(rdepid)
126                     if rdepid in taskData.run_targets:
127                         depdata = taskData.run_targets[rdepid][0]
128                         if depdata:
129                             dep = taskData.fn_index[depdata]
130                             taskid = taskData.gettask_id(dep, taskname)
131                             depends.append(taskid)
132                             fnid = taskData.tasks_fnid[taskid]
133                             for nextdepid in taskData.depids[fnid]:
134                                 if nextdepid not in dep_seen:
135                                     add_recursive_build(nextdepid)
136                             for nextdepid in taskData.rdepids[fnid]:
137                                 if nextdepid not in rdep_seen:
138                                     add_recursive_run(nextdepid)
139
140
141                 # Resolve Recursive Runtime Depends
142                 # Also includes all Build Depends (and their runtime depends)
143                 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
144                     dep_seen = []
145                     rdep_seen = []
146                     taskname = task_deps['recrdeptask'][taskData.tasks_name[task]]
147                     for depid in taskData.depids[fnid]:
148                         add_recursive_build(depid)
149                     for rdepid in taskData.rdepids[fnid]:
150                         add_recursive_run(rdepid)
151
152                 #Prune self references
153                 if task in depends:
154                     newdep = []
155                     bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
156                     for dep in depends:
157                        if task != dep:
158                           newdep.append(dep)
159                     depends = newdep
160
161
162             self.runq_fnid.append(taskData.tasks_fnid[task])
163             self.runq_task.append(taskData.tasks_name[task])
164             self.runq_depends.append(Set(depends))
165             self.runq_revdeps.append(Set())
166             self.runq_weight.append(0)
167
168             runq_weight1.append(0)
169             runq_build.append(0)
170             runq_done.append(0)
171
172         bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")
173
174         def mark_active(listid, depth):
175             """
176             Mark an item as active along with its depends
177             (calls itself recursively)
178             """
179
180             if runq_build[listid] == 1:
181                 return
182
183             runq_build[listid] = 1
184
185             depends = self.runq_depends[listid]
186             for depend in depends:
187                 mark_active(depend, depth+1)
188
189         for target in targets:
190             targetid = taskData.getbuild_id(target[0])
191             if targetid in taskData.failed_deps:
192                 continue
193
194             if targetid not in taskData.build_targets:
195                 continue
196
197             fnid = taskData.build_targets[targetid][0]
198             if fnid in taskData.failed_fnids:
199                 continue
200
201             listid = taskData.tasks_lookup[fnid][target[1]]
202
203             mark_active(listid, 1)
204
205         # Prune inactive tasks
206         maps = []
207         delcount = 0
208         for listid in range(len(self.runq_fnid)):
209             if runq_build[listid-delcount] == 1:
210                 maps.append(listid-delcount)
211             else:
212                 del self.runq_fnid[listid-delcount]
213                 del self.runq_task[listid-delcount]
214                 del self.runq_depends[listid-delcount]
215                 del self.runq_weight[listid-delcount]
216                 del runq_weight1[listid-delcount]
217                 del runq_build[listid-delcount]
218                 del runq_done[listid-delcount]
219                 del self.runq_revdeps[listid-delcount]
220                 delcount = delcount + 1
221                 maps.append(-1)
222
223         if len(self.runq_fnid) == 0:
224             if not taskData.abort:
225                 bb.msg.note(1, bb.msg.domain.RunQueue, "All possible tasks have been run but build incomplete (--continue mode). See errors above for incomplete tasks.")
226                 return
227             bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
228
229         bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))
230
231         for listid in range(len(self.runq_fnid)):
232             newdeps = []
233             origdeps = self.runq_depends[listid]
234             for origdep in origdeps:
235                 if maps[origdep] == -1:
236                     bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
237                 newdeps.append(maps[origdep])
238             self.runq_depends[listid] = Set(newdeps)
239
240         bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")
241
242         for listid in range(len(self.runq_fnid)):
243             for dep in self.runq_depends[listid]:
244                 self.runq_revdeps[dep].add(listid)
245
246         endpoints = []
247         for listid in range(len(self.runq_fnid)):
248             revdeps = self.runq_revdeps[listid]
249             if len(revdeps) == 0:
250                 runq_done[listid] = 1
251                 self.runq_weight[listid] = 1
252                 endpoints.append(listid)
253             for dep in revdeps:
254                 if dep in self.runq_depends[listid]:
255                     #self.dump_data(taskData)
256                     bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
257             runq_weight1[listid] = len(revdeps)
258
259         bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
260
261         while 1:
262             next_points = []
263             for listid in endpoints:
264                 for revdep in self.runq_depends[listid]:
265                     self.runq_weight[revdep] = self.runq_weight[revdep] + self.runq_weight[listid]
266                     runq_weight1[revdep] = runq_weight1[revdep] - 1
267                     if runq_weight1[revdep] == 0:
268                         next_points.append(revdep)
269                         runq_done[revdep] = 1
270             endpoints = next_points
271             if len(next_points) == 0:
272                 break           
273
274         # Sanity Checks
275         for task in range(len(self.runq_fnid)):
276             if runq_done[task] == 0:
277                 seen = []
278                 deps_seen = []
279                 def print_chain(taskid, finish):
280                     seen.append(taskid)
281                     for revdep in self.runq_revdeps[taskid]:
282                         if runq_done[revdep] == 0 and revdep not in seen and not finish:
283                             bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) (depends: %s)" % (revdep, self.get_user_idstring(revdep, taskData), self.runq_depends[revdep]))
284                             if revdep in deps_seen:
285                                 bb.msg.error(bb.msg.domain.RunQueue, "Chain ends at Task %s (%s)" % (revdep, self.get_user_idstring(revdep, taskData)))
286                                 finish = True
287                                 return
288                             for dep in self.runq_depends[revdep]:
289                                 deps_seen.append(dep)
290                             print_chain(revdep, finish)
291                 print_chain(task, False)
292                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) not processed!\nThis is probably a circular dependency (the chain might be printed above)." % (task, self.get_user_idstring(task, taskData)))
293             if runq_weight1[task] != 0:
294                 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) count not zero!" % (task, self.get_user_idstring(task, taskData)))
295
296         # Make a weight sorted map
297         from copy import deepcopy
298
299         sortweight = deepcopy(self.runq_weight)
300         sortweight.sort()
301         copyweight = deepcopy(self.runq_weight)
302         self.prio_map = []
303
304         for weight in sortweight:
305             idx = copyweight.index(weight)
306             self.prio_map.append(idx)
307             copyweight[idx] = -1
308         self.prio_map.reverse()
309
310         #self.dump_data(taskData)
311
312     def execute_runqueue(self, cooker, cfgData, dataCache, taskData, runlist):
313         """
314         Run the tasks in a queue prepared by prepare_runqueue
315         Upon failure, optionally try to recover the build using any alternate providers
316         (if the abort on failure configuration option isn't set)
317         """
318
319         failures = 0
320         while 1:
321             failed_fnids = self.execute_runqueue_internal(cooker, cfgData, dataCache, taskData)
322             if len(failed_fnids) == 0:
323                 return failures
324             if taskData.abort:
325                 raise bb.runqueue.TaskFailure(failed_fnids)
326             for fnid in failed_fnids:
327                 #print "Failure: %s %s %s" % (fnid, taskData.fn_index[fnid],  self.runq_task[fnid])
328                 taskData.fail_fnid(fnid)
329                 failures = failures + 1
330             self.reset_runqueue()
331             self.prepare_runqueue(cfgData, dataCache, taskData, runlist)
332
333     def execute_runqueue_internal(self, cooker, cfgData, dataCache, taskData):
334         """
335         Run the tasks in a queue prepared by prepare_runqueue
336         """
337         import signal
338
339         bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
340
341         runq_buildable = []
342         runq_running = []
343         runq_complete = []
344         active_builds = 0
345         build_pids = {}
346         failed_fnids = []
347
348         if len(self.runq_fnid) == 0:
349             # nothing to do
350             return
351
352         def sigint_handler(signum, frame):
353             raise KeyboardInterrupt
354
355         def get_next_task(data):
356             """
357             Return the id of the highest priority task that is buildable
358             """
359             for task1 in range(len(data.runq_fnid)):
360                 task = data.prio_map[task1]
361                 if runq_running[task] == 1:
362                     continue
363                 if runq_buildable[task] == 1:
364                     return task
365             return None
366
367         def task_complete(data, task):
368             """
369             Mark a task as completed
370             Look at the reverse dependencies and mark any task with 
371             completed dependencies as buildable
372             """
373             runq_complete[task] = 1
374             for revdep in data.runq_revdeps[task]:
375                 if runq_running[revdep] == 1:
376                     continue
377                 if runq_buildable[revdep] == 1:
378                     continue
379                 alldeps = 1
380                 for dep in data.runq_depends[revdep]:
381                     if runq_complete[dep] != 1:
382                         alldeps = 0
383                 if alldeps == 1:
384                     runq_buildable[revdep] = 1
385                     fn = taskData.fn_index[self.runq_fnid[revdep]]
386                     taskname = self.runq_task[revdep]
387                     bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
388
389         # Mark initial buildable tasks
390         for task in range(len(self.runq_fnid)):
391             runq_running.append(0)
392             runq_complete.append(0)
393             if len(self.runq_depends[task]) == 0:
394                 runq_buildable.append(1)
395             else:
396                 runq_buildable.append(0)
397
398
399         number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData) or 1)
400
401         try:
402             while 1:
403                 task = get_next_task(self)
404                 if task is not None:
405                     fn = taskData.fn_index[self.runq_fnid[task]]
406                     taskname = self.runq_task[task]
407
408                     if bb.build.stamp_is_current_cache(dataCache, fn, taskname):
409                         targetid = taskData.gettask_id(fn, taskname)
410                         if not (targetid in taskData.external_targets and cooker.configuration.force):
411                             bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task, taskData)))
412                             runq_running[task] = 1
413                             task_complete(self, task)
414                             continue
415
416                     bb.msg.debug(1, bb.msg.domain.RunQueue, "Running task %s (%s)" % (task, self.get_user_idstring(task, taskData)))
417                     try: 
418                         pid = os.fork() 
419                     except OSError, e: 
420                         bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
421                     if pid == 0:
422                         # Bypass finally below
423                         active_builds = 0 
424                         # Stop Ctrl+C being sent to children
425                         signal.signal(signal.SIGINT, signal.SIG_IGN)
426                         sys.stdin = open('/dev/null', 'r')
427                         cooker.configuration.cmd = taskname[3:]
428                         try: 
429                             cooker.tryBuild(fn, False)
430                         except bb.build.EventException:
431                             bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
432                             sys.exit(1)
433                         except:
434                             bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
435                             raise
436                         sys.exit(0)
437                     build_pids[pid] = task
438                     runq_running[task] = 1
439                     active_builds = active_builds + 1
440                     if active_builds < number_tasks:
441                         continue
442                 if active_builds > 0:
443                     result = os.waitpid(-1, 0)
444                     active_builds = active_builds - 1
445                     task = build_pids[result[0]]
446                     if result[1] != 0:
447                         del build_pids[result[0]]
448                         bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task, taskData)))
449                         failed_fnids.append(self.runq_fnid[task])
450                         break
451                     task_complete(self, task)
452                     del build_pids[result[0]]
453                     continue
454                 break
455         finally:
456             try:
457                 orig_builds = active_builds
458                 while active_builds > 0:
459                     bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % active_builds)
460                     tasknum = 1
461                     for k, v in build_pids.iteritems():
462                          bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v, taskData), k))
463                          tasknum = tasknum + 1
464                     result = os.waitpid(-1, 0)
465                     task = build_pids[result[0]]
466                     if result[1] != 0:
467                          bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task, taskData)))
468                          failed_fnids.append(self.runq_fnid[task])
469                     del build_pids[result[0]]
470                     active_builds = active_builds - 1
471                 if orig_builds > 0:
472                     return failed_fnids
473             except:
474                 bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGTERM to remaining %s tasks" % active_builds)
475                 for k, v in build_pids.iteritems():
476                      os.kill(k, signal.SIGTERM)
477                 raise
478
479         # Sanity Checks
480         for task in range(len(self.runq_fnid)):
481             if runq_buildable[task] == 0:
482                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
483             if runq_running[task] == 0:
484                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
485             if runq_complete[task] == 0:
486                 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
487
488         return failed_fnids
489
490     def dump_data(self, taskQueue):
491         """
492         Dump some debug information on the internal data structures
493         """
494         bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
495         for task in range(len(self.runq_fnid)):
496                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
497                         taskQueue.fn_index[self.runq_fnid[task]], 
498                         self.runq_task[task], 
499                         self.runq_weight[task], 
500                         self.runq_depends[task], 
501                         self.runq_revdeps[task]))
502
503         bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
504         for task1 in range(len(self.runq_fnid)):
505             if task1 in self.prio_map:
506                 task = self.prio_map[task1]
507                 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s   Deps %s RevDeps %s" % (task, 
508                         taskQueue.fn_index[self.runq_fnid[task]], 
509                         self.runq_task[task], 
510                         self.runq_weight[task], 
511                         self.runq_depends[task], 
512                         self.runq_revdeps[task]))