Compare commits

...

51 commits

Author SHA1 Message Date
Jack Robison
ebcc6e5086
update snapshot url in example docker-composes 2023-03-03 13:15:25 -05:00
Jack Robison
c0766f6abc faster RevertableOpStack.apply_packed_undo_ops 2023-02-13 13:09:03 -05:00
Jack Robison
7bc90c425f
fix error 2023-02-07 17:31:36 -05:00
Jack Robison
f2c494d4d6 check for scribe needing a restart due to low memory 2023-02-07 17:23:23 -05:00
Jack Robison
8147bbf3b9 remove --cache_all_claim_txos setting 2023-02-07 17:23:23 -05:00
Jack Robison
adbeeaf203 update snapshot 2023-01-06 20:32:45 -05:00
Jack Robison
f55ed56215 add comment 2023-01-06 20:32:45 -05:00
Jack Robison
d1d33c4bce feedback 2023-01-06 20:32:45 -05:00
Jack Robison
b7de08ba0b add ResumableSHA256 and HashXHistoryHasherPrefixRow column family 2023-01-06 20:32:45 -05:00
Jack Robison
405cef8d28 ResumableSHA256 2023-01-06 20:32:45 -05:00
Jack Robison
21262d2e43
fix edge case deleting an empty value 2022-12-28 13:40:19 -05:00
dependabot[bot]
75d64f9dc6 Bump protobuf from 3.17.2 to 3.18.3
Bumps [protobuf](https://github.com/protocolbuffers/protobuf) from 3.17.2 to 3.18.3.
- [Release notes](https://github.com/protocolbuffers/protobuf/releases)
- [Changelog](https://github.com/protocolbuffers/protobuf/blob/main/generate_changelog.py)
- [Commits](https://github.com/protocolbuffers/protobuf/compare/v3.17.2...v3.18.3)

---
updated-dependencies:
- dependency-name: protobuf
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-11-13 00:01:36 -05:00
Jack Robison
929448d64b collect db values for integrity check in chunks 2022-11-12 23:58:36 -05:00
Jack Robison
134aad29f1 fix issue with supports with wrong names 2022-11-12 23:58:36 -05:00
Jack Robison
dcd4d7a7a8 update docker entrypoint and example composes 2022-11-09 22:06:24 -05:00
Jack Robison
e0c26c0952 update docs 2022-11-09 22:06:24 -05:00
Jack Robison
9fa2d0b6ca batched update blocked/filtered in ES 2022-11-09 22:06:24 -05:00
Jack Robison
0877e34289 use multi_get to make status hash 2022-11-09 22:06:24 -05:00
Jack Robison
92c3d3840d fix address history 2022-11-09 22:06:24 -05:00
Jack Robison
98017e9a76 fix uncaught error in es sync 2022-11-09 22:06:24 -05:00
Jack Robison
6932c38854 logging 2022-11-09 22:06:24 -05:00
Jack Robison
ecdb5ef859 logging 2022-11-09 22:06:24 -05:00
Jack Robison
81c44f5f4e assert 2022-11-09 22:06:24 -05:00
Jack Robison
59c07e315a fix https://github.com/lbryio/hub/issues/104 2022-11-09 22:06:24 -05:00
Jack Robison
07f8ec41a1 fix including early activated claim amount during takeover due to abandon 2022-11-09 22:06:24 -05:00
Jack Robison
01e52e795a fix edge case activating claim updates 2022-11-09 22:06:24 -05:00
Jack Robison
98ec0f5e0c fix supports that activate on the block the claim is abandoned 2022-11-09 22:06:24 -05:00
Jack Robison
eb3d7a183a fix spending claims and supports on the block they activate 2022-11-09 22:06:24 -05:00
Jack Robison
93aebedd7d tests 2022-11-09 22:06:24 -05:00
Jack Robison
616daec0d9 dead code 2022-11-09 22:06:24 -05:00
Jack Robison
745462d3ae migrate 11 to 12 2022-11-09 22:06:24 -05:00
Jack Robison
0634c1e05f fix handling supports without claims 2022-11-09 22:06:24 -05:00
Jack Robison
b230b693f8 effective amount 2022-11-09 22:06:24 -05:00
Jack Robison
97a0d00bfd faster active amount as of height 2022-11-09 22:06:24 -05:00
Jack Robison
f2f0f426aa stash 2022-11-09 22:06:24 -05:00
Jack Robison
7be5905b46 refactor to use batched pending and future effective amounts 2022-11-09 22:06:24 -05:00
Jack Robison
7c9e91f91a batched _get_pending_effective_amounts 2022-11-09 22:06:24 -05:00
Jack Robison
586e9a613b logging 2022-11-09 22:06:24 -05:00
Jack Robison
e61efcd00d future effective amount index 2022-11-09 22:06:24 -05:00
Jack Robison
9311d924f7 improve logging 2022-11-09 22:06:24 -05:00
Jack Robison
8947d3cb19 increase UTXO and HashXUTXO cache sizes 2022-11-09 22:06:24 -05:00
Jack Robison
2150363108 batch spend claims and supports 2022-11-09 22:06:24 -05:00
Jack Robison
6a9a2ad40f batch spend utxos 2022-11-09 22:06:24 -05:00
Jack Robison
55eb8818ea add --db_disable_integrity_checks option to scribe 2022-11-09 22:06:24 -05:00
Jack Robison
a48564e3b2 batched db integrity checks 2022-11-09 22:06:24 -05:00
Jack Robison
606e9bb0d6 support multiple column families in one raw multi_get call 2022-11-09 22:06:24 -05:00
Jack Robison
abc5184e19 add future effective amount index
-increase lru cache size for effective amount column famlily to 64mb
2022-11-09 22:06:24 -05:00
Jack Robison
99ddd208db add Block.decoded_header helper property 2022-11-09 22:06:24 -05:00
Jack Robison
fc234b12e5 handle es_info being an empty file 2022-11-09 22:06:24 -05:00
Jack Robison
04d747ff99 failover support for elastic-sync-notifier and elasticsearch
deprecates herald options `elastic_host`, `elastic_port`, `elastic_notifier_host`, and `elastic_notifier_port` in favor of the single new `elastic_services` option
2022-11-09 22:06:24 -05:00
Jack Robison
4586b344ce move search_index object to HubServerService 2022-11-09 22:06:24 -05:00
29 changed files with 1337 additions and 681 deletions

View file

@ -100,10 +100,7 @@ For various reasons it may be desirable to block or filtering content from claim
- `--host` Interface for server to listen on, use 0.0.0.0 to listen on the external interface. Can be set from the environment with `HOST`
- `--tcp_port` Electrum TCP port to listen on for hub server. Can be set from the environment with `TCP_PORT`
- `--udp_port` UDP port to listen on for hub server. Can be set from the environment with `UDP_PORT`
- `--elastic_host` Hostname or ip address of the elasticsearch instance to connect to. Can be set from the environment with `ELASTIC_HOST`
- `--elastic_port` Elasticsearch port to connect to. Can be set from the environment with `ELASTIC_PORT`
- `--elastic_notifier_host` Elastic sync notifier host to connect to, defaults to localhost. Can be set from the environment with `ELASTIC_NOTIFIER_HOST`
- `--elastic_notifier_port` Elastic sync notifier port to connect using. Can be set from the environment with `ELASTIC_NOTIFIER_PORT`
- `--elastic_services` Comma separated list of items in the format `elastic_host:elastic_port/notifier_host:notifier_port`. Can be set from the environment with `ELASTIC_SERVICES`
- `--query_timeout_ms` Timeout for claim searches in elasticsearch in milliseconds. Can be set from the environment with `QUERY_TIMEOUT_MS`
- `--blocking_channel_ids` Space separated list of channel claim ids used for blocking. Claims that are reposted by these channels can't be resolved or returned in search results. Can be set from the environment with `BLOCKING_CHANNEL_IDS`.
- `--filtering_channel_ids` Space separated list of channel claim ids used for blocking. Claims that are reposted by these channels aren't returned in search results. Can be set from the environment with `FILTERING_CHANNEL_IDS`

View file

@ -17,11 +17,10 @@ services:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/lbry-rocksdb.zip
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar
command: # for full options, see `scribe --help`
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--max_query_workers=2"
# - "--cache_all_tx_hashes" # uncomment to keep an index of all tx hashes in memory. This uses lots (10+g) of memory but substantially improves performance.
- "--index_address_statuses"
scribe_elastic_sync:
depends_on:
@ -35,14 +34,12 @@ services:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe-elastic-sync
- FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6
- FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8 d4612c256a44fc025c37a875751415299b1f8220
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6 145265bd234b7c9c28dfc6857d878cca402dda94 22335fbb132eee86d374b613875bf88bec83492f f665b89b999f411aa5def311bb2eb385778d49c8
command: # for full options, see `scribe-elastic-sync --help`
- "--max_query_workers=2"
- "--elastic_host=127.0.0.1" # elasticsearch host
- "--elastic_port=9200" # elasticsearch port
- "--elastic_notifier_host=127.0.0.1" # address for the elastic sync notifier to connect to
- "--elastic_notifier_port=19080"
herald:
depends_on:
- lbcd
@ -58,18 +55,15 @@ services:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=herald
- FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6
- FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8 d4612c256a44fc025c37a875751415299b1f8220
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6 145265bd234b7c9c28dfc6857d878cca402dda94 22335fbb132eee86d374b613875bf88bec83492f f665b89b999f411aa5def311bb2eb385778d49c8
command: # for full options, see `herald --help`
- "--index_address_statuses"
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--max_query_workers=4"
- "--host=0.0.0.0"
- "--elastic_services=127.0.0.1:9200/127.0.0.1:19080"
- "--prometheus_port=2112" # comment out to disable prometheus metrics
# - "--elastic_host=127.0.0.1" # elasticsearch host
# - "--elastic_port=9200" # elasticsearch port
# - "--elastic_notifier_host=127.0.0.1" # address for the elastic sync notifier to connect to
# - "--elastic_notifier_port=19080"
# - "--max_sessions=100000 # uncomment to increase the maximum number of electrum connections, defaults to 1000
# - "--allow_lan_udp" # uncomment to reply to clients on the local network
es01:

View file

@ -14,11 +14,10 @@ services:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/lbry-rocksdb.zip
- SNAPSHOT_URL=https://snapshots.lbry.com/hub/block_1312050/lbry-rocksdb.tar
command:
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--max_query_workers=2"
- "--cache_all_tx_hashes"
scribe_elastic_sync:
image: lbry/hub:${SCRIBE_TAG:-master}
restart: always
@ -29,8 +28,8 @@ services:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe-elastic-sync
- FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6
- FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8 d4612c256a44fc025c37a875751415299b1f8220
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6 145265bd234b7c9c28dfc6857d878cca402dda94 22335fbb132eee86d374b613875bf88bec83492f f665b89b999f411aa5def311bb2eb385778d49c8
command:
- "--elastic_host=127.0.0.1"
- "--elastic_port=9200"
@ -49,12 +48,11 @@ services:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=herald
- FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6
- FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8 d4612c256a44fc025c37a875751415299b1f8220
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6 145265bd234b7c9c28dfc6857d878cca402dda94 22335fbb132eee86d374b613875bf88bec83492f f665b89b999f411aa5def311bb2eb385778d49c8
command:
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--elastic_host=127.0.0.1"
- "--elastic_port=9200"
- "--elastic_services=127.0.0.1:9200/127.0.0.1:19080"
- "--max_query_workers=4"
- "--host=0.0.0.0"
- "--max_sessions=100000"

View file

@ -7,12 +7,14 @@ import logging
import logging.handlers
import typing
import collections
from ctypes import cast, memmove, POINTER, c_void_p
from bisect import insort_right
from collections import deque
from decimal import Decimal
from typing import Iterable, Deque
from asyncio import get_event_loop, Event
from prometheus_client import Counter
from rehash.structs import EVPobject
from hub.schema.tags import clean_tags
from hub.schema.url import normalize_name
from hub.error import TooManyClaimSearchParametersError
@ -1059,3 +1061,41 @@ async def asyncify_for_loop(gen, ticks_per_sleep: int = 1000):
yield item
if cnt % ticks_per_sleep == 0:
await async_sleep(0)
_SHA256_DIGEST_STATE_SIZE = 120
class ResumableSHA256:
__slots__ = ['_hasher']
def __init__(self, state: typing.Optional[bytes] = None):
self._hasher = hashlib.sha256()
if state is not None:
ctx = self._get_evp_md_ctx()
ctx_size = ctx.digest.contents.ctx_size
if len(state) != _SHA256_DIGEST_STATE_SIZE != ctx_size:
raise Exception(f'invalid sha256 digester state, got {len(state)} bytes')
memmove(ctx.md_data, state, ctx_size)
def _get_evp_md_ctx(self):
c_evp_obj = cast(c_void_p(id(self._hasher)), POINTER(EVPobject))
if hasattr(c_evp_obj.contents.ctx, "contents"):
return c_evp_obj.contents.ctx.contents
else:
return c_evp_obj.contents.ctx
def get_state(self) -> bytes:
ctx = self._get_evp_md_ctx()
ctx_size = ctx.digest.contents.ctx_size
hasher_state = ctx.md_data[:ctx_size]
return hasher_state
def __copy__(self):
return ResumableSHA256(self.get_state())
def update(self, data: bytes):
self._hasher.update(data)
def digest(self):
return self._hasher.digest()

View file

@ -50,6 +50,8 @@ class DB_PREFIXES(enum.Enum):
hashX_mempool_status = b'g'
reposted_count = b'j'
effective_amount = b'i'
future_effective_amount = b'k'
hashX_history_hash = b'l'
COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass

View file

@ -34,25 +34,27 @@ NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db"
class SecondaryDB:
DB_VERSIONS = [7, 8, 9, 10, 11]
DB_VERSIONS = [7, 8, 9, 10, 11, 12]
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
cache_all_tx_hashes: bool = False,
blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768):
index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768,
enforce_integrity=True):
self.logger = logging.getLogger(__name__)
self.coin = coin
self._executor = executor
self._db_dir = db_dir
self._reorg_limit = reorg_limit
self._cache_all_claim_txos = cache_all_claim_txos
self._cache_all_tx_hashes = cache_all_tx_hashes
self._secondary_name = secondary_name
self._need_restart_path = os.path.join(self._db_dir, 'NEED_SCRIBE_RESTART')
if secondary_name:
assert max_open_files == -1, 'max open files must be -1 for secondary readers'
self._db_max_open_files = max_open_files
self._index_address_status = index_address_status
self._enforce_integrity = enforce_integrity
self.prefix_db: typing.Optional[PrefixDB] = None
self.hist_unflushed = defaultdict(partial(array.array, 'I'))
@ -98,9 +100,6 @@ class SecondaryDB:
self.total_transactions: List[bytes] = []
self.tx_num_mapping: Dict[bytes, int] = {}
# these are only used if the cache_all_claim_txos setting is on
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)
def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
@ -522,11 +521,16 @@ class SecondaryDB:
if not v:
return 0
return v.activated_sum - v.activated_support_sum
amount = 0
v = self.prefix_db.effective_amount.get(claim_hash)
if v:
amount = v.activated_sum - v.activated_support_sum
for v in self.prefix_db.active_amount.iterate(
start=(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, 0), stop=(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, height),
start=(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, self.db_height + 1),
stop=(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, height),
include_key=False, reverse=True):
return v.amount
return 0
return amount
def get_effective_amount(self, claim_hash: bytes) -> int:
v = self.prefix_db.effective_amount.get(claim_hash)
@ -783,7 +787,7 @@ class SecondaryDB:
for touched in claim_hashes:
extra = {}
claim_txo = claims[touched]
if not claim_txo:
if not claim_txo or (claim_txo and not controlling_claims[claim_txo.normalized_name]):
yield touched, None, extra
continue
if touched in total_extra:
@ -949,21 +953,6 @@ class SecondaryDB:
else:
assert self.db_tx_count == 0
async def _read_claim_txos(self):
def read_claim_txos():
set_claim_to_txo = self.claim_to_txo.__setitem__
for k, v in self.prefix_db.claim_to_txo.iterate(fill_cache=False):
set_claim_to_txo(k.claim_hash, v)
self.txo_to_claim[v.tx_num][v.position] = k.claim_hash
self.claim_to_txo.clear()
self.txo_to_claim.clear()
start = time.perf_counter()
self.logger.info("loading claims")
await asyncio.get_event_loop().run_in_executor(self._executor, read_claim_txos)
ts = time.perf_counter() - start
self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4))
# async def _read_headers(self):
# # if self.headers is not None:
# # return
@ -1012,14 +1001,36 @@ class SecondaryDB:
secondary_path = '' if not self._secondary_name else os.path.join(
self._db_dir, self._secondary_name
)
open_db_canary = None
if self._secondary_name:
open_db_canary = os.path.join(self._db_dir, f'{self._secondary_name}-db-canary')
if os.path.exists(open_db_canary):
with open(self._need_restart_path, 'w+') as f:
f.write(f"{time.strftime(f'%Y-%m-%d %H:%M:%S')} {self._secondary_name}\n")
raise RuntimeError('scribe restart is needed')
else:
with open(open_db_canary, 'w'):
pass
else:
herald_db_canary = os.path.join(self._db_dir, 'lbry-reader-db-canary')
es_sync_db_canary = os.path.join(self._db_dir, 'lbry-elastic-writer-db-canary')
if os.path.exists(herald_db_canary):
os.remove(herald_db_canary)
if os.path.exists(es_sync_db_canary):
os.remove(es_sync_db_canary)
if os.path.exists(self._need_restart_path):
os.remove(self._need_restart_path)
db_path = os.path.join(self._db_dir, 'lbry-rocksdb')
self.prefix_db = PrefixDB(
db_path, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,
unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix, HashXMempoolStatusPrefixRow.prefix},
secondary_path=secondary_path
secondary_path=secondary_path, enforce_integrity=self._enforce_integrity
)
if secondary_path != '':
os.remove(open_db_canary)
self.logger.info(f'opened db for read only: lbry-rocksdb (%s)', db_path)
else:
self.logger.info(f'opened db for writing: lbry-rocksdb (%s)', db_path)
@ -1056,8 +1067,6 @@ class SecondaryDB:
async def initialize_caches(self):
await self._read_tx_counts()
await self._read_block_hashes()
if self._cache_all_claim_txos:
await self._read_claim_txos()
if self._cache_all_tx_hashes:
await self._read_tx_hashes()
if self.db_height > 0:
@ -1135,22 +1144,24 @@ class SecondaryDB:
return self.tx_num_mapping[tx_hash]
return self.prefix_db.tx_num.get(tx_hash).tx_num
def get_tx_nums(self, tx_hashes: List[bytes]) -> Dict[bytes, Optional[int]]:
if not tx_hashes:
return {}
if self._cache_all_tx_hashes:
return {tx_hash: self.tx_num_mapping[tx_hash] for tx_hash in tx_hashes}
return {
k: None if not v else v.tx_num for k, v in zip(
tx_hashes, self.prefix_db.tx_num.multi_get([(tx_hash,) for tx_hash in tx_hashes])
)
}
def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]:
if self._cache_all_claim_txos:
return self.claim_to_txo.get(claim_hash)
return self.prefix_db.claim_to_txo.get_pending(claim_hash)
def get_cached_claim_hash(self, tx_num: int, position: int) -> Optional[bytes]:
if self._cache_all_claim_txos:
if tx_num not in self.txo_to_claim:
return
return self.txo_to_claim[tx_num].get(position, None)
v = self.prefix_db.txo_to_claim.get_pending(tx_num, position)
return None if not v else v.claim_hash
def get_cached_claim_exists(self, tx_num: int, position: int) -> bool:
return self.get_cached_claim_hash(tx_num, position) is not None
# Header merkle cache
async def populate_header_merkle_cache(self):
@ -1306,6 +1317,7 @@ class SecondaryDB:
for tx_hash_bytes, tx in zip(needed_mempool, await run_in_executor(
self._executor, self.prefix_db.mempool_tx.multi_get, [(tx_hash,) for tx_hash in needed_mempool],
True, False)):
if tx is not None:
self.tx_cache[tx_hash_bytes] = tx, None, None, -1
tx_infos[tx_hash_bytes[::-1].hex()] = None if not tx else tx.hex(), {'block_height': -1}
await asyncio.sleep(0)

View file

@ -89,6 +89,12 @@ class PrefixRow(metaclass=PrefixRowType):
if v:
return v if not deserialize_value else self.unpack_value(v)
def key_exists(self, *key_args):
key_may_exist, _ = self._db.key_may_exist((self._column_family, self.pack_key(*key_args)))
if not key_may_exist:
return False
return self._db.get((self._column_family, self.pack_key(*key_args)), fill_cache=True) is not None
def multi_get(self, key_args: typing.List[typing.Tuple], fill_cache=True, deserialize_value=True):
packed_keys = {tuple(args): self.pack_key(*args) for args in key_args}
db_result = self._db.multi_get([(self._column_family, packed_keys[tuple(args)]) for args in key_args],
@ -118,26 +124,28 @@ class PrefixRow(metaclass=PrefixRowType):
if idx % step == 0:
await asyncio.sleep(0)
def stage_multi_put(self, items):
self._op_stack.multi_put([RevertablePut(self.pack_key(*k), self.pack_value(*v)) for k, v in items])
def stash_multi_put(self, items):
self._op_stack.stash_ops([RevertablePut(self.pack_key(*k), self.pack_value(*v)) for k, v in items])
def stash_multi_delete(self, items):
self._op_stack.stash_ops([RevertableDelete(self.pack_key(*k), self.pack_value(*v)) for k, v in items])
def get_pending(self, *key_args, fill_cache=True, deserialize_value=True):
packed_key = self.pack_key(*key_args)
last_op = self._op_stack.get_last_op_for_key(packed_key)
if last_op:
if last_op.is_put:
return last_op.value if not deserialize_value else self.unpack_value(last_op.value)
else: # it's a delete
pending_op = self._op_stack.get_pending_op(packed_key)
if pending_op and pending_op.is_delete:
return
if pending_op:
v = pending_op.value
else:
v = self._db.get((self._column_family, packed_key), fill_cache=fill_cache)
if v:
return v if not deserialize_value else self.unpack_value(v)
return None if v is None else (v if not deserialize_value else self.unpack_value(v))
def stage_put(self, key_args=(), value_args=()):
self._op_stack.append_op(RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args)))
def stash_put(self, key_args=(), value_args=()):
self._op_stack.stash_ops([RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args))])
def stage_delete(self, key_args=(), value_args=()):
self._op_stack.append_op(RevertableDelete(self.pack_key(*key_args), self.pack_value(*value_args)))
def stash_delete(self, key_args=(), value_args=()):
self._op_stack.stash_ops([RevertableDelete(self.pack_key(*key_args), self.pack_value(*value_args))])
@classmethod
def pack_partial_key(cls, *args) -> bytes:
@ -175,13 +183,14 @@ class BasePrefixDB:
UNDO_KEY_STRUCT = struct.Struct(b'>Q32s')
PARTIAL_UNDO_KEY_STRUCT = struct.Struct(b'>Q')
def __init__(self, path, max_open_files=64, secondary_path='', max_undo_depth: int = 200, unsafe_prefixes=None):
def __init__(self, path, max_open_files=64, secondary_path='', max_undo_depth: int = 200, unsafe_prefixes=None,
enforce_integrity=True):
column_family_options = {}
for prefix in DB_PREFIXES:
settings = COLUMN_SETTINGS[prefix.value]
column_family_options[prefix.value] = rocksdb.ColumnFamilyOptions()
column_family_options[prefix.value].table_factory = rocksdb.BlockBasedTableFactory(
block_cache=rocksdb.LRUCache(settings['cache_size']),
block_cache=rocksdb.LRUCache(settings['cache_size'])
)
self.column_families: typing.Dict[bytes, 'rocksdb.ColumnFamilyHandle'] = {}
options = rocksdb.Options(
@ -198,7 +207,9 @@ class BasePrefixDB:
cf = self._db.get_column_family(prefix.value)
self.column_families[prefix.value] = cf
self._op_stack = RevertableOpStack(self.get, self.multi_get, unsafe_prefixes=unsafe_prefixes)
self._op_stack = RevertableOpStack(
self.get, self.multi_get, unsafe_prefixes=unsafe_prefixes, enforce_integrity=enforce_integrity
)
self._max_undo_depth = max_undo_depth
def unsafe_commit(self):
@ -206,6 +217,7 @@ class BasePrefixDB:
Write staged changes to the database without keeping undo information
Changes written cannot be undone
"""
self.apply_stash()
try:
if not len(self._op_stack):
return
@ -226,6 +238,7 @@ class BasePrefixDB:
"""
Write changes for a block height to the database and keep undo information so that the changes can be reverted
"""
self.apply_stash()
undo_ops = self._op_stack.get_undo_ops()
delete_undos = []
if height > self._max_undo_depth:
@ -260,6 +273,7 @@ class BasePrefixDB:
undo_c_f = self.column_families[DB_PREFIXES.undo.value]
undo_info = self._db.get((undo_c_f, undo_key))
self._op_stack.apply_packed_undo_ops(undo_info)
self._op_stack.validate_and_apply_stashed_ops()
try:
with self._db.write_batch(sync=True) as batch:
batch_put = batch.put
@ -275,6 +289,9 @@ class BasePrefixDB:
finally:
self._op_stack.clear()
def apply_stash(self):
self._op_stack.validate_and_apply_stashed_ops()
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
cf = self.column_families[key[:1]]
return self._db.get((cf, key), fill_cache=fill_cache)
@ -282,18 +299,15 @@ class BasePrefixDB:
def multi_get(self, keys: typing.List[bytes], fill_cache=True):
if len(keys) == 0:
return []
first_key = keys[0]
if not all(first_key[0] == key[0] for key in keys):
raise ValueError('cannot multi-delete across column families')
cf = self.column_families[first_key[:1]]
db_result = self._db.multi_get([(cf, k) for k in keys], fill_cache=fill_cache)
get_cf = self.column_families.__getitem__
db_result = self._db.multi_get([(get_cf(k[:1]), k) for k in keys], fill_cache=fill_cache)
return list(db_result.values())
def multi_delete(self, items: typing.List[typing.Tuple[bytes, bytes]]):
self._op_stack.multi_delete([RevertableDelete(k, v) for k, v in items])
self._op_stack.stash_ops([RevertableDelete(k, v) for k, v in items])
def multi_put(self, items: typing.List[typing.Tuple[bytes, bytes]]):
self._op_stack.multi_put([RevertablePut(k, v) for k, v in items])
self._op_stack.stash_ops([RevertablePut(k, v) for k, v in items])
def iterator(self, start: bytes, column_family: 'rocksdb.ColumnFamilyHandle' = None,
iterate_lower_bound: bytes = None, iterate_upper_bound: bytes = None,
@ -312,11 +326,11 @@ class BasePrefixDB:
def try_catch_up_with_primary(self):
self._db.try_catch_up_with_primary()
def stage_raw_put(self, key: bytes, value: bytes):
self._op_stack.append_op(RevertablePut(key, value))
def stash_raw_put(self, key: bytes, value: bytes):
self._op_stack.stash_ops([RevertablePut(key, value)])
def stage_raw_delete(self, key: bytes, value: bytes):
self._op_stack.append_op(RevertableDelete(key, value))
def stash_raw_delete(self, key: bytes, value: bytes):
self._op_stack.stash_ops([RevertableDelete(key, value)])
def estimate_num_keys(self, column_family: 'rocksdb.ColumnFamilyHandle' = None):
return int(self._db.get_property(b'rocksdb.estimate-num-keys', column_family).decode())

View file

@ -0,0 +1,57 @@
import logging
from collections import defaultdict
FROM_VERSION = 11
TO_VERSION = 12
def migrate(db):
log = logging.getLogger(__name__)
prefix_db = db.prefix_db
log.info("migrating the db to version 12")
effective_amounts = defaultdict(int)
log.info("deleting any existing future effective amounts")
to_delete = list(prefix_db.future_effective_amount.iterate(deserialize_key=False, deserialize_value=False))
while to_delete:
batch, to_delete = to_delete[:100000], to_delete[100000:]
if batch:
prefix_db.multi_delete(batch)
prefix_db.unsafe_commit()
log.info("calculating future claim effective amounts for the new index at block %i", db.db_height)
cnt = 0
for k, v in prefix_db.active_amount.iterate():
cnt += 1
effective_amounts[k.claim_hash] += v.amount
if cnt % 1000000 == 0:
log.info("scanned %i amounts for %i claims", cnt, len(effective_amounts))
log.info("preparing to insert future effective amounts")
effective_amounts_to_put = [
prefix_db.future_effective_amount.pack_item(claim_hash, effective_amount)
for claim_hash, effective_amount in effective_amounts.items()
]
log.info("inserting %i future effective amounts", len(effective_amounts_to_put))
cnt = 0
while effective_amounts_to_put:
batch, effective_amounts_to_put = effective_amounts_to_put[:100000], effective_amounts_to_put[100000:]
if batch:
prefix_db.multi_put(batch)
prefix_db.unsafe_commit()
cnt += len(batch)
if cnt % 1000000 == 0:
log.info("inserted effective amounts for %i claims", cnt)
log.info("finished building the effective amount index")
db.db_version = 12
db.write_db_state()
db.prefix_db.unsafe_commit()
log.info("finished migration to version 12")

View file

@ -3,6 +3,7 @@ import struct
import array
import base64
from typing import Union, Tuple, NamedTuple, Optional
from hub.common import ResumableSHA256
from hub.db.common import DB_PREFIXES
from hub.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow
from hub.schema.url import normalize_name
@ -1292,7 +1293,7 @@ class UTXOPrefixRow(PrefixRow):
prefix = DB_PREFIXES.utxo.value
key_struct = struct.Struct(b'>11sLH')
value_struct = struct.Struct(b'>Q')
cache_size = 1024 * 1024 * 64
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>11s').pack,
@ -1325,7 +1326,7 @@ class HashXUTXOPrefixRow(PrefixRow):
prefix = DB_PREFIXES.hashx_utxo.value
key_struct = struct.Struct(b'>4sLH')
value_struct = struct.Struct(b'>11s')
cache_size = 1024 * 1024 * 64
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>4s').pack,
@ -1783,7 +1784,7 @@ class EffectiveAmountPrefixRow(PrefixRow):
prefix = DB_PREFIXES.effective_amount.value
key_struct = struct.Struct(b'>20s')
value_struct = struct.Struct(b'>QQ')
cache_size = 1024 * 1024 * 64
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack
@ -1799,6 +1800,7 @@ class EffectiveAmountPrefixRow(PrefixRow):
@classmethod
def pack_value(cls, effective_amount: int, support_sum: int) -> bytes:
assert effective_amount >= support_sum
return super().pack_value(effective_amount, support_sum)
@classmethod
@ -1810,11 +1812,93 @@ class EffectiveAmountPrefixRow(PrefixRow):
return cls.pack_key(claim_hash), cls.pack_value(effective_amount, support_sum)
class FutureEffectiveAmountKey(NamedTuple):
claim_hash: bytes
class FutureEffectiveAmountValue(NamedTuple):
future_effective_amount: int
class FutureEffectiveAmountPrefixRow(PrefixRow):
prefix = DB_PREFIXES.future_effective_amount.value
key_struct = struct.Struct(b'>20s')
value_struct = struct.Struct(b'>Q')
cache_size = 1024 * 1024 * 64
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack
]
@classmethod
def pack_key(cls, claim_hash: bytes):
return super().pack_key(claim_hash)
@classmethod
def unpack_key(cls, key: bytes) -> FutureEffectiveAmountKey:
return FutureEffectiveAmountKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, future_effective_amount: int) -> bytes:
return super().pack_value(future_effective_amount)
@classmethod
def unpack_value(cls, data: bytes) -> FutureEffectiveAmountValue:
return FutureEffectiveAmountValue(*cls.value_struct.unpack(data))
@classmethod
def pack_item(cls, claim_hash: bytes, future_effective_amount: int):
return cls.pack_key(claim_hash), cls.pack_value(future_effective_amount)
class HashXHistoryHasherKey(NamedTuple):
hashX: bytes
class HashXHistoryHasherValue(NamedTuple):
hasher: ResumableSHA256
class HashXHistoryHasherPrefixRow(PrefixRow):
prefix = DB_PREFIXES.hashX_history_hash.value
key_struct = struct.Struct(b'>11s')
value_struct = struct.Struct(b'>120s')
cache_size = 1024 * 1024 * 64
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>11s').pack
]
@classmethod
def pack_key(cls, hashX: bytes):
return super().pack_key(hashX)
@classmethod
def unpack_key(cls, key: bytes) -> HashXHistoryHasherKey:
return HashXHistoryHasherKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, hasher: ResumableSHA256) -> bytes:
return super().pack_value(hasher.get_state())
@classmethod
def unpack_value(cls, data: bytes) -> HashXHistoryHasherValue:
return HashXHistoryHasherValue(ResumableSHA256(*super().unpack_value(data)))
@classmethod
def pack_item(cls, hashX: bytes, hasher: ResumableSHA256):
return cls.pack_key(hashX), cls.pack_value(hasher)
class PrefixDB(BasePrefixDB):
def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64,
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None,
enforce_integrity: bool = True):
super().__init__(path, max_open_files=max_open_files, secondary_path=secondary_path,
max_undo_depth=reorg_limit, unsafe_prefixes=unsafe_prefixes)
max_undo_depth=reorg_limit, unsafe_prefixes=unsafe_prefixes,
enforce_integrity=enforce_integrity)
db = self._db
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)
@ -1853,6 +1937,8 @@ class PrefixDB(BasePrefixDB):
self.hashX_status = HashXStatusPrefixRow(db, self._op_stack)
self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack)
self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack)
self.future_effective_amount = FutureEffectiveAmountPrefixRow(db, self._op_stack)
self.hashX_history_hasher = HashXHistoryHasherPrefixRow(db, self._op_stack)
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:

View file

@ -1,8 +1,8 @@
import struct
import logging
from string import printable
from collections import defaultdict
from typing import Tuple, Iterable, Callable, Optional, List
from collections import defaultdict, deque
from typing import Tuple, Iterable, Callable, Optional, List, Deque
from hub.db.common import DB_PREFIXES
_OP_STRUCT = struct.Struct('>BLL')
@ -83,7 +83,8 @@ class OpStackIntegrity(Exception):
class RevertableOpStack:
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]],
multi_get_fn: Callable[[List[bytes]], Iterable[Optional[bytes]]], unsafe_prefixes=None):
multi_get_fn: Callable[[List[bytes]], Iterable[Optional[bytes]]], unsafe_prefixes=None,
enforce_integrity=True):
"""
This represents a sequence of revertable puts and deletes to a key-value database that checks for integrity
violations when applying the puts and deletes. The integrity checks assure that keys that do not exist
@ -97,8 +98,86 @@ class RevertableOpStack:
"""
self._get = get_fn
self._multi_get = multi_get_fn
# a defaultdict of verified ops ready to be applied
self._items = defaultdict(list)
# a faster deque of ops that have not yet been checked for integrity errors
self._stash: Deque[RevertableOp] = deque()
self._stashed_last_op_for_key = {}
self._unsafe_prefixes = unsafe_prefixes or set()
self._enforce_integrity = enforce_integrity
def stash_ops(self, ops: Iterable[RevertableOp]):
self._stash.extend(ops)
for op in ops:
self._stashed_last_op_for_key[op.key] = op
def validate_and_apply_stashed_ops(self):
if not self._stash:
return
ops_to_apply = []
append_op_needed = ops_to_apply.append
pop_staged_op = self._stash.popleft
unique_keys = set()
# nullify the ops that cancel against the most recent staged for a key
while self._stash:
op = pop_staged_op()
if self._items[op.key] and op.invert() == self._items[op.key][-1]:
self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both
continue
elif self._items[op.key] and self._items[op.key][-1] == op: # duplicate of last op
continue # raise an error?
else:
append_op_needed(op)
unique_keys.add(op.key)
existing = {}
if self._enforce_integrity and unique_keys:
unique_keys = list(unique_keys)
for idx in range(0, len(unique_keys), 10000):
batch = unique_keys[idx:idx+10000]
existing.update({
k: v for k, v in zip(batch, self._multi_get(batch))
})
for op in ops_to_apply:
if op.key in self._items and len(self._items[op.key]) and self._items[op.key][-1] == op.invert():
self._items[op.key].pop()
if not self._items[op.key]:
self._items.pop(op.key)
continue
if not self._enforce_integrity:
self._items[op.key].append(op)
continue
stored_val = existing[op.key]
has_stored_val = stored_val is not None
delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val)
will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key])
try:
if op.is_delete:
if has_stored_val and stored_val != op.value and not will_delete_existing_stored:
# there is a value and we're not deleting it in this op
# check that a delete for the stored value is in the stack
raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}\nvs\n{stored_val}")
elif not has_stored_val:
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
elif stored_val != op.value:
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
else:
if has_stored_val and not will_delete_existing_stored:
raise OpStackIntegrity(f"db op tries to overwrite before deleting existing: {op}")
if op.key in self._items and len(self._items[op.key]) and self._items[op.key][-1].is_put:
raise OpStackIntegrity(f"db op tries to overwrite with {op} before deleting pending "
f"{self._items[op.key][-1]}")
except OpStackIntegrity as err:
if op.key[:1] in self._unsafe_prefixes:
log.debug(f"skipping over integrity error: {err}")
else:
raise err
self._items[op.key].append(op)
self._stashed_last_op_for_key.clear()
def append_op(self, op: RevertableOp):
"""
@ -217,15 +296,10 @@ class RevertableOpStack:
raise err
self._items[op.key].append(op)
def extend_ops(self, ops: Iterable[RevertableOp]):
"""
Apply a sequence of put or delete ops, checking that they introduce no integrity errors
"""
for op in ops:
self.append_op(op)
def clear(self):
self._items.clear()
self._stash.clear()
self._stashed_last_op_for_key.clear()
def __len__(self):
return sum(map(len, self._items.values()))
@ -250,10 +324,24 @@ class RevertableOpStack:
"""
Unpack and apply a sequence of undo ops from serialized undo bytes
"""
while packed:
op, packed = RevertableOp.unpack(packed)
self.append_op(op)
offset = 0
packed_size = len(packed)
while offset < packed_size:
is_put, key_len, val_len = _OP_STRUCT.unpack(packed[offset:offset + 9])
offset += 9
key = packed[offset:offset + key_len]
offset += key_len
value = packed[offset:offset + val_len]
offset += val_len
if is_put == 1:
op = RevertablePut(key, value)
else:
op = RevertableDelete(key, value)
self._stash.append(op)
self._stashed_last_op_for_key[op.key] = op
def get_last_op_for_key(self, key: bytes) -> Optional[RevertableOp]:
def get_pending_op(self, key: bytes) -> Optional[RevertableOp]:
if key in self._stashed_last_op_for_key:
return self._stashed_last_op_for_key[key]
if key in self._items and self._items[key]:
return self._items[key][-1]

View file

@ -9,11 +9,11 @@ from hub.db.common import ResolveResult
class ElasticSyncDB(SecondaryDB):
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
cache_all_tx_hashes: bool = False,
blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False):
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit,
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
index_address_status)
self.block_timestamp_cache = LRUCache(1024)

View file

@ -3,11 +3,11 @@ from hub.env import Env
class ElasticEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None,
cache_all_tx_hashes=None, cache_all_claim_txos=None, elastic_host=None, elastic_port=None,
cache_all_tx_hashes=None, elastic_host=None, elastic_port=None,
es_index_prefix=None, elastic_notifier_host=None, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None, reindex=False):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
blocking_channel_ids, filtering_channel_ids)
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
@ -43,7 +43,7 @@ class ElasticEnv(Env):
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit,
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, blocking_channel_ids=args.blocking_channel_ids,
blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
elastic_notifier_port=args.elastic_notifier_port
)

View file

@ -49,7 +49,7 @@ class ElasticSyncService(BlockchainReaderService):
def open_db(self):
env = self.env
self.db = ElasticSyncDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status
@ -89,7 +89,10 @@ class ElasticSyncService(BlockchainReaderService):
info = {}
if os.path.exists(self._es_info_path):
with open(self._es_info_path, 'r') as f:
try:
info.update(json.loads(f.read()))
except json.decoder.JSONDecodeError:
self.log.warning('failed to parse es sync status file')
self._last_wrote_height = int(info.get('height', 0))
self._last_wrote_block_hash = info.get('block_hash', None)
@ -171,28 +174,31 @@ class ElasticSyncService(BlockchainReaderService):
async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
only_channels = lambda x: {k: chan for k, (chan, repost) in x.items()}
async def batched_update_filter(items: typing.Dict[bytes, bytes], channel: bool, censor_type: int):
batches = [{}]
for k, v in items.items():
if len(batches[-1]) == 2000:
batches.append({})
batches[-1][k] = v
for batch in batches:
if batch:
await self.sync_client.update_by_query(
self.index, body=self.update_filter_query(censor_type, only_channels(batch)), slices=4)
if channel:
await self.sync_client.update_by_query(
self.index, body=self.update_filter_query(censor_type, only_channels(batch), True),
slices=4)
await self.sync_client.indices.refresh(self.index)
if filtered_streams:
await self.sync_client.update_by_query(
self.index, body=self.update_filter_query(Censor.SEARCH, only_channels(filtered_streams)), slices=4)
await self.sync_client.indices.refresh(self.index)
await batched_update_filter(filtered_streams, False, Censor.SEARCH)
if filtered_channels:
await self.sync_client.update_by_query(
self.index, body=self.update_filter_query(Censor.SEARCH, only_channels(filtered_channels)), slices=4)
await self.sync_client.indices.refresh(self.index)
await self.sync_client.update_by_query(
self.index, body=self.update_filter_query(Censor.SEARCH, only_channels(filtered_channels), True), slices=4)
await self.sync_client.indices.refresh(self.index)
await batched_update_filter(filtered_channels, True, Censor.SEARCH)
if blocked_streams:
await self.sync_client.update_by_query(
self.index, body=self.update_filter_query(Censor.RESOLVE, only_channels(blocked_streams)), slices=4)
await self.sync_client.indices.refresh(self.index)
await batched_update_filter(blocked_streams, False, Censor.RESOLVE)
if blocked_channels:
await self.sync_client.update_by_query(
self.index, body=self.update_filter_query(Censor.RESOLVE, only_channels(blocked_channels)), slices=4)
await self.sync_client.indices.refresh(self.index)
await self.sync_client.update_by_query(
self.index, body=self.update_filter_query(Censor.RESOLVE, only_channels(blocked_channels), True), slices=4)
await self.sync_client.indices.refresh(self.index)
await batched_update_filter(blocked_channels, True, Censor.RESOLVE)
@staticmethod
def _upsert_claim_query(index, claim):
@ -247,7 +253,7 @@ class ElasticSyncService(BlockchainReaderService):
apply_blocking=False,
apply_filtering=False):
if not claim:
self.log.warning("wat")
self.log.warning("cannot sync claim %s", (claim_hash or b'').hex())
continue
claims[claim_hash] = claim
total_extras[claim_hash] = claim

View file

@ -30,7 +30,7 @@ class Env:
pass
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
prometheus_port=None, cache_all_tx_hashes=None,
blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None):
self.logger = logging.getLogger(__name__)
@ -46,7 +46,6 @@ class Env:
self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0)
self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False)
self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False)
# Filtering / Blocking
self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default(
'BLOCKING_CHANNEL_IDS', '').split(' ')
@ -171,11 +170,6 @@ class Env:
"resolve, transaction fetching, and block sync all faster at the expense of higher "
"memory usage (at least 10GB more). Can be set in env with 'CACHE_ALL_TX_HASHES'.",
default=cls.boolean('CACHE_ALL_TX_HASHES', False))
parser.add_argument('--cache_all_claim_txos', action='store_true',
help="Load all claim txos into memory. This will make address subscriptions and sync, "
"resolve, transaction fetching, and block sync all faster at the expense of higher "
"memory usage. Can be set in env with 'CACHE_ALL_CLAIM_TXOS'.",
default=cls.boolean('CACHE_ALL_CLAIM_TXOS', False))
parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0),
help="Port for prometheus metrics to listen on, disabled by default. "
"Can be set in env with 'PROMETHEUS_PORT'.")

View file

@ -6,11 +6,11 @@ from hub.db import SecondaryDB
class HeraldDB(SecondaryDB):
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
cache_all_tx_hashes: bool = False,
blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False, merkle_cache_size=32768, tx_cache_size=32768):
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit,
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
index_address_status, merkle_cache_size, tx_cache_size)
# self.headers = None

View file

@ -1,29 +1,38 @@
import re
from collections import deque
from hub.env import Env
ELASTIC_SERVICES_REGEX = re.compile("(([\d|\.]|[^,:\/])*:\d*\/([\d|\.]|[^,:\/])*:\d*,?)*")
def parse_es_services(elastic_services_arg: str):
match = ELASTIC_SERVICES_REGEX.match(elastic_services_arg)
if not match:
return []
matching = match.group()
services = [item.split('/') for item in matching.split(',') if item]
return [
((es.split(':')[0], int(es.split(':')[1])), (notifier.split(':')[0], int(notifier.split(':')[1])))
for (es, notifier) in services
]
class ServerEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
daemon_url=None, host=None, elastic_host=None, elastic_port=None, es_index_prefix=None,
prometheus_port=None, cache_all_tx_hashes=None,
daemon_url=None, host=None, elastic_services=None, es_index_prefix=None,
tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None,
index_address_status=None, address_history_cache_size=None, daemon_ca_path=None,
database_query_timeout=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None,
peer_announce=None, index_address_status=None, address_history_cache_size=None, daemon_ca_path=None,
merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
history_tx_cache_size=None, largest_address_history_cache_size=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
blocking_channel_ids, filtering_channel_ids, index_address_status)
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
'ELASTIC_NOTIFIER_HOST', 'localhost')
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer(
'ELASTIC_NOTIFIER_PORT', 19080)
self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080'))
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
# Server stuff
self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
@ -93,15 +102,13 @@ class ServerEnv(Env):
help="Regex used for blocking clients")
parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600),
help="Session inactivity timeout")
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
help="Hostname or ip address of the elasticsearch instance to connect to. "
"Can be set in env with 'ELASTIC_HOST'")
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'")
parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'),
type=str, help='elasticsearch sync notifier host, defaults to localhost')
parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
help='elasticsearch sync notifier port')
parser.add_argument('--elastic_services',
default=cls.default('ELASTIC_SERVICES', 'localhost:9200/localhost:19080'), type=str,
help="Hosts and ports for elastic search and the scribe elastic sync notifier. "
"Given as a comma separated list without spaces of items in the format "
"<elastic host>:<elastic port>/<notifier host>:<notifier port> . "
"Defaults to 'localhost:9200/localhost:19080'. "
"Can be set in env with 'ELASTIC_SERVICES'")
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
parser.add_argument('--allow_lan_udp', action='store_true',
help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False))
@ -141,18 +148,17 @@ class ServerEnv(Env):
@classmethod
def from_arg_parser(cls, args):
return cls(
db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_host=args.elastic_host,
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_services=args.elastic_services,
max_query_workers=args.max_query_workers, chain=args.chain,
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, country=args.country, payment_address=args.payment_address,
country=args.country, payment_address=args.payment_address,
donation_address=args.donation_address, max_send=args.max_send, max_receive=args.max_receive,
max_sessions=args.max_sessions, session_timeout=args.session_timeout,
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses,
filtering_channel_ids=args.filtering_channel_ids, index_address_status=args.index_address_statuses,
address_history_cache_size=args.address_history_cache_size, daemon_ca_path=args.daemon_ca_path,
merkle_cache_size=args.merkle_cache_size, resolved_url_cache_size=args.resolved_url_cache_size,
tx_cache_size=args.tx_cache_size, history_tx_cache_size=args.history_tx_cache_size,

View file

@ -3,7 +3,7 @@ import asyncio
from bisect import bisect_right
from collections import Counter, deque
from operator import itemgetter
from typing import Optional, List, TYPE_CHECKING
from typing import Optional, List, TYPE_CHECKING, Deque, Tuple
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
from hub.schema.result import Censor, Outputs
@ -29,8 +29,9 @@ class StreamResolution(str):
class SearchIndex:
VERSION = 1
def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
elastic_port=9200, timeout_counter: Optional['PrometheusCounter'] = None):
def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0,
elastic_services: Optional[Deque[Tuple[Tuple[str, int], Tuple[str, int]]]] = None,
timeout_counter: Optional['PrometheusCounter'] = None):
self.hub_db = hub_db
self.search_timeout = search_timeout
self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter
@ -41,8 +42,8 @@ class SearchIndex:
self.logger = logging.getLogger(__name__)
self.claim_cache = LRUCache(2 ** 15)
self.search_cache = LRUCache(2 ** 17)
self._elastic_host = elastic_host
self._elastic_port = elastic_port
self._elastic_services = elastic_services
self.lost_connection = asyncio.Event()
async def get_index_version(self) -> int:
try:
@ -59,7 +60,7 @@ class SearchIndex:
async def start(self) -> bool:
if self.sync_client:
return False
hosts = [{'host': self._elastic_host, 'port': self._elastic_port}]
hosts = [{'host': self._elastic_services[0][0][0], 'port': self._elastic_services[0][0][1]}]
self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout+1)
while True:

View file

@ -1,18 +1,25 @@
import time
import typing
import asyncio
from prometheus_client import Counter
from hub import PROMETHEUS_NAMESPACE
from hub.scribe.daemon import LBCDaemon
from hub.herald.session import SessionManager
from hub.herald.mempool import HubMemPool
from hub.herald.udp import StatusServer
from hub.herald.db import HeraldDB
from hub.herald.search import SearchIndex
from hub.service import BlockchainReaderService
from hub.notifier_protocol import ElasticNotifierClientProtocol
if typing.TYPE_CHECKING:
from hub.herald.env import ServerEnv
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_hub"
class HubServerService(BlockchainReaderService):
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
def __init__(self, env: 'ServerEnv'):
super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker')
self.env = env
@ -21,8 +28,15 @@ class HubServerService(BlockchainReaderService):
self.status_server = StatusServer()
self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs
self.mempool = HubMemPool(self.env.coin, self.db)
self.search_index = SearchIndex(
self.db, self.env.es_index_prefix, self.env.database_query_timeout,
elastic_services=self.env.elastic_services,
timeout_counter=self.interrupt_count_metric
)
self.session_manager = SessionManager(
env, self.db, self.mempool, self.daemon,
env, self.db, self.mempool, self.daemon, self.search_index,
self.shutdown_event,
on_available_callback=self.status_server.set_available,
on_unavailable_callback=self.status_server.set_unavailable
@ -30,7 +44,7 @@ class HubServerService(BlockchainReaderService):
self.mempool.session_manager = self.session_manager
self.es_notifications = asyncio.Queue()
self.es_notification_client = ElasticNotifierClientProtocol(
self.es_notifications, self.env.elastic_notifier_host, self.env.elastic_notifier_port
self.es_notifications, self.env.elastic_services
)
self.synchronized = asyncio.Event()
self._es_height = None
@ -39,7 +53,7 @@ class HubServerService(BlockchainReaderService):
def open_db(self):
env = self.env
self.db = HeraldDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status, merkle_cache_size=env.merkle_cache_size,
@ -52,7 +66,7 @@ class HubServerService(BlockchainReaderService):
# self.mempool.notified_mempool_txs.clear()
def clear_search_cache(self):
self.session_manager.search_index.clear_caches()
self.search_index.clear_caches()
def advance(self, height: int):
super().advance(height)
@ -116,8 +130,44 @@ class HubServerService(BlockchainReaderService):
self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height,
self.db.db_height)
finally:
self.log.warning("closing es sync notification loop at %s", self._es_height)
self.es_notification_client.close()
async def failover_elastic_services(self, synchronized: asyncio.Event):
first_connect = True
if not self.es_notification_client.lost_connection.is_set():
synchronized.set()
while True:
try:
await self.es_notification_client.lost_connection.wait()
if not first_connect:
self.log.warning("lost connection to scribe-elastic-sync notifier (%s:%i)",
self.es_notification_client.host, self.es_notification_client.port)
await self.es_notification_client.connect()
first_connect = False
synchronized.set()
self.log.info("connected to es notifier on %s:%i", self.es_notification_client.host,
self.es_notification_client.port)
await self.search_index.start()
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
self.log.warning("lost connection to scribe-elastic-sync notifier")
await self.search_index.stop()
self.search_index.clear_caches()
if len(self.env.elastic_services) > 1:
self.env.elastic_services.rotate(-1)
self.log.warning("attempting to failover to %s:%i", self.es_notification_client.host,
self.es_notification_client.port)
await asyncio.sleep(1)
else:
self.log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)",
self.es_notification_client.host, self.es_notification_client.port)
await asyncio.sleep(30)
else:
self.log.info("stopping the notifier loop")
raise e
async def start_status_server(self):
if self.env.udp_port and int(self.env.udp_port):
await self.status_server.start(
@ -127,14 +177,13 @@ class HubServerService(BlockchainReaderService):
def _iter_start_tasks(self):
yield self.start_status_server()
yield self.start_cancellable(self.es_notification_client.maintain_connection)
yield self.start_cancellable(self.receive_es_notifications)
yield self.start_cancellable(self.failover_elastic_services)
yield self.start_cancellable(self.mempool.send_notifications_forever)
yield self.start_cancellable(self.refresh_blocks_forever)
yield self.finished_initial_catch_up.wait()
self.block_count_metric.set(self.last_state.height)
yield self.start_prometheus()
yield self.start_cancellable(self.receive_es_notifications)
yield self.session_manager.search_index.start()
yield self.start_cancellable(self.session_manager.serve, self.mempool)
def _iter_stop_tasks(self):

View file

@ -142,7 +142,6 @@ class SessionManager:
tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE)
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
db_operational_error_metric = Counter(
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
)
@ -181,7 +180,7 @@ class SessionManager:
)
def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
daemon: 'LBCDaemon', shutdown_event: asyncio.Event,
daemon: 'LBCDaemon', search_index: 'SearchIndex', shutdown_event: asyncio.Event,
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
env.max_send = max(350000, env.max_send)
self.env = env
@ -190,6 +189,7 @@ class SessionManager:
self.on_unavailable_callback = on_unavailable_callback
self.daemon = daemon
self.mempool = mempool
self.search_index = search_index
self.shutdown_event = shutdown_event
self.logger = logging.getLogger(__name__)
self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
@ -208,12 +208,6 @@ class SessionManager:
self.protocol_class = LBRYElectrumX
self.session_event = Event()
# Search index
self.search_index = SearchIndex(
self.db, self.env.es_index_prefix, self.env.database_query_timeout,
elastic_host=env.elastic_host, elastic_port=env.elastic_port,
timeout_counter=self.interrupt_count_metric
)
self.running = False
# hashX: List[int]
self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE)
@ -1284,7 +1278,7 @@ class LBRYElectrumX(asyncio.Protocol):
kwargs['channel_id'] = channel_claim.claim_hash.hex()
return await self.session_manager.search_index.cached_search(kwargs)
except ConnectionTimeout:
self.session_manager.interrupt_count_metric.inc()
self.session_manager.search_index.timeout_counter.inc()
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
except TooManyClaimSearchParametersError as err:
await asyncio.sleep(2)

View file

@ -2,6 +2,7 @@ import typing
import struct
import asyncio
import logging
from typing import Deque, Tuple
log = logging.getLogger(__name__)
@ -31,52 +32,39 @@ class ElasticNotifierProtocol(asyncio.Protocol):
class ElasticNotifierClientProtocol(asyncio.Protocol):
"""notifies the reader when ES has written updates"""
def __init__(self, notifications: asyncio.Queue, host: str, port: int):
def __init__(self, notifications: asyncio.Queue, notifier_hosts: Deque[Tuple[Tuple[str, int], Tuple[str, int]]]):
assert len(notifier_hosts) > 0, 'no elastic notifier clients given'
self.notifications = notifications
self.transport: typing.Optional[asyncio.Transport] = None
self.host = host
self.port = port
self._lost_connection = asyncio.Event()
self._lost_connection.set()
self._notifier_hosts = notifier_hosts
self.lost_connection = asyncio.Event()
self.lost_connection.set()
@property
def host(self):
return self._notifier_hosts[0][1][0]
@property
def port(self):
return self._notifier_hosts[0][1][1]
async def connect(self):
if self._lost_connection.is_set():
if self.lost_connection.is_set():
await asyncio.get_event_loop().create_connection(
lambda: self, self.host, self.port
)
async def maintain_connection(self, synchronized: asyncio.Event):
first_connect = True
if not self._lost_connection.is_set():
synchronized.set()
while True:
try:
await self._lost_connection.wait()
if not first_connect:
log.warning("lost connection to scribe-elastic-sync notifier")
await self.connect()
first_connect = False
synchronized.set()
log.info("connected to es notifier")
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", self.host, self.port)
await asyncio.sleep(30)
else:
log.info("stopping the notifier loop")
raise e
def close(self):
if self.transport and not self.transport.is_closing():
self.transport.close()
def connection_made(self, transport):
self.transport = transport
self._lost_connection.clear()
self.lost_connection.clear()
def connection_lost(self, exc) -> None:
self.transport = None
self._lost_connection.set()
self.lost_connection.set()
def data_received(self, data: bytes) -> None:
try:

View file

@ -1,21 +1,23 @@
import hashlib
import asyncio
import array
import time
from typing import List
from concurrent.futures.thread import ThreadPoolExecutor
from bisect import bisect_right
from hub.common import sha256
from hub.common import ResumableSHA256
from hub.db import SecondaryDB
class PrimaryDB(SecondaryDB):
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
cache_all_tx_hashes: bool = False,
max_open_files: int = 64, blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False):
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes,
blocking_channel_ids, filtering_channel_ids, executor, index_address_status)
index_address_status=False, enforce_integrity=True):
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_tx_hashes,
blocking_channel_ids, filtering_channel_ids, executor, index_address_status,
enforce_integrity=enforce_integrity)
def _rebuild_hashX_status_index(self, start_height: int):
self.logger.warning("rebuilding the address status index...")
@ -33,20 +35,25 @@ class PrimaryDB(SecondaryDB):
if last_hashX:
yield last_hashX
def hashX_status_from_history(history: bytes) -> bytes:
def hashX_status_from_history(history: bytes) -> ResumableSHA256:
tx_counts = self.tx_counts
hist_tx_nums = array.array('I')
hist_tx_nums.frombytes(history)
hist = ''
for tx_num in hist_tx_nums:
hist += f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
return sha256(hist.encode())
digest = ResumableSHA256()
digest.update(
b''.join(f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'.encode()
for tx_num, tx_hash in zip(
hist_tx_nums,
self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in hist_tx_nums], deserialize_value=False)
))
)
return digest
start = time.perf_counter()
if start_height <= 0:
self.logger.info("loading all blockchain addresses, this will take a little while...")
hashXs = [hashX for hashX in hashX_iterator()]
hashXs = list({hashX for hashX in hashX_iterator()})
else:
self.logger.info("loading addresses since block %i...", start_height)
hashXs = set()
@ -63,16 +70,23 @@ class PrimaryDB(SecondaryDB):
hashX_cnt += 1
key = prefix_db.hashX_status.pack_key(hashX)
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
status = hashX_status_from_history(history)
digester = hashX_status_from_history(history)
status = digester.digest()
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
if existing_status and existing_status == status:
continue
elif not existing_status:
prefix_db.stage_raw_put(key, status)
existing_digester = prefix_db.hashX_history_hasher.get(hashX)
if not existing_status:
prefix_db.stash_raw_put(key, status)
op_cnt += 1
else:
prefix_db.stage_raw_delete(key, existing_status)
prefix_db.stage_raw_put(key, status)
prefix_db.stash_raw_delete(key, existing_status)
prefix_db.stash_raw_put(key, status)
op_cnt += 2
if not existing_digester:
prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
op_cnt += 1
else:
prefix_db.hashX_history_hasher.stash_delete((hashX,), existing_digester)
prefix_db.hashX_history_hasher.stash_put((hashX,), (digester,))
op_cnt += 2
if op_cnt > 100000:
prefix_db.unsafe_commit()
@ -93,8 +107,8 @@ class PrimaryDB(SecondaryDB):
def apply_expiration_extension_fork(self):
# TODO: this can't be reorged
for k, v in self.prefix_db.claim_expiration.iterate():
self.prefix_db.claim_expiration.stage_delete(k, v)
self.prefix_db.claim_expiration.stage_put(
self.prefix_db.claim_expiration.stash_delete(k, v)
self.prefix_db.claim_expiration.stash_put(
(bisect_right(self.tx_counts, k.tx_num) + self.coin.nExtendedClaimExpirationTime,
k.tx_num, k.position), v
)
@ -104,8 +118,8 @@ class PrimaryDB(SecondaryDB):
"""Write (UTXO) state to the batch."""
if self.db_height > 0:
existing = self.prefix_db.db_state.get()
self.prefix_db.db_state.stage_delete((), existing.expanded)
self.prefix_db.db_state.stage_put((), (
self.prefix_db.db_state.stash_delete((), existing.expanded)
self.prefix_db.db_state.stash_put((), (
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version,
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,

View file

@ -3,13 +3,13 @@ from hub.env import Env
class BlockchainEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
blocking_channel_ids=None, filtering_channel_ids=None,
prometheus_port=None, cache_all_tx_hashes=None, blocking_channel_ids=None, filtering_channel_ids=None,
db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
index_address_status=None, rebuild_address_status_from_height=None,
daemon_ca_path=None, history_tx_cache_size=None):
daemon_ca_path=None, history_tx_cache_size=None,
db_disable_integrity_checks=False):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
blocking_channel_ids, filtering_channel_ids, index_address_status)
self.db_max_open_files = db_max_open_files
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \
@ -19,6 +19,7 @@ class BlockchainEnv(Env):
self.daemon_ca_path = daemon_ca_path if daemon_ca_path else None
self.history_tx_cache_size = history_tx_cache_size if history_tx_cache_size is not None else \
self.integer('HISTORY_TX_CACHE_SIZE', 4194304)
self.db_disable_integrity_checks = db_disable_integrity_checks
@classmethod
def contribute_to_arg_parser(cls, parser):
@ -30,6 +31,9 @@ class BlockchainEnv(Env):
default=env_daemon_url)
parser.add_argument('--daemon_ca_path', type=str, default='',
help='Path to the lbcd ca file, used for lbcd with ssl')
parser.add_argument('--db_disable_integrity_checks', action='store_true',
help="Disable verifications that no db operation breaks the ability to be rewound",
default=False)
parser.add_argument('--db_max_open_files', type=int, default=64,
help='This setting translates into the max_open_files option given to rocksdb. '
'A higher number will use more memory. Defaults to 64.')
@ -52,8 +56,9 @@ class BlockchainEnv(Env):
db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit,
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
index_address_status=args.index_address_statuses,
hashX_history_cache_size=args.address_history_cache_size,
rebuild_address_status_from_height=args.rebuild_address_status_from_height,
daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size
daemon_ca_path=args.daemon_ca_path, history_tx_cache_size=args.history_tx_cache_size,
db_disable_integrity_checks=args.db_disable_integrity_checks
)

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,7 @@
import sys
import functools
import typing
import time
from dataclasses import dataclass
from struct import Struct
from hub.schema.claim import Claim
@ -192,3 +193,21 @@ class Block(typing.NamedTuple):
raw: bytes
header: bytes
transactions: typing.List[Tx]
@property
def decoded_header(self):
header = self.header
version = int.from_bytes(header[:4], byteorder='little')
ts = time.gmtime(int.from_bytes(header[100:104], byteorder='little'))
timestamp = f"{ts.tm_year}-{ts.tm_mon}-{ts.tm_mday}"
bits = int.from_bytes(header[104:108], byteorder='little')
nonce = int.from_bytes(header[108:112], byteorder='little')
return {
'version': version,
'prev_block_hash': header[4:36][::-1].hex(),
'merkle_root': header[36:68][::-1].hex(),
'claim_trie_root': header[68:100][::-1].hex(),
'timestamp': timestamp,
'bits': bits,
'nonce': nonce
}

View file

@ -37,7 +37,7 @@ class BlockchainService:
def open_db(self):
env = self.env
self.db = SecondaryDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status

View file

@ -8,16 +8,8 @@ SNAPSHOT_URL="${SNAPSHOT_URL:-}" #off by default. latest snapshot at https://lbr
if [[ "$HUB_COMMAND" == "scribe" ]] && [[ -n "$SNAPSHOT_URL" ]] && [[ ! -d /database/lbry-rocksdb ]]; then
files="$(ls)"
echo "Downloading hub snapshot from $SNAPSHOT_URL"
wget --no-verbose --trust-server-names --content-disposition "$SNAPSHOT_URL"
echo "Extracting snapshot..."
filename="$(grep -vf <(echo "$files") <(ls))" # finds the file that was not there before
case "$filename" in
*.tgz|*.tar.gz|*.tar.bz2 ) tar xvf "$filename" --directory /database ;;
*.zip ) unzip "$filename" -d /database/ ;;
* ) echo "Don't know how to extract ${filename}. SNAPSHOT COULD NOT BE LOADED" && exit 1 ;;
esac
rm "$filename"
echo "Downloading and extracting hub snapshot from $SNAPSHOT_URL"
wget --no-verbose -c "$SNAPSHOT_URL" -O - | tar x -C /database
fi
if [ -z "$HUB_COMMAND" ]; then

View file

@ -33,7 +33,7 @@ setup(
'certifi>=2021.10.08',
'colorama==0.3.7',
'cffi==1.13.2',
'protobuf==3.17.2',
'protobuf==3.18.3',
'msgpack==0.6.1',
'prometheus_client==0.7.1',
'coincurve==15.0.0',
@ -44,7 +44,8 @@ setup(
'filetype==1.0.9',
'grpcio==1.38.0',
'lbry-rocksdb==0.8.2',
'ujson==5.4.0'
'ujson==5.4.0',
'rehash==1.0.0'
],
extras_require={
'lint': ['pylint==2.10.0'],

View file

@ -1463,27 +1463,27 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
COIN = int(1E8)
self.assertEqual(self.conductor.spv_node.writer.height, 207)
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put(
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put(
(208, bytes.fromhex(claim_id1)), (0, 10 * COIN)
)
await self.generate(1)
self.assertEqual(self.conductor.spv_node.writer.height, 208)
self.assertEqual(1.7090807854206793, await get_trending_score(claim_id1))
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put(
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put(
(209, bytes.fromhex(claim_id1)), (10 * COIN, 100 * COIN)
)
await self.generate(1)
self.assertEqual(self.conductor.spv_node.writer.height, 209)
self.assertEqual(2.2437974397778886, await get_trending_score(claim_id1))
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put(
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put(
(309, bytes.fromhex(claim_id1)), (100 * COIN, 1000000 * COIN)
)
await self.generate(100)
self.assertEqual(self.conductor.spv_node.writer.height, 309)
self.assertEqual(5.157053472135866, await get_trending_score(claim_id1))
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put(
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put(
(409, bytes.fromhex(claim_id1)), (1000000 * COIN, 1 * COIN)
)

View file

@ -125,24 +125,24 @@ class TestRevertablePrefixDB(unittest.TestCase):
takeover_height = 10000000
self.assertIsNone(self.db.claim_takeover.get(name))
self.db.claim_takeover.stage_put((name,), (claim_hash1, takeover_height))
self.db.claim_takeover.stash_put((name,), (claim_hash1, takeover_height))
self.assertIsNone(self.db.claim_takeover.get(name))
self.assertEqual(10000000, self.db.claim_takeover.get_pending(name).height)
self.db.commit(10000000, b'\x00' * 32)
self.assertEqual(10000000, self.db.claim_takeover.get(name).height)
self.db.claim_takeover.stage_delete((name,), (claim_hash1, takeover_height))
self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 1))
self.db.claim_takeover.stage_delete((name,), (claim_hash2, takeover_height + 1))
self.db.claim_takeover.stash_delete((name,), (claim_hash1, takeover_height))
self.db.claim_takeover.stash_put((name,), (claim_hash2, takeover_height + 1))
self.db.claim_takeover.stash_delete((name,), (claim_hash2, takeover_height + 1))
self.db.commit(10000001, b'\x01' * 32)
self.assertIsNone(self.db.claim_takeover.get(name))
self.db.claim_takeover.stage_put((name,), (claim_hash3, takeover_height + 2))
self.db.claim_takeover.stash_put((name,), (claim_hash3, takeover_height + 2))
self.db.commit(10000002, b'\x02' * 32)
self.assertEqual(10000002, self.db.claim_takeover.get(name).height)
self.db.claim_takeover.stage_delete((name,), (claim_hash3, takeover_height + 2))
self.db.claim_takeover.stage_put((name,), (claim_hash2, takeover_height + 3))
self.db.claim_takeover.stash_delete((name,), (claim_hash3, takeover_height + 2))
self.db.claim_takeover.stash_put((name,), (claim_hash2, takeover_height + 3))
self.db.commit(10000003, b'\x03' * 32)
self.assertEqual(10000003, self.db.claim_takeover.get(name).height)
@ -162,15 +162,15 @@ class TestRevertablePrefixDB(unittest.TestCase):
claim_hash2 = 20 * b'\x02'
claim_hash3 = 20 * b'\x03'
overflow_value = 0xffffffff
self.db.claim_expiration.stage_put((99, 999, 0), (claim_hash0, name))
self.db.claim_expiration.stage_put((100, 1000, 0), (claim_hash1, name))
self.db.claim_expiration.stage_put((100, 1001, 0), (claim_hash2, name))
self.db.claim_expiration.stage_put((101, 1002, 0), (claim_hash3, name))
self.db.claim_expiration.stage_put((overflow_value - 1, 1003, 0), (claim_hash3, name))
self.db.claim_expiration.stage_put((overflow_value, 1004, 0), (claim_hash3, name))
self.db.tx_num.stage_put((b'\x00' * 32,), (101,))
self.db.claim_takeover.stage_put((name,), (claim_hash3, 101))
self.db.db_state.stage_put((), (b'n?\xcf\x12\x99\xd4\xec]y\xc3\xa4\xc9\x1dbJJ\xcf\x9e.\x17=\x95\xa1\xa0POgvihuV', 0, 1, b'VuhivgOP\xa0\xa1\x95=\x17.\x9e\xcfJJb\x1d\xc9\xa4\xc3y]\xec\xd4\x99\x12\xcf?n', 1, 0, 1, 7, 1, -1, -1, 0, 0, 0))
self.db.claim_expiration.stash_put((99, 999, 0), (claim_hash0, name))
self.db.claim_expiration.stash_put((100, 1000, 0), (claim_hash1, name))
self.db.claim_expiration.stash_put((100, 1001, 0), (claim_hash2, name))
self.db.claim_expiration.stash_put((101, 1002, 0), (claim_hash3, name))
self.db.claim_expiration.stash_put((overflow_value - 1, 1003, 0), (claim_hash3, name))
self.db.claim_expiration.stash_put((overflow_value, 1004, 0), (claim_hash3, name))
self.db.tx_num.stash_put((b'\x00' * 32,), (101,))
self.db.claim_takeover.stash_put((name,), (claim_hash3, 101))
self.db.db_state.stash_put((), (b'n?\xcf\x12\x99\xd4\xec]y\xc3\xa4\xc9\x1dbJJ\xcf\x9e.\x17=\x95\xa1\xa0POgvihuV', 0, 1, b'VuhivgOP\xa0\xa1\x95=\x17.\x9e\xcfJJb\x1d\xc9\xa4\xc3y]\xec\xd4\x99\x12\xcf?n', 1, 0, 1, 7, 1, -1, -1, 0, 0, 0))
self.db.unsafe_commit()
state = self.db.db_state.get()
@ -217,9 +217,9 @@ class TestRevertablePrefixDB(unittest.TestCase):
tx_num = 101
for x in range(255):
claim_hash = 20 * chr(x).encode()
self.db.active_amount.stage_put((claim_hash, 1, 200, tx_num, 1), (100000,))
self.db.active_amount.stage_put((claim_hash, 1, 201, tx_num + 1, 1), (200000,))
self.db.active_amount.stage_put((claim_hash, 1, 202, tx_num + 2, 1), (300000,))
self.db.active_amount.stash_put((claim_hash, 1, 200, tx_num, 1), (100000,))
self.db.active_amount.stash_put((claim_hash, 1, 201, tx_num + 1, 1), (200000,))
self.db.active_amount.stash_put((claim_hash, 1, 202, tx_num + 2, 1), (300000,))
tx_num += 3
self.db.unsafe_commit()