Add clustering capabilities to instance manager

This changes the instance manager to use the InstanceGroupManager
to manage clusters of instances instead of individual instances,
including clusters of 1 node for non-HA resources.

This also adds some missing documentation on enabling clustering
of the astara-orchestrator service.

Change-Id: Ib679453aafe68e6653c8c5f9f412efa72c2b7cb1
Author: Adam Gandelman
Date: 2016-01-13 17:20:53 -08:00
parent fd8fe9c431
commit 6613031d91
20 changed files with 1609 additions and 752 deletions
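
The gist of the change: HA resources get a two-node appliance cluster, everything else a cluster of exactly one. A minimal sketch of that sizing rule (mirroring the ``required_instance_count`` property added below)::

    def required_instance_count(resource):
        # HA resources are backed by a two-node appliance cluster;
        # everything else runs as a cluster of one.
        return 2 if resource.is_ha else 1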

View File

@@ -67,7 +67,8 @@ def build_config(worker_context, router, management_port, interfaces):
'tenant_id': router.tenant_id,
'hostname': 'ak-%s' % router.tenant_id,
'orchestrator': worker_context.config,
'vpn': generate_vpn_config(router, worker_context.neutron)
'ha_resource': router.ha,
'vpn': generate_vpn_config(router, worker_context.neutron),
}

View File

@@ -169,7 +169,8 @@ class DictModelBase(object):
class Router(object):
def __init__(self, id_, tenant_id, name, admin_state_up, status,
external_port=None, internal_ports=None, floating_ips=None):
external_port=None, internal_ports=None, floating_ips=None,
ha=False):
self.id = id_
self.tenant_id = tenant_id
self.name = name
@@ -178,6 +179,7 @@ class Router(object):
self.external_port = external_port
self.internal_ports = internal_ports or []
self.floating_ips = floating_ips or []
self.ha = ha
def __repr__(self):
return '<%s (%s:%s)>' % (self.__class__.__name__,
@@ -205,6 +207,8 @@ class Router(object):
fips = [FloatingIP.from_dict(fip) for fip in d.get('_floatingips', [])]
ha = d.get('ha', False)
return cls(
d['id'],
d['tenant_id'],
@@ -214,6 +218,7 @@ class Router(object):
external_port,
internal_ports,
floating_ips=fips,
ha=ha,
)
@property
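
For illustration, a router dict like the one neutron returns picks up the new flag through ``Router.from_dict`` (a sketch; the real payload carries more keys than shown, and all values here are made up)::

    d = {
        'id': 'r-1', 'tenant_id': 't-1', 'name': 'ak-router',
        'admin_state_up': True, 'status': 'ACTIVE',
        'ha': True,            # set for routers created with ha=True
        '_floatingips': [],
    }
    router = Router.from_dict(d)
    assert router.ha is True   # ha defaults to False when the key is absent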

View File

@@ -47,6 +47,10 @@ OPTIONS = [
cfg.CONF.register_opts(OPTIONS)
class NovaInstanceDeleteTimeout(Exception):
pass
class InstanceInfo(object):
def __init__(self, instance_id, name, management_port=None, ports=(),
image_uuid=None, status=None, last_boot=None):
@@ -256,17 +260,20 @@ class Nova(object):
default)
self.instance_provider = default(self.client)
def get_instance_info(self, name):
"""Retrieves an InstanceInfo object for a given instance name
def get_instances_for_obj(self, name):
"""Retreives all nova servers for a given instance name.
:param name: name of the instance being queried
:returns: an InstanceInfo object representing the router instance
:returns: a list of novaclient.v2.servers.Server objects or []
"""
instance = self.get_instance_for_obj(name)
if instance:
return InstanceInfo.from_nova(instance)
search_opt = '^' + name + '.*$'
instances = self.client.servers.list(
search_opts=dict(name=search_opt)
)
if not instances:
return []
return [InstanceInfo.from_nova(i) for i in instances]
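
A usage sketch, assuming a configured ``Nova`` wrapper is in scope: the name acts as a prefix, so the members of a cluster named ``<name>_0``, ``<name>_1``, ... all match the ``^<name>.*$`` search::

    infos = nova.get_instances_for_obj('ak-abc123')
    for info in infos:
        print(info.id_, info.name)   # e.g. ak-abc123_0, ak-abc123_1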
def get_instance_for_obj(self, name):
"""Retreives a nova server for a given instance name.
@@ -342,9 +349,48 @@ class Nova(object):
def update_instance_info(self, instance_info):
"""Used primarily for updating tracked instance status"""
instance = self.get_instance_by_id(instance_info.id_)
if not instance:
return None
instance_info.nova_status = instance.status
return instance_info
def delete_instances_and_wait(self, instance_infos):
"""Deletes the nova instance and waits for its deletion to complete"""
to_poll = list(instance_infos)
for inst in instance_infos:
try:
self.destroy_instance(inst)
except novaclient_exceptions.NotFound:
pass
except Exception:
LOG.exception(
_LE('Error deleting instance %s'), inst.id_)
to_poll.remove(inst)
# XXX parallelize this
timed_out = []
for inst in to_poll:
start = time.time()
i = 0
while time.time() - start < cfg.CONF.boot_timeout:
i += 1
if not self.get_instance_by_id(inst.id_):
LOG.debug('Instance %s has been deleted', inst.id_)
break
LOG.debug(
'Instance %s has not finished stopping', inst.id_)
time.sleep(cfg.CONF.retry_delay)
else:
timed_out.append(inst)
LOG.error(_LE(
'Instance %s failed to stop within %d secs'),
inst.id_, cfg.CONF.boot_timeout)
if timed_out:
raise NovaInstanceDeleteTimeout()
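Callers treat the timeout as fatal for the current operation; a sketch, assuming ``nova`` and ``group`` objects are in scope::

    try:
        nova.delete_instances_and_wait(group.instances)
    except NovaInstanceDeleteTimeout:
        # at least one server still existed after boot_timeout seconds
        LOG.error(_LE('cluster teardown timed out; will retry'))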
# TODO(mark): Convert this to dynamic yaml, proper network prefix and ssh-keys
TEMPLATE = """#cloud-config

View File

@@ -0,0 +1,70 @@
# Copyright (c) 2016 Akanda, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import threading
class ResourceContainer(object):
def __init__(self):
self.resources = {}
self.deleted = collections.deque(maxlen=50)
self.lock = threading.Lock()
def __delitem__(self, item):
with self.lock:
del self.resources[item]
self.deleted.append(item)
def items(self):
"""Get all state machines.
:returns: all state machines in this RouterContainer
"""
with self.lock:
return list(self.resources.items())
def values(self):
with self.lock:
return list(self.resources.values())
def has_been_deleted(self, resource_id):
"""Check if a resource has been deleted.
:param resource_id: The resource's id to check against the deleted list
:returns: Returns True if the resource_id has been deleted.
"""
with self.lock:
return resource_id in self.deleted
def __getitem__(self, item):
with self.lock:
return self.resources[item]
def __setitem__(self, key, value):
with self.lock:
self.resources[key] = value
def __contains__(self, item):
with self.lock:
return item in self.resources
def __bool__(self):
if self.values():
return True
else:
return False
def __nonzero__(self):
return self.__bool__()
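The container is a small thread-safe dict with a bounded tombstone list; a quick sketch of the contract::

    c = ResourceContainer()
    c['r-1'] = object()            # __setitem__ takes the lock
    assert 'r-1' in c and bool(c)
    del c['r-1']                   # deletion records the id
    assert c.has_been_deleted('r-1')
    assert not c                   # an empty container is falsy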

View File

@@ -42,11 +42,11 @@ class Fake(object):
self.crud = crud
def delete_callback(self):
def delete_callback():
print('DELETE')
def bandwidth_callback(self, *args, **kwargs):
def bandwidth_callback(*args, **kwargs):
print('BANDWIDTH:', args, kwargs)

View File

@@ -187,3 +187,8 @@ class BaseDriver(object):
def get_state(self, worker_context):
"""Returns the state of the managed resource"""
@property
def is_ha(self):
"""Returns True if logical resource is set to be highly-available"""
return False

View File

@@ -61,6 +61,7 @@ STATUS_MAP = {
states.UP: neutron.STATUS_BUILD,
states.CONFIGURED: neutron.STATUS_ACTIVE,
states.ERROR: neutron.STATUS_ERROR,
states.DEGRADED: neutron.STATUS_BUILD,
}
@@ -356,3 +357,10 @@ class Router(BaseDriver):
:returns: bool True if alive, False if not
"""
return astara_client.is_alive(management_address, self.mgt_port)
@property
def is_ha(self):
"""Returns True if logical resource is set to be highly-available"""
if not self._router:
return False
return self._router.ha

View File

@@ -24,6 +24,7 @@ RESTART = 'restart'
REPLUG = 'replug'
GONE = 'gone'
ERROR = 'error'
DEGRADED = 'degraded'
# base list of ready states, driver can use its own list.
READY_STATES = (UP, CONFIGURED)
READY_STATES = (UP, CONFIGURED, DEGRADED)

View File

@@ -23,6 +23,7 @@ POLL = 'poll'
COMMAND = 'command' # an external command to be processed
REBUILD = 'rebuild'
REBALANCE = 'rebalance'
CLUSTER_REBUILD = 'cluster_rebuild'
class Event(object):

View File

@@ -23,6 +23,8 @@ from oslo_config import cfg
from astara.drivers import states
from astara.common.i18n import _LE, _LI
from astara.common import container
CONF = cfg.CONF
INSTANCE_MANAGER_OPTS = [
@@ -43,6 +45,24 @@ INSTANCE_MANAGER_OPTS = [
CONF.register_opts(INSTANCE_MANAGER_OPTS)
def _generate_interface_map(instance, interfaces):
# TODO(mark): We're in the first phase of VRRP, so we need
# to map the interface to the network ID.
# Eventually we'll send VRRP data and real interface data
port_mac_to_net = {
p.mac_address: p.network_id
for p in instance.ports
}
# Add in the management port
mgt_port = instance.management_port
port_mac_to_net[mgt_port.mac_address] = mgt_port.network_id
# this is a network to logical interface id
return {
port_mac_to_net[i['lladdr']]: i['ifname']
for i in interfaces if i['lladdr'] in port_mac_to_net
}
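A worked example of the mapping with made-up MACs: one tenant port on ``net-a`` plus the management port on ``net-mgt``::

    # instance.ports -> [Port(mac_address='fa:16:3e:00:00:01',
    #                         network_id='net-a')]
    # management port: mac 'fa:16:3e:00:00:02' on 'net-mgt'
    interfaces = [
        {'lladdr': 'fa:16:3e:00:00:01', 'ifname': 'eth1'},
        {'lladdr': 'fa:16:3e:00:00:02', 'ifname': 'eth0'},
    ]
    # _generate_interface_map(instance, interfaces)
    #   == {'net-a': 'eth1', 'net-mgt': 'eth0'}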
def synchronize_driver_state(f):
"""Wrapper that triggers a driver's synchronize_state function"""
def wrapper(self, *args, **kw):
@@ -55,7 +75,7 @@ def synchronize_driver_state(f):
def ensure_cache(f):
"""Decorator to wrap around any function that uses self.instance_info.
Insures that self.instance_info is up to date and catches instances in a
Ensures that self.instance_info is up to date and catches instances in a
GONE or missing state before wasting cycles trying to do something with it.
NOTE: This replaces the old _ensure_cache function, now made a decorator
@@ -63,17 +83,14 @@ def ensure_cache(f):
"""
@wraps(f)
def wrapper(self, worker_context, *args, **kw):
# insure that self.instance_info is current before doing anything.
self.instance_info = (
worker_context.nova_client.get_instance_info(self.resource.name)
)
if self.instance_info:
(
self.instance_info.management_port,
self.instance_info.ports
) = worker_context.neutron.get_ports_for_instance(
self.instance_info.id_
)
self.instances.refresh(worker_context)
instances = worker_context.nova_client.get_instances_for_obj(
self.resource.name)
for inst_info in instances:
self.instances[inst_info.id_] = inst_info
self.instances.update_ports(worker_context)
return f(self, worker_context, *args, **kw)
@@ -95,6 +112,318 @@ class BootAttemptCounter(object):
return self._attempts
class InstanceGroupManager(container.ResourceContainer):
def __init__(self, log, resource):
super(InstanceGroupManager, self).__init__()
self.log = log
self.resource = resource
self._alive = set()
@property
def instances(self):
"""Returns the managed instances sorted by name"""
return sorted(self.resources.values(), key=lambda i: i.name)
def validate_ports(self):
"""Checks whether instance have management ports attached
:returns: tuple containing two sets:
(instances that have ports, instances that don't)
"""
has_ports = set()
for inst_info in set(self.resources.values()):
if inst_info.management_address:
has_ports.add(inst_info)
return has_ports, set(self.resources.values()) - has_ports
def are_alive(self):
"""Calls the check_check function all instances to ensure liveliness
:returns: tuple containing two lists (alive_instances, dead_instances)
"""
alive = set()
for i in six.moves.range(cfg.CONF.max_retries):
for inst_info in set(self.instances) - alive:
if (inst_info.management_address and
self.resource.is_alive(inst_info.management_address)):
self.log.debug(
'Instance %s found alive after %s of %s attempts',
inst_info.id_, i, cfg.CONF.max_retries)
alive.add(inst_info)
else:
self.log.debug(
'Alive check failed for instance %s. Attempt %d of %d',
inst_info.id_, i, cfg.CONF.max_retries)
# all managed instances report alive
if alive == set(self.instances):
self._alive = [i.id_ for i in alive]
return alive, []
# zero managed instances report alive
if not alive:
self.log.debug(
'Alive check failed for all instances after %s attempts.',
cfg.CONF.max_retries)
return [], self.instances
dead = set(self.resources.values()) - alive
self._alive = [i.id_ for i in alive - dead]
return list(alive), list(dead)
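Callers branch on the two lists, as ``update_state`` does later in this diff; a simplified sketch::

    alive, dead = group.are_alive()
    if not alive:
        state = states.DOWN       # nothing answered after max_retries
    elif dead:
        state = states.DEGRADED   # partial cluster failure
    else:
        state = states.UP         # every member responded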
def update_ports(self, worker_context):
"""Refresh ports on all managed instance info objects"""
for instance_info in self.instances:
if not instance_info:
continue
(
instance_info.management_port,
instance_info.ports
) = worker_context.neutron.get_ports_for_instance(
instance_info.id_
)
def get_interfaces(self):
"""Obtain a list of interfaces from each managed instance
Skips any instance that has not already been verified as being alive.
:returns: dict of {instance: [interfaces_dict]}
"""
interfaces = {}
for inst in self.instances:
if inst.id_ not in self._alive:
self.log.debug(
'Skipping interface query on instance %s that '
'is not yet alive.', inst.id_)
continue
else:
interfaces[inst] = self.resource.get_interfaces(
inst.management_address)
return interfaces
def verify_interfaces(self, ports):
"""Verify at least one instance in group has correct ports plugged"""
for inst, interfaces in self.get_interfaces().items():
actual_macs = set((iface['lladdr'] for iface in interfaces))
self.log.debug(
'MACs found on %s: %s', inst.id_,
', '.join(sorted(actual_macs)))
if not all(
getattr(p, 'mac_address', None) for p in ports
):
return False
num_instance_ports = len(list(inst.ports))
num_logical_ports = len(list(ports))
if num_logical_ports != num_instance_ports:
self.log.debug(
'Expected %s instance ports but found %s',
num_logical_ports, num_instance_ports)
return False
expected_macs = set(p.mac_address
for p in inst.ports)
expected_macs.add(inst.management_port.mac_address)
self.log.debug(
'MACs expected on: %s, %s',
inst.id_, ', '.join(sorted(expected_macs)))
if actual_macs == expected_macs:
self.log.debug('Found all expected MACs on %s', inst.id_)
return True
self.log.debug(
'Did not find all expected MACs on instance %s, '
'actual MACs: %s', inst.id_, ', '.join(actual_macs))
return False
def _update_config(self, instance, config):
self.log.debug(
'Updating config for instance %s on resource %s',
instance.id_, self.resource.id)
self.log.debug('New config: %r', config)
attempts = cfg.CONF.max_retries
for i in six.moves.range(attempts):
try:
self.resource.update_config(
instance.management_address,
config)
except Exception:
if i == attempts - 1:
# Only log the traceback if we encounter it many times.
self.log.exception(_LE('failed to update config'))
else:
self.log.debug(
'failed to update config, attempt %d',
i
)
time.sleep(cfg.CONF.retry_delay)
else:
self.log.info('Instance config updated')
return True
else:
return False
def _ha_config(self, instance):
"""Builds configuration describing the HA cluster
This informs the instance about any configuration relating to the HA
cluster it should be joining. ATM this is primarily used to inform
an instance about the management addresses of its peers.
:param instance: InstanceInfo object
:returns: dict of HA configuration
"""
peers = [
i.management_address for i in self.instances
if i.management_address != instance.management_address]
# determine cluster priority by instance age. the older instance
# gets the higher priority
sorted_by_age = sorted(
self.instances, key=lambda i: i.time_since_boot,
reverse=True)
if sorted_by_age.index(instance) == 0:
priority = 100
else:
priority = 50
return {
'peers': peers,
'priority': priority,
}
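For a two-node cluster this yields, for example (management addresses made up)::

    # older instance (higher priority, initial VRRP master):
    #   {'peers': ['fdca:3ba5:a17a::2'], 'priority': 100}
    # younger instance (backup):
    #   {'peers': ['fdca:3ba5:a17a::1'], 'priority': 50}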
def configure(self, worker_context):
# XXX config update can be dispatched to threads to speed
# things up across multiple instances
failed = []
# get_interfaces() returns only instances that are up and ready
# for config
instances_interfaces = self.get_interfaces()
for inst, interfaces in instances_interfaces.items():
# sending all the standard config over to the driver for
# final updates
config = self.resource.build_config(
worker_context,
inst.management_port,
_generate_interface_map(inst, interfaces)
)
# while drivers are free to express their own ha config
# requirements, the instance manager is the only one with
# high level view of the cluster, ie knowledge of membership
if self.resource.is_ha:
config['ha_config'] = config.get('ha') or {}
config['ha_config'].update(self._ha_config(inst))
self.log.debug(
'preparing to update config for instance %s on %s resource '
'to %r', inst.id_, self.resource.RESOURCE_NAME, config)
if self._update_config(inst, config) is not True:
failed.append(inst)
if set(failed) == set(self.instances):
# all updates have failed
self.log.error(
'Could not update config for any instances on %s resource %s, '
'marking resource state %s',
self.resource.RESOURCE_NAME, self.resource.id, states.RESTART)
return states.RESTART
elif failed:
# some updates to instances we thought to be alive have failed
self.log.error(
'Could not update config for some instances on %s '
'resource %s, marking resource state %s',
self.resource.RESOURCE_NAME, self.resource.id, states.DEGRADED)
return states.DEGRADED
elif len(instances_interfaces.keys()) != len(self.instances):
# instance_interfaces contains only instances that are alive
# if we're still waiting on instances, remain degraded
self.log.debug(
'Config updated on %s of %s instances',
len(instances_interfaces.keys()), len(self.instances))
return states.DEGRADED
else:
self.log.debug(
'Config updated across all instances on %s resource %s',
self.resource.RESOURCE_NAME, self.resource.id)
return states.CONFIGURED
def delete(self, instance):
"""Removes nova server reference from manager"""
del self.resources[instance.id_]
def refresh(self, worker_context):
"""Update nova server reference for all managed instances"""
for i in self.instances:
if not worker_context.nova_client.update_instance_info(i):
self.delete(i)
def destroy(self, worker_context):
"""Destroys all nova instances and blocks until deletion"""
worker_context.nova_client.delete_instances_and_wait(
self.instances)
def remove(self, worker_context, instance):
"""Destroys the nova instance, removes instance from group manager"""
worker_context.nova_client.destroy_instance(instance)
self.delete(instance)
@property
def next_instance_index(self):
ids = [
int(i.name.split('_')[1]) for i in
self.instances]
try:
return max(ids) + 1
except ValueError:
return 0
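The index is recovered from the name suffix assigned at boot time; e.g.::

    # names ['ak-abc123_0', 'ak-abc123_1'] -> max([0, 1]) + 1 == 2
    # no instances yet -> max([]) raises ValueError -> index 0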
def create(self, worker_context):
to_boot = self.required_instance_count - len(self.instances)
self.log.debug(
'Booting an additional %s instance(s) for resource %s',
to_boot, self.resource.id)
for i in six.moves.range(to_boot):
name = '%s_%s' % (self.resource.name, self.next_instance_index)
instance = worker_context.nova_client.boot_instance(
resource_type=self.resource.RESOURCE_NAME,
prev_instance_info=None,
name=name,
image_uuid=self.resource.image_uuid,
flavor=self.resource.flavor,
make_ports_callback=self.resource.make_ports(worker_context)
)
self.add_instance(instance)
@property
def required_instance_count(self):
if self.resource.is_ha is True:
return 2
else:
return 1
@property
def instance_count(self):
return len(self.instances)
@property
def cluster_degraded(self):
return self.instance_count < self.required_instance_count
def add_instance(self, instance):
"""Adds a new instance or updates existing"""
self.resources[instance.id_] = instance
class InstanceManager(object):
def __init__(self, resource, worker_context):
@@ -113,9 +442,10 @@ class InstanceManager(object):
self.state = states.DOWN
self.instance_info = None
self.instances = InstanceGroupManager(self.log, self.resource)
self.last_error = None
self._boot_counter = BootAttemptCounter()
self._boot_logged = False
self._boot_logged = []
self._last_synced_status = None
self.state = self.update_state(worker_context, silent=True)
@@ -150,111 +480,98 @@ class InstanceManager(object):
self.state = states.GONE
return self.state
if self.instance_info is None:
self.log.info(_LI('no backing instance, marking as %s'),
if not self.instances:
self.log.info(_LI('no backing instance(s), marking as %s'),
states.DOWN)
self.state = states.DOWN
return self.state
elif self.instances.cluster_degraded is True:
self.log.info(_LI(
'instance cluster for resource %s reports degraded'),
self.resource.id)
self.state = states.DEGRADED
return self.state
addr = self.instance_info.management_address
if not addr:
has_ports, no_ports = self.instances.validate_ports()
# an empty has_ports set means no instances have ports attached
if not has_ports:
self.log.debug('waiting for instance ports to be attached')
self.state = states.BOOTING
return self.state
for i in six.moves.range(cfg.CONF.max_retries):
if self.resource.is_alive(self.instance_info.management_address):
if self.state != states.CONFIGURED:
self.state = states.UP
break
if not silent:
self.log.debug('Alive check failed. Attempt %d of %d',
i,
cfg.CONF.max_retries)
time.sleep(cfg.CONF.retry_delay)
else:
old_state = self.state
self._check_boot_timeout()
# XXX TODO need to account for when only a subset of the cluster has
# correct ports, kick back to Replug
# If the instance isn't responding, make sure Nova knows about it
instance = worker_context.nova_client.get_instance_for_obj(
self.resource.id)
if instance is None and self.state != states.ERROR:
self.log.info('No instance was found; rebooting')
self.state = states.DOWN
self.instance_info = None
alive, dead = self.instances.are_alive()
if not alive:
# alive check failed on all instances for an already configured
# resource, mark it down.
# XXX need to track timeouts per instance
# self._check_boot_timeout()
# update_state() is called from Alive() to check the
# status of the router. If we can't talk to the API at
# that point, the router should be considered missing and
# we should reboot it, so mark it states.DOWN if we think it was
# configured before.
if old_state == states.CONFIGURED and self.state != states.ERROR:
self.log.debug('Instance not alive, marking it as %s',
if self.state == states.CONFIGURED:
self.log.debug('No instance(s) alive, marking it as %s',
states.DOWN)
self.state = states.DOWN
return self.state
elif dead:
# some subset of instances reported not alive, mark it degraded.
if self.state == states.CONFIGURED:
for i in dead:
instance = worker_context.nova_client.get_instance_by_id(
i.id_)
if instance is None and self.state != states.ERROR:
self.log.info(
'Instance %s was not found; rebooting', i.id_)
self.instances.delete(i)
self.state = states.DEGRADED
return self.state
# After the instance is all the way up, record how long it took
# to boot and accept a configuration.
self.instance_info = (
worker_context.nova_client.update_instance_info(
self.instance_info))
self.instances.refresh(worker_context)
if self.state == states.CONFIGURED:
for i in alive:
if not i.booting and i not in self._boot_logged:
self.log.info(
'%s booted in %s seconds after %s attempts',
self.resource.RESOURCE_NAME,
i.time_since_boot.total_seconds(),
self._boot_counter.count)
self._boot_logged.append(i)
self.reset_boot_counter()
else:
if alive:
self.state = states.UP
if not self.instance_info.booting and self.state == states.CONFIGURED:
# If we didn't boot the server (because we were restarted
# while it remained running, for example), we won't have a
# duration to log.
if not self._boot_logged:
boot_time = self.instance_info.time_since_boot.total_seconds()
self.log.info('%s booted in %s seconds after %s attempts',
self.resource.RESOURCE_NAME, boot_time,
self._boot_counter.count)
self._boot_logged = True
# Always reset the boot counter, even if we didn't boot
# the server ourself, so we don't accidentally think we
# have an erroring router.
self._boot_counter.reset()
return self.state
@ensure_cache
def boot(self, worker_context):
"""Boots the instance with driver pre/post boot hooks.
"""Boots the instances with driver pre/post boot hooks.
:returns: None
"""
self.log.info('Booting %s' % self.resource.RESOURCE_NAME)
self.state = states.DOWN
self._boot_counter.start()
if self.state != states.DEGRADED:
self.state = states.DOWN
self._boot_counter.start()
# driver preboot hook
self.resource.pre_boot(worker_context)
# try to boot the instance
try:
instance_info = worker_context.nova_client.boot_instance(
resource_type=self.resource.RESOURCE_NAME,
prev_instance_info=self.instance_info,
name=self.resource.name,
image_uuid=self.resource.image_uuid,
flavor=self.resource.flavor,
make_ports_callback=self.resource.make_ports(worker_context)
)
if not instance_info:
self.log.info(_LI('Previous instance is still deleting'))
self.instances.create(worker_context)
if not self.instances:
self.log.info(_LI('Previous instances are still deleting'))
# Reset the boot counter, causing the state machine to start
# again with a new Instance.
self.reset_boot_counter()
self.instance_info = None
return
except:
self.log.exception(_LE('Instance failed to start boot'))
self.resource.delete_ports(worker_context)
self.log.exception(_LE('Instances failed to start boot'))
else:
# We have successfully started a (re)boot attempt so
# record the timestamp so we can report how long it takes.
self.state = states.BOOTING
self.instance_info = instance_info
# driver post boot hook
self.resource.post_boot(worker_context)
@@ -303,7 +620,7 @@ class InstanceManager(object):
@synchronize_driver_state
@ensure_cache
def stop(self, worker_context):
"""Attempts to destroy the instance with configured timeout.
"""Attempts to destroy the instance cluster
:param worker_context:
:returns:
@@ -312,31 +629,18 @@ class InstanceManager(object):
self.resource.delete_ports(worker_context)
if not self.instance_info:
self.log.info(_LI('Instance already destroyed.'))
if not self.instances:
self.log.info(_LI('Instance(s) already destroyed.'))
if self.state != states.GONE:
self.state = states.DOWN
return self.state
try:
worker_context.nova_client.destroy_instance(self.instance_info)
self.instances.destroy(worker_context)
if self.state != states.GONE:
self.state = states.DOWN
except Exception:
self.log.exception(_LE('Error deleting router instance'))
start = time.time()
i = 0
while time.time() - start < cfg.CONF.boot_timeout:
i += 1
if not worker_context.nova_client.\
get_instance_by_id(self.instance_info.id_):
if self.state != states.GONE:
self.state = states.DOWN
return self.state
self.log.debug('Router has not finished stopping')
time.sleep(cfg.CONF.retry_delay)
self.log.error(_LE(
'Router failed to stop within %d secs'),
cfg.CONF.boot_timeout)
self.log.exception(_LE('Failed to stop instance(s)'))
@synchronize_driver_state
@ensure_cache
@@ -350,68 +654,22 @@ class InstanceManager(object):
"""
self.log.debug('Begin instance config')
self.state = states.UP
attempts = cfg.CONF.max_retries
if self.resource.get_state(worker_context) == states.GONE:
return states.GONE
interfaces = self.resource.get_interfaces(
self.instance_info.management_address)
if not self.instances:
return states.DOWN
if not self._verify_interfaces(self.resource.ports, interfaces):
# FIXME: Need a states.REPLUG state when we support hot-plugging
# interfaces.
if not self.instances.verify_interfaces(self.resource.ports):
# XXX Need to acct for degraded cluster /w subset of nodes
# having incorrect plugging.
self.log.debug("Interfaces aren't plugged as expected.")
self.state = states.REPLUG
return self.state
# TODO(mark): We're in the first phase of VRRP, so we need
# map the interface to the network ID.
# Eventually we'll send VRRP data and real interface data
port_mac_to_net = {
p.mac_address: p.network_id
for p in self.instance_info.ports
}
# Add in the management port
mgt_port = self.instance_info.management_port
port_mac_to_net[mgt_port.mac_address] = mgt_port.network_id
# this is a network to logical interface id
iface_map = {
port_mac_to_net[i['lladdr']]: i['ifname']
for i in interfaces if i['lladdr'] in port_mac_to_net
}
# sending all the standard config over to the driver for final updates
config = self.resource.build_config(
worker_context,
mgt_port,
iface_map
)
self.log.debug('preparing to update config to %r', config)
for i in six.moves.range(attempts):
try:
self.resource.update_config(
self.instance_info.management_address,
config)
except Exception:
if i == attempts - 1:
# Only log the traceback if we encounter it many times.
self.log.exception(_LE('failed to update config'))
else:
self.log.debug(
'failed to update config, attempt %d',
i
)
time.sleep(cfg.CONF.retry_delay)
else:
self.state = states.CONFIGURED
self.log.info('Instance config updated')
return self.state
else:
self.state = states.RESTART
return self.state
self.state = self.instances.configure(worker_context)
return self.state
def replug(self, worker_context):
@@ -424,51 +682,55 @@ class InstanceManager(object):
self.resource.pre_plug(worker_context)
interfaces = self.resource.get_interfaces(
self.instance_info.management_address)
for instance, interfaces in self.instances.get_interfaces().items():
actual_macs = set((iface['lladdr'] for iface in interfaces))
instance_macs = set(p.mac_address for p in instance.ports)
instance_macs.add(instance.management_port.mac_address)
actual_macs = set((iface['lladdr'] for iface in interfaces))
instance_macs = set(p.mac_address for p in self.instance_info.ports)
instance_macs.add(self.instance_info.management_port.mac_address)
if instance_macs != actual_macs:
# our cached copy of the ports is wrong reboot and clean up
self.log.warning(
('Instance macs(%s) do not match actual macs (%s). Instance '
'cache appears out-of-sync'),
instance_macs, actual_macs
)
self.state = states.RESTART
return
instance_ports = {p.network_id: p for p in self.instance_info.ports}
instance_networks = set(instance_ports.keys())
logical_networks = set(p.network_id for p in self.resource.ports)
if logical_networks != instance_networks:
instance = worker_context.nova_client.get_instance_by_id(
self.instance_info.id_
)
# For each port that doesn't have a mac address on the instance...
for network_id in logical_networks - instance_networks:
port = worker_context.neutron.create_vrrp_port(
self.resource.id,
network_id
if instance_macs != actual_macs:
# our cached copy of the ports is wrong reboot and clean up
self.log.warning((
'Instance macs(%s) do not match actual macs (%s). Instance'
' cache appears out-of-sync'),
instance_macs, actual_macs
)
self.log.debug(
'Net %s is missing from the router, plugging: %s',
network_id, port.id
self.state = states.RESTART
return
instance_ports = {p.network_id: p for p in instance.ports}
instance_networks = set(instance_ports.keys())
logical_networks = set(p.network_id for p in self.resource.ports)
if logical_networks != instance_networks:
nova_instance = worker_context.nova_client.get_instance_by_id(
instance.id_
)
try:
instance.interface_attach(port.id, None, None)
except:
self.log.exception('Interface attach failed')
self.state = states.RESTART
return
self.instance_info.ports.append(port)
# For each port that doesn't have a mac address on the instance
for network_id in logical_networks - instance_networks:
port = worker_context.neutron.create_vrrp_port(
self.resource.id,
network_id
)
self.log.debug(
'Net %s is missing from the appliance instance %s, '
'plugging: %s', network_id, instance.id_, port.id
)
try:
nova_instance.interface_attach(port.id, None, None)
instance.ports.append(port)
except:
self.log.exception(
'Interface attach failed on instance %s',
instance.id_)
self.instances.remove(worker_context, instance)
# instance has been removed for failure, do not continue with
# plugging
if instance not in self.instances.values():
continue
ports_to_delete = []
for network_id in instance_networks - logical_networks:
@@ -479,39 +741,60 @@ class InstanceManager(object):
)
try:
instance.interface_detach(port.id)
nova_instance.interface_detach(port.id)
instance.ports.remove(port)
ports_to_delete.append(port)
except:
self.log.exception('Interface detach failed')
self.state = states.RESTART
return
self.log.exception(
'Interface detach failed on instance %s',
instance.id_)
self.instances.remove(worker_context, instance)
self.instance_info.ports.remove(port)
# instance has been removed for failure, do not continue with
# plugging
if instance not in self.instances.values():
continue
# The action of attaching/detaching interfaces in Nova happens via the
# message bus and is *not* blocking. We need to wait a few seconds to
# see if the list of tap devices on the appliance actually changed. If
# not, assume the hotplug failed, and reboot the Instance.
replug_seconds = cfg.CONF.hotplug_timeout
while replug_seconds > 0:
if self._wait_for_interface_hotplug(instance) is not True:
self.instances.remove(worker_context, instance)
if not self.instances:
# all instances were destroyed for plugging failure
self.state = states.RESTART
elif self.instances.cluster_degraded:
# some instances were destroyed for plugging failure
self.state = states.DEGRADED
else:
# plugging was successful
for p in ports_to_delete:
worker_context.neutron.api_client.delete_port(p.id)
return
def _wait_for_interface_hotplug(self, instance):
"""Waits for instance to report interfaces for all expected ports"""
# The action of attaching/detaching interfaces in Nova happens via
# the message bus and is *not* blocking. We need to wait a few
# seconds to see if the list of tap devices on the appliance actually
# changed. If not, assume the hotplug failed, and reboot the
# Instance.
for i in six.moves.range(1, cfg.CONF.hotplug_timeout):
self.log.debug(
"Waiting for interface attachments to take effect..."
)
interfaces = self.resource.get_interfaces(
self.instance_info.management_address)
if self._verify_interfaces(self.resource.ports, interfaces):
# replugging was successful
# TODO(mark) update port states
for p in ports_to_delete:
worker_context.neutron.api_client.delete_port(port.id)
return
instance.management_address)
actual_macs = set((iface['lladdr'] for iface in interfaces))
instance_macs = set(p.mac_address for p in instance.ports)
instance_macs.add(instance.management_port.mac_address)
if actual_macs == instance_macs:
return True
time.sleep(1)
replug_seconds -= 1
self.log.debug("Interfaces aren't plugged as expected, rebooting.")
self.state = states.RESTART
else:
self.log.debug(
"Interfaces aren't plugged as expected on instance %s, ",
"marking for rebooting.", instance.id_)
return False
def _check_boot_timeout(self):
"""If the instance was created more than `boot_timeout` seconds
@@ -539,25 +822,3 @@ class InstanceManager(object):
# forced rebuild.
if self.state != states.ERROR:
self.state = states.DOWN
def _verify_interfaces(self, ports, interfaces):
"""Verifies the network interfaces are what they should be.
"""
actual_macs = set((iface['lladdr'] for iface in interfaces))
self.log.debug('MACs found: %s', ', '.join(sorted(actual_macs)))
if not all(
getattr(p, 'mac_address', None) for p in ports
):
return False
num_logical_ports = len(list(ports))
num_instance_ports = len(list(self.instance_info.ports))
if num_logical_ports != num_instance_ports:
return False
expected_macs = set(p.mac_address
for p in self.instance_info.ports)
expected_macs.add(self.instance_info.management_port.mac_address)
self.log.debug('MACs expected: %s', ', '.join(sorted(expected_macs)))
return actual_macs == expected_macs

View File

@@ -26,7 +26,8 @@ import collections
import itertools
from astara.common.i18n import _LE, _LI, _LW
from astara.event import POLL, CREATE, READ, UPDATE, DELETE, REBUILD
from astara.event import (POLL, CREATE, READ, UPDATE, DELETE, REBUILD,
CLUSTER_REBUILD)
from astara import instance_manager
from astara.drivers import states
@@ -85,6 +86,12 @@ class CalcAction(State):
self.params.resource.log.debug('shortcutting to delete')
return DELETE
if (self.params.instance.state == states.DEGRADED and
CLUSTER_REBUILD not in queue):
self.params.resource.log.debug(
'Scheduling a rebuild on degraded cluster')
queue.append(CLUSTER_REBUILD)
while queue:
self.params.resource.log.debug(
'action = %s, len(queue) = %s, queue = %s',
@@ -101,7 +108,8 @@ class CalcAction(State):
action = queue.popleft()
continue
elif action in (CREATE, UPDATE) and queue[0] == REBUILD:
elif (action in (CREATE, UPDATE, CLUSTER_REBUILD) and
queue[0] == REBUILD):
# upgrade to REBUILD from CREATE/UPDATE by taking the next
# item from the queue
self.params.resource.log.debug('upgrading from %s to rebuild',
@@ -145,12 +153,16 @@ class CalcAction(State):
next_action = StopInstance(self.params)
elif action == REBUILD:
next_action = RebuildInstance(self.params)
elif (action == CLUSTER_REBUILD and
self.instance.state in (states.DEGRADED, states.DOWN)):
next_action = CreateInstance(self.params)
elif self.instance.state == states.BOOTING:
next_action = CheckBoot(self.params)
elif self.instance.state == states.DOWN:
elif self.instance.state in (states.DOWN, states.DEGRADED):
next_action = CreateInstance(self.params)
else:
next_action = Alive(self.params)
if self.instance.state == states.ERROR:
if action == POLL:
# If the selected action is to poll, and we are in an
@@ -212,7 +224,7 @@ class Alive(State):
def transition(self, action, worker_context):
if self.instance.state == states.GONE:
return StopInstance(self.params)
elif self.instance.state == states.DOWN:
elif self.instance.state in (states.DOWN, states.DEGRADED):
return CreateInstance(self.params)
elif action == POLL and \
self.instance.state == states.CONFIGURED:
@@ -228,7 +240,8 @@ class CreateInstance(State):
def execute(self, action, worker_context):
# Check for a loop where the resource keeps failing to boot or
# accept the configuration.
if self.instance.attempts >= self.params.reboot_error_threshold:
if (not self.instance.state == states.DEGRADED and
self.instance.attempts >= self.params.reboot_error_threshold):
self.params.resource.log.info(_LI(
'Dropping out of boot loop after %s trials'),
self.instance.attempts)

View File

@@ -18,8 +18,6 @@
"""Manage the resources for a given tenant.
"""
import collections
import threading
import datetime
from oslo_config import cfg
@@ -29,6 +27,7 @@ from oslo_utils import timeutils
from astara.common.i18n import _LE
from astara import state
from astara import drivers
from astara.common import container
LOG = logging.getLogger(__name__)
@@ -45,50 +44,7 @@ class InvalidIncomingMessage(Exception):
pass
class ResourceContainer(object):
def __init__(self):
self.state_machines = {}
self.deleted = collections.deque(maxlen=50)
self.lock = threading.Lock()
def __delitem__(self, item):
with self.lock:
del self.state_machines[item]
self.deleted.append(item)
def items(self):
"""Get all state machines.
:returns: all state machines in this RouterContainer
"""
with self.lock:
return list(self.state_machines.items())
def values(self):
with self.lock:
return list(self.state_machines.values())
def has_been_deleted(self, resource_id):
"""Check if a resource has been deleted.
:param resource_id: The resource's id to check against the deleted list
:returns: Returns True if the resource_id has been deleted.
"""
with self.lock:
return resource_id in self.deleted
def __getitem__(self, item):
with self.lock:
return self.state_machines[item]
def __setitem__(self, key, value):
with self.lock:
self.state_machines[key] = value
def __contains__(self, item):
with self.lock:
return item in self.state_machines
class StateMachineContainer(container.ResourceContainer):
def unmanage(self, resource_id):
"""Used to delete a state machine from local management
@@ -102,7 +58,7 @@ class ResourceContainer(object):
"""
try:
with self.lock:
sm = self.state_machines.pop(resource_id)
sm = self.resources.pop(resource_id)
sm.drop_queue()
LOG.debug('unmanaged tenant state machine for resource %s',
resource_id)
@@ -123,7 +79,7 @@ class TenantResourceManager(object):
self.notify = notify_callback
self._queue_warning_threshold = queue_warning_threshold
self._reboot_error_threshold = reboot_error_threshold
self.state_machines = ResourceContainer()
self.state_machines = StateMachineContainer()
self._default_resource_id = None
def _delete_resource(self, resource):

View File

@@ -10,33 +10,33 @@
# Password of admin user (string value)
#os_password = <None>
# The port on which appliance API servers listen (string value)
#appliance_api_port = <None>
# Timeout (sec) for an appliance to become ACTIVE (integer value)
#appliance_active_timeout = 340
# Time health_check_period astara-orchestrator is configured to use (integer
# value)
#health_check_period = 60
# Tenant ID for the astara service user (string value)
#service_tenant_id = <None>
# Tenant name of admin user (string value)
#os_tenant_name = <None>
# (string value)
#test_subnet_cidr = 10.1.1.0/24
# Keystone auth URL (string value)
#os_auth_url = <None>
# Whether astara-neutron is configured to auto-add resources (boolean value)
#astara_auto_add_resources = true
# Keystone auth URL (string value)
#os_auth_url = <None>
# (string value)
#test_subnet_cidr = 10.1.1.0/24
# Username of admin user (string value)
#os_username = <None>
# Tenant name of the astara service user (string value)
#service_tenant_name = <None>
# Timeout (sec) for an appliance to become ACTIVE (integer value)
#appliance_active_timeout = 340
# The port on which appliance API servers listen (string value)
#appliance_api_port = <None>
# Time health_check_period astara-orchestrator is configured to use (integer
# value)
#health_check_period = 60

View File

@@ -114,4 +114,5 @@ fake_router = FakeModel(
name='router_name',
external_port=fake_ext_port,
management_port=fake_mgt_port,
internal_ports=[fake_int_port])
internal_ports=[fake_int_port],
ha=False)

View File

@@ -92,6 +92,7 @@ class TestAstaraClient(unittest.TestCase):
'asn': 64512,
'neighbor_asn': 64512,
'tenant_id': 'tenant_id',
'ha_resource': False,
'hostname': 'ak-tenant_id',
'orchestrator': {
'host': 'foohost',

View File

@@ -157,6 +157,7 @@ def fake_driver(resource_id=None):
fake_driver.image_uuid = 'fake_image_uuid'
fake_driver.make_ports.return_value = 'fake_ports_callback'
fake_driver.delete_ports.return_value = 'fake_delete_ports_callback'
fake_driver.is_ha = True
return fake_driver

File diff suppressed because it is too large

View File

@@ -1,3 +1,5 @@
.. _install_astara:
Astara Installation
===================
@@ -244,7 +246,7 @@ All configuration is to be performed on the controller node.
If you don't plan to build your own appliance image, one can be downloaded for testing at: http://tarballs.openstack.org/akanda-appliance/images/
If you want to build one yourself instructions are found in the :ref:`appliance documation<appliance_build>`
If you want to build one yourself, instructions are found in the :ref:`appliance documentation<appliance_build>`
In either case, upload the image to Glance (this command must be performed in the directory where the image was downloaded/created)::
openstack image create astara --public --container-format=bare --disk-format=qcow2 --file astara.qcow2
@@ -321,3 +323,23 @@ Output similar to::
+--------------------------------------+------------------------------------------------+----------------------------------+--------+------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
The line with the ak-router shows that astara has built the router VM. Further operation and debug information can be found in the :ref:`operator tools<operator_tools>` section.
.. _cluster_astara:
Clustering astara-orchestrator
------------------------------
The ``astara-orchestrator`` service supports clustering among multiple processes spanning multiple nodes to provide active/active clustering for
purposes of load-distribution and high-availability (HA). In this setup, multiple ``astara-orchestrator`` processes form a distributed hash ring,
in which each is responsible for orchestrating a subset of virtual appliances. When one ``astara-orchestrator`` node falls offline, management of
its resources is redistributed to the remaining nodes. This feature requires the use of an external coordination service (e.g., ZooKeeper),
as provided by the `tooz library <http://docs.openstack.org/developer/tooz/>`_. To find out more about which services ``tooz`` supports,
see `<http://docs.openstack.org/developer/tooz/drivers.html>`_.
To enable this feature, you must set the following in ``orchestrator.ini``::
[coordination]
enabled=True # enable the feature
url=kazoo://zookeeper.localnet:2181?timeout=5 # a URL to a tooz-supported coordination service
group_id=astara.orchestrator # optional, change this if deploying multiple clusters
heartbeat_interval=1 # optional, tune as needed

View File

@@ -146,3 +146,23 @@ event transitions the state machine into the ``Alive`` state, which (depending
on the availability of the router), may simply exit the state machine (because
the router's status API replies with an ``HTTP 200``) or transition to the
``CreateVM`` state (because the router is unresponsive and must be recreated).
High Availability
-----------------
Astara supports high-availability (HA) on both the control plane and data
plane.
The ``astara-orchestrator`` service may be deployed in a configuration that
allows multiple service processes to span nodes, providing load-distribution
and HA. For more information on clustering, see the :ref:`install docs<cluster_astara>`.
It also supports orchestrating pairs of virtual appliances to provide
HA of the data path, allowing pairs of virtual routers to be clustered among
themselves using VRRP and connection tracking. To enable this, simply
create Neutron routers with the ``ha=True`` parameter or set this property
on existing routers and issue a rebuild command via ``astara-ctl`` for that
router.
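
For example, creating such a router with the ``python-neutronclient`` API (a sketch; authentication details are deployment-specific)::

    from neutronclient.v2_0 import client

    neutron = client.Client(username='admin', password='secret',
                            tenant_name='admin',
                            auth_url='http://keystone:5000/v2.0')
    # the appliance backing this router will be orchestrated as an HA pair
    neutron.create_router({'router': {'name': 'my-ha-router', 'ha': True}})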

View File

@@ -0,0 +1,4 @@
---
features:
- Astara now supports orchestrating clustered pairs of appliance VMs for
Neutron routers that have been set to be highly available.