Merge pull request #176 from mrd0ll4r/store-bench

store: add general benchmark and test, add synchronization to string store
This commit is contained in:
Jimmy Zelinskie 2016-06-19 20:39:27 -04:00 committed by GitHub
commit 1fdbe1bad1
16 changed files with 2511 additions and 427 deletions

101
pkg/stopper/stopper.go Normal file
View file

@ -0,0 +1,101 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package stopper
import (
"sync"
)
// AlreadyStopped is a closed error channel to be used by StopperFuncs when
// an element was already stopped.
var AlreadyStopped <-chan error
// AlreadyStoppedFunc is a StopperFunc that returns AlreadyStopped.
var AlreadyStoppedFunc = func() <-chan error { return AlreadyStopped }
func init() {
closeMe := make(chan error)
close(closeMe)
AlreadyStopped = closeMe
}
// Stopper is an interface that allows a clean shutdown.
type Stopper interface {
// Stop returns a channel that indicates whether the stop was
// successful.
// The channel can either return one error or be closed. Closing the
// channel signals a clean shutdown.
// The Stop function should return immediately and perform the actual
// shutdown in a seperate goroutine.
Stop() <-chan error
}
// StopGroup is a group that can be stopped.
type StopGroup struct {
stoppables []StopperFunc
stoppablesLock sync.Mutex
}
// StopperFunc is a function that can be used to provide a clean shutdown.
type StopperFunc func() <-chan error
// NewStopGroup creates a new StopGroup.
func NewStopGroup() *StopGroup {
return &StopGroup{
stoppables: make([]StopperFunc, 0),
}
}
// Add adds a Stopper to the StopGroup.
// On the next call to Stop(), the Stopper will be stopped.
func (cg *StopGroup) Add(toAdd Stopper) {
cg.stoppablesLock.Lock()
defer cg.stoppablesLock.Unlock()
cg.stoppables = append(cg.stoppables, toAdd.Stop)
}
// AddFunc adds a StopperFunc to the StopGroup.
// On the next call to Stop(), the StopperFunc will be called.
func (cg *StopGroup) AddFunc(toAddFunc StopperFunc) {
cg.stoppablesLock.Lock()
defer cg.stoppablesLock.Unlock()
cg.stoppables = append(cg.stoppables, toAddFunc)
}
// Stop stops all members of the StopGroup.
// Stopping will be done in a concurrent fashion.
// The slice of errors returned contains all errors returned by stopping the
// members.
func (cg *StopGroup) Stop() []error {
cg.stoppablesLock.Lock()
defer cg.stoppablesLock.Unlock()
var errors []error
whenDone := make(chan struct{})
waitChannels := make([]<-chan error, 0, len(cg.stoppables))
for _, toStop := range cg.stoppables {
waitFor := toStop()
if waitFor == nil {
panic("received a nil chan from Stop")
}
waitChannels = append(waitChannels, waitFor)
}
go func() {
for _, waitForMe := range waitChannels {
err := <-waitForMe
if err != nil {
errors = append(errors, err)
}
}
close(whenDone)
}()
<-whenDone
return errors
}

43
server/store/README.md Normal file
View file

@ -0,0 +1,43 @@
## The store Package
The `store` package offers a storage interface and middlewares sufficient to run a public tracker based on it.
### Architecture
The store consists of three parts:
- A set of interfaces, tests based on these interfaces and the store logic, unifying these interfaces into the store
- Drivers, implementing the store interfaces and
- Middleware that depends on the store
The store interfaces are `IPStore`, `PeerStore` and `StringStore`.
During runtime, each of them will be implemented by a driver.
Even though all different drivers for one interface provide the same functionality, their behaviour can be very different.
For example: The memory implementation keeps all state in-memory - this is very fast, but not persistent, it loses its state on every restart.
A database-backed driver on the other hand could provide persistence, at the cost of performance.
The pluggable design of Chihaya allows for the different interfaces to use different drivers.
For example: A typical use case of the `StringStore` is to provide blacklists or whitelists for infohashes/client IDs/....
You'd typically want these lists to be persistent, so you'd choose a driver that provides persistence.
The `PeerStore` on the other hand rarely needs to be persistent, as all peer state will be restored after one announce interval.
You'd therefore typically choose a very performant but non-persistent driver for the `PeerStore`.
### Testing
The main store package also contains a set of tests and benchmarks for drivers.
Both use the store interfaces and can work with any driver that implements these interfaces.
The tests verify that the driver behaves as specified by the interface and its documentation.
The benchmarks can be used to compare performance of a wide range of operations on the interfaces.
This makes it very easy to implement a new driver:
All functions that are part of the store interfaces can be tested easily with the tests that come with the store package.
Generally the memory implementation can be used as a guideline for implementing new drivers.
Both benchmarks and tests require a clean state to work correctly.
All of the test and benchmark functions therefore take a `*DriverConfig` as a parameter, this should be used to configure the driver in a way that it provides a clean state for every test or benchmark.
For example: Imagine a file-based driver that achieves persistence by storing its state in a file.
It must then be possible to provide the location of this file in the `'DriverConfig`, so that every different benchmark gets to work with a new file.
Most benchmarks come in two flavors: The "normal" version and the "1K" version.
A normal benchmark uses the same value over and over again to benchmark one operation.
A 1K benchmark uses a different value from a set of 1000 values for every iteration, this can show caching effects, if the driver uses them.
The 1K benchmarks require a little more computation to select the values and thus typically yield slightly lower results even for a "perfect" cache, i.e. the memory implementation.

View file

@ -7,6 +7,8 @@ package store
import (
"fmt"
"net"
"github.com/chihaya/chihaya/pkg/stopper"
)
var ipStoreDrivers = make(map[string]IPStoreDriver)
@ -52,6 +54,12 @@ type IPStore interface {
// Returns ErrResourceDoesNotExist if the given network is not
// contained in the store.
RemoveNetwork(network string) error
// Stopper provides the Stop method that stops the IPStore.
// Stop should shut down the IPStore in a separate goroutine and send
// an error to the channel if the shutdown failed. If the shutdown
// was successful, the channel is to be closed.
stopper.Stopper
}
// IPStoreDriver represents an interface for creating a handle to the

View file

@ -23,6 +23,7 @@ func (d *ipStoreDriver) New(_ *store.DriverConfig) (store.IPStore, error) {
return &ipStore{
ips: make(map[[16]byte]struct{}),
networks: netmatch.New(),
closed: make(chan struct{}),
}, nil
}
@ -31,6 +32,7 @@ func (d *ipStoreDriver) New(_ *store.DriverConfig) (store.IPStore, error) {
type ipStore struct {
ips map[[16]byte]struct{}
networks *netmatch.Trie
closed chan struct{}
sync.RWMutex
}
@ -65,6 +67,12 @@ func (s *ipStore) AddNetwork(network string) error {
s.Lock()
defer s.Unlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
return s.networks.Add(key, length)
}
@ -72,6 +80,12 @@ func (s *ipStore) AddIP(ip net.IP) error {
s.Lock()
defer s.Unlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
s.ips[key(ip)] = struct{}{}
return nil
@ -82,6 +96,12 @@ func (s *ipStore) HasIP(ip net.IP) (bool, error) {
s.RLock()
defer s.RUnlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
_, ok := s.ips[key]
if ok {
return true, nil
@ -99,6 +119,12 @@ func (s *ipStore) HasAnyIP(ips []net.IP) (bool, error) {
s.RLock()
defer s.RUnlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
for _, ip := range ips {
key := key(ip)
if _, ok := s.ips[key]; ok {
@ -121,6 +147,12 @@ func (s *ipStore) HasAllIPs(ips []net.IP) (bool, error) {
s.RLock()
defer s.RUnlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
for _, ip := range ips {
key := key(ip)
if _, ok := s.ips[key]; !ok {
@ -142,6 +174,12 @@ func (s *ipStore) RemoveIP(ip net.IP) error {
s.Lock()
defer s.Unlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
if _, ok := s.ips[key]; !ok {
return store.ErrResourceDoesNotExist
}
@ -160,9 +198,28 @@ func (s *ipStore) RemoveNetwork(network string) error {
s.Lock()
defer s.Unlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
err = s.networks.Remove(key, length)
if err != nil && err == netmatch.ErrNotContained {
return store.ErrResourceDoesNotExist
}
return err
}
func (s *ipStore) Stop() <-chan error {
toReturn := make(chan error)
go func() {
s.Lock()
defer s.Unlock()
s.ips = make(map[[16]byte]struct{})
s.networks = netmatch.New()
close(s.closed)
close(toReturn)
}()
return toReturn
}

View file

@ -10,13 +10,16 @@ import (
"github.com/chihaya/chihaya/server/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var (
v6 = net.ParseIP("0c22:384e:0:0c22:384e::68")
v4 = net.ParseIP("12.13.14.15")
v4s = net.ParseIP("12.13.14.15").To4()
ipStoreTester = store.PrepareIPStoreTester(&ipStoreDriver{})
ipStoreBenchmarker = store.PrepareIPStoreBenchmarker(&ipStoreDriver{})
)
func TestKey(t *testing.T) {
@ -31,227 +34,166 @@ func TestKey(t *testing.T) {
for _, tt := range table {
got := key(tt.input)
assert.Equal(t, got, tt.expected)
require.Equal(t, got, tt.expected)
}
}
func TestIPStore(t *testing.T) {
var d = &ipStoreDriver{}
s, err := d.New(&store.DriverConfig{})
assert.Nil(t, err)
assert.NotNil(t, s)
// check default state
found, err := s.HasIP(v4)
assert.Nil(t, err)
assert.False(t, found)
// check IPv4
err = s.AddIP(v4)
assert.Nil(t, err)
found, err = s.HasIP(v4)
assert.Nil(t, err)
assert.True(t, found)
found, err = s.HasIP(v4s)
assert.Nil(t, err)
assert.True(t, found)
found, err = s.HasIP(v6)
assert.Nil(t, err)
assert.False(t, found)
// check removes
err = s.RemoveIP(v6)
assert.NotNil(t, err)
err = s.RemoveIP(v4s)
assert.Nil(t, err)
found, err = s.HasIP(v4)
assert.Nil(t, err)
assert.False(t, found)
// check IPv6
err = s.AddIP(v6)
assert.Nil(t, err)
found, err = s.HasIP(v6)
assert.Nil(t, err)
assert.True(t, found)
err = s.RemoveIP(v6)
assert.Nil(t, err)
found, err = s.HasIP(v6)
assert.Nil(t, err)
assert.False(t, found)
ipStoreTester.TestIPStore(t, &store.DriverConfig{})
}
func TestHasAllHasAny(t *testing.T) {
var d = &ipStoreDriver{}
s, err := d.New(&store.DriverConfig{})
assert.Nil(t, err)
assert.NotNil(t, s)
found, err := s.HasAnyIP(nil)
assert.Nil(t, err)
assert.False(t, found)
found, err = s.HasAllIPs(nil)
assert.Nil(t, err)
assert.True(t, found)
found, err = s.HasAllIPs([]net.IP{v4})
assert.Nil(t, err)
assert.False(t, found)
err = s.AddIP(v4)
assert.Nil(t, err)
found, err = s.HasAnyIP([]net.IP{v4, v6})
assert.Nil(t, err)
assert.True(t, found)
found, err = s.HasAllIPs([]net.IP{v4, v6})
assert.Nil(t, err)
assert.False(t, found)
found, err = s.HasAllIPs([]net.IP{v4})
assert.Nil(t, err)
assert.True(t, found)
err = s.AddIP(v6)
assert.Nil(t, err)
found, err = s.HasAnyIP([]net.IP{v4, v6})
assert.Nil(t, err)
assert.True(t, found)
found, err = s.HasAllIPs([]net.IP{v4, v6})
assert.Nil(t, err)
assert.True(t, found)
ipStoreTester.TestHasAllHasAny(t, &store.DriverConfig{})
}
func TestNetworks(t *testing.T) {
var (
d = &ipStoreDriver{}
net1 = "192.168.22.255/24"
net2 = "192.168.23.255/24"
includedIP = net.ParseIP("192.168.22.23")
excludedIP = net.ParseIP("192.168.23.22")
)
s, err := d.New(&store.DriverConfig{})
assert.Nil(t, err)
match, err := s.HasIP(includedIP)
assert.Nil(t, err)
assert.False(t, match)
match, err = s.HasIP(excludedIP)
assert.Nil(t, err)
assert.False(t, match)
err = s.AddNetwork("")
assert.NotNil(t, err)
err = s.RemoveNetwork("")
assert.NotNil(t, err)
err = s.AddNetwork(net1)
assert.Nil(t, err)
match, err = s.HasIP(includedIP)
assert.Nil(t, err)
assert.True(t, match)
match, err = s.HasIP(excludedIP)
assert.Nil(t, err)
assert.False(t, match)
err = s.RemoveNetwork(net2)
assert.NotNil(t, err)
err = s.RemoveNetwork(net1)
assert.Nil(t, err)
match, err = s.HasIP(includedIP)
assert.Nil(t, err)
assert.False(t, match)
match, err = s.HasIP(excludedIP)
assert.Nil(t, err)
assert.False(t, match)
ipStoreTester.TestNetworks(t, &store.DriverConfig{})
}
func TestHasAllHasAnyNetworks(t *testing.T) {
var (
d = &ipStoreDriver{}
net1 = "192.168.22.255/24"
net2 = "192.168.23.255/24"
inNet1 = net.ParseIP("192.168.22.234")
inNet2 = net.ParseIP("192.168.23.123")
excluded = net.ParseIP("10.154.243.22")
)
s, err := d.New(&store.DriverConfig{})
assert.Nil(t, err)
match, err := s.HasAnyIP([]net.IP{inNet1, inNet2, excluded})
assert.Nil(t, err)
assert.False(t, match)
match, err = s.HasAllIPs([]net.IP{inNet1, inNet2, excluded})
assert.Nil(t, err)
assert.False(t, match)
err = s.AddNetwork(net1)
assert.Nil(t, err)
match, err = s.HasAnyIP([]net.IP{inNet1, inNet2})
assert.Nil(t, err)
assert.True(t, match)
match, err = s.HasAllIPs([]net.IP{inNet1, inNet2})
assert.Nil(t, err)
assert.False(t, match)
err = s.AddNetwork(net2)
assert.Nil(t, err)
match, err = s.HasAnyIP([]net.IP{inNet1, inNet2, excluded})
assert.Nil(t, err)
assert.True(t, match)
match, err = s.HasAllIPs([]net.IP{inNet1, inNet2})
assert.Nil(t, err)
assert.True(t, match)
match, err = s.HasAllIPs([]net.IP{inNet1, inNet2, excluded})
assert.Nil(t, err)
assert.False(t, match)
err = s.RemoveNetwork(net1)
assert.Nil(t, err)
match, err = s.HasAnyIP([]net.IP{inNet1, inNet2})
assert.Nil(t, err)
assert.True(t, match)
match, err = s.HasAllIPs([]net.IP{inNet1, inNet2})
assert.Nil(t, err)
assert.False(t, match)
err = s.RemoveNetwork(net2)
assert.Nil(t, err)
match, err = s.HasAnyIP([]net.IP{inNet1, inNet2})
assert.Nil(t, err)
assert.False(t, match)
match, err = s.HasAllIPs([]net.IP{inNet1, inNet2})
assert.Nil(t, err)
assert.False(t, match)
ipStoreTester.TestHasAllHasAnyNetworks(t, &store.DriverConfig{})
}
func BenchmarkIPStore_AddV4(b *testing.B) {
ipStoreBenchmarker.AddV4(b, &store.DriverConfig{})
}
func BenchmarkIPStore_AddV6(b *testing.B) {
ipStoreBenchmarker.AddV6(b, &store.DriverConfig{})
}
func BenchmarkIPStore_LookupV4(b *testing.B) {
ipStoreBenchmarker.LookupV4(b, &store.DriverConfig{})
}
func BenchmarkIPStore_LookupV6(b *testing.B) {
ipStoreBenchmarker.LookupV6(b, &store.DriverConfig{})
}
func BenchmarkIPStore_AddRemoveV4(b *testing.B) {
ipStoreBenchmarker.AddRemoveV4(b, &store.DriverConfig{})
}
func BenchmarkIPStore_AddRemoveV6(b *testing.B) {
ipStoreBenchmarker.AddRemoveV6(b, &store.DriverConfig{})
}
func BenchmarkIPStore_LookupNonExistV4(b *testing.B) {
ipStoreBenchmarker.LookupNonExistV4(b, &store.DriverConfig{})
}
func BenchmarkIPStore_LookupNonExistV6(b *testing.B) {
ipStoreBenchmarker.LookupNonExistV6(b, &store.DriverConfig{})
}
func BenchmarkIPStore_RemoveNonExistV4(b *testing.B) {
ipStoreBenchmarker.RemoveNonExistV4(b, &store.DriverConfig{})
}
func BenchmarkIPStore_RemoveNonExistV6(b *testing.B) {
ipStoreBenchmarker.RemoveNonExistV6(b, &store.DriverConfig{})
}
func BenchmarkIPStore_AddV4Network(b *testing.B) {
ipStoreBenchmarker.AddV4Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_AddV6Network(b *testing.B) {
ipStoreBenchmarker.AddV6Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_LookupV4Network(b *testing.B) {
ipStoreBenchmarker.LookupV4Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_LookupV6Network(b *testing.B) {
ipStoreBenchmarker.LookupV6Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_AddRemoveV4Network(b *testing.B) {
ipStoreBenchmarker.AddRemoveV4Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_AddRemoveV6Network(b *testing.B) {
ipStoreBenchmarker.AddRemoveV6Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_RemoveNonExistV4Network(b *testing.B) {
ipStoreBenchmarker.RemoveNonExistV4Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_RemoveNonExistV6Network(b *testing.B) {
ipStoreBenchmarker.RemoveNonExistV6Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_Add1KV4(b *testing.B) {
ipStoreBenchmarker.Add1KV4(b, &store.DriverConfig{})
}
func BenchmarkIPStore_Add1KV6(b *testing.B) {
ipStoreBenchmarker.Add1KV6(b, &store.DriverConfig{})
}
func BenchmarkIPStore_Lookup1KV4(b *testing.B) {
ipStoreBenchmarker.Lookup1KV4(b, &store.DriverConfig{})
}
func BenchmarkIPStore_Lookup1KV6(b *testing.B) {
ipStoreBenchmarker.Lookup1KV6(b, &store.DriverConfig{})
}
func BenchmarkIPStore_AddRemove1KV4(b *testing.B) {
ipStoreBenchmarker.AddRemove1KV4(b, &store.DriverConfig{})
}
func BenchmarkIPStore_AddRemove1KV6(b *testing.B) {
ipStoreBenchmarker.AddRemove1KV6(b, &store.DriverConfig{})
}
func BenchmarkIPStore_LookupNonExist1KV4(b *testing.B) {
ipStoreBenchmarker.LookupNonExist1KV4(b, &store.DriverConfig{})
}
func BenchmarkIPStore_LookupNonExist1KV6(b *testing.B) {
ipStoreBenchmarker.LookupNonExist1KV6(b, &store.DriverConfig{})
}
func BenchmarkIPStore_RemoveNonExist1KV4(b *testing.B) {
ipStoreBenchmarker.RemoveNonExist1KV4(b, &store.DriverConfig{})
}
func BenchmarkIPStore_RemoveNonExist1KV6(b *testing.B) {
ipStoreBenchmarker.RemoveNonExist1KV6(b, &store.DriverConfig{})
}
func BenchmarkIPStore_Add1KV4Network(b *testing.B) {
ipStoreBenchmarker.Add1KV4Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_Add1KV6Network(b *testing.B) {
ipStoreBenchmarker.Add1KV6Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_Lookup1KV4Network(b *testing.B) {
ipStoreBenchmarker.Lookup1KV4Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_Lookup1KV6Network(b *testing.B) {
ipStoreBenchmarker.Lookup1KV6Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_AddRemove1KV4Network(b *testing.B) {
ipStoreBenchmarker.AddRemove1KV4Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_AddRemove1KV6Network(b *testing.B) {
ipStoreBenchmarker.AddRemove1KV6Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_RemoveNonExist1KV4Network(b *testing.B) {
ipStoreBenchmarker.RemoveNonExist1KV4Network(b, &store.DriverConfig{})
}
func BenchmarkIPStore_RemoveNonExist1KV6Network(b *testing.B) {
ipStoreBenchmarker.RemoveNonExist1KV6Network(b, &store.DriverConfig{})
}

View file

@ -35,6 +35,7 @@ func (d *peerStoreDriver) New(storecfg *store.DriverConfig) (store.PeerStore, er
}
return &peerStore{
shards: shards,
closed: make(chan struct{}),
}, nil
}
@ -72,6 +73,7 @@ type peerShard struct {
type peerStore struct {
shards []*peerShard
closed chan struct{}
}
var _ store.PeerStore = &peerStore{}
@ -100,6 +102,12 @@ func (s *peerStore) PutSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error {
shard.Lock()
defer shard.Unlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
if shard.peers[key] == nil {
shard.peers[key] = make(map[string]peer)
}
@ -118,6 +126,12 @@ func (s *peerStore) DeleteSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) erro
shard.Lock()
defer shard.Unlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
if shard.peers[key] == nil {
return store.ErrResourceDoesNotExist
}
@ -143,6 +157,12 @@ func (s *peerStore) PutLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error
shard.Lock()
defer shard.Unlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
if shard.peers[key] == nil {
shard.peers[key] = make(map[string]peer)
}
@ -161,6 +181,12 @@ func (s *peerStore) DeleteLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) err
shard.Lock()
defer shard.Unlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
if shard.peers[key] == nil {
return store.ErrResourceDoesNotExist
}
@ -187,6 +213,12 @@ func (s *peerStore) GraduateLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) e
shard.Lock()
defer shard.Unlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
if shard.peers[lkey] != nil {
delete(shard.peers[lkey], peerKey(p))
}
@ -243,6 +275,12 @@ func (s *peerStore) AnnouncePeers(infoHash chihaya.InfoHash, seeder bool, numWan
shard.RLock()
defer shard.RUnlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
if seeder {
// Append leechers as possible.
leechers := shard.peers[lkey]
@ -307,6 +345,12 @@ func (s *peerStore) GetSeeders(infoHash chihaya.InfoHash) (peers, peers6 []chiha
shard.RLock()
defer shard.RUnlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
seeders := shard.peers[key]
for _, p := range seeders {
if p.IP.To4() == nil {
@ -324,6 +368,12 @@ func (s *peerStore) GetLeechers(infoHash chihaya.InfoHash) (peers, peers6 []chih
shard.RLock()
defer shard.RUnlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
leechers := shard.peers[key]
for _, p := range leechers {
if p.IP.To4() == nil {
@ -341,6 +391,12 @@ func (s *peerStore) NumSeeders(infoHash chihaya.InfoHash) int {
shard.RLock()
defer shard.RUnlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
return len(shard.peers[key])
}
@ -350,5 +406,33 @@ func (s *peerStore) NumLeechers(infoHash chihaya.InfoHash) int {
shard.RLock()
defer shard.RUnlock()
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
return len(shard.peers[key])
}
func (s *peerStore) Stop() <-chan error {
toReturn := make(chan error)
go func() {
oldshards := s.shards
for _, shard := range oldshards {
shard.Lock()
}
shards := make([]*peerShard, len(oldshards))
for i := 0; i < len(oldshards); i++ {
shards[i] = &peerShard{}
shards[i].peers = make(map[string]map[string]peer)
}
s.shards = shards
close(s.closed)
for _, shard := range oldshards {
shard.Unlock()
}
close(toReturn)
}()
return toReturn
}

View file

@ -5,155 +5,138 @@
package memory
import (
"net"
"testing"
"time"
"github.com/chihaya/chihaya"
"github.com/chihaya/chihaya/server/store"
"github.com/stretchr/testify/assert"
)
func peerInSlice(peer chihaya.Peer, peers []chihaya.Peer) bool {
for _, v := range peers {
if v.Equal(peer) {
return true
}
var (
peerStoreTester = store.PreparePeerStoreTester(&peerStoreDriver{})
peerStoreBenchmarker = store.PreparePeerStoreBenchmarker(&peerStoreDriver{})
peerStoreTestConfig = &store.DriverConfig{}
)
func init() {
unmarshalledConfig := struct {
Shards int
}{
1,
}
return false
peerStoreTestConfig.Config = unmarshalledConfig
}
func TestPeerStoreAPI(t *testing.T) {
var (
hash = chihaya.InfoHash([20]byte{})
peers = []struct {
seeder bool
peerID string
ip string
port uint16
}{
{false, "-AZ3034-6wfG2wk6wWLc", "250.183.81.177", 5720},
{false, "-AZ3042-6ozMq5q6Q3NX", "38.241.13.19", 4833},
{false, "-BS5820-oy4La2MWGEFj", "fd45:7856:3dae::48", 2878},
{false, "-AR6360-6oZyyMWoOOBe", "fd0a:29a8:8445::38", 3167},
{true, "-AG2083-s1hiF8vGAAg0", "231.231.49.173", 1453},
{true, "-AG3003-lEl2Mm4NEO4n", "254.99.84.77", 7032},
{true, "-MR1100-00HS~T7*65rm", "211.229.100.129", 2614},
{true, "-LK0140-ATIV~nbEQAMr", "fdad:c435:bf79::12", 4114},
{true, "-KT2210-347143496631", "fdda:1b35:7d6e::9", 6179},
{true, "-TR0960-6ep6svaa61r4", "fd7f:78f0:4c77::55", 4727},
}
unmarshalledConfig = struct {
Shards int
}{
1,
}
config = store.DriverConfig{
Name: "memory",
Config: unmarshalledConfig,
}
d = &peerStoreDriver{}
)
s, err := d.New(&config)
assert.Nil(t, err)
assert.NotNil(t, s)
for _, p := range peers {
// Construct chihaya.Peer from test data.
peer := chihaya.Peer{
ID: chihaya.PeerIDFromString(p.peerID),
IP: net.ParseIP(p.ip),
Port: p.port,
}
if p.seeder {
err = s.PutSeeder(hash, peer)
} else {
err = s.PutLeecher(hash, peer)
}
assert.Nil(t, err)
}
leechers1, leechers61, err := s.GetLeechers(hash)
assert.Nil(t, err)
assert.NotEmpty(t, leechers1)
assert.NotEmpty(t, leechers61)
num := s.NumLeechers(hash)
assert.Equal(t, len(leechers1)+len(leechers61), num)
seeders1, seeders61, err := s.GetSeeders(hash)
assert.Nil(t, err)
assert.NotEmpty(t, seeders1)
assert.NotEmpty(t, seeders61)
num = s.NumSeeders(hash)
assert.Equal(t, len(seeders1)+len(seeders61), num)
leechers := append(leechers1, leechers61...)
seeders := append(seeders1, seeders61...)
for _, p := range peers {
// Construct chihaya.Peer from test data.
peer := chihaya.Peer{
ID: chihaya.PeerIDFromString(p.peerID),
IP: net.ParseIP(p.ip),
Port: p.port,
}
if p.seeder {
assert.True(t, peerInSlice(peer, seeders))
} else {
assert.True(t, peerInSlice(peer, leechers))
}
if p.seeder {
err = s.DeleteSeeder(hash, peer)
} else {
err = s.DeleteLeecher(hash, peer)
}
assert.Nil(t, err)
}
assert.Zero(t, s.NumLeechers(hash))
assert.Zero(t, s.NumSeeders(hash))
// Re-add all the peers to the peerStore.
for _, p := range peers {
// Construct chihaya.Peer from test data.
peer := chihaya.Peer{
ID: chihaya.PeerIDFromString(p.peerID),
IP: net.ParseIP(p.ip),
Port: p.port,
}
if p.seeder {
s.PutSeeder(hash, peer)
} else {
s.PutLeecher(hash, peer)
}
}
// Check that there are 6 seeders, and 4 leechers.
assert.Equal(t, 6, s.NumSeeders(hash))
assert.Equal(t, 4, s.NumLeechers(hash))
peer := chihaya.Peer{
ID: chihaya.PeerIDFromString(peers[0].peerID),
IP: net.ParseIP(peers[0].ip),
Port: peers[0].port,
}
err = s.GraduateLeecher(hash, peer)
assert.Nil(t, err)
// Check that there are 7 seeders, and 3 leechers after graduating a
// leecher to a seeder.
assert.Equal(t, 7, s.NumSeeders(hash))
assert.Equal(t, 3, s.NumLeechers(hash))
peers1, peers61, err := s.AnnouncePeers(hash, true, 5, peer, chihaya.Peer{})
assert.Nil(t, err)
assert.NotNil(t, peers1)
assert.NotNil(t, peers61)
err = s.CollectGarbage(time.Now())
assert.Nil(t, err)
assert.Equal(t, s.NumLeechers(hash), 0)
assert.Equal(t, s.NumSeeders(hash), 0)
func TestPeerStore(t *testing.T) {
peerStoreTester.TestPeerStore(t, peerStoreTestConfig)
}
func BenchmarkPeerStore_PutSeeder(b *testing.B) {
peerStoreBenchmarker.PutSeeder(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_PutSeeder1KInfohash(b *testing.B) {
peerStoreBenchmarker.PutSeeder1KInfohash(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_PutSeeder1KSeeders(b *testing.B) {
peerStoreBenchmarker.PutSeeder1KSeeders(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_PutSeeder1KInfohash1KSeeders(b *testing.B) {
peerStoreBenchmarker.PutSeeder1KInfohash1KSeeders(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_PutDeleteSeeder(b *testing.B) {
peerStoreBenchmarker.PutDeleteSeeder(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_PutDeleteSeeder1KInfohash(b *testing.B) {
peerStoreBenchmarker.PutDeleteSeeder1KInfohash(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_PutDeleteSeeder1KSeeders(b *testing.B) {
peerStoreBenchmarker.PutDeleteSeeder1KSeeders(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_PutDeleteSeeder1KInfohash1KSeeders(b *testing.B) {
peerStoreBenchmarker.PutDeleteSeeder1KInfohash1KSeeders(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_DeleteSeederNonExist(b *testing.B) {
peerStoreBenchmarker.DeleteSeederNonExist(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_DeleteSeederNonExist1KInfohash(b *testing.B) {
peerStoreBenchmarker.DeleteSeederNonExist1KInfohash(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_DeleteSeederNonExist1KSeeders(b *testing.B) {
peerStoreBenchmarker.DeleteSeederNonExist1KSeeders(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_DeleteSeederNonExist1KInfohash1KSeeders(b *testing.B) {
peerStoreBenchmarker.DeleteSeederNonExist1KInfohash1KSeeders(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_PutGraduateDeleteLeecher(b *testing.B) {
peerStoreBenchmarker.PutGraduateDeleteLeecher(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_PutGraduateDeleteLeecher1KInfohash(b *testing.B) {
peerStoreBenchmarker.PutGraduateDeleteLeecher1KInfohash(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_PutGraduateDeleteLeecher1KSeeders(b *testing.B) {
peerStoreBenchmarker.PutGraduateDeleteLeecher1KLeechers(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_PutGraduateDeleteLeecher1KInfohash1KSeeders(b *testing.B) {
peerStoreBenchmarker.PutGraduateDeleteLeecher1KInfohash1KLeechers(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_GraduateLeecherNonExist(b *testing.B) {
peerStoreBenchmarker.GraduateLeecherNonExist(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_GraduateLeecherNonExist1KInfohash(b *testing.B) {
peerStoreBenchmarker.GraduateLeecherNonExist1KInfohash(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_GraduateLeecherNonExist1KSeeders(b *testing.B) {
peerStoreBenchmarker.GraduateLeecherNonExist1KLeechers(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_GraduateLeecherNonExist1KInfohash1KSeeders(b *testing.B) {
peerStoreBenchmarker.GraduateLeecherNonExist1KInfohash1KLeechers(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_AnnouncePeers(b *testing.B) {
peerStoreBenchmarker.AnnouncePeers(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_AnnouncePeers1KInfohash(b *testing.B) {
peerStoreBenchmarker.AnnouncePeers1KInfohash(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_AnnouncePeersSeeder(b *testing.B) {
peerStoreBenchmarker.AnnouncePeersSeeder(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_AnnouncePeersSeeder1KInfohash(b *testing.B) {
peerStoreBenchmarker.AnnouncePeersSeeder1KInfohash(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_GetSeeders(b *testing.B) {
peerStoreBenchmarker.GetSeeders(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_GetSeeders1KInfohash(b *testing.B) {
peerStoreBenchmarker.GetSeeders1KInfohash(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_NumSeeders(b *testing.B) {
peerStoreBenchmarker.NumSeeders(b, peerStoreTestConfig)
}
func BenchmarkPeerStore_NumSeeders1KInfohash(b *testing.B) {
peerStoreBenchmarker.NumSeeders1KInfohash(b, peerStoreTestConfig)
}

View file

@ -19,11 +19,13 @@ type stringStoreDriver struct{}
func (d *stringStoreDriver) New(_ *store.DriverConfig) (store.StringStore, error) {
return &stringStore{
strings: make(map[string]struct{}),
closed: make(chan struct{}),
}, nil
}
type stringStore struct {
strings map[string]struct{}
closed chan struct{}
sync.RWMutex
}
@ -33,6 +35,12 @@ func (ss *stringStore) PutString(s string) error {
ss.Lock()
defer ss.Unlock()
select {
case <-ss.closed:
panic("attempted to interact with stopped store")
default:
}
ss.strings[s] = struct{}{}
return nil
@ -42,6 +50,12 @@ func (ss *stringStore) HasString(s string) (bool, error) {
ss.RLock()
defer ss.RUnlock()
select {
case <-ss.closed:
panic("attempted to interact with stopped store")
default:
}
_, ok := ss.strings[s]
return ok, nil
@ -51,6 +65,12 @@ func (ss *stringStore) RemoveString(s string) error {
ss.Lock()
defer ss.Unlock()
select {
case <-ss.closed:
panic("attempted to interact with stopped store")
default:
}
if _, ok := ss.strings[s]; !ok {
return store.ErrResourceDoesNotExist
}
@ -59,3 +79,15 @@ func (ss *stringStore) RemoveString(s string) error {
return nil
}
func (ss *stringStore) Stop() <-chan error {
toReturn := make(chan error)
go func() {
ss.Lock()
defer ss.Unlock()
ss.strings = make(map[string]struct{})
close(ss.closed)
close(toReturn)
}()
return toReturn
}

View file

@ -7,69 +7,95 @@ package memory
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/chihaya/chihaya/server/store"
)
var (
driver = &stringStoreDriver{}
s1 = "abc"
s2 = "def"
driver = &stringStoreDriver{}
stringStoreTester = store.PrepareStringStoreTester(driver)
stringStoreBenchmarker = store.PrepareStringStoreBenchmarker(&stringStoreDriver{})
)
func TestStringStore(t *testing.T) {
ss, err := driver.New(&store.DriverConfig{})
assert.Nil(t, err)
assert.NotNil(t, ss)
has, err := ss.HasString(s1)
assert.Nil(t, err)
assert.False(t, has)
has, err = ss.HasString(s2)
assert.Nil(t, err)
assert.False(t, has)
err = ss.RemoveString(s1)
assert.NotNil(t, err)
err = ss.PutString(s1)
assert.Nil(t, err)
has, err = ss.HasString(s1)
assert.Nil(t, err)
assert.True(t, has)
has, err = ss.HasString(s2)
assert.Nil(t, err)
assert.False(t, has)
err = ss.PutString(s1)
assert.Nil(t, err)
err = ss.PutString(s2)
assert.Nil(t, err)
has, err = ss.HasString(s1)
assert.Nil(t, err)
assert.True(t, has)
has, err = ss.HasString(s2)
assert.Nil(t, err)
assert.True(t, has)
err = ss.RemoveString(s1)
assert.Nil(t, err)
err = ss.RemoveString(s2)
assert.Nil(t, err)
has, err = ss.HasString(s1)
assert.Nil(t, err)
assert.False(t, has)
has, err = ss.HasString(s2)
assert.Nil(t, err)
assert.False(t, has)
stringStoreTester.TestStringStore(t, &store.DriverConfig{})
}
func BenchmarkStringStore_AddShort(b *testing.B) {
stringStoreBenchmarker.AddShort(b, &store.DriverConfig{})
}
func BenchmarkStringStore_AddLong(b *testing.B) {
stringStoreBenchmarker.AddLong(b, &store.DriverConfig{})
}
func BenchmarkStringStore_LookupShort(b *testing.B) {
stringStoreBenchmarker.LookupShort(b, &store.DriverConfig{})
}
func BenchmarkStringStore_LookupLong(b *testing.B) {
stringStoreBenchmarker.LookupLong(b, &store.DriverConfig{})
}
func BenchmarkStringStore_AddRemoveShort(b *testing.B) {
stringStoreBenchmarker.AddRemoveShort(b, &store.DriverConfig{})
}
func BenchmarkStringStore_AddRemoveLong(b *testing.B) {
stringStoreBenchmarker.AddRemoveLong(b, &store.DriverConfig{})
}
func BenchmarkStringStore_LookupNonExistShort(b *testing.B) {
stringStoreBenchmarker.LookupNonExistShort(b, &store.DriverConfig{})
}
func BenchmarkStringStore_LookupNonExistLong(b *testing.B) {
stringStoreBenchmarker.LookupNonExistLong(b, &store.DriverConfig{})
}
func BenchmarkStringStore_RemoveNonExistShort(b *testing.B) {
stringStoreBenchmarker.RemoveNonExistShort(b, &store.DriverConfig{})
}
func BenchmarkStringStore_RemoveNonExistLong(b *testing.B) {
stringStoreBenchmarker.RemoveNonExistLong(b, &store.DriverConfig{})
}
func BenchmarkStringStore_Add1KShort(b *testing.B) {
stringStoreBenchmarker.Add1KShort(b, &store.DriverConfig{})
}
func BenchmarkStringStore_Add1KLong(b *testing.B) {
stringStoreBenchmarker.Add1KLong(b, &store.DriverConfig{})
}
func BenchmarkStringStore_Lookup1KShort(b *testing.B) {
stringStoreBenchmarker.Lookup1KShort(b, &store.DriverConfig{})
}
func BenchmarkStringStore_Lookup1KLong(b *testing.B) {
stringStoreBenchmarker.Lookup1KLong(b, &store.DriverConfig{})
}
func BenchmarkStringStore_AddRemove1KShort(b *testing.B) {
stringStoreBenchmarker.AddRemove1KShort(b, &store.DriverConfig{})
}
func BenchmarkStringStore_AddRemove1KLong(b *testing.B) {
stringStoreBenchmarker.AddRemove1KLong(b, &store.DriverConfig{})
}
func BenchmarkStringStore_LookupNonExist1KShort(b *testing.B) {
stringStoreBenchmarker.LookupNonExist1KShort(b, &store.DriverConfig{})
}
func BenchmarkStringStore_LookupNonExist1KLong(b *testing.B) {
stringStoreBenchmarker.LookupNonExist1KLong(b, &store.DriverConfig{})
}
func BenchmarkStringStore_RemoveNonExist1KShort(b *testing.B) {
stringStoreBenchmarker.RemoveNonExist1KShort(b, &store.DriverConfig{})
}
func BenchmarkStringStore_RemoveNonExist1KLong(b *testing.B) {
stringStoreBenchmarker.RemoveNonExist1KLong(b, &store.DriverConfig{})
}

View file

@ -14,7 +14,7 @@ func init() {
tracker.RegisterAnnounceMiddleware("infohash_blacklist", blacklistAnnounceInfohash)
tracker.RegisterScrapeMiddlewareConstructor("infohash_blacklist", blacklistScrapeInfohash)
mustGetStore = func() store.StringStore {
return store.MustGetStore()
return store.MustGetStore().StringStore
}
}

View file

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/chihaya/chihaya"
"github.com/chihaya/chihaya/pkg/stopper"
"github.com/chihaya/chihaya/server/store"
"github.com/chihaya/chihaya/tracker"
)
@ -36,6 +37,10 @@ func (ss *storeMock) RemoveString(s string) error {
return nil
}
func (ss *storeMock) Stop() <-chan error {
return stopper.AlreadyStopped
}
var mock store.StringStore = &storeMock{
strings: make(map[string]struct{}),
}

View file

@ -9,6 +9,7 @@ import (
"time"
"github.com/chihaya/chihaya"
"github.com/chihaya/chihaya/pkg/stopper"
)
var peerStoreDrivers = make(map[string]PeerStoreDriver)
@ -61,6 +62,12 @@ type PeerStore interface {
NumSeeders(infoHash chihaya.InfoHash) int
// NumLeechers gets the amount of leechers for a particular infoHash.
NumLeechers(infoHash chihaya.InfoHash) int
// Stopper provides the Stop method that stops the PeerStore.
// Stop should shut down the PeerStore in a separate goroutine and send
// an error to the channel if the shutdown failed. If the shutdown
// was successful, the channel is to be closed.
stopper.Stopper
}
// PeerStoreDriver represents an interface for creating a handle to the storage

View file

@ -7,12 +7,12 @@ package store
import (
"errors"
"log"
"sync"
"time"
"gopkg.in/yaml.v2"
"github.com/chihaya/chihaya"
"github.com/chihaya/chihaya/pkg/stopper"
"github.com/chihaya/chihaya/server"
"github.com/chihaya/chihaya/tracker"
)
@ -34,29 +34,34 @@ func constructor(srvcfg *chihaya.ServerConfig, tkr *tracker.Tracker) (server.Ser
return nil, errors.New("store: invalid store config: " + err.Error())
}
theStore = &Store{
cfg: cfg,
tkr: tkr,
shutdown: make(chan struct{}),
sg: stopper.NewStopGroup(),
}
ps, err := OpenPeerStore(&cfg.PeerStore)
if err != nil {
return nil, err
}
theStore.sg.Add(ps)
ips, err := OpenIPStore(&cfg.IPStore)
if err != nil {
return nil, err
}
theStore.sg.Add(ips)
ss, err := OpenStringStore(&cfg.StringStore)
if err != nil {
return nil, err
}
theStore.sg.Add(ss)
theStore = &Store{
cfg: cfg,
tkr: tkr,
shutdown: make(chan struct{}),
PeerStore: ps,
IPStore: ips,
StringStore: ss,
}
theStore.PeerStore = ps
theStore.IPStore = ips
theStore.StringStore = ss
}
return theStore, nil
}
@ -110,7 +115,7 @@ type Store struct {
cfg *Config
tkr *tracker.Tracker
shutdown chan struct{}
wg sync.WaitGroup
sg *stopper.StopGroup
PeerStore
IPStore
@ -120,12 +125,18 @@ type Store struct {
// Start starts the store drivers and blocks until all of them exit.
func (s *Store) Start() {
<-s.shutdown
s.wg.Wait()
log.Println("Store server shut down cleanly")
}
// Stop stops the store drivers and waits for them to exit.
func (s *Store) Stop() {
errors := s.sg.Stop()
if len(errors) == 0 {
log.Println("Store server shut down cleanly")
} else {
log.Println("Store server: failed to shutdown drivers")
for _, err := range errors {
log.Println(err.Error())
}
}
close(s.shutdown)
s.wg.Wait()
}

1260
server/store/store_bench.go Normal file

File diff suppressed because it is too large Load diff

515
server/store/store_tests.go Normal file
View file

@ -0,0 +1,515 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package store
import (
"testing"
"net"
"time"
"github.com/chihaya/chihaya"
"github.com/stretchr/testify/require"
)
// StringStoreTester is a collection of tests for a StringStore driver.
// Every benchmark expects a new, clean storage. Every benchmark should be
// called with a DriverConfig that ensures this.
type StringStoreTester interface {
TestStringStore(*testing.T, *DriverConfig)
}
var _ StringStoreTester = &stringStoreTester{}
type stringStoreTester struct {
s1, s2 string
driver StringStoreDriver
}
// PrepareStringStoreTester prepares a reusable suite for StringStore driver
// tests.
func PrepareStringStoreTester(driver StringStoreDriver) StringStoreTester {
return &stringStoreTester{
s1: "abc",
s2: "def",
driver: driver,
}
}
func (s *stringStoreTester) TestStringStore(t *testing.T, cfg *DriverConfig) {
ss, err := s.driver.New(cfg)
require.Nil(t, err)
require.NotNil(t, ss)
has, err := ss.HasString(s.s1)
require.Nil(t, err)
require.False(t, has)
has, err = ss.HasString(s.s2)
require.Nil(t, err)
require.False(t, has)
err = ss.RemoveString(s.s1)
require.NotNil(t, err)
err = ss.PutString(s.s1)
require.Nil(t, err)
has, err = ss.HasString(s.s1)
require.Nil(t, err)
require.True(t, has)
has, err = ss.HasString(s.s2)
require.Nil(t, err)
require.False(t, has)
err = ss.PutString(s.s1)
require.Nil(t, err)
err = ss.PutString(s.s2)
require.Nil(t, err)
has, err = ss.HasString(s.s1)
require.Nil(t, err)
require.True(t, has)
has, err = ss.HasString(s.s2)
require.Nil(t, err)
require.True(t, has)
err = ss.RemoveString(s.s1)
require.Nil(t, err)
err = ss.RemoveString(s.s2)
require.Nil(t, err)
has, err = ss.HasString(s.s1)
require.Nil(t, err)
require.False(t, has)
has, err = ss.HasString(s.s2)
require.Nil(t, err)
require.False(t, has)
errChan := ss.Stop()
err = <-errChan
require.Nil(t, err, "StringStore shutdown must not fail")
}
// IPStoreTester is a collection of tests for an IPStore driver.
// Every benchmark expects a new, clean storage. Every benchmark should be
// called with a DriverConfig that ensures this.
type IPStoreTester interface {
TestIPStore(*testing.T, *DriverConfig)
TestHasAllHasAny(*testing.T, *DriverConfig)
TestNetworks(*testing.T, *DriverConfig)
TestHasAllHasAnyNetworks(*testing.T, *DriverConfig)
}
var _ IPStoreTester = &ipStoreTester{}
type ipStoreTester struct {
v6, v4, v4s net.IP
net1, net2 string
inNet1, inNet2 net.IP
excluded net.IP
driver IPStoreDriver
}
// PrepareIPStoreTester prepares a reusable suite for IPStore driver
// tests.
func PrepareIPStoreTester(driver IPStoreDriver) IPStoreTester {
return &ipStoreTester{
v6: net.ParseIP("0c22:384e:0:0c22:384e::68"),
v4: net.ParseIP("12.13.14.15"),
v4s: net.ParseIP("12.13.14.15").To4(),
net1: "192.168.22.255/24",
net2: "192.168.23.255/24",
inNet1: net.ParseIP("192.168.22.22"),
inNet2: net.ParseIP("192.168.23.23"),
excluded: net.ParseIP("10.154.243.22"),
driver: driver,
}
}
func (s *ipStoreTester) TestIPStore(t *testing.T, cfg *DriverConfig) {
is, err := s.driver.New(cfg)
require.Nil(t, err)
require.NotNil(t, is)
// check default state
found, err := is.HasIP(s.v4)
require.Nil(t, err)
require.False(t, found)
// check IPv4
err = is.AddIP(s.v4)
require.Nil(t, err)
found, err = is.HasIP(s.v4)
require.Nil(t, err)
require.True(t, found)
found, err = is.HasIP(s.v4s)
require.Nil(t, err)
require.True(t, found)
found, err = is.HasIP(s.v6)
require.Nil(t, err)
require.False(t, found)
// check removes
err = is.RemoveIP(s.v6)
require.NotNil(t, err)
err = is.RemoveIP(s.v4s)
require.Nil(t, err)
found, err = is.HasIP(s.v4)
require.Nil(t, err)
require.False(t, found)
// check IPv6
err = is.AddIP(s.v6)
require.Nil(t, err)
found, err = is.HasIP(s.v6)
require.Nil(t, err)
require.True(t, found)
err = is.RemoveIP(s.v6)
require.Nil(t, err)
found, err = is.HasIP(s.v6)
require.Nil(t, err)
require.False(t, found)
errChan := is.Stop()
err = <-errChan
require.Nil(t, err, "IPStore shutdown must not fail")
}
func (s *ipStoreTester) TestHasAllHasAny(t *testing.T, cfg *DriverConfig) {
is, err := s.driver.New(cfg)
require.Nil(t, err)
require.NotNil(t, is)
found, err := is.HasAnyIP(nil)
require.Nil(t, err)
require.False(t, found)
found, err = is.HasAllIPs(nil)
require.Nil(t, err)
require.True(t, found)
found, err = is.HasAllIPs([]net.IP{s.v6})
require.Nil(t, err)
require.False(t, found)
err = is.AddIP(s.v4)
require.Nil(t, err)
found, err = is.HasAnyIP([]net.IP{s.v6, s.v4})
require.Nil(t, err)
require.True(t, found)
found, err = is.HasAllIPs([]net.IP{s.v6, s.v4})
require.Nil(t, err)
require.False(t, found)
found, err = is.HasAllIPs([]net.IP{s.v4})
require.Nil(t, err)
require.True(t, found)
err = is.AddIP(s.v6)
require.Nil(t, err)
found, err = is.HasAnyIP([]net.IP{s.v6, s.v6})
require.Nil(t, err)
require.True(t, found)
found, err = is.HasAllIPs([]net.IP{s.v6, s.v6})
require.Nil(t, err)
require.True(t, found)
errChan := is.Stop()
err = <-errChan
require.Nil(t, err, "IPStore shutdown must not fail")
}
func (s *ipStoreTester) TestNetworks(t *testing.T, cfg *DriverConfig) {
is, err := s.driver.New(cfg)
require.Nil(t, err)
require.NotNil(t, is)
match, err := is.HasIP(s.inNet1)
require.Nil(t, err)
require.False(t, match)
match, err = is.HasIP(s.inNet2)
require.Nil(t, err)
require.False(t, match)
err = is.AddNetwork("")
require.NotNil(t, err)
err = is.RemoveNetwork("")
require.NotNil(t, err)
err = is.AddNetwork(s.net1)
require.Nil(t, err)
match, err = is.HasIP(s.inNet1)
require.Nil(t, err)
require.True(t, match)
match, err = is.HasIP(s.inNet2)
require.Nil(t, err)
require.False(t, match)
err = is.RemoveNetwork(s.net2)
require.NotNil(t, err)
err = is.RemoveNetwork(s.net1)
require.Nil(t, err)
match, err = is.HasIP(s.inNet1)
require.Nil(t, err)
require.False(t, match)
match, err = is.HasIP(s.inNet2)
require.Nil(t, err)
require.False(t, match)
errChan := is.Stop()
err = <-errChan
require.Nil(t, err, "IPStore shutdown must not fail")
}
func (s *ipStoreTester) TestHasAllHasAnyNetworks(t *testing.T, cfg *DriverConfig) {
is, err := s.driver.New(cfg)
require.Nil(t, err)
require.NotNil(t, s)
match, err := is.HasAnyIP([]net.IP{s.inNet1, s.inNet2, s.excluded})
require.Nil(t, err)
require.False(t, match)
match, err = is.HasAllIPs([]net.IP{s.inNet1, s.inNet2, s.excluded})
require.Nil(t, err)
require.False(t, match)
err = is.AddNetwork(s.net1)
require.Nil(t, err)
match, err = is.HasAnyIP([]net.IP{s.inNet1, s.inNet2})
require.Nil(t, err)
require.True(t, match)
match, err = is.HasAllIPs([]net.IP{s.inNet1, s.inNet2})
require.Nil(t, err)
require.False(t, match)
err = is.AddNetwork(s.net2)
require.Nil(t, err)
match, err = is.HasAnyIP([]net.IP{s.inNet1, s.inNet2, s.excluded})
require.Nil(t, err)
require.True(t, match)
match, err = is.HasAllIPs([]net.IP{s.inNet1, s.inNet2})
require.Nil(t, err)
require.True(t, match)
match, err = is.HasAllIPs([]net.IP{s.inNet1, s.inNet2, s.excluded})
require.Nil(t, err)
require.False(t, match)
err = is.RemoveNetwork(s.net1)
require.Nil(t, err)
match, err = is.HasAnyIP([]net.IP{s.inNet1, s.inNet2})
require.Nil(t, err)
require.True(t, match)
match, err = is.HasAllIPs([]net.IP{s.inNet1, s.inNet2})
require.Nil(t, err)
require.False(t, match)
err = is.RemoveNetwork(s.net2)
require.Nil(t, err)
match, err = is.HasAnyIP([]net.IP{s.inNet1, s.inNet2})
require.Nil(t, err)
require.False(t, match)
match, err = is.HasAllIPs([]net.IP{s.inNet1, s.inNet2})
require.Nil(t, err)
require.False(t, match)
errChan := is.Stop()
err = <-errChan
require.Nil(t, err, "IPStore shutdown must not fail")
}
// PeerStoreTester is a collection of tests for a PeerStore driver.
// Every benchmark expects a new, clean storage. Every benchmark should be
// called with a DriverConfig that ensures this.
type PeerStoreTester interface {
TestPeerStore(*testing.T, *DriverConfig)
}
var _ PeerStoreTester = &peerStoreTester{}
type peerStoreTester struct {
driver PeerStoreDriver
}
// PreparePeerStoreTester prepares a reusable suite for PeerStore driver
// tests.
func PreparePeerStoreTester(driver PeerStoreDriver) PeerStoreTester {
return &peerStoreTester{
driver: driver,
}
}
func peerInSlice(peer chihaya.Peer, peers []chihaya.Peer) bool {
for _, v := range peers {
if v.Equal(peer) {
return true
}
}
return false
}
func (pt *peerStoreTester) TestPeerStore(t *testing.T, cfg *DriverConfig) {
var (
hash = chihaya.InfoHash([20]byte{})
peers = []struct {
seeder bool
peerID string
ip string
port uint16
}{
{false, "-AZ3034-6wfG2wk6wWLc", "250.183.81.177", 5720},
{false, "-AZ3042-6ozMq5q6Q3NX", "38.241.13.19", 4833},
{false, "-BS5820-oy4La2MWGEFj", "fd45:7856:3dae::48", 2878},
{false, "-AR6360-6oZyyMWoOOBe", "fd0a:29a8:8445::38", 3167},
{true, "-AG2083-s1hiF8vGAAg0", "231.231.49.173", 1453},
{true, "-AG3003-lEl2Mm4NEO4n", "254.99.84.77", 7032},
{true, "-MR1100-00HS~T7*65rm", "211.229.100.129", 2614},
{true, "-LK0140-ATIV~nbEQAMr", "fdad:c435:bf79::12", 4114},
{true, "-KT2210-347143496631", "fdda:1b35:7d6e::9", 6179},
{true, "-TR0960-6ep6svaa61r4", "fd7f:78f0:4c77::55", 4727},
}
)
s, err := pt.driver.New(cfg)
require.Nil(t, err)
require.NotNil(t, s)
for _, p := range peers {
// Construct chihaya.Peer from test data.
peer := chihaya.Peer{
ID: chihaya.PeerIDFromString(p.peerID),
IP: net.ParseIP(p.ip),
Port: p.port,
}
if p.seeder {
err = s.PutSeeder(hash, peer)
} else {
err = s.PutLeecher(hash, peer)
}
require.Nil(t, err)
}
leechers1, leechers61, err := s.GetLeechers(hash)
require.Nil(t, err)
require.NotEmpty(t, leechers1)
require.NotEmpty(t, leechers61)
num := s.NumLeechers(hash)
require.Equal(t, len(leechers1)+len(leechers61), num)
seeders1, seeders61, err := s.GetSeeders(hash)
require.Nil(t, err)
require.NotEmpty(t, seeders1)
require.NotEmpty(t, seeders61)
num = s.NumSeeders(hash)
require.Equal(t, len(seeders1)+len(seeders61), num)
leechers := append(leechers1, leechers61...)
seeders := append(seeders1, seeders61...)
for _, p := range peers {
// Construct chihaya.Peer from test data.
peer := chihaya.Peer{
ID: chihaya.PeerIDFromString(p.peerID),
IP: net.ParseIP(p.ip),
Port: p.port,
}
if p.seeder {
require.True(t, peerInSlice(peer, seeders))
} else {
require.True(t, peerInSlice(peer, leechers))
}
if p.seeder {
err = s.DeleteSeeder(hash, peer)
} else {
err = s.DeleteLeecher(hash, peer)
}
require.Nil(t, err)
}
require.Zero(t, s.NumLeechers(hash))
require.Zero(t, s.NumSeeders(hash))
// Re-add all the peers to the peerStore.
for _, p := range peers {
// Construct chihaya.Peer from test data.
peer := chihaya.Peer{
ID: chihaya.PeerIDFromString(p.peerID),
IP: net.ParseIP(p.ip),
Port: p.port,
}
if p.seeder {
s.PutSeeder(hash, peer)
} else {
s.PutLeecher(hash, peer)
}
}
// Check that there are 6 seeders, and 4 leechers.
require.Equal(t, 6, s.NumSeeders(hash))
require.Equal(t, 4, s.NumLeechers(hash))
peer := chihaya.Peer{
ID: chihaya.PeerIDFromString(peers[0].peerID),
IP: net.ParseIP(peers[0].ip),
Port: peers[0].port,
}
err = s.GraduateLeecher(hash, peer)
require.Nil(t, err)
// Check that there are 7 seeders, and 3 leechers after graduating a
// leecher to a seeder.
require.Equal(t, 7, s.NumSeeders(hash))
require.Equal(t, 3, s.NumLeechers(hash))
peers1, peers61, err := s.AnnouncePeers(hash, true, 5, peer, chihaya.Peer{})
require.Nil(t, err)
require.NotNil(t, peers1)
require.NotNil(t, peers61)
err = s.CollectGarbage(time.Now())
require.Nil(t, err)
require.Equal(t, s.NumLeechers(hash), 0)
require.Equal(t, s.NumSeeders(hash), 0)
errChan := s.Stop()
err = <-errChan
require.Nil(t, err, "PeerStore shutdown must not fail")
}

View file

@ -4,7 +4,11 @@
package store
import "fmt"
import (
"fmt"
"github.com/chihaya/chihaya/pkg/stopper"
)
var stringStoreDrivers = make(map[string]StringStoreDriver)
@ -21,6 +25,12 @@ type StringStore interface {
// Returns ErrResourceDoesNotExist if the given string is not contained
// in the store.
RemoveString(s string) error
// Stopper provides the Stop method that stops the StringStore.
// Stop should shut down the StringStore in a separate goroutine and send
// an error to the channel if the shutdown failed. If the shutdown
// was successful, the channel is to be closed.
stopper.Stopper
}
// StringStoreDriver represents an interface for creating a handle to the