diff --git a/main.go b/main.go index 217e1cc..5ebf656 100644 --- a/main.go +++ b/main.go @@ -43,7 +43,7 @@ func main() { return } - conn, err := grpc.Dial("localhost:"+string(args.Port), + conn, err := grpc.Dial("localhost:"+fmt.Sprintf("%d", args.Port), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) diff --git a/server/jsonrpc_federation.go b/server/jsonrpc_federation.go index 9060406..3e91489 100644 --- a/server/jsonrpc_federation.go +++ b/server/jsonrpc_federation.go @@ -1,14 +1,16 @@ package server import ( - "context" + "errors" - pb "github.com/lbryio/herald.go/protobuf/go" log "github.com/sirupsen/logrus" ) type PeersService struct { Server *Server + // needed for subscribe/unsubscribe + sessionMgr *sessionManager + session *session } type PeersSubscribeReq struct { @@ -22,23 +24,16 @@ type PeersSubscribeResp string // Features is the json rpc endpoint for 'server.peers.subcribe'. func (t *PeersService) Subscribe(req *PeersSubscribeReq, res **PeersSubscribeResp) error { log.Println("PeersSubscribe") - ctx := context.Background() - var port = "50001" + // var port = "50001" // FIXME: Get the actual port from the request details - msg := &pb.ServerMessage{ - Address: req.Ip, - Port: port, - } - - peers, err := t.Server.PeerSubscribe(ctx, msg) - if err != nil { - log.Println(err) + if t.sessionMgr == nil || t.session == nil { *res = nil - return err + return errors.New("no session, rpc not supported") } + t.sessionMgr.peersSubscribe(t.session, true /*subscribe*/) - *res = (*PeersSubscribeResp)(&peers.Value) + *res = nil return nil } diff --git a/server/session.go b/server/session.go index cd0a8a3..20f598f 100644 --- a/server/session.go +++ b/server/session.go @@ -313,6 +313,18 @@ func (sm *sessionManager) broadcastTx(rawTx []byte) (*chainhash.Hash, error) { return nil, nil } +func (sm *sessionManager) peersSubscribe(sess *session, subscribe bool) { + sm.sessionsMut.Lock() + defer sm.sessionsMut.Unlock() + if subscribe { + sm.peerSubs[sess.id] = sess + sess.peersSub = true + return + } + delete(sm.peerSubs, sess.id) + sess.peersSub = false +} + func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bool) { sm.sessionsMut.Lock() defer sm.sessionsMut.Unlock()