simplify sync and use asyncio Queue instead

This commit is contained in:
Victor Shyba 2021-02-22 16:26:18 -03:00
parent ec89bcac8e
commit 920dad524a
2 changed files with 17 additions and 48 deletions

View file

@@ -81,19 +81,17 @@ class SearchIndex:
return self.client.indices.delete(self.index, ignore_unavailable=True) return self.client.indices.delete(self.index, ignore_unavailable=True)
async def sync_queue(self, claim_queue): async def sync_queue(self, claim_queue):
self.logger.info("Writing to index from a queue with %d elements.", claim_queue.qsize())
if claim_queue.empty(): if claim_queue.empty():
return return
to_delete, to_update = [], [] actions = []
while not claim_queue.empty(): while not claim_queue.empty():
operation, doc = claim_queue.get_nowait() operation, doc = claim_queue.get_nowait()
if operation == 'delete': actions.append(extract_doc(doc, self.index))
to_delete.append(doc) self.logger.info("prepare update: %d elements. Queue: %d elements", len(actions), claim_queue.qsize())
else:
to_update.append(doc)
await self.delete(to_delete)
await self.client.indices.refresh(self.index) await self.client.indices.refresh(self.index)
for bulk in range(0, len(to_update), 400): self.logger.info("update done: %d elements. Queue: %d elements", len(actions), claim_queue.qsize())
await self.update(to_update[bulk:bulk+400]) await async_bulk(self.client, actions)
await self.client.indices.refresh(self.index) await self.client.indices.refresh(self.index)
await self.client.indices.flush(self.index) await self.client.indices.flush(self.index)
@@ -129,40 +127,6 @@ class SearchIndex:
await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), slices=32) await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), slices=32)
await self.client.indices.refresh(self.index) await self.client.indices.refresh(self.index)
async def update(self, claims):
if not claims:
return
actions = [extract_doc(claim, self.index) for claim in claims]
names = []
claim_ids = []
for claim in claims:
if claim['is_controlling']:
names.append(claim['normalized'])
claim_ids.append(claim['claim_id'])
if names:
update = expand_query(name__in=names, not_claim_id=claim_ids, is_controlling=True)
update['script'] = {
"source": "ctx._source.is_controlling=false",
"lang": "painless"
}
await self.client.indices.refresh(self.index)
await self.client.update_by_query(self.index, body=update)
await self.client.indices.refresh(self.index)
await async_bulk(self.client, actions)
async def delete(self, claim_ids):
    """Bulk-delete the given claim ids from the index.

    Claims signed by a deleted channel are not removed; instead their
    ``signature_valid`` flag is set to false, since the signing channel
    no longer exists to validate them.
    """
    # Nothing to do for an empty batch.
    if not claim_ids:
        return
    actions = [{'_index': self.index, '_op_type': 'delete', '_id': claim_id} for claim_id in claim_ids]
    # raise_on_error=False: deleting an id that is not in the index is fine.
    await async_bulk(self.client, actions, raise_on_error=False)
    # Invalidate signatures of claims published in any deleted channel.
    update = expand_query(channel_id__in=claim_ids)
    update['script'] = {
        "source": "ctx._source.signature_valid=false",
        "lang": "painless"
    }
    # Refresh first so update_by_query observes the deletions above.
    await self.client.indices.refresh(self.index)
    await self.client.update_by_query(self.index, body=update)
async def delete_above_height(self, height): async def delete_above_height(self, height):
await self.client.delete_by_query(self.index, expand_query(height='>'+str(height))) await self.client.delete_by_query(self.index, expand_query(height='>'+str(height)))
await self.client.indices.refresh(self.index) await self.client.indices.refresh(self.index)

View file

@@ -1,10 +1,12 @@
import os import os
from asyncio import Queue
import apsw import apsw
from typing import Union, Tuple, Set, List from typing import Union, Tuple, Set, List
from itertools import chain from itertools import chain
from decimal import Decimal from decimal import Decimal
from collections import namedtuple from collections import namedtuple
from multiprocessing import Manager, Queue from multiprocessing import Manager
from binascii import unhexlify, hexlify from binascii import unhexlify, hexlify
from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.util import class_logger from lbry.wallet.server.util import class_logger
@@ -143,6 +145,11 @@ class SQLDB:
begin begin
insert or ignore into changelog (claim_hash) values (new.claim_hash); insert or ignore into changelog (claim_hash) values (new.claim_hash);
end; end;
create trigger if not exists claimtrie_changelog after update on claimtrie
begin
insert or ignore into changelog (claim_hash) values (new.claim_hash);
insert or ignore into changelog (claim_hash) values (old.claim_hash);
end;
""" """
SEARCH_INDEXES = """ SEARCH_INDEXES = """
@@ -226,7 +233,7 @@ class SQLDB:
unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id
} }
self.trending = trending self.trending = trending
self.claim_queue = Queue(maxsize=100_000) self.claim_queue = Queue()
def open(self): def open(self):
self.db = apsw.Connection( self.db = apsw.Connection(
@@ -845,14 +852,12 @@ class SQLDB:
claim['tags'] = claim['tags'].split(',,') if claim['tags'] else [] claim['tags'] = claim['tags'].split(',,') if claim['tags'] else []
claim['languages'] = claim['languages'].split(' ') if claim['languages'] else [] claim['languages'] = claim['languages'].split(' ') if claim['languages'] else []
if not self.claim_queue.full(): self.claim_queue.put_nowait(('update', claim))
self.claim_queue.put_nowait(('update', claim))
self.execute("delete from changelog;") self.execute("delete from changelog;")
def enqueue_deleted(self, deleted_claims): def enqueue_deleted(self, deleted_claims):
for claim_hash in deleted_claims: for claim_hash in deleted_claims:
if not self.claim_queue.full(): self.claim_queue.put_nowait(('delete', hexlify(claim_hash[::-1]).decode()))
self.claim_queue.put_nowait(('delete', hexlify(claim_hash[::-1]).decode()))
def advance_txs(self, height, all_txs, header, daemon_height, timer): def advance_txs(self, height, all_txs, header, daemon_height, timer):
insert_claims = [] insert_claims = []