JSON RPC compatibility workarounds to support lbry-sdk (#75)

* Fix log spam (Already shutting down...)

* Workaround to allow lbry-sdk to call server.version and server.features.
Incoming/outgoing JSON is patched using yet another codec (jsonPatchingCodec).
Add more logging of raw/patched JSON.

* Elaborate comment on jsonPatchingCodec.
This commit is contained in:
Jonathan Moody 2022-10-29 11:42:24 -04:00 committed by GitHub
parent cd7f20a461
commit e070e8a51e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 162 additions and 20 deletions

View file

@ -73,17 +73,16 @@ type ServerVersionService struct {
Args *Args
}
type ServerVersionReq struct{}
type ServerVersionReq [2]string // [client_name, client_version]
type ServerVersionRes string
type ServerVersionRes [2]string // [version, protocol_version]
// Banner is the json rpc endpoint for 'server.version'.
// FIXME: This should return a struct with the version and the protocol version.
// <<-- that comment was written by github, scary shit because it's true
// Version is the json rpc endpoint for 'server.version'.
func (t *ServerService) Version(req *ServerVersionReq, res **ServerVersionRes) error {
log.Println("Version")
*res = (*ServerVersionRes)(&t.Args.ServerVersion)
// FIXME: We may need to do the computation of a negotiated version here.
// Also return an error if client is not supported?
result := [2]string{t.Args.ServerVersion, t.Args.ServerVersion}
*res = (*ServerVersionRes)(&result)
log.Printf("Version(%v) -> %v", *req, **res)
return nil
}

View file

@ -1,7 +1,9 @@
package server
import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"net"
"net/rpc"
@ -241,7 +243,7 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
sm.grp.Add(1)
go func() {
s1.ServeCodec(&SessionServerCodec{jsonrpc.NewServerCodec(conn), sess})
s1.ServeCodec(&sessionServerCodec{jsonrpc.NewServerCodec(newJsonPatchingCodec(conn)), sess})
log.Infof("session %v goroutine exit", sess.addr.String())
sm.grp.Done()
}()
@ -343,7 +345,7 @@ func (sm *sessionManager) doNotify(notification interface{}) {
}
}
type SessionServerCodec struct {
type sessionServerCodec struct {
rpc.ServerCodec
sess *session
}
@ -354,13 +356,14 @@ type SessionServerCodec struct {
// blockchain.address.listunspent -> blockchain.address.Listunspent
// This makes the "method" string compatible with rpc.Server
// requirements.
func (c *SessionServerCodec) ReadRequestHeader(req *rpc.Request) error {
log.Infof("receive header from %v", c.sess.addr.String())
func (c *sessionServerCodec) ReadRequestHeader(req *rpc.Request) error {
log.Infof("from %v receive header", c.sess.addr.String())
err := c.ServerCodec.ReadRequestHeader(req)
if err != nil {
log.Warnf("error: %v", err)
return err
}
log.Infof("from %v receive header: %#v", c.sess.addr.String(), *req)
rawMethod := req.ServiceMethod
parts := strings.Split(rawMethod, ".")
if len(parts) < 2 {
@ -377,20 +380,21 @@ func (c *SessionServerCodec) ReadRequestHeader(req *rpc.Request) error {
}
// ReadRequestBody wraps the regular implementation, but updates session stats too.
func (c *SessionServerCodec) ReadRequestBody(params any) error {
func (c *sessionServerCodec) ReadRequestBody(params any) error {
log.Infof("from %v receive body", c.sess.addr.String())
err := c.ServerCodec.ReadRequestBody(params)
if err != nil {
log.Warnf("error: %v", err)
return err
}
log.Infof("receive body from %v", c.sess.addr.String())
log.Infof("from %v receive body: %#v", c.sess.addr.String(), params)
// Bump last receive time.
c.sess.lastRecv = time.Now()
return err
}
// WriteResponse wraps the regular implementation, but updates session stats too.
func (c *SessionServerCodec) WriteResponse(resp *rpc.Response, reply any) error {
func (c *sessionServerCodec) WriteResponse(resp *rpc.Response, reply any) error {
log.Infof("respond to %v", c.sess.addr.String())
err := c.ServerCodec.WriteResponse(resp, reply)
if err != nil {
@ -400,3 +404,146 @@ func (c *SessionServerCodec) WriteResponse(resp *rpc.Response, reply any) error
c.sess.lastSend = time.Now()
return err
}
// serverRequest is a duplicate of serverRequest from
// net/rpc/jsonrpc/server.go with an added Version which
// we can check.
type serverRequest struct {
Version string `json:"jsonrpc"`
Method string `json:"method"`
Params *json.RawMessage `json:"params"`
Id *json.RawMessage `json:"id"`
}
// serverResponse is a duplicate of serverResponse from
// net/rpc/jsonrpc/server.go with an added Version which
// we can set at will.
type serverResponse struct {
Version string `json:"jsonrpc"`
Id *json.RawMessage `json:"id"`
Result any `json:"result,omitempty"`
Error any `json:"error,omitempty"`
}
// jsonPatchingCodec is able to intercept the JSON requests/responses
// and tweak them. Currently, it appears we need to make several changes:
// 1) add "jsonrpc": "2.0" (or "jsonrpc": "1.0") in response
// 2) add newline to frame response
// 3) add "params": [] when "params" is missing
// 4) replace params ["arg1", "arg2", ...] with [["arg1", "arg2", ...]]
type jsonPatchingCodec struct {
conn net.Conn
inBuffer *bytes.Buffer
dec *json.Decoder
enc *json.Encoder
outBuffer *bytes.Buffer
}
func newJsonPatchingCodec(conn net.Conn) *jsonPatchingCodec {
buf1, buf2 := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
return &jsonPatchingCodec{
conn: conn,
inBuffer: buf1,
dec: json.NewDecoder(buf1),
enc: json.NewEncoder(buf2),
outBuffer: buf2,
}
}
func (c *jsonPatchingCodec) Read(p []byte) (n int, err error) {
if c.outBuffer.Len() > 0 {
// Return remaining decoded bytes.
return c.outBuffer.Read(p)
}
// Buffer contents consumed. Try to decode more JSON.
// Read until framing newline. This allows us to print the raw request.
for !bytes.ContainsAny(c.inBuffer.Bytes(), "\n") {
var buf [1024]byte
n, err = c.conn.Read(buf[:])
if err != nil {
return 0, err
}
c.inBuffer.Write(buf[:n])
}
log.Infof("raw request: %v", c.inBuffer.String())
var req serverRequest
err = c.dec.Decode(&req)
if err != nil {
return 0, err
}
if req.Params != nil {
n := len(*req.Params)
if n < 2 || (*req.Params)[0] != '[' && (*req.Params)[n-1] != ']' {
// This is an error, but we're not going to try to correct it.
goto encode
}
// FIXME: The heuristics here don't cover all possibilities.
// For example: [{obj1}, {obj2}] or ["foo,bar"] would not
// be handled correctly.
bracketed := (*req.Params)[1 : n-1]
n = len(bracketed)
if n > 1 && (bracketed[0] == '{' || bracketed[0] == '[') {
// Probable single object or list argument.
goto encode
}
args := strings.Split(string(bracketed), ",")
if len(args) <= 1 {
// No commas at all. Definitely a single argument.
goto encode
}
// The params look like ["arg1", "arg2", "arg3", ...].
// We're in trouble because our jsonrpc library does not
// handle this. So pack these args in an inner list.
// The handler method will receive ONE list argument.
params := json.RawMessage(fmt.Sprintf("[[%s]]", bracketed))
req.Params = &params
} else {
// Add empty argument list if params omitted.
params := json.RawMessage("[]")
req.Params = &params
}
encode:
// Encode the request. This allows us to print the patched request.
buf, err := json.Marshal(req)
if err != nil {
return 0, err
}
log.Infof("patched request: %v", string(buf))
err = c.enc.Encode(req)
if err != nil {
return 0, err
}
return c.outBuffer.Read(p)
}
func (c *jsonPatchingCodec) Write(p []byte) (n int, err error) {
log.Infof("raw response: %v", string(p))
var resp serverResponse
err = json.Unmarshal(p, &resp)
if err != nil {
return 0, err
}
// Add "jsonrpc": "2.0" if missing.
if len(resp.Version) == 0 {
resp.Version = "2.0"
}
buf, err := json.Marshal(resp)
if err != nil {
return 0, err
}
log.Infof("patched response: %v", string(buf))
// Add newline for framing.
return c.conn.Write(append(buf, '\n'))
}
func (c *jsonPatchingCodec) Close() error {
return c.conn.Close()
}

View file

@ -49,10 +49,6 @@ func interruptListener() <-chan struct{} {
case sig := <-interruptChannel:
log.Infof("Received signal (%s). Already "+
"shutting down...", sig)
case <-shutdownRequestChannel:
log.Info("Shutdown requested. Already " +
"shutting down...")
}
}
}()