event streams can now be "closed" and thus you can listen for .last event

This commit is contained in:
Lex Berezhny 2020-05-20 18:01:34 -04:00
parent b7ff6569e4
commit 4a9f9906a0
2 changed files with 46 additions and 5 deletions

View file

@ -70,9 +70,9 @@ class EventController:
next_sub = next_sub._next next_sub = next_sub._next
yield subscription yield subscription
async def _notify(self, notify, event): async def _notify(self, notify, *args):
try: try:
maybe_coroutine = notify(event) maybe_coroutine = notify(*args)
if asyncio.iscoroutine(maybe_coroutine): if asyncio.iscoroutine(maybe_coroutine):
await maybe_coroutine await maybe_coroutine
except Exception as e: except Exception as e:
@ -90,9 +90,9 @@ class EventController:
for subscription in self._iterate_subscriptions: for subscription in self._iterate_subscriptions:
await self._notify(subscription._add_error, exception) await self._notify(subscription._add_error, exception)
def close(self): async def close(self):
for subscription in self._iterate_subscriptions: for subscription in self._iterate_subscriptions:
subscription._close() await self._notify(subscription._close)
def _cancel(self, subscription): def _cancel(self, subscription):
previous = subscription._previous previous = subscription._previous
@ -151,6 +151,23 @@ class EventStream:
) )
return future return future
@property
def last(self) -> asyncio.Future:
future = asyncio.get_event_loop().create_future()
value = None
def update_value(v):
nonlocal value
value = v
subscription = self.listen(
lambda v: update_value(v),
lambda exception: not future.done() and self._cancel_and_error(subscription, future, exception),
lambda: not future.done() and self._cancel_and_callback(subscription, future, value),
)
return future
@staticmethod @staticmethod
def _cancel_and_callback(subscription: BroadcastSubscription, future: asyncio.Future, value): def _cancel_and_callback(subscription: BroadcastSubscription, future: asyncio.Future, value):
subscription.cancel() subscription.cancel()
@ -170,7 +187,14 @@ class EventQueuePublisher(threading.Thread):
super().__init__() super().__init__()
self.queue = queue self.queue = queue
self.event_controller = event_controller self.event_controller = event_controller
self.loop = None
def message_to_event(self, message):
return message
def start(self):
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
super().start()
def run(self): def run(self):
while True: while True:
@ -178,7 +202,7 @@ class EventQueuePublisher(threading.Thread):
if msg == self.STOP: if msg == self.STOP:
return return
asyncio.run_coroutine_threadsafe( asyncio.run_coroutine_threadsafe(
self.event_controller.add(msg), self.loop self.event_controller.add(self.message_to_event(msg)), self.loop
) )
def stop(self): def stop(self):

View file

@ -37,6 +37,23 @@ class StreamControllerTestCase(AsyncioTestCase):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
await controller.add("yo") await controller.add("yo")
async def test_first_event(self):
controller = EventController()
first = controller.stream.first
await controller.add("one")
second = controller.stream.first
await controller.add("two")
self.assertEqual("one", await first)
self.assertEqual("two", await second)
async def test_last_event(self):
controller = EventController()
last = controller.stream.last
await controller.add("one")
await controller.add("two")
await controller.close()
self.assertEqual("two", await last)
class TaskGroupTestCase(AsyncioTestCase): class TaskGroupTestCase(AsyncioTestCase):