lbry-sdk/lbry/blockchain/sync.py
2020-06-10 15:25:45 -04:00

258 lines
9.7 KiB
Python

import os
import asyncio
import logging
from contextvars import ContextVar
from typing import Optional
from sqlalchemy import func, bindparam
from sqlalchemy.future import select
from lbry.event import BroadcastSubscription
from lbry.service.base import Sync, BlockEvent
from lbry.db import Database, queries, TXO_TYPES
from lbry.db.tables import Claim, Claimtrie, TXO, TXI, Block as BlockTable
from lbry.db.query_context import progress, context, Event
from lbry.db.utils import chunk
from .lbrycrd import Lbrycrd
from .block import Block, create_block_filter
from .bcd_data_stream import BCDataStream
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Context variable caching the Lbrycrd instance created by
# get_or_initialize_lbrycrd(), so the instance is built and opened only once
# per context instead of on every call.
_chain: ContextVar[Lbrycrd] = ContextVar('chain')
def get_or_initialize_lbrycrd(ctx=None) -> Lbrycrd:
    """Return the Lbrycrd instance cached in the ``_chain`` context variable.

    When nothing is cached yet, a new instance is built from the ledger of
    ``ctx`` (or of the current query context when ``ctx`` is falsy), its
    database is opened synchronously and the instance is stored in ``_chain``
    before being returned.
    """
    cached = _chain.get(None)
    if cached is None:
        query_context = ctx or context()
        cached = Lbrycrd(query_context.ledger)
        cached.db.sync_open()
        _chain.set(cached)
    return cached
def process_block_file(block_file_number, current_height):
    """Read blocks from one block file and save them with the bulk loader.

    Args:
        block_file_number: number of the block file to read.
        current_height: forwarded to ``chain.db.sync_get_blocks_in_file`` to
            select which blocks of the file still have to be loaded.

    Returns:
        The height of the last block read, or ``-1`` when the file has no
        new blocks or the context's stop event was set while reading (in
        which case nothing is saved).
    """
    ctx = context()
    chain = get_or_initialize_lbrycrd(ctx)
    stop_event = ctx.stop_event
    bulk_loader = ctx.get_bulk_loader()
    last_height = -1

    with progress(Event.BLOCK_READ, 100) as reading:
        pending_blocks = chain.db.sync_get_blocks_in_file(block_file_number, current_height)
        if not pending_blocks:
            return -1
        reading.start(len(pending_blocks), {'block_file': block_file_number})
        path = chain.get_block_file_path_from_number(block_file_number)
        with open(path, 'rb') as block_file:
            stream = BCDataStream(fp=block_file)
            for count, info in enumerate(pending_blocks, start=1):
                if stop_event.is_set():
                    return -1
                height = info['height']
                # each block sits at a known offset inside the file
                block_file.seek(info['data_offset'])
                bulk_loader.add_block(
                    Block.from_data_stream(stream, height, block_file_number)
                )
                last_height = height
                reading.step(count)

    with progress(Event.BLOCK_SAVE) as saving:
        saving.extra = {'block_file': block_file_number}
        bulk_loader.save()

    return last_height
def process_claimtrie(heights):
    """Rebuild the Claimtrie table and refresh claim heights and amounts.

    Steps, in order:
      1. delete every row of ``Claimtrie``;
      2. re-insert the trie returned by ``chain.db.sync_get_claimtrie()`` in
         chunks of 10000 rows, inside one transaction;
      3. update ``activation_height`` / ``expiration_height`` of each claim
         returned by ``chain.db.sync_get_claims()``, inside one transaction;
      4. recompute ``effective_amount`` for claims whose activation height is
         at most ``heights[-1]``.

    Args:
        heights: sequence of heights; only its last element is used here.
    """
    chain = get_or_initialize_lbrycrd()
    trie_columns = ('normalized', 'claim_hash', 'last_take_over_height')

    with progress(Event.TRIE_DELETE) as p:
        p.start(1)
        p.ctx.execute(Claimtrie.delete())

    with progress(Event.TRIE_UPDATE) as p, context().connection.begin():
        trie_rows = chain.db.sync_get_claimtrie()
        p.start(len(trie_rows))
        inserted = 0
        for batch_size, batch in chunk(trie_rows, 10000):
            values = [{column: row[column] for column in trie_columns} for row in batch]
            p.ctx.execute(Claimtrie.insert(), values)
            inserted += batch_size
            p.step(inserted)

    with progress(Event.TRIE_UPDATE, 250) as p, context().connection.begin():
        claim_records = chain.db.sync_get_claims()
        p.start(len(claim_records))
        for count, claim in enumerate(claim_records, start=1):
            update_heights = (
                Claim.update()
                .where(Claim.c.claim_hash == claim['claim_hash'])
                .values(
                    activation_height=claim['activation_height'],
                    expiration_height=claim['expiration_height']
                )
            )
            p.ctx.execute(update_heights)
            p.step(count)

    with context("effective amount update") as ctx:
        support = TXO.alias('support')
        # supports for the claim whose txo_hash does not appear in TXI
        # (presumably: supports that have not been spent)
        matching_supports = (
            (support.c.claim_hash == Claim.c.claim_hash) &
            (support.c.txo_type == TXO_TYPES['support']) &
            (support.c.txo_hash.notin_(select(TXI.c.txo_hash)))
        )
        # claim amount plus the sum of matching supports (0 when there are none)
        total_amount = (
            select(func.coalesce(func.sum(support.c.amount), 0) + Claim.c.amount)
            .select_from(support)
            .where(matching_supports)
            .scalar_subquery()
        )
        ctx.execute(
            Claim.update()
            .where(Claim.c.activation_height <= heights[-1])
            .values(effective_amount=total_amount)
        )
def process_block_and_tx_filters():
    """Compute and store the address filter of every block that lacks one.

    For each block returned by ``queries.get_blocks_without_filters()`` the
    addresses of its transactions are converted with
    ``ledger.address_to_hash160``, fed to ``create_block_filter`` and the
    result is written to the ``block_filter`` column of the block's row.

    Nothing is executed when no block needs a filter: an executemany call
    with an empty parameter list would otherwise run the UPDATE once with the
    ``pk`` bind parameter unset.

    Per-transaction filters are not implemented yet (see the commented-out
    sketch at the end of the function).
    """
    # NOTE(review): the label used to be "effective amount update", copied
    # from process_claimtrie(); it presumably only names the context's timer.
    with context("block and tx filter update") as ctx:
        blocks = []
        for block in queries.get_blocks_without_filters():
            addresses = {
                ctx.ledger.address_to_hash160(r['address'])
                for r in queries.get_block_tx_addresses(block_hash=block['block_hash'])
            }
            blocks.append({
                'pk': block['block_hash'],
                'block_filter': create_block_filter(addresses),
            })
        if blocks:
            # one UPDATE per block, sent as a single executemany call
            ctx.execute(
                BlockTable.update().where(BlockTable.c.block_hash == bindparam('pk')),
                blocks
            )
    # TODO: transaction filters
    # txs = []
    # for tx in queries.get_transactions_without_filters():
    #     tx_filter = create_block_filter(
    #         {r['address'] for r in queries.get_block_tx_addresses(tx_hash=tx['tx_hash'])}
    #     )
    #     txs.append({'pk': tx['tx_hash'], 'tx_filter': tx_filter})
    # execute(TX.update().where(TX.c.tx_hash == bindparam('pk')), txs)
class BlockchainSync(Sync):
    """Sync implementation that loads blocks from lbrycrd's block files.

    ``start()`` performs one initial ``advance()`` and then keeps a
    background task running that advances again every time the chain
    announces a new block.
    """

    def __init__(self, chain: Lbrycrd, db: Database):
        super().__init__(chain.ledger, db)
        self.chain = chain
        # subscription to chain.on_block, created in start()
        self.on_block_subscription: Optional[BroadcastSubscription] = None
        # first the initial advance() task, afterwards the advance_loop() task
        self.advance_loop_task: Optional[asyncio.Task] = None
        # set by the on_block listener to wake up advance_loop()
        self.advance_loop_event = asyncio.Event()

    async def start(self):
        """Run the initial sync, then subscribe to new blocks and start the loop."""
        # initial advance as task so that it can be stop()'ed before finishing
        self.advance_loop_task = asyncio.create_task(self.advance())
        await self.advance_loop_task
        self.chain.subscribe()
        self.advance_loop_task = asyncio.create_task(self.advance_loop())
        self.on_block_subscription = self.chain.on_block.listen(
            lambda e: self.advance_loop_event.set()
        )

    async def stop(self):
        """Unsubscribe from the chain, signal workers to stop and cancel the task.

        Safe to call before ``start()``: the subscription and the task are
        only cancelled when they exist.
        """
        self.chain.unsubscribe()
        if self.on_block_subscription is not None:
            self.on_block_subscription.cancel()
        self.db.stop_event.set()
        if self.advance_loop_task is not None:
            self.advance_loop_task.cancel()

    async def run(self, f, *args):
        """Run ``f(*args)`` in the database executor and return its result."""
        return await asyncio.get_running_loop().run_in_executor(
            self.db.executor, f, *args
        )

    async def load_blocks(self):
        """Load the blocks missing from the database, one worker call per file.

        Returns:
            ``(starting_height, best_height_processed)`` when all files were
            processed, or ``None`` when there was nothing to load or the run
            was aborted because a worker failed while others were pending.

        Raises:
            Whatever a worker raised, when it failed after every other worker
            had already finished (the error surfaces through ``result()``).
        """
        jobs = []
        starting_height = None
        tx_count = block_count = ending_height = 0
        for file in await self.chain.db.get_block_files():
            # block files may be read and saved out of order, need to check
            # each file individually to see if we have missing blocks
            current_height = await self.db.get_best_height_for_file(file['file_number'])
            if current_height == file['max_height']:
                # we have all blocks in this file, skipping
                continue
            if -1 < current_height < file['max_height']:
                # we have some blocks, need to figure out what we're missing
                # call get_block_files again limited to this file and current_height
                file = (await self.chain.db.get_block_files(
                    file_number=file['file_number'], above_height=current_height
                ))[0]
            tx_count += file['txs']
            block_count += file['blocks']
            if starting_height is None:
                starting_height = current_height
            else:
                starting_height = min(starting_height, current_height)
            ending_height = max(ending_height, file['max_height'])
            jobs.append(self.run(process_block_file, file['file_number'], current_height))
        if not jobs:
            return None
        await self._on_progress_controller.add({
            "event": "blockchain.sync.start",
            "data": {
                "starting_height": starting_height,
                "ending_height": ending_height,
                "files": len(jobs),
                "blocks": block_count,
                "txs": tx_count
            }
        })
        # asyncio.wait() no longer accepts bare coroutines (deprecated in 3.8,
        # an error since 3.11). They are wrapped only here, after the start
        # event was queued, so workers do not start any earlier than before.
        tasks = [asyncio.ensure_future(job) for job in jobs]
        done, pending = await asyncio.wait(
            tasks, return_when=asyncio.FIRST_EXCEPTION
        )
        if pending:
            # a worker failed: tell the remaining workers to stop and give up
            self.db.stop_event.set()
            for future in pending:
                future.cancel()
            for future in done:
                if not future.cancelled() and future.exception() is not None:
                    log.error(
                        "processing of a block file failed, aborting sync",
                        exc_info=future.exception()
                    )
            return None
        best_height_processed = max(f.result() for f in done)
        # putting event in queue instead of add to progress_controller because
        # we want this message to appear after all of the queued messages from workers
        self.db.message_queue.put((
            Event.BLOCK_DONE.value, os.getpid(),
            len(done), len(tasks),
            {"best_height_processed": best_height_processed}
        ))
        return starting_height, best_height_processed

    async def advance(self):
        """Load new blocks and, if any were added, run the post-processing steps.

        When the height advanced, inputs, claims, supports and the claimtrie
        are processed (address filters too when ``conf.spv_address_filters``
        is set) and a ``BlockEvent`` for the new best height is emitted.
        """
        heights = await self.load_blocks()
        if heights and heights[0] < heights[-1]:
            await self.db.process_inputs(heights)
            await self.db.process_claims(heights)
            await self.db.process_supports(heights)
            await self.run(process_claimtrie, heights)
            if self.conf.spv_address_filters:
                # process_block_and_tx_filters() takes no arguments
                await self.run(process_block_and_tx_filters)
            await self._on_block_controller.add(BlockEvent(heights[-1]))

    async def advance_loop(self):
        """Advance each time the new-block event fires; stop on unexpected errors."""
        while True:
            await self.advance_loop_event.wait()
            self.advance_loop_event.clear()
            try:
                await self.advance()
            except asyncio.CancelledError:
                return
            except Exception as e:
                log.exception(e)
                await self.stop()