lbry-sdk/lbry/blockchain/worker.py

107 lines
3.9 KiB
Python
Raw Normal View History

2020-02-27 23:52:18 -05:00
from typing import Optional
from contextvars import ContextVar
from multiprocessing import Queue, Event
from dataclasses import dataclass
from itertools import islice
from lbry.wallet.bcd_data_stream import BCDataStream
2020-04-11 20:15:04 -04:00
from lbry.db import Database
from .lbrycrd import Lbrycrd
2020-02-27 23:52:18 -05:00
from .block import Block
# Status labels. None of them is referenced in the visible part of this
# module -- presumably worker/sync lifecycle states consumed by callers
# elsewhere; TODO confirm against the code that imports them.
PENDING = 'pending'
RUNNING = 'running'
STOPPED = 'stopped'
def chunk(rows, step):
    """Split a sized iterable into consecutive slices of at most ``step`` items.

    Yields ``(size, items)`` pairs, where ``size`` is the number of items in
    the slice and ``items`` is a lazy ``islice`` over ``rows``.

    All slices draw from one shared iterator, so each slice must be fully
    consumed before the next pair is requested; otherwise later slices start
    at the wrong position.

    ``rows`` must support ``len()``. ``step`` is handed to ``range()`` as its
    stride, so a ``step`` of zero raises ``ValueError``.
    """
    source, size = iter(rows), len(rows)
    for offset in range(0, size, step):
        # ``size - offset`` is what is still left; the last slice may be short.
        yield min(step, size - offset), islice(source, step)
@dataclass
class WorkerContext:
    """Bundle of per-worker resources read by ``process_block_file``."""

    # Used to resolve block file paths and to list the blocks in a file.
    lbrycrd: Lbrycrd
    # Receives the bulk inserts via ``sync_executemany``.
    db: Database
    # Receives ``(block_file_number, phase, count)`` progress tuples.
    progress: Queue
    # Checked between blocks; when set, block processing returns early.
    stop: Event
# Holds the WorkerContext for the current execution context: written by
# initializer() and read by process_block_file(). The ContextVar is created
# without a default, so context.get() raises LookupError until initializer()
# has run.
context: ContextVar[Optional[WorkerContext]] = ContextVar('context')
2020-04-11 20:15:04 -04:00
def initializer(data_dir: str, regtest: bool, db_path: str, progress: Queue, stop: Event):
    """Create this worker's ``WorkerContext`` and store it in ``context``.

    Builds ``Lbrycrd(data_dir, regtest)``, opens ``Database(db_path)`` with
    ``sync_open()``, and publishes both together with the given ``progress``
    queue and ``stop`` event so that ``process_block_file`` can retrieve them.

    NOTE(review): presumably passed as the initializer of a process pool;
    confirm against the caller.
    """
    chain = Lbrycrd(data_dir, regtest)
    database = Database(db_path).sync_open()
    worker_context = WorkerContext(
        lbrycrd=chain,
        db=database,
        progress=progress,
        stop=stop,
    )
    context.set(worker_context)
def process_block_file(block_file_number):
    """Parse one block file and bulk-insert its rows into the worker database.

    Phase 1 reads every block that ``lbrycrd.db.get_file_details`` lists for
    ``block_file_number`` and collects rows for the ``block``, ``tx``, ``txi``
    (spends), ``support`` and ``claim_history`` tables. Phase 2 inserts those
    rows in chunks of 10000.

    Progress is posted to the worker's queue as
    ``(block_file_number, phase, count)`` tuples: phase 1 counts blocks read,
    phase 2 counts (approximately) transactions written.

    The worker state comes from ``context``; if ``initializer`` has not run,
    ``context.get()`` raises ``LookupError``.

    Returns ``None``. If the stop event is set while reading, the function
    returns immediately and nothing is written to the database.
    """
    # Function-scope import keeps this block self-contained.
    import logging
    log = logging.getLogger(__name__)

    ctx: WorkerContext = context.get()
    lbrycrd, db, progress, stop = ctx.lbrycrd, ctx.db, ctx.progress, ctx.stop
    block_file_path = lbrycrd.get_block_file_path_from_number(block_file_number)

    # ---- phase 1: read blocks and collect rows -------------------------
    num = 0
    skipped_outputs = 0
    progress.put_nowait((block_file_number, 1, num))
    with open(block_file_path, 'rb') as fp:
        stream = BCDataStream(fp=fp)
        blocks, txs, claims, supports, spends = [], [], [], [], []
        for num, block_info in enumerate(lbrycrd.db.get_file_details(block_file_number), start=1):
            if stop.is_set():
                return
            if num % 100 == 0:
                # Report only every 100th block to keep queue traffic low.
                progress.put_nowait((block_file_number, 1, num))
            fp.seek(block_info['data_offset'])
            block = Block(stream)
            for tx in block.txs:
                txs.append((block.block_hash, tx.position, tx.hash))
                for txi in tx.inputs:
                    if not txi.is_coinbase:
                        spends.append((block.block_hash, tx.hash, txi.txo_ref.hash))
                for output in tx.outputs:
                    try:
                        if output.is_support:
                            supports.append((
                                block.block_hash, tx.hash, output.ref.hash, output.claim_hash, output.amount
                            ))
                        elif output.script.is_claim_name:
                            # 7th column: 1 marks a new claim.
                            # NOTE(review): the two trailing None columns are
                            # not explained by the visible code -- confirm
                            # against the claim_history schema.
                            claims.append((
                                block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
                                output.claim_name, 1, output.amount, None, None
                            ))
                        elif output.script.is_update_claim:
                            # 7th column: 2 marks a claim update.
                            claims.append((
                                block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
                                output.claim_name, 2, output.amount, None, None
                            ))
                    except Exception:
                        # Best effort: an output that cannot be interpreted is
                        # skipped so one bad script does not abort the file,
                        # but the skip is recorded instead of vanishing.
                        skipped_outputs += 1
                        log.debug(
                            "skipping unparseable output in block file %s",
                            block_file_number, exc_info=True
                        )
            blocks.append(
                (block.block_hash, block.prev_block_hash, block_file_number, 0 if block.is_first_block else None)
            )

    if skipped_outputs:
        log.warning(
            "skipped %d unparseable outputs in block file %s",
            skipped_outputs, block_file_number
        )

    progress.put((block_file_number, 1, num))

    # ---- phase 2: bulk insert -----------------------------------------
    queries = (
        ("insert into block values (?, ?, ?, ?)", blocks),
        ("insert into tx values (?, ?, ?)", txs),
        ("insert into txi values (?, ?, ?)", spends),
        ("insert into support values (?, ?, ?, ?, ?)", supports),
        ("insert into claim_history values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", claims),
    )
    total_txs = len(txs)
    total_rows = sum(len(rows) for _, rows in queries)
    done_txs = 0
    # Average number of rows per transaction, used to convert rows written
    # into an approximate transaction count. ``txs`` is itself one of the row
    # lists, so the quotient is >= 1 whenever there are transactions; a file
    # with no transactions previously raised ZeroDivisionError here.
    step = total_rows // total_txs if total_txs else 1
    progress.put((block_file_number, 2, done_txs))
    for sql, rows in queries:
        for chunk_size, chunk_rows in chunk(rows, 10000):
            db.sync_executemany(sql, chunk_rows)
            done_txs += chunk_size // step
            progress.put((block_file_number, 2, done_txs))
    # The running count above is an estimate; finish on the exact total.
    progress.put((block_file_number, 2, total_txs))