Use session with neutronclient
Use the standard session and auth plugin helpers from keystoneclient to standardize the options available for talking to neutron. This will allow improvements such as using keystone v3 authentication for the neutron user. DocImpact: Deprecating the existing auth parameters in favour of keystoneclient's session and auth plugin loading parameters. Closes-Bug: #1424462 Change-Id: I7b3b825737dde333c8d88019d814304cbefdbfc7
This commit is contained in:
@@ -18,6 +18,10 @@
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from keystoneclient import auth
|
||||
from keystoneclient.auth.identity import v2 as v2_auth
|
||||
from keystoneclient.auth import token_endpoint
|
||||
from keystoneclient import session
|
||||
from neutronclient.common import exceptions as neutron_client_exc
|
||||
from neutronclient.v2_0 import client as clientv20
|
||||
from oslo_concurrency import lockutils
|
||||
@@ -43,36 +47,50 @@ neutron_opts = [
|
||||
cfg.StrOpt('url',
|
||||
default='http://127.0.0.1:9696',
|
||||
help='URL for connecting to neutron'),
|
||||
cfg.IntOpt('url_timeout',
|
||||
default=30,
|
||||
help='Timeout value for connecting to neutron in seconds'),
|
||||
# deprecated in Kilo, may be removed in Liberty.
|
||||
cfg.StrOpt('admin_user_id',
|
||||
help='User id for connecting to neutron in admin context'),
|
||||
help='User id for connecting to neutron in admin context. '
|
||||
'DEPRECATED: specify an auth_plugin and appropriate '
|
||||
'credentials instead.'),
|
||||
# deprecated in Kilo, may be removed in Liberty.
|
||||
cfg.StrOpt('admin_username',
|
||||
help='Username for connecting to neutron in admin context'),
|
||||
help='Username for connecting to neutron in admin context '
|
||||
'DEPRECATED: specify an auth_plugin and appropriate '
|
||||
'credentials instead.'),
|
||||
# deprecated in Kilo, may be removed in Liberty.
|
||||
cfg.StrOpt('admin_password',
|
||||
help='Password for connecting to neutron in admin context',
|
||||
help='Password for connecting to neutron in admin context '
|
||||
'DEPRECATED: specify an auth_plugin and appropriate '
|
||||
'credentials instead.',
|
||||
secret=True),
|
||||
# deprecated in Kilo, may be removed in Liberty.
|
||||
cfg.StrOpt('admin_tenant_id',
|
||||
help='Tenant id for connecting to neutron in admin context'),
|
||||
help='Tenant id for connecting to neutron in admin context '
|
||||
'DEPRECATED: specify an auth_plugin and appropriate '
|
||||
'credentials instead.'),
|
||||
# deprecated in Kilo, may be removed in Liberty.
|
||||
cfg.StrOpt('admin_tenant_name',
|
||||
help='Tenant name for connecting to neutron in admin context. '
|
||||
'This option will be ignored if neutron_admin_tenant_id '
|
||||
'is set. Note that with Keystone V3 tenant names are '
|
||||
'only unique within a domain.'),
|
||||
'only unique within a domain. '
|
||||
'DEPRECATED: specify an auth_plugin and appropriate '
|
||||
'credentials instead.'),
|
||||
cfg.StrOpt('region_name',
|
||||
help='Region name for connecting to neutron in admin context'),
|
||||
# deprecated in Kilo, may be removed in Liberty.
|
||||
cfg.StrOpt('admin_auth_url',
|
||||
default='http://localhost:5000/v2.0',
|
||||
help='Authorization URL for connecting to neutron in admin '
|
||||
'context'),
|
||||
cfg.BoolOpt('api_insecure',
|
||||
default=False,
|
||||
help='If set, ignore any SSL validation issues'),
|
||||
'context. DEPRECATED: specify an auth_plugin and '
|
||||
'appropriate credentials instead.'),
|
||||
# deprecated in Kilo, may be removed in Liberty.
|
||||
cfg.StrOpt('auth_strategy',
|
||||
default='keystone',
|
||||
help='Authorization strategy for connecting to '
|
||||
'neutron in admin context'),
|
||||
help='Authorization strategy for connecting to neutron in '
|
||||
'admin context. DEPRECATED: specify an auth_plugin and '
|
||||
'appropriate credentials instead. If an auth_plugin is '
|
||||
'specified strategy will be ignored.'),
|
||||
# TODO(berrange) temporary hack until Neutron can pass over the
|
||||
# name of the OVS bridge it is configured with
|
||||
cfg.StrOpt('ovs_bridge',
|
||||
@@ -82,17 +100,29 @@ neutron_opts = [
|
||||
default=600,
|
||||
help='Number of seconds before querying neutron for'
|
||||
' extensions'),
|
||||
cfg.StrOpt('ca_certificates_file',
|
||||
help='Location of CA certificates file to use for '
|
||||
'neutron client requests.'),
|
||||
cfg.BoolOpt('allow_duplicate_networks',
|
||||
default=False,
|
||||
help='Allow an instance to have multiple vNICs attached to '
|
||||
'the same Neutron network.'),
|
||||
]
|
||||
|
||||
NEUTRON_GROUP = 'neutron'
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(neutron_opts, 'neutron')
|
||||
CONF.register_opts(neutron_opts, NEUTRON_GROUP)
|
||||
|
||||
deprecations = {'cafile': [cfg.DeprecatedOpt('ca_certificates_file',
|
||||
group=NEUTRON_GROUP)],
|
||||
'insecure': [cfg.DeprecatedOpt('api_insecure',
|
||||
group=NEUTRON_GROUP)],
|
||||
'timeout': [cfg.DeprecatedOpt('url_timeout',
|
||||
group=NEUTRON_GROUP)]}
|
||||
|
||||
session.Session.register_conf_options(CONF, NEUTRON_GROUP,
|
||||
deprecated_opts=deprecations)
|
||||
auth.register_conf_options(CONF, NEUTRON_GROUP)
|
||||
|
||||
|
||||
CONF.import_opt('default_floating_pool', 'nova.network.floating_ips')
|
||||
CONF.import_opt('flat_injected', 'nova.network.manager')
|
||||
LOG = logging.getLogger(__name__)
|
||||
@@ -100,97 +130,90 @@ LOG = logging.getLogger(__name__)
|
||||
soft_external_network_attach_authorize = extensions.soft_core_authorizer(
|
||||
'network', 'attach_external_network')
|
||||
|
||||
|
||||
class AdminTokenStore(object):
|
||||
|
||||
_instance = None
|
||||
|
||||
def __init__(self):
|
||||
self.admin_auth_token = None
|
||||
|
||||
@classmethod
|
||||
def get(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = cls()
|
||||
return cls._instance
|
||||
_SESSION = None
|
||||
_ADMIN_AUTH = None
|
||||
|
||||
|
||||
def _get_client(token=None, admin=False):
|
||||
params = {
|
||||
'endpoint_url': CONF.neutron.url,
|
||||
'timeout': CONF.neutron.url_timeout,
|
||||
'insecure': CONF.neutron.api_insecure,
|
||||
'ca_cert': CONF.neutron.ca_certificates_file,
|
||||
'auth_strategy': CONF.neutron.auth_strategy,
|
||||
'token': token,
|
||||
}
|
||||
def reset_state():
|
||||
global _ADMIN_AUTH
|
||||
global _SESSION
|
||||
|
||||
if admin:
|
||||
if CONF.neutron.admin_user_id:
|
||||
params['user_id'] = CONF.neutron.admin_user_id
|
||||
else:
|
||||
params['username'] = CONF.neutron.admin_username
|
||||
if CONF.neutron.admin_tenant_id:
|
||||
params['tenant_id'] = CONF.neutron.admin_tenant_id
|
||||
else:
|
||||
params['tenant_name'] = CONF.neutron.admin_tenant_name
|
||||
params['password'] = CONF.neutron.admin_password
|
||||
params['auth_url'] = CONF.neutron.admin_auth_url
|
||||
return clientv20.Client(**params)
|
||||
_ADMIN_AUTH = None
|
||||
_SESSION = None
|
||||
|
||||
|
||||
class ClientWrapper(clientv20.Client):
|
||||
'''A neutron client wrapper class.
|
||||
Wraps the callable methods, executes it and updates the token,
|
||||
as it might change when expires.
|
||||
'''
|
||||
def _load_auth_plugin(conf):
|
||||
auth_plugin = auth.load_from_conf_options(conf, NEUTRON_GROUP)
|
||||
|
||||
def __init__(self, base_client):
|
||||
# Expose all attributes from the base_client instance
|
||||
self.__dict__ = base_client.__dict__
|
||||
self.base_client = base_client
|
||||
if auth_plugin:
|
||||
return auth_plugin
|
||||
|
||||
def __getattribute__(self, name):
|
||||
obj = object.__getattribute__(self, name)
|
||||
if callable(obj):
|
||||
obj = object.__getattribute__(self, 'proxy')(obj)
|
||||
return obj
|
||||
if conf.neutron.auth_strategy == 'noauth':
|
||||
if not conf.neutron.url:
|
||||
message = _('For "noauth" authentication strategy, the '
|
||||
'endpoint must be specified conf.neutron.url')
|
||||
raise neutron_client_exc.Unauthorized(message=message)
|
||||
|
||||
def proxy(self, obj):
|
||||
def wrapper(*args, **kwargs):
|
||||
ret = obj(*args, **kwargs)
|
||||
new_token = self.base_client.get_auth_info()['auth_token']
|
||||
_update_token(new_token)
|
||||
return ret
|
||||
return wrapper
|
||||
# NOTE(jamielennox): This will actually send 'noauth' as the token
|
||||
# value because the plugin requires you to send something. It doesn't
|
||||
# matter as it will be ignored anyway.
|
||||
return token_endpoint.Token(conf.neutron.url, 'noauth')
|
||||
|
||||
if conf.neutron.auth_strategy in ('keystone', None):
|
||||
return v2_auth.Password(auth_url=conf.neutron.admin_auth_url,
|
||||
user_id=conf.neutron.admin_user_id,
|
||||
username=conf.neutron.admin_username,
|
||||
password=conf.neutron.admin_password,
|
||||
tenant_id=conf.neutron.admin_tenant_id,
|
||||
tenant_name=conf.neutron.admin_tenant_name)
|
||||
|
||||
def _update_token(new_token):
|
||||
with lockutils.lock('neutron_admin_auth_token_lock'):
|
||||
token_store = AdminTokenStore.get()
|
||||
token_store.admin_auth_token = new_token
|
||||
err_msg = _('Unknown auth strategy: %s') % conf.neutron.auth_strategy
|
||||
raise neutron_client_exc.Unauthorized(message=err_msg)
|
||||
|
||||
|
||||
def get_client(context, admin=False):
|
||||
# NOTE(dprince): In the case where no auth_token is present
|
||||
# we allow use of neutron admin tenant credentials if
|
||||
# it is an admin context.
|
||||
# This is to support some services (metadata API) where
|
||||
# an admin context is used without an auth token.
|
||||
# NOTE(dprince): In the case where no auth_token is present we allow use of
|
||||
# neutron admin tenant credentials if it is an admin context. This is to
|
||||
# support some services (metadata API) where an admin context is used
|
||||
# without an auth token.
|
||||
global _ADMIN_AUTH
|
||||
global _SESSION
|
||||
|
||||
auth_plugin = None
|
||||
|
||||
if not _SESSION:
|
||||
_SESSION = session.Session.load_from_conf_options(CONF, NEUTRON_GROUP)
|
||||
|
||||
if admin or (context.is_admin and not context.auth_token):
|
||||
# NOTE(jamielennox): The theory here is that we maintain one
|
||||
# authenticated admin auth globally. The plugin will authenticate
|
||||
# internally (not thread safe) and on demand so we extract a current
|
||||
# auth plugin from it (whilst locked). This may or may not require
|
||||
# reauthentication. We then use the static token plugin to issue the
|
||||
# actual request with that current token in a thread safe way.
|
||||
if not _ADMIN_AUTH:
|
||||
_ADMIN_AUTH = _load_auth_plugin(CONF)
|
||||
|
||||
with lockutils.lock('neutron_admin_auth_token_lock'):
|
||||
orig_token = AdminTokenStore.get().admin_auth_token
|
||||
client = _get_client(orig_token, admin=True)
|
||||
return ClientWrapper(client)
|
||||
# FIXME(jamielennox): We should also retrieve the endpoint from the
|
||||
# catalog here rather than relying on setting it in CONF.
|
||||
auth_token = _ADMIN_AUTH.get_token(_SESSION)
|
||||
|
||||
# We got a user token that we can use that as-is
|
||||
if context.auth_token:
|
||||
token = context.auth_token
|
||||
return _get_client(token=token)
|
||||
# FIXME(jamielennox): why aren't we using the service catalog?
|
||||
auth_plugin = token_endpoint.Token(CONF.neutron.url, auth_token)
|
||||
|
||||
# We did not get a user token and we should not be using
|
||||
# an admin token so log an error
|
||||
raise neutron_client_exc.Unauthorized()
|
||||
elif context.auth_token:
|
||||
auth_plugin = context.get_auth_plugin()
|
||||
|
||||
if not auth_plugin:
|
||||
# We did not get a user token and we should not be using
|
||||
# an admin token so log an error
|
||||
raise neutron_client_exc.Unauthorized()
|
||||
|
||||
return clientv20.Client(session=_SESSION,
|
||||
auth=auth_plugin,
|
||||
endpoint_override=CONF.neutron.url,
|
||||
region_name=CONF.neutron.region_name)
|
||||
|
||||
|
||||
class API(base_api.NetworkAPI):
|
||||
|
@@ -3459,7 +3459,7 @@ class AttachInterfacesSampleJsonTest(ServersSampleBase):
|
||||
fake_detach_interface)
|
||||
self.flags(auth_strategy=None, group='neutron')
|
||||
self.flags(url='http://anyhost/', group='neutron')
|
||||
self.flags(url_timeout=30, group='neutron')
|
||||
self.flags(timeout=30, group='neutron')
|
||||
|
||||
def generalize_subs(self, subs, vanilla_regexes):
|
||||
subs['subnet_id'] = vanilla_regexes['uuid']
|
||||
|
@@ -91,7 +91,7 @@ class AttachInterfacesSampleJsonTest(test_servers.ServersSampleBase):
|
||||
fake_detach_interface)
|
||||
self.flags(auth_strategy=None, group='neutron')
|
||||
self.flags(url='http://anyhost/', group='neutron')
|
||||
self.flags(url_timeout=30, group='neutron')
|
||||
self.flags(timeout=30, group='neutron')
|
||||
|
||||
def generalize_subs(self, subs, vanilla_regexes):
|
||||
subs['subnet_id'] = vanilla_regexes['uuid']
|
||||
|
@@ -136,7 +136,7 @@ class InterfaceAttachTestsV21(test.NoDBTestCase):
|
||||
super(InterfaceAttachTestsV21, self).setUp()
|
||||
self.flags(auth_strategy=None, group='neutron')
|
||||
self.flags(url='http://anyhost/', group='neutron')
|
||||
self.flags(url_timeout=30, group='neutron')
|
||||
self.flags(timeout=30, group='neutron')
|
||||
self.stubs.Set(network_api.API, 'show_port', fake_show_port)
|
||||
self.stubs.Set(network_api.API, 'list_ports', fake_list_ports)
|
||||
self.stubs.Set(compute_api.API, 'get', fake_get_instance)
|
||||
|
@@ -99,22 +99,23 @@ class MyComparator(mox.Comparator):
|
||||
|
||||
|
||||
class TestNeutronClient(test.NoDBTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestNeutronClient, self).setUp()
|
||||
neutronapi.reset_state()
|
||||
|
||||
def test_withtoken(self):
|
||||
self.flags(url='http://anyhost/', group='neutron')
|
||||
self.flags(url_timeout=30, group='neutron')
|
||||
self.flags(timeout=30, group='neutron')
|
||||
my_context = context.RequestContext('userid',
|
||||
'my_tenantid',
|
||||
auth_token='token')
|
||||
self.mox.StubOutWithMock(client.Client, "__init__")
|
||||
client.Client.__init__(
|
||||
auth_strategy=CONF.neutron.auth_strategy,
|
||||
endpoint_url=CONF.neutron.url,
|
||||
token=my_context.auth_token,
|
||||
timeout=CONF.neutron.url_timeout,
|
||||
insecure=False,
|
||||
ca_cert=None).AndReturn(None)
|
||||
self.mox.ReplayAll()
|
||||
neutronapi.get_client(my_context)
|
||||
cl = neutronapi.get_client(my_context)
|
||||
|
||||
self.assertEqual(CONF.neutron.url, cl.httpclient.endpoint_override)
|
||||
self.assertEqual(my_context.auth_token,
|
||||
cl.httpclient.auth.auth_token)
|
||||
self.assertEqual(CONF.neutron.timeout, cl.httpclient.session.timeout)
|
||||
|
||||
def test_withouttoken(self):
|
||||
my_context = context.RequestContext('userid', 'my_tenantid')
|
||||
@@ -124,24 +125,17 @@ class TestNeutronClient(test.NoDBTestCase):
|
||||
|
||||
def test_withtoken_context_is_admin(self):
|
||||
self.flags(url='http://anyhost/', group='neutron')
|
||||
self.flags(url_timeout=30, group='neutron')
|
||||
self.flags(timeout=30, group='neutron')
|
||||
my_context = context.RequestContext('userid',
|
||||
'my_tenantid',
|
||||
auth_token='token',
|
||||
is_admin=True)
|
||||
self.mox.StubOutWithMock(client.Client, "__init__")
|
||||
client.Client.__init__(
|
||||
auth_strategy=CONF.neutron.auth_strategy,
|
||||
endpoint_url=CONF.neutron.url,
|
||||
token=my_context.auth_token,
|
||||
timeout=CONF.neutron.url_timeout,
|
||||
insecure=False,
|
||||
ca_cert=None).AndReturn(None)
|
||||
self.mox.ReplayAll()
|
||||
# Note that although we have admin set in the context we
|
||||
# are not asking for an admin client, and so we auth with
|
||||
# our own token
|
||||
neutronapi.get_client(my_context)
|
||||
cl = neutronapi.get_client(my_context)
|
||||
|
||||
self.assertEqual(CONF.neutron.url, cl.httpclient.endpoint_override)
|
||||
self.assertEqual(my_context.auth_token,
|
||||
cl.httpclient.auth.auth_token)
|
||||
self.assertEqual(CONF.neutron.timeout, cl.httpclient.session.timeout)
|
||||
|
||||
def test_withouttoken_keystone_connection_error(self):
|
||||
self.flags(auth_strategy='keystone', group='neutron')
|
||||
@@ -151,46 +145,27 @@ class TestNeutronClient(test.NoDBTestCase):
|
||||
neutronapi.get_client,
|
||||
my_context)
|
||||
|
||||
def test_reuse_admin_token(self):
|
||||
@mock.patch('nova.network.neutronv2.api._ADMIN_AUTH')
|
||||
@mock.patch.object(client.Client, "list_networks", new=mock.Mock())
|
||||
def test_reuse_admin_token(self, m):
|
||||
self.flags(url='http://anyhost/', group='neutron')
|
||||
self.flags(url_timeout=30, group='neutron')
|
||||
token_store = neutronapi.AdminTokenStore.get()
|
||||
token_store.admin_auth_token = 'new_token'
|
||||
my_context = context.RequestContext('userid', 'my_tenantid',
|
||||
auth_token='token')
|
||||
with contextlib.nested(
|
||||
mock.patch.object(client.Client, "list_networks",
|
||||
side_effect=mock.Mock),
|
||||
mock.patch.object(client.Client, 'get_auth_info',
|
||||
return_value={'auth_token': 'new_token1'}),
|
||||
):
|
||||
client1 = neutronapi.get_client(my_context, True)
|
||||
client1.list_networks(retrieve_all=False)
|
||||
self.assertEqual('new_token1', token_store.admin_auth_token)
|
||||
client1 = neutronapi.get_client(my_context, True)
|
||||
client1.list_networks(retrieve_all=False)
|
||||
self.assertEqual('new_token1', token_store.admin_auth_token)
|
||||
|
||||
def test_admin_token_updated(self):
|
||||
self.flags(url='http://anyhost/', group='neutron')
|
||||
self.flags(url_timeout=30, group='neutron')
|
||||
token_store = neutronapi.AdminTokenStore.get()
|
||||
token_store.admin_auth_token = 'new_token'
|
||||
tokens = [{'auth_token': 'new_token1'}, {'auth_token': 'new_token'}]
|
||||
my_context = context.RequestContext('userid', 'my_tenantid',
|
||||
auth_token='token')
|
||||
with contextlib.nested(
|
||||
mock.patch.object(client.Client, "list_networks",
|
||||
side_effect=mock.Mock),
|
||||
mock.patch.object(client.Client, 'get_auth_info',
|
||||
side_effect=tokens.pop),
|
||||
):
|
||||
client1 = neutronapi.get_client(my_context, True)
|
||||
client1.list_networks(retrieve_all=False)
|
||||
self.assertEqual('new_token', token_store.admin_auth_token)
|
||||
client1 = neutronapi.get_client(my_context, True)
|
||||
client1.list_networks(retrieve_all=False)
|
||||
self.assertEqual('new_token1', token_store.admin_auth_token)
|
||||
tokens = ['new_token2', 'new_token1']
|
||||
|
||||
def token_vals(*args, **kwargs):
|
||||
return tokens.pop()
|
||||
|
||||
m.get_token.side_effect = token_vals
|
||||
|
||||
client1 = neutronapi.get_client(my_context, True)
|
||||
client1.list_networks(retrieve_all=False)
|
||||
self.assertEqual('new_token1', client1.httpclient.auth.get_token(None))
|
||||
|
||||
client1 = neutronapi.get_client(my_context, True)
|
||||
client1.list_networks(retrieve_all=False)
|
||||
self.assertEqual('new_token2', client1.httpclient.auth.get_token(None))
|
||||
|
||||
|
||||
class TestNeutronv2Base(test.NoDBTestCase):
|
||||
@@ -3577,14 +3552,15 @@ class TestNeutronv2ExtraDhcpOpts(TestNeutronv2Base):
|
||||
|
||||
class TestNeutronClientForAdminScenarios(test.NoDBTestCase):
|
||||
|
||||
def _test_get_client_for_admin(self, use_id=False, admin_context=False):
|
||||
|
||||
def client_mock(*args, **kwargs):
|
||||
client.Client.httpclient = mock.MagicMock()
|
||||
@mock.patch('keystoneclient.auth.identity.v2.Password.get_token')
|
||||
def _test_get_client_for_admin(self, auth_mock,
|
||||
use_id=False, admin_context=False):
|
||||
token_value = uuid.uuid4().hex
|
||||
auth_mock.return_value = token_value
|
||||
|
||||
self.flags(auth_strategy=None, group='neutron')
|
||||
self.flags(url='http://anyhost/', group='neutron')
|
||||
self.flags(url_timeout=30, group='neutron')
|
||||
self.flags(timeout=30, group='neutron')
|
||||
if use_id:
|
||||
self.flags(admin_tenant_id='admin_tenant_id', group='neutron')
|
||||
self.flags(admin_user_id='admin_user_id', group='neutron')
|
||||
@@ -3593,39 +3569,47 @@ class TestNeutronClientForAdminScenarios(test.NoDBTestCase):
|
||||
my_context = context.get_admin_context()
|
||||
else:
|
||||
my_context = context.RequestContext('userid', 'my_tenantid',
|
||||
auth_token='token')
|
||||
self.mox.StubOutWithMock(client.Client, "__init__")
|
||||
kwargs = {
|
||||
'auth_url': CONF.neutron.admin_auth_url,
|
||||
'password': CONF.neutron.admin_password,
|
||||
'endpoint_url': CONF.neutron.url,
|
||||
'auth_strategy': None,
|
||||
'timeout': CONF.neutron.url_timeout,
|
||||
'insecure': False,
|
||||
'ca_cert': None,
|
||||
'token': None}
|
||||
if use_id:
|
||||
kwargs['tenant_id'] = CONF.neutron.admin_tenant_id
|
||||
kwargs['user_id'] = CONF.neutron.admin_user_id
|
||||
else:
|
||||
kwargs['tenant_name'] = CONF.neutron.admin_tenant_name
|
||||
kwargs['username'] = CONF.neutron.admin_username
|
||||
client.Client.__init__(**kwargs).WithSideEffects(client_mock)
|
||||
self.mox.ReplayAll()
|
||||
auth_token='token')
|
||||
|
||||
# clean global
|
||||
token_store = neutronapi.AdminTokenStore.get()
|
||||
token_store.admin_auth_token = None
|
||||
neutronapi.reset_state()
|
||||
|
||||
if admin_context:
|
||||
# Note that the context does not contain a token but is
|
||||
# an admin context which will force an elevation to admin
|
||||
# credentials.
|
||||
neutronapi.get_client(my_context)
|
||||
context_client = neutronapi.get_client(my_context)
|
||||
else:
|
||||
# Note that the context is not elevated, but the True is passed in
|
||||
# which will force an elevation to admin credentials even though
|
||||
# the context has an auth_token.
|
||||
neutronapi.get_client(my_context, True)
|
||||
context_client = neutronapi.get_client(my_context, True)
|
||||
|
||||
admin_auth = neutronapi._ADMIN_AUTH
|
||||
|
||||
self.assertEqual(CONF.neutron.admin_auth_url, admin_auth.auth_url)
|
||||
self.assertEqual(CONF.neutron.admin_password, admin_auth.password)
|
||||
|
||||
if use_id:
|
||||
self.assertEqual(CONF.neutron.admin_tenant_id,
|
||||
admin_auth.tenant_id)
|
||||
self.assertEqual(CONF.neutron.admin_user_id, admin_auth.user_id)
|
||||
|
||||
self.assertIsNone(admin_auth.tenant_name)
|
||||
self.assertIsNone(admin_auth.username)
|
||||
else:
|
||||
self.assertEqual(CONF.neutron.admin_tenant_name,
|
||||
admin_auth.tenant_name)
|
||||
self.assertEqual(CONF.neutron.admin_username, admin_auth.username)
|
||||
|
||||
self.assertIsNone(admin_auth.tenant_id)
|
||||
self.assertIsNone(admin_auth.user_id)
|
||||
|
||||
self.assertEqual(CONF.neutron.timeout, neutronapi._SESSION.timeout)
|
||||
|
||||
self.assertEqual(token_value, context_client.httpclient.auth.token)
|
||||
self.assertEqual(CONF.neutron.url,
|
||||
context_client.httpclient.auth.endpoint)
|
||||
|
||||
def test_get_client_for_admin(self):
|
||||
self._test_get_client_for_admin()
|
||||
|
Reference in New Issue
Block a user