From 65af204a1ccefbc59dcc9e0bba3d0914bea92785 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Weing=C3=A4rtner?= Date: Fri, 2 Jul 2021 10:26:11 -0300 Subject: [PATCH] Introduce reprocessing API Depends-on: https://review.opendev.org/c/openstack/cloudkitty/+/777442 Depends-on: https://review.opendev.org/c/openstack/requirements/+/799315 Implements: https://review.opendev.org/c/openstack/cloudkitty-specs/+/791245 Change-Id: Idb0032eba17d83409344ab58153097ac70814e86 --- .gitignore | 1 + cloudkitty/api/v2/__init__.py | 1 + cloudkitty/api/v2/scope/state.py | 5 +- cloudkitty/api/v2/task/__init__.py | 35 + cloudkitty/api/v2/task/reprocess.py | 280 ++++++ cloudkitty/cli/processor.py | 2 +- cloudkitty/common/policies/__init__.py | 2 + cloudkitty/common/policies/v2/tasks.py | 36 + cloudkitty/orchestrator.py | 392 ++++++-- cloudkitty/storage/v2/influx.py | 4 + cloudkitty/storage_state/__init__.py | 139 +++ ...4d69395f_add_storage_scope_state_fields.py | 1 - .../9feccd32_create_reprocessing_scheduler.py | 45 + cloudkitty/storage_state/models.py | 44 + cloudkitty/tests/api/v2/task/__init__.py | 0 .../tests/api/v2/task/test_reprocess.py | 381 ++++++++ cloudkitty/tests/test_orchestrator.py | 863 +++++++++++++++++- doc/source/_static/cloudkitty.conf.sample | 12 +- .../_static/cloudkitty.policy.yaml.sample | 8 + doc/source/api-reference/v2/index.rst | 1 + .../api-reference/v2/task/reprocessing.inc | 147 +++ .../v2/task/reprocessing_parameters.yml | 56 ++ lower-constraints.txt | 3 +- ...uce-reprocessing-api-822db3edc256507a.yaml | 5 + requirements.txt | 3 +- 25 files changed, 2353 insertions(+), 113 deletions(-) create mode 100644 cloudkitty/api/v2/task/__init__.py create mode 100644 cloudkitty/api/v2/task/reprocess.py create mode 100644 cloudkitty/common/policies/v2/tasks.py create mode 100644 cloudkitty/storage_state/alembic/versions/9feccd32_create_reprocessing_scheduler.py create mode 100644 cloudkitty/tests/api/v2/task/__init__.py create mode 100644 cloudkitty/tests/api/v2/task/test_reprocess.py create mode 100644 doc/source/api-reference/v2/task/reprocessing.inc create mode 100644 doc/source/api-reference/v2/task/reprocessing_parameters.yml create mode 100644 releasenotes/notes/introduce-reprocessing-api-822db3edc256507a.yaml diff --git a/.gitignore b/.gitignore index 71ea2839..e9296f50 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ var/ *.egg cloudkitty.egg-info .idea/ +.python-version # Configuration file etc/cloudkitty/cloudkitty.conf.sample diff --git a/cloudkitty/api/v2/__init__.py b/cloudkitty/api/v2/__init__.py index dea0e560..cb910b5b 100644 --- a/cloudkitty/api/v2/__init__.py +++ b/cloudkitty/api/v2/__init__.py @@ -34,6 +34,7 @@ API_MODULES = [ 'cloudkitty.api.v2.scope', 'cloudkitty.api.v2.dataframes', 'cloudkitty.api.v2.summary', + 'cloudkitty.api.v2.task' ] diff --git a/cloudkitty/api/v2/scope/state.py b/cloudkitty/api/v2/scope/state.py index f380789b..71afd231 100644 --- a/cloudkitty/api/v2/scope/state.py +++ b/cloudkitty/api/v2/scope/state.py @@ -196,7 +196,7 @@ class ScopeState(base.BaseResource): 'scope:patch_state', {'tenant_id': scope_id or flask.request.context.project_id} ) - results = self._storage_state.get_all(identifier=scope_id) + results = self._storage_state.get_all(identifier=scope_id, active=None) if len(results) < 1: raise http_exceptions.NotFound( @@ -217,7 +217,8 @@ class ScopeState(base.BaseResource): collector=collector, active=active) - storage_scopes = self._storage_state.get_all(identifier=scope_id) + storage_scopes = self._storage_state.get_all( + 
identifier=scope_id, active=active)
         update_storage_scope = storage_scopes[0]
         return {
             'scope_id': update_storage_scope.identifier,
diff --git a/cloudkitty/api/v2/task/__init__.py b/cloudkitty/api/v2/task/__init__.py
new file mode 100644
index 00000000..7a07ee81
--- /dev/null
+++ b/cloudkitty/api/v2/task/__init__.py
@@ -0,0 +1,35 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+from cloudkitty.api.v2 import utils as api_utils
+
+
+def init(app):
+    api_utils.do_init(app, 'task', [
+        {
+            'module': __name__ + '.' + 'reprocess',
+            'resource_class': 'ReprocessSchedulerPostApi',
+            'url': '/reprocesses',
+        },
+        {
+            'module': __name__ + '.' + 'reprocess',
+            'resource_class': 'ReprocessSchedulerGetApi',
+            'url': '/reprocesses/<path_scope_id>',
+        },
+        {
+            'module': __name__ + '.' + 'reprocess',
+            'resource_class': 'ReprocessesSchedulerGetApi',
+            'url': '/reprocesses',
+        },
+    ])
+    return app
diff --git a/cloudkitty/api/v2/task/reprocess.py b/cloudkitty/api/v2/task/reprocess.py
new file mode 100644
index 00000000..e132d882
--- /dev/null
+++ b/cloudkitty/api/v2/task/reprocess.py
@@ -0,0 +1,280 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# +from datetimerange import DateTimeRange +import flask +from oslo_log import log +import voluptuous +from werkzeug import exceptions as http_exceptions + +from cloudkitty.api.v2 import base +from cloudkitty.api.v2 import utils as api_utils +from cloudkitty.common import policy +from cloudkitty import storage_state +from cloudkitty.storage_state.models import ReprocessingScheduler +from cloudkitty.utils import tz as tzutils +from cloudkitty.utils import validation as validation_utils + + +LOG = log.getLogger(__name__) + +ALL_SCOPES_OPTION = 'ALL' + + +def dt_from_iso_as_utc(date_string): + return tzutils.dt_from_iso(date_string, as_utc=True) + + +class ReprocessSchedulerPostApi(base.BaseResource): + def __init__(self, *args, **kwargs): + super(ReprocessSchedulerPostApi, self).__init__(*args, **kwargs) + self.storage_state_manager = storage_state.StateManager() + self.schedule_reprocessing_db = storage_state.ReprocessingSchedulerDb() + + @api_utils.add_input_schema('body', { + voluptuous.Required('scope_ids'): api_utils.MultiQueryParam(str), + voluptuous.Required('start_reprocess_time'): + voluptuous.Coerce(dt_from_iso_as_utc), + voluptuous.Required('end_reprocess_time'): + voluptuous.Coerce(dt_from_iso_as_utc), + voluptuous.Required('reason'): api_utils.SingleQueryParam(str), + }) + def post(self, scope_ids=[], start_reprocess_time=None, + end_reprocess_time=None, reason=None): + + policy.authorize( + flask.request.context, + 'schedule:task_reprocesses', + {'tenant_id': flask.request.context.project_id or scope_ids} + ) + + ReprocessSchedulerPostApi.validate_inputs( + end_reprocess_time, reason, scope_ids, start_reprocess_time) + + if ALL_SCOPES_OPTION in scope_ids: + scope_ids = [] + + if not isinstance(scope_ids, list): + scope_ids = [scope_ids] + + all_scopes_to_reprocess = self.storage_state_manager.get_all( + identifier=scope_ids, offset=None, limit=None) + + ReprocessSchedulerPostApi.check_if_there_are_invalid_scopes( + all_scopes_to_reprocess, end_reprocess_time, scope_ids, + start_reprocess_time) + + ReprocessSchedulerPostApi.validate_start_end_for_reprocessing( + all_scopes_to_reprocess, end_reprocess_time, start_reprocess_time) + + self.validate_reprocessing_schedules_overlaps( + all_scopes_to_reprocess, end_reprocess_time, start_reprocess_time) + + for scope in all_scopes_to_reprocess: + schedule = ReprocessingScheduler( + identifier=scope.identifier, reason=reason, + start_reprocess_time=start_reprocess_time, + end_reprocess_time=end_reprocess_time) + + LOG.debug("Persisting scope reprocessing schedule [%s].", schedule) + self.schedule_reprocessing_db.persist(schedule) + + return {}, 202 + + @staticmethod + def validate_inputs( + end_reprocess_time, reason, scope_ids, start_reprocess_time): + ReprocessSchedulerPostApi.validate_scope_ids(scope_ids) + + if not reason.strip(): + raise http_exceptions.BadRequest( + "Empty or blank reason text is not allowed. Please, do " + "inform/register the reason for the reprocessing of a " + "previously processed timestamp.") + if end_reprocess_time < start_reprocess_time: + raise http_exceptions.BadRequest( + "End reprocessing timestamp [%s] cannot be less than " + "start reprocessing timestamp [%s]." 
+ % (start_reprocess_time, end_reprocess_time)) + + @staticmethod + def validate_scope_ids(scope_ids): + option_all_selected = False + for s in scope_ids: + if s == ALL_SCOPES_OPTION: + option_all_selected = True + continue + + if option_all_selected and len(scope_ids) != 1: + raise http_exceptions.BadRequest( + "Cannot use 'ALL' with scope ID [%s]. Either schedule a " + "reprocessing for all active scopes using 'ALL' option, " + "or inform only the scopes you desire to schedule a " + "reprocessing." % scope_ids) + + @staticmethod + def check_if_there_are_invalid_scopes( + all_scopes_to_reprocess, end_reprocess_time, scope_ids, + start_reprocess_time): + + invalid_scopes = [] + for s in scope_ids: + scope_exist_in_db = False + for scope_to_reprocess in all_scopes_to_reprocess: + if s == scope_to_reprocess.identifier: + scope_exist_in_db = True + break + + if not scope_exist_in_db: + invalid_scopes.append(s) + + if invalid_scopes: + raise http_exceptions.BadRequest( + "Scopes %s scheduled to reprocess [start=%s, end=%s] " + "do not exist." + % (invalid_scopes, start_reprocess_time, end_reprocess_time)) + + @staticmethod + def validate_start_end_for_reprocessing(all_scopes_to_reprocess, + end_reprocess_time, + start_reprocess_time): + + for scope in all_scopes_to_reprocess: + last_processed_timestamp = scope.last_processed_timestamp + if start_reprocess_time > last_processed_timestamp: + raise http_exceptions.BadRequest( + "Cannot execute a reprocessing [start=%s] for scope [%s] " + "starting after the last possible timestamp [%s]." + % (start_reprocess_time, scope, last_processed_timestamp)) + if end_reprocess_time > scope.last_processed_timestamp: + raise http_exceptions.BadRequest( + "Cannot execute a reprocessing [end=%s] for scope [%s] " + "ending after the last possible timestamp [%s]." + % (end_reprocess_time, scope, last_processed_timestamp)) + + def validate_reprocessing_schedules_overlaps( + self, all_scopes_to_reprocess, end_reprocess_time, + start_reprocess_time): + + scheduling_range = DateTimeRange( + start_reprocess_time, end_reprocess_time) + + for scope_to_reprocess in all_scopes_to_reprocess: + all_reprocessing_schedules = self.schedule_reprocessing_db.get_all( + identifier=[scope_to_reprocess.identifier]) + + LOG.debug("All schedules [%s] for reprocessing found for scope " + "[%s]", all_reprocessing_schedules, scope_to_reprocess) + if not all_reprocessing_schedules: + LOG.debug( + "No need to validate possible collision of reprocessing " + "for scope [%s] because it does not have active " + "reprocessing schedules." % scope_to_reprocess) + continue + + for schedule in all_reprocessing_schedules: + scheduled_range = DateTimeRange( + tzutils.local_to_utc(schedule.start_reprocess_time), + tzutils.local_to_utc(schedule.end_reprocess_time)) + + try: + if scheduling_range.is_intersection(scheduled_range): + raise http_exceptions.BadRequest( + self.generate_overlap_error_message( + scheduled_range, scheduling_range, + scope_to_reprocess)) + except ValueError as e: + raise http_exceptions.BadRequest( + self.generate_overlap_error_message( + scheduled_range, scheduling_range, + scope_to_reprocess) + "Error: [%s]." % e) + + @staticmethod + def generate_overlap_error_message(scheduled_range, scheduling_range, + scope_to_reprocess): + return "Cannot schedule a reprocessing for scope [%s] for " \ + "reprocessing time [%s], because it already has a schedule " \ + "for a similar time range [%s]." 
% (scope_to_reprocess,
+                                                   scheduling_range,
+                                                   scheduled_range)
+
+
+ACCEPTED_GET_REPROCESSING_REQUEST_ORDERS = ['asc', 'desc']
+
+
+class ReprocessSchedulerGetApi(base.BaseResource):
+    def __init__(self, *args, **kwargs):
+        super(ReprocessSchedulerGetApi, self).__init__(*args, **kwargs)
+        self.schedule_reprocessing_db = storage_state.ReprocessingSchedulerDb()
+
+    @api_utils.paginated
+    @api_utils.add_input_schema('query', {
+        voluptuous.Optional('scope_ids'): api_utils.MultiQueryParam(str),
+        voluptuous.Optional('order', default="desc"):
+            api_utils.SingleQueryParam(str)
+    })
+    @api_utils.add_output_schema({'results': [{
+        voluptuous.Required('reason'): validation_utils.get_string_type(),
+        voluptuous.Required('scope_id'): validation_utils.get_string_type(),
+        voluptuous.Required('start_reprocess_time'):
+            validation_utils.get_string_type(),
+        voluptuous.Required('end_reprocess_time'):
+            validation_utils.get_string_type(),
+        voluptuous.Required('current_reprocess_time'):
+            validation_utils.get_string_type(),
+    }]})
+    def get(self, scope_ids=[], path_scope_id=None, offset=0, limit=100,
+            order="desc"):
+        if path_scope_id and scope_ids:
+            LOG.warning("Filtering by scope IDs [%s] and path scope ID [%s] "
+                        "does not make sense. You should use only one of "
+                        "them. We will use only the path scope ID for this "
+                        "request.", scope_ids, path_scope_id)
+
+        if path_scope_id:
+            scope_ids = [path_scope_id]
+
+        policy.authorize(
+            flask.request.context,
+            'schedule:get_task_reprocesses',
+            {'tenant_id': flask.request.context.project_id or scope_ids}
+        )
+
+        if not isinstance(scope_ids, list):
+            scope_ids = [scope_ids]
+
+        if order not in ACCEPTED_GET_REPROCESSING_REQUEST_ORDERS:
+            raise http_exceptions.BadRequest(
+                "The order [%s] is not valid. Accepted values are %s."
+                % (order, ACCEPTED_GET_REPROCESSING_REQUEST_ORDERS))
+
+        schedules = self.schedule_reprocessing_db.get_all(
+            identifier=scope_ids, remove_finished=False,
+            offset=offset, limit=limit, order=order)
+
+        return {
+            'results': [{
+                'scope_id': s.identifier,
+                'reason': s.reason,
+                'start_reprocess_time': s.start_reprocess_time.isoformat(),
+                'end_reprocess_time': s.end_reprocess_time.isoformat(),
+                'current_reprocess_time':
+                    s.current_reprocess_time.isoformat() if
+                    s.current_reprocess_time else "",
+            } for s in schedules]}
+
+
+class ReprocessesSchedulerGetApi(ReprocessSchedulerGetApi):
+
+    def __init__(self, *args, **kwargs):
+        super(ReprocessesSchedulerGetApi, self).__init__(*args, **kwargs)
diff --git a/cloudkitty/cli/processor.py b/cloudkitty/cli/processor.py
index a8253b08..cace2fdf 100644
--- a/cloudkitty/cli/processor.py
+++ b/cloudkitty/cli/processor.py
@@ -25,7 +25,7 @@ def main():
     # before the prepare_service(), making cfg.CONF returning default values
     # systematically.
from cloudkitty import orchestrator - orchestrator.OrchestratorServiceManager().run() + orchestrator.CloudKittyServiceManager().run() if __name__ == '__main__': diff --git a/cloudkitty/common/policies/__init__.py b/cloudkitty/common/policies/__init__.py index c4759211..3d74b276 100644 --- a/cloudkitty/common/policies/__init__.py +++ b/cloudkitty/common/policies/__init__.py @@ -24,6 +24,7 @@ from cloudkitty.common.policies.v1 import storage as v1_storage from cloudkitty.common.policies.v2 import dataframes as v2_dataframes from cloudkitty.common.policies.v2 import scope as v2_scope from cloudkitty.common.policies.v2 import summary as v2_summary +from cloudkitty.common.policies.v2 import tasks as v2_tasks def list_rules(): @@ -37,4 +38,5 @@ def list_rules(): v2_dataframes.list_rules(), v2_scope.list_rules(), v2_summary.list_rules(), + v2_tasks.list_rules() ) diff --git a/cloudkitty/common/policies/v2/tasks.py b/cloudkitty/common/policies/v2/tasks.py new file mode 100644 index 00000000..f2d4ee44 --- /dev/null +++ b/cloudkitty/common/policies/v2/tasks.py @@ -0,0 +1,36 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +from oslo_policy import policy + +from cloudkitty.common.policies import base + + +schedule_policies = [ + policy.DocumentedRuleDefault( + name='schedule:task_reprocesses', + check_str=base.ROLE_ADMIN, + description='Schedule a scope for reprocessing', + operations=[{'path': '/v2/task/reprocesses', + 'method': 'POST'}]), + policy.DocumentedRuleDefault( + name='schedule:get_task_reprocesses', + check_str=base.ROLE_ADMIN, + description='Get reprocessing schedule tasks for scopes.', + operations=[{'path': '/v2/task/reprocesses', + 'method': 'GET'}]), +] + + +def list_rules(): + return schedule_policies diff --git a/cloudkitty/orchestrator.py b/cloudkitty/orchestrator.py index 80263308..f50f56fe 100644 --- a/cloudkitty/orchestrator.py +++ b/cloudkitty/orchestrator.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. # +import copy + from datetime import timedelta import decimal import functools @@ -57,9 +59,15 @@ orchestrator_opts = [ 'max_workers', default=multiprocessing.cpu_count(), sample_default=4, - min=1, - help='Maximal number of workers to run. Defaults to the number of ' - 'available CPUs'), + min=0, + help='Max number of workers to execute the rating process. Defaults ' + 'to the number of available CPU cores.'), + cfg.IntOpt( + 'max_workers_reprocessing', + default=multiprocessing.cpu_count(), + min=0, + help='Max number of workers to execute the reprocessing. 
Defaults to ' + 'the number of available CPU cores.'), cfg.IntOpt('max_threads', # NOTE(peschk_l): This is the futurist default default=multiprocessing.cpu_count() * 5, @@ -272,17 +280,18 @@ def _check_state(obj, period, tenant_id): class Worker(BaseWorker): def __init__(self, collector, storage, tenant_id, worker_id): + super(Worker, self).__init__(tenant_id) + self._collector = collector self._storage = storage self._period = CONF.collect.period self._wait_time = CONF.collect.wait_periods * self._period - self._tenant_id = tenant_id self._worker_id = worker_id self._log_prefix = '[scope: {scope}, worker: {worker}] '.format( scope=self._tenant_id, worker=self._worker_id) self._conf = ck_utils.load_conf(CONF.collect.metrics_conf) self._state = state.StateManager() - self._check_state = functools.partial( + self.next_timestamp_to_process = functools.partial( _check_state, self, self._period, self._tenant_id) super(Worker, self).__init__(self._tenant_id) @@ -295,10 +304,10 @@ class Worker(BaseWorker): metric, start_timestamp, next_timestamp, - self._tenant_id, + self._tenant_id ) if not data: - raise collector.NoDataCollected + raise collector.NoDataCollected(self._collector, metric) return name, data @@ -366,63 +375,179 @@ class Worker(BaseWorker): return dict(filter(lambda x: x[1] is not None, results)) def run(self): - while True: - timestamp = self._check_state() - LOG.debug("Processing timestamp [%s] for storage scope [%s].", - timestamp, self._tenant_id) - - if not timestamp: - break - - if self._state.get_state(self._tenant_id): - if not self._state.is_storage_scope_active(self._tenant_id): - LOG.debug("Skipping processing for storage scope [%s] " - "because it is marked as inactive.", - self._tenant_id) - break - else: - LOG.debug("No need to check if [%s] is de-activated. " - "We have never processed it before.") + should_continue_processing = self.execute_worker_processing() + while should_continue_processing: + should_continue_processing = self.execute_worker_processing() + def execute_worker_processing(self): + timestamp = self.next_timestamp_to_process() + LOG.debug("Processing timestamp [%s] for storage scope [%s].", + timestamp, self._tenant_id) + if not timestamp: + LOG.debug("Worker [%s] finished processing storage scope [%s].", + self._worker_id, self._tenant_id) + return False + if self._state.get_state(self._tenant_id): if not self._state.is_storage_scope_active(self._tenant_id): - LOG.debug("Skipping processing for storage scope [%s] because " - "it is marked as inactive.", self._tenant_id) - break + LOG.debug("Skipping processing for storage scope [%s] " + "because it is marked as inactive.", + self._tenant_id) + return False + else: + LOG.debug("No need to check if [%s] is de-activated. 
" + "We have never processed it before.") + self.do_execute_scope_processing(timestamp) + return True - metrics = list(self._conf['metrics'].keys()) + def do_execute_scope_processing(self, timestamp): + metrics = list(self._conf['metrics'].keys()) + # Collection + metrics = sorted(metrics) + usage_data = self._do_collection(metrics, timestamp) - # Collection - usage_data = self._do_collection(metrics, timestamp) - LOG.debug("Usage data [%s] found for storage scope [%s] in " - "timestamp [%s].", usage_data, self._tenant_id, - timestamp) + LOG.debug("Usage data [%s] found for storage scope [%s] in " + "timestamp [%s].", usage_data, self._tenant_id, + timestamp) + start_time = timestamp + end_time = tzutils.add_delta(timestamp, + timedelta(seconds=self._period)) + # No usage records found in + if not usage_data: + LOG.warning("No usage data for storage scope [%s] on " + "timestamp [%s]. You might want to consider " + "de-activating it.", self._tenant_id, timestamp) - start_time = timestamp - end_time = tzutils.add_delta(timestamp, - timedelta(seconds=self._period)) + else: + frame = self.execute_measurements_rating(end_time, start_time, + usage_data) + self.persist_rating_data(end_time, frame, start_time) - frame = dataframe.DataFrame( - start=start_time, - end=end_time, - usage=usage_data, - ) - # Rating - for processor in self._processors: - frame = processor.obj.process(frame) + self.update_scope_processing_state_db(timestamp) - # Writing - LOG.debug("Persisting processed frames [%s] for tenant [%s] and " - "time [start=%s,end=%s]", frame, self._tenant_id, - start_time, end_time) + def persist_rating_data(self, end_time, frame, start_time): + LOG.debug("Persisting processed frames [%s] for scope [%s] and time " + "[start=%s,end=%s]", frame, self._tenant_id, start_time, + end_time) - self._storage.push([frame], self._tenant_id) - self._state.set_state(self._tenant_id, timestamp) + self._storage.push([frame], self._tenant_id) + + def execute_measurements_rating(self, end_time, start_time, usage_data): + frame = dataframe.DataFrame( + start=start_time, + end=end_time, + usage=usage_data, + ) + + for processor in self._processors: + original_data = copy.deepcopy(frame) + frame = processor.obj.process(frame) + LOG.debug("Results [%s] for processing [%s] of data points [%s].", + frame, processor.obj.process, original_data) + return frame + + def update_scope_processing_state_db(self, timestamp): + self._state.set_state(self._tenant_id, timestamp) -class Orchestrator(cotyledon.Service): +class ReprocessingWorker(Worker): + def __init__(self, collector, storage, tenant_id, worker_id): + self.scope = tenant_id + self.scope_key = None + + super(ReprocessingWorker, self).__init__( + collector, storage, self.scope.identifier, worker_id) + + self.reprocessing_scheduler_db = state.ReprocessingSchedulerDb() + self.next_timestamp_to_process = self._next_timestamp_to_process + + self.load_scope_key() + + def load_scope_key(self): + scope_from_db = self._state.get_all(self._tenant_id) + + if len(scope_from_db) < 1: + raise Exception("Scope [%s] scheduled for reprocessing does not " + "seem to exist anymore." % self.scope) + + if len(scope_from_db) > 1: + raise Exception("Unexpected number of storage state entries found " + "for scope [%s]." 
% self.scope) + + self.scope_key = scope_from_db[0].scope_key + + def _next_timestamp_to_process(self): + db_item = self.reprocessing_scheduler_db.get_from_db( + identifier=self.scope.identifier, + start_reprocess_time=self.scope.start_reprocess_time, + end_reprocess_time=self.scope.end_reprocess_time) + + if not db_item: + LOG.info("It seems that the processing for schedule [%s] was " + "finished by other worker.", self.scope) + return None + + return ReprocessingWorker.generate_next_timestamp( + db_item, self._period) + + @staticmethod + def generate_next_timestamp(db_item, processing_period_interval): + new_timestamp = db_item.start_reprocess_time + if db_item.current_reprocess_time: + period_delta = timedelta(seconds=processing_period_interval) + + new_timestamp = db_item.current_reprocess_time + period_delta + + LOG.debug("Current reprocessed time is [%s], therefore, the next " + "one to process is [%s] based on the processing " + "interval [%s].", db_item.start_reprocess_time, + new_timestamp, processing_period_interval) + else: + LOG.debug("There is no reprocessing for the schedule [%s]. " + "Therefore, we use the start time [%s] as the first " + "time to process.", db_item, new_timestamp) + if new_timestamp <= db_item.end_reprocess_time: + return tzutils.local_to_utc(new_timestamp) + else: + LOG.debug("No need to keep reprocessing schedule [%s] as we " + "processed all requested timestamps.", db_item) + return None + + def do_execute_scope_processing(self, timestamp): + end_of_this_processing = timestamp + timedelta(seconds=self._period) + + end_of_this_processing = tzutils.local_to_utc(end_of_this_processing) + + LOG.debug("Cleaning backend [%s] data for reprocessing scope [%s] " + "for timeframe[start=%s, end=%s].", + self._storage, self.scope, timestamp, end_of_this_processing) + + self._storage.delete( + begin=timestamp, end=end_of_this_processing, + filters={self.scope_key: self._tenant_id}) + + LOG.debug("Executing the reprocessing of scope [%s] for " + "timeframe[start=%s, end=%s].", self.scope, timestamp, + end_of_this_processing) + + super(ReprocessingWorker, self).do_execute_scope_processing(timestamp) + + def update_scope_processing_state_db(self, timestamp): + LOG.debug("After data is persisted in the storage backend [%s], we " + "will update the scope [%s] current processing time to " + "[%s].", self._storage, self.scope, timestamp) + self.reprocessing_scheduler_db.update_reprocessing_time( + identifier=self.scope.identifier, + start_reprocess_time=self.scope.start_reprocess_time, + end_reprocess_time=self.scope.end_reprocess_time, + new_current_time_stamp=timestamp) + + +class CloudKittyProcessor(cotyledon.Service): def __init__(self, worker_id): self._worker_id = worker_id - super(Orchestrator, self).__init__(self._worker_id) + super(CloudKittyProcessor, self).__init__(self._worker_id) + + self.tenants = [] self.fetcher = driver.DriverManager( FETCHERS_NAMESPACE, @@ -445,9 +570,16 @@ class Orchestrator(cotyledon.Service): CONF.orchestrator.coordination_url, uuidutils.generate_uuid().encode('ascii')) self.coord.start(start_heart=True) - self._check_state = functools.partial( + self.next_timestamp_to_process = functools.partial( _check_state, self, CONF.collect.period) + self.worker_class = Worker + self.log_worker_initiated() + + def log_worker_initiated(self): + LOG.info("Processor worker ID [%s] is initiated as CloudKitty " + "rating processor.", self._worker_id) + def _init_messaging(self): target = oslo_messaging.Target(topic='cloudkitty', server=CONF.host, @@ 
-469,51 +601,125 @@ class Orchestrator(cotyledon.Service): def run(self): LOG.debug('Started worker {}.'.format(self._worker_id)) while True: - self.tenants = self.fetcher.get_tenants() - random.shuffle(self.tenants) - LOG.info('[Worker: {w}] {n} tenants loaded for fetcher {f}'.format( - w=self._worker_id, n=len(self.tenants), f=self.fetcher.name)) - - for tenant_id in self.tenants: - - lock_name, lock = get_lock(self.coord, tenant_id) - LOG.debug( - '[Worker: {w}] Trying to acquire lock "{lck}" ...'.format( - w=self._worker_id, lck=lock_name) - ) - if lock.acquire(blocking=False): - LOG.debug( - '[Worker: {w}] Acquired lock "{lck}" ...'.format( - w=self._worker_id, lck=lock_name) - ) - state = self._check_state(tenant_id) - LOG.debug("Next timestamp [%s] found for processing for " - "storage scope [%s].", state, tenant_id) - if state: - worker = Worker( - self.collector, - self.storage, - tenant_id, - self._worker_id, - ) - worker.run() - - lock.release() - - LOG.debug("Finished processing all storage scopes.") - - # FIXME(sheeprine): We may cause a drift here - time.sleep(CONF.collect.period) + self.internal_run() def terminate(self): - LOG.debug('Terminating worker {}...'.format(self._worker_id)) + LOG.debug('Terminating worker {}.'.format(self._worker_id)) self.coord.stop() LOG.debug('Terminated worker {}.'.format(self._worker_id)) + def internal_run(self): + self.load_scopes_to_process() + for tenant_id in self.tenants: + lock_name, lock = get_lock( + self.coord, self.generate_lock_base_name(tenant_id)) -class OrchestratorServiceManager(cotyledon.ServiceManager): + LOG.debug('[Worker: {w}] Trying to acquire lock "{lock_name}".' + .format(w=self._worker_id, lock_name=lock_name)) + + lock_acquired = lock.acquire(blocking=False) + if lock_acquired: + LOG.debug('[Worker: {w}] Acquired lock "{lock_name}".'.format( + w=self._worker_id, lock_name=lock_name)) + + try: + self.process_scope(tenant_id) + finally: + lock.release() + + LOG.debug("Finished processing scopes [%s].", tenant_id) + else: + LOG.debug("Could not acquire lock [%s] for processing " + "scope [%s] with worker [%s].", lock_name, + tenant_id, self.worker_class) + LOG.debug("Finished processing all storage scopes with worker " + "[worker_id=%s, class=%s].", + self._worker_id, self.worker_class) + # FIXME(sheeprine): We may cause a drift here + time.sleep(CONF.collect.period) + + def process_scope(self, scope_to_process): + timestamp = self.next_timestamp_to_process(scope_to_process) + LOG.debug("Next timestamp [%s] found for processing for " + "storage scope [%s].", state, scope_to_process) + + if not timestamp: + LOG.debug("There is no next timestamp to process for scope [%s]", + scope_to_process) + return + + worker = self.worker_class( + self.collector, + self.storage, + scope_to_process, + self._worker_id, + ) + worker.run() + + def generate_lock_base_name(self, tenant_id): + return tenant_id + + def load_scopes_to_process(self): + self.tenants = self.fetcher.get_tenants() + random.shuffle(self.tenants) + + LOG.info('[Worker: {w}] Tenants loaded for fetcher {f}'.format( + w=self._worker_id, f=self.fetcher.name)) + + +class CloudKittyReprocessor(CloudKittyProcessor): + def __init__(self, worker_id): + super(CloudKittyReprocessor, self).__init__(worker_id) + + self.next_timestamp_to_process = self._next_timestamp_to_process + self.worker_class = ReprocessingWorker + + self.reprocessing_scheduler_db = state.ReprocessingSchedulerDb() + + def log_worker_initiated(self): + LOG.info("Processor worker ID [%s] is initiated as 
CloudKitty "
+                 "rating reprocessor.", self._worker_id)
+
+    def _next_timestamp_to_process(self, scope):
+        scope_db = self.reprocessing_scheduler_db.get_from_db(
+            identifier=scope.identifier,
+            start_reprocess_time=scope.start_reprocess_time,
+            end_reprocess_time=scope.end_reprocess_time)
+
+        if scope_db:
+            return ReprocessingWorker.generate_next_timestamp(
+                scope_db, CONF.collect.period)
+        else:
+            LOG.debug("It seems that the processing for schedule [%s] was "
+                      "finished by another CloudKitty reprocessor.", scope)
+            return None
+
+    def load_scopes_to_process(self):
+        self.tenants = self.reprocessing_scheduler_db.get_all()
+        random.shuffle(self.tenants)
+
+        LOG.info('Reprocessing worker [%s] loaded [%s] schedules to process.',
+                 self._worker_id, len(self.tenants))
+
+    def generate_lock_base_name(self, scope):
+        return "%s-id=%s-start=%s-end=%s-current=%s" % (
+            self.worker_class, scope.identifier, scope.start_reprocess_time,
+            scope.end_reprocess_time, scope.current_reprocess_time)
+
+
+class CloudKittyServiceManager(cotyledon.ServiceManager):
 
     def __init__(self):
-        super(OrchestratorServiceManager, self).__init__()
-        self.service_id = self.add(Orchestrator,
-                                   workers=CONF.orchestrator.max_workers)
+        super(CloudKittyServiceManager, self).__init__()
+        if CONF.orchestrator.max_workers:
+            self.cloudkitty_processor_service_id = self.add(
+                CloudKittyProcessor, workers=CONF.orchestrator.max_workers)
+        else:
+            LOG.info("No worker configured for CloudKitty processing.")
+
+        if CONF.orchestrator.max_workers_reprocessing:
+            self.cloudkitty_reprocessor_service_id = self.add(
+                CloudKittyReprocessor,
+                workers=CONF.orchestrator.max_workers_reprocessing)
+        else:
+            LOG.info("No worker configured for CloudKitty reprocessing.")
diff --git a/cloudkitty/storage/v2/influx.py b/cloudkitty/storage/v2/influx.py
index f162f346..3ff6bfe0 100644
--- a/cloudkitty/storage/v2/influx.py
+++ b/cloudkitty/storage/v2/influx.py
@@ -275,6 +275,10 @@ class InfluxClient(object):
         else:
             query += filter_query
         query += ';'
+
+        LOG.debug("InfluxDB query to delete elements filtering by [%s] and "
+                  "with [begin=%s, end=%s]: [%s].", filters, begin, end, query)
+
         self._conn.query(query)
diff --git a/cloudkitty/storage_state/__init__.py b/cloudkitty/storage_state/__init__.py
index 0660a0b9..99d6a255 100644
--- a/cloudkitty/storage_state/__init__.py
+++ b/cloudkitty/storage_state/__init__.py
@@ -16,6 +16,8 @@
 from oslo_config import cfg
 from oslo_db.sqlalchemy import utils
 from oslo_log import log
+from sqlalchemy import or_ as or_operation
+from sqlalchemy import sql
 
 from cloudkitty import db
 from cloudkitty.storage_state import migration
@@ -303,3 +305,140 @@ class StateManager(object):
             session.close()
 
         return r.active
+
+
+class ReprocessingSchedulerDb(object):
+    """Class to access and operate the reprocessing scheduler in the DB"""
+
+    model = models.ReprocessingScheduler
+
+    def get_all(self, identifier=None, remove_finished=True,
+                limit=100, offset=0, order="desc"):
+        """Returns all schedules for reprocessing for a given resource
+
+        :param identifier: Identifiers of the scopes
+        :type identifier: str
+        :param remove_finished: option to remove from the projection the
+                                reprocessing schedules that have already
+                                finished.
+        :type remove_finished: bool
+        :param limit: optional to restrict the projection
+        :type limit: int
+        :param offset: optional to shift the projection
+        :type offset: int
+        :param order: optional parameter to indicate the order of the
+                      projection. The ordering field will be the `id`.
+        :type order: str
+        """
+        session = db.get_session()
+        session.begin()
+
+        query = utils.model_query(self.model, session)
+
+        if identifier:
+            query = query.filter(self.model.identifier.in_(identifier))
+        if remove_finished:
+            query = self.remove_finished_processing_schedules(query)
+        if order:
+            query = query.order_by(sql.text("id %s" % order))
+
+        query = apply_offset_and_limit(limit, offset, query)
+
+        result_set = query.all()
+
+        session.close()
+        return result_set
+
+    def remove_finished_processing_schedules(self, query):
+        return query.filter(or_operation(
+            self.model.current_reprocess_time.is_(None),
+            self.model.current_reprocess_time < self.model.end_reprocess_time
+        ))
+
+    def persist(self, reprocessing_scheduler):
+        """Persists the reprocessing_schedule
+
+        :param reprocessing_scheduler: reprocessing schedule that we want to
+                                       persist in the database.
+        :type reprocessing_scheduler: models.ReprocessingScheduler
+        """
+
+        session = db.get_session()
+        session.begin()
+
+        session.add(reprocessing_scheduler)
+        session.commit()
+
+        session.close()
+
+    def get_from_db(self, identifier=None, start_reprocess_time=None,
+                    end_reprocess_time=None):
+        """Get the reprocessing schedule from DB
+
+        :param identifier: Identifier of the scope
+        :type identifier: str
+        :param start_reprocess_time: the start time used in the
+                                     reprocessing schedule
+        :type start_reprocess_time: datetime.datetime
+        :param end_reprocess_time: the end time used in the
+                                   reprocessing schedule
+        :type end_reprocess_time: datetime.datetime
+        """
+        session = db.get_session()
+        session.begin()
+
+        result_set = self._get_db_item(
+            end_reprocess_time, identifier, session, start_reprocess_time)
+        session.close()
+
+        return result_set
+
+    def _get_db_item(self, end_reprocess_time, identifier, session,
+                     start_reprocess_time):
+
+        query = utils.model_query(self.model, session)
+        query = query.filter(self.model.identifier == identifier)
+        query = query.filter(
+            self.model.start_reprocess_time == start_reprocess_time)
+        query = query.filter(
+            self.model.end_reprocess_time == end_reprocess_time)
+        query = self.remove_finished_processing_schedules(query)
+
+        return query.first()
+
+    def update_reprocessing_time(self, identifier=None,
+                                 start_reprocess_time=None,
+                                 end_reprocess_time=None,
+                                 new_current_time_stamp=None):
+        """Update current processing time for a reprocessing schedule
+
+        :param identifier: Identifier of the scope
+        :type identifier: str
+        :param start_reprocess_time: the start time used in the
+                                     reprocessing schedule
+        :type start_reprocess_time: datetime.datetime
+        :param end_reprocess_time: the end time used in the
+                                   reprocessing schedule
+        :type end_reprocess_time: datetime.datetime
+        :param new_current_time_stamp: the new current timestamp to set
+        :type new_current_time_stamp: datetime.datetime
+        """
+
+        session = db.get_session()
+        session.begin()
+
+        result_set = self._get_db_item(
+            end_reprocess_time, identifier, session, start_reprocess_time)
+
+        if not result_set:
+            LOG.warning("Trying to update current time to [%s] for identifier "
+                        "[%s] and reprocessing range [start=%s, end=%s], but "
+                        "we could not find this task in the database.",
+                        new_current_time_stamp, identifier,
+                        start_reprocess_time, end_reprocess_time)
+            return
+        new_current_time_stamp = tzutils.local_to_utc(
+            new_current_time_stamp, naive=True)
+
+        result_set.current_reprocess_time = new_current_time_stamp
+        session.commit()
+        session.close()
diff --git 
a/cloudkitty/storage_state/alembic/versions/4d69395f_add_storage_scope_state_fields.py b/cloudkitty/storage_state/alembic/versions/4d69395f_add_storage_scope_state_fields.py index 03b21e20..23cf0086 100644 --- a/cloudkitty/storage_state/alembic/versions/4d69395f_add_storage_scope_state_fields.py +++ b/cloudkitty/storage_state/alembic/versions/4d69395f_add_storage_scope_state_fields.py @@ -38,7 +38,6 @@ def upgrade(): with op.batch_alter_table(name, copy_from=table, recreate='always') as batch_op: - batch_op.alter_column('identifier') batch_op.add_column( sqlalchemy.Column('scope_activation_toggle_date', sqlalchemy.DateTime, nullable=False, diff --git a/cloudkitty/storage_state/alembic/versions/9feccd32_create_reprocessing_scheduler.py b/cloudkitty/storage_state/alembic/versions/9feccd32_create_reprocessing_scheduler.py new file mode 100644 index 00000000..ce3cc375 --- /dev/null +++ b/cloudkitty/storage_state/alembic/versions/9feccd32_create_reprocessing_scheduler.py @@ -0,0 +1,45 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Create reprocessing scheduler table + +Revision ID: 9feccd32 +Revises: 4d69395f +Create Date: 2021-06-04 16:27:00.595274 + +""" +from alembic import op +import sqlalchemy + +# revision identifiers, used by Alembic. +revision = '9feccd32' +down_revision = '4d69395f' + + +def upgrade(): + op.create_table( + 'storage_scope_reprocessing_schedule', + sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True), + sqlalchemy.Column('identifier', sqlalchemy.String(length=256), + nullable=False, unique=False), + sqlalchemy.Column('start_reprocess_time', sqlalchemy.DateTime, + nullable=False), + sqlalchemy.Column('end_reprocess_time', sqlalchemy.DateTime, + nullable=False), + sqlalchemy.Column('current_reprocess_time', sqlalchemy.DateTime, + nullable=True), + sqlalchemy.Column('reason', sqlalchemy.Text, nullable=False), + + sqlalchemy.PrimaryKeyConstraint('id'), + mysql_charset='utf8', mysql_engine='InnoDB' + ) diff --git a/cloudkitty/storage_state/models.py b/cloudkitty/storage_state/models.py index 24a73525..55cff8f8 100644 --- a/cloudkitty/storage_state/models.py +++ b/cloudkitty/storage_state/models.py @@ -20,6 +20,14 @@ from sqlalchemy.ext import declarative Base = declarative.declarative_base() +def to_string_selected_fields(object_to_print, fields=[]): + object_to_return = {} + if object_to_print: + object_to_return = { + a: y for a, y in object_to_print.items() if a in fields} + return str(object_to_return) + + class IdentifierState(Base, models.ModelBase): """Represents the state of a given identifier.""" @@ -57,3 +65,39 @@ class IdentifierState(Base, models.ModelBase): server_default=sqlalchemy.sql.func.now()) active = sqlalchemy.Column('active', sqlalchemy.Boolean, nullable=False, default=True) + + def __str__(self): + return to_string_selected_fields( + self, ['id', 'identifier', 'state', 'active']) + + +class ReprocessingScheduler(Base, models.ModelBase): + """Represents the reprocessing scheduler table.""" + + @declarative.declared_attr + def 
__table_args__(cls): + return ( + sqlalchemy.schema.PrimaryKeyConstraint('id'), + ) + + __tablename__ = 'storage_scope_reprocessing_schedule' + + id = sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True) + reason = sqlalchemy.Column("reason", sqlalchemy.Text, nullable=False) + + identifier = sqlalchemy.Column("identifier", sqlalchemy.String(256), + nullable=False, unique=False) + start_reprocess_time = sqlalchemy.Column("start_reprocess_time", + sqlalchemy.DateTime, + nullable=False) + end_reprocess_time = sqlalchemy.Column("end_reprocess_time", + sqlalchemy.DateTime, + nullable=False) + current_reprocess_time = sqlalchemy.Column("current_reprocess_time", + sqlalchemy.DateTime, + nullable=True) + + def __str__(self): + return to_string_selected_fields( + self, ['id', 'identifier', 'start_reprocess_time', + 'end_reprocess_time', 'current_reprocess_time']) diff --git a/cloudkitty/tests/api/v2/task/__init__.py b/cloudkitty/tests/api/v2/task/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cloudkitty/tests/api/v2/task/test_reprocess.py b/cloudkitty/tests/api/v2/task/test_reprocess.py new file mode 100644 index 00000000..5f8d9f3e --- /dev/null +++ b/cloudkitty/tests/api/v2/task/test_reprocess.py @@ -0,0 +1,381 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +import datetime +import re +from unittest import mock + +from datetimerange import DateTimeRange +from werkzeug import exceptions as http_exceptions + +from cloudkitty.api.v2.task import reprocess +from cloudkitty import tests +from cloudkitty.utils import tz as tzutils + + +class TestReprocessSchedulerPostApi(tests.TestCase): + + def setUp(self): + super(TestReprocessSchedulerPostApi, self).setUp() + self.endpoint = reprocess.ReprocessSchedulerPostApi() + self.scope_ids = ["some-other-scope-id", + "5e56cb64-4980-4466-9fce-d0133c0c221e"] + + self.start_reprocess_time = tzutils.localized_now() + self.end_reprocess_time =\ + self.start_reprocess_time + datetime.timedelta(hours=1) + + self.reason = "We are testing the reprocess API." + + def test_validate_scope_ids_all_option_with_scope_ids(self): + self.scope_ids.append('ALL') + + expected_message = \ + "400 Bad Request: Cannot use 'ALL' with scope ID [['some-other-" \ + "scope-id', '5e56cb64-4980-4466-9fce-d0133c0c221e', 'ALL']]. " \ + "Either schedule a reprocessing for all active scopes using " \ + "'ALL' option, or inform only the scopes you desire to schedule " \ + "a reprocessing." + expected_message = re.escape(expected_message) + + self.assertRaisesRegexp(http_exceptions.BadRequest, expected_message, + self.endpoint.validate_scope_ids, + self.scope_ids) + + self.scope_ids.remove('ALL') + self.endpoint.validate_scope_ids(self.scope_ids) + + def test_validate_inputs_blank_reason(self): + + expected_message = \ + "400 Bad Request: Empty or blank reason text is not allowed. " \ + "Please, do inform/register the reason for the reprocessing of " \ + "a previously processed timestamp." 
+ expected_message = re.escape(expected_message) + + self.assertRaisesRegexp(http_exceptions.BadRequest, expected_message, + self.endpoint.validate_inputs, + self.end_reprocess_time, "", self.scope_ids, + self.start_reprocess_time) + + self.assertRaisesRegexp( + http_exceptions.BadRequest, expected_message, + self.endpoint.validate_inputs, self.end_reprocess_time, + " ", self.scope_ids, self.start_reprocess_time) + + self.endpoint.validate_inputs( + self.end_reprocess_time, self.reason, self.scope_ids, + self.start_reprocess_time) + + def test_validate_inputs_end_date_less_than_start_date(self): + original_end_reprocess_time = self.end_reprocess_time + + self.end_reprocess_time =\ + self.start_reprocess_time - datetime.timedelta(hours=1) + + expected_message = \ + "400 Bad Request: End reprocessing timestamp [%s] cannot be " \ + "less than start reprocessing timestamp [%s]." % ( + self.start_reprocess_time, self.end_reprocess_time) + + expected_message = re.escape(expected_message) + + self.assertRaisesRegexp(http_exceptions.BadRequest, expected_message, + self.endpoint.validate_inputs, + self.end_reprocess_time, self.reason, + self.scope_ids, self.start_reprocess_time) + + self.end_reprocess_time = original_end_reprocess_time + self.endpoint.validate_inputs( + self.end_reprocess_time, self.reason, self.scope_ids, + self.start_reprocess_time) + + def test_check_if_there_are_invalid_scopes(self): + all_scopes = self.generate_all_scopes_object() + + element_removed = all_scopes.pop(0) + + expected_message = \ + "400 Bad Request: Scopes [\'%s\'] scheduled to reprocess "\ + "[start=%s, end=%s] do not exist."\ + % (element_removed.identifier, self.start_reprocess_time, + self.end_reprocess_time) + + expected_message = re.escape(expected_message) + + self.assertRaisesRegexp( + http_exceptions.BadRequest, expected_message, + self.endpoint.check_if_there_are_invalid_scopes, all_scopes, + self.end_reprocess_time, self.scope_ids, self.start_reprocess_time) + + all_scopes.append(element_removed) + self.endpoint.check_if_there_are_invalid_scopes( + all_scopes, self.end_reprocess_time, self.scope_ids, + self.start_reprocess_time) + + def generate_all_scopes_object(self, last_processed_time=None): + all_scopes = [] + + def mock_to_string(self): + return "toStringMock" + + for s in self.scope_ids: + scope = mock.Mock() + scope.identifier = s + scope.last_processed_timestamp = last_processed_time + scope.__str__ = mock_to_string + all_scopes.append(scope) + return all_scopes + + @mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb.get_all") + def test_validate_reprocessing_schedules_overlaps( + self, schedule_get_all_mock): + + self.configure_and_execute_overlap_test(schedule_get_all_mock, + self.start_reprocess_time, + self.end_reprocess_time) + + self.configure_and_execute_overlap_test(schedule_get_all_mock, + self.end_reprocess_time, + self.start_reprocess_time) + + end_reprocess_time =\ + self.end_reprocess_time + datetime.timedelta(hours=5) + + self.configure_and_execute_overlap_test(schedule_get_all_mock, + self.start_reprocess_time, + end_reprocess_time) + + start_reprocess_time =\ + self.start_reprocess_time + datetime.timedelta(hours=1) + + self.configure_and_execute_overlap_test(schedule_get_all_mock, + start_reprocess_time, + end_reprocess_time) + + start_reprocess_time =\ + self.start_reprocess_time - datetime.timedelta(hours=1) + + self.configure_and_execute_overlap_test(schedule_get_all_mock, + start_reprocess_time, + end_reprocess_time) + + start_reprocess_time =\ + 
self.end_reprocess_time + datetime.timedelta(hours=1) + + self.configure_schedules_mock(schedule_get_all_mock, + start_reprocess_time, + end_reprocess_time) + + self.endpoint.validate_reprocessing_schedules_overlaps( + self.generate_all_scopes_object(), self.end_reprocess_time, + self.start_reprocess_time) + + schedule_get_all_mock.assert_has_calls([ + mock.call(identifier=[self.scope_ids[0]]), + mock.call(identifier=[self.scope_ids[1]])]) + + def configure_and_execute_overlap_test(self, schedule_get_all_mock, + start_reprocess_time, + end_reprocess_time): + + self.configure_schedules_mock( + schedule_get_all_mock, start_reprocess_time, end_reprocess_time) + + scheduling_range = DateTimeRange( + tzutils.utc_to_local(self.start_reprocess_time), + tzutils.utc_to_local(self.end_reprocess_time)) + scheduled_range = DateTimeRange( + tzutils.local_to_utc(start_reprocess_time), + tzutils.local_to_utc(end_reprocess_time)) + expected_message = \ + "400 Bad Request: Cannot schedule a reprocessing for scope " \ + "[toStringMock] for reprocessing time [%s], because it already " \ + "has a schedule for a similar time range [%s]." \ + % (scheduling_range, scheduled_range) + + expected_message = re.escape(expected_message) + + self.assertRaisesRegexp( + http_exceptions.BadRequest, expected_message, + self.endpoint.validate_reprocessing_schedules_overlaps, + self.generate_all_scopes_object(), + self.end_reprocess_time, self.start_reprocess_time) + + schedule_get_all_mock.assert_called_with( + identifier=[self.scope_ids[0]]) + + def configure_schedules_mock(self, schedule_get_all_mock, + start_reprocess_time, end_reprocess_time): + schedules = [] + schedule_get_all_mock.return_value = schedules + all_scopes = self.generate_all_scopes_object() + for s in all_scopes: + schedule_mock = mock.Mock() + schedule_mock.identifier = s.identifier + schedule_mock.start_reprocess_time = start_reprocess_time + schedule_mock.end_reprocess_time = end_reprocess_time + schedules.append(schedule_mock) + + def test_validate_start_end_for_reprocessing(self): + all_scopes = self.generate_all_scopes_object( + last_processed_time=self.start_reprocess_time) + + base_error_message = "400 Bad Request: Cannot execute a " \ + "reprocessing [%s=%s] for scope [toStringMock] " \ + "%s after the last possible timestamp [%s]." 
+ + start_reprocess_time =\ + self.start_reprocess_time + datetime.timedelta(hours=1) + + expected_message = base_error_message % ("start", + start_reprocess_time, + "starting", + self.start_reprocess_time) + expected_message = re.escape(expected_message) + + self.assertRaisesRegexp( + http_exceptions.BadRequest, expected_message, + self.endpoint.validate_start_end_for_reprocessing, all_scopes, + self.end_reprocess_time, start_reprocess_time) + + all_scopes = self.generate_all_scopes_object( + last_processed_time=self.end_reprocess_time) + + end_processing_time =\ + self.end_reprocess_time + datetime.timedelta(hours=1) + + expected_message = base_error_message % ("end", + end_processing_time, + "ending", + self.end_reprocess_time) + expected_message = re.escape(expected_message) + + self.assertRaisesRegexp( + http_exceptions.BadRequest, expected_message, + self.endpoint.validate_start_end_for_reprocessing, all_scopes, + end_processing_time, self.start_reprocess_time) + + self.endpoint.validate_start_end_for_reprocessing( + all_scopes, self.end_reprocess_time, + self.start_reprocess_time) + + all_scopes = self.generate_all_scopes_object( + last_processed_time=self.start_reprocess_time) + + self.endpoint.validate_start_end_for_reprocessing( + all_scopes, self.start_reprocess_time, + self.start_reprocess_time) + + @mock.patch("flask.request") + @mock.patch("cloudkitty.common.policy.authorize") + @mock.patch("cloudkitty.api.v2.task.reprocess" + ".ReprocessSchedulerPostApi.validate_inputs") + @mock.patch("cloudkitty.api.v2.task.reprocess" + ".ReprocessSchedulerPostApi" + ".check_if_there_are_invalid_scopes") + @mock.patch("cloudkitty.api.v2.task.reprocess." + "ReprocessSchedulerPostApi." + "validate_start_end_for_reprocessing") + @mock.patch("cloudkitty.api.v2.task.reprocess" + ".ReprocessSchedulerPostApi" + ".validate_reprocessing_schedules_overlaps") + @mock.patch("cloudkitty.storage_state.StateManager.get_all") + @mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb.persist") + def test_post(self, reprocessing_scheduler_db_persist_mock, + state_manager_get_all_mock, + validate_reprocessing_schedules_overlaps_mock, + validate_start_end_for_reprocessing_mock, + check_if_there_are_invalid_scopes_mock, validate_inputs_mock, + policy_mock, request_mock): + + state_manager_get_all_mock.return_value =\ + self.generate_all_scopes_object() + + request_mock.context = mock.Mock() + request_mock.context.project_id = "project_id_mock" + + def get_json_mock(): + return {"scope_ids": self.scope_ids[0], + "start_reprocess_time": str(self.start_reprocess_time), + "end_reprocess_time": str(self.end_reprocess_time), + "reason": self.reason} + + request_mock.get_json = get_json_mock + + self.endpoint.post() + + self.assertEqual(reprocessing_scheduler_db_persist_mock.call_count, 2) + state_manager_get_all_mock.assert_called_once() + validate_reprocessing_schedules_overlaps_mock.assert_called_once() + validate_start_end_for_reprocessing_mock.assert_called_once() + check_if_there_are_invalid_scopes_mock.assert_called_once() + validate_inputs_mock.assert_called_once() + policy_mock.assert_called_once() + + +class TestReprocessingSchedulerGetApi(tests.TestCase): + + def setUp(self): + super(TestReprocessingSchedulerGetApi, self).setUp() + self.endpoint = reprocess.ReprocessSchedulerGetApi() + + @mock.patch("flask.request") + @mock.patch("cloudkitty.common.policy.authorize") + @mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb.get_all") + def test_get(self, reprocessing_db_get_all_mock, + 
policy_mock, request_mock): + + time_now = tzutils.localized_now() + schedule_mock = mock.Mock() + schedule_mock.id = 1 + schedule_mock.identifier = "scope_identifier" + schedule_mock.reason = "reason to process" + schedule_mock.current_reprocess_time = time_now + schedule_mock.start_reprocess_time =\ + time_now - datetime.timedelta(hours=10) + + schedule_mock.end_reprocess_time =\ + time_now + datetime.timedelta(hours=10) + + reprocessing_db_get_all_mock.return_value = [schedule_mock] + request_mock.context = mock.Mock() + request_mock.args = mock.Mock() + request_mock.args.lists = mock.Mock() + request_mock.args.lists.return_value = [] + list_all_return = self.endpoint.get() + + self.assertTrue("results" in list_all_return) + self.assertTrue("id" not in list_all_return['results'][0]) + self.assertTrue("scope_id" in list_all_return['results'][0]) + self.assertTrue("reason" in list_all_return['results'][0]) + self.assertTrue( + "current_reprocess_time" in list_all_return['results'][0]) + self.assertTrue( + "start_reprocess_time" in list_all_return['results'][0]) + self.assertTrue( + "end_reprocess_time" in list_all_return['results'][0]) + + self.assertEqual("scope_identifier", + list_all_return['results'][0]['scope_id']) + self.assertEqual("reason to process", + list_all_return['results'][0]['reason']) + self.assertEqual(time_now.isoformat(), list_all_return['results'][0][ + 'current_reprocess_time']) + self.assertEqual((time_now - datetime.timedelta(hours=10)).isoformat(), + list_all_return['results'][0]['start_reprocess_time']) + self.assertEqual((time_now + datetime.timedelta(hours=10)).isoformat(), + list_all_return['results'][0]['end_reprocess_time']) + + reprocessing_db_get_all_mock.assert_called_once() + policy_mock.assert_called_once() diff --git a/cloudkitty/tests/test_orchestrator.py b/cloudkitty/tests/test_orchestrator.py index b1b1cd76..64b2cff6 100644 --- a/cloudkitty/tests/test_orchestrator.py +++ b/cloudkitty/tests/test_orchestrator.py @@ -14,6 +14,8 @@ # under the License. 
# import datetime +import re + from unittest import mock from oslo_messaging import conffixture @@ -168,22 +170,96 @@ class OrchestratorTest(tests.TestCase): self.assertEqual('fake2', worker._processors[2].name) self.assertEqual(1, worker._processors[2].obj.priority) + @mock.patch("cotyledon.ServiceManager.add") + @mock.patch("cotyledon._service_manager.ServiceManager.__init__") + def test_cloudkitty_service_manager_only_processing( + self, service_manager_init_mock, cotyledon_add_mock): + + OrchestratorTest.execute_cloudkitty_service_manager_test( + cotyledon_add_mock=cotyledon_add_mock, max_workers_reprocessing=0, + max_workers=1) + + self.assertTrue(service_manager_init_mock.called) + + @mock.patch("cotyledon.ServiceManager.add") + @mock.patch("cotyledon._service_manager.ServiceManager.__init__") + def test_cloudkitty_service_manager_only_reprocessing( + self, service_manager_init_mock, cotyledon_add_mock): + OrchestratorTest.execute_cloudkitty_service_manager_test( + cotyledon_add_mock=cotyledon_add_mock, max_workers_reprocessing=1, + max_workers=0) + + self.assertTrue(service_manager_init_mock.called) + + @mock.patch("cotyledon.ServiceManager.add") + @mock.patch("cotyledon._service_manager.ServiceManager.__init__") + def test_cloudkitty_service_manager_both_processings( + self, service_manager_init_mock, cotyledon_add_mock): + OrchestratorTest.execute_cloudkitty_service_manager_test( + cotyledon_add_mock=cotyledon_add_mock) + + self.assertTrue(service_manager_init_mock.called) + + @staticmethod + def execute_cloudkitty_service_manager_test(cotyledon_add_mock=None, + max_workers=1, + max_workers_reprocessing=1): + + original_conf = orchestrator.CONF + try: + orchestrator.CONF = mock.Mock() + orchestrator.CONF.orchestrator = mock.Mock() + orchestrator.CONF.orchestrator.max_workers = max_workers + orchestrator.CONF.orchestrator.max_workers_reprocessing = \ + max_workers_reprocessing + + orchestrator.CloudKittyServiceManager() + + expected_calls = [] + if max_workers: + expected_calls.append( + mock.call(orchestrator.CloudKittyProcessor, + workers=max_workers)) + + if max_workers_reprocessing: + expected_calls.append( + mock.call(orchestrator.CloudKittyReprocessor, + workers=max_workers_reprocessing)) + + cotyledon_add_mock.assert_has_calls(expected_calls) + finally: + orchestrator.CONF = original_conf + class WorkerTest(tests.TestCase): def setUp(self): super(WorkerTest, self).setUp() - class FakeWorker(orchestrator.Worker): - def __init__(self): - self._tenant_id = 'a' - self._worker_id = '0' - self._log_prefix = '[IGNORE THIS MESSAGE]' + patcher_state_manager_set_state = mock.patch( + "cloudkitty.storage_state.StateManager.set_state") + self.addCleanup(patcher_state_manager_set_state.stop) + self.state_manager_set_state_mock = \ + patcher_state_manager_set_state.start() - self.worker = FakeWorker() - self.worker._collect = mock.MagicMock() + self._tenant_id = 'a' + self._worker_id = '0' + + self.collector_mock = mock.MagicMock() + self.storage_mock = mock.MagicMock() + self.collector_mock.__str__.return_value = "toString" + + load_conf_manager = mock.patch("cloudkitty.utils.load_conf") + self.addCleanup(load_conf_manager.stop) + self.load_conf_mock = load_conf_manager.start() + + self.worker = orchestrator.Worker(self.collector_mock, + self.storage_mock, self._tenant_id, + self._worker_id) def test_do_collection_all_valid(self): + timestamp_now = tzutils.localized_now() + metrics = ['metric{}'.format(i) for i in range(5)] side_effect = [( metrics[i], @@ -191,12 +267,15 @@ class 
WorkerTest(tests.TestCase): 'end': 3600}, 'usage': i}, ) for i in range(5)] - self.worker._collect.side_effect = side_effect - output = sorted(self.worker._do_collection(metrics, 0).items(), + self.collector_mock.retrieve.side_effect = side_effect + output = sorted(self.worker._do_collection(metrics, + timestamp_now).items(), key=lambda x: x[1]['usage']) self.assertEqual(side_effect, output) def test_do_collection_some_empty(self): + timestamp_now = tzutils.localized_now() + metrics = ['metric{}'.format(i) for i in range(7)] side_effect = [( metrics[i], @@ -206,10 +285,772 @@ class WorkerTest(tests.TestCase): ) for i in range(5)] side_effect.insert(2, collector.NoDataCollected('a', 'b')) side_effect.insert(4, collector.NoDataCollected('a', 'b')) - self.worker._collect.side_effect = side_effect - output = sorted(self.worker._do_collection(metrics, 0).items(), + self.collector_mock.retrieve.side_effect = side_effect + output = sorted(self.worker._do_collection( + metrics, timestamp_now).items(), key=lambda x: x[1]['usage']) self.assertEqual([ i for i in side_effect if not isinstance(i, collector.NoDataCollected) ], output) + + def test_update_scope_processing_state_db(self): + timestamp = tzutils.localized_now() + self.worker.update_scope_processing_state_db(timestamp) + + self.state_manager_set_state_mock.assert_has_calls([ + mock.call(self.worker._tenant_id, timestamp) + ]) + + @mock.patch("cloudkitty.dataframe.DataFrame") + def test_execute_measurements_rating(self, dataframe_mock): + new_data_frame_mock = mock.Mock() + + dataframe_mock.return_value = new_data_frame_mock + processor_mock_1 = mock.Mock() + + return_processor_1 = mock.Mock() + processor_mock_1.obj.process.return_value = return_processor_1 + + processor_mock_2 = mock.Mock() + return_processor_2 = mock.Mock() + processor_mock_2.obj.process.return_value = return_processor_2 + + self.worker._processors = [processor_mock_1, processor_mock_2] + + start_time = tzutils.localized_now() + end_time = start_time + datetime.timedelta(hours=1) + return_of_method = self.worker.execute_measurements_rating( + end_time, start_time, {}) + + self.assertEqual(return_processor_2, return_of_method) + + processor_mock_1.obj.process.assert_has_calls([ + mock.call(new_data_frame_mock) + ]) + processor_mock_2.obj.process.assert_has_calls([ + mock.call(return_processor_1) + ]) + dataframe_mock.assert_has_calls([ + mock.call(start=start_time, end=end_time, usage={}) + ]) + + def test_persist_rating_data(self): + start_time = tzutils.localized_now() + end_time = start_time + datetime.timedelta(hours=1) + + frame = {"id": "sd"} + self.worker.persist_rating_data(end_time, frame, start_time) + + self.storage_mock.push.assert_has_calls([ + mock.call([frame], self.worker._tenant_id) + ]) + + @mock.patch("cloudkitty.orchestrator.Worker._do_collection") + @mock.patch("cloudkitty.orchestrator.Worker.execute_measurements_rating") + @mock.patch("cloudkitty.orchestrator.Worker.persist_rating_data") + @mock.patch("cloudkitty.orchestrator.Worker" + ".update_scope_processing_state_db") + def test_do_execute_scope_processing_with_no_usage_data( + self, update_scope_processing_state_db_mock, + persist_rating_data_mock, execute_measurements_rating_mock, + do_collection_mock): + self.worker._conf = {"metrics": {"metric1": "s", "metric2": "d"}} + do_collection_mock.return_value = None + + timestamp_now = tzutils.localized_now() + self.worker.do_execute_scope_processing(timestamp_now) + + do_collection_mock.assert_has_calls([ + mock.call(["metric1", "metric2"], 
timestamp_now) + ]) + + self.assertFalse(execute_measurements_rating_mock.called) + self.assertFalse(persist_rating_data_mock.called) + self.assertTrue(update_scope_processing_state_db_mock.called) + + @mock.patch("cloudkitty.orchestrator.Worker._do_collection") + @mock.patch("cloudkitty.orchestrator.Worker.execute_measurements_rating") + @mock.patch("cloudkitty.orchestrator.Worker.persist_rating_data") + @mock.patch("cloudkitty.orchestrator.Worker" + ".update_scope_processing_state_db") + def test_do_execute_scope_processing_with_usage_data( + self, update_scope_processing_state_db_mock, + persist_rating_data_mock, execute_measurements_rating_mock, + do_collection_mock): + self.worker._conf = {"metrics": {"metric1": "s", "metric2": "d"}} + + usage_data_mock = {"some_usage_data": 2} + do_collection_mock.return_value = usage_data_mock + + execute_measurements_rating_mock_return = mock.Mock() + execute_measurements_rating_mock.return_value =\ + execute_measurements_rating_mock_return + + timestamp_now = tzutils.localized_now() + self.worker.do_execute_scope_processing(timestamp_now) + + do_collection_mock.assert_has_calls([ + mock.call(["metric1", "metric2"], timestamp_now) + ]) + + end_time = tzutils.add_delta( + timestamp_now, datetime.timedelta(seconds=self.worker._period)) + execute_measurements_rating_mock.assert_has_calls([ + mock.call(end_time, timestamp_now, usage_data_mock) + ]) + + persist_rating_data_mock.assert_has_calls([ + mock.call(end_time, execute_measurements_rating_mock_return, + timestamp_now) + ]) + self.assertTrue(update_scope_processing_state_db_mock.called) + + @mock.patch("cloudkitty.storage_state.StateManager.get_state") + @mock.patch("cloudkitty.storage_state.StateManager" + ".is_storage_scope_active") + @mock.patch("cloudkitty.orchestrator.Worker.do_execute_scope_processing") + def test_execute_worker_processing_no_next_timestamp( + self, do_execute_scope_processing_mock, + state_manager_is_storage_scope_active_mock, + state_manager_get_stage_mock): + + next_timestamp_to_process_mock = mock.Mock() + next_timestamp_to_process_mock.return_value = None + + self.worker.next_timestamp_to_process = next_timestamp_to_process_mock + + return_method_value = self.worker.execute_worker_processing() + + self.assertFalse(return_method_value) + self.assertFalse(state_manager_get_stage_mock.called) + self.assertFalse(state_manager_is_storage_scope_active_mock.called) + self.assertFalse(do_execute_scope_processing_mock.called) + self.assertTrue(next_timestamp_to_process_mock.called) + + @mock.patch("cloudkitty.storage_state.StateManager.get_state") + @mock.patch("cloudkitty.storage_state.StateManager" + ".is_storage_scope_active") + @mock.patch("cloudkitty.orchestrator.Worker.do_execute_scope_processing") + def test_execute_worker_processing_scope_not_processed_yet( + self, do_execute_scope_processing_mock, + state_manager_is_storage_scope_active_mock, + state_manager_get_stage_mock): + + timestamp_now = tzutils.localized_now() + next_timestamp_to_process_mock = mock.Mock() + next_timestamp_to_process_mock.return_value = timestamp_now + + self.worker.next_timestamp_to_process = next_timestamp_to_process_mock + + state_manager_get_stage_mock.return_value = None + return_method_value = self.worker.execute_worker_processing() + + self.assertTrue(return_method_value) + + state_manager_get_stage_mock.assert_has_calls([ + mock.call(self.worker._tenant_id) + ]) + + do_execute_scope_processing_mock.assert_has_calls([ + mock.call(timestamp_now) + ]) + 
self.assertFalse(state_manager_is_storage_scope_active_mock.called) + self.assertTrue(next_timestamp_to_process_mock.called) + + @mock.patch("cloudkitty.storage_state.StateManager.get_state") + @mock.patch("cloudkitty.storage_state.StateManager" + ".is_storage_scope_active") + @mock.patch("cloudkitty.orchestrator.Worker.do_execute_scope_processing") + def test_execute_worker_processing_scope_already_processed_active( + self, do_execute_scope_processing_mock, + state_manager_is_storage_scope_active_mock, + state_manager_get_stage_mock): + + timestamp_now = tzutils.localized_now() + next_timestamp_to_process_mock = mock.Mock() + next_timestamp_to_process_mock.return_value = timestamp_now + + self.worker.next_timestamp_to_process = next_timestamp_to_process_mock + + state_manager_get_stage_mock.return_value = mock.Mock() + state_manager_is_storage_scope_active_mock.return_value = True + + return_method_value = self.worker.execute_worker_processing() + + self.assertTrue(return_method_value) + + state_manager_get_stage_mock.assert_has_calls([ + mock.call(self.worker._tenant_id) + ]) + + do_execute_scope_processing_mock.assert_has_calls([ + mock.call(timestamp_now) + ]) + state_manager_is_storage_scope_active_mock.assert_has_calls([ + mock.call(self.worker._tenant_id) + ]) + + self.assertTrue(next_timestamp_to_process_mock.called) + + @mock.patch("cloudkitty.storage_state.StateManager.get_state") + @mock.patch("cloudkitty.storage_state.StateManager" + ".is_storage_scope_active") + @mock.patch("cloudkitty.orchestrator.Worker.do_execute_scope_processing") + def test_execute_worker_processing_scope_already_processed_inactive( + self, do_execute_scope_processing_mock, + state_manager_is_storage_scope_active_mock, + state_manager_get_stage_mock): + + timestamp_now = tzutils.localized_now() + next_timestamp_to_process_mock = mock.Mock() + next_timestamp_to_process_mock.return_value = timestamp_now + + self.worker.next_timestamp_to_process = next_timestamp_to_process_mock + + state_manager_get_stage_mock.return_value = mock.Mock() + state_manager_is_storage_scope_active_mock.return_value = False + + return_method_value = self.worker.execute_worker_processing() + + self.assertFalse(return_method_value) + + state_manager_get_stage_mock.assert_has_calls([ + mock.call(self.worker._tenant_id) + ]) + + state_manager_is_storage_scope_active_mock.assert_has_calls([ + mock.call(self.worker._tenant_id) + ]) + + self.assertTrue(next_timestamp_to_process_mock.called) + self.assertFalse(do_execute_scope_processing_mock.called) + + @mock.patch("cloudkitty.orchestrator.Worker.execute_worker_processing") + def test_run(self, execute_worker_processing_mock): + execute_worker_processing_mock.side_effect = [True, True, False, True] + + self.worker.run() + + self.assertEqual(execute_worker_processing_mock.call_count, 3) + + def test_collect_no_data(self): + metric = "metric1" + timestamp_now = tzutils.localized_now() + + self.collector_mock.retrieve.return_value = (metric, None) + + expected_message = "Collector 'toString' returned no data for " \ + "resource 'metric1'" + expected_message = re.escape(expected_message) + + self.assertRaisesRegexp( + collector.NoDataCollected, expected_message, self.worker._collect, + metric, timestamp_now) + + next_timestamp = tzutils.add_delta( + timestamp_now, datetime.timedelta(seconds=self.worker._period)) + + self.collector_mock.retrieve.assert_has_calls([ + mock.call(metric, timestamp_now, next_timestamp, + self.worker._tenant_id)]) + + def test_collect_with_data(self): + metric = 
"metric1" + timestamp_now = tzutils.localized_now() + + usage_data = {"some_usage_data": 3} + self.collector_mock.retrieve.return_value = (metric, usage_data) + + return_of_method = self.worker._collect(metric, timestamp_now) + + next_timestamp = tzutils.add_delta( + timestamp_now, datetime.timedelta(seconds=self.worker._period)) + + self.collector_mock.retrieve.assert_has_calls([ + mock.call(metric, timestamp_now, next_timestamp, + self.worker._tenant_id)]) + + self.assertEqual((metric, usage_data), return_of_method) + + @mock.patch("cloudkitty.utils.check_time_state") + def test_check_state(self, check_time_state_mock): + state_mock = mock.Mock() + + timestamp_now = tzutils.localized_now() + state_mock._state.get_state.return_value = timestamp_now + + expected_time = timestamp_now + datetime.timedelta(hours=1) + check_time_state_mock.return_value = \ + expected_time + + return_of_method = orchestrator._check_state( + state_mock, 3600, self._tenant_id) + + self.assertEqual(expected_time, return_of_method) + + state_mock._state.get_state.assert_has_calls([ + mock.call(self._tenant_id)]) + check_time_state_mock.assert_has_calls([ + mock.call(timestamp_now, 3600, 2)]) + + +class CloudKittyReprocessorTest(tests.TestCase): + + def setUp(self): + super(CloudKittyReprocessorTest, self).setUp() + + @mock.patch("cloudkitty.orchestrator.CloudKittyProcessor.__init__") + @mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb") + def test_generate_lock_base_name(self, reprocessing_scheduler_db_mock, + cloudkitty_processor_init_mock): + + scope_mock = mock.Mock() + scope_mock.identifier = "scope_identifier" + + return_generate_lock_name = orchestrator.CloudKittyReprocessor( + 1).generate_lock_base_name(scope_mock) + + expected_lock_name = "-id=scope_identifier-" \ + "start=%s-end=%s-current=%s" % ( + scope_mock.start_reprocess_time, + scope_mock.end_reprocess_time, + scope_mock.current_reprocess_time) + + self.assertEqual(expected_lock_name, return_generate_lock_name) + + cloudkitty_processor_init_mock.assert_called_once() + reprocessing_scheduler_db_mock.assert_called_once() + + @mock.patch("cloudkitty.orchestrator.CloudKittyProcessor.__init__") + @mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb.get_all") + def test_load_scopes_to_process(self, scheduler_db_mock_get_all_mock, + cloudkitty_processor_init_mock): + scheduler_db_mock_get_all_mock.return_value = ["teste"] + + reprocessor = CloudKittyReprocessorTest.create_cloudkitty_reprocessor() + reprocessor.load_scopes_to_process() + + self.assertEqual(["teste"], reprocessor.tenants) + cloudkitty_processor_init_mock.assert_called_once() + scheduler_db_mock_get_all_mock.assert_called_once() + + @mock.patch("cloudkitty.orchestrator.CloudKittyProcessor.__init__") + @mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb.get_from_db") + def test_next_timestamp_to_process_processing_finished( + self, scheduler_db_mock_get_from_db_mock, + cloudkitty_processor_init_mock): + + start_time = tzutils.localized_now() + + scope = CloudKittyReprocessorTest.create_scope_mock(start_time) + + scheduler_db_mock_get_from_db_mock.return_value = None + + reprocessor = CloudKittyReprocessorTest.create_cloudkitty_reprocessor() + + next_timestamp = reprocessor._next_timestamp_to_process(scope) + + expected_calls = [ + mock.call(identifier=scope.identifier, + start_reprocess_time=scope.start_reprocess_time, + end_reprocess_time=scope.end_reprocess_time)] + + self.assertIsNone(next_timestamp) + cloudkitty_processor_init_mock.assert_called_once() + 
scheduler_db_mock_get_from_db_mock.assert_has_calls(expected_calls) + + @staticmethod + def create_scope_mock(start_time): + scope = mock.Mock() + scope.identifier = "scope_identifier" + scope.start_reprocess_time = start_time + scope.current_reprocess_time = None + scope.end_reprocess_time = start_time + datetime.timedelta(hours=1) + return scope + + @staticmethod + def create_cloudkitty_reprocessor(): + reprocessor = orchestrator.CloudKittyReprocessor(1) + reprocessor._worker_id = 1 + + return reprocessor + + @mock.patch("cloudkitty.orchestrator.CloudKittyProcessor.__init__") + @mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb.get_from_db") + def test_next_timestamp_to_process( + self, scheduler_db_mock_get_from_db_mock, + cloudkitty_processor_init_mock): + + start_time = tzutils.localized_now() + + scope = CloudKittyReprocessorTest.create_scope_mock(start_time) + + scheduler_db_mock_get_from_db_mock.return_value = scope + + reprocessor = CloudKittyReprocessorTest.create_cloudkitty_reprocessor() + + next_timestamp = reprocessor._next_timestamp_to_process(scope) + + expected_calls = [ + mock.call(identifier=scope.identifier, + start_reprocess_time=scope.start_reprocess_time, + end_reprocess_time=scope.end_reprocess_time)] + + # There is no current timestamp in the mock object. + # Therefore, the next to process is the start timestamp + expected_next_timestamp = start_time + self.assertEqual(expected_next_timestamp, next_timestamp) + cloudkitty_processor_init_mock.assert_called_once() + scheduler_db_mock_get_from_db_mock.assert_has_calls(expected_calls) + + +class CloudKittyProcessorTest(tests.TestCase): + + def setUp(self): + super(CloudKittyProcessorTest, self).setUp() + + patcher_oslo_messaging_target = mock.patch("oslo_messaging.Target") + self.addCleanup(patcher_oslo_messaging_target.stop) + self.oslo_messaging_target_mock = patcher_oslo_messaging_target.start() + + patcher_messaging_get_server = mock.patch( + "cloudkitty.messaging.get_server") + + self.addCleanup(patcher_messaging_get_server.stop) + self.messaging_get_server_mock = patcher_messaging_get_server.start() + + patcher_driver_manager = mock.patch("stevedore.driver.DriverManager") + self.addCleanup(patcher_driver_manager.stop) + self.driver_manager_mock = patcher_driver_manager.start() + + get_collector_manager = mock.patch( + "cloudkitty.collector.get_collector") + self.addCleanup(get_collector_manager.stop) + self.get_collector_mock = get_collector_manager.start() + + self.worker_id = 1 + self.cloudkitty_processor = orchestrator.CloudKittyProcessor( + self.worker_id) + + def test_init_messaging(self): + server_mock = mock.Mock() + self.messaging_get_server_mock.return_value = server_mock + + target_object_mock = mock.Mock() + self.oslo_messaging_target_mock.return_value = target_object_mock + + self.cloudkitty_processor._init_messaging() + + server_mock.start.assert_called_once() + self.oslo_messaging_target_mock.assert_has_calls([ + mock.call(topic='cloudkitty', server=orchestrator.CONF.host, + version='1.0')]) + + self.messaging_get_server_mock.assert_has_calls([ + mock.call(target_object_mock, [ + self.cloudkitty_processor._rating_endpoint, + self.cloudkitty_processor._scope_endpoint])]) + + @mock.patch("time.sleep") + @mock.patch("cloudkitty.orchestrator.CloudKittyProcessor." + "load_scopes_to_process") + @mock.patch("cloudkitty.orchestrator.CloudKittyProcessor." 
+ "process_scope") + @mock.patch("cloudkitty.orchestrator.get_lock") + def test_internal_run(self, get_lock_mock, process_scope_mock, + load_scopes_to_process_mock, sleep_mock): + + lock_mock = mock.Mock() + lock_mock.acquire.return_value = True + get_lock_mock.return_value = ("lock_name", lock_mock) + + self.cloudkitty_processor.tenants = ["tenant1"] + + self.cloudkitty_processor.internal_run() + + lock_mock.acquire.assert_has_calls([mock.call(blocking=False)]) + lock_mock.release.assert_called_once() + + get_lock_mock.assert_has_calls( + [mock.call(self.cloudkitty_processor.coord, "tenant1")]) + + sleep_mock.assert_called_once() + process_scope_mock.assert_called_once() + load_scopes_to_process_mock.assert_called_once() + + @mock.patch("cloudkitty.orchestrator.Worker") + def test_process_scope_no_next_timestamp(self, worker_class_mock): + + original_next_timestamp_method = \ + self.cloudkitty_processor.next_timestamp_to_process + next_timestamp_mock_method = mock.Mock() + try: + self.cloudkitty_processor.next_timestamp_to_process =\ + next_timestamp_mock_method + + scope_mock = mock.Mock() + next_timestamp_mock_method.return_value = None + + self.cloudkitty_processor.process_scope(scope_mock) + + next_timestamp_mock_method.assert_has_calls( + [mock.call(scope_mock)]) + self.assertFalse(worker_class_mock.called) + finally: + self.cloudkitty_processor.next_timestamp_to_process =\ + original_next_timestamp_method + + @mock.patch("cloudkitty.orchestrator.Worker") + def test_process_scope(self, worker_class_mock): + original_next_timestamp_method =\ + self.cloudkitty_processor.next_timestamp_to_process + next_timestamp_mock_method = mock.Mock() + + worker_mock = mock.Mock() + worker_class_mock.return_value = worker_mock + + original_worker_class = self.cloudkitty_processor.worker_class + self.cloudkitty_processor.worker_class = worker_class_mock + + try: + self.cloudkitty_processor.next_timestamp_to_process =\ + next_timestamp_mock_method + + scope_mock = mock.Mock() + next_timestamp_mock_method.return_value = tzutils.localized_now() + + self.cloudkitty_processor.process_scope(scope_mock) + + next_timestamp_mock_method.assert_has_calls( + [mock.call(scope_mock)]) + worker_class_mock.assert_has_calls( + [mock.call(self.cloudkitty_processor.collector, + self.cloudkitty_processor.storage, scope_mock, + self.cloudkitty_processor._worker_id)]) + + worker_mock.run.assert_called_once() + finally: + self.cloudkitty_processor.next_timestamp_to_process =\ + original_next_timestamp_method + self.cloudkitty_processor.worker_class = original_worker_class + + def test_generate_lock_base_name(self): + generated_lock_name = self.cloudkitty_processor.\ + generate_lock_base_name("scope_id") + + self.assertEqual("scope_id", generated_lock_name) + + def test_load_scopes_to_process(self): + fetcher_mock = mock.Mock() + self.cloudkitty_processor.fetcher = fetcher_mock + + fetcher_mock.get_tenants.return_value = ["scope_1"] + + self.cloudkitty_processor.load_scopes_to_process() + + fetcher_mock.get_tenants.assert_called_once() + self.assertEqual(["scope_1"], self.cloudkitty_processor.tenants) + + def test_terminate(self): + coordinator_mock = mock.Mock() + self.cloudkitty_processor.coord = coordinator_mock + + self.cloudkitty_processor.terminate() + + coordinator_mock.stop.assert_called_once() + + +class ReprocessingWorkerTest(tests.TestCase): + + def setUp(self): + super(ReprocessingWorkerTest, self).setUp() + + patcher_reprocessing_scheduler_db_get_from_db = mock.patch( + 
"cloudkitty.storage_state.ReprocessingSchedulerDb.get_from_db") + self.addCleanup(patcher_reprocessing_scheduler_db_get_from_db.stop) + self.reprocessing_scheduler_db_get_from_db_mock =\ + patcher_reprocessing_scheduler_db_get_from_db.start() + + patcher_state_manager_get_all = mock.patch( + "cloudkitty.storage_state.StateManager.get_all") + self.addCleanup(patcher_state_manager_get_all.stop) + self.state_manager_get_all_mock = patcher_state_manager_get_all.start() + + self.collector_mock = mock.Mock() + self.storage_mock = mock.Mock() + + self.scope_key_mock = "key_mock" + self.worker_id = 1 + self.scope_id = "scope_id1" + self.scope_mock = mock.Mock() + self.scope_mock.identifier = self.scope_id + + load_conf_manager = mock.patch("cloudkitty.utils.load_conf") + self.addCleanup(load_conf_manager.stop) + self.load_conf_mock = load_conf_manager.start() + + def to_string_scope_mock(self): + return "toStringMock" + self.scope_mock.__str__ = to_string_scope_mock + self.scope_mock.scope_key = self.scope_key_mock + + self.state_manager_get_all_mock.return_value = [self.scope_mock] + + self.reprocessing_worker = self.create_reprocessing_worker() + + self.mock_scheduler = mock.Mock() + self.mock_scheduler.identifier = self.scope_id + + self.start_schedule_mock = tzutils.localized_now() + self.mock_scheduler.start_reprocess_time = self.start_schedule_mock + self.mock_scheduler.current_reprocess_time = None + self.mock_scheduler.end_reprocess_time =\ + self.start_schedule_mock + datetime.timedelta(hours=1) + + def create_reprocessing_worker(self): + return orchestrator.ReprocessingWorker( + self.collector_mock, self.storage_mock, self.scope_mock, + self.worker_id) + + def test_load_scope_key_scope_not_found(self): + self.state_manager_get_all_mock.return_value = [] + + expected_message = "Scope [toStringMock] scheduled for reprocessing " \ + "does not seem to exist anymore." + expected_message = re.escape(expected_message) + + self.assertRaisesRegexp(Exception, expected_message, + self.reprocessing_worker.load_scope_key) + + self.state_manager_get_all_mock.assert_has_calls([ + mock.call(self.reprocessing_worker._tenant_id)]) + + def test_load_scope_key_more_than_one_scope_found(self): + self.state_manager_get_all_mock.return_value = [ + self.scope_mock, self.scope_mock] + + expected_message = "Unexpected number of storage state entries " \ + "found for scope [toStringMock]." 
+ expected_message = re.escape(expected_message) + + self.assertRaisesRegexp(Exception, expected_message, + self.reprocessing_worker.load_scope_key) + + self.state_manager_get_all_mock.assert_has_calls([ + mock.call(self.reprocessing_worker._tenant_id)]) + + def test_load_scope_key(self): + self.reprocessing_worker.load_scope_key() + + self.state_manager_get_all_mock.assert_has_calls([ + mock.call(self.reprocessing_worker._tenant_id)]) + + self.assertEqual(self.scope_key_mock, + self.reprocessing_worker.scope_key) + + @mock.patch("cloudkitty.orchestrator.ReprocessingWorker" + ".generate_next_timestamp") + def test_next_timestamp_to_process_no_db_item( + self, generate_next_timestamp_mock): + + self.reprocessing_scheduler_db_get_from_db_mock.return_value = [] + + self.reprocessing_worker._next_timestamp_to_process() + + self.reprocessing_scheduler_db_get_from_db_mock.assert_has_calls([ + mock.call( + identifier=self.scope_mock.identifier, + start_reprocess_time=self.scope_mock.start_reprocess_time, + end_reprocess_time=self.scope_mock.end_reprocess_time)]) + + self.assertFalse(generate_next_timestamp_mock.called) + + @mock.patch("cloudkitty.orchestrator.ReprocessingWorker" + ".generate_next_timestamp") + def test_next_timestamp_to_process(self, generate_next_timestamp_mock): + self.reprocessing_scheduler_db_get_from_db_mock.\ + return_value = self.scope_mock + + self.reprocessing_worker._next_timestamp_to_process() + + self.reprocessing_scheduler_db_get_from_db_mock.assert_has_calls([ + mock.call( + identifier=self.scope_mock.identifier, + start_reprocess_time=self.scope_mock.start_reprocess_time, + end_reprocess_time=self.scope_mock.end_reprocess_time)]) + + generate_next_timestamp_mock.assert_has_calls([ + mock.call(self.scope_mock, self.reprocessing_worker._period)]) + + def test_generate_next_timestamp_no_current_processing(self): + next_timestamp = self.reprocessing_worker.generate_next_timestamp( + self.mock_scheduler, 300) + + self.assertEqual(self.start_schedule_mock, next_timestamp) + + self.mock_scheduler.start_reprocess_time += datetime.timedelta(hours=2) + + next_timestamp = self.reprocessing_worker.generate_next_timestamp( + self.mock_scheduler, 300) + + self.assertIsNone(next_timestamp) + + def test_generate_next_timestamp_with_current_processing(self): + period = 300 + + self.mock_scheduler.current_reprocess_time =\ + self.start_schedule_mock + datetime.timedelta(seconds=period) + + expected_next_time_stamp =\ + self.mock_scheduler.current_reprocess_time + datetime.timedelta( + seconds=period) + + next_timestamp = self.reprocessing_worker.generate_next_timestamp( + self.mock_scheduler, period) + + self.assertEqual(expected_next_time_stamp, next_timestamp) + + self.mock_scheduler.current_reprocess_time +=\ + datetime.timedelta(hours=2) + + next_timestamp = self.reprocessing_worker.generate_next_timestamp( + self.mock_scheduler, period) + + self.assertIsNone(next_timestamp) + + @mock.patch("cloudkitty.orchestrator.Worker.do_execute_scope_processing") + def test_do_execute_scope_processing( + self, do_execute_scope_processing_mock_from_worker): + + now_timestamp = tzutils.localized_now() + self.reprocessing_worker.do_execute_scope_processing(now_timestamp) + + expected_end = tzutils.localized_now() + datetime.timedelta( + seconds=self.reprocessing_worker._period) + + self.storage_mock.delete.assert_has_calls([ + mock.call(begin=now_timestamp, end=expected_end, + filters={self.reprocessing_worker.scope_key: + self.reprocessing_worker._tenant_id})]) + + 
do_execute_scope_processing_mock_from_worker.assert_has_calls([ + mock.call(now_timestamp)]) + + @mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb" + ".update_reprocessing_time") + def test_update_scope_processing_state_db( + self, update_reprocessing_time_mock): + + timestamp_now = tzutils.localized_now() + self.reprocessing_worker.update_scope_processing_state_db( + timestamp_now) + + start_time = self.reprocessing_worker.scope.start_reprocess_time + end_time = self.reprocessing_worker.scope.end_reprocess_time + update_reprocessing_time_mock.assert_has_calls([ + mock.call( + identifier=self.reprocessing_worker.scope.identifier, + start_reprocess_time=start_time, end_reprocess_time=end_time, + new_current_time_stamp=timestamp_now)]) diff --git a/doc/source/_static/cloudkitty.conf.sample b/doc/source/_static/cloudkitty.conf.sample index fff95428..0915249b 100644 --- a/doc/source/_static/cloudkitty.conf.sample +++ b/doc/source/_static/cloudkitty.conf.sample @@ -753,15 +753,21 @@ # Coordination backend URL (string value) #coordination_url = file:///var/lib/cloudkitty/locks -# Maximal number of workers to run. Defaults to the number of -# available CPUs (integer value) -# Minimum value: 1 +# Max number of workers to execute the rating process. Defaults to the +# number of available CPU cores. (integer value) +# Minimum value: 0 + # # This option has a sample default set, which means that # its actual default value may vary from the one documented # below. #max_workers = 4 +# Max number of workers to execute the reprocessing. Defaults to the +# number of available CPU cores. (integer value) +# Minimum value: 0 +#max_workers_reprocessing = 8 + # Maximal number of threads to use per worker. Defaults to 5 times the # number of available CPUs (integer value) # Minimum value: 1 diff --git a/doc/source/_static/cloudkitty.policy.yaml.sample b/doc/source/_static/cloudkitty.policy.yaml.sample index a7886c63..78c56cf5 100644 --- a/doc/source/_static/cloudkitty.policy.yaml.sample +++ b/doc/source/_static/cloudkitty.policy.yaml.sample @@ -105,3 +105,11 @@ # GET /v2/summary #"summary:get_summary": "rule:admin_or_owner" +# Schedule a scope for reprocessing # POST /v2/task/reprocesses +#"schedule:task_reprocesses": "role:admin" + +# Get reprocessing schedule tasks for scopes. +# GET /v2/task/reprocesses +#"schedule:get_task_reprocesses": "role:admin" + diff --git a/doc/source/api-reference/v2/index.rst b/doc/source/api-reference/v2/index.rst index 71131421..37911060 100644 --- a/doc/source/api-reference/v2/index.rst +++ b/doc/source/api-reference/v2/index.rst @@ -3,3 +3,4 @@ .. include:: dataframes/dataframes.inc .. include:: scope/scope.inc .. include:: summary/summary.inc +.. include:: task/reprocessing.inc diff --git a/doc/source/api-reference/v2/task/reprocessing.inc b/doc/source/api-reference/v2/task/reprocessing.inc new file mode 100644 index 00000000..7588d292 --- /dev/null +++ b/doc/source/api-reference/v2/task/reprocessing.inc @@ -0,0 +1,147 @@ +====================== +Task schedule endpoint +====================== +CloudKitty has a task endpoint `/v2/task/`, which allows +operators to schedule administrative tasks, such as reprocessing. + +Currently, the only task available is the reprocessing one, which is available +via the following endpoints: + + - POST `/v2/task/reprocesses` -- to create a reprocessing task. + - GET `/v2/task/reprocesses/` -- to retrieve a reprocessing task. + - GET `/v2/task/reprocesses` -- to retrieve all reprocessing tasks.
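A minimal Python sketch of the same workflow as the curl examples in the sections below, assuming a reachable CloudKitty endpoint and a valid Keystone token; ``CLOUDKITTY_URL``, ``TOKEN``, and the scope ID are placeholders, not values from this patch:

.. code-block:: python

    # Sketch only, not part of this patch. CLOUDKITTY_URL, TOKEN and the scope
    # ID are placeholders; the payload fields mirror the parameters documented
    # below, and the GET response shape ("results" list) follows the API tests.
    import requests

    CLOUDKITTY_URL = "http://cloudkitty.example.com:8889"  # assumed endpoint
    TOKEN = "<keystone-token>"                             # assumed token

    headers = {"X-Auth-Token": TOKEN, "Content-Type": "application/json"}

    # Schedule a reprocessing task for one scope.
    requests.post(
        CLOUDKITTY_URL + "/v2/task/reprocesses",
        headers=headers,
        json={
            "scope_ids": "some-scope-id",
            "start_reprocess_time": "2021-06-01 00:00:00+00:00",
            "end_reprocess_time": "2021-06-01 23:00:00+00:00",
            "reason": "Reprocessing test",
        },
    ).raise_for_status()

    # Check the progress of the scheduled tasks for that scope.
    response = requests.get(
        CLOUDKITTY_URL + "/v2/task/reprocesses",
        headers=headers,
        params={"scope_ids": "some-scope-id"},
    )
    for task in response.json()["results"]:
        print(task["scope_id"], task["current_reprocess_time"])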
+ +Create a reprocessing task +========================== + +This endpoint is used to schedule a reprocessing task. The scheduled tasks are +loaded for execution once every processing cycle, as defined in the +CloudKitty `period` configuration. + +.. rest_method:: POST `/v2/task/reprocesses` + +.. rest_parameters:: task/reprocessing_parameters.yml + + - scope_ids: scope_ids + - start_reprocess_time: start_reprocess_time + - end_reprocess_time: end_reprocess_time + - reason: reason + +Status codes +------------ + +.. rest_status_code:: success http_status.yml + + - 200 + +.. rest_status_code:: error http_status.yml + + - 400 + - 403 + - 405 + +Response +-------- + +On success, an empty object is returned: + +.. code-block:: javascript + + {} + +Example +------- +.. code-block:: shell + + curl -s -X POST "https:///v2/task/reprocesses" -H "Accept: application/json" -H "User-Agent: python-keystoneclient" -H "X-Auth-Token: ${ACCESS_TOKEN_KEYSTONE}" -H "Content-Type: application/json" -d '{"reason": "Reprocessing test", "scope_ids": "", "start_reprocess_time": "2021-06-01 00:00:00+00:00", "end_reprocess_time": "2021-06-01 23:00:00+00:00"}' + + +The scope IDs can be retrieved via the "/v2/scope" API, which lists all scopes and their status. + +Retrieve a reprocessing task +============================ + +This endpoint is used to retrieve a reprocessing task. It can be used, for +instance, to check the progress of reprocessing tasks. + +.. rest_method:: GET `/v2/task/reprocesses/` + +.. rest_parameters:: task/reprocessing_parameters.yml + + - path_scope_id: path_scope_id + +Status codes +------------ + +.. rest_status_code:: success http_status.yml + + - 200 + +.. rest_status_code:: error http_status.yml + + - 400 + - 403 + - 405 + +Response +-------- + +For a valid scope ID, the reprocessing task data is returned: + +.. code-block:: javascript + + {"scope_id": "scope ID goes here", + "reason": "The reason for this reprocessing for this scope", + "start_reprocess_time": "2021-06-01 00:00:00+00:00", + "end_reprocess_time": "2021-07-01 00:00:00+00:00", + "current_reprocess_time": "2021-06-06 00:00:00+00:00"} + +Example +------- +.. code-block:: shell + + curl -s -X GET "https:///v2/task/reprocesses/" -H "Accept: application/json" -H "User-Agent: python-keystoneclient" -H "X-Auth-Token: ${ACCESS_TOKEN_KEYSTONE}" + +Retrieve all reprocessing tasks +=============================== + +This endpoint is used to retrieve all reprocessing tasks. It can also be used +to retrieve the reprocessing tasks scheduled for specific scopes. + +.. rest_method:: GET `/v2/task/reprocesses` + +.. rest_parameters:: task/reprocessing_parameters.yml + + - scope_ids: scope_ids_query + +Status codes +------------ + +.. rest_status_code:: success http_status.yml + + - 200 + +.. rest_status_code:: error http_status.yml + + - 400 + - 403 + - 405 + +Response +-------- + +The matching reprocessing tasks are returned as a list: + +.. code-block:: javascript + + [{"scope_id": "scope ID goes here", + "reason": "The reason for this reprocessing for this scope", + "start_reprocess_time": "2021-06-01 00:00:00+00:00", + "end_reprocess_time": "2021-07-01 00:00:00+00:00", + "current_reprocess_time": "2021-06-06 00:00:00+00:00"}] + +Example +------- +.. 
code-block:: shell + + curl -s -X GET "https:///v2/task/reprocesses" -H "Accept: application/json" -H "User-Agent: python-keystoneclient" -H "X-Auth-Token: ${ACCESS_TOKEN_KEYSTONE}" \ No newline at end of file diff --git a/doc/source/api-reference/v2/task/reprocessing_parameters.yml b/doc/source/api-reference/v2/task/reprocessing_parameters.yml new file mode 100644 index 00000000..c6ecf471 --- /dev/null +++ b/doc/source/api-reference/v2/task/reprocessing_parameters.yml @@ -0,0 +1,56 @@ +path_scope_id: + in: path + description: | + The scope ID to retrieve. + type: string + required: true + +limit: + in: query + description: | + For pagination. The maximum number of results to return. + type: int + required: false + +offset: + in: query + description: | + For pagination. The index of the first element that should be returned. + type: int + required: false + +scope_ids_query: &scope_ids_query + in: query + description: | + The scope IDs one wants to retrieve the reprocessing tasks of. If not + informed, all reprocessing tasks, for all scopes are retrieved. + required: false + type: string + +end_reprocess_time: + in: body + description: | + The end date for the reprocessing task. + type: iso8601 timestamp + required: true + +reason: + in: body + description: | + The reason for the reprocessing to take place. + type: string + required: true + +scope_ids: + <<: *scope_ids_query + in: body + description: | + The scope IDs to reprocess. Must be comma-separated to schedule more than one. + required: true + +start_reprocess_time: + in: body + description: | + The start date for the reprocessing task. + type: iso8601 timestamp + required: true diff --git a/lower-constraints.txt b/lower-constraints.txt index e5703d45..ecb00d39 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -21,7 +21,7 @@ oslo.middleware==4.1.1 # Apache-2.0 oslo.policy==3.6.0 # Apache-2.0 oslo.utils==4.7.0 # Apache-2.0 oslo.upgradecheck==1.3.0 # Apache-2.0 -python-dateutil==2.7.0 # BSD +python-dateutil==2.8.0 # BSD SQLAlchemy==1.3.20 # MIT stevedore==3.2.2 # Apache-2.0 tooz==2.7.1 # Apache-2.0 @@ -32,6 +32,7 @@ Flask-RESTful==0.3.9 # BSD cotyledon==1.7.3 # Apache-2.0 futurist==2.3.0 # Apache-2.0 bandit>=1.6.0 # Apache-2.0 +datetimerange==0.6.1 # MIT # test-requirements coverage==5.3 # Apache-2.0 diff --git a/releasenotes/notes/introduce-reprocessing-api-822db3edc256507a.yaml b/releasenotes/notes/introduce-reprocessing-api-822db3edc256507a.yaml new file mode 100644 index 00000000..f7a3d261 --- /dev/null +++ b/releasenotes/notes/introduce-reprocessing-api-822db3edc256507a.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Introduce the reprocessing schedule API, which allows operators to + schedule reprocessing tasks to reprocess scopes in given timeframes. diff --git a/requirements.txt b/requirements.txt index 2678a9fe..a7904de5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,7 +23,7 @@ oslo.middleware>=4.1.1 # Apache-2.0 oslo.policy>=3.6.0 # Apache-2.0 oslo.utils>=4.7.0 # Apache-2.0 oslo.upgradecheck>=1.3.0 # Apache-2.0 -python-dateutil>=2.7.0 # BSD +python-dateutil>=2.8.0 # BSD SQLAlchemy>=1.3.20 # MIT stevedore>=3.2.2 # Apache-2.0 tooz>=2.7.1 # Apache-2.0 @@ -33,3 +33,4 @@ Flask>=2.0.0 # BSD Flask-RESTful>=0.3.9 # BSD cotyledon>=1.7.3 # Apache-2.0 futurist>=2.3.0 # Apache-2.0 +datetimerange>=0.6.1 # MIT
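A note on the new ``datetimerange`` requirement added above: a plausible use for it in the reprocessing scheduler is detecting overlapping reprocessing windows for the same scope. The sketch below illustrates that idea under that assumption; the helper and ``existing_schedules`` are hypothetical, not code from this patch.

.. code-block:: python

    # Sketch only, assuming datetimerange is used for schedule overlap checks;
    # `existing_schedules` stands in for rows returned by the reprocessing
    # scheduler DB, with the start/end fields used throughout this patch.
    from datetimerange import DateTimeRange


    def overlaps_existing(start, end, existing_schedules):
        """Return True if the requested [start, end] window intersects any
        reprocessing window already scheduled for the scope."""
        requested = DateTimeRange(start, end)
        for schedule in existing_schedules:
            scheduled = DateTimeRange(schedule.start_reprocess_time,
                                      schedule.end_reprocess_time)
            if requested.is_intersection(scheduled):
                return True
        return False

If that is indeed the intent, a check like this would let the API layer reject a conflicting POST to `/v2/task/reprocesses` before anything is written to the scheduler table.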