diff --git a/docker/Dockerfile b/Dockerfile similarity index 92% rename from docker/Dockerfile rename to Dockerfile index e72a8bb..c8fd8ac 100644 --- a/docker/Dockerfile +++ b/Dockerfile @@ -35,12 +35,12 @@ USER $user WORKDIR $projects_dir RUN python3.9 -m pip install pip RUN python3.9 -m pip install -e . -RUN python3.9 docker/set_build.py +RUN python3.9 scripts/set_build.py RUN rm ~/.cache -rf # entry point VOLUME $db_dir ENV DB_DIRECTORY=$db_dir -COPY ./docker/scribe_entrypoint.sh /entrypoint.sh +COPY ./scripts/entrypoint.sh /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] diff --git a/docker/hooks/build b/docker/hooks/build deleted file mode 100644 index 371640d..0000000 --- a/docker/hooks/build +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash - -DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" -cd "$DIR/../.." ## make sure we're in the right place. Docker Hub screws this up sometimes -echo "docker build dir: $(pwd)" - -docker build --build-arg DOCKER_TAG=$DOCKER_TAG --build-arg DOCKER_COMMIT=$SOURCE_COMMIT -f $DOCKERFILE_PATH -t $IMAGE_NAME . diff --git a/blocking.md b/docs/blocking.md similarity index 100% rename from blocking.md rename to docs/blocking.md diff --git a/cluster_guide.md b/docs/cluster_guide.md similarity index 100% rename from cluster_guide.md rename to docs/cluster_guide.md diff --git a/diagram.png b/docs/diagram.png similarity index 100% rename from diagram.png rename to docs/diagram.png diff --git a/docker/docker-compose.yml b/docs/docker_examples/docker-compose.yml similarity index 97% rename from docker/docker-compose.yml rename to docs/docker_examples/docker-compose.yml index 3d4f617..6f6d321 100644 --- a/docker/docker-compose.yml +++ b/docs/docker_examples/docker-compose.yml @@ -55,8 +55,8 @@ services: volumes: - "lbry_rocksdb:/database" environment: - - HUB_COMMAND=scribe-hub - command: # for full options, see `scribe-hub --help` + - HUB_COMMAND=herald + command: # for full options, see `herald --help` - "--daemon_url=http://lbry:lbry@127.0.0.1:9245" - "--max_query_workers=4" - "--host=0.0.0.0" diff --git a/docker/elastic-compose.yml b/docs/docker_examples/elastic-compose.yml similarity index 100% rename from docker/elastic-compose.yml rename to docs/docker_examples/elastic-compose.yml diff --git a/docker/hub-compose.yml b/docs/docker_examples/hub-compose.yml similarity index 97% rename from docker/hub-compose.yml rename to docs/docker_examples/hub-compose.yml index cd61138..9038984 100644 --- a/docker/hub-compose.yml +++ b/docs/docker_examples/hub-compose.yml @@ -34,7 +34,7 @@ services: - "--max_query_workers=2" - "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8" - "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6" - scribe_hub: + herald: depends_on: - scribe_elastic_sync - scribe @@ -47,7 +47,7 @@ services: volumes: - "lbry_rocksdb:/database" environment: - - HUB_COMMAND=scribe-hub + - HUB_COMMAND=herald command: - "--daemon_url=http://lbry:lbry@127.0.0.1:9245" - "--elastic_host=127.0.0.1" diff --git a/docker/lbcd-compose.yml b/docs/docker_examples/lbcd-compose.yml similarity index 100% rename from docker/lbcd-compose.yml rename to docs/docker_examples/lbcd-compose.yml diff --git a/scribe/elasticsearch/trending algorithm.pdf b/docs/trending algorithm.pdf similarity index 100% rename from scribe/elasticsearch/trending algorithm.pdf rename to docs/trending algorithm.pdf diff --git a/scribe/__init__.py b/hub/__init__.py similarity index 100% rename from scribe/__init__.py rename to hub/__init__.py diff --git a/scribe/build_info.py b/hub/build_info.py similarity index 100% rename from scribe/build_info.py rename to hub/build_info.py diff --git a/hub/common.py b/hub/common.py new file mode 100644 index 0000000..f077c01 --- /dev/null +++ b/hub/common.py @@ -0,0 +1,765 @@ +import struct +import hashlib +import hmac +import ipaddress +import logging +import logging.handlers +import typing +import collections +from decimal import Decimal +from typing import Iterable +from asyncio import get_event_loop, Event +from prometheus_client import Counter +from hub.schema.tags import clean_tags +from hub.schema.url import normalize_name +from hub.error import TooManyClaimSearchParametersError + +log = logging.getLogger(__name__) + + +_sha256 = hashlib.sha256 +_sha512 = hashlib.sha512 +_new_hash = hashlib.new +_new_hmac = hmac.new +HASHX_LEN = 11 +CLAIM_HASH_LEN = 20 + + +HISTOGRAM_BUCKETS = ( + .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') +) + +CLAIM_TYPES = { + 'stream': 1, + 'channel': 2, + 'repost': 3, + 'collection': 4, +} + +STREAM_TYPES = { + 'video': 1, + 'audio': 2, + 'image': 3, + 'document': 4, + 'binary': 5, + 'model': 6, +} + + +def setup_logging(log_path: str): + log = logging.getLogger() + fmt = logging.Formatter("%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") + handler = logging.handlers.RotatingFileHandler(log_path, maxBytes=1024*1024*5, backupCount=2) + handler.setFormatter(fmt) + log.addHandler(handler) + handler = logging.StreamHandler() + handler.setFormatter(fmt) + log.addHandler(handler) + + log.setLevel(logging.INFO) + logging.getLogger('aiohttp').setLevel(logging.WARNING) + logging.getLogger('elasticsearch').setLevel(logging.WARNING) + + +class StagedClaimtrieItem(typing.NamedTuple): + """ + Represents a claim TXO, used internally by the block processor + """ + name: str + normalized_name: str + claim_hash: bytes + amount: int + expiration_height: int + tx_num: int + position: int + root_tx_num: int + root_position: int + channel_signature_is_valid: bool + signing_hash: typing.Optional[bytes] + reposted_claim_hash: typing.Optional[bytes] + + @property + def is_update(self) -> bool: + return (self.tx_num, self.position) != (self.root_tx_num, self.root_position) + + def invalidate_signature(self) -> 'StagedClaimtrieItem': + return StagedClaimtrieItem( + self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num, + self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash + ) + + +def formatted_time(t, sep=' '): + """Return a number of seconds as a string in days, hours, mins and + maybe secs.""" + t = int(t) + fmts = (('{:d}d', 86400), ('{:02d}h', 3600), ('{:02d}m', 60)) + parts = [] + for fmt, n in fmts: + val = t // n + if parts or val: + parts.append(fmt.format(val)) + t %= n + if len(parts) < 3: + parts.append(f'{t:02d}s') + return sep.join(parts) + + +def protocol_tuple(s): + """Converts a protocol version number, such as "1.0" to a tuple (1, 0). + + If the version number is bad, (0, ) indicating version 0 is returned.""" + try: + return tuple(int(part) for part in s.split('.')) + except Exception: + return (0, ) + + +def version_string(ptuple): + """Convert a version tuple such as (1, 2) to "1.2". + There is always at least one dot, so (1, ) becomes "1.0".""" + while len(ptuple) < 2: + ptuple += (0, ) + return '.'.join(str(p) for p in ptuple) + + +def protocol_version(client_req, min_tuple, max_tuple): + """Given a client's protocol version string, return a pair of + protocol tuples: + (negotiated version, client min request) + If the request is unsupported, the negotiated protocol tuple is + None. + """ + if client_req is None: + client_min = client_max = min_tuple + else: + if isinstance(client_req, list) and len(client_req) == 2: + client_min, client_max = client_req + else: + client_min = client_max = client_req + client_min = protocol_tuple(client_min) + client_max = protocol_tuple(client_max) + + result = min(client_max, max_tuple) + if result < max(client_min, min_tuple) or result == (0, ): + result = None + + return result, client_min + + +class LRUCacheWithMetrics: + __slots__ = [ + 'capacity', + 'cache', + '_track_metrics', + 'hits', + 'misses' + ] + + def __init__(self, capacity: int, metric_name: typing.Optional[str] = None, namespace: str = "daemon_cache"): + self.capacity = capacity + self.cache = collections.OrderedDict() + if metric_name is None: + self._track_metrics = False + self.hits = self.misses = None + else: + self._track_metrics = True + try: + self.hits = Counter( + f"{metric_name}_cache_hit_count", "Number of cache hits", namespace=namespace + ) + self.misses = Counter( + f"{metric_name}_cache_miss_count", "Number of cache misses", namespace=namespace + ) + except ValueError as err: + log.debug("failed to set up prometheus %s_cache_miss_count metric: %s", metric_name, err) + self._track_metrics = False + self.hits = self.misses = None + + def get(self, key, default=None): + try: + value = self.cache.pop(key) + if self._track_metrics: + self.hits.inc() + except KeyError: + if self._track_metrics: + self.misses.inc() + return default + self.cache[key] = value + return value + + def set(self, key, value): + try: + self.cache.pop(key) + except KeyError: + if len(self.cache) >= self.capacity: + self.cache.popitem(last=False) + self.cache[key] = value + + def clear(self): + self.cache.clear() + + def pop(self, key): + return self.cache.pop(key) + + def __setitem__(self, key, value): + return self.set(key, value) + + def __getitem__(self, item): + return self.get(item) + + def __contains__(self, item) -> bool: + return item in self.cache + + def __len__(self): + return len(self.cache) + + def __delitem__(self, key): + self.cache.pop(key) + + def __del__(self): + self.clear() + + +class LRUCache: + __slots__ = [ + 'capacity', + 'cache' + ] + + def __init__(self, capacity: int): + self.capacity = capacity + self.cache = collections.OrderedDict() + + def get(self, key, default=None): + try: + value = self.cache.pop(key) + except KeyError: + return default + self.cache[key] = value + return value + + def set(self, key, value): + try: + self.cache.pop(key) + except KeyError: + if len(self.cache) >= self.capacity: + self.cache.popitem(last=False) + self.cache[key] = value + + def items(self): + return self.cache.items() + + def clear(self): + self.cache.clear() + + def pop(self, key, default=None): + return self.cache.pop(key, default) + + def __setitem__(self, key, value): + return self.set(key, value) + + def __getitem__(self, item): + return self.get(item) + + def __contains__(self, item) -> bool: + return item in self.cache + + def __len__(self): + return len(self.cache) + + def __delitem__(self, key): + self.cache.pop(key) + + def __del__(self): + self.clear() + + +# the ipaddress module does not show these subnets as reserved +CARRIER_GRADE_NAT_SUBNET = ipaddress.ip_network('100.64.0.0/10') +IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24') + + +def is_valid_public_ipv4(address, allow_localhost: bool = False, allow_lan: bool = False): + try: + parsed_ip = ipaddress.ip_address(address) + if parsed_ip.is_loopback and allow_localhost: + return True + if allow_lan and parsed_ip.is_private: + return True + if any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback, + parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private)): + return False + else: + return not any((CARRIER_GRADE_NAT_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32")), + IPV4_TO_6_RELAY_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32")))) + except (ipaddress.AddressValueError, ValueError): + return False + + +def sha256(x): + """Simple wrapper of hashlib sha256.""" + return _sha256(x).digest() + + +def ripemd160(x): + """Simple wrapper of hashlib ripemd160.""" + h = _new_hash('ripemd160') + h.update(x) + return h.digest() + + +def double_sha256(x): + """SHA-256 of SHA-256, as used extensively in bitcoin.""" + return sha256(sha256(x)) + + +def hmac_sha512(key, msg): + """Use SHA-512 to provide an HMAC.""" + return _new_hmac(key, msg, _sha512).digest() + + +def hash160(x): + """RIPEMD-160 of SHA-256. Used to make bitcoin addresses from pubkeys.""" + return ripemd160(sha256(x)) + + +def hash_to_hex_str(x: bytes) -> str: + """Convert a big-endian binary hash to displayed hex string. + + Display form of a binary hash is reversed and converted to hex. + """ + return x[::-1].hex() + + +def hex_str_to_hash(x: str) -> bytes: + """Convert a displayed hex string to a binary hash.""" + return bytes.fromhex(x)[::-1] + + + +INVALID_REQUEST = -32600 +INVALID_ARGS = -32602 + + +class CodeMessageError(Exception): + + @property + def code(self): + return self.args[0] + + @property + def message(self): + return self.args[1] + + def __eq__(self, other): + return (isinstance(other, self.__class__) and + self.code == other.code and self.message == other.message) + + def __hash__(self): + # overridden to make the exception hashable + # see https://bugs.python.org/issue28603 + return hash((self.code, self.message)) + + @classmethod + def invalid_args(cls, message): + return cls(INVALID_ARGS, message) + + @classmethod + def invalid_request(cls, message): + return cls(INVALID_REQUEST, message) + + @classmethod + def empty_batch(cls): + return cls.invalid_request('batch is empty') + + +class RPCError(CodeMessageError): + pass + + + +class DaemonError(Exception): + """Raised when the daemon returns an error in its results.""" + + +class WarmingUpError(Exception): + """Internal - when the daemon is warming up.""" + + +class WorkQueueFullError(Exception): + """Internal - when the daemon's work queue is full.""" + + +class TaskGroup: + def __init__(self, loop=None): + self._loop = loop or get_event_loop() + self._tasks = set() + self.done = Event() + self.started = Event() + + def __len__(self): + return len(self._tasks) + + def add(self, coro): + task = self._loop.create_task(coro) + self._tasks.add(task) + self.started.set() + self.done.clear() + task.add_done_callback(self._remove) + return task + + def _remove(self, task): + self._tasks.remove(task) + if len(self._tasks) < 1: + self.done.set() + self.started.clear() + + def cancel(self): + for task in self._tasks: + task.cancel() + self.done.set() + self.started.clear() + + +class IndexVersionMismatch(Exception): + def __init__(self, got_version, expected_version): + self.got_version = got_version + self.expected_version = expected_version + + +# Elasticsearch constants + +INDEX_DEFAULT_SETTINGS = { + "settings": + {"analysis": + {"analyzer": { + "default": {"tokenizer": "whitespace", "filter": ["lowercase", "porter_stem"]}}}, + "index": + {"refresh_interval": -1, + "number_of_shards": 1, + "number_of_replicas": 0, + "sort": { + "field": ["trending_score", "release_time"], + "order": ["desc", "desc"] + }} + }, + "mappings": { + "properties": { + "claim_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text", + "index_prefixes": { + "min_chars": 1, + "max_chars": 10 + } + }, + "sd_hash": { + "fields": { + "keyword": { + "ignore_above": 96, + "type": "keyword" + } + }, + "type": "text", + "index_prefixes": { + "min_chars": 1, + "max_chars": 4 + } + }, + "height": {"type": "integer"}, + "claim_type": {"type": "byte"}, + "censor_type": {"type": "byte"}, + "trending_score": {"type": "double"}, + "release_time": {"type": "long"} + } + } +} + +FIELDS = { + '_id', + 'claim_id', 'claim_type', 'claim_name', 'normalized_name', + 'tx_id', 'tx_nout', 'tx_position', + 'short_url', 'canonical_url', + 'is_controlling', 'last_take_over_height', + 'public_key_bytes', 'public_key_id', 'claims_in_channel', + 'channel_id', 'signature', 'signature_digest', 'is_signature_valid', + 'amount', 'effective_amount', 'support_amount', + 'fee_amount', 'fee_currency', + 'height', 'creation_height', 'activation_height', 'expiration_height', + 'stream_type', 'media_type', 'censor_type', + 'title', 'author', 'description', + 'timestamp', 'creation_timestamp', + 'duration', 'release_time', + 'tags', 'languages', 'has_source', 'reposted_claim_type', + 'reposted_claim_id', 'repost_count', 'sd_hash', + 'trending_score', 'tx_num', + 'channel_tx_id', 'channel_tx_position', 'channel_height', 'reposted_tx_id', + 'reposted_tx_position', 'reposted_height', +} + +TEXT_FIELDS = { + 'author', 'canonical_url', 'channel_id', 'description', 'claim_id', 'censoring_channel_id', + 'media_type', 'normalized_name', 'public_key_bytes', 'public_key_id', 'short_url', 'signature', + 'claim_name', 'signature_digest', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id', + 'tags', 'sd_hash', 'channel_tx_id', 'reposted_tx_id', +} + +RANGE_FIELDS = { + 'height', 'creation_height', 'activation_height', 'expiration_height', + 'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount', + 'tx_position', 'repost_count', 'limit_claims_per_channel', + 'amount', 'effective_amount', 'support_amount', + 'trending_score', 'censor_type', 'tx_num', 'reposted_tx_position', 'reposted_height', + 'channel_tx_position', 'channel_height', +} + +ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS + +REPLACEMENTS = { + 'claim_name': 'normalized_name', + 'name': 'normalized_name', + 'txid': 'tx_id', + 'nout': 'tx_nout', + 'trending_group': 'trending_score', + 'trending_mixed': 'trending_score', + 'trending_global': 'trending_score', + 'trending_local': 'trending_score', + 'reposted': 'repost_count', + 'stream_types': 'stream_type', + 'media_types': 'media_type', + 'valid_channel_signature': 'is_signature_valid' +} + + +def expand_query(**kwargs): + if "amount_order" in kwargs: + kwargs["limit"] = 1 + kwargs["order_by"] = "effective_amount" + kwargs["offset"] = int(kwargs["amount_order"]) - 1 + if 'name' in kwargs: + kwargs['name'] = normalize_name(kwargs.pop('name')) + if kwargs.get('is_controlling') is False: + kwargs.pop('is_controlling') + query = {'must': [], 'must_not': []} + collapse = None + if 'fee_currency' in kwargs and kwargs['fee_currency'] is not None: + kwargs['fee_currency'] = kwargs['fee_currency'].upper() + for key, value in kwargs.items(): + key = key.replace('claim.', '') + many = key.endswith('__in') or isinstance(value, list) + if many and len(value) > 2048: + raise TooManyClaimSearchParametersError(key, 2048) + if many: + key = key.replace('__in', '') + value = list(filter(None, value)) + if value is None or isinstance(value, list) and len(value) == 0: + continue + key = REPLACEMENTS.get(key, key) + if key in FIELDS: + partial_id = False + if key == 'claim_type': + if isinstance(value, str): + value = CLAIM_TYPES[value] + else: + value = [CLAIM_TYPES[claim_type] for claim_type in value] + elif key == 'stream_type': + value = [STREAM_TYPES[value]] if isinstance(value, str) else list(map(STREAM_TYPES.get, value)) + if key == '_id': + if isinstance(value, Iterable): + value = [item[::-1].hex() for item in value] + else: + value = value[::-1].hex() + if not many and key in ('_id', 'claim_id', 'sd_hash') and len(value) < 20: + partial_id = True + if key in ('signature_valid', 'has_source'): + continue # handled later + if key in TEXT_FIELDS: + key += '.keyword' + ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'} + if partial_id: + query['must'].append({"prefix": {key: value}}) + elif key in RANGE_FIELDS and isinstance(value, str) and value[0] in ops: + operator_length = 2 if value[:2] in ops else 1 + operator, value = value[:operator_length], value[operator_length:] + if key == 'fee_amount': + value = str(Decimal(value)*1000) + query['must'].append({"range": {key: {ops[operator]: value}}}) + elif key in RANGE_FIELDS and isinstance(value, list) and all(v[0] in ops for v in value): + range_constraints = [] + release_times = [] + for v in value: + operator_length = 2 if v[:2] in ops else 1 + operator, stripped_op_v = v[:operator_length], v[operator_length:] + if key == 'fee_amount': + stripped_op_v = str(Decimal(stripped_op_v)*1000) + if key == 'release_time': + release_times.append((operator, stripped_op_v)) + else: + range_constraints.append((operator, stripped_op_v)) + if key != 'release_time': + query['must'].append({"range": {key: {ops[operator]: v for operator, v in range_constraints}}}) + else: + query['must'].append( + {"bool": + {"should": [ + {"bool": { + "must_not": { + "exists": { + "field": "release_time" + } + } + }}, + {"bool": { + "must": [ + {"exists": {"field": "release_time"}}, + {'range': {key: {ops[operator]: v for operator, v in release_times}}}, + ]}}, + ]} + } + ) + elif many: + query['must'].append({"terms": {key: value}}) + else: + if key == 'fee_amount': + value = str(Decimal(value)*1000) + query['must'].append({"term": {key: {"value": value}}}) + elif key == 'not_channel_ids': + for channel_id in value: + query['must_not'].append({"term": {'channel_id.keyword': channel_id}}) + query['must_not'].append({"term": {'_id': channel_id}}) + elif key == 'channel_ids': + query['must'].append({"terms": {'channel_id.keyword': value}}) + elif key == 'claim_ids': + query['must'].append({"terms": {'claim_id.keyword': value}}) + elif key == 'media_types': + query['must'].append({"terms": {'media_type.keyword': value}}) + elif key == 'any_languages': + query['must'].append({"terms": {'languages': clean_tags(value)}}) + elif key == 'any_languages': + query['must'].append({"terms": {'languages': value}}) + elif key == 'all_languages': + query['must'].extend([{"term": {'languages': tag}} for tag in value]) + elif key == 'any_tags': + query['must'].append({"terms": {'tags.keyword': clean_tags(value)}}) + elif key == 'all_tags': + query['must'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)]) + elif key == 'not_tags': + query['must_not'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)]) + elif key == 'not_claim_id': + query['must_not'].extend([{"term": {'claim_id.keyword': cid}} for cid in value]) + elif key == 'limit_claims_per_channel': + collapse = ('channel_id.keyword', value) + if kwargs.get('has_channel_signature'): + query['must'].append({"exists": {"field": "signature"}}) + if 'signature_valid' in kwargs: + query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}) + elif 'signature_valid' in kwargs: + query['must'].append( + {"bool": + {"should": [ + {"bool": {"must_not": {"exists": {"field": "signature"}}}}, + {"bool" : {"must" : {"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}}} + ]} + } + ) + if 'has_source' in kwargs: + is_stream_or_repost_terms = {"terms": {"claim_type": [CLAIM_TYPES['stream'], CLAIM_TYPES['repost']]}} + query['must'].append( + {"bool": + {"should": [ + {"bool": # when is_stream_or_repost AND has_source + {"must": [ + {"match": {"has_source": kwargs['has_source']}}, + is_stream_or_repost_terms, + ] + }, + }, + {"bool": # when not is_stream_or_repost + {"must_not": is_stream_or_repost_terms} + }, + {"bool": # when reposted_claim_type wouldn't have source + {"must_not": + [ + {"term": {"reposted_claim_type": CLAIM_TYPES['stream']}} + ], + "must": + [ + {"term": {"claim_type": CLAIM_TYPES['repost']}} + ] + } + } + ]} + } + ) + if kwargs.get('text'): + query['must'].append( + {"simple_query_string": + {"query": kwargs["text"], "fields": [ + "claim_name^4", "channel_name^8", "title^1", "description^.5", "author^1", "tags^.5" + ]}}) + query = { + "_source": {"excludes": ["description", "title"]}, + 'query': {'bool': query}, + "sort": [], + } + if "limit" in kwargs: + query["size"] = kwargs["limit"] + if 'offset' in kwargs: + query["from"] = kwargs["offset"] + if 'order_by' in kwargs: + if isinstance(kwargs["order_by"], str): + kwargs["order_by"] = [kwargs["order_by"]] + for value in kwargs['order_by']: + if 'trending_group' in value: + # fixme: trending_mixed is 0 for all records on variable decay, making sort slow. + continue + is_asc = value.startswith('^') + value = value[1:] if is_asc else value + value = REPLACEMENTS.get(value, value) + if value in TEXT_FIELDS: + value += '.keyword' + query['sort'].append({value: "asc" if is_asc else "desc"}) + if collapse: + query["collapse"] = { + "field": collapse[0], + "inner_hits": { + "name": collapse[0], + "size": collapse[1], + "sort": query["sort"] + } + } + return query + + +def expand_result(results): + inner_hits = [] + expanded = [] + for result in results: + if result.get("inner_hits"): + for _, inner_hit in result["inner_hits"].items(): + inner_hits.extend(inner_hit["hits"]["hits"]) + continue + result = result['_source'] + result['claim_hash'] = bytes.fromhex(result['claim_id'])[::-1] + if result['reposted_claim_id']: + result['reposted_claim_hash'] = bytes.fromhex(result['reposted_claim_id'])[::-1] + else: + result['reposted_claim_hash'] = None + result['channel_hash'] = bytes.fromhex(result['channel_id'])[::-1] if result['channel_id'] else None + result['txo_hash'] = bytes.fromhex(result['tx_id'])[::-1] + struct.pack('LH') diff --git a/scribe/db/interface.py b/hub/db/interface.py similarity index 98% rename from scribe/db/interface.py rename to hub/db/interface.py index 5045705..f3b7094 100644 --- a/scribe/db/interface.py +++ b/hub/db/interface.py @@ -2,8 +2,8 @@ import struct import typing import rocksdb from typing import Optional -from scribe.db.common import DB_PREFIXES, COLUMN_SETTINGS -from scribe.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete +from hub.db.common import DB_PREFIXES, COLUMN_SETTINGS +from hub.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete ROW_TYPES = {} diff --git a/scribe/db/merkle.py b/hub/db/merkle.py similarity index 99% rename from scribe/db/merkle.py rename to hub/db/merkle.py index 3321fa1..2541bf6 100644 --- a/scribe/db/merkle.py +++ b/hub/db/merkle.py @@ -29,7 +29,7 @@ import typing from asyncio import Event from math import ceil, log -from scribe.common import double_sha256 +from hub.common import double_sha256 class Merkle: diff --git a/scribe/db/migrators/__init__.py b/hub/db/migrators/__init__.py similarity index 100% rename from scribe/db/migrators/__init__.py rename to hub/db/migrators/__init__.py diff --git a/scribe/db/migrators/migrate7to8.py b/hub/db/migrators/migrate7to8.py similarity index 97% rename from scribe/db/migrators/migrate7to8.py rename to hub/db/migrators/migrate7to8.py index 58e1627..cfeaae5 100644 --- a/scribe/db/migrators/migrate7to8.py +++ b/hub/db/migrators/migrate7to8.py @@ -3,9 +3,9 @@ import time import array import typing from bisect import bisect_right -from scribe.common import sha256 +from hub.common import sha256 if typing.TYPE_CHECKING: - from scribe.db.db import HubDB + from hub.db.db import HubDB FROM_VERSION = 7 TO_VERSION = 8 diff --git a/scribe/db/migrators/migrate8to9.py b/hub/db/migrators/migrate8to9.py similarity index 100% rename from scribe/db/migrators/migrate8to9.py rename to hub/db/migrators/migrate8to9.py diff --git a/scribe/db/prefixes.py b/hub/db/prefixes.py similarity index 99% rename from scribe/db/prefixes.py rename to hub/db/prefixes.py index 4cb214b..3918652 100644 --- a/scribe/db/prefixes.py +++ b/hub/db/prefixes.py @@ -3,9 +3,9 @@ import struct import array import base64 from typing import Union, Tuple, NamedTuple, Optional -from scribe.db.common import DB_PREFIXES -from scribe.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow -from scribe.schema.url import normalize_name +from hub.db.common import DB_PREFIXES +from hub.db.interface import BasePrefixDB, ROW_TYPES, PrefixRow +from hub.schema.url import normalize_name ACTIVATED_CLAIM_TXO_TYPE = 1 ACTIVATED_SUPPORT_TXO_TYPE = 2 diff --git a/scribe/db/revertable.py b/hub/db/revertable.py similarity index 99% rename from scribe/db/revertable.py rename to hub/db/revertable.py index a982a97..37fbc4e 100644 --- a/scribe/db/revertable.py +++ b/hub/db/revertable.py @@ -3,7 +3,7 @@ import logging from string import printable from collections import defaultdict from typing import Tuple, Iterable, Callable, Optional, List -from scribe.db.common import DB_PREFIXES +from hub.db.common import DB_PREFIXES _OP_STRUCT = struct.Struct('>BLL') log = logging.getLogger(__name__) @@ -58,7 +58,7 @@ class RevertableOp: return str(self) def __str__(self) -> str: - from scribe.db.prefixes import auto_decode_item + from hub.db.prefixes import auto_decode_item k, v = auto_decode_item(self.key, self.value) key = ''.join(c if c in printable else '.' for c in str(k)) val = ''.join(c if c in printable else '.' for c in str(v)) diff --git a/scribe/schema/types/__init__.py b/hub/elastic_sync/__init__.py similarity index 100% rename from scribe/schema/types/__init__.py rename to hub/elastic_sync/__init__.py diff --git a/scribe/elasticsearch/__main__.py b/hub/elastic_sync/__main__.py similarity index 81% rename from scribe/elasticsearch/__main__.py rename to hub/elastic_sync/__main__.py index f968a8c..f8dfefc 100644 --- a/scribe/elasticsearch/__main__.py +++ b/hub/elastic_sync/__main__.py @@ -2,9 +2,9 @@ import os import logging import traceback import argparse -from scribe.common import setup_logging -from scribe.elasticsearch.env import ElasticEnv -from scribe.elasticsearch.service import ElasticSyncService +from hub.common import setup_logging +from hub.elastic_sync.env import ElasticEnv +from hub.elastic_sync.service import ElasticSyncService def main(): diff --git a/scribe/elasticsearch/env.py b/hub/elastic_sync/env.py similarity index 99% rename from scribe/elasticsearch/env.py rename to hub/elastic_sync/env.py index 58fcfad..b4a82cc 100644 --- a/scribe/elasticsearch/env.py +++ b/hub/elastic_sync/env.py @@ -1,4 +1,4 @@ -from scribe.env import Env +from hub.env import Env class ElasticEnv(Env): diff --git a/scribe/elasticsearch/fast_ar_trending.py b/hub/elastic_sync/fast_ar_trending.py similarity index 100% rename from scribe/elasticsearch/fast_ar_trending.py rename to hub/elastic_sync/fast_ar_trending.py diff --git a/scribe/elasticsearch/service.py b/hub/elastic_sync/service.py similarity index 97% rename from scribe/elasticsearch/service.py rename to hub/elastic_sync/service.py index 9476d5b..67e9fd4 100644 --- a/scribe/elasticsearch/service.py +++ b/hub/elastic_sync/service.py @@ -5,16 +5,15 @@ import asyncio from collections import defaultdict from elasticsearch import AsyncElasticsearch, NotFoundError from elasticsearch.helpers import async_streaming_bulk -from scribe.schema.result import Censor -from scribe.service import BlockchainReaderService -from scribe.db.revertable import RevertableOp -from scribe.db.common import TrendingNotification, DB_PREFIXES -from scribe.elasticsearch.notifier_protocol import ElasticNotifierProtocol -from scribe.elasticsearch.search import IndexVersionMismatch, expand_query -from scribe.elasticsearch.constants import ALL_FIELDS, INDEX_DEFAULT_SETTINGS -from scribe.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT +from hub.schema.result import Censor +from hub.service import BlockchainReaderService +from hub.common import IndexVersionMismatch, ALL_FIELDS, INDEX_DEFAULT_SETTINGS, expand_query +from hub.db.revertable import RevertableOp +from hub.db.common import TrendingNotification, DB_PREFIXES +from hub.notifier_protocol import ElasticNotifierProtocol +from hub.elastic_sync.fast_ar_trending import FAST_AR_TRENDING_SCRIPT if typing.TYPE_CHECKING: - from scribe.elasticsearch.env import ElasticEnv + from hub.elastic_sync.env import ElasticEnv class ElasticSyncService(BlockchainReaderService): diff --git a/scribe/env.py b/hub/env.py similarity index 99% rename from scribe/env.py rename to hub/env.py index 6ca1146..ce18748 100644 --- a/scribe/env.py +++ b/hub/env.py @@ -3,7 +3,7 @@ import re import resource import logging from collections import namedtuple -from scribe.blockchain.network import LBCMainNet, LBCTestNet, LBCRegTest +from hub.scribe.network import LBCMainNet, LBCTestNet, LBCRegTest NetIdentity = namedtuple('NetIdentity', 'host tcp_port ssl_port nick_suffix') diff --git a/scribe/error/Makefile b/hub/error/Makefile similarity index 100% rename from scribe/error/Makefile rename to hub/error/Makefile diff --git a/scribe/error/README.md b/hub/error/README.md similarity index 100% rename from scribe/error/README.md rename to hub/error/README.md diff --git a/scribe/error/__init__.py b/hub/error/__init__.py similarity index 100% rename from scribe/error/__init__.py rename to hub/error/__init__.py diff --git a/scribe/error/base.py b/hub/error/base.py similarity index 100% rename from scribe/error/base.py rename to hub/error/base.py diff --git a/scribe/error/generate.py b/hub/error/generate.py similarity index 100% rename from scribe/error/generate.py rename to hub/error/generate.py diff --git a/scribe/hub/__init__.py b/hub/herald/__init__.py similarity index 100% rename from scribe/hub/__init__.py rename to hub/herald/__init__.py diff --git a/scribe/hub/__main__.py b/hub/herald/__main__.py similarity index 70% rename from scribe/hub/__main__.py rename to hub/herald/__main__.py index 0862e87..3532469 100644 --- a/scribe/hub/__main__.py +++ b/hub/herald/__main__.py @@ -2,20 +2,20 @@ import os import logging import traceback import argparse -from scribe.common import setup_logging -from scribe.hub.env import ServerEnv -from scribe.hub.service import HubServerService +from hub.common import setup_logging +from hub.herald.env import ServerEnv +from hub.herald.service import HubServerService def main(): parser = argparse.ArgumentParser( - prog='scribe-hub' + prog='herald' ) ServerEnv.contribute_to_arg_parser(parser) args = parser.parse_args() try: env = ServerEnv.from_arg_parser(args) - setup_logging(os.path.join(env.db_dir, 'scribe-hub.log')) + setup_logging(os.path.join(env.db_dir, 'herald.log')) server = HubServerService(env) server.run() except Exception: diff --git a/scribe/hub/common.py b/hub/herald/common.py similarity index 99% rename from scribe/hub/common.py rename to hub/herald/common.py index 7eb2c11..c1b00f9 100644 --- a/scribe/hub/common.py +++ b/hub/herald/common.py @@ -1,7 +1,7 @@ import inspect from collections import namedtuple from functools import lru_cache -from scribe.common import CodeMessageError +from hub.common import CodeMessageError SignatureInfo = namedtuple('SignatureInfo', 'min_args max_args ' diff --git a/scribe/hub/env.py b/hub/herald/env.py similarity index 99% rename from scribe/hub/env.py rename to hub/herald/env.py index 85ada99..da8f997 100644 --- a/scribe/hub/env.py +++ b/hub/herald/env.py @@ -1,5 +1,5 @@ import re -from scribe.env import Env +from hub.env import Env class ServerEnv(Env): diff --git a/scribe/hub/framer.py b/hub/herald/framer.py similarity index 100% rename from scribe/hub/framer.py rename to hub/herald/framer.py diff --git a/scribe/hub/jsonrpc.py b/hub/herald/jsonrpc.py similarity index 99% rename from scribe/hub/jsonrpc.py rename to hub/herald/jsonrpc.py index aae8c4e..c940f40 100644 --- a/scribe/hub/jsonrpc.py +++ b/hub/herald/jsonrpc.py @@ -6,8 +6,8 @@ import asyncio from asyncio import Event from functools import partial from numbers import Number -from scribe.common import RPCError, CodeMessageError -from scribe.hub.common import Notification, Request, Response, Batch, ProtocolError +from hub.common import RPCError, CodeMessageError +from hub.herald.common import Notification, Request, Response, Batch, ProtocolError class JSONRPC: diff --git a/scribe/hub/mempool.py b/hub/herald/mempool.py similarity index 97% rename from scribe/hub/mempool.py rename to hub/herald/mempool.py index a2db139..0ec1e0d 100644 --- a/scribe/hub/mempool.py +++ b/hub/herald/mempool.py @@ -6,14 +6,14 @@ import logging from collections import defaultdict from prometheus_client import Histogram, Gauge import rocksdb.errors -from scribe import PROMETHEUS_NAMESPACE -from scribe.common import HISTOGRAM_BUCKETS -from scribe.db.common import UTXO -from scribe.blockchain.transaction.deserializer import Deserializer +from hub import PROMETHEUS_NAMESPACE +from hub.common import HISTOGRAM_BUCKETS +from hub.db.common import UTXO +from hub.scribe.transaction.deserializer import Deserializer if typing.TYPE_CHECKING: - from scribe.hub.session import SessionManager - from scribe.db import HubDB + from hub.herald.session import SessionManager + from hub.db import HubDB @attr.s(slots=True) diff --git a/scribe/elasticsearch/search.py b/hub/herald/search.py similarity index 54% rename from scribe/elasticsearch/search.py rename to hub/herald/search.py index 7b77801..4ef0acd 100644 --- a/scribe/elasticsearch/search.py +++ b/hub/herald/search.py @@ -1,23 +1,16 @@ import logging import asyncio -import struct from bisect import bisect_right from collections import Counter, deque -from decimal import Decimal from operator import itemgetter -from typing import Optional, List, Iterable, TYPE_CHECKING +from typing import Optional, List, TYPE_CHECKING from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError -from scribe.schema.result import Censor, Outputs -from scribe.schema.tags import clean_tags -from scribe.schema.url import normalize_name -from scribe.error import TooManyClaimSearchParametersError -from scribe.common import LRUCache -from scribe.db.common import CLAIM_TYPES, STREAM_TYPES -from scribe.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, RANGE_FIELDS -from scribe.db.common import ResolveResult +from hub.schema.result import Censor, Outputs +from hub.common import LRUCache, IndexVersionMismatch, INDEX_DEFAULT_SETTINGS, expand_query, expand_result +from hub.db.common import ResolveResult if TYPE_CHECKING: - from scribe.db import HubDB + from hub.db import HubDB class ChannelResolution(str): @@ -32,12 +25,6 @@ class StreamResolution(str): return LookupError(f'Could not find claim at "{url}".') -class IndexVersionMismatch(Exception): - def __init__(self, got_version, expected_version): - self.got_version = got_version - self.expected_version = expected_version - - class SearchIndex: VERSION = 1 @@ -91,7 +78,7 @@ class SearchIndex: self.logger.error("es search index has an incompatible version: %s vs %s", index_version, self.VERSION) raise IndexVersionMismatch(index_version, self.VERSION) await self.sync_client.indices.refresh(self.index) - return acked + return True async def stop(self): clients = [c for c in (self.sync_client, self.search_client) if c is not None] @@ -297,234 +284,6 @@ class SearchIndex: return referenced_txos -def expand_query(**kwargs): - if "amount_order" in kwargs: - kwargs["limit"] = 1 - kwargs["order_by"] = "effective_amount" - kwargs["offset"] = int(kwargs["amount_order"]) - 1 - if 'name' in kwargs: - kwargs['name'] = normalize_name(kwargs.pop('name')) - if kwargs.get('is_controlling') is False: - kwargs.pop('is_controlling') - query = {'must': [], 'must_not': []} - collapse = None - if 'fee_currency' in kwargs and kwargs['fee_currency'] is not None: - kwargs['fee_currency'] = kwargs['fee_currency'].upper() - for key, value in kwargs.items(): - key = key.replace('claim.', '') - many = key.endswith('__in') or isinstance(value, list) - if many and len(value) > 2048: - raise TooManyClaimSearchParametersError(key, 2048) - if many: - key = key.replace('__in', '') - value = list(filter(None, value)) - if value is None or isinstance(value, list) and len(value) == 0: - continue - key = REPLACEMENTS.get(key, key) - if key in FIELDS: - partial_id = False - if key == 'claim_type': - if isinstance(value, str): - value = CLAIM_TYPES[value] - else: - value = [CLAIM_TYPES[claim_type] for claim_type in value] - elif key == 'stream_type': - value = [STREAM_TYPES[value]] if isinstance(value, str) else list(map(STREAM_TYPES.get, value)) - if key == '_id': - if isinstance(value, Iterable): - value = [item[::-1].hex() for item in value] - else: - value = value[::-1].hex() - if not many and key in ('_id', 'claim_id', 'sd_hash') and len(value) < 20: - partial_id = True - if key in ('signature_valid', 'has_source'): - continue # handled later - if key in TEXT_FIELDS: - key += '.keyword' - ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'} - if partial_id: - query['must'].append({"prefix": {key: value}}) - elif key in RANGE_FIELDS and isinstance(value, str) and value[0] in ops: - operator_length = 2 if value[:2] in ops else 1 - operator, value = value[:operator_length], value[operator_length:] - if key == 'fee_amount': - value = str(Decimal(value)*1000) - query['must'].append({"range": {key: {ops[operator]: value}}}) - elif key in RANGE_FIELDS and isinstance(value, list) and all(v[0] in ops for v in value): - range_constraints = [] - release_times = [] - for v in value: - operator_length = 2 if v[:2] in ops else 1 - operator, stripped_op_v = v[:operator_length], v[operator_length:] - if key == 'fee_amount': - stripped_op_v = str(Decimal(stripped_op_v)*1000) - if key == 'release_time': - release_times.append((operator, stripped_op_v)) - else: - range_constraints.append((operator, stripped_op_v)) - if key != 'release_time': - query['must'].append({"range": {key: {ops[operator]: v for operator, v in range_constraints}}}) - else: - query['must'].append( - {"bool": - {"should": [ - {"bool": { - "must_not": { - "exists": { - "field": "release_time" - } - } - }}, - {"bool": { - "must": [ - {"exists": {"field": "release_time"}}, - {'range': {key: {ops[operator]: v for operator, v in release_times}}}, - ]}}, - ]} - } - ) - elif many: - query['must'].append({"terms": {key: value}}) - else: - if key == 'fee_amount': - value = str(Decimal(value)*1000) - query['must'].append({"term": {key: {"value": value}}}) - elif key == 'not_channel_ids': - for channel_id in value: - query['must_not'].append({"term": {'channel_id.keyword': channel_id}}) - query['must_not'].append({"term": {'_id': channel_id}}) - elif key == 'channel_ids': - query['must'].append({"terms": {'channel_id.keyword': value}}) - elif key == 'claim_ids': - query['must'].append({"terms": {'claim_id.keyword': value}}) - elif key == 'media_types': - query['must'].append({"terms": {'media_type.keyword': value}}) - elif key == 'any_languages': - query['must'].append({"terms": {'languages': clean_tags(value)}}) - elif key == 'any_languages': - query['must'].append({"terms": {'languages': value}}) - elif key == 'all_languages': - query['must'].extend([{"term": {'languages': tag}} for tag in value]) - elif key == 'any_tags': - query['must'].append({"terms": {'tags.keyword': clean_tags(value)}}) - elif key == 'all_tags': - query['must'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)]) - elif key == 'not_tags': - query['must_not'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)]) - elif key == 'not_claim_id': - query['must_not'].extend([{"term": {'claim_id.keyword': cid}} for cid in value]) - elif key == 'limit_claims_per_channel': - collapse = ('channel_id.keyword', value) - if kwargs.get('has_channel_signature'): - query['must'].append({"exists": {"field": "signature"}}) - if 'signature_valid' in kwargs: - query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}) - elif 'signature_valid' in kwargs: - query['must'].append( - {"bool": - {"should": [ - {"bool": {"must_not": {"exists": {"field": "signature"}}}}, - {"bool" : {"must" : {"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}}} - ]} - } - ) - if 'has_source' in kwargs: - is_stream_or_repost_terms = {"terms": {"claim_type": [CLAIM_TYPES['stream'], CLAIM_TYPES['repost']]}} - query['must'].append( - {"bool": - {"should": [ - {"bool": # when is_stream_or_repost AND has_source - {"must": [ - {"match": {"has_source": kwargs['has_source']}}, - is_stream_or_repost_terms, - ] - }, - }, - {"bool": # when not is_stream_or_repost - {"must_not": is_stream_or_repost_terms} - }, - {"bool": # when reposted_claim_type wouldn't have source - {"must_not": - [ - {"term": {"reposted_claim_type": CLAIM_TYPES['stream']}} - ], - "must": - [ - {"term": {"claim_type": CLAIM_TYPES['repost']}} - ] - } - } - ]} - } - ) - if kwargs.get('text'): - query['must'].append( - {"simple_query_string": - {"query": kwargs["text"], "fields": [ - "claim_name^4", "channel_name^8", "title^1", "description^.5", "author^1", "tags^.5" - ]}}) - query = { - "_source": {"excludes": ["description", "title"]}, - 'query': {'bool': query}, - "sort": [], - } - if "limit" in kwargs: - query["size"] = kwargs["limit"] - if 'offset' in kwargs: - query["from"] = kwargs["offset"] - if 'order_by' in kwargs: - if isinstance(kwargs["order_by"], str): - kwargs["order_by"] = [kwargs["order_by"]] - for value in kwargs['order_by']: - if 'trending_group' in value: - # fixme: trending_mixed is 0 for all records on variable decay, making sort slow. - continue - is_asc = value.startswith('^') - value = value[1:] if is_asc else value - value = REPLACEMENTS.get(value, value) - if value in TEXT_FIELDS: - value += '.keyword' - query['sort'].append({value: "asc" if is_asc else "desc"}) - if collapse: - query["collapse"] = { - "field": collapse[0], - "inner_hits": { - "name": collapse[0], - "size": collapse[1], - "sort": query["sort"] - } - } - return query - - -def expand_result(results): - inner_hits = [] - expanded = [] - for result in results: - if result.get("inner_hits"): - for _, inner_hit in result["inner_hits"].items(): - inner_hits.extend(inner_hit["hits"]["hits"]) - continue - result = result['_source'] - result['claim_hash'] = bytes.fromhex(result['claim_id'])[::-1] - if result['reposted_claim_id']: - result['reposted_claim_hash'] = bytes.fromhex(result['reposted_claim_id'])[::-1] - else: - result['reposted_claim_hash'] = None - result['channel_hash'] = bytes.fromhex(result['channel_id'])[::-1] if result['channel_id'] else None - result['txo_hash'] = bytes.fromhex(result['tx_id'])[::-1] + struct.pack(' (3, 7): diff --git a/scribe/schema/claim.py b/hub/schema/claim.py similarity index 97% rename from scribe/schema/claim.py rename to hub/schema/claim.py index a4a46d8..f59a55f 100644 --- a/scribe/schema/claim.py +++ b/hub/schema/claim.py @@ -11,15 +11,15 @@ from hachoir.core.log import log as hachoir_log from hachoir.parser import createParser as binary_file_parser from hachoir.metadata import extractMetadata as binary_file_metadata -from scribe.schema import compat -from scribe.schema.base import Signable -from scribe.schema.mime_types import guess_media_type, guess_stream_type -from scribe.schema.attrs import ( +from hub.schema import compat +from hub.schema.base import Signable +from hub.schema.mime_types import guess_media_type, guess_stream_type +from hub.schema.attrs import ( Source, Playable, Dimmensional, Fee, Image, Video, Audio, LanguageList, LocationList, ClaimList, ClaimReference, TagList ) -from scribe.schema.types.v2.claim_pb2 import Claim as ClaimMessage -from scribe.error import InputValueIsNoneError +from hub.schema.types.v2.claim_pb2 import Claim as ClaimMessage +from hub.error import InputValueIsNoneError hachoir_log.use_print = False diff --git a/scribe/schema/compat.py b/hub/schema/compat.py similarity index 94% rename from scribe/schema/compat.py rename to hub/schema/compat.py index 2dc99b0..46dd72f 100644 --- a/scribe/schema/compat.py +++ b/hub/schema/compat.py @@ -3,9 +3,9 @@ from decimal import Decimal from google.protobuf.message import DecodeError -from scribe.schema.types.v1.legacy_claim_pb2 import Claim as OldClaimMessage -from scribe.schema.types.v1.certificate_pb2 import KeyType -from scribe.schema.types.v1.fee_pb2 import Fee as FeeMessage +from hub.schema.types.v1.legacy_claim_pb2 import Claim as OldClaimMessage +from hub.schema.types.v1.certificate_pb2 import KeyType +from hub.schema.types.v1.fee_pb2 import Fee as FeeMessage def from_old_json_schema(claim, payload: bytes): diff --git a/scribe/schema/mime_types.py b/hub/schema/mime_types.py similarity index 100% rename from scribe/schema/mime_types.py rename to hub/schema/mime_types.py diff --git a/scribe/schema/purchase.py b/hub/schema/purchase.py similarity index 94% rename from scribe/schema/purchase.py rename to hub/schema/purchase.py index 22148e6..cbb9329 100644 --- a/scribe/schema/purchase.py +++ b/hub/schema/purchase.py @@ -1,6 +1,6 @@ from google.protobuf.message import DecodeError from google.protobuf.json_format import MessageToDict -from scribe.schema.types.v2.purchase_pb2 import Purchase as PurchaseMessage +from hub.schema.types.v2.purchase_pb2 import Purchase as PurchaseMessage from .attrs import ClaimReference diff --git a/scribe/schema/result.py b/hub/schema/result.py similarity index 97% rename from scribe/schema/result.py rename to hub/schema/result.py index 2429e93..6acfd3e 100644 --- a/scribe/schema/result.py +++ b/hub/schema/result.py @@ -2,11 +2,11 @@ import base64 from typing import List, TYPE_CHECKING, Union, Optional, Dict, Set, Tuple from itertools import chain -from scribe.error import ResolveCensoredError -from scribe.schema.types.v2.result_pb2 import Outputs as OutputsMessage -from scribe.schema.types.v2.result_pb2 import Error as ErrorMessage +from hub.error import ResolveCensoredError +from hub.schema.types.v2.result_pb2 import Outputs as OutputsMessage +from hub.schema.types.v2.result_pb2 import Error as ErrorMessage if TYPE_CHECKING: - from scribe.db.common import ResolveResult + from hub.db.common import ResolveResult INVALID = ErrorMessage.Code.Name(ErrorMessage.INVALID) NOT_FOUND = ErrorMessage.Code.Name(ErrorMessage.NOT_FOUND) BLOCKED = ErrorMessage.Code.Name(ErrorMessage.BLOCKED) diff --git a/scribe/schema/support.py b/hub/schema/support.py similarity index 78% rename from scribe/schema/support.py rename to hub/schema/support.py index 35f60f6..ccd2cc7 100644 --- a/scribe/schema/support.py +++ b/hub/schema/support.py @@ -1,5 +1,5 @@ -from scribe.schema.base import Signable -from scribe.schema.types.v2.support_pb2 import Support as SupportMessage +from hub.schema.base import Signable +from hub.schema.types.v2.support_pb2 import Support as SupportMessage class Support(Signable): diff --git a/scribe/schema/tags.py b/hub/schema/tags.py similarity index 100% rename from scribe/schema/tags.py rename to hub/schema/tags.py diff --git a/scribe/schema/types/v1/__init__.py b/hub/schema/types/__init__.py similarity index 100% rename from scribe/schema/types/v1/__init__.py rename to hub/schema/types/__init__.py diff --git a/scribe/schema/types/v2/__init__.py b/hub/schema/types/v1/__init__.py similarity index 100% rename from scribe/schema/types/v2/__init__.py rename to hub/schema/types/v1/__init__.py diff --git a/scribe/schema/types/v1/certificate_pb2.py b/hub/schema/types/v1/certificate_pb2.py similarity index 100% rename from scribe/schema/types/v1/certificate_pb2.py rename to hub/schema/types/v1/certificate_pb2.py diff --git a/scribe/schema/types/v1/fee_pb2.py b/hub/schema/types/v1/fee_pb2.py similarity index 100% rename from scribe/schema/types/v1/fee_pb2.py rename to hub/schema/types/v1/fee_pb2.py diff --git a/scribe/schema/types/v1/legacy_claim_pb2.py b/hub/schema/types/v1/legacy_claim_pb2.py similarity index 100% rename from scribe/schema/types/v1/legacy_claim_pb2.py rename to hub/schema/types/v1/legacy_claim_pb2.py diff --git a/scribe/schema/types/v1/metadata_pb2.py b/hub/schema/types/v1/metadata_pb2.py similarity index 100% rename from scribe/schema/types/v1/metadata_pb2.py rename to hub/schema/types/v1/metadata_pb2.py diff --git a/scribe/schema/types/v1/signature_pb2.py b/hub/schema/types/v1/signature_pb2.py similarity index 100% rename from scribe/schema/types/v1/signature_pb2.py rename to hub/schema/types/v1/signature_pb2.py diff --git a/scribe/schema/types/v1/source_pb2.py b/hub/schema/types/v1/source_pb2.py similarity index 100% rename from scribe/schema/types/v1/source_pb2.py rename to hub/schema/types/v1/source_pb2.py diff --git a/scribe/schema/types/v1/stream_pb2.py b/hub/schema/types/v1/stream_pb2.py similarity index 100% rename from scribe/schema/types/v1/stream_pb2.py rename to hub/schema/types/v1/stream_pb2.py diff --git a/hub/schema/types/v2/__init__.py b/hub/schema/types/v2/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scribe/schema/types/v2/claim_pb2.py b/hub/schema/types/v2/claim_pb2.py similarity index 100% rename from scribe/schema/types/v2/claim_pb2.py rename to hub/schema/types/v2/claim_pb2.py diff --git a/scribe/schema/types/v2/hub_pb2.py b/hub/schema/types/v2/hub_pb2.py similarity index 100% rename from scribe/schema/types/v2/hub_pb2.py rename to hub/schema/types/v2/hub_pb2.py diff --git a/scribe/schema/types/v2/hub_pb2_grpc.py b/hub/schema/types/v2/hub_pb2_grpc.py similarity index 100% rename from scribe/schema/types/v2/hub_pb2_grpc.py rename to hub/schema/types/v2/hub_pb2_grpc.py diff --git a/scribe/schema/types/v2/purchase_pb2.py b/hub/schema/types/v2/purchase_pb2.py similarity index 100% rename from scribe/schema/types/v2/purchase_pb2.py rename to hub/schema/types/v2/purchase_pb2.py diff --git a/scribe/schema/types/v2/result_pb2.py b/hub/schema/types/v2/result_pb2.py similarity index 100% rename from scribe/schema/types/v2/result_pb2.py rename to hub/schema/types/v2/result_pb2.py diff --git a/scribe/schema/types/v2/result_pb2_grpc.py b/hub/schema/types/v2/result_pb2_grpc.py similarity index 100% rename from scribe/schema/types/v2/result_pb2_grpc.py rename to hub/schema/types/v2/result_pb2_grpc.py diff --git a/scribe/schema/types/v2/support_pb2.py b/hub/schema/types/v2/support_pb2.py similarity index 100% rename from scribe/schema/types/v2/support_pb2.py rename to hub/schema/types/v2/support_pb2.py diff --git a/scribe/schema/url.py b/hub/schema/url.py similarity index 100% rename from scribe/schema/url.py rename to hub/schema/url.py diff --git a/hub/scribe/__init__.py b/hub/scribe/__init__.py new file mode 100644 index 0000000..2ee7962 --- /dev/null +++ b/hub/scribe/__init__.py @@ -0,0 +1 @@ +from hub.scribe.network import LBCTestNet, LBCRegTest, LBCMainNet diff --git a/scribe/blockchain/__main__.py b/hub/scribe/__main__.py similarity index 80% rename from scribe/blockchain/__main__.py rename to hub/scribe/__main__.py index 32f851a..1184b2b 100644 --- a/scribe/blockchain/__main__.py +++ b/hub/scribe/__main__.py @@ -2,9 +2,9 @@ import os import logging import traceback import argparse -from scribe.common import setup_logging -from scribe.blockchain.env import BlockchainEnv -from scribe.blockchain.service import BlockchainProcessorService +from hub.common import setup_logging +from hub.scribe.env import BlockchainEnv +from hub.scribe.service import BlockchainProcessorService def main(): diff --git a/scribe/blockchain/daemon.py b/hub/scribe/daemon.py similarity index 98% rename from scribe/blockchain/daemon.py rename to hub/scribe/daemon.py index 745eaa3..b5af795 100644 --- a/scribe/blockchain/daemon.py +++ b/hub/scribe/daemon.py @@ -7,8 +7,8 @@ from functools import wraps import aiohttp from prometheus_client import Gauge, Histogram -from scribe import PROMETHEUS_NAMESPACE -from scribe.common import LRUCacheWithMetrics, RPCError, DaemonError, WarmingUpError, WorkQueueFullError +from hub import PROMETHEUS_NAMESPACE +from hub.common import LRUCacheWithMetrics, RPCError, DaemonError, WarmingUpError, WorkQueueFullError log = logging.getLogger(__name__) diff --git a/scribe/blockchain/env.py b/hub/scribe/env.py similarity index 99% rename from scribe/blockchain/env.py rename to hub/scribe/env.py index ea5c560..e96669a 100644 --- a/scribe/blockchain/env.py +++ b/hub/scribe/env.py @@ -1,4 +1,4 @@ -from scribe.env import Env +from hub.env import Env class BlockchainEnv(Env): diff --git a/scribe/blockchain/mempool.py b/hub/scribe/mempool.py similarity index 97% rename from scribe/blockchain/mempool.py rename to hub/scribe/mempool.py index 848f929..9bf6221 100644 --- a/scribe/blockchain/mempool.py +++ b/hub/scribe/mempool.py @@ -2,10 +2,10 @@ import itertools import attr import typing from collections import defaultdict -from scribe.blockchain.transaction.deserializer import Deserializer +from hub.scribe.transaction.deserializer import Deserializer if typing.TYPE_CHECKING: - from scribe.db import HubDB + from hub.db import HubDB @attr.s(slots=True) diff --git a/scribe/blockchain/network.py b/hub/scribe/network.py similarity index 96% rename from scribe/blockchain/network.py rename to hub/scribe/network.py index eb63ecf..851b02b 100644 --- a/scribe/blockchain/network.py +++ b/hub/scribe/network.py @@ -4,12 +4,12 @@ import typing from typing import List from hashlib import sha256 from decimal import Decimal -from scribe.schema.base58 import Base58 -from scribe.schema.bip32 import PublicKey -from scribe.common import hash160, hash_to_hex_str, double_sha256 -from scribe.blockchain.transaction import TxOutput, TxInput, Block -from scribe.blockchain.transaction.deserializer import Deserializer -from scribe.blockchain.transaction.script import OpCodes, P2PKH_script, P2SH_script, txo_script_parser +from hub.schema.base58 import Base58 +from hub.schema.bip32 import PublicKey +from hub.common import hash160, hash_to_hex_str, double_sha256 +from hub.scribe.transaction import TxOutput, TxInput, Block +from hub.scribe.transaction.deserializer import Deserializer +from hub.scribe.transaction.script import OpCodes, P2PKH_script, P2SH_script, txo_script_parser HASHX_LEN = 11 diff --git a/scribe/blockchain/prefetcher.py b/hub/scribe/prefetcher.py similarity index 97% rename from scribe/blockchain/prefetcher.py rename to hub/scribe/prefetcher.py index ab4eaff..612d1ab 100644 --- a/scribe/blockchain/prefetcher.py +++ b/hub/scribe/prefetcher.py @@ -2,8 +2,8 @@ import asyncio import logging import typing if typing.TYPE_CHECKING: - from scribe.blockchain.network import LBCMainNet - from scribe.blockchain.daemon import LBCDaemon + from hub.scribe.network import LBCMainNet + from hub.scribe.daemon import LBCDaemon def chunks(items, size): diff --git a/scribe/blockchain/service.py b/hub/scribe/service.py similarity index 98% rename from scribe/blockchain/service.py rename to hub/scribe/service.py index 078f06b..b22451c 100644 --- a/scribe/blockchain/service.py +++ b/hub/scribe/service.py @@ -7,20 +7,20 @@ from typing import Optional, List, Tuple, Set, DefaultDict, Dict from prometheus_client import Gauge, Histogram from collections import defaultdict -from scribe import PROMETHEUS_NAMESPACE -from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE -from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue -from scribe.error.base import ChainError -from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LRUCache -from scribe.blockchain.daemon import LBCDaemon -from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block -from scribe.blockchain.prefetcher import Prefetcher -from scribe.blockchain.mempool import MemPool -from scribe.schema.url import normalize_name -from scribe.service import BlockchainService +from hub import PROMETHEUS_NAMESPACE +from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE +from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue +from hub.error.base import ChainError +from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LRUCache +from hub.scribe.daemon import LBCDaemon +from hub.scribe.transaction import Tx, TxOutput, TxInput, Block +from hub.scribe.prefetcher import Prefetcher +from hub.scribe.mempool import MemPool +from hub.schema.url import normalize_name +from hub.service import BlockchainService if typing.TYPE_CHECKING: - from scribe.blockchain.env import BlockchainEnv - from scribe.db.revertable import RevertableOpStack + from hub.scribe.env import BlockchainEnv + from hub.db.revertable import RevertableOpStack NAMESPACE = f"{PROMETHEUS_NAMESPACE}_writer" @@ -1725,9 +1725,9 @@ class BlockchainProcessorService(BlockchainService): def _iter_start_tasks(self): while self.db.db_version < max(self.db.DB_VERSIONS): if self.db.db_version == 7: - from scribe.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION + from hub.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION elif self.db.db_version == 8: - from scribe.db.migrators.migrate8to9 import migrate, FROM_VERSION, TO_VERSION + from hub.db.migrators.migrate8to9 import migrate, FROM_VERSION, TO_VERSION self.db._index_address_status = self.env.index_address_status else: raise RuntimeError("unknown db version") diff --git a/scribe/blockchain/transaction/__init__.py b/hub/scribe/transaction/__init__.py similarity index 98% rename from scribe/blockchain/transaction/__init__.py rename to hub/scribe/transaction/__init__.py index 88d5ce9..3d894fd 100644 --- a/scribe/blockchain/transaction/__init__.py +++ b/hub/scribe/transaction/__init__.py @@ -3,8 +3,8 @@ import functools import typing from dataclasses import dataclass from struct import Struct -from scribe.schema.claim import Claim -from scribe.common import double_sha256 +from hub.schema.claim import Claim +from hub.common import double_sha256 if (sys.version_info.major, sys.version_info.minor) > (3, 7): cachedproperty = functools.cached_property diff --git a/scribe/blockchain/transaction/deserializer.py b/hub/scribe/transaction/deserializer.py similarity index 97% rename from scribe/blockchain/transaction/deserializer.py rename to hub/scribe/transaction/deserializer.py index c9e027b..7f540cb 100644 --- a/scribe/blockchain/transaction/deserializer.py +++ b/hub/scribe/transaction/deserializer.py @@ -1,9 +1,9 @@ -from scribe.common import double_sha256 -from scribe.blockchain.transaction import ( +from hub.common import double_sha256 +from hub.scribe.transaction import ( unpack_le_int32_from, unpack_le_int64_from, unpack_le_uint16_from, unpack_le_uint32_from, unpack_le_uint64_from, Tx, TxInput, TxOutput ) -from scribe.blockchain.transaction.script import txo_script_parser +from hub.scribe.transaction.script import txo_script_parser class Deserializer: diff --git a/scribe/blockchain/transaction/script.py b/hub/scribe/transaction/script.py similarity index 97% rename from scribe/blockchain/transaction/script.py rename to hub/scribe/transaction/script.py index 4be83be..df4a2c4 100644 --- a/scribe/blockchain/transaction/script.py +++ b/hub/scribe/transaction/script.py @@ -1,6 +1,6 @@ import typing -from scribe.blockchain.transaction import NameClaim, ClaimUpdate, ClaimSupport -from scribe.blockchain.transaction import unpack_le_uint16_from, unpack_le_uint32_from, pack_le_uint16, pack_le_uint32 +from hub.scribe.transaction import NameClaim, ClaimUpdate, ClaimSupport +from hub.scribe.transaction import unpack_le_uint16_from, unpack_le_uint32_from, pack_le_uint16, pack_le_uint32 class _OpCodes(typing.NamedTuple): diff --git a/scribe/service.py b/hub/service.py similarity index 97% rename from scribe/service.py rename to hub/service.py index d2227fd..52ddaf1 100644 --- a/scribe/service.py +++ b/hub/service.py @@ -5,12 +5,12 @@ import signal from concurrent.futures.thread import ThreadPoolExecutor from prometheus_client import Gauge, Histogram -from scribe import __version__, PROMETHEUS_NAMESPACE -from scribe.env import Env -from scribe.db import HubDB -from scribe.db.prefixes import DBState -from scribe.common import HISTOGRAM_BUCKETS -from scribe.metrics import PrometheusServer +from hub import __version__, PROMETHEUS_NAMESPACE +from hub.env import Env +from hub.db import HubDB +from hub.db.prefixes import DBState +from hub.common import HISTOGRAM_BUCKETS +from hub.metrics import PrometheusServer class BlockchainService: diff --git a/scribe/blockchain/__init__.py b/scribe/blockchain/__init__.py deleted file mode 100644 index 8e0dfca..0000000 --- a/scribe/blockchain/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from scribe.blockchain.network import LBCTestNet, LBCRegTest, LBCMainNet diff --git a/scribe/common.py b/scribe/common.py deleted file mode 100644 index be4cb4c..0000000 --- a/scribe/common.py +++ /dev/null @@ -1,400 +0,0 @@ -import hashlib -import hmac -import ipaddress -import logging -import logging.handlers -import typing -import collections -from asyncio import get_event_loop, Event -from prometheus_client import Counter - -log = logging.getLogger(__name__) - - -_sha256 = hashlib.sha256 -_sha512 = hashlib.sha512 -_new_hash = hashlib.new -_new_hmac = hmac.new -HASHX_LEN = 11 -CLAIM_HASH_LEN = 20 - - -HISTOGRAM_BUCKETS = ( - .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') -) - - -def setup_logging(log_path: str): - log = logging.getLogger('scribe') - fmt = logging.Formatter("%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") - handler = logging.handlers.RotatingFileHandler(log_path, maxBytes=1024*1024*5, backupCount=2) - handler.setFormatter(fmt) - log.addHandler(handler) - handler = logging.StreamHandler() - handler.setFormatter(fmt) - log.addHandler(handler) - - log.setLevel(logging.INFO) - logging.getLogger('aiohttp').setLevel(logging.WARNING) - logging.getLogger('elasticsearch').setLevel(logging.WARNING) - - -class StagedClaimtrieItem(typing.NamedTuple): - """ - Represents a claim TXO, used internally by the block processor - """ - name: str - normalized_name: str - claim_hash: bytes - amount: int - expiration_height: int - tx_num: int - position: int - root_tx_num: int - root_position: int - channel_signature_is_valid: bool - signing_hash: typing.Optional[bytes] - reposted_claim_hash: typing.Optional[bytes] - - @property - def is_update(self) -> bool: - return (self.tx_num, self.position) != (self.root_tx_num, self.root_position) - - def invalidate_signature(self) -> 'StagedClaimtrieItem': - return StagedClaimtrieItem( - self.name, self.normalized_name, self.claim_hash, self.amount, self.expiration_height, self.tx_num, - self.position, self.root_tx_num, self.root_position, False, None, self.reposted_claim_hash - ) - - -def formatted_time(t, sep=' '): - """Return a number of seconds as a string in days, hours, mins and - maybe secs.""" - t = int(t) - fmts = (('{:d}d', 86400), ('{:02d}h', 3600), ('{:02d}m', 60)) - parts = [] - for fmt, n in fmts: - val = t // n - if parts or val: - parts.append(fmt.format(val)) - t %= n - if len(parts) < 3: - parts.append(f'{t:02d}s') - return sep.join(parts) - - -def protocol_tuple(s): - """Converts a protocol version number, such as "1.0" to a tuple (1, 0). - - If the version number is bad, (0, ) indicating version 0 is returned.""" - try: - return tuple(int(part) for part in s.split('.')) - except Exception: - return (0, ) - - -def version_string(ptuple): - """Convert a version tuple such as (1, 2) to "1.2". - There is always at least one dot, so (1, ) becomes "1.0".""" - while len(ptuple) < 2: - ptuple += (0, ) - return '.'.join(str(p) for p in ptuple) - - -def protocol_version(client_req, min_tuple, max_tuple): - """Given a client's protocol version string, return a pair of - protocol tuples: - (negotiated version, client min request) - If the request is unsupported, the negotiated protocol tuple is - None. - """ - if client_req is None: - client_min = client_max = min_tuple - else: - if isinstance(client_req, list) and len(client_req) == 2: - client_min, client_max = client_req - else: - client_min = client_max = client_req - client_min = protocol_tuple(client_min) - client_max = protocol_tuple(client_max) - - result = min(client_max, max_tuple) - if result < max(client_min, min_tuple) or result == (0, ): - result = None - - return result, client_min - - -class LRUCacheWithMetrics: - __slots__ = [ - 'capacity', - 'cache', - '_track_metrics', - 'hits', - 'misses' - ] - - def __init__(self, capacity: int, metric_name: typing.Optional[str] = None, namespace: str = "daemon_cache"): - self.capacity = capacity - self.cache = collections.OrderedDict() - if metric_name is None: - self._track_metrics = False - self.hits = self.misses = None - else: - self._track_metrics = True - try: - self.hits = Counter( - f"{metric_name}_cache_hit_count", "Number of cache hits", namespace=namespace - ) - self.misses = Counter( - f"{metric_name}_cache_miss_count", "Number of cache misses", namespace=namespace - ) - except ValueError as err: - log.debug("failed to set up prometheus %s_cache_miss_count metric: %s", metric_name, err) - self._track_metrics = False - self.hits = self.misses = None - - def get(self, key, default=None): - try: - value = self.cache.pop(key) - if self._track_metrics: - self.hits.inc() - except KeyError: - if self._track_metrics: - self.misses.inc() - return default - self.cache[key] = value - return value - - def set(self, key, value): - try: - self.cache.pop(key) - except KeyError: - if len(self.cache) >= self.capacity: - self.cache.popitem(last=False) - self.cache[key] = value - - def clear(self): - self.cache.clear() - - def pop(self, key): - return self.cache.pop(key) - - def __setitem__(self, key, value): - return self.set(key, value) - - def __getitem__(self, item): - return self.get(item) - - def __contains__(self, item) -> bool: - return item in self.cache - - def __len__(self): - return len(self.cache) - - def __delitem__(self, key): - self.cache.pop(key) - - def __del__(self): - self.clear() - - -class LRUCache: - __slots__ = [ - 'capacity', - 'cache' - ] - - def __init__(self, capacity: int): - self.capacity = capacity - self.cache = collections.OrderedDict() - - def get(self, key, default=None): - try: - value = self.cache.pop(key) - except KeyError: - return default - self.cache[key] = value - return value - - def set(self, key, value): - try: - self.cache.pop(key) - except KeyError: - if len(self.cache) >= self.capacity: - self.cache.popitem(last=False) - self.cache[key] = value - - def items(self): - return self.cache.items() - - def clear(self): - self.cache.clear() - - def pop(self, key, default=None): - return self.cache.pop(key, default) - - def __setitem__(self, key, value): - return self.set(key, value) - - def __getitem__(self, item): - return self.get(item) - - def __contains__(self, item) -> bool: - return item in self.cache - - def __len__(self): - return len(self.cache) - - def __delitem__(self, key): - self.cache.pop(key) - - def __del__(self): - self.clear() - - -# the ipaddress module does not show these subnets as reserved -CARRIER_GRADE_NAT_SUBNET = ipaddress.ip_network('100.64.0.0/10') -IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24') - - -def is_valid_public_ipv4(address, allow_localhost: bool = False, allow_lan: bool = False): - try: - parsed_ip = ipaddress.ip_address(address) - if parsed_ip.is_loopback and allow_localhost: - return True - if allow_lan and parsed_ip.is_private: - return True - if any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback, - parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private)): - return False - else: - return not any((CARRIER_GRADE_NAT_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32")), - IPV4_TO_6_RELAY_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32")))) - except (ipaddress.AddressValueError, ValueError): - return False - - -def sha256(x): - """Simple wrapper of hashlib sha256.""" - return _sha256(x).digest() - - -def ripemd160(x): - """Simple wrapper of hashlib ripemd160.""" - h = _new_hash('ripemd160') - h.update(x) - return h.digest() - - -def double_sha256(x): - """SHA-256 of SHA-256, as used extensively in bitcoin.""" - return sha256(sha256(x)) - - -def hmac_sha512(key, msg): - """Use SHA-512 to provide an HMAC.""" - return _new_hmac(key, msg, _sha512).digest() - - -def hash160(x): - """RIPEMD-160 of SHA-256. Used to make bitcoin addresses from pubkeys.""" - return ripemd160(sha256(x)) - - -def hash_to_hex_str(x: bytes) -> str: - """Convert a big-endian binary hash to displayed hex string. - - Display form of a binary hash is reversed and converted to hex. - """ - return x[::-1].hex() - - -def hex_str_to_hash(x: str) -> bytes: - """Convert a displayed hex string to a binary hash.""" - return bytes.fromhex(x)[::-1] - - - -INVALID_REQUEST = -32600 -INVALID_ARGS = -32602 - - -class CodeMessageError(Exception): - - @property - def code(self): - return self.args[0] - - @property - def message(self): - return self.args[1] - - def __eq__(self, other): - return (isinstance(other, self.__class__) and - self.code == other.code and self.message == other.message) - - def __hash__(self): - # overridden to make the exception hashable - # see https://bugs.python.org/issue28603 - return hash((self.code, self.message)) - - @classmethod - def invalid_args(cls, message): - return cls(INVALID_ARGS, message) - - @classmethod - def invalid_request(cls, message): - return cls(INVALID_REQUEST, message) - - @classmethod - def empty_batch(cls): - return cls.invalid_request('batch is empty') - - -class RPCError(CodeMessageError): - pass - - - -class DaemonError(Exception): - """Raised when the daemon returns an error in its results.""" - - -class WarmingUpError(Exception): - """Internal - when the daemon is warming up.""" - - -class WorkQueueFullError(Exception): - """Internal - when the daemon's work queue is full.""" - - -class TaskGroup: - def __init__(self, loop=None): - self._loop = loop or get_event_loop() - self._tasks = set() - self.done = Event() - self.started = Event() - - def __len__(self): - return len(self._tasks) - - def add(self, coro): - task = self._loop.create_task(coro) - self._tasks.add(task) - self.started.set() - self.done.clear() - task.add_done_callback(self._remove) - return task - - def _remove(self, task): - self._tasks.remove(task) - if len(self._tasks) < 1: - self.done.set() - self.started.clear() - - def cancel(self): - for task in self._tasks: - task.cancel() - self.done.set() - self.started.clear() diff --git a/scribe/elasticsearch/__init__.py b/scribe/elasticsearch/__init__.py deleted file mode 100644 index 68591c9..0000000 --- a/scribe/elasticsearch/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from scribe.elasticsearch.search import SearchIndex -from scribe.elasticsearch.notifier_protocol import ElasticNotifierClientProtocol \ No newline at end of file diff --git a/scribe/elasticsearch/constants.py b/scribe/elasticsearch/constants.py deleted file mode 100644 index 10193ef..0000000 --- a/scribe/elasticsearch/constants.py +++ /dev/null @@ -1,105 +0,0 @@ -INDEX_DEFAULT_SETTINGS = { - "settings": - {"analysis": - {"analyzer": { - "default": {"tokenizer": "whitespace", "filter": ["lowercase", "porter_stem"]}}}, - "index": - {"refresh_interval": -1, - "number_of_shards": 1, - "number_of_replicas": 0, - "sort": { - "field": ["trending_score", "release_time"], - "order": ["desc", "desc"] - }} - }, - "mappings": { - "properties": { - "claim_id": { - "fields": { - "keyword": { - "ignore_above": 256, - "type": "keyword" - } - }, - "type": "text", - "index_prefixes": { - "min_chars": 1, - "max_chars": 10 - } - }, - "sd_hash": { - "fields": { - "keyword": { - "ignore_above": 96, - "type": "keyword" - } - }, - "type": "text", - "index_prefixes": { - "min_chars": 1, - "max_chars": 4 - } - }, - "height": {"type": "integer"}, - "claim_type": {"type": "byte"}, - "censor_type": {"type": "byte"}, - "trending_score": {"type": "double"}, - "release_time": {"type": "long"} - } - } -} - -FIELDS = { - '_id', - 'claim_id', 'claim_type', 'claim_name', 'normalized_name', - 'tx_id', 'tx_nout', 'tx_position', - 'short_url', 'canonical_url', - 'is_controlling', 'last_take_over_height', - 'public_key_bytes', 'public_key_id', 'claims_in_channel', - 'channel_id', 'signature', 'signature_digest', 'is_signature_valid', - 'amount', 'effective_amount', 'support_amount', - 'fee_amount', 'fee_currency', - 'height', 'creation_height', 'activation_height', 'expiration_height', - 'stream_type', 'media_type', 'censor_type', - 'title', 'author', 'description', - 'timestamp', 'creation_timestamp', - 'duration', 'release_time', - 'tags', 'languages', 'has_source', 'reposted_claim_type', - 'reposted_claim_id', 'repost_count', 'sd_hash', - 'trending_score', 'tx_num', - 'channel_tx_id', 'channel_tx_position', 'channel_height', 'reposted_tx_id', - 'reposted_tx_position', 'reposted_height', -} - -TEXT_FIELDS = { - 'author', 'canonical_url', 'channel_id', 'description', 'claim_id', 'censoring_channel_id', - 'media_type', 'normalized_name', 'public_key_bytes', 'public_key_id', 'short_url', 'signature', - 'claim_name', 'signature_digest', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id', - 'tags', 'sd_hash', 'channel_tx_id', 'reposted_tx_id', -} - -RANGE_FIELDS = { - 'height', 'creation_height', 'activation_height', 'expiration_height', - 'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount', - 'tx_position', 'repost_count', 'limit_claims_per_channel', - 'amount', 'effective_amount', 'support_amount', - 'trending_score', 'censor_type', 'tx_num', 'reposted_tx_position', 'reposted_height', - 'channel_tx_position', 'channel_height', -} - -ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS - -REPLACEMENTS = { - 'claim_name': 'normalized_name', - 'name': 'normalized_name', - 'txid': 'tx_id', - 'nout': 'tx_nout', - 'trending_group': 'trending_score', - 'trending_mixed': 'trending_score', - 'trending_global': 'trending_score', - 'trending_local': 'trending_score', - 'reposted': 'repost_count', - 'stream_types': 'stream_type', - 'media_types': 'media_type', - 'valid_channel_signature': 'is_signature_valid' -} diff --git a/scripts/deploy_scribe_dev.sh b/scripts/deploy_scribe_dev.sh index e22c196..7f3c454 100755 --- a/scripts/deploy_scribe_dev.sh +++ b/scripts/deploy_scribe_dev.sh @@ -4,10 +4,9 @@ TARGET_HOST=$1 SCRIPTS_DIR=`dirname $0` -SCRIBE_DIR=`dirname $SCRIPTS_DIR` # build the image -docker build -f $SCRIBE_DIR/docker/Dockerfile -t lbry/scribe:development $SCRIBE_DIR +docker build -t lbry/scribe:development . IMAGE=`docker image inspect lbry/scribe:development | sed -n "s/^.*Id\":\s*\"sha256:\s*\(\S*\)\".*$/\1/p"` # push the image to the server diff --git a/docker/scribe_entrypoint.sh b/scripts/entrypoint.sh similarity index 52% rename from docker/scribe_entrypoint.sh rename to scripts/entrypoint.sh index 2f44c48..6ff7cb1 100755 --- a/docker/scribe_entrypoint.sh +++ b/scripts/entrypoint.sh @@ -5,13 +5,13 @@ set -euo pipefail if [ -z "$HUB_COMMAND" ]; then - echo "HUB_COMMAND env variable must be scribe, scribe-hub, or scribe-elastic-sync" + echo "HUB_COMMAND env variable must be scribe, herald, or scribe-elastic-sync" exit 1 fi case "$HUB_COMMAND" in scribe ) exec /home/lbry/.local/bin/scribe "$@" ;; - scribe-hub ) exec /home/lbry/.local/bin/scribe-hub "$@" ;; + herald ) exec /home/lbry/.local/bin/herald "$@" ;; scribe-elastic-sync ) exec /home/lbry/.local/bin/scribe-elastic-sync "$@" ;; - * ) "HUB_COMMAND env variable must be scribe, scribe-hub, or scribe-elastic-sync" && exit 1 ;; + * ) "HUB_COMMAND env variable must be scribe, herald, or scribe-elastic-sync" && exit 1 ;; esac diff --git a/docker/set_build.py b/scripts/set_build.py similarity index 96% rename from docker/set_build.py rename to scripts/set_build.py index 4cefcce..af89f93 100644 --- a/docker/set_build.py +++ b/scripts/set_build.py @@ -2,7 +2,7 @@ import sys import os import re import logging -import scribe.build_info as build_info_mod +import hub.build_info as build_info_mod log = logging.getLogger() log.addHandler(logging.StreamHandler()) diff --git a/setup.py b/setup.py index c546d40..41ea264 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,5 @@ import os -from scribe import __name__, __version__ +from hub import __name__, __version__ from setuptools import setup, find_packages BASE = os.path.dirname(__file__) @@ -23,9 +23,9 @@ setup( zip_safe=False, entry_points={ 'console_scripts': [ - 'scribe=scribe.blockchain.__main__:main', - 'scribe-hub=scribe.hub.__main__:main', - 'scribe-elastic-sync=scribe.elasticsearch.__main__:main', + 'scribe=hub.scribe.__main__:main', + 'herald=hub.herald.__main__:main', + 'scribe-elastic-sync=hub.elastic_sync.__main__:main', ], }, install_requires=[ diff --git a/tests/test_resolve_command.py b/tests/test_resolve_command.py index 6a266df..8d99fb9 100644 --- a/tests/test_resolve_command.py +++ b/tests/test_resolve_command.py @@ -8,7 +8,7 @@ from collections import defaultdict from typing import NamedTuple, List from lbry.testcase import CommandTestCase from lbry.wallet.transaction import Transaction, Output -from scribe.schema.compat import OldClaimMessage +from hub.schema.compat import OldClaimMessage from lbry.crypto.hash import sha256 from lbry.crypto.base58 import Base58 diff --git a/tests/test_revertable.py b/tests/test_revertable.py index 3d566f0..d6b0dce 100644 --- a/tests/test_revertable.py +++ b/tests/test_revertable.py @@ -1,8 +1,8 @@ import unittest import tempfile import shutil -from scribe.db.revertable import RevertableOpStack, RevertableDelete, RevertablePut, OpStackIntegrity -from scribe.db.prefixes import ClaimToTXOPrefixRow, PrefixDB +from hub.db.revertable import RevertableOpStack, RevertableDelete, RevertablePut, OpStackIntegrity +from hub.db.prefixes import ClaimToTXOPrefixRow, PrefixDB class TestRevertableOpStack(unittest.TestCase): diff --git a/tests/testcase.py b/tests/testcase.py index f9d3865..e31dceb 100644 --- a/tests/testcase.py +++ b/tests/testcase.py @@ -20,7 +20,7 @@ from lbry.wallet.util import satoshis_to_coins from lbry.wallet.dewies import lbc_to_dewies from lbry.wallet.orchstr8 import Conductor from lbry.wallet.orchstr8.node import LBCWalletNode, WalletNode, HubNode -from scribe.schema.claim import Claim +from hub.schema.claim import Claim from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty from lbry.extras.daemon.components import Component, WalletComponent