Lex Berezhny 2020-04-11 20:15:04 -04:00
parent 53fc94d688
commit 42224dadb6
7 changed files with 90 additions and 78 deletions

View file

@@ -1,2 +1,2 @@
-from .sync import BlockSync
+from .sync import BlockchainSync
 from .lbrycrd import Lbrycrd

View file

@@ -0,0 +1,37 @@
+import os.path
+import sqlite3
+from typing import Optional
+
+
+class BlockchainDB:
+
+    __slots__ = 'file_path', 'db'
+
+    def __init__(self, directory: str):
+        self.file_path = f"file:{os.path.join(directory, 'block_index.sqlite')}?mode=ro"
+        self.db: Optional[sqlite3.Connection] = None
+
+    def open(self):
+        self.db = sqlite3.connect(self.file_path, uri=True, timeout=60.0 * 5)
+        self.db.row_factory = sqlite3.Row
+
+    def execute(self, *args, **kwargs):
+        if self.db is None:
+            self.open()
+        return list(self.db.execute(*args, **kwargs).fetchall())
+
+    def get_block_files(self):
+        return self.execute(
+            """
+            SELECT file as file_number, COUNT(hash) as blocks, SUM(txcount) as txs
+            FROM block_info GROUP BY file ORDER BY file ASC;
+            """
+        )
+
+    def get_file_details(self, block_file):
+        return self.execute(
+            """
+            SELECT datapos as data_offset, height, hash as block_hash, txCount as txs
+            FROM block_info WHERE file = ? ORDER BY datapos ASC;
+            """, (block_file,)
+        )
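
For orientation, a minimal usage sketch of the new read-only wrapper (not part of the commit). The module path is inferred from the "from .database import BlockchainDB" import in the next file, and the datadir path is a placeholder:

from lbry.blockchain.database import BlockchainDB  # module path assumed

db = BlockchainDB('/tmp/lbrycrd/regtest')  # placeholder datadir containing block_index.sqlite
for row in db.get_block_files():  # rows are sqlite3.Row, so keys work
    print(row['file_number'], row['blocks'], row['txs'])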

View file

@@ -8,6 +8,7 @@ import tempfile
 import urllib.request
 from typing import Optional
 from binascii import hexlify
+from concurrent.futures import ThreadPoolExecutor

 import aiohttp
 import zmq
@@ -15,12 +16,13 @@ import zmq.asyncio

 from lbry.wallet.stream import StreamController

+from .database import BlockchainDB
+
 log = logging.getLogger(__name__)

-download_url = (
-    # 'https://github.com/lbryio/lbrycrd/releases/download/v0.17.4.2/lbrycrd-linux-1742.zip'
-    'https://build.lbry.io/lbrycrd/fix_flush_to_not_corrupt/lbrycrd-linux.zip'
+DOWNLOAD_URL = (
+    'https://github.com/lbryio/lbrycrd/releases/download/v0.17.4.4/lbrycrd-linux-1744.zip'
 )
@@ -81,6 +83,22 @@ class Lbrycrd:
         self.on_block = self._on_block_controller.stream
         self.on_block.listen(lambda e: log.info('%s %s', hexlify(e['hash']), e['msg']))
+        self.db = BlockchainDB(self.actual_data_dir)
+        self.executor = ThreadPoolExecutor(max_workers=1)
+
+    def get_block_file_path_from_number(self, block_file_number):
+        return os.path.join(self.actual_data_dir, 'blocks', f'blk{block_file_number:05}.dat')
+
+    async def get_block_files(self):
+        return await asyncio.get_running_loop().run_in_executor(
+            self.executor, self.db.get_block_files
+        )
+
+    async def get_file_details(self, block_file):
+        return await asyncio.get_running_loop().run_in_executor(
+            self.executor, self.db.get_file_details, block_file
+        )

     @classmethod
     def temp_regtest(cls):
         return cls(tempfile.mkdtemp(), True)
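
The wrappers above follow the standard pattern for calling blocking sqlite3 code from asyncio; here is a self-contained sketch (the names are illustrative, not from the commit). Using max_workers=1 also keeps the sqlite3 connection on a single thread, which sqlite3 requires by default:

import asyncio
from concurrent.futures import ThreadPoolExecutor

def read_rows():  # stands in for the blocking BlockchainDB methods
    return ['row1', 'row2']

async def main():
    executor = ThreadPoolExecutor(max_workers=1)  # one thread serializes db access
    rows = await asyncio.get_running_loop().run_in_executor(executor, read_rows)
    print(rows)

asyncio.run(main())
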
@@ -98,23 +116,23 @@ class Lbrycrd:

     async def download(self):
         downloaded_file = os.path.join(
-            self.bin_dir, download_url[download_url.rfind('/')+1:]
+            self.bin_dir, DOWNLOAD_URL[DOWNLOAD_URL.rfind('/')+1:]
         )
         if not os.path.exists(self.bin_dir):
             os.mkdir(self.bin_dir)
         if not os.path.exists(downloaded_file):
-            log.info('Downloading: %s', download_url)
+            log.info('Downloading: %s', DOWNLOAD_URL)
             async with aiohttp.ClientSession() as session:
-                async with session.get(download_url) as response:
+                async with session.get(DOWNLOAD_URL) as response:
                     with open(downloaded_file, 'wb') as out_file:
                         while True:
                             chunk = await response.content.read(4096)
                             if not chunk:
                                 break
                             out_file.write(chunk)
-            with urllib.request.urlopen(download_url) as response:
+            with urllib.request.urlopen(DOWNLOAD_URL) as response:
                 with open(downloaded_file, 'wb') as out_file:
                     shutil.copyfileobj(response, out_file)
@@ -133,7 +151,7 @@ class Lbrycrd:

     def get_start_command(self, *args):
         if self.regtest:
-            args += '-regtest',
+            args += ('-regtest',)
         return (
             self.daemon_bin,
             f'-datadir={self.data_dir}',
@@ -174,7 +192,7 @@ class Lbrycrd:
         if not self.subscribed:
             self.subscribed = True
             ctx = zmq.asyncio.Context.instance()
-            sock = ctx.socket(zmq.SUB)
+            sock = ctx.socket(zmq.SUB)  # pylint: disable=no-member
             sock.connect(self.subscription_url)
             sock.subscribe("hashblock")
             self.subscription = asyncio.create_task(self.subscription_handler(sock))
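
For reference, a sketch of how a SUB socket like the one above consumes lbrycrd's hashblock notifications. This is illustrative only: the endpoint is a placeholder and the three-frame layout is the usual bitcoind-style topic/payload/sequence convention, assumed to apply here:

import zmq
import zmq.asyncio

async def watch_blocks(url='tcp://127.0.0.1:29000'):  # placeholder endpoint
    sock = zmq.asyncio.Context.instance().socket(zmq.SUB)  # pylint: disable=no-member
    sock.connect(url)
    sock.subscribe("hashblock")
    while True:
        topic, block_hash, _seq = await sock.recv_multipart()  # assumed frame layout
        print(topic, block_hash.hex())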

View file

@@ -1,16 +1,14 @@
 import os
 import asyncio
 import logging
-import tqdm
 from threading import Thread
 from multiprocessing import Queue, Event
 from concurrent import futures
-from typing import Dict, Tuple

 from lbry.wallet.stream import StreamController
+from lbry.db import Database

 from .lbrycrd import Lbrycrd
-from .db import AsyncBlockchainDB
 from . import worker
@@ -31,49 +29,10 @@ class ProgressMonitorThread(Thread):

     def run(self):
         asyncio.set_event_loop(self.loop)
-        block_bar = tqdm.tqdm(
-            desc='total parsing', total=sum(s['total_blocks'] for s in self.state.values()),
-            unit='blocks', bar_format=self.FORMAT
-        )
-        tx_bar = tqdm.tqdm(
-            desc='total loading', total=sum(s['total_txs'] for s in self.state.values()),
-            unit='txs', bar_format=self.FORMAT
-        )
-        bars: Dict[int, tqdm.tqdm] = {}
         while True:
             msg = self.queue.get()
             if msg == self.STOP:
                 return
-            file_num, msg_type, done = msg
-            bar, state = bars.get(file_num, None), self.state[file_num]
-            if msg_type == 1:
-                if bar is None:
-                    bar = bars[file_num] = tqdm.tqdm(
-                        desc=f'├─ blk{file_num:05}.dat parsing', total=state['total_blocks'],
-                        unit='blocks', bar_format=self.FORMAT
-                    )
-                change = done - state['done_blocks']
-                state['done_blocks'] = done
-                bar.update(change)
-                block_bar.update(change)
-                if state['total_blocks'] == done:
-                    bar.set_description(''+bar.desc[3:])
-                    bar.close()
-                    bars.pop(file_num)
-            elif msg_type == 2:
-                if bar is None:
-                    bar = bars[file_num] = tqdm.tqdm(
-                        desc=f'├─ blk{file_num:05}.dat loading', total=state['total_txs'],
-                        unit='txs', bar_format=self.FORMAT
-                    )
-                change = done - state['done_txs']
-                state['done_txs'] = done
-                bar.update(change)
-                tx_bar.update(change)
-                if state['total_txs'] == done:
-                    bar.set_description(''+bar.desc[3:])
-                    bar.close()
-                    bars.pop(file_num)
             self.stream_controller.add(msg)

     def shutdown(self):
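
With the tqdm bars gone, run() now just forwards raw worker messages to the stream. Based on the worker module in this commit, each message is a (block_file_number, msg_type, done) tuple, with msg_type 1 counting parsed blocks and 2 counting loaded txs. A sketch of a listener (the 'sync' instance is assumed, not part of the commit):

def print_progress(msg):
    file_number, msg_type, done = msg
    unit = 'blocks' if msg_type == 1 else 'txs'
    print(f'blk{file_number:05}.dat: {done} {unit}')

sync.on_progress.listen(print_progress)  # sync: an assumed BlockchainSync instance
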
@@ -87,25 +46,19 @@ class ProgressMonitorThread(Thread):
         self.shutdown()


-class BlockSync:
+class BlockchainSync:

-    def __init__(self, chain: Lbrycrd, use_process_pool=False):
+    def __init__(self, chain: Lbrycrd, db: Database, use_process_pool=False):
         self.chain = chain
+        self.db = db
         self.use_process_pool = use_process_pool
-        self.db = AsyncBlockchainDB.from_path(self.chain.actual_data_dir)
         self._on_progress_controller = StreamController()
         self.on_progress = self._on_progress_controller.stream

-    async def start(self):
-        await self.db.open()
-
-    async def stop(self):
-        await self.db.close()
-
     def get_worker_pool(self, queue, full_stop) -> futures.Executor:
         args = dict(
             initializer=worker.initializer,
-            initargs=(self.chain.actual_data_dir, queue, full_stop)
+            initargs=(self.chain.data_dir, self.chain.regtest, self.db.db_path, queue, full_stop)
         )
         if not self.use_process_pool:
             return futures.ThreadPoolExecutor(max_workers=1, **args)
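
get_worker_pool() relies on the initializer/initargs hooks that both executor types accept: the callable runs once in each worker before any task, which is where per-worker state (chain handle, db connection) is set up. A standalone sketch with illustrative names:

from concurrent import futures

def init_worker(tag):
    print('worker ready:', tag)  # runs once per worker process/thread

def main():
    pool = futures.ProcessPoolExecutor(max_workers=2, initializer=init_worker, initargs=('demo',))
    with pool:
        print(list(pool.map(abs, [-3, -2, -1])))

if __name__ == '__main__':  # required for process pools on spawn platforms
    main()
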
@@ -118,7 +71,7 @@ class BlockSync:
         jobs = []
         queue, full_stop = Queue(), Event()
         executor = self.get_worker_pool(queue, full_stop)
-        files = list(await self.db.get_block_files_not_synced())
+        files = list(await self.chain.get_block_files_not_synced())
         state = {
             file.file_number: {
                 'status': worker.PENDING,

View file

@@ -5,7 +5,8 @@ from dataclasses import dataclass

 from itertools import islice

 from lbry.wallet.bcd_data_stream import BCDataStream
-from .db import BlockchainDB
+from lbry.db import Database

+from .lbrycrd import Lbrycrd
 from .block import Block
@@ -23,7 +24,8 @@ def chunk(rows, step):


 @dataclass
 class WorkerContext:
-    db: BlockchainDB
+    lbrycrd: Lbrycrd
+    db: Database
     progress: Queue
     stop: Event
@@ -31,9 +33,10 @@ class WorkerContext:
 context: ContextVar[Optional[WorkerContext]] = ContextVar('context')


-def initializer(data_dir: str, progress: Queue, stop: Event):
+def initializer(data_dir: str, regtest: bool, db_path: str, progress: Queue, stop: Event):
     context.set(WorkerContext(
-        db=BlockchainDB(data_dir).open(),
+        lbrycrd=Lbrycrd(data_dir, regtest),
+        db=Database(db_path).sync_open(),
         progress=progress,
         stop=stop
     ))
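
The ContextVar holds the per-worker state set up by initializer(), so task functions such as process_block_file() can fetch their connections with context.get() instead of receiving them as arguments; this works because each pool worker runs its tasks in the same thread (or process) where its initializer ran. A minimal sketch with illustrative names:

from contextvars import ContextVar
from typing import Optional

worker_ctx: ContextVar[Optional[dict]] = ContextVar('worker_ctx', default=None)

def init(name: str):
    worker_ctx.set({'name': name})  # called once per worker by the executor hook

def task() -> str:
    return worker_ctx.get()['name']  # read back inside any later task

init('worker-1')
print(task())  # worker-1
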
@@ -41,19 +44,19 @@ def initializer(data_dir: str, progress: Queue, stop: Event):

 def process_block_file(block_file_number):
     ctx: WorkerContext = context.get()
-    db, progress, stop = ctx.db, ctx.progress, ctx.stop
-    block_file_path = db.get_block_file_path_from_number(block_file_number)
+    lbrycrd, db, progress, stop = ctx.lbrycrd, ctx.db, ctx.progress, ctx.stop
+    block_file_path = lbrycrd.get_block_file_path_from_number(block_file_number)
     num = 0
     progress.put_nowait((block_file_number, 1, num))
     with open(block_file_path, 'rb') as fp:
         stream = BCDataStream(fp=fp)
         blocks, txs, claims, supports, spends = [], [], [], [], []
-        for num, block_info in enumerate(db.get_blocks_not_synced(block_file_number), start=1):
-            if ctx.stop.is_set():
+        for num, block_info in enumerate(lbrycrd.db.get_file_details(block_file_number), start=1):
+            if stop.is_set():
                 return
             if num % 100 == 0:
                 progress.put_nowait((block_file_number, 1, num))
-            fp.seek(block_info.data_offset)
+            fp.seek(block_info['data_offset'])
             block = Block(stream)
             for tx in block.txs:
                 txs.append((block.block_hash, tx.position, tx.hash))
@@ -76,9 +79,11 @@ def process_block_file(block_file_number):
                         block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
                         output.claim_name, 2, output.amount, None, None
                     ))
-                except:
+                except Exception:
                     pass
-            blocks.append((block.block_hash, block.prev_block_hash, block_file_number, 0 if block.is_first_block else None))
+            blocks.append(
+                (block.block_hash, block.prev_block_hash, block_file_number, 0 if block.is_first_block else None)
+            )

             progress.put((block_file_number, 1, num))
@@ -95,7 +100,7 @@ def process_block_file(block_file_number):
         progress.put((block_file_number, 2, done_txs))
         for sql, rows in queries:
             for chunk_size, chunk_rows in chunk(rows, 10000):
-                db.execute_many_tx(sql, chunk_rows)
+                db.sync_executemany(sql, chunk_rows)
                 done_txs += int(chunk_size/step)
                 progress.put((block_file_number, 2, done_txs))
         progress.put((block_file_number, 2, total_txs))
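
The rows are written in bounded executemany() batches so progress can be reported between batches. The chunk() helper's body is not shown in this diff; below is a sketch matching its signature and the (chunk_size, chunk_rows) usage above, assuming an islice-based implementation:

from itertools import islice

def chunk(rows, step):
    it = iter(rows)
    while True:
        batch = list(islice(it, step))
        if not batch:
            return
        yield len(batch), batch  # (chunk_size, chunk_rows)

rows = [(i,) for i in range(25000)]
for chunk_size, chunk_rows in chunk(rows, 10000):
    # db.sync_executemany("INSERT INTO tx VALUES (?)", chunk_rows)  # db assumed open
    print('batch of', chunk_size)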

View file

@@ -18,7 +18,6 @@ from functools import wraps, partial

 import ecdsa
 import base58
-from sqlalchemy import text
 from aiohttp import web
 from prometheus_client import generate_latest as prom_generate_latest
 from google.protobuf.message import DecodeError

View file

@@ -15,7 +15,7 @@ ignore=words,server,rpc,schema,winpaths.py,migrator,undecorated.py
 max-parents=10
 max-args=10
 max-line-length=120
-good-names=T,t,n,i,j,k,x,y,s,f,d,h,c,e,op,db,tx,io,cachedproperty,log,id,r,iv,ts,l
+good-names=T,t,n,i,j,k,x,y,s,f,d,h,c,e,op,db,tx,io,cachedproperty,log,id,r,iv,ts,l,it,fp
 valid-metaclass-classmethod-first-arg=mcs
 disable=
     fixme,