refactoring, fix reconnecting to notifier

This commit is contained in:
Jack Robison 2022-03-09 17:26:20 -05:00
parent 0dee6ca226
commit c5dc8d5cad
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
10 changed files with 190 additions and 189 deletions

View file

@ -3,7 +3,6 @@ import time
import asyncio
import typing
import signal
from bisect import bisect_right
from struct import pack
from concurrent.futures.thread import ThreadPoolExecutor
@ -11,18 +10,15 @@ from typing import Optional, List, Tuple, Set, DefaultDict, Dict
from prometheus_client import Gauge, Histogram
from collections import defaultdict
from scribe.schema.url import normalize_name
from scribe import __version__, PROMETHEUS_NAMESPACE
from scribe.blockchain.daemon import LBCDaemon
from scribe.blockchain.transaction import Tx, TxOutput, TxInput
from scribe.db.db import HubDB
from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from scribe.common import hash_to_hex_str, hash160, RPCError
from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS
from scribe.blockchain.daemon import LBCDaemon
from scribe.blockchain.transaction import Tx, TxOutput, TxInput
from scribe.blockchain.prefetcher import Prefetcher
from scribe.schema.url import normalize_name
if typing.TYPE_CHECKING:
from scribe.env import Env
from scribe.db.revertable import RevertableOpStack
@ -57,11 +53,6 @@ class StagedClaimtrieItem(typing.NamedTuple):
)
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_writer"

View file

@ -18,6 +18,10 @@ HASHX_LEN = 11
CLAIM_HASH_LEN = 20
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
# class cachedproperty:
# def __init__(self, f):
# self.f = f

View file

@ -31,9 +31,40 @@ class ElasticNotifierProtocol(asyncio.Protocol):
class ElasticNotifierClientProtocol(asyncio.Protocol):
"""notifies the reader when ES has written updates"""
def __init__(self, notifications: asyncio.Queue):
def __init__(self, notifications: asyncio.Queue, host: str, port: int):
self.notifications = notifications
self.transport: typing.Optional[asyncio.Transport] = None
self.host = host
self.port = port
self._lost_connection = asyncio.Event()
self._lost_connection.set()
async def connect(self):
if self._lost_connection.is_set():
await asyncio.get_event_loop().create_connection(
lambda: self, self.host, self.port
)
async def maintain_connection(self, synchronized: asyncio.Event):
first_connect = True
if not self._lost_connection.is_set():
synchronized.set()
while True:
try:
await self._lost_connection.wait()
if not first_connect:
log.warning("lost connection to scribe-elastic-sync notifier")
await self.connect()
first_connect = False
synchronized.set()
log.info("connected to es notifier")
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", self.host, self.port)
await asyncio.sleep(30)
else:
log.info("stopping the notifier loop")
raise e
def close(self):
if self.transport and not self.transport.is_closing():
@ -41,10 +72,11 @@ class ElasticNotifierClientProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
log.info("connected to es notifier")
self._lost_connection.clear()
def connection_lost(self, exc) -> None:
self.transport = None
self._lost_connection.set()
def data_received(self, data: bytes) -> None:
try:

View file

@ -4,11 +4,6 @@ from functools import lru_cache
from scribe.common import CodeMessageError
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
SignatureInfo = namedtuple('SignatureInfo', 'min_args max_args '
'required_names other_names')

View file

@ -9,12 +9,6 @@ from scribe.common import RPCError, CodeMessageError
from scribe.hub.common import Notification, Request, Response, Batch, ProtocolError
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
NAMESPACE = "scribe"
class JSONRPC:
"""Abstract base class that interprets and constructs JSON RPC messages."""

View file

@ -6,6 +6,7 @@ import logging
from collections import defaultdict
from prometheus_client import Histogram
from scribe import PROMETHEUS_NAMESPACE
from scribe.common import HISTOGRAM_BUCKETS
from scribe.blockchain.transaction.deserializer import Deserializer
if typing.TYPE_CHECKING:
@ -32,9 +33,6 @@ class MemPoolTxSummary:
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_hub"
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
mempool_process_time_metric = Histogram(
"processed_mempool", "Time to process mempool and notify touched addresses",
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS

View file

@ -23,7 +23,7 @@ from scribe import __version__, PROTOCOL_MIN, PROTOCOL_MAX, PROMETHEUS_NAMESPACE
from scribe.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from scribe.elasticsearch import SearchIndex
from scribe.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time
from scribe.common import protocol_version, RPCError, DaemonError, TaskGroup
from scribe.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS
from scribe.hub.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC
from scribe.hub.common import BatchRequest, ProtocolError, Request, Batch, Notification
from scribe.hub.framer import NewlineFramer
@ -190,9 +190,6 @@ class SessionGroup:
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_hub"
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
class SessionManager:

View file

@ -1,18 +1,13 @@
import os
import signal
import json
import typing
import struct
from collections import defaultdict
import asyncio
import logging
from decimal import Decimal
from collections import defaultdict
from elasticsearch import AsyncElasticsearch, NotFoundError
from elasticsearch.helpers import async_streaming_bulk
from prometheus_client import Gauge, Histogram
from scribe.schema.result import Censor
from scribe import PROMETHEUS_NAMESPACE
from scribe.elasticsearch.notifier_protocol import ElasticNotifierProtocol
from scribe.elasticsearch.search import IndexVersionMismatch, expand_query
from scribe.elasticsearch.constants import ALL_FIELDS, INDEX_DEFAULT_SETTINGS
@ -24,24 +19,9 @@ from scribe.db.common import TrendingNotification, DB_PREFIXES
log = logging.getLogger(__name__)
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_elastic_sync"
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
class ElasticWriter(BaseBlockchainReader):
VERSION = 1
prometheus_namespace = ""
block_count_metric = Gauge(
"block_count", "Number of processed blocks", namespace=NAMESPACE
)
block_update_time_metric = Histogram(
"block_time", "Block update times", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
)
reorg_count_metric = Gauge(
"reorg_count", "Number of reorgs", namespace=NAMESPACE
)
def __init__(self, env):
super().__init__(env, 'lbry-elastic-writer', thread_workers=1, thread_prefix='lbry-elastic-writer')
@ -65,6 +45,7 @@ class ElasticWriter(BaseBlockchainReader):
self._advanced = True
self.synchronized = asyncio.Event()
self._listeners: typing.List[ElasticNotifierProtocol] = []
self._force_reindex = False
async def run_es_notifier(self, synchronized: asyncio.Event):
server = await asyncio.get_event_loop().create_server(
@ -138,8 +119,10 @@ class ElasticWriter(BaseBlockchainReader):
await self.sync_client.indices.refresh(self.index)
return False
async def stop_index(self):
async def stop_index(self, delete=False):
if self.sync_client:
if delete:
await self.delete_index()
await self.sync_client.close()
self.sync_client = None
@ -311,60 +294,30 @@ class ElasticWriter(BaseBlockchainReader):
def last_synced_height(self) -> int:
return self._last_wrote_height
async def start(self, reindex=False):
await super().start()
def _start_cancellable(run, *args):
_flag = asyncio.Event()
self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
return _flag.wait()
self.db.open_db()
await self.db.initialize_caches()
await self.read_es_height()
await self.start_index()
self.last_state = self.db.read_db_state()
await _start_cancellable(self.run_es_notifier)
if reindex or self._last_wrote_height == 0 and self.db.db_height > 0:
async def reindex(self, force=False):
if force or self._last_wrote_height == 0 and self.db.db_height > 0:
if self._last_wrote_height == 0:
self.log.info("running initial ES indexing of rocksdb at block height %i", self.db.db_height)
else:
self.log.info("reindex (last wrote: %i, db height: %i)", self._last_wrote_height, self.db.db_height)
await self.reindex()
await _start_cancellable(self.refresh_blocks_forever)
await self._reindex()
async def stop(self, delete_index=False):
async with self._lock:
while self.cancellable_tasks:
t = self.cancellable_tasks.pop()
if not t.done():
t.cancel()
if delete_index:
await self.delete_index()
await self.stop_index()
self._executor.shutdown(wait=True)
self._executor = None
self.shutdown_event.set()
def _iter_start_tasks(self):
yield self.read_es_height()
yield self.start_index()
yield self._start_cancellable(self.run_es_notifier)
yield self.reindex(force=self._force_reindex)
yield self._start_cancellable(self.refresh_blocks_forever)
def _iter_stop_tasks(self):
yield self._stop_cancellable_tasks()
yield self.stop_index()
def run(self, reindex=False):
loop = asyncio.get_event_loop()
loop.set_default_executor(self._executor)
self._force_reindex = reindex
return super().run()
def __exit():
raise SystemExit()
try:
loop.add_signal_handler(signal.SIGINT, __exit)
loop.add_signal_handler(signal.SIGTERM, __exit)
loop.run_until_complete(self.start(reindex=reindex))
loop.run_until_complete(self.shutdown_event.wait())
except (SystemExit, KeyboardInterrupt):
pass
finally:
loop.run_until_complete(self.stop())
async def reindex(self):
async def _reindex(self):
async with self._lock:
self.log.info("reindexing %i claims (estimate)", self.db.prefix_db.claim_to_txo.estimate_num_keys())
await self.delete_index()

View file

@ -1,14 +1,10 @@
import signal
import asyncio
import typing
from scribe import __version__
from scribe.blockchain.daemon import LBCDaemon
from scribe.reader import BaseBlockchainReader
from scribe.elasticsearch import ElasticNotifierClientProtocol
from scribe.hub.session import SessionManager
from scribe.hub.mempool import MemPool
from scribe.hub.udp import StatusServer
from scribe.hub.prometheus import PrometheusServer
class BlockchainReaderServer(BaseBlockchainReader):
@ -21,7 +17,6 @@ class BlockchainReaderServer(BaseBlockchainReader):
self.mempool_notifications = set()
self.status_server = StatusServer()
self.daemon = LBCDaemon(env.coin, env.daemon_url) # only needed for broadcasting txs
self.prometheus_server: typing.Optional[PrometheusServer] = None
self.mempool = MemPool(self.env.coin, self.db)
self.session_manager = SessionManager(
env, self.db, self.mempool, self.history_cache, self.resolve_cache,
@ -32,7 +27,9 @@ class BlockchainReaderServer(BaseBlockchainReader):
)
self.mempool.session_manager = self.session_manager
self.es_notifications = asyncio.Queue()
self.es_notification_client = ElasticNotifierClientProtocol(self.es_notifications)
self.es_notification_client = ElasticNotifierClientProtocol(
self.es_notifications, '127.0.0.1', self.env.elastic_notifier_port
)
self.synchronized = asyncio.Event()
self._es_height = None
self._es_block_hash = None
@ -75,9 +72,6 @@ class BlockchainReaderServer(BaseBlockchainReader):
self.notifications_to_send.clear()
async def receive_es_notifications(self, synchronized: asyncio.Event):
await asyncio.get_event_loop().create_connection(
lambda: self.es_notification_client, '127.0.0.1', self.env.elastic_notifier_port
)
synchronized.set()
try:
while True:
@ -92,71 +86,23 @@ class BlockchainReaderServer(BaseBlockchainReader):
finally:
self.es_notification_client.close()
async def start(self):
await super().start()
env = self.env
# min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
self.log.info(f'software version: {__version__}')
# self.log.info(f'supported protocol versions: {min_str}-{max_str}')
self.log.info(f'event loop policy: {env.loop_policy}')
self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks')
await self.daemon.height()
def _start_cancellable(run, *args):
_flag = asyncio.Event()
self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
return _flag.wait()
self.db.open_db()
await self.db.initialize_caches()
self.last_state = self.db.read_db_state()
await self.start_prometheus()
async def start_status_server(self):
if self.env.udp_port and int(self.env.udp_port):
await self.status_server.start(
0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
self.env.host, self.env.udp_port, self.env.allow_lan_udp
)
await _start_cancellable(self.receive_es_notifications)
await _start_cancellable(self.refresh_blocks_forever)
await self.session_manager.search_index.start()
await _start_cancellable(self.session_manager.serve, self.mempool)
async def stop(self):
await self.status_server.stop()
async with self._lock:
while self.cancellable_tasks:
t = self.cancellable_tasks.pop()
if not t.done():
t.cancel()
await self.session_manager.search_index.stop()
self.db.close()
if self.prometheus_server:
await self.prometheus_server.stop()
self.prometheus_server = None
await self.daemon.close()
self._executor.shutdown(wait=True)
self._executor = None
self.shutdown_event.set()
def _iter_start_tasks(self):
yield self.start_status_server()
yield self._start_cancellable(self.es_notification_client.maintain_connection)
yield self._start_cancellable(self.receive_es_notifications)
yield self._start_cancellable(self.refresh_blocks_forever)
yield self.session_manager.search_index.start()
yield self._start_cancellable(self.session_manager.serve, self.mempool)
def run(self):
loop = asyncio.get_event_loop()
loop.set_default_executor(self._executor)
def __exit():
raise SystemExit()
try:
loop.add_signal_handler(signal.SIGINT, __exit)
loop.add_signal_handler(signal.SIGTERM, __exit)
loop.run_until_complete(self.start())
loop.run_until_complete(self.shutdown_event.wait())
except (SystemExit, KeyboardInterrupt):
pass
finally:
loop.run_until_complete(self.stop())
async def start_prometheus(self):
if not self.prometheus_server and self.env.prometheus_port:
self.prometheus_server = PrometheusServer()
await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port)
def _iter_stop_tasks(self):
yield self.status_server.stop()
yield self._stop_cancellable_tasks()
yield self.session_manager.search_index.stop()
yield self.daemon.close()

View file

@ -1,20 +1,51 @@
import logging
import asyncio
import typing
import signal
from concurrent.futures.thread import ThreadPoolExecutor
from prometheus_client import Gauge, Histogram
from scribe import PROMETHEUS_NAMESPACE
from scribe import PROMETHEUS_NAMESPACE, __version__
from scribe.common import HISTOGRAM_BUCKETS
from scribe.db.prefixes import DBState
from scribe.db import HubDB
from scribe.reader.prometheus import PrometheusServer
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_reader"
class BaseBlockchainReader:
class BlockchainReaderInterface:
async def poll_for_changes(self):
"""
Detect and handle if the db has advanced to a new block or unwound during a chain reorganization
If a reorg is detected, this will first unwind() to the branching height and then advance() forward
to the new block(s).
"""
raise NotImplementedError()
def clear_caches(self):
"""
Called after finished advancing, used for invalidating caches
"""
pass
def advance(self, height: int):
"""
Called when advancing to the given block height
Callbacks that look up new values from the added block can be put here
"""
raise NotImplementedError()
def unwind(self):
"""
Go backwards one block
"""
raise NotImplementedError()
class BaseBlockchainReader(BlockchainReaderInterface):
block_count_metric = Gauge(
"block_count", "Number of processed blocks", namespace=NAMESPACE
)
@ -41,6 +72,7 @@ class BaseBlockchainReader:
self.last_state: typing.Optional[DBState] = None
self._refresh_interval = 0.1
self._lock = asyncio.Lock()
self.prometheus_server: typing.Optional[PrometheusServer] = None
def _detect_changes(self):
try:
@ -105,16 +137,7 @@ class BaseBlockchainReader:
await asyncio.sleep(self._refresh_interval)
synchronized.set()
def clear_caches(self):
"""
Called after finished advancing, used for invalidating caches
"""
pass
def advance(self, height: int):
"""
Advance to the given block height
"""
tx_count = self.db.prefix_db.tx_count.get(height).tx_count
assert tx_count not in self.db.tx_counts, f'boom {tx_count} in {len(self.db.tx_counts)} tx counts'
assert len(self.db.tx_counts) == height, f"{len(self.db.tx_counts)} != {height}"
@ -122,14 +145,82 @@ class BaseBlockchainReader:
self.db.headers.append(self.db.prefix_db.header.get(height, deserialize_value=False))
def unwind(self):
"""
Go backwards one block
"""
self.db.tx_counts.pop()
self.db.headers.pop()
def _start_cancellable(self, run, *args):
_flag = asyncio.Event()
self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
return _flag.wait()
def _iter_start_tasks(self):
yield self._start_cancellable(self.refresh_blocks_forever)
def _iter_stop_tasks(self):
yield self._stop_cancellable_tasks()
async def _stop_cancellable_tasks(self):
async with self._lock:
while self.cancellable_tasks:
t = self.cancellable_tasks.pop()
if not t.done():
t.cancel()
async def start(self):
# TODO: make the method here useful
if not self._executor:
self._executor = ThreadPoolExecutor(self._thread_workers, thread_name_prefix=self._thread_prefix)
self.db._executor = self._executor
env = self.env
# min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
self.log.info(f'software version: {__version__}')
# self.log.info(f'supported protocol versions: {min_str}-{max_str}')
self.log.info(f'event loop policy: {env.loop_policy}')
self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks')
self.db.open_db()
self.log.info(f'initializing caches')
await self.db.initialize_caches()
self.last_state = self.db.read_db_state()
self.log.info(f'opened db at block {self.last_state.height}')
self.block_count_metric.set(self.last_state.height)
await self.start_prometheus()
for start_task in self._iter_start_tasks():
await start_task
self.log.info("finished starting")
async def stop(self):
for stop_task in self._iter_stop_tasks():
await stop_task
await self.stop_prometheus()
self.db.close()
self._executor.shutdown(wait=True)
self._executor = None
self.shutdown_event.set()
async def start_prometheus(self):
if not self.prometheus_server and self.env.prometheus_port:
self.prometheus_server = PrometheusServer()
await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port)
async def stop_prometheus(self):
if self.prometheus_server:
await self.prometheus_server.stop()
self.prometheus_server = None
def run(self):
loop = asyncio.get_event_loop()
loop.set_default_executor(self._executor)
def __exit():
raise SystemExit()
try:
loop.add_signal_handler(signal.SIGINT, __exit)
loop.add_signal_handler(signal.SIGTERM, __exit)
loop.run_until_complete(self.start())
loop.run_until_complete(self.shutdown_event.wait())
except (SystemExit, KeyboardInterrupt):
pass
finally:
loop.run_until_complete(self.stop())