diff --git a/neutron/agent/ovn/metadata/agent.py b/neutron/agent/ovn/metadata/agent.py index e3aea2bd6d7..d806312ccfb 100644 --- a/neutron/agent/ovn/metadata/agent.py +++ b/neutron/agent/ovn/metadata/agent.py @@ -15,11 +15,11 @@ import collections import functools import re +import uuid from neutron_lib import constants as n_const from oslo_concurrency import lockutils from oslo_log import log -from oslo_utils import uuidutils from ovsdbapp.backend.ovs_idl import event as row_event from ovsdbapp.backend.ovs_idl import vlog import tenacity @@ -261,8 +261,10 @@ class MetadataAgent(object): # NOTE(lucasagomes): db_add() will not overwrite the UUID if # it's already set. table = ('Chassis_Private' if self.has_chassis_private else 'Chassis') - ext_ids = { - ovn_const.OVN_AGENT_METADATA_ID_KEY: uuidutils.generate_uuid()} + chassis_id = uuid.UUID(self._get_own_chassis_name()) + # Generate unique, but consistent metadata id for chassis name + agent_id = uuid.uuid5(chassis_id, 'metadata_agent') + ext_ids = {ovn_const.OVN_AGENT_METADATA_ID_KEY: str(agent_id)} self.sb_idl.db_add(table, self.chassis, 'external_ids', ext_ids).execute(check_error=True) diff --git a/neutron/plugins/ml2/drivers/ovn/agent/neutron_agent.py b/neutron/plugins/ml2/drivers/ovn/agent/neutron_agent.py index ea4290c701a..da3125ba404 100644 --- a/neutron/plugins/ml2/drivers/ovn/agent/neutron_agent.py +++ b/neutron/plugins/ml2/drivers/ovn/agent/neutron_agent.py @@ -14,10 +14,13 @@ import abc +from oslo_config import cfg from oslo_utils import timeutils +from neutron._i18n import _ from neutron.common.ovn import constants as ovn_const from neutron.common.ovn import utils as ovn_utils +from neutron.common import utils class NeutronAgent(abc.ABC): @@ -27,26 +30,26 @@ class NeutronAgent(abc.ABC): # Register the subclasses to be looked up by their type NeutronAgent.types[cls.agent_type] = cls - def __init__(self, chassis_private): - self.chassis_private = chassis_private - self.chassis = self.get_chassis(chassis_private) + def __init__(self, chassis_private, driver, updated_at=None): + self.driver = driver + self.set_down = False + self.update(chassis_private, updated_at) - @staticmethod - def get_chassis(chassis_private): - try: - return chassis_private.chassis[0] - except (AttributeError, IndexError): - # No Chassis_Private support, just use Chassis - return chassis_private + def update(self, chassis_private, updated_at=None, clear_down=False): + self.chassis_private = chassis_private + self.updated_at = updated_at or timeutils.utcnow(with_timezone=True) + if clear_down: + self.set_down = False @property - def updated_at(self): + def chassis(self): try: - return timeutils.parse_isotime(self.chassis.external_ids[self.key]) - except KeyError: - return timeutils.utcnow(with_timezone=True) + return self.chassis_private.chassis[0] + except (AttributeError, IndexError): + # No Chassis_Private support, just use Chassis + return self.chassis_private - def as_dict(self, alive): + def as_dict(self): return { 'binary': self.binary, 'host': self.chassis.hostname, @@ -62,39 +65,62 @@ class NeutronAgent(abc.ABC): 'start_flag': True, 'agent_type': self.agent_type, 'id': self.agent_id, - 'alive': alive, + 'alive': self.alive, 'admin_state_up': True} - @classmethod - def from_type(cls, _type, chassis_private): - return cls.types[_type](chassis_private) - - @staticmethod - def matches_chassis(chassis): - """Is this Agent type found on the passed in chassis?""" - return True + @property + def alive(self): + if self.set_down: + return False + # 
TODO(twilson) Determine if we can go back to just checking: + # if self.driver._nb_ovn.nb_global.nb_cfg == self.nb_cfg: + if self.driver._nb_ovn.nb_global.nb_cfg - self.nb_cfg <= 1: + return True + now = timeutils.utcnow(with_timezone=True) + if (now - self.updated_at).total_seconds() < cfg.CONF.agent_down_time: + # down, but not yet timed out + return True + return False @classmethod - def agents_from_chassis(cls, chassis_private): - return [AgentCls(chassis_private) - for AgentCls in cls.types.values() - if AgentCls.matches_chassis(cls.get_chassis(chassis_private))] + def from_type(cls, _type, chassis_private, driver, updated_at=None): + return cls.types[_type](chassis_private, driver, updated_at) @property @abc.abstractmethod def agent_type(self): pass + @property + @abc.abstractmethod + def binary(self): + pass + + @property + @abc.abstractmethod + def nb_cfg(self): + pass + + @property + @abc.abstractmethod + def agent_id(self): + pass + class ControllerAgent(NeutronAgent): agent_type = ovn_const.OVN_CONTROLLER_AGENT binary = 'ovn-controller' - key = ovn_const.OVN_LIVENESS_CHECK_EXT_ID_KEY + + @staticmethod # it is by default, but this makes pep8 happy + def __new__(cls, chassis_private, driver, updated_at=None): + if ('enable-chassis-as-gw' in + chassis_private.external_ids.get('ovn-cms-options', [])): + cls = ControllerGatewayAgent + return super().__new__(cls) @staticmethod - def matches_chassis(chassis): - return ('enable-chassis-as-gw' not in - chassis.external_ids.get('ovn-cms-options', [])) + def id_from_chassis_private(chassis_private): + return chassis_private.name @property def nb_cfg(self): @@ -102,7 +128,7 @@ class ControllerAgent(NeutronAgent): @property def agent_id(self): - return self.chassis_private.name + return self.id_from_chassis_private(self.chassis_private) @property def description(self): @@ -113,28 +139,76 @@ class ControllerAgent(NeutronAgent): class ControllerGatewayAgent(ControllerAgent): agent_type = ovn_const.OVN_CONTROLLER_GW_AGENT - @staticmethod - def matches_chassis(chassis): - return ('enable-chassis-as-gw' in - chassis.external_ids.get('ovn-cms-options', [])) - class MetadataAgent(NeutronAgent): agent_type = ovn_const.OVN_METADATA_AGENT binary = 'neutron-ovn-metadata-agent' - key = ovn_const.METADATA_LIVENESS_CHECK_EXT_ID_KEY + + @property + def alive(self): + # If ovn-controller is down, then metadata agent is down even + # if the metadata-agent binary is updating external_ids. + try: + if not AgentCache()[self.chassis_private.name].alive: + return False + except KeyError: + return False + return super().alive @property def nb_cfg(self): return int(self.chassis_private.external_ids.get( ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY, 0)) + @staticmethod + def id_from_chassis_private(chassis_private): + return chassis_private.external_ids.get( + ovn_const.OVN_AGENT_METADATA_ID_KEY) + @property def agent_id(self): - return self.chassis_private.external_ids.get( - ovn_const.OVN_AGENT_METADATA_ID_KEY) + return self.id_from_chassis_private(self.chassis_private) @property def description(self): return self.chassis_private.external_ids.get( ovn_const.OVN_AGENT_METADATA_DESC_KEY, '') + + +@utils.SingletonDecorator +class AgentCache: + def __init__(self, driver=None): + # This is just to make pylint happy because it doesn't like calls to + # AgentCache() with no arguments, despite init only being called the + # first time--and we do really want a driver passed in. 
+ if driver is None: + raise ValueError(_("driver cannot be None")) + self.agents = {} + self.driver = driver + + def __iter__(self): + return iter(self.agents.values()) + + def __getitem__(self, key): + return self.agents[key] + + def update(self, agent_type, row, updated_at=None, clear_down=False): + cls = NeutronAgent.types[agent_type] + try: + agent = self.agents[cls.id_from_chassis_private(row)] + agent.update(row, updated_at=updated_at, clear_down=clear_down) + except KeyError: + agent = NeutronAgent.from_type(agent_type, row, self.driver, + updated_at=updated_at) + self.agents[agent.agent_id] = agent + return agent + + def __delitem__(self, agent_id): + del self.agents[agent_id] + + def agents_by_chassis_private(self, chassis_private): + # Get unique agent ids based on the chassis_private + agent_ids = {cls.id_from_chassis_private(chassis_private) + for cls in NeutronAgent.types.values()} + # Return the cached agents of agent_ids whose keys are in the cache + return (self.agents[id_] for id_ in agent_ids & self.agents.keys()) diff --git a/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py b/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py index 5596a3875d4..f5258344df1 100644 --- a/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py +++ b/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py @@ -36,7 +36,6 @@ from oslo_config import cfg from oslo_db import exception as os_db_exc from oslo_log import log from oslo_utils import timeutils -from ovsdbapp.backend.ovs_idl import idlutils from neutron._i18n import _ from neutron.common.ovn import acl as ovn_acl @@ -64,7 +63,6 @@ import neutron.wsgi LOG = log.getLogger(__name__) METADATA_READY_WAIT_TIMEOUT = 15 -AGENTS = {} class MetadataServiceReadyWaitTimeoutException(Exception): @@ -277,15 +275,12 @@ class OVNMechanismDriver(api.MechanismDriver): self.node_uuid = ovn_hash_ring_db.add_node(admin_context, self.hash_ring_group) + n_agent.AgentCache(self) # Initialize singleton agent cache self._nb_ovn, self._sb_ovn = impl_idl_ovn.get_ovn_idls(self, trigger) if self._sb_ovn.is_table_present('Chassis_Private'): self.agent_chassis_table = 'Chassis_Private' - # AGENTS must be populated after fork so if ovn-controller is stopped - # before a worker handles a get_agents request, we still show agents - populate_agents(self) - # Override agents API methods self.patch_plugin_merge("get_agents", get_agents) self.patch_plugin_choose("get_agent", get_agent) @@ -1130,38 +1125,6 @@ class OVNMechanismDriver(api.MechanismDriver): " neutron-ovn-metadata-agent status/logs.", port_id) - def agent_alive(self, agent, update_db): - # Allow a maximum of 1 difference between expected and read values - # to avoid false positives. - if self._nb_ovn.nb_global.nb_cfg - agent.nb_cfg <= 1: - if update_db: - self.mark_agent_alive(agent) - return True - - now = timeutils.utcnow(with_timezone=True) - if (now - agent.updated_at).total_seconds() < cfg.CONF.agent_down_time: - # down, but not yet timed out - return True - return False - - def mark_agent_alive(self, agent): - # Update the time of our successful check - value = timeutils.utcnow(with_timezone=True).isoformat() - self._sb_ovn.db_set( - self.agent_chassis_table, agent.chassis_private.uuid, - ('external_ids', {agent.key: value})).execute(check_error=True) - - def agents_from_chassis(self, chassis_private, update_db=True): - agent_dict = {} - # For each Chassis there will possibly be a Metadata agent and either - # a Controller or Controller Gateway agent. 
- for agent in n_agent.NeutronAgent.agents_from_chassis(chassis_private): - if not agent.agent_id: - continue - alive = self.agent_alive(agent, update_db) - agent_dict[agent.agent_id] = agent.as_dict(alive) - return agent_dict - def patch_plugin_merge(self, method_name, new_fn, op=operator.add): old_method = getattr(self._plugin, method_name) @@ -1228,42 +1191,22 @@ class OVNMechanismDriver(api.MechanismDriver): return azs -def populate_agents(driver): - for ch in driver._sb_ovn.tables[driver.agent_chassis_table].rows.values(): - # update the cache, rows are hashed on uuid but it is the name that - # stays consistent across ovn-controller restarts - AGENTS.update({ch.name: ch}) - - def get_agents(self, context, filters=None, fields=None, _driver=None): - update_db = _driver.ping_all_chassis() + _driver.ping_all_chassis() filters = filters or {} agent_list = [] - populate_agents(_driver) - for ch in AGENTS.values(): - for agent in _driver.agents_from_chassis(ch, update_db).values(): - if all(agent[k] in v for k, v in filters.items()): - agent_list.append(agent) + for agent in n_agent.AgentCache(): + agent_dict = agent.as_dict() + if all(agent_dict[k] in v for k, v in filters.items()): + agent_list.append(agent_dict) return agent_list def get_agent(self, context, id, fields=None, _driver=None): - chassis = None try: - # look up Chassis by *name*, which the id attribute is - chassis = _driver._sb_ovn.lookup(_driver.agent_chassis_table, id) - except idlutils.RowNotFound: - # If the UUID is not found, check for the metadata agent ID - for ch in _driver._sb_ovn.tables[ - _driver.agent_chassis_table].rows.values(): - metadata_agent_id = ch.external_ids.get( - ovn_const.OVN_AGENT_METADATA_ID_KEY) - if id == metadata_agent_id: - chassis = ch - break - else: - raise n_exc.agent.AgentNotFound(id=id) - return _driver.agents_from_chassis(chassis)[id] + return n_agent.AgentCache()[id].as_dict() + except KeyError: + raise n_exc.agent.AgentNotFound(id=id) def update_agent(self, context, id, agent, _driver=None): @@ -1289,9 +1232,28 @@ def update_agent(self, context, id, agent, _driver=None): def delete_agent(self, context, id, _driver=None): - get_agent(self, None, id, _driver=_driver) - raise n_exc.BadRequest(resource='agent', - msg='OVN agents cannot be deleted') + # raise AgentNotFound if this isn't an ml2/ovn-related agent + agent = get_agent(self, None, id, _driver=_driver) + + # NOTE(twilson) According to the API docs, an agent must be disabled + # before deletion. Otherwise, behavior seems to be undefined. We could + # check that alive=False before allowing deletion, but depending on the + # agent_down_time setting, that could take quite a while. + # If ovn-controller is up, the Chassis will be recreated and so the agent + # will still show as up. The recreated Chassis will cause all kinds of + # events to fire. But again, undefined behavior. + chassis_name = agent['configurations']['chassis_name'] + _driver._sb_ovn.chassis_del(chassis_name, if_exists=True).execute( + check_error=True) + # Send a specific event that all API workers can get to delete the agent + # from their caches. 
Ideally we could send a single transaction that both + # created and deleted the key, but alas python-ovs is too "smart" + _driver._sb_ovn.db_set( + 'SB_Global', '.', ('external_ids', {'delete_agent': str(id)})).execute( + check_error=True) + _driver._sb_ovn.db_remove( + 'SB_Global', '.', 'external_ids', delete_agent=str(id), + if_exists=True).execute(check_error=True) def create_default_drop_port_group(nb_idl): diff --git a/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovsdb_monitor.py b/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovsdb_monitor.py index 4951aa73fb0..cc091727baa 100644 --- a/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovsdb_monitor.py +++ b/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovsdb_monitor.py @@ -34,6 +34,7 @@ from neutron.common.ovn import hash_ring_manager from neutron.common.ovn import utils from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf from neutron.db import ovn_hash_ring_db +from neutron.plugins.ml2.drivers.ovn.agent import neutron_agent as n_agent CONF = cfg.CONF @@ -213,6 +214,92 @@ class PortBindingChassisUpdateEvent(row_event.RowEvent): self.driver.set_port_status_up(row.logical_port) +class ChassisAgentEvent(BaseEvent): + GLOBAL = True + + # NOTE (twilson) Do not run new transactions out of a GLOBAL Event since + # it will be running on every single process, and you almost certainly + # don't want to insert/update/delete something a bajillion times. + def __init__(self, driver): + self.driver = driver + super().__init__() + + @property + def table(self): + # It probably doesn't matter, but since agent_chassis_table changes + # in post_fork_initialize(), resolve this at runtime + return self.driver.agent_chassis_table + + @table.setter + def table(self, value): + pass + + +class ChassisAgentDownEvent(ChassisAgentEvent): + events = (BaseEvent.ROW_DELETE,) + + def run(self, event, row, old): + for agent in n_agent.AgentCache().agents_by_chassis_private(row): + agent.set_down = True + + def match_fn(self, event, row, old=None): + return True + + +class ChassisAgentDeleteEvent(ChassisAgentEvent): + events = (BaseEvent.ROW_UPDATE,) + table = 'SB_Global' + + def match_fn(self, event, row, old=None): + try: + return (old.external_ids.get('delete_agent') != + row.external_ids['delete_agent']) + except (AttributeError, KeyError): + return False + + def run(self, event, row, old): + del n_agent.AgentCache()[row.external_ids['delete_agent']] + + +class ChassisAgentWriteEvent(ChassisAgentEvent): + events = (BaseEvent.ROW_CREATE, BaseEvent.ROW_UPDATE) + + def match_fn(self, event, row, old=None): + return event == self.ROW_CREATE or getattr(old, 'nb_cfg', False) + + def run(self, event, row, old): + n_agent.AgentCache().update(ovn_const.OVN_CONTROLLER_AGENT, row, + clear_down=event == self.ROW_CREATE) + + +class ChassisMetadataAgentWriteEvent(ChassisAgentEvent): + events = (BaseEvent.ROW_CREATE, BaseEvent.ROW_UPDATE) + + @staticmethod + def _metadata_nb_cfg(row): + return int( + row.external_ids.get(ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY, -1)) + + @staticmethod + def agent_id(row): + return row.external_ids.get(ovn_const.OVN_AGENT_METADATA_ID_KEY) + + def match_fn(self, event, row, old=None): + if not self.agent_id(row): + # Don't create a cached object with an agent_id of 'None' + return False + if event == self.ROW_CREATE: + return True + try: + return self._metadata_nb_cfg(row) != self._metadata_nb_cfg(old) + except (AttributeError, KeyError): + return False + + def run(self, event, row, old): + 
n_agent.AgentCache().update(ovn_const.OVN_METADATA_AGENT, row, + clear_down=True) + + class PortBindingChassisEvent(row_event.RowEvent): """Port_Binding update event - set chassis for chassisredirect port. @@ -359,8 +446,24 @@ class NeutronPgDropPortGroupCreated(row_event.WaitEvent): class OvnDbNotifyHandler(row_event.RowEventHandler): def __init__(self, driver): - super(OvnDbNotifyHandler, self).__init__() self.driver = driver + super(OvnDbNotifyHandler, self).__init__() + try: + self._lock = self._RowEventHandler__lock + self._watched_events = self._RowEventHandler__watched_events + except AttributeError: + pass + + def notify(self, event, row, updates=None, global_=False): + matching = self.matching_events(event, row, updates, global_) + for match in matching: + self.notifications.put((match, event, row, updates)) + + def matching_events(self, event, row, updates, global_=False): + with self._lock: + return tuple(t for t in self._watched_events + if getattr(t, 'GLOBAL', False) == global_ and + self.match(t, event, row, updates)) class Ml2OvnIdlBase(connection.OvsdbIdl): @@ -448,12 +551,12 @@ class OvnIdlDistributedLock(BaseOvnIdl): self._last_touch = None def notify(self, event, row, updates=None): + self.notify_handler.notify(event, row, updates, global_=True) try: target_node = self._hash_ring.get_node(str(row.uuid)) except exceptions.HashRingIsEmpty as e: LOG.error('HashRing is empty, error: %s', e) return - if target_node != self._node_uuid: return @@ -530,6 +633,14 @@ class OvnNbIdl(OvnIdlDistributedLock): class OvnSbIdl(OvnIdlDistributedLock): + def __init__(self, driver, remote, schema): + super(OvnSbIdl, self).__init__(driver, remote, schema) + self.notify_handler.watch_events([ + ChassisAgentDeleteEvent(self.driver), + ChassisAgentDownEvent(self.driver), + ChassisAgentWriteEvent(self.driver), + ChassisMetadataAgentWriteEvent(self.driver)]) + @classmethod def from_server(cls, connection_string, schema_name, driver): _check_and_set_ssl_files(schema_name) @@ -541,6 +652,7 @@ class OvnSbIdl(OvnIdlDistributedLock): helper.register_table('Port_Binding') helper.register_table('Datapath_Binding') helper.register_table('MAC_Binding') + helper.register_columns('SB_Global', ['external_ids']) return cls(driver, connection_string, helper) def post_connect(self): diff --git a/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py b/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py index 490b7c0d552..92937f47149 100644 --- a/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py +++ b/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py @@ -15,7 +15,6 @@ from unittest import mock import fixtures as og_fixtures -from oslo_config import cfg from oslo_utils import uuidutils from neutron.common.ovn import constants as ovn_const @@ -70,6 +69,10 @@ class DistributedLockTestEvent(event.WaitEvent): self.event.set() +class GlobalTestEvent(DistributedLockTestEvent): + GLOBAL = True + + class TestNBDbMonitor(base.TestOVNFunctionalBase): def setUp(self): @@ -198,15 +201,12 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase): self._test_port_binding_and_status(port['id'], 'bind', 'ACTIVE') self._test_port_binding_and_status(port['id'], 'unbind', 'DOWN') - def test_distributed_lock(self): - api_workers = 11 - cfg.CONF.set_override('api_workers', api_workers) - row_event = DistributedLockTestEvent() + def _create_workers(self, row_event, worker_num): 
self.mech_driver._nb_ovn.idl.notify_handler.watch_event(row_event)
-        worker_list = [self.mech_driver._nb_ovn, ]
+        worker_list = [self.mech_driver._nb_ovn]
 
         # Create 10 fake workers
-        for _ in range(api_workers - len(worker_list)):
+        for _ in range(worker_num):
             node_uuid = uuidutils.generate_uuid()
             db_hash_ring.add_node(
                 self.context, ovn_const.HASH_RING_ML2_GROUP, node_uuid)
@@ -228,11 +228,17 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
 
         # Assert we have 11 active workers in the ring
         self.assertEqual(
-            11, len(db_hash_ring.get_active_nodes(
-                self.context,
-                interval=ovn_const.HASH_RING_NODES_TIMEOUT,
-                group_name=ovn_const.HASH_RING_ML2_GROUP)))
+            worker_num + 1,
+            len(db_hash_ring.get_active_nodes(
+                self.context,
+                interval=ovn_const.HASH_RING_NODES_TIMEOUT,
+                group_name=ovn_const.HASH_RING_ML2_GROUP)))
+        return worker_list
+
+    def test_distributed_lock(self):
+        row_event = DistributedLockTestEvent()
+        self._create_workers(row_event, worker_num=10)
 
         # Trigger the event
         self.create_port()
@@ -242,6 +248,30 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
         # Assert that only one worker handled the event
         self.assertEqual(1, row_event.COUNTER)
 
+    def test_global_events(self):
+        worker_num = 10
+        distributed_event = DistributedLockTestEvent()
+        global_event = GlobalTestEvent()
+        worker_list = self._create_workers(distributed_event, worker_num)
+        for worker in worker_list:
+            worker.idl.notify_handler.watch_event(global_event)
+
+        # This should generate one distributed event handled by a single
+        # worker and one global event that should be handled by all workers
+        self.create_port()
+
+        # Wait for the distributed event to complete
+        self.assertTrue(distributed_event.wait())
+
+        # Assert that only one worker handled the distributed event
+        self.assertEqual(1, distributed_event.COUNTER)
+
+        n_utils.wait_until_true(
+            lambda: global_event.COUNTER == worker_num + 1,
+            exception=Exception(
+                "Fanout event didn't get handled the expected %d times" %
+                (worker_num + 1)))
+
 
 class TestNBDbMonitorOverTcp(TestNBDbMonitor):
     def get_ovsdb_server_protocol(self):
diff --git a/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py b/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py
index e50b013b7c7..7d15d5a5b1c 100644
--- a/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py
+++ b/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py
@@ -17,8 +17,10 @@ from unittest import mock
 
 from neutron_lib.api.definitions import portbindings
 from neutron_lib import constants
+from neutron_lib.exceptions import agent as agent_exc
 from oslo_config import cfg
 from oslo_utils import uuidutils
+from ovsdbapp.backend.ovs_idl import event
 from ovsdbapp.tests.functional import base as ovs_base
 
 from neutron.common.ovn import constants as ovn_const
@@ -744,23 +746,75 @@ class TestProvnetPorts(base.TestOVNFunctionalBase):
         self.assertIsNone(ovn_localnetport)
 
 
+class AgentWaitEvent(event.WaitEvent):
+    """Wait for a list of Chassis to be created"""
+
+    ONETIME = False
+
+    def __init__(self, driver, chassis_names):
+        table = driver.agent_chassis_table
+        events = (self.ROW_CREATE,)
+        self.chassis_names = chassis_names
+        super().__init__(events, table, None)
+        self.event_name = "AgentWaitEvent"
+
+    def match_fn(self, event, row, old):
+        return row.name in self.chassis_names
+
+    def run(self, event, row, old):
+        self.chassis_names.remove(row.name)
+        if not self.chassis_names:
+            self.event.set()
+
+
 class TestAgentApi(base.TestOVNFunctionalBase):
+    TEST_AGENT = 'test'
 
     def setUp(self):
         super().setUp()
-        self.host = 'test-host'
-        self.controller_agent = self.add_fake_chassis(self.host)
+        self.host = n_utils.get_rand_name(prefix='testhost-')
         self.plugin = self.mech_driver._plugin
-        agent = {'agent_type': 'test', 'binary': '/bin/test',
-                 'host': self.host, 'topic': 'test_topic'}
-        _, status = self.plugin.create_or_update_agent(self.context, agent)
-        self.test_agent = status['id']
         mock.patch.object(self.mech_driver, 'ping_all_chassis',
                           return_value=False).start()
 
-    def test_agent_show_non_ovn(self):
-        self.assertTrue(self.plugin.get_agent(self.context, self.test_agent))
+        metadata_agent_id = uuidutils.generate_uuid()
+        # To be *mostly* sure the agent cache has been updated, we need to
+        # wait for the Chassis events to run. So add a new event that should
+        # run after they do and wait for it. I've only had to do this when
+        # adding *a bunch* of Chassis at a time, but better safe than sorry.
+        chassis_name = uuidutils.generate_uuid()
+        agent_event = AgentWaitEvent(self.mech_driver, [chassis_name])
+        self.sb_api.idl.notify_handler.watch_event(agent_event)
 
-    def test_agent_show_ovn_controller(self):
-        self.assertTrue(self.plugin.get_agent(self.context,
-                                              self.controller_agent))
+        self.chassis = self.add_fake_chassis(self.host, name=chassis_name,
+            external_ids={
+                ovn_const.OVN_AGENT_METADATA_ID_KEY: metadata_agent_id})
+
+        self.assertTrue(agent_event.wait())
+
+        self.agent_types = {
+            self.TEST_AGENT: self._create_test_agent(),
+            ovn_const.OVN_CONTROLLER_AGENT: self.chassis,
+            ovn_const.OVN_METADATA_AGENT: metadata_agent_id,
+        }
+
+    def _create_test_agent(self):
+        agent = {'agent_type': self.TEST_AGENT, 'binary': '/bin/test',
+                 'host': self.host, 'topic': 'test_topic'}
+        _, status = self.plugin.create_or_update_agent(self.context, agent)
+        return status['id']
+
+    def test_agent_show(self):
+        for agent_id in self.agent_types.values():
+            self.assertTrue(self.plugin.get_agent(self.context, agent_id))
+
+    def test_agent_list(self):
+        agent_ids = [a['id'] for a in self.plugin.get_agents(
+            self.context, filters={'host': self.host})]
+        self.assertCountEqual(list(self.agent_types.values()), agent_ids)
+
+    def test_agent_delete(self):
+        for agent_id in self.agent_types.values():
+            self.plugin.delete_agent(self.context, agent_id)
+            self.assertRaises(agent_exc.AgentNotFound, self.plugin.get_agent,
+                              self.context, agent_id)
diff --git a/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py b/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
index d2341288c98..d6aa8ece836 100644
--- a/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
+++ b/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
@@ -217,13 +217,18 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
             hash_ring_manager.HashRingManager,
             'get_node', return_value=self.node_uuid).start()
 
+    def _assert_has_notify_calls(self):
+        self.idl.notify_handler.notify.assert_has_calls([
+            mock.call(self.fake_event, self.fake_row, None, global_=True),
+            mock.call(self.fake_event, self.fake_row, None)])
+        self.assertEqual(2, len(self.idl.notify_handler.mock_calls))
+
     @mock.patch.object(ovn_hash_ring_db, 'touch_node')
     def test_notify(self, mock_touch_node):
         self.idl.notify(self.fake_event, self.fake_row)
 
         mock_touch_node.assert_called_once_with(mock.ANY, self.node_uuid)
-        self.idl.notify_handler.notify.assert_called_once_with(
-            self.fake_event, self.fake_row, None)
+        self._assert_has_notify_calls()
 
     @mock.patch.object(ovn_hash_ring_db, 'touch_node')
     def test_notify_skip_touch_node(self, mock_touch_node):
@@ -233,8 +238,7 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
 
         # Assert that touch_node() wasn't called
         self.assertFalse(mock_touch_node.called)
-        self.idl.notify_handler.notify.assert_called_once_with(
-            self.fake_event, self.fake_row, None)
+        self._assert_has_notify_calls()
 
     @mock.patch.object(ovn_hash_ring_db, 'touch_node')
     def test_notify_last_touch_expired(self, mock_touch_node):
@@ -250,8 +254,7 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
 
         # Assert that touch_node() was invoked
         mock_touch_node.assert_called_once_with(mock.ANY, self.node_uuid)
-        self.idl.notify_handler.notify.assert_called_once_with(
-            self.fake_event, self.fake_row, None)
+        self._assert_has_notify_calls()
 
     @mock.patch.object(ovsdb_monitor.LOG, 'exception')
     @mock.patch.object(ovn_hash_ring_db, 'touch_node')
@@ -264,14 +267,14 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
         mock_touch_node.assert_called_once_with(mock.ANY, self.node_uuid)
         # Assert we are logging the exception
         self.assertTrue(mock_log.called)
-        self.idl.notify_handler.notify.assert_called_once_with(
-            self.fake_event, self.fake_row, None)
+        self._assert_has_notify_calls()
 
     def test_notify_different_node(self):
         self.mock_get_node.return_value = 'different-node-uuid'
         self.idl.notify('fake-event', self.fake_row)
         # Assert that notify() wasn't called for a different node uuid
-        self.assertFalse(self.idl.notify_handler.notify.called)
+        self.idl.notify_handler.notify.assert_called_once_with(
+            self.fake_event, self.fake_row, None, global_=True)
 
 
 class TestPortBindingChassisUpdateEvent(base.BaseTestCase):
@@ -420,8 +423,9 @@ class TestOvnNbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
         self.idl.notify_handler.notify = mock.Mock()
         self.idl.notify("create", row)
         # Assert that if the target_node returned by the ring is different
-        # than this driver's node_uuid, notify() won't be called
-        self.assertFalse(self.idl.notify_handler.notify.called)
+        # than this driver's node_uuid, only the global notify() is called
+        self.idl.notify_handler.notify.assert_called_once_with(
+            "create", row, None, global_=True)
 
 
 class TestOvnSbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
@@ -432,6 +436,7 @@ class TestOvnSbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
         super(TestOvnSbIdlNotifyHandler, self).setUp()
         sb_helper = ovs_idl.SchemaHelper(schema_json=OVN_SB_SCHEMA)
         sb_helper.register_table('Chassis')
+        self.driver.agent_chassis_table = 'Chassis'
        self.sb_idl = ovsdb_monitor.OvnSbIdl(self.driver, "remote", sb_helper)
         self.sb_idl.post_connect()
         self.chassis_table = self.sb_idl.tables.get('Chassis')
diff --git a/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py b/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py
index 37ac85a1238..22f885fcde8 100644
--- a/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py
+++ b/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py
@@ -87,6 +87,11 @@ class TestOVNMechanismDriver(test_plugin.Ml2PluginV2TestCase):
         super(TestOVNMechanismDriver, self).setUp()
         mm = directory.get_plugin().mechanism_manager
         self.mech_driver = mm.mech_drivers['ovn'].obj
+        neutron_agent.AgentCache(self.mech_driver)
+        # Because AgentCache is a singleton and we get a new mech_driver each
+        # setUp(), override the AgentCache driver.
+ neutron_agent.AgentCache().driver = self.mech_driver + self.mech_driver._nb_ovn = fakes.FakeOvsdbNbOvnIdl() self.mech_driver._sb_ovn = fakes.FakeOvsdbSbOvnIdl() self.mech_driver._ovn_client._qos_driver = mock.Mock() @@ -1724,73 +1729,75 @@ class TestOVNMechanismDriver(test_plugin.Ml2PluginV2TestCase): self.plugin.update_port_status.assert_called_once_with( fake_context, fake_port['id'], const.PORT_STATUS_ACTIVE) - def _add_chassis_agent(self, nb_cfg, agent_type, updated_at=None): - updated_at = updated_at or datetime.datetime.utcnow() + def _add_chassis(self, nb_cfg): chassis_private = mock.Mock() chassis_private.nb_cfg = nb_cfg chassis_private.uuid = uuid.uuid4() + chassis_private.name = str(uuid.uuid4()) + return chassis_private + + def _add_chassis_agent(self, nb_cfg, agent_type, chassis_private=None, + updated_at=None): + updated_at = updated_at or timeutils.utcnow(with_timezone=True) + chassis_private = chassis_private or self._add_chassis(nb_cfg) chassis_private.external_ids = { ovn_const.OVN_LIVENESS_CHECK_EXT_ID_KEY: datetime.datetime.isoformat(updated_at)} if agent_type == ovn_const.OVN_METADATA_AGENT: chassis_private.external_ids.update({ ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY: nb_cfg, - ovn_const.METADATA_LIVENESS_CHECK_EXT_ID_KEY: - datetime.datetime.isoformat(updated_at)}) + ovn_const.OVN_AGENT_METADATA_ID_KEY: str(uuid.uuid4())}) chassis_private.chassis = [chassis_private] - - return neutron_agent.NeutronAgent.from_type( - agent_type, chassis_private) + return neutron_agent.AgentCache().update(agent_type, chassis_private, + updated_at) def test_agent_alive_true(self): + chassis_private = self._add_chassis(5) for agent_type in (ovn_const.OVN_CONTROLLER_AGENT, ovn_const.OVN_METADATA_AGENT): self.mech_driver._nb_ovn.nb_global.nb_cfg = 5 - agent = self._add_chassis_agent(5, agent_type) - self.assertTrue(self.mech_driver.agent_alive(agent, - update_db=True)) - # Assert that each Chassis has been updated in the SB database - self.assertEqual(2, self.sb_ovn.db_set.call_count) + agent = self._add_chassis_agent(5, agent_type, chassis_private) + self.assertTrue(agent.alive, "Agent of type %s alive=%s" % + (agent.agent_type, agent.alive)) def test_agent_alive_true_one_diff(self): # Agent should be reported as alive when the nb_cfg delta is 1 # even if the last update time was old enough. 
+        nb_cfg = 5
+        chassis_private = self._add_chassis(nb_cfg)
         for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
                            ovn_const.OVN_METADATA_AGENT):
-            self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
+            self.mech_driver._nb_ovn.nb_global.nb_cfg = nb_cfg + 1
             now = timeutils.utcnow()
             updated_at = now - datetime.timedelta(cfg.CONF.agent_down_time + 1)
-            agent = self._add_chassis_agent(4, agent_type, updated_at)
-            self.assertTrue(self.mech_driver.agent_alive(agent,
-                                                         update_db=True))
+            agent = self._add_chassis_agent(nb_cfg, agent_type,
+                                            chassis_private, updated_at)
+            self.assertTrue(agent.alive, "Agent of type %s alive=%s" %
+                            (agent.agent_type, agent.alive))
 
     def test_agent_alive_not_timed_out(self):
+        nb_cfg = 3
+        chassis_private = self._add_chassis(nb_cfg)
         for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
                            ovn_const.OVN_METADATA_AGENT):
-            self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
-            agent = self._add_chassis_agent(3, agent_type)
-            self.assertTrue(self.mech_driver.agent_alive(
-                agent, update_db=True),
-                "Agent type %s is not alive" % agent_type)
+            self.mech_driver._nb_ovn.nb_global.nb_cfg = nb_cfg + 2
+            agent = self._add_chassis_agent(nb_cfg, agent_type,
+                                            chassis_private)
+            self.assertTrue(agent.alive, "Agent of type %s alive=%s" %
+                            (agent.agent_type, agent.alive))
 
     def test_agent_alive_timed_out(self):
+        nb_cfg = 3
+        chassis_private = self._add_chassis(nb_cfg)
         for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
                            ovn_const.OVN_METADATA_AGENT):
-            self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
-            now = timeutils.utcnow()
+            self.mech_driver._nb_ovn.nb_global.nb_cfg = nb_cfg + 2
+            now = timeutils.utcnow(with_timezone=True)
             updated_at = now - datetime.timedelta(cfg.CONF.agent_down_time + 1)
-            agent = self._add_chassis_agent(3, agent_type, updated_at)
-            self.assertFalse(self.mech_driver.agent_alive(agent,
-                                                          update_db=True))
-
-    def test_agent_alive_true_skip_db_update(self):
-        for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
-                           ovn_const.OVN_METADATA_AGENT):
-            self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
-            agent = self._add_chassis_agent(5, agent_type)
-            self.assertTrue(self.mech_driver.agent_alive(agent,
-                                                         update_db=False))
-            self.sb_ovn.db_set.assert_not_called()
+            agent = self._add_chassis_agent(nb_cfg, agent_type,
+                                            chassis_private, updated_at)
+            self.assertFalse(agent.alive, "Agent of type %s alive=%s" %
+                             (agent.agent_type, agent.alive))
 
     def _test__update_dnat_entry_if_needed(self, up=True):
         ovn_conf.cfg.CONF.set_override(
diff --git a/releasenotes/notes/support-deleting-ovn-agents-0a5635d9078498ba.yaml b/releasenotes/notes/support-deleting-ovn-agents-0a5635d9078498ba.yaml
new file mode 100644
index 00000000000..4a6fa981767
--- /dev/null
+++ b/releasenotes/notes/support-deleting-ovn-agents-0a5635d9078498ba.yaml
@@ -0,0 +1,7 @@
+---
+features:
+  - |
+    Add support for deleting ML2/OVN agents. Previously, deleting an agent
+    would return a Bad Request error. In addition to enabling agent
+    deletion, this change also drastically improves the scalability of the
+    ML2/OVN agent handling code.
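For reference, the deterministic metadata agent ID introduced in agent.py above can be reproduced with just the standard library: uuid.uuid5() hashes a (namespace UUID, name) pair, so the same chassis name always yields the same agent ID, even if the Chassis/Chassis_Private row is later deleted and re-created. A minimal sketch, assuming a made-up chassis UUID (in the patch the value comes from self._get_own_chassis_name()):

    import uuid

    # Hypothetical chassis name; OVN chassis names here are UUID strings.
    chassis_name = '1e6b0a6c-7e3a-4b96-9e0b-2a83e0f5b1d4'

    # uuid5 is deterministic: the same (namespace, name) pair always yields
    # the same UUID, so repeated derivations agree across restarts.
    agent_id = uuid.uuid5(uuid.UUID(chassis_name), 'metadata_agent')
    assert agent_id == uuid.uuid5(uuid.UUID(chassis_name), 'metadata_agent')
    print(agent_id)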