Merge pull request #185 from mrd0ll4r/memory-optimization
memory: optimize peer store memory footprint and performance
This commit is contained in:
commit
6a47110c76
1 changed files with 177 additions and 154 deletions
|
@ -5,7 +5,9 @@
|
||||||
package memory
|
package memory
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"hash/fnv"
|
"encoding/binary"
|
||||||
|
"log"
|
||||||
|
"net"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -31,7 +33,7 @@ func (d *peerStoreDriver) New(storecfg *store.DriverConfig) (store.PeerStore, er
|
||||||
shards := make([]*peerShard, cfg.Shards)
|
shards := make([]*peerShard, cfg.Shards)
|
||||||
for i := 0; i < cfg.Shards; i++ {
|
for i := 0; i < cfg.Shards; i++ {
|
||||||
shards[i] = &peerShard{}
|
shards[i] = &peerShard{}
|
||||||
shards[i].peers = make(map[string]map[string]peer)
|
shards[i].swarms = make(map[chihaya.InfoHash]*swarm)
|
||||||
}
|
}
|
||||||
return &peerStore{
|
return &peerStore{
|
||||||
shards: shards,
|
shards: shards,
|
||||||
|
@ -61,16 +63,19 @@ func newPeerStoreConfig(storecfg *store.DriverConfig) (*peerStoreConfig, error)
|
||||||
return &cfg, nil
|
return &cfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type peer struct {
|
type serializedPeer string
|
||||||
chihaya.Peer
|
|
||||||
LastAction time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
type peerShard struct {
|
type peerShard struct {
|
||||||
peers map[string]map[string]peer
|
swarms map[chihaya.InfoHash]*swarm
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type swarm struct {
|
||||||
|
// map serialized peer to mtime
|
||||||
|
seeders map[serializedPeer]int64
|
||||||
|
leechers map[serializedPeer]int64
|
||||||
|
}
|
||||||
|
|
||||||
type peerStore struct {
|
type peerStore struct {
|
||||||
shards []*peerShard
|
shards []*peerShard
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
|
@ -79,183 +84,193 @@ type peerStore struct {
|
||||||
var _ store.PeerStore = &peerStore{}
|
var _ store.PeerStore = &peerStore{}
|
||||||
|
|
||||||
func (s *peerStore) shardIndex(infoHash chihaya.InfoHash) uint32 {
|
func (s *peerStore) shardIndex(infoHash chihaya.InfoHash) uint32 {
|
||||||
idx := fnv.New32()
|
return binary.BigEndian.Uint32(infoHash[:4]) % uint32(len(s.shards))
|
||||||
idx.Write(infoHash[:])
|
|
||||||
return idx.Sum32() % uint32(len(s.shards))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func peerKey(p chihaya.Peer) string {
|
func peerKey(p chihaya.Peer) serializedPeer {
|
||||||
return string(p.IP) + string(p.ID[:])
|
b := make([]byte, 20+2+len(p.IP))
|
||||||
|
copy(b[:20], p.ID[:])
|
||||||
|
binary.BigEndian.PutUint16(b[20:22], p.Port)
|
||||||
|
copy(b[22:], p.IP)
|
||||||
|
|
||||||
|
return serializedPeer(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
func seedersKey(infoHash chihaya.InfoHash) string {
|
func decodePeerKey(pk serializedPeer) chihaya.Peer {
|
||||||
return string(infoHash[:]) + "-s"
|
return chihaya.Peer{
|
||||||
}
|
ID: chihaya.PeerIDFromString(string(pk[:20])),
|
||||||
|
Port: binary.BigEndian.Uint16([]byte(pk[20:22])),
|
||||||
func leechersKey(infoHash chihaya.InfoHash) string {
|
IP: net.IP(pk[22:]),
|
||||||
return string(infoHash[:]) + "-l"
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *peerStore) PutSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error {
|
func (s *peerStore) PutSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error {
|
||||||
key := seedersKey(infoHash)
|
|
||||||
shard := s.shards[s.shardIndex(infoHash)]
|
|
||||||
shard.Lock()
|
|
||||||
defer shard.Unlock()
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.closed:
|
case <-s.closed:
|
||||||
panic("attempted to interact with stopped store")
|
panic("attempted to interact with stopped store")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
if shard.peers[key] == nil {
|
shard := s.shards[s.shardIndex(infoHash)]
|
||||||
shard.peers[key] = make(map[string]peer)
|
shard.Lock()
|
||||||
|
defer shard.Unlock()
|
||||||
|
|
||||||
|
if shard.swarms[infoHash] == nil {
|
||||||
|
shard.swarms[infoHash] = &swarm{
|
||||||
|
seeders: make(map[serializedPeer]int64),
|
||||||
|
leechers: make(map[serializedPeer]int64),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
shard.peers[key][peerKey(p)] = peer{
|
shard.swarms[infoHash].seeders[peerKey(p)] = time.Now().UnixNano()
|
||||||
Peer: p,
|
|
||||||
LastAction: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *peerStore) DeleteSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error {
|
func (s *peerStore) DeleteSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error {
|
||||||
key := seedersKey(infoHash)
|
|
||||||
shard := s.shards[s.shardIndex(infoHash)]
|
|
||||||
shard.Lock()
|
|
||||||
defer shard.Unlock()
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.closed:
|
case <-s.closed:
|
||||||
panic("attempted to interact with stopped store")
|
panic("attempted to interact with stopped store")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
if shard.peers[key] == nil {
|
shard := s.shards[s.shardIndex(infoHash)]
|
||||||
return store.ErrResourceDoesNotExist
|
|
||||||
}
|
|
||||||
|
|
||||||
pk := peerKey(p)
|
pk := peerKey(p)
|
||||||
|
shard.Lock()
|
||||||
|
defer shard.Unlock()
|
||||||
|
|
||||||
if _, ok := shard.peers[key][pk]; !ok {
|
if shard.swarms[infoHash] == nil {
|
||||||
return store.ErrResourceDoesNotExist
|
return store.ErrResourceDoesNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(shard.peers[key], pk)
|
if _, ok := shard.swarms[infoHash].seeders[pk]; !ok {
|
||||||
|
return store.ErrResourceDoesNotExist
|
||||||
|
}
|
||||||
|
|
||||||
if len(shard.peers[key]) == 0 {
|
delete(shard.swarms[infoHash].seeders, pk)
|
||||||
delete(shard.peers, key)
|
|
||||||
|
if len(shard.swarms[infoHash].seeders)|len(shard.swarms[infoHash].leechers) == 0 {
|
||||||
|
delete(shard.swarms, infoHash)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *peerStore) PutLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error {
|
func (s *peerStore) PutLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error {
|
||||||
key := leechersKey(infoHash)
|
|
||||||
shard := s.shards[s.shardIndex(infoHash)]
|
|
||||||
shard.Lock()
|
|
||||||
defer shard.Unlock()
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.closed:
|
case <-s.closed:
|
||||||
panic("attempted to interact with stopped store")
|
panic("attempted to interact with stopped store")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
if shard.peers[key] == nil {
|
shard := s.shards[s.shardIndex(infoHash)]
|
||||||
shard.peers[key] = make(map[string]peer)
|
shard.Lock()
|
||||||
|
defer shard.Unlock()
|
||||||
|
|
||||||
|
if shard.swarms[infoHash] == nil {
|
||||||
|
shard.swarms[infoHash] = &swarm{
|
||||||
|
seeders: make(map[serializedPeer]int64),
|
||||||
|
leechers: make(map[serializedPeer]int64),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
shard.peers[key][peerKey(p)] = peer{
|
shard.swarms[infoHash].leechers[peerKey(p)] = time.Now().UnixNano()
|
||||||
Peer: p,
|
|
||||||
LastAction: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *peerStore) DeleteLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error {
|
func (s *peerStore) DeleteLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error {
|
||||||
key := leechersKey(infoHash)
|
|
||||||
shard := s.shards[s.shardIndex(infoHash)]
|
|
||||||
shard.Lock()
|
|
||||||
defer shard.Unlock()
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.closed:
|
case <-s.closed:
|
||||||
panic("attempted to interact with stopped store")
|
panic("attempted to interact with stopped store")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
if shard.peers[key] == nil {
|
shard := s.shards[s.shardIndex(infoHash)]
|
||||||
return store.ErrResourceDoesNotExist
|
|
||||||
}
|
|
||||||
|
|
||||||
pk := peerKey(p)
|
pk := peerKey(p)
|
||||||
|
shard.Lock()
|
||||||
|
defer shard.Unlock()
|
||||||
|
|
||||||
if _, ok := shard.peers[key][pk]; !ok {
|
if shard.swarms[infoHash] == nil {
|
||||||
return store.ErrResourceDoesNotExist
|
return store.ErrResourceDoesNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(shard.peers[key], pk)
|
if _, ok := shard.swarms[infoHash].leechers[pk]; !ok {
|
||||||
|
return store.ErrResourceDoesNotExist
|
||||||
|
}
|
||||||
|
|
||||||
if len(shard.peers[key]) == 0 {
|
delete(shard.swarms[infoHash].leechers, pk)
|
||||||
delete(shard.peers, key)
|
|
||||||
|
if len(shard.swarms[infoHash].seeders)|len(shard.swarms[infoHash].leechers) == 0 {
|
||||||
|
delete(shard.swarms, infoHash)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *peerStore) GraduateLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error {
|
func (s *peerStore) GraduateLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error {
|
||||||
lkey := leechersKey(infoHash)
|
|
||||||
skey := seedersKey(infoHash)
|
|
||||||
shard := s.shards[s.shardIndex(infoHash)]
|
|
||||||
shard.Lock()
|
|
||||||
defer shard.Unlock()
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.closed:
|
case <-s.closed:
|
||||||
panic("attempted to interact with stopped store")
|
panic("attempted to interact with stopped store")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
if shard.peers[lkey] != nil {
|
key := peerKey(p)
|
||||||
delete(shard.peers[lkey], peerKey(p))
|
shard := s.shards[s.shardIndex(infoHash)]
|
||||||
|
shard.Lock()
|
||||||
|
defer shard.Unlock()
|
||||||
|
|
||||||
|
if shard.swarms[infoHash] == nil {
|
||||||
|
shard.swarms[infoHash] = &swarm{
|
||||||
|
seeders: make(map[serializedPeer]int64),
|
||||||
|
leechers: make(map[serializedPeer]int64),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if shard.peers[skey] == nil {
|
if _, ok := shard.swarms[infoHash].leechers[key]; ok {
|
||||||
shard.peers[skey] = make(map[string]peer)
|
delete(shard.swarms[infoHash].leechers, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
shard.peers[skey][peerKey(p)] = peer{
|
shard.swarms[infoHash].seeders[key] = time.Now().UnixNano()
|
||||||
Peer: p,
|
|
||||||
LastAction: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *peerStore) CollectGarbage(cutoff time.Time) error {
|
func (s *peerStore) CollectGarbage(cutoff time.Time) error {
|
||||||
|
select {
|
||||||
|
case <-s.closed:
|
||||||
|
panic("attempted to interact with stopped store")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("memory: collecting garbage. Cutoff time: %s", cutoff.String())
|
||||||
|
cutoffUnix := cutoff.UnixNano()
|
||||||
for _, shard := range s.shards {
|
for _, shard := range s.shards {
|
||||||
shard.RLock()
|
shard.RLock()
|
||||||
var keys []string
|
var infohashes []chihaya.InfoHash
|
||||||
for key := range shard.peers {
|
for key := range shard.swarms {
|
||||||
keys = append(keys, key)
|
infohashes = append(infohashes, key)
|
||||||
}
|
}
|
||||||
shard.RUnlock()
|
shard.RUnlock()
|
||||||
runtime.Gosched()
|
runtime.Gosched()
|
||||||
|
|
||||||
for _, key := range keys {
|
for _, infohash := range infohashes {
|
||||||
shard.Lock()
|
shard.Lock()
|
||||||
|
|
||||||
for peerKey, p := range shard.peers[key] {
|
for peerKey, mtime := range shard.swarms[infohash].leechers {
|
||||||
if p.LastAction.Before(cutoff) {
|
if mtime <= cutoffUnix {
|
||||||
delete(shard.peers[key], peerKey)
|
delete(shard.swarms[infohash].leechers, peerKey)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(shard.peers[key]) == 0 {
|
for peerKey, mtime := range shard.swarms[infohash].seeders {
|
||||||
delete(shard.peers, key)
|
if mtime <= cutoffUnix {
|
||||||
|
delete(shard.swarms[infohash].seeders, peerKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(shard.swarms[infohash].seeders)|len(shard.swarms[infohash].leechers) == 0 {
|
||||||
|
delete(shard.swarms, infohash)
|
||||||
}
|
}
|
||||||
|
|
||||||
shard.Unlock()
|
shard.Unlock()
|
||||||
|
@ -269,67 +284,68 @@ func (s *peerStore) CollectGarbage(cutoff time.Time) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *peerStore) AnnouncePeers(infoHash chihaya.InfoHash, seeder bool, numWant int, peer4, peer6 chihaya.Peer) (peers, peers6 []chihaya.Peer, err error) {
|
func (s *peerStore) AnnouncePeers(infoHash chihaya.InfoHash, seeder bool, numWant int, peer4, peer6 chihaya.Peer) (peers, peers6 []chihaya.Peer, err error) {
|
||||||
lkey := leechersKey(infoHash)
|
|
||||||
skey := seedersKey(infoHash)
|
|
||||||
shard := s.shards[s.shardIndex(infoHash)]
|
|
||||||
shard.RLock()
|
|
||||||
defer shard.RUnlock()
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.closed:
|
case <-s.closed:
|
||||||
panic("attempted to interact with stopped store")
|
panic("attempted to interact with stopped store")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
shard := s.shards[s.shardIndex(infoHash)]
|
||||||
|
shard.RLock()
|
||||||
|
defer shard.RUnlock()
|
||||||
|
|
||||||
if seeder {
|
if seeder {
|
||||||
// Append leechers as possible.
|
// Append leechers as possible.
|
||||||
leechers := shard.peers[lkey]
|
leechers := shard.swarms[infoHash].leechers
|
||||||
for _, p := range leechers {
|
for p := range leechers {
|
||||||
|
decodedPeer := decodePeerKey(p)
|
||||||
if numWant == 0 {
|
if numWant == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.IP.To4() == nil {
|
if decodedPeer.IP.To4() == nil {
|
||||||
peers6 = append(peers6, p.Peer)
|
peers6 = append(peers6, decodedPeer)
|
||||||
} else {
|
} else {
|
||||||
peers = append(peers, p.Peer)
|
peers = append(peers, decodedPeer)
|
||||||
}
|
}
|
||||||
numWant--
|
numWant--
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Append as many seeders as possible.
|
// Append as many seeders as possible.
|
||||||
seeders := shard.peers[skey]
|
seeders := shard.swarms[infoHash].seeders
|
||||||
for _, p := range seeders {
|
for p := range seeders {
|
||||||
|
decodedPeer := decodePeerKey(p)
|
||||||
if numWant == 0 {
|
if numWant == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.IP.To4() == nil {
|
if decodedPeer.IP.To4() == nil {
|
||||||
peers6 = append(peers6, p.Peer)
|
peers6 = append(peers6, decodedPeer)
|
||||||
} else {
|
} else {
|
||||||
peers = append(peers, p.Peer)
|
peers = append(peers, decodedPeer)
|
||||||
}
|
}
|
||||||
numWant--
|
numWant--
|
||||||
}
|
}
|
||||||
|
|
||||||
// Append leechers until we reach numWant.
|
// Append leechers until we reach numWant.
|
||||||
leechers := shard.peers[lkey]
|
leechers := shard.swarms[infoHash].leechers
|
||||||
if numWant > 0 {
|
if numWant > 0 {
|
||||||
for _, p := range leechers {
|
for p := range leechers {
|
||||||
|
decodedPeer := decodePeerKey(p)
|
||||||
if numWant == 0 {
|
if numWant == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.IP.To4() == nil {
|
if decodedPeer.IP.To4() == nil {
|
||||||
if p.Equal(peer6) {
|
if decodedPeer.Equal(peer6) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
peers6 = append(peers6, p.Peer)
|
peers6 = append(peers6, decodedPeer)
|
||||||
} else {
|
} else {
|
||||||
if p.Equal(peer4) {
|
if decodedPeer.Equal(peer4) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
peers = append(peers, p.Peer)
|
peers = append(peers, decodedPeer)
|
||||||
}
|
}
|
||||||
numWant--
|
numWant--
|
||||||
}
|
}
|
||||||
|
@ -340,98 +356,105 @@ func (s *peerStore) AnnouncePeers(infoHash chihaya.InfoHash, seeder bool, numWan
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *peerStore) GetSeeders(infoHash chihaya.InfoHash) (peers, peers6 []chihaya.Peer, err error) {
|
func (s *peerStore) GetSeeders(infoHash chihaya.InfoHash) (peers, peers6 []chihaya.Peer, err error) {
|
||||||
key := seedersKey(infoHash)
|
|
||||||
shard := s.shards[s.shardIndex(infoHash)]
|
|
||||||
shard.RLock()
|
|
||||||
defer shard.RUnlock()
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.closed:
|
case <-s.closed:
|
||||||
panic("attempted to interact with stopped store")
|
panic("attempted to interact with stopped store")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
seeders := shard.peers[key]
|
shard := s.shards[s.shardIndex(infoHash)]
|
||||||
for _, p := range seeders {
|
shard.RLock()
|
||||||
if p.IP.To4() == nil {
|
defer shard.RUnlock()
|
||||||
peers6 = append(peers6, p.Peer)
|
|
||||||
|
if shard.swarms[infoHash] == nil {
|
||||||
|
return nil, nil, store.ErrResourceDoesNotExist
|
||||||
|
}
|
||||||
|
|
||||||
|
seeders := shard.swarms[infoHash].seeders
|
||||||
|
for p := range seeders {
|
||||||
|
decodedPeer := decodePeerKey(p)
|
||||||
|
if decodedPeer.IP.To4() == nil {
|
||||||
|
peers6 = append(peers6, decodedPeer)
|
||||||
} else {
|
} else {
|
||||||
peers = append(peers, p.Peer)
|
peers = append(peers, decodedPeer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *peerStore) GetLeechers(infoHash chihaya.InfoHash) (peers, peers6 []chihaya.Peer, err error) {
|
func (s *peerStore) GetLeechers(infoHash chihaya.InfoHash) (peers, peers6 []chihaya.Peer, err error) {
|
||||||
key := leechersKey(infoHash)
|
|
||||||
shard := s.shards[s.shardIndex(infoHash)]
|
|
||||||
shard.RLock()
|
|
||||||
defer shard.RUnlock()
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.closed:
|
case <-s.closed:
|
||||||
panic("attempted to interact with stopped store")
|
panic("attempted to interact with stopped store")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
leechers := shard.peers[key]
|
shard := s.shards[s.shardIndex(infoHash)]
|
||||||
for _, p := range leechers {
|
shard.RLock()
|
||||||
if p.IP.To4() == nil {
|
defer shard.RUnlock()
|
||||||
peers6 = append(peers6, p.Peer)
|
|
||||||
|
if shard.swarms[infoHash] == nil {
|
||||||
|
return nil, nil, store.ErrResourceDoesNotExist
|
||||||
|
}
|
||||||
|
|
||||||
|
seeders := shard.swarms[infoHash].leechers
|
||||||
|
for p := range seeders {
|
||||||
|
decodedPeer := decodePeerKey(p)
|
||||||
|
if decodedPeer.IP.To4() == nil {
|
||||||
|
peers6 = append(peers6, decodedPeer)
|
||||||
} else {
|
} else {
|
||||||
peers = append(peers, p.Peer)
|
peers = append(peers, decodedPeer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *peerStore) NumSeeders(infoHash chihaya.InfoHash) int {
|
func (s *peerStore) NumSeeders(infoHash chihaya.InfoHash) int {
|
||||||
key := seedersKey(infoHash)
|
|
||||||
shard := s.shards[s.shardIndex(infoHash)]
|
|
||||||
shard.RLock()
|
|
||||||
defer shard.RUnlock()
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.closed:
|
case <-s.closed:
|
||||||
panic("attempted to interact with stopped store")
|
panic("attempted to interact with stopped store")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
return len(shard.peers[key])
|
shard := s.shards[s.shardIndex(infoHash)]
|
||||||
|
shard.RLock()
|
||||||
|
defer shard.RUnlock()
|
||||||
|
|
||||||
|
if shard.swarms[infoHash] == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return len(shard.swarms[infoHash].seeders)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *peerStore) NumLeechers(infoHash chihaya.InfoHash) int {
|
func (s *peerStore) NumLeechers(infoHash chihaya.InfoHash) int {
|
||||||
key := leechersKey(infoHash)
|
|
||||||
shard := s.shards[s.shardIndex(infoHash)]
|
|
||||||
shard.RLock()
|
|
||||||
defer shard.RUnlock()
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.closed:
|
case <-s.closed:
|
||||||
panic("attempted to interact with stopped store")
|
panic("attempted to interact with stopped store")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
return len(shard.peers[key])
|
shard := s.shards[s.shardIndex(infoHash)]
|
||||||
|
shard.RLock()
|
||||||
|
defer shard.RUnlock()
|
||||||
|
|
||||||
|
if shard.swarms[infoHash] == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return len(shard.swarms[infoHash].leechers)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *peerStore) Stop() <-chan error {
|
func (s *peerStore) Stop() <-chan error {
|
||||||
toReturn := make(chan error)
|
toReturn := make(chan error)
|
||||||
go func() {
|
go func() {
|
||||||
oldshards := s.shards
|
shards := make([]*peerShard, len(s.shards))
|
||||||
for _, shard := range oldshards {
|
for i := 0; i < len(s.shards); i++ {
|
||||||
shard.Lock()
|
|
||||||
}
|
|
||||||
shards := make([]*peerShard, len(oldshards))
|
|
||||||
for i := 0; i < len(oldshards); i++ {
|
|
||||||
shards[i] = &peerShard{}
|
shards[i] = &peerShard{}
|
||||||
shards[i].peers = make(map[string]map[string]peer)
|
shards[i].swarms = make(map[chihaya.InfoHash]*swarm)
|
||||||
}
|
}
|
||||||
s.shards = shards
|
s.shards = shards
|
||||||
close(s.closed)
|
close(s.closed)
|
||||||
for _, shard := range oldshards {
|
|
||||||
shard.Unlock()
|
|
||||||
}
|
|
||||||
close(toReturn)
|
close(toReturn)
|
||||||
}()
|
}()
|
||||||
return toReturn
|
return toReturn
|
||||||
|
|
Loading…
Reference in a new issue