Add designate-manage service clean cmd
Add a `designate-manage service clean` command that cleans up stale services. Any service that has not provided a heartbeat within double the heartbeat_interval time is considered dead and should be removed as soon as possible. Even if such a service is actually alive (for example, during a pathological RPC timeout), it can always recreate and update its service status afterwards, so it is safe to delete the service status in that case too. Closes-Bug: #2110262 Change-Id: I211d0a60aa19ab4c0ae0fbb808e2d9080ccbbedd Signed-off-by: ricolin <rlin@vexxhost.com>
This commit is contained in:
54
designate/manage/service.py
Normal file
54
designate/manage/service.py
Normal file
@@ -0,0 +1,54 @@
|
||||
# Copyright (c) 2025 VEXXHOST, Inc.
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
from oslo_utils import timeutils
|
||||
|
||||
import designate.conf
|
||||
from designate.manage import base
|
||||
from designate import rpc
|
||||
from designate import storage
|
||||
|
||||
|
||||
CONF = designate.conf.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ServiceCommands(base.Commands):
    """designate-manage commands operating on service status records."""

    def __init__(self):
        super().__init__()
        # Populated from CONF when clean() runs, so config is read lazily.
        self.heartbeat_interval = None
        self.storage = storage.get_storage()

    def clean(self):
        """Delete service statuses whose heartbeat has gone stale.

        A service is considered dead when it has not emitted a heartbeat
        within twice the configured heartbeat interval. Deleting the row
        is safe even if the service is actually alive: it will recreate
        its status on its next heartbeat.

        :raises SystemExit: if designate-central does not respond.
        """
        rpc.init(CONF)
        self.heartbeat_interval = CONF.heartbeat_emitter.heartbeat_interval
        LOG.info("Start cleaning dead services.")
        try:
            statuses = self.storage.find_service_statuses(self.context)
            # Capture a single reference time so every status is judged
            # against the same cutoff, regardless of loop duration.
            now = timeutils.utcnow()
            for status in statuses:
                # Statuses that never heartbeated are left untouched.
                if not status.heartbeated_at:
                    continue
                # Clean stale service if it passed 2 * heartbeat_interval.
                check_interval = (
                    now - status.heartbeated_at
                ).total_seconds()
                if check_interval > 2 * self.heartbeat_interval:
                    LOG.warning("Found dead service for delete: "
                                "%(service_name)s. "
                                "Last service heartbeat time is "
                                "%(check_interval)s seconds ago.",
                                {
                                    'service_name': status.service_name,
                                    'check_interval': check_interval
                                }
                                )
                    self.storage.delete_service_status(
                        self.context, status)
        except messaging.exceptions.MessagingTimeout:
            LOG.critical(
                'No response received from designate-central. '
                'Check it is running, and retry'
            )
            raise SystemExit(1)

        LOG.info("Job finished.")
|
||||
@@ -2461,6 +2461,17 @@ class SQLAlchemyStorage(base.SQLAlchemy):
|
||||
exceptions.DuplicateServiceStatus,
|
||||
exceptions.ServiceStatusNotFound)
|
||||
|
||||
def delete_service_status(self, context, service_status):
    """
    Delete the Service status.

    Removes the row for *service_status* from the service_status table.

    :param context: RPC Context.
    :param service_status: Status for a service.
    :raises ServiceStatusNotFound: if no matching status row exists.
    :returns: the deleted service status.
    """
    return self._delete(
        context, tables.service_status, service_status,
        exceptions.ServiceStatusNotFound)
|
||||
|
||||
# Reverse Name utils
|
||||
def _rname_check(self, criterion):
|
||||
# If the criterion has 'name' in it, switch it out for reverse_name
|
||||
|
||||
95
designate/tests/functional/manage/test_service.py
Normal file
95
designate/tests/functional/manage/test_service.py
Normal file
@@ -0,0 +1,95 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from unittest import mock

from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_utils import timeutils

import designate.conf
from designate.manage import service
from designate import objects
from designate.tests import base_fixtures
import designate.tests.functional
|
||||
|
||||
CONF = designate.conf.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ManageServiceTestCase(designate.tests.functional.TestCase):
    """Functional tests for ``designate-manage service clean``."""

    def setUp(self):
        super().setUp()
        # Capture log output so assertions can inspect emitted messages.
        self.stdlog = base_fixtures.StandardLogging()
        self.useFixture(self.stdlog)
        self.command = service.ServiceCommands()

    @mock.patch.object(service, 'LOG')
    def test_service_clean(self, m_log):
        # One status with a heartbeat far in the past (dead) ...
        stale_values = self.get_service_status_fixture()
        stale_status = objects.ServiceStatus.from_dict(stale_values)
        stale_status.heartbeated_at = timeutils.datetime.datetime(
            2024, 5, 8, 14, 46, 38, 314323)
        self.storage.create_service_status(
            self.admin_context, stale_status)

        # ... and one freshly created status (alive).
        fresh_values = self.get_service_status_fixture(fixture=1)
        fresh_status = objects.ServiceStatus.from_dict(fresh_values)
        self.storage.create_service_status(
            self.admin_context, fresh_status)
        self.assertEqual(
            2, len(self.storage.find_service_statuses(self.admin_context))
        )

        self.command.clean()

        # The warning names the dead service that got removed.
        self.assertIn(
            stale_status.service_name,
            m_log.warning.call_args_list[0].args[1]['service_name']
        )
        remaining = self.storage.find_service_statuses(self.admin_context)
        self.assertEqual(1, len(remaining))

        # Make sure the remaining service is not the one who expired
        self.assertEqual(fresh_status.service_name,
                         remaining[0].service_name)
        self.assertNotEqual(stale_status.service_name,
                            remaining[0].service_name)

    @mock.patch.object(service, 'LOG')
    def test_service_clean_no_dead_service(self, m_log):
        # Two statuses, both freshly heartbeated — nothing should be removed.
        first_values = self.get_service_status_fixture()
        first_status = objects.ServiceStatus.from_dict(first_values)
        self.storage.create_service_status(
            self.admin_context, first_status)

        second_values = self.get_service_status_fixture(fixture=1)
        second_status = objects.ServiceStatus.from_dict(second_values)
        self.storage.create_service_status(
            self.admin_context, second_status)
        self.assertEqual(
            2, len(self.storage.find_service_statuses(self.admin_context))
        )

        self.command.clean()

        # No warning was logged and both statuses survived the clean.
        self.assertEqual(0, len(m_log.warning.call_args_list))
        self.assertEqual(
            2, len(self.storage.find_service_statuses(self.admin_context))
        )

    @mock.patch.object(service, 'LOG')
    def test_service_clean_message_timeout(self, m_log):
        # Simulate designate-central never answering the storage call.
        self.command.storage.find_service_statuses = mock.Mock(
            side_effect=messaging.exceptions.MessagingTimeout
        )
        self.assertRaises(SystemExit, self.command.clean)
        self.assertEqual(1, len(m_log.critical.call_args_list))
|
||||
@@ -0,0 +1,13 @@
|
||||
---
fixes:
  - |
    New command `designate-manage service clean`.
    Previously, a Designate service status was always reported as `UP`,
    even after the service had stopped providing heartbeats for a long
    while, and there was no way to clean up service statuses.
    For services that run in containers (such as on Kubernetes),
    the list of `UP` service statuses just kept piling up.
    The new `designate-manage service clean` command detects and
    removes any service that has failed to provide a heartbeat within
    double the heartbeat interval time.
    `CONF.heartbeat_emitter.heartbeat_interval` defaults to 10 seconds.
|
||||
@@ -102,6 +102,7 @@ designate.scheduler.filters =
|
||||
designate.manage =
|
||||
database = designate.manage.database:DatabaseCommands
|
||||
pool = designate.manage.pool:PoolCommands
|
||||
service = designate.manage.service:ServiceCommands
|
||||
tlds = designate.manage.tlds:TLDCommands
|
||||
|
||||
designate.producer_tasks =
|
||||
|
||||
Reference in New Issue
Block a user