diff --git a/astara/api/config/router.py b/astara/api/config/router.py
index ab499116..95c3eb14 100644
--- a/astara/api/config/router.py
+++ b/astara/api/config/router.py
@@ -67,7 +67,8 @@ def build_config(worker_context, router, management_port, interfaces):
         'tenant_id': router.tenant_id,
         'hostname': 'ak-%s' % router.tenant_id,
         'orchestrator': worker_context.config,
-        'vpn': generate_vpn_config(router, worker_context.neutron)
+        'ha_resource': router.ha,
+        'vpn': generate_vpn_config(router, worker_context.neutron),
     }
diff --git a/astara/api/neutron.py b/astara/api/neutron.py
index d3796340..58eb882d 100644
--- a/astara/api/neutron.py
+++ b/astara/api/neutron.py
@@ -169,7 +169,8 @@ class DictModelBase(object):
 
 class Router(object):
     def __init__(self, id_, tenant_id, name, admin_state_up, status,
-                 external_port=None, internal_ports=None, floating_ips=None):
+                 external_port=None, internal_ports=None, floating_ips=None,
+                 ha=False):
         self.id = id_
         self.tenant_id = tenant_id
         self.name = name
@@ -178,6 +179,7 @@ class Router(object):
         self.external_port = external_port
         self.internal_ports = internal_ports or []
         self.floating_ips = floating_ips or []
+        self.ha = ha
 
     def __repr__(self):
         return '<%s (%s:%s)>' % (self.__class__.__name__,
@@ -205,6 +207,8 @@ class Router(object):
         fips = [FloatingIP.from_dict(fip)
                 for fip in d.get('_floatingips', [])]
 
+        ha = d.get('ha', False)
+
         return cls(
             d['id'],
             d['tenant_id'],
@@ -214,6 +218,7 @@ class Router(object):
             external_port,
             internal_ports,
             floating_ips=fips,
+            ha=ha,
         )
 
     @property
diff --git a/astara/api/nova.py b/astara/api/nova.py
index 41f39fe7..d2d2e179 100644
--- a/astara/api/nova.py
+++ b/astara/api/nova.py
@@ -47,6 +47,10 @@ OPTIONS = [
 cfg.CONF.register_opts(OPTIONS)
 
 
+class NovaInstanceDeleteTimeout(Exception):
+    pass
+
+
 class InstanceInfo(object):
     def __init__(self, instance_id, name, management_port=None, ports=(),
                  image_uuid=None, status=None, last_boot=None):
@@ -256,17 +260,20 @@ class Nova(object):
                 default)
         self.instance_provider = default(self.client)
 
-    def get_instance_info(self, name):
-        """Retrieves an InstanceInfo object for a given instance name
+    def get_instances_for_obj(self, name):
+        """Retrieves all nova servers for a given instance name.
 
         :param name: name of the instance being queried
-        :returns: an InstanceInfo object representing the router instance
+        :returns: a list of InstanceInfo objects, one per matching nova
+                  server, or []
         """
-        instance = self.get_instance_for_obj(name)
-
-        if instance:
-            return InstanceInfo.from_nova(instance)
+        search_opt = '^' + name + '.*$'
+        instances = self.client.servers.list(
+            search_opts=dict(name=search_opt)
+        )
+        if not instances:
+            return []
+        return [InstanceInfo.from_nova(i) for i in instances]
 
     def get_instance_for_obj(self, name):
         """Retreives a nova server for a given instance name.
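The `name` filter handed to nova here is a regular expression, which is what lets a single lookup return every member of an HA cluster rather than just one server. A rough illustration of the matching, using hypothetical appliance names that follow the `<resource name>_<index>` scheme used by `InstanceGroupManager.create()` later in this patch:

```python
import re

# hypothetical server names as nova would report them
names = ['ak-router_0', 'ak-router_1', 'ak-loadbalancer_0']

pattern = re.compile('^' + 'ak-router' + '.*$')
matches = [n for n in names if pattern.match(n)]
assert matches == ['ak-router_0', 'ak-router_1']
```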
@@ -342,9 +349,48 @@ class Nova(object):
 
     def update_instance_info(self, instance_info):
         """Used primarily for updating tracked instance status"""
         instance = self.get_instance_by_id(instance_info.id_)
+        if not instance:
+            return None
         instance_info.nova_status = instance.status
         return instance_info
 
+    def delete_instances_and_wait(self, instance_infos):
+        """Deletes nova instances and waits for the deletions to complete"""
+        to_poll = list(instance_infos)
+
+        for inst in instance_infos:
+            try:
+                self.destroy_instance(inst)
+            except novaclient_exceptions.NotFound:
+                pass
+            except Exception:
+                LOG.exception(
+                    _LE('Error deleting instance %s' % inst.id_))
+                to_poll.remove(inst)
+
+        # XXX parallelize this
+        timed_out = []
+        for inst in to_poll:
+            start = time.time()
+            i = 0
+            while time.time() - start < cfg.CONF.boot_timeout:
+                i += 1
+                if not self.get_instance_by_id(inst.id_):
+                    LOG.debug('Instance %s has been deleted', inst.id_)
+                    break
+                LOG.debug(
+                    'Instance %s has not finished stopping', inst.id_)
+                time.sleep(cfg.CONF.retry_delay)
+            else:
+                timed_out.append(inst)
+                LOG.error(_LE(
+                    'Instance %s failed to stop within %d secs'),
+                    inst.id_, cfg.CONF.boot_timeout)
+
+        if timed_out:
+            raise NovaInstanceDeleteTimeout()
+
 
 # TODO(mark): Convert this to dynamic yaml, proper network prefix and ssh-keys
 TEMPLATE = """#cloud-config
diff --git a/astara/common/container.py b/astara/common/container.py
new file mode 100644
index 00000000..21d63a94
--- /dev/null
+++ b/astara/common/container.py
@@ -0,0 +1,70 @@
+# Copyright (c) 2016 Akanda, Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import threading
+
+
+class ResourceContainer(object):
+
+    def __init__(self):
+        self.resources = {}
+        self.deleted = collections.deque(maxlen=50)
+        self.lock = threading.Lock()
+
+    def __delitem__(self, item):
+        with self.lock:
+            del self.resources[item]
+            self.deleted.append(item)
+
+    def items(self):
+        """Get all resources.
+        :returns: all resources in this ResourceContainer
+        """
+        with self.lock:
+            return list(self.resources.items())
+
+    def values(self):
+        with self.lock:
+            return list(self.resources.values())
+
+    def has_been_deleted(self, resource_id):
+        """Check if a resource has been deleted.
+
+        :param resource_id: The resource's id to check against the deleted list
+        :returns: Returns True if the resource_id has been deleted.
+        """
+        with self.lock:
+            return resource_id in self.deleted
+
+    def __getitem__(self, item):
+        with self.lock:
+            return self.resources[item]
+
+    def __setitem__(self, key, value):
+        with self.lock:
+            self.resources[key] = value
+
+    def __contains__(self, item):
+        with self.lock:
+            return item in self.resources
+
+    def __bool__(self):
+        if self.values():
+            return True
+        else:
+            return False
+
+    def __nonzero__(self):
+        return self.__bool__()
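For reference, `ResourceContainer` behaves like a lock-guarded dict that also remembers recently deleted keys; a minimal usage sketch (module path as added above):

```python
from astara.common.container import ResourceContainer

c = ResourceContainer()
c['r1'] = object()            # __setitem__ stores under the lock
assert 'r1' in c              # __contains__ is lock-protected too

del c['r1']                   # __delitem__ records the id in the deque
assert c.has_been_deleted('r1')
assert 'r1' not in c
# Only the 50 most recent deletions are remembered (deque maxlen=50), so
# has_been_deleted() can return False for much older resources.
```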
diff --git a/astara/debug.py b/astara/debug.py
index 75e941b0..3310fee4 100644
--- a/astara/debug.py
+++ b/astara/debug.py
@@ -42,11 +42,11 @@ class Fake(object):
         self.crud = crud
 
 
-def delete_callback(self):
+def delete_callback():
     print('DELETE')
 
 
-def bandwidth_callback(self, *args, **kwargs):
+def bandwidth_callback(*args, **kwargs):
     print('BANDWIDTH:', args, kwargs)
 
 
diff --git a/astara/drivers/base.py b/astara/drivers/base.py
index 7df1cb40..92c8eb0f 100644
--- a/astara/drivers/base.py
+++ b/astara/drivers/base.py
@@ -187,3 +187,8 @@ class BaseDriver(object):
 
     def get_state(self, worker_context):
         """Returns the state of the managed resource"""
+
+    @property
+    def is_ha(self):
+        """Returns True if logical resource is set to be highly-available"""
+        return False
diff --git a/astara/drivers/router.py b/astara/drivers/router.py
index 9a56a9d2..e2b30f82 100644
--- a/astara/drivers/router.py
+++ b/astara/drivers/router.py
@@ -61,6 +61,7 @@ STATUS_MAP = {
     states.UP: neutron.STATUS_BUILD,
     states.CONFIGURED: neutron.STATUS_ACTIVE,
     states.ERROR: neutron.STATUS_ERROR,
+    states.DEGRADED: neutron.STATUS_BUILD,
 }
 
@@ -356,3 +357,10 @@ class Router(BaseDriver):
         :returns: bool True if alive, False if not
         """
         return astara_client.is_alive(management_address, self.mgt_port)
+
+    @property
+    def is_ha(self):
+        """Returns True if logical resource is set to be highly-available"""
+        if not self._router:
+            return False
+        return self._router.ha
diff --git a/astara/drivers/states.py b/astara/drivers/states.py
index 21d80311..c42b0f25 100644
--- a/astara/drivers/states.py
+++ b/astara/drivers/states.py
@@ -24,6 +24,7 @@ RESTART = 'restart'
 REPLUG = 'replug'
 GONE = 'gone'
 ERROR = 'error'
+DEGRADED = 'degraded'
 
 # base list of ready states, driver can use its own list.
-READY_STATES = (UP, CONFIGURED)
+READY_STATES = (UP, CONFIGURED, DEGRADED)
diff --git a/astara/event.py b/astara/event.py
index a750ff82..ddbbf4c9 100644
--- a/astara/event.py
+++ b/astara/event.py
@@ -23,6 +23,7 @@ POLL = 'poll'
 COMMAND = 'command'  # an external command to be processed
 REBUILD = 'rebuild'
 REBALANCE = 'rebalance'
+CLUSTER_REBUILD = 'cluster_rebuild'
 
 
 class Event(object):
diff --git a/astara/instance_manager.py b/astara/instance_manager.py
index f8882757..383ab551 100644
--- a/astara/instance_manager.py
+++ b/astara/instance_manager.py
@@ -23,6 +23,8 @@ from oslo_config import cfg
 from astara.drivers import states
 from astara.common.i18n import _LE, _LI
 
+from astara.common import container
+
 CONF = cfg.CONF
 INSTANCE_MANAGER_OPTS = [
@@ -43,6 +45,24 @@ INSTANCE_MANAGER_OPTS = [
 CONF.register_opts(INSTANCE_MANAGER_OPTS)
 
 
+def _generate_interface_map(instance, interfaces):
+    # TODO(mark): We're in the first phase of VRRP, so we need
+    # to map the interface to the network ID.
+    # Eventually we'll send VRRP data and real interface data
+    port_mac_to_net = {
+        p.mac_address: p.network_id
+        for p in instance.ports
+    }
+    # Add in the management port
+    mgt_port = instance.management_port
+    port_mac_to_net[mgt_port.mac_address] = mgt_port.network_id
+    # this is a network to logical interface id
+    return {
+        port_mac_to_net[i['lladdr']]: i['ifname']
+        for i in interfaces if i['lladdr'] in port_mac_to_net
+    }
+
+
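A quick walk-through of `_generate_interface_map()` with made-up data: ports are first keyed by MAC address, then joined against the interfaces the appliance reports (each carrying `lladdr` and `ifname`):

```python
class FakePort(object):  # stand-in for a neutron port object
    def __init__(self, mac_address, network_id):
        self.mac_address = mac_address
        self.network_id = network_id

class FakeInstance(object):
    management_port = FakePort('fa:16:3e:00:00:01', 'mgt-net')
    ports = [FakePort('fa:16:3e:00:00:02', 'ext-net')]

interfaces = [
    {'lladdr': 'fa:16:3e:00:00:01', 'ifname': 'ge0'},
    {'lladdr': 'fa:16:3e:00:00:02', 'ifname': 'ge1'},
]

# the same joins the function above performs
instance = FakeInstance()
port_mac_to_net = {p.mac_address: p.network_id for p in instance.ports}
port_mac_to_net[instance.management_port.mac_address] = \
    instance.management_port.network_id
iface_map = {port_mac_to_net[i['lladdr']]: i['ifname']
             for i in interfaces if i['lladdr'] in port_mac_to_net}
assert iface_map == {'mgt-net': 'ge0', 'ext-net': 'ge1'}
```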
 def synchronize_driver_state(f):
     """Wrapper that triggers a driver's synchronize_state function"""
     def wrapper(self, *args, **kw):
@@ -55,7 +75,7 @@ def synchronize_driver_state(f):
 
 def ensure_cache(f):
     """Decorator to wrap around any function that uses self.instance_info.
 
-    Insures that self.instance_info is up to date and catches instances in a
+    Ensures that self.instance_info is up to date and catches instances in a
     GONE or missing state before wasting cycles trying to do something with it.
 
     NOTE: This replaces the old function called _ensure_cache made a Decorator
     """
     @wraps(f)
     def wrapper(self, worker_context, *args, **kw):
-        # insure that self.instance_info is current before doing anything.
-        self.instance_info = (
-            worker_context.nova_client.get_instance_info(self.resource.name)
-        )
-        if self.instance_info:
-            (
-                self.instance_info.management_port,
-                self.instance_info.ports
-            ) = worker_context.neutron.get_ports_for_instance(
-                self.instance_info.id_
-            )
+        self.instances.refresh(worker_context)
+
+        instances = worker_context.nova_client.get_instances_for_obj(
+            self.resource.name)
+        for inst_info in instances:
+            self.instances[inst_info.id_] = inst_info
+
+        self.instances.update_ports(worker_context)
 
         return f(self, worker_context, *args, **kw)
 
@@ -95,6 +112,318 @@ class BootAttemptCounter(object):
         return self._attempts
 
 
+class InstanceGroupManager(container.ResourceContainer):
+    def __init__(self, log, resource):
+        super(InstanceGroupManager, self).__init__()
+        self.log = log
+        self.resource = resource
+        self._alive = set()
+
+    @property
+    def instances(self):
+        """Returns the managed instances sorted by name"""
+        return sorted(self.resources.values(), key=lambda i: i.name)
+
+    def validate_ports(self):
+        """Checks whether instances have management ports attached
+
+        :returns: tuple containing two lists:
+                  (instances that have ports, instances that don't)
+        """
+        has_ports = set()
+        for inst_info in set(self.resources.values()):
+            if inst_info.management_address:
+                has_ports.add(inst_info)
+        return has_ports, set(self.resources.values()) - has_ports
+
+    def are_alive(self):
+        """Calls is_alive on all instances to ensure liveness
+
+        :returns: tuple containing two lists (alive_instances, dead_instances)
+        """
+        alive = set()
+        for i in six.moves.range(cfg.CONF.max_retries):
+            for inst_info in set(self.instances) - alive:
+                if (inst_info.management_address and
+                        self.resource.is_alive(inst_info.management_address)):
+                    self.log.debug(
+                        'Instance %s found alive after %s of %s attempts',
+                        inst_info.id_, i, cfg.CONF.max_retries)
+                    alive.add(inst_info)
+                else:
+                    self.log.debug(
+                        'Alive check failed for instance %s. '
+                        'Attempt %d of %d',
+                        inst_info.id_, i, cfg.CONF.max_retries)
+
+            # all managed instances report alive
+            if alive == set(self.instances):
+                self._alive = [i.id_ for i in alive]
+                return alive, []
+
+        # zero managed instances report alive
+        if not alive:
+            self.log.debug(
+                'Alive check failed for all instances after %s attempts.',
+                cfg.CONF.max_retries)
+            return [], self.instances
+
+        dead = set(self.resources.values()) - alive
+        self._alive = [i.id_ for i in alive - dead]
+        return list(alive), list(dead)
+
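Stripped of logging, the control flow of `are_alive()` is a bounded retry loop with an early exit once every instance has answered; a simplified sketch of the same pattern:

```python
def are_alive_sketch(instances, is_alive, max_retries=3):
    alive = set()
    for _ in range(max_retries):
        for inst in set(instances) - alive:
            if is_alive(inst):
                alive.add(inst)
        if alive == set(instances):
            return list(alive), []      # everyone answered: early exit
    dead = set(instances) - alive       # still silent after all retries
    return list(alive), list(dead)

# 'a' responds, 'b' never does: result is (alive, dead)
assert are_alive_sketch(['a', 'b'], lambda i: i == 'a') == (['a'], ['b'])
```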
+    def update_ports(self, worker_context):
+        """Refresh ports on all managed instance info objects"""
+        for instance_info in self.instances:
+            if not instance_info:
+                continue
+            (
+                instance_info.management_port,
+                instance_info.ports
+            ) = worker_context.neutron.get_ports_for_instance(
+                instance_info.id_
+            )
+
+    def get_interfaces(self):
+        """Obtain a list of interfaces from each managed instance
+
+        Skips any instance that has not already been verified as being alive.
+
+        :returns: dict of {instance: [interfaces_dict]}
+        """
+        interfaces = {}
+        for inst in self.instances:
+            if inst.id_ not in self._alive:
+                self.log.debug(
+                    'Skipping interface query on instance %s that '
+                    'is not yet alive.', inst.id_)
+                continue
+            else:
+                interfaces[inst] = self.resource.get_interfaces(
+                    inst.management_address)
+        return interfaces
+
+    def verify_interfaces(self, ports):
+        """Verify at least one instance in group has correct ports plugged"""
+        for inst, interfaces in self.get_interfaces().items():
+            actual_macs = set((iface['lladdr'] for iface in interfaces))
+            self.log.debug(
+                'MACs found on %s: %s', inst.id_,
+                ', '.join(sorted(actual_macs)))
+            if not all(
+                getattr(p, 'mac_address', None) for p in ports
+            ):
+                return False
+
+            num_instance_ports = len(list(inst.ports))
+            num_logical_ports = len(list(ports))
+            if num_logical_ports != num_instance_ports:
+                self.log.debug(
+                    'Expected %s instance ports but found %s',
+                    num_logical_ports, num_instance_ports)
+                return False
+
+            expected_macs = set(p.mac_address
+                                for p in inst.ports)
+            expected_macs.add(inst.management_port.mac_address)
+            self.log.debug(
+                'MACs expected on %s: %s',
+                inst.id_, ', '.join(sorted(expected_macs)))
+
+            if actual_macs == expected_macs:
+                self.log.debug('Found all expected MACs on %s', inst.id_)
+                return True
+
+            self.log.debug(
+                'Did not find all expected MACs on instance %s, '
+                'actual MACs: %s', inst.id_, ', '.join(actual_macs))
+
+        return False
+
+    def _update_config(self, instance, config):
+        self.log.debug(
+            'Updating config for instance %s on resource %s',
+            instance.id_, self.resource.id)
+        self.log.debug('New config: %r', config)
+        attempts = cfg.CONF.max_retries
+        for i in six.moves.range(attempts):
+            try:
+                self.resource.update_config(
+                    instance.management_address,
+                    config)
+            except Exception:
+                if i == attempts - 1:
+                    # Only log the traceback if we encounter it many times.
+                    self.log.exception(_LE('failed to update config'))
+                else:
+                    self.log.debug(
+                        'failed to update config, attempt %d',
+                        i
+                    )
+                time.sleep(cfg.CONF.retry_delay)
+            else:
+                self.log.info('Instance config updated')
+                return True
+        else:
+            return False
+
+    def _ha_config(self, instance):
+        """Builds configuration describing the HA cluster
+
+        This informs the instance about any configuration relating to the HA
+        cluster it should be joining. At the moment this is primarily used to
+        inform an instance about the management addresses of its peers.
+
+        :param instance: InstanceInfo object
+        :returns: dict of HA configuration
+        """
+        peers = [
+            i.management_address for i in self.instances
+            if i.management_address != instance.management_address]
+
+        # determine cluster priority by instance age. the older instance
+        # gets the higher priority
+        sorted_by_age = sorted(
+            self.instances, key=lambda i: i.time_since_boot,
+            reverse=True)
+
+        if sorted_by_age.index(instance) == 0:
+            priority = 100
+        else:
+            priority = 50
+
+        return {
+            'peers': peers,
+            'priority': priority,
+        }
+
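Concretely, for a hypothetical two-node cluster the instance that has been up the longest gets the high priority (100) and every other member gets 50, so the appliances can deterministically elect a master:

```python
from datetime import timedelta

class Peer(object):  # stand-in for InstanceInfo
    def __init__(self, management_address, time_since_boot):
        self.management_address = management_address
        self.time_since_boot = time_since_boot

cluster = [
    Peer('fdca:3ba5:a17a::1', timedelta(minutes=30)),  # older instance
    Peer('fdca:3ba5:a17a::2', timedelta(minutes=5)),   # newer instance
]

by_age = sorted(cluster, key=lambda i: i.time_since_boot, reverse=True)
priorities = {p.management_address: 100 if by_age.index(p) == 0 else 50
              for p in cluster}
assert priorities == {'fdca:3ba5:a17a::1': 100, 'fdca:3ba5:a17a::2': 50}
```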
+    def configure(self, worker_context):
+        # XXX config update can be dispatched to threads to speed
+        # things up across multiple instances
+        failed = []
+
+        # get_interfaces() returns only instances that are up and ready
+        # for config
+        instances_interfaces = self.get_interfaces()
+
+        for inst, interfaces in instances_interfaces.items():
+            # sending all the standard config over to the driver for
+            # final updates
+            config = self.resource.build_config(
+                worker_context,
+                inst.management_port,
+                _generate_interface_map(inst, interfaces)
+            )
+
+            # while drivers are free to express their own ha config
+            # requirements, the instance manager is the only one with a
+            # high-level view of the cluster, ie knowledge of membership
+            if self.resource.is_ha:
+                config['ha_config'] = config.get('ha') or {}
+                config['ha_config'].update(self._ha_config(inst))
+
+            self.log.debug(
+                'preparing to update config for instance %s on %s resource '
+                'to %r', inst.id_, self.resource.RESOURCE_NAME, config)
+
+            if self._update_config(inst, config) is not True:
+                failed.append(inst)
+
+        if set(failed) == set(self.instances):
+            # all updates have failed
+            self.log.error(
+                'Could not update config for any instances on %s resource '
+                '%s, marking resource state %s',
+                self.resource.RESOURCE_NAME, self.resource.id,
+                states.RESTART)
+            return states.RESTART
+        elif failed:
+            # some updates to instances we thought to be alive have failed
+            self.log.error(
+                'Could not update config for some instances on %s resource '
+                '%s, marking resource state %s',
+                self.resource.RESOURCE_NAME, self.resource.id,
+                states.DEGRADED)
+            return states.DEGRADED
+        elif len(instances_interfaces.keys()) != len(self.instances):
+            # instance_interfaces contains only instances that are alive
+            # if we're still waiting on instances, remain degraded
+            self.log.debug(
+                'Config updated on %s of %s instances',
+                len(instances_interfaces.keys()), len(self.instances))
+            return states.DEGRADED
+        else:
+            self.log.debug(
+                'Config updated across all instances on %s resource %s',
+                self.resource.RESOURCE_NAME, self.resource.id)
+            return states.CONFIGURED
+
+    def delete(self, instance):
+        """Removes nova server reference from manager"""
+        del self.resources[instance.id_]
+
+    def refresh(self, worker_context):
+        """Update nova server reference for all managed instances"""
+        for i in self.instances:
+            if not worker_context.nova_client.update_instance_info(i):
+                self.delete(i)
+
+    def destroy(self, worker_context):
+        """Destroys all nova instances and blocks until deletion"""
+        worker_context.nova_client.delete_instances_and_wait(
+            self.instances)
+
+    def remove(self, worker_context, instance):
+        """Destroys the nova instance, removes instance from group manager"""
+        worker_context.nova_client.destroy_instance(instance)
+        self.delete(instance)
+
+    @property
+    def next_instance_index(self):
+        ids = [
+            int(i.name.split('_')[1]) for i in
+            self.instances]
+        try:
+            return max(ids) + 1
+        except ValueError:
+            return 0
+
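The index comes out of the instance name, which `create()` below builds as `'<resource name>_<index>'`; with hypothetical names:

```python
names = ['ak-router_0', 'ak-router_2']
ids = [int(n.split('_')[1]) for n in names]
assert max(ids) + 1 == 3   # gaps are not reused, only max + 1

# With no instances at all, max() raises ValueError and we start at 0.
try:
    next_index = max([]) + 1
except ValueError:
    next_index = 0
assert next_index == 0
```

Note that `split('_')[1]` assumes the resource name itself contains no underscores; a name like `ak_router_0` would yield the wrong segment.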
+    def create(self, worker_context):
+        to_boot = self.required_instance_count - len(self.instances)
+        self.log.debug(
+            'Booting an additional %s instance(s) for resource %s',
+            to_boot, self.resource.id)
+
+        for i in six.moves.range(to_boot):
+            name = '%s_%s' % (self.resource.name, self.next_instance_index)
+            instance = worker_context.nova_client.boot_instance(
+                resource_type=self.resource.RESOURCE_NAME,
+                prev_instance_info=None,
+                name=name,
+                image_uuid=self.resource.image_uuid,
+                flavor=self.resource.flavor,
+                make_ports_callback=self.resource.make_ports(worker_context)
+            )
+            self.add_instance(instance)
+
+    @property
+    def required_instance_count(self):
+        if self.resource.is_ha is True:
+            return 2
+        else:
+            return 1
+
+    @property
+    def instance_count(self):
+        return len(self.instances)
+
+    @property
+    def cluster_degraded(self):
+        return self.instance_count < self.required_instance_count
+
+    def add_instance(self, instance):
+        """Adds a new instance or updates an existing one"""
+        self.resources[instance.id_] = instance
+
+
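The sizing rule is intentionally simple: an HA resource is backed by a pair of appliance VMs, anything else by a single VM, and a cluster is "degraded" whenever it runs below that target; a sketch:

```python
def required_instance_count(is_ha):
    return 2 if is_ha else 1

def cluster_degraded(instance_count, is_ha):
    return instance_count < required_instance_count(is_ha)

assert not cluster_degraded(2, is_ha=True)   # full HA pair
assert cluster_degraded(1, is_ha=True)       # one member lost
assert not cluster_degraded(1, is_ha=False)  # single-instance resource
```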
 class InstanceManager(object):
 
     def __init__(self, resource, worker_context):
@@ -113,9 +442,10 @@ class InstanceManager(object):
 
         self.state = states.DOWN
 
         self.instance_info = None
+        self.instances = InstanceGroupManager(self.log, self.resource)
         self.last_error = None
         self._boot_counter = BootAttemptCounter()
-        self._boot_logged = False
+        self._boot_logged = []
         self._last_synced_status = None
 
         self.state = self.update_state(worker_context, silent=True)
@@ -150,111 +480,98 @@ class InstanceManager(object):
             self.state = states.GONE
             return self.state
 
-        if self.instance_info is None:
-            self.log.info(_LI('no backing instance, marking as %s'),
+        if not self.instances:
+            self.log.info(_LI('no backing instance(s), marking as %s'),
                           states.DOWN)
             self.state = states.DOWN
             return self.state
+        elif self.instances.cluster_degraded is True:
+            self.log.info(_LI(
+                'instance cluster for resource %s reports degraded'),
+                self.resource.id)
+            self.state = states.DEGRADED
+            return self.state
 
-        addr = self.instance_info.management_address
-        if not addr:
+        has_ports, no_ports = self.instances.validate_ports()
+
+        # has_ports is empty when no instances have ports attached yet
+        if not has_ports:
             self.log.debug('waiting for instance ports to be attached')
             self.state = states.BOOTING
             return self.state
 
-        for i in six.moves.range(cfg.CONF.max_retries):
-            if self.resource.is_alive(self.instance_info.management_address):
-                if self.state != states.CONFIGURED:
-                    self.state = states.UP
-                break
-            if not silent:
-                self.log.debug('Alive check failed. Attempt %d of %d',
-                               i,
-                               cfg.CONF.max_retries)
-            time.sleep(cfg.CONF.retry_delay)
-        else:
-            old_state = self.state
-            self._check_boot_timeout()
+        # XXX TODO need to account for when only a subset of the cluster has
+        # correct ports, kick back to Replug
 
-            # If the instance isn't responding, make sure Nova knows about it
-            instance = worker_context.nova_client.get_instance_for_obj(
-                self.resource.id)
-            if instance is None and self.state != states.ERROR:
-                self.log.info('No instance was found; rebooting')
-                self.state = states.DOWN
-                self.instance_info = None
+        alive, dead = self.instances.are_alive()
+        if not alive:
+            # alive check failed on all instances for an already configured
+            # resource, mark it down.
+            # XXX need to track timeouts per instance
+            # self._check_boot_timeout()
 
-            # update_state() is called from Alive() to check the
-            # status of the router. If we can't talk to the API at
-            # that point, the router should be considered missing and
-            # we should reboot it, so mark it states.DOWN if we think it was
-            # configured before.
-            if old_state == states.CONFIGURED and self.state != states.ERROR:
-                self.log.debug('Instance not alive, marking it as %s',
+            if self.state == states.CONFIGURED:
+                self.log.debug('No instance(s) alive, marking it as %s',
                                states.DOWN)
                 self.state = states.DOWN
+            return self.state
+        elif dead:
+            # some subset of instances reported not alive, mark it degraded.
+            if self.state == states.CONFIGURED:
+                for i in dead:
+                    instance = worker_context.nova_client.get_instance_by_id(
+                        i.id_)
+                    if instance is None and self.state != states.ERROR:
+                        self.log.info(
+                            'Instance %s was not found; rebooting', i.id_)
+                        self.instances.delete(i)
+            self.state = states.DEGRADED
+            return self.state
 
-        # After the instance is all the way up, record how long it took
-        # to boot and accept a configuration.
-        self.instance_info = (
-            worker_context.nova_client.update_instance_info(
-                self.instance_info))
+        self.instances.refresh(worker_context)
+        if self.state == states.CONFIGURED:
+            for i in alive:
+                if not i.booting and i not in self._boot_logged:
+                    self.log.info(
+                        '%s booted in %s seconds after %s attempts',
+                        self.resource.RESOURCE_NAME,
+                        i.time_since_boot.total_seconds(),
+                        self._boot_counter.count)
+                    self._boot_logged.append(i)
+            self.reset_boot_counter()
+        else:
+            if alive:
+                self.state = states.UP
 
-        if not self.instance_info.booting and self.state == states.CONFIGURED:
-            # If we didn't boot the server (because we were restarted
-            # while it remained running, for example), we won't have a
-            # duration to log.
-            if not self._boot_logged:
-                boot_time = self.instance_info.time_since_boot.total_seconds()
-                self.log.info('%s booted in %s seconds after %s attempts',
-                              self.resource.RESOURCE_NAME, boot_time,
-                              self._boot_counter.count)
-                self._boot_logged = True
-
-            # Always reset the boot counter, even if we didn't boot
-            # the server ourself, so we don't accidentally think we
-            # have an erroring router.
-            self._boot_counter.reset()
         return self.state
 
     @ensure_cache
     def boot(self, worker_context):
-        """Boots the instance with driver pre/post boot hooks.
+        """Boots the instances with driver pre/post boot hooks.
 
         :returns: None
         """
         self.log.info('Booting %s' % self.resource.RESOURCE_NAME)
-        self.state = states.DOWN
-        self._boot_counter.start()
+
+        if self.state != states.DEGRADED:
+            self.state = states.DOWN
+            self._boot_counter.start()
 
         # driver preboot hook
         self.resource.pre_boot(worker_context)
 
-        # try to boot the instance
         try:
-            instance_info = worker_context.nova_client.boot_instance(
-                resource_type=self.resource.RESOURCE_NAME,
-                prev_instance_info=self.instance_info,
-                name=self.resource.name,
-                image_uuid=self.resource.image_uuid,
-                flavor=self.resource.flavor,
-                make_ports_callback=self.resource.make_ports(worker_context)
-            )
-            if not instance_info:
-                self.log.info(_LI('Previous instance is still deleting'))
+            self.instances.create(worker_context)
+            if not self.instances:
+                self.log.info(_LI('Previous instances are still deleting'))
                 # Reset the boot counter, causing the state machine to start
                 # again with a new Instance.
self.reset_boot_counter() - self.instance_info = None return except: - self.log.exception(_LE('Instance failed to start boot')) - self.resource.delete_ports(worker_context) + self.log.exception(_LE('Instances failed to start boot')) else: - # We have successfully started a (re)boot attempt so - # record the timestamp so we can report how long it takes. self.state = states.BOOTING - self.instance_info = instance_info # driver post boot hook self.resource.post_boot(worker_context) @@ -303,7 +620,7 @@ class InstanceManager(object): @synchronize_driver_state @ensure_cache def stop(self, worker_context): - """Attempts to destroy the instance with configured timeout. + """Attempts to destroy the instance cluster :param worker_context: :returns: @@ -312,31 +629,18 @@ class InstanceManager(object): self.resource.delete_ports(worker_context) - if not self.instance_info: - self.log.info(_LI('Instance already destroyed.')) + if not self.instances: + self.log.info(_LI('Instance(s) already destroyed.')) if self.state != states.GONE: self.state = states.DOWN return self.state try: - worker_context.nova_client.destroy_instance(self.instance_info) + self.instances.destroy(worker_context) + if self.state != states.GONE: + self.state = states.DOWN except Exception: - self.log.exception(_LE('Error deleting router instance')) - - start = time.time() - i = 0 - while time.time() - start < cfg.CONF.boot_timeout: - i += 1 - if not worker_context.nova_client.\ - get_instance_by_id(self.instance_info.id_): - if self.state != states.GONE: - self.state = states.DOWN - return self.state - self.log.debug('Router has not finished stopping') - time.sleep(cfg.CONF.retry_delay) - self.log.error(_LE( - 'Router failed to stop within %d secs'), - cfg.CONF.boot_timeout) + self.log.exception(_LE('Failed to stop instance(s)')) @synchronize_driver_state @ensure_cache @@ -350,68 +654,22 @@ class InstanceManager(object): """ self.log.debug('Begin instance config') self.state = states.UP - attempts = cfg.CONF.max_retries if self.resource.get_state(worker_context) == states.GONE: return states.GONE - interfaces = self.resource.get_interfaces( - self.instance_info.management_address) + if not self.instances: + return states.DOWN - if not self._verify_interfaces(self.resource.ports, interfaces): - # FIXME: Need a states.REPLUG state when we support hot-plugging - # interfaces. + if not self.instances.verify_interfaces(self.resource.ports): + # XXX Need to acct for degraded cluster /w subset of nodes + # having incorrect plugging. self.log.debug("Interfaces aren't plugged as expected.") self.state = states.REPLUG return self.state - # TODO(mark): We're in the first phase of VRRP, so we need - # map the interface to the network ID. 
- # Eventually we'll send VRRP data and real interface data - port_mac_to_net = { - p.mac_address: p.network_id - for p in self.instance_info.ports - } - # Add in the management port - mgt_port = self.instance_info.management_port - port_mac_to_net[mgt_port.mac_address] = mgt_port.network_id - # this is a network to logical interface id - iface_map = { - port_mac_to_net[i['lladdr']]: i['ifname'] - for i in interfaces if i['lladdr'] in port_mac_to_net - } - - # sending all the standard config over to the driver for final updates - config = self.resource.build_config( - worker_context, - mgt_port, - iface_map - ) - - self.log.debug('preparing to update config to %r', config) - - for i in six.moves.range(attempts): - try: - self.resource.update_config( - self.instance_info.management_address, - config) - except Exception: - if i == attempts - 1: - # Only log the traceback if we encounter it many times. - self.log.exception(_LE('failed to update config')) - else: - self.log.debug( - 'failed to update config, attempt %d', - i - ) - time.sleep(cfg.CONF.retry_delay) - else: - self.state = states.CONFIGURED - self.log.info('Instance config updated') - return self.state - else: - self.state = states.RESTART - return self.state + self.state = self.instances.configure(worker_context) + return self.state def replug(self, worker_context): @@ -424,51 +682,55 @@ class InstanceManager(object): self.resource.pre_plug(worker_context) - interfaces = self.resource.get_interfaces( - self.instance_info.management_address) + for instance, interfaces in self.instances.get_interfaces().items(): + actual_macs = set((iface['lladdr'] for iface in interfaces)) + instance_macs = set(p.mac_address for p in instance.ports) + instance_macs.add(instance.management_port.mac_address) - actual_macs = set((iface['lladdr'] for iface in interfaces)) - instance_macs = set(p.mac_address for p in self.instance_info.ports) - instance_macs.add(self.instance_info.management_port.mac_address) - - if instance_macs != actual_macs: - # our cached copy of the ports is wrong reboot and clean up - self.log.warning( - ('Instance macs(%s) do not match actual macs (%s). Instance ' - 'cache appears out-of-sync'), - instance_macs, actual_macs - ) - self.state = states.RESTART - return - - instance_ports = {p.network_id: p for p in self.instance_info.ports} - instance_networks = set(instance_ports.keys()) - - logical_networks = set(p.network_id for p in self.resource.ports) - - if logical_networks != instance_networks: - instance = worker_context.nova_client.get_instance_by_id( - self.instance_info.id_ - ) - - # For each port that doesn't have a mac address on the instance... - for network_id in logical_networks - instance_networks: - port = worker_context.neutron.create_vrrp_port( - self.resource.id, - network_id + if instance_macs != actual_macs: + # our cached copy of the ports is wrong reboot and clean up + self.log.warning(( + 'Instance macs(%s) do not match actual macs (%s). 
Instance'
+                    ' cache appears out-of-sync'),
+                    instance_macs, actual_macs
                 )
-                self.log.debug(
-                    'Net %s is missing from the router, plugging: %s',
-                    network_id, port.id
+                self.state = states.RESTART
+                return
+
+            instance_ports = {p.network_id: p for p in instance.ports}
+            instance_networks = set(instance_ports.keys())
+
+            logical_networks = set(p.network_id for p in self.resource.ports)
+
+            if logical_networks != instance_networks:
+                nova_instance = worker_context.nova_client.get_instance_by_id(
+                    instance.id_
                 )
-            try:
-                instance.interface_attach(port.id, None, None)
-            except:
-                self.log.exception('Interface attach failed')
-                self.state = states.RESTART
-                return
-            self.instance_info.ports.append(port)
+                # For each port that doesn't have a mac address on the instance
+                for network_id in logical_networks - instance_networks:
+                    port = worker_context.neutron.create_vrrp_port(
+                        self.resource.id,
+                        network_id
+                    )
+                    self.log.debug(
+                        'Net %s is missing from the appliance instance %s, '
+                        'plugging: %s', network_id, instance.id_, port.id
+                    )
+
+                    try:
+                        nova_instance.interface_attach(port.id, None, None)
+                        instance.ports.append(port)
+                    except:
+                        self.log.exception(
+                            'Interface attach failed on instance %s',
+                            instance.id_)
+                        self.instances.remove(worker_context, instance)
+
+            # instance has been removed for failure, do not continue with
+            # plugging
+            if instance not in self.instances.values():
+                continue
 
             ports_to_delete = []
             for network_id in instance_networks - logical_networks:
@@ -479,39 +741,60 @@ class InstanceManager(object):
                 )
 
                 try:
-                    instance.interface_detach(port.id)
+                    nova_instance.interface_detach(port.id)
+                    instance.ports.remove(port)
                     ports_to_delete.append(port)
                 except:
-                    self.log.exception('Interface detach failed')
-                    self.state = states.RESTART
-                    return
+                    self.log.exception(
+                        'Interface detach failed on instance %s',
+                        instance.id_)
+                    self.instances.remove(worker_context, instance)
 
-                self.instance_info.ports.remove(port)
+            # instance has been removed for failure, do not continue with
+            # plugging
+            if instance not in self.instances.values():
+                continue
 
-        # The action of attaching/detaching interfaces in Nova happens via the
-        # message bus and is *not* blocking. We need to wait a few seconds to
-        # see if the list of tap devices on the appliance actually changed. If
-        # not, assume the hotplug failed, and reboot the Instance.
-        replug_seconds = cfg.CONF.hotplug_timeout
-        while replug_seconds > 0:
+            if self._wait_for_interface_hotplug(instance) is not True:
+                self.instances.remove(worker_context, instance)
+
+        if not self.instances:
+            # all instances were destroyed for plugging failure
+            self.state = states.RESTART
+        elif self.instances.cluster_degraded:
+            # some instances were destroyed for plugging failure
+            self.state = states.DEGRADED
+        else:
+            # plugging was successful
+            for p in ports_to_delete:
+                worker_context.neutron.api_client.delete_port(p.id)
+            return
+
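The tail of `replug()` above encodes the recovery policy for hot-plug failures: instances that could not be (un)plugged are destroyed, and the surviving cluster size picks the follow-up state; roughly:

```python
def state_after_replug(surviving, required):
    if surviving == 0:
        return 'restart'    # every instance lost: rebuild from scratch
    elif surviving < required:
        return 'degraded'   # partial loss: schedule a cluster rebuild
    return 'ok'             # full strength: stale ports get deleted

assert state_after_replug(0, 2) == 'restart'
assert state_after_replug(1, 2) == 'degraded'
assert state_after_replug(2, 2) == 'ok'
```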
+    def _wait_for_interface_hotplug(self, instance):
+        """Waits for instance to report interfaces for all expected ports"""
+        # The action of attaching/detaching interfaces in Nova happens via
+        # the message bus and is *not* blocking. We need to wait a few
+        # seconds to see if the list of tap devices on the appliance
+        # actually changed. If not, assume the hotplug failed, and reboot
+        # the Instance.
+        for i in six.moves.range(1, cfg.CONF.hotplug_timeout):
             self.log.debug(
                 "Waiting for interface attachments to take effect..."
             )
             interfaces = self.resource.get_interfaces(
-                self.instance_info.management_address)
-
-            if self._verify_interfaces(self.resource.ports, interfaces):
-                # replugging was successful
-                # TODO(mark) update port states
-                for p in ports_to_delete:
-                    worker_context.neutron.api_client.delete_port(port.id)
-                return
+                instance.management_address)
+            actual_macs = set((iface['lladdr'] for iface in interfaces))
+            instance_macs = set(p.mac_address for p in instance.ports)
+            instance_macs.add(instance.management_port.mac_address)
+            if actual_macs == instance_macs:
+                return True
 
             time.sleep(1)
-            replug_seconds -= 1
-
-        self.log.debug("Interfaces aren't plugged as expected, rebooting.")
-        self.state = states.RESTART
+        else:
+            self.log.debug(
+                "Interfaces aren't plugged as expected on instance %s, "
+                "marking for rebooting.", instance.id_)
+            return False
 
     def _check_boot_timeout(self):
         """If the instance was created more than `boot_timeout` seconds
@@ -539,25 +822,3 @@ class InstanceManager(object):
         # forced rebuild.
         if self.state != states.ERROR:
             self.state = states.DOWN
-
-    def _verify_interfaces(self, ports, interfaces):
-        """Verifies the network interfaces are what they should be.
-        """
-        actual_macs = set((iface['lladdr'] for iface in interfaces))
-        self.log.debug('MACs found: %s', ', '.join(sorted(actual_macs)))
-        if not all(
-            getattr(p, 'mac_address', None) for p in ports
-        ):
-            return False
-
-        num_logical_ports = len(list(ports))
-        num_instance_ports = len(list(self.instance_info.ports))
-        if num_logical_ports != num_instance_ports:
-            return False
-
-        expected_macs = set(p.mac_address
-                            for p in self.instance_info.ports)
-        expected_macs.add(self.instance_info.management_port.mac_address)
-        self.log.debug('MACs expected: %s', ', '.join(sorted(expected_macs)))
-
-        return actual_macs == expected_macs
diff --git a/astara/state.py b/astara/state.py
index 8478f771..41e63328 100644
--- a/astara/state.py
+++ b/astara/state.py
@@ -26,7 +26,8 @@ import collections
 import itertools
 
 from astara.common.i18n import _LE, _LI, _LW
-from astara.event import POLL, CREATE, READ, UPDATE, DELETE, REBUILD
+from astara.event import (POLL, CREATE, READ, UPDATE, DELETE, REBUILD,
+                          CLUSTER_REBUILD)
 from astara import instance_manager
 from astara.drivers import states
 
@@ -85,6 +86,12 @@ class CalcAction(State):
             self.params.resource.log.debug('shortcutting to delete')
             return DELETE
 
+        if (self.params.instance.state == states.DEGRADED and
+                CLUSTER_REBUILD not in queue):
+            self.params.resource.log.debug(
+                'Scheduling a rebuild on degraded cluster')
+            queue.append(CLUSTER_REBUILD)
+
         while queue:
             self.params.resource.log.debug(
                 'action = %s, len(queue) = %s, queue = %s',
@@ -101,7 +108,8 @@ class CalcAction(State):
                 action = queue.popleft()
                 continue
 
-            elif action in (CREATE, UPDATE) and queue[0] == REBUILD:
+            elif (action in (CREATE, UPDATE, CLUSTER_REBUILD) and
+                    queue[0] == REBUILD):
                 # upgrade to REBUILD from CREATE/UPDATE by taking the next
                 # item from the queue
                 self.params.resource.log.debug('upgrading from %s to rebuild',
@@ -145,12 +153,16 @@ class CalcAction(State):
             next_action = StopInstance(self.params)
         elif action == REBUILD:
             next_action = RebuildInstance(self.params)
+        elif (action == CLUSTER_REBUILD and
+                self.instance.state in (states.DEGRADED, states.DOWN)):
+            next_action = CreateInstance(self.params)
         elif self.instance.state == states.BOOTING:
             next_action = CheckBoot(self.params)
-        elif self.instance.state == states.DOWN:
+        elif self.instance.state in (states.DOWN, states.DEGRADED):
            next_action = 
CreateInstance(self.params) else: next_action = Alive(self.params) + if self.instance.state == states.ERROR: if action == POLL: # If the selected action is to poll, and we are in an @@ -212,7 +224,7 @@ class Alive(State): def transition(self, action, worker_context): if self.instance.state == states.GONE: return StopInstance(self.params) - elif self.instance.state == states.DOWN: + elif self.instance.state in (states.DOWN, states.DEGRADED): return CreateInstance(self.params) elif action == POLL and \ self.instance.state == states.CONFIGURED: @@ -228,7 +240,8 @@ class CreateInstance(State): def execute(self, action, worker_context): # Check for a loop where the resource keeps failing to boot or # accept the configuration. - if self.instance.attempts >= self.params.reboot_error_threshold: + if (not self.instance.state == states.DEGRADED and + self.instance.attempts >= self.params.reboot_error_threshold): self.params.resource.log.info(_LI( 'Dropping out of boot loop after %s trials'), self.instance.attempts) diff --git a/astara/tenant.py b/astara/tenant.py index e7cdda56..ee4ae620 100644 --- a/astara/tenant.py +++ b/astara/tenant.py @@ -18,8 +18,6 @@ """Manage the resources for a given tenant. """ -import collections -import threading import datetime from oslo_config import cfg @@ -29,6 +27,7 @@ from oslo_utils import timeutils from astara.common.i18n import _LE from astara import state from astara import drivers +from astara.common import container LOG = logging.getLogger(__name__) @@ -45,50 +44,7 @@ class InvalidIncomingMessage(Exception): pass -class ResourceContainer(object): - - def __init__(self): - self.state_machines = {} - self.deleted = collections.deque(maxlen=50) - self.lock = threading.Lock() - - def __delitem__(self, item): - with self.lock: - del self.state_machines[item] - self.deleted.append(item) - - def items(self): - """Get all state machines. - :returns: all state machines in this RouterContainer - """ - with self.lock: - return list(self.state_machines.items()) - - def values(self): - with self.lock: - return list(self.state_machines.values()) - - def has_been_deleted(self, resource_id): - """Check if a resource has been deleted. - - :param resource_id: The resource's id to check against the deleted list - :returns: Returns True if the resource_id has been deleted. 
- """ - with self.lock: - return resource_id in self.deleted - - def __getitem__(self, item): - with self.lock: - return self.state_machines[item] - - def __setitem__(self, key, value): - with self.lock: - self.state_machines[key] = value - - def __contains__(self, item): - with self.lock: - return item in self.state_machines - +class StateMachineContainer(container.ResourceContainer): def unmanage(self, resource_id): """Used to delete a state machine from local management @@ -102,7 +58,7 @@ class ResourceContainer(object): """ try: with self.lock: - sm = self.state_machines.pop(resource_id) + sm = self.resources.pop(resource_id) sm.drop_queue() LOG.debug('unmanaged tenant state machine for resource %s', resource_id) @@ -123,7 +79,7 @@ class TenantResourceManager(object): self.notify = notify_callback self._queue_warning_threshold = queue_warning_threshold self._reboot_error_threshold = reboot_error_threshold - self.state_machines = ResourceContainer() + self.state_machines = StateMachineContainer() self._default_resource_id = None def _delete_resource(self, resource): diff --git a/astara/test/functional/test.conf b/astara/test/functional/test.conf index 77f3ad16..73564bc6 100644 --- a/astara/test/functional/test.conf +++ b/astara/test/functional/test.conf @@ -10,33 +10,33 @@ # Password of admin user (string value) #os_password = +# The port on which appliance API servers listen (string value) +#appliance_api_port = + +# Timeout (sec) for an appliance to become ACTIVE (integer value) +#appliance_active_timeout = 340 + +# Time health_check_period astara-orchestrator is configured to use (integer +# value) +#health_check_period = 60 + # Tenant ID for the astara service user (string value) #service_tenant_id = # Tenant name of admin user (string value) #os_tenant_name = -# (string value) -#test_subnet_cidr = 10.1.1.0/24 +# Keystone auth URL (string value) +#os_auth_url = # Whether astara-neutron is configured to auto-add resources (boolean value) #astara_auto_add_resources = true -# Keystone auth URL (string value) -#os_auth_url = +# (string value) +#test_subnet_cidr = 10.1.1.0/24 # Username of admin user (string value) #os_username = # Tenant name of the astara service user (string value) #service_tenant_name = - -# Timeout (sec) for an appliance to become ACTIVE (integer value) -#appliance_active_timeout = 340 - -# The port on which appliance API servers listen (string value) -#appliance_api_port = - -# Time health_check_period astara-orchestrator is configured to use (integer -# value) -#health_check_period = 60 diff --git a/astara/test/unit/api/config/config_fakes.py b/astara/test/unit/api/config/config_fakes.py index 28677790..73fb0eb1 100644 --- a/astara/test/unit/api/config/config_fakes.py +++ b/astara/test/unit/api/config/config_fakes.py @@ -114,4 +114,5 @@ fake_router = FakeModel( name='router_name', external_port=fake_ext_port, management_port=fake_mgt_port, - internal_ports=[fake_int_port]) + internal_ports=[fake_int_port], + ha=False) diff --git a/astara/test/unit/api/config/test_router_config.py b/astara/test/unit/api/config/test_router_config.py index 25f36774..c4460537 100644 --- a/astara/test/unit/api/config/test_router_config.py +++ b/astara/test/unit/api/config/test_router_config.py @@ -92,6 +92,7 @@ class TestAstaraClient(unittest.TestCase): 'asn': 64512, 'neighbor_asn': 64512, 'tenant_id': 'tenant_id', + 'ha_resource': False, 'hostname': 'ak-tenant_id', 'orchestrator': { 'host': 'foohost', diff --git a/astara/test/unit/fakes.py b/astara/test/unit/fakes.py index 
72b5f6ac..e99c8295 100644 --- a/astara/test/unit/fakes.py +++ b/astara/test/unit/fakes.py @@ -157,6 +157,7 @@ def fake_driver(resource_id=None): fake_driver.image_uuid = 'fake_image_uuid' fake_driver.make_ports.return_value = 'fake_ports_callback' fake_driver.delete_ports.return_value = 'fake_delete_ports_callback' + fake_driver.is_ha = True return fake_driver diff --git a/astara/test/unit/test_instance_manager.py b/astara/test/unit/test_instance_manager.py index 33197b87..1c61d7d6 100644 --- a/astara/test/unit/test_instance_manager.py +++ b/astara/test/unit/test_instance_manager.py @@ -15,8 +15,11 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import mock -import unittest2 as unittest +import six +import uuid + from datetime import datetime, timedelta from six.moves import range @@ -63,6 +66,21 @@ fake_add_port = FakeModel( fixed_ips=[FakeModel('', ip_address='8.8.8.8', subnet_id='s3')]) +def instance_info(mgt_port=fake_mgt_port, name=None): + if not name: + name = 'ak-router-' + str(uuid.uuid4()) + + return nova.InstanceInfo( + instance_id=str(uuid.uuid4()), + name=name, + management_port=mgt_port, + ports=[fake_int_port, fake_ext_port], + image_uuid='9f3dbe8e-66d8-11e5-9952-525400cfc326', + status='ACTIVE', + last_boot=(datetime.utcnow() - timedelta(minutes=15)), + ) + + class TestInstanceManager(base.RugTestBase): def setUp(self): @@ -93,27 +111,14 @@ class TestInstanceManager(base.RugTestBase): ] self.fake_driver.ports = ports - self.INSTANCE_INFO = nova.InstanceInfo( - instance_id='fake_instance_id', - name='ak-router-83f16d4c-66d8-11e5-938a-525400cfc326', - management_port=fake_mgt_port, - ports=[fake_int_port, fake_ext_port, fake_mgt_port], - image_uuid='9f3dbe8e-66d8-11e5-9952-525400cfc326', - status='ACTIVE', - last_boot=(datetime.utcnow() - timedelta(minutes=15)), - ) - - self.ctx.nova_client.get_instance_info.return_value = ( - self.INSTANCE_INFO) - self.ctx.neutron.get_ports_for_instance.return_value = ( - fake_mgt_port, [fake_int_port, fake_ext_port]) - self.mock_update_state = self.update_state_p.start() self.instance_mgr = instance_manager.InstanceManager( self.fake_driver, self.ctx ) - self.instance_mgr.instance_info = self.INSTANCE_INFO + self.instances_patch = mock.patch.object( + instance_manager, 'InstanceGroupManager', autospec=True) + self.instance_mgr.instances = self.instances_patch.start() self.next_state = None @@ -123,528 +128,580 @@ class TestInstanceManager(base.RugTestBase): return self.instance_mgr.state self.mock_update_state.side_effect = next_state - def test_update_state_is_alive(self): + def set_instances_container_mocks(self, instances=None, mocks=None): + # set up a mock InstanceGroupManager based on dict + # with specified mocks + self.instances_patch.stop() + + mocks = mocks or [] + instances = instances or [] + + class FakeInstancesContainer(dict): + @property + def instance_count(self): + return len(self.values()) + + @property + def cluster_degraded(self): + return len(self.values()) < self.count + + def remove(self, worker_context, instance): + self.pop(instance.id_) + + def refresh(self, worker_context): + pass + + self.instance_mgr.instances = FakeInstancesContainer() + for attr, _mock in mocks: + if attr not in dir(instance_manager.InstanceGroupManager): + raise AttributeError( + 'Attempting to mock non-existent method: %s' % attr) + setattr(self.instance_mgr.instances, attr, _mock) + + self.instance_mgr.instances.update({ + i.id_: i for i in instances + }) + 
self.instance_mgr.instances.count = len(instances) + + def test_update_state_gone(self): self.update_state_p.stop() - self.fake_driver.is_alive.return_value = True - - self.assertEqual(self.instance_mgr.update_state(self.ctx), - states.UP) - self.fake_driver.is_alive.assert_called_once_with( - self.INSTANCE_INFO.management_address) - - def test_update_state_no_backing_instance(self): - # this tests that a mgr gets its instance_info updated to None - # when the backing instance is no longer present. - self.instance_mgr.instance_info = None - self.ctx.nova_client.get_instance_info.return_value = None - self.update_state_p.stop() - self.assertEqual(self.instance_mgr.update_state(self.ctx), - states.DOWN) - self.assertFalse(self.fake_driver.is_alive.called) - - def test_update_state_instance_no_ports_still_booting(self): - self.update_state_p.stop() - self.ctx.neutron.get_ports_for_instance.return_value = (None, []) - - self.assertEqual(self.instance_mgr.update_state(self.ctx), - states.BOOTING) - self.assertFalse(self.fake_driver.is_alive.called) - - def test_update_state_log_boot_time_once(self): - self.update_state_p.stop() - self.instance_mgr.log = mock.Mock( - info=mock.Mock()) - self.ctx.nova_client.update_instance_info.return_value = ( - self.INSTANCE_INFO) - self.instance_mgr.state = states.CONFIGURED - self.fake_driver.is_alive.return_value = True - self.instance_mgr.update_state(self.ctx) + self.fake_driver.get_state.return_value = states.GONE self.assertEqual( - len(self.instance_mgr.log.info.call_args_list), - 1) - self.instance_mgr.update_state(self.ctx) + self.instance_mgr.update_state(self.ctx), + states.GONE + ) + + def test_update_state_down_no_backing_instances(self): + self.update_state_p.stop() + self.fake_driver.get_state.return_value = states.UP + self.instance_mgr.instances.__nonzero__.return_value = False self.assertEqual( - len(self.instance_mgr.log.info.call_args_list), - 1) - - @mock.patch('time.sleep', lambda *a: None) - def test_router_status_sync(self): - self.ctx.nova_client.update_instance_info.return_value = ( - self.INSTANCE_INFO) - self.update_state_p.stop() - self.fake_driver.is_alive.return_value = False - - # Router state should start down - self.instance_mgr.update_state(self.ctx) - self.fake_driver.synchronize_state.assert_called_with( - self.ctx, - state='down', + self.instance_mgr.update_state(self.ctx), + states.DOWN ) - self.fake_driver.synchronize_state.reset_mock() - - # Bring the router to UP with `is_alive = True` - self.fake_driver.is_alive.return_value = True - self.instance_mgr.update_state(self.ctx) - self.fake_driver.synchronize_state.assert_called_with( - self.ctx, - state='up', + self.assertEqual( + self.instance_mgr.state, + states.DOWN ) - self.fake_driver.synchronize_state.reset_mock() - self.fake_driver.build_config.return_value = {} - # Configure the router and make sure state is synchronized as ACTIVE - with mock.patch.object(self.instance_mgr, - '_verify_interfaces') as verify: - verify.return_value = True - self.instance_mgr.last_boot = datetime.utcnow() - self.instance_mgr.configure(self.ctx) - self.instance_mgr.update_state(self.ctx) - self.fake_driver.synchronize_state.assert_called_with( - self.ctx, - state='configured', - ) - self.fake_driver.synchronize_state.reset_mock() - - @mock.patch('time.sleep', lambda *a: None) - def test_router_status_caching(self): + def test_update_state_degraded(self): self.update_state_p.stop() - self.fake_driver.is_alive.return_value = False + self.fake_driver.get_state.return_value = states.UP + 
self.instance_mgr.instances.cluster_degraded = True + self.assertEqual( + self.instance_mgr.update_state(self.ctx), + states.DEGRADED + ) + self.assertEqual( + self.instance_mgr.state, + states.DEGRADED + ) - # Router state should start down - self.instance_mgr.update_state(self.ctx) - self.fake_driver.synchronize_state.assert_called_once_with( - self.ctx, state='down') - - @mock.patch('time.sleep') - def test_boot_timeout_still_booting(self, sleep): - now = datetime.utcnow() - self.INSTANCE_INFO.last_boot = now - self.instance_mgr.last_boot = now + def test_update_state_booting(self): self.update_state_p.stop() - self.fake_driver.is_alive.return_value = False - + self.fake_driver.get_state.return_value = states.UP + self.instance_mgr.instances.validate_ports.return_value = \ + ([], [mock.Mock()]) # (has_ports, no_ports) self.assertEqual( self.instance_mgr.update_state(self.ctx), states.BOOTING ) - self.fake_driver.is_alive.assert_has_calls([ - mock.call(self.INSTANCE_INFO.management_address), - mock.call(self.INSTANCE_INFO.management_address), - mock.call(self.INSTANCE_INFO.management_address), - ]) - @mock.patch('time.sleep') - def test_boot_timeout_error(self, sleep): - self.instance_mgr.state = states.ERROR - self.instance_mgr.last_boot = datetime.utcnow() + def test_update_state_down_all_instances_dead(self): self.update_state_p.stop() - self.fake_driver.is_alive.return_value = False + self.instance_mgr.state = states.CONFIGURED + self.instance_mgr.instances.validate_ports.return_value = \ + ([mock.Mock()], []) # (has_ports, no_ports) + self.instance_mgr.instances.are_alive.return_value = \ + ([], [mock.Mock()]) # (alive, dead) self.assertEqual( self.instance_mgr.update_state(self.ctx), - states.ERROR, + states.DOWN ) - self.fake_driver.is_alive.assert_has_calls([ - mock.call(self.INSTANCE_INFO.management_address), - mock.call(self.INSTANCE_INFO.management_address), - mock.call(self.INSTANCE_INFO.management_address), - ]) - @mock.patch('time.sleep') - def test_boot_timeout_error_no_last_boot(self, sleep): - self.instance_mgr.state = states.ERROR - self.instance_mgr.last_boot = None + def test_update_state_degraded_some_instances_dead(self): self.update_state_p.stop() - self.fake_driver.is_alive.return_value = False + self.instance_mgr.state = states.CONFIGURED + self.instance_mgr.instances.validate_ports.return_value = \ + ([mock.Mock()], []) # (has_ports, no_ports) + self.instance_mgr.instances.are_alive.return_value = \ + ([mock.Mock()], [mock.Mock()]) # (alive, dead) self.assertEqual( self.instance_mgr.update_state(self.ctx), - states.ERROR, - ) - self.fake_driver.is_alive.assert_has_calls([ - mock.call(self.INSTANCE_INFO.management_address), - mock.call(self.INSTANCE_INFO.management_address), - mock.call(self.INSTANCE_INFO.management_address), - ]) - - @mock.patch('time.sleep') - def test_boot_timeout(self, sleep): - self.instance_mgr.last_boot = datetime.utcnow() - timedelta(minutes=5) - self.update_state_p.stop() - self.fake_driver.is_alive.return_value = False - - self.assertEqual(self.instance_mgr.update_state(self.ctx), - states.DOWN) - self.fake_driver.is_alive.assert_has_calls([ - mock.call(self.INSTANCE_INFO.management_address), - mock.call(self.INSTANCE_INFO.management_address), - mock.call(self.INSTANCE_INFO.management_address), - ]) - self.instance_mgr.log.info.assert_called_once_with( - mock.ANY, - self.conf.boot_timeout, + states.DEGRADED ) - @mock.patch('time.sleep') - def test_update_state_is_down(self, sleep): + def test_update_state_up(self): 
self.update_state_p.stop() - self.fake_driver.is_alive.return_value = False + self.instance_mgr.state = states.BOOTING + self.instance_mgr.instances.validate_ports.return_value = \ + ([mock.Mock()], []) # (has_ports, no_ports) + self.instance_mgr.instances.are_alive.return_value = \ + ([mock.Mock()], []) # (alive, dead) - self.assertEqual(self.instance_mgr.update_state(self.ctx), - states.DOWN) - self.fake_driver.is_alive.assert_has_calls([ - mock.call(self.INSTANCE_INFO.management_address), - mock.call(self.INSTANCE_INFO.management_address), - mock.call(self.INSTANCE_INFO.management_address), - ]) + self.assertEqual( + self.instance_mgr.update_state(self.ctx), + states.UP + ) - @mock.patch('time.sleep') - def test_update_state_retry_delay(self, sleep): + def test_update_state_configured(self): self.update_state_p.stop() - self.fake_driver.is_alive.side_effect = [False, False, True] - max_retries = 5 - self.conf.max_retries = max_retries - self.instance_mgr.update_state(self.ctx, silent=False) - self.assertEqual(sleep.call_count, 2) + self.instance_mgr.log = mock.Mock( + info=mock.Mock()) + + self.instance_mgr.state = states.CONFIGURED + self.instance_mgr.instances.validate_ports.return_value = \ + ([mock.Mock()], []) # (has_ports, no_ports) + self.instance_mgr.instances.are_alive.return_value = \ + ([mock.Mock(booting=False)], []) # (alive, dead) + + self.assertEqual( + self.instance_mgr.update_state(self.ctx), + states.CONFIGURED + ) + + self.instance_mgr.update_state(self.ctx), + self.instance_mgr.update_state(self.ctx), + self.instance_mgr.update_state(self.ctx), + # ensure the boot was logged only once + self.assertEqual(len(self.instance_mgr.log.info.call_args_list), 1) @mock.patch('time.sleep') def test_boot_success(self, sleep): self.next_state = states.UP self.instance_mgr.boot(self.ctx) self.assertEqual(self.instance_mgr.state, states.BOOTING) - - self.ctx.nova_client.boot_instance.assert_called_once_with( - resource_type=self.fake_driver.RESOURCE_NAME, - prev_instance_info=self.INSTANCE_INFO, - name=self.fake_driver.name, - image_uuid=self.fake_driver.image_uuid, - flavor=self.fake_driver.flavor, - make_ports_callback='fake_ports_callback') - + self.instance_mgr.instances.create.assert_called_with( + self.ctx) self.assertEqual(1, self.instance_mgr.attempts) @mock.patch('time.sleep') def test_boot_instance_deleted(self, sleep): - self.ctx.nova_client.boot_instance.return_value = None + self.instance_mgr.instances.__nonzero__.return_value = False self.instance_mgr.boot(self.ctx) # a deleted VM should reset the vm mgr state and not as a failed # attempt self.assertEqual(self.instance_mgr.attempts, 0) - self.assertIsNone(self.instance_mgr.instance_info) - - @mock.patch('time.sleep') - def test_boot_fail(self, sleep): - self.next_state = states.DOWN - self.instance_mgr.boot(self.ctx) - self.assertEqual(self.instance_mgr.state, states.BOOTING) - self.ctx.nova_client.boot_instance.assert_called_once_with( - resource_type=self.fake_driver.RESOURCE_NAME, - prev_instance_info=self.INSTANCE_INFO, - name=self.fake_driver.name, - image_uuid=self.fake_driver.image_uuid, - flavor=self.fake_driver.flavor, - make_ports_callback='fake_ports_callback') - self.assertEqual(1, self.instance_mgr.attempts) @mock.patch('time.sleep') def test_boot_exception(self, sleep): - self.ctx.nova_client.boot_instance.side_effect = RuntimeError + self.instance_mgr.instances.create.side_effect = RuntimeError self.instance_mgr.boot(self.ctx) self.assertEqual(self.instance_mgr.state, states.DOWN) - 
self.ctx.nova_client.boot_instance.assert_called_once_with( - resource_type=self.fake_driver.RESOURCE_NAME, - prev_instance_info=self.INSTANCE_INFO, - name=self.fake_driver.name, - image_uuid=self.fake_driver.image_uuid, - flavor=self.fake_driver.flavor, - make_ports_callback='fake_ports_callback') + self.instance_mgr.instances.create.assert_called_with( + self.ctx) self.assertEqual(1, self.instance_mgr.attempts) - @mock.patch('time.sleep') - def test_boot_with_port_cleanup(self, sleep): - self.next_state = states.UP - - management_port = mock.Mock(id='mgmt', device_id='INSTANCE1') - external_port = mock.Mock(id='ext', device_id='INSTANCE1') - internal_port = mock.Mock(id='int', device_id='INSTANCE1') - - rtr = mock.sentinel.router - instance = mock.sentinel.instance - self.ctx.neutron.get_router_detail.return_value = rtr - self.ctx.nova_client.boot_instance.side_effect = RuntimeError - rtr.id = 'ROUTER1' - instance.id = 'INSTANCE1' - rtr.management_port = management_port - rtr.external_port = external_port - rtr.ports = mock.MagicMock() - rtr.ports.__iter__.return_value = [management_port, external_port, - internal_port] - self.instance_mgr.boot(self.ctx) - self.ctx.nova_client.boot_instance.assert_called_once_with( - resource_type=self.fake_driver.RESOURCE_NAME, - prev_instance_info=self.INSTANCE_INFO, - name=self.fake_driver.name, - image_uuid=self.fake_driver.image_uuid, - flavor=self.fake_driver.flavor, - make_ports_callback='fake_ports_callback') - self.instance_mgr.resource.delete_ports.assert_called_once_with( - self.ctx) - - @mock.patch('time.sleep') - def test_stop_success(self, sleep): + def test_stop_success(self): self.instance_mgr.state = states.UP - self.ctx.nova_client.get_instance_by_id.return_value = None + instance = instance_info() + self.set_instances_container_mocks( + instances=[instance], + mocks=[ + ('destroy', mock.Mock()), + ('update_ports', mock.Mock())]) + self.instance_mgr.stop(self.ctx) - self.ctx.nova_client.destroy_instance.assert_called_once_with( - self.INSTANCE_INFO - ) + self.instance_mgr.instances.destroy.assert_called_with(self.ctx) self.instance_mgr.resource.delete_ports.assert_called_once_with( self.ctx) self.assertEqual(self.instance_mgr.state, states.DOWN) - @mock.patch('time.time') - @mock.patch('time.sleep') - def test_stop_fail(self, sleep, time): - t = 1444679566 - side_effects = [t] - for i in range(30): - t = t + 1 - side_effects.append(t) - time.side_effect = side_effects - self.config(boot_timeout=30) + def test_stop_fail(self): self.instance_mgr.state = states.UP + self.set_instances_container_mocks( + instances=[instance_info()], + mocks=[ + ('destroy', mock.Mock()), + ('update_ports', mock.Mock())]) + self.instance_mgr.instances.destroy.side_effect = Exception self.instance_mgr.stop(self.ctx) self.assertEqual(self.instance_mgr.state, states.UP) - self.ctx.nova_client.destroy_instance.assert_called_once_with( - self.INSTANCE_INFO - ) - - @mock.patch('time.sleep') - def test_stop_router_already_deleted_from_neutron(self, sleep): - self.instance_mgr.state = states.GONE - self.ctx.nova_client.get_instance_by_id.return_value = None - self.instance_mgr.stop(self.ctx) - self.ctx.nova_client.destroy_instance.assert_called_once_with( - self.INSTANCE_INFO) - self.ctx.nova_client.get_instance_by_id.assert_called_with( - self.INSTANCE_INFO.id_ - ) self.fake_driver.delete_ports.assert_called_with(self.ctx) + + def test_stop_router_already_deleted_from_neutron(self): + self.instance_mgr.state = states.GONE + instance = instance_info() + 
self.set_instances_container_mocks( + instances=[instance], + mocks=[ + ('destroy', mock.Mock()), + ('update_ports', mock.Mock())]) + + self.instance_mgr.stop(self.ctx) + self.instance_mgr.instances.destroy.assert_called_with(self.ctx) + self.instance_mgr.resource.delete_ports.assert_called_once_with( + self.ctx) self.assertEqual(self.instance_mgr.state, states.GONE) - @mock.patch('time.sleep') - def test_stop_no_inst_router_already_deleted_from_neutron(self, sleep): + def test_stop_no_inst_router_already_deleted_from_neutron(self): self.instance_mgr.state = states.GONE - self.ctx.nova_client.get_instance_info.return_value = None + self.set_instances_container_mocks( + instances=[], + mocks=[ + ('destroy', mock.Mock()), + ('update_ports', mock.Mock())]) self.instance_mgr.stop(self.ctx) self.fake_driver.delete_ports.assert_called_with(self.ctx) self.assertEqual(self.instance_mgr.state, states.GONE) - @mock.patch('time.sleep') - def test_stop_instance_already_deleted_from_nova(self, sleep): + def test_stop_instance_already_deleted_from_nova(self): self.instance_mgr.state = states.RESTART - self.ctx.nova_client.get_instance_info.return_value = None + self.set_instances_container_mocks( + instances=[], + mocks=[ + ('destroy', mock.Mock()), + ('update_ports', mock.Mock())]) + self.instance_mgr.stop(self.ctx) self.fake_driver.delete_ports.assert_called_with(self.ctx) self.assertEqual(self.instance_mgr.state, states.DOWN) - def test_configure_success(self): - fake_config_dict = {'fake_config': 'foo'} - self.fake_driver.build_config.return_value = dict(fake_config_dict) - self.config(astara_metadata_port=4321) - self.config(host='foobarhost') - with mock.patch.object(self.instance_mgr, - '_verify_interfaces') as verify: - verify.return_value = True - self.instance_mgr.configure(self.ctx) - - verify.assert_called_once_with( - self.fake_driver.ports, - self.fake_driver.get_interfaces.return_value) - - self.fake_driver.build_config.assert_called_once_with( - self.ctx, - self.INSTANCE_INFO.management_port, - {'ext-net': 'ge1', 'int-net': 'ge2', 'mgt-net': 'ge0'}) - - self.fake_driver.update_config.assert_called_with( - self.INSTANCE_INFO.management_address, fake_config_dict) - self.assertEqual(self.instance_mgr.state, - states.CONFIGURED) - def test_configure_mismatched_interfaces(self): - with mock.patch.object(self.instance_mgr, - '_verify_interfaces') as verify: - verify.return_value = False - self.instance_mgr.configure(self.ctx) + self.instance_mgr.instances.verify_interfaces.return_value = False + self.assertEqual( + self.instance_mgr.configure(self.ctx), + states.REPLUG, + ) - verify.assert_called_once_with( - self.fake_driver.ports, - self.fake_driver.get_interfaces.return_value) + def test_configure_gone(self): + self.fake_driver.get_state.return_value = states.GONE + self.assertEqual( + self.instance_mgr.configure(self.ctx), states.GONE) - self.assertFalse(self.fake_driver.update_config.called) - self.assertEqual(self.instance_mgr.state, states.REPLUG) + def test_configure(self): + self.instance_mgr.instances.verify_interfaces.return_value = True + self.instance_mgr.instances.configure.return_value = states.RESTART + self.assertEqual( + self.instance_mgr.configure(self.ctx), + states.RESTART, + ) + self.instance_mgr.instances.verify_interfaces.assert_called_with( + self.fake_driver.ports + ) + self.instance_mgr.instances.configure.assert_called_with(self.ctx) - @mock.patch('time.sleep') - def test_configure_failure(self, sleep): - fake_config_dict = {'fake_config': 'foo'} - - 
self.fake_driver.update_config.side_effect = Exception - self.fake_driver.build_config.return_value = fake_config_dict - - with mock.patch.object(self.instance_mgr, - '_verify_interfaces') as verify: - verify.return_value = True - self.instance_mgr.configure(self.ctx) - - interfaces = self.fake_driver.get_interfaces.return_value - verify.assert_called_once_with( - self.fake_driver.ports, interfaces) - - expected_calls = [ - mock.call(self.INSTANCE_INFO.management_address, - fake_config_dict) - for i in range(0, 2)] - self.fake_driver.update_config.assert_has_calls(expected_calls) - self.assertEqual(self.instance_mgr.state, states.RESTART) - - @mock.patch('time.sleep', lambda *a: None) - def test_replug_add_new_port_success(self): + @mock.patch.object(instance_manager.InstanceManager, + '_wait_for_interface_hotplug') + def test_replug_add_new_port_success(self, wait_for_hotplug): self.instance_mgr.state = states.REPLUG - - self.fake_driver.get_interfaces.return_value = [ - {'lladdr': fake_mgt_port.mac_address}, - {'lladdr': fake_ext_port.mac_address}, - {'lladdr': fake_int_port.mac_address} - ] - self.conf.hotplug_timeout = 5 + instance = instance_info() + get_interfaces = mock.Mock( + return_value={ + instance: [ + {'lladdr': fake_mgt_port.mac_address}, + {'lladdr': fake_ext_port.mac_address}, + {'lladdr': fake_int_port.mac_address}] + } + ) + self.set_instances_container_mocks( + instances=[instance], mocks=[('get_interfaces', get_interfaces)]) fake_instance = mock.MagicMock() self.ctx.nova_client.get_instance_by_id = mock.Mock( return_value=fake_instance) + fake_new_port = fake_add_port self.fake_driver.ports.append(fake_new_port) self.ctx.neutron.create_vrrp_port.return_value = fake_new_port - with mock.patch.object(self.instance_mgr, - '_verify_interfaces') as verify: - verify.return_value = True # the hotplug worked! 
- self.instance_mgr.replug(self.ctx) + self.fake_driver.get_interfaces.return_value = [ + {'lladdr': fake_mgt_port.mac_address}, + {'lladdr': fake_ext_port.mac_address}, + {'lladdr': fake_int_port.mac_address}, + {'lladdr': fake_new_port.mac_address}, + ] - self.ctx.neutron.create_vrrp_port.assert_called_with( - self.fake_driver.id, 'additional-net' - ) - self.assertEqual(self.instance_mgr.state, states.REPLUG) - fake_instance.interface_attach.assert_called_once_with( - fake_new_port.id, None, None - ) - self.assertIn(fake_new_port, self.INSTANCE_INFO.ports) + wait_for_hotplug.return_value = True + self.instance_mgr.replug(self.ctx) + + self.ctx.neutron.create_vrrp_port.assert_called_with( + self.fake_driver.id, 'additional-net' + ) + self.assertEqual(self.instance_mgr.state, states.REPLUG) + fake_instance.interface_attach.assert_called_once_with( + fake_new_port.id, None, None + ) + self.assertIn(fake_new_port, instance.ports) - @mock.patch('time.sleep', lambda *a: None) def test_replug_add_new_port_failure(self): self.instance_mgr.state = states.REPLUG + instance = instance_info() + get_interfaces = mock.Mock( + return_value={ + instance: [ + {'lladdr': fake_mgt_port.mac_address}, + {'lladdr': fake_ext_port.mac_address}, + {'lladdr': fake_int_port.mac_address}] + } + ) + self.set_instances_container_mocks( + instances=[instance], + mocks=[('get_interfaces', get_interfaces)] + ) self.fake_driver.get_interfaces.return_value = [ {'lladdr': fake_mgt_port.mac_address}, {'lladdr': fake_ext_port.mac_address}, {'lladdr': fake_int_port.mac_address} ] - self.conf.hotplug_timeout = 5 - fake_instance = mock.MagicMock() + fake_instance.interface_attach = mock.Mock( + side_effect=Exception, + ) self.ctx.nova_client.get_instance_by_id = mock.Mock( return_value=fake_instance) fake_new_port = fake_add_port self.fake_driver.ports.append(fake_new_port) self.ctx.neutron.create_vrrp_port.return_value = fake_new_port + self.instance_mgr.replug(self.ctx) + self.assertEqual(self.instance_mgr.state, states.RESTART) - with mock.patch.object(self.instance_mgr, - '_verify_interfaces') as verify: - verify.return_value = False # The hotplug didn't work! 
-            self.instance_mgr.replug(self.ctx)
-            self.assertEqual(self.instance_mgr.state, states.RESTART)
+        fake_instance.interface_attach.assert_called_once_with(
+            fake_new_port.id, None, None)

-        fake_instance.interface_attach.assert_called_once_with(
-            fake_new_port.id, None, None
-        )
+    @mock.patch.object(instance_manager.InstanceManager,
+                       '_wait_for_interface_hotplug')
+    def test_replug_add_new_port_failed_degraded(self, wait_for_hotplug):
+        self.conf.hotplug_timeout = 2
+        self.instance_mgr.state = states.REPLUG
+        instance_1 = instance_info()
+        instance_2 = instance_info()
+        get_interfaces = mock.Mock(
+            return_value={
+                instance_1: [
+                    {'lladdr': fake_mgt_port.mac_address},
+                    {'lladdr': fake_ext_port.mac_address},
+                    {'lladdr': fake_int_port.mac_address}],
+                instance_2: [
+                    {'lladdr': fake_mgt_port.mac_address},
+                    {'lladdr': fake_ext_port.mac_address},
+                    {'lladdr': fake_int_port.mac_address}]
+            }
+        )

-    @mock.patch('time.sleep', lambda *a: None)
-    def test_replug_remove_port_success(self):
+        self.set_instances_container_mocks(
+            instances=[instance_1, instance_2],
+            mocks=[('get_interfaces', get_interfaces)])
+        self.instance_mgr.instances.update({
+            i.id_: i for i in [instance_1, instance_2]
+        })
+
+        instances = []
+        for i in range(2):
+            fake_instance = mock.MagicMock()
+            fake_instance.interface_attach = mock.Mock()
+            instances.append(fake_instance)
+
+        instances[1].interface_attach.side_effect = Exception
+        self.ctx.nova_client.get_instance_by_id.side_effect = instances
+
+        fake_new_port = fake_add_port
+        self.fake_driver.ports.append(fake_new_port)
+        self.ctx.neutron.create_vrrp_port.return_value = fake_new_port
+
+        wait_for_hotplug.return_value = True
+        self.instance_mgr.replug(self.ctx)
+        self.assertEqual(self.instance_mgr.state, states.DEGRADED)
+
+        for instance in instances:
+            instance.interface_attach.assert_called_with(
+                fake_new_port.id, None, None)
+        self.assertNotIn(instances[1], self.instance_mgr.instances.values())
+
+    @mock.patch.object(instance_manager.InstanceManager,
+                       '_wait_for_interface_hotplug')
+    def test_replug_add_new_port_hotplug_failed_degraded(self,
+                                                         wait_for_hotplug):
+        self.instance_mgr.state = states.REPLUG
+        instance_1 = instance_info()
+        instance_2 = instance_info()
+        get_interfaces = mock.Mock(
+            return_value={
+                instance_1: [
+                    {'lladdr': fake_mgt_port.mac_address},
+                    {'lladdr': fake_ext_port.mac_address},
+                    {'lladdr': fake_int_port.mac_address}],
+                instance_2: [
+                    {'lladdr': fake_mgt_port.mac_address},
+                    {'lladdr': fake_ext_port.mac_address},
+                    {'lladdr': fake_int_port.mac_address}]
+            }
+        )
+
+        self.set_instances_container_mocks(
+            instances=[instance_1, instance_2],
+            mocks=[('get_interfaces', get_interfaces)])
+
+        instances = []
+        for i in range(2):
+            fake_instance = mock.MagicMock()
+            fake_instance.interface_attach = mock.Mock()
+            instances.append(fake_instance)
+        self.ctx.nova_client.get_instance_by_id.side_effect = instances
+
+        fake_new_port = fake_add_port
+        self.fake_driver.ports.append(fake_new_port)
+        self.ctx.neutron.create_vrrp_port.return_value = fake_new_port
+
+        # the second instance fails to hotplug
+        wait_for_hotplug.side_effect = [True, False]
+
+        self.instance_mgr.replug(self.ctx)
+        self.assertEqual(self.instance_mgr.state, states.DEGRADED)
+
+        for instance in instances:
+            instance.interface_attach.assert_called_with(
+                fake_new_port.id, None, None)
+        self.assertNotIn(instances[1], self.instance_mgr.instances.values())
+
+    @mock.patch.object(instance_manager.InstanceManager,
+
'_wait_for_interface_hotplug') + def test_replug_remove_port_success(self, wait_for_hotplug): self.instance_mgr.state = states.REPLUG - # Resource lacks the fake_ext_port, it will be unplugged - self.fake_driver.ports = [fake_mgt_port, fake_int_port] - self.fake_driver.get_interfaces.return_value = [ - {'lladdr': fake_mgt_port.mac_address}, - {'lladdr': fake_int_port.mac_address}, - {'lladdr': fake_ext_port.mac_address}, - ] - self.conf.hotplug_timeout = 5 + self.fake_driver.ports = [fake_ext_port, fake_int_port] + + instance_1 = instance_info() + instance_1.ports.append(fake_add_port) + + get_interfaces = mock.Mock( + return_value={ + # Instance contains an extra port, it will be removed + instance_1: [ + {'lladdr': fake_mgt_port.mac_address}, + {'lladdr': fake_ext_port.mac_address}, + {'lladdr': fake_int_port.mac_address}, + {'lladdr': fake_add_port.mac_address}, + ], + } + ) + self.set_instances_container_mocks( + instances=[instance_1], + mocks=[('get_interfaces', get_interfaces)]) fake_instance = mock.MagicMock() self.ctx.nova_client.get_instance_by_id = mock.Mock( return_value=fake_instance) - with mock.patch.object(self.instance_mgr, - '_verify_interfaces') as verify: - verify.return_value = True # the unplug worked! - self.instance_mgr.replug(self.ctx) - self.assertEqual(self.instance_mgr.state, states.REPLUG) - fake_instance.interface_detach.assert_called_once_with( - fake_ext_port.id - ) - self.assertNotIn(fake_ext_port, self.INSTANCE_INFO.ports) + wait_for_hotplug.return_value = True + self.instance_mgr.replug(self.ctx) + self.assertEqual(self.instance_mgr.state, states.REPLUG) + fake_instance.interface_detach.assert_called_once_with( + fake_add_port.id) + self.assertNotIn(fake_add_port, instance_1.ports) - @mock.patch('time.sleep', lambda *a: None) def test_replug_remove_port_failure(self): self.instance_mgr.state = states.REPLUG - # Router lacks the fake_ext_port, it will be unplugged - self.fake_driver.ports = [fake_mgt_port, fake_int_port] - self.fake_driver.get_interfaces.return_value = [ - {'lladdr': fake_mgt_port.mac_address}, - {'lladdr': fake_ext_port.mac_address}, - {'lladdr': fake_int_port.mac_address} - ] - self.conf.hotplug_timeout = 5 + self.fake_driver.ports = [fake_ext_port, fake_int_port] + + instance_1 = instance_info() + instance_1.ports.append(fake_add_port) + + get_interfaces = mock.Mock( + return_value={ + # Instance contains an extra port, it will be removed + instance_1: [ + {'lladdr': fake_mgt_port.mac_address}, + {'lladdr': fake_ext_port.mac_address}, + {'lladdr': fake_int_port.mac_address}, + {'lladdr': fake_add_port.mac_address}], + } + ) + self.set_instances_container_mocks( + instances=[instance_1], + mocks=[('get_interfaces', get_interfaces)]) + + fake_instance = mock.MagicMock() + self.ctx.nova_client.get_instance_by_id = mock.Mock( + return_value=fake_instance) + fake_instance.interface_detach.side_effect = Exception + + self.instance_mgr.replug(self.ctx) + self.assertEqual(self.instance_mgr.state, + states.RESTART) + fake_instance.interface_detach.assert_called_once_with( + fake_add_port.id + ) + + @mock.patch.object(instance_manager.InstanceManager, + '_wait_for_interface_hotplug') + def test_replug_remove_port_hotplug_failed(self, wait_for_hotplug): + self.instance_mgr.state = states.REPLUG + + self.fake_driver.ports = [fake_ext_port, fake_int_port] + + instance_1 = instance_info() + instance_1.ports.append(fake_add_port) + + get_interfaces = mock.Mock( + return_value={ + # Instance contains an extra port, it will be removed + instance_1: [ + 
{'lladdr': fake_mgt_port.mac_address}, + {'lladdr': fake_ext_port.mac_address}, + {'lladdr': fake_int_port.mac_address}, + {'lladdr': fake_add_port.mac_address} + ], + } + ) + self.set_instances_container_mocks( + instances=[instance_1], + mocks=[('get_interfaces', get_interfaces)]) fake_instance = mock.MagicMock() self.ctx.nova_client.get_instance_by_id = mock.Mock( return_value=fake_instance) - with mock.patch.object(self.instance_mgr, - '_verify_interfaces') as verify: - verify.return_value = False # the unplug failed! - self.instance_mgr.replug(self.ctx) - self.assertEqual(self.instance_mgr.state, - states.RESTART) - fake_instance.interface_detach.assert_called_once_with( - fake_ext_port.id - ) + wait_for_hotplug.return_value = False + self.instance_mgr.replug(self.ctx) + self.assertEqual(self.instance_mgr.state, + states.RESTART) + fake_instance.interface_detach.assert_called_once_with( + fake_add_port.id + ) - def test_verify_interfaces(self): - self.fake_driver.ports = [fake_mgt_port, fake_ext_port, fake_int_port] - interfaces = [ - {'lladdr': fake_mgt_port.mac_address}, - {'lladdr': fake_ext_port.mac_address}, - {'lladdr': fake_int_port.mac_address} + def test_wait_for_interface_hotplug_true(self): + instance = instance_info() + self.fake_driver.get_interfaces.side_effect = [ + [ + {'lladdr': fake_mgt_port.mac_address}, + {'lladdr': fake_ext_port.mac_address}, + ], + [ + {'lladdr': fake_mgt_port.mac_address}, + {'lladdr': fake_ext_port.mac_address}, + ], + [ + {'lladdr': fake_mgt_port.mac_address}, + {'lladdr': fake_ext_port.mac_address}, + {'lladdr': fake_int_port.mac_address}, + ], ] + self.assertEqual( + self.instance_mgr._wait_for_interface_hotplug(instance), True) + self.assertEqual( + len(self.fake_driver.get_interfaces.call_args_list), 3) - self.assertTrue(self.instance_mgr._verify_interfaces( - self.fake_driver.ports, interfaces)) - - def test_verify_interfaces_with_cleared_gateway(self): - self.fake_driver.ports = [fake_mgt_port, fake_ext_port, fake_int_port] - - interfaces = [ - {'lladdr': 'a:b:c:d'}, - {'lladdr': 'd:c:b:a'}, - {'lladdr': 'a:a:a:a'} - ] - - self.assertFalse(self.instance_mgr._verify_interfaces( - self.fake_driver.ports, interfaces)) + def test_wait_for_interface_hotplug_false(self): + self.conf.hotplug_timeout = 5 + instance = instance_info() + self.fake_driver.get_interfaces.side_effect = [ + [ + {'lladdr': fake_mgt_port.mac_address}, + {'lladdr': fake_ext_port.mac_address}, + ] + for i in six.moves.range(5)] + self.assertEqual( + self.instance_mgr._wait_for_interface_hotplug(instance), False) + self.assertEqual( + len(self.fake_driver.get_interfaces.call_args_list), 4) def test_set_error_when_booting(self): self.instance_mgr.state = states.BOOTING @@ -672,14 +729,7 @@ class TestInstanceManager(base.RugTestBase): self.instance_mgr.set_error(self.ctx) self.instance_mgr.boot(self.ctx) self.assertEqual(self.instance_mgr.state, states.BOOTING) - - self.ctx.nova_client.boot_instance.assert_called_once_with( - resource_type=self.fake_driver.RESOURCE_NAME, - prev_instance_info=self.INSTANCE_INFO, - name=self.fake_driver.name, - image_uuid=self.fake_driver.image_uuid, - flavor=self.fake_driver.flavor, - make_ports_callback='fake_ports_callback') + self.instance_mgr.instances.create.assert_called_with(self.ctx) def test_error_cooldown(self): self.config(error_state_cooldown=30) @@ -694,20 +744,35 @@ class TestInstanceManager(base.RugTestBase): self.assertFalse(self.instance_mgr.error_cooldown) def test_ensure_cache(self): - self.instance_mgr.instance_info = 
'stale_info'
-        self.ctx.nova_client.get_instance_info.return_value = \
-            self.INSTANCE_INFO
+        self.set_instances_container_mocks(mocks=[
+            ('update_ports', mock.Mock())
+        ])
+        self.instance_mgr.instances['fake_instance_id1'] = 'stale_instance1'
+        self.instance_mgr.instances['fake_instance_id2'] = 'stale_instance2'
+
+        fake_inst_1 = mock.Mock(id_='fake_instance_id1')
+        fake_inst_2 = mock.Mock(id_='fake_instance_id2')
+
+        self.ctx.nova_client.get_instances_for_obj.return_value = [
+            fake_inst_1, fake_inst_2]

        def ensured_cache(self, ctx):
            pass
+
        wrapped = instance_manager.ensure_cache(ensured_cache)
        wrapped(self.instance_mgr, self.ctx)
-        self.assertEqual(self.instance_mgr.instance_info, self.INSTANCE_INFO)
+        exp_updated_instances = {
+            'fake_instance_id1': fake_inst_1,
+            'fake_instance_id2': fake_inst_2,
+        }
+        self.assertEqual(
+            self.instance_mgr.instances, exp_updated_instances)
+        self.instance_mgr.instances.update_ports.assert_called_with(self.ctx)


-class TestBootAttemptCounter(unittest.TestCase):
-
+class TestBootAttemptCounter(base.RugTestBase):
    def setUp(self):
+        super(TestBootAttemptCounter, self).setUp()
        self.c = instance_manager.BootAttemptCounter()

    def test_start(self):
@@ -720,3 +785,379 @@ class TestBootAttemptCounter(unittest.TestCase):
        self.c._attempts = 2
        self.c.reset()
        self.assertEqual(0, self.c._attempts)
+
+
+class TestInstanceGroupManager(base.RugTestBase):
+    def setUp(self):
+        super(TestInstanceGroupManager, self).setUp()
+        self.ctx = fakes.fake_worker_context()
+        self.fake_driver = fakes.fake_driver()
+        self.group_mgr = instance_manager.InstanceGroupManager(
+            log=mock.Mock(), resource=self.fake_driver)
+        name = 'ak-resource-' + str(uuid.uuid4())
+        self.instance_1 = instance_info(mgt_port=fake_mgt_port,
+                                        name=name + '_0')
+        self.instance_2 = instance_info(mgt_port=fake_add_port,
+                                        name=name + '_1')
+        self.instances = [self.instance_1, self.instance_2]
+        [self.group_mgr.add_instance(i) for i in self.instances]
+
+    def test_validate_ports(self):
+        self.instance_2.management_port = None
+        has_ports, no_ports = self.group_mgr.validate_ports()
+        self.assertIn(self.instance_1, has_ports)
+        self.assertIn(self.instance_2, no_ports)
+
+    def test_are_alive_all_alive(self):
+        self.fake_driver.is_alive.side_effect = [
+            False, False, True, False, True]
+        alive, dead = self.group_mgr.are_alive()
+        self.assertEqual(sorted(alive), sorted(self.instances))
+
+    def test_are_alive_all_dead(self):
+        self.fake_driver.is_alive.return_value = False
+        alive, dead = self.group_mgr.are_alive()
+        self.assertEqual(sorted(dead), sorted(self.instances))
+        self.assertEqual(alive, [])
+
+    def test_are_alive_some_dead(self):
+        self.group_mgr = instance_manager.InstanceGroupManager(
+            log=mock.Mock(), resource=self.fake_driver)
+        self.instance_1 = instance_info(mgt_port=fake_mgt_port)
+        self.instance_2 = instance_info(mgt_port=fake_add_port)
+        instances = [self.instance_1, self.instance_2]
+        [self.group_mgr.add_instance(i) for i in instances]
+
+        def fake_is_alive(mgt_addr, i1=self.instance_1, i2=self.instance_2):
+            # tag instance 2 as dead
+            if mgt_addr == fake_add_port.fixed_ips[0].ip_address:
+                return False
+            else:
+                return True
+        self.fake_driver.is_alive = fake_is_alive
+        alive, dead = self.group_mgr.are_alive()
+        self.assertEqual(dead, [self.instance_2])
+        self.assertEqual(alive, [self.instance_1])
+
+    def test_update_ports(self):
+        self.ctx.neutron.get_ports_for_instance.side_effect = [
+            ('instance1_mgt_port', ['instance1_inst_port']),
+
('instance2_mgt_port', ['instance2_inst_port']), + ] + self.group_mgr.update_ports(self.ctx) + self.assertEqual(self.instance_1.management_port, 'instance1_mgt_port') + self.assertEqual(self.instance_1.ports, ['instance1_inst_port']) + self.assertEqual(self.instance_2.management_port, 'instance2_mgt_port') + self.assertEqual(self.instance_2.ports, ['instance2_inst_port']) + + def test_get_interfaces(self): + self.fake_driver.get_interfaces.side_effect = [ + ['instance1_interfaces'], + ['instance2_interfaces'], + ] + self.group_mgr._alive = [i.id_ for i in self.instances] + interfaces_dict = self.group_mgr.get_interfaces() + self.assertIn( + (self.instance_1, ['instance1_interfaces']), + interfaces_dict.items()) + self.assertIn( + (self.instance_2, ['instance2_interfaces']), + interfaces_dict.items()) + + def test_get_interfaces_skip_dead(self): + self.fake_driver.get_interfaces.side_effect = [ + ['instance1_interfaces'], + ['instance2_interfaces'], + ] + self.group_mgr._alive = [self.instance_1.id_] + interfaces_dict = self.group_mgr.get_interfaces() + self.assertIn( + (self.instance_1, ['instance1_interfaces']), + interfaces_dict.items()) + self.assertNotIn( + (self.instance_2, ['instance2_interfaces']), + interfaces_dict.items()) + + @mock.patch('astara.instance_manager.InstanceGroupManager.get_interfaces') + def test_verify_interfaces_true(self, fake_get_interfaces): + fake_get_interfaces.return_value = { + self.instance_1: [ + {'lladdr': p.mac_address} + for p in self.instance_1.ports + + [self.instance_1.management_port] + ], + self.instance_2: [ + {'lladdr': p.mac_address} + for p in self.instance_2.ports + + [self.instance_2.management_port] + ] + } + + ports = [fake_ext_port, fake_int_port] + self.assertTrue(self.group_mgr.verify_interfaces(ports)) + + @mock.patch('astara.instance_manager.InstanceGroupManager.get_interfaces') + def test_verify_interfaces_false_missing_inst_port(self, + fake_get_interfaces): + fake_get_interfaces.return_value = { + self.instance_1: [ + {'lladdr': p.mac_address} + for p in self.instance_1.ports + + [self.instance_1.management_port] + ], + self.instance_2: [ + {'lladdr': p.mac_address} + for p in self.instance_2.ports + + [self.instance_2.management_port] + ] + } + + ports = [fake_ext_port, fake_int_port, fake_add_port] + self.assertFalse(self.group_mgr.verify_interfaces(ports)) + + @mock.patch('astara.instance_manager.InstanceGroupManager.get_interfaces') + def test_verify_interfaces_false_missing_macs(self, fake_get_interfaces): + fake_get_interfaces.return_value = { + self.instance_1: [ + {'lladdr': p.mac_address} + for p in self.instance_1.ports + ], + self.instance_2: [ + {'lladdr': p.mac_address} + for p in self.instance_2.ports] + } + + ports = [fake_ext_port, fake_int_port] + self.assertFalse(self.group_mgr.verify_interfaces(ports)) + + def test__update_config_success(self): + self.fake_driver.update_config.side_effect = [ + Exception, Exception, True] + self.assertTrue(self.group_mgr._update_config(self.instance_1, {})) + self.fake_driver.update_config.assert_called_with( + self.instance_1.management_address, {}) + + def test__update_config_fail(self): + self.fake_driver.update_config.side_effect = Exception + self.assertFalse(self.group_mgr._update_config(self.instance_1, {})) + self.fake_driver.update_config.assert_called_with( + self.instance_1.management_address, {}) + + def test__ha_config(self): + instance_1_ha_config = self.group_mgr._ha_config(self.instance_1) + instance_2_ha_config = self.group_mgr._ha_config(self.instance_2) + 
self.assertEqual( + instance_1_ha_config, + { + 'priority': 100, + 'peers': [self.instance_2.management_address], + }) + self.assertEqual( + instance_2_ha_config, + { + 'priority': 50, + 'peers': [self.instance_1.management_address], + }) + + @mock.patch('astara.instance_manager.InstanceGroupManager._update_config') + @mock.patch('astara.instance_manager.InstanceGroupManager._ha_config') + @mock.patch('astara.instance_manager._generate_interface_map') + @mock.patch('astara.instance_manager.InstanceGroupManager.get_interfaces') + def test_configure_success(self, fake_get_interfaces, fake_gen_iface_map, + fake_ha_config, fake_update_config): + fake_ha_config.return_value = {'fake_ha_config': 'peers'} + self.fake_driver.is_ha = True + self.fake_driver.build_config.side_effect = [ + {'instance_1_config': 'config'}, + {'instance_2_config': 'config'}, + ] + fake_get_interfaces.return_value = collections.OrderedDict([ + (self.instance_1, [ + {'lladdr': p.mac_address} for p in self.instance_1.ports + + [self.instance_1.management_port]]), + (self.instance_2, [ + {'lladdr': p.mac_address} for p in self.instance_2.ports + + [self.instance_2.management_port]]) + ]) + + fake_update_config.return_value = True + self.assertEqual(self.group_mgr.configure(self.ctx), states.CONFIGURED) + self.assertIn( + mock.call( + self.instance_1, + { + 'instance_1_config': 'config', + 'ha_config': {'fake_ha_config': 'peers'} + }), + fake_update_config.call_args_list) + self.assertIn( + mock.call( + self.instance_2, + { + 'instance_2_config': 'config', + 'ha_config': {'fake_ha_config': 'peers'} + }), + fake_update_config.call_args_list) + + @mock.patch('astara.instance_manager.InstanceGroupManager._update_config') + @mock.patch('astara.instance_manager.InstanceGroupManager._ha_config') + @mock.patch('astara.instance_manager._generate_interface_map') + @mock.patch('astara.instance_manager.InstanceGroupManager.get_interfaces') + def test_configure_failed_all(self, fake_get_interfaces, + fake_gen_iface_map, fake_ha_config, + fake_update_config): + fake_ha_config.return_value = {'fake_ha_config': 'peers'} + self.fake_driver.is_ha = True + self.fake_driver.build_config.side_effect = [ + {'instance_1_config': 'config'}, + {'instance_2_config': 'config'}, + ] + fake_get_interfaces.return_value = collections.OrderedDict([ + (self.instance_1, [ + {'lladdr': p.mac_address} for p in self.instance_1.ports + + [self.instance_1.management_port]]), + (self.instance_2, [ + {'lladdr': p.mac_address} for p in self.instance_2.ports + + [self.instance_2.management_port]]) + ]) + + fake_update_config.return_value = False + self.assertEqual(self.group_mgr.configure(self.ctx), states.RESTART) + + @mock.patch('astara.instance_manager.InstanceGroupManager._update_config') + @mock.patch('astara.instance_manager.InstanceGroupManager._ha_config') + @mock.patch('astara.instance_manager._generate_interface_map') + @mock.patch('astara.instance_manager.InstanceGroupManager.get_interfaces') + def test_configure_failed_some(self, fake_get_interfaces, + fake_gen_iface_map, fake_ha_config, + fake_update_config): + fake_ha_config.return_value = {'fake_ha_config': 'peers'} + self.fake_driver.is_ha = True + self.fake_driver.build_config.side_effect = [ + {'instance_1_config': 'config'}, + {'instance_2_config': 'config'}, + ] + fake_get_interfaces.return_value = collections.OrderedDict([ + (self.instance_1, [ + {'lladdr': p.mac_address} for p in self.instance_1.ports + + [self.instance_1.management_port]]), + (self.instance_2, [ + {'lladdr': p.mac_address} 
for p in self.instance_2.ports + + [self.instance_2.management_port]])]) + + fake_update_config.side_effect = [False, True] + self.assertEqual(self.group_mgr.configure(self.ctx), states.DEGRADED) + + @mock.patch('astara.instance_manager.InstanceGroupManager._update_config') + @mock.patch('astara.instance_manager.InstanceGroupManager._ha_config') + @mock.patch('astara.instance_manager._generate_interface_map') + @mock.patch('astara.instance_manager.InstanceGroupManager.get_interfaces') + def test_configure_degraded_waiting(self, fake_get_interfaces, + fake_gen_iface_map, fake_ha_config, + fake_update_config): + fake_ha_config.return_value = {'fake_ha_config': 'peers'} + self.fake_driver.is_ha = True + self.fake_driver.build_config.side_effect = [ + {'instance_1_config': 'config'}, + {'instance_2_config': 'config'}, + ] + fake_get_interfaces.return_value = collections.OrderedDict([ + (self.instance_1, [ + {'lladdr': p.mac_address} for p in self.instance_1.ports + + [self.instance_1.management_port]]) + ]) + + fake_update_config.return_value = True + self.assertEqual(self.group_mgr.configure(self.ctx), states.DEGRADED) + + def test_delete(self): + self.group_mgr.delete(self.instance_2) + self.assertNotIn( + self.instance_2, self.group_mgr.instances) + + def test_refresh(self): + self.ctx.nova_client.update_instance_info.return_value = True + self.group_mgr.refresh(self.ctx) + [self.assertIn(mock.call(i), + self.ctx.nova_client.update_instance_info.call_args_list) + for i in self.instances] + [self.assertIn(i, self.group_mgr.instances) for i in self.instances] + + def test_refresh_instance_gone(self): + self.ctx.nova_client.update_instance_info.side_effect = [True, None] + self.group_mgr.refresh(self.ctx) + [self.assertIn(mock.call(i), + self.ctx.nova_client.update_instance_info.call_args_list) + for i in self.instances] + self.assertIn(self.instance_1, self.group_mgr.instances) + self.assertNotIn(self.instance_2, self.group_mgr.instances) + + def test_destroy(self): + self.group_mgr.destroy(self.ctx) + self.ctx.nova_client.delete_instances_and_wait.assert_called_with( + self.group_mgr.instances) + + def test_remove(self): + self.group_mgr.remove(self.ctx, self.instance_1) + self.ctx.nova_client.destroy_instance.assert_called_with( + self.instance_1) + self.assertNotIn(self.instance_1, self.group_mgr.instances) + + def test_next_instance_index(self): + self.assertEqual( + self.group_mgr.next_instance_index, 2) + + def test_next_instance_index_empty(self): + group_mgr = instance_manager.InstanceGroupManager( + log=mock.Mock(), resource=self.fake_driver) + self.assertEqual( + group_mgr.next_instance_index, 0) + + def test_create_all(self): + [self.group_mgr.delete(i) for i in self.instances] + self.ctx.nova_client.boot_instance.side_effect = [ + instance_info(name='new-instance_0'), + instance_info(name='new-instance_1'), + ] + self.group_mgr.create(self.ctx) + self.assertEqual( + len(self.ctx.nova_client.boot_instance.call_args_list), 2) + + def test_create_some(self): + self.group_mgr.delete(self.instance_1) + self.ctx.nova_client.boot_instance.side_effect = [ + instance_info(name='new-instance_0'), + ] + self.group_mgr.create(self.ctx) + self.assertEqual( + len(self.ctx.nova_client.boot_instance.call_args_list), 1) + self.ctx.nova_client.boot_instance.assert_called_with( + resource_type=self.fake_driver.RESOURCE_NAME, + prev_instance_info=None, + name='ak-FakeDriver-fake_resource_id_2', + image_uuid=self.fake_driver.image_uuid, + flavor=self.fake_driver.flavor, + 
make_ports_callback=self.fake_driver.make_ports(self.ctx),
+        )
+
+    def test_required_instance_count(self):
+        self.fake_driver.is_ha = True
+        self.assertEqual(self.group_mgr.required_instance_count, 2)
+        self.fake_driver.is_ha = False
+        self.assertEqual(self.group_mgr.required_instance_count, 1)
+
+    def test_instance_count(self):
+        self.assertEqual(self.group_mgr.instance_count, 2)
+
+    def test_cluster_degraded_false(self):
+        self.assertFalse(self.group_mgr.cluster_degraded)
+
+    def test_cluster_degraded_true(self):
+        self.group_mgr.delete(self.instance_1)
+        self.assertTrue(self.group_mgr.cluster_degraded)
+
+    def test_add_instance(self):
+        instance_3 = instance_info()
+        self.group_mgr.add_instance(instance_3)
+        self.assertIn(instance_3, self.group_mgr.instances)
diff --git a/doc/source/install.rst b/doc/source/install.rst
index 82efd204..0711380c 100644
--- a/doc/source/install.rst
+++ b/doc/source/install.rst
@@ -1,3 +1,5 @@
+.. _install_astara:
+
 Astara Installation
 ===================
 
@@ -244,7 +246,7 @@ All configuration is to be performed on the controller node.
 If you don't plan to build your own appliance image, one can be downloaded
 for testing at: http://tarballs.openstack.org/akanda-appliance/images/
-If you want to build one yourself instructions are found in the :ref:`appliance documation`
+If you want to build one yourself, instructions are found in the :ref:`appliance documentation`
 In either case, upload the image to Glance (this command must be performed
 in the directory where the image was downloaded/created)::
 
     openstack image create astara --public --container-format=bare --disk-format=qcow2 --file astara.qcow2
@@ -321,3 +323,23 @@ Output similar to::
 +--------------------------------------+------------------------------------------------+----------------------------------+--------+------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 The line with the ak-router shows that astara has built the router VM. Further
 operation and debug information can be found in the :ref:`operator tools` section.
+
+.. _cluster_astara:
+
+Clustering astara-orchestrator
+------------------------------
+
+The ``astara-orchestrator`` service supports clustering among multiple processes spanning multiple nodes to provide active/active clustering for
+purposes of load distribution and high availability (HA). In this setup, multiple ``astara-orchestrator`` processes form a distributed hash ring,
+in which each is responsible for orchestrating a subset of virtual appliances. When one ``astara-orchestrator`` falls offline, management of
+its resources is redistributed to the remaining nodes. This feature requires the use of an external coordination service (i.e., ZooKeeper),
+as provided by the ``tooz`` library. To find out more about which services ``tooz`` supports,
+see the ``tooz`` documentation.
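+
+The hash ring is easiest to picture with a toy sketch. The snippet below is
+illustrative only and is not Astara's implementation (the real ring is driven
+by the coordination service and also handles node joins, leaves and
+failures); it simply shows how resource ids can map deterministically onto
+orchestrator nodes::
+
+    import bisect
+    import hashlib
+
+    def _hash(key):
+        return int(hashlib.md5(key.encode('utf-8')).hexdigest(), 16)
+
+    class ToyHashRing(object):
+        def __init__(self, nodes):
+            # each node sits on the ring at the hash of its name
+            self.ring = sorted((_hash(n), n) for n in nodes)
+
+        def node_for(self, resource_id):
+            # walk clockwise to the first node at or past the resource hash
+            hashes = [h for h, _ in self.ring]
+            idx = bisect.bisect(hashes, _hash(resource_id)) % len(self.ring)
+            return self.ring[idx][1]
+
+    ring = ToyHashRing(['orch-a', 'orch-b', 'orch-c'])
+    ring.node_for('some-router-id')  # every process computes the same owner
+
+Because only the failed node's arc of the ring is reassigned, a node going
+offline disturbs only the appliances it was managing.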
+
+To enable this feature, you must set the following in ``orchestrator.ini``::
+
+    [coordination]
+    enabled=True # enable the feature
+    url=kazoo://zookeeper.localnet:2181?timeout=5 # a URL to a tooz-supported coordination service
+    group_id=astara.orchestrator # optional, change this if deploying multiple clusters
+    heartbeat_interval=1 # optional, tune as needed
diff --git a/doc/source/orchestrator.rst b/doc/source/orchestrator.rst
index 7c9220b5..6175c561 100644
--- a/doc/source/orchestrator.rst
+++ b/doc/source/orchestrator.rst
@@ -146,3 +146,23 @@ event transitions the state machine into the ``Alive`` state, which
 (depending on the availability of the router), may simply exit the state
 machine (because the router's status API replies with an ``HTTP 200``) or
 transition to the ``CreateVM`` state (because the router is unresponsive and
 must be recreated).
+
+High Availability
+-----------------
+
+Astara supports high availability (HA) on both the control plane and the data
+plane.
+
+The ``astara-orchestrator`` service may be deployed in a configuration that
+allows multiple service processes to span nodes, providing load distribution
+and HA. For more information on clustering, see the
+:ref:`install docs <install_astara>`.
+
+It also supports orchestrating pairs of virtual appliances to provide
+HA of the data path, allowing pairs of virtual routers to be clustered among
+themselves using VRRP and connection tracking. To enable this, simply
+create Neutron routers with the ``ha=True`` parameter, or set this property
+on existing routers and issue a rebuild command via ``astara-ctl`` for that
+router.
diff --git a/releasenotes/notes/ha_appliances-c2048033c2be6d51.yaml b/releasenotes/notes/ha_appliances-c2048033c2be6d51.yaml
new file mode 100644
index 00000000..7d26de87
--- /dev/null
+++ b/releasenotes/notes/ha_appliances-c2048033c2be6d51.yaml
@@ -0,0 +1,4 @@
+---
+features:
+  - Astara now supports orchestrating clustered pairs of appliance VMs for
+    Neutron routers that have been set to be highly available.
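
As a closing illustration of the HA feature this patch documents: data-path
HA is driven entirely by the ``ha`` attribute on the Neutron router. The
sketch below is hypothetical and not part of the patch; ``update_router`` is
the standard ``python-neutronclient`` call, while the client construction and
router id are placeholders to be filled in per deployment::

    from neutronclient.v2_0 import client

    def make_router_ha(neutron, router_id):
        # Flip the 'ha' attribute that the orchestrator reads from the
        # router; the clustered appliance pair is created on next rebuild.
        neutron.update_router(router_id, {'router': {'ha': True}})

    # neutron = client.Client(session=...)  # auth is deployment-specific
    # make_router_ha(neutron, 'ROUTER_UUID')

The rebuild itself is then issued via ``astara-ctl``, as the orchestrator
documentation above describes.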