lbry-sdk/torba/stream.py

166 lines
4.7 KiB
Python
Raw Normal View History

2018-05-25 15:54:01 +02:00
import six
2018-05-25 08:03:25 +02:00
from twisted.internet.defer import Deferred, DeferredLock, maybeDeferred, inlineCallbacks
from twisted.python.failure import Failure
2018-05-25 15:54:01 +02:00
if six.PY3:
import asyncio
2018-05-25 08:03:25 +02:00
def execute_serially(f):
_lock = DeferredLock()
@inlineCallbacks
def allow_only_one_at_a_time(*args, **kwargs):
yield _lock.acquire()
allow_only_one_at_a_time.is_running = True
try:
yield maybeDeferred(f, *args, **kwargs)
finally:
allow_only_one_at_a_time.is_running = False
_lock.release()
allow_only_one_at_a_time.is_running = False
return allow_only_one_at_a_time
class BroadcastSubscription:
def __init__(self, controller, on_data, on_error, on_done):
self._controller = controller
self._previous = self._next = None
self._on_data = on_data
self._on_error = on_error
self._on_done = on_done
self.is_paused = False
self.is_canceled = False
self.is_closed = False
def pause(self):
self.is_paused = True
def resume(self):
self.is_paused = False
def cancel(self):
self._controller._cancel(self)
self.is_canceled = True
@property
def can_fire(self):
return not any((self.is_paused, self.is_canceled, self.is_closed))
def _add(self, data):
if self.can_fire and self._on_data is not None:
self._on_data(data)
def _add_error(self, error, traceback):
if self.can_fire and self._on_error is not None:
self._on_error(error, traceback)
def _close(self):
if self.can_fire and self._on_done is not None:
self._on_done()
self.is_closed = True
class StreamController:
def __init__(self):
self.stream = Stream(self)
self._first_subscription = None
self._last_subscription = None
@property
def has_listener(self):
return self._first_subscription is not None
@property
def _iterate_subscriptions(self):
next = self._first_subscription
while next is not None:
subscription = next
next = next._next
yield subscription
def add(self, event):
for subscription in self._iterate_subscriptions:
subscription._add(event)
def add_error(self, error, traceback):
for subscription in self._iterate_subscriptions:
subscription._add_error(error, traceback)
def close(self):
for subscription in self._iterate_subscriptions:
subscription._close()
def _cancel(self, subscription):
previous = subscription._previous
next = subscription._next
if previous is None:
self._first_subscription = next
else:
previous._next = next
if next is None:
self._last_subscription = previous
else:
next._previous = previous
subscription._next = subscription._previous = subscription
def _listen(self, on_data, on_error, on_done):
subscription = BroadcastSubscription(self, on_data, on_error, on_done)
old_last = self._last_subscription
self._last_subscription = subscription
subscription._previous = old_last
subscription._next = None
if old_last is None:
self._first_subscription = subscription
else:
old_last._next = subscription
return subscription
class Stream:
def __init__(self, controller):
self._controller = controller
def listen(self, on_data, on_error=None, on_done=None):
return self._controller._listen(on_data, on_error, on_done)
2018-05-25 17:11:28 +02:00
def deferred_where(self, condition):
2018-05-25 15:54:01 +02:00
deferred = Deferred()
def where_test(value):
if condition(value):
self._cancel_and_callback(subscription, deferred, value)
subscription = self.listen(
where_test,
lambda error, traceback: self._cancel_and_error(subscription, deferred, error, traceback)
)
return deferred
2018-05-25 17:11:28 +02:00
def where(self, condition):
return self.deferred_where(condition).asFuture(asyncio.get_event_loop())
2018-05-25 15:54:01 +02:00
2018-05-25 08:03:25 +02:00
@property
def first(self):
deferred = Deferred()
subscription = self.listen(
lambda value: self._cancel_and_callback(subscription, deferred, value),
lambda error, traceback: self._cancel_and_error(subscription, deferred, error, traceback)
)
return deferred
@staticmethod
def _cancel_and_callback(subscription, deferred, value):
subscription.cancel()
deferred.callback(value)
@staticmethod
def _cancel_and_error(subscription, deferred, error, traceback):
subscription.cancel()
deferred.errback(Failure(error, exc_tb=traceback))