diff --git a/ironic/conductor/base_manager.py b/ironic/conductor/base_manager.py index ede4c16a3e..501ba6fe34 100644 --- a/ironic/conductor/base_manager.py +++ b/ironic/conductor/base_manager.py @@ -19,7 +19,6 @@ import time import futurist from futurist import periodics -from futurist import rejection from oslo_db import exception as db_exception from oslo_log import log from oslo_utils import excutils @@ -100,25 +99,51 @@ class BaseConductorManager(object): self.dbapi.clear_node_reservations_for_conductor(self.host) def _init_executors(self, total_workers, reserved_percentage): - # NOTE(dtantsur): do not allow queuing work. Given our model, it's - # better to reject an incoming request with HTTP 503 or reschedule - # a periodic task that end up with hidden backlog that is hard - # to track and debug. Using 1 instead of 0 because of how things are - # ordered in futurist (it checks for rejection first). - rejection_func = rejection.reject_when_reached(1) - + # NOTE(TheJulia): For context, as of 2025.2's development we have + # 16 periodics and 8 power sync threads. We end up scaling our + # minimum number of workers based upon the configuration later on, + # but we do this to try and keep a reasonable balance of workers reserved_workers = int(total_workers * reserved_percentage / 100) remaining = total_workers - reserved_workers LOG.info("Starting workers pool: %d normal workers + %d reserved", remaining, reserved_workers) + # NOTE(TheJulia): We need to create two separate rejection functions + # which are individually aware of their maximum sizes in order to + # appropriately know if work must be rejected, or not. + # In the past, Ironic's internal model prefers we *never* queue + # more work than the current item, but we won't have a choice + # with cross-thread activity. The custom rejector + # we invoke will result in work being rejected when the new + # work would exceed our overall capacity and thus return an API + # caller with an HTTP 503 or reschedule a task later. + rejection_func = reject_when_reached(remaining) + reserved_rejection_func = reject_when_reached(reserved_workers) - self._executor = futurist.GreenThreadPoolExecutor( + # Calculate our minimum worker count. + # This is imperfect because we presently have 8 power sync workers + # by default, along with 16 periodics, plus we need a little room + # to avoid adding/removing threads constantly. i.e. an idle deployment + # should hover around this many workers by default. + min_workers = int( + CONF.conductor.sync_power_state_workers + + (2 * CONF.conductor.periodic_max_workers) + 2) + + if min_workers >= remaining: + msg = ("The number of calculated minimum workers %(workers)s " + "exceeds the resulting maximum worker count %(max)s." % + {'workers': min_workers, + 'max': remaining}) + LOG.error("Cannot start ironic: %s", msg) + raise exception.IncorrectConfiguration(error=msg) + + self._executor = futurist.DynamicThreadPoolExecutor( + min_workers=min_workers, max_workers=remaining, check_and_reject=rejection_func) if reserved_workers: - self._reserved_executor = futurist.GreenThreadPoolExecutor( + self._reserved_executor = futurist.DynamicThreadPoolExecutor( max_workers=reserved_workers, - check_and_reject=rejection_func) + check_and_reject=reserved_rejection_func) else: self._reserved_executor = None @@ -136,6 +161,8 @@ class BaseConductorManager(object): :raises: DriverLoadError if an enabled driver cannot be loaded. :raises: DriverNameConflict if a classic driver and a dynamic driver are both enabled and have the same name. + :raises: IncorrectConfiguration if invalid configuration has been + set which cannot permit proper operation. """ if self._started: raise RuntimeError(_('Attempt to start an already running ' @@ -684,3 +711,40 @@ class BaseConductorManager(object): self._zeroconf.register_service('baremetal', deploy_utils.get_ironic_api_url(), params=params) + + +def reject_when_reached(pool_size): + """Return a function to reject new work for Ironic. + + :param pool_size: The maximum number of items in the execution + pool. + :returns: A function which is executed by futurist when attempting + to determine if work should be deleted, or not. + """ + # NOTE(TheJulia): This is based upon futurist's rejection.py as of + # commit d17f58d. It is extended because Ironic's operating model has + # more than one thread actively adding work which will compete for the + # same lock, and then see the other queued item and reject work without + # consulting the overall and really even before the other thread is + # actually running. + # In other words, Ironic's thread and operation model requires a bit more + # robustness where it is okay to queue items as long as we won't exceed + # the thresholds. + def _rejector(executor, backlog): + # Get the current size once do we're not revisiting it again + # if we are rejecting inbound work. + current_size = int(executor.num_workers) + # Consult the backlog, which might have a couple entries from other + # threads firing up, add 1 to represent the request we're being + # invoked for, and ultimately if this will exceed the pool size + # when compared to the current executor size. + # This does not consider *idle* threads, because + # that too requires the same lock which caused backlog to be a + # non-zero number in the first place. + if backlog + 1 + current_size > pool_size: + raise futurist.RejectedSubmission(_( + "Queued backlog count is %s, and the pool is presently " + "at %s executors. Adding new work would " + "go beyond %s, the maximum thread pool size.") % + (backlog, current_size, pool_size)) + return _rejector diff --git a/ironic/tests/unit/conductor/test_base_manager.py b/ironic/tests/unit/conductor/test_base_manager.py index f45df1647f..4ab4ea70f9 100644 --- a/ironic/tests/unit/conductor/test_base_manager.py +++ b/ironic/tests/unit/conductor/test_base_manager.py @@ -221,6 +221,18 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): self.assertTrue(log_mock.error.called) del_mock.assert_called_once() + @mock.patch.object(base_manager, 'LOG', autospec=True) + @mock.patch.object(base_manager.BaseConductorManager, + '_register_and_validate_hardware_interfaces', + autospec=True) + @mock.patch.object(base_manager.BaseConductorManager, 'del_host', + autospec=True) + def test_start_fails_incorrect_workers(self, del_mock, reg_mock, log_mock): + CONF.set_override('workers_pool_size', 24, group="conductor") + self.assertRaises(exception.IncorrectConfiguration, + self.service.init_host) + self.assertTrue(log_mock.error.called) + def test_start_recover_nodes_stuck(self): state_trans = [ (states.DEPLOYING, states.DEPLOYFAIL), @@ -246,8 +258,10 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): self.assertEqual(state[1], node.provision_state, 'Test failed when recovering from %s' % state[0]) + @mock.patch.object(base_manager.BaseConductorManager, '_spawn_worker', + autospec=True) @mock.patch.object(base_manager, 'LOG', autospec=True) - def test_warning_on_low_workers_pool(self, log_mock): + def test_warning_on_low_workers_pool(self, log_mock, mock_spawn): CONF.set_override('workers_pool_size', 3, 'conductor') self._start_service() self.assertTrue(log_mock.warning.called) @@ -379,9 +393,9 @@ class ManagerSpawnWorkerTestCase(tests_base.TestCase): super(ManagerSpawnWorkerTestCase, self).setUp() self.service = manager.ConductorManager('hostname', 'test-topic') self.service._executor = mock.Mock( - spec=futurist.GreenThreadPoolExecutor) + spec=futurist.DynamicThreadPoolExecutor) self.service._reserved_executor = mock.Mock( - spec=futurist.GreenThreadPoolExecutor) + spec=futurist.DynamicThreadPoolExecutor) self.func = lambda: None @@ -613,8 +627,10 @@ class StartConsolesTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): self.assertFalse(mock_notify.called) @mock.patch.object(base_manager, 'LOG', autospec=True) - def test__start_consoles_node_not_found(self, log_mock, mock_notify, - mock_start_console): + @mock.patch.object(base_manager.BaseConductorManager, '_spawn_worker', + autospec=True) + def test__start_consoles_node_not_found(self, mock_spawn, log_mock, + mock_notify, mock_start_console): test_node = obj_utils.create_test_node(self.context, driver='fake-hardware', console_enabled=True) @@ -658,3 +674,34 @@ class MiscTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): self.assertEqual('transition', entry['event_type']) self.assertEqual('ERROR', entry['severity']) self.assertEqual('unknown err', entry['event']) + + +class RejectorTestCase(db_base.DbTestCase): + + def setUp(self): + super(RejectorTestCase, self).setUp() + self.rejector = base_manager.reject_when_reached(5) + + def test_reject_when_reached(self): + fake_executor = mock.Mock() + fake_executor.num_workers = 4 + error = ('Queued backlog count is 1, and the pool is ' + 'presently at 4 executors. Adding new work ' + 'would go beyond 5, the maximum thread ' + 'pool size.') + self.assertRaisesRegex(futurist.RejectedSubmission, + error, + self.rejector, fake_executor, 1) + fake_executor.num_workers = 3 + error = ('Queued backlog count is 2, and the pool is ' + 'presently at 3 executors. Adding new work ' + 'would go beyond 5, the maximum thread ' + 'pool size.') + self.assertRaisesRegex(futurist.RejectedSubmission, + error, + self.rejector, fake_executor, 2) + + def test_not_reject_when_reached(self): + fake_executor = mock.Mock() + fake_executor.num_workers = 3 + self.rejector(fake_executor, 1) diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py index e358853f71..c0309c6663 100644 --- a/ironic/tests/unit/conductor/test_manager.py +++ b/ironic/tests/unit/conductor/test_manager.py @@ -1616,7 +1616,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): 'async': False, 'attach': False, 'http_methods': ['POST']}} - self.service.init_host() + self._start_service() # init_host() called _spawn_worker because of the heartbeat mock_spawn.reset_mock() # init_host() called get_interface during driver loading @@ -1646,7 +1646,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): 'async': True, 'attach': False, 'http_methods': ['POST']}} - self.service.init_host() + self._start_service() # init_host() called _spawn_worker because of the heartbeat mock_spawn.reset_mock() @@ -1667,7 +1667,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): 'test_method': {'func': mock.MagicMock(), 'async': True, 'http_methods': ['POST']}} - self.service.init_host() + self._start_service() # GET not supported by test_method exc = self.assertRaises(messaging.ExpectedException, self.service.driver_vendor_passthru, @@ -1680,7 +1680,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): def test_driver_vendor_passthru_method_not_supported(self): # Test for when the vendor interface is set, but hasn't passed a # driver_passthru_mapping to MixinVendorInterface - self.service.init_host() + self._start_service() exc = self.assertRaises(messaging.ExpectedException, self.service.driver_vendor_passthru, self.context, 'fake-hardware', 'test_method', @@ -1690,7 +1690,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): exc.exc_info[0]) def test_driver_vendor_passthru_driver_not_found(self): - self.service.init_host() + self._start_service() self.assertRaises(messaging.ExpectedException, self.service.driver_vendor_passthru, self.context, 'does_not_exist', 'test_method', @@ -1699,7 +1699,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): @mock.patch.object(driver_factory, 'default_interface', autospec=True) def test_driver_vendor_passthru_no_default_interface(self, mock_def_iface): - self.service.init_host() + self._start_service() # NOTE(rloo): service.init_host() will call # driver_factory.default_interface() and we want these to # succeed, so we set the side effect *after* that call. @@ -1725,7 +1725,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): 'http_methods': ['POST'], 'func': None}} vendor_mock.driver_routes = fake_routes - self.service.init_host() + self._start_service() # init_host() will call get_interface mock_get_if.reset_mock() @@ -1741,7 +1741,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): @mock.patch.object(driver_factory, 'default_interface', autospec=True) def test_get_driver_vendor_passthru_methods_no_default_interface( self, mock_def_iface): - self.service.init_host() + self._start_service() # NOTE(rloo): service.init_host() will call # driver_factory.default_interface() and we want these to # succeed, so we set the side effect *after* that call. @@ -1766,7 +1766,7 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): 'test_method': {'func': test_method, 'async': False, 'http_methods': ['POST']}} - self.service.init_host() + self._start_service() exc = self.assertRaises(messaging.ExpectedException, self.service.driver_vendor_passthru, self.context, 'fake-hardware', 'test_method', @@ -3881,6 +3881,8 @@ class MiscTestCase(mgr_utils.ServiceSetUpMixin, mgr_utils.CommonMixIn, mock_fail_if_state): self._start_service() self.service._correct_stuck_states() + # reset_mock() after startup to verify only test-invoked ones + mock_nodeinfo_list.reset_mock() self.columns = ['uuid', 'driver', 'conductor_group', 'id'] nodes = [self._create_node(id=i, driver='fake-hardware', conductor_group='') @@ -4765,7 +4767,6 @@ class SensorsTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): self.service._sensors_nodes_task, self.context, mock.ANY) - @mock.patch.object(queue, 'Queue', autospec=True) @mock.patch.object(manager.ConductorManager, '_sensors_conductor', autospec=True) @mock.patch.object(manager.ConductorManager, '_spawn_worker', @@ -4776,8 +4777,7 @@ class SensorsTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): def test___send_sensor_data_disabled( self, get_nodeinfo_list_mock, _mapped_to_this_conductor_mock, - mock_spawn, mock_sensors_conductor, - mock_queue): + mock_spawn, mock_sensors_conductor): self._start_service() CONF.set_override('send_sensor_data', True, group='sensor_data') @@ -4790,7 +4790,15 @@ class SensorsTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): group='sensor_data') _mapped_to_this_conductor_mock.return_value = True get_nodeinfo_list_mock.return_value = [('fake_uuid', 'fake', None)] - self.service._send_sensor_data(self.context) + + # NOTE(cid): Patch here instead of as a decorator to avoid replacing + # queue.Queue globally. ThreadPoolExecutor uses it in + # _init_executors(), called by init_host(), and relies on real queue + # semantics for backlog checks. + with mock.patch('ironic.conductor.manager.queue.Queue', + autospec=True) as mock_queue: + self.service._send_sensor_data(self.context) + mock_sensors_conductor.assert_not_called() # NOTE(TheJulia): Can't use the spawn worker since it records other, # unrelated calls. So, queue works well here. @@ -6611,7 +6619,7 @@ class ManagerTestProperties(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): self._check_driver_properties('manual-management', []) def test_driver_properties_fail(self): - self.service.init_host() + self._start_service() exc = self.assertRaises(messaging.rpc.ExpectedException, self.service.get_driver_properties, self.context, "bad-driver") diff --git a/releasenotes/notes/futurist-minimum-ec66ccfcc4271a5c.yaml b/releasenotes/notes/futurist-minimum-ec66ccfcc4271a5c.yaml new file mode 100644 index 0000000000..c8a5253ee3 --- /dev/null +++ b/releasenotes/notes/futurist-minimum-ec66ccfcc4271a5c.yaml @@ -0,0 +1,6 @@ +--- +upgrade: + - | + The minimum version of the ``futurist`` library is now ``3.2.0``. + This change is being made to support the removal of the Eventlet + library from use in Ironic. diff --git a/requirements.txt b/requirements.txt index 028e209a9d..ddf8eb7c6a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -35,7 +35,7 @@ tenacity>=6.3.1 # Apache-2.0 oslo.versionedobjects>=1.31.2 # Apache-2.0 jsonschema>=4.0.0 # MIT psutil>=3.2.2 # BSD -futurist>=1.2.0 # Apache-2.0 +futurist>=3.2.0 # Apache-2.0 tooz>=2.7.0 # Apache-2.0 openstacksdk>=0.99.0 # Apache-2.0 sushy>=5.7.0