diff --git a/lbry/wallet/server/chain_reader.py b/lbry/wallet/server/chain_reader.py index 7f1f8b48f..d3e05c277 100644 --- a/lbry/wallet/server/chain_reader.py +++ b/lbry/wallet/server/chain_reader.py @@ -63,6 +63,12 @@ class BlockchainReader: self.advance(height) self.clear_caches() self.last_state = state + self.db.blocked_streams, self.db.blocked_channels = self.db.get_streams_and_channels_reposted_by_channel_hashes( + self.db.blocking_channel_hashes + ) + self.db.filtered_streams, self.db.filtered_channels = self.db.get_streams_and_channels_reposted_by_channel_hashes( + self.db.filtering_channel_hashes + ) async def poll_for_changes(self): await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes) diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index 494293f8d..699f0709c 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -261,6 +261,11 @@ class ElasticWriter(BlockchainReader): else: success += 1 await self.sync_client.indices.refresh(self.index) + await self.db.reload_blocking_filtering_streams() + await self.apply_filters( + self.db.blocked_streams, self.db.blocked_channels, self.db.filtered_streams, + self.db.filtered_channels + ) self.write_es_height(self.db.db_height, self.db.db_tip[::-1].hex()) self.log.info("Indexing block %i done. %i/%i successful", self._last_wrote_height, success, cnt) self._touched_claims.clear() diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index f2bd8b5e8..74f88b938 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -122,8 +122,8 @@ class Env: (float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0) # Filtering / Blocking - self.blocking_channel_ids = (blocking_channel_ids if blocking_channel_ids is not None else self.default('BLOCKING_CHANNEL_IDS', '')).split(' ') - self.filtering_channel_ids = (filtering_channel_ids if filtering_channel_ids is not None else self.default('FILTERING_CHANNEL_IDS', '')).split(' ') + self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default('BLOCKING_CHANNEL_IDS', '').split(' ') + self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default('FILTERING_CHANNEL_IDS', '').split(' ') @classmethod def default(cls, envvar, default): diff --git a/tests/integration/claims/test_claim_commands.py b/tests/integration/claims/test_claim_commands.py index e9d1487cb..1a3ddb4ee 100644 --- a/tests/integration/claims/test_claim_commands.py +++ b/tests/integration/claims/test_claim_commands.py @@ -1513,9 +1513,11 @@ class StreamCommands(ClaimTestCase): await self.channel_create('@filtering', '0.1') ) self.conductor.spv_node.server.db.filtering_channel_hashes.add(bytes.fromhex(filtering_channel_id)) - self.assertEqual(0, len(self.conductor.spv_node.server.db.filtered_streams)) + self.conductor.spv_node.es_writer.db.filtering_channel_hashes.add(bytes.fromhex(filtering_channel_id)) + + self.assertEqual(0, len(self.conductor.spv_node.es_writer.db.filtered_streams)) await self.stream_repost(bad_content_id, 'filter1', '0.1', channel_name='@filtering') - self.assertEqual(1, len(self.conductor.spv_node.server.db.filtered_streams)) + self.assertEqual(1, len(self.conductor.spv_node.es_writer.db.filtered_streams)) # search for filtered content directly result = await self.out(self.daemon.jsonrpc_claim_search(name='bad_content')) @@ -1560,15 +1562,16 @@ class StreamCommands(ClaimTestCase): # test setting from env vars and starting from scratch await self.conductor.spv_node.stop(False) await self.conductor.spv_node.start(self.conductor.lbcwallet_node, - extraconf={'BLOCKING_CHANNEL_IDS': blocking_channel_id, - 'FILTERING_CHANNEL_IDS': filtering_channel_id}) + extraconf={'blocking_channel_ids': [blocking_channel_id], + 'filtering_channel_ids': [filtering_channel_id]}) await self.daemon.wallet_manager.reset() - self.assertEqual(0, len(self.conductor.spv_node.server.db.blocked_streams)) + self.assertEqual(0, len(self.conductor.spv_node.es_writer.db.blocked_streams)) await self.stream_repost(bad_content_id, 'block1', '0.1', channel_name='@blocking') - self.assertEqual(1, len(self.conductor.spv_node.server.db.blocked_streams)) + self.assertEqual(1, len(self.conductor.spv_node.es_writer.db.blocked_streams)) # blocked content is not resolveable + print((await self.resolve('lbry://@some_channel/bad_content'))) error = (await self.resolve('lbry://@some_channel/bad_content'))['error'] self.assertEqual(error['name'], 'BLOCKED') self.assertTrue(error['text'].startswith("Resolve of 'lbry://@some_channel/bad_content' was censored"))