From e80b17b2ada25cc3bfd2cd9773bcc02c145bb44c Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Sat, 22 Feb 2014 23:47:11 -0500 Subject: [PATCH] initial delta support --- server/announce.go | 31 +++++++++++++++++++++++++++++-- server/scrape.go | 2 +- server/server.go | 25 ++++++++++++++++++------- storage/backend/backend.go | 2 +- storage/backend/mock/driver.go | 31 +++++++++++++++++++++++++++++++ 5 files changed, 80 insertions(+), 11 deletions(-) diff --git a/server/announce.go b/server/announce.go index ca39aeb..da51de3 100644 --- a/server/announce.go +++ b/server/announce.go @@ -13,6 +13,7 @@ import ( "time" "github.com/chihaya/chihaya/storage" + "github.com/chihaya/chihaya/storage/backend" ) func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { @@ -24,7 +25,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } // Get a connection to the tracker db - conn, err := s.dbConnPool.Get() + conn, err := s.trackerPool.Get() if err != nil { log.Panicf("server: %s", err) } @@ -65,6 +66,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } } + now := time.Now().Unix() // Create a new peer object from the request peer := &storage.Peer{ ID: peerID, @@ -75,7 +77,14 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { Uploaded: uploaded, Downloaded: downloaded, Left: left, - LastAnnounce: time.Now().Unix(), + LastAnnounce: now, + } + + delta := &backend.AnnounceDelta{ + Peer: peer, + Torrent: torrent, + User: user, + Timestamp: now, } // Look for the user in in the pool of seeders and leechers @@ -126,6 +135,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { log.Panicf("server: %s", err) } } + delta.Created = true } // Handle any events in the request @@ -149,6 +159,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { if err != nil { log.Panicf("server: %s", err) } + delta.Snatched = true if leecher { err := conn.LeecherFinished(torrent, peer) if err != nil { @@ -217,6 +228,22 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } } writeBencoded(w, "e") + + rawDeltaUp := peer.Uploaded - uploaded + rawDeltaDown := peer.Downloaded - downloaded + + // Restarting a torrent may cause a delta to be negative. + if rawDeltaUp < 0 { + rawDeltaUp = 0 + } + if rawDeltaDown < 0 { + rawDeltaDown = 0 + } + + delta.Uploaded = uint64(float64(rawDeltaUp) * user.UpMultiplier * torrent.UpMultiplier) + delta.Downloaded = uint64(float64(rawDeltaDown) * user.DownMultiplier * torrent.DownMultiplier) + + s.backendCon.RecordAnnounce(delta) } func (s Server) validateAnnounceQuery(r *http.Request) (compact bool, numWant int, infohash, peerID, event, ip string, port, uploaded, downloaded, left uint64, err error) { diff --git a/server/scrape.go b/server/scrape.go index d1ebe6e..15fdf0b 100644 --- a/server/scrape.go +++ b/server/scrape.go @@ -23,7 +23,7 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { } // Get a connection to the tracker db - conn, err := s.dbConnPool.Get() + conn, err := s.trackerPool.Get() if err != nil { log.Fatal(err) } diff --git a/server/server.go b/server/server.go index 281c61b..2144843 100644 --- a/server/server.go +++ b/server/server.go @@ -20,13 +20,15 @@ import ( "github.com/chihaya/chihaya/config" "github.com/chihaya/chihaya/storage" + "github.com/chihaya/chihaya/storage/backend" "github.com/chihaya/chihaya/storage/tracker" ) type Server struct { - conf *config.Config - listener *stoppableListener.StoppableListener - dbConnPool tracker.Pool + conf *config.Config + listener *stoppableListener.StoppableListener + trackerPool tracker.Pool + backendCon backend.Conn startTime time.Time @@ -37,14 +39,23 @@ type Server struct { } func New(conf *config.Config) (*Server, error) { - pool, err := tracker.Open(&conf.Tracker) + trackerPool, err := tracker.Open(&conf.Tracker) + if err != nil { + return nil, err + } + backendConn, err := backend.Open(&conf.Backend) + if err != nil { + return nil, err + } + err = backendConn.Start() if err != nil { return nil, err } s := &Server{ - conf: conf, - dbConnPool: pool, + conf: conf, + trackerPool: trackerPool, + backendCon: backendConn, Server: http.Server{ Addr: conf.Addr, ReadTimeout: conf.ReadTimeout.Duration, @@ -72,7 +83,7 @@ func (s *Server) ListenAndServe() error { func (s *Server) Stop() error { s.listener.Stop <- true - err := s.dbConnPool.Close() + err := s.trackerPool.Close() if err != nil { return err } diff --git a/storage/backend/backend.go b/storage/backend/backend.go index 1b84e44..7934995 100644 --- a/storage/backend/backend.go +++ b/storage/backend/backend.go @@ -91,7 +91,7 @@ type AnnounceDelta struct { Downloaded uint64 // Timestamp is the unix timestamp this announce occurred at - Timestamp float64 + Timestamp int64 // Snatched is true if this announce completed the download Snatched bool diff --git a/storage/backend/mock/driver.go b/storage/backend/mock/driver.go index b8c7224..f3b1deb 100644 --- a/storage/backend/mock/driver.go +++ b/storage/backend/mock/driver.go @@ -9,15 +9,46 @@ package mock import ( "github.com/chihaya/chihaya/config" + "github.com/chihaya/chihaya/storage" "github.com/chihaya/chihaya/storage/backend" ) type driver struct{} +type mock struct{} + func (d *driver) New(conf *config.DataStore) backend.Conn { + return &mock{} +} + +func (m *mock) Start() error { return nil } +func (m *mock) Close() error { + return nil +} + +func (m *mock) RecordAnnounce(delta *backend.AnnounceDelta) error { + return nil +} + +func (m *mock) LoadTorrents(ids []uint64) ([]*storage.Torrent, error) { + return nil, nil +} + +func (m *mock) LoadAllTorrents() ([]*storage.Torrent, error) { + return nil, nil +} + +func (m *mock) LoadUsers(ids []uint64) ([]*storage.User, error) { + return nil, nil +} + +func (m *mock) LoadAllUsers(ids []uint64) ([]*storage.User, error) { + return nil, nil +} + func init() { backend.Register("mock", &driver{}) }