Ittt #52
10 changed files with 332 additions and 23 deletions
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/lbryio/reflector.go/peer"
|
||||
"github.com/lbryio/reflector.go/peer/http3"
|
||||
"github.com/lbryio/reflector.go/reflector"
|
||||
"github.com/lbryio/reflector.go/server/http"
|
||||
"github.com/lbryio/reflector.go/store"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
|
@ -28,6 +29,7 @@ import (
|
|||
var (
|
||||
tcpPeerPort int
|
||||
http3PeerPort int
|
||||
httpPort int
|
||||
receiverPort int
|
||||
metricsPort int
|
||||
disableUploads bool
|
||||
|
@ -57,6 +59,7 @@ func init() {
|
|||
cmd.Flags().StringVar(&WasabiEndpoint, "wasabi-endpoint", "", "Wasabi edge endpoint for standard HTTP retrieval")
|
||||
cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from")
|
||||
cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol")
|
||||
cmd.Flags().IntVar(&httpPort, "http-port", 5569, "The port reflector will distribute content from over HTTP protocol")
|
||||
cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from")
|
||||
cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics")
|
||||
cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests should be submitted to upstream")
|
||||
|
@ -105,6 +108,13 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
|
|||
}
|
||||
defer http3PeerServer.Shutdown()
|
||||
|
||||
httpServer := http.NewServer(outerStore)
|
||||
err = httpServer.Start(":" + strconv.Itoa(httpPort))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer httpServer.Shutdown()
|
||||
|
||||
metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
|
||||
metricsServer.Start()
|
||||
defer metricsServer.Shutdown()
|
||||
|
@ -133,6 +143,8 @@ func setupStore() store.BlobStore {
|
|||
Address: proxyAddress + ":" + proxyPort,
|
||||
Timeout: 30 * time.Second,
|
||||
})
|
||||
case "http":
|
||||
s = store.NewHttpStore(proxyAddress + ":" + proxyPort)
|
||||
default:
|
||||
log.Fatalf("protocol is not recognized: %s", proxyProtocol)
|
||||
}
|
||||
|
|
1
go.mod
1
go.mod
|
@ -13,6 +13,7 @@ require (
|
|||
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
|
||||
github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2
|
||||
github.com/davecgh/go-spew v1.1.1
|
||||
github.com/gin-gonic/gin v1.7.1 // indirect
|
||||
github.com/go-sql-driver/mysql v1.4.1
|
||||
github.com/golang/protobuf v1.4.2
|
||||
github.com/google/gops v0.3.18
|
||||
|
|
29
go.sum
29
go.sum
|
@ -89,6 +89,10 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
|
|||
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
|
||||
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
|
||||
github.com/gin-gonic/gin v1.7.1 h1:qC89GU3p8TvKWMAVhEpmpB2CIb1hnqt2UdKZaP93mS8=
|
||||
github.com/gin-gonic/gin v1.7.1/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY=
|
||||
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
|
||||
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
|
||||
github.com/go-errors/errors v1.1.1 h1:ljK/pL5ltg3qoN+OtN6yCv9HWSfMwxSx90GJCZQxYNg=
|
||||
|
@ -103,6 +107,13 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
|
|||
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
|
||||
github.com/go-ozzo/ozzo-validation v3.5.0+incompatible/go.mod h1:gsEKFIVnabGBt6mXmxK0MoFy+cZoTJY6mu5Ll3LVLBU=
|
||||
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible/go.mod h1:gsEKFIVnabGBt6mXmxK0MoFy+cZoTJY6mu5Ll3LVLBU=
|
||||
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
|
||||
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
|
||||
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
|
||||
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
|
||||
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
|
||||
github.com/go-playground/validator/v10 v10.4.1 h1:pH2c5ADXtd66mxoE0Zm9SUhxE20r7aM3F26W0hOn+GE=
|
||||
github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4=
|
||||
github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA=
|
||||
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
|
||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||
|
@ -123,6 +134,7 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
|
|||
github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0=
|
||||
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
|
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
|
||||
|
@ -140,6 +152,7 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
|
|||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
|
||||
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/gops v0.3.18 h1:my259V+172PVFmduS2RAsq4FKH+HjKqdh7pLr17Ot8c=
|
||||
github.com/google/gops v0.3.18/go.mod h1:Pfp8hWGIFdV/7rY9/O/U5WgdjYQXf/GiEK4NVuVd2ZE=
|
||||
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
|
||||
|
@ -226,6 +239,8 @@ github.com/johntdyer/slackrus v0.0.0-20180518184837-f7aae3243a07/go.mod h1:j1kV/
|
|||
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
|
||||
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
|
||||
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
||||
github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns=
|
||||
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
|
||||
github.com/jteeuwen/go-bindata v3.0.7+incompatible/go.mod h1:JVvhzYOiGBnFSYRyV00iY8q7/0PThjIYav1p9h5dmKs=
|
||||
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
|
@ -265,6 +280,8 @@ github.com/lbryio/ozzo-validation v0.0.0-20170323141101-d1008ad1fd04/go.mod h1:f
|
|||
github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
|
||||
github.com/lbryio/types v0.0.0-20201019032447-f0b4476ef386 h1:JOQkGpeCM9FWkEHRx+kRPqySPCXElNW1em1++7tVS4M=
|
||||
github.com/lbryio/types v0.0.0-20201019032447-f0b4476ef386/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
|
||||
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
|
||||
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
|
||||
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/lucas-clemente/quic-go v0.20.1 h1:hb5m76V8QS/8Nw/suHvXqo3BMHAozvIkcnzpJdpanSk=
|
||||
github.com/lucas-clemente/quic-go v0.20.1/go.mod h1:fZq/HUDIM+mW6X6wtzORjC0E/WDBMKe5Hf9bgjISwLk=
|
||||
|
@ -285,6 +302,8 @@ github.com/marten-seemann/qtls-go1-16 v0.1.3 h1:XEZ1xGorVy9u+lJq+WXNE+hiqRYLNvJG
|
|||
github.com/marten-seemann/qtls-go1-16 v0.1.3/go.mod h1:gNpI2Ol+lRS3WwSOtIUUtRwZEQMXjYK+dQSBFbethAk=
|
||||
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
|
||||
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
|
||||
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
|
||||
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||
|
@ -300,7 +319,11 @@ github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:F
|
|||
github.com/mitchellh/mapstructure v0.0.0-20180511142126-bb74f1db0675/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
|
||||
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
|
||||
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
|
||||
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
|
||||
|
@ -445,6 +468,10 @@ github.com/tklauser/numcpus v0.2.1/go.mod h1:9aU+wOc6WjUIZEwWMP62PL/41d65P+iks1g
|
|||
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
|
||||
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
|
||||
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
|
||||
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
|
||||
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
|
||||
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
|
||||
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
|
||||
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
|
||||
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
|
||||
github.com/volatiletech/inflect v0.0.0-20170731032912-e7201282ae8d h1:gI4/tqP6lCY5k6Sg+4k9qSoBXmPwG+xXgMpK7jivD4M=
|
||||
|
@ -566,6 +593,7 @@ golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7w
|
|||
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191009170203-06d7bd2c5f4f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
@ -679,6 +707,7 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl
|
|||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
|
||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||
|
|
|
@ -194,6 +194,11 @@ var (
|
|||
Name: "udp_in_bytes",
|
||||
Help: "Total number of bytes downloaded through UDP",
|
||||
})
|
||||
MtrInBytesHttp = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
Name: "http_in_bytes",
|
||||
Help: "Total number of bytes downloaded through HTTP",
|
||||
})
|
||||
MtrOutBytesUdp = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
Name: "udp_out_bytes",
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
package peer
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
"github.com/lbryio/reflector.go/shared"
|
||||
"github.com/lbryio/reflector.go/store"
|
||||
)
|
||||
|
||||
// Store is a blob store that gets blobs from a peer.
|
||||
|
@ -51,7 +53,12 @@ func (p *Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
|||
return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err
|
||||
}
|
||||
defer c.Close()
|
||||
return c.GetBlob(hash)
|
||||
blob, trace, err := c.GetBlob(hash)
|
||||
if err != nil && strings.Contains(err.Error(), "blob not found") {
|
||||
return nil, trace, store.ErrBlobNotFound
|
||||
}
|
||||
|
||||
return blob, trace, err
|
||||
}
|
||||
|
||||
// Put is not supported
|
||||
|
|
53
server/http/routes.go
Normal file
53
server/http/routes.go
Normal file
|
@ -0,0 +1,53 @@
|
|||
package http
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
"github.com/lbryio/reflector.go/store"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func (s *Server) getBlob(c *gin.Context) {
|
||||
hash := c.Query("hash")
|
||||
blob, trace, err := s.store.Get(hash)
|
||||
if err != nil {
|
||||
serialized, serializeErr := trace.Serialize()
|
||||
if serializeErr != nil {
|
||||
_ = c.AbortWithError(http.StatusInternalServerError, errors.Prefix(serializeErr.Error(), err))
|
||||
return
|
||||
}
|
||||
c.Header("Via", serialized)
|
||||
|
||||
if errors.Is(err, store.ErrBlobNotFound) {
|
||||
log.Errorf("wtf: %s", err.Error())
|
||||
c.AbortWithStatus(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
_ = c.AbortWithError(http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
serialized, err := trace.Serialize()
|
||||
if err != nil {
|
||||
_ = c.AbortWithError(http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
c.Header("Via", serialized)
|
||||
c.Header("Content-Disposition", "filename="+hash)
|
||||
c.Data(http.StatusOK, "application/octet-stream", blob)
|
||||
}
|
||||
|
||||
func (s *Server) hasBlob(c *gin.Context) {
|
||||
hash := c.Query("hash")
|
||||
has, err := s.store.Has(hash)
|
||||
if err != nil {
|
||||
_ = c.AbortWithError(http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
if has {
|
||||
c.Status(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
c.Status(http.StatusNotFound)
|
||||
}
|
68
server/http/server.go
Normal file
68
server/http/server.go
Normal file
|
@ -0,0 +1,68 @@
|
|||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/lbryio/lbry.go/v2/extras/stop"
|
||||
"github.com/lbryio/reflector.go/store"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Server is an instance of a peer server that houses the listener and store.
|
||||
type Server struct {
|
||||
store store.BlobStore
|
||||
grp *stop.Group
|
||||
}
|
||||
|
||||
// NewServer returns an initialized Server pointer.
|
||||
func NewServer(store store.BlobStore) *Server {
|
||||
return &Server{
|
||||
store: store,
|
||||
grp: stop.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the peer server.
|
||||
func (s *Server) Shutdown() {
|
||||
log.Debug("shutting down HTTP server")
|
||||
s.grp.StopAndWait()
|
||||
log.Debug("HTTP server stopped")
|
||||
}
|
||||
|
||||
// Start starts the server listener to handle connections.
|
||||
func (s *Server) Start(address string) error {
|
||||
gin.SetMode(gin.ReleaseMode)
|
||||
router := gin.Default()
|
||||
router.GET("/blob", s.getBlob)
|
||||
router.HEAD("/blob", s.hasBlob)
|
||||
srv := &http.Server{
|
||||
Addr: address,
|
||||
Handler: router,
|
||||
}
|
||||
go s.listenForShutdown(srv)
|
||||
// Initializing the server in a goroutine so that
|
||||
// it won't block the graceful shutdown handling below
|
||||
s.grp.Add(1)
|
||||
go func() {
|
||||
defer s.grp.Done()
|
||||
log.Println("HTTP server listening on " + address)
|
||||
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
log.Fatalf("listen: %s\n", err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) listenForShutdown(listener *http.Server) {
|
||||
<-s.grp.Ch()
|
||||
// The context is used to inform the server it has 5 seconds to finish
|
||||
// the request it is currently handling
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := listener.Shutdown(ctx); err != nil {
|
||||
log.Fatal("Server forced to shutdown:", err)
|
||||
}
|
||||
}
|
|
@ -130,12 +130,6 @@ func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
|||
func (d *DiskStore) Put(hash string, blob stream.Blob) error {
|
||||
d.lock.Lock(hash)
|
||||
defer d.lock.Unlock(hash)
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
if time.Since(start) > 100*time.Millisecond {
|
||||
log.Infof("it took %s to write %s", time.Since(start), hash)
|
||||
}
|
||||
}()
|
||||
err := d.initOnce()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -145,23 +139,7 @@ func (d *DiskStore) Put(hash string, blob stream.Blob) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
hashBytes := sha512.Sum384(blob)
|
||||
readHash := hex.EncodeToString(hashBytes[:])
|
||||
matchesBeforeWriting := readHash == hash
|
||||
writeFile(d.path(hash), blob, 0644)
|
||||
readBlob, err := ioutil.ReadFile(d.path(hash))
|
||||
matchesAfterReading := false
|
||||
if err != nil {
|
||||
log.Errorf("for some fucking reasons I can't read the blob I just wrote %s", err.Error())
|
||||
} else {
|
||||
hashBytes = sha512.Sum384(readBlob)
|
||||
readHash = hex.EncodeToString(hashBytes[:])
|
||||
matchesAfterReading = readHash == hash
|
||||
}
|
||||
|
||||
log.Infof(`writing %s to disk: hash match: %t, error: %t
|
||||
reading after writing: hash match: %t`, hash, matchesBeforeWriting, err == nil, matchesAfterReading)
|
||||
|
||||
return errors.Err(err)
|
||||
}
|
||||
|
||||
|
|
152
store/http.go
Normal file
152
store/http.go
Normal file
|
@ -0,0 +1,152 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
"github.com/lbryio/reflector.go/internal/metrics"
|
||||
"github.com/lbryio/reflector.go/shared"
|
||||
)
|
||||
|
||||
// NoopStore is a store that does nothing
|
||||
type HttpStore struct {
|
||||
upstream string
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
func NewHttpStore(upstream string) *HttpStore {
|
||||
return &HttpStore{
|
||||
upstream: upstream,
|
||||
httpClient: getClient(),
|
||||
}
|
||||
}
|
||||
|
||||
const nameHttp = "http"
|
||||
|
||||
func (n *HttpStore) Name() string { return nameNoop }
|
||||
func (n *HttpStore) Has(hash string) (bool, error) {
|
||||
url := n.upstream + "/blob?hash=" + hash
|
||||
|
||||
req, err := http.NewRequest("HEAD", url, nil)
|
||||
if err != nil {
|
||||
return false, errors.Err(err)
|
||||
}
|
||||
|
||||
res, err := n.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return false, errors.Err(err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode == http.StatusNotFound {
|
||||
return false, nil
|
||||
}
|
||||
if res.StatusCode == http.StatusNoContent {
|
||||
return true, nil
|
||||
}
|
||||
var body []byte
|
||||
if res.Body != nil {
|
||||
body, _ = ioutil.ReadAll(res.Body)
|
||||
}
|
||||
return false, errors.Err("upstream error. Status code: %d (%s)", res.StatusCode, string(body))
|
||||
}
|
||||
|
||||
func (n *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||
start := time.Now()
|
||||
url := n.upstream + "/blob?hash=" + hash
|
||||
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, shared.NewBlobTrace(time.Since(start), n.Name()), errors.Err(err)
|
||||
}
|
||||
|
||||
res, err := n.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, shared.NewBlobTrace(time.Since(start), n.Name()), errors.Err(err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
tmp := getBuffer()
|
||||
defer putBuffer(tmp)
|
||||
serialized := res.Header.Get("Via")
|
||||
trace := shared.NewBlobTrace(time.Since(start), n.Name())
|
||||
if serialized != "" {
|
||||
parsedTrace, err := shared.Deserialize(serialized)
|
||||
if err != nil {
|
||||
return nil, shared.NewBlobTrace(time.Since(start), n.Name()), err
|
||||
}
|
||||
trace = *parsedTrace
|
||||
}
|
||||
|
||||
if res.StatusCode == http.StatusNotFound {
|
||||
return nil, trace.Stack(time.Since(start), n.Name()), ErrBlobNotFound
|
||||
}
|
||||
if res.StatusCode == http.StatusOK {
|
||||
written, err := io.Copy(tmp, res.Body)
|
||||
if err != nil {
|
||||
return nil, trace.Stack(time.Since(start), n.Name()), errors.Err(err)
|
||||
}
|
||||
|
||||
blob := make([]byte, written)
|
||||
copy(blob, tmp.Bytes())
|
||||
metrics.MtrInBytesHttp.Add(float64(len(blob)))
|
||||
return blob, trace.Stack(time.Since(start), n.Name()), ErrBlobNotFound
|
||||
}
|
||||
var body []byte
|
||||
if res.Body != nil {
|
||||
body, _ = ioutil.ReadAll(res.Body)
|
||||
}
|
||||
|
||||
return nil, trace.Stack(time.Since(start), n.Name()), errors.Err("upstream error. Status code: %d (%s)", res.StatusCode, string(body))
|
||||
}
|
||||
|
||||
func (n *HttpStore) Put(string, stream.Blob) error {
|
||||
return shared.ErrNotImplemented
|
||||
}
|
||||
func (n *HttpStore) PutSD(string, stream.Blob) error {
|
||||
return shared.ErrNotImplemented
|
||||
}
|
||||
func (n *HttpStore) Delete(string) error {
|
||||
return shared.ErrNotImplemented
|
||||
}
|
||||
func (n *HttpStore) Shutdown() { return }
|
||||
|
||||
// buffer pool to reduce GC
|
||||
// https://www.captaincodeman.com/2017/06/02/golang-buffer-pool-gotcha
|
||||
var buffers = sync.Pool{
|
||||
// New is called when a new instance is needed
|
||||
New: func() interface{} {
|
||||
buf := make([]byte, 0, stream.MaxBlobSize)
|
||||
return bytes.NewBuffer(buf)
|
||||
},
|
||||
}
|
||||
|
||||
// getBuffer fetches a buffer from the pool
|
||||
func getBuffer() *bytes.Buffer {
|
||||
return buffers.Get().(*bytes.Buffer)
|
||||
}
|
||||
|
||||
// putBuffer returns a buffer to the pool
|
||||
func putBuffer(buf *bytes.Buffer) {
|
||||
buf.Reset()
|
||||
buffers.Put(buf)
|
||||
}
|
||||
|
||||
// getClient gets an http client that's customized to be more performant when dealing with blobs of 2MB in size (most of our blobs)
|
||||
func getClient() *http.Client {
|
||||
// Customize the Transport to have larger connection pool
|
||||
defaultRoundTripper := http.DefaultTransport
|
||||
defaultTransportPointer := defaultRoundTripper.(*http.Transport)
|
||||
|
||||
defaultTransport := *defaultTransportPointer // dereference it to get a copy of the struct that the pointer points to
|
||||
defaultTransport.MaxIdleConns = 100
|
||||
defaultTransport.DisableCompression = true
|
||||
defaultTransport.MaxIdleConnsPerHost = 100
|
||||
defaultTransport.ReadBufferSize = stream.MaxBlobSize + 1024*10 //add an extra few KBs to make sure it fits the extra information
|
||||
|
||||
return &http.Client{Transport: &defaultTransport}
|
||||
}
|
|
@ -3,6 +3,7 @@ package store
|
|||
import (
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
"github.com/lbryio/reflector.go/internal/metrics"
|
||||
"github.com/lbryio/reflector.go/shared"
|
||||
|
||||
|
@ -46,6 +47,9 @@ func (s *singleflightStore) Get(hash string) (stream.Blob, shared.BlobTrace, err
|
|||
if err != nil {
|
||||
return nil, shared.NewBlobTrace(time.Since(start), s.Name()), err
|
||||
}
|
||||
if gr == nil {
|
||||
return nil, shared.NewBlobTrace(time.Since(start), s.Name()), errors.Err("getter response is nil")
|
||||
}
|
||||
rsp := gr.(getterResponse)
|
||||
return rsp.blob, rsp.stack, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue