From e070e8a51ead553f31680e99d6cacd56bfcf3a2c Mon Sep 17 00:00:00 2001 From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com> Date: Sat, 29 Oct 2022 11:42:24 -0400 Subject: [PATCH] JSON RPC compatibility workarounds to support lbry-sdk (#75) * Fix log spam (Already shutting down...) * Workaround to allow lbry-sdk to call server.version and server.features. Incoming/outgoing JSON is patched using yet another codec (jsonPatchingCodec). Add more logging of raw/patched JSON. * Elaborate comment on jsonPatchingCodec. --- server/jsonrpc_server.go | 17 ++--- server/session.go | 161 +++++++++++++++++++++++++++++++++++++-- signal.go | 4 - 3 files changed, 162 insertions(+), 20 deletions(-) diff --git a/server/jsonrpc_server.go b/server/jsonrpc_server.go index a818966..967cbf8 100644 --- a/server/jsonrpc_server.go +++ b/server/jsonrpc_server.go @@ -73,17 +73,16 @@ type ServerVersionService struct { Args *Args } -type ServerVersionReq struct{} +type ServerVersionReq [2]string // [client_name, client_version] -type ServerVersionRes string +type ServerVersionRes [2]string // [version, protocol_version] -// Banner is the json rpc endpoint for 'server.version'. -// FIXME: This should return a struct with the version and the protocol version. -// <<-- that comment was written by github, scary shit because it's true +// Version is the json rpc endpoint for 'server.version'. func (t *ServerService) Version(req *ServerVersionReq, res **ServerVersionRes) error { - log.Println("Version") - - *res = (*ServerVersionRes)(&t.Args.ServerVersion) - + // FIXME: We may need to do the computation of a negotiated version here. + // Also return an error if client is not supported? + result := [2]string{t.Args.ServerVersion, t.Args.ServerVersion} + *res = (*ServerVersionRes)(&result) + log.Printf("Version(%v) -> %v", *req, **res) return nil } diff --git a/server/session.go b/server/session.go index 5caf06b..03e2254 100644 --- a/server/session.go +++ b/server/session.go @@ -1,7 +1,9 @@ package server import ( + "bytes" "encoding/hex" + "encoding/json" "fmt" "net" "net/rpc" @@ -241,7 +243,7 @@ func (sm *sessionManager) addSession(conn net.Conn) *session { sm.grp.Add(1) go func() { - s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess}) + s1.ServeCodec(&sessionServerCodec{jsonrpc.NewServerCodec(newJsonPatchingCodec(conn)), sess}) log.Infof("session %v goroutine exit", sess.addr.String()) sm.grp.Done() }() @@ -343,7 +345,7 @@ func (sm *sessionManager) doNotify(notification interface{}) { } } -type SessionServerCodec struct { +type sessionServerCodec struct { rpc.ServerCodec sess *session } @@ -354,13 +356,14 @@ type SessionServerCodec struct { // blockchain.address.listunspent -> blockchain.address.Listunspent // This makes the "method" string compatible with rpc.Server // requirements. -func (c *SessionServerCodec) ReadRequestHeader(req *rpc.Request) error { - log.Infof("receive header from %v", c.sess.addr.String()) +func (c *sessionServerCodec) ReadRequestHeader(req *rpc.Request) error { + log.Infof("from %v receive header", c.sess.addr.String()) err := c.ServerCodec.ReadRequestHeader(req) if err != nil { log.Warnf("error: %v", err) return err } + log.Infof("from %v receive header: %#v", c.sess.addr.String(), *req) rawMethod := req.ServiceMethod parts := strings.Split(rawMethod, ".") if len(parts) < 2 { @@ -377,20 +380,21 @@ func (c *SessionServerCodec) ReadRequestHeader(req *rpc.Request) error { } // ReadRequestBody wraps the regular implementation, but updates session stats too. -func (c *SessionServerCodec) ReadRequestBody(params any) error { +func (c *sessionServerCodec) ReadRequestBody(params any) error { + log.Infof("from %v receive body", c.sess.addr.String()) err := c.ServerCodec.ReadRequestBody(params) if err != nil { log.Warnf("error: %v", err) return err } - log.Infof("receive body from %v", c.sess.addr.String()) + log.Infof("from %v receive body: %#v", c.sess.addr.String(), params) // Bump last receive time. c.sess.lastRecv = time.Now() return err } // WriteResponse wraps the regular implementation, but updates session stats too. -func (c *SessionServerCodec) WriteResponse(resp *rpc.Response, reply any) error { +func (c *sessionServerCodec) WriteResponse(resp *rpc.Response, reply any) error { log.Infof("respond to %v", c.sess.addr.String()) err := c.ServerCodec.WriteResponse(resp, reply) if err != nil { @@ -400,3 +404,146 @@ func (c *SessionServerCodec) WriteResponse(resp *rpc.Response, reply any) error c.sess.lastSend = time.Now() return err } + +// serverRequest is a duplicate of serverRequest from +// net/rpc/jsonrpc/server.go with an added Version which +// we can check. +type serverRequest struct { + Version string `json:"jsonrpc"` + Method string `json:"method"` + Params *json.RawMessage `json:"params"` + Id *json.RawMessage `json:"id"` +} + +// serverResponse is a duplicate of serverResponse from +// net/rpc/jsonrpc/server.go with an added Version which +// we can set at will. +type serverResponse struct { + Version string `json:"jsonrpc"` + Id *json.RawMessage `json:"id"` + Result any `json:"result,omitempty"` + Error any `json:"error,omitempty"` +} + +// jsonPatchingCodec is able to intercept the JSON requests/responses +// and tweak them. Currently, it appears we need to make several changes: +// 1) add "jsonrpc": "2.0" (or "jsonrpc": "1.0") in response +// 2) add newline to frame response +// 3) add "params": [] when "params" is missing +// 4) replace params ["arg1", "arg2", ...] with [["arg1", "arg2", ...]] +type jsonPatchingCodec struct { + conn net.Conn + inBuffer *bytes.Buffer + dec *json.Decoder + enc *json.Encoder + outBuffer *bytes.Buffer +} + +func newJsonPatchingCodec(conn net.Conn) *jsonPatchingCodec { + buf1, buf2 := bytes.NewBuffer(nil), bytes.NewBuffer(nil) + return &jsonPatchingCodec{ + conn: conn, + inBuffer: buf1, + dec: json.NewDecoder(buf1), + enc: json.NewEncoder(buf2), + outBuffer: buf2, + } +} + +func (c *jsonPatchingCodec) Read(p []byte) (n int, err error) { + if c.outBuffer.Len() > 0 { + // Return remaining decoded bytes. + return c.outBuffer.Read(p) + } + // Buffer contents consumed. Try to decode more JSON. + + // Read until framing newline. This allows us to print the raw request. + for !bytes.ContainsAny(c.inBuffer.Bytes(), "\n") { + var buf [1024]byte + n, err = c.conn.Read(buf[:]) + if err != nil { + return 0, err + } + c.inBuffer.Write(buf[:n]) + } + log.Infof("raw request: %v", c.inBuffer.String()) + + var req serverRequest + err = c.dec.Decode(&req) + if err != nil { + return 0, err + } + + if req.Params != nil { + n := len(*req.Params) + if n < 2 || (*req.Params)[0] != '[' && (*req.Params)[n-1] != ']' { + // This is an error, but we're not going to try to correct it. + goto encode + } + // FIXME: The heuristics here don't cover all possibilities. + // For example: [{obj1}, {obj2}] or ["foo,bar"] would not + // be handled correctly. + bracketed := (*req.Params)[1 : n-1] + n = len(bracketed) + if n > 1 && (bracketed[0] == '{' || bracketed[0] == '[') { + // Probable single object or list argument. + goto encode + } + args := strings.Split(string(bracketed), ",") + if len(args) <= 1 { + // No commas at all. Definitely a single argument. + goto encode + } + // The params look like ["arg1", "arg2", "arg3", ...]. + // We're in trouble because our jsonrpc library does not + // handle this. So pack these args in an inner list. + // The handler method will receive ONE list argument. + params := json.RawMessage(fmt.Sprintf("[[%s]]", bracketed)) + req.Params = ¶ms + } else { + // Add empty argument list if params omitted. + params := json.RawMessage("[]") + req.Params = ¶ms + } + +encode: + // Encode the request. This allows us to print the patched request. + buf, err := json.Marshal(req) + if err != nil { + return 0, err + } + log.Infof("patched request: %v", string(buf)) + + err = c.enc.Encode(req) + if err != nil { + return 0, err + } + return c.outBuffer.Read(p) +} + +func (c *jsonPatchingCodec) Write(p []byte) (n int, err error) { + log.Infof("raw response: %v", string(p)) + var resp serverResponse + err = json.Unmarshal(p, &resp) + if err != nil { + return 0, err + } + + // Add "jsonrpc": "2.0" if missing. + if len(resp.Version) == 0 { + resp.Version = "2.0" + } + + buf, err := json.Marshal(resp) + if err != nil { + return 0, err + } + log.Infof("patched response: %v", string(buf)) + + // Add newline for framing. + return c.conn.Write(append(buf, '\n')) +} + +func (c *jsonPatchingCodec) Close() error { + return c.conn.Close() +} diff --git a/signal.go b/signal.go index a2561a6..1a9b7e2 100644 --- a/signal.go +++ b/signal.go @@ -49,10 +49,6 @@ func interruptListener() <-chan struct{} { case sig := <-interruptChannel: log.Infof("Received signal (%s). Already "+ "shutting down...", sig) - - case <-shutdownRequestChannel: - log.Info("Shutdown requested. Already " + - "shutting down...") } } }()