Merge "Compute manager should remove dead resources"
This commit is contained in:
@@ -3676,11 +3676,18 @@ class ComputeManager(manager.SchedulerDependentManager):
|
||||
:param context: security context
|
||||
"""
|
||||
new_resource_tracker_dict = {}
|
||||
nodenames = self.driver.get_available_nodes()
|
||||
nodenames = set(self.driver.get_available_nodes())
|
||||
for nodename in nodenames:
|
||||
rt = self._get_resource_tracker(nodename)
|
||||
rt.update_available_resource(context)
|
||||
new_resource_tracker_dict[nodename] = rt
|
||||
|
||||
# delete nodes that the driver no longer reports
|
||||
known_nodes = set(self._resource_tracker_dict.keys())
|
||||
for nodename in known_nodes - nodenames:
|
||||
rt = self._get_resource_tracker(nodename)
|
||||
rt.update_available_resource(context, delete=True)
|
||||
|
||||
self._resource_tracker_dict = new_resource_tracker_dict
|
||||
|
||||
@manager.periodic_task(spacing=CONF.running_deleted_instance_poll_interval)
|
||||
|
@@ -226,7 +226,7 @@ class ResourceTracker(object):
|
||||
return self.compute_node is None
|
||||
|
||||
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
|
||||
def update_available_resource(self, context):
|
||||
def update_available_resource(self, context, delete=False):
|
||||
"""Override in-memory calculations of compute node resource usage based
|
||||
on data audited from the hypervisor layer.
|
||||
|
||||
@@ -237,11 +237,15 @@ class ResourceTracker(object):
|
||||
LOG.audit(_("Auditing locally available compute resources"))
|
||||
resources = self.driver.get_available_resource(self.nodename)
|
||||
if not resources:
|
||||
# The virt driver does not support this function
|
||||
LOG.audit(_("Virt driver does not support "
|
||||
"'get_available_resource' Compute tracking is disabled."))
|
||||
self.compute_node = None
|
||||
return
|
||||
if delete:
|
||||
self._delete_compute_node(context)
|
||||
return
|
||||
else:
|
||||
# The virt driver does not support this function
|
||||
LOG.audit(_("Virt driver does not support "
|
||||
"'get_available_resource' Compute tracking is disabled."))
|
||||
self.compute_node = None
|
||||
return
|
||||
|
||||
self._verify_resources(resources)
|
||||
|
||||
@@ -270,6 +274,13 @@ class ResourceTracker(object):
|
||||
|
||||
self._sync_compute_node(context, resources)
|
||||
|
||||
def _delete_compute_node(self, context):
|
||||
"""Delete a compute node DB record."""
|
||||
if self.compute_node:
|
||||
LOG.audit(_("Deleting compute node %s") % self.compute_node['id'])
|
||||
self.compute_node = self.conductor_api.compute_node_delete(
|
||||
context, self.compute_node)
|
||||
|
||||
def _sync_compute_node(self, context, resources):
|
||||
"""Create or update the compute node DB record."""
|
||||
if not self.compute_node:
|
||||
|
@@ -276,6 +276,9 @@ class LocalAPI(object):
|
||||
return self._manager.compute_node_update(context, node, values,
|
||||
prune_stats)
|
||||
|
||||
def compute_node_delete(self, context, node):
|
||||
return self._manager.compute_node_delete(context, node)
|
||||
|
||||
def service_update(self, context, service, values):
|
||||
return self._manager.service_update(context, service, values)
|
||||
|
||||
@@ -605,6 +608,9 @@ class API(object):
|
||||
return self.conductor_rpcapi.compute_node_update(context, node,
|
||||
values, prune_stats)
|
||||
|
||||
def compute_node_delete(self, context, node):
|
||||
return self.conductor_rpcapi.compute_node_delete(context, node)
|
||||
|
||||
def service_update(self, context, service, values):
|
||||
return self.conductor_rpcapi.service_update(context, service, values)
|
||||
|
||||
|
@@ -49,7 +49,7 @@ datetime_fields = ['launched_at', 'terminated_at']
|
||||
class ConductorManager(manager.SchedulerDependentManager):
|
||||
"""Mission: TBD."""
|
||||
|
||||
RPC_API_VERSION = '1.43'
|
||||
RPC_API_VERSION = '1.44'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(ConductorManager, self).__init__(service_name='conductor',
|
||||
@@ -352,6 +352,10 @@ class ConductorManager(manager.SchedulerDependentManager):
|
||||
prune_stats)
|
||||
return jsonutils.to_primitive(result)
|
||||
|
||||
def compute_node_delete(self, context, node):
|
||||
result = self.db.compute_node_delete(context, node['id'])
|
||||
return jsonutils.to_primitive(result)
|
||||
|
||||
@rpc_common.client_exceptions(exception.ServiceNotFound)
|
||||
def service_update(self, context, service, values):
|
||||
svc = self.db.service_update(context, service['id'], values)
|
||||
|
@@ -81,6 +81,7 @@ class ConductorAPI(nova.openstack.common.rpc.proxy.RpcProxy):
|
||||
quota_rollback
|
||||
1.42 - Added get_ec2_ids, aggregate_metadata_get_by_host
|
||||
1.43 - Added compute_stop
|
||||
1.44 - Added compute_node_delete
|
||||
"""
|
||||
|
||||
BASE_RPC_API_VERSION = '1.0'
|
||||
@@ -346,6 +347,11 @@ class ConductorAPI(nova.openstack.common.rpc.proxy.RpcProxy):
|
||||
prune_stats=prune_stats)
|
||||
return self.call(context, msg, version='1.33')
|
||||
|
||||
def compute_node_delete(self, context, node):
|
||||
node_p = jsonutils.to_primitive(node)
|
||||
msg = self.make_msg('compute_node_delete', node=node_p)
|
||||
return self.call(context, msg, version='1.44')
|
||||
|
||||
def service_update(self, context, service, values):
|
||||
service_p = jsonutils.to_primitive(service)
|
||||
msg = self.make_msg('service_update', service=service_p, values=values)
|
||||
|
@@ -195,11 +195,19 @@ def compute_node_create(context, values):
|
||||
def compute_node_update(context, compute_id, values, prune_stats=False):
|
||||
"""Set the given properties on a computeNode and update it.
|
||||
|
||||
Raises NotFound if computeNode does not exist.
|
||||
Raises ComputeHostNotFound if computeNode does not exist.
|
||||
"""
|
||||
return IMPL.compute_node_update(context, compute_id, values, prune_stats)
|
||||
|
||||
|
||||
def compute_node_delete(context, compute_id):
|
||||
"""Delete a computeNode from the database.
|
||||
|
||||
Raises ComputeHostNotFound if computeNode does not exist.
|
||||
"""
|
||||
return IMPL.compute_node_delete(context, compute_id)
|
||||
|
||||
|
||||
def compute_node_get_by_host(context, host):
|
||||
return IMPL.compute_node_get_by_host(context, host)
|
||||
|
||||
|
@@ -536,6 +536,17 @@ def compute_node_update(context, compute_id, values, prune_stats=False):
|
||||
return compute_ref
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def compute_node_delete(context, compute_id):
|
||||
"""Delete a ComputeNode record."""
|
||||
result = model_query(context, models.ComputeNode).\
|
||||
filter_by(id=compute_id).\
|
||||
soft_delete()
|
||||
|
||||
if not result:
|
||||
raise exception.ComputeHostNotFound(host=compute_id)
|
||||
|
||||
|
||||
def compute_node_get_by_host(context, host):
|
||||
"""Get all capacity entries for the given host."""
|
||||
result = model_query(context, models.ComputeNode, read_deleted="no").\
|
||||
|
@@ -19,7 +19,6 @@
|
||||
from oslo.config import cfg
|
||||
|
||||
from nova import context
|
||||
from nova import exception
|
||||
from nova.openstack.common import importutils
|
||||
from nova import test
|
||||
from nova.virt import fake
|
||||
@@ -72,8 +71,8 @@ class FakeDriverMultiNodeTestCase(BaseTestCase):
|
||||
res_b = self.driver.get_available_resource('bbb')
|
||||
self.assertEqual(res_b['hypervisor_hostname'], 'bbb')
|
||||
|
||||
self.assertRaises(exception.NovaException,
|
||||
self.driver.get_available_resource, 'xxx')
|
||||
res_x = self.driver.get_available_resource('xxx')
|
||||
self.assertEqual(res_x, {})
|
||||
|
||||
|
||||
class MultiNodeComputeTestCase(BaseTestCase):
|
||||
@@ -101,3 +100,22 @@ class MultiNodeComputeTestCase(BaseTestCase):
|
||||
self.compute.update_available_resource(ctx)
|
||||
self.assertEqual(sorted(self.compute._resource_tracker_dict.keys()),
|
||||
['A', 'B', 'C'])
|
||||
|
||||
def test_compute_manager_removes_deleted_node(self):
|
||||
ctx = context.get_admin_context()
|
||||
fake.set_nodes(['A', 'B'])
|
||||
self.compute.update_available_resource(ctx)
|
||||
|
||||
rt_A = self.compute._resource_tracker_dict['A']
|
||||
rt_B = self.compute._resource_tracker_dict['B']
|
||||
self.mox.StubOutWithMock(rt_A, 'update_available_resource')
|
||||
self.mox.StubOutWithMock(rt_B, 'update_available_resource')
|
||||
rt_A.update_available_resource(ctx)
|
||||
rt_B.update_available_resource(ctx, delete=True)
|
||||
self.mox.ReplayAll()
|
||||
|
||||
fake.set_nodes(['A'])
|
||||
self.compute.update_available_resource(ctx)
|
||||
self.mox.VerifyAll()
|
||||
self.assertEqual(sorted(self.compute._resource_tracker_dict.keys()),
|
||||
['A'])
|
||||
|
@@ -347,6 +347,9 @@ class BaseTrackerTestCase(BaseTestCase):
|
||||
# database models and a compatible compute driver:
|
||||
super(BaseTrackerTestCase, self).setUp()
|
||||
|
||||
self.updated = False
|
||||
self.deleted = False
|
||||
|
||||
self.tracker = self._tracker()
|
||||
self._migrations = {}
|
||||
|
||||
@@ -354,6 +357,8 @@ class BaseTrackerTestCase(BaseTestCase):
|
||||
self._fake_service_get_by_compute_host)
|
||||
self.stubs.Set(db, 'compute_node_update',
|
||||
self._fake_compute_node_update)
|
||||
self.stubs.Set(db, 'compute_node_delete',
|
||||
self._fake_compute_node_delete)
|
||||
self.stubs.Set(db, 'migration_update',
|
||||
self._fake_migration_update)
|
||||
self.stubs.Set(db, 'migration_get_in_progress_by_host_and_node',
|
||||
@@ -375,6 +380,11 @@ class BaseTrackerTestCase(BaseTestCase):
|
||||
self.compute.update(values)
|
||||
return self.compute
|
||||
|
||||
def _fake_compute_node_delete(self, ctx, compute_node_id):
|
||||
self.deleted = True
|
||||
self.compute.update({'deleted': 1})
|
||||
return self.compute
|
||||
|
||||
def _fake_migration_get_in_progress_by_host_and_node(self, ctxt, host,
|
||||
node):
|
||||
status = ['confirmed', 'reverted']
|
||||
@@ -892,3 +902,18 @@ class OrphanTestCase(BaseTrackerTestCase):
|
||||
orphans = self.tracker._find_orphaned_instances()
|
||||
|
||||
self.assertEqual(2, len(orphans))
|
||||
|
||||
|
||||
class DeletedNodeTestCase(BaseTrackerTestCase):
|
||||
|
||||
def test_remove_deleted_node(self):
|
||||
self.assertFalse(self.tracker.disabled)
|
||||
self.assertTrue(self.updated)
|
||||
|
||||
def _get_available_resource(nodename):
|
||||
return {}
|
||||
self.tracker.driver.get_available_resource = _get_available_resource
|
||||
|
||||
self.tracker.update_available_resource(self.context, delete=True)
|
||||
self.assertEqual(self.deleted, True)
|
||||
self.assertEqual(self.compute['deleted'], 1)
|
||||
|
@@ -429,6 +429,14 @@ class _BaseTestCase(object):
|
||||
'fake-values', False)
|
||||
self.assertEqual(result, 'fake-result')
|
||||
|
||||
def test_compute_node_delete(self):
|
||||
node = {'id': 'fake-id'}
|
||||
self.mox.StubOutWithMock(db, 'compute_node_delete')
|
||||
db.compute_node_delete(self.context, node['id']).AndReturn(None)
|
||||
self.mox.ReplayAll()
|
||||
result = self.conductor.compute_node_delete(self.context, node)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
def test_instance_fault_create(self):
|
||||
self.mox.StubOutWithMock(db, 'instance_fault_create')
|
||||
db.instance_fault_create(self.context, 'fake-values').AndReturn(
|
||||
|
@@ -410,9 +410,13 @@ class BareMetalDriver(driver.ComputeDriver):
|
||||
|
||||
def get_available_resource(self, nodename):
|
||||
context = nova_context.get_admin_context()
|
||||
node = db.bm_node_get_by_node_uuid(context, nodename)
|
||||
dic = self._node_resource(node)
|
||||
return dic
|
||||
resource = {}
|
||||
try:
|
||||
node = db.bm_node_get_by_node_uuid(context, nodename)
|
||||
resource = self._node_resource(node)
|
||||
except exception.NodeNotFoundByUUID:
|
||||
pass
|
||||
return resource
|
||||
|
||||
def ensure_filtering_rules_for_instance(self, instance_ref, network_info):
|
||||
self.firewall_driver.setup_basic_filtering(instance_ref, network_info)
|
||||
|
@@ -321,7 +321,7 @@ class FakeDriver(driver.ComputeDriver):
|
||||
disk and ram.
|
||||
"""
|
||||
if nodename not in _FAKE_NODES:
|
||||
raise exception.NovaException("node %s is not found" % nodename)
|
||||
return {}
|
||||
|
||||
dic = {'vcpus': 1,
|
||||
'memory_mb': 8192,
|
||||
|
Reference in New Issue
Block a user