From 47a8c005d9d4b2a8d60f955840783f784a1d53ed Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 7 Sep 2020 15:56:21 -0300 Subject: [PATCH 1/7] change zmq port for tests so it doesnt conflict with running lbrycrd --- lbry/blockchain/lbrycrd.py | 2 +- tests/integration/blockchain/test_blockchain.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/lbry/blockchain/lbrycrd.py b/lbry/blockchain/lbrycrd.py index 441f8f506..11aa79ba2 100644 --- a/lbry/blockchain/lbrycrd.py +++ b/lbry/blockchain/lbrycrd.py @@ -90,7 +90,7 @@ class Lbrycrd: Config.with_same_dir(tempfile.mkdtemp()).set( lbrycrd_rpc_port=9245 + 2, # avoid conflict with default rpc port lbrycrd_peer_port=9246 + 2, # avoid conflict with default peer port - lbrycrd_zmq_blocks="tcp://127.0.0.1:29000" + lbrycrd_zmq_blocks="tcp://127.0.0.1:29002" # avoid conflict with default port ) )) diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index e8f0de508..7804dfc34 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -309,19 +309,19 @@ class TestLbrycrdAPIs(AsyncioTestCase): await chain.stop() # lbrycrdr started with zmq, ensure_subscribable updates lbrycrd_zmq_blocks config - await chain.start('-zmqpubhashblock=tcp://127.0.0.1:29000') + await chain.start('-zmqpubhashblock=tcp://127.0.0.1:29005') self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, '') await chain.ensure_subscribable() - self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, 'tcp://127.0.0.1:29000') + self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, 'tcp://127.0.0.1:29005') await chain.stop() # lbrycrdr started with zmq, ensure_subscribable does not override lbrycrd_zmq_blocks config chain.ledger.conf.set(lbrycrd_zmq_blocks='') - await chain.start('-zmqpubhashblock=tcp://127.0.0.1:29000') + await chain.start('-zmqpubhashblock=tcp://127.0.0.1:29005') self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, '') - chain.ledger.conf.set(lbrycrd_zmq_blocks='tcp://external-ip:29000') + chain.ledger.conf.set(lbrycrd_zmq_blocks='tcp://external-ip:29005') await chain.ensure_subscribable() - self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, 'tcp://external-ip:29000') + self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, 'tcp://external-ip:29005') async def test_block_event(self): chain = Lbrycrd.temp_regtest() From a5c117b542d21db94d16677663a84e7f352ffe73 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 7 Sep 2020 17:17:51 -0300 Subject: [PATCH 2/7] wait for lbrycrd on start --- lbry/blockchain/lbrycrd.py | 2 ++ lbry/blockchain/sync/synchronizer.py | 12 +++++++++++- tests/integration/blockchain/test_blockchain.py | 8 ++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/lbry/blockchain/lbrycrd.py b/lbry/blockchain/lbrycrd.py index 11aa79ba2..a43d5f600 100644 --- a/lbry/blockchain/lbrycrd.py +++ b/lbry/blockchain/lbrycrd.py @@ -243,6 +243,8 @@ class Lbrycrd: self.subscription = None async def rpc(self, method, params=None): + if self.session.closed: + raise Exception("session is closed! We are shutting down.") message = { "jsonrpc": "1.0", "id": "1", diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index 003b24efa..95a68aea2 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -43,9 +43,19 @@ class BlockchainSync(Sync): self.advance_loop_task: Optional[asyncio.Task] = None self.advance_loop_event = asyncio.Event() + async def wait_for_chain_ready(self): + while True: + try: + return await self.chain.ensure_subscribable() + except asyncio.CancelledError: + raise + except Exception as e: + log.warning("Blockchain not ready, waiting for it: %s", str(e)) + await asyncio.sleep(1) + async def start(self): self.db.stop_event.clear() - await self.chain.ensure_subscribable() + await self.wait_for_chain_ready() self.advance_loop_task = asyncio.create_task(self.advance()) await self.advance_loop_task await self.chain.subscribe() diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 7804dfc34..a706e079e 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -613,6 +613,14 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): class TestGeneralBlockchainSync(SyncingBlockchainTestCase): + async def test_sync_waits_for_lbrycrd_to_start(self): + await self.sync.stop() + await self.chain.stop() + sync_start = asyncio.ensure_future(self.sync.start()) + await asyncio.sleep(0) + await self.chain.start() + await sync_start + self.assertTrue(sync_start.done()) # test goal is to get here without exceptions async def test_sync_advances(self): blocks = [] From cb60cd99f40255c1371046deaba4340234288427 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 10 Sep 2020 13:19:48 -0300 Subject: [PATCH 3/7] use create_task instead --- tests/integration/blockchain/test_blockchain.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index a706e079e..2e94a0990 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -616,7 +616,7 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase): async def test_sync_waits_for_lbrycrd_to_start(self): await self.sync.stop() await self.chain.stop() - sync_start = asyncio.ensure_future(self.sync.start()) + sync_start = asyncio.create_task(self.sync.start()) await asyncio.sleep(0) await self.chain.start() await sync_start From 39a4c4e5905909d77a959b8ed7b16182d941876d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 11 Sep 2020 14:03:58 -0300 Subject: [PATCH 4/7] exit if zmq misconfigured. tell whats wrong --- lbry/blockchain/lbrycrd.py | 4 +++- lbry/blockchain/sync/synchronizer.py | 5 ++++- lbry/error/__init__.py | 7 +++++++ tests/integration/blockchain/test_blockchain.py | 9 +++++---- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/lbry/blockchain/lbrycrd.py b/lbry/blockchain/lbrycrd.py index a43d5f600..31fc85e14 100644 --- a/lbry/blockchain/lbrycrd.py +++ b/lbry/blockchain/lbrycrd.py @@ -15,7 +15,7 @@ import zmq.asyncio from lbry.conf import Config from lbry.event import EventController -from lbry.error import LbrycrdEventSubscriptionError, LbrycrdUnauthorizedError +from lbry.error import LbrycrdEventSubscriptionError, LbrycrdUnauthorizedError, LbrycrdMisconfigurationError from .database import BlockchainDB from .ledger import Ledger, RegTestLedger @@ -209,6 +209,8 @@ class Lbrycrd: async def ensure_subscribable(self): zmq_notifications = await self.get_zmq_notifications() + if not zmq_notifications: + raise LbrycrdMisconfigurationError("zmqpubhashblock") subs = {e['type']: e['address'] for e in zmq_notifications} if ZMQ_BLOCK_EVENT not in subs: raise LbrycrdEventSubscriptionError(ZMQ_BLOCK_EVENT) diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index 95a68aea2..5bdf3931e 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -12,7 +12,7 @@ from lbry.service.base import Sync, BlockEvent from lbry.blockchain.lbrycrd import Lbrycrd from . import blocks as block_phase, claims as claim_phase, supports as support_phase - +from ...error import LbrycrdMisconfigurationError log = logging.getLogger(__name__) @@ -49,6 +49,9 @@ class BlockchainSync(Sync): return await self.chain.ensure_subscribable() except asyncio.CancelledError: raise + except LbrycrdMisconfigurationError as e: + log.warning(str(e)) + raise except Exception as e: log.warning("Blockchain not ready, waiting for it: %s", str(e)) await asyncio.sleep(1) diff --git a/lbry/error/__init__.py b/lbry/error/__init__.py index 57086d852..cf0cee4c5 100644 --- a/lbry/error/__init__.py +++ b/lbry/error/__init__.py @@ -417,3 +417,10 @@ class LbrycrdEventSubscriptionError(LbrycrdError): def __init__(self, event): self.event = event super().__init__(f"Lbrycrd is not publishing '{event}' events.") + + +class LbrycrdMisconfigurationError(LbrycrdError): + def __init__(self, config_key): + self.config_key = config_key + super().__init__(f"Lbrycrd is misconfigured. Please double check if" + f" {config_key} is properly set on lbrycrd.conf") diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 2e94a0990..9e9acfdc9 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -11,7 +11,7 @@ from lbry import Config, Database, RegTestLedger, Transaction, Output, Input from lbry.crypto.base58 import Base58 from lbry.schema.claim import Stream, Channel from lbry.schema.support import Support -from lbry.error import LbrycrdEventSubscriptionError, LbrycrdUnauthorizedError +from lbry.error import LbrycrdEventSubscriptionError, LbrycrdUnauthorizedError, LbrycrdMisconfigurationError from lbry.blockchain.lbrycrd import Lbrycrd from lbry.blockchain.sync import BlockchainSync from lbry.blockchain.dewies import dewies_to_lbc, lbc_to_dewies @@ -613,14 +613,15 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): class TestGeneralBlockchainSync(SyncingBlockchainTestCase): - async def test_sync_waits_for_lbrycrd_to_start(self): + async def test_sync_exits_if_zmq_is_misconfigured(self): await self.sync.stop() await self.chain.stop() sync_start = asyncio.create_task(self.sync.start()) await asyncio.sleep(0) + self.chain.ledger.conf.set(lbrycrd_zmq_blocks='') await self.chain.start() - await sync_start - self.assertTrue(sync_start.done()) # test goal is to get here without exceptions + with self.assertRaises(LbrycrdMisconfigurationError): + await asyncio.wait_for(sync_start, timeout=10) async def test_sync_advances(self): blocks = [] From d6bcbd631fc5b9456f3fd2238810ad03c1dbffad Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 11 Sep 2020 14:05:51 -0300 Subject: [PATCH 5/7] improve session close error message --- lbry/blockchain/lbrycrd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/blockchain/lbrycrd.py b/lbry/blockchain/lbrycrd.py index 31fc85e14..86dcda039 100644 --- a/lbry/blockchain/lbrycrd.py +++ b/lbry/blockchain/lbrycrd.py @@ -246,7 +246,7 @@ class Lbrycrd: async def rpc(self, method, params=None): if self.session.closed: - raise Exception("session is closed! We are shutting down.") + raise Exception("session is closed! RPC attempted during shutting down.") message = { "jsonrpc": "1.0", "id": "1", From 2a0089a4dd29cafe7cc13eacff813e95a42eb738 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 11 Sep 2020 14:16:11 -0300 Subject: [PATCH 6/7] test starting and waiting normally as before too --- tests/integration/blockchain/test_blockchain.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 9e9acfdc9..9494015aa 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -304,7 +304,7 @@ class TestLbrycrdAPIs(AsyncioTestCase): # lbrycrdr started without zmq await chain.start() - with self.assertRaises(LbrycrdEventSubscriptionError): + with self.assertRaises(LbrycrdMisconfigurationError): await chain.ensure_subscribable() await chain.stop() @@ -613,7 +613,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): class TestGeneralBlockchainSync(SyncingBlockchainTestCase): - async def test_sync_exits_if_zmq_is_misconfigured(self): + async def test_sync_waits_for_lbrycrd_to_start_but_exits_if_zmq_misconfigured(self): await self.sync.stop() await self.chain.stop() sync_start = asyncio.create_task(self.sync.start()) @@ -623,6 +623,13 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase): with self.assertRaises(LbrycrdMisconfigurationError): await asyncio.wait_for(sync_start, timeout=10) + await self.chain.stop() + await self.sync.stop() + sync_start = asyncio.create_task(self.sync.start()) + await self.chain.start('-zmqpubhashblock=tcp://127.0.0.1:29005') + await sync_start + self.assertTrue(sync_start.done()) + async def test_sync_advances(self): blocks = [] self.sync.on_block.listen(blocks.append) From a554c8838cc4191caa8c4df3954c2cdd48ee8020 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 14 Sep 2020 11:53:13 -0300 Subject: [PATCH 7/7] use previous exception insted of declaring a new one --- lbry/blockchain/lbrycrd.py | 4 +--- lbry/blockchain/sync/synchronizer.py | 9 ++++++--- lbry/error/__init__.py | 7 ------- tests/integration/blockchain/test_blockchain.py | 6 +++--- 4 files changed, 10 insertions(+), 16 deletions(-) diff --git a/lbry/blockchain/lbrycrd.py b/lbry/blockchain/lbrycrd.py index 86dcda039..d0dcce91a 100644 --- a/lbry/blockchain/lbrycrd.py +++ b/lbry/blockchain/lbrycrd.py @@ -15,7 +15,7 @@ import zmq.asyncio from lbry.conf import Config from lbry.event import EventController -from lbry.error import LbrycrdEventSubscriptionError, LbrycrdUnauthorizedError, LbrycrdMisconfigurationError +from lbry.error import LbrycrdEventSubscriptionError, LbrycrdUnauthorizedError from .database import BlockchainDB from .ledger import Ledger, RegTestLedger @@ -209,8 +209,6 @@ class Lbrycrd: async def ensure_subscribable(self): zmq_notifications = await self.get_zmq_notifications() - if not zmq_notifications: - raise LbrycrdMisconfigurationError("zmqpubhashblock") subs = {e['type']: e['address'] for e in zmq_notifications} if ZMQ_BLOCK_EVENT not in subs: raise LbrycrdEventSubscriptionError(ZMQ_BLOCK_EVENT) diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index 5bdf3931e..cea8bf751 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -10,9 +10,9 @@ from lbry.db.query_context import Event, Progress from lbry.event import BroadcastSubscription from lbry.service.base import Sync, BlockEvent from lbry.blockchain.lbrycrd import Lbrycrd +from lbry.error import LbrycrdEventSubscriptionError from . import blocks as block_phase, claims as claim_phase, supports as support_phase -from ...error import LbrycrdMisconfigurationError log = logging.getLogger(__name__) @@ -49,8 +49,11 @@ class BlockchainSync(Sync): return await self.chain.ensure_subscribable() except asyncio.CancelledError: raise - except LbrycrdMisconfigurationError as e: - log.warning(str(e)) + except LbrycrdEventSubscriptionError as e: + log.warning( + "Lbrycrd is misconfigured. Please double check if" + " zmqpubhashblock is properly set on lbrycrd.conf" + ) raise except Exception as e: log.warning("Blockchain not ready, waiting for it: %s", str(e)) diff --git a/lbry/error/__init__.py b/lbry/error/__init__.py index cf0cee4c5..57086d852 100644 --- a/lbry/error/__init__.py +++ b/lbry/error/__init__.py @@ -417,10 +417,3 @@ class LbrycrdEventSubscriptionError(LbrycrdError): def __init__(self, event): self.event = event super().__init__(f"Lbrycrd is not publishing '{event}' events.") - - -class LbrycrdMisconfigurationError(LbrycrdError): - def __init__(self, config_key): - self.config_key = config_key - super().__init__(f"Lbrycrd is misconfigured. Please double check if" - f" {config_key} is properly set on lbrycrd.conf") diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 9494015aa..f99836fd5 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -11,7 +11,7 @@ from lbry import Config, Database, RegTestLedger, Transaction, Output, Input from lbry.crypto.base58 import Base58 from lbry.schema.claim import Stream, Channel from lbry.schema.support import Support -from lbry.error import LbrycrdEventSubscriptionError, LbrycrdUnauthorizedError, LbrycrdMisconfigurationError +from lbry.error import LbrycrdEventSubscriptionError, LbrycrdUnauthorizedError from lbry.blockchain.lbrycrd import Lbrycrd from lbry.blockchain.sync import BlockchainSync from lbry.blockchain.dewies import dewies_to_lbc, lbc_to_dewies @@ -304,7 +304,7 @@ class TestLbrycrdAPIs(AsyncioTestCase): # lbrycrdr started without zmq await chain.start() - with self.assertRaises(LbrycrdMisconfigurationError): + with self.assertRaises(LbrycrdEventSubscriptionError): await chain.ensure_subscribable() await chain.stop() @@ -620,7 +620,7 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase): await asyncio.sleep(0) self.chain.ledger.conf.set(lbrycrd_zmq_blocks='') await self.chain.start() - with self.assertRaises(LbrycrdMisconfigurationError): + with self.assertRaises(LbrycrdEventSubscriptionError): await asyncio.wait_for(sync_start, timeout=10) await self.chain.stop()