Compare commits

..

11 commits

Author SHA1 Message Date
Jack Robison
ebcc6e5086
update snapshot url in example docker-composes 2023-03-03 13:15:25 -05:00
Jack Robison
c0766f6abc faster RevertableOpStack.apply_packed_undo_ops 2023-02-13 13:09:03 -05:00
Jack Robison
7bc90c425f
fix error 2023-02-07 17:31:36 -05:00
Jack Robison
f2c494d4d6 check for scribe needing a restart due to low memory 2023-02-07 17:23:23 -05:00
Jack Robison
8147bbf3b9 remove --cache_all_claim_txos setting 2023-02-07 17:23:23 -05:00
Jack Robison
adbeeaf203 update snapshot 2023-01-06 20:32:45 -05:00
Jack Robison
f55ed56215 add comment 2023-01-06 20:32:45 -05:00
Jack Robison
d1d33c4bce feedback 2023-01-06 20:32:45 -05:00
Jack Robison
b7de08ba0b add ResumableSHA256 and HashXHistoryHasherPrefixRow column family 2023-01-06 20:32:45 -05:00
Jack Robison
405cef8d28 ResumableSHA256 2023-01-06 20:32:45 -05:00
Jack Robison
21262d2e43
fix edge case deleting an empty value 2022-12-28 13:40:19 -05:00
22 changed files with 285 additions and 343 deletions

View file

@ -17,7 +17,7 @@ services:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1256013/lbry-rocksdb.tar
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar
command: # for full options, see `scribe --help`
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--max_query_workers=2"

View file

@ -14,7 +14,7 @@ services:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1256013/lbry-rocksdb.tar
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar
command:
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--max_query_workers=2"

View file

@ -5,15 +5,16 @@ import hmac
import ipaddress
import logging
import logging.handlers
import socket
import typing
import collections
from ctypes import cast, memmove, POINTER, c_void_p
from bisect import insort_right
from collections import deque
from decimal import Decimal
from typing import Iterable, Deque
from asyncio import get_event_loop, Event
from prometheus_client import Counter
from rehash.structs import EVPobject
from hub.schema.tags import clean_tags
from hub.schema.url import normalize_name
from hub.error import TooManyClaimSearchParametersError
@ -154,38 +155,6 @@ def protocol_version(client_req, min_tuple, max_tuple):
return result, client_min
async def resolve_host(url: str, port: int, proto: str,
family: int = socket.AF_INET, all_results: bool = False) \
-> typing.Union[str, typing.List[str]]:
if proto not in ['udp', 'tcp']:
raise Exception("invalid protocol")
try:
if ipaddress.ip_address(url):
return [url] if all_results else url
except ValueError:
pass
loop = asyncio.get_running_loop()
records = await loop.getaddrinfo(
url, port,
proto=socket.IPPROTO_TCP if proto == 'tcp' else socket.IPPROTO_UDP,
type=socket.SOCK_STREAM if proto == 'tcp' else socket.SOCK_DGRAM,
family=family,
)
def addr_not_ipv4_mapped(rec):
_, _, _, _, sockaddr = rec
ipaddr = ipaddress.ip_address(sockaddr[0])
return ipaddr.version != 6 or not ipaddr.ipv4_mapped
records = filter(addr_not_ipv4_mapped, records)
results = [sockaddr[0] for fam, type, prot, canonname, sockaddr in records]
if not results and not all_results:
raise socket.gaierror(
socket.EAI_ADDRFAMILY,
'The specified network host does not have any network '
'addresses in the requested address family'
)
return results if all_results else results[0]
class LRUCacheWithMetrics:
__slots__ = [
'capacity',
@ -610,13 +579,11 @@ IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24')
def is_valid_public_ipv4(address, allow_localhost: bool = False, allow_lan: bool = False):
try:
parsed_ip = ipaddress.ip_address(address)
if parsed_ip.version != 4:
return False
if parsed_ip.is_loopback:
return allow_localhost
if parsed_ip.is_private:
return allow_lan
if any((parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
if parsed_ip.is_loopback and allow_localhost:
return True
if allow_lan and parsed_ip.is_private:
return True
if any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private)):
return False
else:
@ -625,23 +592,6 @@ def is_valid_public_ipv4(address, allow_localhost: bool = False, allow_lan: bool
except (ipaddress.AddressValueError, ValueError):
return False
def is_valid_public_ipv6(address, allow_localhost: bool = False, allow_lan: bool = False):
try:
parsed_ip = ipaddress.ip_address(address)
if parsed_ip.version != 6:
return False
if parsed_ip.is_loopback:
return allow_localhost
if parsed_ip.is_private:
return allow_lan
return not any((parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private,
parsed_ip.ipv4_mapped))
except (ipaddress.AddressValueError, ValueError):
return False
def is_valid_public_ip(address, **kwargs):
return is_valid_public_ipv6(address, **kwargs) or is_valid_public_ipv4(address, **kwargs)
def sha256(x):
"""Simple wrapper of hashlib sha256."""
@ -1111,3 +1061,41 @@ async def asyncify_for_loop(gen, ticks_per_sleep: int = 1000):
yield item
if cnt % ticks_per_sleep == 0:
await async_sleep(0)
_SHA256_DIGEST_STATE_SIZE = 120
class ResumableSHA256:
__slots__ = ['_hasher']
def __init__(self, state: typing.Optional[bytes] = None):
self._hasher = hashlib.sha256()
if state is not None:
ctx = self._get_evp_md_ctx()
ctx_size = ctx.digest.contents.ctx_size
if len(state) != _SHA256_DIGEST_STATE_SIZE != ctx_size:
raise Exception(f'invalid sha256 digester state, got {len(state)} bytes')
memmove(ctx.md_data, state, ctx_size)
def _get_evp_md_ctx(self):
c_evp_obj = cast(c_void_p(id(self._hasher)), POINTER(EVPobject))
if hasattr(c_evp_obj.contents.ctx, "contents"):
return c_evp_obj.contents.ctx.contents
else:
return c_evp_obj.contents.ctx
def get_state(self) -> bytes:
ctx = self._get_evp_md_ctx()
ctx_size = ctx.digest.contents.ctx_size
hasher_state = ctx.md_data[:ctx_size]
return hasher_state
def __copy__(self):
return ResumableSHA256(self.get_state())
def update(self, data: bytes):
self._hasher.update(data)
def digest(self):
return self._hasher.digest()

View file

@ -51,6 +51,7 @@ class DB_PREFIXES(enum.Enum):
reposted_count = b'j'
effective_amount = b'i'
future_effective_amount = b'k'
hashX_history_hash = b'l'
COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass

View file

@ -37,7 +37,7 @@ class SecondaryDB:
DB_VERSIONS = [7, 8, 9, 10, 11, 12]
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
cache_all_tx_hashes: bool = False,
blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768,
@ -47,9 +47,9 @@ class SecondaryDB:
self._executor = executor
self._db_dir = db_dir
self._reorg_limit = reorg_limit
self._cache_all_claim_txos = cache_all_claim_txos
self._cache_all_tx_hashes = cache_all_tx_hashes
self._secondary_name = secondary_name
self._need_restart_path = os.path.join(self._db_dir, 'NEED_SCRIBE_RESTART')
if secondary_name:
assert max_open_files == -1, 'max open files must be -1 for secondary readers'
self._db_max_open_files = max_open_files
@ -100,9 +100,6 @@ class SecondaryDB:
self.total_transactions: List[bytes] = []
self.tx_num_mapping: Dict[bytes, int] = {}
# these are only used if the cache_all_claim_txos setting is on
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)
def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
@ -956,21 +953,6 @@ class SecondaryDB:
else:
assert self.db_tx_count == 0
async def _read_claim_txos(self):
def read_claim_txos():
set_claim_to_txo = self.claim_to_txo.__setitem__
for k, v in self.prefix_db.claim_to_txo.iterate(fill_cache=False):
set_claim_to_txo(k.claim_hash, v)
self.txo_to_claim[v.tx_num][v.position] = k.claim_hash
self.claim_to_txo.clear()
self.txo_to_claim.clear()
start = time.perf_counter()
self.logger.info("loading claims")
await asyncio.get_event_loop().run_in_executor(self._executor, read_claim_txos)
ts = time.perf_counter() - start
self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4))
# async def _read_headers(self):
# # if self.headers is not None:
# # return
@ -1019,6 +1001,27 @@ class SecondaryDB:
secondary_path = '' if not self._secondary_name else os.path.join(
self._db_dir, self._secondary_name
)
open_db_canary = None
if self._secondary_name:
open_db_canary = os.path.join(self._db_dir, f'{self._secondary_name}-db-canary')
if os.path.exists(open_db_canary):
with open(self._need_restart_path, 'w+') as f:
f.write(f"{time.strftime(f'%Y-%m-%d %H:%M:%S')} {self._secondary_name}\n")
raise RuntimeError('scribe restart is needed')
else:
with open(open_db_canary, 'w'):
pass
else:
herald_db_canary = os.path.join(self._db_dir, 'lbry-reader-db-canary')
es_sync_db_canary = os.path.join(self._db_dir, 'lbry-elastic-writer-db-canary')
if os.path.exists(herald_db_canary):
os.remove(herald_db_canary)
if os.path.exists(es_sync_db_canary):
os.remove(es_sync_db_canary)
if os.path.exists(self._need_restart_path):
os.remove(self._need_restart_path)
db_path = os.path.join(self._db_dir, 'lbry-rocksdb')
self.prefix_db = PrefixDB(
db_path, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,
@ -1027,6 +1030,7 @@ class SecondaryDB:
)
if secondary_path != '':
os.remove(open_db_canary)
self.logger.info(f'opened db for read only: lbry-rocksdb (%s)', db_path)
else:
self.logger.info(f'opened db for writing: lbry-rocksdb (%s)', db_path)
@ -1063,8 +1067,6 @@ class SecondaryDB:
async def initialize_caches(self):
await self._read_tx_counts()
await self._read_block_hashes()
if self._cache_all_claim_txos:
await self._read_claim_txos()
if self._cache_all_tx_hashes:
await self._read_tx_hashes()
if self.db_height > 0:
@ -1154,15 +1156,9 @@ class SecondaryDB:
}
def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]:
if self._cache_all_claim_txos:
return self.claim_to_txo.get(claim_hash)
return self.prefix_db.claim_to_txo.get_pending(claim_hash)
def get_cached_claim_hash(self, tx_num: int, position: int) -> Optional[bytes]:
if self._cache_all_claim_txos:
if tx_num not in self.txo_to_claim:
return
return self.txo_to_claim[tx_num].get(position, None)
v = self.prefix_db.txo_to_claim.get_pending(tx_num, position)
return None if not v else v.claim_hash

View file

@ -273,6 +273,7 @@ class BasePrefixDB:
undo_c_f = self.column_families[DB_PREFIXES.undo.value]
undo_info = self._db.get((undo_c_f, undo_key))
self._op_stack.apply_packed_undo_ops(undo_info)
self._op_stack.validate_and_apply_stashed_ops()
try:
with self._db.write_batch(sync=True) as batch:
batch_put = batch.put

View file

@ -3,6 +3,7 @@ import struct
import array
import base64
from typing import Union, Tuple, NamedTuple, Optional
from hub.common import ResumableSHA256
from hub.db.common import DB_PREFIXES
from hub.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow
from hub.schema.url import normalize_name
@ -1851,6 +1852,46 @@ class FutureEffectiveAmountPrefixRow(PrefixRow):
return cls.pack_key(claim_hash), cls.pack_value(future_effective_amount)
class HashXHistoryHasherKey(NamedTuple):
hashX: bytes
class HashXHistoryHasherValue(NamedTuple):
hasher: ResumableSHA256
class HashXHistoryHasherPrefixRow(PrefixRow):
prefix = DB_PREFIXES.hashX_history_hash.value
key_struct = struct.Struct(b'>11s')
value_struct = struct.Struct(b'>120s')
cache_size = 1024 * 1024 * 64
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>11s').pack
]
@classmethod
def pack_key(cls, hashX: bytes):
return super().pack_key(hashX)
@classmethod
def unpack_key(cls, key: bytes) -> HashXHistoryHasherKey:
return HashXHistoryHasherKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, hasher: ResumableSHA256) -> bytes:
return super().pack_value(hasher.get_state())
@classmethod
def unpack_value(cls, data: bytes) -> HashXHistoryHasherValue:
return HashXHistoryHasherValue(ResumableSHA256(*super().unpack_value(data)))
@classmethod
def pack_item(cls, hashX: bytes, hasher: ResumableSHA256):
return cls.pack_key(hashX), cls.pack_value(hasher)
class PrefixDB(BasePrefixDB):
def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64,
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None,
@ -1897,6 +1938,7 @@ class PrefixDB(BasePrefixDB):
self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack)
self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack)
self.future_effective_amount = FutureEffectiveAmountPrefixRow(db, self._op_stack)
self.hashX_history_hasher = HashXHistoryHasherPrefixRow(db, self._op_stack)
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:

View file

@ -160,7 +160,7 @@ class RevertableOpStack:
# there is a value and we're not deleting it in this op
# check that a delete for the stored value is in the stack
raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}\nvs\n{stored_val}")
elif not stored_val:
elif not has_stored_val:
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
elif stored_val != op.value:
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
@ -324,9 +324,21 @@ class RevertableOpStack:
"""
Unpack and apply a sequence of undo ops from serialized undo bytes
"""
while packed:
op, packed = RevertableOp.unpack(packed)
self.append_op(op)
offset = 0
packed_size = len(packed)
while offset < packed_size:
is_put, key_len, val_len = _OP_STRUCT.unpack(packed[offset:offset + 9])
offset += 9
key = packed[offset:offset + key_len]
offset += key_len
value = packed[offset:offset + val_len]
offset += val_len
if is_put == 1:
op = RevertablePut(key, value)
else:
op = RevertableDelete(key, value)
self._stash.append(op)
self._stashed_last_op_for_key[op.key] = op
def get_pending_op(self, key: bytes) -> Optional[RevertableOp]:
if key in self._stashed_last_op_for_key:

View file

@ -9,11 +9,11 @@ from hub.db.common import ResolveResult
class ElasticSyncDB(SecondaryDB):
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
cache_all_tx_hashes: bool = False,
blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False):
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit,
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
index_address_status)
self.block_timestamp_cache = LRUCache(1024)

View file

@ -3,11 +3,11 @@ from hub.env import Env
class ElasticEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None,
cache_all_tx_hashes=None, cache_all_claim_txos=None, elastic_host=None, elastic_port=None,
cache_all_tx_hashes=None, elastic_host=None, elastic_port=None,
es_index_prefix=None, elastic_notifier_host=None, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None, reindex=False):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
blocking_channel_ids, filtering_channel_ids)
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
@ -43,7 +43,7 @@ class ElasticEnv(Env):
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit,
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, blocking_channel_ids=args.blocking_channel_ids,
blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
elastic_notifier_port=args.elastic_notifier_port
)

View file

@ -49,7 +49,7 @@ class ElasticSyncService(BlockchainReaderService):
def open_db(self):
env = self.env
self.db = ElasticSyncDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status

View file

@ -30,7 +30,7 @@ class Env:
pass
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
prometheus_port=None, cache_all_tx_hashes=None,
blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None):
self.logger = logging.getLogger(__name__)
@ -46,7 +46,6 @@ class Env:
self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0)
self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False)
self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False)
# Filtering / Blocking
self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default(
'BLOCKING_CHANNEL_IDS', '').split(' ')
@ -117,6 +116,10 @@ class Env:
result = [part.strip() for part in host.split(',')]
if len(result) == 1:
result = result[0]
if result == 'localhost':
# 'localhost' resolves to ::1 (ipv6) on many systems, which fails on default setup of
# docker, using 127.0.0.1 instead forces ipv4
result = '127.0.0.1'
return result
def sane_max_sessions(self):
@ -167,11 +170,6 @@ class Env:
"resolve, transaction fetching, and block sync all faster at the expense of higher "
"memory usage (at least 10GB more). Can be set in env with 'CACHE_ALL_TX_HASHES'.",
default=cls.boolean('CACHE_ALL_TX_HASHES', False))
parser.add_argument('--cache_all_claim_txos', action='store_true',
help="Load all claim txos into memory. This will make address subscriptions and sync, "
"resolve, transaction fetching, and block sync all faster at the expense of higher "
"memory usage. Can be set in env with 'CACHE_ALL_CLAIM_TXOS'.",
default=cls.boolean('CACHE_ALL_CLAIM_TXOS', False))
parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0),
help="Port for prometheus metrics to listen on, disabled by default. "
"Can be set in env with 'PROMETHEUS_PORT'.")

View file

@ -6,11 +6,11 @@ from hub.db import SecondaryDB
class HeraldDB(SecondaryDB):
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
cache_all_tx_hashes: bool = False,
blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768):
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit,
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
index_address_status, merkle_cache_size, tx_cache_size)
# self.headers = None

View file

@ -19,7 +19,7 @@ def parse_es_services(elastic_services_arg: str):
class ServerEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
prometheus_port=None, cache_all_tx_hashes=None,
daemon_url=None, host=None, elastic_services=None, es_index_prefix=None,
tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
@ -29,7 +29,7 @@ class ServerEnv(Env):
merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
history_tx_cache_size=None, largest_address_history_cache_size=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
blocking_channel_ids, filtering_channel_ids, index_address_status)
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080'))
@ -153,7 +153,7 @@ class ServerEnv(Env):
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, country=args.country, payment_address=args.payment_address,
country=args.country, payment_address=args.payment_address,
donation_address=args.donation_address, max_send=args.max_send, max_receive=args.max_receive,
max_sessions=args.max_sessions, session_timeout=args.session_timeout,
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,

View file

@ -1,4 +1,3 @@
import errno
import time
import typing
import asyncio
@ -54,7 +53,7 @@ class HubServerService(BlockchainReaderService):
def open_db(self):
env = self.env
self.db = HeraldDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status, merkle_cache_size=env.merkle_cache_size,
@ -171,20 +170,10 @@ class HubServerService(BlockchainReaderService):
async def start_status_server(self):
if self.env.udp_port and int(self.env.udp_port):
hosts = self.env.cs_host()
started = False
while not started:
try:
await self.status_server.start(
0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
hosts, self.env.udp_port, self.env.allow_lan_udp
)
started = True
except OSError as e:
if e.errno is errno.EADDRINUSE:
await asyncio.sleep(3)
continue
raise
await self.status_server.start(
0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
self.env.host, self.env.udp_port, self.env.allow_lan_udp
)
def _iter_start_tasks(self):
yield self.start_status_server()

View file

@ -271,8 +271,7 @@ class SessionManager:
f'{host}:{port:d} : {e!r}')
raise
else:
for s in self.servers[kind].sockets:
self.logger.info(f'{kind} server listening on {s.getsockname()[:2]}')
self.logger.info(f'{kind} server listening on {host}:{port:d}')
async def _start_external_servers(self):
"""Start listening on TCP and SSL ports, but only if the respective

View file

@ -1,18 +1,10 @@
import asyncio
import ipaddress
import socket
import struct
from time import perf_counter
import logging
from typing import Optional, Tuple, NamedTuple, List, Union
from typing import Optional, Tuple, NamedTuple
from hub.schema.attrs import country_str_to_int, country_int_to_str
from hub.common import (
LRUCache,
resolve_host,
is_valid_public_ip,
is_valid_public_ipv4,
is_valid_public_ipv6,
)
from hub.common import LRUCache, is_valid_public_ipv4
log = logging.getLogger(__name__)
@ -44,75 +36,48 @@ class SPVPing(NamedTuple):
return decoded
PONG_ENCODING_PRE = b'!BBL32s'
PONG_ENCODING_POST = b'!H'
PONG_ENCODING = b'!BBL32s4sH'
class SPVPong(NamedTuple):
protocol_version: int
flags: int
height: int
tip: bytes
ipaddr: Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
source_address_raw: bytes
country: int
FLAG_AVAILABLE = 0b00000001
FLAG_IPV6 = 0b00000010
def encode(self):
return (struct.pack(PONG_ENCODING_PRE, self.protocol_version, self.flags, self.height, self.tip) +
self.encode_address(self.ipaddr) +
struct.pack(PONG_ENCODING_POST, self.country))
return struct.pack(PONG_ENCODING, *self)
@staticmethod
def encode_address(address: Union[str, ipaddress.IPv4Address, ipaddress.IPv6Address]):
if not isinstance(address, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
address = ipaddress.ip_address(address)
return address.packed
def encode_address(address: str):
return bytes(int(b) for b in address.split("."))
@classmethod
def make(cls, flags: int, height: int, tip: bytes, source_address: str, country: str) -> bytes:
ipaddr = ipaddress.ip_address(source_address)
flags = (flags | cls.FLAG_IPV6) if ipaddr.version == 6 else (flags & ~cls.FLAG_IPV6)
return SPVPong(
PROTOCOL_VERSION, flags, height, tip,
ipaddr,
cls.encode_address(source_address),
country_str_to_int(country)
)
).encode()
@classmethod
def make_sans_source_address(cls, flags: int, height: int, tip: bytes, country: str) -> Tuple[bytes, bytes]:
pong = cls.make(flags, height, tip, '0.0.0.0', country)
pong = pong.encode()
return pong[0:1], pong[2:38], pong[42:]
return pong[:38], pong[42:]
@classmethod
def decode(cls, packet: bytes):
offset = 0
protocol_version, flags, height, tip = struct.unpack(PONG_ENCODING_PRE, packet[offset:offset+38])
offset += 38
if flags & cls.FLAG_IPV6:
addr_len = ipaddress.IPV6LENGTH // 8
ipaddr = ipaddress.ip_address(packet[offset:offset+addr_len])
offset += addr_len
else:
addr_len = ipaddress.IPV4LENGTH // 8
ipaddr = ipaddress.ip_address(packet[offset:offset+addr_len])
offset += addr_len
country, = struct.unpack(PONG_ENCODING_POST, packet[offset:offset+2])
offset += 2
return cls(protocol_version, flags, height, tip, ipaddr, country)
return cls(*struct.unpack(PONG_ENCODING, packet[:44]))
@property
def available(self) -> bool:
return (self.flags & self.FLAG_AVAILABLE) > 0
@property
def ipv6(self) -> bool:
return (self.flags & self.FLAG_IPV6) > 0
return (self.flags & 0b00000001) > 0
@property
def ip_address(self) -> str:
return self.ipaddr.compressed
return ".".join(map(str, self.source_address_raw))
@property
def country_name(self):
@ -129,8 +94,7 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
def __init__(
self, height: int, tip: bytes, country: str,
throttle_cache_size: int = 1024, throttle_reqs_per_sec: int = 10,
allow_localhost: bool = False, allow_lan: bool = False,
is_valid_ip = is_valid_public_ip,
allow_localhost: bool = False, allow_lan: bool = False
):
super().__init__()
self.transport: Optional[asyncio.transports.DatagramTransport] = None
@ -138,27 +102,26 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
self._tip = tip
self._flags = 0
self._country = country
self._cache0 = self._cache1 = self.cache2 = None
self._left_cache = self._right_cache = None
self.update_cached_response()
self._throttle = LRUCache(throttle_cache_size)
self._should_log = LRUCache(throttle_cache_size)
self._min_delay = 1 / throttle_reqs_per_sec
self._allow_localhost = allow_localhost
self._allow_lan = allow_lan
self._is_valid_ip = is_valid_ip
self.closed = asyncio.Event()
def update_cached_response(self):
self._cache0, self._cache1, self._cache2 = SPVPong.make_sans_source_address(
self._left_cache, self._right_cache = SPVPong.make_sans_source_address(
self._flags, max(0, self._height), self._tip, self._country
)
def set_unavailable(self):
self._flags &= ~SPVPong.FLAG_AVAILABLE
self._flags &= 0b11111110
self.update_cached_response()
def set_available(self):
self._flags |= SPVPong.FLAG_AVAILABLE
self._flags |= 0b00000001
self.update_cached_response()
def set_height(self, height: int, tip: bytes):
@ -178,25 +141,17 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
return False
def make_pong(self, host):
ipaddr = ipaddress.ip_address(host)
if ipaddr.version == 6:
flags = self._flags | SPVPong.FLAG_IPV6
else:
flags = self._flags & ~SPVPong.FLAG_IPV6
return (self._cache0 + flags.to_bytes(1, 'big') +
self._cache1 + SPVPong.encode_address(ipaddr) +
self._cache2)
return self._left_cache + SPVPong.encode_address(host) + self._right_cache
def datagram_received(self, data: bytes, addr: Union[Tuple[str, int], Tuple[str, int, int, int]]):
def datagram_received(self, data: bytes, addr: Tuple[str, int]):
if self.should_throttle(addr[0]):
# print(f"throttled: {addr}")
return
try:
SPVPing.decode(data)
except (ValueError, struct.error, AttributeError, TypeError):
# log.exception("derp")
return
if addr[1] >= 1024 and self._is_valid_ip(
if addr[1] >= 1024 and is_valid_public_ipv4(
addr[0], allow_localhost=self._allow_localhost, allow_lan=self._allow_lan):
self.transport.sendto(self.make_pong(addr[0]), addr)
else:
@ -219,78 +174,39 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
class StatusServer:
def __init__(self):
self._protocols: List[SPVServerStatusProtocol] = []
self._protocol: Optional[SPVServerStatusProtocol] = None
async def _start(self, height: int, tip: bytes, country: str, addr: str, port: int, allow_lan: bool = False):
ipaddr = ipaddress.ip_address(addr)
if ipaddr.version == 4:
proto = SPVServerStatusProtocol(
height, tip, country,
allow_localhost=ipaddr.is_loopback or ipaddr.is_unspecified,
allow_lan=allow_lan,
is_valid_ip=is_valid_public_ipv4,
)
loop = asyncio.get_event_loop()
await loop.create_datagram_endpoint(lambda: proto, (ipaddr.compressed, port), family=socket.AF_INET)
elif ipaddr.version == 6:
proto = SPVServerStatusProtocol(
height, tip, country,
allow_localhost=ipaddr.is_loopback or ipaddr.is_unspecified,
allow_lan=allow_lan,
is_valid_ip=is_valid_public_ipv6,
)
# Because dualstack / IPv4 mapped address behavior on an IPv6 socket
# differs based on system config, create the socket with IPV6_V6ONLY.
# This disables the IPv4 mapped feature, so we don't need to consider
# when an IPv6 socket may interfere with IPv4 binding / traffic.
sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
sock.bind((ipaddr.compressed, port))
loop = asyncio.get_event_loop()
await loop.create_datagram_endpoint(lambda: proto, sock=sock)
else:
raise ValueError(f'unexpected IP address version {ipaddr.version}')
log.info("started udp%i status server on %s", ipaddr.version, proto.transport.get_extra_info('sockname')[:2])
self._protocols.append(proto)
async def start(self, height: int, tip: bytes, country: str, hosts: List[str], port: int, allow_lan: bool = False):
if not isinstance(hosts, list):
hosts = [hosts]
try:
for host in hosts:
addr = None
if not host:
resolved = ['::', '0.0.0.0'] # unspecified address
else:
resolved = await resolve_host(host, port, 'udp', family=socket.AF_UNSPEC, all_results=True)
for addr in resolved:
await self._start(height, tip, country, addr, port, allow_lan)
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
log.error("UDP status server failed to listen on (%s:%i) : %s", addr or host, port, e)
await self.stop()
raise
async def start(self, height: int, tip: bytes, country: str, interface: str, port: int, allow_lan: bool = False):
if self.is_running:
return
loop = asyncio.get_event_loop()
interface = interface if interface.lower() != 'localhost' else '127.0.0.1'
self._protocol = SPVServerStatusProtocol(
height, tip, country, allow_localhost=interface == '127.0.0.1', allow_lan=allow_lan
)
await loop.create_datagram_endpoint(lambda: self._protocol, (interface, port))
log.info("started udp status server on %s:%i", interface, port)
async def stop(self):
for proto in self._protocols:
await proto.close()
self._protocols.clear()
if self.is_running:
await self._protocol.close()
self._protocol = None
@property
def is_running(self):
return self._protocols
return self._protocol is not None
def set_unavailable(self):
for proto in self._protocols:
proto.set_unavailable()
if self.is_running:
self._protocol.set_unavailable()
def set_available(self):
for proto in self._protocols:
proto.set_available()
if self.is_running:
self._protocol.set_available()
def set_height(self, height: int, tip: bytes):
for proto in self._protocols:
proto.set_height(height, tip)
if self.is_running:
self._protocol.set_height(height, tip)
class SPVStatusClientProtocol(asyncio.DatagramProtocol):
@ -301,14 +217,9 @@ class SPVStatusClientProtocol(asyncio.DatagramProtocol):
self.responses = responses
self._ping_packet = SPVPing.make()
def datagram_received(self, data: bytes, addr: Union[Tuple[str, int], Tuple[str, int, int, int]]):
def datagram_received(self, data: bytes, addr: Tuple[str, int]):
try:
if len(addr) > 2: # IPv6 with possible mapped IPv4
ipaddr = ipaddress.ip_address(addr[0])
if ipaddr.ipv4_mapped:
# mapped IPv4 address identified
addr = (ipaddr.ipv4_mapped.compressed, addr[1])
self.responses.put_nowait(((addr[:2], perf_counter()), SPVPong.decode(data)))
self.responses.put_nowait(((addr, perf_counter()), SPVPong.decode(data)))
except (ValueError, struct.error, AttributeError, TypeError, RuntimeError):
return
@ -319,7 +230,7 @@ class SPVStatusClientProtocol(asyncio.DatagramProtocol):
self.transport = None
log.info("closed udp spv server selection client")
def ping(self, server: Union[Tuple[str, int], Tuple[str, int, int, int]]):
def ping(self, server: Tuple[str, int]):
self.transport.sendto(self._ping_packet, server)
def close(self):

View file

@ -5,17 +5,17 @@ import time
from typing import List
from concurrent.futures.thread import ThreadPoolExecutor
from bisect import bisect_right
from hub.common import sha256
from hub.common import ResumableSHA256
from hub.db import SecondaryDB
class PrimaryDB(SecondaryDB):
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
cache_all_tx_hashes: bool = False,
max_open_files: int = 64, blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False, enforce_integrity=True):
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes,
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_tx_hashes,
blocking_channel_ids, filtering_channel_ids, executor, index_address_status,
enforce_integrity=enforce_integrity)
@ -35,16 +35,19 @@ class PrimaryDB(SecondaryDB):
if last_hashX:
yield last_hashX
def hashX_status_from_history(history: bytes) -> bytes:
def hashX_status_from_history(history: bytes) -> ResumableSHA256:
tx_counts = self.tx_counts
hist_tx_nums = array.array('I')
hist_tx_nums.frombytes(history)
digest = hashlib.sha256()
for tx_num, tx_hash in zip(
digest = ResumableSHA256()
digest.update(
b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode()
for tx_num, tx_hash in zip(
hist_tx_nums,
self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)):
digest.update(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode())
return digest.digest()
self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
))
)
return digest
start = time.perf_counter()
@ -67,17 +70,24 @@ class PrimaryDB(SecondaryDB):
hashX_cnt += 1
key = prefix_db.hashX_status.pack_key(hashX)
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
status = hashX_status_from_history(history)
digester = hashX_status_from_history(history)
status = digester.digest()
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
if existing_status and existing_status == status:
continue
elif not existing_status:
existing_digester = prefix_db.hashX_history_hasher.get(hashX)
if not existing_status:
prefix_db.stash_raw_put(key, status)
op_cnt += 1
else:
prefix_db.stash_raw_delete(key, existing_status)
prefix_db.stash_raw_put(key, status)
op_cnt += 2
if not existing_digester:
prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
op_cnt += 1
else:
prefix_db.hashX_history_hasher.stash_delete((hashX,), existing_digester)
prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
op_cnt += 2
if op_cnt > 100000:
prefix_db.unsafe_commit()
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")

View file

@ -3,14 +3,13 @@ from hub.env import Env
class BlockchainEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
blocking_channel_ids=None, filtering_channel_ids=None,
prometheus_port=None, cache_all_tx_hashes=None, blocking_channel_ids=None, filtering_channel_ids=None,
db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
index_address_status=None, rebuild_address_status_from_height=None,
daemon_ca_path=None, history_tx_cache_size=None,
db_disable_integrity_checks=False):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
blocking_channel_ids, filtering_channel_ids, index_address_status)
self.db_max_open_files = db_max_open_files
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \
@ -57,7 +56,7 @@ class BlockchainEnv(Env):
db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit,
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
index_address_status=args.index_address_statuses,
hashX_history_cache_size=args.address_history_cache_size,
rebuild_address_status_from_height=args.rebuild_address_status_from_height,
daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size,

View file

@ -1,3 +1,4 @@
import os
import time
import asyncio
import typing
@ -11,7 +12,8 @@ from hub import PROMETHEUS_NAMESPACE
from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from hub.error.base import ChainError
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache, LFUCacheWithMetrics
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache
from hub.common import ResumableSHA256, LFUCacheWithMetrics
from hub.scribe.db import PrimaryDB
from hub.scribe.daemon import LBCDaemon
from hub.scribe.transaction import Tx, TxOutput, TxInput, Block
@ -137,11 +139,10 @@ class BlockchainProcessorService(BlockchainService):
def open_db(self):
env = self.env
self.db = PrimaryDB(
env.coin, env.db_dir, env.reorg_limit, cache_all_claim_txos=env.cache_all_claim_txos,
cache_all_tx_hashes=env.cache_all_tx_hashes, max_open_files=env.db_max_open_files,
blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids,
executor=self._executor, index_address_status=env.index_address_status,
enforce_integrity=not env.db_disable_integrity_checks
env.coin, env.db_dir, env.reorg_limit, cache_all_tx_hashes=env.cache_all_tx_hashes,
max_open_files=env.db_max_open_files, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status, enforce_integrity=not env.db_disable_integrity_checks
)
async def run_in_thread_with_lock(self, func, *args):
@ -169,10 +170,22 @@ class BlockchainProcessorService(BlockchainService):
def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
self.mempool.remove(to_delete)
touched_hashXs = self.mempool.update_mempool(to_put)
touched_hashXs = list(self.mempool.update_mempool(to_put))
if self.env.index_address_status:
for hashX in touched_hashXs:
self._get_update_hashX_mempool_status_ops(hashX)
status_hashers = {
k: v.hasher if v else ResumableSHA256() for k, v in zip(
touched_hashXs,
self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in touched_hashXs])
)
}
for hashX, v in zip(
touched_hashXs,
self.db.prefix_db.hashX_mempool_status.multi_get([(hashX,) for hashX in touched_hashXs])):
if v is not None:
self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), v)
hasher = status_hashers[hashX]
hasher.update(self.mempool.mempool_history(hashX).encode())
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (hasher.digest(),))
for tx_hash, raw_tx in to_put:
mempool_prefix.stash_put((tx_hash,), (raw_tx,))
for tx_hash, raw_tx in to_delete.items():
@ -263,9 +276,6 @@ class BlockchainProcessorService(BlockchainService):
for _ in range(count):
await self.run_in_thread_with_lock(self.backup_block)
self.log.info(f'backed up to height {self.height:,d}')
if self.env.cache_all_claim_txos:
await self.db._read_claim_txos() # TODO: don't do this
await self.prefetcher.reset_height(self.height)
self.reorg_count_metric.inc()
except:
@ -394,12 +404,6 @@ class BlockchainProcessorService(BlockchainService):
if claim_hash not in self.updated_claim_previous_activations:
self.updated_claim_previous_activations[claim_hash] = activation
if self.env.cache_all_claim_txos:
self.db.claim_to_txo[claim_hash] = ClaimToTXOValue(
tx_num, nout, root_tx_num, root_idx, txo.value, channel_signature_is_valid, claim_name
)
self.db.txo_to_claim[tx_num][nout] = claim_hash
pending = StagedClaimtrieItem(
claim_name, normalized_name, claim_hash, txo.value, self.coin.get_expiration_height(height), tx_num, nout,
root_tx_num, root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash
@ -690,11 +694,6 @@ class BlockchainProcessorService(BlockchainService):
if 0 < activation <= self.height:
self.effective_amount_delta[claim_hash] -= spent.amount
self.future_effective_amount_delta[spent.claim_hash] -= spent.amount
if self.env.cache_all_claim_txos:
claim_hash = self.db.txo_to_claim[txin_num].pop(nout)
if not self.db.txo_to_claim[txin_num]:
self.db.txo_to_claim.pop(txin_num)
self.db.claim_to_txo.pop(claim_hash)
if spent.reposted_claim_hash:
self.pending_reposted.add(spent.reposted_claim_hash)
if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims:
@ -1637,15 +1636,6 @@ class BlockchainProcessorService(BlockchainService):
self.hashX_full_cache[hashX] = history
return history
def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
if existing:
self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), existing)
history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX)
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,))
def advance_block(self, block: Block):
txo_count = 0
txi_count = 0
@ -1745,9 +1735,7 @@ class BlockchainProcessorService(BlockchainService):
# update hashX history status hashes and compactify the histories
self._get_update_hashX_histories_ops(height)
# only compactify adddress histories and update the status index if we're already caught up,
# a bulk update will happen once catchup finishes
if not self.db.catching_up and self.env.index_address_status:
if self.env.index_address_status:
self._get_compactify_ops(height)
self.db.last_indexed_address_status_height = height
@ -1802,6 +1790,17 @@ class BlockchainProcessorService(BlockchainService):
)
def _get_compactify_ops(self, height: int):
def _rebuild_hasher(hist_tx_nums):
hasher = ResumableSHA256()
hasher.update(
b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num)}:'.encode()
for tx_num, tx_hash in zip(
hist_tx_nums,
self.db.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
))
)
return hasher
existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False)
if existing_hashX_statuses:
pack_key = self.db.prefix_db.hashX_status.pack_key
@ -1816,6 +1815,13 @@ class BlockchainProcessorService(BlockchainService):
append_deletes_hashX_history = block_hashX_history_deletes.append
block_hashX_history_puts = []
existing_status_hashers = {
k: v.hasher if v else None for k, v in zip(
self.hashXs_by_tx,
self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in self.hashXs_by_tx])
)
}
for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses):
new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums]
@ -1830,11 +1836,9 @@ class BlockchainProcessorService(BlockchainService):
unpack_key = self.db.prefix_db.hashX_history.unpack_key
needs_compaction = False
total_hist_txs = b''
for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False,
deserialize_value=False):
hist_txs = unpack_history(hist)
total_hist_txs += hist
txs_extend(hist_txs)
hist_height = unpack_key(k).height
if height > reorg_limit and hist_height < height - reorg_limit:
@ -1853,27 +1857,19 @@ class BlockchainProcessorService(BlockchainService):
block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,)))
if not new_history:
continue
needed_tx_infos = []
append_needed_tx_info = needed_tx_infos.append
tx_infos = {}
for tx_num in tx_nums:
cached_tx_info = self.history_tx_info_cache.get(tx_num)
if cached_tx_info is not None:
tx_infos[tx_num] = cached_tx_info
else:
append_needed_tx_info(tx_num)
if needed_tx_infos:
for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)):
tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
tx_infos[tx_num] = tx_info
self.history_tx_info_cache[tx_num] = tx_info
history = ''.join(map(tx_infos.__getitem__, tx_nums))
for tx_hash, height in new_history:
history += f'{tx_hash[::-1].hex()}:{height:d}:'
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
hasher = existing_status_hashers[hashX]
if hasher is None:
# this is to migrate in the new column family, in the future it can be a new digester
# hasher = ResumableSHA256()
hasher = _rebuild_hasher(tx_nums)
else:
self.db.prefix_db.hashX_history_hasher.stash_delete((hashX,), (hasher,))
hasher.update(b''.join(
f'{tx_hash[::-1].hex()}:{height:d}:'.encode() for tx_hash, height in new_history
))
self.db.prefix_db.hashX_history_hasher.stash_put((hashX,), (hasher,))
status = hasher.digest()
self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
self.db.prefix_db.multi_delete(block_hashX_history_deletes)
self.db.prefix_db.hashX_history.stash_multi_put(block_hashX_history_puts)
@ -2074,6 +2070,8 @@ class BlockchainProcessorService(BlockchainService):
"""Loop forever processing blocks as they arrive."""
self._caught_up_event = caught_up_event
try:
if os.path.exists(self.db._need_restart_path):
raise RuntimeError('scribe restart is needed')
if self.height != self.daemon.cached_height() and not self.db.catching_up:
await self._need_catch_up() # tell the readers that we're still catching up with lbrycrd/lbcd
while not self._stopping:
@ -2135,9 +2133,6 @@ class BlockchainProcessorService(BlockchainService):
async def _finished_initial_catch_up(self):
self.log.info(f'caught up to height {self.height}')
if self.env.index_address_status and self.db.last_indexed_address_status_height < self.db.db_height:
await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height)
# Flush everything but with catching_up->False state.
self.db.catching_up = False

View file

@ -37,7 +37,7 @@ class BlockchainService:
def open_db(self):
env = self.env
self.db = SecondaryDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status

View file

@ -33,7 +33,7 @@ setup(
'certifi>=2021.10.08',
'colorama==0.3.7',
'cffi==1.13.2',
'protobuf==3.17.2',
'protobuf==3.18.3',
'msgpack==0.6.1',
'prometheus_client==0.7.1',
'coincurve==15.0.0',
@ -44,7 +44,8 @@ setup(
'filetype==1.0.9',
'grpcio==1.38.0',
'lbry-rocksdb==0.8.2',
'ujson==5.4.0'
'ujson==5.4.0',
'rehash==1.0.0'
],
extras_require={
'lint': ['pylint==2.10.0'],