Compare commits

..

8 commits

Author SHA1 Message Date
Jonathan Moody
d495ce9f0a Move IP version check earlier. Property ipv4_mapped
is only defined on IPv6 addrs.
2023-01-17 16:42:43 -06:00
Jonathan Moody
8794ff48e0 Revert "Bump protobuf from 3.17.2 to 3.18.3"
This reverts commit 75d64f9dc6.
2023-01-16 15:35:55 -06:00
Jonathan Moody
fa0d03fe95 Rework StatusServer start() to handle lists of addresses, hostnames.
Handle and retry EADDRINUSE errors.
2023-01-16 14:05:26 -06:00
Jonathan Moody
14f2f3b55b Exclude mapped IPv4 addresses. Add resolve_host() code from client. 2023-01-16 14:04:14 -06:00
Jonathan Moody
9c43c811a1 Handle mapped IPv4 address more neatly.
(for consistency with client code)
2023-01-11 11:17:43 -06:00
Jonathan Moody
6c037b29b5 Rename "p" -> "proto" for consistency with lbry-sdk.
Lint does not like variable named "p".
2022-12-31 12:46:32 -06:00
Jonathan Moody
252a1aa165 Remove override for 'localhost' allowing Hub to start server on IPv6. 2022-12-29 13:16:47 -06:00
Jonathan Moody
f370e263b5 Add IPv6 support to StatusServer and related classes. 2022-12-28 16:56:34 -06:00
22 changed files with 343 additions and 285 deletions

View file

@ -17,7 +17,7 @@ services:
- "lbry_rocksdb:/database" - "lbry_rocksdb:/database"
environment: environment:
- HUB_COMMAND=scribe - HUB_COMMAND=scribe
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar - SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1256013/lbry-rocksdb.tar
command: # for full options, see `scribe --help` command: # for full options, see `scribe --help`
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245" - "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--max_query_workers=2" - "--max_query_workers=2"

View file

@ -14,7 +14,7 @@ services:
- "lbry_rocksdb:/database" - "lbry_rocksdb:/database"
environment: environment:
- HUB_COMMAND=scribe - HUB_COMMAND=scribe
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar - SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1256013/lbry-rocksdb.tar
command: command:
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245" - "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--max_query_workers=2" - "--max_query_workers=2"

View file

@ -5,16 +5,15 @@ import hmac
import ipaddress import ipaddress
import logging import logging
import logging.handlers import logging.handlers
import socket
import typing import typing
import collections import collections
from ctypes import cast, memmove, POINTER, c_void_p
from bisect import insort_right from bisect import insort_right
from collections import deque from collections import deque
from decimal import Decimal from decimal import Decimal
from typing import Iterable, Deque from typing import Iterable, Deque
from asyncio import get_event_loop, Event from asyncio import get_event_loop, Event
from prometheus_client import Counter from prometheus_client import Counter
from rehash.structs import EVPobject
from hub.schema.tags import clean_tags from hub.schema.tags import clean_tags
from hub.schema.url import normalize_name from hub.schema.url import normalize_name
from hub.error import TooManyClaimSearchParametersError from hub.error import TooManyClaimSearchParametersError
@ -155,6 +154,38 @@ def protocol_version(client_req, min_tuple, max_tuple):
return result, client_min return result, client_min
async def resolve_host(url: str, port: int, proto: str,
family: int = socket.AF_INET, all_results: bool = False) \
-> typing.Union[str, typing.List[str]]:
if proto not in ['udp', 'tcp']:
raise Exception("invalid protocol")
try:
if ipaddress.ip_address(url):
return [url] if all_results else url
except ValueError:
pass
loop = asyncio.get_running_loop()
records = await loop.getaddrinfo(
url, port,
proto=socket.IPPROTO_TCP if proto == 'tcp' else socket.IPPROTO_UDP,
type=socket.SOCK_STREAM if proto == 'tcp' else socket.SOCK_DGRAM,
family=family,
)
def addr_not_ipv4_mapped(rec):
_, _, _, _, sockaddr = rec
ipaddr = ipaddress.ip_address(sockaddr[0])
return ipaddr.version != 6 or not ipaddr.ipv4_mapped
records = filter(addr_not_ipv4_mapped, records)
results = [sockaddr[0] for fam, type, prot, canonname, sockaddr in records]
if not results and not all_results:
raise socket.gaierror(
socket.EAI_ADDRFAMILY,
'The specified network host does not have any network '
'addresses in the requested address family'
)
return results if all_results else results[0]
class LRUCacheWithMetrics: class LRUCacheWithMetrics:
__slots__ = [ __slots__ = [
'capacity', 'capacity',
@ -579,11 +610,13 @@ IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24')
def is_valid_public_ipv4(address, allow_localhost: bool = False, allow_lan: bool = False): def is_valid_public_ipv4(address, allow_localhost: bool = False, allow_lan: bool = False):
try: try:
parsed_ip = ipaddress.ip_address(address) parsed_ip = ipaddress.ip_address(address)
if parsed_ip.is_loopback and allow_localhost: if parsed_ip.version != 4:
return True return False
if allow_lan and parsed_ip.is_private: if parsed_ip.is_loopback:
return True return allow_localhost
if any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback, if parsed_ip.is_private:
return allow_lan
if any((parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private)): parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private)):
return False return False
else: else:
@ -592,6 +625,23 @@ def is_valid_public_ipv4(address, allow_localhost: bool = False, allow_lan: bool
except (ipaddress.AddressValueError, ValueError): except (ipaddress.AddressValueError, ValueError):
return False return False
def is_valid_public_ipv6(address, allow_localhost: bool = False, allow_lan: bool = False):
try:
parsed_ip = ipaddress.ip_address(address)
if parsed_ip.version != 6:
return False
if parsed_ip.is_loopback:
return allow_localhost
if parsed_ip.is_private:
return allow_lan
return not any((parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private,
parsed_ip.ipv4_mapped))
except (ipaddress.AddressValueError, ValueError):
return False
def is_valid_public_ip(address, **kwargs):
return is_valid_public_ipv6(address, **kwargs) or is_valid_public_ipv4(address, **kwargs)
def sha256(x): def sha256(x):
"""Simple wrapper of hashlib sha256.""" """Simple wrapper of hashlib sha256."""
@ -1061,41 +1111,3 @@ async def asyncify_for_loop(gen, ticks_per_sleep: int = 1000):
yield item yield item
if cnt % ticks_per_sleep == 0: if cnt % ticks_per_sleep == 0:
await async_sleep(0) await async_sleep(0)
_SHA256_DIGEST_STATE_SIZE = 120
class ResumableSHA256:
__slots__ = ['_hasher']
def __init__(self, state: typing.Optional[bytes] = None):
self._hasher = hashlib.sha256()
if state is not None:
ctx = self._get_evp_md_ctx()
ctx_size = ctx.digest.contents.ctx_size
if len(state) != _SHA256_DIGEST_STATE_SIZE != ctx_size:
raise Exception(f'invalid sha256 digester state, got {len(state)} bytes')
memmove(ctx.md_data, state, ctx_size)
def _get_evp_md_ctx(self):
c_evp_obj = cast(c_void_p(id(self._hasher)), POINTER(EVPobject))
if hasattr(c_evp_obj.contents.ctx, "contents"):
return c_evp_obj.contents.ctx.contents
else:
return c_evp_obj.contents.ctx
def get_state(self) -> bytes:
ctx = self._get_evp_md_ctx()
ctx_size = ctx.digest.contents.ctx_size
hasher_state = ctx.md_data[:ctx_size]
return hasher_state
def __copy__(self):
return ResumableSHA256(self.get_state())
def update(self, data: bytes):
self._hasher.update(data)
def digest(self):
return self._hasher.digest()

View file

@ -51,7 +51,6 @@ class DB_PREFIXES(enum.Enum):
reposted_count = b'j' reposted_count = b'j'
effective_amount = b'i' effective_amount = b'i'
future_effective_amount = b'k' future_effective_amount = b'k'
hashX_history_hash = b'l'
COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass

View file

@ -37,7 +37,7 @@ class SecondaryDB:
DB_VERSIONS = [7, 8, 9, 10, 11, 12] DB_VERSIONS = [7, 8, 9, 10, 11, 12]
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200, def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_tx_hashes: bool = False, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
blocking_channel_ids: List[str] = None, blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768, index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768,
@ -47,9 +47,9 @@ class SecondaryDB:
self._executor = executor self._executor = executor
self._db_dir = db_dir self._db_dir = db_dir
self._reorg_limit = reorg_limit self._reorg_limit = reorg_limit
self._cache_all_claim_txos = cache_all_claim_txos
self._cache_all_tx_hashes = cache_all_tx_hashes self._cache_all_tx_hashes = cache_all_tx_hashes
self._secondary_name = secondary_name self._secondary_name = secondary_name
self._need_restart_path = os.path.join(self._db_dir, 'NEED_SCRIBE_RESTART')
if secondary_name: if secondary_name:
assert max_open_files == -1, 'max open files must be -1 for secondary readers' assert max_open_files == -1, 'max open files must be -1 for secondary readers'
self._db_max_open_files = max_open_files self._db_max_open_files = max_open_files
@ -100,6 +100,9 @@ class SecondaryDB:
self.total_transactions: List[bytes] = [] self.total_transactions: List[bytes] = []
self.tx_num_mapping: Dict[bytes, int] = {} self.tx_num_mapping: Dict[bytes, int] = {}
# these are only used if the cache_all_claim_txos setting is on
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH) self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)
def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]: def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
@ -953,6 +956,21 @@ class SecondaryDB:
else: else:
assert self.db_tx_count == 0 assert self.db_tx_count == 0
async def _read_claim_txos(self):
def read_claim_txos():
set_claim_to_txo = self.claim_to_txo.__setitem__
for k, v in self.prefix_db.claim_to_txo.iterate(fill_cache=False):
set_claim_to_txo(k.claim_hash, v)
self.txo_to_claim[v.tx_num][v.position] = k.claim_hash
self.claim_to_txo.clear()
self.txo_to_claim.clear()
start = time.perf_counter()
self.logger.info("loading claims")
await asyncio.get_event_loop().run_in_executor(self._executor, read_claim_txos)
ts = time.perf_counter() - start
self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4))
# async def _read_headers(self): # async def _read_headers(self):
# # if self.headers is not None: # # if self.headers is not None:
# # return # # return
@ -1001,27 +1019,6 @@ class SecondaryDB:
secondary_path = '' if not self._secondary_name else os.path.join( secondary_path = '' if not self._secondary_name else os.path.join(
self._db_dir, self._secondary_name self._db_dir, self._secondary_name
) )
open_db_canary = None
if self._secondary_name:
open_db_canary = os.path.join(self._db_dir, f'{self._secondary_name}-db-canary')
if os.path.exists(open_db_canary):
with open(self._need_restart_path, 'w+') as f:
f.write(f"{time.strftime(f'%Y-%m-%d %H:%M:%S')} {self._secondary_name}\n")
raise RuntimeError('scribe restart is needed')
else:
with open(open_db_canary, 'w'):
pass
else:
herald_db_canary = os.path.join(self._db_dir, 'lbry-reader-db-canary')
es_sync_db_canary = os.path.join(self._db_dir, 'lbry-elastic-writer-db-canary')
if os.path.exists(herald_db_canary):
os.remove(herald_db_canary)
if os.path.exists(es_sync_db_canary):
os.remove(es_sync_db_canary)
if os.path.exists(self._need_restart_path):
os.remove(self._need_restart_path)
db_path = os.path.join(self._db_dir, 'lbry-rocksdb') db_path = os.path.join(self._db_dir, 'lbry-rocksdb')
self.prefix_db = PrefixDB( self.prefix_db = PrefixDB(
db_path, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files, db_path, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,
@ -1030,7 +1027,6 @@ class SecondaryDB:
) )
if secondary_path != '': if secondary_path != '':
os.remove(open_db_canary)
self.logger.info(f'opened db for read only: lbry-rocksdb (%s)', db_path) self.logger.info(f'opened db for read only: lbry-rocksdb (%s)', db_path)
else: else:
self.logger.info(f'opened db for writing: lbry-rocksdb (%s)', db_path) self.logger.info(f'opened db for writing: lbry-rocksdb (%s)', db_path)
@ -1067,6 +1063,8 @@ class SecondaryDB:
async def initialize_caches(self): async def initialize_caches(self):
await self._read_tx_counts() await self._read_tx_counts()
await self._read_block_hashes() await self._read_block_hashes()
if self._cache_all_claim_txos:
await self._read_claim_txos()
if self._cache_all_tx_hashes: if self._cache_all_tx_hashes:
await self._read_tx_hashes() await self._read_tx_hashes()
if self.db_height > 0: if self.db_height > 0:
@ -1156,9 +1154,15 @@ class SecondaryDB:
} }
def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]: def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]:
if self._cache_all_claim_txos:
return self.claim_to_txo.get(claim_hash)
return self.prefix_db.claim_to_txo.get_pending(claim_hash) return self.prefix_db.claim_to_txo.get_pending(claim_hash)
def get_cached_claim_hash(self, tx_num: int, position: int) -> Optional[bytes]: def get_cached_claim_hash(self, tx_num: int, position: int) -> Optional[bytes]:
if self._cache_all_claim_txos:
if tx_num not in self.txo_to_claim:
return
return self.txo_to_claim[tx_num].get(position, None)
v = self.prefix_db.txo_to_claim.get_pending(tx_num, position) v = self.prefix_db.txo_to_claim.get_pending(tx_num, position)
return None if not v else v.claim_hash return None if not v else v.claim_hash

View file

@ -273,7 +273,6 @@ class BasePrefixDB:
undo_c_f = self.column_families[DB_PREFIXES.undo.value] undo_c_f = self.column_families[DB_PREFIXES.undo.value]
undo_info = self._db.get((undo_c_f, undo_key)) undo_info = self._db.get((undo_c_f, undo_key))
self._op_stack.apply_packed_undo_ops(undo_info) self._op_stack.apply_packed_undo_ops(undo_info)
self._op_stack.validate_and_apply_stashed_ops()
try: try:
with self._db.write_batch(sync=True) as batch: with self._db.write_batch(sync=True) as batch:
batch_put = batch.put batch_put = batch.put

View file

@ -3,7 +3,6 @@ import struct
import array import array
import base64 import base64
from typing import Union, Tuple, NamedTuple, Optional from typing import Union, Tuple, NamedTuple, Optional
from hub.common import ResumableSHA256
from hub.db.common import DB_PREFIXES from hub.db.common import DB_PREFIXES
from hub.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow from hub.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow
from hub.schema.url import normalize_name from hub.schema.url import normalize_name
@ -1852,46 +1851,6 @@ class FutureEffectiveAmountPrefixRow(PrefixRow):
return cls.pack_key(claim_hash), cls.pack_value(future_effective_amount) return cls.pack_key(claim_hash), cls.pack_value(future_effective_amount)
class HashXHistoryHasherKey(NamedTuple):
hashX: bytes
class HashXHistoryHasherValue(NamedTuple):
hasher: ResumableSHA256
class HashXHistoryHasherPrefixRow(PrefixRow):
prefix = DB_PREFIXES.hashX_history_hash.value
key_struct = struct.Struct(b'>11s')
value_struct = struct.Struct(b'>120s')
cache_size = 1024 * 1024 * 64
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>11s').pack
]
@classmethod
def pack_key(cls, hashX: bytes):
return super().pack_key(hashX)
@classmethod
def unpack_key(cls, key: bytes) -> HashXHistoryHasherKey:
return HashXHistoryHasherKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, hasher: ResumableSHA256) -> bytes:
return super().pack_value(hasher.get_state())
@classmethod
def unpack_value(cls, data: bytes) -> HashXHistoryHasherValue:
return HashXHistoryHasherValue(ResumableSHA256(*super().unpack_value(data)))
@classmethod
def pack_item(cls, hashX: bytes, hasher: ResumableSHA256):
return cls.pack_key(hashX), cls.pack_value(hasher)
class PrefixDB(BasePrefixDB): class PrefixDB(BasePrefixDB):
def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64, def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64,
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None, secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None,
@ -1938,7 +1897,6 @@ class PrefixDB(BasePrefixDB):
self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack) self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack)
self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack) self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack)
self.future_effective_amount = FutureEffectiveAmountPrefixRow(db, self._op_stack) self.future_effective_amount = FutureEffectiveAmountPrefixRow(db, self._op_stack)
self.hashX_history_hasher = HashXHistoryHasherPrefixRow(db, self._op_stack)
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:

View file

@ -160,7 +160,7 @@ class RevertableOpStack:
# there is a value and we're not deleting it in this op # there is a value and we're not deleting it in this op
# check that a delete for the stored value is in the stack # check that a delete for the stored value is in the stack
raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}\nvs\n{stored_val}") raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}\nvs\n{stored_val}")
elif not has_stored_val: elif not stored_val:
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}") raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
elif stored_val != op.value: elif stored_val != op.value:
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}") raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
@ -324,21 +324,9 @@ class RevertableOpStack:
""" """
Unpack and apply a sequence of undo ops from serialized undo bytes Unpack and apply a sequence of undo ops from serialized undo bytes
""" """
offset = 0 while packed:
packed_size = len(packed) op, packed = RevertableOp.unpack(packed)
while offset < packed_size: self.append_op(op)
is_put, key_len, val_len = _OP_STRUCT.unpack(packed[offset:offset + 9])
offset += 9
key = packed[offset:offset + key_len]
offset += key_len
value = packed[offset:offset + val_len]
offset += val_len
if is_put == 1:
op = RevertablePut(key, value)
else:
op = RevertableDelete(key, value)
self._stash.append(op)
self._stashed_last_op_for_key[op.key] = op
def get_pending_op(self, key: bytes) -> Optional[RevertableOp]: def get_pending_op(self, key: bytes) -> Optional[RevertableOp]:
if key in self._stashed_last_op_for_key: if key in self._stashed_last_op_for_key:

View file

@ -9,11 +9,11 @@ from hub.db.common import ResolveResult
class ElasticSyncDB(SecondaryDB): class ElasticSyncDB(SecondaryDB):
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200, def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_tx_hashes: bool = False, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
blocking_channel_ids: List[str] = None, blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False): index_address_status=False):
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor, cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
index_address_status) index_address_status)
self.block_timestamp_cache = LRUCache(1024) self.block_timestamp_cache = LRUCache(1024)

View file

@ -3,11 +3,11 @@ from hub.env import Env
class ElasticEnv(Env): class ElasticEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None, def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None,
cache_all_tx_hashes=None, elastic_host=None, elastic_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, elastic_host=None, elastic_port=None,
es_index_prefix=None, elastic_notifier_host=None, elastic_notifier_port=None, es_index_prefix=None, elastic_notifier_host=None, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None, reindex=False): blocking_channel_ids=None, filtering_channel_ids=None, reindex=False):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
blocking_channel_ids, filtering_channel_ids) cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200) self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default( self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
@ -43,7 +43,7 @@ class ElasticEnv(Env):
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain, elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit,
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes, prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
blocking_channel_ids=args.blocking_channel_ids, cache_all_claim_txos=args.cache_all_claim_txos, blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host, filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
elastic_notifier_port=args.elastic_notifier_port elastic_notifier_port=args.elastic_notifier_port
) )

View file

@ -49,7 +49,7 @@ class ElasticSyncService(BlockchainReaderService):
def open_db(self): def open_db(self):
env = self.env env = self.env
self.db = ElasticSyncDB( self.db = ElasticSyncDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids, env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status index_address_status=env.index_address_status

View file

@ -30,7 +30,7 @@ class Env:
pass pass
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None): blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
@ -46,6 +46,7 @@ class Env:
self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0) self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0)
self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False) self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False)
self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False)
# Filtering / Blocking # Filtering / Blocking
self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default( self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default(
'BLOCKING_CHANNEL_IDS', '').split(' ') 'BLOCKING_CHANNEL_IDS', '').split(' ')
@ -116,10 +117,6 @@ class Env:
result = [part.strip() for part in host.split(',')] result = [part.strip() for part in host.split(',')]
if len(result) == 1: if len(result) == 1:
result = result[0] result = result[0]
if result == 'localhost':
# 'localhost' resolves to ::1 (ipv6) on many systems, which fails on default setup of
# docker, using 127.0.0.1 instead forces ipv4
result = '127.0.0.1'
return result return result
def sane_max_sessions(self): def sane_max_sessions(self):
@ -170,6 +167,11 @@ class Env:
"resolve, transaction fetching, and block sync all faster at the expense of higher " "resolve, transaction fetching, and block sync all faster at the expense of higher "
"memory usage (at least 10GB more). Can be set in env with 'CACHE_ALL_TX_HASHES'.", "memory usage (at least 10GB more). Can be set in env with 'CACHE_ALL_TX_HASHES'.",
default=cls.boolean('CACHE_ALL_TX_HASHES', False)) default=cls.boolean('CACHE_ALL_TX_HASHES', False))
parser.add_argument('--cache_all_claim_txos', action='store_true',
help="Load all claim txos into memory. This will make address subscriptions and sync, "
"resolve, transaction fetching, and block sync all faster at the expense of higher "
"memory usage. Can be set in env with 'CACHE_ALL_CLAIM_TXOS'.",
default=cls.boolean('CACHE_ALL_CLAIM_TXOS', False))
parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0), parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0),
help="Port for prometheus metrics to listen on, disabled by default. " help="Port for prometheus metrics to listen on, disabled by default. "
"Can be set in env with 'PROMETHEUS_PORT'.") "Can be set in env with 'PROMETHEUS_PORT'.")

View file

@ -6,11 +6,11 @@ from hub.db import SecondaryDB
class HeraldDB(SecondaryDB): class HeraldDB(SecondaryDB):
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200, def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_tx_hashes: bool = False, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
blocking_channel_ids: List[str] = None, blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768): index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768):
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor, cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
index_address_status, merkle_cache_size, tx_cache_size) index_address_status, merkle_cache_size, tx_cache_size)
# self.headers = None # self.headers = None

View file

@ -19,7 +19,7 @@ def parse_es_services(elastic_services_arg: str):
class ServerEnv(Env): class ServerEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
daemon_url=None, host=None, elastic_services=None, es_index_prefix=None, daemon_url=None, host=None, elastic_services=None, es_index_prefix=None,
tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None, tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
@ -29,7 +29,7 @@ class ServerEnv(Env):
merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None, merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
history_tx_cache_size=None, largest_address_history_cache_size=None): history_tx_cache_size=None, largest_address_history_cache_size=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
blocking_channel_ids, filtering_channel_ids, index_address_status) cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.host = host if host is not None else self.default('HOST', 'localhost') self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080')) self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080'))
@ -153,7 +153,7 @@ class ServerEnv(Env):
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port, es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file, udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes, allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
country=args.country, payment_address=args.payment_address, cache_all_claim_txos=args.cache_all_claim_txos, country=args.country, payment_address=args.payment_address,
donation_address=args.donation_address, max_send=args.max_send, max_receive=args.max_receive, donation_address=args.donation_address, max_send=args.max_send, max_receive=args.max_receive,
max_sessions=args.max_sessions, session_timeout=args.session_timeout, max_sessions=args.max_sessions, session_timeout=args.session_timeout,
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee, drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,

View file

@ -1,3 +1,4 @@
import errno
import time import time
import typing import typing
import asyncio import asyncio
@ -53,7 +54,7 @@ class HubServerService(BlockchainReaderService):
def open_db(self): def open_db(self):
env = self.env env = self.env
self.db = HeraldDB( self.db = HeraldDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids, env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status, merkle_cache_size=env.merkle_cache_size, index_address_status=env.index_address_status, merkle_cache_size=env.merkle_cache_size,
@ -170,10 +171,20 @@ class HubServerService(BlockchainReaderService):
async def start_status_server(self): async def start_status_server(self):
if self.env.udp_port and int(self.env.udp_port): if self.env.udp_port and int(self.env.udp_port):
await self.status_server.start( hosts = self.env.cs_host()
0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country, started = False
self.env.host, self.env.udp_port, self.env.allow_lan_udp while not started:
) try:
await self.status_server.start(
0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
hosts, self.env.udp_port, self.env.allow_lan_udp
)
started = True
except OSError as e:
if e.errno is errno.EADDRINUSE:
await asyncio.sleep(3)
continue
raise
def _iter_start_tasks(self): def _iter_start_tasks(self):
yield self.start_status_server() yield self.start_status_server()

View file

@ -271,7 +271,8 @@ class SessionManager:
f'{host}:{port:d} : {e!r}') f'{host}:{port:d} : {e!r}')
raise raise
else: else:
self.logger.info(f'{kind} server listening on {host}:{port:d}') for s in self.servers[kind].sockets:
self.logger.info(f'{kind} server listening on {s.getsockname()[:2]}')
async def _start_external_servers(self): async def _start_external_servers(self):
"""Start listening on TCP and SSL ports, but only if the respective """Start listening on TCP and SSL ports, but only if the respective

View file

@ -1,10 +1,18 @@
import asyncio import asyncio
import ipaddress
import socket
import struct import struct
from time import perf_counter from time import perf_counter
import logging import logging
from typing import Optional, Tuple, NamedTuple from typing import Optional, Tuple, NamedTuple, List, Union
from hub.schema.attrs import country_str_to_int, country_int_to_str from hub.schema.attrs import country_str_to_int, country_int_to_str
from hub.common import LRUCache, is_valid_public_ipv4 from hub.common import (
LRUCache,
resolve_host,
is_valid_public_ip,
is_valid_public_ipv4,
is_valid_public_ipv6,
)
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -36,48 +44,75 @@ class SPVPing(NamedTuple):
return decoded return decoded
PONG_ENCODING = b'!BBL32s4sH' PONG_ENCODING_PRE = b'!BBL32s'
PONG_ENCODING_POST = b'!H'
class SPVPong(NamedTuple): class SPVPong(NamedTuple):
protocol_version: int protocol_version: int
flags: int flags: int
height: int height: int
tip: bytes tip: bytes
source_address_raw: bytes ipaddr: Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
country: int country: int
FLAG_AVAILABLE = 0b00000001
FLAG_IPV6 = 0b00000010
def encode(self): def encode(self):
return struct.pack(PONG_ENCODING, *self) return (struct.pack(PONG_ENCODING_PRE, self.protocol_version, self.flags, self.height, self.tip) +
self.encode_address(self.ipaddr) +
struct.pack(PONG_ENCODING_POST, self.country))
@staticmethod @staticmethod
def encode_address(address: str): def encode_address(address: Union[str, ipaddress.IPv4Address, ipaddress.IPv6Address]):
return bytes(int(b) for b in address.split(".")) if not isinstance(address, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
address = ipaddress.ip_address(address)
return address.packed
@classmethod @classmethod
def make(cls, flags: int, height: int, tip: bytes, source_address: str, country: str) -> bytes: def make(cls, flags: int, height: int, tip: bytes, source_address: str, country: str) -> bytes:
ipaddr = ipaddress.ip_address(source_address)
flags = (flags | cls.FLAG_IPV6) if ipaddr.version == 6 else (flags & ~cls.FLAG_IPV6)
return SPVPong( return SPVPong(
PROTOCOL_VERSION, flags, height, tip, PROTOCOL_VERSION, flags, height, tip,
cls.encode_address(source_address), ipaddr,
country_str_to_int(country) country_str_to_int(country)
).encode() )
@classmethod @classmethod
def make_sans_source_address(cls, flags: int, height: int, tip: bytes, country: str) -> Tuple[bytes, bytes]: def make_sans_source_address(cls, flags: int, height: int, tip: bytes, country: str) -> Tuple[bytes, bytes]:
pong = cls.make(flags, height, tip, '0.0.0.0', country) pong = cls.make(flags, height, tip, '0.0.0.0', country)
return pong[:38], pong[42:] pong = pong.encode()
return pong[0:1], pong[2:38], pong[42:]
@classmethod @classmethod
def decode(cls, packet: bytes): def decode(cls, packet: bytes):
return cls(*struct.unpack(PONG_ENCODING, packet[:44])) offset = 0
protocol_version, flags, height, tip = struct.unpack(PONG_ENCODING_PRE, packet[offset:offset+38])
offset += 38
if flags & cls.FLAG_IPV6:
addr_len = ipaddress.IPV6LENGTH // 8
ipaddr = ipaddress.ip_address(packet[offset:offset+addr_len])
offset += addr_len
else:
addr_len = ipaddress.IPV4LENGTH // 8
ipaddr = ipaddress.ip_address(packet[offset:offset+addr_len])
offset += addr_len
country, = struct.unpack(PONG_ENCODING_POST, packet[offset:offset+2])
offset += 2
return cls(protocol_version, flags, height, tip, ipaddr, country)
@property @property
def available(self) -> bool: def available(self) -> bool:
return (self.flags & 0b00000001) > 0 return (self.flags & self.FLAG_AVAILABLE) > 0
@property
def ipv6(self) -> bool:
return (self.flags & self.FLAG_IPV6) > 0
@property @property
def ip_address(self) -> str: def ip_address(self) -> str:
return ".".join(map(str, self.source_address_raw)) return self.ipaddr.compressed
@property @property
def country_name(self): def country_name(self):
@ -94,7 +129,8 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
def __init__( def __init__(
self, height: int, tip: bytes, country: str, self, height: int, tip: bytes, country: str,
throttle_cache_size: int = 1024, throttle_reqs_per_sec: int = 10, throttle_cache_size: int = 1024, throttle_reqs_per_sec: int = 10,
allow_localhost: bool = False, allow_lan: bool = False allow_localhost: bool = False, allow_lan: bool = False,
is_valid_ip = is_valid_public_ip,
): ):
super().__init__() super().__init__()
self.transport: Optional[asyncio.transports.DatagramTransport] = None self.transport: Optional[asyncio.transports.DatagramTransport] = None
@ -102,26 +138,27 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
self._tip = tip self._tip = tip
self._flags = 0 self._flags = 0
self._country = country self._country = country
self._left_cache = self._right_cache = None self._cache0 = self._cache1 = self.cache2 = None
self.update_cached_response() self.update_cached_response()
self._throttle = LRUCache(throttle_cache_size) self._throttle = LRUCache(throttle_cache_size)
self._should_log = LRUCache(throttle_cache_size) self._should_log = LRUCache(throttle_cache_size)
self._min_delay = 1 / throttle_reqs_per_sec self._min_delay = 1 / throttle_reqs_per_sec
self._allow_localhost = allow_localhost self._allow_localhost = allow_localhost
self._allow_lan = allow_lan self._allow_lan = allow_lan
self._is_valid_ip = is_valid_ip
self.closed = asyncio.Event() self.closed = asyncio.Event()
def update_cached_response(self): def update_cached_response(self):
self._left_cache, self._right_cache = SPVPong.make_sans_source_address( self._cache0, self._cache1, self._cache2 = SPVPong.make_sans_source_address(
self._flags, max(0, self._height), self._tip, self._country self._flags, max(0, self._height), self._tip, self._country
) )
def set_unavailable(self): def set_unavailable(self):
self._flags &= 0b11111110 self._flags &= ~SPVPong.FLAG_AVAILABLE
self.update_cached_response() self.update_cached_response()
def set_available(self): def set_available(self):
self._flags |= 0b00000001 self._flags |= SPVPong.FLAG_AVAILABLE
self.update_cached_response() self.update_cached_response()
def set_height(self, height: int, tip: bytes): def set_height(self, height: int, tip: bytes):
@ -141,17 +178,25 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
return False return False
def make_pong(self, host): def make_pong(self, host):
return self._left_cache + SPVPong.encode_address(host) + self._right_cache ipaddr = ipaddress.ip_address(host)
if ipaddr.version == 6:
flags = self._flags | SPVPong.FLAG_IPV6
else:
flags = self._flags & ~SPVPong.FLAG_IPV6
return (self._cache0 + flags.to_bytes(1, 'big') +
self._cache1 + SPVPong.encode_address(ipaddr) +
self._cache2)
def datagram_received(self, data: bytes, addr: Tuple[str, int]): def datagram_received(self, data: bytes, addr: Union[Tuple[str, int], Tuple[str, int, int, int]]):
if self.should_throttle(addr[0]): if self.should_throttle(addr[0]):
# print(f"throttled: {addr}")
return return
try: try:
SPVPing.decode(data) SPVPing.decode(data)
except (ValueError, struct.error, AttributeError, TypeError): except (ValueError, struct.error, AttributeError, TypeError):
# log.exception("derp") # log.exception("derp")
return return
if addr[1] >= 1024 and is_valid_public_ipv4( if addr[1] >= 1024 and self._is_valid_ip(
addr[0], allow_localhost=self._allow_localhost, allow_lan=self._allow_lan): addr[0], allow_localhost=self._allow_localhost, allow_lan=self._allow_lan):
self.transport.sendto(self.make_pong(addr[0]), addr) self.transport.sendto(self.make_pong(addr[0]), addr)
else: else:
@ -174,39 +219,78 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
class StatusServer: class StatusServer:
def __init__(self): def __init__(self):
self._protocol: Optional[SPVServerStatusProtocol] = None self._protocols: List[SPVServerStatusProtocol] = []
async def start(self, height: int, tip: bytes, country: str, interface: str, port: int, allow_lan: bool = False): async def _start(self, height: int, tip: bytes, country: str, addr: str, port: int, allow_lan: bool = False):
if self.is_running: ipaddr = ipaddress.ip_address(addr)
return if ipaddr.version == 4:
loop = asyncio.get_event_loop() proto = SPVServerStatusProtocol(
interface = interface if interface.lower() != 'localhost' else '127.0.0.1' height, tip, country,
self._protocol = SPVServerStatusProtocol( allow_localhost=ipaddr.is_loopback or ipaddr.is_unspecified,
height, tip, country, allow_localhost=interface == '127.0.0.1', allow_lan=allow_lan allow_lan=allow_lan,
) is_valid_ip=is_valid_public_ipv4,
await loop.create_datagram_endpoint(lambda: self._protocol, (interface, port)) )
log.info("started udp status server on %s:%i", interface, port) loop = asyncio.get_event_loop()
await loop.create_datagram_endpoint(lambda: proto, (ipaddr.compressed, port), family=socket.AF_INET)
elif ipaddr.version == 6:
proto = SPVServerStatusProtocol(
height, tip, country,
allow_localhost=ipaddr.is_loopback or ipaddr.is_unspecified,
allow_lan=allow_lan,
is_valid_ip=is_valid_public_ipv6,
)
# Because dualstack / IPv4 mapped address behavior on an IPv6 socket
# differs based on system config, create the socket with IPV6_V6ONLY.
# This disables the IPv4 mapped feature, so we don't need to consider
# when an IPv6 socket may interfere with IPv4 binding / traffic.
sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
sock.bind((ipaddr.compressed, port))
loop = asyncio.get_event_loop()
await loop.create_datagram_endpoint(lambda: proto, sock=sock)
else:
raise ValueError(f'unexpected IP address version {ipaddr.version}')
log.info("started udp%i status server on %s", ipaddr.version, proto.transport.get_extra_info('sockname')[:2])
self._protocols.append(proto)
async def start(self, height: int, tip: bytes, country: str, hosts: List[str], port: int, allow_lan: bool = False):
if not isinstance(hosts, list):
hosts = [hosts]
try:
for host in hosts:
addr = None
if not host:
resolved = ['::', '0.0.0.0'] # unspecified address
else:
resolved = await resolve_host(host, port, 'udp', family=socket.AF_UNSPEC, all_results=True)
for addr in resolved:
await self._start(height, tip, country, addr, port, allow_lan)
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
log.error("UDP status server failed to listen on (%s:%i) : %s", addr or host, port, e)
await self.stop()
raise
async def stop(self): async def stop(self):
if self.is_running: for proto in self._protocols:
await self._protocol.close() await proto.close()
self._protocol = None self._protocols.clear()
@property @property
def is_running(self): def is_running(self):
return self._protocol is not None return self._protocols
def set_unavailable(self): def set_unavailable(self):
if self.is_running: for proto in self._protocols:
self._protocol.set_unavailable() proto.set_unavailable()
def set_available(self): def set_available(self):
if self.is_running: for proto in self._protocols:
self._protocol.set_available() proto.set_available()
def set_height(self, height: int, tip: bytes): def set_height(self, height: int, tip: bytes):
if self.is_running: for proto in self._protocols:
self._protocol.set_height(height, tip) proto.set_height(height, tip)
class SPVStatusClientProtocol(asyncio.DatagramProtocol): class SPVStatusClientProtocol(asyncio.DatagramProtocol):
@ -217,9 +301,14 @@ class SPVStatusClientProtocol(asyncio.DatagramProtocol):
self.responses = responses self.responses = responses
self._ping_packet = SPVPing.make() self._ping_packet = SPVPing.make()
def datagram_received(self, data: bytes, addr: Tuple[str, int]): def datagram_received(self, data: bytes, addr: Union[Tuple[str, int], Tuple[str, int, int, int]]):
try: try:
self.responses.put_nowait(((addr, perf_counter()), SPVPong.decode(data))) if len(addr) > 2: # IPv6 with possible mapped IPv4
ipaddr = ipaddress.ip_address(addr[0])
if ipaddr.ipv4_mapped:
# mapped IPv4 address identified
addr = (ipaddr.ipv4_mapped.compressed, addr[1])
self.responses.put_nowait(((addr[:2], perf_counter()), SPVPong.decode(data)))
except (ValueError, struct.error, AttributeError, TypeError, RuntimeError): except (ValueError, struct.error, AttributeError, TypeError, RuntimeError):
return return
@ -230,7 +319,7 @@ class SPVStatusClientProtocol(asyncio.DatagramProtocol):
self.transport = None self.transport = None
log.info("closed udp spv server selection client") log.info("closed udp spv server selection client")
def ping(self, server: Tuple[str, int]): def ping(self, server: Union[Tuple[str, int], Tuple[str, int, int, int]]):
self.transport.sendto(self._ping_packet, server) self.transport.sendto(self._ping_packet, server)
def close(self): def close(self):

View file

@ -5,17 +5,17 @@ import time
from typing import List from typing import List
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
from bisect import bisect_right from bisect import bisect_right
from hub.common import ResumableSHA256 from hub.common import sha256
from hub.db import SecondaryDB from hub.db import SecondaryDB
class PrimaryDB(SecondaryDB): class PrimaryDB(SecondaryDB):
def __init__(self, coin, db_dir: str, reorg_limit: int = 200, def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
cache_all_tx_hashes: bool = False, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
max_open_files: int = 64, blocking_channel_ids: List[str] = None, max_open_files: int = 64, blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False, enforce_integrity=True): index_address_status=False, enforce_integrity=True):
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_tx_hashes, super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes,
blocking_channel_ids, filtering_channel_ids, executor, index_address_status, blocking_channel_ids, filtering_channel_ids, executor, index_address_status,
enforce_integrity=enforce_integrity) enforce_integrity=enforce_integrity)
@ -35,19 +35,16 @@ class PrimaryDB(SecondaryDB):
if last_hashX: if last_hashX:
yield last_hashX yield last_hashX
def hashX_status_from_history(history: bytes) -> ResumableSHA256: def hashX_status_from_history(history: bytes) -> bytes:
tx_counts = self.tx_counts tx_counts = self.tx_counts
hist_tx_nums = array.array('I') hist_tx_nums = array.array('I')
hist_tx_nums.frombytes(history) hist_tx_nums.frombytes(history)
digest = ResumableSHA256() digest = hashlib.sha256()
digest.update( for tx_num, tx_hash in zip(
b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode()
for tx_num, tx_hash in zip(
hist_tx_nums, hist_tx_nums,
self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False) self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)):
)) digest.update(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode())
) return digest.digest()
return digest
start = time.perf_counter() start = time.perf_counter()
@ -70,24 +67,17 @@ class PrimaryDB(SecondaryDB):
hashX_cnt += 1 hashX_cnt += 1
key = prefix_db.hashX_status.pack_key(hashX) key = prefix_db.hashX_status.pack_key(hashX)
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False)) history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
digester = hashX_status_from_history(history) status = hashX_status_from_history(history)
status = digester.digest()
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False) existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
existing_digester = prefix_db.hashX_history_hasher.get(hashX) if existing_status and existing_status == status:
if not existing_status: continue
elif not existing_status:
prefix_db.stash_raw_put(key, status) prefix_db.stash_raw_put(key, status)
op_cnt += 1 op_cnt += 1
else: else:
prefix_db.stash_raw_delete(key, existing_status) prefix_db.stash_raw_delete(key, existing_status)
prefix_db.stash_raw_put(key, status) prefix_db.stash_raw_put(key, status)
op_cnt += 2 op_cnt += 2
if not existing_digester:
prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
op_cnt += 1
else:
prefix_db.hashX_history_hasher.stash_delete((hashX,), existing_digester)
prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
op_cnt += 2
if op_cnt > 100000: if op_cnt > 100000:
prefix_db.unsafe_commit() prefix_db.unsafe_commit()
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...") self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")

View file

@ -3,13 +3,14 @@ from hub.env import Env
class BlockchainEnv(Env): class BlockchainEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, blocking_channel_ids=None, filtering_channel_ids=None, prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
blocking_channel_ids=None, filtering_channel_ids=None,
db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None, db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
index_address_status=None, rebuild_address_status_from_height=None, index_address_status=None, rebuild_address_status_from_height=None,
daemon_ca_path=None, history_tx_cache_size=None, daemon_ca_path=None, history_tx_cache_size=None,
db_disable_integrity_checks=False): db_disable_integrity_checks=False):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
blocking_channel_ids, filtering_channel_ids, index_address_status) cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
self.db_max_open_files = db_max_open_files self.db_max_open_files = db_max_open_files
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \ self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \
@ -56,7 +57,7 @@ class BlockchainEnv(Env):
db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files, db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit, max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit,
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes, prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
index_address_status=args.index_address_statuses, cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
hashX_history_cache_size=args.address_history_cache_size, hashX_history_cache_size=args.address_history_cache_size,
rebuild_address_status_from_height=args.rebuild_address_status_from_height, rebuild_address_status_from_height=args.rebuild_address_status_from_height,
daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size, daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size,

View file

@ -1,4 +1,3 @@
import os
import time import time
import asyncio import asyncio
import typing import typing
@ -12,8 +11,7 @@ from hub import PROMETHEUS_NAMESPACE
from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from hub.error.base import ChainError from hub.error.base import ChainError
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache, LFUCacheWithMetrics
from hub.common import ResumableSHA256, LFUCacheWithMetrics
from hub.scribe.db import PrimaryDB from hub.scribe.db import PrimaryDB
from hub.scribe.daemon import LBCDaemon from hub.scribe.daemon import LBCDaemon
from hub.scribe.transaction import Tx, TxOutput, TxInput, Block from hub.scribe.transaction import Tx, TxOutput, TxInput, Block
@ -139,10 +137,11 @@ class BlockchainProcessorService(BlockchainService):
def open_db(self): def open_db(self):
env = self.env env = self.env
self.db = PrimaryDB( self.db = PrimaryDB(
env.coin, env.db_dir, env.reorg_limit, cache_all_tx_hashes=env.cache_all_tx_hashes, env.coin, env.db_dir, env.reorg_limit, cache_all_claim_txos=env.cache_all_claim_txos,
max_open_files=env.db_max_open_files, blocking_channel_ids=env.blocking_channel_ids, cache_all_tx_hashes=env.cache_all_tx_hashes, max_open_files=env.db_max_open_files,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids,
index_address_status=env.index_address_status, enforce_integrity=not env.db_disable_integrity_checks executor=self._executor, index_address_status=env.index_address_status,
enforce_integrity=not env.db_disable_integrity_checks
) )
async def run_in_thread_with_lock(self, func, *args): async def run_in_thread_with_lock(self, func, *args):
@ -170,22 +169,10 @@ class BlockchainProcessorService(BlockchainService):
def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete): def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
self.mempool.remove(to_delete) self.mempool.remove(to_delete)
touched_hashXs = list(self.mempool.update_mempool(to_put)) touched_hashXs = self.mempool.update_mempool(to_put)
if self.env.index_address_status: if self.env.index_address_status:
status_hashers = { for hashX in touched_hashXs:
k: v.hasher if v else ResumableSHA256() for k, v in zip( self._get_update_hashX_mempool_status_ops(hashX)
touched_hashXs,
self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in touched_hashXs])
)
}
for hashX, v in zip(
touched_hashXs,
self.db.prefix_db.hashX_mempool_status.multi_get([(hashX,) for hashX in touched_hashXs])):
if v is not None:
self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), v)
hasher = status_hashers[hashX]
hasher.update(self.mempool.mempool_history(hashX).encode())
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (hasher.digest(),))
for tx_hash, raw_tx in to_put: for tx_hash, raw_tx in to_put:
mempool_prefix.stash_put((tx_hash,), (raw_tx,)) mempool_prefix.stash_put((tx_hash,), (raw_tx,))
for tx_hash, raw_tx in to_delete.items(): for tx_hash, raw_tx in to_delete.items():
@ -276,6 +263,9 @@ class BlockchainProcessorService(BlockchainService):
for _ in range(count): for _ in range(count):
await self.run_in_thread_with_lock(self.backup_block) await self.run_in_thread_with_lock(self.backup_block)
self.log.info(f'backed up to height {self.height:,d}') self.log.info(f'backed up to height {self.height:,d}')
if self.env.cache_all_claim_txos:
await self.db._read_claim_txos() # TODO: don't do this
await self.prefetcher.reset_height(self.height) await self.prefetcher.reset_height(self.height)
self.reorg_count_metric.inc() self.reorg_count_metric.inc()
except: except:
@ -404,6 +394,12 @@ class BlockchainProcessorService(BlockchainService):
if claim_hash not in self.updated_claim_previous_activations: if claim_hash not in self.updated_claim_previous_activations:
self.updated_claim_previous_activations[claim_hash] = activation self.updated_claim_previous_activations[claim_hash] = activation
if self.env.cache_all_claim_txos:
self.db.claim_to_txo[claim_hash] = ClaimToTXOValue(
tx_num, nout, root_tx_num, root_idx, txo.value, channel_signature_is_valid, claim_name
)
self.db.txo_to_claim[tx_num][nout] = claim_hash
pending = StagedClaimtrieItem( pending = StagedClaimtrieItem(
claim_name, normalized_name, claim_hash, txo.value, self.coin.get_expiration_height(height), tx_num, nout, claim_name, normalized_name, claim_hash, txo.value, self.coin.get_expiration_height(height), tx_num, nout,
root_tx_num, root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash root_tx_num, root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash
@ -694,6 +690,11 @@ class BlockchainProcessorService(BlockchainService):
if 0 < activation <= self.height: if 0 < activation <= self.height:
self.effective_amount_delta[claim_hash] -= spent.amount self.effective_amount_delta[claim_hash] -= spent.amount
self.future_effective_amount_delta[spent.claim_hash] -= spent.amount self.future_effective_amount_delta[spent.claim_hash] -= spent.amount
if self.env.cache_all_claim_txos:
claim_hash = self.db.txo_to_claim[txin_num].pop(nout)
if not self.db.txo_to_claim[txin_num]:
self.db.txo_to_claim.pop(txin_num)
self.db.claim_to_txo.pop(claim_hash)
if spent.reposted_claim_hash: if spent.reposted_claim_hash:
self.pending_reposted.add(spent.reposted_claim_hash) self.pending_reposted.add(spent.reposted_claim_hash)
if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims: if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims:
@ -1636,6 +1637,15 @@ class BlockchainProcessorService(BlockchainService):
self.hashX_full_cache[hashX] = history self.hashX_full_cache[hashX] = history
return history return history
def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
if existing:
self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), existing)
history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX)
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,))
def advance_block(self, block: Block): def advance_block(self, block: Block):
txo_count = 0 txo_count = 0
txi_count = 0 txi_count = 0
@ -1735,7 +1745,9 @@ class BlockchainProcessorService(BlockchainService):
# update hashX history status hashes and compactify the histories # update hashX history status hashes and compactify the histories
self._get_update_hashX_histories_ops(height) self._get_update_hashX_histories_ops(height)
if self.env.index_address_status: # only compactify adddress histories and update the status index if we're already caught up,
# a bulk update will happen once catchup finishes
if not self.db.catching_up and self.env.index_address_status:
self._get_compactify_ops(height) self._get_compactify_ops(height)
self.db.last_indexed_address_status_height = height self.db.last_indexed_address_status_height = height
@ -1790,17 +1802,6 @@ class BlockchainProcessorService(BlockchainService):
) )
def _get_compactify_ops(self, height: int): def _get_compactify_ops(self, height: int):
def _rebuild_hasher(hist_tx_nums):
hasher = ResumableSHA256()
hasher.update(
b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num)}:'.encode()
for tx_num, tx_hash in zip(
hist_tx_nums,
self.db.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
))
)
return hasher
existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False) existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False)
if existing_hashX_statuses: if existing_hashX_statuses:
pack_key = self.db.prefix_db.hashX_status.pack_key pack_key = self.db.prefix_db.hashX_status.pack_key
@ -1815,13 +1816,6 @@ class BlockchainProcessorService(BlockchainService):
append_deletes_hashX_history = block_hashX_history_deletes.append append_deletes_hashX_history = block_hashX_history_deletes.append
block_hashX_history_puts = [] block_hashX_history_puts = []
existing_status_hashers = {
k: v.hasher if v else None for k, v in zip(
self.hashXs_by_tx,
self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in self.hashXs_by_tx])
)
}
for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses): for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses):
new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums] new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums]
@ -1836,9 +1830,11 @@ class BlockchainProcessorService(BlockchainService):
unpack_key = self.db.prefix_db.hashX_history.unpack_key unpack_key = self.db.prefix_db.hashX_history.unpack_key
needs_compaction = False needs_compaction = False
total_hist_txs = b''
for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False, for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False,
deserialize_value=False): deserialize_value=False):
hist_txs = unpack_history(hist) hist_txs = unpack_history(hist)
total_hist_txs += hist
txs_extend(hist_txs) txs_extend(hist_txs)
hist_height = unpack_key(k).height hist_height = unpack_key(k).height
if height > reorg_limit and hist_height < height - reorg_limit: if height > reorg_limit and hist_height < height - reorg_limit:
@ -1857,19 +1853,27 @@ class BlockchainProcessorService(BlockchainService):
block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,))) block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,)))
if not new_history: if not new_history:
continue continue
hasher = existing_status_hashers[hashX]
if hasher is None: needed_tx_infos = []
# this is to migrate in the new column family, in the future it can be a new digester append_needed_tx_info = needed_tx_infos.append
# hasher = ResumableSHA256() tx_infos = {}
hasher = _rebuild_hasher(tx_nums) for tx_num in tx_nums:
else: cached_tx_info = self.history_tx_info_cache.get(tx_num)
self.db.prefix_db.hashX_history_hasher.stash_delete((hashX,), (hasher,)) if cached_tx_info is not None:
hasher.update(b''.join( tx_infos[tx_num] = cached_tx_info
f'{tx_hash[::-1].hex()}:{height:d}:'.encode() for tx_hash, height in new_history else:
)) append_needed_tx_info(tx_num)
self.db.prefix_db.hashX_history_hasher.stash_put((hashX,), (hasher,)) if needed_tx_infos:
status = hasher.digest() for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)):
self.db.prefix_db.hashX_status.stash_put((hashX,), (status,)) tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
tx_infos[tx_num] = tx_info
self.history_tx_info_cache[tx_num] = tx_info
history = ''.join(map(tx_infos.__getitem__, tx_nums))
for tx_hash, height in new_history:
history += f'{tx_hash[::-1].hex()}:{height:d}:'
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
self.db.prefix_db.multi_delete(block_hashX_history_deletes) self.db.prefix_db.multi_delete(block_hashX_history_deletes)
self.db.prefix_db.hashX_history.stash_multi_put(block_hashX_history_puts) self.db.prefix_db.hashX_history.stash_multi_put(block_hashX_history_puts)
@ -2070,8 +2074,6 @@ class BlockchainProcessorService(BlockchainService):
"""Loop forever processing blocks as they arrive.""" """Loop forever processing blocks as they arrive."""
self._caught_up_event = caught_up_event self._caught_up_event = caught_up_event
try: try:
if os.path.exists(self.db._need_restart_path):
raise RuntimeError('scribe restart is needed')
if self.height != self.daemon.cached_height() and not self.db.catching_up: if self.height != self.daemon.cached_height() and not self.db.catching_up:
await self._need_catch_up() # tell the readers that we're still catching up with lbrycrd/lbcd await self._need_catch_up() # tell the readers that we're still catching up with lbrycrd/lbcd
while not self._stopping: while not self._stopping:
@ -2133,6 +2135,9 @@ class BlockchainProcessorService(BlockchainService):
async def _finished_initial_catch_up(self): async def _finished_initial_catch_up(self):
self.log.info(f'caught up to height {self.height}') self.log.info(f'caught up to height {self.height}')
if self.env.index_address_status and self.db.last_indexed_address_status_height < self.db.db_height:
await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height)
# Flush everything but with catching_up->False state. # Flush everything but with catching_up->False state.
self.db.catching_up = False self.db.catching_up = False

View file

@ -37,7 +37,7 @@ class BlockchainService:
def open_db(self): def open_db(self):
env = self.env env = self.env
self.db = SecondaryDB( self.db = SecondaryDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids, env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status index_address_status=env.index_address_status

View file

@ -33,7 +33,7 @@ setup(
'certifi>=2021.10.08', 'certifi>=2021.10.08',
'colorama==0.3.7', 'colorama==0.3.7',
'cffi==1.13.2', 'cffi==1.13.2',
'protobuf==3.18.3', 'protobuf==3.17.2',
'msgpack==0.6.1', 'msgpack==0.6.1',
'prometheus_client==0.7.1', 'prometheus_client==0.7.1',
'coincurve==15.0.0', 'coincurve==15.0.0',
@ -44,8 +44,7 @@ setup(
'filetype==1.0.9', 'filetype==1.0.9',
'grpcio==1.38.0', 'grpcio==1.38.0',
'lbry-rocksdb==0.8.2', 'lbry-rocksdb==0.8.2',
'ujson==5.4.0', 'ujson==5.4.0'
'rehash==1.0.0'
], ],
extras_require={ extras_require={
'lint': ['pylint==2.10.0'], 'lint': ['pylint==2.10.0'],