diff --git a/etc/mistral.conf.sample b/etc/mistral.conf.sample index 66ac6d863..aa9850849 100644 --- a/etc/mistral.conf.sample +++ b/etc/mistral.conf.sample @@ -327,6 +327,9 @@ # The version of the executor. (string value) #version = 1.0 +# The default maximum size in KB of large text fields of runtime +# execution objects. Use -1 for no limit. (integer value) +#execution_field_size_limit_kb = 1024 [keystone_authtoken] diff --git a/mistral/config.py b/mistral/config.py index f81033330..b18904fe6 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -72,7 +72,10 @@ engine_opts = [ cfg.StrOpt('topic', default='mistral_engine', help='The message topic that the engine listens on.'), cfg.StrOpt('version', default='1.0', - help='The version of the engine.') + help='The version of the engine.'), + cfg.IntOpt('execution_field_size_limit_kb', default=1024, + help='The default maximum size in KB of large text fields ' + 'of runtime execution objects. Use -1 for no limit.'), ] executor_opts = [ diff --git a/mistral/db/sqlalchemy/migration/alembic_migrations/versions/005_increase_execution_columns_size.py b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/005_increase_execution_columns_size.py new file mode 100644 index 000000000..14b96e65f --- /dev/null +++ b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/005_increase_execution_columns_size.py @@ -0,0 +1,45 @@ +# Copyright 2015 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Increase executions_v2 column size from JsonDictType to JsonLongDictType + +Revision ID: 005 +Revises: 004 +Create Date: 2015-07-21 08:48:51.636094 + +""" + +# revision identifiers, used by Alembic. +revision = '005' +down_revision = '004' + +from alembic import op +from mistral.db.sqlalchemy import types as st + + +def upgrade(): + # Changing column types from JsonDictType to JsonLongDictType + op.alter_column('executions_v2', 'runtime_context', + type_=st.JsonLongDictType()) + op.alter_column('executions_v2', 'input', + type_=st.JsonLongDictType()) + op.alter_column('executions_v2', 'params', + type_=st.JsonLongDictType()) + op.alter_column('executions_v2', 'context', + type_=st.JsonLongDictType()) + op.alter_column('executions_v2', 'action_spec', + type_=st.JsonLongDictType()) + op.alter_column('executions_v2', 'published', + type_=st.JsonLongDictType()) diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index adde499f4..d0a17adde 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -19,14 +19,21 @@ import sqlalchemy as sa from sqlalchemy import event from sqlalchemy.orm import backref from sqlalchemy.orm import relationship +import sys + +from oslo_config import cfg +from oslo_log import log as logging from mistral.db.sqlalchemy import model_base as mb from mistral.db.sqlalchemy import types as st +from mistral import exceptions as exc from mistral import utils # Definition objects. +LOG = logging.getLogger(__name__) + class Definition(mb.MistralSecureModelBase): __abstract__ = True @@ -106,7 +113,7 @@ class Execution(mb.MistralSecureModelBase): # Runtime context like iteration_no of a repeater. # Effectively internal engine properties which will be used to determine # execution of a task. - runtime_context = sa.Column(st.JsonDictType()) + runtime_context = sa.Column(st.JsonLongDictType()) class ActionExecution(Execution): @@ -118,7 +125,7 @@ class ActionExecution(Execution): # Main properties. accepted = sa.Column(sa.Boolean(), default=False) - input = sa.Column(st.JsonDictType(), nullable=True) + input = sa.Column(st.JsonLongDictType(), nullable=True) output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True)) @@ -131,10 +138,10 @@ class WorkflowExecution(ActionExecution): } # Main properties. - params = sa.Column(st.JsonDictType()) + params = sa.Column(st.JsonLongDictType()) # TODO(rakhmerov): We need to get rid of this field at all. - context = sa.Column(st.JsonDictType()) + context = sa.Column(st.JsonLongDictType()) class TaskExecution(Execution): @@ -145,7 +152,7 @@ class TaskExecution(Execution): } # Main properties. - action_spec = sa.Column(st.JsonDictType()) + action_spec = sa.Column(st.JsonLongDictType()) # Whether the task is fully processed (publishing and calculating commands # after it). It allows to simplify workflow controller implementations @@ -154,7 +161,7 @@ class TaskExecution(Execution): # Data Flow properties. in_context = sa.Column(st.JsonLongDictType()) - published = sa.Column(st.JsonDictType()) + published = sa.Column(st.JsonLongDictType()) for cls in utils.iter_subclasses(Execution): @@ -166,6 +173,40 @@ for cls in utils.iter_subclasses(Execution): retval=True ) + +def validate_long_type_length(cls, field_name, value): + """Makes sure the value does not exceeds the maximum size.""" + if value: + # Get the configured limit. + size_limit_kb = cfg.CONF.engine.execution_field_size_limit_kb + + # If the size is unlimited. + if (size_limit_kb < 0): + return + + size_kb = sys.getsizeof(str(value)) / 1024 + if (size_kb > size_limit_kb): + LOG.error( + "Size limit %dKB exceed for class [%s], " + "field %s of size %dKB.", + size_limit_kb, str(cls), field_name, size_kb + ) + raise exc.SizeLimitExceededException(field_name, size_kb, + size_limit_kb) + + +def register_length_validator(attr_name): + """Register an event listener on the attribute that will + validate the size every time a 'set' occurs. + """ + for cls in utils.iter_subclasses(Execution): + if hasattr(cls, attr_name): + event.listen( + getattr(cls, attr_name), + 'set', + lambda t, v, o, i: validate_long_type_length(cls, attr_name, v) + ) + # Many-to-one for 'Execution' and 'TaskExecution'. Execution.task_execution_id = sa.Column( @@ -296,3 +337,8 @@ class CronTrigger(mb.MistralSecureModelBase): # Register all hooks related to secure models. mb.register_secure_model_hooks() + +# Register an event listener to verify that the size of all the long columns +# affected by the user do not exceed the limit configuration. +for attr_name in ['input', 'output', 'params', 'published']: + register_length_validator(attr_name) diff --git a/mistral/exceptions.py b/mistral/exceptions.py index 0dc7ddf1e..fdbcab172 100644 --- a/mistral/exceptions.py +++ b/mistral/exceptions.py @@ -110,3 +110,12 @@ class InvalidModelException(DSLParsingException): class InvalidResultException(MistralException): http_code = 400 message = "Unable to parse result" + + +class SizeLimitExceededException(MistralException): + http_code = 400 + + def __init__(self, field_name, size_kb, size_limit_kb): + super(SizeLimitExceededException, self).__init__( + "Size of '%s' is %dKB which exceeds the limit of %dKB" + % (field_name, size_kb, size_limit_kb)) diff --git a/mistral/tests/unit/engine/test_execution_fields_size_limitation.py b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py new file mode 100644 index 000000000..70f555f12 --- /dev/null +++ b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py @@ -0,0 +1,187 @@ +# Copyright 2015 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_log import log as logging +import testtools + +from mistral.actions import base as actions_base +from mistral.db.v2 import api as db_api +from mistral import exceptions as exc +from mistral.services import workflows as wf_service +from mistral.tests import base as test_base +from mistral.tests.unit.engine import base +from mistral.workflow import utils as wf_utils + +LOG = logging.getLogger(__name__) + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + +WF = """ +--- +version: '2.0' + +wf: + input: + - workflow_input: '__WORKFLOW_INPUT__' + - action_output_length: 0 + tasks: + task1: + action: my_action + input: + action_input: '__ACTION_INPUT__' + action_output_length: <% $.action_output_length %> + publish: + p_var: '__TASK_PUBLISHED__' +""" + + +class MyAction(actions_base.Action): + def __init__(self, action_input, action_output_length): + self.action_input = action_input + self.action_output_length = action_output_length + + def run(self): + return wf_utils.Result( + data=''.join('A' for _ in range(self.action_output_length)) + ) + + def test(self): + raise NotImplementedError + + +def expect_size_limit_exception(field_name): + def logger(test_func): + def wrapped(*args, **kwargs): + with testtools.ExpectedException(exc.SizeLimitExceededException, + value_re="Size of '%s' is 1KB " + "which exceeds the limit" + " of 0KB" % field_name): + return test_func(*args, **kwargs) + + return wrapped + + return logger + + +def generate_workflow(tokens): + new_wf = WF + long_string = ''.join('A' for _ in range(1024)) + for token in tokens: + new_wf = new_wf.replace(token, long_string) + return new_wf + + +class ExecutionFieldsSizeLimitTest(base.EngineTestCase): + def setUp(self): + """Resets the size limit config between tests""" + super(ExecutionFieldsSizeLimitTest, self).setUp() + cfg.CONF.set_default('execution_field_size_limit_kb', 0, + group='engine') + test_base.register_action_class('my_action', MyAction) + + def test_default_limit(self): + cfg.CONF.set_default('execution_field_size_limit_kb', -1, + group='engine') + + new_wf = generate_workflow( + ['__ACTION_INPUT_', '__WORKFLOW_INPUT__', + '__TASK_PUBLISHED__']) + + wf_service.create_workflows(new_wf) + + # Start workflow. + wf_ex = self.engine.start_workflow('wf', {}) + + self._await(lambda: self.is_execution_success(wf_ex.id)) + + @expect_size_limit_exception('input') + def test_workflow_input_default_value_limit(self): + new_wf = generate_workflow(['__WORKFLOW_INPUT__']) + + wf_service.create_workflows(new_wf) + + # Start workflow. + self.engine.start_workflow('wf', {}) + + @expect_size_limit_exception('input') + def test_workflow_input_limit(self): + wf_service.create_workflows(WF) + + # Start workflow. + self.engine.start_workflow( + 'wf', + { + 'workflow_input': ''.join('A' for _ in range(1024)) + } + ) + + @expect_size_limit_exception('input') + def test_action_input_limit(self): + new_wf = generate_workflow(['__ACTION_INPUT__']) + + wf_service.create_workflows(new_wf) + + # Start workflow. + self.engine.start_workflow('wf', {}) + + def test_action_output_limit(self): + wf_service.create_workflows(WF) + + # Start workflow. + wf_ex = self.engine.start_workflow('wf', { + 'action_output_length': 1024 + }) + + self._await(lambda: self.is_execution_error(wf_ex.id)) + + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual("Size of 'output' is 1KB which exceeds " + "the limit of 0KB", + wf_ex.state_info) + + def test_task_published_limit(self): + new_wf = generate_workflow(['__TASK_PUBLISHED__']) + + wf_service.create_workflows(new_wf) + + # Start workflow. + wf_ex = self.engine.start_workflow('wf', {}) + + self._await(lambda: self.is_execution_error(wf_ex.id)) + + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual("Size of 'published' is 1KB which exceeds " + "the limit of 0KB", + wf_ex.state_info) + + @expect_size_limit_exception('params') + def test_workflow_params_limit(self): + wf_service.create_workflows(WF) + + # Start workflow. + long_string = ''.join('A' for _ in range(1024)) + self.engine.start_workflow('wf', {}, '', env={'param': long_string}) + + def tearDown(self): + """Restores the size limit config to default""" + super(ExecutionFieldsSizeLimitTest, self).tearDown() + cfg.CONF.set_default('execution_field_size_limit_kb', 1024, + group='engine')