Compare commits: master...moodyjon/i

8 commits:
  d495ce9f0a
  8794ff48e0
  fa0d03fe95
  14f2f3b55b
  9c43c811a1
  6c037b29b5
  252a1aa165
  f370e263b5

22 changed files with 343 additions and 285 deletions.

@@ -17,7 +17,7 @@ services:
       - "lbry_rocksdb:/database"
     environment:
       - HUB_COMMAND=scribe
-      - SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar
+      - SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1256013/lbry-rocksdb.tar
     command: # for full options, see `scribe --help`
       - "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
       - "--max_query_workers=2"

@@ -14,7 +14,7 @@ services:
       - "lbry_rocksdb:/database"
     environment:
       - HUB_COMMAND=scribe
-      - SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar
+      - SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1256013/lbry-rocksdb.tar
     command:
       - "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
       - "--max_query_workers=2"

hub/common.py (102 changed lines)

@@ -5,16 +5,15 @@ import hmac
 import ipaddress
 import logging
 import logging.handlers
+import socket
 import typing
 import collections
-from ctypes import cast, memmove, POINTER, c_void_p
 from bisect import insort_right
 from collections import deque
 from decimal import Decimal
 from typing import Iterable, Deque
 from asyncio import get_event_loop, Event
 from prometheus_client import Counter
-from rehash.structs import EVPobject
 from hub.schema.tags import clean_tags
 from hub.schema.url import normalize_name
 from hub.error import TooManyClaimSearchParametersError

@@ -155,6 +154,38 @@ def protocol_version(client_req, min_tuple, max_tuple):
     return result, client_min


+async def resolve_host(url: str, port: int, proto: str,
+                       family: int = socket.AF_INET, all_results: bool = False) \
+        -> typing.Union[str, typing.List[str]]:
+    if proto not in ['udp', 'tcp']:
+        raise Exception("invalid protocol")
+    try:
+        if ipaddress.ip_address(url):
+            return [url] if all_results else url
+    except ValueError:
+        pass
+    loop = asyncio.get_running_loop()
+    records = await loop.getaddrinfo(
+        url, port,
+        proto=socket.IPPROTO_TCP if proto == 'tcp' else socket.IPPROTO_UDP,
+        type=socket.SOCK_STREAM if proto == 'tcp' else socket.SOCK_DGRAM,
+        family=family,
+    )
+    def addr_not_ipv4_mapped(rec):
+        _, _, _, _, sockaddr = rec
+        ipaddr = ipaddress.ip_address(sockaddr[0])
+        return ipaddr.version != 6 or not ipaddr.ipv4_mapped
+    records = filter(addr_not_ipv4_mapped, records)
+    results = [sockaddr[0] for fam, type, prot, canonname, sockaddr in records]
+    if not results and not all_results:
+        raise socket.gaierror(
+            socket.EAI_ADDRFAMILY,
+            'The specified network host does not have any network '
+            'addresses in the requested address family'
+        )
+    return results if all_results else results[0]
+
+
 class LRUCacheWithMetrics:
     __slots__ = [
         'capacity',

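A minimal usage sketch of the resolve_host() helper added in the hunk above, assuming it is importable from hub.common as in the rest of this branch; the hostname is only an example:

import asyncio
import socket
from hub.common import resolve_host  # assumes the helper shown above is exported here

async def main():
    # Single result, IPv4 only (the defaults shown in the diff).
    addr = await resolve_host('example.com', 80, 'tcp')
    print(addr)
    # All results across both address families, excluding IPv4-mapped IPv6.
    addrs = await resolve_host('example.com', 80, 'udp',
                               family=socket.AF_UNSPEC, all_results=True)
    print(addrs)

asyncio.run(main())
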
@@ -579,11 +610,13 @@ IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24')
 def is_valid_public_ipv4(address, allow_localhost: bool = False, allow_lan: bool = False):
     try:
         parsed_ip = ipaddress.ip_address(address)
-        if parsed_ip.is_loopback and allow_localhost:
-            return True
-        if allow_lan and parsed_ip.is_private:
-            return True
-        if any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
+        if parsed_ip.version != 4:
+            return False
+        if parsed_ip.is_loopback:
+            return allow_localhost
+        if parsed_ip.is_private:
+            return allow_lan
+        if any((parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
                 parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private)):
             return False
         else:

@@ -592,6 +625,23 @@ def is_valid_public_ipv4(address, allow_localhost: bool = False, allow_lan: bool
     except (ipaddress.AddressValueError, ValueError):
         return False


+def is_valid_public_ipv6(address, allow_localhost: bool = False, allow_lan: bool = False):
+    try:
+        parsed_ip = ipaddress.ip_address(address)
+        if parsed_ip.version != 6:
+            return False
+        if parsed_ip.is_loopback:
+            return allow_localhost
+        if parsed_ip.is_private:
+            return allow_lan
+        return not any((parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
+                        parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private,
+                        parsed_ip.ipv4_mapped))
+    except (ipaddress.AddressValueError, ValueError):
+        return False
+
+
+def is_valid_public_ip(address, **kwargs):
+    return is_valid_public_ipv6(address, **kwargs) or is_valid_public_ipv4(address, **kwargs)
+
+
 def sha256(x):
     """Simple wrapper of hashlib sha256."""

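A small sketch exercising the validators introduced above, assuming they are importable from hub.common; the sample addresses are illustrative and the expected results in the comments follow only the branches visible in the diff:

from hub.common import is_valid_public_ip, is_valid_public_ipv4, is_valid_public_ipv6

print(is_valid_public_ip('::1'))                           # False: loopback rejected by default
print(is_valid_public_ip('::1', allow_localhost=True))     # True via the IPv6 loopback branch
print(is_valid_public_ip('192.168.1.10', allow_lan=True))  # True: private IPv4 allowed when allow_lan is set
print(is_valid_public_ipv6('2a00:1450:4009:80f::200e'))    # True for a globally routable IPv6 address
print(is_valid_public_ipv4('::ffff:10.0.0.1'))             # False: not a plain IPv4 address
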
@@ -1061,41 +1111,3 @@ async def asyncify_for_loop(gen, ticks_per_sleep: int = 1000):
         yield item
         if cnt % ticks_per_sleep == 0:
             await async_sleep(0)
-
-
-_SHA256_DIGEST_STATE_SIZE = 120
-
-
-class ResumableSHA256:
-    __slots__ = ['_hasher']
-
-    def __init__(self, state: typing.Optional[bytes] = None):
-        self._hasher = hashlib.sha256()
-        if state is not None:
-            ctx = self._get_evp_md_ctx()
-            ctx_size = ctx.digest.contents.ctx_size
-            if len(state) != _SHA256_DIGEST_STATE_SIZE != ctx_size:
-                raise Exception(f'invalid sha256 digester state, got {len(state)} bytes')
-            memmove(ctx.md_data, state, ctx_size)
-
-    def _get_evp_md_ctx(self):
-        c_evp_obj = cast(c_void_p(id(self._hasher)), POINTER(EVPobject))
-        if hasattr(c_evp_obj.contents.ctx, "contents"):
-            return c_evp_obj.contents.ctx.contents
-        else:
-            return c_evp_obj.contents.ctx
-
-    def get_state(self) -> bytes:
-        ctx = self._get_evp_md_ctx()
-        ctx_size = ctx.digest.contents.ctx_size
-        hasher_state = ctx.md_data[:ctx_size]
-        return hasher_state
-
-    def __copy__(self):
-        return ResumableSHA256(self.get_state())
-
-    def update(self, data: bytes):
-        self._hasher.update(data)
-
-    def digest(self):
-        return self._hasher.digest()

@@ -51,7 +51,6 @@ class DB_PREFIXES(enum.Enum):
     reposted_count = b'j'
     effective_amount = b'i'
     future_effective_amount = b'k'
-    hashX_history_hash = b'l'


 COLUMN_SETTINGS = {}  # this is updated by the PrefixRow metaclass

hub/db/db.py (52 changed lines)

@@ -37,7 +37,7 @@ class SecondaryDB:
     DB_VERSIONS = [7, 8, 9, 10, 11, 12]

     def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
-                 cache_all_tx_hashes: bool = False,
+                 cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
                  blocking_channel_ids: List[str] = None,
                  filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
                  index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768,

@@ -47,9 +47,9 @@ class SecondaryDB:
         self._executor = executor
         self._db_dir = db_dir
         self._reorg_limit = reorg_limit
+        self._cache_all_claim_txos = cache_all_claim_txos
         self._cache_all_tx_hashes = cache_all_tx_hashes
         self._secondary_name = secondary_name
-        self._need_restart_path = os.path.join(self._db_dir, 'NEED_SCRIBE_RESTART')
         if secondary_name:
             assert max_open_files == -1, 'max open files must be -1 for secondary readers'
         self._db_max_open_files = max_open_files

@@ -100,6 +100,9 @@ class SecondaryDB:
         self.total_transactions: List[bytes] = []
         self.tx_num_mapping: Dict[bytes, int] = {}

+        # these are only used if the cache_all_claim_txos setting is on
+        self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
+        self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
         self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)

     def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:

@@ -953,6 +956,21 @@ class SecondaryDB:
         else:
             assert self.db_tx_count == 0

+    async def _read_claim_txos(self):
+        def read_claim_txos():
+            set_claim_to_txo = self.claim_to_txo.__setitem__
+            for k, v in self.prefix_db.claim_to_txo.iterate(fill_cache=False):
+                set_claim_to_txo(k.claim_hash, v)
+                self.txo_to_claim[v.tx_num][v.position] = k.claim_hash
+
+        self.claim_to_txo.clear()
+        self.txo_to_claim.clear()
+        start = time.perf_counter()
+        self.logger.info("loading claims")
+        await asyncio.get_event_loop().run_in_executor(self._executor, read_claim_txos)
+        ts = time.perf_counter() - start
+        self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4))
+
     # async def _read_headers(self):
     #     # if self.headers is not None:
     #     #     return

@@ -1001,27 +1019,6 @@ class SecondaryDB:
         secondary_path = '' if not self._secondary_name else os.path.join(
             self._db_dir, self._secondary_name
         )
-        open_db_canary = None
-
-        if self._secondary_name:
-            open_db_canary = os.path.join(self._db_dir, f'{self._secondary_name}-db-canary')
-            if os.path.exists(open_db_canary):
-                with open(self._need_restart_path, 'w+') as f:
-                    f.write(f"{time.strftime(f'%Y-%m-%d %H:%M:%S')} {self._secondary_name}\n")
-                raise RuntimeError('scribe restart is needed')
-            else:
-                with open(open_db_canary, 'w'):
-                    pass
-        else:
-            herald_db_canary = os.path.join(self._db_dir, 'lbry-reader-db-canary')
-            es_sync_db_canary = os.path.join(self._db_dir, 'lbry-elastic-writer-db-canary')
-            if os.path.exists(herald_db_canary):
-                os.remove(herald_db_canary)
-            if os.path.exists(es_sync_db_canary):
-                os.remove(es_sync_db_canary)
-            if os.path.exists(self._need_restart_path):
-                os.remove(self._need_restart_path)
-
         db_path = os.path.join(self._db_dir, 'lbry-rocksdb')
         self.prefix_db = PrefixDB(
             db_path, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,

@@ -1030,7 +1027,6 @@ class SecondaryDB:
         )

         if secondary_path != '':
-            os.remove(open_db_canary)
             self.logger.info(f'opened db for read only: lbry-rocksdb (%s)', db_path)
         else:
             self.logger.info(f'opened db for writing: lbry-rocksdb (%s)', db_path)

@@ -1067,6 +1063,8 @@ class SecondaryDB:
     async def initialize_caches(self):
         await self._read_tx_counts()
         await self._read_block_hashes()
+        if self._cache_all_claim_txos:
+            await self._read_claim_txos()
         if self._cache_all_tx_hashes:
             await self._read_tx_hashes()
         if self.db_height > 0:

@@ -1156,9 +1154,15 @@ class SecondaryDB:
         }

     def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]:
+        if self._cache_all_claim_txos:
+            return self.claim_to_txo.get(claim_hash)
         return self.prefix_db.claim_to_txo.get_pending(claim_hash)

     def get_cached_claim_hash(self, tx_num: int, position: int) -> Optional[bytes]:
+        if self._cache_all_claim_txos:
+            if tx_num not in self.txo_to_claim:
+                return
+            return self.txo_to_claim[tx_num].get(position, None)
         v = self.prefix_db.txo_to_claim.get_pending(tx_num, position)
         return None if not v else v.claim_hash

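A self-contained sketch of the in-memory claim TXO cache layout introduced above: claim_to_txo maps claim_hash to its TXO value, while txo_to_claim is a two-level tx_num -> position -> claim_hash mapping, mirroring get_cached_claim_hash(). The names and values below are placeholders:

from collections import defaultdict

claim_to_txo = {}                 # claim_hash -> ClaimToTXOValue (placeholder values here)
txo_to_claim = defaultdict(dict)  # tx_num -> {position: claim_hash}

claim_hash = b'\x11' * 20
claim_to_txo[claim_hash] = ('tx_num=5', 'position=0')  # stand-in for the real value tuple
txo_to_claim[5][0] = claim_hash

def get_cached_claim_hash(tx_num, position):
    # Same lookup shape as the cached branch added in the diff above.
    if tx_num not in txo_to_claim:
        return None
    return txo_to_claim[tx_num].get(position, None)

print(get_cached_claim_hash(5, 0) == claim_hash)  # True
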
@@ -273,7 +273,6 @@ class BasePrefixDB:
         undo_c_f = self.column_families[DB_PREFIXES.undo.value]
         undo_info = self._db.get((undo_c_f, undo_key))
         self._op_stack.apply_packed_undo_ops(undo_info)
-        self._op_stack.validate_and_apply_stashed_ops()
         try:
             with self._db.write_batch(sync=True) as batch:
                 batch_put = batch.put

@@ -3,7 +3,6 @@ import struct
 import array
 import base64
 from typing import Union, Tuple, NamedTuple, Optional
-from hub.common import ResumableSHA256
 from hub.db.common import DB_PREFIXES
 from hub.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow
 from hub.schema.url import normalize_name

@@ -1852,46 +1851,6 @@ class FutureEffectiveAmountPrefixRow(PrefixRow):
         return cls.pack_key(claim_hash), cls.pack_value(future_effective_amount)


-class HashXHistoryHasherKey(NamedTuple):
-    hashX: bytes
-
-
-class HashXHistoryHasherValue(NamedTuple):
-    hasher: ResumableSHA256
-
-
-class HashXHistoryHasherPrefixRow(PrefixRow):
-    prefix = DB_PREFIXES.hashX_history_hash.value
-    key_struct = struct.Struct(b'>11s')
-    value_struct = struct.Struct(b'>120s')
-    cache_size = 1024 * 1024 * 64
-
-    key_part_lambdas = [
-        lambda: b'',
-        struct.Struct(b'>11s').pack
-    ]
-
-    @classmethod
-    def pack_key(cls, hashX: bytes):
-        return super().pack_key(hashX)
-
-    @classmethod
-    def unpack_key(cls, key: bytes) -> HashXHistoryHasherKey:
-        return HashXHistoryHasherKey(*super().unpack_key(key))
-
-    @classmethod
-    def pack_value(cls, hasher: ResumableSHA256) -> bytes:
-        return super().pack_value(hasher.get_state())
-
-    @classmethod
-    def unpack_value(cls, data: bytes) -> HashXHistoryHasherValue:
-        return HashXHistoryHasherValue(ResumableSHA256(*super().unpack_value(data)))
-
-    @classmethod
-    def pack_item(cls, hashX: bytes, hasher: ResumableSHA256):
-        return cls.pack_key(hashX), cls.pack_value(hasher)
-
-
 class PrefixDB(BasePrefixDB):
     def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64,
                  secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None,

@@ -1938,7 +1897,6 @@ class PrefixDB(BasePrefixDB):
         self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack)
         self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack)
         self.future_effective_amount = FutureEffectiveAmountPrefixRow(db, self._op_stack)
-        self.hashX_history_hasher = HashXHistoryHasherPrefixRow(db, self._op_stack)


 def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:

@@ -160,7 +160,7 @@ class RevertableOpStack:
                 # there is a value and we're not deleting it in this op
                 # check that a delete for the stored value is in the stack
                 raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}\nvs\n{stored_val}")
-            elif not has_stored_val:
+            elif not stored_val:
                 raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
             elif stored_val != op.value:
                 raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")

@@ -324,21 +324,9 @@ class RevertableOpStack:
         """
         Unpack and apply a sequence of undo ops from serialized undo bytes
         """
-        offset = 0
-        packed_size = len(packed)
-        while offset < packed_size:
-            is_put, key_len, val_len = _OP_STRUCT.unpack(packed[offset:offset + 9])
-            offset += 9
-            key = packed[offset:offset + key_len]
-            offset += key_len
-            value = packed[offset:offset + val_len]
-            offset += val_len
-            if is_put == 1:
-                op = RevertablePut(key, value)
-            else:
-                op = RevertableDelete(key, value)
-            self._stash.append(op)
-            self._stashed_last_op_for_key[op.key] = op
+        while packed:
+            op, packed = RevertableOp.unpack(packed)
+            self.append_op(op)

     def get_pending_op(self, key: bytes) -> Optional[RevertableOp]:
         if key in self._stashed_last_op_for_key:

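A self-contained sketch of the undo-record framing that the removed loop above walked by hand: each record is a 9-byte header (put flag, key length, value length) followed by the key and value bytes. The '>BLL' struct layout is an assumption made for illustration; the real _OP_STRUCT is not shown in this diff.

import struct

OP_STRUCT = struct.Struct('>BLL')  # assumed: 1-byte is_put, 4-byte key_len, 4-byte val_len

def pack_op(is_put: bool, key: bytes, value: bytes) -> bytes:
    # Header followed by raw key and value bytes, matching the removed loop's framing.
    return OP_STRUCT.pack(int(is_put), len(key), len(value)) + key + value

def unpack_ops(packed: bytes):
    offset = 0
    while offset < len(packed):
        is_put, key_len, val_len = OP_STRUCT.unpack(packed[offset:offset + 9])
        offset += 9
        key = packed[offset:offset + key_len]
        offset += key_len
        value = packed[offset:offset + val_len]
        offset += val_len
        yield bool(is_put), key, value

packed = pack_op(True, b'key1', b'value1') + pack_op(False, b'key2', b'value2')
print(list(unpack_ops(packed)))
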
@@ -9,11 +9,11 @@ from hub.db.common import ResolveResult

 class ElasticSyncDB(SecondaryDB):
     def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
-                 cache_all_tx_hashes: bool = False,
+                 cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
                  blocking_channel_ids: List[str] = None,
                  filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
                  index_address_status=False):
-        super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit,
+        super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
                          cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
                          index_address_status)
         self.block_timestamp_cache = LRUCache(1024)

@@ -3,11 +3,11 @@ from hub.env import Env

 class ElasticEnv(Env):
     def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None,
-                 cache_all_tx_hashes=None, elastic_host=None, elastic_port=None,
+                 cache_all_tx_hashes=None, cache_all_claim_txos=None, elastic_host=None, elastic_port=None,
                  es_index_prefix=None, elastic_notifier_host=None, elastic_notifier_port=None,
                  blocking_channel_ids=None, filtering_channel_ids=None, reindex=False):
         super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
-                         blocking_channel_ids, filtering_channel_ids)
+                         cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
         self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
         self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
         self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(

@@ -43,7 +43,7 @@ class ElasticEnv(Env):
             elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
             es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit,
             prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
-            blocking_channel_ids=args.blocking_channel_ids,
+            cache_all_claim_txos=args.cache_all_claim_txos, blocking_channel_ids=args.blocking_channel_ids,
             filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
             elastic_notifier_port=args.elastic_notifier_port
         )

@@ -49,7 +49,7 @@ class ElasticSyncService(BlockchainReaderService):
     def open_db(self):
         env = self.env
         self.db = ElasticSyncDB(
-            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
+            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
             env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
             filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
             index_address_status=env.index_address_status

hub/env.py (12 changed lines)

@@ -30,7 +30,7 @@ class Env:
         pass

     def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
-                 prometheus_port=None, cache_all_tx_hashes=None,
+                 prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
                  blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None):

         self.logger = logging.getLogger(__name__)

@@ -46,6 +46,7 @@ class Env:
         self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
         self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0)
         self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False)
+        self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False)
         # Filtering / Blocking
         self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default(
             'BLOCKING_CHANNEL_IDS', '').split(' ')

@@ -116,10 +117,6 @@ class Env:
         result = [part.strip() for part in host.split(',')]
         if len(result) == 1:
             result = result[0]
-            if result == 'localhost':
-                # 'localhost' resolves to ::1 (ipv6) on many systems, which fails on default setup of
-                # docker, using 127.0.0.1 instead forces ipv4
-                result = '127.0.0.1'
         return result

     def sane_max_sessions(self):

@@ -170,6 +167,11 @@ class Env:
                                  "resolve, transaction fetching, and block sync all faster at the expense of higher "
                                  "memory usage (at least 10GB more). Can be set in env with 'CACHE_ALL_TX_HASHES'.",
                             default=cls.boolean('CACHE_ALL_TX_HASHES', False))
+        parser.add_argument('--cache_all_claim_txos', action='store_true',
+                            help="Load all claim txos into memory. This will make address subscriptions and sync, "
+                                 "resolve, transaction fetching, and block sync all faster at the expense of higher "
+                                 "memory usage. Can be set in env with 'CACHE_ALL_CLAIM_TXOS'.",
+                            default=cls.boolean('CACHE_ALL_CLAIM_TXOS', False))
         parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0),
                             help="Port for prometheus metrics to listen on, disabled by default. "
                                  "Can be set in env with 'PROMETHEUS_PORT'.")

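A minimal sketch of how the new cache_all_claim_txos setting is resolved, mirroring the Env logic above: an explicit constructor argument wins, otherwise the CACHE_ALL_CLAIM_TXOS environment variable is consulted. The set of accepted truthy strings here is an assumption for illustration; the real Env.boolean() helper is not shown in this diff.

import os

def resolve_cache_all_claim_txos(cache_all_claim_txos=None):
    if cache_all_claim_txos is not None:
        return cache_all_claim_txos
    # Assumed truthy spellings; Env.boolean() may differ.
    return os.environ.get('CACHE_ALL_CLAIM_TXOS', 'false').lower() in ('1', 'true', 'yes')

print(resolve_cache_all_claim_txos())       # falls back to the environment variable
print(resolve_cache_all_claim_txos(True))   # explicit argument wins
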
@@ -6,11 +6,11 @@ from hub.db import SecondaryDB

 class HeraldDB(SecondaryDB):
     def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
-                 cache_all_tx_hashes: bool = False,
+                 cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
                  blocking_channel_ids: List[str] = None,
                  filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
                  index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768):
-        super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit,
+        super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
                          cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
                          index_address_status, merkle_cache_size, tx_cache_size)
         # self.headers = None

@@ -19,7 +19,7 @@ def parse_es_services(elastic_services_arg: str):

 class ServerEnv(Env):
     def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
-                 prometheus_port=None, cache_all_tx_hashes=None,
+                 prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
                  daemon_url=None, host=None, elastic_services=None, es_index_prefix=None,
                  tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
                  payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,

@@ -29,7 +29,7 @@ class ServerEnv(Env):
                  merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
                  history_tx_cache_size=None, largest_address_history_cache_size=None):
         super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
-                         blocking_channel_ids, filtering_channel_ids, index_address_status)
+                         cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
         self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
         self.host = host if host is not None else self.default('HOST', 'localhost')
         self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080'))

@@ -153,7 +153,7 @@ class ServerEnv(Env):
             es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
             udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
             allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
-            country=args.country, payment_address=args.payment_address,
+            cache_all_claim_txos=args.cache_all_claim_txos, country=args.country, payment_address=args.payment_address,
             donation_address=args.donation_address, max_send=args.max_send, max_receive=args.max_receive,
            max_sessions=args.max_sessions, session_timeout=args.session_timeout,
             drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,

@@ -1,3 +1,4 @@
+import errno
 import time
 import typing
 import asyncio

@@ -53,7 +54,7 @@ class HubServerService(BlockchainReaderService):
     def open_db(self):
         env = self.env
         self.db = HeraldDB(
-            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
+            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
             env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
             filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
             index_address_status=env.index_address_status, merkle_cache_size=env.merkle_cache_size,

@@ -170,10 +171,20 @@ class HubServerService(BlockchainReaderService):

     async def start_status_server(self):
         if self.env.udp_port and int(self.env.udp_port):
-            await self.status_server.start(
-                0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
-                self.env.host, self.env.udp_port, self.env.allow_lan_udp
-            )
+            hosts = self.env.cs_host()
+            started = False
+            while not started:
+                try:
+                    await self.status_server.start(
+                        0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
+                        hosts, self.env.udp_port, self.env.allow_lan_udp
+                    )
+                    started = True
+                except OSError as e:
+                    if e.errno is errno.EADDRINUSE:
+                        await asyncio.sleep(3)
+                        continue
+                    raise

     def _iter_start_tasks(self):
         yield self.start_status_server()

@@ -271,7 +271,8 @@ class SessionManager:
                               f'{host}:{port:d} : {e!r}')
             raise
         else:
-            self.logger.info(f'{kind} server listening on {host}:{port:d}')
+            for s in self.servers[kind].sockets:
+                self.logger.info(f'{kind} server listening on {s.getsockname()[:2]}')

     async def _start_external_servers(self):
         """Start listening on TCP and SSL ports, but only if the respective

@@ -1,10 +1,18 @@
 import asyncio
+import ipaddress
+import socket
 import struct
 from time import perf_counter
 import logging
-from typing import Optional, Tuple, NamedTuple
+from typing import Optional, Tuple, NamedTuple, List, Union
 from hub.schema.attrs import country_str_to_int, country_int_to_str
-from hub.common import LRUCache, is_valid_public_ipv4
+from hub.common import (
+    LRUCache,
+    resolve_host,
+    is_valid_public_ip,
+    is_valid_public_ipv4,
+    is_valid_public_ipv6,
+)


 log = logging.getLogger(__name__)

@@ -36,48 +44,75 @@ class SPVPing(NamedTuple):
         return decoded


-PONG_ENCODING = b'!BBL32s4sH'
+PONG_ENCODING_PRE = b'!BBL32s'
+PONG_ENCODING_POST = b'!H'


 class SPVPong(NamedTuple):
     protocol_version: int
     flags: int
     height: int
     tip: bytes
-    source_address_raw: bytes
+    ipaddr: Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
     country: int

+    FLAG_AVAILABLE = 0b00000001
+    FLAG_IPV6 = 0b00000010
+
     def encode(self):
-        return struct.pack(PONG_ENCODING, *self)
+        return (struct.pack(PONG_ENCODING_PRE, self.protocol_version, self.flags, self.height, self.tip) +
+                self.encode_address(self.ipaddr) +
+                struct.pack(PONG_ENCODING_POST, self.country))

     @staticmethod
-    def encode_address(address: str):
-        return bytes(int(b) for b in address.split("."))
+    def encode_address(address: Union[str, ipaddress.IPv4Address, ipaddress.IPv6Address]):
+        if not isinstance(address, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
+            address = ipaddress.ip_address(address)
+        return address.packed

     @classmethod
     def make(cls, flags: int, height: int, tip: bytes, source_address: str, country: str) -> bytes:
+        ipaddr = ipaddress.ip_address(source_address)
+        flags = (flags | cls.FLAG_IPV6) if ipaddr.version == 6 else (flags & ~cls.FLAG_IPV6)
         return SPVPong(
             PROTOCOL_VERSION, flags, height, tip,
-            cls.encode_address(source_address),
+            ipaddr,
             country_str_to_int(country)
-        ).encode()
+        )

     @classmethod
     def make_sans_source_address(cls, flags: int, height: int, tip: bytes, country: str) -> Tuple[bytes, bytes]:
         pong = cls.make(flags, height, tip, '0.0.0.0', country)
-        return pong[:38], pong[42:]
+        pong = pong.encode()
+        return pong[0:1], pong[2:38], pong[42:]

     @classmethod
     def decode(cls, packet: bytes):
-        return cls(*struct.unpack(PONG_ENCODING, packet[:44]))
+        offset = 0
+        protocol_version, flags, height, tip = struct.unpack(PONG_ENCODING_PRE, packet[offset:offset+38])
+        offset += 38
+        if flags & cls.FLAG_IPV6:
+            addr_len = ipaddress.IPV6LENGTH // 8
+            ipaddr = ipaddress.ip_address(packet[offset:offset+addr_len])
+            offset += addr_len
+        else:
+            addr_len = ipaddress.IPV4LENGTH // 8
+            ipaddr = ipaddress.ip_address(packet[offset:offset+addr_len])
+            offset += addr_len
+        country, = struct.unpack(PONG_ENCODING_POST, packet[offset:offset+2])
+        offset += 2
+        return cls(protocol_version, flags, height, tip, ipaddr, country)

     @property
     def available(self) -> bool:
-        return (self.flags & 0b00000001) > 0
+        return (self.flags & self.FLAG_AVAILABLE) > 0
+
+    @property
+    def ipv6(self) -> bool:
+        return (self.flags & self.FLAG_IPV6) > 0

     @property
     def ip_address(self) -> str:
-        return ".".join(map(str, self.source_address_raw))
+        return self.ipaddr.compressed

     @property
     def country_name(self):

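A self-contained sketch of the new variable-length pong wire format shown above: a fixed '!BBL32s' prefix, a 4- or 16-byte packed source address selected by FLAG_IPV6, and a trailing '!H' country code. The helper names here are local to the sketch, not part of the hub API.

import ipaddress
import struct

PONG_ENCODING_PRE = b'!BBL32s'    # protocol_version, flags, height, 32-byte tip
PONG_ENCODING_POST = b'!H'        # country code
FLAG_AVAILABLE = 0b00000001
FLAG_IPV6 = 0b00000010

def encode_pong(protocol_version, flags, height, tip, address, country):
    ipaddr = ipaddress.ip_address(address)
    flags = (flags | FLAG_IPV6) if ipaddr.version == 6 else (flags & ~FLAG_IPV6)
    return (struct.pack(PONG_ENCODING_PRE, protocol_version, flags, height, tip)
            + ipaddr.packed
            + struct.pack(PONG_ENCODING_POST, country))

def decode_pong(packet):
    protocol_version, flags, height, tip = struct.unpack(PONG_ENCODING_PRE, packet[:38])
    addr_len = 16 if flags & FLAG_IPV6 else 4
    ipaddr = ipaddress.ip_address(packet[38:38 + addr_len])
    country, = struct.unpack(PONG_ENCODING_POST, packet[38 + addr_len:38 + addr_len + 2])
    return protocol_version, flags, height, tip, ipaddr, country

packet = encode_pong(1, FLAG_AVAILABLE, 1000, bytes(32), '::1', 53)
assert decode_pong(packet)[4] == ipaddress.ip_address('::1')
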
@@ -94,7 +129,8 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
     def __init__(
             self, height: int, tip: bytes, country: str,
             throttle_cache_size: int = 1024, throttle_reqs_per_sec: int = 10,
-            allow_localhost: bool = False, allow_lan: bool = False
+            allow_localhost: bool = False, allow_lan: bool = False,
+            is_valid_ip = is_valid_public_ip,
     ):
         super().__init__()
         self.transport: Optional[asyncio.transports.DatagramTransport] = None

@@ -102,26 +138,27 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
         self._tip = tip
         self._flags = 0
         self._country = country
-        self._left_cache = self._right_cache = None
+        self._cache0 = self._cache1 = self.cache2 = None
         self.update_cached_response()
         self._throttle = LRUCache(throttle_cache_size)
         self._should_log = LRUCache(throttle_cache_size)
         self._min_delay = 1 / throttle_reqs_per_sec
         self._allow_localhost = allow_localhost
         self._allow_lan = allow_lan
+        self._is_valid_ip = is_valid_ip
         self.closed = asyncio.Event()

     def update_cached_response(self):
-        self._left_cache, self._right_cache = SPVPong.make_sans_source_address(
+        self._cache0, self._cache1, self._cache2 = SPVPong.make_sans_source_address(
             self._flags, max(0, self._height), self._tip, self._country
         )

     def set_unavailable(self):
-        self._flags &= 0b11111110
+        self._flags &= ~SPVPong.FLAG_AVAILABLE
         self.update_cached_response()

     def set_available(self):
-        self._flags |= 0b00000001
+        self._flags |= SPVPong.FLAG_AVAILABLE
         self.update_cached_response()

     def set_height(self, height: int, tip: bytes):

@@ -141,17 +178,25 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
             return False

     def make_pong(self, host):
-        return self._left_cache + SPVPong.encode_address(host) + self._right_cache
+        ipaddr = ipaddress.ip_address(host)
+        if ipaddr.version == 6:
+            flags = self._flags | SPVPong.FLAG_IPV6
+        else:
+            flags = self._flags & ~SPVPong.FLAG_IPV6
+        return (self._cache0 + flags.to_bytes(1, 'big') +
+                self._cache1 + SPVPong.encode_address(ipaddr) +
+                self._cache2)

-    def datagram_received(self, data: bytes, addr: Tuple[str, int]):
+    def datagram_received(self, data: bytes, addr: Union[Tuple[str, int], Tuple[str, int, int, int]]):
         if self.should_throttle(addr[0]):
+            # print(f"throttled: {addr}")
             return
         try:
             SPVPing.decode(data)
         except (ValueError, struct.error, AttributeError, TypeError):
             # log.exception("derp")
             return
-        if addr[1] >= 1024 and is_valid_public_ipv4(
+        if addr[1] >= 1024 and self._is_valid_ip(
                 addr[0], allow_localhost=self._allow_localhost, allow_lan=self._allow_lan):
             self.transport.sendto(self.make_pong(addr[0]), addr)
         else:

@@ -174,39 +219,78 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):

 class StatusServer:
     def __init__(self):
-        self._protocol: Optional[SPVServerStatusProtocol] = None
+        self._protocols: List[SPVServerStatusProtocol] = []

-    async def start(self, height: int, tip: bytes, country: str, interface: str, port: int, allow_lan: bool = False):
-        if self.is_running:
-            return
-        loop = asyncio.get_event_loop()
-        interface = interface if interface.lower() != 'localhost' else '127.0.0.1'
-        self._protocol = SPVServerStatusProtocol(
-            height, tip, country, allow_localhost=interface == '127.0.0.1', allow_lan=allow_lan
-        )
-        await loop.create_datagram_endpoint(lambda: self._protocol, (interface, port))
-        log.info("started udp status server on %s:%i", interface, port)
+    async def _start(self, height: int, tip: bytes, country: str, addr: str, port: int, allow_lan: bool = False):
+        ipaddr = ipaddress.ip_address(addr)
+        if ipaddr.version == 4:
+            proto = SPVServerStatusProtocol(
+                height, tip, country,
+                allow_localhost=ipaddr.is_loopback or ipaddr.is_unspecified,
+                allow_lan=allow_lan,
+                is_valid_ip=is_valid_public_ipv4,
+            )
+            loop = asyncio.get_event_loop()
+            await loop.create_datagram_endpoint(lambda: proto, (ipaddr.compressed, port), family=socket.AF_INET)
+        elif ipaddr.version == 6:
+            proto = SPVServerStatusProtocol(
+                height, tip, country,
+                allow_localhost=ipaddr.is_loopback or ipaddr.is_unspecified,
+                allow_lan=allow_lan,
+                is_valid_ip=is_valid_public_ipv6,
+            )
+            # Because dualstack / IPv4 mapped address behavior on an IPv6 socket
+            # differs based on system config, create the socket with IPV6_V6ONLY.
+            # This disables the IPv4 mapped feature, so we don't need to consider
+            # when an IPv6 socket may interfere with IPv4 binding / traffic.
+            sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
+            sock.bind((ipaddr.compressed, port))
+            loop = asyncio.get_event_loop()
+            await loop.create_datagram_endpoint(lambda: proto, sock=sock)
+        else:
+            raise ValueError(f'unexpected IP address version {ipaddr.version}')
+        log.info("started udp%i status server on %s", ipaddr.version, proto.transport.get_extra_info('sockname')[:2])
+        self._protocols.append(proto)
+
+    async def start(self, height: int, tip: bytes, country: str, hosts: List[str], port: int, allow_lan: bool = False):
+        if not isinstance(hosts, list):
+            hosts = [hosts]
+        try:
+            for host in hosts:
+                addr = None
+                if not host:
+                    resolved = ['::', '0.0.0.0']  # unspecified address
+                else:
+                    resolved = await resolve_host(host, port, 'udp', family=socket.AF_UNSPEC, all_results=True)
+                for addr in resolved:
+                    await self._start(height, tip, country, addr, port, allow_lan)
+        except Exception as e:
+            if not isinstance(e, asyncio.CancelledError):
+                log.error("UDP status server failed to listen on (%s:%i) : %s", addr or host, port, e)
+            await self.stop()
+            raise

     async def stop(self):
-        if self.is_running:
-            await self._protocol.close()
-        self._protocol = None
+        for proto in self._protocols:
+            await proto.close()
+        self._protocols.clear()

     @property
     def is_running(self):
-        return self._protocol is not None
+        return self._protocols

     def set_unavailable(self):
-        if self.is_running:
-            self._protocol.set_unavailable()
+        for proto in self._protocols:
+            proto.set_unavailable()

     def set_available(self):
-        if self.is_running:
-            self._protocol.set_available()
+        for proto in self._protocols:
+            proto.set_available()

     def set_height(self, height: int, tip: bytes):
-        if self.is_running:
-            self._protocol.set_height(height, tip)
+        for proto in self._protocols:
+            proto.set_height(height, tip)


 class SPVStatusClientProtocol(asyncio.DatagramProtocol):

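A hedged usage sketch of the reworked StatusServer above, binding on both loopback families; the hub.herald.udp module path and the sample height, tip, and country values are assumptions for illustration.

import asyncio
from hub.herald.udp import StatusServer  # assumed module path for the class shown above

async def main():
    status = StatusServer()
    # Each resolved address gets its own SPVServerStatusProtocol instance.
    await status.start(
        height=0, tip=bytes(32), country='US',
        hosts=['127.0.0.1', '::1'], port=50001, allow_lan=True,
    )
    status.set_available()
    await asyncio.sleep(5)
    await status.stop()

asyncio.run(main())
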
@@ -217,9 +301,14 @@ class SPVStatusClientProtocol(asyncio.DatagramProtocol):
         self.responses = responses
         self._ping_packet = SPVPing.make()

-    def datagram_received(self, data: bytes, addr: Tuple[str, int]):
+    def datagram_received(self, data: bytes, addr: Union[Tuple[str, int], Tuple[str, int, int, int]]):
         try:
-            self.responses.put_nowait(((addr, perf_counter()), SPVPong.decode(data)))
+            if len(addr) > 2:  # IPv6 with possible mapped IPv4
+                ipaddr = ipaddress.ip_address(addr[0])
+                if ipaddr.ipv4_mapped:
+                    # mapped IPv4 address identified
+                    addr = (ipaddr.ipv4_mapped.compressed, addr[1])
+            self.responses.put_nowait(((addr[:2], perf_counter()), SPVPong.decode(data)))
         except (ValueError, struct.error, AttributeError, TypeError, RuntimeError):
             return

@@ -230,7 +319,7 @@ class SPVStatusClientProtocol(asyncio.DatagramProtocol):
         self.transport = None
         log.info("closed udp spv server selection client")

-    def ping(self, server: Tuple[str, int]):
+    def ping(self, server: Union[Tuple[str, int], Tuple[str, int, int, int]]):
         self.transport.sendto(self._ping_packet, server)

     def close(self):

@ -5,17 +5,17 @@ import time
|
||||||
from typing import List
|
from typing import List
|
||||||
from concurrent.futures.thread import ThreadPoolExecutor
|
from concurrent.futures.thread import ThreadPoolExecutor
|
||||||
from bisect import bisect_right
|
from bisect import bisect_right
|
||||||
from hub.common import ResumableSHA256
|
from hub.common import sha256
|
||||||
from hub.db import SecondaryDB
|
from hub.db import SecondaryDB
|
||||||
|
|
||||||
|
|
||||||
class PrimaryDB(SecondaryDB):
|
class PrimaryDB(SecondaryDB):
|
||||||
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
|
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
|
||||||
cache_all_tx_hashes: bool = False,
|
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||||
max_open_files: int = 64, blocking_channel_ids: List[str] = None,
|
max_open_files: int = 64, blocking_channel_ids: List[str] = None,
|
||||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||||
index_address_status=False, enforce_integrity=True):
|
index_address_status=False, enforce_integrity=True):
|
||||||
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_tx_hashes,
|
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes,
|
||||||
blocking_channel_ids, filtering_channel_ids, executor, index_address_status,
|
blocking_channel_ids, filtering_channel_ids, executor, index_address_status,
|
||||||
enforce_integrity=enforce_integrity)
|
enforce_integrity=enforce_integrity)
|
||||||
|
|
||||||
|
@@ -35,19 +35,16 @@ class PrimaryDB(SecondaryDB):
             if last_hashX:
                 yield last_hashX

-        def hashX_status_from_history(history: bytes) -> ResumableSHA256:
+        def hashX_status_from_history(history: bytes) -> bytes:
             tx_counts = self.tx_counts
             hist_tx_nums = array.array('I')
             hist_tx_nums.frombytes(history)
-            digest = ResumableSHA256()
-            digest.update(
-                b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode()
-                         for tx_num, tx_hash in zip(
-                             hist_tx_nums,
-                             self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
-                ))
-            )
-            return digest
+            digest = hashlib.sha256()
+            for tx_num, tx_hash in zip(
+                    hist_tx_nums,
+                    self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)):
+                digest.update(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode())
+            return digest.digest()

         start = time.perf_counter()

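
The new hashX_status_from_history above returns a finished sha256 digest of the address history instead of a resumable hasher object. The status is the hash of the concatenated "txid:height:" fragments for each confirmed transaction. A rough standalone sketch of that scheme, with an illustrative helper name and a plain list of (tx_hash, height) pairs standing in for the database lookups:

    import hashlib
    from typing import List, Tuple

    def address_status(history: List[Tuple[bytes, int]]) -> bytes:
        # history: (tx_hash, height) pairs in confirmation order.
        # tx hashes are rendered in reversed-hex (RPC txid) byte order.
        digest = hashlib.sha256()
        for tx_hash, height in history:
            digest.update(f'{tx_hash[::-1].hex()}:{height:d}:'.encode())
        return digest.digest()
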
@@ -70,24 +67,17 @@ class PrimaryDB(SecondaryDB):
             hashX_cnt += 1
             key = prefix_db.hashX_status.pack_key(hashX)
             history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
-            digester = hashX_status_from_history(history)
-            status = digester.digest()
+            status = hashX_status_from_history(history)
             existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
-            existing_digester = prefix_db.hashX_history_hasher.get(hashX)
-            if not existing_status:
+            if existing_status and existing_status == status:
+                continue
+            elif not existing_status:
                 prefix_db.stash_raw_put(key, status)
                 op_cnt += 1
             else:
                 prefix_db.stash_raw_delete(key, existing_status)
                 prefix_db.stash_raw_put(key, status)
                 op_cnt += 2
-            if not existing_digester:
-                prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
-                op_cnt += 1
-            else:
-                prefix_db.hashX_history_hasher.stash_delete((hashX,), existing_digester)
-                prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
-                op_cnt += 2
             if op_cnt > 100000:
                 prefix_db.unsafe_commit()
                 self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")

@@ -3,13 +3,14 @@ from hub.env import Env


 class BlockchainEnv(Env):
     def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
-                 prometheus_port=None, cache_all_tx_hashes=None, blocking_channel_ids=None, filtering_channel_ids=None,
+                 prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
+                 blocking_channel_ids=None, filtering_channel_ids=None,
                  db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
                  index_address_status=None, rebuild_address_status_from_height=None,
                  daemon_ca_path=None, history_tx_cache_size=None,
                  db_disable_integrity_checks=False):
         super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
-                         blocking_channel_ids, filtering_channel_ids, index_address_status)
+                         cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
         self.db_max_open_files = db_max_open_files
         self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
         self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \
@@ -56,7 +57,7 @@ class BlockchainEnv(Env):
             db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
             max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit,
             prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
-            index_address_status=args.index_address_statuses,
+            cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
             hashX_history_cache_size=args.address_history_cache_size,
             rebuild_address_status_from_height=args.rebuild_address_status_from_height,
             daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size,

@@ -1,4 +1,3 @@
-import os
 import time
 import asyncio
 import typing

@@ -12,8 +11,7 @@ from hub import PROMETHEUS_NAMESPACE
 from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
 from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
 from hub.error.base import ChainError
-from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache
-from hub.common import ResumableSHA256, LFUCacheWithMetrics
+from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache, LFUCacheWithMetrics
 from hub.scribe.db import PrimaryDB
 from hub.scribe.daemon import LBCDaemon
 from hub.scribe.transaction import Tx, TxOutput, TxInput, Block

@@ -139,10 +137,11 @@ class BlockchainProcessorService(BlockchainService):
     def open_db(self):
         env = self.env
         self.db = PrimaryDB(
-            env.coin, env.db_dir, env.reorg_limit, cache_all_tx_hashes=env.cache_all_tx_hashes,
-            max_open_files=env.db_max_open_files, blocking_channel_ids=env.blocking_channel_ids,
-            filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
-            index_address_status=env.index_address_status, enforce_integrity=not env.db_disable_integrity_checks
+            env.coin, env.db_dir, env.reorg_limit, cache_all_claim_txos=env.cache_all_claim_txos,
+            cache_all_tx_hashes=env.cache_all_tx_hashes, max_open_files=env.db_max_open_files,
+            blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids,
+            executor=self._executor, index_address_status=env.index_address_status,
+            enforce_integrity=not env.db_disable_integrity_checks
         )

     async def run_in_thread_with_lock(self, func, *args):

@@ -170,22 +169,10 @@ class BlockchainProcessorService(BlockchainService):

         def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
             self.mempool.remove(to_delete)
-            touched_hashXs = list(self.mempool.update_mempool(to_put))
+            touched_hashXs = self.mempool.update_mempool(to_put)
             if self.env.index_address_status:
-                status_hashers = {
-                    k: v.hasher if v else ResumableSHA256() for k, v in zip(
-                        touched_hashXs,
-                        self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in touched_hashXs])
-                    )
-                }
-                for hashX, v in zip(
-                        touched_hashXs,
-                        self.db.prefix_db.hashX_mempool_status.multi_get([(hashX,) for hashX in touched_hashXs])):
-                    if v is not None:
-                        self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), v)
-                    hasher = status_hashers[hashX]
-                    hasher.update(self.mempool.mempool_history(hashX).encode())
-                    self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (hasher.digest(),))
+                for hashX in touched_hashXs:
+                    self._get_update_hashX_mempool_status_ops(hashX)
             for tx_hash, raw_tx in to_put:
                 mempool_prefix.stash_put((tx_hash,), (raw_tx,))
             for tx_hash, raw_tx in to_delete.items():

@@ -276,6 +263,9 @@ class BlockchainProcessorService(BlockchainService):
                 for _ in range(count):
                     await self.run_in_thread_with_lock(self.backup_block)
                 self.log.info(f'backed up to height {self.height:,d}')

+                if self.env.cache_all_claim_txos:
+                    await self.db._read_claim_txos()  # TODO: don't do this
                 await self.prefetcher.reset_height(self.height)
                 self.reorg_count_metric.inc()
             except:

@@ -404,6 +394,12 @@ class BlockchainProcessorService(BlockchainService):
             if claim_hash not in self.updated_claim_previous_activations:
                 self.updated_claim_previous_activations[claim_hash] = activation

+        if self.env.cache_all_claim_txos:
+            self.db.claim_to_txo[claim_hash] = ClaimToTXOValue(
+                tx_num, nout, root_tx_num, root_idx, txo.value, channel_signature_is_valid, claim_name
+            )
+            self.db.txo_to_claim[tx_num][nout] = claim_hash
+
         pending = StagedClaimtrieItem(
             claim_name, normalized_name, claim_hash, txo.value, self.coin.get_expiration_height(height), tx_num, nout,
             root_tx_num, root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash

@@ -694,6 +690,11 @@ class BlockchainProcessorService(BlockchainService):
             if 0 < activation <= self.height:
                 self.effective_amount_delta[claim_hash] -= spent.amount
             self.future_effective_amount_delta[spent.claim_hash] -= spent.amount
+            if self.env.cache_all_claim_txos:
+                claim_hash = self.db.txo_to_claim[txin_num].pop(nout)
+                if not self.db.txo_to_claim[txin_num]:
+                    self.db.txo_to_claim.pop(txin_num)
+                self.db.claim_to_txo.pop(claim_hash)
             if spent.reposted_claim_hash:
                 self.pending_reposted.add(spent.reposted_claim_hash)
             if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims:

@@ -1636,6 +1637,15 @@ class BlockchainProcessorService(BlockchainService):
             self.hashX_full_cache[hashX] = history
         return history

+    def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
+        existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
+        if existing:
+            self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), existing)
+        history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX)
+        if history:
+            status = sha256(history.encode())
+            self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,))
+
     def advance_block(self, block: Block):
         txo_count = 0
         txi_count = 0

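
The added helper above recomputes a hashX's mempool status as the sha256 of its confirmed history string followed by its mempool history string, writing a status row only when the combined history is non-empty. A rough sketch of that rule under the assumption that both inputs are the ':'-separated history strings built elsewhere in this diff (hashlib stands in here for the hub's sha256 helper):

    import hashlib

    def mempool_status(confirmed_history: str, mempool_history: str) -> bytes:
        # An empty combined history means no status row is stored for the hashX.
        combined = confirmed_history + mempool_history
        return hashlib.sha256(combined.encode()).digest() if combined else b''
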
@@ -1735,7 +1745,9 @@ class BlockchainProcessorService(BlockchainService):
         # update hashX history status hashes and compactify the histories
         self._get_update_hashX_histories_ops(height)

-        if self.env.index_address_status:
+        # only compactify adddress histories and update the status index if we're already caught up,
+        # a bulk update will happen once catchup finishes
+        if not self.db.catching_up and self.env.index_address_status:
             self._get_compactify_ops(height)
             self.db.last_indexed_address_status_height = height

@@ -1790,17 +1802,6 @@ class BlockchainProcessorService(BlockchainService):
         )

     def _get_compactify_ops(self, height: int):
-        def _rebuild_hasher(hist_tx_nums):
-            hasher = ResumableSHA256()
-            hasher.update(
-                b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num)}:'.encode()
-                         for tx_num, tx_hash in zip(
-                             hist_tx_nums,
-                             self.db.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
-                ))
-            )
-            return hasher
-
         existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False)
         if existing_hashX_statuses:
             pack_key = self.db.prefix_db.hashX_status.pack_key

@@ -1815,13 +1816,6 @@ class BlockchainProcessorService(BlockchainService):
             append_deletes_hashX_history = block_hashX_history_deletes.append
             block_hashX_history_puts = []

-            existing_status_hashers = {
-                k: v.hasher if v else None for k, v in zip(
-                    self.hashXs_by_tx,
-                    self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in self.hashXs_by_tx])
-                )
-            }
-
             for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses):
                 new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums]

@@ -1836,9 +1830,11 @@ class BlockchainProcessorService(BlockchainService):
                 unpack_key = self.db.prefix_db.hashX_history.unpack_key
                 needs_compaction = False

+                total_hist_txs = b''
                 for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False,
                                                                        deserialize_value=False):
                     hist_txs = unpack_history(hist)
+                    total_hist_txs += hist
                     txs_extend(hist_txs)
                     hist_height = unpack_key(k).height
                     if height > reorg_limit and hist_height < height - reorg_limit:

@@ -1857,19 +1853,27 @@ class BlockchainProcessorService(BlockchainService):
                     block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,)))
                 if not new_history:
                     continue
-                hasher = existing_status_hashers[hashX]
-                if hasher is None:
-                    # this is to migrate in the new column family, in the future it can be a new digester
-                    # hasher = ResumableSHA256()
-                    hasher = _rebuild_hasher(tx_nums)
-                else:
-                    self.db.prefix_db.hashX_history_hasher.stash_delete((hashX,), (hasher,))
-                hasher.update(b''.join(
-                    f'{tx_hash[::-1].hex()}:{height:d}:'.encode() for tx_hash, height in new_history
-                ))
-                self.db.prefix_db.hashX_history_hasher.stash_put((hashX,), (hasher,))
-                status = hasher.digest()
-                self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
+                needed_tx_infos = []
+                append_needed_tx_info = needed_tx_infos.append
+                tx_infos = {}
+                for tx_num in tx_nums:
+                    cached_tx_info = self.history_tx_info_cache.get(tx_num)
+                    if cached_tx_info is not None:
+                        tx_infos[tx_num] = cached_tx_info
+                    else:
+                        append_needed_tx_info(tx_num)
+                if needed_tx_infos:
+                    for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)):
+                        tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
+                        tx_infos[tx_num] = tx_info
+                        self.history_tx_info_cache[tx_num] = tx_info
+                history = ''.join(map(tx_infos.__getitem__, tx_nums))
+                for tx_hash, height in new_history:
+                    history += f'{tx_hash[::-1].hex()}:{height:d}:'
+                if history:
+                    status = sha256(history.encode())
+                    self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))

             self.db.prefix_db.multi_delete(block_hashX_history_deletes)
             self.db.prefix_db.hashX_history.stash_multi_put(block_hashX_history_puts)

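
The replacement above memoizes the per-transaction "txid:height:" fragments in history_tx_info_cache, so hashXs touched repeatedly in a block do not trigger redundant tx-hash lookups. A rough standalone sketch of that caching pattern; the function name, the get_tx_hashes callable, and the plain dict cache are placeholders rather than the hub's actual APIs:

    from bisect import bisect_right

    def build_history_string(tx_nums, get_tx_hashes, tx_counts, cache):
        # cache maps tx_num -> 'txid:height:' fragment; get_tx_hashes batches the DB reads.
        missing = [tx_num for tx_num in tx_nums if tx_num not in cache]
        for tx_num, tx_hash in zip(missing, get_tx_hashes(missing)):
            # the confirmation height of tx_num is its insertion point in the cumulative tx_counts
            cache[tx_num] = f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num):d}:'
        return ''.join(cache[tx_num] for tx_num in tx_nums)
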
@@ -2070,8 +2074,6 @@ class BlockchainProcessorService(BlockchainService):
         """Loop forever processing blocks as they arrive."""
         self._caught_up_event = caught_up_event
         try:
-            if os.path.exists(self.db._need_restart_path):
-                raise RuntimeError('scribe restart is needed')
             if self.height != self.daemon.cached_height() and not self.db.catching_up:
                 await self._need_catch_up()  # tell the readers that we're still catching up with lbrycrd/lbcd
             while not self._stopping:

@@ -2133,6 +2135,9 @@ class BlockchainProcessorService(BlockchainService):
     async def _finished_initial_catch_up(self):
         self.log.info(f'caught up to height {self.height}')

+        if self.env.index_address_status and self.db.last_indexed_address_status_height < self.db.db_height:
+            await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height)
+
         # Flush everything but with catching_up->False state.
         self.db.catching_up = False

@@ -37,7 +37,7 @@ class BlockchainService:
     def open_db(self):
         env = self.env
         self.db = SecondaryDB(
-            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
+            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
             env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
             filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
             index_address_status=env.index_address_status
setup.py
@@ -33,7 +33,7 @@ setup(
         'certifi>=2021.10.08',
         'colorama==0.3.7',
         'cffi==1.13.2',
-        'protobuf==3.18.3',
+        'protobuf==3.17.2',
         'msgpack==0.6.1',
         'prometheus_client==0.7.1',
         'coincurve==15.0.0',

@@ -44,8 +44,7 @@ setup(
         'filetype==1.0.9',
         'grpcio==1.38.0',
         'lbry-rocksdb==0.8.2',
-        'ujson==5.4.0',
-        'rehash==1.0.0'
+        'ujson==5.4.0'
     ],
     extras_require={
         'lint': ['pylint==2.10.0'],