commit
e7b8264e50
6 changed files with 283 additions and 55 deletions
|
@ -25,7 +25,7 @@ const ttl = 2 * time.Minute
|
||||||
// forgery probability of approximately 1 in 4 billion.
|
// forgery probability of approximately 1 in 4 billion.
|
||||||
func NewConnectionID(ip net.IP, now time.Time, key string) []byte {
|
func NewConnectionID(ip net.IP, now time.Time, key string) []byte {
|
||||||
buf := make([]byte, 8)
|
buf := make([]byte, 8)
|
||||||
binary.BigEndian.PutUint32(buf, uint32(now.UTC().Unix()))
|
binary.BigEndian.PutUint32(buf, uint32(now.Unix()))
|
||||||
|
|
||||||
mac := hmac.New(sha256.New, []byte(key))
|
mac := hmac.New(sha256.New, []byte(key))
|
||||||
mac.Write(buf[:4])
|
mac.Write(buf[:4])
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"github.com/chihaya/chihaya/frontend/udp/bytepool"
|
"github.com/chihaya/chihaya/frontend/udp/bytepool"
|
||||||
"github.com/chihaya/chihaya/pkg/log"
|
"github.com/chihaya/chihaya/pkg/log"
|
||||||
"github.com/chihaya/chihaya/pkg/stop"
|
"github.com/chihaya/chihaya/pkg/stop"
|
||||||
|
"github.com/chihaya/chihaya/pkg/timecache"
|
||||||
)
|
)
|
||||||
|
|
||||||
var allowedGeneratedPrivateKeyRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890")
|
var allowedGeneratedPrivateKeyRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890")
|
||||||
|
@ -256,7 +257,7 @@ func (t *Frontend) handleRequest(r Request, w ResponseWriter) (actionName string
|
||||||
|
|
||||||
// If this isn't requesting a new connection ID and the connection ID is
|
// If this isn't requesting a new connection ID and the connection ID is
|
||||||
// invalid, then fail.
|
// invalid, then fail.
|
||||||
if actionID != connectActionID && !ValidConnectionID(connID, r.IP, time.Now(), t.MaxClockSkew, t.PrivateKey) {
|
if actionID != connectActionID && !ValidConnectionID(connID, r.IP, timecache.Now(), t.MaxClockSkew, t.PrivateKey) {
|
||||||
err = errBadConnectionID
|
err = errBadConnectionID
|
||||||
WriteError(w, txID, err)
|
WriteError(w, txID, err)
|
||||||
return
|
return
|
||||||
|
@ -272,7 +273,7 @@ func (t *Frontend) handleRequest(r Request, w ResponseWriter) (actionName string
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
WriteConnectionID(w, txID, NewConnectionID(r.IP, time.Now(), t.PrivateKey))
|
WriteConnectionID(w, txID, NewConnectionID(r.IP, timecache.Now(), t.PrivateKey))
|
||||||
|
|
||||||
case announceActionID, announceV6ActionID:
|
case announceActionID, announceV6ActionID:
|
||||||
actionName = "announce"
|
actionName = "announce"
|
||||||
|
|
127
pkg/timecache/timecache.go
Normal file
127
pkg/timecache/timecache.go
Normal file
|
@ -0,0 +1,127 @@
|
||||||
|
// Package timecache provides a cache for the system clock, to avoid calls to
|
||||||
|
// time.Now().
|
||||||
|
// The time is stored as one int64 which holds the number of nanoseconds since
|
||||||
|
// the Unix Epoch. The value is accessed using atomic primitives, without
|
||||||
|
// locking.
|
||||||
|
// The package runs a global singleton TimeCache that is is updated every
|
||||||
|
// second.
|
||||||
|
package timecache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// t is the global TimeCache.
|
||||||
|
var t *TimeCache
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
t = &TimeCache{
|
||||||
|
clock: time.Now().UnixNano(),
|
||||||
|
closed: make(chan struct{}),
|
||||||
|
running: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
go t.Run(1 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
// A TimeCache is a cache for the current system time.
|
||||||
|
// The cached time has nanosecond precision.
|
||||||
|
type TimeCache struct {
|
||||||
|
// clock saves the current time's nanoseconds since the Epoch.
|
||||||
|
// Must be accessed atomically.
|
||||||
|
clock int64
|
||||||
|
|
||||||
|
closed chan struct{}
|
||||||
|
running chan struct{}
|
||||||
|
m sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// New returns a new TimeCache instance.
|
||||||
|
// The TimeCache must be started to update the time.
|
||||||
|
func New() *TimeCache {
|
||||||
|
return &TimeCache{
|
||||||
|
clock: time.Now().UnixNano(),
|
||||||
|
closed: make(chan struct{}),
|
||||||
|
running: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run runs the TimeCache, updating the cached clock value once every interval
|
||||||
|
// and blocks until Stop is called.
|
||||||
|
func (t *TimeCache) Run(interval time.Duration) {
|
||||||
|
t.m.Lock()
|
||||||
|
select {
|
||||||
|
case <-t.running:
|
||||||
|
panic("Run called multiple times")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
close(t.running)
|
||||||
|
t.m.Unlock()
|
||||||
|
|
||||||
|
tick := time.NewTicker(interval)
|
||||||
|
defer tick.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-t.closed:
|
||||||
|
tick.Stop()
|
||||||
|
return
|
||||||
|
case now := <-tick.C:
|
||||||
|
atomic.StoreInt64(&t.clock, now.UnixNano())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops the TimeCache.
|
||||||
|
// The cached time remains valid but will not be updated anymore.
|
||||||
|
// A TimeCache can not be restarted. Construct a new one instead.
|
||||||
|
// Calling Stop again is a no-op.
|
||||||
|
func (t *TimeCache) Stop() {
|
||||||
|
t.m.Lock()
|
||||||
|
defer t.m.Unlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-t.closed:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
close(t.closed)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now returns the cached time as a time.Time value.
|
||||||
|
func (t *TimeCache) Now() time.Time {
|
||||||
|
return time.Unix(0, atomic.LoadInt64(&t.clock))
|
||||||
|
}
|
||||||
|
|
||||||
|
// NowUnixNano returns the cached time as nanoseconds since the Unix Epoch.
|
||||||
|
func (t *TimeCache) NowUnixNano() int64 {
|
||||||
|
return atomic.LoadInt64(&t.clock)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NowUnix returns the cached time as seconds since the Unix Epoch.
|
||||||
|
func (t *TimeCache) NowUnix() int64 {
|
||||||
|
// Adopted from time.Unix
|
||||||
|
nsec := atomic.LoadInt64(&t.clock)
|
||||||
|
sec := nsec / 1e9
|
||||||
|
nsec -= sec * 1e9
|
||||||
|
if nsec < 0 {
|
||||||
|
sec--
|
||||||
|
}
|
||||||
|
return sec
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now calls Now on the global TimeCache instance.
|
||||||
|
func Now() time.Time {
|
||||||
|
return t.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// NowUnixNano calls NowUnixNano on the global TimeCache instance.
|
||||||
|
func NowUnixNano() int64 {
|
||||||
|
return t.NowUnixNano()
|
||||||
|
}
|
||||||
|
|
||||||
|
// NowUnix calls NowUnix on the global TimeCache instance.
|
||||||
|
func NowUnix() int64 {
|
||||||
|
return t.NowUnix()
|
||||||
|
}
|
148
pkg/timecache/timecache_test.go
Normal file
148
pkg/timecache/timecache_test.go
Normal file
|
@ -0,0 +1,148 @@
|
||||||
|
package timecache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNew(t *testing.T) {
|
||||||
|
c := New()
|
||||||
|
require.NotNil(t, c)
|
||||||
|
|
||||||
|
now := c.Now()
|
||||||
|
require.False(t, now.IsZero())
|
||||||
|
|
||||||
|
nsec := c.NowUnixNano()
|
||||||
|
require.NotEqual(t, 0, nsec)
|
||||||
|
|
||||||
|
sec := c.NowUnix()
|
||||||
|
require.NotEqual(t, 0, sec)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunStop(t *testing.T) {
|
||||||
|
c := New()
|
||||||
|
require.NotNil(t, c)
|
||||||
|
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
c.Run(1 * time.Second)
|
||||||
|
}()
|
||||||
|
|
||||||
|
c.Stop()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultipleStop(t *testing.T) {
|
||||||
|
c := New()
|
||||||
|
require.NotNil(t, c)
|
||||||
|
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
c.Run(1 * time.Second)
|
||||||
|
}()
|
||||||
|
|
||||||
|
c.Stop()
|
||||||
|
c.Stop()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func doBenchmark(b *testing.B, f func(tc *TimeCache) func(*testing.PB)) {
|
||||||
|
tc := New()
|
||||||
|
require.NotNil(b, tc)
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
tc.Run(1 * time.Second)
|
||||||
|
}()
|
||||||
|
|
||||||
|
b.RunParallel(f(tc))
|
||||||
|
|
||||||
|
tc.Stop()
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkNow(b *testing.B) {
|
||||||
|
doBenchmark(b, func(tc *TimeCache) func(pb *testing.PB) {
|
||||||
|
return func(pb *testing.PB) {
|
||||||
|
var now time.Time
|
||||||
|
for pb.Next() {
|
||||||
|
now = tc.Now()
|
||||||
|
}
|
||||||
|
_ = now
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkNowUnix(b *testing.B) {
|
||||||
|
doBenchmark(b, func(tc *TimeCache) func(pb *testing.PB) {
|
||||||
|
return func(pb *testing.PB) {
|
||||||
|
var now int64
|
||||||
|
for pb.Next() {
|
||||||
|
now = tc.NowUnix()
|
||||||
|
}
|
||||||
|
_ = now
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkNowUnixNano(b *testing.B) {
|
||||||
|
doBenchmark(b, func(tc *TimeCache) func(pb *testing.PB) {
|
||||||
|
return func(pb *testing.PB) {
|
||||||
|
var now int64
|
||||||
|
for pb.Next() {
|
||||||
|
now = tc.NowUnixNano()
|
||||||
|
}
|
||||||
|
_ = now
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkNowGlobal(b *testing.B) {
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
var now time.Time
|
||||||
|
for pb.Next() {
|
||||||
|
now = Now()
|
||||||
|
}
|
||||||
|
_ = now
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkNowUnixGlobal(b *testing.B) {
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
var now int64
|
||||||
|
for pb.Next() {
|
||||||
|
now = NowUnix()
|
||||||
|
}
|
||||||
|
_ = now
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkNowUnixNanoGlobal(b *testing.B) {
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
var now int64
|
||||||
|
for pb.Next() {
|
||||||
|
now = NowUnixNano()
|
||||||
|
}
|
||||||
|
_ = now
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkTimeNow(b *testing.B) {
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
var now time.Time
|
||||||
|
for pb.Next() {
|
||||||
|
now = time.Now()
|
||||||
|
}
|
||||||
|
_ = now
|
||||||
|
})
|
||||||
|
}
|
|
@ -7,13 +7,13 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gopkg.in/yaml.v2"
|
"gopkg.in/yaml.v2"
|
||||||
|
|
||||||
"github.com/chihaya/chihaya/bittorrent"
|
"github.com/chihaya/chihaya/bittorrent"
|
||||||
"github.com/chihaya/chihaya/pkg/log"
|
"github.com/chihaya/chihaya/pkg/log"
|
||||||
|
"github.com/chihaya/chihaya/pkg/timecache"
|
||||||
"github.com/chihaya/chihaya/storage"
|
"github.com/chihaya/chihaya/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -146,22 +146,6 @@ func New(provided Config) (storage.PeerStore, error) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Start a goroutine for updating our cached system clock.
|
|
||||||
ps.wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer ps.wg.Done()
|
|
||||||
t := time.NewTicker(1 * time.Second)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ps.closed:
|
|
||||||
t.Stop()
|
|
||||||
return
|
|
||||||
case now := <-t.C:
|
|
||||||
ps.setClock(now.UnixNano())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Start a goroutine for reporting statistics to Prometheus.
|
// Start a goroutine for reporting statistics to Prometheus.
|
||||||
ps.wg.Add(1)
|
ps.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -229,10 +213,6 @@ type peerStore struct {
|
||||||
cfg Config
|
cfg Config
|
||||||
shards []*peerShard
|
shards []*peerShard
|
||||||
|
|
||||||
// clock stores the current time nanoseconds, updated every second.
|
|
||||||
// Must be accessed atomically!
|
|
||||||
clock int64
|
|
||||||
|
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
@ -263,11 +243,7 @@ func recordGCDuration(duration time.Duration) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *peerStore) getClock() int64 {
|
func (ps *peerStore) getClock() int64 {
|
||||||
return atomic.LoadInt64(&ps.clock)
|
return timecache.NowUnixNano()
|
||||||
}
|
|
||||||
|
|
||||||
func (ps *peerStore) setClock(to int64) {
|
|
||||||
atomic.StoreInt64(&ps.clock, to)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
|
func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
|
||||||
|
|
|
@ -8,13 +8,13 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gopkg.in/yaml.v2"
|
"gopkg.in/yaml.v2"
|
||||||
|
|
||||||
"github.com/chihaya/chihaya/bittorrent"
|
"github.com/chihaya/chihaya/bittorrent"
|
||||||
"github.com/chihaya/chihaya/pkg/log"
|
"github.com/chihaya/chihaya/pkg/log"
|
||||||
|
"github.com/chihaya/chihaya/pkg/timecache"
|
||||||
"github.com/chihaya/chihaya/storage"
|
"github.com/chihaya/chihaya/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -155,22 +155,6 @@ func New(provided Config) (storage.PeerStore, error) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Start a goroutine for updating our cached system clock.
|
|
||||||
ps.wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer ps.wg.Done()
|
|
||||||
t := time.NewTicker(1 * time.Second)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ps.closed:
|
|
||||||
t.Stop()
|
|
||||||
return
|
|
||||||
case now := <-t.C:
|
|
||||||
ps.setClock(now.UnixNano())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Start a goroutine for reporting statistics to Prometheus.
|
// Start a goroutine for reporting statistics to Prometheus.
|
||||||
ps.wg.Add(1)
|
ps.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -269,10 +253,6 @@ type peerStore struct {
|
||||||
ipv6Mask net.IPMask
|
ipv6Mask net.IPMask
|
||||||
shards []*peerShard
|
shards []*peerShard
|
||||||
|
|
||||||
// clock stores the current time nanoseconds, updated every second.
|
|
||||||
// Must be accessed atomically!
|
|
||||||
clock int64
|
|
||||||
|
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
@ -303,11 +283,7 @@ func recordGCDuration(duration time.Duration) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *peerStore) getClock() int64 {
|
func (ps *peerStore) getClock() int64 {
|
||||||
return atomic.LoadInt64(&ps.clock)
|
return timecache.NowUnixNano()
|
||||||
}
|
|
||||||
|
|
||||||
func (ps *peerStore) setClock(to int64) {
|
|
||||||
atomic.StoreInt64(&ps.clock, to)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
|
func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
|
||||||
|
|
Loading…
Reference in a new issue