
In L2 agent extensions, when the agent extension needed access to a datastructure within the L2 agent, an agent extension API object was created. This API object would be the interface permitting agent extensions to have access to those objects internal to the L2 agent. This change implements a similar agent extension API object for the L3 agent extensions. This is necessary to allow L3 agent extensions to have access to the RouterInfo class, so that they can do lookups on it, for example determining the namespace for a specific router. Without this API object, the L3 agent extension would not have access to this structure. Co-Authored-By: Margaret Frances <margaret_frances@cable.comcast.com> Partially-Implements: blueprint l3-agent-extensions Change-Id: I85f89accbeefd820130335674fd56cb54f1449de
707 lines
30 KiB
Python
707 lines
30 KiB
Python
# Copyright 2012 VMware, Inc. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
import eventlet
|
|
import netaddr
|
|
from neutron_lib import constants as lib_const
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
import oslo_messaging
|
|
from oslo_service import loopingcall
|
|
from oslo_service import periodic_task
|
|
from oslo_utils import excutils
|
|
from oslo_utils import timeutils
|
|
|
|
from neutron._i18n import _, _LE, _LI, _LW
|
|
from neutron.agent.common import utils as common_utils
|
|
from neutron.agent.l3 import dvr
|
|
from neutron.agent.l3 import dvr_edge_ha_router
|
|
from neutron.agent.l3 import dvr_edge_router as dvr_router
|
|
from neutron.agent.l3 import dvr_local_router as dvr_local_router
|
|
from neutron.agent.l3 import ha
|
|
from neutron.agent.l3 import ha_router
|
|
from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api
|
|
from neutron.agent.l3 import l3_agent_extensions_manager as l3_ext_manager
|
|
from neutron.agent.l3 import legacy_router
|
|
from neutron.agent.l3 import namespace_manager
|
|
from neutron.agent.l3 import namespaces
|
|
from neutron.agent.l3 import router_processing_queue as queue
|
|
from neutron.agent.linux import external_process
|
|
from neutron.agent.linux import ip_lib
|
|
from neutron.agent.linux import pd
|
|
from neutron.agent.metadata import driver as metadata_driver
|
|
from neutron.agent import rpc as agent_rpc
|
|
from neutron.callbacks import events
|
|
from neutron.callbacks import registry
|
|
from neutron.callbacks import resources
|
|
from neutron.common import constants as l3_constants
|
|
from neutron.common import exceptions as n_exc
|
|
from neutron.common import ipv6_utils
|
|
from neutron.common import rpc as n_rpc
|
|
from neutron.common import topics
|
|
from neutron import context as n_context
|
|
from neutron import manager
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
# TODO(Carl) Following constants retained to increase SNR during refactoring
|
|
NS_PREFIX = namespaces.NS_PREFIX
|
|
INTERNAL_DEV_PREFIX = namespaces.INTERNAL_DEV_PREFIX
|
|
EXTERNAL_DEV_PREFIX = namespaces.EXTERNAL_DEV_PREFIX
|
|
|
|
# Number of routers to fetch from server at a time on resync.
|
|
# Needed to reduce load on server side and to speed up resync on agent side.
|
|
SYNC_ROUTERS_MAX_CHUNK_SIZE = 256
|
|
SYNC_ROUTERS_MIN_CHUNK_SIZE = 32
|
|
|
|
|
|
class L3PluginApi(object):
|
|
"""Agent side of the l3 agent RPC API.
|
|
|
|
API version history:
|
|
1.0 - Initial version.
|
|
1.1 - Floating IP operational status updates
|
|
1.2 - DVR support: new L3 plugin methods added.
|
|
- get_ports_by_subnet
|
|
- get_agent_gateway_port
|
|
Needed by the agent when operating in DVR/DVR_SNAT mode
|
|
1.3 - Get the list of activated services
|
|
1.4 - Added L3 HA update_router_state. This method was reworked in
|
|
to update_ha_routers_states
|
|
1.5 - Added update_ha_routers_states
|
|
1.6 - Added process_prefix_update
|
|
1.7 - DVR support: new L3 plugin methods added.
|
|
- delete_agent_gateway_port
|
|
1.8 - Added address scope information
|
|
1.9 - Added get_router_ids
|
|
"""
|
|
|
|
def __init__(self, topic, host):
|
|
self.host = host
|
|
target = oslo_messaging.Target(topic=topic, version='1.0')
|
|
self.client = n_rpc.get_client(target)
|
|
|
|
def get_routers(self, context, router_ids=None):
|
|
"""Make a remote process call to retrieve the sync data for routers."""
|
|
cctxt = self.client.prepare()
|
|
return cctxt.call(context, 'sync_routers', host=self.host,
|
|
router_ids=router_ids)
|
|
|
|
def get_router_ids(self, context):
|
|
"""Make a remote process call to retrieve scheduled routers ids."""
|
|
cctxt = self.client.prepare(version='1.9')
|
|
return cctxt.call(context, 'get_router_ids', host=self.host)
|
|
|
|
def get_external_network_id(self, context):
|
|
"""Make a remote process call to retrieve the external network id.
|
|
|
|
@raise oslo_messaging.RemoteError: with TooManyExternalNetworks as
|
|
exc_type if there are more than one
|
|
external network
|
|
"""
|
|
cctxt = self.client.prepare()
|
|
return cctxt.call(context, 'get_external_network_id', host=self.host)
|
|
|
|
def update_floatingip_statuses(self, context, router_id, fip_statuses):
|
|
"""Call the plugin update floating IPs's operational status."""
|
|
cctxt = self.client.prepare(version='1.1')
|
|
return cctxt.call(context, 'update_floatingip_statuses',
|
|
router_id=router_id, fip_statuses=fip_statuses)
|
|
|
|
def get_ports_by_subnet(self, context, subnet_id):
|
|
"""Retrieve ports by subnet id."""
|
|
cctxt = self.client.prepare(version='1.2')
|
|
return cctxt.call(context, 'get_ports_by_subnet', host=self.host,
|
|
subnet_id=subnet_id)
|
|
|
|
def get_agent_gateway_port(self, context, fip_net):
|
|
"""Get or create an agent_gateway_port."""
|
|
cctxt = self.client.prepare(version='1.2')
|
|
return cctxt.call(context, 'get_agent_gateway_port',
|
|
network_id=fip_net, host=self.host)
|
|
|
|
def get_service_plugin_list(self, context):
|
|
"""Make a call to get the list of activated services."""
|
|
cctxt = self.client.prepare(version='1.3')
|
|
return cctxt.call(context, 'get_service_plugin_list')
|
|
|
|
def update_ha_routers_states(self, context, states):
|
|
"""Update HA routers states."""
|
|
cctxt = self.client.prepare(version='1.5')
|
|
return cctxt.call(context, 'update_ha_routers_states',
|
|
host=self.host, states=states)
|
|
|
|
def process_prefix_update(self, context, prefix_update):
|
|
"""Process prefix update whenever prefixes get changed."""
|
|
cctxt = self.client.prepare(version='1.6')
|
|
return cctxt.call(context, 'process_prefix_update',
|
|
subnets=prefix_update)
|
|
|
|
def delete_agent_gateway_port(self, context, fip_net):
|
|
"""Delete Floatingip_agent_gateway_port."""
|
|
cctxt = self.client.prepare(version='1.7')
|
|
return cctxt.call(context, 'delete_agent_gateway_port',
|
|
host=self.host, network_id=fip_net)
|
|
|
|
|
|
class L3NATAgent(ha.AgentMixin,
|
|
dvr.AgentMixin,
|
|
manager.Manager):
|
|
"""Manager for L3NatAgent
|
|
|
|
API version history:
|
|
1.0 initial Version
|
|
1.1 changed the type of the routers parameter
|
|
to the routers_updated method.
|
|
It was previously a list of routers in dict format.
|
|
It is now a list of router IDs only.
|
|
Per rpc versioning rules, it is backwards compatible.
|
|
1.2 - DVR support: new L3 agent methods added.
|
|
- add_arp_entry
|
|
- del_arp_entry
|
|
1.3 - fipnamespace_delete_on_ext_net - to delete fipnamespace
|
|
after the external network is removed
|
|
Needed by the L3 service when dealing with DVR
|
|
"""
|
|
target = oslo_messaging.Target(version='1.3')
|
|
|
|
def __init__(self, host, conf=None):
|
|
if conf:
|
|
self.conf = conf
|
|
else:
|
|
self.conf = cfg.CONF
|
|
self.router_info = {}
|
|
|
|
self._check_config_params()
|
|
|
|
self.process_monitor = external_process.ProcessMonitor(
|
|
config=self.conf,
|
|
resource_type='router')
|
|
|
|
self.driver = common_utils.load_interface_driver(self.conf)
|
|
|
|
self.context = n_context.get_admin_context_without_session()
|
|
self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
|
|
self.fullsync = True
|
|
self.sync_routers_chunk_size = SYNC_ROUTERS_MAX_CHUNK_SIZE
|
|
|
|
# Get the list of service plugins from Neutron Server
|
|
# This is the first place where we contact neutron-server on startup
|
|
# so retry in case its not ready to respond.
|
|
while True:
|
|
try:
|
|
self.neutron_service_plugins = (
|
|
self.plugin_rpc.get_service_plugin_list(self.context))
|
|
except oslo_messaging.RemoteError as e:
|
|
with excutils.save_and_reraise_exception() as ctx:
|
|
ctx.reraise = False
|
|
LOG.warning(_LW('l3-agent cannot check service plugins '
|
|
'enabled at the neutron server when '
|
|
'startup due to RPC error. It happens '
|
|
'when the server does not support this '
|
|
'RPC API. If the error is '
|
|
'UnsupportedVersion you can ignore this '
|
|
'warning. Detail message: %s'), e)
|
|
self.neutron_service_plugins = None
|
|
except oslo_messaging.MessagingTimeout as e:
|
|
with excutils.save_and_reraise_exception() as ctx:
|
|
ctx.reraise = False
|
|
LOG.warning(_LW('l3-agent cannot contact neutron server '
|
|
'to retrieve service plugins enabled. '
|
|
'Check connectivity to neutron server. '
|
|
'Retrying... '
|
|
'Detailed message: %(msg)s.'), {'msg': e})
|
|
continue
|
|
break
|
|
|
|
self.init_extension_manager(self.plugin_rpc)
|
|
|
|
self.metadata_driver = None
|
|
if self.conf.enable_metadata_proxy:
|
|
self.metadata_driver = metadata_driver.MetadataDriver(self)
|
|
|
|
self.namespaces_manager = namespace_manager.NamespaceManager(
|
|
self.conf,
|
|
self.driver,
|
|
self.metadata_driver)
|
|
|
|
self._queue = queue.RouterProcessingQueue()
|
|
super(L3NATAgent, self).__init__(host=self.conf.host)
|
|
|
|
self.target_ex_net_id = None
|
|
self.use_ipv6 = ipv6_utils.is_enabled()
|
|
|
|
self.pd = pd.PrefixDelegation(self.context, self.process_monitor,
|
|
self.driver,
|
|
self.plugin_rpc.process_prefix_update,
|
|
self.create_pd_router_update,
|
|
self.conf)
|
|
|
|
def _check_config_params(self):
|
|
"""Check items in configuration files.
|
|
|
|
Check for required and invalid configuration items.
|
|
The actual values are not verified for correctness.
|
|
"""
|
|
if not self.conf.interface_driver:
|
|
msg = _LE('An interface driver must be specified')
|
|
LOG.error(msg)
|
|
raise SystemExit(1)
|
|
|
|
if self.conf.ipv6_gateway:
|
|
# ipv6_gateway configured. Check for valid v6 link-local address.
|
|
try:
|
|
msg = _LE("%s used in config as ipv6_gateway is not a valid "
|
|
"IPv6 link-local address."),
|
|
ip_addr = netaddr.IPAddress(self.conf.ipv6_gateway)
|
|
if ip_addr.version != 6 or not ip_addr.is_link_local():
|
|
LOG.error(msg, self.conf.ipv6_gateway)
|
|
raise SystemExit(1)
|
|
except netaddr.AddrFormatError:
|
|
LOG.error(msg, self.conf.ipv6_gateway)
|
|
raise SystemExit(1)
|
|
|
|
def _fetch_external_net_id(self, force=False):
|
|
"""Find UUID of single external network for this agent."""
|
|
if self.conf.gateway_external_network_id:
|
|
return self.conf.gateway_external_network_id
|
|
|
|
# L3 agent doesn't use external_network_bridge to handle external
|
|
# networks, so bridge_mappings with provider networks will be used
|
|
# and the L3 agent is able to handle any external networks.
|
|
if not self.conf.external_network_bridge:
|
|
return
|
|
|
|
if not force and self.target_ex_net_id:
|
|
return self.target_ex_net_id
|
|
|
|
try:
|
|
self.target_ex_net_id = self.plugin_rpc.get_external_network_id(
|
|
self.context)
|
|
return self.target_ex_net_id
|
|
except oslo_messaging.RemoteError as e:
|
|
with excutils.save_and_reraise_exception() as ctx:
|
|
if e.exc_type == 'TooManyExternalNetworks':
|
|
ctx.reraise = False
|
|
msg = _(
|
|
"The 'gateway_external_network_id' option must be "
|
|
"configured for this agent as Neutron has more than "
|
|
"one external network.")
|
|
raise Exception(msg)
|
|
|
|
def _create_router(self, router_id, router):
|
|
args = []
|
|
kwargs = {
|
|
'router_id': router_id,
|
|
'router': router,
|
|
'use_ipv6': self.use_ipv6,
|
|
'agent_conf': self.conf,
|
|
'interface_driver': self.driver,
|
|
}
|
|
|
|
if router.get('distributed'):
|
|
kwargs['agent'] = self
|
|
kwargs['host'] = self.host
|
|
|
|
if router.get('distributed') and router.get('ha'):
|
|
if self.conf.agent_mode == lib_const.L3_AGENT_MODE_DVR_SNAT:
|
|
kwargs['state_change_callback'] = self.enqueue_state_change
|
|
return dvr_edge_ha_router.DvrEdgeHaRouter(*args, **kwargs)
|
|
|
|
if router.get('distributed'):
|
|
if self.conf.agent_mode == lib_const.L3_AGENT_MODE_DVR_SNAT:
|
|
return dvr_router.DvrEdgeRouter(*args, **kwargs)
|
|
else:
|
|
return dvr_local_router.DvrLocalRouter(*args, **kwargs)
|
|
|
|
if router.get('ha'):
|
|
kwargs['state_change_callback'] = self.enqueue_state_change
|
|
return ha_router.HaRouter(*args, **kwargs)
|
|
|
|
return legacy_router.LegacyRouter(*args, **kwargs)
|
|
|
|
def _router_added(self, router_id, router):
|
|
ri = self._create_router(router_id, router)
|
|
registry.notify(resources.ROUTER, events.BEFORE_CREATE,
|
|
self, router=ri)
|
|
|
|
self.router_info[router_id] = ri
|
|
|
|
ri.initialize(self.process_monitor)
|
|
|
|
def _safe_router_removed(self, router_id):
|
|
"""Try to delete a router and return True if successful."""
|
|
|
|
try:
|
|
self._router_removed(router_id)
|
|
self.l3_ext_manager.delete_router(self.context, router_id)
|
|
except Exception:
|
|
LOG.exception(_LE('Error while deleting router %s'), router_id)
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def _router_removed(self, router_id):
|
|
ri = self.router_info.get(router_id)
|
|
if ri is None:
|
|
LOG.warning(_LW("Info for router %s was not found. "
|
|
"Performing router cleanup"), router_id)
|
|
self.namespaces_manager.ensure_router_cleanup(router_id)
|
|
return
|
|
|
|
registry.notify(resources.ROUTER, events.BEFORE_DELETE,
|
|
self, router=ri)
|
|
|
|
ri.delete(self)
|
|
del self.router_info[router_id]
|
|
|
|
registry.notify(resources.ROUTER, events.AFTER_DELETE, self, router=ri)
|
|
|
|
def init_extension_manager(self, connection):
|
|
l3_ext_manager.register_opts(self.conf)
|
|
self.agent_api = l3_ext_api.L3AgentExtensionAPI(self.router_info)
|
|
self.l3_ext_manager = (
|
|
l3_ext_manager.L3AgentExtensionsManager(self.conf))
|
|
self.l3_ext_manager.initialize(
|
|
connection, lib_const.L3_AGENT_MODE,
|
|
self.agent_api)
|
|
|
|
def router_deleted(self, context, router_id):
|
|
"""Deal with router deletion RPC message."""
|
|
LOG.debug('Got router deleted notification for %s', router_id)
|
|
update = queue.RouterUpdate(router_id,
|
|
queue.PRIORITY_RPC,
|
|
action=queue.DELETE_ROUTER)
|
|
self._queue.add(update)
|
|
|
|
def routers_updated(self, context, routers):
|
|
"""Deal with routers modification and creation RPC message."""
|
|
LOG.debug('Got routers updated notification :%s', routers)
|
|
if routers:
|
|
# This is needed for backward compatibility
|
|
if isinstance(routers[0], dict):
|
|
routers = [router['id'] for router in routers]
|
|
for id in routers:
|
|
update = queue.RouterUpdate(id, queue.PRIORITY_RPC)
|
|
self._queue.add(update)
|
|
|
|
def router_removed_from_agent(self, context, payload):
|
|
LOG.debug('Got router removed from agent :%r', payload)
|
|
router_id = payload['router_id']
|
|
update = queue.RouterUpdate(router_id,
|
|
queue.PRIORITY_RPC,
|
|
action=queue.DELETE_ROUTER)
|
|
self._queue.add(update)
|
|
|
|
def router_added_to_agent(self, context, payload):
|
|
LOG.debug('Got router added to agent :%r', payload)
|
|
self.routers_updated(context, payload)
|
|
|
|
def _process_router_if_compatible(self, router):
|
|
if (self.conf.external_network_bridge and
|
|
not ip_lib.device_exists(self.conf.external_network_bridge)):
|
|
LOG.error(_LE("The external network bridge '%s' does not exist"),
|
|
self.conf.external_network_bridge)
|
|
return
|
|
|
|
# Either ex_net_id or handle_internal_only_routers must be set
|
|
ex_net_id = (router['external_gateway_info'] or {}).get('network_id')
|
|
if not ex_net_id and not self.conf.handle_internal_only_routers:
|
|
raise n_exc.RouterNotCompatibleWithAgent(router_id=router['id'])
|
|
|
|
# If target_ex_net_id and ex_net_id are set they must be equal
|
|
target_ex_net_id = self._fetch_external_net_id()
|
|
if (target_ex_net_id and ex_net_id and ex_net_id != target_ex_net_id):
|
|
# Double check that our single external_net_id has not changed
|
|
# by forcing a check by RPC.
|
|
if ex_net_id != self._fetch_external_net_id(force=True):
|
|
raise n_exc.RouterNotCompatibleWithAgent(
|
|
router_id=router['id'])
|
|
|
|
if router['id'] not in self.router_info:
|
|
self._process_added_router(router)
|
|
else:
|
|
self._process_updated_router(router)
|
|
|
|
def _process_added_router(self, router):
|
|
self._router_added(router['id'], router)
|
|
ri = self.router_info[router['id']]
|
|
ri.router = router
|
|
ri.process(self)
|
|
registry.notify(resources.ROUTER, events.AFTER_CREATE, self, router=ri)
|
|
self.l3_ext_manager.add_router(self.context, router)
|
|
|
|
def _process_updated_router(self, router):
|
|
ri = self.router_info[router['id']]
|
|
ri.router = router
|
|
registry.notify(resources.ROUTER, events.BEFORE_UPDATE,
|
|
self, router=ri)
|
|
ri.process(self)
|
|
registry.notify(resources.ROUTER, events.AFTER_UPDATE, self, router=ri)
|
|
self.l3_ext_manager.update_router(self.context, router)
|
|
|
|
def _resync_router(self, router_update,
|
|
priority=queue.PRIORITY_SYNC_ROUTERS_TASK):
|
|
router_update.timestamp = timeutils.utcnow()
|
|
router_update.priority = priority
|
|
router_update.router = None # Force the agent to resync the router
|
|
self._queue.add(router_update)
|
|
|
|
def _process_router_update(self):
|
|
for rp, update in self._queue.each_update_to_next_router():
|
|
LOG.debug("Starting router update for %s, action %s, priority %s",
|
|
update.id, update.action, update.priority)
|
|
if update.action == queue.PD_UPDATE:
|
|
self.pd.process_prefix_update()
|
|
LOG.debug("Finished a router update for %s", update.id)
|
|
continue
|
|
router = update.router
|
|
if update.action != queue.DELETE_ROUTER and not router:
|
|
try:
|
|
update.timestamp = timeutils.utcnow()
|
|
routers = self.plugin_rpc.get_routers(self.context,
|
|
[update.id])
|
|
except Exception:
|
|
msg = _LE("Failed to fetch router information for '%s'")
|
|
LOG.exception(msg, update.id)
|
|
self._resync_router(update)
|
|
continue
|
|
|
|
if routers:
|
|
router = routers[0]
|
|
|
|
if not router:
|
|
removed = self._safe_router_removed(update.id)
|
|
if not removed:
|
|
self._resync_router(update)
|
|
else:
|
|
# need to update timestamp of removed router in case
|
|
# there are older events for the same router in the
|
|
# processing queue (like events from fullsync) in order to
|
|
# prevent deleted router re-creation
|
|
rp.fetched_and_processed(update.timestamp)
|
|
LOG.debug("Finished a router update for %s", update.id)
|
|
continue
|
|
|
|
try:
|
|
self._process_router_if_compatible(router)
|
|
except n_exc.RouterNotCompatibleWithAgent as e:
|
|
LOG.exception(e.msg)
|
|
# Was the router previously handled by this agent?
|
|
if router['id'] in self.router_info:
|
|
LOG.error(_LE("Removing incompatible router '%s'"),
|
|
router['id'])
|
|
self._safe_router_removed(router['id'])
|
|
except Exception:
|
|
msg = _LE("Failed to process compatible router '%s'")
|
|
LOG.exception(msg, update.id)
|
|
self._resync_router(update)
|
|
continue
|
|
|
|
LOG.debug("Finished a router update for %s", update.id)
|
|
rp.fetched_and_processed(update.timestamp)
|
|
|
|
def _process_routers_loop(self):
|
|
LOG.debug("Starting _process_routers_loop")
|
|
pool = eventlet.GreenPool(size=8)
|
|
while True:
|
|
pool.spawn_n(self._process_router_update)
|
|
|
|
# NOTE(kevinbenton): this is set to 1 second because the actual interval
|
|
# is controlled by a FixedIntervalLoopingCall in neutron/service.py that
|
|
# is responsible for task execution.
|
|
@periodic_task.periodic_task(spacing=1, run_immediately=True)
|
|
def periodic_sync_routers_task(self, context):
|
|
if not self.fullsync:
|
|
return
|
|
LOG.debug("Starting fullsync periodic_sync_routers_task")
|
|
|
|
# self.fullsync is True at this point. If an exception -- caught or
|
|
# uncaught -- prevents setting it to False below then the next call
|
|
# to periodic_sync_routers_task will re-enter this code and try again.
|
|
|
|
# Context manager self.namespaces_manager captures a picture of
|
|
# namespaces *before* fetch_and_sync_all_routers fetches the full list
|
|
# of routers from the database. This is important to correctly
|
|
# identify stale ones.
|
|
|
|
try:
|
|
with self.namespaces_manager as ns_manager:
|
|
self.fetch_and_sync_all_routers(context, ns_manager)
|
|
except n_exc.AbortSyncRouters:
|
|
self.fullsync = True
|
|
|
|
def fetch_and_sync_all_routers(self, context, ns_manager):
|
|
prev_router_ids = set(self.router_info)
|
|
curr_router_ids = set()
|
|
timestamp = timeutils.utcnow()
|
|
|
|
try:
|
|
router_ids = self.plugin_rpc.get_router_ids(context)
|
|
# fetch routers by chunks to reduce the load on server and to
|
|
# start router processing earlier
|
|
for i in range(0, len(router_ids), self.sync_routers_chunk_size):
|
|
routers = self.plugin_rpc.get_routers(
|
|
context, router_ids[i:i + self.sync_routers_chunk_size])
|
|
LOG.debug('Processing :%r', routers)
|
|
for r in routers:
|
|
curr_router_ids.add(r['id'])
|
|
ns_manager.keep_router(r['id'])
|
|
if r.get('distributed'):
|
|
# need to keep fip namespaces as well
|
|
ext_net_id = (r['external_gateway_info'] or {}).get(
|
|
'network_id')
|
|
is_snat_agent = (self.conf.agent_mode ==
|
|
lib_const.L3_AGENT_MODE_DVR_SNAT)
|
|
if ext_net_id:
|
|
ns_manager.keep_ext_net(ext_net_id)
|
|
elif is_snat_agent:
|
|
ns_manager.ensure_snat_cleanup(r['id'])
|
|
update = queue.RouterUpdate(
|
|
r['id'],
|
|
queue.PRIORITY_SYNC_ROUTERS_TASK,
|
|
router=r,
|
|
timestamp=timestamp)
|
|
self._queue.add(update)
|
|
except oslo_messaging.MessagingTimeout:
|
|
if self.sync_routers_chunk_size > SYNC_ROUTERS_MIN_CHUNK_SIZE:
|
|
self.sync_routers_chunk_size = max(
|
|
self.sync_routers_chunk_size / 2,
|
|
SYNC_ROUTERS_MIN_CHUNK_SIZE)
|
|
LOG.error(_LE('Server failed to return info for routers in '
|
|
'required time, decreasing chunk size to: %s'),
|
|
self.sync_routers_chunk_size)
|
|
else:
|
|
LOG.error(_LE('Server failed to return info for routers in '
|
|
'required time even with min chunk size: %s. '
|
|
'It might be under very high load or '
|
|
'just inoperable'),
|
|
self.sync_routers_chunk_size)
|
|
raise
|
|
except oslo_messaging.MessagingException:
|
|
LOG.exception(_LE("Failed synchronizing routers due to RPC error"))
|
|
raise n_exc.AbortSyncRouters()
|
|
|
|
self.fullsync = False
|
|
LOG.debug("periodic_sync_routers_task successfully completed")
|
|
# adjust chunk size after successful sync
|
|
if self.sync_routers_chunk_size < SYNC_ROUTERS_MAX_CHUNK_SIZE:
|
|
self.sync_routers_chunk_size = min(
|
|
self.sync_routers_chunk_size + SYNC_ROUTERS_MIN_CHUNK_SIZE,
|
|
SYNC_ROUTERS_MAX_CHUNK_SIZE)
|
|
|
|
# Delete routers that have disappeared since the last sync
|
|
for router_id in prev_router_ids - curr_router_ids:
|
|
ns_manager.keep_router(router_id)
|
|
update = queue.RouterUpdate(router_id,
|
|
queue.PRIORITY_SYNC_ROUTERS_TASK,
|
|
timestamp=timestamp,
|
|
action=queue.DELETE_ROUTER)
|
|
self._queue.add(update)
|
|
|
|
def after_start(self):
|
|
# Note: the FWaaS' vArmourL3NATAgent is a subclass of L3NATAgent. It
|
|
# calls this method here. So Removing this after_start() would break
|
|
# vArmourL3NATAgent. We need to find out whether vArmourL3NATAgent
|
|
# can have L3NATAgentWithStateReport as its base class instead of
|
|
# L3NATAgent.
|
|
eventlet.spawn_n(self._process_routers_loop)
|
|
LOG.info(_LI("L3 agent started"))
|
|
|
|
def create_pd_router_update(self):
|
|
router_id = None
|
|
update = queue.RouterUpdate(router_id,
|
|
queue.PRIORITY_PD_UPDATE,
|
|
timestamp=timeutils.utcnow(),
|
|
action=queue.PD_UPDATE)
|
|
self._queue.add(update)
|
|
|
|
|
|
class L3NATAgentWithStateReport(L3NATAgent):
|
|
|
|
def __init__(self, host, conf=None):
|
|
super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
|
|
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
|
|
self.agent_state = {
|
|
'binary': 'neutron-l3-agent',
|
|
'host': host,
|
|
'availability_zone': self.conf.AGENT.availability_zone,
|
|
'topic': topics.L3_AGENT,
|
|
'configurations': {
|
|
'agent_mode': self.conf.agent_mode,
|
|
'handle_internal_only_routers':
|
|
self.conf.handle_internal_only_routers,
|
|
'external_network_bridge': self.conf.external_network_bridge,
|
|
'gateway_external_network_id':
|
|
self.conf.gateway_external_network_id,
|
|
'interface_driver': self.conf.interface_driver,
|
|
'log_agent_heartbeats': self.conf.AGENT.log_agent_heartbeats},
|
|
'start_flag': True,
|
|
'agent_type': lib_const.AGENT_TYPE_L3}
|
|
report_interval = self.conf.AGENT.report_interval
|
|
if report_interval:
|
|
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
|
|
self._report_state)
|
|
self.heartbeat.start(interval=report_interval)
|
|
|
|
def _report_state(self):
|
|
num_ex_gw_ports = 0
|
|
num_interfaces = 0
|
|
num_floating_ips = 0
|
|
router_infos = self.router_info.values()
|
|
num_routers = len(router_infos)
|
|
for ri in router_infos:
|
|
ex_gw_port = ri.get_ex_gw_port()
|
|
if ex_gw_port:
|
|
num_ex_gw_ports += 1
|
|
num_interfaces += len(ri.router.get(lib_const.INTERFACE_KEY,
|
|
[]))
|
|
num_floating_ips += len(ri.router.get(lib_const.FLOATINGIP_KEY,
|
|
[]))
|
|
configurations = self.agent_state['configurations']
|
|
configurations['routers'] = num_routers
|
|
configurations['ex_gw_ports'] = num_ex_gw_ports
|
|
configurations['interfaces'] = num_interfaces
|
|
configurations['floating_ips'] = num_floating_ips
|
|
try:
|
|
agent_status = self.state_rpc.report_state(self.context,
|
|
self.agent_state,
|
|
True)
|
|
if agent_status == l3_constants.AGENT_REVIVED:
|
|
LOG.info(_LI('Agent has just been revived. '
|
|
'Doing a full sync.'))
|
|
self.fullsync = True
|
|
self.agent_state.pop('start_flag', None)
|
|
except AttributeError:
|
|
# This means the server does not support report_state
|
|
LOG.warning(_LW("Neutron server does not support state report. "
|
|
"State report for this agent will be disabled."))
|
|
self.heartbeat.stop()
|
|
return
|
|
except Exception:
|
|
LOG.exception(_LE("Failed reporting state!"))
|
|
|
|
def after_start(self):
|
|
eventlet.spawn_n(self._process_routers_loop)
|
|
LOG.info(_LI("L3 agent started"))
|
|
# Do the report state before we do the first full sync.
|
|
self._report_state()
|
|
|
|
self.pd.after_start()
|
|
|
|
def agent_updated(self, context, payload):
|
|
"""Handle the agent_updated notification event."""
|
|
self.fullsync = True
|
|
LOG.info(_LI("agent_updated by server side %s!"), payload)
|