From ba1a93b9b0f7b780ec55a6d69be6038db251a7b1 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 23 Mar 2022 14:02:26 -0400 Subject: [PATCH] re-add `'blockchain.address.listunspent` and `blockchain.address.getbalance` to scribe-hub --- scribe/db/db.py | 12 +++++----- scribe/hub/mempool.py | 39 ++++++++++++++++++++++++++++++++ scribe/hub/session.py | 52 ++++++++++++++++++++++++------------------- 3 files changed, 73 insertions(+), 30 deletions(-) diff --git a/scribe/db/db.py b/scribe/db/db.py index fa097c0..707142c 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -1103,13 +1103,11 @@ class HubDB: async def all_utxos(self, hashX): """Return all UTXOs for an address sorted in no particular order.""" def read_utxos(): - utxos = [] - utxos_append = utxos.append fs_tx_hash = self.fs_tx_hash - for k, v in self.prefix_db.utxo.iterate(prefix=(hashX, )): - tx_hash, height = fs_tx_hash(k.tx_num) - utxos_append(UTXO(k.tx_num, k.nout, tx_hash, height, v.amount)) - return utxos + utxo_info = [ + (k.tx_num, k.nout, v.amount) for k, v in self.prefix_db.utxo.iterate(prefix=(hashX, )) + ] + return [UTXO(tx_num, nout, *fs_tx_hash(tx_num), value=value) for (tx_num, nout, value) in utxo_info] while True: utxos = await asyncio.get_event_loop().run_in_executor(self._executor, read_utxos) @@ -1117,7 +1115,7 @@ class HubDB: return utxos self.logger.warning(f'all_utxos: tx hash not ' f'found (reorg?), retrying...') - await sleep(0.25) + await asyncio.sleep(0.25) async def lookup_utxos(self, prevouts): def lookup_utxos(): diff --git a/scribe/hub/mempool.py b/scribe/hub/mempool.py index 0a0cc34..04363ed 100644 --- a/scribe/hub/mempool.py +++ b/scribe/hub/mempool.py @@ -9,6 +9,7 @@ from prometheus_client import Histogram import rocksdb.errors from scribe import PROMETHEUS_NAMESPACE from scribe.common import HISTOGRAM_BUCKETS +from scribe.db.common import UTXO from scribe.blockchain.transaction.deserializer import Deserializer if typing.TYPE_CHECKING: @@ -146,6 +147,44 @@ class MemPool: result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui)) return result + def unordered_UTXOs(self, hashX): + """Return an unordered list of UTXO named tuples from mempool + transactions that pay to hashX. + This does not consider if any other mempool transactions spend + the outputs. + """ + utxos = [] + for tx_hash in self.touched_hashXs.get(hashX, ()): + tx = self.txs.get(tx_hash) + for pos, (hX, value) in enumerate(tx.out_pairs): + if hX == hashX: + utxos.append(UTXO(-1, pos, tx_hash, 0, value)) + return utxos + + def potential_spends(self, hashX): + """Return a set of (prev_hash, prev_idx) pairs from mempool + transactions that touch hashX. + None, some or all of these may be spends of the hashX, but all + actual spends of it (in the DB or mempool) will be included. + """ + result = set() + for tx_hash in self.touched_hashXs.get(hashX, ()): + tx = self.txs[tx_hash] + result.update(tx.prevouts) + return result + + def balance_delta(self, hashX): + """Return the unconfirmed amount in the mempool for hashX. + Can be positive or negative. + """ + value = 0 + if hashX in self.touched_hashXs: + for h in self.touched_hashXs[hashX]: + tx = self.txs[h] + value -= sum(v for h168, v in tx.in_pairs if h168 == hashX) + value += sum(v for h168, v in tx.out_pairs if h168 == hashX) + return value + def get_mempool_height(self, tx_hash: bytes) -> int: # Height Progression # -2: not broadcast diff --git a/scribe/hub/session.py b/scribe/hub/session.py index 92ef367..b1cc13c 100644 --- a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -652,7 +652,6 @@ class LBRYElectrumX(asyncio.Protocol): MAX_CHUNK_SIZE = 40960 session_counter = itertools.count() - request_handlers: typing.Dict[str, typing.Callable] = {} RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, labelnames=("method", "version"), buckets=HISTOGRAM_BUCKETS) NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)", @@ -885,6 +884,10 @@ class LBRYElectrumX(asyncio.Protocol): coro = self.address_subscribe elif method == 'blockchain.address.unsubscribe': coro = self.address_unsubscribe + elif method == 'blockchain.address.listunspent': + coro = self.address_listunspent + elif method == 'blockchain.address.getbalance': + coro = self.address_get_balance elif method == 'blockchain.estimatefee': coro = self.estimatefee elif method == 'blockchain.relayfee': @@ -1351,19 +1354,22 @@ class LBRYElectrumX(asyncio.Protocol): self.session_manager.mempool_statuses.pop(hashX, None) return status - # async def hashX_listunspent(self, hashX): - # """Return the list of UTXOs of a script hash, including mempool - # effects.""" - # utxos = await self.db.all_utxos(hashX) - # utxos = sorted(utxos) - # utxos.extend(await self.mempool.unordered_UTXOs(hashX)) - # spends = await self.mempool.potential_spends(hashX) - # - # return [{'tx_hash': hash_to_hex_str(utxo.tx_hash), - # 'tx_pos': utxo.tx_pos, - # 'height': utxo.height, 'value': utxo.value} - # for utxo in utxos - # if (utxo.tx_hash, utxo.tx_pos) not in spends] + async def hashX_listunspent(self, hashX: bytes): + """Return the list of UTXOs of a script hash, including mempool + effects.""" + utxos = await self.db.all_utxos(hashX) + utxos = sorted(utxos) + utxos.extend(self.mempool.unordered_UTXOs(hashX)) + spends = self.mempool.potential_spends(hashX) + + return [{'tx_hash': hash_to_hex_str(utxo.tx_hash), + 'tx_pos': utxo.tx_pos, + 'height': utxo.height, 'value': utxo.value} + for utxo in utxos + if (utxo.tx_hash, utxo.tx_pos) not in spends] + + async def address_listunspent(self, address: str): + return await self.hashX_listunspent(self.address_to_hashX(address)) async def hashX_subscribe(self, hashX, alias): self.hashX_subs[hashX] = alias @@ -1386,10 +1392,10 @@ class LBRYElectrumX(asyncio.Protocol): pass raise RPCError(BAD_REQUEST, f'{address} is not a valid address') - # async def address_get_balance(self, address): - # """Return the confirmed and unconfirmed balance of an address.""" - # hashX = self.address_to_hashX(address) - # return await self.get_balance(hashX) + async def address_get_balance(self, address): + """Return the confirmed and unconfirmed balance of an address.""" + hashX = self.address_to_hashX(address) + return await self.get_balance(hashX) async def address_get_history(self, address): """Return the confirmed and unconfirmed history of an address.""" @@ -1425,11 +1431,11 @@ class LBRYElectrumX(asyncio.Protocol): hashX = self.address_to_hashX(address) return await self.hashX_unsubscribe(hashX, address) - # async def get_balance(self, hashX): - # utxos = await self.db.all_utxos(hashX) - # confirmed = sum(utxo.value for utxo in utxos) - # unconfirmed = await self.mempool.balance_delta(hashX) - # return {'confirmed': confirmed, 'unconfirmed': unconfirmed} + async def get_balance(self, hashX): + utxos = await self.db.all_utxos(hashX) + confirmed = sum(utxo.value for utxo in utxos) + unconfirmed = self.mempool.balance_delta(hashX) + return {'confirmed': confirmed, 'unconfirmed': unconfirmed} # async def scripthash_get_balance(self, scripthash): # """Return the confirmed and unconfirmed balance of a scripthash."""