Merge "Retry large object manifest upload"
This commit is contained in:
@@ -554,24 +554,55 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
|
|||||||
|
|
||||||
self._add_etag_to_manifest(segment_results, manifest)
|
self._add_etag_to_manifest(segment_results, manifest)
|
||||||
|
|
||||||
|
# If the final manifest upload fails, remove the segments we've
|
||||||
|
# already uploaded.
|
||||||
|
try:
|
||||||
if use_slo:
|
if use_slo:
|
||||||
return self._finish_large_object_slo(endpoint, headers, manifest)
|
return self._finish_large_object_slo(endpoint, headers,
|
||||||
|
manifest)
|
||||||
else:
|
else:
|
||||||
return self._finish_large_object_dlo(endpoint, headers)
|
return self._finish_large_object_dlo(endpoint, headers)
|
||||||
|
except Exception:
|
||||||
|
try:
|
||||||
|
segment_prefix = endpoint.split('/')[-1]
|
||||||
|
self.log.debug(
|
||||||
|
"Failed to upload large object manifest for %s. "
|
||||||
|
"Removing segment uploads.", segment_prefix)
|
||||||
|
self.delete_autocreated_image_objects(
|
||||||
|
segment_prefix=segment_prefix)
|
||||||
|
except Exception:
|
||||||
|
self.log.exception(
|
||||||
|
"Failed to cleanup image objects for %s:",
|
||||||
|
segment_prefix)
|
||||||
|
raise
|
||||||
|
|
||||||
def _finish_large_object_slo(self, endpoint, headers, manifest):
|
def _finish_large_object_slo(self, endpoint, headers, manifest):
|
||||||
# TODO(mordred) send an etag of the manifest, which is the md5sum
|
# TODO(mordred) send an etag of the manifest, which is the md5sum
|
||||||
# of the concatenation of the etags of the results
|
# of the concatenation of the etags of the results
|
||||||
headers = headers.copy()
|
headers = headers.copy()
|
||||||
|
retries = 3
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
return self._object_store_client.put(
|
return self._object_store_client.put(
|
||||||
endpoint,
|
endpoint,
|
||||||
params={'multipart-manifest': 'put'},
|
params={'multipart-manifest': 'put'},
|
||||||
headers=headers, data=json.dumps(manifest))
|
headers=headers, data=json.dumps(manifest))
|
||||||
|
except Exception:
|
||||||
|
retries -= 1
|
||||||
|
if retries == 0:
|
||||||
|
raise
|
||||||
|
|
||||||
def _finish_large_object_dlo(self, endpoint, headers):
|
def _finish_large_object_dlo(self, endpoint, headers):
|
||||||
headers = headers.copy()
|
headers = headers.copy()
|
||||||
headers['X-Object-Manifest'] = endpoint
|
headers['X-Object-Manifest'] = endpoint
|
||||||
|
retries = 3
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
return self._object_store_client.put(endpoint, headers=headers)
|
return self._object_store_client.put(endpoint, headers=headers)
|
||||||
|
except Exception:
|
||||||
|
retries -= 1
|
||||||
|
if retries == 0:
|
||||||
|
raise
|
||||||
|
|
||||||
def update_object(self, container, name, metadata=None, **headers):
|
def update_object(self, container, name, metadata=None, **headers):
|
||||||
"""Update the metadata of an object
|
"""Update the metadata of an object
|
||||||
@@ -668,7 +699,8 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
|
|||||||
except exc.OpenStackCloudHTTPError:
|
except exc.OpenStackCloudHTTPError:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def delete_autocreated_image_objects(self, container=None):
|
def delete_autocreated_image_objects(self, container=None,
|
||||||
|
segment_prefix=None):
|
||||||
"""Delete all objects autocreated for image uploads.
|
"""Delete all objects autocreated for image uploads.
|
||||||
|
|
||||||
This method should generally not be needed, as shade should clean up
|
This method should generally not be needed, as shade should clean up
|
||||||
@@ -676,6 +708,11 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
|
|||||||
goes wrong and it is found that there are leaked objects, this method
|
goes wrong and it is found that there are leaked objects, this method
|
||||||
can be used to delete any objects that shade has created on the user's
|
can be used to delete any objects that shade has created on the user's
|
||||||
behalf in service of image uploads.
|
behalf in service of image uploads.
|
||||||
|
|
||||||
|
:param str container: Name of the container. Defaults to 'images'.
|
||||||
|
:param str segment_prefix: Prefix for the image segment names to
|
||||||
|
delete. If not given, all image upload segments present are
|
||||||
|
deleted.
|
||||||
"""
|
"""
|
||||||
if container is None:
|
if container is None:
|
||||||
container = self._OBJECT_AUTOCREATE_CONTAINER
|
container = self._OBJECT_AUTOCREATE_CONTAINER
|
||||||
@@ -684,7 +721,7 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
deleted = False
|
deleted = False
|
||||||
for obj in self.list_objects(container):
|
for obj in self.list_objects(container, prefix=segment_prefix):
|
||||||
meta = self.get_object_metadata(container, obj['name'])
|
meta = self.get_object_metadata(container, obj['name'])
|
||||||
if meta.get(
|
if meta.get(
|
||||||
self._OBJECT_AUTOCREATE_KEY, meta.get(
|
self._OBJECT_AUTOCREATE_KEY, meta.get(
|
||||||
|
@@ -841,6 +841,261 @@ class TestObjectUploads(BaseTestObject):
|
|||||||
},
|
},
|
||||||
], self.adapter.request_history[-1].json())
|
], self.adapter.request_history[-1].json())
|
||||||
|
|
||||||
|
def test_slo_manifest_retry(self):
|
||||||
|
"""
|
||||||
|
Uploading the SLO manifest file should be retried up to 3 times before
|
||||||
|
giving up. This test should succeed on the 3rd and final attempt.
|
||||||
|
"""
|
||||||
|
max_file_size = 25
|
||||||
|
min_file_size = 1
|
||||||
|
|
||||||
|
uris_to_mock = [
|
||||||
|
dict(method='GET', uri='https://object-store.example.com/info',
|
||||||
|
json=dict(
|
||||||
|
swift={'max_file_size': max_file_size},
|
||||||
|
slo={'min_segment_size': min_file_size})),
|
||||||
|
dict(method='HEAD',
|
||||||
|
uri='{endpoint}/{container}/{object}'.format(
|
||||||
|
endpoint=self.endpoint,
|
||||||
|
container=self.container, object=self.object),
|
||||||
|
status_code=404)
|
||||||
|
]
|
||||||
|
|
||||||
|
uris_to_mock.extend([
|
||||||
|
dict(method='PUT',
|
||||||
|
uri='{endpoint}/{container}/{object}/{index:0>6}'.format(
|
||||||
|
endpoint=self.endpoint,
|
||||||
|
container=self.container,
|
||||||
|
object=self.object,
|
||||||
|
index=index),
|
||||||
|
status_code=201,
|
||||||
|
headers=dict(Etag='etag{index}'.format(index=index)))
|
||||||
|
for index, offset in enumerate(
|
||||||
|
range(0, len(self.content), max_file_size))
|
||||||
|
])
|
||||||
|
|
||||||
|
# manifest file upload calls
|
||||||
|
uris_to_mock.extend([
|
||||||
|
dict(method='PUT',
|
||||||
|
uri='{endpoint}/{container}/{object}'.format(
|
||||||
|
endpoint=self.endpoint,
|
||||||
|
container=self.container, object=self.object),
|
||||||
|
status_code=400,
|
||||||
|
validate=dict(
|
||||||
|
params={
|
||||||
|
'multipart-manifest', 'put'
|
||||||
|
},
|
||||||
|
headers={
|
||||||
|
'x-object-meta-x-sdk-md5': self.md5,
|
||||||
|
'x-object-meta-x-sdk-sha256': self.sha256,
|
||||||
|
})),
|
||||||
|
dict(method='PUT',
|
||||||
|
uri='{endpoint}/{container}/{object}'.format(
|
||||||
|
endpoint=self.endpoint,
|
||||||
|
container=self.container, object=self.object),
|
||||||
|
status_code=400,
|
||||||
|
validate=dict(
|
||||||
|
params={
|
||||||
|
'multipart-manifest', 'put'
|
||||||
|
},
|
||||||
|
headers={
|
||||||
|
'x-object-meta-x-sdk-md5': self.md5,
|
||||||
|
'x-object-meta-x-sdk-sha256': self.sha256,
|
||||||
|
})),
|
||||||
|
dict(method='PUT',
|
||||||
|
uri='{endpoint}/{container}/{object}'.format(
|
||||||
|
endpoint=self.endpoint,
|
||||||
|
container=self.container, object=self.object),
|
||||||
|
status_code=201,
|
||||||
|
validate=dict(
|
||||||
|
params={
|
||||||
|
'multipart-manifest', 'put'
|
||||||
|
},
|
||||||
|
headers={
|
||||||
|
'x-object-meta-x-sdk-md5': self.md5,
|
||||||
|
'x-object-meta-x-sdk-sha256': self.sha256,
|
||||||
|
})),
|
||||||
|
])
|
||||||
|
|
||||||
|
self.register_uris(uris_to_mock)
|
||||||
|
|
||||||
|
self.cloud.create_object(
|
||||||
|
container=self.container, name=self.object,
|
||||||
|
filename=self.object_file.name, use_slo=True)
|
||||||
|
|
||||||
|
# After call 3, order become indeterminate because of thread pool
|
||||||
|
self.assert_calls(stop_after=3)
|
||||||
|
|
||||||
|
for key, value in self.calls[-1]['headers'].items():
|
||||||
|
self.assertEqual(
|
||||||
|
value, self.adapter.request_history[-1].headers[key],
|
||||||
|
'header mismatch in manifest call')
|
||||||
|
|
||||||
|
base_object = '/{container}/{object}'.format(
|
||||||
|
endpoint=self.endpoint,
|
||||||
|
container=self.container,
|
||||||
|
object=self.object)
|
||||||
|
|
||||||
|
self.assertEqual([
|
||||||
|
{
|
||||||
|
'path': "{base_object}/000000".format(
|
||||||
|
base_object=base_object),
|
||||||
|
'size_bytes': 25,
|
||||||
|
'etag': 'etag0',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'path': "{base_object}/000001".format(
|
||||||
|
base_object=base_object),
|
||||||
|
'size_bytes': 25,
|
||||||
|
'etag': 'etag1',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'path': "{base_object}/000002".format(
|
||||||
|
base_object=base_object),
|
||||||
|
'size_bytes': 25,
|
||||||
|
'etag': 'etag2',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'path': "{base_object}/000003".format(
|
||||||
|
base_object=base_object),
|
||||||
|
'size_bytes': len(self.object) - 75,
|
||||||
|
'etag': 'etag3',
|
||||||
|
},
|
||||||
|
], self.adapter.request_history[-1].json())
|
||||||
|
|
||||||
|
def test_slo_manifest_fail(self):
|
||||||
|
"""
|
||||||
|
Uploading the SLO manifest file should be retried up to 3 times before
|
||||||
|
giving up. This test fails all 3 attempts and should verify that we
|
||||||
|
delete uploaded segments that begin with the object prefix.
|
||||||
|
"""
|
||||||
|
max_file_size = 25
|
||||||
|
min_file_size = 1
|
||||||
|
|
||||||
|
uris_to_mock = [
|
||||||
|
dict(method='GET', uri='https://object-store.example.com/info',
|
||||||
|
json=dict(
|
||||||
|
swift={'max_file_size': max_file_size},
|
||||||
|
slo={'min_segment_size': min_file_size})),
|
||||||
|
dict(method='HEAD',
|
||||||
|
uri='{endpoint}/{container}/{object}'.format(
|
||||||
|
endpoint=self.endpoint,
|
||||||
|
container=self.container, object=self.object),
|
||||||
|
status_code=404)
|
||||||
|
]
|
||||||
|
|
||||||
|
uris_to_mock.extend([
|
||||||
|
dict(method='PUT',
|
||||||
|
uri='{endpoint}/{container}/{object}/{index:0>6}'.format(
|
||||||
|
endpoint=self.endpoint,
|
||||||
|
container=self.container,
|
||||||
|
object=self.object,
|
||||||
|
index=index),
|
||||||
|
status_code=201,
|
||||||
|
headers=dict(Etag='etag{index}'.format(index=index)))
|
||||||
|
for index, offset in enumerate(
|
||||||
|
range(0, len(self.content), max_file_size))
|
||||||
|
])
|
||||||
|
|
||||||
|
# manifest file upload calls
|
||||||
|
uris_to_mock.extend([
|
||||||
|
dict(method='PUT',
|
||||||
|
uri='{endpoint}/{container}/{object}'.format(
|
||||||
|
endpoint=self.endpoint,
|
||||||
|
container=self.container, object=self.object),
|
||||||
|
status_code=400,
|
||||||
|
validate=dict(
|
||||||
|
params={
|
||||||
|
'multipart-manifest', 'put'
|
||||||
|
},
|
||||||
|
headers={
|
||||||
|
'x-object-meta-x-sdk-md5': self.md5,
|
||||||
|
'x-object-meta-x-sdk-sha256': self.sha256,
|
||||||
|
})),
|
||||||
|
dict(method='PUT',
|
||||||
|
uri='{endpoint}/{container}/{object}'.format(
|
||||||
|
endpoint=self.endpoint,
|
||||||
|
container=self.container, object=self.object),
|
||||||
|
status_code=400,
|
||||||
|
validate=dict(
|
||||||
|
params={
|
||||||
|
'multipart-manifest', 'put'
|
||||||
|
},
|
||||||
|
headers={
|
||||||
|
'x-object-meta-x-sdk-md5': self.md5,
|
||||||
|
'x-object-meta-x-sdk-sha256': self.sha256,
|
||||||
|
})),
|
||||||
|
dict(method='PUT',
|
||||||
|
uri='{endpoint}/{container}/{object}'.format(
|
||||||
|
endpoint=self.endpoint,
|
||||||
|
container=self.container, object=self.object),
|
||||||
|
status_code=400,
|
||||||
|
validate=dict(
|
||||||
|
params={
|
||||||
|
'multipart-manifest', 'put'
|
||||||
|
},
|
||||||
|
headers={
|
||||||
|
'x-object-meta-x-sdk-md5': self.md5,
|
||||||
|
'x-object-meta-x-sdk-sha256': self.sha256,
|
||||||
|
})),
|
||||||
|
])
|
||||||
|
|
||||||
|
# Cleaning up image upload segments involves calling the
|
||||||
|
# delete_autocreated_image_objects() API method which will list
|
||||||
|
# objects (LIST), get the object metadata (HEAD), then delete the
|
||||||
|
# object (DELETE).
|
||||||
|
uris_to_mock.extend([
|
||||||
|
dict(method='GET',
|
||||||
|
uri='{endpoint}/images?format=json&prefix={prefix}'.format(
|
||||||
|
endpoint=self.endpoint,
|
||||||
|
prefix=self.object),
|
||||||
|
complete_qs=True,
|
||||||
|
json=[{
|
||||||
|
'content_type': 'application/octet-stream',
|
||||||
|
'bytes': 1437258240,
|
||||||
|
'hash': '249219347276c331b87bf1ac2152d9af',
|
||||||
|
'last_modified': '2015-02-16T17:50:05.289600',
|
||||||
|
'name': self.object
|
||||||
|
}]),
|
||||||
|
|
||||||
|
dict(method='HEAD',
|
||||||
|
uri='{endpoint}/images/{object}'.format(
|
||||||
|
endpoint=self.endpoint,
|
||||||
|
object=self.object),
|
||||||
|
headers={
|
||||||
|
'X-Timestamp': '1429036140.50253',
|
||||||
|
'X-Trans-Id': 'txbbb825960a3243b49a36f-005a0dadaedfw1',
|
||||||
|
'Content-Length': '1290170880',
|
||||||
|
'Last-Modified': 'Tue, 14 Apr 2015 18:29:01 GMT',
|
||||||
|
'x-object-meta-x-sdk-autocreated': 'true',
|
||||||
|
'X-Object-Meta-X-Shade-Sha256': 'does not matter',
|
||||||
|
'X-Object-Meta-X-Shade-Md5': 'does not matter',
|
||||||
|
'Date': 'Thu, 16 Nov 2017 15:24:30 GMT',
|
||||||
|
'Accept-Ranges': 'bytes',
|
||||||
|
'Content-Type': 'application/octet-stream',
|
||||||
|
'Etag': '249219347276c331b87bf1ac2152d9af',
|
||||||
|
}),
|
||||||
|
|
||||||
|
dict(method='DELETE',
|
||||||
|
uri='{endpoint}/images/{object}'.format(
|
||||||
|
endpoint=self.endpoint, object=self.object))
|
||||||
|
])
|
||||||
|
|
||||||
|
self.register_uris(uris_to_mock)
|
||||||
|
|
||||||
|
# image_api_use_tasks needs to be set to True in order for the API
|
||||||
|
# method delete_autocreated_image_objects() to do the cleanup.
|
||||||
|
self.cloud.image_api_use_tasks = True
|
||||||
|
|
||||||
|
self.assertRaises(
|
||||||
|
exc.OpenStackCloudException,
|
||||||
|
self.cloud.create_object,
|
||||||
|
container=self.container, name=self.object,
|
||||||
|
filename=self.object_file.name, use_slo=True)
|
||||||
|
|
||||||
|
# After call 3, order become indeterminate because of thread pool
|
||||||
|
self.assert_calls(stop_after=3)
|
||||||
|
|
||||||
def test_object_segment_retry_failure(self):
|
def test_object_segment_retry_failure(self):
|
||||||
|
|
||||||
max_file_size = 25
|
max_file_size = 25
|
||||||
|
Reference in New Issue
Block a user