Compare commits

...
Sign in to create a new pull request.

3 commits

Author SHA1 Message Date
Jack Robison
5dc2dadb03
clarify second server.version isn't sent 2020-01-26 12:28:44 -05:00
Jack Robison
1af58c83f8
improve test_pick_fastest 2020-01-26 12:27:24 -05:00
Jack Robison
a10ae70773
fix cache_query race condition 2020-01-26 12:26:46 -05:00
3 changed files with 17 additions and 9 deletions

View file

@ -94,7 +94,7 @@ class ClientSession(BaseClientSession):
await self.create_connection(self.timeout)
await self.ensure_server_version()
self._on_connect_cb()
if (perf_counter() - self.last_send) > self.max_seconds_idle or self.response_time is None:
elif (perf_counter() - self.last_send) > self.max_seconds_idle or self.response_time is None:
await self.ensure_server_version()
retry_delay = default_delay
except RPCError as e:

View file

@ -919,12 +919,9 @@ class LBRYElectrumX(SessionBase):
metrics.start()
cache = self.session_mgr.search_cache[query_name]
cache_key = str(kwargs)
cache_item = cache.get(cache_key)
if cache_item is None:
cache_item = cache[cache_key] = ResultCacheItem()
elif cache_item.result is not None:
metrics.cache_response()
return cache_item.result
if cache_key not in cache:
cache[cache_key] = ResultCacheItem()
cache_item = cache[cache_key]
async with cache_item.lock:
if cache_item.result is None:
cache_item.result = await self.run_in_executor(

View file

@ -8,6 +8,7 @@ from unittest.mock import Mock
from lbry.wallet.network import Network
from lbry.wallet.orchstr8.node import SPVNode
from lbry.wallet.rpc import RPCSession
from lbry.wallet.rpc import RPCError
from lbry.testcase import IntegrationTestCase, AsyncioTestCase
@ -140,12 +141,16 @@ class ReconnectTests(IntegrationTestCase):
class ServerPickingTestCase(AsyncioTestCase):
async def _make_fake_server(self, latency=1.0, port=1):
async def _make_fake_server(self, latency=1.0, port=1, raise_version_rpc_error=False, return_versions=None):
# local fake server with artificial latency
class FakeSession(RPCSession):
async def handle_request(self, request):
await asyncio.sleep(latency)
if request.method == 'server.version':
if raise_version_rpc_error:
raise RPCError(1, 'derp')
if return_versions:
return return_versions
return tuple(request.args)
return {'height': 1}
server = await self.loop.create_server(lambda: FakeSession(), host='127.0.0.1', port=port)
@ -155,7 +160,10 @@ class ServerPickingTestCase(AsyncioTestCase):
async def _make_bad_server(self, port=42420):
async def echo(reader, writer):
while True:
writer.write(await reader.read())
try:
writer.write(await asyncio.wait_for(reader.read(), 1))
except asyncio.TimeoutError:
pass
server = await asyncio.start_server(echo, host='127.0.0.1', port=port)
self.addCleanup(server.close)
return '127.0.0.1', port
@ -169,6 +177,9 @@ class ServerPickingTestCase(AsyncioTestCase):
('example.that.doesnt.resolve', 9000),
await self._make_fake_server(latency=1.0, port=1340),
await self._make_fake_server(latency=0.1, port=1337),
await self._make_fake_server(latency=0, port=1338, raise_version_rpc_error=True),
await self._make_fake_server(latency=0, port=1341, return_versions=('0.0.1', '2.0')),
await self._make_fake_server(latency=0, port=1342, return_versions=('0.1', '2.0')),
await self._make_fake_server(latency=0.4, port=1339),
],
'connect_timeout': 3