diff --git a/lbry/lbry/wallet/server/metrics.py b/lbry/lbry/wallet/server/metrics.py index 622999de3..f1ee4e5d1 100644 --- a/lbry/lbry/wallet/server/metrics.py +++ b/lbry/lbry/wallet/server/metrics.py @@ -52,7 +52,7 @@ class APICallMetrics: self.errored_queries = set() self.interrupted_queries = set() - def to_json_and_reset(self): + def to_json(self): return { # total requests received "receive_count": self.receive_count, @@ -128,7 +128,7 @@ class ServerLoadData: def to_json_and_reset(self, status): try: return { - 'api': {name: api.to_json_and_reset() for name, api in self._apis.items()}, + 'api': {name: api.to_json() for name, api in self._apis.items()}, 'status': status } finally: diff --git a/lbry/lbry/wallet/server/session.py b/lbry/lbry/wallet/server/session.py index 2f3c29314..aca6ac8e7 100644 --- a/lbry/lbry/wallet/server/session.py +++ b/lbry/lbry/wallet/server/session.py @@ -112,33 +112,44 @@ class LBRYElectrumX(ElectrumX): } self.request_handlers.update(handlers) - async def run_in_executor(self, metrics: APICallMetrics, func, kwargs): + def get_metrics_or_placeholder_for_api(self, query_name): + """ Do not hold on to a reference to the metrics + returned by this method past an `await` or + you may be working with a stale metrics object. + """ + if self.env.track_metrics: + return self.session_mgr.metrics.for_api(query_name) + else: + return APICallMetrics(query_name) + + async def run_in_executor(self, query_name, func, kwargs): start = time.perf_counter() try: result = await asyncio.get_running_loop().run_in_executor( self.session_mgr.query_executor, func, kwargs ) except reader.SQLiteInterruptedError as error: + metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.query_interrupt(start, error.metrics) raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out') except reader.SQLiteOperationalError as error: + metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.query_error(start, error.metrics) raise RPCError(JSONRPC.INTERNAL_ERROR, 'query failed to execute') except: + metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.query_error(start, {}) raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error') if self.env.track_metrics: + metrics = self.get_metrics_or_placeholder_for_api(query_name) (result, metrics_data) = result metrics.query_response(start, metrics_data) return base64.b64encode(result).decode() async def run_and_cache_query(self, query_name, function, kwargs): - if self.env.track_metrics: - metrics = self.session_mgr.metrics.for_api(query_name) - else: - metrics = APICallMetrics(query_name) + metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.start() cache = self.session_mgr.search_cache[query_name] cache_key = str(kwargs) @@ -151,9 +162,10 @@ class LBRYElectrumX(ElectrumX): async with cache_item.lock: if cache_item.result is None: cache_item.result = await self.run_in_executor( - metrics, function, kwargs + query_name, function, kwargs ) else: + metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.cache_response() return cache_item.result