diff --git a/lbry/blockchain/lbrycrd.py b/lbry/blockchain/lbrycrd.py index 8f2549e70..92905936b 100644 --- a/lbry/blockchain/lbrycrd.py +++ b/lbry/blockchain/lbrycrd.py @@ -249,7 +249,6 @@ class Lbrycrd: "params": params or [] } async with self.session.post(self.rpc_url, json=message) as resp: - print(await resp.text()) if resp.status == 401: raise LbrycrdUnauthorizedError() try: diff --git a/lbry/blockchain/sync/claims.py b/lbry/blockchain/sync/claims.py index 068ed583d..0594989fd 100644 --- a/lbry/blockchain/sync/claims.py +++ b/lbry/blockchain/sync/claims.py @@ -105,6 +105,7 @@ def row_to_claim_for_saving(row) -> Tuple[Output, dict]: def claims_insert( blocks: Tuple[int, int], missing_in_claims_table: bool, + flush_size: int, p: ProgressContext ): chain = get_or_initialize_lbrycrd(p.ctx) @@ -139,7 +140,7 @@ def claims_insert( 'takeover_height': metadata.get('takeover_height'), }) loader.add_claim(txo, **extra) - if len(loader.claims) >= 25_000: + if len(loader.claims) >= flush_size: p.add(loader.flush(Claim)) p.add(loader.flush(Claim)) diff --git a/lbry/blockchain/sync/supports.py b/lbry/blockchain/sync/supports.py index de7d014aa..48da159d5 100644 --- a/lbry/blockchain/sync/supports.py +++ b/lbry/blockchain/sync/supports.py @@ -21,7 +21,12 @@ log = logging.getLogger(__name__) @event_emitter("blockchain.sync.supports.insert", "supports") -def supports_insert(blocks: Tuple[int, int], missing_in_supports_table: bool, p: ProgressContext): +def supports_insert( + blocks: Tuple[int, int], + missing_in_supports_table: bool, + flush_size: int, + p: ProgressContext +): p.start( count_unspent_txos( TXO_TYPES['support'], blocks, @@ -58,7 +63,7 @@ def supports_insert(blocks: Tuple[int, int], missing_in_supports_table: bool, p: signature_digest=row.signature_digest, channel_public_key=row.channel_public_key ) - if len(loader.supports) >= 25_000: + if len(loader.supports) >= flush_size: p.add(loader.flush(Support)) p.add(loader.flush(Support)) diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index 64d169a74..0da2d035a 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -30,13 +30,10 @@ SUPPORTS_MAIN_EVENT = Event.add("blockchain.sync.supports.main", "supports") class BlockchainSync(Sync): - TX_FLUSH_SIZE = 50_000 # flush to db after processing this many TXs and update progress - FILTER_CHUNK_SIZE = 100_000 # split filter generation tasks into this size block chunks + TX_FLUSH_SIZE = 25_000 # flush to db after processing this many TXs and update progress + CLAIM_FLUSH_SIZE = 25_000 # flush to db after processing this many claims and update progress + SUPPORT_FLUSH_SIZE = 25_000 # flush to db after processing this many supports and update progress FILTER_FLUSH_SIZE = 10_000 # flush to db after processing this many filters and update progress - CLAIM_CHUNK_SIZE = 50_000 # split claim sync tasks into this size block chunks - CLAIM_FLUSH_SIZE = 10_000 # flush to db after processing this many claims and update progress - SUPPORT_CHUNK_SIZE = 50_000 # split support sync tasks into this size block chunks - SUPPORT_FLUSH_SIZE = 10_000 # flush to db after processing this many supports and update progress def __init__(self, chain: Lbrycrd, db: Database): super().__init__(chain.ledger, db) @@ -133,7 +130,7 @@ class BlockchainSync(Sync): blocks = 0 tasks = [] # for chunk in range(select min(height), max(height) from block where filter is null): - # tasks.append(self.db.run(block_phase.sync_filters, chunk)) + # tasks.append(self.db.run(block_phase.sync_filters, chunk, self.FILTER_FLUSH_SIZE)) p.start(blocks) await self.run_tasks(tasks) @@ -169,6 +166,7 @@ class BlockchainSync(Sync): missing_in_supports_table, missing_in_claims_table, missing_or_stale_in_claims_table, + self.db.workers ) async def count_abandoned_supports(self) -> int: @@ -220,7 +218,8 @@ class BlockchainSync(Sync): p.start(total) if batches: await self.run_tasks([ - self.db.run(claim_phase.claims_insert, batch, not initial_sync) for batch in batches + self.db.run(claim_phase.claims_insert, batch, not initial_sync, self.CLAIM_FLUSH_SIZE) + for batch in batches ]) if not initial_sync: await self.run_tasks([ @@ -263,7 +262,7 @@ class BlockchainSync(Sync): if support_batches: await self.run_tasks([ self.db.run( - support_phase.supports_insert, batch, not initial_sync + support_phase.supports_insert, batch, not initial_sync, self.SUPPORT_FLUSH_SIZE ) for batch in support_batches ]) if delete_supports: diff --git a/lbry/db/queries/txio.py b/lbry/db/queries/txio.py index b15c1edae..727c66f00 100644 --- a/lbry/db/queries/txio.py +++ b/lbry/db/queries/txio.py @@ -110,9 +110,10 @@ def distribute_unspent_txos( missing_in_supports_table: bool = False, missing_in_claims_table: bool = False, missing_or_stale_in_claims_table: bool = False, + number_of_buckets: int = 10 ) -> Tuple[int, List[Tuple[int, int]]]: chunks = ( - select(func.ntile(10).over(order_by=TXO.c.height).label('chunk'), TXO.c.height) + select(func.ntile(number_of_buckets).over(order_by=TXO.c.height).label('chunk'), TXO.c.height) .where( where_unspent_txos( txo_types, blocks, diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 99b00482d..6e9dc8073 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -496,6 +496,8 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): events = [] self.sync.on_progress.listen(events.append) + self.db.workers = 10 # sets how many claim/update workers there will be + # initial sync await self.sync.advance() await asyncio.sleep(1) # give it time to collect events