Compare commits

...

11 commits

Author SHA1 Message Date
Jack Robison  ebcc6e5086  update snapshot url in example docker-composes  2023-03-03 13:15:25 -05:00
Jack Robison  c0766f6abc  faster RevertableOpStack.apply_packed_undo_ops  2023-02-13 13:09:03 -05:00
Jack Robison  7bc90c425f  fix error  2023-02-07 17:31:36 -05:00
Jack Robison  f2c494d4d6  check for scribe needing a restart due to low memory  2023-02-07 17:23:23 -05:00
Jack Robison  8147bbf3b9  remove --cache_all_claim_txos setting  2023-02-07 17:23:23 -05:00
Jack Robison  adbeeaf203  update snapshot  2023-01-06 20:32:45 -05:00
Jack Robison  f55ed56215  add comment  2023-01-06 20:32:45 -05:00
Jack Robison  d1d33c4bce  feedback  2023-01-06 20:32:45 -05:00
Jack Robison  b7de08ba0b  add ResumableSHA256 and HashXHistoryHasherPrefixRow column family  2023-01-06 20:32:45 -05:00
Jack Robison  405cef8d28  ResumableSHA256  2023-01-06 20:32:45 -05:00
Jack Robison  21262d2e43  fix edge case deleting an empty value  2022-12-28 13:40:19 -05:00
20 changed files with 224 additions and 133 deletions

View file

@ -17,7 +17,7 @@ services:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1256013/lbry-rocksdb.tar
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar
command: # for full options, see `scribe --help`
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--max_query_workers=2"

View file

@ -14,7 +14,7 @@ services:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1256013/lbry-rocksdb.tar
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar
command:
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--max_query_workers=2"

View file

@ -7,12 +7,14 @@ import logging
import logging.handlers
import typing
import collections
from ctypes import cast, memmove, POINTER, c_void_p
from bisect import insort_right
from collections import deque
from decimal import Decimal
from typing import Iterable, Deque
from asyncio import get_event_loop, Event
from prometheus_client import Counter
from rehash.structs import EVPobject
from hub.schema.tags import clean_tags
from hub.schema.url import normalize_name
from hub.error import TooManyClaimSearchParametersError
@ -1059,3 +1061,41 @@ async def asyncify_for_loop(gen, ticks_per_sleep: int = 1000):
yield item
if cnt % ticks_per_sleep == 0:
await async_sleep(0)
_SHA256_DIGEST_STATE_SIZE = 120
class ResumableSHA256:
__slots__ = ['_hasher']
def __init__(self, state: typing.Optional[bytes] = None):
self._hasher = hashlib.sha256()
if state is not None:
ctx = self._get_evp_md_ctx()
ctx_size = ctx.digest.contents.ctx_size
if len(state) != _SHA256_DIGEST_STATE_SIZE != ctx_size:
raise Exception(f'invalid sha256 digester state, got {len(state)} bytes')
memmove(ctx.md_data, state, ctx_size)
def _get_evp_md_ctx(self):
c_evp_obj = cast(c_void_p(id(self._hasher)), POINTER(EVPobject))
if hasattr(c_evp_obj.contents.ctx, "contents"):
return c_evp_obj.contents.ctx.contents
else:
return c_evp_obj.contents.ctx
def get_state(self) -> bytes:
ctx = self._get_evp_md_ctx()
ctx_size = ctx.digest.contents.ctx_size
hasher_state = ctx.md_data[:ctx_size]
return hasher_state
def __copy__(self):
return ResumableSHA256(self.get_state())
def update(self, data: bytes):
self._hasher.update(data)
def digest(self):
return self._hasher.digest()
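
Illustrative usage sketch (not part of the diff), assuming the hub package and its new rehash dependency are installed: ResumableSHA256 exports the 120-byte OpenSSL digest context with get_state() and can be re-hydrated from it later, so hashing continues from a checkpoint instead of replaying all prior input.

# --- illustrative sketch, not part of the diff ---
import hashlib
from hub.common import ResumableSHA256

hasher = ResumableSHA256()
hasher.update(b'first chunk of address history')
saved = hasher.get_state()                  # 120-byte sha256 context snapshot

resumed = ResumableSHA256(saved)            # later, possibly after a restart
resumed.update(b'second chunk of address history')

# resuming from the checkpoint is equivalent to hashing the concatenation
assert resumed.digest() == hashlib.sha256(
    b'first chunk of address history' + b'second chunk of address history'
).digest()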

View file

@ -51,6 +51,7 @@ class DB_PREFIXES(enum.Enum):
reposted_count = b'j'
effective_amount = b'i'
future_effective_amount = b'k'
hashX_history_hash = b'l'
COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass

View file

@ -37,7 +37,7 @@ class SecondaryDB:
DB_VERSIONS = [7, 8, 9, 10, 11, 12]
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
cache_all_tx_hashes: bool = False,
blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768,
@ -47,9 +47,9 @@ class SecondaryDB:
self._executor = executor
self._db_dir = db_dir
self._reorg_limit = reorg_limit
self._cache_all_claim_txos = cache_all_claim_txos
self._cache_all_tx_hashes = cache_all_tx_hashes
self._secondary_name = secondary_name
self._need_restart_path = os.path.join(self._db_dir, 'NEED_SCRIBE_RESTART')
if secondary_name:
assert max_open_files == -1, 'max open files must be -1 for secondary readers'
self._db_max_open_files = max_open_files
@ -100,9 +100,6 @@ class SecondaryDB:
self.total_transactions: List[bytes] = []
self.tx_num_mapping: Dict[bytes, int] = {}
# these are only used if the cache_all_claim_txos setting is on
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)
def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
@ -956,21 +953,6 @@ class SecondaryDB:
else:
assert self.db_tx_count == 0
async def _read_claim_txos(self):
def read_claim_txos():
set_claim_to_txo = self.claim_to_txo.__setitem__
for k, v in self.prefix_db.claim_to_txo.iterate(fill_cache=False):
set_claim_to_txo(k.claim_hash, v)
self.txo_to_claim[v.tx_num][v.position] = k.claim_hash
self.claim_to_txo.clear()
self.txo_to_claim.clear()
start = time.perf_counter()
self.logger.info("loading claims")
await asyncio.get_event_loop().run_in_executor(self._executor, read_claim_txos)
ts = time.perf_counter() - start
self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4))
# async def _read_headers(self):
# # if self.headers is not None:
# # return
@ -1019,6 +1001,27 @@ class SecondaryDB:
secondary_path = '' if not self._secondary_name else os.path.join(
self._db_dir, self._secondary_name
)
open_db_canary = None
if self._secondary_name:
open_db_canary = os.path.join(self._db_dir, f'{self._secondary_name}-db-canary')
if os.path.exists(open_db_canary):
with open(self._need_restart_path, 'w+') as f:
f.write(f"{time.strftime(f'%Y-%m-%d %H:%M:%S')} {self._secondary_name}\n")
raise RuntimeError('scribe restart is needed')
else:
with open(open_db_canary, 'w'):
pass
else:
herald_db_canary = os.path.join(self._db_dir, 'lbry-reader-db-canary')
es_sync_db_canary = os.path.join(self._db_dir, 'lbry-elastic-writer-db-canary')
if os.path.exists(herald_db_canary):
os.remove(herald_db_canary)
if os.path.exists(es_sync_db_canary):
os.remove(es_sync_db_canary)
if os.path.exists(self._need_restart_path):
os.remove(self._need_restart_path)
db_path = os.path.join(self._db_dir, 'lbry-rocksdb')
self.prefix_db = PrefixDB(
db_path, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,
@ -1027,6 +1030,7 @@ class SecondaryDB:
)
if secondary_path != '':
os.remove(open_db_canary)
self.logger.info(f'opened db for read only: lbry-rocksdb (%s)', db_path)
else:
self.logger.info(f'opened db for writing: lbry-rocksdb (%s)', db_path)
@ -1063,8 +1067,6 @@ class SecondaryDB:
async def initialize_caches(self):
await self._read_tx_counts()
await self._read_block_hashes()
if self._cache_all_claim_txos:
await self._read_claim_txos()
if self._cache_all_tx_hashes:
await self._read_tx_hashes()
if self.db_height > 0:
@ -1154,15 +1156,9 @@ class SecondaryDB:
}
def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]:
if self._cache_all_claim_txos:
return self.claim_to_txo.get(claim_hash)
return self.prefix_db.claim_to_txo.get_pending(claim_hash)
def get_cached_claim_hash(self, tx_num: int, position: int) -> Optional[bytes]:
if self._cache_all_claim_txos:
if tx_num not in self.txo_to_claim:
return
return self.txo_to_claim[tx_num].get(position, None)
v = self.prefix_db.txo_to_claim.get_pending(tx_num, position)
return None if not v else v.claim_hash
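
Illustrative sketch (not part of the diff) of the canary handshake added above; open_with_canary and its arguments are hypothetical names, the real logic lives inline in SecondaryDB. A reader (herald or elastic writer) drops a canary file before opening the rocksdb secondary and removes it once the open succeeds; if the canary is still present on the next attempt, the previous open never finished (typically the process was killed for running low on memory mid-open), so the reader writes NEED_SCRIBE_RESTART and refuses to start. The scribe writer clears stale canaries and the restart flag when it opens the database, and the service.py hunk further down makes scribe raise at startup if the flag exists.

# --- illustrative sketch, not part of the diff ---
import os
import time

def open_with_canary(db_dir: str, reader_name: str, open_db):
    """Hypothetical helper mirroring the SecondaryDB canary logic above."""
    canary = os.path.join(db_dir, f'{reader_name}-db-canary')
    need_restart = os.path.join(db_dir, 'NEED_SCRIBE_RESTART')
    if os.path.exists(canary):
        # a previous open never completed: flag the writer and bail out
        with open(need_restart, 'w+') as f:
            f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} {reader_name}\n")
        raise RuntimeError('scribe restart is needed')
    with open(canary, 'w'):
        pass
    db = open_db()        # if this is OOM-killed, the canary survives for next time
    os.remove(canary)     # only reached when the open completed cleanly
    return db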

View file

@ -273,6 +273,7 @@ class BasePrefixDB:
undo_c_f = self.column_families[DB_PREFIXES.undo.value]
undo_info = self._db.get((undo_c_f, undo_key))
self._op_stack.apply_packed_undo_ops(undo_info)
self._op_stack.validate_and_apply_stashed_ops()
try:
with self._db.write_batch(sync=True) as batch:
batch_put = batch.put

View file

@ -3,6 +3,7 @@ import struct
import array
import base64
from typing import Union, Tuple, NamedTuple, Optional
from hub.common import ResumableSHA256
from hub.db.common import DB_PREFIXES
from hub.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow
from hub.schema.url import normalize_name
@ -1851,6 +1852,46 @@ class FutureEffectiveAmountPrefixRow(PrefixRow):
return cls.pack_key(claim_hash), cls.pack_value(future_effective_amount)
class HashXHistoryHasherKey(NamedTuple):
hashX: bytes
class HashXHistoryHasherValue(NamedTuple):
hasher: ResumableSHA256
class HashXHistoryHasherPrefixRow(PrefixRow):
prefix = DB_PREFIXES.hashX_history_hash.value
key_struct = struct.Struct(b'>11s')
value_struct = struct.Struct(b'>120s')
cache_size = 1024 * 1024 * 64
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>11s').pack
]
@classmethod
def pack_key(cls, hashX: bytes):
return super().pack_key(hashX)
@classmethod
def unpack_key(cls, key: bytes) -> HashXHistoryHasherKey:
return HashXHistoryHasherKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, hasher: ResumableSHA256) -> bytes:
return super().pack_value(hasher.get_state())
@classmethod
def unpack_value(cls, data: bytes) -> HashXHistoryHasherValue:
return HashXHistoryHasherValue(ResumableSHA256(*super().unpack_value(data)))
@classmethod
def pack_item(cls, hashX: bytes, hasher: ResumableSHA256):
return cls.pack_key(hashX), cls.pack_value(hasher)
class PrefixDB(BasePrefixDB):
def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64,
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None,
@ -1897,6 +1938,7 @@ class PrefixDB(BasePrefixDB):
self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack)
self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack)
self.future_effective_amount = FutureEffectiveAmountPrefixRow(db, self._op_stack)
self.hashX_history_hasher = HashXHistoryHasherPrefixRow(db, self._op_stack)
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
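
Illustrative sketch (not part of the diff): the new 'l' column family is keyed by the 11-byte hashX and stores nothing but the 120-byte resumable digest state, so the per-address history hasher written at one height can be re-hydrated and extended at a later height. Assumes the hub package and its dependencies are importable; the hashX and history bytes are made up.

# --- illustrative sketch, not part of the diff ---
from hub.common import ResumableSHA256
from hub.db.prefixes import HashXHistoryHasherPrefixRow

hasher = ResumableSHA256()
hasher.update(b'deadbeef:123:')                    # some confirmed history

key, value = HashXHistoryHasherPrefixRow.pack_item(b'\x00' * 11, hasher)
# the value is the hasher's saved state; unpacking rebuilds an equivalent hasher
restored = HashXHistoryHasherPrefixRow.unpack_value(value).hasher
restored.update(b'cafef00d:124:')                  # keep hashing where we left off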

View file

@ -160,7 +160,7 @@ class RevertableOpStack:
# there is a value and we're not deleting it in this op
# check that a delete for the stored value is in the stack
raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}\nvs\n{stored_val}")
elif not stored_val:
elif not has_stored_val:
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
elif stored_val != op.value:
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
@ -324,9 +324,21 @@ class RevertableOpStack:
"""
Unpack and apply a sequence of undo ops from serialized undo bytes
"""
while packed:
op, packed = RevertableOp.unpack(packed)
self.append_op(op)
offset = 0
packed_size = len(packed)
while offset < packed_size:
is_put, key_len, val_len = _OP_STRUCT.unpack(packed[offset:offset + 9])
offset += 9
key = packed[offset:offset + key_len]
offset += key_len
value = packed[offset:offset + val_len]
offset += val_len
if is_put == 1:
op = RevertablePut(key, value)
else:
op = RevertableDelete(key, value)
self._stash.append(op)
self._stashed_last_op_for_key[op.key] = op
def get_pending_op(self, key: bytes) -> Optional[RevertableOp]:
if key in self._stashed_last_op_for_key:
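
Illustrative sketch (not part of the diff) of the undo-op byte layout the rewritten loop walks. The 9-byte header is assumed to be a big-endian put/delete flag plus two 32-bit lengths ('>BLL'), consistent with the packed[offset:offset + 9] slice above; pack_op and unpack_ops are hypothetical helpers. The speedup appears to come from decoding fields in place and stashing the ops directly, with validation deferred to the single validate_and_apply_stashed_ops call added in the BasePrefixDB hunk above, rather than pushing every op through append_op while unpacking.

# --- illustrative sketch, not part of the diff ---
import struct

_OP_STRUCT = struct.Struct(b'>BLL')   # assumed layout: is_put flag, key length, value length

def pack_op(is_put: bool, key: bytes, value: bytes) -> bytes:
    return _OP_STRUCT.pack(1 if is_put else 0, len(key), len(value)) + key + value

def unpack_ops(packed: bytes):
    offset, end = 0, len(packed)
    while offset < end:
        is_put, key_len, val_len = _OP_STRUCT.unpack(packed[offset:offset + 9])
        offset += 9
        key = packed[offset:offset + key_len]
        offset += key_len
        value = packed[offset:offset + val_len]
        offset += val_len
        yield bool(is_put), key, value

packed = pack_op(True, b'key1', b'val1') + pack_op(False, b'key2', b'val2')
assert list(unpack_ops(packed)) == [(True, b'key1', b'val1'), (False, b'key2', b'val2')]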

View file

@ -9,11 +9,11 @@ from hub.db.common import ResolveResult
class ElasticSyncDB(SecondaryDB):
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
cache_all_tx_hashes: bool = False,
blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False):
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit,
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
index_address_status)
self.block_timestamp_cache = LRUCache(1024)

View file

@ -3,11 +3,11 @@ from hub.env import Env
class ElasticEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None,
cache_all_tx_hashes=None, cache_all_claim_txos=None, elastic_host=None, elastic_port=None,
cache_all_tx_hashes=None, elastic_host=None, elastic_port=None,
es_index_prefix=None, elastic_notifier_host=None, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None, reindex=False):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
blocking_channel_ids, filtering_channel_ids)
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
@ -43,7 +43,7 @@ class ElasticEnv(Env):
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit,
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, blocking_channel_ids=args.blocking_channel_ids,
blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
elastic_notifier_port=args.elastic_notifier_port
)

View file

@ -49,7 +49,7 @@ class ElasticSyncService(BlockchainReaderService):
def open_db(self):
env = self.env
self.db = ElasticSyncDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status

View file

@ -30,7 +30,7 @@ class Env:
pass
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
prometheus_port=None, cache_all_tx_hashes=None,
blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None):
self.logger = logging.getLogger(__name__)
@ -46,7 +46,6 @@ class Env:
self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0)
self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False)
self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False)
# Filtering / Blocking
self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default(
'BLOCKING_CHANNEL_IDS', '').split(' ')
@ -171,11 +170,6 @@ class Env:
"resolve, transaction fetching, and block sync all faster at the expense of higher "
"memory usage (at least 10GB more). Can be set in env with 'CACHE_ALL_TX_HASHES'.",
default=cls.boolean('CACHE_ALL_TX_HASHES', False))
parser.add_argument('--cache_all_claim_txos', action='store_true',
help="Load all claim txos into memory. This will make address subscriptions and sync, "
"resolve, transaction fetching, and block sync all faster at the expense of higher "
"memory usage. Can be set in env with 'CACHE_ALL_CLAIM_TXOS'.",
default=cls.boolean('CACHE_ALL_CLAIM_TXOS', False))
parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0),
help="Port for prometheus metrics to listen on, disabled by default. "
"Can be set in env with 'PROMETHEUS_PORT'.")

View file

@ -6,11 +6,11 @@ from hub.db import SecondaryDB
class HeraldDB(SecondaryDB):
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
cache_all_tx_hashes: bool = False,
blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768):
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit,
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
index_address_status, merkle_cache_size, tx_cache_size)
# self.headers = None

View file

@ -19,7 +19,7 @@ def parse_es_services(elastic_services_arg: str):
class ServerEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
prometheus_port=None, cache_all_tx_hashes=None,
daemon_url=None, host=None, elastic_services=None, es_index_prefix=None,
tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
@ -29,7 +29,7 @@ class ServerEnv(Env):
merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
history_tx_cache_size=None, largest_address_history_cache_size=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
blocking_channel_ids, filtering_channel_ids, index_address_status)
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080'))
@ -153,7 +153,7 @@ class ServerEnv(Env):
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, country=args.country, payment_address=args.payment_address,
country=args.country, payment_address=args.payment_address,
donation_address=args.donation_address, max_send=args.max_send, max_receive=args.max_receive,
max_sessions=args.max_sessions, session_timeout=args.session_timeout,
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,

View file

@ -53,7 +53,7 @@ class HubServerService(BlockchainReaderService):
def open_db(self):
env = self.env
self.db = HeraldDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status, merkle_cache_size=env.merkle_cache_size,

View file

@ -5,17 +5,17 @@ import time
from typing import List
from concurrent.futures.thread import ThreadPoolExecutor
from bisect import bisect_right
from hub.common import sha256
from hub.common import ResumableSHA256
from hub.db import SecondaryDB
class PrimaryDB(SecondaryDB):
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
cache_all_tx_hashes: bool = False,
max_open_files: int = 64, blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False, enforce_integrity=True):
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes,
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_tx_hashes,
blocking_channel_ids, filtering_channel_ids, executor, index_address_status,
enforce_integrity=enforce_integrity)
@ -35,16 +35,19 @@ class PrimaryDB(SecondaryDB):
if last_hashX:
yield last_hashX
def hashX_status_from_history(history: bytes) -> bytes:
def hashX_status_from_history(history: bytes) -> ResumableSHA256:
tx_counts = self.tx_counts
hist_tx_nums = array.array('I')
hist_tx_nums.frombytes(history)
digest = hashlib.sha256()
for tx_num, tx_hash in zip(
digest = ResumableSHA256()
digest.update(
b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode()
for tx_num, tx_hash in zip(
hist_tx_nums,
self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)):
digest.update(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode())
return digest.digest()
self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
))
)
return digest
start = time.perf_counter()
@ -67,17 +70,24 @@ class PrimaryDB(SecondaryDB):
hashX_cnt += 1
key = prefix_db.hashX_status.pack_key(hashX)
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
status = hashX_status_from_history(history)
digester = hashX_status_from_history(history)
status = digester.digest()
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
if existing_status and existing_status == status:
continue
elif not existing_status:
existing_digester = prefix_db.hashX_history_hasher.get(hashX)
if not existing_status:
prefix_db.stash_raw_put(key, status)
op_cnt += 1
else:
prefix_db.stash_raw_delete(key, existing_status)
prefix_db.stash_raw_put(key, status)
op_cnt += 2
if not existing_digester:
prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
op_cnt += 1
else:
prefix_db.hashX_history_hasher.stash_delete((hashX,), existing_digester)
prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
op_cnt += 2
if op_cnt > 100000:
prefix_db.unsafe_commit()
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
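
Illustrative sketch (not part of the diff) of why persisting the digester pays off: the address status is the SHA-256 of the full 'txid:height:' history string, and with the stored state it can be advanced by hashing only the new suffix instead of re-reading and re-hashing the whole history on every update. Assumes the hub package is importable; the byte strings are made-up history entries.

# --- illustrative sketch, not part of the diff ---
import hashlib
from hub.common import ResumableSHA256

confirmed_history = b'aa11:1:bb22:2:'     # history hashed at an earlier height
new_entries = b'cc33:3:'                  # entries added by the current block

# old approach: hash the whole history from scratch
full_status = hashlib.sha256(confirmed_history + new_entries).digest()

# new approach: restore the persisted digester state and feed it only the suffix
checkpoint = ResumableSHA256()
checkpoint.update(confirmed_history)
state = checkpoint.get_state()            # what hashX_history_hasher stores

resumed = ResumableSHA256(state)
resumed.update(new_entries)
assert resumed.digest() == full_status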

View file

@ -3,14 +3,13 @@ from hub.env import Env
class BlockchainEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
blocking_channel_ids=None, filtering_channel_ids=None,
prometheus_port=None, cache_all_tx_hashes=None, blocking_channel_ids=None, filtering_channel_ids=None,
db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
index_address_status=None, rebuild_address_status_from_height=None,
daemon_ca_path=None, history_tx_cache_size=None,
db_disable_integrity_checks=False):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
blocking_channel_ids, filtering_channel_ids, index_address_status)
self.db_max_open_files = db_max_open_files
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \
@ -57,7 +56,7 @@ class BlockchainEnv(Env):
db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit,
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
index_address_status=args.index_address_statuses,
hashX_history_cache_size=args.address_history_cache_size,
rebuild_address_status_from_height=args.rebuild_address_status_from_height,
daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size,

View file

@ -1,3 +1,4 @@
import os
import time
import asyncio
import typing
@ -11,7 +12,8 @@ from hub import PROMETHEUS_NAMESPACE
from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from hub.error.base import ChainError
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache, LFUCacheWithMetrics
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LFUCache
from hub.common import ResumableSHA256, LFUCacheWithMetrics
from hub.scribe.db import PrimaryDB
from hub.scribe.daemon import LBCDaemon
from hub.scribe.transaction import Tx, TxOutput, TxInput, Block
@ -137,11 +139,10 @@ class BlockchainProcessorService(BlockchainService):
def open_db(self):
env = self.env
self.db = PrimaryDB(
env.coin, env.db_dir, env.reorg_limit, cache_all_claim_txos=env.cache_all_claim_txos,
cache_all_tx_hashes=env.cache_all_tx_hashes, max_open_files=env.db_max_open_files,
blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids,
executor=self._executor, index_address_status=env.index_address_status,
enforce_integrity=not env.db_disable_integrity_checks
env.coin, env.db_dir, env.reorg_limit, cache_all_tx_hashes=env.cache_all_tx_hashes,
max_open_files=env.db_max_open_files, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status, enforce_integrity=not env.db_disable_integrity_checks
)
async def run_in_thread_with_lock(self, func, *args):
@ -169,10 +170,22 @@ class BlockchainProcessorService(BlockchainService):
def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
self.mempool.remove(to_delete)
touched_hashXs = self.mempool.update_mempool(to_put)
touched_hashXs = list(self.mempool.update_mempool(to_put))
if self.env.index_address_status:
for hashX in touched_hashXs:
self._get_update_hashX_mempool_status_ops(hashX)
status_hashers = {
k: v.hasher if v else ResumableSHA256() for k, v in zip(
touched_hashXs,
self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in touched_hashXs])
)
}
for hashX, v in zip(
touched_hashXs,
self.db.prefix_db.hashX_mempool_status.multi_get([(hashX,) for hashX in touched_hashXs])):
if v is not None:
self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), v)
hasher = status_hashers[hashX]
hasher.update(self.mempool.mempool_history(hashX).encode())
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (hasher.digest(),))
for tx_hash, raw_tx in to_put:
mempool_prefix.stash_put((tx_hash,), (raw_tx,))
for tx_hash, raw_tx in to_delete.items():
@ -263,9 +276,6 @@ class BlockchainProcessorService(BlockchainService):
for _ in range(count):
await self.run_in_thread_with_lock(self.backup_block)
self.log.info(f'backed up to height {self.height:,d}')
if self.env.cache_all_claim_txos:
await self.db._read_claim_txos() # TODO: don't do this
await self.prefetcher.reset_height(self.height)
self.reorg_count_metric.inc()
except:
@ -394,12 +404,6 @@ class BlockchainProcessorService(BlockchainService):
if claim_hash not in self.updated_claim_previous_activations:
self.updated_claim_previous_activations[claim_hash] = activation
if self.env.cache_all_claim_txos:
self.db.claim_to_txo[claim_hash] = ClaimToTXOValue(
tx_num, nout, root_tx_num, root_idx, txo.value, channel_signature_is_valid, claim_name
)
self.db.txo_to_claim[tx_num][nout] = claim_hash
pending = StagedClaimtrieItem(
claim_name, normalized_name, claim_hash, txo.value, self.coin.get_expiration_height(height), tx_num, nout,
root_tx_num, root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash
@ -690,11 +694,6 @@ class BlockchainProcessorService(BlockchainService):
if 0 < activation <= self.height:
self.effective_amount_delta[claim_hash] -= spent.amount
self.future_effective_amount_delta[spent.claim_hash] -= spent.amount
if self.env.cache_all_claim_txos:
claim_hash = self.db.txo_to_claim[txin_num].pop(nout)
if not self.db.txo_to_claim[txin_num]:
self.db.txo_to_claim.pop(txin_num)
self.db.claim_to_txo.pop(claim_hash)
if spent.reposted_claim_hash:
self.pending_reposted.add(spent.reposted_claim_hash)
if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims:
@ -1637,15 +1636,6 @@ class BlockchainProcessorService(BlockchainService):
self.hashX_full_cache[hashX] = history
return history
def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
if existing:
self.db.prefix_db.hashX_mempool_status.stash_delete((hashX,), existing)
history = self._get_cached_hashX_history(hashX) + self.mempool.mempool_history(hashX)
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_mempool_status.stash_put((hashX,), (status,))
def advance_block(self, block: Block):
txo_count = 0
txi_count = 0
@ -1745,9 +1735,7 @@ class BlockchainProcessorService(BlockchainService):
# update hashX history status hashes and compactify the histories
self._get_update_hashX_histories_ops(height)
# only compactify adddress histories and update the status index if we're already caught up,
# a bulk update will happen once catchup finishes
if not self.db.catching_up and self.env.index_address_status:
if self.env.index_address_status:
self._get_compactify_ops(height)
self.db.last_indexed_address_status_height = height
@ -1802,6 +1790,17 @@ class BlockchainProcessorService(BlockchainService):
)
def _get_compactify_ops(self, height: int):
def _rebuild_hasher(hist_tx_nums):
hasher = ResumableSHA256()
hasher.update(
b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num)}:'.encode()
for tx_num, tx_hash in zip(
hist_tx_nums,
self.db.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
))
)
return hasher
existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False)
if existing_hashX_statuses:
pack_key = self.db.prefix_db.hashX_status.pack_key
@ -1816,6 +1815,13 @@ class BlockchainProcessorService(BlockchainService):
append_deletes_hashX_history = block_hashX_history_deletes.append
block_hashX_history_puts = []
existing_status_hashers = {
k: v.hasher if v else None for k, v in zip(
self.hashXs_by_tx,
self.db.prefix_db.hashX_history_hasher.multi_get([(hashX,) for hashX in self.hashXs_by_tx])
)
}
for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses):
new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums]
@ -1830,11 +1836,9 @@ class BlockchainProcessorService(BlockchainService):
unpack_key = self.db.prefix_db.hashX_history.unpack_key
needs_compaction = False
total_hist_txs = b''
for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False,
deserialize_value=False):
hist_txs = unpack_history(hist)
total_hist_txs += hist
txs_extend(hist_txs)
hist_height = unpack_key(k).height
if height > reorg_limit and hist_height < height - reorg_limit:
@ -1853,27 +1857,19 @@ class BlockchainProcessorService(BlockchainService):
block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,)))
if not new_history:
continue
needed_tx_infos = []
append_needed_tx_info = needed_tx_infos.append
tx_infos = {}
for tx_num in tx_nums:
cached_tx_info = self.history_tx_info_cache.get(tx_num)
if cached_tx_info is not None:
tx_infos[tx_num] = cached_tx_info
else:
append_needed_tx_info(tx_num)
if needed_tx_infos:
for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)):
tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
tx_infos[tx_num] = tx_info
self.history_tx_info_cache[tx_num] = tx_info
history = ''.join(map(tx_infos.__getitem__, tx_nums))
for tx_hash, height in new_history:
history += f'{tx_hash[::-1].hex()}:{height:d}:'
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
hasher = existing_status_hashers[hashX]
if hasher is None:
# this is to migrate in the new column family, in the future it can be a new digester
# hasher = ResumableSHA256()
hasher = _rebuild_hasher(tx_nums)
else:
self.db.prefix_db.hashX_history_hasher.stash_delete((hashX,), (hasher,))
hasher.update(b''.join(
f'{tx_hash[::-1].hex()}:{height:d}:'.encode() for tx_hash, height in new_history
))
self.db.prefix_db.hashX_history_hasher.stash_put((hashX,), (hasher,))
status = hasher.digest()
self.db.prefix_db.hashX_status.stash_put((hashX,), (status,))
self.db.prefix_db.multi_delete(block_hashX_history_deletes)
self.db.prefix_db.hashX_history.stash_multi_put(block_hashX_history_puts)
@ -2074,6 +2070,8 @@ class BlockchainProcessorService(BlockchainService):
"""Loop forever processing blocks as they arrive."""
self._caught_up_event = caught_up_event
try:
if os.path.exists(self.db._need_restart_path):
raise RuntimeError('scribe restart is needed')
if self.height != self.daemon.cached_height() and not self.db.catching_up:
await self._need_catch_up() # tell the readers that we're still catching up with lbrycrd/lbcd
while not self._stopping:
@ -2135,9 +2133,6 @@ class BlockchainProcessorService(BlockchainService):
async def _finished_initial_catch_up(self):
self.log.info(f'caught up to height {self.height}')
if self.env.index_address_status and self.db.last_indexed_address_status_height < self.db.db_height:
await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height)
# Flush everything but with catching_up->False state.
self.db.catching_up = False
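
Illustrative sketch (not part of the diff) of the mempool-status path above: the hasher loaded from hashX_history_hasher already covers the confirmed history, so the mempool status only needs the unconfirmed entries appended to a throwaway copy, and nothing is written back to the hasher column. Assumes the hub package is importable; the history strings are made up.

# --- illustrative sketch, not part of the diff ---
from copy import copy
from hub.common import ResumableSHA256

confirmed = ResumableSHA256()
confirmed.update(b'aa11:1:bb22:2:')       # state as stored for this hashX

mempool_entries = b'cc33:0:'              # unconfirmed history for the same hashX
scratch = copy(confirmed)                 # __copy__ round-trips get_state()
scratch.update(mempool_entries)

mempool_status = scratch.digest()         # what gets stashed into hashX_mempool_status
confirmed_status = confirmed.digest()     # unchanged by the mempool update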

View file

@ -37,7 +37,7 @@ class BlockchainService:
def open_db(self):
env = self.env
self.db = SecondaryDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status

View file

@ -44,7 +44,8 @@ setup(
'filetype==1.0.9',
'grpcio==1.38.0',
'lbry-rocksdb==0.8.2',
'ujson==5.4.0'
'ujson==5.4.0',
'rehash==1.0.0'
],
extras_require={
'lint': ['pylint==2.10.0'],