diff --git a/Gopkg.lock b/Gopkg.lock index f813090..224395f 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -152,7 +152,7 @@ "stopOnce", "util" ] - revision = "a656ad8a1456310d6ca712098826d02950e46e0b" + revision = "2a6ea528bdd66de4f3c707304e26f69dcf003909" [[projects]] branch = "master" @@ -172,6 +172,12 @@ packages = ["."] revision = "e2103e2c35297fb7e17febb81e49b312087a2372" +[[projects]] + branch = "master" + name = "github.com/sebdah/goldie" + packages = ["."] + revision = "8784dd1ab561dcf43d141f6678e9e41f3d0dff55" + [[projects]] branch = "master" name = "github.com/sirupsen/logrus" @@ -244,6 +250,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "b3d2f1578da4f4fca9a15d04df16cdb7f76f83c7916bab11fe1e50f65334a2c8" + inputs-digest = "bfa7ee41b88f515ef386012a9dd28e045f9108de93c1dcfa66d9361340cf7eec" solver-name = "gps-cdcl" solver-version = 1 diff --git a/cluster/cluster.go b/cluster/cluster.go index 9b8faf0..100e4c2 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -3,13 +3,84 @@ package cluster import ( "io/ioutil" baselog "log" + "os" + "os/signal" + "strconv" + "sync" + "syscall" + "github.com/davecgh/go-spew/spew" + "github.com/lbryio/lbry.go/crypto" "github.com/lbryio/lbry.go/errors" + "github.com/lbryio/reflector.go/cluster" "github.com/hashicorp/serf/serf" log "github.com/sirupsen/logrus" ) +type Cluster struct { + s *serf.Serf + eventCh <-chan serf.Event +} + +func New() { + c := &Cluster{} + var err error + + nodeName := crypto.RandString(12) + clusterAddr := "127.0.0.1:" + strconv.Itoa(clusterPort) + if args[0] == clusterStart { + c.s, c.eventCh, err = cluster.Connect(nodeName, clusterAddr, clusterPort) + } else { + c.s, c.eventCh, err = cluster.Connect(nodeName, clusterAddr, clusterPort+1+int(crypto.RandInt64(1000))) + } + if err != nil { + log.Fatal(err) + } + defer c.Leave() + + shutdownCh := make(chan struct{}) + var shutdownWg sync.WaitGroup + + shutdownWg.Add(1) + go func() { + defer shutdownWg.Done() + for { + select { + case event := <-eventCh: + spew.Dump(event) + switch event.EventType() { + case serf.EventMemberJoin, serf.EventMemberFailed, serf.EventMemberLeave: + memberEvent := event.(serf.MemberEvent) + if event.EventType() == serf.EventMemberJoin && len(memberEvent.Members) == 1 && memberEvent.Members[0].Name == nodeName { + // ignore event from my own joining of the cluster + } else { + //spew.Dump(c.Members()) + alive := getAliveMembers(c.Members()) + log.Printf("%s: my hash range is now %d of %d\n", nodeName, getHashRangeStart(nodeName, alive), len(alive)) + // figure out my new hash range based on the start and the number of alive members + // get hashes in that range that need announcing + // announce them + // if more than one node is announcing each hash, figure out how to deal with last_announced_at so both nodes dont announce the same thing at the same time + } + } + case <-shutdownCh: + log.Debugln("shutting down event dumper") + return + } + } + }() + + interruptChan := make(chan os.Signal, 1) + signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) + <-interruptChan + log.Debugln("received interrupt") + close(shutdownCh) + log.Debugln("waiting for threads to finish") + shutdownWg.Wait() + log.Debugln("shutting down main thread") +} + func Connect(nodeName, addr string, port int) (*serf.Serf, <-chan serf.Event, error) { conf := serf.DefaultConfig() conf.MemberlistConfig.BindPort = port diff --git a/cmd/upload.go b/cmd/upload.go index ad0197d..7c937c8 100644 --- a/cmd/upload.go +++ b/cmd/upload.go @@ -98,7 +98,7 @@ func uploadCmd(cmd *cobra.Command, args []string) { for { select { - case <-stopper.Chan(): + case <-stopper.Ch(): return case filename, ok := <-filenameChan: if !ok { @@ -113,7 +113,7 @@ func uploadCmd(cmd *cobra.Command, args []string) { log.Errorf("worker %d: filename does not match hash (%s != %s), skipping", i, filename, hash) select { case countChan <- errInc: - case <-stopper.Chan(): + case <-stopper.Ch(): } continue } @@ -123,14 +123,14 @@ func uploadCmd(cmd *cobra.Command, args []string) { blobStore.PutSD(hash, blob) select { case countChan <- sdInc: - case <-stopper.Chan(): + case <-stopper.Ch(): } } else { log.Printf("worker %d: putting %s", i, hash) blobStore.Put(hash, blob) select { case countChan <- blobInc: - case <-stopper.Chan(): + case <-stopper.Ch(): } } } @@ -143,7 +143,7 @@ func uploadCmd(cmd *cobra.Command, args []string) { defer counterWG.Done() for { select { - case <-stopper.Chan(): + case <-stopper.Ch(): return case countType, ok := <-countChan: if !ok { @@ -172,7 +172,7 @@ Upload: select { case filenameChan <- filename: - case <-stopper.Chan(): + case <-stopper.Ch(): log.Warnln("Caught interrupt, quitting at first opportunity...") break Upload } diff --git a/dht/bitmap.go b/dht/bitmap.go index 53774d9..aa09166 100644 --- a/dht/bitmap.go +++ b/dht/bitmap.go @@ -10,6 +10,8 @@ import ( "github.com/lyoshenka/bencode" ) +// TODO: http://roaringbitmap.org/ + type Bitmap [nodeIDLength]byte func (b Bitmap) RawString() string { diff --git a/dht/bootstrap.go b/dht/bootstrap.go index 898431a..34f431d 100644 --- a/dht/bootstrap.go +++ b/dht/bootstrap.go @@ -62,7 +62,7 @@ func (b *BootstrapNode) Connect(conn UDPConn) error { select { case <-t.C: b.check() - case <-b.stop.Chan(): + case <-b.stop.Ch(): return } } @@ -130,7 +130,7 @@ func (b *BootstrapNode) ping(c Contact) { select { case res = <-resCh: - case <-b.stop.Chan(): + case <-b.stop.Ch(): cancel() return } diff --git a/dht/dht.go b/dht/dht.go index afd9491..67615f4 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -81,8 +81,6 @@ type DHT struct { node *Node // stopper to shut down DHT stop *stopOnce.Stopper - // wait group for all the things that need to be stopped when DHT shuts down - stopWG *sync.WaitGroup // channel is closed when DHT joins network joined chan struct{} // lock for announced list @@ -107,7 +105,6 @@ func New(config *Config) (*DHT, error) { contact: contact, node: NewNode(contact.ID), stop: stopOnce.New(), - stopWG: &sync.WaitGroup{}, joined: make(chan struct{}), lock: &sync.RWMutex{}, announced: make(map[Bitmap]bool), @@ -143,6 +140,9 @@ func (dht *DHT) join() { if err != nil { log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error()) } + + // TODO: after joining, refresh all the buckets all buckets further away than our closest neighbor + // http://xlattice.sourceforge.net/components/protocol/kademlia/specs.html#join } // Start starts the dht @@ -176,7 +176,7 @@ func (dht *DHT) WaitUntilJoined() { func (dht *DHT) Shutdown() { log.Debugf("[%s] DHT shutting down", dht.node.id.HexShort()) dht.stop.Stop() - dht.stopWG.Wait() + dht.stop.Wait() dht.node.Shutdown() log.Debugf("[%s] DHT stopped", dht.node.id.HexShort()) } @@ -244,12 +244,12 @@ func (dht *DHT) Announce(hash Bitmap) error { } func (dht *DHT) startReannouncer() { - dht.stopWG.Add(1) - defer dht.stopWG.Done() + dht.stop.Add(1) + defer dht.stop.Done() tick := time.NewTicker(tReannounce) for { select { - case <-dht.stop.Chan(): + case <-dht.stop.Ch(): return case <-tick.C: dht.lock.RLock() @@ -262,8 +262,8 @@ func (dht *DHT) startReannouncer() { } func (dht *DHT) storeOnNode(hash Bitmap, c Contact) { - dht.stopWG.Add(1) - defer dht.stopWG.Done() + dht.stop.Add(1) + defer dht.stop.Done() // self-store if dht.contact.Equals(c) { @@ -280,7 +280,7 @@ func (dht *DHT) storeOnNode(hash Bitmap, c Contact) { select { case res = <-resCh: - case <-dht.stop.Chan(): + case <-dht.stop.Ch(): cancel() return } @@ -304,7 +304,7 @@ func (dht *DHT) storeOnNode(hash Bitmap, c Contact) { go func() { select { case <-resCh: - case <-dht.stop.Chan(): + case <-dht.stop.Ch(): cancel() } }() diff --git a/dht/message.go b/dht/message.go index 7191bbe..bf43c97 100644 --- a/dht/message.go +++ b/dht/message.go @@ -154,8 +154,8 @@ type storeArgsValue struct { type storeArgs struct { BlobHash Bitmap Value storeArgsValue - NodeID Bitmap - SelfStore bool // this is an int on the wire + NodeID Bitmap // original publisher id? I think this is getting fixed in the new dht stuff + SelfStore bool // this is an int on the wire } func (s storeArgs) MarshalBencode() ([]byte, error) { diff --git a/dht/node.go b/dht/node.go index 1ad8fd8..f60578a 100644 --- a/dht/node.go +++ b/dht/node.go @@ -87,7 +87,7 @@ func (n *Node) Connect(conn UDPConn) error { // dht.PrintState() // select { // case <-t.C: - // case <-dht.stop.Chan(): + // case <-dht.stop.Ch(): // return // } // } @@ -106,7 +106,7 @@ func (n *Node) Connect(conn UDPConn) error { for { select { - case <-n.stop.Chan(): + case <-n.stop.Ch(): return default: } @@ -128,7 +128,7 @@ func (n *Node) Connect(conn UDPConn) error { select { // needs select here because packet consumer can quit and the packets channel gets filled up and blocks case packets <- packet{data: data, raddr: raddr}: - case <-n.stop.Chan(): + case <-n.stop.Ch(): } } }() @@ -143,7 +143,7 @@ func (n *Node) Connect(conn UDPConn) error { select { case pkt = <-packets: n.handlePacket(pkt) - case <-n.stop.Chan(): + case <-n.stop.Ch(): return } } @@ -434,8 +434,8 @@ func (n *Node) startRoutingTableGrooming() { for { select { case <-refreshTicker.C: - RoutingTableRefresh(n, tRefresh, n.stop.Chan()) - case <-n.stop.Chan(): + RoutingTableRefresh(n, tRefresh, n.stop.Ch()) + case <-n.stop.Ch(): return } } diff --git a/dht/node_finder.go b/dht/node_finder.go index 3ecdfe5..af8910e 100644 --- a/dht/node_finder.go +++ b/dht/node_finder.go @@ -19,8 +19,7 @@ type contactFinder struct { target Bitmap node *Node - done *stopOnce.Stopper - doneWG *sync.WaitGroup + stop *stopOnce.Stopper findValueMutex *sync.Mutex findValueResult []Contact @@ -50,15 +49,14 @@ func newContactFinder(node *Node, target Bitmap, findValue bool) *contactFinder activeContactsMutex: &sync.Mutex{}, shortlistMutex: &sync.Mutex{}, shortlistAdded: make(map[Bitmap]bool), - done: stopOnce.New(), - doneWG: &sync.WaitGroup{}, + stop: stopOnce.New(), outstandingRequestsMutex: &sync.RWMutex{}, } } func (cf *contactFinder) Cancel() { - cf.done.Stop() - cf.doneWG.Wait() + cf.stop.Stop() + cf.stop.Wait() } func (cf *contactFinder) Find() (findNodeResponse, error) { @@ -73,14 +71,14 @@ func (cf *contactFinder) Find() (findNodeResponse, error) { } for i := 0; i < alpha; i++ { - cf.doneWG.Add(1) + cf.stop.Add(1) go func(i int) { - defer cf.doneWG.Done() + defer cf.stop.Done() cf.iterationWorker(i + 1) }(i) } - cf.doneWG.Wait() + cf.stop.Wait() // TODO: what to do if we have less than K active contacts, shortlist is empty, but we // TODO: have other contacts in our routing table whom we have not contacted. prolly contact them @@ -131,7 +129,7 @@ func (cf *contactFinder) iterationWorker(num int) { resCh, cancel := cf.node.SendCancelable(contact, req) select { case res = <-resCh: - case <-cf.done.Chan(): + case <-cf.stop.Ch(): log.Debugf("[%s] worker %d: canceled", cf.node.id.HexShort(), num) cancel() return @@ -145,7 +143,7 @@ func (cf *contactFinder) iterationWorker(num int) { cf.findValueMutex.Lock() cf.findValueResult = res.Contacts cf.findValueMutex.Unlock() - cf.done.Stop() + cf.stop.Stop() return } else { log.Debugf("[%s] worker %d: got contacts", cf.node.id.HexShort(), num) @@ -158,7 +156,7 @@ func (cf *contactFinder) iterationWorker(num int) { if cf.isSearchFinished() { log.Debugf("[%s] worker %d: search is finished", cf.node.id.HexShort(), num) - cf.done.Stop() + cf.stop.Stop() return } } @@ -214,7 +212,7 @@ func (cf *contactFinder) isSearchFinished() bool { } select { - case <-cf.done.Chan(): + case <-cf.stop.Ch(): return true default: } diff --git a/dht/token_manager.go b/dht/token_manager.go index db9782f..ee1d856 100644 --- a/dht/token_manager.go +++ b/dht/token_manager.go @@ -37,7 +37,7 @@ func (tm *tokenManager) Start(interval time.Duration) { select { case <-tick.C: tm.rotateSecret() - case <-tm.done.Chan(): + case <-tm.done.Ch(): return } }