# lbry-sdk/lbry/blockchain/worker.py

from typing import Optional
from contextvars import ContextVar
from multiprocessing import Queue, Event
from dataclasses import dataclass
from itertools import islice

from lbry.wallet.bcd_data_stream import BCDataStream
from lbry.db import Database

from .lbrycrd import Lbrycrd
from .block import Block

PENDING = 'pending'
RUNNING = 'running'
STOPPED = 'stopped'


def chunk(rows, step):
    """Iterate over ``rows`` in chunks of at most ``step`` items, yielding
    (chunk_size, chunk_iterator) pairs so callers know each chunk's true size."""
    it, total = iter(rows), len(rows)
    for _ in range(0, total, step):
        yield min(step, total), islice(it, step)
        total -= step
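
# Illustration (hypothetical values, not part of the module): chunk([1, 2, 3, 4, 5], 2)
# yields (2, islice of [1, 2]), (2, islice of [3, 4]), (1, islice of [5]); pairing each
# slice with its actual size keeps progress accounting correct for the short final chunk.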


@dataclass
class WorkerContext:
    lbrycrd: Lbrycrd
    db: Database
    progress: Queue
    stop: Event


context: ContextVar[Optional[WorkerContext]] = ContextVar('context')


def initializer(data_dir: str, regtest: bool, db_path: str, progress: Queue, stop: Event):
    # Runs once in each worker process: open per-process Lbrycrd and Database
    # handles and stash them, together with the shared progress queue and stop
    # event, in the context variable for process_block_file() to pick up.
    context.set(WorkerContext(
        lbrycrd=Lbrycrd(data_dir, regtest),
        db=Database(db_path).sync_open(),
        progress=progress,
        stop=stop
    ))
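
# Design note: open database connections and file handles generally cannot be
# pickled and shipped along with each task, so every worker opens its own in
# initializer() and parks them in the ContextVar above instead of receiving
# them as arguments; a sketch of the expected pool wiring is at the end of
# this file.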


def process_block_file(block_file_number):
    """Parse one on-disk block file and bulk-insert its rows into the database.

    Progress is reported on the shared queue as (block_file_number, stage, units)
    tuples: stage 1 counts blocks parsed, stage 2 counts (approximate) txs saved.
    """
    ctx: WorkerContext = context.get()
    lbrycrd, db, progress, stop = ctx.lbrycrd, ctx.db, ctx.progress, ctx.stop
    block_file_path = lbrycrd.get_block_file_path_from_number(block_file_number)
    num = 0
    progress.put_nowait((block_file_number, 1, num))
    with open(block_file_path, 'rb') as fp:
        stream = BCDataStream(fp=fp)
        blocks, txs, claims, supports, spends = [], [], [], [], []
        for num, block_info in enumerate(lbrycrd.db.get_file_details(block_file_number), start=1):
            if stop.is_set():
                return
            if num % 100 == 0:
                progress.put_nowait((block_file_number, 1, num))
            fp.seek(block_info['data_offset'])
            block = Block(stream)
            for tx in block.txs:
                txs.append((block.block_hash, tx.position, tx.hash))
                for txi in tx.inputs:
                    if not txi.is_coinbase:
                        spends.append((block.block_hash, tx.hash, txi.txo_ref.hash))
                for output in tx.outputs:
                    try:
                        if output.is_support:
                            supports.append((
                                block.block_hash, tx.hash, output.ref.hash, output.claim_hash, output.amount
                            ))
                        elif output.script.is_claim_name:
                            claims.append((
                                block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
                                output.claim_name, 1, output.amount, None, None
                            ))
                        elif output.script.is_update_claim:
                            claims.append((
                                block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
                                output.claim_name, 2, output.amount, None, None
                            ))
                    except Exception:  # outputs whose scripts fail to parse are skipped
                        pass
            blocks.append(
                (block.block_hash, block.prev_block_hash, block_file_number, 0 if block.is_first_block else None)
            )
        progress.put((block_file_number, 1, num))

    queries = (
        ("insert into block values (?, ?, ?, ?)", blocks),
        ("insert into tx values (?, ?, ?)", txs),
        ("insert into txi values (?, ?, ?)", spends),
        ("insert into support values (?, ?, ?, ?, ?)", supports),
        ("insert into claim_history values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", claims),
    )
    total_txs = len(txs)
    done_txs = 0
    # step scales total row count down to tx-equivalents for stage 2 progress;
    # guard against division by zero when a block file yields no transactions
    step = int(sum(len(q[1]) for q in queries) / total_txs) if total_txs else 1
    progress.put((block_file_number, 2, done_txs))
    for sql, rows in queries:
        for chunk_size, chunk_rows in chunk(rows, 10000):
            db.sync_executemany(sql, chunk_rows)
            done_txs += int(chunk_size / step)
            progress.put((block_file_number, 2, done_txs))
    progress.put((block_file_number, 2, total_txs))
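

# --- Usage sketch (illustrative; not part of the original module) -------------
# How the pieces above are expected to fit together: a process pool primes each
# worker once via initializer(), block files are fanned out to
# process_block_file(), and the parent drains the shared progress queue.
# sync_block_files() and its arguments are hypothetical placeholders, not
# lbry-sdk API; Manager proxies are used because they are safely picklable
# across process boundaries.
def sync_block_files(file_numbers, data_dir, regtest, db_path):
    from concurrent.futures import ProcessPoolExecutor, as_completed
    from multiprocessing import Manager
    manager = Manager()
    progress, stop = manager.Queue(), manager.Event()
    with ProcessPoolExecutor(
        max_workers=4, initializer=initializer,
        initargs=(data_dir, regtest, db_path, progress, stop)
    ) as pool:
        futures = [pool.submit(process_block_file, n) for n in file_numbers]
        for future in as_completed(futures):
            future.result()  # re-raise any exception from the worker
    while not progress.empty():  # report whatever progress updates remain queued
        file_number, stage, units = progress.get_nowait()
        print(f"block file {file_number}: stage {stage}, {units} units done")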