Merge branch 'automation-support'

* automation-support: (31 commits)
  address review comments
  rename selfsync to ytsync
  remove invalid tests
  remove common.go
  update dependencies
  resolve rebase conflicts
  remove launch parameter
  fix slack logging
  resolve conflict changes in rebase
  improve update process
  supply hostname for job listing
  add concurrency support
  add datetime boundaries
  fix selfsync issues with the queue
  move params to env vars
  refactor selfsync process
  improve continuous self sync
  add clean interruption
  add support for channel updates
  add regtest support
  ...
This commit is contained in:
Alex Grintsvayg 2018-07-25 10:37:25 -04:00
commit c72525e2c3
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
10 changed files with 644 additions and 140 deletions

View file

@ -1,27 +1,18 @@
package cmd package cmd
import ( import (
"github.com/lbryio/lbry.go/errors" "os"
sync "github.com/lbryio/lbry.go/ytsync"
"time"
"os/user"
"github.com/lbryio/lbry.go/util"
sync "github.com/lbryio/lbry.go/ytsync"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
func init() {
var ytSyncCmd = &cobra.Command{
Use: "ytsync <youtube_api_key> <lbry_channel_name> [<youtube_channel_id>]",
Args: cobra.RangeArgs(2, 3),
Short: "Publish youtube channel into LBRY network.",
Run: ytsync,
}
ytSyncCmd.Flags().BoolVar(&stopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit")
ytSyncCmd.Flags().IntVar(&maxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
ytSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
ytSyncCmd.Flags().IntVar(&refill, "refill", 0, "Also add this many credits to the wallet")
RootCmd.AddCommand(ytSyncCmd)
}
const defaultMaxTries = 3 const defaultMaxTries = 3
var ( var (
@ -29,19 +20,56 @@ var (
maxTries int maxTries int
takeOverExistingChannel bool takeOverExistingChannel bool
refill int refill int
limit int
skipSpaceCheck bool
syncUpdate bool
syncStatus string
channelID string
syncFrom int64
syncUntil int64
concurrentJobs int
) )
func ytsync(cmd *cobra.Command, args []string) { func init() {
ytAPIKey := args[0] var ytSyncCmd = &cobra.Command{
lbryChannelName := args[1] Use: "ytsync",
if string(lbryChannelName[0]) != "@" { Args: cobra.RangeArgs(0, 0),
log.Errorln("LBRY channel name must start with an @") Short: "Publish youtube channels into LBRY network automatically.",
return Run: ytSync,
}
ytSyncCmd.Flags().BoolVar(&stopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit")
ytSyncCmd.Flags().IntVar(&maxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
ytSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
ytSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync")
ytSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
ytSyncCmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones (short for --status synced)")
ytSyncCmd.Flags().StringVar(&syncStatus, "status", sync.StatusQueued, "Specify which queue to pull from. Overrides --update (Default: queued)")
ytSyncCmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.")
ytSyncCmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
ytSyncCmd.Flags().Int64Var(&syncUntil, "before", time.Now().Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
ytSyncCmd.Flags().IntVar(&concurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently (Default: 1)")
RootCmd.AddCommand(ytSyncCmd)
}
func ytSync(cmd *cobra.Command, args []string) {
var hostname string
slackToken := os.Getenv("SLACK_TOKEN")
if slackToken == "" {
log.Error("A slack token was not present in env vars! Slack messages disabled!")
} else {
var err error
hostname, err = os.Hostname()
if err != nil {
log.Error("could not detect system hostname")
hostname = "ytsync-unknown"
}
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
} }
channelID := "" if !util.InSlice(syncStatus, sync.SyncStatuses) {
if len(args) > 2 { log.Errorf("status must be one of the following: %v\n", sync.SyncStatuses)
channelID = args[2] return
} }
if stopOnError && maxTries != defaultMaxTries { if stopOnError && maxTries != defaultMaxTries {
@ -53,20 +81,60 @@ func ytsync(cmd *cobra.Command, args []string) {
return return
} }
s := sync.Sync{ if limit < 0 {
YoutubeAPIKey: ytAPIKey, log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
YoutubeChannelID: channelID, return
LbryChannelName: lbryChannelName, }
apiURL := os.Getenv("LBRY_API")
apiToken := os.Getenv("LBRY_API_TOKEN")
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
blobsDir := os.Getenv("BLOBS_DIRECTORY")
if apiURL == "" {
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_API")
return
}
if apiToken == "" {
log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
return
}
if youtubeAPIKey == "" {
log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
return
}
if blobsDir == "" {
usr, err := user.Current()
if err != nil {
log.Errorln(err.Error())
return
}
blobsDir = usr.HomeDir + "/.lbrynet/blobfiles/"
}
sm := sync.SyncManager{
StopOnError: stopOnError, StopOnError: stopOnError,
MaxTries: maxTries, MaxTries: maxTries,
ConcurrentVideos: 1,
TakeOverExistingChannel: takeOverExistingChannel, TakeOverExistingChannel: takeOverExistingChannel,
Refill: refill, Refill: refill,
Limit: limit,
SkipSpaceCheck: skipSpaceCheck,
SyncUpdate: syncUpdate,
SyncStatus: syncStatus,
SyncFrom: syncFrom,
SyncUntil: syncUntil,
ConcurrentJobs: concurrentJobs,
ConcurrentVideos: concurrentJobs,
HostName: hostname,
YoutubeChannelID: channelID,
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
BlobsDir: blobsDir,
} }
err := s.FullCycle() err := sm.Start()
if err != nil { if err != nil {
log.Error(errors.FullTrace(err)) sync.SendErrorToSlack(err.Error())
} }
sync.SendInfoToSlack("Syncing process terminated!")
} }

View file

@ -144,6 +144,9 @@ func getLbrycrdURLFromConfFile() (string, error) {
} }
defaultConfFile := os.Getenv("HOME") + "/.lbrycrd/lbrycrd.conf" defaultConfFile := os.Getenv("HOME") + "/.lbrycrd/lbrycrd.conf"
if os.Getenv("REGTEST") == "true" {
defaultConfFile = os.Getenv("HOME") + "/.lbrycrd_regtest/lbrycrd.conf"
}
if _, err := os.Stat(defaultConfFile); os.IsNotExist(err) { if _, err := os.Stat(defaultConfFile); os.IsNotExist(err) {
return "", errors.Err("default lbrycrd conf file not found") return "", errors.Err("default lbrycrd conf file not found")
} }

View file

@ -1,6 +1,7 @@
package util package util
import ( import (
"fmt"
"strings" "strings"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
@ -21,7 +22,11 @@ func InitSlack(token string, channel string, username string) {
} }
// SendToSlackUser Sends message to a specific user. // SendToSlackUser Sends message to a specific user.
func SendToSlackUser(user, username, message string) error { func SendToSlackUser(user, username, format string, a ...interface{}) error {
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
if !strings.HasPrefix(user, "@") { if !strings.HasPrefix(user, "@") {
user = "@" + user user = "@" + user
} }
@ -29,7 +34,11 @@ func SendToSlackUser(user, username, message string) error {
} }
// SendToSlackChannel Sends message to a specific channel. // SendToSlackChannel Sends message to a specific channel.
func SendToSlackChannel(channel, username, message string) error { func SendToSlackChannel(channel, username, format string, a ...interface{}) error {
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
if !strings.HasPrefix(channel, "#") { if !strings.HasPrefix(channel, "#") {
channel = "#" + channel channel = "#" + channel
} }
@ -37,8 +46,11 @@ func SendToSlackChannel(channel, username, message string) error {
} }
// SendToSlack Sends message to the default channel. // SendToSlack Sends message to the default channel.
func SendToSlack(message string) error { func SendToSlack(format string, a ...interface{}) error {
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
if defaultChannel == "" { if defaultChannel == "" {
return errors.Err("no default slack channel set") return errors.Err("no default slack channel set")
} }

View file

@ -1,5 +1,7 @@
package util package util
import "strings"
func InSlice(str string, values []string) bool { func InSlice(str string, values []string) bool {
for _, v := range values { for _, v := range values {
if str == v { if str == v {
@ -8,3 +10,13 @@ func InSlice(str string, values []string) bool {
} }
return false return false
} }
// SubstringInSlice reports whether str contains at least one element of values
// as a substring. Note the direction: the elements of values are the needles
// and str is the haystack. It returns false when values is empty.
func SubstringInSlice(str string, values []string) bool {
	for i := range values {
		if !strings.Contains(str, values[i]) {
			continue
		}
		return true
	}
	return false
}

295
ytsync/manager.go Normal file
View file

@ -0,0 +1,295 @@
package ytsync
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"
"syscall"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/null"
"github.com/lbryio/lbry.go/util"
log "github.com/sirupsen/logrus"
)
// SyncManager holds the configuration for an automated sync session: it pulls
// channel jobs from the remote API and runs a Sync for each of them (see Start).
type SyncManager struct {
	// StopOnError, MaxTries, TakeOverExistingChannel and Refill are copied
	// unchanged into every Sync created by Start.
	StopOnError             bool
	MaxTries                int
	TakeOverExistingChannel bool
	Refill                  int
	// Limit is the maximum number of channel syncs per session; 0 means unlimited.
	Limit int
	// SkipSpaceCheck disables the "more than 90% used" abort in checkUsedSpace.
	SkipSpaceCheck bool
	// SyncUpdate selects the syncing+synced queues, but only when SyncStatus is empty.
	SyncUpdate bool
	// SyncStatus, when non-empty, is the single queue to pull jobs from.
	SyncStatus string
	// SyncFrom and SyncUntil are sent to the jobs endpoint as the "after" and
	// "before" parameters (Unix timestamps according to the CLI flag help).
	SyncFrom  int64
	SyncUntil int64
	// ConcurrentJobs is not read anywhere in this file.
	// NOTE(review): presumably consumed elsewhere in the package — confirm.
	ConcurrentJobs int
	// ConcurrentVideos is copied into every Sync created by Start.
	ConcurrentVideos int
	// HostName identifies this machine to the API ("sync_server") and is used
	// to skip channels already assigned to another server.
	HostName string
	// YoutubeChannelID, when non-empty, restricts the session to that one channel.
	YoutubeChannelID string
	// YoutubeAPIKey is copied into every Sync created by Start.
	YoutubeAPIKey string
	// ApiURL is the base URL the /yt/* endpoints are appended to.
	ApiURL string
	// ApiToken is sent as "auth_token" with every API request.
	ApiToken string
	// BlobsDir is the path whose filesystem usage is checked before each round.
	BlobsDir string
}
// Channel sync statuses, sent to and received from the API as "sync_status".
const (
	StatusPending = "pending" // waiting for permission to sync
	StatusQueued  = "queued"  // in sync queue. will be synced soon
	StatusSyncing = "syncing" // syncing now
	StatusSynced  = "synced"  // done
	StatusFailed  = "failed"  // NOTE(review): presumably a sync that ended in error — confirm
)

// SyncStatuses lists every status value declared above.
var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed}
// apiJobsResponse is the JSON envelope decoded from the /yt/jobs endpoint.
type apiJobsResponse struct {
	Success bool `json:"success"`
	// Error is reported to the caller when Data is absent.
	Error null.String `json:"error"`
	// Data is the list of channels returned for the requested filters.
	Data []apiYoutubeChannel `json:"data"`
}
// apiYoutubeChannel is one channel job as returned by the /yt/jobs endpoint.
type apiYoutubeChannel struct {
	ChannelId   string `json:"channel_id"`
	TotalVideos uint   `json:"total_videos"`
	// DesiredChannelName is used as the LBRY channel name of the resulting Sync.
	DesiredChannelName string `json:"desired_channel_name"`
	// SyncServer is null when no server is assigned; otherwise it is compared
	// against SyncManager.HostName to decide whether this host handles the job.
	SyncServer null.String `json:"sync_server"`
}
// fetchChannels queries the API's /yt/jobs endpoint for the channels in the
// given sync status, filtered by the manager's time window (SyncFrom/SyncUntil)
// and, when set, by YoutubeChannelID.
//
// It returns an error when the request fails, the body cannot be read or
// decoded, or the response carries no data.
func (s SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
	endpoint := s.ApiURL + "/yt/jobs"
	res, err := http.PostForm(endpoint, url.Values{
		"auth_token":  {s.ApiToken},
		"sync_status": {status},
		"min_videos":  {strconv.Itoa(1)},
		// FormatInt keeps the full int64 range; converting through int would
		// truncate on 32-bit platforms.
		"after":  {strconv.FormatInt(s.SyncFrom, 10)},
		"before": {strconv.FormatInt(s.SyncUntil, 10)},
		//"sync_server": {s.HostName},
		"channel_id": {s.YoutubeChannelID},
	})
	if err != nil {
		// res is nil here, so it must not be touched.
		return nil, errors.Err(err)
	}
	defer res.Body.Close()

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, errors.Err(err)
	}

	var response apiJobsResponse
	err = json.Unmarshal(body, &response)
	if err != nil {
		return nil, err
	}
	if response.Data == nil {
		if response.Error.IsNull() {
			return nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
		}
		// Report the message itself rather than the null.String wrapper struct.
		return nil, errors.Err(response.Error.String)
	}
	log.Printf("Fetched channels: %d", len(response.Data))
	return response.Data, nil
}
// apiSyncUpdateResponse is the JSON envelope decoded from the /yt/sync_update
// and /yt/track_video endpoints.
type apiSyncUpdateResponse struct {
	Success bool `json:"success"`
	// Error, when not null, is returned to the caller as the failure reason.
	Error null.String `json:"error"`
	// Data must be the string "ok" for the call to count as successful.
	Data null.String `json:"data"`
}
// setChannelSyncStatus reports the sync status of channelID to the API's
// /yt/sync_update endpoint, tagging the update with this manager's HostName.
//
// It returns nil only when the API answers with data "ok". Any transport,
// decoding or API-reported failure is returned as an error.
func (s SyncManager) setChannelSyncStatus(channelID string, status string) error {
	endpoint := s.ApiURL + "/yt/sync_update"
	res, err := http.PostForm(endpoint, url.Values{
		"channel_id":  {channelID},
		"sync_server": {s.HostName},
		"auth_token":  {s.ApiToken},
		"sync_status": {status},
	})
	if err != nil {
		// res is nil here, so it must not be touched.
		return errors.Err(err)
	}
	defer res.Body.Close()

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return errors.Err(err)
	}

	var response apiSyncUpdateResponse
	err = json.Unmarshal(body, &response)
	if err != nil {
		return err
	}
	if !response.Error.IsNull() {
		return errors.Err(response.Error.String)
	}
	if !response.Data.IsNull() && response.Data.String == "ok" {
		return nil
	}
	return errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
// Video statuses accepted by MarkVideoStatus.
const (
	VideoStatusPublished = "published"
	// NOTE(review): the double "S" looks like a typo for VideoStatusFailed; the
	// name is exported, so renaming it needs a coordinated change at call sites.
	VideoSStatusFailed = "failed"
)
// MarkVideoStatus records the outcome of syncing a single video through the
// API's /yt/track_video endpoint.
//
// When status is VideoStatusPublished, claimID and claimName are mandatory and
// are sent together with the current time as "published_at". details is sent
// only when non-empty.
//
// It returns nil only when the API answers with data "ok". Any transport,
// decoding or API-reported failure is returned as an error.
func (s SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, details string) error {
	endpoint := s.ApiURL + "/yt/track_video"

	vals := url.Values{
		"youtube_channel_id": {channelID},
		"youtube_video_id":   {videoID},
		"status":             {status},
		"auth_token":         {s.ApiToken},
	}
	if status == VideoStatusPublished {
		if claimID == "" || claimName == "" {
			return errors.Err("claimID or claimName missing")
		}
		vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
		vals.Add("claim_id", claimID)
		vals.Add("claim_name", claimName)
	}
	if details != "" {
		vals.Add("details", details)
	}

	res, err := http.PostForm(endpoint, vals)
	if err != nil {
		// res is nil here, so it must not be touched.
		return errors.Err(err)
	}
	defer res.Body.Close()

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return errors.Err(err)
	}

	var response apiSyncUpdateResponse
	err = json.Unmarshal(body, &response)
	if err != nil {
		return err
	}
	if !response.Error.IsNull() {
		return errors.Err(response.Error.String)
	}
	if !response.Data.IsNull() && response.Data.String == "ok" {
		return nil
	}
	return errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
// Start runs the sync session.
//
// Each round it checks disk usage, collects the channels to process and runs a
// full sync cycle for every one of them:
//   - when YoutubeChannelID is set, exactly that channel is fetched and synced
//     once, after which Start returns;
//   - otherwise channels are pulled from the queue named by SyncStatus, or, if
//     that is empty, from syncing+synced (SyncUpdate) or syncing+queued.
//
// When a round yields no channels the loop sleeps five minutes and tries again.
// The loop ends when a sync reports it was interrupted or when Limit (if
// non-zero) syncs have been performed.
//
// An error is returned when the space check or a channel fetch fails, or when a
// sync fails with one of the known fatal errors; other sync errors are reported
// to Slack and the loop continues.
func (s SyncManager) Start() error {
	// Error fragments that cannot be recovered from without manual intervention.
	fatalErrors := []string{
		"default_wallet already exists",
		"WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR",
		"NotEnoughFunds",
		"no space left on device",
	}

	// newSync builds the job for one channel from the manager's settings.
	// A pointer is returned so the Sync (which embeds a mutex and a wait group)
	// is never copied after construction.
	newSync := func(channelID, lbryChannelName string) *Sync {
		return &Sync{
			YoutubeAPIKey:           s.YoutubeAPIKey,
			YoutubeChannelID:        channelID,
			LbryChannelName:         lbryChannelName,
			StopOnError:             s.StopOnError,
			MaxTries:                s.MaxTries,
			ConcurrentVideos:        s.ConcurrentVideos,
			TakeOverExistingChannel: s.TakeOverExistingChannel,
			Refill:                  s.Refill,
			Manager:                 &s,
		}
	}

	syncCount := 0
	for {
		if err := s.checkUsedSpace(); err != nil {
			return err
		}

		var syncs []*Sync
		shouldInterruptLoop := false

		if s.YoutubeChannelID != "" {
			// Single-channel mode: one job, one pass.
			channels, err := s.fetchChannels("")
			if err != nil {
				return err
			}
			if len(channels) != 1 {
				return errors.Err("Expected 1 channel, %d returned", len(channels))
			}
			if !s.isWorthProcessing(channels[0]) {
				break
			}
			syncs = append(syncs, newSync(s.YoutubeChannelID, channels[0].DesiredChannelName))
			shouldInterruptLoop = true
		} else {
			var queuesToSync []string
			if s.SyncStatus != "" {
				queuesToSync = append(queuesToSync, s.SyncStatus)
			} else if s.SyncUpdate {
				queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced)
			} else {
				queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued)
			}
			for _, q := range queuesToSync {
				channels, err := s.fetchChannels(q)
				if err != nil {
					return err
				}
				for _, c := range channels {
					if !s.isWorthProcessing(c) {
						continue
					}
					syncs = append(syncs, newSync(c.ChannelId, c.DesiredChannelName))
				}
			}
		}

		if len(syncs) == 0 {
			log.Infoln("No channels to sync. Pausing 5 minutes!")
			time.Sleep(5 * time.Minute)
		}

		for i, channelSync := range syncs {
			// Slack notifications are best effort; their errors are deliberately ignored.
			// i+1 so the progress reads 1/N..N/N instead of 0/N..(N-1)/N.
			SendInfoToSlack("Syncing %s to LBRY! (iteration %d/%d - total session iterations: %d)", channelSync.LbryChannelName, i+1, len(syncs), syncCount)
			err := channelSync.FullCycle()
			if err != nil {
				if util.SubstringInSlice(err.Error(), fatalErrors) {
					return errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err)
				}
				SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
			}
			SendInfoToSlack("Syncing %s reached an end. (Iteration %d/%d - total session iterations: %d)", channelSync.LbryChannelName, i+1, len(syncs), syncCount)
			syncCount++
			if channelSync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
				shouldInterruptLoop = true
				break
			}
		}
		if shouldInterruptLoop {
			break
		}
	}
	return nil
}
// isWorthProcessing reports whether this host should sync the given channel:
// the channel must have at least one video and must either be unassigned or
// assigned to this manager's HostName.
func (s SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
	if channel.TotalVideos == 0 {
		return false
	}
	if channel.SyncServer.IsNull() {
		return true
	}
	return channel.SyncServer.String == s.HostName
}
// checkUsedSpace measures how full the filesystem holding BlobsDir is.
// It fails when usage is at or above 90% (unless SkipSpaceCheck is set) or when
// the usage cannot be determined; otherwise the usage is reported to Slack.
func (s SyncManager) checkUsedSpace() error {
	usedFraction, err := GetUsedSpace(s.BlobsDir)
	if err != nil {
		return err
	}
	usedPercent := usedFraction * 100
	if !s.SkipSpaceCheck && usedFraction >= 0.90 {
		return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPercent))
	}
	SendInfoToSlack("disk usage: %.1f%%", usedPercent)
	return nil
}
// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path.
//
// The ratio is computed from block counts (total blocks minus free blocks over
// total blocks). A filesystem that reports zero total blocks yields 0 rather
// than NaN. An error is returned when the path cannot be stat'ed.
func GetUsedSpace(path string) (float32, error) {
	var stat syscall.Statfs_t
	if err := syscall.Statfs(path, &stat); err != nil {
		return 0, err
	}
	// Nothing to measure; avoid a 0/0 division, whose NaN result would compare
	// false against any threshold and silently pass space checks.
	if stat.Blocks == 0 {
		return 0, nil
	}
	// The block size is a common factor of both terms, so it cancels out of the
	// ratio; leaving it out also removes any risk of overflowing the product.
	used := stat.Blocks - stat.Bfree
	return float32(float64(used) / float64(stat.Blocks)), nil
}

View file

@ -13,6 +13,9 @@ import (
) )
func (s *Sync) walletSetup() error { func (s *Sync) walletSetup() error {
//prevent unnecessary concurrent execution
s.mux.Lock()
defer s.mux.Unlock()
err := s.ensureChannelOwnership() err := s.ensureChannelOwnership()
if err != nil { if err != nil {
return err return err
@ -37,6 +40,9 @@ func (s *Sync) walletSetup() error {
} }
} }
log.Debugf("Source channel has %d videos", numOnSource) log.Debugf("Source channel has %d videos", numOnSource)
if numOnSource == 0 {
return nil
}
numPublished, err := s.daemon.NumClaimsInChannel(s.LbryChannelName) numPublished, err := s.daemon.NumClaimsInChannel(s.LbryChannelName)
if err != nil { if err != nil {
@ -44,9 +50,15 @@ func (s *Sync) walletSetup() error {
} }
log.Debugf("We already published %d videos", numPublished) log.Debugf("We already published %d videos", numPublished)
minBalance := (float64(numOnSource)-float64(numPublished))*publishAmount + channelClaimAmount if float64(numOnSource)-float64(numPublished) > maximumVideosToPublish {
numOnSource = maximumVideosToPublish
}
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
if numPublished > numOnSource {
SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource)
minBalance = 1 //since we ended up in this function it means some juice is still needed
}
amountToAdd, _ := decimal.NewFromFloat(minBalance).Sub(balance).Float64() amountToAdd, _ := decimal.NewFromFloat(minBalance).Sub(balance).Float64()
amountToAdd *= 1.5 // add 50% margin for fees, future publishes, etc
if s.Refill > 0 { if s.Refill > 0 {
if amountToAdd < 0 { if amountToAdd < 0 {
@ -83,6 +95,7 @@ func (s *Sync) walletSetup() error {
} }
func (s *Sync) ensureEnoughUTXOs() error { func (s *Sync) ensureEnoughUTXOs() error {
utxolist, err := s.daemon.UTXOList() utxolist, err := s.daemon.UTXOList()
if err != nil { if err != nil {
return err return err
@ -92,10 +105,13 @@ func (s *Sync) ensureEnoughUTXOs() error {
if !allUTXOsConfirmed(utxolist) { if !allUTXOsConfirmed(utxolist) {
log.Println("Waiting for previous txns to confirm") // happens if you restarted the daemon soon after a previous publish run log.Println("Waiting for previous txns to confirm") // happens if you restarted the daemon soon after a previous publish run
s.waitUntilUTXOsConfirmed() err := s.waitUntilUTXOsConfirmed()
if err != nil {
return err
}
} }
target := 60 target := 40
count := 0 count := 0
for _, utxo := range *utxolist { for _, utxo := range *utxolist {
@ -140,6 +156,7 @@ func (s *Sync) ensureEnoughUTXOs() error {
} }
func (s *Sync) waitUntilUTXOsConfirmed() error { func (s *Sync) waitUntilUTXOsConfirmed() error {
origin := time.Now()
for { for {
r, err := s.daemon.UTXOList() r, err := s.daemon.UTXOList()
if err != nil { if err != nil {
@ -151,7 +168,11 @@ func (s *Sync) waitUntilUTXOsConfirmed() error {
if allUTXOsConfirmed(r) { if allUTXOsConfirmed(r) {
return nil return nil
} }
if time.Now().After(origin.Add(15 * time.Minute)) {
//lbryum is messing with us or something. restart the daemon
//this could also be a very long block
SendErrorToSlack("We've been waiting UTXOs confirmation for %s... and this isn't normal", time.Now().Sub(origin).String())
}
wait := 30 * time.Second wait := 30 * time.Second
log.Println("Waiting " + wait.String() + "...") log.Println("Waiting " + wait.String() + "...")
time.Sleep(wait) time.Sleep(wait)
@ -266,6 +287,7 @@ func (s *Sync) addCredits(amountToAdd float64) error {
log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new transaction") log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new transaction")
time.Sleep(wait) time.Sleep(wait)
log.Println("Waiting for transaction to be confirmed") return nil
return s.waitUntilUTXOsConfirmed() //log.Println("Waiting for transaction to be confirmed")
//return s.waitUntilUTXOsConfirmed()
} }

View file

@ -1,17 +1,26 @@
package sources package sources
import ( import (
"fmt"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"crypto/md5"
"encoding/hex"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`) var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`)
// SyncSummary describes a successfully published claim.
type SyncSummary struct {
	// ClaimID is the claim ID taken from the daemon's publish response.
	ClaimID string
	// ClaimName is the name the content was published under.
	ClaimName string
}
func getClaimNameFromTitle(title string, attempt int) string { func getClaimNameFromTitle(title string, attempt int) string {
suffix := "" suffix := ""
if attempt > 1 { if attempt > 1 {
@ -43,7 +52,7 @@ func getClaimNameFromTitle(title string, attempt int) string {
var publishedNamesMutex sync.RWMutex var publishedNamesMutex sync.RWMutex
var publishedNames = map[string]bool{} var publishedNames = map[string]bool{}
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) error { func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions) (*SyncSummary, error) {
attempt := 0 attempt := 0
for { for {
attempt++ attempt++
@ -56,20 +65,26 @@ func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string
log.Printf("name exists, retrying (%d attempts so far)\n", attempt) log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
continue continue
} }
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
if len(name) < 2 {
hasher := md5.New()
hasher.Write([]byte(title))
name = fmt.Sprintf("%s-%d", hex.EncodeToString(hasher.Sum(nil))[:15], attempt)
}
_, err := daemon.Publish(name, filename, amount, options) response, err := daemon.Publish(name, filename, amount, options)
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") { if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
publishedNamesMutex.Lock() publishedNamesMutex.Lock()
publishedNames[name] = true publishedNames[name] = true
publishedNamesMutex.Unlock() publishedNamesMutex.Unlock()
if err == nil { if err == nil {
return nil return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
} else { } else {
log.Printf("name exists, retrying (%d attempts so far)\n", attempt) log.Printf("name exists, retrying (%d attempts so far)\n", attempt)
continue continue
} }
} else { } else {
return err return nil, err
} }
} }
} }

View file

@ -170,7 +170,7 @@ func (v ucbVideo) saveThumbnail() error {
return err return err
} }
func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error { func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) {
options := jsonrpc.PublishOptions{ options := jsonrpc.PublishOptions{
Title: &v.title, Title: &v.title,
Author: strPtr("UC Berkeley"), Author: strPtr("UC Berkeley"),
@ -188,11 +188,11 @@ func (v ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount fl
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
} }
func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error { func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) {
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {
return errors.Prefix("download error", err) return nil, errors.Prefix("download error", err)
} }
log.Debugln("Downloaded " + v.id) log.Debugln("Downloaded " + v.id)
@ -202,10 +202,10 @@ func (v ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float
//} //}
//log.Debugln("Created thumbnail for " + v.id) //log.Debugln("Created thumbnail for " + v.id)
err = v.publish(daemon, claimAddress, amount, channelName) summary, err := v.publish(daemon, claimAddress, amount, channelName)
if err != nil { if err != nil {
return errors.Prefix("publish error", err) return nil, errors.Prefix("publish error", err)
} }
return nil return summary, nil
} }

View file

@ -59,11 +59,7 @@ func (v YoutubeVideo) PublishedAt() time.Time {
} }
func (v YoutubeVideo) getFilename() string { func (v YoutubeVideo) getFilename() string {
return v.dir + "/" + v.getClaimName() + ".mp4" maxLen := 30
}
func (v YoutubeVideo) getClaimName() string {
maxLen := 40
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(v.title, "-"), "-")), "-") chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(v.title, "-"), "-")), "-")
@ -83,8 +79,10 @@ func (v YoutubeVideo) getClaimName() string {
} }
name = tmpName name = tmpName
} }
if len(name) < 1 {
return name name = v.id
}
return v.videoDir() + "/" + name + ".mp4"
} }
func (v YoutubeVideo) getAbbrevDescription() string { func (v YoutubeVideo) getAbbrevDescription() string {
@ -99,7 +97,12 @@ func (v YoutubeVideo) getAbbrevDescription() string {
func (v YoutubeVideo) download() error { func (v YoutubeVideo) download() error {
videoPath := v.getFilename() videoPath := v.getFilename()
_, err := os.Stat(videoPath) err := os.Mkdir(v.videoDir(), 0750)
if err != nil && !strings.Contains(err.Error(), "file exists") {
return errors.Wrap(err, 0)
}
_, err = os.Stat(videoPath)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
return err return err
} else if err == nil { } else if err == nil {
@ -114,7 +117,7 @@ func (v YoutubeVideo) download() error {
} }
var downloadedFile *os.File var downloadedFile *os.File
downloadedFile, err = os.Create(v.getFilename()) downloadedFile, err = os.Create(videoPath)
if err != nil { if err != nil {
return err return err
} }
@ -124,10 +127,15 @@ func (v YoutubeVideo) download() error {
return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[0], downloadedFile) return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[0], downloadedFile)
} }
// videoDir returns the per-video directory: the video's ID appended to the
// base directory, joined with a forward slash.
func (v YoutubeVideo) videoDir() string {
	dir := v.dir + "/" + v.id
	return dir
}
func (v YoutubeVideo) delete() error { func (v YoutubeVideo) delete() error {
videoPath := v.getFilename() videoPath := v.getFilename()
err := os.Remove(videoPath) err := os.Remove(videoPath)
if err != nil { if err != nil {
log.Errorln(errors.Prefix("delete error", err))
return err return err
} }
log.Debugln(v.id + " deleted from disk (" + videoPath + ")") log.Debugln(v.id + " deleted from disk (" + videoPath + ")")
@ -177,7 +185,7 @@ func (v YoutubeVideo) triggerThumbnailSave() error {
func strPtr(s string) *string { return &s } func strPtr(s string) *string { return &s }
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error { func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) {
options := jsonrpc.PublishOptions{ options := jsonrpc.PublishOptions{
Title: &v.title, Title: &v.title,
Author: &v.channelTitle, Author: &v.channelTitle,
@ -186,6 +194,7 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun
ClaimAddress: &claimAddress, ClaimAddress: &claimAddress,
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id), Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
License: strPtr("Copyrighted (contact author)"), License: strPtr("Copyrighted (contact author)"),
ChangeAddress: &claimAddress,
} }
if channelName != "" { if channelName != "" {
options.ChannelName = &channelName options.ChannelName = &channelName
@ -194,33 +203,38 @@ func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amoun
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options) return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options)
} }
func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error { func (v YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) (*SyncSummary, error) {
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {
return errors.Prefix("download error", err) return nil, errors.Prefix("download error", err)
} }
log.Debugln("Downloaded " + v.id) log.Debugln("Downloaded " + v.id)
fi, err := os.Stat(v.getFilename())
if err != nil {
return nil, err
}
if fi.Size() > 2*1024*1024*1024 {
//delete the video and ignore the error
_ = v.delete()
return nil, errors.Err("video is bigger than 2GB, skipping for now")
}
err = v.triggerThumbnailSave() err = v.triggerThumbnailSave()
if err != nil { if err != nil {
return errors.Prefix("thumbnail error", err) return nil, errors.Prefix("thumbnail error", err)
} }
log.Debugln("Created thumbnail for " + v.id) log.Debugln("Created thumbnail for " + v.id)
err = v.publish(daemon, claimAddress, amount, channelName) summary, err := v.publish(daemon, claimAddress, amount, channelName)
//delete the video in all cases (and ignore the error)
_ = v.delete()
if err != nil { if err != nil {
return errors.Prefix("publish error", err) return nil, errors.Prefix("publish error", err)
} }
err = v.delete() return summary, nil
if err != nil {
// the video was published anyway so it should be marked as published
// for that to happen, no errors should be returned any further than here
log.Debugln(errors.Prefix("delete error", err))
}
return nil
} }
// sorting videos // sorting videos

View file

@ -4,6 +4,7 @@ import (
"bufio" "bufio"
"encoding/csv" "encoding/csv"
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -19,9 +20,9 @@ import (
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/redisdb" "github.com/lbryio/lbry.go/ytsync/redisdb"
"github.com/lbryio/lbry.go/ytsync/sources" "github.com/lbryio/lbry.go/ytsync/sources"
"github.com/mitchellh/go-ps" "github.com/mitchellh/go-ps"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport" "google.golang.org/api/googleapi/transport"
@ -31,6 +32,7 @@ import (
const ( const (
channelClaimAmount = 0.01 channelClaimAmount = 0.01
publishAmount = 0.01 publishAmount = 0.01
maximumVideosToPublish = 1000
) )
type video interface { type video interface {
@ -38,7 +40,7 @@ type video interface {
IDAndNum() string IDAndNum() string
PlaylistPosition() int PlaylistPosition() int
PublishedAt() time.Time PublishedAt() time.Time
Sync(*jsonrpc.Client, string, float64, string) error Sync(*jsonrpc.Client, string, float64, string) (*sources.SyncSummary, error)
} }
// sorting videos // sorting videos
@ -58,6 +60,7 @@ type Sync struct {
ConcurrentVideos int ConcurrentVideos int
TakeOverExistingChannel bool TakeOverExistingChannel bool
Refill int Refill int
Manager *SyncManager
daemon *jsonrpc.Client daemon *jsonrpc.Client
claimAddress string claimAddress string
@ -66,25 +69,79 @@ type Sync struct {
grp *stop.Group grp *stop.Group
mux sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
queue chan video queue chan video
} }
func (s *Sync) FullCycle() error { // SendErrorToSlack Sends an error message to the default channel and to the process log.
var err error func SendErrorToSlack(format string, a ...interface{}) error {
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
log.Errorln(message)
return util.SendToSlack(":sos: " + message)
}
// SendInfoToSlack writes an informational message to the process log and then
// forwards it to Slack through util.SendToSlack, prefixed with the
// ":information_source:" emoji.
//
// format is expanded with fmt.Sprintf only when at least one argument is
// supplied; with no arguments it is used verbatim, so a literal '%' in a plain
// message is not treated as a formatting verb.
//
// The returned error is whatever util.SendToSlack returns.
func SendInfoToSlack(format string, a ...interface{}) error {
	var text string
	if len(a) == 0 {
		text = format
	} else {
		text = fmt.Sprintf(format, a...)
	}

	log.Infoln(text)
	return util.SendToSlack(":information_source: " + text)
}
// IsInterrupted reports whether the sync process has been interrupted.
//
// It performs a non-blocking receive on the channel returned by s.grp.Ch():
// if a receive is immediately possible the sync is considered interrupted,
// otherwise the method returns false right away.
// NOTE(review): presumably that channel is closed when the stop group is
// stopped — confirm against the stop package.
func (s *Sync) IsInterrupted() bool {
	interrupted := false
	select {
	case <-s.grp.Ch():
		interrupted = true
	default:
		// nothing ready on the channel: not interrupted
	}
	return interrupted
}
func (s *Sync) FullCycle() (e error) {
if os.Getenv("HOME") == "" { if os.Getenv("HOME") == "" {
return errors.Err("no $HOME env var found") return errors.Err("no $HOME env var found")
} }
if s.YoutubeChannelID == "" { if s.YoutubeChannelID == "" {
channelID, err := getChannelIDFromFile(s.LbryChannelName) return errors.Err("channel ID not provided")
}
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing)
if err != nil { if err != nil {
return err return err
} }
s.YoutubeChannelID = channelID defer func() {
if e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{
"this youtube channel is being managed by another server",
} }
if util.SubstringInSlice(e.Error(), noFailConditions) {
return
}
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err)
e = errors.Prefix(err.Error(), e)
}
} else if !s.IsInterrupted() {
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced)
if err != nil {
e = err
}
}
}()
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
if os.Getenv("REGTEST") == "true" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
}
walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1) walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1)
if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) { if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
@ -103,16 +160,14 @@ func (s *Sync) FullCycle() error {
log.Printf("Stopping daemon") log.Printf("Stopping daemon")
shutdownErr := stopDaemonViaSystemd() shutdownErr := stopDaemonViaSystemd()
if shutdownErr != nil { if shutdownErr != nil {
log.Errorf("error shutting down daemon: %v", shutdownErr) logShutdownError(shutdownErr)
log.Errorf("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
} else { } else {
// the cli will return long before the daemon effectively stops. we must observe the processes running // the cli will return long before the daemon effectively stops. we must observe the processes running
// before moving the wallet // before moving the wallet
var waitTimeout time.Duration = 60 * 6 var waitTimeout time.Duration = 60 * 8
processDeathError := waitForDaemonProcess(waitTimeout) processDeathError := waitForDaemonProcess(waitTimeout)
if processDeathError != nil { if processDeathError != nil {
log.Errorf("error shutdown down daemon: %v", processDeathError) logShutdownError(processDeathError)
log.Errorf("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
} else { } else {
walletErr := os.Rename(defaultWalletDir, walletBackupDir) walletErr := os.Rename(defaultWalletDir, walletBackupDir)
if walletErr != nil { if walletErr != nil {
@ -147,7 +202,7 @@ func (s *Sync) FullCycle() error {
log.Infoln("Waiting for daemon to finish starting...") log.Infoln("Waiting for daemon to finish starting...")
s.daemon = jsonrpc.NewClient("") s.daemon = jsonrpc.NewClient("")
s.daemon.SetRPCTimeout(5 * time.Minute) s.daemon.SetRPCTimeout(40 * time.Minute)
WaitForDaemonStart: WaitForDaemonStart:
for { for {
@ -175,13 +230,17 @@ WaitForDaemonStart:
return nil return nil
} }
// logShutdownError reports a failed daemon shutdown through SendErrorToSlack:
// first the shutdown error itself, then a warning that the wallet has not been
// moved to the wallet backup directory.
func logShutdownError(shutdownErr error) {
	// Reporting is best effort: a failure to deliver either message is
	// deliberately discarded so it cannot mask the shutdown problem itself.
	_ = SendErrorToSlack("error shutting down daemon: %v", shutdownErr)
	_ = SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
}
func (s *Sync) doSync() error { func (s *Sync) doSync() error {
var err error var err error
err = s.walletSetup() err = s.walletSetup()
if err != nil { if err != nil {
return err return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err)
} }
if s.StopOnError { if s.StopOnError {
@ -235,30 +294,53 @@ func (s *Sync) startWorker(workerNum int) {
err := s.processVideo(v) err := s.processVideo(v)
if err != nil { if err != nil {
log.Errorln("error processing video: " + err.Error()) logMsg := fmt.Sprintf("error processing video: " + err.Error())
if s.StopOnError { log.Errorln(logMsg)
fatalErrors := []string{
":5279: read: connection reset by peer",
"no space left on device",
"NotEnoughFunds",
"Cannot publish using channel",
}
if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError {
s.grp.Stop() s.grp.Stop()
} else if s.MaxTries > 1 { } else if s.MaxTries > 1 {
if strings.Contains(err.Error(), "non 200 status code received") || errorsNoRetry := []string{
strings.Contains(err.Error(), " reason: 'This video contains content from") || "non 200 status code received",
strings.Contains(err.Error(), "dont know which claim to update") || " reason: 'This video contains content from",
strings.Contains(err.Error(), "uploader has not made this video available in your country") || "dont know which claim to update",
strings.Contains(err.Error(), "download error: AccessDenied: Access Denied") || "uploader has not made this video available in your country",
strings.Contains(err.Error(), "Playback on other websites has been disabled by the video owner") { "download error: AccessDenied: Access Denied",
"Playback on other websites has been disabled by the video owner",
"Error in daemon: Cannot publish empty file",
"Error extracting sts from embedded url response",
"Client.Timeout exceeded while awaiting headers)",
"video is bigger than 2GB, skipping for now",
}
if util.SubstringInSlice(err.Error(), errorsNoRetry) {
log.Println("This error should not be retried at all") log.Println("This error should not be retried at all")
} else if tryCount < s.MaxTries { } else if tryCount < s.MaxTries {
if strings.Contains(err.Error(), "The transaction was rejected by network rules.(258: txn-mempool-conflict)") { if strings.Contains(err.Error(), "txn-mempool-conflict") ||
strings.Contains(err.Error(), "failed: Not enough funds") ||
strings.Contains(err.Error(), "Error in daemon: Insufficient funds, please deposit additional LBC") ||
strings.Contains(err.Error(), "too-long-mempool-chain") {
log.Println("waiting for a block and refilling addresses before retrying") log.Println("waiting for a block and refilling addresses before retrying")
err = s.ensureEnoughUTXOs() err = s.walletSetup()
if err != nil { if err != nil {
log.Println(err.Error()) s.grp.Stop()
SendErrorToSlack("Failed to setup the wallet for a refill: %v", err)
break
} }
} }
log.Println("Retrying") log.Println("Retrying")
continue continue
} }
log.Printf("Video failed after %d retries, skipping", s.MaxTries) SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
} }
/*err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoSStatusFailed, "", "", err.Error())
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}*/
} }
break break
} }
@ -312,8 +394,6 @@ func (s *Sync) enqueueYoutubeVideos() error {
} }
for _, item := range playlistResponse.Items { for _, item := range playlistResponse.Items {
// todo: there's thumbnail info here. why did we need lambda???
// normally we'd send the video into the channel here, but youtube api doesn't have sorting // normally we'd send the video into the channel here, but youtube api doesn't have sorting
// so we have to get ALL the videos, then sort them, then send them in // so we have to get ALL the videos, then sort them, then send them in
videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item.Snippet)) videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item.Snippet))
@ -424,15 +504,18 @@ func (s *Sync) processVideo(v video) (err error) {
return nil return nil
} }
if v.PlaylistPosition() > 3000 { if v.PlaylistPosition() > maximumVideosToPublish {
log.Println(v.ID() + " is old: skipping") log.Println(v.ID() + " is old: skipping")
return nil return nil
} }
err = v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName) _, err = v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName)
if err != nil { if err != nil {
return err return err
} }
/*err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "")
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}*/
err = s.db.SetPublished(v.ID()) err = s.db.SetPublished(v.ID())
if err != nil { if err != nil {
return err return err
@ -457,26 +540,6 @@ func stopDaemonViaSystemd() error {
return nil return nil
} }
// getChannelIDFromFile resolves channelName to a channel ID using the JSON
// object stored in the file "./channels" (relative to the working directory).
// The file is expected to decode into a map of channel name to channel ID.
//
// It returns a wrapped error when the file cannot be read or decoded, and
// "channel not in list" when the file has no entry for channelName.
func getChannelIDFromFile(channelName string) (string, error) {
	raw, readErr := ioutil.ReadFile("./channels")
	if readErr != nil {
		return "", errors.Wrap(readErr, 0)
	}

	idsByName := make(map[string]string)
	if decodeErr := json.Unmarshal(raw, &idsByName); decodeErr != nil {
		return "", errors.Wrap(decodeErr, 0)
	}

	if id, found := idsByName[channelName]; found {
		return id, nil
	}
	return "", errors.Err("channel not in list")
}
// waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up // waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up
func waitForDaemonProcess(timeout time.Duration) error { func waitForDaemonProcess(timeout time.Duration) error {
processes, err := ps.Processes() processes, err := ps.Processes()