diff --git a/doc/source/reference/index.rst b/doc/source/reference/index.rst
index 24b3b14..f22d349 100644
--- a/doc/source/reference/index.rst
+++ b/doc/source/reference/index.rst
@@ -22,6 +22,10 @@ Executors
    :members:
    :special-members: __init__
 
+.. autoclass:: futurist.DynamicThreadPoolExecutor
+   :members:
+   :special-members: __init__
+
 -------
 Futures
 -------
diff --git a/futurist/__init__.py b/futurist/__init__.py
index d3294e9..b65b6c5 100644
--- a/futurist/__init__.py
+++ b/futurist/__init__.py
@@ -20,6 +20,7 @@
 from futurist._futures import GreenFuture
 from futurist._futures import CancelledError
 from futurist._futures import TimeoutError
+from futurist._futures import DynamicThreadPoolExecutor
 from futurist._futures import GreenThreadPoolExecutor
 from futurist._futures import ProcessPoolExecutor
 from futurist._futures import SynchronousExecutor
@@ -37,6 +38,7 @@
 __all__ = [
     'GreenThreadPoolExecutor',
     'ProcessPoolExecutor',
     'SynchronousExecutor',
     'ThreadPoolExecutor',
+    'DynamicThreadPoolExecutor',
     'RejectedSubmission',
diff --git a/futurist/_futures.py b/futurist/_futures.py
index 261dc39..2f08f2b 100644
--- a/futurist/_futures.py
+++ b/futurist/_futures.py
@@ -13,8 +13,10 @@
 # under the License.
 
 import functools
+import logging
 import queue
 import threading
+import time
 
 from concurrent import futures as _futures
 from concurrent.futures import process as _process
@@ -29,6 +31,9 @@
 TimeoutError = _futures.TimeoutError
 CancelledError = _futures.CancelledError
 
 
+LOG = logging.getLogger(__name__)
+
+
 class RejectedSubmission(Exception):
     """Exception raised when a submitted call is rejected (for some reason)."""
 
@@ -102,6 +107,10 @@ class ThreadPoolExecutor(_futures.Executor):
 
     It gathers statistics about the submissions executed for post-analysis...
 
+    Note that this executor never shrinks its thread pool, which will cause
+    the pool to eventually reach its maximum capacity defined by max_workers.
+    Check :py:class:`DynamicThreadPoolExecutor` for an alternative.
+
     See: https://docs.python.org/dev/library/concurrent.futures.html
     """
 
@@ -172,18 +181,19 @@ class ThreadPoolExecutor(_futures.Executor):
         with self._shutdown_lock:
             return sum(1 for w in self._workers if w.idle)
 
+    def _add_thread(self):
+        w = _thread.ThreadWorker.create_and_register(
+            self, self._work_queue)
+        # Always save it before we start (so that even if we fail
+        # starting it we can correctly join on it).
+        self._workers.append(w)
+        w.start()
+
     def _maybe_spin_up(self):
         """Spin up a worker if needed."""
-        # Do more advanced idle checks and/or reaping of very idle
-        # threads in the future....
         if (not self._workers or
                 len(self._workers) < self._max_workers):
-            w = _thread.ThreadWorker.create_and_register(
-                self, self._work_queue)
-            # Always save it before we start (so that even if we fail
-            # starting it we can correctly join on it).
-            self._workers.append(w)
-            w.start()
+            self._add_thread()
 
     def shutdown(self, wait=True):
         with self._shutdown_lock:
@@ -203,6 +213,9 @@
     def submit(self, fn, *args, **kwargs):
         """Submit some work to be executed (and gather statistics)."""
+        # NOTE(dtantsur): DynamicThreadPoolExecutor relies on this lock for
+        # its complex logic around thread management. If you ever decide to
+        # remove it, please add a lock there instead.
         with self._shutdown_lock:
             if self._shutdown:
                 raise RuntimeError('Can not schedule new futures'
                                    ' after being shutdown')
@@ -211,6 +224,174 @@ class ThreadPoolExecutor(_futures.Executor):
             return self._gatherer.submit(fn, *args, **kwargs)
 
 
+class DynamicThreadPoolExecutor(ThreadPoolExecutor):
+    """Executor that creates or removes threads on demand.
+
+    As new work is scheduled on the executor, it will try to keep the
+    proportion of busy threads within the provided range (between 40% and 80%
+    by default). A busy thread is a thread that is not waiting on the
+    task queue.
+
+    Each time a task is submitted, the executor makes a decision whether to
+    grow or shrink the pool. It takes the proportion of the number of busy
+    threads to the total number of threads and compares it to shrink_threshold
+    and grow_threshold.
+
+    Initially, the pool is empty, so submitting a task always results in one
+    new thread. Since min_workers must be positive, at least one thread will
+    always be available after this point.
+
+    Once the proportion of busy threads reaches grow_threshold (e.g. 4 out of
+    5 with the default grow_threshold of 0.8), a new thread is created when a
+    task is submitted. If, on submitting a task, the proportion of busy
+    threads is below shrink_threshold (e.g. only 2 out of 5), one idle thread
+    is stopped.
+
+    The values of grow_threshold and shrink_threshold are different to prevent
+    the number of threads from oscillating on reaching grow_threshold.
+
+    If threads are not created often in your application, the number of idle
+    threads may stay high for a long time. To avoid this, you can call
+    :py:meth:`.maintain` periodically to keep the number of threads within
+    the thresholds.
+
+    """
+
+    def __init__(self, max_workers=None, check_and_reject=None,
+                 min_workers=1, grow_threshold=0.8,
+                 shrink_threshold=0.4):
+        """Initializes a thread pool executor.
+
+        :param max_workers: maximum number of workers that can be
+                            simultaneously active at the same time, further
+                            submitted work will be queued up when this limit
+                            is reached.
+        :type max_workers: int
+        :param check_and_reject: a callback function that will be provided
+                                 two positional arguments, the first argument
+                                 will be this executor instance, and the second
+                                 will be the number of currently queued work
+                                 items in this executor's backlog; the callback
+                                 should raise a :py:class:`.RejectedSubmission`
+                                 exception if it wants to have this submission
+                                 rejected.
+        :type check_and_reject: callback
+        :param min_workers: the minimum number of workers that can be reached
+                            when shrinking the pool. Note that the pool always
+                            starts at zero workers and will be smaller than
+                            min_workers until enough workers are created.
+                            At least one thread is required.
+        :type min_workers: int
+        :param grow_threshold: minimum proportion of busy threads to total
+                               threads for the pool to grow.
+        :type grow_threshold: float
+        :param shrink_threshold: maximum proportion of busy threads to total
+                                 threads for the pool to shrink.
+        :type shrink_threshold: float
+        """
+        super().__init__(max_workers=max_workers,
+                         check_and_reject=check_and_reject)
+        if min_workers <= 0:
+            raise ValueError('min_workers must be positive')
+        if max_workers and min_workers >= max_workers:
+            raise ValueError('min_workers must be less than max_workers')
+        self._min_workers = min_workers
+
+        if grow_threshold <= 0 or grow_threshold > 1.0:
+            raise ValueError('grow_threshold must be within (0, 1]')
+        if shrink_threshold < 0 or shrink_threshold >= 1.0:
+            raise ValueError('shrink_threshold must be within [0, 1)')
+        if shrink_threshold >= grow_threshold:
+            raise ValueError(
+                'shrink_threshold must be less than grow_threshold')
+        self._grow_threshold = grow_threshold
+        self._shrink_threshold = shrink_threshold
+
+        self._dead_workers = []
+
+    def _drop_thread(self):
+        new_workers = []
+        idle_worker = None
+        for i, w in enumerate(self._workers):
+            if w.idle:
+                new_workers.extend(self._workers[i + 1:])
+                idle_worker = w
+                break
+            new_workers.append(w)
+
+        if idle_worker is None:
+            # Should not actually happen but races are possible; do nothing
+            LOG.warning(
+                'No idle worker thread to delete when shrinking pool %r', self)
+            return False
+
+        idle_worker.stop()
+        self._workers = new_workers
+        self._dead_workers.append(idle_worker)
+        return True
+
+    def _maybe_spin_up(self):
+        nthreads = self.num_workers
+        if nthreads < self._min_workers:
+            self._add_thread()
+            return True
+
+        # NOTE(dtantsur): here we count the number of threads that are
+        # doing something (i.e. are not waiting on the queue) plus the
+        # number of tasks in the queue. In theory, if there are idle
+        # workers, the queue should be empty. But race conditions are
+        # possible when workers do not pick up tasks quickly enough,
+        # especially in the presence of CPU-bound tasks.
+        idle = self.get_num_idle_workers()
+        busy = (nthreads - idle + self.queue_size) / nthreads
+        if busy >= self._grow_threshold and nthreads < self._max_workers:
+            LOG.debug('Creating a new worker thread for pool %r '
+                      '(%d thread(s) idle, queue size %d, total %d thread(s))',
+                      self, idle, self.queue_size, nthreads)
+            self._add_thread()
+            return True
+        elif busy <= self._shrink_threshold and nthreads > self._min_workers:
+            LOG.debug('Deleting a worker thread from pool %r '
+                      '(%d thread(s) idle, queue size %d, total %d thread(s))',
+                      self, idle, self.queue_size, nthreads)
+            return self._drop_thread()
+
+        return False
+
+    def maintain(self):
+        """Keep the number of threads within the expected range.
+
+        If too many idle threads are running, they are deleted.
+        Additionally, deleted workers are joined to free up resources.
+        """
+        # NOTE(dtantsur): this call can potentially run for some time, so
+        # avoid taking shutdown_lock once and holding it for the entire
+        # duration (blocking any new tasks from being added).
+        keep_going = True
+        while keep_going:
+            if self._shutdown:
+                return
+
+            with self._shutdown_lock:
+                keep_going = self._maybe_spin_up()
+
+            time.sleep(0)
+
+        # NOTE(dtantsur): copy the value of _dead_workers to prevent races
+        # with other invocations of maintain or shutdown.
+        with self._shutdown_lock:
+            dead_workers = self._dead_workers
+            self._dead_workers = []
+
+        for w in dead_workers:
+            w.join()
+
+    def shutdown(self, wait=True):
+        super().shutdown(wait=wait)
+        if wait:
+            for w in self._dead_workers:
+                w.join()
+
+
 class ProcessPoolExecutor(_process.ProcessPoolExecutor):
     """Executor that uses a process pool to execute calls asynchronously.
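(Reviewer note, not part of the patch: a minimal usage sketch of the executor added above; the pool sizes and the submitted callable are illustrative only.)

    import futurist

    # The pool starts empty, grows towards max_workers while the busy ratio
    # stays at or above grow_threshold, and sheds idle threads (down to
    # min_workers) once the ratio falls to shrink_threshold or below.
    executor = futurist.DynamicThreadPoolExecutor(
        max_workers=8, min_workers=1,
        grow_threshold=0.8, shrink_threshold=0.4)

    futures = [executor.submit(pow, 2, n) for n in range(100)]
    results = [f.result() for f in futures]

    # If submissions are rare, call maintain() periodically so that idle
    # threads are reaped and already-stopped workers are joined.
    executor.maintain()

    executor.shutdown(wait=True)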
diff --git a/futurist/tests/test_executors.py b/futurist/tests/test_executors.py
index fa45ef7..ad7db59 100644
--- a/futurist/tests/test_executors.py
+++ b/futurist/tests/test_executors.py
@@ -13,6 +13,7 @@
 import threading
 import time
 import unittest
+from unittest import mock
 
 from eventlet.green import threading as green_threading
 import testscenarios
@@ -49,6 +50,8 @@ class TestExecutors(testscenarios.TestWithScenarios, base.TestCase):
                    'restartable': False, 'executor_kwargs': {}}),
         ('thread', {'executor_cls': futurist.ThreadPoolExecutor,
                     'restartable': False, 'executor_kwargs': {}}),
+        ('thread_dyn', {'executor_cls': futurist.DynamicThreadPoolExecutor,
+                        'restartable': False, 'executor_kwargs': {}}),
         ('process', {'executor_cls': futurist.ProcessPoolExecutor,
                      'restartable': False, 'executor_kwargs': {}}),
     ]
@@ -178,3 +181,132 @@ class TestRejection(testscenarios.TestWithScenarios, base.TestCase):
         self.assertRaises(futurist.RejectedSubmission,
                           self.executor.submit,
                           returns_one)
+
+
+@mock.patch.object(futurist.DynamicThreadPoolExecutor, '_add_thread',
+                   # Use the original function behind the scenes
+                   side_effect=futurist.DynamicThreadPoolExecutor._add_thread,
+                   autospec=True)
+class TestDynamicThreadPool(base.TestCase):
+
+    def _new(self, *args, **kwargs):
+        executor = futurist.DynamicThreadPoolExecutor(*args, **kwargs)
+        self.addCleanup(executor.shutdown, wait=True)
+        self.assertEqual(0, executor.queue_size)
+        self.assertEqual(0, executor.num_workers)
+        self.assertEqual(0, executor.get_num_idle_workers())
+        self.assertEqual(0, len(executor._dead_workers))
+        return executor
+
+    def test_stays_at_min_worker(self, mock_add_thread):
+        """Executing tasks sequentially: no growth beyond 1 thread."""
+        executor = self._new(max_workers=3)
+        for _i in range(10):
+            executor.submit(lambda: None).result()
+        self.assertEqual(0, executor.queue_size)
+        self.assertEqual(1, executor.num_workers)
+        self.assertEqual(1, executor.get_num_idle_workers())
+        self.assertEqual(0, len(executor._dead_workers))
+        self.assertEqual(1, mock_add_thread.call_count)
+
+    def test_grow_and_shrink(self, mock_add_thread):
+        """Executing tasks in parallel: grows and shrinks."""
+        executor = self._new(max_workers=10)
+        started = threading.Barrier(11)
+        done = threading.Event()
+
+        self.addCleanup(started.abort)
+        self.addCleanup(done.set)
+
+        def task():
+            started.wait()
+            done.wait()
+
+        for _i in range(10):
+            executor.submit(task)
+            time.sleep(0.1)  # give the new worker time to start and pick up
+
+        started.wait()
+        self.assertEqual(0, executor.queue_size)
+        self.assertEqual(10, executor.num_workers)
+        self.assertEqual(0, executor.get_num_idle_workers())
+        self.assertEqual(0, len(executor._dead_workers))
+        self.assertEqual(10, mock_add_thread.call_count)
+
+        done.set()
+        time.sleep(0.1)
+        executor.maintain()
+        self.assertEqual(0, executor.queue_size)
+        self.assertEqual(1, executor.num_workers)
+        self.assertEqual(1, executor.get_num_idle_workers())
+        self.assertEqual(0, len(executor._dead_workers))
+
+
+@mock.patch('futurist._thread.ThreadWorker.create_and_register', autospec=True)
+class TestDynamicThreadPoolMaintain(base.TestCase):
+    def test_ensure_one_worker(self, mock_create_thread):
+        executor = futurist.DynamicThreadPoolExecutor()
+        executor.maintain()
+        self.assertEqual(1, len(executor._workers))
+        created_worker = mock_create_thread.return_value
+        created_worker.start.assert_called_once_with()
+        created_worker.stop.assert_not_called()
+
+    def test_ensure_min_workers(self, mock_create_thread):
+        executor = futurist.DynamicThreadPoolExecutor(min_workers=42)
+        executor.maintain()
+        self.assertEqual(42, len(executor._workers))
+        created_worker = mock_create_thread.return_value
+        created_worker.start.assert_called_with()
+        self.assertEqual(42, created_worker.start.call_count)
+        created_worker.stop.assert_not_called()
+
+    def test_too_many_idle_workers(self, mock_create_thread):
+        executor = futurist.DynamicThreadPoolExecutor(min_workers=42)
+        executor._workers = [mock.Mock(idle=True)] * 100
+        executor.maintain()
+        self.assertEqual(42, len(executor._workers))
+        mock_create_thread.return_value.start.assert_not_called()
+        self.assertEqual(58, executor._workers[0].stop.call_count)
+
+    def test_all_busy_workers(self, mock_create_thread):
+        executor = futurist.DynamicThreadPoolExecutor(max_workers=100)
+        executor._workers = [mock.Mock(idle=False)] * 100
+        executor.maintain()
+        self.assertEqual(100, len(executor._workers))
+        mock_create_thread.return_value.start.assert_not_called()
+        executor._workers[0].stop.assert_not_called()
+
+    def test_busy_workers_create_more(self, mock_create_thread):
+        executor = futurist.DynamicThreadPoolExecutor(max_workers=200)
+        executor._workers = [mock.Mock(idle=False)] * 100
+        executor.maintain()
+        # NOTE(dtantsur): once the executor reaches 125 threads, the ratio of
+        # busy to total threads is exactly 100/125=0.8 (the default
+        # grow_threshold). One more thread is created, resulting in 126.
+        self.assertEqual(126, len(executor._workers))
+        self.assertEqual(26, executor.get_num_idle_workers())
+        created_worker = mock_create_thread.return_value
+        created_worker.start.assert_called_with()
+        self.assertEqual(26, created_worker.start.call_count)
+        created_worker.stop.assert_not_called()
+
+    def test_busy_workers_within_range(self, mock_create_thread):
+        executor = futurist.DynamicThreadPoolExecutor()
+        executor._workers = [mock.Mock(idle=i < 30) for i in range(100)]
+        executor.maintain()
+        self.assertEqual(100, len(executor._workers))
+        mock_create_thread.return_value.start.assert_not_called()
+
+    def test_busy_workers_and_large_queue(self, mock_create_thread):
+        executor = futurist.DynamicThreadPoolExecutor(max_workers=200)
+        executor._workers = [mock.Mock(idle=i < 30) for i in range(100)]
+        for i in range(20):
+            executor._work_queue.put(None)
+        executor.maintain()
+        # NOTE(dtantsur): initial busy ratio is (70+20)/100=0.9. As workers
+        # are added, it reaches (70+20)/113, which is just below 0.8.
+        self.assertEqual(113, len(executor._workers))
+        created_worker = mock_create_thread.return_value
+        created_worker.start.assert_called_with()
+        self.assertEqual(13, created_worker.start.call_count)
diff --git a/releasenotes/notes/dynamic-pool-ae8811eecd5f9009.yaml b/releasenotes/notes/dynamic-pool-ae8811eecd5f9009.yaml
new file mode 100644
index 0000000..c6f51c0
--- /dev/null
+++ b/releasenotes/notes/dynamic-pool-ae8811eecd5f9009.yaml
@@ -0,0 +1,10 @@
+---
+features:
+  - |
+    Adds a new ``DynamicThreadPoolExecutor`` that can resize (grow or shrink)
+    its pool based on demand. As new work is scheduled on the executor, it will
+    try to keep the proportion of busy threads within the provided range
+    (between 40% and 80% by default).
+
+    The motivation is to provide a scalable alternative to
+    ``GreenThreadPoolExecutor``.
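(Reviewer note, not part of the patch: the grow/shrink decision described in the docstring and exercised by the tests boils down to the busy-ratio formula in _maybe_spin_up; a small worked example with illustrative numbers follows.)

    def busy_ratio(num_threads, num_idle, queue_size):
        # Mirrors _maybe_spin_up: threads doing work plus queued tasks,
        # divided by the total number of threads.
        return (num_threads - num_idle + queue_size) / num_threads

    # 4 of 5 threads busy -> 0.8, at grow_threshold: a thread is added.
    assert busy_ratio(5, 1, 0) >= 0.8
    # 2 of 5 threads busy -> 0.4, at shrink_threshold: one idle thread is
    # dropped (never going below min_workers).
    assert busy_ratio(5, 3, 0) <= 0.4
    # 3 of 5 threads busy -> 0.6, between the thresholds: size unchanged.
    assert 0.4 < busy_ratio(5, 2, 0) < 0.8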