diff --git a/README.md b/README.md index 84e3dbe..91180bf 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ docker build -f ./docker/Dockerfile.scribe -t lbry/scribe:development . Scribe has been tested with python 3.7-3.9. Higher versions probably work but have not yet been tested. -1. clone the scribe scribe +1. clone the scribe repo ``` git clone https://github.com/lbryio/scribe.git cd scribe @@ -49,21 +49,60 @@ source scribe-venv/bin/activate pip install -e . ``` +That completes the installation, now you should have the commands `scribe`, `scribe-elastic-sync` and `scribe-hub` + +These can also optionally be run with `python -m scribe.blockchain`, `python -m scribe.elasticsearch`, and `python -m scribe.hub` + ## Usage -Scribe needs either the [lbrycrd](https://github.com/lbryio/lbrycrd) or [lbcd](https://github.com/lbryio/lbcd) blockchain daemon to be running. +### Requirements -As of block 1124663 (3/10/22) the size of the rocksdb database is 87GB and the size of the elasticsearch volume is 49GB. +Scribe needs elasticsearch and either the [lbrycrd](https://github.com/lbryio/lbrycrd) or [lbcd](https://github.com/lbryio/lbcd) blockchain daemon to be running. + +With options for high performance, if you have 64gb of memory and 12 cores, everything can be run on the same machine. However, the recommended way is with elasticsearch on one instance with 8gb of memory and at least 4 cores dedicated to it and the blockchain daemon on another with 16gb of memory and at least 4 cores. Then the scribe hub services can be run their own instance with between 16 and 32gb of memory (depending on settings) and 8 cores. + +As of block 1147423 (4/21/22) the size of the scribe rocksdb database is 120GB and the size of the elasticsearch volume is 63GB. ### docker-compose +The recommended way to run a scribe hub is with docker. See [this guide](https://github.com/lbryio/scribe/blob/master/cluster_guide.md) for instructions. + +If you have the resources to run all of the services on one machine (at least 300gb of fast storage, preferably nvme, 64gb of RAM, 12 fast cores), see [this](https://github.com/lbryio/scribe/blob/master/docker/docker-compose.yml) docker-compose example. ### From source -To start scribe, run the following (providing your own args) +### Options -``` -scribe --db_dir /your/db/path --daemon_url rpcuser:rpcpass@localhost:9245 -``` +#### Content blocking and filtering + +For various reasons it may be desirable to block or filtering content from claim search and resolve results, [here](https://github.com/lbryio/scribe/blob/master/blocking.md) are instructions for how to configure and use this feature as well as information about the recommended defaults. + +#### Common options across `scribe`, `scribe-hub`, and `scribe-elastic-sync`: + - `--db_dir` (required) Path of the directory containing lbry-rocksdb, set from the environment with `DB_DIRECTORY` + - `--daemon_url` (required for `scribe` and `scribe-hub`) URL for rpc from lbrycrd or lbcd:@. + - `--reorg_limit` Max reorg depth, defaults to 200, set from the environment with `REORG_LIMIT`. + - `--chain` With blockchain to use - either `mainnet`, `testnet`, or `regtest` - set from the environment with `NET` + - `--max_query_workers` Size of the thread pool, set from the environment with `MAX_QUERY_WORKERS` + - `--cache_all_tx_hashes` If this flag is set, all tx hashes will be stored in memory. For `scribe`, this speeds up the rate it can apply blocks as well as process mempool. For `scribe-hub`, this will speed up syncing address histories. This setting will use 10+g of memory. It can be set from the environment with `CACHE_ALL_TX_HASHES=Yes` + - `--cache_all_claim_txos` If this flag is set, all claim txos will be indexed in memory. Set from the environment with `CACHE_ALL_CLAIM_TXOS=Yes` + - `--prometheus_port` If provided this port will be used to provide prometheus metrics, set from the environment with `PROMETHEUS_PORT` + +#### Options for `scribe` + - `--db_max_open_files` This setting translates into the max_open_files option given to rocksdb. A higher number will use more memory. Defaults to 64. + +#### Options for `scribe-elastic-sync` + - `--reindex` If this flag is set drop and rebuild the elasticsearch index. + +#### Options for `scribe-hub` + - `--host` Interface for server to listen on, use 0.0.0.0 to listen on the external interface. Can be set from the environment with `HOST` + - `--tcp_port` Electrum TCP port to listen on for hub server. Can be set from the environment with `TCP_PORT` + - `--udp_port` UDP port to listen on for hub server. Can be set from the environment with `UDP_PORT` + - `--elastic_host` Hostname or ip address of the elasticsearch instance to connect to. Can be set from the environment with `ELASTIC_HOST` + - `--elastic_port` Elasticsearch port to connect to. Can be set from the environment with `ELASTIC_PORT` + - `--elastic_notifier_host` Elastic sync notifier host to connect to, defaults to localhost. Can be set from the environment with `ELASTIC_NOTIFIER_HOST` + - `--elastic_notifier_port` Elastic sync notifier port to connect using. Can be set from the environment with `ELASTIC_NOTIFIER_PORT` + - `--query_timeout_ms` Timeout for claim searches in elasticsearch in milliseconds. Can be set from the environment with `QUERY_TIMEOUT_MS` + - `--blocking_channel_ids` Space separated list of channel claim ids used for blocking. Claims that are reposted by these channels can't be resolved or returned in search results. Can be set from the environment with `BLOCKING_CHANNEL_IDS`. + - `--filtering_channel_ids` Space separated list of channel claim ids used for blocking. Claims that are reposted by these channels aren't returned in search results. Can be set from the environment with `FILTERING_CHANNEL_IDS` ## Contributing diff --git a/blocking.md b/blocking.md new file mode 100644 index 0000000..472764a --- /dev/null +++ b/blocking.md @@ -0,0 +1,23 @@ +### Claim filtering and blocking + + - Filtered claims are removed from claim search results (`blockchain.claimtrie.search`), they can still be resolved (`blockchain.claimtrie.resolve`) + - Blocked claims are not included in claim search results and cannot be resolved. + +Claims that are either filtered or blocked are replaced with a corresponding error message that includes the censoring channel id in a result that would return them. + +#### How to filter or block claims: + 1. Make a channel (using lbry-sdk) and include the claim id of the channel in `--filtering_channel_ids` or `--blocking_channel_ids` used by `scribe-hub` **and** `scribe-elastic-sync`, depending on which you want to use the channel for. To use both blocking and filtering, make one channel for each. + 2. Using lbry-sdk, repost the claim to be blocked or filtered using your corresponding channel. If you block/filter a claim id for a channel, it will block/filter all of the claims in the channel. + +#### Defaults + +The example docker-composes in the setup guide use the following defaults: + +Filtering: + - `lbry://@LBRY-TagAbuse#770bd7ecba84fd2f7607fb15aedd2b172c2e153f` + - `lbry://@LBRY-UntaggedPorn#95e5db68a3101df19763f3a5182e4b12ba393ee8` + +Blocking + - `lbry://@LBRY-DMCA#dd687b357950f6f271999971f43c785e8067c3a9` + - `lbry://@LBRY-DMCARedFlag#06871aa438032244202840ec59a469b303257cad` + - `lbry://@LBRY-OtherUSIllegal#b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6` diff --git a/cluster_guide.md b/cluster_guide.md new file mode 100644 index 0000000..0958a40 --- /dev/null +++ b/cluster_guide.md @@ -0,0 +1,194 @@ +## Cluster environment guide + +For best performance the recommended setup uses three server instances, these can be rented VPSs, self hosted VMs (ideally not on one physical host unless the host is sufficiently powerful), or physical computers. One is a dedicated lbcd node, one an elasticsearch server, and the third runs the scribe services. With this configuration the lbcd and elasticsearch servers can be shared between multiple scribe hub servers - more on that later. +Server Requirements (space requirements are at least double what's needed so it's possible to copy snapshots into place or make snapshots): + - lbcd: 2 cores, 8gb ram (slightly more may be required syncing from scratch, from a snapshot 8 is plenty), 150gb of NVMe storage + - elasticsearch: 8 cores, 9gb of ram (8gb minimum given to ES), 150gb of SSD speed storage + - scribe: 8 cores, 32gb of ram, 200gb of NVMe storage + +All servers are assumed to be running ubuntu 20.04 with user named `lbry` with passwordless sudo and docker group permissions, ssh configured, ulimits set high, and docker + docker-compose installed. The server running elasticsearch should have swap disabled. The three servers need to be able to communicate with each other, they can be on a local network together or communicate over the internet. This guide will assume the three servers are on the internet. + +### Setting up the lbcd instance +Log in to the lbcd instance and perform the following steps: + - Build the lbcd docker image by running +``` +git clone https://github.com/lbryio/lbcd.git +cd lbcd +docker build . -t lbry/lbcd:latest +``` + - Copy the following to `~/docker-compose.yml` +``` +version: "3" + +volumes: + lbcd: + +services: + lbcd: + image: lbry/lbcd:latest + restart: always + network_mode: host + command: + - "--rpcuser=lbry" + - "--rpcpass=lbry" + - "--rpclisten=127.0.0.1" + volumes: + - "lbcd:/root/.lbcd" + ports: + - "127.0.0.1:9245:9245" + - "9246:9246" # p2p port +``` + - Start lbcd by running `docker-compose up -d` + - Check the progress with `docker-compose logs -f --tail 100` + +### Setting up the elasticsearch instance +Log in to the elasticsearch instance and perform the following steps: + - Copy the following to `~/docker-compose.yml` +``` +version: "3" + +volumes: + es01: + +services: + es01: + image: docker.elastic.co/elasticsearch/elasticsearch:7.16.0 + container_name: es01 + environment: + - node.name=es01 + - discovery.type=single-node + - indices.query.bool.max_clause_count=8192 + - bootstrap.memory_lock=true + - "ES_JAVA_OPTS=-Dlog4j2.formatMsgNoLookups=true -Xms8g -Xmx8g" # no more than 32, remember to disable swap + ulimits: + memlock: + soft: -1 + hard: -1 + volumes: + - "es01:/usr/share/elasticsearch/data" + ports: + - "127.0.0.1:9200:9200" +``` + - Start elasticsearch by running `docker-compose up -d` + - Check the status with `docker-compose logs -f --tail 100` + +### Setting up the scribe hub instance + - Log in (ssh) to the scribe instance and generate and print out a ssh key, this is needed to set up port forwards to the other two instances. Copy the output of the following: +``` +ssh-keygen -q -t ed25519 -N '' -f ~/.ssh/id_ed25519 <</dev/null 2>&1 +``` + - After copying the above key, log out of the scribe hub instance. + + - Log in to the elasticsearch instance add the copied key to `~/.ssh/authorized_keys` (see [this](https://stackoverflow.com/questions/6377009/adding-a-public-key-to-ssh-authorized-keys-does-not-log-me-in-automatically) if confused). Log out of the elasticsearch instance once done. + - Log in to the lbcd instance and add the copied key to `~/.ssh/authorized_keys`, log out when done. + - Log in to the scribe instance and copy the following to `/etc/systemd/system/es-tunnel.service`, replacing `lbry` with your user and `your-elastic-ip` with your elasticsearch instance ip. +``` +[Unit] +Description=Persistent SSH Tunnel for ES +After=network.target + +[Service] +Restart=on-failure +RestartSec=5 +ExecStart=/usr/bin/ssh -NTC -o ServerAliveInterval=60 -o ExitOnForwardFailure=yes -L 127.0.0.1:9200:127.0.0.1:9200 lbry@your-elastic-ip +User=lbry +Group=lbry + +[Install] +WantedBy=multi-user.target +``` + - Next, copy the following to `/etc/systemd/system/lbcd-tunnel.service` on the scribe instance, replacing `lbry` with your user and `your-lbcd-ip` with your lbcd instance ip. +``` +[Unit] +Description=Persistent SSH Tunnel for lbcd +After=network.target + +[Service] +Restart=on-failure +RestartSec=5 +ExecStart=/usr/bin/ssh -NTC -o ServerAliveInterval=60 -o ExitOnForwardFailure=yes -L 127.0.0.1:9245:127.0.0.1:9245 lbry@your-lbcd-ip +User=lbry +Group=lbry + +[Install] +WantedBy=multi-user.target +``` + - Verify you can ssh in to the elasticsearch and lbcd instances from the scribe instance + - Enable and start the ssh port forward services on the scribe instance +``` +sudo systemctl enable es-tunnel.service +sudo systemctl enable lbcd-tunnel.service +sudo systemctl start es-tunnel.service +sudo systemctl start lbcd-tunnel.service +``` + - Build the scribe docker image on the scribe hub instance by running the following: +``` +git clone https://github.com/lbryio/scribe.git +cd scribe +docker build -f ./docker/Dockerfile.scribe -t lbry/scribe:development . +``` + - Copy the following to `~/docker-compose.yml` on the scribe instance +``` +version: "3" + +volumes: + lbry_rocksdb: + +services: + scribe: + depends_on: + - scribe_elastic_sync + image: lbry/scribe:${SCRIBE_TAG:-development} + restart: always + network_mode: host + volumes: + - "lbry_rocksdb:/database" + environment: + - HUB_COMMAND=scribe + command: + - "--daemon_url=http://lbry:lbry@127.0.0.1:9245" + - "--max_query_workers=2" + - "--cache_all_tx_hashes" + scribe_elastic_sync: + image: lbry/scribe:${SCRIBE_TAG:-development} + restart: always + network_mode: host + ports: + - "127.0.0.1:19080:19080" # elastic notifier port + volumes: + - "lbry_rocksdb:/database" + environment: + - HUB_COMMAND=scribe-elastic-sync + command: + - "--elastic_host=127.0.0.1" + - "--elastic_port=9200" + - "--max_query_workers=2" + - "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8" + - "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6" + scribe_hub: + depends_on: + - scribe_elastic_sync + - scribe + image: lbry/scribe:${SCRIBE_TAG:-development} + restart: always + network_mode: host + ports: + - "50001:50001" # electrum rpc port and udp ping port + - "2112:2112" # comment out to disable prometheus metrics + volumes: + - "lbry_rocksdb:/database" + environment: + - HUB_COMMAND=scribe-hub + command: + - "--daemon_url=http://lbry:lbry@127.0.0.1:9245" + - "--elastic_host=127.0.0.1" + - "--elastic_port=9200" + - "--max_query_workers=4" + - "--host=0.0.0.0" + - "--max_sessions=100000" + - "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8" + - "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6" + - "--prometheus_port=2112" # comment out to disable prometheus metrics +``` + - Start the scribe hub services by running `docker-compose up -d` + - Check the status with `docker-compose logs -f --tail 100` diff --git a/docker/Dockerfile b/docker/Dockerfile index 256ee1d..e72a8bb 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -39,18 +39,8 @@ RUN python3.9 docker/set_build.py RUN rm ~/.cache -rf # entry point -ARG host=localhost -ARG tcp_port=50001 -ARG daemon_url=http://lbry:lbry@localhost:9245/ VOLUME $db_dir -ENV TCP_PORT=$tcp_port -ENV HOST=$host -ENV DAEMON_URL=$daemon_url ENV DB_DIRECTORY=$db_dir -ENV MAX_SESSIONS=100000 -ENV MAX_SEND=1000000000000000000 -ENV MAX_RECEIVE=1000000000000000000 - COPY ./docker/scribe_entrypoint.sh /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 8bb764a..3d4f617 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -1,28 +1,30 @@ version: "3" volumes: + lbcd: lbry_rocksdb: es01: services: scribe: depends_on: + - lbcd - scribe_elastic_sync - image: lbry/scribe:${SCRIBE_TAG:-latest-release} + image: lbry/scribe:${SCRIBE_TAG:-latest} restart: always network_mode: host volumes: - "lbry_rocksdb:/database" environment: - HUB_COMMAND=scribe - - DAEMON_URL=http://lbry:lbry@127.0.0.1:9245 - - MAX_QUERY_WORKERS=2 - - FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8 - - BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6 + command: # for full options, see `scribe --help` + - "--daemon_url=http://lbry:lbry@127.0.0.1:9245" + - "--max_query_workers=2" + # - "--cache_all_tx_hashes" # uncomment to keep an index of all tx hashes in memory. This uses lots (10+g) of memory but substantially improves performance. scribe_elastic_sync: depends_on: - es01 - image: lbry/scribe:${SCRIBE_TAG:-latest-release} + image: lbry/scribe:${SCRIBE_TAG:-latest} restart: always network_mode: host ports: @@ -31,36 +33,42 @@ services: - "lbry_rocksdb:/database" environment: - HUB_COMMAND=scribe-elastic-sync - - MAX_QUERY_WORKERS=2 - - ELASTIC_HOST=127.0.0.1 - - ELASTIC_PORT=9200 - - FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8 - - BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6 + command: # for full options, see `scribe-elastic-sync --help` + - "--max_query_workers=2" + - "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8" + - "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6" + - "--elastic_host=127.0.0.1" # elasticsearch host + - "--elastic_port=9200" # elasticsearch port + - "--elastic_notifier_host=127.0.0.1" # address for the elastic sync notifier to connect to + - "--elastic_notifier_port=19080" scribe_hub: depends_on: + - lbcd - scribe_elastic_sync - scribe - image: lbry/scribe:${SCRIBE_TAG:-latest-release} + image: lbry/scribe:${SCRIBE_TAG:-latest} restart: always network_mode: host ports: - "50001:50001" # electrum rpc port and udp ping port - - "2112:2112" # comment out to disable prometheus + - "2112:2112" # comment out to disable prometheus metrics volumes: - "lbry_rocksdb:/database" environment: - HUB_COMMAND=scribe-hub - - DAEMON_URL=http://lbry:lbry@127.0.0.1:9245 # used for broadcasting transactions - - MAX_QUERY_WORKERS=4 # reader threads - - MAX_SESSIONS=100000 - - ELASTIC_HOST=127.0.0.1 - - ELASTIC_PORT=9200 - - HOST=0.0.0.0 - - PROMETHEUS_PORT=2112 - - TCP_PORT=50001 - - ALLOW_LAN_UDP=No - - FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8 - - BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6 + command: # for full options, see `scribe-hub --help` + - "--daemon_url=http://lbry:lbry@127.0.0.1:9245" + - "--max_query_workers=4" + - "--host=0.0.0.0" + - "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8" + - "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6" + - "--prometheus_port=2112" # comment out to disable prometheus metrics + # - "--elastic_host=127.0.0.1" # elasticsearch host + # - "--elastic_port=9200" # elasticsearch port + # - "--elastic_notifier_host=127.0.0.1" # address for the elastic sync notifier to connect to + # - "--elastic_notifier_port=19080" + # - "--max_sessions=100000 # uncomment to increase the maximum number of electrum connections, defaults to 1000 + # - "--allow_lan_udp" # uncomment to reply to clients on the local network es01: image: docker.elastic.co/elasticsearch/elasticsearch:7.16.0 container_name: es01 @@ -78,3 +86,17 @@ services: - "es01:/usr/share/elasticsearch/data" ports: - "127.0.0.1:9200:9200" + lbcd: + image: lbry/lbcd:latest + restart: always + network_mode: host + command: + - "--notls" + - "--listen=0.0.0.0:9246" + - "--rpclisten=127.0.0.1:9245" + - "--rpcuser=lbry" + - "--rpcpass=lbry" + volumes: + - "lbcd:/root/.lbcd" + ports: + - "9246:9246" # p2p diff --git a/docker/elastic-compose.yml b/docker/elastic-compose.yml new file mode 100644 index 0000000..4fcf28a --- /dev/null +++ b/docker/elastic-compose.yml @@ -0,0 +1,23 @@ +version: "3" + +volumes: + es01: + +services: + es01: + image: docker.elastic.co/elasticsearch/elasticsearch:7.16.0 + container_name: es01 + environment: + - node.name=es01 + - discovery.type=single-node + - indices.query.bool.max_clause_count=8192 + - bootstrap.memory_lock=true + - "ES_JAVA_OPTS=-Dlog4j2.formatMsgNoLookups=true -Xms8g -Xmx8g" # no more than 32, remember to disable swap + ulimits: + memlock: + soft: -1 + hard: -1 + volumes: + - "es01:/usr/share/elasticsearch/data" + ports: + - "127.0.0.1:9200:9200" diff --git a/docker/hub-compose.yml b/docker/hub-compose.yml new file mode 100644 index 0000000..cd61138 --- /dev/null +++ b/docker/hub-compose.yml @@ -0,0 +1,60 @@ +version: "3" + +volumes: + lbry_rocksdb: + +services: + scribe: + depends_on: + - scribe_elastic_sync + image: lbry/scribe:${SCRIBE_TAG:-development} + restart: always + network_mode: host + volumes: + - "lbry_rocksdb:/database" + environment: + - HUB_COMMAND=scribe + command: + - "--daemon_url=http://lbry:lbry@127.0.0.1:9245" + - "--max_query_workers=2" + - "--cache_all_tx_hashes" + scribe_elastic_sync: + image: lbry/scribe:${SCRIBE_TAG:-development} + restart: always + network_mode: host + ports: + - "127.0.0.1:19080:19080" # elastic notifier port + volumes: + - "lbry_rocksdb:/database" + environment: + - HUB_COMMAND=scribe-elastic-sync + command: + - "--elastic_host=127.0.0.1" + - "--elastic_port=9200" + - "--max_query_workers=2" + - "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8" + - "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6" + scribe_hub: + depends_on: + - scribe_elastic_sync + - scribe + image: lbry/scribe:${SCRIBE_TAG:-development} + restart: always + network_mode: host + ports: + - "50001:50001" # electrum rpc port and udp ping port + - "2112:2112" # comment out to disable prometheus metrics + volumes: + - "lbry_rocksdb:/database" + environment: + - HUB_COMMAND=scribe-hub + command: + - "--daemon_url=http://lbry:lbry@127.0.0.1:9245" + - "--elastic_host=127.0.0.1" + - "--elastic_port=9200" + - "--max_query_workers=4" + - "--host=0.0.0.0" + - "--max_sessions=100000" + - "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8" + - "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6" + - "--prometheus_port=2112" # comment out to disable prometheus metrics diff --git a/docker/lbcd-compose.yml b/docker/lbcd-compose.yml new file mode 100644 index 0000000..b39192e --- /dev/null +++ b/docker/lbcd-compose.yml @@ -0,0 +1,19 @@ +version: "3" + +volumes: + lbcd: + +services: + lbcd: + image: lbry/lbcd:latest + restart: always + network_mode: host + command: + - "--rpcuser=lbry" + - "--rpcpass=lbry" + - "--rpclisten=127.0.0.1" + volumes: + - "lbcd:/root/.lbcd" + ports: + - "127.0.0.1:9245:9245" + - "9246:9246" # p2p port diff --git a/docker/scribe_entrypoint.sh b/docker/scribe_entrypoint.sh index 63db297..2f44c48 100755 --- a/docker/scribe_entrypoint.sh +++ b/docker/scribe_entrypoint.sh @@ -12,6 +12,6 @@ fi case "$HUB_COMMAND" in scribe ) exec /home/lbry/.local/bin/scribe "$@" ;; scribe-hub ) exec /home/lbry/.local/bin/scribe-hub "$@" ;; - scribe-elastic-sync ) exec /home/lbry/.local/bin/scribe-elastic-sync ;; + scribe-elastic-sync ) exec /home/lbry/.local/bin/scribe-elastic-sync "$@" ;; * ) "HUB_COMMAND env variable must be scribe, scribe-hub, or scribe-elastic-sync" && exit 1 ;; esac diff --git a/scribe/blockchain/__main__.py b/scribe/blockchain/__main__.py index 274c520..32f851a 100644 --- a/scribe/blockchain/__main__.py +++ b/scribe/blockchain/__main__.py @@ -2,8 +2,8 @@ import os import logging import traceback import argparse -from scribe.env import Env from scribe.common import setup_logging +from scribe.blockchain.env import BlockchainEnv from scribe.blockchain.service import BlockchainProcessorService @@ -11,11 +11,11 @@ def main(): parser = argparse.ArgumentParser( prog='scribe' ) - Env.contribute_to_arg_parser(parser) + BlockchainEnv.contribute_to_arg_parser(parser) args = parser.parse_args() try: - env = Env.from_arg_parser(args) + env = BlockchainEnv.from_arg_parser(args) setup_logging(os.path.join(env.db_dir, 'scribe.log')) block_processor = BlockchainProcessorService(env) block_processor.run() diff --git a/scribe/blockchain/env.py b/scribe/blockchain/env.py new file mode 100644 index 0000000..ee07519 --- /dev/null +++ b/scribe/blockchain/env.py @@ -0,0 +1,33 @@ +from scribe.env import Env + + +class BlockchainEnv(Env): + def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, + prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, + blocking_channel_ids=None, filtering_channel_ids=None, + db_max_open_files=64, daemon_url=None): + super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, + cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids) + self.db_max_open_files = db_max_open_files + self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') + + @classmethod + def contribute_to_arg_parser(cls, parser): + super().contribute_to_arg_parser(parser) + env_daemon_url = cls.default('DAEMON_URL', None) + parser.add_argument('--daemon_url', required=env_daemon_url is None, + help="URL for rpc from lbrycrd or lbcd, " + ":@.", + default=env_daemon_url) + parser.add_argument('--db_max_open_files', type=int, default=64, + help='This setting translates into the max_open_files option given to rocksdb. ' + 'A higher number will use more memory. Defaults to 64.') + + @classmethod + def from_arg_parser(cls, args): + return cls( + db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files, + max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit, + prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes, + cache_all_claim_txos=args.cache_all_claim_txos + ) diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index 10b2e7c..865c859 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -19,7 +19,7 @@ from scribe.blockchain.mempool import MemPool from scribe.schema.url import normalize_name from scribe.service import BlockchainService if typing.TYPE_CHECKING: - from scribe.env import Env + from scribe.blockchain.env import BlockchainEnv from scribe.db.revertable import RevertableOpStack @@ -43,7 +43,7 @@ class BlockchainProcessorService(BlockchainService): "reorg_count", "Number of reorgs", namespace=NAMESPACE ) - def __init__(self, env: 'Env'): + def __init__(self, env: 'BlockchainEnv'): super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor') self.daemon = LBCDaemon(env.coin, env.daemon_url) self.mempool = MemPool(env.coin, self.db) diff --git a/scribe/db/db.py b/scribe/db/db.py index 6f58e25..98e1591 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -36,7 +36,7 @@ NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db" class HubDB: DB_VERSIONS = [7, 8] - def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200, + def __init__(self, coin, db_dir: str, reorg_limit: int = 200, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None, filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None): @@ -45,7 +45,6 @@ class HubDB: self._executor = executor self._db_dir = db_dir - self._cache_MB = cache_MB self._reorg_limit = reorg_limit self._cache_all_claim_txos = cache_all_claim_txos self._cache_all_tx_hashes = cache_all_tx_hashes @@ -820,8 +819,7 @@ class HubDB: ) db_path = os.path.join(self._db_dir, 'lbry-rocksdb') self.prefix_db = PrefixDB( - db_path, cache_mb=self._cache_MB, - reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files, + db_path, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files, unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix, HashXMempoolStatusPrefixRow.prefix}, secondary_path=secondary_path ) diff --git a/scribe/db/prefixes.py b/scribe/db/prefixes.py index 098d89c..cb46b06 100644 --- a/scribe/db/prefixes.py +++ b/scribe/db/prefixes.py @@ -1694,7 +1694,7 @@ class HashXMempoolStatusPrefixRow(PrefixRow): class PrefixDB(BasePrefixDB): - def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 64, + def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64, secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None): super().__init__(path, max_open_files=max_open_files, secondary_path=secondary_path, max_undo_depth=reorg_limit, unsafe_prefixes=unsafe_prefixes) diff --git a/scribe/elasticsearch/__main__.py b/scribe/elasticsearch/__main__.py index c604e15..f968a8c 100644 --- a/scribe/elasticsearch/__main__.py +++ b/scribe/elasticsearch/__main__.py @@ -2,8 +2,8 @@ import os import logging import traceback import argparse -from scribe.env import Env from scribe.common import setup_logging +from scribe.elasticsearch.env import ElasticEnv from scribe.elasticsearch.service import ElasticSyncService @@ -11,12 +11,11 @@ def main(): parser = argparse.ArgumentParser( prog='scribe-elastic-sync' ) - Env.contribute_to_arg_parser(parser) - parser.add_argument('--reindex', type=bool, default=False) + ElasticEnv.contribute_to_arg_parser(parser) args = parser.parse_args() try: - env = Env.from_arg_parser(args) + env = ElasticEnv.from_arg_parser(args) setup_logging(os.path.join(env.db_dir, 'scribe-elastic-sync.log')) server = ElasticSyncService(env) server.run(args.reindex) diff --git a/scribe/elasticsearch/env.py b/scribe/elasticsearch/env.py new file mode 100644 index 0000000..209d593 --- /dev/null +++ b/scribe/elasticsearch/env.py @@ -0,0 +1,54 @@ +from scribe.env import Env + + +class ElasticEnv(Env): + def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None, + cache_all_tx_hashes=None, cache_all_claim_txos=None, elastic_host=None, elastic_port=None, + es_index_prefix=None, elastic_notifier_host=None, elastic_notifier_port=None, + blocking_channel_ids=None, filtering_channel_ids=None, reindex=False): + super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, + cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids) + self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') + self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200) + self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default( + 'ELASTIC_NOTIFIER_HOST', 'localhost') + self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer( + 'ELASTIC_NOTIFIER_PORT', 19080) + self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '') + # Filtering / Blocking + self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default( + 'BLOCKING_CHANNEL_IDS', '').split(' ') + self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default( + 'FILTERING_CHANNEL_IDS', '').split(' ') + self.reindex = reindex if reindex is not None else self.boolean('REINDEX_ES', False) + + @classmethod + def contribute_to_arg_parser(cls, parser): + super().contribute_to_arg_parser(parser) + parser.add_argument('--reindex', default=False, help="Drop and rebuild the elasticsearch index.", + action='store_true') + parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str, + help="Hostname or ip address of the elasticsearch instance to connect to. " + "Can be set in env with 'ELASTIC_HOST'") + parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int, + help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'") + parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'), + type=str, help='elasticsearch sync notifier host, defaults to localhost') + parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int, + help='elasticsearch sync notifier port') + parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str) + + parser.add_argument('--query_timeout_ms', type=int, default=cls.integer('QUERY_TIMEOUT_MS', 10000), + help="Elasticsearch query timeout, in ms. Can be set in env with 'QUERY_TIMEOUT_MS'") + + @classmethod + def from_arg_parser(cls, args): + return cls( + db_dir=args.db_dir, elastic_host=args.elastic_host, + elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain, + es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, + prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes, + cache_all_claim_txos=args.cache_all_claim_txos, blocking_channel_ids=args.blocking_channel_ids, + filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host, + elastic_notifier_port=args.elastic_notifier_port + ) diff --git a/scribe/elasticsearch/service.py b/scribe/elasticsearch/service.py index f22c195..9476d5b 100644 --- a/scribe/elasticsearch/service.py +++ b/scribe/elasticsearch/service.py @@ -13,13 +13,16 @@ from scribe.elasticsearch.notifier_protocol import ElasticNotifierProtocol from scribe.elasticsearch.search import IndexVersionMismatch, expand_query from scribe.elasticsearch.constants import ALL_FIELDS, INDEX_DEFAULT_SETTINGS from scribe.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT +if typing.TYPE_CHECKING: + from scribe.elasticsearch.env import ElasticEnv class ElasticSyncService(BlockchainReaderService): VERSION = 1 - def __init__(self, env): + def __init__(self, env: 'ElasticEnv'): super().__init__(env, 'lbry-elastic-writer', thread_workers=1, thread_prefix='lbry-elastic-writer') + self.env = env # self._refresh_interval = 0.1 self._task = None self.index = self.env.es_index_prefix + 'claims' diff --git a/scribe/env.py b/scribe/env.py index 055cda2..a019540 100644 --- a/scribe/env.py +++ b/scribe/env.py @@ -3,7 +3,6 @@ import re import resource import logging from collections import namedtuple -from ipaddress import ip_address from scribe.blockchain.network import LBCMainNet, LBCTestNet, LBCRegTest @@ -30,31 +29,12 @@ class Env: class Error(Exception): pass - def __init__(self, db_dir=None, daemon_url=None, host=None, elastic_host=None, - elastic_port=None, loop_policy=None, max_query_workers=None, - chain=None, es_index_prefix=None, cache_MB=None, reorg_limit=None, tcp_port=None, - udp_port=None, ssl_port=None, ssl_certfile=None, ssl_keyfile=None, - prometheus_port=None, max_subscriptions=None, banner_file=None, anon_logs=None, log_sessions=None, - allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None, - payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, - session_timeout=None, drop_client=None, description=None, daily_fee=None, - database_query_timeout=None, db_max_open_files=64, elastic_notifier_host=None, - elastic_notifier_port=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, - peer_announce=None): + def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, + prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, + blocking_channel_ids=None, filtering_channel_ids=None): + self.logger = logging.getLogger(__name__) self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY') - self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') - self.db_max_open_files = db_max_open_files - - self.host = host if host is not None else self.default('HOST', 'localhost') - self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') - self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200) - self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default('ELASTIC_NOTIFIER_HOST', 'localhost') - self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer('ELASTIC_NOTIFIER_PORT', 19080) - - self.loop_policy = self.set_event_loop_policy( - loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None) - ) self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK']) self.max_query_workers = max_query_workers if max_query_workers is not None else self.integer('MAX_QUERY_WORKERS', 4) if chain == 'mainnet': @@ -63,61 +43,15 @@ class Env: self.coin = LBCTestNet else: self.coin = LBCRegTest - self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '') - self.cache_MB = cache_MB if cache_MB is not None else self.integer('CACHE_MB', 1024) self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) - # Server stuff - self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None) - self.udp_port = udp_port if udp_port is not None else self.integer('UDP_PORT', self.tcp_port) - self.ssl_port = ssl_port if ssl_port is not None else self.integer('SSL_PORT', None) - if self.ssl_port: - self.ssl_certfile = ssl_certfile if ssl_certfile is not None else self.required('SSL_CERTFILE') - self.ssl_keyfile = ssl_keyfile if ssl_keyfile is not None else self.required('SSL_KEYFILE') self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0) - self.max_subscriptions = max_subscriptions if max_subscriptions is not None else self.integer('MAX_SUBSCRIPTIONS', 10000) - self.banner_file = banner_file if banner_file is not None else self.default('BANNER_FILE', None) - # self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file) - self.anon_logs = anon_logs if anon_logs is not None else self.boolean('ANON_LOGS', False) - self.log_sessions = log_sessions if log_sessions is not None else self.integer('LOG_SESSIONS', 3600) - self.allow_lan_udp = allow_lan_udp if allow_lan_udp is not None else self.boolean('ALLOW_LAN_UDP', False) self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False) self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False) - self.country = country if country is not None else self.default('COUNTRY', 'US') - # Peer discovery - self.peer_discovery = self.peer_discovery_enum() - self.peer_announce = peer_announce if peer_announce is not None else self.boolean('PEER_ANNOUNCE', True) - if peer_hubs is not None: - self.peer_hubs = [p.strip("") for p in peer_hubs.split(",")] - else: - self.peer_hubs = self.extract_peer_hubs() - # self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost') - # self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None) - # The electrum client takes the empty string as unspecified - self.payment_address = payment_address if payment_address is not None else self.default('PAYMENT_ADDRESS', '') - self.donation_address = donation_address if donation_address is not None else self.default('DONATION_ADDRESS', '') - # Server limits to help prevent DoS - self.max_send = max_send if max_send is not None else self.integer('MAX_SEND', 1000000) - self.max_receive = max_receive if max_receive is not None else self.integer('MAX_RECEIVE', 1000000) - # self.max_subs = self.integer('MAX_SUBS', 250000) - self.max_sessions = max_sessions if max_sessions is not None else self.sane_max_sessions() - # self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000) - self.session_timeout = session_timeout if session_timeout is not None else self.integer('SESSION_TIMEOUT', 600) - self.drop_client = drop_client if drop_client is not None else self.custom("DROP_CLIENT", None, re.compile) - self.description = description if description is not None else self.default('DESCRIPTION', '') - self.daily_fee = daily_fee if daily_fee is not None else self.string_amount('DAILY_FEE', '0') - - # Identities - clearnet_identity = self.clearnet_identity() - tor_identity = self.tor_identity(clearnet_identity) - self.identities = [identity - for identity in (clearnet_identity, tor_identity) - if identity is not None] - self.database_query_timeout = database_query_timeout if database_query_timeout is not None else \ - (float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0) - # Filtering / Blocking - self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default('BLOCKING_CHANNEL_IDS', '').split(' ') - self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default('FILTERING_CHANNEL_IDS', '').split(' ') + self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default( + 'BLOCKING_CHANNEL_IDS', '').split(' ') + self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default( + 'FILTERING_CHANNEL_IDS', '').split(' ') @classmethod def default(cls, envvar, default): @@ -169,19 +103,6 @@ class Env: if bad: raise cls.Error(f'remove obsolete os.environment variables {bad}') - @classmethod - def set_event_loop_policy(cls, policy_name: str = None): - if not policy_name or policy_name == 'default': - import asyncio - return asyncio.get_event_loop_policy() - elif policy_name == 'uvloop': - import uvloop - import asyncio - loop_policy = uvloop.EventLoopPolicy() - asyncio.set_event_loop_policy(loop_policy) - return loop_policy - raise cls.Error(f'unknown event loop policy "{policy_name}"') - def cs_host(self): """Returns the 'host' argument to pass to asyncio's create_server call. The result can be a single host name string, a list of @@ -213,67 +134,6 @@ class Env: f'because your open file limit is {nofile_limit:,d}') return value - def clearnet_identity(self): - host = self.default('REPORT_HOST', None) - if host is None: - return None - try: - ip = ip_address(host) - except ValueError: - bad = (not is_valid_hostname(host) - or host.lower() == 'localhost') - else: - bad = (ip.is_multicast or ip.is_unspecified - or (ip.is_private and self.peer_announce)) - if bad: - raise self.Error(f'"{host}" is not a valid REPORT_HOST') - tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port) or None - ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port) or None - if tcp_port == ssl_port: - raise self.Error('REPORT_TCP_PORT and REPORT_SSL_PORT ' - f'both resolve to {tcp_port}') - return NetIdentity( - host, - tcp_port, - ssl_port, - '' - ) - - def tor_identity(self, clearnet): - host = self.default('REPORT_HOST_TOR', None) - if host is None: - return None - if not host.endswith('.onion'): - raise self.Error(f'tor host "{host}" must end with ".onion"') - - def port(port_kind): - """Returns the clearnet identity port, if any and not zero, - otherwise the listening port.""" - result = 0 - if clearnet: - result = getattr(clearnet, port_kind) - return result or getattr(self, port_kind) - - tcp_port = self.integer('REPORT_TCP_PORT_TOR', - port('tcp_port')) or None - ssl_port = self.integer('REPORT_SSL_PORT_TOR', - port('ssl_port')) or None - if tcp_port == ssl_port: - raise self.Error('REPORT_TCP_PORT_TOR and REPORT_SSL_PORT_TOR ' - f'both resolve to {tcp_port}') - - return NetIdentity( - host, - tcp_port, - ssl_port, - '_tor', - ) - - def hosts_dict(self): - return {identity.host: {'tcp_port': identity.tcp_port, - 'ssl_port': identity.ssl_port} - for identity in self.identities} - def peer_discovery_enum(self): pd = self.default('PEER_DISCOVERY', 'on').strip().lower() if pd in ('off', ''): @@ -291,98 +151,43 @@ class Env: @classmethod def contribute_to_arg_parser(cls, parser): - parser.add_argument('--db_dir', type=str, help='path of the directory containing lbry-leveldb', - default=cls.default('DB_DIRECTORY', None)) - parser.add_argument('--daemon_url', - help='URL for rpc from lbrycrd, :@', - default=cls.default('DAEMON_URL', None)) - parser.add_argument('--db_max_open_files', type=int, default=64, - help='number of files rocksdb can have open at a time') - parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'), - help='Interface for hub server to listen on') - parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001), - help='TCP port to listen on for hub server') - parser.add_argument('--udp_port', type=int, default=cls.integer('UDP_PORT', 50001), - help='UDP port to listen on for hub server') + """ + Settings used by all services + """ - - parser.add_argument('--ssl_port', default=cls.integer('SSL_PORT', None), type=int, - help='SSL port to listen on for hub server') - parser.add_argument('--ssl_certfile', default=cls.default('SSL_CERTFILE', None), type=str, - help='Path to SSL cert file') - parser.add_argument('--ssl_keyfile', default=cls.default('SSL_KEYFILE', None), type=str, - help='Path to SSL key file') + env_db_dir = cls.default('DB_DIRECTORY', None) + parser.add_argument('--db_dir', type=str, required=env_db_dir is None, + help="Path of the directory containing lbry-rocksdb. ", default=env_db_dir) parser.add_argument('--reorg_limit', default=cls.integer('REORG_LIMIT', 200), type=int, help='Max reorg depth') - parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str, - help='elasticsearch host, defaults to localhost') - parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int, - help='elasticsearch port, defaults to 9200') - parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'), - type=str, help='elasticsearch sync notifier host, defaults to localhost') - parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int, - help='elasticsearch sync notifier port') - parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str) - parser.add_argument('--loop_policy', default=cls.default('EVENT_LOOP_POLICY', 'default'), type=str, - choices=['default', 'uvloop']) - parser.add_argument('--max_query_workers', type=int, default=cls.integer('MAX_QUERY_WORKERS', 4), - help='number of threads used by the request handler to read the database') - parser.add_argument('--cache_MB', type=int, default=cls.integer('CACHE_MB', 1024), - help='size of the leveldb lru cache, in megabytes') - parser.add_argument('--cache_all_tx_hashes', type=bool, - help='Load all tx hashes into memory. This will make address subscriptions and sync, ' - 'resolve, transaction fetching, and block sync all faster at the expense of higher ' - 'memory usage') - parser.add_argument('--cache_all_claim_txos', type=bool, - help='Load all claim txos into memory. This will make address subscriptions and sync, ' - 'resolve, transaction fetching, and block sync all faster at the expense of higher ' - 'memory usage') - parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0), - help='port for hub prometheus metrics to listen on, disabled by default') - parser.add_argument('--max_subscriptions', type=int, default=cls.integer('MAX_SUBSCRIPTIONS', 10000), - help='max subscriptions per connection') - parser.add_argument('--banner_file', type=str, default=cls.default('BANNER_FILE', None), - help='path to file containing banner text') - parser.add_argument('--anon_logs', type=bool, default=cls.boolean('ANON_LOGS', False), - help="don't log ip addresses") - parser.add_argument('--allow_lan_udp', type=bool, default=cls.boolean('ALLOW_LAN_UDP', False), - help='reply to hub UDP ping messages from LAN ip addresses') - parser.add_argument('--country', type=str, default=cls.default('COUNTRY', 'US'), help='') - parser.add_argument('--max_send', type=int, default=cls.default('MAX_SEND', 1000000), help='') - parser.add_argument('--max_receive', type=int, default=cls.default('MAX_RECEIVE', 1000000), help='') - parser.add_argument('--max_sessions', type=int, default=cls.default('MAX_SESSIONS', 1000), help='') - parser.add_argument('--session_timeout', type=int, default=cls.default('SESSION_TIMEOUT', 600), help='') - parser.add_argument('--drop_client', type=str, default=cls.default('DROP_CLIENT', None), help='') - parser.add_argument('--description', type=str, default=cls.default('DESCRIPTION', ''), help='') - parser.add_argument('--daily_fee', type=float, default=cls.default('DAILY_FEE', 0.0), help='') - parser.add_argument('--payment_address', type=str, default=cls.default('PAYMENT_ADDRESS', ''), help='') - parser.add_argument('--donation_address', type=str, default=cls.default('DONATION_ADDRESS', ''), help='') parser.add_argument('--chain', type=str, default=cls.default('NET', 'mainnet'), - help="Which chain to use, default is mainnet", choices=['mainnet', 'regtest', 'testnet']) - parser.add_argument('--query_timeout_ms', type=int, default=cls.integer('QUERY_TIMEOUT_MS', 10000), - help="elasticsearch query timeout") - - parser.add_argument('--blocking_channel_ids', nargs='*', help='', + help="Which chain to use, default is mainnet, others are used for testing", + choices=['mainnet', 'regtest', 'testnet']) + parser.add_argument('--max_query_workers', type=int, default=cls.integer('MAX_QUERY_WORKERS', 4), + help="Size of the thread pool. Can be set in env with 'MAX_QUERY_WORKERS'") + parser.add_argument('--cache_all_tx_hashes', action='store_true', + help="Load all tx hashes into memory. This will make address subscriptions and sync, " + "resolve, transaction fetching, and block sync all faster at the expense of higher " + "memory usage (at least 10GB more). Can be set in env with 'CACHE_ALL_TX_HASHES'.", + default=cls.boolean('CACHE_ALL_TX_HASHES', False)) + parser.add_argument('--cache_all_claim_txos', action='store_true', + help="Load all claim txos into memory. This will make address subscriptions and sync, " + "resolve, transaction fetching, and block sync all faster at the expense of higher " + "memory usage. Can be set in env with 'CACHE_ALL_CLAIM_TXOS'.", + default=cls.boolean('CACHE_ALL_CLAIM_TXOS', False)) + parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0), + help="Port for prometheus metrics to listen on, disabled by default. " + "Can be set in env with 'PROMETHEUS_PORT'.") + parser.add_argument('--blocking_channel_ids', nargs='*', + help="Space separated list of channel claim ids used for blocking. " + "Claims that are reposted by these channels can't be resolved " + "or returned in search results. Can be set in env with 'BLOCKING_CHANNEL_IDS'", default=cls.default('BLOCKING_CHANNEL_IDS', '').split(' ')) - parser.add_argument('--filtering_channel_ids', nargs='*', help='', + parser.add_argument('--filtering_channel_ids', nargs='*', + help="Space separated list of channel claim ids used for blocking. " + "Claims that are reposted by these channels aren't returned in search results. " + "Can be set in env with 'FILTERING_CHANNEL_IDS'", default=cls.default('FILTERING_CHANNEL_IDS', '').split(' ')) @classmethod def from_arg_parser(cls, args): - return cls( - db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files, - host=args.host, elastic_host=args.elastic_host, elastic_port=args.elastic_port, - loop_policy=args.loop_policy, max_query_workers=args.max_query_workers, - chain=args.chain, es_index_prefix=args.es_index_prefix, - cache_MB=args.cache_MB, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port, - udp_port=args.udp_port, ssl_port=args.ssl_port, ssl_certfile=args.ssl_certfile, - ssl_keyfile=args.ssl_keyfile, prometheus_port=args.prometheus_port, - max_subscriptions=args.max_subscriptions, banner_file=args.banner_file, anon_logs=args.anon_logs, - log_sessions=None, allow_lan_udp=args.allow_lan_udp, - cache_all_tx_hashes=args.cache_all_tx_hashes, cache_all_claim_txos=args.cache_all_claim_txos, - country=args.country, payment_address=args.payment_address, donation_address=args.donation_address, - max_send=args.max_send, max_receive=args.max_receive, max_sessions=args.max_sessions, - session_timeout=args.session_timeout, drop_client=args.drop_client, description=args.description, - daily_fee=args.daily_fee, database_query_timeout=(args.query_timeout_ms / 1000), - blocking_channel_ids=args.blocking_channel_ids, filtering_channel_ids=args.filtering_channel_ids, - elastic_notifier_host=args.elastic_notifier_host, elastic_notifier_port=args.elastic_notifier_port - ) + raise NotImplementedError() diff --git a/scribe/hub/__main__.py b/scribe/hub/__main__.py index 519026e..0862e87 100644 --- a/scribe/hub/__main__.py +++ b/scribe/hub/__main__.py @@ -2,8 +2,8 @@ import os import logging import traceback import argparse -from scribe.env import Env from scribe.common import setup_logging +from scribe.hub.env import ServerEnv from scribe.hub.service import HubServerService @@ -11,11 +11,10 @@ def main(): parser = argparse.ArgumentParser( prog='scribe-hub' ) - Env.contribute_to_arg_parser(parser) + ServerEnv.contribute_to_arg_parser(parser) args = parser.parse_args() - try: - env = Env.from_arg_parser(args) + env = ServerEnv.from_arg_parser(args) setup_logging(os.path.join(env.db_dir, 'scribe-hub.log')) server = HubServerService(env) server.run() diff --git a/scribe/hub/env.py b/scribe/hub/env.py new file mode 100644 index 0000000..bbfa99e --- /dev/null +++ b/scribe/hub/env.py @@ -0,0 +1,113 @@ +import re +from scribe.env import Env + + +class ServerEnv(Env): + def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, + prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, + daemon_url=None, host=None, elastic_host=None, elastic_port=None, es_index_prefix=None, + tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None, + payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, + session_timeout=None, drop_client=None, description=None, daily_fee=None, + database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None, + blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None): + super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, + cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids) + self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') + self.host = host if host is not None else self.default('HOST', 'localhost') + self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') + self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200) + self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default( + 'ELASTIC_NOTIFIER_HOST', 'localhost') + self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer( + 'ELASTIC_NOTIFIER_PORT', 19080) + self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '') + # Server stuff + self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None) + self.udp_port = udp_port if udp_port is not None else self.integer('UDP_PORT', self.tcp_port) + self.banner_file = banner_file if banner_file is not None else self.default('BANNER_FILE', None) + self.allow_lan_udp = allow_lan_udp if allow_lan_udp is not None else self.boolean('ALLOW_LAN_UDP', False) + self.country = country if country is not None else self.default('COUNTRY', 'US') + # Peer discovery + self.peer_discovery = self.peer_discovery_enum() + self.peer_announce = peer_announce if peer_announce is not None else self.boolean('PEER_ANNOUNCE', True) + if peer_hubs is not None: + self.peer_hubs = [p.strip("") for p in peer_hubs.split(",")] + else: + self.peer_hubs = self.extract_peer_hubs() + # The electrum client takes the empty string as unspecified + self.payment_address = payment_address if payment_address is not None else self.default('PAYMENT_ADDRESS', '') + self.donation_address = donation_address if donation_address is not None else self.default('DONATION_ADDRESS', + '') + # Server limits to help prevent DoS + self.max_send = max_send if max_send is not None else self.integer('MAX_SEND', 1000000000000000000) + self.max_receive = max_receive if max_receive is not None else self.integer('MAX_RECEIVE', 1000000000000000000) + self.max_sessions = max_sessions if max_sessions is not None else self.sane_max_sessions() + self.session_timeout = session_timeout if session_timeout is not None else self.integer('SESSION_TIMEOUT', 600) + self.drop_client = re.compile(drop_client) if drop_client is not None else self.custom("DROP_CLIENT", None, re.compile) + self.description = description if description is not None else self.default('DESCRIPTION', '') + self.daily_fee = daily_fee if daily_fee is not None else self.string_amount('DAILY_FEE', '0') + self.database_query_timeout = (database_query_timeout / 1000.0) if database_query_timeout is not None else \ + (float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0) + + @classmethod + def contribute_to_arg_parser(cls, parser): + super().contribute_to_arg_parser(parser) + env_daemon_url = cls.default('DAEMON_URL', None) + parser.add_argument('--daemon_url', required=env_daemon_url is None, + help="URL for rpc from lbrycrd or lbcd, " + ":@.", + default=env_daemon_url) + parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'), + help="Interface for hub server to listen on, use 0.0.0.0 to listen on the external " + "interface. Can be set in env with 'HOST'") + parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001), + help="Electrum TCP port to listen on for hub server. Can be set in env with 'TCP_PORT'") + parser.add_argument('--udp_port', type=int, default=cls.integer('UDP_PORT', 50001), + help="'UDP port to listen on for hub server. Can be set in env with 'UDP_PORT'") + parser.add_argument('--max_sessions', type=int, default=cls.integer('MAX_SESSIONS', 100000), + help="Maximum number of electrum clients that can be connected, defaults to 100000.") + parser.add_argument('--max_send', type=int, default=cls.integer('MAX_SESSIONS', 1000000000000000000), + help="Maximum size of a request") + parser.add_argument('--max_receive', type=int, default=cls.integer('MAX_SESSIONS', 1000000000000000000), + help="Maximum size of a response") + parser.add_argument('--drop_client', type=str, default=cls.default('DROP_CLIENT', None), + help="Regex used for blocking clients") + parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600), + help="Session inactivity timeout") + parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str, + help="Hostname or ip address of the elasticsearch instance to connect to. " + "Can be set in env with 'ELASTIC_HOST'") + parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int, + help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'") + parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'), + type=str, help='elasticsearch sync notifier host, defaults to localhost') + parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int, + help='elasticsearch sync notifier port') + parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str) + parser.add_argument('--allow_lan_udp', action='store_true', + help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False)) + parser.add_argument('--description', default=cls.default('DESCRIPTION', None), type=str) + parser.add_argument('--banner_file', default=cls.default('BANNER_FILE', None), type=str) + parser.add_argument('--country', default=cls.default('COUNTRY', 'US'), type=str) + parser.add_argument('--payment_address', default=cls.default('PAYMENT_ADDRESS', None), type=str) + parser.add_argument('--donation_address', default=cls.default('DONATION_ADDRESS', None), type=str) + parser.add_argument('--daily_fee', default=cls.default('DAILY_FEE', '0'), type=str) + parser.add_argument('--query_timeout_ms', default=cls.default('QUERY_TIMEOUT_MS', 10000), type=int) + + @classmethod + def from_arg_parser(cls, args): + return cls( + db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_host=args.elastic_host, + elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain, + es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port, + udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file, + allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes, + cache_all_claim_txos=args.cache_all_claim_txos, country=args.country, payment_address=args.payment_address, + donation_address=args.donation_address, max_send=args.max_send, max_receive=args.max_receive, + max_sessions=args.max_sessions, session_timeout=args.session_timeout, + drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee, + database_query_timeout=args.query_timeout_ms / 1000, blocking_channel_ids=args.blocking_channel_ids, + filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host, + elastic_notifier_port=args.elastic_notifier_port + ) diff --git a/scribe/hub/service.py b/scribe/hub/service.py index afeef59..ccc4d33 100644 --- a/scribe/hub/service.py +++ b/scribe/hub/service.py @@ -1,5 +1,5 @@ import time - +import typing import asyncio from scribe.blockchain.daemon import LBCDaemon from scribe.hub.session import SessionManager @@ -7,11 +7,14 @@ from scribe.hub.mempool import HubMemPool from scribe.hub.udp import StatusServer from scribe.service import BlockchainReaderService from scribe.elasticsearch import ElasticNotifierClientProtocol +if typing.TYPE_CHECKING: + from scribe.hub.env import ServerEnv class HubServerService(BlockchainReaderService): - def __init__(self, env): + def __init__(self, env: 'ServerEnv'): super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker') + self.env = env self.notifications_to_send = [] self.mempool_notifications = set() self.status_server = StatusServer() diff --git a/scribe/hub/session.py b/scribe/hub/session.py index cb4a45a..e350d5c 100644 --- a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -28,7 +28,7 @@ from scribe.hub.common import BatchRequest, ProtocolError, Request, Batch, Notif from scribe.hub.framer import NewlineFramer if typing.TYPE_CHECKING: from scribe.db import HubDB - from scribe.env import Env + from scribe.hub.env import ServerEnv from scribe.blockchain.daemon import LBCDaemon from scribe.hub.mempool import HubMemPool @@ -164,7 +164,7 @@ class SessionManager: namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS ) - def __init__(self, env: 'Env', db: 'HubDB', mempool: 'HubMemPool', + def __init__(self, env: 'ServerEnv', db: 'HubDB', mempool: 'HubMemPool', daemon: 'LBCDaemon', shutdown_event: asyncio.Event, on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]): env.max_send = max(350000, env.max_send) @@ -230,10 +230,6 @@ class SessionManager: host = env.cs_host() if env.tcp_port is not None: await self._start_server('TCP', host, env.tcp_port) - if env.ssl_port is not None: - sslc = ssl.SSLContext(ssl.PROTOCOL_TLS) - sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) - await self._start_server('SSL', host, env.ssl_port, ssl=sslc) async def _close_servers(self, kinds): """Close the servers of the given kinds (TCP etc.).""" @@ -698,7 +694,6 @@ class LBRYElectrumX(asyncio.Protocol): self.kind = kind # 'RPC', 'TCP' etc. self.coin = self.env.coin - self.anon_logs = self.env.anon_logs self.txs_sent = 0 self.log_me = False self.daemon_request = self.session_manager.daemon_request @@ -785,19 +780,6 @@ class LBRYElectrumX(asyncio.Protocol): def default_framer(self): return NewlineFramer(self.env.max_receive) - def peer_address_str(self, *, for_log=True): - """Returns the peer's IP address and port as a human-readable - string, respecting anon logs if the output is for a log.""" - if for_log and self.anon_logs: - return 'xx.xx.xx.xx:xx' - if not self._address: - return 'unknown' - ip_addr_str, port = self._address[:2] - if ':' in ip_addr_str: - return f'[{ip_addr_str}]:{port}' - else: - return f'{ip_addr_str}:{port}' - def toggle_logging(self): self.log_me = not self.log_me @@ -1037,7 +1019,7 @@ class LBRYElectrumX(asyncio.Protocol): await self._send_message(message) return True except asyncio.TimeoutError: - self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True)) + self.logger.info(f"timeout sending address notification to {self._address[0]}:{self._address[1]}") self.abort() return False @@ -1048,7 +1030,7 @@ class LBRYElectrumX(asyncio.Protocol): await self._send_message(message) return True except asyncio.TimeoutError: - self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True)) + self.logger.info(f"timeout sending address notification to {self._address[0]}:{self._address[1]}") self.abort() return False @@ -1078,7 +1060,7 @@ class LBRYElectrumX(asyncio.Protocol): """Return the server features dictionary.""" min_str, max_str = cls.protocol_min_max_strings() cls.cached_server_features.update({ - 'hosts': env.hosts_dict(), + 'hosts': {}, 'pruning': None, 'server_version': cls.version, 'protocol_min': min_str, diff --git a/scribe/service.py b/scribe/service.py index e97ce92..bcde306 100644 --- a/scribe/service.py +++ b/scribe/service.py @@ -28,7 +28,7 @@ class BlockchainService: self.lock = asyncio.Lock() self.last_state: typing.Optional[DBState] = None self.db = HubDB( - env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, + env.coin, env.db_dir, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids, executor=self._executor ) diff --git a/tests/test_revertable.py b/tests/test_revertable.py index 37cbc59..3d566f0 100644 --- a/tests/test_revertable.py +++ b/tests/test_revertable.py @@ -107,7 +107,7 @@ class TestRevertableOpStack(unittest.TestCase): class TestRevertablePrefixDB(unittest.TestCase): def setUp(self): self.tmp_dir = tempfile.mkdtemp() - self.db = PrefixDB(self.tmp_dir, cache_mb=1, max_open_files=32) + self.db = PrefixDB(self.tmp_dir, max_open_files=32) def tearDown(self) -> None: self.db.close()