From ca2ca23892f9bc72e1a897eb8f16fc811d7134a5 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 24 Mar 2022 12:54:22 -0400 Subject: [PATCH] cleanup --- scribe/blockchain/service.py | 34 +++------------------------------ scribe/common.py | 28 +++++++++++++++++++++++++++ scribe/db/db.py | 1 - scribe/db/prefixes.py | 2 -- scribe/elasticsearch/search.py | 5 +---- scribe/elasticsearch/service.py | 2 -- scribe/error/base.py | 4 ++++ scribe/hub/mempool.py | 1 - scribe/hub/service.py | 2 -- scribe/hub/udp.py | 1 - scribe/service.py | 17 +---------------- 11 files changed, 37 insertions(+), 60 deletions(-) diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index 4cafeef..6c939d2 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -7,10 +7,11 @@ from typing import Optional, List, Tuple, Set, DefaultDict, Dict from prometheus_client import Gauge, Histogram from collections import defaultdict -from scribe import __version__, PROMETHEUS_NAMESPACE +from scribe import PROMETHEUS_NAMESPACE from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue -from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS +from scribe.error.base import ChainError +from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem from scribe.blockchain.daemon import LBCDaemon from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block from scribe.blockchain.prefetcher import Prefetcher @@ -21,35 +22,6 @@ if typing.TYPE_CHECKING: from scribe.db.revertable import RevertableOpStack -class ChainError(Exception): - """Raised on error processing blocks.""" - - -class StagedClaimtrieItem(typing.NamedTuple): - name: str - normalized_name: str - claim_hash: bytes - amount: int - expiration_height: int - tx_num: int - position: int - root_tx_num: int - root_position: int - channel_signature_is_valid: bool - signing_hash: Optional[bytes] - reposted_claim_hash: Optional[bytes] - - @property - def is_update(self) -> bool: - return (self.tx_num, self.position) != (self.root_tx_num, self.root_position) - - def invalidate_signature(self) -> 'StagedClaimtrieItem': - return StagedClaimtrieItem( - self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num, - self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash - ) - - NAMESPACE = f"{PROMETHEUS_NAMESPACE}_writer" diff --git a/scribe/common.py b/scribe/common.py index 06f486a..490c3e4 100644 --- a/scribe/common.py +++ b/scribe/common.py @@ -33,6 +33,34 @@ HISTOGRAM_BUCKETS = ( # return value +class StagedClaimtrieItem(typing.NamedTuple): + """ + Represents a claim TXO, used internally by the block processor + """ + name: str + normalized_name: str + claim_hash: bytes + amount: int + expiration_height: int + tx_num: int + position: int + root_tx_num: int + root_position: int + channel_signature_is_valid: bool + signing_hash: typing.Optional[bytes] + reposted_claim_hash: typing.Optional[bytes] + + @property + def is_update(self) -> bool: + return (self.tx_num, self.position) != (self.root_tx_num, self.root_position) + + def invalidate_signature(self) -> 'StagedClaimtrieItem': + return StagedClaimtrieItem( + self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num, + self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash + ) + + def formatted_time(t, sep=' '): """Return a number of seconds as a string in days, hours, mins and maybe secs.""" diff --git a/scribe/db/db.py b/scribe/db/db.py index 707142c..b43e17d 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -9,7 +9,6 @@ import base64 import logging from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List, TYPE_CHECKING from functools import partial -from asyncio import sleep from bisect import bisect_right from collections import defaultdict from concurrent.futures.thread import ThreadPoolExecutor diff --git a/scribe/db/prefixes.py b/scribe/db/prefixes.py index 09ae90e..199aad5 100644 --- a/scribe/db/prefixes.py +++ b/scribe/db/prefixes.py @@ -2,8 +2,6 @@ import typing import struct import array import base64 -import rocksdb -import rocksdb.interfaces from typing import Union, Tuple, NamedTuple, Optional from scribe.db.common import DB_PREFIXES from scribe.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow diff --git a/scribe/elasticsearch/search.py b/scribe/elasticsearch/search.py index 0e336a5..b4d335b 100644 --- a/scribe/elasticsearch/search.py +++ b/scribe/elasticsearch/search.py @@ -1,5 +1,4 @@ import logging -import time import asyncio import struct from bisect import bisect_right @@ -9,15 +8,13 @@ from operator import itemgetter from typing import Optional, List, Iterable, TYPE_CHECKING from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError -from elasticsearch.helpers import async_streaming_bulk from scribe.schema.result import Censor, Outputs from scribe.schema.tags import clean_tags from scribe.schema.url import normalize_name from scribe.error import TooManyClaimSearchParametersError from scribe.common import LRUCache from scribe.db.common import CLAIM_TYPES, STREAM_TYPES -from scribe.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, \ - RANGE_FIELDS, ALL_FIELDS +from scribe.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, RANGE_FIELDS from scribe.db.common import ResolveResult if TYPE_CHECKING: from scribe.db import HubDB diff --git a/scribe/elasticsearch/service.py b/scribe/elasticsearch/service.py index 2ec0fa6..1b36497 100644 --- a/scribe/elasticsearch/service.py +++ b/scribe/elasticsearch/service.py @@ -5,12 +5,10 @@ import asyncio from collections import defaultdict from elasticsearch import AsyncElasticsearch, NotFoundError from elasticsearch.helpers import async_streaming_bulk - from scribe.schema.result import Censor from scribe.service import BlockchainReaderService from scribe.db.revertable import RevertableOp from scribe.db.common import TrendingNotification, DB_PREFIXES - from scribe.elasticsearch.notifier_protocol import ElasticNotifierProtocol from scribe.elasticsearch.search import IndexVersionMismatch, expand_query from scribe.elasticsearch.constants import ALL_FIELDS, INDEX_DEFAULT_SETTINGS diff --git a/scribe/error/base.py b/scribe/error/base.py index fce1be2..0cd7bc4 100644 --- a/scribe/error/base.py +++ b/scribe/error/base.py @@ -7,3 +7,7 @@ def claim_id(claim_hash): class BaseError(Exception): pass + + +class ChainError(Exception): + """Raised on error processing blocks.""" diff --git a/scribe/hub/mempool.py b/scribe/hub/mempool.py index 04363ed..3613749 100644 --- a/scribe/hub/mempool.py +++ b/scribe/hub/mempool.py @@ -5,7 +5,6 @@ import typing import logging from collections import defaultdict from prometheus_client import Histogram - import rocksdb.errors from scribe import PROMETHEUS_NAMESPACE from scribe.common import HISTOGRAM_BUCKETS diff --git a/scribe/hub/service.py b/scribe/hub/service.py index 74fb768..7708cf2 100644 --- a/scribe/hub/service.py +++ b/scribe/hub/service.py @@ -1,10 +1,8 @@ import asyncio from scribe.blockchain.daemon import LBCDaemon - from scribe.hub.session import SessionManager from scribe.hub.mempool import MemPool from scribe.hub.udp import StatusServer - from scribe.service import BlockchainReaderService from scribe.elasticsearch import ElasticNotifierClientProtocol diff --git a/scribe/hub/udp.py b/scribe/hub/udp.py index 94cf337..f919324 100644 --- a/scribe/hub/udp.py +++ b/scribe/hub/udp.py @@ -5,7 +5,6 @@ import logging from typing import Optional, Tuple, NamedTuple from scribe.schema.attrs import country_str_to_int, country_int_to_str from scribe.common import LRUCache, is_valid_public_ipv4 -# from prometheus_client import Counter log = logging.getLogger(__name__) diff --git a/scribe/service.py b/scribe/service.py index 054544d..4b4e2a8 100644 --- a/scribe/service.py +++ b/scribe/service.py @@ -15,7 +15,7 @@ from scribe.metrics import PrometheusServer class BlockchainService: """ - Base class for blockchain readers and the writer + Base class for blockchain readers as well as the block processor """ def __init__(self, env: Env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'scribe'): self.env = env @@ -34,20 +34,6 @@ class BlockchainService: ) self._stopping = False - def advance(self, height: int): - """ - Called when advancing to the given block height - Callbacks that look up new values from the added block can be put here - """ - raise NotImplementedError() - - def unwind(self): - """ - Go backwards one block - - """ - raise NotImplementedError() - def start_cancellable(self, run, *args): _flag = asyncio.Event() self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag))) @@ -178,7 +164,6 @@ class BlockchainReaderService(BlockchainService): def unwind(self): """ Go backwards one block - """ prev_count = self.db.tx_counts.pop() tx_count = self.db.tx_counts[-1]