Merge "xenapi: move session into new client module"

This commit is contained in:
Jenkins
2013-11-26 05:10:06 +00:00
committed by Gerrit Code Review
8 changed files with 345 additions and 312 deletions

View File

@@ -2902,6 +2902,18 @@
#use_agent_default=false
#
# Options defined in nova.virt.xenapi.client.session
#
# Timeout in seconds for XenAPI login. (integer value)
#login_timeout=10
# Maximum number of concurrent XenAPI connections. Used only
# if compute_driver=xenapi.XenAPIDriver (integer value)
#connection_concurrent=5
#
# Options defined in nova.virt.xenapi.driver
#
@@ -2922,10 +2934,6 @@
# value)
#connection_password=<None>
# Maximum number of concurrent XenAPI connections. Used only
# if compute_driver=xenapi.XenAPIDriver (integer value)
#connection_concurrent=5
# The interval used for polling of coalescing vhds. Used only
# if compute_driver=xenapi.XenAPIDriver (floating point value)
#vhd_coalesce_poll_interval=5.0
@@ -2958,9 +2966,6 @@
# /dev/sdb) (string value)
#remap_vbd_dev_prefix=sd
# Timeout in seconds for XenAPI login. (integer value)
#login_timeout=10
#
# Options defined in nova.virt.xenapi.image.bittorrent

View File

@@ -21,7 +21,7 @@ import tarfile
from nova.image import glance
from nova import test
from nova.virt.xenapi import driver
from nova.virt.xenapi.client import session as xenapi_session
from nova.virt.xenapi.image import vdi_through_dev
@@ -102,7 +102,7 @@ class TestUploadToGlanceAsRawTgz(test.NoDBTestCase):
store._perform_upload('devpath')
def test__get_vdi_ref(self):
session = self.mox.CreateMock(driver.XenAPISession)
session = self.mox.CreateMock(xenapi_session.XenAPISession)
store = vdi_through_dev.UploadToGlanceAsRawTgz(
'context', session, 'instance', ['vdi0', 'vdi1'], 'id')
session.call_xenapi('VDI.get_by_uuid', 'vdi0').AndReturn('vdi_ref')
@@ -112,7 +112,7 @@ class TestUploadToGlanceAsRawTgz(test.NoDBTestCase):
self.assertEqual('vdi_ref', store._get_vdi_ref())
def test__get_virtual_size(self):
session = self.mox.CreateMock(driver.XenAPISession)
session = self.mox.CreateMock(xenapi_session.XenAPISession)
store = vdi_through_dev.UploadToGlanceAsRawTgz(
'context', session, 'instance', ['vdi0', 'vdi1'], 'id')
self.mox.StubOutWithMock(store, '_get_vdi_ref')

View File

@@ -21,7 +21,7 @@ import random
from nova.openstack.common import jsonutils
from nova import test
import nova.tests.image.fake
from nova.virt.xenapi import driver as xenapi_conn
from nova.virt.xenapi.client import session
from nova.virt.xenapi import fake
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi import vmops
@@ -55,9 +55,9 @@ def stubout_instance_snapshot(stubs):
def stubout_session(stubs, cls, product_version=(5, 6, 2),
product_brand='XenServer', **opt_args):
"""Stubs out methods from XenAPISession."""
stubs.Set(xenapi_conn.XenAPISession, '_create_session',
stubs.Set(session.XenAPISession, '_create_session',
lambda s, url: cls(url, **opt_args))
stubs.Set(xenapi_conn.XenAPISession, '_get_product_version_and_brand',
stubs.Set(session.XenAPISession, '_get_product_version_and_brand',
lambda s: (product_version, product_brand))

View File

@@ -25,7 +25,7 @@ from nova import test
from nova.tests.virt.xenapi import stubs
from nova.virt import fake
from nova.virt.xenapi import agent as xenapi_agent
from nova.virt.xenapi import driver as xenapi_conn
from nova.virt.xenapi.client import session as xenapi_session
from nova.virt.xenapi import fake as xenapi_fake
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi import vmops
@@ -39,9 +39,8 @@ class VMOpsTestBase(stubs.XenAPITestBaseNoDB):
def _setup_mock_vmops(self, product_brand=None, product_version=None):
stubs.stubout_session(self.stubs, xenapi_fake.SessionBase)
self._session = xenapi_conn.XenAPISession('test_url', 'root',
'test_pass',
fake.FakeVirtAPI())
self._session = xenapi_session.XenAPISession('test_url', 'root',
'test_pass')
self.vmops = vmops.VMOps(self._session, fake.FakeVirtAPI())
def create_vm(self, name, state="running"):

View File

@@ -54,6 +54,7 @@ from nova.tests.objects import test_aggregate
from nova.tests.virt.xenapi import stubs
from nova.virt import fake
from nova.virt.xenapi import agent
from nova.virt.xenapi.client import session as xenapi_session
from nova.virt.xenapi import driver as xenapi_conn
from nova.virt.xenapi import fake as xenapi_fake
from nova.virt.xenapi import host
@@ -72,6 +73,8 @@ CONF.import_opt('network_manager', 'nova.service')
CONF.import_opt('compute_driver', 'nova.virt.driver')
CONF.import_opt('host', 'nova.netconf')
CONF.import_opt('default_availability_zone', 'nova.availability_zones')
CONF.import_opt('login_timeout', 'nova.virt.xenapi.client.session',
group="xenserver")
IMAGE_MACHINE = '1'
IMAGE_KERNEL = '2'
@@ -121,6 +124,10 @@ IMAGE_FIXTURES = {
}
def get_session():
return xenapi_session.XenAPISession('test_url', 'root', 'test_pass')
def set_image_fixtures():
image_service = fake_image.FakeImageService()
image_service.images.clear()
@@ -333,8 +340,7 @@ class XenAPIVMTestCase(stubs.XenAPITestBase):
super(XenAPIVMTestCase, self).tearDown()
def test_init_host(self):
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
fake.FakeVirtAPI())
session = get_session()
vm = vm_utils._get_this_vm_ref(session)
# Local root disk
vdi0 = xenapi_fake.create_vdi('compute', None)
@@ -416,8 +422,7 @@ class XenAPIVMTestCase(stubs.XenAPITestBase):
def test_get_vnc_console(self):
instance = self._create_instance()
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
fake.FakeVirtAPI())
session = get_session()
conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False)
vm_ref = vm_utils.lookup(session, instance['name'])
@@ -432,8 +437,7 @@ class XenAPIVMTestCase(stubs.XenAPITestBase):
def test_get_vnc_console_for_rescue(self):
instance = self._create_instance()
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
fake.FakeVirtAPI())
session = get_session()
conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False)
rescue_vm = xenapi_fake.create_vm(instance['name'] + '-rescue',
'Running')
@@ -658,19 +662,11 @@ class XenAPIVMTestCase(stubs.XenAPITestBase):
self.assertEqual(self.vm['HVM_boot_policy'], '')
def _list_vdis(self):
url = CONF.xenserver.connection_url
username = CONF.xenserver.connection_username
password = CONF.xenserver.connection_password
session = xenapi_conn.XenAPISession(url, username, password,
fake.FakeVirtAPI())
session = get_session()
return session.call_xenapi('VDI.get_all')
def _list_vms(self):
url = CONF.xenserver.connection_url
username = CONF.xenserver.connection_username
password = CONF.xenserver.connection_password
session = xenapi_conn.XenAPISession(url, username, password,
fake.FakeVirtAPI())
session = get_session()
return session.call_xenapi('VM.get_all')
def _check_vdis(self, start_list, end_list):
@@ -1231,8 +1227,7 @@ class XenAPIVMTestCase(stubs.XenAPITestBase):
def test_rescue(self):
instance = self._create_instance()
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
fake.FakeVirtAPI())
session = get_session()
vm_ref = vm_utils.lookup(session, instance['name'])
swap_vdi_ref = xenapi_fake.create_vdi('swap', None)
@@ -1260,8 +1255,7 @@ class XenAPIVMTestCase(stubs.XenAPITestBase):
# test that the original disk is preserved if rescue setup fails
# bug #1227898
instance = self._create_instance()
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
fake.FakeVirtAPI())
session = get_session()
image_meta = {'id': IMAGE_VHD,
'disk_format': 'vhd'}
@@ -1337,8 +1331,7 @@ class XenAPIVMTestCase(stubs.XenAPITestBase):
conn.reboot(self.context, instance, None, "SOFT")
def test_reboot_halted(self):
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
fake.FakeVirtAPI())
session = get_session()
instance = self._create_instance(spawn=False)
conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False)
xenapi_fake.create_vm(instance['name'], 'Halted')
@@ -2227,8 +2220,7 @@ class XenAPIAutoDiskConfigTestCase(stubs.XenAPITestBase):
fake_resize_part_and_fs)
ctx = context.RequestContext(self.user_id, self.project_id)
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
fake.FakeVirtAPI())
session = get_session()
disk_image_type = vm_utils.ImageType.DISK_VHD
instance = create_instance_with_system_metadata(self.context,
@@ -2320,8 +2312,7 @@ class XenAPIGenerateLocal(stubs.XenAPITestBase):
def assertCalled(self, instance,
disk_image_type=vm_utils.ImageType.DISK_VHD):
ctx = context.RequestContext(self.user_id, self.project_id)
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
fake.FakeVirtAPI())
session = get_session()
vm_ref = xenapi_fake.create_vm(instance['name'], 'Halted')
vdi_ref = xenapi_fake.create_vdi(instance['name'], 'fake')
@@ -2799,8 +2790,7 @@ class XenAPISRSelectionTestCase(stubs.XenAPITestBaseNoDB):
# Ensure StorageRepositoryNotFound is raise when wrong filter.
self.flags(sr_matching_filter='yadayadayada', group='xenserver')
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
fake.FakeVirtAPI())
session = get_session()
self.assertRaises(exception.StorageRepositoryNotFound,
vm_utils.safe_find_sr, session)
@@ -2809,8 +2799,7 @@ class XenAPISRSelectionTestCase(stubs.XenAPITestBaseNoDB):
self.flags(sr_matching_filter='other-config:i18n-key=local-storage',
group='xenserver')
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
fake.FakeVirtAPI())
session = get_session()
# This test is only guaranteed if there is one host in the pool
self.assertEqual(len(xenapi_fake.get_all('host')), 1)
host_ref = xenapi_fake.get_all('host')[0]
@@ -2830,8 +2819,7 @@ class XenAPISRSelectionTestCase(stubs.XenAPITestBaseNoDB):
self.flags(sr_matching_filter='other-config:my_fake_sr=true',
group='xenserver')
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
fake.FakeVirtAPI())
session = get_session()
host_ref = xenapi_fake.get_all('host')[0]
local_sr = xenapi_fake.create_sr(name_label='Fake Storage',
type='lvm',
@@ -2845,8 +2833,7 @@ class XenAPISRSelectionTestCase(stubs.XenAPITestBaseNoDB):
self.flags(sr_matching_filter='default-sr:true',
group='xenserver')
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass',
fake.FakeVirtAPI())
session = get_session()
pool_ref = session.call_xenapi('pool.get_all')[0]
expected = vm_utils.safe_find_sr(session)
self.assertEqual(session.call_xenapi('pool.get_default_SR', pool_ref),
@@ -3894,7 +3881,7 @@ class XenAPIInjectMetadataTestCase(stubs.XenAPITestBaseNoDB):
class XenAPISessionTestCase(test.NoDBTestCase):
def _get_mock_xapisession(self, software_version):
class MockXapiSession(xenapi_conn.XenAPISession):
class MockXapiSession(xenapi_session.XenAPISession):
def __init__(_ignore):
"Skip the superclass's dirty init"

View File

@@ -0,0 +1,13 @@
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,287 @@
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import cPickle as pickle
import time
import xmlrpclib
from eventlet import queue
from eventlet import timeout
from oslo.config import cfg
from nova import context
from nova import exception
from nova.objects import aggregate as aggregate_obj
from nova.openstack.common.gettextutils import _
from nova.openstack.common import log as logging
from nova.openstack.common import versionutils
from nova import utils
from nova.virt.xenapi import pool
from nova.virt.xenapi import pool_states
LOG = logging.getLogger(__name__)
xenapi_session_opts = [
cfg.IntOpt('login_timeout',
default=10,
deprecated_name='xenapi_login_timeout',
deprecated_group='DEFAULT',
help='Timeout in seconds for XenAPI login.'),
cfg.IntOpt('connection_concurrent',
default=5,
deprecated_name='xenapi_connection_concurrent',
deprecated_group='DEFAULT',
help='Maximum number of concurrent XenAPI connections. '
'Used only if compute_driver=xenapi.XenAPIDriver'),
]
CONF = cfg.CONF
CONF.register_opts(xenapi_session_opts, 'xenserver')
CONF.import_opt('host', 'nova.netconf')
class XenAPISession(object):
"""The session to invoke XenAPI SDK calls."""
# This is not a config option as it should only ever be
# changed in development environments.
# MAJOR VERSION: Incompatible changes with the plugins
# MINOR VERSION: Compatible changes, new plguins, etc
PLUGIN_REQUIRED_VERSION = '1.0'
def __init__(self, url, user, pw):
import XenAPI
self.XenAPI = XenAPI
self._sessions = queue.Queue()
self.is_slave = False
exception = self.XenAPI.Failure(_("Unable to log in to XenAPI "
"(is the Dom0 disk full?)"))
url = self._create_first_session(url, user, pw, exception)
self._populate_session_pool(url, user, pw, exception)
self.host_uuid = self._get_host_uuid()
self.product_version, self.product_brand = \
self._get_product_version_and_brand()
self._verify_plugin_version()
def _verify_plugin_version(self):
requested_version = self.PLUGIN_REQUIRED_VERSION
current_version = self.call_plugin_serialized(
'nova_plugin_version', 'get_version')
if not versionutils.is_compatible(requested_version, current_version):
raise self.XenAPI.Failure(
_("Plugin version mismatch (Expected %(exp)s, got %(got)s)") %
{'exp': requested_version, 'got': current_version})
def _create_first_session(self, url, user, pw, exception):
try:
session = self._create_session(url)
with timeout.Timeout(CONF.xenserver.login_timeout, exception):
session.login_with_password(user, pw)
except self.XenAPI.Failure as e:
# if user and pw of the master are different, we're doomed!
if e.details[0] == 'HOST_IS_SLAVE':
master = e.details[1]
url = pool.swap_xapi_host(url, master)
session = self.XenAPI.Session(url)
session.login_with_password(user, pw)
self.is_slave = True
else:
raise
self._sessions.put(session)
return url
def _populate_session_pool(self, url, user, pw, exception):
for i in xrange(CONF.xenserver.connection_concurrent - 1):
session = self._create_session(url)
with timeout.Timeout(CONF.xenserver.login_timeout, exception):
session.login_with_password(user, pw)
self._sessions.put(session)
def _get_host_uuid(self):
if self.is_slave:
aggr = aggregate_obj.AggregateList.get_by_host(
context.get_admin_context(),
CONF.host, key=pool_states.POOL_FLAG)[0]
if not aggr:
LOG.error(_('Host is member of a pool, but DB '
'says otherwise'))
raise exception.AggregateHostNotFound()
return aggr.metadetails[CONF.host]
else:
with self._get_session() as session:
host_ref = session.xenapi.session.get_this_host(session.handle)
return session.xenapi.host.get_uuid(host_ref)
def _get_product_version_and_brand(self):
"""Return a tuple of (major, minor, rev) for the host version and
a string of the product brand.
"""
software_version = self._get_software_version()
product_version_str = software_version.get('product_version')
# Product version is only set in some cases (e.g. XCP, XenServer) and
# not in others (e.g. xenserver-core, XAPI-XCP).
# In these cases, the platform version is the best number to use.
if product_version_str is None:
product_version_str = software_version.get('platform_version',
'0.0.0')
product_brand = software_version.get('product_brand')
product_version = utils.convert_version_to_tuple(product_version_str)
return product_version, product_brand
def _get_software_version(self):
host = self.get_xenapi_host()
return self.call_xenapi('host.get_software_version', host)
def get_session_id(self):
"""Return a string session_id. Used for vnc consoles."""
with self._get_session() as session:
return str(session._session)
@contextlib.contextmanager
def _get_session(self):
"""Return exclusive session for scope of with statement."""
session = self._sessions.get()
try:
yield session
finally:
self._sessions.put(session)
def get_xenapi_host(self):
"""Return the xenapi host on which nova-compute runs on."""
with self._get_session() as session:
return session.xenapi.host.get_by_uuid(self.host_uuid)
def call_xenapi(self, method, *args):
"""Call the specified XenAPI method on a background thread."""
with self._get_session() as session:
return session.xenapi_request(method, args)
def call_plugin(self, plugin, fn, args):
"""Call host.call_plugin on a background thread."""
# NOTE(johannes): Fetch host before we acquire a session. Since
# get_xenapi_host() acquires a session too, it can result in a
# deadlock if multiple greenthreads race with each other. See
# bug 924918
host = self.get_xenapi_host()
# NOTE(armando): pass the host uuid along with the args so that
# the plugin gets executed on the right host when using XS pools
args['host_uuid'] = self.host_uuid
with self._get_session() as session:
return self._unwrap_plugin_exceptions(
session.xenapi.host.call_plugin,
host, plugin, fn, args)
def call_plugin_serialized(self, plugin, fn, *args, **kwargs):
params = {'params': pickle.dumps(dict(args=args, kwargs=kwargs))}
rv = self.call_plugin(plugin, fn, params)
return pickle.loads(rv)
def call_plugin_serialized_with_retry(self, plugin, fn, num_retries,
callback, *args, **kwargs):
"""Allows a plugin to raise RetryableError so we can try again."""
attempts = num_retries + 1
sleep_time = 0.5
for attempt in xrange(1, attempts + 1):
LOG.info(_('%(plugin)s.%(fn)s attempt %(attempt)d/%(attempts)d'),
{'plugin': plugin, 'fn': fn, 'attempt': attempt,
'attempts': attempts})
try:
if attempt > 1:
time.sleep(sleep_time)
sleep_time = min(2 * sleep_time, 15)
if callback:
callback(kwargs)
return self.call_plugin_serialized(plugin, fn, *args, **kwargs)
except self.XenAPI.Failure as exc:
if self._is_retryable_exception(exc):
LOG.warn(_('%(plugin)s.%(fn)s failed. Retrying call.')
% {'plugin': plugin, 'fn': fn})
else:
raise
raise exception.PluginRetriesExceeded(num_retries=num_retries)
def _is_retryable_exception(self, exc):
_type, method, error = exc.details[:3]
if error == 'RetryableError':
LOG.debug(_("RetryableError, so retrying upload_vhd"),
exc_info=True)
return True
elif "signal" in method:
LOG.debug(_("Error due to a signal, retrying upload_vhd"),
exc_info=True)
return True
else:
return False
def _create_session(self, url):
"""Stubout point. This can be replaced with a mock session."""
self.is_local_connection = url == "unix://local"
if self.is_local_connection:
return self.XenAPI.xapi_local()
return self.XenAPI.Session(url)
def _unwrap_plugin_exceptions(self, func, *args, **kwargs):
"""Parse exception details."""
try:
return func(*args, **kwargs)
except self.XenAPI.Failure as exc:
LOG.debug(_("Got exception: %s"), exc)
if (len(exc.details) == 4 and
exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
exc.details[2] == 'Failure'):
params = None
try:
# FIXME(comstud): eval is evil.
params = eval(exc.details[3])
except Exception:
raise exc
raise self.XenAPI.Failure(params)
else:
raise
except xmlrpclib.ProtocolError as exc:
LOG.debug(_("Got exception: %s"), exc)
raise
def get_rec(self, record_type, ref):
try:
return self.call_xenapi('%s.get_record' % record_type, ref)
except self.XenAPI.Failure as e:
if e.details[0] != 'HANDLE_INVALID':
raise
return None
def get_all_refs_and_recs(self, record_type):
"""Retrieve all refs and recs for a Xen record type.
Handles race-conditions where the record may be deleted between
the `get_all` call and the `get_record` call.
"""
for ref in self.call_xenapi('%s.get_all' % record_type):
rec = self.get_rec(record_type, ref)
# Check to make sure the record still exists. It may have
# been deleted between the get_all call and get_record call
if rec:
yield ref, rec

View File

@@ -1,4 +1,3 @@
# Copyright (c) 2010 Citrix Systems, Inc.
# Copyright 2010 OpenStack Foundation
#
@@ -36,30 +35,20 @@ A driver for XenServer or Xen Cloud Platform.
- suffix "_rec" for record objects
"""
import contextlib
import cPickle as pickle
import math
import time
import urlparse
import xmlrpclib
from eventlet import queue
from eventlet import timeout
from oslo.config import cfg
from nova import context
from nova import exception
from nova.objects import aggregate as aggregate_obj
from nova.openstack.common.gettextutils import _
from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
from nova.openstack.common import versionutils
from nova import unit
from nova import utils
from nova.virt import driver
from nova.virt.xenapi.client import session
from nova.virt.xenapi import host
from nova.virt.xenapi import pool
from nova.virt.xenapi import pool_states
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi import vmops
from nova.virt.xenapi import volumeops
@@ -86,12 +75,6 @@ xenapi_opts = [
help='Password for connection to XenServer/Xen Cloud Platform. '
'Used only if compute_driver=xenapi.XenAPIDriver',
secret=True),
cfg.IntOpt('connection_concurrent',
default=5,
deprecated_name='xenapi_connection_concurrent',
deprecated_group='DEFAULT',
help='Maximum number of concurrent XenAPI connections. '
'Used only if compute_driver=xenapi.XenAPIDriver'),
cfg.FloatOpt('vhd_coalesce_poll_interval',
default=5.0,
deprecated_name='xenapi_vhd_coalesce_poll_interval',
@@ -143,11 +126,6 @@ xenapi_opts = [
deprecated_group='DEFAULT',
help='Specify prefix to remap VBD dev to '
'(ex. /dev/xvdb -> /dev/sdb)'),
cfg.IntOpt('login_timeout',
default=10,
deprecated_name='xenapi_login_timeout',
deprecated_group='DEFAULT',
help='Timeout in seconds for XenAPI login.'),
]
CONF = cfg.CONF
@@ -170,7 +148,7 @@ class XenAPIDriver(driver.ComputeDriver):
'connection_password to use '
'compute_driver=xenapi.XenAPIDriver'))
self._session = XenAPISession(url, username, password, self.virtapi)
self._session = session.XenAPISession(url, username, password)
self._volumeops = volumeops.VolumeOps(self._session)
self._host_state = None
self._host = host.Host(self._session, self.virtapi)
@@ -691,239 +669,3 @@ class XenAPIDriver(driver.ComputeDriver):
info
"""
return self._vmops.get_per_instance_usage()
class XenAPISession(object):
"""The session to invoke XenAPI SDK calls."""
# This is not a config option as it should only ever be
# changed in development environments.
# MAJOR VERSION: Incompatible changes with the plugins
# MINOR VERSION: Compatible changes, new plguins, etc
PLUGIN_REQUIRED_VERSION = '1.0'
def __init__(self, url, user, pw, virtapi):
import XenAPI
self.XenAPI = XenAPI
self._sessions = queue.Queue()
self.is_slave = False
exception = self.XenAPI.Failure(_("Unable to log in to XenAPI "
"(is the Dom0 disk full?)"))
url = self._create_first_session(url, user, pw, exception)
self._populate_session_pool(url, user, pw, exception)
self.host_uuid = self._get_host_uuid()
self.product_version, self.product_brand = \
self._get_product_version_and_brand()
self._virtapi = virtapi
self._verify_plugin_version()
def _verify_plugin_version(self):
requested_version = self.PLUGIN_REQUIRED_VERSION
current_version = self.call_plugin_serialized(
'nova_plugin_version', 'get_version')
if not versionutils.is_compatible(requested_version, current_version):
raise self.XenAPI.Failure(
_("Plugin version mismatch (Expected %(exp)s, got %(got)s)") %
{'exp': requested_version, 'got': current_version})
def _create_first_session(self, url, user, pw, exception):
try:
session = self._create_session(url)
with timeout.Timeout(CONF.xenserver.login_timeout, exception):
session.login_with_password(user, pw)
except self.XenAPI.Failure as e:
# if user and pw of the master are different, we're doomed!
if e.details[0] == 'HOST_IS_SLAVE':
master = e.details[1]
url = pool.swap_xapi_host(url, master)
session = self.XenAPI.Session(url)
session.login_with_password(user, pw)
self.is_slave = True
else:
raise
self._sessions.put(session)
return url
def _populate_session_pool(self, url, user, pw, exception):
for i in xrange(CONF.xenserver.connection_concurrent - 1):
session = self._create_session(url)
with timeout.Timeout(CONF.xenserver.login_timeout, exception):
session.login_with_password(user, pw)
self._sessions.put(session)
def _get_host_uuid(self):
if self.is_slave:
aggr = aggregate_obj.AggregateList.get_by_host(
context.get_admin_context(),
CONF.host, key=pool_states.POOL_FLAG)[0]
if not aggr:
LOG.error(_('Host is member of a pool, but DB '
'says otherwise'))
raise exception.AggregateHostNotFound()
return aggr.metadetails[CONF.host]
else:
with self._get_session() as session:
host_ref = session.xenapi.session.get_this_host(session.handle)
return session.xenapi.host.get_uuid(host_ref)
def _get_product_version_and_brand(self):
"""Return a tuple of (major, minor, rev) for the host version and
a string of the product brand.
"""
software_version = self._get_software_version()
product_version_str = software_version.get('product_version')
# Product version is only set in some cases (e.g. XCP, XenServer) and
# not in others (e.g. xenserver-core, XAPI-XCP).
# In these cases, the platform version is the best number to use.
if product_version_str is None:
product_version_str = software_version.get('platform_version',
'0.0.0')
product_brand = software_version.get('product_brand')
product_version = utils.convert_version_to_tuple(product_version_str)
return product_version, product_brand
def _get_software_version(self):
host = self.get_xenapi_host()
return self.call_xenapi('host.get_software_version', host)
def get_session_id(self):
"""Return a string session_id. Used for vnc consoles."""
with self._get_session() as session:
return str(session._session)
@contextlib.contextmanager
def _get_session(self):
"""Return exclusive session for scope of with statement."""
session = self._sessions.get()
try:
yield session
finally:
self._sessions.put(session)
def get_xenapi_host(self):
"""Return the xenapi host on which nova-compute runs on."""
with self._get_session() as session:
return session.xenapi.host.get_by_uuid(self.host_uuid)
def call_xenapi(self, method, *args):
"""Call the specified XenAPI method on a background thread."""
with self._get_session() as session:
return session.xenapi_request(method, args)
def call_plugin(self, plugin, fn, args):
"""Call host.call_plugin on a background thread."""
# NOTE(johannes): Fetch host before we acquire a session. Since
# get_xenapi_host() acquires a session too, it can result in a
# deadlock if multiple greenthreads race with each other. See
# bug 924918
host = self.get_xenapi_host()
# NOTE(armando): pass the host uuid along with the args so that
# the plugin gets executed on the right host when using XS pools
args['host_uuid'] = self.host_uuid
with self._get_session() as session:
return self._unwrap_plugin_exceptions(
session.xenapi.host.call_plugin,
host, plugin, fn, args)
def call_plugin_serialized(self, plugin, fn, *args, **kwargs):
params = {'params': pickle.dumps(dict(args=args, kwargs=kwargs))}
rv = self.call_plugin(plugin, fn, params)
return pickle.loads(rv)
def call_plugin_serialized_with_retry(self, plugin, fn, num_retries,
callback, *args, **kwargs):
"""Allows a plugin to raise RetryableError so we can try again."""
attempts = num_retries + 1
sleep_time = 0.5
for attempt in xrange(1, attempts + 1):
LOG.info(_('%(plugin)s.%(fn)s attempt %(attempt)d/%(attempts)d'),
{'plugin': plugin, 'fn': fn, 'attempt': attempt,
'attempts': attempts})
try:
if attempt > 1:
time.sleep(sleep_time)
sleep_time = min(2 * sleep_time, 15)
if callback:
callback(kwargs)
return self.call_plugin_serialized(plugin, fn, *args, **kwargs)
except self.XenAPI.Failure as exc:
if self._is_retryable_exception(exc):
LOG.warn(_('%(plugin)s.%(fn)s failed. Retrying call.')
% {'plugin': plugin, 'fn': fn})
else:
raise
raise exception.PluginRetriesExceeded(num_retries=num_retries)
def _is_retryable_exception(self, exc):
_type, method, error = exc.details[:3]
if error == 'RetryableError':
LOG.debug(_("RetryableError, so retrying upload_vhd"),
exc_info=True)
return True
elif "signal" in method:
LOG.debug(_("Error due to a signal, retrying upload_vhd"),
exc_info=True)
return True
else:
return False
def _create_session(self, url):
"""Stubout point. This can be replaced with a mock session."""
self.is_local_connection = url == "unix://local"
if self.is_local_connection:
return self.XenAPI.xapi_local()
return self.XenAPI.Session(url)
def _unwrap_plugin_exceptions(self, func, *args, **kwargs):
"""Parse exception details."""
try:
return func(*args, **kwargs)
except self.XenAPI.Failure as exc:
LOG.debug(_("Got exception: %s"), exc)
if (len(exc.details) == 4 and
exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
exc.details[2] == 'Failure'):
params = None
try:
# FIXME(comstud): eval is evil.
params = eval(exc.details[3])
except Exception:
raise exc
raise self.XenAPI.Failure(params)
else:
raise
except xmlrpclib.ProtocolError as exc:
LOG.debug(_("Got exception: %s"), exc)
raise
def get_rec(self, record_type, ref):
try:
return self.call_xenapi('%s.get_record' % record_type, ref)
except self.XenAPI.Failure as e:
if e.details[0] != 'HANDLE_INVALID':
raise
return None
def get_all_refs_and_recs(self, record_type):
"""Retrieve all refs and recs for a Xen record type.
Handles race-conditions where the record may be deleted between
the `get_all` call and the `get_record` call.
"""
for ref in self.call_xenapi('%s.get_all' % record_type):
rec = self.get_rec(record_type, ref)
# Check to make sure the record still exists. It may have
# been deleted between the get_all call and get_record call
if rec:
yield ref, rec