lbry-sdk/lbry/blockchain/sync.py

259 lines
9.7 KiB
Python
Raw Normal View History

2020-02-14 12:19:55 -05:00
import os
2020-02-27 23:52:18 -05:00
import asyncio
2020-02-14 12:19:55 -05:00
import logging
2020-05-01 09:28:51 -04:00
from contextvars import ContextVar
2020-06-05 00:35:22 -04:00
from typing import Optional
2020-02-27 23:52:18 -05:00
2020-05-01 09:28:51 -04:00
from sqlalchemy import func, bindparam
from sqlalchemy.future import select
2020-06-05 00:35:22 -04:00
from lbry.event import BroadcastSubscription
2020-05-20 17:54:38 -04:00
from lbry.service.base import Sync, BlockEvent
from lbry.db import Database, queries, TXO_TYPES
2020-06-05 00:35:22 -04:00
from lbry.db.tables import Claim, Claimtrie, TXO, TXI, Block as BlockTable
from lbry.db.query_context import progress, context, Event
from lbry.db.utils import chunk
2020-02-14 12:19:55 -05:00
from .lbrycrd import Lbrycrd
2020-06-05 00:35:22 -04:00
from .block import Block, create_block_filter
2020-05-01 09:28:51 -04:00
from .bcd_data_stream import BCDataStream
2020-02-14 12:19:55 -05:00
log = logging.getLogger(__name__)
2020-06-05 00:35:22 -04:00
_chain: ContextVar[Lbrycrd] = ContextVar('chain')
2020-02-14 12:19:55 -05:00
2020-06-05 00:35:22 -04:00
def get_or_initialize_lbrycrd(ctx=None) -> Lbrycrd:
chain = _chain.get(None)
if chain is not None:
return chain
chain = Lbrycrd((ctx or context()).ledger)
2020-05-01 09:28:51 -04:00
chain.db.sync_open()
2020-06-05 00:35:22 -04:00
_chain.set(chain)
return chain
def process_block_file(block_file_number, current_height):
ctx = context()
chain = get_or_initialize_lbrycrd(ctx)
stop = ctx.stop_event
loader = ctx.get_bulk_loader()
with progress(Event.BLOCK_READ, 100) as p:
new_blocks = chain.db.sync_get_blocks_in_file(block_file_number, current_height)
if not new_blocks:
return -1
done, total, last_block_processed = 0, len(new_blocks), -1
block_file_path = chain.get_block_file_path_from_number(block_file_number)
p.start(total, {'block_file': block_file_number})
with open(block_file_path, 'rb') as fp:
stream = BCDataStream(fp=fp)
for done, block_info in enumerate(new_blocks, start=1):
if stop.is_set():
return -1
block_height = block_info['height']
fp.seek(block_info['data_offset'])
block = Block.from_data_stream(stream, block_height, block_file_number)
loader.add_block(block)
last_block_processed = block_height
p.step(done)
with progress(Event.BLOCK_SAVE) as p:
p.extra = {'block_file': block_file_number}
loader.save()
2020-05-20 17:54:38 -04:00
return last_block_processed
2020-05-01 09:28:51 -04:00
2020-06-05 00:35:22 -04:00
def process_claimtrie(heights):
chain = get_or_initialize_lbrycrd()
with progress(Event.TRIE_DELETE) as p:
p.start(1)
p.ctx.execute(Claimtrie.delete())
with progress(Event.TRIE_UPDATE) as p, context().connection.begin():
trie = chain.db.sync_get_claimtrie()
p.start(len(trie))
done = 0
for chunk_size, chunk_rows in chunk(trie, 10000):
p.ctx.execute(
Claimtrie.insert(), [{
'normalized': r['normalized'],
'claim_hash': r['claim_hash'],
'last_take_over_height': r['last_take_over_height'],
} for r in chunk_rows]
)
done += chunk_size
p.step(done)
with progress(Event.CLAIM_UPDATE, 250) as p, context().connection.begin():
claims = chain.db.sync_get_claims()
p.start(len(claims))
done = 0
for record in claims:
p.ctx.execute(
Claim.update()
.where(Claim.c.claim_hash == record['claim_hash'])
.values(
activation_height=record['activation_height'],
expiration_height=record['expiration_height']
)
)
done += 1
p.step(done)
2020-02-27 23:52:18 -05:00
2020-06-05 00:35:22 -04:00
with context("effective amount update") as ctx:
support = TXO.alias('support')
effective_amount_update = (
2020-05-01 09:28:51 -04:00
Claim.update()
2020-06-05 00:35:22 -04:00
.where(Claim.c.activation_height <= heights[-1])
2020-05-01 09:28:51 -04:00
.values(
2020-06-05 00:35:22 -04:00
effective_amount=(
select(func.coalesce(func.sum(support.c.amount), 0) + Claim.c.amount)
.select_from(support).where(
(support.c.claim_hash == Claim.c.claim_hash) &
(support.c.txo_type == TXO_TYPES['support']) &
(support.c.txo_hash.notin_(select(TXI.c.txo_hash)))
).scalar_subquery()
)
2020-05-01 09:28:51 -04:00
)
)
2020-06-05 00:35:22 -04:00
ctx.execute(effective_amount_update)
2020-02-27 23:52:18 -05:00
2020-05-01 09:28:51 -04:00
def process_block_and_tx_filters():
2020-02-14 12:19:55 -05:00
2020-06-05 00:35:22 -04:00
with context("effective amount update") as ctx:
blocks = []
all_filters = []
all_addresses = []
for block in queries.get_blocks_without_filters():
addresses = {
ctx.ledger.address_to_hash160(r['address'])
for r in queries.get_block_tx_addresses(block_hash=block['block_hash'])
}
all_addresses.extend(addresses)
block_filter = create_block_filter(addresses)
all_filters.append(block_filter)
blocks.append({'pk': block['block_hash'], 'block_filter': block_filter})
# filters = [get_block_filter(f) for f in all_filters]
ctx.execute(BlockTable.update().where(BlockTable.c.block_hash == bindparam('pk')), blocks)
2020-05-01 09:28:51 -04:00
2020-05-18 08:22:23 -04:00
# txs = []
# for tx in queries.get_transactions_without_filters():
# tx_filter = create_block_filter(
# {r['address'] for r in queries.get_block_tx_addresses(tx_hash=tx['tx_hash'])}
# )
# txs.append({'pk': tx['tx_hash'], 'tx_filter': tx_filter})
# execute(TX.update().where(TX.c.tx_hash == bindparam('pk')), txs)
2020-05-01 09:28:51 -04:00
class BlockchainSync(Sync):
2020-06-05 00:35:22 -04:00
def __init__(self, chain: Lbrycrd, db: Database):
2020-05-20 17:54:38 -04:00
super().__init__(chain.ledger, db)
2020-02-14 12:19:55 -05:00
self.chain = chain
2020-05-01 09:28:51 -04:00
self.on_block_subscription: Optional[BroadcastSubscription] = None
self.advance_loop_task: Optional[asyncio.Task] = None
self.advance_loop_event = asyncio.Event()
async def start(self):
2020-06-05 00:35:22 -04:00
# initial advance as task so that it can be stop()'ed before finishing
2020-05-20 17:54:38 -04:00
self.advance_loop_task = asyncio.create_task(self.advance())
await self.advance_loop_task
2020-05-01 09:28:51 -04:00
self.chain.subscribe()
self.advance_loop_task = asyncio.create_task(self.advance_loop())
self.on_block_subscription = self.chain.on_block.listen(
lambda e: self.advance_loop_event.set()
2020-02-27 23:52:18 -05:00
)
2020-05-01 09:28:51 -04:00
async def stop(self):
self.chain.unsubscribe()
if self.on_block_subscription is not None:
self.on_block_subscription.cancel()
2020-06-05 00:35:22 -04:00
self.db.stop_event.set()
2020-05-01 09:28:51 -04:00
self.advance_loop_task.cancel()
2020-06-05 00:35:22 -04:00
async def run(self, f, *args):
return await asyncio.get_running_loop().run_in_executor(
self.db.executor, f, *args
)
2020-02-27 23:52:18 -05:00
async def load_blocks(self):
2020-05-01 09:28:51 -04:00
tasks = []
2020-06-05 00:35:22 -04:00
starting_height = None
2020-05-20 17:54:38 -04:00
tx_count = block_count = ending_height = 0
2020-06-05 00:35:22 -04:00
for file in await self.chain.db.get_block_files():
# block files may be read and saved out of order, need to check
# each file individually to see if we have missing blocks
current_height = await self.db.get_best_height_for_file(file['file_number'])
if current_height == file['max_height']:
# we have all blocks in this file, skipping
continue
if -1 < current_height < file['max_height']:
# we have some blocks, need to figure out what we're missing
# call get_block_files again limited to this file and current_height
file = (await self.chain.db.get_block_files(
file_number=file['file_number'], above_height=current_height
))[0]
2020-05-20 17:54:38 -04:00
tx_count += file['txs']
block_count += file['blocks']
2020-06-05 00:35:22 -04:00
starting_height = min(
current_height if starting_height is None else starting_height, current_height
)
2020-05-20 17:54:38 -04:00
ending_height = max(ending_height, file['max_height'])
2020-06-05 00:35:22 -04:00
tasks.append(self.run(process_block_file, file['file_number'], current_height))
if not tasks:
return None
2020-05-20 17:54:38 -04:00
await self._on_progress_controller.add({
2020-06-05 00:35:22 -04:00
"event": "blockchain.sync.start",
"data": {
"starting_height": starting_height,
"ending_height": ending_height,
"files": len(tasks),
"blocks": block_count,
"txs": tx_count
2020-05-20 17:54:38 -04:00
}
})
2020-05-01 09:28:51 -04:00
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_EXCEPTION
)
if pending:
2020-06-05 00:35:22 -04:00
self.db.stop_event.set()
2020-05-01 09:28:51 -04:00
for future in pending:
future.cancel()
2020-06-05 00:35:22 -04:00
return None
2020-05-20 17:54:38 -04:00
best_height_processed = max(f.result() for f in done)
# putting event in queue instead of add to progress_controller because
# we want this message to appear after all of the queued messages from workers
2020-06-05 00:35:22 -04:00
self.db.message_queue.put((
Event.BLOCK_DONE.value, os.getpid(),
len(done), len(tasks),
{"best_height_processed": best_height_processed}
))
return starting_height, best_height_processed
2020-05-01 09:28:51 -04:00
async def advance(self):
2020-06-05 00:35:22 -04:00
heights = await self.load_blocks()
if heights and heights[0] < heights[-1]:
await self.db.process_inputs(heights)
await self.db.process_claims(heights)
await self.db.process_supports(heights)
await self.run(process_claimtrie, heights)
if self.conf.spv_address_filters:
await self.run(process_block_and_tx_filters, heights)
await self._on_block_controller.add(BlockEvent(heights[1]))
2020-05-01 09:28:51 -04:00
async def advance_loop(self):
while True:
await self.advance_loop_event.wait()
self.advance_loop_event.clear()
2020-06-05 00:35:22 -04:00
try:
await self.advance()
except asyncio.CancelledError:
return
except Exception as e:
log.exception(e)
await self.stop()