sweep the tree to change from gevent to ryu.lib.hub

mostly mechanical changes.
also, change the requirement from gevent to eventlet.

Signed-off-by: YAMAMOTO Takashi <yamamoto@valinux.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
YAMAMOTO Takashi
2013-04-25 16:05:50 +09:00
committed by FUJITA Tomonori
parent a01972e18c
commit 9e6d3053c0
17 changed files with 77 additions and 84 deletions

View File

@@ -16,9 +16,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import gevent from ryu.lib import hub
from gevent import monkey hub.patch()
monkey.patch_all()
# TODO: # TODO:
# Right now, we have our own patched copy of ovs python bindings # Right now, we have our own patched copy of ovs python bindings
@@ -69,16 +68,16 @@ def main():
services = [] services = []
ctlr = controller.OpenFlowController() ctlr = controller.OpenFlowController()
thr = gevent.spawn_later(0, ctlr) thr = hub.spawn(ctlr)
services.append(thr) services.append(thr)
webapp = wsgi.start_service(app_mgr) webapp = wsgi.start_service(app_mgr)
if webapp: if webapp:
thr = gevent.spawn_later(0, webapp) thr = hub.spawn(webapp)
services.append(thr) services.append(thr)
try: try:
gevent.joinall(services) hub.joinall(services)
finally: finally:
app_mgr.close() app_mgr.close()

View File

@@ -36,10 +36,6 @@ from ryu.lib.ovs import bridge
from ryu.lib.quantum_ifaces import QuantumIfaces from ryu.lib.quantum_ifaces import QuantumIfaces
from gevent import monkey
monkey.patch_all()
CONF = cfg.CONF CONF = cfg.CONF

View File

@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import gevent
import logging import logging
from ryu.app import (conf_switch_key, from ryu.app import (conf_switch_key,
@@ -27,6 +26,7 @@ from ryu.controller import (conf_switch,
tunnels) tunnels)
import ryu.exception as ryu_exc import ryu.exception as ryu_exc
from ryu.lib import dpid as dpid_lib from ryu.lib import dpid as dpid_lib
from ryu.lib import hub
from ryu.lib.ovs import bridge from ryu.lib.ovs import bridge
from ryu.ofproto import nx_match from ryu.ofproto import nx_match
@@ -131,7 +131,7 @@ class SimpleVLAN(app_manager.RyuApp):
port_name = port.name.rstrip('\x00') port_name = port.name.rstrip('\x00')
try: try:
ovs_br.set_db_attribute("Port", port_name, "tag", tunnel_key) ovs_br.set_db_attribute("Port", port_name, "tag", tunnel_key)
except gevent.Timeout: except hub.Timeout:
self.logger.error('timeout') self.logger.error('timeout')
return return

View File

@@ -15,7 +15,6 @@
# limitations under the License. # limitations under the License.
import collections import collections
import gevent
from oslo.config import cfg from oslo.config import cfg
import logging import logging
import netaddr import netaddr
@@ -29,6 +28,7 @@ from ryu.controller import (conf_switch,
network, network,
tunnels) tunnels)
from ryu.lib import dpid as dpid_lib from ryu.lib import dpid as dpid_lib
from ryu.lib import hub
from ryu.lib.ovs import bridge as ovs_bridge from ryu.lib.ovs import bridge as ovs_bridge
@@ -120,8 +120,8 @@ class TunnelDP(object):
self.conf_switch = conf_switch_ self.conf_switch = conf_switch_
self.inited = False self.inited = False
self.req_q = gevent.queue.Queue() self.req_q = hub.Queue()
self.thr = gevent.spawn_later(0, self._serve_loop) self.thr = hub.spawn(self._serve_loop)
def _init(self): def _init(self):
self.ovs_bridge.init() self.ovs_bridge.init()
@@ -289,7 +289,7 @@ class TunnelDP(object):
if not self.inited: if not self.inited:
try: try:
self._init() self._init()
except gevent.timeout.Timeout: except hub.Timeout:
self.logger.warn('_init timeouted') self.logger.warn('_init timeouted')
req = None req = None
@@ -315,7 +315,7 @@ class TunnelDP(object):
self._del_tunnel_port_ip(req.remote_ip) self._del_tunnel_port_ip(req.remote_ip)
else: else:
self.logger.error('unknown request %s', req) self.logger.error('unknown request %s', req)
except gevent.timeout.Timeout: except hub.Timeout:
# timeout. try again # timeout. try again
self.logger.warn('timeout try again') self.logger.warn('timeout try again')
continue continue

View File

@@ -17,7 +17,7 @@
from oslo.config import cfg from oslo.config import cfg
import webob.dec import webob.dec
from gevent import pywsgi from ryu.lib import hub
from routes import Mapper from routes import Mapper
from routes.util import URLGenerator from routes.util import URLGenerator
@@ -80,7 +80,7 @@ class WSGIApplication(object):
return controller(req) return controller(req)
class WSGIServer(pywsgi.WSGIServer): class WSGIServer(hub.WSGIServer):
def __init__(self, application, **config): def __init__(self, application, **config):
super(WSGIServer, self).__init__((CONF.wsapi_host, CONF.wsapi_port), super(WSGIServer, self).__init__((CONF.wsapi_host, CONF.wsapi_port),
application, **config) application, **config)

View File

@@ -17,14 +17,12 @@
import inspect import inspect
import itertools import itertools
import logging import logging
import gevent
from gevent.queue import Queue
from ryu import utils from ryu import utils
from ryu.controller.handler import register_instance from ryu.controller.handler import register_instance
from ryu.controller.controller import Datapath from ryu.controller.controller import Datapath
from ryu.controller.event import EventRequestBase, EventReplyBase from ryu.controller.event import EventRequestBase, EventReplyBase
from ryu.lib import hub
LOG = logging.getLogger('ryu.base.app_manager') LOG = logging.getLogger('ryu.base.app_manager')
@@ -62,10 +60,10 @@ class RyuApp(object):
self.event_handlers = {} self.event_handlers = {}
self.observers = {} self.observers = {}
self.threads = [] self.threads = []
self.events = Queue() self.events = hub.Queue()
self.replies = Queue() self.replies = hub.Queue()
self.logger = logging.getLogger(self.name) self.logger = logging.getLogger(self.name)
self.threads.append(gevent.spawn(self._event_loop)) self.threads.append(hub.spawn(self._event_loop))
def register_handler(self, ev_cls, handler): def register_handler(self, ev_cls, handler):
assert callable(handler) assert callable(handler)

View File

@@ -17,13 +17,12 @@
import contextlib import contextlib
from oslo.config import cfg from oslo.config import cfg
import logging import logging
import gevent from ryu.lib import hub
from ryu.lib.hub import StreamServer
import traceback import traceback
import random import random
import greenlet import greenlet
import ssl import ssl
from gevent.server import StreamServer
from gevent.queue import Queue
import ryu.base.app_manager import ryu.base.app_manager
@@ -124,7 +123,7 @@ class Datapath(object):
# The limit is arbitrary. We need to limit queue size to # The limit is arbitrary. We need to limit queue size to
# prevent it from eating memory up # prevent it from eating memory up
self.send_q = Queue(16) self.send_q = hub.Queue(16)
self.set_version(max(self.supported_ofp_version)) self.set_version(max(self.supported_ofp_version))
self.xid = random.randint(0, self.ofproto.MAX_XID) self.xid = random.randint(0, self.ofproto.MAX_XID)
@@ -188,7 +187,7 @@ class Datapath(object):
count += 1 count += 1
if count > 2048: if count > 2048:
count = 0 count = 0
gevent.sleep(0) hub.sleep(0)
@_deactivate @_deactivate
def _send_loop(self): def _send_loop(self):
@@ -218,7 +217,7 @@ class Datapath(object):
self.send(msg.buf) self.send(msg.buf)
def serve(self): def serve(self):
send_thr = gevent.spawn(self._send_loop) send_thr = hub.spawn(self._send_loop)
# send hello message immediately # send hello message immediately
hello = self.ofproto_parser.OFPHello(self) hello = self.ofproto_parser.OFPHello(self)
@@ -227,8 +226,8 @@ class Datapath(object):
try: try:
self._recv_loop() self._recv_loop()
finally: finally:
gevent.kill(send_thr) hub.kill(send_thr)
gevent.joinall([send_thr]) hub.joinall([send_thr])
# #
# Utility methods for convenience # Utility methods for convenience

View File

@@ -114,5 +114,9 @@ if HUB_TYPE == 'eventlet':
def wait(self, timeout=None): def wait(self, timeout=None):
if timeout is None: if timeout is None:
self._wait() self._wait()
with Timeout(timeout): else:
self._wait() try:
with Timeout(timeout):
self._wait()
except Timeout:
pass

View File

@@ -16,9 +16,9 @@
import struct import struct
import socket import socket
import logging import logging
import gevent
from ryu.ofproto import ofproto_v1_0 from ryu.ofproto import ofproto_v1_0
from ryu.lib import hub
from ryu.lib.mac import haddr_to_bin, haddr_to_str from ryu.lib.mac import haddr_to_bin, haddr_to_str
@@ -195,13 +195,13 @@ def nw_dst_to_str(wildcards, addr):
def send_stats_request(dp, stats, waiters, msgs): def send_stats_request(dp, stats, waiters, msgs):
dp.set_xid(stats) dp.set_xid(stats)
waiters_per_dp = waiters.setdefault(dp.id, {}) waiters_per_dp = waiters.setdefault(dp.id, {})
lock = gevent.event.AsyncResult() lock = hub.Event()
waiters_per_dp[stats.xid] = (lock, msgs) waiters_per_dp[stats.xid] = (lock, msgs)
dp.send_msg(stats) dp.send_msg(stats)
try: try:
lock.get(timeout=DEFAULT_TIMEOUT) lock.wait(timeout=DEFAULT_TIMEOUT)
except gevent.Timeout: except hub.Timeout:
del waiters_per_dp[stats.xid] del waiters_per_dp[stats.xid]

View File

@@ -16,11 +16,11 @@
import struct import struct
import socket import socket
import logging import logging
import gevent
from ryu.ofproto import inet from ryu.ofproto import inet
from ryu.ofproto import ofproto_v1_2 from ryu.ofproto import ofproto_v1_2
from ryu.ofproto import ofproto_v1_2_parser from ryu.ofproto import ofproto_v1_2_parser
from ryu.lib import hub
from ryu.lib import mac from ryu.lib import mac
@@ -187,13 +187,13 @@ def match_ip_to_str(value, mask):
def send_stats_request(dp, stats, waiters, msgs): def send_stats_request(dp, stats, waiters, msgs):
dp.set_xid(stats) dp.set_xid(stats)
waiters_per_dp = waiters.setdefault(dp.id, {}) waiters_per_dp = waiters.setdefault(dp.id, {})
lock = gevent.event.AsyncResult() lock = hub.Event()
waiters_per_dp[stats.xid] = (lock, msgs) waiters_per_dp[stats.xid] = (lock, msgs)
dp.send_msg(stats) dp.send_msg(stats)
try: try:
lock.get(timeout=DEFAULT_TIMEOUT) lock.wait(timeout=DEFAULT_TIMEOUT)
except gevent.Timeout: except hub.Timeout:
del waiters_per_dp[stats.xid] del waiters_per_dp[stats.xid]

View File

@@ -15,7 +15,6 @@
# limitations under the License. # limitations under the License.
import gevent
import itertools import itertools
import logging import logging
import operator import operator
@@ -31,6 +30,7 @@ from ovs import (jsonrpc,
stream) stream)
from ovs.db import idl from ovs.db import idl
from ryu.lib import hub
from ryu.lib.ovs import vswitch_idl from ryu.lib.ovs import vswitch_idl
LOG = logging.getLogger(__name__) # use ovs.vlog? LOG = logging.getLogger(__name__) # use ovs.vlog?
@@ -1002,7 +1002,7 @@ class VSCtl(object):
if timeout_sec is None: if timeout_sec is None:
self._run_command(commands) self._run_command(commands)
else: else:
with gevent.Timeout(timeout_sec, exception): with hub.Timeout(timeout_sec, exception):
self._run_command(commands) self._run_command(commands)
# commands # commands

View File

@@ -110,7 +110,6 @@ Here is my sys.config used for this test.
""" """
import gevent
import traceback import traceback
import lxml.etree import lxml.etree
@@ -118,6 +117,7 @@ import ncclient
from ryu.base import app_manager from ryu.base import app_manager
from ryu.lib.netconf import constants as nc_consts from ryu.lib.netconf import constants as nc_consts
from ryu.lib import hub
from ryu.lib import of_config from ryu.lib import of_config
from ryu.lib.of_config import capable_switch from ryu.lib.of_config import capable_switch
from ryu.lib.of_config import constants as ofc_consts from ryu.lib.of_config import constants as ofc_consts
@@ -218,7 +218,7 @@ class OFConfigClient(app_manager.RyuApp):
self.switch = capable_switch.OFCapableSwitch( self.switch = capable_switch.OFCapableSwitch(
host=HOST, port=PORT, username=USERNAME, password=PASSWORD, host=HOST, port=PORT, username=USERNAME, password=PASSWORD,
unknown_host_cb=lambda host, fingeprint: True) unknown_host_cb=lambda host, fingeprint: True)
gevent.spawn(self._do_of_config) hub.spawn(self._do_of_config)
def _validate(self, tree): def _validate(self, tree):
xmlschema = _get_schema() xmlschema = _get_schema()

View File

@@ -79,13 +79,11 @@ class Test_hub(unittest.TestCase):
ev = hub.Event() ev = hub.Event()
result = [] result = []
with hub.Timeout(2): with hub.Timeout(2):
hub.spawn(_child, ev, result) t = hub.spawn(_child, ev, result)
try: ev.wait(timeout=0.5)
ev.wait(timeout=0.5)
raise BaseException("should timed out")
except hub.Timeout:
pass
assert len(result) == 0 assert len(result) == 0
ev.wait()
assert len(result) == 1
def test_spawn_event3(self): def test_spawn_event3(self):
def _child(ev, ev2, result): def _child(ev, ev2, result):

View File

@@ -15,12 +15,11 @@
import logging import logging
import gevent
import gevent.queue
import time import time
from ryu.base import app_manager from ryu.base import app_manager
from ryu.controller import handler from ryu.controller import handler
from ryu.lib import hub
from ryu.topology import event from ryu.topology import event
from ryu.topology import switches from ryu.topology import switches
@@ -39,14 +38,14 @@ class DiscoveryEventDumper(app_manager.RyuApp):
# For testing when sync and async request. # For testing when sync and async request.
# self.threads.append( # self.threads.append(
# gevent.spawn_later(0, self._switch_request_sync, 5)) # hub.spawn(self._switch_request_sync, 5))
# self.threads.append( # self.threads.append(
# gevent.spawn_later(0, self._switch_request_async, 10)) # hub.spawn(self._switch_request_async, 10))
# #
# self.threads.append( # self.threads.append(
# gevent.spawn_later(0, self._link_request_sync, 5)) # hub.spawn(self._link_request_sync, 5))
# self.threads.append( # self.threads.append(
# gevent.spawn_later(0, self._link_request_async, 10)) # hub.spawn(self._link_request_async, 10))
self.is_active = True self.is_active = True
@@ -82,19 +81,19 @@ class DiscoveryEventDumper(app_manager.RyuApp):
while self.is_active: while self.is_active:
request = event.EventSwitchRequest() request = event.EventSwitchRequest()
LOG.debug('switch_request sync %s thread(%s)', LOG.debug('switch_request sync %s thread(%s)',
request, id(gevent.getcurrent())) request, id(hub.getcurrent()))
reply = self.send_request(request) reply = self.send_request(request)
LOG.debug('switch_reply sync %s', reply) LOG.debug('switch_reply sync %s', reply)
if len(reply.switches) > 0: if len(reply.switches) > 0:
for sw in reply.switches: for sw in reply.switches:
LOG.debug(' %s', sw) LOG.debug(' %s', sw)
gevent.sleep(interval) hub.sleep(interval)
def _switch_request_async(self, interval): def _switch_request_async(self, interval):
while self.is_active: while self.is_active:
request = event.EventSwitchRequest() request = event.EventSwitchRequest()
LOG.debug('switch_request async %s thread(%s)', LOG.debug('switch_request async %s thread(%s)',
request, id(gevent.getcurrent())) request, id(hub.getcurrent()))
self.send_event(request.dst, request) self.send_event(request.dst, request)
start = time.time() start = time.time()
@@ -104,16 +103,16 @@ class DiscoveryEventDumper(app_manager.RyuApp):
if time.time() > start + i: if time.time() > start + i:
i += 1 i += 1
LOG.debug(' thread is busy... %s/%s thread(%s)', LOG.debug(' thread is busy... %s/%s thread(%s)',
i, busy, id(gevent.getcurrent())) i, busy, id(hub.getcurrent()))
LOG.debug(' thread yield to switch_reply handler. thread(%s)', LOG.debug(' thread yield to switch_reply handler. thread(%s)',
id(gevent.getcurrent())) id(hub.getcurrent()))
# yield # yield
gevent.sleep(0) hub.sleep(0)
LOG.debug(' thread get back. thread(%s)', LOG.debug(' thread get back. thread(%s)',
id(gevent.getcurrent())) id(hub.getcurrent()))
gevent.sleep(interval - busy) hub.sleep(interval - busy)
@handler.set_ev_cls(event.EventSwitchReply) @handler.set_ev_cls(event.EventSwitchReply)
def switch_reply_handler(self, reply): def switch_reply_handler(self, reply):
@@ -126,19 +125,19 @@ class DiscoveryEventDumper(app_manager.RyuApp):
while self.is_active: while self.is_active:
request = event.EventLinkRequest() request = event.EventLinkRequest()
LOG.debug('link_request sync %s thread(%s)', LOG.debug('link_request sync %s thread(%s)',
request, id(gevent.getcurrent())) request, id(hub.getcurrent()))
reply = self.send_request(request) reply = self.send_request(request)
LOG.debug('link_reply sync %s', reply) LOG.debug('link_reply sync %s', reply)
if len(reply.links) > 0: if len(reply.links) > 0:
for link in reply.links: for link in reply.links:
LOG.debug(' %s', link) LOG.debug(' %s', link)
gevent.sleep(interval) hub.sleep(interval)
def _link_request_async(self, interval): def _link_request_async(self, interval):
while self.is_active: while self.is_active:
request = event.EventLinkRequest() request = event.EventLinkRequest()
LOG.debug('link_request async %s thread(%s)', LOG.debug('link_request async %s thread(%s)',
request, id(gevent.getcurrent())) request, id(hub.getcurrent()))
self.send_event(request.dst, request) self.send_event(request.dst, request)
start = time.time() start = time.time()
@@ -148,16 +147,16 @@ class DiscoveryEventDumper(app_manager.RyuApp):
if time.time() > start + i: if time.time() > start + i:
i += 1 i += 1
LOG.debug(' thread is busy... %s/%s thread(%s)', LOG.debug(' thread is busy... %s/%s thread(%s)',
i, busy, id(gevent.getcurrent())) i, busy, id(hub.getcurrent()))
LOG.debug(' thread yield to link_reply handler. thread(%s)', LOG.debug(' thread yield to link_reply handler. thread(%s)',
id(gevent.getcurrent())) id(hub.getcurrent()))
# yield # yield
gevent.sleep(0) hub.sleep(0)
LOG.debug(' thread get back. thread(%s)', LOG.debug(' thread get back. thread(%s)',
id(gevent.getcurrent())) id(hub.getcurrent()))
gevent.sleep(interval - busy) hub.sleep(interval - busy)
@handler.set_ev_cls(event.EventLinkReply) @handler.set_ev_cls(event.EventLinkReply)
def link_reply_handler(self, reply): def link_reply_handler(self, reply):

View File

@@ -14,7 +14,6 @@
# limitations under the License. # limitations under the License.
import logging import logging
import gevent
import struct import struct
import time import time
import json import json
@@ -26,6 +25,7 @@ from ryu.controller import ofp_event
from ryu.controller.handler import set_ev_cls from ryu.controller.handler import set_ev_cls
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.exception import RyuException from ryu.exception import RyuException
from ryu.lib import hub
from ryu.lib.mac import DONTCARE, haddr_to_str from ryu.lib.mac import DONTCARE, haddr_to_str
from ryu.lib.dpid import dpid_to_str, str_to_dpid from ryu.lib.dpid import dpid_to_str, str_to_dpid
from ryu.lib.port_no import port_no_to_str from ryu.lib.port_no import port_no_to_str
@@ -453,17 +453,17 @@ class Switches(app_manager.RyuApp):
if self.link_discovery: if self.link_discovery:
self.install_flow = CONF.install_lldp_flow self.install_flow = CONF.install_lldp_flow
self.explicit_drop = CONF.explicit_drop self.explicit_drop = CONF.explicit_drop
self.lldp_event = gevent.event.Event() self.lldp_event = hub.Event()
self.link_event = gevent.event.Event() self.link_event = hub.Event()
self.threads.append(gevent.spawn_later(0, self.lldp_loop)) self.threads.append(hub.spawn(self.lldp_loop))
self.threads.append(gevent.spawn_later(0, self.link_loop)) self.threads.append(hub.spawn(self.link_loop))
def close(self): def close(self):
self.is_active = False self.is_active = False
if self.link_discovery: if self.link_discovery:
self.lldp_event.set() self.lldp_event.set()
self.link_event.set() self.link_event.set()
gevent.joinall(self.threads) hub.joinall(self.threads)
def _register(self, dp): def _register(self, dp):
assert dp.id is not None assert dp.id is not None
@@ -745,7 +745,7 @@ class Switches(app_manager.RyuApp):
self.send_lldp_packet(port) self.send_lldp_packet(port)
for port in ports: for port in ports:
self.send_lldp_packet(port) self.send_lldp_packet(port)
gevent.sleep(self.LLDP_SEND_GUARD) # don't burst hub.sleep(self.LLDP_SEND_GUARD) # don't burst
if timeout is not None and ports: if timeout is not None and ports:
timeout = 0 # We have already slept timeout = 0 # We have already slept

View File

@@ -6,7 +6,7 @@ source-dir = doc/source
[bdist_rpm] [bdist_rpm]
Release = 1 Release = 1
Group = Applications/Accessories Group = Applications/Accessories
Requires = python-gevent >= 0.13, python-routes, python-webob, python-paramiko Requires = python-eventlet, python-routes, python-webob, python-paramiko
doc_files = LICENSE doc_files = LICENSE
MANIFEST.in MANIFEST.in
README.rst README.rst

View File

@@ -1,4 +1,4 @@
gevent>=0.13 eventlet
routes routes
webob>=1.0.8 webob>=1.0.8
paramiko paramiko