Compare commits
8 commits
master
...
moodyjon/i
Author | SHA1 | Date | |
---|---|---|---|
|
d495ce9f0a | ||
|
8794ff48e0 | ||
|
fa0d03fe95 | ||
|
14f2f3b55b | ||
|
9c43c811a1 | ||
|
6c037b29b5 | ||
|
252a1aa165 | ||
|
f370e263b5 |
22 changed files with 343 additions and 285 deletions
|
@ -17,7 +17,7 @@ services:
|
|||
- "lbry_rocksdb:/database"
|
||||
environment:
|
||||
- HUB_COMMAND=scribe
|
||||
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar
|
||||
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1256013/lbry-rocksdb.tar
|
||||
command: # for full options, see `scribe --help`
|
||||
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
|
||||
- "--max_query_workers=2"
|
||||
|
|
|
@ -14,7 +14,7 @@ services:
|
|||
- "lbry_rocksdb:/database"
|
||||
environment:
|
||||
- HUB_COMMAND=scribe
|
||||
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar
|
||||
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1256013/lbry-rocksdb.tar
|
||||
command:
|
||||
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
|
||||
- "--max_query_workers=2"
|
||||
|
|
102
hub/common.py
102
hub/common.py
|
@ -5,16 +5,15 @@ import hmac
|
|||
import ipaddress
|
||||
import logging
|
||||
import logging.handlers
|
||||
import socket
|
||||
import typing
|
||||
import collections
|
||||
from ctypes import cast, memmove, POINTER, c_void_p
|
||||
from bisect import insort_right
|
||||
from collections import deque
|
||||
from decimal import Decimal
|
||||
from typing import Iterable, Deque
|
||||
from asyncio import get_event_loop, Event
|
||||
from prometheus_client import Counter
|
||||
from rehash.structs import EVPobject
|
||||
from hub.schema.tags import clean_tags
|
||||
from hub.schema.url import normalize_name
|
||||
from hub.error import TooManyClaimSearchParametersError
|
||||
|
@ -155,6 +154,38 @@ def protocol_version(client_req, min_tuple, max_tuple):
|
|||
return result, client_min
|
||||
|
||||
|
||||
async def resolve_host(url: str, port: int, proto: str,
|
||||
family: int = socket.AF_INET, all_results: bool = False) \
|
||||
-> typing.Union[str, typing.List[str]]:
|
||||
if proto not in ['udp', 'tcp']:
|
||||
raise Exception("invalid protocol")
|
||||
try:
|
||||
if ipaddress.ip_address(url):
|
||||
return [url] if all_results else url
|
||||
except ValueError:
|
||||
pass
|
||||
loop = asyncio.get_running_loop()
|
||||
records = await loop.getaddrinfo(
|
||||
url, port,
|
||||
proto=socket.IPPROTO_TCP if proto == 'tcp' else socket.IPPROTO_UDP,
|
||||
type=socket.SOCK_STREAM if proto == 'tcp' else socket.SOCK_DGRAM,
|
||||
family=family,
|
||||
)
|
||||
def addr_not_ipv4_mapped(rec):
|
||||
_, _, _, _, sockaddr = rec
|
||||
ipaddr = ipaddress.ip_address(sockaddr[0])
|
||||
return ipaddr.version != 6 or not ipaddr.ipv4_mapped
|
||||
records = filter(addr_not_ipv4_mapped, records)
|
||||
results = [sockaddr[0] for fam, type, prot, canonname, sockaddr in records]
|
||||
if not results and not all_results:
|
||||
raise socket.gaierror(
|
||||
socket.EAI_ADDRFAMILY,
|
||||
'The specified network host does not have any network '
|
||||
'addresses in the requested address family'
|
||||
)
|
||||
return results if all_results else results[0]
|
||||
|
||||
|
||||
class LRUCacheWithMetrics:
|
||||
__slots__ = [
|
||||
'capacity',
|
||||
|
@ -579,11 +610,13 @@ IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24')
|
|||
def is_valid_public_ipv4(address, allow_localhost: bool = False, allow_lan: bool = False):
|
||||
try:
|
||||
parsed_ip = ipaddress.ip_address(address)
|
||||
if parsed_ip.is_loopback and allow_localhost:
|
||||
return True
|
||||
if allow_lan and parsed_ip.is_private:
|
||||
return True
|
||||
if any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
|
||||
if parsed_ip.version != 4:
|
||||
return False
|
||||
if parsed_ip.is_loopback:
|
||||
return allow_localhost
|
||||
if parsed_ip.is_private:
|
||||
return allow_lan
|
||||
if any((parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
|
||||
parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private)):
|
||||
return False
|
||||
else:
|
||||
|
@ -592,6 +625,23 @@ def is_valid_public_ipv4(address, allow_localhost: bool = False, allow_lan: bool
|
|||
except (ipaddress.AddressValueError, ValueError):
|
||||
return False
|
||||
|
||||
def is_valid_public_ipv6(address, allow_localhost: bool = False, allow_lan: bool = False):
|
||||
try:
|
||||
parsed_ip = ipaddress.ip_address(address)
|
||||
if parsed_ip.version != 6:
|
||||
return False
|
||||
if parsed_ip.is_loopback:
|
||||
return allow_localhost
|
||||
if parsed_ip.is_private:
|
||||
return allow_lan
|
||||
return not any((parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
|
||||
parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private,
|
||||
parsed_ip.ipv4_mapped))
|
||||
except (ipaddress.AddressValueError, ValueError):
|
||||
return False
|
||||
|
||||
def is_valid_public_ip(address, **kwargs):
|
||||
return is_valid_public_ipv6(address, **kwargs) or is_valid_public_ipv4(address, **kwargs)
|
||||
|
||||
def sha256(x):
|
||||
"""Simple wrapper of hashlib sha256."""
|
||||
|
@ -1061,41 +1111,3 @@ async def asyncify_for_loop(gen, ticks_per_sleep: int = 1000):
|
|||
yield item
|
||||
if cnt % ticks_per_sleep == 0:
|
||||
await async_sleep(0)
|
||||
|
||||
|
||||
_SHA256_DIGEST_STATE_SIZE = 120
|
||||
|
||||
|
||||
class ResumableSHA256:
|
||||
__slots__ = ['_hasher']
|
||||
|
||||
def __init__(self, state: typing.Optional[bytes] = None):
|
||||
self._hasher = hashlib.sha256()
|
||||
if state is not None:
|
||||
ctx = self._get_evp_md_ctx()
|
||||
ctx_size = ctx.digest.contents.ctx_size
|
||||
if len(state) != _SHA256_DIGEST_STATE_SIZE != ctx_size:
|
||||
raise Exception(f'invalid sha256 digester state, got {len(state)} bytes')
|
||||
memmove(ctx.md_data, state, ctx_size)
|
||||
|
||||
def _get_evp_md_ctx(self):
|
||||
c_evp_obj = cast(c_void_p(id(self._hasher)), POINTER(EVPobject))
|
||||
if hasattr(c_evp_obj.contents.ctx, "contents"):
|
||||
return c_evp_obj.contents.ctx.contents
|
||||
else:
|
||||
return c_evp_obj.contents.ctx
|
||||
|
||||
def get_state(self) -> bytes:
|
||||
ctx = self._get_evp_md_ctx()
|
||||
ctx_size = ctx.digest.contents.ctx_size
|
||||
hasher_state = ctx.md_data[:ctx_size]
|
||||
return hasher_state
|
||||
|
||||
def __copy__(self):
|
||||
return ResumableSHA256(self.get_state())
|
||||
|
||||
def update(self, data: bytes):
|
||||
self._hasher.update(data)
|
||||
|
||||
def digest(self):
|
||||
return self._hasher.digest()
|
||||
|
|
|
@ -51,7 +51,6 @@ class DB_PREFIXES(enum.Enum):
|
|||
reposted_count = b'j'
|
||||
effective_amount = b'i'
|
||||
future_effective_amount = b'k'
|
||||
hashX_history_hash = b'l'
|
||||
|
||||
|
||||
COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass
|
||||
|
|
52
hub/db/db.py
52
hub/db/db.py
|
@ -37,7 +37,7 @@ class SecondaryDB:
|
|||
DB_VERSIONS = [7, 8, 9, 10, 11, 12]
|
||||
|
||||
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
|
||||
cache_all_tx_hashes: bool = False,
|
||||
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||
blocking_channel_ids: List[str] = None,
|
||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||
index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768,
|
||||
|
@ -47,9 +47,9 @@ class SecondaryDB:
|
|||
self._executor = executor
|
||||
self._db_dir = db_dir
|
||||
self._reorg_limit = reorg_limit
|
||||
self._cache_all_claim_txos = cache_all_claim_txos
|
||||
self._cache_all_tx_hashes = cache_all_tx_hashes
|
||||
self._secondary_name = secondary_name
|
||||
self._need_restart_path = os.path.join(self._db_dir, 'NEED_SCRIBE_RESTART')
|
||||
if secondary_name:
|
||||
assert max_open_files == -1, 'max open files must be -1 for secondary readers'
|
||||
self._db_max_open_files = max_open_files
|
||||
|
@ -100,6 +100,9 @@ class SecondaryDB:
|
|||
self.total_transactions: List[bytes] = []
|
||||
self.tx_num_mapping: Dict[bytes, int] = {}
|
||||
|
||||
# these are only used if the cache_all_claim_txos setting is on
|
||||
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
|
||||
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
|
||||
self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)
|
||||
|
||||
def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
|
||||
|
@ -953,6 +956,21 @@ class SecondaryDB:
|
|||
else:
|
||||
assert self.db_tx_count == 0
|
||||
|
||||
async def _read_claim_txos(self):
|
||||
def read_claim_txos():
|
||||
set_claim_to_txo = self.claim_to_txo.__setitem__
|
||||
for k, v in self.prefix_db.claim_to_txo.iterate(fill_cache=False):
|
||||
set_claim_to_txo(k.claim_hash, v)
|
||||
self.txo_to_claim[v.tx_num][v.position] = k.claim_hash
|
||||
|
||||
self.claim_to_txo.clear()
|
||||
self.txo_to_claim.clear()
|
||||
start = time.perf_counter()
|
||||
self.logger.info("loading claims")
|
||||
await asyncio.get_event_loop().run_in_executor(self._executor, read_claim_txos)
|
||||
ts = time.perf_counter() - start
|
||||
self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4))
|
||||
|
||||
# async def _read_headers(self):
|
||||
# # if self.headers is not None:
|
||||
# # return
|
||||
|
@ -1001,27 +1019,6 @@ class SecondaryDB:
|
|||
secondary_path = '' if not self._secondary_name else os.path.join(
|
||||
self._db_dir, self._secondary_name
|
||||
)
|
||||
open_db_canary = None
|
||||
|
||||
if self._secondary_name:
|
||||
open_db_canary = os.path.join(self._db_dir, f'{self._secondary_name}-db-canary')
|
||||
if os.path.exists(open_db_canary):
|
||||
with open(self._need_restart_path, 'w+') as f:
|
||||
f.write(f"{time.strftime(f'%Y-%m-%d %H:%M:%S')} {self._secondary_name}\n")
|
||||
raise RuntimeError('scribe restart is needed')
|
||||
else:
|
||||
with open(open_db_canary, 'w'):
|
||||
pass
|
||||
else:
|
||||
herald_db_canary = os.path.join(self._db_dir, 'lbry-reader-db-canary')
|
||||
es_sync_db_canary = os.path.join(self._db_dir, 'lbry-elastic-writer-db-canary')
|
||||
if os.path.exists(herald_db_canary):
|
||||
os.remove(herald_db_canary)
|
||||
if os.path.exists(es_sync_db_canary):
|
||||
os.remove(es_sync_db_canary)
|
||||
if os.path.exists(self._need_restart_path):
|
||||
os.remove(self._need_restart_path)
|
||||
|
||||
db_path = os.path.join(self._db_dir, 'lbry-rocksdb')
|
||||
self.prefix_db = PrefixDB(
|
||||
db_path, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,
|
||||
|
@ -1030,7 +1027,6 @@ class SecondaryDB:
|
|||
)
|
||||
|
||||
if secondary_path != '':
|
||||
os.remove(open_db_canary)
|
||||
self.logger.info(f'opened db for read only: lbry-rocksdb (%s)', db_path)
|
||||
else:
|
||||
self.logger.info(f'opened db for writing: lbry-rocksdb (%s)', db_path)
|
||||
|
@ -1067,6 +1063,8 @@ class SecondaryDB:
|
|||
async def initialize_caches(self):
|
||||
await self._read_tx_counts()
|
||||
await self._read_block_hashes()
|
||||
if self._cache_all_claim_txos:
|
||||
await self._read_claim_txos()
|
||||
if self._cache_all_tx_hashes:
|
||||
await self._read_tx_hashes()
|
||||
if self.db_height > 0:
|
||||
|
@ -1156,9 +1154,15 @@ class SecondaryDB:
|
|||
}
|
||||
|
||||
def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]:
|
||||
if self._cache_all_claim_txos:
|
||||
return self.claim_to_txo.get(claim_hash)
|
||||
return self.prefix_db.claim_to_txo.get_pending(claim_hash)
|
||||
|
||||
def get_cached_claim_hash(self, tx_num: int, position: int) -> Optional[bytes]:
|
||||
if self._cache_all_claim_txos:
|
||||
if tx_num not in self.txo_to_claim:
|
||||
return
|
||||
return self.txo_to_claim[tx_num].get(position, None)
|
||||
v = self.prefix_db.txo_to_claim.get_pending(tx_num, position)
|
||||
return None if not v else v.claim_hash
|
||||
|
||||
|
|
|
@ -273,7 +273,6 @@ class BasePrefixDB:
|
|||
undo_c_f = self.column_families[DB_PREFIXES.undo.value]
|
||||
undo_info = self._db.get((undo_c_f, undo_key))
|
||||
self._op_stack.apply_packed_undo_ops(undo_info)
|
||||
self._op_stack.validate_and_apply_stashed_ops()
|
||||
try:
|
||||
with self._db.write_batch(sync=True) as batch:
|
||||
batch_put = batch.put
|
||||
|
|
|
@ -3,7 +3,6 @@ import struct
|
|||
import array
|
||||
import base64
|
||||
from typing import Union, Tuple, NamedTuple, Optional
|
||||
from hub.common import ResumableSHA256
|
||||
from hub.db.common import DB_PREFIXES
|
||||
from hub.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow
|
||||
from hub.schema.url import normalize_name
|
||||
|
@ -1852,46 +1851,6 @@ class FutureEffectiveAmountPrefixRow(PrefixRow):
|
|||
return cls.pack_key(claim_hash), cls.pack_value(future_effective_amount)
|
||||
|
||||
|
||||
class HashXHistoryHasherKey(NamedTuple):
|
||||
hashX: bytes
|
||||
|
||||
|
||||
class HashXHistoryHasherValue(NamedTuple):
|
||||
hasher: ResumableSHA256
|
||||
|
||||
|
||||
class HashXHistoryHasherPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.hashX_history_hash.value
|
||||
key_struct = struct.Struct(b'>11s')
|
||||
value_struct = struct.Struct(b'>120s')
|
||||
cache_size = 1024 * 1024 * 64
|
||||
|
||||
key_part_lambdas = [
|
||||
lambda: b'',
|
||||
struct.Struct(b'>11s').pack
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, hashX: bytes):
|
||||
return super().pack_key(hashX)
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> HashXHistoryHasherKey:
|
||||
return HashXHistoryHasherKey(*super().unpack_key(key))
|
||||
|
||||
@classmethod
|
||||
def pack_value(cls, hasher: ResumableSHA256) -> bytes:
|
||||
return super().pack_value(hasher.get_state())
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> HashXHistoryHasherValue:
|
||||
return HashXHistoryHasherValue(ResumableSHA256(*super().unpack_value(data)))
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, hashX: bytes, hasher: ResumableSHA256):
|
||||
return cls.pack_key(hashX), cls.pack_value(hasher)
|
||||
|
||||
|
||||
class PrefixDB(BasePrefixDB):
|
||||
def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64,
|
||||
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None,
|
||||
|
@ -1938,7 +1897,6 @@ class PrefixDB(BasePrefixDB):
|
|||
self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack)
|
||||
self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack)
|
||||
self.future_effective_amount = FutureEffectiveAmountPrefixRow(db, self._op_stack)
|
||||
self.hashX_history_hasher = HashXHistoryHasherPrefixRow(db, self._op_stack)
|
||||
|
||||
|
||||
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
|
||||
|
|
|
@ -160,7 +160,7 @@ class RevertableOpStack:
|
|||
# there is a value and we're not deleting it in this op
|
||||
# check that a delete for the stored value is in the stack
|
||||
raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}\nvs\n{stored_val}")
|
||||
elif not has_stored_val:
|
||||
elif not stored_val:
|
||||
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
|
||||
elif stored_val != op.value:
|
||||
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
|
||||
|
@ -324,21 +324,9 @@ class RevertableOpStack:
|
|||
"""
|
||||
Unpack and apply a sequence of undo ops from serialized undo bytes
|
||||
"""
|
||||
offset = 0
|
||||
packed_size = len(packed)
|
||||
while offset < packed_size:
|
||||
is_put, key_len, val_len = _OP_STRUCT.unpack(packed[offset:offset + 9])
|
||||
offset += 9
|
||||
key = packed[offset:offset + key_len]
|
||||
offset += key_len
|
||||
value = packed[offset:offset + val_len]
|
||||
offset += val_len
|
||||
if is_put == 1:
|
||||
op = RevertablePut(key, value)
|
||||
else:
|
||||
op = RevertableDelete(key, value)
|
||||
self._stash.append(op)
|
||||
self._stashed_last_op_for_key[op.key] = op
|
||||
while packed:
|
||||
op, packed = RevertableOp.unpack(packed)
|
||||
self.append_op(op)
|
||||
|
||||
def get_pending_op(self, key: bytes) -> Optional[RevertableOp]:
|
||||
if key in self._stashed_last_op_for_key:
|
||||
|
|
|
@ -9,11 +9,11 @@ from hub.db.common import ResolveResult
|
|||
|
||||
class ElasticSyncDB(SecondaryDB):
|
||||
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
|
||||
cache_all_tx_hashes: bool = False,
|
||||
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||
blocking_channel_ids: List[str] = None,
|
||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||
index_address_status=False):
|
||||
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit,
|
||||
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
|
||||
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
|
||||
index_address_status)
|
||||
self.block_timestamp_cache = LRUCache(1024)
|
||||
|
|
|
@ -3,11 +3,11 @@ from hub.env import Env
|
|||
|
||||
class ElasticEnv(Env):
|
||||
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None,
|
||||
cache_all_tx_hashes=None, elastic_host=None, elastic_port=None,
|
||||
cache_all_tx_hashes=None, cache_all_claim_txos=None, elastic_host=None, elastic_port=None,
|
||||
es_index_prefix=None, elastic_notifier_host=None, elastic_notifier_port=None,
|
||||
blocking_channel_ids=None, filtering_channel_ids=None, reindex=False):
|
||||
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
|
||||
blocking_channel_ids, filtering_channel_ids)
|
||||
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
|
||||
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
|
||||
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
|
||||
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
|
||||
|
@ -43,7 +43,7 @@ class ElasticEnv(Env):
|
|||
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
|
||||
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit,
|
||||
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
|
||||
blocking_channel_ids=args.blocking_channel_ids,
|
||||
cache_all_claim_txos=args.cache_all_claim_txos, blocking_channel_ids=args.blocking_channel_ids,
|
||||
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
|
||||
elastic_notifier_port=args.elastic_notifier_port
|
||||
)
|
||||
|
|
|
@ -49,7 +49,7 @@ class ElasticSyncService(BlockchainReaderService):
|
|||
def open_db(self):
|
||||
env = self.env
|
||||
self.db = ElasticSyncDB(
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
|
||||
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
|
||||
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
|
||||
index_address_status=env.index_address_status
|
||||
|
|
12
hub/env.py
12
hub/env.py
|
@ -30,7 +30,7 @@ class Env:
|
|||
pass
|
||||
|
||||
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
|
||||
blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None):
|
||||
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
@ -46,6 +46,7 @@ class Env:
|
|||
self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
|
||||
self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0)
|
||||
self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False)
|
||||
self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False)
|
||||
# Filtering / Blocking
|
||||
self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default(
|
||||
'BLOCKING_CHANNEL_IDS', '').split(' ')
|
||||
|
@ -116,10 +117,6 @@ class Env:
|
|||
result = [part.strip() for part in host.split(',')]
|
||||
if len(result) == 1:
|
||||
result = result[0]
|
||||
if result == 'localhost':
|
||||
# 'localhost' resolves to ::1 (ipv6) on many systems, which fails on default setup of
|
||||
# docker, using 127.0.0.1 instead forces ipv4
|
||||
result = '127.0.0.1'
|
||||
return result
|
||||
|
||||
def sane_max_sessions(self):
|
||||
|
@ -170,6 +167,11 @@ class Env:
|
|||
"resolve, transaction fetching, and block sync all faster at the expense of higher "
|
||||
"memory usage (at least 10GB more). Can be set in env with 'CACHE_ALL_TX_HASHES'.",
|
||||
default=cls.boolean('CACHE_ALL_TX_HASHES', False))
|
||||
parser.add_argument('--cache_all_claim_txos', action='store_true',
|
||||
help="Load all claim txos into memory. This will make address subscriptions and sync, "
|
||||
"resolve, transaction fetching, and block sync all faster at the expense of higher "
|
||||
"memory usage. Can be set in env with 'CACHE_ALL_CLAIM_TXOS'.",
|
||||
default=cls.boolean('CACHE_ALL_CLAIM_TXOS', False))
|
||||
parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0),
|
||||
help="Port for prometheus metrics to listen on, disabled by default. "
|
||||
"Can be set in env with 'PROMETHEUS_PORT'.")
|
||||
|
|
|
@ -6,11 +6,11 @@ from hub.db import SecondaryDB
|
|||
|
||||
class HeraldDB(SecondaryDB):
|
||||
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
|
||||
cache_all_tx_hashes: bool = False,
|
||||
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||
blocking_channel_ids: List[str] = None,
|
||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||
index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768):
|
||||
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit,
|
||||
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
|
||||
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
|
||||
index_address_status, merkle_cache_size, tx_cache_size)
|
||||
# self.headers = None
|
||||
|
|
|
@ -19,7 +19,7 @@ def parse_es_services(elastic_services_arg: str):
|
|||
|
||||
class ServerEnv(Env):
|
||||
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
|
||||
daemon_url=None, host=None, elastic_services=None, es_index_prefix=None,
|
||||
tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
|
||||
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
|
||||
|
@ -29,7 +29,7 @@ class ServerEnv(Env):
|
|||
merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
|
||||
history_tx_cache_size=None, largest_address_history_cache_size=None):
|
||||
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
|
||||
blocking_channel_ids, filtering_channel_ids, index_address_status)
|
||||
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
|
||||
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
|
||||
self.host = host if host is not None else self.default('HOST', 'localhost')
|
||||
self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080'))
|
||||
|
@ -153,7 +153,7 @@ class ServerEnv(Env):
|
|||
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
|
||||
udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
|
||||
allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
|
||||
country=args.country, payment_address=args.payment_address,
|
||||
cache_all_claim_txos=args.cache_all_claim_txos, country=args.country, payment_address=args.payment_address,
|
||||
donation_address=args.donation_address, max_send=args.max_send, max_receive=args.max_receive,
|
||||
max_sessions=args.max_sessions, session_timeout=args.session_timeout,
|
||||
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import errno
|
||||
import time
|
||||
import typing
|
||||
import asyncio
|
||||
|
@ -53,7 +54,7 @@ class HubServerService(BlockchainReaderService):
|
|||
def open_db(self):
|
||||
env = self.env
|
||||
self.db = HeraldDB(
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
|
||||
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
|
||||
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
|
||||
index_address_status=env.index_address_status, merkle_cache_size=env.merkle_cache_size,
|
||||
|
@ -170,10 +171,20 @@ class HubServerService(BlockchainReaderService):
|
|||
|
||||
async def start_status_server(self):
|
||||
if self.env.udp_port and int(self.env.udp_port):
|
||||
await self.status_server.start(
|
||||
0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
|
||||
self.env.host, self.env.udp_port, self.env.allow_lan_udp
|
||||
)
|
||||
hosts = self.env.cs_host()
|
||||
started = False
|
||||
while not started:
|
||||
try:
|
||||
await self.status_server.start(
|
||||
0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
|
||||
hosts, self.env.udp_port, self.env.allow_lan_udp
|
||||
)
|
||||
started = True
|
||||
except OSError as e:
|
||||
if e.errno is errno.EADDRINUSE:
|
||||
await asyncio.sleep(3)
|
||||
continue
|
||||
raise
|
||||
|
||||
def _iter_start_tasks(self):
|
||||
yield self.start_status_server()
|
||||
|
|
|
@ -271,7 +271,8 @@ class SessionManager:
|
|||
f'{host}:{port:d} : {e!r}')
|
||||
raise
|
||||
else:
|
||||
self.logger.info(f'{kind} server listening on {host}:{port:d}')
|
||||
for s in self.servers[kind].sockets:
|
||||
self.logger.info(f'{kind} server listening on {s.getsockname()[:2]}')
|
||||
|
||||
async def _start_external_servers(self):
|
||||
"""Start listening on TCP and SSL ports, but only if the respective
|
||||
|
|
|
@ -1,10 +1,18 @@
|
|||
import asyncio
|
||||
import ipaddress
|
||||
import socket
|
||||
import struct
|
||||
from time import perf_counter
|
||||
import logging
|
||||
from typing import Optional, Tuple, NamedTuple
|
||||
from typing import Optional, Tuple, NamedTuple, List, Union
|
||||
from hub.schema.attrs import country_str_to_int, country_int_to_str
|
||||
from hub.common import LRUCache, is_valid_public_ipv4
|
||||
from hub.common import (
|
||||
LRUCache,
|
||||
resolve_host,
|
||||
is_valid_public_ip,
|
||||
is_valid_public_ipv4,
|
||||
is_valid_public_ipv6,
|
||||
)
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
@ -36,48 +44,75 @@ class SPVPing(NamedTuple):
|
|||
return decoded
|
||||
|
||||
|
||||
PONG_ENCODING = b'!BBL32s4sH'
|
||||
|
||||
PONG_ENCODING_PRE = b'!BBL32s'
|
||||
PONG_ENCODING_POST = b'!H'
|
||||
|
||||
class SPVPong(NamedTuple):
|
||||
protocol_version: int
|
||||
flags: int
|
||||
height: int
|
||||
tip: bytes
|
||||
source_address_raw: bytes
|
||||
ipaddr: Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
|
||||
country: int
|
||||
|
||||
FLAG_AVAILABLE = 0b00000001
|
||||
FLAG_IPV6 = 0b00000010
|
||||
|
||||
def encode(self):
|
||||
return struct.pack(PONG_ENCODING, *self)
|
||||
return (struct.pack(PONG_ENCODING_PRE, self.protocol_version, self.flags, self.height, self.tip) +
|
||||
self.encode_address(self.ipaddr) +
|
||||
struct.pack(PONG_ENCODING_POST, self.country))
|
||||
|
||||
@staticmethod
|
||||
def encode_address(address: str):
|
||||
return bytes(int(b) for b in address.split("."))
|
||||
def encode_address(address: Union[str, ipaddress.IPv4Address, ipaddress.IPv6Address]):
|
||||
if not isinstance(address, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
|
||||
address = ipaddress.ip_address(address)
|
||||
return address.packed
|
||||
|
||||
@classmethod
|
||||
def make(cls, flags: int, height: int, tip: bytes, source_address: str, country: str) -> bytes:
|
||||
ipaddr = ipaddress.ip_address(source_address)
|
||||
flags = (flags | cls.FLAG_IPV6) if ipaddr.version == 6 else (flags & ~cls.FLAG_IPV6)
|
||||
return SPVPong(
|
||||
PROTOCOL_VERSION, flags, height, tip,
|
||||
cls.encode_address(source_address),
|
||||
ipaddr,
|
||||
country_str_to_int(country)
|
||||
).encode()
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def make_sans_source_address(cls, flags: int, height: int, tip: bytes, country: str) -> Tuple[bytes, bytes]:
|
||||
pong = cls.make(flags, height, tip, '0.0.0.0', country)
|
||||
return pong[:38], pong[42:]
|
||||
pong = pong.encode()
|
||||
return pong[0:1], pong[2:38], pong[42:]
|
||||
|
||||
@classmethod
|
||||
def decode(cls, packet: bytes):
|
||||
return cls(*struct.unpack(PONG_ENCODING, packet[:44]))
|
||||
offset = 0
|
||||
protocol_version, flags, height, tip = struct.unpack(PONG_ENCODING_PRE, packet[offset:offset+38])
|
||||
offset += 38
|
||||
if flags & cls.FLAG_IPV6:
|
||||
addr_len = ipaddress.IPV6LENGTH // 8
|
||||
ipaddr = ipaddress.ip_address(packet[offset:offset+addr_len])
|
||||
offset += addr_len
|
||||
else:
|
||||
addr_len = ipaddress.IPV4LENGTH // 8
|
||||
ipaddr = ipaddress.ip_address(packet[offset:offset+addr_len])
|
||||
offset += addr_len
|
||||
country, = struct.unpack(PONG_ENCODING_POST, packet[offset:offset+2])
|
||||
offset += 2
|
||||
return cls(protocol_version, flags, height, tip, ipaddr, country)
|
||||
|
||||
@property
|
||||
def available(self) -> bool:
|
||||
return (self.flags & 0b00000001) > 0
|
||||
return (self.flags & self.FLAG_AVAILABLE) > 0
|
||||
|
||||
@property
|
||||
def ipv6(self) -> bool:
|
||||
return (self.flags & self.FLAG_IPV6) > 0
|
||||
|
||||
@property
|
||||
def ip_address(self) -> str:
|
||||
return ".".join(map(str, self.source_address_raw))
|
||||
return self.ipaddr.compressed
|
||||
|
||||
@property
|
||||
def country_name(self):
|
||||
|
@ -94,7 +129,8 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
|
|||
def __init__(
|
||||
self, height: int, tip: bytes, country: str,
|
||||
throttle_cache_size: int = 1024, throttle_reqs_per_sec: int = 10,
|
||||
allow_localhost: bool = False, allow_lan: bool = False
|
||||
allow_localhost: bool = False, allow_lan: bool = False,
|
||||
is_valid_ip = is_valid_public_ip,
|
||||
):
|
||||
super().__init__()
|
||||
self.transport: Optional[asyncio.transports.DatagramTransport] = None
|
||||
|
@ -102,26 +138,27 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
|
|||
self._tip = tip
|
||||
self._flags = 0
|
||||
self._country = country
|
||||
self._left_cache = self._right_cache = None
|
||||
self._cache0 = self._cache1 = self.cache2 = None
|
||||
self.update_cached_response()
|
||||
self._throttle = LRUCache(throttle_cache_size)
|
||||
self._should_log = LRUCache(throttle_cache_size)
|
||||
self._min_delay = 1 / throttle_reqs_per_sec
|
||||
self._allow_localhost = allow_localhost
|
||||
self._allow_lan = allow_lan
|
||||
self._is_valid_ip = is_valid_ip
|
||||
self.closed = asyncio.Event()
|
||||
|
||||
def update_cached_response(self):
|
||||
self._left_cache, self._right_cache = SPVPong.make_sans_source_address(
|
||||
self._cache0, self._cache1, self._cache2 = SPVPong.make_sans_source_address(
|
||||
self._flags, max(0, self._height), self._tip, self._country
|
||||
)
|
||||
|
||||
def set_unavailable(self):
|
||||
self._flags &= 0b11111110
|
||||
self._flags &= ~SPVPong.FLAG_AVAILABLE
|
||||
self.update_cached_response()
|
||||
|
||||
def set_available(self):
|
||||
self._flags |= 0b00000001
|
||||
self._flags |= SPVPong.FLAG_AVAILABLE
|
||||
self.update_cached_response()
|
||||
|
||||
def set_height(self, height: int, tip: bytes):
|
||||
|
@ -141,17 +178,25 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
|
|||
return False
|
||||
|
||||
def make_pong(self, host):
|
||||
return self._left_cache + SPVPong.encode_address(host) + self._right_cache
|
||||
ipaddr = ipaddress.ip_address(host)
|
||||
if ipaddr.version == 6:
|
||||
flags = self._flags | SPVPong.FLAG_IPV6
|
||||
else:
|
||||
flags = self._flags & ~SPVPong.FLAG_IPV6
|
||||
return (self._cache0 + flags.to_bytes(1, 'big') +
|
||||
self._cache1 + SPVPong.encode_address(ipaddr) +
|
||||
self._cache2)
|
||||
|
||||
def datagram_received(self, data: bytes, addr: Tuple[str, int]):
|
||||
def datagram_received(self, data: bytes, addr: Union[Tuple[str, int], Tuple[str, int, int, int]]):
|
||||
if self.should_throttle(addr[0]):
|
||||
# print(f"throttled: {addr}")
|
||||
return
|
||||
try:
|
||||
SPVPing.decode(data)
|
||||
except (ValueError, struct.error, AttributeError, TypeError):
|
||||
# log.exception("derp")
|
||||
return
|
||||
if addr[1] >= 1024 and is_valid_public_ipv4(
|
||||
if addr[1] >= 1024 and self._is_valid_ip(
|
||||
addr[0], allow_localhost=self._allow_localhost, allow_lan=self._allow_lan):
|
||||
self.transport.sendto(self.make_pong(addr[0]), addr)
|
||||
else:
|
||||
|
@ -174,39 +219,78 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
|
|||
|
||||
class StatusServer:
|
||||
def __init__(self):
|
||||
self._protocol: Optional[SPVServerStatusProtocol] = None
|
||||
self._protocols: List[SPVServerStatusProtocol] = []
|
||||
|
||||
async def start(self, height: int, tip: bytes, country: str, interface: str, port: int, allow_lan: bool = False):
|
||||
if self.is_running:
|
||||
return
|
||||
loop = asyncio.get_event_loop()
|
||||
interface = interface if interface.lower() != 'localhost' else '127.0.0.1'
|
||||
self._protocol = SPVServerStatusProtocol(
|
||||
height, tip, country, allow_localhost=interface == '127.0.0.1', allow_lan=allow_lan
|
||||
)
|
||||
await loop.create_datagram_endpoint(lambda: self._protocol, (interface, port))
|
||||
log.info("started udp status server on %s:%i", interface, port)
|
||||
async def _start(self, height: int, tip: bytes, country: str, addr: str, port: int, allow_lan: bool = False):
|
||||
ipaddr = ipaddress.ip_address(addr)
|
||||
if ipaddr.version == 4:
|
||||
proto = SPVServerStatusProtocol(
|
||||
height, tip, country,
|
||||
allow_localhost=ipaddr.is_loopback or ipaddr.is_unspecified,
|
||||
allow_lan=allow_lan,
|
||||
is_valid_ip=is_valid_public_ipv4,
|
||||
)
|
||||
loop = asyncio.get_event_loop()
|
||||
await loop.create_datagram_endpoint(lambda: proto, (ipaddr.compressed, port), family=socket.AF_INET)
|
||||
elif ipaddr.version == 6:
|
||||
proto = SPVServerStatusProtocol(
|
||||
height, tip, country,
|
||||
allow_localhost=ipaddr.is_loopback or ipaddr.is_unspecified,
|
||||
allow_lan=allow_lan,
|
||||
is_valid_ip=is_valid_public_ipv6,
|
||||
)
|
||||
# Because dualstack / IPv4 mapped address behavior on an IPv6 socket
|
||||
# differs based on system config, create the socket with IPV6_V6ONLY.
|
||||
# This disables the IPv4 mapped feature, so we don't need to consider
|
||||
# when an IPv6 socket may interfere with IPv4 binding / traffic.
|
||||
sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
|
||||
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
|
||||
sock.bind((ipaddr.compressed, port))
|
||||
loop = asyncio.get_event_loop()
|
||||
await loop.create_datagram_endpoint(lambda: proto, sock=sock)
|
||||
else:
|
||||
raise ValueError(f'unexpected IP address version {ipaddr.version}')
|
||||
log.info("started udp%i status server on %s", ipaddr.version, proto.transport.get_extra_info('sockname')[:2])
|
||||
self._protocols.append(proto)
|
||||
|
||||
async def start(self, height: int, tip: bytes, country: str, hosts: List[str], port: int, allow_lan: bool = False):
|
||||
if not isinstance(hosts, list):
|
||||
hosts = [hosts]
|
||||
try:
|
||||
for host in hosts:
|
||||
addr = None
|
||||
if not host:
|
||||
resolved = ['::', '0.0.0.0'] # unspecified address
|
||||
else:
|
||||
resolved = await resolve_host(host, port, 'udp', family=socket.AF_UNSPEC, all_results=True)
|
||||
for addr in resolved:
|
||||
await self._start(height, tip, country, addr, port, allow_lan)
|
||||
except Exception as e:
|
||||
if not isinstance(e, asyncio.CancelledError):
|
||||
log.error("UDP status server failed to listen on (%s:%i) : %s", addr or host, port, e)
|
||||
await self.stop()
|
||||
raise
|
||||
|
||||
async def stop(self):
|
||||
if self.is_running:
|
||||
await self._protocol.close()
|
||||
self._protocol = None
|
||||
for proto in self._protocols:
|
||||
await proto.close()
|
||||
self._protocols.clear()
|
||||
|
||||
@property
|
||||
def is_running(self):
|
||||
return self._protocol is not None
|
||||
return self._protocols
|
||||
|
||||
def set_unavailable(self):
|
||||
if self.is_running:
|
||||
self._protocol.set_unavailable()
|
||||
for proto in self._protocols:
|
||||
proto.set_unavailable()
|
||||
|
||||
def set_available(self):
|
||||
if self.is_running:
|
||||
self._protocol.set_available()
|
||||
for proto in self._protocols:
|
||||
proto.set_available()
|
||||
|
||||
def set_height(self, height: int, tip: bytes):
|
||||
if self.is_running:
|
||||
self._protocol.set_height(height, tip)
|
||||
for proto in self._protocols:
|
||||
proto.set_height(height, tip)
|
||||
|
||||
|
||||
class SPVStatusClientProtocol(asyncio.DatagramProtocol):
|
||||
|
@ -217,9 +301,14 @@ class SPVStatusClientProtocol(asyncio.DatagramProtocol):
|
|||
self.responses = responses
|
||||
self._ping_packet = SPVPing.make()
|
||||
|
||||
def datagram_received(self, data: bytes, addr: Tuple[str, int]):
|
||||
def datagram_received(self, data: bytes, addr: Union[Tuple[str, int], Tuple[str, int, int, int]]):
|
||||
try:
|
||||
self.responses.put_nowait(((addr, perf_counter()), SPVPong.decode(data)))
|
||||
if len(addr) > 2: # IPv6 with possible mapped IPv4
|
||||
ipaddr = ipaddress.ip_address(addr[0])
|
||||
if ipaddr.ipv4_mapped:
|
||||
# mapped IPv4 address identified
|
||||
addr = (ipaddr.ipv4_mapped.compressed, addr[1])
|
||||
self.responses.put_nowait(((addr[:2], perf_counter()), SPVPong.decode(data)))
|
||||
except (ValueError, struct.error, AttributeError, TypeError, RuntimeError):
|
||||
return
|
||||
|
||||
|
@ -230,7 +319,7 @@ class SPVStatusClientProtocol(asyncio.DatagramProtocol):
|
|||
self.transport = None
|
||||
log.info("closed udp spv server selection client")
|
||||
|
||||
def ping(self, server: Tuple[str, int]):
|
||||
def ping(self, server: Union[Tuple[str, int], Tuple[str, int, int, int]]):
|
||||
self.transport.sendto(self._ping_packet, server)
|
||||
|
||||
def close(self):
|
||||
|
|
|
@ -5,17 +5,17 @@ import time
|
|||
from typing import List
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from bisect import bisect_right
|
||||
from hub.common import ResumableSHA256
|
||||
from hub.common import sha256
|
||||
from hub.db import SecondaryDB
|
||||
|
||||
|
||||
class PrimaryDB(SecondaryDB):
|
||||
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
|
||||
cache_all_tx_hashes: bool = False,
|
||||
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||
max_open_files: int = 64, blocking_channel_ids: List[str] = None,
|
||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||
index_address_status=False, enforce_integrity=True):
|
||||
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_tx_hashes,
|
||||
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes,
|
||||
blocking_channel_ids, filtering_channel_ids, executor, index_address_status,
|
||||
enforce_integrity=enforce_integrity)
|
||||
|
||||
|
@ -35,19 +35,16 @@ class PrimaryDB(SecondaryDB):
|
|||
if last_hashX:
|
||||
yield last_hashX
|
||||
|
||||
def hashX_status_from_history(history: bytes) -> ResumableSHA256:
|
||||
def hashX_status_from_history(history: bytes) -> bytes:
|
||||
tx_counts = self.tx_counts
|
||||
hist_tx_nums = array.array('I')
|
||||
hist_tx_nums.frombytes(history)
|
||||
digest = ResumableSHA256()
|
||||
digest.update(
|
||||
b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode()
|
||||
for tx_num, tx_hash in zip(
|
||||
digest = hashlib.sha256()
|
||||
for tx_num, tx_hash in zip(
|
||||
hist_tx_nums,
|
||||
self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
|
||||
))
|
||||
)
|
||||
return digest
|
||||
self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)):
|
||||
digest.update(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode())
|
||||
return digest.digest()
|
||||
|
||||
start = time.perf_counter()
|
||||
|
||||
|
@ -70,24 +67,17 @@ class PrimaryDB(SecondaryDB):
|
|||
hashX_cnt += 1
|
||||
key = prefix_db.hashX_status.pack_key(hashX)
|
||||
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
|
||||
digester = hashX_status_from_history(history)
|
||||
status = digester.digest()
|
||||
status = hashX_status_from_history(history)
|
||||
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
|
||||
existing_digester = prefix_db.hashX_history_hasher.get(hashX)
|
||||
if not existing_status:
|
||||
if existing_status and existing_status == status:
|
||||
continue
|
||||
elif not existing_status:
|
||||
prefix_db.stash_raw_put(key, status)
|
||||
op_cnt += 1
|
||||
else:
|
||||
prefix_db.stash_raw_delete(key, existing_status)
|
||||
prefix_db.stash_raw_put(key, status)
|
||||
op_cnt += 2
|
||||
if not existing_digester:
|
||||
prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
|
||||
op_cnt += 1
|
||||
else:
|
||||
prefix_db.hashX_history_hasher.stash_delete((hashX,), existing_digester)
|
||||
prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
|
||||
op_cnt += 2
|
||||
if op_cnt > 100000:
|
||||
prefix_db.unsafe_commit()
|
||||
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
|
||||
|
|
|
@ -3,13 +3,14 @@ from hub.env import Env
|
|||
|
||||
class BlockchainEnv(Env):
|
||||
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None, blocking_channel_ids=None, filtering_channel_ids=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
|
||||
blocking_channel_ids=None, filtering_channel_ids=None,
|
||||
db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
|
||||
index_address_status=None, rebuild_address_status_from_height=None,
|
||||
daemon_ca_path=None, history_tx_cache_size=None,
|
||||
db_disable_integrity_checks=False):
|
||||
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
|
||||
blocking_channel_ids, filtering_channel_ids, index_address_status)
|
||||
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
|
||||
self.db_max_open_files = db_max_open_files
|
||||
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
|
||||
self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \
|
||||
|
@ -56,7 +57,7 @@ class BlockchainEnv(Env):
|
|||
db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
|
||||
max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit,
|
||||
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
|
||||
index_address_status=args.index_address_statuses,
|
||||
cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
|
||||
hashX_history_cache_size=args.address_history_cache_size,
|
||||
rebuild_address_status_from_height=args.rebuild_address_status_from_height,
|
||||
daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size,
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
import os
|
||||
import time
|
||||
import asyncio
|
||||
import typing
|
||||
|
@ -12,8 +11,7 @@ from hub import PROMETHEUS_NAMESPACE
|
|||
from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
|
||||
from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
|
||||
from hub.error.base import ChainError
|
||||
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache
|
||||
from hub.common import ResumableSHA256, LFUCacheWithMetrics
|
||||
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache, LFUCacheWithMetrics
|
||||
from hub.scribe.db import PrimaryDB
|
||||
from hub.scribe.daemon import LBCDaemon
|
||||
from hub.scribe.transaction import Tx, TxOutput, TxInput, Block
|
||||
|
@ -139,10 +137,11 @@ class BlockchainProcessorService(BlockchainService):
|
|||
def open_db(self):
|
||||
env = self.env
|
||||
self.db = PrimaryDB(
|
||||
env.coin, env.db_dir, env.reorg_limit, cache_all_tx_hashes=env.cache_all_tx_hashes,
|
||||
max_open_files=env.db_max_open_files, blocking_channel_ids=env.blocking_channel_ids,
|
||||
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
|
||||
index_address_status=env.index_address_status, enforce_integrity=not env.db_disable_integrity_checks
|
||||
env.coin, env.db_dir, env.reorg_limit, cache_all_claim_txos=env.cache_all_claim_txos,
|
||||
cache_all_tx_hashes=env.cache_all_tx_hashes, max_open_files=env.db_max_open_files,
|
||||
blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids,
|
||||
executor=self._executor, index_address_status=env.index_address_status,
|
||||
enforce_integrity=not env.db_disable_integrity_checks
|
||||
)
|
||||
|
||||
async def run_in_thread_with_lock(self, func, *args):
|
||||
|
@ -170,22 +169,10 @@ class BlockchainProcessorService(BlockchainService):
|
|||
|
||||
def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
|
||||
self.mempool.remove(to_delete)
|
||||
touched_hashXs = list(self.mempool.update_mempool(to_put))
|
||||
touched_hashXs = self.mempool.update_mempool(to_put)
|
||||
if self.env.index_address_status:
|
||||
status_hashers = {
|
||||
k: v.hasher if v else ResumableSHA256() for k, v in zip(
|
||||
touched_hashXs,
|
||||
self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in touched_hashXs])
|
||||
)
|
||||
}
|
||||
for hashX, v in zip(
|
||||
touched_hashXs,
|
||||
self.db.prefix_db.hashX_mempool_status.multi_get([(hashX,) for hashX in touched_hashXs])):
|
||||
if v is not None:
|
||||
self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), v)
|
||||
hasher = status_hashers[hashX]
|
||||
hasher.update(self.mempool.mempool_history(hashX).encode())
|
||||
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (hasher.digest(),))
|
||||
for hashX in touched_hashXs:
|
||||
self._get_update_hashX_mempool_status_ops(hashX)
|
||||
for tx_hash, raw_tx in to_put:
|
||||
mempool_prefix.stash_put((tx_hash,), (raw_tx,))
|
||||
for tx_hash, raw_tx in to_delete.items():
|
||||
|
@ -276,6 +263,9 @@ class BlockchainProcessorService(BlockchainService):
|
|||
for _ in range(count):
|
||||
await self.run_in_thread_with_lock(self.backup_block)
|
||||
self.log.info(f'backed up to height {self.height:,d}')
|
||||
|
||||
if self.env.cache_all_claim_txos:
|
||||
await self.db._read_claim_txos() # TODO: don't do this
|
||||
await self.prefetcher.reset_height(self.height)
|
||||
self.reorg_count_metric.inc()
|
||||
except:
|
||||
|
@ -404,6 +394,12 @@ class BlockchainProcessorService(BlockchainService):
|
|||
if claim_hash not in self.updated_claim_previous_activations:
|
||||
self.updated_claim_previous_activations[claim_hash] = activation
|
||||
|
||||
if self.env.cache_all_claim_txos:
|
||||
self.db.claim_to_txo[claim_hash] = ClaimToTXOValue(
|
||||
tx_num, nout, root_tx_num, root_idx, txo.value, channel_signature_is_valid, claim_name
|
||||
)
|
||||
self.db.txo_to_claim[tx_num][nout] = claim_hash
|
||||
|
||||
pending = StagedClaimtrieItem(
|
||||
claim_name, normalized_name, claim_hash, txo.value, self.coin.get_expiration_height(height), tx_num, nout,
|
||||
root_tx_num, root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash
|
||||
|
@ -694,6 +690,11 @@ class BlockchainProcessorService(BlockchainService):
|
|||
if 0 < activation <= self.height:
|
||||
self.effective_amount_delta[claim_hash] -= spent.amount
|
||||
self.future_effective_amount_delta[spent.claim_hash] -= spent.amount
|
||||
if self.env.cache_all_claim_txos:
|
||||
claim_hash = self.db.txo_to_claim[txin_num].pop(nout)
|
||||
if not self.db.txo_to_claim[txin_num]:
|
||||
self.db.txo_to_claim.pop(txin_num)
|
||||
self.db.claim_to_txo.pop(claim_hash)
|
||||
if spent.reposted_claim_hash:
|
||||
self.pending_reposted.add(spent.reposted_claim_hash)
|
||||
if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims:
|
||||
|
@ -1636,6 +1637,15 @@ class BlockchainProcessorService(BlockchainService):
|
|||
self.hashX_full_cache[hashX] = history
|
||||
return history
|
||||
|
||||
def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
|
||||
existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
|
||||
if existing:
|
||||
self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), existing)
|
||||
history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX)
|
||||
if history:
|
||||
status = sha256(history.encode())
|
||||
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,))
|
||||
|
||||
def advance_block(self, block: Block):
|
||||
txo_count = 0
|
||||
txi_count = 0
|
||||
|
@ -1735,7 +1745,9 @@ class BlockchainProcessorService(BlockchainService):
|
|||
# update hashX history status hashes and compactify the histories
|
||||
self._get_update_hashX_histories_ops(height)
|
||||
|
||||
if self.env.index_address_status:
|
||||
# only compactify adddress histories and update the status index if we're already caught up,
|
||||
# a bulk update will happen once catchup finishes
|
||||
if not self.db.catching_up and self.env.index_address_status:
|
||||
self._get_compactify_ops(height)
|
||||
self.db.last_indexed_address_status_height = height
|
||||
|
||||
|
@ -1790,17 +1802,6 @@ class BlockchainProcessorService(BlockchainService):
|
|||
)
|
||||
|
||||
def _get_compactify_ops(self, height: int):
|
||||
def _rebuild_hasher(hist_tx_nums):
|
||||
hasher = ResumableSHA256()
|
||||
hasher.update(
|
||||
b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num)}:'.encode()
|
||||
for tx_num, tx_hash in zip(
|
||||
hist_tx_nums,
|
||||
self.db.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
|
||||
))
|
||||
)
|
||||
return hasher
|
||||
|
||||
existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False)
|
||||
if existing_hashX_statuses:
|
||||
pack_key = self.db.prefix_db.hashX_status.pack_key
|
||||
|
@ -1815,13 +1816,6 @@ class BlockchainProcessorService(BlockchainService):
|
|||
append_deletes_hashX_history = block_hashX_history_deletes.append
|
||||
block_hashX_history_puts = []
|
||||
|
||||
existing_status_hashers = {
|
||||
k: v.hasher if v else None for k, v in zip(
|
||||
self.hashXs_by_tx,
|
||||
self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in self.hashXs_by_tx])
|
||||
)
|
||||
}
|
||||
|
||||
for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses):
|
||||
new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums]
|
||||
|
||||
|
@ -1836,9 +1830,11 @@ class BlockchainProcessorService(BlockchainService):
|
|||
unpack_key = self.db.prefix_db.hashX_history.unpack_key
|
||||
needs_compaction = False
|
||||
|
||||
total_hist_txs = b''
|
||||
for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False,
|
||||
deserialize_value=False):
|
||||
hist_txs = unpack_history(hist)
|
||||
total_hist_txs += hist
|
||||
txs_extend(hist_txs)
|
||||
hist_height = unpack_key(k).height
|
||||
if height > reorg_limit and hist_height < height - reorg_limit:
|
||||
|
@ -1857,19 +1853,27 @@ class BlockchainProcessorService(BlockchainService):
|
|||
block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,)))
|
||||
if not new_history:
|
||||
continue
|
||||
hasher = existing_status_hashers[hashX]
|
||||
if hasher is None:
|
||||
# this is to migrate in the new column family, in the future it can be a new digester
|
||||
# hasher = ResumableSHA256()
|
||||
hasher = _rebuild_hasher(tx_nums)
|
||||
else:
|
||||
self.db.prefix_db.hashX_history_hasher.stash_delete((hashX,), (hasher,))
|
||||
hasher.update(b''.join(
|
||||
f'{tx_hash[::-1].hex()}:{height:d}:'.encode() for tx_hash, height in new_history
|
||||
))
|
||||
self.db.prefix_db.hashX_history_hasher.stash_put((hashX,), (hasher,))
|
||||
status = hasher.digest()
|
||||
self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
|
||||
|
||||
needed_tx_infos = []
|
||||
append_needed_tx_info = needed_tx_infos.append
|
||||
tx_infos = {}
|
||||
for tx_num in tx_nums:
|
||||
cached_tx_info = self.history_tx_info_cache.get(tx_num)
|
||||
if cached_tx_info is not None:
|
||||
tx_infos[tx_num] = cached_tx_info
|
||||
else:
|
||||
append_needed_tx_info(tx_num)
|
||||
if needed_tx_infos:
|
||||
for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)):
|
||||
tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
|
||||
tx_infos[tx_num] = tx_info
|
||||
self.history_tx_info_cache[tx_num] = tx_info
|
||||
history = ''.join(map(tx_infos.__getitem__, tx_nums))
|
||||
for tx_hash, height in new_history:
|
||||
history += f'{tx_hash[::-1].hex()}:{height:d}:'
|
||||
if history:
|
||||
status = sha256(history.encode())
|
||||
self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
|
||||
|
||||
self.db.prefix_db.multi_delete(block_hashX_history_deletes)
|
||||
self.db.prefix_db.hashX_history.stash_multi_put(block_hashX_history_puts)
|
||||
|
@ -2070,8 +2074,6 @@ class BlockchainProcessorService(BlockchainService):
|
|||
"""Loop forever processing blocks as they arrive."""
|
||||
self._caught_up_event = caught_up_event
|
||||
try:
|
||||
if os.path.exists(self.db._need_restart_path):
|
||||
raise RuntimeError('scribe restart is needed')
|
||||
if self.height != self.daemon.cached_height() and not self.db.catching_up:
|
||||
await self._need_catch_up() # tell the readers that we're still catching up with lbrycrd/lbcd
|
||||
while not self._stopping:
|
||||
|
@ -2133,6 +2135,9 @@ class BlockchainProcessorService(BlockchainService):
|
|||
async def _finished_initial_catch_up(self):
|
||||
self.log.info(f'caught up to height {self.height}')
|
||||
|
||||
if self.env.index_address_status and self.db.last_indexed_address_status_height < self.db.db_height:
|
||||
await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height)
|
||||
|
||||
# Flush everything but with catching_up->False state.
|
||||
self.db.catching_up = False
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ class BlockchainService:
|
|||
def open_db(self):
|
||||
env = self.env
|
||||
self.db = SecondaryDB(
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
|
||||
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
|
||||
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
|
||||
index_address_status=env.index_address_status
|
||||
|
|
5
setup.py
5
setup.py
|
@ -33,7 +33,7 @@ setup(
|
|||
'certifi>=2021.10.08',
|
||||
'colorama==0.3.7',
|
||||
'cffi==1.13.2',
|
||||
'protobuf==3.18.3',
|
||||
'protobuf==3.17.2',
|
||||
'msgpack==0.6.1',
|
||||
'prometheus_client==0.7.1',
|
||||
'coincurve==15.0.0',
|
||||
|
@ -44,8 +44,7 @@ setup(
|
|||
'filetype==1.0.9',
|
||||
'grpcio==1.38.0',
|
||||
'lbry-rocksdb==0.8.2',
|
||||
'ujson==5.4.0',
|
||||
'rehash==1.0.0'
|
||||
'ujson==5.4.0'
|
||||
],
|
||||
extras_require={
|
||||
'lint': ['pylint==2.10.0'],
|
||||
|
|
Loading…
Reference in a new issue