import os import struct import shutil import asyncio import logging import zipfile import tempfile import urllib.request from typing import Optional from binascii import hexlify import aiohttp import zmq import zmq.asyncio from lbry.conf import Config from lbry.event import EventController from lbry.error import LbrycrdEventSubscriptionError, LbrycrdUnauthorizedError from .database import BlockchainDB from .ledger import Ledger, RegTestLedger log = logging.getLogger(__name__) DOWNLOAD_URL = ( 'https://github.com/lbryio/lbrycrd/releases/download/v0.17.4.6/lbrycrd-linux-1746.zip' ) class Process(asyncio.SubprocessProtocol): IGNORE_OUTPUT = [ b'keypool keep', b'keypool reserve', b'keypool return', ] def __init__(self): self.ready = asyncio.Event() self.stopped = asyncio.Event() def pipe_data_received(self, fd, data): if not any(ignore in data for ignore in self.IGNORE_OUTPUT): if b'Error:' in data: log.error(data.decode()) else: for line in data.decode().splitlines(): log.debug(line.rstrip()) if b'Error:' in data: self.ready.set() raise SystemError(data.decode()) if b'Done loading' in data: self.ready.set() def process_exited(self): self.stopped.set() self.ready.set() ZMQ_BLOCK_EVENT = 'pubhashblock' class Lbrycrd: def __init__(self, ledger: Ledger): self.ledger, self.conf = ledger, ledger.conf self.data_dir = self.actual_data_dir = ledger.conf.lbrycrd_dir if self.is_regtest: self.actual_data_dir = os.path.join(self.data_dir, 'regtest') self.blocks_dir = os.path.join(self.actual_data_dir, 'blocks') self.bin_dir = os.path.join(os.path.dirname(__file__), 'bin') self.daemon_bin = os.path.join(self.bin_dir, 'lbrycrdd') self.cli_bin = os.path.join(self.bin_dir, 'lbrycrd-cli') self.protocol = None self.transport = None self.subscribed = False self.subscription: Optional[asyncio.Task] = None self.default_generate_address = None self._on_block_controller = EventController() self.on_block = self._on_block_controller.stream self.on_block.listen(lambda e: log.info('%s %s', hexlify(e['hash']), e['msg'])) self.db = BlockchainDB(self.actual_data_dir) self.session: Optional[aiohttp.ClientSession] = None @classmethod def temp_regtest(cls): return cls(RegTestLedger( Config.with_same_dir(tempfile.mkdtemp()).set( lbrycrd_rpc_port=9245 + 2, # avoid conflict with default rpc port lbrycrd_peer_port=9246 + 2, # avoid conflict with default peer port lbrycrd_zmq_blocks="tcp://127.0.0.1:29000" ) )) @staticmethod def get_block_file_name(block_file_number): return f'blk{block_file_number:05}.dat' def get_block_file_path(self, block_file_number): return os.path.join( self.actual_data_dir, 'blocks', self.get_block_file_name(block_file_number) ) @property def is_regtest(self): return isinstance(self.ledger, RegTestLedger) @property def rpc_url(self): return ( f'http://{self.conf.lbrycrd_rpc_user}:{self.conf.lbrycrd_rpc_pass}' f'@{self.conf.lbrycrd_rpc_host}:{self.conf.lbrycrd_rpc_port}/' ) @property def exists(self): return ( os.path.exists(self.cli_bin) and os.path.exists(self.daemon_bin) ) async def download(self): downloaded_file = os.path.join( self.bin_dir, DOWNLOAD_URL[DOWNLOAD_URL.rfind('/')+1:] ) if not os.path.exists(self.bin_dir): os.mkdir(self.bin_dir) if not os.path.exists(downloaded_file): log.info('Downloading: %s', DOWNLOAD_URL) async with aiohttp.ClientSession() as session: async with session.get(DOWNLOAD_URL) as response: with open(downloaded_file, 'wb') as out_file: while True: chunk = await response.content.read(4096) if not chunk: break out_file.write(chunk) with urllib.request.urlopen(DOWNLOAD_URL) as response: with open(downloaded_file, 'wb') as out_file: shutil.copyfileobj(response, out_file) log.info('Extracting: %s', downloaded_file) with zipfile.ZipFile(downloaded_file) as dotzip: dotzip.extractall(self.bin_dir) # zipfile bug https://bugs.python.org/issue15795 os.chmod(self.cli_bin, 0o755) os.chmod(self.daemon_bin, 0o755) return self.exists async def ensure(self): return self.exists or await self.download() def get_start_command(self, *args): if self.is_regtest: args += ('-regtest',) if self.conf.lbrycrd_zmq_blocks: args += (f'-zmqpubhashblock={self.conf.lbrycrd_zmq_blocks}',) return ( self.daemon_bin, f'-datadir={self.data_dir}', f'-port={self.conf.lbrycrd_peer_port}', f'-rpcport={self.conf.lbrycrd_rpc_port}', f'-rpcuser={self.conf.lbrycrd_rpc_user}', f'-rpcpassword={self.conf.lbrycrd_rpc_pass}', '-server', '-printtoconsole', *args ) async def open(self): self.session = aiohttp.ClientSession() await self.db.open() async def close(self): await self.db.close() if self.session is not None: await self.session.close() async def start(self, *args): loop = asyncio.get_running_loop() command = self.get_start_command(*args) log.info(' '.join(command)) self.transport, self.protocol = await loop.subprocess_exec(Process, *command) await self.protocol.ready.wait() assert not self.protocol.stopped.is_set() await self.open() async def stop(self, cleanup=True): try: await self.close() self.transport.terminate() await self.protocol.stopped.wait() assert self.transport.get_returncode() == 0, "lbrycrd daemon exit with error" self.transport.close() finally: if cleanup: await self.cleanup() async def cleanup(self): await asyncio.get_running_loop().run_in_executor( None, shutil.rmtree, self.data_dir, True ) async def ensure_subscribable(self): zmq_notifications = await self.get_zmq_notifications() subs = {e['type']: e['address'] for e in zmq_notifications} if ZMQ_BLOCK_EVENT not in subs: raise LbrycrdEventSubscriptionError(ZMQ_BLOCK_EVENT) if not self.conf.lbrycrd_zmq_blocks: self.conf.lbrycrd_zmq_blocks = subs[ZMQ_BLOCK_EVENT] async def subscribe(self): if not self.subscribed: self.subscribed = True ctx = zmq.asyncio.Context.instance() sock = ctx.socket(zmq.SUB) # pylint: disable=no-member sock.connect(self.conf.lbrycrd_zmq_blocks) sock.subscribe("hashblock") self.subscription = asyncio.create_task(self.subscription_handler(sock)) async def subscription_handler(self, sock): try: while self.subscribed: msg = await sock.recv_multipart() await self._on_block_controller.add({ 'hash': msg[1], 'msg': struct.unpack('