From 080a24c7be20b1537fa5a30e6253cc8da405924e Mon Sep 17 00:00:00 2001
From: Justin Li <jli.justinli@gmail.com>
Date: Fri, 6 Sep 2013 18:39:14 -0400
Subject: [PATCH] Set up initial outbound synchronization structure

---
 cache/cache.go             |  4 +-
 cache/redis/redis.go       |  2 +-
 cache/redis/redis_test.go  |  2 +-
 config/config.go           |  4 +-
 config/example.json        |  2 +-
 main.go                    |  4 +-
 models/models.go           | 10 +++++
 storage/gazelle/flush.go   | 62 +++++++++++++++++++++++++++
 storage/gazelle/gazelle.go | 88 ++++++++++++++++++++++++++++++++++++++
 storage/storage.go         |  5 ++-
 10 files changed, 173 insertions(+), 10 deletions(-)
 create mode 100644 storage/gazelle/flush.go
 create mode 100644 storage/gazelle/gazelle.go

diff --git a/cache/cache.go b/cache/cache.go
index 5f3ef69..569d9c7 100644
--- a/cache/cache.go
+++ b/cache/cache.go
@@ -15,8 +15,8 @@ import (
 )
 
 var (
-	drivers   = make(map[string]Driver)
-	ErrTxDone = errors.New("cache: Transaction has already been committed or rolled back")
+	drivers       = make(map[string]Driver)
+	ErrTxDone     = errors.New("cache: Transaction has already been committed or rolled back")
 	ErrTxConflict = errors.New("cache: Commit interrupted, update transaction and repeat")
 )
 
diff --git a/cache/redis/redis.go b/cache/redis/redis.go
index a1b194e..21ae0ea 100644
--- a/cache/redis/redis.go
+++ b/cache/redis/redis.go
@@ -31,7 +31,7 @@ func (d *driver) New(conf *config.DataStore) cache.Pool {
 	return &Pool{
 		conf: conf,
 		pool: redis.Pool{
-			MaxIdle:      conf.MaxIdleConn,
+			MaxIdle:      conf.MaxIdleConns,
 			IdleTimeout:  conf.IdleTimeout.Duration,
 			Dial:         makeDialFunc(conf),
 			TestOnBorrow: testOnBorrow,
diff --git a/cache/redis/redis_test.go b/cache/redis/redis_test.go
index 6f45ff3..f1a3cd4 100644
--- a/cache/redis/redis_test.go
+++ b/cache/redis/redis_test.go
@@ -47,7 +47,7 @@ func createTestTxObj(t TestReporter) *Tx {
 	testPool := &Pool{
 		conf: conf,
 		pool: redis.Pool{
-			MaxIdle:      conf.MaxIdleConn,
+			MaxIdle:      conf.MaxIdleConns,
 			IdleTimeout:  conf.IdleTimeout.Duration,
 			Dial:         makeDialFunc(conf),
 			TestOnBorrow: testOnBorrow,
diff --git a/config/config.go b/config/config.go
index c751909..c5907da 100644
--- a/config/config.go
+++ b/config/config.go
@@ -39,8 +39,8 @@ type DataStore struct {
 	Encoding string `json:"encoding,omitempty"`
 	Prefix   string `json:"prefix,omitempty"`
 
-	MaxIdleConn int       `json:"max_idle_conn,omitempty"`
-	IdleTimeout *Duration `json:"idle_timeout,omitempty"`
+	MaxIdleConns int       `json:"max_idle_conns,omitempty"`
+	IdleTimeout  *Duration `json:"idle_timeout,omitempty"`
 }
 
 // Config represents a configuration for a server.Server.
diff --git a/config/example.json b/config/example.json
index 90b5111..44a1ba9 100644
--- a/config/example.json
+++ b/config/example.json
@@ -12,7 +12,7 @@
     "pass": "",
     "prefix": "test:",
 
-    "max_idle_conn": 3,
+    "max_idle_conns": 3,
     "idle_timeout": "240s"
   },
 
diff --git a/main.go b/main.go
index f1ed81d..77831a8 100644
--- a/main.go
+++ b/main.go
@@ -12,9 +12,11 @@ import (
 	"runtime"
 	"runtime/pprof"
 
-	_ "github.com/pushrax/chihaya/cache/redis"
 	"github.com/pushrax/chihaya/config"
 	"github.com/pushrax/chihaya/server"
+
+	_ "github.com/pushrax/chihaya/cache/redis"
+	_ "github.com/pushrax/chihaya/storage/gazelle"
 )
 
 var (
diff --git a/models/models.go b/models/models.go
index 727c0a4..36cfd62 100644
--- a/models/models.go
+++ b/models/models.go
@@ -36,3 +36,13 @@ type User struct {
 	Slots          int64   `json:"slots"`
 	SlotsUsed      int64   `json:"slots_used"`
 }
+
+type AnnounceDelta struct {
+	Peer    *Peer
+	Torrent *Torrent
+	User    *User
+
+	Uploaded   uint64
+	Downloaded uint64
+	Timestamp  float64
+}
diff --git a/storage/gazelle/flush.go b/storage/gazelle/flush.go
new file mode 100644
index 0000000..d9fd9f7
--- /dev/null
+++ b/storage/gazelle/flush.go
@@ -0,0 +1,62 @@
+// Copyright 2013 The Chihaya Authors. All rights reserved.
+// Use of this source code is governed by the BSD 2-Clause license,
+// which can be found in the LICENSE file.
+
+package gazelle
+
+import (
+	"bytes"
+	"log"
+	"time"
+)
+
+func (c *Conn) flushTorrents() {
+	var query bytes.Buffer
+	c.waitGroup.Add(1)
+	defer c.waitGroup.Done()
+	var count int
+
+	for {
+		length := len(c.torrentChannel)
+		query.Reset()
+
+		query.WriteString("INSERT INTO torrents (ID, Snatched, Seeders, Leechers, last_action) VALUES\n")
+
+		for count = 0; count < length; count++ {
+			b := <-c.torrentChannel
+			if b == nil {
+				break
+			}
+			query.Write(b.Bytes())
+
+			if count != length-1 {
+				query.WriteRune(',')
+			}
+		}
+
+		if !c.terminate {
+			log.Printf("[torrents] Flushing %d\n", count)
+		}
+
+		if count > 0 {
+			query.WriteString("\nON DUPLICATE KEY UPDATE Snatched = Snatched + VALUES(Snatched), " +
+				"Seeders = VALUES(Seeders), Leechers = VALUES(Leechers), " +
+				"last_action = IF(last_action < VALUES(last_action), VALUES(last_action), last_action);")
+
+			c.db.Exec(query.String())
+
+			if length < cap(c.torrentChannel)/2 {
+				time.Sleep(200 * time.Millisecond)
+			}
+		} else if c.terminate {
+			break
+		} else {
+			time.Sleep(time.Second)
+		}
+	}
+}
+
+func (c *Conn) flushUsers()           {}
+func (c *Conn) flushTransferHistory() {}
+func (c *Conn) flushTransferIps()     {}
+func (c *Conn) flushSnatches()        {}
diff --git a/storage/gazelle/gazelle.go b/storage/gazelle/gazelle.go
new file mode 100644
index 0000000..7f50387
--- /dev/null
+++ b/storage/gazelle/gazelle.go
@@ -0,0 +1,88 @@
+// Copyright 2013 The Chihaya Authors. All rights reserved.
+// Use of this source code is governed by the BSD 2-Clause license,
+// which can be found in the LICENSE file.
+
+// Package gazelle provides a driver for a BitTorrent tracker to interface
+// with the MySQL database used by Gazelle (github.com/WhatCD/Gazelle).
+package gazelle
+
+import (
+	"bytes"
+	"database/sql"
+	"fmt"
+	"sync"
+
+	"github.com/pushrax/chihaya/config"
+	"github.com/pushrax/chihaya/models"
+	"github.com/pushrax/chihaya/storage"
+
+	_ "github.com/go-sql-driver/mysql"
+)
+
+type driver struct{}
+
+func (d *driver) New(conf *config.DataStore) storage.Conn {
+	dsn := fmt.Sprintf(
+		"%s:%s@%s:%s/%s?charset=utf8mb4,utf8",
+		conf.Username,
+		conf.Password,
+		conf.Host,
+		conf.Port,
+		conf.Schema,
+	)
+	db, err := sql.Open("mysql", dsn)
+	if err != nil {
+		panic("gazelle: failed to open connection to MySQL")
+	}
+	db.SetMaxIdleConns(conf.MaxIdleConns)
+
+	conn := &Conn{db: db}
+
+	// TODO Buffer sizes
+	conn.torrentChannel = make(chan *bytes.Buffer, 1000)
+	conn.userChannel = make(chan *bytes.Buffer, 1000)
+	conn.transferHistoryChannel = make(chan *bytes.Buffer, 1000)
+	conn.transferIpsChannel = make(chan *bytes.Buffer, 1000)
+	conn.snatchChannel = make(chan *bytes.Buffer, 100)
+
+	return conn
+}
+
+type Conn struct {
+	db        *sql.DB
+	waitGroup sync.WaitGroup
+	terminate bool
+
+	torrentChannel         chan *bytes.Buffer
+	userChannel            chan *bytes.Buffer
+	transferHistoryChannel chan *bytes.Buffer
+	transferIpsChannel     chan *bytes.Buffer
+	snatchChannel          chan *bytes.Buffer
+}
+
+func (c *Conn) Start() error {
+	go c.flushTorrents()
+	go c.flushUsers()
+	go c.flushTransferHistory()
+	go c.flushTransferIps()
+	go c.flushSnatches()
+	return nil
+}
+
+func (c *Conn) Close() error {
+	c.terminate = true
+	c.waitGroup.Wait()
+	return c.db.Close()
+}
+
+func (c *Conn) RecordAnnounce(delta *models.AnnounceDelta) error {
+	return nil
+}
+
+func (c *Conn) RecordSnatch(peer *models.Peer) error {
+	return nil
+}
+
+func init() {
+	storage.Register("gazelle", &driver{})
+}
diff --git a/storage/storage.go b/storage/storage.go
index 10109c0..e80a3d5 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -49,7 +49,8 @@ func Open(conf *config.DataStore) (Conn, error) {
 
 // Conn represents a connection to the data store.
 type Conn interface {
+	Start() error
 	Close() error
-	UpdateTorrents(t []models.Torrent) error
-	UpdateUsers(u []models.User) error
+	RecordAnnounce(delta *models.AnnounceDelta) error
+	RecordSnatch(peer *models.Peer) error
 }