From d1f5b25418864ff8b0ec1ce0e5437863f660fb31 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 7 Aug 2019 02:48:40 -0300 Subject: [PATCH] merge repeated subscription events --- .../unit/test_stream_controller.py | 19 +++++++++++++++++++ torba/torba/client/basenetwork.py | 4 ++-- torba/torba/stream.py | 8 ++++++-- 3 files changed, 27 insertions(+), 4 deletions(-) create mode 100644 torba/tests/client_tests/unit/test_stream_controller.py diff --git a/torba/tests/client_tests/unit/test_stream_controller.py b/torba/tests/client_tests/unit/test_stream_controller.py new file mode 100644 index 000000000..f82ab699f --- /dev/null +++ b/torba/tests/client_tests/unit/test_stream_controller.py @@ -0,0 +1,19 @@ +import unittest +from torba.stream import StreamController + +class StreamControllerTestCase(unittest.TestCase): + def test_non_unique_events(self): + events = [] + controller = StreamController() + controller.stream.listen(on_data=events.append) + controller.add("yo") + controller.add("yo") + self.assertEqual(events, ["yo", "yo"]) + + def test_unique_events(self): + events = [] + controller = StreamController(merge_repeated_events=True) + controller.stream.listen(on_data=events.append) + controller.add("yo") + controller.add("yo") + self.assertEqual(events, ["yo"]) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 3ce706357..a7fad05c0 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -89,10 +89,10 @@ class BaseNetwork: self._on_connected_controller = StreamController() self.on_connected = self._on_connected_controller.stream - self._on_header_controller = StreamController() + self._on_header_controller = StreamController(merge_repeated_events=True) self.on_header = self._on_header_controller.stream - self._on_status_controller = StreamController() + self._on_status_controller = StreamController(merge_repeated_events=True) self.on_status = self._on_status_controller.stream self.subscription_controllers = { diff --git a/torba/torba/stream.py b/torba/torba/stream.py index 40589ade0..412a94525 100644 --- a/torba/torba/stream.py +++ b/torba/torba/stream.py @@ -45,10 +45,12 @@ class BroadcastSubscription: class StreamController: - def __init__(self): + def __init__(self, merge_repeated_events=False): self.stream = Stream(self) self._first_subscription = None self._last_subscription = None + self._last_event = None + self._merge_repeated = merge_repeated_events @property def has_listener(self): @@ -76,8 +78,10 @@ class StreamController: return f def add(self, event): + skip = self._merge_repeated and event == self._last_event + self._last_event = event return self._notify_and_ensure_future( - lambda subscription: subscription._add(event) + lambda subscription: None if skip else subscription._add(event) ) def add_error(self, exception):