Files
cinder/cinder/volume/rpcapi.py
Eric Harney d6895f9d2a Backup manager: Synchronously call remove_export
Call the volume's rpcapi remove_export method with
a "call" instead of a "cast" when detaching volumes
in the backup manager.

This prevents _detach_volume() from returning before
the remove_export() has completed, which helps prevent
problems where remove_export() races against a
subsequent operation.

Closes-Bug: #1920237

Change-Id: I482c3d5520a7bbb9971942cdb4c4208052106d70
2021-03-22 11:04:16 -04:00

509 lines
23 KiB
Python

# Copyright 2012, Intel, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinder.common import constants
from cinder import objects
from cinder import quota
from cinder import rpc
from cinder.volume import volume_utils
QUOTAS = quota.QUOTAS
class VolumeAPI(rpc.RPCAPI):
"""Client side of the volume rpc API.
API version history:
.. code-block:: none
1.0 - Initial version.
1.1 - Adds clone volume option to create_volume.
1.2 - Add publish_service_capabilities() method.
1.3 - Pass all image metadata (not just ID) in copy_volume_to_image.
1.4 - Add request_spec, filter_properties and
allow_reschedule arguments to create_volume().
1.5 - Add accept_transfer.
1.6 - Add extend_volume.
1.7 - Adds host_name parameter to attach_volume()
to allow attaching to host rather than instance.
1.8 - Add migrate_volume, rename_volume.
1.9 - Add new_user and new_project to accept_transfer.
1.10 - Add migrate_volume_completion, remove rename_volume.
1.11 - Adds mode parameter to attach_volume()
to support volume read-only attaching.
1.12 - Adds retype.
1.13 - Adds create_export.
1.14 - Adds reservation parameter to extend_volume().
1.15 - Adds manage_existing and unmanage_only flag to delete_volume.
1.16 - Removes create_export.
1.17 - Add replica option to create_volume, promote_replica and
sync_replica.
1.18 - Adds create_consistencygroup, delete_consistencygroup,
create_cgsnapshot, and delete_cgsnapshot. Also adds
the consistencygroup_id parameter in create_volume.
1.19 - Adds update_migrated_volume
1.20 - Adds support for sending objects over RPC in create_snapshot()
and delete_snapshot()
1.21 - Adds update_consistencygroup.
1.22 - Adds create_consistencygroup_from_src.
1.23 - Adds attachment_id to detach_volume.
1.24 - Removed duplicated parameters: snapshot_id, image_id,
source_volid, source_replicaid, consistencygroup_id and
cgsnapshot_id from create_volume. All off them are already
passed either in request_spec or available in the DB.
1.25 - Add source_cg to create_consistencygroup_from_src.
1.26 - Adds support for sending objects over RPC in
create_consistencygroup(), create_consistencygroup_from_src(),
update_consistencygroup() and delete_consistencygroup().
1.27 - Adds support for replication V2
1.28 - Adds manage_existing_snapshot
1.29 - Adds get_capabilities.
1.30 - Adds remove_export
1.31 - Updated: create_consistencygroup_from_src(), create_cgsnapshot()
and delete_cgsnapshot() to cast method only with necessary
args. Forwarding CGSnapshot object instead of CGSnapshot_id.
1.32 - Adds support for sending objects over RPC in create_volume().
1.33 - Adds support for sending objects over RPC in delete_volume().
1.34 - Adds support for sending objects over RPC in retype().
1.35 - Adds support for sending objects over RPC in extend_volume().
1.36 - Adds support for sending objects over RPC in migrate_volume(),
migrate_volume_completion(), and update_migrated_volume().
1.37 - Adds old_reservations parameter to retype to support quota
checks in the API.
1.38 - Scaling backup service, add get_backup_device() and
secure_file_operations_enabled()
1.39 - Update replication methods to reflect new backend rep strategy
1.40 - Add cascade option to delete_volume().
... Mitaka supports messaging version 1.40. Any changes to existing
methods in 1.x after that point should be done so that they can handle
the version_cap being set to 1.40.
2.0 - Remove 1.x compatibility
2.1 - Add get_manageable_volumes() and get_manageable_snapshots().
2.2 - Adds support for sending objects over RPC in manage_existing().
2.3 - Adds support for sending objects over RPC in
initialize_connection().
2.4 - Sends request_spec as object in create_volume().
2.5 - Adds create_group, delete_group, and update_group
2.6 - Adds create_group_snapshot, delete_group_snapshot, and
create_group_from_src().
... Newton supports messaging version 2.6. Any changes to existing
methods in 2.x after that point should be done so that they can handle
the version_cap being set to 2.6.
3.0 - Drop 2.x compatibility
3.1 - Remove promote_replica and reenable_replication. This is
non-backward compatible, but the user-facing API was removed
back in Mitaka when introducing cheesecake replication.
3.2 - Adds support for sending objects over RPC in
get_backup_device().
3.3 - Adds support for sending objects over RPC in attach_volume().
3.4 - Adds support for sending objects over RPC in detach_volume().
3.5 - Adds support for cluster in retype and migrate_volume
3.6 - Switch to use oslo.messaging topics to indicate backends instead
of @backend suffixes in server names.
3.7 - Adds do_cleanup method to do volume cleanups from other nodes
that we were doing in init_host.
3.8 - Make failover_host cluster aware and add failover_completed.
3.9 - Adds new attach/detach methods
3.10 - Returning objects instead of raw dictionaries in
get_manageable_volumes & get_manageable_snapshots
3.11 - Removes create_consistencygroup, delete_consistencygroup,
create_cgsnapshot, delete_cgsnapshot, update_consistencygroup,
and create_consistencygroup_from_src.
3.12 - Adds set_log_levels and get_log_levels
3.13 - Add initialize_connection_snapshot,
terminate_connection_snapshot, and remove_export_snapshot.
3.14 - Adds enable_replication, disable_replication,
failover_replication, and list_replication_targets.
3.15 - Add revert_to_snapshot method
3.16 - Add no_snapshots to accept_transfer method
"""
RPC_API_VERSION = '3.16'
RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.VOLUME_TOPIC
BINARY = constants.VOLUME_BINARY
def _get_cctxt(self, host=None, version=None, **kwargs):
if host:
server = volume_utils.extract_host(host)
# TODO(dulek): If we're pinned before 3.6, we should send stuff the
# old way - addressing server=host@backend, topic=cinder-volume.
# Otherwise we're addressing server=host,
# topic=cinder-volume.host@backend. This conditional can go away
# when we stop supporting 3.x.
if self.client.can_send_version('3.6'):
kwargs['topic'] = '%(topic)s.%(host)s' % {'topic': self.TOPIC,
'host': server}
server = volume_utils.extract_host(server, 'host')
kwargs['server'] = server
return super(VolumeAPI, self)._get_cctxt(version=version, **kwargs)
def create_volume(self, ctxt, volume, request_spec, filter_properties,
allow_reschedule=True):
cctxt = self._get_cctxt(volume.service_topic_queue)
cctxt.cast(ctxt, 'create_volume',
request_spec=request_spec,
filter_properties=filter_properties,
allow_reschedule=allow_reschedule,
volume=volume)
@rpc.assert_min_rpc_version('3.15')
def revert_to_snapshot(self, ctxt, volume, snapshot):
version = self._compat_ver('3.15')
cctxt = self._get_cctxt(volume.service_topic_queue, version)
cctxt.cast(ctxt, 'revert_to_snapshot', volume=volume,
snapshot=snapshot)
def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False):
volume.create_worker()
cctxt = self._get_cctxt(volume.service_topic_queue)
msg_args = {
'volume': volume, 'unmanage_only': unmanage_only,
'cascade': cascade,
}
cctxt.cast(ctxt, 'delete_volume', **msg_args)
def create_snapshot(self, ctxt, volume, snapshot):
snapshot.create_worker()
cctxt = self._get_cctxt(volume.service_topic_queue)
cctxt.cast(ctxt, 'create_snapshot', snapshot=snapshot)
def delete_snapshot(self, ctxt, snapshot, unmanage_only=False):
cctxt = self._get_cctxt(snapshot.service_topic_queue)
cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot,
unmanage_only=unmanage_only)
def attach_volume(self, ctxt, volume, instance_uuid, host_name,
mountpoint, mode):
msg_args = {'volume_id': volume.id,
'instance_uuid': instance_uuid,
'host_name': host_name,
'mountpoint': mountpoint,
'mode': mode,
'volume': volume}
cctxt = self._get_cctxt(volume.service_topic_queue, ('3.3', '3.0'))
if not cctxt.can_send_version('3.3'):
msg_args.pop('volume')
return cctxt.call(ctxt, 'attach_volume', **msg_args)
def detach_volume(self, ctxt, volume, attachment_id):
msg_args = {'volume_id': volume.id,
'attachment_id': attachment_id,
'volume': volume}
cctxt = self._get_cctxt(volume.service_topic_queue, ('3.4', '3.0'))
if not self.client.can_send_version('3.4'):
msg_args.pop('volume')
return cctxt.call(ctxt, 'detach_volume', **msg_args)
def copy_volume_to_image(self, ctxt, volume, image_meta):
cctxt = self._get_cctxt(volume.service_topic_queue)
cctxt.cast(ctxt, 'copy_volume_to_image', volume_id=volume['id'],
image_meta=image_meta)
def initialize_connection(self, ctxt, volume, connector):
cctxt = self._get_cctxt(volume.service_topic_queue)
return cctxt.call(ctxt, 'initialize_connection', connector=connector,
volume=volume)
def terminate_connection(self, ctxt, volume, connector, force=False):
cctxt = self._get_cctxt(volume.service_topic_queue)
return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'],
connector=connector, force=force)
def remove_export(self, ctxt, volume, sync=False):
cctxt = self._get_cctxt(volume.service_topic_queue)
if sync:
cctxt.call(ctxt, 'remove_export', volume_id=volume.id)
else:
cctxt.cast(ctxt, 'remove_export', volume_id=volume.id)
def publish_service_capabilities(self, ctxt):
cctxt = self._get_cctxt(fanout=True)
cctxt.cast(ctxt, 'publish_service_capabilities')
def accept_transfer(self, ctxt, volume, new_user, new_project,
no_snapshots=False):
msg_args = {'volume_id': volume['id'],
'new_user': new_user,
'new_project': new_project,
'no_snapshots': no_snapshots
}
cctxt = self._get_cctxt(volume.service_topic_queue, ('3.16', '3.0'))
if not self.client.can_send_version('3.16'):
msg_args.pop('no_snapshots')
return cctxt.call(ctxt, 'accept_transfer', **msg_args)
def extend_volume(self, ctxt, volume, new_size, reservations):
cctxt = self._get_cctxt(volume.service_topic_queue)
cctxt.cast(ctxt, 'extend_volume', volume=volume, new_size=new_size,
reservations=reservations)
def migrate_volume(self, ctxt, volume, dest_backend, force_host_copy):
backend_p = {'host': dest_backend.host,
'cluster_name': dest_backend.cluster_name,
'capabilities': dest_backend.capabilities}
version = '3.5'
if not self.client.can_send_version(version):
version = '3.0'
del backend_p['cluster_name']
cctxt = self._get_cctxt(volume.service_topic_queue, version)
cctxt.cast(ctxt, 'migrate_volume', volume=volume, host=backend_p,
force_host_copy=force_host_copy)
def migrate_volume_completion(self, ctxt, volume, new_volume, error):
cctxt = self._get_cctxt(volume.service_topic_queue)
return cctxt.call(ctxt, 'migrate_volume_completion', volume=volume,
new_volume=new_volume, error=error,)
def retype(self, ctxt, volume, new_type_id, dest_backend,
migration_policy='never', reservations=None,
old_reservations=None):
backend_p = {'host': dest_backend.host,
'cluster_name': dest_backend.cluster_name,
'capabilities': dest_backend.capabilities}
version = '3.5'
if not self.client.can_send_version(version):
version = '3.0'
del backend_p['cluster_name']
cctxt = self._get_cctxt(volume.service_topic_queue, version)
cctxt.cast(ctxt, 'retype', volume=volume, new_type_id=new_type_id,
host=backend_p, migration_policy=migration_policy,
reservations=reservations,
old_reservations=old_reservations)
def manage_existing(self, ctxt, volume, ref):
cctxt = self._get_cctxt(volume.service_topic_queue)
cctxt.cast(ctxt, 'manage_existing', ref=ref, volume=volume)
def update_migrated_volume(self, ctxt, volume, new_volume,
original_volume_status):
cctxt = self._get_cctxt(new_volume.service_topic_queue)
cctxt.call(ctxt, 'update_migrated_volume',
volume=volume,
new_volume=new_volume,
volume_status=original_volume_status)
def freeze_host(self, ctxt, service):
"""Set backend host to frozen."""
cctxt = self._get_cctxt(service.service_topic_queue)
return cctxt.call(ctxt, 'freeze_host')
def thaw_host(self, ctxt, service):
"""Clear the frozen setting on a backend host."""
cctxt = self._get_cctxt(service.service_topic_queue)
return cctxt.call(ctxt, 'thaw_host')
def failover(self, ctxt, service, secondary_backend_id=None):
"""Failover host to the specified backend_id (secondary). """
version = '3.8'
method = 'failover'
if not self.client.can_send_version(version):
version = '3.0'
method = 'failover_host'
cctxt = self._get_cctxt(service.service_topic_queue, version)
cctxt.cast(ctxt, method, secondary_backend_id=secondary_backend_id)
def failover_completed(self, ctxt, service, updates):
"""Complete failover on all services of the cluster."""
cctxt = self._get_cctxt(service.service_topic_queue, '3.8',
fanout=True)
cctxt.cast(ctxt, 'failover_completed', updates=updates)
def manage_existing_snapshot(self, ctxt, snapshot, ref, backend):
cctxt = self._get_cctxt(backend)
cctxt.cast(ctxt, 'manage_existing_snapshot',
snapshot=snapshot,
ref=ref)
def get_capabilities(self, ctxt, backend_id, discover):
cctxt = self._get_cctxt(backend_id)
return cctxt.call(ctxt, 'get_capabilities', discover=discover)
def get_backup_device(self, ctxt, backup, volume):
cctxt = self._get_cctxt(volume.service_topic_queue, ('3.2', '3.0'))
if cctxt.can_send_version('3.2'):
backup_obj = cctxt.call(ctxt, 'get_backup_device', backup=backup,
want_objects=True)
else:
backup_dict = cctxt.call(ctxt, 'get_backup_device', backup=backup)
backup_obj = objects.BackupDeviceInfo.from_primitive(backup_dict,
ctxt)
return backup_obj
def secure_file_operations_enabled(self, ctxt, volume):
cctxt = self._get_cctxt(volume.service_topic_queue)
return cctxt.call(ctxt, 'secure_file_operations_enabled',
volume=volume)
def get_manageable_volumes(self, ctxt, service, marker, limit, offset,
sort_keys, sort_dirs):
version = ('3.10', '3.0')
cctxt = self._get_cctxt(service.service_topic_queue, version=version)
msg_args = {'marker': marker,
'limit': limit,
'offset': offset,
'sort_keys': sort_keys,
'sort_dirs': sort_dirs,
}
if cctxt.can_send_version('3.10'):
msg_args['want_objects'] = True
return cctxt.call(ctxt, 'get_manageable_volumes', **msg_args)
def get_manageable_snapshots(self, ctxt, service, marker, limit, offset,
sort_keys, sort_dirs):
version = ('3.10', '3.0')
cctxt = self._get_cctxt(service.service_topic_queue, version=version)
msg_args = {'marker': marker,
'limit': limit,
'offset': offset,
'sort_keys': sort_keys,
'sort_dirs': sort_dirs,
}
if cctxt.can_send_version('3.10'):
msg_args['want_objects'] = True
return cctxt.call(ctxt, 'get_manageable_snapshots', **msg_args)
def create_group(self, ctxt, group):
cctxt = self._get_cctxt(group.service_topic_queue)
cctxt.cast(ctxt, 'create_group', group=group)
def delete_group(self, ctxt, group):
cctxt = self._get_cctxt(group.service_topic_queue)
cctxt.cast(ctxt, 'delete_group', group=group)
def update_group(self, ctxt, group, add_volumes=None, remove_volumes=None):
cctxt = self._get_cctxt(group.service_topic_queue)
cctxt.cast(ctxt, 'update_group', group=group, add_volumes=add_volumes,
remove_volumes=remove_volumes)
def create_group_from_src(self, ctxt, group, group_snapshot=None,
source_group=None):
cctxt = self._get_cctxt(group.service_topic_queue)
cctxt.cast(ctxt, 'create_group_from_src', group=group,
group_snapshot=group_snapshot, source_group=source_group)
def create_group_snapshot(self, ctxt, group_snapshot):
cctxt = self._get_cctxt(group_snapshot.service_topic_queue)
cctxt.cast(ctxt, 'create_group_snapshot',
group_snapshot=group_snapshot)
def delete_group_snapshot(self, ctxt, group_snapshot):
cctxt = self._get_cctxt(group_snapshot.service_topic_queue)
cctxt.cast(ctxt, 'delete_group_snapshot',
group_snapshot=group_snapshot)
@rpc.assert_min_rpc_version('3.13')
def initialize_connection_snapshot(self, ctxt, snapshot, connector):
cctxt = self._get_cctxt(snapshot.service_topic_queue, version='3.13')
return cctxt.call(ctxt, 'initialize_connection_snapshot',
snapshot_id=snapshot.id,
connector=connector)
@rpc.assert_min_rpc_version('3.13')
def terminate_connection_snapshot(self, ctxt, snapshot, connector,
force=False):
cctxt = self._get_cctxt(snapshot.service_topic_queue, version='3.13')
return cctxt.call(ctxt, 'terminate_connection_snapshot',
snapshot_id=snapshot.id,
connector=connector, force=force)
@rpc.assert_min_rpc_version('3.13')
def remove_export_snapshot(self, ctxt, snapshot):
cctxt = self._get_cctxt(snapshot.service_topic_queue, version='3.13')
cctxt.cast(ctxt, 'remove_export_snapshot', snapshot_id=snapshot.id)
@rpc.assert_min_rpc_version('3.9')
def attachment_update(self, ctxt, vref, connector, attachment_id):
version = self._compat_ver('3.9')
cctxt = self._get_cctxt(vref.service_topic_queue, version=version)
return cctxt.call(ctxt,
'attachment_update',
vref=vref,
connector=connector,
attachment_id=attachment_id)
@rpc.assert_min_rpc_version('3.9')
def attachment_delete(self, ctxt, attachment_id, vref):
version = self._compat_ver('3.9')
cctxt = self._get_cctxt(vref.service_topic_queue, version=version)
return cctxt.call(ctxt,
'attachment_delete',
attachment_id=attachment_id,
vref=vref)
@rpc.assert_min_rpc_version('3.7')
def do_cleanup(self, ctxt, cleanup_request):
"""Perform this service/cluster resource cleanup as requested."""
destination = cleanup_request.service_topic_queue
cctxt = self._get_cctxt(destination, '3.7')
# NOTE(geguileo): This call goes to do_cleanup code in
# cinder.manager.CleanableManager unless in the future we overwrite it
# in cinder.volume.manager
cctxt.cast(ctxt, 'do_cleanup', cleanup_request=cleanup_request)
@rpc.assert_min_rpc_version('3.12')
def set_log_levels(self, context, service, log_request):
cctxt = self._get_cctxt(host=service.host, version='3.12')
cctxt.cast(context, 'set_log_levels', log_request=log_request)
@rpc.assert_min_rpc_version('3.12')
def get_log_levels(self, context, service, log_request):
cctxt = self._get_cctxt(host=service.host, version='3.12')
return cctxt.call(context, 'get_log_levels', log_request=log_request)
@rpc.assert_min_rpc_version('3.14')
def enable_replication(self, ctxt, group):
cctxt = self._get_cctxt(group.service_topic_queue, version='3.14')
cctxt.cast(ctxt, 'enable_replication',
group=group)
@rpc.assert_min_rpc_version('3.14')
def disable_replication(self, ctxt, group):
cctxt = self._get_cctxt(group.service_topic_queue, version='3.14')
cctxt.cast(ctxt, 'disable_replication',
group=group)
@rpc.assert_min_rpc_version('3.14')
def failover_replication(self, ctxt, group, allow_attached_volume=False,
secondary_backend_id=None):
cctxt = self._get_cctxt(group.service_topic_queue, version='3.14')
cctxt.cast(ctxt, 'failover_replication',
group=group, allow_attached_volume=allow_attached_volume,
secondary_backend_id=secondary_backend_id)
@rpc.assert_min_rpc_version('3.14')
def list_replication_targets(self, ctxt, group):
cctxt = self._get_cctxt(group.service_topic_queue, version='3.14')
return cctxt.call(ctxt, 'list_replication_targets',
group=group)