Compare commits


11 commits

Author        SHA1        Message                                                             Date
Jack Robison  ebcc6e5086  update snapshot url in example docker-composes                      2023-03-03 13:15:25 -05:00
Jack Robison  c0766f6abc  faster RevertableOpStack.apply_packed_undo_ops                      2023-02-13 13:09:03 -05:00
Jack Robison  7bc90c425f  fix error                                                           2023-02-07 17:31:36 -05:00
Jack Robison  f2c494d4d6  check for scribe needing a restart due to low memory                2023-02-07 17:23:23 -05:00
Jack Robison  8147bbf3b9  remove --cache_all_claim_txos setting                               2023-02-07 17:23:23 -05:00
Jack Robison  adbeeaf203  update snapshot                                                     2023-01-06 20:32:45 -05:00
Jack Robison  f55ed56215  add comment                                                         2023-01-06 20:32:45 -05:00
Jack Robison  d1d33c4bce  feedback                                                            2023-01-06 20:32:45 -05:00
Jack Robison  b7de08ba0b  add ResumableSHA256 and HashXHistoryHasherPrefixRow column family   2023-01-06 20:32:45 -05:00
Jack Robison  405cef8d28  ResumableSHA256                                                     2023-01-06 20:32:45 -05:00
Jack Robison  21262d2e43  fix edge case deleting an empty value                               2022-12-28 13:40:19 -05:00
20 changed files with 224 additions and 133 deletions

View file

@@ -17,7 +17,7 @@ services:
       - "lbry_rocksdb:/database"
     environment:
       - HUB_COMMAND=scribe
-      - SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1256013/lbry-rocksdb.tar
+      - SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar
     command: # for full options, see `scribe --help`
       - "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
       - "--max_query_workers=2"

View file

@@ -14,7 +14,7 @@ services:
       - "lbry_rocksdb:/database"
     environment:
       - HUB_COMMAND=scribe
-      - SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1256013/lbry-rocksdb.tar
+      - SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar
     command:
       - "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
       - "--max_query_workers=2"

View file

@@ -7,12 +7,14 @@ import logging
 import logging.handlers
 import typing
 import collections
+from ctypes import cast, memmove, POINTER, c_void_p
 from bisect import insort_right
 from collections import deque
 from decimal import Decimal
 from typing import Iterable, Deque
 from asyncio import get_event_loop, Event
 from prometheus_client import Counter
+from rehash.structs import EVPobject
 from hub.schema.tags import clean_tags
 from hub.schema.url import normalize_name
 from hub.error import TooManyClaimSearchParametersError
@@ -1059,3 +1061,41 @@ async def asyncify_for_loop(gen, ticks_per_sleep: int = 1000):
         yield item
         if cnt % ticks_per_sleep == 0:
             await async_sleep(0)
+
+
+_SHA256_DIGEST_STATE_SIZE = 120
+
+
+class ResumableSHA256:
+    __slots__ = ['_hasher']
+
+    def __init__(self, state: typing.Optional[bytes] = None):
+        self._hasher = hashlib.sha256()
+        if state is not None:
+            ctx = self._get_evp_md_ctx()
+            ctx_size = ctx.digest.contents.ctx_size
+            if len(state) != _SHA256_DIGEST_STATE_SIZE != ctx_size:
+                raise Exception(f'invalid sha256 digester state, got {len(state)} bytes')
+            memmove(ctx.md_data, state, ctx_size)
+
+    def _get_evp_md_ctx(self):
+        c_evp_obj = cast(c_void_p(id(self._hasher)), POINTER(EVPobject))
+        if hasattr(c_evp_obj.contents.ctx, "contents"):
+            return c_evp_obj.contents.ctx.contents
+        else:
+            return c_evp_obj.contents.ctx
+
+    def get_state(self) -> bytes:
+        ctx = self._get_evp_md_ctx()
+        ctx_size = ctx.digest.contents.ctx_size
+        hasher_state = ctx.md_data[:ctx_size]
+        return hasher_state
+
+    def __copy__(self):
+        return ResumableSHA256(self.get_state())
+
+    def update(self, data: bytes):
+        self._hasher.update(data)
+
+    def digest(self):
+        return self._hasher.digest()
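
Note: the point of ResumableSHA256 is that the 120-byte OpenSSL SHA-256 context can be exported with get_state(), stored in RocksDB, and later fed back to the constructor so hashing resumes where it left off. A minimal usage sketch (the inputs are made up; it only assumes the class behaves as defined above):

    import hashlib
    from hub.common import ResumableSHA256

    hasher = ResumableSHA256()
    hasher.update(b'part one:')
    state = hasher.get_state()          # 120-byte serialized SHA-256 context

    # later, possibly in another process, resume from the stored state
    resumed = ResumableSHA256(state)
    resumed.update(b'part two:')
    assert resumed.digest() == hashlib.sha256(b'part one:part two:').digest()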

View file

@@ -51,6 +51,7 @@ class DB_PREFIXES(enum.Enum):
     reposted_count = b'j'
     effective_amount = b'i'
     future_effective_amount = b'k'
+    hashX_history_hash = b'l'


 COLUMN_SETTINGS = {}  # this is updated by the PrefixRow metaclass

View file

@@ -37,7 +37,7 @@ class SecondaryDB:
     DB_VERSIONS = [7, 8, 9, 10, 11, 12]

     def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
-                 cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
+                 cache_all_tx_hashes: bool = False,
                  blocking_channel_ids: List[str] = None,
                  filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
                  index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768,
@@ -47,9 +47,9 @@ class SecondaryDB:
         self._executor = executor
         self._db_dir = db_dir
         self._reorg_limit = reorg_limit
-        self._cache_all_claim_txos = cache_all_claim_txos
         self._cache_all_tx_hashes = cache_all_tx_hashes
         self._secondary_name = secondary_name
+        self._need_restart_path = os.path.join(self._db_dir, 'NEED_SCRIBE_RESTART')
         if secondary_name:
             assert max_open_files == -1, 'max open files must be -1 for secondary readers'
         self._db_max_open_files = max_open_files
@@ -100,9 +100,6 @@ class SecondaryDB:
         self.total_transactions: List[bytes] = []
         self.tx_num_mapping: Dict[bytes, int] = {}

-        # these are only used if the cache_all_claim_txos setting is on
-        self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
-        self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
         self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)

     def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
@@ -956,21 +953,6 @@ class SecondaryDB:
             else:
                 assert self.db_tx_count == 0

-    async def _read_claim_txos(self):
-        def read_claim_txos():
-            set_claim_to_txo = self.claim_to_txo.__setitem__
-            for k, v in self.prefix_db.claim_to_txo.iterate(fill_cache=False):
-                set_claim_to_txo(k.claim_hash, v)
-                self.txo_to_claim[v.tx_num][v.position] = k.claim_hash
-
-        self.claim_to_txo.clear()
-        self.txo_to_claim.clear()
-        start = time.perf_counter()
-        self.logger.info("loading claims")
-        await asyncio.get_event_loop().run_in_executor(self._executor, read_claim_txos)
-        ts = time.perf_counter() - start
-        self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4))
-
     # async def _read_headers(self):
     #     # if self.headers is not None:
     #     #     return
@@ -1019,6 +1001,27 @@ class SecondaryDB:
         secondary_path = '' if not self._secondary_name else os.path.join(
             self._db_dir, self._secondary_name
         )
+        open_db_canary = None
+        if self._secondary_name:
+            open_db_canary = os.path.join(self._db_dir, f'{self._secondary_name}-db-canary')
+            if os.path.exists(open_db_canary):
+                with open(self._need_restart_path, 'w+') as f:
+                    f.write(f"{time.strftime(f'%Y-%m-%d %H:%M:%S')} {self._secondary_name}\n")
+                raise RuntimeError('scribe restart is needed')
+            else:
+                with open(open_db_canary, 'w'):
+                    pass
+        else:
+            herald_db_canary = os.path.join(self._db_dir, 'lbry-reader-db-canary')
+            es_sync_db_canary = os.path.join(self._db_dir, 'lbry-elastic-writer-db-canary')
+            if os.path.exists(herald_db_canary):
+                os.remove(herald_db_canary)
+            if os.path.exists(es_sync_db_canary):
+                os.remove(es_sync_db_canary)
+            if os.path.exists(self._need_restart_path):
+                os.remove(self._need_restart_path)
         db_path = os.path.join(self._db_dir, 'lbry-rocksdb')
         self.prefix_db = PrefixDB(
             db_path, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,
@@ -1027,6 +1030,7 @@ class SecondaryDB:
         )

         if secondary_path != '':
+            os.remove(open_db_canary)
             self.logger.info(f'opened db for read only: lbry-rocksdb (%s)', db_path)
         else:
             self.logger.info(f'opened db for writing: lbry-rocksdb (%s)', db_path)
@@ -1063,8 +1067,6 @@ class SecondaryDB:
     async def initialize_caches(self):
         await self._read_tx_counts()
         await self._read_block_hashes()
-        if self._cache_all_claim_txos:
-            await self._read_claim_txos()
         if self._cache_all_tx_hashes:
             await self._read_tx_hashes()
         if self.db_height > 0:
@@ -1154,15 +1156,9 @@ class SecondaryDB:
         }

     def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]:
-        if self._cache_all_claim_txos:
-            return self.claim_to_txo.get(claim_hash)
         return self.prefix_db.claim_to_txo.get_pending(claim_hash)

     def get_cached_claim_hash(self, tx_num: int, position: int) -> Optional[bytes]:
-        if self._cache_all_claim_txos:
-            if tx_num not in self.txo_to_claim:
-                return
-            return self.txo_to_claim[tx_num].get(position, None)
         v = self.prefix_db.txo_to_claim.get_pending(tx_num, position)
         return None if not v else v.claim_hash
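
Note: a condensed view of the canary handshake added above, with the hub classes stripped away. A reader (herald or elastic sync) drops a canary file before opening RocksDB as a secondary and removes it once the open succeeds; the writer (scribe) clears stale canaries and the NEED_SCRIBE_RESTART marker whenever it starts. If a reader finds its own canary still present, the previous open never completed and scribe has not restarted since, so it records that a restart is needed and refuses to start. The helper functions and the open_db callable below are illustrative, not part of the hub API:

    import os
    import time

    def open_reader_db(db_dir: str, secondary_name: str, open_db):
        canary = os.path.join(db_dir, f'{secondary_name}-db-canary')
        need_restart = os.path.join(db_dir, 'NEED_SCRIBE_RESTART')
        if os.path.exists(canary):
            # the previous open never finished: ask for a scribe restart and bail out
            with open(need_restart, 'w+') as f:
                f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} {secondary_name}\n")
            raise RuntimeError('scribe restart is needed')
        with open(canary, 'w'):
            pass
        db = open_db()          # open rocksdb as a secondary reader
        os.remove(canary)       # open succeeded, clear the canary
        return db

    def open_writer_db(db_dir: str, open_db):
        # scribe side: a fresh start clears stale canaries and the restart marker
        for name in ('lbry-reader-db-canary', 'lbry-elastic-writer-db-canary', 'NEED_SCRIBE_RESTART'):
            path = os.path.join(db_dir, name)
            if os.path.exists(path):
                os.remove(path)
        return open_db()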

View file

@@ -273,6 +273,7 @@ class BasePrefixDB:
         undo_c_f = self.column_families[DB_PREFIXES.undo.value]
         undo_info = self._db.get((undo_c_f, undo_key))
         self._op_stack.apply_packed_undo_ops(undo_info)
+        self._op_stack.validate_and_apply_stashed_ops()
         try:
             with self._db.write_batch(sync=True) as batch:
                 batch_put = batch.put

View file

@@ -3,6 +3,7 @@ import struct
 import array
 import base64
 from typing import Union, Tuple, NamedTuple, Optional
+from hub.common import ResumableSHA256
 from hub.db.common import DB_PREFIXES
 from hub.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow
 from hub.schema.url import normalize_name
@@ -1851,6 +1852,46 @@ class FutureEffectiveAmountPrefixRow(PrefixRow):
         return cls.pack_key(claim_hash), cls.pack_value(future_effective_amount)


+class HashXHistoryHasherKey(NamedTuple):
+    hashX: bytes
+
+
+class HashXHistoryHasherValue(NamedTuple):
+    hasher: ResumableSHA256
+
+
+class HashXHistoryHasherPrefixRow(PrefixRow):
+    prefix = DB_PREFIXES.hashX_history_hash.value
+    key_struct = struct.Struct(b'>11s')
+    value_struct = struct.Struct(b'>120s')
+    cache_size = 1024 * 1024 * 64
+
+    key_part_lambdas = [
+        lambda: b'',
+        struct.Struct(b'>11s').pack
+    ]
+
+    @classmethod
+    def pack_key(cls, hashX: bytes):
+        return super().pack_key(hashX)
+
+    @classmethod
+    def unpack_key(cls, key: bytes) -> HashXHistoryHasherKey:
+        return HashXHistoryHasherKey(*super().unpack_key(key))
+
+    @classmethod
+    def pack_value(cls, hasher: ResumableSHA256) -> bytes:
+        return super().pack_value(hasher.get_state())
+
+    @classmethod
+    def unpack_value(cls, data: bytes) -> HashXHistoryHasherValue:
+        return HashXHistoryHasherValue(ResumableSHA256(*super().unpack_value(data)))
+
+    @classmethod
+    def pack_item(cls, hashX: bytes, hasher: ResumableSHA256):
+        return cls.pack_key(hashX), cls.pack_value(hasher)
+
+
 class PrefixDB(BasePrefixDB):
     def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64,
                  secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None,
@@ -1897,6 +1938,7 @@ class PrefixDB(BasePrefixDB):
         self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack)
         self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack)
         self.future_effective_amount = FutureEffectiveAmountPrefixRow(db, self._op_stack)
+        self.hashX_history_hasher = HashXHistoryHasherPrefixRow(db, self._op_stack)

 def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
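
Note: the new column family maps an 11-byte hashX to the 120-byte serialized hasher state, so an address's status digest can be resumed instead of recomputed from its full history. A rough round-trip sketch, assuming the classmethod pack/unpack helpers can be used standalone the same way they are for the other prefix rows (the hashX and preimage below are made up):

    from hub.common import ResumableSHA256
    from hub.db.prefixes import HashXHistoryHasherPrefixRow

    hashX = bytes(11)                      # hashXs are 11 bytes; this one is a placeholder
    hasher = ResumableSHA256()
    hasher.update(b'txid:1000:')

    key, value = HashXHistoryHasherPrefixRow.pack_item(hashX, hasher)
    assert len(value) == 120               # the value is just the serialized SHA-256 context

    restored = HashXHistoryHasherPrefixRow.unpack_value(value).hasher
    assert restored.digest() == hasher.digest()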

View file

@@ -160,7 +160,7 @@ class RevertableOpStack:
                     # there is a value and we're not deleting it in this op
                     # check that a delete for the stored value is in the stack
                     raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}\nvs\n{stored_val}")
-            elif not stored_val:
+            elif not has_stored_val:
                 raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
             elif stored_val != op.value:
                 raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
@@ -324,9 +324,21 @@ class RevertableOpStack:
         """
         Unpack and apply a sequence of undo ops from serialized undo bytes
         """
-        while packed:
-            op, packed = RevertableOp.unpack(packed)
-            self.append_op(op)
+        offset = 0
+        packed_size = len(packed)
+        while offset < packed_size:
+            is_put, key_len, val_len = _OP_STRUCT.unpack(packed[offset:offset + 9])
+            offset += 9
+            key = packed[offset:offset + key_len]
+            offset += key_len
+            value = packed[offset:offset + val_len]
+            offset += val_len
+            if is_put == 1:
+                op = RevertablePut(key, value)
+            else:
+                op = RevertableDelete(key, value)
+            self._stash.append(op)
+            self._stashed_last_op_for_key[op.key] = op

     def get_pending_op(self, key: bytes) -> Optional[RevertableOp]:
         if key in self._stashed_last_op_for_key:
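
Note on why this is faster: the old loop re-sliced the remaining packed buffer on every iteration (op, packed = RevertableOp.unpack(packed)), copying the tail each time, and pushed every op through append_op; the new version walks the buffer with an offset and appends straight to the stash, leaving validation to validate_and_apply_stashed_ops. The diff does not show _OP_STRUCT itself; the sketch below assumes a 1-byte put flag followed by two 32-bit big-endian lengths, which is consistent with the 9 header bytes consumed above:

    import struct

    # assumed per-op layout: 1-byte is_put flag, 4-byte key length, 4-byte value length,
    # then key bytes followed by value bytes
    _OP_STRUCT = struct.Struct(b'>BLL')

    def iterate_packed_ops(packed: bytes):
        offset, packed_size = 0, len(packed)
        while offset < packed_size:
            is_put, key_len, val_len = _OP_STRUCT.unpack(packed[offset:offset + 9])
            offset += 9
            key = packed[offset:offset + key_len]
            offset += key_len
            value = packed[offset:offset + val_len]
            offset += val_len
            yield bool(is_put), key, value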

View file

@@ -9,11 +9,11 @@ from hub.db.common import ResolveResult

 class ElasticSyncDB(SecondaryDB):
     def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
-                 cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
+                 cache_all_tx_hashes: bool = False,
                  blocking_channel_ids: List[str] = None,
                  filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
                  index_address_status=False):
-        super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
+        super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit,
                          cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
                          index_address_status)
         self.block_timestamp_cache = LRUCache(1024)

View file

@@ -3,11 +3,11 @@ from hub.env import Env

 class ElasticEnv(Env):
     def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None,
-                 cache_all_tx_hashes=None, cache_all_claim_txos=None, elastic_host=None, elastic_port=None,
+                 cache_all_tx_hashes=None, elastic_host=None, elastic_port=None,
                  es_index_prefix=None, elastic_notifier_host=None, elastic_notifier_port=None,
                  blocking_channel_ids=None, filtering_channel_ids=None, reindex=False):
         super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
-                         cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
+                         blocking_channel_ids, filtering_channel_ids)
         self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
         self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
         self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
@@ -43,7 +43,7 @@ class ElasticEnv(Env):
             elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
             es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit,
             prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
-            cache_all_claim_txos=args.cache_all_claim_txos, blocking_channel_ids=args.blocking_channel_ids,
+            blocking_channel_ids=args.blocking_channel_ids,
             filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
             elastic_notifier_port=args.elastic_notifier_port
         )

View file

@@ -49,7 +49,7 @@ class ElasticSyncService(BlockchainReaderService):
     def open_db(self):
         env = self.env
         self.db = ElasticSyncDB(
-            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
+            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
             env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
             filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
             index_address_status=env.index_address_status

View file

@@ -30,7 +30,7 @@ class Env:
         pass

     def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
-                 prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
+                 prometheus_port=None, cache_all_tx_hashes=None,
                  blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None):
         self.logger = logging.getLogger(__name__)
@@ -46,7 +46,6 @@ class Env:
         self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
         self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0)
         self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False)
-        self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False)
         # Filtering / Blocking
         self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default(
             'BLOCKING_CHANNEL_IDS', '').split(' ')
@@ -171,11 +170,6 @@ class Env:
                                  "resolve, transaction fetching, and block sync all faster at the expense of higher "
                                  "memory usage (at least 10GB more). Can be set in env with 'CACHE_ALL_TX_HASHES'.",
                             default=cls.boolean('CACHE_ALL_TX_HASHES', False))
-        parser.add_argument('--cache_all_claim_txos', action='store_true',
-                            help="Load all claim txos into memory. This will make address subscriptions and sync, "
-                                 "resolve, transaction fetching, and block sync all faster at the expense of higher "
-                                 "memory usage. Can be set in env with 'CACHE_ALL_CLAIM_TXOS'.",
-                            default=cls.boolean('CACHE_ALL_CLAIM_TXOS', False))
         parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0),
                             help="Port for prometheus metrics to listen on, disabled by default. "
                                  "Can be set in env with 'PROMETHEUS_PORT'.")

View file

@@ -6,11 +6,11 @@ from hub.db import SecondaryDB

 class HeraldDB(SecondaryDB):
     def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
-                 cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
+                 cache_all_tx_hashes: bool = False,
                  blocking_channel_ids: List[str] = None,
                  filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
                  index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768):
-        super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
+        super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit,
                          cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
                          index_address_status, merkle_cache_size, tx_cache_size)
         # self.headers = None

View file

@@ -19,7 +19,7 @@ def parse_es_services(elastic_services_arg: str):

 class ServerEnv(Env):
     def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
-                 prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
+                 prometheus_port=None, cache_all_tx_hashes=None,
                  daemon_url=None, host=None, elastic_services=None, es_index_prefix=None,
                  tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
                  payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
@@ -29,7 +29,7 @@ class ServerEnv(Env):
                  merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
                  history_tx_cache_size=None, largest_address_history_cache_size=None):
         super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
-                         cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
+                         blocking_channel_ids, filtering_channel_ids, index_address_status)
         self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
         self.host = host if host is not None else self.default('HOST', 'localhost')
         self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080'))
@@ -153,7 +153,7 @@ class ServerEnv(Env):
             es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
             udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
             allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
-            cache_all_claim_txos=args.cache_all_claim_txos, country=args.country, payment_address=args.payment_address,
+            country=args.country, payment_address=args.payment_address,
             donation_address=args.donation_address, max_send=args.max_send, max_receive=args.max_receive,
             max_sessions=args.max_sessions, session_timeout=args.session_timeout,
             drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,

View file

@@ -53,7 +53,7 @@ class HubServerService(BlockchainReaderService):
     def open_db(self):
         env = self.env
         self.db = HeraldDB(
-            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
+            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
             env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
             filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
             index_address_status=env.index_address_status, merkle_cache_size=env.merkle_cache_size,

View file

@@ -5,17 +5,17 @@ import time
 from typing import List
 from concurrent.futures.thread import ThreadPoolExecutor
 from bisect import bisect_right
-from hub.common import sha256
+from hub.common import ResumableSHA256
 from hub.db import SecondaryDB


 class PrimaryDB(SecondaryDB):
     def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
-                 cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
+                 cache_all_tx_hashes: bool = False,
                  max_open_files: int = 64, blocking_channel_ids: List[str] = None,
                  filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
                  index_address_status=False, enforce_integrity=True):
-        super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes,
+        super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_tx_hashes,
                          blocking_channel_ids, filtering_channel_ids, executor, index_address_status,
                          enforce_integrity=enforce_integrity)
@@ -35,16 +35,19 @@ class PrimaryDB(SecondaryDB):
             if last_hashX:
                 yield last_hashX

-        def hashX_status_from_history(history: bytes) -> bytes:
+        def hashX_status_from_history(history: bytes) -> ResumableSHA256:
             tx_counts = self.tx_counts
             hist_tx_nums = array.array('I')
             hist_tx_nums.frombytes(history)
-            digest = hashlib.sha256()
-            for tx_num, tx_hash in zip(
+            digest = ResumableSHA256()
+            digest.update(
+                b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode()
+                         for tx_num, tx_hash in zip(
                     hist_tx_nums,
-                    self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)):
-                digest.update(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode())
-            return digest.digest()
+                    self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
+                ))
+            )
+            return digest

         start = time.perf_counter()
@@ -67,17 +70,24 @@ class PrimaryDB(SecondaryDB):
                 hashX_cnt += 1
                 key = prefix_db.hashX_status.pack_key(hashX)
                 history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
-                status = hashX_status_from_history(history)
+                digester = hashX_status_from_history(history)
+                status = digester.digest()
                 existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
-                if existing_status and existing_status == status:
-                    continue
-                elif not existing_status:
+                existing_digester = prefix_db.hashX_history_hasher.get(hashX)
+                if not existing_status:
                     prefix_db.stash_raw_put(key, status)
                     op_cnt += 1
                 else:
                     prefix_db.stash_raw_delete(key, existing_status)
                     prefix_db.stash_raw_put(key, status)
                     op_cnt += 2
+                if not existing_digester:
+                    prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
+                    op_cnt += 1
+                else:
+                    prefix_db.hashX_history_hasher.stash_delete((hashX,), existing_digester)
+                    prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
+                    op_cnt += 2
                 if op_cnt > 100000:
                     prefix_db.unsafe_commit()
                     self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
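
Note: the address status preimage itself is unchanged here; each confirmed history entry still contributes "txid:height:", with the txid being the reversed tx hash in hex and the height derived from tx_counts with bisect_right. What changes is that the digest is now built with ResumableSHA256 and the digester is stored alongside the status, so later blocks only hash the new entries. A rough standalone equivalent (the helper name and sample values are made up):

    from bisect import bisect_right
    from hub.common import ResumableSHA256

    def status_digester(tx_counts, history):
        # history: [(tx_num, tx_hash), ...] in confirmed order
        digester = ResumableSHA256()
        digester.update(b''.join(
            f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode()
            for tx_num, tx_hash in history
        ))
        return digester

    digester = status_digester([10, 25, 40], [(12, bytes(32))])   # tx_num 12 falls in block height 1
    status = digester.digest()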

View file

@@ -3,14 +3,13 @@ from hub.env import Env

 class BlockchainEnv(Env):
     def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
-                 prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
-                 blocking_channel_ids=None, filtering_channel_ids=None,
+                 prometheus_port=None, cache_all_tx_hashes=None, blocking_channel_ids=None, filtering_channel_ids=None,
                  db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
                  index_address_status=None, rebuild_address_status_from_height=None,
                  daemon_ca_path=None, history_tx_cache_size=None,
                  db_disable_integrity_checks=False):
         super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
-                         cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
+                         blocking_channel_ids, filtering_channel_ids, index_address_status)
         self.db_max_open_files = db_max_open_files
         self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
         self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \
@@ -57,7 +56,7 @@ class BlockchainEnv(Env):
             db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
             max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit,
             prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
-            cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
+            index_address_status=args.index_address_statuses,
             hashX_history_cache_size=args.address_history_cache_size,
             rebuild_address_status_from_height=args.rebuild_address_status_from_height,
            daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size,

View file

@@ -1,3 +1,4 @@
+import os
 import time
 import asyncio
 import typing
@@ -11,7 +12,8 @@ from hub import PROMETHEUS_NAMESPACE
 from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
 from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
 from hub.error.base import ChainError
-from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache, LFUCacheWithMetrics
+from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache
+from hub.common import ResumableSHA256, LFUCacheWithMetrics
 from hub.scribe.db import PrimaryDB
 from hub.scribe.daemon import LBCDaemon
 from hub.scribe.transaction import Tx, TxOutput, TxInput, Block
@@ -137,11 +139,10 @@ class BlockchainProcessorService(BlockchainService):
     def open_db(self):
         env = self.env
         self.db = PrimaryDB(
-            env.coin, env.db_dir, env.reorg_limit, cache_all_claim_txos=env.cache_all_claim_txos,
-            cache_all_tx_hashes=env.cache_all_tx_hashes, max_open_files=env.db_max_open_files,
-            blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids,
-            executor=self._executor, index_address_status=env.index_address_status,
-            enforce_integrity=not env.db_disable_integrity_checks
+            env.coin, env.db_dir, env.reorg_limit, cache_all_tx_hashes=env.cache_all_tx_hashes,
+            max_open_files=env.db_max_open_files, blocking_channel_ids=env.blocking_channel_ids,
+            filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
+            index_address_status=env.index_address_status, enforce_integrity=not env.db_disable_integrity_checks
         )

     async def run_in_thread_with_lock(self, func, *args):
@@ -169,10 +170,22 @@ class BlockchainProcessorService(BlockchainService):
         def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
             self.mempool.remove(to_delete)
-            touched_hashXs = self.mempool.update_mempool(to_put)
+            touched_hashXs = list(self.mempool.update_mempool(to_put))
             if self.env.index_address_status:
-                for hashX in touched_hashXs:
-                    self._get_update_hashX_mempool_status_ops(hashX)
+                status_hashers = {
+                    k: v.hasher if v else ResumableSHA256() for k, v in zip(
+                        touched_hashXs,
+                        self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in touched_hashXs])
+                    )
+                }
+                for hashX, v in zip(
+                        touched_hashXs,
+                        self.db.prefix_db.hashX_mempool_status.multi_get([(hashX,) for hashX in touched_hashXs])):
+                    if v is not None:
+                        self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), v)
+                    hasher = status_hashers[hashX]
+                    hasher.update(self.mempool.mempool_history(hashX).encode())
+                    self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (hasher.digest(),))
             for tx_hash, raw_tx in to_put:
                 mempool_prefix.stash_put((tx_hash,), (raw_tx,))
             for tx_hash, raw_tx in to_delete.items():
@@ -263,9 +276,6 @@ class BlockchainProcessorService(BlockchainService):
                 for _ in range(count):
                     await self.run_in_thread_with_lock(self.backup_block)
                 self.log.info(f'backed up to height {self.height:,d}')
-                if self.env.cache_all_claim_txos:
-                    await self.db._read_claim_txos()  # TODO: don't do this
                 await self.prefetcher.reset_height(self.height)
                 self.reorg_count_metric.inc()
             except:
@@ -394,12 +404,6 @@ class BlockchainProcessorService(BlockchainService):
             if claim_hash not in self.updated_claim_previous_activations:
                 self.updated_claim_previous_activations[claim_hash] = activation

-        if self.env.cache_all_claim_txos:
-            self.db.claim_to_txo[claim_hash] = ClaimToTXOValue(
-                tx_num, nout, root_tx_num, root_idx, txo.value, channel_signature_is_valid, claim_name
-            )
-            self.db.txo_to_claim[tx_num][nout] = claim_hash
-
         pending = StagedClaimtrieItem(
             claim_name, normalized_name, claim_hash, txo.value, self.coin.get_expiration_height(height), tx_num, nout,
             root_tx_num, root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash
@@ -690,11 +694,6 @@ class BlockchainProcessorService(BlockchainService):
                 if 0 < activation <= self.height:
                     self.effective_amount_delta[claim_hash] -= spent.amount
             self.future_effective_amount_delta[spent.claim_hash] -= spent.amount
-            if self.env.cache_all_claim_txos:
-                claim_hash = self.db.txo_to_claim[txin_num].pop(nout)
-                if not self.db.txo_to_claim[txin_num]:
-                    self.db.txo_to_claim.pop(txin_num)
-                self.db.claim_to_txo.pop(claim_hash)
             if spent.reposted_claim_hash:
                 self.pending_reposted.add(spent.reposted_claim_hash)
             if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims:
@@ -1637,15 +1636,6 @@ class BlockchainProcessorService(BlockchainService):
             self.hashX_full_cache[hashX] = history
         return history

-    def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
-        existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
-        if existing:
-            self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), existing)
-        history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX)
-        if history:
-            status = sha256(history.encode())
-            self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,))
-
     def advance_block(self, block: Block):
         txo_count = 0
         txi_count = 0
@@ -1745,9 +1735,7 @@ class BlockchainProcessorService(BlockchainService):
         # update hashX history status hashes and compactify the histories
         self._get_update_hashX_histories_ops(height)
-        # only compactify adddress histories and update the status index if we're already caught up,
-        # a bulk update will happen once catchup finishes
-        if not self.db.catching_up and self.env.index_address_status:
+        if self.env.index_address_status:
             self._get_compactify_ops(height)
             self.db.last_indexed_address_status_height = height
@@ -1802,6 +1790,17 @@ class BlockchainProcessorService(BlockchainService):
         )

     def _get_compactify_ops(self, height: int):
+        def _rebuild_hasher(hist_tx_nums):
+            hasher = ResumableSHA256()
+            hasher.update(
+                b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num)}:'.encode()
+                         for tx_num, tx_hash in zip(
+                    hist_tx_nums,
+                    self.db.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
+                ))
+            )
+            return hasher
+
         existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False)
         if existing_hashX_statuses:
             pack_key = self.db.prefix_db.hashX_status.pack_key
@@ -1816,6 +1815,13 @@ class BlockchainProcessorService(BlockchainService):
         append_deletes_hashX_history = block_hashX_history_deletes.append
         block_hashX_history_puts = []

+        existing_status_hashers = {
+            k: v.hasher if v else None for k, v in zip(
+                self.hashXs_by_tx,
+                self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in self.hashXs_by_tx])
+            )
+        }
+
         for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses):
             new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums]
@@ -1830,11 +1836,9 @@ class BlockchainProcessorService(BlockchainService):
             unpack_key = self.db.prefix_db.hashX_history.unpack_key
             needs_compaction = False

-            total_hist_txs = b''
             for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False,
                                                                    deserialize_value=False):
                 hist_txs = unpack_history(hist)
-                total_hist_txs += hist
                 txs_extend(hist_txs)
                 hist_height = unpack_key(k).height
                 if height > reorg_limit and hist_height < height - reorg_limit:
@@ -1853,27 +1857,19 @@ class BlockchainProcessorService(BlockchainService):
                 block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,)))
             if not new_history:
                 continue
-            needed_tx_infos = []
-            append_needed_tx_info = needed_tx_infos.append
-            tx_infos = {}
-            for tx_num in tx_nums:
-                cached_tx_info = self.history_tx_info_cache.get(tx_num)
-                if cached_tx_info is not None:
-                    tx_infos[tx_num] = cached_tx_info
-                else:
-                    append_needed_tx_info(tx_num)
-            if needed_tx_infos:
-                for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)):
-                    tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
-                    tx_infos[tx_num] = tx_info
-                    self.history_tx_info_cache[tx_num] = tx_info
-            history = ''.join(map(tx_infos.__getitem__, tx_nums))
-            for tx_hash, height in new_history:
-                history += f'{tx_hash[::-1].hex()}:{height:d}:'
-            if history:
-                status = sha256(history.encode())
-                self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
+            hasher = existing_status_hashers[hashX]
+            if hasher is None:
+                # this is to migrate in the new column family, in the future it can be a new digester
+                # hasher = ResumableSHA256()
+                hasher = _rebuild_hasher(tx_nums)
+            else:
+                self.db.prefix_db.hashX_history_hasher.stash_delete((hashX,), (hasher,))
+            hasher.update(b''.join(
+                f'{tx_hash[::-1].hex()}:{height:d}:'.encode() for tx_hash, height in new_history
+            ))
+            self.db.prefix_db.hashX_history_hasher.stash_put((hashX,), (hasher,))
+            status = hasher.digest()
+            self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))

         self.db.prefix_db.multi_delete(block_hashX_history_deletes)
         self.db.prefix_db.hashX_history.stash_multi_put(block_hashX_history_puts)
@@ -2074,6 +2070,8 @@ class BlockchainProcessorService(BlockchainService):
         """Loop forever processing blocks as they arrive."""
         self._caught_up_event = caught_up_event
         try:
+            if os.path.exists(self.db._need_restart_path):
+                raise RuntimeError('scribe restart is needed')
             if self.height != self.daemon.cached_height() and not self.db.catching_up:
                 await self._need_catch_up()  # tell the readers that we're still catching up with lbrycrd/lbcd
             while not self._stopping:
@@ -2135,9 +2133,6 @@ class BlockchainProcessorService(BlockchainService):
     async def _finished_initial_catch_up(self):
         self.log.info(f'caught up to height {self.height}')

-        if self.env.index_address_status and self.db.last_indexed_address_status_height < self.db.db_height:
-            await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height)
-
         # Flush everything but with catching_up->False state.
         self.db.catching_up = False
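
Note: the mempool status path follows the same idea as the confirmed-history path: the stored confirmed-history hasher is fetched (or a fresh one used if the hashX has no stored digester yet), the mempool history string is folded in, and the digest becomes the hashX_mempool_status value. A small sketch of that combination, using the class's __copy__ so the confirmed-history hasher itself is not mutated (the function name is illustrative, not hub API):

    from copy import copy
    from hub.common import ResumableSHA256

    def mempool_status(stored_hasher: ResumableSHA256, mempool_history: str) -> bytes:
        hasher = copy(stored_hasher)             # resume from the confirmed-history state
        hasher.update(mempool_history.encode())  # fold in the serialized mempool history for this hashX
        return hasher.digest()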

View file

@@ -37,7 +37,7 @@ class BlockchainService:
     def open_db(self):
         env = self.env
         self.db = SecondaryDB(
-            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
+            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
             env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
             filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
             index_address_status=env.index_address_status

View file

@@ -44,7 +44,8 @@ setup(
         'filetype==1.0.9',
         'grpcio==1.38.0',
         'lbry-rocksdb==0.8.2',
-        'ujson==5.4.0'
+        'ujson==5.4.0',
+        'rehash==1.0.0'
     ],
     extras_require={
         'lint': ['pylint==2.10.0'],