diff --git a/nova/exception.py b/nova/exception.py index e449f9637eb8..6af10f2ef2de 100644 --- a/nova/exception.py +++ b/nova/exception.py @@ -2148,3 +2148,8 @@ class InvalidReservedMemoryPagesOption(Invalid): msg_fmt = _("The format of the option 'reserved_huge_pages' is invalid. " "(found '%(conf)s') Please refer to the nova " "config-reference.") + + +class ConcurrentUpdateDetected(NovaException): + msg_fmt = _("Another thread concurrently updated the data. " + "Please retry your update") diff --git a/nova/objects/resource_provider.py b/nova/objects/resource_provider.py index ef2c2a7e87ec..0d0701ceb9fa 100644 --- a/nova/objects/resource_provider.py +++ b/nova/objects/resource_provider.py @@ -11,6 +11,7 @@ # under the License. import six +import sqlalchemy as sa from sqlalchemy.orm import contains_eager from nova.db.sqlalchemy import api as db_api @@ -20,6 +21,9 @@ from nova import objects from nova.objects import base from nova.objects import fields +_INV_TBL = models.Inventory.__table__ +_RP_TBL = models.ResourceProvider.__table__ + @db_api.api_context_manager.writer def _create_rp_in_db(context, updates): @@ -46,6 +50,148 @@ def _get_rp_by_uuid_from_db(context, uuid): return result +def _get_current_inventory_resources(conn, rp): + """Returns a set() containing the resource class IDs for all resources + currently having an inventory record for the supplied resource provider. + + :param conn: DB connection to use. + :param rp: Resource provider to query inventory for. + """ + cur_res_sel = sa.select([_INV_TBL.c.resource_class_id]).where( + _INV_TBL.c.resource_provider_id == rp.id) + existing_resources = conn.execute(cur_res_sel).fetchall() + return set([r[0] for r in existing_resources]) + + +def _delete_inventory_from_provider(conn, rp, to_delete): + """Deletes any inventory records from the supplied provider and set() of + resource class identifiers. + + :param conn: DB connection to use. + :param rp: Resource provider from which to delete inventory. + :param to_delete: set() containing resource class IDs for records to + delete. + """ + del_stmt = _INV_TBL.delete().where(sa.and_( + _INV_TBL.c.resource_provider_id == rp.id, + _INV_TBL.c.resource_class_id.in_(to_delete))) + conn.execute(del_stmt) + + +def _add_inventory_to_provider(conn, rp, inv_list, to_add): + """Inserts new inventory records for the supplied resource provider. + + :param conn: DB connection to use. + :param rp: Resource provider to add inventory to. + :param inv_list: InventoryList object + :param to_add: set() containing resource class IDs to search inv_list for + adding to resource provider. + """ + for res_class in to_add: + inv_record = inv_list.find(res_class) + ins_stmt = _INV_TBL.insert().values( + resource_provider_id=rp.id, + resource_class_id=res_class, + total=inv_record.total, + reserved=inv_record.reserved, + min_unit=inv_record.min_unit, + max_unit=inv_record.max_unit, + step_size=inv_record.step_size, + allocation_ratio=inv_record.allocation_ratio) + conn.execute(ins_stmt) + + +def _update_inventory_for_provider(conn, rp, inv_list, to_update): + """Updates existing inventory records for the supplied resource provider. + + :param conn: DB connection to use. + :param rp: Resource provider to add inventory to. + :param inv_list: InventoryList object + :param to_update: set() containing resource class IDs to search inv_list + for updating in resource provider. + """ + for res_class in to_update: + inv_record = inv_list.find(res_class) + upd_stmt = _INV_TBL.update().where(sa.and_( + _INV_TBL.c.resource_provider_id == rp.id, + _INV_TBL.c.resource_class_id == res_class)).values( + total=inv_record.total, + reserved=inv_record.reserved, + min_unit=inv_record.min_unit, + max_unit=inv_record.max_unit, + step_size=inv_record.step_size, + allocation_ratio=inv_record.allocation_ratio) + conn.execute(upd_stmt) + + +def _increment_provider_generation(conn, rp): + """Increments the supplied provider's generation value, supplying the + currently-known generation. Returns whether the increment succeeded. + + :param conn: DB connection to use. + :param rp: `ResourceProvider` whose generation should be updated. + :returns True if the generation was incremented, False otherwise. + """ + rp_gen = rp.generation + new_generation = rp_gen + 1 + upd_stmt = _RP_TBL.update().where(sa.and_( + _RP_TBL.c.id == rp.id, + _RP_TBL.c.generation == rp_gen)).values( + generation=(new_generation)) + + res = conn.execute(upd_stmt) + if res.rowcount != 1: + raise exception.ConcurrentUpdateDetected + return new_generation + + +@db_api.api_context_manager.writer +def _set_inventory(context, rp, inv_list): + """Given an InventoryList object, replaces the inventory of the + resource provider in a safe, atomic fashion using the resource + provider's generation as a consistent view marker. + + :param rp: `ResourceProvider` object upon which to set inventory. + :param inv_list: `InventoryList` object to save to backend storage. + :raises `ConcurrentUpdateDetected` if another thread updated the + same resource provider's view of its inventory or allocations + in between the time when this object was originally read + and the call to set the inventory. + """ + + conn = context.session.connection() + + existing_resources = _get_current_inventory_resources(conn, rp) + these_resources = set([fields.ResourceClass.index(r.resource_class) + for r in inv_list.objects]) + + # Determine which resources we should be adding, deleting and/or + # updating in the resource provider's inventory by comparing sets + # of resource class identifiers. + to_add = these_resources - existing_resources + to_delete = existing_resources - these_resources + to_update = these_resources & existing_resources + + with conn.begin(): + if to_delete: + _delete_inventory_from_provider(conn, rp, to_delete) + if to_add: + _add_inventory_to_provider(conn, rp, inv_list, to_add) + if to_update: + _update_inventory_for_provider(conn, rp, inv_list, to_update) + + # Here is where we update the resource provider's generation value. + # If this update updates zero rows, that means that another + # thread has updated the inventory for this resource provider + # between the time the caller originally read the resource provider + # record and inventory information and this point. We raise an + # exception here which will rollback the above transaction and + # return an error to the caller to indicate that they can attempt + # to retry the inventory save after reverifying any capacity + # conditions and re-reading the existing inventory information. + rp.generation = _increment_provider_generation(conn, rp) + + @base.NovaObjectRegistry.register class ResourceProvider(base.NovaObject): # Version 1.0: Initial version @@ -87,6 +233,11 @@ class ResourceProvider(base.NovaObject): db_resource_provider = cls._get_by_uuid_from_db(context, uuid) return cls._from_db_object(context, cls(), db_resource_provider) + @base.remotable + def set_inventory(self, inv_list): + _set_inventory(self._context, self, inv_list) + self.obj_reset_changes() + @staticmethod def _create_in_db(context, updates): return _create_rp_in_db(context, updates) diff --git a/nova/tests/functional/db/test_resource_provider.py b/nova/tests/functional/db/test_resource_provider.py index ed13d7237eae..5fb7d1312e2c 100644 --- a/nova/tests/functional/db/test_resource_provider.py +++ b/nova/tests/functional/db/test_resource_provider.py @@ -136,3 +136,88 @@ class ResourceProviderTestCase(test.NoDBTestCase): objects.InventoryList.get_all_by_resource_provider_uuid( self.context, resource_provider.uuid)) self.assertEqual(33, reloaded_inventories[0].total) + + def test_provider_set_inventory(self): + rp = objects.ResourceProvider(context=self.context, + uuid=uuidsentinel.rp_uuid, + name=uuidsentinel.rp_name) + rp.create() + saved_generation = rp.generation + + disk_inv = objects.Inventory( + resource_provider=rp, + resource_class=fields.ResourceClass.DISK_GB, + total=1024, + reserved=15, + min_unit=10, + max_unit=100, + step_size=10, + allocation_ratio=1.0) + + vcpu_inv = objects.Inventory( + resource_provider=rp, + resource_class=fields.ResourceClass.VCPU, + total=12, + reserved=0, + min_unit=1, + max_unit=12, + step_size=1, + allocation_ratio=16.0) + + # set to new list + inv_list = objects.InventoryList(objects=[disk_inv, vcpu_inv]) + rp.set_inventory(inv_list) + + # generation has bumped + self.assertEqual(saved_generation + 1, rp.generation) + saved_generation = rp.generation + + new_inv_list = objects.InventoryList.get_all_by_resource_provider_uuid( + self.context, uuidsentinel.rp_uuid) + self.assertEqual(2, len(new_inv_list)) + resource_classes = [inv.resource_class for inv in new_inv_list] + self.assertIn(fields.ResourceClass.VCPU, resource_classes) + self.assertIn(fields.ResourceClass.DISK_GB, resource_classes) + + # reset list to just disk_inv + inv_list = objects.InventoryList(objects=[disk_inv]) + rp.set_inventory(inv_list) + + # generation has bumped + self.assertEqual(saved_generation + 1, rp.generation) + saved_generation = rp.generation + + new_inv_list = objects.InventoryList.get_all_by_resource_provider_uuid( + self.context, uuidsentinel.rp_uuid) + self.assertEqual(1, len(new_inv_list)) + resource_classes = [inv.resource_class for inv in new_inv_list] + self.assertNotIn(fields.ResourceClass.VCPU, resource_classes) + self.assertIn(fields.ResourceClass.DISK_GB, resource_classes) + self.assertEqual(1024, new_inv_list[0].total) + + # update existing disk inv to new settings + disk_inv = objects.Inventory( + resource_provider=rp, + resource_class=fields.ResourceClass.DISK_GB, + total=2048, + reserved=15, + min_unit=10, + max_unit=100, + step_size=10, + allocation_ratio=1.0) + inv_list = objects.InventoryList(objects=[disk_inv]) + rp.set_inventory(inv_list) + + # generation has bumped + self.assertEqual(saved_generation + 1, rp.generation) + saved_generation = rp.generation + + new_inv_list = objects.InventoryList.get_all_by_resource_provider_uuid( + self.context, uuidsentinel.rp_uuid) + self.assertEqual(1, len(new_inv_list)) + self.assertEqual(2048, new_inv_list[0].total) + + # fail when generation wrong + rp.generation = rp.generation - 1 + self.assertRaises(exception.ConcurrentUpdateDetected, + rp.set_inventory, inv_list) diff --git a/nova/tests/unit/objects/test_objects.py b/nova/tests/unit/objects/test_objects.py index 000f805f1292..bc2a3c6b872d 100644 --- a/nova/tests/unit/objects/test_objects.py +++ b/nova/tests/unit/objects/test_objects.py @@ -1180,7 +1180,7 @@ object_data = { 'Quotas': '1.2-1fe4cd50593aaf5d36a6dc5ab3f98fb3', 'QuotasNoOp': '1.2-e041ddeb7dc8188ca71706f78aad41c1', 'RequestSpec': '1.6-c1cb516acdf120d367a42d343ed695b5', - 'ResourceProvider': '1.0-57f2a7e6fa50c6573af211521e83f8c7', + 'ResourceProvider': '1.0-94e0e906feb26a24e217935c1e401467', 'S3ImageMapping': '1.0-7dd7366a890d82660ed121de9092276e', 'SchedulerLimits': '1.0-249c4bd8e62a9b327b7026b7f19cc641', 'SchedulerRetries': '1.1-3c9c8b16143ebbb6ad7030e999d14cc0',