Compare commits
4 commits
Author | SHA1 | Date | |
---|---|---|---|
|
6deeb67585 | ||
|
c53e76011f | ||
|
87c30c5b05 | ||
|
359e36a5b8 |
4 changed files with 167 additions and 11 deletions
2
Makefile
2
Makefile
|
@ -23,4 +23,4 @@ idea:
|
||||||
cp -r scripts/idea/* .idea
|
cp -r scripts/idea/* .idea
|
||||||
|
|
||||||
elastic-docker:
|
elastic-docker:
|
||||||
docker run -d -v lbryhub:/usr/share/elasticsearch/data -p 9200:9200 -p 9300:9300 -e"ES_JAVA_OPTS=-Xms512m -Xmx512m" -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.12.1
|
docker run -d --env network.publish_host=127.0.0.1 -v lbryhub:/usr/share/elasticsearch/data -p 9200:9200 -p 9300:9300 -e"ES_JAVA_OPTS=-Xms512m -Xmx512m" -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.12.1
|
||||||
|
|
|
@ -30,6 +30,7 @@ from lbry.wallet.bip32 import PublicKey, PrivateKey
|
||||||
from lbry.wallet.coinselection import CoinSelector
|
from lbry.wallet.coinselection import CoinSelector
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
log.setLevel("DEBUG")
|
||||||
|
|
||||||
LedgerType = Type['BaseLedger']
|
LedgerType = Type['BaseLedger']
|
||||||
|
|
||||||
|
@ -506,6 +507,7 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
|
|
||||||
def process_status_update(self, update):
|
def process_status_update(self, update):
|
||||||
address, remote_status = update
|
address, remote_status = update
|
||||||
|
print(f"**** status update {address} {remote_status}")
|
||||||
self._update_tasks.add(self.update_history(address, remote_status))
|
self._update_tasks.add(self.update_history(address, remote_status))
|
||||||
|
|
||||||
async def update_history(self, address, remote_status, address_manager: AddressManager = None,
|
async def update_history(self, address, remote_status, address_manager: AddressManager = None,
|
||||||
|
@ -518,6 +520,7 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
remote_history = await self.network.retriable_call(self.network.get_history, address)
|
remote_history = await self.network.retriable_call(self.network.get_history, address)
|
||||||
|
print(f'>>>>>> {remote_history}')
|
||||||
remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history))
|
remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history))
|
||||||
we_need = set(remote_history) - set(local_history)
|
we_need = set(remote_history) - set(local_history)
|
||||||
if not we_need:
|
if not we_need:
|
||||||
|
@ -549,7 +552,7 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
continue
|
continue
|
||||||
to_request[i] = (txid, remote_height)
|
to_request[i] = (txid, remote_height)
|
||||||
|
|
||||||
log.debug(
|
log.warning(
|
||||||
"request %i transactions, %i/%i for %s are already synced", len(to_request), len(already_synced),
|
"request %i transactions, %i/%i for %s are already synced", len(to_request), len(already_synced),
|
||||||
len(remote_history), address
|
len(remote_history), address
|
||||||
)
|
)
|
||||||
|
@ -558,8 +561,8 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
self.maybe_has_channel_key(tx)
|
self.maybe_has_channel_key(tx)
|
||||||
pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:"
|
pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:"
|
||||||
if len(pending_synced_history) % 100 == 0:
|
if len(pending_synced_history) % 100 == 0:
|
||||||
log.info("Syncing address %s: %d/%d", address, len(pending_synced_history), len(to_request))
|
log.warning("Syncing address %s: %d/%d", address, len(pending_synced_history), len(to_request))
|
||||||
log.info("Sync finished for address %s: %d/%d", address, len(pending_synced_history), len(to_request))
|
log.warning("Sync finished for address %s: %d/%d", address, len(pending_synced_history), len(to_request))
|
||||||
|
|
||||||
assert len(pending_synced_history) == len(remote_history), \
|
assert len(pending_synced_history) == len(remote_history), \
|
||||||
f"{len(pending_synced_history)} vs {len(remote_history)} for {address}"
|
f"{len(pending_synced_history)} vs {len(remote_history)} for {address}"
|
||||||
|
@ -606,7 +609,7 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
self._known_addresses_out_of_sync.add(address)
|
self._known_addresses_out_of_sync.add(address)
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
log.debug("finished syncing transaction history for %s, %i known txs", address, len(local_history))
|
log.warning("finished syncing transaction history for %s, %i known txs", address, len(local_history))
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def maybe_verify_transaction(self, tx, remote_height, merkle=None):
|
async def maybe_verify_transaction(self, tx, remote_height, merkle=None):
|
||||||
|
@ -621,6 +624,8 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
header = await self.headers.get(remote_height)
|
header = await self.headers.get(remote_height)
|
||||||
tx.position = merkle['pos']
|
tx.position = merkle['pos']
|
||||||
tx.is_verified = merkle_root == header['merkle_root']
|
tx.is_verified = merkle_root == header['merkle_root']
|
||||||
|
if not tx.is_verified:
|
||||||
|
print(f"&&&&&&& {tx.height}: {merkle_root} != {header['merkle_root']}")
|
||||||
return tx
|
return tx
|
||||||
|
|
||||||
def maybe_has_channel_key(self, tx):
|
def maybe_has_channel_key(self, tx):
|
||||||
|
|
|
@ -76,7 +76,9 @@ class ClientSession(BaseClientSession):
|
||||||
raise asyncio.TimeoutError
|
raise asyncio.TimeoutError
|
||||||
if done:
|
if done:
|
||||||
try:
|
try:
|
||||||
return request.result()
|
result = request.result()
|
||||||
|
log.warning("sent %s%s to %s:%i (%i timeout) result: %s", method, tuple(args), self.server[0], self.server[1], self.timeout, result)
|
||||||
|
return result
|
||||||
except ConnectionResetError:
|
except ConnectionResetError:
|
||||||
log.error(
|
log.error(
|
||||||
"wallet server (%s) reset connection upon our %s request, json of %i args is %i bytes",
|
"wallet server (%s) reset connection upon our %s request, json of %i args is %i bytes",
|
||||||
|
@ -304,7 +306,7 @@ class Network:
|
||||||
await client.ensure_server_version()
|
await client.ensure_server_version()
|
||||||
return client
|
return client
|
||||||
except (asyncio.TimeoutError, ConnectionError, OSError, IncompatibleWalletServerError, RPCError):
|
except (asyncio.TimeoutError, ConnectionError, OSError, IncompatibleWalletServerError, RPCError):
|
||||||
log.warning("Connecting to %s:%d failed", host, port)
|
log.exception("Connecting to %s:%d failed", host, port)
|
||||||
client._close()
|
client._close()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -328,7 +330,8 @@ class Network:
|
||||||
features = await client.send_request('server.features', [])
|
features = await client.send_request('server.features', [])
|
||||||
self.client, self.server_features = client, features
|
self.client, self.server_features = client, features
|
||||||
log.debug("discover other hubs %s:%i", *client.server)
|
log.debug("discover other hubs %s:%i", *client.server)
|
||||||
await self._update_hubs(await client.send_request('server.peers.subscribe', []))
|
# TODO: Enable this after herald.go supports 'server.peers.subscribe'.
|
||||||
|
#await self._update_hubs(await client.send_request('server.peers.subscribe', []))
|
||||||
log.info("subscribe to headers %s:%i", *client.server)
|
log.info("subscribe to headers %s:%i", *client.server)
|
||||||
self._update_remote_height((await self.subscribe_headers(),))
|
self._update_remote_height((await self.subscribe_headers(),))
|
||||||
self._on_connected_controller.add(True)
|
self._on_connected_controller.add(True)
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
# pylint: disable=import-error
|
# pylint: disable=import-error
|
||||||
import os
|
import os
|
||||||
|
import signal
|
||||||
import json
|
import json
|
||||||
import shutil
|
import shutil
|
||||||
import asyncio
|
import asyncio
|
||||||
|
@ -30,7 +31,7 @@ try:
|
||||||
from hub.elastic_sync.service import ElasticSyncService
|
from hub.elastic_sync.service import ElasticSyncService
|
||||||
from hub.scribe.service import BlockchainProcessorService
|
from hub.scribe.service import BlockchainProcessorService
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
raise
|
||||||
|
|
||||||
|
|
||||||
def get_lbcd_node_from_ledger(ledger_module):
|
def get_lbcd_node_from_ledger(ledger_module):
|
||||||
|
@ -226,6 +227,7 @@ class SPVNode:
|
||||||
self.stopped = False
|
self.stopped = False
|
||||||
try:
|
try:
|
||||||
self.data_path = tempfile.mkdtemp()
|
self.data_path = tempfile.mkdtemp()
|
||||||
|
#self.data_path = '/Users/swdev1/herald/test_db'
|
||||||
conf = {
|
conf = {
|
||||||
'description': '',
|
'description': '',
|
||||||
'payment_address': '',
|
'payment_address': '',
|
||||||
|
@ -249,7 +251,9 @@ class SPVNode:
|
||||||
BlockchainEnv(db_dir=self.data_path, daemon_url=lbcwallet_node.rpc_url,
|
BlockchainEnv(db_dir=self.data_path, daemon_url=lbcwallet_node.rpc_url,
|
||||||
reorg_limit=100, max_query_workers=0, chain='regtest', index_address_status=False)
|
reorg_limit=100, max_query_workers=0, chain='regtest', index_address_status=False)
|
||||||
)
|
)
|
||||||
self.server = HubServerService(ServerEnv(**conf))
|
# Select Herald variant:
|
||||||
|
self.server = HubNode("", "herald", self) # Go Herald
|
||||||
|
#self.server = HubServerService(ServerEnv(**conf)) # Python Herald
|
||||||
self.es_writer = ElasticSyncService(
|
self.es_writer = ElasticSyncService(
|
||||||
ElasticEnv(
|
ElasticEnv(
|
||||||
db_dir=self.data_path, reorg_limit=100, max_query_workers=0, chain='regtest',
|
db_dir=self.data_path, reorg_limit=100, max_query_workers=0, chain='regtest',
|
||||||
|
@ -285,7 +289,8 @@ class SPVNode:
|
||||||
cleanup and self.cleanup()
|
cleanup and self.cleanup()
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
shutil.rmtree(self.data_path, ignore_errors=True)
|
log.error("skipping cleanup of data_path: %s", self.data_path)
|
||||||
|
#shutil.rmtree(self.data_path, ignore_errors=True)
|
||||||
|
|
||||||
|
|
||||||
class LBCDProcess(asyncio.SubprocessProtocol):
|
class LBCDProcess(asyncio.SubprocessProtocol):
|
||||||
|
@ -674,3 +679,146 @@ class LBCWalletNode:
|
||||||
|
|
||||||
def get_raw_transaction(self, txid):
|
def get_raw_transaction(self, txid):
|
||||||
return self._cli_cmnd('getrawtransaction', txid, '1')
|
return self._cli_cmnd('getrawtransaction', txid, '1')
|
||||||
|
|
||||||
|
|
||||||
|
class HubProcess(asyncio.SubprocessProtocol):
|
||||||
|
def __init__(self, ready, stopped):
|
||||||
|
self.ready = ready
|
||||||
|
self.stopped = stopped
|
||||||
|
self.log = log.getChild('hub')
|
||||||
|
self.transport = None
|
||||||
|
|
||||||
|
def pipe_data_received(self, fd, data):
|
||||||
|
self.stopped.clear()
|
||||||
|
self.ready.set()
|
||||||
|
if self.log:
|
||||||
|
self.log.warning(data.decode())
|
||||||
|
#if b'error' in data.lower():
|
||||||
|
# self.ready.set()
|
||||||
|
# raise SystemError(data.decode())
|
||||||
|
if b'listening on' in data:
|
||||||
|
self.ready.set()
|
||||||
|
str_lines = str(data.decode()).split("\n")
|
||||||
|
for line in str_lines:
|
||||||
|
if 'releaseTime' in line:
|
||||||
|
print(line)
|
||||||
|
|
||||||
|
def process_exited(self):
|
||||||
|
self.ready.clear()
|
||||||
|
self.stopped.set()
|
||||||
|
|
||||||
|
async def stop(self):
|
||||||
|
t = asyncio.create_task(self.stopped.wait())
|
||||||
|
try:
|
||||||
|
self.transport.send_signal(signal.SIGINT)
|
||||||
|
await asyncio.wait_for(t, 3)
|
||||||
|
# log.warning("stopped go hub")
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
if not t.done():
|
||||||
|
t.cancel()
|
||||||
|
self.transport.terminate()
|
||||||
|
await self.stopped.wait()
|
||||||
|
log.warning("terminated go hub")
|
||||||
|
|
||||||
|
|
||||||
|
class HubNode:
|
||||||
|
def __init__(self, url, daemon, spv_node):
|
||||||
|
self.spv_node = spv_node
|
||||||
|
self.latest_release_url = url
|
||||||
|
self.project_dir = os.path.dirname(os.path.dirname(__file__))
|
||||||
|
self.bin_dir = os.path.join(self.project_dir, 'bin')
|
||||||
|
self.daemon_bin = os.path.join(self.bin_dir, daemon)
|
||||||
|
self.cli_bin = os.path.join(self.bin_dir, daemon)
|
||||||
|
self.log = log.getChild('hub')
|
||||||
|
self.transport = None
|
||||||
|
self.protocol = None
|
||||||
|
self.hostname = 'localhost'
|
||||||
|
self.rpcport = 50051 # avoid conflict with default rpc port
|
||||||
|
self._stopped = asyncio.Event()
|
||||||
|
self.running = asyncio.Event()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def stopped(self):
|
||||||
|
return not self.running.is_set()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def exists(self):
|
||||||
|
return (
|
||||||
|
os.path.exists(self.cli_bin) and
|
||||||
|
os.path.exists(self.daemon_bin)
|
||||||
|
)
|
||||||
|
|
||||||
|
def download(self):
|
||||||
|
downloaded_file = os.path.join(
|
||||||
|
self.bin_dir,
|
||||||
|
self.latest_release_url[self.latest_release_url.rfind('/')+1:]
|
||||||
|
)
|
||||||
|
|
||||||
|
if not os.path.exists(self.bin_dir):
|
||||||
|
os.mkdir(self.bin_dir)
|
||||||
|
|
||||||
|
if not os.path.exists(downloaded_file):
|
||||||
|
self.log.info('Downloading: %s', self.latest_release_url)
|
||||||
|
with urllib.request.urlopen(self.latest_release_url) as response:
|
||||||
|
with open(downloaded_file, 'wb') as out_file:
|
||||||
|
shutil.copyfileobj(response, out_file)
|
||||||
|
|
||||||
|
self.log.info('Extracting: %s', downloaded_file)
|
||||||
|
|
||||||
|
if downloaded_file.endswith('.zip'):
|
||||||
|
with zipfile.ZipFile(downloaded_file) as dotzip:
|
||||||
|
dotzip.extractall(self.bin_dir)
|
||||||
|
# zipfile bug https://bugs.python.org/issue15795
|
||||||
|
os.chmod(self.cli_bin, 0o755)
|
||||||
|
os.chmod(self.daemon_bin, 0o755)
|
||||||
|
|
||||||
|
elif downloaded_file.endswith('.tar.gz'):
|
||||||
|
with tarfile.open(downloaded_file) as tar:
|
||||||
|
tar.extractall(self.bin_dir)
|
||||||
|
|
||||||
|
os.chmod(self.daemon_bin, 0o755)
|
||||||
|
|
||||||
|
return self.exists
|
||||||
|
|
||||||
|
def ensure(self):
|
||||||
|
return self.exists or self.download()
|
||||||
|
|
||||||
|
async def start(self):
|
||||||
|
assert self.ensure()
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
asyncio.get_child_watcher().attach_loop(loop)
|
||||||
|
command = [
|
||||||
|
self.daemon_bin, 'serve',
|
||||||
|
'--db-path', self.spv_node.data_path + '/lbry-rocksdb',
|
||||||
|
'--chain', 'regtest',
|
||||||
|
'--json-rpc-port', str(self.spv_node.port),
|
||||||
|
'--json-rpc-http-port', '0', # disabled
|
||||||
|
'--esindex', self.spv_node.index_name + 'claims',
|
||||||
|
'--notifier-port', str(self.spv_node.elastic_notifier_port),
|
||||||
|
'--debug'
|
||||||
|
]
|
||||||
|
self.log.info(' '.join(command))
|
||||||
|
self.protocol = HubProcess(self.running, self._stopped)
|
||||||
|
try:
|
||||||
|
self.transport, _ = await loop.subprocess_exec(
|
||||||
|
lambda: self.protocol, *command
|
||||||
|
)
|
||||||
|
self.protocol.transport = self.transport
|
||||||
|
except Exception as e:
|
||||||
|
log.exception('failed to start go hub', exc_info=e)
|
||||||
|
raise e
|
||||||
|
await self.protocol.ready.wait()
|
||||||
|
|
||||||
|
async def stop(self, cleanup=True):
|
||||||
|
try:
|
||||||
|
if self.protocol:
|
||||||
|
await self.protocol.stop()
|
||||||
|
except Exception as e:
|
||||||
|
log.exception('failed to stop go hub', exc_info=e)
|
||||||
|
raise e
|
||||||
|
finally:
|
||||||
|
if cleanup:
|
||||||
|
self.cleanup()
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
pass
|
||||||
|
|
Loading…
Reference in a new issue