diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py index ec92d6522..446f53ee5 100644 --- a/lbry/wallet/database.py +++ b/lbry/wallet/database.py @@ -73,6 +73,7 @@ class AIOSQLite: self.write_lock = asyncio.Lock() self.writers = 0 self.read_ready = asyncio.Event() + self.urgent_read_done = asyncio.Event() @classmethod async def connect(cls, path: Union[bytes, str], *args, **kwargs): @@ -88,6 +89,7 @@ class AIOSQLite: ) await asyncio.get_event_loop().run_in_executor(db.writer_executor, _connect_writer) db.read_ready.set() + db.urgent_read_done.set() return db async def close(self): @@ -112,12 +114,25 @@ class AIOSQLite: read_only=False, fetch_all: bool = False) -> List[dict]: read_only_fn = run_read_only_fetchall if fetch_all else run_read_only_fetchone parameters = parameters if parameters is not None else [] + still_waiting = False + urgent_read = False if read_only: - while self.writers: - await self.read_ready.wait() - return await asyncio.get_event_loop().run_in_executor( - self.reader_executor, read_only_fn, sql, parameters - ) + try: + while self.writers: # more writes can come in while we are waiting for the first + if not urgent_read and still_waiting and self.urgent_read_done.is_set(): + # throttle the writes if they pile up + self.urgent_read_done.clear() + urgent_read = True + # wait until the running writes have finished + await self.read_ready.wait() + still_waiting = True + return await asyncio.get_event_loop().run_in_executor( + self.reader_executor, read_only_fn, sql, parameters + ) + finally: + if urgent_read: + # unthrottle the writers if they had to be throttled + self.urgent_read_done.set() if fetch_all: return await self.run(lambda conn: conn.execute(sql, parameters).fetchall()) return await self.run(lambda conn: conn.execute(sql, parameters).fetchone()) @@ -135,6 +150,7 @@ class AIOSQLite: return self.run(lambda conn: conn.execute(sql, parameters)) async def run(self, fun, *args, **kwargs): + await self.urgent_read_done.wait() self.writers += 1 self.read_ready.clear() try: