From 377ecd5f146229a8a1e399e84fae0cd7e1a97b90 Mon Sep 17 00:00:00 2001
From: Jeffrey Picard
Date: Mon, 11 Apr 2022 00:41:50 +0000
Subject: [PATCH] fix last outstanding test failures

Add a second notifier client for the go hub and only flag the hub as
synchronized once es, the go hub and the reader report the same height.
Reassemble notifier data on 40-byte frame boundaries, and drop the fixed
sleeps from the resolve tests, waiting on the expected header where needed.

---
Review notes (ignored by git am):
- Line structure and context indentation were rebuilt from a
  whitespace-collapsed copy of this patch; verify the context lines
  against the tree before applying.
- The post-image blob ids on the "index" lines predate the review changes.
- Env keeps elastic_notifier_host because scribe/hub/service.py still reads it.

 scribe/elasticsearch/notifier_protocol.py | 22 ++++---
 scribe/env.py                             |  3 +-
 scribe/hub/service.py                     | 68 ++++++++++++++++++++--
 tests/test_resolve_command.py             | 20 ++-----
 4 files changed, 86 insertions(+), 27 deletions(-)

diff --git a/scribe/elasticsearch/notifier_protocol.py b/scribe/elasticsearch/notifier_protocol.py
index adc2f67..ebfb6ec 100644
--- a/scribe/elasticsearch/notifier_protocol.py
+++ b/scribe/elasticsearch/notifier_protocol.py
@@ -57,7 +57,7 @@ class ElasticNotifierClientProtocol(asyncio.Protocol):
                 await self.connect()
                 first_connect = False
                 synchronized.set()
-                log.info("connected to es notifier")
+                log.info("connected to es notifier: %d", self.port)
             except Exception as e:
                 if not isinstance(e, asyncio.CancelledError):
                     log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", self.host, self.port)
@@ -75,13 +75,21 @@ class ElasticNotifierClientProtocol(asyncio.Protocol):
         self._lost_connection.clear()
 
     def connection_lost(self, exc) -> None:
+        log.warning("lost connection to notifier port %d: %s", self.port, exc)
         self.transport = None
+        self._pending = b''  # a partial notification cannot be completed by a new connection
         self._lost_connection.set()
 
     def data_received(self, data: bytes) -> None:
-        try:
-            height, block_hash = struct.unpack(b'>Q32s', data)
-        except:
-            log.exception("failed to decode %s", (data or b'').hex())
-            raise
-        self.notifications.put_nowait((height, block_hash))
+        """Queue every complete (height, block_hash) notification found in the byte stream."""
+        log.debug("received data from notifier port %d: %s", self.port, data)
+        # A stream transport may deliver several 40-byte notifications in one call or
+        # only part of one, so the incomplete tail is kept until the rest arrives.
+        # _pending is created lazily here rather than in __init__.
+        data = getattr(self, '_pending', b'') + data
+        size = struct.calcsize(b'>Q32s')
+        while len(data) >= size:
+            height, block_hash = struct.unpack(b'>Q32s', data[:size])
+            self.notifications.put_nowait((height, block_hash))
+            data = data[size:]
+        self._pending = data
diff --git a/scribe/env.py b/scribe/env.py
index b6dc305..bb15f23 100644
--- a/scribe/env.py
+++ b/scribe/env.py
@@ -38,8 +38,8 @@ class Env:
                  allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
                  payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
                  session_timeout=None, drop_client=None, description=None, daily_fee=None,
                  database_query_timeout=None, db_max_open_files=64, elastic_notifier_host=None,
                  elastic_notifier_port=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None,
-                 peer_announce=None):
+                 peer_announce=None, go_hub_notifier_port=None):
         self.logger = logging.getLogger(__name__)
         self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
@@ -49,8 +49,9 @@ class Env:
         self.host = host if host is not None else self.default('HOST', 'localhost')
         self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
         self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
         self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default('ELASTIC_NOTIFIER_HOST', 'localhost')
         self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer('ELASTIC_NOTIFIER_PORT', 19080)
+        self.go_hub_notifier_port = go_hub_notifier_port if go_hub_notifier_port is not None else self.integer('GO_HUB_NOTIFIER_PORT', 18080)
 
         self.loop_policy = self.set_event_loop_policy(
             loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None)
diff --git a/scribe/hub/service.py b/scribe/hub/service.py
index afeef59..8578fe5 100644
--- a/scribe/hub/service.py
+++ b/scribe/hub/service.py
@@ -28,9 +28,17 @@ class HubServerService(BlockchainReaderService):
         self.es_notification_client = ElasticNotifierClientProtocol(
             self.es_notifications, self.env.elastic_notifier_host, self.env.elastic_notifier_port
         )
+        self.go_hub_notifications = asyncio.Queue()
+        self.go_hub_notifications_client = ElasticNotifierClientProtocol(
+            self.go_hub_notifications, '127.0.0.1', self.env.go_hub_notifier_port
+        )
+        self.es_synchronized = asyncio.Event()
+        self.go_hub_synchronized = asyncio.Event()
         self.synchronized = asyncio.Event()
         self._es_height = None
         self._es_block_hash = None
+        self._go_hub_height = None
+        self._go_hub_block_hash = None
 
     def clear_caches(self):
         self.session_manager.clear_caches()
@@ -61,7 +69,9 @@ class HubServerService(BlockchainReaderService):
                 await self.mempool.on_block(touched, height)
                 self.log.info("reader advanced to %i", height)
                 if self._es_height == self.db.db_height:
-                    self.synchronized.set()
+                    self.es_synchronized.set()
+                if self._go_hub_height == self.db.db_height:
+                    self.go_hub_synchronized.set()
         if self.mempool_notifications:
             await self.mempool.on_mempool(
                 set(self.mempool.touched_hashXs), self.mempool_notifications, self.db.db_height
@@ -69,21 +79,70 @@ class HubServerService(BlockchainReaderService):
         self.mempool_notifications.clear()
         self.notifications_to_send.clear()
 
     async def receive_es_notifications(self, synchronized: asyncio.Event):
         synchronized.set()
         try:
             while True:
                 self._es_height, self._es_block_hash = await self.es_notifications.get()
                 self.clear_search_cache()
                 if self.last_state and self._es_block_hash == self.last_state.tip:
-                    self.synchronized.set()
+                    self.es_synchronized.set()
                     self.log.info("es and reader are in sync at block %i", self.last_state.height)
                 else:
                     self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height,
                                   self.db.db_height)
+        except Exception as e:
+            if not isinstance(e, asyncio.CancelledError):
+                self.log.exception("es notification error: %s", e)
+            raise
         finally:
             self.es_notification_client.close()
 
+    async def receive_go_hub_notifications(self, synchronized: asyncio.Event):
+        """Track the go hub's tip and flag `go_hub_synchronized` when it matches the reader's tip."""
+        synchronized.set()
+        try:
+            while True:
+                self._go_hub_height, self._go_hub_block_hash = await self.go_hub_notifications.get()
+                if self.last_state and self._go_hub_block_hash == self.last_state.tip:
+                    self.go_hub_synchronized.set()
+                    self.log.info("go hub and reader are in sync at block %i", self.last_state.height)
+                else:
+                    self.log.info("go hub and reader are not yet in sync (block %s vs %s)", self._go_hub_height,
+                                  self.db.db_height)
+        except Exception as e:
+            if not isinstance(e, asyncio.CancelledError):
+                self.log.exception("go hub notification error: %s", e)
+            raise
+        finally:
+            self.go_hub_notifications_client.close()
+
+    async def wait_for_sync(self, synchronized: asyncio.Event):
+        """Set `self.synchronized` whenever es, the go hub and the reader are at the same height."""
+        synchronized.set()
+        try:
+            while True:
+                waiters = [
+                    asyncio.create_task(self.es_synchronized.wait()),
+                    asyncio.create_task(self.go_hub_synchronized.wait()),
+                ]
+                try:
+                    await asyncio.wait(waiters, return_when=asyncio.FIRST_COMPLETED)
+                finally:
+                    # cancelling a finished task is a no-op; this also cleans up if we are cancelled
+                    for waiter in waiters:
+                        waiter.cancel()
+                # re-arm both events: nothing else clears them, so without this the
+                # next wait() would return immediately and the loop would spin
+                self.es_synchronized.clear()
+                self.go_hub_synchronized.clear()
+                if self._go_hub_height == self._es_height == self.db.db_height:
+                    self.synchronized.set()
+        except Exception as e:
+            if not isinstance(e, asyncio.CancelledError):
+                self.log.exception("wait_for_sync error: %s", e)
+            raise
+
     async def start_status_server(self):
         if self.env.udp_port and int(self.env.udp_port):
             await self.status_server.start(
@@ -93,13 +152,16 @@ class HubServerService(BlockchainReaderService):
 
     def _iter_start_tasks(self):
         yield self.start_status_server()
-        yield self.start_cancellable(self.es_notification_client.maintain_connection)
         yield self.start_cancellable(self.mempool.send_notifications_forever)
+        yield self.start_cancellable(self.es_notification_client.maintain_connection)
+        yield self.start_cancellable(self.go_hub_notifications_client.maintain_connection)
         yield self.start_cancellable(self.refresh_blocks_forever)
         yield self.finished_initial_catch_up.wait()
         self.block_count_metric.set(self.last_state.height)
         yield self.start_prometheus()
         yield self.start_cancellable(self.receive_es_notifications)
+        yield self.start_cancellable(self.receive_go_hub_notifications)
+        yield self.start_cancellable(self.wait_for_sync)
         yield self.session_manager.search_index.start()
         yield self.start_cancellable(self.session_manager.serve, self.mempool)
 
diff --git a/tests/test_resolve_command.py b/tests/test_resolve_command.py
index 4ac1bc2..babe23a 100644
--- a/tests/test_resolve_command.py
+++ b/tests/test_resolve_command.py
@@ -415,8 +415,6 @@ class ResolveCommand(BaseResolveTestCase):
         uri = 'lbry://@abc/on-channel-claim'
         # now, claim something on this channel (it will update the invalid claim, but we save and forcefully restore)
         valid_claim = await self.stream_create('on-channel-claim', '0.00000001', channel_id=channel['claim_id'])
-        print("sleeeeeeeeeping.....")
-        time.sleep(1) # FIXME: don't wait for the claim to be saved
         # resolves normally
         response = await self.resolve(uri)
         self.assertTrue(response['is_channel_signature_valid'])
@@ -1536,6 +1534,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
         self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
 
     async def test_reorg(self):
+        await asyncio.wait_for(self.on_header(206), 3.0)
         self.assertEqual(self.ledger.headers.height, 206)
 
         channel_name = '@abc'
@@ -1587,8 +1586,6 @@ class ResolveAfterReorg(BaseResolveTestCase):
         )
 
         await self.stream_abandon(stream_id)
-        print("Sleeeeeeeping to wait for abandoned stream to be removed")
-        time.sleep(1)
         self.assertNotIn('error', await self.resolve(channel_name))
         self.assertIn('error', await self.resolve(stream_name))
         self.assertEqual(channel_id, (await self.assertMatchWinningClaim(channel_name)).claim_hash.hex())
@@ -1596,16 +1593,12 @@ class ResolveAfterReorg(BaseResolveTestCase):
         # TODO: check @abc/foo too
 
         await self.reorg(206)
-        print("Sleeeeeeeping to wait for reorg")
-        time.sleep(1)
         self.assertNotIn('error', await self.resolve(channel_name))
         self.assertIn('error', await self.resolve(stream_name))
         self.assertEqual(channel_id, (await self.assertMatchWinningClaim(channel_name)).claim_hash.hex())
         await self.assertNoClaimForName(stream_name)
 
         await self.channel_abandon(channel_id)
-        print("Sleeeeeeeping to wait for abandoned channel to be removed")
-        time.sleep(1)
         self.assertIn('error', await self.resolve(channel_name))
         self.assertIn('error', await self.resolve(stream_name))
         await self.reorg(206)
@@ -1637,6 +1630,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
         await self.assertBlockHash(208)
 
         claim = await self.resolve('hovercraft')
+
         self.assertEqual(claim['txid'], broadcast_tx.id)
         self.assertEqual(claim['height'], 208)
 
@@ -1644,6 +1638,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
         invalidated_block_hash = (await self.ledger.headers.hash(208)).decode()
         block_207 = await self.blockchain.get_block(invalidated_block_hash)
         self.assertIn(claim['txid'], block_207['tx'])
+        await asyncio.wait_for(self.on_header(208), 3.0)
         self.assertEqual(208, claim['height'])
 
         # reorg the last block dropping our claim tx
@@ -1653,8 +1648,6 @@ class ResolveAfterReorg(BaseResolveTestCase):
 
         # wait for the client to catch up and verify the reorg
         await asyncio.wait_for(self.on_header(209), 3.0)
-        # FIXME: add endpoint in golang to get height and wait for that to hit the right height
-        time.sleep(1)
         await self.assertBlockHash(207)
         await self.assertBlockHash(208)
         await self.assertBlockHash(209)
@@ -1684,8 +1677,6 @@ class ResolveAfterReorg(BaseResolveTestCase):
 
         # wait for the client to catch up
         await asyncio.wait_for(self.on_header(210), 3.0)
-        # FIXME: add endpoint in golang to get height and wait for that to hit the right height
-        time.sleep(1)
 
         # verify the claim is in the new block and that it is returned by claim_search
         republished = await self.resolve('hovercraft')
@@ -1719,6 +1710,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
         self.assertEqual(self.ledger.headers.height, 208)
         await self.assertBlockHash(208)
 
+        await asyncio.wait_for(self.on_header(208), 30.0)
         claim = await self.resolve('hovercraft')
         self.assertEqual(claim['txid'], broadcast_tx.id)
         self.assertEqual(claim['height'], 208)
@@ -1736,8 +1728,6 @@ class ResolveAfterReorg(BaseResolveTestCase):
 
         # wait for the client to catch up and verify the reorg
         await asyncio.wait_for(self.on_header(209), 30.0)
-        # FIXME: add endpoint in golang to get height and wait for that to hit the right height
-        time.sleep(1)
         await self.assertBlockHash(207)
         await self.assertBlockHash(208)
         await self.assertBlockHash(209)
@@ -1767,8 +1757,6 @@ class ResolveAfterReorg(BaseResolveTestCase):
 
         # wait for the client to catch up
         await asyncio.wait_for(self.on_header(210), 1.0)
-        # FIXME: add endpoint in golang to get height and wait for that to hit the right height
-        time.sleep(1)
 
         # verify the claim is in the new block and that it is returned by claim_search
         republished = await self.resolve('hovercraft')