diff --git a/octavia/controller/worker/v2/controller_worker.py b/octavia/controller/worker/v2/controller_worker.py
index 3c2a9ce133..61278a3533 100644
--- a/octavia/controller/worker/v2/controller_worker.py
+++ b/octavia/controller/worker/v2/controller_worker.py
@@ -294,8 +294,17 @@ class ControllerWorker(object):
         :returns: None
         :raises ListenerNotFound: The referenced listener was not found
         """
-        db_lb = self._lb_repo.get(db_apis.get_session(),
-                                  id=listener[constants.LOADBALANCER_ID])
+        try:
+            db_lb = self._get_db_obj_until_pending_update(
+                self._lb_repo, listener[constants.LOADBALANCER_ID])
+        except tenacity.RetryError as e:
+            LOG.warning('Loadbalancer did not go into %s in 60 seconds. '
+                        'This is either due to an in-progress Octavia '
+                        'upgrade or an overloaded and failing database. '
+                        'Assuming an upgrade is in progress and continuing.',
+                        constants.PENDING_UPDATE)
+            db_lb = e.last_attempt.result()
+
         store = {constants.LISTENER: listener,
                  constants.UPDATE_DICT: listener_updates,
                  constants.LOADBALANCER_ID: db_lb.id,
@@ -391,6 +400,18 @@ class ControllerWorker(object):
         :returns: None
         :raises LBNotFound: The referenced load balancer was not found
         """
+
+        try:
+            self._get_db_obj_until_pending_update(
+                self._lb_repo,
+                original_load_balancer[constants.LOADBALANCER_ID])
+        except tenacity.RetryError:
+            LOG.warning('Load balancer did not go into %s in 60 seconds. '
+                        'This is either due to an in-progress Octavia '
+                        'upgrade or an overloaded and failing database. '
+                        'Assuming an upgrade is in progress and continuing.',
+                        constants.PENDING_UPDATE)
+
         store = {constants.LOADBALANCER: original_load_balancer,
                  constants.LOADBALANCER_ID:
                      original_load_balancer[constants.LOADBALANCER_ID],
@@ -479,8 +500,26 @@ class ControllerWorker(object):
             flow_utils.get_delete_member_flow,
             store=store)

+    @tenacity.retry(
+        retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
+        wait=tenacity.wait_incrementing(
+            CONF.haproxy_amphora.api_db_commit_retry_initial_delay,
+            CONF.haproxy_amphora.api_db_commit_retry_backoff,
+            CONF.haproxy_amphora.api_db_commit_retry_max),
+        stop=tenacity.stop_after_attempt(
+            CONF.haproxy_amphora.api_db_commit_retry_attempts))
     def batch_update_members(self, old_members, new_members,
                              updated_members):
+        db_new_members = [self._member_repo.get(db_apis.get_session(),
+                                                id=member[constants.MEMBER_ID])
+                          for member in new_members]
+        # The API may not have committed all of the new member records yet.
+        # Make sure we retry looking them up.
+        if None in db_new_members or len(db_new_members) != len(new_members):
+            LOG.warning('Failed to fetch one of the new members from DB. '
+                        'Retrying for up to 60 seconds.')
+            raise db_exceptions.NoResultFound
+
         updated_members = [
             (provider_utils.db_member_to_provider_member(
                 self._member_repo.get(db_apis.get_session(),
@@ -534,9 +573,19 @@ class ControllerWorker(object):
         :returns: None
         :raises MemberNotFound: The referenced member was not found
         """
-        # TODO(ataraday) when other flows will use dicts - revisit this
-        pool = self._pool_repo.get(db_apis.get_session(),
-                                   id=member[constants.POOL_ID])
+
+        try:
+            db_member = self._get_db_obj_until_pending_update(
+                self._member_repo, member[constants.MEMBER_ID])
+        except tenacity.RetryError as e:
+            LOG.warning('Member did not go into %s in 60 seconds. '
+                        'This is either due to an in-progress Octavia '
+                        'upgrade or an overloaded and failing database. '
+                        'Assuming an upgrade is in progress and continuing.',
+                        constants.PENDING_UPDATE)
+            db_member = e.last_attempt.result()
+
+        pool = db_member.pool
         load_balancer = pool.load_balancer
         provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
             load_balancer).to_dict(recurse=True)
@@ -720,8 +769,18 @@ class ControllerWorker(object):
         :returns: None
         :raises L7PolicyNotFound: The referenced l7policy was not found
         """
-        db_listener = self._listener_repo.get(
-            db_apis.get_session(), id=original_l7policy[constants.LISTENER_ID])
+        try:
+            db_l7policy = self._get_db_obj_until_pending_update(
+                self._l7policy_repo, original_l7policy[constants.L7POLICY_ID])
+        except tenacity.RetryError as e:
+            LOG.warning('L7 policy did not go into %s in 60 seconds. '
+                        'This is either due to an in-progress Octavia '
+                        'upgrade or an overloaded and failing database. '
+                        'Assuming an upgrade is in progress and continuing.',
+                        constants.PENDING_UPDATE)
+            db_l7policy = e.last_attempt.result()
+
+        db_listener = db_l7policy.listener

         listeners_dicts = (
             provider_utils.db_listeners_to_provider_dicts_list_of_dicts(
@@ -812,8 +871,17 @@ class ControllerWorker(object):
         :returns: None
         :raises L7RuleNotFound: The referenced l7rule was not found
         """
-        db_l7policy = self._l7policy_repo.get(
-            db_apis.get_session(), id=original_l7rule[constants.L7POLICY_ID])
+        try:
+            db_l7rule = self._get_db_obj_until_pending_update(
+                self._l7rule_repo, original_l7rule[constants.L7RULE_ID])
+        except tenacity.RetryError as e:
+            LOG.warning('L7 rule did not go into %s in 60 seconds. '
+                        'This is either due to an in-progress Octavia '
+                        'upgrade or an overloaded and failing database. '
+                        'Assuming an upgrade is in progress and continuing.',
+                        constants.PENDING_UPDATE)
+            db_l7rule = e.last_attempt.result()
+        db_l7policy = db_l7rule.l7policy
         load_balancer = db_l7policy.listener.load_balancer

         listeners_dicts = (
diff --git a/octavia/tests/unit/controller/worker/v2/test_controller_worker.py b/octavia/tests/unit/controller/worker/v2/test_controller_worker.py
index ab8a9a37e0..6418ee1b2a 100644
--- a/octavia/tests/unit/controller/worker/v2/test_controller_worker.py
+++ b/octavia/tests/unit/controller/worker/v2/test_controller_worker.py
@@ -375,6 +375,11 @@ class TestControllerWorker(base.TestCase):
             mock_health_mon_repo_get,
             mock_amp_repo_get):

+        load_balancer_mock = mock.MagicMock()
+        load_balancer_mock.provisioning_status = constants.PENDING_UPDATE
+        load_balancer_mock.id = LB_ID
+        mock_lb_repo_get.return_value = load_balancer_mock
+
         _flow_mock.reset_mock()
         _listener_mock.provisioning_status = constants.PENDING_UPDATE

@@ -795,6 +800,10 @@ class TestControllerWorker(base.TestCase):
             mock_amp_repo_get):

         _flow_mock.reset_mock()
+        db_member = mock.MagicMock()
+        db_member.provisioning_status = constants.PENDING_UPDATE
+        db_member.pool = _db_pool_mock
+        mock_member_repo_get.return_value = db_member
         _member = _member_mock.to_dict()
         _member[constants.PROVISIONING_STATUS] = constants.PENDING_UPDATE
         mock_get_az_metadata_dict.return_value = {}
@@ -843,7 +852,9 @@ class TestControllerWorker(base.TestCase):
         old_member = mock.MagicMock()
         old_member.to_dict.return_value = {'id': 9,
                                            constants.POOL_ID: 'testtest'}
-        mock_member_repo_get.side_effect = [_member_mock, old_member]
+        new_member = mock.MagicMock()
+        mock_member_repo_get.side_effect = [
+            new_member, _member_mock, old_member]
         cw.batch_update_members([{constants.MEMBER_ID: 9,
                                   constants.POOL_ID: 'testtest'}],
                                 [{constants.MEMBER_ID: 11}],
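Note: every hunk above leans on the _get_db_obj_until_pending_update helper, whose definition sits outside this diff. A minimal sketch of how such a tenacity-based poller could look, assuming a fixed 2-second wait and the 60-second deadline mentioned in the log messages; the predicate name and timing values here are illustrative, not the actual Octavia implementation:

    import tenacity

    from octavia.common import constants
    from octavia.db import api as db_apis


    def _not_pending_update(db_obj):
        # Retry predicate: keep polling while the API has not yet flipped
        # the row's provisioning_status to PENDING_UPDATE.
        return db_obj.provisioning_status != constants.PENDING_UPDATE


    class ControllerWorker(object):

        @tenacity.retry(
            retry=tenacity.retry_if_result(_not_pending_update),
            wait=tenacity.wait_fixed(2),         # illustrative interval
            stop=tenacity.stop_after_delay(60))  # matches the logged deadline
        def _get_db_obj_until_pending_update(self, repo, id):
            # On timeout tenacity raises RetryError; the callers above catch
            # it and fall back to e.last_attempt.result(), i.e. the row as
            # last seen, so an in-progress upgrade does not block the worker.
            return repo.get(db_apis.get_session(), id=id)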
diff --git a/releasenotes/notes/fix-race-condiction-on-update-b5330c8fcf1800cd.yaml b/releasenotes/notes/fix-race-condiction-on-update-b5330c8fcf1800cd.yaml
new file mode 100644
index 0000000000..0d07e64ae1
--- /dev/null
+++ b/releasenotes/notes/fix-race-condiction-on-update-b5330c8fcf1800cd.yaml
@@ -0,0 +1,7 @@
+---
+fixes:
+  - |
+    Fix a potential race condition when updating a resource in the amphorav2
+    worker. The worker was not waiting for the resource to be set to
+    PENDING_UPDATE, so the resource may have been updated with old data from
+    the database, resulting in a no-op update.
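The release note covers one half of the race: the worker reading a row before the API marks it PENDING_UPDATE. The batch_update_members change handles the other half, where freshly created member rows may not be committed yet when the worker looks them up. A self-contained sketch of that retry-until-committed pattern, with placeholder values standing in for the operator-configurable CONF.haproxy_amphora.api_db_commit_retry_* options, which are not shown in this diff:

    import tenacity


    class NoResultFound(Exception):
        """Stand-in for the db_exceptions.NoResultFound used by the worker."""


    # Placeholder retry settings; the real values come from the
    # [haproxy_amphora] api_db_commit_retry_* configuration options.
    INITIAL_DELAY = 1
    BACKOFF = 1
    MAX_WAIT = 5
    ATTEMPTS = 15


    @tenacity.retry(
        retry=tenacity.retry_if_exception_type(NoResultFound),
        wait=tenacity.wait_incrementing(INITIAL_DELAY, BACKOFF, MAX_WAIT),
        stop=tenacity.stop_after_attempt(ATTEMPTS))
    def fetch_new_members(repo, session, member_ids):
        rows = [repo.get(session, id=member_id) for member_id in member_ids]
        # A row the API has not committed yet comes back as None; raising
        # here makes tenacity re-run the whole lookup with a growing delay
        # instead of failing the batch update outright.
        if None in rows:
            raise NoResultFound
        return rows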