Merge "Make put_allocations() retry on concurrent update"
This commit is contained in:
@@ -94,6 +94,33 @@ def safe_connect(f):
|
|||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
class Retry(Exception):
    """Signal that an operation failed but may be attempted again.

    :param operation: short name of the operation that could not be
                      completed
    :param reason: human-readable explanation of why it could not be
                   completed
    """

    def __init__(self, operation, reason):
        # Initialise the Exception base so that str()/repr() and any
        # traceback carry the details if this exception ever escapes
        # without being handled.
        super(Retry, self).__init__(
            'Unable to %s because %s' % (operation, reason))
        self.operation = operation
        self.reason = reason
|
||||||
|
|
||||||
|
|
||||||
|
def retries(f):
    """Decorator that re-invokes a method when it raises Retry.

    The wrapped method is called at most three times. The value of the
    first call that completes without raising Retry is returned unchanged.
    If every attempt raises Retry, an error is logged and False is
    returned instead.
    """
    max_attempts = 3

    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        attempts_left = max_attempts
        while attempts_left > 0:
            attempts_left -= 1
            try:
                result = f(self, *args, **kwargs)
            except Retry as exc:
                LOG.debug(
                    'Unable to %(op)s because %(reason)s; retrying...',
                    {'op': exc.operation, 'reason': exc.reason})
            else:
                return result
        # Every attempt raised Retry.
        LOG.error('Failed scheduler client operation %s: out of retries',
                  f.__name__)
        return False
    return wrapper
|
||||||
|
|
||||||
|
|
||||||
def _compute_node_to_inventory_dict(compute_node):
|
def _compute_node_to_inventory_dict(compute_node):
|
||||||
"""Given a supplied `objects.ComputeNode` object, return a dict, keyed
|
"""Given a supplied `objects.ComputeNode` object, return a dict, keyed
|
||||||
by resource class, of various inventory information.
|
by resource class, of various inventory information.
|
||||||
@@ -1125,6 +1152,7 @@ class SchedulerReportClient(object):
|
|||||||
return r.status_code == 204
|
return r.status_code == 204
|
||||||
|
|
||||||
@safe_connect
|
@safe_connect
|
||||||
|
@retries
|
||||||
def put_allocations(self, rp_uuid, consumer_uuid, alloc_data, project_id,
|
def put_allocations(self, rp_uuid, consumer_uuid, alloc_data, project_id,
|
||||||
user_id):
|
user_id):
|
||||||
"""Creates allocation records for the supplied instance UUID against
|
"""Creates allocation records for the supplied instance UUID against
|
||||||
@@ -1141,6 +1169,8 @@ class SchedulerReportClient(object):
|
|||||||
:param project_id: The project_id associated with the allocations.
|
:param project_id: The project_id associated with the allocations.
|
||||||
:param user_id: The user_id associated with the allocations.
|
:param user_id: The user_id associated with the allocations.
|
||||||
:returns: True if the allocations were created, False otherwise.
|
:returns: True if the allocations were created, False otherwise.
|
||||||
|
:raises: Retry if the operation should be retried due to a concurrent
|
||||||
|
update.
|
||||||
"""
|
"""
|
||||||
payload = {
|
payload = {
|
||||||
'allocations': [
|
'allocations': [
|
||||||
@@ -1164,12 +1194,20 @@ class SchedulerReportClient(object):
|
|||||||
payload.pop('user_id')
|
payload.pop('user_id')
|
||||||
r = self.put(url, payload)
|
r = self.put(url, payload)
|
||||||
if r.status_code != 204:
|
if r.status_code != 204:
|
||||||
LOG.warning(
|
# NOTE(jaypipes): Yes, it sucks doing string comparison like this
|
||||||
'Unable to submit allocation for instance '
|
# but we have no error codes, only error messages.
|
||||||
|
if 'concurrently updated' in r.text:
|
||||||
|
reason = ('another process changed the resource providers '
|
||||||
|
'involved in our attempt to put allocations for '
|
||||||
|
'consumer %s' % consumer_uuid)
|
||||||
|
raise Retry('put_allocations', reason)
|
||||||
|
else:
|
||||||
|
LOG.warning(
|
||||||
|
'Unable to submit allocation for instance '
|
||||||
'%(uuid)s (%(code)i %(text)s)',
|
'%(uuid)s (%(code)i %(text)s)',
|
||||||
{'uuid': consumer_uuid,
|
{'uuid': consumer_uuid,
|
||||||
'code': r.status_code,
|
'code': r.status_code,
|
||||||
'text': r.text})
|
'text': r.text})
|
||||||
return r.status_code == 204
|
return r.status_code == 204
|
||||||
|
|
||||||
@safe_connect
|
@safe_connect
|
||||||
|
@@ -292,6 +292,52 @@ class TestPutAllocations(SchedulerReportClientTestCase):
|
|||||||
log_msg = mock_warn.call_args[0][0]
|
log_msg = mock_warn.call_args[0][0]
|
||||||
self.assertIn("Unable to submit allocation for instance", log_msg)
|
self.assertIn("Unable to submit allocation for instance", log_msg)
|
||||||
|
|
||||||
|
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.put')
def test_put_allocations_retries_conflict(self, mock_put):
    """A "concurrently updated" 409 followed by a 204 yields success."""
    # First response: conflict reported as a concurrent update.
    conflict_resp = mock.MagicMock()
    conflict_resp.status_code = 409
    conflict_resp.text = "concurrently updated"
    # Second response: the PUT is accepted.
    ok_resp = mock.MagicMock()
    ok_resp.status_code = 204
    mock_put.side_effect = (conflict_resp, ok_resp)

    consumer = mock.sentinel.consumer
    url = "/allocations/%s" % consumer

    result = self.client.put_allocations(
        mock.sentinel.rp, consumer, {"MEMORY_MB": 1024},
        mock.sentinel.project_id, mock.sentinel.user_id)

    self.assertTrue(result)
    # One PUT for the conflicting attempt, one for the successful one.
    single_put = mock.call(url, mock.ANY, version='1.8')
    mock_put.assert_has_calls([single_put, single_put])
|
||||||
|
|
||||||
|
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.put')
def test_put_allocations_retry_gives_up(self, mock_put):
    """Repeated "concurrently updated" 409s end in a False result."""
    # Every PUT comes back as a concurrent-update conflict.
    conflict_resp = mock.MagicMock()
    conflict_resp.status_code = 409
    conflict_resp.text = "concurrently updated"
    mock_put.return_value = conflict_resp

    consumer = mock.sentinel.consumer
    url = "/allocations/%s" % consumer

    result = self.client.put_allocations(
        mock.sentinel.rp, consumer, {"MEMORY_MB": 1024},
        mock.sentinel.project_id, mock.sentinel.user_id)

    self.assertFalse(result)
    # Three PUT attempts are expected before giving up.
    single_put = mock.call(url, mock.ANY, version='1.8')
    mock_put.assert_has_calls([single_put, single_put, single_put])
|
||||||
|
|
||||||
def test_claim_resources_success(self):
|
def test_claim_resources_success(self):
|
||||||
get_resp_mock = mock.Mock(status_code=200)
|
get_resp_mock = mock.Mock(status_code=200)
|
||||||
get_resp_mock.json.return_value = {
|
get_resp_mock.json.return_value = {
|
||||||
|
Reference in New Issue
Block a user