feat (proxy/admin): allow partition modifications
This patchset adds one new method to the partitions API - PATCH /v1/partitions/{partition} The partitions storage layer is also updated to support a simpler update interface: - update(self, name, **kwargs) [with kwargs in ('hosts', 'weight')] The partitions transport layer is cleared of cruft and reuses JSON handling utilities from the queues transport layer. This patch implementation uses jsonschema to achieve self-documenting, strict format validation of the incoming input. The partition storage controllers are modified accordingly, as are the tests. More tests are added for the partitions transport layer to verify the correctness and to ease future refactorings. Change-Id: Ifa92b1225421196f95131c2b74e3c07b07c4cfd4 Implements: blueprint placement-service Closes-Bug: 1230841
This commit is contained in:
@@ -96,6 +96,19 @@ class PartitionsBase(ControllerBase):
|
||||
"""Drops all partitions from storage."""
|
||||
raise NotImplementedError
|
||||
|
||||
def update(self, name, **kwargs):
|
||||
"""Updates the weight or hosts of this partition.
|
||||
|
||||
:param name: Name of the partition
|
||||
:type name: text
|
||||
:param weight: Weight, > 0
|
||||
:type weight: int
|
||||
:param kwargs: one of 'hosts' or 'weight'
|
||||
:type kwargs: dict
|
||||
:raises: PartitionNotFound
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CatalogueBase(ControllerBase):
|
||||
|
@@ -45,6 +45,14 @@ class PartitionsController(base.PartitionsBase):
|
||||
'w': weight,
|
||||
'h': hosts}
|
||||
|
||||
def update(self, name, **kwargs):
|
||||
key, value = kwargs.popitem()
|
||||
assert key in ('weight', 'hosts'), "kwargs (hosts, weight)"
|
||||
try:
|
||||
self._col[name][key[0]] = value
|
||||
except KeyError:
|
||||
raise exceptions.PartitionNotFound(name)
|
||||
|
||||
def delete(self, name):
|
||||
try:
|
||||
del self._col[name]
|
||||
|
@@ -84,6 +84,16 @@ class PartitionsController(base.PartitionsBase):
|
||||
self._col.drop()
|
||||
self._col.ensure_index(PARTITIONS_INDEX, unique=True)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def update(self, name, **kwargs):
|
||||
key, value = kwargs.popitem()
|
||||
assert key in ('hosts', 'weight'), "kwargs (hosts, weight)"
|
||||
res = self._col.update({'n': name},
|
||||
{'$set': {key[0]: value}},
|
||||
upsert=False)
|
||||
if not res['updatedExisting']:
|
||||
raise exceptions.PartitionNotFound(name)
|
||||
|
||||
|
||||
def _normalize(entry):
|
||||
return {
|
||||
|
44
marconi/proxy/transport/schema.py
Normal file
44
marconi/proxy/transport/schema.py
Normal file
@@ -0,0 +1,44 @@
|
||||
# Copyright (c) 2013 Rackspace Hosting, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""schema: JSON Schemas for marconi proxy transports."""
|
||||
|
||||
partition_patch_hosts = {
|
||||
'type': 'object', 'properties': {
|
||||
'hosts': {
|
||||
'type': 'array', 'minItems': 1, 'items': {
|
||||
'type': 'string'
|
||||
}
|
||||
},
|
||||
'additionalProperties': False
|
||||
}
|
||||
}
|
||||
|
||||
partition_patch_weight = {
|
||||
'type': 'object', 'properties': {
|
||||
'weight': {
|
||||
'type': 'integer', 'minimum': 1, 'maximum': 2**32 - 1
|
||||
},
|
||||
'additionalProperties': False
|
||||
}
|
||||
}
|
||||
|
||||
partition_create = {
|
||||
'type': 'object', 'properties': {
|
||||
'weight': partition_patch_weight['properties']['weight'],
|
||||
'hosts': partition_patch_hosts['properties']['hosts']
|
||||
},
|
||||
'required': ['hosts', 'weight'],
|
||||
'additionalProperties': False
|
||||
}
|
37
marconi/proxy/transport/utils.py
Normal file
37
marconi/proxy/transport/utils.py
Normal file
@@ -0,0 +1,37 @@
|
||||
# Copyright (c) 2013 Rackspace Hosting, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""utils: utilities for transport handling."""
|
||||
|
||||
import jsonschema
|
||||
|
||||
from marconi.queues.transport.wsgi import exceptions as wsgi_errors
|
||||
|
||||
|
||||
# TODO(cpp-cabrera): generalize this
|
||||
def validate(validator, document):
|
||||
"""Verifies a document against a schema.
|
||||
|
||||
:param validator: a validator to use to check validity
|
||||
:type validator: jsonschema.Draft4Validator
|
||||
:param document: document to check
|
||||
:type document: dict
|
||||
:raises: wsgi_errors.HTTPBadRequestBody
|
||||
"""
|
||||
try:
|
||||
validator.validate(document)
|
||||
except jsonschema.ValidationError as ex:
|
||||
raise wsgi_errors.HTTPBadRequestBody(
|
||||
'{0}: {1}'.format(ex.args, ex.message)
|
||||
)
|
@@ -27,8 +27,29 @@ following fields are required:
|
||||
import json
|
||||
|
||||
import falcon
|
||||
import jsonschema
|
||||
|
||||
from marconi.proxy.storage import exceptions
|
||||
from marconi.proxy.transport import schema, utils
|
||||
from marconi.queues.transport import utils as json_utils
|
||||
from marconi.queues.transport.wsgi import exceptions as wsgi_errors
|
||||
|
||||
|
||||
def load(req):
|
||||
"""Reads request body, raising an exception if it is not JSON.
|
||||
|
||||
:param req: The request object to read from
|
||||
:type req: falcon.Request
|
||||
:return: a dictionary decoded from the JSON stream
|
||||
:rtype: dict
|
||||
:raises: wsgi_errors.HTTPBadRequestBody
|
||||
"""
|
||||
try:
|
||||
return json_utils.read_json(req.stream, req.content_length)
|
||||
except (json_utils.MalformedJSON, json_utils.OverflowedJSONInteger):
|
||||
raise wsgi_errors.HTTPBadRequestBody(
|
||||
'JSON could not be parsed.'
|
||||
)
|
||||
|
||||
|
||||
class Listing(object):
|
||||
@@ -69,6 +90,10 @@ class Resource(object):
|
||||
"""
|
||||
def __init__(self, partitions_controller):
|
||||
self._ctrl = partitions_controller
|
||||
validator_type = jsonschema.Draft4Validator
|
||||
self._put_validator = validator_type(schema.partition_create)
|
||||
self._hosts_validator = validator_type(schema.partition_patch_hosts)
|
||||
self._weight_validator = validator_type(schema.partition_patch_weight)
|
||||
|
||||
def on_get(self, request, response, partition):
|
||||
"""Returns a JSON object for a single partition entry:
|
||||
@@ -89,39 +114,6 @@ class Resource(object):
|
||||
'weight': data['weight'],
|
||||
}, ensure_ascii=False)
|
||||
|
||||
def _validate_put(self, data):
|
||||
if not isinstance(data, dict):
|
||||
raise falcon.HTTPBadRequest(
|
||||
'Invalid metadata', 'Define a partition as a dict'
|
||||
)
|
||||
|
||||
if 'hosts' not in data:
|
||||
raise falcon.HTTPBadRequest(
|
||||
'Missing hosts list', 'Provide a list of hosts'
|
||||
)
|
||||
|
||||
if not data['hosts']:
|
||||
raise falcon.HTTPBadRequest(
|
||||
'Empty hosts list', 'Hosts list cannot be empty'
|
||||
)
|
||||
|
||||
if not isinstance(data['hosts'], list):
|
||||
raise falcon.HTTPBadRequest(
|
||||
'Invalid hosts', 'Hosts must be a list of URLs'
|
||||
)
|
||||
|
||||
# TODO(cpp-cabrera): check [str]
|
||||
if 'weight' not in data:
|
||||
raise falcon.HTTPBadRequest(
|
||||
'Missing weight',
|
||||
'Provide an integer weight for this partition'
|
||||
)
|
||||
|
||||
if not isinstance(data['weight'], int):
|
||||
raise falcon.HTTPBadRequest(
|
||||
'Invalid weight', 'Weight must be an integer'
|
||||
)
|
||||
|
||||
def on_put(self, request, response, partition):
|
||||
"""Creates a new partition. Expects the following input:
|
||||
|
||||
@@ -129,23 +121,12 @@ class Resource(object):
|
||||
|
||||
:returns: HTTP | [201, 204]
|
||||
"""
|
||||
if partition.startswith('_'):
|
||||
raise falcon.HTTPBadRequest(
|
||||
'Reserved name', '_names are reserved for internal use'
|
||||
)
|
||||
|
||||
if self._ctrl.exists(partition):
|
||||
response.status = falcon.HTTP_204
|
||||
return
|
||||
|
||||
try:
|
||||
data = json.loads(request.stream.read().decode('utf8'))
|
||||
except ValueError:
|
||||
raise falcon.HTTPBadRequest(
|
||||
'Invalid JSON', 'This is not a valid JSON stream.'
|
||||
)
|
||||
|
||||
self._validate_put(data)
|
||||
data = load(request)
|
||||
utils.validate(self._put_validator, data)
|
||||
self._ctrl.create(partition,
|
||||
weight=data['weight'],
|
||||
hosts=data['hosts'])
|
||||
@@ -158,3 +139,32 @@ class Resource(object):
|
||||
"""
|
||||
self._ctrl.delete(partition)
|
||||
response.status = falcon.HTTP_204
|
||||
|
||||
def on_patch(self, request, response, partition):
|
||||
"""Allows one to update a partition's weight and/or hosts.
|
||||
|
||||
This method expects the user to submit a JSON object
|
||||
containing both or either of 'hosts' and 'weight'. If neither
|
||||
is found, the request is flagged as bad. There is also strict
|
||||
format checking through the use of jsonschema. Appropriate
|
||||
errors are returned in each case for badly formatted input.
|
||||
|
||||
:returns: HTTP | 200,400
|
||||
|
||||
"""
|
||||
data = load(request)
|
||||
|
||||
if 'weight' not in data and 'hosts' not in data:
|
||||
raise wsgi_errors.HTTPBadRequestBody(
|
||||
'One of `hosts` or `weight` needs to be specified'
|
||||
)
|
||||
|
||||
utils.validate(self._weight_validator, data)
|
||||
utils.validate(self._hosts_validator, data)
|
||||
try:
|
||||
if 'weight' in data:
|
||||
self._ctrl.update(partition, weight=data['weight'])
|
||||
if 'hosts' in data:
|
||||
self._ctrl.update(partition, hosts=data['hosts'])
|
||||
except exceptions.PartitionNotFound:
|
||||
raise falcon.HTTPNotFound()
|
||||
|
@@ -3,6 +3,7 @@ pbr>=0.5.21,<1.0
|
||||
Babel>=0.9.6
|
||||
netaddr
|
||||
falcon>=0.1.6,<0.1.7
|
||||
jsonschema>=1.3.0,!=1.4.0
|
||||
iso8601>=0.1.4
|
||||
msgpack-python
|
||||
pymongo>=2.4
|
||||
|
@@ -52,6 +52,7 @@ class PartitionsControllerTest(ControllerBaseTest):
|
||||
self.name = six.text_type(uuid.uuid1())
|
||||
|
||||
def tearDown(self):
|
||||
self.controller.drop_all()
|
||||
super(PartitionsControllerTest, self).tearDown()
|
||||
|
||||
def _check_structure(self, partition):
|
||||
@@ -88,6 +89,19 @@ class PartitionsControllerTest(ControllerBaseTest):
|
||||
# verify it isn't listable
|
||||
self.assertEqual(len(list(self.controller.list())), 0)
|
||||
|
||||
def test_partition_updates(self):
|
||||
with helpers.partition(self.controller, 'a', 10, ['a']):
|
||||
self.controller.update('a', weight=11)
|
||||
self.assertEqual(self.controller.get('a')['weight'], 11)
|
||||
|
||||
self.controller.update('a', hosts=['b'])
|
||||
self.assertEqual(self.controller.get('a')['hosts'], ['b'])
|
||||
|
||||
def test_update_on_nonexisting_raises(self):
|
||||
self.assertRaises(exceptions.PartitionNotFound,
|
||||
self.controller.update,
|
||||
'a', weight=10)
|
||||
|
||||
def test_list(self):
|
||||
with helpers.partitions(self.controller, 10) as expect:
|
||||
values = zip(self.controller.list(), expect)
|
||||
@@ -131,6 +145,7 @@ class CatalogueControllerTest(ControllerBaseTest):
|
||||
self.project = six.text_type(uuid.uuid1())
|
||||
|
||||
def tearDown(self):
|
||||
self.controller.drop_all()
|
||||
super(CatalogueControllerTest, self).tearDown()
|
||||
|
||||
def _check_structure(self, entry):
|
||||
|
@@ -16,7 +16,9 @@
|
||||
|
||||
import copy
|
||||
import json
|
||||
import uuid
|
||||
|
||||
import ddt
|
||||
import falcon
|
||||
|
||||
import base # noqa
|
||||
@@ -120,10 +122,93 @@ class PartitionTest(base.TestBase):
|
||||
body=json.dumps(doc))
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_reserved_partition(self):
|
||||
doc = {'hosts': ['url'],
|
||||
'weight': 10}
|
||||
def test_fetch_nonexisting_partition_404s(self):
|
||||
self.simulate_get('/v1/partition/no')
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
self.simulate_put('/v1/partitions/__cplusplus',
|
||||
body=json.dumps(doc))
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_400)
|
||||
def test_patch_nonexisting_partition_404s(self):
|
||||
doc = {'weight': 1}
|
||||
self.simulate_patch('/v1/partition/no', body=json.dumps(doc))
|
||||
self.assertEquals(self.srmock.status, falcon.HTTP_404)
|
||||
|
||||
def test_bad_json_on_put_raises_bad_request(self):
|
||||
self.simulate_put('/v1/partitions/bad_json', body="")
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class ExistingPartitionTest(base.TestBase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(ExistingPartitionTest, cls).setUpClass()
|
||||
|
||||
def setUp(self):
|
||||
super(ExistingPartitionTest, self).setUp()
|
||||
self.weight = 100
|
||||
self.hosts = ['a']
|
||||
self.name = str(uuid.uuid1())
|
||||
self.partition_uri = '/v1/partitions/' + self.name
|
||||
doc = {'weight': self.weight, 'hosts': self.hosts}
|
||||
self.simulate_put(self.partition_uri, body=json.dumps(doc))
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_201)
|
||||
|
||||
def tearDown(self):
|
||||
self.simulate_delete(self.partition_uri)
|
||||
super(ExistingPartitionTest, self).tearDown()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
super(ExistingPartitionTest, cls).tearDownClass()
|
||||
|
||||
def test_put_on_existing_partition_204s(self):
|
||||
self.simulate_put(self.partition_uri, body="")
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_204)
|
||||
|
||||
def test_patch_weight(self):
|
||||
doc = {'weight': self.weight + 1}
|
||||
self.simulate_patch(self.partition_uri,
|
||||
body=json.dumps(doc))
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
result = self.simulate_get(self.partition_uri)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
doc = json.loads(result[0])
|
||||
self.assertEqual(doc['weight'], self.weight + 1)
|
||||
|
||||
def test_patch_hosts(self):
|
||||
doc = {'hosts': self.hosts + ['b']}
|
||||
self.simulate_patch(self.partition_uri,
|
||||
body=json.dumps(doc))
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
|
||||
result = self.simulate_get(self.partition_uri)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_200)
|
||||
doc = json.loads(result[0])
|
||||
self.assertEqual(doc['hosts'], self.hosts + ['b'])
|
||||
|
||||
def test_partition_route_respects_allowed(self):
|
||||
for method in ('head', 'post'):
|
||||
getattr(self, 'simulate_' + method)(self.partition_uri)
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_405)
|
||||
|
||||
def test_bad_json_on_patch_raises_bad_request(self):
|
||||
self.simulate_patch(self.partition_uri, body="{")
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
def test_patch_fails_validation_when_missing_hosts_and_weight(self):
|
||||
doc = {'winning': 1}
|
||||
self.simulate_patch(self.partition_uri, body=json.dumps(doc))
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
@ddt.data(-10, -1, 0, 'a', 2**64 + 1)
|
||||
def test_patch_fails_validation_with_invalid_weight(self, weight):
|
||||
doc = {'weight': weight}
|
||||
self.simulate_patch(self.partition_uri, body=json.dumps(doc))
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_400)
|
||||
|
||||
@ddt.data([], 'localhost', 1)
|
||||
def test_patch_fails_validation_with_invalid_hosts(self, hosts):
|
||||
doc = {'hosts': hosts}
|
||||
self.simulate_patch(self.partition_uri, body=json.dumps(doc))
|
||||
self.assertEqual(self.srmock.status, falcon.HTTP_400)
|
||||
|
Reference in New Issue
Block a user