forked from LBRYCommunity/lbry-sdk
create changelog trigger
This commit is contained in:
parent
afe7ed5b05
commit
19f70d7a11
2 changed files with 16 additions and 7 deletions
|
@ -164,7 +164,6 @@ class BlockProcessor:
|
||||||
self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
|
self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
|
||||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||||
self.executor = ThreadPoolExecutor(1)
|
self.executor = ThreadPoolExecutor(1)
|
||||||
self.index_executor = ThreadPoolExecutor(os.cpu_count())
|
|
||||||
|
|
||||||
# Meta
|
# Meta
|
||||||
self.next_cache_check = 0
|
self.next_cache_check = 0
|
||||||
|
@ -216,10 +215,6 @@ class BlockProcessor:
|
||||||
if hprevs == chain:
|
if hprevs == chain:
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
|
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
|
||||||
pending = []
|
|
||||||
for height in range(first, first + len(blocks)):
|
|
||||||
pending.append(asyncio.get_event_loop().run_in_executor(self.index_executor, self.db.sql.enqueue_changes, height))
|
|
||||||
await asyncio.gather(*pending)
|
|
||||||
await self.db.search_index.sync_queue(self.sql.claim_queue)
|
await self.db.search_index.sync_queue(self.sql.claim_queue)
|
||||||
for cache in self.search_cache.values():
|
for cache in self.search_cache.values():
|
||||||
cache.clear()
|
cache.clear()
|
||||||
|
|
|
@ -135,6 +135,17 @@ class SQLDB:
|
||||||
create index if not exists claimtrie_claim_hash_idx on claimtrie (claim_hash);
|
create index if not exists claimtrie_claim_hash_idx on claimtrie (claim_hash);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
CREATE_CHANGELOG_TRIGGER = """
|
||||||
|
create table if not exists changelog (
|
||||||
|
claim_hash bytes primary key
|
||||||
|
);
|
||||||
|
create index if not exists claimtrie_claim_hash_idx on claimtrie (claim_hash);
|
||||||
|
create trigger if not exists claim_changelog after update on claim
|
||||||
|
begin
|
||||||
|
insert or ignore into changelog (claim_hash) values (new.claim_hash);
|
||||||
|
end;
|
||||||
|
"""
|
||||||
|
|
||||||
SEARCH_INDEXES = """
|
SEARCH_INDEXES = """
|
||||||
-- used by any tag clouds
|
-- used by any tag clouds
|
||||||
create index if not exists tag_tag_idx on tag (tag, claim_hash);
|
create index if not exists tag_tag_idx on tag (tag, claim_hash);
|
||||||
|
@ -194,6 +205,7 @@ class SQLDB:
|
||||||
CREATE_SUPPORT_TABLE +
|
CREATE_SUPPORT_TABLE +
|
||||||
CREATE_CLAIMTRIE_TABLE +
|
CREATE_CLAIMTRIE_TABLE +
|
||||||
CREATE_TAG_TABLE +
|
CREATE_TAG_TABLE +
|
||||||
|
CREATE_CHANGELOG_TRIGGER +
|
||||||
CREATE_LANGUAGE_TABLE
|
CREATE_LANGUAGE_TABLE
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -806,7 +818,7 @@ class SQLDB:
|
||||||
f"SELECT claim_hash, normalized FROM claim WHERE expiration_height = {height}"
|
f"SELECT claim_hash, normalized FROM claim WHERE expiration_height = {height}"
|
||||||
)
|
)
|
||||||
|
|
||||||
def enqueue_changes(self, height):
|
def enqueue_changes(self):
|
||||||
for claim in self.execute(f"""
|
for claim in self.execute(f"""
|
||||||
SELECT claimtrie.claim_hash as is_controlling,
|
SELECT claimtrie.claim_hash as is_controlling,
|
||||||
claimtrie.last_take_over_height,
|
claimtrie.last_take_over_height,
|
||||||
|
@ -814,7 +826,7 @@ class SQLDB:
|
||||||
(select group_concat(language, ' ') from language where language.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as languages,
|
(select group_concat(language, ' ') from language where language.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as languages,
|
||||||
claim.*
|
claim.*
|
||||||
FROM claim LEFT JOIN claimtrie USING (claim_hash)
|
FROM claim LEFT JOIN claimtrie USING (claim_hash)
|
||||||
WHERE claim.height = {height} OR claim.claim_hash in (SELECT claim_hash FROM support WHERE height = {height})
|
WHERE claim.claim_hash in (SELECT claim_hash FROM changelog)
|
||||||
"""):
|
"""):
|
||||||
claim = claim._asdict()
|
claim = claim._asdict()
|
||||||
id_set = set(filter(None, (claim['claim_hash'], claim['channel_hash'], claim['reposted_claim_hash'])))
|
id_set = set(filter(None, (claim['claim_hash'], claim['channel_hash'], claim['reposted_claim_hash'])))
|
||||||
|
@ -838,6 +850,7 @@ class SQLDB:
|
||||||
claim['languages'] = claim['languages'].split(' ') if claim['languages'] else []
|
claim['languages'] = claim['languages'].split(' ') if claim['languages'] else []
|
||||||
if not self.claim_queue.full():
|
if not self.claim_queue.full():
|
||||||
self.claim_queue.put_nowait(('update', claim))
|
self.claim_queue.put_nowait(('update', claim))
|
||||||
|
self.execute("delete from changelog;")
|
||||||
|
|
||||||
def enqueue_deleted(self, deleted_claims):
|
def enqueue_deleted(self, deleted_claims):
|
||||||
for claim_hash in deleted_claims:
|
for claim_hash in deleted_claims:
|
||||||
|
@ -940,6 +953,7 @@ class SQLDB:
|
||||||
r(first_sync_finished, self.db.cursor())
|
r(first_sync_finished, self.db.cursor())
|
||||||
self._fts_synced = True
|
self._fts_synced = True
|
||||||
r(self.enqueue_deleted, delete_claim_hashes)
|
r(self.enqueue_deleted, delete_claim_hashes)
|
||||||
|
r(self.enqueue_changes)
|
||||||
|
|
||||||
|
|
||||||
class LBRYLevelDB(LevelDB):
|
class LBRYLevelDB(LevelDB):
|
||||||
|
|
Loading…
Reference in a new issue