Decouple L3 and service plugins during DVR router migration

This change leverages the event registry to decouple L3, VPN and FW
when checking whether a router can be converted to a DVR router.

This patch cleans the UT's too.

Depends-on: I5bfec047ec8404a6d699115a9da332988518f807
Depends-on: I6505fd11776e29895457e67806bec34d3f2c6e24

Related-blueprint: services-split
Related-blueprint: plugin-interface-perestroika

Change-Id: I6b5769a51b81b965c644d8a9a4e7d424f4f89114
This commit is contained in:
armando-migliaccio
2015-02-10 12:50:11 -08:00
parent de3bd2e61a
commit 15947d3399
2 changed files with 28 additions and 70 deletions

View File

@@ -14,8 +14,13 @@
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron.callbacks import exceptions
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import constants as l3_const
from neutron.common import exceptions as n_exc
from neutron.common import utils as n_utils
@@ -83,34 +88,19 @@ class L3_NAT_with_dvr_db_mixin(l3_db.L3_NAT_db_mixin,
raise NotImplementedError()
elif (not router_db.extra_attributes.distributed and
router_res.get('distributed')):
# Add a check for Services FWaaS and VPNaaS
# This check below ensures that the legacy routers with
# associated VPNaaS or FWaaS services are not allowed to
# migrate.
if (self.check_router_has_no_vpnaas(context, router_db) and
self.check_router_has_no_firewall(context, router_db)):
LOG.info(_LI("No Service associated, so safe to migrate: %s "
"listed"), router_db['id'])
def check_router_has_no_firewall(self, context, router_db):
"""Check if FWaaS is associated with the legacy router."""
fwaas_service = manager.NeutronManager.get_service_plugins().get(
constants.FIREWALL)
if fwaas_service:
tenant_firewalls = fwaas_service.get_firewalls(
context,
filters={'tenant_id': [router_db['tenant_id']]})
if tenant_firewalls:
raise l3.RouterInUse(router_id=router_db['id'])
return True
def check_router_has_no_vpnaas(self, context, router_db):
"""Check if VPNaaS is associated with the legacy router."""
vpn_plugin = manager.NeutronManager.get_service_plugins().get(
constants.VPN)
if vpn_plugin:
vpn_plugin.check_router_in_use(context, router_db['id'])
return True
# Notify advanced services of the imminent state transition
# for the router.
try:
kwargs = {'context': context, 'router': router_db}
registry.notify(
resources.ROUTER, events.BEFORE_UPDATE, self, **kwargs)
except exceptions.CallbackFailure as e:
with excutils.save_and_reraise_exception():
# NOTE(armax): preserve old check's behavior
if len(e.errors) == 1:
raise e.errors[0].error
raise l3.RouterInUse(router_id=router_db['id'],
reason=e)
def _update_distributed_attr(
self, context, router_id, router_db, data, gw_info):

View File

@@ -20,7 +20,6 @@ from neutron.common import constants as l3_const
from neutron.common import exceptions
from neutron import context
from neutron.db import l3_dvr_db
from neutron.extensions import l3
from neutron import manager
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as plugin_const
@@ -478,47 +477,6 @@ class L3DvrTestCase(testlib_api.SqlTestCase):
self.assertFalse(create_fip.called)
self.assertFalse(delete_fip.called)
def test__validate_router_migration_prevent_check_advanced_svc(self):
router = {'name': 'foo_router', 'admin_state_up': True}
router_db = self._create_router(router)
# make sure the check are invoked, whether they pass or
# raise, it does not matter in the context of this test
with contextlib.nested(
mock.patch.object(self.mixin, 'check_router_has_no_firewall'),
mock.patch.object(self.mixin, 'check_router_has_no_vpnaas')
) as (check_fw, check_vpn):
self.mixin._validate_router_migration(
self.ctx, router_db, {'distributed': True})
check_fw.assert_called_once_with(self.ctx, router_db)
check_vpn.assert_called_once_with(self.ctx, router_db)
def test_check_router_has_no_firewall_raises(self):
with mock.patch.object(
manager.NeutronManager, 'get_service_plugins') as sp:
fw_plugin = mock.Mock()
sp.return_value = {'FIREWALL': fw_plugin}
fw_plugin.get_firewalls.return_value = [mock.ANY]
self.assertRaises(
l3.RouterInUse,
self.mixin.check_router_has_no_firewall,
self.ctx, {'id': 'foo_id', 'tenant_id': 'foo_tenant'})
def test_check_router_has_no_firewall_passes(self):
with mock.patch.object(manager.NeutronManager,
'get_service_plugins',
return_value={}):
self.assertTrue(
self.mixin.check_router_has_no_firewall(mock.ANY, mock.ANY))
def test_check_router_has_no_vpn(self):
with mock.patch.object(
manager.NeutronManager, 'get_service_plugins') as sp:
vpn_plugin = mock.Mock()
sp.return_value = {'VPN': vpn_plugin}
self.mixin.check_router_has_no_vpnaas(mock.ANY, {'id': 'foo_id'})
vpn_plugin.check_router_in_use.assert_called_once_with(
mock.ANY, 'foo_id')
def test_remove_router_interface_delete_router_l3agent_binding(self):
interface_info = {'subnet_id': '123'}
router = mock.MagicMock()
@@ -561,3 +519,13 @@ class L3DvrTestCase(testlib_api.SqlTestCase):
self.assertTrue(plugin.get_l3_agents_hosting_routers.called)
self.assertTrue(plugin.check_ports_exist_on_l3agent.called)
self.assertTrue(plugin.remove_router_from_l3_agent.called)
def test__validate_router_migration_notify_advanced_services(self):
router = {'name': 'foo_router', 'admin_state_up': True}
router_db = self._create_router(router)
with mock.patch.object(l3_dvr_db.registry, 'notify') as mock_notify:
self.mixin._validate_router_migration(
self.ctx, router_db, {'distributed': True})
kwargs = {'context': self.ctx, 'router': router_db}
mock_notify.assert_called_once_with(
'router', 'before_update', self.mixin, **kwargs)