Jack Robison 2022-03-24 12:54:22 -04:00
parent ba1a93b9b0
commit ca2ca23892
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
11 changed files with 37 additions and 60 deletions

View file

@@ -7,10 +7,11 @@ from typing import Optional, List, Tuple, Set, DefaultDict, Dict
 from prometheus_client import Gauge, Histogram
 from collections import defaultdict
-from scribe import __version__, PROMETHEUS_NAMESPACE
+from scribe import PROMETHEUS_NAMESPACE
 from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
 from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
-from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS
+from scribe.error.base import ChainError
+from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem
 from scribe.blockchain.daemon import LBCDaemon
 from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block
 from scribe.blockchain.prefetcher import Prefetcher
@@ -21,35 +22,6 @@ if typing.TYPE_CHECKING:
     from scribe.db.revertable import RevertableOpStack
 
-
-class ChainError(Exception):
-    """Raised on error processing blocks."""
-
-
-class StagedClaimtrieItem(typing.NamedTuple):
-    name: str
-    normalized_name: str
-    claim_hash: bytes
-    amount: int
-    expiration_height: int
-    tx_num: int
-    position: int
-    root_tx_num: int
-    root_position: int
-    channel_signature_is_valid: bool
-    signing_hash: Optional[bytes]
-    reposted_claim_hash: Optional[bytes]
-
-    @property
-    def is_update(self) -> bool:
-        return (self.tx_num, self.position) != (self.root_tx_num, self.root_position)
-
-    def invalidate_signature(self) -> 'StagedClaimtrieItem':
-        return StagedClaimtrieItem(
-            self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num,
-            self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash
-        )
-
 
 NAMESPACE = f"{PROMETHEUS_NAMESPACE}_writer"

View file

@@ -33,6 +33,34 @@ HISTOGRAM_BUCKETS = (
 # return value
 
+
+class StagedClaimtrieItem(typing.NamedTuple):
+    """
+    Represents a claim TXO, used internally by the block processor
+    """
+    name: str
+    normalized_name: str
+    claim_hash: bytes
+    amount: int
+    expiration_height: int
+    tx_num: int
+    position: int
+    root_tx_num: int
+    root_position: int
+    channel_signature_is_valid: bool
+    signing_hash: typing.Optional[bytes]
+    reposted_claim_hash: typing.Optional[bytes]
+
+    @property
+    def is_update(self) -> bool:
+        return (self.tx_num, self.position) != (self.root_tx_num, self.root_position)
+
+    def invalidate_signature(self) -> 'StagedClaimtrieItem':
+        return StagedClaimtrieItem(
+            self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num,
+            self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash
+        )
+
+
 def formatted_time(t, sep=' '):
     """Return a number of seconds as a string in days, hours, mins and
     maybe secs."""
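
For context, a minimal usage sketch of the relocated StagedClaimtrieItem. The field values below are hypothetical, and it assumes the scribe package is installed with the class at its new scribe.common location introduced by this commit:

    # Hypothetical example values; only the import path comes from this commit.
    from scribe.common import StagedClaimtrieItem

    item = StagedClaimtrieItem(
        name="Example", normalized_name="example", claim_hash=b'\x00' * 20,
        amount=1000, expiration_height=2_102_400, tx_num=500, position=1,
        root_tx_num=400, root_position=0, channel_signature_is_valid=True,
        signing_hash=b'\x11' * 20, reposted_claim_hash=None
    )

    # True: the (tx_num, position) pair differs from the root TXO, so this is an update.
    print(item.is_update)

    # invalidate_signature() returns a copy with the channel signature cleared.
    unsigned = item.invalidate_signature()
    print(unsigned.channel_signature_is_valid, unsigned.signing_hash)  # False None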

View file

@@ -9,7 +9,6 @@ import base64
 import logging
 from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List, TYPE_CHECKING
 from functools import partial
-from asyncio import sleep
 from bisect import bisect_right
 from collections import defaultdict
 from concurrent.futures.thread import ThreadPoolExecutor

View file

@@ -2,8 +2,6 @@ import typing
 import struct
 import array
 import base64
-import rocksdb
-import rocksdb.interfaces
 from typing import Union, Tuple, NamedTuple, Optional
 from scribe.db.common import DB_PREFIXES
 from scribe.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow

View file

@@ -1,5 +1,4 @@
 import logging
-import time
 import asyncio
 import struct
 from bisect import bisect_right
@@ -9,15 +8,13 @@ from operator import itemgetter
 from typing import Optional, List, Iterable, TYPE_CHECKING
 from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
-from elasticsearch.helpers import async_streaming_bulk
 from scribe.schema.result import Censor, Outputs
 from scribe.schema.tags import clean_tags
 from scribe.schema.url import normalize_name
 from scribe.error import TooManyClaimSearchParametersError
 from scribe.common import LRUCache
 from scribe.db.common import CLAIM_TYPES, STREAM_TYPES
-from scribe.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, \
-    RANGE_FIELDS, ALL_FIELDS
+from scribe.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, RANGE_FIELDS
 from scribe.db.common import ResolveResult
 if TYPE_CHECKING:
     from scribe.db import HubDB

View file

@@ -5,12 +5,10 @@ import asyncio
 from collections import defaultdict
 from elasticsearch import AsyncElasticsearch, NotFoundError
 from elasticsearch.helpers import async_streaming_bulk
 from scribe.schema.result import Censor
 from scribe.service import BlockchainReaderService
 from scribe.db.revertable import RevertableOp
 from scribe.db.common import TrendingNotification, DB_PREFIXES
 from scribe.elasticsearch.notifier_protocol import ElasticNotifierProtocol
 from scribe.elasticsearch.search import IndexVersionMismatch, expand_query
 from scribe.elasticsearch.constants import ALL_FIELDS, INDEX_DEFAULT_SETTINGS

View file

@@ -7,3 +7,7 @@ def claim_id(claim_hash):
 class BaseError(Exception):
     pass
 
+
+class ChainError(Exception):
+    """Raised on error processing blocks."""

View file

@@ -5,7 +5,6 @@ import typing
 import logging
 from collections import defaultdict
 from prometheus_client import Histogram
-import rocksdb.errors
 from scribe import PROMETHEUS_NAMESPACE
 from scribe.common import HISTOGRAM_BUCKETS

View file

@@ -1,10 +1,8 @@
 import asyncio
 from scribe.blockchain.daemon import LBCDaemon
 from scribe.hub.session import SessionManager
 from scribe.hub.mempool import MemPool
 from scribe.hub.udp import StatusServer
 from scribe.service import BlockchainReaderService
 from scribe.elasticsearch import ElasticNotifierClientProtocol

View file

@@ -5,7 +5,6 @@ import logging
 from typing import Optional, Tuple, NamedTuple
 from scribe.schema.attrs import country_str_to_int, country_int_to_str
 from scribe.common import LRUCache, is_valid_public_ipv4
-# from prometheus_client import Counter
 
 log = logging.getLogger(__name__)

View file

@@ -15,7 +15,7 @@ from scribe.metrics import PrometheusServer
 class BlockchainService:
     """
-    Base class for blockchain readers and the writer
+    Base class for blockchain readers as well as the block processor
     """
 
     def __init__(self, env: Env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'scribe'):
         self.env = env
@@ -34,20 +34,6 @@ class BlockchainService:
         )
         self._stopping = False
 
-    def advance(self, height: int):
-        """
-        Called when advancing to the given block height
-        Callbacks that look up new values from the added block can be put here
-        """
-        raise NotImplementedError()
-
-    def unwind(self):
-        """
-        Go backwards one block
-        """
-        raise NotImplementedError()
-
     def start_cancellable(self, run, *args):
         _flag = asyncio.Event()
         self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
@@ -178,7 +164,6 @@ class BlockchainReaderService(BlockchainService):
     def unwind(self):
         """
         Go backwards one block
         """
         prev_count = self.db.tx_counts.pop()
         tx_count = self.db.tx_counts[-1]