Set the backend to threading

This change removes eventlet usage and explicitly selects the
threading backend instead of eventlet.

To do this, it also revises signal handling, because the change
moves from an all-in-one process to a multi-process runtime model.
The side effect is an increased memory footprint, which varies with
the process launch model.

Change-Id: I184245d67edb1a2543aa24654836392f38777d71
Signed-off-by: Julia Kreger <juliaashleykreger@gmail.com>
Julia Kreger
2025-06-27 16:27:23 -07:00
parent bedfd143eb
commit bbf57b1c2a
18 changed files with 236 additions and 163 deletions

View File

@@ -12,10 +12,19 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
eventlet.monkey_patch()
import os
import threading
from oslo_service import backend
backend.init_backend(backend.BackendType.THREADING)
from ironic.common import i18n # noqa
# NOTE(TheJulia): We are setting a default thread stack size for all
# subsequent thread invocations. While Python accepts any value of at
# least 32768 bytes, in 4096 byte increments, this value appears to work
# well in basic benchmarking.
threading.stack_size(
int(os.environ.get('IRONIC_THREAD_STACK_SIZE', 65536)))
i18n.install('ironic')
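
For context, a minimal standalone sketch of the ordering above: the oslo.service backend has to be selected before any oslo.service consumer is imported, and threading.stack_size() only affects threads created afterwards. The IRONIC_THREAD_STACK_SIZE variable is the one used above; the rest is illustrative and assumes an oslo.service release with the pluggable backend API installed.

import os
import threading

# Select the threading backend before anything pulls in oslo.service users.
from oslo_service import backend
backend.init_backend(backend.BackendType.THREADING)

# Environment variables are strings, so convert before handing the value
# to stack_size(); it applies only to threads created after this call.
threading.stack_size(int(os.environ.get('IRONIC_THREAD_STACK_SIZE', 65536)))

def worker():
    print('running on a thread created with the 64 KiB stack set above')

t = threading.Thread(target=worker)
t.start()
t.join()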

View File

@@ -25,6 +25,7 @@ from oslo_config import cfg
from oslo_log import log
from oslo_service import service
from ironic.command import utils as command_utils
from ironic.common import service as ironic_service
from ironic.common import utils
from ironic.conductor import rpc_service
@@ -146,13 +147,29 @@ def main():
issue_startup_warnings(CONF)
# Ultimately this returns a ServiceLauncher instance which has a _manager
# object (cotyledon) and on which launch_service has been invoked, which
# adds an instance of the service to the _manager object.
launcher = service.launch(CONF, mgr, restart_method='mutate')
# NOTE(dtantsur): handling start-up failures before launcher.wait() helps
# notify systemd about them. Otherwise the launcher will report successful
# service start-up before checking the threads.
mgr.wait_for_start()
# The approach above is also for a single application, from which we then
# start a worker process.
# TODO(TheJulia): At this location we're missing signaling, but the
# signaling needs to happen past a spawn() call triggered by wait().
# What would make sense here is signal handling to force this process
# to exit.
# TODO(TheJulia): The tl;dr of execution is that wait() triggers the
# process launch, fork, and then other actions. This happens in cotyledon,
# where the ServiceManager run() method is called
# https://github.com/sileht/cotyledon/blob/main/cotyledon/_service_manager.py#L240
# which then waits forever and never returns.
# Set override signals.
command_utils.handle_signal()
# Start the processes!
sys.exit(launcher.wait())
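
A toy illustration (stdlib only, Unix semantics, not Ironic's or cotyledon's actual code) of why the override has to be registered before launcher.wait(): the parent installs its SIGUSR1/SIGUSR2 handlers and only then enters the call that blocks forever; with the default disposition, SIGUSR2 would simply terminate the launcher.

import os
import signal

def handle_drain(signo, frame):
    print('parent %d: drain requested, would now stop the workers'
          % os.getpid())

# Install the override first; without it SIGUSR2 terminates the process.
signal.signal(signal.SIGUSR2, handle_drain)

print('send SIGUSR2 to pid %d to trigger the handler' % os.getpid())
signal.pause()  # then block, roughly like launcher.wait() does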

View File

@@ -17,6 +17,7 @@ from oslo_log import log
from oslo_service import service
from ironic.command import conductor as conductor_cmd
from ironic.command import utils
from ironic.common import service as ironic_service
from ironic.common import wsgi_service
from ironic.conductor import local_rpc
@@ -50,11 +51,6 @@ def main():
conductor_cmd.issue_startup_warnings(CONF)
launcher.launch_service(mgr)
# NOTE(dtantsur): handling start-up failures before launcher.wait() helps
# notify systemd about them. Otherwise the launcher will report successful
# service start-up before checking the threads.
mgr.wait_for_start()
wsgi = wsgi_service.WSGIService('ironic_api', CONF.api.enable_ssl_api)
launcher.launch_service(wsgi)
@@ -63,4 +59,8 @@ def main():
novncproxy = novncproxy_service.NoVNCProxyService()
launcher.launch_service(novncproxy)
# Register our signal overrides before launching the processes
utils.handle_signal()
# Start the processes!
sys.exit(launcher.wait())

ironic/command/utils.py (new file, 60 lines)
View File

@@ -0,0 +1,60 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import signal
from oslo_config import cfg
from oslo_log import log
from ironic.conductor import rpc_service
LOG = log.getLogger(__name__)
CONF = cfg.CONF
# NOTE(TheJulia): These methods are for the parent process to
# glue behavior expectations to conductor shutdowns.
def handle_no_deregister(signo, frame):
LOG.info('Got signal SIGUSR1. Not deregistering on next shutdown '
'on host %(host)s.',
{'host': CONF.host})
rpc_service.DEREGISTER_ON_SHUTDOWN.clear()
def handle_drain(signo, frame):
LOG.info('Got signal SIGUSR2. Initiating a workload drain and '
'shutdown on host %(host)s.',
{'host': CONF.host})
rpc_service.DRAIN.set()
# NOTE(TheJulia): This is sort of aggressive, but it works.
# Part of the issue is that we need to trigger the child process to stop,
# and the manager shutdown method is for parent process calls, in other
# words the application triggering a self-shutdown. Ultimately this causes
# the application stop() method to be called.
os.kill(0, signal.SIGTERM)
def handle_signal():
"""Add a signal handler for SIGUSR1, SIGUSR2.
The SIGUSR1 handler ensures that the manager is not deregistered when
it is shutdown.
The SIGUSR2 handler starts a drain shutdown.
"""
signal.signal(signal.SIGUSR1, handle_no_deregister)
# In Ironic, USR2 triggers a draining shutdown. We'll need to figure out
# how to do that in this model, most likely by setting a flag and
# requesting that the manager shut down.
signal.signal(signal.SIGUSR2, handle_drain)
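
A small stdlib-only demonstration of the os.kill(0, SIGTERM) pattern used in handle_drain() above: signalling process group 0 delivers SIGTERM to the caller and to every child forked into the same process group, which is what pushes the worker process into its own shutdown path. The worker and on_term names are illustrative; Unix semantics are assumed.

import os
import signal
import time

def on_term(signo, frame):
    print('pid %d received SIGTERM, shutting down' % os.getpid())
    os._exit(0)

signal.signal(signal.SIGTERM, on_term)

if os.fork() == 0:
    # Worker: inherits the handler and stays in the parent's process group.
    time.sleep(30)  # the sleep is interrupted when SIGTERM arrives
    os._exit(0)

time.sleep(0.2)             # let the child reach its sleep()
os.kill(0, signal.SIGTERM)  # parent and child both receive SIGTERM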

View File

@@ -14,8 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import sys
import time
from oslo_config import cfg
from oslo_log import log
@@ -45,13 +43,6 @@ class BaseRPCService(service.Service):
self._started = False
self._failure = None
def wait_for_start(self):
while not self._started and not self._failure:
time.sleep(0.1)
if self._failure:
LOG.critical(self._failure)
sys.exit(self._failure)
def start(self):
self._failure = None
self._started = False
@@ -64,9 +55,6 @@ class BaseRPCService(service.Service):
else:
self._started = True
def handle_signal(self):
pass
def _real_start(self):
admin_context = context.get_admin_context()
@@ -84,7 +72,6 @@ class BaseRPCService(service.Service):
if self.rpcserver is not None:
self.rpcserver.start()
self.handle_signal()
self.manager.init_host(admin_context)
LOG.info('Created RPC server with %(transport)s transport for service '

View File

@@ -59,7 +59,7 @@ class BaseConductorManager(object):
self.topic = topic
self.sensors_notifier = rpc.get_sensors_notifier()
self._started = False
self._shutdown = None
self._shutdown = threading.Event()
self._zeroconf = None
self.dbapi = None
@@ -167,7 +167,6 @@ class BaseConductorManager(object):
if self._started:
raise RuntimeError(_('Attempt to start an already running '
'conductor manager'))
self._shutdown = False
if not self.dbapi:
self.dbapi = dbapi.get_instance()
@@ -378,9 +377,9 @@ class BaseConductorManager(object):
if not hasattr(self, 'conductor'):
return
# the keepalive heartbeat greenthread will continue to run, but will
# the keepalive heartbeat thread will continue to run, but will
# now be setting online=False
self._shutdown = True
self._shutdown.set()
if clear_node_reservations:
# clear all locks held by this conductor before deregistering
@@ -399,11 +398,14 @@ class BaseConductorManager(object):
else:
LOG.info('Not deregistering conductor with hostname %(hostname)s.',
{'hostname': self.host})
# Stop keepalive operations
self.keepalive_halt()
# Waiting here to give workers the chance to finish. This has the
# benefit of releasing locks workers placed on nodes, as well as
# having work complete normally.
self._periodic_tasks.stop()
self._periodic_tasks.wait()
# Shutdown the reserved and normal executors.
if self._reserved_executor is not None:
self._reserved_executor.shutdown(wait=True)
self._executor.shutdown(wait=True)
@@ -496,7 +498,7 @@ class BaseConductorManager(object):
columns = ['uuid', 'driver', 'conductor_group'] + list(fields or ())
node_list = self.dbapi.get_nodeinfo_list(columns=columns, **kwargs)
for result in node_list:
if self._shutdown:
if self._shutdown.is_set():
break
if self._mapped_to_this_conductor(*result[:3]):
yield result
@@ -533,7 +535,7 @@ class BaseConductorManager(object):
return
while not self._keepalive_evt.is_set():
try:
self.conductor.touch(online=not self._shutdown)
self.conductor.touch(online=not self._shutdown.is_set())
except db_exception.DBConnectionError:
LOG.warning('Conductor could not connect to database '
'while heartbeating.')
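
A minimal sketch of the threading.Event pattern that replaces the old boolean _shutdown flag in this hunk: worker loops poll is_set(), and the shutdown path only has to call set() once for every loop to notice. The worker names below are illustrative, not Ironic's.

import threading
import time

shutdown = threading.Event()

def worker(name):
    while not shutdown.is_set():
        time.sleep(0.1)  # stand-in for one unit of periodic work
    print('%s saw the shutdown flag, exiting' % name)

threads = [threading.Thread(target=worker, args=('worker-%d' % i,))
           for i in range(2)]
for t in threads:
    t.start()

time.sleep(0.5)
shutdown.set()   # the equivalent of self._shutdown.set() in del_host()
for t in threads:
    t.join()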

View File

@@ -1612,7 +1612,7 @@ class ConductorManager(base_manager.BaseConductorManager):
# (through to its DB API call) so that we can eliminate our call
# and first set of checks below.
while not self._shutdown:
while not self._shutdown.is_set():
try:
(node_uuid, driver, conductor_group,
node_id) = nodes.get_nowait()
@@ -2753,7 +2753,7 @@ class ConductorManager(base_manager.BaseConductorManager):
@METRICS.timer('ConductorManager._sensors_nodes_task')
def _sensors_nodes_task(self, context, nodes):
"""Sends sensors data for nodes from synchronized queue."""
while not self._shutdown:
while not self._shutdown.is_set():
try:
(node_uuid, driver, conductor_group,
instance_uuid) = nodes.get_nowait()

View File

@@ -11,7 +11,7 @@
# under the License.
import datetime
import signal
import multiprocessing
import time
from oslo_config import cfg
@@ -25,13 +25,33 @@ from ironic.common import rpc_service
LOG = log.getLogger(__name__)
CONF = cfg.CONF
# NOTE(TheJulia): The following flags relate to process shutdown. Because
# the post-eventlet model consists of a primary process and an application
# process which then runs threads, we *must* use multiprocessing events:
# the state needs to cross the process boundary created when
# multiprocessing was used to launch the new processes.
# Always set a flag for deregistering; when shutting down, the primary
# process will clear the flag.
DEREGISTER_ON_SHUTDOWN = multiprocessing.Event()
DEREGISTER_ON_SHUTDOWN.set()
# Flag which can be set to indicate if we need to drain the conductor
# workload, or not. Set by the primary process when shutdown has been
# requested.
DRAIN = multiprocessing.Event()
class RPCService(rpc_service.BaseRPCService):
def __init__(self, host, manager_module, manager_class):
super().__init__(host, manager_module, manager_class)
self.deregister = True
self.draining = False
@property
def deregister_on_shutdown(self):
return DEREGISTER_ON_SHUTDOWN.is_set()
def is_draining(self):
return DRAIN.is_set()
def _real_start(self):
super()._real_start()
@@ -48,8 +68,9 @@ class RPCService(rpc_service.BaseRPCService):
seconds=CONF.hash_ring_reset_interval)
try:
self.manager.del_host(deregister=self.deregister,
clear_node_reservations=False)
self.manager.del_host(
deregister=self.deregister_on_shutdown,
clear_node_reservations=False)
except Exception as e:
LOG.exception('Service error occurred when cleaning up '
'the RPC manager. Error: %s', e)
@@ -104,7 +125,7 @@ class RPCService(rpc_service.BaseRPCService):
provider.stop_all_containers()
def _shutdown_timeout_reached(self, initial_time):
if self.draining:
if self.is_draining():
shutdown_timeout = CONF.drain_shutdown_timeout
else:
shutdown_timeout = CONF.conductor.graceful_shutdown_timeout
@@ -114,27 +135,3 @@ class RPCService(rpc_service.BaseRPCService):
shutdown_time = initial_time + datetime.timedelta(
seconds=shutdown_timeout)
return shutdown_time < timeutils.utcnow()
def _handle_no_deregister(self, signo, frame):
LOG.info('Got signal SIGUSR1. Not deregistering on next shutdown '
'of service %(service)s on host %(host)s.',
{'service': self.topic, 'host': self.host})
self.deregister = False
def _handle_drain(self, signo, frame):
LOG.info('Got signal SIGUSR2. Starting drain shutdown'
'of service %(service)s on host %(host)s.',
{'service': self.topic, 'host': self.host})
self.draining = True
self.stop()
def handle_signal(self):
"""Add a signal handler for SIGUSR1, SIGUSR2.
The SIGUSR1 handler ensures that the manager is not deregistered when
it is shutdown.
The SIGUSR2 handler starts a drain shutdown.
"""
signal.signal(signal.SIGUSR1, self._handle_no_deregister)
signal.signal(signal.SIGUSR2, self._handle_drain)
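
A sketch of why the DEREGISTER_ON_SHUTDOWN and DRAIN flags above are multiprocessing.Event objects rather than plain booleans or threading.Event: the flag has to cross the boundary between the signal-handling parent and the already-running worker process. Stdlib only, illustrative names; the event is passed explicitly so the example also works with the spawn start method, whereas Ironic relies on the fork model inheriting the module-level objects.

import multiprocessing
import os
import time

DRAIN = multiprocessing.Event()  # module-level, like DRAIN above

def worker(drain_event):
    while not drain_event.is_set():
        time.sleep(0.1)
    print('worker %d observed the drain flag, stopping' % os.getpid())

if __name__ == '__main__':
    proc = multiprocessing.Process(target=worker, args=(DRAIN,))
    proc.start()
    time.sleep(0.3)
    DRAIN.set()   # set in the parent, observed inside the child process
    proc.join()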

View File

@@ -28,11 +28,11 @@ from ironic.conf import types as ir_types
opts = [
cfg.IntOpt('workers_pool_size',
default=300, min=3,
help=_('The size of the workers greenthread pool. '
help=_('The size of the workers thread pool. '
'Note that 2 threads will be reserved by the conductor '
'itself for handling heart beats and periodic tasks. '
'On top of that, `sync_power_state_workers` will take '
'up to 7 green threads with the default value of 8.')),
'up to 7 threads with the default value of 8.')),
cfg.IntOpt('reserved_workers_pool_percentage',
default=5, min=0, max=50,
help=_('The percentage of the whole workers pool that will be '

View File

@@ -22,14 +22,9 @@
:platform: Unix
"""
import eventlet
# NOTE(JayF): We must green all python stdlib modules before anything else
# is imported for consistent behavior. For instance, sqlalchemy
# creates a threading.RLock early, and if it was imported before
eventlet.monkey_patch() # noqa
# NOTE(TheJulia): This is to force oslo_service from trying to use eventlet.
from oslo_service import backend
backend.init_backend(backend.BackendType.THREADING)
from oslo_config import cfg # noqa E402
from oslo_log import log # noqa E402

View File

@@ -167,7 +167,7 @@ class ServiceSetUpMixin(object):
ensure a similar piece goes into ironic/conductor/base_manager.py.
"""
self.service._shutdown = False
self.service._shutdown = threading.Event()
# Test class structure sets up self.dbapi, attaching it to
# self.service for executing code to be able to leverage

View File

@@ -268,9 +268,9 @@ class StartStopTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
def test_conductor_shutdown_flag(self):
self._start_service()
self.assertFalse(self.service._shutdown)
self.assertFalse(self.service._shutdown.is_set())
self.service.del_host()
self.assertTrue(self.service._shutdown)
self.assertTrue(self.service._shutdown.is_set())
@mock.patch.object(deploy_utils, 'get_ironic_api_url', autospec=True)
@mock.patch.object(mdns, 'Zeroconf', autospec=True)
@@ -456,6 +456,7 @@ class RegisterInterfacesTestCase(mgr_utils.ServiceSetUpMixin,
def setUp(self):
super(RegisterInterfacesTestCase, self).setUp()
self._start_service()
self._executor = futurist.SynchronousExecutor()
def test__register_and_validate_hardware_interfaces(self,
esi_mock,

View File

@@ -652,7 +652,6 @@ class DoNextDeployStepTestCase(mgr_utils.ServiceSetUpMixin,
'step': 'deploy_middle', 'priority': 30, 'interface': 'deploy'}
def test__do_next_deploy_step_none(self):
self._start_service()
node = obj_utils.create_test_node(self.context, driver='fake-hardware')
task = task_manager.TaskManager(self.context, node.uuid)
task.process_event('deploy')
@@ -667,7 +666,6 @@ class DoNextDeployStepTestCase(mgr_utils.ServiceSetUpMixin,
def test__do_next_deploy_step_async(self, mock_execute):
driver_internal_info = {'deploy_step_index': None,
'deploy_steps': self.deploy_steps}
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
driver_internal_info=driver_internal_info,
@@ -693,7 +691,6 @@ class DoNextDeployStepTestCase(mgr_utils.ServiceSetUpMixin,
driver_internal_info = {'deploy_step_index': None,
'deploy_steps': self.deploy_steps,
'deployment_polling': True}
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
driver_internal_info=driver_internal_info,
@@ -727,7 +724,6 @@ class DoNextDeployStepTestCase(mgr_utils.ServiceSetUpMixin,
# Resume an in-progress deploy after the first async step
driver_internal_info = {'deploy_step_index': 0,
'deploy_steps': self.deploy_steps}
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.DEPLOYWAIT,
@@ -760,7 +756,6 @@ class DoNextDeployStepTestCase(mgr_utils.ServiceSetUpMixin,
# Resume where last_step is the last deploy step that was executed
driver_internal_info = {'deploy_step_index': 1,
'deploy_steps': self.deploy_steps}
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.DEPLOYWAIT,
@@ -840,7 +835,6 @@ class DoNextDeployStepTestCase(mgr_utils.ServiceSetUpMixin,
'deploy_steps': self.deploy_steps[:1],
'agent_url': 'url',
'agent_secret_token': 'token'}
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
driver_internal_info=driver_internal_info,
@@ -916,7 +910,6 @@ class DoNextDeployStepTestCase(mgr_utils.ServiceSetUpMixin,
# When a deploy step fails, go to DEPLOYFAIL
driver_internal_info = {'deploy_step_index': None,
'deploy_steps': self.deploy_steps}
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
@@ -1023,7 +1016,6 @@ class DoNextDeployStepTestCase(mgr_utils.ServiceSetUpMixin,
# When a deploy step fails, go to DEPLOYWAIT
tgt_prov_state = states.ACTIVE
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.DEPLOYING,
@@ -1057,7 +1049,6 @@ class DoNextDeployStepTestCase(mgr_utils.ServiceSetUpMixin,
# When a deploy step fails, go to DEPLOYWAIT
tgt_prov_state = states.ACTIVE
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.DEPLOYING,
@@ -1091,7 +1082,6 @@ class DoNextDeployStepTestCase(mgr_utils.ServiceSetUpMixin,
# When a deploy step fails with no reboot requested go to DEPLOYFAIL
tgt_prov_state = states.ACTIVE
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.DEPLOYING,

View File

@@ -25,6 +25,7 @@ import re
import time
from unittest import mock
import futurist
from futurist import waiters
from oslo_config import cfg
import oslo_messaging as messaging
@@ -1780,6 +1781,10 @@ class VendorPassthruTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
@mock.patch.object(images, 'is_whole_disk_image', autospec=True)
class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
db_base.DbTestCase):
class fake_conductor:
id = 1
def test_do_node_deploy_invalid_state(self, mock_iwdi):
mock_iwdi.return_value = False
self._start_service()
@@ -1817,14 +1822,17 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
self.assertFalse(mock_iwdi.called)
def _test_do_node_deploy_validate_fail(self, mock_validate, mock_iwdi):
self._start_service()
mock_iwdi.return_value = False
# InvalidParameterValue should be re-raised as InstanceDeployFailure
mock_validate.side_effect = exception.InvalidParameterValue('error')
node = obj_utils.create_test_node(self.context, driver='fake-hardware')
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.service.do_node_deploy,
self.context, node.uuid)
with mock.patch.object(self.service, '_concurrent_action_limit',
autospec=True) as mock_ca_limit:
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.service.do_node_deploy,
self.context, node.uuid)
mock_ca_limit.assert_called_once_with(action='provisioning')
# Compare true exception hidden by @messaging.expected_exceptions
self.assertEqual(exception.InstanceDeployFailure, exc.exc_info[0])
self.assertEqual(exc.exc_info[1].code, 400)
@@ -2046,13 +2054,15 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
def test_do_node_deploy_rebuild_from_available_state(self, mock_iwdi):
mock_iwdi.return_value = False
self._start_service()
# test node will not rebuild if state is AVAILABLE
node = obj_utils.create_test_node(self.context, driver='fake-hardware',
provision_state=states.AVAILABLE)
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.service.do_node_deploy,
self.context, node['uuid'], rebuild=True)
with mock.patch.object(self.service, '_concurrent_action_limit',
autospec=True) as mock_ca_limit:
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.service.do_node_deploy,
self.context, node['uuid'], rebuild=True)
mock_ca_limit.assert_called_once_with(action='provisioning')
# Compare true exception hidden by @messaging.expected_exceptions
self.assertEqual(exception.InvalidStateRequested, exc.exc_info[0])
# Last_error should be None.
@@ -2064,13 +2074,15 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
def test_do_node_deploy_rebuild_protected(self, mock_iwdi):
mock_iwdi.return_value = False
self._start_service()
node = obj_utils.create_test_node(self.context, driver='fake-hardware',
provision_state=states.ACTIVE,
protected=True)
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.service.do_node_deploy,
self.context, node['uuid'], rebuild=True)
with mock.patch.object(self.service, '_concurrent_action_limit',
autospec=True) as mock_ca_limit:
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.service.do_node_deploy,
self.context, node['uuid'], rebuild=True)
mock_ca_limit.assert_called_once_with(action='provisioning')
# Compare true exception hidden by @messaging.expected_exceptions
self.assertEqual(exception.NodeProtected, exc.exc_info[0])
# Last_error should be None.
@@ -2094,9 +2106,12 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
autospec=True) as mock_spawn:
mock_spawn.side_effect = exception.NoFreeConductorWorker()
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.service.do_node_deploy,
self.context, node.uuid)
with mock.patch.object(self.service, '_concurrent_action_limit',
autospec=True) as mock_ca_limit:
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.service.do_node_deploy,
self.context, node.uuid)
mock_ca_limit.assert_called_once_with(action='provisioning')
# Compare true exception hidden by @messaging.expected_exceptions
self.assertEqual(exception.NoFreeConductorWorker, exc.exc_info[0])
node.refresh()
@@ -3442,7 +3457,6 @@ class DoNodeRescueTestCase(mgr_utils.CommonMixIn, mgr_utils.ServiceSetUpMixin,
])
def test_do_node_rescue_invalid_state(self):
self._start_service()
node = obj_utils.create_test_node(self.context, driver='fake-hardware',
network_interface='noop',
provision_state=states.AVAILABLE,
@@ -3504,7 +3518,6 @@ class DoNodeRescueTestCase(mgr_utils.CommonMixIn, mgr_utils.ServiceSetUpMixin,
@mock.patch('ironic.drivers.modules.fake.FakeRescue.rescue', autospec=True)
def test__do_node_rescue_returns_rescuewait(self, mock_rescue):
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.RESCUING,
@@ -3521,7 +3534,6 @@ class DoNodeRescueTestCase(mgr_utils.CommonMixIn, mgr_utils.ServiceSetUpMixin,
@mock.patch('ironic.drivers.modules.fake.FakeRescue.rescue', autospec=True)
def test__do_node_rescue_returns_rescue(self, mock_rescue):
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.RESCUING,
@@ -3540,7 +3552,6 @@ class DoNodeRescueTestCase(mgr_utils.CommonMixIn, mgr_utils.ServiceSetUpMixin,
@mock.patch.object(manager, 'LOG', autospec=True)
@mock.patch('ironic.drivers.modules.fake.FakeRescue.rescue', autospec=True)
def test__do_node_rescue_errors(self, mock_rescue, mock_log):
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.RESCUING,
@@ -3563,7 +3574,6 @@ class DoNodeRescueTestCase(mgr_utils.CommonMixIn, mgr_utils.ServiceSetUpMixin,
@mock.patch.object(manager, 'LOG', autospec=True)
@mock.patch('ironic.drivers.modules.fake.FakeRescue.rescue', autospec=True)
def test__do_node_rescue_bad_state(self, mock_rescue, mock_log):
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.RESCUING,
@@ -3583,7 +3593,6 @@ class DoNodeRescueTestCase(mgr_utils.CommonMixIn, mgr_utils.ServiceSetUpMixin,
@mock.patch('ironic.conductor.task_manager.acquire', autospec=True)
def test_do_node_unrescue(self, mock_acquire):
self._start_service()
task = self._create_task(
node_attrs=dict(driver='fake-hardware',
provision_state=states.RESCUE,
@@ -3601,7 +3610,6 @@ class DoNodeRescueTestCase(mgr_utils.CommonMixIn, mgr_utils.ServiceSetUpMixin,
err_handler=conductor_utils.provisioning_error_handler)
def test_do_node_unrescue_invalid_state(self):
self._start_service()
node = obj_utils.create_test_node(self.context, driver='fake-hardware',
provision_state=states.AVAILABLE)
exc = self.assertRaises(messaging.rpc.ExpectedException,
@@ -3643,7 +3651,6 @@ class DoNodeRescueTestCase(mgr_utils.CommonMixIn, mgr_utils.ServiceSetUpMixin,
@mock.patch('ironic.drivers.modules.fake.FakeRescue.unrescue',
autospec=True)
def test__do_node_unrescue(self, mock_unrescue):
self._start_service()
dii = {'agent_url': 'http://url',
'agent_secret_token': 'token',
'other field': 'value'}
@@ -3665,7 +3672,6 @@ class DoNodeRescueTestCase(mgr_utils.CommonMixIn, mgr_utils.ServiceSetUpMixin,
@mock.patch('ironic.drivers.modules.fake.FakeRescue.unrescue',
autospec=True)
def test__do_node_unrescue_ironic_error(self, mock_unrescue, mock_log):
self._start_service()
node = obj_utils.create_test_node(self.context, driver='fake-hardware',
provision_state=states.UNRESCUING,
target_provision_state=states.ACTIVE,
@@ -3685,7 +3691,6 @@ class DoNodeRescueTestCase(mgr_utils.CommonMixIn, mgr_utils.ServiceSetUpMixin,
@mock.patch('ironic.drivers.modules.fake.FakeRescue.unrescue',
autospec=True)
def test__do_node_unrescue_other_error(self, mock_unrescue, mock_log):
self._start_service()
node = obj_utils.create_test_node(self.context, driver='fake-hardware',
provision_state=states.UNRESCUING,
target_provision_state=states.ACTIVE,
@@ -3703,7 +3708,6 @@ class DoNodeRescueTestCase(mgr_utils.CommonMixIn, mgr_utils.ServiceSetUpMixin,
@mock.patch('ironic.drivers.modules.fake.FakeRescue.unrescue',
autospec=True)
def test__do_node_unrescue_bad_state(self, mock_unrescue):
self._start_service()
node = obj_utils.create_test_node(self.context, driver='fake-hardware',
provision_state=states.UNRESCUING,
instance_info={})
@@ -3723,7 +3727,6 @@ class DoNodeRescueTestCase(mgr_utils.CommonMixIn, mgr_utils.ServiceSetUpMixin,
provision_state=states.RESCUEWAIT,
target_provision_state=states.RESCUE,
instance_info={'rescue_password': 'password'})
self._start_service()
self.service.do_provisioning_action(self.context, node.uuid, 'abort')
node.refresh()
self.assertEqual(states.RESCUEFAIL, node.provision_state)
@@ -3917,11 +3920,12 @@ class MiscTestCase(mgr_utils.ServiceSetUpMixin, mgr_utils.CommonMixIn,
nodes = [self._create_node(driver='fake-hardware')]
mock_nodeinfo_list.return_value = self._get_nodeinfo_list_response(
nodes)
self.service._shutdown = True
self.service._shutdown.set()
result = list(self.service.iter_nodes(fields=['id'],
filters=mock.sentinel.filters))
self.assertEqual([], result)
self.service._shutdown.clear()
def test_get_node_with_token(self):
node = obj_utils.create_test_node(
@@ -3971,7 +3975,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
autospec=True)
def test_set_console_mode_enabled(self, mock_notify):
node = obj_utils.create_test_node(self.context, driver='fake-hardware')
self._start_service()
self.service._executor = futurist.SynchronousExecutor()
self.service.set_console_mode(self.context, node.uuid, True)
node.refresh()
self.assertTrue(node.console_enabled)
@@ -3986,7 +3990,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
def test_set_console_mode_disabled(self, mock_notify):
node = obj_utils.create_test_node(self.context, driver='fake-hardware',
console_enabled=True)
self._start_service()
self.service._executor = futurist.SynchronousExecutor()
self.service.set_console_mode(self.context, node.uuid, False)
node.refresh()
self.assertFalse(node.console_enabled)
@@ -4000,7 +4004,6 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
def test_set_console_mode_validation_fail(self, mock_val):
node = obj_utils.create_test_node(self.context, driver='fake-hardware',
last_error=None)
self._start_service()
mock_val.side_effect = exception.InvalidParameterValue('error')
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.service.set_console_mode,
@@ -4015,7 +4018,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
node = obj_utils.create_test_node(self.context, driver='fake-hardware',
last_error=None,
console_enabled=False)
self._start_service()
self.service._executor = futurist.SynchronousExecutor()
mock_sc.side_effect = exception.IronicException('test-error')
self.service.set_console_mode(self.context, node.uuid, True)
mock_sc.assert_called_once_with(mock.ANY, mock.ANY)
@@ -4034,7 +4037,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
node = obj_utils.create_test_node(self.context, driver='fake-hardware',
last_error=None,
console_enabled=True)
self._start_service()
self.service._executor = futurist.SynchronousExecutor()
mock_sc.side_effect = exception.IronicException('test-error')
self.service.set_console_mode(self.context, node.uuid, False)
mock_sc.assert_called_once_with(mock.ANY, mock.ANY)
@@ -4052,7 +4055,7 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
def test_enable_console_already_enabled(self, mock_notify, mock_sc):
node = obj_utils.create_test_node(self.context, driver='fake-hardware',
console_enabled=True)
self._start_service()
self.service._executor = futurist.SynchronousExecutor()
self.service.set_console_mode(self.context, node.uuid, True)
self.assertFalse(mock_sc.called)
self.assertFalse(mock_notify.called)
@@ -4063,7 +4066,6 @@ class ConsoleTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
def test_disable_console_already_disabled(self, mock_notify, mock_sc):
node = obj_utils.create_test_node(self.context, driver='fake-hardware',
console_enabled=False)
self._start_service()
self.service.set_console_mode(self.context, node.uuid, False)
self.assertFalse(mock_sc.called)
self.assertFalse(mock_notify.called)
@@ -4703,11 +4705,12 @@ class SensorsTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
nodes = queue.Queue()
nodes.put_nowait(('fake_uuid', 'fake-hardware', '', None))
self._start_service()
self.service._shutdown = True
self.service._shutdown.set()
CONF.set_override('send_sensor_data', True,
group='sensor_data')
self.service._sensors_nodes_task(self.context, nodes)
acquire_mock.return_value.__enter__.assert_not_called()
self.service._shutdown.clear()
@mock.patch.object(task_manager, 'acquire', autospec=True)
def test_send_sensor_task_no_management(self, acquire_mock):
@@ -6556,7 +6559,6 @@ class ManagerTestProperties(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
def _check_driver_properties(self, hw_type, expected, agent_common=True,
pxe_common=True):
self._start_service()
properties = self.service.get_driver_properties(self.context, hw_type)
if agent_common:
expected.extend(['agent_verify_ca',
@@ -6726,7 +6728,6 @@ class GetStepsForAutomatedCleaningTestCase(mgr_utils.ServiceSetUpMixin,
def setUp(self):
super(GetStepsForAutomatedCleaningTestCase, self).setUp()
self._start_service()
self.node = obj_utils.create_test_node(
self.context,
driver='fake-hardware',

View File

@@ -67,7 +67,6 @@ class TestRPCService(db_base.DbTestCase):
self.assertIs(rpc.GLOBAL_MANAGER, self.rpc_svc.manager)
self.assertTrue(self.rpc_svc._started)
self.assertFalse(self.rpc_svc._failure)
self.rpc_svc.wait_for_start() # should be no-op
@mock.patch.object(manager.ConductorManager, 'prepare_host', autospec=True)
@mock.patch.object(oslo_messaging, 'Target', autospec=True)
@@ -93,7 +92,6 @@ class TestRPCService(db_base.DbTestCase):
self.assertIsNone(rpc.GLOBAL_MANAGER)
self.assertFalse(self.rpc_svc._started)
self.assertIn("boom", self.rpc_svc._failure)
self.assertRaises(SystemExit, self.rpc_svc.wait_for_start)
@mock.patch.object(timeutils, 'utcnow', autospec=True)
@mock.patch.object(time, 'sleep', autospec=True)
@@ -206,30 +204,6 @@ class TestRPCService(db_base.DbTestCase):
mock_sleep.assert_has_calls(
[mock.call(15), mock.call(1), mock.call(1)])
@mock.patch.object(timeutils, 'utcnow', autospec=True)
@mock.patch.object(time, 'sleep', autospec=True)
def test_drain_has_reserved(self, mock_sleep, mock_utcnow):
mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0)
conductor1 = db_utils.get_test_conductor(hostname='fake_host')
conductor2 = db_utils.get_test_conductor(hostname='other_fake_host')
with mock.patch.object(self.dbapi, 'get_online_conductors',
autospec=True) as mock_cond_list:
# multiple conductors, so wait for hash_ring_reset_interval
mock_cond_list.return_value = [conductor1, conductor2]
with mock.patch.object(self.dbapi, 'get_nodeinfo_list',
autospec=True) as mock_nodeinfo_list:
# 3 calls to manager has_reserved until all reservation locks
# are released
mock_nodeinfo_list.side_effect = [['a', 'b'], ['a'], []]
self.rpc_svc._handle_drain(None, None)
self.assertEqual(3, mock_nodeinfo_list.call_count)
# wait the remaining 15 seconds, then wait until has_reserved
# returns False
mock_sleep.assert_has_calls(
[mock.call(15), mock.call(1), mock.call(1)])
@mock.patch.object(timeutils, 'utcnow', autospec=True)
def test_shutdown_timeout_reached(self, mock_utcnow):
@@ -245,14 +219,24 @@ class TestRPCService(db_base.DbTestCase):
mock_utcnow.return_value = after_graceful
self.assertTrue(self.rpc_svc._shutdown_timeout_reached(initial_time))
self.rpc_svc.draining = True
self.assertFalse(self.rpc_svc._shutdown_timeout_reached(initial_time))
with mock.patch.object(self.rpc_svc, 'is_draining',
return_value=True,
autospec=True) as mock_drain:
self.assertFalse(
self.rpc_svc._shutdown_timeout_reached(initial_time))
self.assertEqual(1, mock_drain.call_count)
mock_utcnow.return_value = before_drain
self.assertFalse(self.rpc_svc._shutdown_timeout_reached(initial_time))
mock_utcnow.return_value = before_drain
self.assertFalse(
self.rpc_svc._shutdown_timeout_reached(initial_time))
self.assertEqual(2, mock_drain.call_count)
mock_utcnow.return_value = after_drain
self.assertTrue(self.rpc_svc._shutdown_timeout_reached(initial_time))
mock_utcnow.return_value = after_drain
self.assertTrue(
self.rpc_svc._shutdown_timeout_reached(initial_time))
self.assertEqual(3, mock_drain.call_count)
CONF.set_override('drain_shutdown_timeout', 0)
self.assertFalse(self.rpc_svc._shutdown_timeout_reached(initial_time))
CONF.set_override('drain_shutdown_timeout', 0)
self.assertFalse(
self.rpc_svc._shutdown_timeout_reached(initial_time))
self.assertEqual(4, mock_drain.call_count)

View File

@@ -46,7 +46,6 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
autospec=True)
def test__do_node_verify(self, mock_validate, mock_get_power_state,
mock_notif, mock_cache_vendor):
self._start_service()
mock_get_power_state.return_value = states.POWER_OFF
# Required for exception handling
mock_notif.__name__ = 'NodeCorrectedPowerStateNotification'
@@ -85,7 +84,6 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
autospec=True)
def test__do_node_verify_validation_fails(self, mock_validate,
mock_get_power_state):
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.VERIFYING,
@@ -114,20 +112,17 @@ class DoNodeVerifyTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
autospec=True)
def test__do_node_verify_get_state_fails(self, mock_validate,
mock_get_power_state):
self._start_service()
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.VERIFYING,
target_provision_state=states.MANAGEABLE,
last_error=None,
power_state=states.NOSTATE)
mock_get_power_state.side_effect = RuntimeError("boom")
with task_manager.acquire(
self.context, node['id'], shared=False) as task:
verify.do_node_verify(task)
node.refresh()
mock_get_power_state.assert_called_once_with(mock.ANY, task)

View File

@@ -0,0 +1,34 @@
---
features:
- |
Ironic now utilizes native OS threads for all periodic and background
activities. This change is not configurable, and should any issues be
observed with Ironic's operation, please do not hesitate to report them
to the developer community.
critical:
- |
With the move to native threading, the required memory footprint of
Ironic has increased. This is a result of both the use of threading
and the process model changes which were necessary to remove the
``eventlet`` library from Ironic. The new process model consists of
a launcher process with a sub-process which represents the actual
application workload. In the case of single-process Ironic, operators
should expect a single launcher process with two workers.
Memory utilization is generally tracked by two measurements: the
Virtual Memory Size (VSZ), which represents memory an application has
allocated but may not be actively using, and the more relevant
measurement, the Resident Set Size (RSS), which is the amount of
memory in actual use. Due to the process changes, operators can expect
a 2-3x increase in the amount of overall system memory in use (RSS).
The VSZ can be expected to be upwards of 10x larger, and will closely
track the current number of active threads.
The Ironic project is aware this may require some operator tuning of
resource constraints applied to Ironic. However, the improved
concurrency and performance should be an appropriate trade-off.
fixes:
- |
The ``eventlet`` library is no longer invoked by Ironic for the management
of threads.
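
For operators who want to check the figures described above, RSS and VSZ can be read with ps -o rss,vsz -p <pid>, or from inside a Python process with the stdlib resource module; a small illustrative snippet (not part of Ironic) follows.

import resource

# ru_maxrss is the peak resident set size: kilobytes on Linux, bytes on macOS.
peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
print('peak RSS: %d (kB on Linux, bytes on macOS)' % peak)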

View File

@@ -18,7 +18,7 @@ oslo.log>=4.3.0 # Apache-2.0
oslo.middleware>=3.31.0 # Apache-2.0
oslo.policy>=4.5.0 # Apache-2.0
oslo.serialization>=2.25.0 # Apache-2.0
oslo.service>=1.24.0 # Apache-2.0
oslo.service>=4.2.1 # Apache-2.0
oslo.upgradecheck>=1.3.0 # Apache-2.0
oslo.utils>=8.0.0 # Apache-2.0
osprofiler>=1.5.0 # Apache-2.0
@@ -49,3 +49,4 @@ websockify>=0.9.0 # LGPLv3
PyYAML>=6.0.2 # MIT
cheroot>=10.0.1 # BSD
cryptography>=2.3 # BSD/Apache-2.0
cotyledon>=2.0.0 # Apache-2.0