This commit is contained in:
Jack Robison 2021-07-08 16:08:33 -04:00 committed by Victor Shyba
parent 768934e1cc
commit fd2753b95a
3 changed files with 23 additions and 100 deletions

View file

@ -224,9 +224,9 @@ class BlockProcessor:
################################# #################################
# txo to pending claim # txo to pending claim
self.txo_to_claim: typing.Dict[Tuple[int, int], StagedClaimtrieItem] = {} self.txo_to_claim: Dict[Tuple[int, int], StagedClaimtrieItem] = {}
# claim hash to pending claim txo # claim hash to pending claim txo
self.claim_hash_to_txo: typing.Dict[bytes, Tuple[int, int]] = {} self.claim_hash_to_txo: Dict[bytes, Tuple[int, int]] = {}
# claim hash to lists of pending support txos # claim hash to lists of pending support txos
self.support_txos_by_claim: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list) self.support_txos_by_claim: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list)
# support txo: (supported claim hash, support amount) # support txo: (supported claim hash, support amount)
@ -270,7 +270,10 @@ class BlockProcessor:
to_send_es = set(self.touched_claims_to_send_es) to_send_es = set(self.touched_claims_to_send_es)
to_send_es.update(self.pending_reposted.difference(self.removed_claims_to_send_es)) to_send_es.update(self.pending_reposted.difference(self.removed_claims_to_send_es))
to_send_es.update({k for k, v in self.pending_channel_counts.items() if v != 0}.difference(self.removed_claims_to_send_es)) to_send_es.update(
{k for k, v in self.pending_channel_counts.items() if v != 0}.difference(
self.removed_claims_to_send_es)
)
for claim_hash in self.removed_claims_to_send_es: for claim_hash in self.removed_claims_to_send_es:
yield 'delete', claim_hash.hex() yield 'delete', claim_hash.hex()
@ -372,7 +375,6 @@ class BlockProcessor:
return await self.daemon.raw_blocks(hex_hashes) return await self.daemon.raw_blocks(hex_hashes)
try: try:
await self.flush(True)
start, last, hashes = await self.reorg_hashes(count) start, last, hashes = await self.reorg_hashes(count)
# Reverse and convert to hex strings. # Reverse and convert to hex strings.
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
@ -399,15 +401,7 @@ class BlockProcessor:
The hashes are returned in order of increasing height. Start The hashes are returned in order of increasing height. Start
is the height of the first hash, last of the last. is the height of the first hash, last of the last.
""" """
start, count = await self.calc_reorg_range(count)
last = start + count - 1
s = '' if count == 1 else 's'
self.logger.info(f'chain was reorganised replacing {count:,d} '
f'block{s} at heights {start:,d}-{last:,d}')
return start, last, await self.db.fs_block_hashes(start, count)
async def calc_reorg_range(self, count: Optional[int]):
"""Calculate the reorg range""" """Calculate the reorg range"""
def diff_pos(hashes1, hashes2): def diff_pos(hashes1, hashes2):
@ -436,8 +430,12 @@ class BlockProcessor:
count = (self.height - start) + 1 count = (self.height - start) + 1
else: else:
start = (self.height - count) + 1 start = (self.height - count) + 1
last = start + count - 1
s = '' if count == 1 else 's'
self.logger.info(f'chain was reorganised replacing {count:,d} '
f'block{s} at heights {start:,d}-{last:,d}')
return start, count return start, last, await self.db.fs_block_hashes(start, count)
# - Flushing # - Flushing
def flush_data(self): def flush_data(self):
@ -447,20 +445,11 @@ class BlockProcessor:
self.block_txs, self.db_op_stack, self.undo_infos, self.utxo_cache, self.block_txs, self.db_op_stack, self.undo_infos, self.utxo_cache,
self.db_deletes, self.tip, self.undo_claims) self.db_deletes, self.tip, self.undo_claims)
async def flush(self, flush_utxos): async def flush(self):
def flush(): def flush():
self.db.flush_dbs(self.flush_data()) self.db.flush_dbs(self.flush_data())
await self.run_in_thread_with_lock(flush) await self.run_in_thread_with_lock(flush)
async def _maybe_flush(self):
# If caught up, flush everything as client queries are
# performed on the DB.
if self._caught_up_event.is_set():
await self.flush(True)
elif time.perf_counter() > self.next_cache_check:
await self.flush(True)
self.next_cache_check = time.perf_counter() + 30
def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int, def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int,
spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]): spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]):
try: try:
@ -1494,14 +1483,12 @@ class BlockProcessor:
# Flush everything but with first_sync->False state. # Flush everything but with first_sync->False state.
first_sync = self.db.first_sync first_sync = self.db.first_sync
self.db.first_sync = False self.db.first_sync = False
await self.flush(True) await self.flush()
if first_sync: if first_sync:
self.logger.info(f'{lbry.__version__} synced to ' self.logger.info(f'{lbry.__version__} synced to '
f'height {self.height:,d}, halting here.') f'height {self.height:,d}, halting here.')
self.shutdown_event.set() self.shutdown_event.set()
# --- External API
async def fetch_and_process_blocks(self, caught_up_event): async def fetch_and_process_blocks(self, caught_up_event):
"""Fetch, process and index blocks from the daemon. """Fetch, process and index blocks from the daemon.
@ -1536,69 +1523,6 @@ class BlockProcessor:
finally: finally:
self.status_server.stop() self.status_server.stop()
# Shut down block processing # Shut down block processing
self.logger.info('flushing to DB for a clean shutdown...') self.logger.info('closing the DB for a clean shutdown...')
await self.flush(True)
self.db.close() self.db.close()
self.executor.shutdown(wait=True) self.executor.shutdown(wait=True)
def force_chain_reorg(self, count):
"""Force a reorg of the given number of blocks.
Returns True if a reorg is queued, false if not caught up.
"""
if self._caught_up_event.is_set():
self.reorg_count = count
self.blocks_event.set()
return True
return False
class Timer:
def __init__(self, name):
self.name = name
self.total = 0
self.count = 0
self.sub_timers = {}
self._last_start = None
def add_timer(self, name):
if name not in self.sub_timers:
self.sub_timers[name] = Timer(name)
return self.sub_timers[name]
def run(self, func, *args, forward_timer=False, timer_name=None, **kwargs):
t = self.add_timer(timer_name or func.__name__)
t.start()
try:
if forward_timer:
return func(*args, **kwargs, timer=t)
else:
return func(*args, **kwargs)
finally:
t.stop()
def start(self):
self._last_start = time.time()
return self
def stop(self):
self.total += (time.time() - self._last_start)
self.count += 1
self._last_start = None
return self
def show(self, depth=0, height=None):
if depth == 0:
print('='*100)
if height is not None:
print(f'STATISTICS AT HEIGHT {height}')
print('='*100)
else:
print(
f"{' '*depth} {self.total/60:4.2f}mins {self.name}"
# f"{self.total/self.count:.5f}sec/call, "
)
for sub_timer in self.sub_timers.values():
sub_timer.show(depth+1)
if depth == 0:
print('='*100)

View file

@ -995,7 +995,6 @@ class LevelDB:
batch_delete(op.key) batch_delete(op.key)
flush_data.undo.clear() flush_data.undo.clear()
flush_data.db_op_stack.clear()
while self.fs_height > flush_data.height: while self.fs_height > flush_data.height:
self.fs_height -= 1 self.fs_height -= 1

View file

@ -542,15 +542,15 @@ class SessionManager:
"""Return statistics about connected sessions.""" """Return statistics about connected sessions."""
return self._session_data(for_log=False) return self._session_data(for_log=False)
async def rpc_reorg(self, count): # async def rpc_reorg(self, count):
"""Force a reorg of the given number of blocks. # """Force a reorg of the given number of blocks.
#
count: number of blocks to reorg # count: number of blocks to reorg
""" # """
count = non_negative_integer(count) # count = non_negative_integer(count)
if not self.bp.force_chain_reorg(count): # if not self.bp.force_chain_reorg(count):
raise RPCError(BAD_REQUEST, 'still catching up with daemon') # raise RPCError(BAD_REQUEST, 'still catching up with daemon')
return f'scheduled a reorg of {count:,d} blocks' # return f'scheduled a reorg of {count:,d} blocks'
# --- External Interface # --- External Interface