Merge "Threads to process pollsters"

Zuul
2025-07-04 09:47:21 +00:00
committed by Gerrit Code Review
3 changed files with 245 additions and 140 deletions

ceilometer/polling/manager.py View File

@@ -117,6 +117,21 @@ POLLING_OPTS = [
                default=None,
                help='The private key to allow this ceilometer to '
                     'expose tls scrape endpoints'),
+    cfg.IntOpt('threads_to_process_pollsters',
+               default=1,
+               min=0,
+               help='The number of threads used to process the pollsters. '
+                    'The value one (1) means that the processing is done '
+                    'serially (not ordered!). The value zero (0) means '
+                    'that we will use as many threads as there are '
+                    'pollsters configured in the polling task. Any other '
+                    'positive integer can be used to set an upper bound '
+                    'on the number of threads used to process pollsters '
+                    'in parallel. One must bear in mind that using more '
+                    'than one thread might not take full advantage of '
+                    'the discovery cache and pollster cache processes; '
+                    'it is possible though to improve/use pollsters that '
+                    'synchronize themselves in the cache objects.'),
 ]
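
The worker-count semantics described in this help text are enforced in execute_polling_task_processing (next hunk). A minimal, self-contained sketch of that mapping (resolve_worker_count is a hypothetical helper, not part of the commit):

    def resolve_worker_count(threads_to_process_pollsters, pollster_count):
        # Mirrors the option semantics: negative values are rejected, zero
        # means "one thread per pollster", and any positive value is used
        # as-is as the upper bound on worker threads.
        if threads_to_process_pollsters < 0:
            raise RuntimeError(
                "The configuration 'threads_to_process_pollsters' has a "
                "negative value [%s], which should not be allowed."
                % threads_to_process_pollsters)
        if threads_to_process_pollsters == 0:
            return pollster_count
        return threads_to_process_pollsters

    assert resolve_worker_count(1, 10) == 1    # default: serial processing
    assert resolve_worker_count(0, 10) == 10   # one thread per pollster
    assert resolve_worker_count(4, 10) == 4    # fixed upper bound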
@@ -215,155 +230,211 @@ class PollingTask:
         poll_history = {}
         for source_name, pollsters in iter_random(
                 self.pollster_matches.items()):
-            for pollster in iter_random(pollsters):
-                key = Resources.key(source_name, pollster)
-                candidate_res = list(
-                    self.resources[key].get(discovery_cache))
-                if not candidate_res and pollster.obj.default_discovery:
-                    LOG.debug("Executing discovery process for pollsters [%s] "
-                              "and discovery method [%s] via process [%s].",
-                              pollster.obj, pollster.obj.default_discovery,
-                              self.manager.discover)
-                    candidate_res = self.manager.discover(
-                        [pollster.obj.default_discovery], discovery_cache)
-                # Remove duplicated resources and black resources. Using
-                # set() requires well defined __hash__ for each resource.
-                # Since __eq__ is defined, 'not in' is safe here.
-                polling_resources = []
-                black_res = self.resources[key].blacklist
-                history = poll_history.get(pollster.name, [])
-                for x in candidate_res:
-                    if x not in history:
-                        history.append(x)
-                        if x not in black_res:
-                            polling_resources.append(x)
-                poll_history[pollster.name] = history
-                # If no resources, skip for this pollster
-                if not polling_resources:
-                    p_context = 'new' if history else ''
-                    LOG.debug("Skip pollster %(name)s, no %(p_context)s "
-                              "resources found this cycle",
-                              {'name': pollster.name, 'p_context': p_context})
-                    continue
-                LOG.info("Polling pollster %(poll)s in the context of "
-                         "%(src)s",
-                         dict(poll=pollster.name, src=source_name))
-                try:
-                    source_obj = self.sources_map[source_name]
-                    coordination_group_name = source_obj.group_for_coordination
-                    LOG.debug("Checking if we need coordination for pollster "
-                              "[%s] with coordination group name [%s].",
-                              pollster, coordination_group_name)
-                    if self.manager.hashrings and self.manager.hashrings.get(
-                            coordination_group_name):
-                        LOG.debug("The pollster [%s] is configured in a "
-                                  "source for polling that requires "
-                                  "coordination under name [%s].", pollster,
-                                  coordination_group_name)
-                        group_coordination = self.manager.hashrings[
-                            coordination_group_name].belongs_to_self(
-                            str(pollster.name))
-                        LOG.debug("Pollster [%s] is configured with "
-                                  "coordination [%s] under name [%s].",
-                                  pollster.name, group_coordination,
-                                  coordination_group_name)
-                        if not group_coordination:
-                            LOG.info("The pollster [%s] should be processed "
-                                     "by another node.", pollster.name)
-                            continue
-                    else:
-                        LOG.debug("The pollster [%s] is not configured in a "
-                                  "source for polling that requires "
-                                  "coordination. The current hashrings are "
-                                  "the following [%s].", pollster,
-                                  self.manager.hashrings)
-                    polling_timestamp = timeutils.utcnow().isoformat()
-                    samples = pollster.obj.get_samples(
-                        manager=self.manager,
-                        cache=cache,
-                        resources=polling_resources
-                    )
-                    sample_batch = []
-                    self.manager.heartbeat(pollster.name, polling_timestamp)
-                    for sample in samples:
-                        # Note(yuywz): Unify the timestamp of polled samples
-                        sample.set_timestamp(polling_timestamp)
-                        if self._name_discovery and self._cache:
-                            # Try to resolve project UUIDs from cache first,
-                            # and then keystone
-                            LOG.debug("Ceilometer is configured to resolve "
-                                      "project IDs to name; loading the "
-                                      "project name for project ID [%s] in "
-                                      "sample [%s].", sample.project_id,
-                                      sample)
-                            if sample.project_id:
-                                sample.project_name = \
-                                    self._cache.resolve_uuid_from_cache(
-                                        "projects",
-                                        sample.project_id
-                                    )
-                            # Try to resolve user UUIDs from cache first,
-                            # and then keystone
-                            LOG.debug("Ceilometer is configured to resolve "
-                                      "user IDs to name; loading the "
-                                      "user name for user ID [%s] in "
-                                      "sample [%s].", sample.user_id,
-                                      sample)
-                            if sample.user_id:
-                                sample.user_name = \
-                                    self._cache.resolve_uuid_from_cache(
-                                        "users",
-                                        sample.user_id
-                                    )
-                            LOG.debug("Final sample generated after loading "
-                                      "the project and user names based on "
-                                      "the IDs [%s].", sample)
-                        sample_dict = (
-                            publisher_utils.meter_message_from_counter(
-                                sample, self._telemetry_secret
-                            ))
-                        if self._batch_size:
-                            if len(sample_batch) >= self._batch_size:
-                                self._send_notification(sample_batch)
-                                sample_batch = []
-                            sample_batch.append(sample_dict)
-                        else:
-                            self._send_notification([sample_dict])
-                    if sample_batch:
-                        self._send_notification(sample_batch)
-                    LOG.info("Finished polling pollster %(poll)s in the "
-                             "context of %(src)s", dict(poll=pollster.name,
-                                                        src=source_name))
-                except plugin_base.PollsterPermanentError as err:
-                    LOG.error(
-                        'Prevent pollster %(name)s from '
-                        'polling %(res_list)s on source %(source)s anymore!',
-                        dict(name=pollster.name,
-                             res_list=str(err.fail_res_list),
-                             source=source_name))
-                    self.resources[key].blacklist.extend(err.fail_res_list)
-                except Exception as err:
-                    LOG.error(
-                        'Continue after error from %(name)s: %(error)s'
-                        % ({'name': pollster.name, 'error': err}),
-                        exc_info=True)
+            self.execute_polling_task_processing(cache, discovery_cache,
+                                                 poll_history, pollsters,
+                                                 source_name)
+
+    def execute_polling_task_processing(self, cache, discovery_cache,
+                                        poll_history, pollsters, source_name):
+        all_pollsters = list(pollsters)
+        number_workers_for_pollsters = \
+            self.manager.conf.polling.threads_to_process_pollsters
+
+        if number_workers_for_pollsters < 0:
+            raise RuntimeError("The configuration "
+                               "'threads_to_process_pollsters' has a "
+                               "negative value [%s], which should not be "
+                               "allowed." % number_workers_for_pollsters)
+
+        if number_workers_for_pollsters == 0:
+            number_workers_for_pollsters = len(all_pollsters)
+
+        if number_workers_for_pollsters < len(all_pollsters):
+            LOG.debug("The number of pollsters in source [%s] is bigger "
+                      "than the number of worker threads to execute them. "
+                      "Therefore, one can expect the processing to take "
+                      "longer than expected.", source_name)
+
+        all_pollster_scheduled = []
+        with futures.ThreadPoolExecutor(
+                thread_name_prefix="Pollster-executor",
+                max_workers=number_workers_for_pollsters) as executor:
+            LOG.debug("Processing pollsters for [%s] with [%s] threads.",
+                      source_name, number_workers_for_pollsters)
+
+            for pollster in all_pollsters:
+                all_pollster_scheduled.append(
+                    self.register_pollster_execution(
+                        cache, discovery_cache, executor, poll_history,
+                        pollster, source_name))
+
+        for s in all_pollster_scheduled:
+            LOG.debug(s.result())
+
+    def register_pollster_execution(self, cache, discovery_cache, executor,
+                                    poll_history, pollster, source_name):
+        LOG.debug("Registering pollster [%s] from source [%s] to be executed "
+                  "via executor [%s] with cache [%s], pollster history [%s], "
+                  "and discovery cache [%s].", pollster, source_name, executor,
+                  cache, poll_history, discovery_cache)
+
+        def _internal_function():
+            self._internal_pollster_run(cache, discovery_cache, poll_history,
+                                        pollster, source_name)
+            return "Finished processing pollster [%s]." % pollster.name
+
+        return executor.submit(_internal_function)
+
+    def _internal_pollster_run(self, cache, discovery_cache, poll_history,
+                               pollster, source_name):
+        key = Resources.key(source_name, pollster)
+        candidate_res = list(
+            self.resources[key].get(discovery_cache))
+        if not candidate_res and pollster.obj.default_discovery:
+            LOG.debug("Executing discovery process for pollsters [%s] "
+                      "and discovery method [%s] via process [%s].",
+                      pollster.obj, pollster.obj.default_discovery,
+                      self.manager.discover)
+
+            candidate_res = self.manager.discover(
+                [pollster.obj.default_discovery], discovery_cache)
+
+        # Remove duplicated resources and black resources. Using
+        # set() requires well defined __hash__ for each resource.
+        # Since __eq__ is defined, 'not in' is safe here.
+        polling_resources = []
+        black_res = self.resources[key].blacklist
+        history = poll_history.get(pollster.name, [])
+        for x in candidate_res:
+            if x not in history:
+                history.append(x)
+                if x not in black_res:
+                    polling_resources.append(x)
+        poll_history[pollster.name] = history
+
+        # If no resources, skip for this pollster
+        if not polling_resources:
+            p_context = 'new' if history else ''
+            LOG.debug("Skip pollster %(name)s, no %(p_context)s "
+                      "resources found this cycle",
+                      {'name': pollster.name, 'p_context': p_context})
+            return
+
+        LOG.info("Polling pollster %(poll)s in the context of "
+                 "%(src)s",
+                 dict(poll=pollster.name, src=source_name))
+        try:
+            source_obj = self.sources_map[source_name]
+            coordination_group_name = source_obj.group_for_coordination
+
+            LOG.debug("Checking if we need coordination for pollster "
+                      "[%s] with coordination group name [%s].",
+                      pollster, coordination_group_name)
+            if self.manager.hashrings and self.manager.hashrings.get(
+                    coordination_group_name):
+                LOG.debug("The pollster [%s] is configured in a "
+                          "source for polling that requires "
+                          "coordination under name [%s].", pollster,
+                          coordination_group_name)
+                group_coordination = self.manager.hashrings[
+                    coordination_group_name].belongs_to_self(
+                    str(pollster.name))
+
+                LOG.debug("Pollster [%s] is configured with "
+                          "coordination [%s] under name [%s].",
+                          pollster.name, group_coordination,
+                          coordination_group_name)
+                if not group_coordination:
+                    LOG.info("The pollster [%s] should be processed "
+                             "by another node.", pollster.name)
+                    return
+            else:
+                LOG.debug("The pollster [%s] is not configured in a "
+                          "source for polling that requires "
+                          "coordination. The current hashrings are "
+                          "the following [%s].", pollster,
+                          self.manager.hashrings)
+
+            polling_timestamp = timeutils.utcnow().isoformat()
+            samples = pollster.obj.get_samples(
+                manager=self.manager,
+                cache=cache,
+                resources=polling_resources
+            )
+            sample_batch = []
+
+            self.manager.heartbeat(pollster.name, polling_timestamp)
+
+            for sample in samples:
+                # Note(yuywz): Unify the timestamp of polled samples
+                sample.set_timestamp(polling_timestamp)
+
+                if self._name_discovery and self._cache:
+                    # Try to resolve project UUIDs from cache first,
+                    # and then keystone
+                    LOG.debug("Ceilometer is configured to resolve "
+                              "project IDs to name; loading the "
+                              "project name for project ID [%s] in "
+                              "sample [%s].", sample.project_id,
+                              sample)
+                    if sample.project_id:
+                        sample.project_name = \
+                            self._cache.resolve_uuid_from_cache(
+                                "projects",
+                                sample.project_id
+                            )
+
+                    # Try to resolve user UUIDs from cache first,
+                    # and then keystone
+                    LOG.debug("Ceilometer is configured to resolve "
+                              "user IDs to name; loading the "
+                              "user name for user ID [%s] in "
+                              "sample [%s].", sample.user_id,
+                              sample)
+                    if sample.user_id:
+                        sample.user_name = \
+                            self._cache.resolve_uuid_from_cache(
+                                "users",
+                                sample.user_id
+                            )
+
+                    LOG.debug("Final sample generated after loading "
+                              "the project and user names based on "
+                              "the IDs [%s].", sample)
+
+                sample_dict = (
+                    publisher_utils.meter_message_from_counter(
+                        sample, self._telemetry_secret
+                    ))
+                if self._batch_size:
+                    if len(sample_batch) >= self._batch_size:
+                        self._send_notification(sample_batch)
+                        sample_batch = []
+                    sample_batch.append(sample_dict)
+                else:
+                    self._send_notification([sample_dict])
+
+            if sample_batch:
+                self._send_notification(sample_batch)
+
+            LOG.info("Finished polling pollster %(poll)s in the "
+                     "context of %(src)s", dict(poll=pollster.name,
+                                                src=source_name))
+        except plugin_base.PollsterPermanentError as err:
+            LOG.error(
+                'Prevent pollster %(name)s from '
+                'polling %(res_list)s on source %(source)s anymore!',
+                dict(name=pollster.name,
+                     res_list=str(err.fail_res_list),
+                     source=source_name))
+            self.resources[key].blacklist.extend(err.fail_res_list)
+        except Exception as err:
+            LOG.error(
+                'Continue after error from %(name)s: %(error)s'
+                % ({'name': pollster.name, 'error': err}),
+                exc_info=True)

     def _send_notification(self, samples):
         if self.manager.conf.polling.enable_notifications:
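
The refactoring above splits the old inline loop into three methods: execute_polling_task_processing sizes a ThreadPoolExecutor and fans out one task per pollster, register_pollster_execution wraps each pollster run in a callable and submits it, and _internal_pollster_run keeps the original per-pollster logic. The fan-out/join pattern can be reproduced in isolation; a minimal sketch, assuming `from concurrent import futures` as in the module's imports, with hypothetical pollster names standing in for pollster objects:

    from concurrent import futures

    def process_pollster(pollster_name):
        # Stand-in for _internal_pollster_run(): the real code discovers
        # resources, polls samples, and sends notifications here.
        return "Finished processing pollster [%s]." % pollster_name

    pollster_names = ["cpu", "memory.usage", "network.incoming.bytes"]
    scheduled = []
    with futures.ThreadPoolExecutor(
            thread_name_prefix="Pollster-executor",
            max_workers=len(pollster_names)) as executor:
        for name in pollster_names:
            scheduled.append(executor.submit(process_pollster, name))

    # result() blocks until the task finishes and re-raises any exception
    # that was raised inside the worker thread.
    for s in scheduled:
        print(s.result())

Because the `with` block joins all workers on exit, the `LOG.debug(s.result())` loop in the real code only runs after every pollster in the source has finished.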

ceilometer/tests/unit/polling/test_manager.py View File

@@ -254,10 +254,13 @@ class BaseAgent(base.BaseTestCase):
     class DiscoveryException(TestDiscoveryException):
         params = []

-    def setup_polling(self, poll_cfg=None):
+    def setup_polling(self, poll_cfg=None, override_conf=None):
         name = self.cfg2file(poll_cfg or self.polling_cfg)
-        self.CONF.set_override('cfg_file', name, group='polling')
-        self.mgr.polling_manager = manager.PollingManager(self.CONF)
+
+        conf_to_use = override_conf or self.CONF
+        conf_to_use.set_override('cfg_file', name, group='polling')
+        self.mgr.polling_manager = manager.PollingManager(conf_to_use)

     def create_manager(self):
         queue = multiprocessing.Queue()
@@ -687,6 +690,31 @@ class TestPollingAgent(BaseAgent):
             mock.call('Polster heartbeat update: test')
         ])

+    @mock.patch('ceilometer.polling.manager.LOG')
+    def test_polling_and_notify_with_resources_with_threads(self, log_mock):
+        conf_to_use = self.CONF
+        conf_to_use.set_override(
+            'threads_to_process_pollsters', 4, group='polling')
+
+        self.setup_polling(override_conf=conf_to_use)
+        polling_task = list(self.mgr.setup_polling_tasks().values())[0]
+        polling_task.poll_and_notify()
+
+        log_mock.info.assert_has_calls([
+            mock.call('Polling pollster %(poll)s in the context of %(src)s',
+                      {'poll': 'test', 'src': 'test_polling'}),
+            mock.call('Finished polling pollster %(poll)s in the context '
+                      'of %(src)s', {'poll': 'test', 'src': 'test_polling'})
+        ])
+        log_mock.debug.assert_has_calls([
+            mock.call('Polster heartbeat update: test')
+        ])
+
+        # Even though we enabled 4 threads, we have only one metric
+        # configured. Therefore, there should be only one call here.
+        self.assertEqual(1, polling_task.manager.notifier.sample.call_count)
+
     @mock.patch('ceilometer.polling.manager.LOG')
     def test_skip_polling_and_notify_with_no_resources(self, LOG):
         self.polling_cfg['sources'][0]['resources'] = []
@@ -694,9 +722,9 @@ class TestPollingAgent(BaseAgent):
         polling_task = list(self.mgr.setup_polling_tasks().values())[0]
         pollster = list(polling_task.pollster_matches['test_polling'])[0]
         polling_task.poll_and_notify()
-        LOG.debug.assert_called_with(
-            'Skip pollster %(name)s, no %(p_context)s resources found this '
-            'cycle', {'name': pollster.name, 'p_context': ''})
+        LOG.debug.assert_has_calls([mock.call(
+            'Skip pollster %(name)s, no %(p_context)s resources found '
+            'this cycle', {'name': pollster.name, 'p_context': ''})])

     @mock.patch('ceilometer.polling.manager.LOG')
     def test_skip_polling_polled_resources(self, LOG):
@@ -709,9 +737,9 @@ class TestPollingAgent(BaseAgent):
         self.setup_polling()
         polling_task = list(self.mgr.setup_polling_tasks().values())[0]
         polling_task.poll_and_notify()
-        LOG.debug.assert_called_with(
-            'Skip pollster %(name)s, no %(p_context)s resources found this '
-            'cycle', {'name': 'test', 'p_context': 'new'})
+        LOG.debug.assert_has_calls([mock.call(
+            'Skip pollster %(name)s, no %(p_context)s resources found '
+            'this cycle', {'name': 'test', 'p_context': 'new'})])

     @mock.patch('oslo_utils.timeutils.utcnow')
     def test_polling_samples_timestamp(self, mock_utc):
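
The switch from assert_called_with to assert_has_calls in the two tests above follows from the threaded execution path: assert_called_with only matches the most recent call, and the executor now emits further debug messages (such as the 'Finished processing pollster' future results) after the skip message, so the skip message is no longer guaranteed to be last. A small illustration with a plain mock, message strings shortened for brevity:

    from unittest import mock

    log = mock.Mock()
    log.debug('Processing pollsters for [test_polling] with [4] threads.')
    log.debug('Skip pollster test, no resources found this cycle')
    log.debug('Finished processing pollster [test].')

    # assert_called_with() only inspects the most recent call, so checking
    # it against the skip message would raise an AssertionError here.
    # assert_has_calls() matches the expected calls anywhere in the
    # recorded call list:
    log.debug.assert_has_calls([
        mock.call('Skip pollster test, no resources found this cycle')])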

View File

@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    Introduce ``threads_to_process_pollsters`` to enable operators to define
+    the number of pollsters that can be executed in parallel inside a
+    polling task.