Compare commits
1 commit
Author | SHA1 | Date | |
---|---|---|---|
|
71fb06fbb8 |
46 changed files with 881 additions and 1216 deletions
37
.github/workflows/go.yml
vendored
37
.github/workflows/go.yml
vendored
|
@ -1,37 +0,0 @@
|
||||||
name: Go
|
|
||||||
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
branches: [ master ]
|
|
||||||
pull_request:
|
|
||||||
branches: [ master ]
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
build:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- name: Checkout
|
|
||||||
uses: actions/checkout@v3
|
|
||||||
|
|
||||||
- name: Set up Go
|
|
||||||
uses: actions/setup-go@v3
|
|
||||||
with:
|
|
||||||
go-version: 1.20.x
|
|
||||||
|
|
||||||
- name: Build linux
|
|
||||||
run: make linux
|
|
||||||
|
|
||||||
- name: Build macos
|
|
||||||
run: make macos
|
|
||||||
|
|
||||||
- name: Test
|
|
||||||
run: make test
|
|
||||||
|
|
||||||
- name: Lint
|
|
||||||
run: make lint
|
|
||||||
|
|
||||||
- name: retrieve all tags
|
|
||||||
run: git fetch --prune --unshallow --tags
|
|
||||||
|
|
||||||
- name: Print changes since last version
|
|
||||||
run: git log $(git describe --tags --abbrev=0)..HEAD --no-merges --oneline
|
|
62
.github/workflows/release.yml
vendored
62
.github/workflows/release.yml
vendored
|
@ -1,62 +0,0 @@
|
||||||
name: release
|
|
||||||
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
tags:
|
|
||||||
- "*.*.*"
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
build:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- name: Checkout
|
|
||||||
uses: actions/checkout@v3
|
|
||||||
|
|
||||||
- name: Set up Go
|
|
||||||
uses: actions/setup-go@v3
|
|
||||||
with:
|
|
||||||
go-version: 1.20.x
|
|
||||||
|
|
||||||
- name: Build linux
|
|
||||||
run: make linux
|
|
||||||
|
|
||||||
- name: Build macos
|
|
||||||
run: make macos
|
|
||||||
|
|
||||||
- name: Test
|
|
||||||
run: make test
|
|
||||||
|
|
||||||
- name: Lint
|
|
||||||
run: make lint
|
|
||||||
|
|
||||||
- name: Zip macos
|
|
||||||
run: zip -r reflector_darwin_amd64.zip ./dist/darwin_amd64
|
|
||||||
|
|
||||||
- name: Zip linux
|
|
||||||
run: zip -r reflector_linux_amd64.zip ./dist/linux_amd64
|
|
||||||
|
|
||||||
- name: retrieve all tags
|
|
||||||
run: git fetch --prune --unshallow --tags
|
|
||||||
|
|
||||||
- name: Generate Changelog
|
|
||||||
run: git log $(git describe --tags --abbrev=0 @^)..@ --no-merges --oneline > ${{ github.workspace }}-CHANGELOG.txt
|
|
||||||
|
|
||||||
- name: upload to github releases
|
|
||||||
uses: softprops/action-gh-release@v1
|
|
||||||
with:
|
|
||||||
files: |
|
|
||||||
./reflector_linux_amd64.zip
|
|
||||||
./reflector_darwin_amd64.zip
|
|
||||||
body_path: ${{ github.workspace }}-CHANGELOG.txt
|
|
||||||
|
|
||||||
# - name: Login to DockerHub
|
|
||||||
# uses: docker/login-action@v2
|
|
||||||
# with:
|
|
||||||
# username: ${{ secrets.DOCKERHUB_USERNAME }}
|
|
||||||
# password: ${{ secrets.DOCKERHUB_TOKEN }}
|
|
||||||
|
|
||||||
# - name: Generate docker image
|
|
||||||
# run: make image
|
|
||||||
|
|
||||||
# - name: Docker push
|
|
||||||
# run: make publish_image
|
|
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -1,4 +1,3 @@
|
||||||
/vendor
|
/vendor
|
||||||
/config.json*
|
/config.json*
|
||||||
/dist
|
|
||||||
/bin
|
/bin
|
||||||
|
|
27
.travis.yml
27
.travis.yml
|
@ -1,9 +1,12 @@
|
||||||
os: linux
|
os: linux
|
||||||
dist: bionic
|
dist: trusty
|
||||||
language: go
|
language: go
|
||||||
|
|
||||||
|
env:
|
||||||
|
- GO111MODULE=on
|
||||||
|
|
||||||
go:
|
go:
|
||||||
- 1.20.x
|
- 1.15.x
|
||||||
|
|
||||||
cache:
|
cache:
|
||||||
directories:
|
directories:
|
||||||
|
@ -14,7 +17,7 @@ notifications:
|
||||||
email: false
|
email: false
|
||||||
|
|
||||||
# Skip the install step. Don't `go get` dependencies. Only build with the code in vendor/
|
# Skip the install step. Don't `go get` dependencies. Only build with the code in vendor/
|
||||||
#install: true
|
install: true
|
||||||
|
|
||||||
# Anything in before_script that returns a nonzero exit code will
|
# Anything in before_script that returns a nonzero exit code will
|
||||||
# flunk the build and immediately stop. It's sorta like having
|
# flunk the build and immediately stop. It's sorta like having
|
||||||
|
@ -22,14 +25,14 @@ notifications:
|
||||||
before_script:
|
before_script:
|
||||||
# All the .go files, excluding vendor/ and model (auto generated)
|
# All the .go files, excluding vendor/ and model (auto generated)
|
||||||
- GO_FILES=$(find . -iname '*.go' ! -iname '*_test.go' -type f | grep -v /vendor/ ) #i wish we were this crazy :p
|
- GO_FILES=$(find . -iname '*.go' ! -iname '*_test.go' -type f | grep -v /vendor/ ) #i wish we were this crazy :p
|
||||||
- go install golang.org/x/tools/cmd/goimports # Used in build script for generated files
|
- go get golang.org/x/tools/cmd/goimports # Used in build script for generated files
|
||||||
# - go get github.com/golang/lint/golint # Linter
|
# - go get github.com/golang/lint/golint # Linter
|
||||||
# - go get honnef.co/go/tools/cmd/megacheck # Badass static analyzer/linter
|
# - go get honnef.co/go/tools/cmd/megacheck # Badass static analyzer/linter
|
||||||
- go install github.com/fzipp/gocyclo/cmd/gocyclo@latest # Check against high complexity
|
- go get github.com/jgautheron/gocyclo # Check against high complexity
|
||||||
- go install github.com/mdempsky/unconvert@latest # Identifies unnecessary type conversions
|
- go get github.com/mdempsky/unconvert # Identifies unnecessary type conversions
|
||||||
- go install github.com/kisielk/errcheck@latest # Checks for unhandled errors
|
- go get github.com/kisielk/errcheck # Checks for unhandled errors
|
||||||
- go install github.com/opennota/check/cmd/varcheck@latest # Checks for unused vars
|
- go get github.com/opennota/check/cmd/varcheck # Checks for unused vars
|
||||||
- go install github.com/opennota/check/cmd/structcheck@latest # Checks for unused fields in structs
|
- go get github.com/opennota/check/cmd/structcheck # Checks for unused fields in structs
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -37,7 +40,7 @@ before_script:
|
||||||
# in a modern Go project.
|
# in a modern Go project.
|
||||||
script:
|
script:
|
||||||
# Fail if a .go file hasn't been formatted with gofmt
|
# Fail if a .go file hasn't been formatted with gofmt
|
||||||
- for i in $GO_FILES; do test -z $(gofmt -s -l $i); done
|
- test -z $(gofmt -s -l $GO_FILES)
|
||||||
# Run unit tests
|
# Run unit tests
|
||||||
- make test
|
- make test
|
||||||
# Checks for unused vars and fields on structs
|
# Checks for unused vars and fields on structs
|
||||||
|
@ -56,11 +59,11 @@ script:
|
||||||
# one last linter - ignore autogen code
|
# one last linter - ignore autogen code
|
||||||
#- golint -set_exit_status $(go list ./... | grep -v /vendor/ )
|
#- golint -set_exit_status $(go list ./... | grep -v /vendor/ )
|
||||||
# Finally, build the binary
|
# Finally, build the binary
|
||||||
- make linux
|
- make
|
||||||
|
|
||||||
deploy:
|
deploy:
|
||||||
- provider: s3
|
- provider: s3
|
||||||
local_dir: ./dist/linux_amd64
|
local_dir: ./bin
|
||||||
skip_cleanup: true
|
skip_cleanup: true
|
||||||
on:
|
on:
|
||||||
repo: lbryio/reflector.go
|
repo: lbryio/reflector.go
|
||||||
|
|
|
@ -3,7 +3,7 @@ EXPOSE 8080
|
||||||
|
|
||||||
RUN mkdir /app
|
RUN mkdir /app
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
COPY dist/linux_amd64/prism-bin ./prism
|
COPY bin/prism-bin ./prism
|
||||||
RUN chmod +x prism
|
RUN chmod +x prism
|
||||||
|
|
||||||
ENTRYPOINT [ "/app/prism" ]
|
ENTRYPOINT [ "/app/prism" ]
|
||||||
|
|
44
Makefile
44
Makefile
|
@ -1,33 +1,25 @@
|
||||||
version := $(shell git describe --dirty --always --long --abbrev=7)
|
|
||||||
commit := $(shell git rev-parse --short HEAD)
|
|
||||||
commit_long := $(shell git rev-parse HEAD)
|
|
||||||
branch := $(shell git rev-parse --abbrev-ref HEAD)
|
|
||||||
curTime := $(shell date +%s)
|
|
||||||
|
|
||||||
BINARY=prism-bin
|
BINARY=prism-bin
|
||||||
IMPORT_PATH = github.com/lbryio/reflector.go
|
|
||||||
LDFLAGS="-X ${IMPORT_PATH}/meta.version=$(version) -X ${IMPORT_PATH}/meta.commit=$(commit) -X ${IMPORT_PATH}/meta.commitLong=$(commit_long) -X ${IMPORT_PATH}/meta.branch=$(branch) -X '${IMPORT_PATH}/meta.Time=$(curTime)'"
|
|
||||||
DIR = $(shell cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd)
|
DIR = $(shell cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd)
|
||||||
BIN_DIR = $(DIR)/dist
|
BIN_DIR = ${DIR}/bin
|
||||||
|
IMPORT_PATH = github.com/lbryio/reflector.go
|
||||||
|
|
||||||
.DEFAULT_GOAL := linux
|
VERSION = $(shell git --git-dir=${DIR}/.git describe --dirty --always --long --abbrev=7)
|
||||||
|
LDFLAGS = -ldflags "-X ${IMPORT_PATH}/meta.Version=${VERSION} -X ${IMPORT_PATH}/meta.Time=$(shell date +%s)"
|
||||||
|
|
||||||
|
|
||||||
|
.PHONY: build clean test lint
|
||||||
|
.DEFAULT_GOAL: build
|
||||||
|
|
||||||
|
|
||||||
|
build:
|
||||||
|
mkdir -p ${BIN_DIR} && CGO_ENABLED=0 go build ${LDFLAGS} -asmflags -trimpath=${DIR} -o ${BIN_DIR}/${BINARY} main.go
|
||||||
|
|
||||||
|
clean:
|
||||||
|
if [ -f ${BIN_DIR}/${BINARY} ]; then rm ${BIN_DIR}/${BINARY}; fi
|
||||||
|
|
||||||
.PHONY: test
|
|
||||||
test:
|
test:
|
||||||
go test -cover -v ./...
|
go test ./... -v -cover
|
||||||
|
|
||||||
.PHONY: lint
|
|
||||||
lint:
|
lint:
|
||||||
./scripts/lint.sh
|
go get github.com/alecthomas/gometalinter && gometalinter --install && gometalinter ./...
|
||||||
|
|
||||||
.PHONY: linux
|
|
||||||
linux:
|
|
||||||
GOARCH=amd64 GOOS=linux go build -ldflags ${LDFLAGS} -asmflags -trimpath=${DIR} -o ${BIN_DIR}/linux_amd64/${BINARY}
|
|
||||||
|
|
||||||
.PHONY: macos
|
|
||||||
macos:
|
|
||||||
GOARCH=amd64 GOOS=darwin go build -ldflags ${LDFLAGS} -asmflags -trimpath=${DIR} -o ${BIN_DIR}/darwin_amd64/${BINARY}
|
|
||||||
|
|
||||||
.PHONY: image
|
|
||||||
image:
|
|
||||||
docker buildx build -t lbry/reflector:$(version) -t lbry/reflector:latest --platform linux/amd64 .
|
|
|
@ -1,7 +1,7 @@
|
||||||
package cluster
|
package cluster
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io/ioutil"
|
||||||
baselog "log"
|
baselog "log"
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
@ -52,7 +52,7 @@ func (c *Cluster) Connect() error {
|
||||||
conf.MemberlistConfig.AdvertisePort = c.port
|
conf.MemberlistConfig.AdvertisePort = c.port
|
||||||
conf.NodeName = c.name
|
conf.NodeName = c.name
|
||||||
|
|
||||||
nullLogger := baselog.New(io.Discard, "", 0)
|
nullLogger := baselog.New(ioutil.Discard, "", 0)
|
||||||
conf.Logger = nullLogger
|
conf.Logger = nullLogger
|
||||||
|
|
||||||
c.eventCh = make(chan serf.Event)
|
c.eventCh = make(chan serf.Event)
|
||||||
|
|
|
@ -3,6 +3,7 @@ package cmd
|
||||||
import (
|
import (
|
||||||
"crypto/sha512"
|
"crypto/sha512"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
@ -71,7 +72,7 @@ func consume(worker int, tasks <-chan string, done chan<- bool, totalTasks int,
|
||||||
log.Infof("[T%d] %d/%d blobs processed so far. ETA: %s", worker, processedSoFar, totalTasks, remainingTime.String())
|
log.Infof("[T%d] %d/%d blobs processed so far. ETA: %s", worker, processedSoFar, totalTasks, remainingTime.String())
|
||||||
}
|
}
|
||||||
blobPath := path.Join(diskStorePath, b[:2], b)
|
blobPath := path.Join(diskStorePath, b[:2], b)
|
||||||
blob, err := os.ReadFile(blobPath)
|
blob, err := ioutil.ReadFile(blobPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -14,7 +14,7 @@ import (
|
||||||
"github.com/lbryio/reflector.go/meta"
|
"github.com/lbryio/reflector.go/meta"
|
||||||
"github.com/lbryio/reflector.go/reflector"
|
"github.com/lbryio/reflector.go/reflector"
|
||||||
"github.com/lbryio/reflector.go/server/http"
|
"github.com/lbryio/reflector.go/server/http"
|
||||||
"github.com/lbryio/reflector.go/server/http3"
|
http32 "github.com/lbryio/reflector.go/server/http3"
|
||||||
"github.com/lbryio/reflector.go/server/peer"
|
"github.com/lbryio/reflector.go/server/peer"
|
||||||
"github.com/lbryio/reflector.go/store"
|
"github.com/lbryio/reflector.go/store"
|
||||||
|
|
||||||
|
@ -43,7 +43,6 @@ var (
|
||||||
//upstream configuration
|
//upstream configuration
|
||||||
upstreamReflector string
|
upstreamReflector string
|
||||||
upstreamProtocol string
|
upstreamProtocol string
|
||||||
upstreamEdgeToken string
|
|
||||||
|
|
||||||
//downstream configuration
|
//downstream configuration
|
||||||
requestQueueSize int
|
requestQueueSize int
|
||||||
|
@ -85,7 +84,6 @@ func init() {
|
||||||
|
|
||||||
cmd.Flags().StringVar(&upstreamReflector, "upstream-reflector", "", "host:port of a reflector server where blobs are fetched from")
|
cmd.Flags().StringVar(&upstreamReflector, "upstream-reflector", "", "host:port of a reflector server where blobs are fetched from")
|
||||||
cmd.Flags().StringVar(&upstreamProtocol, "upstream-protocol", "http", "protocol used to fetch blobs from another upstream reflector server (tcp/http3/http)")
|
cmd.Flags().StringVar(&upstreamProtocol, "upstream-protocol", "http", "protocol used to fetch blobs from another upstream reflector server (tcp/http3/http)")
|
||||||
cmd.Flags().StringVar(&upstreamEdgeToken, "upstream-edge-token", "", "token used to retrieve/authenticate protected content")
|
|
||||||
|
|
||||||
cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests from downstream should be handled at once (the rest will wait)")
|
cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests from downstream should be handled at once (the rest will wait)")
|
||||||
|
|
||||||
|
@ -125,14 +123,14 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
|
||||||
}
|
}
|
||||||
defer peerServer.Shutdown()
|
defer peerServer.Shutdown()
|
||||||
|
|
||||||
http3PeerServer := http3.NewServer(underlyingStoreWithCaches, requestQueueSize)
|
http3PeerServer := http32.NewServer(underlyingStoreWithCaches, requestQueueSize)
|
||||||
err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort))
|
err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
defer http3PeerServer.Shutdown()
|
defer http3PeerServer.Shutdown()
|
||||||
|
|
||||||
httpServer := http.NewServer(store.WithSingleFlight("sf-http", underlyingStoreWithCaches), requestQueueSize, upstreamEdgeToken)
|
httpServer := http.NewServer(underlyingStoreWithCaches, requestQueueSize)
|
||||||
err = httpServer.Start(":" + strconv.Itoa(httpPeerPort))
|
err = httpServer.Start(":" + strconv.Itoa(httpPeerPort))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
@ -164,12 +162,12 @@ func initUpstreamStore() store.BlobStore {
|
||||||
Timeout: 30 * time.Second,
|
Timeout: 30 * time.Second,
|
||||||
})
|
})
|
||||||
case "http3":
|
case "http3":
|
||||||
s = http3.NewStore(http3.StoreOpts{
|
s = http32.NewStore(http32.StoreOpts{
|
||||||
Address: upstreamReflector,
|
Address: upstreamReflector,
|
||||||
Timeout: 30 * time.Second,
|
Timeout: 30 * time.Second,
|
||||||
})
|
})
|
||||||
case "http":
|
case "http":
|
||||||
s = store.NewHttpStore(upstreamReflector, upstreamEdgeToken)
|
s = store.NewHttpStore(upstreamReflector)
|
||||||
default:
|
default:
|
||||||
log.Fatalf("protocol is not recognized: %s", upstreamProtocol)
|
log.Fatalf("protocol is not recognized: %s", upstreamProtocol)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
@ -24,7 +25,6 @@ type Config struct {
|
||||||
BucketName string `json:"bucket_name"`
|
BucketName string `json:"bucket_name"`
|
||||||
DBConn string `json:"db_conn"`
|
DBConn string `json:"db_conn"`
|
||||||
SlackHookURL string `json:"slack_hook_url"`
|
SlackHookURL string `json:"slack_hook_url"`
|
||||||
SlackChannel string `json:"slack_channel"`
|
|
||||||
UpdateBinURL string `json:"update_bin_url"`
|
UpdateBinURL string `json:"update_bin_url"`
|
||||||
UpdateCmd string `json:"update_cmd"`
|
UpdateCmd string `json:"update_cmd"`
|
||||||
}
|
}
|
||||||
|
@ -102,7 +102,7 @@ func preRun(cmd *cobra.Command, args []string) {
|
||||||
hook := &slackrus.SlackrusHook{
|
hook := &slackrus.SlackrusHook{
|
||||||
HookURL: globalConfig.SlackHookURL,
|
HookURL: globalConfig.SlackHookURL,
|
||||||
AcceptedLevels: slackrus.LevelThreshold(logrus.InfoLevel),
|
AcceptedLevels: slackrus.LevelThreshold(logrus.InfoLevel),
|
||||||
Channel: globalConfig.SlackChannel,
|
Channel: "#reflector-logs",
|
||||||
//IconEmoji: ":ghost:",
|
//IconEmoji: ":ghost:",
|
||||||
//Username: "reflector.go",
|
//Username: "reflector.go",
|
||||||
}
|
}
|
||||||
|
@ -141,7 +141,7 @@ func argFuncs(funcs ...cobra.PositionalArgs) cobra.PositionalArgs {
|
||||||
func loadConfig(path string) (Config, error) {
|
func loadConfig(path string) (Config, error) {
|
||||||
var c Config
|
var c Config
|
||||||
|
|
||||||
raw, err := os.ReadFile(path)
|
raw, err := ioutil.ReadFile(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return c, errors.Err("config file not found")
|
return c, errors.Err("config file not found")
|
||||||
|
|
10
cmd/send.go
10
cmd/send.go
|
@ -5,6 +5,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"path"
|
"path"
|
||||||
|
@ -38,12 +39,13 @@ func sendCmd(cmd *cobra.Command, args []string) {
|
||||||
reflectorAddress := args[0]
|
reflectorAddress := args[0]
|
||||||
err := hackyReflector.Connect(reflectorAddress)
|
err := hackyReflector.Connect(reflectorAddress)
|
||||||
checkErr(err)
|
checkErr(err)
|
||||||
defer func() { _ = hackyReflector.Close() }()
|
defer hackyReflector.Close()
|
||||||
|
|
||||||
filePath := args[1]
|
filePath := args[1]
|
||||||
file, err := os.Open(filePath)
|
file, err := os.Open(filePath)
|
||||||
checkErr(err)
|
checkErr(err)
|
||||||
defer func() { _ = file.Close() }()
|
defer file.Close()
|
||||||
|
|
||||||
sdCachePath := ""
|
sdCachePath := ""
|
||||||
sdCacheDir := mustGetFlagString(cmd, "sd-cache")
|
sdCacheDir := mustGetFlagString(cmd, "sd-cache")
|
||||||
if sdCacheDir != "" {
|
if sdCacheDir != "" {
|
||||||
|
@ -58,7 +60,7 @@ func sendCmd(cmd *cobra.Command, args []string) {
|
||||||
|
|
||||||
if sdCachePath != "" {
|
if sdCachePath != "" {
|
||||||
if _, err := os.Stat(sdCachePath); !os.IsNotExist(err) {
|
if _, err := os.Stat(sdCachePath); !os.IsNotExist(err) {
|
||||||
sdBlob, err := os.ReadFile(sdCachePath)
|
sdBlob, err := ioutil.ReadFile(sdCachePath)
|
||||||
checkErr(err)
|
checkErr(err)
|
||||||
cachedSDBlob := &stream.SDBlob{}
|
cachedSDBlob := &stream.SDBlob{}
|
||||||
err = cachedSDBlob.FromBlob(sdBlob)
|
err = cachedSDBlob.FromBlob(sdBlob)
|
||||||
|
@ -108,7 +110,7 @@ func sendCmd(cmd *cobra.Command, args []string) {
|
||||||
sd := enc.SDBlob()
|
sd := enc.SDBlob()
|
||||||
//sd.StreamName = filepath.Base(filePath)
|
//sd.StreamName = filepath.Base(filePath)
|
||||||
//sd.SuggestedFileName = filepath.Base(filePath)
|
//sd.SuggestedFileName = filepath.Base(filePath)
|
||||||
err = os.WriteFile(sdCachePath, sd.ToBlob(), 0666)
|
err = ioutil.WriteFile(sdCachePath, sd.ToBlob(), 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error saving sd blob: %v\n", err)
|
fmt.Printf("error saving sd blob: %v\n", err)
|
||||||
fmt.Println(sd.ToJson())
|
fmt.Println(sd.ToJson())
|
||||||
|
|
|
@ -51,7 +51,7 @@ func sendBlobCmd(cmd *cobra.Command, args []string) {
|
||||||
|
|
||||||
file, err := os.Open(path)
|
file, err := os.Open(path)
|
||||||
checkErr(err)
|
checkErr(err)
|
||||||
defer func() { _ = file.Close() }()
|
defer file.Close()
|
||||||
s, err := stream.New(file)
|
s, err := stream.New(file)
|
||||||
checkErr(err)
|
checkErr(err)
|
||||||
|
|
||||||
|
|
|
@ -18,5 +18,5 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func versionCmd(cmd *cobra.Command, args []string) {
|
func versionCmd(cmd *cobra.Command, args []string) {
|
||||||
fmt.Println(meta.FullName())
|
fmt.Println(meta.VersionString())
|
||||||
}
|
}
|
||||||
|
|
2
db/db.go
2
db/db.go
|
@ -17,7 +17,7 @@ import (
|
||||||
"github.com/go-sql-driver/mysql"
|
"github.com/go-sql-driver/mysql"
|
||||||
_ "github.com/go-sql-driver/mysql" // blank import for db driver ensures its imported even if its not used
|
_ "github.com/go-sql-driver/mysql" // blank import for db driver ensures its imported even if its not used
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/volatiletech/null/v8"
|
"github.com/volatiletech/null"
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
145
go.mod
145
go.mod
|
@ -1,124 +1,51 @@
|
||||||
module github.com/lbryio/reflector.go
|
module github.com/lbryio/reflector.go
|
||||||
|
|
||||||
go 1.20
|
|
||||||
|
|
||||||
replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203050410-e1076f12bf19
|
replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203050410-e1076f12bf19
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/aws/aws-sdk-go v1.45.24
|
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
|
||||||
|
github.com/aws/aws-sdk-go v1.27.0
|
||||||
github.com/bluele/gcache v0.0.2
|
github.com/bluele/gcache v0.0.2
|
||||||
github.com/brk0v/directio v0.0.0-20190225130936-69406e757cf7
|
github.com/brk0v/directio v0.0.0-20190225130936-69406e757cf7
|
||||||
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
|
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
|
||||||
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
|
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
|
||||||
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b
|
github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2
|
||||||
github.com/davecgh/go-spew v1.1.1
|
github.com/davecgh/go-spew v1.1.1
|
||||||
github.com/ekyoung/gin-nice-recovery v0.0.0-20160510022553-1654dca486db
|
github.com/ekyoung/gin-nice-recovery v0.0.0-20160510022553-1654dca486db
|
||||||
github.com/gin-gonic/gin v1.9.1
|
github.com/gin-gonic/gin v1.7.1
|
||||||
github.com/go-sql-driver/mysql v1.7.1
|
github.com/go-sql-driver/mysql v1.6.0
|
||||||
github.com/gogo/protobuf v1.3.2
|
github.com/gogo/protobuf v1.2.1
|
||||||
github.com/golang/protobuf v1.5.3
|
github.com/golang/protobuf v1.5.2
|
||||||
github.com/google/gops v0.3.28
|
github.com/google/go-cmp v0.5.6 // indirect
|
||||||
|
github.com/google/gops v0.3.18
|
||||||
github.com/gorilla/mux v1.8.0
|
github.com/gorilla/mux v1.8.0
|
||||||
github.com/hashicorp/serf v0.10.1
|
github.com/hashicorp/go-msgpack v0.5.5 // indirect
|
||||||
|
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
||||||
|
github.com/hashicorp/serf v0.9.5
|
||||||
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf
|
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf
|
||||||
github.com/johntdyer/slackrus v0.0.0-20230315191314-80bc92dee4fc
|
github.com/johntdyer/slackrus v0.0.0-20210521205746-42486fb4c48c
|
||||||
github.com/karrick/godirwalk v1.17.0
|
github.com/karrick/godirwalk v1.16.1
|
||||||
github.com/lbryio/chainquery v1.9.1-0.20230515181855-2fcba3115cfe
|
github.com/lbryio/chainquery v1.9.0
|
||||||
github.com/lbryio/lbry.go/v2 v2.7.2-0.20230307181431-a01aa6dc0629
|
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210416195322-6516df1418e3
|
||||||
github.com/lbryio/types v0.0.0-20220224142228-73610f6654a6
|
github.com/lbryio/types v0.0.0-20201019032447-f0b4476ef386
|
||||||
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
|
github.com/lucas-clemente/quic-go v0.20.1
|
||||||
github.com/prometheus/client_golang v1.16.0
|
github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6
|
||||||
github.com/quic-go/quic-go v0.39.0
|
github.com/prometheus/client_golang v1.10.0
|
||||||
github.com/sirupsen/logrus v1.9.3
|
github.com/sergi/go-diff v1.2.0 // indirect
|
||||||
github.com/spf13/cast v1.5.1
|
github.com/sirupsen/logrus v1.8.1
|
||||||
github.com/spf13/cobra v1.7.0
|
github.com/spf13/afero v1.4.1 // indirect
|
||||||
github.com/stretchr/testify v1.8.4
|
github.com/spf13/cast v1.3.0
|
||||||
github.com/volatiletech/null/v8 v8.1.2
|
github.com/spf13/cobra v1.1.3
|
||||||
go.uber.org/atomic v1.11.0
|
github.com/spf13/viper v1.7.1 // indirect
|
||||||
golang.org/x/sync v0.4.0
|
github.com/stretchr/testify v1.7.0
|
||||||
|
github.com/volatiletech/null v8.0.0+incompatible
|
||||||
|
go.uber.org/atomic v1.7.0
|
||||||
|
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
|
||||||
|
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
||||||
|
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
|
||||||
|
golang.org/x/text v0.3.6 // indirect
|
||||||
|
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
|
||||||
|
google.golang.org/protobuf v1.27.1 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
go 1.16
|
||||||
github.com/armon/go-metrics v0.4.0 // indirect
|
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
|
||||||
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
|
|
||||||
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
|
|
||||||
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect
|
|
||||||
github.com/bytedance/sonic v1.9.1 // indirect
|
|
||||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
|
||||||
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
|
|
||||||
github.com/friendsofgo/errors v0.9.2 // indirect
|
|
||||||
github.com/fsnotify/fsnotify v1.6.0 // indirect
|
|
||||||
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
|
|
||||||
github.com/gin-contrib/sse v0.1.0 // indirect
|
|
||||||
github.com/go-errors/errors v1.4.2 // indirect
|
|
||||||
github.com/go-ini/ini v1.67.0 // indirect
|
|
||||||
github.com/go-playground/locales v0.14.1 // indirect
|
|
||||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
|
||||||
github.com/go-playground/validator/v10 v10.14.0 // indirect
|
|
||||||
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
|
|
||||||
github.com/goccy/go-json v0.10.2 // indirect
|
|
||||||
github.com/gofrs/uuid v4.2.0+incompatible // indirect
|
|
||||||
github.com/google/btree v1.0.0 // indirect
|
|
||||||
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
|
|
||||||
github.com/gorilla/rpc v1.2.0 // indirect
|
|
||||||
github.com/gorilla/websocket v1.4.2 // indirect
|
|
||||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
|
||||||
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
|
|
||||||
github.com/hashicorp/go-msgpack v0.5.3 // indirect
|
|
||||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
|
||||||
github.com/hashicorp/go-sockaddr v1.0.0 // indirect
|
|
||||||
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
|
||||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
|
||||||
github.com/hashicorp/memberlist v0.5.0 // indirect
|
|
||||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
|
||||||
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
|
||||||
github.com/johntdyer/slack-go v0.0.0-20230314151037-c5bf334f9b6e // indirect
|
|
||||||
github.com/json-iterator/go v1.1.12 // indirect
|
|
||||||
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
|
|
||||||
github.com/leodido/go-urn v1.2.4 // indirect
|
|
||||||
github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5 // indirect
|
|
||||||
github.com/magiconair/properties v1.8.7 // indirect
|
|
||||||
github.com/mattn/go-isatty v0.0.19 // indirect
|
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
|
|
||||||
github.com/miekg/dns v1.1.41 // indirect
|
|
||||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
|
||||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
|
||||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
|
||||||
github.com/onsi/ginkgo/v2 v2.9.5 // indirect
|
|
||||||
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
|
|
||||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
|
||||||
github.com/prometheus/client_model v0.3.0 // indirect
|
|
||||||
github.com/prometheus/common v0.42.0 // indirect
|
|
||||||
github.com/prometheus/procfs v0.10.1 // indirect
|
|
||||||
github.com/quic-go/qpack v0.4.0 // indirect
|
|
||||||
github.com/quic-go/qtls-go1-20 v0.3.4 // indirect
|
|
||||||
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
|
|
||||||
github.com/shopspring/decimal v1.3.1 // indirect
|
|
||||||
github.com/slack-go/slack v0.12.1 // indirect
|
|
||||||
github.com/spf13/afero v1.9.3 // indirect
|
|
||||||
github.com/spf13/jwalterweatherman v1.1.0 // indirect
|
|
||||||
github.com/spf13/pflag v1.0.5 // indirect
|
|
||||||
github.com/spf13/viper v1.15.0 // indirect
|
|
||||||
github.com/subosito/gotenv v1.4.2 // indirect
|
|
||||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
|
||||||
github.com/ugorji/go/codec v1.2.11 // indirect
|
|
||||||
github.com/volatiletech/inflect v0.0.1 // indirect
|
|
||||||
github.com/volatiletech/randomize v0.0.1 // indirect
|
|
||||||
github.com/volatiletech/strmangle v0.0.4 // indirect
|
|
||||||
go.uber.org/mock v0.3.0 // indirect
|
|
||||||
golang.org/x/arch v0.3.0 // indirect
|
|
||||||
golang.org/x/crypto v0.9.0 // indirect
|
|
||||||
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
|
|
||||||
golang.org/x/mod v0.11.0 // indirect
|
|
||||||
golang.org/x/net v0.10.0 // indirect
|
|
||||||
golang.org/x/sys v0.11.0 // indirect
|
|
||||||
golang.org/x/text v0.9.0 // indirect
|
|
||||||
golang.org/x/time v0.3.0 // indirect
|
|
||||||
golang.org/x/tools v0.9.1 // indirect
|
|
||||||
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
|
|
||||||
google.golang.org/protobuf v1.30.0 // indirect
|
|
||||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
|
||||||
gopkg.in/nullbio/null.v6 v6.0.0-20161116030900-40264a2e6b79 // indirect
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
|
||||||
)
|
|
||||||
|
|
|
@ -86,13 +86,8 @@ const (
|
||||||
errUnexpectedEOFStr = "unexpected_eof_str"
|
errUnexpectedEOFStr = "unexpected_eof_str"
|
||||||
errJSONSyntax = "json_syntax"
|
errJSONSyntax = "json_syntax"
|
||||||
errBlobTooBig = "blob_too_big"
|
errBlobTooBig = "blob_too_big"
|
||||||
errInvalidPeerJSON = "invalid_peer_json"
|
|
||||||
errInvalidPeerData = "invalid_peer_data"
|
|
||||||
errRequestTooLarge = "request_too_large"
|
|
||||||
errDeadlineExceeded = "deadline_exceeded"
|
errDeadlineExceeded = "deadline_exceeded"
|
||||||
errHashMismatch = "hash_mismatch"
|
errHashMismatch = "hash_mismatch"
|
||||||
errProtectedBlob = "protected_blob"
|
|
||||||
errInvalidBlobHash = "invalid_blob_hash"
|
|
||||||
errZeroByteBlob = "zero_byte_blob"
|
errZeroByteBlob = "zero_byte_blob"
|
||||||
errInvalidCharacter = "invalid_character"
|
errInvalidCharacter = "invalid_character"
|
||||||
errBlobNotFound = "blob_not_found"
|
errBlobNotFound = "blob_not_found"
|
||||||
|
@ -301,20 +296,10 @@ func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a
|
||||||
} else if strings.Contains(err.Error(), "blob must be at most") {
|
} else if strings.Contains(err.Error(), "blob must be at most") {
|
||||||
//log.Warnln("blob must be at most X bytes is not the same as ErrBlobTooBig")
|
//log.Warnln("blob must be at most X bytes is not the same as ErrBlobTooBig")
|
||||||
errType = errBlobTooBig
|
errType = errBlobTooBig
|
||||||
} else if strings.Contains(err.Error(), "invalid json request") {
|
|
||||||
errType = errInvalidPeerJSON
|
|
||||||
} else if strings.Contains(err.Error(), "Invalid data") {
|
|
||||||
errType = errInvalidPeerData
|
|
||||||
} else if strings.Contains(err.Error(), "request is too large") {
|
|
||||||
errType = errRequestTooLarge
|
|
||||||
} else if strings.Contains(err.Error(), "Invalid blob hash length") {
|
|
||||||
errType = errInvalidBlobHash
|
|
||||||
} else if strings.Contains(err.Error(), "hash of received blob data does not match hash from send request") {
|
} else if strings.Contains(err.Error(), "hash of received blob data does not match hash from send request") {
|
||||||
errType = errHashMismatch
|
errType = errHashMismatch
|
||||||
} else if strings.Contains(err.Error(), "blob not found") {
|
} else if strings.Contains(err.Error(), "blob not found") {
|
||||||
errType = errBlobNotFound
|
errType = errBlobNotFound
|
||||||
} else if strings.Contains(err.Error(), "requested blob is protected") {
|
|
||||||
errType = errProtectedBlob
|
|
||||||
} else if strings.Contains(err.Error(), "0-byte blob received") {
|
} else if strings.Contains(err.Error(), "0-byte blob received") {
|
||||||
errType = errZeroByteBlob
|
errType = errZeroByteBlob
|
||||||
} else if strings.Contains(err.Error(), "PROTOCOL_VIOLATION: tried to retire connection") {
|
} else if strings.Contains(err.Error(), "PROTOCOL_VIOLATION: tried to retire connection") {
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
"github.com/go-sql-driver/mysql"
|
"github.com/go-sql-driver/mysql"
|
||||||
_ "github.com/go-sql-driver/mysql" // blank import for db driver ensures its imported even if its not used
|
_ "github.com/go-sql-driver/mysql" // blank import for db driver ensures its imported even if its not used
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/volatiletech/null/v8"
|
"github.com/volatiletech/null"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SdBlob is a special blob that contains information on the rest of the blobs in the stream
|
// SdBlob is a special blob that contains information on the rest of the blobs in the stream
|
||||||
|
@ -111,7 +111,7 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
|
||||||
// HasBlobs checks if the database contains the set of blobs and returns a bool map.
|
// HasBlobs checks if the database contains the set of blobs and returns a bool map.
|
||||||
func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
|
func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
|
||||||
exists, streamsNeedingTouch, err := s.hasBlobs(hashes)
|
exists, streamsNeedingTouch, err := s.hasBlobs(hashes)
|
||||||
_ = s.touch(streamsNeedingTouch)
|
s.touch(streamsNeedingTouch)
|
||||||
return exists, err
|
return exists, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
38
meta/meta.go
38
meta/meta.go
|
@ -6,36 +6,9 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var Version = ""
|
||||||
name = "prism-bin"
|
var Time = ""
|
||||||
version = "unknown"
|
var BuildTime time.Time
|
||||||
commit = "unknown"
|
|
||||||
commitLong = "unknown"
|
|
||||||
branch = "unknown"
|
|
||||||
Time = "unknown"
|
|
||||||
BuildTime time.Time
|
|
||||||
)
|
|
||||||
|
|
||||||
// Name returns main application name
|
|
||||||
func Name() string {
|
|
||||||
return name
|
|
||||||
}
|
|
||||||
|
|
||||||
// Version returns current application version
|
|
||||||
func Version() string {
|
|
||||||
return version
|
|
||||||
}
|
|
||||||
|
|
||||||
// FullName returns current app version, commit and build time
|
|
||||||
func FullName() string {
|
|
||||||
return fmt.Sprintf(
|
|
||||||
`Name: %v
|
|
||||||
Version: %v
|
|
||||||
branch: %v
|
|
||||||
commit: %v
|
|
||||||
commit long: %v
|
|
||||||
build date: %v`, Name(), Version(), branch, commit, commitLong, BuildTime.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
if Time != "" {
|
if Time != "" {
|
||||||
|
@ -47,6 +20,11 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func VersionString() string {
|
func VersionString() string {
|
||||||
|
version := Version
|
||||||
|
if version == "" {
|
||||||
|
version = "<unset>"
|
||||||
|
}
|
||||||
|
|
||||||
var buildTime string
|
var buildTime string
|
||||||
if BuildTime.IsZero() {
|
if BuildTime.IsZero() {
|
||||||
buildTime = "<now>"
|
buildTime = "<now>"
|
||||||
|
|
|
@ -115,7 +115,7 @@ func Publish(client *lbrycrd.Client, path, name, address string, details Details
|
||||||
return signedTx, txid, nil
|
return signedTx, txid, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: lots of assumptions. hardcoded values need to be passed in or calculated
|
//TODO: lots of assumptions. hardcoded values need to be passed in or calculated
|
||||||
func baseTx(client *lbrycrd.Client, amount float64, changeAddress btcutil.Address) (*wire.MsgTx, error) {
|
func baseTx(client *lbrycrd.Client, amount float64, changeAddress btcutil.Address) (*wire.MsgTx, error) {
|
||||||
txFee := 0.0002 // TODO: estimate this better?
|
txFee := 0.0002 // TODO: estimate this better?
|
||||||
|
|
||||||
|
@ -222,7 +222,8 @@ func makeStream(path string) (stream.Stream, *pb.Stream, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, errors.Err(err)
|
return nil, nil, errors.Err(err)
|
||||||
}
|
}
|
||||||
defer func() { _ = file.Close() }()
|
defer file.Close()
|
||||||
|
|
||||||
enc := stream.NewEncoder(file)
|
enc := stream.NewEncoder(file)
|
||||||
|
|
||||||
s, err := enc.Stream()
|
s, err := enc.Stream()
|
||||||
|
|
|
@ -13,7 +13,7 @@ This code includes a Go implementations of the LBRY peer protocol, reflector pro
|
||||||
|
|
||||||
- Install mysql 8 (5.7 might work too)
|
- Install mysql 8 (5.7 might work too)
|
||||||
- add a reflector user and database with password `reflector` with localhost access only
|
- add a reflector user and database with password `reflector` with localhost access only
|
||||||
- Create the tables as described [here](https://github.com/lbryio/reflector.go/blob/master/db/db.go#L735) (the link might not update as the code does so just look for the schema in that file)
|
- Create the tables as described [here](https://github.com/lbryio/reflector.go/blob/ittt/db/db.go#L735) (the link might not update as the code does so just look for the schema in that file)
|
||||||
|
|
||||||
#### We do not support running reflector.go as a blob receiver, however if you want to run it as a private blobcache you may compile it yourself and run it as following:
|
#### We do not support running reflector.go as a blob receiver, however if you want to run it as a private blobcache you may compile it yourself and run it as following:
|
||||||
```bash
|
```bash
|
||||||
|
@ -96,7 +96,7 @@ Flags:
|
||||||
```
|
```
|
||||||
## Running from Source
|
## Running from Source
|
||||||
|
|
||||||
This project requires [Go v1.20](https://golang.org/doc/install).
|
This project requires [Go v1.16](https://golang.org/doc/install).
|
||||||
|
|
||||||
On Ubuntu you can install it with `sudo snap install go --classic`
|
On Ubuntu you can install it with `sudo snap install go --classic`
|
||||||
|
|
||||||
|
@ -104,7 +104,7 @@ On Ubuntu you can install it with `sudo snap install go --classic`
|
||||||
git clone git@github.com:lbryio/reflector.go.git
|
git clone git@github.com:lbryio/reflector.go.git
|
||||||
cd reflector.go
|
cd reflector.go
|
||||||
make
|
make
|
||||||
./dist/linux_amd64/prism-bin
|
./bin/prism-bin
|
||||||
```
|
```
|
||||||
|
|
||||||
## Contributing
|
## Contributing
|
||||||
|
@ -118,7 +118,7 @@ This project is MIT licensed.
|
||||||
## Security
|
## Security
|
||||||
|
|
||||||
We take security seriously. Please contact security@lbry.com regarding any security issues.
|
We take security seriously. Please contact security@lbry.com regarding any security issues.
|
||||||
Our PGP key is [here](https://lbry.com/faq/pgp-key) if you need it.
|
Our PGP key is [here](https://keybase.io/lbry/key.asc) if you need it.
|
||||||
|
|
||||||
## Contact
|
## Contact
|
||||||
The primary contact for this project is [@Nikooo777](https://github.com/Nikooo777) (niko-at-lbry.com)
|
The primary contact for this project is [@Nikooo777](https://github.com/Nikooo777) (niko-at-lbry.com)
|
||||||
|
|
|
@ -1,81 +0,0 @@
|
||||||
package reflector
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"net/http"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/bluele/gcache"
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
|
||||||
"golang.org/x/sync/singleflight"
|
|
||||||
)
|
|
||||||
|
|
||||||
const protectedListURL = "https://api.odysee.com/file/list_protected"
|
|
||||||
|
|
||||||
type ProtectedContent struct {
|
|
||||||
SDHash string `json:"sd_hash"`
|
|
||||||
ClaimID string `json:"claim_id"`
|
|
||||||
}
|
|
||||||
|
|
||||||
var protectedCache = gcache.New(10).Expiration(2 * time.Minute).Build()
|
|
||||||
|
|
||||||
func GetProtectedContent() (interface{}, error) {
|
|
||||||
cachedVal, err := protectedCache.Get("protected")
|
|
||||||
if err == nil && cachedVal != nil {
|
|
||||||
return cachedVal.(map[string]bool), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
method := "GET"
|
|
||||||
var r struct {
|
|
||||||
Success bool `json:"success"`
|
|
||||||
Error string `json:"error"`
|
|
||||||
Data []ProtectedContent `json:"data"`
|
|
||||||
}
|
|
||||||
|
|
||||||
client := &http.Client{}
|
|
||||||
req, err := http.NewRequest(method, protectedListURL, nil)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Err(err)
|
|
||||||
}
|
|
||||||
res, err := client.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Err(err)
|
|
||||||
}
|
|
||||||
defer func() { _ = res.Body.Close() }()
|
|
||||||
if res.StatusCode != http.StatusOK {
|
|
||||||
return nil, errors.Err("unexpected status code %d", res.StatusCode)
|
|
||||||
}
|
|
||||||
if err = json.NewDecoder(res.Body).Decode(&r); err != nil {
|
|
||||||
return nil, errors.Err(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !r.Success {
|
|
||||||
return nil, errors.Prefix("file/list_protected API call", r.Error)
|
|
||||||
}
|
|
||||||
|
|
||||||
protectedMap := make(map[string]bool, len(r.Data))
|
|
||||||
for _, pc := range r.Data {
|
|
||||||
protectedMap[pc.SDHash] = true
|
|
||||||
}
|
|
||||||
err = protectedCache.Set("protected", protectedMap)
|
|
||||||
if err != nil {
|
|
||||||
return protectedMap, errors.Err(err)
|
|
||||||
}
|
|
||||||
return protectedMap, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var sf = singleflight.Group{}
|
|
||||||
|
|
||||||
func IsProtected(sdHash string) bool {
|
|
||||||
val, err, _ := sf.Do("protected", GetProtectedContent)
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
cachedMap, ok := val.(map[string]bool)
|
|
||||||
if !ok {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return cachedMap[sdHash]
|
|
||||||
}
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -61,7 +62,7 @@ func (s *Server) Shutdown() {
|
||||||
log.Println("reflector server stopped")
|
log.Println("reflector server stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts the server to handle connections.
|
//Start starts the server to handle connections.
|
||||||
func (s *Server) Start(address string) error {
|
func (s *Server) Start(address string) error {
|
||||||
l, err := net.Listen(network, address)
|
l, err := net.Listen(network, address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -366,7 +367,7 @@ func (s *Server) read(conn net.Conn, v interface{}) error {
|
||||||
dec := json.NewDecoder(conn)
|
dec := json.NewDecoder(conn)
|
||||||
err = dec.Decode(v)
|
err = dec.Decode(v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
data, _ := io.ReadAll(dec.Buffered())
|
data, _ := ioutil.ReadAll(dec.Buffered())
|
||||||
if len(data) > 0 {
|
if len(data) > 0 {
|
||||||
return errors.Err("%s. Data: %s", err.Error(), hex.EncodeToString(data))
|
return errors.Err("%s. Data: %s", err.Error(), hex.EncodeToString(data))
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package reflector
|
package reflector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -164,7 +165,7 @@ func (u *Uploader) uploadBlob(filepath string) (err error) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
blob, err := os.ReadFile(filepath)
|
blob, err := ioutil.ReadFile(filepath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Err(err)
|
return errors.Err(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,26 +0,0 @@
|
||||||
#!/usr/bin/env bash
|
|
||||||
|
|
||||||
err=0
|
|
||||||
trap 'err=1' ERR
|
|
||||||
# All the .go files, excluding auto generated folders
|
|
||||||
GO_FILES=$(find . -iname '*.go' -type f)
|
|
||||||
(
|
|
||||||
go install golang.org/x/tools/cmd/goimports@latest # Used in build script for generated files
|
|
||||||
# go install golang.org/x/lint/golint@latest # Linter
|
|
||||||
go install github.com/jgautheron/gocyclo@latest # Check against high complexity
|
|
||||||
go install github.com/mdempsky/unconvert@latest # Identifies unnecessary type conversions
|
|
||||||
go install github.com/kisielk/errcheck@latest # Checks for unhandled errors
|
|
||||||
go install github.com/opennota/check/cmd/varcheck@latest # Checks for unused vars
|
|
||||||
go install github.com/opennota/check/cmd/structcheck@latest # Checks for unused fields in structs
|
|
||||||
)
|
|
||||||
echo "Running varcheck..." && varcheck $(go list ./...)
|
|
||||||
echo "Running structcheck..." && structcheck $(go list ./...)
|
|
||||||
# go vet is the official Go static analyzer
|
|
||||||
echo "Running go vet..." && go vet $(go list ./...)
|
|
||||||
# checks for unhandled errors
|
|
||||||
echo "Running errcheck..." && errcheck $(go list ./...)
|
|
||||||
# check for unnecessary conversions - ignore autogen code
|
|
||||||
echo "Running unconvert..." && unconvert -v $(go list ./...)
|
|
||||||
echo "Running gocyclo..." && gocyclo -ignore "_test" -avg -over 28 $GO_FILES
|
|
||||||
#echo "Running golint..." && golint -set_exit_status $(go list ./...)
|
|
||||||
test $err = 0 # Return non-zero if any command failed
|
|
|
@ -6,14 +6,12 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
"github.com/lbryio/reflector.go/reflector"
|
|
||||||
"github.com/lbryio/reflector.go/shared"
|
"github.com/lbryio/reflector.go/shared"
|
||||||
"github.com/lbryio/reflector.go/store"
|
"github.com/lbryio/reflector.go/store"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Server) getBlob(c *gin.Context) {
|
func (s *Server) getBlob(c *gin.Context) {
|
||||||
|
@ -24,20 +22,9 @@ func (s *Server) getBlob(c *gin.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) HandleGetBlob(c *gin.Context) {
|
func (s *Server) HandleGetBlob(c *gin.Context) {
|
||||||
defer func() {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
log.Errorf("Recovered from panic: %v", r)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
hash := c.Query("hash")
|
hash := c.Query("hash")
|
||||||
edgeToken := c.Query("edge_token")
|
|
||||||
|
|
||||||
if reflector.IsProtected(hash) && edgeToken != s.edgeToken {
|
|
||||||
_ = c.Error(errors.Err("requested blob is protected"))
|
|
||||||
c.String(http.StatusForbidden, "requested blob is protected")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s.missesCache.Has(hash) {
|
if s.missesCache.Has(hash) {
|
||||||
serialized, err := shared.NewBlobTrace(time.Since(start), "http").Serialize()
|
serialized, err := shared.NewBlobTrace(time.Since(start), "http").Serialize()
|
||||||
c.Header("Via", serialized)
|
c.Header("Via", serialized)
|
||||||
|
|
|
@ -21,17 +21,15 @@ type Server struct {
|
||||||
grp *stop.Group
|
grp *stop.Group
|
||||||
concurrentRequests int
|
concurrentRequests int
|
||||||
missesCache gcache.Cache
|
missesCache gcache.Cache
|
||||||
edgeToken string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer returns an initialized Server pointer.
|
// NewServer returns an initialized Server pointer.
|
||||||
func NewServer(store store.BlobStore, requestQueueSize int, edgeToken string) *Server {
|
func NewServer(store store.BlobStore, requestQueueSize int) *Server {
|
||||||
return &Server{
|
return &Server{
|
||||||
store: store,
|
store: store,
|
||||||
grp: stop.New(),
|
grp: stop.New(),
|
||||||
concurrentRequests: requestQueueSize,
|
concurrentRequests: requestQueueSize,
|
||||||
missesCache: gcache.New(2000).Expiration(5 * time.Minute).ARC().Build(),
|
missesCache: gcache.New(2000).Expiration(5 * time.Minute).ARC().Build(),
|
||||||
edgeToken: edgeToken,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,12 +43,11 @@ func (s *Server) Shutdown() {
|
||||||
// Start starts the server listener to handle connections.
|
// Start starts the server listener to handle connections.
|
||||||
func (s *Server) Start(address string) error {
|
func (s *Server) Start(address string) error {
|
||||||
gin.SetMode(gin.ReleaseMode)
|
gin.SetMode(gin.ReleaseMode)
|
||||||
router := gin.New()
|
router := gin.Default()
|
||||||
router.Use(gin.Logger())
|
|
||||||
// Install nice.Recovery, passing the handler to call after recovery
|
|
||||||
router.Use(nice.Recovery(s.recoveryHandler))
|
|
||||||
router.GET("/blob", s.getBlob)
|
router.GET("/blob", s.getBlob)
|
||||||
router.HEAD("/blob", s.hasBlob)
|
router.HEAD("/blob", s.hasBlob)
|
||||||
|
// Install nice.Recovery, passing the handler to call after recovery
|
||||||
|
router.Use(nice.Recovery(s.recoveryHandler))
|
||||||
srv := &http.Server{
|
srv := &http.Server{
|
||||||
Addr: address,
|
Addr: address,
|
||||||
Handler: router,
|
Handler: router,
|
||||||
|
|
|
@ -16,7 +16,7 @@ import (
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
|
||||||
"github.com/quic-go/quic-go/http3"
|
"github.com/lucas-clemente/quic-go/http3"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -69,7 +69,7 @@ func (c *Client) HasBlob(hash string) (bool, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, errors.Err(err)
|
return false, errors.Err(err)
|
||||||
}
|
}
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer resp.Body.Close()
|
||||||
if resp.StatusCode == http.StatusOK {
|
if resp.StatusCode == http.StatusOK {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
@ -86,7 +86,7 @@ func (c *Client) GetBlob(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, shared.NewBlobTrace(time.Since(start), "http3"), errors.Err(err)
|
return nil, shared.NewBlobTrace(time.Since(start), "http3"), errors.Err(err)
|
||||||
}
|
}
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode == http.StatusNotFound {
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
fmt.Printf("%s blob not found %d\n", hash, resp.StatusCode)
|
fmt.Printf("%s blob not found %d\n", hash, resp.StatusCode)
|
||||||
|
|
|
@ -15,15 +15,14 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
"github.com/lbryio/reflector.go/reflector"
|
|
||||||
"github.com/lbryio/reflector.go/store"
|
"github.com/lbryio/reflector.go/store"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/extras/stop"
|
"github.com/lbryio/lbry.go/v2/extras/stop"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/quic-go/quic-go"
|
"github.com/lucas-clemente/quic-go"
|
||||||
"github.com/quic-go/quic-go/http3"
|
"github.com/lucas-clemente/quic-go/http3"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -113,9 +112,11 @@ func (s *Server) Start(address string) error {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
server := http3.Server{
|
server := http3.Server{
|
||||||
Addr: address,
|
Server: &http.Server{
|
||||||
Handler: r,
|
Handler: r,
|
||||||
TLSConfig: generateTLSConfig(),
|
Addr: address,
|
||||||
|
TLSConfig: generateTLSConfig(),
|
||||||
|
},
|
||||||
QuicConfig: quicConf,
|
QuicConfig: quicConf,
|
||||||
}
|
}
|
||||||
go InitWorkers(s, s.concurrentRequests)
|
go InitWorkers(s, s.concurrentRequests)
|
||||||
|
@ -155,7 +156,7 @@ func generateTLSConfig() *tls.Config {
|
||||||
|
|
||||||
func (s *Server) listenAndServe(server *http3.Server) {
|
func (s *Server) listenAndServe(server *http3.Server) {
|
||||||
err := server.ListenAndServe()
|
err := server.ListenAndServe()
|
||||||
if err != nil && err != quic.ErrServerClosed {
|
if err != nil && err.Error() != "server closed" {
|
||||||
log.Errorln(errors.FullTrace(err))
|
log.Errorln(errors.FullTrace(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -180,10 +181,7 @@ func (s *Server) HandleGetBlob(w http.ResponseWriter, r *http.Request) {
|
||||||
wantsTrace = false
|
wantsTrace = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if reflector.IsProtected(requestedBlob) {
|
|
||||||
http.Error(w, "requested blob is protected", http.StatusForbidden)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
blob, trace, err := s.store.Get(requestedBlob)
|
blob, trace, err := s.store.Get(requestedBlob)
|
||||||
|
|
||||||
if wantsTrace {
|
if wantsTrace {
|
||||||
|
|
|
@ -14,8 +14,8 @@ import (
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
|
||||||
"github.com/quic-go/quic-go"
|
"github.com/lucas-clemente/quic-go"
|
||||||
"github.com/quic-go/quic-go/http3"
|
"github.com/lucas-clemente/quic-go/http3"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Store is a blob store that gets blobs from a peer.
|
// Store is a blob store that gets blobs from a peer.
|
||||||
|
@ -74,7 +74,7 @@ func (p *Store) Has(hash string) (bool, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
defer func() { _ = c.Close() }()
|
defer c.Close()
|
||||||
return c.HasBlob(hash)
|
return c.HasBlob(hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,7 +93,7 @@ func (p *Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err
|
return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err
|
||||||
}
|
}
|
||||||
defer func() { _ = c.Close() }()
|
defer c.Close()
|
||||||
return c.GetBlob(hash)
|
return c.GetBlob(hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,15 +32,17 @@ const (
|
||||||
// Server is an instance of a peer server that houses the listener and store.
|
// Server is an instance of a peer server that houses the listener and store.
|
||||||
type Server struct {
|
type Server struct {
|
||||||
store store.BlobStore
|
store store.BlobStore
|
||||||
|
disk *store.DiskStore
|
||||||
closed bool
|
closed bool
|
||||||
|
|
||||||
grp *stop.Group
|
grp *stop.Group
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer returns an initialized Server pointer.
|
// NewServer returns an initialized Server pointer.
|
||||||
func NewServer(store store.BlobStore) *Server {
|
func NewServer(blobstore store.BlobStore) *Server {
|
||||||
return &Server{
|
return &Server{
|
||||||
store: store,
|
store: blobstore,
|
||||||
|
disk: store.NewDiskStore("/external/blobs", 2),
|
||||||
grp: stop.New(),
|
grp: stop.New(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -131,7 +133,7 @@ func (s *Server) handleConnection(conn net.Conn) {
|
||||||
log.Error(errors.FullTrace(err))
|
log.Error(errors.FullTrace(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
response, err = s.handleCompositeRequest(request)
|
response, blobHash, err := s.handleCompositeRequest(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(errors.FullTrace(err))
|
log.Error(errors.FullTrace(err))
|
||||||
return
|
return
|
||||||
|
@ -143,6 +145,7 @@ func (s *Server) handleConnection(conn net.Conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := conn.Write(response)
|
n, err := conn.Write(response)
|
||||||
|
s.disk.Sendfile(blobHash, conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !strings.Contains(err.Error(), "connection reset by peer") { // means the other side closed the connection using TCP reset
|
if !strings.Contains(err.Error(), "connection reset by peer") { // means the other side closed the connection using TCP reset
|
||||||
s.logError(err)
|
s.logError(err)
|
||||||
|
@ -221,78 +224,70 @@ func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) {
|
||||||
// return append(response, blob...), nil
|
// return append(response, blob...), nil
|
||||||
//}
|
//}
|
||||||
|
|
||||||
func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
|
func (s *Server) handleCompositeRequest(data []byte) ([]byte, string, error) {
|
||||||
var request compositeRequest
|
var request compositeRequest
|
||||||
err := json.Unmarshal(data, &request)
|
err := json.Unmarshal(data, &request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var je *json.SyntaxError
|
var je *json.SyntaxError
|
||||||
if ee.As(err, &je) {
|
if ee.As(err, &je) {
|
||||||
return nil, errors.Err("invalid json request: offset %d in data %s", je.Offset, hex.EncodeToString(data))
|
return nil, "", errors.Err("invalid json at offset %d in data %s", je.Offset, hex.EncodeToString(data))
|
||||||
}
|
}
|
||||||
return nil, errors.Err(err)
|
return nil, "", errors.Err(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
response := compositeResponse{
|
response := compositeResponse{
|
||||||
LbrycrdAddress: LbrycrdAddress,
|
LbrycrdAddress: LbrycrdAddress,
|
||||||
AvailableBlobs: []string{},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(request.RequestedBlobs) > 0 {
|
if len(request.RequestedBlobs) > 0 {
|
||||||
|
var availableBlobs []string
|
||||||
for _, blobHash := range request.RequestedBlobs {
|
for _, blobHash := range request.RequestedBlobs {
|
||||||
if reflector.IsProtected(blobHash) {
|
|
||||||
return nil, errors.Err("requested blob is protected")
|
|
||||||
}
|
|
||||||
exists, err := s.store.Has(blobHash)
|
exists, err := s.store.Has(blobHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
if exists {
|
if exists {
|
||||||
response.AvailableBlobs = append(response.AvailableBlobs, blobHash)
|
availableBlobs = append(availableBlobs, blobHash)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
response.AvailableBlobs = availableBlobs
|
||||||
}
|
}
|
||||||
|
|
||||||
if request.BlobDataPaymentRate != nil {
|
response.BlobDataPaymentRate = paymentRateAccepted
|
||||||
response.BlobDataPaymentRate = paymentRateAccepted
|
if request.BlobDataPaymentRate < 0 {
|
||||||
if *request.BlobDataPaymentRate < 0 {
|
response.BlobDataPaymentRate = paymentRateTooLow
|
||||||
response.BlobDataPaymentRate = paymentRateTooLow
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var blob []byte
|
var blob []byte
|
||||||
var trace shared.BlobTrace
|
|
||||||
if request.RequestedBlob != "" {
|
if request.RequestedBlob != "" {
|
||||||
if len(request.RequestedBlob) != stream.BlobHashHexLength {
|
if len(request.RequestedBlob) != stream.BlobHashHexLength {
|
||||||
return nil, errors.Err("Invalid blob hash length")
|
return nil, "", errors.Err("Invalid blob hash length")
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugln("Sending blob " + request.RequestedBlob[:8])
|
log.Debugln("Sending blob " + request.RequestedBlob[:8])
|
||||||
|
|
||||||
blob, trace, err = s.store.Get(request.RequestedBlob)
|
size, err := s.disk.Size(request.RequestedBlob)
|
||||||
log.Debug(trace.String())
|
if err != nil {
|
||||||
if errors.Is(err, store.ErrBlobNotFound) {
|
response.IncomingBlob = incomingBlob{
|
||||||
response.IncomingBlob = &incomingBlob{
|
|
||||||
Error: err.Error(),
|
Error: err.Error(),
|
||||||
}
|
}
|
||||||
} else if err != nil {
|
|
||||||
return nil, err
|
|
||||||
} else {
|
} else {
|
||||||
response.IncomingBlob = &incomingBlob{
|
response.IncomingBlob = incomingBlob{
|
||||||
BlobHash: request.RequestedBlob,
|
BlobHash: request.RequestedBlob,
|
||||||
Length: len(blob),
|
Length: int(size),
|
||||||
}
|
}
|
||||||
metrics.MtrOutBytesTcp.Add(float64(len(blob)))
|
|
||||||
metrics.BlobDownloadCount.Inc()
|
|
||||||
metrics.PeerDownloadCount.Inc()
|
|
||||||
}
|
}
|
||||||
|
metrics.MtrOutBytesTcp.Add(float64(len(blob)))
|
||||||
|
metrics.BlobDownloadCount.Inc()
|
||||||
|
metrics.PeerDownloadCount.Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
respData, err := json.Marshal(response)
|
respData, err := json.Marshal(response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return append(respData, blob...), nil
|
return respData, request.RequestedBlob, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) logError(e error) {
|
func (s *Server) logError(e error) {
|
||||||
|
@ -306,15 +301,7 @@ func (s *Server) logError(e error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func readNextMessage(buf *bufio.Reader) ([]byte, error) {
|
func readNextMessage(buf *bufio.Reader) ([]byte, error) {
|
||||||
first_byte, err := buf.ReadByte()
|
msg := make([]byte, 0)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if first_byte != '{' {
|
|
||||||
// every request starts with '{'. Checking here disconnects earlier, so we don't wait until timeout
|
|
||||||
return nil, errInvalidData
|
|
||||||
}
|
|
||||||
msg := []byte("{")
|
|
||||||
eof := false
|
eof := false
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -335,8 +322,6 @@ func readNextMessage(buf *bufio.Reader) ([]byte, error) {
|
||||||
|
|
||||||
if len(msg) > maxRequestSize {
|
if len(msg) > maxRequestSize {
|
||||||
return msg, errRequestTooLarge
|
return msg, errRequestTooLarge
|
||||||
} else if len(msg) > 0 && msg[0] != '{' {
|
|
||||||
return msg, errInvalidData
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// yes, this is how the peer protocol knows when the request finishes
|
// yes, this is how the peer protocol knows when the request finishes
|
||||||
|
@ -371,7 +356,6 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
var errRequestTooLarge = errors.Base("request is too large")
|
var errRequestTooLarge = errors.Base("request is too large")
|
||||||
var errInvalidData = errors.Base("Invalid data")
|
|
||||||
|
|
||||||
type availabilityRequest struct {
|
type availabilityRequest struct {
|
||||||
LbrycrdAddress bool `json:"lbrycrd_address"`
|
LbrycrdAddress bool `json:"lbrycrd_address"`
|
||||||
|
@ -408,13 +392,13 @@ type blobResponse struct {
|
||||||
type compositeRequest struct {
|
type compositeRequest struct {
|
||||||
LbrycrdAddress bool `json:"lbrycrd_address"`
|
LbrycrdAddress bool `json:"lbrycrd_address"`
|
||||||
RequestedBlobs []string `json:"requested_blobs"`
|
RequestedBlobs []string `json:"requested_blobs"`
|
||||||
BlobDataPaymentRate *float64 `json:"blob_data_payment_rate"`
|
BlobDataPaymentRate float64 `json:"blob_data_payment_rate"`
|
||||||
RequestedBlob string `json:"requested_blob"`
|
RequestedBlob string `json:"requested_blob"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type compositeResponse struct {
|
type compositeResponse struct {
|
||||||
LbrycrdAddress string `json:"lbrycrd_address,omitempty"`
|
LbrycrdAddress string `json:"lbrycrd_address,omitempty"`
|
||||||
AvailableBlobs []string `json:"available_blobs"`
|
AvailableBlobs []string `json:"available_blobs,omitempty"`
|
||||||
BlobDataPaymentRate string `json:"blob_data_payment_rate,omitempty"`
|
BlobDataPaymentRate string `json:"blob_data_payment_rate,omitempty"`
|
||||||
IncomingBlob *incomingBlob `json:"incoming_blob,omitempty"`
|
IncomingBlob incomingBlob `json:"incoming_blob,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,10 +2,7 @@ package peer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"io"
|
|
||||||
"net"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/lbryio/reflector.go/store"
|
"github.com/lbryio/reflector.go/store"
|
||||||
)
|
)
|
||||||
|
@ -78,62 +75,3 @@ func TestAvailabilityRequest_WithBlobs(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRequestFromConnection(t *testing.T) {
|
|
||||||
s := getServer(t, true)
|
|
||||||
err := s.Start("127.0.0.1:50505")
|
|
||||||
defer s.Shutdown()
|
|
||||||
if err != nil {
|
|
||||||
t.Error("error starting server", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, p := range availabilityRequests {
|
|
||||||
conn, err := net.Dial("tcp", "127.0.0.1:50505")
|
|
||||||
if err != nil {
|
|
||||||
t.Error("error opening connection", err)
|
|
||||||
}
|
|
||||||
defer func() { _ = conn.Close() }()
|
|
||||||
|
|
||||||
response := make([]byte, 8192)
|
|
||||||
_, err = conn.Write(p.request)
|
|
||||||
if err != nil {
|
|
||||||
t.Error("error writing", err)
|
|
||||||
}
|
|
||||||
_, err = conn.Read(response)
|
|
||||||
if err != nil {
|
|
||||||
t.Error("error reading", err)
|
|
||||||
}
|
|
||||||
if !bytes.Equal(response[:len(p.response)], p.response) {
|
|
||||||
t.Errorf("Response did not match expected response.\nExpected: %s\nGot: %s", string(p.response), string(response))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestInvalidData(t *testing.T) {
|
|
||||||
s := getServer(t, true)
|
|
||||||
err := s.Start("127.0.0.1:50503")
|
|
||||||
defer s.Shutdown()
|
|
||||||
if err != nil {
|
|
||||||
t.Error("error starting server", err)
|
|
||||||
}
|
|
||||||
conn, err := net.Dial("tcp", "127.0.0.1:50503")
|
|
||||||
if err != nil {
|
|
||||||
t.Error("error opening connection", err)
|
|
||||||
}
|
|
||||||
defer func() { _ = conn.Close() }()
|
|
||||||
|
|
||||||
response := make([]byte, 8192)
|
|
||||||
_, err = conn.Write([]byte("hello dear server, I would like blobs. Please"))
|
|
||||||
if err != nil {
|
|
||||||
t.Error("error writing", err)
|
|
||||||
}
|
|
||||||
err = conn.SetReadDeadline(time.Now().Add(5 * time.Second))
|
|
||||||
if err != nil {
|
|
||||||
t.Error("error setting read deadline", err)
|
|
||||||
}
|
|
||||||
_, err = conn.Read(response)
|
|
||||||
if err != io.EOF {
|
|
||||||
t.Error("error reading", err)
|
|
||||||
}
|
|
||||||
println(response)
|
|
||||||
}
|
|
||||||
|
|
|
@ -42,7 +42,7 @@ func (p *Store) Has(hash string) (bool, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
defer func() { _ = c.Close() }()
|
defer c.Close()
|
||||||
return c.HasBlob(hash)
|
return c.HasBlob(hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,7 +53,7 @@ func (p *Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err
|
return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err
|
||||||
}
|
}
|
||||||
defer func() { _ = c.Close() }()
|
defer c.Close()
|
||||||
blob, trace, err := c.GetBlob(hash)
|
blob, trace, err := c.GetBlob(hash)
|
||||||
if err != nil && strings.Contains(err.Error(), "blob not found") {
|
if err != nil && strings.Contains(err.Error(), "blob not found") {
|
||||||
return nil, trace, store.ErrBlobNotFound
|
return nil, trace, store.ErrBlobNotFound
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
//go:build linux
|
|
||||||
// +build linux
|
// +build linux
|
||||||
|
|
||||||
package store
|
package store
|
||||||
|
@ -10,7 +9,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func timespecToTime(ts syscall.Timespec) time.Time {
|
func timespecToTime(ts syscall.Timespec) time.Time {
|
||||||
return time.Unix(ts.Sec, ts.Nsec)
|
return time.Unix(int64(ts.Sec), int64(ts.Nsec))
|
||||||
}
|
}
|
||||||
|
|
||||||
func atime(fi os.FileInfo) time.Time {
|
func atime(fi os.FileInfo) time.Time {
|
||||||
|
|
|
@ -2,6 +2,7 @@ package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -36,7 +37,8 @@ func (c *CloudFrontROStore) Has(hash string) (bool, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
defer func() { _ = body.Close() }()
|
defer body.Close()
|
||||||
|
|
||||||
switch status {
|
switch status {
|
||||||
case http.StatusNotFound, http.StatusForbidden:
|
case http.StatusNotFound, http.StatusForbidden:
|
||||||
return false, nil
|
return false, nil
|
||||||
|
@ -59,12 +61,12 @@ func (c *CloudFrontROStore) Get(hash string) (stream.Blob, shared.BlobTrace, err
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), err
|
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), err
|
||||||
}
|
}
|
||||||
defer func() { _ = body.Close() }()
|
defer body.Close()
|
||||||
switch status {
|
switch status {
|
||||||
case http.StatusNotFound, http.StatusForbidden:
|
case http.StatusNotFound, http.StatusForbidden:
|
||||||
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err(ErrBlobNotFound)
|
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err(ErrBlobNotFound)
|
||||||
case http.StatusOK:
|
case http.StatusOK:
|
||||||
b, err := io.ReadAll(body)
|
b, err := ioutil.ReadAll(body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err(err)
|
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err(err)
|
||||||
}
|
}
|
||||||
|
@ -81,7 +83,7 @@ func (c *CloudFrontROStore) cfRequest(method, hash string) (int, io.ReadCloser,
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, errors.Err(err)
|
return 0, nil, errors.Err(err)
|
||||||
}
|
}
|
||||||
req.Header.Add("User-Agent", "reflector.go/"+meta.Version())
|
req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
|
||||||
|
|
||||||
res, err := http.DefaultClient.Do(req)
|
res, err := http.DefaultClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -112,22 +112,22 @@ func (d *DBBackedStore) Block(hash string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
//has, err := d.db.HasBlob(hash, false)
|
has, err := d.db.HasBlob(hash, false)
|
||||||
//if err != nil {
|
if err != nil {
|
||||||
// return err
|
return err
|
||||||
//}
|
}
|
||||||
//
|
|
||||||
//if has {
|
if has {
|
||||||
// err = d.blobs.Delete(hash)
|
err = d.blobs.Delete(hash)
|
||||||
// if err != nil {
|
if err != nil {
|
||||||
// return err
|
return err
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// err = d.db.Delete(hash)
|
err = d.db.Delete(hash)
|
||||||
// if err != nil {
|
if err != nil {
|
||||||
// return err
|
return err
|
||||||
// }
|
}
|
||||||
//}
|
}
|
||||||
|
|
||||||
return d.markBlocked(hash)
|
return d.markBlocked(hash)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,13 @@
|
||||||
package store
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/sha512"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"time"
|
"time"
|
||||||
|
@ -10,6 +17,10 @@ import (
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
|
||||||
|
"github.com/brk0v/directio"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DiskStore stores blobs on a local disk
|
// DiskStore stores blobs on a local disk
|
||||||
|
@ -21,8 +32,12 @@ type DiskStore struct {
|
||||||
|
|
||||||
// true if initOnce ran, false otherwise
|
// true if initOnce ran, false otherwise
|
||||||
initialized bool
|
initialized bool
|
||||||
|
|
||||||
|
concurrentChecks atomic.Int32
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const maxConcurrentChecks = 30
|
||||||
|
|
||||||
// NewDiskStore returns an initialized file disk store pointer.
|
// NewDiskStore returns an initialized file disk store pointer.
|
||||||
func NewDiskStore(dir string, prefixLength int) *DiskStore {
|
func NewDiskStore(dir string, prefixLength int) *DiskStore {
|
||||||
return &DiskStore{
|
return &DiskStore{
|
||||||
|
@ -53,6 +68,14 @@ func (d *DiskStore) Has(hash string) (bool, error) {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *DiskStore) Size(hash string) (int64, error) {
|
||||||
|
stat, err := os.Stat(d.path(hash))
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return stat.Size(), nil
|
||||||
|
}
|
||||||
|
|
||||||
// Get returns the blob or an error if the blob doesn't exist.
|
// Get returns the blob or an error if the blob doesn't exist.
|
||||||
func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
@ -61,16 +84,69 @@ func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), err
|
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), err
|
||||||
}
|
}
|
||||||
|
|
||||||
blob, err := os.ReadFile(d.path(hash))
|
blob, err := ioutil.ReadFile(d.path(hash))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(ErrBlobNotFound)
|
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(ErrBlobNotFound)
|
||||||
}
|
}
|
||||||
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(err)
|
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// this is a rather poor yet effective way of throttling how many blobs can be checked concurrently
|
||||||
|
// poor because there is a possible race condition between the check and the actual +1
|
||||||
|
if d.concurrentChecks.Load() < maxConcurrentChecks {
|
||||||
|
d.concurrentChecks.Add(1)
|
||||||
|
defer d.concurrentChecks.Sub(1)
|
||||||
|
hashBytes := sha512.Sum384(blob)
|
||||||
|
readHash := hex.EncodeToString(hashBytes[:])
|
||||||
|
if hash != readHash {
|
||||||
|
message := fmt.Sprintf("[%s] found a broken blob while reading from disk. Actual hash: %s", hash, readHash)
|
||||||
|
log.Errorf("%s", message)
|
||||||
|
err := d.Delete(hash)
|
||||||
|
if err != nil {
|
||||||
|
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), err
|
||||||
|
}
|
||||||
|
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return blob, shared.NewBlobTrace(time.Since(start), d.Name()), nil
|
return blob, shared.NewBlobTrace(time.Since(start), d.Name()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Put stores the blob on disk
|
||||||
|
func (d *DiskStore) Put(hash string, blob stream.Blob) error {
|
||||||
|
err := d.initOnce()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = d.ensureDirExists(d.dir(hash))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open file with O_DIRECT
|
||||||
|
f, err := os.OpenFile(d.tmpPath(hash), openFileFlags, 0644)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Err(err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
// Use directio writer
|
||||||
|
dio, err := directio.New(f)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Err(err)
|
||||||
|
}
|
||||||
|
defer dio.Flush()
|
||||||
|
// Write the body to file
|
||||||
|
_, err = io.Copy(dio, bytes.NewReader(blob))
|
||||||
|
if err != nil {
|
||||||
|
return errors.Err(err)
|
||||||
|
}
|
||||||
|
err = os.Rename(d.tmpPath(hash), d.path(hash))
|
||||||
|
return errors.Err(err)
|
||||||
|
}
|
||||||
|
|
||||||
// PutSD stores the sd blob on the disk
|
// PutSD stores the sd blob on the disk
|
||||||
func (d *DiskStore) PutSD(hash string, blob stream.Blob) error {
|
func (d *DiskStore) PutSD(hash string, blob stream.Blob) error {
|
||||||
return d.Put(hash, blob)
|
return d.Put(hash, blob)
|
||||||
|
@ -144,3 +220,15 @@ func (d *DiskStore) initOnce() error {
|
||||||
// Shutdown shuts down the store gracefully
|
// Shutdown shuts down the store gracefully
|
||||||
func (d *DiskStore) Shutdown() {
|
func (d *DiskStore) Shutdown() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *DiskStore) Sendfile(hash string, conn net.Conn) error {
|
||||||
|
f, err := os.Open(d.path(hash))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = io.Copy(conn, f)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package store
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -13,9 +14,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDiskStore_Get(t *testing.T) {
|
func TestDiskStore_Get(t *testing.T) {
|
||||||
tmpDir, err := os.MkdirTemp("", "reflector_test_*")
|
tmpDir, err := ioutil.TempDir("", "reflector_test_*")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer func() { _ = os.RemoveAll(tmpDir) }()
|
defer os.RemoveAll(tmpDir)
|
||||||
d := NewDiskStore(tmpDir, 2)
|
d := NewDiskStore(tmpDir, 2)
|
||||||
|
|
||||||
hash := "f428b8265d65dad7f8ffa52922bba836404cbd62f3ecfe10adba6b444f8f658938e54f5981ac4de39644d5b93d89a94b"
|
hash := "f428b8265d65dad7f8ffa52922bba836404cbd62f3ecfe10adba6b444f8f658938e54f5981ac4de39644d5b93d89a94b"
|
||||||
|
@ -24,7 +25,7 @@ func TestDiskStore_Get(t *testing.T) {
|
||||||
expectedPath := path.Join(tmpDir, hash[:2], hash)
|
expectedPath := path.Join(tmpDir, hash[:2], hash)
|
||||||
err = os.MkdirAll(filepath.Dir(expectedPath), os.ModePerm)
|
err = os.MkdirAll(filepath.Dir(expectedPath), os.ModePerm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = os.WriteFile(expectedPath, data, os.ModePerm)
|
err = ioutil.WriteFile(expectedPath, data, os.ModePerm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
blob, _, err := d.Get(hash)
|
blob, _, err := d.Get(hash)
|
||||||
|
@ -33,9 +34,9 @@ func TestDiskStore_Get(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDiskStore_GetNonexistentBlob(t *testing.T) {
|
func TestDiskStore_GetNonexistentBlob(t *testing.T) {
|
||||||
tmpDir, err := os.MkdirTemp("", "reflector_test_*")
|
tmpDir, err := ioutil.TempDir("", "reflector_test_*")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer func() { _ = os.RemoveAll(tmpDir) }()
|
defer os.RemoveAll(tmpDir)
|
||||||
d := NewDiskStore(tmpDir, 2)
|
d := NewDiskStore(tmpDir, 2)
|
||||||
|
|
||||||
blob, _, err := d.Get("nonexistent")
|
blob, _, err := d.Get("nonexistent")
|
||||||
|
|
|
@ -1,42 +0,0 @@
|
||||||
//go:build darwin
|
|
||||||
// +build darwin
|
|
||||||
|
|
||||||
package store
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
|
||||||
)
|
|
||||||
|
|
||||||
var openFileFlags = os.O_WRONLY | os.O_CREATE
|
|
||||||
|
|
||||||
// Put stores the blob on disk
|
|
||||||
func (d *DiskStore) Put(hash string, blob stream.Blob) error {
|
|
||||||
err := d.initOnce()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = d.ensureDirExists(d.dir(hash))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open file with O_DIRECT
|
|
||||||
f, err := os.OpenFile(d.tmpPath(hash), openFileFlags, 0644)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
_, err = io.Copy(f, bytes.NewReader(blob))
|
|
||||||
if err != nil {
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
||||||
err = os.Rename(d.tmpPath(hash), d.path(hash))
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
|
@ -1,49 +0,0 @@
|
||||||
//go:build linux
|
|
||||||
// +build linux
|
|
||||||
|
|
||||||
package store
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
"syscall"
|
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
|
||||||
|
|
||||||
"github.com/brk0v/directio"
|
|
||||||
)
|
|
||||||
|
|
||||||
var openFileFlags = os.O_WRONLY | os.O_CREATE | syscall.O_DIRECT
|
|
||||||
|
|
||||||
// Put stores the blob on disk
|
|
||||||
func (d *DiskStore) Put(hash string, blob stream.Blob) error {
|
|
||||||
err := d.initOnce()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = d.ensureDirExists(d.dir(hash))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open file with O_DIRECT
|
|
||||||
f, err := os.OpenFile(d.tmpPath(hash), openFileFlags, 0644)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
||||||
defer func() { _ = f.Close() }()
|
|
||||||
dio, err := directio.New(f)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
||||||
defer func() { _ = dio.Flush() }()
|
|
||||||
_, err = io.Copy(dio, bytes.NewReader(blob))
|
|
||||||
if err != nil {
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
||||||
err = os.Rename(d.tmpPath(hash), d.path(hash))
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
9
store/flags_darwin.go
Normal file
9
store/flags_darwin.go
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
// +build darwin
|
||||||
|
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
var openFileFlags = os.O_WRONLY | os.O_CREATE
|
10
store/flags_linux.go
Normal file
10
store/flags_linux.go
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
// +build linux
|
||||||
|
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
|
var openFileFlags = os.O_WRONLY | os.O_CREATE | syscall.O_DIRECT
|
|
@ -59,7 +59,7 @@ func NewGcacheStore(component string, store BlobStore, maxSize int, strategy Evi
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
if lstr, ok := store.(lister); ok {
|
if lstr, ok := store.(lister); ok {
|
||||||
err := l.loadExisting(lstr, maxSize)
|
err := l.loadExisting(lstr, int(maxSize))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err) // TODO: what should happen here? panic? return nil? just keep going?
|
panic(err) // TODO: what should happen here? panic? return nil? just keep going?
|
||||||
}
|
}
|
||||||
|
@ -144,15 +144,11 @@ func (l *GcacheStore) loadExisting(store lister, maxItems int) error {
|
||||||
logrus.Infof("read %d files from underlying store", len(existing))
|
logrus.Infof("read %d files from underlying store", len(existing))
|
||||||
|
|
||||||
added := 0
|
added := 0
|
||||||
for i, h := range existing {
|
for _, h := range existing {
|
||||||
_ = l.cache.Set(h, true)
|
_ = l.cache.Set(h, true)
|
||||||
added++
|
added++
|
||||||
if maxItems > 0 && added >= maxItems { // underlying cache is bigger than the cache
|
if maxItems > 0 && added >= maxItems { // underlying cache is bigger than the cache
|
||||||
err := l.Delete(h)
|
break
|
||||||
logrus.Infof("deleted overflowing blob: %s (%d/%d)", h, i, len(existing))
|
|
||||||
if err != nil {
|
|
||||||
logrus.Warnf("error while deleting a blob that's overflowing the cache: %s", err.Error())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -2,6 +2,7 @@ package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -87,9 +88,9 @@ func TestGcacheStore_UnderlyingBlobMissing(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGcacheStore_loadExisting(t *testing.T) {
|
func TestGcacheStore_loadExisting(t *testing.T) {
|
||||||
tmpDir, err := os.MkdirTemp("", "reflector_test_*")
|
tmpDir, err := ioutil.TempDir("", "reflector_test_*")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer func() { _ = os.RemoveAll(tmpDir) }()
|
defer os.RemoveAll(tmpDir)
|
||||||
d := NewDiskStore(tmpDir, 2)
|
d := NewDiskStore(tmpDir, 2)
|
||||||
|
|
||||||
hash := "hash"
|
hash := "hash"
|
||||||
|
|
|
@ -2,8 +2,8 @@ package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -20,14 +20,12 @@ import (
|
||||||
type HttpStore struct {
|
type HttpStore struct {
|
||||||
upstream string
|
upstream string
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
edgeToken string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHttpStore(upstream, edgeToken string) *HttpStore {
|
func NewHttpStore(upstream string) *HttpStore {
|
||||||
return &HttpStore{
|
return &HttpStore{
|
||||||
upstream: "http://" + upstream,
|
upstream: "http://" + upstream,
|
||||||
httpClient: getClient(),
|
httpClient: getClient(),
|
||||||
edgeToken: edgeToken,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,7 +44,7 @@ func (n *HttpStore) Has(hash string) (bool, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, errors.Err(err)
|
return false, errors.Err(err)
|
||||||
}
|
}
|
||||||
defer func() { _ = res.Body.Close() }()
|
defer res.Body.Close()
|
||||||
if res.StatusCode == http.StatusNotFound {
|
if res.StatusCode == http.StatusNotFound {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
@ -55,7 +53,7 @@ func (n *HttpStore) Has(hash string) (bool, error) {
|
||||||
}
|
}
|
||||||
var body []byte
|
var body []byte
|
||||||
if res.Body != nil {
|
if res.Body != nil {
|
||||||
body, _ = io.ReadAll(res.Body)
|
body, _ = ioutil.ReadAll(res.Body)
|
||||||
}
|
}
|
||||||
return false, errors.Err("upstream error. Status code: %d (%s)", res.StatusCode, string(body))
|
return false, errors.Err("upstream error. Status code: %d (%s)", res.StatusCode, string(body))
|
||||||
}
|
}
|
||||||
|
@ -63,9 +61,6 @@ func (n *HttpStore) Has(hash string) (bool, error) {
|
||||||
func (n *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
func (n *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
url := n.upstream + "/blob?hash=" + hash
|
url := n.upstream + "/blob?hash=" + hash
|
||||||
if n.edgeToken != "" {
|
|
||||||
url += "&edge_token=" + n.edgeToken
|
|
||||||
}
|
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", url, nil)
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -76,7 +71,7 @@ func (n *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, shared.NewBlobTrace(time.Since(start), n.Name()), errors.Err(err)
|
return nil, shared.NewBlobTrace(time.Since(start), n.Name()), errors.Err(err)
|
||||||
}
|
}
|
||||||
defer func() { _ = res.Body.Close() }()
|
defer res.Body.Close()
|
||||||
tmp := getBuffer()
|
tmp := getBuffer()
|
||||||
defer putBuffer(tmp)
|
defer putBuffer(tmp)
|
||||||
serialized := res.Header.Get("Via")
|
serialized := res.Header.Get("Via")
|
||||||
|
@ -105,7 +100,7 @@ func (n *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
}
|
}
|
||||||
var body []byte
|
var body []byte
|
||||||
if res.Body != nil {
|
if res.Body != nil {
|
||||||
body, _ = io.ReadAll(res.Body)
|
body, _ = ioutil.ReadAll(res.Body)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, trace.Stack(time.Since(start), n.Name()), errors.Err("upstream error. Status code: %d (%s)", res.StatusCode, string(body))
|
return nil, trace.Stack(time.Since(start), n.Name()), errors.Err("upstream error. Status code: %d (%s)", res.StatusCode, string(body))
|
||||||
|
@ -143,19 +138,14 @@ func putBuffer(buf *bytes.Buffer) {
|
||||||
buffers.Put(buf)
|
buffers.Put(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
|
|
||||||
dialer := &net.Dialer{
|
|
||||||
Timeout: 30 * time.Second,
|
|
||||||
KeepAlive: 30 * time.Second,
|
|
||||||
}
|
|
||||||
return dialer.DialContext(ctx, network, address)
|
|
||||||
}
|
|
||||||
|
|
||||||
// getClient gets an http client that's customized to be more performant when dealing with blobs of 2MB in size (most of our blobs)
|
// getClient gets an http client that's customized to be more performant when dealing with blobs of 2MB in size (most of our blobs)
|
||||||
func getClient() *http.Client {
|
func getClient() *http.Client {
|
||||||
// Customize the Transport to have larger connection pool
|
// Customize the Transport to have larger connection pool
|
||||||
defaultTransport := &http.Transport{
|
defaultTransport := &http.Transport{
|
||||||
DialContext: dialContext,
|
DialContext: (&net.Dialer{
|
||||||
|
Timeout: 30 * time.Second,
|
||||||
|
KeepAlive: 30 * time.Second,
|
||||||
|
}).DialContext,
|
||||||
ForceAttemptHTTP2: true,
|
ForceAttemptHTTP2: true,
|
||||||
MaxIdleConns: 100,
|
MaxIdleConns: 100,
|
||||||
IdleConnTimeout: 90 * time.Second,
|
IdleConnTimeout: 90 * time.Second,
|
||||||
|
|
|
@ -1,8 +1,7 @@
|
||||||
package speedwalk
|
package speedwalk
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io/fs"
|
"io/ioutil"
|
||||||
"os"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -18,19 +17,7 @@ import (
|
||||||
// AllFiles recursively lists every file in every subdirectory of a given directory
|
// AllFiles recursively lists every file in every subdirectory of a given directory
|
||||||
// If basename is true, return the basename of each file. Otherwise return the full path starting at startDir.
|
// If basename is true, return the basename of each file. Otherwise return the full path starting at startDir.
|
||||||
func AllFiles(startDir string, basename bool) ([]string, error) {
|
func AllFiles(startDir string, basename bool) ([]string, error) {
|
||||||
entries, err := os.ReadDir(startDir)
|
items, err := ioutil.ReadDir(startDir)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
items := make([]fs.FileInfo, 0, len(entries))
|
|
||||||
for _, entry := range entries {
|
|
||||||
info, err := entry.Info()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
items = append(items, info)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue