Merge branch 'fix-database-locked'
This commit is contained in:
commit
d8a728e931
1 changed files with 19 additions and 3 deletions
|
@ -4,7 +4,7 @@ import time
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import traceback
|
import traceback
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
from twisted.internet import defer, task, reactor, threads
|
from twisted.internet import defer, task, threads
|
||||||
from twisted.enterprise import adbapi
|
from twisted.enterprise import adbapi
|
||||||
|
|
||||||
from lbryschema.claim import ClaimDict
|
from lbryschema.claim import ClaimDict
|
||||||
|
@ -73,16 +73,25 @@ def rerun_if_locked(f):
|
||||||
max_attempts = 3
|
max_attempts = 3
|
||||||
|
|
||||||
def rerun(err, rerun_count, *args, **kwargs):
|
def rerun(err, rerun_count, *args, **kwargs):
|
||||||
|
connection = args[0]
|
||||||
|
reactor = connection.reactor
|
||||||
log.debug("Failed to execute (%s): %s", err, args)
|
log.debug("Failed to execute (%s): %s", err, args)
|
||||||
if err.check(sqlite3.OperationalError) and err.value.message == "database is locked":
|
if err.check(sqlite3.OperationalError) and err.value.message == "database is locked":
|
||||||
log.warning("database was locked. rerunning %s with args %s, kwargs %s",
|
log.warning("database was locked. rerunning %s with args %s, kwargs %s",
|
||||||
str(f), str(args), str(kwargs))
|
str(f), str(args), str(kwargs))
|
||||||
if rerun_count < max_attempts:
|
if rerun_count < max_attempts:
|
||||||
return task.deferLater(reactor, 0, inner_wrapper, rerun_count + 1, *args, **kwargs)
|
delay = 2**rerun_count
|
||||||
|
return task.deferLater(reactor, delay, inner_wrapper, rerun_count + 1, *args, **kwargs)
|
||||||
raise err
|
raise err
|
||||||
|
|
||||||
|
def check_needed_rerun(result, rerun_count):
|
||||||
|
if rerun_count:
|
||||||
|
log.info("successfully reran database query")
|
||||||
|
return result
|
||||||
|
|
||||||
def inner_wrapper(rerun_count, *args, **kwargs):
|
def inner_wrapper(rerun_count, *args, **kwargs):
|
||||||
d = f(*args, **kwargs)
|
d = f(*args, **kwargs)
|
||||||
|
d.addCallback(check_needed_rerun, rerun_count)
|
||||||
d.addErrback(rerun, rerun_count, *args, **kwargs)
|
d.addErrback(rerun, rerun_count, *args, **kwargs)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
@ -100,6 +109,10 @@ class SqliteConnection(adbapi.ConnectionPool):
|
||||||
def runInteraction(self, interaction, *args, **kw):
|
def runInteraction(self, interaction, *args, **kw):
|
||||||
return adbapi.ConnectionPool.runInteraction(self, interaction, *args, **kw)
|
return adbapi.ConnectionPool.runInteraction(self, interaction, *args, **kw)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def set_reactor(cls, reactor):
|
||||||
|
cls.reactor = reactor
|
||||||
|
|
||||||
|
|
||||||
class SQLiteStorage(object):
|
class SQLiteStorage(object):
|
||||||
CREATE_TABLES_QUERY = """
|
CREATE_TABLES_QUERY = """
|
||||||
|
@ -164,11 +177,14 @@ class SQLiteStorage(object):
|
||||||
);
|
);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, db_dir):
|
def __init__(self, db_dir, reactor=None):
|
||||||
|
if not reactor:
|
||||||
|
from twisted.internet import reactor
|
||||||
self.db_dir = db_dir
|
self.db_dir = db_dir
|
||||||
self._db_path = os.path.join(db_dir, "lbrynet.sqlite")
|
self._db_path = os.path.join(db_dir, "lbrynet.sqlite")
|
||||||
log.info("connecting to database: %s", self._db_path)
|
log.info("connecting to database: %s", self._db_path)
|
||||||
self.db = SqliteConnection(self._db_path)
|
self.db = SqliteConnection(self._db_path)
|
||||||
|
self.db.set_reactor(reactor)
|
||||||
|
|
||||||
def setup(self):
|
def setup(self):
|
||||||
def _create_tables(transaction):
|
def _create_tables(transaction):
|
||||||
|
|
Loading…
Reference in a new issue