Source code for watchme.tasks.worker


Copyright (C) 2019 Vanessa Sochat.

This Source Code Form is subject to the terms of the
Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed
with this file, You can obtain one at


from watchme.logger import bot
from watchme.defaults import WATCHME_WORKERS
import multiprocessing
import itertools
import time
import signal
import sys

[docs]class Workers(object): def __init__(self, workers=None, show_progress=False): if workers is None: workers = WATCHME_WORKERS self.workers = workers self.show_progress = show_progress bot.debug("Using %s workers for multiprocess." % (self.workers))
[docs] def start(self): bot.debug("Starting multiprocess") self.start_time = time.time()
[docs] def end(self): self.end_time = time.time() self.runtime = self.runtime = self.end_time - self.start_time bot.debug("Ending multiprocess, runtime: %s sec" % (self.runtime))
[docs] def run(self, funcs, tasks): '''run will send a list of tasks, a tuple with arguments, through a function. the arguments should be ordered correctly. Parameters ========== funcs: the functions to run with multiprocessing.pool, a dictionary with lookup by the task name tasks: a dict of tasks, each task name (key) with a tuple of arguments to process ''' # Number of tasks must == number of functions assert len(funcs)==len(tasks) # Keep track of some progress for the user progress = 1 total = len(tasks) # if we don't have tasks, don't run if not tasks: return # results will also have the same key to look up finished = dict() results = [] try: prefix = "[%s/%s]" % (progress, total) if self.show_progress: bot.show_progress(0, total, length=35, prefix=prefix) pool = multiprocessing.Pool(self.workers, init_worker) self.start() for key, params in tasks.items(): func = funcs[key] if not self.show_progress:'Processing task %s:%s' % (key, params)) result = pool.apply_async(multi_wrapper, multi_package(func, [params])) # Store the key with the result results.append((key, result)) while len(results) > 0: pair = results.pop() key, result = pair result.wait() if self.show_progress: bot.show_progress(progress, total, length=35, prefix=prefix) progress += 1 prefix = "[%s/%s]" % (progress, total) finished[key] = result.get() self.end() pool.close() pool.join() except (KeyboardInterrupt, SystemExit): bot.error("Keyboard interrupt detected, terminating workers!") pool.terminate() sys.exit(1) except: bot.exit('Error running task.') return finished
# Supporting functions for MultiProcess Worker
[docs]def init_worker(): signal.signal(signal.SIGINT, signal.SIG_IGN)
[docs]def multi_wrapper(func_args): function, kwargs = func_args return function(**kwargs)
[docs]def multi_package(func, kwargs): zipped = zip(itertools.repeat(func), kwargs) return zipped