parsing and inserting works

Lex Berezhny 2020-02-27 23:52:18 -05:00
parent 66666e1167
commit fd2f9846e9
7 changed files with 393 additions and 149 deletions

View file

@@ -15,8 +15,7 @@ class Block:
'bits', 'nonce', 'txs'
)
def __init__(self, stream):
stream.read_uint32() # block size
def __init__(self, stream: BCDataStream):
header = stream.data.read(112)
version, = struct.unpack('<I', header[:4])
timestamp, bits, nonce = struct.unpack('<III', header[100:112])
@@ -37,11 +36,3 @@ class Block:
@property
def is_first_block(self):
return self.prev_block_hash == ZERO_BLOCK
def read_blocks(block_file):
with open(block_file, 'rb') as fp:
stream = BCDataStream(fp=fp)
#while stream.read_uint32() == 4054508794:
while stream.read_uint32() == 3517637882:
yield Block(stream)
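
For orientation: the loop above reads a 4-byte value before each block record, and the two constants look like little-endian network magic for lbrycrd mainnet (0xF1AAE4FA, commented out) and regtest (0xD1AAE4FA, active). In Bitcoin-derived blk*.dat files each record is framed as magic, then a uint32 payload length, then the serialized block. A standard-library sketch of that framing, for illustration only (scan_raw_blocks is a hypothetical name, not part of this diff):

    import struct

    def scan_raw_blocks(path, magic=3517637882):
        # Yield raw block payloads from a blk*.dat file; each record is
        # <4-byte magic><4-byte length><serialized block>, little-endian.
        with open(path, 'rb') as fp:
            while True:
                prefix = fp.read(8)
                if len(prefix) < 8:
                    return
                found, size = struct.unpack('<II', prefix)
                if found != magic:
                    return
                yield fp.read(size)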

View file

@@ -1,36 +1,39 @@
import os
import asyncio
from collections import namedtuple
from concurrent import futures
from collections import namedtuple, deque
import sqlite3
import apsw
DDL = """
pragma journal_mode=WAL;
create table block (
block_hash bytes not null,
create table if not exists block (
block_hash bytes not null primary key,
previous_hash bytes not null,
file_number integer not null,
height int
);
create table tx (
create table if not exists tx (
block_hash integer not null,
position integer not null,
tx_hash bytes not null
);
create table txi (
create table if not exists txi (
block_hash bytes not null,
tx_hash bytes not null,
txo_hash bytes not null
);
create table claim (
create table if not exists claim (
txo_hash bytes not null,
claim_hash bytes not null,
claim_name text not null,
amount integer not null,
height integer
);
create table claim_history (
create table if not exists claim_history (
block_hash bytes not null,
tx_hash bytes not null,
tx_position integer not null,
@@ -42,7 +45,7 @@ create table claim_history (
height integer,
is_spent bool
);
create table support (
create table if not exists support (
block_hash bytes not null,
tx_hash bytes not null,
txo_hash bytes not null,
@@ -60,22 +63,35 @@ class BlockchainDB:
self.db = None
self.directory = path
@property
def db_file_path(self):
return os.path.join(self.directory, 'blockchain.db')
def open(self):
self.db = apsw.Connection(
os.path.join(self.directory, 'blockchain.db'),
flags=(
apsw.SQLITE_OPEN_READWRITE |
apsw.SQLITE_OPEN_CREATE |
apsw.SQLITE_OPEN_URI
)
)
def exec_factory(cursor, statement, bindings):
tpl = namedtuple('row', (d[0] for d in cursor.getdescription()))
cursor.setrowtrace(lambda cursor, row: tpl(*row))
return True
self.db.setexectrace(exec_factory)
self.execute(DDL)
self.execute(f"ATTACH {os.path.join(self._db_path, 'block_index.sqlite')} AS block_index")
self.db = sqlite3.connect(self.db_file_path, isolation_level=None, uri=True, timeout=60.0 * 5)
self.db.executescript("""
pragma journal_mode=wal;
""")
# self.db = apsw.Connection(
# self.db_file_path,
# flags=(
# apsw.SQLITE_OPEN_READWRITE |
# apsw.SQLITE_OPEN_CREATE |
# apsw.SQLITE_OPEN_URI
# )
# )
self.execute_ddl(DDL)
self.execute(f"ATTACH ? AS block_index", ('file:'+os.path.join(self.directory, 'block_index.sqlite')+'?mode=ro',))
#def exec_factory(cursor, statement, bindings):
# tpl = namedtuple('row', (d[0] for d in cursor.getdescription()))
# cursor.setrowtrace(lambda cursor, row: tpl(*row))
# return True
#self.db.setexectrace(exec_factory)
def row_factory(cursor, row):
tpl = namedtuple('row', (d[0] for d in cursor.description))
return tpl(*row)
self.db.row_factory = row_factory
return self
def close(self):
if self.db is not None:
@@ -84,41 +100,73 @@ class BlockchainDB:
def execute(self, *args):
return self.db.cursor().execute(*args)
def executemany(self, *args):
def execute_many(self, *args):
return self.db.cursor().executemany(*args)
def execute_many_tx(self, *args):
cursor = self.db.cursor()
cursor.execute('begin;')
result = cursor.executemany(*args)
cursor.execute('commit;')
return result
def execute_ddl(self, *args):
self.db.executescript(*args)
#deque(self.execute(*args), maxlen=0)
def begin(self):
self.execute('begin;')
def commit(self):
self.execute('commit;')
def get_blocks(self):
pass
def get_block_file_path_from_number(self, block_file_number):
return os.path.join(self.directory, 'blocks', f'blk{block_file_number:05}.dat')
def get_block_files_not_synced(self):
return list(self.execute(
"""
SELECT file as file_number, COUNT(hash) as blocks, SUM(txcount) as txs
FROM block_index.block_info
WHERE hash NOT IN (SELECT block_hash FROM block)
GROUP BY file ORDER BY file ASC;
"""
))
def get_blocks_not_synced(self, block_file):
return self.execute(
"""
SELECT datapos as data_offset, height, hash as block_hash, txCount as txs
FROM block_index.block_info
WHERE file = ? AND hash NOT IN (SELECT block_hash FROM block)
ORDER BY datapos ASC;
""", (block_file,)
)
class AsyncBlockchainDB:
__slots__ = 'db',
def __init__(self, db: BlockchainDB):
self.db = db
self.sync_db = db
self.executor = futures.ThreadPoolExecutor(max_workers=1)
@classmethod
def from_path(cls, path: str):
def from_path(cls, path: str) -> 'AsyncBlockchainDB':
return cls(BlockchainDB(path))
@staticmethod
async def run_in_executor(func, *args):
def get_block_file_path_from_number(self, block_file_number):
return self.sync_db.get_block_file_path_from_number(block_file_number)
async def run_in_executor(self, func, *args):
return await asyncio.get_running_loop().run_in_executor(
None, func, *args
self.executor, func, *args
)
async def open(self):
return await self.run_in_executor(self.db.open)
return await self.run_in_executor(self.sync_db.open)
async def close(self):
return await self.run_in_executor(self.db.close)
return await self.run_in_executor(self.sync_db.close)
async def get_blocks(self):
return await self.run_in_executor(self.db.get_blocks)
async def get_block_files_not_synced(self):
return await self.run_in_executor(self.sync_db.get_block_files_not_synced)
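
The namedtuple row factory installed in open() means query results expose their column aliases as attributes, which is what the sync code relies on (file.file_number, file.blocks, file.txs). A minimal usage sketch, assuming a datadir that already contains lbrycrd's block_index.sqlite (attached read-only above); the path is a placeholder:

    db = BlockchainDB('/path/to/lbrycrd/datadir').open()
    for file in db.get_block_files_not_synced():
        # aliases from the SELECT become namedtuple fields
        print(file.file_number, file.blocks, file.txs)
    db.close()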

View file

@@ -56,8 +56,12 @@ class Process(asyncio.SubprocessProtocol):
class Lbrycrd:
def __init__(self, path=None):
self.data_path = path
def __init__(self, path, regtest=False):
self.data_dir = self.actual_data_dir = path
self.regtest = regtest
if regtest:
self.actual_data_dir = os.path.join(self.data_dir, 'regtest')
self.blocks_dir = os.path.join(self.actual_data_dir, 'blocks')
self.bin_dir = os.path.join(os.path.dirname(__file__), 'bin')
self.daemon_bin = os.path.join(self.bin_dir, 'lbrycrdd')
self.cli_bin = os.path.join(self.bin_dir, 'lbrycrd-cli')
@@ -71,13 +75,15 @@ class Lbrycrd:
self.session: Optional[aiohttp.ClientSession] = None
self.subscribed = False
self.subscription: Optional[asyncio.Task] = None
self.subscription_url = 'tcp://127.0.0.1:29000'
self.default_generate_address = None
self._on_block_controller = StreamController()
self.on_block = self._on_block_controller.stream
self.on_block.listen(lambda e: log.info('%s %s', hexlify(e['hash']), e['msg']))
@classmethod
def regtest(cls):
return cls(tempfile.mkdtemp())
def temp_regtest(cls):
return cls(tempfile.mkdtemp(), True)
@property
def rpc_url(self):
@@ -125,18 +131,26 @@ class Lbrycrd:
async def ensure(self):
return self.exists or await self.download()
def get_start_command(self, *args):
if self.regtest:
args += '-regtest',
return (
self.daemon_bin,
f'-datadir={self.data_dir}',
f'-port={self.peerport}',
f'-rpcport={self.rpcport}',
f'-rpcuser={self.rpcuser}',
f'-rpcpassword={self.rpcpassword}',
f'-zmqpubhashblock={self.subscription_url}',
'-server', '-printtoconsole',
*args
)
async def start(self, *args):
loop = asyncio.get_event_loop()
command = [
self.daemon_bin,
f'-datadir={self.data_path}', '-printtoconsole', '-regtest', '-server',
f'-rpcuser={self.rpcuser}', f'-rpcpassword={self.rpcpassword}', f'-rpcport={self.rpcport}',
f'-port={self.peerport}', '-zmqpubhashblock=tcp://127.0.0.1:29000', *args
]
command = self.get_start_command(*args)
log.info(' '.join(command))
self.transport, self.protocol = await loop.subprocess_exec(
Process, *command
)
self.transport, self.protocol = await loop.subprocess_exec(Process, *command)
await self.protocol.ready.wait()
assert not self.protocol.stopped.is_set()
self.session = aiohttp.ClientSession()
@@ -146,14 +160,14 @@ class Lbrycrd:
await self.session.close()
self.transport.terminate()
await self.protocol.stopped.wait()
self.transport.close()
assert self.transport.get_returncode() == 0, "lbrycrd daemon exit with error"
finally:
if cleanup:
await self.cleanup()
async def cleanup(self):
await asyncio.get_running_loop().run_in_executor(
None, shutil.rmtree, self.data_path, True
None, shutil.rmtree, self.data_dir, True
)
def subscribe(self):
@@ -161,7 +175,7 @@ class Lbrycrd:
self.subscribed = True
ctx = zmq.asyncio.Context.instance()
sock = ctx.socket(zmq.SUB)
sock.connect("tcp://127.0.0.1:29000")
sock.connect(self.subscription_url)
sock.subscribe("hashblock")
self.subscription = asyncio.create_task(self.subscription_handler(sock))
@@ -202,7 +216,24 @@ class Lbrycrd:
raise Exception(result['error'])
async def generate(self, blocks):
return await self.rpc("generate", [blocks])
if self.default_generate_address is None:
self.default_generate_address = await self.get_new_address()
return await self.generate_to_address(blocks, self.default_generate_address)
async def get_new_address(self):
return await self.rpc("getnewaddress")
async def generate_to_address(self, blocks, address):
return await self.rpc("generatetoaddress", [blocks, address])
async def fund_raw_transaction(self, tx):
return await self.rpc("fundrawtransaction", [tx])
async def sign_raw_transaction_with_wallet(self, tx):
return await self.rpc("signrawtransactionwithwallet", [tx])
async def send_raw_transaction(self, tx):
return await self.rpc("sendrawtransaction", [tx])
async def claim_name(self, name, data, amount):
return await self.rpc("claimname", [name, data, amount])

View file

@@ -1,22 +1,100 @@
import os
import time
import asyncio
import logging
from glob import glob
from concurrent.futures import ProcessPoolExecutor
import tqdm
from threading import Thread
from multiprocessing import Queue, Event
from concurrent import futures
from typing import Dict, Tuple
from lbry.wallet.stream import StreamController
from .lbrycrd import Lbrycrd
from .block import read_blocks
from .db import AsyncBlockchainDB
from . import worker
log = logging.getLogger(__name__)
class ProgressMonitorThread(Thread):
STOP = 'stop'
FORMAT = '{l_bar}{bar}| {n_fmt:>6}/{total_fmt:>7} [{elapsed}<{remaining:>5}, {rate_fmt:>15}]'
def __init__(self, state: dict, queue: Queue, stream_controller: StreamController):
super().__init__()
self.state = state
self.queue = queue
self.stream_controller = stream_controller
self.loop = asyncio.get_event_loop()
def run(self):
asyncio.set_event_loop(self.loop)
block_bar = tqdm.tqdm(
desc='total parsing', total=sum(s['total_blocks'] for s in self.state.values()),
unit='blocks', bar_format=self.FORMAT
)
tx_bar = tqdm.tqdm(
desc='total loading', total=sum(s['total_txs'] for s in self.state.values()),
unit='txs', bar_format=self.FORMAT
)
bars: Dict[int, tqdm.tqdm] = {}
while True:
msg = self.queue.get()
if msg == self.STOP:
return
file_num, msg_type, done = msg
bar, state = bars.get(file_num, None), self.state[file_num]
if msg_type == 1:
if bar is None:
bar = bars[file_num] = tqdm.tqdm(
desc=f'├─ blk{file_num:05}.dat parsing', total=state['total_blocks'],
unit='blocks', bar_format=self.FORMAT
)
change = done - state['done_blocks']
state['done_blocks'] = done
bar.update(change)
block_bar.update(change)
if state['total_blocks'] == done:
bar.set_description('✔ ' + bar.desc[3:])
bar.close()
bars.pop(file_num)
elif msg_type == 2:
if bar is None:
bar = bars[file_num] = tqdm.tqdm(
desc=f'├─ blk{file_num:05}.dat loading', total=state['total_txs'],
unit='txs', bar_format=self.FORMAT
)
change = done - state['done_txs']
state['done_txs'] = done
bar.update(change)
tx_bar.update(change)
if state['total_txs'] == done:
bar.set_description('✔ ' + bar.desc[3:])
bar.close()
bars.pop(file_num)
self.stream_controller.add(msg)
def shutdown(self):
self.queue.put(self.STOP)
self.join()
def __enter__(self):
self.start()
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()
class BlockSync:
def __init__(self, chain: Lbrycrd):
def __init__(self, chain: Lbrycrd, use_process_pool=False):
self.chain = chain
self.db = AsyncBlockchainDB.from_path(os.path.join(self.chain.data_path, 'regtest'))
self.use_process_pool = use_process_pool
self.db = AsyncBlockchainDB.from_path(self.chain.actual_data_dir)
self._on_progress_controller = StreamController()
self.on_progress = self._on_progress_controller.stream
async def start(self):
await self.db.open()
@@ -24,86 +102,59 @@ class BlockSync:
async def stop(self):
await self.db.close()
async def cleanup(self):
pass
def get_worker_pool(self, queue, full_stop) -> futures.Executor:
args = dict(
initializer=worker.initializer,
initargs=(self.chain.actual_data_dir, queue, full_stop)
)
if not self.use_process_pool:
return futures.ThreadPoolExecutor(max_workers=1, **args)
return futures.ProcessPoolExecutor(max_workers=max(os.cpu_count()-1, 4), **args)
def get_progress_monitor(self, state, queue) -> ProgressMonitorThread:
return ProgressMonitorThread(state, queue, self._on_progress_controller)
def process_file(block_file):
blocks, txs, claims, supports, spends = [], [], [], [], []
for block in read_blocks(block_file):
for tx in block.txs:
txs.append((block.block_hash, tx.position, tx.hash))
for txi in tx.inputs:
if not txi.is_coinbase:
spends.append((block.block_hash, tx.hash, txi.txo_ref.hash))
for output in tx.outputs:
try:
if output.is_support:
supports.append((
block.block_hash, tx.hash, output.ref.hash, output.claim_hash, output.amount
))
elif output.script.is_claim_name:
claims.append((
block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
output.claim_name, 1, output.amount, None, None
))
elif output.script.is_update_claim:
claims.append((
block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
output.claim_name, 2, output.amount, None, None
))
except:
pass
blocks.append((block.block_hash, block.prev_block_hash, 0 if block.is_first_block else None))
async def load_blocks(self):
jobs = []
queue, full_stop = Queue(), Event()
executor = self.get_worker_pool(queue, full_stop)
files = list(await self.db.get_block_files_not_synced())
state = {
file.file_number: {
'status': worker.PENDING,
'done_txs': 0,
'total_txs': file.txs,
'done_blocks': 0,
'total_blocks': file.blocks,
} for file in files
}
progress = self.get_progress_monitor(state, queue)
progress.start()
sql = db.get()
def cancel_all_the_things():
for job in jobs:
job.cancel()
full_stop.set()
for job in jobs:
exception = job.exception()
if exception is not None:
raise exception
sql.execute('begin;')
sql.executemany("insert into block values (?, ?, ?)", blocks)
sql.execute('commit;')
try:
sql.execute('begin;')
sql.executemany("insert into tx values (?, ?, ?)", txs)
sql.execute('commit;')
for file in files:
jobs.append(executor.submit(worker.process_block_file, file.file_number))
sql.execute('begin;')
sql.executemany("insert into txi values (?, ?, ?)", spends)
sql.execute('commit;')
done, not_done = await asyncio.get_event_loop().run_in_executor(
None, futures.wait, jobs, None, futures.FIRST_EXCEPTION
)
if not_done:
cancel_all_the_things()
sql.execute('begin;')
sql.executemany("insert into support values (?, ?, ?, ?, ?)", supports)
sql.execute('commit;')
except asyncio.CancelledError:
cancel_all_the_things()
raise
sql.execute('begin;')
sql.executemany("insert into claim_history values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", claims)
sql.execute('commit;')
return len(blocks), len(txs)
async def main():
#lbrycrd = os.path.expanduser('~/.lbrycrd/')
output_db = '/tmp/fast_sync.db'
if os.path.exists(output_db):
os.remove(output_db)
initializer(output_db)
create_db()
executor = ProcessPoolExecutor(
6, initializer=initializer, initargs=(output_db,)
)
file_paths = glob(os.path.join(lbrycrd, 'regtest', 'blocks', 'blk*.dat'))
file_paths.sort()
total_blocks, total_txs = 0, 0
start = time.perf_counter()
for file_path, (blocks, txs) in zip(file_paths, executor.map(process_file, file_paths)):
print(f"{file_path} {blocks}")
total_blocks += blocks
total_txs += txs
print(f'blocks: {total_blocks} (txs: {total_txs}) in {time.perf_counter()-start}s')
print('cleaning chain: set block heights and delete forks')
#clean_chain()
print(f'done in {time.perf_counter()-start}s')
await blockchain.stop()
finally:
progress.shutdown()
executor.shutdown()
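
The progress reporting here is a simple tuple protocol: each worker puts (file_number, phase, units_done) on the shared multiprocessing Queue, where phase 1 appears to track block parsing and phase 2 row loading, and ProgressMonitorThread turns the stream into per-file tqdm bars. Since every message is also forwarded to the StreamController, callers can observe progress without tqdm; a minimal sketch using the listen() API seen on on_block above (chain and log as set up elsewhere):

    sync = BlockSync(chain)
    sync.on_progress.listen(
        # msg is (block file number, phase, units completed so far)
        lambda msg: log.debug('file=%s phase=%s done=%s', *msg)
    )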

lbry/blockchain/worker.py (new file, 101 additions)
View file

@@ -0,0 +1,101 @@
from typing import Optional
from contextvars import ContextVar
from multiprocessing import Queue, Event
from dataclasses import dataclass
from itertools import islice

from lbry.wallet.bcd_data_stream import BCDataStream

from .db import BlockchainDB
from .block import Block

PENDING = 'pending'
RUNNING = 'running'
STOPPED = 'stopped'


def chunk(rows, step):
    it, total = iter(rows), len(rows)
    for _ in range(0, total, step):
        yield min(step, total), islice(it, step)
        total -= step


@dataclass
class WorkerContext:
    db: BlockchainDB
    progress: Queue
    stop: Event


context: ContextVar[Optional[WorkerContext]] = ContextVar('context')


def initializer(data_dir: str, progress: Queue, stop: Event):
    context.set(WorkerContext(
        db=BlockchainDB(data_dir).open(),
        progress=progress,
        stop=stop
    ))


def process_block_file(block_file_number):
    ctx: WorkerContext = context.get()
    db, progress, stop = ctx.db, ctx.progress, ctx.stop
    block_file_path = db.get_block_file_path_from_number(block_file_number)
    num = 0
    progress.put_nowait((block_file_number, 1, num))
    with open(block_file_path, 'rb') as fp:
        stream = BCDataStream(fp=fp)
        blocks, txs, claims, supports, spends = [], [], [], [], []
        for num, block_info in enumerate(db.get_blocks_not_synced(block_file_number), start=1):
            if ctx.stop.is_set():
                return
            if num % 100 == 0:
                progress.put_nowait((block_file_number, 1, num))
            fp.seek(block_info.data_offset)
            block = Block(stream)
            for tx in block.txs:
                txs.append((block.block_hash, tx.position, tx.hash))
                for txi in tx.inputs:
                    if not txi.is_coinbase:
                        spends.append((block.block_hash, tx.hash, txi.txo_ref.hash))
                for output in tx.outputs:
                    try:
                        if output.is_support:
                            supports.append((
                                block.block_hash, tx.hash, output.ref.hash, output.claim_hash, output.amount
                            ))
                        elif output.script.is_claim_name:
                            claims.append((
                                block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
                                output.claim_name, 1, output.amount, None, None
                            ))
                        elif output.script.is_update_claim:
                            claims.append((
                                block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
                                output.claim_name, 2, output.amount, None, None
                            ))
                    except:
                        pass
            blocks.append((block.block_hash, block.prev_block_hash, block_file_number, 0 if block.is_first_block else None))
    progress.put((block_file_number, 1, num))
    queries = (
        ("insert into block values (?, ?, ?, ?)", blocks),
        ("insert into tx values (?, ?, ?)", txs),
        ("insert into txi values (?, ?, ?)", spends),
        ("insert into support values (?, ?, ?, ?, ?)", supports),
        ("insert into claim_history values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", claims),
    )
    total_txs = len(txs)
    done_txs = 0
    step = int(sum(len(q[1]) for q in queries)/total_txs)
    progress.put((block_file_number, 2, done_txs))
    for sql, rows in queries:
        for chunk_size, chunk_rows in chunk(rows, 10000):
            db.execute_many_tx(sql, chunk_rows)
            done_txs += int(chunk_size/step)
            progress.put((block_file_number, 2, done_txs))
    progress.put((block_file_number, 2, total_txs))
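
chunk() walks a list in fixed-size slices while reporting the true size of each slice, which only differs for the final short one; that size drives the done_txs progress estimate above (note the step arithmetic assumes the file contains at least one transaction). A quick illustration of the helper:

    for size, rows in chunk(list(range(25)), 10):
        print(size, list(rows))
    # 10 [0, 1, ..., 9]
    # 10 [10, 11, ..., 19]
    # 5  [20, 21, 22, 23, 24]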

View file

@@ -6,8 +6,8 @@ import asyncio
import struct
from contextvars import ContextVar
from concurrent.futures import ProcessPoolExecutor
from torba.client.bcd_data_stream import BCDataStream
from torba.client.hash import double_sha256
from lbry.wallet.bcd_data_stream import BCDataStream
from lbry.crypto.hash import double_sha256
from lbry.wallet.transaction import Transaction
from binascii import hexlify
@@ -49,10 +49,11 @@ def parse_header(header):
def parse_txs(stream):
tx_count = stream.read_compact_size()
return [Transaction.from_stream(i, stream) for i in range(tx_count)]
return [Transaction(position=i)._deserialize(stream) for i in range(tx_count)]
def process_file(file_path):
print(f'started: {file_path}')
sql = db.get()
stream = BCDataStream()
stream.data = open(file_path, 'rb')
@@ -86,6 +87,8 @@ def process_file(file_path):
pass
blocks.append((header['block_hash'], header['prev_block_hash'], 0 if is_first_block else None))
print(f'inserting sql in {file_path}')
sql.execute('begin;')
sql.executemany("insert into block values (?, ?, ?)", blocks)
sql.execute('commit;')
@@ -265,7 +268,7 @@ async def main():
total_txs += txs
print(f'blocks: {total_blocks} (txs: {total_txs}) in {time.perf_counter()-start}s')
print('cleaning chain: set block heights and delete forks')
clean_chain()
#clean_chain()
print(f'done in {time.perf_counter()-start}s')
executor.shutdown(True)

scripts/fast_sync.py (new file, 19 additions)
View file

@@ -0,0 +1,19 @@
import os.path
import asyncio

from lbry.blockchain import Lbrycrd
from lbry.blockchain.sync import BlockSync


async def main():
    chain = Lbrycrd(os.path.expanduser('~/.lbrycrd'), False)
    sync = BlockSync(chain, use_process_pool=True)
    if os.path.exists(sync.db.sync_db.db_file_path):
        os.remove(sync.db.sync_db.db_file_path)
    await sync.db.open()
    await sync.load_blocks()
    #await chain.stop(False)


try:
    asyncio.run(main())
except KeyboardInterrupt:
    print('exiting')
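
As a usage note, this script wires together the pieces from the rest of the commit: it points at an existing datadir at ~/.lbrycrd with regtest disabled, removes any stale blockchain.db so the sync starts from scratch, and leaves the daemon itself unmanaged (the chain.stop(False) call is commented out), so lbrycrd is presumably expected to be started and stopped separately; Ctrl-C exits cleanly via the KeyboardInterrupt handler.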