commit
ad1eee4eb7
9 changed files with 1297 additions and 31 deletions
190
Gopkg.lock
generated
190
Gopkg.lock
generated
|
@ -2,24 +2,48 @@
|
|||
|
||||
|
||||
[[projects]]
|
||||
digest = "1:a69ab3f1445ffd4815add4bd31ba05b65b3b9fec1ade5057d5d717f30e6efd6d"
|
||||
name = "github.com/SermoDigital/jose"
|
||||
packages = [
|
||||
".",
|
||||
"crypto",
|
||||
"jws",
|
||||
"jwt"
|
||||
"jwt",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
revision = "f6df55f235c24f236d11dbcf665249a59ac2021f"
|
||||
version = "1.1"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
name = "github.com/anacrolix/dht"
|
||||
packages = ["krpc"]
|
||||
revision = "cae37fd1842087605e61382e82e4f87fab27afdc"
|
||||
digest = "1:7afff364b8e5e9f1085fe77ae5630b8e0f7482338a50535881aa0b433e48fb0b"
|
||||
name = "github.com/alicebob/gopher-json"
|
||||
packages = ["."]
|
||||
pruneopts = "UT"
|
||||
revision = "5a6b3ba71ee69b77cf64febf8b5a7526ca5eaef0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:b7cb0201d452c4a7079dc6e8673693c1153b115e906d50d85600c962fc6085a8"
|
||||
name = "github.com/alicebob/miniredis"
|
||||
packages = [
|
||||
".",
|
||||
"server",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
revision = "3657542c8629876a1fa83e0b30a0246a67ffa652"
|
||||
version = "v2.4.5"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:5019de799334af68e1c4c38911c5e5603770dc879bf29e770561e7f5871ec639"
|
||||
name = "github.com/anacrolix/dht"
|
||||
packages = ["krpc"]
|
||||
pruneopts = "UT"
|
||||
revision = "b09db78595aaba14cd992537fbe992a4c5e0a141"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:8e3aed14c993c6d261a04d530e7de0a25113b8fef4c9bc4d8d427f512254a6a9"
|
||||
name = "github.com/anacrolix/missinggo"
|
||||
packages = [
|
||||
".",
|
||||
|
@ -27,175 +51,279 @@
|
|||
"httptoo",
|
||||
"mime",
|
||||
"pproffd",
|
||||
"slices"
|
||||
"slices",
|
||||
]
|
||||
revision = "60ef2fbf63df5d871ada2680d4d8a6013dcd1745"
|
||||
pruneopts = "UT"
|
||||
revision = "3237bf955fed3b69d69458bf21e0a63cb982339a"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
name = "github.com/anacrolix/torrent"
|
||||
packages = [
|
||||
"bencode",
|
||||
"tracker"
|
||||
"tracker",
|
||||
]
|
||||
revision = "4431464fd62c37843addc79822f7cd30b4467471"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:d6afaeed1502aa28e80a4ed0981d570ad91b2579193404256ce672ed0a609e0d"
|
||||
name = "github.com/beorn7/perks"
|
||||
packages = ["quantile"]
|
||||
pruneopts = "UT"
|
||||
revision = "3a771d992973f24aa725d07868b467d1ddfceafb"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:7bfd6f49553eb9409ecde1d0311fdcc4944a887d9fff23415f74b7bd462d95a4"
|
||||
name = "github.com/bradfitz/iter"
|
||||
packages = ["."]
|
||||
pruneopts = "UT"
|
||||
revision = "454541ec3da2a73fc34fd049b19ee5777bf19345"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:ffe9824d294da03b391f44e1ae8281281b4afc1bdaa9588c9097785e3af10cec"
|
||||
name = "github.com/davecgh/go-spew"
|
||||
packages = ["spew"]
|
||||
pruneopts = "UT"
|
||||
revision = "8991bc29aa16c548c550c7ff78260e27b9ab7c73"
|
||||
version = "v1.1.1"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:97df918963298c287643883209a2c3f642e6593379f97ab400c2a2e219ab647d"
|
||||
name = "github.com/golang/protobuf"
|
||||
packages = ["proto"]
|
||||
pruneopts = "UT"
|
||||
revision = "aa810b61a9c79d51363740d207bb46cf8e620ed5"
|
||||
version = "v1.2.0"
|
||||
|
||||
[[projects]]
|
||||
name = "github.com/huandu/xstrings"
|
||||
packages = ["."]
|
||||
revision = "55ae428c2ac4f74d7430952ef528631e656ac92c"
|
||||
version = "v1.1.0"
|
||||
digest = "1:38ec74012390146c45af1f92d46e5382b50531247929ff3a685d2b2be65155ac"
|
||||
name = "github.com/gomodule/redigo"
|
||||
packages = [
|
||||
"internal",
|
||||
"redis",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
revision = "9c11da706d9b7902c6da69c592f75637793fe121"
|
||||
version = "v2.0.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:f9a5e090336881be43cfc1cf468330c1bdd60abdc9dd194e0b1ab69f4b94dd7c"
|
||||
name = "github.com/huandu/xstrings"
|
||||
packages = ["."]
|
||||
pruneopts = "UT"
|
||||
revision = "f02667b379e2fb5916c3cda2cf31e0eb885d79f8"
|
||||
version = "v1.2.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:870d441fe217b8e689d7949fef6e43efbc787e50f200cb1e70dbca9204a1d6be"
|
||||
name = "github.com/inconshreveable/mousetrap"
|
||||
packages = ["."]
|
||||
pruneopts = "UT"
|
||||
revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75"
|
||||
version = "v1.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:f97285a3b0a496dcf8801072622230d513f69175665d94de60eb042d03387f6c"
|
||||
name = "github.com/julienschmidt/httprouter"
|
||||
packages = ["."]
|
||||
revision = "8c199fb6259ffc1af525cc3ad52ee60ba8359669"
|
||||
version = "v1.1"
|
||||
pruneopts = "UT"
|
||||
revision = "348b672cd90d8190f8240323e372ecd1e66b59dc"
|
||||
version = "v1.2.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:0a69a1c0db3591fcefb47f115b224592c8dfa4368b7ba9fae509d5e16cdc95c8"
|
||||
name = "github.com/konsorten/go-windows-terminal-sequences"
|
||||
packages = ["."]
|
||||
pruneopts = "UT"
|
||||
revision = "5c8c8bd35d3832f5d134ae1e1e375b69a4d25242"
|
||||
version = "v1.0.1"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:ff5ebae34cfbf047d505ee150de27e60570e8c394b3b8fdbb720ff6ac71985fc"
|
||||
name = "github.com/matttproud/golang_protobuf_extensions"
|
||||
packages = ["pbutil"]
|
||||
pruneopts = "UT"
|
||||
revision = "c12348ce28de40eed0136aa2b644d0ee0650e56c"
|
||||
version = "v1.0.1"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:bad8c08367e93a0a30d1090ac9bc98e7157df162d21b638cc72d28d331806f8c"
|
||||
name = "github.com/mendsley/gojwk"
|
||||
packages = ["."]
|
||||
pruneopts = "UT"
|
||||
revision = "4d5ec6e58103388d6cb0d7d72bc72649be4f0504"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:445210ef1e723060b3ebeb691648b5090f154992ee35f7065b70a6a5a96a3bb8"
|
||||
name = "github.com/minio/sha256-simd"
|
||||
packages = ["."]
|
||||
revision = "ad98a36ba0da87206e3378c556abbfeaeaa98668"
|
||||
pruneopts = "UT"
|
||||
revision = "51976451ce1942acbb55707a983ed232fa027110"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:40e195917a951a8bf867cd05de2a46aaf1806c50cf92eebf4c16f78cd196f747"
|
||||
name = "github.com/pkg/errors"
|
||||
packages = ["."]
|
||||
pruneopts = "UT"
|
||||
revision = "645ef00459ed84a119197bfb8d8205042c6df63d"
|
||||
version = "v0.8.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:0028cb19b2e4c3112225cd871870f2d9cf49b9b4276531f03438a88e94be86fe"
|
||||
name = "github.com/pmezard/go-difflib"
|
||||
packages = ["difflib"]
|
||||
pruneopts = "UT"
|
||||
revision = "792786c7400a136282c1664665ae0a8db921c6c2"
|
||||
version = "v1.0.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:b6221ec0f8903b556e127c449e7106b63e6867170c2d10a7c058623d086f2081"
|
||||
name = "github.com/prometheus/client_golang"
|
||||
packages = ["prometheus"]
|
||||
pruneopts = "UT"
|
||||
revision = "c5b7fccd204277076155f10851dad72b76a49317"
|
||||
version = "v0.8.0"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:2d5cd61daa5565187e1d96bae64dbbc6080dacf741448e9629c64fd93203b0d4"
|
||||
name = "github.com/prometheus/client_model"
|
||||
packages = ["go"]
|
||||
pruneopts = "UT"
|
||||
revision = "5c3871d89910bfb32f5fcab2aa4b9ec68e65a99f"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:33c81bf709f084827a560e14415b1d7001a6d1d6fa4bec2cc2e60b84ecbc0e0a"
|
||||
name = "github.com/prometheus/common"
|
||||
packages = [
|
||||
"expfmt",
|
||||
"internal/bitbucket.org/ww/goautoneg",
|
||||
"model"
|
||||
"model",
|
||||
]
|
||||
revision = "c7de2306084e37d54b8be01f3541a8464345e9a5"
|
||||
pruneopts = "UT"
|
||||
revision = "67670fe90761d7ff18ec1d640135e53b9198328f"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:08eb8b60450efe841e37512d66ce366a87d187505d7c67b99307a6c1803483a2"
|
||||
name = "github.com/prometheus/procfs"
|
||||
packages = [
|
||||
".",
|
||||
"internal/util",
|
||||
"nfs",
|
||||
"xfs"
|
||||
"xfs",
|
||||
]
|
||||
revision = "05ee40e3a273f7245e8777337fc7b46e533a9a92"
|
||||
pruneopts = "UT"
|
||||
revision = "14fa7590c24d4615893b68e22fce3b3489689f65"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:69b1cc331fca23d702bd72f860c6a647afd0aa9fcbc1d0659b1365e26546dd70"
|
||||
name = "github.com/sirupsen/logrus"
|
||||
packages = ["."]
|
||||
revision = "3e01752db0189b9157070a0e1668a620f9a85da2"
|
||||
version = "v1.0.6"
|
||||
pruneopts = "UT"
|
||||
revision = "bcd833dfe83d3cebad139e4a29ed79cb2318bf95"
|
||||
version = "v1.2.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:645cabccbb4fa8aab25a956cbcbdf6a6845ca736b2c64e197ca7cbb9d210b939"
|
||||
name = "github.com/spf13/cobra"
|
||||
packages = ["."]
|
||||
pruneopts = "UT"
|
||||
revision = "ef82de70bb3f60c65fb8eebacbb2d122ef517385"
|
||||
version = "v0.0.3"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:c1b1102241e7f645bc8e0c22ae352e8f0dc6484b6cb4d132fa9f24174e0119e2"
|
||||
name = "github.com/spf13/pflag"
|
||||
packages = ["."]
|
||||
revision = "9a97c102cda95a86cec2345a6f09f55a939babf5"
|
||||
version = "v1.0.2"
|
||||
pruneopts = "UT"
|
||||
revision = "298182f68c66c05229eb03ac171abe6e309ee79a"
|
||||
version = "v1.0.3"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:c40d65817cdd41fac9aa7af8bed56927bb2d6d47e4fea566a74880f5c2b1c41e"
|
||||
name = "github.com/stretchr/testify"
|
||||
packages = [
|
||||
"assert",
|
||||
"require"
|
||||
"require",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
revision = "f35b8ab0b5a2cef36673838d662e249dd9c94686"
|
||||
version = "v1.2.2"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
name = "golang.org/x/crypto"
|
||||
packages = ["ssh/terminal"]
|
||||
revision = "182538f80094b6a8efaade63a8fd8e0d9d5843dd"
|
||||
digest = "1:7d2aad811396da92f23df2890ed55cd8b5ccd2ecfd77ea34fc9e9f9c3a207d22"
|
||||
name = "github.com/yuin/gopher-lua"
|
||||
packages = [
|
||||
".",
|
||||
"ast",
|
||||
"parse",
|
||||
"pm",
|
||||
]
|
||||
pruneopts = "UT"
|
||||
revision = "1e6e6e1918e02ddf02e667e88e8aa756942448c5"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:38f553aff0273ad6f367cb0a0f8b6eecbaef8dc6cb8b50e57b6a81c1d5b1e332"
|
||||
name = "golang.org/x/crypto"
|
||||
packages = ["ssh/terminal"]
|
||||
pruneopts = "UT"
|
||||
revision = "8d7daa0c54b357f3071e11eaef7efc4e19a417e2"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:191cccd950a4aeadb60306062f2bdc2f924d750d0156ec6c691b17211bfd7349"
|
||||
name = "golang.org/x/sys"
|
||||
packages = [
|
||||
"unix",
|
||||
"windows"
|
||||
"windows",
|
||||
]
|
||||
revision = "49385e6e15226593f68b26af201feec29d5bba22"
|
||||
pruneopts = "UT"
|
||||
revision = "82a175fd1598e8a172e58ebdf5ed262bb29129e5"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:51fd139389bbbd2c9e0eb4e139c3719c8dabf4e6679f4f9f9916e03bb55306e2"
|
||||
name = "gopkg.in/redsync.v1"
|
||||
packages = ["."]
|
||||
pruneopts = "UT"
|
||||
revision = "89538344de92e78df4b7eeeceec1f3cf2d7c0aeb"
|
||||
version = "v1.1.1"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:4d2e5a73dc1500038e504a8d78b986630e3626dc027bc030ba5c75da257cdb96"
|
||||
name = "gopkg.in/yaml.v2"
|
||||
packages = ["."]
|
||||
revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183"
|
||||
version = "v2.2.1"
|
||||
pruneopts = "UT"
|
||||
revision = "51d6538a90f86fe93ac480b35f37b2be17fef232"
|
||||
version = "v2.2.2"
|
||||
|
||||
[solve-meta]
|
||||
analyzer-name = "dep"
|
||||
analyzer-version = 1
|
||||
inputs-digest = "073ea42bedc8b51977e04bb4e86deb764315fa1b904e1da4cf48e2247a056caf"
|
||||
input-imports = [
|
||||
"github.com/SermoDigital/jose/crypto",
|
||||
"github.com/SermoDigital/jose/jws",
|
||||
"github.com/SermoDigital/jose/jwt",
|
||||
"github.com/alicebob/miniredis",
|
||||
"github.com/anacrolix/torrent/tracker",
|
||||
"github.com/gomodule/redigo/redis",
|
||||
"github.com/julienschmidt/httprouter",
|
||||
"github.com/mendsley/gojwk",
|
||||
"github.com/minio/sha256-simd",
|
||||
"github.com/pkg/errors",
|
||||
"github.com/prometheus/client_golang/prometheus",
|
||||
"github.com/sirupsen/logrus",
|
||||
"github.com/spf13/cobra",
|
||||
"github.com/stretchr/testify/require",
|
||||
"gopkg.in/redsync.v1",
|
||||
"gopkg.in/yaml.v2",
|
||||
]
|
||||
solver-name = "gps-cdcl"
|
||||
solver-version = 1
|
||||
|
|
12
Gopkg.toml
12
Gopkg.toml
|
@ -69,6 +69,18 @@
|
|||
name = "gopkg.in/yaml.v2"
|
||||
version = "2.2.1"
|
||||
|
||||
[[constraint]]
|
||||
name = "gopkg.in/redsync.v1"
|
||||
version = "1.1.1"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/gomodule/redigo"
|
||||
version = "2.0.0"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/alicebob/miniredis"
|
||||
version = "2.4.5"
|
||||
|
||||
[prune]
|
||||
go-tests = true
|
||||
unused-packages = true
|
||||
|
|
|
@ -20,12 +20,14 @@ Differentiating features include:
|
|||
- IPv4 and IPv6 support
|
||||
- [YAML] configuration
|
||||
- Metrics via [Prometheus]
|
||||
- High Availability via [Redis]
|
||||
|
||||
[releases]: https://github.com/chihaya/chihaya/releases
|
||||
[BitTorrent tracker]: http://en.wikipedia.org/wiki/BitTorrent_tracker
|
||||
[Go]: https://golang.org
|
||||
[YAML]: http://yaml.org
|
||||
[Prometheus]: http://prometheus.io
|
||||
[Redis]: https://redis.io
|
||||
|
||||
## Why Chihaya?
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
// Imports to register storage drivers.
|
||||
_ "github.com/chihaya/chihaya/storage/memory"
|
||||
_ "github.com/chihaya/chihaya/storage/memorybysubnet"
|
||||
_ "github.com/chihaya/chihaya/storage/redis"
|
||||
)
|
||||
|
||||
type storageConfig struct {
|
||||
|
|
74
docs/storage/redis.md
Normal file
74
docs/storage/redis.md
Normal file
|
@ -0,0 +1,74 @@
|
|||
# Redis Storage
|
||||
|
||||
This storage system separates chihaya server from storage service, chihaya server achieves HA by storing all peer data in Redis, the storage service gets HA by creating cluster. If one chihaya node goes down, peer data will still be available in Redis service.
|
||||
|
||||
The HA of storage service is not considered here, it's another topic. In case Redis service is a single node, peer data will be unavailable if the node is down. So you should setup a Redis cluster for chihaya server in production.
|
||||
|
||||
## Use Case
|
||||
|
||||
When one chihaya instance is down, the Redis can continuily serve peer data through other chihaya instances.
|
||||
|
||||
## Configuration
|
||||
|
||||
```yaml
|
||||
chihaya:
|
||||
storage:
|
||||
name: redis
|
||||
config:
|
||||
# The frequency which stale peers are removed.
|
||||
gc_interval: 14m
|
||||
|
||||
# The frequency which metrics are pushed into a local Prometheus endpoint.
|
||||
prometheus_reporting_interval: 1s
|
||||
|
||||
# The amount of time until a peer is considered stale.
|
||||
# To avoid churn, keep this slightly larger than `announce_interval`
|
||||
peer_lifetime: 16m
|
||||
|
||||
# The address of redis storage.
|
||||
redis_broker: "redis://pwd@127.0.0.1:6379/0"
|
||||
|
||||
# The timeout for reading a command reply from redis.
|
||||
redis_read_timeout: 15s
|
||||
|
||||
# The timeout for writing a command to redis.
|
||||
redis_write_timeout: 15s
|
||||
|
||||
# The timeout for connecting to redis server.
|
||||
redis_connect_timeout: 15s
|
||||
```
|
||||
|
||||
## Implementation
|
||||
|
||||
Seeders and Leechers for a particular InfoHash are stored with a redis hash structure, the infohash is used as hash key, peer key is field, last modified time is value.
|
||||
|
||||
All the InfoHashes (swarms) are also stored into redis hash, IP family is the key, infohash is field, last modified time is value.
|
||||
|
||||
Here is an example
|
||||
|
||||
```
|
||||
- IPv4
|
||||
- IPv4_S_<infohash 1>: <modification time>
|
||||
- IPv4_L_<infohash 1>: <modification time>
|
||||
- IPv4_S_<infohash 2>: <modification time>
|
||||
- IPv4_S_<infohash 1>
|
||||
- <peer 1 key>: <modification time>
|
||||
- <peer 2 key>: <modification time>
|
||||
- IPv4_L_<infohash 1>
|
||||
- <peer 3 key>: <modification time>
|
||||
- IPv4_S_<infohash 2>
|
||||
- <peer 3 key>: <modification time>
|
||||
```
|
||||
|
||||
|
||||
In this case, prometheus will record two swarms, three seeders and one leecher.
|
||||
|
||||
So tree keys are used to record the count of swarms, seeders and leechers for each group (IPv4, IPv6).
|
||||
|
||||
```
|
||||
- IPv4_infohash_count: 2
|
||||
- IPv4_S_count: 3
|
||||
- IPv4_L_count: 1
|
||||
```
|
||||
|
||||
Note: IPv4_infohash_count has the different meaning with `memory` storage, it represents the number of infohashes reported by seeder.
|
|
@ -127,6 +127,32 @@ chihaya:
|
|||
# are collected and posted to Prometheus.
|
||||
prometheus_reporting_interval: 1s
|
||||
|
||||
# This block defines configuration used for redis storage.
|
||||
# storage:
|
||||
# name: redis
|
||||
# config:
|
||||
# # The frequency which stale peers are removed.
|
||||
# gc_interval: 14m
|
||||
|
||||
# # The frequency which metrics are pushed into a local Prometheus endpoint.
|
||||
# prometheus_reporting_interval: 1s
|
||||
|
||||
# # The amount of time until a peer is considered stale.
|
||||
# # To avoid churn, keep this slightly larger than `announce_interval`
|
||||
# peer_lifetime: 16m
|
||||
|
||||
# # The address of redis storage.
|
||||
# redis_broker: "redis://pwd@127.0.0.1:6379/0"
|
||||
|
||||
# # The timeout for reading a command reply from redis.
|
||||
# redis_read_timeout: 15s
|
||||
|
||||
# # The timeout for writing a command to redis.
|
||||
# redis_write_timeout: 15s
|
||||
|
||||
# # The timeout for connecting to redis server.
|
||||
# redis_connect_timeout: 15s
|
||||
|
||||
# This block defines configuration used for middleware executed before a
|
||||
# response has been returned to a BitTorrent client.
|
||||
prehooks:
|
||||
|
|
827
storage/redis/peer_store.go
Normal file
827
storage/redis/peer_store.go
Normal file
|
@ -0,0 +1,827 @@
|
|||
// Package redis implements the storage interface for a Chihaya
|
||||
// BitTorrent tracker keeping peer data in redis with hash.
|
||||
// There two categories of hash:
|
||||
//
|
||||
// - IPv{4,6}_{L,S}_infohash
|
||||
// To save peers that hold the infohash, used for fast searching,
|
||||
// deleting, and timeout handling
|
||||
//
|
||||
// - IPv{4,6}
|
||||
// To save all the infohashes, used for garbage collection,
|
||||
// metrics aggregation and leecher graduation
|
||||
//
|
||||
// Tree keys are used to record the count of swarms, seeders
|
||||
// and leechers for each group (IPv4, IPv6).
|
||||
//
|
||||
// - IPv{4,6}_infohash_count
|
||||
// To record the number of infohashes.
|
||||
//
|
||||
// - IPv{4,6}_S_count
|
||||
// To record the number of seeders.
|
||||
//
|
||||
// - IPv{4,6}_L_count
|
||||
// To record the number of leechers.
|
||||
package redis
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gomodule/redigo/redis"
|
||||
yaml "gopkg.in/yaml.v2"
|
||||
|
||||
"github.com/chihaya/chihaya/bittorrent"
|
||||
"github.com/chihaya/chihaya/pkg/log"
|
||||
"github.com/chihaya/chihaya/pkg/stop"
|
||||
"github.com/chihaya/chihaya/pkg/timecache"
|
||||
"github.com/chihaya/chihaya/storage"
|
||||
)
|
||||
|
||||
// Name is the name by which this peer store is registered with Chihaya.
|
||||
const Name = "redis"
|
||||
|
||||
// Default config constants.
|
||||
const (
|
||||
defaultPrometheusReportingInterval = time.Second * 1
|
||||
defaultGarbageCollectionInterval = time.Minute * 3
|
||||
defaultPeerLifetime = time.Minute * 30
|
||||
defaultRedisBroker = "redis://myRedis@127.0.0.1:6379/0"
|
||||
defaultRedisReadTimeout = time.Second * 15
|
||||
defaultRedisWriteTimeout = time.Second * 15
|
||||
defaultRedisConnectTimeout = time.Second * 15
|
||||
)
|
||||
|
||||
func init() {
|
||||
// Register the storage driver.
|
||||
storage.RegisterDriver(Name, driver{})
|
||||
}
|
||||
|
||||
type driver struct{}
|
||||
|
||||
func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) {
|
||||
// Marshal the config back into bytes.
|
||||
bytes, err := yaml.Marshal(icfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Unmarshal the bytes into the proper config type.
|
||||
var cfg Config
|
||||
err = yaml.Unmarshal(bytes, &cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return New(cfg)
|
||||
}
|
||||
|
||||
// Config holds the configuration of a redis PeerStore.
|
||||
type Config struct {
|
||||
GarbageCollectionInterval time.Duration `yaml:"gc_interval"`
|
||||
PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"`
|
||||
PeerLifetime time.Duration `yaml:"peer_lifetime"`
|
||||
RedisBroker string `yaml:"redis_broker"`
|
||||
RedisReadTimeout time.Duration `yaml:"redis_read_timeout"`
|
||||
RedisWriteTimeout time.Duration `yaml:"redis_write_timeout"`
|
||||
RedisConnectTimeout time.Duration `yaml:"redis_connect_timeout"`
|
||||
}
|
||||
|
||||
// LogFields renders the current config as a set of Logrus fields.
|
||||
func (cfg Config) LogFields() log.Fields {
|
||||
return log.Fields{
|
||||
"name": Name,
|
||||
"gcInterval": cfg.GarbageCollectionInterval,
|
||||
"promReportInterval": cfg.PrometheusReportingInterval,
|
||||
"peerLifetime": cfg.PeerLifetime,
|
||||
"redisBroker": cfg.RedisBroker,
|
||||
"redisReadTimeout": cfg.RedisReadTimeout,
|
||||
"redisWriteTimeout": cfg.RedisWriteTimeout,
|
||||
"redisConnectTimeout": cfg.RedisConnectTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
// Validate sanity checks values set in a config and returns a new config with
|
||||
// default values replacing anything that is invalid.
|
||||
//
|
||||
// This function warns to the logger when a value is changed.
|
||||
func (cfg Config) Validate() Config {
|
||||
validcfg := cfg
|
||||
|
||||
if cfg.RedisBroker == "" {
|
||||
validcfg.RedisBroker = defaultRedisBroker
|
||||
log.Warn("falling back to default configuration", log.Fields{
|
||||
"name": Name + ".RedisBroker",
|
||||
"provided": cfg.RedisBroker,
|
||||
"default": validcfg.RedisBroker,
|
||||
})
|
||||
}
|
||||
|
||||
if cfg.RedisReadTimeout <= 0 {
|
||||
validcfg.RedisReadTimeout = defaultRedisReadTimeout
|
||||
log.Warn("falling back to default configuration", log.Fields{
|
||||
"name": Name + ".RedisReadTimeout",
|
||||
"provided": cfg.RedisReadTimeout,
|
||||
"default": validcfg.RedisReadTimeout,
|
||||
})
|
||||
}
|
||||
|
||||
if cfg.RedisWriteTimeout <= 0 {
|
||||
validcfg.RedisWriteTimeout = defaultRedisWriteTimeout
|
||||
log.Warn("falling back to default configuration", log.Fields{
|
||||
"name": Name + ".RedisWriteTimeout",
|
||||
"provided": cfg.RedisWriteTimeout,
|
||||
"default": validcfg.RedisWriteTimeout,
|
||||
})
|
||||
}
|
||||
|
||||
if cfg.RedisConnectTimeout <= 0 {
|
||||
validcfg.RedisConnectTimeout = defaultRedisConnectTimeout
|
||||
log.Warn("falling back to default configuration", log.Fields{
|
||||
"name": Name + ".RedisConnectTimeout",
|
||||
"provided": cfg.RedisConnectTimeout,
|
||||
"default": validcfg.RedisConnectTimeout,
|
||||
})
|
||||
}
|
||||
|
||||
if cfg.GarbageCollectionInterval <= 0 {
|
||||
validcfg.GarbageCollectionInterval = defaultGarbageCollectionInterval
|
||||
log.Warn("falling back to default configuration", log.Fields{
|
||||
"name": Name + ".GarbageCollectionInterval",
|
||||
"provided": cfg.GarbageCollectionInterval,
|
||||
"default": validcfg.GarbageCollectionInterval,
|
||||
})
|
||||
}
|
||||
|
||||
if cfg.PrometheusReportingInterval <= 0 {
|
||||
validcfg.PrometheusReportingInterval = defaultPrometheusReportingInterval
|
||||
log.Warn("falling back to default configuration", log.Fields{
|
||||
"name": Name + ".PrometheusReportingInterval",
|
||||
"provided": cfg.PrometheusReportingInterval,
|
||||
"default": validcfg.PrometheusReportingInterval,
|
||||
})
|
||||
}
|
||||
|
||||
if cfg.PeerLifetime <= 0 {
|
||||
validcfg.PeerLifetime = defaultPeerLifetime
|
||||
log.Warn("falling back to default configuration", log.Fields{
|
||||
"name": Name + ".PeerLifetime",
|
||||
"provided": cfg.PeerLifetime,
|
||||
"default": validcfg.PeerLifetime,
|
||||
})
|
||||
}
|
||||
|
||||
return validcfg
|
||||
}
|
||||
|
||||
// New creates a new PeerStore backed by redis.
|
||||
func New(provided Config) (storage.PeerStore, error) {
|
||||
cfg := provided.Validate()
|
||||
|
||||
u, err := parseRedisURL(cfg.RedisBroker)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ps := &peerStore{
|
||||
cfg: cfg,
|
||||
rb: newRedisBackend(&provided, u, ""),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start a goroutine for garbage collection.
|
||||
ps.wg.Add(1)
|
||||
go func() {
|
||||
defer ps.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ps.closed:
|
||||
return
|
||||
case <-time.After(cfg.GarbageCollectionInterval):
|
||||
before := time.Now().Add(-cfg.PeerLifetime)
|
||||
log.Debug("storage: purging peers with no announces since", log.Fields{"before": before})
|
||||
if err = ps.collectGarbage(before); err != nil {
|
||||
log.Error("storage: collectGarbage error", log.Fields{"before": before, "error": err})
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Start a goroutine for reporting statistics to Prometheus.
|
||||
ps.wg.Add(1)
|
||||
go func() {
|
||||
defer ps.wg.Done()
|
||||
t := time.NewTicker(cfg.PrometheusReportingInterval)
|
||||
for {
|
||||
select {
|
||||
case <-ps.closed:
|
||||
t.Stop()
|
||||
return
|
||||
case <-t.C:
|
||||
before := time.Now()
|
||||
ps.populateProm()
|
||||
log.Debug("storage: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return ps, nil
|
||||
}
|
||||
|
||||
type serializedPeer string
|
||||
|
||||
func newPeerKey(p bittorrent.Peer) serializedPeer {
|
||||
b := make([]byte, 20+2+len(p.IP.IP))
|
||||
copy(b[:20], p.ID[:])
|
||||
binary.BigEndian.PutUint16(b[20:22], p.Port)
|
||||
copy(b[22:], p.IP.IP)
|
||||
|
||||
return serializedPeer(b)
|
||||
}
|
||||
|
||||
func decodePeerKey(pk serializedPeer) bittorrent.Peer {
|
||||
peer := bittorrent.Peer{
|
||||
ID: bittorrent.PeerIDFromString(string(pk[:20])),
|
||||
Port: binary.BigEndian.Uint16([]byte(pk[20:22])),
|
||||
IP: bittorrent.IP{IP: net.IP(pk[22:])}}
|
||||
|
||||
if ip := peer.IP.To4(); ip != nil {
|
||||
peer.IP.IP = ip
|
||||
peer.IP.AddressFamily = bittorrent.IPv4
|
||||
} else if len(peer.IP.IP) == net.IPv6len { // implies toReturn.IP.To4() == nil
|
||||
peer.IP.AddressFamily = bittorrent.IPv6
|
||||
} else {
|
||||
panic("IP is neither v4 nor v6")
|
||||
}
|
||||
|
||||
return peer
|
||||
}
|
||||
|
||||
type peerStore struct {
|
||||
cfg Config
|
||||
rb *redisBackend
|
||||
|
||||
closed chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func (ps *peerStore) groups() []string {
|
||||
return []string{bittorrent.IPv4.String(), bittorrent.IPv6.String()}
|
||||
}
|
||||
|
||||
func (ps *peerStore) leecherInfohashKey(af, ih string) string {
|
||||
return af + "_L_" + ih
|
||||
}
|
||||
|
||||
func (ps *peerStore) seederInfohashKey(af, ih string) string {
|
||||
return af + "_S_" + ih
|
||||
}
|
||||
|
||||
func (ps *peerStore) infohashCountKey(af string) string {
|
||||
return af + "_infohash_count"
|
||||
}
|
||||
|
||||
func (ps *peerStore) seederCountKey(af string) string {
|
||||
return af + "_S_count"
|
||||
}
|
||||
|
||||
func (ps *peerStore) leecherCountKey(af string) string {
|
||||
return af + "_L_count"
|
||||
}
|
||||
|
||||
// populateProm aggregates metrics over all groups and then posts them to
|
||||
// prometheus.
|
||||
func (ps *peerStore) populateProm() {
|
||||
var numInfohashes, numSeeders, numLeechers int64
|
||||
|
||||
conn := ps.rb.open()
|
||||
defer conn.Close()
|
||||
|
||||
for _, group := range ps.groups() {
|
||||
if n, err := redis.Int64(conn.Do("GET", ps.infohashCountKey(group))); err != nil && err != redis.ErrNil {
|
||||
log.Error("storage: GET counter failure", log.Fields{
|
||||
"key": ps.infohashCountKey(group),
|
||||
"error": err,
|
||||
})
|
||||
} else {
|
||||
numInfohashes += n
|
||||
}
|
||||
if n, err := redis.Int64(conn.Do("GET", ps.seederCountKey(group))); err != nil && err != redis.ErrNil {
|
||||
log.Error("storage: GET counter failure", log.Fields{
|
||||
"key": ps.seederCountKey(group),
|
||||
"error": err,
|
||||
})
|
||||
} else {
|
||||
numSeeders += n
|
||||
}
|
||||
if n, err := redis.Int64(conn.Do("GET", ps.leecherCountKey(group))); err != nil && err != redis.ErrNil {
|
||||
log.Error("storage: GET counter failure", log.Fields{
|
||||
"key": ps.leecherCountKey(group),
|
||||
"error": err,
|
||||
})
|
||||
} else {
|
||||
numLeechers += n
|
||||
}
|
||||
}
|
||||
|
||||
storage.PromInfohashesCount.Set(float64(numInfohashes))
|
||||
storage.PromSeedersCount.Set(float64(numSeeders))
|
||||
storage.PromLeechersCount.Set(float64(numLeechers))
|
||||
}
|
||||
|
||||
func (ps *peerStore) getClock() int64 {
|
||||
return timecache.NowUnixNano()
|
||||
}
|
||||
|
||||
func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
||||
addressFamily := p.IP.AddressFamily.String()
|
||||
log.Debug("storage: PutSeeder", log.Fields{
|
||||
"InfoHash": ih.String(),
|
||||
"Peer": p,
|
||||
})
|
||||
|
||||
select {
|
||||
case <-ps.closed:
|
||||
panic("attempted to interact with stopped redis store")
|
||||
default:
|
||||
}
|
||||
|
||||
pk := newPeerKey(p)
|
||||
|
||||
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String())
|
||||
ct := ps.getClock()
|
||||
|
||||
conn := ps.rb.open()
|
||||
defer conn.Close()
|
||||
|
||||
conn.Send("MULTI")
|
||||
conn.Send("HSET", encodedSeederInfoHash, pk, ct)
|
||||
conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct)
|
||||
reply, err := redis.Int64s(conn.Do("EXEC"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// pk is a new field.
|
||||
if reply[0] == 1 {
|
||||
_, err = conn.Do("INCR", ps.seederCountKey(addressFamily))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// encodedSeederInfoHash is a new field.
|
||||
if reply[1] == 1 {
|
||||
_, err = conn.Do("INCR", ps.infohashCountKey(addressFamily))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
||||
addressFamily := p.IP.AddressFamily.String()
|
||||
log.Debug("storage: DeleteSeeder", log.Fields{
|
||||
"InfoHash": ih.String(),
|
||||
"Peer": p,
|
||||
})
|
||||
|
||||
select {
|
||||
case <-ps.closed:
|
||||
panic("attempted to interact with stopped redis store")
|
||||
default:
|
||||
}
|
||||
|
||||
pk := newPeerKey(p)
|
||||
|
||||
conn := ps.rb.open()
|
||||
defer conn.Close()
|
||||
|
||||
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String())
|
||||
|
||||
delNum, err := redis.Int64(conn.Do("HDEL", encodedSeederInfoHash, pk))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if delNum == 0 {
|
||||
return storage.ErrResourceDoesNotExist
|
||||
}
|
||||
if _, err := conn.Do("DECR", ps.seederCountKey(addressFamily)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
||||
addressFamily := p.IP.AddressFamily.String()
|
||||
log.Debug("storage: PutLeecher", log.Fields{
|
||||
"InfoHash": ih.String(),
|
||||
"Peer": p,
|
||||
})
|
||||
|
||||
select {
|
||||
case <-ps.closed:
|
||||
panic("attempted to interact with stopped redis store")
|
||||
default:
|
||||
}
|
||||
|
||||
// Update the peer in the swarm.
|
||||
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String())
|
||||
pk := newPeerKey(p)
|
||||
ct := ps.getClock()
|
||||
|
||||
conn := ps.rb.open()
|
||||
defer conn.Close()
|
||||
|
||||
conn.Send("MULTI")
|
||||
conn.Send("HSET", encodedLeecherInfoHash, pk, ct)
|
||||
conn.Send("HSET", addressFamily, encodedLeecherInfoHash, ct)
|
||||
reply, err := redis.Int64s(conn.Do("EXEC"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// pk is a new field.
|
||||
if reply[0] == 1 {
|
||||
_, err = conn.Do("INCR", ps.leecherCountKey(addressFamily))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
||||
addressFamily := p.IP.AddressFamily.String()
|
||||
log.Debug("storage: DeleteLeecher", log.Fields{
|
||||
"InfoHash": ih.String(),
|
||||
"Peer": p,
|
||||
})
|
||||
|
||||
select {
|
||||
case <-ps.closed:
|
||||
panic("attempted to interact with stopped redis store")
|
||||
default:
|
||||
}
|
||||
|
||||
conn := ps.rb.open()
|
||||
defer conn.Close()
|
||||
|
||||
pk := newPeerKey(p)
|
||||
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String())
|
||||
|
||||
delNum, err := redis.Int64(conn.Do("HDEL", encodedLeecherInfoHash, pk))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if delNum == 0 {
|
||||
return storage.ErrResourceDoesNotExist
|
||||
}
|
||||
if _, err := conn.Do("DECR", ps.leecherCountKey(addressFamily)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
||||
addressFamily := p.IP.AddressFamily.String()
|
||||
log.Debug("storage: GraduateLeecher", log.Fields{
|
||||
"InfoHash": ih.String(),
|
||||
"Peer": p,
|
||||
})
|
||||
|
||||
select {
|
||||
case <-ps.closed:
|
||||
panic("attempted to interact with stopped redis store")
|
||||
default:
|
||||
}
|
||||
|
||||
encodedInfoHash := ih.String()
|
||||
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash)
|
||||
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash)
|
||||
pk := newPeerKey(p)
|
||||
ct := ps.getClock()
|
||||
|
||||
conn := ps.rb.open()
|
||||
defer conn.Close()
|
||||
|
||||
conn.Send("MULTI")
|
||||
conn.Send("HDEL", encodedLeecherInfoHash, pk)
|
||||
conn.Send("HSET", encodedSeederInfoHash, pk, ct)
|
||||
conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct)
|
||||
reply, err := redis.Int64s(conn.Do("EXEC"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if reply[0] == 1 {
|
||||
_, err = conn.Do("DECR", ps.leecherCountKey(addressFamily))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if reply[1] == 1 {
|
||||
_, err = conn.Do("INCR", ps.seederCountKey(addressFamily))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if reply[2] == 1 {
|
||||
_, err = conn.Do("INCR", ps.infohashCountKey(addressFamily))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
|
||||
addressFamily := announcer.IP.AddressFamily.String()
|
||||
log.Debug("storage: AnnouncePeers", log.Fields{
|
||||
"InfoHash": ih.String(),
|
||||
"seeder": seeder,
|
||||
"numWant": numWant,
|
||||
"Peer": announcer,
|
||||
})
|
||||
|
||||
select {
|
||||
case <-ps.closed:
|
||||
panic("attempted to interact with stopped redis store")
|
||||
default:
|
||||
}
|
||||
|
||||
encodedInfoHash := ih.String()
|
||||
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash)
|
||||
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash)
|
||||
|
||||
conn := ps.rb.open()
|
||||
defer conn.Close()
|
||||
|
||||
leechers, err := conn.Do("HKEYS", encodedLeecherInfoHash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conLeechers := leechers.([]interface{})
|
||||
|
||||
seeders, err := conn.Do("HKEYS", encodedSeederInfoHash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conSeeders := seeders.([]interface{})
|
||||
|
||||
if len(conLeechers) == 0 && len(conSeeders) == 0 {
|
||||
return nil, storage.ErrResourceDoesNotExist
|
||||
}
|
||||
|
||||
if seeder {
|
||||
// Append leechers as possible.
|
||||
for _, pk := range conLeechers {
|
||||
if numWant == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
peers = append(peers, decodePeerKey(serializedPeer(pk.([]byte))))
|
||||
numWant--
|
||||
}
|
||||
} else {
|
||||
// Append as many seeders as possible.
|
||||
for _, pk := range conSeeders {
|
||||
if numWant == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
peers = append(peers, decodePeerKey(serializedPeer(pk.([]byte))))
|
||||
numWant--
|
||||
}
|
||||
|
||||
// Append leechers until we reach numWant.
|
||||
if numWant > 0 {
|
||||
announcerPK := newPeerKey(announcer)
|
||||
for _, pk := range conLeechers {
|
||||
if pk == announcerPK {
|
||||
continue
|
||||
}
|
||||
|
||||
if numWant == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
peers = append(peers, decodePeerKey(serializedPeer(pk.([]byte))))
|
||||
numWant--
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily) (resp bittorrent.Scrape) {
|
||||
select {
|
||||
case <-ps.closed:
|
||||
panic("attempted to interact with stopped redis store")
|
||||
default:
|
||||
}
|
||||
|
||||
resp.InfoHash = ih
|
||||
addressFamily := af.String()
|
||||
encodedInfoHash := ih.String()
|
||||
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash)
|
||||
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash)
|
||||
|
||||
conn := ps.rb.open()
|
||||
defer conn.Close()
|
||||
|
||||
leechersLen, err := redis.Int64(conn.Do("HLEN", encodedLeecherInfoHash))
|
||||
if err != nil {
|
||||
log.Error("storage: Redis HLEN failure", log.Fields{
|
||||
"Hkey": encodedLeecherInfoHash,
|
||||
"error": err,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
seedersLen, err := redis.Int64(conn.Do("HLEN", encodedSeederInfoHash))
|
||||
if err != nil {
|
||||
log.Error("storage: Redis HLEN failure", log.Fields{
|
||||
"Hkey": encodedSeederInfoHash,
|
||||
"error": err,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
resp.Incomplete = uint32(leechersLen)
|
||||
resp.Complete = uint32(seedersLen)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// collectGarbage deletes all Peers from the PeerStore which are older than the
|
||||
// cutoff time.
|
||||
//
|
||||
// This function must be able to execute while other methods on this interface
|
||||
// are being executed in parallel.
|
||||
//
|
||||
// - The Delete(Seeder|Leecher) and GraduateLeecher methods never delete an
|
||||
// infohash key from an addressFamily hash. They also never decrement the
|
||||
// infohash counter.
|
||||
// - The Put(Seeder|Leecher) and GraduateLeecher methods only ever add infohash
|
||||
// keys to addressFamily hashes and increment the infohash counter.
|
||||
// - The only method that deletes from the addressFamily hashes is
|
||||
// collectGarbage, which also decrements the counters. That means that,
|
||||
// even if a Delete(Seeder|Leecher) call removes the last peer from a swarm,
|
||||
// the infohash counter is not changed and the infohash is left in the
|
||||
// addressFamily hash until it will be cleaned up by collectGarbage.
|
||||
// - collectGarbage must run regularly.
|
||||
// - A WATCH ... MULTI ... EXEC block fails, if between the WATCH and the 'EXEC'
|
||||
// any of the watched keys have changed. The location of the 'MULTI' doesn't
|
||||
// matter.
|
||||
//
|
||||
// We have to analyze four cases to prove our algorithm works. I'll characterize
|
||||
// them by a tuple (number of peers in a swarm before WATCH, number of peers in
|
||||
// the swarm during the transaction).
|
||||
//
|
||||
// 1. (0,0), the easy case: The swarm is empty, we watch the key, we execute
|
||||
// HLEN and find it empty. We remove it and decrement the counter. It stays
|
||||
// empty the entire time, the transaction goes through.
|
||||
// 2. (1,n > 0): The swarm is not empty, we watch the key, we find it non-empty,
|
||||
// we unwatch the key. All good. No transaction is made, no transaction fails.
|
||||
// 3. (0,1): We have to analyze this in two ways.
|
||||
// - If the change happens before the HLEN call, we will see that the swarm is
|
||||
// not empty and start no transaction.
|
||||
// - If the change happens after the HLEN, we will attempt a transaction and it
|
||||
// will fail. This is okay, the swarm is not empty, we will try cleaning it up
|
||||
// next time collectGarbage runs.
|
||||
// 4. (1,0): Again, two ways:
|
||||
// - If the change happens before the HLEN, we will see an empty swarm. This
|
||||
// situation happens if a call to Delete(Seeder|Leecher) removed the last
|
||||
// peer asynchronously. We will attempt a transaction, but the transaction
|
||||
// will fail. This is okay, the infohash key will remain in the addressFamily
|
||||
// hash, we will attempt to clean it up the next time 'collectGarbage` runs.
|
||||
// - If the change happens after the HLEN, we will not even attempt to make the
|
||||
// transaction. The infohash key will remain in the addressFamil hash and
|
||||
// we'll attempt to clean it up the next time collectGarbage runs.
|
||||
func (ps *peerStore) collectGarbage(cutoff time.Time) error {
|
||||
select {
|
||||
case <-ps.closed:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
conn := ps.rb.open()
|
||||
defer conn.Close()
|
||||
|
||||
cutoffUnix := cutoff.UnixNano()
|
||||
start := time.Now()
|
||||
|
||||
for _, group := range ps.groups() {
|
||||
// list all infohashes in the group
|
||||
infohashesList, err := redis.Strings(conn.Do("HKEYS", group))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, ihStr := range infohashesList {
|
||||
isSeeder := len(ihStr) > 5 && ihStr[5:6] == "S"
|
||||
|
||||
// list all (peer, timeout) pairs for the ih
|
||||
ihList, err := redis.Strings(conn.Do("HGETALL", ihStr))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var pk serializedPeer
|
||||
var removedPeerCount int64
|
||||
for index, ihField := range ihList {
|
||||
if index%2 == 1 { // value
|
||||
mtime, err := strconv.ParseInt(ihField, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if mtime <= cutoffUnix {
|
||||
log.Debug("storage: deleting peer", log.Fields{
|
||||
"Peer": decodePeerKey(pk).String(),
|
||||
})
|
||||
ret, err := redis.Int64(conn.Do("HDEL", ihStr, pk))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
removedPeerCount += ret
|
||||
}
|
||||
} else { // key
|
||||
pk = serializedPeer([]byte(ihField))
|
||||
}
|
||||
}
|
||||
// DECR seeder/leecher counter
|
||||
decrCounter := ps.leecherCountKey(group)
|
||||
if isSeeder {
|
||||
decrCounter = ps.seederCountKey(group)
|
||||
}
|
||||
if removedPeerCount > 0 {
|
||||
if _, err := conn.Do("DECRBY", decrCounter, removedPeerCount); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// use WATCH to avoid race condition
|
||||
// https://redis.io/topics/transactions
|
||||
_, err = conn.Do("WATCH", ihStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ihLen, err := redis.Int64(conn.Do("HLEN", ihStr))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ihLen == 0 {
|
||||
// Empty hashes are not shown among existing keys,
|
||||
// in other words, it's removed automatically after `HDEL` the last field.
|
||||
//_, err := conn.Do("DEL", ihStr)
|
||||
|
||||
conn.Send("MULTI")
|
||||
conn.Send("HDEL", group, ihStr)
|
||||
if isSeeder {
|
||||
conn.Send("DECR", ps.infohashCountKey(group))
|
||||
}
|
||||
_, err = redis.Values(conn.Do("EXEC"))
|
||||
if err != nil && err != redis.ErrNil {
|
||||
log.Error("storage: Redis EXEC failure", log.Fields{
|
||||
"group": group,
|
||||
"infohash": ihStr,
|
||||
"error": err,
|
||||
})
|
||||
}
|
||||
} else {
|
||||
if _, err = conn.Do("UNWATCH"); err != nil && err != redis.ErrNil {
|
||||
log.Error("storage: Redis UNWATCH failure", log.Fields{"error": err})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)
|
||||
log.Debug("storage: recordGCDuration", log.Fields{"timeTaken(ms)": duration})
|
||||
storage.PromGCDurationMilliseconds.Observe(duration)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *peerStore) Stop() stop.Result {
|
||||
c := make(stop.Channel)
|
||||
go func() {
|
||||
close(ps.closed)
|
||||
ps.wg.Wait()
|
||||
log.Info("storage: exiting. chihaya does not clear data in redis when exiting. chihaya keys have prefix 'IPv{4,6}_'.")
|
||||
c.Done()
|
||||
}()
|
||||
|
||||
return c.Result()
|
||||
}
|
||||
|
||||
func (ps *peerStore) LogFields() log.Fields {
|
||||
return ps.cfg.LogFields()
|
||||
}
|
61
storage/redis/peer_store_test.go
Normal file
61
storage/redis/peer_store_test.go
Normal file
|
@ -0,0 +1,61 @@
|
|||
package redis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/alicebob/miniredis"
|
||||
|
||||
s "github.com/chihaya/chihaya/storage"
|
||||
)
|
||||
|
||||
func createNew() s.PeerStore {
|
||||
rs, err := miniredis.Run()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
redisURL := fmt.Sprintf("redis://@%s/0", rs.Addr())
|
||||
ps, err := New(Config{
|
||||
GarbageCollectionInterval: 10 * time.Minute,
|
||||
PrometheusReportingInterval: 10 * time.Minute,
|
||||
PeerLifetime: 30 * time.Minute,
|
||||
RedisBroker: redisURL,
|
||||
RedisReadTimeout: 10 * time.Second,
|
||||
RedisWriteTimeout: 10 * time.Second,
|
||||
RedisConnectTimeout: 10 * time.Second})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return ps
|
||||
}
|
||||
|
||||
func TestPeerStore(t *testing.T) { s.TestPeerStore(t, createNew()) }
|
||||
|
||||
func BenchmarkNop(b *testing.B) { s.Nop(b, createNew()) }
|
||||
func BenchmarkPut(b *testing.B) { s.Put(b, createNew()) }
|
||||
func BenchmarkPut1k(b *testing.B) { s.Put1k(b, createNew()) }
|
||||
func BenchmarkPut1kInfohash(b *testing.B) { s.Put1kInfohash(b, createNew()) }
|
||||
func BenchmarkPut1kInfohash1k(b *testing.B) { s.Put1kInfohash1k(b, createNew()) }
|
||||
func BenchmarkPutDelete(b *testing.B) { s.PutDelete(b, createNew()) }
|
||||
func BenchmarkPutDelete1k(b *testing.B) { s.PutDelete1k(b, createNew()) }
|
||||
func BenchmarkPutDelete1kInfohash(b *testing.B) { s.PutDelete1kInfohash(b, createNew()) }
|
||||
func BenchmarkPutDelete1kInfohash1k(b *testing.B) { s.PutDelete1kInfohash1k(b, createNew()) }
|
||||
func BenchmarkDeleteNonexist(b *testing.B) { s.DeleteNonexist(b, createNew()) }
|
||||
func BenchmarkDeleteNonexist1k(b *testing.B) { s.DeleteNonexist1k(b, createNew()) }
|
||||
func BenchmarkDeleteNonexist1kInfohash(b *testing.B) { s.DeleteNonexist1kInfohash(b, createNew()) }
|
||||
func BenchmarkDeleteNonexist1kInfohash1k(b *testing.B) { s.DeleteNonexist1kInfohash1k(b, createNew()) }
|
||||
func BenchmarkPutGradDelete(b *testing.B) { s.PutGradDelete(b, createNew()) }
|
||||
func BenchmarkPutGradDelete1k(b *testing.B) { s.PutGradDelete1k(b, createNew()) }
|
||||
func BenchmarkPutGradDelete1kInfohash(b *testing.B) { s.PutGradDelete1kInfohash(b, createNew()) }
|
||||
func BenchmarkPutGradDelete1kInfohash1k(b *testing.B) { s.PutGradDelete1kInfohash1k(b, createNew()) }
|
||||
func BenchmarkGradNonexist(b *testing.B) { s.GradNonexist(b, createNew()) }
|
||||
func BenchmarkGradNonexist1k(b *testing.B) { s.GradNonexist1k(b, createNew()) }
|
||||
func BenchmarkGradNonexist1kInfohash(b *testing.B) { s.GradNonexist1kInfohash(b, createNew()) }
|
||||
func BenchmarkGradNonexist1kInfohash1k(b *testing.B) { s.GradNonexist1kInfohash1k(b, createNew()) }
|
||||
func BenchmarkAnnounceLeecher(b *testing.B) { s.AnnounceLeecher(b, createNew()) }
|
||||
func BenchmarkAnnounceLeecher1kInfohash(b *testing.B) { s.AnnounceLeecher1kInfohash(b, createNew()) }
|
||||
func BenchmarkAnnounceSeeder(b *testing.B) { s.AnnounceSeeder(b, createNew()) }
|
||||
func BenchmarkAnnounceSeeder1kInfohash(b *testing.B) { s.AnnounceSeeder1kInfohash(b, createNew()) }
|
||||
func BenchmarkScrapeSwarm(b *testing.B) { s.ScrapeSwarm(b, createNew()) }
|
||||
func BenchmarkScrapeSwarm1kInfohash(b *testing.B) { s.ScrapeSwarm1kInfohash(b, createNew()) }
|
135
storage/redis/redis.go
Normal file
135
storage/redis/redis.go
Normal file
|
@ -0,0 +1,135 @@
|
|||
package redis
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gomodule/redigo/redis"
|
||||
redsync "gopkg.in/redsync.v1"
|
||||
)
|
||||
|
||||
// redisBackend represents a redis handler.
|
||||
type redisBackend struct {
|
||||
pool *redis.Pool
|
||||
redsync *redsync.Redsync
|
||||
}
|
||||
|
||||
// newRedisBackend creates a redisBackend instance.
|
||||
func newRedisBackend(cfg *Config, u *redisURL, socketPath string) *redisBackend {
|
||||
rc := &redisConnector{
|
||||
URL: u,
|
||||
SocketPath: socketPath,
|
||||
ReadTimeout: cfg.RedisReadTimeout,
|
||||
WriteTimeout: cfg.RedisWriteTimeout,
|
||||
ConnectTimeout: cfg.RedisConnectTimeout,
|
||||
}
|
||||
pool := rc.NewPool()
|
||||
redsync := redsync.New([]redsync.Pool{pool})
|
||||
return &redisBackend{
|
||||
pool: pool,
|
||||
redsync: redsync,
|
||||
}
|
||||
}
|
||||
|
||||
// open returns or creates instance of Redis connection.
|
||||
func (rb *redisBackend) open() redis.Conn {
|
||||
return rb.pool.Get()
|
||||
}
|
||||
|
||||
type redisConnector struct {
|
||||
URL *redisURL
|
||||
SocketPath string
|
||||
ReadTimeout time.Duration
|
||||
WriteTimeout time.Duration
|
||||
ConnectTimeout time.Duration
|
||||
}
|
||||
|
||||
// NewPool returns a new pool of Redis connections
|
||||
func (rc *redisConnector) NewPool() *redis.Pool {
|
||||
return &redis.Pool{
|
||||
MaxIdle: 3,
|
||||
IdleTimeout: 240 * time.Second,
|
||||
Dial: func() (redis.Conn, error) {
|
||||
c, err := rc.open()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if rc.URL.DB != 0 {
|
||||
_, err = c.Do("SELECT", rc.URL.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return c, err
|
||||
},
|
||||
// PINGs connections that have been idle more than 10 seconds
|
||||
TestOnBorrow: func(c redis.Conn, t time.Time) error {
|
||||
if time.Since(t) < time.Duration(10*time.Second) {
|
||||
return nil
|
||||
}
|
||||
_, err := c.Do("PING")
|
||||
return err
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Open a new Redis connection
|
||||
func (rc *redisConnector) open() (redis.Conn, error) {
|
||||
var opts = []redis.DialOption{
|
||||
redis.DialDatabase(rc.URL.DB),
|
||||
redis.DialReadTimeout(rc.ReadTimeout),
|
||||
redis.DialWriteTimeout(rc.WriteTimeout),
|
||||
redis.DialConnectTimeout(rc.ConnectTimeout),
|
||||
}
|
||||
|
||||
if rc.URL.Password != "" {
|
||||
opts = append(opts, redis.DialPassword(rc.URL.Password))
|
||||
}
|
||||
|
||||
if rc.SocketPath != "" {
|
||||
return redis.Dial("unix", rc.SocketPath, opts...)
|
||||
}
|
||||
|
||||
return redis.Dial("tcp", rc.URL.Host, opts...)
|
||||
}
|
||||
|
||||
// A redisURL represents a parsed redisURL
|
||||
// The general form represented is:
|
||||
//
|
||||
// redis://[password@]host][/][db]
|
||||
type redisURL struct {
|
||||
Host string
|
||||
Password string
|
||||
DB int
|
||||
}
|
||||
|
||||
// parseRedisURL parse rawurl into redisURL
|
||||
func parseRedisURL(target string) (*redisURL, error) {
|
||||
var u *url.URL
|
||||
u, err := url.Parse(target)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if u.Scheme != "redis" {
|
||||
return nil, errors.New("no redis scheme found")
|
||||
}
|
||||
|
||||
db := 0 //default redis db
|
||||
parts := strings.Split(u.Path, "/")
|
||||
if len(parts) != 1 {
|
||||
db, err = strconv.Atoi(parts[1])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return &redisURL{
|
||||
Host: u.Host,
|
||||
Password: u.User.String(),
|
||||
DB: db,
|
||||
}, nil
|
||||
}
|
Loading…
Reference in a new issue