BlockchainService base class for readers and the writer

-move base58.py and bip32.py into scribe.schema
-fix https://github.com/lbryio/scribe/issues/3
Jack Robison 2022-03-21 22:47:11 -04:00
parent 94547d332c
commit 1badc5f38c
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
17 changed files with 260 additions and 298 deletions
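
This commit consolidates the reader and writer processes onto a shared BlockchainService base class (scribe/service.py below): BlockchainProcessorService is the writer, while HubServerService and ElasticSyncService are readers built on BlockchainReaderService. The base58/bip32 move also changes import paths for anything that uses those helpers; a minimal before/after sketch, using the module names as they appear in this diff:

# before this commit
from scribe.base58 import Base58
from scribe.bip32 import PublicKey

# after this commit
from scribe.schema.base58 import Base58
from scribe.schema.bip32 import PublicKey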

View file

@ -1,12 +1,12 @@
Scribe maintains a [rocksdb](https://github.com/lbryio/lbry-rocksdb) database containing the [LBRY blockchain](https://github.com/lbryio/lbrycrd) and provides an interface for Python-based services that use the blockchain data on an ongoing basis. Scribe includes implementations of this interface to provide an Electrum server for thin-wallet clients (such as lbry-sdk) and to maintain an Elasticsearch database of metadata for claims in the LBRY blockchain.
* Uses Python 3.7-3.8
* Uses Python 3.7-3.9 (3.10 probably works but hasn't yet been tested)
* Protobuf schema for encoding and decoding metadata stored on the blockchain ([scribe.schema](https://github.com/lbryio/scribe/tree/master/scribe/schema)).
* Blockchain processor that maintains an up to date rocksdb database ([scribe.blockchain](https://github.com/lbryio/scribe/tree/master/scribe/blockchain))
* Rocksdb based database containing the blockchain data ([scribe.db](https://github.com/lbryio/scribe/tree/master/scribe/db))
* Interface for Python services to implement in order to maintain a read-only view of the blockchain data ([scribe.reader.interface](https://github.com/lbryio/scribe/tree/master/scribe/reader/interface.py))
* Electrum based server for thin-wallet clients like lbry-sdk ([scribe.reader.hub_server](https://github.com/lbryio/scribe/tree/master/scribe/reader/hub_server.py))
* Elasticsearch sync utility to index all the claim metadata in the blockchain into an easily searchable form ([scribe.reader.elastic_sync](https://github.com/lbryio/scribe/tree/master/scribe/reader/elastic_sync.py))
* Blockchain processor that maintains an up to date rocksdb database ([scribe.blockchain.service](https://github.com/lbryio/scribe/tree/master/scribe/blockchain/service.py))
* [Rocksdb](https://github.com/lbryio/lbry-rocksdb/) based database containing the blockchain data ([scribe.db](https://github.com/lbryio/scribe/tree/master/scribe/db))
* Interface for Python services to implement in order to maintain a read-only view of the blockchain data ([scribe.service](https://github.com/lbryio/scribe/tree/master/scribe/service.py))
* Electrum based server for thin-wallet clients like lbry-sdk ([scribe.hub.service](https://github.com/lbryio/scribe/tree/master/scribe/hub/service.py))
* Elasticsearch sync utility to index all the claim metadata in the blockchain into an easily searchable form ([scribe.elasticsearch.service](https://github.com/lbryio/scribe/tree/master/scribe/elasticsearch/service.py))
## Installation

View file

@ -1,5 +1,7 @@
FROM debian:11-slim
STOPSIGNAL SIGINT
ARG user=lbry
ARG db_dir=/database
ARG projects_dir=/home/$user
@ -49,5 +51,5 @@ ENV MAX_SEND=1000000000000000000
ENV MAX_RECEIVE=1000000000000000000
COPY ./docker/wallet_server_entrypoint.sh /entrypoint.sh
COPY ./docker/scribe_entrypoint.sh /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]

docker/deploy_scribe_dev.sh Executable file
View file

@ -0,0 +1,20 @@
#!/usr/bin/env bash
# usage: deploy_scribe_dev.sh <host to update>
TARGET_HOST=$1
DOCKER_DIR=`dirname $0`
SCRIBE_DIR=`dirname $DOCKER_DIR`
# build the image
docker build -f $DOCKER_DIR/Dockerfile.scribe -t lbry/scribe:development $SCRIBE_DIR
IMAGE=`docker image inspect lbry/scribe:development | sed -n "s/^.*Id\":\s*\"sha256:\s*\(\S*\)\".*$/\1/p"`
# push the image to the server
ssh $TARGET_HOST docker image prune --force
docker save $IMAGE | ssh $TARGET_HOST docker load
ssh $TARGET_HOST docker tag $IMAGE lbry/scribe:development
## restart the wallet server
ssh $TARGET_HOST docker-compose down
ssh $TARGET_HOST SCRIBE_TAG="development" docker-compose up -d

View file

@ -10,8 +10,8 @@ if [ -z "$HUB_COMMAND" ]; then
fi
case "$HUB_COMMAND" in
scribe ) /home/lbry/.local/bin/scribe "$@" ;;
scribe-hub ) /home/lbry/.local/bin/scribe-hub "$@" ;;
scribe-elastic-sync ) /home/lbry/.local/bin/scribe-elastic-sync ;;
scribe ) exec /home/lbry/.local/bin/scribe "$@" ;;
scribe-hub ) exec /home/lbry/.local/bin/scribe-hub "$@" ;;
scribe-elastic-sync ) exec /home/lbry/.local/bin/scribe-elastic-sync ;;
* ) "HUB_COMMAND env variable must be scribe, scribe-hub, or scribe-elastic-sync" && exit 1 ;;
esac

View file

@ -4,8 +4,8 @@ import typing
from typing import List
from hashlib import sha256
from decimal import Decimal
from scribe.base58 import Base58
from scribe.bip32 import PublicKey
from scribe.schema.base58 import Base58
from scribe.schema.bip32 import PublicKey
from scribe.common import hash160, hash_to_hex_str, double_sha256
from scribe.blockchain.transaction import TxOutput, TxInput, Block
from scribe.blockchain.transaction.deserializer import Deserializer

View file

@ -34,9 +34,10 @@ class Prefetcher:
self.ave_size = self.min_cache_size // 10
self.polling_delay = 0.5
async def main_loop(self, bp_height):
async def main_loop(self, bp_height, started: asyncio.Event):
"""Loop forever polling for more blocks."""
await self.reset_height(bp_height)
started.set()
try:
while True:
# Sleep a while if there is nothing to prefetch

View file

@ -1,24 +1,21 @@
import logging
import time
import asyncio
import typing
import signal
from bisect import bisect_right
from struct import pack
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional, List, Tuple, Set, DefaultDict, Dict
from prometheus_client import Gauge, Histogram
from collections import defaultdict
from scribe import __version__, PROMETHEUS_NAMESPACE
from scribe.db.db import HubDB
from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS
from scribe.blockchain.daemon import LBCDaemon
from scribe.blockchain.transaction import Tx, TxOutput, TxInput
from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block
from scribe.blockchain.prefetcher import Prefetcher
from scribe.schema.url import normalize_name
from scribe.service import BlockchainService
if typing.TYPE_CHECKING:
from scribe.env import Env
from scribe.db.revertable import RevertableOpStack
@ -56,7 +53,7 @@ class StagedClaimtrieItem(typing.NamedTuple):
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_writer"
class BlockProcessor:
class BlockchainProcessorService(BlockchainService):
"""Process blocks and update the DB state to match.
Employ a prefetcher to prefetch blocks in batches for processing.
@ -74,20 +71,11 @@ class BlockProcessor:
)
def __init__(self, env: 'Env'):
self.cancellable_tasks = []
self.env = env
self.state_lock = asyncio.Lock()
super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor')
self.daemon = LBCDaemon(env.coin, env.daemon_url)
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
self.db = HubDB(
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
max_open_files=env.db_max_open_files, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._chain_executor
)
self.shutdown_event = asyncio.Event()
self.coin = env.coin
self.wait_for_blocks_duration = 0.1
self._ready_to_stop = asyncio.Event()
self._caught_up_event: Optional[asyncio.Event] = None
self.height = 0
@ -96,7 +84,7 @@ class BlockProcessor:
self.blocks_event = asyncio.Event()
self.prefetcher = Prefetcher(self.daemon, env.coin, self.blocks_event)
self.logger = logging.getLogger(__name__)
# self.logger = logging.getLogger(__name__)
# Meta
self.touched_hashXs: Set[bytes] = set()
@ -163,9 +151,6 @@ class BlockProcessor:
self.pending_transaction_num_mapping: Dict[bytes, int] = {}
self.pending_transactions: Dict[int, bytes] = {}
self._stopping = False
self._ready_to_stop = asyncio.Event()
async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that
# cancellations from shutdown don't lose work - when the task
@ -173,13 +158,13 @@ class BlockProcessor:
# Take the state lock to be certain in-memory state is
# consistent and not being updated elsewhere.
async def run_in_thread_locked():
async with self.state_lock:
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
async with self.lock:
return await asyncio.get_event_loop().run_in_executor(self._executor, func, *args)
return await asyncio.shield(run_in_thread_locked())
async def run_in_thread(self, func, *args):
async def run_in_thread():
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
return await asyncio.get_event_loop().run_in_executor(self._executor, func, *args)
return await asyncio.shield(run_in_thread())
async def refresh_mempool(self):
@ -195,13 +180,13 @@ class BlockProcessor:
mempool_prefix.stage_delete((tx_hash,), (raw_tx,))
unsafe_commit()
async with self.state_lock:
async with self.lock:
current_mempool = await self.run_in_thread(fetch_mempool, self.db.prefix_db.mempool_tx)
_to_put = []
try:
mempool_hashes = await self.daemon.mempool_hashes()
except (TypeError, RPCError):
self.logger.warning("failed to get mempool tx hashes, reorg underway?")
except (TypeError, RPCError) as err:
self.log.exception("failed to get mempool tx hashes, reorg underway? (%s)", err)
return
for hh in mempool_hashes:
tx_hash = bytes.fromhex(hh)[::-1]
@ -211,7 +196,7 @@ class BlockProcessor:
try:
_to_put.append((tx_hash, bytes.fromhex(await self.daemon.getrawtransaction(hh))))
except (TypeError, RPCError):
self.logger.warning("failed to get a mempool tx, reorg underway?")
self.log.warning("failed to get a mempool tx, reorg underway?")
return
if current_mempool:
if bytes.fromhex(await self.daemon.getbestblockhash())[::-1] != self.coin.header_hash(self.db.headers[-1]):
@ -243,17 +228,17 @@ class BlockProcessor:
start = time.perf_counter()
start_count = self.tx_count
txo_count = await self.run_in_thread_with_lock(self.advance_block, block)
self.logger.info(
self.log.info(
"writer advanced to %i (%i txs, %i txos) in %0.3fs", self.height, self.tx_count - start_count,
txo_count, time.perf_counter() - start
)
if self.height == self.coin.nExtendedClaimExpirationForkHeight:
self.logger.warning(
self.log.warning(
"applying extended claim expiration fork on claims accepted by, %i", self.height
)
await self.run_in_thread_with_lock(self.db.apply_expiration_extension_fork)
except:
self.logger.exception("advance blocks failed")
self.log.exception("advance blocks failed")
raise
processed_time = time.perf_counter() - total_start
self.block_count_metric.set(self.height)
@ -271,29 +256,29 @@ class BlockProcessor:
if self.db.get_block_hash(height)[::-1].hex() == block_hash:
break
count += 1
self.logger.warning(f"blockchain reorg detected at {self.height}, unwinding last {count} blocks")
self.log.warning(f"blockchain reorg detected at {self.height}, unwinding last {count} blocks")
try:
assert count > 0, count
for _ in range(count):
await self.run_in_thread_with_lock(self.backup_block)
self.logger.info(f'backed up to height {self.height:,d}')
self.log.info(f'backed up to height {self.height:,d}')
if self.env.cache_all_claim_txos:
await self.db._read_claim_txos() # TODO: don't do this
await self.prefetcher.reset_height(self.height)
self.reorg_count_metric.inc()
except:
self.logger.exception("reorg blocks failed")
self.log.exception("reorg blocks failed")
raise
finally:
self.logger.info("backed up to block %i", self.height)
self.log.info("backed up to block %i", self.height)
else:
# It is probably possible but extremely rare that what
# bitcoind returns doesn't form a chain because it
# reorg-ed the chain as it was processing the batched
# block hash requests. Should this happen it's simplest
# just to reset the prefetcher and try again.
self.logger.warning('daemon blocks do not form a chain; '
self.log.warning('daemon blocks do not form a chain; '
'resetting the prefetcher')
await self.prefetcher.reset_height(self.height)
@ -365,7 +350,7 @@ class BlockProcessor:
# else:
# print("\tfailed to validate signed claim")
except:
self.logger.exception(f"error validating channel signature for %s:%i", tx_hash[::-1].hex(), nout)
self.log.exception(f"error validating channel signature for %s:%i", tx_hash[::-1].hex(), nout)
if txo.is_claim: # it's a root claim
root_tx_num, root_idx = tx_num, nout
@ -375,7 +360,7 @@ class BlockProcessor:
# print(f"\tthis is a wonky tx, contains unlinked claim update {claim_hash.hex()}")
return
if normalized_name != spent_claims[claim_hash][2]:
self.logger.warning(
self.log.warning(
f"{tx_hash[::-1].hex()} contains mismatched name for claim update {claim_hash.hex()}"
)
return
@ -1280,7 +1265,7 @@ class BlockProcessor:
self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes)
self.removed_claims_to_send_es.update(self.removed_claim_hashes)
def advance_block(self, block):
def advance_block(self, block: Block):
height = self.height + 1
# print("advance ", height)
# Use local vars for speed in the loops
@ -1454,7 +1439,7 @@ class BlockProcessor:
self.removed_claims_to_send_es.update(touched_and_deleted.deleted_claims)
# self.db.assert_flushed(self.flush_data())
self.logger.info("backup block %i", self.height)
self.log.info("backup block %i", self.height)
# Check and update self.tip
self.db.tx_counts.pop()
@ -1507,7 +1492,7 @@ class BlockProcessor:
self.db.assert_db_state()
elapsed = self.db.last_flush - start_time
self.logger.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. '
self.log.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. '
f'Height {self.height:,d} txs: {self.tx_count:,d} ({tx_delta:+,d})')
def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput') -> Optional[bytes]:
@ -1535,7 +1520,7 @@ class BlockProcessor:
hashX = hashX_value.hashX
utxo_value = self.db.prefix_db.utxo.get(hashX, txin_num, nout)
if not utxo_value:
self.logger.warning(
self.log.warning(
"%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), nout, hash_to_hex_str(hashX)
)
raise ChainError(
@ -1551,8 +1536,9 @@ class BlockProcessor:
self.touched_hashXs.add(hashX)
return hashX
async def process_blocks_and_mempool_forever(self):
async def process_blocks_and_mempool_forever(self, caught_up_event):
"""Loop forever processing blocks as they arrive."""
self._caught_up_event = caught_up_event
try:
while not self._stopping:
if self.height == self.daemon.cached_height():
@ -1573,7 +1559,7 @@ class BlockProcessor:
except asyncio.CancelledError:
raise
except Exception:
self.logger.exception("error while updating mempool txs")
self.log.exception("error while updating mempool txs")
raise
else:
try:
@ -1581,13 +1567,13 @@ class BlockProcessor:
except asyncio.CancelledError:
raise
except Exception:
self.logger.exception("error while processing txs")
self.log.exception("error while processing txs")
raise
finally:
self._ready_to_stop.set()
async def _first_caught_up(self):
self.logger.info(f'caught up to height {self.height}')
self.log.info(f'caught up to height {self.height}')
# Flush everything but with first_sync->False state.
first_sync = self.db.first_sync
self.db.first_sync = False
@ -1601,86 +1587,19 @@ class BlockProcessor:
await self.run_in_thread_with_lock(flush)
if first_sync:
self.logger.info(f'{__version__} synced to '
self.log.info(f'{__version__} synced to '
f'height {self.height:,d}, halting here.')
self.shutdown_event.set()
async def open(self):
self.db.open_db()
def _iter_start_tasks(self):
self.height = self.db.db_height
self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count
await self.db.initialize_caches()
yield self.daemon.height()
yield self.start_cancellable(self.prefetcher.main_loop, self.height)
yield self.start_cancellable(self.process_blocks_and_mempool_forever)
async def fetch_and_process_blocks(self, caught_up_event):
"""Fetch, process and index blocks from the daemon.
Sets caught_up_event when first caught up. Flushes to disk
and shuts down cleanly if cancelled.
This is mainly because if, during initial sync ElectrumX is
asked to shut down when a large number of blocks have been
processed but not written to disk, it should write those to
disk before exiting, as otherwise a significant amount of work
could be lost.
"""
await self.open()
self._caught_up_event = caught_up_event
try:
await asyncio.wait([
self.prefetcher.main_loop(self.height),
self.process_blocks_and_mempool_forever()
])
except asyncio.CancelledError:
raise
except:
self.logger.exception("Block processing failed!")
raise
finally:
# Shut down block processing
self.logger.info('closing the DB for a clean shutdown...')
self._chain_executor.shutdown(wait=True)
self.db.close()
async def start(self):
self._stopping = False
env = self.env
self.logger.info(f'software version: {__version__}')
self.logger.info(f'event loop policy: {env.loop_policy}')
self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks')
await self.daemon.height()
def _start_cancellable(run, *args):
_flag = asyncio.Event()
self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
return _flag.wait()
await _start_cancellable(self.fetch_and_process_blocks)
async def stop(self):
self._stopping = True
await self._ready_to_stop.wait()
for task in reversed(self.cancellable_tasks):
task.cancel()
await asyncio.wait(self.cancellable_tasks)
self.shutdown_event.set()
await self.daemon.close()
def run(self):
loop = asyncio.get_event_loop()
loop.set_default_executor(self._chain_executor)
def __exit():
raise SystemExit()
try:
loop.add_signal_handler(signal.SIGINT, __exit)
loop.add_signal_handler(signal.SIGTERM, __exit)
loop.run_until_complete(self.start())
loop.run_until_complete(self.shutdown_event.wait())
except (SystemExit, KeyboardInterrupt):
pass
finally:
loop.run_until_complete(self.stop())
def _iter_stop_tasks(self):
yield self._ready_to_stop.wait()
yield self._stop_cancellable_tasks()
yield self.daemon.close()
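
The writer no longer carries its own start()/stop()/run() plumbing; it only yields its startup steps in order and the shared base class awaits them one by one. A runnable toy sketch of that split (Service, Writer, connect_daemon and start_loops are illustrative names, not from the repo):

import asyncio

class Service:
    """Toy version of the start()/_iter_start_tasks() split introduced by this commit."""
    async def start(self):
        for start_task in self._iter_start_tasks():
            await start_task  # await each yielded awaitable in order

    def _iter_start_tasks(self):
        raise NotImplementedError()

class Writer(Service):
    async def connect_daemon(self):
        print("daemon reachable")

    async def start_loops(self):
        print("prefetcher and block processor started")

    def _iter_start_tasks(self):
        # mirrors BlockchainProcessorService._iter_start_tasks above:
        # daemon.height(), then the prefetcher loop, then the processing loop
        yield self.connect_daemon()
        yield self.start_loops()

asyncio.run(Writer().start())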

View file

@ -2,8 +2,9 @@ import logging
import traceback
import argparse
from scribe.env import Env
from scribe.blockchain.block_processor import BlockProcessor
from scribe.reader import BlockchainReaderServer, ElasticWriter
from scribe.blockchain.service import BlockchainProcessorService
from scribe.hub.service import HubServerService
from scribe.elasticsearch.service import ElasticSyncService
def get_arg_parser(name):
@ -24,7 +25,7 @@ def run_writer_forever():
setup_logging()
args = get_arg_parser('scribe').parse_args()
try:
block_processor = BlockProcessor(Env.from_arg_parser(args))
block_processor = BlockchainProcessorService(Env.from_arg_parser(args))
block_processor.run()
except Exception:
traceback.print_exc()
@ -38,7 +39,7 @@ def run_server_forever():
args = get_arg_parser('scribe-hub').parse_args()
try:
server = BlockchainReaderServer(Env.from_arg_parser(args))
server = HubServerService(Env.from_arg_parser(args))
server.run()
except Exception:
traceback.print_exc()
@ -54,7 +55,7 @@ def run_es_sync_forever():
args = parser.parse_args()
try:
server = ElasticWriter(Env.from_arg_parser(args))
server = ElasticSyncService(Env.from_arg_parser(args))
server.run(args.reindex)
except Exception:
traceback.print_exc()

View file

@ -2,25 +2,22 @@ import os
import json
import typing
import asyncio
import logging
from collections import defaultdict
from elasticsearch import AsyncElasticsearch, NotFoundError
from elasticsearch.helpers import async_streaming_bulk
from scribe.schema.result import Censor
from scribe.service import BlockchainReaderService
from scribe.db.revertable import RevertableOp
from scribe.db.common import TrendingNotification, DB_PREFIXES
from scribe.elasticsearch.notifier_protocol import ElasticNotifierProtocol
from scribe.elasticsearch.search import IndexVersionMismatch, expand_query
from scribe.elasticsearch.constants import ALL_FIELDS, INDEX_DEFAULT_SETTINGS
from scribe.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
from scribe.reader import BaseBlockchainReader
from scribe.db.revertable import RevertableOp
from scribe.db.common import TrendingNotification, DB_PREFIXES
log = logging.getLogger(__name__)
class ElasticWriter(BaseBlockchainReader):
class ElasticSyncService(BlockchainReaderService):
VERSION = 1
def __init__(self, env):
@ -342,10 +339,10 @@ class ElasticWriter(BaseBlockchainReader):
def _iter_start_tasks(self):
yield self.read_es_height()
yield self.start_index()
yield self._start_cancellable(self.run_es_notifier)
yield self.start_cancellable(self.run_es_notifier)
yield self.reindex(force=self._force_reindex)
yield self.catch_up()
yield self._start_cancellable(self.refresh_blocks_forever)
yield self.start_cancellable(self.refresh_blocks_forever)
def _iter_stop_tasks(self):
yield self._stop_cancellable_tasks()
@ -363,7 +360,7 @@ class ElasticWriter(BaseBlockchainReader):
self._force_reindex = False
async def _reindex(self):
async with self._lock:
async with self.lock:
self.log.info("reindexing %i claims (estimate)", self.db.prefix_db.claim_to_txo.estimate_num_keys())
await self.delete_index()
res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400)

View file

@ -1,13 +1,15 @@
import asyncio
from scribe.blockchain.daemon import LBCDaemon
from scribe.reader import BaseBlockchainReader
from scribe.elasticsearch import ElasticNotifierClientProtocol
from scribe.hub.session import SessionManager
from scribe.hub.mempool import MemPool
from scribe.hub.udp import StatusServer
from scribe.service import BlockchainReaderService
from scribe.elasticsearch import ElasticNotifierClientProtocol
class BlockchainReaderServer(BaseBlockchainReader):
class HubServerService(BlockchainReaderService):
def __init__(self, env):
super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker')
self.notifications_to_send = []
@ -89,11 +91,11 @@ class BlockchainReaderServer(BaseBlockchainReader):
def _iter_start_tasks(self):
yield self.start_status_server()
yield self._start_cancellable(self.es_notification_client.maintain_connection)
yield self._start_cancellable(self.receive_es_notifications)
yield self._start_cancellable(self.refresh_blocks_forever)
yield self.start_cancellable(self.es_notification_client.maintain_connection)
yield self.start_cancellable(self.receive_es_notifications)
yield self.start_cancellable(self.refresh_blocks_forever)
yield self.session_manager.search_index.start()
yield self._start_cancellable(self.session_manager.serve, self.mempool)
yield self.start_cancellable(self.session_manager.serve, self.mempool)
def _iter_stop_tasks(self):
yield self.status_server.stop()

View file

@ -12,11 +12,11 @@ from bisect import bisect_right
from asyncio import Event, sleep
from collections import defaultdict, namedtuple
from contextlib import suppress
from functools import partial, lru_cache
from functools import partial
from elasticsearch import ConnectionTimeout
from prometheus_client import Counter, Info, Histogram, Gauge
from scribe.schema.result import Outputs
from scribe.base58 import Base58Error
from scribe.schema.base58 import Base58Error
from scribe.error import ResolveCensoredError, TooManyClaimSearchParametersError
from scribe import __version__, PROTOCOL_MIN, PROTOCOL_MAX, PROMETHEUS_NAMESPACE
from scribe.build_info import BUILD, COMMIT_HASH, DOCKER_TAG

View file

@ -1,3 +0,0 @@
from scribe.reader.interface import BaseBlockchainReader
from scribe.reader.hub_server import BlockchainReaderServer
from scribe.reader.elastic_sync import ElasticWriter

View file

@ -7,12 +7,12 @@ from string import ascii_letters
from decimal import Decimal, ROUND_UP
from google.protobuf.json_format import MessageToDict
from scribe.base58 import Base58, b58_encode
from scribe.schema.base58 import Base58, b58_encode
from scribe.error import MissingPublishedFileError, EmptyPublishedFileError
from scribe.schema.mime_types import guess_media_type
from scribe.schema.base import Metadata, BaseMessageList
from scribe.schema.tags import clean_tags, normalize_tag
from scribe.schema.tags import normalize_tag
from scribe.schema.types.v2.claim_pb2 import (
Fee as FeeMessage,
Location as LocationMessage,

View file

@ -8,7 +8,7 @@ from coincurve.utils import (
pem_to_der, lib as libsecp256k1, ffi as libsecp256k1_ffi
)
from coincurve.ecdsa import CDATA_SIG_LENGTH
from scribe.base58 import Base58
from scribe.schema.base58 import Base58
if (sys.version_info.major, sys.version_info.minor) > (3, 7):

View file

@ -4,31 +4,35 @@ import typing
import signal
from concurrent.futures.thread import ThreadPoolExecutor
from prometheus_client import Gauge, Histogram
from scribe import PROMETHEUS_NAMESPACE, __version__
from scribe.common import HISTOGRAM_BUCKETS
from scribe.db.prefixes import DBState
from scribe import __version__, PROMETHEUS_NAMESPACE
from scribe.env import Env
from scribe.db import HubDB
from scribe.reader.prometheus import PrometheusServer
from scribe.db.prefixes import DBState
from scribe.common import HISTOGRAM_BUCKETS
from scribe.metrics import PrometheusServer
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_reader"
class BlockchainReaderInterface:
async def poll_for_changes(self):
"""
Detect and handle if the db has advanced to a new block or unwound during a chain reorganization
If a reorg is detected, this will first unwind() to the branching height and then advance() forward
to the new block(s).
"""
raise NotImplementedError()
def clear_caches(self):
"""
Called after finished advancing, used for invalidating caches
"""
pass
class BlockchainService:
"""
Base class for blockchain readers and the writer
"""
def __init__(self, env: Env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'scribe'):
self.env = env
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
self.shutdown_event = asyncio.Event()
self.cancellable_tasks = []
self._thread_workers = thread_workers
self._thread_prefix = thread_prefix
self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix)
self.lock = asyncio.Lock()
self.last_state: typing.Optional[DBState] = None
self.db = HubDB(
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor
)
self._stopping = False
def advance(self, height: int):
"""
@ -44,8 +48,84 @@ class BlockchainReaderInterface:
"""
raise NotImplementedError()
def start_cancellable(self, run, *args):
_flag = asyncio.Event()
self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
return _flag.wait()
class BaseBlockchainReader(BlockchainReaderInterface):
def _iter_start_tasks(self):
raise NotImplementedError()
def _iter_stop_tasks(self):
yield self._stop_cancellable_tasks()
async def _stop_cancellable_tasks(self):
async with self.lock:
while self.cancellable_tasks:
t = self.cancellable_tasks.pop()
if not t.done():
t.cancel()
async def start(self):
if not self._executor:
self._executor = ThreadPoolExecutor(self._thread_workers, thread_name_prefix=self._thread_prefix)
self.db._executor = self._executor
env = self.env
# min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
self.log.info(f'software version: {__version__}')
# self.log.info(f'supported protocol versions: {min_str}-{max_str}')
self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks')
self.db.open_db()
self.log.info(f'initializing caches')
await self.db.initialize_caches()
self.last_state = self.db.read_db_state()
self.log.info(f'opened db at block {self.db.db_height}')
for start_task in self._iter_start_tasks():
await start_task
async def stop(self):
self.log.info("stopping")
self._stopping = True
for stop_task in self._iter_stop_tasks():
await stop_task
self.db.close()
self._executor.shutdown(wait=True)
self._executor = None
self.shutdown_event.set()
async def _run(self):
try:
await self.start()
self.log.info("finished start(), waiting for shutdown event")
await self.shutdown_event.wait()
except (SystemExit, KeyboardInterrupt, asyncio.CancelledError):
self.log.warning("exiting")
self._stopping = True
except Exception as err:
self.log.exception("unexpected fatal error: %s", err)
self._stopping = True
def run(self):
def __exit():
raise SystemExit()
loop = asyncio.get_event_loop()
loop.set_default_executor(self._executor)
loop.add_signal_handler(signal.SIGINT, __exit)
loop.add_signal_handler(signal.SIGTERM, __exit)
try:
loop.run_until_complete(self._run())
finally:
loop.run_until_complete(self.stop())
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_reader"
class BlockchainReaderService(BlockchainService):
block_count_metric = Gauge(
"block_count", "Number of processed blocks", namespace=NAMESPACE
)
@ -57,23 +137,56 @@ class BaseBlockchainReader(BlockchainReaderInterface):
)
def __init__(self, env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'blockchain-reader'):
self.env = env
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
self.shutdown_event = asyncio.Event()
self.cancellable_tasks = []
self._thread_workers = thread_workers
self._thread_prefix = thread_prefix
self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix)
self.db = HubDB(
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor
)
self.last_state: typing.Optional[DBState] = None
super().__init__(env, secondary_name, thread_workers, thread_prefix)
self._refresh_interval = 0.1
self._lock = asyncio.Lock()
self.prometheus_server: typing.Optional[PrometheusServer] = None
async def poll_for_changes(self):
"""
Detect and handle if the db has advanced to a new block or unwound during a chain reorganization
If a reorg is detected, this will first unwind() to the branching height and then advance() forward
to the new block(s).
"""
await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes)
def clear_caches(self):
"""
Called after finished advancing, used for invalidating caches
"""
pass
def advance(self, height: int):
"""
Called when advancing to the given block height
Callbacks that look up new values from the added block can be put here
"""
tx_count = self.db.prefix_db.tx_count.get(height).tx_count
assert tx_count not in self.db.tx_counts, f'boom {tx_count} in {len(self.db.tx_counts)} tx counts'
assert len(self.db.tx_counts) == height, f"{len(self.db.tx_counts)} != {height}"
prev_count = self.db.tx_counts[-1]
self.db.tx_counts.append(tx_count)
if self.db._cache_all_tx_hashes:
for tx_num in range(prev_count, tx_count):
tx_hash = self.db.prefix_db.tx_hash.get(tx_num).tx_hash
self.db.total_transactions.append(tx_hash)
self.db.tx_num_mapping[tx_hash] = tx_count
assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}"
self.db.headers.append(self.db.prefix_db.header.get(height, deserialize_value=False))
def unwind(self):
"""
Go backwards one block
"""
prev_count = self.db.tx_counts.pop()
tx_count = self.db.tx_counts[-1]
self.db.headers.pop()
if self.db._cache_all_tx_hashes:
for _ in range(prev_count - tx_count):
self.db.tx_num_mapping.pop(self.db.total_transactions.pop())
assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}"
def _detect_changes(self):
try:
self.db.prefix_db.try_catch_up_with_primary()
@ -115,19 +228,10 @@ class BaseBlockchainReader(BlockchainReaderInterface):
self.db.filtering_channel_hashes
)
async def poll_for_changes(self):
"""
Detect and handle if the db has advanced to a new block or unwound during a chain reorganization
If a reorg is detected, this will first unwind() to the branching height and then advance() forward
to the new block(s).
"""
await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes)
async def refresh_blocks_forever(self, synchronized: asyncio.Event):
while True:
try:
async with self._lock:
async with self.lock:
await self.poll_for_changes()
except asyncio.CancelledError:
raise
@ -137,80 +241,15 @@ class BaseBlockchainReader(BlockchainReaderInterface):
await asyncio.sleep(self._refresh_interval)
synchronized.set()
def advance(self, height: int):
tx_count = self.db.prefix_db.tx_count.get(height).tx_count
assert tx_count not in self.db.tx_counts, f'boom {tx_count} in {len(self.db.tx_counts)} tx counts'
assert len(self.db.tx_counts) == height, f"{len(self.db.tx_counts)} != {height}"
prev_count = self.db.tx_counts[-1]
self.db.tx_counts.append(tx_count)
if self.db._cache_all_tx_hashes:
for tx_num in range(prev_count, tx_count):
tx_hash = self.db.prefix_db.tx_hash.get(tx_num).tx_hash
self.db.total_transactions.append(tx_hash)
self.db.tx_num_mapping[tx_hash] = tx_count
assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}"
self.db.headers.append(self.db.prefix_db.header.get(height, deserialize_value=False))
def unwind(self):
prev_count = self.db.tx_counts.pop()
tx_count = self.db.tx_counts[-1]
self.db.headers.pop()
if self.db._cache_all_tx_hashes:
for _ in range(prev_count - tx_count):
self.db.tx_num_mapping.pop(self.db.total_transactions.pop())
assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}"
def _start_cancellable(self, run, *args):
_flag = asyncio.Event()
self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
return _flag.wait()
def _iter_start_tasks(self):
yield self._start_cancellable(self.refresh_blocks_forever)
self.block_count_metric.set(self.last_state.height)
yield self.start_prometheus()
yield self.start_cancellable(self.refresh_blocks_forever)
def _iter_stop_tasks(self):
yield self.stop_prometheus()
yield self._stop_cancellable_tasks()
async def _stop_cancellable_tasks(self):
async with self._lock:
while self.cancellable_tasks:
t = self.cancellable_tasks.pop()
if not t.done():
t.cancel()
async def start(self):
if not self._executor:
self._executor = ThreadPoolExecutor(self._thread_workers, thread_name_prefix=self._thread_prefix)
self.db._executor = self._executor
env = self.env
# min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
self.log.info(f'software version: {__version__}')
# self.log.info(f'supported protocol versions: {min_str}-{max_str}')
self.log.info(f'event loop policy: {env.loop_policy}')
self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks')
self.db.open_db()
self.log.info(f'initializing caches')
await self.db.initialize_caches()
self.last_state = self.db.read_db_state()
self.log.info(f'opened db at block {self.last_state.height}')
self.block_count_metric.set(self.last_state.height)
await self.start_prometheus()
for start_task in self._iter_start_tasks():
await start_task
self.log.info("finished starting")
async def stop(self):
for stop_task in self._iter_stop_tasks():
await stop_task
await self.stop_prometheus()
self.db.close()
self._executor.shutdown(wait=True)
self._executor = None
self.shutdown_event.set()
async def start_prometheus(self):
if not self.prometheus_server and self.env.prometheus_port:
self.prometheus_server = PrometheusServer()
@ -220,19 +259,3 @@ class BaseBlockchainReader(BlockchainReaderInterface):
if self.prometheus_server:
await self.prometheus_server.stop()
self.prometheus_server = None
def run(self):
loop = asyncio.get_event_loop()
loop.set_default_executor(self._executor)
def __exit():
raise SystemExit()
try:
loop.add_signal_handler(signal.SIGINT, __exit)
loop.add_signal_handler(signal.SIGTERM, __exit)
loop.run_until_complete(self.start())
loop.run_until_complete(self.shutdown_event.wait())
except (SystemExit, KeyboardInterrupt):
pass
finally:
loop.run_until_complete(self.stop())
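
Taken together, a new read-only service built on these base classes only overrides the per-block hooks and lists its start and stop steps. A hypothetical minimal reader, assuming the constructor and hook signatures shown in this diff (MyReaderService and derived_cache are made-up names):

from scribe.env import Env
from scribe.service import BlockchainReaderService  # module path introduced by this commit

class MyReaderService(BlockchainReaderService):
    def __init__(self, env: Env):
        super().__init__(env, 'my-reader', thread_workers=1, thread_prefix='my-reader')
        self.derived_cache = {}

    def advance(self, height: int):
        super().advance(height)  # base class keeps tx_counts / headers / tx hash caches in sync
        # hypothetical per-block work would go here

    def unwind(self):
        super().unwind()
        self.derived_cache.clear()

    def clear_caches(self):
        # called after the reader finishes advancing; invalidate derived caches here
        self.derived_cache.clear()

    def _iter_start_tasks(self):
        yield self.start_prometheus()
        yield self.start_cancellable(self.refresh_blocks_forever)

    def _iter_stop_tasks(self):
        yield self.stop_prometheus()
        yield self._stop_cancellable_tasks()

# run it like the entry points in scribe/cli.py do:
#   MyReaderService(Env.from_arg_parser(args)).run()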