reproduce sync error

This commit is contained in:
Jack Robison 2020-05-29 14:22:32 -04:00
parent 5d5cd3499a
commit 1298e0e725
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 75 additions and 3 deletions

View file

@ -167,6 +167,7 @@ class Ledger(metaclass=LedgerRegistry):
self.coin_selection_strategy = None self.coin_selection_strategy = None
self._known_addresses_out_of_sync = set() self._known_addresses_out_of_sync = set()
self.went_out_of_sync = asyncio.Queue()
self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char) self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char)
self._balance_cache = pylru.lrucache(100000) self._balance_cache = pylru.lrucache(100000)
@ -589,11 +590,12 @@ class Ledger(metaclass=LedgerRegistry):
"******", "******",
address, remote_status, len(remote_history), len(remote_set), address, remote_status, len(remote_history), len(remote_set),
local_status, len(local_history), len(local_set), len(remote_set.symmetric_difference(local_set)), local_status, len(local_history), len(local_set), len(remote_set.symmetric_difference(local_set)),
"\n".join([f"{txid} - {height}" for txid, height in local_set.difference(remote_set)]), "\n".join([f"{txid} - {height}" for txid, height in remote_set.difference(local_set)]),
"\n".join([f"{txid} - {height}" for txid, height in remote_set.difference(local_set)]) "\n".join([f"{txid} - {height}" for txid, height in local_set.difference(remote_set)])
) )
# log.warning("local: %s", local_history) # log.warning("local: %s", local_history)
# log.warning("remote: %s", remote_history) # log.warning("remote: %s", remote_history)
self.went_out_of_sync.put_nowait(address)
self._known_addresses_out_of_sync.add(address) self._known_addresses_out_of_sync.add(address)
return False return False
else: else:

View file

@ -756,6 +756,8 @@ class LBRYBlockProcessor(BlockProcessor):
self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}") self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}")
self.sql: SQLDB = self.db.sql self.sql: SQLDB = self.db.sql
self.timer = Timer('BlockProcessor') self.timer = Timer('BlockProcessor')
self.block_notify = asyncio.Event()
self.block_notify.set()
def advance_blocks(self, blocks): def advance_blocks(self, blocks):
self.sql.begin() self.sql.begin()

View file

@ -915,10 +915,15 @@ class LBRYElectrumX(SessionBase):
def sub_count(self): def sub_count(self):
return len(self.hashX_subs) return len(self.hashX_subs)
UGLY_COUNT = 0
async def notify(self, touched, height_changed): async def notify(self, touched, height_changed):
"""Notify the client about changes to touched addresses (from mempool """Notify the client about changes to touched addresses (from mempool
updates or new blocks) and height. updates or new blocks) and height.
""" """
self.UGLY_COUNT += 1
if height_changed and self.subscribe_headers: if height_changed and self.subscribe_headers:
args = (await self.subscribe_headers_result(), ) args = (await self.subscribe_headers_result(), )
try: try:
@ -934,6 +939,11 @@ class LBRYElectrumX(SessionBase):
for hashX in touched: for hashX in touched:
alias = self.hashX_subs[hashX] alias = self.hashX_subs[hashX]
if self.UGLY_COUNT == 25:
print('sleeping for ', hashX)
if not self.bp.block_notify.is_set():
await self.bp.block_notify.wait()
await asyncio.sleep(3)
status = await self.address_status(hashX) status = await self.address_status(hashX)
changed[alias] = status changed[alias] = status
@ -955,8 +965,10 @@ class LBRYElectrumX(SessionBase):
else: else:
method = 'blockchain.address.subscribe' method = 'blockchain.address.subscribe'
start = time.perf_counter() start = time.perf_counter()
if not self.bp.block_notify.is_set():
await self.bp.block_notify.wait()
t = asyncio.create_task(self.send_notification(method, (alias, status))) t = asyncio.create_task(self.send_notification(method, (alias, status)))
t.add_done_callback(lambda _: self.logger.info("sent notification to %s in %s", alias, time.perf_counter() - start)) t.add_done_callback(lambda _: self.logger.warning("sent notification to %s in %s", alias, time.perf_counter() - start))
if changed: if changed:
es = '' if len(changed) == 1 else 'es' es = '' if len(changed) == 1 else 'es'

View file

@ -40,6 +40,8 @@ class ClaimTestCase(CommandTestCase):
class ClaimSearchCommand(ClaimTestCase): class ClaimSearchCommand(ClaimTestCase):
VERBOSITY = logging.WARNING
async def create_channel(self): async def create_channel(self):
self.channel = await self.channel_create('@abc', '1.0') self.channel = await self.channel_create('@abc', '1.0')
self.channel_id = self.get_claim_id(self.channel) self.channel_id = self.get_claim_id(self.channel)
@ -157,6 +159,60 @@ class ClaimSearchCommand(ClaimTestCase):
await self.stream_abandon(txid=signed2['txid'], nout=0) await self.stream_abandon(txid=signed2['txid'], nout=0)
await self.assertFindsClaims([], channel_ids=[channel_id2]) await self.assertFindsClaims([], channel_ids=[channel_id2])
async def test_break_it(self):
await self.generate(5)
address = await self.account.receiving.get_or_create_usable_address()
sendtxid = await self.blockchain.send_to_address(address, 1)
await self.confirm_tx(sendtxid)
address = await self.account.receiving.get_or_create_usable_address()
sendtxid = await self.blockchain.send_to_address(address, 1)
await self.confirm_tx(sendtxid)
address = await self.account.receiving.get_or_create_usable_address()
sendtxid = await self.blockchain.send_to_address(address, 1)
await self.confirm_tx(sendtxid)
address = await self.account.receiving.get_or_create_usable_address()
sendtxid = await self.blockchain.send_to_address(address, 1)
await self.confirm_tx(sendtxid)
await self.generate(7)
async def _doit(n):
try:
await self.daemon.jsonrpc_channel_create(
name=f'@arena{n}', bid='0.1', blocking=True
)
except InsufficientFundsError:
pass
def doit(n):
asyncio.create_task(_doit(n))
async def break_it():
count = 0
for _ in range(4):
for _ in range(10):
count += 1
if not count % 7:
asyncio.create_task(self.generate(1))
doit(count)
if self.ledger._known_addresses_out_of_sync:
print('out of sync', self.ledger._known_addresses_out_of_sync)
await asyncio.sleep(1)
await self.generate(1)
bp = self.conductor.spv_node.server.bp
break_task = asyncio.create_task(break_it())
address = await self.ledger.went_out_of_sync.get()
bp.block_notify.clear()
print('%s is out of sync' % address)
with self.assertRaises(InsufficientFundsError):
await self.daemon.jsonrpc_channel_create(
name=f'@derp', bid='0.1', blocking=True
)
self.assertTrue(False)
print("woohoo")
if not break_task.done():
break_task.cancel()
async def test_pagination(self): async def test_pagination(self):
await self.create_channel() await self.create_channel()
await self.create_lots_of_streams() await self.create_lots_of_streams()