This commit is contained in:
Jack Robison 2021-07-08 16:08:33 -04:00
parent 68474e4057
commit a1ddd762e0
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 23 additions and 100 deletions

View file

@ -224,9 +224,9 @@ class BlockProcessor:
#################################
# txo to pending claim
self.txo_to_claim: typing.Dict[Tuple[int, int], StagedClaimtrieItem] = {}
self.txo_to_claim: Dict[Tuple[int, int], StagedClaimtrieItem] = {}
# claim hash to pending claim txo
self.claim_hash_to_txo: typing.Dict[bytes, Tuple[int, int]] = {}
self.claim_hash_to_txo: Dict[bytes, Tuple[int, int]] = {}
# claim hash to lists of pending support txos
self.support_txos_by_claim: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list)
# support txo: (supported claim hash, support amount)
@ -270,7 +270,10 @@ class BlockProcessor:
to_send_es = set(self.touched_claims_to_send_es)
to_send_es.update(self.pending_reposted.difference(self.removed_claims_to_send_es))
to_send_es.update({k for k, v in self.pending_channel_counts.items() if v != 0}.difference(self.removed_claims_to_send_es))
to_send_es.update(
{k for k, v in self.pending_channel_counts.items() if v != 0}.difference(
self.removed_claims_to_send_es)
)
for claim_hash in self.removed_claims_to_send_es:
yield 'delete', claim_hash.hex()
@ -372,7 +375,6 @@ class BlockProcessor:
return await self.daemon.raw_blocks(hex_hashes)
try:
await self.flush(True)
start, last, hashes = await self.reorg_hashes(count)
# Reverse and convert to hex strings.
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
@ -399,15 +401,7 @@ class BlockProcessor:
The hashes are returned in order of increasing height. Start
is the height of the first hash, last of the last.
"""
start, count = await self.calc_reorg_range(count)
last = start + count - 1
s = '' if count == 1 else 's'
self.logger.info(f'chain was reorganised replacing {count:,d} '
f'block{s} at heights {start:,d}-{last:,d}')
return start, last, await self.db.fs_block_hashes(start, count)
async def calc_reorg_range(self, count: Optional[int]):
"""Calculate the reorg range"""
def diff_pos(hashes1, hashes2):
@ -436,8 +430,12 @@ class BlockProcessor:
count = (self.height - start) + 1
else:
start = (self.height - count) + 1
last = start + count - 1
s = '' if count == 1 else 's'
self.logger.info(f'chain was reorganised replacing {count:,d} '
f'block{s} at heights {start:,d}-{last:,d}')
return start, count
return start, last, await self.db.fs_block_hashes(start, count)
# - Flushing
def flush_data(self):
@ -447,20 +445,11 @@ class BlockProcessor:
self.block_txs, self.db_op_stack, self.undo_infos, self.utxo_cache,
self.db_deletes, self.tip, self.undo_claims)
async def flush(self, flush_utxos):
async def flush(self):
def flush():
self.db.flush_dbs(self.flush_data())
await self.run_in_thread_with_lock(flush)
async def _maybe_flush(self):
# If caught up, flush everything as client queries are
# performed on the DB.
if self._caught_up_event.is_set():
await self.flush(True)
elif time.perf_counter() > self.next_cache_check:
await self.flush(True)
self.next_cache_check = time.perf_counter() + 30
def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int,
spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]):
try:
@ -1494,14 +1483,12 @@ class BlockProcessor:
# Flush everything but with first_sync->False state.
first_sync = self.db.first_sync
self.db.first_sync = False
await self.flush(True)
await self.flush()
if first_sync:
self.logger.info(f'{lbry.__version__} synced to '
f'height {self.height:,d}, halting here.')
self.shutdown_event.set()
# --- External API
async def fetch_and_process_blocks(self, caught_up_event):
"""Fetch, process and index blocks from the daemon.
@ -1536,69 +1523,6 @@ class BlockProcessor:
finally:
self.status_server.stop()
# Shut down block processing
self.logger.info('flushing to DB for a clean shutdown...')
await self.flush(True)
self.logger.info('closing the DB for a clean shutdown...')
self.db.close()
self.executor.shutdown(wait=True)
def force_chain_reorg(self, count):
"""Force a reorg of the given number of blocks.
Returns True if a reorg is queued, false if not caught up.
"""
if self._caught_up_event.is_set():
self.reorg_count = count
self.blocks_event.set()
return True
return False
class Timer:
def __init__(self, name):
self.name = name
self.total = 0
self.count = 0
self.sub_timers = {}
self._last_start = None
def add_timer(self, name):
if name not in self.sub_timers:
self.sub_timers[name] = Timer(name)
return self.sub_timers[name]
def run(self, func, *args, forward_timer=False, timer_name=None, **kwargs):
t = self.add_timer(timer_name or func.__name__)
t.start()
try:
if forward_timer:
return func(*args, **kwargs, timer=t)
else:
return func(*args, **kwargs)
finally:
t.stop()
def start(self):
self._last_start = time.time()
return self
def stop(self):
self.total += (time.time() - self._last_start)
self.count += 1
self._last_start = None
return self
def show(self, depth=0, height=None):
if depth == 0:
print('='*100)
if height is not None:
print(f'STATISTICS AT HEIGHT {height}')
print('='*100)
else:
print(
f"{' '*depth} {self.total/60:4.2f}mins {self.name}"
# f"{self.total/self.count:.5f}sec/call, "
)
for sub_timer in self.sub_timers.values():
sub_timer.show(depth+1)
if depth == 0:
print('='*100)

View file

@ -995,7 +995,6 @@ class LevelDB:
batch_delete(op.key)
flush_data.undo.clear()
flush_data.db_op_stack.clear()
while self.fs_height > flush_data.height:
self.fs_height -= 1

View file

@ -542,15 +542,15 @@ class SessionManager:
"""Return statistics about connected sessions."""
return self._session_data(for_log=False)
async def rpc_reorg(self, count):
"""Force a reorg of the given number of blocks.
count: number of blocks to reorg
"""
count = non_negative_integer(count)
if not self.bp.force_chain_reorg(count):
raise RPCError(BAD_REQUEST, 'still catching up with daemon')
return f'scheduled a reorg of {count:,d} blocks'
# async def rpc_reorg(self, count):
# """Force a reorg of the given number of blocks.
#
# count: number of blocks to reorg
# """
# count = non_negative_integer(count)
# if not self.bp.force_chain_reorg(count):
# raise RPCError(BAD_REQUEST, 'still catching up with daemon')
# return f'scheduled a reorg of {count:,d} blocks'
# --- External Interface