From 6b7b58ed6c9dcad1a4db07586ea299bf01661523 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Tue, 17 Apr 2018 17:38:07 +0700 Subject: [PATCH] Add '__task_execution' structure to task execution context on the fly * Previously we stored the data structure describing the current task execution (id and name) in the inbound task execution context directly so that it'd be saved to DB. This was needed to evaluate YAQL/Jinja function task() without parameters properly. However, it's not needed, we can just build a context view on the fly just before evaluating an expression. Change-Id: If523039446ab3e2ccc9542617de2a170168f6e20 Closes-Bug: #1764704 --- mistral/engine/policies.py | 1 + mistral/engine/tasks.py | 17 +++++-------- .../tests/unit/engine/test_direct_workflow.py | 8 ------- .../tests/unit/engine/test_yaql_functions.py | 11 +++++++-- mistral/workflow/data_flow.py | 24 +++++++++---------- mistral/workflow/direct_workflow.py | 5 +--- 6 files changed, 29 insertions(+), 37 deletions(-) diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index c570de7c9..418ca5ebc 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -338,6 +338,7 @@ class RetryPolicy(base.TaskPolicy): wf_ex = task_ex.workflow_execution ctx_view = data_flow.ContextView( + data_flow.get_current_task_dict(task_ex), data_flow.evaluate_task_outbound_context(task_ex), wf_ex.context, wf_ex.input diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index b1cb0abd4..c747eaa8a 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -311,8 +311,6 @@ class Task(object): task_name = self.task_spec.get_name() task_type = self.task_spec.get_type() - data_flow.add_current_task_to_context(self.ctx, task_id, task_name) - values = { 'id': task_id, 'name': task_name, @@ -430,16 +428,13 @@ class RegularTask(Task): self._schedule_actions() def _update_inbound_context(self): - task_ex = self.task_ex - assert task_ex + assert self.task_ex wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) self.ctx = wf_ctrl.get_task_inbound_context(self.task_spec) - data_flow.add_current_task_to_context(self.ctx, task_ex.id, - task_ex.name) - utils.update_dict(task_ex.in_context, self.ctx) + utils.update_dict(self.task_ex.in_context, self.ctx) def _update_triggered_by(self): assert self.task_ex @@ -515,17 +510,17 @@ class RegularTask(Task): ) def _evaluate_expression(self, expression, ctx=None): - ctx = ctx or self.ctx ctx_view = data_flow.ContextView( - ctx, + data_flow.get_current_task_dict(self.task_ex), + ctx or self.ctx, self.wf_ex.context, self.wf_ex.input ) - input_dict = expr.evaluate_recursively( + + return expr.evaluate_recursively( expression, ctx_view ) - return input_dict def _build_action(self): action_name = self.task_spec.get_action_name() diff --git a/mistral/tests/unit/engine/test_direct_workflow.py b/mistral/tests/unit/engine/test_direct_workflow.py index 621f9a238..0836fc1ef 100644 --- a/mistral/tests/unit/engine/test_direct_workflow.py +++ b/mistral/tests/unit/engine/test_direct_workflow.py @@ -846,14 +846,6 @@ class DirectWorkflowEngineTest(base.EngineTestCase): task2_1_ex = self._assert_single_item(tasks_execs, name='task2_1') task2_2_ex = self._assert_single_item(tasks_execs, name='task2_2') - # TODO(rakhmerov): Find out why '__task_execution' is still - # in the inbound context - del task0_ex.in_context['__task_execution'] - del task1_1_ex.in_context['__task_execution'] - del task1_2_ex.in_context['__task_execution'] - del task2_1_ex.in_context['__task_execution'] - del task2_2_ex.in_context['__task_execution'] - self.assertDictEqual({}, task0_ex.in_context) self.assertDictEqual({'var0': 'val0'}, task1_1_ex.in_context) self.assertDictEqual( diff --git a/mistral/tests/unit/engine/test_yaql_functions.py b/mistral/tests/unit/engine/test_yaql_functions.py index 1d1a6ab2f..3bea149e9 100644 --- a/mistral/tests/unit/engine/test_yaql_functions.py +++ b/mistral/tests/unit/engine/test_yaql_functions.py @@ -206,10 +206,12 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) task1_ex = self._assert_single_item( - wf_ex.task_executions, name='task1' + wf_ex.task_executions, + name='task1' ) task2_ex = self._assert_single_item( - wf_ex.task_executions, name='task2' + wf_ex.task_executions, + name='task2' ) self.assertDictEqual( @@ -229,6 +231,11 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase): task2_ex.published ) + # The internal data needed for evaluation of the task() function + # should not be persisted to DB. + self.assertNotIn('__task_execution', task1_ex.in_context) + self.assertNotIn('__task_execution', task2_ex.in_context) + def test_task_function_no_name_on_complete_case(self): wf_text = """--- version: '2.0' diff --git a/mistral/workflow/data_flow.py b/mistral/workflow/data_flow.py index 12d43ebc2..6b23d8c50 100644 --- a/mistral/workflow/data_flow.py +++ b/mistral/workflow/data_flow.py @@ -190,7 +190,12 @@ def publish_variables(task_ex, task_spec): wf_ex = task_ex.workflow_execution - expr_ctx = ContextView(task_ex.in_context, wf_ex.context, wf_ex.input) + expr_ctx = ContextView( + get_current_task_dict(task_ex), + task_ex.in_context, + wf_ex.context, + wf_ex.input + ) if task_ex.name in expr_ctx: LOG.warning( @@ -268,19 +273,14 @@ def evaluate_workflow_output(wf_ex, wf_output, ctx): return output or ctx -def add_current_task_to_context(ctx, task_id, task_name): - ctx['__task_execution'] = { - 'id': task_id, - 'name': task_name +def get_current_task_dict(task_ex): + return { + '__task_execution': { + 'id': task_ex.id, + 'name': task_ex.name + } } - return ctx - - -def remove_internal_data_from_context(ctx): - if '__task_execution' in ctx: - del ctx['__task_execution'] - def add_openstack_data_to_context(wf_ex): wf_ex.context = wf_ex.context or {} diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index 017a610d1..98de02d24 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -125,8 +125,6 @@ class DirectWorkflowController(base.WorkflowController): elif not t_s: t_s = self.wf_spec.get_tasks()[task_ex.name] - data_flow.remove_internal_data_from_context(ctx) - triggered_by = [ { 'task_id': task_ex.id, @@ -176,8 +174,6 @@ class DirectWorkflowController(base.WorkflowController): data_flow.evaluate_task_outbound_context(t_ex) ) - data_flow.remove_internal_data_from_context(ctx) - return ctx def get_logical_task_state(self, task_ex): @@ -248,6 +244,7 @@ class DirectWorkflowController(base.WorkflowController): t_name = task_ex.name ctx_view = data_flow.ContextView( + data_flow.get_current_task_dict(task_ex), ctx or data_flow.evaluate_task_outbound_context(task_ex), self.wf_ex.context, self.wf_ex.input