diff --git a/lbry/blockchain/__init__.py b/lbry/blockchain/__init__.py index 6c1cc0ec5..d42e3e1b1 100644 --- a/lbry/blockchain/__init__.py +++ b/lbry/blockchain/__init__.py @@ -1,2 +1,2 @@ -from .sync import BlockSync -from .lbrycrd import Lbrycrd \ No newline at end of file +from .sync import BlockchainSync +from .lbrycrd import Lbrycrd diff --git a/lbry/blockchain/database.py b/lbry/blockchain/database.py new file mode 100644 index 000000000..227e91af4 --- /dev/null +++ b/lbry/blockchain/database.py @@ -0,0 +1,37 @@ +import os.path +import sqlite3 +from typing import Optional + + +class BlockchainDB: + + __slots__ = 'file_path', 'db' + + def __init__(self, directory: str): + self.file_path = f"file:{os.path.join(directory, 'block_index.sqlite')}?mode=ro" + self.db: Optional[sqlite3.Connection] = None + + def open(self): + self.db = sqlite3.connect(self.file_path, uri=True, timeout=60.0 * 5) + self.db.row_factory = sqlite3.Row + + def execute(self, *args, **kwargs): + if self.db is None: + self.open() + return list(self.db.execute(*args, **kwargs).fetchall()) + + def get_block_files(self): + return self.execute( + """ + SELECT file as file_number, COUNT(hash) as blocks, SUM(txcount) as txs + FROM block_info GROUP BY file ORDER BY file ASC; + """ + ) + + def get_file_details(self, block_file): + return self.execute( + """ + SELECT datapos as data_offset, height, hash as block_hash, txCount as txs + FROM block_info WHERE file = ? ORDER BY datapos ASC; + """, (block_file,) + ) diff --git a/lbry/blockchain/lbrycrd.py b/lbry/blockchain/lbrycrd.py index 24b5ecf2b..76d9a25a5 100644 --- a/lbry/blockchain/lbrycrd.py +++ b/lbry/blockchain/lbrycrd.py @@ -8,6 +8,7 @@ import tempfile import urllib.request from typing import Optional from binascii import hexlify +from concurrent.futures import ThreadPoolExecutor import aiohttp import zmq @@ -15,12 +16,13 @@ import zmq.asyncio from lbry.wallet.stream import StreamController +from .database import BlockchainDB + log = logging.getLogger(__name__) -download_url = ( -# 'https://github.com/lbryio/lbrycrd/releases/download/v0.17.4.2/lbrycrd-linux-1742.zip' - 'https://build.lbry.io/lbrycrd/fix_flush_to_not_corrupt/lbrycrd-linux.zip' +DOWNLOAD_URL = ( + 'https://github.com/lbryio/lbrycrd/releases/download/v0.17.4.4/lbrycrd-linux-1744.zip' ) @@ -81,6 +83,22 @@ class Lbrycrd: self.on_block = self._on_block_controller.stream self.on_block.listen(lambda e: log.info('%s %s', hexlify(e['hash']), e['msg'])) + self.db = BlockchainDB(self.actual_data_dir) + self.executor = ThreadPoolExecutor(max_workers=1) + + def get_block_file_path_from_number(self, block_file_number): + return os.path.join(self.actual_data_dir, 'blocks', f'blk{block_file_number:05}.dat') + + async def get_block_files(self): + return await asyncio.get_running_loop().run_in_executor( + self.executor, self.db.get_block_files + ) + + async def get_file_details(self, block_file): + return await asyncio.get_running_loop().run_in_executor( + self.executor, self.db.get_file_details, block_file + ) + @classmethod def temp_regtest(cls): return cls(tempfile.mkdtemp(), True) @@ -98,23 +116,23 @@ class Lbrycrd: async def download(self): downloaded_file = os.path.join( - self.bin_dir, download_url[download_url.rfind('/')+1:] + self.bin_dir, DOWNLOAD_URL[DOWNLOAD_URL.rfind('/')+1:] ) if not os.path.exists(self.bin_dir): os.mkdir(self.bin_dir) if not os.path.exists(downloaded_file): - log.info('Downloading: %s', download_url) + log.info('Downloading: %s', DOWNLOAD_URL) async with aiohttp.ClientSession() as session: - async with session.get(download_url) as response: + async with session.get(DOWNLOAD_URL) as response: with open(downloaded_file, 'wb') as out_file: while True: chunk = await response.content.read(4096) if not chunk: break out_file.write(chunk) - with urllib.request.urlopen(download_url) as response: + with urllib.request.urlopen(DOWNLOAD_URL) as response: with open(downloaded_file, 'wb') as out_file: shutil.copyfileobj(response, out_file) @@ -133,7 +151,7 @@ class Lbrycrd: def get_start_command(self, *args): if self.regtest: - args += '-regtest', + args += ('-regtest',) return ( self.daemon_bin, f'-datadir={self.data_dir}', @@ -174,7 +192,7 @@ class Lbrycrd: if not self.subscribed: self.subscribed = True ctx = zmq.asyncio.Context.instance() - sock = ctx.socket(zmq.SUB) + sock = ctx.socket(zmq.SUB) # pylint: disable=no-member sock.connect(self.subscription_url) sock.subscribe("hashblock") self.subscription = asyncio.create_task(self.subscription_handler(sock)) diff --git a/lbry/blockchain/sync.py b/lbry/blockchain/sync.py index a6374c580..6392b667a 100644 --- a/lbry/blockchain/sync.py +++ b/lbry/blockchain/sync.py @@ -1,16 +1,14 @@ import os import asyncio import logging -import tqdm from threading import Thread from multiprocessing import Queue, Event from concurrent import futures -from typing import Dict, Tuple from lbry.wallet.stream import StreamController +from lbry.db import Database from .lbrycrd import Lbrycrd -from .db import AsyncBlockchainDB from . import worker @@ -31,49 +29,10 @@ class ProgressMonitorThread(Thread): def run(self): asyncio.set_event_loop(self.loop) - block_bar = tqdm.tqdm( - desc='total parsing', total=sum(s['total_blocks'] for s in self.state.values()), - unit='blocks', bar_format=self.FORMAT - ) - tx_bar = tqdm.tqdm( - desc='total loading', total=sum(s['total_txs'] for s in self.state.values()), - unit='txs', bar_format=self.FORMAT - ) - bars: Dict[int, tqdm.tqdm] = {} while True: msg = self.queue.get() if msg == self.STOP: return - file_num, msg_type, done = msg - bar, state = bars.get(file_num, None), self.state[file_num] - if msg_type == 1: - if bar is None: - bar = bars[file_num] = tqdm.tqdm( - desc=f'├─ blk{file_num:05}.dat parsing', total=state['total_blocks'], - unit='blocks', bar_format=self.FORMAT - ) - change = done - state['done_blocks'] - state['done_blocks'] = done - bar.update(change) - block_bar.update(change) - if state['total_blocks'] == done: - bar.set_description('✔ '+bar.desc[3:]) - bar.close() - bars.pop(file_num) - elif msg_type == 2: - if bar is None: - bar = bars[file_num] = tqdm.tqdm( - desc=f'├─ blk{file_num:05}.dat loading', total=state['total_txs'], - unit='txs', bar_format=self.FORMAT - ) - change = done - state['done_txs'] - state['done_txs'] = done - bar.update(change) - tx_bar.update(change) - if state['total_txs'] == done: - bar.set_description('✔ '+bar.desc[3:]) - bar.close() - bars.pop(file_num) self.stream_controller.add(msg) def shutdown(self): @@ -87,25 +46,19 @@ class ProgressMonitorThread(Thread): self.shutdown() -class BlockSync: +class BlockchainSync: - def __init__(self, chain: Lbrycrd, use_process_pool=False): + def __init__(self, chain: Lbrycrd, db: Database, use_process_pool=False): self.chain = chain + self.db = db self.use_process_pool = use_process_pool - self.db = AsyncBlockchainDB.from_path(self.chain.actual_data_dir) self._on_progress_controller = StreamController() self.on_progress = self._on_progress_controller.stream - async def start(self): - await self.db.open() - - async def stop(self): - await self.db.close() - def get_worker_pool(self, queue, full_stop) -> futures.Executor: args = dict( initializer=worker.initializer, - initargs=(self.chain.actual_data_dir, queue, full_stop) + initargs=(self.chain.data_dir, self.chain.regtest, self.db.db_path, queue, full_stop) ) if not self.use_process_pool: return futures.ThreadPoolExecutor(max_workers=1, **args) @@ -118,7 +71,7 @@ class BlockSync: jobs = [] queue, full_stop = Queue(), Event() executor = self.get_worker_pool(queue, full_stop) - files = list(await self.db.get_block_files_not_synced()) + files = list(await self.chain.get_block_files_not_synced()) state = { file.file_number: { 'status': worker.PENDING, diff --git a/lbry/blockchain/worker.py b/lbry/blockchain/worker.py index a0cf48c41..7eadade4b 100644 --- a/lbry/blockchain/worker.py +++ b/lbry/blockchain/worker.py @@ -5,7 +5,8 @@ from dataclasses import dataclass from itertools import islice from lbry.wallet.bcd_data_stream import BCDataStream -from .db import BlockchainDB +from lbry.db import Database +from .lbrycrd import Lbrycrd from .block import Block @@ -23,7 +24,8 @@ def chunk(rows, step): @dataclass class WorkerContext: - db: BlockchainDB + lbrycrd: Lbrycrd + db: Database progress: Queue stop: Event @@ -31,9 +33,10 @@ class WorkerContext: context: ContextVar[Optional[WorkerContext]] = ContextVar('context') -def initializer(data_dir: str, progress: Queue, stop: Event): +def initializer(data_dir: str, regtest: bool, db_path: str, progress: Queue, stop: Event): context.set(WorkerContext( - db=BlockchainDB(data_dir).open(), + lbrycrd=Lbrycrd(data_dir, regtest), + db=Database(db_path).sync_open(), progress=progress, stop=stop )) @@ -41,19 +44,19 @@ def initializer(data_dir: str, progress: Queue, stop: Event): def process_block_file(block_file_number): ctx: WorkerContext = context.get() - db, progress, stop = ctx.db, ctx.progress, ctx.stop - block_file_path = db.get_block_file_path_from_number(block_file_number) + lbrycrd, db, progress, stop = ctx.lbrycrd, ctx.db, ctx.progress, ctx.stop + block_file_path = lbrycrd.get_block_file_path_from_number(block_file_number) num = 0 progress.put_nowait((block_file_number, 1, num)) with open(block_file_path, 'rb') as fp: stream = BCDataStream(fp=fp) blocks, txs, claims, supports, spends = [], [], [], [], [] - for num, block_info in enumerate(db.get_blocks_not_synced(block_file_number), start=1): - if ctx.stop.is_set(): + for num, block_info in enumerate(lbrycrd.db.get_file_details(block_file_number), start=1): + if stop.is_set(): return if num % 100 == 0: progress.put_nowait((block_file_number, 1, num)) - fp.seek(block_info.data_offset) + fp.seek(block_info['data_offset']) block = Block(stream) for tx in block.txs: txs.append((block.block_hash, tx.position, tx.hash)) @@ -76,9 +79,11 @@ def process_block_file(block_file_number): block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash, output.claim_name, 2, output.amount, None, None )) - except: + except Exception: pass - blocks.append((block.block_hash, block.prev_block_hash, block_file_number, 0 if block.is_first_block else None)) + blocks.append( + (block.block_hash, block.prev_block_hash, block_file_number, 0 if block.is_first_block else None) + ) progress.put((block_file_number, 1, num)) @@ -95,7 +100,7 @@ def process_block_file(block_file_number): progress.put((block_file_number, 2, done_txs)) for sql, rows in queries: for chunk_size, chunk_rows in chunk(rows, 10000): - db.execute_many_tx(sql, chunk_rows) + db.sync_executemany(sql, chunk_rows) done_txs += int(chunk_size/step) progress.put((block_file_number, 2, done_txs)) progress.put((block_file_number, 2, total_txs)) diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 032d51d97..45b0003af 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -18,7 +18,6 @@ from functools import wraps, partial import ecdsa import base58 -from sqlalchemy import text from aiohttp import web from prometheus_client import generate_latest as prom_generate_latest from google.protobuf.message import DecodeError diff --git a/setup.cfg b/setup.cfg index c5d268dbb..ac14fc554 100644 --- a/setup.cfg +++ b/setup.cfg @@ -15,7 +15,7 @@ ignore=words,server,rpc,schema,winpaths.py,migrator,undecorated.py max-parents=10 max-args=10 max-line-length=120 -good-names=T,t,n,i,j,k,x,y,s,f,d,h,c,e,op,db,tx,io,cachedproperty,log,id,r,iv,ts,l +good-names=T,t,n,i,j,k,x,y,s,f,d,h,c,e,op,db,tx,io,cachedproperty,log,id,r,iv,ts,l,it,fp valid-metaclass-classmethod-first-arg=mcs disable= fixme,