diff --git a/lower-constraints.txt b/lower-constraints.txt index 12b574419..6704cf62b 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -152,6 +152,7 @@ tenacity==4.9.0 testresources==2.0.1 testscenarios==0.5.0 testtools==2.2.0 +tooz==1.58.0 tosca-parser==1.6.0 traceback2==1.4.0 unittest2==1.1.0 diff --git a/requirements.txt b/requirements.txt index 58d510612..1d33b769c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -50,6 +50,7 @@ python-barbicanclient>=4.5.2 # Apache-2.0 castellan>=0.16.0 # Apache-2.0 kubernetes>=5.0.0 # Apache-2.0 setuptools!=24.0.0,!=34.0.0,!=34.0.1,!=34.0.2,!=34.0.3,!=34.1.0,!=34.1.1,!=34.2.0,!=34.3.0,!=34.3.1,!=34.3.2,!=36.2.0,>=21.0.0 # PSF/ZPL +tooz>=1.58.0 # Apache-2.0 PyYAML>=3.12 # MIT # Glance Store diff --git a/tacker/common/coordination.py b/tacker/common/coordination.py new file mode 100644 index 000000000..4e8f1853b --- /dev/null +++ b/tacker/common/coordination.py @@ -0,0 +1,156 @@ +# Copyright 2015 Intel +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Coordination and locking utilities.""" + +import inspect +import uuid + +import decorator +from oslo_config import cfg +from oslo_log import log +from oslo_utils import timeutils +from tooz import coordination + +from tacker.common import exceptions + +LOG = log.getLogger(__name__) + +CONF = cfg.CONF + + +class Coordinator(object): + """Tooz coordination wrapper. + + Coordination member id is created from concatenated + `prefix` and `agent_id` parameters. + + :param str agent_id: Agent identifier + :param str prefix: Used to provide member identifier with a + meaningful prefix. + """ + + def __init__(self, agent_id=None, prefix=''): + self.coordinator = None + self.agent_id = agent_id or str(uuid.uuid4()) + self.started = False + self.prefix = prefix + + def start(self): + if self.started: + return + + # NOTE(bluex): Tooz expects member_id as a byte string. + member_id = (self.prefix + self.agent_id).encode('ascii') + self.coordinator = coordination.get_coordinator( + cfg.CONF.coordination.backend_url, member_id) + self.coordinator.start(start_heart=True) + self.started = True + + def stop(self): + """Disconnect from coordination backend and stop heartbeat.""" + if self.started: + self.coordinator.stop() + self.coordinator = None + self.started = False + + def get_lock(self, name): + """Return a Tooz backend lock. + + :param str name: The lock name that is used to identify it + across all nodes. + """ + # NOTE(bluex): Tooz expects lock name as a byte string. + lock_name = (self.prefix + name).encode('ascii') + if self.coordinator is not None: + return self.coordinator.get_lock(lock_name) + else: + raise exceptions.LockCreationFailed('Coordinator uninitialized.') + + +COORDINATOR = Coordinator(prefix='tacker-') + + +def synchronized(lock_name, blocking=True, coordinator=COORDINATOR): + """Synchronization decorator. + + :param str lock_name: Lock name. + :param blocking: If True, blocks until the lock is acquired. + If False, raises exception when not acquired. Otherwise, + the value is used as a timeout value and if lock is not acquired + after this number of seconds exception is raised. + :param coordinator: Coordinator class to use when creating lock. + Defaults to the global coordinator. + :raises tooz.coordination.LockAcquireFailed: if lock is not acquired + + Decorating a method like so:: + + @synchronized('mylock') + def foo(self, *args): + ... + + ensures that only one process will execute the foo method at a time. + + Different methods can share the same lock:: + + @synchronized('mylock') + def foo(self, *args): + ... + + @synchronized('mylock') + def bar(self, *args): + ... + + This way only one of either foo or bar can be executing at a time. + + Lock name can be formatted using Python format string syntax:: + + @synchronized('{f_name}-{vnf.id}') + def foo(self, vnf): + ... + + Available field names are: decorated function parameters and + `f_name` as a decorated function name. + """ + + @decorator.decorator + def _synchronized(f, *a, **k): + call_args = inspect.getcallargs(f, *a, **k) + call_args['f_name'] = f.__name__ + lock = coordinator.get_lock(lock_name.format(**call_args)) + t1 = timeutils.now() + t2 = None + try: + with lock(blocking): + t2 = timeutils.now() + LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: ' + 'waited %(wait_secs)0.3fs', + {'name': lock.name, + 'function': f.__name__, + 'wait_secs': (t2 - t1)}) + return f(*a, **k) + finally: + t3 = timeutils.now() + if t2 is None: + held_secs = "N/A" + else: + held_secs = "%0.3fs" % (t3 - t2) + LOG.debug('Lock "%(name)s" released by "%(function)s" :: held ' + '%(held_secs)s', + {'name': lock.name, + 'function': f.__name__, + 'held_secs': held_secs}) + + return _synchronized diff --git a/tacker/common/exceptions.py b/tacker/common/exceptions.py index 1e2c0cdf0..60d6f750d 100644 --- a/tacker/common/exceptions.py +++ b/tacker/common/exceptions.py @@ -270,6 +270,10 @@ class VnfHealFailed(TackerException): message = _("Heal Vnf failed for vnf %(id)s, error: %(error)s") +class LockCreationFailed(TackerException): + message = _('Unable to create lock. Coordination backend not started.') + + class OrphanedObjectError(TackerException): msg_fmt = _('Cannot call %(method)s on orphaned %(objtype)s object') diff --git a/tacker/conductor/conductor_server.py b/tacker/conductor/conductor_server.py index a322a19cb..bcb3d6a99 100644 --- a/tacker/conductor/conductor_server.py +++ b/tacker/conductor/conductor_server.py @@ -33,6 +33,7 @@ from oslo_utils import timeutils from sqlalchemy.orm import exc as orm_exc import yaml +from tacker.common import coordination from tacker.common import csar_utils from tacker.common import exceptions from tacker.common import safe_utils @@ -144,6 +145,12 @@ class Conductor(manager.Manager): self.vnfm_plugin = plugin.VNFMPlugin() self.vnflcm_driver = vnflcm_driver.VnfLcmDriver() + def start(self): + coordination.COORDINATOR.start() + + def stop(self): + coordination.COORDINATOR.stop() + def init_host(self): glance_store.initialize_glance_store() self._basic_config_check() @@ -395,7 +402,18 @@ class Conductor(manager.Manager): {'zip': csar_path, 'folder': csar_zip_temp_path, 'uuid': vnf_pack.id}) + @coordination.synchronized('{vnf_instance[id]}') def instantiate(self, context, vnf_instance, instantiate_vnf): + # Check if vnf is already instantiated. + vnf_instance = objects.VnfInstance.get_by_id(context, + vnf_instance.id) + if vnf_instance.instantiation_state == \ + fields.VnfInstanceState.INSTANTIATED: + LOG.error("Vnf instance %(id)s is already in %(state)s state.", + {"id": vnf_instance.id, + "state": vnf_instance.instantiation_state}) + return + self.vnflcm_driver.instantiate_vnf(context, vnf_instance, instantiate_vnf) @@ -409,6 +427,7 @@ class Conductor(manager.Manager): LOG.error("Failed to update usage_state of vnf package %s", vnf_package.id) + @coordination.synchronized('{vnf_package[id]}') def _update_package_usage_state(self, context, vnf_package): """Update vnf package usage state to IN_USE/NOT_IN_USE @@ -424,7 +443,19 @@ class Conductor(manager.Manager): vnf_package.save() + @coordination.synchronized('{vnf_instance[id]}') def terminate(self, context, vnf_instance, terminate_vnf_req): + # Check if vnf is in instantiated state. + vnf_instance = objects.VnfInstance.get_by_id(context, + vnf_instance.id) + if vnf_instance.instantiation_state == \ + fields.VnfInstanceState.NOT_INSTANTIATED: + LOG.error("Terminate action cannot be performed on vnf %(id)s " + "which is in %(state)s state.", + {"id": vnf_instance.id, + "state": vnf_instance.instantiation_state}) + return + self.vnflcm_driver.terminate_vnf(context, vnf_instance, terminate_vnf_req) @@ -438,7 +469,19 @@ class Conductor(manager.Manager): LOG.error("Failed to update usage_state of vnf package %s", vnf_package.id) + @coordination.synchronized('{vnf_instance[id]}') def heal(self, context, vnf_instance, heal_vnf_request): + # Check if vnf is in instantiated state. + vnf_instance = objects.VnfInstance.get_by_id(context, + vnf_instance.id) + if vnf_instance.instantiation_state == \ + fields.VnfInstanceState.NOT_INSTANTIATED: + LOG.error("Heal action cannot be performed on vnf %(id)s " + "which is in %(state)s state.", + {"id": vnf_instance.id, + "state": vnf_instance.instantiation_state}) + return + self.vnflcm_driver.heal_vnf(context, vnf_instance, heal_vnf_request) diff --git a/tacker/conf/__init__.py b/tacker/conf/__init__.py index 0180fba79..b6b702993 100644 --- a/tacker/conf/__init__.py +++ b/tacker/conf/__init__.py @@ -17,6 +17,7 @@ import glance_store from oslo_config import cfg from tacker.conf import conductor +from tacker.conf import coordination from tacker.conf import vnf_package CONF = cfg.CONF @@ -24,4 +25,5 @@ CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token') vnf_package.register_opts(CONF) conductor.register_opts(CONF) +coordination.register_opts(CONF) glance_store.register_opts(CONF) diff --git a/tacker/conf/coordination.py b/tacker/conf/coordination.py new file mode 100644 index 000000000..e110e365e --- /dev/null +++ b/tacker/conf/coordination.py @@ -0,0 +1,33 @@ +# Copyright (C) 2020 NTT DATA +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg + + +CONF = cfg.CONF + +coordination_opts = [ + cfg.StrOpt('backend_url', + default='file://$state_path', + help='The backend URL to use for distributed coordination.'), +] + + +def register_opts(conf): + conf.register_opts(coordination_opts, group='coordination') + + +def list_opts(): + return {'coordination': coordination_opts} diff --git a/tacker/manager.py b/tacker/manager.py index 387fe1997..ee301bd0c 100644 --- a/tacker/manager.py +++ b/tacker/manager.py @@ -40,6 +40,14 @@ class Manager(periodic_task.PeriodicTasks): def periodic_tasks(self, context, raise_on_error=False): self.run_periodic_tasks(context, raise_on_error=raise_on_error) + def start(self): + """Start service.""" + pass + + def stop(self): + """Stop service.""" + pass + def init_host(self): """Handle initialization if this is a standalone service. diff --git a/tacker/service.py b/tacker/service.py index 453c66aa5..23632606c 100644 --- a/tacker/service.py +++ b/tacker/service.py @@ -152,6 +152,7 @@ class Service(n_rpc.Service): super(Service, self).__init__(host, topic, manager=self.manager) def start(self): + self.manager.start() self.manager.init_host() super(Service, self).start() if self.report_interval: @@ -219,6 +220,7 @@ class Service(n_rpc.Service): self.stop() def stop(self): + self.manager.stop() super(Service, self).stop() for x in self.timers: try: diff --git a/tacker/tests/unit/conductor/test_conductor_server.py b/tacker/tests/unit/conductor/test_conductor_server.py index 8a36ec67c..3486e619c 100644 --- a/tacker/tests/unit/conductor/test_conductor_server.py +++ b/tacker/tests/unit/conductor/test_conductor_server.py @@ -22,6 +22,7 @@ from glance_store import exceptions as store_exceptions import mock import yaml +from tacker.common import coordination from tacker.common import csar_utils from tacker.common import exceptions from tacker.conductor import conductor_server @@ -197,8 +198,10 @@ class TestConductor(SqlTestCase): return vnf_pack_vnfd_obj + @mock.patch.object(coordination.Coordinator, 'get_lock') @mock.patch.object(objects.VnfPackage, 'is_package_in_use') - def test_instantiate_vnf_instance(self, mock_package_in_use): + def test_instantiate_vnf_instance(self, mock_package_in_use, + mock_get_lock): vnf_package_vnfd = self._create_and_upload_vnf_package() vnf_instance_data = fake_obj.get_vnf_instance_data( vnf_package_vnfd.vnfd_id) @@ -210,12 +213,36 @@ class TestConductor(SqlTestCase): self.conductor.instantiate(self.context, vnf_instance, instantiate_vnf_req) self.vnflcm_driver.instantiate_vnf.assert_called_once_with( - self.context, vnf_instance, instantiate_vnf_req) + self.context, mock.ANY, instantiate_vnf_req) mock_package_in_use.assert_called_once() + @mock.patch.object(coordination.Coordinator, 'get_lock') + @mock.patch.object(objects.VnfPackage, 'is_package_in_use') + @mock.patch('tacker.conductor.conductor_server.LOG') + def test_instantiate_vnf_instance_already_instantiated(self, + mock_log, mock_package_in_use, mock_get_lock): + vnf_package_vnfd = self._create_and_upload_vnf_package() + vnf_instance_data = fake_obj.get_vnf_instance_data( + vnf_package_vnfd.vnfd_id) + vnf_instance_data['instantiation_state'] =\ + fields.VnfInstanceState.INSTANTIATED + vnf_instance = objects.VnfInstance(context=self.context, + **vnf_instance_data) + vnf_instance.create() + instantiate_vnf_req = vnflcm_fakes.get_instantiate_vnf_request_obj() + self.conductor.instantiate(self.context, vnf_instance, + instantiate_vnf_req) + self.vnflcm_driver.instantiate_vnf.assert_not_called() + mock_package_in_use.assert_not_called() + expected_log = 'Vnf instance %(id)s is already in %(state)s state.' + mock_log.error.assert_called_once_with(expected_log, + {'id': vnf_instance.id, + 'state': fields.VnfInstanceState.INSTANTIATED}) + + @mock.patch.object(coordination.Coordinator, 'get_lock') @mock.patch.object(objects.VnfPackage, 'is_package_in_use') def test_instantiate_vnf_instance_with_vnf_package_in_use(self, - mock_vnf_package_in_use): + mock_vnf_package_in_use, mock_get_lock): vnf_package_vnfd = self._create_and_upload_vnf_package() vnf_instance_data = fake_obj.get_vnf_instance_data( vnf_package_vnfd.vnfd_id) @@ -227,13 +254,14 @@ class TestConductor(SqlTestCase): self.conductor.instantiate(self.context, vnf_instance, instantiate_vnf_req) self.vnflcm_driver.instantiate_vnf.assert_called_once_with( - self.context, vnf_instance, instantiate_vnf_req) + self.context, mock.ANY, instantiate_vnf_req) mock_vnf_package_in_use.assert_called_once() + @mock.patch.object(coordination.Coordinator, 'get_lock') @mock.patch.object(objects.VnfPackage, 'is_package_in_use') @mock.patch('tacker.conductor.conductor_server.LOG') def test_instantiate_vnf_instance_failed_with_exception( - self, mock_log, mock_is_package_in_use): + self, mock_log, mock_is_package_in_use, mock_get_lock): vnf_package_vnfd = self._create_and_upload_vnf_package() vnf_instance_data = fake_obj.get_vnf_instance_data( vnf_package_vnfd.vnfd_id) @@ -245,14 +273,15 @@ class TestConductor(SqlTestCase): self.conductor.instantiate(self.context, vnf_instance, instantiate_vnf_req) self.vnflcm_driver.instantiate_vnf.assert_called_once_with( - self.context, vnf_instance, instantiate_vnf_req) + self.context, mock.ANY, instantiate_vnf_req) mock_is_package_in_use.assert_called_once() expected_log = 'Failed to update usage_state of vnf package %s' mock_log.error.assert_called_once_with(expected_log, vnf_package_vnfd.package_uuid) + @mock.patch.object(coordination.Coordinator, 'get_lock') @mock.patch.object(objects.VnfPackage, 'is_package_in_use') - def test_terminate_vnf_instance(self, mock_package_in_use): + def test_terminate_vnf_instance(self, mock_package_in_use, mock_get_lock): vnf_package_vnfd = self._create_and_upload_vnf_package() vnf_instance_data = fake_obj.get_vnf_instance_data( vnf_package_vnfd.vnfd_id) @@ -270,12 +299,42 @@ class TestConductor(SqlTestCase): terminate_vnf_req) self.vnflcm_driver.terminate_vnf.assert_called_once_with( - self.context, vnf_instance, terminate_vnf_req) + self.context, mock.ANY, terminate_vnf_req) mock_package_in_use.assert_called_once() + @mock.patch.object(coordination.Coordinator, 'get_lock') + @mock.patch.object(objects.VnfPackage, 'is_package_in_use') + @mock.patch('tacker.conductor.conductor_server.LOG') + def test_terminate_vnf_instance_already_not_instantiated(self, + mock_log, mock_package_in_use, mock_get_lock): + vnf_package_vnfd = self._create_and_upload_vnf_package() + vnf_instance_data = fake_obj.get_vnf_instance_data( + vnf_package_vnfd.vnfd_id) + mock_package_in_use.return_value = True + vnf_instance_data['instantiation_state'] =\ + fields.VnfInstanceState.NOT_INSTANTIATED + vnf_instance = objects.VnfInstance(context=self.context, + **vnf_instance_data) + vnf_instance.create() + + terminate_vnf_req = objects.TerminateVnfRequest( + termination_type=fields.VnfInstanceTerminationType.GRACEFUL) + + self.conductor.terminate(self.context, vnf_instance, + terminate_vnf_req) + + self.vnflcm_driver.terminate_vnf.assert_not_called() + mock_package_in_use.assert_not_called() + expected_log = ('Terminate action cannot be performed on vnf %(id)s ' + 'which is in %(state)s state.') + mock_log.error.assert_called_once_with(expected_log, + {'id': vnf_instance.id, + 'state': fields.VnfInstanceState.NOT_INSTANTIATED}) + + @mock.patch.object(coordination.Coordinator, 'get_lock') @mock.patch.object(objects.VnfPackage, 'is_package_in_use') def test_terminate_vnf_instance_with_usage_state_not_in_use(self, - mock_vnf_package_is_package_in_use): + mock_vnf_package_is_package_in_use, mock_get_lock): vnf_package_vnfd = self._create_and_upload_vnf_package() vnf_instance_data = fake_obj.get_vnf_instance_data( vnf_package_vnfd.vnfd_id) @@ -293,12 +352,13 @@ class TestConductor(SqlTestCase): terminate_vnf_req) self.vnflcm_driver.terminate_vnf.assert_called_once_with( - self.context, vnf_instance, terminate_vnf_req) + self.context, mock.ANY, terminate_vnf_req) mock_vnf_package_is_package_in_use.assert_called_once() + @mock.patch.object(coordination.Coordinator, 'get_lock') @mock.patch.object(objects.VnfPackage, 'is_package_in_use') def test_terminate_vnf_instance_with_usage_state_already_in_use(self, - mock_vnf_package_is_package_in_use): + mock_vnf_package_is_package_in_use, mock_get_lock): vnf_package_vnfd = self._create_and_upload_vnf_package() vnf_instance_data = fake_obj.get_vnf_instance_data( vnf_package_vnfd.vnfd_id) @@ -316,13 +376,14 @@ class TestConductor(SqlTestCase): terminate_vnf_req) self.vnflcm_driver.terminate_vnf.assert_called_once_with( - self.context, vnf_instance, terminate_vnf_req) + self.context, mock.ANY, terminate_vnf_req) mock_vnf_package_is_package_in_use.assert_called_once() + @mock.patch.object(coordination.Coordinator, 'get_lock') @mock.patch.object(objects.VnfPackage, 'is_package_in_use') @mock.patch('tacker.conductor.conductor_server.LOG') def test_terminate_vnf_instance_failed_to_update_usage_state( - self, mock_log, mock_is_package_in_use): + self, mock_log, mock_is_package_in_use, mock_get_lock): vnf_package_vnfd = self._create_and_upload_vnf_package() vnf_instance_data = fake_obj.get_vnf_instance_data( vnf_package_vnfd.vnfd_id) @@ -337,12 +398,13 @@ class TestConductor(SqlTestCase): self.conductor.terminate(self.context, vnf_instance, terminate_vnf_req) self.vnflcm_driver.terminate_vnf.assert_called_once_with( - self.context, vnf_instance, terminate_vnf_req) + self.context, mock.ANY, terminate_vnf_req) expected_msg = "Failed to update usage_state of vnf package %s" mock_log.error.assert_called_once_with(expected_msg, vnf_package_vnfd.package_uuid) - def test_heal_vnf_instance(self): + @mock.patch.object(coordination.Coordinator, 'get_lock') + def test_heal_vnf_instance(self, mock_get_lock): vnf_package_vnfd = self._create_and_upload_vnf_package() vnf_instance_data = fake_obj.get_vnf_instance_data( vnf_package_vnfd.vnfd_id) @@ -357,6 +419,30 @@ class TestConductor(SqlTestCase): self.vnflcm_driver.heal_vnf.assert_called_once_with( self.context, mock.ANY, heal_vnf_req) + @mock.patch.object(coordination.Coordinator, 'get_lock') + @mock.patch('tacker.conductor.conductor_server.LOG') + def test_heal_vnf_instance_already_not_instantiated(self, + mock_log, mock_get_lock): + vnf_package_vnfd = self._create_and_upload_vnf_package() + vnf_instance_data = fake_obj.get_vnf_instance_data( + vnf_package_vnfd.vnfd_id) + + vnf_instance_data['instantiation_state'] =\ + fields.VnfInstanceState.NOT_INSTANTIATED + vnf_instance = objects.VnfInstance(context=self.context, + **vnf_instance_data) + vnf_instance.create() + + heal_vnf_req = objects.HealVnfRequest(cause="healing request") + self.conductor.heal(self.context, vnf_instance, heal_vnf_req) + + self.vnflcm_driver.heal_vnf.assert_not_called() + expected_log = ('Heal action cannot be performed on vnf %(id)s ' + 'which is in %(state)s state.') + mock_log.error.assert_called_once_with(expected_log, + {'id': vnf_instance.id, + 'state': fields.VnfInstanceState.NOT_INSTANTIATED}) + @mock.patch.object(os, 'remove') @mock.patch.object(shutil, 'rmtree') @mock.patch.object(os.path, 'exists')