Compare commits

...

2 commits

Author SHA1 Message Date
Jeffrey Picard 377ecd5f14 fix last outstanding test failures 2022-05-02 09:51:13 +00:00
Jeffrey Picard cd34564d78 Changes to support go hub in testing 2022-05-02 09:51:13 +00:00
9 changed files with 3183 additions and 2046 deletions

View file

@ -234,9 +234,7 @@ class HubDB:
# winning resolution
controlling = self.get_controlling_claim(normalized_name)
if not controlling:
# print(f"none controlling for lbry://{normalized_name}")
return
# print(f"resolved controlling lbry://{normalized_name}#{controlling.claim_hash.hex()}")
return self._fs_get_claim_by_hash(controlling.claim_hash)
amount_order = max(int(amount_order or 1), 1)

View file

@ -57,7 +57,7 @@ class ElasticNotifierClientProtocol(asyncio.Protocol):
await self.connect()
first_connect = False
synchronized.set()
log.info("connected to es notifier")
log.warning("connected to es notifier: %d", self.port)
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", self.host, self.port)
@ -75,13 +75,17 @@ class ElasticNotifierClientProtocol(asyncio.Protocol):
self._lost_connection.clear()
def connection_lost(self, exc) -> None:
log.warning("lost connection to notifier port %d: %s", self.port, exc)
self.transport = None
self._lost_connection.set()
def data_received(self, data: bytes) -> None:
try:
height, block_hash = struct.unpack(b'>Q32s', data)
except:
log.exception("failed to decode %s", (data or b'').hex())
raise
self.notifications.put_nowait((height, block_hash))
log.warning("received data from notifier port %d: %s", self.port, data)
while data:
chunk, data = data[:40], data[40:]
try:
height, block_hash = struct.unpack(b'>Q32s', chunk)
except:
log.exception("failed to decode %s", (chunk or b'').hex())
raise
self.notifications.put_nowait((height, block_hash))

View file

@ -38,8 +38,8 @@ class Env:
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, db_max_open_files=64, elastic_notifier_host=None,
elastic_notifier_port=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None,
database_query_timeout=None, db_max_open_files=64, elastic_notifier_port=None,
go_hub_notifier_port=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None,
peer_announce=None):
self.logger = logging.getLogger(__name__)
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
@ -49,8 +49,8 @@ class Env:
self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default('ELASTIC_NOTIFIER_HOST', 'localhost')
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer('ELASTIC_NOTIFIER_PORT', 19080)
self.go_hub_notifier_port = go_hub_notifier_port if go_hub_notifier_port is not None else self.integer('GO_HUB_NOTIFIER_PORT', 18080)
self.loop_policy = self.set_event_loop_policy(
loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None)

View file

@ -28,9 +28,17 @@ class HubServerService(BlockchainReaderService):
self.es_notification_client = ElasticNotifierClientProtocol(
self.es_notifications, self.env.elastic_notifier_host, self.env.elastic_notifier_port
)
self.go_hub_notifications = asyncio.Queue()
self.go_hub_notifications_client = ElasticNotifierClientProtocol(
self.go_hub_notifications, '127.0.0.1', self.env.go_hub_notifier_port
)
self.es_synchronized = asyncio.Event()
self.go_hub_synchronized = asyncio.Event()
self.synchronized = asyncio.Event()
self._es_height = None
self._es_block_hash = None
self._go_hub_height = None
self._go_hub_block_hash = None
def clear_caches(self):
self.session_manager.clear_caches()
@ -61,7 +69,9 @@ class HubServerService(BlockchainReaderService):
await self.mempool.on_block(touched, height)
self.log.info("reader advanced to %i", height)
if self._es_height == self.db.db_height:
self.synchronized.set()
self.es_synchronized.set()
if self._go_hub_height == self.db.db_height:
self.go_hub_synchronized.set()
if self.mempool_notifications:
await self.mempool.on_mempool(
set(self.mempool.touched_hashXs), self.mempool_notifications, self.db.db_height
@ -69,21 +79,63 @@ class HubServerService(BlockchainReaderService):
self.mempool_notifications.clear()
self.notifications_to_send.clear()
async def receive_es_notifications(self, synchronized: asyncio.Event):
async def receive_es_notifications(self, synchronized: asyncio.Event): ####
synchronized.set()
try:
while True:
self._es_height, self._es_block_hash = await self.es_notifications.get()
self.clear_search_cache()
if self.last_state and self._es_block_hash == self.last_state.tip:
self.synchronized.set()
self.es_synchronized.set()
self.log.info("es and reader are in sync at block %i", self.last_state.height)
else:
self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height,
self.db.db_height)
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
self.log.exception("es notification error: %s", e)
raise e
finally:
self.es_notification_client.close()
async def receive_go_hub_notifications(self, synchronized: asyncio.Event): ####
synchronized.set()
try:
while True:
self._go_hub_height, self._go_hub_block_hash = await self.go_hub_notifications.get()
if self.last_state and self._go_hub_block_hash == self.last_state.tip:
self.go_hub_synchronized.set()
self.log.info("go hub and reader are in sync at block %i", self.last_state.height)
else:
self.log.info("go hub and reader are not yet in sync (block %s vs %s)", self._go_hub_height,
self.db.db_height)
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
self.log.exception("go hub notification error: %s", e)
raise e
finally:
self.go_hub_notifications_client.close()
async def wait_for_sync(self, synchronized: asyncio.Event):
synchronized.set()
try:
while True:
es_sync = asyncio.create_task(self.es_synchronized.wait())
go_hub_sync = asyncio.create_task(self.go_hub_synchronized.wait())
await asyncio.wait([es_sync, go_hub_sync], return_when=asyncio.FIRST_COMPLETED)
if not es_sync.done():
es_sync.cancel()
if not go_hub_sync.done():
go_hub_sync.cancel()
if self._go_hub_height == self._es_height == self.db.db_height:
self.synchronized.set()
# self.log.info("reader, hub, and es are in sync at block %i", self.db.db_height)
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
self.log.exception("wait_for_sync error: %s", e)
raise e
async def start_status_server(self):
if self.env.udp_port and int(self.env.udp_port):
await self.status_server.start(
@ -93,13 +145,16 @@ class HubServerService(BlockchainReaderService):
def _iter_start_tasks(self):
yield self.start_status_server()
yield self.start_cancellable(self.es_notification_client.maintain_connection)
yield self.start_cancellable(self.mempool.send_notifications_forever)
yield self.start_cancellable(self.es_notification_client.maintain_connection)
yield self.start_cancellable(self.go_hub_notifications_client.maintain_connection)
yield self.start_cancellable(self.refresh_blocks_forever)
yield self.finished_initial_catch_up.wait()
self.block_count_metric.set(self.last_state.height)
yield self.start_prometheus()
yield self.start_cancellable(self.receive_es_notifications)
yield self.start_cancellable(self.receive_go_hub_notifications)
yield self.start_cancellable(self.wait_for_sync)
yield self.session_manager.search_index.start()
yield self.start_cancellable(self.session_manager.serve, self.mempool)

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,4 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

View file

@ -20,7 +20,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
syntax='proto3',
serialized_options=b'Z$github.com/lbryio/hub/protobuf/go/pb',
create_key=_descriptor._internal_create_key,
serialized_pb=b'\n\thub.proto\x12\x02pb\x1a\x0cresult.proto\"\x0e\n\x0c\x45mptyMessage\".\n\rServerMessage\x12\x0f\n\x07\x61\x64\x64ress\x18\x01 \x01(\t\x12\x0c\n\x04port\x18\x02 \x01(\t\"N\n\x0cHelloMessage\x12\x0c\n\x04port\x18\x01 \x01(\t\x12\x0c\n\x04host\x18\x02 \x01(\t\x12\"\n\x07servers\x18\x03 \x03(\x0b\x32\x11.pb.ServerMessage\"0\n\x0fInvertibleField\x12\x0e\n\x06invert\x18\x01 \x01(\x08\x12\r\n\x05value\x18\x02 \x03(\t\"\x1c\n\x0bStringValue\x12\r\n\x05value\x18\x01 \x01(\t\"\x1a\n\tBoolValue\x12\r\n\x05value\x18\x01 \x01(\x08\"\x1c\n\x0bUInt32Value\x12\r\n\x05value\x18\x01 \x01(\r\"j\n\nRangeField\x12\x1d\n\x02op\x18\x01 \x01(\x0e\x32\x11.pb.RangeField.Op\x12\r\n\x05value\x18\x02 \x03(\x05\".\n\x02Op\x12\x06\n\x02\x45Q\x10\x00\x12\x07\n\x03LTE\x10\x01\x12\x07\n\x03GTE\x10\x02\x12\x06\n\x02LT\x10\x03\x12\x06\n\x02GT\x10\x04\"\x8e\x0c\n\rSearchRequest\x12%\n\x08\x63laim_id\x18\x01 \x01(\x0b\x32\x13.pb.InvertibleField\x12\'\n\nchannel_id\x18\x02 \x01(\x0b\x32\x13.pb.InvertibleField\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\r\n\x05limit\x18\x04 \x01(\x05\x12\x10\n\x08order_by\x18\x05 \x03(\t\x12\x0e\n\x06offset\x18\x06 \x01(\r\x12\x16\n\x0eis_controlling\x18\x07 \x01(\x08\x12\x1d\n\x15last_take_over_height\x18\x08 \x01(\t\x12\x12\n\nclaim_name\x18\t \x01(\t\x12\x17\n\x0fnormalized_name\x18\n \x01(\t\x12#\n\x0btx_position\x18\x0b \x03(\x0b\x32\x0e.pb.RangeField\x12\x1e\n\x06\x61mount\x18\x0c \x03(\x0b\x32\x0e.pb.RangeField\x12!\n\ttimestamp\x18\r \x03(\x0b\x32\x0e.pb.RangeField\x12*\n\x12\x63reation_timestamp\x18\x0e \x03(\x0b\x32\x0e.pb.RangeField\x12\x1e\n\x06height\x18\x0f \x03(\x0b\x32\x0e.pb.RangeField\x12\'\n\x0f\x63reation_height\x18\x10 \x03(\x0b\x32\x0e.pb.RangeField\x12)\n\x11\x61\x63tivation_height\x18\x11 \x03(\x0b\x32\x0e.pb.RangeField\x12)\n\x11\x65xpiration_height\x18\x12 \x03(\x0b\x32\x0e.pb.RangeField\x12$\n\x0crelease_time\x18\x13 \x03(\x0b\x32\x0e.pb.RangeField\x12\x11\n\tshort_url\x18\x14 \x01(\t\x12\x15\n\rcanonical_url\x18\x15 \x01(\t\x12\r\n\x05title\x18\x16 \x01(\t\x12\x0e\n\x06\x61uthor\x18\x17 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x18 \x01(\t\x12\x12\n\nclaim_type\x18\x19 \x03(\t\x12$\n\x0crepost_count\x18\x1a \x03(\x0b\x32\x0e.pb.RangeField\x12\x13\n\x0bstream_type\x18\x1b \x03(\t\x12\x12\n\nmedia_type\x18\x1c \x03(\t\x12\"\n\nfee_amount\x18\x1d \x03(\x0b\x32\x0e.pb.RangeField\x12\x14\n\x0c\x66\x65\x65_currency\x18\x1e \x01(\t\x12 \n\x08\x64uration\x18\x1f \x03(\x0b\x32\x0e.pb.RangeField\x12\x19\n\x11reposted_claim_id\x18 \x01(\t\x12#\n\x0b\x63\x65nsor_type\x18! \x03(\x0b\x32\x0e.pb.RangeField\x12\x19\n\x11\x63laims_in_channel\x18\" \x01(\t\x12)\n\x12is_signature_valid\x18$ \x01(\x0b\x32\r.pb.BoolValue\x12(\n\x10\x65\x66\x66\x65\x63tive_amount\x18% \x03(\x0b\x32\x0e.pb.RangeField\x12&\n\x0esupport_amount\x18& \x03(\x0b\x32\x0e.pb.RangeField\x12&\n\x0etrending_score\x18\' \x03(\x0b\x32\x0e.pb.RangeField\x12\r\n\x05tx_id\x18+ \x01(\t\x12 \n\x07tx_nout\x18, \x01(\x0b\x32\x0f.pb.UInt32Value\x12\x11\n\tsignature\x18- \x01(\t\x12\x18\n\x10signature_digest\x18. \x01(\t\x12\x18\n\x10public_key_bytes\x18/ \x01(\t\x12\x15\n\rpublic_key_id\x18\x30 \x01(\t\x12\x10\n\x08\x61ny_tags\x18\x31 \x03(\t\x12\x10\n\x08\x61ll_tags\x18\x32 \x03(\t\x12\x10\n\x08not_tags\x18\x33 \x03(\t\x12\x1d\n\x15has_channel_signature\x18\x34 \x01(\x08\x12!\n\nhas_source\x18\x35 \x01(\x0b\x32\r.pb.BoolValue\x12 \n\x18limit_claims_per_channel\x18\x36 \x01(\x05\x12\x15\n\rany_languages\x18\x37 \x03(\t\x12\x15\n\rall_languages\x18\x38 \x03(\t\x12\x19\n\x11remove_duplicates\x18\x39 \x01(\x08\x12\x11\n\tno_totals\x18: \x01(\x08\x12\x0f\n\x07sd_hash\x18; \x01(\t2\x88\x03\n\x03Hub\x12*\n\x06Search\x12\x11.pb.SearchRequest\x1a\x0b.pb.Outputs\"\x00\x12+\n\x04Ping\x12\x10.pb.EmptyMessage\x1a\x0f.pb.StringValue\"\x00\x12-\n\x05Hello\x12\x10.pb.HelloMessage\x1a\x10.pb.HelloMessage\"\x00\x12/\n\x07\x41\x64\x64Peer\x12\x11.pb.ServerMessage\x1a\x0f.pb.StringValue\"\x00\x12\x35\n\rPeerSubscribe\x12\x11.pb.ServerMessage\x1a\x0f.pb.StringValue\"\x00\x12.\n\x07Version\x12\x10.pb.EmptyMessage\x1a\x0f.pb.StringValue\"\x00\x12/\n\x08\x46\x65\x61tures\x12\x10.pb.EmptyMessage\x1a\x0f.pb.StringValue\"\x00\x12\x30\n\tBroadcast\x12\x10.pb.EmptyMessage\x1a\x0f.pb.UInt32Value\"\x00\x42&Z$github.com/lbryio/hub/protobuf/go/pbb\x06proto3'
serialized_pb=b'\n\thub.proto\x12\x02pb\x1a\x0cresult.proto\"\x0e\n\x0c\x45mptyMessage\".\n\rServerMessage\x12\x0f\n\x07\x61\x64\x64ress\x18\x01 \x01(\t\x12\x0c\n\x04port\x18\x02 \x01(\t\"N\n\x0cHelloMessage\x12\x0c\n\x04port\x18\x01 \x01(\t\x12\x0c\n\x04host\x18\x02 \x01(\t\x12\"\n\x07servers\x18\x03 \x03(\x0b\x32\x11.pb.ServerMessage\"0\n\x0fInvertibleField\x12\x0e\n\x06invert\x18\x01 \x01(\x08\x12\r\n\x05value\x18\x02 \x03(\t\"\x1c\n\x0bStringValue\x12\r\n\x05value\x18\x01 \x01(\t\"\x1c\n\x0bStringArray\x12\r\n\x05value\x18\x01 \x03(\t\"\x1a\n\tBoolValue\x12\r\n\x05value\x18\x01 \x01(\x08\"\x1c\n\x0bUInt32Value\x12\r\n\x05value\x18\x01 \x01(\r\"j\n\nRangeField\x12\x1d\n\x02op\x18\x01 \x01(\x0e\x32\x11.pb.RangeField.Op\x12\r\n\x05value\x18\x02 \x03(\x05\".\n\x02Op\x12\x06\n\x02\x45Q\x10\x00\x12\x07\n\x03LTE\x10\x01\x12\x07\n\x03GTE\x10\x02\x12\x06\n\x02LT\x10\x03\x12\x06\n\x02GT\x10\x04\"\xfd\x0b\n\rSearchRequest\x12%\n\x08\x63laim_id\x18\x01 \x01(\x0b\x32\x13.pb.InvertibleField\x12\'\n\nchannel_id\x18\x02 \x01(\x0b\x32\x13.pb.InvertibleField\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\r\n\x05limit\x18\x04 \x01(\x05\x12\x10\n\x08order_by\x18\x05 \x03(\t\x12\x0e\n\x06offset\x18\x06 \x01(\r\x12\x16\n\x0eis_controlling\x18\x07 \x01(\x08\x12\x1d\n\x15last_take_over_height\x18\x08 \x01(\t\x12\x12\n\nclaim_name\x18\t \x01(\t\x12\x17\n\x0fnormalized_name\x18\n \x01(\t\x12#\n\x0btx_position\x18\x0b \x03(\x0b\x32\x0e.pb.RangeField\x12\x1e\n\x06\x61mount\x18\x0c \x03(\x0b\x32\x0e.pb.RangeField\x12!\n\ttimestamp\x18\r \x03(\x0b\x32\x0e.pb.RangeField\x12*\n\x12\x63reation_timestamp\x18\x0e \x03(\x0b\x32\x0e.pb.RangeField\x12\x1e\n\x06height\x18\x0f \x03(\x0b\x32\x0e.pb.RangeField\x12\'\n\x0f\x63reation_height\x18\x10 \x03(\x0b\x32\x0e.pb.RangeField\x12)\n\x11\x61\x63tivation_height\x18\x11 \x03(\x0b\x32\x0e.pb.RangeField\x12)\n\x11\x65xpiration_height\x18\x12 \x03(\x0b\x32\x0e.pb.RangeField\x12$\n\x0crelease_time\x18\x13 \x03(\x0b\x32\x0e.pb.RangeField\x12\x11\n\tshort_url\x18\x14 \x01(\t\x12\x15\n\rcanonical_url\x18\x15 \x01(\t\x12\r\n\x05title\x18\x16 \x01(\t\x12\x0e\n\x06\x61uthor\x18\x17 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x18 \x01(\t\x12\x12\n\nclaim_type\x18\x19 \x03(\t\x12$\n\x0crepost_count\x18\x1a \x03(\x0b\x32\x0e.pb.RangeField\x12\x13\n\x0bstream_type\x18\x1b \x03(\t\x12\x12\n\nmedia_type\x18\x1c \x03(\t\x12\"\n\nfee_amount\x18\x1d \x03(\x0b\x32\x0e.pb.RangeField\x12\x14\n\x0c\x66\x65\x65_currency\x18\x1e \x01(\t\x12 \n\x08\x64uration\x18\x1f \x03(\x0b\x32\x0e.pb.RangeField\x12\x19\n\x11reposted_claim_id\x18 \x01(\t\x12#\n\x0b\x63\x65nsor_type\x18! \x03(\x0b\x32\x0e.pb.RangeField\x12\x19\n\x11\x63laims_in_channel\x18\" \x01(\t\x12)\n\x12is_signature_valid\x18$ \x01(\x0b\x32\r.pb.BoolValue\x12(\n\x10\x65\x66\x66\x65\x63tive_amount\x18% \x03(\x0b\x32\x0e.pb.RangeField\x12&\n\x0esupport_amount\x18& \x03(\x0b\x32\x0e.pb.RangeField\x12&\n\x0etrending_score\x18\' \x03(\x0b\x32\x0e.pb.RangeField\x12\r\n\x05tx_id\x18+ \x01(\t\x12 \n\x07tx_nout\x18, \x01(\x0b\x32\x0f.pb.UInt32Value\x12\x11\n\tsignature\x18- \x01(\t\x12\x18\n\x10signature_digest\x18. \x01(\t\x12\x18\n\x10public_key_bytes\x18/ \x01(\t\x12\x15\n\rpublic_key_id\x18\x30 \x01(\t\x12\x10\n\x08\x61ny_tags\x18\x31 \x03(\t\x12\x10\n\x08\x61ll_tags\x18\x32 \x03(\t\x12\x10\n\x08not_tags\x18\x33 \x03(\t\x12\x1d\n\x15has_channel_signature\x18\x34 \x01(\x08\x12!\n\nhas_source\x18\x35 \x01(\x0b\x32\r.pb.BoolValue\x12 \n\x18limit_claims_per_channel\x18\x36 \x01(\x05\x12\x15\n\rany_languages\x18\x37 \x03(\t\x12\x15\n\rall_languages\x18\x38 \x03(\t\x12\x19\n\x11remove_duplicates\x18\x39 \x01(\x08\x12\x11\n\tno_totals\x18: \x01(\x08\x32\xe2\x03\n\x03Hub\x12*\n\x06Search\x12\x11.pb.SearchRequest\x1a\x0b.pb.Outputs\"\x00\x12+\n\x04Ping\x12\x10.pb.EmptyMessage\x1a\x0f.pb.StringValue\"\x00\x12-\n\x05Hello\x12\x10.pb.HelloMessage\x1a\x10.pb.HelloMessage\"\x00\x12/\n\x07\x41\x64\x64Peer\x12\x11.pb.ServerMessage\x1a\x0f.pb.StringValue\"\x00\x12\x35\n\rPeerSubscribe\x12\x11.pb.ServerMessage\x1a\x0f.pb.StringValue\"\x00\x12.\n\x07Version\x12\x10.pb.EmptyMessage\x1a\x0f.pb.StringValue\"\x00\x12/\n\x08\x46\x65\x61tures\x12\x10.pb.EmptyMessage\x1a\x0f.pb.StringValue\"\x00\x12\x30\n\tBroadcast\x12\x10.pb.EmptyMessage\x1a\x0f.pb.UInt32Value\"\x00\x12-\n\x06Height\x12\x10.pb.EmptyMessage\x1a\x0f.pb.UInt32Value\"\x00\x12)\n\x07Resolve\x12\x0f.pb.StringArray\x1a\x0b.pb.Outputs\"\x00\x42&Z$github.com/lbryio/hub/protobuf/go/pbb\x06proto3'
,
dependencies=[result__pb2.DESCRIPTOR,])
@ -61,8 +61,8 @@ _RANGEFIELD_OP = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
serialized_start=373,
serialized_end=419,
serialized_start=403,
serialized_end=449,
)
_sym_db.RegisterEnumDescriptor(_RANGEFIELD_OP)
@ -248,6 +248,38 @@ _STRINGVALUE = _descriptor.Descriptor(
)
_STRINGARRAY = _descriptor.Descriptor(
name='StringArray',
full_name='pb.StringArray',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='value', full_name='pb.StringArray.value', index=0,
number=1, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=255,
serialized_end=283,
)
_BOOLVALUE = _descriptor.Descriptor(
name='BoolValue',
full_name='pb.BoolValue',
@ -275,8 +307,8 @@ _BOOLVALUE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=255,
serialized_end=281,
serialized_start=285,
serialized_end=311,
)
@ -307,8 +339,8 @@ _UINT32VALUE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=283,
serialized_end=311,
serialized_start=313,
serialized_end=341,
)
@ -347,8 +379,8 @@ _RANGEFIELD = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=313,
serialized_end=419,
serialized_start=343,
serialized_end=449,
)
@ -738,13 +770,6 @@ _SEARCHREQUEST = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='sd_hash', full_name='pb.SearchRequest.sd_hash', index=54,
number=59, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
@ -757,8 +782,8 @@ _SEARCHREQUEST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=422,
serialized_end=1972,
serialized_start=452,
serialized_end=1985,
)
_HELLOMESSAGE.fields_by_name['servers'].message_type = _SERVERMESSAGE
@ -790,6 +815,7 @@ DESCRIPTOR.message_types_by_name['ServerMessage'] = _SERVERMESSAGE
DESCRIPTOR.message_types_by_name['HelloMessage'] = _HELLOMESSAGE
DESCRIPTOR.message_types_by_name['InvertibleField'] = _INVERTIBLEFIELD
DESCRIPTOR.message_types_by_name['StringValue'] = _STRINGVALUE
DESCRIPTOR.message_types_by_name['StringArray'] = _STRINGARRAY
DESCRIPTOR.message_types_by_name['BoolValue'] = _BOOLVALUE
DESCRIPTOR.message_types_by_name['UInt32Value'] = _UINT32VALUE
DESCRIPTOR.message_types_by_name['RangeField'] = _RANGEFIELD
@ -831,6 +857,13 @@ StringValue = _reflection.GeneratedProtocolMessageType('StringValue', (_message.
})
_sym_db.RegisterMessage(StringValue)
StringArray = _reflection.GeneratedProtocolMessageType('StringArray', (_message.Message,), {
'DESCRIPTOR' : _STRINGARRAY,
'__module__' : 'hub_pb2'
# @@protoc_insertion_point(class_scope:pb.StringArray)
})
_sym_db.RegisterMessage(StringArray)
BoolValue = _reflection.GeneratedProtocolMessageType('BoolValue', (_message.Message,), {
'DESCRIPTOR' : _BOOLVALUE,
'__module__' : 'hub_pb2'
@ -869,8 +902,8 @@ _HUB = _descriptor.ServiceDescriptor(
index=0,
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_start=1975,
serialized_end=2367,
serialized_start=1988,
serialized_end=2470,
methods=[
_descriptor.MethodDescriptor(
name='Search',
@ -952,6 +985,26 @@ _HUB = _descriptor.ServiceDescriptor(
serialized_options=None,
create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='Height',
full_name='pb.Hub.Height',
index=8,
containing_service=None,
input_type=_EMPTYMESSAGE,
output_type=_UINT32VALUE,
serialized_options=None,
create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='Resolve',
full_name='pb.Hub.Resolve',
index=9,
containing_service=None,
input_type=_STRINGARRAY,
output_type=result__pb2._OUTPUTS,
serialized_options=None,
create_key=_descriptor._internal_create_key,
),
])
_sym_db.RegisterServiceDescriptor(_HUB)

View file

@ -55,6 +55,16 @@ class HubStub(object):
request_serializer=hub__pb2.EmptyMessage.SerializeToString,
response_deserializer=hub__pb2.UInt32Value.FromString,
)
self.Height = channel.unary_unary(
'/pb.Hub/Height',
request_serializer=hub__pb2.EmptyMessage.SerializeToString,
response_deserializer=hub__pb2.UInt32Value.FromString,
)
self.Resolve = channel.unary_unary(
'/pb.Hub/Resolve',
request_serializer=hub__pb2.StringArray.SerializeToString,
response_deserializer=result__pb2.Outputs.FromString,
)
class HubServicer(object):
@ -108,6 +118,18 @@ class HubServicer(object):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Height(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Resolve(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_HubServicer_to_server(servicer, server):
rpc_method_handlers = {
@ -151,6 +173,16 @@ def add_HubServicer_to_server(servicer, server):
request_deserializer=hub__pb2.EmptyMessage.FromString,
response_serializer=hub__pb2.UInt32Value.SerializeToString,
),
'Height': grpc.unary_unary_rpc_method_handler(
servicer.Height,
request_deserializer=hub__pb2.EmptyMessage.FromString,
response_serializer=hub__pb2.UInt32Value.SerializeToString,
),
'Resolve': grpc.unary_unary_rpc_method_handler(
servicer.Resolve,
request_deserializer=hub__pb2.StringArray.FromString,
response_serializer=result__pb2.Outputs.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'pb.Hub', rpc_method_handlers)
@ -296,3 +328,37 @@ class Hub(object):
hub__pb2.UInt32Value.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def Height(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/pb.Hub/Height',
hub__pb2.EmptyMessage.SerializeToString,
hub__pb2.UInt32Value.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def Resolve(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/pb.Hub/Resolve',
hub__pb2.StringArray.SerializeToString,
result__pb2.Outputs.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

View file

@ -2,6 +2,7 @@ import asyncio
import json
import hashlib
import sys
import time
from bisect import bisect_right
from binascii import hexlify, unhexlify
from collections import defaultdict
@ -1533,6 +1534,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
async def test_reorg(self):
await asyncio.wait_for(self.on_header(206), 3.0)
self.assertEqual(self.ledger.headers.height, 206)
channel_name = '@abc'
@ -1628,6 +1630,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.assertBlockHash(208)
claim = await self.resolve('hovercraft')
self.assertEqual(claim['txid'], broadcast_tx.id)
self.assertEqual(claim['height'], 208)
@ -1635,6 +1638,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
invalidated_block_hash = (await self.ledger.headers.hash(208)).decode()
block_207 = await self.blockchain.get_block(invalidated_block_hash)
self.assertIn(claim['txid'], block_207['tx'])
await asyncio.wait_for(self.on_header(208), 3.0)
self.assertEqual(208, claim['height'])
# reorg the last block dropping our claim tx
@ -1706,6 +1710,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
self.assertEqual(self.ledger.headers.height, 208)
await self.assertBlockHash(208)
await asyncio.wait_for(self.on_header(208), 30.0)
claim = await self.resolve('hovercraft')
self.assertEqual(claim['txid'], broadcast_tx.id)
self.assertEqual(claim['height'], 208)