Change orchestr8/node.py to allow manual selection of Herald.go.
This commit is contained in:
parent
359e36a5b8
commit
87c30c5b05
1 changed files with 149 additions and 2 deletions
|
@ -1,5 +1,6 @@
|
|||
# pylint: disable=import-error
|
||||
import os
|
||||
import signal
|
||||
import json
|
||||
import shutil
|
||||
import asyncio
|
||||
|
@ -30,7 +31,7 @@ try:
|
|||
from hub.elastic_sync.service import ElasticSyncService
|
||||
from hub.scribe.service import BlockchainProcessorService
|
||||
except ImportError:
|
||||
pass
|
||||
raise
|
||||
|
||||
|
||||
def get_lbcd_node_from_ledger(ledger_module):
|
||||
|
@ -226,6 +227,7 @@ class SPVNode:
|
|||
self.stopped = False
|
||||
try:
|
||||
self.data_path = tempfile.mkdtemp()
|
||||
#self.data_path = '/Users/swdev1/herald/test_db'
|
||||
conf = {
|
||||
'description': '',
|
||||
'payment_address': '',
|
||||
|
@ -249,7 +251,9 @@ class SPVNode:
|
|||
BlockchainEnv(db_dir=self.data_path, daemon_url=lbcwallet_node.rpc_url,
|
||||
reorg_limit=100, max_query_workers=0, chain='regtest', index_address_status=False)
|
||||
)
|
||||
self.server = HubServerService(ServerEnv(**conf))
|
||||
# Select Herald variant:
|
||||
self.server = HubNode("", "herald", self) # Go Herald
|
||||
#self.server = HubServerService(ServerEnv(**conf)) # Python Herald
|
||||
self.es_writer = ElasticSyncService(
|
||||
ElasticEnv(
|
||||
db_dir=self.data_path, reorg_limit=100, max_query_workers=0, chain='regtest',
|
||||
|
@ -674,3 +678,146 @@ class LBCWalletNode:
|
|||
|
||||
def get_raw_transaction(self, txid):
|
||||
return self._cli_cmnd('getrawtransaction', txid, '1')
|
||||
|
||||
|
||||
class HubProcess(asyncio.SubprocessProtocol):
|
||||
def __init__(self, ready, stopped):
|
||||
self.ready = ready
|
||||
self.stopped = stopped
|
||||
self.log = log.getChild('hub')
|
||||
self.transport = None
|
||||
|
||||
def pipe_data_received(self, fd, data):
|
||||
self.stopped.clear()
|
||||
self.ready.set()
|
||||
if self.log:
|
||||
self.log.warning(data.decode())
|
||||
#if b'error' in data.lower():
|
||||
# self.ready.set()
|
||||
# raise SystemError(data.decode())
|
||||
if b'listening on' in data:
|
||||
self.ready.set()
|
||||
str_lines = str(data.decode()).split("\n")
|
||||
for line in str_lines:
|
||||
if 'releaseTime' in line:
|
||||
print(line)
|
||||
|
||||
def process_exited(self):
|
||||
self.ready.clear()
|
||||
self.stopped.set()
|
||||
|
||||
async def stop(self):
|
||||
t = asyncio.create_task(self.stopped.wait())
|
||||
try:
|
||||
self.transport.send_signal(signal.SIGINT)
|
||||
await asyncio.wait_for(t, 3)
|
||||
# log.warning("stopped go hub")
|
||||
except asyncio.TimeoutError:
|
||||
if not t.done():
|
||||
t.cancel()
|
||||
self.transport.terminate()
|
||||
await self.stopped.wait()
|
||||
log.warning("terminated go hub")
|
||||
|
||||
|
||||
class HubNode:
|
||||
def __init__(self, url, daemon, spv_node):
|
||||
self.spv_node = spv_node
|
||||
self.latest_release_url = url
|
||||
self.project_dir = os.path.dirname(os.path.dirname(__file__))
|
||||
self.bin_dir = os.path.join(self.project_dir, 'bin')
|
||||
self.daemon_bin = os.path.join(self.bin_dir, daemon)
|
||||
self.cli_bin = os.path.join(self.bin_dir, daemon)
|
||||
self.log = log.getChild('hub')
|
||||
self.transport = None
|
||||
self.protocol = None
|
||||
self.hostname = 'localhost'
|
||||
self.rpcport = 50051 # avoid conflict with default rpc port
|
||||
self._stopped = asyncio.Event()
|
||||
self.running = asyncio.Event()
|
||||
|
||||
@property
|
||||
def stopped(self):
|
||||
return not self.running.is_set()
|
||||
|
||||
@property
|
||||
def exists(self):
|
||||
return (
|
||||
os.path.exists(self.cli_bin) and
|
||||
os.path.exists(self.daemon_bin)
|
||||
)
|
||||
|
||||
def download(self):
|
||||
downloaded_file = os.path.join(
|
||||
self.bin_dir,
|
||||
self.latest_release_url[self.latest_release_url.rfind('/')+1:]
|
||||
)
|
||||
|
||||
if not os.path.exists(self.bin_dir):
|
||||
os.mkdir(self.bin_dir)
|
||||
|
||||
if not os.path.exists(downloaded_file):
|
||||
self.log.info('Downloading: %s', self.latest_release_url)
|
||||
with urllib.request.urlopen(self.latest_release_url) as response:
|
||||
with open(downloaded_file, 'wb') as out_file:
|
||||
shutil.copyfileobj(response, out_file)
|
||||
|
||||
self.log.info('Extracting: %s', downloaded_file)
|
||||
|
||||
if downloaded_file.endswith('.zip'):
|
||||
with zipfile.ZipFile(downloaded_file) as dotzip:
|
||||
dotzip.extractall(self.bin_dir)
|
||||
# zipfile bug https://bugs.python.org/issue15795
|
||||
os.chmod(self.cli_bin, 0o755)
|
||||
os.chmod(self.daemon_bin, 0o755)
|
||||
|
||||
elif downloaded_file.endswith('.tar.gz'):
|
||||
with tarfile.open(downloaded_file) as tar:
|
||||
tar.extractall(self.bin_dir)
|
||||
|
||||
os.chmod(self.daemon_bin, 0o755)
|
||||
|
||||
return self.exists
|
||||
|
||||
def ensure(self):
|
||||
return self.exists or self.download()
|
||||
|
||||
async def start(self):
|
||||
assert self.ensure()
|
||||
loop = asyncio.get_event_loop()
|
||||
asyncio.get_child_watcher().attach_loop(loop)
|
||||
command = [
|
||||
self.daemon_bin, 'serve',
|
||||
'--db-path', self.spv_node.data_path + '/lbry-rocksdb',
|
||||
'--chain', 'regtest',
|
||||
'--json-rpc-port', str(self.spv_node.port),
|
||||
'--json-rpc-http-port', '0', # disabled
|
||||
'--esindex', self.spv_node.index_name + 'claims',
|
||||
'--notifier-port', str(self.spv_node.elastic_notifier_port),
|
||||
'--debug'
|
||||
]
|
||||
self.log.info(' '.join(command))
|
||||
self.protocol = HubProcess(self.running, self._stopped)
|
||||
try:
|
||||
self.transport, _ = await loop.subprocess_exec(
|
||||
lambda: self.protocol, *command
|
||||
)
|
||||
self.protocol.transport = self.transport
|
||||
except Exception as e:
|
||||
log.exception('failed to start go hub', exc_info=e)
|
||||
raise e
|
||||
await self.protocol.ready.wait()
|
||||
|
||||
async def stop(self, cleanup=True):
|
||||
try:
|
||||
if self.protocol:
|
||||
await self.protocol.stop()
|
||||
except Exception as e:
|
||||
log.exception('failed to stop go hub', exc_info=e)
|
||||
raise e
|
||||
finally:
|
||||
if cleanup:
|
||||
self.cleanup()
|
||||
|
||||
def cleanup(self):
|
||||
pass
|
||||
|
|
Loading…
Add table
Reference in a new issue