Compare commits

...
Sign in to create a new pull request.

3 commits

Author SHA1 Message Date
Jack Robison
5dc2dadb03
clarify second server.version isn't sent 2020-01-26 12:28:44 -05:00
Jack Robison
1af58c83f8
improve test_pick_fastest 2020-01-26 12:27:24 -05:00
Jack Robison
a10ae70773
fix cache_query race condition 2020-01-26 12:26:46 -05:00
3 changed files with 17 additions and 9 deletions

View file

@ -94,7 +94,7 @@ class ClientSession(BaseClientSession):
await self.create_connection(self.timeout) await self.create_connection(self.timeout)
await self.ensure_server_version() await self.ensure_server_version()
self._on_connect_cb() self._on_connect_cb()
if (perf_counter() - self.last_send) > self.max_seconds_idle or self.response_time is None: elif (perf_counter() - self.last_send) > self.max_seconds_idle or self.response_time is None:
await self.ensure_server_version() await self.ensure_server_version()
retry_delay = default_delay retry_delay = default_delay
except RPCError as e: except RPCError as e:

View file

@ -919,12 +919,9 @@ class LBRYElectrumX(SessionBase):
metrics.start() metrics.start()
cache = self.session_mgr.search_cache[query_name] cache = self.session_mgr.search_cache[query_name]
cache_key = str(kwargs) cache_key = str(kwargs)
cache_item = cache.get(cache_key) if cache_key not in cache:
if cache_item is None: cache[cache_key] = ResultCacheItem()
cache_item = cache[cache_key] = ResultCacheItem() cache_item = cache[cache_key]
elif cache_item.result is not None:
metrics.cache_response()
return cache_item.result
async with cache_item.lock: async with cache_item.lock:
if cache_item.result is None: if cache_item.result is None:
cache_item.result = await self.run_in_executor( cache_item.result = await self.run_in_executor(

View file

@ -8,6 +8,7 @@ from unittest.mock import Mock
from lbry.wallet.network import Network from lbry.wallet.network import Network
from lbry.wallet.orchstr8.node import SPVNode from lbry.wallet.orchstr8.node import SPVNode
from lbry.wallet.rpc import RPCSession from lbry.wallet.rpc import RPCSession
from lbry.wallet.rpc import RPCError
from lbry.testcase import IntegrationTestCase, AsyncioTestCase from lbry.testcase import IntegrationTestCase, AsyncioTestCase
@ -140,12 +141,16 @@ class ReconnectTests(IntegrationTestCase):
class ServerPickingTestCase(AsyncioTestCase): class ServerPickingTestCase(AsyncioTestCase):
async def _make_fake_server(self, latency=1.0, port=1): async def _make_fake_server(self, latency=1.0, port=1, raise_version_rpc_error=False, return_versions=None):
# local fake server with artificial latency # local fake server with artificial latency
class FakeSession(RPCSession): class FakeSession(RPCSession):
async def handle_request(self, request): async def handle_request(self, request):
await asyncio.sleep(latency) await asyncio.sleep(latency)
if request.method == 'server.version': if request.method == 'server.version':
if raise_version_rpc_error:
raise RPCError(1, 'derp')
if return_versions:
return return_versions
return tuple(request.args) return tuple(request.args)
return {'height': 1} return {'height': 1}
server = await self.loop.create_server(lambda: FakeSession(), host='127.0.0.1', port=port) server = await self.loop.create_server(lambda: FakeSession(), host='127.0.0.1', port=port)
@ -155,7 +160,10 @@ class ServerPickingTestCase(AsyncioTestCase):
async def _make_bad_server(self, port=42420): async def _make_bad_server(self, port=42420):
async def echo(reader, writer): async def echo(reader, writer):
while True: while True:
writer.write(await reader.read()) try:
writer.write(await asyncio.wait_for(reader.read(), 1))
except asyncio.TimeoutError:
pass
server = await asyncio.start_server(echo, host='127.0.0.1', port=port) server = await asyncio.start_server(echo, host='127.0.0.1', port=port)
self.addCleanup(server.close) self.addCleanup(server.close)
return '127.0.0.1', port return '127.0.0.1', port
@ -169,6 +177,9 @@ class ServerPickingTestCase(AsyncioTestCase):
('example.that.doesnt.resolve', 9000), ('example.that.doesnt.resolve', 9000),
await self._make_fake_server(latency=1.0, port=1340), await self._make_fake_server(latency=1.0, port=1340),
await self._make_fake_server(latency=0.1, port=1337), await self._make_fake_server(latency=0.1, port=1337),
await self._make_fake_server(latency=0, port=1338, raise_version_rpc_error=True),
await self._make_fake_server(latency=0, port=1341, return_versions=('0.0.1', '2.0')),
await self._make_fake_server(latency=0, port=1342, return_versions=('0.1', '2.0')),
await self._make_fake_server(latency=0.4, port=1339), await self._make_fake_server(latency=0.4, port=1339),
], ],
'connect_timeout': 3 'connect_timeout': 3