rename executor -> writer_executor

This commit is contained in:
Jack Robison 2020-02-17 18:12:52 -05:00
parent c271361552
commit 7a6b1930bf
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -19,27 +19,28 @@ class AIOSQLite:
def __init__(self):
# has to be single threaded as there is no mapping of thread:connection
self.executor = ThreadPoolExecutor(max_workers=1)
self.connection: sqlite3.Connection = None
self.writer_executor = ThreadPoolExecutor(max_workers=1)
self.writer_connection: Optional[sqlite3.Connection] = None
self._closing = False
self.query_count = 0
@classmethod
async def connect(cls, path: Union[bytes, str], *args, **kwargs):
sqlite3.enable_callback_tracebacks(True)
def _connect():
return sqlite3.connect(path, *args, **kwargs)
db = cls()
db.connection = await asyncio.get_event_loop().run_in_executor(db.executor, _connect)
def _connect_writer():
db.writer_connection = sqlite3.connect(path, *args, **kwargs)
await asyncio.get_event_loop().run_in_executor(db.writer_executor, _connect_writer)
return db
async def close(self):
if self._closing:
return
self._closing = True
await asyncio.get_event_loop().run_in_executor(self.executor, self.connection.close)
self.executor.shutdown(wait=True)
self.connection = None
await asyncio.get_event_loop().run_in_executor(self.writer_executor, self.writer_connection.close)
self.writer_executor.shutdown(wait=True)
self.writer_connection = None
def executemany(self, sql: str, params: Iterable):
params = params if params is not None else []
@ -63,38 +64,38 @@ class AIOSQLite:
def run(self, fun, *args, **kwargs) -> Awaitable:
return asyncio.get_event_loop().run_in_executor(
self.executor, lambda: self.__run_transaction(fun, *args, **kwargs)
self.writer_executor, lambda: self.__run_transaction(fun, *args, **kwargs)
)
def __run_transaction(self, fun: Callable[[sqlite3.Connection, Any, Any], Any], *args, **kwargs):
self.connection.execute('begin')
self.writer_connection.execute('begin')
try:
self.query_count += 1
result = fun(self.connection, *args, **kwargs) # type: ignore
self.connection.commit()
result = fun(self.writer_connection, *args, **kwargs) # type: ignore
self.writer_connection.commit()
return result
except (Exception, OSError) as e:
log.exception('Error running transaction:', exc_info=e)
self.connection.rollback()
self.writer_connection.rollback()
log.warning("rolled back")
raise
def run_with_foreign_keys_disabled(self, fun, *args, **kwargs) -> Awaitable:
return asyncio.get_event_loop().run_in_executor(
self.executor, self.__run_transaction_with_foreign_keys_disabled, fun, args, kwargs
self.writer_executor, self.__run_transaction_with_foreign_keys_disabled, fun, args, kwargs
)
def __run_transaction_with_foreign_keys_disabled(self,
fun: Callable[[sqlite3.Connection, Any, Any], Any],
args, kwargs):
foreign_keys_enabled, = self.connection.execute("pragma foreign_keys").fetchone()
foreign_keys_enabled, = self.writer_connection.execute("pragma foreign_keys").fetchone()
if not foreign_keys_enabled:
raise sqlite3.IntegrityError("foreign keys are disabled, use `AIOSQLite.run` instead")
try:
self.connection.execute('pragma foreign_keys=off').fetchone()
self.writer_connection.execute('pragma foreign_keys=off').fetchone()
return self.__run_transaction(fun, *args, **kwargs)
finally:
self.connection.execute('pragma foreign_keys=on').fetchone()
self.writer_connection.execute('pragma foreign_keys=on').fetchone()
def constraints_to_sql(constraints, joiner=' AND ', prepend_key=''):