write lock metrics

parent 797364ee5c
commit 3469abaefd

2 changed files with 61 additions and 7 deletions
lbry/utils.py

@@ -3,6 +3,7 @@ import codecs
 import datetime
 import random
 import socket
+import time
 import string
 import sys
 import json
@@ -19,6 +20,7 @@ import pkg_resources
 
 import certifi
 import aiohttp
+from prometheus_client import Histogram
 from lbry.schema.claim import Claim
 
 log = logging.getLogger(__name__)
@@ -282,3 +284,25 @@ async def get_external_ip() -> typing.Optional[str]:  # used if upnp is disabled
 def is_running_from_bundle():
     # see https://pyinstaller.readthedocs.io/en/stable/runtime-information.html
     return getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS')
+
+
+class LockWithMetrics(asyncio.Lock):
+    def __init__(self, acquire_metric, held_time_metric, loop=None):
+        super().__init__(loop=loop)
+        self._acquire_metric = acquire_metric
+        self._lock_held_time_metric = held_time_metric
+        self._lock_acquired_time = None
+
+    async def acquire(self):
+        start = time.perf_counter()
+        try:
+            return await super().acquire()
+        finally:
+            self._lock_acquired_time = time.perf_counter()
+            self._acquire_metric.observe(self._lock_acquired_time - start)
+
+    def release(self):
+        try:
+            return super().release()
+        finally:
+            self._lock_held_time_metric.observe(time.perf_counter() - self._lock_acquired_time)
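Note (not part of the diff): LockWithMetrics is a drop-in asyncio.Lock whose acquire() records time-to-acquire and whose release() records how long the lock was held. A minimal sketch of wiring it up, with illustrative Histogram names that are not the ones this commit defines:

    import asyncio
    from prometheus_client import Histogram
    from lbry.utils import LockWithMetrics

    # illustrative metric names, separate from the daemon_database ones added below
    ACQUIRE_HIST = Histogram("example_lock_acquired", "Time to acquire the lock")
    HELD_HIST = Histogram("example_lock_held", "Time the lock is held for")

    async def do_work():
        # create the lock inside the running loop; asyncio.Lock is still given a
        # loop= argument here, so this assumes a pre-3.10 Python, as the class itself does
        lock = LockWithMetrics(ACQUIRE_HIST, HELD_HIST)
        # async with goes through the overridden acquire()/release(), so each use
        # of the lock produces one observation in each histogram
        async with lock:
            await asyncio.sleep(0.1)  # simulated work while holding the lock

    asyncio.run(do_work())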
lbry/wallet/database.py

@@ -3,7 +3,6 @@ import logging
 import asyncio
 import sqlite3
 import platform
-import time
 from binascii import hexlify
 from dataclasses import dataclass
 from contextvars import ContextVar
@@ -11,7 +10,8 @@ from concurrent.futures.thread import ThreadPoolExecutor
 from concurrent.futures.process import ProcessPoolExecutor
 from typing import Tuple, List, Union, Callable, Any, Awaitable, Iterable, Dict, Optional
 from datetime import date
-from prometheus_client import Gauge
+from prometheus_client import Gauge, Counter, Histogram
+from lbry.utils import LockWithMetrics
 
 from .bip32 import PubKey
 from .transaction import Transaction, Output, OutputScript, TXRefImmutable
@@ -72,6 +72,18 @@ class AIOSQLite:
     waiting_reads_metric = Gauge(
         "waiting_reads_count", "Number of waiting db writes", namespace="daemon_database"
     )
+    write_count_metric = Counter(
+        "write_count", "Number of database writes", namespace="daemon_database"
+    )
+    read_count_metric = Counter(
+        "read_count", "Number of database reads", namespace="daemon_database"
+    )
+    acquire_write_lock_metric = Histogram(
+        f'write_lock_acquired', 'Time to acquire the write lock', namespace="daemon_database"
+    )
+    held_write_lock_metric = Histogram(
+        f'write_lock_held', 'Length of time the write lock is held for', namespace="daemon_database"
+    )
 
     def __init__(self):
         # has to be single threaded as there is no mapping of thread:connection
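Note (not part of the diff): these Counter and Histogram class attributes register themselves with prometheus_client's default registry when the class body runs, and the namespace argument prefixes their exported names with daemon_database_. A minimal, standalone sketch of exposing that registry, independent of how the daemon itself serves metrics (the port is illustrative):

    from prometheus_client import start_http_server, generate_latest

    # serve the default registry (including the daemon_database_* series) over HTTP
    start_http_server(9090)

    # or dump the current samples in the Prometheus text exposition format
    print(generate_latest().decode())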
@@ -79,7 +91,7 @@
         self.writer_connection: Optional[sqlite3.Connection] = None
         self._closing = False
         self.query_count = 0
-        self.write_lock = asyncio.Lock()
+        self.write_lock = LockWithMetrics(self.acquire_write_lock_metric, self.held_write_lock_metric)
         self.writers = 0
         self.read_ready = asyncio.Event()
         self.urgent_read_done = asyncio.Event()
@@ -127,6 +139,7 @@
         urgent_read = False
         if read_only:
             self.waiting_reads_metric.inc()
+            self.read_count_metric.inc()
             try:
                 while self.writers:  # more writes can come in while we are waiting for the first
                     if not urgent_read and still_waiting and self.urgent_read_done.is_set():
@@ -161,6 +174,7 @@
         return self.run(lambda conn: conn.execute(sql, parameters))
 
     async def run(self, fun, *args, **kwargs):
+        self.write_count_metric.inc()
         self.waiting_writes_metric.inc()
         try:
             await self.urgent_read_done.wait()
@@ -193,10 +207,26 @@
             log.warning("rolled back")
             raise
 
-    def run_with_foreign_keys_disabled(self, fun, *args, **kwargs) -> Awaitable:
-        return asyncio.get_event_loop().run_in_executor(
-            self.writer_executor, self.__run_transaction_with_foreign_keys_disabled, fun, args, kwargs
-        )
+    async def run_with_foreign_keys_disabled(self, fun, *args, **kwargs):
+        self.write_count_metric.inc()
+        self.waiting_writes_metric.inc()
+        try:
+            await self.urgent_read_done.wait()
+        except Exception as e:
+            self.waiting_writes_metric.dec()
+            raise e
+        self.writers += 1
+        self.read_ready.clear()
+        try:
+            async with self.write_lock:
+                return await asyncio.get_event_loop().run_in_executor(
+                    self.writer_executor, self.__run_transaction_with_foreign_keys_disabled, fun, args, kwargs
+                )
+        finally:
+            self.writers -= 1
+            self.waiting_writes_metric.dec()
+            if not self.writers:
+                self.read_ready.set()
 
     def __run_transaction_with_foreign_keys_disabled(self,
                                                      fun: Callable[[sqlite3.Connection, Any, Any], Any],
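Note (not part of the diff): run_with_foreign_keys_disabled() is now a coroutine that does the same metric bookkeeping as run() and funnels its work through the metered write_lock, instead of handing the callable straight to the writer executor. Callers still just await it; a small illustrative usage sketch (the migration function, table, and column are made up):

    def _add_column(conn, table, column):  # illustrative blocking transaction body
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} TEXT")

    async def migrate(db):  # db: an AIOSQLite instance
        # runs in the writer executor behind write_lock, counted by
        # write_count_metric / waiting_writes_metric like any other write
        await db.run_with_foreign_keys_disabled(_add_column, "example_table", "example_column")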