triggers never activated
This commit is contained in:
parent
3921180d77
commit
a58ebcbc53
2 changed files with 113 additions and 0 deletions
|
@ -1,6 +1,7 @@
|
||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
import sys
|
||||||
|
|
||||||
from binascii import hexlify
|
from binascii import hexlify
|
||||||
from concurrent.futures.thread import ThreadPoolExecutor
|
from concurrent.futures.thread import ThreadPoolExecutor
|
||||||
|
@ -49,6 +50,11 @@ class AIOSQLite:
|
||||||
def executescript(self, script: str) -> Awaitable:
|
def executescript(self, script: str) -> Awaitable:
|
||||||
return self.run(lambda conn: conn.executescript(script))
|
return self.run(lambda conn: conn.executescript(script))
|
||||||
|
|
||||||
|
def create_function(self, name, num_params, func, deterministic=True) -> Awaitable:
|
||||||
|
if sys.version_info < (3, 8):
|
||||||
|
return self.run(lambda conn: conn.create_function(name, num_params, func))
|
||||||
|
return self.run(lambda conn: conn.create_function(name, num_params, func, deterministic=deterministic))
|
||||||
|
|
||||||
def execute_fetchall(self, sql: str, parameters: Iterable = None) -> Awaitable[Iterable[sqlite3.Row]]:
|
def execute_fetchall(self, sql: str, parameters: Iterable = None) -> Awaitable[Iterable[sqlite3.Row]]:
|
||||||
parameters = parameters if parameters is not None else []
|
parameters = parameters if parameters is not None else []
|
||||||
return self.run(lambda conn: conn.execute(sql, parameters).fetchall())
|
return self.run(lambda conn: conn.execute(sql, parameters).fetchall())
|
||||||
|
|
107
scripts/stay_in_sync.py
Executable file
107
scripts/stay_in_sync.py
Executable file
|
@ -0,0 +1,107 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from lbry.wallet import database
|
||||||
|
|
||||||
|
|
||||||
|
def enable_logging():
|
||||||
|
root = logging.getLogger()
|
||||||
|
root.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
handler = logging.StreamHandler(sys.stdout)
|
||||||
|
handler.setLevel(logging.DEBUG)
|
||||||
|
formatter = logging.Formatter('%(message)s') # %(asctime)s - %(levelname)s -
|
||||||
|
handler.setFormatter(formatter)
|
||||||
|
root.addHandler(handler)
|
||||||
|
|
||||||
|
|
||||||
|
async def initialize_tables(queue, old_file, db):
|
||||||
|
|
||||||
|
await db.executescript("PRAGMA journal_mode=WAL")
|
||||||
|
await db.executescript("PRAGMA case_sensitive_like=true")
|
||||||
|
await db.executescript("CREATE TABLE IF NOT EXISTS claim_with_metadata "
|
||||||
|
"(claimID BLOB NOT NULL PRIMARY KEY, "
|
||||||
|
"name BLOB NOT NULL, "
|
||||||
|
"originalHeight INTEGER NOT NULL, "
|
||||||
|
"updateHeight INTEGER NOT NULL, "
|
||||||
|
"activationHeight INTEGER NOT NULL, "
|
||||||
|
"metadataField1 TEXT)")
|
||||||
|
|
||||||
|
await db.executescript(f"ATTACH '{old_file}' AS crd")
|
||||||
|
|
||||||
|
await db.create_function("NEEDS_UPDATE", 5, lambda id, name, original, update, activation:
|
||||||
|
queue.put_nowait({
|
||||||
|
"claimID": id, "name": name, "originalHeight": original,
|
||||||
|
"updateHeight": update, "activationHeight": activation
|
||||||
|
}))
|
||||||
|
await db.executescript("CREATE TEMP TRIGGER on_claim_insert AFTER INSERT ON crd.claim "
|
||||||
|
"FOR EACH ROW BEGIN SELECT NEEDS_UPDATE(NEW.claimID, NEW.nodeName, "
|
||||||
|
"NEW.originalHeight, NEW.updateHeight, NEW.ActivationHeight); END")
|
||||||
|
await db.executescript("CREATE TEMP TRIGGER on_claim_update AFTER UPDATE ON crd.claim "
|
||||||
|
"FOR EACH ROW BEGIN SELECT NEEDS_UPDATE(NEW.claimID, NEW.nodeName, "
|
||||||
|
"NEW.originalHeight, NEW.updateHeight, NEW.ActivationHeight); END")
|
||||||
|
await db.executescript("CREATE TEMP TRIGGER on_claim_delete AFTER DELETE ON crd.claim "
|
||||||
|
"FOR EACH ROW BEGIN SELECT NEEDS_UPDATE(OLD.claimID, OLD.nodeName, 0, 0, 0); END")
|
||||||
|
|
||||||
|
|
||||||
|
async def catch_up_on_missing(queue, db):
|
||||||
|
# items removed from crd are those that are in claim_with_metadata but not in crd
|
||||||
|
# items inserted into crd are those in crd but not in claim_with_metadata
|
||||||
|
# items updated are those that are in both but some columns don't match
|
||||||
|
await db.executescript("DELETE FROM claim_with_metadata WHERE claimID NOT IN"
|
||||||
|
"(SELECT c.claimID FROM crd.claim c)")
|
||||||
|
await db.executescript("INSERT INTO claim_with_metadata(claimID, name, originalHeight, updateHeight, activationHeight) "
|
||||||
|
"SELECT c.claimID, c.nodeName, c.originalHeight, c.updateHeight, c.activationHeight "
|
||||||
|
"FROM crd.claim c LEFT JOIN claim_with_metadata m ON c.claimID = m.claimID "
|
||||||
|
"WHERE m.claimID IS NULL OR c.nodeName != m.name OR c.updateHeight != m.updateHeight "
|
||||||
|
"OR c.activationHeight != m.activationHeight ON CONFLICT(claimID) DO UPDATE SET "
|
||||||
|
"name = excluded.name, originalHeight = excluded.originalHeight, "
|
||||||
|
"updateHeight = excluded.updateHeight, activationHeight = excluded.activationHeight")
|
||||||
|
|
||||||
|
|
||||||
|
async def remove_claim(claim, db):
|
||||||
|
await db.executescript("DELETE FROM claim_with_metadata WHERE claimID = :claimID", claim)
|
||||||
|
|
||||||
|
|
||||||
|
async def insert_update(claim, db):
|
||||||
|
# TODO: lookup and parse metadata here
|
||||||
|
await db.executescript("INSERT INTO claim_with_metadata(claimID, name, originalHeight, updateHeight, activationHeight) "
|
||||||
|
"VALUES(:claimID, :name, :originalHeight, :updateHeight, :activationHeight) "
|
||||||
|
"ON CONFLICT(claimID) DO UPDATE SET name = excluded.name, "
|
||||||
|
"originalHeight = excluded.originalHeight, updateHeight = excluded.updateHeight, "
|
||||||
|
"activationHeight = excluded.activationHeight", claim)
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_changes():
|
||||||
|
old_file = os.path.expanduser("~/.lbrycrd/claims.sqlite")
|
||||||
|
new_file = os.path.expanduser("~/.lbrycrd/duplicate.sqlite")
|
||||||
|
db = database.AIOSQLite()
|
||||||
|
db = await db.connect(new_file) # super confusing that you have to use the return value here
|
||||||
|
|
||||||
|
queue = asyncio.Queue()
|
||||||
|
await initialize_tables(queue, old_file, db)
|
||||||
|
await catch_up_on_missing(queue, db)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
claim = await queue.get()
|
||||||
|
if claim.updateHeight <= 0:
|
||||||
|
await remove_claim(claim, db)
|
||||||
|
else:
|
||||||
|
await insert_update(claim, db)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
enable_logging()
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.run(handle_changes())
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logging.info("Process interrupted")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
Loading…
Add table
Reference in a new issue