diff --git a/doc/source/policies.rst b/doc/source/policies.rst index 3aacdb4e9a..626a2d5a32 100644 --- a/doc/source/policies.rst +++ b/doc/source/policies.rst @@ -116,6 +116,57 @@ To limit an action to a particular role or roles, you list the roles like so :: The above would add a rule that only allowed users that had roles of either "admin" or "superuser" to delete an image. +Writing Rules +------------- + +Role checks are going to continue to work exactly as they already do. If the +role defined in the check is one that the user holds, then that will pass, +e.g., ``role:admin``. + +To write a generic rule, you need to know that there are three values provided +by Glance that can be used in a rule on the left side of the colon (``:``). +Those values are the current user's credentials in the form of: + +- role +- tenant +- owner + +The left side of the colon can also contain any value that Python can +understand, e.g.,: + +- ``True`` +- ``False`` +- ``"a string"`` +- &c. + +Using ``tenant`` and ``owner`` will only work with images. Consider the +following rule:: + + tenant:%(owner)s + +This will use the ``tenant`` value of the currently authenticated user. It +will also use ``owner`` from the image it is acting upon. If those two +values are equivalent the check will pass. All attributes on an image (as well +as extra image properties) are available for use on the right side of the +colon. The most useful are the following: + +- ``owner`` +- ``protected`` +- ``is_public`` + +Therefore, you could construct a set of rules like the following:: + + { + "not_protected": "False:%(protected)s", + "is_owner": "tenant:%(owner)s", + "is_owner_or_admin": "rule:is_owner or role:admin", + "not_protected_and_is_owner": "rule:not_protected and rule:is_owner", + + "get_image": "rule:is_owner_or_admin", + "delete_image": "rule:not_protected_and_is_owner", + "add_member": "rule:not_protected_and_is_owner" + } + Examples -------- @@ -124,7 +175,7 @@ Example 1. (The default policy configuration) :: { - "default": [] + "default": "" } Note that an empty JSON list means that all methods of the @@ -135,8 +186,8 @@ Example 2. Disallow modification calls to non-admins :: { - "default": [], - "add_image": ["role:admin"], - "modify_image": ["role:admin"], - "delete_image": ["role:admin"] + "default": "", + "add_image": "role:admin", + "modify_image": "role:admin", + "delete_image": "role:admin" } diff --git a/glance/api/policy.py b/glance/api/policy.py index 308fd555fd..cd2385b945 100755 --- a/glance/api/policy.py +++ b/glance/api/policy.py @@ -111,19 +111,25 @@ class ImageRepoProxy(glance.domain.proxy.Repo): item_proxy_kwargs=proxy_kwargs) def get(self, image_id): - self.policy.enforce(self.context, 'get_image', {}) - return super(ImageRepoProxy, self).get(image_id) + try: + image = super(ImageRepoProxy, self).get(image_id) + except exception.NotFound: + self.policy.enforce(self.context, 'get_image', {}) + raise + else: + self.policy.enforce(self.context, 'get_image', ImageTarget(image)) + return image def list(self, *args, **kwargs): self.policy.enforce(self.context, 'get_images', {}) return super(ImageRepoProxy, self).list(*args, **kwargs) def save(self, image, from_state=None): - self.policy.enforce(self.context, 'modify_image', {}) + self.policy.enforce(self.context, 'modify_image', image.target) return super(ImageRepoProxy, self).save(image, from_state=from_state) def add(self, image): - self.policy.enforce(self.context, 'add_image', {}) + self.policy.enforce(self.context, 'add_image', image.target) return super(ImageRepoProxy, self).add(image) @@ -131,6 +137,7 @@ class ImageProxy(glance.domain.proxy.Image): def __init__(self, image, context, policy): self.image = image + self.target = ImageTarget(image) self.context = context self.policy = policy super(ImageProxy, self).__init__(image) @@ -142,7 +149,7 @@ class ImageProxy(glance.domain.proxy.Image): @visibility.setter def visibility(self, value): if value == 'public': - self.policy.enforce(self.context, 'publicize_image', {}) + self.policy.enforce(self.context, 'publicize_image', self.target) self.image.visibility = value @property @@ -154,25 +161,24 @@ class ImageProxy(glance.domain.proxy.Image): def locations(self, value): if not isinstance(value, (list, ImageLocationsProxy)): raise exception.Invalid(_('Invalid locations: %s') % value) - self.policy.enforce(self.context, 'set_image_location', {}) + self.policy.enforce(self.context, 'set_image_location', self.target) new_locations = list(value) if (set([loc['url'] for loc in self.image.locations]) - set([loc['url'] for loc in new_locations])): - self.policy.enforce(self.context, 'delete_image_location', {}) + self.policy.enforce(self.context, 'delete_image_location', + self.target) self.image.locations = new_locations def delete(self): - self.policy.enforce(self.context, 'delete_image', {}) + self.policy.enforce(self.context, 'delete_image', self.target) return self.image.delete() def get_data(self, *args, **kwargs): - target = ImageTarget(self.image) - self.policy.enforce(self.context, 'download_image', - target=target) + self.policy.enforce(self.context, 'download_image', self.target) return self.image.get_data(*args, **kwargs) def set_data(self, *args, **kwargs): - self.policy.enforce(self.context, 'upload_image', {}) + self.policy.enforce(self.context, 'upload_image', self.target) return self.image.set_data(*args, **kwargs) def get_member_repo(self, **kwargs): @@ -210,27 +216,28 @@ class ImageMemberRepoProxy(glance.domain.proxy.Repo): def __init__(self, member_repo, context, policy): self.member_repo = member_repo + self.target = ImageTarget(self.member_repo.image) self.context = context self.policy = policy def add(self, member): - self.policy.enforce(self.context, 'add_member', {}) + self.policy.enforce(self.context, 'add_member', self.target) self.member_repo.add(member) def get(self, member_id): - self.policy.enforce(self.context, 'get_member', {}) + self.policy.enforce(self.context, 'get_member', self.target) return self.member_repo.get(member_id) def save(self, member, from_state=None): - self.policy.enforce(self.context, 'modify_member', {}) + self.policy.enforce(self.context, 'modify_member', self.target) self.member_repo.save(member, from_state=from_state) def list(self, *args, **kwargs): - self.policy.enforce(self.context, 'get_members', {}) + self.policy.enforce(self.context, 'get_members', self.target) return self.member_repo.list(*args, **kwargs) def remove(self, member): - self.policy.enforce(self.context, 'delete_member', {}) + self.policy.enforce(self.context, 'delete_member', self.target) self.member_repo.remove(member) @@ -356,31 +363,38 @@ class TaskFactoryProxy(glance.domain.proxy.TaskFactory): class ImageTarget(object): + SENTINEL = object() - def __init__(self, image): - """ - Initialize the object + def __init__(self, target): + """Initialize the object - :param image: Image object + :param target: Object being targetted """ - self.image = image + self.target = target def __getitem__(self, key): - """ - Returns the value of 'key' from the image if image has that attribute - else tries to retrieve value from the extra_properties of image. + """Return the value of 'key' from the target. + + If the target has the attribute 'key', return it. :param key: value to retrieve """ - # Need to change the key 'id' to 'image_id' as Image object has - # attribute as 'image_id' in case of V2. + key = self.key_transforms(key) + + value = getattr(self.target, key, self.SENTINEL) + if value is self.SENTINEL: + extra_properties = getattr(self.target, 'extra_properties', None) + if extra_properties is not None: + value = extra_properties[key] + else: + value = None + return value + + def key_transforms(self, key): if key == 'id': key = 'image_id' - if hasattr(self.image, key): - return getattr(self.image, key) - else: - return self.image.extra_properties[key] + return key # Metadef Namespace classes diff --git a/glance/tests/functional/v2/test_images.py b/glance/tests/functional/v2/test_images.py index 8777857578..b2bd28729a 100644 --- a/glance/tests/functional/v2/test_images.py +++ b/glance/tests/functional/v2/test_images.py @@ -777,6 +777,290 @@ class TestImages(functional.FunctionalTest): self.stop_servers() + def test_image_modification_works_for_owning_tenant_id(self): + rules = { + "context_is_admin": "role:admin", + "default": "", + "add_image": "", + "get_image": "", + "modify_image": "tenant:%(owner)s", + "upload_image": "", + "get_image_location": "", + "delete_image": "", + "restricted": + "not ('aki':%(container_format)s and role:_member_)", + "download_image": "role:admin or rule:restricted" + } + + self.set_policy_rules(rules) + self.start_servers(**self.__dict__.copy()) + + path = self._url('/v2/images') + headers = self._headers({'content-type': 'application/json', + 'X-Roles': 'admin'}) + data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki', + 'container_format': 'aki'}) + response = requests.post(path, headers=headers, data=data) + self.assertEqual(201, response.status_code) + + # Get the image's ID + image = jsonutils.loads(response.text) + image_id = image['id'] + + path = self._url('/v2/images/%s' % image_id) + media_type = 'application/openstack-images-v2.1-json-patch' + headers['content-type'] = media_type + del headers['X-Roles'] + data = jsonutils.dumps([ + {'op': 'replace', 'path': '/name', 'value': 'new-name'}, + ]) + response = requests.patch(path, headers=headers, data=data) + self.assertEqual(200, response.status_code) + + self.stop_servers() + + def test_image_modification_fails_on_mismatched_tenant_ids(self): + rules = { + "context_is_admin": "role:admin", + "default": "", + "add_image": "", + "get_image": "", + "modify_image": "'A-Fake-Tenant-Id':%(owner)s", + "upload_image": "", + "get_image_location": "", + "delete_image": "", + "restricted": + "not ('aki':%(container_format)s and role:_member_)", + "download_image": "role:admin or rule:restricted" + } + + self.set_policy_rules(rules) + self.start_servers(**self.__dict__.copy()) + + path = self._url('/v2/images') + headers = self._headers({'content-type': 'application/json', + 'X-Roles': 'admin'}) + data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki', + 'container_format': 'aki'}) + response = requests.post(path, headers=headers, data=data) + self.assertEqual(201, response.status_code) + + # Get the image's ID + image = jsonutils.loads(response.text) + image_id = image['id'] + + path = self._url('/v2/images/%s' % image_id) + media_type = 'application/openstack-images-v2.1-json-patch' + headers['content-type'] = media_type + del headers['X-Roles'] + data = jsonutils.dumps([ + {'op': 'replace', 'path': '/name', 'value': 'new-name'}, + ]) + response = requests.patch(path, headers=headers, data=data) + self.assertEqual(403, response.status_code) + + self.stop_servers() + + def test_member_additions_works_for_owning_tenant_id(self): + rules = { + "context_is_admin": "role:admin", + "default": "", + "add_image": "", + "get_image": "", + "modify_image": "", + "upload_image": "", + "get_image_location": "", + "delete_image": "", + "restricted": + "not ('aki':%(container_format)s and role:_member_)", + "download_image": "role:admin or rule:restricted", + "add_member": "tenant:%(owner)s", + } + + self.set_policy_rules(rules) + self.start_servers(**self.__dict__.copy()) + + path = self._url('/v2/images') + headers = self._headers({'content-type': 'application/json', + 'X-Roles': 'admin'}) + data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki', + 'container_format': 'aki'}) + response = requests.post(path, headers=headers, data=data) + self.assertEqual(201, response.status_code) + + # Get the image's ID + image = jsonutils.loads(response.text) + image_id = image['id'] + + # Get the image's members resource + path = self._url('/v2/images/%s/members' % image_id) + body = jsonutils.dumps({'member': TENANT3}) + del headers['X-Roles'] + response = requests.post(path, headers=headers, data=body) + self.assertEqual(200, response.status_code) + + self.stop_servers() + + def test_image_additions_works_only_for_specific_tenant_id(self): + rules = { + "context_is_admin": "role:admin", + "default": "", + "add_image": "'{0}':%(owner)s".format(TENANT1), + "get_image": "", + "modify_image": "", + "upload_image": "", + "get_image_location": "", + "delete_image": "", + "restricted": + "not ('aki':%(container_format)s and role:_member_)", + "download_image": "role:admin or rule:restricted", + "add_member": "", + } + + self.set_policy_rules(rules) + self.start_servers(**self.__dict__.copy()) + + path = self._url('/v2/images') + headers = self._headers({'content-type': 'application/json', + 'X-Roles': 'admin', 'X-Tenant-Id': TENANT1}) + data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki', + 'container_format': 'aki'}) + response = requests.post(path, headers=headers, data=data) + self.assertEqual(201, response.status_code) + + headers['X-Tenant-Id'] = TENANT2 + response = requests.post(path, headers=headers, data=data) + self.assertEqual(403, response.status_code) + + self.stop_servers() + + def test_owning_tenant_id_can_retrieve_image_information(self): + rules = { + "context_is_admin": "role:admin", + "default": "", + "add_image": "", + "get_image": "tenant:%(owner)s", + "modify_image": "", + "upload_image": "", + "get_image_location": "", + "delete_image": "", + "restricted": + "not ('aki':%(container_format)s and role:_member_)", + "download_image": "role:admin or rule:restricted", + "add_member": "", + } + + self.set_policy_rules(rules) + self.start_servers(**self.__dict__.copy()) + + path = self._url('/v2/images') + headers = self._headers({'content-type': 'application/json', + 'X-Roles': 'admin', 'X-Tenant-Id': TENANT1}) + data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki', + 'container_format': 'aki'}) + response = requests.post(path, headers=headers, data=data) + self.assertEqual(201, response.status_code) + + # Remove the admin role + del headers['X-Roles'] + # Get the image's ID + image = jsonutils.loads(response.text) + image_id = image['id'] + + # Can retrieve the image as TENANT1 + path = self._url('/v2/images/%s' % image_id) + response = requests.get(path, headers=headers) + self.assertEqual(200, response.status_code) + + # Can retrieve the image's members as TENANT1 + path = self._url('/v2/images/%s/members' % image_id) + response = requests.get(path, headers=headers) + self.assertEqual(200, response.status_code) + + headers['X-Tenant-Id'] = TENANT2 + response = requests.get(path, headers=headers) + self.assertEqual(403, response.status_code) + + self.stop_servers() + + def test_owning_tenant_can_publicize_image(self): + rules = { + "context_is_admin": "role:admin", + "default": "", + "add_image": "", + "publicize_iamge": "tenant:%(owner)s", + "get_image": "tenant:%(owner)s", + "modify_image": "", + "upload_image": "", + "get_image_location": "", + "delete_image": "", + "restricted": + "not ('aki':%(container_format)s and role:_member_)", + "download_image": "role:admin or rule:restricted", + "add_member": "", + } + + self.set_policy_rules(rules) + self.start_servers(**self.__dict__.copy()) + + path = self._url('/v2/images') + headers = self._headers({'content-type': 'application/json', + 'X-Roles': 'admin', 'X-Tenant-Id': TENANT1}) + data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki', + 'container_format': 'aki'}) + response = requests.post(path, headers=headers, data=data) + self.assertEqual(201, response.status_code) + + # Get the image's ID + image = jsonutils.loads(response.text) + image_id = image['id'] + + path = self._url('/v2/images/%s' % image_id) + headers = self._headers({ + 'Content-Type': 'application/openstack-images-v2.1-json-patch', + 'X-Tenant-Id': TENANT1, + }) + doc = [{'op': 'replace', 'path': '/visibility', 'value': 'public'}] + data = jsonutils.dumps(doc) + response = requests.patch(path, headers=headers, data=data) + self.assertEqual(200, response.status_code) + + def test_owning_tenant_can_delete_image(self): + rules = { + "context_is_admin": "role:admin", + "default": "", + "add_image": "", + "publicize_iamge": "tenant:%(owner)s", + "get_image": "tenant:%(owner)s", + "modify_image": "", + "upload_image": "", + "get_image_location": "", + "delete_image": "", + "restricted": + "not ('aki':%(container_format)s and role:_member_)", + "download_image": "role:admin or rule:restricted", + "add_member": "", + } + + self.set_policy_rules(rules) + self.start_servers(**self.__dict__.copy()) + + path = self._url('/v2/images') + headers = self._headers({'content-type': 'application/json', + 'X-Roles': 'admin', 'X-Tenant-Id': TENANT1}) + data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki', + 'container_format': 'aki'}) + response = requests.post(path, headers=headers, data=data) + self.assertEqual(201, response.status_code) + + # Get the image's ID + image = jsonutils.loads(response.text) + image_id = image['id'] + + path = self._url('/v2/images/%s' % image_id) + response = requests.delete(path, headers=headers) + self.assertEqual(204, response.status_code) + def test_image_size_cap(self): self.api_server.image_size_cap = 128 self.start_servers(**self.__dict__.copy()) diff --git a/glance/tests/unit/test_policy.py b/glance/tests/unit/test_policy.py index 30e140ba40..1bbfee28bc 100644 --- a/glance/tests/unit/test_policy.py +++ b/glance/tests/unit/test_policy.py @@ -72,6 +72,8 @@ class ImageFactoryStub(object): class MemberRepoStub(object): + image = None + def add(self, image_member): image_member.output = 'member_repo_add' @@ -207,41 +209,53 @@ class TestImagePolicy(test_utils.BaseTestCase): self.assertRaises(exception.Forbidden, setattr, image, 'visibility', 'public') self.assertEqual('private', image.visibility) - self.policy.enforce.assert_called_once_with({}, "publicize_image", {}) + self.policy.enforce.assert_called_once_with({}, "publicize_image", + image.target) def test_publicize_image_allowed(self): image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy) image.visibility = 'public' self.assertEqual('public', image.visibility) - self.policy.enforce.assert_called_once_with({}, "publicize_image", {}) + self.policy.enforce.assert_called_once_with({}, "publicize_image", + image.target) def test_delete_image_not_allowed(self): self.policy.enforce.side_effect = exception.Forbidden image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy) self.assertRaises(exception.Forbidden, image.delete) self.assertEqual('active', image.status) - self.policy.enforce.assert_called_once_with({}, "delete_image", {}) + self.policy.enforce.assert_called_once_with({}, "delete_image", + image.target) def test_delete_image_allowed(self): image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy) image.delete() self.assertEqual('deleted', image.status) - self.policy.enforce.assert_called_once_with({}, "delete_image", {}) + self.policy.enforce.assert_called_once_with({}, "delete_image", + image.target) def test_get_image_not_allowed(self): self.policy.enforce.side_effect = exception.Forbidden - image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub, - {}, self.policy) - self.assertRaises(exception.Forbidden, image_repo.get, UUID1) - self.policy.enforce.assert_called_once_with({}, "get_image", {}) + image_target = mock.Mock() + with mock.patch.object(glance.api.policy, 'ImageTarget') as target: + target.return_value = image_target + image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub, + {}, self.policy) + self.assertRaises(exception.Forbidden, image_repo.get, UUID1) + self.policy.enforce.assert_called_once_with({}, "get_image", + image_target) def test_get_image_allowed(self): - image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub, - {}, self.policy) - output = image_repo.get(UUID1) + image_target = mock.Mock() + with mock.patch.object(glance.api.policy, 'ImageTarget') as target: + target.return_value = image_target + image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub, + {}, self.policy) + output = image_repo.get(UUID1) self.assertIsInstance(output, glance.api.policy.ImageProxy) self.assertEqual('image_from_get', output.image) - self.policy.enforce.assert_called_once_with({}, "get_image", {}) + self.policy.enforce.assert_called_once_with({}, "get_image", + image_target) def test_get_images_not_allowed(self): self.policy.enforce.side_effect = exception.Forbidden @@ -265,14 +279,16 @@ class TestImagePolicy(test_utils.BaseTestCase): {}, self.policy) image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy) self.assertRaises(exception.Forbidden, image_repo.save, image) - self.policy.enforce.assert_called_once_with({}, "modify_image", {}) + self.policy.enforce.assert_called_once_with({}, "modify_image", + image.target) def test_modify_image_allowed(self): image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub, {}, self.policy) image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy) image_repo.save(image) - self.policy.enforce.assert_called_once_with({}, "modify_image", {}) + self.policy.enforce.assert_called_once_with({}, "modify_image", + image.target) def test_add_image_not_allowed(self): self.policy.enforce.side_effect = exception.Forbidden @@ -280,14 +296,16 @@ class TestImagePolicy(test_utils.BaseTestCase): {}, self.policy) image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy) self.assertRaises(exception.Forbidden, image_repo.add, image) - self.policy.enforce.assert_called_once_with({}, "add_image", {}) + self.policy.enforce.assert_called_once_with({}, "add_image", + image.target) def test_add_image_allowed(self): image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub, {}, self.policy) image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy) image_repo.add(image) - self.policy.enforce.assert_called_once_with({}, "add_image", {}) + self.policy.enforce.assert_called_once_with({}, "add_image", + image.target) def test_new_image_visibility(self): self.policy.enforce.side_effect = exception.Forbidden @@ -308,20 +326,21 @@ class TestImagePolicy(test_utils.BaseTestCase): 'test_key': 'test_4321' } image_stub = ImageStub(UUID1, extra_properties=extra_properties) - image = glance.api.policy.ImageProxy(image_stub, {}, self.policy) + with mock.patch('glance.api.policy.ImageTarget'): + image = glance.api.policy.ImageProxy(image_stub, {}, self.policy) + target = image.target self.policy.enforce.side_effect = exception.Forbidden - glance.api.policy.ImageTarget = mock.Mock() - target = glance.api.policy.ImageTarget(image) self.assertRaises(exception.Forbidden, image.get_data) self.policy.enforce.assert_called_once_with({}, "download_image", - target=target) + target) def test_image_set_data(self): self.policy.enforce.side_effect = exception.Forbidden image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy) self.assertRaises(exception.Forbidden, image.set_data) - self.policy.enforce.assert_called_once_with({}, "upload_image", {}) + self.policy.enforce.assert_called_once_with({}, "upload_image", + image.target) class TestMemberPolicy(test_utils.BaseTestCase): @@ -330,60 +349,71 @@ class TestMemberPolicy(test_utils.BaseTestCase): self.policy.enforce = mock.Mock() self.member_repo = glance.api.policy.ImageMemberRepoProxy( MemberRepoStub(), {}, self.policy) + self.target = self.member_repo.target super(TestMemberPolicy, self).setUp() def test_add_member_not_allowed(self): self.policy.enforce.side_effect = exception.Forbidden self.assertRaises(exception.Forbidden, self.member_repo.add, '') - self.policy.enforce.assert_called_once_with({}, "add_member", {}) + self.policy.enforce.assert_called_once_with({}, "add_member", + self.target) def test_add_member_allowed(self): image_member = ImageMembershipStub() self.member_repo.add(image_member) self.assertEqual('member_repo_add', image_member.output) - self.policy.enforce.assert_called_once_with({}, "add_member", {}) + self.policy.enforce.assert_called_once_with({}, "add_member", + self.target) def test_get_member_not_allowed(self): self.policy.enforce.side_effect = exception.Forbidden self.assertRaises(exception.Forbidden, self.member_repo.get, '') - self.policy.enforce.assert_called_once_with({}, "get_member", {}) + self.policy.enforce.assert_called_once_with({}, "get_member", + self.target) def test_get_member_allowed(self): output = self.member_repo.get('') self.assertEqual('member_repo_get', output) - self.policy.enforce.assert_called_once_with({}, "get_member", {}) + self.policy.enforce.assert_called_once_with({}, "get_member", + self.target) def test_modify_member_not_allowed(self): self.policy.enforce.side_effect = exception.Forbidden self.assertRaises(exception.Forbidden, self.member_repo.save, '') - self.policy.enforce.assert_called_once_with({}, "modify_member", {}) + self.policy.enforce.assert_called_once_with({}, "modify_member", + self.target) def test_modify_member_allowed(self): image_member = ImageMembershipStub() self.member_repo.save(image_member) self.assertEqual('member_repo_save', image_member.output) - self.policy.enforce.assert_called_once_with({}, "modify_member", {}) + self.policy.enforce.assert_called_once_with({}, "modify_member", + self.target) def test_get_members_not_allowed(self): self.policy.enforce.side_effect = exception.Forbidden self.assertRaises(exception.Forbidden, self.member_repo.list, '') - self.policy.enforce.assert_called_once_with({}, "get_members", {}) + self.policy.enforce.assert_called_once_with({}, "get_members", + self.target) def test_get_members_allowed(self): output = self.member_repo.list('') self.assertEqual('member_repo_list', output) - self.policy.enforce.assert_called_once_with({}, "get_members", {}) + self.policy.enforce.assert_called_once_with({}, "get_members", + self.target) def test_delete_member_not_allowed(self): self.policy.enforce.side_effect = exception.Forbidden self.assertRaises(exception.Forbidden, self.member_repo.remove, '') - self.policy.enforce.assert_called_once_with({}, "delete_member", {}) + self.policy.enforce.assert_called_once_with({}, "delete_member", + self.target) def test_delete_member_allowed(self): image_member = ImageMembershipStub() self.member_repo.remove(image_member) self.assertEqual('member_repo_remove', image_member.output) - self.policy.enforce.assert_called_once_with({}, "delete_member", {}) + self.policy.enforce.assert_called_once_with({}, "delete_member", + self.target) class TestTaskPolicy(test_utils.BaseTestCase):