Compare commits
11 commits
moodyjon/l
...
master
Author | SHA1 | Date | |
---|---|---|---|
|
ebcc6e5086 | ||
|
c0766f6abc | ||
|
7bc90c425f | ||
|
f2c494d4d6 | ||
|
8147bbf3b9 | ||
|
adbeeaf203 | ||
|
f55ed56215 | ||
|
d1d33c4bce | ||
|
b7de08ba0b | ||
|
405cef8d28 | ||
|
21262d2e43 |
20 changed files with 224 additions and 133 deletions
|
@ -17,7 +17,7 @@ services:
|
|||
- "lbry_rocksdb:/database"
|
||||
environment:
|
||||
- HUB_COMMAND=scribe
|
||||
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1256013/lbry-rocksdb.tar
|
||||
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar
|
||||
command: # for full options, see `scribe --help`
|
||||
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
|
||||
- "--max_query_workers=2"
|
||||
|
|
|
@ -14,7 +14,7 @@ services:
|
|||
- "lbry_rocksdb:/database"
|
||||
environment:
|
||||
- HUB_COMMAND=scribe
|
||||
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1256013/lbry-rocksdb.tar
|
||||
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar
|
||||
command:
|
||||
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
|
||||
- "--max_query_workers=2"
|
||||
|
|
|
@ -7,12 +7,14 @@ import logging
|
|||
import logging.handlers
|
||||
import typing
|
||||
import collections
|
||||
from ctypes import cast, memmove, POINTER, c_void_p
|
||||
from bisect import insort_right
|
||||
from collections import deque
|
||||
from decimal import Decimal
|
||||
from typing import Iterable, Deque
|
||||
from asyncio import get_event_loop, Event
|
||||
from prometheus_client import Counter
|
||||
from rehash.structs import EVPobject
|
||||
from hub.schema.tags import clean_tags
|
||||
from hub.schema.url import normalize_name
|
||||
from hub.error import TooManyClaimSearchParametersError
|
||||
|
@ -1059,3 +1061,41 @@ async def asyncify_for_loop(gen, ticks_per_sleep: int = 1000):
|
|||
yield item
|
||||
if cnt % ticks_per_sleep == 0:
|
||||
await async_sleep(0)
|
||||
|
||||
|
||||
_SHA256_DIGEST_STATE_SIZE = 120
|
||||
|
||||
|
||||
class ResumableSHA256:
|
||||
__slots__ = ['_hasher']
|
||||
|
||||
def __init__(self, state: typing.Optional[bytes] = None):
|
||||
self._hasher = hashlib.sha256()
|
||||
if state is not None:
|
||||
ctx = self._get_evp_md_ctx()
|
||||
ctx_size = ctx.digest.contents.ctx_size
|
||||
if len(state) != _SHA256_DIGEST_STATE_SIZE != ctx_size:
|
||||
raise Exception(f'invalid sha256 digester state, got {len(state)} bytes')
|
||||
memmove(ctx.md_data, state, ctx_size)
|
||||
|
||||
def _get_evp_md_ctx(self):
|
||||
c_evp_obj = cast(c_void_p(id(self._hasher)), POINTER(EVPobject))
|
||||
if hasattr(c_evp_obj.contents.ctx, "contents"):
|
||||
return c_evp_obj.contents.ctx.contents
|
||||
else:
|
||||
return c_evp_obj.contents.ctx
|
||||
|
||||
def get_state(self) -> bytes:
|
||||
ctx = self._get_evp_md_ctx()
|
||||
ctx_size = ctx.digest.contents.ctx_size
|
||||
hasher_state = ctx.md_data[:ctx_size]
|
||||
return hasher_state
|
||||
|
||||
def __copy__(self):
|
||||
return ResumableSHA256(self.get_state())
|
||||
|
||||
def update(self, data: bytes):
|
||||
self._hasher.update(data)
|
||||
|
||||
def digest(self):
|
||||
return self._hasher.digest()
|
||||
|
|
|
@ -51,6 +51,7 @@ class DB_PREFIXES(enum.Enum):
|
|||
reposted_count = b'j'
|
||||
effective_amount = b'i'
|
||||
future_effective_amount = b'k'
|
||||
hashX_history_hash = b'l'
|
||||
|
||||
|
||||
COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass
|
||||
|
|
52
hub/db/db.py
52
hub/db/db.py
|
@ -37,7 +37,7 @@ class SecondaryDB:
|
|||
DB_VERSIONS = [7, 8, 9, 10, 11, 12]
|
||||
|
||||
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
|
||||
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||
cache_all_tx_hashes: bool = False,
|
||||
blocking_channel_ids: List[str] = None,
|
||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||
index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768,
|
||||
|
@ -47,9 +47,9 @@ class SecondaryDB:
|
|||
self._executor = executor
|
||||
self._db_dir = db_dir
|
||||
self._reorg_limit = reorg_limit
|
||||
self._cache_all_claim_txos = cache_all_claim_txos
|
||||
self._cache_all_tx_hashes = cache_all_tx_hashes
|
||||
self._secondary_name = secondary_name
|
||||
self._need_restart_path = os.path.join(self._db_dir, 'NEED_SCRIBE_RESTART')
|
||||
if secondary_name:
|
||||
assert max_open_files == -1, 'max open files must be -1 for secondary readers'
|
||||
self._db_max_open_files = max_open_files
|
||||
|
@ -100,9 +100,6 @@ class SecondaryDB:
|
|||
self.total_transactions: List[bytes] = []
|
||||
self.tx_num_mapping: Dict[bytes, int] = {}
|
||||
|
||||
# these are only used if the cache_all_claim_txos setting is on
|
||||
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
|
||||
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
|
||||
self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)
|
||||
|
||||
def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
|
||||
|
@ -956,21 +953,6 @@ class SecondaryDB:
|
|||
else:
|
||||
assert self.db_tx_count == 0
|
||||
|
||||
async def _read_claim_txos(self):
|
||||
def read_claim_txos():
|
||||
set_claim_to_txo = self.claim_to_txo.__setitem__
|
||||
for k, v in self.prefix_db.claim_to_txo.iterate(fill_cache=False):
|
||||
set_claim_to_txo(k.claim_hash, v)
|
||||
self.txo_to_claim[v.tx_num][v.position] = k.claim_hash
|
||||
|
||||
self.claim_to_txo.clear()
|
||||
self.txo_to_claim.clear()
|
||||
start = time.perf_counter()
|
||||
self.logger.info("loading claims")
|
||||
await asyncio.get_event_loop().run_in_executor(self._executor, read_claim_txos)
|
||||
ts = time.perf_counter() - start
|
||||
self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4))
|
||||
|
||||
# async def _read_headers(self):
|
||||
# # if self.headers is not None:
|
||||
# # return
|
||||
|
@ -1019,6 +1001,27 @@ class SecondaryDB:
|
|||
secondary_path = '' if not self._secondary_name else os.path.join(
|
||||
self._db_dir, self._secondary_name
|
||||
)
|
||||
open_db_canary = None
|
||||
|
||||
if self._secondary_name:
|
||||
open_db_canary = os.path.join(self._db_dir, f'{self._secondary_name}-db-canary')
|
||||
if os.path.exists(open_db_canary):
|
||||
with open(self._need_restart_path, 'w+') as f:
|
||||
f.write(f"{time.strftime(f'%Y-%m-%d %H:%M:%S')} {self._secondary_name}\n")
|
||||
raise RuntimeError('scribe restart is needed')
|
||||
else:
|
||||
with open(open_db_canary, 'w'):
|
||||
pass
|
||||
else:
|
||||
herald_db_canary = os.path.join(self._db_dir, 'lbry-reader-db-canary')
|
||||
es_sync_db_canary = os.path.join(self._db_dir, 'lbry-elastic-writer-db-canary')
|
||||
if os.path.exists(herald_db_canary):
|
||||
os.remove(herald_db_canary)
|
||||
if os.path.exists(es_sync_db_canary):
|
||||
os.remove(es_sync_db_canary)
|
||||
if os.path.exists(self._need_restart_path):
|
||||
os.remove(self._need_restart_path)
|
||||
|
||||
db_path = os.path.join(self._db_dir, 'lbry-rocksdb')
|
||||
self.prefix_db = PrefixDB(
|
||||
db_path, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,
|
||||
|
@ -1027,6 +1030,7 @@ class SecondaryDB:
|
|||
)
|
||||
|
||||
if secondary_path != '':
|
||||
os.remove(open_db_canary)
|
||||
self.logger.info(f'opened db for read only: lbry-rocksdb (%s)', db_path)
|
||||
else:
|
||||
self.logger.info(f'opened db for writing: lbry-rocksdb (%s)', db_path)
|
||||
|
@ -1063,8 +1067,6 @@ class SecondaryDB:
|
|||
async def initialize_caches(self):
|
||||
await self._read_tx_counts()
|
||||
await self._read_block_hashes()
|
||||
if self._cache_all_claim_txos:
|
||||
await self._read_claim_txos()
|
||||
if self._cache_all_tx_hashes:
|
||||
await self._read_tx_hashes()
|
||||
if self.db_height > 0:
|
||||
|
@ -1154,15 +1156,9 @@ class SecondaryDB:
|
|||
}
|
||||
|
||||
def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]:
|
||||
if self._cache_all_claim_txos:
|
||||
return self.claim_to_txo.get(claim_hash)
|
||||
return self.prefix_db.claim_to_txo.get_pending(claim_hash)
|
||||
|
||||
def get_cached_claim_hash(self, tx_num: int, position: int) -> Optional[bytes]:
|
||||
if self._cache_all_claim_txos:
|
||||
if tx_num not in self.txo_to_claim:
|
||||
return
|
||||
return self.txo_to_claim[tx_num].get(position, None)
|
||||
v = self.prefix_db.txo_to_claim.get_pending(tx_num, position)
|
||||
return None if not v else v.claim_hash
|
||||
|
||||
|
|
|
@ -273,6 +273,7 @@ class BasePrefixDB:
|
|||
undo_c_f = self.column_families[DB_PREFIXES.undo.value]
|
||||
undo_info = self._db.get((undo_c_f, undo_key))
|
||||
self._op_stack.apply_packed_undo_ops(undo_info)
|
||||
self._op_stack.validate_and_apply_stashed_ops()
|
||||
try:
|
||||
with self._db.write_batch(sync=True) as batch:
|
||||
batch_put = batch.put
|
||||
|
|
|
@ -3,6 +3,7 @@ import struct
|
|||
import array
|
||||
import base64
|
||||
from typing import Union, Tuple, NamedTuple, Optional
|
||||
from hub.common import ResumableSHA256
|
||||
from hub.db.common import DB_PREFIXES
|
||||
from hub.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow
|
||||
from hub.schema.url import normalize_name
|
||||
|
@ -1851,6 +1852,46 @@ class FutureEffectiveAmountPrefixRow(PrefixRow):
|
|||
return cls.pack_key(claim_hash), cls.pack_value(future_effective_amount)
|
||||
|
||||
|
||||
class HashXHistoryHasherKey(NamedTuple):
|
||||
hashX: bytes
|
||||
|
||||
|
||||
class HashXHistoryHasherValue(NamedTuple):
|
||||
hasher: ResumableSHA256
|
||||
|
||||
|
||||
class HashXHistoryHasherPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.hashX_history_hash.value
|
||||
key_struct = struct.Struct(b'>11s')
|
||||
value_struct = struct.Struct(b'>120s')
|
||||
cache_size = 1024 * 1024 * 64
|
||||
|
||||
key_part_lambdas = [
|
||||
lambda: b'',
|
||||
struct.Struct(b'>11s').pack
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, hashX: bytes):
|
||||
return super().pack_key(hashX)
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> HashXHistoryHasherKey:
|
||||
return HashXHistoryHasherKey(*super().unpack_key(key))
|
||||
|
||||
@classmethod
|
||||
def pack_value(cls, hasher: ResumableSHA256) -> bytes:
|
||||
return super().pack_value(hasher.get_state())
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> HashXHistoryHasherValue:
|
||||
return HashXHistoryHasherValue(ResumableSHA256(*super().unpack_value(data)))
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, hashX: bytes, hasher: ResumableSHA256):
|
||||
return cls.pack_key(hashX), cls.pack_value(hasher)
|
||||
|
||||
|
||||
class PrefixDB(BasePrefixDB):
|
||||
def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64,
|
||||
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None,
|
||||
|
@ -1897,6 +1938,7 @@ class PrefixDB(BasePrefixDB):
|
|||
self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack)
|
||||
self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack)
|
||||
self.future_effective_amount = FutureEffectiveAmountPrefixRow(db, self._op_stack)
|
||||
self.hashX_history_hasher = HashXHistoryHasherPrefixRow(db, self._op_stack)
|
||||
|
||||
|
||||
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
|
||||
|
|
|
@ -160,7 +160,7 @@ class RevertableOpStack:
|
|||
# there is a value and we're not deleting it in this op
|
||||
# check that a delete for the stored value is in the stack
|
||||
raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}\nvs\n{stored_val}")
|
||||
elif not stored_val:
|
||||
elif not has_stored_val:
|
||||
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
|
||||
elif stored_val != op.value:
|
||||
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
|
||||
|
@ -324,9 +324,21 @@ class RevertableOpStack:
|
|||
"""
|
||||
Unpack and apply a sequence of undo ops from serialized undo bytes
|
||||
"""
|
||||
while packed:
|
||||
op, packed = RevertableOp.unpack(packed)
|
||||
self.append_op(op)
|
||||
offset = 0
|
||||
packed_size = len(packed)
|
||||
while offset < packed_size:
|
||||
is_put, key_len, val_len = _OP_STRUCT.unpack(packed[offset:offset + 9])
|
||||
offset += 9
|
||||
key = packed[offset:offset + key_len]
|
||||
offset += key_len
|
||||
value = packed[offset:offset + val_len]
|
||||
offset += val_len
|
||||
if is_put == 1:
|
||||
op = RevertablePut(key, value)
|
||||
else:
|
||||
op = RevertableDelete(key, value)
|
||||
self._stash.append(op)
|
||||
self._stashed_last_op_for_key[op.key] = op
|
||||
|
||||
def get_pending_op(self, key: bytes) -> Optional[RevertableOp]:
|
||||
if key in self._stashed_last_op_for_key:
|
||||
|
|
|
@ -9,11 +9,11 @@ from hub.db.common import ResolveResult
|
|||
|
||||
class ElasticSyncDB(SecondaryDB):
|
||||
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
|
||||
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||
cache_all_tx_hashes: bool = False,
|
||||
blocking_channel_ids: List[str] = None,
|
||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||
index_address_status=False):
|
||||
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
|
||||
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit,
|
||||
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
|
||||
index_address_status)
|
||||
self.block_timestamp_cache = LRUCache(1024)
|
||||
|
|
|
@ -3,11 +3,11 @@ from hub.env import Env
|
|||
|
||||
class ElasticEnv(Env):
|
||||
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None,
|
||||
cache_all_tx_hashes=None, cache_all_claim_txos=None, elastic_host=None, elastic_port=None,
|
||||
cache_all_tx_hashes=None, elastic_host=None, elastic_port=None,
|
||||
es_index_prefix=None, elastic_notifier_host=None, elastic_notifier_port=None,
|
||||
blocking_channel_ids=None, filtering_channel_ids=None, reindex=False):
|
||||
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
|
||||
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
|
||||
blocking_channel_ids, filtering_channel_ids)
|
||||
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
|
||||
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
|
||||
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
|
||||
|
@ -43,7 +43,7 @@ class ElasticEnv(Env):
|
|||
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
|
||||
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit,
|
||||
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
|
||||
cache_all_claim_txos=args.cache_all_claim_txos, blocking_channel_ids=args.blocking_channel_ids,
|
||||
blocking_channel_ids=args.blocking_channel_ids,
|
||||
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
|
||||
elastic_notifier_port=args.elastic_notifier_port
|
||||
)
|
||||
|
|
|
@ -49,7 +49,7 @@ class ElasticSyncService(BlockchainReaderService):
|
|||
def open_db(self):
|
||||
env = self.env
|
||||
self.db = ElasticSyncDB(
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
|
||||
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
|
||||
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
|
||||
index_address_status=env.index_address_status
|
||||
|
|
|
@ -30,7 +30,7 @@ class Env:
|
|||
pass
|
||||
|
||||
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None,
|
||||
blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None):
|
||||
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
@ -46,7 +46,6 @@ class Env:
|
|||
self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
|
||||
self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0)
|
||||
self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False)
|
||||
self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False)
|
||||
# Filtering / Blocking
|
||||
self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default(
|
||||
'BLOCKING_CHANNEL_IDS', '').split(' ')
|
||||
|
@ -171,11 +170,6 @@ class Env:
|
|||
"resolve, transaction fetching, and block sync all faster at the expense of higher "
|
||||
"memory usage (at least 10GB more). Can be set in env with 'CACHE_ALL_TX_HASHES'.",
|
||||
default=cls.boolean('CACHE_ALL_TX_HASHES', False))
|
||||
parser.add_argument('--cache_all_claim_txos', action='store_true',
|
||||
help="Load all claim txos into memory. This will make address subscriptions and sync, "
|
||||
"resolve, transaction fetching, and block sync all faster at the expense of higher "
|
||||
"memory usage. Can be set in env with 'CACHE_ALL_CLAIM_TXOS'.",
|
||||
default=cls.boolean('CACHE_ALL_CLAIM_TXOS', False))
|
||||
parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0),
|
||||
help="Port for prometheus metrics to listen on, disabled by default. "
|
||||
"Can be set in env with 'PROMETHEUS_PORT'.")
|
||||
|
|
|
@ -6,11 +6,11 @@ from hub.db import SecondaryDB
|
|||
|
||||
class HeraldDB(SecondaryDB):
|
||||
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
|
||||
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||
cache_all_tx_hashes: bool = False,
|
||||
blocking_channel_ids: List[str] = None,
|
||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||
index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768):
|
||||
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
|
||||
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit,
|
||||
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
|
||||
index_address_status, merkle_cache_size, tx_cache_size)
|
||||
# self.headers = None
|
||||
|
|
|
@ -19,7 +19,7 @@ def parse_es_services(elastic_services_arg: str):
|
|||
|
||||
class ServerEnv(Env):
|
||||
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None,
|
||||
daemon_url=None, host=None, elastic_services=None, es_index_prefix=None,
|
||||
tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
|
||||
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
|
||||
|
@ -29,7 +29,7 @@ class ServerEnv(Env):
|
|||
merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
|
||||
history_tx_cache_size=None, largest_address_history_cache_size=None):
|
||||
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
|
||||
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
|
||||
blocking_channel_ids, filtering_channel_ids, index_address_status)
|
||||
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
|
||||
self.host = host if host is not None else self.default('HOST', 'localhost')
|
||||
self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080'))
|
||||
|
@ -153,7 +153,7 @@ class ServerEnv(Env):
|
|||
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
|
||||
udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
|
||||
allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
|
||||
cache_all_claim_txos=args.cache_all_claim_txos, country=args.country, payment_address=args.payment_address,
|
||||
country=args.country, payment_address=args.payment_address,
|
||||
donation_address=args.donation_address, max_send=args.max_send, max_receive=args.max_receive,
|
||||
max_sessions=args.max_sessions, session_timeout=args.session_timeout,
|
||||
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
|
||||
|
|
|
@ -53,7 +53,7 @@ class HubServerService(BlockchainReaderService):
|
|||
def open_db(self):
|
||||
env = self.env
|
||||
self.db = HeraldDB(
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
|
||||
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
|
||||
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
|
||||
index_address_status=env.index_address_status, merkle_cache_size=env.merkle_cache_size,
|
||||
|
|
|
@ -5,17 +5,17 @@ import time
|
|||
from typing import List
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from bisect import bisect_right
|
||||
from hub.common import sha256
|
||||
from hub.common import ResumableSHA256
|
||||
from hub.db import SecondaryDB
|
||||
|
||||
|
||||
class PrimaryDB(SecondaryDB):
|
||||
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
|
||||
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||
cache_all_tx_hashes: bool = False,
|
||||
max_open_files: int = 64, blocking_channel_ids: List[str] = None,
|
||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||
index_address_status=False, enforce_integrity=True):
|
||||
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes,
|
||||
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_tx_hashes,
|
||||
blocking_channel_ids, filtering_channel_ids, executor, index_address_status,
|
||||
enforce_integrity=enforce_integrity)
|
||||
|
||||
|
@ -35,16 +35,19 @@ class PrimaryDB(SecondaryDB):
|
|||
if last_hashX:
|
||||
yield last_hashX
|
||||
|
||||
def hashX_status_from_history(history: bytes) -> bytes:
|
||||
def hashX_status_from_history(history: bytes) -> ResumableSHA256:
|
||||
tx_counts = self.tx_counts
|
||||
hist_tx_nums = array.array('I')
|
||||
hist_tx_nums.frombytes(history)
|
||||
digest = hashlib.sha256()
|
||||
for tx_num, tx_hash in zip(
|
||||
digest = ResumableSHA256()
|
||||
digest.update(
|
||||
b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode()
|
||||
for tx_num, tx_hash in zip(
|
||||
hist_tx_nums,
|
||||
self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)):
|
||||
digest.update(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode())
|
||||
return digest.digest()
|
||||
self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
|
||||
))
|
||||
)
|
||||
return digest
|
||||
|
||||
start = time.perf_counter()
|
||||
|
||||
|
@ -67,17 +70,24 @@ class PrimaryDB(SecondaryDB):
|
|||
hashX_cnt += 1
|
||||
key = prefix_db.hashX_status.pack_key(hashX)
|
||||
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
|
||||
status = hashX_status_from_history(history)
|
||||
digester = hashX_status_from_history(history)
|
||||
status = digester.digest()
|
||||
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
|
||||
if existing_status and existing_status == status:
|
||||
continue
|
||||
elif not existing_status:
|
||||
existing_digester = prefix_db.hashX_history_hasher.get(hashX)
|
||||
if not existing_status:
|
||||
prefix_db.stash_raw_put(key, status)
|
||||
op_cnt += 1
|
||||
else:
|
||||
prefix_db.stash_raw_delete(key, existing_status)
|
||||
prefix_db.stash_raw_put(key, status)
|
||||
op_cnt += 2
|
||||
if not existing_digester:
|
||||
prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
|
||||
op_cnt += 1
|
||||
else:
|
||||
prefix_db.hashX_history_hasher.stash_delete((hashX,), existing_digester)
|
||||
prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
|
||||
op_cnt += 2
|
||||
if op_cnt > 100000:
|
||||
prefix_db.unsafe_commit()
|
||||
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
|
||||
|
|
|
@ -3,14 +3,13 @@ from hub.env import Env
|
|||
|
||||
class BlockchainEnv(Env):
|
||||
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
|
||||
blocking_channel_ids=None, filtering_channel_ids=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None, blocking_channel_ids=None, filtering_channel_ids=None,
|
||||
db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
|
||||
index_address_status=None, rebuild_address_status_from_height=None,
|
||||
daemon_ca_path=None, history_tx_cache_size=None,
|
||||
db_disable_integrity_checks=False):
|
||||
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
|
||||
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
|
||||
blocking_channel_ids, filtering_channel_ids, index_address_status)
|
||||
self.db_max_open_files = db_max_open_files
|
||||
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
|
||||
self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \
|
||||
|
@ -57,7 +56,7 @@ class BlockchainEnv(Env):
|
|||
db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
|
||||
max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit,
|
||||
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
|
||||
cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
|
||||
index_address_status=args.index_address_statuses,
|
||||
hashX_history_cache_size=args.address_history_cache_size,
|
||||
rebuild_address_status_from_height=args.rebuild_address_status_from_height,
|
||||
daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size,
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import os
|
||||
import time
|
||||
import asyncio
|
||||
import typing
|
||||
|
@ -11,7 +12,8 @@ from hub import PROMETHEUS_NAMESPACE
|
|||
from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
|
||||
from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
|
||||
from hub.error.base import ChainError
|
||||
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache, LFUCacheWithMetrics
|
||||
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache
|
||||
from hub.common import ResumableSHA256, LFUCacheWithMetrics
|
||||
from hub.scribe.db import PrimaryDB
|
||||
from hub.scribe.daemon import LBCDaemon
|
||||
from hub.scribe.transaction import Tx, TxOutput, TxInput, Block
|
||||
|
@ -137,11 +139,10 @@ class BlockchainProcessorService(BlockchainService):
|
|||
def open_db(self):
|
||||
env = self.env
|
||||
self.db = PrimaryDB(
|
||||
env.coin, env.db_dir, env.reorg_limit, cache_all_claim_txos=env.cache_all_claim_txos,
|
||||
cache_all_tx_hashes=env.cache_all_tx_hashes, max_open_files=env.db_max_open_files,
|
||||
blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids,
|
||||
executor=self._executor, index_address_status=env.index_address_status,
|
||||
enforce_integrity=not env.db_disable_integrity_checks
|
||||
env.coin, env.db_dir, env.reorg_limit, cache_all_tx_hashes=env.cache_all_tx_hashes,
|
||||
max_open_files=env.db_max_open_files, blocking_channel_ids=env.blocking_channel_ids,
|
||||
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
|
||||
index_address_status=env.index_address_status, enforce_integrity=not env.db_disable_integrity_checks
|
||||
)
|
||||
|
||||
async def run_in_thread_with_lock(self, func, *args):
|
||||
|
@ -169,10 +170,22 @@ class BlockchainProcessorService(BlockchainService):
|
|||
|
||||
def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
|
||||
self.mempool.remove(to_delete)
|
||||
touched_hashXs = self.mempool.update_mempool(to_put)
|
||||
touched_hashXs = list(self.mempool.update_mempool(to_put))
|
||||
if self.env.index_address_status:
|
||||
for hashX in touched_hashXs:
|
||||
self._get_update_hashX_mempool_status_ops(hashX)
|
||||
status_hashers = {
|
||||
k: v.hasher if v else ResumableSHA256() for k, v in zip(
|
||||
touched_hashXs,
|
||||
self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in touched_hashXs])
|
||||
)
|
||||
}
|
||||
for hashX, v in zip(
|
||||
touched_hashXs,
|
||||
self.db.prefix_db.hashX_mempool_status.multi_get([(hashX,) for hashX in touched_hashXs])):
|
||||
if v is not None:
|
||||
self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), v)
|
||||
hasher = status_hashers[hashX]
|
||||
hasher.update(self.mempool.mempool_history(hashX).encode())
|
||||
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (hasher.digest(),))
|
||||
for tx_hash, raw_tx in to_put:
|
||||
mempool_prefix.stash_put((tx_hash,), (raw_tx,))
|
||||
for tx_hash, raw_tx in to_delete.items():
|
||||
|
@ -263,9 +276,6 @@ class BlockchainProcessorService(BlockchainService):
|
|||
for _ in range(count):
|
||||
await self.run_in_thread_with_lock(self.backup_block)
|
||||
self.log.info(f'backed up to height {self.height:,d}')
|
||||
|
||||
if self.env.cache_all_claim_txos:
|
||||
await self.db._read_claim_txos() # TODO: don't do this
|
||||
await self.prefetcher.reset_height(self.height)
|
||||
self.reorg_count_metric.inc()
|
||||
except:
|
||||
|
@ -394,12 +404,6 @@ class BlockchainProcessorService(BlockchainService):
|
|||
if claim_hash not in self.updated_claim_previous_activations:
|
||||
self.updated_claim_previous_activations[claim_hash] = activation
|
||||
|
||||
if self.env.cache_all_claim_txos:
|
||||
self.db.claim_to_txo[claim_hash] = ClaimToTXOValue(
|
||||
tx_num, nout, root_tx_num, root_idx, txo.value, channel_signature_is_valid, claim_name
|
||||
)
|
||||
self.db.txo_to_claim[tx_num][nout] = claim_hash
|
||||
|
||||
pending = StagedClaimtrieItem(
|
||||
claim_name, normalized_name, claim_hash, txo.value, self.coin.get_expiration_height(height), tx_num, nout,
|
||||
root_tx_num, root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash
|
||||
|
@ -690,11 +694,6 @@ class BlockchainProcessorService(BlockchainService):
|
|||
if 0 < activation <= self.height:
|
||||
self.effective_amount_delta[claim_hash] -= spent.amount
|
||||
self.future_effective_amount_delta[spent.claim_hash] -= spent.amount
|
||||
if self.env.cache_all_claim_txos:
|
||||
claim_hash = self.db.txo_to_claim[txin_num].pop(nout)
|
||||
if not self.db.txo_to_claim[txin_num]:
|
||||
self.db.txo_to_claim.pop(txin_num)
|
||||
self.db.claim_to_txo.pop(claim_hash)
|
||||
if spent.reposted_claim_hash:
|
||||
self.pending_reposted.add(spent.reposted_claim_hash)
|
||||
if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims:
|
||||
|
@ -1637,15 +1636,6 @@ class BlockchainProcessorService(BlockchainService):
|
|||
self.hashX_full_cache[hashX] = history
|
||||
return history
|
||||
|
||||
def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
|
||||
existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
|
||||
if existing:
|
||||
self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), existing)
|
||||
history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX)
|
||||
if history:
|
||||
status = sha256(history.encode())
|
||||
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,))
|
||||
|
||||
def advance_block(self, block: Block):
|
||||
txo_count = 0
|
||||
txi_count = 0
|
||||
|
@ -1745,9 +1735,7 @@ class BlockchainProcessorService(BlockchainService):
|
|||
# update hashX history status hashes and compactify the histories
|
||||
self._get_update_hashX_histories_ops(height)
|
||||
|
||||
# only compactify adddress histories and update the status index if we're already caught up,
|
||||
# a bulk update will happen once catchup finishes
|
||||
if not self.db.catching_up and self.env.index_address_status:
|
||||
if self.env.index_address_status:
|
||||
self._get_compactify_ops(height)
|
||||
self.db.last_indexed_address_status_height = height
|
||||
|
||||
|
@ -1802,6 +1790,17 @@ class BlockchainProcessorService(BlockchainService):
|
|||
)
|
||||
|
||||
def _get_compactify_ops(self, height: int):
|
||||
def _rebuild_hasher(hist_tx_nums):
|
||||
hasher = ResumableSHA256()
|
||||
hasher.update(
|
||||
b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num)}:'.encode()
|
||||
for tx_num, tx_hash in zip(
|
||||
hist_tx_nums,
|
||||
self.db.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
|
||||
))
|
||||
)
|
||||
return hasher
|
||||
|
||||
existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False)
|
||||
if existing_hashX_statuses:
|
||||
pack_key = self.db.prefix_db.hashX_status.pack_key
|
||||
|
@ -1816,6 +1815,13 @@ class BlockchainProcessorService(BlockchainService):
|
|||
append_deletes_hashX_history = block_hashX_history_deletes.append
|
||||
block_hashX_history_puts = []
|
||||
|
||||
existing_status_hashers = {
|
||||
k: v.hasher if v else None for k, v in zip(
|
||||
self.hashXs_by_tx,
|
||||
self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in self.hashXs_by_tx])
|
||||
)
|
||||
}
|
||||
|
||||
for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses):
|
||||
new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums]
|
||||
|
||||
|
@ -1830,11 +1836,9 @@ class BlockchainProcessorService(BlockchainService):
|
|||
unpack_key = self.db.prefix_db.hashX_history.unpack_key
|
||||
needs_compaction = False
|
||||
|
||||
total_hist_txs = b''
|
||||
for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False,
|
||||
deserialize_value=False):
|
||||
hist_txs = unpack_history(hist)
|
||||
total_hist_txs += hist
|
||||
txs_extend(hist_txs)
|
||||
hist_height = unpack_key(k).height
|
||||
if height > reorg_limit and hist_height < height - reorg_limit:
|
||||
|
@ -1853,27 +1857,19 @@ class BlockchainProcessorService(BlockchainService):
|
|||
block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,)))
|
||||
if not new_history:
|
||||
continue
|
||||
|
||||
needed_tx_infos = []
|
||||
append_needed_tx_info = needed_tx_infos.append
|
||||
tx_infos = {}
|
||||
for tx_num in tx_nums:
|
||||
cached_tx_info = self.history_tx_info_cache.get(tx_num)
|
||||
if cached_tx_info is not None:
|
||||
tx_infos[tx_num] = cached_tx_info
|
||||
else:
|
||||
append_needed_tx_info(tx_num)
|
||||
if needed_tx_infos:
|
||||
for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)):
|
||||
tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
|
||||
tx_infos[tx_num] = tx_info
|
||||
self.history_tx_info_cache[tx_num] = tx_info
|
||||
history = ''.join(map(tx_infos.__getitem__, tx_nums))
|
||||
for tx_hash, height in new_history:
|
||||
history += f'{tx_hash[::-1].hex()}:{height:d}:'
|
||||
if history:
|
||||
status = sha256(history.encode())
|
||||
self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
|
||||
hasher = existing_status_hashers[hashX]
|
||||
if hasher is None:
|
||||
# this is to migrate in the new column family, in the future it can be a new digester
|
||||
# hasher = ResumableSHA256()
|
||||
hasher = _rebuild_hasher(tx_nums)
|
||||
else:
|
||||
self.db.prefix_db.hashX_history_hasher.stash_delete((hashX,), (hasher,))
|
||||
hasher.update(b''.join(
|
||||
f'{tx_hash[::-1].hex()}:{height:d}:'.encode() for tx_hash, height in new_history
|
||||
))
|
||||
self.db.prefix_db.hashX_history_hasher.stash_put((hashX,), (hasher,))
|
||||
status = hasher.digest()
|
||||
self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
|
||||
|
||||
self.db.prefix_db.multi_delete(block_hashX_history_deletes)
|
||||
self.db.prefix_db.hashX_history.stash_multi_put(block_hashX_history_puts)
|
||||
|
@ -2074,6 +2070,8 @@ class BlockchainProcessorService(BlockchainService):
|
|||
"""Loop forever processing blocks as they arrive."""
|
||||
self._caught_up_event = caught_up_event
|
||||
try:
|
||||
if os.path.exists(self.db._need_restart_path):
|
||||
raise RuntimeError('scribe restart is needed')
|
||||
if self.height != self.daemon.cached_height() and not self.db.catching_up:
|
||||
await self._need_catch_up() # tell the readers that we're still catching up with lbrycrd/lbcd
|
||||
while not self._stopping:
|
||||
|
@ -2135,9 +2133,6 @@ class BlockchainProcessorService(BlockchainService):
|
|||
async def _finished_initial_catch_up(self):
|
||||
self.log.info(f'caught up to height {self.height}')
|
||||
|
||||
if self.env.index_address_status and self.db.last_indexed_address_status_height < self.db.db_height:
|
||||
await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height)
|
||||
|
||||
# Flush everything but with catching_up->False state.
|
||||
self.db.catching_up = False
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ class BlockchainService:
|
|||
def open_db(self):
|
||||
env = self.env
|
||||
self.db = SecondaryDB(
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
|
||||
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
|
||||
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
|
||||
index_address_status=env.index_address_status
|
||||
|
|
3
setup.py
3
setup.py
|
@ -44,7 +44,8 @@ setup(
|
|||
'filetype==1.0.9',
|
||||
'grpcio==1.38.0',
|
||||
'lbry-rocksdb==0.8.2',
|
||||
'ujson==5.4.0'
|
||||
'ujson==5.4.0',
|
||||
'rehash==1.0.0'
|
||||
],
|
||||
extras_require={
|
||||
'lint': ['pylint==2.10.0'],
|
||||
|
|
Loading…
Reference in a new issue