forked from LBRYCommunity/lbry-sdk
add vacuuming for tx table
This commit is contained in:
parent
a4680878c4
commit
ca10874006
2 changed files with 24 additions and 12 deletions
|
@ -64,23 +64,27 @@ def sync_block_file(
|
||||||
def sync_spends(initial_sync: bool, p: ProgressContext):
|
def sync_spends(initial_sync: bool, p: ProgressContext):
|
||||||
if initial_sync:
|
if initial_sync:
|
||||||
p.start(
|
p.start(
|
||||||
6 +
|
7 +
|
||||||
len(pg_add_tx_constraints_and_indexes) +
|
len(pg_add_tx_constraints_and_indexes) +
|
||||||
len(pg_add_txi_constraints_and_indexes) +
|
len(pg_add_txi_constraints_and_indexes) +
|
||||||
len(pg_add_txo_constraints_and_indexes)
|
len(pg_add_txo_constraints_and_indexes)
|
||||||
)
|
)
|
||||||
|
# 1. tx table stuff
|
||||||
|
if p.ctx.is_postgres:
|
||||||
|
p.ctx.execute_notx(text("VACUUM ANALYZE tx;"))
|
||||||
|
p.step()
|
||||||
for constraint in pg_add_tx_constraints_and_indexes:
|
for constraint in pg_add_tx_constraints_and_indexes:
|
||||||
if p.ctx.is_postgres:
|
if p.ctx.is_postgres:
|
||||||
p.ctx.execute(text(constraint))
|
p.ctx.execute(text(constraint))
|
||||||
p.step()
|
p.step()
|
||||||
# A. Update TXIs to have the address of TXO they are spending.
|
# A. Update TXIs to have the address of TXO they are spending.
|
||||||
# 1. txi table reshuffling
|
# 2. txi table reshuffling
|
||||||
p.ctx.execute(text("ALTER TABLE txi RENAME TO old_txi;"))
|
p.ctx.execute(text("ALTER TABLE txi RENAME TO old_txi;"))
|
||||||
p.ctx.execute(CreateTable(TXI, include_foreign_key_constraints=[]))
|
p.ctx.execute(CreateTable(TXI, include_foreign_key_constraints=[]))
|
||||||
if p.ctx.is_postgres:
|
if p.ctx.is_postgres:
|
||||||
p.ctx.execute(text("ALTER TABLE txi DROP CONSTRAINT txi_pkey;"))
|
p.ctx.execute(text("ALTER TABLE txi DROP CONSTRAINT txi_pkey;"))
|
||||||
p.step()
|
p.step()
|
||||||
# 2. insert
|
# 3. insert
|
||||||
old_txi = table("old_txi", *(c.copy() for c in TXI.columns)) # pylint: disable=not-an-iterable
|
old_txi = table("old_txi", *(c.copy() for c in TXI.columns)) # pylint: disable=not-an-iterable
|
||||||
columns = [c for c in old_txi.columns if c.name != "address"] + [TXO.c.address]
|
columns = [c for c in old_txi.columns if c.name != "address"] + [TXO.c.address]
|
||||||
join_txi_on_txo = old_txi.join(TXO, old_txi.c.txo_hash == TXO.c.txo_hash)
|
join_txi_on_txo = old_txi.join(TXO, old_txi.c.txo_hash == TXO.c.txo_hash)
|
||||||
|
@ -88,7 +92,7 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
|
||||||
insert_txis = TXI.insert().from_select(columns, select_txis)
|
insert_txis = TXI.insert().from_select(columns, select_txis)
|
||||||
p.ctx.execute(insert_txis)
|
p.ctx.execute(insert_txis)
|
||||||
p.step()
|
p.step()
|
||||||
# 3. drop old txi and vacuum
|
# 4. drop old txi and vacuum
|
||||||
p.ctx.execute(text("DROP TABLE old_txi;"))
|
p.ctx.execute(text("DROP TABLE old_txi;"))
|
||||||
if p.ctx.is_postgres:
|
if p.ctx.is_postgres:
|
||||||
p.ctx.execute_notx(text("VACUUM ANALYZE txi;"))
|
p.ctx.execute_notx(text("VACUUM ANALYZE txi;"))
|
||||||
|
@ -98,13 +102,13 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
|
||||||
p.ctx.execute(text(constraint))
|
p.ctx.execute(text(constraint))
|
||||||
p.step()
|
p.step()
|
||||||
# B. Update TXOs to have the height at which they were spent (if they were).
|
# B. Update TXOs to have the height at which they were spent (if they were).
|
||||||
# 4. txo table reshuffling
|
# 5. txo table reshuffling
|
||||||
p.ctx.execute(text("ALTER TABLE txo RENAME TO old_txo;"))
|
p.ctx.execute(text("ALTER TABLE txo RENAME TO old_txo;"))
|
||||||
p.ctx.execute(CreateTable(TXO, include_foreign_key_constraints=[]))
|
p.ctx.execute(CreateTable(TXO, include_foreign_key_constraints=[]))
|
||||||
if p.ctx.is_postgres:
|
if p.ctx.is_postgres:
|
||||||
p.ctx.execute(text("ALTER TABLE txo DROP CONSTRAINT txo_pkey;"))
|
p.ctx.execute(text("ALTER TABLE txo DROP CONSTRAINT txo_pkey;"))
|
||||||
p.step()
|
p.step()
|
||||||
# 5. insert
|
# 6. insert
|
||||||
old_txo = table("old_txo", *(c.copy() for c in TXO.columns)) # pylint: disable=not-an-iterable
|
old_txo = table("old_txo", *(c.copy() for c in TXO.columns)) # pylint: disable=not-an-iterable
|
||||||
columns = [c for c in old_txo.columns if c.name != "spent_height"]
|
columns = [c for c in old_txo.columns if c.name != "spent_height"]
|
||||||
insert_columns = columns + [TXO.c.spent_height]
|
insert_columns = columns + [TXO.c.spent_height]
|
||||||
|
@ -114,7 +118,7 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
|
||||||
insert_txos = TXO.insert().from_select(insert_columns, select_txos)
|
insert_txos = TXO.insert().from_select(insert_columns, select_txos)
|
||||||
p.ctx.execute(insert_txos)
|
p.ctx.execute(insert_txos)
|
||||||
p.step()
|
p.step()
|
||||||
# 6. drop old txo
|
# 7. drop old txo
|
||||||
p.ctx.execute(text("DROP TABLE old_txo;"))
|
p.ctx.execute(text("DROP TABLE old_txo;"))
|
||||||
if p.ctx.is_postgres:
|
if p.ctx.is_postgres:
|
||||||
p.ctx.execute_notx(text("VACUUM ANALYZE txo;"))
|
p.ctx.execute_notx(text("VACUUM ANALYZE txo;"))
|
||||||
|
@ -124,14 +128,22 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
|
||||||
p.ctx.execute(text(constraint))
|
p.ctx.execute(text(constraint))
|
||||||
p.step()
|
p.step()
|
||||||
else:
|
else:
|
||||||
p.start(3)
|
p.start(5)
|
||||||
# 1. Update spent TXOs setting spent_height
|
# 1. Update spent TXOs setting spent_height
|
||||||
update_spent_outputs(p.ctx)
|
update_spent_outputs(p.ctx)
|
||||||
p.step()
|
p.step()
|
||||||
# 2. Update TXIs to have the address of TXO they are spending.
|
# 2. Update TXIs to have the address of TXO they are spending.
|
||||||
set_input_addresses(p.ctx)
|
set_input_addresses(p.ctx)
|
||||||
p.step()
|
p.step()
|
||||||
# 3. Update visibility map, which speeds up index-only scans.
|
# 3. Update tx visibility map, which speeds up index-only scans.
|
||||||
|
if p.ctx.is_postgres:
|
||||||
|
p.ctx.execute_notx(text("VACUUM tx;"))
|
||||||
|
p.step()
|
||||||
|
# 4. Update txi visibility map, which speeds up index-only scans.
|
||||||
|
if p.ctx.is_postgres:
|
||||||
|
p.ctx.execute_notx(text("VACUUM txi;"))
|
||||||
|
p.step()
|
||||||
|
# 4. Update txo visibility map, which speeds up index-only scans.
|
||||||
if p.ctx.is_postgres:
|
if p.ctx.is_postgres:
|
||||||
p.ctx.execute_notx(text("VACUUM txo;"))
|
p.ctx.execute_notx(text("VACUUM txo;"))
|
||||||
p.step()
|
p.step()
|
||||||
|
|
|
@ -533,8 +533,8 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
|
||||||
)
|
)
|
||||||
self.assertConsumingEvents(
|
self.assertConsumingEvents(
|
||||||
events, "blockchain.sync.spends.main", ("steps",), [
|
events, "blockchain.sync.spends.main", ("steps",), [
|
||||||
(0, None, (14,), (1,), (2,), (3,), (4,), (5,), (6,),
|
(0, None, (15,), (1,), (2,), (3,), (4,), (5,), (6,), (7,),
|
||||||
(7,), (8,), (9,), (10,), (11,), (12,), (13,), (14,))
|
(8,), (9,), (10,), (11,), (12,), (13,), (14,), (15,))
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
|
@ -639,7 +639,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
|
||||||
)
|
)
|
||||||
self.assertConsumingEvents(
|
self.assertConsumingEvents(
|
||||||
events, "blockchain.sync.spends.main", ("steps",), [
|
events, "blockchain.sync.spends.main", ("steps",), [
|
||||||
(0, None, (3,), (1,), (2,), (3,))
|
(0, None, (5,), (1,), (2,), (3,), (4,), (5,))
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
self.assertConsumingEvents(
|
self.assertConsumingEvents(
|
||||||
|
|
Loading…
Reference in a new issue