storage: stop using global conf

Change-Id: Iadfc21edd63b9e8c529691c1ccb150b23d3ee8d4
This commit is contained in:
Mehdi Abaakouk
2016-10-13 12:39:03 +02:00
parent f7508d73ff
commit 3219e8bbb2
18 changed files with 88 additions and 87 deletions

View File

@@ -25,8 +25,8 @@ class Connection(object):
'storage': {'production_ready': False},
}
def __init__(self, url):
pass
def __init__(self, conf, url):
self.conf = conf
@staticmethod
def upgrade():

View File

@@ -70,7 +70,8 @@ class Connection(base.Connection):
# it is only searchable after periodic refreshes.
_refresh_on_write = False
def __init__(self, url):
def __init__(self, conf, url):
super(Connection, self).__init__(conf, url)
url_split = netutils.urlsplit(url)
self.conn = es.Elasticsearch(url_split.netloc)

View File

@@ -67,9 +67,6 @@ class Connection(hbase_base.Connection, base.Connection):
EVENT_TABLE = "event"
def __init__(self, url):
super(Connection, self).__init__(url)
def upgrade(self):
tables = [self.EVENT_TABLE]
column_families = {'f': dict(max_versions=1)}

View File

@@ -12,7 +12,6 @@
# under the License.
"""MongoDB storage backend"""
from oslo_config import cfg
from oslo_log import log
import pymongo
@@ -29,13 +28,14 @@ class Connection(pymongo_base.Connection):
CONNECTION_POOL = pymongo_utils.ConnectionPool()
def __init__(self, url):
def __init__(self, conf, url):
super(Connection, self).__init__(conf, url)
# NOTE(jd) Use our own connection pooling on top of the Pymongo one.
# We need that otherwise we overflow the MongoDB instance with new
# connection since we instantiate a Pymongo client each time someone
# requires a new storage connection.
self.conn = self.CONNECTION_POOL.connect(url)
self.conn = self.CONNECTION_POOL.connect(conf, url)
# Require MongoDB 2.4 to use $setOnInsert
if self.conn.server_info()['versionArray'] < [2, 4]:
@@ -64,7 +64,7 @@ class Connection(pymongo_base.Connection):
('timestamp', pymongo.ASCENDING)],
name='event_type_idx'
)
ttl = cfg.CONF.database.event_time_to_live
ttl = self.conf.database.event_time_to_live
impl_mongodb.Connection.update_ttl(ttl, 'event_ttl', 'timestamp',
self.db.event)

View File

@@ -17,7 +17,6 @@ from __future__ import absolute_import
import datetime
import os
from oslo_config import cfg
from oslo_db import exception as dbexc
from oslo_db.sqlalchemy import session as db_session
from oslo_log import log
@@ -124,12 +123,13 @@ class Connection(base.Connection):
AVAILABLE_STORAGE_CAPABILITIES,
)
def __init__(self, url):
def __init__(self, conf, url):
super(Connection, self).__init__(conf, url)
# Set max_retries to 0, since oslo.db in certain cases may attempt
# to retry making the db connection retried max_retries ^ 2 times
# in failure case and db reconnection has already been implemented
# in storage.__init__.get_connection_from_config function
options = dict(cfg.CONF.database.items())
options = dict(self.conf.database.items())
options['max_retries'] = 0
# oslo.db doesn't support options defined by Ceilometer
for opt in storage.OPTS:

View File

@@ -85,12 +85,12 @@ def get_connection_from_config(conf, purpose='metering'):
namespace = 'ceilometer.%s.storage' % purpose
url = (getattr(conf.database, '%s_connection' % purpose) or
conf.database.connection)
return get_connection(url, namespace)
return get_connection(conf, url, namespace)
return _inner()
def get_connection(url, namespace):
def get_connection(conf, url, namespace):
"""Return an open connection to the database."""
connection_scheme = urlparse.urlparse(url).scheme
# SqlAlchemy connections may specify a 'dialect' or
@@ -100,7 +100,7 @@ def get_connection(url, namespace):
LOG.debug('looking for %(name)r driver in %(namespace)r',
{'name': engine_name, 'namespace': namespace})
mgr = driver.DriverManager(namespace, engine_name)
return mgr.driver(url)
return mgr.driver(conf, url)
class SampleFilter(object):

View File

@@ -133,8 +133,8 @@ class Connection(object):
'storage': {'production_ready': False},
}
def __init__(self, url):
pass
def __init__(self, conf, url):
self.conf = conf
@staticmethod
def upgrade():

View File

@@ -28,7 +28,8 @@ class Connection(object):
_memory_instance = None
def __init__(self, url):
def __init__(self, conf, url):
super(Connection, self).__init__(conf, url)
"""Hbase Connection Initialization."""
opts = self._parse_connection_url(url)

View File

@@ -116,9 +116,6 @@ class Connection(hbase_base.Connection, base.Connection):
RESOURCE_TABLE = "resource"
METER_TABLE = "meter"
def __init__(self, url):
super(Connection, self).__init__(url)
def upgrade(self):
tables = [self.RESOURCE_TABLE, self.METER_TABLE]
column_families = {'f': dict(max_versions=1)}

View File

@@ -25,7 +25,6 @@ import uuid
import bson.code
import bson.objectid
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
import pymongo
@@ -138,13 +137,14 @@ class Connection(pymongo_base.Connection):
_APOCALYPSE = datetime.datetime(year=datetime.MAXYEAR, month=12, day=31,
hour=23, minute=59, second=59)
def __init__(self, url):
def __init__(self, conf, url):
super(Connection, self).__init__(conf, url)
# NOTE(jd) Use our own connection pooling on top of the Pymongo one.
# We need that otherwise we overflow the MongoDB instance with new
# connection since we instantiate a Pymongo client each time someone
# requires a new storage connection.
self.conn = self.CONNECTION_POOL.connect(url)
self.conn = self.CONNECTION_POOL.connect(conf, url)
self.version = self.conn.server_info()['versionArray']
# Require MongoDB 2.4 to use $setOnInsert
if self.version < pymongo_utils.MINIMUM_COMPATIBLE_MONGODB_VERSION:
@@ -231,7 +231,7 @@ class Connection(pymongo_base.Connection):
name='last_sample_timestamp_idx')
# update or create time_to_live index
ttl = cfg.CONF.database.metering_time_to_live
ttl = self.conf.database.metering_time_to_live
self.update_ttl(ttl, 'meter_ttl', 'timestamp', self.db.meter)
self.update_ttl(ttl, 'resource_ttl', 'last_sample_timestamp',
self.db.resource)

View File

@@ -18,7 +18,6 @@ import datetime
import hashlib
import os
from oslo_config import cfg
from oslo_db import api
from oslo_db import exception as dbexc
from oslo_db.sqlalchemy import session as db_session
@@ -220,12 +219,13 @@ class Connection(base.Connection):
AVAILABLE_STORAGE_CAPABILITIES,
)
def __init__(self, url):
def __init__(self, conf, url):
super(Connection, self).__init__(conf, url)
# Set max_retries to 0, since oslo.db in certain cases may attempt
# to retry making the db connection retried max_retries ^ 2 times
# in failure case and db reconnection has already been implemented
# in storage.__init__.get_connection_from_config function
options = dict(cfg.CONF.database.items())
options = dict(self.conf.database.items())
options['max_retries'] = 0
# oslo.db doesn't support options defined by Ceilometer
for opt in storage.OPTS:
@@ -387,7 +387,7 @@ class Connection(base.Connection):
rows = sample_q.delete()
LOG.info(_LI("%d samples removed from database"), rows)
if not cfg.CONF.database.sql_expire_samples_only:
if not self.conf.database.sql_expire_samples_only:
with session.begin():
# remove Meter definitions with no matching samples
(session.query(models.Meter)
@@ -459,7 +459,7 @@ class Connection(base.Connection):
# NOTE: When sql_expire_samples_only is enabled, there will be some
# resources without any sample, in such case we should use inner
# join on sample table to avoid wrong result.
if cfg.CONF.database.sql_expire_samples_only or has_timestamp:
if self.conf.database.sql_expire_samples_only or has_timestamp:
res_q = session.query(distinct(models.Resource.resource_id)).join(
models.Sample,
models.Sample.resource_id == models.Resource.internal_id)

View File

@@ -19,7 +19,6 @@ import datetime
import time
import weakref
from oslo_config import cfg
from oslo_log import log
from oslo_utils import netutils
import pymongo
@@ -239,7 +238,7 @@ class ConnectionPool(object):
def __init__(self):
self._pool = {}
def connect(self, url):
def connect(self, conf, url):
connection_options = pymongo.uri_parser.parse_uri(url)
del connection_options['database']
del connection_options['username']
@@ -255,14 +254,14 @@ class ConnectionPool(object):
log_data = {'db': splitted_url.scheme,
'nodelist': connection_options['nodelist']}
LOG.info(_LI('Connecting to %(db)s on %(nodelist)s') % log_data)
client = self._mongo_connect(url)
client = self._mongo_connect(conf, url)
self._pool[pool_key] = weakref.ref(client)
return client
@staticmethod
def _mongo_connect(url):
def _mongo_connect(conf, url):
try:
return MongoProxy(pymongo.MongoClient(url))
return MongoProxy(conf, pymongo.MongoClient(url))
except pymongo.errors.ConnectionFailure as e:
LOG.warning(_('Unable to connect to the database server: '
'%(errmsg)s.') % {'errmsg': e})
@@ -394,16 +393,16 @@ class QueryTransformer(object):
def safe_mongo_call(call):
def closure(*args, **kwargs):
def closure(self, *args, **kwargs):
# NOTE(idegtiarov) options max_retries and retry_interval have been
# registered in storage.__init__ in oslo_db.options.set_defaults
# default values for both options are 10.
max_retries = cfg.CONF.database.max_retries
retry_interval = cfg.CONF.database.retry_interval
max_retries = self.conf.database.max_retries
retry_interval = self.conf.database.retry_interval
attempts = 0
while True:
try:
return call(*args, **kwargs)
return call(self, *args, **kwargs)
except pymongo.errors.AutoReconnect as err:
if 0 <= max_retries <= attempts:
LOG.error(_('Unable to reconnect to the primary mongodb '
@@ -420,7 +419,8 @@ def safe_mongo_call(call):
class MongoConn(object):
def __init__(self, method):
def __init__(self, conf, method):
self.conf = conf
self.method = method
@safe_mongo_call
@@ -436,21 +436,22 @@ MONGO_METHODS.update(set([typ for typ in dir(pymongo)
class MongoProxy(object):
def __init__(self, conn):
def __init__(self, conf, conn):
self.conn = conn
self.conf = conf
def __getitem__(self, item):
"""Create and return proxy around the method in the connection.
:param item: name of the connection
"""
return MongoProxy(self.conn[item])
return MongoProxy(self.conf, self.conn[item])
def find(self, *args, **kwargs):
# We need this modifying method to return a CursorProxy object so that
# we can handle the Cursor next function to catch the AutoReconnect
# exception.
return CursorProxy(self.conn.find(*args, **kwargs))
return CursorProxy(self.conf, self.conn.find(*args, **kwargs))
def create_index(self, keys, name=None, *args, **kwargs):
try:
@@ -472,19 +473,22 @@ class MongoProxy(object):
insert, wrap this method in the MongoConn.
Else wrap getting attribute with MongoProxy.
"""
if item in ('name', 'database'):
if item in ("conf",):
return super(MongoProxy, self).__getattribute__(item)
elif item in ('name', 'database'):
return getattr(self.conn, item)
if item in MONGO_METHODS:
return MongoConn(getattr(self.conn, item))
return MongoProxy(getattr(self.conn, item))
elif item in MONGO_METHODS:
return MongoConn(self.conf, getattr(self.conn, item))
return MongoProxy(self.conf, getattr(self.conn, item))
def __call__(self, *args, **kwargs):
return self.conn(*args, **kwargs)
class CursorProxy(pymongo.cursor.Cursor):
def __init__(self, cursor):
def __init__(self, conf, cursor):
self.cursor = cursor
self.conf = conf
def __getitem__(self, item):
return self.cursor[item]

View File

@@ -36,10 +36,13 @@ except ImportError:
mocks = None # happybase module is not Python 3 compatible yet
class MongoDbManager(fixtures.Fixture):
def __init__(self, url):
class DBManager(fixtures.Fixture):
def __init__(self, conf, url):
self._url = url
self._conf = conf
class MongoDbManager(DBManager):
def setUp(self):
super(MongoDbManager, self).setUp()
@@ -49,9 +52,9 @@ class MongoDbManager(fixtures.Fixture):
message='.*you must provide a username and password.*')
try:
self.connection = storage.get_connection(
self.url, 'ceilometer.metering.storage')
self._conf, self.url, 'ceilometer.metering.storage')
self.event_connection = storage.get_connection(
self.url, 'ceilometer.event.storage')
self._conf, self.url, 'ceilometer.event.storage')
except storage.StorageBadVersion as e:
raise testcase.TestSkipped(six.text_type(e))
@@ -63,8 +66,9 @@ class MongoDbManager(fixtures.Fixture):
}
class SQLManager(fixtures.Fixture):
def __init__(self, url):
class SQLManager(DBManager):
def __init__(self, conf, url):
super(SQLManager, self).__init__(conf, url)
db_name = 'ceilometer_%s' % uuid.uuid4().hex
engine = sqlalchemy.create_engine(url)
conn = engine.connect()
@@ -78,9 +82,9 @@ class SQLManager(fixtures.Fixture):
def setUp(self):
super(SQLManager, self).setUp()
self.connection = storage.get_connection(
self.url, 'ceilometer.metering.storage')
self._conf, self.url, 'ceilometer.metering.storage')
self.event_connection = storage.get_connection(
self.url, 'ceilometer.event.storage')
self._conf, self.url, 'ceilometer.event.storage')
class PgSQLManager(SQLManager):
@@ -97,32 +101,26 @@ class MySQLManager(SQLManager):
conn.execute('CREATE DATABASE %s;' % db_name)
class ElasticSearchManager(fixtures.Fixture):
def __init__(self, url):
self.url = url
class ElasticSearchManager(DBManager):
def setUp(self):
super(ElasticSearchManager, self).setUp()
self.connection = storage.get_connection(
'sqlite://', 'ceilometer.metering.storage')
self._conf, 'sqlite://', 'ceilometer.metering.storage')
self.event_connection = storage.get_connection(
self.url, 'ceilometer.event.storage')
self._conf, self.url, 'ceilometer.event.storage')
# prefix each test with unique index name
self.event_connection.index_name = 'events_%s' % uuid.uuid4().hex
# force index on write so data is queryable right away
self.event_connection._refresh_on_write = True
class HBaseManager(fixtures.Fixture):
def __init__(self, url):
self._url = url
class HBaseManager(DBManager):
def setUp(self):
super(HBaseManager, self).setUp()
self.connection = storage.get_connection(
self.url, 'ceilometer.metering.storage')
self._conf, self.url, 'ceilometer.metering.storage')
self.event_connection = storage.get_connection(
self.url, 'ceilometer.event.storage')
self._conf, self.url, 'ceilometer.event.storage')
# Unique prefix for each test to keep data is distinguished because
# all test data is stored in one table
data_prefix = str(uuid.uuid4().hex)
@@ -152,17 +150,14 @@ class HBaseManager(fixtures.Fixture):
)
class SQLiteManager(fixtures.Fixture):
def __init__(self, url):
self.url = url
class SQLiteManager(DBManager):
def setUp(self):
super(SQLiteManager, self).setUp()
self.url = self._url
self.connection = storage.get_connection(
self.url, 'ceilometer.metering.storage')
self._conf, self._url, 'ceilometer.metering.storage')
self.event_connection = storage.get_connection(
self.url, 'ceilometer.event.storage')
self._conf, self._url, 'ceilometer.event.storage')
@six.add_metaclass(test_base.SkipNotImplementedMeta)
@@ -202,7 +197,7 @@ class TestBase(test_base.BaseTestCase):
if not manager:
self.skipTest("missing driver manager: %s" % engine)
self.db_manager = manager(db_url)
self.db_manager = manager(self.CONF, db_url)
self.useFixture(self.db_manager)
@@ -231,7 +226,7 @@ class TestBase(test_base.BaseTestCase):
self.conn = None
super(TestBase, self).tearDown()
def _get_connection(self, url, namespace):
def _get_connection(self, conf, url, namespace):
if namespace == "ceilometer.event.storage":
return self.event_conn
return self.conn

View File

@@ -52,7 +52,7 @@ class ConnectionTest(tests_db.TestBase):
with mock.patch.object(hbase.Connection, '_get_connection_pool',
side_effect=get_connection_pool):
conn = hbase.Connection('hbase://test_hbase:9090')
conn = hbase.Connection(self.CONF, 'hbase://test_hbase:9090')
self.assertIsInstance(conn.conn_pool, TestConn)

View File

@@ -22,7 +22,7 @@ from ceilometer.storage import impl_log
class ConnectionTest(base.BaseTestCase):
@staticmethod
def test_get_connection():
conn = impl_log.Connection(None)
conn = impl_log.Connection(None, None)
conn.record_metering_data({'counter_name': 'test',
'resource_id': __name__,
'counter_volume': 1,

View File

@@ -30,12 +30,12 @@ from ceilometer.tests import db as tests_db
@tests_db.run_with('mongodb')
class MongoDBConnection(tests_db.TestBase):
def test_connection_pooling(self):
test_conn = impl_mongodb.Connection(self.db_manager.url)
test_conn = impl_mongodb.Connection(self.CONF, self.db_manager.url)
self.assertEqual(self.conn.conn, test_conn.conn)
def test_replica_set(self):
url = self.db_manager._url + '?replicaSet=foobar'
conn = impl_mongodb.Connection(url)
conn = impl_mongodb.Connection(self.CONF, url)
self.assertTrue(conn.conn)

View File

@@ -52,8 +52,8 @@ class EngineFacadeTest(tests_db.TestBase):
@mock.patch.object(warnings, 'warn')
def test_no_not_supported_warning(self, mocked):
impl_sqlalchemy.Connection('sqlite://')
impl_sqla_event.Connection('sqlite://')
impl_sqlalchemy.Connection(self.CONF, 'sqlite://')
impl_sqla_event.Connection(self.CONF, 'sqlite://')
self.assertNotIn(mock.call(mock.ANY, exception.NotSupportedWarning),
mocked.call_args_list)

View File

@@ -32,14 +32,20 @@ import six
class EngineTest(base.BaseTestCase):
def setUp(self):
super(EngineTest, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
def test_get_connection(self):
engine = storage.get_connection('log://localhost',
engine = storage.get_connection(self.CONF,
'log://localhost',
'ceilometer.metering.storage')
self.assertIsInstance(engine, impl_log.Connection)
def test_get_connection_no_such_engine(self):
try:
storage.get_connection('no-such-engine://localhost',
storage.get_connection(self.CONF,
'no-such-engine://localhost',
'ceilometer.metering.storage')
except RuntimeError as err:
self.assertIn('no-such-engine', six.text_type(err))