diff --git a/cloudkitty/common/config.py b/cloudkitty/common/config.py index 9052d5b2..a7d34c82 100644 --- a/cloudkitty/common/config.py +++ b/cloudkitty/common/config.py @@ -31,6 +31,7 @@ import cloudkitty.storage import cloudkitty.storage.v1.hybrid.backends.gnocchi import cloudkitty.storage.v2.elasticsearch import cloudkitty.storage.v2.influx +import cloudkitty.storage.v2.loki import cloudkitty.storage.v2.opensearch import cloudkitty.utils @@ -70,6 +71,8 @@ _opts = [ cloudkitty.storage.v2.opensearch.opensearch_storage_opts))), ('storage_gnocchi', list(itertools.chain( cloudkitty.storage.v1.hybrid.backends.gnocchi.gnocchi_storage_opts))), + ('storage_loki', list(itertools.chain( + cloudkitty.storage.v2.loki.loki_storage_opts))), (None, list(itertools.chain( cloudkitty.api.app.auth_opts, cloudkitty.service.service_opts))), diff --git a/cloudkitty/storage/v2/loki/__init__.py b/cloudkitty/storage/v2/loki/__init__.py new file mode 100644 index 00000000..bef4cdd4 --- /dev/null +++ b/cloudkitty/storage/v2/loki/__init__.py @@ -0,0 +1,178 @@ +# Copyright 2025 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +import datetime +import json + +from oslo_config import cfg +from oslo_log import log as oslo_logging + +from cloudkitty import dataframe +from cloudkitty.storage import v2 as v2_storage +from cloudkitty.storage.v2.loki import client as os_client +from cloudkitty.utils import tz as tzutils + +LOG = oslo_logging.getLogger(__name__) + +CONF = cfg.CONF + +LOKI_STORAGE_GROUP = 'storage_loki' + +loki_storage_opts = [ + cfg.StrOpt( + 'url', + help='Loki base url. Defaults to ' + 'http://localhost:3100/loki/api/v1', + default='http://localhost:3100/loki/api/v1'), + cfg.StrOpt( + 'tenant', + help='The loki tenant to be used. Defaults to tenant1.', + default='tenant1'), + cfg.DictOpt( + 'stream', + help='The labels that are going to be used to define the Loki stream ' + 'as Python dict. Defaults to {"service": "cloudkitty"}.', + default={"service": "cloudkitty"}), + cfg.IntOpt( + 'buffer_size', + help='The number of messages that will be grouped together before ' + 'launching a Loki HTTP POST request.', + default=1), + cfg.StrOpt( + 'content_type', + help='The http Content-Type that will be used to send info to Loki. ' + 'Defaults to application/json. 
It can also be ' + 'application/x-protobuf', + default='application/json'), + cfg.BoolOpt( + 'insecure', + help='Set to true to allow insecure HTTPS connections to Loki', + default=False), + cfg.StrOpt( + 'cafile', + help='Path of the CA certificate to trust for HTTPS connections.', + default=None) +] + +CONF.register_opts(loki_storage_opts, LOKI_STORAGE_GROUP) + + +class LokiStorage(v2_storage.BaseStorage): + + def __init__(self, *args, **kwargs): + super(LokiStorage, self).__init__(*args, **kwargs) + + verify = not CONF.storage_loki.insecure + if verify and CONF.storage_loki.cafile: + verify = CONF.storage_loki.cafile + + self._conn = os_client.LokiClient( + CONF.storage_loki.url, + CONF.storage_loki.tenant, + CONF.storage_loki.stream, + CONF.storage_loki.content_type, + CONF.storage_loki.buffer_size) + + def init(self): + LOG.debug('LokiStorage Init.') + + def push(self, dataframes, scope_id=None): + for frame in dataframes: + for type_, point in frame.iterpoints(): + start, end = self._local_to_utc(frame.start, frame.end) + self._conn.add_point(point, type_, start, end) + + @staticmethod + def _local_to_utc(*args): + return [tzutils.local_to_utc(arg) for arg in args] + + @staticmethod + def _log_to_datapoint(labels): + return dataframe.DataPoint( + labels['unit'], + labels['qty'], + labels['price'], + labels['groupby'], + labels['metadata'], + ) + + def _build_dataframes(self, logs): + dataframes = {} + for log in logs: + labels = json.loads(log['values'][0][1]) + start = tzutils.dt_from_iso(labels['start']) + end = tzutils.dt_from_iso(labels['end']) + key = (start, end) + if key not in dataframes.keys(): + dataframes[key] = dataframe.DataFrame(start=start, end=end) + dataframes[key].add_point( + self._log_to_datapoint(labels), labels['type']) + + output = list(dataframes.values()) + output.sort(key=lambda frame: (frame.start, frame.end)) + return output + + def retrieve(self, begin=None, end=None, + filters=None, + metric_types=None, + offset=0, limit=1000, paginate=True): + begin, end = self._local_to_utc(begin or tzutils.get_month_start(), + end or tzutils.get_next_month()) + total, logs = self._conn.retrieve( + begin, end, filters, metric_types, limit) + dataframes = self._build_dataframes(logs) + return { + 'total': total, + 'dataframes': dataframes + } + + def delete(self, begin=None, end=None, filters=None): + self._conn.delete(begin, end, filters) + + @staticmethod + def _normalize_time(t): + if isinstance(t, datetime.datetime): + return tzutils.utc_to_local(t) + return tzutils.dt_from_iso(t) + + def _doc_to_total_result(self, doc, start, end): + output = { + 'begin': self._normalize_time(doc.get('start', start)), + 'end': self._normalize_time(doc.get('end', end)), + 'qty': doc['sum_qty']['value'], + 'rate': doc['sum_price']['value'], + } + if 'key' in doc.keys(): + for key, value in doc['key'].items(): + output[key] = value + return output + + def total(self, groupby=None, begin=None, end=None, metric_types=None, + filters=None, custom_fields=None, offset=0, limit=1000, + paginate=False): + begin, end = self._local_to_utc(begin or tzutils.get_month_start(), + end or tzutils.get_next_month()) + + total, docs = self._conn.total(begin, end, metric_types, filters, + groupby, custom_fields=custom_fields, + offset=offset, limit=limit, + paginate=False) + results = [ + self._doc_to_total_result(doc, begin, end) for doc in docs + ] + + return { + 'total': total, + 'results': results, + } diff --git a/cloudkitty/storage/v2/loki/client.py b/cloudkitty/storage/v2/loki/client.py new file 
mode 100644 index 00000000..2b752123 --- /dev/null +++ b/cloudkitty/storage/v2/loki/client.py @@ -0,0 +1,294 @@ +# Copyright 2025 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +from oslo_log import log +import requests + +from cloudkitty.storage.v2.loki import exceptions +from cloudkitty.utils import json + +LOG = log.getLogger(__name__) + + +class LokiClient(object): + """Class used to ease interaction with Loki.""" + + def __init__(self, url, tenant, stream_labels, content_type, buffer_size): + if content_type != "application/json": + raise exceptions.UnsupportedContentType(content_type) + + self._base_url = url.strip('/') + self._stream_labels = stream_labels + self._headers = { + 'X-Scope-OrgID': tenant, + 'Content-Type': content_type + } + self._buffer_size = buffer_size + self._points = [] + + def _build_payload_json(self, batch): + payload = { + "streams": [ + { + "stream": self._stream_labels, + "values": batch + } + ] + } + return payload + + def _dict_to_loki_query(self, tags_dict, groupby=False, brackets=True): + """Converts from Python dict to Loki query language.""" + if not tags_dict: + return '{}' + + pairs = [] + for key, value in tags_dict.items(): + if isinstance(value, list): + value = value[0] + if isinstance(value, str): + value = value.replace('"', '\\"') + if groupby: + pairs.append(f'groupby_{key}="{value}"') + else: + pairs.append(f'{key}="{value}"') + + if brackets: + return '{' + ', '.join(pairs) + '}' + else: + return ', '.join(pairs) + + def _base_query(self): + """Makes sure that we always get json results.""" + return self._dict_to_loki_query(self._stream_labels) + ' | json' + + def search(self, query, begin, end, limit): + url = f"{self._base_url}/query_range" + + if query is None: + query = self._base_query() + + params = { + "query": query, + "start": int(begin.timestamp() * 1_000_000_000), + "end": int(end.timestamp() * 1_000_000_000), + "limit": limit + } + + response = requests.get(url, params=params, headers=self._headers) + + if response.status_code == 200: + data = response.json()['data'] + else: + msg = (f"Failed to query logs or empty result: " + f"{response.status_code} - {response.text}") + LOG.error(msg) + data = [] + return data + + def push(self): + """Send messages to Loki in batches.""" + url = f"{self._base_url}/push" + + while self._points: + payload = self._build_payload_json(self._points) + response = requests.post(url, json=payload, headers=self._headers) + + if response.status_code == 204: + LOG.debug( + f"Batch of {len(self._points)} messages pushed " + f"successfully." 
+ ) + self._points = [] + else: + LOG.error( + f"Failed to push logs: {response.status_code} - " + f"{response.text}" + ) + break + + def delete_by_query(self, query, begin, end): + url = f"{self._base_url}/delete" + + if query is None: + query = self._base_query() + + params = { + "query": query, + "start": int(begin.timestamp()), + "end": int(end.timestamp()), + } + + LOG.debug(f"Request Params: {params}") + response = requests.post(url, params=params, headers=self._headers) + + if response.status_code == 204: + LOG.debug( + "Dataframes deleted successfully." + ) + else: + LOG.error( + f"Failed to delete dataframes: {response.status_code} - " + f"{response.text}" + ) + + def delete(self, begin, end, filters): + query = self._base_query() + loki_query_parts = [] + if filters: + loki_query_parts.append( + self._dict_to_loki_query( + filters, groupby=True, brackets=False + ) + ) + + if loki_query_parts: + query += ' | ' + ', '.join(loki_query_parts) + + self.delete_by_query(query, begin, end) + + def retrieve(self, begin, end, filters, metric_types, limit): + """Retrieves dataframes stored in Loki.""" + query = self._base_query() + loki_query_parts = [] + + if filters: + loki_query_parts.append( + self._dict_to_loki_query( + filters, groupby=True, brackets=False + ) + ) + + if metric_types: + if isinstance(metric_types, list): + current_metric_type = metric_types[0] + else: + current_metric_type = metric_types + loki_query_parts.append(f'type = "{current_metric_type}"') + + if loki_query_parts: + query += ' | ' + ', '.join(loki_query_parts) + + data_response = self.search(query, begin, end, limit) + + if not isinstance(data_response, dict) or \ + 'stats' not in data_response or \ + 'result' not in data_response: + LOG.warning( + f"Data from Loki search is not in the expected dictionary " + f"format or is missing keys. Query: '{query}'. Response " + f"received: {data_response}" + ) + return 0, [] + + total = data_response.get('stats', {})\ + .get('summary', {})\ + .get('totalEntriesReturned', 0) + output = data_response.get('result', []) + + return total, output + + def add_point(self, point, type, start, end): + """Append a point to the client.""" + timestamp_ns = int(end.timestamp() * 1_000_000_000) + timestamp = str(timestamp_ns) + + data = { + 'start': start, + 'end': end, + 'type': type, + 'unit': point.unit, + 'description': point.description, + 'qty': point.qty, + 'price': point.price, + 'groupby': point.groupby, + 'metadata': point.metadata, + } + + log_line = json.dumps(data) + self._points.append([timestamp, log_line]) + + if len(self._points) >= self._buffer_size: + self.push() + + def total(self, begin, end, metric_types, filters, groupby, + custom_fields, offset, limit, paginate): + """Calculate total sum of 'price' and 'qty' for entries. + + This method calculates totals for entries that match the specified + groupby value. + """ + if custom_fields: + LOG.warning( + "'custom_fields' are not implemented yet for Loki. 
" + "Therefore, the custom fields [%s] informed by the user " + "will be ignored.", custom_fields + ) + + if offset != 0: + LOG.warning("offset is not supported by Loki.") + + total_count, data = self.retrieve( + begin, end, filters, metric_types, limit + ) + + if not groupby: + total_qty = 0.0 + total_price = 0.0 + for item in data: + stream = item.get('stream', {}) + qty = float(stream.get('qty', 0)) + price = float(stream.get('price', 0)) + + total_qty += qty + total_price += price + + return 1, [{ + 'sum_qty': {'value': total_qty}, + 'sum_price': {'value': total_price} + }] + + grouped_data = {} + for item in data: + stream = item.get('stream', {}) + qty = float(stream.get('qty', 0)) + price = float(stream.get('price', 0)) + + key_parts = {} + for field in groupby: + if field == 'type': + key_parts[field] = stream.get(field, '') + else: + key_parts[field] = stream.get('groupby_' + field) + + key = tuple((k, v) for k, v in sorted(key_parts.items())) + + if key not in grouped_data: + grouped_data[key] = { + 'sum_qty': 0.0, + 'sum_price': 0.0, + 'key_parts': dict(key_parts) + } + + grouped_data[key]['sum_qty'] += qty + grouped_data[key]['sum_price'] += price + + result = [] + for _key_tuple, values in grouped_data.items(): + result.append({ + 'key': values['key_parts'], + 'sum_qty': {'value': values['sum_qty']}, + 'sum_price': {'value': values['sum_price']} + }) + return len(result), result diff --git a/cloudkitty/storage/v2/loki/exceptions.py b/cloudkitty/storage/v2/loki/exceptions.py new file mode 100644 index 00000000..e15af51b --- /dev/null +++ b/cloudkitty/storage/v2/loki/exceptions.py @@ -0,0 +1,33 @@ +# Copyright 2025 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + + +class BaseLokiException(Exception): + """Base exception raised by the Loki v2 storage driver""" + + +class UnsupportedContentType(BaseLokiException): + def __init__(self, content_type): + super(UnsupportedContentType, self).__init__( + "Content-Type {} is not supported. Use supported formats: " + "application/json".format(content_type) + ) + + +class NotImplementedYet(BaseLokiException): + def __init__(self, method): + super(NotImplementedYet, self).__init__( + "Method {} not implemented yet".format(method) + ) diff --git a/cloudkitty/tests/storage/v2/loki/__init__.py b/cloudkitty/tests/storage/v2/loki/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cloudkitty/tests/storage/v2/loki/test_client.py b/cloudkitty/tests/storage/v2/loki/test_client.py new file mode 100644 index 00000000..f5271927 --- /dev/null +++ b/cloudkitty/tests/storage/v2/loki/test_client.py @@ -0,0 +1,567 @@ +# Copyright 2025 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +from datetime import datetime +from datetime import timezone +import json +import unittest +from unittest.mock import call +from unittest.mock import MagicMock +from unittest.mock import patch + +from cloudkitty.storage.v2.loki import client +from cloudkitty.storage.v2.loki import exceptions + + +class MockDataPoint: + def __init__(self, unit="USD/h", description="desc", qty=1.0, price=10.0, + groupby=None, metadata=None): + self.unit = unit + self.description = description + self.qty = qty + self.price = price + self.groupby = groupby if groupby is not None \ + else {'project_id': 'proj1'} + self.metadata = metadata if metadata is not None \ + else {'meta_key': 'meta_val'} + + +@patch('cloudkitty.storage.v2.loki.client.requests', autospec=True) +@patch('cloudkitty.storage.v2.loki.client.LOG', autospec=True) +class TestLokiClient(unittest.TestCase): + + def setUp(self): + self.base_url = "http://loki:3100/loki/api/v1" + self.tenant = "test_tenant" + self.stream_labels = {"app": "cloudkitty", "source": "test"} + self.content_type = "application/json" + self.buffer_size = 2 + self.client = client.LokiClient( + self.base_url, + self.tenant, + self.stream_labels, + self.content_type, + self.buffer_size + ) + self.begin_dt = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + self.end_dt = datetime(2024, 1, 1, 1, 0, 0, tzinfo=timezone.utc) + + def test_init_success(self, mock_log, mock_requests): + self.assertEqual(self.client._base_url, self.base_url) + self.assertEqual(self.client._stream_labels, self.stream_labels) + self.assertEqual(self.client._headers['X-Scope-OrgID'], self.tenant) + self.assertEqual(self.client._headers['Content-Type'], + self.content_type) + self.assertEqual(self.client._buffer_size, self.buffer_size) + self.assertEqual(self.client._points, []) + + def test_init_unsupported_content_type(self, mock_log, mock_requests): + with self.assertRaises(exceptions.UnsupportedContentType): + client.LokiClient(self.base_url, self.tenant, self.stream_labels, + "text/plain", self.buffer_size) + + def test_build_payload_json(self, mock_log, mock_requests): + batch = [["1609459200000000000", "log line 1"], + ["1609459200000000001", "log line 2"]] + payload = self.client._build_payload_json(batch) + expected_payload = { + "streams": [ + { + "stream": self.stream_labels, + "values": batch + } + ] + } + self.assertEqual(payload, expected_payload) + + def test_dict_to_loki_query(self, mock_log, mock_requests): + self.assertEqual(self.client._dict_to_loki_query({}), '{}') + self.assertEqual(self.client._dict_to_loki_query({"foo": "bar"}), + '{foo="bar"}') + self.assertEqual( + self.client._dict_to_loki_query({"foo": "bar", "baz": "qux"}), + '{foo="bar", baz="qux"}' + ) + self.assertIn('foo="bar"', self.client._dict_to_loki_query( + {"foo": "bar", "baz": "qux"})) + self.assertIn('baz="qux"', self.client._dict_to_loki_query( + {"foo": "bar", "baz": "qux"})) + + self.assertEqual( + self.client._dict_to_loki_query({"foo": ["bar", "baz"]}), + '{foo="bar"}' + ) + self.assertEqual( + self.client._dict_to_loki_query({"path": "/api/v1"}), + '{path="/api/v1"}' + ) + self.assertEqual( + 
self.client._dict_to_loki_query({"msg": 'hello "world"'}), + '{msg="hello \\"world\\""}' + ) + self.assertEqual( + self.client._dict_to_loki_query({"foo": "bar"}, groupby=True), + '{groupby_foo="bar"}' + ) + self.assertEqual( + self.client._dict_to_loki_query({"foo": "bar"}, brackets=False), + 'foo="bar"' + ) + self.assertEqual( + self.client._dict_to_loki_query( + {"foo": "bar", "baz": "qux"}, brackets=False + ), + 'foo="bar", baz="qux"' + ) + + def test_base_query(self, mock_log, mock_requests): + expected_query = (self.client._dict_to_loki_query(self.stream_labels) + + ' | json') + self.assertEqual(self.client._base_query(), expected_query) + + def test_search_success(self, mock_log, mock_requests): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "data": { + "resultType": "streams", + "result": [{"stream": {}, "values": []}] + } + } + mock_requests.get.return_value = mock_response + + query = '{app="test"} | json' + data = self.client.search(query, self.begin_dt, self.end_dt, 100) + + expected_url = f"{self.base_url}/query_range" + expected_params = { + "query": query, + "start": int(self.begin_dt.timestamp() * 1_000_000_000), + "end": int(self.end_dt.timestamp() * 1_000_000_000), + "limit": 100 + } + mock_requests.get.assert_called_once_with( + expected_url, + params=expected_params, + headers=self.client._headers + ) + self.assertEqual( + data, + {"resultType": "streams", "result": [{"stream": {}, "values": []}]} + ) + + def test_search_no_query_uses_base_query(self, mock_log, mock_requests): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"data": {"result": []}} + mock_requests.get.return_value = mock_response + + self.client.search(None, self.begin_dt, self.end_dt, 100) + + expected_query = self.client._base_query() + mock_requests.get.assert_called_once() + _called_args, called_kwargs = mock_requests.get.call_args + self.assertEqual(called_kwargs['params']['query'], expected_query) + + def test_search_failure(self, mock_log, mock_requests): + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.text = "Internal Server Error" + mock_requests.get.return_value = mock_response + + query = '{app="test"} | json' + data = self.client.search(query, self.begin_dt, self.end_dt, 100) + + self.assertEqual(data, []) + expected_msg = ("Failed to query logs or empty result: 500 - " + "Internal Server Error") + mock_log.error.assert_called_once_with(expected_msg) + + def test_push_success_batch(self, mock_log, mock_requests): + mock_response = MagicMock() + mock_response.status_code = 204 + mock_requests.post.return_value = mock_response + + self.client._points = [["ts1", "log1"], ["ts2", "log2"]] + self.client.push() + + expected_url = f"{self.base_url}/push" + expected_payload = self.client._build_payload_json( + [["ts1", "log1"], ["ts2", "log2"]] + ) + + mock_requests.post.assert_called_once_with( + expected_url, json=expected_payload, headers=self.client._headers + ) + self.assertEqual(self.client._points, []) + log_msg = "Batch of 2 messages pushed successfully." 
+ mock_log.debug.assert_called_once_with(log_msg) + + def test_push_failure(self, mock_log, mock_requests): + mock_response = MagicMock() + mock_response.status_code = 400 + mock_response.text = "Bad Request" + mock_requests.post.return_value = mock_response + + initial_points = [["ts1", "log1"], ["ts2", "log2"]] + self.client._points = list(initial_points) + self.client.push() + + self.assertEqual(self.client._points, initial_points) + expected_msg = "Failed to push logs: 400 - Bad Request" + mock_log.error.assert_called_once_with(expected_msg) + + def test_push_no_points(self, mock_log, mock_requests): + self.client._points = [] + self.client.push() + mock_requests.post.assert_not_called() + + @patch.object(client.LokiClient, 'search') + def test_retrieve_no_filters_no_metric_types(self, mock_search, mock_log, + mock_requests_arg): + mock_search_result = { + "stats": {"summary": {"totalEntriesReturned": 5}}, + "result": [{"stream": {}, "values": [["ts", '{"key":"val"}']]}] + } + mock_search.return_value = mock_search_result + + total, output = self.client.retrieve(self.begin_dt, self.end_dt, + None, None, 100) + + expected_base_query = self.client._base_query() + mock_search.assert_called_once_with( + expected_base_query, self.begin_dt, self.end_dt, 100 + ) + self.assertEqual(total, 5) + self.assertEqual(output, mock_search_result["result"]) + + @patch.object(client.LokiClient, 'search') + def test_retrieve_with_filters_and_metric_type_string( + self, mock_search, mock_log, mock_requests_arg): + mock_search_result = { + "stats": {"summary": {"totalEntriesReturned": 2}}, + "result": [{"stream": {}, "values": [["ts", '{"type":"t1"}']]}] + } + mock_search.return_value = mock_search_result + filters = {"project_id": "proj1", "region": "reg1"} + metric_types = "cpu_util" + + total, output = self.client.retrieve(self.begin_dt, self.end_dt, + filters, metric_types, 50) + + base_query = self.client._base_query() + filter_query_part = self.client._dict_to_loki_query( + filters, groupby=True, brackets=False + ) + metric_query_part = f'type = "{metric_types}"' + expected_full_query = ( + f"{base_query} | {filter_query_part}, {metric_query_part}" + ) + + mock_search.assert_called_once_with( + expected_full_query, self.begin_dt, self.end_dt, 50 + ) + self.assertEqual(total, 2) + self.assertEqual(output, mock_search_result["result"]) + + @patch.object(client.LokiClient, 'search') + def test_retrieve_with_metric_type_list(self, mock_search, mock_log, + mock_requests_arg): + mock_search.return_value = { + "stats": {"summary": {"totalEntriesReturned": 1}}, + "result": ["data"] + } + metric_types = ["cpu_util", "ram_util"] + + self.client.retrieve(self.begin_dt, self.end_dt, None, + metric_types, 50) + + base_query = self.client._base_query() + metric_query_part = f'type = "{metric_types[0]}"' + expected_full_query = f"{base_query} | {metric_query_part}" + mock_search.assert_called_once_with( + expected_full_query, self.begin_dt, self.end_dt, 50 + ) + + @patch.object(client.LokiClient, 'search') + def test_retrieve_empty_or_malformed_search_response( + self, mock_search, mock_loki_client_log, mock_requests_arg): + mock_search.return_value = [] + expected_query_for_log = self.client._base_query() + + total, output = self.client.retrieve(self.begin_dt, self.end_dt, + None, None, 100) + + self.assertEqual(total, 0) + self.assertEqual(output, []) + + expected_log_message_case1 = ( + f"Data from Loki search is not in the expected dictionary format " + f"or is missing keys. Query: '{expected_query_for_log}'. 
" + f"Response received: {mock_search.return_value}" + ) + mock_loki_client_log.warning.assert_called_with( + expected_log_message_case1 + ) + + mock_search.reset_mock() + mock_loki_client_log.reset_mock() + + mock_search.return_value = {"nodata": True} + total, output = self.client.retrieve(self.begin_dt, self.end_dt, + None, None, 100) + + self.assertEqual(total, 0) + self.assertEqual(output, []) + + expected_log_message_case2 = ( + f"Data from Loki search is not in the expected dictionary format " + f"or is missing keys. Query: '{expected_query_for_log}'. " + f"Response received: {mock_search.return_value}" + ) + mock_loki_client_log.warning.assert_called_with( + expected_log_message_case2 + ) + + @patch.object(client.LokiClient, 'push') + def test_add_point_no_push(self, mock_push, mock_log, + mock_requests_arg): + self.client._buffer_size = 3 + point = MockDataPoint(qty=1, price=10) + + self.client.add_point(point, "test_type", self.begin_dt, self.end_dt) + + self.assertEqual(len(self.client._points), 1) + added_point_data = json.loads(self.client._points[0][1]) + self.assertEqual(added_point_data['type'], "test_type") + self.assertEqual(added_point_data['qty'], 1) + self.assertEqual(added_point_data['price'], 10) + self.assertEqual(added_point_data['start'], + "2024-01-01T00:00:00+00:00") + self.assertEqual(added_point_data['end'], "2024-01-01T01:00:00+00:00") + self.assertEqual(added_point_data['groupby'], point.groupby) + mock_push.assert_not_called() + + @patch.object(client.LokiClient, 'push') + def test_add_point_triggers_push(self, mock_push, mock_log, + mock_requests_arg): + self.client._buffer_size = 1 + point = MockDataPoint() + + self.client.add_point(point, "test_type", self.begin_dt, self.end_dt) + self.assertEqual(len(self.client._points), 1) + mock_push.assert_called_once() + + @patch.object(client.LokiClient, 'retrieve') + def test_total_no_groupby(self, mock_retrieve, mock_log, + mock_requests_arg): + loki_data_for_total = [ + { + "stream": {"groupby_project_id": "p1", "type": "t1", + "qty": "10.0", "price": "100.0"}, + "values": [["ts1", json.dumps( + {"qty": 10.0, "price": 100.0, "type": "t1", + "groupby": {"project_id": "p1"}})]] + }, + { + "stream": {"groupby_project_id": "p1", "type": "t1", + "qty": "5.0", "price": "50.0"}, + "values": [["ts2", json.dumps( + {"qty": 5.0, "price": 50.0, "type": "t1", + "groupby": {"project_id": "p1"}})]] + }, + ] + mock_retrieve.return_value = (2, loki_data_for_total) + + count, result = self.client.total( + self.begin_dt, self.end_dt, "some_type", None, None, + None, 0, 100, False + ) + + mock_retrieve.assert_called_once_with( + self.begin_dt, self.end_dt, None, "some_type", 100 + ) + self.assertEqual(count, 1) + self.assertEqual(len(result), 1) + self.assertAlmostEqual(result[0]['sum_qty']['value'], 15.0) + self.assertAlmostEqual(result[0]['sum_price']['value'], 150.0) + mock_log.warning.assert_not_called() + + @patch.object(client.LokiClient, 'retrieve') + def test_total_with_groupby(self, mock_retrieve, mock_log, + mock_requests_arg): + loki_data_for_total = [ + { + "stream": {"groupby_project_id": "proj1", "type": "typeA", + "qty": 10.0, "price": 100.0}, + "values": [["ts1", json.dumps( + {"qty": 10.0, "price": 100.0, "type": "typeA", + "groupby": {"project_id": "proj1"}})] + ] + }, + { + "stream": {"groupby_project_id": "proj1", "type": "typeB", + "qty": 5.0, "price": 50.0}, + "values": [["ts2", json.dumps( + {"qty": 5.0, "price": 50.0, "type": "typeB", + "groupby": {"project_id": "proj1"}})] + ] + }, + { + "stream": 
{"groupby_project_id": "proj2", "type": "typeA", + "qty": 2.0, "price": 20.0}, + "values": [["ts3", json.dumps( + {"qty": 2.0, "price": 20.0, "type": "typeA", + "groupby": {"project_id": "proj2"}})] + ] + }, + { + "stream": {"groupby_project_id": "proj1", "type": "typeA", + "qty": 8.0, "price": 80.0}, + "values": [["ts4", json.dumps( + {"qty": 8.0, "price": 80.0, "type": "typeA", + "groupby": {"project_id": "proj1"}})] + ] + }, + ] + mock_retrieve.return_value = (4, loki_data_for_total) + groupby_fields = ["type", "project_id"] + + count, result = self.client.total( + self.begin_dt, self.end_dt, "any_metric_type", + {"filter_key": "val"}, groupby_fields, + None, 0, 100, False + ) + + mock_retrieve.assert_called_once_with( + self.begin_dt, self.end_dt, {"filter_key": "val"}, + "any_metric_type", 100 + ) + self.assertEqual(count, 3) + + expected_results_map = { + tuple(sorted({'type': 'typeA', + 'project_id': 'proj1'}.items())): + {'qty': 18.0, 'price': 180.0}, + tuple(sorted({'type': 'typeB', + 'project_id': 'proj1'}.items())): + {'qty': 5.0, 'price': 50.0}, + tuple(sorted({'type': 'typeA', + 'project_id': 'proj2'}.items())): + {'qty': 2.0, 'price': 20.0}, + } + self.assertEqual(len(result), len(expected_results_map)) + for res_item in result: + key_from_result_tuple = tuple(sorted(res_item['key'].items())) + self.assertIn(key_from_result_tuple, expected_results_map) + expected_values = expected_results_map[key_from_result_tuple] + self.assertAlmostEqual(res_item['sum_qty']['value'], + expected_values['qty']) + self.assertAlmostEqual(res_item['sum_price']['value'], + expected_values['price']) + + @patch.object(client.LokiClient, 'retrieve') + def test_total_with_custom_fields_and_offset_logs_warnings( + self, mock_retrieve, mock_log, mock_requests_arg): + mock_retrieve.return_value = (0, []) + custom_fields = ["field1", "field2"] + offset = 5 + + self.client.total( + self.begin_dt, self.end_dt, None, None, None, + custom_fields, offset, 100, False + ) + + mock_log.warning.assert_any_call( + "'custom_fields' are not implemented yet for Loki. 
Therefore, " + "the custom fields [%s] informed by the user will be ignored.", + custom_fields + ) + mock_log.warning.assert_any_call("offset is not supported by Loki.") + + @patch.object(client.LokiClient, '_base_query') + def test_delete_by_query_success(self, mq, ml, mr): + mr.post.return_value = MagicMock(status_code=204) + test_query = '{app="cloudkitty"} | json ' \ + '| type="compute.instance.exists"}' + self.client.delete_by_query(test_query, self.begin_dt, self.end_dt) + mr.post.assert_called_once() + ml.debug.assert_has_calls([ + call(f"Request Params: {{'query': '{test_query}', " + f"'start': {int(self.begin_dt.timestamp())}, " + f"'end': {int(self.end_dt.timestamp())}}}"), + call("Dataframes deleted successfully.") + ]) + mq.assert_not_called() + + @patch.object(client.LokiClient, '_base_query') + def test_delete_by_query_failure(self, mock_base_query, mock_log, + mock_requests_arg): + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.text = "Internal Server Error" + mock_requests_arg.post.return_value = mock_response + + test_query = '{app="cloudkitty"} | json | ' + 'type="compute.instance.exists"' + self.client.delete_by_query(test_query, self.begin_dt, self.end_dt) + + expected_url = f"{self.base_url}/delete" + expected_params = { + "query": test_query, + "start": int(self.begin_dt.timestamp()), + "end": int(self.end_dt.timestamp()), + } + + mock_requests_arg.post.assert_called_once_with( + expected_url, + params=expected_params, + headers=self.client._headers + ) + expected_error_msg = ("Failed to delete dataframes: " + "500 - Internal Server Error") + mock_log.error.assert_called_once_with(expected_error_msg) + mock_base_query.assert_not_called() + + @patch.object(client.LokiClient, 'delete_by_query') + @patch.object(client.LokiClient, '_base_query') + def test_delete_with_filters(self, mock_base_query, mock_delete_by_query, + mock_log, mock_requests_arg): + mock_base_query.return_value = '{app="cloudkitty", source="test"} ' \ + '| json' + filters = {"project_id": "proj1", "resource_type": "instance"} + + self.client.delete(self.begin_dt, self.end_dt, filters) + + exp_query_filters = 'groupby_project_id="proj1", ' \ + 'groupby_resource_type="instance"' + exp_query = f'{mock_base_query.return_value} | {exp_query_filters}' + mock_delete_by_query.assert_called_once_with( + exp_query, self.begin_dt, self.end_dt + ) + mock_base_query.assert_called_once() + + @patch.object(client.LokiClient, 'delete_by_query') + @patch.object(client.LokiClient, '_base_query') + def test_delete_no_filters(self, mock_base_query, mock_delete_by_query, + mock_log, mock_requests_arg): + mock_base_query.return_value = '{app="cloudkitty", source="test"} ' \ + '| json' + + self.client.delete(self.begin_dt, self.end_dt, None) + + mock_delete_by_query.assert_called_once_with( + mock_base_query.return_value, self.begin_dt, self.end_dt + ) + mock_base_query.assert_called_once() diff --git a/cloudkitty/tests/storage/v2/loki_utils.py b/cloudkitty/tests/storage/v2/loki_utils.py new file mode 100644 index 00000000..457cfdc3 --- /dev/null +++ b/cloudkitty/tests/storage/v2/loki_utils.py @@ -0,0 +1,207 @@ +# Copyright 2025 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +from datetime import datetime +from datetime import timezone + +from cloudkitty.storage.v2.loki import client as loki_client_module +from cloudkitty.storage.v2.loki import exceptions as loki_exceptions +from cloudkitty.utils import json + + +class FakeLokiClient(loki_client_module.LokiClient): + + def __init__(self, url, tenant, stream_labels, content_type, + buffer_size, **kwargs): + if content_type != "application/json": + raise loki_exceptions.UnsupportedContentType(content_type) + + self._base_url = url.strip('/') if url else 'http://fake-loki' + self._stream_labels = stream_labels if stream_labels \ + else {"fake_label": "fake_value"} + self._headers = { + 'X-Scope-OrgID': tenant, + 'Content-Type': content_type + } + self._buffer_size = buffer_size + + self._logs = [] + self.init() + + def init(self): + self._logs = [] + + def add_point(self, point, type, start, end): + loki_timestamp_ns_str = str( + int(datetime.now(timezone.utc).timestamp() * 1_000_000_000) + ) + + data_to_log = { + 'start': start, + 'end': end, + 'type': type, + 'unit': point.unit, + 'description': point.description, + 'qty': point.qty, + 'price': point.price, + 'groupby': point.groupby, + 'metadata': point.metadata + } + + flattened_stream = { + "stream": { + 'detected_level': 'unknown', + 'start': start, + 'end': end, + 'groupby_day_of_the_year': data_to_log.get('groupby'). + get('day_of_the_year'), + 'groupby_id': data_to_log.get('groupby').get('id'), + 'groupby_month': data_to_log.get('groupby').get('month'), + 'groupby_project_id': data_to_log.get('groupby'). + get('project_id'), + 'groupby_user_id': data_to_log.get('groupby'). + get('user_id'), + 'groupby_week_of_the_year': data_to_log.get('groupby'). + get('week_of_the_year'), + 'groupby_year': data_to_log.get('groupby'). + get('year'), + 'metadata_flavor_id': data_to_log.get('metadata'). + get('flavor_id'), + 'metadata_flavor_name': data_to_log.get('metadata'). + get('flavor_name'), + 'metadata_vcpus': data_to_log.get('metadata'). 
+ get('vcpus'), + 'price': data_to_log.get('price'), + 'qty': data_to_log.get('qty'), + 'type': data_to_log.get('type'), + 'unit': data_to_log.get('unit') + }, + "values": [ + [loki_timestamp_ns_str, json.dumps(data_to_log)] + ] + } + + self._logs.append(flattened_stream) + + def push(self): + pass + + def __filter_func(self, begin, end, filters, mtypes): + matched_points = [] + for log in self._logs: + stream = log.get('stream') + + if begin and stream.get('start') < begin: + continue + if end and stream.get('start') >= end: + continue + + if mtypes and stream.get('type') not in mtypes: + continue + + filter_match_passes = True + if filters: + for key, value in filters.items(): + if stream.get('groupby_' + key) != value: + filter_match_passes = False + break + if not filter_match_passes: + continue + + matched_points.append(log) + return matched_points + + def retrieve(self, begin, end, filters, metric_types, limit): + points = self.__filter_func(begin, end, filters, metric_types) + + total = len(points) + + if limit > 0: + effective_limit = limit + else: + effective_limit = 1000 + + output = points[:effective_limit] + + if not output: + return 0, [] + + return total, output + + def total(self, begin, end, metric_types, filters, groupby, + custom_fields=None, offset=0, limit=1000, paginate=True): + data = self.__filter_func(begin, end, filters, metric_types) + + if not data: + if not groupby: + return 1, [{'sum_qty': {'value': 0.0}, + 'sum_price': {'value': 0.0}}] + else: + return 0, [] + + if not groupby: + total_qty = 0.0 + total_price = 0.0 + for item in data: + stream = item.get('stream', {}) + qty = float(stream.get('qty', 0)) + price = float(stream.get('price', 0)) + + total_qty += qty + total_price += price + + return 1, [{ + 'sum_qty': {'value': total_qty}, + 'sum_price': {'value': total_price} + }] + + grouped_data = {} + for item in data: + stream = item.get('stream', {}) + qty = float(stream.get('qty', 0)) + price = float(stream.get('price', 0)) + + key_parts = {} + for field in groupby: + if field == 'type': + key_parts[field] = stream.get(field, '') + else: + key_parts[field] = stream.get('groupby_' + field) + + key = tuple((k, v) for k, v in sorted(key_parts.items())) + + if key not in grouped_data: + grouped_data[key] = { + 'sum_qty': 0.0, + 'sum_price': 0.0, + 'key_parts': dict(key_parts) + } + + grouped_data[key]['sum_qty'] += qty + grouped_data[key]['sum_price'] += price + + result = [] + for _key_tuple, values in grouped_data.items(): + result.append({ + 'key': values['key_parts'], + 'sum_qty': {'value': values['sum_qty']}, + 'sum_price': {'value': values['sum_price']} + }) + + if limit <= 0: + paginated_results = result[offset:] + else: + paginated_results = result[offset: offset + limit] + + return len(result), paginated_results diff --git a/cloudkitty/tests/storage/v2/test_storage_unit.py b/cloudkitty/tests/storage/v2/test_storage_unit.py index c2f5aa0d..87bbb2e3 100644 --- a/cloudkitty/tests/storage/v2/test_storage_unit.py +++ b/cloudkitty/tests/storage/v2/test_storage_unit.py @@ -23,6 +23,7 @@ from cloudkitty import storage from cloudkitty.tests import samples from cloudkitty.tests.storage.v2 import es_utils from cloudkitty.tests.storage.v2 import influx_utils +from cloudkitty.tests.storage.v2 import loki_utils from cloudkitty.tests.storage.v2 import opensearch_utils from cloudkitty.tests import TestCase from cloudkitty.tests import utils as test_utils @@ -38,13 +39,18 @@ _INFLUX_CLIENT_PATH = 'cloudkitty.storage.v2.influx.InfluxClient' _OS_CLIENT_PATH = 
('cloudkitty.storage.v2.opensearch' '.client.OpenSearchClient') +_LOKI_CLIENT_PATH = ('cloudkitty.storage.v2.loki' + '.client.LokiClient') + class StorageUnitTest(TestCase): storage_scenarios = [ ('influxdb', dict(storage_backend='influxdb')), ('elasticsearch', dict(storage_backend='elasticsearch')), - ('opensearch', dict(storage_backend='opensearch'))] + ('opensearch', dict(storage_backend='opensearch')), + ('loki', dict(storage_backend='loki')) + ] @classmethod def generate_scenarios(cls): @@ -58,6 +64,8 @@ class StorageUnitTest(TestCase): new=influx_utils.FakeInfluxClient) @mock.patch(_OS_CLIENT_PATH, new=opensearch_utils.FakeOpenSearchClient) + @mock.patch(_LOKI_CLIENT_PATH, + new=loki_utils.FakeLokiClient) @mock.patch('cloudkitty.utils.load_conf', new=test_utils.load_conf) def setUp(self): super(StorageUnitTest, self).setUp() @@ -312,7 +320,6 @@ class StorageUnitTest(TestCase): retrieved_length = sum(len(list(frame.iterpoints())) for frame in frames['dataframes']) - self.assertEqual(expected_length, retrieved_length) def test_retrieve_all_scopes_one_type(self): diff --git a/doc/source/admin/configuration/storage.rst b/doc/source/admin/configuration/storage.rst index 4b6d8b33..04800655 100644 --- a/doc/source/admin/configuration/storage.rst +++ b/doc/source/admin/configuration/storage.rst @@ -28,6 +28,7 @@ the configuration file. The following options are available: - ``influxdb`` - ``elasticsearch`` - ``opensearch`` + - ``loki`` Driver-specific options ======================= @@ -111,3 +112,28 @@ Section ``storage_opensearch``: * ``scroll_duration``: Defaults to 30. Duration (in seconds) for which the OpenSearch scroll contexts should be kept alive. + +Loki (v2) +------------------- + +Section ``storage_loki``: + +* ``url``: Defaults to ``http://localhost:3100/loki/api/v1``. Loki host, along + with port and protocol. + +* ``tenant``: Defaults to tenant1. Loki tenant. + +* ``stream``: Defaults to ``{"service": "cloudkitty"}``. The labels that are + going to be used to define the Loki stream as Python dict. + +* ``buffer_size``: Defaults to ``1``. The number of messages that will be + grouped together before launching a Loki HTTP POST request. + +* ``content_type``: Defaults to ``application/json``. The http Content-Type + that will be used to send info to Loki. It can also be + ``application/x-protobuf`` (not supported yet). + +* ``insecure``: Defaults to ``false``. Set to true to allow insecure HTTPS + connections to Loki. + +* ``cafile``: Path of the CA certificate to trust for HTTPS connections. diff --git a/setup.cfg b/setup.cfg index 7f9be3cb..5017ccd7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -69,6 +69,7 @@ cloudkitty.storage.v2.backends = influxdb = cloudkitty.storage.v2.influx:InfluxStorage elasticsearch = cloudkitty.storage.v2.elasticsearch:ElasticsearchStorage opensearch = cloudkitty.storage.v2.opensearch:OpenSearchStorage + loki = cloudkitty.storage.v2.loki:LokiStorage cloudkitty.storage.hybrid.backends = gnocchi = cloudkitty.storage.v1.hybrid.backends.gnocchi:GnocchiStorage
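
The examples below sketch how the new Loki v2 storage driver is configured and how it talks to Loki; they are illustrative only, and values such as URLs, tenants, stream labels and project ids are placeholders. A minimal cloudkitty.conf sketch enabling the driver, using only options registered under [storage_loki] by this patch:

    [storage]
    version = 2
    backend = loki

    [storage_loki]
    url = http://loki.example.org:3100/loki/api/v1
    tenant = cloudkitty
    # DictOpt syntax: comma-separated key:value pairs
    stream = service:cloudkitty
    buffer_size = 50
    content_type = application/json
    insecure = false
    # placeholder path
    cafile = /etc/pki/tls/certs/ca-bundle.crt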
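
LokiClient.add_point() buffers rated data points and push() POSTs them to <url>/push as a single Loki stream once buffer_size is reached. A minimal sketch of the request body built by _build_payload_json(), assuming the default {"service": "cloudkitty"} stream labels; the timestamp and point values are illustrative:

    import json

    # Each value is [<unix time in nanoseconds, as a string>,
    #                <JSON-encoded rated data point>].
    payload = {
        "streams": [{
            "stream": {"service": "cloudkitty"},  # storage_loki.stream labels
            "values": [
                ["1704067200000000000", json.dumps({
                    "start": "2024-01-01T00:00:00+00:00",
                    "end": "2024-01-01T01:00:00+00:00",
                    "type": "cpu",                # hypothetical metric type
                    "unit": "instance",
                    "description": "",
                    "qty": 1.0,
                    "price": 0.5,
                    "groupby": {"project_id": "abc123"},
                    "metadata": {"flavor_name": "m1.small"},
                })],
            ],
        }]
    }
    print(json.dumps(payload)[:100])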
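
On the read path, retrieve() assembles a LogQL query from the stream labels, pipes it through "| json" so the fields of the stored line become queryable labels, and matches filters on the groupby_-prefixed labels. A sketch of the query and query_range parameters, assuming the default stream labels and a hypothetical project filter:

    from datetime import datetime, timezone

    begin = datetime(2024, 1, 1, tzinfo=timezone.utc)
    end = datetime(2024, 2, 1, tzinfo=timezone.utc)

    # _base_query() plus the filter and metric-type parts appended by
    # retrieve(); the project id and metric type are placeholders.
    query = ('{service="cloudkitty"} | json '
             '| groupby_project_id="abc123", type = "cpu"')

    params = {
        "query": query,
        "start": int(begin.timestamp() * 1_000_000_000),  # nanoseconds
        "end": int(end.timestamp() * 1_000_000_000),
        "limit": 1000,
    }
    # GET <url>/query_range with these params (plus the X-Scope-OrgID
    # header); totalEntriesReturned from the response stats is used as
    # the total and the "result" list is turned back into dataframes.
    print(params["query"])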
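
Loki itself does not aggregate the rated fields, so total() sums qty and price client-side, grouping on the groupby_<field> labels ("type" is matched directly). A standalone sketch of that aggregation over illustrative stream labels, mirroring the grouping logic in the driver:

    from collections import defaultdict

    # Stream labels as Loki returns them after "| json"; values illustrative.
    streams = [
        {"type": "cpu", "groupby_project_id": "p1", "qty": "1.0", "price": "0.5"},
        {"type": "cpu", "groupby_project_id": "p1", "qty": "2.0", "price": "1.0"},
        {"type": "cpu", "groupby_project_id": "p2", "qty": "1.0", "price": "0.5"},
    ]
    groupby = ["type", "project_id"]

    totals = defaultdict(lambda: {"sum_qty": 0.0, "sum_price": 0.0})
    for stream in streams:
        key_parts = {
            field: (stream.get(field, "") if field == "type"
                    else stream.get("groupby_" + field))
            for field in groupby
        }
        key = tuple(sorted(key_parts.items()))
        totals[key]["sum_qty"] += float(stream.get("qty", 0))
        totals[key]["sum_price"] += float(stream.get("price", 0))

    for key, sums in totals.items():
        print(dict(key), sums)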
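
Deletion goes through Loki's delete API rather than query_range; that endpoint takes start and end in seconds (not nanoseconds) and only takes effect when deletion is enabled on the Loki side. A sketch of the request issued by delete_by_query(), again with placeholder values:

    from datetime import datetime, timezone

    begin = datetime(2024, 1, 1, tzinfo=timezone.utc)
    end = datetime(2024, 2, 1, tzinfo=timezone.utc)

    params = {
        "query": '{service="cloudkitty"} | json | groupby_project_id="abc123"',
        "start": int(begin.timestamp()),  # seconds for the delete API
        "end": int(end.timestamp()),
    }
    # POST <url>/delete with these params and the X-Scope-OrgID header;
    # a 204 response means the deletion request was accepted.
    print(params)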