Merge pull request #38 from lbryio/rename-things

Rename package to `hub`
This commit is contained in:
Jack Robison 2022-05-18 16:18:21 -04:00 committed by GitHub
commit 9e9c778edd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
102 changed files with 934 additions and 942 deletions

View file

@ -35,12 +35,12 @@ USER $user
WORKDIR $projects_dir WORKDIR $projects_dir
RUN python3.9 -m pip install pip RUN python3.9 -m pip install pip
RUN python3.9 -m pip install -e . RUN python3.9 -m pip install -e .
RUN python3.9 docker/set_build.py RUN python3.9 scripts/set_build.py
RUN rm ~/.cache -rf RUN rm ~/.cache -rf
# entry point # entry point
VOLUME $db_dir VOLUME $db_dir
ENV DB_DIRECTORY=$db_dir ENV DB_DIRECTORY=$db_dir
COPY ./docker/scribe_entrypoint.sh /entrypoint.sh COPY ./scripts/entrypoint.sh /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"] ENTRYPOINT ["/entrypoint.sh"]

View file

@ -1,7 +0,0 @@
#!/bin/bash
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
cd "$DIR/../.." ## make sure we're in the right place. Docker Hub screws this up sometimes
echo "docker build dir: $(pwd)"
docker build --build-arg DOCKER_TAG=$DOCKER_TAG --build-arg DOCKER_COMMIT=$SOURCE_COMMIT -f $DOCKERFILE_PATH -t $IMAGE_NAME .

View file

Before

Width:  |  Height:  |  Size: 142 KiB

After

Width:  |  Height:  |  Size: 142 KiB

View file

@ -55,8 +55,8 @@ services:
volumes: volumes:
- "lbry_rocksdb:/database" - "lbry_rocksdb:/database"
environment: environment:
- HUB_COMMAND=scribe-hub - HUB_COMMAND=herald
command: # for full options, see `scribe-hub --help` command: # for full options, see `herald --help`
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245" - "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--max_query_workers=4" - "--max_query_workers=4"
- "--host=0.0.0.0" - "--host=0.0.0.0"

View file

@ -34,7 +34,7 @@ services:
- "--max_query_workers=2" - "--max_query_workers=2"
- "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8" - "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8"
- "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6" - "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6"
scribe_hub: herald:
depends_on: depends_on:
- scribe_elastic_sync - scribe_elastic_sync
- scribe - scribe
@ -47,7 +47,7 @@ services:
volumes: volumes:
- "lbry_rocksdb:/database" - "lbry_rocksdb:/database"
environment: environment:
- HUB_COMMAND=scribe-hub - HUB_COMMAND=herald
command: command:
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245" - "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--elastic_host=127.0.0.1" - "--elastic_host=127.0.0.1"

765
hub/common.py Normal file
View file

@ -0,0 +1,765 @@
import struct
import hashlib
import hmac
import ipaddress
import logging
import logging.handlers
import typing
import collections
from decimal import Decimal
from typing import Iterable
from asyncio import get_event_loop, Event
from prometheus_client import Counter
from hub.schema.tags import clean_tags
from hub.schema.url import normalize_name
from hub.error import TooManyClaimSearchParametersError
log = logging.getLogger(__name__)
_sha256 = hashlib.sha256
_sha512 = hashlib.sha512
_new_hash = hashlib.new
_new_hmac = hmac.new
HASHX_LEN = 11
CLAIM_HASH_LEN = 20
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
CLAIM_TYPES = {
'stream': 1,
'channel': 2,
'repost': 3,
'collection': 4,
}
STREAM_TYPES = {
'video': 1,
'audio': 2,
'image': 3,
'document': 4,
'binary': 5,
'model': 6,
}
def setup_logging(log_path: str):
log = logging.getLogger()
fmt = logging.Formatter("%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
handler = logging.handlers.RotatingFileHandler(log_path, maxBytes=1024*1024*5, backupCount=2)
handler.setFormatter(fmt)
log.addHandler(handler)
handler = logging.StreamHandler()
handler.setFormatter(fmt)
log.addHandler(handler)
log.setLevel(logging.INFO)
logging.getLogger('aiohttp').setLevel(logging.WARNING)
logging.getLogger('elasticsearch').setLevel(logging.WARNING)
class StagedClaimtrieItem(typing.NamedTuple):
"""
Represents a claim TXO, used internally by the block processor
"""
name: str
normalized_name: str
claim_hash: bytes
amount: int
expiration_height: int
tx_num: int
position: int
root_tx_num: int
root_position: int
channel_signature_is_valid: bool
signing_hash: typing.Optional[bytes]
reposted_claim_hash: typing.Optional[bytes]
@property
def is_update(self) -> bool:
return (self.tx_num, self.position) != (self.root_tx_num, self.root_position)
def invalidate_signature(self) -> 'StagedClaimtrieItem':
return StagedClaimtrieItem(
self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num,
self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash
)
def formatted_time(t, sep=' '):
"""Return a number of seconds as a string in days, hours, mins and
maybe secs."""
t = int(t)
fmts = (('{:d}d', 86400), ('{:02d}h', 3600), ('{:02d}m', 60))
parts = []
for fmt, n in fmts:
val = t // n
if parts or val:
parts.append(fmt.format(val))
t %= n
if len(parts) < 3:
parts.append(f'{t:02d}s')
return sep.join(parts)
def protocol_tuple(s):
"""Converts a protocol version number, such as "1.0" to a tuple (1, 0).
If the version number is bad, (0, ) indicating version 0 is returned."""
try:
return tuple(int(part) for part in s.split('.'))
except Exception:
return (0, )
def version_string(ptuple):
"""Convert a version tuple such as (1, 2) to "1.2".
There is always at least one dot, so (1, ) becomes "1.0"."""
while len(ptuple) < 2:
ptuple += (0, )
return '.'.join(str(p) for p in ptuple)
def protocol_version(client_req, min_tuple, max_tuple):
"""Given a client's protocol version string, return a pair of
protocol tuples:
(negotiated version, client min request)
If the request is unsupported, the negotiated protocol tuple is
None.
"""
if client_req is None:
client_min = client_max = min_tuple
else:
if isinstance(client_req, list) and len(client_req) == 2:
client_min, client_max = client_req
else:
client_min = client_max = client_req
client_min = protocol_tuple(client_min)
client_max = protocol_tuple(client_max)
result = min(client_max, max_tuple)
if result < max(client_min, min_tuple) or result == (0, ):
result = None
return result, client_min
class LRUCacheWithMetrics:
__slots__ = [
'capacity',
'cache',
'_track_metrics',
'hits',
'misses'
]
def __init__(self, capacity: int, metric_name: typing.Optional[str] = None, namespace: str = "daemon_cache"):
self.capacity = capacity
self.cache = collections.OrderedDict()
if metric_name is None:
self._track_metrics = False
self.hits = self.misses = None
else:
self._track_metrics = True
try:
self.hits = Counter(
f"{metric_name}_cache_hit_count", "Number of cache hits", namespace=namespace
)
self.misses = Counter(
f"{metric_name}_cache_miss_count", "Number of cache misses", namespace=namespace
)
except ValueError as err:
log.debug("failed to set up prometheus %s_cache_miss_count metric: %s", metric_name, err)
self._track_metrics = False
self.hits = self.misses = None
def get(self, key, default=None):
try:
value = self.cache.pop(key)
if self._track_metrics:
self.hits.inc()
except KeyError:
if self._track_metrics:
self.misses.inc()
return default
self.cache[key] = value
return value
def set(self, key, value):
try:
self.cache.pop(key)
except KeyError:
if len(self.cache) >= self.capacity:
self.cache.popitem(last=False)
self.cache[key] = value
def clear(self):
self.cache.clear()
def pop(self, key):
return self.cache.pop(key)
def __setitem__(self, key, value):
return self.set(key, value)
def __getitem__(self, item):
return self.get(item)
def __contains__(self, item) -> bool:
return item in self.cache
def __len__(self):
return len(self.cache)
def __delitem__(self, key):
self.cache.pop(key)
def __del__(self):
self.clear()
class LRUCache:
__slots__ = [
'capacity',
'cache'
]
def __init__(self, capacity: int):
self.capacity = capacity
self.cache = collections.OrderedDict()
def get(self, key, default=None):
try:
value = self.cache.pop(key)
except KeyError:
return default
self.cache[key] = value
return value
def set(self, key, value):
try:
self.cache.pop(key)
except KeyError:
if len(self.cache) >= self.capacity:
self.cache.popitem(last=False)
self.cache[key] = value
def items(self):
return self.cache.items()
def clear(self):
self.cache.clear()
def pop(self, key, default=None):
return self.cache.pop(key, default)
def __setitem__(self, key, value):
return self.set(key, value)
def __getitem__(self, item):
return self.get(item)
def __contains__(self, item) -> bool:
return item in self.cache
def __len__(self):
return len(self.cache)
def __delitem__(self, key):
self.cache.pop(key)
def __del__(self):
self.clear()
# the ipaddress module does not show these subnets as reserved
CARRIER_GRADE_NAT_SUBNET = ipaddress.ip_network('100.64.0.0/10')
IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24')
def is_valid_public_ipv4(address, allow_localhost: bool = False, allow_lan: bool = False):
try:
parsed_ip = ipaddress.ip_address(address)
if parsed_ip.is_loopback and allow_localhost:
return True
if allow_lan and parsed_ip.is_private:
return True
if any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private)):
return False
else:
return not any((CARRIER_GRADE_NAT_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32")),
IPV4_TO_6_RELAY_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32"))))
except (ipaddress.AddressValueError, ValueError):
return False
def sha256(x):
"""Simple wrapper of hashlib sha256."""
return _sha256(x).digest()
def ripemd160(x):
"""Simple wrapper of hashlib ripemd160."""
h = _new_hash('ripemd160')
h.update(x)
return h.digest()
def double_sha256(x):
"""SHA-256 of SHA-256, as used extensively in bitcoin."""
return sha256(sha256(x))
def hmac_sha512(key, msg):
"""Use SHA-512 to provide an HMAC."""
return _new_hmac(key, msg, _sha512).digest()
def hash160(x):
"""RIPEMD-160 of SHA-256. Used to make bitcoin addresses from pubkeys."""
return ripemd160(sha256(x))
def hash_to_hex_str(x: bytes) -> str:
"""Convert a big-endian binary hash to displayed hex string.
Display form of a binary hash is reversed and converted to hex.
"""
return x[::-1].hex()
def hex_str_to_hash(x: str) -> bytes:
"""Convert a displayed hex string to a binary hash."""
return bytes.fromhex(x)[::-1]
INVALID_REQUEST = -32600
INVALID_ARGS = -32602
class CodeMessageError(Exception):
@property
def code(self):
return self.args[0]
@property
def message(self):
return self.args[1]
def __eq__(self, other):
return (isinstance(other, self.__class__) and
self.code == other.code and self.message == other.message)
def __hash__(self):
# overridden to make the exception hashable
# see https://bugs.python.org/issue28603
return hash((self.code, self.message))
@classmethod
def invalid_args(cls, message):
return cls(INVALID_ARGS, message)
@classmethod
def invalid_request(cls, message):
return cls(INVALID_REQUEST, message)
@classmethod
def empty_batch(cls):
return cls.invalid_request('batch is empty')
class RPCError(CodeMessageError):
pass
class DaemonError(Exception):
"""Raised when the daemon returns an error in its results."""
class WarmingUpError(Exception):
"""Internal - when the daemon is warming up."""
class WorkQueueFullError(Exception):
"""Internal - when the daemon's work queue is full."""
class TaskGroup:
def __init__(self, loop=None):
self._loop = loop or get_event_loop()
self._tasks = set()
self.done = Event()
self.started = Event()
def __len__(self):
return len(self._tasks)
def add(self, coro):
task = self._loop.create_task(coro)
self._tasks.add(task)
self.started.set()
self.done.clear()
task.add_done_callback(self._remove)
return task
def _remove(self, task):
self._tasks.remove(task)
if len(self._tasks) < 1:
self.done.set()
self.started.clear()
def cancel(self):
for task in self._tasks:
task.cancel()
self.done.set()
self.started.clear()
class IndexVersionMismatch(Exception):
def __init__(self, got_version, expected_version):
self.got_version = got_version
self.expected_version = expected_version
# Elasticsearch constants
INDEX_DEFAULT_SETTINGS = {
"settings":
{"analysis":
{"analyzer": {
"default": {"tokenizer": "whitespace", "filter": ["lowercase", "porter_stem"]}}},
"index":
{"refresh_interval": -1,
"number_of_shards": 1,
"number_of_replicas": 0,
"sort": {
"field": ["trending_score", "release_time"],
"order": ["desc", "desc"]
}}
},
"mappings": {
"properties": {
"claim_id": {
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
},
"type": "text",
"index_prefixes": {
"min_chars": 1,
"max_chars": 10
}
},
"sd_hash": {
"fields": {
"keyword": {
"ignore_above": 96,
"type": "keyword"
}
},
"type": "text",
"index_prefixes": {
"min_chars": 1,
"max_chars": 4
}
},
"height": {"type": "integer"},
"claim_type": {"type": "byte"},
"censor_type": {"type": "byte"},
"trending_score": {"type": "double"},
"release_time": {"type": "long"}
}
}
}
FIELDS = {
'_id',
'claim_id', 'claim_type', 'claim_name', 'normalized_name',
'tx_id', 'tx_nout', 'tx_position',
'short_url', 'canonical_url',
'is_controlling', 'last_take_over_height',
'public_key_bytes', 'public_key_id', 'claims_in_channel',
'channel_id', 'signature', 'signature_digest', 'is_signature_valid',
'amount', 'effective_amount', 'support_amount',
'fee_amount', 'fee_currency',
'height', 'creation_height', 'activation_height', 'expiration_height',
'stream_type', 'media_type', 'censor_type',
'title', 'author', 'description',
'timestamp', 'creation_timestamp',
'duration', 'release_time',
'tags', 'languages', 'has_source', 'reposted_claim_type',
'reposted_claim_id', 'repost_count', 'sd_hash',
'trending_score', 'tx_num',
'channel_tx_id', 'channel_tx_position', 'channel_height', 'reposted_tx_id',
'reposted_tx_position', 'reposted_height',
}
TEXT_FIELDS = {
'author', 'canonical_url', 'channel_id', 'description', 'claim_id', 'censoring_channel_id',
'media_type', 'normalized_name', 'public_key_bytes', 'public_key_id', 'short_url', 'signature',
'claim_name', 'signature_digest', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id',
'tags', 'sd_hash', 'channel_tx_id', 'reposted_tx_id',
}
RANGE_FIELDS = {
'height', 'creation_height', 'activation_height', 'expiration_height',
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
'tx_position', 'repost_count', 'limit_claims_per_channel',
'amount', 'effective_amount', 'support_amount',
'trending_score', 'censor_type', 'tx_num', 'reposted_tx_position', 'reposted_height',
'channel_tx_position', 'channel_height',
}
ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS
REPLACEMENTS = {
'claim_name': 'normalized_name',
'name': 'normalized_name',
'txid': 'tx_id',
'nout': 'tx_nout',
'trending_group': 'trending_score',
'trending_mixed': 'trending_score',
'trending_global': 'trending_score',
'trending_local': 'trending_score',
'reposted': 'repost_count',
'stream_types': 'stream_type',
'media_types': 'media_type',
'valid_channel_signature': 'is_signature_valid'
}
def expand_query(**kwargs):
if "amount_order" in kwargs:
kwargs["limit"] = 1
kwargs["order_by"] = "effective_amount"
kwargs["offset"] = int(kwargs["amount_order"]) - 1
if 'name' in kwargs:
kwargs['name'] = normalize_name(kwargs.pop('name'))
if kwargs.get('is_controlling') is False:
kwargs.pop('is_controlling')
query = {'must': [], 'must_not': []}
collapse = None
if 'fee_currency' in kwargs and kwargs['fee_currency'] is not None:
kwargs['fee_currency'] = kwargs['fee_currency'].upper()
for key, value in kwargs.items():
key = key.replace('claim.', '')
many = key.endswith('__in') or isinstance(value, list)
if many and len(value) > 2048:
raise TooManyClaimSearchParametersError(key, 2048)
if many:
key = key.replace('__in', '')
value = list(filter(None, value))
if value is None or isinstance(value, list) and len(value) == 0:
continue
key = REPLACEMENTS.get(key, key)
if key in FIELDS:
partial_id = False
if key == 'claim_type':
if isinstance(value, str):
value = CLAIM_TYPES[value]
else:
value = [CLAIM_TYPES[claim_type] for claim_type in value]
elif key == 'stream_type':
value = [STREAM_TYPES[value]] if isinstance(value, str) else list(map(STREAM_TYPES.get, value))
if key == '_id':
if isinstance(value, Iterable):
value = [item[::-1].hex() for item in value]
else:
value = value[::-1].hex()
if not many and key in ('_id', 'claim_id', 'sd_hash') and len(value) < 20:
partial_id = True
if key in ('signature_valid', 'has_source'):
continue # handled later
if key in TEXT_FIELDS:
key += '.keyword'
ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'}
if partial_id:
query['must'].append({"prefix": {key: value}})
elif key in RANGE_FIELDS and isinstance(value, str) and value[0] in ops:
operator_length = 2 if value[:2] in ops else 1
operator, value = value[:operator_length], value[operator_length:]
if key == 'fee_amount':
value = str(Decimal(value)*1000)
query['must'].append({"range": {key: {ops[operator]: value}}})
elif key in RANGE_FIELDS and isinstance(value, list) and all(v[0] in ops for v in value):
range_constraints = []
release_times = []
for v in value:
operator_length = 2 if v[:2] in ops else 1
operator, stripped_op_v = v[:operator_length], v[operator_length:]
if key == 'fee_amount':
stripped_op_v = str(Decimal(stripped_op_v)*1000)
if key == 'release_time':
release_times.append((operator, stripped_op_v))
else:
range_constraints.append((operator, stripped_op_v))
if key != 'release_time':
query['must'].append({"range": {key: {ops[operator]: v for operator, v in range_constraints}}})
else:
query['must'].append(
{"bool":
{"should": [
{"bool": {
"must_not": {
"exists": {
"field": "release_time"
}
}
}},
{"bool": {
"must": [
{"exists": {"field": "release_time"}},
{'range': {key: {ops[operator]: v for operator, v in release_times}}},
]}},
]}
}
)
elif many:
query['must'].append({"terms": {key: value}})
else:
if key == 'fee_amount':
value = str(Decimal(value)*1000)
query['must'].append({"term": {key: {"value": value}}})
elif key == 'not_channel_ids':
for channel_id in value:
query['must_not'].append({"term": {'channel_id.keyword': channel_id}})
query['must_not'].append({"term": {'_id': channel_id}})
elif key == 'channel_ids':
query['must'].append({"terms": {'channel_id.keyword': value}})
elif key == 'claim_ids':
query['must'].append({"terms": {'claim_id.keyword': value}})
elif key == 'media_types':
query['must'].append({"terms": {'media_type.keyword': value}})
elif key == 'any_languages':
query['must'].append({"terms": {'languages': clean_tags(value)}})
elif key == 'any_languages':
query['must'].append({"terms": {'languages': value}})
elif key == 'all_languages':
query['must'].extend([{"term": {'languages': tag}} for tag in value])
elif key == 'any_tags':
query['must'].append({"terms": {'tags.keyword': clean_tags(value)}})
elif key == 'all_tags':
query['must'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)])
elif key == 'not_tags':
query['must_not'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)])
elif key == 'not_claim_id':
query['must_not'].extend([{"term": {'claim_id.keyword': cid}} for cid in value])
elif key == 'limit_claims_per_channel':
collapse = ('channel_id.keyword', value)
if kwargs.get('has_channel_signature'):
query['must'].append({"exists": {"field": "signature"}})
if 'signature_valid' in kwargs:
query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}})
elif 'signature_valid' in kwargs:
query['must'].append(
{"bool":
{"should": [
{"bool": {"must_not": {"exists": {"field": "signature"}}}},
{"bool" : {"must" : {"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}}}
]}
}
)
if 'has_source' in kwargs:
is_stream_or_repost_terms = {"terms": {"claim_type": [CLAIM_TYPES['stream'], CLAIM_TYPES['repost']]}}
query['must'].append(
{"bool":
{"should": [
{"bool": # when is_stream_or_repost AND has_source
{"must": [
{"match": {"has_source": kwargs['has_source']}},
is_stream_or_repost_terms,
]
},
},
{"bool": # when not is_stream_or_repost
{"must_not": is_stream_or_repost_terms}
},
{"bool": # when reposted_claim_type wouldn't have source
{"must_not":
[
{"term": {"reposted_claim_type": CLAIM_TYPES['stream']}}
],
"must":
[
{"term": {"claim_type": CLAIM_TYPES['repost']}}
]
}
}
]}
}
)
if kwargs.get('text'):
query['must'].append(
{"simple_query_string":
{"query": kwargs["text"], "fields": [
"claim_name^4", "channel_name^8", "title^1", "description^.5", "author^1", "tags^.5"
]}})
query = {
"_source": {"excludes": ["description", "title"]},
'query': {'bool': query},
"sort": [],
}
if "limit" in kwargs:
query["size"] = kwargs["limit"]
if 'offset' in kwargs:
query["from"] = kwargs["offset"]
if 'order_by' in kwargs:
if isinstance(kwargs["order_by"], str):
kwargs["order_by"] = [kwargs["order_by"]]
for value in kwargs['order_by']:
if 'trending_group' in value:
# fixme: trending_mixed is 0 for all records on variable decay, making sort slow.
continue
is_asc = value.startswith('^')
value = value[1:] if is_asc else value
value = REPLACEMENTS.get(value, value)
if value in TEXT_FIELDS:
value += '.keyword'
query['sort'].append({value: "asc" if is_asc else "desc"})
if collapse:
query["collapse"] = {
"field": collapse[0],
"inner_hits": {
"name": collapse[0],
"size": collapse[1],
"sort": query["sort"]
}
}
return query
def expand_result(results):
inner_hits = []
expanded = []
for result in results:
if result.get("inner_hits"):
for _, inner_hit in result["inner_hits"].items():
inner_hits.extend(inner_hit["hits"]["hits"])
continue
result = result['_source']
result['claim_hash'] = bytes.fromhex(result['claim_id'])[::-1]
if result['reposted_claim_id']:
result['reposted_claim_hash'] = bytes.fromhex(result['reposted_claim_id'])[::-1]
else:
result['reposted_claim_hash'] = None
result['channel_hash'] = bytes.fromhex(result['channel_id'])[::-1] if result['channel_id'] else None
result['txo_hash'] = bytes.fromhex(result['tx_id'])[::-1] + struct.pack('<I', result['tx_nout'])
result['tx_hash'] = bytes.fromhex(result['tx_id'])[::-1]
result['reposted'] = result.pop('repost_count')
result['signature_valid'] = result.pop('is_signature_valid')
# result['normalized'] = result.pop('normalized_name')
# if result['censoring_channel_hash']:
# result['censoring_channel_hash'] = unhexlify(result['censoring_channel_hash'])[::-1]
expanded.append(result)
if inner_hits:
return expand_result(inner_hits)
return expanded

View file

@ -1,7 +1,7 @@
import typing import typing
import enum import enum
from typing import Optional from typing import Optional
from scribe.error import ResolveCensoredError from hub.error import ResolveCensoredError
@enum.unique @enum.unique
@ -53,22 +53,6 @@ class DB_PREFIXES(enum.Enum):
COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass
CLAIM_TYPES = {
'stream': 1,
'channel': 2,
'repost': 3,
'collection': 4,
}
STREAM_TYPES = {
'video': 1,
'audio': 2,
'image': 3,
'document': 4,
'binary': 5,
'model': 6,
}
# 9/21/2020 # 9/21/2020
MOST_USED_TAGS = { MOST_USED_TAGS = {
"gaming", "gaming",

View file

@ -12,19 +12,19 @@ from functools import partial
from bisect import bisect_right from bisect import bisect_right
from collections import defaultdict from collections import defaultdict
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
from scribe import PROMETHEUS_NAMESPACE from hub import PROMETHEUS_NAMESPACE
from scribe.error import ResolveCensoredError from hub.error import ResolveCensoredError
from scribe.schema.url import URL, normalize_name from hub.schema.url import URL, normalize_name
from scribe.schema.claim import guess_stream_type from hub.schema.claim import guess_stream_type
from scribe.schema.result import Censor from hub.schema.result import Censor
from scribe.blockchain.transaction import TxInput from hub.scribe.transaction import TxInput
from scribe.common import hash_to_hex_str, hash160, LRUCacheWithMetrics, sha256 from hub.common import hash_to_hex_str, hash160, LRUCacheWithMetrics, sha256, STREAM_TYPES, CLAIM_TYPES
from scribe.db.merkle import Merkle, MerkleCache, FastMerkleCacheItem from hub.db.merkle import Merkle, MerkleCache, FastMerkleCacheItem
from scribe.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES, ExpandedResolveResult, DBError, UTXO from hub.db.common import ResolveResult,ExpandedResolveResult, DBError, UTXO
from scribe.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB from hub.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
from scribe.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE, EffectiveAmountKey from hub.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE, EffectiveAmountKey
from scribe.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow, MempoolTXPrefixRow from hub.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow, MempoolTXPrefixRow
from scribe.db.prefixes import HashXMempoolStatusPrefixRow from hub.db.prefixes import HashXMempoolStatusPrefixRow
TXO_STRUCT = struct.Struct(b'>LH') TXO_STRUCT = struct.Struct(b'>LH')

View file

@ -2,8 +2,8 @@ import struct
import typing import typing
import rocksdb import rocksdb
from typing import Optional from typing import Optional
from scribe.db.common import DB_PREFIXES, COLUMN_SETTINGS from hub.db.common import DB_PREFIXES, COLUMN_SETTINGS
from scribe.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete from hub.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
ROW_TYPES = {} ROW_TYPES = {}

View file

@ -29,7 +29,7 @@ import typing
from asyncio import Event from asyncio import Event
from math import ceil, log from math import ceil, log
from scribe.common import double_sha256 from hub.common import double_sha256
class Merkle: class Merkle:

View file

@ -3,9 +3,9 @@ import time
import array import array
import typing import typing
from bisect import bisect_right from bisect import bisect_right
from scribe.common import sha256 from hub.common import sha256
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from scribe.db.db import HubDB from hub.db.db import HubDB
FROM_VERSION = 7 FROM_VERSION = 7
TO_VERSION = 8 TO_VERSION = 8

View file

@ -3,9 +3,9 @@ import struct
import array import array
import base64 import base64
from typing import Union, Tuple, NamedTuple, Optional from typing import Union, Tuple, NamedTuple, Optional
from scribe.db.common import DB_PREFIXES from hub.db.common import DB_PREFIXES
from scribe.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow from hub.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow
from scribe.schema.url import normalize_name from hub.schema.url import normalize_name
ACTIVATED_CLAIM_TXO_TYPE = 1 ACTIVATED_CLAIM_TXO_TYPE = 1
ACTIVATED_SUPPORT_TXO_TYPE = 2 ACTIVATED_SUPPORT_TXO_TYPE = 2

View file

@ -3,7 +3,7 @@ import logging
from string import printable from string import printable
from collections import defaultdict from collections import defaultdict
from typing import Tuple, Iterable, Callable, Optional, List from typing import Tuple, Iterable, Callable, Optional, List
from scribe.db.common import DB_PREFIXES from hub.db.common import DB_PREFIXES
_OP_STRUCT = struct.Struct('>BLL') _OP_STRUCT = struct.Struct('>BLL')
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -58,7 +58,7 @@ class RevertableOp:
return str(self) return str(self)
def __str__(self) -> str: def __str__(self) -> str:
from scribe.db.prefixes import auto_decode_item from hub.db.prefixes import auto_decode_item
k, v = auto_decode_item(self.key, self.value) k, v = auto_decode_item(self.key, self.value)
key = ''.join(c if c in printable else '.' for c in str(k)) key = ''.join(c if c in printable else '.' for c in str(k))
val = ''.join(c if c in printable else '.' for c in str(v)) val = ''.join(c if c in printable else '.' for c in str(v))

View file

@ -2,9 +2,9 @@ import os
import logging import logging
import traceback import traceback
import argparse import argparse
from scribe.common import setup_logging from hub.common import setup_logging
from scribe.elasticsearch.env import ElasticEnv from hub.elastic_sync.env import ElasticEnv
from scribe.elasticsearch.service import ElasticSyncService from hub.elastic_sync.service import ElasticSyncService
def main(): def main():

View file

@ -1,4 +1,4 @@
from scribe.env import Env from hub.env import Env
class ElasticEnv(Env): class ElasticEnv(Env):

View file

@ -5,16 +5,15 @@ import asyncio
from collections import defaultdict from collections import defaultdict
from elasticsearch import AsyncElasticsearch, NotFoundError from elasticsearch import AsyncElasticsearch, NotFoundError
from elasticsearch.helpers import async_streaming_bulk from elasticsearch.helpers import async_streaming_bulk
from scribe.schema.result import Censor from hub.schema.result import Censor
from scribe.service import BlockchainReaderService from hub.service import BlockchainReaderService
from scribe.db.revertable import RevertableOp from hub.common import IndexVersionMismatch, ALL_FIELDS, INDEX_DEFAULT_SETTINGS, expand_query
from scribe.db.common import TrendingNotification, DB_PREFIXES from hub.db.revertable import RevertableOp
from scribe.elasticsearch.notifier_protocol import ElasticNotifierProtocol from hub.db.common import TrendingNotification, DB_PREFIXES
from scribe.elasticsearch.search import IndexVersionMismatch, expand_query from hub.notifier_protocol import ElasticNotifierProtocol
from scribe.elasticsearch.constants import ALL_FIELDS, INDEX_DEFAULT_SETTINGS from hub.elastic_sync.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
from scribe.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from scribe.elasticsearch.env import ElasticEnv from hub.elastic_sync.env import ElasticEnv
class ElasticSyncService(BlockchainReaderService): class ElasticSyncService(BlockchainReaderService):

View file

@ -3,7 +3,7 @@ import re
import resource import resource
import logging import logging
from collections import namedtuple from collections import namedtuple
from scribe.blockchain.network import LBCMainNet, LBCTestNet, LBCRegTest from hub.scribe.network import LBCMainNet, LBCTestNet, LBCRegTest
NetIdentity = namedtuple('NetIdentity', 'host tcp_port ssl_port nick_suffix') NetIdentity = namedtuple('NetIdentity', 'host tcp_port ssl_port nick_suffix')

View file

@ -2,20 +2,20 @@ import os
import logging import logging
import traceback import traceback
import argparse import argparse
from scribe.common import setup_logging from hub.common import setup_logging
from scribe.hub.env import ServerEnv from hub.herald.env import ServerEnv
from scribe.hub.service import HubServerService from hub.herald.service import HubServerService
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog='scribe-hub' prog='herald'
) )
ServerEnv.contribute_to_arg_parser(parser) ServerEnv.contribute_to_arg_parser(parser)
args = parser.parse_args() args = parser.parse_args()
try: try:
env = ServerEnv.from_arg_parser(args) env = ServerEnv.from_arg_parser(args)
setup_logging(os.path.join(env.db_dir, 'scribe-hub.log')) setup_logging(os.path.join(env.db_dir, 'herald.log'))
server = HubServerService(env) server = HubServerService(env)
server.run() server.run()
except Exception: except Exception:

View file

@ -1,7 +1,7 @@
import inspect import inspect
from collections import namedtuple from collections import namedtuple
from functools import lru_cache from functools import lru_cache
from scribe.common import CodeMessageError from hub.common import CodeMessageError
SignatureInfo = namedtuple('SignatureInfo', 'min_args max_args ' SignatureInfo = namedtuple('SignatureInfo', 'min_args max_args '

View file

@ -1,5 +1,5 @@
import re import re
from scribe.env import Env from hub.env import Env
class ServerEnv(Env): class ServerEnv(Env):

View file

@ -6,8 +6,8 @@ import asyncio
from asyncio import Event from asyncio import Event
from functools import partial from functools import partial
from numbers import Number from numbers import Number
from scribe.common import RPCError, CodeMessageError from hub.common import RPCError, CodeMessageError
from scribe.hub.common import Notification, Request, Response, Batch, ProtocolError from hub.herald.common import Notification, Request, Response, Batch, ProtocolError
class JSONRPC: class JSONRPC:

View file

@ -6,14 +6,14 @@ import logging
from collections import defaultdict from collections import defaultdict
from prometheus_client import Histogram, Gauge from prometheus_client import Histogram, Gauge
import rocksdb.errors import rocksdb.errors
from scribe import PROMETHEUS_NAMESPACE from hub import PROMETHEUS_NAMESPACE
from scribe.common import HISTOGRAM_BUCKETS from hub.common import HISTOGRAM_BUCKETS
from scribe.db.common import UTXO from hub.db.common import UTXO
from scribe.blockchain.transaction.deserializer import Deserializer from hub.scribe.transaction.deserializer import Deserializer
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from scribe.hub.session import SessionManager from hub.herald.session import SessionManager
from scribe.db import HubDB from hub.db import HubDB
@attr.s(slots=True) @attr.s(slots=True)

View file

@ -1,23 +1,16 @@
import logging import logging
import asyncio import asyncio
import struct
from bisect import bisect_right from bisect import bisect_right
from collections import Counter, deque from collections import Counter, deque
from decimal import Decimal
from operator import itemgetter from operator import itemgetter
from typing import Optional, List, Iterable, TYPE_CHECKING from typing import Optional, List, TYPE_CHECKING
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
from scribe.schema.result import Censor, Outputs from hub.schema.result import Censor, Outputs
from scribe.schema.tags import clean_tags from hub.common import LRUCache, IndexVersionMismatch, INDEX_DEFAULT_SETTINGS, expand_query, expand_result
from scribe.schema.url import normalize_name from hub.db.common import ResolveResult
from scribe.error import TooManyClaimSearchParametersError
from scribe.common import LRUCache
from scribe.db.common import CLAIM_TYPES, STREAM_TYPES
from scribe.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, RANGE_FIELDS
from scribe.db.common import ResolveResult
if TYPE_CHECKING: if TYPE_CHECKING:
from scribe.db import HubDB from hub.db import HubDB
class ChannelResolution(str): class ChannelResolution(str):
@ -32,12 +25,6 @@ class StreamResolution(str):
return LookupError(f'Could not find claim at "{url}".') return LookupError(f'Could not find claim at "{url}".')
class IndexVersionMismatch(Exception):
def __init__(self, got_version, expected_version):
self.got_version = got_version
self.expected_version = expected_version
class SearchIndex: class SearchIndex:
VERSION = 1 VERSION = 1
@ -91,7 +78,7 @@ class SearchIndex:
self.logger.error("es search index has an incompatible version: %s vs %s", index_version, self.VERSION) self.logger.error("es search index has an incompatible version: %s vs %s", index_version, self.VERSION)
raise IndexVersionMismatch(index_version, self.VERSION) raise IndexVersionMismatch(index_version, self.VERSION)
await self.sync_client.indices.refresh(self.index) await self.sync_client.indices.refresh(self.index)
return acked return True
async def stop(self): async def stop(self):
clients = [c for c in (self.sync_client, self.search_client) if c is not None] clients = [c for c in (self.sync_client, self.search_client) if c is not None]
@ -297,234 +284,6 @@ class SearchIndex:
return referenced_txos return referenced_txos
def expand_query(**kwargs):
if "amount_order" in kwargs:
kwargs["limit"] = 1
kwargs["order_by"] = "effective_amount"
kwargs["offset"] = int(kwargs["amount_order"]) - 1
if 'name' in kwargs:
kwargs['name'] = normalize_name(kwargs.pop('name'))
if kwargs.get('is_controlling') is False:
kwargs.pop('is_controlling')
query = {'must': [], 'must_not': []}
collapse = None
if 'fee_currency' in kwargs and kwargs['fee_currency'] is not None:
kwargs['fee_currency'] = kwargs['fee_currency'].upper()
for key, value in kwargs.items():
key = key.replace('claim.', '')
many = key.endswith('__in') or isinstance(value, list)
if many and len(value) > 2048:
raise TooManyClaimSearchParametersError(key, 2048)
if many:
key = key.replace('__in', '')
value = list(filter(None, value))
if value is None or isinstance(value, list) and len(value) == 0:
continue
key = REPLACEMENTS.get(key, key)
if key in FIELDS:
partial_id = False
if key == 'claim_type':
if isinstance(value, str):
value = CLAIM_TYPES[value]
else:
value = [CLAIM_TYPES[claim_type] for claim_type in value]
elif key == 'stream_type':
value = [STREAM_TYPES[value]] if isinstance(value, str) else list(map(STREAM_TYPES.get, value))
if key == '_id':
if isinstance(value, Iterable):
value = [item[::-1].hex() for item in value]
else:
value = value[::-1].hex()
if not many and key in ('_id', 'claim_id', 'sd_hash') and len(value) < 20:
partial_id = True
if key in ('signature_valid', 'has_source'):
continue # handled later
if key in TEXT_FIELDS:
key += '.keyword'
ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'}
if partial_id:
query['must'].append({"prefix": {key: value}})
elif key in RANGE_FIELDS and isinstance(value, str) and value[0] in ops:
operator_length = 2 if value[:2] in ops else 1
operator, value = value[:operator_length], value[operator_length:]
if key == 'fee_amount':
value = str(Decimal(value)*1000)
query['must'].append({"range": {key: {ops[operator]: value}}})
elif key in RANGE_FIELDS and isinstance(value, list) and all(v[0] in ops for v in value):
range_constraints = []
release_times = []
for v in value:
operator_length = 2 if v[:2] in ops else 1
operator, stripped_op_v = v[:operator_length], v[operator_length:]
if key == 'fee_amount':
stripped_op_v = str(Decimal(stripped_op_v)*1000)
if key == 'release_time':
release_times.append((operator, stripped_op_v))
else:
range_constraints.append((operator, stripped_op_v))
if key != 'release_time':
query['must'].append({"range": {key: {ops[operator]: v for operator, v in range_constraints}}})
else:
query['must'].append(
{"bool":
{"should": [
{"bool": {
"must_not": {
"exists": {
"field": "release_time"
}
}
}},
{"bool": {
"must": [
{"exists": {"field": "release_time"}},
{'range': {key: {ops[operator]: v for operator, v in release_times}}},
]}},
]}
}
)
elif many:
query['must'].append({"terms": {key: value}})
else:
if key == 'fee_amount':
value = str(Decimal(value)*1000)
query['must'].append({"term": {key: {"value": value}}})
elif key == 'not_channel_ids':
for channel_id in value:
query['must_not'].append({"term": {'channel_id.keyword': channel_id}})
query['must_not'].append({"term": {'_id': channel_id}})
elif key == 'channel_ids':
query['must'].append({"terms": {'channel_id.keyword': value}})
elif key == 'claim_ids':
query['must'].append({"terms": {'claim_id.keyword': value}})
elif key == 'media_types':
query['must'].append({"terms": {'media_type.keyword': value}})
elif key == 'any_languages':
query['must'].append({"terms": {'languages': clean_tags(value)}})
elif key == 'any_languages':
query['must'].append({"terms": {'languages': value}})
elif key == 'all_languages':
query['must'].extend([{"term": {'languages': tag}} for tag in value])
elif key == 'any_tags':
query['must'].append({"terms": {'tags.keyword': clean_tags(value)}})
elif key == 'all_tags':
query['must'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)])
elif key == 'not_tags':
query['must_not'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)])
elif key == 'not_claim_id':
query['must_not'].extend([{"term": {'claim_id.keyword': cid}} for cid in value])
elif key == 'limit_claims_per_channel':
collapse = ('channel_id.keyword', value)
if kwargs.get('has_channel_signature'):
query['must'].append({"exists": {"field": "signature"}})
if 'signature_valid' in kwargs:
query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}})
elif 'signature_valid' in kwargs:
query['must'].append(
{"bool":
{"should": [
{"bool": {"must_not": {"exists": {"field": "signature"}}}},
{"bool" : {"must" : {"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}}}
]}
}
)
if 'has_source' in kwargs:
is_stream_or_repost_terms = {"terms": {"claim_type": [CLAIM_TYPES['stream'], CLAIM_TYPES['repost']]}}
query['must'].append(
{"bool":
{"should": [
{"bool": # when is_stream_or_repost AND has_source
{"must": [
{"match": {"has_source": kwargs['has_source']}},
is_stream_or_repost_terms,
]
},
},
{"bool": # when not is_stream_or_repost
{"must_not": is_stream_or_repost_terms}
},
{"bool": # when reposted_claim_type wouldn't have source
{"must_not":
[
{"term": {"reposted_claim_type": CLAIM_TYPES['stream']}}
],
"must":
[
{"term": {"claim_type": CLAIM_TYPES['repost']}}
]
}
}
]}
}
)
if kwargs.get('text'):
query['must'].append(
{"simple_query_string":
{"query": kwargs["text"], "fields": [
"claim_name^4", "channel_name^8", "title^1", "description^.5", "author^1", "tags^.5"
]}})
query = {
"_source": {"excludes": ["description", "title"]},
'query': {'bool': query},
"sort": [],
}
if "limit" in kwargs:
query["size"] = kwargs["limit"]
if 'offset' in kwargs:
query["from"] = kwargs["offset"]
if 'order_by' in kwargs:
if isinstance(kwargs["order_by"], str):
kwargs["order_by"] = [kwargs["order_by"]]
for value in kwargs['order_by']:
if 'trending_group' in value:
# fixme: trending_mixed is 0 for all records on variable decay, making sort slow.
continue
is_asc = value.startswith('^')
value = value[1:] if is_asc else value
value = REPLACEMENTS.get(value, value)
if value in TEXT_FIELDS:
value += '.keyword'
query['sort'].append({value: "asc" if is_asc else "desc"})
if collapse:
query["collapse"] = {
"field": collapse[0],
"inner_hits": {
"name": collapse[0],
"size": collapse[1],
"sort": query["sort"]
}
}
return query
def expand_result(results):
inner_hits = []
expanded = []
for result in results:
if result.get("inner_hits"):
for _, inner_hit in result["inner_hits"].items():
inner_hits.extend(inner_hit["hits"]["hits"])
continue
result = result['_source']
result['claim_hash'] = bytes.fromhex(result['claim_id'])[::-1]
if result['reposted_claim_id']:
result['reposted_claim_hash'] = bytes.fromhex(result['reposted_claim_id'])[::-1]
else:
result['reposted_claim_hash'] = None
result['channel_hash'] = bytes.fromhex(result['channel_id'])[::-1] if result['channel_id'] else None
result['txo_hash'] = bytes.fromhex(result['tx_id'])[::-1] + struct.pack('<I', result['tx_nout'])
result['tx_hash'] = bytes.fromhex(result['tx_id'])[::-1]
result['reposted'] = result.pop('repost_count')
result['signature_valid'] = result.pop('is_signature_valid')
# result['normalized'] = result.pop('normalized_name')
# if result['censoring_channel_hash']:
# result['censoring_channel_hash'] = unhexlify(result['censoring_channel_hash'])[::-1]
expanded.append(result)
if inner_hits:
return expand_result(inner_hits)
return expanded
class ResultCacheItem: class ResultCacheItem:
__slots__ = '_result', 'lock', 'has_result' __slots__ = '_result', 'lock', 'has_result'

View file

@ -1,14 +1,14 @@
import time import time
import typing import typing
import asyncio import asyncio
from scribe.blockchain.daemon import LBCDaemon from hub.scribe.daemon import LBCDaemon
from scribe.hub.session import SessionManager from hub.herald.session import SessionManager
from scribe.hub.mempool import HubMemPool from hub.herald.mempool import HubMemPool
from scribe.hub.udp import StatusServer from hub.herald.udp import StatusServer
from scribe.service import BlockchainReaderService from hub.service import BlockchainReaderService
from scribe.elasticsearch import ElasticNotifierClientProtocol from hub.notifier_protocol import ElasticNotifierClientProtocol
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from scribe.hub.env import ServerEnv from hub.herald.env import ServerEnv
class HubServerService(BlockchainReaderService): class HubServerService(BlockchainReaderService):

View file

@ -15,22 +15,22 @@ from contextlib import suppress
from functools import partial from functools import partial
from elasticsearch import ConnectionTimeout from elasticsearch import ConnectionTimeout
from prometheus_client import Counter, Info, Histogram, Gauge from prometheus_client import Counter, Info, Histogram, Gauge
from scribe.schema.result import Outputs from hub.schema.result import Outputs
from scribe.error import ResolveCensoredError, TooManyClaimSearchParametersError from hub.error import ResolveCensoredError, TooManyClaimSearchParametersError
from scribe import __version__, PROMETHEUS_NAMESPACE from hub import __version__, PROMETHEUS_NAMESPACE
from scribe.hub import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION
from scribe.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from scribe.elasticsearch import SearchIndex from hub.herald.search import SearchIndex
from scribe.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time
from scribe.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS
from scribe.hub.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC
from scribe.hub.common import BatchRequest, ProtocolError, Request, Batch, Notification from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification
from scribe.hub.framer import NewlineFramer from hub.herald.framer import NewlineFramer
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from scribe.db import HubDB from hub.db import HubDB
from scribe.hub.env import ServerEnv from hub.herald.env import ServerEnv
from scribe.blockchain.daemon import LBCDaemon from hub.scribe.daemon import LBCDaemon
from scribe.hub.mempool import HubMemPool from hub.herald.mempool import HubMemPool
BAD_REQUEST = 1 BAD_REQUEST = 1
DAEMON_ERROR = 2 DAEMON_ERROR = 2

View file

@ -3,8 +3,8 @@ import struct
from time import perf_counter from time import perf_counter
import logging import logging
from typing import Optional, Tuple, NamedTuple from typing import Optional, Tuple, NamedTuple
from scribe.schema.attrs import country_str_to_int, country_int_to_str from hub.schema.attrs import country_str_to_int, country_int_to_str
from scribe.common import LRUCache, is_valid_public_ipv4 from hub.common import LRUCache, is_valid_public_ipv4
log = logging.getLogger(__name__) log = logging.getLogger(__name__)

View file

@ -7,13 +7,13 @@ from string import ascii_letters
from decimal import Decimal, ROUND_UP from decimal import Decimal, ROUND_UP
from google.protobuf.json_format import MessageToDict from google.protobuf.json_format import MessageToDict
from scribe.schema.base58 import Base58, b58_encode from hub.schema.base58 import Base58, b58_encode
from scribe.error import MissingPublishedFileError, EmptyPublishedFileError from hub.error import MissingPublishedFileError, EmptyPublishedFileError
from scribe.schema.mime_types import guess_media_type from hub.schema.mime_types import guess_media_type
from scribe.schema.base import Metadata, BaseMessageList from hub.schema.base import Metadata, BaseMessageList
from scribe.schema.tags import normalize_tag from hub.schema.tags import normalize_tag
from scribe.schema.types.v2.claim_pb2 import ( from hub.schema.types.v2.claim_pb2 import (
Fee as FeeMessage, Fee as FeeMessage,
Location as LocationMessage, Location as LocationMessage,
Language as LanguageMessage Language as LanguageMessage

View file

@ -8,7 +8,7 @@ from coincurve.utils import (
pem_to_der, lib as libsecp256k1, ffi as libsecp256k1_ffi pem_to_der, lib as libsecp256k1, ffi as libsecp256k1_ffi
) )
from coincurve.ecdsa import CDATA_SIG_LENGTH from coincurve.ecdsa import CDATA_SIG_LENGTH
from scribe.schema.base58 import Base58 from hub.schema.base58 import Base58
if (sys.version_info.major, sys.version_info.minor) > (3, 7): if (sys.version_info.major, sys.version_info.minor) > (3, 7):

View file

@ -11,15 +11,15 @@ from hachoir.core.log import log as hachoir_log
from hachoir.parser import createParser as binary_file_parser from hachoir.parser import createParser as binary_file_parser
from hachoir.metadata import extractMetadata as binary_file_metadata from hachoir.metadata import extractMetadata as binary_file_metadata
from scribe.schema import compat from hub.schema import compat
from scribe.schema.base import Signable from hub.schema.base import Signable
from scribe.schema.mime_types import guess_media_type, guess_stream_type from hub.schema.mime_types import guess_media_type, guess_stream_type
from scribe.schema.attrs import ( from hub.schema.attrs import (
Source, Playable, Dimmensional, Fee, Image, Video, Audio, Source, Playable, Dimmensional, Fee, Image, Video, Audio,
LanguageList, LocationList, ClaimList, ClaimReference, TagList LanguageList, LocationList, ClaimList, ClaimReference, TagList
) )
from scribe.schema.types.v2.claim_pb2 import Claim as ClaimMessage from hub.schema.types.v2.claim_pb2 import Claim as ClaimMessage
from scribe.error import InputValueIsNoneError from hub.error import InputValueIsNoneError
hachoir_log.use_print = False hachoir_log.use_print = False

View file

@ -3,9 +3,9 @@ from decimal import Decimal
from google.protobuf.message import DecodeError from google.protobuf.message import DecodeError
from scribe.schema.types.v1.legacy_claim_pb2 import Claim as OldClaimMessage from hub.schema.types.v1.legacy_claim_pb2 import Claim as OldClaimMessage
from scribe.schema.types.v1.certificate_pb2 import KeyType from hub.schema.types.v1.certificate_pb2 import KeyType
from scribe.schema.types.v1.fee_pb2 import Fee as FeeMessage from hub.schema.types.v1.fee_pb2 import Fee as FeeMessage
def from_old_json_schema(claim, payload: bytes): def from_old_json_schema(claim, payload: bytes):

View file

@ -1,6 +1,6 @@
from google.protobuf.message import DecodeError from google.protobuf.message import DecodeError
from google.protobuf.json_format import MessageToDict from google.protobuf.json_format import MessageToDict
from scribe.schema.types.v2.purchase_pb2 import Purchase as PurchaseMessage from hub.schema.types.v2.purchase_pb2 import Purchase as PurchaseMessage
from .attrs import ClaimReference from .attrs import ClaimReference

View file

@ -2,11 +2,11 @@ import base64
from typing import List, TYPE_CHECKING, Union, Optional, Dict, Set, Tuple from typing import List, TYPE_CHECKING, Union, Optional, Dict, Set, Tuple
from itertools import chain from itertools import chain
from scribe.error import ResolveCensoredError from hub.error import ResolveCensoredError
from scribe.schema.types.v2.result_pb2 import Outputs as OutputsMessage from hub.schema.types.v2.result_pb2 import Outputs as OutputsMessage
from scribe.schema.types.v2.result_pb2 import Error as ErrorMessage from hub.schema.types.v2.result_pb2 import Error as ErrorMessage
if TYPE_CHECKING: if TYPE_CHECKING:
from scribe.db.common import ResolveResult from hub.db.common import ResolveResult
INVALID = ErrorMessage.Code.Name(ErrorMessage.INVALID) INVALID = ErrorMessage.Code.Name(ErrorMessage.INVALID)
NOT_FOUND = ErrorMessage.Code.Name(ErrorMessage.NOT_FOUND) NOT_FOUND = ErrorMessage.Code.Name(ErrorMessage.NOT_FOUND)
BLOCKED = ErrorMessage.Code.Name(ErrorMessage.BLOCKED) BLOCKED = ErrorMessage.Code.Name(ErrorMessage.BLOCKED)

View file

@ -1,5 +1,5 @@
from scribe.schema.base import Signable from hub.schema.base import Signable
from scribe.schema.types.v2.support_pb2 import Support as SupportMessage from hub.schema.types.v2.support_pb2 import Support as SupportMessage
class Support(Signable): class Support(Signable):

View file

1
hub/scribe/__init__.py Normal file
View file

@ -0,0 +1 @@
from hub.scribe.network import LBCTestNet, LBCRegTest, LBCMainNet

View file

@ -2,9 +2,9 @@ import os
import logging import logging
import traceback import traceback
import argparse import argparse
from scribe.common import setup_logging from hub.common import setup_logging
from scribe.blockchain.env import BlockchainEnv from hub.scribe.env import BlockchainEnv
from scribe.blockchain.service import BlockchainProcessorService from hub.scribe.service import BlockchainProcessorService
def main(): def main():

View file

@ -7,8 +7,8 @@ from functools import wraps
import aiohttp import aiohttp
from prometheus_client import Gauge, Histogram from prometheus_client import Gauge, Histogram
from scribe import PROMETHEUS_NAMESPACE from hub import PROMETHEUS_NAMESPACE
from scribe.common import LRUCacheWithMetrics, RPCError, DaemonError, WarmingUpError, WorkQueueFullError from hub.common import LRUCacheWithMetrics, RPCError, DaemonError, WarmingUpError, WorkQueueFullError
log = logging.getLogger(__name__) log = logging.getLogger(__name__)

View file

@ -1,4 +1,4 @@
from scribe.env import Env from hub.env import Env
class BlockchainEnv(Env): class BlockchainEnv(Env):

View file

@ -2,10 +2,10 @@ import itertools
import attr import attr
import typing import typing
from collections import defaultdict from collections import defaultdict
from scribe.blockchain.transaction.deserializer import Deserializer from hub.scribe.transaction.deserializer import Deserializer
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from scribe.db import HubDB from hub.db import HubDB
@attr.s(slots=True) @attr.s(slots=True)

View file

@ -4,12 +4,12 @@ import typing
from typing import List from typing import List
from hashlib import sha256 from hashlib import sha256
from decimal import Decimal from decimal import Decimal
from scribe.schema.base58 import Base58 from hub.schema.base58 import Base58
from scribe.schema.bip32 import PublicKey from hub.schema.bip32 import PublicKey
from scribe.common import hash160, hash_to_hex_str, double_sha256 from hub.common import hash160, hash_to_hex_str, double_sha256
from scribe.blockchain.transaction import TxOutput, TxInput, Block from hub.scribe.transaction import TxOutput, TxInput, Block
from scribe.blockchain.transaction.deserializer import Deserializer from hub.scribe.transaction.deserializer import Deserializer
from scribe.blockchain.transaction.script import OpCodes, P2PKH_script, P2SH_script, txo_script_parser from hub.scribe.transaction.script import OpCodes, P2PKH_script, P2SH_script, txo_script_parser
HASHX_LEN = 11 HASHX_LEN = 11

View file

@ -2,8 +2,8 @@ import asyncio
import logging import logging
import typing import typing
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from scribe.blockchain.network import LBCMainNet from hub.scribe.network import LBCMainNet
from scribe.blockchain.daemon import LBCDaemon from hub.scribe.daemon import LBCDaemon
def chunks(items, size): def chunks(items, size):

View file

@ -7,20 +7,20 @@ from typing import Optional, List, Tuple, Set, DefaultDict, Dict
from prometheus_client import Gauge, Histogram from prometheus_client import Gauge, Histogram
from collections import defaultdict from collections import defaultdict
from scribe import PROMETHEUS_NAMESPACE from hub import PROMETHEUS_NAMESPACE
from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from scribe.error.base import ChainError from hub.error.base import ChainError
from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LRUCache from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LRUCache
from scribe.blockchain.daemon import LBCDaemon from hub.scribe.daemon import LBCDaemon
from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block from hub.scribe.transaction import Tx, TxOutput, TxInput, Block
from scribe.blockchain.prefetcher import Prefetcher from hub.scribe.prefetcher import Prefetcher
from scribe.blockchain.mempool import MemPool from hub.scribe.mempool import MemPool
from scribe.schema.url import normalize_name from hub.schema.url import normalize_name
from scribe.service import BlockchainService from hub.service import BlockchainService
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from scribe.blockchain.env import BlockchainEnv from hub.scribe.env import BlockchainEnv
from scribe.db.revertable import RevertableOpStack from hub.db.revertable import RevertableOpStack
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_writer" NAMESPACE = f"{PROMETHEUS_NAMESPACE}_writer"
@ -1725,9 +1725,9 @@ class BlockchainProcessorService(BlockchainService):
def _iter_start_tasks(self): def _iter_start_tasks(self):
while self.db.db_version < max(self.db.DB_VERSIONS): while self.db.db_version < max(self.db.DB_VERSIONS):
if self.db.db_version == 7: if self.db.db_version == 7:
from scribe.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION from hub.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION
elif self.db.db_version == 8: elif self.db.db_version == 8:
from scribe.db.migrators.migrate8to9 import migrate, FROM_VERSION, TO_VERSION from hub.db.migrators.migrate8to9 import migrate, FROM_VERSION, TO_VERSION
self.db._index_address_status = self.env.index_address_status self.db._index_address_status = self.env.index_address_status
else: else:
raise RuntimeError("unknown db version") raise RuntimeError("unknown db version")

View file

@ -3,8 +3,8 @@ import functools
import typing import typing
from dataclasses import dataclass from dataclasses import dataclass
from struct import Struct from struct import Struct
from scribe.schema.claim import Claim from hub.schema.claim import Claim
from scribe.common import double_sha256 from hub.common import double_sha256
if (sys.version_info.major, sys.version_info.minor) > (3, 7): if (sys.version_info.major, sys.version_info.minor) > (3, 7):
cachedproperty = functools.cached_property cachedproperty = functools.cached_property

View file

@ -1,9 +1,9 @@
from scribe.common import double_sha256 from hub.common import double_sha256
from scribe.blockchain.transaction import ( from hub.scribe.transaction import (
unpack_le_int32_from, unpack_le_int64_from, unpack_le_uint16_from, unpack_le_int32_from, unpack_le_int64_from, unpack_le_uint16_from,
unpack_le_uint32_from, unpack_le_uint64_from, Tx, TxInput, TxOutput unpack_le_uint32_from, unpack_le_uint64_from, Tx, TxInput, TxOutput
) )
from scribe.blockchain.transaction.script import txo_script_parser from hub.scribe.transaction.script import txo_script_parser
class Deserializer: class Deserializer:

View file

@ -1,6 +1,6 @@
import typing import typing
from scribe.blockchain.transaction import NameClaim, ClaimUpdate, ClaimSupport from hub.scribe.transaction import NameClaim, ClaimUpdate, ClaimSupport
from scribe.blockchain.transaction import unpack_le_uint16_from, unpack_le_uint32_from, pack_le_uint16, pack_le_uint32 from hub.scribe.transaction import unpack_le_uint16_from, unpack_le_uint32_from, pack_le_uint16, pack_le_uint32
class _OpCodes(typing.NamedTuple): class _OpCodes(typing.NamedTuple):

View file

@ -5,12 +5,12 @@ import signal
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
from prometheus_client import Gauge, Histogram from prometheus_client import Gauge, Histogram
from scribe import __version__, PROMETHEUS_NAMESPACE from hub import __version__, PROMETHEUS_NAMESPACE
from scribe.env import Env from hub.env import Env
from scribe.db import HubDB from hub.db import HubDB
from scribe.db.prefixes import DBState from hub.db.prefixes import DBState
from scribe.common import HISTOGRAM_BUCKETS from hub.common import HISTOGRAM_BUCKETS
from scribe.metrics import PrometheusServer from hub.metrics import PrometheusServer
class BlockchainService: class BlockchainService:

View file

@ -1 +0,0 @@
from scribe.blockchain.network import LBCTestNet, LBCRegTest, LBCMainNet

View file

@ -1,400 +0,0 @@
import hashlib
import hmac
import ipaddress
import logging
import logging.handlers
import typing
import collections
from asyncio import get_event_loop, Event
from prometheus_client import Counter
log = logging.getLogger(__name__)
_sha256 = hashlib.sha256
_sha512 = hashlib.sha512
_new_hash = hashlib.new
_new_hmac = hmac.new
HASHX_LEN = 11
CLAIM_HASH_LEN = 20
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
def setup_logging(log_path: str):
log = logging.getLogger('scribe')
fmt = logging.Formatter("%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
handler = logging.handlers.RotatingFileHandler(log_path, maxBytes=1024*1024*5, backupCount=2)
handler.setFormatter(fmt)
log.addHandler(handler)
handler = logging.StreamHandler()
handler.setFormatter(fmt)
log.addHandler(handler)
log.setLevel(logging.INFO)
logging.getLogger('aiohttp').setLevel(logging.WARNING)
logging.getLogger('elasticsearch').setLevel(logging.WARNING)
class StagedClaimtrieItem(typing.NamedTuple):
"""
Represents a claim TXO, used internally by the block processor
"""
name: str
normalized_name: str
claim_hash: bytes
amount: int
expiration_height: int
tx_num: int
position: int
root_tx_num: int
root_position: int
channel_signature_is_valid: bool
signing_hash: typing.Optional[bytes]
reposted_claim_hash: typing.Optional[bytes]
@property
def is_update(self) -> bool:
return (self.tx_num, self.position) != (self.root_tx_num, self.root_position)
def invalidate_signature(self) -> 'StagedClaimtrieItem':
return StagedClaimtrieItem(
self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num,
self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash
)
def formatted_time(t, sep=' '):
"""Return a number of seconds as a string in days, hours, mins and
maybe secs."""
t = int(t)
fmts = (('{:d}d', 86400), ('{:02d}h', 3600), ('{:02d}m', 60))
parts = []
for fmt, n in fmts:
val = t // n
if parts or val:
parts.append(fmt.format(val))
t %= n
if len(parts) < 3:
parts.append(f'{t:02d}s')
return sep.join(parts)
def protocol_tuple(s):
"""Converts a protocol version number, such as "1.0" to a tuple (1, 0).
If the version number is bad, (0, ) indicating version 0 is returned."""
try:
return tuple(int(part) for part in s.split('.'))
except Exception:
return (0, )
def version_string(ptuple):
"""Convert a version tuple such as (1, 2) to "1.2".
There is always at least one dot, so (1, ) becomes "1.0"."""
while len(ptuple) < 2:
ptuple += (0, )
return '.'.join(str(p) for p in ptuple)
def protocol_version(client_req, min_tuple, max_tuple):
"""Given a client's protocol version string, return a pair of
protocol tuples:
(negotiated version, client min request)
If the request is unsupported, the negotiated protocol tuple is
None.
"""
if client_req is None:
client_min = client_max = min_tuple
else:
if isinstance(client_req, list) and len(client_req) == 2:
client_min, client_max = client_req
else:
client_min = client_max = client_req
client_min = protocol_tuple(client_min)
client_max = protocol_tuple(client_max)
result = min(client_max, max_tuple)
if result < max(client_min, min_tuple) or result == (0, ):
result = None
return result, client_min
class LRUCacheWithMetrics:
__slots__ = [
'capacity',
'cache',
'_track_metrics',
'hits',
'misses'
]
def __init__(self, capacity: int, metric_name: typing.Optional[str] = None, namespace: str = "daemon_cache"):
self.capacity = capacity
self.cache = collections.OrderedDict()
if metric_name is None:
self._track_metrics = False
self.hits = self.misses = None
else:
self._track_metrics = True
try:
self.hits = Counter(
f"{metric_name}_cache_hit_count", "Number of cache hits", namespace=namespace
)
self.misses = Counter(
f"{metric_name}_cache_miss_count", "Number of cache misses", namespace=namespace
)
except ValueError as err:
log.debug("failed to set up prometheus %s_cache_miss_count metric: %s", metric_name, err)
self._track_metrics = False
self.hits = self.misses = None
def get(self, key, default=None):
try:
value = self.cache.pop(key)
if self._track_metrics:
self.hits.inc()
except KeyError:
if self._track_metrics:
self.misses.inc()
return default
self.cache[key] = value
return value
def set(self, key, value):
try:
self.cache.pop(key)
except KeyError:
if len(self.cache) >= self.capacity:
self.cache.popitem(last=False)
self.cache[key] = value
def clear(self):
self.cache.clear()
def pop(self, key):
return self.cache.pop(key)
def __setitem__(self, key, value):
return self.set(key, value)
def __getitem__(self, item):
return self.get(item)
def __contains__(self, item) -> bool:
return item in self.cache
def __len__(self):
return len(self.cache)
def __delitem__(self, key):
self.cache.pop(key)
def __del__(self):
self.clear()
class LRUCache:
__slots__ = [
'capacity',
'cache'
]
def __init__(self, capacity: int):
self.capacity = capacity
self.cache = collections.OrderedDict()
def get(self, key, default=None):
try:
value = self.cache.pop(key)
except KeyError:
return default
self.cache[key] = value
return value
def set(self, key, value):
try:
self.cache.pop(key)
except KeyError:
if len(self.cache) >= self.capacity:
self.cache.popitem(last=False)
self.cache[key] = value
def items(self):
return self.cache.items()
def clear(self):
self.cache.clear()
def pop(self, key, default=None):
return self.cache.pop(key, default)
def __setitem__(self, key, value):
return self.set(key, value)
def __getitem__(self, item):
return self.get(item)
def __contains__(self, item) -> bool:
return item in self.cache
def __len__(self):
return len(self.cache)
def __delitem__(self, key):
self.cache.pop(key)
def __del__(self):
self.clear()
# the ipaddress module does not show these subnets as reserved
CARRIER_GRADE_NAT_SUBNET = ipaddress.ip_network('100.64.0.0/10')
IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24')
def is_valid_public_ipv4(address, allow_localhost: bool = False, allow_lan: bool = False):
try:
parsed_ip = ipaddress.ip_address(address)
if parsed_ip.is_loopback and allow_localhost:
return True
if allow_lan and parsed_ip.is_private:
return True
if any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private)):
return False
else:
return not any((CARRIER_GRADE_NAT_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32")),
IPV4_TO_6_RELAY_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32"))))
except (ipaddress.AddressValueError, ValueError):
return False
def sha256(x):
"""Simple wrapper of hashlib sha256."""
return _sha256(x).digest()
def ripemd160(x):
"""Simple wrapper of hashlib ripemd160."""
h = _new_hash('ripemd160')
h.update(x)
return h.digest()
def double_sha256(x):
"""SHA-256 of SHA-256, as used extensively in bitcoin."""
return sha256(sha256(x))
def hmac_sha512(key, msg):
"""Use SHA-512 to provide an HMAC."""
return _new_hmac(key, msg, _sha512).digest()
def hash160(x):
"""RIPEMD-160 of SHA-256. Used to make bitcoin addresses from pubkeys."""
return ripemd160(sha256(x))
def hash_to_hex_str(x: bytes) -> str:
"""Convert a big-endian binary hash to displayed hex string.
Display form of a binary hash is reversed and converted to hex.
"""
return x[::-1].hex()
def hex_str_to_hash(x: str) -> bytes:
"""Convert a displayed hex string to a binary hash."""
return bytes.fromhex(x)[::-1]
INVALID_REQUEST = -32600
INVALID_ARGS = -32602
class CodeMessageError(Exception):
@property
def code(self):
return self.args[0]
@property
def message(self):
return self.args[1]
def __eq__(self, other):
return (isinstance(other, self.__class__) and
self.code == other.code and self.message == other.message)
def __hash__(self):
# overridden to make the exception hashable
# see https://bugs.python.org/issue28603
return hash((self.code, self.message))
@classmethod
def invalid_args(cls, message):
return cls(INVALID_ARGS, message)
@classmethod
def invalid_request(cls, message):
return cls(INVALID_REQUEST, message)
@classmethod
def empty_batch(cls):
return cls.invalid_request('batch is empty')
class RPCError(CodeMessageError):
pass
class DaemonError(Exception):
"""Raised when the daemon returns an error in its results."""
class WarmingUpError(Exception):
"""Internal - when the daemon is warming up."""
class WorkQueueFullError(Exception):
"""Internal - when the daemon's work queue is full."""
class TaskGroup:
def __init__(self, loop=None):
self._loop = loop or get_event_loop()
self._tasks = set()
self.done = Event()
self.started = Event()
def __len__(self):
return len(self._tasks)
def add(self, coro):
task = self._loop.create_task(coro)
self._tasks.add(task)
self.started.set()
self.done.clear()
task.add_done_callback(self._remove)
return task
def _remove(self, task):
self._tasks.remove(task)
if len(self._tasks) < 1:
self.done.set()
self.started.clear()
def cancel(self):
for task in self._tasks:
task.cancel()
self.done.set()
self.started.clear()

View file

@ -1,2 +0,0 @@
from scribe.elasticsearch.search import SearchIndex
from scribe.elasticsearch.notifier_protocol import ElasticNotifierClientProtocol

View file

@ -1,105 +0,0 @@
INDEX_DEFAULT_SETTINGS = {
"settings":
{"analysis":
{"analyzer": {
"default": {"tokenizer": "whitespace", "filter": ["lowercase", "porter_stem"]}}},
"index":
{"refresh_interval": -1,
"number_of_shards": 1,
"number_of_replicas": 0,
"sort": {
"field": ["trending_score", "release_time"],
"order": ["desc", "desc"]
}}
},
"mappings": {
"properties": {
"claim_id": {
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
},
"type": "text",
"index_prefixes": {
"min_chars": 1,
"max_chars": 10
}
},
"sd_hash": {
"fields": {
"keyword": {
"ignore_above": 96,
"type": "keyword"
}
},
"type": "text",
"index_prefixes": {
"min_chars": 1,
"max_chars": 4
}
},
"height": {"type": "integer"},
"claim_type": {"type": "byte"},
"censor_type": {"type": "byte"},
"trending_score": {"type": "double"},
"release_time": {"type": "long"}
}
}
}
FIELDS = {
'_id',
'claim_id', 'claim_type', 'claim_name', 'normalized_name',
'tx_id', 'tx_nout', 'tx_position',
'short_url', 'canonical_url',
'is_controlling', 'last_take_over_height',
'public_key_bytes', 'public_key_id', 'claims_in_channel',
'channel_id', 'signature', 'signature_digest', 'is_signature_valid',
'amount', 'effective_amount', 'support_amount',
'fee_amount', 'fee_currency',
'height', 'creation_height', 'activation_height', 'expiration_height',
'stream_type', 'media_type', 'censor_type',
'title', 'author', 'description',
'timestamp', 'creation_timestamp',
'duration', 'release_time',
'tags', 'languages', 'has_source', 'reposted_claim_type',
'reposted_claim_id', 'repost_count', 'sd_hash',
'trending_score', 'tx_num',
'channel_tx_id', 'channel_tx_position', 'channel_height', 'reposted_tx_id',
'reposted_tx_position', 'reposted_height',
}
TEXT_FIELDS = {
'author', 'canonical_url', 'channel_id', 'description', 'claim_id', 'censoring_channel_id',
'media_type', 'normalized_name', 'public_key_bytes', 'public_key_id', 'short_url', 'signature',
'claim_name', 'signature_digest', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id',
'tags', 'sd_hash', 'channel_tx_id', 'reposted_tx_id',
}
RANGE_FIELDS = {
'height', 'creation_height', 'activation_height', 'expiration_height',
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
'tx_position', 'repost_count', 'limit_claims_per_channel',
'amount', 'effective_amount', 'support_amount',
'trending_score', 'censor_type', 'tx_num', 'reposted_tx_position', 'reposted_height',
'channel_tx_position', 'channel_height',
}
ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS
REPLACEMENTS = {
'claim_name': 'normalized_name',
'name': 'normalized_name',
'txid': 'tx_id',
'nout': 'tx_nout',
'trending_group': 'trending_score',
'trending_mixed': 'trending_score',
'trending_global': 'trending_score',
'trending_local': 'trending_score',
'reposted': 'repost_count',
'stream_types': 'stream_type',
'media_types': 'media_type',
'valid_channel_signature': 'is_signature_valid'
}

View file

@ -4,10 +4,9 @@
TARGET_HOST=$1 TARGET_HOST=$1
SCRIPTS_DIR=`dirname $0` SCRIPTS_DIR=`dirname $0`
SCRIBE_DIR=`dirname $SCRIPTS_DIR`
# build the image # build the image
docker build -f $SCRIBE_DIR/docker/Dockerfile -t lbry/scribe:development $SCRIBE_DIR docker build -t lbry/scribe:development .
IMAGE=`docker image inspect lbry/scribe:development | sed -n "s/^.*Id\":\s*\"sha256:\s*\(\S*\)\".*$/\1/p"` IMAGE=`docker image inspect lbry/scribe:development | sed -n "s/^.*Id\":\s*\"sha256:\s*\(\S*\)\".*$/\1/p"`
# push the image to the server # push the image to the server

View file

@ -5,13 +5,13 @@
set -euo pipefail set -euo pipefail
if [ -z "$HUB_COMMAND" ]; then if [ -z "$HUB_COMMAND" ]; then
echo "HUB_COMMAND env variable must be scribe, scribe-hub, or scribe-elastic-sync" echo "HUB_COMMAND env variable must be scribe, herald, or scribe-elastic-sync"
exit 1 exit 1
fi fi
case "$HUB_COMMAND" in case "$HUB_COMMAND" in
scribe ) exec /home/lbry/.local/bin/scribe "$@" ;; scribe ) exec /home/lbry/.local/bin/scribe "$@" ;;
scribe-hub ) exec /home/lbry/.local/bin/scribe-hub "$@" ;; herald ) exec /home/lbry/.local/bin/herald "$@" ;;
scribe-elastic-sync ) exec /home/lbry/.local/bin/scribe-elastic-sync "$@" ;; scribe-elastic-sync ) exec /home/lbry/.local/bin/scribe-elastic-sync "$@" ;;
* ) "HUB_COMMAND env variable must be scribe, scribe-hub, or scribe-elastic-sync" && exit 1 ;; * ) "HUB_COMMAND env variable must be scribe, herald, or scribe-elastic-sync" && exit 1 ;;
esac esac

View file

@ -2,7 +2,7 @@ import sys
import os import os
import re import re
import logging import logging
import scribe.build_info as build_info_mod import hub.build_info as build_info_mod
log = logging.getLogger() log = logging.getLogger()
log.addHandler(logging.StreamHandler()) log.addHandler(logging.StreamHandler())

View file

@ -1,5 +1,5 @@
import os import os
from scribe import __name__, __version__ from hub import __name__, __version__
from setuptools import setup, find_packages from setuptools import setup, find_packages
BASE = os.path.dirname(__file__) BASE = os.path.dirname(__file__)
@ -23,9 +23,9 @@ setup(
zip_safe=False, zip_safe=False,
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'scribe=scribe.blockchain.__main__:main', 'scribe=hub.scribe.__main__:main',
'scribe-hub=scribe.hub.__main__:main', 'herald=hub.herald.__main__:main',
'scribe-elastic-sync=scribe.elasticsearch.__main__:main', 'scribe-elastic-sync=hub.elastic_sync.__main__:main',
], ],
}, },
install_requires=[ install_requires=[

View file

@ -8,7 +8,7 @@ from collections import defaultdict
from typing import NamedTuple, List from typing import NamedTuple, List
from lbry.testcase import CommandTestCase from lbry.testcase import CommandTestCase
from lbry.wallet.transaction import Transaction, Output from lbry.wallet.transaction import Transaction, Output
from scribe.schema.compat import OldClaimMessage from hub.schema.compat import OldClaimMessage
from lbry.crypto.hash import sha256 from lbry.crypto.hash import sha256
from lbry.crypto.base58 import Base58 from lbry.crypto.base58 import Base58

Some files were not shown because too many files have changed in this diff Show more