From a566f8ddc6b3b46ae020d182496d153fb0c1b3e7 Mon Sep 17 00:00:00 2001 From: Kengo Takahara Date: Wed, 2 Nov 2016 10:46:01 +0000 Subject: [PATCH] Add implementation of handling events Implement code of handling libvirt events for instancemonitor. Change-Id: Icb17be66c55b6eec0d5396beb622bed078f5c663 --- masakarimonitors/conf/__init__.py | 2 + masakarimonitors/conf/instance.py | 66 ++++++++ masakarimonitors/instancemonitor/__init__.py | 0 masakarimonitors/instancemonitor/instance.py | 156 +++++++++++++++++- .../libvirt_handler/__init__.py | 0 .../libvirt_handler/callback.py | 76 +++++++++ .../libvirt_handler/eventfilter.py | 89 ++++++++++ .../libvirt_handler/eventfilter_table.py | 120 ++++++++++++++ 8 files changed, 508 insertions(+), 1 deletion(-) create mode 100644 masakarimonitors/conf/instance.py create mode 100644 masakarimonitors/instancemonitor/__init__.py create mode 100644 masakarimonitors/instancemonitor/libvirt_handler/__init__.py create mode 100644 masakarimonitors/instancemonitor/libvirt_handler/callback.py create mode 100644 masakarimonitors/instancemonitor/libvirt_handler/eventfilter.py create mode 100644 masakarimonitors/instancemonitor/libvirt_handler/eventfilter_table.py diff --git a/masakarimonitors/conf/__init__.py b/masakarimonitors/conf/__init__.py index 8a599fa..0ff2c24 100644 --- a/masakarimonitors/conf/__init__.py +++ b/masakarimonitors/conf/__init__.py @@ -14,9 +14,11 @@ from oslo_config import cfg from masakarimonitors.conf import base +from masakarimonitors.conf import instance from masakarimonitors.conf import service CONF = cfg.CONF base.register_opts(CONF) +instance.register_opts(CONF) service.register_opts(CONF) diff --git a/masakarimonitors/conf/instance.py b/masakarimonitors/conf/instance.py new file mode 100644 index 0000000..9269c4b --- /dev/null +++ b/masakarimonitors/conf/instance.py @@ -0,0 +1,66 @@ +# Copyright(c) 2016 Nippon Telegraph and Telephone Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from oslo_config import cfg + + +monitor_callback_opts = [ + cfg.IntOpt('retry_max', + default=12, + help='Number of retries when the notification processing' + ' is error.'), + cfg.IntOpt('retry_interval', + default=10, + help='Trial interval of time of the notification processing' + ' is error(in seconds).'), + cfg.StrOpt('api_version', + default='v1', + help='Masakari API Version.'), + cfg.StrOpt('interface', + default='public', + help='Interface of endpoint.'), +] + + +monitor_keystone_opts = [ + cfg.StrOpt('project_domain_name', + default='default', + help='Domain name which the project belongs.'), + cfg.StrOpt('username', + default='masakari', + help='The name of a user with administrative privileges.'), + cfg.StrOpt('password', + default='password', + help='Administrator user\'s password.'), + cfg.StrOpt('project_name', + default='service', + help='Project name.'), + cfg.StrOpt('auth_url', + default='http://localhost:5000', + help='Address of Keystone.'), + cfg.StrOpt('region', + default='RegionOne', + help='Region name.'), +] + + +def register_opts(conf): + conf.register_opts(monitor_callback_opts, group='callback') + conf.register_opts(monitor_keystone_opts, group='keystone') + + +def list_opts(): + return { + 'callback': monitor_callback_opts, + 'keystone': monitor_keystone_opts + } diff --git a/masakarimonitors/instancemonitor/__init__.py b/masakarimonitors/instancemonitor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/masakarimonitors/instancemonitor/instance.py b/masakarimonitors/instancemonitor/instance.py index 93266c5..86342da 100644 --- a/masakarimonitors/instancemonitor/instance.py +++ b/masakarimonitors/instancemonitor/instance.py @@ -12,8 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading +import time + +import libvirt from oslo_log import log as oslo_logging +from masakarimonitors.i18n import _LW +from masakarimonitors.instancemonitor.libvirt_handler import eventfilter from masakarimonitors import manager LOG = oslo_logging.getLogger(__name__) @@ -25,12 +31,160 @@ class InstancemonitorManager(manager.Manager): def __init__(self, *args, **kwargs): super(InstancemonitorManager, self).__init__( service_name="instancemonitor", *args, **kwargs) + self.evf = eventfilter.EventFilter() + # This keeps track of what thread is running the event loop, + # (if it is run in a background thread) + self.event_loop_thread = None + + def _vir_event_loop_native_run(self): + # Directly run the event loop in the current thread + while True: + libvirt.virEventRunDefaultImpl() + + def _vir_event_loop_native_start(self): + libvirt.virEventRegisterDefaultImpl() + self.event_loop_thread = threading.Thread( + target=self._vir_event_loop_native_run, + name="lib_virt_eventLoop") + self.event_loop_thread.setDaemon(True) + self.event_loop_thread.start() + + def _my_domain_event_callback(self, conn, dom, event, detail, opaque): + self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE, + event, detail, dom.UUIDString()) + + def _my_domain_event_reboot_callback(self, conn, dom, opaque): + self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_REBOOT, + -1, -1, dom.UUIDString()) + + def _my_domain_event_rtc_change_callback(self, conn, dom, utcoffset, + opaque): + self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_RTC_CHANGE, + -1, -1, dom.UUIDString()) + + def _my_domain_event_watchdog_callback(self, conn, dom, action, opaque): + self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG, + action, -1, dom.UUIDString()) + + def _my_domain_event_io_error_callback(self, conn, dom, srcpath, + devalias, action, opaque): + self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR, + action, -1, dom.UUIDString()) + + def _my_domain_event_graphics_callback(self, conn, dom, phase, localAddr, + remoteAddr, authScheme, subject, + opaque): + self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_GRAPHICS, + -1, phase, dom.UUIDString()) + + def _my_domain_event_disk_change_callback(self, conn, dom, oldSrcPath, + newSrcPath, devAlias, reason, + opaque): + self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_DISK_CHANGE, + -1, -1, dom.UUIDString()) + + def _my_domain_event_io_error_reason_callback(self, conn, dom, srcPath, + devAlias, action, reason, + opaque): + self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON, + -1, -1, dom.UUIDString()) + + def _my_domain_event_generic_callback(self, conn, dom, opaque): + self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_CONTROL_ERROR, + -1, -1, dom.UUIDString()) + + def _err_handler(self, ctxt, err): + LOG.warning(_LW("%s"), err[2]) + + def _virt_event(self, uri): + # Run a background thread with the event loop + self._vir_event_loop_native_start() + + # Connect to libvert - If be disconnected, reprocess. + while True: + vc = libvirt.openReadOnly(uri) + + # Event callback settings + callback_ids = [] + cid = vc.domainEventRegisterAny( + None, + libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE, + self._my_domain_event_callback, None) + callback_ids.append(cid) + + cid = vc.domainEventRegisterAny( + None, + libvirt.VIR_DOMAIN_EVENT_ID_REBOOT, + self._my_domain_event_reboot_callback, None) + callback_ids.append(cid) + + cid = vc.domainEventRegisterAny( + None, + libvirt.VIR_DOMAIN_EVENT_ID_RTC_CHANGE, + self._my_domain_event_rtc_change_callback, None) + callback_ids.append(cid) + + cid = vc.domainEventRegisterAny( + None, + libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR, + self._my_domain_event_io_error_callback, None) + callback_ids.append(cid) + + cid = vc.domainEventRegisterAny( + None, + libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG, + self._my_domain_event_watchdog_callback, None) + callback_ids.append(cid) + + cid = vc.domainEventRegisterAny( + None, + libvirt.VIR_DOMAIN_EVENT_ID_GRAPHICS, + self._my_domain_event_graphics_callback, None) + callback_ids.append(cid) + + cid = vc.domainEventRegisterAny( + None, + libvirt.VIR_DOMAIN_EVENT_ID_DISK_CHANGE, + self._my_domain_event_disk_change_callback, None) + callback_ids.append(cid) + + cid = vc.domainEventRegisterAny( + None, + libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON, + self._my_domain_event_io_error_reason_callback, None) + callback_ids.append(cid) + + cid = vc.domainEventRegisterAny( + None, + libvirt.VIR_DOMAIN_EVENT_ID_CONTROL_ERROR, + self._my_domain_event_generic_callback, None) + callback_ids.append(cid) + + # Connection monitoring. + vc.setKeepAlive(5, 3) + while vc.isAlive() == 1: + time.sleep(1) + + # If connection between libvirtd was lost, + # clear callback connection. + LOG.warning(_LW("%s"), 'Libvirt Connection Closed Unexpectedly.') + for cid in callback_ids: + try: + vc.domainEventDeregisterAny(cid) + except Exception: + pass + vc.close() + del vc + time.sleep(3) def main(self): """Main method. Set the URI, error handler, and executes event loop processing. - """ uri = "qemu:///system" LOG.debug("Using uri:" + uri) + + # set error handler & do event loop + libvirt.registerErrorHandler(self._err_handler, '_virt_event') + self._virt_event(uri) diff --git a/masakarimonitors/instancemonitor/libvirt_handler/__init__.py b/masakarimonitors/instancemonitor/libvirt_handler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/masakarimonitors/instancemonitor/libvirt_handler/callback.py b/masakarimonitors/instancemonitor/libvirt_handler/callback.py new file mode 100644 index 0000000..bdd9c82 --- /dev/null +++ b/masakarimonitors/instancemonitor/libvirt_handler/callback.py @@ -0,0 +1,76 @@ +# Copyright(c) 2016 Nippon Telegraph and Telephone Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as oslo_logging + +import masakarimonitors.conf +from masakarimonitors.i18n import _LI + + +LOG = oslo_logging.getLogger(__name__) +CONF = masakarimonitors.conf.CONF +VMHA = "vmha" + + +class Callback(object): + """Class of callback processing.""" + + def _post_event(self, retry_max, retry_interval, event): + + # TODO(KengoTakahara): This method will be implemented after + # fixing masakariclient.sdk. + pass + + def libvirt_event_callback(self, eventID, detail, uuID, noticeType, + hostname, currentTime): + """Callback method. + + Callback processing be executed as result of the + libvirt event filter. + + :param eventID: Event ID notify to the callback function + :param detail: Event code notify to the callback function + :param uuID: Uuid notify to the callback function + :param noticeType: Notice type notify to the callback function + :param hostname: Server host name of the source an event occur + notify to the callback function + :param currentTime: Event occurred time notify to the callback + function + """ + + # Output to the syslog. + msg = "libvirt Event: type=%s hostname=%s uuid=%s \ + time=%s eventID=%s detail=%s " % ( + noticeType, hostname, uuID, currentTime, eventID, detail) + LOG.info(_LI("%s"), msg) + + # Set the event to the dictionary. + event = { + 'notification': { + 'type': noticeType, + 'hostname': hostname, + 'generated_time': currentTime, + 'payload': { + 'event': eventID, + 'instance_uuid': uuID, + 'vir_domain_event': detail + } + } + } + + retry_max = CONF.callback.retry_max + retry_interval = float(CONF.callback.retry_interval) + + self._post_event(retry_max, retry_interval, event) + return diff --git a/masakarimonitors/instancemonitor/libvirt_handler/eventfilter.py b/masakarimonitors/instancemonitor/libvirt_handler/eventfilter.py new file mode 100644 index 0000000..3d8991b --- /dev/null +++ b/masakarimonitors/instancemonitor/libvirt_handler/eventfilter.py @@ -0,0 +1,89 @@ +# Copyright(c) 2016 Nippon Telegraph and Telephone Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import socket +import sys +import threading + +from oslo_log import log as oslo_logging +from oslo_utils import timeutils + +from masakarimonitors.instancemonitor.libvirt_handler import callback +from masakarimonitors.instancemonitor.libvirt_handler \ + import eventfilter_table as evft + + +LOG = oslo_logging.getLogger(__name__) + + +class EventFilter(object): + """Class of filtering events.""" + + def __init__(self): + self.callback = callback.Callback() + + def vir_event_filter(self, eventID, eventType, detail, uuID): + """Filter events from libvirt. + + :param eventID: EventID + :param eventType: Event type + :param detail: Event name + :pram uuID: UUID + """ + + noticeType = 'VM' + hostname = socket.gethostname() + currentTime = timeutils.utcnow() + + # All Event Output if debug mode is on. + msg = "libvirt Event Received.type = %s \ + hostname = %s uuid = %s time = %s eventID = %d eventType = %d \ + detail = %d" % ( + noticeType, + hostname, uuID, currentTime, eventID, + eventType, detail) + LOG.debug(msg) + + try: + if detail in evft.event_filter_dic[eventID][eventType]: + LOG.debug("Event Filter Matched.") + + eventID_val = evft.eventID_dic[eventID] + detail_val = evft.detail_dic[eventID][eventType][detail] + + # callback Thread Start + thread = threading.Thread( + target=self.callback.libvirt_event_callback, + args=(eventID_val, detail_val, + uuID, noticeType, + hostname, currentTime) + ) + thread.start() + else: + LOG.debug("Event Filter Unmatched.") + pass + + except KeyError: + LOG.debug("virEventFilter KeyError") + pass + except IndexError: + LOG.debug("virEventFilter IndexError") + pass + except TypeError: + LOG.debug("virEventFilter TypeError") + pass + except Exception: + LOG.debug("Unexpected error") + sys.exc_info()[0] + raise diff --git a/masakarimonitors/instancemonitor/libvirt_handler/eventfilter_table.py b/masakarimonitors/instancemonitor/libvirt_handler/eventfilter_table.py new file mode 100644 index 0000000..cfadaa0 --- /dev/null +++ b/masakarimonitors/instancemonitor/libvirt_handler/eventfilter_table.py @@ -0,0 +1,120 @@ +# Copyright(c) 2016 Nippon Telegraph and Telephone Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import libvirt + +# If is not defined internal , -1 is stored. +DUMMY = -1 + +# Enumerate all event that can get. +# Comment out events that is not targeted in the callback. +event_filter_dic = { + libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE: + { + libvirt.VIR_DOMAIN_EVENT_SUSPENDED: + ( + libvirt.VIR_DOMAIN_EVENT_SUSPENDED_IOERROR, + libvirt.VIR_DOMAIN_EVENT_SUSPENDED_WATCHDOG, + libvirt.VIR_DOMAIN_EVENT_SUSPENDED_API_ERROR + ), + libvirt.VIR_DOMAIN_EVENT_STOPPED: + ( + libvirt.VIR_DOMAIN_EVENT_STOPPED_SHUTDOWN, + libvirt.VIR_DOMAIN_EVENT_STOPPED_DESTROYED, + libvirt.VIR_DOMAIN_EVENT_STOPPED_FAILED, + ), + libvirt.VIR_DOMAIN_EVENT_SHUTDOWN: + ( + libvirt.VIR_DOMAIN_EVENT_SHUTDOWN_FINISHED, + ) + }, + libvirt.VIR_DOMAIN_EVENT_ID_REBOOT: {DUMMY: (DUMMY,)}, + libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG: + { + libvirt.VIR_DOMAIN_EVENT_WATCHDOG_NONE: (DUMMY,), + libvirt.VIR_DOMAIN_EVENT_WATCHDOG_PAUSE: (DUMMY,), + libvirt.VIR_DOMAIN_EVENT_WATCHDOG_RESET: (DUMMY,), + libvirt.VIR_DOMAIN_EVENT_WATCHDOG_POWEROFF: (DUMMY,), + libvirt.VIR_DOMAIN_EVENT_WATCHDOG_SHUTDOWN: (DUMMY,), + libvirt.VIR_DOMAIN_EVENT_WATCHDOG_DEBUG: (DUMMY,) + }, + libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR: + { + libvirt.VIR_DOMAIN_EVENT_IO_ERROR_NONE: (DUMMY,), + libvirt.VIR_DOMAIN_EVENT_IO_ERROR_PAUSE: (DUMMY,), + libvirt.VIR_DOMAIN_EVENT_IO_ERROR_REPORT: (DUMMY,) + }, + libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON: {DUMMY: (DUMMY,)}, + libvirt.VIR_DOMAIN_EVENT_ID_CONTROL_ERROR: {DUMMY: (DUMMY,)} + +} + +eventID_dic = { + libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE: 'LIFECYCLE', + libvirt.VIR_DOMAIN_EVENT_ID_REBOOT: 'REBOOT', + libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG: 'WATCHDOG', + libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR: 'IO_ERROR', + libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON: 'IO_ERROR_REASON', + libvirt.VIR_DOMAIN_EVENT_ID_CONTROL_ERROR: 'CONTROL_ERROR'} + +detail_dic = { + libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE: { + libvirt.VIR_DOMAIN_EVENT_SUSPENDED: { + libvirt.VIR_DOMAIN_EVENT_SUSPENDED_IOERROR: + 'SUSPENDED_IOERROR', + libvirt.VIR_DOMAIN_EVENT_SUSPENDED_WATCHDOG: + 'SUSPENDED_WATCHDOG', + libvirt.VIR_DOMAIN_EVENT_SUSPENDED_API_ERROR: + 'SUSPENDED_API_ERROR'}, + libvirt.VIR_DOMAIN_EVENT_STOPPED: { + libvirt.VIR_DOMAIN_EVENT_STOPPED_SHUTDOWN: + 'STOPPED_SHUTDOWN', + libvirt.VIR_DOMAIN_EVENT_STOPPED_DESTROYED: + 'STOPPED_DESTROYED', + libvirt.VIR_DOMAIN_EVENT_STOPPED_FAILED: + 'STOPPED_FAILED'}, + libvirt.VIR_DOMAIN_EVENT_SHUTDOWN: { + libvirt.VIR_DOMAIN_EVENT_SHUTDOWN_FINISHED: + 'SHUTDOWN_FINISHED'} + }, + libvirt.VIR_DOMAIN_EVENT_ID_REBOOT: { + DUMMY: { + DUMMY: 'UNKNOWN'}}, + libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG: { + libvirt.VIR_DOMAIN_EVENT_WATCHDOG_NONE: { + DUMMY: 'WATCHDOG_NONE'}, + libvirt.VIR_DOMAIN_EVENT_WATCHDOG_PAUSE: { + DUMMY: 'WATCHDOG_PAUSE'}, + libvirt.VIR_DOMAIN_EVENT_WATCHDOG_RESET: { + DUMMY: 'WATCHDOG_RESET'}, + libvirt.VIR_DOMAIN_EVENT_WATCHDOG_POWEROFF: { + DUMMY: 'WATCHDOG_POWEROFF'}, + libvirt.VIR_DOMAIN_EVENT_WATCHDOG_SHUTDOWN: { + DUMMY: 'WATCHDOG_SHUTDOWN'}, + libvirt.VIR_DOMAIN_EVENT_WATCHDOG_DEBUG: { + DUMMY: 'WATCHDOG_DEBUG'}}, + libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR: { + libvirt.VIR_DOMAIN_EVENT_IO_ERROR_NONE: { + DUMMY: 'IO_ERROR_NONE'}, + libvirt.VIR_DOMAIN_EVENT_IO_ERROR_PAUSE: { + DUMMY: 'IO_ERROR_PAUSE'}, + libvirt.VIR_DOMAIN_EVENT_IO_ERROR_REPORT: { + DUMMY: 'IO_ERROR_REPORT'}}, + libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON: { + DUMMY: { + DUMMY: 'UNKNOWN'}}, + libvirt.VIR_DOMAIN_EVENT_ID_CONTROL_ERROR: { + DUMMY: { + DUMMY: 'UNKNOWN'}} +}