batched update blocked/filtered in ES

This commit is contained in:
Jack Robison 2022-11-04 13:41:41 -04:00
parent 0877e34289
commit 9fa2d0b6ca

View file

@@ -174,28 +174,31 @@ class ElasticSyncService(BlockchainReaderService):
async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels,
                        batch_size: int = 2000):
    """Write filter/block markers to the ES index in bounded batches.

    Every non-empty mapping is split into chunks of at most ``batch_size``
    entries. For each chunk one ``update_by_query`` request is sent with the
    body produced by ``self.update_filter_query``; channel mappings send a
    second request built with the extra ``True`` argument. The index is
    refreshed after each chunk.

    The mappings are handled in this order: filtered streams, filtered
    channels, blocked streams, blocked channels. The filtered ones use
    ``Censor.SEARCH`` and the blocked ones use ``Censor.RESOLVE``.

    Args:
        blocked_streams: mapping of key -> ``(chan, repost)`` pair.
        blocked_channels: mapping of key -> ``(chan, repost)`` pair.
        filtered_streams: mapping of key -> ``(chan, repost)`` pair.
        filtered_channels: mapping of key -> ``(chan, repost)`` pair.
        batch_size: maximum number of entries sent per request
            (defaults to 2000, the previously hard-coded value).

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    async def batched_update_filter(
            items: typing.Mapping[typing.Any, typing.Tuple[typing.Any, typing.Any]],
            channel: bool, censor_type: int):
        # Snapshot the entries so the mapping can be sliced into fixed-size chunks.
        entries = list(items.items())
        for start in range(0, len(entries), batch_size):
            # Only the first element of each (chan, repost) value is forwarded.
            channels = {key: chan for key, (chan, _repost) in entries[start:start + batch_size]}
            await self.sync_client.update_by_query(
                self.index, body=self.update_filter_query(censor_type, channels), slices=4)
            if channel:
                # Channel entries need a second pass built with the extra flag.
                await self.sync_client.update_by_query(
                    self.index, body=self.update_filter_query(censor_type, channels, True), slices=4)
            # NOTE(review): indentation was lost in the text this was rebuilt
            # from; the refresh is assumed to run once per batch - confirm
            # against the repository.
            await self.sync_client.indices.refresh(self.index)

    if filtered_streams:
        await batched_update_filter(filtered_streams, False, Censor.SEARCH)
    if filtered_channels:
        await batched_update_filter(filtered_channels, True, Censor.SEARCH)
    if blocked_streams:
        await batched_update_filter(blocked_streams, False, Censor.RESOLVE)
    if blocked_channels:
        await batched_update_filter(blocked_channels, True, Censor.RESOLVE)
@staticmethod @staticmethod
def _upsert_claim_query(index, claim): def _upsert_claim_query(index, claim):