parent
061a220354
commit
9036d36e68
1 changed files with 62 additions and 33 deletions
95
rpcserver.go
95
rpcserver.go
|
@ -477,7 +477,6 @@ func (s *rpcServer) WebsocketClientRead(wsc *websocketClient) {
|
|||
}
|
||||
wsc.allRequests <- request
|
||||
}
|
||||
s.wg.Done()
|
||||
}
|
||||
|
||||
type rawRequest struct {
|
||||
|
@ -582,40 +581,57 @@ type unauthedRequest struct {
|
|||
|
||||
func (s *rpcServer) WebsocketClientGateway(wsc *websocketClient) {
|
||||
out:
|
||||
for request := range wsc.allRequests {
|
||||
// Get the method of the request and check whether it should be
|
||||
// handled by wallet or passed down to btcd. If the latter,
|
||||
// handle in a new goroutine (to not block or be blocked by
|
||||
// the handling of actual wallet requests).
|
||||
//
|
||||
// This is done by unmarshaling the JSON bytes into a rawRequest
|
||||
// to avoid the mangling of unmarshaling and re-marshaling of
|
||||
// large JSON numbers, as well as the overhead of unneeded
|
||||
// unmarshals and marshals.
|
||||
var raw rawRequest
|
||||
if err := json.Unmarshal(request, &raw); err != nil {
|
||||
if !wsc.authenticated {
|
||||
// Disconnect immediately.
|
||||
// A for-select with a read of the quit channel is used instead of a
|
||||
// for-range to provide clean shutdown. This is necessary due to
|
||||
// WebsocketClientRead (which sends to the allRequests chan) not closing
|
||||
// allRequests during shutdown if the remote websocket client is still
|
||||
// connected.
|
||||
for {
|
||||
select {
|
||||
case request, ok := <-wsc.allRequests:
|
||||
if !ok {
|
||||
// client disconnected
|
||||
break out
|
||||
}
|
||||
err = wsc.send(marshalError(idPointer(raw.ID)))
|
||||
if err != nil {
|
||||
break out
|
||||
// Get the method of the request and check whether it
|
||||
// should be handled by wallet or passed down to btcd.
|
||||
// If the latter, handle in a new goroutine (to not
|
||||
// block or be blocked by the handling of actual wallet
|
||||
// requests).
|
||||
//
|
||||
// This is done by unmarshaling the JSON bytes into a
|
||||
// rawRequest to avoid the mangling of unmarshaling and
|
||||
// re-marshaling of large JSON numbers, as well as the
|
||||
// overhead of unneeded unmarshals and marshals.
|
||||
var raw rawRequest
|
||||
if err := json.Unmarshal(request, &raw); err != nil {
|
||||
if !wsc.authenticated {
|
||||
// Disconnect immediately.
|
||||
break out
|
||||
}
|
||||
err = wsc.send(marshalError(idPointer(raw.ID)))
|
||||
if err != nil {
|
||||
break out
|
||||
}
|
||||
continue
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
f, ok := handlerFunc(raw.Method, true)
|
||||
if ok || raw.Method == "authenticate" {
|
||||
// unauthedRequests is buffered to the max number of
|
||||
// concurrent websocket client requests so as to not
|
||||
// block the passthrough of later btcd requests.
|
||||
wsc.unauthedRequests <- unauthedRequest{request, f}
|
||||
} else {
|
||||
// websocketPassthrough is run as a goroutine to
|
||||
// send an unhandled request to the chain server without
|
||||
// blocking the handling of later wallet requests.
|
||||
go s.websocketPassthrough(wsc, raw)
|
||||
f, ok := handlerFunc(raw.Method, true)
|
||||
if ok || raw.Method == "authenticate" {
|
||||
// unauthedRequests is buffered to the max
|
||||
// number of concurrent websocket client
|
||||
// requests so as to not block the passthrough
|
||||
// of later btcd requests.
|
||||
wsc.unauthedRequests <- unauthedRequest{request, f}
|
||||
} else {
|
||||
// websocketPassthrough is run as a goroutine to
|
||||
// send an unhandled request to the chain server
|
||||
// without blocking the handling of later wallet
|
||||
// requests.
|
||||
go s.websocketPassthrough(wsc, raw)
|
||||
}
|
||||
case <-s.quit:
|
||||
break out
|
||||
}
|
||||
}
|
||||
close(wsc.unauthedRequests)
|
||||
|
@ -773,8 +789,13 @@ func (s *rpcServer) WebsocketClientRPC(wsc *websocketClient) {
|
|||
return
|
||||
}
|
||||
|
||||
s.wg.Add(4)
|
||||
// WebsocketClientRead is intentionally not run with the waitgroup
|
||||
// so it is ignored during shutdown. This is to prevent a hang during
|
||||
// shutdown where the goroutine is blocked on a read of the
|
||||
// websocket connection if the client is still connected.
|
||||
go s.WebsocketClientRead(wsc)
|
||||
|
||||
s.wg.Add(3)
|
||||
go s.WebsocketClientGateway(wsc)
|
||||
go s.WebsocketClientRespond(wsc)
|
||||
go s.WebsocketClientSend(wsc)
|
||||
|
@ -1044,7 +1065,15 @@ out:
|
|||
jsonErr.Code = btcjson.ErrWallet.Code
|
||||
}
|
||||
}
|
||||
r.response <- handlerResponse{result, jsonErr}
|
||||
// The goroutine which requested this may not be running
|
||||
// anymore. If the quit chan is read instead, break out
|
||||
// of the loop now so more requests aren't potentially
|
||||
// read after reentering the loop.
|
||||
select {
|
||||
case r.response <- handlerResponse{result, jsonErr}:
|
||||
case <-s.quit:
|
||||
break out
|
||||
}
|
||||
|
||||
case <-s.quit:
|
||||
break out
|
||||
|
|
Loading…
Reference in a new issue