diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index ea041802a..31c7b9102 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -1,5 +1,6 @@ # pylint: disable=import-error import os +import signal import json import shutil import asyncio @@ -30,7 +31,7 @@ try: from hub.elastic_sync.service import ElasticSyncService from hub.scribe.service import BlockchainProcessorService except ImportError: - pass + raise def get_lbcd_node_from_ledger(ledger_module): @@ -226,6 +227,7 @@ class SPVNode: self.stopped = False try: self.data_path = tempfile.mkdtemp() + #self.data_path = '/Users/swdev1/herald/test_db' conf = { 'description': '', 'payment_address': '', @@ -249,7 +251,9 @@ class SPVNode: BlockchainEnv(db_dir=self.data_path, daemon_url=lbcwallet_node.rpc_url, reorg_limit=100, max_query_workers=0, chain='regtest', index_address_status=False) ) - self.server = HubServerService(ServerEnv(**conf)) + # Select Herald variant: + self.server = HubNode("", "herald", self) # Go Herald + #self.server = HubServerService(ServerEnv(**conf)) # Python Herald self.es_writer = ElasticSyncService( ElasticEnv( db_dir=self.data_path, reorg_limit=100, max_query_workers=0, chain='regtest', @@ -674,3 +678,146 @@ class LBCWalletNode: def get_raw_transaction(self, txid): return self._cli_cmnd('getrawtransaction', txid, '1') + + +class HubProcess(asyncio.SubprocessProtocol): + def __init__(self, ready, stopped): + self.ready = ready + self.stopped = stopped + self.log = log.getChild('hub') + self.transport = None + + def pipe_data_received(self, fd, data): + self.stopped.clear() + self.ready.set() + if self.log: + self.log.warning(data.decode()) + #if b'error' in data.lower(): + # self.ready.set() + # raise SystemError(data.decode()) + if b'listening on' in data: + self.ready.set() + str_lines = str(data.decode()).split("\n") + for line in str_lines: + if 'releaseTime' in line: + print(line) + + def process_exited(self): + self.ready.clear() + self.stopped.set() + + async def stop(self): + t = asyncio.create_task(self.stopped.wait()) + try: + self.transport.send_signal(signal.SIGINT) + await asyncio.wait_for(t, 3) + # log.warning("stopped go hub") + except asyncio.TimeoutError: + if not t.done(): + t.cancel() + self.transport.terminate() + await self.stopped.wait() + log.warning("terminated go hub") + + +class HubNode: + def __init__(self, url, daemon, spv_node): + self.spv_node = spv_node + self.latest_release_url = url + self.project_dir = os.path.dirname(os.path.dirname(__file__)) + self.bin_dir = os.path.join(self.project_dir, 'bin') + self.daemon_bin = os.path.join(self.bin_dir, daemon) + self.cli_bin = os.path.join(self.bin_dir, daemon) + self.log = log.getChild('hub') + self.transport = None + self.protocol = None + self.hostname = 'localhost' + self.rpcport = 50051 # avoid conflict with default rpc port + self._stopped = asyncio.Event() + self.running = asyncio.Event() + + @property + def stopped(self): + return not self.running.is_set() + + @property + def exists(self): + return ( + os.path.exists(self.cli_bin) and + os.path.exists(self.daemon_bin) + ) + + def download(self): + downloaded_file = os.path.join( + self.bin_dir, + self.latest_release_url[self.latest_release_url.rfind('/')+1:] + ) + + if not os.path.exists(self.bin_dir): + os.mkdir(self.bin_dir) + + if not os.path.exists(downloaded_file): + self.log.info('Downloading: %s', self.latest_release_url) + with urllib.request.urlopen(self.latest_release_url) as response: + with open(downloaded_file, 'wb') as out_file: + shutil.copyfileobj(response, out_file) + + self.log.info('Extracting: %s', downloaded_file) + + if downloaded_file.endswith('.zip'): + with zipfile.ZipFile(downloaded_file) as dotzip: + dotzip.extractall(self.bin_dir) + # zipfile bug https://bugs.python.org/issue15795 + os.chmod(self.cli_bin, 0o755) + os.chmod(self.daemon_bin, 0o755) + + elif downloaded_file.endswith('.tar.gz'): + with tarfile.open(downloaded_file) as tar: + tar.extractall(self.bin_dir) + + os.chmod(self.daemon_bin, 0o755) + + return self.exists + + def ensure(self): + return self.exists or self.download() + + async def start(self): + assert self.ensure() + loop = asyncio.get_event_loop() + asyncio.get_child_watcher().attach_loop(loop) + command = [ + self.daemon_bin, 'serve', + '--db-path', self.spv_node.data_path + '/lbry-rocksdb', + '--chain', 'regtest', + '--json-rpc-port', str(self.spv_node.port), + '--json-rpc-http-port', '0', # disabled + '--esindex', self.spv_node.index_name + 'claims', + '--notifier-port', str(self.spv_node.elastic_notifier_port), + '--debug' + ] + self.log.info(' '.join(command)) + self.protocol = HubProcess(self.running, self._stopped) + try: + self.transport, _ = await loop.subprocess_exec( + lambda: self.protocol, *command + ) + self.protocol.transport = self.transport + except Exception as e: + log.exception('failed to start go hub', exc_info=e) + raise e + await self.protocol.ready.wait() + + async def stop(self, cleanup=True): + try: + if self.protocol: + await self.protocol.stop() + except Exception as e: + log.exception('failed to stop go hub', exc_info=e) + raise e + finally: + if cleanup: + self.cleanup() + + def cleanup(self): + pass