From 66ed57c834e537f97150e97a04f281892d887391 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 21 Sep 2021 13:27:05 -0400 Subject: [PATCH] refactor reload_blocking_filtering_streams --- lbry/wallet/server/block_processor.py | 2 +- lbry/wallet/server/leveldb.py | 40 +++++++++++++-------------- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index afe6176d9..9bdb236b2 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -325,7 +325,7 @@ class BlockProcessor: await self.run_in_thread(self.db.apply_expiration_extension_fork) # TODO: we shouldnt wait on the search index updating before advancing to the next block if not self.db.first_sync: - self.db.reload_blocking_filtering_streams() + await self.db.reload_blocking_filtering_streams() await self.db.search_index.claim_consumer(self.claim_producer()) await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels, self.db.filtered_streams, self.db.filtered_channels) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index ce27501d6..df34a30b2 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -481,32 +481,30 @@ class LevelDB: count += 1 return count - def reload_blocking_filtering_streams(self): - self.blocked_streams, self.blocked_channels = self.get_streams_and_channels_reposted_by_channel_hashes(self.blocking_channel_hashes) - self.filtered_streams, self.filtered_channels = self.get_streams_and_channels_reposted_by_channel_hashes(self.filtering_channel_hashes) + async def reload_blocking_filtering_streams(self): + def reload(): + self.blocked_streams, self.blocked_channels = self.get_streams_and_channels_reposted_by_channel_hashes( + self.blocking_channel_hashes + ) + self.filtered_streams, self.filtered_channels = self.get_streams_and_channels_reposted_by_channel_hashes( + self.filtering_channel_hashes + ) + await asyncio.get_event_loop().run_in_executor(None, reload) - def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: bytes): + def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: Set[bytes]): streams, channels = {}, {} for reposter_channel_hash in reposter_channel_hashes: - reposts = self.get_reposts_in_channel(reposter_channel_hash) - for repost in reposts: - txo = self.get_claim_txo(repost) - if txo: - if txo.normalized_name.startswith('@'): - channels[repost] = reposter_channel_hash - else: - streams[repost] = reposter_channel_hash + for stream in self.prefix_db.channel_to_claim.iterate((reposter_channel_hash, ), include_key=False): + repost = self.get_repost(stream.claim_hash) + if repost: + txo = self.get_claim_txo(repost) + if txo: + if txo.normalized_name.startswith('@'): + channels[repost] = reposter_channel_hash + else: + streams[repost] = reposter_channel_hash return streams, channels - def get_reposts_in_channel(self, channel_hash): - reposts = set() - for value in self.db.iterator(prefix=Prefixes.channel_to_claim.pack_partial_key(channel_hash), include_key=False): - stream = Prefixes.channel_to_claim.unpack_value(value) - repost = self.get_repost(stream.claim_hash) - if repost: - reposts.add(repost) - return reposts - def get_channel_for_claim(self, claim_hash, tx_num, position) -> Optional[bytes]: return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash, tx_num, position))