rpcserver: Refactor listener logic to server.

This refactors the RPC server to accept and take ownership of already
configured listeners and refactors the logic to setup those listeners to
the server.  This mirrors the logic used by the connection manager and
is desirable since it is another step closer to being able to split the
RPC server code out into a separate package and will make it much easier
to internally test since it allows creating mock listeners.
This commit is contained in:
Dave Collins 2017-08-13 18:58:58 -05:00
parent ff700325ac
commit c5c46376ba
No known key found for this signature in database
GPG key ID: B8904D9D9C93D1F2
2 changed files with 78 additions and 66 deletions

View file

@ -9,7 +9,6 @@ import (
"bytes"
"crypto/sha256"
"crypto/subtle"
"crypto/tls"
"encoding/base64"
"encoding/hex"
"encoding/json"
@ -3542,7 +3541,6 @@ type rpcServer struct {
statusLines map[int]string
statusLock sync.RWMutex
wg sync.WaitGroup
listeners []net.Listener
gbtWorkState *gbtWorkState
helpCacher *helpCacher
requestProcessShutdown chan struct{}
@ -3611,7 +3609,7 @@ func (s *rpcServer) Stop() error {
return nil
}
rpcsLog.Warnf("RPC server shutting down")
for _, listener := range s.listeners {
for _, listener := range s.cfg.Listeners {
err := listener.Close()
if err != nil {
rpcsLog.Errorf("Problem shutting down rpc: %v", err)
@ -4006,7 +4004,7 @@ func (s *rpcServer) Start() {
s.WebsocketHandler(ws, r.RemoteAddr, authenticated, isAdmin)
})
for _, listener := range s.listeners {
for _, listener := range s.cfg.Listeners {
s.wg.Add(1)
go func(listener net.Listener) {
rpcsLog.Infof("RPC server listening on %s", listener.Addr())
@ -4154,8 +4152,11 @@ type rpcserverSyncManager interface {
// rpcserverConfig is a descriptor containing the RPC server configuration.
type rpcserverConfig struct {
// ListenAddrs are the addresses the RPC server should listen on.
ListenAddrs []string
// Listeners defines a slice of listeners for which the RPC server will
// take ownership of and accept connections. Since the RPC server takes
// ownership of these listeners, they will be closed when the RPC server
// is stopped.
Listeners []net.Listener
// StartupTime is the unix timestamp for when the server that is hosting
// the RPC server started.
@ -4215,65 +4216,6 @@ func newRPCServer(config *rpcserverConfig) (*rpcServer, error) {
rpc.limitauthsha = sha256.Sum256([]byte(auth))
}
rpc.ntfnMgr = newWsNotificationManager(&rpc)
// Setup TLS if not disabled.
listenFunc := net.Listen
if !cfg.DisableTLS {
// Generate the TLS cert and key file if both don't already
// exist.
if !fileExists(cfg.RPCKey) && !fileExists(cfg.RPCCert) {
err := genCertPair(cfg.RPCCert, cfg.RPCKey)
if err != nil {
return nil, err
}
}
keypair, err := tls.LoadX509KeyPair(cfg.RPCCert, cfg.RPCKey)
if err != nil {
return nil, err
}
tlsConfig := tls.Config{
Certificates: []tls.Certificate{keypair},
MinVersion: tls.VersionTLS12,
}
// Change the standard net.Listen function to the tls one.
listenFunc = func(net string, laddr string) (net.Listener, error) {
return tls.Listen(net, laddr, &tlsConfig)
}
}
// TODO: this code is similar to that in server, should be
// factored into something shared.
ipv4ListenAddrs, ipv6ListenAddrs, _, err := parseListeners(config.ListenAddrs)
if err != nil {
return nil, err
}
listeners := make([]net.Listener, 0,
len(ipv6ListenAddrs)+len(ipv4ListenAddrs))
for _, addr := range ipv4ListenAddrs {
listener, err := listenFunc("tcp4", addr)
if err != nil {
rpcsLog.Warnf("Can't listen on %s: %v", addr, err)
continue
}
listeners = append(listeners, listener)
}
for _, addr := range ipv6ListenAddrs {
listener, err := listenFunc("tcp6", addr)
if err != nil {
rpcsLog.Warnf("Can't listen on %s: %v", addr, err)
continue
}
listeners = append(listeners, listener)
}
if len(listeners) == 0 {
return nil, errors.New("RPCS: No valid listen address")
}
rpc.listeners = listeners
rpc.cfg.Chain.Subscribe(rpc.handleBlockchainNotification)
return &rpc, nil

View file

@ -8,6 +8,7 @@ package main
import (
"bytes"
"crypto/rand"
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
@ -2192,6 +2193,65 @@ out:
s.wg.Done()
}
// setupRPCListeners returns a slice of listners that are configured for use
// with the RPC server depending on the configuration settings for listen
// addresses and TLS.
func setupRPCListeners() ([]net.Listener, error) {
// Setup TLS if not disabled.
listenFunc := net.Listen
if !cfg.DisableTLS {
// Generate the TLS cert and key file if both don't already
// exist.
if !fileExists(cfg.RPCKey) && !fileExists(cfg.RPCCert) {
err := genCertPair(cfg.RPCCert, cfg.RPCKey)
if err != nil {
return nil, err
}
}
keypair, err := tls.LoadX509KeyPair(cfg.RPCCert, cfg.RPCKey)
if err != nil {
return nil, err
}
tlsConfig := tls.Config{
Certificates: []tls.Certificate{keypair},
MinVersion: tls.VersionTLS12,
}
// Change the standard net.Listen function to the tls one.
listenFunc = func(net string, laddr string) (net.Listener, error) {
return tls.Listen(net, laddr, &tlsConfig)
}
}
// TODO: This code is similar to the peer listener code. It should be
// factored into something shared.
ipv4Addrs, ipv6Addrs, _, err := parseListeners(cfg.RPCListeners)
if err != nil {
return nil, err
}
listeners := make([]net.Listener, 0, len(ipv4Addrs)+len(ipv4Addrs))
for _, addr := range ipv4Addrs {
listener, err := listenFunc("tcp4", addr)
if err != nil {
rpcsLog.Warnf("Can't listen on %s: %v", addr, err)
continue
}
listeners = append(listeners, listener)
}
for _, addr := range ipv6Addrs {
listener, err := listenFunc("tcp6", addr)
if err != nil {
rpcsLog.Warnf("Can't listen on %s: %v", addr, err)
continue
}
listeners = append(listeners, listener)
}
return listeners, nil
}
// newServer returns a new btcd server configured to listen on addr for the
// bitcoin network type specified by chainParams. Use start to begin accepting
// connections from peers.
@ -2543,8 +2603,18 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
}
if !cfg.DisableRPC {
// Setup listeners for the configured RPC listen addresses and
// TLS settings.
rpcListeners, err := setupRPCListeners()
if err != nil {
return nil, err
}
if len(rpcListeners) == 0 {
return nil, errors.New("RPCS: No valid listen address")
}
s.rpcServer, err = newRPCServer(&rpcserverConfig{
ListenAddrs: cfg.RPCListeners,
Listeners: rpcListeners,
StartupTime: s.startupTime,
ConnMgr: &rpcConnManager{&s},
SyncMgr: &rpcSyncMgr{&s, s.blockManager},