- factor out the guts of Dogpile into a new object, Lock, which simplifies the state model and is a per-use object only
- instead of checking self.is_expired inside the "acquired createlock" region,
call value_and_created_fn() to coordinate with the state of other processes;
this doesn't occur in single-process Dogpile usage because of the NameRegistry,
but we can't rely upon that across processes. This is the start of the fix
for #1; however, some dogpile tests are now acting strangely.
This commit is contained in:
Mike Bayer
2012-10-29 14:13:59 -04:00
parent 268f3a028a
commit 5db7cd3bb5
5 changed files with 394 additions and 122 deletions

View File

@@ -9,3 +9,4 @@ syntax:regexp
\.coverage
\.DS_Store
test.cfg
^.venv

View File

@@ -1,8 +1,8 @@
from .dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException
from .dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException, Lock
from .nameregistry import NameRegistry
from .readwrite_lock import ReadWriteMutex
__all__ = 'Dogpile', 'SyncReaderDogpile', 'NeedRegenerationException', 'NameRegistry', 'ReadWriteMutex'
__all__ = 'Dogpile', 'SyncReaderDogpile', 'NeedRegenerationException', 'NameRegistry', 'ReadWriteMutex', 'Lock'
__version__ = '0.3.3'

View File

@@ -5,23 +5,117 @@ from .readwrite_lock import ReadWriteMutex
log = logging.getLogger(__name__)
class NeedRegenerationException(Exception):
"""An exception that when raised in the 'with' block,
forces the 'has_value' flag to False and incurs a
"""An exception that when raised in the 'with' block,
forces the 'has_value' flag to False and incurs a
regeneration of the value.
"""
NOT_REGENERATED = object()
class Lock(object):
    """A "dogpile" lock for a single use.

    Coordinates one "value creation" operation among many
    threads/processes: exactly one participant is elected to invoke
    ``creator`` while the others continue to return the previous
    version of the value, if one is available.  Unlike the
    :class:`.Dogpile` object it was factored out of, a :class:`.Lock`
    keeps no state of its own between uses; all state is derived from
    ``value_and_created_fn`` each time.

    :param mutex: a mutex object providing ``acquire()`` (optionally
     with a blocking flag) and ``release()`` methods.
    :param creator: callable which produces a new
     ``(value, createdtime)`` tuple when regeneration is required.
    :param value_and_created_fn: callable returning the current
     ``(value, createdtime)`` tuple from the datasource, or raising
     :class:`.NeedRegenerationException` if no value is available.
    :param expiretime: expiration time in seconds; ``None`` means the
     value never expires.

    Used as a context manager; ``__enter__`` returns the current or
    newly created value.
    """

    def __init__(self,
                    mutex,
                    creator,
                    value_and_created_fn,
                    expiretime
                ):
        self.mutex = mutex
        self.creator = creator
        self.value_and_created_fn = value_and_created_fn
        self.expiretime = expiretime

    def _is_expired(self, createdtime):
        """Return true if the expiration time is reached, or no
        value is available."""

        return not self._has_value(createdtime) or \
            (
                self.expiretime is not None and
                time.time() - createdtime > self.expiretime
            )

    def _has_value(self, createdtime):
        """Return true if the creation function has proceeded
        at least once."""
        # createdtime of -1 (or any non-positive epoch) is the
        # sentinel for "never generated".
        return createdtime > 0

    def _enter(self):
        """Acquire the current value, regenerating it if needed.

        Returns either the existing value from
        ``value_and_created_fn`` or a freshly generated one.
        """
        value_fn = self.value_and_created_fn

        try:
            value = value_fn()
            value, createdtime = value
        except NeedRegenerationException:
            log.debug("NeedRegenerationException")
            value = NOT_REGENERATED
            createdtime = -1

        generated = self._enter_create(createdtime)

        if generated is not NOT_REGENERATED:
            # this thread created the value; unpack and return it.
            generated, createdtime = generated
            return generated
        elif value is NOT_REGENERATED:
            # we had no value and did not create one ourselves; a
            # concurrent thread must have just created it, so fetch it.
            try:
                value, createdtime = value_fn()
                return value
            except NeedRegenerationException:
                raise Exception("Generation function should "
                        "have just been called by a concurrent "
                        "thread.")
        else:
            return value

    def _enter_create(self, createdtime):
        """Elect this thread to regenerate the value, if appropriate.

        Returns the new ``(value, createdtime)`` tuple, or
        ``NOT_REGENERATED`` if no regeneration was needed or another
        participant is already performing it.
        """
        if not self._is_expired(createdtime):
            return NOT_REGENERATED

        if self._has_value(createdtime):
            # a stale value exists; regenerate only if the lock is
            # free, otherwise let callers keep using the stale value.
            if not self.mutex.acquire(False):
                log.debug("creation function in progress "
                            "elsewhere, returning")
                return NOT_REGENERATED
        else:
            # no value at all; everyone must wait for the create lock.
            log.debug("no value, waiting for create lock")
            self.mutex.acquire()

        try:
            # lazy %-args: only formatted if debug logging is enabled
            log.debug("value creation lock %r acquired", self.mutex)

            # see if someone created the value already; consult the
            # datasource (not local state) to coordinate with other
            # processes as well as threads.
            try:
                value, createdtime = self.value_and_created_fn()
            except NeedRegenerationException:
                pass
            else:
                if not self._is_expired(createdtime):
                    log.debug("value already present")
                    return value, createdtime

            log.debug("Calling creation function")
            created = self.creator()
            return created
        finally:
            self.mutex.release()
            log.debug("Released creation lock")

    def __enter__(self):
        return self._enter()

    def __exit__(self, type, value, traceback):
        pass
class Dogpile(object):
"""Dogpile lock class.
Provides an interface around an arbitrary mutex
that allows one thread/process to be elected as
the creator of a new value, while other threads/processes
continue to return the previous version
Provides an interface around an arbitrary mutex
that allows one thread/process to be elected as
the creator of a new value, while other threads/processes
continue to return the previous version
of that value.
:param expiretime: Expiration time in seconds. Set to
@@ -30,7 +124,7 @@ class Dogpile(object):
current time.
:param lock: a mutex object that provides
``acquire()`` and ``release()`` methods.
"""
def __init__(self, expiretime, init=False, lock=None):
"""Construct a new :class:`.Dogpile`.
@@ -51,130 +145,73 @@ class Dogpile(object):
If the value here is -1, it is assumed the value
should recreate immediately.
"""
def acquire(self, creator,
value_fn=None,
def acquire(self, creator,
value_fn=None,
value_and_created_fn=None):
"""Acquire the lock, returning a context manager.
:param creator: Creation function, used if this thread
is chosen to create a new value.
:param value_fn: Optional function that returns
the value from some datasource. Will be returned
if regeneration is not needed.
:param value_and_created_fn: Like value_fn, but returns a tuple
of (value, createdtime). The returned createdtime
of (value, createdtime). The returned createdtime
will replace the "createdtime" value on this dogpile
lock. This option removes the need for the dogpile lock
itself to remain persistent across usages; another
itself to remain persistent across usages; another
dogpile can come along later and pick up where the
previous one left off.
previous one left off.
"""
dogpile = self
class Lock(object):
def __enter__(self):
return dogpile._enter(creator, value_fn,
value_and_created_fn)
if value_and_created_fn is None:
if value_fn is None:
def value_and_created_fn():
return None, self.createdtime
else:
def value_and_created_fn():
return value_fn(), self.createdtime
def __exit__(self, type, value, traceback):
dogpile._exit()
return Lock()
def creator_wrapper():
return creator(), time.time()
else:
creator_wrapper = creator
return Lock(
self.dogpilelock,
creator_wrapper,
value_and_created_fn,
self.expiretime
)
@property
def is_expired(self):
"""Return true if the expiration time is reached, or no
"""Return true if the expiration time is reached, or no
value is available."""
return not self.has_value or \
(
self.expiretime is not None and
self.expiretime is not None and
time.time() - self.createdtime > self.expiretime
)
@property
def has_value(self):
"""Return true if the creation function has proceeded
"""Return true if the creation function has proceeded
at least once."""
return self.createdtime > 0
def _enter(self, creator, value_fn=None, value_and_created_fn=None):
if value_and_created_fn:
value_fn = value_and_created_fn
if not value_fn:
return self._enter_create(creator)
try:
value = value_fn()
if value_and_created_fn:
value, self.createdtime = value
except NeedRegenerationException:
log.debug("NeedRegenerationException")
self.createdtime = -1
value = NOT_REGENERATED
generated = self._enter_create(creator)
if generated is not NOT_REGENERATED:
if value_and_created_fn:
generated, self.createdtime = generated
return generated
elif value is NOT_REGENERATED:
try:
if value_and_created_fn:
value, self.createdtime = value_fn()
else:
value = value_fn()
return value
except NeedRegenerationException:
raise Exception("Generation function should "
"have just been called by a concurrent "
"thread.")
else:
return value
def _enter_create(self, creator):
if not self.is_expired:
return NOT_REGENERATED
if self.has_value:
if not self.dogpilelock.acquire(False):
log.debug("creation function in progress "
"elsewhere, returning")
return NOT_REGENERATED
else:
log.debug("no value, waiting for create lock")
self.dogpilelock.acquire()
try:
log.debug("value creation lock %r acquired" % self.dogpilelock)
# see if someone created the value already
if not self.is_expired:
log.debug("value already present")
return NOT_REGENERATED
log.debug("Calling creation function")
created = creator()
self.createdtime = time.time()
return created
finally:
self.dogpilelock.release()
log.debug("Released creation lock")
def _exit(self):
pass
class SyncReaderDogpile(Dogpile):
"""Provide a read-write lock function on top of the :class:`.Dogpile`
class.
"""
def __init__(self, *args, **kw):
super(SyncReaderDogpile, self).__init__(*args, **kw)
@@ -182,10 +219,10 @@ class SyncReaderDogpile(Dogpile):
def acquire_write_lock(self):
"""Return the "write" lock context manager.
This will provide a section that is mutexed against
all readers/writers for the dogpile-maintained value.
"""
dogpile = self

View File

@@ -93,11 +93,11 @@ class ConcurrencyTest(TestCase):
log.error("Assertion failed: " + msg, *args)
assert False, msg % args
def _test_multi(self, num_threads,
expiretime,
def _test_multi(self, num_threads,
expiretime,
creation_time,
num_usages,
usage_time,
num_usages,
usage_time,
delay_time,
cache_expire_time=None,
slow_write_time=None,
@@ -113,8 +113,8 @@ class ConcurrencyTest(TestCase):
use_registry = inline_create == 'get_value_plus_created'
if use_registry:
reg = NameRegistry(dogpile_cls)
get_dogpile = lambda: reg.get(expiretime)
reg = NameRegistry(lambda key, exptime: dogpile_cls(exptime))
get_dogpile = lambda: reg.get("somekey", expiretime)
else:
dogpile = dogpile_cls(expiretime)
get_dogpile = lambda: dogpile
@@ -139,7 +139,7 @@ class ConcurrencyTest(TestCase):
if slow_write_time:
effective_creation_time += slow_write_time
max_stale = (effective_expiretime + effective_creation_time +
max_stale = (effective_expiretime + effective_creation_time +
usage_time + delay_time) * 1.1
the_resource = []
@@ -177,8 +177,8 @@ class ConcurrencyTest(TestCase):
if not the_resource:
raise NeedRegenerationException()
if time.time() - the_resource[-1] > cache_expire_time:
# should never hit a cache invalidation
# if we've set expiretime below the cache
# should never hit a cache invalidation
# if we've set expiretime below the cache
# expire time (assuming a cache which
# honors this).
self._assert_log(
@@ -215,7 +215,7 @@ class ConcurrencyTest(TestCase):
@contextlib.contextmanager
def enter_dogpile_block(dogpile):
with dogpile.acquire(
lambda: create_resource(dogpile),
lambda: create_resource(dogpile),
get_value
) as rec:
yield rec
@@ -223,7 +223,7 @@ class ConcurrencyTest(TestCase):
@contextlib.contextmanager
def enter_dogpile_block(dogpile):
with dogpile.acquire(
lambda: create_resource(dogpile),
lambda: create_resource(dogpile),
value_and_created_fn=get_value
) as rec:
yield rec
@@ -258,7 +258,7 @@ class ConcurrencyTest(TestCase):
time_since_create < max_stale,
"Time since create %.4f max stale time %s, "
"total waited %s",
time_since_create, max_stale,
time_since_create, max_stale,
slow_waiters[0]
)
@@ -296,19 +296,19 @@ class ConcurrencyTest(TestCase):
log.info("Test Summary")
log.info("num threads: %s; expiretime: %s; creation_time: %s; "
"num_usages: %s; "
"usage_time: %s; delay_time: %s",
num_threads, expiretime, creation_time, num_usages,
"usage_time: %s; delay_time: %s",
num_threads, expiretime, creation_time, num_usages,
usage_time, delay_time
)
log.info("cache expire time: %s; unsafe cache: %s slow "
"write time: %s; inline: %s; registry: %s",
cache_expire_time, unsafe_cache, slow_write_time,
"write time: %s; inline: %s; registry: %s",
cache_expire_time, unsafe_cache, slow_write_time,
inline_create, use_registry)
log.info("Estimated run time %.2f actual run time %.2f",
log.info("Estimated run time %.2f actual run time %.2f",
expected_run_time, actual_run_time)
log.info("Effective expiretime (min(cache_exp_time, exptime)) %s",
log.info("Effective expiretime (min(cache_exp_time, exptime)) %s",
effective_expiretime)
log.info("Expected slow waits %s, Total slow waits %s",
log.info("Expected slow waits %s, Total slow waits %s",
expected_slow_waiters, slow_waiters[0])
log.info("Total generations %s Max generations expected %s" % (
len(the_resource), expected_generations
@@ -324,7 +324,7 @@ class ConcurrencyTest(TestCase):
)
assert len(the_resource) <= expected_generations,\
"Number of resource generations %d exceeded "\
"expected %d" % (len(the_resource),
"expected %d" % (len(the_resource),
expected_generations)
class DogpileTest(TestCase):

234
tests/core/test_lock.py Normal file
View File

@@ -0,0 +1,234 @@
from unittest import TestCase
import time
import threading
from dogpile.core import Lock, NeedRegenerationException
from dogpile.core.nameregistry import NameRegistry
import contextlib
import math
import logging
log = logging.getLogger(__name__)
class ConcurrencyTest(TestCase):
    """Exercise :class:`.Lock` under many threads, asserting bounds on
    staleness, slow waits, run time, and number of regenerations.

    _test_multi positional args: expiretime, time to create, num usages,
    time spent using, delay between usages.
    """

    # guards the "only one creator at a time" assertion in
    # _assert_synchronized()
    _assertion_lock = threading.Lock()

    def test_quick(self):
        self._test_multi(
            10, 2, .5, 50, .05, .1,
        )

    def test_slow(self):
        self._test_multi(
            10, 5, 2, 50, .1, .1,
        )

    def test_return_while_in_progress(self):
        self._test_multi(
            10, 5, 2, 50, 1, .1
        )

    def test_get_value_plus_created_long_create(self):
        self._test_multi(
            10, 2, 2.5, 50, .05, .1,
        )

    def test_get_value_plus_created_registry_unsafe_cache(self):
        # cache expires *before* the dogpile expiretime; every thread
        # is expected to hit the regeneration path.
        self._test_multi(
            10, 1, .6, 100, .05, .1,
            cache_expire_time='unsafe'
        )

    def test_get_value_plus_created_registry_safe_cache_quick(self):
        self._test_multi(
            10, 2, .5, 50, .05, .1,
            cache_expire_time='safe'
        )

    def test_get_value_plus_created_registry_safe_cache_slow(self):
        self._test_multi(
            10, 5, 2, 50, .1, .1,
            cache_expire_time='safe'
        )

    def _assert_synchronized(self):
        """Return a context manager asserting that no other thread is
        currently inside a creation block."""
        # non-blocking acquire: if another thread holds the lock, two
        # creators are running concurrently, which is the failure.
        acq = self._assertion_lock.acquire(False)
        assert acq, "Could not acquire"

        @contextlib.contextmanager
        def go():
            try:
                yield {}
            except:
                # re-raise everything; we only want the finally.
                raise
            finally:
                self._assertion_lock.release()
        return go()

    def _assert_log(self, cond, msg, *args):
        """Assert ``cond``, logging the message either way so the test
        log shows which checks ran."""
        if cond:
            log.debug(msg, *args)
        else:
            log.error("Assertion failed: " + msg, *args)
            assert False, msg % args

    def _test_multi(self, num_threads,
                        expiretime,
                        creation_time,
                        num_usages,
                        usage_time,
                        delay_time,
                        cache_expire_time=None):
        # cache_expire_time: None, 'unsafe' (cache expires before the
        # dogpile expiretime) or 'safe' (cache outlives expiretime).

        mutex = threading.Lock()

        unsafe_cache = False
        if cache_expire_time:
            if cache_expire_time == 'unsafe':
                unsafe_cache = True
                cache_expire_time = expiretime * .8
            elif cache_expire_time == 'safe':
                cache_expire_time = (expiretime + creation_time) * 1.1
            else:
                assert False, cache_expire_time

            log.info("Cache expire time: %s", cache_expire_time)

            effective_expiretime = min(cache_expire_time, expiretime)
        else:
            effective_expiretime = expiretime

        effective_creation_time = creation_time

        # upper bound on how stale a returned value may be, with a
        # 10% fudge factor
        max_stale = (effective_expiretime + effective_creation_time +
                        usage_time + delay_time) * 1.1

        # shared mutable state across threads: creation timestamps,
        # slow-wait counter, failure counter (list-wrapped ints so the
        # nested functions can mutate them; this predates `nonlocal`)
        the_resource = []
        slow_waiters = [0]
        failures = [0]

        def create_resource():
            # the "creator" handed to Lock; must never run concurrently
            with self._assert_synchronized():
                log.debug("creating resource, will take %f sec" % creation_time)
                time.sleep(creation_time)
                the_resource.append(time.time())
                value = the_resource[-1]
                log.debug("finished creating resource")
                # Lock expects (value, createdtime)
                return value, time.time()

        def get_value():
            # the "value_and_created_fn" handed to Lock; simulates a
            # cache read
            if not the_resource:
                raise NeedRegenerationException()
            if cache_expire_time:
                if time.time() - the_resource[-1] > cache_expire_time:
                    # should never hit a cache invalidation
                    # if we've set expiretime below the cache
                    # expire time (assuming a cache which
                    # honors this).
                    self._assert_log(
                        cache_expire_time < expiretime,
                        "Cache expiration hit, cache "
                        "expire time %s, expiretime %s",
                        cache_expire_time,
                        expiretime,
                    )

                    raise NeedRegenerationException()

            # value doubles as its own createdtime
            return the_resource[-1], the_resource[-1]

        def use_dogpile():
            # per-thread loop: repeatedly acquire the lock, check the
            # value, simulate usage, then pause
            try:
                for i in range(num_usages):
                    now = time.time()
                    with Lock(mutex, create_resource, get_value, expiretime) as value:
                        waited = time.time() - now
                        if waited > .01:
                            slow_waiters[0] += 1
                        check_value(value, waited)
                        time.sleep(usage_time)
                    time.sleep(delay_time)
            except:
                # record any failure; asserted on after join
                log.error("thread failed", exc_info=True)
                failures[0] += 1

        def check_value(value, waited):
            assert value

            # time since the current resource was
            # created
            time_since_create = time.time() - value

            self._assert_log(
                time_since_create < max_stale,
                "Time since create %.4f max stale time %s, "
                "total waited %s",
                time_since_create, max_stale,
                slow_waiters[0]
            )

        started_at = time.time()
        threads = []
        for i in range(num_threads):
            t = threading.Thread(target=use_dogpile)
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
        actual_run_time = time.time() - started_at

        # time spent starts with num usages * time per usage, with a 10% fudge
        expected_run_time = (num_usages * (usage_time + delay_time)) * 1.1

        expected_generations = math.ceil(expected_run_time / effective_expiretime)

        if unsafe_cache:
            # every thread hits regeneration on each generation
            expected_slow_waiters = expected_generations * num_threads
        else:
            # one regeneration per cycle plus the initial thundering herd
            expected_slow_waiters = expected_generations + num_threads - 1

        # time spent also increments by one wait period in the beginning...
        expected_run_time += effective_creation_time

        # and a fudged version of the periodic waiting time anticipated
        # for a single thread...
        expected_run_time += (expected_slow_waiters * effective_creation_time) / num_threads
        expected_run_time *= 1.1

        log.info("Test Summary")
        log.info("num threads: %s; expiretime: %s; creation_time: %s; "
                "num_usages: %s; "
                "usage_time: %s; delay_time: %s",
                num_threads, expiretime, creation_time, num_usages,
                usage_time, delay_time
            )
        log.info("cache expire time: %s; unsafe cache: %s",
                cache_expire_time, unsafe_cache)
        log.info("Estimated run time %.2f actual run time %.2f",
                expected_run_time, actual_run_time)
        log.info("Effective expiretime (min(cache_exp_time, exptime)) %s",
                effective_expiretime)
        log.info("Expected slow waits %s, Total slow waits %s",
                expected_slow_waiters, slow_waiters[0])
        log.info("Total generations %s Max generations expected %s" % (
            len(the_resource), expected_generations
        ))

        assert not failures[0], "%s failures occurred" % failures[0]
        assert actual_run_time <= expected_run_time

        assert slow_waiters[0] <= expected_slow_waiters, \
            "Number of slow waiters %s exceeds expected slow waiters %s" % (
                slow_waiters[0],
                expected_slow_waiters
            )
        assert len(the_resource) <= expected_generations,\
            "Number of resource generations %d exceeded "\
            "expected %d" % (len(the_resource),
                expected_generations)