peer protocol partially done

Alex Grintsvayg 2018-01-29 14:37:26 -05:00
parent 3c8416b576
commit 5592f00c11
9 changed files with 490 additions and 75 deletions

.gitignore (1 line changed)

@ -1,2 +1,3 @@
/.idea
/blobs
/config.json

main.go (84 lines changed)

@ -1,11 +1,16 @@
package main
import (
"encoding/json"
"flag"
"io/ioutil"
"math/rand"
"strconv"
"time"
"github.com/lbryio/reflector.go/peer"
"github.com/lbryio/reflector.go/store"
log "github.com/sirupsen/logrus"
)
@ -18,39 +23,58 @@ func checkErr(err error) {
func main() {
rand.Seed(time.Now().UnixNano())
port := DefaultPort
//address := "52.14.109.125:" + strconv.Itoa(port)
address := "localhost:" + strconv.Itoa(port)
serve := flag.Bool("server", false, "Run server")
blobDir := flag.String("blobdir", "", "Where blobs will be saved to")
confFile := flag.String("conf", "config.json", "Config file")
flag.Parse()
if serve != nil && *serve {
if blobDir == nil || *blobDir == "" {
log.Fatal("-blobdir required")
}
server := NewServer(*blobDir)
log.Fatal(server.ListenAndServe(address))
return
}
var err error
client := Client{}
conf := loadConfig(*confFile)
log.Println("Connecting to " + address)
err = client.Connect(address)
checkErr(err)
peerAddress := "localhost:" + strconv.Itoa(peer.DefaultPort)
server := peer.NewServer(store.NewS3BlobStore(conf.AwsID, conf.AwsSecret, conf.BucketRegion, conf.BucketName))
log.Fatal(server.ListenAndServe(peerAddress))
return
log.Println("Connected")
//
//address := "52.14.109.125:" + strconv.Itoa(port)
//reflectorAddress := "localhost:" + strconv.Itoa(reflector.DefaultPort)
//server := reflector.NewServer(store.NewS3BlobStore(conf.awsID, conf.awsSecret, conf.bucketRegion, conf.bucketName))
//log.Fatal(server.ListenAndServe(reflectorAddress))
defer func() {
log.Println("Closing connection")
client.Close()
}()
blob := make([]byte, 2*1024*1024)
_, err = rand.Read(blob)
checkErr(err)
err = client.SendBlob(blob)
checkErr(err)
//
//var err error
//client := reflector.Client{}
//
//log.Println("Connecting to " + reflectorAddress)
//err = client.Connect(reflectorAddress)
//checkErr(err)
//
//log.Println("Connected")
//
//defer func() {
// log.Println("Closing connection")
// client.Close()
//}()
//
//blob := make([]byte, 2*1024*1024)
//_, err = rand.Read(blob)
//checkErr(err)
//err = client.SendBlob(blob)
//checkErr(err)
}
type config struct {
AwsID string `json:"aws_id"`
AwsSecret string `json:"aws_secret"`
BucketRegion string `json:"bucket_region"`
BucketName string `json:"bucket_name"`
}
func loadConfig(path string) config {
raw, err := ioutil.ReadFile(path)
checkErr(err)
var c config
err = json.Unmarshal(raw, &c)
checkErr(err)
return c
}
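
For reference, loadConfig above expects a JSON file shaped like the config struct. A minimal config.json with placeholder values (the keys come straight from the struct's json tags) might look like:

{
  "aws_id": "AKIAEXAMPLE",
  "aws_secret": "example-secret",
  "bucket_region": "us-east-2",
  "bucket_name": "example-blob-bucket"
}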

peer/server.go (new file, 189 lines)

@ -0,0 +1,189 @@
package peer
import (
"crypto/sha512"
"encoding/hex"
"encoding/json"
"io"
"net"
"github.com/lbryio/reflector.go/store"
log "github.com/sirupsen/logrus"
)
const (
DefaultPort = 3333
)
type Server struct {
store store.BlobStore
}
func NewServer(store store.BlobStore) *Server {
return &Server{
store: store,
}
}
func (s *Server) ListenAndServe(address string) error {
log.Println("Listening on " + address)
l, err := net.Listen("tcp", address)
if err != nil {
return err
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
log.Error(err)
} else {
go s.handleConn(conn)
}
}
}
func (s *Server) handleConn(conn net.Conn) {
// TODO: connection should time out eventually
defer conn.Close()
err := s.doAvailabilityRequest(conn)
if err != nil {
log.Error(err)
return
}
err = s.doPaymentRateNegotiation(conn)
if err != nil {
log.Error(err)
return
}
for {
err = s.doBlobRequest(conn)
if err != nil {
if err != io.EOF {
log.Error(err)
}
return
}
}
}
func (s *Server) doAvailabilityRequest(conn net.Conn) error {
var request availabilityRequest
err := json.NewDecoder(conn).Decode(&request)
if err != nil {
return err
}
address := "bJxKvpD96kaJLriqVajZ7SaQTsWWyrGQct"
availableBlobs := []string{}
for _, blobHash := range request.RequestedBlobs {
exists, err := s.store.Has(blobHash)
if err != nil {
return err
}
if exists {
availableBlobs = append(availableBlobs, blobHash)
}
}
response, err := json.Marshal(availabilityResponse{LbrycrdAddress: address, AvailableBlobs: availableBlobs})
if err != nil {
return err
}
_, err = conn.Write(response)
if err != nil {
return err
}
return nil
}
func (s *Server) doPaymentRateNegotiation(conn net.Conn) error {
var request paymentRateRequest
err := json.NewDecoder(conn).Decode(&request)
if err != nil {
return err
}
offerReply := paymentRateAccepted
if request.BlobDataPaymentRate < 0 {
offerReply = paymentRateTooLow
}
response, err := json.Marshal(paymentRateResponse{BlobDataPaymentRate: offerReply})
if err != nil {
return err
}
_, err = conn.Write(response)
if err != nil {
return err
}
return nil
}
func (s *Server) doBlobRequest(conn net.Conn) error {
var request blobRequest
err := json.NewDecoder(conn).Decode(&request)
if err != nil {
return err
}
log.Println("Sending blob " + request.RequestedBlob[:8])
blob, err := s.store.Get(request.RequestedBlob)
if err != nil {
return err
}
response, err := json.Marshal(blobResponse{IncomingBlob: incomingBlob{
BlobHash: getBlobHash(blob),
Length: len(blob),
}})
if err != nil {
return err
}
_, err = conn.Write(response)
if err != nil {
return err
}
_, err = conn.Write(blob)
if err != nil {
return err
}
return nil
}
func readAll(conn net.Conn) {
buf := make([]byte, 0, 4096) // big buffer
tmp := make([]byte, 256) // using small tmp buffer for demonstration
for {
n, err := conn.Read(tmp)
if err != nil {
if err != io.EOF {
log.Println("read error:", err)
}
break
}
log.Println("got", n, "bytes.")
buf = append(buf, tmp[:n]...)
}
log.Println("total size:", len(buf))
if len(buf) > 0 {
log.Println(string(buf))
}
}
func getBlobHash(blob []byte) string {
hashBytes := sha512.Sum384(blob)
return hex.EncodeToString(hashBytes[:])
}
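
A client for this protocol is not part of this commit, but the handler order above (availability request, payment rate negotiation, then blob requests) implies roughly the following sketch. It assumes it lives in the peer package so it can reuse the unexported message types defined in peer/shared.go below; fetchBlob and its arguments are illustrative names, not an existing API:

package peer

import (
	"encoding/json"
	"io"
	"net"
)

// fetchBlob is a hypothetical client-side helper. It follows the same message
// order the server's handleConn expects: availability request, payment rate
// negotiation, then a blob request whose response is a JSON header followed
// by the raw blob bytes.
func fetchBlob(address, hash string) ([]byte, error) {
	conn, err := net.Dial("tcp", address)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	enc := json.NewEncoder(conn)
	dec := json.NewDecoder(conn)

	// 1. Ask whether the server has the blob.
	if err := enc.Encode(availabilityRequest{RequestedBlobs: []string{hash}}); err != nil {
		return nil, err
	}
	var avail availabilityResponse
	if err := dec.Decode(&avail); err != nil {
		return nil, err
	}
	// A real client would check avail.AvailableBlobs before continuing.

	// 2. Negotiate a payment rate (offer 0; the server only rejects negative rates).
	if err := enc.Encode(paymentRateRequest{BlobDataPaymentRate: 0}); err != nil {
		return nil, err
	}
	var rate paymentRateResponse
	if err := dec.Decode(&rate); err != nil {
		return nil, err
	}
	// A real client would also check that rate came back as RATE_ACCEPTED.

	// 3. Request the blob. The JSON response says how many raw bytes follow.
	if err := enc.Encode(blobRequest{RequestedBlob: hash}); err != nil {
		return nil, err
	}
	var resp blobResponse
	if err := dec.Decode(&resp); err != nil {
		return nil, err
	}

	// The decoder may already have buffered part of the blob, so drain its
	// buffer before reading the rest straight off the connection.
	blob := make([]byte, resp.IncomingBlob.Length)
	_, err = io.ReadFull(io.MultiReader(dec.Buffered(), conn), blob)
	return blob, err
}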

peer/shared.go (new file, 38 lines)

@ -0,0 +1,38 @@
package peer
type availabilityRequest struct {
LbrycrdAddress bool `json:"lbrycrd_address"`
RequestedBlobs []string `json:"requested_blobs"`
}
type availabilityResponse struct {
LbrycrdAddress string `json:"lbrycrd_address"`
AvailableBlobs []string `json:"available_blobs"`
}
const (
paymentRateAccepted = "RATE_ACCEPTED"
paymentRateTooLow = "RATE_TOO_LOW"
paymentRateUnset = "RATE_UNSET"
)
type paymentRateRequest struct {
BlobDataPaymentRate float64 `json:"blob_data_payment_rate"`
}
type paymentRateResponse struct {
BlobDataPaymentRate string `json:"blob_data_payment_rate"`
}
type blobRequest struct {
RequestedBlob string `json:"requested_blob"`
}
type incomingBlob struct {
Error string `json:"error,omitempty"`
BlobHash string `json:"blob_hash"`
Length int `json:"length"`
}
type blobResponse struct {
IncomingBlob incomingBlob `json:"incoming_blob"`
}
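
Concretely, one full exchange of these messages on the wire looks something like this (client lines marked ->, server lines <-; the blob hash is a shortened placeholder, the address is the one hardcoded in doAvailabilityRequest, and 2097152 is the 2 MB blob size used elsewhere in the repo):

-> {"lbrycrd_address": true, "requested_blobs": ["a1b2c3..."]}
<- {"lbrycrd_address": "bJxKvpD96kaJLriqVajZ7SaQTsWWyrGQct", "available_blobs": ["a1b2c3..."]}
-> {"blob_data_payment_rate": 0.0}
<- {"blob_data_payment_rate": "RATE_ACCEPTED"}
-> {"requested_blob": "a1b2c3..."}
<- {"incoming_blob": {"blob_hash": "a1b2c3...", "length": 2097152}}
<- (2097152 bytes of raw blob data)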

View file

@ -1,4 +1,4 @@
package main
package reflector
import (
"encoding/json"
@ -33,8 +33,8 @@ func (c *Client) SendBlob(blob []byte) error {
return errors.Err("not connected")
}
if len(blob) != BlobSize {
return errors.Err("blob must be exactly " + strconv.Itoa(BlobSize) + " bytes")
if len(blob) != maxBlobSize {
return errors.Err("blob must be exactly " + strconv.Itoa(maxBlobSize) + " bytes")
}
blobHash := getBlobHash(blob)

View file

@ -1,4 +1,4 @@
package main
package reflector
import (
"io/ioutil"

View file

@ -1,37 +1,30 @@
package main
package reflector
import (
"bufio"
"encoding/json"
"io"
"io/ioutil"
"net"
"os"
"path"
"strconv"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/errors.go"
log "github.com/sirupsen/logrus"
)
type Server struct {
BlobDir string
store store.BlobStore
}
func NewServer(blobDir string) *Server {
func NewServer(store store.BlobStore) *Server {
return &Server{
BlobDir: blobDir,
store: store,
}
}
func (s *Server) ListenAndServe(address string) error {
log.Println("Blobs will be saved to " + s.BlobDir)
err := s.ensureBlobDirExists()
if err != nil {
return err
}
log.Println("Listening on " + address)
l, err := net.Listen("tcp", address)
if err != nil {
@ -42,10 +35,10 @@ func (s *Server) ListenAndServe(address string) error {
for {
conn, err := l.Accept()
if err != nil {
// TODO: dont crash server on error here
return err
log.Error(err)
} else {
go s.handleConn(conn)
}
go s.handleConn(conn)
}
}
@ -93,11 +86,13 @@ func (s *Server) receiveBlob(conn net.Conn) error {
}
blobExists := false
blobPath := path.Join(s.BlobDir, blobHash)
if !isSdBlob { // we have to say sd blobs are missing because if we say we have it, they wont try to send any content blobs
if _, err := os.Stat(blobPath); !os.IsNotExist(err) {
blobExists = true
if !isSdBlob {
// we have to say sd blobs are missing because if we say we have it, they wont try to send any content blobs
has, err := s.store.Has(blobHash)
if err != nil {
return err
}
blobExists = has
}
err = s.sendBlobResponse(conn, blobExists, isSdBlob)
@ -122,7 +117,7 @@ func (s *Server) receiveBlob(conn net.Conn) error {
}
log.Println("Got blob " + blobHash[:8])
err = ioutil.WriteFile(blobPath, blob, 0644)
err = s.store.Put(blobHash, blob)
if err != nil {
return err
}
@ -167,8 +162,8 @@ func (s *Server) readBlobRequest(conn net.Conn) (int, string, bool, error) {
var blobSize int
isSdBlob := sendRequest.SdBlobHash != ""
if blobSize > BlobSize {
return 0, "", isSdBlob, errors.Err("blob cannot be more than " + strconv.Itoa(BlobSize) + " bytes")
if blobSize > maxBlobSize {
return 0, "", isSdBlob, errors.Err("blob cannot be more than " + strconv.Itoa(maxBlobSize) + " bytes")
}
if isSdBlob {
@ -222,19 +217,3 @@ func (s *Server) sendTransferResponse(conn net.Conn, receivedBlob, isSdBlob bool
}
return nil
}
func (s *Server) ensureBlobDirExists() error {
if stat, err := os.Stat(s.BlobDir); err != nil {
if os.IsNotExist(err) {
err2 := os.Mkdir(s.BlobDir, 0755)
if err2 != nil {
return err2
}
} else {
return err
}
} else if !stat.IsDir() {
return errors.Err("blob dir exists but is not a dir")
}
return nil
}

View file

@ -1,4 +1,4 @@
package main
package reflector
import (
"crypto/sha512"
@ -10,7 +10,7 @@ import (
const (
DefaultPort = 5566
BlobSize = 2 * 1024 * 1024
maxBlobSize = 2 * 1024 * 1024
protocolVersion1 = 0
protocolVersion2 = 1

store/store.go (new file, 184 lines)

@ -0,0 +1,184 @@
package store
import (
"bytes"
"io/ioutil"
"net/http"
"os"
"path"
"github.com/lbryio/errors.go"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
type BlobStore interface {
Has(string) (bool, error)
Get(string) ([]byte, error)
Put(string, []byte) error
}
type FileBlobStore struct {
dir string
initialized bool
}
func NewFileBlobStore(dir string) *FileBlobStore {
return &FileBlobStore{dir: dir}
}
func (f *FileBlobStore) path(hash string) string {
return path.Join(f.dir, hash)
}
func (f *FileBlobStore) initOnce() error {
if f.initialized {
return nil
}
defer func() { f.initialized = true }()
if stat, err := os.Stat(f.dir); err != nil {
if os.IsNotExist(err) {
err2 := os.Mkdir(f.dir, 0755)
if err2 != nil {
return err2
}
} else {
return err
}
} else if !stat.IsDir() {
return errors.Err("blob dir exists but is not a dir")
}
return nil
}
func (f *FileBlobStore) Has(hash string) (bool, error) {
err := f.initOnce()
if err != nil {
return false, err
}
_, err = os.Stat(f.path(hash))
if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
return true, nil
}
func (f *FileBlobStore) Get(hash string) ([]byte, error) {
err := f.initOnce()
if err != nil {
return []byte{}, err
}
file, err := os.Open(f.path(hash))
if err != nil {
return []byte{}, err
}
return ioutil.ReadAll(file)
}
func (f *FileBlobStore) Put(hash string, blob []byte) error {
err := f.initOnce()
if err != nil {
return err
}
return ioutil.WriteFile(f.path(hash), blob, 0644)
}
type S3BlobStore struct {
awsID string
awsSecret string
region string
bucket string
session *session.Session
}
func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore {
return &S3BlobStore{
awsID: awsID,
awsSecret: awsSecret,
region: region,
bucket: bucket,
}
}
func (s *S3BlobStore) initOnce() error {
if s.session != nil {
return nil
}
sess, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
Region: aws.String(s.region),
})
if err != nil {
return err
}
s.session = sess
return nil
}
func (s *S3BlobStore) Has(hash string) (bool, error) {
err := s.initOnce()
if err != nil {
return false, err
}
_, err = s3.New(s.session).HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(hash),
})
if err != nil {
if reqFail, ok := err.(s3.RequestFailure); ok && reqFail.StatusCode() == http.StatusNotFound {
return false, nil
}
return false, err
}
return true, nil
}
func (s *S3BlobStore) Get(hash string) ([]byte, error) {
err := s.initOnce()
if err != nil {
return []byte{}, err
}
buf := &aws.WriteAtBuffer{}
_, err = s3manager.NewDownloader(s.session).Download(buf, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(hash),
})
if err != nil {
return buf.Bytes(), err
}
return buf.Bytes(), nil
}
func (s *S3BlobStore) Put(hash string, blob []byte) error {
err := s.initOnce()
if err != nil {
return err
}
_, err = s3manager.NewUploader(s.session).Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(hash),
Body: bytes.NewBuffer(blob),
})
return err
}
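
The BlobStore interface is what lets the servers above stay storage-agnostic: main.go wires in an S3BlobStore, but a FileBlobStore drops in the same way. A rough usage sketch, with a placeholder directory and dummy data rather than anything from this commit:

package main

import (
	"crypto/sha512"
	"encoding/hex"

	"github.com/lbryio/reflector.go/store"
	log "github.com/sirupsen/logrus"
)

func main() {
	// Any BlobStore works here; swap in store.NewS3BlobStore(id, secret, region, bucket)
	// to hit S3 instead of the local filesystem. The directory is a placeholder.
	var blobStore store.BlobStore = store.NewFileBlobStore("/tmp/blobs")

	blob := []byte("not a real 2MB blob, just example data")
	hashBytes := sha512.Sum384(blob) // same hashing as getBlobHash in peer/server.go
	hash := hex.EncodeToString(hashBytes[:])

	if err := blobStore.Put(hash, blob); err != nil {
		log.Fatal(err)
	}

	has, err := blobStore.Has(hash)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("stored:", has)

	retrieved, err := blobStore.Get(hash)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("got", len(retrieved), "bytes back")
}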