/*
 * Module: Async
 * Author: Jeff Ober
 * Homepage: http://www.artfulcode.net/articles/thread-pool-module-for-pike/
 * Version: 1.0-ish
 * Released: 2009-03-10
 *
 * Copyright (c) 2009 Jeff Ober
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *
 * Async provides a simple process pool that allocates a number of worker threads
 * that may then be utilized collectively without having to deal directly with
 * threads. Results of jobs sent to the pool are "future" objects, called Asyncs.
 * Asyncs' values are then acquired by calling Async.sync, which blocks until the
 * value has been set by the pool. A supervisor thread monitors the worker threads
 * and restarts any that terminate abnormally.
 *
 * Usage is simple:
 *
 *   // create a pool of four processes
 *   Pool pool = Pool(4);
 *
 *   // send a job to the pool
 *   Async result = pool->send(exp, 10);
 *
 *   // get the result
 *   result->sync();
 *   // => 22026.46484375
 *
 *   // alternately, you can call an async
 *   result();
 *   // => 22026.46484375
 *
 *   // pools can conveniently map across a sequence
 *   Async mapped_result = pool->map(exp, enumerate(5));
 *   mapped_result->sync();
 *   // => ({ 1.0, 2.718281745910645, 7.389056205749512, 20.08553695678711, 54.59814834594727 })
 *
 *   // or iterate without collecting return values
 *   int x = 0;
 *   pool->iter(lambda (int n) { x += n; }, enumerate(100));
 *   write("%d", x);
 *   // => 4950
 *
 *   // close the pool, blocking until all workers have terminated
 *   pool->close();
 *
 */

//! Asyncs are "future values" that have not yet been evaluated. They are created
//! with a function and its arguments, which are applied using Async->eval(). To
//! access the result of the evaluation, the client calls Async->sync(), which
//! blocks until the evaluation has completed, and returns the result. Any errors
//! raised during evaluation are caught and rethrown from sync(). As a convenience,
//! Async objects are callable (calling sync()).
public class Async {
  inherit Thread.Mutex;
  inherit Thread.Condition;

  private function fn;
  private array(mixed) params;
  private mixed result;
  private int(0..1) error_p, complete_p;

  //! Creates a new Async that will call f(mixed ... parameters).
  public void create(function f, mixed ... parameters) {
    fn = f;
    params = parameters;
  }

  //! Evaluates the async and signals any waiting threads that the evaluation
  //! has completed. A value thrown by the function is stored and later
  //! rethrown by sync().
  public void eval() {
    mixed value;
    // catch yields the thrown value, or 0 when the block completed normally.
    mixed err = catch { value = fn(@params); };

    // Publish the outcome while holding the mutex so that a thread inside
    // sync() cannot test complete_p and then miss the broadcast.
    Thread.MutexKey key = lock();
    if (err) {
      error_p = 1;
      result = err;
    } else {
      result = value;
    }
    complete_p = 1;
    broadcast();
    destruct(key);
  }

  //! Blocks until the Async has been evaluated and returns the result of the
  //! evaluation. Any errors thrown during evaluation are rethrown here.
  public mixed sync() {
    Thread.MutexKey key = lock();
    while (!complete_p)
      wait(key);
    // The outcome is not modified again once complete_p is set, so the lock
    // can be released before returning or rethrowing.
    destruct(key);

    if (error_p)
      throw(result);
    return result;
  }

  //! An alias of sync().
  public mixed `()() {
    return sync();
  }
}

//! A Worker thread in the ThreadPool. Loops repeatedly, evaluating Asyncs,
//! until a null value is sent in the queue.
private class Worker {
  private Thread.Queue queue;
  private Thread.Thread thread;

  //! Create a Worker that reads asyncs from q and start its thread.
  public void create(Thread.Queue q) {
    queue = q;
    start();
  }

  //! Gets the thread id.
  public int id() {
    return thread->id_number();
  }

  //! Initializes the work loop in a new thread.
  public void start() {
    thread = Thread.Thread(run);
  }

  //! Loops indefinitely, evaluating Asyncs until it reads a null from the queue.
  private void run() {
    while (1) {
      Async task = queue->read();
      if (!task)
        return;
      task->eval();
      // Drop the reference before blocking on the next read.
      task = 0;
    }
  }

  //! Returns 1 if this thread is currently running.
  public int(0..1) running_p() {
    return thread->status() == Thread.THREAD_RUNNING;
  }

  //! Sends a null value to the queue, causing a Worker reading from it to halt.
  public void close() {
    queue->write(0);
  }

  //! Blocks until the Worker completes.
  public void wait() {
    thread->wait();
  }

  //! Kills the worker thread and blocks until it has terminated.
  public void kill() {
    thread->kill();
    thread->wait();
  }
}

//! Supervises worker threads, restarting any closed threads every few seconds.
private class Supervisor {
  private Thread.Thread thread;
  private array(Worker) workers;
  private int|float check;

  //! Creates a new Supervisor for worker_threads. Will check worker status
  //! every check_seconds seconds.
  void create(array(Worker) worker_threads, int|float check_seconds) {
    check = check_seconds;
    workers = worker_threads;
    thread = Thread.Thread(run);
  }

  //! Kills the supervisor and blocks until it completes.
  void kill() {
    thread->kill();
    thread->wait();
  }

  //! The supervisor work loop: sleeps for the check interval, then restarts
  //! every worker that is not reported as running.
  void run() {
    while (1) {
      sleep(check);
      foreach (workers;; Worker worker) {
        if (!worker->running_p()) {
          write("Worker %O has died. Restarting.\n", worker->id());
          worker->start();
        }
      }
    }
  }
}

//! Manages a pool of worker threads.
public class Pool {
  Supervisor supervisor;
  Thread.Queue queue;
  array(Worker) workers;

  //! Creates a new Pool with processes worker threads and a supervisor thread
  //! to watch the worker threads and restart them when necessary. If
  //! check_seconds is provided, it controls how often the supervisor checks
  //! the status of each worker. If void (or 0), defaults to every 1 second.
  public void create(int processes, int|float|void check_seconds) {
    queue = Thread.Queue();
    workers = allocate(processes);
    for (int i = 0; i < processes; ++i)
      workers[i] = Worker(queue);
    // Logical or: a bitwise `|` would alter integer intervals (2 | 1 == 3)
    // and is not defined for floats.
    supervisor = Supervisor(workers, check_seconds || 1);
  }

  //! Closes the pool and blocks until all workers have stopped.
  public void close() {
    // Stop the supervisor first so it does not restart exiting workers.
    supervisor->kill();
    // Queue one stop marker per worker, then wait for every thread to finish.
    foreach (workers;; Worker worker)
      worker->close();
    foreach (workers;; Worker worker)
      worker->wait();
  }

  //! Sends a new job to the pool and returns an Async result.
  public Async send(function f, mixed ... args) {
    Async async = Async(f, @args);
    queue->write(async);
    return async;
  }

  //! Private method to evaluate an array of asyncs. Returns a new array
  //! holding each Async's value, in order; the argument is left untouched.
  private array(mixed) collect_results(array(Async) results) {
    array(mixed) values = allocate(sizeof(results));
    for (int i = 0; i < sizeof(results); ++i)
      values[i] = results[i]->sync();
    return values;
  }

  //! Maps f across args in the pool and returns an Async result which, when
  //! synchronized, returns an array of the results.
  public Async map(function f, array(mixed) args) {
    array(Async) results = allocate(sizeof(args));
    for (int i = 0; i < sizeof(args); ++i)
      results[i] = send(f, args[i]);
    // The collecting job is queued after the jobs it waits on.
    return send(collect_results, results);
  }

  //! Iterates across args, applying f to each in the pool. Returns void and
  //! should be used only for side effects of f.
  public void iter(function f, array(mixed) args) {
    foreach (args;; mixed arg)
      send(f, arg);
  }
}