Compare commits
3 commits
master
...
fix-cache-
Author | SHA1 | Date | |
---|---|---|---|
|
5dc2dadb03 | ||
|
1af58c83f8 | ||
|
a10ae70773 |
3 changed files with 17 additions and 9 deletions
|
@ -94,7 +94,7 @@ class ClientSession(BaseClientSession):
|
||||||
await self.create_connection(self.timeout)
|
await self.create_connection(self.timeout)
|
||||||
await self.ensure_server_version()
|
await self.ensure_server_version()
|
||||||
self._on_connect_cb()
|
self._on_connect_cb()
|
||||||
if (perf_counter() - self.last_send) > self.max_seconds_idle or self.response_time is None:
|
elif (perf_counter() - self.last_send) > self.max_seconds_idle or self.response_time is None:
|
||||||
await self.ensure_server_version()
|
await self.ensure_server_version()
|
||||||
retry_delay = default_delay
|
retry_delay = default_delay
|
||||||
except RPCError as e:
|
except RPCError as e:
|
||||||
|
|
|
@ -919,12 +919,9 @@ class LBRYElectrumX(SessionBase):
|
||||||
metrics.start()
|
metrics.start()
|
||||||
cache = self.session_mgr.search_cache[query_name]
|
cache = self.session_mgr.search_cache[query_name]
|
||||||
cache_key = str(kwargs)
|
cache_key = str(kwargs)
|
||||||
cache_item = cache.get(cache_key)
|
if cache_key not in cache:
|
||||||
if cache_item is None:
|
cache[cache_key] = ResultCacheItem()
|
||||||
cache_item = cache[cache_key] = ResultCacheItem()
|
cache_item = cache[cache_key]
|
||||||
elif cache_item.result is not None:
|
|
||||||
metrics.cache_response()
|
|
||||||
return cache_item.result
|
|
||||||
async with cache_item.lock:
|
async with cache_item.lock:
|
||||||
if cache_item.result is None:
|
if cache_item.result is None:
|
||||||
cache_item.result = await self.run_in_executor(
|
cache_item.result = await self.run_in_executor(
|
||||||
|
|
|
@ -8,6 +8,7 @@ from unittest.mock import Mock
|
||||||
from lbry.wallet.network import Network
|
from lbry.wallet.network import Network
|
||||||
from lbry.wallet.orchstr8.node import SPVNode
|
from lbry.wallet.orchstr8.node import SPVNode
|
||||||
from lbry.wallet.rpc import RPCSession
|
from lbry.wallet.rpc import RPCSession
|
||||||
|
from lbry.wallet.rpc import RPCError
|
||||||
from lbry.testcase import IntegrationTestCase, AsyncioTestCase
|
from lbry.testcase import IntegrationTestCase, AsyncioTestCase
|
||||||
|
|
||||||
|
|
||||||
|
@ -140,12 +141,16 @@ class ReconnectTests(IntegrationTestCase):
|
||||||
|
|
||||||
class ServerPickingTestCase(AsyncioTestCase):
|
class ServerPickingTestCase(AsyncioTestCase):
|
||||||
|
|
||||||
async def _make_fake_server(self, latency=1.0, port=1):
|
async def _make_fake_server(self, latency=1.0, port=1, raise_version_rpc_error=False, return_versions=None):
|
||||||
# local fake server with artificial latency
|
# local fake server with artificial latency
|
||||||
class FakeSession(RPCSession):
|
class FakeSession(RPCSession):
|
||||||
async def handle_request(self, request):
|
async def handle_request(self, request):
|
||||||
await asyncio.sleep(latency)
|
await asyncio.sleep(latency)
|
||||||
if request.method == 'server.version':
|
if request.method == 'server.version':
|
||||||
|
if raise_version_rpc_error:
|
||||||
|
raise RPCError(1, 'derp')
|
||||||
|
if return_versions:
|
||||||
|
return return_versions
|
||||||
return tuple(request.args)
|
return tuple(request.args)
|
||||||
return {'height': 1}
|
return {'height': 1}
|
||||||
server = await self.loop.create_server(lambda: FakeSession(), host='127.0.0.1', port=port)
|
server = await self.loop.create_server(lambda: FakeSession(), host='127.0.0.1', port=port)
|
||||||
|
@ -155,7 +160,10 @@ class ServerPickingTestCase(AsyncioTestCase):
|
||||||
async def _make_bad_server(self, port=42420):
|
async def _make_bad_server(self, port=42420):
|
||||||
async def echo(reader, writer):
|
async def echo(reader, writer):
|
||||||
while True:
|
while True:
|
||||||
writer.write(await reader.read())
|
try:
|
||||||
|
writer.write(await asyncio.wait_for(reader.read(), 1))
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
server = await asyncio.start_server(echo, host='127.0.0.1', port=port)
|
server = await asyncio.start_server(echo, host='127.0.0.1', port=port)
|
||||||
self.addCleanup(server.close)
|
self.addCleanup(server.close)
|
||||||
return '127.0.0.1', port
|
return '127.0.0.1', port
|
||||||
|
@ -169,6 +177,9 @@ class ServerPickingTestCase(AsyncioTestCase):
|
||||||
('example.that.doesnt.resolve', 9000),
|
('example.that.doesnt.resolve', 9000),
|
||||||
await self._make_fake_server(latency=1.0, port=1340),
|
await self._make_fake_server(latency=1.0, port=1340),
|
||||||
await self._make_fake_server(latency=0.1, port=1337),
|
await self._make_fake_server(latency=0.1, port=1337),
|
||||||
|
await self._make_fake_server(latency=0, port=1338, raise_version_rpc_error=True),
|
||||||
|
await self._make_fake_server(latency=0, port=1341, return_versions=('0.0.1', '2.0')),
|
||||||
|
await self._make_fake_server(latency=0, port=1342, return_versions=('0.1', '2.0')),
|
||||||
await self._make_fake_server(latency=0.4, port=1339),
|
await self._make_fake_server(latency=0.4, port=1339),
|
||||||
],
|
],
|
||||||
'connect_timeout': 3
|
'connect_timeout': 3
|
||||||
|
|
Loading…
Add table
Reference in a new issue