store: use stopper, extraxt StringStore tests
This commit is contained in:
parent
7e819dbb0c
commit
f4101f83e0
13 changed files with 465 additions and 189 deletions
|
@ -7,6 +7,8 @@ package store
|
|||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/chihaya/chihaya/pkg/stopper"
|
||||
)
|
||||
|
||||
var ipStoreDrivers = make(map[string]IPStoreDriver)
|
||||
|
@ -52,6 +54,12 @@ type IPStore interface {
|
|||
// Returns ErrResourceDoesNotExist if the given network is not
|
||||
// contained in the store.
|
||||
RemoveNetwork(network string) error
|
||||
|
||||
// Stopper provides the Stop method that stops the IPStore.
|
||||
// Stop should shut down the IPStore in a separate goroutine and send
|
||||
// an error to the channel if the shutdown failed. If the shutdown
|
||||
// was successful, the channel is to be closed.
|
||||
stopper.Stopper
|
||||
}
|
||||
|
||||
// IPStoreDriver represents an interface for creating a handle to the
|
||||
|
|
|
@ -23,6 +23,7 @@ func (d *ipStoreDriver) New(_ *store.DriverConfig) (store.IPStore, error) {
|
|||
return &ipStore{
|
||||
ips: make(map[[16]byte]struct{}),
|
||||
networks: netmatch.New(),
|
||||
closed: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -31,6 +32,7 @@ func (d *ipStoreDriver) New(_ *store.DriverConfig) (store.IPStore, error) {
|
|||
type ipStore struct {
|
||||
ips map[[16]byte]struct{}
|
||||
networks *netmatch.Trie
|
||||
closed chan struct{}
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
|
@ -65,6 +67,12 @@ func (s *ipStore) AddNetwork(network string) error {
|
|||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
return s.networks.Add(key, length)
|
||||
}
|
||||
|
||||
|
@ -72,6 +80,12 @@ func (s *ipStore) AddIP(ip net.IP) error {
|
|||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
s.ips[key(ip)] = struct{}{}
|
||||
|
||||
return nil
|
||||
|
@ -82,6 +96,12 @@ func (s *ipStore) HasIP(ip net.IP) (bool, error) {
|
|||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
_, ok := s.ips[key]
|
||||
if ok {
|
||||
return true, nil
|
||||
|
@ -99,6 +119,12 @@ func (s *ipStore) HasAnyIP(ips []net.IP) (bool, error) {
|
|||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
for _, ip := range ips {
|
||||
key := key(ip)
|
||||
if _, ok := s.ips[key]; ok {
|
||||
|
@ -121,6 +147,12 @@ func (s *ipStore) HasAllIPs(ips []net.IP) (bool, error) {
|
|||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
for _, ip := range ips {
|
||||
key := key(ip)
|
||||
if _, ok := s.ips[key]; !ok {
|
||||
|
@ -142,6 +174,12 @@ func (s *ipStore) RemoveIP(ip net.IP) error {
|
|||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
if _, ok := s.ips[key]; !ok {
|
||||
return store.ErrResourceDoesNotExist
|
||||
}
|
||||
|
@ -160,9 +198,28 @@ func (s *ipStore) RemoveNetwork(network string) error {
|
|||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
err = s.networks.Remove(key, length)
|
||||
if err != nil && err == netmatch.ErrNotContained {
|
||||
return store.ErrResourceDoesNotExist
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *ipStore) Stop() <-chan error {
|
||||
toReturn := make(chan error)
|
||||
go func() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.ips = make(map[[16]byte]struct{})
|
||||
s.networks = netmatch.New()
|
||||
close(s.closed)
|
||||
close(toReturn)
|
||||
}()
|
||||
return toReturn
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
|
||||
"github.com/chihaya/chihaya/server/store"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -31,7 +31,7 @@ func TestKey(t *testing.T) {
|
|||
|
||||
for _, tt := range table {
|
||||
got := key(tt.input)
|
||||
assert.Equal(t, got, tt.expected)
|
||||
require.Equal(t, got, tt.expected)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,100 +39,108 @@ func TestIPStore(t *testing.T) {
|
|||
var d = &ipStoreDriver{}
|
||||
|
||||
s, err := d.New(&store.DriverConfig{})
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, s)
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, s)
|
||||
|
||||
// check default state
|
||||
found, err := s.HasIP(v4)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, found)
|
||||
require.Nil(t, err)
|
||||
require.False(t, found)
|
||||
|
||||
// check IPv4
|
||||
err = s.AddIP(v4)
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
|
||||
found, err = s.HasIP(v4)
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, found)
|
||||
require.Nil(t, err)
|
||||
require.True(t, found)
|
||||
|
||||
found, err = s.HasIP(v4s)
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, found)
|
||||
require.Nil(t, err)
|
||||
require.True(t, found)
|
||||
|
||||
found, err = s.HasIP(v6)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, found)
|
||||
require.Nil(t, err)
|
||||
require.False(t, found)
|
||||
|
||||
// check removes
|
||||
err = s.RemoveIP(v6)
|
||||
assert.NotNil(t, err)
|
||||
require.NotNil(t, err)
|
||||
|
||||
err = s.RemoveIP(v4s)
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
|
||||
found, err = s.HasIP(v4)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, found)
|
||||
require.Nil(t, err)
|
||||
require.False(t, found)
|
||||
|
||||
// check IPv6
|
||||
err = s.AddIP(v6)
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
|
||||
found, err = s.HasIP(v6)
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, found)
|
||||
require.Nil(t, err)
|
||||
require.True(t, found)
|
||||
|
||||
err = s.RemoveIP(v6)
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
|
||||
found, err = s.HasIP(v6)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, found)
|
||||
require.Nil(t, err)
|
||||
require.False(t, found)
|
||||
|
||||
errChan := s.Stop()
|
||||
err = <-errChan
|
||||
require.Nil(t, err, "IPStore shutdown must not fail")
|
||||
}
|
||||
|
||||
func TestHasAllHasAny(t *testing.T) {
|
||||
var d = &ipStoreDriver{}
|
||||
s, err := d.New(&store.DriverConfig{})
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, s)
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, s)
|
||||
|
||||
found, err := s.HasAnyIP(nil)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, found)
|
||||
require.Nil(t, err)
|
||||
require.False(t, found)
|
||||
|
||||
found, err = s.HasAllIPs(nil)
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, found)
|
||||
require.Nil(t, err)
|
||||
require.True(t, found)
|
||||
|
||||
found, err = s.HasAllIPs([]net.IP{v4})
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, found)
|
||||
require.Nil(t, err)
|
||||
require.False(t, found)
|
||||
|
||||
err = s.AddIP(v4)
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
|
||||
found, err = s.HasAnyIP([]net.IP{v4, v6})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, found)
|
||||
require.Nil(t, err)
|
||||
require.True(t, found)
|
||||
|
||||
found, err = s.HasAllIPs([]net.IP{v4, v6})
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, found)
|
||||
require.Nil(t, err)
|
||||
require.False(t, found)
|
||||
|
||||
found, err = s.HasAllIPs([]net.IP{v4})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, found)
|
||||
require.Nil(t, err)
|
||||
require.True(t, found)
|
||||
|
||||
err = s.AddIP(v6)
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
|
||||
found, err = s.HasAnyIP([]net.IP{v4, v6})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, found)
|
||||
require.Nil(t, err)
|
||||
require.True(t, found)
|
||||
|
||||
found, err = s.HasAllIPs([]net.IP{v4, v6})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, found)
|
||||
require.Nil(t, err)
|
||||
require.True(t, found)
|
||||
|
||||
errChan := s.Stop()
|
||||
err = <-errChan
|
||||
require.Nil(t, err, "IPStore shutdown must not fail")
|
||||
}
|
||||
|
||||
func TestNetworks(t *testing.T) {
|
||||
|
@ -145,46 +153,51 @@ func TestNetworks(t *testing.T) {
|
|||
)
|
||||
|
||||
s, err := d.New(&store.DriverConfig{})
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, s)
|
||||
|
||||
match, err := s.HasIP(includedIP)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, match)
|
||||
require.Nil(t, err)
|
||||
require.False(t, match)
|
||||
|
||||
match, err = s.HasIP(excludedIP)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, match)
|
||||
require.Nil(t, err)
|
||||
require.False(t, match)
|
||||
|
||||
err = s.AddNetwork("")
|
||||
assert.NotNil(t, err)
|
||||
require.NotNil(t, err)
|
||||
|
||||
err = s.RemoveNetwork("")
|
||||
assert.NotNil(t, err)
|
||||
require.NotNil(t, err)
|
||||
|
||||
err = s.AddNetwork(net1)
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
|
||||
match, err = s.HasIP(includedIP)
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, match)
|
||||
require.Nil(t, err)
|
||||
require.True(t, match)
|
||||
|
||||
match, err = s.HasIP(excludedIP)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, match)
|
||||
require.Nil(t, err)
|
||||
require.False(t, match)
|
||||
|
||||
err = s.RemoveNetwork(net2)
|
||||
assert.NotNil(t, err)
|
||||
require.NotNil(t, err)
|
||||
|
||||
err = s.RemoveNetwork(net1)
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
|
||||
match, err = s.HasIP(includedIP)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, match)
|
||||
require.Nil(t, err)
|
||||
require.False(t, match)
|
||||
|
||||
match, err = s.HasIP(excludedIP)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, match)
|
||||
require.Nil(t, err)
|
||||
require.False(t, match)
|
||||
|
||||
errChan := s.Stop()
|
||||
err = <-errChan
|
||||
require.Nil(t, err, "IPStore shutdown must not fail")
|
||||
}
|
||||
|
||||
func TestHasAllHasAnyNetworks(t *testing.T) {
|
||||
|
@ -197,61 +210,66 @@ func TestHasAllHasAnyNetworks(t *testing.T) {
|
|||
excluded = net.ParseIP("10.154.243.22")
|
||||
)
|
||||
s, err := d.New(&store.DriverConfig{})
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, s)
|
||||
|
||||
match, err := s.HasAnyIP([]net.IP{inNet1, inNet2, excluded})
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, match)
|
||||
require.Nil(t, err)
|
||||
require.False(t, match)
|
||||
|
||||
match, err = s.HasAllIPs([]net.IP{inNet1, inNet2, excluded})
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, match)
|
||||
require.Nil(t, err)
|
||||
require.False(t, match)
|
||||
|
||||
err = s.AddNetwork(net1)
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
|
||||
match, err = s.HasAnyIP([]net.IP{inNet1, inNet2})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, match)
|
||||
require.Nil(t, err)
|
||||
require.True(t, match)
|
||||
|
||||
match, err = s.HasAllIPs([]net.IP{inNet1, inNet2})
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, match)
|
||||
require.Nil(t, err)
|
||||
require.False(t, match)
|
||||
|
||||
err = s.AddNetwork(net2)
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
|
||||
match, err = s.HasAnyIP([]net.IP{inNet1, inNet2, excluded})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, match)
|
||||
require.Nil(t, err)
|
||||
require.True(t, match)
|
||||
|
||||
match, err = s.HasAllIPs([]net.IP{inNet1, inNet2})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, match)
|
||||
require.Nil(t, err)
|
||||
require.True(t, match)
|
||||
|
||||
match, err = s.HasAllIPs([]net.IP{inNet1, inNet2, excluded})
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, match)
|
||||
require.Nil(t, err)
|
||||
require.False(t, match)
|
||||
|
||||
err = s.RemoveNetwork(net1)
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
|
||||
match, err = s.HasAnyIP([]net.IP{inNet1, inNet2})
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, match)
|
||||
require.Nil(t, err)
|
||||
require.True(t, match)
|
||||
|
||||
match, err = s.HasAllIPs([]net.IP{inNet1, inNet2})
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, match)
|
||||
require.Nil(t, err)
|
||||
require.False(t, match)
|
||||
|
||||
err = s.RemoveNetwork(net2)
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
|
||||
match, err = s.HasAnyIP([]net.IP{inNet1, inNet2})
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, match)
|
||||
require.Nil(t, err)
|
||||
require.False(t, match)
|
||||
|
||||
match, err = s.HasAllIPs([]net.IP{inNet1, inNet2})
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, match)
|
||||
require.Nil(t, err)
|
||||
require.False(t, match)
|
||||
|
||||
errChan := s.Stop()
|
||||
err = <-errChan
|
||||
require.Nil(t, err, "IPStore shutdown must not fail")
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ func (d *peerStoreDriver) New(storecfg *store.DriverConfig) (store.PeerStore, er
|
|||
}
|
||||
return &peerStore{
|
||||
shards: shards,
|
||||
closed: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -72,6 +73,7 @@ type peerShard struct {
|
|||
|
||||
type peerStore struct {
|
||||
shards []*peerShard
|
||||
closed chan struct{}
|
||||
}
|
||||
|
||||
var _ store.PeerStore = &peerStore{}
|
||||
|
@ -100,6 +102,12 @@ func (s *peerStore) PutSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error {
|
|||
shard.Lock()
|
||||
defer shard.Unlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
if shard.peers[key] == nil {
|
||||
shard.peers[key] = make(map[string]peer)
|
||||
}
|
||||
|
@ -118,6 +126,12 @@ func (s *peerStore) DeleteSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) erro
|
|||
shard.Lock()
|
||||
defer shard.Unlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
if shard.peers[key] == nil {
|
||||
return store.ErrResourceDoesNotExist
|
||||
}
|
||||
|
@ -143,6 +157,12 @@ func (s *peerStore) PutLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error
|
|||
shard.Lock()
|
||||
defer shard.Unlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
if shard.peers[key] == nil {
|
||||
shard.peers[key] = make(map[string]peer)
|
||||
}
|
||||
|
@ -161,6 +181,12 @@ func (s *peerStore) DeleteLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) err
|
|||
shard.Lock()
|
||||
defer shard.Unlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
if shard.peers[key] == nil {
|
||||
return store.ErrResourceDoesNotExist
|
||||
}
|
||||
|
@ -187,6 +213,12 @@ func (s *peerStore) GraduateLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) e
|
|||
shard.Lock()
|
||||
defer shard.Unlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
if shard.peers[lkey] != nil {
|
||||
delete(shard.peers[lkey], peerKey(p))
|
||||
}
|
||||
|
@ -243,6 +275,12 @@ func (s *peerStore) AnnouncePeers(infoHash chihaya.InfoHash, seeder bool, numWan
|
|||
shard.RLock()
|
||||
defer shard.RUnlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
if seeder {
|
||||
// Append leechers as possible.
|
||||
leechers := shard.peers[lkey]
|
||||
|
@ -307,6 +345,12 @@ func (s *peerStore) GetSeeders(infoHash chihaya.InfoHash) (peers, peers6 []chiha
|
|||
shard.RLock()
|
||||
defer shard.RUnlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
seeders := shard.peers[key]
|
||||
for _, p := range seeders {
|
||||
if p.IP.To4() == nil {
|
||||
|
@ -324,6 +368,12 @@ func (s *peerStore) GetLeechers(infoHash chihaya.InfoHash) (peers, peers6 []chih
|
|||
shard.RLock()
|
||||
defer shard.RUnlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
leechers := shard.peers[key]
|
||||
for _, p := range leechers {
|
||||
if p.IP.To4() == nil {
|
||||
|
@ -341,6 +391,12 @@ func (s *peerStore) NumSeeders(infoHash chihaya.InfoHash) int {
|
|||
shard.RLock()
|
||||
defer shard.RUnlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
return len(shard.peers[key])
|
||||
}
|
||||
|
||||
|
@ -350,5 +406,33 @@ func (s *peerStore) NumLeechers(infoHash chihaya.InfoHash) int {
|
|||
shard.RLock()
|
||||
defer shard.RUnlock()
|
||||
|
||||
select {
|
||||
case <-s.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
return len(shard.peers[key])
|
||||
}
|
||||
|
||||
func (s *peerStore) Stop() <-chan error {
|
||||
toReturn := make(chan error)
|
||||
go func() {
|
||||
oldshards := s.shards
|
||||
for _, shard := range oldshards {
|
||||
shard.Lock()
|
||||
}
|
||||
shards := make([]*peerShard, len(oldshards))
|
||||
for i := 0; i < len(oldshards); i++ {
|
||||
shards[i] = &peerShard{}
|
||||
shards[i].peers = make(map[string]map[string]peer)
|
||||
}
|
||||
s.shards = shards
|
||||
close(s.closed)
|
||||
for _, shard := range oldshards {
|
||||
shard.Unlock()
|
||||
}
|
||||
close(toReturn)
|
||||
}()
|
||||
return toReturn
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
|
||||
"github.com/chihaya/chihaya"
|
||||
"github.com/chihaya/chihaya/server/store"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func peerInSlice(peer chihaya.Peer, peers []chihaya.Peer) bool {
|
||||
|
@ -56,8 +56,8 @@ func TestPeerStoreAPI(t *testing.T) {
|
|||
d = &peerStoreDriver{}
|
||||
)
|
||||
s, err := d.New(&config)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, s)
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, s)
|
||||
|
||||
for _, p := range peers {
|
||||
// Construct chihaya.Peer from test data.
|
||||
|
@ -72,22 +72,22 @@ func TestPeerStoreAPI(t *testing.T) {
|
|||
} else {
|
||||
err = s.PutLeecher(hash, peer)
|
||||
}
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
leechers1, leechers61, err := s.GetLeechers(hash)
|
||||
assert.Nil(t, err)
|
||||
assert.NotEmpty(t, leechers1)
|
||||
assert.NotEmpty(t, leechers61)
|
||||
require.Nil(t, err)
|
||||
require.NotEmpty(t, leechers1)
|
||||
require.NotEmpty(t, leechers61)
|
||||
num := s.NumLeechers(hash)
|
||||
assert.Equal(t, len(leechers1)+len(leechers61), num)
|
||||
require.Equal(t, len(leechers1)+len(leechers61), num)
|
||||
|
||||
seeders1, seeders61, err := s.GetSeeders(hash)
|
||||
assert.Nil(t, err)
|
||||
assert.NotEmpty(t, seeders1)
|
||||
assert.NotEmpty(t, seeders61)
|
||||
require.Nil(t, err)
|
||||
require.NotEmpty(t, seeders1)
|
||||
require.NotEmpty(t, seeders61)
|
||||
num = s.NumSeeders(hash)
|
||||
assert.Equal(t, len(seeders1)+len(seeders61), num)
|
||||
require.Equal(t, len(seeders1)+len(seeders61), num)
|
||||
|
||||
leechers := append(leechers1, leechers61...)
|
||||
seeders := append(seeders1, seeders61...)
|
||||
|
@ -101,9 +101,9 @@ func TestPeerStoreAPI(t *testing.T) {
|
|||
}
|
||||
|
||||
if p.seeder {
|
||||
assert.True(t, peerInSlice(peer, seeders))
|
||||
require.True(t, peerInSlice(peer, seeders))
|
||||
} else {
|
||||
assert.True(t, peerInSlice(peer, leechers))
|
||||
require.True(t, peerInSlice(peer, leechers))
|
||||
}
|
||||
|
||||
if p.seeder {
|
||||
|
@ -111,11 +111,11 @@ func TestPeerStoreAPI(t *testing.T) {
|
|||
} else {
|
||||
err = s.DeleteLeecher(hash, peer)
|
||||
}
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
assert.Zero(t, s.NumLeechers(hash))
|
||||
assert.Zero(t, s.NumSeeders(hash))
|
||||
require.Zero(t, s.NumLeechers(hash))
|
||||
require.Zero(t, s.NumSeeders(hash))
|
||||
|
||||
// Re-add all the peers to the peerStore.
|
||||
for _, p := range peers {
|
||||
|
@ -133,27 +133,31 @@ func TestPeerStoreAPI(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check that there are 6 seeders, and 4 leechers.
|
||||
assert.Equal(t, 6, s.NumSeeders(hash))
|
||||
assert.Equal(t, 4, s.NumLeechers(hash))
|
||||
require.Equal(t, 6, s.NumSeeders(hash))
|
||||
require.Equal(t, 4, s.NumLeechers(hash))
|
||||
peer := chihaya.Peer{
|
||||
ID: chihaya.PeerIDFromString(peers[0].peerID),
|
||||
IP: net.ParseIP(peers[0].ip),
|
||||
Port: peers[0].port,
|
||||
}
|
||||
err = s.GraduateLeecher(hash, peer)
|
||||
assert.Nil(t, err)
|
||||
require.Nil(t, err)
|
||||
// Check that there are 7 seeders, and 3 leechers after graduating a
|
||||
// leecher to a seeder.
|
||||
assert.Equal(t, 7, s.NumSeeders(hash))
|
||||
assert.Equal(t, 3, s.NumLeechers(hash))
|
||||
require.Equal(t, 7, s.NumSeeders(hash))
|
||||
require.Equal(t, 3, s.NumLeechers(hash))
|
||||
|
||||
peers1, peers61, err := s.AnnouncePeers(hash, true, 5, peer, chihaya.Peer{})
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, peers1)
|
||||
assert.NotNil(t, peers61)
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, peers1)
|
||||
require.NotNil(t, peers61)
|
||||
|
||||
err = s.CollectGarbage(time.Now())
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, s.NumLeechers(hash), 0)
|
||||
assert.Equal(t, s.NumSeeders(hash), 0)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, s.NumLeechers(hash), 0)
|
||||
require.Equal(t, s.NumSeeders(hash), 0)
|
||||
|
||||
errChan := s.Stop()
|
||||
err = <-errChan
|
||||
require.Nil(t, err, "PeerStore shutdown must not fail")
|
||||
}
|
||||
|
|
|
@ -19,11 +19,13 @@ type stringStoreDriver struct{}
|
|||
func (d *stringStoreDriver) New(_ *store.DriverConfig) (store.StringStore, error) {
|
||||
return &stringStore{
|
||||
strings: make(map[string]struct{}),
|
||||
closed: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
type stringStore struct {
|
||||
strings map[string]struct{}
|
||||
closed chan struct{}
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
|
@ -33,6 +35,12 @@ func (ss *stringStore) PutString(s string) error {
|
|||
ss.Lock()
|
||||
defer ss.Unlock()
|
||||
|
||||
select {
|
||||
case <-ss.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
ss.strings[s] = struct{}{}
|
||||
|
||||
return nil
|
||||
|
@ -42,6 +50,12 @@ func (ss *stringStore) HasString(s string) (bool, error) {
|
|||
ss.RLock()
|
||||
defer ss.RUnlock()
|
||||
|
||||
select {
|
||||
case <-ss.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
_, ok := ss.strings[s]
|
||||
|
||||
return ok, nil
|
||||
|
@ -51,6 +65,12 @@ func (ss *stringStore) RemoveString(s string) error {
|
|||
ss.Lock()
|
||||
defer ss.Unlock()
|
||||
|
||||
select {
|
||||
case <-ss.closed:
|
||||
panic("attempted to interact with stopped store")
|
||||
default:
|
||||
}
|
||||
|
||||
if _, ok := ss.strings[s]; !ok {
|
||||
return store.ErrResourceDoesNotExist
|
||||
}
|
||||
|
@ -59,3 +79,15 @@ func (ss *stringStore) RemoveString(s string) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ss *stringStore) Stop() <-chan error {
|
||||
toReturn := make(chan error)
|
||||
go func() {
|
||||
ss.Lock()
|
||||
defer ss.Unlock()
|
||||
ss.strings = make(map[string]struct{})
|
||||
close(ss.closed)
|
||||
close(toReturn)
|
||||
}()
|
||||
return toReturn
|
||||
}
|
||||
|
|
|
@ -7,69 +7,14 @@ package memory
|
|||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/chihaya/chihaya/server/store"
|
||||
)
|
||||
|
||||
var (
|
||||
driver = &stringStoreDriver{}
|
||||
s1 = "abc"
|
||||
s2 = "def"
|
||||
driver = &stringStoreDriver{}
|
||||
stringStoreTester = store.PrepareStringStoreTester(driver)
|
||||
)
|
||||
|
||||
func TestStringStore(t *testing.T) {
|
||||
ss, err := driver.New(&store.DriverConfig{})
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, ss)
|
||||
|
||||
has, err := ss.HasString(s1)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, has)
|
||||
|
||||
has, err = ss.HasString(s2)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, has)
|
||||
|
||||
err = ss.RemoveString(s1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
err = ss.PutString(s1)
|
||||
assert.Nil(t, err)
|
||||
|
||||
has, err = ss.HasString(s1)
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, has)
|
||||
|
||||
has, err = ss.HasString(s2)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, has)
|
||||
|
||||
err = ss.PutString(s1)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = ss.PutString(s2)
|
||||
assert.Nil(t, err)
|
||||
|
||||
has, err = ss.HasString(s1)
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, has)
|
||||
|
||||
has, err = ss.HasString(s2)
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, has)
|
||||
|
||||
err = ss.RemoveString(s1)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = ss.RemoveString(s2)
|
||||
assert.Nil(t, err)
|
||||
|
||||
has, err = ss.HasString(s1)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, has)
|
||||
|
||||
has, err = ss.HasString(s2)
|
||||
assert.Nil(t, err)
|
||||
assert.False(t, has)
|
||||
stringStoreTester.TestStringStore(t, &store.DriverConfig{})
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ func init() {
|
|||
tracker.RegisterAnnounceMiddleware("infohash_blacklist", blacklistAnnounceInfohash)
|
||||
tracker.RegisterScrapeMiddlewareConstructor("infohash_blacklist", blacklistScrapeInfohash)
|
||||
mustGetStore = func() store.StringStore {
|
||||
return store.MustGetStore()
|
||||
return store.MustGetStore().StringStore
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/chihaya/chihaya"
|
||||
"github.com/chihaya/chihaya/pkg/stopper"
|
||||
"github.com/chihaya/chihaya/server/store"
|
||||
"github.com/chihaya/chihaya/tracker"
|
||||
)
|
||||
|
@ -36,6 +37,10 @@ func (ss *storeMock) RemoveString(s string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ss *storeMock) Stop() <-chan error {
|
||||
return stopper.AlreadyStopped
|
||||
}
|
||||
|
||||
var mock store.StringStore = &storeMock{
|
||||
strings: make(map[string]struct{}),
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/chihaya/chihaya"
|
||||
"github.com/chihaya/chihaya/pkg/stopper"
|
||||
)
|
||||
|
||||
var peerStoreDrivers = make(map[string]PeerStoreDriver)
|
||||
|
@ -61,6 +62,12 @@ type PeerStore interface {
|
|||
NumSeeders(infoHash chihaya.InfoHash) int
|
||||
// NumLeechers gets the amount of leechers for a particular infoHash.
|
||||
NumLeechers(infoHash chihaya.InfoHash) int
|
||||
|
||||
// Stopper provides the Stop method that stops the PeerStore.
|
||||
// Stop should shut down the PeerStore in a separate goroutine and send
|
||||
// an error to the channel if the shutdown failed. If the shutdown
|
||||
// was successful, the channel is to be closed.
|
||||
stopper.Stopper
|
||||
}
|
||||
|
||||
// PeerStoreDriver represents an interface for creating a handle to the storage
|
||||
|
|
|
@ -7,12 +7,12 @@ package store
|
|||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gopkg.in/yaml.v2"
|
||||
|
||||
"github.com/chihaya/chihaya"
|
||||
"github.com/chihaya/chihaya/pkg/stopper"
|
||||
"github.com/chihaya/chihaya/server"
|
||||
"github.com/chihaya/chihaya/tracker"
|
||||
)
|
||||
|
@ -34,29 +34,34 @@ func constructor(srvcfg *chihaya.ServerConfig, tkr *tracker.Tracker) (server.Ser
|
|||
return nil, errors.New("store: invalid store config: " + err.Error())
|
||||
}
|
||||
|
||||
theStore = &Store{
|
||||
cfg: cfg,
|
||||
tkr: tkr,
|
||||
shutdown: make(chan struct{}),
|
||||
sg: stopper.NewStopGroup(),
|
||||
}
|
||||
|
||||
ps, err := OpenPeerStore(&cfg.PeerStore)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
theStore.sg.Add(ps)
|
||||
|
||||
ips, err := OpenIPStore(&cfg.IPStore)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
theStore.sg.Add(ips)
|
||||
|
||||
ss, err := OpenStringStore(&cfg.StringStore)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
theStore.sg.Add(ss)
|
||||
|
||||
theStore = &Store{
|
||||
cfg: cfg,
|
||||
tkr: tkr,
|
||||
shutdown: make(chan struct{}),
|
||||
PeerStore: ps,
|
||||
IPStore: ips,
|
||||
StringStore: ss,
|
||||
}
|
||||
theStore.PeerStore = ps
|
||||
theStore.IPStore = ips
|
||||
theStore.StringStore = ss
|
||||
}
|
||||
return theStore, nil
|
||||
}
|
||||
|
@ -110,7 +115,7 @@ type Store struct {
|
|||
cfg *Config
|
||||
tkr *tracker.Tracker
|
||||
shutdown chan struct{}
|
||||
wg sync.WaitGroup
|
||||
sg *stopper.StopGroup
|
||||
|
||||
PeerStore
|
||||
IPStore
|
||||
|
@ -120,12 +125,18 @@ type Store struct {
|
|||
// Start starts the store drivers and blocks until all of them exit.
|
||||
func (s *Store) Start() {
|
||||
<-s.shutdown
|
||||
s.wg.Wait()
|
||||
log.Println("Store server shut down cleanly")
|
||||
}
|
||||
|
||||
// Stop stops the store drivers and waits for them to exit.
|
||||
func (s *Store) Stop() {
|
||||
errors := s.sg.Stop()
|
||||
if len(errors) == 0 {
|
||||
log.Println("Store server shut down cleanly")
|
||||
} else {
|
||||
log.Println("Store server: failed to shutdown drivers")
|
||||
for _, err := range errors {
|
||||
log.Println(err.Error())
|
||||
}
|
||||
}
|
||||
close(s.shutdown)
|
||||
s.wg.Wait()
|
||||
}
|
||||
|
|
95
server/store/store_tests.go
Normal file
95
server/store/store_tests.go
Normal file
|
@ -0,0 +1,95 @@
|
|||
// Copyright 2016 The Chihaya Authors. All rights reserved.
|
||||
// Use of this source code is governed by the BSD 2-Clause license,
|
||||
// which can be found in the LICENSE file.
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// StringStoreTester is a collection of tests for a StringStore driver.
|
||||
// Every benchmark expects a new, clean storage. Every benchmark should be
|
||||
// called with a DriverConfig that ensures this.
|
||||
type StringStoreTester interface {
|
||||
TestStringStore(*testing.T, *DriverConfig)
|
||||
}
|
||||
|
||||
var _ StringStoreTester = &stringStoreTester{}
|
||||
|
||||
type stringStoreTester struct {
|
||||
s1, s2 string
|
||||
driver StringStoreDriver
|
||||
}
|
||||
|
||||
// PrepareStringStoreTester prepares a reusable suite for StringStore driver
|
||||
// tests.
|
||||
func PrepareStringStoreTester(driver StringStoreDriver) StringStoreTester {
|
||||
return &stringStoreTester{
|
||||
s1: "abc",
|
||||
s2: "def",
|
||||
driver: driver,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *stringStoreTester) TestStringStore(t *testing.T, cfg *DriverConfig) {
|
||||
ss, err := s.driver.New(cfg)
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, ss)
|
||||
|
||||
has, err := ss.HasString(s.s1)
|
||||
require.Nil(t, err)
|
||||
require.False(t, has)
|
||||
|
||||
has, err = ss.HasString(s.s2)
|
||||
require.Nil(t, err)
|
||||
require.False(t, has)
|
||||
|
||||
err = ss.RemoveString(s.s1)
|
||||
require.NotNil(t, err)
|
||||
|
||||
err = ss.PutString(s.s1)
|
||||
require.Nil(t, err)
|
||||
|
||||
has, err = ss.HasString(s.s1)
|
||||
require.Nil(t, err)
|
||||
require.True(t, has)
|
||||
|
||||
has, err = ss.HasString(s.s2)
|
||||
require.Nil(t, err)
|
||||
require.False(t, has)
|
||||
|
||||
err = ss.PutString(s.s1)
|
||||
require.Nil(t, err)
|
||||
|
||||
err = ss.PutString(s.s2)
|
||||
require.Nil(t, err)
|
||||
|
||||
has, err = ss.HasString(s.s1)
|
||||
require.Nil(t, err)
|
||||
require.True(t, has)
|
||||
|
||||
has, err = ss.HasString(s.s2)
|
||||
require.Nil(t, err)
|
||||
require.True(t, has)
|
||||
|
||||
err = ss.RemoveString(s.s1)
|
||||
require.Nil(t, err)
|
||||
|
||||
err = ss.RemoveString(s.s2)
|
||||
require.Nil(t, err)
|
||||
|
||||
has, err = ss.HasString(s.s1)
|
||||
require.Nil(t, err)
|
||||
require.False(t, has)
|
||||
|
||||
has, err = ss.HasString(s.s2)
|
||||
require.Nil(t, err)
|
||||
require.False(t, has)
|
||||
|
||||
errChan := ss.Stop()
|
||||
err = <-errChan
|
||||
require.Nil(t, err, "StringStore shutdown must not fail")
|
||||
}
|
|
@ -4,7 +4,11 @@
|
|||
|
||||
package store
|
||||
|
||||
import "fmt"
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/chihaya/chihaya/pkg/stopper"
|
||||
)
|
||||
|
||||
var stringStoreDrivers = make(map[string]StringStoreDriver)
|
||||
|
||||
|
@ -21,6 +25,12 @@ type StringStore interface {
|
|||
// Returns ErrResourceDoesNotExist if the given string is not contained
|
||||
// in the store.
|
||||
RemoveString(s string) error
|
||||
|
||||
// Stopper provides the Stop method that stops the StringStore.
|
||||
// Stop should shut down the StringStore in a separate goroutine and send
|
||||
// an error to the channel if the shutdown failed. If the shutdown
|
||||
// was successful, the channel is to be closed.
|
||||
stopper.Stopper
|
||||
}
|
||||
|
||||
// StringStoreDriver represents an interface for creating a handle to the
|
||||
|
|
Loading…
Add table
Reference in a new issue