fix wallet refill handler
add slack logging add PoC for channel fetching
This commit is contained in:
parent
06a20b7d5e
commit
efd6a4b620
3 changed files with 60 additions and 8 deletions
|
@ -4,8 +4,14 @@ import (
|
||||||
"github.com/lbryio/lbry.go/errors"
|
"github.com/lbryio/lbry.go/errors"
|
||||||
sync "github.com/lbryio/lbry.go/ytsync"
|
sync "github.com/lbryio/lbry.go/ytsync"
|
||||||
|
|
||||||
|
"fmt"
|
||||||
|
"github.com/lbryio/lbry.go/util"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -20,6 +26,12 @@ func init() {
|
||||||
ytSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
|
ytSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
|
||||||
ytSyncCmd.Flags().IntVar(&refill, "refill", 0, "Also add this many credits to the wallet")
|
ytSyncCmd.Flags().IntVar(&refill, "refill", 0, "Also add this many credits to the wallet")
|
||||||
RootCmd.AddCommand(ytSyncCmd)
|
RootCmd.AddCommand(ytSyncCmd)
|
||||||
|
slackToken := os.Getenv("SLACK_TOKEN")
|
||||||
|
if slackToken == "" {
|
||||||
|
log.Error("A slack token was not present in env vars! Slack messages disabled!")
|
||||||
|
} else {
|
||||||
|
util.InitSlack(os.Getenv("SLACK_TOKEN"))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const defaultMaxTries = 3
|
const defaultMaxTries = 3
|
||||||
|
@ -31,6 +43,19 @@ var (
|
||||||
refill int
|
refill int
|
||||||
)
|
)
|
||||||
|
|
||||||
|
//PoC
|
||||||
|
func fetchChannels() {
|
||||||
|
url := "http://localhost:8080/yt/jobs"
|
||||||
|
payload := strings.NewReader("------WebKitFormBoundary7MA4YWxkTrZu0gW\r\nContent-Disposition: form-data; name=\"auth_token\"\r\n\r\n620280\r\n------WebKitFormBoundary7MA4YWxkTrZu0gW--")
|
||||||
|
req, _ := http.NewRequest("POST", url, payload)
|
||||||
|
req.Header.Add("content-type", "multipart/form-data")
|
||||||
|
res, _ := http.DefaultClient.Do(req)
|
||||||
|
defer res.Body.Close()
|
||||||
|
body, _ := ioutil.ReadAll(res.Body)
|
||||||
|
fmt.Println(res)
|
||||||
|
fmt.Println(string(body))
|
||||||
|
}
|
||||||
|
|
||||||
func ytsync(cmd *cobra.Command, args []string) {
|
func ytsync(cmd *cobra.Command, args []string) {
|
||||||
ytAPIKey := args[0]
|
ytAPIKey := args[0]
|
||||||
lbryChannelName := args[1]
|
lbryChannelName := args[1]
|
||||||
|
|
15
util/slack_test.go
Normal file
15
util/slack_test.go
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSendToSlack(t *testing.T) {
|
||||||
|
slackToken := os.Getenv("SLACK_TOKEN")
|
||||||
|
if slackToken == "" {
|
||||||
|
t.Error("A slack token was not provided")
|
||||||
|
}
|
||||||
|
InitSlack(slackToken)
|
||||||
|
SendToSlack("This is a test :)")
|
||||||
|
}
|
|
@ -22,6 +22,8 @@ import (
|
||||||
"github.com/lbryio/lbry.go/ytsync/redisdb"
|
"github.com/lbryio/lbry.go/ytsync/redisdb"
|
||||||
"github.com/lbryio/lbry.go/ytsync/sources"
|
"github.com/lbryio/lbry.go/ytsync/sources"
|
||||||
|
|
||||||
|
"fmt"
|
||||||
|
"github.com/lbryio/lbry.go/util"
|
||||||
"github.com/mitchellh/go-ps"
|
"github.com/mitchellh/go-ps"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"google.golang.org/api/googleapi/transport"
|
"google.golang.org/api/googleapi/transport"
|
||||||
|
@ -103,16 +105,14 @@ func (s *Sync) FullCycle() error {
|
||||||
log.Printf("Stopping daemon")
|
log.Printf("Stopping daemon")
|
||||||
shutdownErr := stopDaemonViaSystemd()
|
shutdownErr := stopDaemonViaSystemd()
|
||||||
if shutdownErr != nil {
|
if shutdownErr != nil {
|
||||||
log.Errorf("error shutting down daemon: %v", shutdownErr)
|
logShutdownError(shutdownErr)
|
||||||
log.Errorf("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
|
|
||||||
} else {
|
} else {
|
||||||
// the cli will return long before the daemon effectively stops. we must observe the processes running
|
// the cli will return long before the daemon effectively stops. we must observe the processes running
|
||||||
// before moving the wallet
|
// before moving the wallet
|
||||||
var waitTimeout time.Duration = 60 * 6
|
var waitTimeout time.Duration = 60 * 6
|
||||||
processDeathError := waitForDaemonProcess(waitTimeout)
|
processDeathError := waitForDaemonProcess(waitTimeout)
|
||||||
if processDeathError != nil {
|
if processDeathError != nil {
|
||||||
log.Errorf("error shutdown down daemon: %v", processDeathError)
|
logShutdownError(processDeathError)
|
||||||
log.Errorf("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
|
|
||||||
} else {
|
} else {
|
||||||
walletErr := os.Rename(defaultWalletDir, walletBackupDir)
|
walletErr := os.Rename(defaultWalletDir, walletBackupDir)
|
||||||
if walletErr != nil {
|
if walletErr != nil {
|
||||||
|
@ -175,6 +175,13 @@ WaitForDaemonStart:
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
func logShutdownError(shutdownErr error) {
|
||||||
|
errMsg := fmt.Sprintf("error shutting down daemon: %v", shutdownErr)
|
||||||
|
log.Errorf(errMsg)
|
||||||
|
util.SendToSlack(errMsg)
|
||||||
|
log.Errorf("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
|
||||||
|
util.SendToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Sync) doSync() error {
|
func (s *Sync) doSync() error {
|
||||||
var err error
|
var err error
|
||||||
|
@ -235,7 +242,9 @@ func (s *Sync) startWorker(workerNum int) {
|
||||||
err := s.processVideo(v)
|
err := s.processVideo(v)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorln("error processing video: " + err.Error())
|
logMsg := fmt.Sprintf("error processing video: " + err.Error())
|
||||||
|
log.Errorln(logMsg)
|
||||||
|
|
||||||
if s.StopOnError {
|
if s.StopOnError {
|
||||||
s.grp.Stop()
|
s.grp.Stop()
|
||||||
} else if s.MaxTries > 1 {
|
} else if s.MaxTries > 1 {
|
||||||
|
@ -247,9 +256,10 @@ func (s *Sync) startWorker(workerNum int) {
|
||||||
strings.Contains(err.Error(), "Playback on other websites has been disabled by the video owner") {
|
strings.Contains(err.Error(), "Playback on other websites has been disabled by the video owner") {
|
||||||
log.Println("This error should not be retried at all")
|
log.Println("This error should not be retried at all")
|
||||||
} else if tryCount < s.MaxTries {
|
} else if tryCount < s.MaxTries {
|
||||||
if strings.Contains(err.Error(), "The transaction was rejected by network rules.(258: txn-mempool-conflict)") {
|
if strings.Contains(err.Error(), "The transaction was rejected by network rules.(258: txn-mempool-conflict)") ||
|
||||||
|
strings.Contains(err.Error(), "failed: Not enough funds") {
|
||||||
log.Println("waiting for a block and refilling addresses before retrying")
|
log.Println("waiting for a block and refilling addresses before retrying")
|
||||||
err = s.ensureEnoughUTXOs()
|
err = s.walletSetup()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err.Error())
|
log.Println(err.Error())
|
||||||
}
|
}
|
||||||
|
@ -257,7 +267,9 @@ func (s *Sync) startWorker(workerNum int) {
|
||||||
log.Println("Retrying")
|
log.Println("Retrying")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Printf("Video failed after %d retries, skipping", s.MaxTries)
|
logMsg = fmt.Sprintf("Video failed after %d retries, skipping. Stack: %s", s.MaxTries, logMsg)
|
||||||
|
log.Printf(logMsg)
|
||||||
|
util.SendToSlack(logMsg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
|
|
Loading…
Reference in a new issue