Add db models for event trigger

This patch contains the Mistral database and db_api changes
required for the event triggers.

Change-Id: Ib930ef8aac3792ed415e2817f67226962e6f7a92
Implements: blueprint event-notification-trigger
Co-Authored-By: Lingxian Kong <anlin.kong@gmail.com>
This commit is contained in:
Daryl Mowrer
2016-05-24 09:38:22 -05:00
committed by Lingxian Kong
parent 0dc00b3992
commit 54bbe3d4f1
7 changed files with 465 additions and 94 deletions

View File

@@ -0,0 +1,67 @@
# Copyright 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""add event triggers table
Revision ID: 012
Revises: 011
Create Date: 2016-03-04 09:49:52.481791
"""
# revision identifiers, used by Alembic.
revision = '012'
down_revision = '011'
from alembic import op
import sqlalchemy as sa
from mistral.db.sqlalchemy import types as st
def upgrade():
op.create_table(
'event_triggers_v2',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('project_id', sa.String(length=80), nullable=True),
sa.Column('scope', sa.String(length=80), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('name', sa.String(length=200), nullable=True),
sa.Column('workflow_id', sa.String(length=36), nullable=False),
sa.Column('exchange', sa.String(length=80), nullable=False),
sa.Column('topic', sa.String(length=80), nullable=False),
sa.Column('event', sa.String(length=80), nullable=False),
sa.Column('workflow_params', st.JsonEncoded(), nullable=True),
sa.Column('workflow_input', st.JsonEncoded(), nullable=True),
sa.Column('trust_id', sa.String(length=80), nullable=True),
sa.ForeignKeyConstraint(
['workflow_id'],
[u'workflow_definitions_v2.id'],
),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint(
'exchange',
'topic',
'event',
'workflow_id',
'project_id'
),
sa.Index(
'event_triggers_v2_project_id_workflow_id',
'project_id', 'workflow_id'
)
)

View File

@@ -505,3 +505,42 @@ def delete_resource_member(resource_id, res_type, member_id):
def delete_resource_members(**kwargs):
IMPL.delete_resource_members(**kwargs)
# Event triggers.
def get_event_trigger(id, insecure=False):
return IMPL.get_event_trigger(id, insecure)
def get_event_triggers(insecure=False, limit=None, marker=None, sort_keys=None,
sort_dirs=None, fields=None, **kwargs):
return IMPL.get_event_triggers(
insecure=False,
limit=limit,
marker=marker,
sort_keys=sort_keys,
sort_dirs=sort_dirs,
fields=fields,
**kwargs
)
def create_event_trigger(values):
return IMPL.create_event_trigger(values)
def update_event_trigger(id, values):
return IMPL.update_event_trigger(id, values)
def delete_event_trigger(id):
return IMPL.delete_event_trigger(id)
def delete_event_triggers(**kwargs):
return IMPL.delete_event_triggers(**kwargs)
def ensure_event_trigger_exists(id):
return IMPL.ensure_event_trigger_exists(id)

View File

@@ -31,6 +31,7 @@ from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.services import security
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@@ -110,6 +111,9 @@ def _lock_entity(model, id):
def _secure_query(model, *columns):
query = b.model_query(model, columns)
if not issubclass(model, mb.MistralSecureModelBase):
return query
shared_res_ids = []
res_type = RESOURCE_MAPPING.get(model, '')
@@ -117,15 +121,21 @@ def _secure_query(model, *columns):
shared_res = _get_accepted_resources(res_type)
shared_res_ids = [res.resource_id for res in shared_res]
if issubclass(model, mb.MistralSecureModelBase):
query = query.filter(
sa.or_(
model.project_id == security.get_project_id(),
model.scope == 'public',
model.id.in_(shared_res_ids)
)
query_criterion = sa.or_(
model.project_id == security.get_project_id(),
model.scope == 'public'
)
# NOTE(kong): Include IN_ predicate in query filter only if shared_res_ids
# is not empty to avoid sqlalchemy SAWarning and wasting a db call.
if shared_res_ids:
query_criterion = sa.or_(
query_criterion,
model.id.in_(shared_res_ids)
)
query = query.filter(query_criterion)
return query
@@ -147,32 +157,34 @@ def _paginate_query(model, limit=None, marker=None, sort_keys=None,
def _delete_all(model, session=None, **kwargs):
# NOTE(lane): Because we use 'in_' operator in _secure_query(), delete()
# NOTE(kong): Because we use 'in_' operator in _secure_query(), delete()
# method will raise error with default parameter. Please refer to
# http://docs.sqlalchemy.org/en/rel_1_0/orm/query.html#sqlalchemy.orm.query.Query.delete
_secure_query(model).filter_by(**kwargs).delete(synchronize_session=False)
def _get_collection(model, limit=None, marker=None, sort_keys=None,
sort_dirs=None, fields=None, query=None, **kwargs):
def _get_collection(model, insecure=False, limit=None, marker=None,
sort_keys=None, sort_dirs=None, fields=None, **kwargs):
columns = (
tuple([getattr(model, f) for f in fields if hasattr(model, f)])
if fields else ()
)
if query is None:
tags = kwargs.pop('tags', None)
query = _secure_query(model, *columns).filter_by(**kwargs)
tags = kwargs.pop('tags', None)
# To match the tag list, a resource must contain at least all of the
# tags present in the filter parameter.
if tags:
tag_attr = getattr(model, 'tags')
if len(tags) == 1:
expr = tag_attr.contains(tags)
else:
expr = sa.and_(*[tag_attr.contains(tag) for tag in tags])
query = query.filter(expr)
query = (b.model_query(model, *columns) if insecure
else _secure_query(model, *columns))
query = query.filter_by(**kwargs)
# To match the tag list, a resource must contain at least all of the
# tags present in the filter parameter.
if tags:
tag_attr = getattr(model, 'tags')
if len(tags) == 1:
expr = tag_attr.contains(tags)
else:
expr = sa.and_(*[tag_attr.contains(tag) for tag in tags])
query = query.filter(expr)
try:
return _paginate_query(
@@ -184,36 +196,32 @@ def _get_collection(model, limit=None, marker=None, sort_keys=None,
query
)
except Exception as e:
raise exc.DBQueryEntryException(
raise exc.DBQueryEntryError(
"Failed when querying database, error type: %s, "
"error message: %s" % (e.__class__.__name__, e.message)
)
def _get_collection_sorted_by_name(model, fields=None, sort_keys=['name'],
**kwargs):
# Note(lane): Sometimes tenant_A needs to get resources of tenant_B,
# especially in resource sharing scenario, the resource owner needs to
# check if the resource is used by a member.
columns = (
tuple([getattr(model, f) for f in fields if hasattr(model, f)])
if fields else ()
)
query = (b.model_query(model, *columns) if 'project_id' in kwargs
else _secure_query(model, *columns))
def _get_collection_sorted_by_name(model, insecure=False, fields=None,
sort_keys=['name'], **kwargs):
return _get_collection(
model=model,
query=query,
insecure=insecure,
sort_keys=sort_keys,
fields=fields,
**kwargs
)
def _get_collection_sorted_by_time(model, sort_keys=['created_at'], **kwargs):
return _get_collection(model, sort_keys=sort_keys, **kwargs)
def _get_collection_sorted_by_time(model, insecure=False, fields=None,
sort_keys=['created_at'], **kwargs):
return _get_collection(
model=model,
insecure=insecure,
sort_keys=sort_keys,
fields=fields,
**kwargs
)
def _get_db_object_by_name(model, name):
@@ -345,7 +353,7 @@ def get_workflow_definitions(sort_keys=['created_at'], fields=None, **kwargs):
fields.remove('input')
fields.append('spec')
return _get_collection(
return _get_collection_sorted_by_name(
model=models.WorkflowDefinition,
sort_keys=sort_keys,
fields=fields,
@@ -385,16 +393,29 @@ def update_workflow_definition(identifier, values, session=None):
)
if wf_def.scope == 'public' and values['scope'] == 'private':
cron_triggers = _get_associated_cron_triggers(identifier)
# Check cron triggers.
cron_triggers = get_cron_triggers(insecure=True, workflow_id=wf_def.id)
try:
[get_cron_trigger(name) for name in cron_triggers]
except exc.DBEntityNotFoundError:
raise exc.NotAllowedException(
"Can not update scope of workflow that has triggers "
"associated in other tenants."
"[workflow_identifier=%s]" % identifier
)
for c_t in cron_triggers:
if c_t.project_id != wf_def.project_id:
raise exc.NotAllowedException(
"Can not update scope of workflow that has cron triggers "
"associated in other tenants. [workflow_identifier=%s]" %
identifier
)
# Check event triggers.
event_triggers = get_event_triggers(
insecure=True,
workflow_id=wf_def.id
)
for e_t in event_triggers:
if e_t.project_id != wf_def.project_id:
raise exc.NotAllowedException(
"Can not update scope of workflow that has event triggers "
"associated in other tenants. [workflow_identifier=%s]" %
identifier
)
wf_def.update(values.copy())
@@ -423,13 +444,21 @@ def delete_workflow_definition(identifier, session=None):
msg = "Attempt to delete a system workflow: %s" % identifier
raise exc.DataAccessException(msg)
cron_triggers = _get_associated_cron_triggers(identifier)
cron_triggers = get_cron_triggers(insecure=True, workflow_id=wf_def.id)
if cron_triggers:
raise exc.DBError(
"Can't delete workflow that has triggers associated. "
"[workflow_identifier=%s], [cron_trigger_name(s)=%s]" %
(identifier, ', '.join(cron_triggers))
"Can't delete workflow that has cron triggers associated. "
"[workflow_identifier=%s], [cron_trigger_id(s)=%s]" %
(identifier, ', '.join([t.id for t in cron_triggers]))
)
event_triggers = get_event_triggers(insecure=True, workflow_id=wf_def.id)
if event_triggers:
raise exc.DBError(
"Can't delete workflow that has event triggers associated. "
"[workflow_identifier=%s], [event_trigger_id(s)=%s]" %
(identifier, ', '.join([t.id for t in event_triggers]))
)
# Delete workflow members first.
@@ -438,21 +467,6 @@ def delete_workflow_definition(identifier, session=None):
session.delete(wf_def)
def _get_associated_cron_triggers(wf_identifier):
criterion = (
{'workflow_id': wf_identifier}
if uuidutils.is_uuid_like(wf_identifier)
else {'workflow_name': wf_identifier}
)
cron_triggers = b.model_query(
models.CronTrigger,
[models.CronTrigger.name]
).filter_by(**criterion).all()
return [t[0] for t in cron_triggers]
@b.session_aware()
def delete_workflow_definitions(**kwargs):
return _delete_all(models.WorkflowDefinition, **kwargs)
@@ -494,10 +508,9 @@ def load_action_definition(name):
return _get_action_definition(name)
def get_action_definitions(sort_keys=['name'], **kwargs):
return _get_collection(
def get_action_definitions(**kwargs):
return _get_collection_sorted_by_name(
model=models.ActionDefinition,
sort_keys=sort_keys,
**kwargs
)
@@ -758,10 +771,9 @@ def ensure_workflow_execution_exists(id):
get_workflow_execution(id)
def get_workflow_executions(sort_keys=['created_at'], **kwargs):
return _get_collection(
def get_workflow_executions(**kwargs):
return _get_collection_sorted_by_time(
models.WorkflowExecution,
sort_keys=sort_keys,
**kwargs
)
@@ -1030,8 +1042,12 @@ def load_cron_trigger(name):
return _get_cron_trigger(name)
def get_cron_triggers(**kwargs):
return _get_collection_sorted_by_name(models.CronTrigger, **kwargs)
def get_cron_triggers(insecure=False, **kwargs):
return _get_collection_sorted_by_name(
models.CronTrigger,
insecure=insecure,
**kwargs
)
@b.session_aware()
@@ -1143,17 +1159,6 @@ def _get_cron_trigger(name):
return _get_db_object_by_name(models.CronTrigger, name)
def _get_cron_triggers(*columns, **kwargs):
query = b.model_query(models.CronTrigger)
return _get_collection(
models.CronTrigger,
query=query,
*columns,
**kwargs
)
# Environments.
def get_environment(name):
@@ -1371,7 +1376,7 @@ def delete_resource_member(resource_id, res_type, member_id, session=None):
(resource_id, member_id)
)
# TODO(lane): Check association with cron triggers when deleting a workflow
# TODO(kong): Check association with cron triggers when deleting a workflow
# member which is in 'accepted' status.
session.delete(res_member)
@@ -1392,3 +1397,85 @@ def _get_accepted_resources(res_type):
).all()
return resources
# Event triggers.
def get_event_trigger(id, insecure=False):
event_trigger = _get_event_trigger(id, insecure)
if not event_trigger:
raise exc.DBEntityNotFoundError(
"Event trigger not found [id=%s]." % id
)
return event_trigger
def get_event_triggers(insecure=False, **kwargs):
return _get_collection_sorted_by_time(
model=models.EventTrigger,
insecure=insecure,
**kwargs
)
@b.session_aware()
def create_event_trigger(values, session=None):
event_trigger = models.EventTrigger()
event_trigger.update(values)
try:
event_trigger.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntryError(
"Duplicate entry for event trigger %s: %s"
% (event_trigger.id, e.columns)
)
# TODO(nmakhotkin): Remove this 'except' after fixing
# https://bugs.launchpad.net/oslo.db/+bug/1458583.
except db_exc.DBError as e:
raise exc.DBDuplicateEntryError(
"Duplicate entry for event trigger: %s" % e
)
return event_trigger
@b.session_aware()
def update_event_trigger(id, values, session=None):
event_trigger = _get_event_trigger(id)
if not event_trigger:
raise exc.DBEntityNotFoundError("Event trigger not found [id=%s]" % id)
event_trigger.update(values.copy())
return event_trigger
@b.session_aware()
def delete_event_trigger(id, session=None):
event_trigger = _get_event_trigger(id)
if not event_trigger:
raise exc.DBEntityNotFoundError("Event trigger not found [id=%s]" % id)
session.delete(event_trigger)
@b.session_aware()
def delete_event_triggers(**kwargs):
return _delete_all(models.EventTrigger, **kwargs)
def _get_event_trigger(id, insecure=False):
if insecure:
return b.model_query(models.EventTrigger).filter_by(id=id).first()
else:
return _get_db_object_by_id(models.EventTrigger, id)
def ensure_event_trigger_exists(id):
get_event_trigger(id)

View File

@@ -424,3 +424,32 @@ class ResourceMember(mb.MistralModelBase):
project_id = sa.Column(sa.String(80), default=security.get_project_id)
member_id = sa.Column(sa.String(80), nullable=False)
status = sa.Column(sa.String(20), nullable=False, default="pending")
class EventTrigger(mb.MistralSecureModelBase):
"""Contains info about event triggers."""
__tablename__ = 'event_triggers_v2'
__table_args__ = (
sa.UniqueConstraint('exchange', 'topic', 'event', 'workflow_id',
'project_id'),
sa.Index('%s_project_id_workflow_id' % __tablename__, 'project_id',
'workflow_id'),
)
id = mb.id_column()
name = sa.Column(sa.String(200))
workflow_id = sa.Column(
sa.String(36),
sa.ForeignKey(WorkflowDefinition.id)
)
workflow_params = sa.Column(st.JsonDictType())
workflow_input = sa.Column(st.JsonDictType())
exchange = sa.Column(sa.String(80), nullable=False)
topic = sa.Column(sa.String(80), nullable=False)
event = sa.Column(sa.String(80), nullable=False)
trust_id = sa.Column(sa.String(80))

View File

@@ -245,11 +245,12 @@ class DbTestCase(BaseTest):
with mock.patch('mistral.services.security.get_project_id',
new=mock.MagicMock(return_value=ctx.project_id)):
with db_api_v2.transaction():
db_api_v2.delete_event_triggers()
db_api_v2.delete_executions()
db_api_v2.delete_workbooks()
db_api_v2.delete_cron_triggers()
db_api_v2.delete_workflow_definitions()
db_api_v2.delete_environments(),
db_api_v2.delete_environments()
db_api_v2.delete_resource_members()
sqlite_lock.cleanup()

View File

@@ -372,7 +372,9 @@ class WorkflowDefinitionTest(SQLAlchemyTest):
# Create a new user.
auth_context.set_ctx(test_base.get_context(default=False))
db_api.create_cron_trigger(CRON_TRIGGER)
cron_trigger = copy.copy(CRON_TRIGGER)
cron_trigger['workflow_id'] = created.id
db_api.create_cron_trigger(cron_trigger)
auth_context.set_ctx(test_base.get_context(default=True))
@@ -383,10 +385,50 @@ class WorkflowDefinitionTest(SQLAlchemyTest):
{'scope': 'private'}
)
def test_update_wf_scope_event_trigger_associated_in_diff_tenant(self):
created = db_api.create_workflow_definition(WF_DEFINITIONS[0])
# Switch to another user.
auth_context.set_ctx(test_base.get_context(default=False))
event_trigger = copy.copy(EVENT_TRIGGERS[0])
event_trigger.update({'workflow_id': created.id})
db_api.create_event_trigger(event_trigger)
# Switch back.
auth_context.set_ctx(test_base.get_context(default=True))
self.assertRaises(
exc.NotAllowedException,
db_api.update_workflow_definition,
created.id,
{'scope': 'private'}
)
def test_update_wf_scope_event_trigger_associated_in_same_tenant(self):
created = db_api.create_workflow_definition(WF_DEFINITIONS[0])
event_trigger = copy.copy(EVENT_TRIGGERS[0])
event_trigger.update({'workflow_id': created.id})
db_api.create_event_trigger(event_trigger)
updated = db_api.update_workflow_definition(
created.id,
{'scope': 'private'}
)
self.assertEqual('private', updated.scope)
def test_update_wf_scope_cron_trigger_associated_in_same_tenant(self):
created = db_api.create_workflow_definition(WF_DEFINITIONS[0])
db_api.create_cron_trigger(CRON_TRIGGER)
cron_trigger = copy.copy(CRON_TRIGGER)
cron_trigger.update({'workflow_id': created.id})
db_api.create_cron_trigger(cron_trigger)
updated = db_api.update_workflow_definition(
created['name'],
{'scope': 'private'}
@@ -429,6 +471,22 @@ class WorkflowDefinitionTest(SQLAlchemyTest):
identifier
)
def test_delete_workflow_definition_has_event_trigger(self):
created = db_api.create_workflow_definition(WF_DEFINITIONS[1])
event_trigger = copy.copy(EVENT_TRIGGERS[0])
event_trigger['workflow_id'] = created.id
trigger = db_api.create_event_trigger(event_trigger)
self.assertEqual(trigger.workflow_id, created.id)
self.assertRaises(
exc.DBError,
db_api.delete_workflow_definition,
created.id
)
def test_delete_other_project_workflow_definition(self):
created = db_api.create_workflow_definition(WF_DEFINITIONS[0])
@@ -483,7 +541,7 @@ class WorkflowDefinitionTest(SQLAlchemyTest):
self.assertEqual(1, len(fetched))
self.assertEqual(created0, fetched[0])
self.assertEqual('public', created0.scope)
self.assertEqual('public', fetched[0].scope)
def test_workflow_definition_repr(self):
s = db_api.create_workflow_definition(WF_DEFINITIONS[0]).__repr__()
@@ -1284,6 +1342,7 @@ class CronTriggerTest(SQLAlchemyTest):
auth_context.set_ctx(user_context)
fetched = db_api.get_cron_triggers(
insecure=True,
pattern='* * * * *',
project_id=security.DEFAULT_PROJECT_ID
)
@@ -1876,3 +1935,92 @@ class WorkflowSharingTest(SQLAlchemyTest):
db_api.delete_workflow_definition,
wf.id
)
EVENT_TRIGGERS = [
{
'name': 'trigger1',
'workflow_id': '',
'workflow_input': {},
'workflow_params': {},
'exchange': 'openstack',
'topic': 'notification',
'event': 'compute.create_instance',
},
{
'name': 'trigger2',
'workflow_id': '',
'workflow_input': {},
'workflow_params': {},
'exchange': 'openstack',
'topic': 'notification',
'event': 'compute.delete_instance',
},
]
class EventTriggerTest(SQLAlchemyTest):
def setUp(self):
super(EventTriggerTest, self).setUp()
self.wf = db_api.create_workflow_definition({'name': 'my_wf'})
for et in EVENT_TRIGGERS:
et['workflow_id'] = self.wf.id
def test_create_and_get_event_trigger(self):
created = db_api.create_event_trigger(EVENT_TRIGGERS[0])
fetched = db_api.get_event_trigger(created.id)
self.assertEqual(created, fetched)
def test_get_event_triggers_insecure(self):
for t in EVENT_TRIGGERS:
db_api.create_event_trigger(t)
fetched = db_api.get_event_triggers()
self.assertEqual(2, len(fetched))
def test_get_event_triggers_not_insecure(self):
db_api.create_event_trigger(EVENT_TRIGGERS[0])
# Switch to another tenant.
auth_context.set_ctx(user_context)
db_api.create_event_trigger(EVENT_TRIGGERS[1])
fetched = db_api.get_event_triggers()
self.assertEqual(1, len(fetched))
fetched = db_api.get_event_triggers(insecure=True)
self.assertEqual(2, len(fetched))
def test_update_event_trigger(self):
created = db_api.create_event_trigger(EVENT_TRIGGERS[0])
# Need a new existing workflow for updating event trigger because of
# foreign constraint.
new_wf = db_api.create_workflow_definition({'name': 'my_wf1'})
db_api.update_event_trigger(
created.id,
{'workflow_id': new_wf.id}
)
updated = db_api.get_event_trigger(created.id)
self.assertEqual(new_wf.id, updated.workflow_id)
def test_delete_event_triggers(self):
created = db_api.create_event_trigger(EVENT_TRIGGERS[0])
db_api.delete_event_trigger(created.id)
self.assertRaises(
exc.DBEntityNotFoundError,
db_api.get_event_trigger,
created.id
)

View File

@@ -270,7 +270,7 @@ class WorkflowTestsV2(base.TestCase):
)
self.assertIn(
"Can't delete workflow that has triggers associated",
"Can't delete workflow that has cron triggers associated",
exception.resp_body['faultstring']
)
finally: