lbry-sdk/lbry/blockchain/sync/blocks.py

import logging
from binascii import hexlify, unhexlify
from typing import Tuple, List

from sqlalchemy import table, text, func, union, between
from sqlalchemy.future import select
from sqlalchemy.schema import CreateTable

from lbry.db.tables import (
    Block as BlockTable, BlockFilter, BlockGroupFilter,
    TX, TXFilter, MempoolFilter, TXO, TXI, Claim, Tag, Support
)
from lbry.db.tables import (
    pg_add_block_constraints_and_indexes,
    pg_add_block_filter_constraints_and_indexes,
    pg_add_tx_constraints_and_indexes,
    pg_add_tx_filter_constraints_and_indexes,
    pg_add_txo_constraints_and_indexes,
    pg_add_txi_constraints_and_indexes,
)
from lbry.db.query_context import ProgressContext, event_emitter, context
from lbry.db.sync import set_input_addresses, update_spent_outputs
from lbry.blockchain.transaction import Transaction
from lbry.blockchain.block import Block, create_address_filter
from lbry.blockchain.bcd_data_stream import BCDataStream
from .context import get_or_initialize_lbrycrd
from .filter_builder import FilterBuilder

log = logging.getLogger(__name__)
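

# get_best_block_height_for_file() returns the highest block height already
# stored for the given block file, or -1 if no blocks from that file have
# been synced yet.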
def get_best_block_height_for_file(file_number):
    return context().fetchone(
        select(func.coalesce(func.max(BlockTable.c.height), -1).label('height'))
        .where(BlockTable.c.file_number == file_number)
    )['height']
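

# sync_block_file() reads raw blocks directly from one of lbrycrd's block
# files, parses them with BCDataStream, and bulk-loads blocks and transactions
# into the database, flushing whenever `flush_size` transactions accumulate
# and reporting per-file progress. It returns the height of the last block
# processed, or -1 if the file contained nothing new.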
@event_emitter("blockchain.sync.blocks.file", "blocks", "txs", throttle=100)
def sync_block_file(
    file_number: int, start_height: int, txs: int, flush_size: int, p: ProgressContext
):
    chain = get_or_initialize_lbrycrd(p.ctx)
    new_blocks = chain.db.sync_get_blocks_in_file(file_number, start_height)
    if not new_blocks:
        return -1
    file_name = chain.get_block_file_name(file_number)
    p.start(len(new_blocks), txs, progress_id=file_number, label=file_name)
    block_file_path = chain.get_block_file_path(file_number)
    done_blocks = done_txs = 0
    last_block_processed, loader = -1, p.ctx.get_bulk_loader()
    with open(block_file_path, "rb") as fp:
        stream = BCDataStream(fp=fp)
        for done_blocks, block_info in enumerate(new_blocks, start=1):
            block_height = block_info["height"]
            fp.seek(block_info["data_offset"])
            block = Block.from_data_stream(stream, block_height, file_number)
            loader.add_block(block)
            if len(loader.txs) >= flush_size:
                done_txs += loader.flush(TX)
            p.step(done_blocks, done_txs)
            last_block_processed = block_height
            if p.ctx.stop_event.is_set():
                return last_block_processed
    if loader.txs:
        done_txs += loader.flush(TX)
    p.step(done_blocks, done_txs)
    return last_block_processed
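

# The two helpers below only execute real work on PostgreSQL: they add the
# block table's constraints/indexes and run VACUUM (ANALYZE) after the
# initial bulk load; on other engines they just advance the progress steps.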
@event_emitter("blockchain.sync.blocks.indexes", "steps")
def blocks_constraints_and_indexes(p: ProgressContext):
    p.start(1 + len(pg_add_block_constraints_and_indexes))
    if p.ctx.is_postgres:
        p.ctx.execute_notx(text("VACUUM ANALYZE block;"))
    p.step()
    for constraint in pg_add_block_constraints_and_indexes:
        if p.ctx.is_postgres:
            p.ctx.execute(text(constraint))
        p.step()


@event_emitter("blockchain.sync.blocks.vacuum", "steps")
def blocks_vacuum(p: ProgressContext):
    p.start(1)
    if p.ctx.is_postgres:
        p.ctx.execute_notx(text("VACUUM block;"))
    p.step()
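

# sync_spends() links inputs to the outputs they spend. On the initial sync it
# rebuilds the txi/txo tables wholesale: each table is renamed to old_*,
# recreated without constraints, repopulated via INSERT ... SELECT with the
# derived column (txi.address, txo.spent_height), then the old table is
# dropped and the new one vacuumed and re-indexed. On incremental syncs it
# runs targeted UPDATEs instead, followed by VACUUM so index-only scans stay
# fast.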
@event_emitter("blockchain.sync.spends.main", "steps")
def sync_spends(initial_sync: bool, p: ProgressContext):
    if initial_sync:
        p.start(
            7 +
            len(pg_add_tx_constraints_and_indexes) +
            len(pg_add_txi_constraints_and_indexes) +
            len(pg_add_txo_constraints_and_indexes)
        )
        # 1. tx table: refresh statistics and add constraints/indexes
        if p.ctx.is_postgres:
            p.ctx.execute_notx(text("VACUUM ANALYZE tx;"))
        p.step()
        for constraint in pg_add_tx_constraints_and_indexes:
            if p.ctx.is_postgres:
                p.ctx.execute(text(constraint))
            p.step()
        # A. Update TXIs to have the address of the TXO they are spending.
        # 2. txi table reshuffling
        p.ctx.execute(text("ALTER TABLE txi RENAME TO old_txi;"))
        p.ctx.execute(CreateTable(TXI, include_foreign_key_constraints=[]))
        if p.ctx.is_postgres:
            p.ctx.execute(text("ALTER TABLE txi DROP CONSTRAINT txi_pkey;"))
        p.step()
        # 3. insert
        old_txi = table("old_txi", *(c.copy() for c in TXI.columns))  # pylint: disable=not-an-iterable
        columns = [c for c in old_txi.columns if c.name != "address"] + [TXO.c.address]
        join_txi_on_txo = old_txi.join(TXO, old_txi.c.txo_hash == TXO.c.txo_hash)
        select_txis = select(*columns).select_from(join_txi_on_txo)
        insert_txis = TXI.insert().from_select(columns, select_txis)
        p.ctx.execute(insert_txis)
        p.step()
        # 4. drop old txi and vacuum
        p.ctx.execute(text("DROP TABLE old_txi;"))
        if p.ctx.is_postgres:
            p.ctx.execute_notx(text("VACUUM ANALYZE txi;"))
        p.step()
        for constraint in pg_add_txi_constraints_and_indexes:
            if p.ctx.is_postgres:
                p.ctx.execute(text(constraint))
            p.step()
        # B. Update TXOs to have the height at which they were spent (if they were).
        # 5. txo table reshuffling
        p.ctx.execute(text("ALTER TABLE txo RENAME TO old_txo;"))
        p.ctx.execute(CreateTable(TXO, include_foreign_key_constraints=[]))
        if p.ctx.is_postgres:
            p.ctx.execute(text("ALTER TABLE txo DROP CONSTRAINT txo_pkey;"))
        p.step()
        # 6. insert
        old_txo = table("old_txo", *(c.copy() for c in TXO.columns))  # pylint: disable=not-an-iterable
        columns = [c for c in old_txo.columns if c.name != "spent_height"]
        insert_columns = columns + [TXO.c.spent_height]
        select_columns = columns + [func.coalesce(TXI.c.height, 0).label("spent_height")]
        join_txo_on_txi = old_txo.join(TXI, old_txo.c.txo_hash == TXI.c.txo_hash, isouter=True)
        select_txos = select(*select_columns).select_from(join_txo_on_txi)
        insert_txos = TXO.insert().from_select(insert_columns, select_txos)
        p.ctx.execute(insert_txos)
        p.step()
        # 7. drop old txo and vacuum
        p.ctx.execute(text("DROP TABLE old_txo;"))
        if p.ctx.is_postgres:
            p.ctx.execute_notx(text("VACUUM ANALYZE txo;"))
        p.step()
        for constraint in pg_add_txo_constraints_and_indexes:
            if p.ctx.is_postgres:
                p.ctx.execute(text(constraint))
            p.step()
    else:
        p.start(5)
        # 1. Update spent TXOs, setting spent_height.
        update_spent_outputs(p.ctx)
        p.step()
        # 2. Update TXIs to have the address of the TXO they are spending.
        set_input_addresses(p.ctx)
        p.step()
        # 3. Update tx visibility map, which speeds up index-only scans.
        if p.ctx.is_postgres:
            p.ctx.execute_notx(text("VACUUM tx;"))
        p.step()
        # 4. Update txi visibility map, which speeds up index-only scans.
        if p.ctx.is_postgres:
            p.ctx.execute_notx(text("VACUUM txi;"))
        p.step()
        # 5. Update txo visibility map, which speeds up index-only scans.
        if p.ctx.is_postgres:
            p.ctx.execute_notx(text("VACUUM txo;"))
        p.step()


@event_emitter("blockchain.sync.mempool.clear", "txs")
def clear_mempool(p: ProgressContext):
    delete_all_the_things(-1, p)
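

# sync_mempool() pulls the current mempool from lbrycrd, loads any transaction
# not already stored at height -1 into the database, and returns the list of
# txids that were newly added (or None if a stop was requested mid-run).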
@event_emitter("blockchain.sync.mempool.main", "txs")
def sync_mempool(p: ProgressContext) -> List[str]:
    chain = get_or_initialize_lbrycrd(p.ctx)
    mempool = chain.sync_run(chain.get_raw_mempool())
    current = [hexlify(r['tx_hash'][::-1]).decode() for r in p.ctx.fetchall(
        select(TX.c.tx_hash).where(TX.c.height < 0)
    )]
    loader = p.ctx.get_bulk_loader()
    added = []
    for txid in mempool:
        if txid not in current:
            raw_tx = chain.sync_run(chain.get_raw_transaction(txid))
            loader.add_transaction(
                None, Transaction(unhexlify(raw_tx), height=-1)
            )
            added.append(txid)
        if p.ctx.stop_event.is_set():
            return
    loader.flush(TX)
    return added
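

# sync_filters() generates address filters for the given block range: it
# streams every (tx_hash, height, address) row in the range, groups the
# addresses per transaction with FilterBuilder, and then bulk-loads the
# resulting per-transaction, per-block and grouped block filters.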
@event_emitter("blockchain.sync.filters.generate", "blocks", throttle=100)
def sync_filters(start, end, p: ProgressContext):
    fp = FilterBuilder(start, end)
    p.start((end-start)+1, progress_id=start, label=f"generate filters {start}-{end}")
    with p.ctx.connect_streaming() as c:
        loader = p.ctx.get_bulk_loader()

        tx_hash, height, addresses, last_added = None, None, set(), None
        address_to_hash = p.ctx.ledger.address_to_hash160
        for row in c.execute(get_block_tx_addresses_sql(*fp.query_heights)):
            if tx_hash != row.tx_hash:
                if tx_hash is not None:
                    last_added = tx_hash
                    fp.add(tx_hash, height, addresses)
                tx_hash, height, addresses = row.tx_hash, row.height, set()
            addresses.add(address_to_hash(row.address))
        if all([last_added, tx_hash]) and last_added != tx_hash:  # pick up the last tx
            fp.add(tx_hash, height, addresses)

        for tx_hash, height, addresses in fp.tx_filters:
            loader.add_transaction_filter(
                tx_hash, height, create_address_filter(list(addresses))
            )

        for height, addresses in fp.block_filters.items():
            loader.add_block_filter(
                height, create_address_filter(list(addresses))
            )

        for group_filter in fp.group_filters:
            for height, addresses in group_filter.groups.items():
                loader.add_group_filter(
                    height, group_filter.factor, create_address_filter(list(addresses))
                )

        p.add(loader.flush(BlockFilter))
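

# As with the block helpers above, the two functions below only do real work
# on PostgreSQL: one VACUUM (ANALYZE)s the block_filter and tx_filter tables
# and adds their constraints and indexes, the other just vacuums both tables.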
@event_emitter("blockchain.sync.filters.indexes", "steps")
def filters_constraints_and_indexes(p: ProgressContext):
    constraints = (
        pg_add_tx_filter_constraints_and_indexes +
        pg_add_block_filter_constraints_and_indexes
    )
    p.start(2 + len(constraints))
    if p.ctx.is_postgres:
        p.ctx.execute_notx(text("VACUUM ANALYZE block_filter;"))
    p.step()
    if p.ctx.is_postgres:
        p.ctx.execute_notx(text("VACUUM ANALYZE tx_filter;"))
    p.step()
    for constraint in constraints:
        if p.ctx.is_postgres:
            p.ctx.execute(text(constraint))
        p.step()


@event_emitter("blockchain.sync.filters.vacuum", "steps")
def filters_vacuum(p: ProgressContext):
    p.start(2)
    if p.ctx.is_postgres:
        p.ctx.execute_notx(text("VACUUM block_filter;"))
    p.step()
    if p.ctx.is_postgres:
        p.ctx.execute_notx(text("VACUUM tx_filter;"))
    p.step()
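

# get_block_range_without_filters() returns the lowest and highest block
# heights that do not yet have a corresponding block filter, or (-1, -1) when
# no blocks are missing filters.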
def get_block_range_without_filters() -> Tuple[int, int]:
    sql = (
        select(
            func.coalesce(func.min(BlockTable.c.height), -1).label('start_height'),
            func.coalesce(func.max(BlockTable.c.height), -1).label('end_height'),
        )
        .select_from(
            BlockTable.join(BlockFilter, BlockTable.c.height == BlockFilter.c.height, isouter=True)
        )
        .where(BlockFilter.c.height.is_(None))
    )
    result = context().fetchone(sql)
    return result['start_height'], result['end_height']
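

# get_block_tx_addresses_sql() builds the UNION query consumed by
# sync_filters(): every (tx_hash, height, address) pair touched by outputs or
# inputs within the given height range, ordered by height and tx_hash so rows
# for the same transaction arrive together.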
def get_block_tx_addresses_sql(start_height, end_height):
    return union(
        select(TXO.c.tx_hash, TXO.c.height, TXO.c.address).where(
            (TXO.c.address.isnot(None)) & between(TXO.c.height, start_height, end_height)
        ),
        select(TXI.c.tx_hash, TXI.c.height, TXI.c.address).where(
            (TXI.c.address.isnot(None)) & between(TXI.c.height, start_height, end_height)
        ),
    ).order_by('height', 'tx_hash')
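

# rewind() and clear_mempool() both funnel into delete_all_the_things(): with
# height == -1 only mempool rows (height -1) are removed, otherwise everything
# at or above the given height is deleted so that range can be re-synced.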
@event_emitter("blockchain.sync.rewind.main", "steps")
def rewind(height: int, p: ProgressContext):
    delete_all_the_things(height, p)


def delete_all_the_things(height: int, p: ProgressContext):
    def constrain(col):
        if height == -1:
            return col == -1
        return col >= height

    deletes = [
        BlockTable.delete().where(constrain(BlockTable.c.height)),
        TXI.delete().where(constrain(TXI.c.height)),
        TXO.delete().where(constrain(TXO.c.height)),
        TX.delete().where(constrain(TX.c.height)),
        Tag.delete().where(
            Tag.c.claim_hash.in_(
                select(Claim.c.claim_hash).where(constrain(Claim.c.height))
            )
        ),
        Claim.delete().where(constrain(Claim.c.height)),
        Support.delete().where(constrain(Support.c.height)),
        MempoolFilter.delete(),
    ]
    if height > 0:
        deletes.extend([
            BlockFilter.delete().where(BlockFilter.c.height >= height),
            # TODO: group and tx filters need where() clauses (below actually breaks things)
            BlockGroupFilter.delete(),
            TXFilter.delete(),
        ])
    for delete in p.iter(deletes):
        p.ctx.execute(delete)