2020-05-22 18:40:21 -04:00
|
|
|
import asyncio
|
|
|
|
import logging
|
|
|
|
import multiprocessing as mp
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
|
2019-12-31 15:30:13 -05:00
|
|
|
from lbry.testcase import AsyncioTestCase
|
2020-05-22 18:40:21 -04:00
|
|
|
from lbry.event import EventController, EventQueuePublisher
|
2020-05-01 09:34:34 -04:00
|
|
|
from lbry.tasks import TaskGroup
|
2019-08-07 02:48:40 -03:00
|
|
|
|
2019-08-07 11:27:25 -03:00
|
|
|
|
|
|
|
class StreamControllerTestCase(AsyncioTestCase):
    """Tests for EventController and the stream object it exposes."""

    async def test_non_unique_events(self):
        """A default controller delivers a repeated event every time it is added."""
        received = []
        ctl = EventController()
        ctl.stream.listen(received.append)
        for _ in range(2):
            await ctl.add("yo")
        self.assertListEqual(received, ["yo", "yo"])

    async def test_unique_events(self):
        """With merge_repeated_events=True a repeated event is delivered once."""
        received = []
        ctl = EventController(merge_repeated_events=True)
        ctl.stream.listen(received.append)
        for _ in range(2):
            await ctl.add("yo")
        self.assertListEqual(received, ["yo"])

    async def test_sync_listener_errors(self):
        """An error raised by a plain-function listener surfaces from add()."""
        def bad_listener(_):
            raise ValueError('bad')

        ctl = EventController()
        ctl.stream.listen(bad_listener)
        with self.assertRaises(ValueError):
            await ctl.add("yo")

    async def test_async_listener_errors(self):
        """An error raised by a coroutine listener surfaces from add()."""
        async def bad_listener(_):
            raise ValueError('bad')

        ctl = EventController()
        ctl.stream.listen(bad_listener)
        with self.assertRaises(ValueError):
            await ctl.add("yo")

    async def test_first_event(self):
        """Each `stream.first` awaitable yields the next event added after it was taken."""
        ctl = EventController()
        # The awaitables are captured *before* the matching add() on purpose;
        # the interleaving below is what the test is checking.
        before_one = ctl.stream.first
        await ctl.add("one")
        before_two = ctl.stream.first
        await ctl.add("two")
        self.assertEqual("one", await before_one)
        self.assertEqual("two", await before_two)

    async def test_last_event(self):
        """`stream.last` yields the final event added before the controller is closed."""
        ctl = EventController()
        final_event = ctl.stream.last
        for item in ("one", "two"):
            await ctl.add(item)
        await ctl.close()
        self.assertEqual("two", await final_event)
|
|
|
|
|
2019-12-11 19:31:45 -03:00
|
|
|
|
2020-05-22 18:40:21 -04:00
|
|
|
class TestEventQueuePublisher(AsyncioTestCase):
    """Tests for EventQueuePublisher feeding an EventController from a queue."""

    async def test_event_buffering_avoids_overloading_asyncio(self):
        """Flood the queue from several threads and check that nothing but the
        placeholder message gets logged while the events are consumed."""
        worker_count = 3
        events_per_worker = 3000
        # NOTE(review): the target is one less than the total produced --
        # presumably because the `where` predicate is evaluated before the
        # listener has appended the event being dispatched; confirm against
        # the stream implementation.
        target_count = worker_count * events_per_worker - 1

        queue = mp.Queue()
        pool = ThreadPoolExecutor(max_workers=worker_count)
        controller = EventController()
        seen = []

        async def record_event(event):
            # Yield to the loop once before recording the event.
            await asyncio.sleep(0)
            seen.append(event)

        controller.stream.listen(record_event)
        all_consumed = controller.stream.where(lambda _: len(seen) == target_count)

        def produce(q, worker_id):
            # Runs in a pool thread; pushes `events_per_worker` strings onto q.
            for seq in range(events_per_worker):
                q.put(f'foo-{seq}-{worker_id}')

        with EventQueuePublisher(queue, controller), self.assertLogs() as captured:
            # assertLogs() fails unless at least one record is emitted,
            # so emit a known placeholder record up front.
            logging.getLogger().info("placeholder")
            producer_futures = [
                self.loop.run_in_executor(pool, produce, queue, worker_id)
                for worker_id in range(worker_count)
            ]
            await asyncio.wait(producer_futures)
            await all_consumed
            # No asyncio slow-task WARNINGs are expected: the placeholder
            # must be the only captured record.
            self.assertEqual(['INFO:root:placeholder'], captured.output)
|
|
|
|
|
|
|
|
|
2019-12-11 22:10:39 -05:00
|
|
|
class TaskGroupTestCase(AsyncioTestCase):
    """Tests for TaskGroup."""

    async def test_cancel_sets_it_done(self):
        """After cancel() is called on a new group, its `done` event is set."""
        task_group = TaskGroup()
        task_group.cancel()
        done_event = task_group.done
        self.assertTrue(done_event.is_set())
|