Remove kombu RPC driver

Since last release, we decided to get rid of kombu RPC driver in
mistral.
Kombu RPC driver was experimental and not widely used.

Having only one driver will simplify global mistral maintenance.

Change-Id: Ia847d4383af54fcc33f039ddd6dc122c58b1f126
Signed-off-by: Arnaud M <arnaud.morin@gmail.com>
This commit is contained in:
Arnaud M
2025-04-24 21:57:13 +02:00
parent 585edca5ba
commit 715fb3db76
22 changed files with 9 additions and 1739 deletions

View File

@@ -111,7 +111,6 @@
- mistral-devstack-tempest-ipv6-only
- mistral-devstack-non-apache-tempest-ipv6-only
- mistral-devstack-non-apache
- mistral-devstack-kombu
# Disable mysql / postgresql units as they are not working as expected
# - mistral-tox-unit-mysql
# - mistral-tox-unit-postgresql
@@ -123,7 +122,6 @@
- mistral-devstack-tempest-ipv6-only
- mistral-devstack-non-apache-tempest-ipv6-only
- mistral-devstack-non-apache
- mistral-devstack-kombu
# Disable mysql / postgresql units as they are not working as expected
# - mistral-tox-unit-mysql
# - mistral-tox-unit-postgresql

View File

@@ -193,17 +193,6 @@ js_impl_opt = cfg.StrOpt(
'action to evaluate scripts.')
)
rpc_impl_opt = cfg.StrOpt(
'rpc_implementation',
default='oslo',
choices=['oslo', 'kombu'],
deprecated_for_removal=True,
deprecated_reason='Kombu driver is deprecated and will be removed '
'in the F release cycle',
help=_('Specifies RPC implementation for RPC client and server. '
'Support of kombu driver is experimental.')
)
oslo_rpc_executor = cfg.StrOpt(
'oslo_rpc_executor',
default='threading',
@@ -823,7 +812,6 @@ CONF.register_opt(wf_trace_log_name_opt)
CONF.register_opt(auth_type_opt)
CONF.register_opt(scheduler_type_opt)
CONF.register_opt(js_impl_opt)
CONF.register_opt(rpc_impl_opt)
CONF.register_opt(oslo_rpc_executor)
CONF.register_opt(expiration_token_duration)
CONF.register_opts(service_opts.service_opts)
@@ -868,7 +856,6 @@ default_group_opts = CLI_OPTS + [
auth_type_opt,
scheduler_type_opt,
js_impl_opt,
rpc_impl_opt,
oslo_rpc_executor,
expiration_token_duration
]

View File

@@ -42,9 +42,6 @@ def cleanup():
_TRANSPORT = None
# TODO(rakhmerov): This method seems misplaced. Now we have different kind
# of transports (oslo, kombu) and this module should not have any oslo
# specific things anymore.
def get_transport():
global _TRANSPORT
@@ -54,27 +51,25 @@ def get_transport():
return _TRANSPORT
# TODO(amorin) maybe refactor this since we have only one impl now
def get_rpc_server_driver():
rpc_impl = cfg.CONF.rpc_implementation
global _IMPL_SERVER
if not _IMPL_SERVER:
_IMPL_SERVER = driver.DriverManager(
'mistral.rpc.backends',
'%s_server' % rpc_impl
'oslo_server'
).driver
return _IMPL_SERVER
# TODO(amorin) maybe refactor this since we have only one impl now
def get_rpc_client_driver():
rpc_impl = cfg.CONF.rpc_implementation
global _IMPL_CLIENT
if not _IMPL_CLIENT:
_IMPL_CLIENT = driver.DriverManager(
'mistral.rpc.backends',
'%s_client' % rpc_impl
'oslo_client'
).driver
return _IMPL_CLIENT

View File

@@ -1,149 +0,0 @@
# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import kombu
from mistral_lib import serialization as mistral_serialization
import oslo_messaging as messaging
from mistral import config as cfg
from mistral import exceptions as exc
IS_RECEIVED = 'kombu_rpc_is_received'
RESULT = 'kombu_rpc_result'
CORR_ID = 'kombu_rpc_correlation_id'
TYPE = 'kombu_rpc_type'
CONF = cfg.CONF
def set_transport_options(check_backend=True):
# We can be sure that all needed transport options are registered
# only if we at least once called method get_transport(). Because
# this is the method that registers them.
messaging.get_transport(CONF)
backend = messaging.TransportURL.parse(CONF, CONF.transport_url).transport
if check_backend and backend not in ['rabbit', 'kombu']:
raise exc.MistralException("Unsupported backend: %s" % backend)
class Base(object):
"""Base class for Client and Server."""
def __init__(self):
self.serializer = None
@staticmethod
def _make_connection(amqp_host, amqp_port, amqp_user, amqp_password,
amqp_vhost):
"""Create connection.
This method creates object representing the connection to RabbitMQ.
:param amqp_host: Address of RabbitMQ server.
:param amqp_user: Username for connecting to RabbitMQ.
:param amqp_password: Password matching the given username.
:param amqp_vhost: Virtual host to connect to.
:param amqp_port: Port of RabbitMQ server.
:return: New connection to RabbitMQ.
"""
return kombu.BrokerConnection(
hostname=amqp_host,
userid=amqp_user,
password=amqp_password,
virtual_host=amqp_vhost,
port=amqp_port,
transport_options={'confirm_publish': True}
)
@staticmethod
def _make_exchange(name, durable=False, auto_delete=True,
exchange_type='topic'):
"""Make named exchange.
This method creates object representing exchange on RabbitMQ. It would
create a new exchange if exchange with given name don't exists.
:param name: Name of the exchange.
:param durable: If set to True, messages on this exchange would be
store on disk - therefore can be retrieve after
failure.
:param auto_delete: If set to True, exchange would be automatically
deleted when none is connected.
:param exchange_type: Type of the exchange. Can be one of 'direct',
'topic', 'fanout', 'headers'. See Kombu docs for
further details.
:return: Kombu exchange object.
"""
return kombu.Exchange(
name=name,
type=exchange_type,
durable=durable,
auto_delete=auto_delete
)
@staticmethod
def _make_queue(name, exchange, routing_key='',
durable=False, auto_delete=True, **kwargs):
"""Make named queue for a given exchange.
This method creates object representing queue in RabbitMQ. It would
create a new queue if queue with given name don't exists.
:param name: Name of the queue
:param exchange: Kombu Exchange object (can be created using
_make_exchange).
:param routing_key: Routing key for queue. It behaves differently
depending on exchange type. See Kombu docs for
further details.
:param durable: If set to True, messages on this queue would be
store on disk - therefore can be retrieve after
failure.
:param auto_delete: If set to True, queue would be automatically
deleted when none is connected.
:param kwargs: See kombu documentation for all parameters than may be
may be passed to Queue.
:return: Kombu Queue object.
"""
return kombu.Queue(
name=name,
routing_key=routing_key,
exchange=exchange,
durable=durable,
auto_delete=auto_delete,
**kwargs
)
def _register_mistral_serialization(self):
"""Adds mistral serializer to available serializers in kombu."""
self.serializer = mistral_serialization.get_polymorphic_serializer()
def _serialize_message(self, kwargs):
result = {}
for argname, arg in kwargs.items():
result[argname] = self.serializer.serialize(arg)
return result
def _deserialize_message(self, kwargs):
result = {}
for argname, arg in kwargs.items():
result[argname] = self.serializer.deserialize(arg)
return result

View File

@@ -1,44 +0,0 @@
# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from mistral.rpc.kombu import kombu_client
# Example of using Kombu based RPC client.
def main():
conf = {
'user_id': 'guest',
'password': 'secret',
'exchange': 'my_exchange',
'topic': 'my_topic',
'server_id': 'host',
'host': 'localhost',
'port': 5672,
'virtual_host': '/'
}
kombu_rpc = kombu_client.KombuRPCClient(conf)
print(" [x] Requesting ...")
ctx = type('context', (object,), {'to_dict': lambda self: {}})()
response = kombu_rpc.sync_call(ctx, 'fib', n=44)
print(" [.] Got %r" % (response,))
if __name__ == '__main__':
sys.exit(main())

View File

@@ -1,53 +0,0 @@
# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from mistral.rpc.kombu import kombu_server
# Simple example of endpoint of RPC server, which just
# calculates given fibonacci number.
class MyServer(object):
cache = {0: 0, 1: 1}
def fib(self, rpc_ctx, n):
if self.cache.get(n) is None:
self.cache[n] = (self.fib(rpc_ctx, n - 1)
+ self.fib(rpc_ctx, n - 2))
return self.cache[n]
def get_name(self, rpc_ctx):
return self.__class__.__name__
# Example of using Kombu based RPC server.
def main():
conf = {
'user_id': 'guest',
'password': 'secret',
'exchange': 'my_exchange',
'topic': 'my_topic',
'server_id': 'host',
'host': 'localhost',
'port': 5672,
'virtual_host': '/'
}
rpc_server = kombu_server.KombuRPCServer(conf)
rpc_server.register_endpoint(MyServer())
rpc_server.run()
if __name__ == '__main__':
sys.exit(main())

View File

@@ -1,209 +0,0 @@
# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
import itertools
import errno
import queue
import kombu
from oslo_log import log as logging
from mistral import config as cfg
from mistral import exceptions as exc
from mistral.rpc import base as rpc_base
from mistral.rpc.kombu import base as kombu_base
from mistral.rpc.kombu import kombu_hosts
from mistral.rpc.kombu import kombu_listener
from mistral_lib import utils
#: When connection to the RabbitMQ server breaks, the
#: client will receive EPIPE socket errors. These indicate
#: an error that may be fixed by retrying. This constant
#: is a guess for how many times the retry may be reasonable
EPIPE_RETRIES = 4
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
def __init__(self, conf):
super(KombuRPCClient, self).__init__(conf)
kombu_base.set_transport_options()
self._register_mistral_serialization()
self.topic = conf.topic
self.server_id = conf.host
hosts = kombu_hosts.KombuHosts(CONF)
self.exchange = CONF.control_exchange
self.durable_queue = CONF.oslo_messaging_rabbit.amqp_durable_queues
self.auto_delete = CONF.oslo_messaging_rabbit.amqp_auto_delete
# NOTE(amorin) let's use a hardcoded 60s value until we deprecate the
# kombu RPC driver
self._timeout = 60
self.routing_key = self.topic
connections = []
for host in hosts.hosts:
conn = self._make_connection(
host.hostname,
host.port,
host.username,
host.password,
hosts.virtual_host
)
connections.append(conn)
self._connections = itertools.cycle(connections)
# Create exchange.
exchange = self._make_exchange(
self.exchange,
durable=self.durable_queue,
auto_delete=self.auto_delete
)
# Create queue.
self.queue_name = utils.generate_unicode_uuid()
self.callback_queue = kombu.Queue(
self.queue_name,
exchange=exchange,
routing_key=self.queue_name,
durable=False,
exclusive=True,
auto_delete=True
)
self._listener = kombu_listener.KombuRPCListener(
connections=self._connections,
callback_queue=self.callback_queue
)
self._listener.start()
def _wait_for_result(self, correlation_id):
"""Waits for the result from the server.
Waits for the result from the server, checks every second if
a timeout occurred. If a timeout occurred - the `RpcTimeout` exception
will be raised.
"""
try:
return self._listener.get_result(correlation_id, self._timeout)
except queue.Empty:
raise exc.MistralException(
"RPC Request timeout, correlation_id = %s" % correlation_id
)
def _call(self, ctx, method, target, async_=False, **kwargs):
"""Performs a remote call for the given method.
:param ctx: authentication context associated with mistral
:param method: name of the method that should be executed
:param kwargs: keyword parameters for the remote-method
:param target: Server name
:param async: bool value means whether the request is
asynchronous or not.
:return: result of the method or None if async.
"""
correlation_id = utils.generate_unicode_uuid()
body = {
'rpc_ctx': ctx.to_dict(),
'rpc_method': method,
'arguments': self._serialize_message(kwargs),
'async': async_
}
LOG.debug("Publish request: %s", body)
try:
if not async_:
self._listener.add_listener(correlation_id)
# Publish request.
for retry_round in range(EPIPE_RETRIES):
if self._publish_request(body, correlation_id):
break
# Start waiting for response.
if async_:
return
LOG.debug(
"Waiting a reply for sync call [reply_to = %s]",
self.queue_name
)
result = self._wait_for_result(correlation_id)
res_type = result[kombu_base.TYPE]
res_object = result[kombu_base.RESULT]
if res_type == 'error':
raise res_object
else:
res_object = self._deserialize_message(res_object)['body']
finally:
if not async_:
self._listener.remove_listener(correlation_id)
return res_object
def _publish_request(self, body, correlation_id):
"""Publishes the request message
.. note::
The :const:`errno.EPIPE` socket errors are suppressed
and result in False being returned. This is because
this type of error can usually be fixed by retrying.
:param body: message body
:param correlation_id: correlation id
:return: True if publish succeeded, False otherwise
:rtype: bool
"""
try:
conn = self._listener.wait_ready()
if conn:
with kombu.producers[conn].acquire(block=True) as producer:
producer.publish(
body=body,
exchange=self.exchange,
routing_key=self.topic,
reply_to=self.queue_name,
correlation_id=correlation_id,
delivery_mode=2
)
return True
except socket.error as e:
if e.errno != errno.EPIPE:
raise
else:
LOG.debug('Retrying publish due to broker connection failure')
return False
def sync_call(self, ctx, method, target=None, **kwargs):
return self._call(ctx, method, async_=False, target=target, **kwargs)
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
return self._call(ctx, method, async_=True, target=target, **kwargs)

View File

@@ -1,35 +0,0 @@
# Copyright (c) 2017 Intel Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import random
import oslo_messaging as messaging
class KombuHosts(object):
def __init__(self, conf):
transport_url = messaging.TransportURL.parse(conf, conf.transport_url)
self.virtual_host = transport_url.virtual_host
self.hosts = transport_url.hosts
if len(self.hosts) > 1:
random.shuffle(self.hosts)
self._hosts_cycle = itertools.cycle(self.hosts)
def get_host(self):
return next(self._hosts_cycle)

View File

@@ -1,127 +0,0 @@
# Copyright (c) 2016 Intel Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from kombu.mixins import ConsumerMixin
import queue
import threading
from oslo_log import log as logging
from mistral.rpc.kombu import base as kombu_base
LOG = logging.getLogger(__name__)
class KombuRPCListener(ConsumerMixin):
def __init__(self, connections, callback_queue):
self._results = {}
self._connections = itertools.cycle(connections)
self._callback_queue = callback_queue
self._thread = None
self.connection = next(self._connections)
self.ready = threading.Event()
def add_listener(self, correlation_id):
self._results[correlation_id] = queue.Queue()
def remove_listener(self, correlation_id):
if correlation_id in self._results:
del self._results[correlation_id]
def get_consumers(self, Consumer, channel):
consumers = [Consumer(
self._callback_queue,
callbacks=[self.on_message],
accept=['pickle', 'json']
)]
self.ready.set()
return consumers
def start(self):
if self._thread is None:
self._thread = threading.Thread(target=self.run)
self._thread.daemon = True
self._thread.start()
def on_message(self, response, message):
"""Callback on response.
This method is automatically called when a response is incoming and
decides if it is the message we are waiting for - the message with the
result.
:param response: the body of the amqp message already deserialized
by kombu
:param message: the plain amqp kombu.message with additional
information
"""
LOG.debug("Got response: {}", response)
try:
message.ack()
except Exception as e:
LOG.exception("Failed to acknowledge AMQP message: %s", e)
else:
LOG.debug("AMQP message acknowledged.")
correlation_id = message.properties['correlation_id']
queue = self._results.get(correlation_id)
if queue:
result = {
kombu_base.TYPE: 'error'
if message.properties.get('type') == 'error'
else None,
kombu_base.RESULT: response
}
queue.put(result)
else:
LOG.debug(
"Got a response, but seems like no process is waiting for "
"it [correlation_id={}]", correlation_id
)
def get_result(self, correlation_id, timeout):
return self._results[correlation_id].get(block=True, timeout=timeout)
def on_connection_error(self, exc, interval):
self.ready.clear()
self.connection = next(self._connections)
LOG.debug("Broker connection failed: %s", exc)
LOG.debug(
"Sleeping for %s seconds, then retrying connection",
interval
)
def wait_ready(self, timeout=10.0):
"""Waits for the listener to successfully declare the consumer
:param timeout: timeout for waiting in seconds
:return: same as :func:`~threading.Event.wait`
:rtype: bool
"""
if self.ready.wait(timeout=timeout):
return self.connection
else:
return False

View File

@@ -1,281 +0,0 @@
# Copyright 2015 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import amqp
import socket
import threading
import time
import kombu
from oslo_config import cfg
from oslo_log import log as logging
from stevedore import driver
from mistral import context as auth_ctx
from mistral import exceptions as exc
from mistral.rpc import base as rpc_base
from mistral.rpc.kombu import base as kombu_base
from mistral.rpc.kombu import kombu_hosts
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
def __init__(self, conf):
super(KombuRPCServer, self).__init__(conf)
kombu_base.set_transport_options()
self._register_mistral_serialization()
self.topic = conf.topic
self.server_id = conf.host
self._hosts = kombu_hosts.KombuHosts(CONF)
# NOTE(amorin) let's use a hardcoded value until we deprecate the
# kombu RPC driver
self._executor_threads = 64
self.exchange = CONF.control_exchange
# TODO(rakhmerov): We shouldn't rely on any properties related
# to oslo.messaging. Only "transport_url" should matter.
self.durable_queue = CONF.oslo_messaging_rabbit.amqp_durable_queues
self.auto_delete = CONF.oslo_messaging_rabbit.amqp_auto_delete
self.routing_key = self.topic
self.channel = None
self.conn = None
self._running = threading.Event()
self._stopped = threading.Event()
self.endpoints = []
self._worker = None
self._thread = None
# TODO(ddeja): Those 2 options should be gathered from config.
self._sleep_time = 1
self._max_sleep_time = 10
@property
def is_running(self):
"""Return whether server is running."""
return self._running.is_set()
def run(self, executor='threading'):
if self._thread is None:
self._thread = threading.Thread(target=self._run, args=(executor,))
self._thread.daemon = True
self._thread.start()
def _run(self, executor):
"""Start the server."""
self._prepare_worker(executor)
while True:
try:
_retry_connection = False
host = self._hosts.get_host()
self.conn = self._make_connection(
host.hostname,
host.port,
host.username,
host.password,
self._hosts.virtual_host,
)
conn = kombu.connections[self.conn].acquire(block=True)
exchange = self._make_exchange(
self.exchange,
durable=self.durable_queue,
auto_delete=self.auto_delete
)
queue = self._make_queue(
self.topic,
exchange,
routing_key=self.routing_key,
durable=self.durable_queue,
auto_delete=self.auto_delete
)
with conn.Consumer(
queues=queue,
callbacks=[self._process_message],
) as consumer:
consumer.qos(prefetch_count=1)
self._running.set()
self._stopped.clear()
LOG.info(
"Connected to AMQP at %s:%s",
host.hostname,
host.port
)
self._sleep_time = 1
while self.is_running:
try:
conn.drain_events(timeout=1)
except socket.timeout:
pass
except KeyboardInterrupt:
self.stop()
LOG.info(
"Server with id='%s' stopped.",
self.server_id
)
return
except (socket.error, amqp.exceptions.ConnectionForced) as e:
LOG.debug("Broker connection failed: %s", e)
_retry_connection = True
finally:
self._stopped.set()
if _retry_connection:
LOG.debug(
"Sleeping for %s seconds, then retrying "
"connection",
self._sleep_time
)
time.sleep(self._sleep_time)
self._sleep_time = min(
self._sleep_time * 2,
self._max_sleep_time
)
def stop(self, graceful=False):
self._running.clear()
if graceful:
self.wait()
def wait(self):
self._stopped.wait()
try:
self._worker.shutdown(wait=True)
except AttributeError as e:
LOG.warning("Cannot stop worker in graceful way: %s", e)
def _get_rpc_method(self, method_name):
for endpoint in self.endpoints:
if hasattr(endpoint, method_name):
return getattr(endpoint, method_name)
return None
@staticmethod
def _set_auth_ctx(ctx):
if not isinstance(ctx, dict):
return
context = auth_ctx.MistralContext.from_dict(ctx)
auth_ctx.set_ctx(context)
return context
def publish_message(self, body, reply_to, corr_id, res_type='response'):
if res_type != 'error':
body = self._serialize_message({'body': body})
with kombu.producers[self.conn].acquire(block=True) as producer:
producer.publish(
body=body,
exchange=self.exchange,
routing_key=reply_to,
correlation_id=corr_id,
serializer='pickle' if res_type == 'error' else 'json',
type=res_type
)
def _on_message_safe(self, request, message):
try:
return self._on_message(request, message)
except Exception as e:
LOG.warning(
"Got exception while consuming message. Exception would be "
"send back to the caller."
)
LOG.debug("Exceptions: %s", str(e))
# Wrap exception into another exception for compatibility
# with oslo.
self.publish_message(
exc.KombuException(e),
message.properties['reply_to'],
message.properties['correlation_id'],
res_type='error'
)
finally:
message.ack()
def _on_message(self, request, message):
LOG.debug('Received message %s', request)
is_async = request.get('async', False)
rpc_ctx = request.get('rpc_ctx')
redelivered = message.delivery_info.get('redelivered')
rpc_method_name = request.get('rpc_method')
arguments = self._deserialize_message(request.get('arguments'))
correlation_id = message.properties['correlation_id']
reply_to = message.properties['reply_to']
if redelivered is not None:
rpc_ctx['redelivered'] = redelivered
rpc_context = self._set_auth_ctx(rpc_ctx)
rpc_method = self._get_rpc_method(rpc_method_name)
if not rpc_method:
raise exc.MistralException("No such method: %s" % rpc_method_name)
response = rpc_method(rpc_ctx=rpc_context, **arguments)
if not is_async:
LOG.debug(
"RPC server sent a reply [reply_to = %s, correlation_id = %s",
reply_to,
correlation_id
)
self.publish_message(
response,
reply_to,
correlation_id
)
def register_endpoint(self, endpoint):
self.endpoints.append(endpoint)
def _process_message(self, request, message):
self._worker.submit(self._on_message_safe, request, message)
def _prepare_worker(self, executor='blocking'):
mgr = driver.DriverManager('kombu_driver.executors', executor)
executor_opts = {}
if executor != 'blocking':
executor_opts['max_workers'] = self._executor_threads
self._worker = mgr.driver(**executor_opts)

View File

@@ -1,28 +0,0 @@
# Copyright (c) 2016 Intel Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral import config as cfg
from mistral.rpc.kombu import base as kombu_base
from mistral.tests.unit import base
class KombuTestCase(base.BaseTest):
def setUp(self):
super(KombuTestCase, self).setUp()
kombu_base.set_transport_options(check_backend=False)
cfg.CONF.set_default('transport_url', 'rabbit://localhost:567')

View File

@@ -1,48 +0,0 @@
# Copyright (c) 2016 Intel Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kombu import mixins as mx
from unittest import mock
# Hack for making tests works with kombu listener
mixins = mx
producer = mock.MagicMock()
producers = mock.MagicMock()
producers.__getitem__ = lambda *args, **kwargs: producer
connection = mock.MagicMock()
connections = mock.MagicMock()
connections.__getitem__ = lambda *args, **kwargs: connection
serialization = mock.MagicMock()
def BrokerConnection(*args, **kwargs):
return mock.MagicMock()
def Exchange(*args, **kwargs):
return mock.MagicMock()
def Queue(*args, **kwargs):
return mock.MagicMock()
def Consumer(*args, **kwargs):
return mock.MagicMock()

View File

@@ -1,100 +0,0 @@
# Copyright (c) 2016 Intel Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral import exceptions as exc
from mistral.tests.unit.rpc.kombu import base
from mistral.tests.unit.rpc.kombu import fake_kombu
from unittest import mock
import queue
with mock.patch.dict('sys.modules', kombu=fake_kombu):
from mistral.rpc.kombu import base as kombu_base
from mistral.rpc.kombu import kombu_client
class TestException(exc.MistralException):
pass
class KombuClientTest(base.KombuTestCase):
_RESPONSE = "response"
def setUp(self):
super(KombuClientTest, self).setUp()
conf = mock.MagicMock()
listener_class = kombu_client.kombu_listener.KombuRPCListener
kombu_client.kombu_listener.KombuRPCListener = mock.MagicMock()
def restore_listener():
kombu_client.kombu_listener.KombuRPCListener = listener_class
self.addCleanup(restore_listener)
self.client = kombu_client.KombuRPCClient(conf)
self.ctx = type(
'context',
(object,),
{'to_dict': lambda self: {}}
)()
def test_sync_call_result_get(self):
self.client._listener.get_result = mock.MagicMock(
return_value={
kombu_base.TYPE: None,
kombu_base.RESULT: self.client._serialize_message({
'body': self._RESPONSE
})
}
)
response = self.client.sync_call(self.ctx, 'method')
self.assertEqual(response, self._RESPONSE)
def test_sync_call_result_not_get(self):
self.client._listener.get_result = mock.MagicMock(
side_effect=queue.Empty
)
self.assertRaises(
exc.MistralException,
self.client.sync_call,
self.ctx,
'method_not_found'
)
def test_sync_call_result_type_error(self):
def side_effect(*args, **kwargs):
return {
kombu_base.TYPE: 'error',
kombu_base.RESULT: TestException()
}
self.client._wait_for_result = mock.MagicMock(side_effect=side_effect)
self.assertRaises(
TestException,
self.client.sync_call,
self.ctx,
'method'
)
def test_async_call(self):
self.assertIsNone(self.client.async_call(self.ctx, 'method'))

View File

@@ -1,101 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.rpc.kombu import kombu_hosts
from mistral.tests.unit import base
from oslo_config import cfg
import functools
import oslo_messaging
HOST_1 = 'rabbitmq_1'
PORT_1 = 5671
HOST_2 = 'rabbitmq_2'
PORT_2 = 5672
USER_1 = 'u_mistral_1'
PASSWORD_1 = 'p_mistral_1'
USER_2 = 'u_mistral_2'
PASSWORD_2 = 'p_mistral_2'
VIRTUAL_HOST_1 = 'vhost_1'
VIRTUAL_HOST_2 = 'vhost_2'
class KombuHostsTest(base.BaseTest):
def setUp(self):
super(KombuHostsTest, self).setUp()
# Oslo messaging set a default config option
oslo_messaging.get_transport(cfg.CONF)
def assert_transports_host(self, expected, result):
sorted_by_host = functools.partial(sorted, key=lambda x: x.hostname)
self.assertListEqual(sorted_by_host(expected), sorted_by_host(result))
def test_transport_url(self):
self.override_config(
'transport_url',
'rabbit://{user}:{password}@{host}:{port}/{virtual_host}'.format(
user=USER_1, port=PORT_1, host=HOST_1,
password=PASSWORD_1,
virtual_host=VIRTUAL_HOST_1
))
hosts = kombu_hosts.KombuHosts(cfg.CONF)
self.assertEqual(VIRTUAL_HOST_1, hosts.virtual_host)
self.assert_transports_host([oslo_messaging.TransportHost(
hostname=HOST_1,
port=PORT_1,
username=USER_1,
password=PASSWORD_1,
)], hosts.hosts)
def test_transport_url_multiple_hosts(self):
self.override_config(
'transport_url',
'rabbit://{user_1}:{password_1}@{host_1}:{port_1},'
'{user_2}:{password_2}@{host_2}:{port_2}/{virtual_host}'.format(
user_1=USER_1,
password_1=PASSWORD_1,
port_1=PORT_1,
host_1=HOST_1,
user_2=USER_2,
password_2=PASSWORD_2,
host_2=HOST_2,
port_2=PORT_2,
virtual_host=VIRTUAL_HOST_1
))
hosts = kombu_hosts.KombuHosts(cfg.CONF)
self.assertEqual(VIRTUAL_HOST_1, hosts.virtual_host)
self.assert_transports_host(
[
oslo_messaging.TransportHost(
hostname=HOST_1,
port=PORT_1,
username=USER_1,
password=PASSWORD_1
),
oslo_messaging.TransportHost(
hostname=HOST_2,
port=PORT_2,
username=USER_2,
password=PASSWORD_2
)
],
hosts.hosts
)

View File

@@ -1,219 +0,0 @@
# Copyright (c) 2017 Intel Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral import exceptions as exc
from mistral.tests.unit.rpc.kombu import base
from mistral.tests.unit.rpc.kombu import fake_kombu
from mistral_lib import utils
from unittest import mock
import queue
with mock.patch.dict('sys.modules', kombu=fake_kombu):
from mistral.rpc.kombu import base as kombu_base
from mistral.rpc.kombu import kombu_listener
class TestException(exc.MistralException):
pass
class KombuListenerTest(base.KombuTestCase):
def setUp(self):
super(KombuListenerTest, self).setUp()
self.listener = kombu_listener.KombuRPCListener(
[mock.MagicMock()],
mock.MagicMock()
)
self.ctx = type('context', (object,), {'to_dict': lambda self: {}})()
def test_add_listener(self):
correlation_id = utils.generate_unicode_uuid()
self.listener.add_listener(correlation_id)
self.assertEqual(
type(self.listener._results.get(correlation_id)),
queue.Queue
)
self.assertEqual(0, self.listener._results[correlation_id].qsize())
def test_remove_listener_correlation_id_in_results(self):
correlation_id = utils.generate_unicode_uuid()
self.listener.add_listener(correlation_id)
self.assertEqual(
type(self.listener._results.get(correlation_id)),
queue.Queue
)
self.listener.remove_listener(correlation_id)
self.assertIsNone(
self.listener._results.get(correlation_id)
)
def test_remove_listener_correlation_id_not_in_results(self):
correlation_id = utils.generate_unicode_uuid()
self.listener.add_listener(correlation_id)
self.assertEqual(
type(self.listener._results.get(correlation_id)),
queue.Queue
)
self.listener.remove_listener(utils.generate_unicode_uuid())
self.assertEqual(
type(self.listener._results.get(correlation_id)),
queue.Queue
)
@mock.patch('threading.Thread')
def test_start_thread_not_set(self, thread_class_mock):
thread_mock = mock.MagicMock()
thread_class_mock.return_value = thread_mock
self.listener.start()
self.assertTrue(thread_mock.daemon)
self.assertEqual(thread_mock.start.call_count, 1)
@mock.patch('threading.Thread')
def test_start_thread_set(self, thread_class_mock):
thread_mock = mock.MagicMock()
thread_class_mock.return_value = thread_mock
self.listener._thread = mock.MagicMock()
self.listener.start()
self.assertEqual(thread_mock.start.call_count, 0)
def test_get_result_results_in_queue(self):
expected_result = 'abcd'
correlation_id = utils.generate_unicode_uuid()
self.listener.add_listener(correlation_id)
self.listener._results.get(correlation_id).put(expected_result)
result = self.listener.get_result(correlation_id, 5)
self.assertEqual(result, expected_result)
def test_get_result_not_in_queue(self):
correlation_id = utils.generate_unicode_uuid()
self.listener.add_listener(correlation_id)
self.assertRaises(
queue.Empty,
self.listener.get_result,
correlation_id,
1 # timeout
)
def test_get_result_lack_of_queue(self):
correlation_id = utils.generate_unicode_uuid()
self.assertRaises(
KeyError,
self.listener.get_result,
correlation_id,
1 # timeout
)
def test__on_response_message_ack_fail(self):
message = mock.MagicMock()
message.ack.side_effect = Exception('Test Exception')
response = 'response'
kombu_listener.LOG = mock.MagicMock()
self.listener.on_message(response, message)
self.assertEqual(kombu_listener.LOG.debug.call_count, 1)
self.assertEqual(kombu_listener.LOG.exception.call_count, 1)
def test__on_response_message_ack_ok_corr_id_not_match(self):
message = mock.MagicMock()
message.properties = mock.MagicMock()
message.properties.__getitem__ = lambda *args, **kwargs: True
response = 'response'
kombu_listener.LOG = mock.MagicMock()
self.listener.on_message(response, message)
self.assertEqual(kombu_listener.LOG.debug.call_count, 3)
self.assertEqual(kombu_listener.LOG.exception.call_count, 0)
def test__on_response_message_ack_ok_messsage_type_error(self):
correlation_id = utils.generate_unicode_uuid()
message = mock.MagicMock()
message.properties = dict()
message.properties['type'] = 'error'
message.properties['correlation_id'] = correlation_id
response = TestException('response')
kombu_listener.LOG = mock.MagicMock()
self.listener.add_listener(correlation_id)
self.listener.on_message(response, message)
self.assertEqual(kombu_listener.LOG.debug.call_count, 2)
self.assertEqual(kombu_listener.LOG.exception.call_count, 0)
result = self.listener.get_result(correlation_id, 5)
self.assertDictEqual(
result,
{
kombu_base.TYPE: 'error',
kombu_base.RESULT: response
}
)
def test__on_response_message_ack_ok(self):
correlation_id = utils.generate_unicode_uuid()
message = mock.MagicMock()
message.properties = dict()
message.properties['type'] = None
message.properties['correlation_id'] = correlation_id
response = 'response'
kombu_listener.LOG = mock.MagicMock()
self.listener.add_listener(correlation_id)
self.listener.on_message(response, message)
self.assertEqual(kombu_listener.LOG.debug.call_count, 2)
self.assertEqual(kombu_listener.LOG.exception.call_count, 0)
result = self.listener.get_result(correlation_id, 5)
self.assertDictEqual(
result,
{
kombu_base.TYPE: None,
kombu_base.RESULT: response
}
)

View File

@@ -1,311 +0,0 @@
# Copyright (c) 2016 Intel Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import futurist
from mistral import context
from mistral import exceptions as exc
from mistral.tests.unit.rpc.kombu import base
from mistral.tests.unit.rpc.kombu import fake_kombu
from unittest import mock
import socket
import time
from stevedore import driver
with mock.patch.dict('sys.modules', kombu=fake_kombu):
from mistral.rpc.kombu import kombu_server
class TestException(exc.MistralError):
pass
class KombuServerTest(base.KombuTestCase):
def setUp(self):
super(KombuServerTest, self).setUp()
self.conf = mock.MagicMock()
self.conf.host = 'fakehost'
self.server = kombu_server.KombuRPCServer(self.conf)
self.ctx = type('context', (object,), {'to_dict': lambda self: {}})()
def test_is_running_is_running(self):
self.server._running.set()
self.assertTrue(self.server.is_running)
def test_is_running_is_not_running(self):
self.server._running.clear()
self.assertFalse(self.server.is_running)
def test_stop(self):
self.server.stop()
self.assertFalse(self.server.is_running)
def test_publish_message(self):
body = 'body'
reply_to = 'reply_to'
corr_id = 'corr_id'
type = 'type'
acquire_mock = mock.MagicMock()
fake_kombu.producer.acquire.return_value = acquire_mock
enter_mock = mock.MagicMock()
acquire_mock.__enter__.return_value = enter_mock
self.server.publish_message(body, reply_to, corr_id, type)
enter_mock.publish.assert_called_once_with(
body={'body': '"body"'},
exchange='openstack',
routing_key=reply_to,
correlation_id=corr_id,
type=type,
serializer='json'
)
def test_run_launch_successfully(self):
acquire_mock = mock.MagicMock()
acquire_mock.drain_events.side_effect = TestException()
fake_kombu.connection.acquire.return_value = acquire_mock
self.assertRaises(TestException, self.server._run, 'blocking')
self.assertTrue(self.server.is_running)
def test_run_launch_successfully_than_stop(self):
def side_effect(*args, **kwargs):
self.assertTrue(self.server.is_running)
raise KeyboardInterrupt
acquire_mock = mock.MagicMock()
acquire_mock.drain_events.side_effect = side_effect
fake_kombu.connection.acquire.return_value = acquire_mock
self.server._run('blocking')
self.assertFalse(self.server.is_running)
self.assertEqual(self.server._sleep_time, 1)
def test_run_socket_error_reconnect(self):
def side_effect(*args, **kwargs):
if acquire_mock.drain_events.call_count == 1:
raise socket.error()
raise TestException()
acquire_mock = mock.MagicMock()
acquire_mock.drain_events.side_effect = side_effect
fake_kombu.connection.acquire.return_value = acquire_mock
self.assertRaises(TestException, self.server._run, 'blocking')
self.assertEqual(self.server._sleep_time, 1)
def test_run_socket_timeout_still_running(self):
def side_effect(*args, **kwargs):
if acquire_mock.drain_events.call_count == 0:
raise socket.timeout()
raise TestException()
acquire_mock = mock.MagicMock()
acquire_mock.drain_events.side_effect = side_effect
fake_kombu.connection.acquire.return_value = acquire_mock
self.assertRaises(
TestException,
self.server._run,
'blocking'
)
self.assertTrue(self.server.is_running)
def test_run_keyboard_interrupt_not_running(self):
acquire_mock = mock.MagicMock()
acquire_mock.drain_events.side_effect = KeyboardInterrupt()
fake_kombu.connection.acquire.return_value = acquire_mock
self.assertIsNone(self.server.run())
# Wait 1 sec so the thread start listening on RPC and receive the
# side_effect
time.sleep(1)
self.assertFalse(self.server.is_running)
@mock.patch.object(
kombu_server.KombuRPCServer,
'_on_message',
mock.MagicMock()
)
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
def test__on_message_safe_message_processing_ok(self, publish_message):
message = mock.MagicMock()
self.server._on_message_safe(None, message)
self.assertEqual(message.ack.call_count, 1)
self.assertEqual(publish_message.call_count, 0)
@mock.patch.object(kombu_server.KombuRPCServer, '_on_message')
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
def test__on_message_safe_message_processing_raise(
self,
publish_message,
_on_message
):
reply_to = 'reply_to'
correlation_id = 'corr_id'
message = mock.MagicMock()
message.properties = {
'reply_to': reply_to,
'correlation_id': correlation_id
}
test_exception = TestException()
_on_message.side_effect = test_exception
self.server._on_message_safe(None, message)
self.assertEqual(message.ack.call_count, 1)
self.assertEqual(publish_message.call_count, 1)
@mock.patch.object(
kombu_server.KombuRPCServer,
'_get_rpc_method',
mock.MagicMock(return_value=None)
)
def test__on_message_rpc_method_not_found(self):
request = {
'rpc_ctx': {},
'rpc_method': 'not_found_method',
'arguments': {}
}
message = mock.MagicMock()
message.properties = {
'reply_to': None,
'correlation_id': None
}
self.assertRaises(
exc.MistralException,
self.server._on_message,
request,
message
)
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
@mock.patch.object(kombu_server.KombuRPCServer, '_get_rpc_method')
@mock.patch('mistral.context.MistralContext.from_dict')
def test__on_message_is_async(self, mock_get_context, get_rpc_method,
publish_message):
result = 'result'
request = {
'async': True,
'rpc_ctx': {},
'rpc_method': 'found_method',
'arguments': self.server._serialize_message({
'a': 1,
'b': 2
})
}
message = mock.MagicMock()
message.properties = {
'reply_to': None,
'correlation_id': None
}
message.delivery_info.get.return_value = False
rpc_method = mock.MagicMock(return_value=result)
get_rpc_method.return_value = rpc_method
ctx = context.MistralContext()
mock_get_context.return_value = ctx
self.server._on_message(request, message)
rpc_method.assert_called_once_with(
rpc_ctx=ctx,
a=1,
b=2
)
self.assertEqual(publish_message.call_count, 0)
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
@mock.patch.object(kombu_server.KombuRPCServer, '_get_rpc_method')
@mock.patch('mistral.context.MistralContext.from_dict')
def test__on_message_is_sync(self, mock_get_context, get_rpc_method,
publish_message):
result = 'result'
request = {
'async': False,
'rpc_ctx': {},
'rpc_method': 'found_method',
'arguments': self.server._serialize_message({
'a': 1,
'b': 2
})
}
reply_to = 'reply_to'
correlation_id = 'corr_id'
message = mock.MagicMock()
message.properties = {
'reply_to': reply_to,
'correlation_id': correlation_id
}
message.delivery_info.get.return_value = False
rpc_method = mock.MagicMock(return_value=result)
get_rpc_method.return_value = rpc_method
ctx = context.MistralContext()
mock_get_context.return_value = ctx
self.server._on_message(request, message)
rpc_method.assert_called_once_with(
rpc_ctx=ctx,
a=1,
b=2
)
publish_message.assert_called_once_with(
result,
reply_to,
correlation_id
)
def test__prepare_worker(self):
self.server._prepare_worker('blocking')
self.assertEqual(
futurist.SynchronousExecutor,
type(self.server._worker)
)
self.server._prepare_worker('threading')
self.assertEqual(
futurist.ThreadPoolExecutor,
type(self.server._worker)
)
@mock.patch('stevedore.driver.DriverManager')
def test__prepare_worker_no_valid_executor(self, driver_manager_mock):
driver_manager_mock.side_effect = driver.NoMatches()
self.assertRaises(
driver.NoMatches,
self.server._prepare_worker,
'non_valid_executor'
)

View File

@@ -0,0 +1,5 @@
---
deprecations:
- |
Kombu RPC driver is now deleted from mistral RPC drivers.
Only oslo messaging driver is supported.

View File

@@ -10,7 +10,6 @@ eventlet>=0.27.0 # MIT
Jinja2>=2.10 # BSD License (3 clause)
jsonschema>=3.2.0 # MIT
keystonemiddleware>=4.18.0 # Apache-2.0
kombu!=4.0.2,>=4.6.1 # BSD
mistral-lib>=2.3.0 # Apache-2.0
networkx>=2.3 # BSD
oslo.concurrency>=3.26.0 # Apache-2.0

View File

@@ -36,9 +36,6 @@ wsgi_scripts =
mistral.rpc.backends =
oslo_client = mistral.rpc.oslo.oslo_client:OsloRPCClient
oslo_server = mistral.rpc.oslo.oslo_server:OsloRPCServer
# NOTE(amorin) Kombu driver is deprecated
kombu_client = mistral.rpc.kombu.kombu_client:KombuRPCClient
kombu_server = mistral.rpc.kombu.kombu_server:KombuRPCServer
oslo.config.opts =
mistral.config = mistral.config:list_opts
@@ -108,12 +105,6 @@ mistral.auth =
keystone = mistral.auth.keystone:KeystoneAuthHandler
keycloak-oidc = mistral.auth.keycloak:KeycloakAuthHandler
# NOTE(amorin) Kombu driver is deprecated
kombu_driver.executors =
blocking = futurist:SynchronousExecutor
threading = futurist:ThreadPoolExecutor
eventlet = futurist:GreenThreadPoolExecutor
pygments.lexers =
mistral = mistral.ext.pygmentplugin:MistralLexer