diff --git a/oslo_service/backend/__init__.py b/oslo_service/backend/__init__.py index c42c91fe..81abb16b 100644 --- a/oslo_service/backend/__init__.py +++ b/oslo_service/backend/__init__.py @@ -12,12 +12,14 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. + from __future__ import annotations import enum import importlib import logging from typing import Any +from typing import Callable from typing import TYPE_CHECKING from . import exceptions @@ -35,31 +37,61 @@ class BackendType(enum.Enum): DEFAULT_BACKEND_TYPE = BackendType.EVENTLET -_cached_backend_type: BackendType | None = None # backend type -_cached_backend: BaseBackend | None = None # current backend -_cached_components: dict[str, Any] | None = None # backend components +_cached_backend_type: BackendType | None = None +_cached_backend: BaseBackend | None = None +_cached_components: dict[str, Any] | None = None + +# optional override hook +_backend_hook: Callable[[], BackendType] | None = None + + +def register_backend_default_hook(hook: Callable[[], BackendType]) -> None: + """Register a hook that decides the default backend type. + + This is used when no init_backend() call has been made, and + get_backend() is called. The hook will be invoked to determine + the backend type to use *only if* no backend has been selected yet. 
+ + Example: + + def my_hook(): + return BackendType.THREADING + + register_backend_default_hook(my_hook) + + :param hook: A callable that returns a BackendType + """ + global _backend_hook + _backend_hook = hook def _reset_backend() -> None: - """used by test functions to reset the selected backend""" + """Used by test functions to reset the selected backend.""" + + global _cached_backend, _cached_components + global _cached_backend_type, _backend_hook - global _cached_backend, _cached_components, _cached_backend_type _cached_backend_type = _cached_backend = _cached_components = None + _backend_hook = None def init_backend(type_: BackendType) -> None: - """establish which backend will be used when get_backend() is called""" + """Establish which backend will be used when get_backend() is called.""" global _cached_backend, _cached_components, _cached_backend_type if _cached_backend_type is not None: - raise exceptions.BackendAlreadySelected( - f"The {_cached_backend_type.value!r} backend is already set up" - ) + if _cached_backend_type != type_: + raise exceptions.BackendAlreadySelected( + f"Backend already set to {_cached_backend_type.value!r}," + f" cannot reinitialize with {type_.value!r}" + ) + + return # already initialized with same value; no-op backend_name = type_.value - LOG.info(f"Loading backend: {backend_name}") + try: module_name = f"oslo_service.backend.{backend_name}" module = importlib.import_module(module_name) @@ -81,17 +113,33 @@ def init_backend(type_: BackendType) -> None: def get_backend() -> BaseBackend: - """Load backend dynamically based on the default constant.""" + """Load backend dynamically based on the default constant or hook.""" global _cached_backend if _cached_backend is None: - init_backend(DEFAULT_BACKEND_TYPE) + type_ = DEFAULT_BACKEND_TYPE + + if _backend_hook is not None: + try: + type_ = _backend_hook() + LOG.info(f"Backend hook selected: {type_.value}") + except Exception: + LOG.exception( + "Backend hook raised an 
exception." + " Falling back to default.") + + init_backend(type_) assert _cached_backend is not None # nosec B101 : this is for typing return _cached_backend +def get_backend_type() -> BackendType | None: + """Return the type of the current backend, or None if not set.""" + return _cached_backend_type + + def get_component(name: str) -> Any: """Retrieve a specific component from the backend.""" global _cached_components @@ -103,4 +151,5 @@ def get_component(name: str) -> Any: if name not in _cached_components: raise KeyError(f"Component {name!r} not found in backend.") + return _cached_components[name] diff --git a/oslo_service/backend/base.py b/oslo_service/backend/base.py index 82f7c07d..3be64763 100644 --- a/oslo_service/backend/base.py +++ b/oslo_service/backend/base.py @@ -15,17 +15,16 @@ from __future__ import annotations -from abc import ABC -from abc import abstractmethod +import abc from typing import Any from oslo_service.backend.exceptions import BackendComponentNotAvailable -class BaseBackend(ABC): +class BaseBackend(abc.ABC): """Base class for all backend implementations.""" - @abstractmethod + @abc.abstractmethod def get_service_components(self) -> dict[str, Any]: """Return the backend components. @@ -42,6 +41,29 @@ class BaseBackend(ABC): pass +class ServiceBase(metaclass=abc.ABCMeta): + """Base class for all services.""" + + @abc.abstractmethod + def start(self): + """Start service.""" + + @abc.abstractmethod + def stop(self): + """Stop service.""" + + @abc.abstractmethod + def wait(self): + """Wait for service to complete.""" + + @abc.abstractmethod + def reset(self): + """Reset service. + + Called in case a service running in daemon mode receives SIGHUP. + """ + + class ComponentRegistry: """A registry to manage access to backend components. @@ -49,12 +71,13 @@ class ComponentRegistry: raises an explicit error, improving clarity and debugging. It acts as a centralized registry for backend components. 
""" + def __init__(self, components): """Initialize the registry with a dictionary of components. - :param components: A dictionary containing backend components, where - the keys are component names and the values are - the respective implementations. + :param components: A dictionary containing backend components, + where the keys are component names and the values are the + respective implementations. """ self._components = components @@ -62,8 +85,8 @@ class ComponentRegistry: """Retrieve a component by its key from the registry. :param key: The name of the component to retrieve. - :raises NotImplementedError: If the component is - not registered or available. + :raises NotImplementedError: If the component is not registered + or available. :return: The requested component instance. """ if key not in self._components or self._components[key] is None: @@ -75,7 +98,7 @@ class ComponentRegistry: """Check if a component is registered and available. :param key: The name of the component to check. - :return: True if the component is registered - and available, False otherwise. + :return: True if the component is registered and available, + False otherwise. """ return key in self._components and self._components[key] is not None diff --git a/oslo_service/backend/common/__init__.py b/oslo_service/backend/common/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/oslo_service/backend/common/constants.py b/oslo_service/backend/common/constants.py new file mode 100644 index 00000000..3d9b04d6 --- /dev/null +++ b/oslo_service/backend/common/constants.py @@ -0,0 +1,16 @@ +# Copyright (C) 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Restart method options +_LAUNCHER_RESTART_METHODS = ['reload', 'mutate'] diff --git a/oslo_service/backend/common/daemon_utils.py b/oslo_service/backend/common/daemon_utils.py new file mode 100644 index 00000000..06a67aba --- /dev/null +++ b/oslo_service/backend/common/daemon_utils.py @@ -0,0 +1,33 @@ +# Copyright (C) 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import errno +import io +import os +import signal +import sys + + +def is_daemon(): + try: + return os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno()) + except io.UnsupportedOperation: + return True + except OSError as err: + return err.errno == errno.ENOTTY or False + + +def is_sighup_and_daemon(signo, signal_handler): + return (signal_handler.is_signal_supported('SIGHUP') and + signo == signal.SIGHUP and is_daemon()) diff --git a/oslo_service/backend/common/signal_utils.py b/oslo_service/backend/common/signal_utils.py new file mode 100644 index 00000000..4569fc8c --- /dev/null +++ b/oslo_service/backend/common/signal_utils.py @@ -0,0 +1,38 @@ +# Copyright (C) 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import signal + + +def get_signal_mappings(ignore=('SIG_DFL', 'SIG_IGN')): + signals_by_name = { + name: getattr(signal, name) + for name in dir(signal) + if name.startswith('SIG') and name not in ignore + } + signals_to_name = {v: k for k, v in signals_by_name.items()} + + return signals_by_name, signals_to_name + + +class SignalExit(SystemExit): + """Raised to indicate a signal-based exit. + + This exception is commonly raised when the process receives a termination + signal (e.g., SIGTERM, SIGINT). The signal number is stored in `signo`. 
+ """ + + def __init__(self, signo, exccode=1): + super().__init__(exccode) + self.signo = signo diff --git a/oslo_service/backend/common/singleton.py b/oslo_service/backend/common/singleton.py new file mode 100644 index 00000000..2b2909fc --- /dev/null +++ b/oslo_service/backend/common/singleton.py @@ -0,0 +1,26 @@ +# Copyright (C) 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_concurrency import lockutils + + +class Singleton(type): + _instances = {} + _semaphores = lockutils.Semaphores() + + def __call__(cls, *args, **kwargs): + with lockutils.lock('singleton_lock', semaphores=cls._semaphores): + if cls not in cls._instances: + cls._instances[cls] = super().__call__(*args, **kwargs) + return cls._instances[cls] diff --git a/oslo_service/backend/common/validation_utils.py b/oslo_service/backend/common/validation_utils.py new file mode 100644 index 00000000..bca18237 --- /dev/null +++ b/oslo_service/backend/common/validation_utils.py @@ -0,0 +1,23 @@ +# Copyright (C) 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_service._i18n import _ +from oslo_service.backend.base import ServiceBase + + +def check_service_base(service): + if not isinstance(service, ServiceBase): + raise TypeError( + _("Service %(service)s must be an instance of %(base)s!") + % {'service': service, 'base': ServiceBase}) diff --git a/oslo_service/backend/eventlet/__init__.py b/oslo_service/backend/eventlet/__init__.py index aa81605f..7b014f71 100644 --- a/oslo_service/backend/eventlet/__init__.py +++ b/oslo_service/backend/eventlet/__init__.py @@ -15,6 +15,9 @@ from oslo_service.backend.base import BaseBackend +from oslo_service.backend.common import daemon_utils +from oslo_service.backend.common import signal_utils +from oslo_service.backend.common import singleton from oslo_service.backend.eventlet import loopingcall from oslo_service.backend.eventlet import service from oslo_service.backend.eventlet import threadgroup @@ -37,8 +40,8 @@ class EventletBackend(BaseBackend): "Services": service.Services, "ServiceWrapper": service.ServiceWrapper, "SignalHandler": service.SignalHandler, - "SignalExit": service.SignalExit, - "Singleton": service.Singleton, + "SignalExit": signal_utils.SignalExit, + "Singleton": singleton.Singleton, # Looping call-related classes "LoopingCallBase": loopingcall.LoopingCallBase, @@ -57,6 +60,6 @@ class EventletBackend(BaseBackend): # Functions "launch": service.launch, - "_is_daemon": service._is_daemon, - "_is_sighup_and_daemon": service._is_sighup_and_daemon, + "_is_daemon": daemon_utils.is_daemon, + "_is_sighup_and_daemon": daemon_utils.is_sighup_and_daemon, } diff --git a/oslo_service/backend/eventlet/loopingcall.py b/oslo_service/backend/eventlet/loopingcall.py index ba271d03..dfb29871 100644 --- a/oslo_service/backend/eventlet/loopingcall.py +++ b/oslo_service/backend/eventlet/loopingcall.py @@ -37,13 +37,13 @@ LOG = logging.getLogger(__name__) class 
LoopingCallDone(Exception): """Exception to break out and stop a LoopingCallBase. - The poll-function passed to LoopingCallBase can raise this exception to - break out of the loop normally. This is somewhat analogous to + The poll-function passed to LoopingCallBase can raise this exception + to break out of the loop normally. This is somewhat analogous to StopIteration. - An optional return-value can be included as the argument to the exception; - this return-value will be returned by LoopingCallBase.wait() - + An optional return-value can be included as the argument to the + exception; this return-value will be returned by + LoopingCallBase.wait() """ def __init__(self, retvalue=True): @@ -109,16 +109,17 @@ class LoopingCallBase: self._abort.wait(timeout) def _start(self, idle_for, initial_delay=None, stop_on_exception=True): - """Start the looping + """Start the looping. - :param idle_for: Callable that takes two positional arguments, returns - how long to idle for. The first positional argument is - the last result from the function being looped and the - second positional argument is the time it took to - calculate that result. - :param initial_delay: How long to delay before starting the looping. - Value is in seconds. - :param stop_on_exception: Whether to stop if an exception occurs. + :param idle_for: Callable that takes two positional arguments, + returns how long to idle for. The first positional argument + is the last result from the function being looped and the + second positional argument is the time it took to calculate + that result. + :param initial_delay: How long to delay before starting the + looping. Value is in seconds. + :param stop_on_exception: Whether to stop if an exception + occurs. :returns: eventlet event instance """ if self._thread is not None: @@ -359,33 +360,33 @@ class BackOffLoopingCall(LoopingCallBase): class RetryDecorator: """Decorator for retrying a function upon suggested exceptions. 
- The decorated function is retried for the given number of times, and the - sleep time between the retries is incremented until max sleep time is - reached. If the max retry count is set to -1, then the decorated function - is invoked indefinitely until an exception is thrown, and the caught - exception is not in the list of suggested exceptions. + The decorated function is retried for the given number of times, and + the sleep time between the retries is incremented until max sleep + time is reached. If the max retry count is set to -1, then the + decorated function is invoked indefinitely until an exception is + thrown, and the caught exception is not in the list of suggested + exceptions. """ def __init__(self, max_retry_count=-1, inc_sleep_time=10, max_sleep_time=60, exceptions=()): """Configure the retry object using the input params. - :param max_retry_count: maximum number of times the given function must - be retried when one of the input 'exceptions' - is caught. When set to -1, it will be retried - indefinitely until an exception is thrown - and the caught exception is not in param - exceptions. - :param inc_sleep_time: incremental time in seconds for sleep time - between retries - :param max_sleep_time: max sleep time in seconds beyond which the sleep - time will not be incremented using param - inc_sleep_time. On reaching this threshold, - max_sleep_time will be used as the sleep time. - :param exceptions: suggested exceptions for which the function must be - retried, if no exceptions are provided (the default) - then all exceptions will be reraised, and no - retrying will be triggered. + :param max_retry_count: maximum number of times the given + function must be retried when one of the input 'exceptions' + is caught. When set to -1, it will be retried indefinitely + until an exception is thrown and the caught exception is not + in param exceptions. 
+ :param inc_sleep_time: incremental time in seconds for sleep + time between retries + :param max_sleep_time: max sleep time in seconds beyond which + the sleep time will not be incremented using param + inc_sleep_time. On reaching this threshold, max_sleep_time + will be used as the sleep time. + :param exceptions: suggested exceptions for which the function + must be retried, if no exceptions are provided (the default) + then all exceptions will be reraised, and no retrying will + be triggered. """ self._max_retry_count = max_retry_count self._inc_sleep_time = inc_sleep_time diff --git a/oslo_service/backend/eventlet/service.py b/oslo_service/backend/eventlet/service.py index 75ee86de..9aec56b1 100644 --- a/oslo_service/backend/eventlet/service.py +++ b/oslo_service/backend/eventlet/service.py @@ -14,17 +14,13 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - - """Generic Node base class for all workers that run on hosts.""" -import abc import collections import errno import functools import gc import inspect -import io import logging import os import random @@ -36,9 +32,21 @@ import eventlet from eventlet import event from eventlet import tpool -from oslo_concurrency import lockutils from oslo_service._i18n import _ from oslo_service import _options +from oslo_service.backend.base import ServiceBase +from oslo_service.backend.common.constants import \ + _LAUNCHER_RESTART_METHODS +from oslo_service.backend.common.daemon_utils import \ + is_sighup_and_daemon as _is_sighup_and_daemon +from oslo_service.backend.common.signal_utils import \ + get_signal_mappings +from oslo_service.backend.common.signal_utils import \ + SignalExit +from oslo_service.backend.common.singleton import \ + Singleton +from oslo_service.backend.common.validation_utils import \ + check_service_base as _check_service_base from oslo_service.backend.eventlet import 
threadgroup from oslo_service import eventlet_backdoor from oslo_service import systemd @@ -46,78 +54,6 @@ from oslo_service import systemd LOG = logging.getLogger(__name__) -_LAUNCHER_RESTART_METHODS = ['reload', 'mutate'] - - -def _is_daemon(): - # The process group for a foreground process will match the - # process group of the controlling terminal. If those values do - # not match, or ioctl() fails on the stdout file handle, we assume - # the process is running in the background as a daemon. - # http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics - try: - is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno()) - except io.UnsupportedOperation: - # Could not get the fileno for stdout, so we must be a daemon. - is_daemon = True - except OSError as err: - if err.errno == errno.ENOTTY: - # Assume we are a daemon because there is no terminal. - is_daemon = True - else: - raise - return is_daemon - - -def _is_sighup_and_daemon(signo): - if not (SignalHandler().is_signal_supported('SIGHUP') and - signo == signal.SIGHUP): - # Avoid checking if we are a daemon, because the signal isn't - # SIGHUP. - return False - return _is_daemon() - - -def _check_service_base(service): - if not isinstance(service, ServiceBase): - raise TypeError(_("Service %(service)s must an instance of %(base)s!") - % {'service': service, 'base': ServiceBase}) - - -class ServiceBase(metaclass=abc.ABCMeta): - """Base class for all services.""" - - @abc.abstractmethod - def start(self): - """Start service.""" - - @abc.abstractmethod - def stop(self): - """Stop service.""" - - @abc.abstractmethod - def wait(self): - """Wait for service to complete.""" - - @abc.abstractmethod - def reset(self): - """Reset service. - - Called in case service running in daemon mode receives SIGHUP. 
- """ - - -class Singleton(type): - _instances = {} - _semaphores = lockutils.Semaphores() - - def __call__(cls, *args, **kwargs): - with lockutils.lock('singleton_lock', semaphores=cls._semaphores): - if cls not in cls._instances: - cls._instances[cls] = super().__call__( - *args, **kwargs) - return cls._instances[cls] - class SignalHandler(metaclass=Singleton): @@ -128,14 +64,7 @@ class SignalHandler(metaclass=Singleton): # Map all signal names to signal integer values and create a # reverse mapping (for easier + quick lookup). - self._ignore_signals = ('SIG_DFL', 'SIG_IGN') - self._signals_by_name = {name: getattr(signal, name) - for name in dir(signal) - if name.startswith("SIG") and - name not in self._ignore_signals} - self.signals_to_name = { - sigval: name - for (name, sigval) in self._signals_by_name.items()} + self._signals_by_name, self.signals_to_name = get_signal_mappings() self._signal_handlers = collections.defaultdict(list) self.clear() @@ -192,8 +121,8 @@ class SignalHandler(metaclass=Singleton): def __setup_signal_interruption(self): """Set up to do the Right Thing with signals during poll() and sleep(). - Deal with the changes introduced in PEP 475 that prevent a signal from - interrupting eventlet's call to poll() or sleep(). + Deal with the changes introduced in PEP 475 that prevent a + signal from interrupting eventlet's call to poll() or sleep(). """ select_module = eventlet.patcher.original('select') self.__force_interrupt_on_signal = hasattr(select_module, 'poll') @@ -245,10 +174,9 @@ class Launcher: """Initialize the service launcher. :param restart_method: If 'reload', calls reload_config_files on - SIGHUP. If 'mutate', calls mutate_config_files on SIGHUP. Other - values produce a ValueError. + SIGHUP. If 'mutate', calls mutate_config_files on SIGHUP. + Other values produce a ValueError. :returns: None - """ self.conf = conf conf.register_opts(_options.service_opts) @@ -266,7 +194,6 @@ class Launcher: ProcessLauncher.launch_service. 
It must be None, 1 or omitted. :returns: None - """ if workers is not None and workers != 1: raise ValueError(_("Launcher asked to start multiple workers")) @@ -278,7 +205,6 @@ class Launcher: """Stop all services which are currently running. :returns: None - """ self.services.stop() @@ -286,7 +212,6 @@ class Launcher: """Wait until all services have been stopped, and then return. :returns: None - """ self.services.wait() @@ -294,7 +219,7 @@ class Launcher: """Reload config files and restart service. :returns: The return value from reload_config_files or - mutate_config_files, according to the restart_method. + mutate_config_files, according to the restart_method. """ if self.restart_method == 'reload': self.conf.reload_config_files() @@ -303,14 +228,9 @@ class Launcher: self.services.restart() -class SignalExit(SystemExit): - def __init__(self, signo, exccode=1): - super().__init__(exccode) - self.signo = signo - - class ServiceLauncher(Launcher): """Runs one or more service in a parent process.""" + def __init__(self, conf, restart_method='reload'): """Constructor. @@ -381,7 +301,7 @@ class ServiceLauncher(Launcher): while True: self.handle_signal() status, signo = self._wait_for_exit_or_signal() - if not _is_sighup_and_daemon(signo): + if not _is_sighup_and_daemon(signo, SignalHandler()): break self.restart() @@ -405,10 +325,10 @@ class ProcessLauncher: :param conf: an instance of ConfigOpts :param wait_interval: The interval to sleep for between checks - of child process exit. + of child process exit. :param restart_method: If 'reload', calls reload_config_files on - SIGHUP. If 'mutate', calls mutate_config_files on SIGHUP. Other - values produce a ValueError. + SIGHUP. If 'mutate', calls mutate_config_files on SIGHUP. + Other values produce a ValueError. 
""" self.conf = conf conf.register_opts(_options.service_opts) @@ -563,7 +483,7 @@ class ProcessLauncher: self._child_process_handle_signal() status, signo = self._child_wait_for_exit_or_signal( self.launcher) - if not _is_sighup_and_daemon(signo): + if not _is_sighup_and_daemon(signo, SignalHandler()): self.launcher.wait() break self.launcher.restart() @@ -580,10 +500,10 @@ class ProcessLauncher: def launch_service(self, service, workers=1): """Launch a service with a given number of workers. - :param service: a service to launch, must be an instance of - :class:`oslo_service.service.ServiceBase` - :param workers: a number of processes in which a service - will be running + :param service: a service to launch, must be an instance of + :class:`oslo_service.service.ServiceBase` + :param workers: a number of processes in which a service + will be running """ _check_service_base(service) wrap = ServiceWrapper(service, workers) @@ -657,7 +577,7 @@ class ProcessLauncher: signame = self.signal_handler.signals_to_name[self.sigcaught] LOG.info('Caught %s, stopping children', signame) - if not _is_sighup_and_daemon(self.sigcaught): + if not _is_sighup_and_daemon(self.sigcaught, SignalHandler()): break child_signal = signal.SIGTERM @@ -725,8 +645,8 @@ class Service(ServiceBase): def stop(self, graceful=False): """Stop a service. 
- :param graceful: indicates whether to wait for all threads to finish - or terminate them instantly + :param graceful: indicates whether to wait for all threads to + finish or terminate them instantly """ self.tg.stop(graceful) @@ -798,7 +718,6 @@ class Services: :param service: service to run :param done: event to wait on until a shutdown is triggered :returns: None - """ try: service.start() diff --git a/oslo_service/backend/eventlet/threadgroup.py b/oslo_service/backend/eventlet/threadgroup.py index 3e7d71ef..a1dcc234 100644 --- a/oslo_service/backend/eventlet/threadgroup.py +++ b/oslo_service/backend/eventlet/threadgroup.py @@ -39,10 +39,11 @@ def _on_thread_done(_greenthread, group, thread): class Thread: """Wrapper around a greenthread. - Holds a reference to the :class:`ThreadGroup`. The Thread will notify - the :class:`ThreadGroup` when it has done so it can be removed from - the threads list. + Holds a reference to the :class:`ThreadGroup`. The Thread will notify + the :class:`ThreadGroup` when it has done so it can be removed from + the threads list. """ + def __init__(self, thread, group, link=True): self.thread = thread if link: @@ -91,8 +92,8 @@ class ThreadGroup: def __init__(self, thread_pool_size=10): """Create a ThreadGroup with a pool of greenthreads. - :param thread_pool_size: the maximum number of threads allowed to run - concurrently. + :param thread_pool_size: the maximum number of threads allowed + to run concurrently. """ self.pool = greenpool.GreenPool(thread_pool_size) self.threads = [] @@ -244,8 +245,8 @@ class ThreadGroup: def thread_done(self, thread): """Remove a completed thread from the group. - This method is automatically called on completion of a thread in the - group, and should not be called explicitly. + This method is automatically called on completion of a thread in + the group, and should not be called explicitly. 
""" self.threads.remove(thread) diff --git a/oslo_service/backend/exceptions.py b/oslo_service/backend/exceptions.py index 6c3e3cee..f718615a 100644 --- a/oslo_service/backend/exceptions.py +++ b/oslo_service/backend/exceptions.py @@ -15,7 +15,7 @@ class BackendAlreadySelected(Exception): - """raised when init_backend() is called more than once""" + """Raised when init_backend() is called more than once.""" pass diff --git a/oslo_service/backend/threading/__init__.py b/oslo_service/backend/threading/__init__.py new file mode 100644 index 00000000..b8b53f9d --- /dev/null +++ b/oslo_service/backend/threading/__init__.py @@ -0,0 +1,60 @@ +# Copyright (C) 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from oslo_service.backend.base import BaseBackend +from oslo_service.backend.common import daemon_utils +from oslo_service.backend.common import signal_utils +from oslo_service.backend.common import singleton +from oslo_service.backend.threading import loopingcall +from oslo_service.backend.threading import service +from oslo_service.backend.threading import threadgroup + + +class ThreadingBackend(BaseBackend): + """Backend implementation using Python threading and Cotyledon.""" + + @staticmethod + def get_service_components(): + """Return the components provided by the Threading backend.""" + + return { + # Service-related classes + "ServiceBase": service.Service, + "ServiceLauncher": service.ProcessLauncher, + "Launcher": service.ProcessLauncher, + "ProcessLauncher": service.ProcessLauncher, + "Service": service.Service, + "Services": service.Services, + "ServiceWrapper": service.ServiceWrapper, + "SignalExit": signal_utils.SignalExit, + "SignalHandler": service.SignalHandler, + "Singleton": singleton.Singleton, + # Looping call-related classes + "LoopingCallBase": loopingcall.LoopingCallBase, + "LoopingCallDone": loopingcall.LoopingCallDone, + "LoopingCallTimeOut": loopingcall.LoopingCallTimeOut, + "FixedIntervalLoopingCall": loopingcall.FixedIntervalLoopingCall, + "FixedIntervalWithTimeoutLoopingCall": + loopingcall.FixedIntervalWithTimeoutLoopingCall, + "DynamicLoopingCall": loopingcall.DynamicLoopingCall, + "BackOffLoopingCall": loopingcall.BackOffLoopingCall, + "RetryDecorator": loopingcall.RetryDecorator, + # Thread group-related classes + "ThreadGroup": threadgroup.ThreadGroup, + "Thread": threadgroup.Thread, + # Functions + "_is_daemon": daemon_utils.is_daemon, + "_is_sighup_and_daemon": daemon_utils.is_sighup_and_daemon, + "launch": service.launch, + } diff --git a/oslo_service/backend/threading/loopingcall.py b/oslo_service/backend/threading/loopingcall.py new file mode 100644 index 00000000..7abad429 --- /dev/null +++ 
b/oslo_service/backend/threading/loopingcall.py @@ -0,0 +1,403 @@ +# Copyright (C) 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import functools +import random +import sys +import threading +import time + +import futurist +from oslo_log import log as logging +from oslo_utils import excutils +from oslo_utils import reflection +from oslo_utils import timeutils + +from oslo_service._i18n import _ + +LOG = logging.getLogger(__name__) + + +class LoopingCallDone(Exception): + """Exception to break out and stop a LoopingCallBase. + + The function passed to a looping call may raise this exception to + break out of the loop normally. An optional return value may be + provided; this value will be returned by LoopingCallBase.wait(). 
+ """ + + def __init__(self, retvalue=True): + """:param retvalue: Value that LoopingCallBase.wait() should return.""" + self.retvalue = retvalue + + +class LoopingCallTimeOut(Exception): + """Exception raised when a LoopingCall times out.""" + pass + + +class FutureEvent: + """A simple event object that can carry a result or an exception.""" + + def __init__(self): + self._event = threading.Event() + self._result = None + self._exc_info = None + + def send(self, result): + self._result = result + self._event.set() + + def send_exception(self, exc_type, exc_value, tb): + self._exc_info = (exc_type, exc_value, tb) + self._event.set() + + def wait(self, timeout=None): + flag = self._event.wait(timeout) + + if not flag: + raise RuntimeError("Timed out waiting for event") + + if self._exc_info: + exc_type, exc_value, tb = self._exc_info + raise exc_value.with_traceback(tb) + return self._result + + +def _safe_wrapper(f, kind, func_name): + """Wrapper that calls the wrapped function and logs errors as needed.""" + + def func(*args, **kwargs): + try: + return f(*args, **kwargs) + except LoopingCallDone: + raise # Let the outer handler process this + except Exception: + LOG.error('%(kind)s %(func_name)r failed', + {'kind': kind, 'func_name': func_name}, + exc_info=True) + return 0 + + return func + + +class LoopingCallBase: + _KIND = _("Unknown looping call") + _RUN_ONLY_ONE_MESSAGE = _( + "A looping call can only run one function at a time") + + def __init__(self, f=None, *args, **kwargs): + self.args = args + self.kwargs = kwargs + self.f = f + self._future = None + self.done = None + self._abort = threading.Event() # When set, the loop stops + + @property + def _running(self): + return not self._abort.is_set() + + def stop(self): + if self._running: + self._abort.set() + + def wait(self): + """Wait for the looping call to complete and return its result.""" + return self.done.wait() + + def _on_done(self, future): + self._future = None + + def _sleep(self, timeout): + # 
Instead of eventlet.sleep, we wait on the abort event for timeout + # seconds. + self._abort.wait(timeout) + + def _start(self, idle_for, initial_delay=None, stop_on_exception=True): + """Start the looping call. + + :param idle_for: Callable taking two arguments (last result, + elapsed time) and returning how long to idle. + :param initial_delay: Delay (in seconds) before starting the + loop. + :param stop_on_exception: Whether to stop on exception. + :returns: A FutureEvent instance. + """ + + if self._future is not None: + raise RuntimeError(self._RUN_ONLY_ONE_MESSAGE) + + self.done = FutureEvent() + self._abort.clear() + + def _run_loop(): + kind = self._KIND + func_name = reflection.get_callable_name(self.f) + func = self.f if stop_on_exception else _safe_wrapper(self.f, kind, + func_name) + if initial_delay: + self._sleep(initial_delay) + try: + watch = timeutils.StopWatch() + + while self._running: + watch.restart() + result = func(*self.args, **self.kwargs) + watch.stop() + + if not self._running: + break + + idle = idle_for(result, watch.elapsed()) + LOG.debug( + '%(kind)s %(func_name)r sleeping for %(idle).02f' + ' seconds', + {'func_name': func_name, 'idle': idle, 'kind': kind}) + self._sleep(idle) + except LoopingCallDone as e: + self.done.send(e.retvalue) + except Exception: + exc_info = sys.exc_info() + try: + LOG.error('%(kind)s %(func_name)r failed', + {'kind': kind, 'func_name': func_name}, + exc_info=exc_info) + self.done.send_exception(*exc_info) + finally: + del exc_info + return + else: + self.done.send(True) + + # Use futurist's ThreadPoolExecutor to run the loop in a background + # thread. 
+ executor = futurist.ThreadPoolExecutor(max_workers=1) + self._future = executor.submit(_run_loop) + self._future.add_done_callback(self._on_done) + return self.done + + # NOTE: _elapsed() is a thin wrapper for StopWatch.elapsed() + def _elapsed(self, watch): + return watch.elapsed() + + +class FixedIntervalLoopingCall(LoopingCallBase): + """A fixed interval looping call.""" + _RUN_ONLY_ONE_MESSAGE = _( + "A fixed interval looping call can only run one function at a time") + _KIND = _('Fixed interval looping call') + + def start(self, interval, initial_delay=None, stop_on_exception=True): + def _idle_for(result, elapsed): + delay = round(elapsed - interval, 2) + if delay > 0: + func_name = reflection.get_callable_name(self.f) + LOG.warning( + 'Function %(func_name)r run outlasted interval by' + ' %(delay).2f sec', + {'func_name': func_name, 'delay': delay}) + return -delay if delay < 0 else 0 + + return self._start(_idle_for, initial_delay=initial_delay, + stop_on_exception=stop_on_exception) + + +class FixedIntervalWithTimeoutLoopingCall(LoopingCallBase): + """A fixed interval looping call with timeout checking.""" + _RUN_ONLY_ONE_MESSAGE = _( + "A fixed interval looping call with timeout checking" + " can only run one function at a time") + _KIND = _('Fixed interval looping call with timeout checking.') + + def start(self, interval, initial_delay=None, stop_on_exception=True, + timeout=0): + start_time = time.time() + + def _idle_for(result, elapsed): + delay = round(elapsed - interval, 2) + if delay > 0: + func_name = reflection.get_callable_name(self.f) + LOG.warning( + 'Function %(func_name)r run outlasted interval by' + ' %(delay).2f sec', + {'func_name': func_name, 'delay': delay}) + elapsed_time = time.time() - start_time + if timeout > 0 and elapsed_time > timeout: + raise LoopingCallTimeOut( + _('Looping call timed out after %.02f seconds') + % elapsed_time) + + return -delay if delay < 0 else 0 + + return self._start(_idle_for, 
initial_delay=initial_delay, + stop_on_exception=stop_on_exception) + + +class DynamicLoopingCall(LoopingCallBase): + """A looping call which sleeps until the next known event. + + The function called should return how long to sleep before being + called again. + """ + + _RUN_ONLY_ONE_MESSAGE = _( + "A dynamic interval looping call can only run one function at a time") + _TASK_MISSING_SLEEP_VALUE_MESSAGE = _( + "A dynamic interval looping call should supply either an interval or" + " periodic_interval_max" + ) + _KIND = _('Dynamic interval looping call') + + def start(self, initial_delay=None, periodic_interval_max=None, + stop_on_exception=True): + def _idle_for(suggested_delay, elapsed): + delay = suggested_delay + if delay is None: + if periodic_interval_max is not None: + delay = periodic_interval_max + else: + raise RuntimeError(self._TASK_MISSING_SLEEP_VALUE_MESSAGE) + else: + if periodic_interval_max is not None: + delay = min(delay, periodic_interval_max) + return delay + + return self._start(_idle_for, initial_delay=initial_delay, + stop_on_exception=stop_on_exception) + + +class BackOffLoopingCall(LoopingCallBase): + """Run a method in a loop with backoff on error. + + The provided function should return True (indicating success, which resets + the backoff interval), False (indicating an error, triggering a backoff), + or raise LoopingCallDone(retvalue=...) to quit the loop. + """ + + _RNG = random.SystemRandom() + _KIND = _('Dynamic backoff interval looping call') + _RUN_ONLY_ONE_MESSAGE = _( + "A dynamic backoff interval looping call can only run one function at" + " a time") + + def __init__(self, f=None, *args, **kwargs): + super().__init__(f, *args, **kwargs) # f=f would clash with *args + self._error_time = 0 + self._interval = 1 + + def start(self, initial_delay=None, starting_interval=1, timeout=300, + max_interval=300, jitter=0.75, min_interval=0.001): + if self._future is not None: + raise RuntimeError(self._RUN_ONLY_ONE_MESSAGE) + # Reset state.
+ self._error_time = 0 + self._interval = starting_interval + + def _idle_for(success, _elapsed): + random_jitter = abs(self._RNG.gauss(jitter, 0.1)) + if success: + # Reset error state on success. + self._interval = starting_interval + self._error_time = 0 + return self._interval * random_jitter + else: + # Back off on error with jitter. + idle = max(self._interval * 2 * random_jitter, min_interval) + idle = min(idle, max_interval) + self._interval = max(self._interval * 2 * jitter, min_interval) + if timeout > 0 and self._error_time + idle > timeout: + raise LoopingCallTimeOut( + _('Looping call timed out after %.02f seconds') % ( + self._error_time + idle)) + self._error_time += idle + return idle + + return self._start(_idle_for, initial_delay=initial_delay) + + +class RetryDecorator: + """Decorator for retrying a function upon suggested exceptions. + + The decorated function is retried for the given number of times, + with an incrementally increasing sleep time between retries. A max + sleep time may be set. If max_retry_count is -1, the function is + retried indefinitely until an exception is raised that is not in the + suggested exceptions. + """ + + def __init__(self, max_retry_count=-1, inc_sleep_time=10, + max_sleep_time=60, exceptions=()): + """Document parameters for retry behavior. + + :param max_retry_count: Maximum number of retries for exceptions in + 'exceptions'. -1 means retry indefinitely. + :param inc_sleep_time: Incremental sleep time (seconds) between + retries. + :param max_sleep_time: Maximum sleep time (seconds). + :param exceptions: A tuple of exception types to catch for retries. 
+ """ + self._max_retry_count = max_retry_count + self._inc_sleep_time = inc_sleep_time + self._max_sleep_time = max_sleep_time + self._exceptions = exceptions + self._retry_count = 0 + self._sleep_time = 0 + + def __call__(self, f): + func_name = reflection.get_callable_name(f) + + def _func(*args, **kwargs): + try: + if self._retry_count: + LOG.debug( + "Invoking %(func_name)s; retry count is" + " %(retry_count)d.", + {'func_name': func_name, + 'retry_count': self._retry_count}) + result = f(*args, **kwargs) + except self._exceptions: + with excutils.save_and_reraise_exception() as ctxt: + LOG.debug( + "Exception in %(func_name)s occurred which is in the" + " retry list.", + {'func_name': func_name}) + if (self._max_retry_count != -1 and + self._retry_count >= self._max_retry_count): + LOG.debug( + "Cannot retry %(func_name)s because retry count" + " (%(retry_count)d) reached max" + " (%(max_retry_count)d).", + {'retry_count': self._retry_count, + 'max_retry_count': self._max_retry_count, + 'func_name': func_name}) + else: + ctxt.reraise = False + self._retry_count += 1 + self._sleep_time += self._inc_sleep_time + + return self._sleep_time + + raise LoopingCallDone(result) + + @functools.wraps(f) + def func(*args, **kwargs): + loop = DynamicLoopingCall(_func, *args, **kwargs) + evt = loop.start(periodic_interval_max=self._max_sleep_time) + LOG.debug("Waiting for function %s to return.", func_name) + + return evt.wait() + + return func diff --git a/oslo_service/backend/threading/service.py b/oslo_service/backend/threading/service.py new file mode 100644 index 00000000..dbe24f76 --- /dev/null +++ b/oslo_service/backend/threading/service.py @@ -0,0 +1,260 @@ +# Copyright (C) 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import logging +import signal +import sys +import threading +import traceback + +import cotyledon +from cotyledon import oslo_config_glue + +from oslo_service._i18n import _ +from oslo_service.backend.base import ServiceBase +from oslo_service.backend.common.constants import _LAUNCHER_RESTART_METHODS +from oslo_service.backend.common.signal_utils import get_signal_mappings +from oslo_service.backend.common.singleton import Singleton +from oslo_service.backend.common.validation_utils import \ + check_service_base as _check_service_base +from oslo_service.backend.threading import threadgroup + +LOG = logging.getLogger(__name__) + + +class SignalHandler(metaclass=Singleton): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._signals_by_name, self.signals_to_name = get_signal_mappings() + self._signal_handlers = collections.defaultdict(list) + self.clear() + + def clear(self): + for sig in list(self._signal_handlers.keys()): + signal.signal(sig, signal.SIG_DFL) + self._signal_handlers.clear() + + def add_handlers(self, signals, handler): + for sig in signals: + self.add_handler(sig, handler) + + def add_handler(self, sig, handler): + if not self.is_signal_supported(sig): + return + signo = self._signals_by_name[sig] + self._signal_handlers[signo].append(handler) + signal.signal(signo, self._handle_signal) + + def _handle_signal(self, signo, frame): + threading.Thread(target=self._handle_signal_cb, + args=(signo, frame)).start() + + def _handle_signal_cb(self, signo, frame): + for handler in 
reversed(self._signal_handlers[signo]): + handler(signo, frame) + + def is_signal_supported(self, sig_name): + return sig_name in self._signals_by_name + + +class ServiceWrapper(cotyledon.Service): + def __init__(self, worker_id, service_instance, **kwargs): + super().__init__(worker_id) + if not isinstance(service_instance, ServiceBase): + raise TypeError("Service must be an instance of ServiceBase") + self.service_instance = service_instance + + def run(self): + try: + self.service_instance.start() + self.service_instance.wait() + except Exception: + traceback.print_exc() + sys.exit(1) + + def terminate(self): + self.service_instance.stop() + + +class Launcher: + def __init__(self): + self._launcher = None + + def launch_service(self, service, workers=None): + _check_service_base(service) + if workers not in (None, 1): + raise NotImplementedError("Multiple workers is not supported.") + self._launcher = service + service.start() + return service + + def wait(self): + if self._launcher: + self._launcher.wait() + + def stop(self): + if self._launcher: + self._launcher.stop() + + def restart(self): + if self._launcher: + self._launcher.stop() + self._launcher.start() + + +class ServiceLauncher: + def __init__(self, conf, restart_method='reload'): + self.conf = conf + self.restart_method = restart_method + self.backdoor_port = None + self._manager = cotyledon.ServiceManager() + oslo_config_glue.setup(self._manager, conf) + + def launch_service(self, service_instance, workers=1): + _check_service_base(service_instance) + service_instance.backdoor_port = self.backdoor_port + if not isinstance(workers, int) or workers < 1: + raise ValueError("Number of workers must be >= 1") + self._manager.add(ServiceWrapper, workers, args=(service_instance,)) + + def stop(self): + self._manager._terminate(None, None) + + def wait(self): + try: + return self._manager.run() + except SystemExit as exc: + self.stop() + return exc.code + except BaseException: + self.stop() + 
LOG.exception("Unhandled exception") + return 2 + + +class Service(ServiceBase): + def __init__(self, threads=1000): + super().__init__() + self.tg = threadgroup.ThreadGroup(threads) + self.backdoor_port = None + + def reset(self): + pass + + def start(self): + pass + + def stop(self, graceful=False): + self.tg.stop(graceful) + + def wait(self): + self.tg.waitall() + + +class Services: + def __init__(self, restart_method='reload'): + if restart_method not in _LAUNCHER_RESTART_METHODS: + raise ValueError(_("Invalid restart_method: %s") % restart_method) + self.restart_method = restart_method + self.services = [] + self.tg = threadgroup.ThreadGroup() + self.done = threading.Event() + + def add(self, service): + self.services.append(service) + self.tg.add_thread(self.run_service, service, self.done) + + def stop(self): + for service in self.services: + service.stop() + if not self.done.is_set(): + self.done.set() + self.tg.stop() + + def wait(self): + for service in self.services: + service.wait() + self.tg.waitall() # ThreadGroup has no wait() + + def restart(self): + if self.restart_method == 'reload': + self.stop() + self.done = threading.Event() + + for restart_service in self.services: + restart_service.reset() + if self.restart_method == 'reload': + self.tg.add_thread( + self.run_service, restart_service, self.done) + + @staticmethod + def run_service(service, done): + try: + service.start() + except Exception: + LOG.exception('Error starting service thread.') + raise SystemExit(1) + else: + done.wait() + + +class ProcessLauncher: + def __init__(self, conf, restart_method='reload', no_fork=False): + self.conf = conf + self.restart_method = restart_method + self.no_fork = no_fork + self._manager = None + self._service_instance = None + + def launch_service(self, service, workers=1): + _check_service_base(service) + self._service_instance = service + + if self.no_fork: + LOG.warning("no_fork=True: running service in main process") + service.start() + service.wait() + return + + self._manager =
cotyledon.ServiceManager() + oslo_config_glue.setup(self._manager, self.conf) + self._manager.add(ServiceWrapper, workers, + args=(self._service_instance,)) + + def wait(self): + if self.no_fork: + return 0 + return self._manager.run() + + def stop(self): + LOG.info("Stopping service") + if self._manager: + self._manager._terminate(None, None) + + +def launch(conf, service, workers=1, restart_method='reload', no_fork=False): + if workers is not None and not isinstance(workers, int): + raise TypeError("Type of workers should be int!") + if workers is not None and workers <= 0: + raise ValueError("Number of workers should be positive!") + + if workers == 1 and not no_fork: + launcher = ServiceLauncher(conf, restart_method=restart_method) + else: + launcher = ProcessLauncher( + conf, restart_method=restart_method, no_fork=no_fork) + + launcher.launch_service(service, workers=workers) + return launcher diff --git a/oslo_service/backend/threading/threadgroup.py b/oslo_service/backend/threading/threadgroup.py new file mode 100644 index 00000000..b22bd445 --- /dev/null +++ b/oslo_service/backend/threading/threadgroup.py @@ -0,0 +1,233 @@ +# Copyright (C) 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import logging +import threading +import time +import warnings + +from oslo_service.backend.threading import loopingcall + +LOG = logging.getLogger(__name__) + +# --- New: Add a Thread wrapper to mimic the eventlet interface --- + + +class Thread: + """A simple wrapper around native threads. + + This class mimics the eventlet Thread interface (stop, wait, link, + cancel) for compatibility with oslo.service consumers. The methods + `stop`, `link`, and `cancel` are implemented as no-ops in the threading + backend since native Python threads do not support these operations + natively. + """ + + def __init__(self, thread, group=None, link=True): + self.thread = thread + self._ident = thread.ident + self.group = group + # Optionally, support for a link callback can be added here. + + @property + def ident(self): + return self._ident + + def stop(self): + # These methods are no-ops in the threading backend because native + # Python threads cannot be forcefully stopped or cancelled once + # started. They are kept here to preserve API compatibility with the + # eventlet backend, where these methods are implemented. + pass + + def wait(self): + self.thread.join() + + def link(self, func, *args, **kwargs): + # Optionally schedule a callback after thread completion. + pass + + def cancel(self, *throw_args): + # Optionally implement cancellation if required. + pass + +# --- End of new Thread wrapper --- + + +class ThreadGroup: + """A group of threads and timers similar to eventlet's GreenPool.""" + + def __init__(self, max_threads=1000): + self.max_threads = max_threads + self.threads = [] + self.timers = [] + self._lock = threading.Lock() + + def __getstate__(self): + # Exclude _lock from pickling. + state = self.__dict__.copy() + if '_lock' in state: + del state['_lock'] + return state + + def __setstate__(self, state): + self.__dict__.update(state) + # Recreate the lock after unpickling. 
+ self._lock = threading.Lock() + + def add_timer(self, delay, callback, *args, **kwargs): + if args or kwargs: + warnings.warn( + "Calling add_timer() with arguments is deprecated. Use " + "add_timer_args() instead.", + DeprecationWarning + ) + new_args = list(args) + if new_args and new_args[0] == delay: + new_args = new_args[1:] + return self.add_timer_args(delay, callback, new_args, kwargs) + + def add_timer_args(self, interval, callback, args=None, kwargs=None, + initial_delay=None, stop_on_exception=True): + args = args or [] + kwargs = kwargs or {} + pulse = loopingcall.FixedIntervalLoopingCall( + callback, *args, **kwargs) + pulse.start(interval=interval, + initial_delay=initial_delay, + stop_on_exception=stop_on_exception) + self._set_attr(pulse, '_running', True) + pulse.args = tuple(args) + pulse.kw = kwargs + with self._lock: + self.timers.append(pulse) + return pulse + + def add_dynamic_timer(self, callback, initial_delay, periodic_interval_max, + *args, **kwargs): + warnings.warn( + "Calling add_dynamic_timer() with arguments is deprecated. 
Use " + "add_dynamic_timer_args() instead.", + DeprecationWarning + ) + return self.add_dynamic_timer_args( + callback, list(args), kwargs, initial_delay=initial_delay, + periodic_interval_max=periodic_interval_max) + + def add_dynamic_timer_args(self, callback, args=None, kwargs=None, + initial_delay=None, periodic_interval_max=None, + stop_on_exception=True): + args = args or [] + kwargs = kwargs or {} + timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs) + timer.start(initial_delay=initial_delay, + periodic_interval_max=periodic_interval_max, + stop_on_exception=stop_on_exception) + self._set_attr(timer, '_running', True) + timer.args = tuple(args) + timer.kw = kwargs + with self._lock: + self.timers.append(timer) + return timer + + def add_thread(self, callback, *args, **kwargs): + with self._lock: + if len(self.threads) >= self.max_threads: + raise RuntimeError("Maximum number of threads reached") + + t = threading.Thread(target=callback, args=args, kwargs=kwargs) + t.args = args + t.kw = kwargs + + self.threads.append(t) + + t.start() + return Thread(t, group=self) + + def thread_done(self, thread): + with self._lock: + try: + self.threads.remove( + thread.thread if isinstance(thread, Thread) else thread) + except ValueError: + pass + + def timer_done(self, timer): + with self._lock: + try: + self.timers.remove(timer) + except ValueError: + pass + + def cancel(self, *throw_args, **kwargs): + pass + + def stop_timers(self): + with self._lock: + + for timer in self.timers: + timer.stop() + + self.timers = [] + + def stop(self, graceful=False): + self.stop_timers() + + if graceful: + self._wait_threads() + else: + self._stop_threads() + time.sleep(0.05) + + def _stop_threads(self): + current = threading.current_thread() + + with self._lock: + for t in self.threads: + if t is not current and hasattr(t, "abort"): + t.abort.set() + + self.threads = [t for t in self.threads if t is current] + + def _wait_threads(self): + current = 
threading.current_thread() + + with self._lock: + for t in self.threads: + if t is not current: + try: + t.join() + except Exception: + LOG.exception('Error waiting on thread.') + + self.threads = [t for t in self.threads if t is current] + + def waitall(self): + with self._lock: + threads_copy = list(self.threads) + timers_copy = list(self.timers) + + for t in threads_copy: + t.join() + + for timer in timers_copy: + timer.wait() + + def _set_attr(self, obj, attr, value): + try: + object.__setattr__(obj, attr, value) + except Exception: + if hasattr(obj, '__dict__'): + obj.__dict__[attr] = value diff --git a/oslo_service/tests/backend/tests/test_backend_init.py b/oslo_service/tests/backend/tests/test_backend_init.py index fe974c33..1643d3e8 100644 --- a/oslo_service/tests/backend/tests/test_backend_init.py +++ b/oslo_service/tests/backend/tests/test_backend_init.py @@ -35,34 +35,38 @@ class TestBackend(unittest.TestCase): self.assertEqual(backend.__class__.__name__, "EventletBackend") def test_init_backend_explicit(self): - """test that init_backend() can be called before get_backend()""" - + """Test that init_backend() can be called before get_backend().""" init_backend(BackendType.EVENTLET) - backend = get_backend() self.assertEqual(backend.__class__.__name__, "EventletBackend") def test_dont_reinit_backend_from_default(self): - """test that init_backend() can't be called after get_backend()""" - + """Fail if init_backend() called after get_backend() with another.""" get_backend() - with self.assertRaisesRegex( exceptions.BackendAlreadySelected, - "The 'eventlet' backend is already set up", + "Backend already set to 'eventlet'", ): - init_backend(BackendType.EVENTLET) + init_backend(BackendType.THREADING) def test_dont_reinit_backend_explicit_init(self): - """test that init_backend() can't be called twice""" - + """Fail if init_backend() called twice with different backend.""" init_backend(BackendType.EVENTLET) - with self.assertRaisesRegex( 
exceptions.BackendAlreadySelected, - "The 'eventlet' backend is already set up", + "Backend already set to 'eventlet'", ): + init_backend(BackendType.THREADING) + + def test_reinit_backend_same_type_is_noop(self): + """init_backend() with same type is a no-op.""" + init_backend(BackendType.EVENTLET) + try: init_backend(BackendType.EVENTLET) + except exceptions.BackendAlreadySelected: + self.fail( + "init_backend() should be a no-op if same type is passed" + ) def test_cached_backend(self): """Test backend is cached after initial load.""" @@ -82,13 +86,46 @@ class TestBackend(unittest.TestCase): def test_backend_components(self): """Test that components are cached when init_backend is called.""" - init_backend(BackendType.EVENTLET) - backend = get_backend() - self.assertTrue( {"ServiceBase", "ServiceLauncher"}.intersection( backend.get_service_components() ) ) + + def test_get_backend_type(self): + """Ensure get_backend_type() returns the selected backend.""" + self.assertIsNone(backend_module.get_backend_type()) + init_backend(BackendType.THREADING) + self.assertEqual( + backend_module.get_backend_type(), BackendType.THREADING) + + +class TestBackendHook(unittest.TestCase): + def setUp(self): + super().setUp() + backend_module._reset_backend() + + def test_hook_sets_default_backend_when_not_explicitly_initialized(self): + backend_module.register_backend_default_hook( + lambda: BackendType.THREADING) + result = backend_module.get_backend() + self.assertEqual( + backend_module._cached_backend_type, BackendType.THREADING) + self.assertIsNotNone(result) + self.assertIsNotNone(result.get_service_components()) + + def test_hook_is_ignored_if_backend_already_initialized(self): + backend_module.init_backend(BackendType.EVENTLET) + backend_module.register_backend_default_hook( + lambda: BackendType.THREADING) + self.assertEqual( + backend_module._cached_backend_type, BackendType.EVENTLET) + + def test_second_init_backend_raises_exception_even_with_hook(self): + 
backend_module.init_backend(BackendType.THREADING) + backend_module.register_backend_default_hook( + lambda: BackendType.EVENTLET) + with self.assertRaises(exceptions.BackendAlreadySelected): + backend_module.init_backend(BackendType.EVENTLET) diff --git a/oslo_service/tests/backend/threading/__init__.py b/oslo_service/tests/backend/threading/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/releasenotes/notes/add-threading-backend-9b0e601e5c1282e1.yaml b/releasenotes/notes/add-threading-backend-9b0e601e5c1282e1.yaml new file mode 100644 index 00000000..e47da6ba --- /dev/null +++ b/releasenotes/notes/add-threading-backend-9b0e601e5c1282e1.yaml @@ -0,0 +1,64 @@ +features: + - | + Added a new `threading` backend to `oslo.service`, using standard Python + threads instead of `eventlet`. This includes a full implementation of: + + - `Service`, `Launcher`, `ServiceLauncher`, and `ProcessLauncher` using + `cotyledon` + - `LoopingCall` variants (`FixedIntervalLoopingCall`, + `DynamicLoopingCall`, etc.) using `futurist.ThreadPoolExecutor` + - A new `ThreadGroup` and `Thread` abstraction to mimic `eventlet`-like + behavior + - A native `SignalHandler` implementation using standard Python signals + + This backend provides a standard-thread-compatible alternative that avoids + monkey-patching, making it suitable for environments where `eventlet` is + problematic or undesirable. + + Additionally: + + - `ProcessLauncher` now supports a `no_fork=True` mode, allowing services + to run in the main process without forking. This is useful when `fork()` + is unsafe — for example, in threaded environments or with Python 3.12+ + where the default multiprocessing start method has changed to `spawn`. + + - A new `register_backend_default_hook()` API has been added. It allows + users to define a fallback backend type in case `init_backend()` was not + called early enough. 
This is helpful in environments where import order + or initialization timing cannot be guaranteed. + + Example: + + ```python + from oslo_service import backend + backend.register_backend_default_hook(lambda: backend.BackendType.THREADING) + ``` + + This hook will only be used if `init_backend()` has not already been called. + +upgrade: + - | + While Python 3.14 no longer defaults to ``fork`` as the multiprocessing start + method, `oslo.service` continues to rely on ``fork`` as the only supported + method for creating worker processes. Many parts of OpenStack depend on + objects that cannot be safely pickled (e.g. argparse parsers, thread locks, + lambdas in config defaults), which makes ``spawn`` (and ``forkserver``) currently impractical. + + In the long term, process scaling should be handled externally (e.g. via + Kubernetes or systemd), rather than by the service itself. + +issues: + - | + When using the `threading` backend with multiple workers + (i.e. `ProcessLauncher` with `fork()`), **starting threads before the fork + occurs can lead to corrupted state** due to how `os.fork()` behaves in + multi-threaded processes. This is a known limitation in Python. + + See: https://gibizer.github.io/posts/Eventlet-Removal-The-First-Threading-Bug/#threads--osfork--confused-threadpools + + To avoid this issue, you can: + + - Ensure that no threads are started before `oslo.service` forks the workers. + - Use `ProcessLauncher(no_fork=True)` to disable forking entirely. + - Explicitly manage thread lifecycle — for example, stop all threads before + forking, as currently done in Nova.
diff --git a/setup.cfg b/setup.cfg index 88ac7543..9b2c7f03 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,6 +22,11 @@ classifier = Programming Language :: Python :: 3 :: Only Programming Language :: Python :: Implementation :: CPython +[options.extras_require] +threading = + cotyledon>=2.0.0 + futurist>=3.1.1 + [files] packages = oslo_service diff --git a/setup.py b/setup.py index cd35c3c3..481505b0 100644 --- a/setup.py +++ b/setup.py @@ -15,6 +15,4 @@ import setuptools -setuptools.setup( - setup_requires=['pbr>=2.0.0'], - pbr=True) +setuptools.setup(setup_requires=['pbr>=2.0.0'], pbr=True) diff --git a/test-requirements.txt b/test-requirements.txt index 2cff2d31..36179472 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -4,3 +4,5 @@ requests>=2.14.2 # Apache-2.0 stestr>=2.0.0 # Apache-2.0 coverage>=4.0 # Apache-2.0 +cotyledon>=2.0.0 +futurist>=3.1.1