This commit is contained in:
Jack Robison 2022-02-01 14:04:12 -05:00
parent d7e50b269f
commit 888d47f88b
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 22 additions and 8 deletions

View file

@ -63,6 +63,12 @@ class BlockchainReader:
self.advance(height) self.advance(height)
self.clear_caches() self.clear_caches()
self.last_state = state self.last_state = state
self.db.blocked_streams, self.db.blocked_channels = self.db.get_streams_and_channels_reposted_by_channel_hashes(
self.db.blocking_channel_hashes
)
self.db.filtered_streams, self.db.filtered_channels = self.db.get_streams_and_channels_reposted_by_channel_hashes(
self.db.filtering_channel_hashes
)
async def poll_for_changes(self): async def poll_for_changes(self):
await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes) await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes)

View file

@ -261,6 +261,11 @@ class ElasticWriter(BlockchainReader):
else: else:
success += 1 success += 1
await self.sync_client.indices.refresh(self.index) await self.sync_client.indices.refresh(self.index)
await self.db.reload_blocking_filtering_streams()
await self.apply_filters(
self.db.blocked_streams, self.db.blocked_channels, self.db.filtered_streams,
self.db.filtered_channels
)
self.write_es_height(self.db.db_height, self.db.db_tip[::-1].hex()) self.write_es_height(self.db.db_height, self.db.db_tip[::-1].hex())
self.log.info("Indexing block %i done. %i/%i successful", self._last_wrote_height, success, cnt) self.log.info("Indexing block %i done. %i/%i successful", self._last_wrote_height, success, cnt)
self._touched_claims.clear() self._touched_claims.clear()

View file

@ -122,8 +122,8 @@ class Env:
(float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0) (float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0)
# Filtering / Blocking # Filtering / Blocking
self.blocking_channel_ids = (blocking_channel_ids if blocking_channel_ids is not None else self.default('BLOCKING_CHANNEL_IDS', '')).split(' ') self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default('BLOCKING_CHANNEL_IDS', '').split(' ')
self.filtering_channel_ids = (filtering_channel_ids if filtering_channel_ids is not None else self.default('FILTERING_CHANNEL_IDS', '')).split(' ') self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default('FILTERING_CHANNEL_IDS', '').split(' ')
@classmethod @classmethod
def default(cls, envvar, default): def default(cls, envvar, default):

View file

@ -1513,9 +1513,11 @@ class StreamCommands(ClaimTestCase):
await self.channel_create('@filtering', '0.1') await self.channel_create('@filtering', '0.1')
) )
self.conductor.spv_node.server.db.filtering_channel_hashes.add(bytes.fromhex(filtering_channel_id)) self.conductor.spv_node.server.db.filtering_channel_hashes.add(bytes.fromhex(filtering_channel_id))
self.assertEqual(0, len(self.conductor.spv_node.server.db.filtered_streams)) self.conductor.spv_node.es_writer.db.filtering_channel_hashes.add(bytes.fromhex(filtering_channel_id))
self.assertEqual(0, len(self.conductor.spv_node.es_writer.db.filtered_streams))
await self.stream_repost(bad_content_id, 'filter1', '0.1', channel_name='@filtering') await self.stream_repost(bad_content_id, 'filter1', '0.1', channel_name='@filtering')
self.assertEqual(1, len(self.conductor.spv_node.server.db.filtered_streams)) self.assertEqual(1, len(self.conductor.spv_node.es_writer.db.filtered_streams))
# search for filtered content directly # search for filtered content directly
result = await self.out(self.daemon.jsonrpc_claim_search(name='bad_content')) result = await self.out(self.daemon.jsonrpc_claim_search(name='bad_content'))
@ -1560,15 +1562,16 @@ class StreamCommands(ClaimTestCase):
# test setting from env vars and starting from scratch # test setting from env vars and starting from scratch
await self.conductor.spv_node.stop(False) await self.conductor.spv_node.stop(False)
await self.conductor.spv_node.start(self.conductor.lbcwallet_node, await self.conductor.spv_node.start(self.conductor.lbcwallet_node,
extraconf={'BLOCKING_CHANNEL_IDS': blocking_channel_id, extraconf={'blocking_channel_ids': [blocking_channel_id],
'FILTERING_CHANNEL_IDS': filtering_channel_id}) 'filtering_channel_ids': [filtering_channel_id]})
await self.daemon.wallet_manager.reset() await self.daemon.wallet_manager.reset()
self.assertEqual(0, len(self.conductor.spv_node.server.db.blocked_streams)) self.assertEqual(0, len(self.conductor.spv_node.es_writer.db.blocked_streams))
await self.stream_repost(bad_content_id, 'block1', '0.1', channel_name='@blocking') await self.stream_repost(bad_content_id, 'block1', '0.1', channel_name='@blocking')
self.assertEqual(1, len(self.conductor.spv_node.server.db.blocked_streams)) self.assertEqual(1, len(self.conductor.spv_node.es_writer.db.blocked_streams))
# blocked content is not resolveable # blocked content is not resolveable
print((await self.resolve('lbry://@some_channel/bad_content')))
error = (await self.resolve('lbry://@some_channel/bad_content'))['error'] error = (await self.resolve('lbry://@some_channel/bad_content'))['error']
self.assertEqual(error['name'], 'BLOCKED') self.assertEqual(error['name'], 'BLOCKED')
self.assertTrue(error['text'].startswith("Resolve of 'lbry://@some_channel/bad_content' was censored")) self.assertTrue(error['text'].startswith("Resolve of 'lbry://@some_channel/bad_content' was censored"))