Compare commits
1 commit
master
...
demo_claim
Author | SHA1 | Date | |
---|---|---|---|
|
a58ebcbc53 |
2 changed files with 113 additions and 0 deletions
|
@ -1,6 +1,7 @@
|
|||
import logging
|
||||
import asyncio
|
||||
import sqlite3
|
||||
import sys
|
||||
|
||||
from binascii import hexlify
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
|
@ -49,6 +50,11 @@ class AIOSQLite:
|
|||
def executescript(self, script: str) -> Awaitable:
|
||||
return self.run(lambda conn: conn.executescript(script))
|
||||
|
||||
def create_function(self, name, num_params, func, deterministic=True) -> Awaitable:
|
||||
if sys.version_info < (3, 8):
|
||||
return self.run(lambda conn: conn.create_function(name, num_params, func))
|
||||
return self.run(lambda conn: conn.create_function(name, num_params, func, deterministic=deterministic))
|
||||
|
||||
def execute_fetchall(self, sql: str, parameters: Iterable = None) -> Awaitable[Iterable[sqlite3.Row]]:
|
||||
parameters = parameters if parameters is not None else []
|
||||
return self.run(lambda conn: conn.execute(sql, parameters).fetchall())
|
||||
|
|
107
scripts/stay_in_sync.py
Executable file
107
scripts/stay_in_sync.py
Executable file
|
@ -0,0 +1,107 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
from lbry.wallet import database
|
||||
|
||||
|
||||
def enable_logging():
|
||||
root = logging.getLogger()
|
||||
root.setLevel(logging.DEBUG)
|
||||
|
||||
handler = logging.StreamHandler(sys.stdout)
|
||||
handler.setLevel(logging.DEBUG)
|
||||
formatter = logging.Formatter('%(message)s') # %(asctime)s - %(levelname)s -
|
||||
handler.setFormatter(formatter)
|
||||
root.addHandler(handler)
|
||||
|
||||
|
||||
async def initialize_tables(queue, old_file, db):
|
||||
|
||||
await db.executescript("PRAGMA journal_mode=WAL")
|
||||
await db.executescript("PRAGMA case_sensitive_like=true")
|
||||
await db.executescript("CREATE TABLE IF NOT EXISTS claim_with_metadata "
|
||||
"(claimID BLOB NOT NULL PRIMARY KEY, "
|
||||
"name BLOB NOT NULL, "
|
||||
"originalHeight INTEGER NOT NULL, "
|
||||
"updateHeight INTEGER NOT NULL, "
|
||||
"activationHeight INTEGER NOT NULL, "
|
||||
"metadataField1 TEXT)")
|
||||
|
||||
await db.executescript(f"ATTACH '{old_file}' AS crd")
|
||||
|
||||
await db.create_function("NEEDS_UPDATE", 5, lambda id, name, original, update, activation:
|
||||
queue.put_nowait({
|
||||
"claimID": id, "name": name, "originalHeight": original,
|
||||
"updateHeight": update, "activationHeight": activation
|
||||
}))
|
||||
await db.executescript("CREATE TEMP TRIGGER on_claim_insert AFTER INSERT ON crd.claim "
|
||||
"FOR EACH ROW BEGIN SELECT NEEDS_UPDATE(NEW.claimID, NEW.nodeName, "
|
||||
"NEW.originalHeight, NEW.updateHeight, NEW.ActivationHeight); END")
|
||||
await db.executescript("CREATE TEMP TRIGGER on_claim_update AFTER UPDATE ON crd.claim "
|
||||
"FOR EACH ROW BEGIN SELECT NEEDS_UPDATE(NEW.claimID, NEW.nodeName, "
|
||||
"NEW.originalHeight, NEW.updateHeight, NEW.ActivationHeight); END")
|
||||
await db.executescript("CREATE TEMP TRIGGER on_claim_delete AFTER DELETE ON crd.claim "
|
||||
"FOR EACH ROW BEGIN SELECT NEEDS_UPDATE(OLD.claimID, OLD.nodeName, 0, 0, 0); END")
|
||||
|
||||
|
||||
async def catch_up_on_missing(queue, db):
|
||||
# items removed from crd are those that are in claim_with_metadata but not in crd
|
||||
# items inserted into crd are those in crd but not in claim_with_metadata
|
||||
# items updated are those that are in both but some columns don't match
|
||||
await db.executescript("DELETE FROM claim_with_metadata WHERE claimID NOT IN"
|
||||
"(SELECT c.claimID FROM crd.claim c)")
|
||||
await db.executescript("INSERT INTO claim_with_metadata(claimID, name, originalHeight, updateHeight, activationHeight) "
|
||||
"SELECT c.claimID, c.nodeName, c.originalHeight, c.updateHeight, c.activationHeight "
|
||||
"FROM crd.claim c LEFT JOIN claim_with_metadata m ON c.claimID = m.claimID "
|
||||
"WHERE m.claimID IS NULL OR c.nodeName != m.name OR c.updateHeight != m.updateHeight "
|
||||
"OR c.activationHeight != m.activationHeight ON CONFLICT(claimID) DO UPDATE SET "
|
||||
"name = excluded.name, originalHeight = excluded.originalHeight, "
|
||||
"updateHeight = excluded.updateHeight, activationHeight = excluded.activationHeight")
|
||||
|
||||
|
||||
async def remove_claim(claim, db):
|
||||
await db.executescript("DELETE FROM claim_with_metadata WHERE claimID = :claimID", claim)
|
||||
|
||||
|
||||
async def insert_update(claim, db):
|
||||
# TODO: lookup and parse metadata here
|
||||
await db.executescript("INSERT INTO claim_with_metadata(claimID, name, originalHeight, updateHeight, activationHeight) "
|
||||
"VALUES(:claimID, :name, :originalHeight, :updateHeight, :activationHeight) "
|
||||
"ON CONFLICT(claimID) DO UPDATE SET name = excluded.name, "
|
||||
"originalHeight = excluded.originalHeight, updateHeight = excluded.updateHeight, "
|
||||
"activationHeight = excluded.activationHeight", claim)
|
||||
|
||||
|
||||
async def handle_changes():
|
||||
old_file = os.path.expanduser("~/.lbrycrd/claims.sqlite")
|
||||
new_file = os.path.expanduser("~/.lbrycrd/duplicate.sqlite")
|
||||
db = database.AIOSQLite()
|
||||
db = await db.connect(new_file) # super confusing that you have to use the return value here
|
||||
|
||||
queue = asyncio.Queue()
|
||||
await initialize_tables(queue, old_file, db)
|
||||
await catch_up_on_missing(queue, db)
|
||||
|
||||
while True:
|
||||
claim = await queue.get()
|
||||
if claim.updateHeight <= 0:
|
||||
await remove_claim(claim, db)
|
||||
else:
|
||||
await insert_update(claim, db)
|
||||
|
||||
|
||||
def main():
|
||||
enable_logging()
|
||||
|
||||
try:
|
||||
asyncio.run(handle_changes())
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Process interrupted")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Add table
Reference in a new issue