Merge "Remove eventlet timer from multi_cell_list"
This commit is contained in:
@@ -13,8 +13,8 @@
|
||||
import abc
|
||||
import copy
|
||||
import heapq
|
||||
import time
|
||||
|
||||
import eventlet
|
||||
from oslo_log import log as logging
|
||||
|
||||
import nova.conf
|
||||
@@ -31,6 +31,11 @@ class RecordSortContext(object):
|
||||
def __init__(self, sort_keys, sort_dirs):
|
||||
self.sort_keys = sort_keys
|
||||
self.sort_dirs = sort_dirs
|
||||
self.start_time = time.monotonic()
|
||||
|
||||
@property
|
||||
def timeout_expired(self):
|
||||
return time.monotonic() - self.start_time > context.CELL_TIMEOUT
|
||||
|
||||
def compare_records(self, rec1, rec2):
|
||||
"""Implements cmp(rec1, rec2) for the first key that is different.
|
||||
@@ -85,34 +90,6 @@ class RecordWrapper(object):
|
||||
return r == -1
|
||||
|
||||
|
||||
def query_wrapper(ctx, fn, *args, **kwargs):
|
||||
"""This is a helper to run a query with predictable fail semantics.
|
||||
|
||||
This is a generator which will mimic the scatter_gather_cells() behavior
|
||||
by honoring a timeout and catching exceptions, yielding the usual
|
||||
sentinel objects instead of raising. It wraps these in RecordWrapper
|
||||
objects, which will prioritize them to the merge sort, causing them to
|
||||
be handled by the main get_objects_sorted() feeder loop quickly and
|
||||
gracefully.
|
||||
"""
|
||||
with eventlet.timeout.Timeout(context.CELL_TIMEOUT, exception.CellTimeout):
|
||||
try:
|
||||
for record in fn(ctx, *args, **kwargs):
|
||||
yield record
|
||||
except exception.CellTimeout:
|
||||
# Here, we yield a RecordWrapper (no sort_ctx needed since
|
||||
# we won't call into the implementation's comparison routines)
|
||||
# wrapping the sentinel indicating timeout.
|
||||
yield RecordWrapper(ctx, None, context.did_not_respond_sentinel)
|
||||
return
|
||||
except Exception as e:
|
||||
# Here, we yield a RecordWrapper (no sort_ctx needed since
|
||||
# we won't call into the implementation's comparison routines)
|
||||
# wrapping the exception object indicating failure.
|
||||
yield RecordWrapper(ctx, None, e.__class__(e.args))
|
||||
return
|
||||
|
||||
|
||||
class CrossCellLister(metaclass=abc.ABCMeta):
|
||||
"""An implementation of a cross-cell efficient lister.
|
||||
|
||||
@@ -215,6 +192,34 @@ class CrossCellLister(metaclass=abc.ABCMeta):
|
||||
"""
|
||||
pass
|
||||
|
||||
def query_wrapper(self, ctx, fn, *args, **kwargs):
|
||||
"""This is a helper to run a query with predictable fail semantics.
|
||||
|
||||
This is a generator which will mimic the scatter_gather_cells()
|
||||
behavior by honoring a timeout and catching exceptions, yielding the
|
||||
usual sentinel objects instead of raising. It wraps these in
|
||||
RecordWrapper objects, which will prioritize them to the merge sort,
|
||||
causing them to be handled by the main get_objects_sorted() feeder
|
||||
loop quickly and gracefully.
|
||||
"""
|
||||
try:
|
||||
for record in fn(ctx, *args, **kwargs):
|
||||
if self.sort_ctx.timeout_expired:
|
||||
raise exception.CellTimeout()
|
||||
yield record
|
||||
except exception.CellTimeout:
|
||||
# Here, we yield a RecordWrapper (no sort_ctx needed since
|
||||
# we won't call into the implementation's comparison routines)
|
||||
# wrapping the sentinel indicating timeout.
|
||||
yield RecordWrapper(ctx, None, context.did_not_respond_sentinel)
|
||||
return
|
||||
except Exception as e:
|
||||
# Here, we yield a RecordWrapper (no sort_ctx needed since
|
||||
# we won't call into the implementation's comparison routines)
|
||||
# wrapping the exception object indicating failure.
|
||||
yield RecordWrapper(ctx, None, e.__class__(e.args))
|
||||
return
|
||||
|
||||
def get_records_sorted(self, ctx, filters, limit, marker, **kwargs):
|
||||
"""Get a cross-cell list of records matching filters.
|
||||
|
||||
@@ -387,10 +392,12 @@ class CrossCellLister(metaclass=abc.ABCMeta):
|
||||
if self.cells:
|
||||
results = context.scatter_gather_cells(ctx, self.cells,
|
||||
context.CELL_TIMEOUT,
|
||||
query_wrapper, do_query)
|
||||
self.query_wrapper,
|
||||
do_query)
|
||||
else:
|
||||
results = context.scatter_gather_all_cells(ctx,
|
||||
query_wrapper, do_query)
|
||||
self.query_wrapper,
|
||||
do_query)
|
||||
|
||||
# If a limit was provided, it was passed to the per-cell query
|
||||
# routines. That means we have NUM_CELLS * limit items across
|
||||
|
2
nova/tests/fixtures/nova.py
vendored
2
nova/tests/fixtures/nova.py
vendored
@@ -1592,7 +1592,7 @@ class DownCellFixture(fixtures.Fixture):
|
||||
ctxt.cell_uuid = cell_uuid
|
||||
return multi_cell_list.RecordWrapper(ctxt, sort_ctx, thing)
|
||||
|
||||
if fn is multi_cell_list.query_wrapper:
|
||||
if fn.__func__ is multi_cell_list.CrossCellLister.query_wrapper:
|
||||
# If the function called through scatter-gather utility is the
|
||||
# multi_cell_list.query_wrapper, we should wrap the exception
|
||||
# object into the multi_cell_list.RecordWrapper. This is
|
||||
|
@@ -145,27 +145,45 @@ class TestUtils(test.NoDBTestCase):
|
||||
for thing in data:
|
||||
yield thing
|
||||
|
||||
lister = TestLister([], [], [])
|
||||
self.assertEqual([1, 2, 3],
|
||||
list(multi_cell_list.query_wrapper(
|
||||
list(lister.query_wrapper(
|
||||
None, test, [1, 2, 3])))
|
||||
|
||||
def test_query_wrapper_timeout(self):
|
||||
@mock.patch('time.monotonic')
|
||||
def test_query_wrapper_timeout(self, mock_monotonic):
|
||||
# This fake query function will return more data that we have "time"
|
||||
# to process
|
||||
def test(ctx):
|
||||
raise exception.CellTimeout
|
||||
return [multi_cell_list.RecordWrapper(ctx, None, 1)] * 10
|
||||
|
||||
self.assertEqual([context.did_not_respond_sentinel],
|
||||
[x._db_record for x in
|
||||
multi_cell_list.query_wrapper(
|
||||
mock.MagicMock(), test)])
|
||||
# Return 0 for the start time, 1s later for the first query,
|
||||
# then 1000s (more than the timeout) for the subsequent queries.
|
||||
times = [0, 1]
|
||||
|
||||
def fake_monotonic():
|
||||
try:
|
||||
return times.pop(0)
|
||||
except IndexError:
|
||||
return 1000
|
||||
|
||||
mock_monotonic.side_effect = fake_monotonic
|
||||
|
||||
# Do the query and expect to get one result before the timeout
|
||||
lister = TestLister([], [], [])
|
||||
result = lister.query_wrapper(mock.MagicMock(), test)
|
||||
self.assertEqual([1, context.did_not_respond_sentinel],
|
||||
[x._db_record for x in result])
|
||||
|
||||
def test_query_wrapper_fail(self):
|
||||
def tester(ctx):
|
||||
raise test.TestingException
|
||||
|
||||
lister = TestLister([], [], [])
|
||||
self.assertIsInstance(
|
||||
# query_wrapper is a generator so we convert to a list and
|
||||
# check the type on the first and only result
|
||||
[x._db_record for x in multi_cell_list.query_wrapper(
|
||||
[x._db_record for x in lister.query_wrapper(
|
||||
mock.MagicMock(), tester)][0],
|
||||
test.TestingException)
|
||||
|
||||
|
Reference in New Issue
Block a user