Merge "xenapi: Moving Glance fetch code into image/glance:download_vhd"

This commit is contained in:
Jenkins
2013-08-05 11:34:17 +00:00
committed by Gerrit Code Review
6 changed files with 195 additions and 205 deletions

View File

@@ -1327,3 +1327,7 @@ class InstanceGroupMemberNotFound(NotFound):
class InstanceGroupPolicyNotFound(NotFound):
msg_fmt = _("Instance group %(group_uuid)s has no policy %(policy)s.")
class PluginRetriesExceeded(NovaException):
msg_fmt = _("Number of retries to plugin (%(num_retries)d) exceeded.")

View File

@@ -19,55 +19,80 @@
import mox
from nova import context
from nova import test
from nova.tests.virt.xenapi import stubs
from nova.virt.xenapi import driver as xenapi_conn
from nova.virt.xenapi import fake
from nova.virt.xenapi.image import glance
from nova.virt.xenapi import vm_utils
class TestGlanceStore(test.TestCase):
class TestGlanceStore(stubs.XenAPITestBase):
def setUp(self):
super(TestGlanceStore, self).setUp()
self.store = glance.GlanceStore()
self.mox = mox.Mox()
def test_upload_image(self):
glance_host = '0.1.2.3'
glance_port = 8143
glance_use_ssl = False
sr_path = '/fake/sr/path'
self.flags(glance_host=glance_host)
self.flags(glance_port=glance_port)
self.flags(glance_api_insecure=glance_use_ssl)
self.flags(glance_host='1.1.1.1',
glance_port=123,
glance_api_insecure=False,
xenapi_connection_url='test_url',
xenapi_connection_password='test_pass')
def fake_get_sr_path(*_args, **_kwargs):
return sr_path
self.context = context.RequestContext(
'user', 'project', auth_token='foobar')
self.stubs.Set(vm_utils, 'get_sr_path', fake_get_sr_path)
fake.reset()
stubs.stubout_session(self.stubs, fake.SessionBase)
driver = xenapi_conn.XenAPIDriver(False)
self.session = driver._session
ctx = context.RequestContext('user', 'project', auth_token='foobar')
properties = {
'auto_disk_config': True,
'os_type': 'default',
'xenapi_use_agent': 'true',
}
image_id = 'fake_image_uuid'
vdi_uuids = ['fake_vdi_uuid']
instance = {'uuid': 'blah',
'system_metadata': {'image_xenapi_use_agent': 'true'}}
instance.update(properties)
self.stubs.Set(
vm_utils, 'get_sr_path', lambda *a, **kw: '/fake/sr/path')
params = {'vdi_uuids': vdi_uuids,
'image_id': image_id,
'glance_host': glance_host,
'glance_port': glance_port,
'glance_use_ssl': glance_use_ssl,
'sr_path': sr_path,
self.instance = {'uuid': 'blah',
'system_metadata': {'image_xenapi_use_agent': 'true'},
'auto_disk_config': True,
'os_type': 'default',
'xenapi_use_agent': 'true'}
def test_download_image(self):
params = {'image_id': 'fake_image_uuid',
'glance_host': '1.1.1.1',
'glance_port': 123,
'glance_use_ssl': False,
'sr_path': '/fake/sr/path',
'auth_token': 'foobar',
'properties': properties}
session = self.mox.CreateMockAnything()
session.call_plugin_serialized('glance', 'upload_vhd', **params)
'uuid_stack': ['uuid1']}
self.stubs.Set(vm_utils, '_make_uuid_stack',
lambda *a, **kw: ['uuid1'])
self.mox.StubOutWithMock(self.session, 'call_plugin_serialized')
self.session.call_plugin_serialized('glance', 'download_vhd', **params)
self.mox.ReplayAll()
self.store.upload_image(ctx, session, instance, vdi_uuids, image_id)
vdis = self.store.download_image(
self.context, self.session, self.instance, 'fake_image_uuid')
self.mox.VerifyAll()
def test_upload_image(self):
params = {'vdi_uuids': ['fake_vdi_uuid'],
'image_id': 'fake_image_uuid',
'glance_host': '1.1.1.1',
'glance_port': 123,
'glance_use_ssl': False,
'sr_path': '/fake/sr/path',
'auth_token': 'foobar',
'properties': {'auto_disk_config': True,
'os_type': 'default',
'xenapi_use_agent': 'true'}}
self.mox.StubOutWithMock(self.session, 'call_plugin_serialized')
self.session.call_plugin_serialized('glance', 'upload_vhd', **params)
self.mox.ReplayAll()
self.store.upload_image(self.context, self.session, self.instance,
['fake_vdi_uuid'], 'fake_image_uuid')
self.mox.VerifyAll()

View File

@@ -16,7 +16,6 @@
# under the License.
import contextlib
import copy
import pkg_resources
import urlparse
@@ -215,60 +214,55 @@ class FakeSession():
def call_xenapi(self, *args):
pass
def call_plugin_serialized_with_retry(self, plugin, fn, num_retries,
callback, *args, **kwargs):
pass
class FetchVhdImageTestCase(test.TestCase):
def _apply_stubouts(self):
self.mox.StubOutWithMock(vm_utils, '_make_uuid_stack')
self.mox.StubOutWithMock(vm_utils, 'get_sr_path')
self.mox.StubOutWithMock(vm_utils, '_image_uses_bittorrent')
self.mox.StubOutWithMock(vm_utils, '_add_torrent_url')
self.mox.StubOutWithMock(vm_utils, '_add_bittorrent_params')
self.mox.StubOutWithMock(vm_utils, '_generate_glance_callback')
self.mox.StubOutWithMock(vm_utils,
'_fetch_using_dom0_plugin_with_retry')
self.mox.StubOutWithMock(vm_utils, 'safe_find_sr')
self.mox.StubOutWithMock(vm_utils, '_scan_sr')
self.mox.StubOutWithMock(vm_utils, '_check_vdi_size')
self.mox.StubOutWithMock(vm_utils, 'destroy_vdi')
def setUp(self):
super(FetchVhdImageTestCase, self).setUp()
self.context = context.get_admin_context()
self.context.auth_token = 'auth_token'
def _common_params_setup(self, uses_bittorrent):
self.context = "context"
self.session = FakeSession()
self.instance = {"uuid": "uuid"}
self.image_id = "image_id"
self.uuid_stack = ["uuid_stack"]
self.sr_path = "sr_path"
self.params = {'image_id': self.image_id,
'uuid_stack': self.uuid_stack, 'sr_path': self.sr_path}
self.bt_params = copy.copy(self.params)
self.bt_params['torrent_url'] = "%s.torrent" % self.image_id
'uuid_stack': self.uuid_stack,
'sr_path': self.sr_path}
self.vdis = {'root': {'uuid': 'vdi'}}
self.mox.StubOutWithMock(vm_utils, '_make_uuid_stack')
self.mox.StubOutWithMock(vm_utils, 'get_sr_path')
self.mox.StubOutWithMock(vm_utils, '_image_uses_bittorrent')
self.mox.StubOutWithMock(vm_utils, '_add_bittorrent_params')
self.mox.StubOutWithMock(vm_utils, 'safe_find_sr')
self.mox.StubOutWithMock(vm_utils, '_scan_sr')
self.mox.StubOutWithMock(vm_utils, '_check_vdi_size')
self.mox.StubOutWithMock(vm_utils, 'destroy_vdi')
self.mox.StubOutWithMock(
self.session, 'call_plugin_serialized_with_retry')
self.mox.StubOutWithMock(vm_utils, '_add_torrent_url')
vm_utils._make_uuid_stack().AndReturn(self.uuid_stack)
vm_utils.get_sr_path(self.session).AndReturn(self.sr_path)
vm_utils._image_uses_bittorrent(self.context,
self.instance).AndReturn(uses_bittorrent)
if uses_bittorrent:
def set_url(instance, image_id, params):
params['torrent_url'] = "%s.torrent" % image_id
vm_utils._add_torrent_url(self.instance, self.image_id,
self.params).WithSideEffects(set_url).AndReturn(True)
def test_fetch_vhd_image_works_with_glance(self):
self._apply_stubouts()
self._common_params_setup(False)
vm_utils._image_uses_bittorrent(
self.context, self.instance).AndReturn(False)
self.params['auth_token'] = 'auth_token'
vm_utils._generate_glance_callback(self.context).AndReturn("dummy")
vm_utils._fetch_using_dom0_plugin_with_retry(self.context,
self.session, self.image_id, "glance", self.params,
callback="dummy").AndReturn(self.vdis)
self.session.call_plugin_serialized_with_retry(
'glance', 'download_vhd', 0, mox.IgnoreArg(),
**self.params).AndReturn(self.vdis)
vm_utils.safe_find_sr(self.session).AndReturn("sr")
vm_utils._scan_sr(self.session, "sr")
vm_utils._check_vdi_size(self.context, self.session, self.instance,
"vdi")
vm_utils._check_vdi_size(
self.context, self.session, self.instance, "vdi")
self.mox.ReplayAll()
@@ -277,15 +271,24 @@ class FetchVhdImageTestCase(test.TestCase):
self.mox.VerifyAll()
def _setup_bittorrent(self):
vm_utils._image_uses_bittorrent(
self.context, self.instance).AndReturn(True)
def _add_torrent_url(instance, image_id, params):
params['torrent_url'] = "%s.torrent" % image_id
vm_utils._add_torrent_url(
self.instance, self.image_id, mox.IgnoreArg()).AndReturn(True)
vm_utils._add_bittorrent_params(self.image_id, self.params)
self.session.call_plugin_serialized_with_retry(
'bittorrent', 'download_vhd', 0, None,
**self.params).AndReturn(self.vdis)
def test_fetch_vhd_image_works_with_bittorrent(self):
self._apply_stubouts()
self._common_params_setup(True)
vm_utils._add_bittorrent_params(self.image_id, self.bt_params)
vm_utils._fetch_using_dom0_plugin_with_retry(self.context,
self.session, self.image_id, "bittorrent", self.bt_params,
callback=None).AndReturn(self.vdis)
self._setup_bittorrent()
vm_utils.safe_find_sr(self.session).AndReturn("sr")
vm_utils._scan_sr(self.session, "sr")
@@ -300,15 +303,8 @@ class FetchVhdImageTestCase(test.TestCase):
self.mox.VerifyAll()
def test_fetch_vhd_image_cleans_up_vdi_on_fail(self):
self._apply_stubouts()
self._common_params_setup(True)
self.mox.StubOutWithMock(self.session, 'call_xenapi')
vm_utils._add_bittorrent_params(self.image_id, self.bt_params)
vm_utils._fetch_using_dom0_plugin_with_retry(self.context,
self.session, self.image_id, "bittorrent", self.bt_params,
callback=None).AndReturn(self.vdis)
self._setup_bittorrent()
vm_utils.safe_find_sr(self.session).AndReturn("sr")
vm_utils._scan_sr(self.session, "sr")

View File

@@ -39,6 +39,7 @@ A driver for XenServer or Xen Cloud Platform.
import contextlib
import cPickle as pickle
import time
import urlparse
import xmlrpclib
@@ -742,6 +743,33 @@ class XenAPISession(object):
rv = self.call_plugin(plugin, fn, params)
return pickle.loads(rv)
def call_plugin_serialized_with_retry(self, plugin, fn, num_retries,
callback, *args, **kwargs):
"""Allows a plugin to raise RetryableError so we can try again."""
attempts = num_retries + 1
sleep_time = 0.5
for attempt in xrange(1, attempts + 1):
LOG.info(_('%(plugin)s.%(fn)s attempt %(attempt)d/%(attempts)d'),
{'plugin': plugin, 'fn': fn, 'attempt': attempt,
'attempts': attempts})
try:
if callback:
callback(kwargs)
return self.call_plugin_serialized(plugin, fn, *args, **kwargs)
except self.XenAPI.Failure as exc:
_type, _method, error = exc.details[:3]
if error == 'RetryableError':
LOG.error(_('%(plugin)s.%(fn)s failed: %(details)r') %
{'plugin': plugin, 'fn': fn,
'details': exc.details[3:]})
else:
raise
time.sleep(sleep_time)
sleep_time = min(2 * sleep_time, 15)
raise exception.PluginRetriesExceeded(num_retries=num_retries)
def _create_session(self, url):
"""Stubout point. This can be replaced with a mock session."""
return self.XenAPI.Session(url)

View File

@@ -13,83 +13,59 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo.config import cfg
from nova import exception
from nova.image import glance
from nova.openstack.common.gettextutils import _
import nova.openstack.common.log as logging
from nova.virt.xenapi import agent
from nova.virt.xenapi import vm_utils
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('glance_num_retries', 'nova.image.glance')
class GlanceStore(object):
def _call_glance_plugin(self, session, fn, params):
glance_api_servers = glance.get_api_servers()
def pick_glance(kwargs):
g_host, g_port, g_use_ssl = glance_api_servers.next()
kwargs['glance_host'] = g_host
kwargs['glance_port'] = g_port
kwargs['glance_use_ssl'] = g_use_ssl
return session.call_plugin_serialized_with_retry(
'glance', fn, CONF.glance_num_retries, pick_glance, **params)
def _make_params(self, context, session, image_id):
return {'image_id': image_id,
'sr_path': vm_utils.get_sr_path(session),
'auth_token': getattr(context, 'auth_token', None)}
def download_image(self, context, session, instance, image_id):
params = self._make_params(context, session, image_id)
params['uuid_stack'] = vm_utils._make_uuid_stack()
try:
vdis = self._call_glance_plugin(session, 'download_vhd', params)
except exception.PluginRetriesExceeded:
raise exception.CouldNotFetchImage(image_id=image_id)
return vdis
def upload_image(self, context, session, instance, vdi_uuids, image_id):
"""Requests that the Glance plugin bundle the specified VDIs and
push them into Glance using the specified human-friendly name.
"""
# NOTE(sirp): Currently we only support uploading images as VHD, there
# is no RAW equivalent (yet)
max_attempts = CONF.glance_num_retries + 1
sleep_time = 0.5
glance_api_servers = glance.get_api_servers()
properties = {
'auto_disk_config': instance['auto_disk_config'],
'os_type': instance['os_type'] or CONF.default_os_type,
}
params = self._make_params(context, session, image_id)
params['vdi_uuids'] = vdi_uuids
if agent.USE_AGENT_SM_KEY in instance["system_metadata"]:
properties[agent.USE_AGENT_KEY] = \
instance["system_metadata"][agent.USE_AGENT_SM_KEY]
props = params['properties'] = {}
props['auto_disk_config'] = instance['auto_disk_config']
props['os_type'] = instance['os_type'] or CONF.default_os_type
for attempt_num in xrange(1, max_attempts + 1):
sys_meta = instance["system_metadata"]
if agent.USE_AGENT_SM_KEY in sys_meta:
props[agent.USE_AGENT_KEY] = sys_meta[agent.USE_AGENT_SM_KEY]
(glance_host,
glance_port,
glance_use_ssl) = glance_api_servers.next()
try:
params = {'vdi_uuids': vdi_uuids,
'image_id': image_id,
'glance_host': glance_host,
'glance_port': glance_port,
'glance_use_ssl': glance_use_ssl,
'sr_path': vm_utils.get_sr_path(session),
'auth_token': getattr(context, 'auth_token', None),
'properties': properties}
LOG.debug(_("Asking xapi to upload to glance %(vdi_uuids)s as"
" ID %(image_id)s"
" glance server: %(glance_host)s:%(glance_port)d"
" attempt %(attempt_num)d/%(max_attempts)d"),
{'vdi_uuids': vdi_uuids,
'image_id': image_id,
'glance_host': glance_host,
'glance_port': glance_port,
'attempt_num': attempt_num,
'max_attempts': max_attempts}, instance=instance)
return session.call_plugin_serialized('glance',
'upload_vhd',
**params)
except session.XenAPI.Failure as exc:
_type, _method, error = exc.details[:3]
if error == 'RetryableError':
LOG.error(_('upload_vhd failed: %r') %
(exc.details[3:],))
else:
raise
time.sleep(sleep_time)
sleep_time = min(2 * sleep_time, 15)
raise exception.CouldNotUploadImage(image_id=image_id)
try:
self._call_glance_plugin(session, 'upload_vhd', params)
except exception.PluginRetriesExceeded:
raise exception.CouldNotUploadImage(image_id=image_id)

View File

@@ -44,6 +44,7 @@ from nova import exception
from nova.image import glance
from nova.openstack.common import excutils
from nova.openstack.common.gettextutils import _
from nova.openstack.common import importutils
from nova.openstack.common import log as logging
from nova.openstack.common import processutils
from nova.openstack.common import strutils
@@ -1107,36 +1108,6 @@ def _fetch_image(context, session, instance, name_label, image_id, image_type):
return vdis
def _fetch_using_dom0_plugin_with_retry(context, session, image_id,
plugin_name, params, callback=None):
max_attempts = CONF.glance_num_retries + 1
sleep_time = 0.5
for attempt_num in xrange(1, max_attempts + 1):
LOG.info(_('download_vhd %(image_id)s, attempt '
'%(attempt_num)d/%(max_attempts)d, params: %(params)s'),
{'image_id': image_id, 'attempt_num': attempt_num,
'max_attempts': max_attempts, 'params': params})
try:
if callback:
callback(params)
return session.call_plugin_serialized(
plugin_name, 'download_vhd', **params)
except session.XenAPI.Failure as exc:
_type, _method, error = exc.details[:3]
if error == 'RetryableError':
LOG.error(_('download_vhd failed: %r') %
(exc.details[3:],))
else:
raise
time.sleep(sleep_time)
sleep_time = min(2 * sleep_time, 15)
raise exception.CouldNotFetchImage(image_id=image_id)
def _make_uuid_stack():
# NOTE(sirp): The XenAPI plugins run under Python 2.4
# which does not have the `uuid` module. To work around this,
@@ -1175,23 +1146,26 @@ def _fetch_vhd_image(context, session, instance, image_id):
LOG.debug(_("Asking xapi to fetch vhd image %s"), image_id,
instance=instance)
params = {'image_id': image_id,
'uuid_stack': _make_uuid_stack(),
'sr_path': get_sr_path(session)}
params = {}
if (_image_uses_bittorrent(context, instance) and
_add_torrent_url(instance, image_id, params)):
plugin_name = 'bittorrent'
callback = None
params.update({'image_id': image_id,
'uuid_stack': _make_uuid_stack(),
'sr_path': get_sr_path(session)})
_add_bittorrent_params(image_id, params)
else:
plugin_name = 'glance'
callback = _generate_glance_callback(context)
try:
vdis = session.call_plugin_serialized_with_retry(
'bittorrent', 'download_vhd', CONF.glance_num_retries,
None, **params)
except exception.PluginRetriesExceeded:
raise exception.CouldNotFetchImage(image_id=image_id)
vdis = _fetch_using_dom0_plugin_with_retry(
context, session, image_id, plugin_name, params,
callback=callback)
else:
download_handler = importutils.import_object(
'nova.virt.xenapi.image.glance.GlanceStore')
vdis = download_handler.download_image(
context, session, instance, image_id)
sr_ref = safe_find_sr(session)
_scan_sr(session, sr_ref)
@@ -1209,19 +1183,6 @@ def _fetch_vhd_image(context, session, instance, image_id):
return vdis
def _generate_glance_callback(context):
glance_api_servers = glance.get_api_servers()
def pick_glance(params):
g_host, g_port, g_use_ssl = glance_api_servers.next()
params['glance_host'] = g_host
params['glance_port'] = g_port
params['glance_use_ssl'] = g_use_ssl
params['auth_token'] = getattr(context, 'auth_token', None)
return pick_glance
_TORRENT_URL_FN = None # driver function to determine torrent URL to use