Merge "Fix join on branch error"
This commit is contained in:
@@ -200,7 +200,14 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
if states.is_paused_or_completed(wf_ex.state):
|
||||
return
|
||||
|
||||
if wf_utils.find_incomplete_task_executions(wf_ex):
|
||||
# Workflow is not completed if there are any incomplete task
|
||||
# executions that are not in WAITING state. If all incomplete
|
||||
# tasks are waiting and there are unhandled errors, then these
|
||||
# tasks will not reach completion. In this case, mark the
|
||||
# workflow complete.
|
||||
incomplete_tasks = wf_utils.find_incomplete_task_executions(wf_ex)
|
||||
|
||||
if any(not states.is_waiting(t.state) for t in incomplete_tasks):
|
||||
return
|
||||
|
||||
if wf_ctrl.all_errors_handled():
|
||||
|
@@ -513,8 +513,8 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
|
||||
states.SUCCESS))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.RUNNING, wf_ex.state)
|
||||
self.assertIsNone(wf_ex.state_info)
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertIsNotNone(wf_ex.state_info)
|
||||
self.assertEqual(3, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
|
@@ -556,3 +556,61 @@ class JoinEngineTest(base.EngineTestCase):
|
||||
},
|
||||
exec_db.output
|
||||
)
|
||||
|
||||
def test_full_join_with_branch_errors(self):
|
||||
wf_full_join_with_errors = """---
|
||||
version: '2.0'
|
||||
|
||||
main:
|
||||
type: direct
|
||||
|
||||
tasks:
|
||||
task10:
|
||||
action: std.noop
|
||||
on-success:
|
||||
- task21
|
||||
- task31
|
||||
|
||||
task21:
|
||||
action: std.noop
|
||||
on-success:
|
||||
- task22
|
||||
task22:
|
||||
action: std.noop
|
||||
on-success:
|
||||
- task40
|
||||
|
||||
task31:
|
||||
action: std.fail
|
||||
on-success:
|
||||
- task32
|
||||
task32:
|
||||
action: std.noop
|
||||
on-success:
|
||||
- task40
|
||||
|
||||
task40:
|
||||
join: all
|
||||
action: std.noop
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_full_join_with_errors)
|
||||
wf_ex = self.engine.start_workflow('main', {})
|
||||
|
||||
self._await(lambda: self.is_execution_error(wf_ex.id))
|
||||
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
tasks = wf_ex.task_executions
|
||||
|
||||
task10 = self._assert_single_item(tasks, name='task10')
|
||||
task21 = self._assert_single_item(tasks, name='task21')
|
||||
task22 = self._assert_single_item(tasks, name='task22')
|
||||
task31 = self._assert_single_item(tasks, name='task31')
|
||||
task40 = self._assert_single_item(tasks, name='task40')
|
||||
|
||||
self.assertEqual(states.SUCCESS, task10.state)
|
||||
self.assertEqual(states.SUCCESS, task21.state)
|
||||
self.assertEqual(states.SUCCESS, task22.state)
|
||||
self.assertEqual(states.ERROR, task31.state)
|
||||
self.assertNotIn('task32', [task.name for task in tasks])
|
||||
self.assertEqual(states.WAITING, task40.state)
|
||||
|
@@ -287,7 +287,7 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
)
|
||||
|
||||
# TODO(rakhmerov): Temporary hack. See the previous comment.
|
||||
in_t_ex = in_t_execs[-1]
|
||||
in_t_ex = in_t_execs[-1] if in_t_execs else None
|
||||
|
||||
if not in_t_ex or not states.is_completed(in_t_ex.state):
|
||||
return False
|
||||
|
Reference in New Issue
Block a user