Merge pull request #28 from lbryio/update-settings

Improve documentation
This commit is contained in:
Jack Robison 2022-05-05 17:27:59 -04:00 committed by GitHub
commit 9fa232e3a1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 682 additions and 323 deletions

View file

@ -34,7 +34,7 @@ docker build -f ./docker/Dockerfile.scribe -t lbry/scribe:development .
Scribe has been tested with python 3.7-3.9. Higher versions probably work but have not yet been tested. Scribe has been tested with python 3.7-3.9. Higher versions probably work but have not yet been tested.
1. clone the scribe scribe 1. clone the scribe repo
``` ```
git clone https://github.com/lbryio/scribe.git git clone https://github.com/lbryio/scribe.git
cd scribe cd scribe
@ -49,21 +49,60 @@ source scribe-venv/bin/activate
pip install -e . pip install -e .
``` ```
That completes the installation, now you should have the commands `scribe`, `scribe-elastic-sync` and `scribe-hub`
These can also optionally be run with `python -m scribe.blockchain`, `python -m scribe.elasticsearch`, and `python -m scribe.hub`
## Usage ## Usage
Scribe needs either the [lbrycrd](https://github.com/lbryio/lbrycrd) or [lbcd](https://github.com/lbryio/lbcd) blockchain daemon to be running. ### Requirements
As of block 1124663 (3/10/22) the size of the rocksdb database is 87GB and the size of the elasticsearch volume is 49GB. Scribe needs elasticsearch and either the [lbrycrd](https://github.com/lbryio/lbrycrd) or [lbcd](https://github.com/lbryio/lbcd) blockchain daemon to be running.
With options for high performance, if you have 64gb of memory and 12 cores, everything can be run on the same machine. However, the recommended way is with elasticsearch on one instance with 8gb of memory and at least 4 cores dedicated to it and the blockchain daemon on another with 16gb of memory and at least 4 cores. Then the scribe hub services can be run their own instance with between 16 and 32gb of memory (depending on settings) and 8 cores.
As of block 1147423 (4/21/22) the size of the scribe rocksdb database is 120GB and the size of the elasticsearch volume is 63GB.
### docker-compose ### docker-compose
The recommended way to run a scribe hub is with docker. See [this guide](https://github.com/lbryio/scribe/blob/master/cluster_guide.md) for instructions.
If you have the resources to run all of the services on one machine (at least 300gb of fast storage, preferably nvme, 64gb of RAM, 12 fast cores), see [this](https://github.com/lbryio/scribe/blob/master/docker/docker-compose.yml) docker-compose example.
### From source ### From source
To start scribe, run the following (providing your own args) ### Options
``` #### Content blocking and filtering
scribe --db_dir /your/db/path --daemon_url rpcuser:rpcpass@localhost:9245
``` For various reasons it may be desirable to block or filtering content from claim search and resolve results, [here](https://github.com/lbryio/scribe/blob/master/blocking.md) are instructions for how to configure and use this feature as well as information about the recommended defaults.
#### Common options across `scribe`, `scribe-hub`, and `scribe-elastic-sync`:
- `--db_dir` (required) Path of the directory containing lbry-rocksdb, set from the environment with `DB_DIRECTORY`
- `--daemon_url` (required for `scribe` and `scribe-hub`) URL for rpc from lbrycrd or lbcd<rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>.
- `--reorg_limit` Max reorg depth, defaults to 200, set from the environment with `REORG_LIMIT`.
- `--chain` With blockchain to use - either `mainnet`, `testnet`, or `regtest` - set from the environment with `NET`
- `--max_query_workers` Size of the thread pool, set from the environment with `MAX_QUERY_WORKERS`
- `--cache_all_tx_hashes` If this flag is set, all tx hashes will be stored in memory. For `scribe`, this speeds up the rate it can apply blocks as well as process mempool. For `scribe-hub`, this will speed up syncing address histories. This setting will use 10+g of memory. It can be set from the environment with `CACHE_ALL_TX_HASHES=Yes`
- `--cache_all_claim_txos` If this flag is set, all claim txos will be indexed in memory. Set from the environment with `CACHE_ALL_CLAIM_TXOS=Yes`
- `--prometheus_port` If provided this port will be used to provide prometheus metrics, set from the environment with `PROMETHEUS_PORT`
#### Options for `scribe`
- `--db_max_open_files` This setting translates into the max_open_files option given to rocksdb. A higher number will use more memory. Defaults to 64.
#### Options for `scribe-elastic-sync`
- `--reindex` If this flag is set drop and rebuild the elasticsearch index.
#### Options for `scribe-hub`
- `--host` Interface for server to listen on, use 0.0.0.0 to listen on the external interface. Can be set from the environment with `HOST`
- `--tcp_port` Electrum TCP port to listen on for hub server. Can be set from the environment with `TCP_PORT`
- `--udp_port` UDP port to listen on for hub server. Can be set from the environment with `UDP_PORT`
- `--elastic_host` Hostname or ip address of the elasticsearch instance to connect to. Can be set from the environment with `ELASTIC_HOST`
- `--elastic_port` Elasticsearch port to connect to. Can be set from the environment with `ELASTIC_PORT`
- `--elastic_notifier_host` Elastic sync notifier host to connect to, defaults to localhost. Can be set from the environment with `ELASTIC_NOTIFIER_HOST`
- `--elastic_notifier_port` Elastic sync notifier port to connect using. Can be set from the environment with `ELASTIC_NOTIFIER_PORT`
- `--query_timeout_ms` Timeout for claim searches in elasticsearch in milliseconds. Can be set from the environment with `QUERY_TIMEOUT_MS`
- `--blocking_channel_ids` Space separated list of channel claim ids used for blocking. Claims that are reposted by these channels can't be resolved or returned in search results. Can be set from the environment with `BLOCKING_CHANNEL_IDS`.
- `--filtering_channel_ids` Space separated list of channel claim ids used for blocking. Claims that are reposted by these channels aren't returned in search results. Can be set from the environment with `FILTERING_CHANNEL_IDS`
## Contributing ## Contributing

23
blocking.md Normal file
View file

@ -0,0 +1,23 @@
### Claim filtering and blocking
- Filtered claims are removed from claim search results (`blockchain.claimtrie.search`), they can still be resolved (`blockchain.claimtrie.resolve`)
- Blocked claims are not included in claim search results and cannot be resolved.
Claims that are either filtered or blocked are replaced with a corresponding error message that includes the censoring channel id in a result that would return them.
#### How to filter or block claims:
1. Make a channel (using lbry-sdk) and include the claim id of the channel in `--filtering_channel_ids` or `--blocking_channel_ids` used by `scribe-hub` **and** `scribe-elastic-sync`, depending on which you want to use the channel for. To use both blocking and filtering, make one channel for each.
2. Using lbry-sdk, repost the claim to be blocked or filtered using your corresponding channel. If you block/filter a claim id for a channel, it will block/filter all of the claims in the channel.
#### Defaults
The example docker-composes in the setup guide use the following defaults:
Filtering:
- `lbry://@LBRY-TagAbuse#770bd7ecba84fd2f7607fb15aedd2b172c2e153f`
- `lbry://@LBRY-UntaggedPorn#95e5db68a3101df19763f3a5182e4b12ba393ee8`
Blocking
- `lbry://@LBRY-DMCA#dd687b357950f6f271999971f43c785e8067c3a9`
- `lbry://@LBRY-DMCARedFlag#06871aa438032244202840ec59a469b303257cad`
- `lbry://@LBRY-OtherUSIllegal#b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6`

194
cluster_guide.md Normal file
View file

@ -0,0 +1,194 @@
## Cluster environment guide
For best performance the recommended setup uses three server instances, these can be rented VPSs, self hosted VMs (ideally not on one physical host unless the host is sufficiently powerful), or physical computers. One is a dedicated lbcd node, one an elasticsearch server, and the third runs the scribe services. With this configuration the lbcd and elasticsearch servers can be shared between multiple scribe hub servers - more on that later.
Server Requirements (space requirements are at least double what's needed so it's possible to copy snapshots into place or make snapshots):
- lbcd: 2 cores, 8gb ram (slightly more may be required syncing from scratch, from a snapshot 8 is plenty), 150gb of NVMe storage
- elasticsearch: 8 cores, 9gb of ram (8gb minimum given to ES), 150gb of SSD speed storage
- scribe: 8 cores, 32gb of ram, 200gb of NVMe storage
All servers are assumed to be running ubuntu 20.04 with user named `lbry` with passwordless sudo and docker group permissions, ssh configured, ulimits set high, and docker + docker-compose installed. The server running elasticsearch should have swap disabled. The three servers need to be able to communicate with each other, they can be on a local network together or communicate over the internet. This guide will assume the three servers are on the internet.
### Setting up the lbcd instance
Log in to the lbcd instance and perform the following steps:
- Build the lbcd docker image by running
```
git clone https://github.com/lbryio/lbcd.git
cd lbcd
docker build . -t lbry/lbcd:latest
```
- Copy the following to `~/docker-compose.yml`
```
version: "3"
volumes:
lbcd:
services:
lbcd:
image: lbry/lbcd:latest
restart: always
network_mode: host
command:
- "--rpcuser=lbry"
- "--rpcpass=lbry"
- "--rpclisten=127.0.0.1"
volumes:
- "lbcd:/root/.lbcd"
ports:
- "127.0.0.1:9245:9245"
- "9246:9246" # p2p port
```
- Start lbcd by running `docker-compose up -d`
- Check the progress with `docker-compose logs -f --tail 100`
### Setting up the elasticsearch instance
Log in to the elasticsearch instance and perform the following steps:
- Copy the following to `~/docker-compose.yml`
```
version: "3"
volumes:
es01:
services:
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.0
container_name: es01
environment:
- node.name=es01
- discovery.type=single-node
- indices.query.bool.max_clause_count=8192
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Dlog4j2.formatMsgNoLookups=true -Xms8g -Xmx8g" # no more than 32, remember to disable swap
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- "es01:/usr/share/elasticsearch/data"
ports:
- "127.0.0.1:9200:9200"
```
- Start elasticsearch by running `docker-compose up -d`
- Check the status with `docker-compose logs -f --tail 100`
### Setting up the scribe hub instance
- Log in (ssh) to the scribe instance and generate and print out a ssh key, this is needed to set up port forwards to the other two instances. Copy the output of the following:
```
ssh-keygen -q -t ed25519 -N '' -f ~/.ssh/id_ed25519 <<<y >/dev/null 2>&1
```
- After copying the above key, log out of the scribe hub instance.
- Log in to the elasticsearch instance add the copied key to `~/.ssh/authorized_keys` (see [this](https://stackoverflow.com/questions/6377009/adding-a-public-key-to-ssh-authorized-keys-does-not-log-me-in-automatically) if confused). Log out of the elasticsearch instance once done.
- Log in to the lbcd instance and add the copied key to `~/.ssh/authorized_keys`, log out when done.
- Log in to the scribe instance and copy the following to `/etc/systemd/system/es-tunnel.service`, replacing `lbry` with your user and `your-elastic-ip` with your elasticsearch instance ip.
```
[Unit]
Description=Persistent SSH Tunnel for ES
After=network.target
[Service]
Restart=on-failure
RestartSec=5
ExecStart=/usr/bin/ssh -NTC -o ServerAliveInterval=60 -o ExitOnForwardFailure=yes -L 127.0.0.1:9200:127.0.0.1:9200 lbry@your-elastic-ip
User=lbry
Group=lbry
[Install]
WantedBy=multi-user.target
```
- Next, copy the following to `/etc/systemd/system/lbcd-tunnel.service` on the scribe instance, replacing `lbry` with your user and `your-lbcd-ip` with your lbcd instance ip.
```
[Unit]
Description=Persistent SSH Tunnel for lbcd
After=network.target
[Service]
Restart=on-failure
RestartSec=5
ExecStart=/usr/bin/ssh -NTC -o ServerAliveInterval=60 -o ExitOnForwardFailure=yes -L 127.0.0.1:9245:127.0.0.1:9245 lbry@your-lbcd-ip
User=lbry
Group=lbry
[Install]
WantedBy=multi-user.target
```
- Verify you can ssh in to the elasticsearch and lbcd instances from the scribe instance
- Enable and start the ssh port forward services on the scribe instance
```
sudo systemctl enable es-tunnel.service
sudo systemctl enable lbcd-tunnel.service
sudo systemctl start es-tunnel.service
sudo systemctl start lbcd-tunnel.service
```
- Build the scribe docker image on the scribe hub instance by running the following:
```
git clone https://github.com/lbryio/scribe.git
cd scribe
docker build -f ./docker/Dockerfile.scribe -t lbry/scribe:development .
```
- Copy the following to `~/docker-compose.yml` on the scribe instance
```
version: "3"
volumes:
lbry_rocksdb:
services:
scribe:
depends_on:
- scribe_elastic_sync
image: lbry/scribe:${SCRIBE_TAG:-development}
restart: always
network_mode: host
volumes:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe
command:
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--max_query_workers=2"
- "--cache_all_tx_hashes"
scribe_elastic_sync:
image: lbry/scribe:${SCRIBE_TAG:-development}
restart: always
network_mode: host
ports:
- "127.0.0.1:19080:19080" # elastic notifier port
volumes:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe-elastic-sync
command:
- "--elastic_host=127.0.0.1"
- "--elastic_port=9200"
- "--max_query_workers=2"
- "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8"
- "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6"
scribe_hub:
depends_on:
- scribe_elastic_sync
- scribe
image: lbry/scribe:${SCRIBE_TAG:-development}
restart: always
network_mode: host
ports:
- "50001:50001" # electrum rpc port and udp ping port
- "2112:2112" # comment out to disable prometheus metrics
volumes:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe-hub
command:
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--elastic_host=127.0.0.1"
- "--elastic_port=9200"
- "--max_query_workers=4"
- "--host=0.0.0.0"
- "--max_sessions=100000"
- "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8"
- "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6"
- "--prometheus_port=2112" # comment out to disable prometheus metrics
```
- Start the scribe hub services by running `docker-compose up -d`
- Check the status with `docker-compose logs -f --tail 100`

View file

@ -39,18 +39,8 @@ RUN python3.9 docker/set_build.py
RUN rm ~/.cache -rf RUN rm ~/.cache -rf
# entry point # entry point
ARG host=localhost
ARG tcp_port=50001
ARG daemon_url=http://lbry:lbry@localhost:9245/
VOLUME $db_dir VOLUME $db_dir
ENV TCP_PORT=$tcp_port
ENV HOST=$host
ENV DAEMON_URL=$daemon_url
ENV DB_DIRECTORY=$db_dir ENV DB_DIRECTORY=$db_dir
ENV MAX_SESSIONS=100000
ENV MAX_SEND=1000000000000000000
ENV MAX_RECEIVE=1000000000000000000
COPY ./docker/scribe_entrypoint.sh /entrypoint.sh COPY ./docker/scribe_entrypoint.sh /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"] ENTRYPOINT ["/entrypoint.sh"]

View file

@ -1,28 +1,30 @@
version: "3" version: "3"
volumes: volumes:
lbcd:
lbry_rocksdb: lbry_rocksdb:
es01: es01:
services: services:
scribe: scribe:
depends_on: depends_on:
- lbcd
- scribe_elastic_sync - scribe_elastic_sync
image: lbry/scribe:${SCRIBE_TAG:-latest-release} image: lbry/scribe:${SCRIBE_TAG:-latest}
restart: always restart: always
network_mode: host network_mode: host
volumes: volumes:
- "lbry_rocksdb:/database" - "lbry_rocksdb:/database"
environment: environment:
- HUB_COMMAND=scribe - HUB_COMMAND=scribe
- DAEMON_URL=http://lbry:lbry@127.0.0.1:9245 command: # for full options, see `scribe --help`
- MAX_QUERY_WORKERS=2 - "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8 - "--max_query_workers=2"
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6 # - "--cache_all_tx_hashes" # uncomment to keep an index of all tx hashes in memory. This uses lots (10+g) of memory but substantially improves performance.
scribe_elastic_sync: scribe_elastic_sync:
depends_on: depends_on:
- es01 - es01
image: lbry/scribe:${SCRIBE_TAG:-latest-release} image: lbry/scribe:${SCRIBE_TAG:-latest}
restart: always restart: always
network_mode: host network_mode: host
ports: ports:
@ -31,36 +33,42 @@ services:
- "lbry_rocksdb:/database" - "lbry_rocksdb:/database"
environment: environment:
- HUB_COMMAND=scribe-elastic-sync - HUB_COMMAND=scribe-elastic-sync
- MAX_QUERY_WORKERS=2 command: # for full options, see `scribe-elastic-sync --help`
- ELASTIC_HOST=127.0.0.1 - "--max_query_workers=2"
- ELASTIC_PORT=9200 - "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8"
- FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8 - "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6"
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6 - "--elastic_host=127.0.0.1" # elasticsearch host
- "--elastic_port=9200" # elasticsearch port
- "--elastic_notifier_host=127.0.0.1" # address for the elastic sync notifier to connect to
- "--elastic_notifier_port=19080"
scribe_hub: scribe_hub:
depends_on: depends_on:
- lbcd
- scribe_elastic_sync - scribe_elastic_sync
- scribe - scribe
image: lbry/scribe:${SCRIBE_TAG:-latest-release} image: lbry/scribe:${SCRIBE_TAG:-latest}
restart: always restart: always
network_mode: host network_mode: host
ports: ports:
- "50001:50001" # electrum rpc port and udp ping port - "50001:50001" # electrum rpc port and udp ping port
- "2112:2112" # comment out to disable prometheus - "2112:2112" # comment out to disable prometheus metrics
volumes: volumes:
- "lbry_rocksdb:/database" - "lbry_rocksdb:/database"
environment: environment:
- HUB_COMMAND=scribe-hub - HUB_COMMAND=scribe-hub
- DAEMON_URL=http://lbry:lbry@127.0.0.1:9245 # used for broadcasting transactions command: # for full options, see `scribe-hub --help`
- MAX_QUERY_WORKERS=4 # reader threads - "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- MAX_SESSIONS=100000 - "--max_query_workers=4"
- ELASTIC_HOST=127.0.0.1 - "--host=0.0.0.0"
- ELASTIC_PORT=9200 - "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8"
- HOST=0.0.0.0 - "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6"
- PROMETHEUS_PORT=2112 - "--prometheus_port=2112" # comment out to disable prometheus metrics
- TCP_PORT=50001 # - "--elastic_host=127.0.0.1" # elasticsearch host
- ALLOW_LAN_UDP=No # - "--elastic_port=9200" # elasticsearch port
- FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8 # - "--elastic_notifier_host=127.0.0.1" # address for the elastic sync notifier to connect to
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6 # - "--elastic_notifier_port=19080"
# - "--max_sessions=100000 # uncomment to increase the maximum number of electrum connections, defaults to 1000
# - "--allow_lan_udp" # uncomment to reply to clients on the local network
es01: es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.0 image: docker.elastic.co/elasticsearch/elasticsearch:7.16.0
container_name: es01 container_name: es01
@ -78,3 +86,17 @@ services:
- "es01:/usr/share/elasticsearch/data" - "es01:/usr/share/elasticsearch/data"
ports: ports:
- "127.0.0.1:9200:9200" - "127.0.0.1:9200:9200"
lbcd:
image: lbry/lbcd:latest
restart: always
network_mode: host
command:
- "--notls"
- "--listen=0.0.0.0:9246"
- "--rpclisten=127.0.0.1:9245"
- "--rpcuser=lbry"
- "--rpcpass=lbry"
volumes:
- "lbcd:/root/.lbcd"
ports:
- "9246:9246" # p2p

View file

@ -0,0 +1,23 @@
version: "3"
volumes:
es01:
services:
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.0
container_name: es01
environment:
- node.name=es01
- discovery.type=single-node
- indices.query.bool.max_clause_count=8192
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Dlog4j2.formatMsgNoLookups=true -Xms8g -Xmx8g" # no more than 32, remember to disable swap
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- "es01:/usr/share/elasticsearch/data"
ports:
- "127.0.0.1:9200:9200"

60
docker/hub-compose.yml Normal file
View file

@ -0,0 +1,60 @@
version: "3"
volumes:
lbry_rocksdb:
services:
scribe:
depends_on:
- scribe_elastic_sync
image: lbry/scribe:${SCRIBE_TAG:-development}
restart: always
network_mode: host
volumes:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe
command:
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--max_query_workers=2"
- "--cache_all_tx_hashes"
scribe_elastic_sync:
image: lbry/scribe:${SCRIBE_TAG:-development}
restart: always
network_mode: host
ports:
- "127.0.0.1:19080:19080" # elastic notifier port
volumes:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe-elastic-sync
command:
- "--elastic_host=127.0.0.1"
- "--elastic_port=9200"
- "--max_query_workers=2"
- "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8"
- "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6"
scribe_hub:
depends_on:
- scribe_elastic_sync
- scribe
image: lbry/scribe:${SCRIBE_TAG:-development}
restart: always
network_mode: host
ports:
- "50001:50001" # electrum rpc port and udp ping port
- "2112:2112" # comment out to disable prometheus metrics
volumes:
- "lbry_rocksdb:/database"
environment:
- HUB_COMMAND=scribe-hub
command:
- "--daemon_url=http://lbry:lbry@127.0.0.1:9245"
- "--elastic_host=127.0.0.1"
- "--elastic_port=9200"
- "--max_query_workers=4"
- "--host=0.0.0.0"
- "--max_sessions=100000"
- "--filtering_channel_ids=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8"
- "--blocking_channel_ids=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6"
- "--prometheus_port=2112" # comment out to disable prometheus metrics

19
docker/lbcd-compose.yml Normal file
View file

@ -0,0 +1,19 @@
version: "3"
volumes:
lbcd:
services:
lbcd:
image: lbry/lbcd:latest
restart: always
network_mode: host
command:
- "--rpcuser=lbry"
- "--rpcpass=lbry"
- "--rpclisten=127.0.0.1"
volumes:
- "lbcd:/root/.lbcd"
ports:
- "127.0.0.1:9245:9245"
- "9246:9246" # p2p port

View file

@ -12,6 +12,6 @@ fi
case "$HUB_COMMAND" in case "$HUB_COMMAND" in
scribe ) exec /home/lbry/.local/bin/scribe "$@" ;; scribe ) exec /home/lbry/.local/bin/scribe "$@" ;;
scribe-hub ) exec /home/lbry/.local/bin/scribe-hub "$@" ;; scribe-hub ) exec /home/lbry/.local/bin/scribe-hub "$@" ;;
scribe-elastic-sync ) exec /home/lbry/.local/bin/scribe-elastic-sync ;; scribe-elastic-sync ) exec /home/lbry/.local/bin/scribe-elastic-sync "$@" ;;
* ) "HUB_COMMAND env variable must be scribe, scribe-hub, or scribe-elastic-sync" && exit 1 ;; * ) "HUB_COMMAND env variable must be scribe, scribe-hub, or scribe-elastic-sync" && exit 1 ;;
esac esac

View file

@ -2,8 +2,8 @@ import os
import logging import logging
import traceback import traceback
import argparse import argparse
from scribe.env import Env
from scribe.common import setup_logging from scribe.common import setup_logging
from scribe.blockchain.env import BlockchainEnv
from scribe.blockchain.service import BlockchainProcessorService from scribe.blockchain.service import BlockchainProcessorService
@ -11,11 +11,11 @@ def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog='scribe' prog='scribe'
) )
Env.contribute_to_arg_parser(parser) BlockchainEnv.contribute_to_arg_parser(parser)
args = parser.parse_args() args = parser.parse_args()
try: try:
env = Env.from_arg_parser(args) env = BlockchainEnv.from_arg_parser(args)
setup_logging(os.path.join(env.db_dir, 'scribe.log')) setup_logging(os.path.join(env.db_dir, 'scribe.log'))
block_processor = BlockchainProcessorService(env) block_processor = BlockchainProcessorService(env)
block_processor.run() block_processor.run()

33
scribe/blockchain/env.py Normal file
View file

@ -0,0 +1,33 @@
from scribe.env import Env
class BlockchainEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
blocking_channel_ids=None, filtering_channel_ids=None,
db_max_open_files=64, daemon_url=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
self.db_max_open_files = db_max_open_files
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
@classmethod
def contribute_to_arg_parser(cls, parser):
super().contribute_to_arg_parser(parser)
env_daemon_url = cls.default('DAEMON_URL', None)
parser.add_argument('--daemon_url', required=env_daemon_url is None,
help="URL for rpc from lbrycrd or lbcd, "
"<rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>.",
default=env_daemon_url)
parser.add_argument('--db_max_open_files', type=int, default=64,
help='This setting translates into the max_open_files option given to rocksdb. '
'A higher number will use more memory. Defaults to 64.')
@classmethod
def from_arg_parser(cls, args):
return cls(
db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit,
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos
)

View file

@ -19,7 +19,7 @@ from scribe.blockchain.mempool import MemPool
from scribe.schema.url import normalize_name from scribe.schema.url import normalize_name
from scribe.service import BlockchainService from scribe.service import BlockchainService
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from scribe.env import Env from scribe.blockchain.env import BlockchainEnv
from scribe.db.revertable import RevertableOpStack from scribe.db.revertable import RevertableOpStack
@ -43,7 +43,7 @@ class BlockchainProcessorService(BlockchainService):
"reorg_count", "Number of reorgs", namespace=NAMESPACE "reorg_count", "Number of reorgs", namespace=NAMESPACE
) )
def __init__(self, env: 'Env'): def __init__(self, env: 'BlockchainEnv'):
super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor') super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor')
self.daemon = LBCDaemon(env.coin, env.daemon_url) self.daemon = LBCDaemon(env.coin, env.daemon_url)
self.mempool = MemPool(env.coin, self.db) self.mempool = MemPool(env.coin, self.db)

View file

@ -36,7 +36,7 @@ NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db"
class HubDB: class HubDB:
DB_VERSIONS = [7, 8] DB_VERSIONS = [7, 8]
def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200, def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None, secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None): filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None):
@ -45,7 +45,6 @@ class HubDB:
self._executor = executor self._executor = executor
self._db_dir = db_dir self._db_dir = db_dir
self._cache_MB = cache_MB
self._reorg_limit = reorg_limit self._reorg_limit = reorg_limit
self._cache_all_claim_txos = cache_all_claim_txos self._cache_all_claim_txos = cache_all_claim_txos
self._cache_all_tx_hashes = cache_all_tx_hashes self._cache_all_tx_hashes = cache_all_tx_hashes
@ -820,8 +819,7 @@ class HubDB:
) )
db_path = os.path.join(self._db_dir, 'lbry-rocksdb') db_path = os.path.join(self._db_dir, 'lbry-rocksdb')
self.prefix_db = PrefixDB( self.prefix_db = PrefixDB(
db_path, cache_mb=self._cache_MB, db_path, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,
reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,
unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix, HashXMempoolStatusPrefixRow.prefix}, unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix, HashXMempoolStatusPrefixRow.prefix},
secondary_path=secondary_path secondary_path=secondary_path
) )

View file

@ -1694,7 +1694,7 @@ class HashXMempoolStatusPrefixRow(PrefixRow):
class PrefixDB(BasePrefixDB): class PrefixDB(BasePrefixDB):
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 64, def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64,
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None): secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
super().__init__(path, max_open_files=max_open_files, secondary_path=secondary_path, super().__init__(path, max_open_files=max_open_files, secondary_path=secondary_path,
max_undo_depth=reorg_limit, unsafe_prefixes=unsafe_prefixes) max_undo_depth=reorg_limit, unsafe_prefixes=unsafe_prefixes)

View file

@ -2,8 +2,8 @@ import os
import logging import logging
import traceback import traceback
import argparse import argparse
from scribe.env import Env
from scribe.common import setup_logging from scribe.common import setup_logging
from scribe.elasticsearch.env import ElasticEnv
from scribe.elasticsearch.service import ElasticSyncService from scribe.elasticsearch.service import ElasticSyncService
@ -11,12 +11,11 @@ def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog='scribe-elastic-sync' prog='scribe-elastic-sync'
) )
Env.contribute_to_arg_parser(parser) ElasticEnv.contribute_to_arg_parser(parser)
parser.add_argument('--reindex', type=bool, default=False)
args = parser.parse_args() args = parser.parse_args()
try: try:
env = Env.from_arg_parser(args) env = ElasticEnv.from_arg_parser(args)
setup_logging(os.path.join(env.db_dir, 'scribe-elastic-sync.log')) setup_logging(os.path.join(env.db_dir, 'scribe-elastic-sync.log'))
server = ElasticSyncService(env) server = ElasticSyncService(env)
server.run(args.reindex) server.run(args.reindex)

View file

@ -0,0 +1,54 @@
from scribe.env import Env
class ElasticEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None,
cache_all_tx_hashes=None, cache_all_claim_txos=None, elastic_host=None, elastic_port=None,
es_index_prefix=None, elastic_notifier_host=None, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None, reindex=False):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
'ELASTIC_NOTIFIER_HOST', 'localhost')
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer(
'ELASTIC_NOTIFIER_PORT', 19080)
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
# Filtering / Blocking
self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default(
'BLOCKING_CHANNEL_IDS', '').split(' ')
self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default(
'FILTERING_CHANNEL_IDS', '').split(' ')
self.reindex = reindex if reindex is not None else self.boolean('REINDEX_ES', False)
@classmethod
def contribute_to_arg_parser(cls, parser):
super().contribute_to_arg_parser(parser)
parser.add_argument('--reindex', default=False, help="Drop and rebuild the elasticsearch index.",
action='store_true')
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
help="Hostname or ip address of the elasticsearch instance to connect to. "
"Can be set in env with 'ELASTIC_HOST'")
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'")
parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'),
type=str, help='elasticsearch sync notifier host, defaults to localhost')
parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
help='elasticsearch sync notifier port')
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
parser.add_argument('--query_timeout_ms', type=int, default=cls.integer('QUERY_TIMEOUT_MS', 10000),
help="Elasticsearch query timeout, in ms. Can be set in env with 'QUERY_TIMEOUT_MS'")
@classmethod
def from_arg_parser(cls, args):
return cls(
db_dir=args.db_dir, elastic_host=args.elastic_host,
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit,
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
elastic_notifier_port=args.elastic_notifier_port
)

View file

@ -13,13 +13,16 @@ from scribe.elasticsearch.notifier_protocol import ElasticNotifierProtocol
from scribe.elasticsearch.search import IndexVersionMismatch, expand_query from scribe.elasticsearch.search import IndexVersionMismatch, expand_query
from scribe.elasticsearch.constants import ALL_FIELDS, INDEX_DEFAULT_SETTINGS from scribe.elasticsearch.constants import ALL_FIELDS, INDEX_DEFAULT_SETTINGS
from scribe.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT from scribe.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
if typing.TYPE_CHECKING:
from scribe.elasticsearch.env import ElasticEnv
class ElasticSyncService(BlockchainReaderService): class ElasticSyncService(BlockchainReaderService):
VERSION = 1 VERSION = 1
def __init__(self, env): def __init__(self, env: 'ElasticEnv'):
super().__init__(env, 'lbry-elastic-writer', thread_workers=1, thread_prefix='lbry-elastic-writer') super().__init__(env, 'lbry-elastic-writer', thread_workers=1, thread_prefix='lbry-elastic-writer')
self.env = env
# self._refresh_interval = 0.1 # self._refresh_interval = 0.1
self._task = None self._task = None
self.index = self.env.es_index_prefix + 'claims' self.index = self.env.es_index_prefix + 'claims'

View file

@ -3,7 +3,6 @@ import re
import resource import resource
import logging import logging
from collections import namedtuple from collections import namedtuple
from ipaddress import ip_address
from scribe.blockchain.network import LBCMainNet, LBCTestNet, LBCRegTest from scribe.blockchain.network import LBCMainNet, LBCTestNet, LBCRegTest
@ -30,31 +29,12 @@ class Env:
class Error(Exception): class Error(Exception):
pass pass
def __init__(self, db_dir=None, daemon_url=None, host=None, elastic_host=None, def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
elastic_port=None, loop_policy=None, max_query_workers=None, prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
chain=None, es_index_prefix=None, cache_MB=None, reorg_limit=None, tcp_port=None, blocking_channel_ids=None, filtering_channel_ids=None):
udp_port=None, ssl_port=None, ssl_certfile=None, ssl_keyfile=None,
prometheus_port=None, max_subscriptions=None, banner_file=None, anon_logs=None, log_sessions=None,
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, db_max_open_files=64, elastic_notifier_host=None,
elastic_notifier_port=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None,
peer_announce=None):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY') self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.db_max_open_files = db_max_open_files
self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default('ELASTIC_NOTIFIER_HOST', 'localhost')
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer('ELASTIC_NOTIFIER_PORT', 19080)
self.loop_policy = self.set_event_loop_policy(
loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None)
)
self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK']) self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK'])
self.max_query_workers = max_query_workers if max_query_workers is not None else self.integer('MAX_QUERY_WORKERS', 4) self.max_query_workers = max_query_workers if max_query_workers is not None else self.integer('MAX_QUERY_WORKERS', 4)
if chain == 'mainnet': if chain == 'mainnet':
@ -63,61 +43,15 @@ class Env:
self.coin = LBCTestNet self.coin = LBCTestNet
else: else:
self.coin = LBCRegTest self.coin = LBCRegTest
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
self.cache_MB = cache_MB if cache_MB is not None else self.integer('CACHE_MB', 1024)
self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
# Server stuff
self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
self.udp_port = udp_port if udp_port is not None else self.integer('UDP_PORT', self.tcp_port)
self.ssl_port = ssl_port if ssl_port is not None else self.integer('SSL_PORT', None)
if self.ssl_port:
self.ssl_certfile = ssl_certfile if ssl_certfile is not None else self.required('SSL_CERTFILE')
self.ssl_keyfile = ssl_keyfile if ssl_keyfile is not None else self.required('SSL_KEYFILE')
self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0) self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0)
self.max_subscriptions = max_subscriptions if max_subscriptions is not None else self.integer('MAX_SUBSCRIPTIONS', 10000)
self.banner_file = banner_file if banner_file is not None else self.default('BANNER_FILE', None)
# self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file)
self.anon_logs = anon_logs if anon_logs is not None else self.boolean('ANON_LOGS', False)
self.log_sessions = log_sessions if log_sessions is not None else self.integer('LOG_SESSIONS', 3600)
self.allow_lan_udp = allow_lan_udp if allow_lan_udp is not None else self.boolean('ALLOW_LAN_UDP', False)
self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False) self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False)
self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False) self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False)
self.country = country if country is not None else self.default('COUNTRY', 'US')
# Peer discovery
self.peer_discovery = self.peer_discovery_enum()
self.peer_announce = peer_announce if peer_announce is not None else self.boolean('PEER_ANNOUNCE', True)
if peer_hubs is not None:
self.peer_hubs = [p.strip("") for p in peer_hubs.split(",")]
else:
self.peer_hubs = self.extract_peer_hubs()
# self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost')
# self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None)
# The electrum client takes the empty string as unspecified
self.payment_address = payment_address if payment_address is not None else self.default('PAYMENT_ADDRESS', '')
self.donation_address = donation_address if donation_address is not None else self.default('DONATION_ADDRESS', '')
# Server limits to help prevent DoS
self.max_send = max_send if max_send is not None else self.integer('MAX_SEND', 1000000)
self.max_receive = max_receive if max_receive is not None else self.integer('MAX_RECEIVE', 1000000)
# self.max_subs = self.integer('MAX_SUBS', 250000)
self.max_sessions = max_sessions if max_sessions is not None else self.sane_max_sessions()
# self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
self.session_timeout = session_timeout if session_timeout is not None else self.integer('SESSION_TIMEOUT', 600)
self.drop_client = drop_client if drop_client is not None else self.custom("DROP_CLIENT", None, re.compile)
self.description = description if description is not None else self.default('DESCRIPTION', '')
self.daily_fee = daily_fee if daily_fee is not None else self.string_amount('DAILY_FEE', '0')
# Identities
clearnet_identity = self.clearnet_identity()
tor_identity = self.tor_identity(clearnet_identity)
self.identities = [identity
for identity in (clearnet_identity, tor_identity)
if identity is not None]
self.database_query_timeout = database_query_timeout if database_query_timeout is not None else \
(float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0)
# Filtering / Blocking # Filtering / Blocking
self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default('BLOCKING_CHANNEL_IDS', '').split(' ') self.blocking_channel_ids = blocking_channel_ids if blocking_channel_ids is not None else self.default(
self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default('FILTERING_CHANNEL_IDS', '').split(' ') 'BLOCKING_CHANNEL_IDS', '').split(' ')
self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default(
'FILTERING_CHANNEL_IDS', '').split(' ')
@classmethod @classmethod
def default(cls, envvar, default): def default(cls, envvar, default):
@ -169,19 +103,6 @@ class Env:
if bad: if bad:
raise cls.Error(f'remove obsolete os.environment variables {bad}') raise cls.Error(f'remove obsolete os.environment variables {bad}')
@classmethod
def set_event_loop_policy(cls, policy_name: str = None):
if not policy_name or policy_name == 'default':
import asyncio
return asyncio.get_event_loop_policy()
elif policy_name == 'uvloop':
import uvloop
import asyncio
loop_policy = uvloop.EventLoopPolicy()
asyncio.set_event_loop_policy(loop_policy)
return loop_policy
raise cls.Error(f'unknown event loop policy "{policy_name}"')
def cs_host(self): def cs_host(self):
"""Returns the 'host' argument to pass to asyncio's create_server """Returns the 'host' argument to pass to asyncio's create_server
call. The result can be a single host name string, a list of call. The result can be a single host name string, a list of
@ -213,67 +134,6 @@ class Env:
f'because your open file limit is {nofile_limit:,d}') f'because your open file limit is {nofile_limit:,d}')
return value return value
def clearnet_identity(self):
host = self.default('REPORT_HOST', None)
if host is None:
return None
try:
ip = ip_address(host)
except ValueError:
bad = (not is_valid_hostname(host)
or host.lower() == 'localhost')
else:
bad = (ip.is_multicast or ip.is_unspecified
or (ip.is_private and self.peer_announce))
if bad:
raise self.Error(f'"{host}" is not a valid REPORT_HOST')
tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port) or None
ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port) or None
if tcp_port == ssl_port:
raise self.Error('REPORT_TCP_PORT and REPORT_SSL_PORT '
f'both resolve to {tcp_port}')
return NetIdentity(
host,
tcp_port,
ssl_port,
''
)
def tor_identity(self, clearnet):
host = self.default('REPORT_HOST_TOR', None)
if host is None:
return None
if not host.endswith('.onion'):
raise self.Error(f'tor host "{host}" must end with ".onion"')
def port(port_kind):
"""Returns the clearnet identity port, if any and not zero,
otherwise the listening port."""
result = 0
if clearnet:
result = getattr(clearnet, port_kind)
return result or getattr(self, port_kind)
tcp_port = self.integer('REPORT_TCP_PORT_TOR',
port('tcp_port')) or None
ssl_port = self.integer('REPORT_SSL_PORT_TOR',
port('ssl_port')) or None
if tcp_port == ssl_port:
raise self.Error('REPORT_TCP_PORT_TOR and REPORT_SSL_PORT_TOR '
f'both resolve to {tcp_port}')
return NetIdentity(
host,
tcp_port,
ssl_port,
'_tor',
)
def hosts_dict(self):
return {identity.host: {'tcp_port': identity.tcp_port,
'ssl_port': identity.ssl_port}
for identity in self.identities}
def peer_discovery_enum(self): def peer_discovery_enum(self):
pd = self.default('PEER_DISCOVERY', 'on').strip().lower() pd = self.default('PEER_DISCOVERY', 'on').strip().lower()
if pd in ('off', ''): if pd in ('off', ''):
@ -291,98 +151,43 @@ class Env:
@classmethod @classmethod
def contribute_to_arg_parser(cls, parser): def contribute_to_arg_parser(cls, parser):
parser.add_argument('--db_dir', type=str, help='path of the directory containing lbry-leveldb', """
default=cls.default('DB_DIRECTORY', None)) Settings used by all services
parser.add_argument('--daemon_url', """
help='URL for rpc from lbrycrd, <rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>',
default=cls.default('DAEMON_URL', None))
parser.add_argument('--db_max_open_files', type=int, default=64,
help='number of files rocksdb can have open at a time')
parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'),
help='Interface for hub server to listen on')
parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001),
help='TCP port to listen on for hub server')
parser.add_argument('--udp_port', type=int, default=cls.integer('UDP_PORT', 50001),
help='UDP port to listen on for hub server')
env_db_dir = cls.default('DB_DIRECTORY', None)
parser.add_argument('--ssl_port', default=cls.integer('SSL_PORT', None), type=int, parser.add_argument('--db_dir', type=str, required=env_db_dir is None,
help='SSL port to listen on for hub server') help="Path of the directory containing lbry-rocksdb. ", default=env_db_dir)
parser.add_argument('--ssl_certfile', default=cls.default('SSL_CERTFILE', None), type=str,
help='Path to SSL cert file')
parser.add_argument('--ssl_keyfile', default=cls.default('SSL_KEYFILE', None), type=str,
help='Path to SSL key file')
parser.add_argument('--reorg_limit', default=cls.integer('REORG_LIMIT', 200), type=int, help='Max reorg depth') parser.add_argument('--reorg_limit', default=cls.integer('REORG_LIMIT', 200), type=int, help='Max reorg depth')
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
help='elasticsearch host, defaults to localhost')
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
help='elasticsearch port, defaults to 9200')
parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'),
type=str, help='elasticsearch sync notifier host, defaults to localhost')
parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
help='elasticsearch sync notifier port')
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
parser.add_argument('--loop_policy', default=cls.default('EVENT_LOOP_POLICY', 'default'), type=str,
choices=['default', 'uvloop'])
parser.add_argument('--max_query_workers', type=int, default=cls.integer('MAX_QUERY_WORKERS', 4),
help='number of threads used by the request handler to read the database')
parser.add_argument('--cache_MB', type=int, default=cls.integer('CACHE_MB', 1024),
help='size of the leveldb lru cache, in megabytes')
parser.add_argument('--cache_all_tx_hashes', type=bool,
help='Load all tx hashes into memory. This will make address subscriptions and sync, '
'resolve, transaction fetching, and block sync all faster at the expense of higher '
'memory usage')
parser.add_argument('--cache_all_claim_txos', type=bool,
help='Load all claim txos into memory. This will make address subscriptions and sync, '
'resolve, transaction fetching, and block sync all faster at the expense of higher '
'memory usage')
parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0),
help='port for hub prometheus metrics to listen on, disabled by default')
parser.add_argument('--max_subscriptions', type=int, default=cls.integer('MAX_SUBSCRIPTIONS', 10000),
help='max subscriptions per connection')
parser.add_argument('--banner_file', type=str, default=cls.default('BANNER_FILE', None),
help='path to file containing banner text')
parser.add_argument('--anon_logs', type=bool, default=cls.boolean('ANON_LOGS', False),
help="don't log ip addresses")
parser.add_argument('--allow_lan_udp', type=bool, default=cls.boolean('ALLOW_LAN_UDP', False),
help='reply to hub UDP ping messages from LAN ip addresses')
parser.add_argument('--country', type=str, default=cls.default('COUNTRY', 'US'), help='')
parser.add_argument('--max_send', type=int, default=cls.default('MAX_SEND', 1000000), help='')
parser.add_argument('--max_receive', type=int, default=cls.default('MAX_RECEIVE', 1000000), help='')
parser.add_argument('--max_sessions', type=int, default=cls.default('MAX_SESSIONS', 1000), help='')
parser.add_argument('--session_timeout', type=int, default=cls.default('SESSION_TIMEOUT', 600), help='')
parser.add_argument('--drop_client', type=str, default=cls.default('DROP_CLIENT', None), help='')
parser.add_argument('--description', type=str, default=cls.default('DESCRIPTION', ''), help='')
parser.add_argument('--daily_fee', type=float, default=cls.default('DAILY_FEE', 0.0), help='')
parser.add_argument('--payment_address', type=str, default=cls.default('PAYMENT_ADDRESS', ''), help='')
parser.add_argument('--donation_address', type=str, default=cls.default('DONATION_ADDRESS', ''), help='')
parser.add_argument('--chain', type=str, default=cls.default('NET', 'mainnet'), parser.add_argument('--chain', type=str, default=cls.default('NET', 'mainnet'),
help="Which chain to use, default is mainnet", choices=['mainnet', 'regtest', 'testnet']) help="Which chain to use, default is mainnet, others are used for testing",
parser.add_argument('--query_timeout_ms', type=int, default=cls.integer('QUERY_TIMEOUT_MS', 10000), choices=['mainnet', 'regtest', 'testnet'])
help="elasticsearch query timeout") parser.add_argument('--max_query_workers', type=int, default=cls.integer('MAX_QUERY_WORKERS', 4),
help="Size of the thread pool. Can be set in env with 'MAX_QUERY_WORKERS'")
parser.add_argument('--blocking_channel_ids', nargs='*', help='', parser.add_argument('--cache_all_tx_hashes', action='store_true',
help="Load all tx hashes into memory. This will make address subscriptions and sync, "
"resolve, transaction fetching, and block sync all faster at the expense of higher "
"memory usage (at least 10GB more). Can be set in env with 'CACHE_ALL_TX_HASHES'.",
default=cls.boolean('CACHE_ALL_TX_HASHES', False))
parser.add_argument('--cache_all_claim_txos', action='store_true',
help="Load all claim txos into memory. This will make address subscriptions and sync, "
"resolve, transaction fetching, and block sync all faster at the expense of higher "
"memory usage. Can be set in env with 'CACHE_ALL_CLAIM_TXOS'.",
default=cls.boolean('CACHE_ALL_CLAIM_TXOS', False))
parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0),
help="Port for prometheus metrics to listen on, disabled by default. "
"Can be set in env with 'PROMETHEUS_PORT'.")
parser.add_argument('--blocking_channel_ids', nargs='*',
help="Space separated list of channel claim ids used for blocking. "
"Claims that are reposted by these channels can't be resolved "
"or returned in search results. Can be set in env with 'BLOCKING_CHANNEL_IDS'",
default=cls.default('BLOCKING_CHANNEL_IDS', '').split(' ')) default=cls.default('BLOCKING_CHANNEL_IDS', '').split(' '))
parser.add_argument('--filtering_channel_ids', nargs='*', help='', parser.add_argument('--filtering_channel_ids', nargs='*',
help="Space separated list of channel claim ids used for blocking. "
"Claims that are reposted by these channels aren't returned in search results. "
"Can be set in env with 'FILTERING_CHANNEL_IDS'",
default=cls.default('FILTERING_CHANNEL_IDS', '').split(' ')) default=cls.default('FILTERING_CHANNEL_IDS', '').split(' '))
@classmethod @classmethod
def from_arg_parser(cls, args): def from_arg_parser(cls, args):
return cls( raise NotImplementedError()
db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
host=args.host, elastic_host=args.elastic_host, elastic_port=args.elastic_port,
loop_policy=args.loop_policy, max_query_workers=args.max_query_workers,
chain=args.chain, es_index_prefix=args.es_index_prefix,
cache_MB=args.cache_MB, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
udp_port=args.udp_port, ssl_port=args.ssl_port, ssl_certfile=args.ssl_certfile,
ssl_keyfile=args.ssl_keyfile, prometheus_port=args.prometheus_port,
max_subscriptions=args.max_subscriptions, banner_file=args.banner_file, anon_logs=args.anon_logs,
log_sessions=None, allow_lan_udp=args.allow_lan_udp,
cache_all_tx_hashes=args.cache_all_tx_hashes, cache_all_claim_txos=args.cache_all_claim_txos,
country=args.country, payment_address=args.payment_address, donation_address=args.donation_address,
max_send=args.max_send, max_receive=args.max_receive, max_sessions=args.max_sessions,
session_timeout=args.session_timeout, drop_client=args.drop_client, description=args.description,
daily_fee=args.daily_fee, database_query_timeout=(args.query_timeout_ms / 1000),
blocking_channel_ids=args.blocking_channel_ids, filtering_channel_ids=args.filtering_channel_ids,
elastic_notifier_host=args.elastic_notifier_host, elastic_notifier_port=args.elastic_notifier_port
)

View file

@ -2,8 +2,8 @@ import os
import logging import logging
import traceback import traceback
import argparse import argparse
from scribe.env import Env
from scribe.common import setup_logging from scribe.common import setup_logging
from scribe.hub.env import ServerEnv
from scribe.hub.service import HubServerService from scribe.hub.service import HubServerService
@ -11,11 +11,10 @@ def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog='scribe-hub' prog='scribe-hub'
) )
Env.contribute_to_arg_parser(parser) ServerEnv.contribute_to_arg_parser(parser)
args = parser.parse_args() args = parser.parse_args()
try: try:
env = Env.from_arg_parser(args) env = ServerEnv.from_arg_parser(args)
setup_logging(os.path.join(env.db_dir, 'scribe-hub.log')) setup_logging(os.path.join(env.db_dir, 'scribe-hub.log'))
server = HubServerService(env) server = HubServerService(env)
server.run() server.run()

113
scribe/hub/env.py Normal file
View file

@ -0,0 +1,113 @@
import re
from scribe.env import Env
class ServerEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
daemon_url=None, host=None, elastic_host=None, elastic_port=None, es_index_prefix=None,
tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
'ELASTIC_NOTIFIER_HOST', 'localhost')
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer(
'ELASTIC_NOTIFIER_PORT', 19080)
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
# Server stuff
self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
self.udp_port = udp_port if udp_port is not None else self.integer('UDP_PORT', self.tcp_port)
self.banner_file = banner_file if banner_file is not None else self.default('BANNER_FILE', None)
self.allow_lan_udp = allow_lan_udp if allow_lan_udp is not None else self.boolean('ALLOW_LAN_UDP', False)
self.country = country if country is not None else self.default('COUNTRY', 'US')
# Peer discovery
self.peer_discovery = self.peer_discovery_enum()
self.peer_announce = peer_announce if peer_announce is not None else self.boolean('PEER_ANNOUNCE', True)
if peer_hubs is not None:
self.peer_hubs = [p.strip("") for p in peer_hubs.split(",")]
else:
self.peer_hubs = self.extract_peer_hubs()
# The electrum client takes the empty string as unspecified
self.payment_address = payment_address if payment_address is not None else self.default('PAYMENT_ADDRESS', '')
self.donation_address = donation_address if donation_address is not None else self.default('DONATION_ADDRESS',
'')
# Server limits to help prevent DoS
self.max_send = max_send if max_send is not None else self.integer('MAX_SEND', 1000000000000000000)
self.max_receive = max_receive if max_receive is not None else self.integer('MAX_RECEIVE', 1000000000000000000)
self.max_sessions = max_sessions if max_sessions is not None else self.sane_max_sessions()
self.session_timeout = session_timeout if session_timeout is not None else self.integer('SESSION_TIMEOUT', 600)
self.drop_client = re.compile(drop_client) if drop_client is not None else self.custom("DROP_CLIENT", None, re.compile)
self.description = description if description is not None else self.default('DESCRIPTION', '')
self.daily_fee = daily_fee if daily_fee is not None else self.string_amount('DAILY_FEE', '0')
self.database_query_timeout = (database_query_timeout / 1000.0) if database_query_timeout is not None else \
(float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0)
@classmethod
def contribute_to_arg_parser(cls, parser):
super().contribute_to_arg_parser(parser)
env_daemon_url = cls.default('DAEMON_URL', None)
parser.add_argument('--daemon_url', required=env_daemon_url is None,
help="URL for rpc from lbrycrd or lbcd, "
"<rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>.",
default=env_daemon_url)
parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'),
help="Interface for hub server to listen on, use 0.0.0.0 to listen on the external "
"interface. Can be set in env with 'HOST'")
parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001),
help="Electrum TCP port to listen on for hub server. Can be set in env with 'TCP_PORT'")
parser.add_argument('--udp_port', type=int, default=cls.integer('UDP_PORT', 50001),
help="'UDP port to listen on for hub server. Can be set in env with 'UDP_PORT'")
parser.add_argument('--max_sessions', type=int, default=cls.integer('MAX_SESSIONS', 100000),
help="Maximum number of electrum clients that can be connected, defaults to 100000.")
parser.add_argument('--max_send', type=int, default=cls.integer('MAX_SESSIONS', 1000000000000000000),
help="Maximum size of a request")
parser.add_argument('--max_receive', type=int, default=cls.integer('MAX_SESSIONS', 1000000000000000000),
help="Maximum size of a response")
parser.add_argument('--drop_client', type=str, default=cls.default('DROP_CLIENT', None),
help="Regex used for blocking clients")
parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600),
help="Session inactivity timeout")
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
help="Hostname or ip address of the elasticsearch instance to connect to. "
"Can be set in env with 'ELASTIC_HOST'")
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'")
parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'),
type=str, help='elasticsearch sync notifier host, defaults to localhost')
parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
help='elasticsearch sync notifier port')
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
parser.add_argument('--allow_lan_udp', action='store_true',
help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False))
parser.add_argument('--description', default=cls.default('DESCRIPTION', None), type=str)
parser.add_argument('--banner_file', default=cls.default('BANNER_FILE', None), type=str)
parser.add_argument('--country', default=cls.default('COUNTRY', 'US'), type=str)
parser.add_argument('--payment_address', default=cls.default('PAYMENT_ADDRESS', None), type=str)
parser.add_argument('--donation_address', default=cls.default('DONATION_ADDRESS', None), type=str)
parser.add_argument('--daily_fee', default=cls.default('DAILY_FEE', '0'), type=str)
parser.add_argument('--query_timeout_ms', default=cls.default('QUERY_TIMEOUT_MS', 10000), type=int)
@classmethod
def from_arg_parser(cls, args):
return cls(
db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_host=args.elastic_host,
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, country=args.country, payment_address=args.payment_address,
donation_address=args.donation_address, max_send=args.max_send, max_receive=args.max_receive,
max_sessions=args.max_sessions, session_timeout=args.session_timeout,
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
database_query_timeout=args.query_timeout_ms / 1000, blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
elastic_notifier_port=args.elastic_notifier_port
)

View file

@ -1,5 +1,5 @@
import time import time
import typing
import asyncio import asyncio
from scribe.blockchain.daemon import LBCDaemon from scribe.blockchain.daemon import LBCDaemon
from scribe.hub.session import SessionManager from scribe.hub.session import SessionManager
@ -7,11 +7,14 @@ from scribe.hub.mempool import HubMemPool
from scribe.hub.udp import StatusServer from scribe.hub.udp import StatusServer
from scribe.service import BlockchainReaderService from scribe.service import BlockchainReaderService
from scribe.elasticsearch import ElasticNotifierClientProtocol from scribe.elasticsearch import ElasticNotifierClientProtocol
if typing.TYPE_CHECKING:
from scribe.hub.env import ServerEnv
class HubServerService(BlockchainReaderService): class HubServerService(BlockchainReaderService):
def __init__(self, env): def __init__(self, env: 'ServerEnv'):
super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker') super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker')
self.env = env
self.notifications_to_send = [] self.notifications_to_send = []
self.mempool_notifications = set() self.mempool_notifications = set()
self.status_server = StatusServer() self.status_server = StatusServer()

View file

@ -28,7 +28,7 @@ from scribe.hub.common import BatchRequest, ProtocolError, Request, Batch, Notif
from scribe.hub.framer import NewlineFramer from scribe.hub.framer import NewlineFramer
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from scribe.db import HubDB from scribe.db import HubDB
from scribe.env import Env from scribe.hub.env import ServerEnv
from scribe.blockchain.daemon import LBCDaemon from scribe.blockchain.daemon import LBCDaemon
from scribe.hub.mempool import HubMemPool from scribe.hub.mempool import HubMemPool
@ -164,7 +164,7 @@ class SessionManager:
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
) )
def __init__(self, env: 'Env', db: 'HubDB', mempool: 'HubMemPool', def __init__(self, env: 'ServerEnv', db: 'HubDB', mempool: 'HubMemPool',
daemon: 'LBCDaemon', shutdown_event: asyncio.Event, daemon: 'LBCDaemon', shutdown_event: asyncio.Event,
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]): on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
@ -230,10 +230,6 @@ class SessionManager:
host = env.cs_host() host = env.cs_host()
if env.tcp_port is not None: if env.tcp_port is not None:
await self._start_server('TCP', host, env.tcp_port) await self._start_server('TCP', host, env.tcp_port)
if env.ssl_port is not None:
sslc = ssl.SSLContext(ssl.PROTOCOL_TLS)
sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
await self._start_server('SSL', host, env.ssl_port, ssl=sslc)
async def _close_servers(self, kinds): async def _close_servers(self, kinds):
"""Close the servers of the given kinds (TCP etc.).""" """Close the servers of the given kinds (TCP etc.)."""
@ -698,7 +694,6 @@ class LBRYElectrumX(asyncio.Protocol):
self.kind = kind # 'RPC', 'TCP' etc. self.kind = kind # 'RPC', 'TCP' etc.
self.coin = self.env.coin self.coin = self.env.coin
self.anon_logs = self.env.anon_logs
self.txs_sent = 0 self.txs_sent = 0
self.log_me = False self.log_me = False
self.daemon_request = self.session_manager.daemon_request self.daemon_request = self.session_manager.daemon_request
@ -785,19 +780,6 @@ class LBRYElectrumX(asyncio.Protocol):
def default_framer(self): def default_framer(self):
return NewlineFramer(self.env.max_receive) return NewlineFramer(self.env.max_receive)
def peer_address_str(self, *, for_log=True):
"""Returns the peer's IP address and port as a human-readable
string, respecting anon logs if the output is for a log."""
if for_log and self.anon_logs:
return 'xx.xx.xx.xx:xx'
if not self._address:
return 'unknown'
ip_addr_str, port = self._address[:2]
if ':' in ip_addr_str:
return f'[{ip_addr_str}]:{port}'
else:
return f'{ip_addr_str}:{port}'
def toggle_logging(self): def toggle_logging(self):
self.log_me = not self.log_me self.log_me = not self.log_me
@ -1037,7 +1019,7 @@ class LBRYElectrumX(asyncio.Protocol):
await self._send_message(message) await self._send_message(message)
return True return True
except asyncio.TimeoutError: except asyncio.TimeoutError:
self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True)) self.logger.info(f"timeout sending address notification to {self._address[0]}:{self._address[1]}")
self.abort() self.abort()
return False return False
@ -1048,7 +1030,7 @@ class LBRYElectrumX(asyncio.Protocol):
await self._send_message(message) await self._send_message(message)
return True return True
except asyncio.TimeoutError: except asyncio.TimeoutError:
self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True)) self.logger.info(f"timeout sending address notification to {self._address[0]}:{self._address[1]}")
self.abort() self.abort()
return False return False
@ -1078,7 +1060,7 @@ class LBRYElectrumX(asyncio.Protocol):
"""Return the server features dictionary.""" """Return the server features dictionary."""
min_str, max_str = cls.protocol_min_max_strings() min_str, max_str = cls.protocol_min_max_strings()
cls.cached_server_features.update({ cls.cached_server_features.update({
'hosts': env.hosts_dict(), 'hosts': {},
'pruning': None, 'pruning': None,
'server_version': cls.version, 'server_version': cls.version,
'protocol_min': min_str, 'protocol_min': min_str,

View file

@ -28,7 +28,7 @@ class BlockchainService:
self.lock = asyncio.Lock() self.lock = asyncio.Lock()
self.last_state: typing.Optional[DBState] = None self.last_state: typing.Optional[DBState] = None
self.db = HubDB( self.db = HubDB(
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, env.coin, env.db_dir, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids, secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor filtering_channel_ids=env.filtering_channel_ids, executor=self._executor
) )

View file

@ -107,7 +107,7 @@ class TestRevertableOpStack(unittest.TestCase):
class TestRevertablePrefixDB(unittest.TestCase): class TestRevertablePrefixDB(unittest.TestCase):
def setUp(self): def setUp(self):
self.tmp_dir = tempfile.mkdtemp() self.tmp_dir = tempfile.mkdtemp()
self.db = PrefixDB(self.tmp_dir, cache_mb=1, max_open_files=32) self.db = PrefixDB(self.tmp_dir, max_open_files=32)
def tearDown(self) -> None: def tearDown(self) -> None:
self.db.close() self.db.close()