From 6f227674868df6082a77e8792643fe8eac1e3bdb Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 5 May 2022 14:07:39 -0400 Subject: [PATCH] split env classes --- README.md | 198 +------------------------------ cluster_guide.md | 194 ++++++++++++++++++++++++++++++ docker/Dockerfile | 3 - scribe/blockchain/__main__.py | 7 +- scribe/blockchain/env.py | 33 ++++++ scribe/elasticsearch/__main__.py | 7 +- scribe/elasticsearch/env.py | 54 +++++++++ scribe/env.py | 129 ++------------------ scribe/hub/__main__.py | 8 +- scribe/hub/env.py | 113 ++++++++++++++++++ 10 files changed, 416 insertions(+), 330 deletions(-) create mode 100644 cluster_guide.md create mode 100644 scribe/blockchain/env.py create mode 100644 scribe/elasticsearch/env.py create mode 100644 scribe/hub/env.py diff --git a/README.md b/README.md index 9f8c4f1..8c1c4b4 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ docker build -f ./docker/Dockerfile.scribe -t lbry/scribe:development . Scribe has been tested with python 3.7-3.9. Higher versions probably work but have not yet been tested. -1. clone the scribe scribe +1. clone the scribe repo ``` git clone https://github.com/lbryio/scribe.git cd scribe @@ -64,204 +64,10 @@ With options for high performance, if you have 64gb of memory and 12 cores, ever As of block 1147423 (4/21/22) the size of the scribe rocksdb database is 120GB and the size of the elasticsearch volume is 63GB. ### docker-compose -The recommended way to run a scribe hub is with docker. +The recommended way to run a scribe hub is with docker. See [this guide](https://github.com/lbryio/scribe/blob/master/cluster_guide.md) for instructions on how to run the services on a cluster. If you have the resources to run all of the services on one machine (at least 300gb of fast storage, preferably nvme, 64gb of RAM, 12 fast cores), see [this](https://github.com/lbryio/scribe/blob/master/docker/docker-compose.yml) docker-compose example. -#### Cluster environment -For best performance the recommended setup uses three server instances, these can be rented VPSs, self hosted VMs (ideally not on one physical host unless the host is sufficiently powerful), or physical computers. One is a dedicated lbcd node, one an elasticsearch server, and the third runs the scribe services. With this configuration the lbcd and elasticsearch servers can be shared between multiple scribe hub servers - more on that later. -Server Requirements (space requirements are at least double what's needed so it's possible to copy snapshots into place or make snapshots): - - lbcd: 2 cores, 8gb ram (slightly more may be required syncing from scratch, from a snapshot 8 is plenty), 150gb of NVMe storage - - elasticsearch: 8 cores, 9gb of ram (8gb minimum given to ES), 150gb of SSD speed storage - - scribe: 8 cores, 32gb of ram, 200gb of NVMe storage - -All servers are assumed to be running ubuntu 20.04 with user named `lbry` with passwordless sudo and docker group permissions, ssh configured, ulimits set high, and docker + docker-compose installed. The server running elasticsearch should have swap disabled. The three servers need to be able to communicate with each other, they can be on a local network together or communicate over the internet. This guide will assume the three servers are on the internet. - -##### Setting up the lbcd instance -Log in to the lbcd instance and perform the following steps: - - Build the lbcd docker image by running -``` -git clone https://github.com/lbryio/lbcd.git -cd lbcd -docker build . -t lbry/lbcd:latest -``` - - Copy the following to `~/docker-compose.yml` -``` -version: "3" - -volumes: - lbcd: - -services: - lbcd: - image: lbry/lbcd:latest - restart: always - network_mode: host - command: - - "--rpcuser=lbry" - - "--rpcpass=lbry" - - "--rpclisten=127.0.0.1" - volumes: - - "lbcd:/root/.lbcd" - ports: - - "127.0.0.1:9245:9245" - - "9246:9246" # p2p port -``` - - Start lbcd by running `docker-compose up -d` - - Check the progress with `docker-compose logs -f --tail 100` - -##### Setting up the elasticsearch instance -Log in to the elasticsearch instance and perform the following steps: - - Copy the following to `~/docker-compose.yml` -``` -version: "3" - -volumes: - es01: - -services: - es01: - image: docker.elastic.co/elasticsearch/elasticsearch:7.16.0 - container_name: es01 - environment: - - node.name=es01 - - discovery.type=single-node - - indices.query.bool.max_clause_count=8192 - - bootstrap.memory_lock=true - - "ES_JAVA_OPTS=-Dlog4j2.formatMsgNoLookups=true -Xms8g -Xmx8g" # no more than 32, remember to disable swap - ulimits: - memlock: - soft: -1 - hard: -1 - volumes: - - "es01:/usr/share/elasticsearch/data" - ports: - - "127.0.0.1:9200:9200" -``` - - Start elasticsearch by running `docker-compose up -d` - - Check the status with `docker-compose logs -f --tail 100` - -##### Setting up the scribe hub instance - - Log in (ssh) to the scribe instance and generate and print out a ssh key, this is needed to set up port forwards to the other two instances. Copy the output of the following: -``` -ssh-keygen -q -t ed25519 -N '' -f ~/.ssh/id_ed25519 <</dev/null 2>&1 -``` - - After copying the above key, log out of the scribe hub instance. - - - Log in to the elasticsearch instance add the copied key to `~/.ssh/authorized_keys` (see [this](https://stackoverflow.com/questions/6377009/adding-a-public-key-to-ssh-authorized-keys-does-not-log-me-in-automatically) if confused). Log out of the elasticsearch instance once done. - - Log in to the lbcd instance and add the copied key to `~/.ssh/authorized_keys`, log out when done. - - Log in to the scribe instance and copy the following to `/etc/systemd/system/es-tunnel.service`, replacing `lbry` with your user and `your-elastic-ip` with your elasticsearch instance ip. -``` -[Unit] -Description=Persistent SSH Tunnel for ES -After=network.target - -[Service] -Restart=on-failure -RestartSec=5 -ExecStart=/usr/bin/ssh -NTC -o ServerAliveInterval=60 -o ExitOnForwardFailure=yes -L 127.0.0.1:9200:127.0.0.1:9200 lbry@your-elastic-ip -User=lbry -Group=lbry - -[Install] -WantedBy=multi-user.target -``` - - Next, copy the following to `/etc/systemd/system/lbcd-tunnel.service` on the scribe instance, replacing `lbry` with your user and `your-lbcd-ip` with your lbcd instance ip. -``` -[Unit] -Description=Persistent SSH Tunnel for lbcd -After=network.target - -[Service] -Restart=on-failure -RestartSec=5 -ExecStart=/usr/bin/ssh -NTC -o ServerAliveInterval=60 -o ExitOnForwardFailure=yes -L 127.0.0.1:9245:127.0.0.1:9245 lbry@your-lbcd-ip -User=lbry -Group=lbry - -[Install] -WantedBy=multi-user.target -``` - - Verify you can ssh in to the elasticsearch and lbcd instances from the scribe instance - - Enable and start the ssh port forward services on the scribe instance -``` -sudo systemctl enable es-tunnel.service -sudo systemctl enable lbcd-tunnel.service -sudo systemctl start es-tunnel.service -sudo systemctl start lbcd-tunnel.service -``` - - Build the scribe docker image on the scribe hub instance by running the following: -``` -git clone https://github.com/lbryio/scribe.git -cd scribe -docker build -f ./docker/Dockerfile.scribe -t lbry/scribe:development . -``` - - Copy the following to `~/docker-compose.yml` on the scribe instance -``` -version: "3" - -volumes: - lbry_rocksdb: - -services: - scribe: - depends_on: - - scribe_elastic_sync - image: lbry/scribe:${SCRIBE_TAG:-development} - restart: always - network_mode: host - volumes: - - "lbry_rocksdb:/database" - environment: - - HUB_COMMAND=scribe - command: - - "--daemon_url=http://lbry:lbry@127.0.0.1:9245" - - "--max_query_workers=2" - - "--cache_all_tx_hashes" - scribe_elastic_sync: - image: lbry/scribe:${SCRIBE_TAG:-development} - restart: always - network_mode: host - ports: - - "127.0.0.1:19080:19080" # elastic notifier port - volumes: - - "lbry_rocksdb:/database" - environment: - - HUB_COMMAND=scribe-elastic-sync - command: - - "--elastic_host=127.0.0.1" - - "--elastic_port=9200" - - "--max_query_workers=2" - - "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8" - - "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6" - scribe_hub: - depends_on: - - scribe_elastic_sync - - scribe - image: lbry/scribe:${SCRIBE_TAG:-development} - restart: always - network_mode: host - ports: - - "50001:50001" # electrum rpc port and udp ping port - - "2112:2112" # comment out to disable prometheus metrics - volumes: - - "lbry_rocksdb:/database" - environment: - - HUB_COMMAND=scribe-hub - command: - - "--daemon_url=http://lbry:lbry@127.0.0.1:9245" - - "--elastic_host=127.0.0.1" - - "--elastic_port=9200" - - "--max_query_workers=4" - - "--host=0.0.0.0" - - "--max_sessions=100000" - - "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8" - - "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6" - - "--prometheus_port=2112" # comment out to disable prometheus metrics -``` - - Start the scribe hub services by running `docker-compose up -d` - - Check the status with `docker-compose logs -f --tail 100` - ### From source ### Options diff --git a/cluster_guide.md b/cluster_guide.md new file mode 100644 index 0000000..0958a40 --- /dev/null +++ b/cluster_guide.md @@ -0,0 +1,194 @@ +## Cluster environment guide + +For best performance the recommended setup uses three server instances, these can be rented VPSs, self hosted VMs (ideally not on one physical host unless the host is sufficiently powerful), or physical computers. One is a dedicated lbcd node, one an elasticsearch server, and the third runs the scribe services. With this configuration the lbcd and elasticsearch servers can be shared between multiple scribe hub servers - more on that later. +Server Requirements (space requirements are at least double what's needed so it's possible to copy snapshots into place or make snapshots): + - lbcd: 2 cores, 8gb ram (slightly more may be required syncing from scratch, from a snapshot 8 is plenty), 150gb of NVMe storage + - elasticsearch: 8 cores, 9gb of ram (8gb minimum given to ES), 150gb of SSD speed storage + - scribe: 8 cores, 32gb of ram, 200gb of NVMe storage + +All servers are assumed to be running ubuntu 20.04 with user named `lbry` with passwordless sudo and docker group permissions, ssh configured, ulimits set high, and docker + docker-compose installed. The server running elasticsearch should have swap disabled. The three servers need to be able to communicate with each other, they can be on a local network together or communicate over the internet. This guide will assume the three servers are on the internet. + +### Setting up the lbcd instance +Log in to the lbcd instance and perform the following steps: + - Build the lbcd docker image by running +``` +git clone https://github.com/lbryio/lbcd.git +cd lbcd +docker build . -t lbry/lbcd:latest +``` + - Copy the following to `~/docker-compose.yml` +``` +version: "3" + +volumes: + lbcd: + +services: + lbcd: + image: lbry/lbcd:latest + restart: always + network_mode: host + command: + - "--rpcuser=lbry" + - "--rpcpass=lbry" + - "--rpclisten=127.0.0.1" + volumes: + - "lbcd:/root/.lbcd" + ports: + - "127.0.0.1:9245:9245" + - "9246:9246" # p2p port +``` + - Start lbcd by running `docker-compose up -d` + - Check the progress with `docker-compose logs -f --tail 100` + +### Setting up the elasticsearch instance +Log in to the elasticsearch instance and perform the following steps: + - Copy the following to `~/docker-compose.yml` +``` +version: "3" + +volumes: + es01: + +services: + es01: + image: docker.elastic.co/elasticsearch/elasticsearch:7.16.0 + container_name: es01 + environment: + - node.name=es01 + - discovery.type=single-node + - indices.query.bool.max_clause_count=8192 + - bootstrap.memory_lock=true + - "ES_JAVA_OPTS=-Dlog4j2.formatMsgNoLookups=true -Xms8g -Xmx8g" # no more than 32, remember to disable swap + ulimits: + memlock: + soft: -1 + hard: -1 + volumes: + - "es01:/usr/share/elasticsearch/data" + ports: + - "127.0.0.1:9200:9200" +``` + - Start elasticsearch by running `docker-compose up -d` + - Check the status with `docker-compose logs -f --tail 100` + +### Setting up the scribe hub instance + - Log in (ssh) to the scribe instance and generate and print out a ssh key, this is needed to set up port forwards to the other two instances. Copy the output of the following: +``` +ssh-keygen -q -t ed25519 -N '' -f ~/.ssh/id_ed25519 <</dev/null 2>&1 +``` + - After copying the above key, log out of the scribe hub instance. + + - Log in to the elasticsearch instance add the copied key to `~/.ssh/authorized_keys` (see [this](https://stackoverflow.com/questions/6377009/adding-a-public-key-to-ssh-authorized-keys-does-not-log-me-in-automatically) if confused). Log out of the elasticsearch instance once done. + - Log in to the lbcd instance and add the copied key to `~/.ssh/authorized_keys`, log out when done. + - Log in to the scribe instance and copy the following to `/etc/systemd/system/es-tunnel.service`, replacing `lbry` with your user and `your-elastic-ip` with your elasticsearch instance ip. +``` +[Unit] +Description=Persistent SSH Tunnel for ES +After=network.target + +[Service] +Restart=on-failure +RestartSec=5 +ExecStart=/usr/bin/ssh -NTC -o ServerAliveInterval=60 -o ExitOnForwardFailure=yes -L 127.0.0.1:9200:127.0.0.1:9200 lbry@your-elastic-ip +User=lbry +Group=lbry + +[Install] +WantedBy=multi-user.target +``` + - Next, copy the following to `/etc/systemd/system/lbcd-tunnel.service` on the scribe instance, replacing `lbry` with your user and `your-lbcd-ip` with your lbcd instance ip. +``` +[Unit] +Description=Persistent SSH Tunnel for lbcd +After=network.target + +[Service] +Restart=on-failure +RestartSec=5 +ExecStart=/usr/bin/ssh -NTC -o ServerAliveInterval=60 -o ExitOnForwardFailure=yes -L 127.0.0.1:9245:127.0.0.1:9245 lbry@your-lbcd-ip +User=lbry +Group=lbry + +[Install] +WantedBy=multi-user.target +``` + - Verify you can ssh in to the elasticsearch and lbcd instances from the scribe instance + - Enable and start the ssh port forward services on the scribe instance +``` +sudo systemctl enable es-tunnel.service +sudo systemctl enable lbcd-tunnel.service +sudo systemctl start es-tunnel.service +sudo systemctl start lbcd-tunnel.service +``` + - Build the scribe docker image on the scribe hub instance by running the following: +``` +git clone https://github.com/lbryio/scribe.git +cd scribe +docker build -f ./docker/Dockerfile.scribe -t lbry/scribe:development . +``` + - Copy the following to `~/docker-compose.yml` on the scribe instance +``` +version: "3" + +volumes: + lbry_rocksdb: + +services: + scribe: + depends_on: + - scribe_elastic_sync + image: lbry/scribe:${SCRIBE_TAG:-development} + restart: always + network_mode: host + volumes: + - "lbry_rocksdb:/database" + environment: + - HUB_COMMAND=scribe + command: + - "--daemon_url=http://lbry:lbry@127.0.0.1:9245" + - "--max_query_workers=2" + - "--cache_all_tx_hashes" + scribe_elastic_sync: + image: lbry/scribe:${SCRIBE_TAG:-development} + restart: always + network_mode: host + ports: + - "127.0.0.1:19080:19080" # elastic notifier port + volumes: + - "lbry_rocksdb:/database" + environment: + - HUB_COMMAND=scribe-elastic-sync + command: + - "--elastic_host=127.0.0.1" + - "--elastic_port=9200" + - "--max_query_workers=2" + - "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8" + - "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6" + scribe_hub: + depends_on: + - scribe_elastic_sync + - scribe + image: lbry/scribe:${SCRIBE_TAG:-development} + restart: always + network_mode: host + ports: + - "50001:50001" # electrum rpc port and udp ping port + - "2112:2112" # comment out to disable prometheus metrics + volumes: + - "lbry_rocksdb:/database" + environment: + - HUB_COMMAND=scribe-hub + command: + - "--daemon_url=http://lbry:lbry@127.0.0.1:9245" + - "--elastic_host=127.0.0.1" + - "--elastic_port=9200" + - "--max_query_workers=4" + - "--host=0.0.0.0" + - "--max_sessions=100000" + - "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8" + - "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6" + - "--prometheus_port=2112" # comment out to disable prometheus metrics +``` + - Start the scribe hub services by running `docker-compose up -d` + - Check the status with `docker-compose logs -f --tail 100` diff --git a/docker/Dockerfile b/docker/Dockerfile index a83e9ee..e72a8bb 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -41,9 +41,6 @@ RUN rm ~/.cache -rf # entry point VOLUME $db_dir ENV DB_DIRECTORY=$db_dir -ENV MAX_SEND=1000000000000000000 -ENV MAX_RECEIVE=1000000000000000000 - COPY ./docker/scribe_entrypoint.sh /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] diff --git a/scribe/blockchain/__main__.py b/scribe/blockchain/__main__.py index dbb6776..32f851a 100644 --- a/scribe/blockchain/__main__.py +++ b/scribe/blockchain/__main__.py @@ -2,8 +2,8 @@ import os import logging import traceback import argparse -from scribe.env import Env from scribe.common import setup_logging +from scribe.blockchain.env import BlockchainEnv from scribe.blockchain.service import BlockchainProcessorService @@ -11,12 +11,11 @@ def main(): parser = argparse.ArgumentParser( prog='scribe' ) - Env.contribute_common_settings_to_arg_parser(parser) - Env.contribute_writer_settings_to_arg_parser(parser) + BlockchainEnv.contribute_to_arg_parser(parser) args = parser.parse_args() try: - env = Env.from_arg_parser(args) + env = BlockchainEnv.from_arg_parser(args) setup_logging(os.path.join(env.db_dir, 'scribe.log')) block_processor = BlockchainProcessorService(env) block_processor.run() diff --git a/scribe/blockchain/env.py b/scribe/blockchain/env.py new file mode 100644 index 0000000..ee07519 --- /dev/null +++ b/scribe/blockchain/env.py @@ -0,0 +1,33 @@ +from scribe.env import Env + + +class BlockchainEnv(Env): + def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, + prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, + blocking_channel_ids=None, filtering_channel_ids=None, + db_max_open_files=64, daemon_url=None): + super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, + cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids) + self.db_max_open_files = db_max_open_files + self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') + + @classmethod + def contribute_to_arg_parser(cls, parser): + super().contribute_to_arg_parser(parser) + env_daemon_url = cls.default('DAEMON_URL', None) + parser.add_argument('--daemon_url', required=env_daemon_url is None, + help="URL for rpc from lbrycrd or lbcd, " + ":@.", + default=env_daemon_url) + parser.add_argument('--db_max_open_files', type=int, default=64, + help='This setting translates into the max_open_files option given to rocksdb. ' + 'A higher number will use more memory. Defaults to 64.') + + @classmethod + def from_arg_parser(cls, args): + return cls( + db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files, + max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit, + prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes, + cache_all_claim_txos=args.cache_all_claim_txos + ) diff --git a/scribe/elasticsearch/__main__.py b/scribe/elasticsearch/__main__.py index 47d7dc5..f968a8c 100644 --- a/scribe/elasticsearch/__main__.py +++ b/scribe/elasticsearch/__main__.py @@ -2,8 +2,8 @@ import os import logging import traceback import argparse -from scribe.env import Env from scribe.common import setup_logging +from scribe.elasticsearch.env import ElasticEnv from scribe.elasticsearch.service import ElasticSyncService @@ -11,12 +11,11 @@ def main(): parser = argparse.ArgumentParser( prog='scribe-elastic-sync' ) - Env.contribute_common_settings_to_arg_parser(parser) - Env.contribute_elastic_sync_settings_to_arg_parser(parser) + ElasticEnv.contribute_to_arg_parser(parser) args = parser.parse_args() try: - env = Env.from_arg_parser(args) + env = ElasticEnv.from_arg_parser(args) setup_logging(os.path.join(env.db_dir, 'scribe-elastic-sync.log')) server = ElasticSyncService(env) server.run(args.reindex) diff --git a/scribe/elasticsearch/env.py b/scribe/elasticsearch/env.py new file mode 100644 index 0000000..209d593 --- /dev/null +++ b/scribe/elasticsearch/env.py @@ -0,0 +1,54 @@ +from scribe.env import Env + + +class ElasticEnv(Env): + def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None, + cache_all_tx_hashes=None, cache_all_claim_txos=None, elastic_host=None, elastic_port=None, + es_index_prefix=None, elastic_notifier_host=None, elastic_notifier_port=None, + blocking_channel_ids=None, filtering_channel_ids=None, reindex=False): + super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, + cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids) + self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') + self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200) + self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default( + 'ELASTIC_NOTIFIER_HOST', 'localhost') + self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer( + 'ELASTIC_NOTIFIER_PORT', 19080) + self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '') + # Filtering / Blocking + self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default( + 'BLOCKING_CHANNEL_IDS', '').split(' ') + self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default( + 'FILTERING_CHANNEL_IDS', '').split(' ') + self.reindex = reindex if reindex is not None else self.boolean('REINDEX_ES', False) + + @classmethod + def contribute_to_arg_parser(cls, parser): + super().contribute_to_arg_parser(parser) + parser.add_argument('--reindex', default=False, help="Drop and rebuild the elasticsearch index.", + action='store_true') + parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str, + help="Hostname or ip address of the elasticsearch instance to connect to. " + "Can be set in env with 'ELASTIC_HOST'") + parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int, + help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'") + parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'), + type=str, help='elasticsearch sync notifier host, defaults to localhost') + parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int, + help='elasticsearch sync notifier port') + parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str) + + parser.add_argument('--query_timeout_ms', type=int, default=cls.integer('QUERY_TIMEOUT_MS', 10000), + help="Elasticsearch query timeout, in ms. Can be set in env with 'QUERY_TIMEOUT_MS'") + + @classmethod + def from_arg_parser(cls, args): + return cls( + db_dir=args.db_dir, elastic_host=args.elastic_host, + elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain, + es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, + prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes, + cache_all_claim_txos=args.cache_all_claim_txos, blocking_channel_ids=args.blocking_channel_ids, + filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host, + elastic_notifier_port=args.elastic_notifier_port + ) diff --git a/scribe/env.py b/scribe/env.py index fc2ab72..a019540 100644 --- a/scribe/env.py +++ b/scribe/env.py @@ -3,7 +3,6 @@ import re import resource import logging from collections import namedtuple -from ipaddress import ip_address from scribe.blockchain.network import LBCMainNet, LBCTestNet, LBCRegTest @@ -30,25 +29,12 @@ class Env: class Error(Exception): pass - def __init__(self, db_dir=None, daemon_url=None, host=None, elastic_host=None, elastic_port=None, - max_query_workers=None, chain=None, es_index_prefix=None, reorg_limit=None, - tcp_port=None, udp_port=None, prometheus_port=None, banner_file=None, - allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None, - payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, - session_timeout=None, drop_client=None, description=None, daily_fee=None, - database_query_timeout=None, db_max_open_files=64, elastic_notifier_host=None, - elastic_notifier_port=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, - peer_announce=None): + def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, + prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, + blocking_channel_ids=None, filtering_channel_ids=None): + self.logger = logging.getLogger(__name__) self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY') - self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') - self.db_max_open_files = db_max_open_files - self.host = host if host is not None else self.default('HOST', 'localhost') - self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') - self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200) - self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default('ELASTIC_NOTIFIER_HOST', 'localhost') - self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer('ELASTIC_NOTIFIER_PORT', 19080) - self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK']) self.max_query_workers = max_query_workers if max_query_workers is not None else self.integer('MAX_QUERY_WORKERS', 4) if chain == 'mainnet': @@ -57,41 +43,15 @@ class Env: self.coin = LBCTestNet else: self.coin = LBCRegTest - self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '') self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) - # Server stuff - self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None) - self.udp_port = udp_port if udp_port is not None else self.integer('UDP_PORT', self.tcp_port) self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0) - self.banner_file = banner_file if banner_file is not None else self.default('BANNER_FILE', None) - self.allow_lan_udp = allow_lan_udp if allow_lan_udp is not None else self.boolean('ALLOW_LAN_UDP', False) self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False) self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False) - self.country = country if country is not None else self.default('COUNTRY', 'US') - # Peer discovery - self.peer_discovery = self.peer_discovery_enum() - self.peer_announce = peer_announce if peer_announce is not None else self.boolean('PEER_ANNOUNCE', True) - if peer_hubs is not None: - self.peer_hubs = [p.strip("") for p in peer_hubs.split(",")] - else: - self.peer_hubs = self.extract_peer_hubs() - # The electrum client takes the empty string as unspecified - self.payment_address = payment_address if payment_address is not None else self.default('PAYMENT_ADDRESS', '') - self.donation_address = donation_address if donation_address is not None else self.default('DONATION_ADDRESS', '') - # Server limits to help prevent DoS - self.max_send = max_send if max_send is not None else self.integer('MAX_SEND', 1000000) - self.max_receive = max_receive if max_receive is not None else self.integer('MAX_RECEIVE', 1000000) - self.max_sessions = max_sessions if max_sessions is not None else self.sane_max_sessions() - self.session_timeout = session_timeout if session_timeout is not None else self.integer('SESSION_TIMEOUT', 600) - self.drop_client = drop_client if drop_client is not None else self.custom("DROP_CLIENT", None, re.compile) - self.description = description if description is not None else self.default('DESCRIPTION', '') - self.daily_fee = daily_fee if daily_fee is not None else self.string_amount('DAILY_FEE', '0') - self.database_query_timeout = database_query_timeout if database_query_timeout is not None else \ - (float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0) - # Filtering / Blocking - self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default('BLOCKING_CHANNEL_IDS', '').split(' ') - self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default('FILTERING_CHANNEL_IDS', '').split(' ') + self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default( + 'BLOCKING_CHANNEL_IDS', '').split(' ') + self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default( + 'FILTERING_CHANNEL_IDS', '').split(' ') @classmethod def default(cls, envvar, default): @@ -190,10 +150,11 @@ class Env: return [hub.strip() for hub in peer_hubs.split(',')] @classmethod - def contribute_common_settings_to_arg_parser(cls, parser): + def contribute_to_arg_parser(cls, parser): """ Settings used by all services """ + env_db_dir = cls.default('DB_DIRECTORY', None) parser.add_argument('--db_dir', type=str, required=env_db_dir is None, help="Path of the directory containing lbry-rocksdb. ", default=env_db_dir) @@ -216,60 +177,6 @@ class Env: parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0), help="Port for prometheus metrics to listen on, disabled by default. " "Can be set in env with 'PROMETHEUS_PORT'.") - - @classmethod - def contribute_writer_settings_to_arg_parser(cls, parser): - env_daemon_url = cls.default('DAEMON_URL', None) - parser.add_argument('--daemon_url', required=env_daemon_url is None, - help="URL for rpc from lbrycrd or lbcd, " - ":@.", - default=env_daemon_url) - parser.add_argument('--db_max_open_files', type=int, default=64, - help='This setting translates into the max_open_files option given to rocksdb. ' - 'A higher number will use more memory. Defaults to 64.') - - @classmethod - def contribute_server_settings_to_arg_parser(cls, parser): - env_daemon_url = cls.default('DAEMON_URL', None) - parser.add_argument('--daemon_url', required=env_daemon_url is None, - help="URL for rpc from lbrycrd or lbcd, " - ":@.", - default=env_daemon_url) - parser.add_argument('--reindex', default=False, help="Drop and rebuild the elasticsearch index.", - action='store_true') - parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'), - help="Interface for hub server to listen on, use 0.0.0.0 to listen on the external " - "interface. Can be set in env with 'HOST'") - parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001), - help="Electrum TCP port to listen on for hub server. Can be set in env with 'TCP_PORT'") - parser.add_argument('--udp_port', type=int, default=cls.integer('UDP_PORT', 50001), - help="'UDP port to listen on for hub server. Can be set in env with 'UDP_PORT'") - parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str, - help="Hostname or ip address of the elasticsearch instance to connect to. " - "Can be set in env with 'ELASTIC_HOST'") - parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int, - help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'") - parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'), - type=str, help='elasticsearch sync notifier host, defaults to localhost') - parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int, - help='elasticsearch sync notifier port') - parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str) - - @classmethod - def contribute_elastic_sync_settings_to_arg_parser(cls, parser): - parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str, - help="Hostname or ip address of the elasticsearch instance to connect to. " - "Can be set in env with 'ELASTIC_HOST'") - parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int, - help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'") - parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'), - type=str, help='elasticsearch sync notifier host, defaults to localhost') - parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int, - help='elasticsearch sync notifier port') - parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str) - - parser.add_argument('--query_timeout_ms', type=int, default=cls.integer('QUERY_TIMEOUT_MS', 10000), - help="Elasticsearch query timeout, in ms. Can be set in env with 'QUERY_TIMEOUT_MS'") parser.add_argument('--blocking_channel_ids', nargs='*', help="Space separated list of channel claim ids used for blocking. " "Claims that are reposted by these channels can't be resolved " @@ -283,18 +190,4 @@ class Env: @classmethod def from_arg_parser(cls, args): - return cls( - db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files, - host=args.host, elastic_host=args.elastic_host, elastic_port=args.elastic_port, - max_query_workers=args.max_query_workers, chain=args.chain, es_index_prefix=args.es_index_prefix, - reorg_limit=args.reorg_limit, tcp_port=args.tcp_port, - udp_port=args.udp_port, prometheus_port=args.prometheus_port, - banner_file=args.banner_file, allow_lan_udp=args.allow_lan_udp, - cache_all_tx_hashes=args.cache_all_tx_hashes, cache_all_claim_txos=args.cache_all_claim_txos, - country=args.country, payment_address=args.payment_address, donation_address=args.donation_address, - max_send=args.max_send, max_receive=args.max_receive, max_sessions=args.max_sessions, - session_timeout=args.session_timeout, drop_client=args.drop_client, description=args.description, - daily_fee=args.daily_fee, database_query_timeout=(args.query_timeout_ms / 1000), - blocking_channel_ids=args.blocking_channel_ids, filtering_channel_ids=args.filtering_channel_ids, - elastic_notifier_host=args.elastic_notifier_host, elastic_notifier_port=args.elastic_notifier_port - ) + raise NotImplementedError() diff --git a/scribe/hub/__main__.py b/scribe/hub/__main__.py index 16139ec..0862e87 100644 --- a/scribe/hub/__main__.py +++ b/scribe/hub/__main__.py @@ -2,8 +2,8 @@ import os import logging import traceback import argparse -from scribe.env import Env from scribe.common import setup_logging +from scribe.hub.env import ServerEnv from scribe.hub.service import HubServerService @@ -11,12 +11,10 @@ def main(): parser = argparse.ArgumentParser( prog='scribe-hub' ) - Env.contribute_common_settings_to_arg_parser(parser) - Env.contribute_server_settings_to_arg_parser(parser) + ServerEnv.contribute_to_arg_parser(parser) args = parser.parse_args() - try: - env = Env.from_arg_parser(args) + env = ServerEnv.from_arg_parser(args) setup_logging(os.path.join(env.db_dir, 'scribe-hub.log')) server = HubServerService(env) server.run() diff --git a/scribe/hub/env.py b/scribe/hub/env.py new file mode 100644 index 0000000..bbfa99e --- /dev/null +++ b/scribe/hub/env.py @@ -0,0 +1,113 @@ +import re +from scribe.env import Env + + +class ServerEnv(Env): + def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, + prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, + daemon_url=None, host=None, elastic_host=None, elastic_port=None, es_index_prefix=None, + tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None, + payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, + session_timeout=None, drop_client=None, description=None, daily_fee=None, + database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None, + blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None): + super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, + cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids) + self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') + self.host = host if host is not None else self.default('HOST', 'localhost') + self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') + self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200) + self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default( + 'ELASTIC_NOTIFIER_HOST', 'localhost') + self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer( + 'ELASTIC_NOTIFIER_PORT', 19080) + self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '') + # Server stuff + self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None) + self.udp_port = udp_port if udp_port is not None else self.integer('UDP_PORT', self.tcp_port) + self.banner_file = banner_file if banner_file is not None else self.default('BANNER_FILE', None) + self.allow_lan_udp = allow_lan_udp if allow_lan_udp is not None else self.boolean('ALLOW_LAN_UDP', False) + self.country = country if country is not None else self.default('COUNTRY', 'US') + # Peer discovery + self.peer_discovery = self.peer_discovery_enum() + self.peer_announce = peer_announce if peer_announce is not None else self.boolean('PEER_ANNOUNCE', True) + if peer_hubs is not None: + self.peer_hubs = [p.strip("") for p in peer_hubs.split(",")] + else: + self.peer_hubs = self.extract_peer_hubs() + # The electrum client takes the empty string as unspecified + self.payment_address = payment_address if payment_address is not None else self.default('PAYMENT_ADDRESS', '') + self.donation_address = donation_address if donation_address is not None else self.default('DONATION_ADDRESS', + '') + # Server limits to help prevent DoS + self.max_send = max_send if max_send is not None else self.integer('MAX_SEND', 1000000000000000000) + self.max_receive = max_receive if max_receive is not None else self.integer('MAX_RECEIVE', 1000000000000000000) + self.max_sessions = max_sessions if max_sessions is not None else self.sane_max_sessions() + self.session_timeout = session_timeout if session_timeout is not None else self.integer('SESSION_TIMEOUT', 600) + self.drop_client = re.compile(drop_client) if drop_client is not None else self.custom("DROP_CLIENT", None, re.compile) + self.description = description if description is not None else self.default('DESCRIPTION', '') + self.daily_fee = daily_fee if daily_fee is not None else self.string_amount('DAILY_FEE', '0') + self.database_query_timeout = (database_query_timeout / 1000.0) if database_query_timeout is not None else \ + (float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0) + + @classmethod + def contribute_to_arg_parser(cls, parser): + super().contribute_to_arg_parser(parser) + env_daemon_url = cls.default('DAEMON_URL', None) + parser.add_argument('--daemon_url', required=env_daemon_url is None, + help="URL for rpc from lbrycrd or lbcd, " + ":@.", + default=env_daemon_url) + parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'), + help="Interface for hub server to listen on, use 0.0.0.0 to listen on the external " + "interface. Can be set in env with 'HOST'") + parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001), + help="Electrum TCP port to listen on for hub server. Can be set in env with 'TCP_PORT'") + parser.add_argument('--udp_port', type=int, default=cls.integer('UDP_PORT', 50001), + help="'UDP port to listen on for hub server. Can be set in env with 'UDP_PORT'") + parser.add_argument('--max_sessions', type=int, default=cls.integer('MAX_SESSIONS', 100000), + help="Maximum number of electrum clients that can be connected, defaults to 100000.") + parser.add_argument('--max_send', type=int, default=cls.integer('MAX_SESSIONS', 1000000000000000000), + help="Maximum size of a request") + parser.add_argument('--max_receive', type=int, default=cls.integer('MAX_SESSIONS', 1000000000000000000), + help="Maximum size of a response") + parser.add_argument('--drop_client', type=str, default=cls.default('DROP_CLIENT', None), + help="Regex used for blocking clients") + parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600), + help="Session inactivity timeout") + parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str, + help="Hostname or ip address of the elasticsearch instance to connect to. " + "Can be set in env with 'ELASTIC_HOST'") + parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int, + help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'") + parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'), + type=str, help='elasticsearch sync notifier host, defaults to localhost') + parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int, + help='elasticsearch sync notifier port') + parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str) + parser.add_argument('--allow_lan_udp', action='store_true', + help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False)) + parser.add_argument('--description', default=cls.default('DESCRIPTION', None), type=str) + parser.add_argument('--banner_file', default=cls.default('BANNER_FILE', None), type=str) + parser.add_argument('--country', default=cls.default('COUNTRY', 'US'), type=str) + parser.add_argument('--payment_address', default=cls.default('PAYMENT_ADDRESS', None), type=str) + parser.add_argument('--donation_address', default=cls.default('DONATION_ADDRESS', None), type=str) + parser.add_argument('--daily_fee', default=cls.default('DAILY_FEE', '0'), type=str) + parser.add_argument('--query_timeout_ms', default=cls.default('QUERY_TIMEOUT_MS', 10000), type=int) + + @classmethod + def from_arg_parser(cls, args): + return cls( + db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_host=args.elastic_host, + elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain, + es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port, + udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file, + allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes, + cache_all_claim_txos=args.cache_all_claim_txos, country=args.country, payment_address=args.payment_address, + donation_address=args.donation_address, max_send=args.max_send, max_receive=args.max_receive, + max_sessions=args.max_sessions, session_timeout=args.session_timeout, + drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee, + database_query_timeout=args.query_timeout_ms / 1000, blocking_channel_ids=args.blocking_channel_ids, + filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host, + elastic_notifier_port=args.elastic_notifier_port + )