diff --git a/lbry/blockchain/sync/blocks.py b/lbry/blockchain/sync/blocks.py index 41c986cb8..464d54cb7 100644 --- a/lbry/blockchain/sync/blocks.py +++ b/lbry/blockchain/sync/blocks.py @@ -128,13 +128,19 @@ def sync_spends(initial_sync: bool, p: ProgressContext): p.ctx.execute(text(constraint)) p.step() else: - p.start(2) + p.start(3) # 1. Update spent TXOs setting spent_height update_spent_outputs(p.ctx) p.step() # 2. Update TXIs to have the address of TXO they are spending. set_input_addresses(p.ctx) p.step() + # 3. Update visibility map, which speeds up index-only scans. + if p.ctx.is_postgres: + with p.ctx.engine.connect() as c: + c.execute(text("COMMIT;")) + c.execute(text("VACUUM txo;")) + p.step() @event_emitter("blockchain.sync.filter.generate", "blocks") diff --git a/lbry/blockchain/sync/claims.py b/lbry/blockchain/sync/claims.py index a7b256e07..1778cb88d 100644 --- a/lbry/blockchain/sync/claims.py +++ b/lbry/blockchain/sync/claims.py @@ -159,6 +159,19 @@ def claims_constraints_and_indexes(p: ProgressContext): p.step() +@event_emitter("blockchain.sync.claims.vacuum", "steps") +def claims_vacuum(p: ProgressContext): + p.start(2) + with p.ctx.engine.connect() as c: + if p.ctx.is_postgres: + c.execute(text("COMMIT;")) + c.execute(text("VACUUM claim;")) + p.step() + if p.ctx.is_postgres: + c.execute(text("VACUUM tag;")) + p.step() + + @event_emitter("blockchain.sync.claims.update", "claims") def claims_update(blocks: Tuple[int, int], p: ProgressContext): p.start( diff --git a/lbry/blockchain/sync/supports.py b/lbry/blockchain/sync/supports.py index 670b13b7e..06304d202 100644 --- a/lbry/blockchain/sync/supports.py +++ b/lbry/blockchain/sync/supports.py @@ -63,6 +63,13 @@ def supports_insert(blocks: Tuple[int, int], missing_in_supports_table: bool, p: p.add(loader.flush(Support)) +@event_emitter("blockchain.sync.supports.delete", "supports") +def supports_delete(supports, p: ProgressContext): + p.start(supports, label="del supprt") + deleted = p.ctx.execute(Support.delete().where(where_abandoned_supports())) + p.step(deleted.rowcount) + + @event_emitter("blockchain.sync.supports.indexes", "steps") def supports_constraints_and_indexes(p: ProgressContext): p.start(1 + len(pg_add_support_constraints_and_indexes)) @@ -77,8 +84,11 @@ def supports_constraints_and_indexes(p: ProgressContext): p.step() -@event_emitter("blockchain.sync.supports.delete", "supports") -def supports_delete(supports, p: ProgressContext): - p.start(supports, label="del supprt") - deleted = p.ctx.execute(Support.delete().where(where_abandoned_supports())) - p.step(deleted.rowcount) +@event_emitter("blockchain.sync.supports.vacuum", "steps") +def supports_vacuum(p: ProgressContext): + p.start(1) + with p.ctx.engine.connect() as c: + if p.ctx.is_postgres: + c.execute(text("COMMIT;")) + c.execute(text("VACUUM support;")) + p.step() diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index e348006d4..d628116ce 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -233,6 +233,8 @@ class BlockchainSync(Sync): await self.db.run(claim_phase.update_stakes, blocks, claims_with_changed_supports) if initial_sync: await self.db.run(claim_phase.claims_constraints_and_indexes) + else: + await self.db.run(claim_phase.claims_vacuum) return initial_sync async def sync_supports(self, blocks): @@ -267,6 +269,8 @@ class BlockchainSync(Sync): await self.db.run(support_phase.supports_delete, delete_supports) if initial_sync: await self.db.run(support_phase.supports_constraints_and_indexes) + else: + await self.db.run(support_phase.supports_vacuum) async def sync_channel_stats(self, blocks, initial_sync): await self.db.run(claim_phase.update_channel_stats, blocks, initial_sync) diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index 777ca3bbe..c94fca8ff 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -645,7 +645,6 @@ class BulkLoader: def update_claim(self, txo: Output, **extra): d, tags = self.claim_to_rows(txo, **extra) d['pk'] = txo.claim_hash - d['set_canonical_url'] = d['is_signature_valid'] self.update_claims.append(d) self.delete_tags.append({'pk': txo.claim_hash}) self.tags.extend(tags) diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 52abfcde9..903f5a157 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -639,7 +639,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): ) self.assertConsumingEvents( events, "blockchain.sync.spends.main", ("steps",), [ - (0, None, (2,), (1,), (2,)) + (0, None, (3,), (1,), (2,), (3,)) ] ) self.assertConsumingEvents( @@ -668,6 +668,11 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): (0, None, (1,), (1,)), ] ) + self.assertConsumingEvents( + events, "blockchain.sync.claims.vacuum", ("steps",), [ + (0, None, (2,), (1,), (2,)) + ] + ) self.assertEqual( events.pop(0), { "event": "blockchain.sync.claims.main", @@ -690,6 +695,11 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): (353, "add supprt 353- 354", (1,), (1,)), ] ) + self.assertConsumingEvents( + events, "blockchain.sync.supports.vacuum", ("steps",), [ + (0, None, (1,), (1,)) + ] + ) self.assertEqual( events.pop(0), { "event": "blockchain.sync.supports.main",