Adds more usage data to Nova's usage notifications.

Adds in bandwidth, state and IP data on standard notifications,
and new notifications on add/remove IP.

These were missing before, and are needed to meet spec.
This fixes bug 849117

Change-Id: Ie586ff3a91a56e5f5eff8abc6905ba6a0b624451
This commit is contained in:
Monsyne Dragon
2011-09-30 00:39:46 +00:00
parent c0cf874acb
commit 5aa5229082
17 changed files with 609 additions and 59 deletions

View File

@@ -24,14 +24,13 @@
system consuming usage notification feeds to calculate instance usage
for each tenant.
Time periods are specified like so:
<number>[mdy]
Time periods are specified as 'hour', 'month', 'day' or 'year'
1m = previous month. If the script is run April 1, it will generate usages
for March 1 thry March 31.
3m = 3 previous months.
90d = previous 90 days.
1y = previous year. If run on Jan 1, it generates usages for
hour = previous hour. If run at 9:07am, will generate usage for 8-9am.
month = previous month. If the script is run April 1, it will generate
usages for March 1 thry March 31.
day = previous day. if run on July 4th, it generates usages for July 3rd.
year = previous year. If run on Jan 1, it generates usages for
Jan 1 thru Dec 31 of the previous year.
"""
@@ -59,57 +58,20 @@ from nova import flags
from nova import log as logging
from nova import utils
from nova.notifier import api as notifier_api
from nova.compute.utils import notify_usage_exists
FLAGS = flags.FLAGS
flags.DEFINE_string('instance_usage_audit_period', '1m',
'time period to generate instance usages for.')
def time_period(period):
today = datetime.date.today()
unit = period[-1]
if unit not in 'mdy':
raise ValueError('Time period must be m, d, or y')
n = int(period[:-1])
if unit == 'm':
year = today.year - (n // 12)
n = n % 12
if n >= today.month:
year -= 1
month = 12 + (today.month - n)
else:
month = today.month - n
begin = datetime.datetime(day=1, month=month, year=year)
end = datetime.datetime(day=1, month=today.month, year=today.year)
elif unit == 'y':
begin = datetime.datetime(day=1, month=1, year=today.year - n)
end = datetime.datetime(day=1, month=1, year=today.year)
elif unit == 'd':
b = today - datetime.timedelta(days=n)
begin = datetime.datetime(day=b.day, month=b.month, year=b.year)
end = datetime.datetime(day=today.day,
month=today.month,
year=today.year)
return (begin, end)
if __name__ == '__main__':
admin_context = context.get_admin_context()
utils.default_flagfile()
flags.FLAGS(sys.argv)
logging.setup()
begin, end = time_period(FLAGS.instance_usage_audit_period)
begin, end = utils.current_audit_period()
print "Creating usages for %s until %s" % (str(begin), str(end))
ctxt = context.get_admin_context()
instances = db.instance_get_active_by_window_joined(ctxt, begin, end)
instances = db.instance_get_active_by_window_joined(admin_context,
begin,
end)
print "%s instances" % len(instances)
for instance_ref in instances:
usage_info = utils.usage_from_instance(instance_ref,
audit_period_begining=str(begin),
audit_period_ending=str(end))
notifier_api.notify('compute.%s' % FLAGS.host,
'compute.instance.exists',
notifier_api.INFO,
usage_info)
notify_usage_exists(instance_ref)

View File

@@ -58,6 +58,7 @@ from nova.compute import power_state
from nova.compute import task_states
from nova.compute import vm_states
from nova.notifier import api as notifier
from nova.compute.utils import notify_usage_exists
from nova.virt import driver
@@ -141,6 +142,7 @@ class ComputeManager(manager.SchedulerDependentManager):
self.network_api = network.API()
self.network_manager = utils.import_object(FLAGS.network_manager)
self._last_host_check = 0
self._last_bw_usage_poll = 0
super(ComputeManager, self).__init__(service_name="compute",
*args, **kwargs)
@@ -565,6 +567,9 @@ class ComputeManager(manager.SchedulerDependentManager):
@checks_instance_lock
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this host."""
#generate usage info.
instance = self.db.instance_get(context.elevated(), instance_id)
notify_usage_exists(instance, current_period=True)
self._delete_instance(context, instance_id)
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
@@ -1137,6 +1142,12 @@ class ComputeManager(manager.SchedulerDependentManager):
"""
self.network_api.add_fixed_ip_to_instance(context, instance_id,
self.host, network_id)
instance_ref = self.db.instance_get(context, instance_id)
usage = utils.usage_from_instance(instance_ref)
notifier.notify('compute.%s' % self.host,
'compute.instance.create_ip',
notifier.INFO, usage)
self.inject_network_info(context, instance_id)
self.reset_network(context, instance_id)
@@ -1149,6 +1160,12 @@ class ComputeManager(manager.SchedulerDependentManager):
"""
self.network_api.remove_fixed_ip_from_instance(context, instance_id,
address)
instance_ref = self.db.instance_get(context, instance_id)
usage = utils.usage_from_instance(instance_ref)
notifier.notify('compute.%s' % self.host,
'compute.instance.delete_ip',
notifier.INFO, usage)
self.inject_network_info(context, instance_id)
self.reset_network(context, instance_id)
@@ -1803,9 +1820,35 @@ class ComputeManager(manager.SchedulerDependentManager):
LOG.warning(_("Error during reclamation of queued deletes: %s"),
unicode(ex))
error_list.append(ex)
try:
start = utils.current_audit_period()[1]
self._update_bandwidth_usage(context, start)
except NotImplementedError:
# Not all hypervisors have bandwidth polling implemented yet.
# If they don't id doesn't break anything, they just don't get the
# info in the usage events. (mdragon)
pass
except Exception as ex:
LOG.warning(_("Error updating bandwidth usage: %s"),
unicode(ex))
error_list.append(ex)
return error_list
def _update_bandwidth_usage(self, context, start_time, stop_time=None):
curr_time = time.time()
if curr_time - self._last_bw_usage_poll > FLAGS.bandwith_poll_interval:
self._last_bw_usage_poll = curr_time
LOG.info(_("Updating bandwidth usage cache"))
bw_usage = self.driver.get_all_bw_usage(start_time, stop_time)
for usage in bw_usage:
vif = usage['virtual_interface']
self.db.bw_usage_update(context,
vif.instance_id,
vif.network.label,
start_time,
usage['bw_in'], usage['bw_out'])
def _report_driver_status(self):
curr_time = time.time()
if curr_time - self._last_host_check > FLAGS.host_state_interval:

56
nova/compute/utils.py Normal file
View File

@@ -0,0 +1,56 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Compute-related Utilities and helpers."""
from nova import context
from nova import db
from nova import flags
from nova import utils
from nova.notifier import api as notifier_api
FLAGS = flags.FLAGS
def notify_usage_exists(instance_ref, current_period=False):
""" Generates 'exists' notification for an instance for usage auditing
purposes.
Generates usage for last completed period, unless 'current_period'
is True."""
admin_context = context.get_admin_context()
begin, end = utils.current_audit_period()
bw = {}
if current_period:
audit_start = end
audit_end = utils.utcnow()
else:
audit_start = begin
audit_end = end
for b in db.bw_usage_get_by_instance(admin_context,
instance_ref['id'],
audit_start):
bw[b.network_label] = dict(bw_in=b.bw_in, bw_out=b.bw_out)
usage_info = utils.usage_from_instance(instance_ref,
audit_period_begining=str(audit_start),
audit_period_ending=str(audit_end),
bandwidth=bw)
notifier_api.notify('compute.%s' % FLAGS.host,
'compute.instance.exists',
notifier_api.INFO,
usage_info)

View File

@@ -1438,6 +1438,30 @@ def agent_build_update(context, agent_build_id, values):
####################
def bw_usage_get_by_instance(context, instance_id, start_period):
"""Return bw usages for an instance in a given audit period."""
return IMPL.bw_usage_get_by_instance(context, instance_id, start_period)
def bw_usage_update(context,
instance_id,
network_label,
start_period,
bw_in, bw_out,
session=None):
"""Update cached bw usage for an instance and network
Creates new record if needed."""
return IMPL.bw_usage_update(context,
instance_id,
network_label,
start_period,
bw_in, bw_out,
session=None)
####################
def instance_type_extra_specs_get(context, instance_type_id):
"""Get all extra specs for an instance type."""
return IMPL.instance_type_extra_specs_get(context, instance_type_id)

View File

@@ -3617,6 +3617,42 @@ def agent_build_update(context, agent_build_id, values):
agent_build_ref.save(session=session)
####################
@require_context
def bw_usage_get_by_instance(context, instance_id, start_period):
session = get_session()
return session.query(models.BandwidthUsage).\
filter_by(instance_id=instance_id).\
filter_by(start_period=start_period).\
all()
@require_context
def bw_usage_update(context,
instance_id,
network_label,
start_period,
bw_in, bw_out,
session=None):
session = session if session else get_session()
with session.begin():
bwusage = session.query(models.BandwidthUsage).\
filter_by(instance_id=instance_id).\
filter_by(start_period=start_period).\
filter_by(network_label=network_label).\
first()
if not bwusage:
bwusage = models.BandwidthUsage()
bwusage.instance_id = instance_id
bwusage.start_period = start_period
bwusage.network_label = network_label
bwusage.last_refreshed = utils.utcnow()
bwusage.bw_in = bw_in
bwusage.bw_out = bw_out
bwusage.save(session=session)
####################

View File

@@ -0,0 +1,57 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 MORITA Kazutaka.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column, Table, MetaData
from sqlalchemy import Integer, BigInteger, DateTime, Boolean, String
from nova import log as logging
meta = MetaData()
bw_cache = Table('bw_usage_cache', meta,
Column('created_at', DateTime(timezone=False)),
Column('updated_at', DateTime(timezone=False)),
Column('deleted_at', DateTime(timezone=False)),
Column('deleted', Boolean(create_constraint=True, name=None)),
Column('id', Integer(), primary_key=True, nullable=False),
Column('instance_id', Integer(), nullable=False),
Column('network_label',
String(length=255, convert_unicode=False, assert_unicode=None,
unicode_error=None, _warn_on_bytestring=False)),
Column('start_period', DateTime(timezone=False), nullable=False),
Column('last_refreshed', DateTime(timezone=False)),
Column('bw_in', BigInteger()),
Column('bw_out', BigInteger()))
def upgrade(migrate_engine):
# Upgrade operations go here. Don't create your own engine;
# bind migrate_engine to your metadata
meta.bind = migrate_engine
try:
bw_cache.create()
except Exception:
logging.info(repr(bw_cache))
logging.exception('Exception while creating table')
meta.drop_all(tables=[bw_cache])
raise
def downgrade(migrate_engine):
# Operations to reverse the above upgrade go here.
bw_cache.drop()

View File

@@ -21,7 +21,7 @@ SQLAlchemy models for nova data.
"""
from sqlalchemy.orm import relationship, backref, object_mapper
from sqlalchemy import Column, Integer, String, schema
from sqlalchemy import Column, Integer, BigInteger, String, schema
from sqlalchemy import ForeignKey, DateTime, Boolean, Text, Float
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
@@ -846,6 +846,18 @@ class AgentBuild(BASE, NovaBase):
md5hash = Column(String(255))
class BandwidthUsage(BASE, NovaBase):
"""Cache for instance bandwidth usage data pulled from the hypervisor"""
__tablename__ = 'bw_usage_cache'
id = Column(Integer, primary_key=True, nullable=False)
instance_id = Column(Integer, nullable=False)
network_label = Column(String(255))
start_period = Column(DateTime, nullable=False)
last_refreshed = Column(DateTime)
bw_in = Column(BigInteger)
bw_out = Column(BigInteger)
def register_models():
"""Register Models and create metadata.

View File

@@ -853,3 +853,7 @@ class InstanceTypeDiskTooSmall(NovaException):
class InsufficientFreeMemory(NovaException):
message = _("Insufficient free memory on compute node to start %(uuid)s.")
class CouldNotFetchMetrics(NovaException):
message = _("Could not fetch bandwidth/cpu/disk metrics for this host.")

View File

@@ -406,6 +406,10 @@ DEFINE_list('zone_capabilities',
'Key/Multi-value list representng capabilities of this zone')
DEFINE_string('build_plan_encryption_key', None,
'128bit (hex) encryption key for scheduler build plans.')
DEFINE_string('instance_usage_audit_period', 'month',
'time period to generate instance usages for.')
DEFINE_integer('bandwith_poll_interval', 600,
'interval to pull bandwidth usage info')
DEFINE_bool('start_guests_on_host_boot', False,
'Whether to restart guests when the host reboots')

View File

@@ -33,6 +33,7 @@ from nova.scheduler import driver as scheduler_driver
from nova import rpc
from nova import test
from nova import utils
import nova
from nova.compute import instance_types
from nova.compute import manager as compute_manager
@@ -448,6 +449,48 @@ class ComputeTestCase(test.TestCase):
self.assert_(console)
self.compute.terminate_instance(self.context, instance_id)
def test_add_fixed_ip_usage_notification(self):
def dummy(*args, **kwargs):
pass
self.stubs.Set(nova.network.API, 'add_fixed_ip_to_instance',
dummy)
self.stubs.Set(nova.compute.manager.ComputeManager,
'inject_network_info', dummy)
self.stubs.Set(nova.compute.manager.ComputeManager,
'reset_network', dummy)
instance_id = self._create_instance()
self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
self.compute.add_fixed_ip_to_instance(self.context,
instance_id,
1)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
self.compute.terminate_instance(self.context, instance_id)
def test_remove_fixed_ip_usage_notification(self):
def dummy(*args, **kwargs):
pass
self.stubs.Set(nova.network.API, 'remove_fixed_ip_from_instance',
dummy)
self.stubs.Set(nova.compute.manager.ComputeManager,
'inject_network_info', dummy)
self.stubs.Set(nova.compute.manager.ComputeManager,
'reset_network', dummy)
instance_id = self._create_instance()
self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
self.compute.remove_fixed_ip_from_instance(self.context,
instance_id,
1)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
self.compute.terminate_instance(self.context, instance_id)
def test_run_instance_usage_notification(self):
"""Ensure run instance generates apropriate usage notification"""
instance_id = self._create_instance()
@@ -457,7 +500,7 @@ class ComputeTestCase(test.TestCase):
self.assertEquals(msg['priority'], 'INFO')
self.assertEquals(msg['event_type'], 'compute.instance.create')
payload = msg['payload']
self.assertEquals(payload['project_id'], self.project_id)
self.assertEquals(payload['tenant_id'], self.project_id)
self.assertEquals(payload['user_id'], self.user_id)
self.assertEquals(payload['instance_id'], instance_id)
self.assertEquals(payload['instance_type'], 'm1.tiny')
@@ -476,12 +519,16 @@ class ComputeTestCase(test.TestCase):
test_notifier.NOTIFICATIONS = []
self.compute.terminate_instance(self.context, instance_id)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
msg = test_notifier.NOTIFICATIONS[0]
self.assertEquals(msg['priority'], 'INFO')
self.assertEquals(msg['event_type'], 'compute.instance.exists')
msg = test_notifier.NOTIFICATIONS[1]
self.assertEquals(msg['priority'], 'INFO')
self.assertEquals(msg['event_type'], 'compute.instance.delete')
payload = msg['payload']
self.assertEquals(payload['project_id'], self.project_id)
self.assertEquals(payload['tenant_id'], self.project_id)
self.assertEquals(payload['user_id'], self.user_id)
self.assertEquals(payload['instance_id'], instance_id)
self.assertEquals(payload['instance_type'], 'm1.tiny')
@@ -564,7 +611,7 @@ class ComputeTestCase(test.TestCase):
self.assertEquals(msg['priority'], 'INFO')
self.assertEquals(msg['event_type'], 'compute.instance.resize.prep')
payload = msg['payload']
self.assertEquals(payload['project_id'], self.project_id)
self.assertEquals(payload['tenant_id'], self.project_id)
self.assertEquals(payload['user_id'], self.user_id)
self.assertEquals(payload['instance_id'], instance_id)
self.assertEquals(payload['instance_type'], 'm1.tiny')

View File

@@ -0,0 +1,99 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For misc util methods used with compute.
"""
from datetime import datetime
from datetime import timedelta
from nova import db
from nova import exception
from nova import flags
from nova import context
from nova import test
from nova import log as logging
from nova import utils
import nova.image.fake
from nova.compute import utils as compute_utils
from nova.compute import instance_types
from nova.notifier import test_notifier
LOG = logging.getLogger('nova.tests.compute_utils')
FLAGS = flags.FLAGS
flags.DECLARE('stub_network', 'nova.compute.manager')
class UsageInfoTestCase(test.TestCase):
def setUp(self):
super(UsageInfoTestCase, self).setUp()
self.flags(connection_type='fake',
stub_network=True,
notification_driver='nova.notifier.test_notifier',
network_manager='nova.network.manager.FlatManager')
self.compute = utils.import_object(FLAGS.compute_manager)
self.user_id = 'fake'
self.project_id = 'fake'
self.context = context.RequestContext(self.user_id, self.project_id)
test_notifier.NOTIFICATIONS = []
def fake_show(meh, context, id):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1}}
self.stubs.Set(nova.image.fake._FakeImageService, 'show', fake_show)
def _create_instance(self, params={}):
"""Create a test instance"""
inst = {}
inst['image_ref'] = 1
inst['reservation_id'] = 'r-fakeres'
inst['launch_time'] = '10'
inst['user_id'] = self.user_id
inst['project_id'] = self.project_id
type_id = instance_types.get_instance_type_by_name('m1.tiny')['id']
inst['instance_type_id'] = type_id
inst['ami_launch_index'] = 0
inst.update(params)
return db.instance_create(self.context, inst)['id']
def test_notify_usage_exists(self):
"""Ensure 'exists' notification generates apropriate usage data."""
instance_id = self._create_instance()
instance = db.instance_get(self.context, instance_id)
compute_utils.notify_usage_exists(instance)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
msg = test_notifier.NOTIFICATIONS[0]
self.assertEquals(msg['priority'], 'INFO')
self.assertEquals(msg['event_type'], 'compute.instance.exists')
payload = msg['payload']
self.assertEquals(payload['tenant_id'], self.project_id)
self.assertEquals(payload['user_id'], self.user_id)
self.assertEquals(payload['instance_id'], instance_id)
self.assertEquals(payload['instance_type'], 'm1.tiny')
type_id = instance_types.get_instance_type_by_name('m1.tiny')['id']
self.assertEquals(str(payload['instance_type_id']), str(type_id))
for attr in ('display_name', 'created_at', 'launched_at',
'state', 'state_description', 'fixed_ips',
'bandwidth', 'audit_period_begining',
'audit_period_ending'):
self.assertTrue(attr in payload,
msg="Key %s not in payload" % attr)
self.assertEquals(payload['image_ref'], '1')
self.compute.terminate_instance(self.context, instance_id)

View File

@@ -294,9 +294,46 @@ EASIER_PASSWORD_SYMBOLS = ('23456789' # Removed: 0, 1
'ABCDEFGHJKLMNPQRSTUVWXYZ') # Removed: I, O
def current_audit_period(unit=None):
if not unit:
unit = FLAGS.instance_usage_audit_period
rightnow = utcnow()
if unit not in ('month', 'day', 'year', 'hour'):
raise ValueError('Time period must be hour, day, month or year')
n = 1 # we are currently only using multiples of 1 unit (mdragon)
if unit == 'month':
year = rightnow.year - (n // 12)
n = n % 12
if n >= rightnow.month:
year -= 1
month = 12 + (rightnow.month - n)
else:
month = rightnow.month - n
begin = datetime.datetime(day=1, month=month, year=year)
end = datetime.datetime(day=1,
month=rightnow.month,
year=rightnow.year)
elif unit == 'year':
begin = datetime.datetime(day=1, month=1, year=rightnow.year - n)
end = datetime.datetime(day=1, month=1, year=rightnow.year)
elif unit == 'day':
b = rightnow - datetime.timedelta(days=n)
begin = datetime.datetime(day=b.day, month=b.month, year=b.year)
end = datetime.datetime(day=rightnow.day,
month=rightnow.month,
year=rightnow.year)
elif unit == 'hour':
end = rightnow.replace(minute=0, second=0, microsecond=0)
begin = end - datetime.timedelta(hours=n)
return (begin, end)
def usage_from_instance(instance_ref, **kw):
usage_info = dict(
project_id=instance_ref['project_id'],
tenant_id=instance_ref['project_id'],
user_id=instance_ref['user_id'],
instance_id=instance_ref['id'],
instance_type=instance_ref['instance_type']['name'],
@@ -305,7 +342,11 @@ def usage_from_instance(instance_ref, **kw):
created_at=str(instance_ref['created_at']),
launched_at=str(instance_ref['launched_at']) \
if instance_ref['launched_at'] else '',
image_ref=instance_ref['image_ref'])
image_ref=instance_ref['image_ref'],
state=instance_ref['vm_state'],
state_description=instance_ref['task_state'] \
if instance_ref['task_state'] else '',
fixed_ips=[a.address for a in instance_ref['fixed_ips']])
usage_info.update(kw)
return usage_info
@@ -324,7 +365,7 @@ def last_octet(address):
return int(address.split('.')[-1])
def get_my_linklocal(interface):
def get_my_linklocal(interface):
try:
if_str = execute('ip', '-f', 'inet6', '-o', 'addr', 'show', interface)
condition = '\s+inet6\s+([0-9a-f:]+)/\d+\s+scope\s+link'

View File

@@ -200,6 +200,11 @@ class ComputeDriver(object):
# TODO(Vek): Need to pass context in for access to auth_token
raise NotImplementedError()
def get_all_bw_usage(self, start_time, stop_time=None):
"""Return bandwidth usage info for each interface on each
running VM"""
raise NotImplementedError()
def get_host_ip_addr(self):
"""
Retrieves the IP address of the dom0

View File

@@ -189,6 +189,12 @@ class FakeConnection(driver.ComputeDriver):
def get_diagnostics(self, instance_name):
return {}
def get_all_bw_usage(self, start_time, stop_time=None):
"""Return bandwidth usage info for each interface on each
running VM"""
bwusage = []
return bwusage
def list_disks(self, instance_name):
return ['A_DISK']

View File

@@ -21,6 +21,7 @@ their attributes like VDIs, VIFs, as well as their lookup functions.
"""
import json
import math
import os
import pickle
import re
@@ -29,6 +30,7 @@ import tempfile
import time
import urllib
import uuid
from decimal import Decimal
from xml.dom import minidom
from nova import db
@@ -809,6 +811,24 @@ class VMHelper(HelperBase):
except cls.XenAPI.Failure as e:
return {"Unable to retrieve diagnostics": e}
@classmethod
def compile_metrics(cls, session, start_time, stop_time=None):
"""Compile bandwidth usage, cpu, and disk metrics for all VMs on
this host"""
start_time = int(start_time)
try:
host = session.get_xenapi_host()
host_ip = session.get_xenapi().host.get_record(host)["address"]
except (cls.XenAPI.Failure, KeyError) as e:
raise exception.CouldNotFetchMetrics()
xml = get_rrd_updates(host_ip, start_time)
if xml:
doc = minidom.parseString(xml)
return parse_rrd_update(doc, start_time, stop_time)
raise exception.CouldNotFetchMetrics()
@classmethod
def scan_sr(cls, session, instance_id=None, sr_ref=None):
"""Scans the SR specified by sr_ref"""
@@ -837,6 +857,88 @@ def get_rrd(host, vm_uuid):
return None
def get_rrd_updates(host, start_time):
"""Return the RRD updates XML as a string"""
try:
xml = urllib.urlopen("http://%s:%s@%s/rrd_updates?start=%s" % (
FLAGS.xenapi_connection_username,
FLAGS.xenapi_connection_password,
host,
start_time))
return xml.read()
except IOError:
return None
def parse_rrd_meta(doc):
data = {}
meta = doc.getElementsByTagName('meta')[0]
for tag in ('start', 'end', 'step'):
data[tag] = int(meta.getElementsByTagName(tag)[0].firstChild.data)
legend = meta.getElementsByTagName('legend')[0]
data['legend'] = [child.firstChild.data for child in legend.childNodes]
return data
def parse_rrd_data(doc):
dnode = doc.getElementsByTagName('data')[0]
return [dict(
time=int(child.getElementsByTagName('t')[0].firstChild.data),
values=[Decimal(valnode.firstChild.data)
for valnode in child.getElementsByTagName('v')])
for child in dnode.childNodes]
def parse_rrd_update(doc, start, until=None):
sum_data = {}
meta = parse_rrd_meta(doc)
data = parse_rrd_data(doc)
for col, collabel in enumerate(meta['legend']):
datatype, objtype, uuid, name = collabel.split(':')
vm_data = sum_data.get(uuid, dict())
if name.startswith('vif'):
vm_data[name] = integrate_series(data, col, start, until)
else:
vm_data[name] = average_series(data, col, start, until)
sum_data[uuid] = vm_data
return sum_data
def average_series(data, col, start, until=None):
vals = [row['values'][col] for row in data
if (not until or (row['time'] <= until)) and
not row['values'][col].is_nan()]
if vals:
return (sum(vals) / len(vals)).quantize(Decimal('1.0000'))
else:
return Decimal('0.0000')
def integrate_series(data, col, start, until=None):
total = Decimal('0.0000')
prev_time = int(start)
prev_val = None
for row in reversed(data):
if not until or (row['time'] <= until):
time = row['time']
val = row['values'][col]
if val.is_nan():
val = Decimal('0.0000')
if prev_val is None:
prev_val = val
if prev_val >= val:
total += ((val * (time - prev_time)) +
(Decimal('0.5000') * (prev_val - val) *
(time - prev_time)))
else:
total += ((prev_val * (time - prev_time)) +
(Decimal('0.5000') * (val - prev_val) *
(time - prev_time)))
prev_time = time
prev_val = val
return total.quantize(Decimal('1.0000'))
#TODO(sirp): This code comes from XS5.6 pluginlib.py, we should refactor to
# use that implmenetation
def get_vhd_parent(session, vdi_rec):

View File

@@ -1189,6 +1189,38 @@ class VMOps(object):
vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
return VMHelper.compile_diagnostics(self._session, vm_rec)
def get_all_bw_usage(self, start_time, stop_time=None):
"""Return bandwidth usage info for each interface on each
running VM"""
try:
metrics = VMHelper.compile_metrics(self._session,
start_time,
stop_time)
except exception.CouldNotFetchMetrics:
LOG.exception(_("Could not get bandwidth info."),
exc_info=sys.exc_info())
bw = {}
for uuid, data in metrics.iteritems():
vm_ref = self._session.get_xenapi().VM.get_by_uuid(uuid)
vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
vif_map = {}
for vif in [self._session.get_xenapi().VIF.get_record(vrec)
for vrec in vm_rec['VIFs']]:
vif_map[vif['device']] = vif['MAC']
name = vm_rec['name_label']
if name.startswith('Control domain'):
continue
vifs_bw = bw.setdefault(name, {})
for key, val in data.iteritems():
if key.startswith('vif_'):
vname = key.split('_')[1]
vif_bw = vifs_bw.setdefault(vif_map[vname], {})
if key.endswith('tx'):
vif_bw['bw_out'] = int(val)
if key.endswith('rx'):
vif_bw['bw_in'] = int(val)
return bw
def get_console_output(self, instance):
"""Return snapshot of console."""
# TODO: implement this to fix pylint!

View File

@@ -60,6 +60,7 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block.
import json
import random
import sys
import time
import urlparse
import xmlrpclib
@@ -291,6 +292,25 @@ class XenAPIConnection(driver.ComputeDriver):
"""Return data about VM diagnostics"""
return self._vmops.get_diagnostics(instance)
def get_all_bw_usage(self, start_time, stop_time=None):
"""Return bandwidth usage info for each interface on each
running VM"""
bwusage = []
start_time = time.mktime(start_time.timetuple())
if stop_time:
stop_time = time.mktime(stop_time.timetuple())
for iusage in self._vmops.get_all_bw_usage(start_time, stop_time).\
values():
for macaddr, usage in iusage.iteritems():
vi = db.virtual_interface_get_by_address(
context.get_admin_context(),
macaddr)
if vi:
bwusage.append(dict(virtual_interface=vi,
bw_in=usage['bw_in'],
bw_out=usage['bw_out']))
return bwusage
def get_console_output(self, instance):
"""Return snapshot of console"""
return self._vmops.get_console_output(instance)