store txids in memory, faster address subscription/history

This commit is contained in:
Jack Robison 2020-11-25 16:08:04 -05:00
parent 23bb5598d5
commit 9b4afe9816
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 42 additions and 31 deletions

View file

@ -444,6 +444,7 @@ class BlockProcessor:
append_hashX_by_tx(hashXs) append_hashX_by_tx(hashXs)
update_touched(hashXs) update_touched(hashXs)
self.db.total_transactions.append(tx_hash)
tx_num += 1 tx_num += 1
self.db.history.add_unflushed(hashXs_by_tx, self.tx_count) self.db.history.add_unflushed(hashXs_by_tx, self.tx_count)
@ -491,6 +492,7 @@ class BlockProcessor:
undo_entry_len = 12 + HASHX_LEN undo_entry_len = 12 + HASHX_LEN
for tx, tx_hash in reversed(txs): for tx, tx_hash in reversed(txs):
self.db.total_transactions.pop()
for idx, txout in enumerate(tx.outputs): for idx, txout in enumerate(tx.outputs):
# Spend the TX outputs. Be careful with unspendable # Spend the TX outputs. Be careful with unspendable
# outputs - we didn't save those in the first place. # outputs - we didn't save those in the first place.

View file

@ -173,20 +173,20 @@ class History:
self.logger.info(f'backing up removed {nremoves:,d} history entries') self.logger.info(f'backing up removed {nremoves:,d} history entries')
def get_txnums(self, hashX, limit=1000): # def get_txnums(self, hashX, limit=1000):
"""Generator that returns an unpruned, sorted list of tx_nums in the # """Generator that returns an unpruned, sorted list of tx_nums in the
history of a hashX. Includes both spending and receiving # history of a hashX. Includes both spending and receiving
transactions. By default yields at most 1000 entries. Set # transactions. By default yields at most 1000 entries. Set
limit to None to get them all. """ # limit to None to get them all. """
limit = util.resolve_limit(limit) # limit = util.resolve_limit(limit)
for key, hist in self.db.iterator(prefix=hashX): # for key, hist in self.db.iterator(prefix=hashX):
a = array.array('I') # a = array.array('I')
a.frombytes(hist) # a.frombytes(hist)
for tx_num in a: # for tx_num in a:
if limit == 0: # if limit == 0:
return # return
yield tx_num # yield tx_num
limit -= 1 # limit -= 1
# #
# History compaction # History compaction

View file

@ -96,6 +96,7 @@ class LevelDB:
self._block_txs_cache = pylru.lrucache(50000) self._block_txs_cache = pylru.lrucache(50000)
self._merkle_tx_cache = pylru.lrucache(100000) self._merkle_tx_cache = pylru.lrucache(100000)
self.total_transactions = None
async def _read_tx_counts(self): async def _read_tx_counts(self):
if self.tx_counts is not None: if self.tx_counts is not None:
@ -119,6 +120,18 @@ class LevelDB:
else: else:
assert self.db_tx_count == 0 assert self.db_tx_count == 0
async def _read_txids(self):
def get_txids():
return list(self.tx_db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
start = time.perf_counter()
self.logger.info("loading txids")
txids = await asyncio.get_event_loop().run_in_executor(self.executor, get_txids)
assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1]
self.total_transactions = txids
ts = time.perf_counter() - start
self.logger.info("loaded %i txids in %ss", len(self.total_transactions), round(ts, 4))
async def _read_headers(self): async def _read_headers(self):
if self.headers is not None: if self.headers is not None:
return return
@ -169,6 +182,8 @@ class LevelDB:
# Read TX counts (requires meta directory) # Read TX counts (requires meta directory)
await self._read_tx_counts() await self._read_tx_counts()
if self.total_transactions is None:
await self._read_txids()
await self._read_headers() await self._read_headers()
def close(self): def close(self):
@ -452,10 +467,8 @@ class LevelDB:
If the tx_height is not on disk, returns (None, tx_height).""" If the tx_height is not on disk, returns (None, tx_height)."""
tx_height = bisect_right(self.tx_counts, tx_num) tx_height = bisect_right(self.tx_counts, tx_num)
if tx_height > self.db_height: if tx_height > self.db_height:
tx_hash = None return None, tx_height
else: return self.total_transactions[tx_num], tx_height
tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
return tx_hash, tx_height
async def tx_merkle(self, tx_num, tx_height): async def tx_merkle(self, tx_num, tx_height):
if tx_height == -1: if tx_height == -1:
@ -557,38 +570,34 @@ class LevelDB:
# return hashx_history # return hashx_history
# return hashx_history # return hashx_history
def iter_tx_heights(): def read_history():
db_height = self.db_height db_height = self.db_height
tx_counts = self.tx_counts tx_counts = self.tx_counts
tx_db_get = self.tx_db.get tx_db_get = self.tx_db.get
pack_be_uint64 = util.pack_be_uint64 pack_be_uint64 = util.pack_be_uint64
cnt = 0 cnt = 0
txs = []
for key, hist in self.history.db.iterator(prefix=hashX): for hist in self.history.db.iterator(prefix=hashX, include_key=False):
a = array.array('I') a = array.array('I')
a.frombytes(hist) a.frombytes(hist)
for tx_num in a: for tx_num in a:
tx_height = bisect_right(tx_counts, tx_num) tx_height = bisect_right(tx_counts, tx_num)
if tx_height > db_height: if tx_height > db_height:
yield None, tx_height
return return
yield tx_db_get(TX_HASH_PREFIX + pack_be_uint64(tx_num)), tx_height txs.append((tx_num, tx_height))
cnt += 1 cnt += 1
if limit and cnt >= limit: if limit and cnt >= limit:
return break
if limit and cnt >= limit: if limit and cnt >= limit:
return break
return txs
def read_history():
return [
(tx_num, tx_height) for (tx_num, tx_height) in iter_tx_heights()
]
while True: while True:
history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history) history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history)
if not history or history[-1][0] is not None: if history is not None:
return history return [(self.total_transactions[tx_num], tx_height) for (tx_num, tx_height) in history]
self.logger.warning(f'limited_history: tx hash ' self.logger.warning(f'limited_history: tx hash '
f'not found (reorg?), retrying...') f'not found (reorg?), retrying...')
await sleep(0.25) await sleep(0.25)