diff --git a/.pydevproject b/.pydevproject
index 98cc65d3..03181631 100644
--- a/.pydevproject
+++ b/.pydevproject
@@ -1,8 +1,15 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<?eclipse-pydev version="1.0"?><pydev_project>
-<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
-<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
-<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
-<path>/ceph-radosgw/hooks</path>
-</pydev_pathproperty>
-</pydev_project>
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<?eclipse-pydev version="1.0"?><pydev_project>
+<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
+<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
+<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
+<path>/${PROJECT_DIR_NAME}/lib</path>
+<path>/${PROJECT_DIR_NAME}/hooks</path>
+<path>/${PROJECT_DIR_NAME}/unit_tests</path>
+<path>/${PROJECT_DIR_NAME}/actions</path>
+</pydev_pathproperty>
+</pydev_project>
diff --git a/Makefile b/Makefile
index 6813bb22..a0ab412e 100644
--- a/Makefile
+++ b/Makefile
@@ -16,9 +16,12 @@ bin/charm_helpers_sync.py:
@mkdir -p bin
@curl -o bin/charm_helpers_sync.py https://raw.githubusercontent.com/juju/charm-helpers/master/tools/charm_helpers_sync/charm_helpers_sync.py
+bin/git_sync.py:
+ @mkdir -p bin
+ @wget -O bin/git_sync.py https://raw.githubusercontent.com/CanonicalLtd/git-sync/master/git_sync.py
+
sync: bin/charm_helpers_sync.py
@$(PYTHON) bin/charm_helpers_sync.py -c charm-helpers-hooks.yaml
-publish: lint test
- bzr push lp:charms/ceph-radosgw
- bzr push lp:charms/trusty/ceph-radosgw
+ceph-sync: bin/git_sync.py
+ $(PYTHON) bin/git_sync.py -d lib -s https://github.com/openstack/charms.ceph.git
diff --git a/hooks/ceph.py b/hooks/ceph_rgw.py
similarity index 100%
rename from hooks/ceph.py
rename to hooks/ceph_rgw.py
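
Note on the rename: once `lib` is prepended to `sys.path`, a charm-local module named `ceph` would collide with the newly vendored `lib/ceph` package, so the hook helper moves to `ceph_rgw.py` and the two are imported under distinct names. A minimal sketch of the resulting import layout, mirroring the hooks.py hunk below:

```python
# Sketch only: mirrors the import changes in the hooks.py hunk that follows.
import sys

sys.path.append('lib')            # make the vendored charms.ceph library importable

import ceph_rgw as ceph           # charm-local helpers, formerly hooks/ceph.py
import ceph.utils as ceph_utils   # utilities from the synced lib/ceph package
```
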
diff --git a/hooks/hooks.py b/hooks/hooks.py
index b1d79d84..77f42263 100755
--- a/hooks/hooks.py
+++ b/hooks/hooks.py
@@ -19,7 +19,10 @@ import subprocess
import sys
import socket
-import ceph
+sys.path.append('lib')
+
+import ceph_rgw as ceph
+import ceph.utils as ceph_utils
from charmhelpers.core.hookenv import (
relation_get,
@@ -39,6 +42,7 @@ from charmhelpers.fetch import (
apt_purge,
add_source,
filter_installed_packages,
+ filter_missing_packages,
)
from charmhelpers.payload.execd import execd_preinstall
from charmhelpers.core.host import (
@@ -115,16 +119,45 @@ APACHE_PACKAGES = [
]
+def upgrade_available():
+ """Check for upgrade for ceph
+
+ :returns: whether an upgrade is available
+ :rtype: boolean
+ """
+ c = config()
+ old_version = ceph_utils.resolve_ceph_version(c.previous('source') or
+ 'distro')
+ new_version = ceph_utils.resolve_ceph_version(c.get('source'))
+ if (old_version in ceph_utils.UPGRADE_PATHS and
+ new_version == ceph_utils.UPGRADE_PATHS[old_version]):
+ return True
+ return False
+
+
def install_packages():
- add_source(config('source'), config('key'))
- apt_update(fatal=True)
+ c = config()
+ if c.changed('source') or c.changed('key'):
+ add_source(c.get('source'), c.get('key'))
+ apt_update(fatal=True)
+
if is_container():
PACKAGES.remove('ntp')
- pkgs = filter_installed_packages(PACKAGES)
+
+ # NOTE: just use full package list if we're in an upgrade
+ # config-changed execution
+ pkgs = (
+ PACKAGES if upgrade_available() else
+ filter_installed_packages(PACKAGES)
+ )
if pkgs:
status_set('maintenance', 'Installing radosgw packages')
- apt_install(PACKAGES, fatal=True)
- apt_purge(APACHE_PACKAGES)
+ apt_install(pkgs, fatal=True)
+
+ pkgs = filter_missing_packages(APACHE_PACKAGES)
+ if pkgs:
+ apt_purge(pkgs)
+
disable_unused_apache_sites()
@@ -153,7 +186,6 @@ def config_changed():
return
install_packages()
- disable_unused_apache_sites()
if config('prefer-ipv6'):
status_set('maintenance', 'configuring ipv6')
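
The upgrade gate above only fires when the newly configured source resolves to the direct successor of the previously configured release. A minimal, hypothetical sketch of that check (the real `UPGRADE_PATHS` table lives in `ceph.utils` and its contents may differ):

```python
# Hypothetical excerpt: UPGRADE_PATHS maps each release to its only
# supported successor; the real mapping is defined in ceph.utils.
UPGRADE_PATHS = {
    'hammer': 'jewel',
    'jewel': 'luminous',
}


def upgrade_available(old_version, new_version):
    """True only when new_version is the direct successor of old_version."""
    return UPGRADE_PATHS.get(old_version) == new_version


assert upgrade_available('jewel', 'luminous')
assert not upgrade_available('hammer', 'luminous')   # no release skipping
```
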
diff --git a/hooks/install_deps b/hooks/install_deps
index 4d06619a..0f116166 100755
--- a/hooks/install_deps
+++ b/hooks/install_deps
@@ -1,7 +1,7 @@
#!/bin/bash -e
# Install required dependencies for charm runtime
-declare -a DEPS=('apt' 'netaddr' 'netifaces' 'yaml' 'jinja2' 'dnspython')
+declare -a DEPS=('apt' 'netaddr' 'netifaces' 'yaml' 'jinja2' 'dnspython' 'pyudev')
check_and_install() {
pkg="${1}-${2}"
diff --git a/lib/.keep b/lib/.keep
deleted file mode 100644
index f49b91ae..00000000
--- a/lib/.keep
+++ /dev/null
@@ -1,3 +0,0 @@
- This file was created by release-tools to ensure that this empty
- directory is preserved in vcs re: lint check definitions in global
- tox.ini files. This file can be removed if/when this dir is actually in use.
diff --git a/lib/ceph/__init__.py b/lib/ceph/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/lib/ceph/broker.py b/lib/ceph/broker.py
new file mode 100644
index 00000000..3e857d21
--- /dev/null
+++ b/lib/ceph/broker.py
@@ -0,0 +1,872 @@
+# Copyright 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import json
+import os
+
+from tempfile import NamedTemporaryFile
+
+from ceph.utils import (
+ get_cephfs,
+ get_osd_weight
+)
+from ceph.crush_utils import Crushmap
+
+from charmhelpers.core.hookenv import (
+ log,
+ DEBUG,
+ INFO,
+ ERROR,
+)
+from charmhelpers.contrib.storage.linux.ceph import (
+ create_erasure_profile,
+ delete_pool,
+ erasure_profile_exists,
+ get_osds,
+ monitor_key_get,
+ monitor_key_set,
+ pool_exists,
+ pool_set,
+ remove_pool_snapshot,
+ rename_pool,
+ set_pool_quota,
+ snapshot_pool,
+ validator,
+ ErasurePool,
+ Pool,
+ ReplicatedPool,
+)
+
+# This comes from http://docs.ceph.com/docs/master/rados/operations/pools/
+# This should do a decent job of preventing people from passing in bad values.
+# It will give a useful error message
+from subprocess import check_call, check_output, CalledProcessError
+
+POOL_KEYS = {
+ # "Ceph Key Name": [Python type, [Valid Range]]
+ "size": [int],
+ "min_size": [int],
+ "crash_replay_interval": [int],
+ "pgp_num": [int], # = or < pg_num
+ "crush_ruleset": [int],
+ "hashpspool": [bool],
+ "nodelete": [bool],
+ "nopgchange": [bool],
+ "nosizechange": [bool],
+ "write_fadvise_dontneed": [bool],
+ "noscrub": [bool],
+ "nodeep-scrub": [bool],
+ "hit_set_type": [str, ["bloom", "explicit_hash",
+ "explicit_object"]],
+ "hit_set_count": [int, [1, 1]],
+ "hit_set_period": [int],
+ "hit_set_fpp": [float, [0.0, 1.0]],
+ "cache_target_dirty_ratio": [float],
+ "cache_target_dirty_high_ratio": [float],
+ "cache_target_full_ratio": [float],
+ "target_max_bytes": [int],
+ "target_max_objects": [int],
+ "cache_min_flush_age": [int],
+ "cache_min_evict_age": [int],
+ "fast_read": [bool],
+ "allow_ec_overwrites": [bool],
+ "compression_mode": [str, ["none", "passive", "aggressive", "force"]],
+ "compression_algorithm": [str, ["lz4", "snappy", "zlib", "zstd"]],
+ "compression_required_ratio": [float, [0.0, 1.0]],
+}
+
+CEPH_BUCKET_TYPES = [
+ 'osd',
+ 'host',
+ 'chassis',
+ 'rack',
+ 'row',
+ 'pdu',
+ 'pod',
+ 'room',
+ 'datacenter',
+ 'region',
+ 'root'
+]
+
+
+def decode_req_encode_rsp(f):
+ """Decorator to decode incoming requests and encode responses."""
+
+ def decode_inner(req):
+ return json.dumps(f(json.loads(req)))
+
+ return decode_inner
+
+
+@decode_req_encode_rsp
+def process_requests(reqs):
+ """Process Ceph broker request(s).
+
+ This is a versioned api. API version must be supplied by the client making
+ the request.
+
+ :param reqs: dict of request parameters.
+ :returns: dict. exit-code and reason if not 0
+ """
+ request_id = reqs.get('request-id')
+ try:
+ version = reqs.get('api-version')
+ if version == 1:
+ log('Processing request {}'.format(request_id), level=DEBUG)
+ resp = process_requests_v1(reqs['ops'])
+ if request_id:
+ resp['request-id'] = request_id
+
+ return resp
+
+ except Exception as exc:
+ log(str(exc), level=ERROR)
+ msg = ("Unexpected error occurred while processing requests: %s" %
+ reqs)
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+
+ msg = ("Missing or invalid api version ({})".format(version))
+ resp = {'exit-code': 1, 'stderr': msg}
+ if request_id:
+ resp['request-id'] = request_id
+
+ return resp
+
+
+def handle_create_erasure_profile(request, service):
+ """Create an erasure profile.
+
+ :param request: dict of request operations and params
+ :param service: The ceph client to run the command under.
+ :returns: dict. exit-code and reason if not 0
+ """
+ # "local" | "shec" or it defaults to "jerasure"
+ erasure_type = request.get('erasure-type')
+ # "host" | "rack" or it defaults to "host" # Any valid Ceph bucket
+ failure_domain = request.get('failure-domain')
+ name = request.get('name')
+ k = request.get('k')
+ m = request.get('m')
+ l = request.get('l')
+
+ if failure_domain not in CEPH_BUCKET_TYPES:
+ msg = "failure-domain must be one of {}".format(CEPH_BUCKET_TYPES)
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+
+ create_erasure_profile(service=service, erasure_plugin_name=erasure_type,
+ profile_name=name, failure_domain=failure_domain,
+ data_chunks=k, coding_chunks=m, locality=l)
+
+
+def handle_add_permissions_to_key(request, service):
+ """Groups are defined by the key cephx.groups.(namespace-)?-(name). This
+ key will contain a dict serialized to JSON with data about the group,
+ including pools and members.
+
+ A group can optionally have a namespace defined that will be used to
+ further restrict pool access.
+ """
+ resp = {'exit-code': 0}
+
+ service_name = request.get('name')
+ group_name = request.get('group')
+ group_namespace = request.get('group-namespace')
+ if group_namespace:
+ group_name = "{}-{}".format(group_namespace, group_name)
+ group = get_group(group_name=group_name)
+ service_obj = get_service_groups(service=service_name,
+ namespace=group_namespace)
+ if request.get('object-prefix-permissions'):
+ service_obj['object_prefix_perms'] = request.get(
+ 'object-prefix-permissions')
+ format("Service object: {}".format(service_obj))
+ permission = request.get('group-permission') or "rwx"
+ if service_name not in group['services']:
+ group['services'].append(service_name)
+ save_group(group=group, group_name=group_name)
+ if permission not in service_obj['group_names']:
+ service_obj['group_names'][permission] = []
+ if group_name not in service_obj['group_names'][permission]:
+ service_obj['group_names'][permission].append(group_name)
+ save_service(service=service_obj, service_name=service_name)
+ service_obj['groups'] = _build_service_groups(service_obj,
+ group_namespace)
+ update_service_permissions(service_name, service_obj, group_namespace)
+
+ return resp
+
+
+def update_service_permissions(service, service_obj=None, namespace=None):
+ """Update the key permissions for the named client in Ceph"""
+ if not service_obj:
+ service_obj = get_service_groups(service=service, namespace=namespace)
+ permissions = pool_permission_list_for_service(service_obj)
+ call = ['ceph', 'auth', 'caps', 'client.{}'.format(service)] + permissions
+ try:
+ check_call(call)
+ except CalledProcessError as e:
+ log("Error updating key capabilities: {}".format(e))
+
+
+def add_pool_to_group(pool, group, namespace=None):
+ """Add a named pool to a named group"""
+ group_name = group
+ if namespace:
+ group_name = "{}-{}".format(namespace, group_name)
+ group = get_group(group_name=group_name)
+ if pool not in group['pools']:
+ group["pools"].append(pool)
+ save_group(group, group_name=group_name)
+ for service in group['services']:
+ update_service_permissions(service, namespace=namespace)
+
+
+def pool_permission_list_for_service(service):
+ """Build the permission string for Ceph for a given service"""
+ permissions = []
+ permission_types = collections.OrderedDict()
+ for permission, group in sorted(service["group_names"].items()):
+ if permission not in permission_types:
+ permission_types[permission] = []
+ for item in group:
+ permission_types[permission].append(item)
+ for permission, groups in permission_types.items():
+ permission = "allow {}".format(permission)
+ for group in groups:
+ for pool in service['groups'][group].get('pools', []):
+ permissions.append("{} pool={}".format(permission, pool))
+ for permission, prefixes in sorted(
+ service.get("object_prefix_perms", {}).items()):
+ for prefix in prefixes:
+ permissions.append("allow {} object_prefix {}".format(permission,
+ prefix))
+ return ['mon', 'allow r, allow command "osd blacklist"',
+ 'osd', ', '.join(permissions)]
+
+
+def get_service_groups(service, namespace=None):
+ """Services are objects stored with some metadata, they look like (for a
+ service named "nova"):
+ {
+ group_names: {'rwx': ['images']},
+ groups: {}
+ }
+ After populating the group, it looks like:
+ {
+ group_names: {'rwx': ['images']},
+ groups: {
+ 'images': {
+ pools: ['glance'],
+ services: ['nova']
+ }
+ }
+ }
+ """
+ service_json = monitor_key_get(service='admin',
+ key="cephx.services.{}".format(service))
+ try:
+ service = json.loads(service_json)
+ except (TypeError, ValueError):
+ service = None
+ if service:
+ service['groups'] = _build_service_groups(service, namespace)
+ else:
+ service = {'group_names': {}, 'groups': {}}
+ return service
+
+
+def _build_service_groups(service, namespace=None):
+ """Rebuild the 'groups' dict for a service group
+
+ :returns: dict: dictionary keyed by group name of the following
+ format:
+
+ {
+ 'images': {
+ pools: ['glance'],
+ services: ['nova', 'glance']
+ },
+ 'vms':{
+ pools: ['nova'],
+ services: ['nova']
+ }
+ }
+ """
+ all_groups = {}
+ for groups in service['group_names'].values():
+ for group in groups:
+ name = group
+ if namespace:
+ name = "{}-{}".format(namespace, name)
+ all_groups[group] = get_group(group_name=name)
+ return all_groups
+
+
+def get_group(group_name):
+ """A group is a structure to hold data about a named group, structured as:
+ {
+ pools: ['glance'],
+ services: ['nova']
+ }
+ """
+ group_key = get_group_key(group_name=group_name)
+ group_json = monitor_key_get(service='admin', key=group_key)
+ try:
+ group = json.loads(group_json)
+ except (TypeError, ValueError):
+ group = None
+ if not group:
+ group = {
+ 'pools': [],
+ 'services': []
+ }
+ return group
+
+
+def save_service(service_name, service):
+ """Persist a service in the monitor cluster"""
+ service['groups'] = {}
+ return monitor_key_set(service='admin',
+ key="cephx.services.{}".format(service_name),
+ value=json.dumps(service, sort_keys=True))
+
+
+def save_group(group, group_name):
+ """Persist a group in the monitor cluster"""
+ group_key = get_group_key(group_name=group_name)
+ return monitor_key_set(service='admin',
+ key=group_key,
+ value=json.dumps(group, sort_keys=True))
+
+
+def get_group_key(group_name):
+ """Build group key"""
+ return 'cephx.groups.{}'.format(group_name)
+
+
+def handle_erasure_pool(request, service):
+ """Create a new erasure coded pool.
+
+ :param request: dict of request operations and params.
+ :param service: The ceph client to run the command under.
+ :returns: dict. exit-code and reason if not 0.
+ """
+ pool_name = request.get('name')
+ erasure_profile = request.get('erasure-profile')
+ quota = request.get('max-bytes')
+ weight = request.get('weight')
+ group_name = request.get('group')
+
+ if erasure_profile is None:
+ erasure_profile = "default-canonical"
+
+ app_name = request.get('app-name')
+
+ # Check for missing params
+ if pool_name is None:
+ msg = "Missing parameter. name is required for the pool"
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+
+ if group_name:
+ group_namespace = request.get('group-namespace')
+ # Add the pool to the group named "group_name"
+ add_pool_to_group(pool=pool_name,
+ group=group_name,
+ namespace=group_namespace)
+
+ # TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds
+ if not erasure_profile_exists(service=service, name=erasure_profile):
+ # TODO: Fail and tell them to create the profile or default
+ msg = ("erasure-profile {} does not exist. Please create it with: "
+ "create-erasure-profile".format(erasure_profile))
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+
+ pool = ErasurePool(service=service, name=pool_name,
+ erasure_code_profile=erasure_profile,
+ percent_data=weight, app_name=app_name)
+ # Ok make the erasure pool
+ if not pool_exists(service=service, name=pool_name):
+ log("Creating pool '{}' (erasure_profile={})"
+ .format(pool.name, erasure_profile), level=INFO)
+ pool.create()
+
+ # Set a quota if requested
+ if quota is not None:
+ set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota)
+
+
+def handle_replicated_pool(request, service):
+ """Create a new replicated pool.
+
+ :param request: dict of request operations and params.
+ :param service: The ceph client to run the command under.
+ :returns: dict. exit-code and reason if not 0.
+ """
+ pool_name = request.get('name')
+ replicas = request.get('replicas')
+ quota = request.get('max-bytes')
+ weight = request.get('weight')
+ group_name = request.get('group')
+
+ # Optional params
+ pg_num = request.get('pg_num')
+ if pg_num:
+ # Cap pg_num to max allowed just in case.
+ osds = get_osds(service)
+ if osds:
+ pg_num = min(pg_num, (len(osds) * 100 // replicas))
+
+ app_name = request.get('app-name')
+ # Check for missing params
+ if pool_name is None or replicas is None:
+ msg = "Missing parameter. name and replicas are required"
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+
+ if group_name:
+ group_namespace = request.get('group-namespace')
+ # Add the pool to the group named "group_name"
+ add_pool_to_group(pool=pool_name,
+ group=group_name,
+ namespace=group_namespace)
+
+ kwargs = {}
+ if pg_num:
+ kwargs['pg_num'] = pg_num
+ if weight:
+ kwargs['percent_data'] = weight
+ if replicas:
+ kwargs['replicas'] = replicas
+ if app_name:
+ kwargs['app_name'] = app_name
+
+ pool = ReplicatedPool(service=service,
+ name=pool_name, **kwargs)
+ if not pool_exists(service=service, name=pool_name):
+ log("Creating pool '{}' (replicas={})".format(pool.name, replicas),
+ level=INFO)
+ pool.create()
+ else:
+ log("Pool '{}' already exists - skipping create".format(pool.name),
+ level=DEBUG)
+
+ # Set a quota if requested
+ if quota is not None:
+ set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota)
+
+
+def handle_create_cache_tier(request, service):
+ """Create a cache tier on a cold pool. Modes supported are
+ "writeback" and "readonly".
+
+ :param request: dict of request operations and params
+ :param service: The ceph client to run the command under.
+ :returns: dict. exit-code and reason if not 0
+ """
+ # mode = "writeback" | "readonly"
+ storage_pool = request.get('cold-pool')
+ cache_pool = request.get('hot-pool')
+ cache_mode = request.get('mode')
+
+ if cache_mode is None:
+ cache_mode = "writeback"
+
+ # cache and storage pool must exist first
+ if not pool_exists(service=service, name=storage_pool) or not pool_exists(
+ service=service, name=cache_pool):
+ msg = ("cold-pool: {} and hot-pool: {} must exist. Please create "
+ "them first".format(storage_pool, cache_pool))
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+
+ p = Pool(service=service, name=storage_pool)
+ p.add_cache_tier(cache_pool=cache_pool, mode=cache_mode)
+
+
+def handle_remove_cache_tier(request, service):
+ """Remove a cache tier from the cold pool.
+
+ :param request: dict of request operations and params
+ :param service: The ceph client to run the command under.
+ :returns: dict. exit-code and reason if not 0
+ """
+ storage_pool = request.get('cold-pool')
+ cache_pool = request.get('hot-pool')
+ # cache and storage pool must exist first
+ if not pool_exists(service=service, name=storage_pool) or not pool_exists(
+ service=service, name=cache_pool):
+ msg = ("cold-pool: {} or hot-pool: {} doesn't exist. Not "
+ "deleting cache tier".format(storage_pool, cache_pool))
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+
+ pool = Pool(name=storage_pool, service=service)
+ pool.remove_cache_tier(cache_pool=cache_pool)
+
+
+def handle_set_pool_value(request, service):
+ """Sets an arbitrary pool value.
+
+ :param request: dict of request operations and params
+ :param service: The ceph client to run the command under.
+ :returns: dict. exit-code and reason if not 0
+ """
+ # Set arbitrary pool values
+ params = {'pool': request.get('name'),
+ 'key': request.get('key'),
+ 'value': request.get('value')}
+ if params['key'] not in POOL_KEYS:
+ msg = "Invalid key '{}'".format(params['key'])
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+
+ # Get the validation method
+ validator_params = POOL_KEYS[params['key']]
+ if len(validator_params) == 1:
+ # Validate that what the user passed is actually legal per Ceph's rules
+ validator(params['value'], validator_params[0])
+ else:
+ # Validate that what the user passed is actually legal per Ceph's rules
+ validator(params['value'], validator_params[0], validator_params[1])
+
+ # Set the value
+ pool_set(service=service, pool_name=params['pool'], key=params['key'],
+ value=params['value'])
+
+
+def handle_rgw_regionmap_update(request, service):
+ """Change the radosgw region map.
+
+ :param request: dict of request operations and params
+ :param service: The ceph client to run the command under.
+ :returns: dict. exit-code and reason if not 0
+ """
+ name = request.get('client-name')
+ if not name:
+ msg = "Missing rgw-region or client-name params"
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+ try:
+ check_output(['radosgw-admin',
+ '--id', service,
+ 'regionmap', 'update', '--name', name])
+ except CalledProcessError as err:
+ log(err.output, level=ERROR)
+ return {'exit-code': 1, 'stderr': err.output}
+
+
+def handle_rgw_regionmap_default(request, service):
+ """Create a radosgw region map.
+
+ :param request: dict of request operations and params
+ :param service: The ceph client to run the command under.
+ :returns: dict. exit-code and reason if not 0
+ """
+ region = request.get('rgw-region')
+ name = request.get('client-name')
+ if not region or not name:
+ msg = "Missing rgw-region or client-name params"
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+ try:
+ check_output(
+ [
+ 'radosgw-admin',
+ '--id', service,
+ 'regionmap',
+ 'default',
+ '--rgw-region', region,
+ '--name', name])
+ except CalledProcessError as err:
+ log(err.output, level=ERROR)
+ return {'exit-code': 1, 'stderr': err.output}
+
+
+def handle_rgw_zone_set(request, service):
+ """Create a radosgw zone.
+
+ :param request: dict of request operations and params
+ :param service: The ceph client to run the command under.
+ :returns: dict. exit-code and reason if not 0
+ """
+ json_file = request.get('zone-json')
+ name = request.get('client-name')
+ region_name = request.get('region-name')
+ zone_name = request.get('zone-name')
+ if not json_file or not name or not region_name or not zone_name:
+ msg = "Missing json-file or client-name params"
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+ infile = NamedTemporaryFile(delete=False)
+ with open(infile.name, 'w') as infile_handle:
+ infile_handle.write(json_file)
+ try:
+ check_output(
+ [
+ 'radosgw-admin',
+ '--id', service,
+ 'zone',
+ 'set',
+ '--rgw-zone', zone_name,
+ '--infile', infile.name,
+ '--name', name,
+ ]
+ )
+ except CalledProcessError as err:
+ log(err.output, level=ERROR)
+ return {'exit-code': 1, 'stderr': err.output}
+ os.unlink(infile.name)
+
+
+def handle_put_osd_in_bucket(request, service):
+ """Move an osd into a specified crush bucket.
+
+ :param request: dict of request operations and params
+ :param service: The ceph client to run the command under.
+ :returns: dict. exit-code and reason if not 0
+ """
+ osd_id = request.get('osd')
+ target_bucket = request.get('bucket')
+ if not osd_id or not target_bucket:
+ msg = "Missing OSD ID or Bucket"
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+ crushmap = Crushmap()
+ try:
+ crushmap.ensure_bucket_is_present(target_bucket)
+ check_output(
+ [
+ 'ceph',
+ '--id', service,
+ 'osd',
+ 'crush',
+ 'set',
+ str(osd_id),
+ str(get_osd_weight(osd_id)),
+ "root={}".format(target_bucket)
+ ]
+ )
+
+ except Exception as exc:
+ msg = "Failed to move OSD " \
+ "{} into Bucket {} :: {}".format(osd_id, target_bucket, exc)
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+
+
+def handle_rgw_create_user(request, service):
+ """Create a new rados gateway user.
+
+ :param request: dict of request operations and params
+ :param service: The ceph client to run the command under.
+ :returns: dict. exit-code and reason if not 0
+ """
+ user_id = request.get('rgw-uid')
+ display_name = request.get('display-name')
+ name = request.get('client-name')
+ if not name or not display_name or not user_id:
+ msg = "Missing client-name, display-name or rgw-uid"
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+ try:
+ create_output = check_output(
+ [
+ 'radosgw-admin',
+ '--id', service,
+ 'user',
+ 'create',
+ '--uid', user_id,
+ '--display-name', display_name,
+ '--name', name,
+ '--system'
+ ]
+ )
+ try:
+ user_json = json.loads(str(create_output.decode('UTF-8')))
+ return {'exit-code': 0, 'user': user_json}
+ except ValueError as err:
+ log(err, level=ERROR)
+ return {'exit-code': 1, 'stderr': err}
+
+ except CalledProcessError as err:
+ log(err.output, level=ERROR)
+ return {'exit-code': 1, 'stderr': err.output}
+
+
+def handle_create_cephfs(request, service):
+ """Create a new cephfs.
+
+ :param request: The broker request
+ :param service: The ceph client to run the command under.
+ :returns: dict. exit-code and reason if not 0
+ """
+ cephfs_name = request.get('mds_name')
+ data_pool = request.get('data_pool')
+ metadata_pool = request.get('metadata_pool')
+ # Check if the user params were provided
+ if not cephfs_name or not data_pool or not metadata_pool:
+ msg = "Missing mds_name, data_pool or metadata_pool params"
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+
+ # Sanity check that the required pools exist
+ if not pool_exists(service=service, name=data_pool):
+ msg = "CephFS data pool does not exist. Cannot create CephFS"
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+ if not pool_exists(service=service, name=metadata_pool):
+ msg = "CephFS metadata pool does not exist. Cannot create CephFS"
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+
+ if get_cephfs(service=service):
+ # CephFS new has already been called
+ log("CephFS already created")
+ return
+
+ # Finally create CephFS
+ try:
+ check_output(["ceph",
+ '--id', service,
+ "fs", "new", cephfs_name,
+ metadata_pool,
+ data_pool])
+ except CalledProcessError as err:
+ if err.returncode == 22:
+ log("CephFS already created")
+ return
+ else:
+ log(err.output, level=ERROR)
+ return {'exit-code': 1, 'stderr': err.output}
+
+
+def handle_rgw_region_set(request, service):
+ # radosgw-admin region set --infile us.json --name client.radosgw.us-east-1
+ """Set the rados gateway region.
+
+ :param request: dict. The broker request.
+ :param service: The ceph client to run the command under.
+ :returns: dict. exit-code and reason if not 0
+ """
+ json_file = request.get('region-json')
+ name = request.get('client-name')
+ region_name = request.get('region-name')
+ zone_name = request.get('zone-name')
+ if not json_file or not name or not region_name or not zone_name:
+ msg = "Missing json-file or client-name params"
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+ infile = NamedTemporaryFile(delete=False)
+ with open(infile.name, 'w') as infile_handle:
+ infile_handle.write(json_file)
+ try:
+ check_output(
+ [
+ 'radosgw-admin',
+ '--id', service,
+ 'region',
+ 'set',
+ '--rgw-zone', zone_name,
+ '--infile', infile.name,
+ '--name', name,
+ ]
+ )
+ except CalledProcessError as err:
+ log(err.output, level=ERROR)
+ return {'exit-code': 1, 'stderr': err.output}
+ os.unlink(infile.name)
+
+
+def process_requests_v1(reqs):
+ """Process v1 requests.
+
+ Takes a list of requests (dicts) and processes each one. If an error is
+ found, processing stops and the client is notified in the response.
+
+ Returns a response dict containing the exit code (non-zero if any
+ operation failed along with an explanation).
+ """
+ ret = None
+ log("Processing {} ceph broker requests".format(len(reqs)), level=INFO)
+ for req in reqs:
+ op = req.get('op')
+ log("Processing op='{}'".format(op), level=DEBUG)
+ # Use admin client since we do not have other client key locations
+ # setup to use them for these operations.
+ svc = 'admin'
+ if op == "create-pool":
+ pool_type = req.get('pool-type') # "replicated" | "erasure"
+
+ # Default to replicated if pool_type isn't given
+ if pool_type == 'erasure':
+ ret = handle_erasure_pool(request=req, service=svc)
+ else:
+ ret = handle_replicated_pool(request=req, service=svc)
+ elif op == "create-cephfs":
+ ret = handle_create_cephfs(request=req, service=svc)
+ elif op == "create-cache-tier":
+ ret = handle_create_cache_tier(request=req, service=svc)
+ elif op == "remove-cache-tier":
+ ret = handle_remove_cache_tier(request=req, service=svc)
+ elif op == "create-erasure-profile":
+ ret = handle_create_erasure_profile(request=req, service=svc)
+ elif op == "delete-pool":
+ pool = req.get('name')
+ ret = delete_pool(service=svc, name=pool)
+ elif op == "rename-pool":
+ old_name = req.get('name')
+ new_name = req.get('new-name')
+ ret = rename_pool(service=svc, old_name=old_name,
+ new_name=new_name)
+ elif op == "snapshot-pool":
+ pool = req.get('name')
+ snapshot_name = req.get('snapshot-name')
+ ret = snapshot_pool(service=svc, pool_name=pool,
+ snapshot_name=snapshot_name)
+ elif op == "remove-pool-snapshot":
+ pool = req.get('name')
+ snapshot_name = req.get('snapshot-name')
+ ret = remove_pool_snapshot(service=svc, pool_name=pool,
+ snapshot_name=snapshot_name)
+ elif op == "set-pool-value":
+ ret = handle_set_pool_value(request=req, service=svc)
+ elif op == "rgw-region-set":
+ ret = handle_rgw_region_set(request=req, service=svc)
+ elif op == "rgw-zone-set":
+ ret = handle_rgw_zone_set(request=req, service=svc)
+ elif op == "rgw-regionmap-update":
+ ret = handle_rgw_regionmap_update(request=req, service=svc)
+ elif op == "rgw-regionmap-default":
+ ret = handle_rgw_regionmap_default(request=req, service=svc)
+ elif op == "rgw-create-user":
+ ret = handle_rgw_create_user(request=req, service=svc)
+ elif op == "move-osd-to-bucket":
+ ret = handle_put_osd_in_bucket(request=req, service=svc)
+ elif op == "add-permissions-to-key":
+ ret = handle_add_permissions_to_key(request=req, service=svc)
+ else:
+ msg = "Unknown operation '{}'".format(op)
+ log(msg, level=ERROR)
+ return {'exit-code': 1, 'stderr': msg}
+
+ if type(ret) == dict and 'exit-code' in ret:
+ return ret
+
+ return {'exit-code': 0}
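
For context, the payload handed to `process_requests()` is a JSON document carrying `api-version`, an optional `request-id` and a list of `ops`. The sketch below shows one plausible client-side request; it is illustrative only (the values are made up, and running it needs the vendored lib on sys.path plus a live cluster with the admin key):

```python
import json
# from ceph.broker import process_requests  # with lib/ on sys.path

request = json.dumps({
    'api-version': 1,
    'request-id': '9f3c7a10-example',        # made-up id
    'ops': [
        {'op': 'create-pool', 'name': 'glance', 'replicas': 3,
         'pg_num': 64, 'app-name': 'rbd'},
        {'op': 'set-pool-value', 'name': 'glance',
         'key': 'nodelete', 'value': True},
    ],
})
# response = process_requests(request)
# json.loads(response) -> {'exit-code': 0, 'request-id': '9f3c7a10-example'}
```
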
diff --git a/lib/ceph/crush_utils.py b/lib/ceph/crush_utils.py
new file mode 100644
index 00000000..8b6876c1
--- /dev/null
+++ b/lib/ceph/crush_utils.py
@@ -0,0 +1,154 @@
+# Copyright 2014 Canonical Limited.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+
+from subprocess import check_output, CalledProcessError
+
+from charmhelpers.core.hookenv import (
+ log,
+ ERROR,
+)
+
+CRUSH_BUCKET = """root {name} {{
+ id {id} # do not change unnecessarily
+ # weight 0.000
+ alg straw
+ hash 0 # rjenkins1
+}}
+
+rule {name} {{
+ ruleset 0
+ type replicated
+ min_size 1
+ max_size 10
+ step take {name}
+ step chooseleaf firstn 0 type host
+ step emit
+}}"""
+
+# This regular expression looks for a string like:
+# root NAME {
+# id NUMBER
+# so that we can extract NAME and ID from the crushmap
+CRUSHMAP_BUCKETS_RE = re.compile(r"root\s+(.+)\s+\{\s*id\s+(-?\d+)")
+
+# This regular expression looks for ID strings in the crushmap like:
+# id NUMBER
+# so that we can extract the IDs from a crushmap
+CRUSHMAP_ID_RE = re.compile(r"id\s+(-?\d+)")
+
+
+class Crushmap(object):
+ """An object oriented approach to Ceph crushmap management."""
+
+ def __init__(self):
+ self._crushmap = self.load_crushmap()
+ roots = re.findall(CRUSHMAP_BUCKETS_RE, self._crushmap)
+ buckets = []
+ ids = list(map(
+ lambda x: int(x),
+ re.findall(CRUSHMAP_ID_RE, self._crushmap)))
+ ids = sorted(ids)
+ if roots != []:
+ for root in roots:
+ buckets.append(CRUSHBucket(root[0], root[1], True))
+
+ self._buckets = buckets
+ if ids != []:
+ self._ids = ids
+ else:
+ self._ids = [0]
+
+ def load_crushmap(self):
+ try:
+ crush = check_output(['ceph', 'osd', 'getcrushmap'])
+ return str(check_output(['crushtool', '-d', '-'],
+ input=crush)
+ .decode('UTF-8'))
+ except CalledProcessError as e:
+ log("Error occurred while loading and decompiling CRUSH map: "
+ "{}".format(e), ERROR)
+ raise RuntimeError("Failed to read CRUSH map")
+
+ def ensure_bucket_is_present(self, bucket_name):
+ if bucket_name not in [bucket.name for bucket in self.buckets()]:
+ self.add_bucket(bucket_name)
+ self.save()
+
+ def buckets(self):
+ """Return a list of buckets that are in the Crushmap."""
+ return self._buckets
+
+ def add_bucket(self, bucket_name):
+ """Add a named bucket to Ceph"""
+ new_id = min(self._ids) - 1
+ self._ids.append(new_id)
+ self._buckets.append(CRUSHBucket(bucket_name, new_id))
+
+ def save(self):
+ """Persist Crushmap to Ceph"""
+ try:
+ crushmap = self.build_crushmap()
+ compiled = check_output(['crushtool', '-c', '/dev/stdin', '-o',
+ '/dev/stdout'],
+ input=crushmap.encode('UTF-8'))
+ ceph_output = str(check_output(['ceph', 'osd', 'setcrushmap', '-i',
+ '/dev/stdin'], input=compiled)
+ .decode('UTF-8'))
+ return ceph_output
+ except CalledProcessError as e:
+ log("save error: {}".format(e))
+ raise "Failed to save CRUSH map."
+
+ def build_crushmap(self):
+ """Modifies the current CRUSH map to include the new buckets"""
+ tmp_crushmap = self._crushmap
+ for bucket in self._buckets:
+ if not bucket.default:
+ tmp_crushmap = "{}\n\n{}".format(
+ tmp_crushmap,
+ Crushmap.bucket_string(bucket.name, bucket.id))
+
+ return tmp_crushmap
+
+ @staticmethod
+ def bucket_string(name, id):
+ return CRUSH_BUCKET.format(name=name, id=id)
+
+
+class CRUSHBucket(object):
+ """CRUSH bucket description object."""
+
+ def __init__(self, name, id, default=False):
+ self.name = name
+ self.id = int(id)
+ self.default = default
+
+ def __repr__(self):
+ return "Bucket {{Name: {name}, ID: {id}}}".format(
+ name=self.name, id=self.id)
+
+ def __eq__(self, other):
+ """Override the default Equals behavior"""
+ if isinstance(other, self.__class__):
+ return self.__dict__ == other.__dict__
+ return NotImplemented
+
+ def __ne__(self, other):
+ """Define a non-equality test"""
+ if isinstance(other, self.__class__):
+ return not self.__eq__(other)
+ return NotImplemented
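
The `Crushmap` class above is what `handle_put_osd_in_bucket()` in broker.py drives. Rough usage looks like the sketch below; it shells out to `ceph` and `crushtool`, so it is illustrative rather than runnable outside a deployed monitor (the bucket name is made up):

```python
# Illustrative only: needs the 'ceph' and 'crushtool' binaries and a live cluster.
from ceph.crush_utils import Crushmap

crushmap = Crushmap()                          # decompiles and parses the current map
crushmap.ensure_bucket_is_present('fast-ssd')  # adds root bucket + rule if absent, then saves
print([bucket.name for bucket in crushmap.buckets()])
```
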
diff --git a/lib/ceph/utils.py b/lib/ceph/utils.py
new file mode 100644
index 00000000..98320acb
--- /dev/null
+++ b/lib/ceph/utils.py
@@ -0,0 +1,2729 @@
+# Copyright 2017 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import glob
+import json
+import os
+import pyudev
+import random
+import re
+import socket
+import subprocess
+import sys
+import time
+import uuid
+
+from datetime import datetime
+
+from charmhelpers.core import hookenv
+from charmhelpers.core import templating
+from charmhelpers.core.decorators import retry_on_exception
+from charmhelpers.core.host import (
+ chownr,
+ cmp_pkgrevno,
+ lsb_release,
+ mkdir,
+ owner,
+ service_restart,
+ service_start,
+ service_stop,
+ CompareHostReleases,
+)
+from charmhelpers.core.hookenv import (
+ cached,
+ config,
+ log,
+ status_set,
+ DEBUG,
+ ERROR,
+ WARNING,
+ storage_get,
+ storage_list,
+)
+from charmhelpers.fetch import (
+ apt_cache,
+ add_source, apt_install, apt_update
+)
+from charmhelpers.contrib.storage.linux.ceph import (
+ get_mon_map,
+ monitor_key_set,
+ monitor_key_exists,
+ monitor_key_get,
+)
+from charmhelpers.contrib.storage.linux.utils import (
+ is_block_device,
+ is_device_mounted,
+)
+from charmhelpers.contrib.openstack.utils import (
+ get_os_codename_install_source,
+)
+from charmhelpers.contrib.storage.linux import lvm
+from charmhelpers.core.unitdata import kv
+
+CEPH_BASE_DIR = os.path.join(os.sep, 'var', 'lib', 'ceph')
+OSD_BASE_DIR = os.path.join(CEPH_BASE_DIR, 'osd')
+HDPARM_FILE = os.path.join(os.sep, 'etc', 'hdparm.conf')
+
+LEADER = 'leader'
+PEON = 'peon'
+QUORUM = [LEADER, PEON]
+
+PACKAGES = ['ceph', 'gdisk', 'btrfs-tools',
+ 'radosgw', 'xfsprogs',
+ 'lvm2', 'parted']
+
+CEPH_KEY_MANAGER = 'ceph'
+VAULT_KEY_MANAGER = 'vault'
+KEY_MANAGERS = [
+ CEPH_KEY_MANAGER,
+ VAULT_KEY_MANAGER,
+]
+
+LinkSpeed = {
+ "BASE_10": 10,
+ "BASE_100": 100,
+ "BASE_1000": 1000,
+ "GBASE_10": 10000,
+ "GBASE_40": 40000,
+ "GBASE_100": 100000,
+ "UNKNOWN": None
+}
+
+# Mapping of adapter speed to sysctl settings
+NETWORK_ADAPTER_SYSCTLS = {
+ # 10Gb
+ LinkSpeed["GBASE_10"]: {
+ 'net.core.rmem_default': 524287,
+ 'net.core.wmem_default': 524287,
+ 'net.core.rmem_max': 524287,
+ 'net.core.wmem_max': 524287,
+ 'net.core.optmem_max': 524287,
+ 'net.core.netdev_max_backlog': 300000,
+ 'net.ipv4.tcp_rmem': '10000000 10000000 10000000',
+ 'net.ipv4.tcp_wmem': '10000000 10000000 10000000',
+ 'net.ipv4.tcp_mem': '10000000 10000000 10000000'
+ },
+ # Mellanox 10/40Gb
+ LinkSpeed["GBASE_40"]: {
+ 'net.ipv4.tcp_timestamps': 0,
+ 'net.ipv4.tcp_sack': 1,
+ 'net.core.netdev_max_backlog': 250000,
+ 'net.core.rmem_max': 4194304,
+ 'net.core.wmem_max': 4194304,
+ 'net.core.rmem_default': 4194304,
+ 'net.core.wmem_default': 4194304,
+ 'net.core.optmem_max': 4194304,
+ 'net.ipv4.tcp_rmem': '4096 87380 4194304',
+ 'net.ipv4.tcp_wmem': '4096 65536 4194304',
+ 'net.ipv4.tcp_low_latency': 1,
+ 'net.ipv4.tcp_adv_win_scale': 1
+ }
+}
+
+
+class Partition(object):
+ def __init__(self, name, number, size, start, end, sectors, uuid):
+ """A block device partition.
+
+ :param name: Name of block device
+ :param number: Partition number
+ :param size: Capacity of the device
+ :param start: Starting block
+ :param end: Ending block
+ :param sectors: Number of blocks
+ :param uuid: UUID of the partition
+ """
+ self.name = name
+ self.number = number
+ self.size = size
+ self.start = start
+ self.end = end
+ self.sectors = sectors
+ self.uuid = uuid
+
+ def __str__(self):
+ return "number: {} start: {} end: {} sectors: {} size: {} " \
+ "name: {} uuid: {}".format(self.number, self.start,
+ self.end,
+ self.sectors, self.size,
+ self.name, self.uuid)
+
+ def __eq__(self, other):
+ if isinstance(other, self.__class__):
+ return self.__dict__ == other.__dict__
+ return False
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+
+def unmounted_disks():
+ """List of unmounted block devices on the current host."""
+ disks = []
+ context = pyudev.Context()
+ for device in context.list_devices(DEVTYPE='disk'):
+ if device['SUBSYSTEM'] == 'block':
+ matched = False
+ for block_type in [u'dm', u'loop', u'ram', u'nbd']:
+ if block_type in device.device_node:
+ matched = True
+ if matched:
+ continue
+ disks.append(device.device_node)
+ log("Found disks: {}".format(disks))
+ return [disk for disk in disks if not is_device_mounted(disk)]
+
+
+def save_sysctls(sysctl_dict, save_location):
+ """Persist the sysctls to the hard drive.
+
+ :param sysctl_dict: dict
+ :param save_location: path to save the settings to
+ :raises: IOError if anything goes wrong with writing.
+ """
+ try:
+ # Persist the settings for reboots
+ with open(save_location, "w") as fd:
+ for key, value in sysctl_dict.items():
+ fd.write("{}={}\n".format(key, value))
+
+ except IOError as e:
+ log("Unable to persist sysctl settings to {}. Error {}".format(
+ save_location, e), level=ERROR)
+ raise
+
+
+def tune_nic(network_interface):
+ """This will set optimal sysctls for the particular network adapter.
+
+ :param network_interface: string The network adapter name.
+ """
+ speed = get_link_speed(network_interface)
+ if speed in NETWORK_ADAPTER_SYSCTLS:
+ status_set('maintenance', 'Tuning device {}'.format(
+ network_interface))
+ sysctl_file = os.path.join(
+ os.sep,
+ 'etc',
+ 'sysctl.d',
+ '51-ceph-osd-charm-{}.conf'.format(network_interface))
+ try:
+ log("Saving sysctl_file: {} values: {}".format(
+ sysctl_file, NETWORK_ADAPTER_SYSCTLS[speed]),
+ level=DEBUG)
+ save_sysctls(sysctl_dict=NETWORK_ADAPTER_SYSCTLS[speed],
+ save_location=sysctl_file)
+ except IOError as e:
+ log("Write to /etc/sysctl.d/51-ceph-osd-charm-{} "
+ "failed. {}".format(network_interface, e),
+ level=ERROR)
+
+ try:
+ # Apply the settings
+ log("Applying sysctl settings", level=DEBUG)
+ subprocess.check_output(["sysctl", "-p", sysctl_file])
+ except subprocess.CalledProcessError as err:
+ log('sysctl -p {} failed with error {}'.format(sysctl_file,
+ err.output),
+ level=ERROR)
+ else:
+ log("No settings found for network adapter: {}".format(
+ network_interface), level=DEBUG)
+
+
+def get_link_speed(network_interface):
+ """This will find the link speed for a given network device. Returns None
+ if an error occurs.
+ :param network_interface: string The network adapter interface.
+ :returns: LinkSpeed
+ """
+ speed_path = os.path.join(os.sep, 'sys', 'class', 'net',
+ network_interface, 'speed')
+ # I'm not sure where else we'd check if this doesn't exist
+ if not os.path.exists(speed_path):
+ return LinkSpeed["UNKNOWN"]
+
+ try:
+ with open(speed_path, 'r') as sysfs:
+ nic_speed = sysfs.readlines()
+
+ # Did we actually read anything?
+ if not nic_speed:
+ return LinkSpeed["UNKNOWN"]
+
+ # Try to find a sysctl match for this particular speed
+ for name, speed in LinkSpeed.items():
+ if speed == int(nic_speed[0].strip()):
+ return speed
+ # Default to UNKNOWN if we can't find a match
+ return LinkSpeed["UNKNOWN"]
+ except IOError as e:
+ log("Unable to open {path} because of error: {error}".format(
+ path=speed_path,
+ error=e), level='error')
+ return LinkSpeed["UNKNOWN"]
+
+
+def persist_settings(settings_dict):
+ # Write all settings to /etc/hdparm.conf
+ """ This will persist the hard drive settings to the /etc/hdparm.conf file
+
+ The settings_dict should be in the form of {"uuid": {"key":"value"}}
+
+ :param settings_dict: dict of settings to save
+ """
+ if not settings_dict:
+ return
+
+ try:
+ templating.render(source='hdparm.conf', target=HDPARM_FILE,
+ context=settings_dict)
+ except IOError as err:
+ log("Unable to open {path} because of error: {error}".format(
+ path=HDPARM_FILE, error=err), level=ERROR)
+ except Exception as e:
+ # The templating.render can raise a jinja2 exception if the
+ # template is not found. Rather than polluting the import
+ # space of this charm, simply catch Exception
+ log('Unable to render {path} due to error: {error}'.format(
+ path=HDPARM_FILE, error=e), level=ERROR)
+
+
+def set_max_sectors_kb(dev_name, max_sectors_size):
+ """This function sets the max_sectors_kb size of a given block device.
+
+ :param dev_name: Name of the block device to query
+ :param max_sectors_size: int of the max_sectors_size to save
+ """
+ max_sectors_kb_path = os.path.join('sys', 'block', dev_name, 'queue',
+ 'max_sectors_kb')
+ try:
+ with open(max_sectors_kb_path, 'w') as f:
+ f.write(max_sectors_size)
+ except IOError as e:
+ log('Failed to write max_sectors_kb to {}. Error: {}'.format(
+ max_sectors_kb_path, e), level=ERROR)
+
+
+def get_max_sectors_kb(dev_name):
+ """This function gets the max_sectors_kb size of a given block device.
+
+ :param dev_name: Name of the block device to query
+ :returns: int which is either the max_sectors_kb or 0 on error.
+ """
+ max_sectors_kb_path = os.path.join('sys', 'block', dev_name, 'queue',
+ 'max_sectors_kb')
+
+ # Read in what Linux has set by default
+ if os.path.exists(max_sectors_kb_path):
+ try:
+ with open(max_sectors_kb_path, 'r') as f:
+ max_sectors_kb = f.read().strip()
+ return int(max_sectors_kb)
+ except IOError as e:
+ log('Failed to read max_sectors_kb to {}. Error: {}'.format(
+ max_sectors_kb_path, e), level=ERROR)
+ # Bail.
+ return 0
+ return 0
+
+
+def get_max_hw_sectors_kb(dev_name):
+ """This function gets the max_hw_sectors_kb for a given block device.
+
+ :param dev_name: Name of the block device to query
+ :returns: int which is either the max_hw_sectors_kb or 0 on error.
+ """
+ max_hw_sectors_kb_path = os.path.join('sys', 'block', dev_name, 'queue',
+ 'max_hw_sectors_kb')
+ # Read in what the hardware supports
+ if os.path.exists(max_hw_sectors_kb_path):
+ try:
+ with open(max_hw_sectors_kb_path, 'r') as f:
+ max_hw_sectors_kb = f.read().strip()
+ return int(max_hw_sectors_kb)
+ except IOError as e:
+ log('Failed to read max_hw_sectors_kb to {}. Error: {}'.format(
+ max_hw_sectors_kb_path, e), level=ERROR)
+ return 0
+ return 0
+
+
+def set_hdd_read_ahead(dev_name, read_ahead_sectors=256):
+ """This function sets the hard drive read ahead.
+
+ :param dev_name: Name of the block device to set read ahead on.
+ :param read_ahead_sectors: int How many sectors to read ahead.
+ """
+ try:
+ # Set the read ahead sectors to 256
+ log('Setting read ahead to {} for device {}'.format(
+ read_ahead_sectors,
+ dev_name))
+ subprocess.check_output(['hdparm',
+ '-a{}'.format(read_ahead_sectors),
+ dev_name])
+ except subprocess.CalledProcessError as e:
+ log('hdparm failed with error: {}'.format(e.output),
+ level=ERROR)
+
+
+def get_block_uuid(block_dev):
+ """This queries blkid to get the uuid for a block device.
+
+ :param block_dev: Name of the block device to query.
+ :returns: The UUID of the device or None on Error.
+ """
+ try:
+ block_info = str(subprocess
+ .check_output(['blkid', '-o', 'export', block_dev])
+ .decode('UTF-8'))
+ for tag in block_info.split('\n'):
+ parts = tag.split('=')
+ if parts[0] == 'UUID':
+ return parts[1]
+ return None
+ except subprocess.CalledProcessError as err:
+ log('get_block_uuid failed with error: {}'.format(err.output),
+ level=ERROR)
+ return None
+
+
+def check_max_sectors(save_settings_dict,
+ block_dev,
+ uuid):
+ """Tune the max_hw_sectors if needed.
+
+ make sure that /sys/.../max_sectors_kb matches max_hw_sectors_kb or at
+ least 1MB for spinning disks
+ If the box has a RAID card with cache this could go much bigger.
+
+ :param save_settings_dict: The dict used to persist settings
+ :param block_dev: A block device name: Example: /dev/sda
+ :param uuid: The uuid of the block device
+ """
+ dev_name = None
+ path_parts = os.path.split(block_dev)
+ if len(path_parts) == 2:
+ dev_name = path_parts[1]
+ else:
+ log('Unable to determine the block device name from path: {}'.format(
+ block_dev))
+ # Play it safe and bail
+ return
+ max_sectors_kb = get_max_sectors_kb(dev_name=dev_name)
+ max_hw_sectors_kb = get_max_hw_sectors_kb(dev_name=dev_name)
+
+ if max_sectors_kb < max_hw_sectors_kb:
+ # OK we have a situation where the hardware supports more than Linux is
+ # currently requesting
+ config_max_sectors_kb = hookenv.config('max-sectors-kb')
+ if config_max_sectors_kb < max_hw_sectors_kb:
+ # Set the max_sectors_kb to the config.yaml value if it is less
+ # than the max_hw_sectors_kb
+ log('Setting max_sectors_kb for device {} to {}'.format(
+ dev_name, config_max_sectors_kb))
+ save_settings_dict[
+ "drive_settings"][uuid][
+ "read_ahead_sect"] = config_max_sectors_kb
+ set_max_sectors_kb(dev_name=dev_name,
+ max_sectors_size=config_max_sectors_kb)
+ else:
+ # Set to the max_hw_sectors_kb
+ log('Setting max_sectors_kb for device {} to {}'.format(
+ dev_name, max_hw_sectors_kb))
+ save_settings_dict[
+ "drive_settings"][uuid]['read_ahead_sect'] = max_hw_sectors_kb
+ set_max_sectors_kb(dev_name=dev_name,
+ max_sectors_size=max_hw_sectors_kb)
+ else:
+ log('max_sectors_kb match max_hw_sectors_kb. No change needed for '
+ 'device: {}'.format(block_dev))
+
+
+def tune_dev(block_dev):
+ """Try to make some intelligent decisions with HDD tuning. Future work will
+ include optimizing SSDs.
+
+ This function will change the read ahead sectors and the max write
+ sectors for each block device.
+
+ :param block_dev: A block device name: Example: /dev/sda
+ """
+ uuid = get_block_uuid(block_dev)
+ if uuid is None:
+ log('block device {} uuid is None. Unable to save to '
+ 'hdparm.conf'.format(block_dev), level=DEBUG)
+ return
+ save_settings_dict = {}
+ log('Tuning device {}'.format(block_dev))
+ status_set('maintenance', 'Tuning device {}'.format(block_dev))
+ set_hdd_read_ahead(block_dev)
+ save_settings_dict["drive_settings"] = {}
+ save_settings_dict["drive_settings"][uuid] = {}
+ save_settings_dict["drive_settings"][uuid]['read_ahead_sect'] = 256
+
+ check_max_sectors(block_dev=block_dev,
+ save_settings_dict=save_settings_dict,
+ uuid=uuid)
+
+ persist_settings(settings_dict=save_settings_dict)
+ status_set('maintenance', 'Finished tuning device {}'.format(block_dev))
+
+
+def ceph_user():
+ if get_version() > 1:
+ return 'ceph'
+ else:
+ return "root"
+
+
+class CrushLocation(object):
+ def __init__(self,
+ name,
+ identifier,
+ host,
+ rack,
+ row,
+ datacenter,
+ chassis,
+ root):
+ self.name = name
+ self.identifier = identifier
+ self.host = host
+ self.rack = rack
+ self.row = row
+ self.datacenter = datacenter
+ self.chassis = chassis
+ self.root = root
+
+ def __str__(self):
+ return "name: {} id: {} host: {} rack: {} row: {} datacenter: {} " \
+ "chassis :{} root: {}".format(self.name, self.identifier,
+ self.host, self.rack, self.row,
+ self.datacenter, self.chassis,
+ self.root)
+
+ def __eq__(self, other):
+ return not self.name < other.name and not other.name < self.name
+
+ def __ne__(self, other):
+ return self.name < other.name or other.name < self.name
+
+ def __gt__(self, other):
+ return self.name > other.name
+
+ def __ge__(self, other):
+ return not self.name < other.name
+
+ def __le__(self, other):
+ return self.name < other.name
+
+
+def get_osd_weight(osd_id):
+ """Returns the weight of the specified OSD.
+
+ :returns: Float
+ :raises: ValueError if the monmap fails to parse.
+ :raises: CalledProcessError if our ceph command fails.
+ """
+ try:
+ tree = str(subprocess
+ .check_output(['ceph', 'osd', 'tree', '--format=json'])
+ .decode('UTF-8'))
+ try:
+ json_tree = json.loads(tree)
+ # Make sure children are present in the json
+ if not json_tree['nodes']:
+ return None
+ for device in json_tree['nodes']:
+ if device['type'] == 'osd' and device['name'] == osd_id:
+ return device['crush_weight']
+ except ValueError as v:
+ log("Unable to parse ceph tree json: {}. Error: {}".format(
+ tree, v))
+ raise
+ except subprocess.CalledProcessError as e:
+ log("ceph osd tree command failed with message: {}".format(
+ e))
+ raise
+
+
+def get_osd_tree(service):
+ """Returns the current osd map in JSON.
+
+ :returns: List.
+ :raises: ValueError if the monmap fails to parse.
+ Also raises CalledProcessError if our ceph command fails
+ """
+ try:
+ tree = str(subprocess
+ .check_output(['ceph', '--id', service,
+ 'osd', 'tree', '--format=json'])
+ .decode('UTF-8'))
+ try:
+ json_tree = json.loads(tree)
+ crush_list = []
+ # Make sure children are present in the json
+ if not json_tree['nodes']:
+ return None
+ host_nodes = [
+ node for node in json_tree['nodes']
+ if node['type'] == 'host'
+ ]
+ for host in host_nodes:
+ crush_list.append(
+ CrushLocation(
+ name=host.get('name'),
+ identifier=host['id'],
+ host=host.get('host'),
+ rack=host.get('rack'),
+ row=host.get('row'),
+ datacenter=host.get('datacenter'),
+ chassis=host.get('chassis'),
+ root=host.get('root')
+ )
+ )
+ return crush_list
+ except ValueError as v:
+ log("Unable to parse ceph tree json: {}. Error: {}".format(
+ tree, v))
+ raise
+ except subprocess.CalledProcessError as e:
+ log("ceph osd tree command failed with message: {}".format(
+ e))
+ raise
+
+
+def _get_child_dirs(path):
+ """Returns a list of directory names in the specified path.
+
+ :param path: a full path listing of the parent directory to return child
+ directory names
+ :returns: list. A list of child directories under the parent directory
+ :raises: ValueError if the specified path does not exist or is not a
+ directory,
+ OSError if an error occurs reading the directory listing
+ """
+ if not os.path.exists(path):
+ raise ValueError('Specfied path "%s" does not exist' % path)
+ if not os.path.isdir(path):
+ raise ValueError('Specified path "%s" is not a directory' % path)
+
+ files_in_dir = [os.path.join(path, f) for f in os.listdir(path)]
+ return list(filter(os.path.isdir, files_in_dir))
+
+
+def _get_osd_num_from_dirname(dirname):
+ """Parses the dirname and returns the OSD id.
+
+ Parses a string in the form of 'ceph-{osd#}' and returns the osd number
+ from the directory name.
+
+ :param dirname: the directory name to return the OSD number from
+ :return int: the osd number the directory name corresponds to
+ :raises ValueError: if the osd number cannot be parsed from the provided
+ directory name.
+ """
+ match = re.search(r'ceph-(?P<osd_id>\d+)', dirname)
+ if not match:
+ raise ValueError("dirname not in correct format: {}".format(dirname))
+
+ return match.group('osd_id')
+
+
+def get_local_osd_ids():
+ """This will list the /var/lib/ceph/osd/* directories and try
+ to split the ID off of the directory name and return it in
+ a list.
+
+ :returns: list. A list of osd identifiers
+ :raises: OSError if something goes wrong with listing the directory.
+ """
+ osd_ids = []
+ osd_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'osd')
+ if os.path.exists(osd_path):
+ try:
+ dirs = os.listdir(osd_path)
+ for osd_dir in dirs:
+ osd_id = osd_dir.split('-')[1]
+ if _is_int(osd_id):
+ osd_ids.append(osd_id)
+ except OSError:
+ raise
+ return osd_ids
+
+
+def get_local_mon_ids():
+ """This will list the /var/lib/ceph/mon/* directories and try
+ to split the ID off of the directory name and return it in
+ a list.
+
+ :returns: list. A list of monitor identifiers
+ :raises: OSError if something goes wrong with listing the directory.
+ """
+ mon_ids = []
+ mon_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'mon')
+ if os.path.exists(mon_path):
+ try:
+ dirs = os.listdir(mon_path)
+ for mon_dir in dirs:
+ # Basically this takes everything after ceph- as the monitor ID
+ match = re.search(r'ceph-(?P<mon_id>.*)', mon_dir)
+ if match:
+ mon_ids.append(match.group('mon_id'))
+ except OSError:
+ raise
+ return mon_ids
+
+
+def _is_int(v):
+ """Return True if the object v can be turned into an integer."""
+ try:
+ int(v)
+ return True
+ except ValueError:
+ return False
+
+
+def get_version():
+ """Derive Ceph release from an installed package."""
+ import apt_pkg as apt
+
+ cache = apt_cache()
+ package = "ceph"
+ try:
+ pkg = cache[package]
+ except:
+ # the package is unknown to the current apt cache.
+ e = 'Could not determine version of package with no installation ' \
+ 'candidate: %s' % package
+ error_out(e)
+
+ if not pkg.current_ver:
+ # package is known, but no version is currently installed.
+ e = 'Could not determine version of uninstalled package: %s' % package
+ error_out(e)
+
+ vers = apt.upstream_version(pkg.current_ver.ver_str)
+
+ # x.y match only for 20XX.X
+ # and ignore patch level for other packages
+ match = re.match('^(\d+)\.(\d+)', vers)
+
+ if match:
+ vers = match.group(0)
+ return float(vers)
+
+
+def error_out(msg):
+ log("FATAL ERROR: {}".format(msg),
+ level=ERROR)
+ sys.exit(1)
+
+
+def is_quorum():
+ asok = "/var/run/ceph/ceph-mon.{}.asok".format(socket.gethostname())
+ cmd = [
+ "sudo",
+ "-u",
+ ceph_user(),
+ "ceph",
+ "--admin-daemon",
+ asok,
+ "mon_status"
+ ]
+ if os.path.exists(asok):
+ try:
+ result = json.loads(str(subprocess
+ .check_output(cmd)
+ .decode('UTF-8')))
+ except subprocess.CalledProcessError:
+ return False
+ except ValueError:
+ # Non JSON response from mon_status
+ return False
+ if result['state'] in QUORUM:
+ return True
+ else:
+ return False
+ else:
+ return False
+
+
+def is_leader():
+ asok = "/var/run/ceph/ceph-mon.{}.asok".format(socket.gethostname())
+ cmd = [
+ "sudo",
+ "-u",
+ ceph_user(),
+ "ceph",
+ "--admin-daemon",
+ asok,
+ "mon_status"
+ ]
+ if os.path.exists(asok):
+ try:
+ result = json.loads(str(subprocess
+ .check_output(cmd)
+ .decode('UTF-8')))
+ except subprocess.CalledProcessError:
+ return False
+ except ValueError:
+ # Non JSON response from mon_status
+ return False
+ if result['state'] == LEADER:
+ return True
+ else:
+ return False
+ else:
+ return False
+
+
+def wait_for_quorum():
+ while not is_quorum():
+ log("Waiting for quorum to be reached")
+ time.sleep(3)
+
+
+def add_bootstrap_hint(peer):
+ asok = "/var/run/ceph/ceph-mon.{}.asok".format(socket.gethostname())
+ cmd = [
+ "sudo",
+ "-u",
+ ceph_user(),
+ "ceph",
+ "--admin-daemon",
+ asok,
+ "add_bootstrap_peer_hint",
+ peer
+ ]
+ if os.path.exists(asok):
+ # Ignore any errors for this call
+ subprocess.call(cmd)
+
+
+DISK_FORMATS = [
+ 'xfs',
+ 'ext4',
+ 'btrfs'
+]
+
+CEPH_PARTITIONS = [
+ '89C57F98-2FE5-4DC0-89C1-5EC00CEFF2BE', # ceph encrypted disk in creation
+ '45B0969E-9B03-4F30-B4C6-5EC00CEFF106', # ceph encrypted journal
+ '4FBD7E29-9D25-41B8-AFD0-5EC00CEFF05D', # ceph encrypted osd data
+ '4FBD7E29-9D25-41B8-AFD0-062C0CEFF05D', # ceph osd data
+ '45B0969E-9B03-4F30-B4C6-B4B80CEFF106', # ceph osd journal
+ '89C57F98-2FE5-4DC0-89C1-F3AD0CEFF2BE', # ceph disk in creation
+]
+
+
+def get_partition_list(dev):
+ """Lists the partitions of a block device.
+
+ :param dev: Path to a block device. ex: /dev/sda
+ :returns: Returns a list of Partition objects.
+    :raises: subprocess.CalledProcessError if partx fails
+ """
+ partitions_list = []
+ try:
+ partitions = get_partitions(dev)
+ # For each line of output
+ for partition in partitions:
+ parts = partition.split()
+ try:
+ partitions_list.append(
+ Partition(number=parts[0],
+ start=parts[1],
+ end=parts[2],
+ sectors=parts[3],
+ size=parts[4],
+ name=parts[5],
+ uuid=parts[6])
+ )
+ except IndexError:
+ partitions_list.append(
+ Partition(number=parts[0],
+ start=parts[1],
+ end=parts[2],
+ sectors=parts[3],
+ size=parts[4],
+ name="",
+ uuid=parts[5])
+ )
+
+ return partitions_list
+ except subprocess.CalledProcessError:
+ raise
+
+
+def is_pristine_disk(dev):
+ """
+ Read first 2048 bytes (LBA 0 - 3) of block device to determine whether it
+ is actually all zeros and safe for us to use.
+
+    Existing partitioning tools do not discern between a failure to read from
+    a block device, a failure to understand a partition table, and the fact
+    that a block device has no partition table. Since we need to be positive
+    about which is which, we read the device directly and confirm ourselves.
+
+ :param dev: Path to block device
+ :type dev: str
+    :returns: True if all 2048 bytes == 0x0, False if not
+ :rtype: bool
+ """
+ want_bytes = 2048
+
+    with open(dev, 'rb') as f:
+        data = f.read(want_bytes)
+ read_bytes = len(data)
+ if read_bytes != want_bytes:
+ log('{}: short read, got {} bytes expected {}.'
+ .format(dev, read_bytes, want_bytes), level=WARNING)
+ return False
+
+ return all(byte == 0x0 for byte in data)
+
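+# Minimal usage sketch (device path is hypothetical): callers can gate any
+# destructive preparation on the check above, e.g.
+#   if is_pristine_disk('/dev/vdb'):
+#       log('/dev/vdb has no data in its first 2048 bytes; safe to prepare')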
+
+def is_osd_disk(dev):
+ db = kv()
+ osd_devices = db.get('osd-devices', [])
+ if dev in osd_devices:
+ log('Device {} already processed by charm,'
+ ' skipping'.format(dev))
+ return True
+
+ partitions = get_partition_list(dev)
+ for partition in partitions:
+ try:
+ info = str(subprocess
+ .check_output(['sgdisk', '-i', partition.number, dev])
+ .decode('UTF-8'))
+ info = info.split("\n") # IGNORE:E1103
+ for line in info:
+ for ptype in CEPH_PARTITIONS:
+ sig = 'Partition GUID code: {}'.format(ptype)
+ if line.startswith(sig):
+ return True
+ except subprocess.CalledProcessError as e:
+            log("sgdisk inspection of partition {} on {} failed with "
+                "error: {}. Skipping".format(partition.number, dev, e),
+                level=ERROR)
+ return False
+
+
+def start_osds(devices):
+ # Scan for ceph block devices
+ rescan_osd_devices()
+ if cmp_pkgrevno('ceph', "0.56.6") >= 0:
+        # Use ceph-disk activate for directory-based OSDs
+ for dev_or_path in devices:
+ if os.path.exists(dev_or_path) and os.path.isdir(dev_or_path):
+ subprocess.check_call(['ceph-disk', 'activate', dev_or_path])
+
+
+def udevadm_settle():
+ cmd = ['udevadm', 'settle']
+ subprocess.call(cmd)
+
+
+def rescan_osd_devices():
+ cmd = [
+ 'udevadm', 'trigger',
+ '--subsystem-match=block', '--action=add'
+ ]
+
+ subprocess.call(cmd)
+
+ udevadm_settle()
+
+
+_bootstrap_keyring = "/var/lib/ceph/bootstrap-osd/ceph.keyring"
+_upgrade_keyring = "/var/lib/ceph/osd/ceph.client.osd-upgrade.keyring"
+
+
+def is_bootstrapped():
+ return os.path.exists(_bootstrap_keyring)
+
+
+def wait_for_bootstrap():
+ while not is_bootstrapped():
+ time.sleep(3)
+
+
+def import_osd_bootstrap_key(key):
+ if not os.path.exists(_bootstrap_keyring):
+ cmd = [
+ "sudo",
+ "-u",
+ ceph_user(),
+ 'ceph-authtool',
+ _bootstrap_keyring,
+ '--create-keyring',
+ '--name=client.bootstrap-osd',
+ '--add-key={}'.format(key)
+ ]
+ subprocess.check_call(cmd)
+
+
+def import_osd_upgrade_key(key):
+ if not os.path.exists(_upgrade_keyring):
+ cmd = [
+ "sudo",
+ "-u",
+ ceph_user(),
+ 'ceph-authtool',
+ _upgrade_keyring,
+ '--create-keyring',
+ '--name=client.osd-upgrade',
+ '--add-key={}'.format(key)
+ ]
+ subprocess.check_call(cmd)
+
+
+def generate_monitor_secret():
+ cmd = [
+ 'ceph-authtool',
+ '/dev/stdout',
+ '--name=mon.',
+ '--gen-key'
+ ]
+ res = str(subprocess.check_output(cmd).decode('UTF-8'))
+
+ return "{}==".format(res.split('=')[1].strip())
+
+# OSD caps taken from ceph-create-keys
+_osd_bootstrap_caps = {
+ 'mon': [
+ 'allow command osd create ...',
+ 'allow command osd crush set ...',
+ r'allow command auth add * osd allow\ * mon allow\ rwx',
+ 'allow command mon getmap'
+ ]
+}
+
+_osd_bootstrap_caps_profile = {
+ 'mon': [
+ 'allow profile bootstrap-osd'
+ ]
+}
+
+
+def parse_key(raw_key):
+ # get-or-create appears to have different output depending
+    # on whether it's 'get' or 'create'
+ # 'create' just returns the key, 'get' is more verbose and
+ # needs parsing
+ key = None
+ if len(raw_key.splitlines()) == 1:
+ key = raw_key
+ else:
+ for element in raw_key.splitlines():
+ if 'key' in element:
+ return element.split(' = ')[1].strip() # IGNORE:E1103
+ return key
+
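+# Sketch of the two output shapes parse_key() accepts (key material below is
+# made up): a bare single-line key such as 'AQCEXvZb...==' is returned as-is
+# ('create' form), while the multi-line 'ceph auth get' style output is
+# scanned for its 'key = <value>' line and only the value is returned.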
+
+def get_osd_bootstrap_key():
+ try:
+ # Attempt to get/create a key using the OSD bootstrap profile first
+ key = get_named_key('bootstrap-osd',
+ _osd_bootstrap_caps_profile)
+    except Exception:
+ # If that fails try with the older style permissions
+ key = get_named_key('bootstrap-osd',
+ _osd_bootstrap_caps)
+ return key
+
+
+_radosgw_keyring = "/etc/ceph/keyring.rados.gateway"
+
+
+def import_radosgw_key(key):
+ if not os.path.exists(_radosgw_keyring):
+ cmd = [
+ "sudo",
+ "-u",
+ ceph_user(),
+ 'ceph-authtool',
+ _radosgw_keyring,
+ '--create-keyring',
+ '--name=client.radosgw.gateway',
+ '--add-key={}'.format(key)
+ ]
+ subprocess.check_call(cmd)
+
+# OSD caps taken from ceph-create-keys
+_radosgw_caps = {
+ 'mon': ['allow rw'],
+ 'osd': ['allow rwx']
+}
+_upgrade_caps = {
+ 'mon': ['allow rwx']
+}
+
+
+def get_radosgw_key(pool_list=None, name=None):
+ return get_named_key(name=name or 'radosgw.gateway',
+ caps=_radosgw_caps,
+ pool_list=pool_list)
+
+
+def get_mds_key(name):
+ return create_named_keyring(entity='mds',
+ name=name,
+ caps=mds_caps)
+
+
+_mds_bootstrap_caps_profile = {
+ 'mon': [
+ 'allow profile bootstrap-mds'
+ ]
+}
+
+
+def get_mds_bootstrap_key():
+ return get_named_key('bootstrap-mds',
+ _mds_bootstrap_caps_profile)
+
+
+_default_caps = collections.OrderedDict([
+ ('mon', ['allow r',
+ 'allow command "osd blacklist"']),
+ ('osd', ['allow rwx']),
+])
+
+admin_caps = collections.OrderedDict([
+ ('mds', ['allow *']),
+ ('mon', ['allow *']),
+ ('osd', ['allow *'])
+])
+
+mds_caps = collections.OrderedDict([
+ ('osd', ['allow *']),
+ ('mds', ['allow']),
+ ('mon', ['allow rwx']),
+])
+
+osd_upgrade_caps = collections.OrderedDict([
+ ('mon', ['allow command "config-key"',
+ 'allow command "osd tree"',
+ 'allow command "config-key list"',
+ 'allow command "config-key put"',
+ 'allow command "config-key get"',
+ 'allow command "config-key exists"',
+ 'allow command "osd out"',
+ 'allow command "osd in"',
+ 'allow command "osd rm"',
+ 'allow command "auth del"',
+ ])
+])
+
+
+def create_named_keyring(entity, name, caps=None):
+ caps = caps or _default_caps
+ cmd = [
+ "sudo",
+ "-u",
+ ceph_user(),
+ 'ceph',
+ '--name', 'mon.',
+ '--keyring',
+ '/var/lib/ceph/mon/ceph-{}/keyring'.format(
+ socket.gethostname()
+ ),
+ 'auth', 'get-or-create', '{entity}.{name}'.format(entity=entity,
+ name=name),
+ ]
+ for subsystem, subcaps in caps.items():
+ cmd.extend([subsystem, '; '.join(subcaps)])
+ log("Calling check_output: {}".format(cmd), level=DEBUG)
+ return (parse_key(str(subprocess
+ .check_output(cmd)
+ .decode('UTF-8'))
+ .strip())) # IGNORE:E1103
+
+
+def get_upgrade_key():
+ return get_named_key('upgrade-osd', _upgrade_caps)
+
+
+def get_named_key(name, caps=None, pool_list=None):
+ """Retrieve a specific named cephx key.
+
+ :param name: String Name of key to get.
+ :param pool_list: The list of pools to give access to
+ :param caps: dict of cephx capabilities
+ :returns: Returns a cephx key
+ """
+ key_name = 'client.{}'.format(name)
+ try:
+ # Does the key already exist?
+ output = str(subprocess.check_output(
+ [
+ 'sudo',
+ '-u', ceph_user(),
+ 'ceph',
+ '--name', 'mon.',
+ '--keyring',
+ '/var/lib/ceph/mon/ceph-{}/keyring'.format(
+ socket.gethostname()
+ ),
+ 'auth',
+ 'get',
+ key_name,
+ ]).decode('UTF-8')).strip()
+        # NOTE(jamespage):
+        # Apply any changes to key capabilities, dealing with
+        # upgrades which require new caps for operation.
+ upgrade_key_caps(key_name,
+ caps or _default_caps,
+ pool_list)
+ return parse_key(output)
+ except subprocess.CalledProcessError:
+ # Couldn't get the key, time to create it!
+ log("Creating new key for {}".format(name), level=DEBUG)
+ caps = caps or _default_caps
+ cmd = [
+ "sudo",
+ "-u",
+ ceph_user(),
+ 'ceph',
+ '--name', 'mon.',
+ '--keyring',
+ '/var/lib/ceph/mon/ceph-{}/keyring'.format(
+ socket.gethostname()
+ ),
+ 'auth', 'get-or-create', key_name,
+ ]
+ # Add capabilities
+ for subsystem, subcaps in caps.items():
+ if subsystem == 'osd':
+ if pool_list:
+ # This will output a string similar to:
+ # "pool=rgw pool=rbd pool=something"
+ pools = " ".join(['pool={0}'.format(i) for i in pool_list])
+ subcaps[0] = subcaps[0] + " " + pools
+ cmd.extend([subsystem, '; '.join(subcaps)])
+
+ log("Calling check_output: {}".format(cmd), level=DEBUG)
+ return parse_key(str(subprocess
+ .check_output(cmd)
+ .decode('UTF-8'))
+ .strip()) # IGNORE:E1103
+
+
+def upgrade_key_caps(key, caps, pool_list=None):
+ """ Upgrade key to have capabilities caps """
+ if not is_leader():
+ # Not the MON leader OR not clustered
+ return
+ cmd = [
+ "sudo", "-u", ceph_user(), 'ceph', 'auth', 'caps', key
+ ]
+ for subsystem, subcaps in caps.items():
+ if subsystem == 'osd':
+ if pool_list:
+ # This will output a string similar to:
+ # "pool=rgw pool=rbd pool=something"
+ pools = " ".join(['pool={0}'.format(i) for i in pool_list])
+ subcaps[0] = subcaps[0] + " " + pools
+ cmd.extend([subsystem, '; '.join(subcaps)])
+ subprocess.check_call(cmd)
+
+
+@cached
+def systemd():
+ return CompareHostReleases(lsb_release()['DISTRIB_CODENAME']) >= 'vivid'
+
+
+def bootstrap_monitor_cluster(secret):
+ hostname = socket.gethostname()
+ path = '/var/lib/ceph/mon/ceph-{}'.format(hostname)
+ done = '{}/done'.format(path)
+ if systemd():
+ init_marker = '{}/systemd'.format(path)
+ else:
+ init_marker = '{}/upstart'.format(path)
+
+ keyring = '/var/lib/ceph/tmp/{}.mon.keyring'.format(hostname)
+
+ if os.path.exists(done):
+ log('bootstrap_monitor_cluster: mon already initialized.')
+ else:
+ # Ceph >= 0.61.3 needs this for ceph-mon fs creation
+ mkdir('/var/run/ceph', owner=ceph_user(),
+ group=ceph_user(), perms=0o755)
+ mkdir(path, owner=ceph_user(), group=ceph_user(),
+ perms=0o755)
+ # end changes for Ceph >= 0.61.3
+        try:
+            add_keyring_to_ceph(keyring,
+                                secret,
+                                hostname,
+                                path,
+                                done,
+                                init_marker)
+        finally:
+            os.unlink(keyring)
+
+
+@retry_on_exception(3, base_delay=5)
+def add_keyring_to_ceph(keyring, secret, hostname, path, done, init_marker):
+ subprocess.check_call(['ceph-authtool', keyring,
+ '--create-keyring', '--name=mon.',
+ '--add-key={}'.format(secret),
+ '--cap', 'mon', 'allow *'])
+ subprocess.check_call(['ceph-mon', '--mkfs',
+ '-i', hostname,
+ '--keyring', keyring])
+ chownr('/var/log/ceph', ceph_user(), ceph_user())
+ chownr(path, ceph_user(), ceph_user())
+ with open(done, 'w'):
+ pass
+ with open(init_marker, 'w'):
+ pass
+
+ if systemd():
+ subprocess.check_call(['systemctl', 'enable', 'ceph-mon'])
+ service_restart('ceph-mon')
+ else:
+ service_restart('ceph-mon-all')
+
+ # NOTE(jamespage): Later ceph releases require explicit
+ # call to ceph-create-keys to setup the
+ # admin keys for the cluster; this command
+ # will wait for quorum in the cluster before
+ # returning.
+    # NOTE(fnordahl): Explicitly run `ceph-create-keys` for older
+ # ceph releases too. This improves bootstrap
+ # resilience as the charm will wait for
+ # presence of peer units before attempting
+ # to bootstrap. Note that charms deploying
+ # ceph-mon service should disable running of
+ # `ceph-create-keys` service in init system.
+ cmd = ['ceph-create-keys', '--id', hostname]
+ if cmp_pkgrevno('ceph', '12.0.0') >= 0:
+ # NOTE(fnordahl): The default timeout in ceph-create-keys of 600
+ # seconds is not adequate. Increase timeout when
+ # timeout parameter available. For older releases
+ # we rely on retry_on_exception decorator.
+ # LP#1719436
+ cmd.extend(['--timeout', '1800'])
+ subprocess.check_call(cmd)
+ _client_admin_keyring = '/etc/ceph/ceph.client.admin.keyring'
+ osstat = os.stat(_client_admin_keyring)
+ if not osstat.st_size:
+ # NOTE(fnordahl): Retry will fail as long as this file exists.
+ # LP#1719436
+ os.remove(_client_admin_keyring)
+        raise Exception('ceph-create-keys left {} empty; removed it so '
+                        'bootstrap can be retried'.format(
+                            _client_admin_keyring))
+
+
+def update_monfs():
+ hostname = socket.gethostname()
+ monfs = '/var/lib/ceph/mon/ceph-{}'.format(hostname)
+ if systemd():
+ init_marker = '{}/systemd'.format(monfs)
+ else:
+ init_marker = '{}/upstart'.format(monfs)
+ if os.path.exists(monfs) and not os.path.exists(init_marker):
+        # Mark mon as managed by the init system so that
+        # it gets started correctly on reboots
+ with open(init_marker, 'w'):
+ pass
+
+
+def get_partitions(dev):
+ cmd = ['partx', '--raw', '--noheadings', dev]
+ try:
+ out = str(subprocess.check_output(cmd).decode('UTF-8')).splitlines()
+ log("get partitions: {}".format(out), level=DEBUG)
+ return out
+ except subprocess.CalledProcessError as e:
+ log("Can't get info for {0}: {1}".format(dev, e.output))
+ return []
+
+
+def get_lvs(dev):
+ """
+ List logical volumes for the provided block device
+
+ :param: dev: Full path to block device.
+ :raises subprocess.CalledProcessError: in the event that any supporting
+ operation failed.
+ :returns: list: List of logical volumes provided by the block device
+ """
+ if not lvm.is_lvm_physical_volume(dev):
+ return []
+ vg_name = lvm.list_lvm_volume_group(dev)
+ return lvm.list_logical_volumes('vg_name={}'.format(vg_name))
+
+
+def find_least_used_utility_device(utility_devices, lvs=False):
+ """
+ Find a utility device which has the smallest number of partitions
+ among other devices in the supplied list.
+
+ :utility_devices: A list of devices to be used for filestore journal
+ or bluestore wal or db.
+ :lvs: flag to indicate whether inspection should be based on LVM LV's
+ :return: string device name
+ """
+ if lvs:
+ usages = map(lambda a: (len(get_lvs(a)), a), utility_devices)
+ else:
+ usages = map(lambda a: (len(get_partitions(a)), a), utility_devices)
+ least = min(usages, key=lambda t: t[0])
+ return least[1]
+
+
+def get_devices(name):
+ """ Merge config and juju storage based devices
+
+    :name: The name of the device type, eg: wal, osd, journal
+ :returns: Set(device names), which are strings
+ """
+ if config(name):
+        devices = [dev.strip() for dev in config(name).split(' ')]
+ else:
+ devices = []
+ storage_ids = storage_list(name)
+ devices.extend((storage_get('location', s) for s in storage_ids))
+ devices = filter(os.path.exists, devices)
+
+ return set(devices)
+
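+# Illustrative example (option and paths are hypothetical): with a charm
+# option 'osd-devices' set to '/dev/vdb /dev/vdc' and one Juju storage
+# attachment located at /dev/vdd, get_devices('osd-devices') yields
+# {'/dev/vdb', '/dev/vdc', '/dev/vdd'}; paths that do not exist are dropped.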
+
+def osdize(dev, osd_format, osd_journal, ignore_errors=False, encrypt=False,
+ bluestore=False, key_manager=CEPH_KEY_MANAGER):
+ if dev.startswith('/dev'):
+ osdize_dev(dev, osd_format, osd_journal,
+ ignore_errors, encrypt,
+ bluestore, key_manager)
+ else:
+ osdize_dir(dev, encrypt, bluestore)
+
+
+def osdize_dev(dev, osd_format, osd_journal, ignore_errors=False,
+ encrypt=False, bluestore=False, key_manager=CEPH_KEY_MANAGER):
+ """
+ Prepare a block device for use as a Ceph OSD
+
+ A block device will only be prepared once during the lifetime
+ of the calling charm unit; future executions will be skipped.
+
+ :param: dev: Full path to block device to use
+ :param: osd_format: Format for OSD filesystem
+ :param: osd_journal: List of block devices to use for OSD journals
+ :param: ignore_errors: Don't fail in the event of any errors during
+ processing
+ :param: encrypt: Encrypt block devices using 'key_manager'
+ :param: bluestore: Use bluestore native ceph block device format
+ :param: key_manager: Key management approach for encryption keys
+ :raises subprocess.CalledProcessError: in the event that any supporting
+ subprocess operation failed
+ :raises ValueError: if an invalid key_manager is provided
+ """
+ if key_manager not in KEY_MANAGERS:
+ raise ValueError('Unsupported key manager: {}'.format(key_manager))
+
+ db = kv()
+ osd_devices = db.get('osd-devices', [])
+ try:
+ if dev in osd_devices:
+ log('Device {} already processed by charm,'
+ ' skipping'.format(dev))
+ return
+
+ if not os.path.exists(dev):
+ log('Path {} does not exist - bailing'.format(dev))
+ return
+
+ if not is_block_device(dev):
+ log('Path {} is not a block device - bailing'.format(dev))
+ return
+
+ if is_osd_disk(dev):
+ log('Looks like {} is already an'
+ ' OSD data or journal, skipping.'.format(dev))
+ if is_device_mounted(dev):
+ osd_devices.append(dev)
+ return
+
+ if is_device_mounted(dev):
+ log('Looks like {} is in use, skipping.'.format(dev))
+ return
+
+ if is_active_bluestore_device(dev):
+ log('{} is in use as an active bluestore block device,'
+ ' skipping.'.format(dev))
+ osd_devices.append(dev)
+ return
+
+ if is_mapped_luks_device(dev):
+ log('{} is a mapped LUKS device,'
+ ' skipping.'.format(dev))
+ return
+
+ if cmp_pkgrevno('ceph', '12.2.4') >= 0:
+ cmd = _ceph_volume(dev,
+ osd_journal,
+ encrypt,
+ bluestore,
+ key_manager)
+ else:
+ cmd = _ceph_disk(dev,
+ osd_format,
+ osd_journal,
+ encrypt,
+ bluestore)
+
+ try:
+ status_set('maintenance', 'Initializing device {}'.format(dev))
+ log("osdize cmd: {}".format(cmd))
+ subprocess.check_call(cmd)
+ except subprocess.CalledProcessError:
+            lsblk_output = None
+            try:
+ lsblk_output = subprocess.check_output(
+ ['lsblk', '-P']).decode('UTF-8')
+ except subprocess.CalledProcessError as e:
+ log("Couldn't get lsblk output: {}".format(e), ERROR)
+ if ignore_errors:
+ log('Unable to initialize device: {}'.format(dev), WARNING)
+ if lsblk_output:
+ log('lsblk output: {}'.format(lsblk_output), DEBUG)
+ else:
+ log('Unable to initialize device: {}'.format(dev), ERROR)
+ if lsblk_output:
+ log('lsblk output: {}'.format(lsblk_output), WARNING)
+ raise
+
+ # NOTE: Record processing of device only on success to ensure that
+ # the charm only tries to initialize a device of OSD usage
+ # once during its lifetime.
+ osd_devices.append(dev)
+ finally:
+ db.set('osd-devices', osd_devices)
+ db.flush()
+
+
+def _ceph_disk(dev, osd_format, osd_journal, encrypt=False, bluestore=False):
+ """
+ Prepare a device for usage as a Ceph OSD using ceph-disk
+
+    :param: dev: Full path to use for OSD block device setup;
+                 the function looks up the realpath of the device
+ :param: osd_journal: List of block devices to use for OSD journals
+ :param: encrypt: Use block device encryption (unsupported)
+ :param: bluestore: Use bluestore storage for OSD
+ :returns: list. 'ceph-disk' command and required parameters for
+ execution by check_call
+ """
+ cmd = ['ceph-disk', 'prepare']
+
+ if encrypt:
+ cmd.append('--dmcrypt')
+
+ if osd_format and not bluestore:
+ cmd.append('--fs-type')
+ cmd.append(osd_format)
+
+ # NOTE(jamespage): enable experimental bluestore support
+ if cmp_pkgrevno('ceph', '10.2.0') >= 0 and bluestore:
+ cmd.append('--bluestore')
+ wal = get_devices('bluestore-wal')
+ if wal:
+ cmd.append('--block.wal')
+ least_used_wal = find_least_used_utility_device(wal)
+ cmd.append(least_used_wal)
+ db = get_devices('bluestore-db')
+ if db:
+ cmd.append('--block.db')
+ least_used_db = find_least_used_utility_device(db)
+ cmd.append(least_used_db)
+ elif cmp_pkgrevno('ceph', '12.1.0') >= 0 and not bluestore:
+ cmd.append('--filestore')
+
+ cmd.append(os.path.realpath(dev))
+
+ if osd_journal:
+ least_used = find_least_used_utility_device(osd_journal)
+ cmd.append(least_used)
+
+ return cmd
+
+
+def _ceph_volume(dev, osd_journal, encrypt=False, bluestore=False,
+ key_manager=CEPH_KEY_MANAGER):
+ """
+ Prepare and activate a device for usage as a Ceph OSD using ceph-volume.
+
+ This also includes creation of all PV's, VG's and LV's required to
+ support the initialization of the OSD.
+
+ :param: dev: Full path to use for OSD block device setup
+ :param: osd_journal: List of block devices to use for OSD journals
+ :param: encrypt: Use block device encryption
+ :param: bluestore: Use bluestore storage for OSD
+ :param: key_manager: dm-crypt Key Manager to use
+ :raises subprocess.CalledProcessError: in the event that any supporting
+ LVM operation failed.
+ :returns: list. 'ceph-volume' command and required parameters for
+ execution by check_call
+ """
+ cmd = ['ceph-volume', 'lvm', 'create']
+
+ osd_fsid = str(uuid.uuid4())
+ cmd.append('--osd-fsid')
+ cmd.append(osd_fsid)
+
+ if bluestore:
+ cmd.append('--bluestore')
+ main_device_type = 'block'
+ else:
+ cmd.append('--filestore')
+ main_device_type = 'data'
+
+ if encrypt and key_manager == CEPH_KEY_MANAGER:
+ cmd.append('--dmcrypt')
+
+ # On-disk journal volume creation
+ if not osd_journal and not bluestore:
+ journal_lv_type = 'journal'
+ cmd.append('--journal')
+ cmd.append(_allocate_logical_volume(
+ dev=dev,
+ lv_type=journal_lv_type,
+ osd_fsid=osd_fsid,
+ size='{}M'.format(calculate_volume_size('journal')),
+ encrypt=encrypt,
+ key_manager=key_manager)
+ )
+
+ cmd.append('--data')
+ cmd.append(_allocate_logical_volume(dev=dev,
+ lv_type=main_device_type,
+ osd_fsid=osd_fsid,
+ encrypt=encrypt,
+ key_manager=key_manager))
+
+ if bluestore:
+ for extra_volume in ('wal', 'db'):
+ devices = get_devices('bluestore-{}'.format(extra_volume))
+ if devices:
+ cmd.append('--block.{}'.format(extra_volume))
+ least_used = find_least_used_utility_device(devices,
+ lvs=True)
+ cmd.append(_allocate_logical_volume(
+ dev=least_used,
+ lv_type=extra_volume,
+ osd_fsid=osd_fsid,
+ size='{}M'.format(calculate_volume_size(extra_volume)),
+ shared=True,
+ encrypt=encrypt,
+ key_manager=key_manager)
+ )
+
+ elif osd_journal:
+ cmd.append('--journal')
+ least_used = find_least_used_utility_device(osd_journal,
+ lvs=True)
+ cmd.append(_allocate_logical_volume(
+ dev=least_used,
+ lv_type='journal',
+ osd_fsid=osd_fsid,
+ size='{}M'.format(calculate_volume_size('journal')),
+ shared=True,
+ encrypt=encrypt,
+ key_manager=key_manager)
+ )
+
+ return cmd
+
+
+def _partition_name(dev):
+ """
+ Derive the first partition name for a block device
+
+ :param: dev: Full path to block device.
+ :returns: str: Full path to first partition on block device.
+ """
+ if dev[-1].isdigit():
+ return '{}p1'.format(dev)
+ else:
+ return '{}1'.format(dev)
+
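+# Examples of the partition naming convention handled above:
+#   _partition_name('/dev/sdb')     -> '/dev/sdb1'
+#   _partition_name('/dev/nvme0n1') -> '/dev/nvme0n1p1'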
+
+def is_active_bluestore_device(dev):
+ """
+ Determine whether provided device is part of an active
+ bluestore based OSD (as its block component).
+
+ :param: dev: Full path to block device to check for Bluestore usage.
+ :returns: boolean: indicating whether device is in active use.
+ """
+ if not lvm.is_lvm_physical_volume(dev):
+ return False
+
+ vg_name = lvm.list_lvm_volume_group(dev)
+ lv_name = lvm.list_logical_volumes('vg_name={}'.format(vg_name))[0]
+
+ block_symlinks = glob.glob('/var/lib/ceph/osd/ceph-*/block')
+ for block_candidate in block_symlinks:
+ if os.path.islink(block_candidate):
+ target = os.readlink(block_candidate)
+ if target.endswith(lv_name):
+ return True
+
+ return False
+
+
+def is_luks_device(dev):
+ """
+ Determine if dev is a LUKS-formatted block device.
+
+ :param: dev: A full path to a block device to check for LUKS header
+ presence
+ :returns: boolean: indicates whether a device is used based on LUKS header.
+ """
+    return bool(_luks_uuid(dev))
+
+
+def is_mapped_luks_device(dev):
+ """
+ Determine if dev is a mapped LUKS device
+ :param: dev: A full path to a block device to be checked
+ :returns: boolean: indicates whether a device is mapped
+ """
+ _, dirs, _ = next(os.walk(
+ '/sys/class/block/{}/holders/'
+ .format(os.path.basename(os.path.realpath(dev))))
+ )
+ is_held = len(dirs) > 0
+ return is_held and is_luks_device(dev)
+
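+# Illustrative example (device names are hypothetical): for /dev/vdb held by
+# a dm-crypt mapping, /sys/class/block/vdb/holders/ contains an entry such as
+# 'dm-0'; that, combined with a LUKS header on the device, makes
+# is_mapped_luks_device('/dev/vdb') return True.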
+
+def get_conf(variable):
+ """
+ Get the value of the given configuration variable from the
+ cluster.
+
+ :param variable: ceph configuration variable
+ :returns: str. configured value for provided variable
+
+ """
+ return subprocess.check_output([
+ 'ceph-osd',
+ '--show-config-value={}'.format(variable),
+ '--no-mon-config',
+    ]).decode('UTF-8').strip()
+
+
+def calculate_volume_size(lv_type):
+ """
+ Determine the configured size for Bluestore DB/WAL or
+ Filestore Journal devices
+
+ :param lv_type: volume type (db, wal or journal)
+ :raises KeyError: if invalid lv_type is supplied
+ :returns: int. Configured size in megabytes for volume type
+ """
+ # lv_type -> ceph configuration option
+ _config_map = {
+ 'db': 'bluestore_block_db_size',
+ 'wal': 'bluestore_block_wal_size',
+ 'journal': 'osd_journal_size',
+ }
+
+ # default sizes in MB
+ _default_size = {
+ 'db': 1024,
+ 'wal': 576,
+ 'journal': 1024,
+ }
+
+ # conversion of ceph config units to MB
+ _units = {
+ 'db': 1048576, # Bytes -> MB
+ 'wal': 1048576, # Bytes -> MB
+ 'journal': 1, # Already in MB
+ }
+
+ configured_size = get_conf(_config_map[lv_type])
+
+ if configured_size is None or int(configured_size) == 0:
+ return _default_size[lv_type]
+ else:
+        return int(configured_size) // _units[lv_type]
+
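+# Worked example (configuration values are hypothetical): with
+# bluestore_block_db_size set to 2147483648 bytes, calculate_volume_size('db')
+# returns 2147483648 // 1048576 = 2048 MB; with the option unset or 0 the
+# fallback default of 1024 MB is returned.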
+
+def _luks_uuid(dev):
+ """
+ Check to see if dev is a LUKS encrypted volume, returning the UUID
+ of volume if it is.
+
+ :param: dev: path to block device to check.
+ :returns: str. UUID of LUKS device or None if not a LUKS device
+ """
+ try:
+ cmd = ['cryptsetup', 'luksUUID', dev]
+ return subprocess.check_output(cmd).decode('UTF-8').strip()
+ except subprocess.CalledProcessError:
+ return None
+
+
+def _initialize_disk(dev, dev_uuid, encrypt=False,
+ key_manager=CEPH_KEY_MANAGER):
+ """
+    Initialize a raw block device consuming 100% of the available
+ disk space.
+
+ Function assumes that block device has already been wiped.
+
+ :param: dev: path to block device to initialize
+ :param: dev_uuid: UUID to use for any dm-crypt operations
+ :param: encrypt: Encrypt OSD devices using dm-crypt
+ :param: key_manager: Key management approach for dm-crypt keys
+ :raises: subprocess.CalledProcessError: if any parted calls fail
+ :returns: str: Full path to new partition.
+ """
+ use_vaultlocker = encrypt and key_manager == VAULT_KEY_MANAGER
+
+ if use_vaultlocker:
+ # NOTE(jamespage): Check to see if already initialized as a LUKS
+ # volume, which indicates this is a shared block
+ # device for journal, db or wal volumes.
+ luks_uuid = _luks_uuid(dev)
+ if luks_uuid:
+ return '/dev/mapper/crypt-{}'.format(luks_uuid)
+
+ dm_crypt = '/dev/mapper/crypt-{}'.format(dev_uuid)
+
+ if use_vaultlocker and not os.path.exists(dm_crypt):
+ subprocess.check_call([
+ 'vaultlocker',
+ 'encrypt',
+ '--uuid', dev_uuid,
+ dev,
+ ])
+ subprocess.check_call([
+ 'dd',
+ 'if=/dev/zero',
+ 'of={}'.format(dm_crypt),
+ 'bs=512',
+ 'count=1',
+ ])
+
+ if use_vaultlocker:
+ return dm_crypt
+ else:
+ return dev
+
+
+def _allocate_logical_volume(dev, lv_type, osd_fsid,
+ size=None, shared=False,
+ encrypt=False,
+ key_manager=CEPH_KEY_MANAGER):
+ """
+ Allocate a logical volume from a block device, ensuring any
+ required initialization and setup of PV's and VG's to support
+ the LV.
+
+ :param: dev: path to block device to allocate from.
+ :param: lv_type: logical volume type to create
+ (data, block, journal, wal, db)
+    :param: osd_fsid: UUID of the OSD associated with the LV
+ :param: size: Size in LVM format for the device;
+ if unset 100% of VG
+ :param: shared: Shared volume group (journal, wal, db)
+ :param: encrypt: Encrypt OSD devices using dm-crypt
+ :param: key_manager: dm-crypt Key Manager to use
+ :raises subprocess.CalledProcessError: in the event that any supporting
+ LVM or parted operation fails.
+ :returns: str: String in the format 'vg_name/lv_name'.
+ """
+ lv_name = "osd-{}-{}".format(lv_type, osd_fsid)
+ current_volumes = lvm.list_logical_volumes()
+ if shared:
+ dev_uuid = str(uuid.uuid4())
+ else:
+ dev_uuid = osd_fsid
+ pv_dev = _initialize_disk(dev, dev_uuid, encrypt, key_manager)
+
+ vg_name = None
+ if not lvm.is_lvm_physical_volume(pv_dev):
+ lvm.create_lvm_physical_volume(pv_dev)
+ if shared:
+ vg_name = 'ceph-{}-{}'.format(lv_type,
+ str(uuid.uuid4()))
+ else:
+ vg_name = 'ceph-{}'.format(osd_fsid)
+ lvm.create_lvm_volume_group(vg_name, pv_dev)
+ else:
+ vg_name = lvm.list_lvm_volume_group(pv_dev)
+
+ if lv_name not in current_volumes:
+ lvm.create_logical_volume(lv_name, vg_name, size)
+
+ return "{}/{}".format(vg_name, lv_name)
+
+
+def osdize_dir(path, encrypt=False, bluestore=False):
+ """Ask ceph-disk to prepare a directory to become an osd.
+
+ :param path: str. The directory to osdize
+ :param encrypt: bool. Should the OSD directory be encrypted at rest
+ :returns: None
+ """
+
+ db = kv()
+ osd_devices = db.get('osd-devices', [])
+ if path in osd_devices:
+ log('Device {} already processed by charm,'
+ ' skipping'.format(path))
+ return
+
+ if os.path.exists(os.path.join(path, 'upstart')):
+ log('Path {} is already configured as an OSD - bailing'.format(path))
+ return
+
+ if cmp_pkgrevno('ceph', "0.56.6") < 0:
+ log('Unable to use directories for OSDs with ceph < 0.56.6',
+ level=ERROR)
+ return
+
+ mkdir(path, owner=ceph_user(), group=ceph_user(), perms=0o755)
+ chownr('/var/lib/ceph', ceph_user(), ceph_user())
+ cmd = [
+ 'sudo', '-u', ceph_user(),
+ 'ceph-disk',
+ 'prepare',
+ '--data-dir',
+ path
+ ]
+ if cmp_pkgrevno('ceph', '0.60') >= 0:
+ if encrypt:
+ cmd.append('--dmcrypt')
+
+ # NOTE(icey): enable experimental bluestore support
+ if cmp_pkgrevno('ceph', '10.2.0') >= 0 and bluestore:
+ cmd.append('--bluestore')
+ elif cmp_pkgrevno('ceph', '12.1.0') >= 0 and not bluestore:
+ cmd.append('--filestore')
+ log("osdize dir cmd: {}".format(cmd))
+ subprocess.check_call(cmd)
+
+ # NOTE: Record processing of device only on success to ensure that
+ # the charm only tries to initialize a device of OSD usage
+ # once during its lifetime.
+ osd_devices.append(path)
+ db.set('osd-devices', osd_devices)
+ db.flush()
+
+
+def filesystem_mounted(fs):
+ return subprocess.call(['grep', '-wqs', fs, '/proc/mounts']) == 0
+
+
+def get_running_osds():
+ """Returns a list of the pids of the current running OSD daemons"""
+ cmd = ['pgrep', 'ceph-osd']
+ try:
+ result = str(subprocess.check_output(cmd).decode('UTF-8'))
+ return result.split()
+ except subprocess.CalledProcessError:
+ return []
+
+
+def get_cephfs(service):
+ """List the Ceph Filesystems that exist.
+
+ :param service: The service name to run the ceph command under
+ :returns: list. Returns a list of the ceph filesystems
+ """
+ if get_version() < 0.86:
+ # This command wasn't introduced until 0.86 ceph
+ return []
+ try:
+ output = str(subprocess
+ .check_output(["ceph", '--id', service, "fs", "ls"])
+ .decode('UTF-8'))
+ if not output:
+ return []
+ """
+ Example subprocess output:
+ 'name: ip-172-31-23-165, metadata pool: ip-172-31-23-165_metadata,
+ data pools: [ip-172-31-23-165_data ]\n'
+ output: filesystems: ['ip-172-31-23-165']
+ """
+ filesystems = []
+ for line in output.splitlines():
+ parts = line.split(',')
+ for part in parts:
+ if "name" in part:
+                    filesystems.append(part.split(' ')[1])
+        return filesystems
+ except subprocess.CalledProcessError:
+ return []
+
+
+def wait_for_all_monitors_to_upgrade(new_version, upgrade_key):
+ """Fairly self explanatory name. This function will wait
+    for all monitors in the cluster to upgrade, or raise an exception
+    once a timeout period (10 minutes) has expired.
+
+ :param new_version: str of the version to watch
+ :param upgrade_key: the cephx key name to use
+ """
+ done = False
+ start_time = time.time()
+ monitor_list = []
+
+ mon_map = get_mon_map('admin')
+ if mon_map['monmap']['mons']:
+ for mon in mon_map['monmap']['mons']:
+ monitor_list.append(mon['name'])
+ while not done:
+ try:
+ done = all(monitor_key_exists(upgrade_key, "{}_{}_{}_done".format(
+ "mon", mon, new_version
+ )) for mon in monitor_list)
+ current_time = time.time()
+ if current_time > (start_time + 10 * 60):
+                raise Exception(
+                    "Timed out waiting for all monitors to upgrade to "
+                    "{}".format(new_version))
+ else:
+ # Wait 30 seconds and test again if all monitors are upgraded
+ time.sleep(30)
+ except subprocess.CalledProcessError:
+ raise
+
+
+# Edge cases:
+# 1. Previous node dies on upgrade, can we retry?
+def roll_monitor_cluster(new_version, upgrade_key):
+ """This is tricky to get right so here's what we're going to do.
+
+    There are 2 possible cases: either I'm first in line or not.
+ If I'm not first in line I'll wait a random time between 5-30 seconds
+ and test to see if the previous monitor is upgraded yet.
+
+ :param new_version: str of the version to upgrade to
+ :param upgrade_key: the cephx key name to use when upgrading
+ """
+ log('roll_monitor_cluster called with {}'.format(new_version))
+ my_name = socket.gethostname()
+ monitor_list = []
+ mon_map = get_mon_map('admin')
+ if mon_map['monmap']['mons']:
+ for mon in mon_map['monmap']['mons']:
+ monitor_list.append(mon['name'])
+ else:
+ status_set('blocked', 'Unable to get monitor cluster information')
+ sys.exit(1)
+ log('monitor_list: {}'.format(monitor_list))
+
+    # A sorted list of monitor hostnames
+ mon_sorted_list = sorted(monitor_list)
+
+ try:
+ position = mon_sorted_list.index(my_name)
+ log("upgrade position: {}".format(position))
+ if position == 0:
+ # I'm first! Roll
+ # First set a key to inform others I'm about to roll
+ lock_and_roll(upgrade_key=upgrade_key,
+ service='mon',
+ my_name=my_name,
+ version=new_version)
+ else:
+ # Check if the previous node has finished
+ status_set('waiting',
+ 'Waiting on {} to finish upgrading'.format(
+ mon_sorted_list[position - 1]))
+ wait_on_previous_node(upgrade_key=upgrade_key,
+ service='mon',
+ previous_node=mon_sorted_list[position - 1],
+ version=new_version)
+ lock_and_roll(upgrade_key=upgrade_key,
+ service='mon',
+ my_name=my_name,
+ version=new_version)
+ # NOTE(jamespage):
+ # Wait until all monitors have upgraded before bootstrapping
+ # the ceph-mgr daemons due to use of new mgr keyring profiles
+ if new_version == 'luminous':
+ wait_for_all_monitors_to_upgrade(new_version=new_version,
+ upgrade_key=upgrade_key)
+ bootstrap_manager()
+ except ValueError:
+ log("Failed to find {} in list {}.".format(
+ my_name, mon_sorted_list))
+ status_set('blocked', 'failed to upgrade monitor')
+
+
+# TODO(jamespage):
+# Mimic support will need to ensure that ceph-mgr daemons are also
+# restarted during upgrades - probably through use of one of the
+# high level systemd targets shipped by the packaging.
+def upgrade_monitor(new_version):
+ """Upgrade the current ceph monitor to the new version
+
+ :param new_version: String version to upgrade to.
+ """
+ current_version = get_version()
+ status_set("maintenance", "Upgrading monitor")
+ log("Current ceph version is {}".format(current_version))
+ log("Upgrading to: {}".format(new_version))
+
+ try:
+ add_source(config('source'), config('key'))
+ apt_update(fatal=True)
+ except subprocess.CalledProcessError as err:
+ log("Adding the ceph source failed with message: {}".format(
+ err))
+ status_set("blocked", "Upgrade to {} failed".format(new_version))
+ sys.exit(1)
+ try:
+ if systemd():
+ service_stop('ceph-mon')
+ else:
+ service_stop('ceph-mon-all')
+ apt_install(packages=determine_packages(), fatal=True)
+
+ owner = ceph_user()
+
+        # Ensure the files and directories under /var/lib/ceph are chowned
+        # properly as part of the move to the Jewel release, which moved
+        # the ceph daemons to run as ceph:ceph instead of root:root.
+ if new_version == 'jewel':
+ # Ensure the ownership of Ceph's directories is correct
+ chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
+ owner=owner,
+ group=owner,
+ follow_links=True)
+
+ # Ensure that mon directory is user writable
+ hostname = socket.gethostname()
+ path = '/var/lib/ceph/mon/ceph-{}'.format(hostname)
+ mkdir(path, owner=ceph_user(), group=ceph_user(),
+ perms=0o755)
+
+ if systemd():
+ service_start('ceph-mon')
+ else:
+ service_start('ceph-mon-all')
+ except subprocess.CalledProcessError as err:
+ log("Stopping ceph and upgrading packages failed "
+ "with message: {}".format(err))
+ status_set("blocked", "Upgrade to {} failed".format(new_version))
+ sys.exit(1)
+
+
+def lock_and_roll(upgrade_key, service, my_name, version):
+ """Create a lock on the ceph monitor cluster and upgrade.
+
+ :param upgrade_key: str. The cephx key to use
+ :param service: str. The cephx id to use
+ :param my_name: str. The current hostname
+ :param version: str. The version we are upgrading to
+ """
+ start_timestamp = time.time()
+
+ log('monitor_key_set {}_{}_{}_start {}'.format(
+ service,
+ my_name,
+ version,
+ start_timestamp))
+ monitor_key_set(upgrade_key, "{}_{}_{}_start".format(
+ service, my_name, version), start_timestamp)
+ log("Rolling")
+
+ # This should be quick
+ if service == 'osd':
+ upgrade_osd(version)
+ elif service == 'mon':
+ upgrade_monitor(version)
+ else:
+ log("Unknown service {}. Unable to upgrade".format(service),
+ level=ERROR)
+ log("Done")
+
+ stop_timestamp = time.time()
+ # Set a key to inform others I am finished
+ log('monitor_key_set {}_{}_{}_done {}'.format(service,
+ my_name,
+ version,
+ stop_timestamp))
+ status_set('maintenance', 'Finishing upgrade')
+ monitor_key_set(upgrade_key, "{}_{}_{}_done".format(service,
+ my_name,
+ version),
+ stop_timestamp)
+
+
+def wait_on_previous_node(upgrade_key, service, previous_node, version):
+ """A lock that sleeps the current thread while waiting for the previous
+ node to finish upgrading.
+
+    :param upgrade_key: str. The cephx key name to use
+ :param service: str. the cephx id to use
+ :param previous_node: str. The name of the previous node to wait on
+ :param version: str. The version we are upgrading to
+ :returns: None
+ """
+ log("Previous node is: {}".format(previous_node))
+
+ previous_node_finished = monitor_key_exists(
+ upgrade_key,
+ "{}_{}_{}_done".format(service, previous_node, version))
+
+ while previous_node_finished is False:
+ log("{} is not finished. Waiting".format(previous_node))
+ # Has this node been trying to upgrade for longer than
+ # 10 minutes?
+ # If so then move on and consider that node dead.
+
+        # NOTE: This assumes the cluster's clocks are somewhat accurate.
+        # If the host's clock is really far off it may cause it to skip
+ # the previous node even though it shouldn't.
+ current_timestamp = time.time()
+ previous_node_start_time = monitor_key_get(
+ upgrade_key,
+ "{}_{}_{}_start".format(service, previous_node, version))
+ if (previous_node_start_time is not None and
+ ((current_timestamp - (10 * 60)) >
+ float(previous_node_start_time))):
+ # NOTE(jamespage):
+ # Previous node is probably dead as we've been waiting
+ # for 10 minutes - lets move on and upgrade
+ log("Waited 10 mins on node {}. current time: {} > "
+ "previous node start time: {} Moving on".format(
+ previous_node,
+ (current_timestamp - (10 * 60)),
+ previous_node_start_time))
+ return
+ # NOTE(jamespage)
+ # Previous node has not started, or started less than
+ # 10 minutes ago - sleep a random amount of time and
+ # then check again.
+ wait_time = random.randrange(5, 30)
+ log('waiting for {} seconds'.format(wait_time))
+ time.sleep(wait_time)
+ previous_node_finished = monitor_key_exists(
+ upgrade_key,
+ "{}_{}_{}_done".format(service, previous_node, version))
+
+
+def get_upgrade_position(osd_sorted_list, match_name):
+ """Return the upgrade position for the given osd.
+
+ :param osd_sorted_list: list. Osds sorted
+ :param match_name: str. The osd name to match
+ :returns: int. The position or None if not found
+ """
+ for index, item in enumerate(osd_sorted_list):
+ if item.name == match_name:
+ return index
+ return None
+
+
+# Edge cases:
+# 1. Previous node dies on upgrade, can we retry?
+# 2. This assumes that the osd failure domain is not set to osd.
+# It rolls an entire server at a time.
+def roll_osd_cluster(new_version, upgrade_key):
+ """This is tricky to get right so here's what we're going to do.
+
+    There are 2 possible cases: either I'm first in line or not.
+ If I'm not first in line I'll wait a random time between 5-30 seconds
+ and test to see if the previous osd is upgraded yet.
+
+ TODO: If you're not in the same failure domain it's safe to upgrade
+ 1. Examine all pools and adopt the most strict failure domain policy
+ Example: Pool 1: Failure domain = rack
+ Pool 2: Failure domain = host
+ Pool 3: Failure domain = row
+
+ outcome: Failure domain = host
+
+ :param new_version: str of the version to upgrade to
+ :param upgrade_key: the cephx key name to use when upgrading
+ """
+ log('roll_osd_cluster called with {}'.format(new_version))
+ my_name = socket.gethostname()
+ osd_tree = get_osd_tree(service=upgrade_key)
+ # A sorted list of osd unit names
+ osd_sorted_list = sorted(osd_tree)
+ log("osd_sorted_list: {}".format(osd_sorted_list))
+
+ try:
+ position = get_upgrade_position(osd_sorted_list, my_name)
+ log("upgrade position: {}".format(position))
+ if position == 0:
+ # I'm first! Roll
+ # First set a key to inform others I'm about to roll
+ lock_and_roll(upgrade_key=upgrade_key,
+ service='osd',
+ my_name=my_name,
+ version=new_version)
+ else:
+ # Check if the previous node has finished
+ status_set('waiting',
+ 'Waiting on {} to finish upgrading'.format(
+ osd_sorted_list[position - 1].name))
+ wait_on_previous_node(
+ upgrade_key=upgrade_key,
+ service='osd',
+ previous_node=osd_sorted_list[position - 1].name,
+ version=new_version)
+ lock_and_roll(upgrade_key=upgrade_key,
+ service='osd',
+ my_name=my_name,
+ version=new_version)
+ except ValueError:
+ log("Failed to find name {} in list {}".format(
+ my_name, osd_sorted_list))
+ status_set('blocked', 'failed to upgrade osd')
+
+
+def upgrade_osd(new_version):
+ """Upgrades the current osd
+
+ :param new_version: str. The new version to upgrade to
+ """
+ current_version = get_version()
+ status_set("maintenance", "Upgrading osd")
+ log("Current ceph version is {}".format(current_version))
+ log("Upgrading to: {}".format(new_version))
+
+ try:
+ add_source(config('source'), config('key'))
+ apt_update(fatal=True)
+ except subprocess.CalledProcessError as err:
+ log("Adding the ceph sources failed with message: {}".format(
+ err))
+ status_set("blocked", "Upgrade to {} failed".format(new_version))
+ sys.exit(1)
+
+ try:
+ # Upgrade the packages before restarting the daemons.
+ status_set('maintenance', 'Upgrading packages to %s' % new_version)
+ apt_install(packages=determine_packages(), fatal=True)
+
+ # If the upgrade does not need an ownership update of any of the
+ # directories in the osd service directory, then simply restart
+ # all of the OSDs at the same time as this will be the fastest
+ # way to update the code on the node.
+ if not dirs_need_ownership_update('osd'):
+ log('Restarting all OSDs to load new binaries', DEBUG)
+ if systemd():
+ service_restart('ceph-osd.target')
+ else:
+ service_restart('ceph-osd-all')
+ return
+
+ # Need to change the ownership of all directories which are not OSD
+ # directories as well.
+ # TODO - this should probably be moved to the general upgrade function
+ # and done before mon/osd.
+ update_owner(CEPH_BASE_DIR, recurse_dirs=False)
+ non_osd_dirs = filter(lambda x: not x == 'osd',
+ os.listdir(CEPH_BASE_DIR))
+ non_osd_dirs = map(lambda x: os.path.join(CEPH_BASE_DIR, x),
+ non_osd_dirs)
+ for path in non_osd_dirs:
+ update_owner(path)
+
+ # Fast service restart wasn't an option because each of the OSD
+ # directories need the ownership updated for all the files on
+ # the OSD. Walk through the OSDs one-by-one upgrading the OSD.
+ for osd_dir in _get_child_dirs(OSD_BASE_DIR):
+ try:
+ osd_num = _get_osd_num_from_dirname(osd_dir)
+ _upgrade_single_osd(osd_num, osd_dir)
+ except ValueError as ex:
+ # Directory could not be parsed - junk directory?
+ log('Could not parse osd directory %s: %s' % (osd_dir, ex),
+ WARNING)
+ continue
+
+ except (subprocess.CalledProcessError, IOError) as err:
+ log("Stopping ceph and upgrading packages failed "
+ "with message: {}".format(err))
+ status_set("blocked", "Upgrade to {} failed".format(new_version))
+ sys.exit(1)
+
+
+def _upgrade_single_osd(osd_num, osd_dir):
+ """Upgrades the single OSD directory.
+
+ :param osd_num: the num of the OSD
+ :param osd_dir: the directory of the OSD to upgrade
+ :raises CalledProcessError: if an error occurs in a command issued as part
+ of the upgrade process
+ :raises IOError: if an error occurs reading/writing to a file as part
+ of the upgrade process
+ """
+ stop_osd(osd_num)
+ disable_osd(osd_num)
+ update_owner(osd_dir)
+ enable_osd(osd_num)
+ start_osd(osd_num)
+
+
+def stop_osd(osd_num):
+ """Stops the specified OSD number.
+
+ :param osd_num: the osd number to stop
+ """
+ if systemd():
+ service_stop('ceph-osd@{}'.format(osd_num))
+ else:
+ service_stop('ceph-osd', id=osd_num)
+
+
+def start_osd(osd_num):
+ """Starts the specified OSD number.
+
+ :param osd_num: the osd number to start.
+ """
+ if systemd():
+ service_start('ceph-osd@{}'.format(osd_num))
+ else:
+ service_start('ceph-osd', id=osd_num)
+
+
+def disable_osd(osd_num):
+ """Disables the specified OSD number.
+
+ Ensures that the specified osd will not be automatically started at the
+ next reboot of the system. Due to differences between init systems,
+ this method cannot make any guarantees that the specified osd cannot be
+ started manually.
+
+ :param osd_num: the osd id which should be disabled.
+ :raises CalledProcessError: if an error occurs invoking the systemd cmd
+ to disable the OSD
+ :raises IOError, OSError: if the attempt to read/remove the ready file in
+ an upstart enabled system fails
+ """
+ if systemd():
+ # When running under systemd, the individual ceph-osd daemons run as
+ # templated units and can be directly addressed by referring to the
+        # templated service name ceph-osd@<osd_num>. Additionally, systemd
+        # allows one to disable a specific templated unit by running the
+        # 'systemctl disable ceph-osd@<osd_num>' command. When disabled, the
+ # OSD should remain disabled until re-enabled via systemd.
+ # Note: disabling an already disabled service in systemd returns 0, so
+ # no need to check whether it is enabled or not.
+ cmd = ['systemctl', 'disable', 'ceph-osd@{}'.format(osd_num)]
+ subprocess.check_call(cmd)
+ else:
+ # Neither upstart nor the ceph-osd upstart script provides for
+ # disabling the starting of an OSD automatically. The specific OSD
+ # cannot be prevented from running manually, however it can be
+ # prevented from running automatically on reboot by removing the
+ # 'ready' file in the OSD's root directory. This is due to the
+ # ceph-osd-all upstart script checking for the presence of this file
+ # before starting the OSD.
+ ready_file = os.path.join(OSD_BASE_DIR, 'ceph-{}'.format(osd_num),
+ 'ready')
+ if os.path.exists(ready_file):
+ os.unlink(ready_file)
+
+
+def enable_osd(osd_num):
+ """Enables the specified OSD number.
+
+ Ensures that the specified osd_num will be enabled and ready to start
+ automatically in the event of a reboot.
+
+ :param osd_num: the osd id which should be enabled.
+ :raises CalledProcessError: if the call to the systemd command issued
+ fails when enabling the service
+    :raises IOError: if the attempt to write the ready file in an upstart
+ enabled system fails
+ """
+ if systemd():
+ cmd = ['systemctl', 'enable', 'ceph-osd@{}'.format(osd_num)]
+ subprocess.check_call(cmd)
+ else:
+ # When running on upstart, the OSDs are started via the ceph-osd-all
+ # upstart script which will only start the osd if it has a 'ready'
+ # file. Make sure that file exists.
+ ready_file = os.path.join(OSD_BASE_DIR, 'ceph-{}'.format(osd_num),
+ 'ready')
+ with open(ready_file, 'w') as f:
+ f.write('ready')
+
+ # Make sure the correct user owns the file. It shouldn't be necessary
+        # as the upstart script should run with root privileges, but it's
+        # better
+ # to have all the files matching ownership.
+ update_owner(ready_file)
+
+
+def update_owner(path, recurse_dirs=True):
+ """Changes the ownership of the specified path.
+
+ Changes the ownership of the specified path to the new ceph daemon user
+    using the system's native chown functionality. This may take a while,
+    so this method will issue a status_set for any change of ownership which
+    recurses into directory structures.
+
+ :param path: the path to recursively change ownership for
+ :param recurse_dirs: boolean indicating whether to recursively change the
+ ownership of all the files in a path's subtree or to
+ simply change the ownership of the path.
+ :raises CalledProcessError: if an error occurs issuing the chown system
+ command
+ """
+ user = ceph_user()
+ user_group = '{ceph_user}:{ceph_user}'.format(ceph_user=user)
+ cmd = ['chown', user_group, path]
+ if os.path.isdir(path) and recurse_dirs:
+ status_set('maintenance', ('Updating ownership of %s to %s' %
+ (path, user)))
+ cmd.insert(1, '-R')
+
+ log('Changing ownership of {path} to {user}'.format(
+ path=path, user=user_group), DEBUG)
+ start = datetime.now()
+ subprocess.check_call(cmd)
+ elapsed_time = (datetime.now() - start)
+
+ log('Took {secs} seconds to change the ownership of path: {path}'.format(
+ secs=elapsed_time.total_seconds(), path=path), DEBUG)
+
+
+def list_pools(service):
+ """This will list the current pools that Ceph has
+
+ :param service: String service id to run under
+ :returns: list. Returns a list of the ceph pools.
+ :raises: CalledProcessError if the subprocess fails to run.
+ """
+ try:
+ pool_list = []
+ pools = str(subprocess
+ .check_output(['rados', '--id', service, 'lspools'])
+ .decode('UTF-8'))
+ for pool in pools.splitlines():
+ pool_list.append(pool)
+ return pool_list
+ except subprocess.CalledProcessError as err:
+ log("rados lspools failed with error: {}".format(err.output))
+ raise
+
+
+def dirs_need_ownership_update(service):
+ """Determines if directories still need change of ownership.
+
+ Examines the set of directories under the /var/lib/ceph/{service} directory
+ and determines if they have the correct ownership or not. This is
+ necessary due to the upgrade from Hammer to Jewel where the daemon user
+    changes from root:root to ceph:ceph.
+
+ :param service: the name of the service folder to check (e.g. osd, mon)
+ :returns: boolean. True if the directories need a change of ownership,
+ False otherwise.
+ :raises IOError: if an error occurs reading the file stats from one of
+ the child directories.
+ :raises OSError: if the specified path does not exist or some other error
+ """
+ expected_owner = expected_group = ceph_user()
+ path = os.path.join(CEPH_BASE_DIR, service)
+ for child in _get_child_dirs(path):
+ curr_owner, curr_group = owner(child)
+
+ if (curr_owner == expected_owner) and (curr_group == expected_group):
+ continue
+
+ log('Directory "%s" needs its ownership updated' % child, DEBUG)
+ return True
+
+ # All child directories had the expected ownership
+ return False
+
+# A dict of valid ceph upgrade paths. Mapping is old -> new
+UPGRADE_PATHS = collections.OrderedDict([
+ ('firefly', 'hammer'),
+ ('hammer', 'jewel'),
+ ('jewel', 'luminous'),
+ ('luminous', 'mimic'),
+])
+
+# Map UCA codenames to ceph codenames
+UCA_CODENAME_MAP = {
+ 'icehouse': 'firefly',
+ 'juno': 'firefly',
+ 'kilo': 'hammer',
+ 'liberty': 'hammer',
+ 'mitaka': 'jewel',
+ 'newton': 'jewel',
+ 'ocata': 'jewel',
+ 'pike': 'luminous',
+ 'queens': 'luminous',
+ 'rocky': 'mimic',
+ 'stein': 'mimic',
+}
+
+
+def pretty_print_upgrade_paths():
+ """Pretty print supported upgrade paths for ceph"""
+ return ["{} -> {}".format(key, value)
+ for key, value in UPGRADE_PATHS.items()]
+
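+# Example of the rendered output:
+#   pretty_print_upgrade_paths() ->
+#       ['firefly -> hammer', 'hammer -> jewel',
+#        'jewel -> luminous', 'luminous -> mimic']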
+
+def resolve_ceph_version(source):
+ """Resolves a version of ceph based on source configuration
+ based on Ubuntu Cloud Archive pockets.
+
+    :param: source: source configuration option of charm
+ :returns: ceph release codename or None if not resolvable
+ """
+ os_release = get_os_codename_install_source(source)
+ return UCA_CODENAME_MAP.get(os_release)
+
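+# Illustrative example: resolve_ceph_version('cloud:bionic-rocky') maps the
+# UCA pocket to the 'rocky' OpenStack release and returns 'mimic' via
+# UCA_CODENAME_MAP; a source that does not resolve returns None.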
+
+def get_ceph_pg_stat():
+ """Returns the result of ceph pg stat.
+
+ :returns: dict
+ """
+ try:
+ tree = str(subprocess
+ .check_output(['ceph', 'pg', 'stat', '--format=json'])
+ .decode('UTF-8'))
+ try:
+ json_tree = json.loads(tree)
+ if not json_tree['num_pg_by_state']:
+ return None
+ return json_tree
+ except ValueError as v:
+ log("Unable to parse ceph pg stat json: {}. Error: {}".format(
+ tree, v))
+ raise
+ except subprocess.CalledProcessError as e:
+ log("ceph pg stat command failed with message: {}".format(e))
+ raise
+
+
+def get_ceph_health():
+ """Returns the health of the cluster from a 'ceph status'
+
+ :returns: dict tree of ceph status
+    :raises: CalledProcessError if the ceph status command fails.
+
+    To get just the overall status, use get_ceph_health()['overall_status'].
+ """
+ try:
+ tree = str(subprocess
+ .check_output(['ceph', 'status', '--format=json'])
+ .decode('UTF-8'))
+ try:
+ json_tree = json.loads(tree)
+ # Make sure children are present in the json
+ if not json_tree['overall_status']:
+ return None
+
+ return json_tree
+ except ValueError as v:
+ log("Unable to parse ceph tree json: {}. Error: {}".format(
+ tree, v))
+ raise
+ except subprocess.CalledProcessError as e:
+ log("ceph status command failed with message: {}".format(e))
+ raise
+
+
+def reweight_osd(osd_num, new_weight):
+ """Changes the crush weight of an OSD to the value specified.
+
+ :param osd_num: the osd id which should be changed
+ :param new_weight: the new weight for the OSD
+ :returns: bool. True if output looks right, else false.
+    :raises CalledProcessError: if an error occurs invoking the ceph command
+ """
+ try:
+ cmd_result = str(subprocess
+ .check_output(['ceph', 'osd', 'crush',
+ 'reweight', "osd.{}".format(osd_num),
+ new_weight],
+ stderr=subprocess.STDOUT)
+ .decode('UTF-8'))
+ expected_result = "reweighted item id {ID} name \'osd.{ID}\'".format(
+ ID=osd_num) + " to {}".format(new_weight)
+ log(cmd_result)
+ if expected_result in cmd_result:
+ return True
+ return False
+ except subprocess.CalledProcessError as e:
+ log("ceph osd crush reweight command failed"
+ " with message: {}".format(e))
+ raise
+
+
+def determine_packages():
+ """Determines packages for installation.
+
+ :returns: list of ceph packages
+ """
+ return PACKAGES
+
+
+def bootstrap_manager():
+ hostname = socket.gethostname()
+ path = '/var/lib/ceph/mgr/ceph-{}'.format(hostname)
+ keyring = os.path.join(path, 'keyring')
+
+ if os.path.exists(keyring):
+ log('bootstrap_manager: mgr already initialized.')
+ else:
+ mkdir(path, owner=ceph_user(), group=ceph_user())
+ subprocess.check_call(['ceph', 'auth', 'get-or-create',
+ 'mgr.{}'.format(hostname), 'mon',
+ 'allow profile mgr', 'osd', 'allow *',
+ 'mds', 'allow *', '--out-file',
+ keyring])
+ chownr(path, ceph_user(), ceph_user())
+
+ unit = 'ceph-mgr@{}'.format(hostname)
+ subprocess.check_call(['systemctl', 'enable', unit])
+ service_restart(unit)
+
+
+def osd_noout(enable):
+ """Sets or unsets 'noout'
+
+ :param enable: bool. True to set noout, False to unset.
+ :returns: bool. True if output looks right.
+    :raises CalledProcessError: if an error occurs invoking the ceph command
+ """
+ operation = {
+ True: 'set',
+ False: 'unset',
+ }
+ try:
+ subprocess.check_call(['ceph', '--id', 'admin',
+ 'osd', operation[enable],
+ 'noout'])
+ log('running ceph osd {} noout'.format(operation[enable]))
+ return True
+ except subprocess.CalledProcessError as e:
+ log(e)
+ raise
diff --git a/unit_tests/test_ceph.py b/unit_tests/test_ceph.py
index 044d24e2..e13d7da1 100644
--- a/unit_tests/test_ceph.py
+++ b/unit_tests/test_ceph.py
@@ -23,7 +23,7 @@ mock_apt.apt_pkg = MagicMock()
sys.modules['apt'] = mock_apt
sys.modules['apt_pkg'] = mock_apt.apt_pkg
-import ceph # noqa
+import ceph_rgw as ceph # noqa
import utils # noqa
from test_utils import CharmTestCase # noqa
diff --git a/unit_tests/test_hooks.py b/unit_tests/test_hooks.py
index 7a9e2675..a6ddb431 100644
--- a/unit_tests/test_hooks.py
+++ b/unit_tests/test_hooks.py
@@ -13,7 +13,7 @@
# limitations under the License.
from mock import (
- patch, call
+ patch, call, MagicMock
)
from test_utils import (
@@ -63,6 +63,9 @@ TO_PATCH = [
'request_per_unit_key',
'get_certificate_request',
'process_certificates',
+ 'filter_installed_packages',
+ 'filter_missing_packages',
+ 'ceph_utils',
]
@@ -78,12 +81,69 @@ class CephRadosGWTests(CharmTestCase):
self.service_name.return_value = 'radosgw'
self.request_per_unit_key.return_value = False
self.systemd_based_radosgw.return_value = False
+ self.filter_installed_packages.side_effect = lambda pkgs: pkgs
+ self.filter_missing_packages.side_effect = lambda pkgs: pkgs
- def test_install_packages(self):
+ def test_upgrade_available(self):
+ _vers = {
+ 'distro': 'luminous',
+ 'cloud:bionic-rocky': 'mimic',
+ }
+ mock_config = MagicMock()
+ self.test_config.set('source', 'cloud:bionic-rocky')
+ mock_config.get.side_effect = self.test_config.get
+ mock_config.previous.return_value = 'distro'
+ self.config.side_effect = None
+ self.config.return_value = mock_config
+ self.ceph_utils.UPGRADE_PATHS = {
+ 'luminous': 'mimic',
+ }
+ self.ceph_utils.resolve_ceph_version.side_effect = (
+ lambda v: _vers.get(v)
+ )
+ self.assertTrue(ceph_hooks.upgrade_available())
+
+ @patch.object(ceph_hooks, 'upgrade_available')
+ def test_install_packages(self, upgrade_available):
+ mock_config = MagicMock()
+ mock_config.get.side_effect = self.test_config.get
+ mock_config.changed.return_value = True
+ self.config.side_effect = None
+ self.config.return_value = mock_config
+ upgrade_available.return_value = False
ceph_hooks.install_packages()
self.add_source.assert_called_with('distro', 'secretkey')
- self.assertTrue(self.apt_update.called)
- self.apt_purge.assert_called_with(['libapache2-mod-fastcgi'])
+ self.apt_update.assert_called_with(fatal=True)
+ self.apt_purge.assert_called_with(ceph_hooks.APACHE_PACKAGES)
+ self.apt_install.assert_called_with(ceph_hooks.PACKAGES,
+ fatal=True)
+ mock_config.changed.assert_called_with('source')
+ self.filter_installed_packages.assert_called_with(
+ ceph_hooks.PACKAGES
+ )
+ self.filter_missing_packages.assert_called_with(
+ ceph_hooks.APACHE_PACKAGES
+ )
+
+ @patch.object(ceph_hooks, 'upgrade_available')
+ def test_install_packages_upgrades(self, upgrade_available):
+ mock_config = MagicMock()
+ mock_config.get.side_effect = self.test_config.get
+ mock_config.changed.return_value = True
+ self.config.side_effect = None
+ self.config.return_value = mock_config
+ upgrade_available.return_value = True
+ ceph_hooks.install_packages()
+ self.add_source.assert_called_with('distro', 'secretkey')
+ self.apt_update.assert_called_with(fatal=True)
+ self.apt_purge.assert_called_with(ceph_hooks.APACHE_PACKAGES)
+ self.apt_install.assert_called_with(ceph_hooks.PACKAGES,
+ fatal=True)
+ mock_config.changed.assert_called_with('source')
+ self.filter_installed_packages.assert_not_called()
+ self.filter_missing_packages.assert_called_with(
+ ceph_hooks.APACHE_PACKAGES
+ )
def test_install(self):
_install_packages = self.patch('install_packages')