forked from LBRYCommunity/lbry-sdk
fix filtering/blocking
This commit is contained in:
parent
a2e7afa87f
commit
31144a490e
4 changed files with 16 additions and 8 deletions
|
@ -89,7 +89,8 @@ class BlockProcessor:
|
||||||
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
|
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
|
||||||
self.db = HubDB(
|
self.db = HubDB(
|
||||||
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
|
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
|
||||||
max_open_files=env.db_max_open_files, executor=self._chain_executor
|
max_open_files=env.db_max_open_files, blocking_channel_ids=env.blocking_channel_ids,
|
||||||
|
filtering_channel_ids=env.filtering_channel_ids, executor=self._chain_executor
|
||||||
)
|
)
|
||||||
self.shutdown_event = asyncio.Event()
|
self.shutdown_event = asyncio.Event()
|
||||||
self.coin = env.coin
|
self.coin = env.coin
|
||||||
|
|
|
@ -24,7 +24,8 @@ class BlockchainReader:
|
||||||
|
|
||||||
self.db = HubDB(
|
self.db = HubDB(
|
||||||
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
|
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
|
||||||
secondary_name=secondary_name, max_open_files=-1, executor=self._executor
|
secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids,
|
||||||
|
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor
|
||||||
)
|
)
|
||||||
self.last_state: typing.Optional[DBState] = None
|
self.last_state: typing.Optional[DBState] = None
|
||||||
self._refresh_interval = 0.1
|
self._refresh_interval = 0.1
|
||||||
|
|
|
@ -44,7 +44,8 @@ class HubDB:
|
||||||
|
|
||||||
def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200,
|
def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200,
|
||||||
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||||
secondary_name: str = '', max_open_files: int = 256, executor: ThreadPoolExecutor = None):
|
secondary_name: str = '', max_open_files: int = 256, blocking_channel_ids: List[str] = None,
|
||||||
|
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None):
|
||||||
self.logger = util.class_logger(__name__, self.__class__.__name__)
|
self.logger = util.class_logger(__name__, self.__class__.__name__)
|
||||||
self.coin = coin
|
self.coin = coin
|
||||||
self._executor = executor
|
self._executor = executor
|
||||||
|
@ -69,18 +70,18 @@ class HubDB:
|
||||||
self.es_sync_height = 0
|
self.es_sync_height = 0
|
||||||
|
|
||||||
# blocking/filtering dicts
|
# blocking/filtering dicts
|
||||||
# blocking_channels = self.env.default('BLOCKING_CHANNEL_IDS', '').split(' ')
|
blocking_channels = blocking_channel_ids or [] #self.env.default('BLOCKING_CHANNEL_IDS', '').split(' ')
|
||||||
# filtering_channels = self.env.default('FILTERING_CHANNEL_IDS', '').split(' ')
|
filtering_channels = filtering_channel_ids or [] #self.env.default('FILTERING_CHANNEL_IDS', '').split(' ')
|
||||||
self.blocked_streams = {}
|
self.blocked_streams = {}
|
||||||
self.blocked_channels = {}
|
self.blocked_channels = {}
|
||||||
self.blocking_channel_hashes = {
|
self.blocking_channel_hashes = {
|
||||||
# bytes.fromhex(channel_id) for channel_id in blocking_channels if channel_id
|
bytes.fromhex(channel_id) for channel_id in blocking_channels if channel_id
|
||||||
}
|
}
|
||||||
self.filtered_streams = {}
|
self.filtered_streams = {}
|
||||||
|
|
||||||
self.filtered_channels = {}
|
self.filtered_channels = {}
|
||||||
self.filtering_channel_hashes = {
|
self.filtering_channel_hashes = {
|
||||||
# bytes.fromhex(channel_id) for channel_id in filtering_channels if channel_id
|
bytes.fromhex(channel_id) for channel_id in filtering_channels if channel_id
|
||||||
}
|
}
|
||||||
|
|
||||||
self.tx_counts = None
|
self.tx_counts = None
|
||||||
|
|
|
@ -36,7 +36,8 @@ class Env:
|
||||||
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
|
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
|
||||||
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
|
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
|
||||||
session_timeout=None, drop_client=None, description=None, daily_fee=None,
|
session_timeout=None, drop_client=None, description=None, daily_fee=None,
|
||||||
database_query_timeout=None, db_max_open_files=512, elastic_notifier_port=None):
|
database_query_timeout=None, db_max_open_files=512, elastic_notifier_port=None,
|
||||||
|
blocking_channel_ids=None, filtering_channel_ids=None):
|
||||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||||
|
|
||||||
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
|
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
|
||||||
|
@ -118,6 +119,10 @@ class Env:
|
||||||
self.database_query_timeout = database_query_timeout if database_query_timeout is not None else \
|
self.database_query_timeout = database_query_timeout if database_query_timeout is not None else \
|
||||||
(float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0)
|
(float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0)
|
||||||
|
|
||||||
|
# Filtering / Blocking
|
||||||
|
self.blocking_channel_ids = (blocking_channel_ids if blocking_channel_ids is not None else self.default('BLOCKING_CHANNEL_IDS', '')).split(' ')
|
||||||
|
self.filtering_channel_ids = (filtering_channel_ids if filtering_channel_ids is not None else self.default('FILTERING_CHANNEL_IDS', '')).split(' ')
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def default(cls, envvar, default):
|
def default(cls, envvar, default):
|
||||||
return environ.get(envvar, default)
|
return environ.get(envvar, default)
|
||||||
|
|
Loading…
Add table
Reference in a new issue