refactor reload_blocking_filtering_streams

This commit is contained in:
Jack Robison 2021-09-21 13:27:05 -04:00
parent 8c75098a9a
commit 6ec70192fe
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 20 additions and 22 deletions

View file

@ -325,7 +325,7 @@ class BlockProcessor:
await self.run_in_thread(self.db.apply_expiration_extension_fork) await self.run_in_thread(self.db.apply_expiration_extension_fork)
# TODO: we shouldnt wait on the search index updating before advancing to the next block # TODO: we shouldnt wait on the search index updating before advancing to the next block
if not self.db.first_sync: if not self.db.first_sync:
self.db.reload_blocking_filtering_streams() await self.db.reload_blocking_filtering_streams()
await self.db.search_index.claim_consumer(self.claim_producer()) await self.db.search_index.claim_consumer(self.claim_producer())
await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels, await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels,
self.db.filtered_streams, self.db.filtered_channels) self.db.filtered_streams, self.db.filtered_channels)

View file

@ -481,32 +481,30 @@ class LevelDB:
count += 1 count += 1
return count return count
def reload_blocking_filtering_streams(self): async def reload_blocking_filtering_streams(self):
self.blocked_streams, self.blocked_channels = self.get_streams_and_channels_reposted_by_channel_hashes(self.blocking_channel_hashes) def reload():
self.filtered_streams, self.filtered_channels = self.get_streams_and_channels_reposted_by_channel_hashes(self.filtering_channel_hashes) self.blocked_streams, self.blocked_channels = self.get_streams_and_channels_reposted_by_channel_hashes(
self.blocking_channel_hashes
)
self.filtered_streams, self.filtered_channels = self.get_streams_and_channels_reposted_by_channel_hashes(
self.filtering_channel_hashes
)
await asyncio.get_event_loop().run_in_executor(None, reload)
def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: bytes): def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: Set[bytes]):
streams, channels = {}, {} streams, channels = {}, {}
for reposter_channel_hash in reposter_channel_hashes: for reposter_channel_hash in reposter_channel_hashes:
reposts = self.get_reposts_in_channel(reposter_channel_hash) for stream in self.prefix_db.channel_to_claim.iterate((reposter_channel_hash, ), include_key=False):
for repost in reposts: repost = self.get_repost(stream.claim_hash)
txo = self.get_claim_txo(repost) if repost:
if txo: txo = self.get_claim_txo(repost)
if txo.normalized_name.startswith('@'): if txo:
channels[repost] = reposter_channel_hash if txo.normalized_name.startswith('@'):
else: channels[repost] = reposter_channel_hash
streams[repost] = reposter_channel_hash else:
streams[repost] = reposter_channel_hash
return streams, channels return streams, channels
def get_reposts_in_channel(self, channel_hash):
reposts = set()
for value in self.db.iterator(prefix=Prefixes.channel_to_claim.pack_partial_key(channel_hash), include_key=False):
stream = Prefixes.channel_to_claim.unpack_value(value)
repost = self.get_repost(stream.claim_hash)
if repost:
reposts.add(repost)
return reposts
def get_channel_for_claim(self, claim_hash, tx_num, position) -> Optional[bytes]: def get_channel_for_claim(self, claim_hash, tx_num, position) -> Optional[bytes]:
return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash, tx_num, position)) return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash, tx_num, position))