Replace eval with function matching

Using eval in code is not safe and may lead to security
risks, especially given that query itself is supplied by a user.

This refactors the code making is presumably safe
and prone to code injections, which are possible with eval.

Story: 2011539
Task: 52866
Change-Id: If629023052aa2c067c419bba10837f77bcc3e59c
Signed-off-by: Dmitriy Rabotyagov <dmitriy@adria-cloud.com>
This commit is contained in:
Dmitriy Rabotyagov
2025-09-30 20:46:39 +02:00
parent a45ca6541b
commit 5b57e2b32a
2 changed files with 42 additions and 35 deletions

View File

@@ -0,0 +1,5 @@
---
security:
- |
A security issue in the entity graph querying mechanism has been fixed.
This change hardens the query parser against malicious input.

View File

@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import operator
from oslo_log import log as logging
from vitrage.common.exception import VitrageError
@@ -21,13 +22,21 @@ LOG = logging.getLogger(__name__)
operators = [
'<',
'<=',
# '=',
'==',
'!=',
'>=',
'>',
]
ops = {
'<': operator.lt,
'<=': operator.le,
'==': operator.eq,
'!=': operator.ne,
'>=': operator.ge,
'>': operator.gt,
}
logical_operations = [
'and',
'or'
@@ -64,10 +73,7 @@ def create_predicate(query_dict):
:return: a predicate "match(item)"
"""
try:
expression = _create_query_expression(query=query_dict)
LOG.debug('create_predicate::%s', expression)
expression = 'lambda item: ' + expression
return eval(expression)
return _create_query_function(query=query_dict)
except Exception as e:
LOG.error('invalid query format %s. Exception: %s',
query_dict, e)
@@ -75,46 +81,42 @@ def create_predicate(query_dict):
query_dict, e)
def _create_query_expression(query, parent_operator=None):
expressions = []
def _create_query_function(query, parent_operator=None):
# First element or element under logical operation
if not parent_operator and isinstance(query, dict):
(key, value) = query.copy().popitem()
return _create_query_expression(value, key)
return _create_query_function(value, key)
# Continue recursion on logical (and/or) operation
elif parent_operator in logical_operations and isinstance(query, list):
for val in query:
expressions.append(_create_query_expression(val))
return _join_logical_operator(parent_operator, expressions)
predicates = [_create_query_function(val) for val in query]
if not predicates:
return lambda item: False
if parent_operator == 'and':
return lambda item: all(p(item) for p in predicates)
elif parent_operator == 'or':
return lambda item: any(p(item) for p in predicates)
# Recursion evaluate leaf (stop condition)
elif parent_operator in operators:
for key, val in query.items():
expressions.append('item.get(' + _evaluable_str(key) + ')' +
parent_operator + ' ' + _evaluable_str(val))
return _join_logical_operator('and', expressions)
predicates = []
op_func = ops[parent_operator]
for field, value in query.items():
predicates.append(
lambda item, f=field, v=value: op_func(item.get(f), v)
)
# Multiple conditions under a comparison operator are implicitly 'and'
if len(predicates) > 1:
return lambda item: all(p(item) for p in predicates)
elif predicates:
return predicates[0]
else:
return lambda item: False
else:
raise VitrageError('invalid partial query format',
parent_operator, query)
def _evaluable_str(value):
"""wrap string/unicode with back tick"""
if isinstance(value, str):
return '\'' + value + '\''
else:
return str(value)
def _join_logical_operator(op, expressions):
"""Create an expressions string
Example input:
op='AND'
expressions=['a == b', 'c < d']
Example output: (a == b AND c < d)
"""
separator = ' ' + op + ' '
return '(' + separator.join(expressions) + ')'