diff --git a/torba/tests/client_tests/unit/test_stream_controller.py b/torba/tests/client_tests/unit/test_stream_controller.py new file mode 100644 index 000000000..f82ab699f --- /dev/null +++ b/torba/tests/client_tests/unit/test_stream_controller.py @@ -0,0 +1,19 @@ +import unittest +from torba.stream import StreamController + +class StreamControllerTestCase(unittest.TestCase): + def test_non_unique_events(self): + events = [] + controller = StreamController() + controller.stream.listen(on_data=events.append) + controller.add("yo") + controller.add("yo") + self.assertEqual(events, ["yo", "yo"]) + + def test_unique_events(self): + events = [] + controller = StreamController(merge_repeated_events=True) + controller.stream.listen(on_data=events.append) + controller.add("yo") + controller.add("yo") + self.assertEqual(events, ["yo"]) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 3ce706357..a7fad05c0 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -89,10 +89,10 @@ class BaseNetwork: self._on_connected_controller = StreamController() self.on_connected = self._on_connected_controller.stream - self._on_header_controller = StreamController() + self._on_header_controller = StreamController(merge_repeated_events=True) self.on_header = self._on_header_controller.stream - self._on_status_controller = StreamController() + self._on_status_controller = StreamController(merge_repeated_events=True) self.on_status = self._on_status_controller.stream self.subscription_controllers = { diff --git a/torba/torba/stream.py b/torba/torba/stream.py index 40589ade0..412a94525 100644 --- a/torba/torba/stream.py +++ b/torba/torba/stream.py @@ -45,10 +45,12 @@ class BroadcastSubscription: class StreamController: - def __init__(self): + def __init__(self, merge_repeated_events=False): self.stream = Stream(self) self._first_subscription = None self._last_subscription = None + self._last_event = None + self._merge_repeated = merge_repeated_events @property def has_listener(self): @@ -76,8 +78,10 @@ class StreamController: return f def add(self, event): + skip = self._merge_repeated and event == self._last_event + self._last_event = event return self._notify_and_ensure_future( - lambda subscription: subscription._add(event) + lambda subscription: None if skip else subscription._add(event) ) def add_error(self, exception):