From 55b57549a679d307471165a530f894ee4495c8ed Mon Sep 17 00:00:00 2001 From: Leo Balduf Date: Fri, 29 Sep 2017 00:49:38 +0200 Subject: [PATCH 1/3] pkg/timecache: implement a time cache --- pkg/timecache/timecache.go | 127 +++++++++++++++++++++++++++ pkg/timecache/timecache_test.go | 148 ++++++++++++++++++++++++++++++++ 2 files changed, 275 insertions(+) create mode 100644 pkg/timecache/timecache.go create mode 100644 pkg/timecache/timecache_test.go diff --git a/pkg/timecache/timecache.go b/pkg/timecache/timecache.go new file mode 100644 index 0000000..bb992a6 --- /dev/null +++ b/pkg/timecache/timecache.go @@ -0,0 +1,127 @@ +// Package timecache provides a cache for the system clock, to avoid calls to +// time.Now(). +// The time is stored as one int64 which holds the number of nanoseconds since +// the Unix Epoch. The value is accessed using atomic primitives, without +// locking. +// The package runs a global singleton TimeCache that is is updated every +// second. +package timecache + +import ( + "sync" + "sync/atomic" + "time" +) + +// t is the global TimeCache. +var t *TimeCache + +func init() { + t = &TimeCache{ + clock: time.Now().UnixNano(), + closed: make(chan struct{}), + running: make(chan struct{}), + } + + go t.Run(1 * time.Second) +} + +// A TimeCache is a cache for the current system time. +// The cached time has nanosecond precision. +type TimeCache struct { + // clock saves the current time's nanoseconds since the Epoch. + // Must be accessed atomically. + clock int64 + + closed chan struct{} + running chan struct{} + m sync.Mutex +} + +// New returns a new TimeCache instance. +// The TimeCache must be started to update the time. +func New() *TimeCache { + return &TimeCache{ + clock: time.Now().UnixNano(), + closed: make(chan struct{}), + running: make(chan struct{}), + } +} + +// Run runs the TimeCache, updating the cached clock value once every interval +// and blocks until Stop is called. +func (t *TimeCache) Run(interval time.Duration) { + t.m.Lock() + select { + case <-t.running: + panic("Run called multiple times") + default: + } + close(t.running) + t.m.Unlock() + + tick := time.NewTicker(interval) + defer tick.Stop() + for { + select { + case <-t.closed: + tick.Stop() + return + case now := <-tick.C: + atomic.StoreInt64(&t.clock, now.UnixNano()) + } + } +} + +// Stop stops the TimeCache. +// The cached time remains valid but will not be updated anymore. +// A TimeCache can not be restarted. Construct a new one instead. +// Calling Stop again is a no-op. +func (t *TimeCache) Stop() { + t.m.Lock() + defer t.m.Unlock() + + select { + case <-t.closed: + return + default: + } + close(t.closed) +} + +// Now returns the cached time as a time.Time value. +func (t *TimeCache) Now() time.Time { + return time.Unix(0, atomic.LoadInt64(&t.clock)) +} + +// NowUnixNano returns the cached time as nanoseconds since the Unix Epoch. +func (t *TimeCache) NowUnixNano() int64 { + return atomic.LoadInt64(&t.clock) +} + +// NowUnix returns the cached time as seconds since the Unix Epoch. +func (t *TimeCache) NowUnix() int64 { + // Adopted from time.Unix + nsec := atomic.LoadInt64(&t.clock) + sec := nsec / 1e9 + nsec -= sec * 1e9 + if nsec < 0 { + sec-- + } + return sec +} + +// Now calls Now on the global TimeCache instance. +func Now() time.Time { + return t.Now() +} + +// NowUnixNano calls NowUnixNano on the global TimeCache instance. +func NowUnixNano() int64 { + return t.NowUnixNano() +} + +// NowUnix calls NowUnix on the global TimeCache instance. +func NowUnix() int64 { + return t.NowUnix() +} diff --git a/pkg/timecache/timecache_test.go b/pkg/timecache/timecache_test.go new file mode 100644 index 0000000..385b912 --- /dev/null +++ b/pkg/timecache/timecache_test.go @@ -0,0 +1,148 @@ +package timecache + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestNew(t *testing.T) { + c := New() + require.NotNil(t, c) + + now := c.Now() + require.False(t, now.IsZero()) + + nsec := c.NowUnixNano() + require.NotEqual(t, 0, nsec) + + sec := c.NowUnix() + require.NotEqual(t, 0, sec) +} + +func TestRunStop(t *testing.T) { + c := New() + require.NotNil(t, c) + + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + c.Run(1 * time.Second) + }() + + c.Stop() + + wg.Wait() +} + +func TestMultipleStop(t *testing.T) { + c := New() + require.NotNil(t, c) + + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + c.Run(1 * time.Second) + }() + + c.Stop() + c.Stop() + + wg.Wait() +} + +func doBenchmark(b *testing.B, f func(tc *TimeCache) func(*testing.PB)) { + tc := New() + require.NotNil(b, tc) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + tc.Run(1 * time.Second) + }() + + b.RunParallel(f(tc)) + + tc.Stop() + wg.Wait() +} + +func BenchmarkNow(b *testing.B) { + doBenchmark(b, func(tc *TimeCache) func(pb *testing.PB) { + return func(pb *testing.PB) { + var now time.Time + for pb.Next() { + now = tc.Now() + } + _ = now + } + }) +} + +func BenchmarkNowUnix(b *testing.B) { + doBenchmark(b, func(tc *TimeCache) func(pb *testing.PB) { + return func(pb *testing.PB) { + var now int64 + for pb.Next() { + now = tc.NowUnix() + } + _ = now + } + }) +} + +func BenchmarkNowUnixNano(b *testing.B) { + doBenchmark(b, func(tc *TimeCache) func(pb *testing.PB) { + return func(pb *testing.PB) { + var now int64 + for pb.Next() { + now = tc.NowUnixNano() + } + _ = now + } + }) +} + +func BenchmarkNowGlobal(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + var now time.Time + for pb.Next() { + now = Now() + } + _ = now + }) +} + +func BenchmarkNowUnixGlobal(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + var now int64 + for pb.Next() { + now = NowUnix() + } + _ = now + }) +} + +func BenchmarkNowUnixNanoGlobal(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + var now int64 + for pb.Next() { + now = NowUnixNano() + } + _ = now + }) +} + +func BenchmarkTimeNow(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + var now time.Time + for pb.Next() { + now = time.Now() + } + _ = now + }) +} From 89bc479a3b95896db7b3eef7cd1d17c0b243391f Mon Sep 17 00:00:00 2001 From: Leo Balduf Date: Fri, 29 Sep 2017 00:50:20 +0200 Subject: [PATCH 2/3] *: make use of timecache --- frontend/udp/frontend.go | 5 +++-- storage/memory/peer_store.go | 28 ++-------------------------- storage/memorybysubnet/peer_store.go | 28 ++-------------------------- 3 files changed, 7 insertions(+), 54 deletions(-) diff --git a/frontend/udp/frontend.go b/frontend/udp/frontend.go index 4ecbf85..acd77a3 100644 --- a/frontend/udp/frontend.go +++ b/frontend/udp/frontend.go @@ -18,6 +18,7 @@ import ( "github.com/chihaya/chihaya/frontend/udp/bytepool" "github.com/chihaya/chihaya/pkg/log" "github.com/chihaya/chihaya/pkg/stop" + "github.com/chihaya/chihaya/pkg/timecache" ) var allowedGeneratedPrivateKeyRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890") @@ -256,7 +257,7 @@ func (t *Frontend) handleRequest(r Request, w ResponseWriter) (actionName string // If this isn't requesting a new connection ID and the connection ID is // invalid, then fail. - if actionID != connectActionID && !ValidConnectionID(connID, r.IP, time.Now(), t.MaxClockSkew, t.PrivateKey) { + if actionID != connectActionID && !ValidConnectionID(connID, r.IP, timecache.Now(), t.MaxClockSkew, t.PrivateKey) { err = errBadConnectionID WriteError(w, txID, err) return @@ -272,7 +273,7 @@ func (t *Frontend) handleRequest(r Request, w ResponseWriter) (actionName string return } - WriteConnectionID(w, txID, NewConnectionID(r.IP, time.Now(), t.PrivateKey)) + WriteConnectionID(w, txID, NewConnectionID(r.IP, timecache.Now(), t.PrivateKey)) case announceActionID, announceV6ActionID: actionName = "announce" diff --git a/storage/memory/peer_store.go b/storage/memory/peer_store.go index f2a21bb..073671b 100644 --- a/storage/memory/peer_store.go +++ b/storage/memory/peer_store.go @@ -7,13 +7,13 @@ import ( "net" "runtime" "sync" - "sync/atomic" "time" "gopkg.in/yaml.v2" "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/pkg/log" + "github.com/chihaya/chihaya/pkg/timecache" "github.com/chihaya/chihaya/storage" ) @@ -146,22 +146,6 @@ func New(provided Config) (storage.PeerStore, error) { } }() - // Start a goroutine for updating our cached system clock. - ps.wg.Add(1) - go func() { - defer ps.wg.Done() - t := time.NewTicker(1 * time.Second) - for { - select { - case <-ps.closed: - t.Stop() - return - case now := <-t.C: - ps.setClock(now.UnixNano()) - } - } - }() - // Start a goroutine for reporting statistics to Prometheus. ps.wg.Add(1) go func() { @@ -229,10 +213,6 @@ type peerStore struct { cfg Config shards []*peerShard - // clock stores the current time nanoseconds, updated every second. - // Must be accessed atomically! - clock int64 - closed chan struct{} wg sync.WaitGroup } @@ -263,11 +243,7 @@ func recordGCDuration(duration time.Duration) { } func (ps *peerStore) getClock() int64 { - return atomic.LoadInt64(&ps.clock) -} - -func (ps *peerStore) setClock(to int64) { - atomic.StoreInt64(&ps.clock, to) + return timecache.NowUnixNano() } func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 { diff --git a/storage/memorybysubnet/peer_store.go b/storage/memorybysubnet/peer_store.go index 5bff7b5..206074e 100644 --- a/storage/memorybysubnet/peer_store.go +++ b/storage/memorybysubnet/peer_store.go @@ -8,13 +8,13 @@ import ( "net" "runtime" "sync" - "sync/atomic" "time" "gopkg.in/yaml.v2" "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/pkg/log" + "github.com/chihaya/chihaya/pkg/timecache" "github.com/chihaya/chihaya/storage" ) @@ -155,22 +155,6 @@ func New(provided Config) (storage.PeerStore, error) { } }() - // Start a goroutine for updating our cached system clock. - ps.wg.Add(1) - go func() { - defer ps.wg.Done() - t := time.NewTicker(1 * time.Second) - for { - select { - case <-ps.closed: - t.Stop() - return - case now := <-t.C: - ps.setClock(now.UnixNano()) - } - } - }() - // Start a goroutine for reporting statistics to Prometheus. ps.wg.Add(1) go func() { @@ -269,10 +253,6 @@ type peerStore struct { ipv6Mask net.IPMask shards []*peerShard - // clock stores the current time nanoseconds, updated every second. - // Must be accessed atomically! - clock int64 - closed chan struct{} wg sync.WaitGroup } @@ -303,11 +283,7 @@ func recordGCDuration(duration time.Duration) { } func (ps *peerStore) getClock() int64 { - return atomic.LoadInt64(&ps.clock) -} - -func (ps *peerStore) setClock(to int64) { - atomic.StoreInt64(&ps.clock, to) + return timecache.NowUnixNano() } func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 { From 6dfdb7e19283840d31ee8c6478f8a5a125938b85 Mon Sep 17 00:00:00 2001 From: Leo Balduf Date: Fri, 29 Sep 2017 00:50:50 +0200 Subject: [PATCH 3/3] udp: clean up connection ID generation --- frontend/udp/connection_id.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/udp/connection_id.go b/frontend/udp/connection_id.go index c103142..5436c77 100644 --- a/frontend/udp/connection_id.go +++ b/frontend/udp/connection_id.go @@ -25,7 +25,7 @@ const ttl = 2 * time.Minute // forgery probability of approximately 1 in 4 billion. func NewConnectionID(ip net.IP, now time.Time, key string) []byte { buf := make([]byte, 8) - binary.BigEndian.PutUint32(buf, uint32(now.UTC().Unix())) + binary.BigEndian.PutUint32(buf, uint32(now.Unix())) mac := hmac.New(sha256.New, []byte(key)) mac.Write(buf[:4])