lbry-sdk/lbry/event.py
2020-04-25 08:29:25 -04:00

191 lines
5.7 KiB
Python

import asyncio
import threading
import multiprocessing
class BroadcastSubscription:
def __init__(self, controller, on_data, on_error, on_done):
self._controller = controller
self._previous = self._next = None
self._on_data = on_data
self._on_error = on_error
self._on_done = on_done
self.is_paused = False
self.is_canceled = False
self.is_closed = False
def pause(self):
self.is_paused = True
def resume(self):
self.is_paused = False
def cancel(self):
self._controller._cancel(self)
self.is_canceled = True
@property
def can_fire(self):
return not any((self.is_paused, self.is_canceled, self.is_closed))
def _add(self, data):
if self.can_fire and self._on_data is not None:
return self._on_data(data)
def _add_error(self, exception):
if self.can_fire and self._on_error is not None:
return self._on_error(exception)
def _close(self):
try:
if self.can_fire and self._on_done is not None:
return self._on_done()
finally:
self.is_closed = True
class StreamController:
def __init__(self, merge_repeated_events=False):
self.stream = Stream(self)
self._first_subscription = None
self._last_subscription = None
self._last_event = None
self._merge_repeated = merge_repeated_events
@property
def has_listener(self):
return self._first_subscription is not None
@property
def _iterate_subscriptions(self):
next_sub = self._first_subscription
while next_sub is not None:
subscription = next_sub
next_sub = next_sub._next
yield subscription
def _notify_and_ensure_future(self, notify):
tasks = []
for subscription in self._iterate_subscriptions:
maybe_coroutine = notify(subscription)
if asyncio.iscoroutine(maybe_coroutine):
tasks.append(maybe_coroutine)
if tasks:
return asyncio.ensure_future(asyncio.wait(tasks))
else:
f = asyncio.get_event_loop().create_future()
f.set_result(None)
return f
def add(self, event):
skip = self._merge_repeated and event == self._last_event
self._last_event = event
return self._notify_and_ensure_future(
lambda subscription: None if skip else subscription._add(event)
)
def add_error(self, exception):
return self._notify_and_ensure_future(
lambda subscription: subscription._add_error(exception)
)
def close(self):
for subscription in self._iterate_subscriptions:
subscription._close()
def _cancel(self, subscription):
previous = subscription._previous
next_sub = subscription._next
if previous is None:
self._first_subscription = next_sub
else:
previous._next = next_sub
if next_sub is None:
self._last_subscription = previous
else:
next_sub._previous = previous
subscription._next = subscription._previous = subscription
def _listen(self, on_data, on_error, on_done):
subscription = BroadcastSubscription(self, on_data, on_error, on_done)
old_last = self._last_subscription
self._last_subscription = subscription
subscription._previous = old_last
subscription._next = None
if old_last is None:
self._first_subscription = subscription
else:
old_last._next = subscription
return subscription
class Stream:
def __init__(self, controller):
self._controller = controller
def listen(self, on_data, on_error=None, on_done=None):
return self._controller._listen(on_data, on_error, on_done)
def where(self, condition) -> asyncio.Future:
future = asyncio.get_event_loop().create_future()
def where_test(value):
if condition(value):
self._cancel_and_callback(subscription, future, value)
subscription = self.listen(
where_test,
lambda exception: self._cancel_and_error(subscription, future, exception)
)
return future
@property
def first(self):
future = asyncio.get_event_loop().create_future()
subscription = self.listen(
lambda value: not future.done() and self._cancel_and_callback(subscription, future, value),
lambda exception: not future.done() and self._cancel_and_error(subscription, future, exception)
)
return future
@staticmethod
def _cancel_and_callback(subscription: BroadcastSubscription, future: asyncio.Future, value):
subscription.cancel()
future.set_result(value)
@staticmethod
def _cancel_and_error(subscription: BroadcastSubscription, future: asyncio.Future, exception):
subscription.cancel()
future.set_exception(exception)
class EventQueuePublisher(threading.Thread):
STOP = object()
def __init__(self, queue: multiprocessing.Queue, stream_controller: StreamController):
super().__init__()
self.queue = queue
self.stream_controller = stream_controller
self.loop = asyncio.get_running_loop()
def run(self):
while True:
msg = self.queue.get()
if msg == self.STOP:
return
self.loop.call_soon_threadsafe(self.stream_controller.add, msg)
def stop(self):
self.queue.put(self.STOP)
self.join()
def __enter__(self):
self.start()
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()