diff --git a/jsonrpc/daemon.go b/jsonrpc/daemon.go index 4bd41d1..128f05b 100644 --- a/jsonrpc/daemon.go +++ b/jsonrpc/daemon.go @@ -5,9 +5,12 @@ import ( "errors" "strconv" + "fmt" "github.com/mitchellh/mapstructure" log "github.com/sirupsen/logrus" "github.com/ybbus/jsonrpc" + "reflect" + "strings" ) const DefaultPort = 5279 @@ -45,8 +48,20 @@ func decode(data interface{}, targetStruct interface{}) error { return decoder.Decode(data) } +func debugParams(params map[string]interface{}) string { + var s []string + for k, v := range params { + r := reflect.ValueOf(v) + if r.Kind() == reflect.Ptr && r.IsNil() { + continue + } + s = append(s, fmt.Sprintf("%s=%+v", k, v)) + } + return strings.Join(s, " ") +} + func (d *Client) callNoDecode(command string, params map[string]interface{}) (interface{}, error) { - log.Debugln("Calling " + command) + log.Debugln("jsonrpc: " + command + " " + debugParams(params)) r, err := d.conn.CallNamed(command, params) if err != nil { return nil, err diff --git a/main.go b/main.go index 4f87f54..eb009c6 100644 --- a/main.go +++ b/main.go @@ -1,9 +1,8 @@ package main import ( - "flag" - "os" "strconv" + "sync" "time" "github.com/lbryio/lbry.go/jsonrpc" @@ -12,105 +11,166 @@ import ( log "github.com/sirupsen/logrus" ) -const maxPrice = float64(10) -const waitForStart = 5 * time.Second -const waitForEnd = 3 * time.Minute +const ( + maxPrice = float64(999) + waitForStart = 5 * time.Second + waitForEnd = 60 * time.Minute + maxParallelTests = 5 +) + +type Result struct { + started bool + finished bool +} func main() { log.SetLevel(log.DebugLevel) - flag.Parse() - name := flag.Arg(0) - if name == "" { - log.Errorln("Usage: " + os.Args[0] + " URL") - return - } conn := jsonrpc.NewClient("") - log.Println("Starting...") - err := testUri(conn, name) - if err != nil { - panic(err) + var wg sync.WaitGroup + queue := make(chan string) + + var mutex sync.Mutex + results := map[string]Result{} + + for i := 0; i < maxParallelTests; i++ { + go func() { + wg.Add(1) + defer wg.Done() + for { + name, more := <-queue + if !more { + return + } + + res, err := testUri(conn, name) + mutex.Lock() + results[name] = res + mutex.Unlock() + if err != nil { + log.Errorln(name + ": " + err.Error()) + } + } + }() } - /* - todo: - - check multiple names in parallel - - limit how many parallel checks run - - within each name, check if its done every 20 seconds or so. no need to wait a fixed amount of time - - but set a max limit on how much to wait (based on filesize?) - - report aggregate stats to slack - */ + names := []string{"one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"} + for _, name := range names { + queue <- name + } + close(queue) + + wg.Wait() + + countStarted := 0 + countFinished := 0 + for _, r := range results { + if r.started { + countStarted++ + } + if r.finished { + countFinished++ + } + } + + log.Println("Started: " + strconv.Itoa(countStarted) + " of " + strconv.Itoa(len(results))) + log.Println("Finished: " + strconv.Itoa(countFinished) + " of " + strconv.Itoa(len(results))) } -func testUri(conn *jsonrpc.Client, url string) error { - log.Infoln("Testing " + url) +func testUri(conn *jsonrpc.Client, url string) (Result, error) { + log.Infoln(url + ": Starting") + + result := Result{} price, err := conn.StreamCostEstimate(url, nil) if err != nil { - return err + return result, err } if price == nil { - return errors.New("could not get price of " + url) + return result, errors.New("could not get price of " + url) } if float64(*price) > maxPrice { - return errors.New("the price of " + url + " is too damn high") + return result, errors.New("the price of " + url + " is too damn high") } startTime := time.Now() get, err := conn.Get(url, nil, nil) if err != nil { - return err + return result, err } else if get == nil { - return errors.New("received no response for 'get' of " + url) + return result, errors.New("received no response for 'get' of " + url) } if get.Completed { - log.Infoln("cannot test " + url + " because we already have it") - return nil + log.Infoln(url + ": cannot test because we already have it") + return result, nil } - getDuration := time.Since(startTime) + log.Infoln(url + ": get took " + time.Since(startTime).String()) - log.Infoln("'get' for " + url + " took " + getDuration.String()) - - log.Infoln("waiting " + waitForStart.String() + " to see if " + url + " starts") + log.Infoln(url + ": waiting " + waitForStart.String() + " to see if it starts") time.Sleep(waitForStart) fileStartedResult, err := conn.FileList(jsonrpc.FileListOptions{Outpoint: &get.Outpoint}) if err != nil { - return err + return result, err } if fileStartedResult == nil || len(*fileStartedResult) < 1 { - log.Errorln(url + " failed to start in " + waitForStart.String()) + log.Errorln(url + ": failed to start in " + waitForStart.String()) } else if (*fileStartedResult)[0].Completed { - log.Errorln(url + " already finished after " + waitForStart.String() + ". boom!") + log.Infoln(url + ": already finished after " + waitForStart.String() + ". boom!") + result.started = true + result.finished = true + return result, nil } else if (*fileStartedResult)[0].WrittenBytes == 0 { - log.Errorln(url + " says it started, but has 0 bytes downloaded after " + waitForStart.String()) + log.Errorln(url + ": says it started, but has 0 bytes downloaded after " + waitForStart.String()) } else { - log.Infoln(url + " started, with " + strconv.FormatUint((*fileStartedResult)[0].WrittenBytes, 10) + " bytes downloaded") + log.Infoln(url + ": started, with " + strconv.FormatUint((*fileStartedResult)[0].WrittenBytes, 10) + " bytes downloaded") + result.started = true } - log.Infoln("waiting " + waitForEnd.String() + " for file to finish") + log.Infoln(url + ": waiting up to " + waitForEnd.String() + " for file to finish") - time.Sleep(waitForEnd) + var fileFinishedResult *jsonrpc.FileListResponse + ticker := time.NewTicker(15 * time.Second) + // todo: timeout should be based on file size + timeout := time.After(waitForEnd) - fileFinishedResult, err := conn.FileList(jsonrpc.FileListOptions{Outpoint: &get.Outpoint}) - if err != nil { - return err +WaitForFinish: + for { + select { + case <-ticker.C: + fileFinishedResult, err = conn.FileList(jsonrpc.FileListOptions{Outpoint: &get.Outpoint}) + if err != nil { + return result, err + } + if fileFinishedResult != nil && len(*fileFinishedResult) > 0 { + if (*fileFinishedResult)[0].Completed { + ticker.Stop() + break WaitForFinish + } else { + log.Infoln(url + ": " + strconv.FormatUint((*fileFinishedResult)[0].WrittenBytes, 10) + " bytes downloaded after " + time.Since(startTime).String()) + } + } + case <-timeout: + ticker.Stop() + break WaitForFinish + } } if fileFinishedResult == nil || len(*fileFinishedResult) < 1 { - log.Errorln(url + " failed to start at all") + log.Errorln(url + ": failed to start at all") } else if !(*fileFinishedResult)[0].Completed { - log.Errorln(url + " says it started, but has not finished after " + waitForEnd.String() + " (" + strconv.FormatUint((*fileFinishedResult)[0].WrittenBytes, 10) + " bytes written)") + log.Errorln(url + ": says it started, but has not finished after " + waitForEnd.String() + " (" + strconv.FormatUint((*fileFinishedResult)[0].WrittenBytes, 10) + " bytes written)") } else { - log.Infoln(url + " finished, with " + strconv.FormatUint((*fileFinishedResult)[0].WrittenBytes, 10) + " bytes downloaded") + log.Infoln(url + ": finished after " + time.Since(startTime).String() + " , with " + strconv.FormatUint((*fileFinishedResult)[0].WrittenBytes, 10) + " bytes downloaded") + result.finished = true } - return nil + return result, nil }