91d55479e20aed7f4e3efbf4f02f404bf1a3e715
[samba.git] / third_party / waf / waflib / Runner.py
1 #!/usr/bin/env python
2 # encoding: utf-8
3 # Thomas Nagy, 2005-2018 (ita)
4
5 """
6 Runner.py: Task scheduling and execution
7 """
8
9 import heapq, traceback
10 try:
11         from queue import Queue, PriorityQueue
12 except ImportError:
13         from Queue import Queue
14         try:
15                 from Queue import PriorityQueue
16         except ImportError:
17                 class PriorityQueue(Queue):
18                         def _init(self, maxsize):
19                                 self.maxsize = maxsize
20                                 self.queue = []
21                         def _put(self, item):
22                                 heapq.heappush(self.queue, item)
23                         def _get(self):
24                                 return heapq.heappop(self.queue)
25
26 from waflib import Utils, Task, Errors, Logs
27
28 GAP = 5
29 """
30 Wait for at least ``GAP * njobs`` before trying to enqueue more tasks to run
31 """
32
33 class PriorityTasks(object):
34         def __init__(self):
35                 self.lst = []
36         def __len__(self):
37                 return len(self.lst)
38         def __iter__(self):
39                 return iter(self.lst)
40         def __str__(self):
41                 return 'PriorityTasks: [%s]' % '\n  '.join(str(x) for x in self.lst)
42         def clear(self):
43                 self.lst = []
44         def append(self, task):
45                 heapq.heappush(self.lst, task)
46         def appendleft(self, task):
47                 "Deprecated, do not use"
48                 heapq.heappush(self.lst, task)
49         def pop(self):
50                 return heapq.heappop(self.lst)
51         def extend(self, lst):
52                 if self.lst:
53                         for x in lst:
54                                 self.append(x)
55                 else:
56                         if isinstance(lst, list):
57                                 self.lst = lst
58                                 heapq.heapify(lst)
59                         else:
60                                 self.lst = lst.lst
61
62 class Consumer(Utils.threading.Thread):
63         """
64         Daemon thread object that executes a task. It shares a semaphore with
65         the coordinator :py:class:`waflib.Runner.Spawner`. There is one
66         instance per task to consume.
67         """
68         def __init__(self, spawner, task):
69                 Utils.threading.Thread.__init__(self)
70                 self.task = task
71                 """Task to execute"""
72                 self.spawner = spawner
73                 """Coordinator object"""
74                 self.setDaemon(1)
75                 self.start()
76         def run(self):
77                 """
78                 Processes a single task
79                 """
80                 try:
81                         if not self.spawner.master.stop:
82                                 self.spawner.master.process_task(self.task)
83                 finally:
84                         self.spawner.sem.release()
85                         self.spawner.master.out.put(self.task)
86                         self.task = None
87                         self.spawner = None
88
89 class Spawner(Utils.threading.Thread):
90         """
91         Daemon thread that consumes tasks from :py:class:`waflib.Runner.Parallel` producer and
92         spawns a consuming thread :py:class:`waflib.Runner.Consumer` for each
93         :py:class:`waflib.Task.Task` instance.
94         """
95         def __init__(self, master):
96                 Utils.threading.Thread.__init__(self)
97                 self.master = master
98                 """:py:class:`waflib.Runner.Parallel` producer instance"""
99                 self.sem = Utils.threading.Semaphore(master.numjobs)
100                 """Bounded semaphore that prevents spawning more than *n* concurrent consumers"""
101                 self.setDaemon(1)
102                 self.start()
103         def run(self):
104                 """
105                 Spawns new consumers to execute tasks by delegating to :py:meth:`waflib.Runner.Spawner.loop`
106                 """
107                 try:
108                         self.loop()
109                 except Exception:
110                         # Python 2 prints unnecessary messages when shutting down
111                         # we also want to stop the thread properly
112                         pass
113         def loop(self):
114                 """
115                 Consumes task objects from the producer; ends when the producer has no more
116                 task to provide.
117                 """
118                 master = self.master
119                 while 1:
120                         task = master.ready.get()
121                         self.sem.acquire()
122                         if not master.stop:
123                                 task.log_display(task.generator.bld)
124                         Consumer(self, task)
125
126 class Parallel(object):
127         """
128         Schedule the tasks obtained from the build context for execution.
129         """
130         def __init__(self, bld, j=2):
131                 """
132                 The initialization requires a build context reference
133                 for computing the total number of jobs.
134                 """
135
136                 self.numjobs = j
137                 """
138                 Amount of parallel consumers to use
139                 """
140
141                 self.bld = bld
142                 """
143                 Instance of :py:class:`waflib.Build.BuildContext`
144                 """
145
146                 self.outstanding = PriorityTasks()
147                 """Heap of :py:class:`waflib.Task.Task` that may be ready to be executed"""
148
149                 self.postponed = PriorityTasks()
150                 """Heap of :py:class:`waflib.Task.Task` which are not ready to run for non-DAG reasons"""
151
152                 self.incomplete = set()
153                 """List of :py:class:`waflib.Task.Task` waiting for dependent tasks to complete (DAG)"""
154
155                 self.ready = PriorityQueue(0)
156                 """List of :py:class:`waflib.Task.Task` ready to be executed by consumers"""
157
158                 self.out = Queue(0)
159                 """List of :py:class:`waflib.Task.Task` returned by the task consumers"""
160
161                 self.count = 0
162                 """Amount of tasks that may be processed by :py:class:`waflib.Runner.TaskConsumer`"""
163
164                 self.processed = 0
165                 """Amount of tasks processed"""
166
167                 self.stop = False
168                 """Error flag to stop the build"""
169
170                 self.error = []
171                 """Tasks that could not be executed"""
172
173                 self.biter = None
174                 """Task iterator which must give groups of parallelizable tasks when calling ``next()``"""
175
176                 self.dirty = False
177                 """
178                 Flag that indicates that the build cache must be saved when a task was executed
179                 (calls :py:meth:`waflib.Build.BuildContext.store`)"""
180
181                 self.revdeps = Utils.defaultdict(set)
182                 """
183                 The reverse dependency graph of dependencies obtained from Task.run_after
184                 """
185
186                 self.spawner = None
187                 """
188                 Coordinating daemon thread that spawns thread consumers
189                 """
190                 if self.numjobs > 1:
191                         self.spawner = Spawner(self)
192
193         def get_next_task(self):
194                 """
195                 Obtains the next Task instance to run
196
197                 :rtype: :py:class:`waflib.Task.Task`
198                 """
199                 if not self.outstanding:
200                         return None
201                 return self.outstanding.pop()
202
203         def postpone(self, tsk):
204                 """
205                 Adds the task to the list :py:attr:`waflib.Runner.Parallel.postponed`.
206                 The order is scrambled so as to consume as many tasks in parallel as possible.
207
208                 :param tsk: task instance
209                 :type tsk: :py:class:`waflib.Task.Task`
210                 """
211                 self.postponed.append(tsk)
212
213         def refill_task_list(self):
214                 """
215                 Pulls a next group of tasks to execute in :py:attr:`waflib.Runner.Parallel.outstanding`.
216                 Ensures that all tasks in the current build group are complete before processing the next one.
217                 """
218                 while self.count > self.numjobs * GAP:
219                         self.get_out()
220
221                 while not self.outstanding:
222                         if self.count:
223                                 self.get_out()
224                                 if self.outstanding:
225                                         break
226                         elif self.postponed:
227                                 try:
228                                         cond = self.deadlock == self.processed
229                                 except AttributeError:
230                                         pass
231                                 else:
232                                         if cond:
233                                                 # The most common reason is conflicting build order declaration
234                                                 # for example: "X run_after Y" and "Y run_after X"
235                                                 # Another can be changing "run_after" dependencies while the build is running
236                                                 # for example: updating "tsk.run_after" in the "runnable_status" method
237                                                 lst = []
238                                                 for tsk in self.postponed:
239                                                         deps = [id(x) for x in tsk.run_after if not x.hasrun]
240                                                         lst.append('%s\t-> %r' % (repr(tsk), deps))
241                                                         if not deps:
242                                                                 lst.append('\n  task %r dependencies are done, check its *runnable_status*?' % id(tsk))
243                                                 raise Errors.WafError('Deadlock detected: check the task build order%s' % ''.join(lst))
244                                 self.deadlock = self.processed
245
246                         if self.postponed:
247                                 self.outstanding.extend(self.postponed)
248                                 self.postponed.clear()
249                         elif not self.count:
250                                 if self.incomplete:
251                                         for x in self.incomplete:
252                                                 for k in x.run_after:
253                                                         if not k.hasrun:
254                                                                 break
255                                                 else:
256                                                         # dependency added after the build started without updating revdeps
257                                                         self.incomplete.remove(x)
258                                                         self.outstanding.append(x)
259                                                         break
260                                         else:
261                                                 if self.stop or self.error:
262                                                         break
263                                                 raise Errors.WafError('Broken revdeps detected on %r' % self.incomplete)
264                                 else:
265                                         tasks = next(self.biter)
266                                         ready, waiting = self.prio_and_split(tasks)
267                                         self.outstanding.extend(ready)
268                                         self.incomplete.update(waiting)
269                                         self.total = self.bld.total()
270                                         break
271
272         def add_more_tasks(self, tsk):
273                 """
274                 If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained
275                 in that list are added to the current build and will be processed before the next build group.
276
277                 The priorities for dependent tasks are not re-calculated globally
278
279                 :param tsk: task instance
280                 :type tsk: :py:attr:`waflib.Task.Task`
281                 """
282                 if getattr(tsk, 'more_tasks', None):
283                         more = set(tsk.more_tasks)
284                         groups_done = set()
285                         def iteri(a, b):
286                                 for x in a:
287                                         yield x
288                                 for x in b:
289                                         yield x
290
291                         # Update the dependency tree
292                         # this assumes that task.run_after values were updated
293                         for x in iteri(self.outstanding, self.incomplete):
294                                 for k in x.run_after:
295                                         if isinstance(k, Task.TaskGroup):
296                                                 if k not in groups_done:
297                                                         groups_done.add(k)
298                                                         for j in k.prev & more:
299                                                                 self.revdeps[j].add(k)
300                                         elif k in more:
301                                                 self.revdeps[k].add(x)
302
303                         ready, waiting = self.prio_and_split(tsk.more_tasks)
304                         self.outstanding.extend(ready)
305                         self.incomplete.update(waiting)
306                         self.total += len(tsk.more_tasks)
307
308         def mark_finished(self, tsk):
309                 def try_unfreeze(x):
310                         # DAG ancestors are likely to be in the incomplete set
311                         # This assumes that the run_after contents have not changed
312                         # after the build starts, else a deadlock may occur
313                         if x in self.incomplete:
314                                 # TODO remove dependencies to free some memory?
315                                 # x.run_after.remove(tsk)
316                                 for k in x.run_after:
317                                         if not k.hasrun:
318                                                 break
319                                 else:
320                                         self.incomplete.remove(x)
321                                         self.outstanding.append(x)
322
323                 if tsk in self.revdeps:
324                         for x in self.revdeps[tsk]:
325                                 if isinstance(x, Task.TaskGroup):
326                                         x.prev.remove(tsk)
327                                         if not x.prev:
328                                                 for k in x.next:
329                                                         # TODO necessary optimization?
330                                                         k.run_after.remove(x)
331                                                         try_unfreeze(k)
332                                                 # TODO necessary optimization?
333                                                 x.next = []
334                                 else:
335                                         try_unfreeze(x)
336                         del self.revdeps[tsk]
337
338                 if hasattr(tsk, 'semaphore'):
339                         sem = tsk.semaphore
340                         try:
341                                 sem.release(tsk)
342                         except KeyError:
343                                 # TODO
344                                 pass
345                         else:
346                                 while sem.waiting and not sem.is_locked():
347                                         # take a frozen task, make it ready to run
348                                         x = sem.waiting.pop()
349                                         self._add_task(x)
350
351         def get_out(self):
352                 """
353                 Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution.
354                 Adds more Tasks if necessary through :py:attr:`waflib.Runner.Parallel.add_more_tasks`.
355
356                 :rtype: :py:attr:`waflib.Task.Task`
357                 """
358                 tsk = self.out.get()
359                 if not self.stop:
360                         self.add_more_tasks(tsk)
361                 self.mark_finished(tsk)
362
363                 self.count -= 1
364                 self.dirty = True
365                 return tsk
366
367         def add_task(self, tsk):
368                 """
369                 Enqueue a Task to :py:attr:`waflib.Runner.Parallel.ready` so that consumers can run them.
370
371                 :param tsk: task instance
372                 :type tsk: :py:attr:`waflib.Task.Task`
373                 """
374                 # TODO change in waf 2.1
375                 self.ready.put(tsk)
376
377         def _add_task(self, tsk):
378                 if hasattr(tsk, 'semaphore'):
379                         sem = tsk.semaphore
380                         try:
381                                 sem.acquire(tsk)
382                         except IndexError:
383                                 sem.waiting.add(tsk)
384                                 return
385
386                 self.count += 1
387                 self.processed += 1
388                 if self.numjobs == 1:
389                         tsk.log_display(tsk.generator.bld)
390                         try:
391                                 self.process_task(tsk)
392                         finally:
393                                 self.out.put(tsk)
394                 else:
395                         self.add_task(tsk)
396
397         def process_task(self, tsk):
398                 """
399                 Processes a task and attempts to stop the build in case of errors
400                 """
401                 tsk.process()
402                 if tsk.hasrun != Task.SUCCESS:
403                         self.error_handler(tsk)
404
405         def skip(self, tsk):
406                 """
407                 Mark a task as skipped/up-to-date
408                 """
409                 tsk.hasrun = Task.SKIPPED
410                 self.mark_finished(tsk)
411
412         def cancel(self, tsk):
413                 """
414                 Mark a task as failed because of unsatisfiable dependencies
415                 """
416                 tsk.hasrun = Task.CANCELED
417                 self.mark_finished(tsk)
418
419         def error_handler(self, tsk):
420                 """
421                 Called when a task cannot be executed. The flag :py:attr:`waflib.Runner.Parallel.stop` is set,
422                 unless the build is executed with::
423
424                         $ waf build -k
425
426                 :param tsk: task instance
427                 :type tsk: :py:attr:`waflib.Task.Task`
428                 """
429                 if not self.bld.keep:
430                         self.stop = True
431                 self.error.append(tsk)
432
433         def task_status(self, tsk):
434                 """
435                 Obtains the task status to decide whether to run it immediately or not.
436
437                 :return: the exit status, for example :py:attr:`waflib.Task.ASK_LATER`
438                 :rtype: integer
439                 """
440                 try:
441                         return tsk.runnable_status()
442                 except Exception:
443                         self.processed += 1
444                         tsk.err_msg = traceback.format_exc()
445                         if not self.stop and self.bld.keep:
446                                 self.skip(tsk)
447                                 if self.bld.keep == 1:
448                                         # if -k stop on the first exception, if -kk try to go as far as possible
449                                         if Logs.verbose > 1 or not self.error:
450                                                 self.error.append(tsk)
451                                         self.stop = True
452                                 else:
453                                         if Logs.verbose > 1:
454                                                 self.error.append(tsk)
455                                 return Task.EXCEPTION
456
457                         tsk.hasrun = Task.EXCEPTION
458                         self.error_handler(tsk)
459
460                         return Task.EXCEPTION
461
462         def start(self):
463                 """
464                 Obtains Task instances from the BuildContext instance and adds the ones that need to be executed to
465                 :py:class:`waflib.Runner.Parallel.ready` so that the :py:class:`waflib.Runner.Spawner` consumer thread
466                 has them executed. Obtains the executed Tasks back from :py:class:`waflib.Runner.Parallel.out`
467                 and marks the build as failed by setting the ``stop`` flag.
468                 If only one job is used, then executes the tasks one by one, without consumers.
469                 """
470                 self.total = self.bld.total()
471
472                 while not self.stop:
473
474                         self.refill_task_list()
475
476                         # consider the next task
477                         tsk = self.get_next_task()
478                         if not tsk:
479                                 if self.count:
480                                         # tasks may add new ones after they are run
481                                         continue
482                                 else:
483                                         # no tasks to run, no tasks running, time to exit
484                                         break
485
486                         if tsk.hasrun:
487                                 # if the task is marked as "run", just skip it
488                                 self.processed += 1
489                                 continue
490
491                         if self.stop: # stop immediately after a failure is detected
492                                 break
493
494                         st = self.task_status(tsk)
495                         if st == Task.RUN_ME:
496                                 self._add_task(tsk)
497                         elif st == Task.ASK_LATER:
498                                 self.postpone(tsk)
499                         elif st == Task.SKIP_ME:
500                                 self.processed += 1
501                                 self.skip(tsk)
502                                 self.add_more_tasks(tsk)
503                         elif st == Task.CANCEL_ME:
504                                 # A dependency problem has occurred, and the
505                                 # build is most likely run with `waf -k`
506                                 if Logs.verbose > 1:
507                                         self.error.append(tsk)
508                                 self.processed += 1
509                                 self.cancel(tsk)
510
511                 # self.count represents the tasks that have been made available to the consumer threads
512                 # collect all the tasks after an error else the message may be incomplete
513                 while self.error and self.count:
514                         self.get_out()
515
516                 self.ready.put(None)
517                 if not self.stop:
518                         assert not self.count
519                         assert not self.postponed
520                         assert not self.incomplete
521
522         def prio_and_split(self, tasks):
523                 """
524                 Label input tasks with priority values, and return a pair containing
525                 the tasks that are ready to run and the tasks that are necessarily
526                 waiting for other tasks to complete.
527
528                 The priority system is really meant as an optional layer for optimization:
529                 dependency cycles are found quickly, and builds should be more efficient.
530                 A high priority number means that a task is processed first.
531
532                 This method can be overridden to disable the priority system::
533
534                         def prio_and_split(self, tasks):
535                                 return tasks, []
536
537                 :return: A pair of task lists
538                 :rtype: tuple
539                 """
540                 # to disable:
541                 #return tasks, []
542                 for x in tasks:
543                         x.visited = 0
544
545                 reverse = self.revdeps
546
547                 groups_done = set()
548                 for x in tasks:
549                         for k in x.run_after:
550                                 if isinstance(k, Task.TaskGroup):
551                                         if k not in groups_done:
552                                                 groups_done.add(k)
553                                                 for j in k.prev:
554                                                         reverse[j].add(k)
555                                 else:
556                                         reverse[k].add(x)
557
558                 # the priority number is not the tree depth
559                 def visit(n):
560                         if isinstance(n, Task.TaskGroup):
561                                 return sum(visit(k) for k in n.next)
562
563                         if n.visited == 0:
564                                 n.visited = 1
565
566                                 if n in reverse:
567                                         rev = reverse[n]
568                                         n.prio_order = n.tree_weight + len(rev) + sum(visit(k) for k in rev)
569                                 else:
570                                         n.prio_order = n.tree_weight
571
572                                 n.visited = 2
573                         elif n.visited == 1:
574                                 raise Errors.WafError('Dependency cycle found!')
575                         return n.prio_order
576
577                 for x in tasks:
578                         if x.visited != 0:
579                                 # must visit all to detect cycles
580                                 continue
581                         try:
582                                 visit(x)
583                         except Errors.WafError:
584                                 self.debug_cycles(tasks, reverse)
585
586                 ready = []
587                 waiting = []
588                 for x in tasks:
589                         for k in x.run_after:
590                                 if not k.hasrun:
591                                         waiting.append(x)
592                                         break
593                         else:
594                                 ready.append(x)
595                 return (ready, waiting)
596
597         def debug_cycles(self, tasks, reverse):
598                 tmp = {}
599                 for x in tasks:
600                         tmp[x] = 0
601
602                 def visit(n, acc):
603                         if isinstance(n, Task.TaskGroup):
604                                 for k in n.next:
605                                         visit(k, acc)
606                                 return
607                         if tmp[n] == 0:
608                                 tmp[n] = 1
609                                 for k in reverse.get(n, []):
610                                         visit(k, [n] + acc)
611                                 tmp[n] = 2
612                         elif tmp[n] == 1:
613                                 lst = []
614                                 for tsk in acc:
615                                         lst.append(repr(tsk))
616                                         if tsk is n:
617                                                 # exclude prior nodes, we want the minimum cycle
618                                                 break
619                                 raise Errors.WafError('Task dependency cycle in "run_after" constraints: %s' % ''.join(lst))
620                 for x in tasks:
621                         visit(x, [])
622