Merge "Add Loki as a v2 dataframe storage"

Zuul
2025-09-30 08:54:50 +00:00
committed by Gerrit Code Review
10 changed files with 1318 additions and 2 deletions

View File

@@ -31,6 +31,7 @@ import cloudkitty.storage
import cloudkitty.storage.v1.hybrid.backends.gnocchi
import cloudkitty.storage.v2.elasticsearch
import cloudkitty.storage.v2.influx
import cloudkitty.storage.v2.loki
import cloudkitty.storage.v2.opensearch
import cloudkitty.utils
@@ -70,6 +71,8 @@ _opts = [
cloudkitty.storage.v2.opensearch.opensearch_storage_opts))),
('storage_gnocchi', list(itertools.chain(
cloudkitty.storage.v1.hybrid.backends.gnocchi.gnocchi_storage_opts))),
('storage_loki', list(itertools.chain(
cloudkitty.storage.v2.loki.loki_storage_opts))),
(None, list(itertools.chain(
cloudkitty.api.app.auth_opts,
cloudkitty.service.service_opts))),

View File

@@ -0,0 +1,178 @@
# Copyright 2025 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import datetime
import json
from oslo_config import cfg
from oslo_log import log as oslo_logging
from cloudkitty import dataframe
from cloudkitty.storage import v2 as v2_storage
from cloudkitty.storage.v2.loki import client as loki_client
from cloudkitty.utils import tz as tzutils
LOG = oslo_logging.getLogger(__name__)
CONF = cfg.CONF
LOKI_STORAGE_GROUP = 'storage_loki'
loki_storage_opts = [
cfg.StrOpt(
'url',
        help='Loki base URL. Defaults to '
'http://localhost:3100/loki/api/v1',
default='http://localhost:3100/loki/api/v1'),
cfg.StrOpt(
'tenant',
        help='The Loki tenant to be used. Defaults to tenant1.',
default='tenant1'),
cfg.DictOpt(
'stream',
        help='The labels used to define the Loki stream, as a Python dict. '
             'Defaults to {"service": "cloudkitty"}.',
default={"service": "cloudkitty"}),
cfg.IntOpt(
'buffer_size',
        help='The number of messages buffered before a Loki HTTP POST '
             'request is issued.',
default=1),
cfg.StrOpt(
'content_type',
        help='The HTTP Content-Type used to send data to Loki. Defaults to '
             'application/json. application/x-protobuf is not supported '
             'yet.',
default='application/json'),
cfg.BoolOpt(
'insecure',
help='Set to true to allow insecure HTTPS connections to Loki',
default=False),
cfg.StrOpt(
'cafile',
help='Path of the CA certificate to trust for HTTPS connections.',
default=None)
]
CONF.register_opts(loki_storage_opts, LOKI_STORAGE_GROUP)
class LokiStorage(v2_storage.BaseStorage):
def __init__(self, *args, **kwargs):
super(LokiStorage, self).__init__(*args, **kwargs)
verify = not CONF.storage_loki.insecure
if verify and CONF.storage_loki.cafile:
verify = CONF.storage_loki.cafile
        self._conn = loki_client.LokiClient(
CONF.storage_loki.url,
CONF.storage_loki.tenant,
CONF.storage_loki.stream,
CONF.storage_loki.content_type,
CONF.storage_loki.buffer_size)
def init(self):
LOG.debug('LokiStorage Init.')
def push(self, dataframes, scope_id=None):
for frame in dataframes:
for type_, point in frame.iterpoints():
start, end = self._local_to_utc(frame.start, frame.end)
self._conn.add_point(point, type_, start, end)
@staticmethod
def _local_to_utc(*args):
return [tzutils.local_to_utc(arg) for arg in args]
@staticmethod
def _log_to_datapoint(labels):
return dataframe.DataPoint(
labels['unit'],
labels['qty'],
labels['price'],
labels['groupby'],
labels['metadata'],
)
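    # Illustrative shape of a decoded log line (the 'labels' dict handled
    # below), as written by LokiClient.add_point and parsed back by
    # _build_dataframes / _log_to_datapoint; the concrete values are made up:
    #   {"start": "2024-01-01T00:00:00+00:00",
    #    "end": "2024-01-01T01:00:00+00:00",
    #    "type": "cpu", "unit": "USD/h", "qty": 1.0, "price": 10.0,
    #    "groupby": {"project_id": "proj1"}, "metadata": {"flavor_id": "42"}}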
def _build_dataframes(self, logs):
dataframes = {}
for log in logs:
labels = json.loads(log['values'][0][1])
start = tzutils.dt_from_iso(labels['start'])
end = tzutils.dt_from_iso(labels['end'])
key = (start, end)
            if key not in dataframes:
dataframes[key] = dataframe.DataFrame(start=start, end=end)
dataframes[key].add_point(
self._log_to_datapoint(labels), labels['type'])
output = list(dataframes.values())
output.sort(key=lambda frame: (frame.start, frame.end))
return output
def retrieve(self, begin=None, end=None,
filters=None,
metric_types=None,
offset=0, limit=1000, paginate=True):
begin, end = self._local_to_utc(begin or tzutils.get_month_start(),
end or tzutils.get_next_month())
total, logs = self._conn.retrieve(
begin, end, filters, metric_types, limit)
dataframes = self._build_dataframes(logs)
return {
'total': total,
'dataframes': dataframes
}
def delete(self, begin=None, end=None, filters=None):
self._conn.delete(begin, end, filters)
@staticmethod
def _normalize_time(t):
if isinstance(t, datetime.datetime):
return tzutils.utc_to_local(t)
return tzutils.dt_from_iso(t)
def _doc_to_total_result(self, doc, start, end):
output = {
'begin': self._normalize_time(doc.get('start', start)),
'end': self._normalize_time(doc.get('end', end)),
'qty': doc['sum_qty']['value'],
'rate': doc['sum_price']['value'],
}
        if 'key' in doc:
for key, value in doc['key'].items():
output[key] = value
return output
def total(self, groupby=None, begin=None, end=None, metric_types=None,
filters=None, custom_fields=None, offset=0, limit=1000,
paginate=False):
begin, end = self._local_to_utc(begin or tzutils.get_month_start(),
end or tzutils.get_next_month())
total, docs = self._conn.total(begin, end, metric_types, filters,
groupby, custom_fields=custom_fields,
offset=offset, limit=limit,
paginate=False)
results = [
self._doc_to_total_result(doc, begin, end) for doc in docs
]
return {
'total': total,
'results': results,
}
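
For illustration, a rough usage sketch of the driver through the generic v2
storage interface, assuming "backend = loki" is set in the "[storage]"
section; the storage.get_storage() helper and its no-argument call are
assumptions made for this sketch, not part of the change itself:

    from cloudkitty import storage

    driver = storage.get_storage()  # assumed to return the configured backend
    driver.init()
    frames = driver.retrieve(limit=100)  # begin/end default to current month
    print(frames['total'], len(frames['dataframes']))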

View File

@@ -0,0 +1,294 @@
# Copyright 2025 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_log import log
import requests
from cloudkitty.storage.v2.loki import exceptions
from cloudkitty.utils import json
LOG = log.getLogger(__name__)
class LokiClient(object):
"""Class used to ease interaction with Loki."""
def __init__(self, url, tenant, stream_labels, content_type, buffer_size):
if content_type != "application/json":
raise exceptions.UnsupportedContentType(content_type)
        self._base_url = url.rstrip('/')
self._stream_labels = stream_labels
self._headers = {
'X-Scope-OrgID': tenant,
'Content-Type': content_type
}
self._buffer_size = buffer_size
self._points = []
def _build_payload_json(self, batch):
payload = {
"streams": [
{
"stream": self._stream_labels,
"values": batch
}
]
}
return payload
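    # Illustrative push payload for a single buffered point, where each value
    # is [<unix timestamp in nanoseconds, as a string>, <JSON-encoded point>]
    # and the stream labels come from the 'stream' option (example values):
    #   {"streams": [{"stream": {"service": "cloudkitty"},
    #                 "values": [["1704067200000000000",
    #                             "{\"type\": \"cpu\", \"qty\": 1.0, ...}"]]}]}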
def _dict_to_loki_query(self, tags_dict, groupby=False, brackets=True):
"""Converts from Python dict to Loki query language."""
if not tags_dict:
return '{}'
pairs = []
for key, value in tags_dict.items():
if isinstance(value, list):
value = value[0]
if isinstance(value, str):
value = value.replace('"', '\\"')
if groupby:
pairs.append(f'groupby_{key}="{value}"')
else:
pairs.append(f'{key}="{value}"')
if brackets:
return '{' + ', '.join(pairs) + '}'
else:
return ', '.join(pairs)
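    # Illustrative outputs (mirroring the unit tests):
    #   _dict_to_loki_query({"foo": "bar"}) -> '{foo="bar"}'
    #   _dict_to_loki_query({"foo": "bar"}, groupby=True)
    #       -> '{groupby_foo="bar"}'
    #   _dict_to_loki_query({"foo": "bar"}, brackets=False) -> 'foo="bar"'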
def _base_query(self):
"""Makes sure that we always get json results."""
return self._dict_to_loki_query(self._stream_labels) + ' | json'
def search(self, query, begin, end, limit):
url = f"{self._base_url}/query_range"
if query is None:
query = self._base_query()
params = {
"query": query,
"start": int(begin.timestamp() * 1_000_000_000),
"end": int(end.timestamp() * 1_000_000_000),
"limit": limit
}
response = requests.get(url, params=params, headers=self._headers)
if response.status_code == 200:
data = response.json()['data']
else:
msg = (f"Failed to query logs or empty result: "
f"{response.status_code} - {response.text}")
LOG.error(msg)
data = []
return data
def push(self):
"""Send messages to Loki in batches."""
url = f"{self._base_url}/push"
while self._points:
payload = self._build_payload_json(self._points)
response = requests.post(url, json=payload, headers=self._headers)
if response.status_code == 204:
LOG.debug(
f"Batch of {len(self._points)} messages pushed "
f"successfully."
)
self._points = []
else:
LOG.error(
f"Failed to push logs: {response.status_code} - "
f"{response.text}"
)
break
def delete_by_query(self, query, begin, end):
url = f"{self._base_url}/delete"
if query is None:
query = self._base_query()
params = {
"query": query,
"start": int(begin.timestamp()),
"end": int(end.timestamp()),
}
LOG.debug(f"Request Params: {params}")
response = requests.post(url, params=params, headers=self._headers)
if response.status_code == 204:
LOG.debug(
"Dataframes deleted successfully."
)
else:
LOG.error(
f"Failed to delete dataframes: {response.status_code} - "
f"{response.text}"
)
def delete(self, begin, end, filters):
query = self._base_query()
loki_query_parts = []
if filters:
loki_query_parts.append(
self._dict_to_loki_query(
filters, groupby=True, brackets=False
)
)
if loki_query_parts:
query += ' | ' + ', '.join(loki_query_parts)
self.delete_by_query(query, begin, end)
def retrieve(self, begin, end, filters, metric_types, limit):
"""Retrieves dataframes stored in Loki."""
query = self._base_query()
loki_query_parts = []
if filters:
loki_query_parts.append(
self._dict_to_loki_query(
filters, groupby=True, brackets=False
)
)
if metric_types:
if isinstance(metric_types, list):
current_metric_type = metric_types[0]
else:
current_metric_type = metric_types
loki_query_parts.append(f'type = "{current_metric_type}"')
if loki_query_parts:
query += ' | ' + ', '.join(loki_query_parts)
data_response = self.search(query, begin, end, limit)
if not isinstance(data_response, dict) or \
'stats' not in data_response or \
'result' not in data_response:
LOG.warning(
f"Data from Loki search is not in the expected dictionary "
f"format or is missing keys. Query: '{query}'. Response "
f"received: {data_response}"
)
return 0, []
total = data_response.get('stats', {})\
.get('summary', {})\
.get('totalEntriesReturned', 0)
output = data_response.get('result', [])
return total, output
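    # Illustrative query built by retrieve() with the default stream labels,
    # filters={"project_id": "p1"} and metric_types="cpu":
    #   {service="cloudkitty"} | json | groupby_project_id="p1", type = "cpu"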
def add_point(self, point, type, start, end):
"""Append a point to the client."""
timestamp_ns = int(end.timestamp() * 1_000_000_000)
timestamp = str(timestamp_ns)
data = {
'start': start,
'end': end,
'type': type,
'unit': point.unit,
'description': point.description,
'qty': point.qty,
'price': point.price,
'groupby': point.groupby,
'metadata': point.metadata,
}
log_line = json.dumps(data)
self._points.append([timestamp, log_line])
if len(self._points) >= self._buffer_size:
self.push()
def total(self, begin, end, metric_types, filters, groupby,
custom_fields, offset, limit, paginate):
"""Calculate total sum of 'price' and 'qty' for entries.
This method calculates totals for entries that match the specified
groupby value.
"""
if custom_fields:
LOG.warning(
"'custom_fields' are not implemented yet for Loki. "
"Therefore, the custom fields [%s] informed by the user "
"will be ignored.", custom_fields
)
if offset != 0:
LOG.warning("offset is not supported by Loki.")
total_count, data = self.retrieve(
begin, end, filters, metric_types, limit
)
if not groupby:
total_qty = 0.0
total_price = 0.0
for item in data:
stream = item.get('stream', {})
qty = float(stream.get('qty', 0))
price = float(stream.get('price', 0))
total_qty += qty
total_price += price
return 1, [{
'sum_qty': {'value': total_qty},
'sum_price': {'value': total_price}
}]
grouped_data = {}
for item in data:
stream = item.get('stream', {})
qty = float(stream.get('qty', 0))
price = float(stream.get('price', 0))
key_parts = {}
for field in groupby:
if field == 'type':
key_parts[field] = stream.get(field, '')
else:
key_parts[field] = stream.get('groupby_' + field)
key = tuple((k, v) for k, v in sorted(key_parts.items()))
if key not in grouped_data:
grouped_data[key] = {
'sum_qty': 0.0,
'sum_price': 0.0,
'key_parts': dict(key_parts)
}
grouped_data[key]['sum_qty'] += qty
grouped_data[key]['sum_price'] += price
result = []
for _key_tuple, values in grouped_data.items():
result.append({
'key': values['key_parts'],
'sum_qty': {'value': values['sum_qty']},
'sum_price': {'value': values['sum_price']}
})
return len(result), result
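
For reference, total() returns results in the shape that
LokiStorage._doc_to_total_result consumes; an illustrative return value for
the grouped case, with made-up numbers:

    (2, [{'key': {'type': 'cpu', 'project_id': 'p1'},
          'sum_qty': {'value': 18.0}, 'sum_price': {'value': 180.0}},
         {'key': {'type': 'cpu', 'project_id': 'p2'},
          'sum_qty': {'value': 2.0}, 'sum_price': {'value': 20.0}}])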

View File

@@ -0,0 +1,33 @@
# Copyright 2025 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
class BaseLokiException(Exception):
"""Base exception raised by the Loki v2 storage driver"""
class UnsupportedContentType(BaseLokiException):
def __init__(self, content_type):
super(UnsupportedContentType, self).__init__(
"Content-Type {} is not supported. Use supported formats: "
"application/json".format(content_type)
)
class NotImplementedYet(BaseLokiException):
def __init__(self, method):
super(NotImplementedYet, self).__init__(
"Method {} not implemented yet".format(method)
)

View File

@@ -0,0 +1,567 @@
# Copyright 2025 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from datetime import datetime
from datetime import timezone
import json
import unittest
from unittest.mock import call
from unittest.mock import MagicMock
from unittest.mock import patch
from cloudkitty.storage.v2.loki import client
from cloudkitty.storage.v2.loki import exceptions
class MockDataPoint:
def __init__(self, unit="USD/h", description="desc", qty=1.0, price=10.0,
groupby=None, metadata=None):
self.unit = unit
self.description = description
self.qty = qty
self.price = price
self.groupby = groupby if groupby is not None \
else {'project_id': 'proj1'}
self.metadata = metadata if metadata is not None \
else {'meta_key': 'meta_val'}
@patch('cloudkitty.storage.v2.loki.client.requests', autospec=True)
@patch('cloudkitty.storage.v2.loki.client.LOG', autospec=True)
class TestLokiClient(unittest.TestCase):
def setUp(self):
self.base_url = "http://loki:3100/loki/api/v1"
self.tenant = "test_tenant"
self.stream_labels = {"app": "cloudkitty", "source": "test"}
self.content_type = "application/json"
self.buffer_size = 2
self.client = client.LokiClient(
self.base_url,
self.tenant,
self.stream_labels,
self.content_type,
self.buffer_size
)
self.begin_dt = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
self.end_dt = datetime(2024, 1, 1, 1, 0, 0, tzinfo=timezone.utc)
def test_init_success(self, mock_log, mock_requests):
self.assertEqual(self.client._base_url, self.base_url)
self.assertEqual(self.client._stream_labels, self.stream_labels)
self.assertEqual(self.client._headers['X-Scope-OrgID'], self.tenant)
self.assertEqual(self.client._headers['Content-Type'],
self.content_type)
self.assertEqual(self.client._buffer_size, self.buffer_size)
self.assertEqual(self.client._points, [])
def test_init_unsupported_content_type(self, mock_log, mock_requests):
with self.assertRaises(exceptions.UnsupportedContentType):
client.LokiClient(self.base_url, self.tenant, self.stream_labels,
"text/plain", self.buffer_size)
def test_build_payload_json(self, mock_log, mock_requests):
batch = [["1609459200000000000", "log line 1"],
["1609459200000000001", "log line 2"]]
payload = self.client._build_payload_json(batch)
expected_payload = {
"streams": [
{
"stream": self.stream_labels,
"values": batch
}
]
}
self.assertEqual(payload, expected_payload)
def test_dict_to_loki_query(self, mock_log, mock_requests):
self.assertEqual(self.client._dict_to_loki_query({}), '{}')
self.assertEqual(self.client._dict_to_loki_query({"foo": "bar"}),
'{foo="bar"}')
self.assertEqual(
self.client._dict_to_loki_query({"foo": "bar", "baz": "qux"}),
'{foo="bar", baz="qux"}'
)
self.assertIn('foo="bar"', self.client._dict_to_loki_query(
{"foo": "bar", "baz": "qux"}))
self.assertIn('baz="qux"', self.client._dict_to_loki_query(
{"foo": "bar", "baz": "qux"}))
self.assertEqual(
self.client._dict_to_loki_query({"foo": ["bar", "baz"]}),
'{foo="bar"}'
)
self.assertEqual(
self.client._dict_to_loki_query({"path": "/api/v1"}),
'{path="/api/v1"}'
)
self.assertEqual(
self.client._dict_to_loki_query({"msg": 'hello "world"'}),
'{msg="hello \\"world\\""}'
)
self.assertEqual(
self.client._dict_to_loki_query({"foo": "bar"}, groupby=True),
'{groupby_foo="bar"}'
)
self.assertEqual(
self.client._dict_to_loki_query({"foo": "bar"}, brackets=False),
'foo="bar"'
)
self.assertEqual(
self.client._dict_to_loki_query(
{"foo": "bar", "baz": "qux"}, brackets=False
),
'foo="bar", baz="qux"'
)
def test_base_query(self, mock_log, mock_requests):
expected_query = (self.client._dict_to_loki_query(self.stream_labels)
+ ' | json')
self.assertEqual(self.client._base_query(), expected_query)
def test_search_success(self, mock_log, mock_requests):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"data": {
"resultType": "streams",
"result": [{"stream": {}, "values": []}]
}
}
mock_requests.get.return_value = mock_response
query = '{app="test"} | json'
data = self.client.search(query, self.begin_dt, self.end_dt, 100)
expected_url = f"{self.base_url}/query_range"
expected_params = {
"query": query,
"start": int(self.begin_dt.timestamp() * 1_000_000_000),
"end": int(self.end_dt.timestamp() * 1_000_000_000),
"limit": 100
}
mock_requests.get.assert_called_once_with(
expected_url,
params=expected_params,
headers=self.client._headers
)
self.assertEqual(
data,
{"resultType": "streams", "result": [{"stream": {}, "values": []}]}
)
def test_search_no_query_uses_base_query(self, mock_log, mock_requests):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"data": {"result": []}}
mock_requests.get.return_value = mock_response
self.client.search(None, self.begin_dt, self.end_dt, 100)
expected_query = self.client._base_query()
mock_requests.get.assert_called_once()
_called_args, called_kwargs = mock_requests.get.call_args
self.assertEqual(called_kwargs['params']['query'], expected_query)
def test_search_failure(self, mock_log, mock_requests):
mock_response = MagicMock()
mock_response.status_code = 500
mock_response.text = "Internal Server Error"
mock_requests.get.return_value = mock_response
query = '{app="test"} | json'
data = self.client.search(query, self.begin_dt, self.end_dt, 100)
self.assertEqual(data, [])
expected_msg = ("Failed to query logs or empty result: 500 - "
"Internal Server Error")
mock_log.error.assert_called_once_with(expected_msg)
def test_push_success_batch(self, mock_log, mock_requests):
mock_response = MagicMock()
mock_response.status_code = 204
mock_requests.post.return_value = mock_response
self.client._points = [["ts1", "log1"], ["ts2", "log2"]]
self.client.push()
expected_url = f"{self.base_url}/push"
expected_payload = self.client._build_payload_json(
[["ts1", "log1"], ["ts2", "log2"]]
)
mock_requests.post.assert_called_once_with(
expected_url, json=expected_payload, headers=self.client._headers
)
self.assertEqual(self.client._points, [])
log_msg = "Batch of 2 messages pushed successfully."
mock_log.debug.assert_called_once_with(log_msg)
def test_push_failure(self, mock_log, mock_requests):
mock_response = MagicMock()
mock_response.status_code = 400
mock_response.text = "Bad Request"
mock_requests.post.return_value = mock_response
initial_points = [["ts1", "log1"], ["ts2", "log2"]]
self.client._points = list(initial_points)
self.client.push()
self.assertEqual(self.client._points, initial_points)
expected_msg = "Failed to push logs: 400 - Bad Request"
mock_log.error.assert_called_once_with(expected_msg)
def test_push_no_points(self, mock_log, mock_requests):
self.client._points = []
self.client.push()
mock_requests.post.assert_not_called()
@patch.object(client.LokiClient, 'search')
def test_retrieve_no_filters_no_metric_types(self, mock_search, mock_log,
mock_requests_arg):
mock_search_result = {
"stats": {"summary": {"totalEntriesReturned": 5}},
"result": [{"stream": {}, "values": [["ts", '{"key":"val"}']]}]
}
mock_search.return_value = mock_search_result
total, output = self.client.retrieve(self.begin_dt, self.end_dt,
None, None, 100)
expected_base_query = self.client._base_query()
mock_search.assert_called_once_with(
expected_base_query, self.begin_dt, self.end_dt, 100
)
self.assertEqual(total, 5)
self.assertEqual(output, mock_search_result["result"])
@patch.object(client.LokiClient, 'search')
def test_retrieve_with_filters_and_metric_type_string(
self, mock_search, mock_log, mock_requests_arg):
mock_search_result = {
"stats": {"summary": {"totalEntriesReturned": 2}},
"result": [{"stream": {}, "values": [["ts", '{"type":"t1"}']]}]
}
mock_search.return_value = mock_search_result
filters = {"project_id": "proj1", "region": "reg1"}
metric_types = "cpu_util"
total, output = self.client.retrieve(self.begin_dt, self.end_dt,
filters, metric_types, 50)
base_query = self.client._base_query()
filter_query_part = self.client._dict_to_loki_query(
filters, groupby=True, brackets=False
)
metric_query_part = f'type = "{metric_types}"'
expected_full_query = (
f"{base_query} | {filter_query_part}, {metric_query_part}"
)
mock_search.assert_called_once_with(
expected_full_query, self.begin_dt, self.end_dt, 50
)
self.assertEqual(total, 2)
self.assertEqual(output, mock_search_result["result"])
@patch.object(client.LokiClient, 'search')
def test_retrieve_with_metric_type_list(self, mock_search, mock_log,
mock_requests_arg):
mock_search.return_value = {
"stats": {"summary": {"totalEntriesReturned": 1}},
"result": ["data"]
}
metric_types = ["cpu_util", "ram_util"]
self.client.retrieve(self.begin_dt, self.end_dt, None,
metric_types, 50)
base_query = self.client._base_query()
metric_query_part = f'type = "{metric_types[0]}"'
expected_full_query = f"{base_query} | {metric_query_part}"
mock_search.assert_called_once_with(
expected_full_query, self.begin_dt, self.end_dt, 50
)
@patch.object(client.LokiClient, 'search')
def test_retrieve_empty_or_malformed_search_response(
self, mock_search, mock_loki_client_log, mock_requests_arg):
mock_search.return_value = []
expected_query_for_log = self.client._base_query()
total, output = self.client.retrieve(self.begin_dt, self.end_dt,
None, None, 100)
self.assertEqual(total, 0)
self.assertEqual(output, [])
expected_log_message_case1 = (
f"Data from Loki search is not in the expected dictionary format "
f"or is missing keys. Query: '{expected_query_for_log}'. "
f"Response received: {mock_search.return_value}"
)
mock_loki_client_log.warning.assert_called_with(
expected_log_message_case1
)
mock_search.reset_mock()
mock_loki_client_log.reset_mock()
mock_search.return_value = {"nodata": True}
total, output = self.client.retrieve(self.begin_dt, self.end_dt,
None, None, 100)
self.assertEqual(total, 0)
self.assertEqual(output, [])
expected_log_message_case2 = (
f"Data from Loki search is not in the expected dictionary format "
f"or is missing keys. Query: '{expected_query_for_log}'. "
f"Response received: {mock_search.return_value}"
)
mock_loki_client_log.warning.assert_called_with(
expected_log_message_case2
)
@patch.object(client.LokiClient, 'push')
def test_add_point_no_push(self, mock_push, mock_log,
mock_requests_arg):
self.client._buffer_size = 3
point = MockDataPoint(qty=1, price=10)
self.client.add_point(point, "test_type", self.begin_dt, self.end_dt)
self.assertEqual(len(self.client._points), 1)
added_point_data = json.loads(self.client._points[0][1])
self.assertEqual(added_point_data['type'], "test_type")
self.assertEqual(added_point_data['qty'], 1)
self.assertEqual(added_point_data['price'], 10)
self.assertEqual(added_point_data['start'],
"2024-01-01T00:00:00+00:00")
self.assertEqual(added_point_data['end'], "2024-01-01T01:00:00+00:00")
self.assertEqual(added_point_data['groupby'], point.groupby)
mock_push.assert_not_called()
@patch.object(client.LokiClient, 'push')
def test_add_point_triggers_push(self, mock_push, mock_log,
mock_requests_arg):
self.client._buffer_size = 1
point = MockDataPoint()
self.client.add_point(point, "test_type", self.begin_dt, self.end_dt)
self.assertEqual(len(self.client._points), 1)
mock_push.assert_called_once()
@patch.object(client.LokiClient, 'retrieve')
def test_total_no_groupby(self, mock_retrieve, mock_log,
mock_requests_arg):
loki_data_for_total = [
{
"stream": {"groupby_project_id": "p1", "type": "t1",
"qty": "10.0", "price": "100.0"},
"values": [["ts1", json.dumps(
{"qty": 10.0, "price": 100.0, "type": "t1",
"groupby": {"project_id": "p1"}})]]
},
{
"stream": {"groupby_project_id": "p1", "type": "t1",
"qty": "5.0", "price": "50.0"},
"values": [["ts2", json.dumps(
{"qty": 5.0, "price": 50.0, "type": "t1",
"groupby": {"project_id": "p1"}})]]
},
]
mock_retrieve.return_value = (2, loki_data_for_total)
count, result = self.client.total(
self.begin_dt, self.end_dt, "some_type", None, None,
None, 0, 100, False
)
mock_retrieve.assert_called_once_with(
self.begin_dt, self.end_dt, None, "some_type", 100
)
self.assertEqual(count, 1)
self.assertEqual(len(result), 1)
self.assertAlmostEqual(result[0]['sum_qty']['value'], 15.0)
self.assertAlmostEqual(result[0]['sum_price']['value'], 150.0)
mock_log.warning.assert_not_called()
@patch.object(client.LokiClient, 'retrieve')
def test_total_with_groupby(self, mock_retrieve, mock_log,
mock_requests_arg):
loki_data_for_total = [
{
"stream": {"groupby_project_id": "proj1", "type": "typeA",
"qty": 10.0, "price": 100.0},
"values": [["ts1", json.dumps(
{"qty": 10.0, "price": 100.0, "type": "typeA",
"groupby": {"project_id": "proj1"}})]
]
},
{
"stream": {"groupby_project_id": "proj1", "type": "typeB",
"qty": 5.0, "price": 50.0},
"values": [["ts2", json.dumps(
{"qty": 5.0, "price": 50.0, "type": "typeB",
"groupby": {"project_id": "proj1"}})]
]
},
{
"stream": {"groupby_project_id": "proj2", "type": "typeA",
"qty": 2.0, "price": 20.0},
"values": [["ts3", json.dumps(
{"qty": 2.0, "price": 20.0, "type": "typeA",
"groupby": {"project_id": "proj2"}})]
]
},
{
"stream": {"groupby_project_id": "proj1", "type": "typeA",
"qty": 8.0, "price": 80.0},
"values": [["ts4", json.dumps(
{"qty": 8.0, "price": 80.0, "type": "typeA",
"groupby": {"project_id": "proj1"}})]
]
},
]
mock_retrieve.return_value = (4, loki_data_for_total)
groupby_fields = ["type", "project_id"]
count, result = self.client.total(
self.begin_dt, self.end_dt, "any_metric_type",
{"filter_key": "val"}, groupby_fields,
None, 0, 100, False
)
mock_retrieve.assert_called_once_with(
self.begin_dt, self.end_dt, {"filter_key": "val"},
"any_metric_type", 100
)
self.assertEqual(count, 3)
expected_results_map = {
tuple(sorted({'type': 'typeA',
'project_id': 'proj1'}.items())):
{'qty': 18.0, 'price': 180.0},
tuple(sorted({'type': 'typeB',
'project_id': 'proj1'}.items())):
{'qty': 5.0, 'price': 50.0},
tuple(sorted({'type': 'typeA',
'project_id': 'proj2'}.items())):
{'qty': 2.0, 'price': 20.0},
}
self.assertEqual(len(result), len(expected_results_map))
for res_item in result:
key_from_result_tuple = tuple(sorted(res_item['key'].items()))
self.assertIn(key_from_result_tuple, expected_results_map)
expected_values = expected_results_map[key_from_result_tuple]
self.assertAlmostEqual(res_item['sum_qty']['value'],
expected_values['qty'])
self.assertAlmostEqual(res_item['sum_price']['value'],
expected_values['price'])
@patch.object(client.LokiClient, 'retrieve')
def test_total_with_custom_fields_and_offset_logs_warnings(
self, mock_retrieve, mock_log, mock_requests_arg):
mock_retrieve.return_value = (0, [])
custom_fields = ["field1", "field2"]
offset = 5
self.client.total(
self.begin_dt, self.end_dt, None, None, None,
custom_fields, offset, 100, False
)
mock_log.warning.assert_any_call(
"'custom_fields' are not implemented yet for Loki. Therefore, "
"the custom fields [%s] informed by the user will be ignored.",
custom_fields
)
mock_log.warning.assert_any_call("offset is not supported by Loki.")
@patch.object(client.LokiClient, '_base_query')
def test_delete_by_query_success(self, mq, ml, mr):
mr.post.return_value = MagicMock(status_code=204)
test_query = '{app="cloudkitty"} | json ' \
'| type="compute.instance.exists"}'
self.client.delete_by_query(test_query, self.begin_dt, self.end_dt)
mr.post.assert_called_once()
ml.debug.assert_has_calls([
call(f"Request Params: {{'query': '{test_query}', "
f"'start': {int(self.begin_dt.timestamp())}, "
f"'end': {int(self.end_dt.timestamp())}}}"),
call("Dataframes deleted successfully.")
])
mq.assert_not_called()
@patch.object(client.LokiClient, '_base_query')
def test_delete_by_query_failure(self, mock_base_query, mock_log,
mock_requests_arg):
mock_response = MagicMock()
mock_response.status_code = 500
mock_response.text = "Internal Server Error"
mock_requests_arg.post.return_value = mock_response
        test_query = '{app="cloudkitty"} | json | ' \
            'type="compute.instance.exists"'
self.client.delete_by_query(test_query, self.begin_dt, self.end_dt)
expected_url = f"{self.base_url}/delete"
expected_params = {
"query": test_query,
"start": int(self.begin_dt.timestamp()),
"end": int(self.end_dt.timestamp()),
}
mock_requests_arg.post.assert_called_once_with(
expected_url,
params=expected_params,
headers=self.client._headers
)
expected_error_msg = ("Failed to delete dataframes: "
"500 - Internal Server Error")
mock_log.error.assert_called_once_with(expected_error_msg)
mock_base_query.assert_not_called()
@patch.object(client.LokiClient, 'delete_by_query')
@patch.object(client.LokiClient, '_base_query')
def test_delete_with_filters(self, mock_base_query, mock_delete_by_query,
mock_log, mock_requests_arg):
mock_base_query.return_value = '{app="cloudkitty", source="test"} ' \
'| json'
filters = {"project_id": "proj1", "resource_type": "instance"}
self.client.delete(self.begin_dt, self.end_dt, filters)
exp_query_filters = 'groupby_project_id="proj1", ' \
'groupby_resource_type="instance"'
exp_query = f'{mock_base_query.return_value} | {exp_query_filters}'
mock_delete_by_query.assert_called_once_with(
exp_query, self.begin_dt, self.end_dt
)
mock_base_query.assert_called_once()
@patch.object(client.LokiClient, 'delete_by_query')
@patch.object(client.LokiClient, '_base_query')
def test_delete_no_filters(self, mock_base_query, mock_delete_by_query,
mock_log, mock_requests_arg):
mock_base_query.return_value = '{app="cloudkitty", source="test"} ' \
'| json'
self.client.delete(self.begin_dt, self.end_dt, None)
mock_delete_by_query.assert_called_once_with(
mock_base_query.return_value, self.begin_dt, self.end_dt
)
mock_base_query.assert_called_once()

View File

@@ -0,0 +1,207 @@
# Copyright 2025 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from datetime import datetime
from datetime import timezone
from cloudkitty.storage.v2.loki import client as loki_client_module
from cloudkitty.storage.v2.loki import exceptions as loki_exceptions
from cloudkitty.utils import json
class FakeLokiClient(loki_client_module.LokiClient):
def __init__(self, url, tenant, stream_labels, content_type,
buffer_size, **kwargs):
if content_type != "application/json":
raise loki_exceptions.UnsupportedContentType(content_type)
        self._base_url = url.rstrip('/') if url else 'http://fake-loki'
self._stream_labels = stream_labels if stream_labels \
else {"fake_label": "fake_value"}
self._headers = {
'X-Scope-OrgID': tenant,
'Content-Type': content_type
}
self._buffer_size = buffer_size
self._logs = []
self.init()
def init(self):
self._logs = []
def add_point(self, point, type, start, end):
loki_timestamp_ns_str = str(
int(datetime.now(timezone.utc).timestamp() * 1_000_000_000)
)
data_to_log = {
'start': start,
'end': end,
'type': type,
'unit': point.unit,
'description': point.description,
'qty': point.qty,
'price': point.price,
'groupby': point.groupby,
'metadata': point.metadata
}
flattened_stream = {
"stream": {
'detected_level': 'unknown',
'start': start,
'end': end,
'groupby_day_of_the_year': data_to_log.get('groupby').
get('day_of_the_year'),
'groupby_id': data_to_log.get('groupby').get('id'),
'groupby_month': data_to_log.get('groupby').get('month'),
'groupby_project_id': data_to_log.get('groupby').
get('project_id'),
'groupby_user_id': data_to_log.get('groupby').
get('user_id'),
'groupby_week_of_the_year': data_to_log.get('groupby').
get('week_of_the_year'),
'groupby_year': data_to_log.get('groupby').
get('year'),
'metadata_flavor_id': data_to_log.get('metadata').
get('flavor_id'),
'metadata_flavor_name': data_to_log.get('metadata').
get('flavor_name'),
'metadata_vcpus': data_to_log.get('metadata').
get('vcpus'),
'price': data_to_log.get('price'),
'qty': data_to_log.get('qty'),
'type': data_to_log.get('type'),
'unit': data_to_log.get('unit')
},
"values": [
[loki_timestamp_ns_str, json.dumps(data_to_log)]
]
}
self._logs.append(flattened_stream)
def push(self):
pass
def __filter_func(self, begin, end, filters, mtypes):
matched_points = []
for log in self._logs:
stream = log.get('stream')
if begin and stream.get('start') < begin:
continue
if end and stream.get('start') >= end:
continue
if mtypes and stream.get('type') not in mtypes:
continue
filter_match_passes = True
if filters:
for key, value in filters.items():
if stream.get('groupby_' + key) != value:
filter_match_passes = False
break
if not filter_match_passes:
continue
matched_points.append(log)
return matched_points
def retrieve(self, begin, end, filters, metric_types, limit):
points = self.__filter_func(begin, end, filters, metric_types)
total = len(points)
if limit > 0:
effective_limit = limit
else:
effective_limit = 1000
output = points[:effective_limit]
if not output:
return 0, []
return total, output
def total(self, begin, end, metric_types, filters, groupby,
custom_fields=None, offset=0, limit=1000, paginate=True):
data = self.__filter_func(begin, end, filters, metric_types)
if not data:
if not groupby:
return 1, [{'sum_qty': {'value': 0.0},
'sum_price': {'value': 0.0}}]
else:
return 0, []
if not groupby:
total_qty = 0.0
total_price = 0.0
for item in data:
stream = item.get('stream', {})
qty = float(stream.get('qty', 0))
price = float(stream.get('price', 0))
total_qty += qty
total_price += price
return 1, [{
'sum_qty': {'value': total_qty},
'sum_price': {'value': total_price}
}]
grouped_data = {}
for item in data:
stream = item.get('stream', {})
qty = float(stream.get('qty', 0))
price = float(stream.get('price', 0))
key_parts = {}
for field in groupby:
if field == 'type':
key_parts[field] = stream.get(field, '')
else:
key_parts[field] = stream.get('groupby_' + field)
key = tuple((k, v) for k, v in sorted(key_parts.items()))
if key not in grouped_data:
grouped_data[key] = {
'sum_qty': 0.0,
'sum_price': 0.0,
'key_parts': dict(key_parts)
}
grouped_data[key]['sum_qty'] += qty
grouped_data[key]['sum_price'] += price
result = []
for _key_tuple, values in grouped_data.items():
result.append({
'key': values['key_parts'],
'sum_qty': {'value': values['sum_qty']},
'sum_price': {'value': values['sum_price']}
})
if limit <= 0:
paginated_results = result[offset:]
else:
paginated_results = result[offset: offset + limit]
return len(result), paginated_results

View File

@@ -23,6 +23,7 @@ from cloudkitty import storage
from cloudkitty.tests import samples
from cloudkitty.tests.storage.v2 import es_utils
from cloudkitty.tests.storage.v2 import influx_utils
from cloudkitty.tests.storage.v2 import loki_utils
from cloudkitty.tests.storage.v2 import opensearch_utils
from cloudkitty.tests import TestCase
from cloudkitty.tests import utils as test_utils
@@ -38,13 +39,18 @@ _INFLUX_CLIENT_PATH = 'cloudkitty.storage.v2.influx.InfluxClient'
_OS_CLIENT_PATH = ('cloudkitty.storage.v2.opensearch'
'.client.OpenSearchClient')
_LOKI_CLIENT_PATH = ('cloudkitty.storage.v2.loki'
'.client.LokiClient')
class StorageUnitTest(TestCase):
storage_scenarios = [
('influxdb', dict(storage_backend='influxdb')),
('elasticsearch', dict(storage_backend='elasticsearch')),
('opensearch', dict(storage_backend='opensearch')),
('loki', dict(storage_backend='loki'))
]
@classmethod
def generate_scenarios(cls):
@@ -58,6 +64,8 @@ class StorageUnitTest(TestCase):
new=influx_utils.FakeInfluxClient)
@mock.patch(_OS_CLIENT_PATH,
new=opensearch_utils.FakeOpenSearchClient)
@mock.patch(_LOKI_CLIENT_PATH,
new=loki_utils.FakeLokiClient)
@mock.patch('cloudkitty.utils.load_conf', new=test_utils.load_conf)
def setUp(self):
super(StorageUnitTest, self).setUp()
@@ -312,7 +320,6 @@ class StorageUnitTest(TestCase):
retrieved_length = sum(len(list(frame.iterpoints()))
for frame in frames['dataframes'])
self.assertEqual(expected_length, retrieved_length)
def test_retrieve_all_scopes_one_type(self):

View File

@@ -28,6 +28,7 @@ the configuration file. The following options are available:
- ``influxdb``
- ``elasticsearch``
- ``opensearch``
- ``loki``
Driver-specific options
=======================
@@ -111,3 +112,28 @@ Section ``storage_opensearch``:
* ``scroll_duration``: Defaults to 30. Duration (in seconds) for which the
OpenSearch scroll contexts should be kept alive.
Loki (v2)
-------------------
Section ``storage_loki``:
* ``url``: Defaults to ``http://localhost:3100/loki/api/v1``. Base URL of the
  Loki API, including protocol, host, port and API path.
* ``tenant``: Defaults to ``tenant1``. The Loki tenant, sent as the
  ``X-Scope-OrgID`` header.
* ``stream``: Defaults to ``{"service": "cloudkitty"}``. The labels used to
  define the Loki stream, as a Python dict.
* ``buffer_size``: Defaults to ``1``. The number of messages buffered before a
  Loki HTTP POST request is issued.
* ``content_type``: Defaults to ``application/json``. The HTTP Content-Type
  used when sending data to Loki. ``application/x-protobuf`` is defined but
  not supported yet.
* ``insecure``: Defaults to ``false``. Set to ``true`` to allow insecure HTTPS
  connections to Loki.
* ``cafile``: Path of the CA certificate to trust for HTTPS connections.
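
A minimal example configuration (a sketch only; ``backend`` and ``version``
belong to the generic ``[storage]`` section rather than to this driver)::

    [storage]
    backend = loki
    version = 2

    [storage_loki]
    url = http://localhost:3100/loki/api/v1
    tenant = tenant1
    stream = service:cloudkitty
    buffer_size = 100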

View File

@@ -69,6 +69,7 @@ cloudkitty.storage.v2.backends =
influxdb = cloudkitty.storage.v2.influx:InfluxStorage
elasticsearch = cloudkitty.storage.v2.elasticsearch:ElasticsearchStorage
opensearch = cloudkitty.storage.v2.opensearch:OpenSearchStorage
loki = cloudkitty.storage.v2.loki:LokiStorage
cloudkitty.storage.hybrid.backends =
gnocchi = cloudkitty.storage.v1.hybrid.backends.gnocchi:GnocchiStorage