From ca6d5937ba4a534e359e8fc6ec0db25a65ecea7f Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Fri, 25 May 2018 09:54:01 -0400 Subject: [PATCH] stream.where and stream.async_where --- torba/baseledger.py | 2 +- torba/basetransaction.py | 4 ++-- torba/stream.py | 21 +++++++++++++++++++++ 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/torba/baseledger.py b/torba/baseledger.py index 7a25af41e..99fb5f1d1 100644 --- a/torba/baseledger.py +++ b/torba/baseledger.py @@ -105,7 +105,7 @@ class BaseLedger: if address not in self.addresses: self.addresses[address] = Address(self.coin_class.address_to_hash160(address)) self.addresses[address].add_transaction(transaction) - self.transactions.setdefault(hexlify(transaction.id), transaction) + self.transactions.setdefault(transaction.id, transaction) self._on_transaction_controller.add(transaction) def has_address(self, address): diff --git a/torba/basetransaction.py b/torba/basetransaction.py index af854e970..ee23b0929 100644 --- a/torba/basetransaction.py +++ b/torba/basetransaction.py @@ -1,7 +1,7 @@ import six import logging from typing import List -from collections import namedtuple +from binascii import hexlify from torba.basecoin import BaseCoin from torba.basescript import BaseInputScript, BaseOutputScript @@ -161,7 +161,7 @@ class BaseTransaction: @property def id(self): if self._id is None: - self._id = self.hash[::-1] + self._id = hexlify(self.hash[::-1]) return self._id @property diff --git a/torba/stream.py b/torba/stream.py index 0f089dc5f..a5f3f63c9 100644 --- a/torba/stream.py +++ b/torba/stream.py @@ -1,6 +1,10 @@ +import six from twisted.internet.defer import Deferred, DeferredLock, maybeDeferred, inlineCallbacks from twisted.python.failure import Failure +if six.PY3: + import asyncio + def execute_serially(f): _lock = DeferredLock() @@ -124,6 +128,23 @@ class Stream: def listen(self, on_data, on_error=None, on_done=None): return self._controller._listen(on_data, on_error, on_done) + def where(self, condition): + deferred = Deferred() + + def where_test(value): + if condition(value): + self._cancel_and_callback(subscription, deferred, value) + + subscription = self.listen( + where_test, + lambda error, traceback: self._cancel_and_error(subscription, deferred, error, traceback) + ) + + return deferred + + def async_where(self, condition): + return self.where(condition).asFuture(asyncio.get_event_loop()) + @property def first(self): deferred = Deferred()