split env classes

This commit is contained in:
Jack Robison 2022-05-05 14:07:39 -04:00
parent b230a13761
commit 6f22767486
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
10 changed files with 416 additions and 330 deletions

198
README.md
View file

@ -34,7 +34,7 @@ docker build -f ./docker/Dockerfile.scribe -t lbry/scribe:development .
Scribe has been tested with python 3.7-3.9. Higher versions probably work but have not yet been tested. Scribe has been tested with python 3.7-3.9. Higher versions probably work but have not yet been tested.
1. clone the scribe scribe 1. clone the scribe repo
``` ```
git clone https://github.com/lbryio/scribe.git git clone https://github.com/lbryio/scribe.git
cd scribe cd scribe
@ -64,204 +64,10 @@ With options for high performance, if you have 64gb of memory and 12 cores, ever
As of block 1147423 (4/21/22) the size of the scribe rocksdb database is 120GB and the size of the elasticsearch volume is 63GB. As of block 1147423 (4/21/22) the size of the scribe rocksdb database is 120GB and the size of the elasticsearch volume is 63GB.
### docker-compose ### docker-compose
The recommended way to run a scribe hub is with docker. The recommended way to run a scribe hub is with docker. See [this guide](https://github.com/lbryio/scribe/blob/master/cluster_guide.md) for instructions on how to run the services on a cluster.
If you have the resources to run all of the services on one machine (at least 300gb of fast storage, preferably nvme, 64gb of RAM, 12 fast cores), see [this](https://github.com/lbryio/scribe/blob/master/docker/docker-compose.yml) docker-compose example. If you have the resources to run all of the services on one machine (at least 300gb of fast storage, preferably nvme, 64gb of RAM, 12 fast cores), see [this](https://github.com/lbryio/scribe/blob/master/docker/docker-compose.yml) docker-compose example.
#### Cluster environment
For best performance the recommended setup uses three server instances, these can be rented VPSs, self hosted VMs (ideally not on one physical host unless the host is sufficiently powerful), or physical computers. One is a dedicated lbcd node, one an elasticsearch server, and the third runs the scribe services. With this configuration the lbcd and elasticsearch servers can be shared between multiple scribe hub servers - more on that later.
Server Requirements (space requirements are at least double what's needed so it's possible to copy snapshots into place or make snapshots):
- lbcd: 2 cores, 8gb ram (slightly more may be required syncing from scratch, from a snapshot 8 is plenty), 150gb of NVMe storage
- elasticsearch: 8 cores, 9gb of ram (8gb minimum given to ES), 150gb of SSD speed storage
- scribe: 8 cores, 32gb of ram, 200gb of NVMe storage
All servers are assumed to be running ubuntu 20.04 with user named `lbry` with passwordless sudo and docker group permissions, ssh configured, ulimits set high, and docker + docker-compose installed. The server running elasticsearch should have swap disabled. The three servers need to be able to communicate with each other, they can be on a local network together or communicate over the internet. This guide will assume the three servers are on the internet.
##### Setting up the lbcd instance
Log in to the lbcd instance and perform the following steps:
- Build the lbcd docker image by running
```
git clone https://github.com/lbryio/lbcd.git
cd lbcd
docker build . -t lbry/lbcd:latest
```
- Copy the following to `~/docker-compose.yml`
```
version: "3"
volumes:
lbcd:
services:
lbcd:
image: lbry/lbcd:latest
restart: always
network_mode: host
command:
- "--rpcuser=lbry"
- "--rpcpass=lbry"
- "--rpclisten=127.0.0.1"
volumes:
- "lbcd:/root/.lbcd"
ports:
- "127.0.0.1:9245:9245"
- "9246:9246" # p2p port
```
- Start lbcd by running `docker-compose up -d`
- Check the progress with `docker-compose logs -f --tail 100`
##### Setting up the elasticsearch instance
Log in to the elasticsearch instance and perform the following steps:
- Copy the following to `~/docker-compose.yml`
```
version: "3"
volumes:
es01:
services:
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.0
container_name: es01
environment:
- node.name=es01
- discovery.type=single-node
- indices.query.bool.max_clause_count=8192
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Dlog4j2.formatMsgNoLookups=true -Xms8g -Xmx8g" # no more than 32, remember to disable swap
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- "es01:/usr/share/elasticsearch/data"
ports:
- "127.0.0.1:9200:9200"
```
- Start elasticsearch by running `docker-compose up -d`
- Check the status with `docker-compose logs -f --tail 100`
##### Setting up the scribe hub instance
- Log in (ssh) to the scribe instance and generate and print out a ssh key, this is needed to set up port forwards to the other two instances. Copy the output of the following:
```
ssh-keygen -q -t ed25519 -N '' -f ~/.ssh/id_ed25519 <<<y >/dev/null 2>&1
```
- After copying the above key, log out of the scribe hub instance.
- Log in to the elasticsearch instance add the copied key to `~/.ssh/authorized_keys` (see [this](https://stackoverflow.com/questions/6377009/adding-a-public-key-to-ssh-authorized-keys-does-not-log-me-in-automatically) if confused). Log out of the elasticsearch instance once done.
- Log in to the lbcd instance and add the copied key to `~/.ssh/authorized_keys`, log out when done.
- Log in to the scribe instance and copy the following to `/etc/systemd/system/es-tunnel.service`, replacing `lbry` with your user and `your-elastic-ip` with your elasticsearch instance ip.
```
[Unit]
Description=Persistent SSH Tunnel for ES
After=network.target
[Service]
Restart=on-failure
RestartSec=5
ExecStart=/usr/bin/ssh -NTC -o ServerAliveInterval=60 -o ExitOnForwardFailure=yes -L 127.0.0.1:9200:127.0.0.1:9200 lbry@your-elastic-ip
User=lbry
Group=lbry
[Install]
WantedBy=multi-user.target
```
- Next, copy the following to `/etc/systemd/system/lbcd-tunnel.service` on the scribe instance, replacing `lbry` with your user and `your-lbcd-ip` with your lbcd instance ip.
```
[Unit]
Description=Persistent SSH Tunnel for lbcd
After=network.target
[Service]
Restart=on-failure
RestartSec=5
ExecStart=/usr/bin/ssh -NTC -o ServerAliveInterval=60 -o ExitOnForwardFailure=yes -L 127.0.0.1:9245:127.0.0.1:9245 lbry@your-lbcd-ip
User=lbry
Group=lbry
[Install]
WantedBy=multi-user.target
```
- Verify you can ssh in to the elasticsearch and lbcd instances from the scribe instance
- Enable and start the ssh port forward services on the scribe instance
```
sudo systemctl enable es-tunnel.service
sudo systemctl enable lbcd-tunnel.service
sudo systemctl start es-tunnel.service
sudo systemctl start lbcd-tunnel.service
```
- Build the scribe docker image on the scribe hub instance by running the following:
```
git clone https://github.com/lbryio/scribe.git
cd scribe
docker build -f ./docker/Dockerfile.scribe -t lbry/scribe:development .
```
- Copy the following to `~/docker-compose.yml` on the scribe instance
```
version: "3"
volumes:
lbry_rocksdb:
services:
scribe:
depends_on:
- scribe_elastic_sync
image: lbry/scribe:${SCRIBE_TAG:-development}
restart: always
network_mode: host
volumes:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe
command:
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--max_query_workers=2"
- "--cache_all_tx_hashes"
scribe_elastic_sync:
image: lbry/scribe:${SCRIBE_TAG:-development}
restart: always
network_mode: host
ports:
- "127.0.0.1:19080:19080" # elastic notifier port
volumes:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe-elastic-sync
command:
- "--elastic_host=127.0.0.1"
- "--elastic_port=9200"
- "--max_query_workers=2"
- "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8"
- "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6"
scribe_hub:
depends_on:
- scribe_elastic_sync
- scribe
image: lbry/scribe:${SCRIBE_TAG:-development}
restart: always
network_mode: host
ports:
- "50001:50001" # electrum rpc port and udp ping port
- "2112:2112" # comment out to disable prometheus metrics
volumes:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe-hub
command:
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--elastic_host=127.0.0.1"
- "--elastic_port=9200"
- "--max_query_workers=4"
- "--host=0.0.0.0"
- "--max_sessions=100000"
- "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8"
- "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6"
- "--prometheus_port=2112" # comment out to disable prometheus metrics
```
- Start the scribe hub services by running `docker-compose up -d`
- Check the status with `docker-compose logs -f --tail 100`
### From source ### From source
### Options ### Options

194
cluster_guide.md Normal file
View file

@ -0,0 +1,194 @@
## Cluster environment guide
For best performance the recommended setup uses three server instances, these can be rented VPSs, self hosted VMs (ideally not on one physical host unless the host is sufficiently powerful), or physical computers. One is a dedicated lbcd node, one an elasticsearch server, and the third runs the scribe services. With this configuration the lbcd and elasticsearch servers can be shared between multiple scribe hub servers - more on that later.
Server Requirements (space requirements are at least double what's needed so it's possible to copy snapshots into place or make snapshots):
- lbcd: 2 cores, 8gb ram (slightly more may be required syncing from scratch, from a snapshot 8 is plenty), 150gb of NVMe storage
- elasticsearch: 8 cores, 9gb of ram (8gb minimum given to ES), 150gb of SSD speed storage
- scribe: 8 cores, 32gb of ram, 200gb of NVMe storage
All servers are assumed to be running ubuntu 20.04 with user named `lbry` with passwordless sudo and docker group permissions, ssh configured, ulimits set high, and docker + docker-compose installed. The server running elasticsearch should have swap disabled. The three servers need to be able to communicate with each other, they can be on a local network together or communicate over the internet. This guide will assume the three servers are on the internet.
### Setting up the lbcd instance
Log in to the lbcd instance and perform the following steps:
- Build the lbcd docker image by running
```
git clone https://github.com/lbryio/lbcd.git
cd lbcd
docker build . -t lbry/lbcd:latest
```
- Copy the following to `~/docker-compose.yml`
```
version: "3"
volumes:
lbcd:
services:
lbcd:
image: lbry/lbcd:latest
restart: always
network_mode: host
command:
- "--rpcuser=lbry"
- "--rpcpass=lbry"
- "--rpclisten=127.0.0.1"
volumes:
- "lbcd:/root/.lbcd"
ports:
- "127.0.0.1:9245:9245"
- "9246:9246" # p2p port
```
- Start lbcd by running `docker-compose up -d`
- Check the progress with `docker-compose logs -f --tail 100`
### Setting up the elasticsearch instance
Log in to the elasticsearch instance and perform the following steps:
- Copy the following to `~/docker-compose.yml`
```
version: "3"
volumes:
es01:
services:
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.0
container_name: es01
environment:
- node.name=es01
- discovery.type=single-node
- indices.query.bool.max_clause_count=8192
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Dlog4j2.formatMsgNoLookups=true -Xms8g -Xmx8g" # no more than 32, remember to disable swap
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- "es01:/usr/share/elasticsearch/data"
ports:
- "127.0.0.1:9200:9200"
```
- Start elasticsearch by running `docker-compose up -d`
- Check the status with `docker-compose logs -f --tail 100`
### Setting up the scribe hub instance
- Log in (ssh) to the scribe instance and generate and print out a ssh key, this is needed to set up port forwards to the other two instances. Copy the output of the following:
```
ssh-keygen -q -t ed25519 -N '' -f ~/.ssh/id_ed25519 <<<y >/dev/null 2>&1
```
- After copying the above key, log out of the scribe hub instance.
- Log in to the elasticsearch instance add the copied key to `~/.ssh/authorized_keys` (see [this](https://stackoverflow.com/questions/6377009/adding-a-public-key-to-ssh-authorized-keys-does-not-log-me-in-automatically) if confused). Log out of the elasticsearch instance once done.
- Log in to the lbcd instance and add the copied key to `~/.ssh/authorized_keys`, log out when done.
- Log in to the scribe instance and copy the following to `/etc/systemd/system/es-tunnel.service`, replacing `lbry` with your user and `your-elastic-ip` with your elasticsearch instance ip.
```
[Unit]
Description=Persistent SSH Tunnel for ES
After=network.target
[Service]
Restart=on-failure
RestartSec=5
ExecStart=/usr/bin/ssh -NTC -o ServerAliveInterval=60 -o ExitOnForwardFailure=yes -L 127.0.0.1:9200:127.0.0.1:9200 lbry@your-elastic-ip
User=lbry
Group=lbry
[Install]
WantedBy=multi-user.target
```
- Next, copy the following to `/etc/systemd/system/lbcd-tunnel.service` on the scribe instance, replacing `lbry` with your user and `your-lbcd-ip` with your lbcd instance ip.
```
[Unit]
Description=Persistent SSH Tunnel for lbcd
After=network.target
[Service]
Restart=on-failure
RestartSec=5
ExecStart=/usr/bin/ssh -NTC -o ServerAliveInterval=60 -o ExitOnForwardFailure=yes -L 127.0.0.1:9245:127.0.0.1:9245 lbry@your-lbcd-ip
User=lbry
Group=lbry
[Install]
WantedBy=multi-user.target
```
- Verify you can ssh in to the elasticsearch and lbcd instances from the scribe instance
- Enable and start the ssh port forward services on the scribe instance
```
sudo systemctl enable es-tunnel.service
sudo systemctl enable lbcd-tunnel.service
sudo systemctl start es-tunnel.service
sudo systemctl start lbcd-tunnel.service
```
- Build the scribe docker image on the scribe hub instance by running the following:
```
git clone https://github.com/lbryio/scribe.git
cd scribe
docker build -f ./docker/Dockerfile.scribe -t lbry/scribe:development .
```
- Copy the following to `~/docker-compose.yml` on the scribe instance
```
version: "3"
volumes:
lbry_rocksdb:
services:
scribe:
depends_on:
- scribe_elastic_sync
image: lbry/scribe:${SCRIBE_TAG:-development}
restart: always
network_mode: host
volumes:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe
command:
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--max_query_workers=2"
- "--cache_all_tx_hashes"
scribe_elastic_sync:
image: lbry/scribe:${SCRIBE_TAG:-development}
restart: always
network_mode: host
ports:
- "127.0.0.1:19080:19080" # elastic notifier port
volumes:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe-elastic-sync
command:
- "--elastic_host=127.0.0.1"
- "--elastic_port=9200"
- "--max_query_workers=2"
- "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8"
- "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6"
scribe_hub:
depends_on:
- scribe_elastic_sync
- scribe
image: lbry/scribe:${SCRIBE_TAG:-development}
restart: always
network_mode: host
ports:
- "50001:50001" # electrum rpc port and udp ping port
- "2112:2112" # comment out to disable prometheus metrics
volumes:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe-hub
command:
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--elastic_host=127.0.0.1"
- "--elastic_port=9200"
- "--max_query_workers=4"
- "--host=0.0.0.0"
- "--max_sessions=100000"
- "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8"
- "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6"
- "--prometheus_port=2112" # comment out to disable prometheus metrics
```
- Start the scribe hub services by running `docker-compose up -d`
- Check the status with `docker-compose logs -f --tail 100`

View file

@ -41,9 +41,6 @@ RUN rm ~/.cache -rf
# entry point # entry point
VOLUME $db_dir VOLUME $db_dir
ENV DB_DIRECTORY=$db_dir ENV DB_DIRECTORY=$db_dir
ENV MAX_SEND=1000000000000000000
ENV MAX_RECEIVE=1000000000000000000
COPY ./docker/scribe_entrypoint.sh /entrypoint.sh COPY ./docker/scribe_entrypoint.sh /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"] ENTRYPOINT ["/entrypoint.sh"]

View file

@ -2,8 +2,8 @@ import os
import logging import logging
import traceback import traceback
import argparse import argparse
from scribe.env import Env
from scribe.common import setup_logging from scribe.common import setup_logging
from scribe.blockchain.env import BlockchainEnv
from scribe.blockchain.service import BlockchainProcessorService from scribe.blockchain.service import BlockchainProcessorService
@ -11,12 +11,11 @@ def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog='scribe' prog='scribe'
) )
Env.contribute_common_settings_to_arg_parser(parser) BlockchainEnv.contribute_to_arg_parser(parser)
Env.contribute_writer_settings_to_arg_parser(parser)
args = parser.parse_args() args = parser.parse_args()
try: try:
env = Env.from_arg_parser(args) env = BlockchainEnv.from_arg_parser(args)
setup_logging(os.path.join(env.db_dir, 'scribe.log')) setup_logging(os.path.join(env.db_dir, 'scribe.log'))
block_processor = BlockchainProcessorService(env) block_processor = BlockchainProcessorService(env)
block_processor.run() block_processor.run()

33
scribe/blockchain/env.py Normal file
View file

@ -0,0 +1,33 @@
from scribe.env import Env
class BlockchainEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
blocking_channel_ids=None, filtering_channel_ids=None,
db_max_open_files=64, daemon_url=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
self.db_max_open_files = db_max_open_files
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
@classmethod
def contribute_to_arg_parser(cls, parser):
super().contribute_to_arg_parser(parser)
env_daemon_url = cls.default('DAEMON_URL', None)
parser.add_argument('--daemon_url', required=env_daemon_url is None,
help="URL for rpc from lbrycrd or lbcd, "
"<rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>.",
default=env_daemon_url)
parser.add_argument('--db_max_open_files', type=int, default=64,
help='This setting translates into the max_open_files option given to rocksdb. '
'A higher number will use more memory. Defaults to 64.')
@classmethod
def from_arg_parser(cls, args):
return cls(
db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit,
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos
)

View file

@ -2,8 +2,8 @@ import os
import logging import logging
import traceback import traceback
import argparse import argparse
from scribe.env import Env
from scribe.common import setup_logging from scribe.common import setup_logging
from scribe.elasticsearch.env import ElasticEnv
from scribe.elasticsearch.service import ElasticSyncService from scribe.elasticsearch.service import ElasticSyncService
@ -11,12 +11,11 @@ def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog='scribe-elastic-sync' prog='scribe-elastic-sync'
) )
Env.contribute_common_settings_to_arg_parser(parser) ElasticEnv.contribute_to_arg_parser(parser)
Env.contribute_elastic_sync_settings_to_arg_parser(parser)
args = parser.parse_args() args = parser.parse_args()
try: try:
env = Env.from_arg_parser(args) env = ElasticEnv.from_arg_parser(args)
setup_logging(os.path.join(env.db_dir, 'scribe-elastic-sync.log')) setup_logging(os.path.join(env.db_dir, 'scribe-elastic-sync.log'))
server = ElasticSyncService(env) server = ElasticSyncService(env)
server.run(args.reindex) server.run(args.reindex)

View file

@ -0,0 +1,54 @@
from scribe.env import Env
class ElasticEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None,
cache_all_tx_hashes=None, cache_all_claim_txos=None, elastic_host=None, elastic_port=None,
es_index_prefix=None, elastic_notifier_host=None, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None, reindex=False):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
'ELASTIC_NOTIFIER_HOST', 'localhost')
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer(
'ELASTIC_NOTIFIER_PORT', 19080)
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
# Filtering / Blocking
self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default(
'BLOCKING_CHANNEL_IDS', '').split(' ')
self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default(
'FILTERING_CHANNEL_IDS', '').split(' ')
self.reindex = reindex if reindex is not None else self.boolean('REINDEX_ES', False)
@classmethod
def contribute_to_arg_parser(cls, parser):
super().contribute_to_arg_parser(parser)
parser.add_argument('--reindex', default=False, help="Drop and rebuild the elasticsearch index.",
action='store_true')
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
help="Hostname or ip address of the elasticsearch instance to connect to. "
"Can be set in env with 'ELASTIC_HOST'")
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'")
parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'),
type=str, help='elasticsearch sync notifier host, defaults to localhost')
parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
help='elasticsearch sync notifier port')
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
parser.add_argument('--query_timeout_ms', type=int, default=cls.integer('QUERY_TIMEOUT_MS', 10000),
help="Elasticsearch query timeout, in ms. Can be set in env with 'QUERY_TIMEOUT_MS'")
@classmethod
def from_arg_parser(cls, args):
return cls(
db_dir=args.db_dir, elastic_host=args.elastic_host,
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit,
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
elastic_notifier_port=args.elastic_notifier_port
)

View file

@ -3,7 +3,6 @@ import re
import resource import resource
import logging import logging
from collections import namedtuple from collections import namedtuple
from ipaddress import ip_address
from scribe.blockchain.network import LBCMainNet, LBCTestNet, LBCRegTest from scribe.blockchain.network import LBCMainNet, LBCTestNet, LBCRegTest
@ -30,25 +29,12 @@ class Env:
class Error(Exception): class Error(Exception):
pass pass
def __init__(self, db_dir=None, daemon_url=None, host=None, elastic_host=None, elastic_port=None, def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
max_query_workers=None, chain=None, es_index_prefix=None, reorg_limit=None, prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
tcp_port=None, udp_port=None, prometheus_port=None, banner_file=None, blocking_channel_ids=None, filtering_channel_ids=None):
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, db_max_open_files=64, elastic_notifier_host=None,
elastic_notifier_port=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None,
peer_announce=None):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY') self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.db_max_open_files = db_max_open_files
self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default('ELASTIC_NOTIFIER_HOST', 'localhost')
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer('ELASTIC_NOTIFIER_PORT', 19080)
self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK']) self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK'])
self.max_query_workers = max_query_workers if max_query_workers is not None else self.integer('MAX_QUERY_WORKERS', 4) self.max_query_workers = max_query_workers if max_query_workers is not None else self.integer('MAX_QUERY_WORKERS', 4)
if chain == 'mainnet': if chain == 'mainnet':
@ -57,41 +43,15 @@ class Env:
self.coin = LBCTestNet self.coin = LBCTestNet
else: else:
self.coin = LBCRegTest self.coin = LBCRegTest
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
# Server stuff
self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
self.udp_port = udp_port if udp_port is not None else self.integer('UDP_PORT', self.tcp_port)
self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0) self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0)
self.banner_file = banner_file if banner_file is not None else self.default('BANNER_FILE', None)
self.allow_lan_udp = allow_lan_udp if allow_lan_udp is not None else self.boolean('ALLOW_LAN_UDP', False)
self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False) self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False)
self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False) self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False)
self.country = country if country is not None else self.default('COUNTRY', 'US')
# Peer discovery
self.peer_discovery = self.peer_discovery_enum()
self.peer_announce = peer_announce if peer_announce is not None else self.boolean('PEER_ANNOUNCE', True)
if peer_hubs is not None:
self.peer_hubs = [p.strip("") for p in peer_hubs.split(",")]
else:
self.peer_hubs = self.extract_peer_hubs()
# The electrum client takes the empty string as unspecified
self.payment_address = payment_address if payment_address is not None else self.default('PAYMENT_ADDRESS', '')
self.donation_address = donation_address if donation_address is not None else self.default('DONATION_ADDRESS', '')
# Server limits to help prevent DoS
self.max_send = max_send if max_send is not None else self.integer('MAX_SEND', 1000000)
self.max_receive = max_receive if max_receive is not None else self.integer('MAX_RECEIVE', 1000000)
self.max_sessions = max_sessions if max_sessions is not None else self.sane_max_sessions()
self.session_timeout = session_timeout if session_timeout is not None else self.integer('SESSION_TIMEOUT', 600)
self.drop_client = drop_client if drop_client is not None else self.custom("DROP_CLIENT", None, re.compile)
self.description = description if description is not None else self.default('DESCRIPTION', '')
self.daily_fee = daily_fee if daily_fee is not None else self.string_amount('DAILY_FEE', '0')
self.database_query_timeout = database_query_timeout if database_query_timeout is not None else \
(float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0)
# Filtering / Blocking # Filtering / Blocking
self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default('BLOCKING_CHANNEL_IDS', '').split(' ') self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default(
self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default('FILTERING_CHANNEL_IDS', '').split(' ') 'BLOCKING_CHANNEL_IDS', '').split(' ')
self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default(
'FILTERING_CHANNEL_IDS', '').split(' ')
@classmethod @classmethod
def default(cls, envvar, default): def default(cls, envvar, default):
@ -190,10 +150,11 @@ class Env:
return [hub.strip() for hub in peer_hubs.split(',')] return [hub.strip() for hub in peer_hubs.split(',')]
@classmethod @classmethod
def contribute_common_settings_to_arg_parser(cls, parser): def contribute_to_arg_parser(cls, parser):
""" """
Settings used by all services Settings used by all services
""" """
env_db_dir = cls.default('DB_DIRECTORY', None) env_db_dir = cls.default('DB_DIRECTORY', None)
parser.add_argument('--db_dir', type=str, required=env_db_dir is None, parser.add_argument('--db_dir', type=str, required=env_db_dir is None,
help="Path of the directory containing lbry-rocksdb. ", default=env_db_dir) help="Path of the directory containing lbry-rocksdb. ", default=env_db_dir)
@ -216,60 +177,6 @@ class Env:
parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0), parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0),
help="Port for prometheus metrics to listen on, disabled by default. " help="Port for prometheus metrics to listen on, disabled by default. "
"Can be set in env with 'PROMETHEUS_PORT'.") "Can be set in env with 'PROMETHEUS_PORT'.")
@classmethod
def contribute_writer_settings_to_arg_parser(cls, parser):
env_daemon_url = cls.default('DAEMON_URL', None)
parser.add_argument('--daemon_url', required=env_daemon_url is None,
help="URL for rpc from lbrycrd or lbcd, "
"<rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>.",
default=env_daemon_url)
parser.add_argument('--db_max_open_files', type=int, default=64,
help='This setting translates into the max_open_files option given to rocksdb. '
'A higher number will use more memory. Defaults to 64.')
@classmethod
def contribute_server_settings_to_arg_parser(cls, parser):
env_daemon_url = cls.default('DAEMON_URL', None)
parser.add_argument('--daemon_url', required=env_daemon_url is None,
help="URL for rpc from lbrycrd or lbcd, "
"<rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>.",
default=env_daemon_url)
parser.add_argument('--reindex', default=False, help="Drop and rebuild the elasticsearch index.",
action='store_true')
parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'),
help="Interface for hub server to listen on, use 0.0.0.0 to listen on the external "
"interface. Can be set in env with 'HOST'")
parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001),
help="Electrum TCP port to listen on for hub server. Can be set in env with 'TCP_PORT'")
parser.add_argument('--udp_port', type=int, default=cls.integer('UDP_PORT', 50001),
help="'UDP port to listen on for hub server. Can be set in env with 'UDP_PORT'")
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
help="Hostname or ip address of the elasticsearch instance to connect to. "
"Can be set in env with 'ELASTIC_HOST'")
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'")
parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'),
type=str, help='elasticsearch sync notifier host, defaults to localhost')
parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
help='elasticsearch sync notifier port')
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
@classmethod
def contribute_elastic_sync_settings_to_arg_parser(cls, parser):
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
help="Hostname or ip address of the elasticsearch instance to connect to. "
"Can be set in env with 'ELASTIC_HOST'")
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'")
parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'),
type=str, help='elasticsearch sync notifier host, defaults to localhost')
parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
help='elasticsearch sync notifier port')
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
parser.add_argument('--query_timeout_ms', type=int, default=cls.integer('QUERY_TIMEOUT_MS', 10000),
help="Elasticsearch query timeout, in ms. Can be set in env with 'QUERY_TIMEOUT_MS'")
parser.add_argument('--blocking_channel_ids', nargs='*', parser.add_argument('--blocking_channel_ids', nargs='*',
help="Space separated list of channel claim ids used for blocking. " help="Space separated list of channel claim ids used for blocking. "
"Claims that are reposted by these channels can't be resolved " "Claims that are reposted by these channels can't be resolved "
@ -283,18 +190,4 @@ class Env:
@classmethod @classmethod
def from_arg_parser(cls, args): def from_arg_parser(cls, args):
return cls( raise NotImplementedError()
db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
host=args.host, elastic_host=args.elastic_host, elastic_port=args.elastic_port,
max_query_workers=args.max_query_workers, chain=args.chain, es_index_prefix=args.es_index_prefix,
reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
udp_port=args.udp_port, prometheus_port=args.prometheus_port,
banner_file=args.banner_file, allow_lan_udp=args.allow_lan_udp,
cache_all_tx_hashes=args.cache_all_tx_hashes, cache_all_claim_txos=args.cache_all_claim_txos,
country=args.country, payment_address=args.payment_address, donation_address=args.donation_address,
max_send=args.max_send, max_receive=args.max_receive, max_sessions=args.max_sessions,
session_timeout=args.session_timeout, drop_client=args.drop_client, description=args.description,
daily_fee=args.daily_fee, database_query_timeout=(args.query_timeout_ms / 1000),
blocking_channel_ids=args.blocking_channel_ids, filtering_channel_ids=args.filtering_channel_ids,
elastic_notifier_host=args.elastic_notifier_host, elastic_notifier_port=args.elastic_notifier_port
)

View file

@ -2,8 +2,8 @@ import os
import logging import logging
import traceback import traceback
import argparse import argparse
from scribe.env import Env
from scribe.common import setup_logging from scribe.common import setup_logging
from scribe.hub.env import ServerEnv
from scribe.hub.service import HubServerService from scribe.hub.service import HubServerService
@ -11,12 +11,10 @@ def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog='scribe-hub' prog='scribe-hub'
) )
Env.contribute_common_settings_to_arg_parser(parser) ServerEnv.contribute_to_arg_parser(parser)
Env.contribute_server_settings_to_arg_parser(parser)
args = parser.parse_args() args = parser.parse_args()
try: try:
env = Env.from_arg_parser(args) env = ServerEnv.from_arg_parser(args)
setup_logging(os.path.join(env.db_dir, 'scribe-hub.log')) setup_logging(os.path.join(env.db_dir, 'scribe-hub.log'))
server = HubServerService(env) server = HubServerService(env)
server.run() server.run()

113
scribe/hub/env.py Normal file
View file

@ -0,0 +1,113 @@
import re
from scribe.env import Env
class ServerEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
daemon_url=None, host=None, elastic_host=None, elastic_port=None, es_index_prefix=None,
tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
'ELASTIC_NOTIFIER_HOST', 'localhost')
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer(
'ELASTIC_NOTIFIER_PORT', 19080)
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
# Server stuff
self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
self.udp_port = udp_port if udp_port is not None else self.integer('UDP_PORT', self.tcp_port)
self.banner_file = banner_file if banner_file is not None else self.default('BANNER_FILE', None)
self.allow_lan_udp = allow_lan_udp if allow_lan_udp is not None else self.boolean('ALLOW_LAN_UDP', False)
self.country = country if country is not None else self.default('COUNTRY', 'US')
# Peer discovery
self.peer_discovery = self.peer_discovery_enum()
self.peer_announce = peer_announce if peer_announce is not None else self.boolean('PEER_ANNOUNCE', True)
if peer_hubs is not None:
self.peer_hubs = [p.strip("") for p in peer_hubs.split(",")]
else:
self.peer_hubs = self.extract_peer_hubs()
# The electrum client takes the empty string as unspecified
self.payment_address = payment_address if payment_address is not None else self.default('PAYMENT_ADDRESS', '')
self.donation_address = donation_address if donation_address is not None else self.default('DONATION_ADDRESS',
'')
# Server limits to help prevent DoS
self.max_send = max_send if max_send is not None else self.integer('MAX_SEND', 1000000000000000000)
self.max_receive = max_receive if max_receive is not None else self.integer('MAX_RECEIVE', 1000000000000000000)
self.max_sessions = max_sessions if max_sessions is not None else self.sane_max_sessions()
self.session_timeout = session_timeout if session_timeout is not None else self.integer('SESSION_TIMEOUT', 600)
self.drop_client = re.compile(drop_client) if drop_client is not None else self.custom("DROP_CLIENT", None, re.compile)
self.description = description if description is not None else self.default('DESCRIPTION', '')
self.daily_fee = daily_fee if daily_fee is not None else self.string_amount('DAILY_FEE', '0')
self.database_query_timeout = (database_query_timeout / 1000.0) if database_query_timeout is not None else \
(float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0)
@classmethod
def contribute_to_arg_parser(cls, parser):
super().contribute_to_arg_parser(parser)
env_daemon_url = cls.default('DAEMON_URL', None)
parser.add_argument('--daemon_url', required=env_daemon_url is None,
help="URL for rpc from lbrycrd or lbcd, "
"<rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>.",
default=env_daemon_url)
parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'),
help="Interface for hub server to listen on, use 0.0.0.0 to listen on the external "
"interface. Can be set in env with 'HOST'")
parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001),
help="Electrum TCP port to listen on for hub server. Can be set in env with 'TCP_PORT'")
parser.add_argument('--udp_port', type=int, default=cls.integer('UDP_PORT', 50001),
help="'UDP port to listen on for hub server. Can be set in env with 'UDP_PORT'")
parser.add_argument('--max_sessions', type=int, default=cls.integer('MAX_SESSIONS', 100000),
help="Maximum number of electrum clients that can be connected, defaults to 100000.")
parser.add_argument('--max_send', type=int, default=cls.integer('MAX_SESSIONS', 1000000000000000000),
help="Maximum size of a request")
parser.add_argument('--max_receive', type=int, default=cls.integer('MAX_SESSIONS', 1000000000000000000),
help="Maximum size of a response")
parser.add_argument('--drop_client', type=str, default=cls.default('DROP_CLIENT', None),
help="Regex used for blocking clients")
parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600),
help="Session inactivity timeout")
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
help="Hostname or ip address of the elasticsearch instance to connect to. "
"Can be set in env with 'ELASTIC_HOST'")
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'")
parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'),
type=str, help='elasticsearch sync notifier host, defaults to localhost')
parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
help='elasticsearch sync notifier port')
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
parser.add_argument('--allow_lan_udp', action='store_true',
help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False))
parser.add_argument('--description', default=cls.default('DESCRIPTION', None), type=str)
parser.add_argument('--banner_file', default=cls.default('BANNER_FILE', None), type=str)
parser.add_argument('--country', default=cls.default('COUNTRY', 'US'), type=str)
parser.add_argument('--payment_address', default=cls.default('PAYMENT_ADDRESS', None), type=str)
parser.add_argument('--donation_address', default=cls.default('DONATION_ADDRESS', None), type=str)
parser.add_argument('--daily_fee', default=cls.default('DAILY_FEE', '0'), type=str)
parser.add_argument('--query_timeout_ms', default=cls.default('QUERY_TIMEOUT_MS', 10000), type=int)
@classmethod
def from_arg_parser(cls, args):
return cls(
db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_host=args.elastic_host,
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, country=args.country, payment_address=args.payment_address,
donation_address=args.donation_address, max_send=args.max_send, max_receive=args.max_receive,
max_sessions=args.max_sessions, session_timeout=args.session_timeout,
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
database_query_timeout=args.query_timeout_ms / 1000, blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
elastic_notifier_port=args.elastic_notifier_port
)