Merge "[eventlet-removal] Remove eventlet from DHCP agent"

This commit is contained in:
Zuul
2025-02-28 16:48:49 +00:00
committed by Gerrit Code Review
2 changed files with 51 additions and 33 deletions

View File

@@ -14,11 +14,12 @@
# under the License.
import collections
import concurrent.futures
import functools
import os
import threading
import time
import eventlet
from neutron_lib.agent import constants as agent_consts
from neutron_lib.agent import topics
from neutron_lib import constants
@@ -132,7 +133,6 @@ class DhcpAgent(manager.Manager):
self._process_monitor = external_process.ProcessMonitor(
config=self.conf,
resource_type='dhcp')
self._pool = eventlet.GreenPool(1)
self._queue = queue.ResourceProcessingQueue()
self._network_bulk_allocations = {}
# Each dhcp-agent restart should trigger a restart of all
@@ -176,9 +176,12 @@ class DhcpAgent(manager.Manager):
"""Activate the DHCP agent."""
self.periodic_resync()
self.start_ready_ports_loop()
eventlet.spawn_n(self._process_loop)
pr_loop_thread = threading.Thread(target=self._process_loop)
pr_loop_thread.start()
if self.conf.bulk_reload_interval:
eventlet.spawn_n(self._reload_bulk_allocations)
bulk_thread = threading.Thread(
target=self._reload_bulk_allocations)
bulk_thread.start()
def _reload_bulk_allocations(self):
while True:
@@ -191,7 +194,7 @@ class DhcpAgent(manager.Manager):
network = self.cache.get_network_by_id(network_id)
if network is not None:
self.call_driver('bulk_reload_allocations', network)
eventlet.greenthread.sleep(self.conf.bulk_reload_interval)
time.sleep(self.conf.bulk_reload_interval)
def call_driver(self, action, network, **action_kwargs):
sid_segment = {}
@@ -288,7 +291,7 @@ class DhcpAgent(manager.Manager):
# This helps prevent one thread from acquiring the same lock over and
# over again, in which case no other threads waiting on the
# "dhcp-agent" lock would make any progress.
eventlet.greenthread.sleep(0)
time.sleep(0)
@_sync_lock
def sync_state(self, networks=None):
@@ -297,7 +300,6 @@ class DhcpAgent(manager.Manager):
"""
only_nets = set([] if (not networks or None in networks) else networks)
LOG.info('Synchronizing state')
pool = eventlet.GreenPool(self.conf.num_sync_threads)
known_network_ids = set(self.cache.get_network_ids())
try:
@@ -313,12 +315,19 @@ class DhcpAgent(manager.Manager):
LOG.exception('Unable to sync network state on '
'deleted network %s', deleted_id)
for network in active_networks:
if (not only_nets or # specifically resync all
network.id not in known_network_ids or # missing net
network.id in only_nets): # specific network to sync
pool.spawn(self.safe_configure_dhcp_for_network, network)
pool.waitall()
# Should we have max_executors set implicitly?
with concurrent.futures.ThreadPoolExecutor() as net_cfg_executor:
cfg_dhcp_for_net = []
for network in active_networks:
if (not only_nets or # specifically resync all
# missing net
network.id not in known_network_ids or
# specific network to sync
network.id in only_nets):
cfg_dhcp_for_net.append(net_cfg_executor.submit(
self.safe_configure_dhcp_for_network,
network))
concurrent.futures.wait(cfg_dhcp_for_net)
# we notify all ports in case some were created while the agent
# was down
self.dhcp_ready_ports |= set(self.cache.get_port_ids(only_nets))
@@ -364,12 +373,12 @@ class DhcpAgent(manager.Manager):
self.dhcp_ready_ports |= ports_to_send
while True:
eventlet.sleep(0.2)
time.sleep(0.2)
dhcp_ready_ports_loop()
def start_ready_ports_loop(self):
"""Spawn a thread to push changed ports to server."""
eventlet.spawn(self._dhcp_ready_ports_loop)
threading.Thread(target=self._dhcp_ready_ports_loop).start()
@utils.exception_logger()
def _periodic_resync_helper(self):
@@ -401,7 +410,8 @@ class DhcpAgent(manager.Manager):
def periodic_resync(self):
"""Spawn a thread to periodically resync the dhcp state."""
eventlet.spawn(self._periodic_resync_helper)
resync_thread = threading.Thread(target=self._periodic_resync_event)
resync_thread.start()
def safe_get_network_info(self, network_id):
try:
@@ -576,7 +586,9 @@ class DhcpAgent(manager.Manager):
LOG.debug("Starting _process_loop")
while True:
self._pool.spawn_n(self._process_resource_update)
pr_thread = threading.Thread(target=self._process_resource_update)
pr_thread.start()
pr_thread.join()
def _process_resource_update(self):
for tmp, update in self._queue.each_update_to_next_resource():

View File

@@ -18,10 +18,10 @@ import copy
import datetime
import signal
import sys
import time
from unittest import mock
import uuid
import eventlet
from neutron_lib.agent import constants as agent_consts
from neutron_lib import constants as const
from neutron_lib import exceptions
@@ -337,7 +337,7 @@ class TestDhcpAgent(base.BaseTestCase):
common_config.init(sys.argv[1:])
agent_mgr = dhcp_agent.DhcpAgentWithStateReport(
'testhost')
eventlet.greenthread.sleep(1)
time.sleep(1)
agent_mgr.after_start()
mock_periodic_resync.assert_called_once_with(agent_mgr)
mock_start_ready.assert_called_once_with(agent_mgr)
@@ -354,12 +354,13 @@ class TestDhcpAgent(base.BaseTestCase):
['periodic_resync', 'start_ready_ports_loop',
'_process_loop']}
with mock.patch.multiple(dhcp, **attrs_to_mock) as mocks:
with mock.patch.object(dhcp_agent.eventlet,
'spawn_n') as spawn_n:
with mock.patch.object(dhcp_agent.threading,
'Thread') as mock_thread:
dhcp.run()
mocks['periodic_resync'].assert_called_once_with()
mocks['start_ready_ports_loop'].assert_called_once_with()
spawn_n.assert_called_once_with(mocks['_process_loop'])
mock_thread.assert_called_once_with(
target=mocks['_process_loop'])
def test_call_driver(self):
network = mock.MagicMock()
@@ -498,11 +499,15 @@ class TestDhcpAgent(base.BaseTestCase):
self._test_sync_state_helper(['b'], ['a'])
def test_sync_state_waitall(self):
with mock.patch.object(dhcp_agent.eventlet.GreenPool, 'waitall') as w:
with mock.patch.object(dhcp_agent.concurrent.futures, 'wait') as w:
active_net_ids = ['1', '2', '3', '4', '5']
known_net_ids = ['1', '2', '3', '4', '5']
self._test_sync_state_helper(known_net_ids, active_net_ids)
w.assert_called_once_with()
# Expect concurrent.futures
wait_calls = [
mock.call([mock.ANY, mock.ANY, mock.ANY, mock.ANY, mock.ANY])
]
w.assert_has_calls(calls=wait_calls)
def test_sync_state_for_all_networks_plugin_error(self):
with mock.patch(DHCP_PLUGIN) as plug:
@@ -537,15 +542,16 @@ class TestDhcpAgent(base.BaseTestCase):
def test_periodic_resync(self):
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
with mock.patch.object(dhcp_agent.eventlet, 'spawn') as spawn:
with mock.patch.object(dhcp_agent.threading, "Thread") as mock_thread:
dhcp.periodic_resync()
spawn.assert_called_once_with(dhcp._periodic_resync_helper)
mock_thread.assert_called_once()
def test_start_ready_ports_loop(self):
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
with mock.patch.object(dhcp_agent.eventlet, 'spawn') as spawn:
with mock.patch.object(dhcp_agent.threading, 'Thread') as mock_thread:
dhcp.start_ready_ports_loop()
spawn.assert_called_once_with(dhcp._dhcp_ready_ports_loop)
mock_thread.assert_called_once_with(
target=dhcp._dhcp_ready_ports_loop)
def test__dhcp_ready_ports_doesnt_log_exception_on_timeout(self):
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
@@ -554,7 +560,7 @@ class TestDhcpAgent(base.BaseTestCase):
with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports',
side_effect=oslo_messaging.MessagingTimeout):
# exit after 2 iterations
with mock.patch.object(dhcp_agent.eventlet, 'sleep',
with mock.patch.object(dhcp_agent.time, 'sleep',
side_effect=[0, 0, RuntimeError]):
with mock.patch.object(dhcp_agent.LOG, 'exception') as lex:
with testtools.ExpectedException(RuntimeError):
@@ -568,7 +574,7 @@ class TestDhcpAgent(base.BaseTestCase):
with mock.patch.object(dhcp.plugin_rpc, 'dhcp_ready_on_ports',
side_effect=[RuntimeError, 0]) as ready:
# exit after 2 iterations
with mock.patch.object(dhcp_agent.eventlet, 'sleep',
with mock.patch.object(dhcp_agent.time, 'sleep',
side_effect=[0, 0, RuntimeError]):
with testtools.ExpectedException(RuntimeError):
dhcp._dhcp_ready_ports_loop()
@@ -584,7 +590,7 @@ class TestDhcpAgent(base.BaseTestCase):
with mock.patch.object(dhcp.plugin_rpc,
'dhcp_ready_on_ports') as ready:
# exit after 2 iterations
with mock.patch.object(dhcp_agent.eventlet, 'sleep',
with mock.patch.object(dhcp_agent.time, 'sleep',
side_effect=[0, 0, RuntimeError]):
with testtools.ExpectedException(RuntimeError):
dhcp._dhcp_ready_ports_loop()
@@ -612,7 +618,7 @@ class TestDhcpAgent(base.BaseTestCase):
with mock.patch.object(dhcp.plugin_rpc,
'dhcp_ready_on_ports') as ready:
# exit after 1 iteration
with mock.patch.object(dhcp_agent.eventlet, 'sleep',
with mock.patch.object(dhcp_agent.time, 'sleep',
side_effect=[0, RuntimeError]):
with testtools.ExpectedException(RuntimeError):
dhcp._dhcp_ready_ports_loop()
@@ -633,7 +639,7 @@ class TestDhcpAgent(base.BaseTestCase):
with mock.patch.object(dhcp.plugin_rpc,
'dhcp_ready_on_ports') as ready:
# exit after 1 iteration
with mock.patch.object(dhcp_agent.eventlet, 'sleep',
with mock.patch.object(dhcp_agent.time, 'sleep',
side_effect=[0, RuntimeError]):
with testtools.ExpectedException(RuntimeError):
dhcp._dhcp_ready_ports_loop()