diff --git a/os_brick/exception.py b/os_brick/exception.py index fcf708748..2e4d26280 100644 --- a/os_brick/exception.py +++ b/os_brick/exception.py @@ -132,3 +132,12 @@ class VolumeGroupCreationFailed(BrickException): class CommandExecutionFailed(BrickException): message = _("Failed to execute command %(cmd)s") + + +class VolumeDriverException(BrickException): + message = _('An error occurred while IO to volume %(name)s.') + + +class InvalidIOHandleObject(BrickException): + message = _('IO handle of %(protocol)s has wrong object ' + 'type %(actual_type)s.') diff --git a/os_brick/initiator/connector.py b/os_brick/initiator/connector.py index 26ec27510..16e8fb3be 100644 --- a/os_brick/initiator/connector.py +++ b/os_brick/initiator/connector.py @@ -53,6 +53,7 @@ from os_brick.initiator import host_driver from os_brick.initiator import linuxfc from os_brick.initiator import linuxrbd from os_brick.initiator import linuxscsi +from os_brick.initiator import linuxsheepdog from os_brick.remotefs import remotefs from os_brick.i18n import _, _LE, _LI, _LW @@ -80,6 +81,7 @@ SCALITY = "SCALITY" QUOBYTE = "QUOBYTE" DISCO = "DISCO" VZSTORAGE = "VZSTORAGE" +SHEEPDOG = "SHEEPDOG" def _check_multipathd_running(root_helper, enforce_multipath): @@ -265,6 +267,12 @@ class InitiatorConnector(executor.Executor): device_scan_attempts=device_scan_attempts, *args, **kwargs ) + elif protocol == SHEEPDOG: + return SheepdogConnector(root_helper=root_helper, + driver=driver, + execute=execute, + device_scan_attempts=device_scan_attempts, + *args, **kwargs) else: msg = (_("Invalid InitiatorConnector protocol " "specified %(protocol)s") % @@ -466,6 +474,13 @@ class InitiatorConnector(executor.Executor): else: return [] + def check_IO_handle_valid(self, handle, data_type, protocol): + """Check IO handle has correct data type.""" + if (handle and not isinstance(handle, data_type)): + raise exception.InvalidIOHandleObject( + protocol=protocol, + actual_type=type(handle)) + class FakeConnector(InitiatorConnector): @@ -2989,3 +3004,99 @@ class DISCOConnector(InitiatorConnector): def extend_volume(self, connection_properties): raise NotImplementedError + + +class SheepdogConnector(InitiatorConnector): + """"Connector class to attach/detach sheepdog volumes.""" + + def __init__(self, root_helper, driver=None, + execute=putils.execute, use_multipath=False, + device_scan_attempts=DEVICE_SCAN_ATTEMPTS_DEFAULT, + *args, **kwargs): + + super(SheepdogConnector, self).__init__(root_helper, driver=driver, + execute=execute, + device_scan_attempts= + device_scan_attempts, + *args, **kwargs) + + def get_volume_paths(self, connection_properties): + # TODO(lixiaoy1): don't know where the connector + # looks for sheepdog volumes. + return [] + + def get_search_path(self): + # TODO(lixiaoy1): don't know where the connector + # looks for sheepdog volumes. + return None + + def get_all_available_volumes(self, connection_properties=None): + # TODO(lixiaoy1): not sure what to return here for sheepdog + return [] + + def _get_sheepdog_handle(self, connection_properties): + try: + host = connection_properties['hosts'][0] + name = connection_properties['name'] + port = connection_properties['ports'][0] + except IndexError: + msg = _("Connect volume failed, malformed connection properties") + raise exception.BrickException(msg=msg) + + sheepdog_handle = linuxsheepdog.SheepdogVolumeIOWrapper( + host, port, name) + return sheepdog_handle + + def connect_volume(self, connection_properties): + """Connect to a volume. + + :param connection_properties: The dictionary that describes all + of the target volume attributes. + :type connection_properties: dict + :returns: dict + """ + + sheepdog_handle = self._get_sheepdog_handle(connection_properties) + return {'path': sheepdog_handle} + + def disconnect_volume(self, connection_properties, device_info): + """Disconnect a volume. + + :param connection_properties: The dictionary that describes all + of the target volume attributes. + :type connection_properties: dict + :param device_info: historical difference, but same as connection_props + :type device_info: dict + """ + if device_info: + sheepdog_handle = device_info.get('path', None) + self.check_IO_handle_valid(sheepdog_handle, + linuxsheepdog.SheepdogVolumeIOWrapper, + 'Sheepdog') + if sheepdog_handle is not None: + sheepdog_handle.close() + + def check_valid_device(self, path, run_as_root=True): + """Verify an existing sheepdog handle is connected and valid.""" + sheepdog_handle = path + + if sheepdog_handle is None: + return False + + original_offset = sheepdog_handle.tell() + + try: + sheepdog_handle.read(4096) + except Exception as e: + LOG.error(_LE("Failed to access sheepdog device " + "handle: %(error)s"), + {"error": e}) + return False + finally: + sheepdog_handle.seek(original_offset, 0) + + return True + + def extend_volume(self, connection_properties): + # TODO(lixiaoy1): is this possible? + raise NotImplementedError diff --git a/os_brick/initiator/linuxsheepdog.py b/os_brick/initiator/linuxsheepdog.py new file mode 100644 index 000000000..b874c8b32 --- /dev/null +++ b/os_brick/initiator/linuxsheepdog.py @@ -0,0 +1,118 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Generic SheepDog Connection Utilities. + +""" +import eventlet +import io +from oslo_concurrency import processutils +from oslo_log import log as logging + +from os_brick import exception +from os_brick.i18n import _ + + +LOG = logging.getLogger(__name__) + + +class SheepdogVolumeIOWrapper(io.RawIOBase): + """File-like object with Sheepdog backend.""" + + def __init__(self, addr, port, volume, snapshot_name=None): + self._addr = addr + self._port = port + self._vdiname = volume + self._snapshot_name = snapshot_name + self._offset = 0 + # SheepdogVolumeIOWrapper instance becomes invalid + # if a write error occurs. + self._valid = True + + def _execute(self, cmd, data=None): + try: + # NOTE(yamada-h): processutils.execute causes busy waiting + # under eventlet. + # To avoid wasting CPU resources, it should not be used for + # the command which takes long time to execute. + # For workaround, we replace a subprocess module with + # the original one while only executing a read/write command. + _processutils_subprocess = processutils.subprocess + processutils.subprocess = eventlet.patcher.original('subprocess') + return processutils.execute(*cmd, process_input=data)[0] + except (processutils.ProcessExecutionError, OSError): + self._valid = False + raise exception.VolumeDriverException(name=self._vdiname) + finally: + processutils.subprocess = _processutils_subprocess + + def read(self, length=None): + if not self._valid: + raise exception.VolumeDriverException(name=self._vdiname) + + cmd = ['dog', 'vdi', 'read', '-a', self._addr, '-p', self._port] + if self._snapshot_name: + cmd.extend(('-s', self._snapshot_name)) + cmd.extend((self._vdiname, self._offset)) + if length: + cmd.append(length) + data = self._execute(cmd) + self._offset += len(data) + return data + + def write(self, data): + if not self._valid: + raise exception.VolumeDriverException(name=self._vdiname) + + length = len(data) + cmd = ('dog', 'vdi', 'write', '-a', self._addr, '-p', self._port, + self._vdiname, self._offset, length) + self._execute(cmd, data) + self._offset += length + return length + + def seek(self, offset, whence=0): + if not self._valid: + raise exception.VolumeDriverException(name=self._vdiname) + + if whence == 0: + # SEEK_SET or 0 - start of the stream (the default); + # offset should be zero or positive + new_offset = offset + elif whence == 1: + # SEEK_CUR or 1 - current stream position; offset may be negative + new_offset = self._offset + offset + else: + # SEEK_END or 2 - end of the stream; offset is usually negative + # TODO(yamada-h): Support SEEK_END + raise IOError(_("Invalid argument - whence=%s not supported.") % + whence) + + if new_offset < 0: + raise IOError(_("Invalid argument - negative seek offset.")) + + self._offset = new_offset + + def tell(self): + return self._offset + + def flush(self): + pass + + def fileno(self): + """Sheepdog does not have support for fileno so we raise IOError. + + Raising IOError is recommended way to notify caller that interface is + not supported - see http://docs.python.org/2/library/io.html#io.IOBase + """ + raise IOError(_("fileno is not supported by SheepdogVolumeIOWrapper")) diff --git a/os_brick/tests/initiator/test_connector.py b/os_brick/tests/initiator/test_connector.py index 381ec319d..cbaba652d 100644 --- a/os_brick/tests/initiator/test_connector.py +++ b/os_brick/tests/initiator/test_connector.py @@ -35,6 +35,7 @@ from os_brick.initiator import host_driver from os_brick.initiator import linuxfc from os_brick.initiator import linuxrbd from os_brick.initiator import linuxscsi +from os_brick.initiator import linuxsheepdog from os_brick.remotefs import remotefs from os_brick.tests import base @@ -2600,3 +2601,63 @@ class DISCOConnectorTestCase(ConnectorTestCase): self.assertRaises(NotImplementedError, self.connector.extend_volume, self.fake_connection_properties) + + +class SheepdogConnectorTestCase(ConnectorTestCase): + + def setUp(self): + super(SheepdogConnectorTestCase, self).setUp() + + self.hosts = ['fake_hosts'] + self.ports = ['fake_ports'] + self.volume = 'fake_volume' + + self.connection_properties = { + 'hosts': self.hosts, + 'name': self.volume, + 'ports': self.ports, + } + + def test_get_search_path(self): + sheepdog = connector.SheepdogConnector(None) + path = sheepdog.get_search_path() + self.assertIsNone(path) + + def test_get_volume_paths(self): + sheepdog = connector.SheepdogConnector(None) + expected = [] + actual = sheepdog.get_volume_paths(self.connection_properties) + self.assertEqual(expected, actual) + + def test_connect_volume(self): + """Test the connect volume case.""" + sheepdog = connector.SheepdogConnector(None) + device_info = sheepdog.connect_volume(self.connection_properties) + + # Ensure expected object is returned correctly + self.assertTrue(isinstance(device_info['path'], + linuxsheepdog.SheepdogVolumeIOWrapper)) + + @mock.patch.object(linuxsheepdog.SheepdogVolumeIOWrapper, 'close') + def test_disconnect_volume(self, volume_close): + """Test the disconnect volume case.""" + sheepdog = connector.SheepdogConnector(None) + device_info = sheepdog.connect_volume(self.connection_properties) + sheepdog.disconnect_volume(self.connection_properties, device_info) + + self.assertEqual(1, volume_close.call_count) + + def test_disconnect_volume_with_invalid_handle(self): + """Test the disconnect volume case with invalid handle.""" + sheepdog = connector.SheepdogConnector(None) + device_info = {'path': 'fake_handle'} + self.assertRaises(exception.InvalidIOHandleObject, + sheepdog.disconnect_volume, + self.connection_properties, + device_info) + + def test_extend_volume(self): + sheepdog = connector.SheepdogConnector(None) + self.assertRaises(NotImplementedError, + sheepdog.extend_volume, + self.connection_properties) diff --git a/os_brick/tests/initiator/test_linuxsheepdog.py b/os_brick/tests/initiator/test_linuxsheepdog.py new file mode 100644 index 000000000..64fcdd2a0 --- /dev/null +++ b/os_brick/tests/initiator/test_linuxsheepdog.py @@ -0,0 +1,121 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import mock +from os_brick import exception +from os_brick.initiator import linuxsheepdog +from os_brick.tests import base +from oslo_concurrency import processutils + +SHEEP_ADDR = '127.0.0.1' +SHEEP_PORT = 7000 + + +class SheepdogVolumeIOWrapperTestCase(base.TestCase): + def setUp(self): + super(SheepdogVolumeIOWrapperTestCase, self).setUp() + self.volume = 'volume-2f9b2ff5-987b-4412-a91c-23caaf0d5aff' + self.snapshot_name = 'snapshot-bf452d80-068a-43d7-ba9f-196cf47bd0be' + + self.vdi_wrapper = linuxsheepdog.SheepdogVolumeIOWrapper( + SHEEP_ADDR, SHEEP_PORT, self.volume) + self.snapshot_wrapper = linuxsheepdog.SheepdogVolumeIOWrapper( + SHEEP_ADDR, SHEEP_PORT, self.volume, self.snapshot_name) + + self.execute = mock.MagicMock() + self.mock_object(processutils, 'execute', self.execute) + + def test_init(self): + self.assertEqual(self.volume, self.vdi_wrapper._vdiname) + self.assertIsNone(self.vdi_wrapper._snapshot_name) + self.assertEqual(0, self.vdi_wrapper._offset) + + self.assertEqual(self.snapshot_name, + self.snapshot_wrapper._snapshot_name) + + def test_execute(self): + cmd = ('cmd1', 'arg1') + data = 'data1' + + self.vdi_wrapper._execute(cmd, data) + + self.execute.assert_called_once_with(*cmd, process_input=data) + + def test_execute_error(self): + cmd = ('cmd1', 'arg1') + data = 'data1' + self.mock_object(processutils, 'execute', + mock.MagicMock(side_effect=OSError)) + + args = (cmd, data) + self.assertRaises(exception.VolumeDriverException, + self.vdi_wrapper._execute, + *args) + + def test_read_vdi(self): + self.vdi_wrapper.read() + self.execute.assert_called_once_with( + 'dog', 'vdi', 'read', '-a', SHEEP_ADDR, '-p', SHEEP_PORT, + self.volume, 0, process_input=None) + + def test_read_vdi_invalid(self): + self.vdi_wrapper._valid = False + self.assertRaises(exception.VolumeDriverException, + self.vdi_wrapper.read) + + def test_write_vdi(self): + data = 'data1' + + self.vdi_wrapper.write(data) + + self.execute.assert_called_once_with( + 'dog', 'vdi', 'write', '-a', SHEEP_ADDR, '-p', SHEEP_PORT, + self.volume, 0, len(data), + process_input=data) + self.assertEqual(len(data), self.vdi_wrapper.tell()) + + def test_write_vdi_invalid(self): + self.vdi_wrapper._valid = False + self.assertRaises(exception.VolumeDriverException, + self.vdi_wrapper.write, 'dummy_data') + + def test_read_snapshot(self): + self.snapshot_wrapper.read() + self.execute.assert_called_once_with( + 'dog', 'vdi', 'read', '-a', SHEEP_ADDR, '-p', SHEEP_PORT, + '-s', self.snapshot_name, self.volume, 0, + process_input=None) + + def test_seek(self): + self.vdi_wrapper.seek(12345) + self.assertEqual(12345, self.vdi_wrapper.tell()) + + self.vdi_wrapper.seek(-2345, whence=1) + self.assertEqual(10000, self.vdi_wrapper.tell()) + + # This results in negative offset. + self.assertRaises(IOError, self.vdi_wrapper.seek, -20000, whence=1) + + def test_seek_invalid(self): + seek_num = 12345 + self.vdi_wrapper._valid = False + self.assertRaises(exception.VolumeDriverException, + self.vdi_wrapper.seek, seek_num) + + def test_flush(self): + # flush does nothing. + self.vdi_wrapper.flush() + self.assertFalse(self.execute.called) + + def test_fileno(self): + self.assertRaises(IOError, self.vdi_wrapper.fileno)