diff --git a/cmd/reflector.go b/cmd/reflector.go index d2acfed..087c918 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -16,6 +16,7 @@ import ( "github.com/lbryio/reflector.go/peer" "github.com/lbryio/reflector.go/peer/http3" "github.com/lbryio/reflector.go/reflector" + "github.com/lbryio/reflector.go/server/http" "github.com/lbryio/reflector.go/store" "github.com/lbryio/lbry.go/v2/stream" @@ -28,6 +29,7 @@ import ( var ( tcpPeerPort int http3PeerPort int + httpPort int receiverPort int metricsPort int disableUploads bool @@ -57,6 +59,7 @@ func init() { cmd.Flags().StringVar(&WasabiEndpoint, "wasabi-endpoint", "", "Wasabi edge endpoint for standard HTTP retrieval") cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from") cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol") + cmd.Flags().IntVar(&httpPort, "http-port", 5569, "The port reflector will distribute content from over HTTP protocol") cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from") cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics") cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests should be submitted to upstream") @@ -105,6 +108,13 @@ func reflectorCmd(cmd *cobra.Command, args []string) { } defer http3PeerServer.Shutdown() + httpServer := http.NewServer(outerStore) + err = httpServer.Start(":" + strconv.Itoa(httpPort)) + if err != nil { + log.Fatal(err) + } + defer httpServer.Shutdown() + metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics") metricsServer.Start() defer metricsServer.Shutdown() @@ -133,6 +143,8 @@ func setupStore() store.BlobStore { Address: proxyAddress + ":" + proxyPort, Timeout: 30 * time.Second, }) + case "http": + s = store.NewHttpStore(proxyAddress + ":" + proxyPort) default: log.Fatalf("protocol is not recognized: %s", proxyProtocol) } diff --git a/go.mod b/go.mod index 82eaa1e..08137fa 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2 github.com/davecgh/go-spew v1.1.1 + github.com/gin-gonic/gin v1.7.1 // indirect github.com/go-sql-driver/mysql v1.4.1 github.com/golang/protobuf v1.4.2 github.com/google/gops v0.3.18 diff --git a/go.sum b/go.sum index 6af413e..810e719 100644 --- a/go.sum +++ b/go.sum @@ -89,6 +89,10 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.7.1 h1:qC89GU3p8TvKWMAVhEpmpB2CIb1hnqt2UdKZaP93mS8= +github.com/gin-gonic/gin v1.7.1/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-errors/errors v1.1.1 h1:ljK/pL5ltg3qoN+OtN6yCv9HWSfMwxSx90GJCZQxYNg= @@ -103,6 +107,13 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-ozzo/ozzo-validation v3.5.0+incompatible/go.mod h1:gsEKFIVnabGBt6mXmxK0MoFy+cZoTJY6mu5Ll3LVLBU= github.com/go-ozzo/ozzo-validation v3.6.0+incompatible/go.mod h1:gsEKFIVnabGBt6mXmxK0MoFy+cZoTJY6mu5Ll3LVLBU= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.4.1 h1:pH2c5ADXtd66mxoE0Zm9SUhxE20r7aM3F26W0hOn+GE= +github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -123,6 +134,7 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= @@ -140,6 +152,7 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gops v0.3.18 h1:my259V+172PVFmduS2RAsq4FKH+HjKqdh7pLr17Ot8c= github.com/google/gops v0.3.18/go.mod h1:Pfp8hWGIFdV/7rY9/O/U5WgdjYQXf/GiEK4NVuVd2ZE= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= @@ -226,6 +239,8 @@ github.com/johntdyer/slackrus v0.0.0-20180518184837-f7aae3243a07/go.mod h1:j1kV/ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jteeuwen/go-bindata v3.0.7+incompatible/go.mod h1:JVvhzYOiGBnFSYRyV00iY8q7/0PThjIYav1p9h5dmKs= github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= @@ -265,6 +280,8 @@ github.com/lbryio/ozzo-validation v0.0.0-20170323141101-d1008ad1fd04/go.mod h1:f github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE= github.com/lbryio/types v0.0.0-20201019032447-f0b4476ef386 h1:JOQkGpeCM9FWkEHRx+kRPqySPCXElNW1em1++7tVS4M= github.com/lbryio/types v0.0.0-20201019032447-f0b4476ef386/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE= +github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lucas-clemente/quic-go v0.20.1 h1:hb5m76V8QS/8Nw/suHvXqo3BMHAozvIkcnzpJdpanSk= github.com/lucas-clemente/quic-go v0.20.1/go.mod h1:fZq/HUDIM+mW6X6wtzORjC0E/WDBMKe5Hf9bgjISwLk= @@ -285,6 +302,8 @@ github.com/marten-seemann/qtls-go1-16 v0.1.3 h1:XEZ1xGorVy9u+lJq+WXNE+hiqRYLNvJG github.com/marten-seemann/qtls-go1-16 v0.1.3/go.mod h1:gNpI2Ol+lRS3WwSOtIUUtRwZEQMXjYK+dQSBFbethAk= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= @@ -300,7 +319,11 @@ github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:F github.com/mitchellh/mapstructure v0.0.0-20180511142126-bb74f1db0675/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= @@ -445,6 +468,10 @@ github.com/tklauser/numcpus v0.2.1/go.mod h1:9aU+wOc6WjUIZEwWMP62PL/41d65P+iks1g github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= github.com/volatiletech/inflect v0.0.0-20170731032912-e7201282ae8d h1:gI4/tqP6lCY5k6Sg+4k9qSoBXmPwG+xXgMpK7jivD4M= @@ -566,6 +593,7 @@ golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191009170203-06d7bd2c5f4f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -679,6 +707,7 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 4c30606..ca65021 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -194,6 +194,11 @@ var ( Name: "udp_in_bytes", Help: "Total number of bytes downloaded through UDP", }) + MtrInBytesHttp = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "http_in_bytes", + Help: "Total number of bytes downloaded through HTTP", + }) MtrOutBytesUdp = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, Name: "udp_out_bytes", diff --git a/peer/store.go b/peer/store.go index 3068794..689d1c0 100644 --- a/peer/store.go +++ b/peer/store.go @@ -1,11 +1,13 @@ package peer import ( + "strings" "time" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/shared" + "github.com/lbryio/reflector.go/store" ) // Store is a blob store that gets blobs from a peer. @@ -51,7 +53,12 @@ func (p *Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) { return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err } defer c.Close() - return c.GetBlob(hash) + blob, trace, err := c.GetBlob(hash) + if err != nil && strings.Contains(err.Error(), "blob not found") { + return nil, trace, store.ErrBlobNotFound + } + + return blob, trace, err } // Put is not supported diff --git a/server/http/routes.go b/server/http/routes.go new file mode 100644 index 0000000..eee696f --- /dev/null +++ b/server/http/routes.go @@ -0,0 +1,53 @@ +package http + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/reflector.go/store" + log "github.com/sirupsen/logrus" +) + +func (s *Server) getBlob(c *gin.Context) { + hash := c.Query("hash") + blob, trace, err := s.store.Get(hash) + if err != nil { + serialized, serializeErr := trace.Serialize() + if serializeErr != nil { + _ = c.AbortWithError(http.StatusInternalServerError, errors.Prefix(serializeErr.Error(), err)) + return + } + c.Header("Via", serialized) + + if errors.Is(err, store.ErrBlobNotFound) { + log.Errorf("wtf: %s", err.Error()) + c.AbortWithStatus(http.StatusNotFound) + return + } + _ = c.AbortWithError(http.StatusInternalServerError, err) + return + } + serialized, err := trace.Serialize() + if err != nil { + _ = c.AbortWithError(http.StatusInternalServerError, err) + return + } + c.Header("Via", serialized) + c.Header("Content-Disposition", "filename="+hash) + c.Data(http.StatusOK, "application/octet-stream", blob) +} + +func (s *Server) hasBlob(c *gin.Context) { + hash := c.Query("hash") + has, err := s.store.Has(hash) + if err != nil { + _ = c.AbortWithError(http.StatusInternalServerError, err) + return + } + if has { + c.Status(http.StatusNoContent) + return + } + c.Status(http.StatusNotFound) +} diff --git a/server/http/server.go b/server/http/server.go new file mode 100644 index 0000000..adc104f --- /dev/null +++ b/server/http/server.go @@ -0,0 +1,68 @@ +package http + +import ( + "context" + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/lbryio/lbry.go/v2/extras/stop" + "github.com/lbryio/reflector.go/store" + log "github.com/sirupsen/logrus" +) + +// Server is an instance of a peer server that houses the listener and store. +type Server struct { + store store.BlobStore + grp *stop.Group +} + +// NewServer returns an initialized Server pointer. +func NewServer(store store.BlobStore) *Server { + return &Server{ + store: store, + grp: stop.New(), + } +} + +// Shutdown gracefully shuts down the peer server. +func (s *Server) Shutdown() { + log.Debug("shutting down HTTP server") + s.grp.StopAndWait() + log.Debug("HTTP server stopped") +} + +// Start starts the server listener to handle connections. +func (s *Server) Start(address string) error { + gin.SetMode(gin.ReleaseMode) + router := gin.Default() + router.GET("/blob", s.getBlob) + router.HEAD("/blob", s.hasBlob) + srv := &http.Server{ + Addr: address, + Handler: router, + } + go s.listenForShutdown(srv) + // Initializing the server in a goroutine so that + // it won't block the graceful shutdown handling below + s.grp.Add(1) + go func() { + defer s.grp.Done() + log.Println("HTTP server listening on " + address) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("listen: %s\n", err) + } + }() + return nil +} + +func (s *Server) listenForShutdown(listener *http.Server) { + <-s.grp.Ch() + // The context is used to inform the server it has 5 seconds to finish + // the request it is currently handling + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := listener.Shutdown(ctx); err != nil { + log.Fatal("Server forced to shutdown:", err) + } +} diff --git a/store/disk.go b/store/disk.go index 7123b65..fb5e326 100644 --- a/store/disk.go +++ b/store/disk.go @@ -130,12 +130,6 @@ func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { func (d *DiskStore) Put(hash string, blob stream.Blob) error { d.lock.Lock(hash) defer d.lock.Unlock(hash) - start := time.Now() - defer func() { - if time.Since(start) > 100*time.Millisecond { - log.Infof("it took %s to write %s", time.Since(start), hash) - } - }() err := d.initOnce() if err != nil { return err @@ -145,23 +139,7 @@ func (d *DiskStore) Put(hash string, blob stream.Blob) error { if err != nil { return err } - hashBytes := sha512.Sum384(blob) - readHash := hex.EncodeToString(hashBytes[:]) - matchesBeforeWriting := readHash == hash writeFile(d.path(hash), blob, 0644) - readBlob, err := ioutil.ReadFile(d.path(hash)) - matchesAfterReading := false - if err != nil { - log.Errorf("for some fucking reasons I can't read the blob I just wrote %s", err.Error()) - } else { - hashBytes = sha512.Sum384(readBlob) - readHash = hex.EncodeToString(hashBytes[:]) - matchesAfterReading = readHash == hash - } - - log.Infof(`writing %s to disk: hash match: %t, error: %t -reading after writing: hash match: %t`, hash, matchesBeforeWriting, err == nil, matchesAfterReading) - return errors.Err(err) } diff --git a/store/http.go b/store/http.go new file mode 100644 index 0000000..8a934d7 --- /dev/null +++ b/store/http.go @@ -0,0 +1,152 @@ +package store + +import ( + "bytes" + "io" + "io/ioutil" + "net/http" + "sync" + "time" + + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/shared" +) + +// NoopStore is a store that does nothing +type HttpStore struct { + upstream string + httpClient *http.Client +} + +func NewHttpStore(upstream string) *HttpStore { + return &HttpStore{ + upstream: upstream, + httpClient: getClient(), + } +} + +const nameHttp = "http" + +func (n *HttpStore) Name() string { return nameNoop } +func (n *HttpStore) Has(hash string) (bool, error) { + url := n.upstream + "/blob?hash=" + hash + + req, err := http.NewRequest("HEAD", url, nil) + if err != nil { + return false, errors.Err(err) + } + + res, err := n.httpClient.Do(req) + if err != nil { + return false, errors.Err(err) + } + defer res.Body.Close() + if res.StatusCode == http.StatusNotFound { + return false, nil + } + if res.StatusCode == http.StatusNoContent { + return true, nil + } + var body []byte + if res.Body != nil { + body, _ = ioutil.ReadAll(res.Body) + } + return false, errors.Err("upstream error. Status code: %d (%s)", res.StatusCode, string(body)) +} + +func (n *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { + start := time.Now() + url := n.upstream + "/blob?hash=" + hash + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, shared.NewBlobTrace(time.Since(start), n.Name()), errors.Err(err) + } + + res, err := n.httpClient.Do(req) + if err != nil { + return nil, shared.NewBlobTrace(time.Since(start), n.Name()), errors.Err(err) + } + defer res.Body.Close() + tmp := getBuffer() + defer putBuffer(tmp) + serialized := res.Header.Get("Via") + trace := shared.NewBlobTrace(time.Since(start), n.Name()) + if serialized != "" { + parsedTrace, err := shared.Deserialize(serialized) + if err != nil { + return nil, shared.NewBlobTrace(time.Since(start), n.Name()), err + } + trace = *parsedTrace + } + + if res.StatusCode == http.StatusNotFound { + return nil, trace.Stack(time.Since(start), n.Name()), ErrBlobNotFound + } + if res.StatusCode == http.StatusOK { + written, err := io.Copy(tmp, res.Body) + if err != nil { + return nil, trace.Stack(time.Since(start), n.Name()), errors.Err(err) + } + + blob := make([]byte, written) + copy(blob, tmp.Bytes()) + metrics.MtrInBytesHttp.Add(float64(len(blob))) + return blob, trace.Stack(time.Since(start), n.Name()), ErrBlobNotFound + } + var body []byte + if res.Body != nil { + body, _ = ioutil.ReadAll(res.Body) + } + + return nil, trace.Stack(time.Since(start), n.Name()), errors.Err("upstream error. Status code: %d (%s)", res.StatusCode, string(body)) +} + +func (n *HttpStore) Put(string, stream.Blob) error { + return shared.ErrNotImplemented +} +func (n *HttpStore) PutSD(string, stream.Blob) error { + return shared.ErrNotImplemented +} +func (n *HttpStore) Delete(string) error { + return shared.ErrNotImplemented +} +func (n *HttpStore) Shutdown() { return } + +// buffer pool to reduce GC +// https://www.captaincodeman.com/2017/06/02/golang-buffer-pool-gotcha +var buffers = sync.Pool{ + // New is called when a new instance is needed + New: func() interface{} { + buf := make([]byte, 0, stream.MaxBlobSize) + return bytes.NewBuffer(buf) + }, +} + +// getBuffer fetches a buffer from the pool +func getBuffer() *bytes.Buffer { + return buffers.Get().(*bytes.Buffer) +} + +// putBuffer returns a buffer to the pool +func putBuffer(buf *bytes.Buffer) { + buf.Reset() + buffers.Put(buf) +} + +// getClient gets an http client that's customized to be more performant when dealing with blobs of 2MB in size (most of our blobs) +func getClient() *http.Client { + // Customize the Transport to have larger connection pool + defaultRoundTripper := http.DefaultTransport + defaultTransportPointer := defaultRoundTripper.(*http.Transport) + + defaultTransport := *defaultTransportPointer // dereference it to get a copy of the struct that the pointer points to + defaultTransport.MaxIdleConns = 100 + defaultTransport.DisableCompression = true + defaultTransport.MaxIdleConnsPerHost = 100 + defaultTransport.ReadBufferSize = stream.MaxBlobSize + 1024*10 //add an extra few KBs to make sure it fits the extra information + + return &http.Client{Transport: &defaultTransport} +} diff --git a/store/singleflight.go b/store/singleflight.go index 6de91f4..02337a2 100644 --- a/store/singleflight.go +++ b/store/singleflight.go @@ -3,6 +3,7 @@ package store import ( "time" + "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/shared" @@ -46,6 +47,9 @@ func (s *singleflightStore) Get(hash string) (stream.Blob, shared.BlobTrace, err if err != nil { return nil, shared.NewBlobTrace(time.Since(start), s.Name()), err } + if gr == nil { + return nil, shared.NewBlobTrace(time.Since(start), s.Name()), errors.Err("getter response is nil") + } rsp := gr.(getterResponse) return rsp.blob, rsp.stack, nil }