test multiple names in parallel
This commit is contained in:
parent
c24db1580a
commit
035fb753c9
2 changed files with 126 additions and 51 deletions
|
@ -5,9 +5,12 @@ import (
|
|||
"errors"
|
||||
"strconv"
|
||||
|
||||
"fmt"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/ybbus/jsonrpc"
|
||||
"reflect"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const DefaultPort = 5279
|
||||
|
@ -45,8 +48,20 @@ func decode(data interface{}, targetStruct interface{}) error {
|
|||
return decoder.Decode(data)
|
||||
}
|
||||
|
||||
func debugParams(params map[string]interface{}) string {
|
||||
var s []string
|
||||
for k, v := range params {
|
||||
r := reflect.ValueOf(v)
|
||||
if r.Kind() == reflect.Ptr && r.IsNil() {
|
||||
continue
|
||||
}
|
||||
s = append(s, fmt.Sprintf("%s=%+v", k, v))
|
||||
}
|
||||
return strings.Join(s, " ")
|
||||
}
|
||||
|
||||
func (d *Client) callNoDecode(command string, params map[string]interface{}) (interface{}, error) {
|
||||
log.Debugln("Calling " + command)
|
||||
log.Debugln("jsonrpc: " + command + " " + debugParams(params))
|
||||
r, err := d.conn.CallNamed(command, params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
156
main.go
156
main.go
|
@ -1,9 +1,8 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/jsonrpc"
|
||||
|
@ -12,105 +11,166 @@ import (
|
|||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const maxPrice = float64(10)
|
||||
const waitForStart = 5 * time.Second
|
||||
const waitForEnd = 3 * time.Minute
|
||||
const (
|
||||
maxPrice = float64(999)
|
||||
waitForStart = 5 * time.Second
|
||||
waitForEnd = 60 * time.Minute
|
||||
maxParallelTests = 5
|
||||
)
|
||||
|
||||
type Result struct {
|
||||
started bool
|
||||
finished bool
|
||||
}
|
||||
|
||||
func main() {
|
||||
log.SetLevel(log.DebugLevel)
|
||||
flag.Parse()
|
||||
name := flag.Arg(0)
|
||||
if name == "" {
|
||||
log.Errorln("Usage: " + os.Args[0] + " URL")
|
||||
|
||||
conn := jsonrpc.NewClient("")
|
||||
|
||||
var wg sync.WaitGroup
|
||||
queue := make(chan string)
|
||||
|
||||
var mutex sync.Mutex
|
||||
results := map[string]Result{}
|
||||
|
||||
for i := 0; i < maxParallelTests; i++ {
|
||||
go func() {
|
||||
wg.Add(1)
|
||||
defer wg.Done()
|
||||
for {
|
||||
name, more := <-queue
|
||||
if !more {
|
||||
return
|
||||
}
|
||||
|
||||
conn := jsonrpc.NewClient("")
|
||||
log.Println("Starting...")
|
||||
|
||||
err := testUri(conn, name)
|
||||
res, err := testUri(conn, name)
|
||||
mutex.Lock()
|
||||
results[name] = res
|
||||
mutex.Unlock()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
log.Errorln(name + ": " + err.Error())
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
/*
|
||||
todo:
|
||||
- check multiple names in parallel
|
||||
- limit how many parallel checks run
|
||||
- within each name, check if its done every 20 seconds or so. no need to wait a fixed amount of time
|
||||
- but set a max limit on how much to wait (based on filesize?)
|
||||
- report aggregate stats to slack
|
||||
*/
|
||||
names := []string{"one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"}
|
||||
for _, name := range names {
|
||||
queue <- name
|
||||
}
|
||||
close(queue)
|
||||
|
||||
wg.Wait()
|
||||
|
||||
countStarted := 0
|
||||
countFinished := 0
|
||||
for _, r := range results {
|
||||
if r.started {
|
||||
countStarted++
|
||||
}
|
||||
if r.finished {
|
||||
countFinished++
|
||||
}
|
||||
}
|
||||
|
||||
log.Println("Started: " + strconv.Itoa(countStarted) + " of " + strconv.Itoa(len(results)))
|
||||
log.Println("Finished: " + strconv.Itoa(countFinished) + " of " + strconv.Itoa(len(results)))
|
||||
}
|
||||
|
||||
func testUri(conn *jsonrpc.Client, url string) error {
|
||||
log.Infoln("Testing " + url)
|
||||
func testUri(conn *jsonrpc.Client, url string) (Result, error) {
|
||||
log.Infoln(url + ": Starting")
|
||||
|
||||
result := Result{}
|
||||
|
||||
price, err := conn.StreamCostEstimate(url, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
return result, err
|
||||
}
|
||||
|
||||
if price == nil {
|
||||
return errors.New("could not get price of " + url)
|
||||
return result, errors.New("could not get price of " + url)
|
||||
}
|
||||
|
||||
if float64(*price) > maxPrice {
|
||||
return errors.New("the price of " + url + " is too damn high")
|
||||
return result, errors.New("the price of " + url + " is too damn high")
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
get, err := conn.Get(url, nil, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
return result, err
|
||||
} else if get == nil {
|
||||
return errors.New("received no response for 'get' of " + url)
|
||||
return result, errors.New("received no response for 'get' of " + url)
|
||||
}
|
||||
|
||||
if get.Completed {
|
||||
log.Infoln("cannot test " + url + " because we already have it")
|
||||
return nil
|
||||
log.Infoln(url + ": cannot test because we already have it")
|
||||
return result, nil
|
||||
}
|
||||
|
||||
getDuration := time.Since(startTime)
|
||||
log.Infoln(url + ": get took " + time.Since(startTime).String())
|
||||
|
||||
log.Infoln("'get' for " + url + " took " + getDuration.String())
|
||||
|
||||
log.Infoln("waiting " + waitForStart.String() + " to see if " + url + " starts")
|
||||
log.Infoln(url + ": waiting " + waitForStart.String() + " to see if it starts")
|
||||
|
||||
time.Sleep(waitForStart)
|
||||
|
||||
fileStartedResult, err := conn.FileList(jsonrpc.FileListOptions{Outpoint: &get.Outpoint})
|
||||
if err != nil {
|
||||
return err
|
||||
return result, err
|
||||
}
|
||||
|
||||
if fileStartedResult == nil || len(*fileStartedResult) < 1 {
|
||||
log.Errorln(url + " failed to start in " + waitForStart.String())
|
||||
log.Errorln(url + ": failed to start in " + waitForStart.String())
|
||||
} else if (*fileStartedResult)[0].Completed {
|
||||
log.Errorln(url + " already finished after " + waitForStart.String() + ". boom!")
|
||||
log.Infoln(url + ": already finished after " + waitForStart.String() + ". boom!")
|
||||
result.started = true
|
||||
result.finished = true
|
||||
return result, nil
|
||||
} else if (*fileStartedResult)[0].WrittenBytes == 0 {
|
||||
log.Errorln(url + " says it started, but has 0 bytes downloaded after " + waitForStart.String())
|
||||
log.Errorln(url + ": says it started, but has 0 bytes downloaded after " + waitForStart.String())
|
||||
} else {
|
||||
log.Infoln(url + " started, with " + strconv.FormatUint((*fileStartedResult)[0].WrittenBytes, 10) + " bytes downloaded")
|
||||
log.Infoln(url + ": started, with " + strconv.FormatUint((*fileStartedResult)[0].WrittenBytes, 10) + " bytes downloaded")
|
||||
result.started = true
|
||||
}
|
||||
|
||||
log.Infoln("waiting " + waitForEnd.String() + " for file to finish")
|
||||
log.Infoln(url + ": waiting up to " + waitForEnd.String() + " for file to finish")
|
||||
|
||||
time.Sleep(waitForEnd)
|
||||
var fileFinishedResult *jsonrpc.FileListResponse
|
||||
ticker := time.NewTicker(15 * time.Second)
|
||||
// todo: timeout should be based on file size
|
||||
timeout := time.After(waitForEnd)
|
||||
|
||||
fileFinishedResult, err := conn.FileList(jsonrpc.FileListOptions{Outpoint: &get.Outpoint})
|
||||
WaitForFinish:
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
fileFinishedResult, err = conn.FileList(jsonrpc.FileListOptions{Outpoint: &get.Outpoint})
|
||||
if err != nil {
|
||||
return err
|
||||
return result, err
|
||||
}
|
||||
if fileFinishedResult != nil && len(*fileFinishedResult) > 0 {
|
||||
if (*fileFinishedResult)[0].Completed {
|
||||
ticker.Stop()
|
||||
break WaitForFinish
|
||||
} else {
|
||||
log.Infoln(url + ": " + strconv.FormatUint((*fileFinishedResult)[0].WrittenBytes, 10) + " bytes downloaded after " + time.Since(startTime).String())
|
||||
}
|
||||
}
|
||||
case <-timeout:
|
||||
ticker.Stop()
|
||||
break WaitForFinish
|
||||
}
|
||||
}
|
||||
|
||||
if fileFinishedResult == nil || len(*fileFinishedResult) < 1 {
|
||||
log.Errorln(url + " failed to start at all")
|
||||
log.Errorln(url + ": failed to start at all")
|
||||
} else if !(*fileFinishedResult)[0].Completed {
|
||||
log.Errorln(url + " says it started, but has not finished after " + waitForEnd.String() + " (" + strconv.FormatUint((*fileFinishedResult)[0].WrittenBytes, 10) + " bytes written)")
|
||||
log.Errorln(url + ": says it started, but has not finished after " + waitForEnd.String() + " (" + strconv.FormatUint((*fileFinishedResult)[0].WrittenBytes, 10) + " bytes written)")
|
||||
} else {
|
||||
log.Infoln(url + " finished, with " + strconv.FormatUint((*fileFinishedResult)[0].WrittenBytes, 10) + " bytes downloaded")
|
||||
log.Infoln(url + ": finished after " + time.Since(startTime).String() + " , with " + strconv.FormatUint((*fileFinishedResult)[0].WrittenBytes, 10) + " bytes downloaded")
|
||||
result.finished = true
|
||||
}
|
||||
|
||||
return nil
|
||||
return result, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue