diff --git a/ceilometer/polling/manager.py b/ceilometer/polling/manager.py
index 2c0e8ae982..d344492f7b 100644
--- a/ceilometer/polling/manager.py
+++ b/ceilometer/polling/manager.py
@@ -117,6 +117,21 @@ POLLING_OPTS = [
                default=None,
                help='The private key to allow this ceilometer to '
                     'expose tls scrape endpoints'),
+    cfg.IntOpt('threads_to_process_pollsters',
+               default=1,
+               min=0,
+               help='The number of threads used to process the pollsters. '
+                    'The value one (1) means that the processing is '
+                    'serial (though not ordered!). The value zero (0) '
+                    'means that we will use as many threads as the '
+                    'number of pollsters configured in the polling '
+                    'task. Any other positive integer sets an upper '
+                    'bound on the number of threads used to process '
+                    'pollsters in parallel. Bear in mind that using '
+                    'more than one thread might not take full advantage '
+                    'of the discovery and pollster caches; pollsters '
+                    'can, however, be improved to synchronize '
+                    'themselves via the cache objects.'),
 ]
 
 
@@ -215,155 +230,211 @@ class PollingTask:
         poll_history = {}
         for source_name, pollsters in iter_random(
                 self.pollster_matches.items()):
-            for pollster in iter_random(pollsters):
-                key = Resources.key(source_name, pollster)
-                candidate_res = list(
-                    self.resources[key].get(discovery_cache))
-                if not candidate_res and pollster.obj.default_discovery:
-                    LOG.debug("Executing discovery process for pollsters [%s] "
-                              "and discovery method [%s] via process [%s].",
-                              pollster.obj, pollster.obj.default_discovery,
-                              self.manager.discover)
+            self.execute_polling_task_processing(cache, discovery_cache,
+                                                 poll_history, pollsters,
+                                                 source_name)
 
-                    candidate_res = self.manager.discover(
-                        [pollster.obj.default_discovery], discovery_cache)
+    def execute_polling_task_processing(self, cache, discovery_cache,
+                                        poll_history, pollsters, source_name):
+        all_pollsters = list(pollsters)
+        number_workers_for_pollsters = \
+            self.manager.conf.polling.threads_to_process_pollsters
 
-                # Remove duplicated resources and black resources. Using
-                # set() requires well defined __hash__ for each resource.
-                # Since __eq__ is defined, 'not in' is safe here.
-                polling_resources = []
-                black_res = self.resources[key].blacklist
-                history = poll_history.get(pollster.name, [])
-                for x in candidate_res:
-                    if x not in history:
-                        history.append(x)
-                        if x not in black_res:
-                            polling_resources.append(x)
-                poll_history[pollster.name] = history
+        if number_workers_for_pollsters < 0:
+            raise RuntimeError("The configuration "
+                               "'threads_to_process_pollsters' has a "
+                               "negative value [%s], which is not allowed."
+                               % number_workers_for_pollsters)
 
-                # If no resources, skip for this pollster
-                if not polling_resources:
-                    p_context = 'new' if history else ''
-                    LOG.debug("Skip pollster %(name)s, no %(p_context)s "
-                              "resources found this cycle",
-                              {'name': pollster.name, 'p_context': p_context})
-                    continue
+        if number_workers_for_pollsters == 0:
+            number_workers_for_pollsters = len(all_pollsters)
 
-                LOG.info("Polling pollster %(poll)s in the context of "
-                         "%(src)s",
-                         dict(poll=pollster.name, src=source_name))
-                try:
-                    source_obj = self.sources_map[source_name]
-                    coordination_group_name = source_obj.group_for_coordination
+        if number_workers_for_pollsters < len(all_pollsters):
+            LOG.debug("The number of pollsters in source [%s] is greater "
+                      "than the number of worker threads to execute them. "
" + "Therefore, one can expect the process to be longer " + "than the expected.", source_name) - LOG.debug("Checking if we need coordination for pollster " - "[%s] with coordination group name [%s].", - pollster, coordination_group_name) - if self.manager.hashrings and self.manager.hashrings.get( - coordination_group_name): - LOG.debug("The pollster [%s] is configured in a " - "source for polling that requires " - "coordination under name [%s].", pollster, - coordination_group_name) - group_coordination = self.manager.hashrings[ - coordination_group_name].belongs_to_self( - str(pollster.name)) + all_pollster_scheduled = [] + with futures.ThreadPoolExecutor( + thread_name_prefix="Pollster-executor", + max_workers=number_workers_for_pollsters) as executor: + LOG.debug("Processing pollsters for [%s] with [%s] threads.", + source_name, number_workers_for_pollsters) - LOG.debug("Pollster [%s] is configured with " - "coordination [%s] under name [%s].", - pollster.name, group_coordination, - coordination_group_name) - if not group_coordination: - LOG.info("The pollster [%s] should be processed " - "by other node.", pollster.name) - continue - else: - LOG.debug("The pollster [%s] is not configured in a " - "source for polling that requires " - "coordination. The current hashrings are " - "the following [%s].", pollster, - self.manager.hashrings) + for pollster in all_pollsters: + all_pollster_scheduled.append( + self.register_pollster_execution( + cache, discovery_cache, executor, poll_history, + pollster, source_name)) - polling_timestamp = timeutils.utcnow().isoformat() - samples = pollster.obj.get_samples( - manager=self.manager, - cache=cache, - resources=polling_resources - ) - sample_batch = [] + for s in all_pollster_scheduled: + LOG.debug(s.result()) - self.manager.heartbeat(pollster.name, polling_timestamp) + def register_pollster_execution(self, cache, discovery_cache, executor, + poll_history, pollster, source_name): + LOG.debug("Registering pollster [%s] from source [%s] to be executed " + "via executor [%s] with cache [%s], pollster history [%s], " + "and discovery cache [%s].", pollster, source_name, executor, + cache, poll_history, discovery_cache) - for sample in samples: - # Note(yuywz): Unify the timestamp of polled samples - sample.set_timestamp(polling_timestamp) + def _internal_function(): + self._internal_pollster_run(cache, discovery_cache, poll_history, + pollster, source_name) + return "Finished processing pollster [%s]." 
 
-                    if self._name_discovery and self._cache:
+        return executor.submit(_internal_function)
 
-                        # Try to resolve project UUIDs from cache first,
-                        # and then keystone
-                        LOG.debug("Ceilometer is configured to resolve "
-                                  "project IDs to name; loading the "
-                                  "project name for project ID [%s] in "
-                                  "sample [%s].", sample.project_id,
-                                  sample)
-                        if sample.project_id:
-                            sample.project_name = \
-                                self._cache.resolve_uuid_from_cache(
-                                    "projects",
-                                    sample.project_id
-                                )
+    def _internal_pollster_run(self, cache, discovery_cache, poll_history,
+                               pollster, source_name):
+        key = Resources.key(source_name, pollster)
+        candidate_res = list(
+            self.resources[key].get(discovery_cache))
+        if not candidate_res and pollster.obj.default_discovery:
+            LOG.debug("Executing discovery process for pollsters [%s] "
+                      "and discovery method [%s] via process [%s].",
+                      pollster.obj, pollster.obj.default_discovery,
+                      self.manager.discover)
 
-                        # Try to resolve user UUIDs from cache first,
-                        # and then keystone
+            candidate_res = self.manager.discover(
+                [pollster.obj.default_discovery], discovery_cache)
 
-                        LOG.debug("Ceilometer is configured to resolve "
-                                  "user IDs to name; loading the "
-                                  "user name for user ID [%s] in "
-                                  "sample [%s].", sample.user_id,
-                                  sample)
-                        if sample.user_id:
-                            sample.user_name = \
-                                self._cache.resolve_uuid_from_cache(
-                                    "users",
-                                    sample.user_id
-                                )
+        # Remove duplicated resources and black resources. Using
+        # set() requires well defined __hash__ for each resource.
+        # Since __eq__ is defined, 'not in' is safe here.
+        polling_resources = []
+        black_res = self.resources[key].blacklist
+        history = poll_history.get(pollster.name, [])
+        for x in candidate_res:
+            if x not in history:
+                history.append(x)
+                if x not in black_res:
+                    polling_resources.append(x)
+        poll_history[pollster.name] = history
 
-                        LOG.debug("Final sample generated after loading "
-                                  "the project and user names bases on "
-                                  "the IDs [%s].", sample)
+        # If no resources, skip for this pollster
+        if not polling_resources:
+            p_context = 'new' if history else ''
+            LOG.debug("Skip pollster %(name)s, no %(p_context)s "
+                      "resources found this cycle",
+                      {'name': pollster.name, 'p_context': p_context})
+            return
 
-                        sample_dict = (
-                            publisher_utils.meter_message_from_counter(
-                                sample, self._telemetry_secret
-                            ))
-                        if self._batch_size:
-                            if len(sample_batch) >= self._batch_size:
-                                self._send_notification(sample_batch)
-                                sample_batch = []
-                            sample_batch.append(sample_dict)
-                        else:
-                            self._send_notification([sample_dict])
+        LOG.info("Polling pollster %(poll)s in the context of "
+                 "%(src)s",
+                 dict(poll=pollster.name, src=source_name))
+        try:
+            source_obj = self.sources_map[source_name]
+            coordination_group_name = source_obj.group_for_coordination
 
-                    if sample_batch:
-                        self._send_notification(sample_batch)
+            LOG.debug("Checking if we need coordination for pollster "
+                      "[%s] with coordination group name [%s].",
+                      pollster, coordination_group_name)
+            if self.manager.hashrings and self.manager.hashrings.get(
+                    coordination_group_name):
+                LOG.debug("The pollster [%s] is configured in a "
+                          "source for polling that requires "
+                          "coordination under name [%s].", pollster,
+                          coordination_group_name)
+                group_coordination = self.manager.hashrings[
+                    coordination_group_name].belongs_to_self(
+                    str(pollster.name))
+
+                LOG.debug("Pollster [%s] is configured with "
+                          "coordination [%s] under name [%s].",
+                          pollster.name, group_coordination,
+                          coordination_group_name)
+                if not group_coordination:
+                    LOG.info("The pollster [%s] should be processed "
+                             "by another node.", pollster.name)
+                    return
+            else:
+                LOG.debug("The pollster [%s] is not configured in a "
+                          "source for polling that requires "
+                          "coordination. The current hashrings are "
+                          "the following [%s].", pollster,
+                          self.manager.hashrings)
+
+            polling_timestamp = timeutils.utcnow().isoformat()
+            samples = pollster.obj.get_samples(
+                manager=self.manager,
+                cache=cache,
+                resources=polling_resources
+            )
+            sample_batch = []
+
+            self.manager.heartbeat(pollster.name, polling_timestamp)
+
+            for sample in samples:
+                # Note(yuywz): Unify the timestamp of polled samples
+                sample.set_timestamp(polling_timestamp)
+
+                if self._name_discovery and self._cache:
+
+                    # Try to resolve project UUIDs from cache first,
+                    # and then keystone
+                    LOG.debug("Ceilometer is configured to resolve "
+                              "project IDs to name; loading the "
+                              "project name for project ID [%s] in "
+                              "sample [%s].", sample.project_id,
+                              sample)
+                    if sample.project_id:
+                        sample.project_name = \
+                            self._cache.resolve_uuid_from_cache(
+                                "projects",
+                                sample.project_id
+                            )
+
+                    # Try to resolve user UUIDs from cache first,
+                    # and then keystone
+                    LOG.debug("Ceilometer is configured to resolve "
+                              "user IDs to name; loading the "
+                              "user name for user ID [%s] in "
+                              "sample [%s].", sample.user_id,
+                              sample)
+
+                    if sample.user_id:
+                        sample.user_name = \
+                            self._cache.resolve_uuid_from_cache(
+                                "users",
+                                sample.user_id
+                            )
+
+                    LOG.debug("Final sample generated after loading "
+                              "the project and user names based on "
+                              "the IDs [%s].", sample)
+
+                sample_dict = (
+                    publisher_utils.meter_message_from_counter(
+                        sample, self._telemetry_secret
+                    ))
+                if self._batch_size:
+                    if len(sample_batch) >= self._batch_size:
+                        self._send_notification(sample_batch)
+                        sample_batch = []
+                    sample_batch.append(sample_dict)
+                else:
+                    self._send_notification([sample_dict])
 
-                    LOG.info("Finished polling pollster %(poll)s in the "
-                             "context of %(src)s", dict(poll=pollster.name,
-                                                        src=source_name))
-                except plugin_base.PollsterPermanentError as err:
-                    LOG.error(
-                        'Prevent pollster %(name)s from '
-                        'polling %(res_list)s on source %(source)s anymore!',
-                        dict(name=pollster.name,
-                             res_list=str(err.fail_res_list),
-                             source=source_name))
-                    self.resources[key].blacklist.extend(err.fail_res_list)
-                except Exception as err:
-                    LOG.error(
-                        'Continue after error from %(name)s: %(error)s'
-                        % ({'name': pollster.name, 'error': err}),
-                        exc_info=True)
+            if sample_batch:
+                self._send_notification(sample_batch)
+
+            LOG.info("Finished polling pollster %(poll)s in the "
+                     "context of %(src)s", dict(poll=pollster.name,
+                                                src=source_name))
+        except plugin_base.PollsterPermanentError as err:
+            LOG.error(
+                'Prevent pollster %(name)s from '
+                'polling %(res_list)s on source %(source)s anymore!',
+                dict(name=pollster.name,
+                     res_list=str(err.fail_res_list),
+                     source=source_name))
+            self.resources[key].blacklist.extend(err.fail_res_list)
+        except Exception as err:
+            LOG.error(
+                'Continue after error from %(name)s: %(error)s'
+                % ({'name': pollster.name, 'error': err}),
+                exc_info=True)
 
     def _send_notification(self, samples):
         if self.manager.conf.polling.enable_notifications:
diff --git a/ceilometer/tests/unit/polling/test_manager.py b/ceilometer/tests/unit/polling/test_manager.py
index 16af318d4e..b0dcdabd5e 100644
--- a/ceilometer/tests/unit/polling/test_manager.py
+++ b/ceilometer/tests/unit/polling/test_manager.py
@@ -254,10 +254,13 @@ class BaseAgent(base.BaseTestCase):
     class DiscoveryException(TestDiscoveryException):
         params = []
 
-    def setup_polling(self, poll_cfg=None):
+    def setup_polling(self, poll_cfg=None, override_conf=None):
         name = self.cfg2file(poll_cfg or self.polling_cfg)
-        self.CONF.set_override('cfg_file', name, group='polling')
-        self.mgr.polling_manager = manager.PollingManager(self.CONF)
+
+        conf_to_use = override_conf or self.CONF
+
+        conf_to_use.set_override('cfg_file', name, group='polling')
+        self.mgr.polling_manager = manager.PollingManager(conf_to_use)
 
     def create_manager(self):
         queue = multiprocessing.Queue()
@@ -687,6 +690,31 @@ class TestPollingAgent(BaseAgent):
             mock.call('Polster heartbeat update: test')
         ])
 
+    @mock.patch('ceilometer.polling.manager.LOG')
+    def test_polling_and_notify_with_resources_with_threads(self, log_mock):
+        conf_to_use = self.CONF
+        conf_to_use.set_override(
+            'threads_to_process_pollsters', 4, group='polling')
+
+        self.setup_polling(override_conf=conf_to_use)
+
+        polling_task = list(self.mgr.setup_polling_tasks().values())[0]
+        polling_task.poll_and_notify()
+
+        log_mock.info.assert_has_calls([
+            mock.call('Polling pollster %(poll)s in the context of %(src)s',
+                      {'poll': 'test', 'src': 'test_polling'}),
+            mock.call('Finished polling pollster %(poll)s in the context '
+                      'of %(src)s', {'poll': 'test', 'src': 'test_polling'})
+        ])
+        log_mock.debug.assert_has_calls([
+            mock.call('Polster heartbeat update: test')
+        ])
+
+        # Even though we enabled 4 threads, only one pollster is configured.
+        # Therefore, there should be only one notification call here.
+        self.assertEqual(1, polling_task.manager.notifier.sample.call_count)
+
     @mock.patch('ceilometer.polling.manager.LOG')
     def test_skip_polling_and_notify_with_no_resources(self, LOG):
         self.polling_cfg['sources'][0]['resources'] = []
@@ -694,9 +722,9 @@ class TestPollingAgent(BaseAgent):
         polling_task = list(self.mgr.setup_polling_tasks().values())[0]
         pollster = list(polling_task.pollster_matches['test_polling'])[0]
         polling_task.poll_and_notify()
-        LOG.debug.assert_called_with(
-            'Skip pollster %(name)s, no %(p_context)s resources found this '
-            'cycle', {'name': pollster.name, 'p_context': ''})
+        LOG.debug.assert_has_calls([mock.call(
+            'Skip pollster %(name)s, no %(p_context)s resources found '
+            'this cycle', {'name': pollster.name, 'p_context': ''})])
 
     @mock.patch('ceilometer.polling.manager.LOG')
     def test_skip_polling_polled_resources(self, LOG):
@@ -709,9 +737,9 @@ class TestPollingAgent(BaseAgent):
         self.setup_polling()
         polling_task = list(self.mgr.setup_polling_tasks().values())[0]
         polling_task.poll_and_notify()
-        LOG.debug.assert_called_with(
-            'Skip pollster %(name)s, no %(p_context)s resources found this '
-            'cycle', {'name': 'test', 'p_context': 'new'})
+        LOG.debug.assert_has_calls([mock.call(
+            'Skip pollster %(name)s, no %(p_context)s resources found '
+            'this cycle', {'name': 'test', 'p_context': 'new'})])
 
     @mock.patch('oslo_utils.timeutils.utcnow')
     def test_polling_samples_timestamp(self, mock_utc):
diff --git a/releasenotes/notes/threads-process-pollsters-cbd22cca6f2effc4.yaml b/releasenotes/notes/threads-process-pollsters-cbd22cca6f2effc4.yaml
new file mode 100644
index 0000000000..c1912ba88f
--- /dev/null
+++ b/releasenotes/notes/threads-process-pollsters-cbd22cca6f2effc4.yaml
@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    Introduce the ``threads_to_process_pollsters`` option, which allows
+    operators to define the number of threads used to execute pollsters
+    in parallel within a polling task.
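
Usage sketch for reviewers and operators (the section name and option come
from POLLING_OPTS above; the value 4 is purely illustrative, not a default):

    [polling]
    # 1 (default) keeps the current serial behavior; 0 uses one thread per
    # pollster configured in the task; any other positive integer caps the
    # size of the worker pool.
    threads_to_process_pollsters = 4

With such a setting, each polling task fans its pollsters out over at most
four "Pollster-executor" threads, so one slow pollster no longer delays the
whole cycle; the trade-off, as the option help text warns, is that
concurrently running pollsters may make less effective use of the discovery
and pollster caches unless they synchronize on the shared cache objects.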