include the channel being filtered/blocked

This commit is contained in:
Victor Shyba 2021-01-31 04:42:22 -03:00
parent 0929088b12
commit e4d06a088b
2 changed files with 6 additions and 2 deletions

View file

@ -217,8 +217,6 @@ class BlockProcessor:
start = time.perf_counter() start = time.perf_counter()
await self.run_in_thread_with_lock(self.advance_blocks, blocks) await self.run_in_thread_with_lock(self.advance_blocks, blocks)
await self.db.search_index.sync_queue(self.sql.claim_queue) await self.db.search_index.sync_queue(self.sql.claim_queue)
await self.db.search_index.apply_filters(self.sql.blocked_streams, self.sql.blocked_channels,
self.sql.filtered_streams, self.sql.filtered_channels)
for cache in self.search_cache.values(): for cache in self.search_cache.values():
cache.clear() cache.clear()
self.history_cache.clear() self.history_cache.clear()
@ -232,6 +230,8 @@ class BlockProcessor:
s = '' if len(blocks) == 1 else 's' s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time)) self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
if self._caught_up_event.is_set(): if self._caught_up_event.is_set():
await self.db.search_index.apply_filters(self.sql.blocked_streams, self.sql.blocked_channels,
self.sql.filtered_streams, self.sql.filtered_channels)
await self.notifications.on_block(self.touched, self.height) await self.notifications.on_block(self.touched, self.height)
self.touched = set() self.touched = set()
elif hprevs[0] != chain[0]: elif hprevs[0] != chain[0]:

View file

@ -99,12 +99,16 @@ class SearchIndex:
await self.client.update_by_query(self.index, body=make_query(1, filtered_streams), slices=32) await self.client.update_by_query(self.index, body=make_query(1, filtered_streams), slices=32)
await self.client.indices.refresh(self.index) await self.client.indices.refresh(self.index)
if filtered_channels: if filtered_channels:
await self.client.update_by_query(self.index, body=make_query(1, filtered_channels), slices=32)
await self.client.indices.refresh(self.index)
await self.client.update_by_query(self.index, body=make_query(1, filtered_channels, True), slices=32) await self.client.update_by_query(self.index, body=make_query(1, filtered_channels, True), slices=32)
await self.client.indices.refresh(self.index) await self.client.indices.refresh(self.index)
if blocked_streams: if blocked_streams:
await self.client.update_by_query(self.index, body=make_query(2, blocked_streams), slices=32) await self.client.update_by_query(self.index, body=make_query(2, blocked_streams), slices=32)
await self.client.indices.refresh(self.index) await self.client.indices.refresh(self.index)
if blocked_channels: if blocked_channels:
await self.client.update_by_query(self.index, body=make_query(2, blocked_channels), slices=32)
await self.client.indices.refresh(self.index)
await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), slices=32) await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), slices=32)
await self.client.indices.refresh(self.index) await self.client.indices.refresh(self.index)