Set up initial outbound synchronization structure
This commit is contained in:
parent
ae9eaf4351
commit
080a24c7be
10 changed files with 173 additions and 10 deletions
4
cache/cache.go
vendored
4
cache/cache.go
vendored
|
@ -15,8 +15,8 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
drivers = make(map[string]Driver)
|
||||
ErrTxDone = errors.New("cache: Transaction has already been committed or rolled back")
|
||||
drivers = make(map[string]Driver)
|
||||
ErrTxDone = errors.New("cache: Transaction has already been committed or rolled back")
|
||||
ErrTxConflict = errors.New("cache: Commit interrupted, update transaction and repeat")
|
||||
)
|
||||
|
||||
|
|
2
cache/redis/redis.go
vendored
2
cache/redis/redis.go
vendored
|
@ -31,7 +31,7 @@ func (d *driver) New(conf *config.DataStore) cache.Pool {
|
|||
return &Pool{
|
||||
conf: conf,
|
||||
pool: redis.Pool{
|
||||
MaxIdle: conf.MaxIdleConn,
|
||||
MaxIdle: conf.MaxIdleConns,
|
||||
IdleTimeout: conf.IdleTimeout.Duration,
|
||||
Dial: makeDialFunc(conf),
|
||||
TestOnBorrow: testOnBorrow,
|
||||
|
|
2
cache/redis/redis_test.go
vendored
2
cache/redis/redis_test.go
vendored
|
@ -47,7 +47,7 @@ func createTestTxObj(t TestReporter) *Tx {
|
|||
testPool := &Pool{
|
||||
conf: conf,
|
||||
pool: redis.Pool{
|
||||
MaxIdle: conf.MaxIdleConn,
|
||||
MaxIdle: conf.MaxIdleConns,
|
||||
IdleTimeout: conf.IdleTimeout.Duration,
|
||||
Dial: makeDialFunc(conf),
|
||||
TestOnBorrow: testOnBorrow,
|
||||
|
|
|
@ -39,8 +39,8 @@ type DataStore struct {
|
|||
Encoding string `json:"encoding,omitempty"`
|
||||
Prefix string `json:"prefix,omitempty"`
|
||||
|
||||
MaxIdleConn int `json:"max_idle_conn,omitempty"`
|
||||
IdleTimeout *Duration `json:"idle_timeout,omitempty"`
|
||||
MaxIdleConns int `json:"max_idle_conns,omitempty"`
|
||||
IdleTimeout *Duration `json:"idle_timeout,omitempty"`
|
||||
}
|
||||
|
||||
// Config represents a configuration for a server.Server.
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
"pass": "",
|
||||
"prefix": "test:",
|
||||
|
||||
"max_idle_conn": 3,
|
||||
"max_idle_conns": 3,
|
||||
"idle_timeout": "240s"
|
||||
},
|
||||
|
||||
|
|
4
main.go
4
main.go
|
@ -12,9 +12,11 @@ import (
|
|||
"runtime"
|
||||
"runtime/pprof"
|
||||
|
||||
_ "github.com/pushrax/chihaya/cache/redis"
|
||||
"github.com/pushrax/chihaya/config"
|
||||
"github.com/pushrax/chihaya/server"
|
||||
|
||||
_ "github.com/pushrax/chihaya/cache/redis"
|
||||
_ "github.com/pushrax/chihaya/storage/gazelle"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
|
@ -36,3 +36,13 @@ type User struct {
|
|||
Slots int64 `json:"slots"`
|
||||
SlotsUsed int64 `json:"slots_used"`
|
||||
}
|
||||
|
||||
type AnnounceDelta struct {
|
||||
Peer *Peer
|
||||
Torrent *Torrent
|
||||
User *User
|
||||
|
||||
Uploaded uint64
|
||||
Downloaded uint64
|
||||
Timestamp float64
|
||||
}
|
||||
|
|
62
storage/gazelle/flush.go
Normal file
62
storage/gazelle/flush.go
Normal file
|
@ -0,0 +1,62 @@
|
|||
// Copyright 2013 The Chihaya Authors. All rights reserved.
|
||||
// Use of this source code is governed by the BSD 2-Clause license,
|
||||
// which can be found in the LICENSE file.
|
||||
|
||||
package gazelle
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (c *Conn) flushTorrents() {
|
||||
var query bytes.Buffer
|
||||
c.waitGroup.Add(1)
|
||||
defer c.waitGroup.Done()
|
||||
var count int
|
||||
|
||||
for {
|
||||
length := len(c.torrentChannel)
|
||||
query.Reset()
|
||||
|
||||
query.WriteString("INSERT INTO torrents (ID, Snatched, Seeders, Leechers, last_action) VALUES\n")
|
||||
|
||||
for count = 0; count < length; count++ {
|
||||
b := <-c.torrentChannel
|
||||
if b == nil {
|
||||
break
|
||||
}
|
||||
query.Write(b.Bytes())
|
||||
|
||||
if count != length-1 {
|
||||
query.WriteRune(',')
|
||||
}
|
||||
}
|
||||
|
||||
if !c.terminate {
|
||||
log.Printf("[torrents] Flushing %d\n", count)
|
||||
}
|
||||
|
||||
if count > 0 {
|
||||
query.WriteString("\nON DUPLICATE KEY UPDATE Snatched = Snatched + VALUES(Snatched), " +
|
||||
"Seeders = VALUES(Seeders), Leechers = VALUES(Leechers), " +
|
||||
"last_action = IF(last_action < VALUES(last_action), VALUES(last_action), last_action);")
|
||||
|
||||
c.db.Exec(query.String())
|
||||
|
||||
if length < cap(c.torrentChannel)/2 {
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
} else if c.terminate {
|
||||
break
|
||||
} else {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conn) flushUsers() {}
|
||||
func (c *Conn) flushTransferHistory() {}
|
||||
func (c *Conn) flushTransferIps() {}
|
||||
func (c *Conn) flushSnatches() {}
|
88
storage/gazelle/gazelle.go
Normal file
88
storage/gazelle/gazelle.go
Normal file
|
@ -0,0 +1,88 @@
|
|||
// Copyright 2013 The Chihaya Authors. All rights reserved.
|
||||
// Use of this source code is governed by the BSD 2-Clause license,
|
||||
// which can be found in the LICENSE file.
|
||||
|
||||
// Package gazelle provides a driver for a BitTorrent tracker to interface
|
||||
// with the MySQL database used by Gazelle (github.com/WhatCD/Gazelle).
|
||||
package gazelle
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/pushrax/chihaya/config"
|
||||
"github.com/pushrax/chihaya/models"
|
||||
"github.com/pushrax/chihaya/storage"
|
||||
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
)
|
||||
|
||||
type driver struct{}
|
||||
|
||||
func (d *driver) New(conf *config.DataStore) storage.Conn {
|
||||
dsn := fmt.Sprintf(
|
||||
"%s:%s@%s:%s/%s?charset=utf8mb4,utf8",
|
||||
conf.Username,
|
||||
conf.Password,
|
||||
conf.Host,
|
||||
conf.Port,
|
||||
conf.Schema,
|
||||
)
|
||||
db, err := sql.Open("mysql", dsn)
|
||||
if err != nil {
|
||||
panic("gazelle: failed to open connection to MySQL")
|
||||
}
|
||||
db.SetMaxIdleConns(conf.MaxIdleConns)
|
||||
|
||||
conn := &Conn{db: db}
|
||||
|
||||
// TODO Buffer sizes
|
||||
conn.torrentChannel = make(chan *bytes.Buffer, 1000)
|
||||
conn.userChannel = make(chan *bytes.Buffer, 1000)
|
||||
conn.transferHistoryChannel = make(chan *bytes.Buffer, 1000)
|
||||
conn.transferIpsChannel = make(chan *bytes.Buffer, 1000)
|
||||
conn.snatchChannel = make(chan *bytes.Buffer, 100)
|
||||
|
||||
return conn
|
||||
}
|
||||
|
||||
type Conn struct {
|
||||
db *sql.DB
|
||||
waitGroup sync.WaitGroup
|
||||
terminate bool
|
||||
|
||||
torrentChannel chan *bytes.Buffer
|
||||
userChannel chan *bytes.Buffer
|
||||
transferHistoryChannel chan *bytes.Buffer
|
||||
transferIpsChannel chan *bytes.Buffer
|
||||
snatchChannel chan *bytes.Buffer
|
||||
}
|
||||
|
||||
func (c *Conn) Start() error {
|
||||
go c.flushTorrents()
|
||||
go c.flushUsers()
|
||||
go c.flushTransferHistory()
|
||||
go c.flushTransferIps()
|
||||
go c.flushSnatches()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
c.terminate = true
|
||||
c.waitGroup.Wait()
|
||||
return c.db.Close()
|
||||
}
|
||||
|
||||
func (c *Conn) RecordAnnounce(delta *models.AnnounceDelta) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Conn) RecordSnatch(peer *models.Peer) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
storage.Register("gazelle", &driver{})
|
||||
}
|
|
@ -49,7 +49,8 @@ func Open(conf *config.DataStore) (Conn, error) {
|
|||
|
||||
// Conn represents a connection to the data store.
|
||||
type Conn interface {
|
||||
Start() error
|
||||
Close() error
|
||||
UpdateTorrents(t []models.Torrent) error
|
||||
UpdateUsers(u []models.User) error
|
||||
RecordAnnounce(delta *models.AnnounceDelta) error
|
||||
RecordSnatch(peer *models.Peer) error
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue