Merge pull request #3040 from lbryio/fwss_wait_lbrycrd

Synchronizer: wait for lbrycrd to start if its not ready, instead of raising
This commit is contained in:
Lex Berezhny 2020-09-14 13:21:10 -04:00 committed by GitHub
commit 144eb248e3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 8 deletions

View file

@ -90,7 +90,7 @@ class Lbrycrd:
Config.with_same_dir(tempfile.mkdtemp()).set( Config.with_same_dir(tempfile.mkdtemp()).set(
lbrycrd_rpc_port=9245 + 2, # avoid conflict with default rpc port lbrycrd_rpc_port=9245 + 2, # avoid conflict with default rpc port
lbrycrd_peer_port=9246 + 2, # avoid conflict with default peer port lbrycrd_peer_port=9246 + 2, # avoid conflict with default peer port
lbrycrd_zmq_blocks="tcp://127.0.0.1:29000" lbrycrd_zmq_blocks="tcp://127.0.0.1:29002" # avoid conflict with default port
) )
)) ))
@ -243,6 +243,8 @@ class Lbrycrd:
self.subscription = None self.subscription = None
async def rpc(self, method, params=None): async def rpc(self, method, params=None):
if self.session.closed:
raise Exception("session is closed! RPC attempted during shutting down.")
message = { message = {
"jsonrpc": "1.0", "jsonrpc": "1.0",
"id": "1", "id": "1",

View file

@ -10,10 +10,10 @@ from lbry.db.query_context import Event, Progress
from lbry.event import BroadcastSubscription from lbry.event import BroadcastSubscription
from lbry.service.base import Sync, BlockEvent from lbry.service.base import Sync, BlockEvent
from lbry.blockchain.lbrycrd import Lbrycrd from lbry.blockchain.lbrycrd import Lbrycrd
from lbry.error import LbrycrdEventSubscriptionError
from . import blocks as block_phase, claims as claim_phase, supports as support_phase from . import blocks as block_phase, claims as claim_phase, supports as support_phase
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
BLOCKS_INIT_EVENT = Event.add("blockchain.sync.blocks.init", "steps") BLOCKS_INIT_EVENT = Event.add("blockchain.sync.blocks.init", "steps")
@ -43,9 +43,25 @@ class BlockchainSync(Sync):
self.advance_loop_task: Optional[asyncio.Task] = None self.advance_loop_task: Optional[asyncio.Task] = None
self.advance_loop_event = asyncio.Event() self.advance_loop_event = asyncio.Event()
async def wait_for_chain_ready(self):
while True:
try:
return await self.chain.ensure_subscribable()
except asyncio.CancelledError:
raise
except LbrycrdEventSubscriptionError as e:
log.warning(
"Lbrycrd is misconfigured. Please double check if"
" zmqpubhashblock is properly set on lbrycrd.conf"
)
raise
except Exception as e:
log.warning("Blockchain not ready, waiting for it: %s", str(e))
await asyncio.sleep(1)
async def start(self): async def start(self):
self.db.stop_event.clear() self.db.stop_event.clear()
await self.chain.ensure_subscribable() await self.wait_for_chain_ready()
self.advance_loop_task = asyncio.create_task(self.advance()) self.advance_loop_task = asyncio.create_task(self.advance())
await self.advance_loop_task await self.advance_loop_task
await self.chain.subscribe() await self.chain.subscribe()

View file

@ -309,19 +309,19 @@ class TestLbrycrdAPIs(AsyncioTestCase):
await chain.stop() await chain.stop()
# lbrycrdr started with zmq, ensure_subscribable updates lbrycrd_zmq_blocks config # lbrycrdr started with zmq, ensure_subscribable updates lbrycrd_zmq_blocks config
await chain.start('-zmqpubhashblock=tcp://127.0.0.1:29000') await chain.start('-zmqpubhashblock=tcp://127.0.0.1:29005')
self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, '') self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, '')
await chain.ensure_subscribable() await chain.ensure_subscribable()
self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, 'tcp://127.0.0.1:29000') self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, 'tcp://127.0.0.1:29005')
await chain.stop() await chain.stop()
# lbrycrdr started with zmq, ensure_subscribable does not override lbrycrd_zmq_blocks config # lbrycrdr started with zmq, ensure_subscribable does not override lbrycrd_zmq_blocks config
chain.ledger.conf.set(lbrycrd_zmq_blocks='') chain.ledger.conf.set(lbrycrd_zmq_blocks='')
await chain.start('-zmqpubhashblock=tcp://127.0.0.1:29000') await chain.start('-zmqpubhashblock=tcp://127.0.0.1:29005')
self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, '') self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, '')
chain.ledger.conf.set(lbrycrd_zmq_blocks='tcp://external-ip:29000') chain.ledger.conf.set(lbrycrd_zmq_blocks='tcp://external-ip:29005')
await chain.ensure_subscribable() await chain.ensure_subscribable()
self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, 'tcp://external-ip:29000') self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, 'tcp://external-ip:29005')
async def test_block_event(self): async def test_block_event(self):
chain = Lbrycrd.temp_regtest() chain = Lbrycrd.temp_regtest()
@ -613,6 +613,22 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
class TestGeneralBlockchainSync(SyncingBlockchainTestCase): class TestGeneralBlockchainSync(SyncingBlockchainTestCase):
async def test_sync_waits_for_lbrycrd_to_start_but_exits_if_zmq_misconfigured(self):
await self.sync.stop()
await self.chain.stop()
sync_start = asyncio.create_task(self.sync.start())
await asyncio.sleep(0)
self.chain.ledger.conf.set(lbrycrd_zmq_blocks='')
await self.chain.start()
with self.assertRaises(LbrycrdEventSubscriptionError):
await asyncio.wait_for(sync_start, timeout=10)
await self.chain.stop()
await self.sync.stop()
sync_start = asyncio.create_task(self.sync.start())
await self.chain.start('-zmqpubhashblock=tcp://127.0.0.1:29005')
await sync_start
self.assertTrue(sync_start.done())
async def test_sync_advances(self): async def test_sync_advances(self):
blocks = [] blocks = []