class LockWithMetrics(asyncio.Lock):
    """An ``asyncio.Lock`` that reports wait and hold durations to metrics.

    Both metric objects only need an ``observe(seconds)`` method.

    :param acquire_metric: receives the time spent waiting in ``acquire()``
        for every acquisition that succeeds.
    :param held_time_metric: receives the time between a successful
        ``acquire()`` and the matching ``release()``.
    :param loop: optional event loop, forwarded to ``asyncio.Lock`` only
        when it is not ``None``.
    """

    def __init__(self, acquire_metric, held_time_metric, loop=None):
        # asyncio.Lock rejects the ``loop`` keyword on Python 3.10+ (even
        # ``loop=None``), so only forward it when the caller supplied one.
        if loop is None:
            super().__init__()
        else:
            super().__init__(loop=loop)
        self._acquire_metric = acquire_metric
        self._lock_held_time_metric = held_time_metric
        # perf_counter() timestamp of the current holder's acquisition,
        # or None while the lock is not held.
        self._lock_acquired_time = None

    async def acquire(self):
        """Acquire the lock and record how long the caller waited for it.

        If the wait is cancelled or fails, nothing is recorded and the
        current holder's acquisition timestamp is left untouched.

        :return: ``True``, as returned by ``asyncio.Lock.acquire``.
        """
        start = time.perf_counter()
        acquired = await super().acquire()
        # Only reached when the lock is actually ours; a cancelled waiter
        # must not overwrite the timestamp owned by the task holding the lock.
        self._lock_acquired_time = time.perf_counter()
        self._acquire_metric.observe(self._lock_acquired_time - start)
        return acquired

    def release(self):
        """Release the lock and record how long it was held.

        :raises RuntimeError: if the lock is not currently acquired; no
            held-time sample is recorded in that case.
        """
        super().release()
        # Clear the timestamp so a stale value can never be reported twice.
        acquired_at, self._lock_acquired_time = self._lock_acquired_time, None
        if acquired_at is not None:
            self._lock_held_time_metric.observe(time.perf_counter() - acquired_at)
ContextVar @@ -11,7 +10,8 @@ from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.process import ProcessPoolExecutor from typing import Tuple, List, Union, Callable, Any, Awaitable, Iterable, Dict, Optional from datetime import date -from prometheus_client import Gauge +from prometheus_client import Gauge, Counter, Histogram +from lbry.utils import LockWithMetrics from .bip32 import PubKey from .transaction import Transaction, Output, OutputScript, TXRefImmutable @@ -72,6 +72,18 @@ class AIOSQLite: waiting_reads_metric = Gauge( "waiting_reads_count", "Number of waiting db writes", namespace="daemon_database" ) + write_count_metric = Counter( + "write_count", "Number of database writes", namespace="daemon_database" + ) + read_count_metric = Counter( + "read_count", "Number of database reads", namespace="daemon_database" + ) + acquire_write_lock_metric = Histogram( + f'write_lock_acquired', 'Time to acquire the write lock', namespace="daemon_database" + ) + held_write_lock_metric = Histogram( + f'write_lock_held', 'Length of time the write lock is held for', namespace="daemon_database" + ) def __init__(self): # has to be single threaded as there is no mapping of thread:connection @@ -79,7 +91,7 @@ class AIOSQLite: self.writer_connection: Optional[sqlite3.Connection] = None self._closing = False self.query_count = 0 - self.write_lock = asyncio.Lock() + self.write_lock = LockWithMetrics(self.acquire_write_lock_metric, self.held_write_lock_metric) self.writers = 0 self.read_ready = asyncio.Event() self.urgent_read_done = asyncio.Event() @@ -127,6 +139,7 @@ class AIOSQLite: urgent_read = False if read_only: self.waiting_reads_metric.inc() + self.read_count_metric.inc() try: while self.writers: # more writes can come in while we are waiting for the first if not urgent_read and still_waiting and self.urgent_read_done.is_set(): @@ -161,6 +174,7 @@ class AIOSQLite: return self.run(lambda conn: conn.execute(sql, parameters)) async def run(self, 
async def run_with_foreign_keys_disabled(self, fun, *args, **kwargs):
    """Run ``fun`` on the writer executor while holding the write lock.

    The call is counted as a write, waits for ``urgent_read_done`` to be
    set, registers itself in ``self.writers`` (clearing ``read_ready``),
    and then hands ``fun``, ``args`` and ``kwargs`` to
    ``__run_transaction_with_foreign_keys_disabled`` on
    ``self.writer_executor``. When the last registered writer finishes,
    ``read_ready`` is set again.

    :param fun: callable forwarded to the transaction helper.
    :param args: positional arguments forwarded with ``fun``.
    :param kwargs: keyword arguments forwarded with ``fun``.
    :return: whatever the transaction helper returns.
    """
    self.write_count_metric.inc()
    self.waiting_writes_metric.inc()
    try:
        await self.urgent_read_done.wait()
    except BaseException:
        # asyncio.CancelledError derives from BaseException on Python 3.8+,
        # so ``except Exception`` would leave the gauge stuck one too high
        # whenever a writer is cancelled while waiting here.
        self.waiting_writes_metric.dec()
        raise
    self.writers += 1
    self.read_ready.clear()
    try:
        async with self.write_lock:
            return await asyncio.get_event_loop().run_in_executor(
                self.writer_executor, self.__run_transaction_with_foreign_keys_disabled, fun, args, kwargs
            )
    finally:
        # Undo the bookkeeping whether the write succeeded, failed or was
        # cancelled; readers may proceed once no writer is left.
        self.writers -= 1
        self.waiting_writes_metric.dec()
        if not self.writers:
            self.read_ready.set()