Rework and improve websocket notification system.

This commit refactors the entire websocket client code to resolve several
issues with the previous implementation.  Note that this commit does not
change the public API for websockets.  It only consists of internal
improvements.

The following is the major issues which have been addressed:
- A slow websocket client could impede notifications to all clients
- Long-running operations such as rescans would block all other requests
  until it had completed
- The above two points taken together could lead to apparant hangs since
  the client doing the rescan would eventually run out of channel buffer
  and block the entire group of clients until the rescan completed
- Disconnecting a websocket during certain operations could lead to a hang
- Stopping the rpc server with operations under way could lead to a hang
- There were no limits to the number of websocket clients that could
  connect

The following is a summary of the major changes:

- The websocket code has been split into two entities: a
  connection/notification manager and a websocket client
- The new connection/notification manager acts as the entry point from
  the rest of the subsystems to feed data which potentially needs to
  notify clients
- Each websocket client now has its own instance of the new websocket
  client type which controls its own lifecycle
- The data flow has been completely redesigned to closely resemble the
  peer data flow
- Each websocket now has its own long-lived goroutines for input, output,
  and queuing of notifications
- Notifications use the new notification queue goroutine along with
  queueing to ensure they dont't block on stalled or slow peers
- There is a new infrastructure for asynchronously executing long-running
  commands such as a rescan while still allowing the faster operations to
  continue to be serviced by the same client
- Since long-running operations now run asynchronously, they have been
  limited to one at a time
- Added a limit of 10 websocket clients.  This is hard coded for now, but
  will be made configurable in the future

Taken together these changes make the code far easier to reason about and
update as well solve the aforementioned issues.

Further optimizations to improve performance are possible in regards to
the way the connection/notification manager works, however this commit
already contains a ton of changes, so they are being left for another
time.
This commit is contained in:
Dave Collins 2014-02-18 17:23:33 -06:00
parent 97e0149dc3
commit 54203d7db0
4 changed files with 1180 additions and 850 deletions

View file

@ -998,8 +998,8 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
// Notify frontends
if r := b.server.rpcServer; r != nil {
go func() {
r.NotifyBlockTXs(b.server.db, block)
r.NotifyBlockConnected(block)
r.ntfnMgr.NotifyBlockTXs(block)
r.ntfnMgr.NotifyBlockConnected(block)
}()
}
@ -1025,7 +1025,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
// Notify frontends
if r := b.server.rpcServer; r != nil {
go r.NotifyBlockDisconnected(block)
go r.ntfnMgr.NotifyBlockDisconnected(block)
}
}
}

View file

@ -876,13 +876,15 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool, isNe
txmpLog.Debugf("Accepted transaction %v (pool size: %v)", txHash,
len(mp.pool))
// Notify wallets of mempool transactions to wallet addresses.
// Notify websocket clients about mempool transactions.
if mp.server.rpcServer != nil {
mp.server.rpcServer.NotifyForTxOuts(tx, nil)
go func() {
mp.server.rpcServer.ntfnMgr.NotifyForTxOuts(tx, nil)
if isNew {
mp.server.rpcServer.NotifyForNewTx(tx)
}
if isNew {
mp.server.rpcServer.ntfnMgr.NotifyForNewTx(tx)
}
}()
}
return nil

View file

@ -144,7 +144,7 @@ type rpcServer struct {
shutdown int32
server *server
authsha [sha256.Size]byte
ws *wsContext
ntfnMgr *wsNotificationManager
numClients int
numClientsMutex sync.Mutex
wg sync.WaitGroup
@ -184,9 +184,9 @@ func (s *rpcServer) Start() {
return
}
jsonRPCRead(w, r, s)
})
// Websocket endpoint.
rpcServeMux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
authenticated, err := s.checkAuth(r, false)
if err != nil {
@ -195,7 +195,7 @@ func (s *rpcServer) Start() {
}
wsServer := websocket.Server{
Handler: websocket.Handler(func(ws *websocket.Conn) {
s.walletReqsNotifications(ws, authenticated)
s.WebsocketHandler(ws, r.RemoteAddr, authenticated)
}),
}
wsServer.ServeHTTP(w, r)
@ -296,6 +296,7 @@ func (s *rpcServer) Stop() error {
return err
}
}
s.ntfnMgr.Shutdown()
close(s.quit)
s.wg.Wait()
rpcsLog.Infof("RPC server shutdown complete")
@ -333,9 +334,9 @@ func newRPCServer(listenAddrs []string, s *server) (*rpcServer, error) {
rpc := rpcServer{
authsha: sha256.Sum256([]byte(auth)),
server: s,
ws: newWebsocketContext(),
quit: make(chan int),
}
rpc.ntfnMgr = newWsNotificationManager(&rpc)
// check for existence of cert file and key file
if !fileExists(cfg.RPCKey) && !fileExists(cfg.RPCCert) {

File diff suppressed because it is too large Load diff