forked from LBRYCommunity/lbry-sdk
Merge pull request #3098 from lbryio/cached-txids
store txids in memory, faster address subscription/history
This commit is contained in:
commit
5cd2ebc960
3 changed files with 42 additions and 31 deletions
|
@ -444,6 +444,7 @@ class BlockProcessor:
|
|||
|
||||
append_hashX_by_tx(hashXs)
|
||||
update_touched(hashXs)
|
||||
self.db.total_transactions.append(tx_hash)
|
||||
tx_num += 1
|
||||
|
||||
self.db.history.add_unflushed(hashXs_by_tx, self.tx_count)
|
||||
|
@ -491,6 +492,7 @@ class BlockProcessor:
|
|||
undo_entry_len = 12 + HASHX_LEN
|
||||
|
||||
for tx, tx_hash in reversed(txs):
|
||||
self.db.total_transactions.pop()
|
||||
for idx, txout in enumerate(tx.outputs):
|
||||
# Spend the TX outputs. Be careful with unspendable
|
||||
# outputs - we didn't save those in the first place.
|
||||
|
|
|
@ -173,20 +173,20 @@ class History:
|
|||
|
||||
self.logger.info(f'backing up removed {nremoves:,d} history entries')
|
||||
|
||||
def get_txnums(self, hashX, limit=1000):
|
||||
"""Generator that returns an unpruned, sorted list of tx_nums in the
|
||||
history of a hashX. Includes both spending and receiving
|
||||
transactions. By default yields at most 1000 entries. Set
|
||||
limit to None to get them all. """
|
||||
limit = util.resolve_limit(limit)
|
||||
for key, hist in self.db.iterator(prefix=hashX):
|
||||
a = array.array('I')
|
||||
a.frombytes(hist)
|
||||
for tx_num in a:
|
||||
if limit == 0:
|
||||
return
|
||||
yield tx_num
|
||||
limit -= 1
|
||||
# def get_txnums(self, hashX, limit=1000):
|
||||
# """Generator that returns an unpruned, sorted list of tx_nums in the
|
||||
# history of a hashX. Includes both spending and receiving
|
||||
# transactions. By default yields at most 1000 entries. Set
|
||||
# limit to None to get them all. """
|
||||
# limit = util.resolve_limit(limit)
|
||||
# for key, hist in self.db.iterator(prefix=hashX):
|
||||
# a = array.array('I')
|
||||
# a.frombytes(hist)
|
||||
# for tx_num in a:
|
||||
# if limit == 0:
|
||||
# return
|
||||
# yield tx_num
|
||||
# limit -= 1
|
||||
|
||||
#
|
||||
# History compaction
|
||||
|
|
|
@ -96,6 +96,7 @@ class LevelDB:
|
|||
|
||||
self._block_txs_cache = pylru.lrucache(50000)
|
||||
self._merkle_tx_cache = pylru.lrucache(100000)
|
||||
self.total_transactions = None
|
||||
|
||||
async def _read_tx_counts(self):
|
||||
if self.tx_counts is not None:
|
||||
|
@ -119,6 +120,18 @@ class LevelDB:
|
|||
else:
|
||||
assert self.db_tx_count == 0
|
||||
|
||||
async def _read_txids(self):
|
||||
def get_txids():
|
||||
return list(self.tx_db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
|
||||
|
||||
start = time.perf_counter()
|
||||
self.logger.info("loading txids")
|
||||
txids = await asyncio.get_event_loop().run_in_executor(self.executor, get_txids)
|
||||
assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1]
|
||||
self.total_transactions = txids
|
||||
ts = time.perf_counter() - start
|
||||
self.logger.info("loaded %i txids in %ss", len(self.total_transactions), round(ts, 4))
|
||||
|
||||
async def _read_headers(self):
|
||||
if self.headers is not None:
|
||||
return
|
||||
|
@ -169,6 +182,8 @@ class LevelDB:
|
|||
|
||||
# Read TX counts (requires meta directory)
|
||||
await self._read_tx_counts()
|
||||
if self.total_transactions is None:
|
||||
await self._read_txids()
|
||||
await self._read_headers()
|
||||
|
||||
def close(self):
|
||||
|
@ -452,10 +467,8 @@ class LevelDB:
|
|||
If the tx_height is not on disk, returns (None, tx_height)."""
|
||||
tx_height = bisect_right(self.tx_counts, tx_num)
|
||||
if tx_height > self.db_height:
|
||||
tx_hash = None
|
||||
else:
|
||||
tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
|
||||
return tx_hash, tx_height
|
||||
return None, tx_height
|
||||
return self.total_transactions[tx_num], tx_height
|
||||
|
||||
async def tx_merkle(self, tx_num, tx_height):
|
||||
if tx_height == -1:
|
||||
|
@ -557,38 +570,34 @@ class LevelDB:
|
|||
# return hashx_history
|
||||
# return hashx_history
|
||||
|
||||
def iter_tx_heights():
|
||||
def read_history():
|
||||
db_height = self.db_height
|
||||
tx_counts = self.tx_counts
|
||||
tx_db_get = self.tx_db.get
|
||||
pack_be_uint64 = util.pack_be_uint64
|
||||
|
||||
cnt = 0
|
||||
txs = []
|
||||
|
||||
for key, hist in self.history.db.iterator(prefix=hashX):
|
||||
for hist in self.history.db.iterator(prefix=hashX, include_key=False):
|
||||
a = array.array('I')
|
||||
a.frombytes(hist)
|
||||
for tx_num in a:
|
||||
tx_height = bisect_right(tx_counts, tx_num)
|
||||
if tx_height > db_height:
|
||||
yield None, tx_height
|
||||
return
|
||||
yield tx_db_get(TX_HASH_PREFIX + pack_be_uint64(tx_num)), tx_height
|
||||
txs.append((tx_num, tx_height))
|
||||
cnt += 1
|
||||
if limit and cnt >= limit:
|
||||
return
|
||||
break
|
||||
if limit and cnt >= limit:
|
||||
return
|
||||
|
||||
def read_history():
|
||||
return [
|
||||
(tx_num, tx_height) for (tx_num, tx_height) in iter_tx_heights()
|
||||
]
|
||||
break
|
||||
return txs
|
||||
|
||||
while True:
|
||||
history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history)
|
||||
if not history or history[-1][0] is not None:
|
||||
return history
|
||||
if history is not None:
|
||||
return [(self.total_transactions[tx_num], tx_height) for (tx_num, tx_height) in history]
|
||||
self.logger.warning(f'limited_history: tx hash '
|
||||
f'not found (reorg?), retrying...')
|
||||
await sleep(0.25)
|
||||
|
|
Loading…
Reference in a new issue