diff --git a/nova/conductor/manager.py b/nova/conductor/manager.py index fb1530a39d62..c9067a971312 100644 --- a/nova/conductor/manager.py +++ b/nova/conductor/manager.py @@ -19,9 +19,9 @@ import contextlib import copy import functools import sys +import threading import typing as ty -import futurist from keystoneauth1 import exceptions as ks_exc from oslo_config import cfg from oslo_db import exception as db_exc @@ -2062,7 +2062,7 @@ class ComputeTaskManager: the host list :param image_id: The IDs of the image to cache """ - + local_lock = threading.Lock() # TODO(mriedem): Consider including the list of images in the # notification payload. compute_utils.notify_about_aggregate_action( @@ -2072,7 +2072,7 @@ class ComputeTaskManager: clock = timeutils.StopWatch() threads = CONF.image_cache.precache_concurrency - fetch_executor = futurist.GreenThreadPoolExecutor(max_workers=threads) + fetch_executor = utils.create_executor(threads) hosts_by_cell = {} cells_by_uuid = {} @@ -2099,24 +2099,24 @@ class ComputeTaskManager: } def host_completed(context, host, result): - for image_id, status in result.items(): - cached, existing, error, unsupported = stats[image_id] - if status == 'error': - failed_images[image_id] += 1 - error += 1 - elif status == 'cached': - cached += 1 - elif status == 'existing': - existing += 1 - elif status == 'unsupported': - unsupported += 1 - stats[image_id] = (cached, existing, error, unsupported) + with local_lock: + for image_id, status in result.items(): + cached, existing, error, unsupported = stats[image_id] + if status == 'error': + failed_images[image_id] += 1 + error += 1 + elif status == 'cached': + cached += 1 + elif status == 'existing': + existing += 1 + elif status == 'unsupported': + unsupported += 1 + stats[image_id] = (cached, existing, error, unsupported) host_stats['completed'] += 1 - compute_utils.notify_about_aggregate_cache(context, aggregate, - host, result, - host_stats['completed'], - host_stats['total']) + compute_utils.notify_about_aggregate_cache( + context, aggregate, host, result, + host_stats['completed'], host_stats['total']) def wrap_cache_images(ctxt, host, image_ids): result = self.compute_rpcapi.cache_images( diff --git a/nova/utils.py b/nova/utils.py index 38fdd79fefd8..db00c515bdd5 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -104,19 +104,23 @@ def destroy_default_executor(): DEFAULT_EXECUTOR = None +def create_executor(max_workers): + if concurrency_mode_threading(): + executor = futurist.ThreadPoolExecutor(max_workers) + else: + executor = futurist.GreenThreadPoolExecutor(max_workers) + return executor + + def _get_default_executor(): global DEFAULT_EXECUTOR if not DEFAULT_EXECUTOR: - if concurrency_mode_threading(): - DEFAULT_EXECUTOR = futurist.ThreadPoolExecutor( - CONF.default_thread_pool_size + max_workers = ( + CONF.default_thread_pool_size if concurrency_mode_threading() + else CONF.default_green_pool_size ) - else: - DEFAULT_EXECUTOR = futurist.GreenThreadPoolExecutor( - CONF.default_green_pool_size - ) - + DEFAULT_EXECUTOR = create_executor(max_workers) pname = multiprocessing.current_process().name executor_name = f"{pname}.default" DEFAULT_EXECUTOR.name = executor_name @@ -1183,11 +1187,11 @@ def get_scatter_gather_executor(): global SCATTER_GATHER_EXECUTOR if not SCATTER_GATHER_EXECUTOR: - if concurrency_mode_threading(): - SCATTER_GATHER_EXECUTOR = futurist.ThreadPoolExecutor( - CONF.cell_worker_thread_pool_size) - else: - SCATTER_GATHER_EXECUTOR = futurist.GreenThreadPoolExecutor() + max_workers = ( + CONF.cell_worker_thread_pool_size + if concurrency_mode_threading() else 1000 + ) + SCATTER_GATHER_EXECUTOR = create_executor(max_workers) pname = multiprocessing.current_process().name executor_name = f"{pname}.cell_worker" diff --git a/requirements.txt b/requirements.txt index 89873efe0954..291ccbd828ae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -60,6 +60,6 @@ cursive>=0.2.1 # Apache-2.0 retrying>=1.3.3 # Apache-2.0 os-service-types>=1.7.0 # Apache-2.0 python-dateutil>=2.7.0 # BSD -futurist>=1.8.0 # Apache-2.0 +futurist>=3.2.1 # Apache-2.0 openstacksdk>=4.4.0 # Apache-2.0 PyYAML>=5.1 # MIT