Add implementation of handling events

Implement code of handling libvirt events for instancemonitor.

Change-Id: Icb17be66c55b6eec0d5396beb622bed078f5c663
This commit is contained in:
Kengo Takahara
2016-11-02 10:46:01 +00:00
committed by takahara.kengo
parent f0b0540ab2
commit a566f8ddc6
8 changed files with 508 additions and 1 deletions

View File

@@ -14,9 +14,11 @@
from oslo_config import cfg
from masakarimonitors.conf import base
from masakarimonitors.conf import instance
from masakarimonitors.conf import service
CONF = cfg.CONF
base.register_opts(CONF)
instance.register_opts(CONF)
service.register_opts(CONF)

View File

@@ -0,0 +1,66 @@
# Copyright(c) 2016 Nippon Telegraph and Telephone Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
monitor_callback_opts = [
cfg.IntOpt('retry_max',
default=12,
help='Number of retries when the notification processing'
' is error.'),
cfg.IntOpt('retry_interval',
default=10,
help='Trial interval of time of the notification processing'
' is error(in seconds).'),
cfg.StrOpt('api_version',
default='v1',
help='Masakari API Version.'),
cfg.StrOpt('interface',
default='public',
help='Interface of endpoint.'),
]
monitor_keystone_opts = [
cfg.StrOpt('project_domain_name',
default='default',
help='Domain name which the project belongs.'),
cfg.StrOpt('username',
default='masakari',
help='The name of a user with administrative privileges.'),
cfg.StrOpt('password',
default='password',
help='Administrator user\'s password.'),
cfg.StrOpt('project_name',
default='service',
help='Project name.'),
cfg.StrOpt('auth_url',
default='http://localhost:5000',
help='Address of Keystone.'),
cfg.StrOpt('region',
default='RegionOne',
help='Region name.'),
]
def register_opts(conf):
conf.register_opts(monitor_callback_opts, group='callback')
conf.register_opts(monitor_keystone_opts, group='keystone')
def list_opts():
return {
'callback': monitor_callback_opts,
'keystone': monitor_keystone_opts
}

View File

@@ -12,8 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import time
import libvirt
from oslo_log import log as oslo_logging
from masakarimonitors.i18n import _LW
from masakarimonitors.instancemonitor.libvirt_handler import eventfilter
from masakarimonitors import manager
LOG = oslo_logging.getLogger(__name__)
@@ -25,12 +31,160 @@ class InstancemonitorManager(manager.Manager):
def __init__(self, *args, **kwargs):
super(InstancemonitorManager, self).__init__(
service_name="instancemonitor", *args, **kwargs)
self.evf = eventfilter.EventFilter()
# This keeps track of what thread is running the event loop,
# (if it is run in a background thread)
self.event_loop_thread = None
def _vir_event_loop_native_run(self):
# Directly run the event loop in the current thread
while True:
libvirt.virEventRunDefaultImpl()
def _vir_event_loop_native_start(self):
libvirt.virEventRegisterDefaultImpl()
self.event_loop_thread = threading.Thread(
target=self._vir_event_loop_native_run,
name="lib_virt_eventLoop")
self.event_loop_thread.setDaemon(True)
self.event_loop_thread.start()
def _my_domain_event_callback(self, conn, dom, event, detail, opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
event, detail, dom.UUIDString())
def _my_domain_event_reboot_callback(self, conn, dom, opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_REBOOT,
-1, -1, dom.UUIDString())
def _my_domain_event_rtc_change_callback(self, conn, dom, utcoffset,
opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_RTC_CHANGE,
-1, -1, dom.UUIDString())
def _my_domain_event_watchdog_callback(self, conn, dom, action, opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG,
action, -1, dom.UUIDString())
def _my_domain_event_io_error_callback(self, conn, dom, srcpath,
devalias, action, opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR,
action, -1, dom.UUIDString())
def _my_domain_event_graphics_callback(self, conn, dom, phase, localAddr,
remoteAddr, authScheme, subject,
opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_GRAPHICS,
-1, phase, dom.UUIDString())
def _my_domain_event_disk_change_callback(self, conn, dom, oldSrcPath,
newSrcPath, devAlias, reason,
opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_DISK_CHANGE,
-1, -1, dom.UUIDString())
def _my_domain_event_io_error_reason_callback(self, conn, dom, srcPath,
devAlias, action, reason,
opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON,
-1, -1, dom.UUIDString())
def _my_domain_event_generic_callback(self, conn, dom, opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_CONTROL_ERROR,
-1, -1, dom.UUIDString())
def _err_handler(self, ctxt, err):
LOG.warning(_LW("%s"), err[2])
def _virt_event(self, uri):
# Run a background thread with the event loop
self._vir_event_loop_native_start()
# Connect to libvert - If be disconnected, reprocess.
while True:
vc = libvirt.openReadOnly(uri)
# Event callback settings
callback_ids = []
cid = vc.domainEventRegisterAny(
None,
libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
self._my_domain_event_callback, None)
callback_ids.append(cid)
cid = vc.domainEventRegisterAny(
None,
libvirt.VIR_DOMAIN_EVENT_ID_REBOOT,
self._my_domain_event_reboot_callback, None)
callback_ids.append(cid)
cid = vc.domainEventRegisterAny(
None,
libvirt.VIR_DOMAIN_EVENT_ID_RTC_CHANGE,
self._my_domain_event_rtc_change_callback, None)
callback_ids.append(cid)
cid = vc.domainEventRegisterAny(
None,
libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR,
self._my_domain_event_io_error_callback, None)
callback_ids.append(cid)
cid = vc.domainEventRegisterAny(
None,
libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG,
self._my_domain_event_watchdog_callback, None)
callback_ids.append(cid)
cid = vc.domainEventRegisterAny(
None,
libvirt.VIR_DOMAIN_EVENT_ID_GRAPHICS,
self._my_domain_event_graphics_callback, None)
callback_ids.append(cid)
cid = vc.domainEventRegisterAny(
None,
libvirt.VIR_DOMAIN_EVENT_ID_DISK_CHANGE,
self._my_domain_event_disk_change_callback, None)
callback_ids.append(cid)
cid = vc.domainEventRegisterAny(
None,
libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON,
self._my_domain_event_io_error_reason_callback, None)
callback_ids.append(cid)
cid = vc.domainEventRegisterAny(
None,
libvirt.VIR_DOMAIN_EVENT_ID_CONTROL_ERROR,
self._my_domain_event_generic_callback, None)
callback_ids.append(cid)
# Connection monitoring.
vc.setKeepAlive(5, 3)
while vc.isAlive() == 1:
time.sleep(1)
# If connection between libvirtd was lost,
# clear callback connection.
LOG.warning(_LW("%s"), 'Libvirt Connection Closed Unexpectedly.')
for cid in callback_ids:
try:
vc.domainEventDeregisterAny(cid)
except Exception:
pass
vc.close()
del vc
time.sleep(3)
def main(self):
"""Main method.
Set the URI, error handler, and executes event loop processing.
"""
uri = "qemu:///system"
LOG.debug("Using uri:" + uri)
# set error handler & do event loop
libvirt.registerErrorHandler(self._err_handler, '_virt_event')
self._virt_event(uri)

View File

@@ -0,0 +1,76 @@
# Copyright(c) 2016 Nippon Telegraph and Telephone Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as oslo_logging
import masakarimonitors.conf
from masakarimonitors.i18n import _LI
LOG = oslo_logging.getLogger(__name__)
CONF = masakarimonitors.conf.CONF
VMHA = "vmha"
class Callback(object):
"""Class of callback processing."""
def _post_event(self, retry_max, retry_interval, event):
# TODO(KengoTakahara): This method will be implemented after
# fixing masakariclient.sdk.
pass
def libvirt_event_callback(self, eventID, detail, uuID, noticeType,
hostname, currentTime):
"""Callback method.
Callback processing be executed as result of the
libvirt event filter.
:param eventID: Event ID notify to the callback function
:param detail: Event code notify to the callback function
:param uuID: Uuid notify to the callback function
:param noticeType: Notice type notify to the callback function
:param hostname: Server host name of the source an event occur
notify to the callback function
:param currentTime: Event occurred time notify to the callback
function
"""
# Output to the syslog.
msg = "libvirt Event: type=%s hostname=%s uuid=%s \
time=%s eventID=%s detail=%s " % (
noticeType, hostname, uuID, currentTime, eventID, detail)
LOG.info(_LI("%s"), msg)
# Set the event to the dictionary.
event = {
'notification': {
'type': noticeType,
'hostname': hostname,
'generated_time': currentTime,
'payload': {
'event': eventID,
'instance_uuid': uuID,
'vir_domain_event': detail
}
}
}
retry_max = CONF.callback.retry_max
retry_interval = float(CONF.callback.retry_interval)
self._post_event(retry_max, retry_interval, event)
return

View File

@@ -0,0 +1,89 @@
# Copyright(c) 2016 Nippon Telegraph and Telephone Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
import sys
import threading
from oslo_log import log as oslo_logging
from oslo_utils import timeutils
from masakarimonitors.instancemonitor.libvirt_handler import callback
from masakarimonitors.instancemonitor.libvirt_handler \
import eventfilter_table as evft
LOG = oslo_logging.getLogger(__name__)
class EventFilter(object):
"""Class of filtering events."""
def __init__(self):
self.callback = callback.Callback()
def vir_event_filter(self, eventID, eventType, detail, uuID):
"""Filter events from libvirt.
:param eventID: EventID
:param eventType: Event type
:param detail: Event name
:pram uuID: UUID
"""
noticeType = 'VM'
hostname = socket.gethostname()
currentTime = timeutils.utcnow()
# All Event Output if debug mode is on.
msg = "libvirt Event Received.type = %s \
hostname = %s uuid = %s time = %s eventID = %d eventType = %d \
detail = %d" % (
noticeType,
hostname, uuID, currentTime, eventID,
eventType, detail)
LOG.debug(msg)
try:
if detail in evft.event_filter_dic[eventID][eventType]:
LOG.debug("Event Filter Matched.")
eventID_val = evft.eventID_dic[eventID]
detail_val = evft.detail_dic[eventID][eventType][detail]
# callback Thread Start
thread = threading.Thread(
target=self.callback.libvirt_event_callback,
args=(eventID_val, detail_val,
uuID, noticeType,
hostname, currentTime)
)
thread.start()
else:
LOG.debug("Event Filter Unmatched.")
pass
except KeyError:
LOG.debug("virEventFilter KeyError")
pass
except IndexError:
LOG.debug("virEventFilter IndexError")
pass
except TypeError:
LOG.debug("virEventFilter TypeError")
pass
except Exception:
LOG.debug("Unexpected error")
sys.exc_info()[0]
raise

View File

@@ -0,0 +1,120 @@
# Copyright(c) 2016 Nippon Telegraph and Telephone Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import libvirt
# If is not defined internal , -1 is stored.
DUMMY = -1
# Enumerate all event that can get.
# Comment out events that is not targeted in the callback.
event_filter_dic = {
libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE:
{
libvirt.VIR_DOMAIN_EVENT_SUSPENDED:
(
libvirt.VIR_DOMAIN_EVENT_SUSPENDED_IOERROR,
libvirt.VIR_DOMAIN_EVENT_SUSPENDED_WATCHDOG,
libvirt.VIR_DOMAIN_EVENT_SUSPENDED_API_ERROR
),
libvirt.VIR_DOMAIN_EVENT_STOPPED:
(
libvirt.VIR_DOMAIN_EVENT_STOPPED_SHUTDOWN,
libvirt.VIR_DOMAIN_EVENT_STOPPED_DESTROYED,
libvirt.VIR_DOMAIN_EVENT_STOPPED_FAILED,
),
libvirt.VIR_DOMAIN_EVENT_SHUTDOWN:
(
libvirt.VIR_DOMAIN_EVENT_SHUTDOWN_FINISHED,
)
},
libvirt.VIR_DOMAIN_EVENT_ID_REBOOT: {DUMMY: (DUMMY,)},
libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG:
{
libvirt.VIR_DOMAIN_EVENT_WATCHDOG_NONE: (DUMMY,),
libvirt.VIR_DOMAIN_EVENT_WATCHDOG_PAUSE: (DUMMY,),
libvirt.VIR_DOMAIN_EVENT_WATCHDOG_RESET: (DUMMY,),
libvirt.VIR_DOMAIN_EVENT_WATCHDOG_POWEROFF: (DUMMY,),
libvirt.VIR_DOMAIN_EVENT_WATCHDOG_SHUTDOWN: (DUMMY,),
libvirt.VIR_DOMAIN_EVENT_WATCHDOG_DEBUG: (DUMMY,)
},
libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR:
{
libvirt.VIR_DOMAIN_EVENT_IO_ERROR_NONE: (DUMMY,),
libvirt.VIR_DOMAIN_EVENT_IO_ERROR_PAUSE: (DUMMY,),
libvirt.VIR_DOMAIN_EVENT_IO_ERROR_REPORT: (DUMMY,)
},
libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON: {DUMMY: (DUMMY,)},
libvirt.VIR_DOMAIN_EVENT_ID_CONTROL_ERROR: {DUMMY: (DUMMY,)}
}
eventID_dic = {
libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE: 'LIFECYCLE',
libvirt.VIR_DOMAIN_EVENT_ID_REBOOT: 'REBOOT',
libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG: 'WATCHDOG',
libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR: 'IO_ERROR',
libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON: 'IO_ERROR_REASON',
libvirt.VIR_DOMAIN_EVENT_ID_CONTROL_ERROR: 'CONTROL_ERROR'}
detail_dic = {
libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE: {
libvirt.VIR_DOMAIN_EVENT_SUSPENDED: {
libvirt.VIR_DOMAIN_EVENT_SUSPENDED_IOERROR:
'SUSPENDED_IOERROR',
libvirt.VIR_DOMAIN_EVENT_SUSPENDED_WATCHDOG:
'SUSPENDED_WATCHDOG',
libvirt.VIR_DOMAIN_EVENT_SUSPENDED_API_ERROR:
'SUSPENDED_API_ERROR'},
libvirt.VIR_DOMAIN_EVENT_STOPPED: {
libvirt.VIR_DOMAIN_EVENT_STOPPED_SHUTDOWN:
'STOPPED_SHUTDOWN',
libvirt.VIR_DOMAIN_EVENT_STOPPED_DESTROYED:
'STOPPED_DESTROYED',
libvirt.VIR_DOMAIN_EVENT_STOPPED_FAILED:
'STOPPED_FAILED'},
libvirt.VIR_DOMAIN_EVENT_SHUTDOWN: {
libvirt.VIR_DOMAIN_EVENT_SHUTDOWN_FINISHED:
'SHUTDOWN_FINISHED'}
},
libvirt.VIR_DOMAIN_EVENT_ID_REBOOT: {
DUMMY: {
DUMMY: 'UNKNOWN'}},
libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG: {
libvirt.VIR_DOMAIN_EVENT_WATCHDOG_NONE: {
DUMMY: 'WATCHDOG_NONE'},
libvirt.VIR_DOMAIN_EVENT_WATCHDOG_PAUSE: {
DUMMY: 'WATCHDOG_PAUSE'},
libvirt.VIR_DOMAIN_EVENT_WATCHDOG_RESET: {
DUMMY: 'WATCHDOG_RESET'},
libvirt.VIR_DOMAIN_EVENT_WATCHDOG_POWEROFF: {
DUMMY: 'WATCHDOG_POWEROFF'},
libvirt.VIR_DOMAIN_EVENT_WATCHDOG_SHUTDOWN: {
DUMMY: 'WATCHDOG_SHUTDOWN'},
libvirt.VIR_DOMAIN_EVENT_WATCHDOG_DEBUG: {
DUMMY: 'WATCHDOG_DEBUG'}},
libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR: {
libvirt.VIR_DOMAIN_EVENT_IO_ERROR_NONE: {
DUMMY: 'IO_ERROR_NONE'},
libvirt.VIR_DOMAIN_EVENT_IO_ERROR_PAUSE: {
DUMMY: 'IO_ERROR_PAUSE'},
libvirt.VIR_DOMAIN_EVENT_IO_ERROR_REPORT: {
DUMMY: 'IO_ERROR_REPORT'}},
libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON: {
DUMMY: {
DUMMY: 'UNKNOWN'}},
libvirt.VIR_DOMAIN_EVENT_ID_CONTROL_ERROR: {
DUMMY: {
DUMMY: 'UNKNOWN'}}
}