Source code for thorn._state
"""Internal state."""
from __future__ import absolute_import, unicode_literals
import threading
from vine.five import monotonic
__all__ = [
'current_app', 'set_current_app', 'set_default_app',
'app_or_default', 'buffer_events',
]
class _TLS(threading.local):
current_app = None
_tls = _TLS()
default_app = None
def current_app():
"""Return the currently active app for this thread."""
app = _tls.current_app
if app is None:
if default_app is None:
from thorn.app import Thorn
set_default_app(Thorn())
return default_app
return app
def set_current_app(app):
"""Set thread-local current app instance."""
_tls.current_app = app
def set_default_app(app):
"""Set default app instance."""
global default_app
default_app = app
def app_or_default(app):
"""Return app if defined, otherwise return the default app."""
return app if app is not None else current_app()
[docs]class buffer_events(object):
"""Context that enables event buffering.
The buffer will be flushed at context exit, or when
the buffer is flushed explicitly::
with buffer_events() as buffer:
...
buffer.flush() # <-- flush here.
# <-- # implicit flush here.
"""
def __init__(self, flush_freq=None, flush_timeout=None, app=None):
self.app = app_or_default(app)
self.flush_freq = flush_freq
self.flush_timeout = flush_timeout
self.flush_count = 0
self.flush_last = None
[docs] def flush(self):
self._flush(None)
[docs] def maybe_flush(self):
self.flush_count += 1
if self.should_flush():
self.flush()
[docs] def should_flush(self):
if self.flush_last is None:
self.flush_last = monotonic()
return (
not self.flush_count % self.flush_freq or
monotonic() > (self.flush_last or 0) + self.flush_timeout
)
def _flush(self, owner):
self.flush_last = monotonic()
self.app.flush_buffer(owner=owner)
def _enable(self):
self.app.enable_buffer(owner=self)
def _disable(self):
self.app.disable_buffer(owner=self)
def __enter__(self):
self._enable()
return self
def __exit__(self, *exc_inf):
self._flush(self)
self._disable()