initial delta support

This commit is contained in:
Jimmy Zelinskie 2014-02-22 23:47:11 -05:00
parent e8e692cbf6
commit e80b17b2ad
5 changed files with 80 additions and 11 deletions

View file

@ -13,6 +13,7 @@ import (
"time"
"github.com/chihaya/chihaya/storage"
"github.com/chihaya/chihaya/storage/backend"
)
func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
@ -24,7 +25,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
}
// Get a connection to the tracker db
conn, err := s.dbConnPool.Get()
conn, err := s.trackerPool.Get()
if err != nil {
log.Panicf("server: %s", err)
}
@ -65,6 +66,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
}
}
now := time.Now().Unix()
// Create a new peer object from the request
peer := &storage.Peer{
ID: peerID,
@ -75,7 +77,14 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
Uploaded: uploaded,
Downloaded: downloaded,
Left: left,
LastAnnounce: time.Now().Unix(),
LastAnnounce: now,
}
delta := &backend.AnnounceDelta{
Peer: peer,
Torrent: torrent,
User: user,
Timestamp: now,
}
// Look for the user in in the pool of seeders and leechers
@ -126,6 +135,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
log.Panicf("server: %s", err)
}
}
delta.Created = true
}
// Handle any events in the request
@ -149,6 +159,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
if err != nil {
log.Panicf("server: %s", err)
}
delta.Snatched = true
if leecher {
err := conn.LeecherFinished(torrent, peer)
if err != nil {
@ -217,6 +228,22 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
}
}
writeBencoded(w, "e")
rawDeltaUp := peer.Uploaded - uploaded
rawDeltaDown := peer.Downloaded - downloaded
// Restarting a torrent may cause a delta to be negative.
if rawDeltaUp < 0 {
rawDeltaUp = 0
}
if rawDeltaDown < 0 {
rawDeltaDown = 0
}
delta.Uploaded = uint64(float64(rawDeltaUp) * user.UpMultiplier * torrent.UpMultiplier)
delta.Downloaded = uint64(float64(rawDeltaDown) * user.DownMultiplier * torrent.DownMultiplier)
s.backendCon.RecordAnnounce(delta)
}
func (s Server) validateAnnounceQuery(r *http.Request) (compact bool, numWant int, infohash, peerID, event, ip string, port, uploaded, downloaded, left uint64, err error) {

View file

@ -23,7 +23,7 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) {
}
// Get a connection to the tracker db
conn, err := s.dbConnPool.Get()
conn, err := s.trackerPool.Get()
if err != nil {
log.Fatal(err)
}

View file

@ -20,13 +20,15 @@ import (
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/storage"
"github.com/chihaya/chihaya/storage/backend"
"github.com/chihaya/chihaya/storage/tracker"
)
type Server struct {
conf *config.Config
listener *stoppableListener.StoppableListener
dbConnPool tracker.Pool
trackerPool tracker.Pool
backendCon backend.Conn
startTime time.Time
@ -37,14 +39,23 @@ type Server struct {
}
func New(conf *config.Config) (*Server, error) {
pool, err := tracker.Open(&conf.Tracker)
trackerPool, err := tracker.Open(&conf.Tracker)
if err != nil {
return nil, err
}
backendConn, err := backend.Open(&conf.Backend)
if err != nil {
return nil, err
}
err = backendConn.Start()
if err != nil {
return nil, err
}
s := &Server{
conf: conf,
dbConnPool: pool,
trackerPool: trackerPool,
backendCon: backendConn,
Server: http.Server{
Addr: conf.Addr,
ReadTimeout: conf.ReadTimeout.Duration,
@ -72,7 +83,7 @@ func (s *Server) ListenAndServe() error {
func (s *Server) Stop() error {
s.listener.Stop <- true
err := s.dbConnPool.Close()
err := s.trackerPool.Close()
if err != nil {
return err
}

View file

@ -91,7 +91,7 @@ type AnnounceDelta struct {
Downloaded uint64
// Timestamp is the unix timestamp this announce occurred at
Timestamp float64
Timestamp int64
// Snatched is true if this announce completed the download
Snatched bool

View file

@ -9,15 +9,46 @@ package mock
import (
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/storage"
"github.com/chihaya/chihaya/storage/backend"
)
type driver struct{}
type mock struct{}
func (d *driver) New(conf *config.DataStore) backend.Conn {
return &mock{}
}
func (m *mock) Start() error {
return nil
}
func (m *mock) Close() error {
return nil
}
func (m *mock) RecordAnnounce(delta *backend.AnnounceDelta) error {
return nil
}
func (m *mock) LoadTorrents(ids []uint64) ([]*storage.Torrent, error) {
return nil, nil
}
func (m *mock) LoadAllTorrents() ([]*storage.Torrent, error) {
return nil, nil
}
func (m *mock) LoadUsers(ids []uint64) ([]*storage.User, error) {
return nil, nil
}
func (m *mock) LoadAllUsers(ids []uint64) ([]*storage.User, error) {
return nil, nil
}
func init() {
backend.Register("mock", &driver{})
}