Merge pull request #2908 from lbryio/batched-resolve

Fix connection to wallet server breaking upon giant resolve requests
This commit is contained in:
Jack Robison 2020-04-06 12:52:51 -04:00 committed by GitHub
commit 3591768745
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 19 additions and 3 deletions

View file

@ -752,7 +752,11 @@ class Ledger(metaclass=LedgerRegistry):
async def resolve(self, accounts, urls, **kwargs): async def resolve(self, accounts, urls, **kwargs):
resolve = partial(self.network.retriable_call, self.network.resolve) resolve = partial(self.network.retriable_call, self.network.resolve)
txos = (await self._inflate_outputs(resolve(urls), accounts, **kwargs))[0] urls_copy = list(urls)
txos = []
while urls_copy:
batch, urls_copy = urls_copy[:500], urls_copy[500:]
txos.extend((await self._inflate_outputs(resolve(batch), accounts, **kwargs))[0])
assert len(urls) == len(txos), "Mismatch between urls requested for resolve and responses received." assert len(urls) == len(txos), "Mismatch between urls requested for resolve and responses received."
result = {} result = {}
for url, txo in zip(urls, txos): for url, txo in zip(urls, txos):

View file

@ -39,8 +39,7 @@ from lbry.wallet.tasks import TaskGroup
from .jsonrpc import Request, JSONRPCConnection, JSONRPCv2, JSONRPC, Batch, Notification from .jsonrpc import Request, JSONRPCConnection, JSONRPCv2, JSONRPC, Batch, Notification
from .jsonrpc import RPCError, ProtocolError from .jsonrpc import RPCError, ProtocolError
from .framing import BadMagicError, BadChecksumError, OversizedPayloadError, BitcoinFramer, NewlineFramer from .framing import BadMagicError, BadChecksumError, OversizedPayloadError, BitcoinFramer, NewlineFramer
from .util import Concurrency from lbry.wallet.server.prometheus import NOTIFICATION_COUNT, RESPONSE_TIMES, REQUEST_ERRORS_COUNT, RESET_CONNECTIONS
from lbry.wallet.server.prometheus import NOTIFICATION_COUNT, RESPONSE_TIMES, REQUEST_ERRORS_COUNT
class Connector: class Connector:
@ -389,6 +388,7 @@ class RPCSession(SessionBase):
except MemoryError: except MemoryError:
self.logger.warning('received oversized message from %s:%s, dropping connection', self.logger.warning('received oversized message from %s:%s, dropping connection',
self._address[0], self._address[1]) self._address[0], self._address[1])
RESET_CONNECTIONS.labels(version=self.client_version).inc()
self._close() self._close()
return return

View file

@ -54,6 +54,11 @@ BLOCK_UPDATE_TIMES = Histogram("block_time", "Block update times", namespace=NAM
REORG_COUNT = Gauge( REORG_COUNT = Gauge(
"reorg_count", "Number of reorgs", namespace=NAMESPACE "reorg_count", "Number of reorgs", namespace=NAMESPACE
) )
RESET_CONNECTIONS = Counter(
"reset_clients", "Number of reset connections by client version",
namespace=NAMESPACE, labelnames=("version",)
)
class PrometheusServer: class PrometheusServer:
def __init__(self): def __init__(self):

View file

@ -1,6 +1,7 @@
import os.path import os.path
import tempfile import tempfile
import logging import logging
import asyncio
from binascii import unhexlify from binascii import unhexlify
from urllib.request import urlopen from urllib.request import urlopen
@ -79,6 +80,12 @@ class ClaimSearchCommand(ClaimTestCase):
] * 23828 ] * 23828
self.assertListEqual([], await self.claim_search(claim_ids=claim_ids)) self.assertListEqual([], await self.claim_search(claim_ids=claim_ids))
# this should do nothing... if the resolve (which is retried) results in the server disconnecting,
# it kerplodes
await asyncio.wait_for(self.daemon.jsonrpc_resolve([
f'0000000000000000000000000000000000000000{i}' for i in range(30000)
]), 30)
# 23829 claim ids makes the request just large enough # 23829 claim ids makes the request just large enough
claim_ids = [ claim_ids = [
'0000000000000000000000000000000000000000', '0000000000000000000000000000000000000000',