diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py index db661c8a1..ed6b663a3 100644 --- a/lbrynet/database/storage.py +++ b/lbrynet/database/storage.py @@ -4,7 +4,7 @@ import time import sqlite3 import traceback from decimal import Decimal -from twisted.internet import defer, task, reactor, threads +from twisted.internet import defer, task, threads from twisted.enterprise import adbapi from lbryschema.claim import ClaimDict @@ -73,6 +73,8 @@ def rerun_if_locked(f): max_attempts = 3 def rerun(err, rerun_count, *args, **kwargs): + connection = args[0] + reactor = connection.reactor log.debug("Failed to execute (%s): %s", err, args) if err.check(sqlite3.OperationalError) and err.value.message == "database is locked": log.warning("database was locked. rerunning %s with args %s, kwargs %s", @@ -82,8 +84,14 @@ def rerun_if_locked(f): return task.deferLater(reactor, delay, inner_wrapper, rerun_count + 1, *args, **kwargs) raise err + def check_needed_rerun(result, rerun_count): + if rerun_count: + log.info("successfully reran database query") + return result + def inner_wrapper(rerun_count, *args, **kwargs): d = f(*args, **kwargs) + d.addCallback(check_needed_rerun, rerun_count) d.addErrback(rerun, rerun_count, *args, **kwargs) return d @@ -101,6 +109,10 @@ class SqliteConnection(adbapi.ConnectionPool): def runInteraction(self, interaction, *args, **kw): return adbapi.ConnectionPool.runInteraction(self, interaction, *args, **kw) + @classmethod + def set_reactor(cls, reactor): + cls.reactor = reactor + class SQLiteStorage(object): CREATE_TABLES_QUERY = """ @@ -165,11 +177,14 @@ class SQLiteStorage(object): ); """ - def __init__(self, db_dir): + def __init__(self, db_dir, reactor=None): + if not reactor: + from twisted.internet import reactor self.db_dir = db_dir self._db_path = os.path.join(db_dir, "lbrynet.sqlite") log.info("connecting to database: %s", self._db_path) self.db = SqliteConnection(self._db_path) + self.db.set_reactor(reactor) def setup(self): def _create_tables(transaction):