package manager import ( "fmt" "io/ioutil" "os" "os/signal" "runtime/debug" "strconv" "strings" "sync" "syscall" "time" "github.com/lbryio/ytsync/v5/ip_manager" "github.com/lbryio/ytsync/v5/namer" "github.com/lbryio/ytsync/v5/sdk" "github.com/lbryio/ytsync/v5/shared" "github.com/lbryio/ytsync/v5/sources" "github.com/lbryio/ytsync/v5/thumbs" "github.com/lbryio/ytsync/v5/timing" logUtils "github.com/lbryio/ytsync/v5/util" "github.com/lbryio/ytsync/v5/ytapi" "github.com/vbauerster/mpb/v7" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/jsonrpc" "github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/extras/util" log "github.com/sirupsen/logrus" ) const ( channelClaimAmount = 0.01 estimatedMaxTxFee = 0.1 minimumAccountBalance = 1.0 minimumRefillAmount = 1 publishAmount = 0.01 maxReasonLength = 500 ) // Sync stores the options that control how syncing happens type Sync struct { DbChannelData *shared.YoutubeChannel Manager *SyncManager daemon *jsonrpc.Client videoDirectory string syncedVideosMux *sync.RWMutex syncedVideos map[string]sdk.SyncedVideo grp *stop.Group namer *namer.Namer walletMux *sync.RWMutex queue chan ytapi.Video defaultAccountID string hardVideoFailure hardVideoFailure progressBarWg *sync.WaitGroup progressBar *mpb.Progress } type hardVideoFailure struct { lock *sync.Mutex failed bool failureReason string } func (hv *hardVideoFailure) flagFailure(reason string) { hv.lock.Lock() defer hv.lock.Unlock() if hv.failed { return } hv.failed = true hv.failureReason = reason } func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string, claimID string, metadataVersion int8, size int64) { s.syncedVideosMux.Lock() defer s.syncedVideosMux.Unlock() s.syncedVideos[videoID] = sdk.SyncedVideo{ VideoID: videoID, Published: published, FailureReason: failureReason, ClaimID: claimID, ClaimName: claimName, MetadataVersion: metadataVersion, Size: size, } } // IsInterrupted can be queried to discover if the sync process was interrupted manually func (s *Sync) IsInterrupted() bool { select { case <-s.grp.Ch(): return true default: return false } } func (s *Sync) setStatusSyncing() error { syncedVideos, claimNames, err := s.Manager.ApiConfig.SetChannelStatus(s.DbChannelData.ChannelId, shared.StatusSyncing, "", nil) if err != nil { return err } s.syncedVideosMux.Lock() s.syncedVideos = syncedVideos s.namer.SetNames(claimNames) s.syncedVideosMux.Unlock() return nil } var stopGroup = stop.New() func (s *Sync) FullCycle() (e error) { if os.Getenv("HOME") == "" { return errors.Err("no $HOME env var found") } defer timing.ClearTimings() s.syncedVideosMux = &sync.RWMutex{} s.walletMux = &sync.RWMutex{} s.grp = stopGroup s.queue = make(chan ytapi.Video) interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) defer signal.Stop(interruptChan) go func() { <-interruptChan util.SendToSlack("got interrupt, shutting down") log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") s.grp.Stop() time.Sleep(5 * time.Second) }() err := s.setStatusSyncing() if err != nil { return err } defer s.setChannelTerminationStatus(&e) err = s.downloadWallet() if err != nil && err.Error() != "wallet not on S3" { return errors.Prefix("failure in downloading wallet", err) } else if err == nil { log.Println("Continuing previous upload") } else { log.Println("Starting new wallet") } err = s.downloadBlockchainDB() if err != nil { return errors.Prefix("failure in downloading blockchain.db", err) } defer s.stopAndUploadWallet(&e) s.videoDirectory, err = ioutil.TempDir(os.Getenv("TMP_DIR"), "ytsync") if err != nil { return errors.Wrap(err, 0) } err = os.Chmod(s.videoDirectory, 0766) if err != nil { return errors.Err(err) } defer deleteSyncFolder(s.videoDirectory) log.Printf("Starting daemon") err = logUtils.StartDaemon() if err != nil { return err } log.Infoln("Waiting for daemon to finish starting...") s.daemon = jsonrpc.NewClient(os.Getenv("LBRYNET_ADDRESS")) s.daemon.SetRPCTimeout(5 * time.Minute) err = s.waitForDaemonStart() if err != nil { return err } s.progressBarWg = &sync.WaitGroup{} s.progressBar = mpb.New(mpb.WithWaitGroup(s.progressBarWg)) err = s.doSync() // Waiting for passed &wg and for all bars to complete and flush s.progressBar.Wait() if err != nil { return err } if s.shouldTransfer() { err = s.processTransfers() } timing.Report() return err } func (s *Sync) processTransfers() (e error) { log.Println("Processing transfers") if s.DbChannelData.TransferState != 2 { err := waitConfirmations(s) if err != nil { return err } } supportAmount, err := abandonSupports(s) if err != nil { return errors.Prefix(fmt.Sprintf("%.6f LBCs were abandoned before failing", supportAmount), err) } if supportAmount > 0 { logUtils.SendInfoToSlack("(%s) %.6f LBCs were abandoned and should be used as support", s.DbChannelData.ChannelId, supportAmount) } err = transferVideos(s) if err != nil { return err } err = transferChannel(s) if err != nil { return err } defaultAccount, err := s.getDefaultAccount() if err != nil { return err } reallocateSupports := supportAmount > 0.01 if reallocateSupports { err = waitConfirmations(s) if err != nil { return err } isTip := true summary, err := s.daemon.SupportCreate(s.DbChannelData.ChannelClaimID, fmt.Sprintf("%.6f", supportAmount), &isTip, nil, []string{defaultAccount}, nil) if err != nil { if strings.Contains(err.Error(), "tx-size") { //TODO: this is a silly workaround... _, spendErr := s.daemon.TxoSpend(util.PtrToString("other"), nil, nil, nil, nil, &s.defaultAccountID) if spendErr != nil { return errors.Prefix(fmt.Sprintf("something went wrong while tipping the channel for %.6f LBCs", supportAmount), err) } err = s.waitForNewBlock() if err != nil { return errors.Prefix(fmt.Sprintf("something went wrong while tipping the channel for %.6f LBCs (waiting for new block)", supportAmount), err) } summary, err = s.daemon.SupportCreate(s.DbChannelData.ChannelClaimID, fmt.Sprintf("%.6f", supportAmount), &isTip, nil, []string{defaultAccount}, nil) if err != nil { return errors.Prefix(fmt.Sprintf("something went wrong while tipping the channel for %.6f LBCs", supportAmount), err) } } else { return errors.Prefix(fmt.Sprintf("something went wrong while tipping the channel for %.6f LBCs", supportAmount), err) } } if len(summary.Outputs) < 1 { return errors.Err("something went wrong while tipping the channel for %.6f LBCs", supportAmount) } } log.Println("Done processing transfers") return nil } func deleteSyncFolder(videoDirectory string) { if !strings.Contains(videoDirectory, "/tmp/ytsync") { _ = util.SendToSlack(errors.Err("Trying to delete an unexpected directory: %s", videoDirectory).Error()) } err := os.RemoveAll(videoDirectory) if err != nil { _ = util.SendToSlack(err.Error()) } } func (s *Sync) shouldTransfer() bool { return s.DbChannelData.TransferState >= 1 && s.DbChannelData.PublishAddress.Address != "" && !s.Manager.CliFlags.DisableTransfers && s.DbChannelData.TransferState != 3 } func (s *Sync) setChannelTerminationStatus(e *error) { var transferState *int if s.shouldTransfer() { if *e == nil { transferState = util.PtrToInt(shared.TransferStateComplete) } } if *e != nil { //conditions for which a channel shouldn't be marked as failed noFailConditions := []string{ "this youtube channel is being managed by another server", "interrupted during daemon startup", "interrupted by user", "use --skip-space-check to ignore", } dbWipeConditions := []string{ "Missing inputs", } if util.SubstringInSlice((*e).Error(), noFailConditions) { return } channelStatus := shared.StatusFailed if util.SubstringInSlice((*e).Error(), dbWipeConditions) { channelStatus = shared.StatusWipeDb } failureReason := (*e).Error() _, _, err := s.Manager.ApiConfig.SetChannelStatus(s.DbChannelData.ChannelId, channelStatus, failureReason, transferState) if err != nil { msg := fmt.Sprintf("Failed setting failed state for channel %s", s.DbChannelData.DesiredChannelName) *e = errors.Prefix(msg+err.Error(), *e) } } else if !s.IsInterrupted() { _, _, err := s.Manager.ApiConfig.SetChannelStatus(s.DbChannelData.ChannelId, shared.StatusSynced, "", transferState) if err != nil { *e = err } } } func (s *Sync) waitForDaemonStart() error { beginTime := time.Now() defer func(start time.Time) { timing.TimedComponent("waitForDaemonStart").Add(time.Since(start)) }(beginTime) for { select { case <-s.grp.Ch(): return errors.Err("interrupted during daemon startup") default: status, err := s.daemon.Status() if err == nil && status.StartupStatus.Wallet && status.IsRunning { return nil } if time.Since(beginTime).Minutes() > 120 { s.grp.Stop() return errors.Err("the daemon is taking too long to start. Something is wrong") } time.Sleep(5 * time.Second) } } } func (s *Sync) stopAndUploadWallet(e *error) { log.Printf("Stopping daemon") shutdownErr := logUtils.StopDaemon() if shutdownErr != nil { logShutdownError(shutdownErr) } else { // the cli will return long before the daemon effectively stops. we must observe the processes running // before moving the wallet waitTimeout := 8 * time.Minute processDeathError := waitForDaemonProcess(waitTimeout) if processDeathError != nil { logShutdownError(processDeathError) } else { err := s.uploadWallet() if err != nil { if *e == nil { e = &err } else { *e = errors.Prefix("failure uploading wallet", *e) } } err = s.uploadBlockchainDB() if err != nil { if *e == nil { *e = err } else { *e = errors.Prefix(fmt.Sprintf("failure uploading blockchain DB: %s + original error", errors.FullTrace(err)), *e) } } } } } func logShutdownError(shutdownErr error) { logUtils.SendErrorToSlack("error shutting down daemon: %s", errors.FullTrace(shutdownErr)) logUtils.SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") } var thumbnailHosts = []string{ "berk.ninja/thumbnails/", thumbs.ThumbnailEndpoint, } func isYtsyncClaim(c jsonrpc.Claim, expectedChannelID string) bool { if !util.InSlice(c.Type, []string{"claim", "update"}) || c.Value.GetStream() == nil { return false } if c.Value.GetThumbnail() == nil || c.Value.GetThumbnail().GetUrl() == "" { //most likely a claim created outside of ytsync, ignore! return false } if c.SigningChannel == nil { return false } if c.SigningChannel.ClaimID != expectedChannelID { return false } for _, th := range thumbnailHosts { if strings.Contains(c.Value.GetThumbnail().GetUrl(), th) { return true } } return false } // fixDupes abandons duplicate claims func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) { start := time.Now() defer func(start time.Time) { timing.TimedComponent("fixDupes").Add(time.Since(start)) }(start) abandonedClaims := false videoIDs := make(map[string]jsonrpc.Claim) for _, c := range claims { if !isYtsyncClaim(c, s.DbChannelData.ChannelClaimID) { continue } tn := c.Value.GetThumbnail().GetUrl() videoID := tn[strings.LastIndex(tn, "/")+1:] cl, ok := videoIDs[videoID] if !ok || cl.ClaimID == c.ClaimID { videoIDs[videoID] = c continue } // only keep the most recent one claimToAbandon := c videoIDs[videoID] = cl if c.Height > cl.Height { claimToAbandon = cl videoIDs[videoID] = c } //it's likely that all we need is s.DbChannelData.PublishAddress.IsMine but better be safe than sorry I guess if (claimToAbandon.Address != s.DbChannelData.PublishAddress.Address || s.DbChannelData.PublishAddress.IsMine) && !s.syncedVideos[videoID].Transferred { log.Debugf("abandoning %+v", claimToAbandon) _, err := s.daemon.StreamAbandon(claimToAbandon.Txid, claimToAbandon.Nout, nil, true) if err != nil { return true, err } abandonedClaims = true } else { log.Debugf("claim is not ours. Have the user run this: lbrynet stream abandon --txid=%s --nout=%d", claimToAbandon.Txid, claimToAbandon.Nout) } } return abandonedClaims, nil } type ytsyncClaim struct { ClaimID string MetadataVersion uint ClaimName string PublishAddress string VideoID string Claim *jsonrpc.Claim } // mapFromClaims returns a map of videoIDs (youtube id) to ytsyncClaim which is a structure holding blockchain related // information func (s *Sync) mapFromClaims(claims []jsonrpc.Claim) map[string]ytsyncClaim { videoIDMap := make(map[string]ytsyncClaim, len(claims)) for _, c := range claims { if !isYtsyncClaim(c, s.DbChannelData.ChannelClaimID) { continue } tn := c.Value.GetThumbnail().GetUrl() videoID := tn[strings.LastIndex(tn, "/")+1:] claimMetadataVersion := uint(1) if strings.Contains(tn, thumbs.ThumbnailEndpoint) { claimMetadataVersion = 2 } videoIDMap[videoID] = ytsyncClaim{ ClaimID: c.ClaimID, MetadataVersion: claimMetadataVersion, ClaimName: c.Name, PublishAddress: c.Address, VideoID: videoID, Claim: &c, } } return videoIDMap } //updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published //additionally it removes all entries in the database indicating that a video is published when it's actually not func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim, ownClaims []jsonrpc.Claim) (total, fixed, removed int, err error) { allClaimsInfo := s.mapFromClaims(claims) ownClaimsInfo := s.mapFromClaims(ownClaims) count := len(allClaimsInfo) idsToRemove := make([]string, 0, count) for videoID, chainInfo := range allClaimsInfo { s.syncedVideosMux.RLock() sv, claimInDatabase := s.syncedVideos[videoID] s.syncedVideosMux.RUnlock() metadataDiffers := claimInDatabase && sv.MetadataVersion != int8(chainInfo.MetadataVersion) claimIDDiffers := claimInDatabase && sv.ClaimID != chainInfo.ClaimID claimNameDiffers := claimInDatabase && sv.ClaimName != chainInfo.ClaimName claimMarkedUnpublished := claimInDatabase && !sv.Published _, isOwnClaim := ownClaimsInfo[videoID] transferred := !isOwnClaim || s.DbChannelData.TransferState == 3 transferStatusMismatch := sv.Transferred != transferred if metadataDiffers { log.Debugf("%s: Mismatch in database for metadata. DB: %d - Blockchain: %d", videoID, sv.MetadataVersion, chainInfo.MetadataVersion) } if claimIDDiffers { log.Debugf("%s: Mismatch in database for claimID. DB: %s - Blockchain: %s", videoID, sv.ClaimID, chainInfo.ClaimID) } if claimNameDiffers { log.Debugf("%s: Mismatch in database for claimName. DB: %s - Blockchain: %s", videoID, sv.ClaimName, chainInfo.ClaimName) } if claimMarkedUnpublished { log.Debugf("%s: Mismatch in database: published but marked as unpublished", videoID) } if !claimInDatabase { log.Debugf("%s: Published but is not in database (%s - %s)", videoID, chainInfo.ClaimName, chainInfo.ClaimID) } if transferStatusMismatch { log.Debugf("%s: is marked as transferred %t but it's actually %t", videoID, sv.Transferred, transferred) } if !claimInDatabase || metadataDiffers || claimIDDiffers || claimNameDiffers || claimMarkedUnpublished || transferStatusMismatch { claimSize := uint64(0) if chainInfo.Claim.Value.GetStream().Source != nil { claimSize, err = chainInfo.Claim.GetStreamSizeByMagic() if err != nil { claimSize = 0 } } else { util.SendToSlack("[%s] video with claimID %s has no source?! panic prevented...", s.DbChannelData.ChannelId, chainInfo.ClaimID) } fixed++ log.Debugf("updating %s in the database", videoID) err = s.Manager.ApiConfig.MarkVideoStatus(shared.VideoStatus{ ChannelID: s.DbChannelData.ChannelId, VideoID: videoID, Status: shared.VideoStatusPublished, ClaimID: chainInfo.ClaimID, ClaimName: chainInfo.ClaimName, Size: util.PtrToInt64(int64(claimSize)), MetaDataVersion: chainInfo.MetadataVersion, IsTransferred: &transferred, }) if err != nil { return count, fixed, 0, err } } } //reload the synced videos map before we use it for further processing if fixed > 0 { err := s.setStatusSyncing() if err != nil { return count, fixed, 0, err } } for vID, sv := range s.syncedVideos { if sv.Transferred || sv.IsLbryFirst { _, ok := allClaimsInfo[vID] if !ok && sv.Published { searchResponse, err := s.daemon.ClaimSearch(nil, &sv.ClaimID, nil, nil, 1, 20) if err != nil { log.Error(err.Error()) continue } if len(searchResponse.Claims) == 0 { log.Debugf("%s: was transferred but appears abandoned! we should ignore this - claimID: %s", vID, sv.ClaimID) continue //TODO: we should flag these on the db } else { if sv.IsLbryFirst { log.Debugf("%s: was published using lbry-first so we don't want to do anything here! - claimID: %s", vID, sv.ClaimID) } else { log.Debugf("%s: was transferred and was then edited! we should ignore this - claimID: %s", vID, sv.ClaimID) } //return count, fixed, 0, errors.Err("%s: isn't our control but is on the database and on the blockchain. wtf is up? ClaimID: %s", vID, sv.ClaimID) } } continue } _, ok := ownClaimsInfo[vID] if !ok && sv.Published { log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified (%t)", vID, s.Manager.CliFlags.RemoveDBUnpublished) idsToRemove = append(idsToRemove, vID) } } if s.Manager.CliFlags.RemoveDBUnpublished && len(idsToRemove) > 0 { log.Infof("removing: %s", strings.Join(idsToRemove, ",")) err := s.Manager.ApiConfig.DeleteVideos(idsToRemove) if err != nil { return count, fixed, len(idsToRemove), err } removed++ } //reload the synced videos map before we use it for further processing if removed > 0 { err := s.setStatusSyncing() if err != nil { return count, fixed, removed, err } } return count, fixed, removed, nil } func (s *Sync) getClaims(defaultOnly bool) ([]jsonrpc.Claim, error) { var account *string = nil if defaultOnly { a, err := s.getDefaultAccount() if err != nil { return nil, err } account = &a } claims, err := s.daemon.StreamList(account, 1, 30000) if err != nil { return nil, errors.Prefix("cannot list claims", err) } items := make([]jsonrpc.Claim, 0, len(claims.Items)) for _, c := range claims.Items { if c.SigningChannel != nil && c.SigningChannel.ClaimID == s.DbChannelData.ChannelClaimID { items = append(items, c) } } return items, nil } func (s *Sync) checkIntegrity() error { start := time.Now() defer func(start time.Time) { timing.TimedComponent("checkIntegrity").Add(time.Since(start)) }(start) allClaims, err := s.getClaims(false) if err != nil { return err } hasDupes, err := s.fixDupes(allClaims) if err != nil { return errors.Prefix("error checking for duplicates", err) } if hasDupes { logUtils.SendInfoToSlack("Channel had dupes and was fixed!") err = s.waitForNewBlock() if err != nil { return err } allClaims, err = s.getClaims(false) if err != nil { return err } } ownClaims, err := s.getClaims(true) if err != nil { return err } pubsOnWallet, nFixed, nRemoved, err := s.updateRemoteDB(allClaims, ownClaims) if err != nil { return errors.Prefix("error updating remote database", err) } if nFixed > 0 || nRemoved > 0 { if nFixed > 0 { logUtils.SendInfoToSlack("%d claims had mismatched database info or were completely missing and were fixed", nFixed) } if nRemoved > 0 { logUtils.SendInfoToSlack("%d were marked as published but weren't actually published and thus removed from the database", nRemoved) } } pubsOnDB := 0 for _, sv := range s.syncedVideos { if sv.Published { pubsOnDB++ } } if pubsOnWallet > pubsOnDB { //This case should never happen logUtils.SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.DbChannelData.ChannelId) return errors.Err("not all published videos are in the database") } if pubsOnWallet < pubsOnDB { logUtils.SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.DbChannelData.ChannelId) } //_, err = s.getUnsentSupports() //TODO: use the returned value when it works //if err != nil { // return err //} return nil } func (s *Sync) doSync() error { err := s.enableAddressReuse() if err != nil { return errors.Prefix("could not set address reuse policy", err) } err = s.importPublicKey() if err != nil { return errors.Prefix("could not import the transferee public key", err) } _, err = s.daemon.UTXORelease(nil) if err != nil { return errors.Prefix("could not run uxo_release", err) } err = s.walletSetup() if err != nil { return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err) } err = s.checkIntegrity() if err != nil { return err } if s.DbChannelData.TransferState < shared.TransferStateComplete { cert, err := s.daemon.ChannelExport(s.DbChannelData.ChannelClaimID, nil, nil) if err != nil { return errors.Prefix("error getting channel cert", err) } if cert != nil { err = s.Manager.ApiConfig.SetChannelCert(string(*cert), s.DbChannelData.ChannelClaimID) if err != nil { return errors.Prefix("error setting channel cert", err) } } } for i := 0; i < s.Manager.CliFlags.ConcurrentJobs; i++ { s.grp.Add(1) go func(i int) { defer s.grp.Done() s.startWorker(i) }(i) } if s.DbChannelData.DesiredChannelName == "@UCBerkeley" { err = errors.Err("UCB is not supported in this version of YTSYNC") } else { err = s.enqueueYoutubeVideos() } close(s.queue) s.grp.Wait() if err != nil { return err } if s.hardVideoFailure.failed { return errors.Err(s.hardVideoFailure.failureReason) } return nil } func (s *Sync) startWorker(workerNum int) { var v ytapi.Video var more bool for { select { case <-s.grp.Ch(): log.Printf("Stopping worker %d", workerNum) return default: } select { case v, more = <-s.queue: if !more { return } case <-s.grp.Ch(): log.Printf("Stopping worker %d", workerNum) return } log.Println("================================================================================") tryCount := 0 for { select { // check again inside the loop so this dies faster case <-s.grp.Ch(): log.Printf("Stopping worker %d", workerNum) return default: } tryCount++ err := s.processVideo(v) if err != nil { logUtils.SendErrorToSlack("error processing video %s: %s", v.ID(), err.Error()) shouldRetry := s.Manager.CliFlags.MaxTries > 1 && !util.SubstringInSlice(err.Error(), shared.ErrorsNoRetry) && tryCount < s.Manager.CliFlags.MaxTries if strings.Contains(strings.ToLower(err.Error()), "interrupted by user") { s.grp.Stop() } else if util.SubstringInSlice(err.Error(), shared.FatalErrors) { s.hardVideoFailure.flagFailure(err.Error()) s.grp.Stop() } else if shouldRetry { if util.SubstringInSlice(err.Error(), shared.BlockchainErrors) { log.Println("waiting for a block before retrying") err := s.waitForNewBlock() if err != nil { s.grp.Stop() logUtils.SendErrorToSlack("something went wrong while waiting for a block: %s", errors.FullTrace(err)) break } } else if util.SubstringInSlice(err.Error(), shared.WalletErrors) { log.Println("checking funds and UTXOs before retrying...") err := s.walletSetup() if err != nil { s.grp.Stop() logUtils.SendErrorToSlack("failed to setup the wallet for a refill: %s", errors.FullTrace(err)) break } } else if strings.Contains(err.Error(), "Error in daemon: 'str' object has no attribute 'get'") { time.Sleep(5 * time.Second) } log.Println("Retrying") continue } logUtils.SendErrorToSlack("Video %s failed after %d retries, skipping. Stack: %s", v.ID(), tryCount, errors.FullTrace(err)) s.syncedVideosMux.RLock() existingClaim, ok := s.syncedVideos[v.ID()] s.syncedVideosMux.RUnlock() existingClaimID := "" existingClaimName := "" existingClaimSize := int64(0) if v.Size() != nil { existingClaimSize = *v.Size() } if ok { existingClaimID = existingClaim.ClaimID existingClaimName = existingClaim.ClaimName if existingClaim.Size > 0 { existingClaimSize = existingClaim.Size } } videoStatus := shared.VideoStatusFailed if strings.Contains(err.Error(), "upgrade failed") { videoStatus = shared.VideoStatusUpgradeFailed } else { s.AppendSyncedVideo(v.ID(), false, err.Error(), existingClaimName, existingClaimID, 0, existingClaimSize) } err = s.Manager.ApiConfig.MarkVideoStatus(shared.VideoStatus{ ChannelID: s.DbChannelData.ChannelId, VideoID: v.ID(), Status: videoStatus, ClaimID: existingClaimID, ClaimName: existingClaimName, FailureReason: err.Error(), Size: &existingClaimSize, }) if err != nil { logUtils.SendErrorToSlack("Failed to mark video on the database: %s", errors.FullTrace(err)) } } break } } } func (s *Sync) enqueueYoutubeVideos() error { defer func(start time.Time) { timing.TimedComponent("enqueueYoutubeVideos").Add(time.Since(start)) }(time.Now()) ipPool, err := ip_manager.GetIPPool(s.grp) if err != nil { return err } videos, err := ytapi.GetVideosToSync(s.Manager.ApiConfig, s.DbChannelData.ChannelId, s.syncedVideos, s.Manager.CliFlags.QuickSync, s.Manager.CliFlags.VideosToSync(s.DbChannelData.TotalSubscribers), ytapi.VideoParams{ VideoDir: s.videoDirectory, Stopper: s.grp, IPPool: ipPool, }, s.DbChannelData.LastUploadedVideo) if err != nil { return err } Enqueue: for _, v := range videos { select { case <-s.grp.Ch(): break Enqueue default: } select { case s.queue <- v: case <-s.grp.Ch(): break Enqueue } } return nil } func (s *Sync) processVideo(v ytapi.Video) (err error) { defer func() { if p := recover(); p != nil { logUtils.SendErrorToSlack("Video processing panic! %s", debug.Stack()) var ok bool err, ok = p.(error) if !ok { err = errors.Err("%v", p) } err = errors.Wrap(p, 2) } }() log.Println("Processing " + v.IDAndNum()) defer func(start time.Time) { log.Println(v.ID() + " took " + time.Since(start).String()) }(time.Now()) s.syncedVideosMux.RLock() sv, ok := s.syncedVideos[v.ID()] s.syncedVideosMux.RUnlock() newMetadataVersion := int8(2) alreadyPublished := ok && sv.Published videoRequiresUpgrade := ok && s.Manager.CliFlags.UpgradeMetadata && sv.MetadataVersion < newMetadataVersion neverRetryFailures := shared.NeverRetryFailures if ok && !sv.Published && util.SubstringInSlice(sv.FailureReason, neverRetryFailures) { log.Println(v.ID() + " can't ever be published") return nil } if alreadyPublished && !videoRequiresUpgrade { log.Println(v.ID() + " already published") return nil } if ok && sv.MetadataVersion >= newMetadataVersion { log.Println(v.ID() + " upgraded to the new metadata") return nil } if !videoRequiresUpgrade && v.PlaylistPosition() >= s.Manager.CliFlags.VideosToSync(s.DbChannelData.TotalSubscribers) { log.Println(v.ID() + " is old: skipping") return nil } err = s.Manager.checkUsedSpace() if err != nil { return err } da, err := s.getDefaultAccount() if err != nil { return err } sp := sources.SyncParams{ ClaimAddress: s.DbChannelData.PublishAddress.Address, Amount: publishAmount, ChannelID: s.DbChannelData.ChannelClaimID, MaxVideoSize: s.DbChannelData.SizeLimit, Namer: s.namer, MaxVideoLength: time.Duration(s.DbChannelData.LengthLimit) * time.Minute, Fee: s.DbChannelData.Fee, DefaultAccount: da, } summary, err := v.Sync(s.daemon, sp, &sv, videoRequiresUpgrade, s.walletMux, s.progressBarWg, s.progressBar) if err != nil { return err } s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName, summary.ClaimID, newMetadataVersion, *v.Size()) err = s.Manager.ApiConfig.MarkVideoStatus(shared.VideoStatus{ ChannelID: s.DbChannelData.ChannelId, VideoID: v.ID(), Status: shared.VideoStatusPublished, ClaimID: summary.ClaimID, ClaimName: summary.ClaimName, Size: v.Size(), MetaDataVersion: shared.LatestMetadataVersion, IsTransferred: util.PtrToBool(s.shouldTransfer()), }) if err != nil { logUtils.SendErrorToSlack("Failed to mark video on the database: %s", errors.FullTrace(err)) } return nil } func (s *Sync) importPublicKey() error { if s.DbChannelData.PublicKey != "" { accountsResponse, err := s.daemon.AccountList(1, 50) if err != nil { return errors.Err(err) } ledger := "lbc_mainnet" if logUtils.IsRegTest() { ledger = "lbc_regtest" } for _, a := range accountsResponse.Items { if *a.Ledger == ledger { if a.PublicKey == s.DbChannelData.PublicKey { return nil } } } log.Infof("Could not find public key %s in the wallet. Importing it...", s.DbChannelData.PublicKey) _, err = s.daemon.AccountAdd(s.DbChannelData.DesiredChannelName, nil, nil, &s.DbChannelData.PublicKey, util.PtrToBool(true), nil) return errors.Err(err) } return nil } //TODO: fully implement this once I find a way to reliably get the abandoned supports amount func (s *Sync) getUnsentSupports() (float64, error) { defaultAccount, err := s.getDefaultAccount() if err != nil { return 0, errors.Err(err) } if s.DbChannelData.TransferState == 2 { balance, err := s.daemon.AccountBalance(&defaultAccount) if err != nil { return 0, err } else if balance == nil { return 0, errors.Err("no response") } balanceAmount, err := strconv.ParseFloat(balance.Available.String(), 64) if err != nil { return 0, errors.Err(err) } transactionList, err := s.daemon.TransactionList(&defaultAccount, nil, 1, 90000) if err != nil { return 0, errors.Err(err) } sentSupports := 0.0 for _, t := range transactionList.Items { if len(t.SupportInfo) == 0 { continue } for _, support := range t.SupportInfo { supportAmount, err := strconv.ParseFloat(support.BalanceDelta, 64) if err != nil { return 0, err } if supportAmount < 0 { // && support.IsTip TODO: re-enable this when transaction list shows correct information sentSupports += -supportAmount } } } if balanceAmount > 10 && sentSupports < 1 && s.DbChannelData.TransferState > 1 { logUtils.SendErrorToSlack("(%s) this channel has quite some LBCs in it (%.2f) and %.2f LBC in sent tips, it's likely that the tips weren't actually sent or the wallet has unnecessary extra credits in it", s.DbChannelData.ChannelId, balanceAmount, sentSupports) return balanceAmount - 10, nil } } return 0, nil } // waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up func waitForDaemonProcess(timeout time.Duration) error { stopTime := time.Now().Add(timeout * time.Second) for !time.Now().After(stopTime) { wait := 10 * time.Second log.Println("the daemon is still running, waiting for it to exit") time.Sleep(wait) running, err := logUtils.IsLbrynetRunning() if err != nil { return errors.Err(err) } if !running { log.Println("daemon stopped") return nil } } return errors.Err("timeout reached") }