Coworker

class coworker.Coworker(max_concurrency=10, sliding_window=True)

Generic worker that performs tasks concurrently using a coroutine I/O loop.

Initialize worker

Parameters:
  • max_concurrency (int) – How many tasks can be done at the same time. Defaults to 10.
  • sliding_window (bool) – Start a task as soon as a slot becomes available under the concurrency limit, instead of waiting for all concurrent tasks to complete first. Defaults to True.
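
The difference between the two window modes can be illustrated with plain asyncio. This is a minimal sketch under stated assumptions, not the library's implementation: run_sliding, run_batched, and the event log are hypothetical names used only for illustration.

```python
import asyncio


async def run_sliding(durations, max_concurrency):
    """Sliding window: start the next task as soon as any slot frees up."""
    events = []
    slots = asyncio.Semaphore(max_concurrency)

    async def run_one(index, duration):
        async with slots:  # wait here until a slot is free
            events.append(("start", index))
            await asyncio.sleep(duration)
            events.append(("finish", index))

    await asyncio.gather(*(run_one(i, d) for i, d in enumerate(durations)))
    return events


async def run_batched(durations, max_concurrency):
    """No sliding window: finish the whole batch before starting the next."""
    events = []

    async def run_one(index, duration):
        events.append(("start", index))
        await asyncio.sleep(duration)
        events.append(("finish", index))

    indexed = list(enumerate(durations))
    for offset in range(0, len(indexed), max_concurrency):
        batch = indexed[offset:offset + max_concurrency]
        await asyncio.gather(*(run_one(i, d) for i, d in batch))
    return events


if __name__ == "__main__":
    durations = [0.2, 0.05, 0.05]  # task 0 is slow, tasks 1 and 2 are fast
    print(asyncio.run(run_sliding(durations, max_concurrency=2)))
    print(asyncio.run(run_batched(durations, max_concurrency=2)))
```

With two slots, the sliding version starts task 2 as soon as task 1 finishes, while the batched version holds task 2 back until the slow task 0 is also done.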

add_tasks(tasks)

Add task(s) to queue

Parameters: tasks (object|list) – A single task or a list of tasks to add to the queue.
Returns: If a single task is given, a single task future that will contain the result from self.do_task(). If a list of tasks is given, a list of task futures, one for each task.

Note that if hash(task) is the same as that of an existing task, the same future is returned and the task is performed only once. To perform the same task multiple times as distinct tasks, wrap each one in another object that has a unique hash.
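
One way to give repeated tasks unique hashes is a thin wrapper class. The sketch below is illustrative only: UniqueTask and distinct_tasks are hypothetical names, and the dict merely models hash-based deduplication rather than reproducing the worker's queue.

```python
class UniqueTask:
    """Hypothetical wrapper; instances compare and hash by identity."""

    def __init__(self, payload):
        self.payload = payload


def distinct_tasks(tasks):
    """Toy model of hash-based deduplication: equal tasks collapse into one."""
    return list(dict.fromkeys(tasks))


plain = distinct_tasks(["ping", "ping"])
wrapped = distinct_tasks([UniqueTask("ping"), UniqueTask("ping")])
print(len(plain), len(wrapped))  # prints "1 2"
```

With such a wrapper, an overridden do_task() would presumably need to read task.payload to get at the original value.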

available_slots

Number of available slots to do tasks based on concurrency and window settings

cancel_task(task)

Cancel a task

do_task(task)

Perform the task. Subclass should override this to do something more meaningful.

idle

Worker has nothing to do and is doing nothing

on_finish()

Invoked after the worker completes all tasks, just before it exits. Subclass should override if needed.

on_finish_task(task, result)

Invoked after the task is completed. Subclass should override if needed.

Parameters:
  • task – Task that was finished
  • result – Return value from self.do_task(task)

on_start()

Invoked before worker starts. Subclass should override if needed.

on_start_task(task)

Invoked before starting the task. Subclass should override if needed.

Parameters: task – Task that will start
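
The order in which the hooks fire can be sketched with a toy stand-in. ToyWorker below is hypothetical and mimics only the documented call order; it assumes the hooks are coroutines and runs tasks one at a time for clarity, neither of which is confirmed for the real class.

```python
import asyncio


class ToyWorker:
    """Toy stand-in that mimics only the documented hook order."""

    def __init__(self):
        self.calls = []  # records each hook invocation in order

    async def on_start(self):
        self.calls.append("on_start")

    async def on_start_task(self, task):
        self.calls.append(f"on_start_task({task})")

    async def do_task(self, task):
        return task * 2  # placeholder work

    async def on_finish_task(self, task, result):
        self.calls.append(f"on_finish_task({task}, {result})")

    async def on_finish(self):
        self.calls.append("on_finish")

    async def start(self, tasks):
        await self.on_start()
        for task in tasks:  # sequential here; a real worker would run concurrently
            await self.on_start_task(task)
            result = await self.do_task(task)
            await self.on_finish_task(task, result)
        await self.on_finish()


if __name__ == "__main__":
    worker = ToyWorker()
    asyncio.run(worker.start([1, 2]))
    print(worker.calls)
```

A subclass of the real worker would override the same hook names to add logging, metrics, or cleanup around each task.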

start(tasks=None)

Start the worker.

Parameters: tasks (list) – List of tasks to do. If provided, the worker exits immediately after all tasks are done. If that is not desired, use self.add_tasks() instead.
Returns: List of futures, one for each task, in the same order.

stop()

Stop the worker by canceling all tasks and then waiting for the worker to finish.