identity: Add tokens

This is mostly handled by keystoneauth, but there are a few non-auth
APIs that we can currently only handle with keystoneclient. Close that
gap.

Change-Id: Iff8bbcf982b817098b42e69201f125202b41fb04
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane
2025-09-05 12:15:59 +01:00
parent 68ff3f7e07
commit a0da527edc
6 changed files with 403 additions and 0 deletions

View File

@@ -86,6 +86,13 @@ User Operations
:members: create_user, update_user, delete_user, get_user, find_user, users,
user_groups
Token Operations
^^^^^^^^^^^^^^^^
.. autoclass:: openstack.identity.v3._proxy.Proxy
:noindex:
:members: validate_token, check_token, revoke_token
Trust Operations
^^^^^^^^^^^^^^^^

View File

@@ -0,0 +1,12 @@
openstack.identity.v3.token
===========================
.. automodule:: openstack.identity.v3.token
The Token Class
---------------
The ``Token`` class inherits from :class:`~openstack.resource.Resource`.
.. autoclass:: openstack.identity.v3.token.Token
:members:

View File

@@ -53,6 +53,7 @@ from openstack.identity.v3 import (
from openstack.identity.v3 import service as _service
from openstack.identity.v3 import service_provider as _service_provider
from openstack.identity.v3 import system as _system
from openstack.identity.v3 import token as _token
from openstack.identity.v3 import trust as _trust
from openstack.identity.v3 import user as _user
from openstack import proxy
@@ -87,6 +88,7 @@ class Proxy(proxy.Proxy):
"service": _service.Service,
"system": _system.System,
"trust": _trust.Trust,
"token": _token.Token,
"user": _user.User,
}
@@ -976,6 +978,43 @@ class Proxy(proxy.Proxy):
"""
return self._update(_user.User, user, **attrs)
# ========== Tokens ==========
def validate_token(
self, token: str, nocatalog: bool = False, allow_expired: bool = False
) -> _token.Token:
"""Validate a token
:param token: The token to validate.
:param nocatalog: Whether the returned token should not include a
catalog.
:param allow_expired: Whether to allow expired tokens.
:returns: A :class:`~openstack.identity.v3.token.Token`.
"""
return _token.Token.validate(
self, token, nocatalog=nocatalog, allow_expired=allow_expired
)
def check_token(self, token: str, allow_expired: bool = False) -> bool:
"""Check if a token is valid.
:param token: The token to check.
:param allow_expired: Whether to allow expired tokens.
:returns: True if valid, else False.
"""
return _token.Token.check(self, token, allow_expired=allow_expired)
def revoke_token(self, token: str) -> None:
"""Revoke a token.
:param token: The token to revoke.
:returns: None
"""
_token.Token.revoke(self, token)
# ========== Trusts ==========
def create_trust(self, **attrs):

View File

@@ -0,0 +1,115 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystoneauth1 import adapter
from openstack import exceptions
from openstack import resource
class Token(resource.Resource):
resource_key = 'token'
base_path = '/auth/tokens'
# capabilities
allow_fetch = False
allow_delete = False
allow_list = False
allow_head = False
# Properties
#: An authentication token. This is used rather than X-Auth-Token to allow
#: users check or revoke a token other than their own.
subject_token = resource.Header('x-subject-token')
#: A list of one or two audit IDs. An audit ID is a unique, randomly
#: generated, URL-safe string that you can use to track a token. The first
#: audit ID is the current audit ID for the token. The second audit ID is
#: present for only re-scoped tokens and is the audit ID from the token
#: before it was re-scoped. A re- scoped token is one that was exchanged
#: for another token of the same or different scope. You can use these
#: audit IDs to track the use of a token or chain of tokens across multiple
#: requests and endpoints without exposing the token ID to non-privileged
#: users.
audit_ids = resource.Body('audit_ids', type=list)
#: The service catalog.
catalog = resource.Body('catalog', type=list, list_type=dict)
#: The date and time when the token expires.
expires_at = resource.Body('expires_at')
#: The date and time when the token was issued.
issued_at = resource.Body('issued_at')
#: The authentication method.
methods = resource.Body('methods', type=list)
#: The user that owns the token.
user = resource.Body('user', type=dict)
#: The project that the token is scoped to, if any.
project = resource.Body('project', type=dict)
#: The domain that the token is scoped to, if any.
domain = resource.Body('domain', type=dict)
#: Whether the project, if set, is acting as a domain.
is_domain = resource.Body('is_domain', type=bool)
#: The parts of the system the token is scoped to, if system-scoped.
system = resource.Body('system', type=dict)
#: The roles associated with the user.
roles = resource.Body('roles', type=list, list_type=dict)
@classmethod
def validate(
cls,
session: adapter.Adapter,
token: str,
*,
nocatalog: bool = False,
allow_expired: bool = False,
) -> 'Token':
path = cls.base_path
params: dict[str, bool] = {}
if nocatalog:
params['nocatalog'] = nocatalog
if allow_expired:
params['allow_expired'] = allow_expired
response = session.get(
path, headers={'x-subject-token': token}, params=params
)
exceptions.raise_from_response(response)
ret = cls()
ret._translate_response(
response, resource_response_key=cls.resource_key
)
return ret
@classmethod
def check(
cls,
session: adapter.Adapter,
token: str,
*,
allow_expired: bool = False,
) -> bool:
params: dict[str, bool] = {}
if allow_expired:
params['allow_expired'] = allow_expired
response = session.head(
cls.base_path, headers={'x-subject-token': token}, params=params
)
return response.status_code == 200
@classmethod
def revoke(cls, session: adapter.Adapter, token: str) -> None:
response = session.delete(
cls.base_path, headers={'x-subject-token': token}
)
exceptions.raise_from_response(response)

View File

@@ -394,6 +394,38 @@ class TestIdentityProxyUser(TestIdentityProxyBase):
)
class TestIdentityProxyToken(TestIdentityProxyBase):
def test_token_validate(self):
self._verify(
"openstack.identity.v3.token.Token.validate",
self.proxy.validate_token,
method_args=['token'],
method_kwargs={'nocatalog': False, 'allow_expired': False},
expected_args=[self.proxy, 'token'],
expected_kwargs={'nocatalog': False, 'allow_expired': False},
)
def test_token_check(self):
self._verify(
"openstack.identity.v3.token.Token.check",
self.proxy.check_token,
method_args=['token'],
method_kwargs={'allow_expired': False},
expected_args=[self.proxy, 'token'],
expected_kwargs={'allow_expired': False},
)
def test_token_revoke(self):
self._verify(
"openstack.identity.v3.token.Token.revoke",
self.proxy.revoke_token,
method_args=['token'],
method_kwargs={},
expected_args=[self.proxy, 'token'],
expected_kwargs={},
)
class TestIdentityProxyTrust(TestIdentityProxyBase):
def test_trust_create_attrs(self):
self.verify_create(self.proxy.create_trust, trust.Trust)

View File

@@ -0,0 +1,198 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from keystoneauth1 import adapter
from openstack import exceptions
from openstack.identity.v3 import token
from openstack.tests.unit import base
IDENTIFIER = 'IDENTIFIER'
TOKEN_DATA = {
'audit_ids': ['VcxU2JEMTjufVx7sVk7bPw'],
'catalog': [
{
'endpoints': [
{
'id': '068d1b359ee84b438266cb736d81de97',
'interface': 'public',
'region': 'RegionOne',
'region_id': 'RegionOne',
'url': 'http://example.com/v2.1',
}
],
'id': '050726f278654128aba89757ae25950c',
'name': 'nova',
'type': 'compute',
}
],
'domain': {'id': 'default', 'name': 'Default'},
'expires_at': '2013-02-27T18:30:59.999999Z',
'issued_at': '2013-02-27T16:30:59.999999Z',
'methods': ['password'],
'project': {
'domain': {'id': 'default', 'name': 'Default'},
'id': '8538a3f13f9541b28c2620eb19065e45',
'name': 'admin',
},
'roles': [{'id': 'c703057be878458588961ce9a0ce686b', 'name': 'admin'}],
'system': {'all': True},
'user': {
'domain': {'id': 'default', 'name': 'Default'},
'id': '10a2e6e717a245d9acad3e5f97aeca3d',
'name': 'admin',
'password_expires_at': None,
},
'is_domain': False,
}
EXAMPLE = {'token': TOKEN_DATA}
class TestToken(base.TestCase):
def setUp(self):
super().setUp()
self.session = mock.Mock(spec=adapter.Adapter)
def test_basic(self):
sot = token.Token()
self.assertEqual('token', sot.resource_key)
self.assertEqual('/auth/tokens', sot.base_path)
self.assertFalse(sot.allow_fetch)
self.assertFalse(sot.allow_delete)
self.assertFalse(sot.allow_list)
self.assertFalse(sot.allow_head)
def test_make_it(self):
sot = token.Token(**TOKEN_DATA)
self.assertEqual(TOKEN_DATA['audit_ids'], sot.audit_ids)
self.assertEqual(TOKEN_DATA['catalog'], sot.catalog)
self.assertEqual(TOKEN_DATA['expires_at'], sot.expires_at)
self.assertEqual(TOKEN_DATA['issued_at'], sot.issued_at)
self.assertEqual(TOKEN_DATA['methods'], sot.methods)
self.assertEqual(TOKEN_DATA['user'], sot.user)
self.assertEqual(TOKEN_DATA['project'], sot.project)
self.assertEqual(TOKEN_DATA['domain'], sot.domain)
self.assertEqual(TOKEN_DATA['is_domain'], sot.is_domain)
self.assertEqual(TOKEN_DATA['system'], sot.system)
self.assertEqual(TOKEN_DATA['roles'], sot.roles)
def test_validate(self):
response = mock.Mock()
response.status_code = 200
response.json.return_value = EXAMPLE
response.headers = {'content-type': 'application/json'}
self.session.get.return_value = response
result = token.Token.validate(self.session, 'token')
self.session.get.assert_called_once_with(
'/auth/tokens', headers={'x-subject-token': 'token'}, params={}
)
self.assertIsInstance(result, token.Token)
def test_validate_with_params(self):
response = mock.Mock()
response.status_code = 200
response.json.return_value = EXAMPLE
response.headers = {'content-type': 'application/json'}
self.session.get.return_value = response
result = token.Token.validate(
self.session, 'token', nocatalog=True, allow_expired=True
)
self.session.get.assert_called_once_with(
'/auth/tokens',
headers={'x-subject-token': 'token'},
params={'nocatalog': True, 'allow_expired': True},
)
self.assertIsInstance(result, token.Token)
def test_validate_error(self):
response = mock.Mock()
response.status_code = 404
response.json.return_value = {}
response.headers = {'content-type': 'application/json'}
self.session.get.return_value = response
self.assertRaises(
exceptions.NotFoundException,
token.Token.validate,
self.session,
'token',
)
def test_check(self):
response = mock.Mock()
response.status_code = 200
self.session.head.return_value = response
result = token.Token.check(self.session, 'token')
self.session.head.assert_called_once_with(
'/auth/tokens', headers={'x-subject-token': 'token'}, params={}
)
self.assertTrue(result)
def test_check_with_param(self):
response = mock.Mock()
response.status_code = 200
self.session.head.return_value = response
result = token.Token.check(self.session, 'token', allow_expired=True)
self.session.head.assert_called_once_with(
'/auth/tokens',
headers={'x-subject-token': 'token'},
params={'allow_expired': True},
)
self.assertTrue(result)
def test_check_invalid_token(self):
response = mock.Mock()
response.status_code = 404
self.session.head.return_value = response
result = token.Token.check(self.session, 'token')
self.session.head.assert_called_once_with(
'/auth/tokens', headers={'x-subject-token': 'token'}, params={}
)
self.assertFalse(result)
def test_revoke(self):
response = mock.Mock()
response.status_code = 204
self.session.delete.return_value = response
token.Token.revoke(self.session, 'token')
self.session.delete.assert_called_once_with(
'/auth/tokens', headers={'x-subject-token': 'token'}
)
def test_revoke_error(self):
response = mock.Mock()
response.status_code = 404
response.json.return_value = {}
response.headers = {'content-type': 'application/json'}
self.session.delete.return_value = response
self.assertRaises(
exceptions.NotFoundException,
token.Token.revoke,
self.session,
'token',
)