From 492974dd0b4c5666defe2398e95d80aa325e3d0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Weing=C3=A4rtner?= Date: Fri, 20 Jun 2025 14:33:16 -0300 Subject: [PATCH] Threads to process pollsters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pollsters in Ceilometer are grouped in "sources" (A.K.A "polling task"); normally, people group pollsters in "sources" by interval, for instance, putting together all pollsters that should gather data every 10 minutes, and so on. On the other hand, some other people configure the "sources" to represent a polling context, such as, all pollsters that collect data from instances together, all pollsters that collect data from routers, load balancers, RadosGW, and so on. The "sources" definition, are all processed in their own thread. Therefore, "sources" are processed in parallel. On the other hand, the pollsters inside the "sources" are processed in a serial fashion in the same thread. This can be a problem, if a "source" has many pollsters, and their data collection and processing takes a while to finish. Of course, one can take it to the extreme, and configure only one pollster per "source". However, that is not a very interesting solution, and would make the configuration a bit odd. This patch proposes a configuration to enable operators to define the number of threads to use when processing pollsters of a "source". The value one (1) means that the processing is in a serial fashion (not ordered!). The value zero (0) means that we will use as much threads as the number of pollsters configured in the polling task. Any other positive integer can be used to fix an upper bound limit to the number of threads used for processing pollsters in parallel. One must bear in mind that, using more than one thread might not take full advantage of the discovery cache and pollsters cache processes; it is possible though to improve/use/develop pollsters that synchronize themselves in the cache objects. Change-Id: I80b2f18a70cea1ab6e31b37e75ac93ee897e6cb4 Signed-off-by: Rafael Weingärtner --- ceilometer/polling/manager.py | 333 +++++++++++------- ceilometer/tests/unit/polling/test_manager.py | 46 ++- ...ds-process-pollsters-cbd22cca6f2effc4.yaml | 6 + 3 files changed, 245 insertions(+), 140 deletions(-) create mode 100644 releasenotes/notes/threeads-process-pollsters-cbd22cca6f2effc4.yaml diff --git a/ceilometer/polling/manager.py b/ceilometer/polling/manager.py index 2c0e8ae982..d344492f7b 100644 --- a/ceilometer/polling/manager.py +++ b/ceilometer/polling/manager.py @@ -117,6 +117,21 @@ POLLING_OPTS = [ default=None, help='The private key to allow this ceilometer to ' 'expose tls scrape endpoints'), + cfg.IntOpt('threads_to_process_pollsters', + default=1, + min=0, + help='The number of threads used to process the pollsters.' + 'The value one (1) means that the processing is in a' + 'serial fashion (not ordered!). The value zero (0) means ' + 'that the we will use as much threads as the number of ' + 'pollsters configured in the polling task. Any other' + 'positive integer can be used to fix an upper bound limit' + 'to the number of threads used for processing pollsters in' + 'parallel. One must bear in mind that, using more than one' + 'thread might not take full advantage of the discovery ' + 'cache and pollsters cache processes; it is possible ' + 'though to improve/use pollsters that synchronize ' + 'themselves in the cache objects.'), ] @@ -215,155 +230,211 @@ class PollingTask: poll_history = {} for source_name, pollsters in iter_random( self.pollster_matches.items()): - for pollster in iter_random(pollsters): - key = Resources.key(source_name, pollster) - candidate_res = list( - self.resources[key].get(discovery_cache)) - if not candidate_res and pollster.obj.default_discovery: - LOG.debug("Executing discovery process for pollsters [%s] " - "and discovery method [%s] via process [%s].", - pollster.obj, pollster.obj.default_discovery, - self.manager.discover) + self.execute_polling_task_processing(cache, discovery_cache, + poll_history, pollsters, + source_name) - candidate_res = self.manager.discover( - [pollster.obj.default_discovery], discovery_cache) + def execute_polling_task_processing(self, cache, discovery_cache, + poll_history, pollsters, source_name): + all_pollsters = list(pollsters) + number_workers_for_pollsters =\ + self.manager.conf.polling.threads_to_process_pollsters - # Remove duplicated resources and black resources. Using - # set() requires well defined __hash__ for each resource. - # Since __eq__ is defined, 'not in' is safe here. - polling_resources = [] - black_res = self.resources[key].blacklist - history = poll_history.get(pollster.name, []) - for x in candidate_res: - if x not in history: - history.append(x) - if x not in black_res: - polling_resources.append(x) - poll_history[pollster.name] = history + if number_workers_for_pollsters < 0: + raise RuntimeError("The configuration " + "'threads_to_process_pollsters' has a negative " + "value [%s], which should not be allowed.", + number_workers_for_pollsters) - # If no resources, skip for this pollster - if not polling_resources: - p_context = 'new' if history else '' - LOG.debug("Skip pollster %(name)s, no %(p_context)s " - "resources found this cycle", - {'name': pollster.name, 'p_context': p_context}) - continue + if number_workers_for_pollsters == 0: + number_workers_for_pollsters = len(all_pollsters) - LOG.info("Polling pollster %(poll)s in the context of " - "%(src)s", - dict(poll=pollster.name, src=source_name)) - try: - source_obj = self.sources_map[source_name] - coordination_group_name = source_obj.group_for_coordination + if number_workers_for_pollsters < len(all_pollsters): + LOG.debug("The number of pollsters in source [%s] is bigger " + "than the number of worker threads to execute them. " + "Therefore, one can expect the process to be longer " + "than the expected.", source_name) - LOG.debug("Checking if we need coordination for pollster " - "[%s] with coordination group name [%s].", - pollster, coordination_group_name) - if self.manager.hashrings and self.manager.hashrings.get( - coordination_group_name): - LOG.debug("The pollster [%s] is configured in a " - "source for polling that requires " - "coordination under name [%s].", pollster, - coordination_group_name) - group_coordination = self.manager.hashrings[ - coordination_group_name].belongs_to_self( - str(pollster.name)) + all_pollster_scheduled = [] + with futures.ThreadPoolExecutor( + thread_name_prefix="Pollster-executor", + max_workers=number_workers_for_pollsters) as executor: + LOG.debug("Processing pollsters for [%s] with [%s] threads.", + source_name, number_workers_for_pollsters) - LOG.debug("Pollster [%s] is configured with " - "coordination [%s] under name [%s].", - pollster.name, group_coordination, - coordination_group_name) - if not group_coordination: - LOG.info("The pollster [%s] should be processed " - "by other node.", pollster.name) - continue - else: - LOG.debug("The pollster [%s] is not configured in a " - "source for polling that requires " - "coordination. The current hashrings are " - "the following [%s].", pollster, - self.manager.hashrings) + for pollster in all_pollsters: + all_pollster_scheduled.append( + self.register_pollster_execution( + cache, discovery_cache, executor, poll_history, + pollster, source_name)) - polling_timestamp = timeutils.utcnow().isoformat() - samples = pollster.obj.get_samples( - manager=self.manager, - cache=cache, - resources=polling_resources - ) - sample_batch = [] + for s in all_pollster_scheduled: + LOG.debug(s.result()) - self.manager.heartbeat(pollster.name, polling_timestamp) + def register_pollster_execution(self, cache, discovery_cache, executor, + poll_history, pollster, source_name): + LOG.debug("Registering pollster [%s] from source [%s] to be executed " + "via executor [%s] with cache [%s], pollster history [%s], " + "and discovery cache [%s].", pollster, source_name, executor, + cache, poll_history, discovery_cache) - for sample in samples: - # Note(yuywz): Unify the timestamp of polled samples - sample.set_timestamp(polling_timestamp) + def _internal_function(): + self._internal_pollster_run(cache, discovery_cache, poll_history, + pollster, source_name) + return "Finished processing pollster [%s]." % pollster.name - if self._name_discovery and self._cache: + return executor.submit(_internal_function) - # Try to resolve project UUIDs from cache first, - # and then keystone - LOG.debug("Ceilometer is configured to resolve " - "project IDs to name; loading the " - "project name for project ID [%s] in " - "sample [%s].", sample.project_id, - sample) - if sample.project_id: - sample.project_name = \ - self._cache.resolve_uuid_from_cache( - "projects", - sample.project_id - ) + def _internal_pollster_run(self, cache, discovery_cache, poll_history, + pollster, source_name): + key = Resources.key(source_name, pollster) + candidate_res = list( + self.resources[key].get(discovery_cache)) + if not candidate_res and pollster.obj.default_discovery: + LOG.debug("Executing discovery process for pollsters [%s] " + "and discovery method [%s] via process [%s].", + pollster.obj, pollster.obj.default_discovery, + self.manager.discover) - # Try to resolve user UUIDs from cache first, - # and then keystone + candidate_res = self.manager.discover( + [pollster.obj.default_discovery], discovery_cache) - LOG.debug("Ceilometer is configured to resolve " - "user IDs to name; loading the " - "user name for user ID [%s] in " - "sample [%s].", sample.user_id, - sample) - if sample.user_id: - sample.user_name = \ - self._cache.resolve_uuid_from_cache( - "users", - sample.user_id - ) + # Remove duplicated resources and black resources. Using + # set() requires well defined __hash__ for each resource. + # Since __eq__ is defined, 'not in' is safe here. + polling_resources = [] + black_res = self.resources[key].blacklist + history = poll_history.get(pollster.name, []) + for x in candidate_res: + if x not in history: + history.append(x) + if x not in black_res: + polling_resources.append(x) + poll_history[pollster.name] = history - LOG.debug("Final sample generated after loading " - "the project and user names bases on " - "the IDs [%s].", sample) + # If no resources, skip for this pollster + if not polling_resources: + p_context = 'new' if history else '' + LOG.debug("Skip pollster %(name)s, no %(p_context)s " + "resources found this cycle", + {'name': pollster.name, 'p_context': p_context}) + return - sample_dict = ( - publisher_utils.meter_message_from_counter( - sample, self._telemetry_secret - )) - if self._batch_size: - if len(sample_batch) >= self._batch_size: - self._send_notification(sample_batch) - sample_batch = [] - sample_batch.append(sample_dict) - else: - self._send_notification([sample_dict]) + LOG.info("Polling pollster %(poll)s in the context of " + "%(src)s", + dict(poll=pollster.name, src=source_name)) + try: + source_obj = self.sources_map[source_name] + coordination_group_name = source_obj.group_for_coordination - if sample_batch: + LOG.debug("Checking if we need coordination for pollster " + "[%s] with coordination group name [%s].", + pollster, coordination_group_name) + if self.manager.hashrings and self.manager.hashrings.get( + coordination_group_name): + LOG.debug("The pollster [%s] is configured in a " + "source for polling that requires " + "coordination under name [%s].", pollster, + coordination_group_name) + group_coordination = self.manager.hashrings[ + coordination_group_name].belongs_to_self( + str(pollster.name)) + + LOG.debug("Pollster [%s] is configured with " + "coordination [%s] under name [%s].", + pollster.name, group_coordination, + coordination_group_name) + if not group_coordination: + LOG.info("The pollster [%s] should be processed " + "by other node.", pollster.name) + return + else: + LOG.debug("The pollster [%s] is not configured in a " + "source for polling that requires " + "coordination. The current hashrings are " + "the following [%s].", pollster, + self.manager.hashrings) + + polling_timestamp = timeutils.utcnow().isoformat() + samples = pollster.obj.get_samples( + manager=self.manager, + cache=cache, + resources=polling_resources + ) + sample_batch = [] + + self.manager.heartbeat(pollster.name, polling_timestamp) + + for sample in samples: + # Note(yuywz): Unify the timestamp of polled samples + sample.set_timestamp(polling_timestamp) + + if self._name_discovery and self._cache: + + # Try to resolve project UUIDs from cache first, + # and then keystone + LOG.debug("Ceilometer is configured to resolve " + "project IDs to name; loading the " + "project name for project ID [%s] in " + "sample [%s].", sample.project_id, + sample) + if sample.project_id: + sample.project_name = \ + self._cache.resolve_uuid_from_cache( + "projects", + sample.project_id + ) + + # Try to resolve user UUIDs from cache first, + # and then keystone + LOG.debug("Ceilometer is configured to resolve " + "user IDs to name; loading the " + "user name for user ID [%s] in " + "sample [%s].", sample.user_id, + sample) + + if sample.user_id: + sample.user_name = \ + self._cache.resolve_uuid_from_cache( + "users", + sample.user_id + ) + + LOG.debug("Final sample generated after loading " + "the project and user names bases on " + "the IDs [%s].", sample) + + sample_dict = ( + publisher_utils.meter_message_from_counter( + sample, self._telemetry_secret + )) + if self._batch_size: + if len(sample_batch) >= self._batch_size: self._send_notification(sample_batch) + sample_batch = [] + sample_batch.append(sample_dict) + else: + self._send_notification([sample_dict]) - LOG.info("Finished polling pollster %(poll)s in the " - "context of %(src)s", dict(poll=pollster.name, - src=source_name)) - except plugin_base.PollsterPermanentError as err: - LOG.error( - 'Prevent pollster %(name)s from ' - 'polling %(res_list)s on source %(source)s anymore!', - dict(name=pollster.name, - res_list=str(err.fail_res_list), - source=source_name)) - self.resources[key].blacklist.extend(err.fail_res_list) - except Exception as err: - LOG.error( - 'Continue after error from %(name)s: %(error)s' - % ({'name': pollster.name, 'error': err}), - exc_info=True) + if sample_batch: + self._send_notification(sample_batch) + + LOG.info("Finished polling pollster %(poll)s in the " + "context of %(src)s", dict(poll=pollster.name, + src=source_name)) + except plugin_base.PollsterPermanentError as err: + LOG.error( + 'Prevent pollster %(name)s from ' + 'polling %(res_list)s on source %(source)s anymore!', + dict(name=pollster.name, + res_list=str(err.fail_res_list), + source=source_name)) + self.resources[key].blacklist.extend(err.fail_res_list) + except Exception as err: + LOG.error( + 'Continue after error from %(name)s: %(error)s' + % ({'name': pollster.name, 'error': err}), + exc_info=True) def _send_notification(self, samples): if self.manager.conf.polling.enable_notifications: diff --git a/ceilometer/tests/unit/polling/test_manager.py b/ceilometer/tests/unit/polling/test_manager.py index 16af318d4e..b0dcdabd5e 100644 --- a/ceilometer/tests/unit/polling/test_manager.py +++ b/ceilometer/tests/unit/polling/test_manager.py @@ -254,10 +254,13 @@ class BaseAgent(base.BaseTestCase): class DiscoveryException(TestDiscoveryException): params = [] - def setup_polling(self, poll_cfg=None): + def setup_polling(self, poll_cfg=None, override_conf=None): name = self.cfg2file(poll_cfg or self.polling_cfg) - self.CONF.set_override('cfg_file', name, group='polling') - self.mgr.polling_manager = manager.PollingManager(self.CONF) + + conf_to_use = override_conf or self.CONF + + conf_to_use.set_override('cfg_file', name, group='polling') + self.mgr.polling_manager = manager.PollingManager(conf_to_use) def create_manager(self): queue = multiprocessing.Queue() @@ -687,6 +690,31 @@ class TestPollingAgent(BaseAgent): mock.call('Polster heartbeat update: test') ]) + @mock.patch('ceilometer.polling.manager.LOG') + def test_polling_and_notify_with_resources_with_threads(self, log_mock): + conf_to_use = self.CONF + conf_to_use.set_override( + 'threads_to_process_pollsters', 4, group='polling') + + self.setup_polling(override_conf=conf_to_use) + + polling_task = list(self.mgr.setup_polling_tasks().values())[0] + polling_task.poll_and_notify() + + log_mock.info.assert_has_calls([ + mock.call('Polling pollster %(poll)s in the context of %(src)s', + {'poll': 'test', 'src': 'test_polling'}), + mock.call('Finished polling pollster %(poll)s in the context ' + 'of %(src)s', {'poll': 'test', 'src': 'test_polling'}) + ]) + log_mock.debug.assert_has_calls([ + mock.call('Polster heartbeat update: test') + ]) + + # Even though we enabled 4 threads, we have only one metric configured. + # Therefore, there should be only one call here. + self.assertEqual(1, polling_task.manager.notifier.sample.call_count) + @mock.patch('ceilometer.polling.manager.LOG') def test_skip_polling_and_notify_with_no_resources(self, LOG): self.polling_cfg['sources'][0]['resources'] = [] @@ -694,9 +722,9 @@ class TestPollingAgent(BaseAgent): polling_task = list(self.mgr.setup_polling_tasks().values())[0] pollster = list(polling_task.pollster_matches['test_polling'])[0] polling_task.poll_and_notify() - LOG.debug.assert_called_with( - 'Skip pollster %(name)s, no %(p_context)s resources found this ' - 'cycle', {'name': pollster.name, 'p_context': ''}) + LOG.debug.assert_has_calls([mock.call( + 'Skip pollster %(name)s, no %(p_context)s resources found ' + 'this cycle', {'name': pollster.name, 'p_context': ''})]) @mock.patch('ceilometer.polling.manager.LOG') def test_skip_polling_polled_resources(self, LOG): @@ -709,9 +737,9 @@ class TestPollingAgent(BaseAgent): self.setup_polling() polling_task = list(self.mgr.setup_polling_tasks().values())[0] polling_task.poll_and_notify() - LOG.debug.assert_called_with( - 'Skip pollster %(name)s, no %(p_context)s resources found this ' - 'cycle', {'name': 'test', 'p_context': 'new'}) + LOG.debug.assert_has_calls([mock.call( + 'Skip pollster %(name)s, no %(p_context)s resources found ' + 'this cycle', {'name': 'test', 'p_context': 'new'})]) @mock.patch('oslo_utils.timeutils.utcnow') def test_polling_samples_timestamp(self, mock_utc): diff --git a/releasenotes/notes/threeads-process-pollsters-cbd22cca6f2effc4.yaml b/releasenotes/notes/threeads-process-pollsters-cbd22cca6f2effc4.yaml new file mode 100644 index 0000000000..c1912ba88f --- /dev/null +++ b/releasenotes/notes/threeads-process-pollsters-cbd22cca6f2effc4.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Introduce ``threads_to_process_pollsters`` to enable operators to define + the number of pollsters that can be executed in parallel inside a + polling task.