diff --git a/lbry/blockchain/lbrycrd.py b/lbry/blockchain/lbrycrd.py index 441f8f506..d0dcce91a 100644 --- a/lbry/blockchain/lbrycrd.py +++ b/lbry/blockchain/lbrycrd.py @@ -90,7 +90,7 @@ class Lbrycrd: Config.with_same_dir(tempfile.mkdtemp()).set( lbrycrd_rpc_port=9245 + 2, # avoid conflict with default rpc port lbrycrd_peer_port=9246 + 2, # avoid conflict with default peer port - lbrycrd_zmq_blocks="tcp://127.0.0.1:29000" + lbrycrd_zmq_blocks="tcp://127.0.0.1:29002" # avoid conflict with default port ) )) @@ -243,6 +243,8 @@ class Lbrycrd: self.subscription = None async def rpc(self, method, params=None): + if self.session.closed: + raise Exception("session is closed! RPC attempted during shutting down.") message = { "jsonrpc": "1.0", "id": "1", diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index 003b24efa..cea8bf751 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -10,10 +10,10 @@ from lbry.db.query_context import Event, Progress from lbry.event import BroadcastSubscription from lbry.service.base import Sync, BlockEvent from lbry.blockchain.lbrycrd import Lbrycrd +from lbry.error import LbrycrdEventSubscriptionError from . import blocks as block_phase, claims as claim_phase, supports as support_phase - log = logging.getLogger(__name__) BLOCKS_INIT_EVENT = Event.add("blockchain.sync.blocks.init", "steps") @@ -43,9 +43,25 @@ class BlockchainSync(Sync): self.advance_loop_task: Optional[asyncio.Task] = None self.advance_loop_event = asyncio.Event() + async def wait_for_chain_ready(self): + while True: + try: + return await self.chain.ensure_subscribable() + except asyncio.CancelledError: + raise + except LbrycrdEventSubscriptionError as e: + log.warning( + "Lbrycrd is misconfigured. Please double check if" + " zmqpubhashblock is properly set on lbrycrd.conf" + ) + raise + except Exception as e: + log.warning("Blockchain not ready, waiting for it: %s", str(e)) + await asyncio.sleep(1) + async def start(self): self.db.stop_event.clear() - await self.chain.ensure_subscribable() + await self.wait_for_chain_ready() self.advance_loop_task = asyncio.create_task(self.advance()) await self.advance_loop_task await self.chain.subscribe() diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index e8f0de508..f99836fd5 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -309,19 +309,19 @@ class TestLbrycrdAPIs(AsyncioTestCase): await chain.stop() # lbrycrdr started with zmq, ensure_subscribable updates lbrycrd_zmq_blocks config - await chain.start('-zmqpubhashblock=tcp://127.0.0.1:29000') + await chain.start('-zmqpubhashblock=tcp://127.0.0.1:29005') self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, '') await chain.ensure_subscribable() - self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, 'tcp://127.0.0.1:29000') + self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, 'tcp://127.0.0.1:29005') await chain.stop() # lbrycrdr started with zmq, ensure_subscribable does not override lbrycrd_zmq_blocks config chain.ledger.conf.set(lbrycrd_zmq_blocks='') - await chain.start('-zmqpubhashblock=tcp://127.0.0.1:29000') + await chain.start('-zmqpubhashblock=tcp://127.0.0.1:29005') self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, '') - chain.ledger.conf.set(lbrycrd_zmq_blocks='tcp://external-ip:29000') + chain.ledger.conf.set(lbrycrd_zmq_blocks='tcp://external-ip:29005') await chain.ensure_subscribable() - self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, 'tcp://external-ip:29000') + self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, 'tcp://external-ip:29005') async def test_block_event(self): chain = Lbrycrd.temp_regtest() @@ -613,6 +613,22 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): class TestGeneralBlockchainSync(SyncingBlockchainTestCase): + async def test_sync_waits_for_lbrycrd_to_start_but_exits_if_zmq_misconfigured(self): + await self.sync.stop() + await self.chain.stop() + sync_start = asyncio.create_task(self.sync.start()) + await asyncio.sleep(0) + self.chain.ledger.conf.set(lbrycrd_zmq_blocks='') + await self.chain.start() + with self.assertRaises(LbrycrdEventSubscriptionError): + await asyncio.wait_for(sync_start, timeout=10) + + await self.chain.stop() + await self.sync.stop() + sync_start = asyncio.create_task(self.sync.start()) + await self.chain.start('-zmqpubhashblock=tcp://127.0.0.1:29005') + await sync_start + self.assertTrue(sync_start.done()) async def test_sync_advances(self): blocks = []