From 534c2dfdaa839ed7b30d1c2ce6d53fb8954b2be2 Mon Sep 17 00:00:00 2001 From: elajkat Date: Thu, 20 Feb 2025 18:20:47 +0100 Subject: [PATCH] [eventlet-removal] Remove eventlet from DHCP agent Related-Bug: #2087944 Change-Id: I22a275d84584f8e0c9b1adb90ee5b5f22fe908c5 --- neutron/agent/dhcp/agent.py | 46 +++++++++++++-------- neutron/tests/unit/agent/dhcp/test_agent.py | 38 ++++++++++------- 2 files changed, 51 insertions(+), 33 deletions(-) diff --git a/neutron/agent/dhcp/agent.py b/neutron/agent/dhcp/agent.py index 68a61aea236..f4fc5ed69f9 100644 --- a/neutron/agent/dhcp/agent.py +++ b/neutron/agent/dhcp/agent.py @@ -14,11 +14,12 @@ # under the License. import collections +import concurrent.futures import functools import os import threading +import time -import eventlet from neutron_lib.agent import constants as agent_consts from neutron_lib.agent import topics from neutron_lib import constants @@ -132,7 +133,6 @@ class DhcpAgent(manager.Manager): self._process_monitor = external_process.ProcessMonitor( config=self.conf, resource_type='dhcp') - self._pool = eventlet.GreenPool(1) self._queue = queue.ResourceProcessingQueue() self._network_bulk_allocations = {} # Each dhcp-agent restart should trigger a restart of all @@ -176,9 +176,12 @@ class DhcpAgent(manager.Manager): """Activate the DHCP agent.""" self.periodic_resync() self.start_ready_ports_loop() - eventlet.spawn_n(self._process_loop) + pr_loop_thread = threading.Thread(target=self._process_loop) + pr_loop_thread.start() if self.conf.bulk_reload_interval: - eventlet.spawn_n(self._reload_bulk_allocations) + bulk_thread = threading.Thread( + target=self._reload_bulk_allocations) + bulk_thread.start() def _reload_bulk_allocations(self): while True: @@ -191,7 +194,7 @@ class DhcpAgent(manager.Manager): network = self.cache.get_network_by_id(network_id) if network is not None: self.call_driver('bulk_reload_allocations', network) - eventlet.greenthread.sleep(self.conf.bulk_reload_interval) + time.sleep(self.conf.bulk_reload_interval) def call_driver(self, action, network, **action_kwargs): sid_segment = {} @@ -288,7 +291,7 @@ class DhcpAgent(manager.Manager): # This helps prevent one thread from acquiring the same lock over and # over again, in which case no other threads waiting on the # "dhcp-agent" lock would make any progress. - eventlet.greenthread.sleep(0) + time.sleep(0) @_sync_lock def sync_state(self, networks=None): @@ -297,7 +300,6 @@ class DhcpAgent(manager.Manager): """ only_nets = set([] if (not networks or None in networks) else networks) LOG.info('Synchronizing state') - pool = eventlet.GreenPool(self.conf.num_sync_threads) known_network_ids = set(self.cache.get_network_ids()) try: @@ -313,12 +315,19 @@ class DhcpAgent(manager.Manager): LOG.exception('Unable to sync network state on ' 'deleted network %s', deleted_id) - for network in active_networks: - if (not only_nets or # specifically resync all - network.id not in known_network_ids or # missing net - network.id in only_nets): # specific network to sync - pool.spawn(self.safe_configure_dhcp_for_network, network) - pool.waitall() + # Should we have max_executors set implicitly? + with concurrent.futures.ThreadPoolExecutor() as net_cfg_executor: + cfg_dhcp_for_net = [] + for network in active_networks: + if (not only_nets or # specifically resync all + # missing net + network.id not in known_network_ids or + # specific network to sync + network.id in only_nets): + cfg_dhcp_for_net.append(net_cfg_executor.submit( + self.safe_configure_dhcp_for_network, + network)) + concurrent.futures.wait(cfg_dhcp_for_net) # we notify all ports in case some were created while the agent # was down self.dhcp_ready_ports |= set(self.cache.get_port_ids(only_nets)) @@ -364,12 +373,12 @@ class DhcpAgent(manager.Manager): self.dhcp_ready_ports |= ports_to_send while True: - eventlet.sleep(0.2) + time.sleep(0.2) dhcp_ready_ports_loop() def start_ready_ports_loop(self): """Spawn a thread to push changed ports to server.""" - eventlet.spawn(self._dhcp_ready_ports_loop) + threading.Thread(target=self._dhcp_ready_ports_loop).start() @utils.exception_logger() def _periodic_resync_helper(self): @@ -401,7 +410,8 @@ class DhcpAgent(manager.Manager): def periodic_resync(self): """Spawn a thread to periodically resync the dhcp state.""" - eventlet.spawn(self._periodic_resync_helper) + resync_thread = threading.Thread(target=self._periodic_resync_event) + resync_thread.start() def safe_get_network_info(self, network_id): try: @@ -576,7 +586,9 @@ class DhcpAgent(manager.Manager): LOG.debug("Starting _process_loop") while True: - self._pool.spawn_n(self._process_resource_update) + pr_thread = threading.Thread(target=self._process_resource_update) + pr_thread.start() + pr_thread.join() def _process_resource_update(self): for tmp, update in self._queue.each_update_to_next_resource(): diff --git a/neutron/tests/unit/agent/dhcp/test_agent.py b/neutron/tests/unit/agent/dhcp/test_agent.py index d2c7bc93c89..023dce58058 100644 --- a/neutron/tests/unit/agent/dhcp/test_agent.py +++ b/neutron/tests/unit/agent/dhcp/test_agent.py @@ -18,10 +18,10 @@ import copy import datetime import signal import sys +import time from unittest import mock import uuid -import eventlet from neutron_lib.agent import constants as agent_consts from neutron_lib import constants as const from neutron_lib import exceptions @@ -337,7 +337,7 @@ class TestDhcpAgent(base.BaseTestCase): common_config.init(sys.argv[1:]) agent_mgr = dhcp_agent.DhcpAgentWithStateReport( 'testhost') - eventlet.greenthread.sleep(1) + time.sleep(1) agent_mgr.after_start() mock_periodic_resync.assert_called_once_with(agent_mgr) mock_start_ready.assert_called_once_with(agent_mgr) @@ -354,12 +354,13 @@ class TestDhcpAgent(base.BaseTestCase): ['periodic_resync', 'start_ready_ports_loop', '_process_loop']} with mock.patch.multiple(dhcp, **attrs_to_mock) as mocks: - with mock.patch.object(dhcp_agent.eventlet, - 'spawn_n') as spawn_n: + with mock.patch.object(dhcp_agent.threading, + 'Thread') as mock_thread: dhcp.run() mocks['periodic_resync'].assert_called_once_with() mocks['start_ready_ports_loop'].assert_called_once_with() - spawn_n.assert_called_once_with(mocks['_process_loop']) + mock_thread.assert_called_once_with( + target=mocks['_process_loop']) def test_call_driver(self): network = mock.MagicMock() @@ -498,11 +499,15 @@ class TestDhcpAgent(base.BaseTestCase): self._test_sync_state_helper(['b'], ['a']) def test_sync_state_waitall(self): - with mock.patch.object(dhcp_agent.eventlet.GreenPool, 'waitall') as w: + with mock.patch.object(dhcp_agent.concurrent.futures, 'wait') as w: active_net_ids = ['1', '2', '3', '4', '5'] known_net_ids = ['1', '2', '3', '4', '5'] self._test_sync_state_helper(known_net_ids, active_net_ids) - w.assert_called_once_with() + # Expect concurrent.futures + wait_calls = [ + mock.call([mock.ANY, mock.ANY, mock.ANY, mock.ANY, mock.ANY]) + ] + w.assert_has_calls(calls=wait_calls) def test_sync_state_for_all_networks_plugin_error(self): with mock.patch(DHCP_PLUGIN) as plug: @@ -537,15 +542,16 @@ class TestDhcpAgent(base.BaseTestCase): def test_periodic_resync(self): dhcp = dhcp_agent.DhcpAgent(HOSTNAME) - with mock.patch.object(dhcp_agent.eventlet, 'spawn') as spawn: + with mock.patch.object(dhcp_agent.threading, "Thread") as mock_thread: dhcp.periodic_resync() - spawn.assert_called_once_with(dhcp._periodic_resync_helper) + mock_thread.assert_called_once() def test_start_ready_ports_loop(self): dhcp = dhcp_agent.DhcpAgent(HOSTNAME) - with mock.patch.object(dhcp_agent.eventlet, 'spawn') as spawn: + with mock.patch.object(dhcp_agent.threading, 'Thread') as mock_thread: dhcp.start_ready_ports_loop() - spawn.assert_called_once_with(dhcp._dhcp_ready_ports_loop) + mock_thread.assert_called_once_with( + target=dhcp._dhcp_ready_ports_loop) def test__dhcp_ready_ports_doesnt_log_exception_on_timeout(self): dhcp = dhcp_agent.DhcpAgent(HOSTNAME) @@ -554,7 +560,7 @@ class TestDhcpAgent(base.BaseTestCase): with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports', side_effect=oslo_messaging.MessagingTimeout): # exit after 2 iterations - with mock.patch.object(dhcp_agent.eventlet, 'sleep', + with mock.patch.object(dhcp_agent.time, 'sleep', side_effect=[0, 0, RuntimeError]): with mock.patch.object(dhcp_agent.LOG, 'exception') as lex: with testtools.ExpectedException(RuntimeError): @@ -568,7 +574,7 @@ class TestDhcpAgent(base.BaseTestCase): with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports', side_effect=[RuntimeError, 0]) as ready: # exit after 2 iterations - with mock.patch.object(dhcp_agent.eventlet, 'sleep', + with mock.patch.object(dhcp_agent.time, 'sleep', side_effect=[0, 0, RuntimeError]): with testtools.ExpectedException(RuntimeError): dhcp._dhcp_ready_ports_loop() @@ -584,7 +590,7 @@ class TestDhcpAgent(base.BaseTestCase): with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports') as ready: # exit after 2 iterations - with mock.patch.object(dhcp_agent.eventlet, 'sleep', + with mock.patch.object(dhcp_agent.time, 'sleep', side_effect=[0, 0, RuntimeError]): with testtools.ExpectedException(RuntimeError): dhcp._dhcp_ready_ports_loop() @@ -612,7 +618,7 @@ class TestDhcpAgent(base.BaseTestCase): with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports') as ready: # exit after 1 iteration - with mock.patch.object(dhcp_agent.eventlet, 'sleep', + with mock.patch.object(dhcp_agent.time, 'sleep', side_effect=[0, RuntimeError]): with testtools.ExpectedException(RuntimeError): dhcp._dhcp_ready_ports_loop() @@ -633,7 +639,7 @@ class TestDhcpAgent(base.BaseTestCase): with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports') as ready: # exit after 1 iteration - with mock.patch.object(dhcp_agent.eventlet, 'sleep', + with mock.patch.object(dhcp_agent.time, 'sleep', side_effect=[0, RuntimeError]): with testtools.ExpectedException(RuntimeError): dhcp._dhcp_ready_ports_loop()