Replace GreenThreadPoolExecutor in conductor

As part of the broader effort to remove eventlet support from Ironic
this patch updates the conductor’s worker pools to use
`futurist.DynamicThreadPoolExecutor` in place of
`futurist.GreenThreadPoolExecutor`

Although we are now explicitly configuring DynamicThreadPoolExecutor,
eventlet monkey-patching remains in effect, meaning these threads are
still cooperative green threads underneath.

One or more follow-up patches will be required to remove monkey-patching
entirely, switch backends and service launchers to native thread-based
implementations.

The goal here is to decouple from `GreenThreadPoolExecutor`,
which is deprecated, and start validating behavior and tuning under the
real thread interface. To this end, we also change the rejector out
from the futurist provided default rejector, to one which better models
the threaded behavior Ironic will exhibit once Ironic is launching
in threading mode.

Note: This does *not* complete the migration, but is one of the last
major steps before we explicitly begin to remove the invocation of
eventlet.

Depends-On: https://review.opendev.org/c/openstack/futurist/+/955217
Change-Id: I433b0b51f80d7c238e8109878b5c8bc15f9f5849
Signed-off-by: Afonne-CID <afonnepaulc@gmail.com>
Signed-off-by: Julia Kreger <juliaashleykreger@gmail.com>
This commit is contained in:
Afonne-CID
2025-06-19 17:57:00 +01:00
committed by Julia Kreger
parent 56afcc2738
commit bedfd143eb
5 changed files with 156 additions and 31 deletions

View File

@@ -19,7 +19,6 @@ import time
import futurist
from futurist import periodics
from futurist import rejection
from oslo_db import exception as db_exception
from oslo_log import log
from oslo_utils import excutils
@@ -100,25 +99,51 @@ class BaseConductorManager(object):
self.dbapi.clear_node_reservations_for_conductor(self.host)
def _init_executors(self, total_workers, reserved_percentage):
# NOTE(dtantsur): do not allow queuing work. Given our model, it's
# better to reject an incoming request with HTTP 503 or reschedule
# a periodic task that end up with hidden backlog that is hard
# to track and debug. Using 1 instead of 0 because of how things are
# ordered in futurist (it checks for rejection first).
rejection_func = rejection.reject_when_reached(1)
# NOTE(TheJulia): For context, as of 2025.2's development we have
# 16 periodics and 8 power sync threads. We end up scaling our
# minimum number of workers based upon the configuration later on,
# but we do this to try and keep a reasonable balance of workers
reserved_workers = int(total_workers * reserved_percentage / 100)
remaining = total_workers - reserved_workers
LOG.info("Starting workers pool: %d normal workers + %d reserved",
remaining, reserved_workers)
# NOTE(TheJulia): We need to create two separate rejection functions
# which are individually aware of their maximum sizes in order to
# appropriately know if work must be rejected, or not.
# In the past, Ironic's internal model prefers we *never* queue
# more work than the current item, but we won't have a choice
# with cross-thread activity. The custom rejector
# we invoke will result in work being rejected when the new
# work would exceed our overall capacity and thus return an API
# caller with an HTTP 503 or reschedule a task later.
rejection_func = reject_when_reached(remaining)
reserved_rejection_func = reject_when_reached(reserved_workers)
self._executor = futurist.GreenThreadPoolExecutor(
# Calculate our minimum worker count.
# This is imperfect because we presently have 8 power sync workers
# by default, along with 16 periodics, plus we need a little room
# to avoid adding/removing threads constantly. i.e. an idle deployment
# should hover around this many workers by default.
min_workers = int(
CONF.conductor.sync_power_state_workers
+ (2 * CONF.conductor.periodic_max_workers) + 2)
if min_workers >= remaining:
msg = ("The number of calculated minimum workers %(workers)s "
"exceeds the resulting maximum worker count %(max)s." %
{'workers': min_workers,
'max': remaining})
LOG.error("Cannot start ironic: %s", msg)
raise exception.IncorrectConfiguration(error=msg)
self._executor = futurist.DynamicThreadPoolExecutor(
min_workers=min_workers,
max_workers=remaining,
check_and_reject=rejection_func)
if reserved_workers:
self._reserved_executor = futurist.GreenThreadPoolExecutor(
self._reserved_executor = futurist.DynamicThreadPoolExecutor(
max_workers=reserved_workers,
check_and_reject=rejection_func)
check_and_reject=reserved_rejection_func)
else:
self._reserved_executor = None
@@ -136,6 +161,8 @@ class BaseConductorManager(object):
:raises: DriverLoadError if an enabled driver cannot be loaded.
:raises: DriverNameConflict if a classic driver and a dynamic driver
are both enabled and have the same name.
:raises: IncorrectConfiguration if invalid configuration has been
set which cannot permit proper operation.
"""
if self._started:
raise RuntimeError(_('Attempt to start an already running '
@@ -684,3 +711,40 @@ class BaseConductorManager(object):
self._zeroconf.register_service('baremetal',
deploy_utils.get_ironic_api_url(),
params=params)
def reject_when_reached(pool_size):
"""Return a function to reject new work for Ironic.
:param pool_size: The maximum number of items in the execution
pool.
:returns: A function which is executed by futurist when attempting
to determine if work should be deleted, or not.
"""
# NOTE(TheJulia): This is based upon futurist's rejection.py as of
# commit d17f58d. It is extended because Ironic's operating model has
# more than one thread actively adding work which will compete for the
# same lock, and then see the other queued item and reject work without
# consulting the overall and really even before the other thread is
# actually running.
# In other words, Ironic's thread and operation model requires a bit more
# robustness where it is okay to queue items as long as we won't exceed
# the thresholds.
def _rejector(executor, backlog):
# Get the current size once do we're not revisiting it again
# if we are rejecting inbound work.
current_size = int(executor.num_workers)
# Consult the backlog, which might have a couple entries from other
# threads firing up, add 1 to represent the request we're being
# invoked for, and ultimately if this will exceed the pool size
# when compared to the current executor size.
# This does not consider *idle* threads, because
# that too requires the same lock which caused backlog to be a
# non-zero number in the first place.
if backlog + 1 + current_size > pool_size:
raise futurist.RejectedSubmission(_(
"Queued backlog count is %s, and the pool is presently "
"at %s executors. Adding new work would "
"go beyond %s, the maximum thread pool size.") %
(backlog, current_size, pool_size))
return _rejector

View File

@@ -221,6 +221,18 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
self.assertTrue(log_mock.error.called)
del_mock.assert_called_once()
@mock.patch.object(base_manager, 'LOG', autospec=True)
@mock.patch.object(base_manager.BaseConductorManager,
'_register_and_validate_hardware_interfaces',
autospec=True)
@mock.patch.object(base_manager.BaseConductorManager, 'del_host',
autospec=True)
def test_start_fails_incorrect_workers(self, del_mock, reg_mock, log_mock):
CONF.set_override('workers_pool_size', 24, group="conductor")
self.assertRaises(exception.IncorrectConfiguration,
self.service.init_host)
self.assertTrue(log_mock.error.called)
def test_start_recover_nodes_stuck(self):
state_trans = [
(states.DEPLOYING, states.DEPLOYFAIL),
@@ -246,8 +258,10 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
self.assertEqual(state[1], node.provision_state,
'Test failed when recovering from %s' % state[0])
@mock.patch.object(base_manager.BaseConductorManager, '_spawn_worker',
autospec=True)
@mock.patch.object(base_manager, 'LOG', autospec=True)
def test_warning_on_low_workers_pool(self, log_mock):
def test_warning_on_low_workers_pool(self, log_mock, mock_spawn):
CONF.set_override('workers_pool_size', 3, 'conductor')
self._start_service()
self.assertTrue(log_mock.warning.called)
@@ -379,9 +393,9 @@ class ManagerSpawnWorkerTestCase(tests_base.TestCase):
super(ManagerSpawnWorkerTestCase, self).setUp()
self.service = manager.ConductorManager('hostname', 'test-topic')
self.service._executor = mock.Mock(
spec=futurist.GreenThreadPoolExecutor)
spec=futurist.DynamicThreadPoolExecutor)
self.service._reserved_executor = mock.Mock(
spec=futurist.GreenThreadPoolExecutor)
spec=futurist.DynamicThreadPoolExecutor)
self.func = lambda: None
@@ -613,8 +627,10 @@ class StartConsolesTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
self.assertFalse(mock_notify.called)
@mock.patch.object(base_manager, 'LOG', autospec=True)
def test__start_consoles_node_not_found(self, log_mock, mock_notify,
mock_start_console):
@mock.patch.object(base_manager.BaseConductorManager, '_spawn_worker',
autospec=True)
def test__start_consoles_node_not_found(self, mock_spawn, log_mock,
mock_notify, mock_start_console):
test_node = obj_utils.create_test_node(self.context,
driver='fake-hardware',
console_enabled=True)
@@ -658,3 +674,34 @@ class MiscTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
self.assertEqual('transition', entry['event_type'])
self.assertEqual('ERROR', entry['severity'])
self.assertEqual('unknown err', entry['event'])
class RejectorTestCase(db_base.DbTestCase):
def setUp(self):
super(RejectorTestCase, self).setUp()
self.rejector = base_manager.reject_when_reached(5)
def test_reject_when_reached(self):
fake_executor = mock.Mock()
fake_executor.num_workers = 4
error = ('Queued backlog count is 1, and the pool is '
'presently at 4 executors. Adding new work '
'would go beyond 5, the maximum thread '
'pool size.')
self.assertRaisesRegex(futurist.RejectedSubmission,
error,
self.rejector, fake_executor, 1)
fake_executor.num_workers = 3
error = ('Queued backlog count is 2, and the pool is '
'presently at 3 executors. Adding new work '
'would go beyond 5, the maximum thread '
'pool size.')
self.assertRaisesRegex(futurist.RejectedSubmission,
error,
self.rejector, fake_executor, 2)
def test_not_reject_when_reached(self):
fake_executor = mock.Mock()
fake_executor.num_workers = 3
self.rejector(fake_executor, 1)

View File

@@ -1616,7 +1616,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
'async': False,
'attach': False,
'http_methods': ['POST']}}
self.service.init_host()
self._start_service()
# init_host() called _spawn_worker because of the heartbeat
mock_spawn.reset_mock()
# init_host() called get_interface during driver loading
@@ -1646,7 +1646,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
'async': True,
'attach': False,
'http_methods': ['POST']}}
self.service.init_host()
self._start_service()
# init_host() called _spawn_worker because of the heartbeat
mock_spawn.reset_mock()
@@ -1667,7 +1667,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
'test_method': {'func': mock.MagicMock(),
'async': True,
'http_methods': ['POST']}}
self.service.init_host()
self._start_service()
# GET not supported by test_method
exc = self.assertRaises(messaging.ExpectedException,
self.service.driver_vendor_passthru,
@@ -1680,7 +1680,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
def test_driver_vendor_passthru_method_not_supported(self):
# Test for when the vendor interface is set, but hasn't passed a
# driver_passthru_mapping to MixinVendorInterface
self.service.init_host()
self._start_service()
exc = self.assertRaises(messaging.ExpectedException,
self.service.driver_vendor_passthru,
self.context, 'fake-hardware', 'test_method',
@@ -1690,7 +1690,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
exc.exc_info[0])
def test_driver_vendor_passthru_driver_not_found(self):
self.service.init_host()
self._start_service()
self.assertRaises(messaging.ExpectedException,
self.service.driver_vendor_passthru,
self.context, 'does_not_exist', 'test_method',
@@ -1699,7 +1699,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
@mock.patch.object(driver_factory, 'default_interface', autospec=True)
def test_driver_vendor_passthru_no_default_interface(self,
mock_def_iface):
self.service.init_host()
self._start_service()
# NOTE(rloo): service.init_host() will call
# driver_factory.default_interface() and we want these to
# succeed, so we set the side effect *after* that call.
@@ -1725,7 +1725,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
'http_methods': ['POST'],
'func': None}}
vendor_mock.driver_routes = fake_routes
self.service.init_host()
self._start_service()
# init_host() will call get_interface
mock_get_if.reset_mock()
@@ -1741,7 +1741,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
@mock.patch.object(driver_factory, 'default_interface', autospec=True)
def test_get_driver_vendor_passthru_methods_no_default_interface(
self, mock_def_iface):
self.service.init_host()
self._start_service()
# NOTE(rloo): service.init_host() will call
# driver_factory.default_interface() and we want these to
# succeed, so we set the side effect *after* that call.
@@ -1766,7 +1766,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
'test_method': {'func': test_method,
'async': False,
'http_methods': ['POST']}}
self.service.init_host()
self._start_service()
exc = self.assertRaises(messaging.ExpectedException,
self.service.driver_vendor_passthru,
self.context, 'fake-hardware', 'test_method',
@@ -3881,6 +3881,8 @@ class MiscTestCase(mgr_utils.ServiceSetUpMixin, mgr_utils.CommonMixIn,
mock_fail_if_state):
self._start_service()
self.service._correct_stuck_states()
# reset_mock() after startup to verify only test-invoked ones
mock_nodeinfo_list.reset_mock()
self.columns = ['uuid', 'driver', 'conductor_group', 'id']
nodes = [self._create_node(id=i, driver='fake-hardware',
conductor_group='')
@@ -4765,7 +4767,6 @@ class SensorsTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
self.service._sensors_nodes_task,
self.context, mock.ANY)
@mock.patch.object(queue, 'Queue', autospec=True)
@mock.patch.object(manager.ConductorManager, '_sensors_conductor',
autospec=True)
@mock.patch.object(manager.ConductorManager, '_spawn_worker',
@@ -4776,8 +4777,7 @@ class SensorsTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
def test___send_sensor_data_disabled(
self, get_nodeinfo_list_mock,
_mapped_to_this_conductor_mock,
mock_spawn, mock_sensors_conductor,
mock_queue):
mock_spawn, mock_sensors_conductor):
self._start_service()
CONF.set_override('send_sensor_data', True, group='sensor_data')
@@ -4790,7 +4790,15 @@ class SensorsTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
group='sensor_data')
_mapped_to_this_conductor_mock.return_value = True
get_nodeinfo_list_mock.return_value = [('fake_uuid', 'fake', None)]
self.service._send_sensor_data(self.context)
# NOTE(cid): Patch here instead of as a decorator to avoid replacing
# queue.Queue globally. ThreadPoolExecutor uses it in
# _init_executors(), called by init_host(), and relies on real queue
# semantics for backlog checks.
with mock.patch('ironic.conductor.manager.queue.Queue',
autospec=True) as mock_queue:
self.service._send_sensor_data(self.context)
mock_sensors_conductor.assert_not_called()
# NOTE(TheJulia): Can't use the spawn worker since it records other,
# unrelated calls. So, queue works well here.
@@ -6611,7 +6619,7 @@ class ManagerTestProperties(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
self._check_driver_properties('manual-management', [])
def test_driver_properties_fail(self):
self.service.init_host()
self._start_service()
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.service.get_driver_properties,
self.context, "bad-driver")

View File

@@ -0,0 +1,6 @@
---
upgrade:
- |
The minimum version of the ``futurist`` library is now ``3.2.0``.
This change is being made to support the removal of the Eventlet
library from use in Ironic.

View File

@@ -35,7 +35,7 @@ tenacity>=6.3.1 # Apache-2.0
oslo.versionedobjects>=1.31.2 # Apache-2.0
jsonschema>=4.0.0 # MIT
psutil>=3.2.2 # BSD
futurist>=1.2.0 # Apache-2.0
futurist>=3.2.0 # Apache-2.0
tooz>=2.7.0 # Apache-2.0
openstacksdk>=0.99.0 # Apache-2.0
sushy>=5.7.0