fix SQLITE_MISUSE

fixes https://github.com/lbryio/lbry-sdk/issues/2164
This commit is contained in:
Jack Robison 2019-09-09 21:41:52 -04:00
parent f9880c8931
commit a05c44ad1e
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -1,6 +1,5 @@
import logging import logging
import asyncio import asyncio
from asyncio import wrap_future
from binascii import hexlify from binascii import hexlify
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
@ -23,8 +22,10 @@ class AIOSQLite:
@classmethod @classmethod
async def connect(cls, path: Union[bytes, str], *args, **kwargs): async def connect(cls, path: Union[bytes, str], *args, **kwargs):
def _connect():
return sqlite3.connect(path, *args, **kwargs)
db = cls() db = cls()
db.connection = await wrap_future(db.executor.submit(sqlite3.connect, path, *args, **kwargs)) db.connection = await asyncio.get_event_loop().run_in_executor(db.executor, _connect)
return db return db
async def close(self): async def close(self):
@ -38,25 +39,24 @@ class AIOSQLite:
return asyncio.get_event_loop_policy().get_event_loop().call_later(0.01, __close, conn) return asyncio.get_event_loop_policy().get_event_loop().call_later(0.01, __close, conn)
def executemany(self, sql: str, params: Iterable): def executemany(self, sql: str, params: Iterable):
def __executemany_in_a_transaction(conn: sqlite3.Connection, *args, **kwargs): params = params if params is not None else []
return conn.executemany(*args, **kwargs) return self.run(lambda conn: conn.executemany(sql, params).fetchall())
return self.run(__executemany_in_a_transaction, sql, params)
def executescript(self, script: str) -> Awaitable: def executescript(self, script: str) -> Awaitable:
return wrap_future(self.executor.submit(self.connection.executescript, script)) return self.run(lambda conn: conn.executescript(script))
def execute_fetchall(self, sql: str, parameters: Iterable = None) -> Awaitable[Iterable[sqlite3.Row]]: def execute_fetchall(self, sql: str, parameters: Iterable = None) -> Awaitable[Iterable[sqlite3.Row]]:
parameters = parameters if parameters is not None else [] parameters = parameters if parameters is not None else []
def __fetchall(conn: sqlite3.Connection, *args, **kwargs): return self.run(lambda conn: conn.execute(sql, parameters).fetchall())
return conn.execute(*args, **kwargs).fetchall()
return wrap_future(self.executor.submit(__fetchall, self.connection, sql, parameters))
def execute(self, sql: str, parameters: Iterable = None) -> Awaitable[sqlite3.Cursor]: def execute(self, sql: str, parameters: Iterable = None) -> Awaitable[sqlite3.Cursor]:
parameters = parameters if parameters is not None else [] parameters = parameters if parameters is not None else []
return self.run(lambda conn, sql, parameters: conn.execute(sql, parameters), sql, parameters) return self.run(lambda conn: conn.execute(sql, parameters))
def run(self, fun, *args, **kwargs) -> Awaitable: def run(self, fun, *args, **kwargs) -> Awaitable:
return wrap_future(self.executor.submit(self.__run_transaction, fun, *args, **kwargs)) return asyncio.get_event_loop().run_in_executor(
self.executor, lambda: self.__run_transaction(fun, *args, **kwargs)
)
def __run_transaction(self, fun: Callable[[sqlite3.Connection, Any, Any], Any], *args, **kwargs): def __run_transaction(self, fun: Callable[[sqlite3.Connection, Any, Any], Any], *args, **kwargs):
self.connection.execute('begin') self.connection.execute('begin')
@ -64,19 +64,20 @@ class AIOSQLite:
result = fun(self.connection, *args, **kwargs) # type: ignore result = fun(self.connection, *args, **kwargs) # type: ignore
self.connection.commit() self.connection.commit()
return result return result
except (Exception, OSError): # as e: except (Exception, OSError) as e:
#log.exception('Error running transaction:', exc_info=e) log.exception('Error running transaction:', exc_info=e)
self.connection.rollback() self.connection.rollback()
log.warning("rolled back")
raise raise
def run_with_foreign_keys_disabled(self, fun, *args, **kwargs) -> Awaitable: def run_with_foreign_keys_disabled(self, fun, *args, **kwargs) -> Awaitable:
return wrap_future( return asyncio.get_event_loop().run_in_executor(
self.executor.submit(self.__run_transaction_with_foreign_keys_disabled, fun, *args, **kwargs) self.executor, self.__run_transaction_with_foreign_keys_disabled, fun, args, kwargs
) )
def __run_transaction_with_foreign_keys_disabled(self, def __run_transaction_with_foreign_keys_disabled(self,
fun: Callable[[sqlite3.Connection, Any, Any], Any], fun: Callable[[sqlite3.Connection, Any, Any], Any],
*args, **kwargs): args, kwargs):
foreign_keys_enabled, = self.connection.execute("pragma foreign_keys").fetchone() foreign_keys_enabled, = self.connection.execute("pragma foreign_keys").fetchone()
if not foreign_keys_enabled: if not foreign_keys_enabled:
raise sqlite3.IntegrityError("foreign keys are disabled, use `AIOSQLite.run` instead") raise sqlite3.IntegrityError("foreign keys are disabled, use `AIOSQLite.run` instead")
@ -583,10 +584,12 @@ class BaseDatabase(SQLiteMixin):
async def add_keys(self, account, chain, keys): async def add_keys(self, account, chain, keys):
await self.db.executemany( await self.db.executemany(
"insert into pubkey_address (address, account, chain, position, pubkey) values (?, ?, ?, ?, ?)", "insert into pubkey_address values (?, ?, ?, ?, ?, NULL, 0)",
((pubkey.address, account.public_key.address, chain, (
position, sqlite3.Binary(pubkey.pubkey_bytes)) (pubkey.address, account.public_key.address, chain, position,
for position, pubkey in keys) sqlite3.Binary(pubkey.pubkey_bytes))
for position, pubkey in keys
)
) )
async def _set_address_history(self, address, history): async def _set_address_history(self, address, history):