lbry-sdk/lbry/event.py

248 lines
7.2 KiB
Python
Raw Normal View History

import time
import asyncio
2020-04-12 17:59:00 +02:00
import threading
2020-05-01 15:31:33 +02:00
import logging
from queue import Empty
from multiprocessing import Queue
2020-05-01 15:31:33 +02:00
log = logging.getLogger(__name__)
2018-05-25 08:03:25 +02:00
class BroadcastSubscription:
2020-05-01 15:31:33 +02:00
def __init__(self, controller: 'EventController', on_data, on_error, on_done):
2018-05-25 08:03:25 +02:00
self._controller = controller
self._previous = self._next = None
self._on_data = on_data
self._on_error = on_error
self._on_done = on_done
self.is_paused = False
self.is_canceled = False
self.is_closed = False
def pause(self):
self.is_paused = True
def resume(self):
self.is_paused = False
def cancel(self):
self._controller._cancel(self)
self.is_canceled = True
@property
def can_fire(self):
return not any((self.is_paused, self.is_canceled, self.is_closed))
def _add(self, data):
if self.can_fire and self._on_data is not None:
2018-11-19 04:54:00 +01:00
return self._on_data(data)
2018-05-25 08:03:25 +02:00
2018-10-15 04:16:51 +02:00
def _add_error(self, exception):
2018-05-25 08:03:25 +02:00
if self.can_fire and self._on_error is not None:
2018-11-19 04:54:00 +01:00
return self._on_error(exception)
2018-05-25 08:03:25 +02:00
def _close(self):
2018-11-19 04:54:00 +01:00
try:
if self.can_fire and self._on_done is not None:
return self._on_done()
finally:
self.is_closed = True
2018-05-25 08:03:25 +02:00
2020-05-01 15:31:33 +02:00
class EventController:
2018-05-25 08:03:25 +02:00
2019-08-07 07:48:40 +02:00
def __init__(self, merge_repeated_events=False):
2020-05-01 15:31:33 +02:00
self.stream = EventStream(self)
2018-05-25 08:03:25 +02:00
self._first_subscription = None
self._last_subscription = None
2019-08-07 07:48:40 +02:00
self._last_event = None
self._merge_repeated = merge_repeated_events
2018-05-25 08:03:25 +02:00
@property
def has_listener(self):
return self._first_subscription is not None
@property
def _iterate_subscriptions(self):
next_sub = self._first_subscription
while next_sub is not None:
subscription = next_sub
2018-05-25 08:03:25 +02:00
yield subscription
next_sub = next_sub._next
2018-05-25 08:03:25 +02:00
async def _notify(self, notify, *args):
2020-05-01 15:31:33 +02:00
try:
maybe_coroutine = notify(*args)
if maybe_coroutine is not None and asyncio.iscoroutine(maybe_coroutine):
2020-05-01 15:31:33 +02:00
await maybe_coroutine
except Exception as e:
log.exception(e)
raise
async def add(self, event):
if self._merge_repeated and event == self._last_event:
return
2019-08-07 07:48:40 +02:00
self._last_event = event
2020-05-01 15:31:33 +02:00
for subscription in self._iterate_subscriptions:
await self._notify(subscription._add, event)
2018-05-25 08:03:25 +02:00
async def add_all(self, events):
for event in events:
await self.add(event)
2020-05-01 15:31:33 +02:00
async def add_error(self, exception):
for subscription in self._iterate_subscriptions:
await self._notify(subscription._add_error, exception)
2018-05-25 08:03:25 +02:00
async def close(self):
2018-05-25 08:03:25 +02:00
for subscription in self._iterate_subscriptions:
await self._notify(subscription._close)
2018-05-25 08:03:25 +02:00
def _cancel(self, subscription):
previous = subscription._previous
next_sub = subscription._next
2018-05-25 08:03:25 +02:00
if previous is None:
self._first_subscription = next_sub
2018-05-25 08:03:25 +02:00
else:
previous._next = next_sub
if next_sub is None:
2018-05-25 08:03:25 +02:00
self._last_subscription = previous
else:
next_sub._previous = previous
2018-05-25 08:03:25 +02:00
def _listen(self, on_data, on_error, on_done):
subscription = BroadcastSubscription(self, on_data, on_error, on_done)
old_last = self._last_subscription
self._last_subscription = subscription
subscription._previous = old_last
subscription._next = None
if old_last is None:
self._first_subscription = subscription
else:
old_last._next = subscription
return subscription
2020-05-01 15:31:33 +02:00
class EventStream:
2018-05-25 08:03:25 +02:00
def __init__(self, controller: EventController):
2018-05-25 08:03:25 +02:00
self._controller = controller
2020-05-01 15:31:33 +02:00
def listen(self, on_data, on_error=None, on_done=None) -> BroadcastSubscription:
2018-05-25 08:03:25 +02:00
return self._controller._listen(on_data, on_error, on_done)
2018-10-15 04:16:51 +02:00
def where(self, condition) -> asyncio.Future:
2020-05-01 15:31:33 +02:00
future = asyncio.get_running_loop().create_future()
2018-05-25 15:54:01 +02:00
def where_test(value):
if condition(value):
2018-10-15 04:16:51 +02:00
self._cancel_and_callback(subscription, future, value)
2018-05-25 15:54:01 +02:00
subscription = self.listen(
where_test,
2018-10-15 04:16:51 +02:00
lambda exception: self._cancel_and_error(subscription, future, exception)
2018-05-25 15:54:01 +02:00
)
2018-10-15 04:16:51 +02:00
return future
2018-05-25 15:54:01 +02:00
2018-05-25 08:03:25 +02:00
@property
2020-05-01 15:31:33 +02:00
def first(self) -> asyncio.Future:
2020-08-20 16:43:44 +02:00
future = asyncio.get_running_loop().create_future()
2018-05-25 08:03:25 +02:00
subscription = self.listen(
2019-08-23 06:50:25 +02:00
lambda value: not future.done() and self._cancel_and_callback(subscription, future, value),
lambda exception: not future.done() and self._cancel_and_error(subscription, future, exception)
2018-05-25 08:03:25 +02:00
)
2018-10-15 04:16:51 +02:00
return future
2018-05-25 08:03:25 +02:00
@property
def last(self) -> asyncio.Future:
2020-08-20 16:43:44 +02:00
future = asyncio.get_running_loop().create_future()
value = None
2020-06-05 06:35:22 +02:00
def update_value(_value):
nonlocal value
2020-06-05 06:35:22 +02:00
value = _value
subscription = self.listen(
2020-06-05 06:35:22 +02:00
update_value,
lambda exception: not future.done() and self._cancel_and_error(subscription, future, exception),
lambda: not future.done() and self._cancel_and_callback(subscription, future, value),
)
return future
2018-05-25 08:03:25 +02:00
@staticmethod
2018-10-15 04:16:51 +02:00
def _cancel_and_callback(subscription: BroadcastSubscription, future: asyncio.Future, value):
2018-05-25 08:03:25 +02:00
subscription.cancel()
2018-10-15 04:16:51 +02:00
future.set_result(value)
2018-05-25 08:03:25 +02:00
@staticmethod
2018-10-15 04:16:51 +02:00
def _cancel_and_error(subscription: BroadcastSubscription, future: asyncio.Future, exception):
2018-05-25 08:03:25 +02:00
subscription.cancel()
2018-10-15 04:16:51 +02:00
future.set_exception(exception)
2020-04-12 17:59:00 +02:00
class EventQueuePublisher(threading.Thread):
2020-05-01 15:31:33 +02:00
STOP = 'STOP'
2020-04-12 17:59:00 +02:00
def __init__(self, queue: Queue, event_controller: EventController):
2020-04-12 17:59:00 +02:00
super().__init__()
self.queue = queue
2020-05-01 15:31:33 +02:00
self.event_controller = event_controller
self.loop = None
2020-06-05 06:35:22 +02:00
@staticmethod
def message_to_event(message):
return message
def start(self):
2020-04-12 17:59:00 +02:00
self.loop = asyncio.get_running_loop()
super().start()
2020-04-12 17:59:00 +02:00
def run(self):
queue_get_timeout = 0.2
buffer_drain_size = 100
buffer_drain_timeout = 0.1
buffer = []
last_drained_ms_ago = time.perf_counter()
2020-04-12 17:59:00 +02:00
while True:
try:
msg = self.queue.get(timeout=queue_get_timeout)
if msg != self.STOP:
buffer.append(msg)
except Empty:
msg = None
drain = any((
len(buffer) >= buffer_drain_size,
(time.perf_counter() - last_drained_ms_ago) >= buffer_drain_timeout,
msg == self.STOP
))
if drain and buffer:
asyncio.run_coroutine_threadsafe(
self.event_controller.add_all([
self.message_to_event(msg) for msg in buffer
]), self.loop
)
buffer.clear()
last_drained_ms_ago = time.perf_counter()
2020-04-12 17:59:00 +02:00
if msg == self.STOP:
return
def stop(self):
self.queue.put(self.STOP)
2020-08-20 16:43:44 +02:00
if self.is_alive():
self.join()
2020-04-12 17:59:00 +02:00
def __enter__(self):
self.start()
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()