Add DynamicThreadPoolExecutor that resizes itself

The existing ThreadPoolExecutor can only grow its worker pool, and does
so until the size reaches max_workers. This change adds a new
DynamicThreadPoolExecutor that can both grow and shrink the pool based
on the proportion of currently busy threads to the total number
of threads.

Change-Id: I5a92977b71a120748feb1c370cd3a0106b574f0a
Signed-off-by: Dmitry Tantsur <dtantsur@protonmail.com>
Signed-off-by: Julia Kreger <juliaashleykreger@gmail.com>
This commit is contained in:
Dmitry Tantsur
2025-07-16 19:10:59 +02:00
committed by Julia Kreger
parent 367ecabc67
commit 679355a335
5 changed files with 337 additions and 8 deletions

View File

@@ -22,6 +22,10 @@ Executors
:members:
:special-members: __init__
.. autoclass:: futurist.DynamicThreadPoolExecutor
:members:
:special-members: __init__
-------
Futures
-------

View File

@@ -20,6 +20,7 @@ from futurist._futures import GreenFuture
from futurist._futures import CancelledError
from futurist._futures import TimeoutError
from futurist._futures import DynamicThreadPoolExecutor
from futurist._futures import GreenThreadPoolExecutor
from futurist._futures import ProcessPoolExecutor
from futurist._futures import SynchronousExecutor
@@ -37,6 +38,7 @@ __all__ = [
'GreenThreadPoolExecutor', 'ProcessPoolExecutor',
'SynchronousExecutor', 'ThreadPoolExecutor',
'DynamicThreadPoolExecutor',
'RejectedSubmission',

View File

@@ -13,8 +13,10 @@
# under the License.
import functools
import logging
import queue
import threading
import time
from concurrent import futures as _futures
from concurrent.futures import process as _process
@@ -29,6 +31,9 @@ TimeoutError = _futures.TimeoutError
CancelledError = _futures.CancelledError
LOG = logging.getLogger(__name__)
class RejectedSubmission(Exception):
"""Exception raised when a submitted call is rejected (for some reason)."""
@@ -102,6 +107,10 @@ class ThreadPoolExecutor(_futures.Executor):
It gathers statistics about the submissions executed for post-analysis...
Note that this executor never shrinks its thread pool, which will cause
the pool to eventually reach its maximum capacity defined by max_workers.
Check :py:class:`DynamicThreadPoolExecutor` for an alternative.
See: https://docs.python.org/dev/library/concurrent.futures.html
"""
@@ -172,18 +181,19 @@ class ThreadPoolExecutor(_futures.Executor):
with self._shutdown_lock:
return sum(1 for w in self._workers if w.idle)
def _add_thread(self):
    """Create one more worker for this executor, track it and start it."""
    worker = _thread.ThreadWorker.create_and_register(
        self, self._work_queue)
    # Record the worker prior to starting it: should start() fail, the
    # worker is still tracked and can be joined on correctly.
    self._workers.append(worker)
    worker.start()
def _maybe_spin_up(self):
"""Spin up a worker if needed."""
# Do more advanced idle checks and/or reaping of very idle
# threads in the future....
# A worker is considered needed while the pool is empty or holds fewer
# than self._max_workers workers.
if (not self._workers or
len(self._workers) < self._max_workers):
# NOTE(review): the inline worker creation below duplicates what
# _add_thread() does and is followed by a call to _add_thread(). This
# text looks like a rendered diff with the removed and the added lines
# interleaved -- confirm against the real file before editing.
w = _thread.ThreadWorker.create_and_register(
self, self._work_queue)
# Always save it before we start (so that even if we fail
# starting it we can correctly join on it).
self._workers.append(w)
w.start()
self._add_thread()
def shutdown(self, wait=True):
with self._shutdown_lock:
@@ -203,6 +213,9 @@ class ThreadPoolExecutor(_futures.Executor):
def submit(self, fn, *args, **kwargs):
"""Submit some work to be executed (and gather statistics)."""
# NOTE(dtantsur): DynamicThreadPoolExecutor relies on this lock for
# its complex logic around thread management. If you ever decide to
# remove it, please add a lock there instead.
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('Can not schedule new futures'
@@ -211,6 +224,174 @@ class ThreadPoolExecutor(_futures.Executor):
return self._gatherer.submit(fn, *args, **kwargs)
class DynamicThreadPoolExecutor(ThreadPoolExecutor):
    """Executor that creates or removes threads on demand.

    As new work is scheduled on the executor, it will try to keep the
    proportion of busy threads within the provided range (between 40% and 80%
    by default). A busy thread is a thread that is not waiting on the
    task queue.

    Each time a task is submitted, the executor makes a decision whether to
    grow or shrink the pool. It takes the proportion of the number of busy
    threads to the total number of threads and compares it to shrink_threshold
    and grow_threshold.

    Initially, the pool is empty, so submitting a task always results in one
    new thread. Since min_workers must be positive, at least one thread will
    always be available after this point.

    Once the proportion of busy threads reaches grow_threshold (e.g. 4 out of 5
    with the default grow_threshold of 0.8), a new thread is created when a
    task is submitted. If on submitting a task a proportion of busy threads is
    below shrink_threshold (e.g. only 2 out of 5), one idle thread is stopped.

    The values of grow_threshold and shrink_threshold are different to prevent
    the number of threads from oscillating on reaching grow_threshold.

    If threads are not created often in your application, the number of idle
    threads may stay high for a long time. To avoid it, you can call
    :py:meth:`.maintain` periodically to keep the number of threads within
    the thresholds.
    """

    def __init__(self, max_workers=None, check_and_reject=None,
                 min_workers=1, grow_threshold=0.8,
                 shrink_threshold=0.4):
        """Initializes a thread pool executor.

        :param max_workers: maximum number of workers that can be
                            simultaneously active at the same time, further
                            submitted work will be queued up when this limit
                            is reached.
        :type max_workers: int
        :param check_and_reject: a callback function that will be provided
                                 two position arguments, the first argument
                                 will be this executor instance, and the second
                                 will be the number of currently queued work
                                 items in this executors backlog; the callback
                                 should raise a :py:class:`.RejectedSubmission`
                                 exception if it wants to have this submission
                                 rejected.
        :type check_and_reject: callback
        :param min_workers: the minimum number of workers that can be reached
                            when shrinking the pool. Note that the pool always
                            starts at zero workers and will be smaller than
                            min_workers until enough workers are created.
                            At least one thread is required.
        :type min_workers: int
        :param grow_threshold: minimum proportion of busy threads to total
                               threads for the pool to grow.
        :type grow_threshold: float
        :param shrink_threshold: maximum proportion of busy threads to total
                                 threads for the pool to shrink.
        :type shrink_threshold: float
        :raises ValueError: if min_workers is not positive, if an explicitly
                            provided max_workers is not greater than
                            min_workers, if either threshold is outside of
                            its range, or if shrink_threshold is not less
                            than grow_threshold.
        """
        super().__init__(max_workers=max_workers,
                         check_and_reject=check_and_reject)
        if min_workers <= 0:
            raise ValueError('min_workers must be positive')
        # Only an explicitly provided max_workers is compared here; when it
        # is left as None this check is skipped.
        if max_workers and min_workers >= max_workers:
            raise ValueError('min_workers must be less than max_workers')
        self._min_workers = min_workers

        if grow_threshold <= 0 or grow_threshold > 1.0:
            raise ValueError('grow_threshold must be within (0, 1]')
        if shrink_threshold < 0 or shrink_threshold >= 1.0:
            raise ValueError('shrink_threshold must be within [0, 1)')
        if shrink_threshold >= grow_threshold:
            raise ValueError(
                'shrink_threshold must be less than grow_threshold')
        self._grow_threshold = grow_threshold
        self._shrink_threshold = shrink_threshold

        # Workers that were asked to stop but have not been joined yet.
        self._dead_workers = []

    def _drop_thread(self):
        """Stop the first idle worker and remove it from the pool.

        The stopped worker is moved to ``_dead_workers`` so that it can be
        joined later by :py:meth:`.maintain` or :py:meth:`.shutdown`.

        :returns: True if a worker was stopped, False if no idle worker
                  was found.
        """
        for index, idle_worker in enumerate(self._workers):
            if idle_worker.idle:
                break
        else:
            # Should not actually happen but races are possible; do nothing
            LOG.warning(
                'No idle worker thread to delete when shrinking pool %r', self)
            return False

        idle_worker.stop()
        # Keep every other worker, including the busy ones that precede the
        # idle one in the list: they are still running and must stay tracked
        # so that they are counted and joined correctly.
        self._workers = (self._workers[:index]
                         + self._workers[index + 1:])
        self._dead_workers.append(idle_worker)
        return True

    def _maybe_spin_up(self):
        """Grow or shrink the pool by at most one thread.

        :returns: True if a thread was added or dropped, False if the pool
                  was left unchanged.
        """
        nthreads = self.num_workers
        if nthreads < self._min_workers:
            self._add_thread()
            return True

        # NOTE(dtantsur): here we count the number of threads that are
        # doing something (i.e. are not waiting on the queue) plus the
        # number of tasks in the queue. In theory, if there are idle
        # workers, the queue should be empty. But race conditions are
        # possible when workers do not pick up tasks quickly enough,
        # especially in the presence of CPU-bound tasks.
        idle = self.get_num_idle_workers()
        queued = self.queue_size
        # nthreads >= min_workers >= 1 at this point, so the division is safe.
        busy = (nthreads - idle + queued) / nthreads
        if busy >= self._grow_threshold and nthreads < self._max_workers:
            LOG.debug('Creating a new worker thread for pool %r '
                      '(%d thread(s) idle, queue size %d, total %d thread(s))',
                      self, idle, queued, nthreads)
            self._add_thread()
            return True
        elif busy <= self._shrink_threshold and nthreads > self._min_workers:
            LOG.debug('Deleting a worker thread from pool %r '
                      '(%d thread(s) idle, queue size %d, total %d thread(s))',
                      self, idle, queued, nthreads)
            return self._drop_thread()

        return False

    def maintain(self):
        """Keep the number of threads within the expected range.

        If too many idle threads are running, they are deleted.
        Additionally, deleted workers are joined to free up resources.
        """
        # NOTE(dtantsur): this call can potentially run for some time, so
        # avoid taking shutdown_lock once and holding it for the entire
        # duration (blocking any new tasks from being added).
        keep_going = True
        while keep_going:
            if self._shutdown:
                return

            with self._shutdown_lock:
                keep_going = self._maybe_spin_up()

            # Give other threads a chance to run between iterations.
            time.sleep(0)

        # NOTE(dtantsur): copy the value of _dead_workers to prevent races with
        # other invocations for maintain or shutdown.
        with self._shutdown_lock:
            dead_workers = self._dead_workers
            self._dead_workers = []

        for w in dead_workers:
            w.join()

    def shutdown(self, wait=True):
        """Shut the executor down and, if waiting, join stopped workers.

        :param wait: passed on to the parent ``shutdown``; when true, the
                     workers previously stopped while shrinking the pool
                     are joined as well.
        :type wait: bool
        """
        super().shutdown(wait=wait)
        if wait:
            for w in self._dead_workers:
                w.join()
class ProcessPoolExecutor(_process.ProcessPoolExecutor):
"""Executor that uses a process pool to execute calls asynchronously.

View File

@@ -13,6 +13,7 @@
import threading
import time
import unittest
from unittest import mock
from eventlet.green import threading as green_threading
import testscenarios
@@ -49,6 +50,8 @@ class TestExecutors(testscenarios.TestWithScenarios, base.TestCase):
'restartable': False, 'executor_kwargs': {}}),
('thread', {'executor_cls': futurist.ThreadPoolExecutor,
'restartable': False, 'executor_kwargs': {}}),
('thread_dyn', {'executor_cls': futurist.DynamicThreadPoolExecutor,
'restartable': False, 'executor_kwargs': {}}),
('process', {'executor_cls': futurist.ProcessPoolExecutor,
'restartable': False, 'executor_kwargs': {}}),
]
@@ -178,3 +181,132 @@ class TestRejection(testscenarios.TestWithScenarios, base.TestCase):
self.assertRaises(futurist.RejectedSubmission,
self.executor.submit, returns_one)
@mock.patch.object(futurist.DynamicThreadPoolExecutor, '_add_thread',
# Use the original function behind the scene
side_effect=futurist.DynamicThreadPoolExecutor._add_thread,
autospec=True)
class TestDynamicThreadPool(base.TestCase):
# _add_thread is replaced by a mock whose side effect is the original
# method, so the test methods can count how many times it was called.
def _new(self, *args, **kwargs):
# Build an executor, register its shutdown as a cleanup and check that
# it starts with no queued work, no workers and no dead workers.
executor = futurist.DynamicThreadPoolExecutor(*args, **kwargs)
self.addCleanup(executor.shutdown, wait=True)
self.assertEqual(0, executor.queue_size)
self.assertEqual(0, executor.num_workers)
self.assertEqual(0, executor.get_num_idle_workers())
self.assertEqual(0, len(executor._dead_workers))
return executor
def test_stays_at_min_worker(self, mock_add_thread):
"""Executing tasks sequentially: no growth beyond 1 thread."""
executor = self._new(max_workers=3)
# Each submitted task is waited for via result() before the next submit.
for _i in range(10):
executor.submit(lambda: None).result()
self.assertEqual(0, executor.queue_size)
self.assertEqual(1, executor.num_workers)
self.assertEqual(1, executor.get_num_idle_workers())
self.assertEqual(0, len(executor._dead_workers))
self.assertEqual(1, mock_add_thread.call_count)
def test_grow_and_shrink(self, mock_add_thread):
"""Executing tasks in parallel: grows and shrinks."""
executor = self._new(max_workers=10)
# 11 parties: the 10 submitted tasks plus the started.wait() call made
# by this test method.
started = threading.Barrier(11)
done = threading.Event()
# Cleanups abort the barrier and set the event (presumably so that tasks
# do not stay blocked if the test fails part-way).
self.addCleanup(started.abort)
self.addCleanup(done.set)
def task():
started.wait()
done.wait()
for _i in range(10):
executor.submit(task)
time.sleep(0.1) # without this threads don't start
started.wait()
self.assertEqual(0, executor.queue_size)
self.assertEqual(10, executor.num_workers)
self.assertEqual(0, executor.get_num_idle_workers())
self.assertEqual(0, len(executor._dead_workers))
self.assertEqual(10, mock_add_thread.call_count)
# Release the tasks, wait briefly, then let maintain() resize the pool.
done.set()
time.sleep(0.1)
executor.maintain()
self.assertEqual(0, executor.queue_size)
self.assertEqual(1, executor.num_workers)
self.assertEqual(1, executor.get_num_idle_workers())
self.assertEqual(0, len(executor._dead_workers))
@mock.patch('futurist._thread.ThreadWorker.create_and_register', autospec=True)
class TestDynamicThreadPoolMaintain(base.TestCase):
# create_and_register is mocked, so every worker created by the executor
# in these tests is the same mock_create_thread.return_value object.
def test_ensure_one_worker(self, mock_create_thread):
executor = futurist.DynamicThreadPoolExecutor()
executor.maintain()
self.assertEqual(1, len(executor._workers))
created_worker = mock_create_thread.return_value
created_worker.start.assert_called_once_with()
created_worker.stop.assert_not_called()
def test_ensure_min_workers(self, mock_create_thread):
executor = futurist.DynamicThreadPoolExecutor(min_workers=42)
executor.maintain()
self.assertEqual(42, len(executor._workers))
created_worker = mock_create_thread.return_value
created_worker.start.assert_called_with()
self.assertEqual(42, created_worker.start.call_count)
created_worker.stop.assert_not_called()
def test_too_many_idle_workers(self, mock_create_thread):
executor = futurist.DynamicThreadPoolExecutor(min_workers=42)
# The list holds the same Mock object 100 times, so all stop() calls are
# recorded on that single mock.
executor._workers = [mock.Mock(idle=True)] * 100
executor.maintain()
self.assertEqual(42, len(executor._workers))
mock_create_thread.return_value.start.assert_not_called()
self.assertEqual(58, executor._workers[0].stop.call_count)
def test_all_busy_workers(self, mock_create_thread):
# The pool is already at max_workers, so no worker may be added.
executor = futurist.DynamicThreadPoolExecutor(max_workers=100)
executor._workers = [mock.Mock(idle=False)] * 100
executor.maintain()
self.assertEqual(100, len(executor._workers))
mock_create_thread.return_value.start.assert_not_called()
executor._workers[0].stop.assert_not_called()
def test_busy_workers_create_more(self, mock_create_thread):
executor = futurist.DynamicThreadPoolExecutor(max_workers=200)
executor._workers = [mock.Mock(idle=False)] * 100
executor.maintain()
# NOTE(dtantsur): once the executor reaches 125 threads, the ratio of
# busy to total threads is exactly 100/125=0.8 (the default
# grow_threshold). One more thread is created, resulting in 126.
self.assertEqual(126, len(executor._workers))
self.assertEqual(26, executor.get_num_idle_workers())
created_worker = mock_create_thread.return_value
created_worker.start.assert_called_with()
self.assertEqual(26, created_worker.start.call_count)
created_worker.stop.assert_not_called()
def test_busy_workers_within_range(self, mock_create_thread):
executor = futurist.DynamicThreadPoolExecutor()
# 30 idle workers out of 100: 70 are busy, i.e. a ratio of 0.7, which is
# between the default thresholds of 0.4 and 0.8.
executor._workers = [mock.Mock(idle=i < 30) for i in range(100)]
executor.maintain()
self.assertEqual(100, len(executor._workers))
mock_create_thread.return_value.start.assert_not_called()
def test_busy_workers_and_large_queue(self, mock_create_thread):
executor = futurist.DynamicThreadPoolExecutor(max_workers=200)
executor._workers = [mock.Mock(idle=i < 30) for i in range(100)]
# Put items on the work queue directly so that the queue size is counted
# by the executor's resize decision.
for i in range(20):
executor._work_queue.put(None)
executor.maintain()
# NOTE(dtantsur): initial busy ratio is (70+20)/100=0.9. As workers
# are added, it reaches (70+20)/113, which is just below 0.8.
self.assertEqual(113, len(executor._workers))
created_worker = mock_create_thread.return_value
created_worker.start.assert_called_with()
self.assertEqual(13, created_worker.start.call_count)

View File

@@ -0,0 +1,10 @@
---
features:
- |
Adds a new ``DynamicThreadPoolExecutor`` that can resize (grow or shrink)
its pool based on demand. As new work is scheduled on the executor, it will
try to keep the proportion of busy threads within the provided range
(between 40% and 80% by default).
The motivation is to provide a scalable alternative to
``GreenThreadPoolExecutor``.