diff --git a/neutron/agent/l3/extensions/conntrack_helper.py b/neutron/agent/l3/extensions/conntrack_helper.py new file mode 100644 index 00000000000..2ab1942019f --- /dev/null +++ b/neutron/agent/l3/extensions/conntrack_helper.py @@ -0,0 +1,280 @@ +# Copyright (c) 2019 Red Hat Inc. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections + +from neutron_lib.agent import l3_extension +from neutron_lib import constants +from neutron_lib import rpc as n_rpc +from oslo_concurrency import lockutils +from oslo_log import log as logging + +from neutron.api.rpc.callbacks.consumer import registry +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.handlers import resources_rpc + + +LOG = logging.getLogger(__name__) +DEFAULT_CONNTRACK_HELPER_CHAIN = 'cth' +CONNTRACK_HELPER_PREFIX = 'cthelper-' +CONNTRACK_HELPER_CHAIN_PREFIX = DEFAULT_CONNTRACK_HELPER_CHAIN + '-' + + +class ConntrackHelperMapping(object): + + def __init__(self): + self._managed_conntrack_helpers = {} + """ + router_conntrack_helper_mapping = { + router_id_1: set(cth_id_1, cth_id_2), + router_id_2: set(cth_id_3, cth_id_4) + } + """ + self._router_conntrack_helper_mapping = collections.defaultdict(set) + + def set_conntrack_helpers(self, conntrack_helpers): + for cth in conntrack_helpers: + self._router_conntrack_helper_mapping[cth.router_id].add(cth.id) + self._managed_conntrack_helpers[cth.id] = cth + + def update_conntrack_helpers(self, conntrack_helpers): + for cth in conntrack_helpers: + if (cth.id not in + self._router_conntrack_helper_mapping[cth.router_id]): + self._router_conntrack_helper_mapping[cth.router_id].add( + cth.id) + self._managed_conntrack_helpers[cth.id] = cth + + def get_conntack_helper(self, conntrack_helper_id): + return self._managed_conntrack_helpers.get(conntrack_helper_id) + + def get_managed_conntrack_helpers(self): + return self._managed_conntrack_helpers + + def del_conntrack_helpers(self, conntrack_helpers): + for cth in conntrack_helpers: + if not self.get_conntack_helper(cth.id): + continue + del self._managed_conntrack_helpers[cth.id] + self._router_conntrack_helper_mapping[cth.router_id].remove( + cth.id) + if not self._router_conntrack_helper_mapping[cth.router_id]: + del self._router_conntrack_helper_mapping[cth.router_id] + + def clear_by_router_id(self, router_id): + router_cth_ids = self._router_conntrack_helper_mapping.get(router_id) + if not router_cth_ids: + return + for cth_id in router_cth_ids: + del self._managed_conntrack_helpers[cth_id] + del self._router_conntrack_helper_mapping[router_id] + + def check_conntrack_helper_changes(self, new_cth): + old_cth = self.get_conntack_helper(new_cth.id) + return old_cth != new_cth + + +class ConntrackHelperAgentExtension(l3_extension.L3AgentExtension): + SUPPORTED_RESOURCE_TYPES = [resources.CONNTRACKHELPER] + + def initialize(self, connection, driver_type): + self.resource_rpc = resources_rpc.ResourcesPullRpcApi() + self._register_rpc_consumers() + self.mapping = ConntrackHelperMapping() + + def _register_rpc_consumers(self): + registry.register(self._handle_notification, resources.CONNTRACKHELPER) + + self._connection = n_rpc.Connection() + endpoints = [resources_rpc.ResourcesPushRpcCallback()] + topic = resources_rpc.resource_type_versioned_topic( + resources.CONNTRACKHELPER) + self._connection.create_consumer(topic, endpoints, fanout=True) + self._connection.consume_in_threads() + + def consume_api(self, agent_api): + self.agent_api = agent_api + + @lockutils.synchronized('conntrack-helpers') + def _handle_notification(self, context, resource_type, conntrack_helpers, + event_type): + for conntrack_helper in conntrack_helpers: + router_info = self.agent_api.get_router_info( + conntrack_helper.router_id) + if not router_info: + return + + iptables_manager = self._get_iptables_manager(router_info) + + if event_type == events.CREATED: + self._process_create([conntrack_helper], iptables_manager) + elif event_type == events.UPDATED: + self._process_update([conntrack_helper], iptables_manager) + elif event_type == events.DELETED: + self._process_delete([conntrack_helper], iptables_manager) + + def _get_chain_name(self, id): + return (CONNTRACK_HELPER_CHAIN_PREFIX + id)[ + :constants.MAX_IPTABLES_CHAIN_LEN_WRAP] + + def _install_default_rules(self, iptables_manager, version): + default_rule = '-j %s-%s' % (iptables_manager.wrap_name, + DEFAULT_CONNTRACK_HELPER_CHAIN) + if version == constants.IPv4: + iptables_manager.ipv4['raw'].add_chain( + DEFAULT_CONNTRACK_HELPER_CHAIN) + iptables_manager.ipv4['raw'].add_rule('PREROUTING', default_rule) + elif version == constants.IPv6: + iptables_manager.ipv6['raw'].add_chain( + DEFAULT_CONNTRACK_HELPER_CHAIN) + iptables_manager.ipv6['raw'].add_rule('PREROUTING', default_rule) + iptables_manager.apply() + + def _get_chain_rules_list(self, conntrack_helper, wrap_name): + chain_name = self._get_chain_name(conntrack_helper.id) + chain_rule_list = [(DEFAULT_CONNTRACK_HELPER_CHAIN, + '-j %s-%s' % (wrap_name, chain_name))] + chain_rule_list.append((chain_name, + '-p %(proto)s --dport %(dport)s -j CT ' + '--helper %(helper)s' % + {'proto': conntrack_helper.protocol, + 'dport': conntrack_helper.port, + 'helper': conntrack_helper.helper})) + + return chain_rule_list + + def _rule_apply(self, iptables_manager, conntrack_helper): + tag = CONNTRACK_HELPER_PREFIX + conntrack_helper.id + iptables_manager.ipv4['raw'].clear_rules_by_tag(tag) + iptables_manager.ipv6['raw'].clear_rules_by_tag(tag) + for chain, rule in self._get_chain_rules_list( + conntrack_helper, iptables_manager.wrap_name): + if chain not in iptables_manager.ipv4['raw'].chains: + iptables_manager.ipv4['raw'].add_chain(chain) + if chain not in iptables_manager.ipv6['raw'].chains: + iptables_manager.ipv6['raw'].add_chain(chain) + + iptables_manager.ipv4['raw'].add_rule(chain, rule, tag=tag) + iptables_manager.ipv6['raw'].add_rule(chain, rule, tag=tag) + + def _process_create(self, conntrack_helpers, iptables_manager): + if not conntrack_helpers: + return + + if (DEFAULT_CONNTRACK_HELPER_CHAIN not in + iptables_manager.ipv4['raw'].chains): + self._install_default_rules(iptables_manager, constants.IPv4) + if (DEFAULT_CONNTRACK_HELPER_CHAIN not in + iptables_manager.ipv6['raw'].chains): + self._install_default_rules(iptables_manager, constants.IPv6) + + for conntrack_helper in conntrack_helpers: + self._rule_apply(iptables_manager, conntrack_helper) + + iptables_manager.apply() + self.mapping.set_conntrack_helpers(conntrack_helpers) + + def _process_update(self, conntrack_helpers, iptables_manager): + if not conntrack_helpers: + return + + for conntrack_helper in conntrack_helpers: + if not self.mapping.check_conntrack_helper_changes( + conntrack_helper): + LOG.debug("Skip conntrack helper %s for update, as there is " + "no difference between the memory managed by agent", + conntrack_helper.id) + continue + + current_chain = self._get_chain_name(conntrack_helper.id) + iptables_manager.ipv4['raw'].remove_chain(current_chain) + iptables_manager.ipv6['raw'].remove_chain(current_chain) + + self._rule_apply(iptables_manager, conntrack_helper) + + iptables_manager.apply() + self.mapping.update_conntrack_helpers(conntrack_helpers) + + def _process_delete(self, conntrack_helpers, iptables_manager): + if not conntrack_helpers: + return + + for conntrack_helper in conntrack_helpers: + chain_name = self._get_chain_name(conntrack_helper.id) + iptables_manager.ipv4['raw'].remove_chain(chain_name) + iptables_manager.ipv6['raw'].remove_chain(chain_name) + + iptables_manager.apply() + self.mapping.del_conntrack_helpers(conntrack_helpers) + + def _get_iptables_manager(self, router_info): + if router_info.router.get('distributed'): + return router_info.snat_iptables_manager + + return router_info.iptables_manager + + def check_local_conntrack_helpers(self, context, router_info): + local_ct_helpers = set(self.mapping.get_managed_conntrack_helpers() + .keys()) + new_ct_helpers = [] + updated_cth_helpers = [] + current_ct_helpers = set() + + ct_helpers = self.resource_rpc.bulk_pull( + context, resources.CONNTRACKHELPER, filter_kwargs={ + 'router_id': router_info.router['id']}) + + for cth in ct_helpers: + # Split request conntrack helpers into update, new and current + if (cth.id in self.mapping.get_managed_conntrack_helpers() and + self.mapping.check_conntrack_helper_changes(cth)): + updated_cth_helpers.append(cth) + elif cth.id not in self.mapping.get_managed_conntrack_helpers(): + new_ct_helpers.append(cth) + current_ct_helpers.add(cth.id) + + remove_ct_helpers = [ + self.mapping.get_managed_conntrack_helpers().get(cth_id) for cth_id + in local_ct_helpers.difference(current_ct_helpers)] + + iptables_manager = self._get_iptables_manager(router_info) + + self._process_update(updated_cth_helpers, iptables_manager) + self._process_create(new_ct_helpers, iptables_manager) + self._process_delete(remove_ct_helpers, iptables_manager) + + def process_conntrack_helper(self, context, data): + router_info = self.agent_api.get_router_info(data['id']) + if not router_info: + LOG.debug("Router %s is not managed by this agent. " + "It was possibly deleted concurrently.", data['id']) + return + + self.check_local_conntrack_helpers(context, router_info) + + @lockutils.synchronized('conntrack-helpers') + def add_router(self, context, data): + self.process_conntrack_helper(context, data) + + @lockutils.synchronized('conntrack-helpers') + def update_router(self, context, data): + self.process_conntrack_helper(context, data) + + def delete_router(self, context, data): + self.mapping.clear_by_router_id(data['id']) + + def ha_state_change(self, context, data): + pass diff --git a/neutron/tests/functional/agent/l3/extensions/test_conntrack_helper_extension.py b/neutron/tests/functional/agent/l3/extensions/test_conntrack_helper_extension.py new file mode 100644 index 00000000000..a96cc3b6fce --- /dev/null +++ b/neutron/tests/functional/agent/l3/extensions/test_conntrack_helper_extension.py @@ -0,0 +1,136 @@ +# Copyright (c) 2019 Red Hat Inc. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import collections + +import mock + +from neutron_lib import constants +from oslo_utils import uuidutils + +from neutron.agent.l3 import agent as neutron_l3_agent +from neutron.agent.l3.extensions import conntrack_helper +from neutron.agent.linux import iptables_manager as iptable_mng +from neutron.common import utils as common_utils +from neutron.objects import conntrack_helper as cth_obj +from neutron.tests.functional.agent.l3 import framework +from neutron.tests.functional.agent.l3 import test_dvr_router + + +class L3AgentConntrackHelperExtensionTestFramework( + framework.L3AgentTestFramework): + + def setUp(self): + super(L3AgentConntrackHelperExtensionTestFramework, self).setUp() + self.conf.set_override('extensions', ['conntrack_helper'], 'agent') + self.agent = neutron_l3_agent.L3NATAgentWithStateReport('agent1', + self.conf) + + self.cth_ext = conntrack_helper.ConntrackHelperAgentExtension() + + self.router_id1 = uuidutils.generate_uuid() + self.router_id2 = uuidutils.generate_uuid() + self.conntrackhelper1 = cth_obj.ConntrackHelper( + context=None, id=uuidutils.generate_uuid(), protocol='udp', + port=69, helper='tftp', router_id=self.router_id1) + self.conntrackhelper2 = cth_obj.ConntrackHelper( + context=None, id=uuidutils.generate_uuid(), protocol='tcp', + port=21, helper='ftp', router_id=self.router_id2) + + self.conntrack_helpers = [self.conntrackhelper1, self.conntrackhelper2] + + self.managed_cths = {} + self.managed_cths[self.conntrackhelper1.id] = self.conntrackhelper1 + self.managed_cths[self.conntrackhelper2.id] = self.conntrackhelper2 + + self.router_cth_map = collections.defaultdict(set) + self.router_cth_map[self.router_id1].add(self.conntrackhelper1.id) + self.router_cth_map[self.router_id2].add(self.conntrackhelper2.id) + + self._set_bulk_poll_mock() + + def _set_bulk_poll_mock(self): + + def _bulk_pull_mock(context, resource_type, filter_kwargs=None): + if 'router_id' in filter_kwargs: + result = [] + for cthobj in self.conntrack_helpers: + if cthobj.router_id in filter_kwargs['router_id']: + result.append(cthobj) + return result + return self.conntrack_helpers + + self.bulk_pull = mock.patch('neutron.api.rpc.handlers.resources_rpc.' + 'ResourcesPullRpcApi.bulk_pull').start() + self.bulk_pull.side_effect = _bulk_pull_mock + + def _assert_conntrack_helper_iptables_is_set(self, router_info, cth): + iptables_manager = self.cth_ext._get_iptables_manager(router_info) + tag = conntrack_helper.CONNTRACK_HELPER_PREFIX + cth.id + chain_name = (conntrack_helper.CONNTRACK_HELPER_CHAIN_PREFIX + + cth.id)[:constants.MAX_IPTABLES_CHAIN_LEN_WRAP] + rule = ('-p %s --dport %s -j CT --helper %s' % + (cth.protocol, cth.port, cth.helper)) + + rule_obj = iptable_mng.IptablesRule(chain_name, rule, True, False, + iptables_manager.wrap_name, tag, + None) + + def check_chain_rules_set(): + existing_ipv4_chains = iptables_manager.ipv4['raw'].chains + existing_ipv6_chains = iptables_manager.ipv6['raw'].chains + if (chain_name not in existing_ipv4_chains or + chain_name not in existing_ipv6_chains): + return False + existing_ipv4_rules = iptables_manager.ipv4['raw'].rules + existing_ipv6_rules = iptables_manager.ipv6['raw'].rules + return (rule_obj in existing_ipv4_rules and + rule_obj in existing_ipv6_rules) + + common_utils.wait_until_true(check_chain_rules_set) + + def _test_centralized_routers(self, router_info): + router_id = router_info['id'] + for cthobj in self.conntrack_helpers: + cthobj.router_id = router_id + router_info['managed_conntrack_helpers'] = self.managed_cths + router_info['router_conntrack_helper_mapping'] = self.router_cth_map + ri = self.manage_router(self.agent, router_info) + for cthobj in self.conntrack_helpers: + self._assert_conntrack_helper_iptables_is_set(ri, cthobj) + + +class TestL3AgentConntrackHelperExtension( + test_dvr_router.DvrRouterTestFramework, + L3AgentConntrackHelperExtensionTestFramework): + + def test_legacy_router_conntrack_helper(self): + router_info = self.generate_router_info(enable_ha=False) + self._test_centralized_routers(router_info) + + def test_ha_router_conntrack_helper(self): + router_info = self.generate_router_info(enable_ha=True) + self._test_centralized_routers(router_info) + + def test_dvr_edge_router(self): + self.agent.conf.agent_mode = constants.L3_AGENT_MODE_DVR_SNAT + router_info = self.generate_dvr_router_info(enable_ha=False) + self._test_centralized_routers(router_info) + + def test_dvr_ha_router(self): + self.agent.conf.agent_mode = constants.L3_AGENT_MODE_DVR_SNAT + router_info = self.generate_dvr_router_info(enable_ha=True) + self._test_centralized_routers(router_info) diff --git a/neutron/tests/functional/agent/l3/test_dvr_router.py b/neutron/tests/functional/agent/l3/test_dvr_router.py index 1784715eac9..f883a83ddfd 100644 --- a/neutron/tests/functional/agent/l3/test_dvr_router.py +++ b/neutron/tests/functional/agent/l3/test_dvr_router.py @@ -43,7 +43,145 @@ from neutron.tests.functional.agent.l3 import framework DEVICE_OWNER_COMPUTE = lib_constants.DEVICE_OWNER_COMPUTE_PREFIX + 'fake' -class TestDvrRouter(framework.L3AgentTestFramework): +class DvrRouterTestFramework(framework.L3AgentTestFramework): + + def generate_dvr_router_info(self, + enable_ha=False, + enable_snat=False, + enable_gw=True, + snat_bound_fip=False, + agent=None, + extra_routes=False, + enable_floating_ip=True, + enable_centralized_fip=False, + vrrp_id=None, + **kwargs): + if not agent: + agent = self.agent + router = l3_test_common.prepare_router_data( + enable_snat=enable_snat, + enable_floating_ip=enable_floating_ip, + enable_ha=enable_ha, + extra_routes=extra_routes, + num_internal_ports=2, + enable_gw=enable_gw, + snat_bound_fip=snat_bound_fip, + vrrp_id=vrrp_id, + **kwargs) + internal_ports = router.get(lib_constants.INTERFACE_KEY, []) + router['distributed'] = True + router['gw_port_host'] = agent.conf.host + if enable_floating_ip: + for floating_ip in router[lib_constants.FLOATINGIP_KEY]: + floating_ip['host'] = agent.conf.host + + if enable_floating_ip and enable_centralized_fip: + # For centralizing the fip, we are emulating the legacy + # router behavior were the fip dict does not contain any + # host information. + router[lib_constants.FLOATINGIP_KEY][0]['host'] = None + + # In order to test the mixed dvr_snat and compute scenario, we create + # two floating IPs, one is distributed, another is centralized. + # The distributed floating IP should have the host, which was + # just set to None above, then we set it back. The centralized + # floating IP has host None, and this IP will be used to test + # migration from centralized to distributed. + if snat_bound_fip: + router[lib_constants.FLOATINGIP_KEY][0]['host'] = agent.conf.host + router[lib_constants.FLOATINGIP_KEY][1][ + lib_constants.DVR_SNAT_BOUND] = True + router[lib_constants.FLOATINGIP_KEY][1]['host'] = None + + if enable_gw: + external_gw_port = router['gw_port'] + router['gw_port'][portbindings.HOST_ID] = agent.conf.host + self._add_snat_port_info_to_router(router, internal_ports) + # FIP has a dependency on external gateway. So we need to create + # the snat_port info and fip_agent_gw_port_info irrespective of + # the agent type the dvr supports. The namespace creation is + # dependent on the agent_type. + if enable_floating_ip: + for index, floating_ip in enumerate(router['_floatingips']): + floating_ip['floating_network_id'] = ( + external_gw_port['network_id']) + floating_ip['port_id'] = internal_ports[index]['id'] + floating_ip['status'] = 'ACTIVE' + + self._add_fip_agent_gw_port_info_to_router(router, + external_gw_port) + # Router creation is delegated to router_factory. We have to + # re-register here so that factory can find override agent mode + # normally. + self.agent._register_router_cls(self.agent.router_factory) + return router + + def _add_fip_agent_gw_port_info_to_router(self, router, external_gw_port): + # Add fip agent gateway port information to the router_info + fip_gw_port_list = router.get( + lib_constants.FLOATINGIP_AGENT_INTF_KEY, []) + if not fip_gw_port_list and external_gw_port: + # Get values from external gateway port + fixed_ip = external_gw_port['fixed_ips'][0] + float_subnet = external_gw_port['subnets'][0] + port_ip = fixed_ip['ip_address'] + # Pick an ip address which is not the same as port_ip + fip_gw_port_ip = str(netaddr.IPAddress(port_ip) + 5) + # Add floatingip agent gateway port info to router + prefixlen = netaddr.IPNetwork(float_subnet['cidr']).prefixlen + router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = [ + {'subnets': [ + {'cidr': float_subnet['cidr'], + 'gateway_ip': float_subnet['gateway_ip'], + 'id': fixed_ip['subnet_id']}], + 'extra_subnets': external_gw_port['extra_subnets'], + 'network_id': external_gw_port['network_id'], + 'device_owner': lib_constants.DEVICE_OWNER_AGENT_GW, + 'mac_address': 'fa:16:3e:80:8d:89', + portbindings.HOST_ID: self.agent.conf.host, + 'fixed_ips': [{'subnet_id': fixed_ip['subnet_id'], + 'ip_address': fip_gw_port_ip, + 'prefixlen': prefixlen}], + 'id': framework._uuid(), + 'device_id': framework._uuid()} + ] + + def _add_snat_port_info_to_router(self, router, internal_ports): + # Add snat port information to the router + snat_port_list = router.get(lib_constants.SNAT_ROUTER_INTF_KEY, []) + if not snat_port_list and internal_ports: + router[lib_constants.SNAT_ROUTER_INTF_KEY] = [] + for port in internal_ports: + # Get values from internal port + fixed_ip = port['fixed_ips'][0] + snat_subnet = port['subnets'][0] + port_ip = fixed_ip['ip_address'] + # Pick an ip address which is not the same as port_ip + snat_ip = str(netaddr.IPAddress(port_ip) + 5) + # Add the info to router as the first snat port + # in the list of snat ports + prefixlen = netaddr.IPNetwork(snat_subnet['cidr']).prefixlen + snat_router_port = { + 'subnets': [ + {'cidr': snat_subnet['cidr'], + 'gateway_ip': snat_subnet['gateway_ip'], + 'id': fixed_ip['subnet_id']}], + 'network_id': port['network_id'], + 'device_owner': lib_constants.DEVICE_OWNER_ROUTER_SNAT, + 'mac_address': 'fa:16:3e:80:8d:89', + 'fixed_ips': [{'subnet_id': fixed_ip['subnet_id'], + 'ip_address': snat_ip, + 'prefixlen': prefixlen}], + 'id': framework._uuid(), + 'device_id': framework._uuid()} + # Get the address scope if there is any + if 'address_scopes' in port: + snat_router_port['address_scopes'] = port['address_scopes'] + router[lib_constants.SNAT_ROUTER_INTF_KEY].append( + snat_router_port) + + +class TestDvrRouter(DvrRouterTestFramework, framework.L3AgentTestFramework): def manage_router(self, agent, router): def _safe_fipnamespace_delete_on_ext_net(ext_net_id): try: @@ -513,77 +651,6 @@ class TestDvrRouter(framework.L3AgentTestFramework): self._assert_router_does_not_exist(router) self._assert_snat_namespace_does_not_exist(router) - def generate_dvr_router_info(self, - enable_ha=False, - enable_snat=False, - enable_gw=True, - snat_bound_fip=False, - agent=None, - extra_routes=False, - enable_floating_ip=True, - enable_centralized_fip=False, - vrrp_id=None, - **kwargs): - if not agent: - agent = self.agent - router = l3_test_common.prepare_router_data( - enable_snat=enable_snat, - enable_floating_ip=enable_floating_ip, - enable_ha=enable_ha, - extra_routes=extra_routes, - num_internal_ports=2, - enable_gw=enable_gw, - snat_bound_fip=snat_bound_fip, - vrrp_id=vrrp_id, - **kwargs) - internal_ports = router.get(lib_constants.INTERFACE_KEY, []) - router['distributed'] = True - router['gw_port_host'] = agent.conf.host - if enable_floating_ip: - for floating_ip in router[lib_constants.FLOATINGIP_KEY]: - floating_ip['host'] = agent.conf.host - - if enable_floating_ip and enable_centralized_fip: - # For centralizing the fip, we are emulating the legacy - # router behavior were the fip dict does not contain any - # host information. - router[lib_constants.FLOATINGIP_KEY][0]['host'] = None - - # In order to test the mixed dvr_snat and compute scenario, we create - # two floating IPs, one is distributed, another is centralized. - # The distributed floating IP should have the host, which was - # just set to None above, then we set it back. The centralized - # floating IP has host None, and this IP will be used to test - # migration from centralized to distributed. - if snat_bound_fip: - router[lib_constants.FLOATINGIP_KEY][0]['host'] = agent.conf.host - router[lib_constants.FLOATINGIP_KEY][1][ - lib_constants.DVR_SNAT_BOUND] = True - router[lib_constants.FLOATINGIP_KEY][1]['host'] = None - - if enable_gw: - external_gw_port = router['gw_port'] - router['gw_port'][portbindings.HOST_ID] = agent.conf.host - self._add_snat_port_info_to_router(router, internal_ports) - # FIP has a dependency on external gateway. So we need to create - # the snat_port info and fip_agent_gw_port_info irrespective of - # the agent type the dvr supports. The namespace creation is - # dependent on the agent_type. - if enable_floating_ip: - for index, floating_ip in enumerate(router['_floatingips']): - floating_ip['floating_network_id'] = ( - external_gw_port['network_id']) - floating_ip['port_id'] = internal_ports[index]['id'] - floating_ip['status'] = 'ACTIVE' - - self._add_fip_agent_gw_port_info_to_router(router, - external_gw_port) - # Router creation is delegated to router_factory. We have to - # re-register here so that factory can find override agent mode - # normally. - self.agent._register_router_cls(self.agent.router_factory) - return router - def _get_fip_agent_gw_port_for_router(self, external_gw_port): # Add fip agent gateway port information to the router_info if external_gw_port: @@ -613,70 +680,6 @@ class TestDvrRouter(framework.L3AgentTestFramework): } return fip_agent_gw_port_info - def _add_fip_agent_gw_port_info_to_router(self, router, external_gw_port): - # Add fip agent gateway port information to the router_info - fip_gw_port_list = router.get( - lib_constants.FLOATINGIP_AGENT_INTF_KEY, []) - if not fip_gw_port_list and external_gw_port: - # Get values from external gateway port - fixed_ip = external_gw_port['fixed_ips'][0] - float_subnet = external_gw_port['subnets'][0] - port_ip = fixed_ip['ip_address'] - # Pick an ip address which is not the same as port_ip - fip_gw_port_ip = str(netaddr.IPAddress(port_ip) + 5) - # Add floatingip agent gateway port info to router - prefixlen = netaddr.IPNetwork(float_subnet['cidr']).prefixlen - router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = [ - {'subnets': [ - {'cidr': float_subnet['cidr'], - 'gateway_ip': float_subnet['gateway_ip'], - 'id': fixed_ip['subnet_id']}], - 'extra_subnets': external_gw_port['extra_subnets'], - 'network_id': external_gw_port['network_id'], - 'device_owner': lib_constants.DEVICE_OWNER_AGENT_GW, - 'mac_address': 'fa:16:3e:80:8d:89', - portbindings.HOST_ID: self.agent.conf.host, - 'fixed_ips': [{'subnet_id': fixed_ip['subnet_id'], - 'ip_address': fip_gw_port_ip, - 'prefixlen': prefixlen}], - 'id': framework._uuid(), - 'device_id': framework._uuid()} - ] - - def _add_snat_port_info_to_router(self, router, internal_ports): - # Add snat port information to the router - snat_port_list = router.get(lib_constants.SNAT_ROUTER_INTF_KEY, []) - if not snat_port_list and internal_ports: - router[lib_constants.SNAT_ROUTER_INTF_KEY] = [] - for port in internal_ports: - # Get values from internal port - fixed_ip = port['fixed_ips'][0] - snat_subnet = port['subnets'][0] - port_ip = fixed_ip['ip_address'] - # Pick an ip address which is not the same as port_ip - snat_ip = str(netaddr.IPAddress(port_ip) + 5) - # Add the info to router as the first snat port - # in the list of snat ports - prefixlen = netaddr.IPNetwork(snat_subnet['cidr']).prefixlen - snat_router_port = { - 'subnets': [ - {'cidr': snat_subnet['cidr'], - 'gateway_ip': snat_subnet['gateway_ip'], - 'id': fixed_ip['subnet_id']}], - 'network_id': port['network_id'], - 'device_owner': lib_constants.DEVICE_OWNER_ROUTER_SNAT, - 'mac_address': 'fa:16:3e:80:8d:89', - 'fixed_ips': [{'subnet_id': fixed_ip['subnet_id'], - 'ip_address': snat_ip, - 'prefixlen': prefixlen}], - 'id': framework._uuid(), - 'device_id': framework._uuid()} - # Get the address scope if there is any - if 'address_scopes' in port: - snat_router_port['address_scopes'] = port['address_scopes'] - router[lib_constants.SNAT_ROUTER_INTF_KEY].append( - snat_router_port) - def _assert_dvr_external_device(self, router): external_port = router.get_ex_gw_port() snat_ns_name = dvr_snat_ns.SnatNamespace.get_snat_ns_name( diff --git a/neutron/tests/unit/agent/l3/extensions/test_conntrack_helper.py b/neutron/tests/unit/agent/l3/extensions/test_conntrack_helper.py new file mode 100644 index 00000000000..4d396261728 --- /dev/null +++ b/neutron/tests/unit/agent/l3/extensions/test_conntrack_helper.py @@ -0,0 +1,317 @@ +# Copyright (c) 2019 Red Hat Inc. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutron_lib import constants +from neutron_lib import context +from oslo_utils import uuidutils + +from neutron.agent.l3 import agent as l3_agent +from neutron.agent.l3.extensions import conntrack_helper as cth +from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api +from neutron.agent.l3 import router_info as l3router +from neutron.agent.linux import iptables_manager +from neutron.api.rpc.callbacks.consumer import registry +from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.handlers import resources_rpc +from neutron.objects import conntrack_helper as cth_obj +from neutron.tests import base +from neutron.tests.unit.agent.l3 import test_agent + + +BINARY_NAME = iptables_manager.get_binary_name() +DEFAULT_RULE = ('PREROUTING', '-j %s-' % BINARY_NAME + + cth.DEFAULT_CONNTRACK_HELPER_CHAIN) +HOSTNAME = 'testhost' + + +class ConntrackHelperExtensionBaseTestCase( + test_agent.BasicRouterOperationsFramework): + + def setUp(self): + super(ConntrackHelperExtensionBaseTestCase, self).setUp() + + self.cth_ext = cth.ConntrackHelperAgentExtension() + + self.context = context.get_admin_context() + self.connection = mock.Mock() + + self.router_id = uuidutils.generate_uuid() + self.conntrack_helper1 = cth_obj.ConntrackHelper( + context=None, id=uuidutils.generate_uuid(), protocol='udp', + port=69, helper='tftp', router_id=self.router_id) + + self.agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) + self.router = {'id': self.router_id, + 'ha': False, + 'distributed': False} + self.router_info = l3router.RouterInfo(self.agent, self.router_id, + self.router, **self.ri_kwargs) + self.agent.router_info[self.router['id']] = self.router_info + + self.get_router_info = mock.patch( + 'neutron.agent.l3.l3_agent_extension_api.' + 'L3AgentExtensionAPI.get_router_info').start() + self.get_router_info.return_value = self.router_info + + self.agent_api = l3_ext_api.L3AgentExtensionAPI(None, None) + self.cth_ext.consume_api(self.agent_api) + + self.conntrack_helpers = [self.conntrack_helper1] + + +class ConntrackHelperExtensionInitializeTestCase( + ConntrackHelperExtensionBaseTestCase): + + @mock.patch.object(registry, 'register') + @mock.patch.object(resources_rpc, 'ResourcesPushRpcCallback') + def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock): + call_to_patch = 'neutron_lib.rpc.Connection' + with mock.patch(call_to_patch, + return_value=self.connection) as create_connection: + self.cth_ext.initialize( + self.connection, constants.L3_AGENT_MODE) + create_connection.assert_has_calls([mock.call()]) + self.connection.create_consumer.assert_has_calls( + [mock.call( + resources_rpc.resource_type_versioned_topic( + resources.CONNTRACKHELPER), + [rpc_mock()], + fanout=True)] + ) + subscribe_mock.assert_called_with( + mock.ANY, resources.CONNTRACKHELPER) + + +class ConntrackHelperExtensionTestCase(ConntrackHelperExtensionBaseTestCase): + + def setUp(self): + super(ConntrackHelperExtensionTestCase, self).setUp() + self.cth_ext.initialize( + self.connection, constants.L3_AGENT_MODE) + self._set_bulk_pull_mock() + + def _set_bulk_pull_mock(self): + + def _bulk_pull_mock(context, resource_type, filter_kwargs=None): + if 'router_id' in filter_kwargs: + result = [] + for cthobj in self.conntrack_helpers: + if cthobj.router_id in filter_kwargs['router_id']: + result.append(cthobj) + return result + return self.conntrack_helpers + self.bulk_pull = mock.patch( + 'neutron.api.rpc.handlers.resources_rpc.' + 'ResourcesPullRpcApi.bulk_pull').start() + self.bulk_pull.side_effect = _bulk_pull_mock + + @mock.patch.object(iptables_manager.IptablesTable, 'add_rule') + @mock.patch.object(iptables_manager.IptablesTable, 'add_chain') + def test_create_router(self, mock_add_chain, mock_add_rule): + self.cth_ext.add_router(self.context, self.router) + + chain_name = (cth.CONNTRACK_HELPER_CHAIN_PREFIX + + self.conntrack_helper1.id)[ + :constants.MAX_IPTABLES_CHAIN_LEN_WRAP] + chain_rule = ('-p %(protocol)s --dport %(dport)s -j CT --helper ' + '%(helper)s' % + {'protocol': self.conntrack_helper1.protocol, + 'dport': self.conntrack_helper1.port, + 'helper': self.conntrack_helper1.helper}) + tag = cth.CONNTRACK_HELPER_PREFIX + self.conntrack_helper1.id + + self.assertEqual(mock_add_chain.call_count, 6) + self.assertEqual(mock_add_rule.call_count, 6) + + mock_add_chain.assert_has_calls([ + mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN), + mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN), + mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN), + mock.call(chain_name), + mock.call(chain_name) + ]) + + mock_add_rule.assert_has_calls([ + mock.call(DEFAULT_RULE[0], DEFAULT_RULE[1]), + mock.call(DEFAULT_RULE[0], DEFAULT_RULE[1]), + mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN, '-j %s-' % + BINARY_NAME + chain_name, tag=tag), + mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN, '-j %s-' % + BINARY_NAME + chain_name, tag=tag), + mock.call(chain_name, chain_rule, tag=tag), + mock.call(chain_name, chain_rule, tag=tag) + ]) + + @mock.patch.object(iptables_manager.IptablesTable, 'add_rule') + @mock.patch.object(iptables_manager.IptablesTable, 'add_chain') + def test_update_roter(self, mock_add_chain, mock_add_rule): + self.cth_ext.add_router(self.context, self.router) + mock_add_chain.reset_mock() + mock_add_rule.reset_mock() + self.cth_ext.update_router(self.context, self.router) + mock_add_chain.assert_not_called() + mock_add_rule.assert_not_called() + + @mock.patch.object(iptables_manager.IptablesTable, 'add_rule') + @mock.patch.object(iptables_manager.IptablesTable, 'add_chain') + def test_add_conntrack_helper_update_router(self, mock_add_chain, + mock_add_rule): + self.cth_ext.add_router(self.context, self.router) + # Create another conntrack helper with the same router_id + mock_add_chain.reset_mock() + mock_add_rule.reset_mock() + + test_conntrackhelper = cth_obj.ConntrackHelper( + context=None, + id=uuidutils.generate_uuid(), + protocol='tcp', + port=21, + helper='ftp', + router_id=self.conntrack_helper1.router_id) + self.conntrack_helpers.append(test_conntrackhelper) + self.cth_ext.update_router(self.context, self.router) + + chain_name = (cth.CONNTRACK_HELPER_CHAIN_PREFIX + + test_conntrackhelper.id)[ + :constants.MAX_IPTABLES_CHAIN_LEN_WRAP] + chain_rule = ('-p %(protocol)s --dport %(dport)s -j CT --helper ' + '%(helper)s' % + {'protocol': test_conntrackhelper.protocol, + 'dport': test_conntrackhelper.port, + 'helper': test_conntrackhelper.helper}) + tag = cth.CONNTRACK_HELPER_PREFIX + test_conntrackhelper.id + + self.assertEqual(mock_add_chain.call_count, 6) + self.assertEqual(mock_add_rule.call_count, 6) + + mock_add_chain.assert_has_calls([ + mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN), + mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN), + mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN), + mock.call(chain_name), + mock.call(chain_name) + ]) + + mock_add_rule.assert_has_calls([ + mock.call(DEFAULT_RULE[0], DEFAULT_RULE[1]), + mock.call(DEFAULT_RULE[0], DEFAULT_RULE[1]), + mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN, '-j %s-' % + BINARY_NAME + chain_name, tag=tag), + mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN, '-j %s-' % + BINARY_NAME + chain_name, tag=tag), + mock.call(chain_name, chain_rule, tag=tag), + mock.call(chain_name, chain_rule, tag=tag) + ]) + + @mock.patch.object(cth.ConntrackHelperMapping, 'clear_by_router_id') + def test_delete_router(self, mock_clear_by_router_id): + router_data = {'id': self.router_id, + 'ha': False, + 'distributed': False} + self.cth_ext.delete_router(self.context, router_data) + mock_clear_by_router_id.assert_called_with(self.router_id) + + +class ConntrackHelperMappingTestCase(base.BaseTestCase): + def setUp(self): + super(ConntrackHelperMappingTestCase, self).setUp() + self.mapping = cth.ConntrackHelperMapping() + self.router1 = uuidutils.generate_uuid() + self.router2 = uuidutils.generate_uuid() + self.conntrack_helper1 = cth_obj.ConntrackHelper( + context=None, id=uuidutils.generate_uuid(), protocol='udp', + port=69, helper='tftp', router_id=self.router1) + self.conntrack_helper2 = cth_obj.ConntrackHelper( + context=None, id=uuidutils.generate_uuid(), protocol='udp', + port=69, helper='tftp', router_id=self.router2) + self.conntrack_helper3 = cth_obj.ConntrackHelper( + context=None, id=uuidutils.generate_uuid(), protocol='udp', + port=21, helper='ftp', router_id=self.router1) + self.conntrack_helper4 = cth_obj.ConntrackHelper( + context=None, id=uuidutils.generate_uuid(), protocol='udp', + port=21, helper='ftp', router_id=self.router2) + self.conntrack_helper_dict = { + self.conntrack_helper1.id: self.conntrack_helper1, + self.conntrack_helper2.id: self.conntrack_helper2, + self.conntrack_helper3.id: self.conntrack_helper3, + self.conntrack_helper4.id: self.conntrack_helper4} + + def _set_cth(self): + self.mapping.set_conntrack_helpers( + self.conntrack_helper_dict.values()) + + def test_set_conntrack_helpers(self): + self._set_cth() + cth_ids = self.conntrack_helper_dict.keys() + managed_cths = self.mapping.get_managed_conntrack_helpers() + + for cth_id, obj in managed_cths.items(): + self.assertIn(cth_id, cth_ids) + self.assertEqual(obj, self.conntrack_helper_dict[cth_id]) + self.assertEqual( + len(cth_ids), len(managed_cths.keys())) + + def test_update_conntrack_helper(self): + self._set_cth() + new_conntrack_helper1 = cth_obj.ConntrackHelper( + context=None, id=self.conntrack_helper1.id, protocol='udp', + port=6969, helper='tftp', router_id=self.router1) + self.mapping.update_conntrack_helpers([new_conntrack_helper1]) + managed_cths = self.mapping.get_managed_conntrack_helpers() + self.assertEqual( + new_conntrack_helper1, + managed_cths[self.conntrack_helper1.id]) + for router_id in self.mapping._router_conntrack_helper_mapping.keys(): + self.assertIn(router_id, [self.router1, self.router2]) + self.assertEqual( + len([self.router1, self.router2]), + len(self.mapping._router_conntrack_helper_mapping.keys())) + + def test_del_conntrack_helper(self): + self._set_cth() + self.mapping.del_conntrack_helpers([self.conntrack_helper3, + self.conntrack_helper2, + self.conntrack_helper4]) + managed_cths = self.mapping.get_managed_conntrack_helpers() + self.assertEqual([self.conntrack_helper1.id], + list(managed_cths.keys())) + self.assertNotIn(self.conntrack_helper3.id, + self.mapping._router_conntrack_helper_mapping[ + self.conntrack_helper3.router_id]) + self.assertNotIn(self.router2, + self.mapping._router_conntrack_helper_mapping.keys()) + + def test_clear_by_router_id(self): + self._set_cth() + self.mapping.clear_by_router_id(self.router2) + managed_cths = self.mapping.get_managed_conntrack_helpers() + self.assertNotIn(self.conntrack_helper2, managed_cths.keys()) + self.assertNotIn(self.conntrack_helper4, managed_cths.keys()) + + def test_check_conntrack_helper_changes(self): + self._set_cth() + new_cth = cth_obj.ConntrackHelper( + context=None, id=self.conntrack_helper1.id, protocol='udp', + port=6969, helper='tftp', router_id=self.router1) + self.assertTrue(self.mapping.check_conntrack_helper_changes(new_cth)) + + def test_check_conntrack_helper_changes_no_change(self): + self._set_cth() + new_cth = cth_obj.ConntrackHelper( + context=None, id=self.conntrack_helper1.id, protocol='udp', + port=69, helper='tftp', router_id=self.router1) + self.assertFalse(self.mapping.check_conntrack_helper_changes(new_cth)) diff --git a/setup.cfg b/setup.cfg index adc74a30259..5a3152d6463 100644 --- a/setup.cfg +++ b/setup.cfg @@ -118,6 +118,7 @@ neutron.agent.l3.extensions = gateway_ip_qos = neutron.agent.l3.extensions.qos.gateway_ip:RouterGatewayIPQosAgentExtension port_forwarding = neutron.agent.l3.extensions.port_forwarding:PortForwardingAgentExtension snat_log = neutron.agent.l3.extensions.snat_log:SNATLoggingExtension + conntrack_helper = neutron.agent.l3.extensions.conntrack_helper:ConntrackHelperAgentExtension neutron.services.logapi.drivers = ovs = neutron.services.logapi.drivers.openvswitch.ovs_firewall_log:OVSFirewallLoggingDriver neutron.qos.agent_drivers =