Support project_id and project_name in ContextBase class
In order for users to migrate to using project_id and project_name, add keyword arguments for them, which take precedence over their tenant_* counterparts. Add tests to cover the permutations. Moved 'project_id' to be the second positional argument so callers not passing by keyword automatically change to using it. Started using project_id instead of tenant_id where possible. Updated exception HANetworkConcurrentDeletion class to accept project_id and warn when tenant_id passed. Change-Id: Ia6fe45f64e0ed1a7533af85b58f86e8dc37a3441 Blueprint: https://blueprints.launchpad.net/neutron/+spec/keystone-v3
This commit is contained in:
@@ -33,15 +33,27 @@ class ContextBase(oslo_context.RequestContext):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, user_id=None, tenant_id=None, is_admin=None,
|
def __init__(self, user_id=None, project_id=None, is_admin=None,
|
||||||
timestamp=None, tenant_name=None, user_name=None,
|
timestamp=None, project_name=None, user_name=None,
|
||||||
is_advsvc=None, **kwargs):
|
is_advsvc=None, tenant_id=None, tenant_name=None,
|
||||||
# NOTE(jamielennox): We maintain this argument in order for tests that
|
**kwargs):
|
||||||
|
# NOTE(jamielennox): We maintain this argument order for tests that
|
||||||
# pass arguments positionally.
|
# pass arguments positionally.
|
||||||
kwargs.setdefault('project_id', tenant_id)
|
|
||||||
# prefer project_name, as that's what's going to be set by
|
# Prefer project_id and project_name, as that's what's going to be
|
||||||
# keystone. Fall back to tenant_name if for some reason it's blank.
|
# set by keystone.
|
||||||
kwargs.setdefault('project_name', tenant_name)
|
# NOTE(haleyb): remove fall-back and warning in E+2 release, or when
|
||||||
|
# all callers have been changed to use project_*.
|
||||||
|
project_id = project_id or tenant_id
|
||||||
|
project_name = project_name or tenant_name
|
||||||
|
if tenant_id:
|
||||||
|
LOG.warning('Keyword tenant_id has been deprecated, use '
|
||||||
|
'project_id instead')
|
||||||
|
if tenant_name:
|
||||||
|
LOG.warning('Keyword tenant_name has been deprecated, use '
|
||||||
|
'project_name instead')
|
||||||
|
kwargs.setdefault('project_id', project_id)
|
||||||
|
kwargs.setdefault('project_name', project_name)
|
||||||
super().__init__(
|
super().__init__(
|
||||||
is_admin=is_admin, user_id=user_id, **kwargs)
|
is_admin=is_admin, user_id=user_id, **kwargs)
|
||||||
|
|
||||||
@@ -199,7 +211,7 @@ def get_admin_context():
|
|||||||
# explicity here will avoid checking in policy rules if is_admin should be
|
# explicity here will avoid checking in policy rules if is_admin should be
|
||||||
# set to True or not
|
# set to True or not
|
||||||
return Context(user_id=None,
|
return Context(user_id=None,
|
||||||
tenant_id=None,
|
project_id=None,
|
||||||
is_admin=True,
|
is_admin=True,
|
||||||
overwrite=False).elevated()
|
overwrite=False).elevated()
|
||||||
|
|
||||||
@@ -208,4 +220,4 @@ def get_admin_context_without_session():
|
|||||||
# NOTE(slaweq): elevated() method will set is_admin=True but setting it
|
# NOTE(slaweq): elevated() method will set is_admin=True but setting it
|
||||||
# explicity here will avoid checking in policy rules if is_admin should be
|
# explicity here will avoid checking in policy rules if is_admin should be
|
||||||
# set to True or not
|
# set to True or not
|
||||||
return ContextBase(user_id=None, tenant_id=None, is_admin=True).elevated()
|
return ContextBase(user_id=None, project_id=None, is_admin=True).elevated()
|
||||||
|
@@ -159,9 +159,9 @@ def query_with_hooks(context, model, field=None, lazy_fields=None):
|
|||||||
query = query.outerjoin(model.rbac_entries)
|
query = query.outerjoin(model.rbac_entries)
|
||||||
rbac_model = model.rbac_entries.property.mapper.class_
|
rbac_model = model.rbac_entries.property.mapper.class_
|
||||||
query_filter = (
|
query_filter = (
|
||||||
(model.tenant_id == context.tenant_id) |
|
(model.tenant_id == context.project_id) |
|
||||||
(rbac_model.action.in_(get_rbac_actions(model)) &
|
(rbac_model.action.in_(get_rbac_actions(model)) &
|
||||||
((rbac_model.target_project == context.tenant_id) |
|
((rbac_model.target_project == context.project_id) |
|
||||||
(rbac_model.target_project == '*'))))
|
(rbac_model.target_project == '*'))))
|
||||||
# This "group_by" clause will limit the number of registers
|
# This "group_by" clause will limit the number of registers
|
||||||
# returned by the query, avoiding the problem of the low SQL
|
# returned by the query, avoiding the problem of the low SQL
|
||||||
@@ -169,10 +169,10 @@ def query_with_hooks(context, model, field=None, lazy_fields=None):
|
|||||||
# project ID.
|
# project ID.
|
||||||
group_by = model.id
|
group_by = model.id
|
||||||
elif hasattr(model, 'shared'):
|
elif hasattr(model, 'shared'):
|
||||||
query_filter = ((model.tenant_id == context.tenant_id) |
|
query_filter = ((model.tenant_id == context.project_id) |
|
||||||
(model.shared == sql.true()))
|
(model.shared == sql.true()))
|
||||||
else:
|
else:
|
||||||
query_filter = (model.tenant_id == context.tenant_id)
|
query_filter = (model.tenant_id == context.project_id)
|
||||||
# Execute query hooks registered from mixins and plugins
|
# Execute query hooks registered from mixins and plugins
|
||||||
for hook in get_hooks(model):
|
for hook in get_hooks(model):
|
||||||
query_hook = helpers.resolve_ref(hook.get('query'))
|
query_hook = helpers.resolve_ref(hook.get('query'))
|
||||||
@@ -261,7 +261,7 @@ def apply_filters(query, model, filters, context=None):
|
|||||||
rbac = model.rbac_entries.property.mapper.class_
|
rbac = model.rbac_entries.property.mapper.class_
|
||||||
matches = [rbac.target_project == '*']
|
matches = [rbac.target_project == '*']
|
||||||
if context:
|
if context:
|
||||||
matches.append(rbac.target_project == context.tenant_id)
|
matches.append(rbac.target_project == context.project_id)
|
||||||
# any 'access_as_shared' records that match the
|
# any 'access_as_shared' records that match the
|
||||||
# wildcard or requesting tenant
|
# wildcard or requesting tenant
|
||||||
is_shared = and_(rbac.action == constants.ACCESS_SHARED,
|
is_shared = and_(rbac.action == constants.ACCESS_SHARED,
|
||||||
|
@@ -196,7 +196,7 @@ def model_query(context, model):
|
|||||||
# define basic filter condition for model query
|
# define basic filter condition for model query
|
||||||
query_filter = None
|
query_filter = None
|
||||||
if model_query_scope_is_project(context, model):
|
if model_query_scope_is_project(context, model):
|
||||||
query_filter = (model.tenant_id == context.tenant_id)
|
query_filter = (model.tenant_id == context.project_id)
|
||||||
|
|
||||||
if query_filter is not None:
|
if query_filter is not None:
|
||||||
query = query.filter(query_filter)
|
query = query.filter(query_filter)
|
||||||
|
@@ -780,7 +780,7 @@ class CTZoneExhaustedError(NeutronException):
|
|||||||
|
|
||||||
|
|
||||||
class TenantQuotaNotFound(NotFound):
|
class TenantQuotaNotFound(NotFound):
|
||||||
message = _("Quota for tenant %(tenant_id)s could not be found.")
|
message = _("Quota for project %(tenant_id)s could not be found.")
|
||||||
|
|
||||||
|
|
||||||
class MultipleFilterIDForIPFound(Conflict):
|
class MultipleFilterIDForIPFound(Conflict):
|
||||||
|
@@ -12,9 +12,13 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
from oslo_log import log as logging
|
||||||
|
|
||||||
from neutron_lib._i18n import _
|
from neutron_lib._i18n import _
|
||||||
from neutron_lib import exceptions
|
from neutron_lib import exceptions
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class MaxVRIDAllocationTriesReached(exceptions.NeutronException):
|
class MaxVRIDAllocationTriesReached(exceptions.NeutronException):
|
||||||
message = _("Failed to allocate a VRID in the network %(network_id)s "
|
message = _("Failed to allocate a VRID in the network %(network_id)s "
|
||||||
@@ -28,7 +32,19 @@ class NoVRIDAvailable(exceptions.Conflict):
|
|||||||
|
|
||||||
|
|
||||||
class HANetworkConcurrentDeletion(exceptions.Conflict):
|
class HANetworkConcurrentDeletion(exceptions.Conflict):
|
||||||
message = _("Network for tenant %(tenant_id)s concurrently deleted.")
|
message = _("Network for project %(project_id)s concurrently deleted.")
|
||||||
|
|
||||||
|
# NOTE(haleyb): remove fall-back and warning in E+2 release, or when
|
||||||
|
# all callers have been changed to use project_id.
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
project_id = kwargs.get('project_id')
|
||||||
|
tenant_id = kwargs.get('tenant_id')
|
||||||
|
project_id = project_id or tenant_id
|
||||||
|
if tenant_id:
|
||||||
|
LOG.warning('Keyword tenant_id has been deprecated, use '
|
||||||
|
'project_id instead')
|
||||||
|
kwargs.setdefault('project_id', project_id)
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
class HANetworkCIDRNotValid(exceptions.NeutronException):
|
class HANetworkCIDRNotValid(exceptions.NeutronException):
|
||||||
|
@@ -198,11 +198,11 @@ class TestAttributeInfo(base.BaseTestCase):
|
|||||||
{'key': 1}, self._EXC_CLS)
|
{'key': 1}, self._EXC_CLS)
|
||||||
|
|
||||||
def test_populate_project_id_admin_req(self):
|
def test_populate_project_id_admin_req(self):
|
||||||
tenant_id_1 = uuidutils.generate_uuid()
|
project_id_1 = uuidutils.generate_uuid()
|
||||||
tenant_id_2 = uuidutils.generate_uuid()
|
project_id_2 = uuidutils.generate_uuid()
|
||||||
# non-admin users can't create a res on behalf of another project
|
# non-admin users can't create a res on behalf of another project
|
||||||
ctx = context.Context(user_id=None, tenant_id=tenant_id_1)
|
ctx = context.Context(user_id=None, project_id=project_id_1)
|
||||||
res_dict = {'tenant_id': tenant_id_2}
|
res_dict = {'project_id': project_id_2}
|
||||||
attr_inst = attributes.AttributeInfo({})
|
attr_inst = attributes.AttributeInfo({})
|
||||||
self.assertRaises(exc.HTTPBadRequest,
|
self.assertRaises(exc.HTTPBadRequest,
|
||||||
attr_inst.populate_project_id,
|
attr_inst.populate_project_id,
|
||||||
@@ -212,26 +212,26 @@ class TestAttributeInfo(base.BaseTestCase):
|
|||||||
attr_inst.populate_project_id(ctx, res_dict, is_create=False)
|
attr_inst.populate_project_id(ctx, res_dict, is_create=False)
|
||||||
|
|
||||||
def test_populate_project_id_from_context(self):
|
def test_populate_project_id_from_context(self):
|
||||||
tenant_id = uuidutils.generate_uuid()
|
project_id = uuidutils.generate_uuid()
|
||||||
ctx = context.Context(user_id=None, tenant_id=tenant_id)
|
ctx = context.Context(user_id=None, project_id=project_id)
|
||||||
# for each create request, for the resources which require tenant_id,
|
# for each create request, for the resources which require project_id,
|
||||||
# it should be added to the req body
|
# it should be added to the req body
|
||||||
res_dict = {}
|
res_dict = {}
|
||||||
attr_inst = attributes.AttributeInfo(
|
attr_inst = attributes.AttributeInfo(
|
||||||
{'tenant_id': {'allow_post': True}})
|
{'project_id': {'allow_post': True}})
|
||||||
attr_inst.populate_project_id(ctx, res_dict, is_create=True)
|
attr_inst.populate_project_id(ctx, res_dict, is_create=True)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
{'tenant_id': ctx.tenant_id, 'project_id': ctx.tenant_id},
|
{'tenant_id': ctx.project_id, 'project_id': ctx.project_id},
|
||||||
res_dict)
|
res_dict)
|
||||||
|
|
||||||
def test_populate_project_id_mandatory_not_specified(self):
|
def test_populate_project_id_mandatory_not_specified(self):
|
||||||
tenant_id = uuidutils.generate_uuid()
|
project_id = uuidutils.generate_uuid()
|
||||||
ctx = context.Context(user_id=None, tenant_id=tenant_id)
|
ctx = context.Context(user_id=None, project_id=project_id)
|
||||||
# if the tenant_id is mandatory for the resource and not specified
|
# if the project_id is mandatory for the resource and not specified
|
||||||
# in the request nor in the context, an exception should be raised
|
# in the request nor in the context, an exception should be raised
|
||||||
res_dict = {}
|
res_dict = {}
|
||||||
attr_info = {'tenant_id': {'allow_post': True}}
|
attr_info = {'project_id': {'allow_post': True}}
|
||||||
ctx.tenant_id = None
|
ctx.project_id = None
|
||||||
attr_inst = attributes.AttributeInfo(attr_info)
|
attr_inst = attributes.AttributeInfo(attr_info)
|
||||||
self.assertRaises(exc.HTTPBadRequest,
|
self.assertRaises(exc.HTTPBadRequest,
|
||||||
attr_inst.populate_project_id,
|
attr_inst.populate_project_id,
|
||||||
@@ -239,11 +239,11 @@ class TestAttributeInfo(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_populate_project_id_not_mandatory(self):
|
def test_populate_project_id_not_mandatory(self):
|
||||||
ctx = context.Context(user_id=None)
|
ctx = context.Context(user_id=None)
|
||||||
# if the tenant_id is not mandatory for the resource it should be
|
# if the project_id is not mandatory for the resource it should be
|
||||||
# OK if it is not in the request.
|
# OK if it is not in the request.
|
||||||
res_dict = {'name': 'test_port'}
|
res_dict = {'name': 'test_port'}
|
||||||
attr_inst = attributes.AttributeInfo({})
|
attr_inst = attributes.AttributeInfo({})
|
||||||
ctx.tenant_id = None
|
ctx.project_id = None
|
||||||
attr_inst.populate_project_id(ctx, res_dict, True)
|
attr_inst.populate_project_id(ctx, res_dict, True)
|
||||||
self.assertEqual({'name': 'test_port'}, res_dict)
|
self.assertEqual({'name': 'test_port'}, res_dict)
|
||||||
|
|
||||||
@@ -253,7 +253,7 @@ class TestAttributeInfo(base.BaseTestCase):
|
|||||||
def test_verify_attributes_ok_with_project_id(self):
|
def test_verify_attributes_ok_with_project_id(self):
|
||||||
attributes.AttributeInfo(
|
attributes.AttributeInfo(
|
||||||
{'tenant_id': 'foo', 'project_id': 'foo'}).verify_attributes(
|
{'tenant_id': 'foo', 'project_id': 'foo'}).verify_attributes(
|
||||||
{'tenant_id': 'foo'})
|
{'project_id': 'foo'})
|
||||||
|
|
||||||
def test_verify_attributes_ok_subset(self):
|
def test_verify_attributes_ok_subset(self):
|
||||||
attributes.AttributeInfo(
|
attributes.AttributeInfo(
|
||||||
@@ -351,6 +351,9 @@ class TestRetrieveValidSortKeys(base.BaseTestCase):
|
|||||||
},
|
},
|
||||||
"tenant_id": {
|
"tenant_id": {
|
||||||
"visible": True,
|
"visible": True,
|
||||||
|
},
|
||||||
|
"project_id": {
|
||||||
|
"visible": True,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
expect_val = {"id", "name"}
|
expect_val = {"id", "name"}
|
||||||
|
@@ -23,7 +23,7 @@ import neutron_lib.callbacks.exceptions as ex
|
|||||||
from neutron_lib.tests.unit.exceptions import test_exceptions
|
from neutron_lib.tests.unit.exceptions import test_exceptions
|
||||||
|
|
||||||
|
|
||||||
class TestCallbackExceptions(test_exceptions.TestExceptions):
|
class TestCallbackExceptions(test_exceptions.TestExceptionsBase):
|
||||||
|
|
||||||
def _check_exception(self, exc_class, expected_msg, **kwargs):
|
def _check_exception(self, exc_class, expected_msg, **kwargs):
|
||||||
raise_exc_class = functools.partial(test_exceptions._raise, exc_class)
|
raise_exc_class = functools.partial(test_exceptions._raise, exc_class)
|
||||||
|
@@ -28,13 +28,16 @@ def _raise(exc_class, **kwargs):
|
|||||||
raise exc_class(**kwargs)
|
raise exc_class(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
class TestExceptions(base.BaseTestCase):
|
class TestExceptionsBase(base.BaseTestCase):
|
||||||
|
|
||||||
def _check_nexc(self, exc_class, expected_msg, **kwargs):
|
def _check_nexc(self, exc_class, expected_msg, **kwargs):
|
||||||
raise_exc_class = functools.partial(_raise, exc_class)
|
raise_exc_class = functools.partial(_raise, exc_class)
|
||||||
e = self.assertRaises(exc_class, raise_exc_class, **kwargs)
|
e = self.assertRaises(exc_class, raise_exc_class, **kwargs)
|
||||||
self.assertEqual(expected_msg, str(e))
|
self.assertEqual(expected_msg, str(e))
|
||||||
|
|
||||||
|
|
||||||
|
class TestExceptions(TestExceptionsBase):
|
||||||
|
|
||||||
def test_base(self):
|
def test_base(self):
|
||||||
self._check_nexc(
|
self._check_nexc(
|
||||||
ne.NeutronException,
|
ne.NeutronException,
|
||||||
|
45
neutron_lib/tests/unit/exceptions/test_l3_ext_ha_mode.py
Normal file
45
neutron_lib/tests/unit/exceptions/test_l3_ext_ha_mode.py
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
import neutron_lib.exceptions.l3_ext_ha_mode as lehm
|
||||||
|
from neutron_lib.tests.unit.exceptions import test_exceptions
|
||||||
|
|
||||||
|
|
||||||
|
class TestHANetworkConcurrentDeletion(test_exceptions.TestExceptionsBase):
|
||||||
|
|
||||||
|
# NOTE(haleyb) We only test HANetworkConcurrentDeletion because it is
|
||||||
|
# the only one with an __init__() section.
|
||||||
|
def test_ha_network_concurrent_deletion(self):
|
||||||
|
self._check_nexc(
|
||||||
|
lehm.HANetworkConcurrentDeletion,
|
||||||
|
_('Network for project project_id concurrently deleted.'),
|
||||||
|
project_id='project_id')
|
||||||
|
|
||||||
|
@mock.patch.object(lehm, 'LOG')
|
||||||
|
def test_ha_network_concurrent_deletion_tenant_id(self, mock_log):
|
||||||
|
self._check_nexc(
|
||||||
|
lehm.HANetworkConcurrentDeletion,
|
||||||
|
_('Network for project tenant_id concurrently deleted.'),
|
||||||
|
tenant_id='tenant_id')
|
||||||
|
mock_log.warning.assert_called_once_with(
|
||||||
|
'Keyword tenant_id has been deprecated, use project_id instead')
|
||||||
|
|
||||||
|
@mock.patch.object(lehm, 'LOG')
|
||||||
|
def test_ha_network_concurrent_deletion_both(self, mock_log):
|
||||||
|
self._check_nexc(
|
||||||
|
lehm.HANetworkConcurrentDeletion,
|
||||||
|
_('Network for project project_id concurrently deleted.'),
|
||||||
|
project_id='project_id', tenant_id='tenant_id')
|
||||||
|
mock_log.warning.assert_called_once_with(
|
||||||
|
'Keyword tenant_id has been deprecated, use project_id instead')
|
@@ -28,11 +28,11 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self._db_api_session_patcher = mock.patch(db_api)
|
self._db_api_session_patcher = mock.patch(db_api)
|
||||||
self.db_api_session = self._db_api_session_patcher.start()
|
self.db_api_session = self._db_api_session_patcher.start()
|
||||||
|
|
||||||
def test_neutron_context_create(self):
|
def test_neutron_context_create_positional(self):
|
||||||
ctx = context.Context('user_id', 'tenant_id')
|
ctx = context.Context('user_id', 'project_id')
|
||||||
self.assertEqual('user_id', ctx.user_id)
|
self.assertEqual('user_id', ctx.user_id)
|
||||||
self.assertEqual('tenant_id', ctx.project_id)
|
self.assertEqual('project_id', ctx.project_id)
|
||||||
self.assertEqual('tenant_id', ctx.tenant_id)
|
self.assertEqual('project_id', ctx.tenant_id)
|
||||||
request_id = ctx.request_id
|
request_id = ctx.request_id
|
||||||
if isinstance(request_id, bytes):
|
if isinstance(request_id, bytes):
|
||||||
request_id = request_id.decode('utf-8')
|
request_id = request_id.decode('utf-8')
|
||||||
@@ -42,67 +42,116 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self.assertIsNone(ctx.project_name)
|
self.assertIsNone(ctx.project_name)
|
||||||
self.assertIsNone(ctx.auth_token)
|
self.assertIsNone(ctx.auth_token)
|
||||||
|
|
||||||
def test_neutron_context_getter_setter(self):
|
def _test_neutron_context_create(self, project_id, tenant_id):
|
||||||
ctx = context.Context('Anakin', 'Skywalker')
|
p_id = project_id or tenant_id
|
||||||
|
ctx = context.Context('user_id', project_id=project_id,
|
||||||
|
tenant_id=tenant_id)
|
||||||
|
self.assertEqual('user_id', ctx.user_id)
|
||||||
|
self.assertEqual(p_id, ctx.project_id)
|
||||||
|
self.assertEqual(p_id, ctx.tenant_id)
|
||||||
|
request_id = ctx.request_id
|
||||||
|
if isinstance(request_id, bytes):
|
||||||
|
request_id = request_id.decode('utf-8')
|
||||||
|
self.assertThat(request_id, matchers.StartsWith('req-'))
|
||||||
|
self.assertIsNone(ctx.user_name)
|
||||||
|
self.assertIsNone(ctx.tenant_name)
|
||||||
|
self.assertIsNone(ctx.project_name)
|
||||||
|
self.assertIsNone(ctx.auth_token)
|
||||||
|
|
||||||
|
def test_neutron_context_create(self):
|
||||||
|
self._test_neutron_context_create('project_id', None)
|
||||||
|
|
||||||
|
def test_neutron_context_create_tenant_id(self):
|
||||||
|
self._test_neutron_context_create(None, 'tenant_id')
|
||||||
|
|
||||||
|
def _test_neutron_context_getter_setter(self, project_id, tenant_id):
|
||||||
|
p_id = project_id or tenant_id
|
||||||
|
ctx = context.Context('Anakin', project_id=project_id,
|
||||||
|
tenant_id=tenant_id)
|
||||||
self.assertEqual('Anakin', ctx.user_id)
|
self.assertEqual('Anakin', ctx.user_id)
|
||||||
self.assertEqual('Skywalker', ctx.tenant_id)
|
self.assertEqual(p_id, ctx.tenant_id)
|
||||||
ctx.user_id = 'Darth'
|
ctx.user_id = 'Darth'
|
||||||
ctx.tenant_id = 'Vader'
|
ctx.project_id = 'Vader'
|
||||||
self.assertEqual('Darth', ctx.user_id)
|
self.assertEqual('Darth', ctx.user_id)
|
||||||
self.assertEqual('Vader', ctx.tenant_id)
|
self.assertEqual('Vader', ctx.tenant_id)
|
||||||
|
self.assertEqual('Vader', ctx.project_id)
|
||||||
|
|
||||||
def test_neutron_context_create_with_name(self):
|
def test_neutron_context_getter_setter(self):
|
||||||
ctx = context.Context('user_id', 'tenant_id',
|
self._test_neutron_context_getter_setter('Skywalker', None)
|
||||||
tenant_name='tenant_name', user_name='user_name')
|
|
||||||
|
def test_neutron_context_getter_setter_tenant_id(self):
|
||||||
|
self._test_neutron_context_getter_setter(None, 'Skywalker')
|
||||||
|
|
||||||
|
def _test_neutron_context_create_with_name(
|
||||||
|
self, project_name, tenant_name):
|
||||||
|
project_name = project_name or tenant_name
|
||||||
|
ctx = context.Context('user_id', 'project_id',
|
||||||
|
tenant_name=tenant_name, user_name='user_name',
|
||||||
|
project_name=project_name)
|
||||||
# Check name is set
|
# Check name is set
|
||||||
self.assertEqual('user_name', ctx.user_name)
|
self.assertEqual('user_name', ctx.user_name)
|
||||||
self.assertEqual('tenant_name', ctx.tenant_name)
|
self.assertEqual(project_name, ctx.tenant_name)
|
||||||
self.assertEqual('tenant_name', ctx.project_name)
|
self.assertEqual(project_name, ctx.project_name)
|
||||||
# Check user/tenant contains its ID even if user/tenant_name is passed
|
# Check user/tenant contains its ID even if user/tenant_name is passed
|
||||||
self.assertEqual('user_id', ctx.user_id)
|
self.assertEqual('user_id', ctx.user_id)
|
||||||
self.assertEqual('tenant_id', ctx.tenant_id)
|
self.assertEqual('project_id', ctx.project_id)
|
||||||
|
self.assertEqual('project_id', ctx.tenant_id)
|
||||||
|
|
||||||
|
def test_neutron_context_create_with_name(self):
|
||||||
|
self._test_neutron_context_create_with_name('project_name', None)
|
||||||
|
|
||||||
|
def test_neutron_context_create_with_name_tenant_name(self):
|
||||||
|
self._test_neutron_context_create_with_name(None, 'tenant_name')
|
||||||
|
|
||||||
def test_neutron_context_create_with_request_id(self):
|
def test_neutron_context_create_with_request_id(self):
|
||||||
ctx = context.Context('user_id', 'tenant_id', request_id='req_id_xxx')
|
ctx = context.Context('user_id', 'project_id', request_id='req_id_xxx')
|
||||||
self.assertEqual('req_id_xxx', ctx.request_id)
|
self.assertEqual('req_id_xxx', ctx.request_id)
|
||||||
|
|
||||||
def test_neutron_context_create_with_timestamp(self):
|
def test_neutron_context_create_with_timestamp(self):
|
||||||
now = "Right Now!"
|
now = "Right Now!"
|
||||||
ctx = context.Context('user_id', 'tenant_id', timestamp=now)
|
ctx = context.Context('user_id', 'project_id', timestamp=now)
|
||||||
self.assertEqual(now, ctx.timestamp)
|
self.assertEqual(now, ctx.timestamp)
|
||||||
|
|
||||||
def test_neutron_context_create_is_advsvc(self):
|
def test_neutron_context_create_is_advsvc(self):
|
||||||
ctx = context.Context('user_id', 'tenant_id', is_advsvc=True)
|
ctx = context.Context('user_id', 'project_id', is_advsvc=True)
|
||||||
self.assertFalse(ctx.is_admin)
|
self.assertFalse(ctx.is_admin)
|
||||||
self.assertTrue(ctx.is_advsvc)
|
self.assertTrue(ctx.is_advsvc)
|
||||||
|
|
||||||
def test_neutron_context_create_is_service_role(self):
|
def test_neutron_context_create_is_service_role(self):
|
||||||
ctx = context.Context('user_id', 'tenant_id', roles=['service'])
|
ctx = context.Context('user_id', 'project_id', roles=['service'])
|
||||||
self.assertFalse(ctx.is_admin)
|
self.assertFalse(ctx.is_admin)
|
||||||
self.assertTrue(ctx.is_service_role)
|
self.assertTrue(ctx.is_service_role)
|
||||||
|
|
||||||
def test_neutron_context_create_with_auth_token(self):
|
def test_neutron_context_create_with_auth_token(self):
|
||||||
ctx = context.Context('user_id', 'tenant_id',
|
ctx = context.Context('user_id', 'project_id',
|
||||||
auth_token='auth_token_xxx')
|
auth_token='auth_token_xxx')
|
||||||
self.assertEqual('auth_token_xxx', ctx.auth_token)
|
self.assertEqual('auth_token_xxx', ctx.auth_token)
|
||||||
|
|
||||||
def test_neutron_context_from_dict(self):
|
def test_neutron_context_from_dict(self):
|
||||||
owner = {'user_id': 'Luke', 'tenant_id': 'Skywalker'}
|
owner = {'user_id': 'Luke', 'project_id': 'Skywalker'}
|
||||||
ctx = context.Context.from_dict(owner)
|
ctx = context.Context.from_dict(owner)
|
||||||
self.assertEqual(owner['user_id'], ctx.user_id)
|
self.assertEqual(owner['user_id'], ctx.user_id)
|
||||||
self.assertEqual(owner['tenant_id'], ctx.tenant_id)
|
self.assertEqual(owner['project_id'], ctx.project_id)
|
||||||
|
self.assertEqual(owner['project_id'], ctx.tenant_id)
|
||||||
|
|
||||||
def test_neutron_context_from_dict_validate(self):
|
def _test_neutron_context_from_dict_validate(
|
||||||
|
self, project_id, project_name, tenant_id, tenant_name):
|
||||||
|
project_id = project_id or tenant_id
|
||||||
|
project_name = project_name or tenant_name
|
||||||
ctx = context.Context(user_id='user_id1',
|
ctx = context.Context(user_id='user_id1',
|
||||||
user_name='user',
|
user_name='user',
|
||||||
tenant_id='tenant_id1',
|
project_id=project_id,
|
||||||
tenant_name='project1',
|
project_name=project_name,
|
||||||
|
tenant_id=tenant_id,
|
||||||
|
tenant_name=tenant_name,
|
||||||
request_id='request_id1',
|
request_id='request_id1',
|
||||||
auth_token='auth_token1')
|
auth_token='auth_token1')
|
||||||
values = {'user_id': 'user_id1',
|
values = {'user_id': 'user_id1',
|
||||||
'user_name': 'user',
|
'user_name': 'user',
|
||||||
'tenant_id': 'tenant_id1',
|
'project_id': project_id,
|
||||||
'tenant_name': 'project1',
|
'project_name': project_name,
|
||||||
|
'tenant_id': project_id,
|
||||||
|
'tenant_name': project_name,
|
||||||
'request_id': 'request_id1',
|
'request_id': 'request_id1',
|
||||||
'auth_token': 'auth_token1'}
|
'auth_token': 'auth_token1'}
|
||||||
ctx2 = context.Context.from_dict(values)
|
ctx2 = context.Context.from_dict(values)
|
||||||
@@ -113,11 +162,21 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
ctx2_dict.pop('timestamp')
|
ctx2_dict.pop('timestamp')
|
||||||
self.assertEqual(ctx_dict, ctx2_dict)
|
self.assertEqual(ctx_dict, ctx2_dict)
|
||||||
|
|
||||||
def test_neutron_context_to_dict(self):
|
def test_neutron_context_from_dict_validate(self):
|
||||||
ctx = context.Context('user_id', 'tenant_id')
|
self._test_neutron_context_from_dict_validate(
|
||||||
|
'project_id', 'project_name', None, None)
|
||||||
|
|
||||||
|
def test_neutron_context_from_dict_validate_tenant(self):
|
||||||
|
self._test_neutron_context_from_dict_validate(
|
||||||
|
None, None, 'tenant_id', 'tenant_name')
|
||||||
|
|
||||||
|
def _test_neutron_context_to_dict(self, project_id, tenant_id):
|
||||||
|
project_id = project_id or tenant_id
|
||||||
|
ctx = context.Context('user_id', project_id, tenant_id=tenant_id)
|
||||||
ctx_dict = ctx.to_dict()
|
ctx_dict = ctx.to_dict()
|
||||||
self.assertEqual('user_id', ctx_dict['user_id'])
|
self.assertEqual('user_id', ctx_dict['user_id'])
|
||||||
self.assertEqual('tenant_id', ctx_dict['project_id'])
|
self.assertEqual(project_id, ctx_dict['project_id'])
|
||||||
|
self.assertEqual(project_id, ctx_dict['tenant_id'])
|
||||||
self.assertEqual(ctx.request_id, ctx_dict['request_id'])
|
self.assertEqual(ctx.request_id, ctx_dict['request_id'])
|
||||||
self.assertEqual('user_id', ctx_dict['user'])
|
self.assertEqual('user_id', ctx_dict['user'])
|
||||||
self.assertIsNone(ctx_dict['user_name'])
|
self.assertIsNone(ctx_dict['user_name'])
|
||||||
@@ -125,17 +184,32 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self.assertIsNone(ctx_dict['project_name'])
|
self.assertIsNone(ctx_dict['project_name'])
|
||||||
self.assertIsNone(ctx_dict['auth_token'])
|
self.assertIsNone(ctx_dict['auth_token'])
|
||||||
|
|
||||||
def test_neutron_context_to_dict_with_name(self):
|
def test_neutron_context_to_dict(self):
|
||||||
ctx = context.Context('user_id', 'tenant_id',
|
self._test_neutron_context_to_dict('project_id', None)
|
||||||
tenant_name='tenant_name',
|
|
||||||
|
def test_neutron_context_to_dict_tenant(self):
|
||||||
|
self._test_neutron_context_to_dict(None, 'tenant_id')
|
||||||
|
|
||||||
|
def _test_neutron_context_to_dict_with_name(
|
||||||
|
self, project_name, tenant_name):
|
||||||
|
project_name = project_name or tenant_name
|
||||||
|
ctx = context.Context('user_id', 'project_id',
|
||||||
|
project_name=project_name,
|
||||||
|
tenant_name=tenant_name,
|
||||||
user_name='user_name')
|
user_name='user_name')
|
||||||
ctx_dict = ctx.to_dict()
|
ctx_dict = ctx.to_dict()
|
||||||
self.assertEqual('user_name', ctx_dict['user_name'])
|
self.assertEqual('user_name', ctx_dict['user_name'])
|
||||||
self.assertEqual('tenant_name', ctx_dict['tenant_name'])
|
self.assertEqual(project_name, ctx_dict['tenant_name'])
|
||||||
self.assertEqual('tenant_name', ctx_dict['project_name'])
|
self.assertEqual(project_name, ctx_dict['project_name'])
|
||||||
|
|
||||||
|
def test_neutron_context_to_dict_with_name(self):
|
||||||
|
self._test_neutron_context_to_dict_with_name('project_name', None)
|
||||||
|
|
||||||
|
def test_neutron_context_to_dict_with_name_tenant(self):
|
||||||
|
self._test_neutron_context_to_dict_with_name(None, 'tenant_name')
|
||||||
|
|
||||||
def test_neutron_context_to_dict_with_auth_token(self):
|
def test_neutron_context_to_dict_with_auth_token(self):
|
||||||
ctx = context.Context('user_id', 'tenant_id',
|
ctx = context.Context('user_id', 'project_id',
|
||||||
auth_token='auth_token_xxx')
|
auth_token='auth_token_xxx')
|
||||||
ctx_dict = ctx.to_dict()
|
ctx_dict = ctx.to_dict()
|
||||||
self.assertEqual('auth_token_xxx', ctx_dict['auth_token'])
|
self.assertEqual('auth_token_xxx', ctx_dict['auth_token'])
|
||||||
@@ -145,6 +219,7 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
ctx = context.get_admin_context()
|
ctx = context.get_admin_context()
|
||||||
ctx_dict = ctx.to_dict()
|
ctx_dict = ctx.to_dict()
|
||||||
self.assertIsNone(ctx_dict['user_id'])
|
self.assertIsNone(ctx_dict['user_id'])
|
||||||
|
self.assertIsNone(ctx_dict['project_id'])
|
||||||
self.assertIsNone(ctx_dict['tenant_id'])
|
self.assertIsNone(ctx_dict['tenant_id'])
|
||||||
self.assertIsNone(ctx_dict['auth_token'])
|
self.assertIsNone(ctx_dict['auth_token'])
|
||||||
self.assertTrue(ctx_dict['is_admin'])
|
self.assertTrue(ctx_dict['is_admin'])
|
||||||
@@ -156,6 +231,7 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
ctx = context.get_admin_context_without_session()
|
ctx = context.get_admin_context_without_session()
|
||||||
ctx_dict = ctx.to_dict()
|
ctx_dict = ctx.to_dict()
|
||||||
self.assertIsNone(ctx_dict['user_id'])
|
self.assertIsNone(ctx_dict['user_id'])
|
||||||
|
self.assertIsNone(ctx_dict['project_id'])
|
||||||
self.assertIsNone(ctx_dict['tenant_id'])
|
self.assertIsNone(ctx_dict['tenant_id'])
|
||||||
self.assertIsNone(ctx_dict['auth_token'])
|
self.assertIsNone(ctx_dict['auth_token'])
|
||||||
self.assertIn('admin', ctx_dict['roles'])
|
self.assertIn('admin', ctx_dict['roles'])
|
||||||
@@ -163,7 +239,7 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
|
|
||||||
def test_neutron_context_elevated_retains_request_id(self):
|
def test_neutron_context_elevated_retains_request_id(self):
|
||||||
expected_roles = ['admin', 'member', 'reader']
|
expected_roles = ['admin', 'member', 'reader']
|
||||||
ctx = context.Context('user_id', 'tenant_id')
|
ctx = context.Context('user_id', 'project_id')
|
||||||
self.assertFalse(ctx.is_admin)
|
self.assertFalse(ctx.is_admin)
|
||||||
req_id_before = ctx.request_id
|
req_id_before = ctx.request_id
|
||||||
|
|
||||||
@@ -174,7 +250,7 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self.assertIn(expected_role, elevated_ctx.roles)
|
self.assertIn(expected_role, elevated_ctx.roles)
|
||||||
|
|
||||||
def test_neutron_context_elevated_idempotent(self):
|
def test_neutron_context_elevated_idempotent(self):
|
||||||
ctx = context.Context('user_id', 'tenant_id')
|
ctx = context.Context('user_id', 'project_id')
|
||||||
expected_roles = ['admin', 'member', 'reader']
|
expected_roles = ['admin', 'member', 'reader']
|
||||||
self.assertFalse(ctx.is_admin)
|
self.assertFalse(ctx.is_admin)
|
||||||
elevated_ctx = ctx.elevated()
|
elevated_ctx = ctx.elevated()
|
||||||
@@ -189,7 +265,7 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
def test_neutron_context_elevated_system_scope_for_new_policies(self):
|
def test_neutron_context_elevated_system_scope_for_new_policies(self):
|
||||||
cfg.CONF.set_override('enforce_scope', True, group='oslo_policy')
|
cfg.CONF.set_override('enforce_scope', True, group='oslo_policy')
|
||||||
expected_roles = ['admin', 'member', 'reader']
|
expected_roles = ['admin', 'member', 'reader']
|
||||||
ctx = context.Context('user_id', 'tenant_id')
|
ctx = context.Context('user_id', 'project_id')
|
||||||
self.assertFalse(ctx.is_admin)
|
self.assertFalse(ctx.is_admin)
|
||||||
self.assertNotEqual('all', ctx.system_scope)
|
self.assertNotEqual('all', ctx.system_scope)
|
||||||
elevated_ctx = ctx.elevated()
|
elevated_ctx = ctx.elevated()
|
||||||
@@ -202,7 +278,7 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
def test_neutron_context_elevated_keeps_custom_roles(self):
|
def test_neutron_context_elevated_keeps_custom_roles(self):
|
||||||
expected_admin_roles = ['admin', 'member', 'reader']
|
expected_admin_roles = ['admin', 'member', 'reader']
|
||||||
custom_roles = ['custom_role']
|
custom_roles = ['custom_role']
|
||||||
ctx = context.Context('user_id', 'tenant_id', roles=custom_roles)
|
ctx = context.Context('user_id', 'project_id', roles=custom_roles)
|
||||||
self.assertFalse(ctx.is_admin)
|
self.assertFalse(ctx.is_admin)
|
||||||
self.assertNotEqual('all', ctx.system_scope)
|
self.assertNotEqual('all', ctx.system_scope)
|
||||||
for expected_admin_role in expected_admin_roles:
|
for expected_admin_role in expected_admin_roles:
|
||||||
@@ -218,24 +294,24 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self.assertIn(custom_role, ctx.roles)
|
self.assertIn(custom_role, ctx.roles)
|
||||||
|
|
||||||
def test_neutron_context_overwrite(self):
|
def test_neutron_context_overwrite(self):
|
||||||
ctx1 = context.Context('user_id', 'tenant_id')
|
ctx1 = context.Context('user_id', 'project_id')
|
||||||
self.assertEqual(ctx1.request_id,
|
self.assertEqual(ctx1.request_id,
|
||||||
oslo_context.get_current().request_id)
|
oslo_context.get_current().request_id)
|
||||||
|
|
||||||
# If overwrite is not specified, request_id should be updated.
|
# If overwrite is not specified, request_id should be updated.
|
||||||
ctx2 = context.Context('user_id', 'tenant_id')
|
ctx2 = context.Context('user_id', 'project_id')
|
||||||
self.assertNotEqual(ctx2.request_id, ctx1.request_id)
|
self.assertNotEqual(ctx2.request_id, ctx1.request_id)
|
||||||
self.assertEqual(ctx2.request_id,
|
self.assertEqual(ctx2.request_id,
|
||||||
oslo_context.get_current().request_id)
|
oslo_context.get_current().request_id)
|
||||||
|
|
||||||
# If overwrite is specified, request_id should be kept.
|
# If overwrite is specified, request_id should be kept.
|
||||||
ctx3 = context.Context('user_id', 'tenant_id', overwrite=False)
|
ctx3 = context.Context('user_id', 'project_id', overwrite=False)
|
||||||
self.assertNotEqual(ctx3.request_id, ctx2.request_id)
|
self.assertNotEqual(ctx3.request_id, ctx2.request_id)
|
||||||
self.assertEqual(ctx2.request_id,
|
self.assertEqual(ctx2.request_id,
|
||||||
oslo_context.get_current().request_id)
|
oslo_context.get_current().request_id)
|
||||||
|
|
||||||
def test_neutron_context_get_admin_context_not_update_local_store(self):
|
def test_neutron_context_get_admin_context_not_update_local_store(self):
|
||||||
ctx = context.Context('user_id', 'tenant_id')
|
ctx = context.Context('user_id', 'project_id')
|
||||||
req_id_before = oslo_context.get_current().request_id
|
req_id_before = oslo_context.get_current().request_id
|
||||||
self.assertEqual(ctx.request_id, req_id_before)
|
self.assertEqual(ctx.request_id, req_id_before)
|
||||||
|
|
||||||
@@ -246,9 +322,9 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
def test_to_policy_values(self):
|
def test_to_policy_values(self):
|
||||||
values = {
|
values = {
|
||||||
'user_id': 'user_id',
|
'user_id': 'user_id',
|
||||||
'tenant_id': 'tenant_id',
|
'project_id': 'project_id',
|
||||||
'is_admin': 'is_admin',
|
'is_admin': 'is_admin',
|
||||||
'tenant_name': 'tenant_name',
|
'project_name': 'project_name',
|
||||||
'user_name': 'user_name',
|
'user_name': 'user_name',
|
||||||
'domain_id': 'domain',
|
'domain_id': 'domain',
|
||||||
'user_domain_id': 'user_domain',
|
'user_domain_id': 'user_domain',
|
||||||
@@ -256,9 +332,9 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
}
|
}
|
||||||
additional_values = {
|
additional_values = {
|
||||||
'user': 'user_id',
|
'user': 'user_id',
|
||||||
'tenant': 'tenant_id',
|
'tenant': 'project_id',
|
||||||
'project_id': 'tenant_id',
|
'tenant_id': 'project_id',
|
||||||
'project_name': 'tenant_name',
|
'tenant_name': 'project_name',
|
||||||
}
|
}
|
||||||
ctx = context.Context(**values)
|
ctx = context.Context(**values)
|
||||||
# apply dict() to get a real dictionary, needed for newer oslo.context
|
# apply dict() to get a real dictionary, needed for newer oslo.context
|
||||||
@@ -268,13 +344,13 @@ class TestNeutronContext(_base.BaseTestCase):
|
|||||||
self.assertDictSupersetOf(additional_values, policy_values)
|
self.assertDictSupersetOf(additional_values, policy_values)
|
||||||
|
|
||||||
def test_session_cached(self):
|
def test_session_cached(self):
|
||||||
ctx = context.Context('user_id', 'tenant_id')
|
ctx = context.Context('user_id', 'project_id')
|
||||||
session1 = ctx.session
|
session1 = ctx.session
|
||||||
session2 = ctx.session
|
session2 = ctx.session
|
||||||
self.assertIs(session1, session2)
|
self.assertIs(session1, session2)
|
||||||
|
|
||||||
def test_add_get_remove_constraint(self):
|
def test_add_get_remove_constraint(self):
|
||||||
ctx = context.Context('user_id', 'tenant_id')
|
ctx = context.Context('user_id', 'project_id')
|
||||||
self.assertIsNone(ctx.get_transaction_constraint())
|
self.assertIsNone(ctx.get_transaction_constraint())
|
||||||
ctx.set_transaction_constraint('networks', 'net_id', 44)
|
ctx.set_transaction_constraint('networks', 'net_id', 44)
|
||||||
constraint = ctx.get_transaction_constraint()
|
constraint = ctx.get_transaction_constraint()
|
||||||
|
Reference in New Issue
Block a user