the beginning of a jsonrpc client

This commit is contained in:
Alex Grintsvayg 2017-09-12 12:04:45 -04:00
parent 7f3e4460fd
commit 6e929495fc
6 changed files with 253 additions and 38 deletions

View file

@ -165,7 +165,7 @@ func (tm *tokenManager) token(addr *net.UDPAddr) string {
// clear removes expired tokens.
func (tm *tokenManager) clear() {
for _ = range time.Tick(time.Minute * 3) {
for range time.Tick(time.Minute * 3) {
keys := make([]interface{}, 0, 100)
for item := range tm.Iter() {
@ -191,16 +191,6 @@ func (tm *tokenManager) check(addr *net.UDPAddr, tokenString string) bool {
return ok && tokenString == tk.data
}
// makeQuery returns a query-formed data.
func makeQuery(t, q string, a map[string]interface{}) map[string]interface{} {
return map[string]interface{}{
"t": t,
"y": "q",
"q": q,
"a": a,
}
}
// send sends data to the udp.
func send(dht *DHT, addr *net.UDPAddr, data Message) error {
log.Infof("Sending %s", spew.Sdump(data))

39
dht/main_test.go Normal file
View file

@ -0,0 +1,39 @@
package dht
import (
"math/rand"
"strconv"
"testing"
"time"
)
func TestDHT(t *testing.T) {
rand.Seed(time.Now().UnixNano())
port := 49449 // + (rand.Int() % 10)
config := NewStandardConfig()
config.Address = "127.0.0.1:" + strconv.Itoa(port)
config.PrimeNodes = []string{
"127.0.0.1:10001",
}
d := New(config)
t.Log("Starting...")
go d.Run()
time.Sleep(2 * time.Second)
for {
peers, err := d.FindNode("012b66fc7052d9a0c8cb563b8ede7662003ba65f425c2661b5c6919d445deeb31469be8b842d6faeea3f2b3ebcaec845")
if err != nil {
time.Sleep(time.Second * 1)
continue
}
t.Log("Found peers:", peers)
break
}
t.Error("failed")
}

76
jsonrpc/daemon.go Normal file
View file

@ -0,0 +1,76 @@
package jsonrpc
import (
"errors"
"strconv"
"github.com/mitchellh/mapstructure"
"github.com/ybbus/jsonrpc"
)
const DefaultPort = 5279
type Client struct {
conn *jsonrpc.RPCClient
}
func NewClient(address string) *Client {
d := Client{}
if address == "" {
address = "http://localhost:" + strconv.Itoa(DefaultPort)
}
d.conn = jsonrpc.NewRPCClient(address)
return &d
}
func decode(data interface{}, targetStruct interface{}) error {
config := &mapstructure.DecoderConfig{
Metadata: nil,
Result: targetStruct,
TagName: "json",
//WeaklyTypedInput: true,
DecodeHook: fixDecodeProto,
}
decoder, err := mapstructure.NewDecoder(config)
if err != nil {
return err
}
return decoder.Decode(data)
}
func (d *Client) call(response interface{}, command string, params ...interface{}) error {
r, err := d.conn.Call(command, params...)
if err != nil {
return err
}
if r.Error != nil {
return errors.New("Error in daemon: " + r.Error.Message)
}
return decode(r.Result, response)
}
func (d *Client) Commands() (*CommandsResponse, error) {
response := &CommandsResponse{}
return response, d.call(response, "commands")
}
func (d *Client) Status() (*StatusResponse, error) {
response := &StatusResponse{}
return response, d.call(response, "status")
}
func (d *Client) Get(url string, filename *string, timeout *uint) (*GetResponse, error) {
response := &GetResponse{}
return response, d.call(response, "get", map[string]interface{}{
"uri": url,
"file_name": filename,
"timeout": timeout,
})
}

View file

@ -0,0 +1,118 @@
package jsonrpc
import (
"encoding/json"
"github.com/go-errors/errors"
"reflect"
lbryschema "github.com/lbryio/lbryschema.go/pb"
)
func getEnumVal(enum map[string]int32, data interface{}) (int32, error) {
s, ok := data.(string)
if !ok {
return 0, errors.New("expected a string")
}
val, ok := enum[s]
if !ok {
return 0, errors.New("invalid enum key")
}
return val, nil
}
func fixDecodeProto(src, dest reflect.Type, data interface{}) (interface{}, error) {
switch dest {
case reflect.TypeOf(uint64(0)):
if n, ok := data.(json.Number); ok {
val, err := n.Int64()
if err != nil {
return nil, err
} else if val < 0 {
return nil, errors.New("must be unsigned int")
}
return uint64(val), nil
}
case reflect.TypeOf([]byte{}):
if s, ok := data.(string); ok {
return []byte(s), nil
}
case reflect.TypeOf(lbryschema.Metadata_Version(0)):
val, err := getEnumVal(lbryschema.Metadata_Version_value, data)
return lbryschema.Metadata_Version(val), err
case reflect.TypeOf(lbryschema.Metadata_Language(0)):
val, err := getEnumVal(lbryschema.Metadata_Language_value, data)
return lbryschema.Metadata_Language(val), err
case reflect.TypeOf(lbryschema.Stream_Version(0)):
val, err := getEnumVal(lbryschema.Stream_Version_value, data)
return lbryschema.Stream_Version(val), err
case reflect.TypeOf(lbryschema.Claim_Version(0)):
val, err := getEnumVal(lbryschema.Claim_Version_value, data)
return lbryschema.Claim_Version(val), err
case reflect.TypeOf(lbryschema.Claim_ClaimType(0)):
val, err := getEnumVal(lbryschema.Claim_ClaimType_value, data)
return lbryschema.Claim_ClaimType(val), err
case reflect.TypeOf(lbryschema.Fee_Version(0)):
val, err := getEnumVal(lbryschema.Fee_Version_value, data)
return lbryschema.Fee_Version(val), err
case reflect.TypeOf(lbryschema.Fee_Currency(0)):
val, err := getEnumVal(lbryschema.Fee_Currency_value, data)
return lbryschema.Fee_Currency(val), err
case reflect.TypeOf(lbryschema.Source_Version(0)):
val, err := getEnumVal(lbryschema.Source_Version_value, data)
return lbryschema.Source_Version(val), err
case reflect.TypeOf(lbryschema.Source_SourceTypes(0)):
val, err := getEnumVal(lbryschema.Source_SourceTypes_value, data)
return lbryschema.Source_SourceTypes(val), err
}
return data, nil
}
type CommandsResponse []string
type StatusResponse struct {
BlockchainStatus struct {
BestBlockhash string `json:"best_blockhash"`
Blocks int `json:"blocks"`
BlocksBehind int `json:"blocks_behind"`
} `json:"blockchain_status"`
BlocksBehind int `json:"blocks_behind"`
ConnectionStatus struct {
Code string `json:"code"`
Message string `json:"message"`
} `json:"connection_status"`
InstallationID string `json:"installation_id"`
IsFirstRun bool `json:"is_first_run"`
IsRunning bool `json:"is_running"`
LbryID string `json:"lbry_id"`
StartupStatus struct {
Code string `json:"code"`
Message string `json:"message"`
} `json:"startup_status"`
}
type GetResponse struct {
ClaimID string `json:"claim_id"`
Completed bool `json:"completed"`
DownloadDirectory string `json:"download_directory"`
DownloadPath string `json:"download_path"`
FileName string `json:"file_name"`
Key string `json:"key"`
Message string `json:"message"`
Metadata *lbryschema.Claim `json:"metadata"`
MimeType string `json:"mime_type"`
Name string `json:"name"`
Outpoint string `json:"outpoint"`
PointsPaid float64 `json:"points_paid"`
SdHash string `json:"sd_hash"`
Stopped bool `json:"stopped"`
StreamHash string `json:"stream_hash"`
StreamName string `json:"stream_name"`
SuggestedFileName string `json:"suggested_file_name"`
TotalBytes uint64 `json:"total_bytes"`
WrittenBytes uint64 `json:"written_bytes"`
}

8
jsonrpc/daemon_test.go Normal file
View file

@ -0,0 +1,8 @@
package jsonrpc
import (
"testing"
)
func TestStatus(t *testing.T) {
}

38
main.go
View file

@ -1,41 +1,25 @@
package main
import (
"fmt"
"github.com/lbryio/lbry.go/dht"
log "github.com/sirupsen/logrus"
"math/rand"
"strconv"
"time"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/davecgh/go-spew/spew"
log "github.com/sirupsen/logrus"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.Println("Starting...")
port := 49449 // + (rand.Int() % 10)
conn := jsonrpc.NewClient("")
config := dht.NewStandardConfig()
config.Address = "127.0.0.1:" + strconv.Itoa(port)
config.PrimeNodes = []string{
"127.0.0.1:10001",
response, err := conn.Get("one", nil, nil)
if err != nil {
panic(err)
}
spew.Dump(response)
d := dht.New(config)
log.Info("Starting...")
go d.Run()
time.Sleep(5 * time.Second)
for {
peers, err := d.FindNode("012b66fc7052d9a0c8cb563b8ede7662003ba65f425c2661b5c6919d445deeb31469be8b842d6faeea3f2b3ebcaec845")
if err != nil {
time.Sleep(time.Second * 1)
continue
}
fmt.Println("Found peers:", peers)
break
}
time.Sleep(1 * time.Hour)
}