2019-01-11 03:02:26 +01:00
package manager
2017-10-11 04:02:16 +02:00
import (
2018-05-17 01:42:06 +02:00
"fmt"
2017-10-11 04:02:16 +02:00
"io/ioutil"
"net/http"
"os"
2017-12-28 18:14:33 +01:00
"os/signal"
2018-10-09 21:57:07 +02:00
"runtime/debug"
2017-10-11 04:02:16 +02:00
"sort"
"strings"
"sync"
2017-12-28 18:14:33 +01:00
"syscall"
2017-10-11 04:02:16 +02:00
"time"
2018-10-08 22:19:17 +02:00
"github.com/lbryio/ytsync/namer"
"github.com/lbryio/ytsync/sdk"
"github.com/lbryio/ytsync/sources"
2019-05-03 05:11:52 +02:00
"github.com/lbryio/ytsync/thumbs"
2019-07-15 22:16:02 +02:00
logUtils "github.com/lbryio/ytsync/util"
2018-10-08 22:19:17 +02:00
2019-01-11 02:34:34 +01:00
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/jsonrpc"
"github.com/lbryio/lbry.go/extras/stop"
"github.com/lbryio/lbry.go/extras/util"
2018-09-26 06:08:18 +02:00
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
2018-06-13 18:44:09 +02:00
"github.com/mitchellh/go-ps"
2017-10-11 04:02:16 +02:00
log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport"
2018-06-09 01:14:55 +02:00
"google.golang.org/api/youtube/v3"
2017-10-11 04:02:16 +02:00
)
// Tunables for the sync process.
// NOTE(review): none of these constants are referenced in the part of the file
// reviewed here; the amount-like values are presumably expressed in LBC and
// maxReasonLength presumably caps a failure-reason string - confirm against
// their users before relying on this description.
const (
2019-06-12 03:17:59 +02:00
channelClaimAmount = 0.01
estimatedMaxTxFee = 0.1
minimumAccountBalance = 4.0
2019-06-12 03:35:21 +02:00
minimumRefillAmount = 1
2019-06-12 03:17:59 +02:00
publishAmount = 0.01
maxReasonLength = 500
2017-10-11 04:02:16 +02:00
)
2017-12-28 18:14:33 +01:00
// video is the unit of work handed to the sync workers through Sync.queue.
type video interface {
2018-08-14 17:09:23 +02:00
// Size returns a pointer to the video size; callers in this file treat a nil
// pointer as "size unknown".
Size ( ) * int64
2017-12-28 18:14:33 +01:00
// ID returns the identifier used as the key into Sync.syncedVideos.
ID ( ) string
IDAndNum ( ) string
2018-04-25 20:56:26 +02:00
PlaylistPosition ( ) int
2017-12-28 18:14:33 +01:00
// PublishedAt is the timestamp byPublishedAt orders videos by.
PublishedAt ( ) time . Time
2019-06-12 03:17:59 +02:00
// Sync performs the actual synchronisation of the video.
// NOTE(review): the meaning of the bool and *sync.RWMutex arguments is not
// visible from this file - confirm against the implementations.
Sync ( * jsonrpc . Client , sources . SyncParams , * sdk . SyncedVideo , bool , * sync . RWMutex ) ( * sources . SyncSummary , error )
2017-12-28 18:14:33 +01:00
}
// byPublishedAt implements sort.Interface, ordering videos from the oldest
// publication time to the newest.
type byPublishedAt []video

// Len reports how many videos are in the collection.
func (a byPublishedAt) Len() int {
	return len(a)
}

// Swap exchanges the videos at positions i and j.
func (a byPublishedAt) Swap(i, j int) {
	a[i], a[j] = a[j], a[i]
}

// Less reports whether the video at i was published strictly before the one at j.
func (a byPublishedAt) Less(i, j int) bool {
	left := a[i].PublishedAt()
	right := a[j].PublishedAt()
	return left.Before(right)
}
2017-10-11 04:02:16 +02:00
// Sync stores the options that control how syncing happens
// together with the runtime state of a single channel sync (FullCycle).
type Sync struct {
2019-08-30 21:08:28 +02:00
// APIConfig supplies the YouTube API key and is used to store the channel certificate.
APIConfig * sdk . APIConfig
// YoutubeChannelID is mandatory; FullCycle fails without it. It also names the wallet object on S3.
YoutubeChannelID string
LbryChannelName string
// MaxTries bounds how many times a worker attempts a single video.
MaxTries int
// ConcurrentVideos is the number of worker goroutines doSync starts.
ConcurrentVideos int
Refill int
// Manager is the owning SyncManager; its apiConfig and SyncFlags are consulted throughout.
Manager * SyncManager
LbrycrdString string
// AwsS3* are the credentials/location used to download and upload the wallet.
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
Fee * sdk . Fee
// daemon is the JSON-RPC client created by FullCycle.
daemon * jsonrpc . Client
claimAddress string
// videoDirectory is the temporary directory created by FullCycle and deleted when it returns.
videoDirectory string
// syncedVideosMux guards syncedVideos.
syncedVideosMux * sync . RWMutex
// syncedVideos is keyed by video ID.
syncedVideos map [ string ] sdk . SyncedVideo
// grp is the stop group used to interrupt the sync and to wait for the workers.
grp * stop . Group
lbryChannelID string
namer * namer . Namer
walletMux * sync . RWMutex
// queue carries videos to the workers; doSync closes it once enqueueing is done.
queue chan video
// transferState and publishAddress decide (see shouldTransfer) whether a transfer is attempted.
transferState int
publishAddress string
2018-08-03 19:21:42 +02:00
}
2019-06-12 03:35:21 +02:00
func ( s * Sync ) AppendSyncedVideo ( videoID string , published bool , failureReason string , claimName string , claimID string , metadataVersion int8 , size int64 ) {
2018-08-14 16:48:55 +02:00
s . syncedVideosMux . Lock ( )
defer s . syncedVideosMux . Unlock ( )
2018-09-26 06:08:18 +02:00
s . syncedVideos [ videoID ] = sdk . SyncedVideo {
2019-06-12 03:35:21 +02:00
VideoID : videoID ,
Published : published ,
FailureReason : failureReason ,
ClaimID : claimID ,
ClaimName : claimName ,
MetadataVersion : metadataVersion ,
Size : size ,
2018-08-03 19:21:42 +02:00
}
2017-11-02 16:20:22 +01:00
}
2018-05-24 02:32:11 +02:00
// IsInterrupted can be queried to discover if the sync process was interrupted manually
func ( s * Sync ) IsInterrupted ( ) bool {
select {
2018-07-12 14:28:20 +02:00
case <- s . grp . Ch ( ) :
2018-05-24 02:32:11 +02:00
return true
default :
return false
}
}
2018-08-09 22:54:23 +02:00
func ( s * Sync ) downloadWallet ( ) error {
2019-07-31 05:32:02 +02:00
defaultWalletDir , defaultTempWalletDir , key , err := s . getWalletPaths ( )
if err != nil {
return errors . Err ( err )
2018-08-09 22:54:23 +02:00
}
creds := credentials . NewStaticCredentials ( s . AwsS3ID , s . AwsS3Secret , "" )
s3Session , err := session . NewSession ( & aws . Config { Region : aws . String ( s . AwsS3Region ) , Credentials : creds } )
if err != nil {
2019-08-04 00:34:48 +02:00
return errors . Prefix ( "error starting session: " , err )
2018-08-09 22:54:23 +02:00
}
downloader := s3manager . NewDownloader ( s3Session )
out , err := os . Create ( defaultTempWalletDir )
if err != nil {
2019-08-04 00:34:48 +02:00
return errors . Prefix ( "error creating temp wallet: " , err )
2018-08-09 22:54:23 +02:00
}
defer out . Close ( )
bytesWritten , err := downloader . Download ( out , & s3 . GetObjectInput {
Bucket : aws . String ( s . AwsS3Bucket ) ,
Key : key ,
} )
if err != nil {
// Casting to the awserr.Error type will allow you to inspect the error
// code returned by the service in code. The error code can be used
// to switch on context specific functionality. In this case a context
// specific error message is printed to the user based on the bucket
// and key existing.
//
// For information on other S3 API error codes see:
// http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
if aerr , ok := err . ( awserr . Error ) ; ok {
code := aerr . Code ( )
if code == s3 . ErrCodeNoSuchKey {
return errors . Err ( "wallet not on S3" )
}
}
return err
} else if bytesWritten == 0 {
return errors . Err ( "zero bytes written" )
}
2019-08-04 00:34:48 +02:00
err = os . Rename ( defaultTempWalletDir , defaultWalletDir )
if err != nil {
return errors . Prefix ( "error replacing temp wallet for default wallet: " , err )
}
return nil
2018-08-09 22:54:23 +02:00
}
2019-07-31 05:32:02 +02:00
func ( s * Sync ) getWalletPaths ( ) ( defaultWallet , tempWallet string , key * string , err error ) {
defaultWallet = os . Getenv ( "HOME" ) + "/.lbryum/wallets/default_wallet"
tempWallet = os . Getenv ( "HOME" ) + "/.lbryum/wallets/tmp_wallet"
key = aws . String ( "/wallets/" + s . YoutubeChannelID )
2019-08-04 00:34:48 +02:00
if logUtils . IsRegTest ( ) {
2019-07-31 05:32:02 +02:00
defaultWallet = os . Getenv ( "HOME" ) + "/.lbryum_regtest/wallets/default_wallet"
tempWallet = os . Getenv ( "HOME" ) + "/.lbryum_regtest/wallets/tmp_wallet"
key = aws . String ( "/regtest/" + s . YoutubeChannelID )
}
walletPath := os . Getenv ( "LBRYNET_WALLETS_DIR" )
if walletPath != "" {
defaultWallet = walletPath + "/wallets/default_wallet"
tempWallet = walletPath + "/wallets/tmp_wallet"
}
if _ , err := os . Stat ( defaultWallet ) ; ! os . IsNotExist ( err ) {
return "" , "" , nil , errors . Err ( "default_wallet already exists" )
}
return
}
2018-08-09 22:54:23 +02:00
func ( s * Sync ) uploadWallet ( ) error {
2019-08-04 00:34:48 +02:00
defaultWalletDir := logUtils . GetDefaultWalletPath ( )
2018-08-09 22:54:23 +02:00
key := aws . String ( "/wallets/" + s . YoutubeChannelID )
2019-08-04 00:34:48 +02:00
if logUtils . IsRegTest ( ) {
2018-08-09 22:54:23 +02:00
key = aws . String ( "/regtest/" + s . YoutubeChannelID )
}
if _ , err := os . Stat ( defaultWalletDir ) ; os . IsNotExist ( err ) {
return errors . Err ( "default_wallet does not exist" )
}
creds := credentials . NewStaticCredentials ( s . AwsS3ID , s . AwsS3Secret , "" )
s3Session , err := session . NewSession ( & aws . Config { Region : aws . String ( s . AwsS3Region ) , Credentials : creds } )
if err != nil {
return err
}
uploader := s3manager . NewUploader ( s3Session )
file , err := os . Open ( defaultWalletDir )
if err != nil {
return err
}
defer file . Close ( )
_ , err = uploader . Upload ( & s3manager . UploadInput {
Bucket : aws . String ( s . AwsS3Bucket ) ,
Key : key ,
Body : file ,
} )
if err != nil {
return err
}
return os . Remove ( defaultWalletDir )
}
2018-09-20 01:05:47 +02:00
func ( s * Sync ) setStatusSyncing ( ) error {
2019-08-21 19:40:35 +02:00
syncedVideos , claimNames , err := s . Manager . apiConfig . SetChannelStatus ( s . YoutubeChannelID , StatusSyncing , "" , nil )
2018-09-20 01:05:47 +02:00
if err != nil {
return err
}
s . syncedVideosMux . Lock ( )
s . syncedVideos = syncedVideos
2018-10-09 21:57:07 +02:00
s . namer . SetNames ( claimNames )
2018-09-20 01:05:47 +02:00
s . syncedVideosMux . Unlock ( )
return nil
}
2019-01-03 19:55:27 +01:00
// setExceptions applies per-channel overrides to the manager limits.
// Currently a single channel is exempted from the video length and size caps.
func (s *Sync) setExceptions() {
	const freeTalkLiveChannelID = "UCwjQfNRW6sGYb__pd7d4nUg" //@FreeTalkLive

	if s.YoutubeChannelID != freeTalkLiveChannelID {
		return
	}
	s.Manager.maxVideoLength = 0.0 // skips max length checks
	s.Manager.maxVideoSize = 0
}
2018-07-17 18:54:22 +02:00
func ( s * Sync ) FullCycle ( ) ( e error ) {
2017-11-03 16:46:19 +01:00
if os . Getenv ( "HOME" ) == "" {
2018-03-09 17:47:38 +01:00
return errors . Err ( "no $HOME env var found" )
2017-11-03 16:46:19 +01:00
}
2017-12-30 01:21:16 +01:00
if s . YoutubeChannelID == "" {
2018-07-17 18:54:22 +02:00
return errors . Err ( "channel ID not provided" )
}
2019-01-03 19:55:27 +01:00
s . setExceptions ( )
2018-08-23 00:28:31 +02:00
s . syncedVideosMux = & sync . RWMutex { }
2019-06-12 03:17:59 +02:00
s . walletMux = & sync . RWMutex { }
2018-08-14 16:48:55 +02:00
s . grp = stop . New ( )
s . queue = make ( chan video )
interruptChan := make ( chan os . Signal , 1 )
signal . Notify ( interruptChan , os . Interrupt , syscall . SIGTERM )
2018-08-21 19:17:52 +02:00
defer signal . Stop ( interruptChan )
2018-08-14 16:48:55 +02:00
go func ( ) {
<- interruptChan
log . Println ( "Got interrupt signal, shutting down (if publishing, will shut down after current publish)" )
s . grp . Stop ( )
} ( )
2018-09-20 01:05:47 +02:00
err := s . setStatusSyncing ( )
2018-07-17 18:54:22 +02:00
if err != nil {
return err
2017-12-30 01:21:16 +01:00
}
2018-07-31 19:42:20 +02:00
2018-09-20 01:05:47 +02:00
defer s . setChannelTerminationStatus ( & e )
2017-11-03 16:46:19 +01:00
2018-08-09 22:54:23 +02:00
err = s . downloadWallet ( )
if err != nil && err . Error ( ) != "wallet not on S3" {
2018-10-03 02:51:42 +02:00
return errors . Prefix ( "failure in downloading wallet" , err )
2018-08-09 22:54:23 +02:00
} else if err == nil {
2018-08-08 23:59:59 +02:00
log . Println ( "Continuing previous upload" )
2018-08-09 22:54:23 +02:00
} else {
log . Println ( "Starting new wallet" )
2017-11-03 13:46:27 +01:00
}
2018-08-09 22:54:23 +02:00
defer s . stopAndUploadWallet ( & e )
2017-11-03 13:46:27 +01:00
2019-08-02 03:43:41 +02:00
s . videoDirectory , err = ioutil . TempDir ( os . Getenv ( "TMP_DIR" ) , "ytsync" )
2017-12-28 18:14:33 +01:00
if err != nil {
return errors . Wrap ( err , 0 )
}
2019-08-11 04:50:43 +02:00
err = os . Chmod ( s . videoDirectory , 0766 )
if err != nil {
return errors . Err ( err )
}
2017-11-03 13:46:27 +01:00
2019-06-25 02:43:50 +02:00
defer deleteSyncFolder ( s . videoDirectory )
2017-12-28 18:14:33 +01:00
log . Printf ( "Starting daemon" )
2019-08-04 00:34:48 +02:00
err = logUtils . StartDaemon ( )
2017-11-03 13:46:27 +01:00
if err != nil {
return err
}
2017-12-28 18:14:33 +01:00
log . Infoln ( "Waiting for daemon to finish starting..." )
2019-07-31 05:32:02 +02:00
s . daemon = jsonrpc . NewClient ( os . Getenv ( "LBRYNET_ADDRESS" ) )
2018-06-18 01:50:59 +02:00
s . daemon . SetRPCTimeout ( 40 * time . Minute )
2018-03-09 01:27:54 +01:00
2018-08-09 22:54:23 +02:00
err = s . waitForDaemonStart ( )
if err != nil {
return err
2018-03-09 01:27:54 +01:00
}
2017-11-03 13:46:27 +01:00
2019-08-16 05:34:25 +02:00
err = s . doSync ( )
if err != nil {
return err
}
2019-08-21 19:40:35 +02:00
if s . shouldTransfer ( ) {
2019-08-27 19:49:51 +02:00
err := waitConfirmations ( s )
if err != nil {
return err
}
2019-08-28 21:08:25 +02:00
supportAmount , err := abandonSupports ( s )
2019-08-27 19:49:51 +02:00
if err != nil {
2019-08-28 21:08:25 +02:00
return errors . Prefix ( fmt . Sprintf ( "%.6f LBCs were abandoned before failing" , supportAmount ) , err )
2019-08-27 19:49:51 +02:00
}
2019-08-27 00:31:27 +02:00
err = transferVideos ( s )
if err != nil {
return err
}
2019-08-28 21:08:25 +02:00
err = transferChannel ( s )
if err != nil {
return err
}
reallocateSupports := supportAmount > 0.01
if reallocateSupports {
err = waitConfirmations ( s )
if err != nil {
return err
}
isTip := true
summary , err := s . daemon . SupportCreate ( s . lbryChannelID , fmt . Sprintf ( "%.6f" , supportAmount ) , & isTip , nil , nil )
if err != nil {
return errors . Err ( err )
}
if len ( summary . Outputs ) < 1 {
return errors . Err ( "something went wrong while tipping the channel for %.6f LBCs" , supportAmount )
}
}
return nil
2019-08-16 05:34:25 +02:00
}
return nil
2017-11-03 13:46:27 +01:00
}
2019-05-03 05:11:52 +02:00
2019-06-25 02:43:50 +02:00
// deleteSyncFolder removes the temporary directory used to store downloaded
// videos. As a safety net it refuses to delete anything whose path does not
// contain "/tmp/ytsync": the anomaly is reported to Slack and the directory is
// left untouched. Failures of the removal itself are reported to Slack too.
func deleteSyncFolder(videoDirectory string) {
	if !strings.Contains(videoDirectory, "/tmp/ytsync") {
		_ = util.SendToSlack(errors.Err("Trying to delete an unexpected directory: %s", videoDirectory).Error())
		// never remove a directory we do not recognise as ours
		return
	}
	err := os.RemoveAll(videoDirectory)
	if err != nil {
		_ = util.SendToSlack(err.Error())
	}
}
2019-08-21 19:40:35 +02:00
func ( s * Sync ) shouldTransfer ( ) bool {
2019-08-30 21:08:28 +02:00
return s . transferState == 1 && s . publishAddress != "" && ! s . Manager . SyncFlags . DisableTransfers
2019-08-21 19:40:35 +02:00
}
2018-09-20 01:05:47 +02:00
func ( s * Sync ) setChannelTerminationStatus ( e * error ) {
2019-08-21 19:40:35 +02:00
var transferState * int
if s . shouldTransfer ( ) {
if * e != nil {
transferState = util . PtrToInt ( TransferStateFailed )
2019-08-23 01:28:51 +02:00
} else {
transferState = util . PtrToInt ( TransferStateComplete )
2019-08-21 19:40:35 +02:00
}
}
2018-08-09 22:54:23 +02:00
if * e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := [ ] string {
"this youtube channel is being managed by another server" ,
2019-02-22 19:33:00 +01:00
"interrupted during daemon startup" ,
2018-08-09 22:54:23 +02:00
}
if util . SubstringInSlice ( ( * e ) . Error ( ) , noFailConditions ) {
return
}
2018-08-17 16:05:54 +02:00
failureReason := ( * e ) . Error ( )
2019-08-21 19:40:35 +02:00
_ , _ , err := s . Manager . apiConfig . SetChannelStatus ( s . YoutubeChannelID , StatusFailed , failureReason , transferState )
2018-08-09 22:54:23 +02:00
if err != nil {
2018-10-03 02:51:42 +02:00
msg := fmt . Sprintf ( "Failed setting failed state for channel %s" , s . LbryChannelName )
* e = errors . Prefix ( msg + err . Error ( ) , * e )
2018-08-09 22:54:23 +02:00
}
} else if ! s . IsInterrupted ( ) {
2019-08-21 19:40:35 +02:00
_ , _ , err := s . Manager . apiConfig . SetChannelStatus ( s . YoutubeChannelID , StatusSynced , "" , transferState )
2018-08-09 22:54:23 +02:00
if err != nil {
* e = err
}
}
}
func ( s * Sync ) waitForDaemonStart ( ) error {
2019-08-13 23:05:09 +02:00
beginTime := time . Now ( )
2018-08-09 22:54:23 +02:00
for {
select {
case <- s . grp . Ch ( ) :
return errors . Err ( "interrupted during daemon startup" )
default :
2019-08-13 23:05:09 +02:00
status , err := s . daemon . Status ( )
if err == nil && status . StartupStatus . Wallet && status . IsRunning {
2018-08-09 22:54:23 +02:00
return nil
}
2019-08-13 23:05:09 +02:00
if time . Since ( beginTime ) . Minutes ( ) > 60 {
s . grp . Stop ( )
return errors . Err ( "the daemon is taking too long to start. Something is wrong" )
}
2018-08-09 22:54:23 +02:00
time . Sleep ( 5 * time . Second )
}
}
}
2018-09-21 16:26:27 +02:00
2018-08-09 22:54:23 +02:00
func ( s * Sync ) stopAndUploadWallet ( e * error ) {
log . Printf ( "Stopping daemon" )
2019-08-04 00:34:48 +02:00
shutdownErr := logUtils . StopDaemon ( )
2018-08-09 22:54:23 +02:00
if shutdownErr != nil {
logShutdownError ( shutdownErr )
} else {
// the cli will return long before the daemon effectively stops. we must observe the processes running
// before moving the wallet
waitTimeout := 8 * time . Minute
processDeathError := waitForDaemonProcess ( waitTimeout )
if processDeathError != nil {
logShutdownError ( processDeathError )
} else {
err := s . uploadWallet ( )
if err != nil {
if * e == nil {
2019-02-22 19:33:00 +01:00
e = & err
2018-08-09 22:54:23 +02:00
return
} else {
2018-10-03 02:51:42 +02:00
* e = errors . Prefix ( "failure uploading wallet" , * e )
2018-08-09 22:54:23 +02:00
}
}
}
}
}
2018-04-26 12:12:54 +02:00
func logShutdownError ( shutdownErr error ) {
2019-08-04 00:34:48 +02:00
logUtils . SendErrorToSlack ( "error shutting down daemon: %s" , errors . FullTrace ( shutdownErr ) )
2019-07-15 22:16:02 +02:00
logUtils . SendErrorToSlack ( "WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR" )
2018-04-26 12:12:54 +02:00
}
2017-11-03 13:46:27 +01:00
2019-04-19 03:22:51 +02:00
// thumbnailHosts lists the URL fragments isYtsyncClaim looks for in a claim's
// thumbnail URL to decide whether the claim was created by ytsync.
var thumbnailHosts = [ ] string {
"berk.ninja/thumbnails/" ,
2019-05-03 05:11:52 +02:00
thumbs . ThumbnailEndpoint ,
2019-04-19 03:22:51 +02:00
}
2019-01-03 17:01:00 +01:00
func isYtsyncClaim ( c jsonrpc . Claim ) bool {
2019-04-19 03:22:51 +02:00
if ! util . InSlice ( c . Type , [ ] string { "claim" , "update" } ) || c . Value . GetStream ( ) == nil {
2019-01-03 17:01:00 +01:00
return false
}
2019-04-25 13:16:45 +02:00
if c . Value . GetThumbnail ( ) == nil || c . Value . GetThumbnail ( ) . GetUrl ( ) == "" {
2019-01-03 17:01:00 +01:00
//most likely a claim created outside of ytsync, ignore!
return false
}
2019-05-08 23:12:13 +02:00
for _ , th := range thumbnailHosts {
if strings . Contains ( c . Value . GetThumbnail ( ) . GetUrl ( ) , th ) {
return true
}
}
return false
2019-01-03 17:01:00 +01:00
}
2018-09-20 01:05:47 +02:00
// fixDupes abandons duplicate claims
func ( s * Sync ) fixDupes ( claims [ ] jsonrpc . Claim ) ( bool , error ) {
abandonedClaims := false
videoIDs := make ( map [ string ] jsonrpc . Claim )
2018-08-31 17:42:15 +02:00
for _ , c := range claims {
2019-01-03 17:01:00 +01:00
if ! isYtsyncClaim ( c ) {
2018-08-31 17:42:15 +02:00
continue
}
2019-04-25 13:16:45 +02:00
tn := c . Value . GetThumbnail ( ) . GetUrl ( )
2018-09-03 23:38:16 +02:00
videoID := tn [ strings . LastIndex ( tn , "/" ) + 1 : ]
log . Infof ( "claimid: %s, claimName: %s, videoID: %s" , c . ClaimID , c . Name , videoID )
2018-09-20 01:05:47 +02:00
cl , ok := videoIDs [ videoID ]
if ! ok || cl . ClaimID == c . ClaimID {
videoIDs [ videoID ] = c
2018-08-31 17:42:15 +02:00
continue
}
2018-09-20 01:05:47 +02:00
// only keep the most recent one
claimToAbandon := c
videoIDs [ videoID ] = cl
if c . Height > cl . Height {
claimToAbandon = cl
videoIDs [ videoID ] = c
}
2019-01-03 17:01:00 +01:00
log . Debugf ( "abandoning %+v" , claimToAbandon )
2019-04-19 03:22:51 +02:00
_ , err := s . daemon . StreamAbandon ( claimToAbandon . Txid , claimToAbandon . Nout , nil , false )
2018-09-20 01:05:47 +02:00
if err != nil {
return true , err
}
abandonedClaims = true
//return true, nil
2018-08-31 17:42:15 +02:00
}
2018-09-20 01:05:47 +02:00
return abandonedClaims , nil
2018-08-31 17:42:15 +02:00
}
2018-09-20 01:05:47 +02:00
//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published
2019-06-06 23:25:31 +02:00
//additionally it removes all entries in the database indicating that a video is published when it's actually not
2019-06-04 22:21:40 +02:00
func ( s * Sync ) updateRemoteDB ( claims [ ] jsonrpc . Claim ) ( total , fixed , removed int , err error ) {
2018-08-31 17:42:15 +02:00
count := 0
2019-06-04 22:21:40 +02:00
videoIDMap := make ( map [ string ] string , len ( claims ) )
2018-08-31 17:42:15 +02:00
for _ , c := range claims {
2019-01-03 17:01:00 +01:00
if ! isYtsyncClaim ( c ) {
2018-08-31 17:42:15 +02:00
continue
}
2019-01-03 17:01:00 +01:00
count ++
2018-09-20 01:05:47 +02:00
//check if claimID is in remote db
2019-04-25 13:16:45 +02:00
tn := c . Value . GetThumbnail ( ) . GetUrl ( )
2018-09-20 01:05:47 +02:00
videoID := tn [ strings . LastIndex ( tn , "/" ) + 1 : ]
2019-06-04 22:21:40 +02:00
videoIDMap [ videoID ] = c . ClaimID
2019-06-26 20:40:40 +02:00
s . syncedVideosMux . RLock ( )
2019-06-10 01:41:52 +02:00
pv , claimInDatabase := s . syncedVideos [ videoID ]
2019-06-26 20:40:40 +02:00
s . syncedVideosMux . RUnlock ( )
2019-06-10 01:41:52 +02:00
claimMetadataVersion := uint ( 1 )
if strings . Contains ( tn , thumbs . ThumbnailEndpoint ) {
claimMetadataVersion = 2
}
metadataDiffers := claimInDatabase && pv . MetadataVersion != int8 ( claimMetadataVersion )
claimIDDiffers := claimInDatabase && pv . ClaimID != c . ClaimID
claimNameDiffers := claimInDatabase && pv . ClaimName != c . Name
claimMarkedUnpublished := claimInDatabase && ! pv . Published
if metadataDiffers {
log . Debugf ( "%s: Mismatch in database for metadata. DB: %d - Blockchain: %d" , videoID , pv . MetadataVersion , claimMetadataVersion )
}
if claimIDDiffers {
log . Debugf ( "%s: Mismatch in database for claimID. DB: %s - Blockchain: %s" , videoID , pv . ClaimID , c . ClaimID )
}
2019-06-20 21:45:17 +02:00
if claimNameDiffers {
2019-06-10 01:41:52 +02:00
log . Debugf ( "%s: Mismatch in database for claimName. DB: %s - Blockchain: %s" , videoID , pv . ClaimName , c . Name )
}
if claimMarkedUnpublished {
log . Debugf ( "%s: Mismatch in database: published but marked as unpublished" , videoID )
}
if ! claimInDatabase {
2019-06-10 21:37:13 +02:00
log . Debugf ( "%s: Published but is not in database (%s - %s)" , videoID , c . Name , c . ClaimID )
2019-06-10 01:41:52 +02:00
}
if ! claimInDatabase || metadataDiffers || claimIDDiffers || claimNameDiffers || claimMarkedUnpublished {
claimSize , err := c . GetStreamSizeByMagic ( )
2019-06-01 01:46:16 +02:00
if err != nil {
2019-06-10 01:41:52 +02:00
claimSize = 0
2019-06-01 01:46:16 +02:00
}
2019-06-10 01:41:52 +02:00
fixed ++
log . Debugf ( "updating %s in the database" , videoID )
2019-08-20 07:06:51 +02:00
err = s . Manager . apiConfig . MarkVideoStatus ( sdk . VideoStatus {
ChannelID : s . YoutubeChannelID ,
VideoID : videoID ,
Status : VideoStatusPublished ,
ClaimID : c . ClaimID ,
ClaimName : c . Name ,
Size : util . PtrToInt64 ( int64 ( claimSize ) ) ,
MetaDataVersion : claimMetadataVersion ,
} )
2018-09-20 01:05:47 +02:00
if err != nil {
2019-06-04 22:21:40 +02:00
return count , fixed , 0 , err
2018-09-20 01:05:47 +02:00
}
2018-08-31 17:42:15 +02:00
}
}
2019-06-04 22:21:40 +02:00
idsToRemove := make ( [ ] string , 0 , len ( videoIDMap ) )
for vID , sv := range s . syncedVideos {
2019-08-23 01:28:51 +02:00
if sv . Transferred {
log . Infof ( "%s: claim was transferred, ignoring" )
continue
}
2019-06-04 22:21:40 +02:00
_ , ok := videoIDMap [ vID ]
if ! ok && sv . Published {
2019-08-30 21:08:28 +02:00
log . Debugf ( "%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified (%t)" , vID , s . Manager . SyncFlags . RemoveDBUnpublished )
2019-06-04 22:21:40 +02:00
idsToRemove = append ( idsToRemove , vID )
}
}
2019-08-30 21:08:28 +02:00
if s . Manager . SyncFlags . RemoveDBUnpublished && len ( idsToRemove ) > 0 {
2019-08-30 19:35:04 +02:00
log . Infof ( "removing: %s" , strings . Join ( idsToRemove , "," ) )
2019-06-04 22:21:40 +02:00
err := s . Manager . apiConfig . DeleteVideos ( idsToRemove )
if err != nil {
2019-08-23 01:28:51 +02:00
return count , fixed , len ( idsToRemove ) , err
2019-06-04 22:21:40 +02:00
}
}
2019-08-23 01:28:51 +02:00
return count , fixed , 0 , nil
2018-08-31 17:42:15 +02:00
}
2019-01-30 13:42:23 +01:00
func ( s * Sync ) getClaims ( ) ( [ ] jsonrpc . Claim , error ) {
totalPages := uint64 ( 1 )
var allClaims [ ] jsonrpc . Claim
for page := uint64 ( 1 ) ; page <= totalPages ; page ++ {
2019-04-19 03:22:51 +02:00
claims , err := s . daemon . ClaimList ( nil , page , 50 )
2019-01-30 13:42:23 +01:00
if err != nil {
return nil , errors . Prefix ( "cannot list claims" , err )
}
allClaims = append ( allClaims , ( * claims ) . Claims ... )
totalPages = ( * claims ) . TotalPages
}
return allClaims , nil
}
2019-08-23 01:28:51 +02:00
func ( s * Sync ) checkIntegrity ( ) error {
2019-01-30 13:42:23 +01:00
allClaims , err := s . getClaims ( )
2018-08-31 17:42:15 +02:00
if err != nil {
2019-01-30 13:42:23 +01:00
return err
2018-08-31 17:42:15 +02:00
}
2019-01-30 13:42:23 +01:00
hasDupes , err := s . fixDupes ( allClaims )
2018-08-31 17:42:15 +02:00
if err != nil {
2018-10-03 02:51:42 +02:00
return errors . Prefix ( "error checking for duplicates" , err )
2018-08-31 17:42:15 +02:00
}
if hasDupes {
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "Channel had dupes and was fixed!" )
2018-09-21 16:26:27 +02:00
err = s . waitForNewBlock ( )
if err != nil {
return err
}
2019-01-30 13:42:23 +01:00
allClaims , err = s . getClaims ( )
2018-09-20 01:05:47 +02:00
if err != nil {
2019-01-30 13:42:23 +01:00
return err
2018-09-20 01:05:47 +02:00
}
2018-08-31 17:42:15 +02:00
}
2018-09-20 01:05:47 +02:00
2019-06-04 22:21:40 +02:00
pubsOnWallet , nFixed , nRemoved , err := s . updateRemoteDB ( allClaims )
2018-08-31 17:42:15 +02:00
if err != nil {
2019-06-06 16:31:35 +02:00
return errors . Prefix ( "error updating remote database" , err )
2018-08-31 17:42:15 +02:00
}
2019-08-14 03:31:41 +02:00
2019-06-04 22:21:40 +02:00
if nFixed > 0 || nRemoved > 0 {
2018-09-20 01:05:47 +02:00
err := s . setStatusSyncing ( )
if err != nil {
return err
}
2019-06-04 22:21:40 +02:00
if nFixed > 0 {
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "%d claims had mismatched database info or were completely missing and were fixed" , nFixed )
2019-06-04 22:21:40 +02:00
}
if nRemoved > 0 {
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "%d were marked as published but weren't actually published and thus removed from the database" , nRemoved )
2019-06-04 22:21:40 +02:00
}
2018-09-20 01:05:47 +02:00
}
2018-08-31 17:42:15 +02:00
pubsOnDB := 0
for _ , sv := range s . syncedVideos {
if sv . Published {
pubsOnDB ++
}
}
2018-09-20 01:05:47 +02:00
if pubsOnWallet > pubsOnDB { //This case should never happen
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "We're claiming to have published %d videos but in reality we published %d (%s)" , pubsOnDB , pubsOnWallet , s . YoutubeChannelID )
2018-09-20 01:05:47 +02:00
return errors . Err ( "not all published videos are in the database" )
}
2018-08-31 17:42:15 +02:00
if pubsOnWallet < pubsOnDB {
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "we're claiming to have published %d videos but we only published %d (%s)" , pubsOnDB , pubsOnWallet , s . YoutubeChannelID )
2018-08-31 17:42:15 +02:00
}
2019-08-23 01:28:51 +02:00
return nil
}
func ( s * Sync ) doSync ( ) error {
err := s . enableAddressReuse ( )
if err != nil {
return errors . Prefix ( "could not set address reuse policy" , err )
}
err = s . walletSetup ( )
if err != nil {
return errors . Prefix ( "Initial wallet setup failed! Manual Intervention is required." , err )
}
err = s . checkIntegrity ( )
if err != nil {
return err
}
if s . transferState != TransferStateComplete {
cert , err := s . daemon . ChannelExport ( s . lbryChannelID , nil , nil )
if err != nil {
return errors . Prefix ( "error getting channel cert" , err )
}
if cert != nil {
err = s . APIConfig . SetChannelCert ( string ( * cert ) , s . lbryChannelID )
if err != nil {
return errors . Prefix ( "error setting channel cert" , err )
}
}
}
2017-10-11 19:13:47 +02:00
2019-08-30 21:08:28 +02:00
if s . Manager . SyncFlags . StopOnError {
2017-10-11 04:02:16 +02:00
log . Println ( "Will stop publishing if an error is detected" )
}
for i := 0 ; i < s . ConcurrentVideos ; i ++ {
2018-08-14 16:48:55 +02:00
s . grp . Add ( 1 )
2018-08-31 17:42:15 +02:00
go func ( i int ) {
2018-08-14 16:48:55 +02:00
defer s . grp . Done ( )
s . startWorker ( i )
2018-08-31 17:42:15 +02:00
} ( i )
2017-10-11 04:02:16 +02:00
}
2018-02-13 18:47:05 +01:00
if s . LbryChannelName == "@UCBerkeley" {
2019-06-06 23:25:31 +02:00
err = errors . Err ( "UCB is not supported in this version of YTSYNC" )
2018-02-13 18:47:05 +01:00
} else {
err = s . enqueueYoutubeVideos ( )
}
2017-12-28 18:14:33 +01:00
close ( s . queue )
2018-08-14 16:48:55 +02:00
s . grp . Wait ( )
2017-10-11 04:02:16 +02:00
return err
}
2017-12-28 18:14:33 +01:00
// startWorker is the body of a worker goroutine (doSync starts one per
// concurrent video). It keeps taking videos from s.queue and processing them
// until the queue is closed or the stop group fires.
//
// Each video is attempted through processVideo; depending on the error text a
// failure either stops the whole sync, is retried (up to s.MaxTries attempts)
// or is given up on immediately. A video that ultimately failed is recorded
// locally and reported to the remote API.
func ( s * Sync ) startWorker ( workerNum int ) {
var v video
var more bool
2017-11-02 16:20:22 +01:00
for {
2017-12-28 18:14:33 +01:00
// non-blocking check first, so a stop request is honoured before another
// video is taken from the queue
select {
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-12-28 18:14:33 +01:00
log . Printf ( "Stopping worker %d" , workerNum )
return
default :
2017-11-02 16:20:22 +01:00
}
2017-12-28 18:14:33 +01:00
// block until there is a video to process, the queue is closed or a stop is requested
select {
case v , more = <- s . queue :
if ! more {
return
}
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-12-28 18:14:33 +01:00
log . Printf ( "Stopping worker %d" , workerNum )
return
2017-10-11 04:02:16 +02:00
}
2018-02-13 18:47:05 +01:00
log . Println ( "================================================================================" )
2017-10-11 04:02:16 +02:00
2017-12-28 18:14:33 +01:00
// retry loop for the current video: "continue" retries, "break" moves on to the next video
tryCount := 0
for {
tryCount ++
err := s . processVideo ( v )
2017-10-11 04:02:16 +02:00
2017-12-28 18:14:33 +01:00
if err != nil {
2019-06-12 05:25:01 +02:00
logMsg := fmt . Sprintf ( "error processing video %s: %s" , v . ID ( ) , err . Error ( ) )
2018-04-26 12:12:54 +02:00
log . Errorln ( logMsg )
2019-07-12 23:20:01 +02:00
// a user interruption ends the worker without recording the video as failed
if strings . Contains ( strings . ToLower ( err . Error ( ) ) , "interrupted by user" ) {
2019-07-12 21:32:49 +02:00
return
}
2018-04-30 21:18:27 +02:00
// errors matching any of these fragments stop the whole sync
fatalErrors := [ ] string {
":5279: read: connection reset by peer" ,
2018-06-06 23:47:28 +02:00
"no space left on device" ,
2018-06-15 23:03:28 +02:00
"NotEnoughFunds" ,
2018-06-18 01:50:59 +02:00
"Cannot publish using channel" ,
2018-08-09 22:54:23 +02:00
"cannot concatenate 'str' and 'NoneType' objects" ,
2018-08-17 20:05:39 +02:00
"more than 90% of the space has been used." ,
2019-07-12 02:21:27 +02:00
"Couldn't find private key for id" ,
2018-04-30 21:18:27 +02:00
}
2019-08-30 21:08:28 +02:00
if util . SubstringInSlice ( err . Error ( ) , fatalErrors ) || s . Manager . SyncFlags . StopOnError {
2018-06-25 22:13:28 +02:00
s . grp . Stop ( )
2017-12-28 18:14:33 +01:00
} else if s . MaxTries > 1 {
2018-04-30 21:18:27 +02:00
// errors matching any of these fragments are never retried
errorsNoRetry := [ ] string {
"non 200 status code received" ,
2019-07-12 22:58:34 +02:00
"This video contains content from" ,
2018-04-30 21:18:27 +02:00
"dont know which claim to update" ,
"uploader has not made this video available in your country" ,
"download error: AccessDenied: Access Denied" ,
"Playback on other websites has been disabled by the video owner" ,
"Error in daemon: Cannot publish empty file" ,
2018-05-07 22:26:46 +02:00
"Error extracting sts from embedded url response" ,
2018-08-29 13:33:02 +02:00
"Unable to extract signature tokens" ,
2018-06-06 23:47:28 +02:00
"Client.Timeout exceeded while awaiting headers)" ,
2018-08-01 14:56:04 +02:00
"the video is too big to sync, skipping for now" ,
2018-10-11 23:21:05 +02:00
"video is too long to process" ,
2019-02-15 14:11:38 +01:00
"no compatible format available for this video" ,
"Watch this video on YouTube." ,
"have blocked it on copyright grounds" ,
2019-05-07 16:01:11 +02:00
"the video must be republished as we can't get the right size" ,
2019-07-11 19:14:15 +02:00
"HTTP Error 403" ,
2019-07-22 02:45:38 +02:00
"giving up after 0 fragment retries" ,
2019-08-30 19:35:04 +02:00
"download error: ERROR: Sorry about that" ,
2018-04-30 21:18:27 +02:00
}
2019-07-10 15:46:54 +02:00
if util . SubstringInSlice ( err . Error ( ) , errorsNoRetry ) {
2017-12-28 18:14:33 +01:00
log . Println ( "This error should not be retried at all" )
2018-04-20 21:06:55 +02:00
} else if tryCount < s . MaxTries {
2019-07-10 15:46:54 +02:00
// some errors need a remedy before the retry: wait for a block on mempool
// conflicts, redo the wallet setup when funds are short
if strings . Contains ( err . Error ( ) , "txn-mempool-conflict" ) ||
strings . Contains ( err . Error ( ) , "too-long-mempool-chain" ) {
log . Println ( "waiting for a block before retrying" )
err := s . waitForNewBlock ( )
if err != nil {
s . grp . Stop ( )
2019-08-04 00:34:48 +02:00
logUtils . SendErrorToSlack ( "something went wrong while waiting for a block: %s" , errors . FullTrace ( err ) )
2019-07-10 15:46:54 +02:00
// leaves the retry loop: the video is not marked as failed below
break
}
} else if util . SubstringInSlice ( err . Error ( ) , [ ] string {
2019-03-29 02:38:00 +01:00
"Not enough funds to cover this transaction" ,
"failed: Not enough funds" ,
2019-06-12 05:25:01 +02:00
"Error in daemon: Insufficient funds, please deposit additional LBC" ,
} ) {
log . Println ( "checking funds and UTXOs before retrying..." )
2019-06-13 19:33:58 +02:00
err := s . walletSetup ( )
2018-04-20 21:06:55 +02:00
if err != nil {
2018-07-12 14:28:20 +02:00
s . grp . Stop ( )
2019-08-04 00:34:48 +02:00
logUtils . SendErrorToSlack ( "failed to setup the wallet for a refill: %s" , errors . FullTrace ( err ) )
2018-05-05 16:15:02 +02:00
// leaves the retry loop: the video is not marked as failed below
break
2018-04-20 21:06:55 +02:00
}
2019-06-04 22:21:40 +02:00
} else if strings . Contains ( err . Error ( ) , "Error in daemon: 'str' object has no attribute 'get'" ) {
time . Sleep ( 5 * time . Second )
2018-04-20 21:06:55 +02:00
}
2017-12-28 18:14:33 +01:00
log . Println ( "Retrying" )
continue
}
2019-07-15 22:16:02 +02:00
logUtils . SendErrorToSlack ( "Video failed after %d retries, skipping. Stack: %s" , tryCount , logMsg )
2017-12-28 18:14:33 +01:00
}
2019-06-26 20:40:40 +02:00
// the video failed for good: reuse claim data already known for it, if any
s . syncedVideosMux . RLock ( )
2019-05-24 19:01:16 +02:00
existingClaim , ok := s . syncedVideos [ v . ID ( ) ]
2019-06-26 20:40:40 +02:00
s . syncedVideosMux . RUnlock ( )
2019-05-24 19:01:16 +02:00
existingClaimID := ""
existingClaimName := ""
2019-06-12 03:35:21 +02:00
existingClaimSize := int64 ( 0 )
if v . Size ( ) != nil {
existingClaimSize = * v . Size ( )
}
2019-05-24 19:01:16 +02:00
if ok {
existingClaimID = existingClaim . ClaimID
existingClaimName = existingClaim . ClaimName
if existingClaim . Size > 0 {
2019-06-12 03:35:21 +02:00
existingClaimSize = existingClaim . Size
2019-05-24 19:01:16 +02:00
}
}
2019-06-10 21:59:42 +02:00
// failed upgrades get their own status and do not overwrite the local entry
videoStatus := VideoStatusFailed
if strings . Contains ( err . Error ( ) , "upgrade failed" ) {
videoStatus = VideoStatusUpgradeFailed
2019-06-12 03:35:21 +02:00
} else {
s . AppendSyncedVideo ( v . ID ( ) , false , err . Error ( ) , existingClaimName , existingClaimID , 0 , existingClaimSize )
2019-06-10 21:59:42 +02:00
}
2019-08-20 07:06:51 +02:00
// err.Error() below is evaluated while building the argument, i.e. before err is overwritten
err = s . Manager . apiConfig . MarkVideoStatus ( sdk . VideoStatus {
ChannelID : s . YoutubeChannelID ,
VideoID : v . ID ( ) ,
Status : videoStatus ,
ClaimID : existingClaimID ,
ClaimName : existingClaimName ,
FailureReason : err . Error ( ) ,
Size : & existingClaimSize ,
} )
2018-07-21 01:56:36 +02:00
if err != nil {
2019-08-04 00:34:48 +02:00
logUtils . SendErrorToSlack ( "Failed to mark video on the database: %s" , errors . FullTrace ( err ) )
2018-07-25 19:08:28 +02:00
}
2017-12-28 18:14:33 +01:00
}
break
2017-11-06 22:42:52 +01:00
}
2017-10-11 04:02:16 +02:00
}
}
2018-02-13 18:47:05 +01:00
func ( s * Sync ) enqueueYoutubeVideos ( ) error {
2017-10-11 04:02:16 +02:00
client := & http . Client {
2018-09-26 06:08:18 +02:00
Transport : & transport . APIKey { Key : s . APIConfig . YoutubeAPIKey } ,
2017-10-11 04:02:16 +02:00
}
service , err := youtube . New ( client )
if err != nil {
2018-03-09 17:47:38 +01:00
return errors . Prefix ( "error creating YouTube service" , err )
2017-10-11 04:02:16 +02:00
}
2017-12-28 18:14:33 +01:00
response , err := service . Channels . List ( "contentDetails" ) . Id ( s . YoutubeChannelID ) . Do ( )
2017-10-11 04:02:16 +02:00
if err != nil {
2018-03-09 17:47:38 +01:00
return errors . Prefix ( "error getting channels" , err )
2017-10-11 04:02:16 +02:00
}
if len ( response . Items ) < 1 {
2018-03-09 17:47:38 +01:00
return errors . Err ( "youtube channel not found" )
2017-10-11 04:02:16 +02:00
}
if response . Items [ 0 ] . ContentDetails . RelatedPlaylists == nil {
2018-03-09 17:47:38 +01:00
return errors . Err ( "no related playlists" )
2017-10-11 04:02:16 +02:00
}
playlistID := response . Items [ 0 ] . ContentDetails . RelatedPlaylists . Uploads
if playlistID == "" {
2018-03-09 17:47:38 +01:00
return errors . Err ( "no channel playlist" )
2017-10-11 04:02:16 +02:00
}
2017-12-28 18:14:33 +01:00
var videos [ ] video
2019-05-31 16:38:31 +02:00
playlistMap := make ( map [ string ] * youtube . PlaylistItemSnippet , 50 )
2017-10-11 04:02:16 +02:00
nextPageToken := ""
for {
req := service . PlaylistItems . List ( "snippet" ) .
PlaylistId ( playlistID ) .
MaxResults ( 50 ) .
PageToken ( nextPageToken )
playlistResponse , err := req . Do ( )
if err != nil {
2018-03-09 17:47:38 +01:00
return errors . Prefix ( "error getting playlist items" , err )
2017-10-11 04:02:16 +02:00
}
if len ( playlistResponse . Items ) < 1 {
2019-01-11 20:15:17 +01:00
// If there are 50+ videos in a playlist but less than 50 are actually returned by the API, youtube will still redirect
// clients to a next page. Such next page will however be empty. This logic prevents ytsync from failing.
youtubeIsLying := len ( videos ) > 0
if youtubeIsLying {
break
}
2018-03-09 17:47:38 +01:00
return errors . Err ( "playlist items not found" )
2017-10-11 04:02:16 +02:00
}
2019-05-31 16:38:31 +02:00
//playlistMap := make(map[string]*youtube.PlaylistItemSnippet, 50)
2019-04-19 03:22:51 +02:00
videoIDs := make ( [ ] string , 50 )
for i , item := range playlistResponse . Items {
2017-10-11 04:02:16 +02:00
// normally we'd send the video into the channel here, but youtube api doesn't have sorting
// so we have to get ALL the videos, then sort them, then send them in
2019-04-19 03:22:51 +02:00
playlistMap [ item . Snippet . ResourceId . VideoId ] = item . Snippet
videoIDs [ i ] = item . Snippet . ResourceId . VideoId
}
2019-05-03 05:11:52 +02:00
req2 := service . Videos . List ( "snippet,contentDetails,recordingDetails" ) . Id ( strings . Join ( videoIDs [ : ] , "," ) )
2019-04-19 03:22:51 +02:00
videosListResponse , err := req2 . Do ( )
if err != nil {
return errors . Prefix ( "error getting videos info" , err )
}
for _ , item := range videosListResponse . Items {
2019-07-12 21:32:49 +02:00
videos = append ( videos , sources . NewYoutubeVideo ( s . videoDirectory , item , playlistMap [ item . Id ] . Position , s . Manager . GetS3AWSConfig ( ) , s . grp ) )
2017-10-11 04:02:16 +02:00
}
2017-12-28 18:14:33 +01:00
log . Infof ( "Got info for %d videos from youtube API" , len ( videos ) )
2017-10-11 04:02:16 +02:00
nextPageToken = playlistResponse . NextPageToken
if nextPageToken == "" {
break
}
}
2019-05-31 16:38:31 +02:00
for k , v := range s . syncedVideos {
if ! v . Published {
continue
}
_ , ok := playlistMap [ k ]
if ! ok {
2019-07-12 21:32:49 +02:00
videos = append ( videos , sources . NewMockedVideo ( s . videoDirectory , k , s . YoutubeChannelID , s . Manager . GetS3AWSConfig ( ) , s . grp ) )
2019-05-31 16:38:31 +02:00
}
2017-10-11 04:02:16 +02:00
2019-05-31 16:38:31 +02:00
}
2017-10-11 04:02:16 +02:00
sort . Sort ( byPublishedAt ( videos ) )
//or sort.Sort(sort.Reverse(byPlaylistPosition(videos)))
2017-10-11 19:13:47 +02:00
Enqueue :
2017-10-11 04:02:16 +02:00
for _ , v := range videos {
select {
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-12-28 18:14:33 +01:00
break Enqueue
default :
}
select {
case s . queue <- v :
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-10-11 19:13:47 +02:00
break Enqueue
2017-10-11 04:02:16 +02:00
}
}
return nil
}
2018-03-09 17:47:38 +01:00
func ( s * Sync ) processVideo ( v video ) ( err error ) {
defer func ( ) {
if p := recover ( ) ; p != nil {
2018-10-09 21:57:07 +02:00
log . Printf ( "stack: %s" , debug . Stack ( ) )
2018-03-09 17:47:38 +01:00
var ok bool
err , ok = p . ( error )
if ! ok {
err = errors . Err ( "%v" , p )
}
err = errors . Wrap ( p , 2 )
}
} ( )
2017-12-28 18:14:33 +01:00
log . Println ( "Processing " + v . IDAndNum ( ) )
2017-11-02 16:20:22 +01:00
defer func ( start time . Time ) {
2017-12-28 18:14:33 +01:00
log . Println ( v . ID ( ) + " took " + time . Since ( start ) . String ( ) )
2017-11-02 16:20:22 +01:00
} ( time . Now ( ) )
2017-10-11 04:02:16 +02:00
2018-08-23 00:28:31 +02:00
s . syncedVideosMux . RLock ( )
2018-07-31 01:19:12 +02:00
sv , ok := s . syncedVideos [ v . ID ( ) ]
2018-08-23 00:28:31 +02:00
s . syncedVideosMux . RUnlock ( )
2019-06-06 16:24:20 +02:00
newMetadataVersion := int8 ( 2 )
2018-07-31 01:19:12 +02:00
alreadyPublished := ok && sv . Published
2019-08-30 21:08:28 +02:00
videoRequiresUpgrade := ok && s . Manager . SyncFlags . UpgradeMetadata && sv . MetadataVersion < newMetadataVersion
2018-07-31 01:19:12 +02:00
2018-08-02 14:05:06 +02:00
neverRetryFailures := [ ] string {
"Error extracting sts from embedded url response" ,
2018-08-29 13:33:02 +02:00
"Unable to extract signature tokens" ,
2018-08-02 14:05:06 +02:00
"the video is too big to sync, skipping for now" ,
2018-10-11 23:21:05 +02:00
"video is too long to process" ,
2019-07-12 23:20:01 +02:00
"This video contains content from" ,
2019-02-15 14:11:38 +01:00
"no compatible format available for this video" ,
"Watch this video on YouTube." ,
"have blocked it on copyright grounds" ,
2019-07-22 02:45:38 +02:00
"giving up after 0 fragment retries" ,
2018-08-02 14:05:06 +02:00
}
if ok && ! sv . Published && util . SubstringInSlice ( sv . FailureReason , neverRetryFailures ) {
log . Println ( v . ID ( ) + " can't ever be published" )
2018-07-31 01:19:12 +02:00
return nil
2017-10-11 04:02:16 +02:00
}
2019-06-06 16:24:20 +02:00
if alreadyPublished && ! videoRequiresUpgrade {
2017-12-28 18:14:33 +01:00
log . Println ( v . ID ( ) + " already published" )
return nil
2017-10-11 04:02:16 +02:00
}
2019-05-09 17:06:56 +02:00
if ok && sv . MetadataVersion >= newMetadataVersion {
log . Println ( v . ID ( ) + " upgraded to the new metadata" )
return nil
}
2017-10-11 04:02:16 +02:00
2019-06-10 01:41:52 +02:00
if ! videoRequiresUpgrade && v . PlaylistPosition ( ) > s . Manager . videosLimit {
2018-04-25 23:06:17 +02:00
log . Println ( v . ID ( ) + " is old: skipping" )
return nil
}
2018-08-17 20:05:39 +02:00
err = s . Manager . checkUsedSpace ( )
if err != nil {
return err
}
2019-05-03 05:11:52 +02:00
sp := sources . SyncParams {
ClaimAddress : s . claimAddress ,
Amount : publishAmount ,
ChannelID : s . lbryChannelID ,
MaxVideoSize : s . Manager . maxVideoSize ,
Namer : s . namer ,
MaxVideoLength : s . Manager . maxVideoLength ,
2019-06-06 02:16:07 +02:00
Fee : s . Fee ,
2019-05-03 05:11:52 +02:00
}
2018-09-18 21:20:34 +02:00
2019-06-12 03:17:59 +02:00
summary , err := v . Sync ( s . daemon , sp , & sv , videoRequiresUpgrade , s . walletMux )
2017-10-11 04:02:16 +02:00
if err != nil {
return err
}
2018-09-18 21:20:34 +02:00
2019-06-12 03:35:21 +02:00
s . AppendSyncedVideo ( v . ID ( ) , true , "" , summary . ClaimName , summary . ClaimID , newMetadataVersion , * v . Size ( ) )
2019-08-20 07:06:51 +02:00
err = s . Manager . apiConfig . MarkVideoStatus ( sdk . VideoStatus {
ChannelID : s . YoutubeChannelID ,
VideoID : v . ID ( ) ,
Status : VideoStatusPublished ,
ClaimID : summary . ClaimID ,
ClaimName : summary . ClaimName ,
Size : v . Size ( ) ,
2019-08-21 20:01:24 +02:00
MetaDataVersion : LatestMetadataVersion ,
2019-08-23 01:28:51 +02:00
IsTransferred : util . PtrToBool ( s . shouldTransfer ( ) ) ,
2019-08-20 07:06:51 +02:00
} )
2017-10-11 04:02:16 +02:00
if err != nil {
2019-08-04 00:34:48 +02:00
logUtils . SendErrorToSlack ( "Failed to mark video on the database: %s" , errors . FullTrace ( err ) )
2017-11-03 13:46:27 +01:00
}
return nil
}
2017-12-30 01:21:16 +01:00
2018-04-20 21:06:55 +02:00
// waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up
func waitForDaemonProcess ( timeout time . Duration ) error {
processes , err := ps . Processes ( )
if err != nil {
return err
}
var daemonProcessId = - 1
for _ , p := range processes {
2019-01-30 13:42:23 +01:00
if p . Executable ( ) == "lbrynet" {
2018-04-20 21:06:55 +02:00
daemonProcessId = p . Pid ( )
break
}
}
if daemonProcessId == - 1 {
return nil
}
then := time . Now ( )
stopTime := then . Add ( time . Duration ( timeout * time . Second ) )
for ! time . Now ( ) . After ( stopTime ) {
wait := 10 * time . Second
log . Println ( "the daemon is still running, waiting for it to exit" )
time . Sleep ( wait )
proc , err := os . FindProcess ( daemonProcessId )
if err != nil {
// couldn't find the process, that means the daemon is stopped and can continue
return nil
}
//double check if process is running and alive
//by sending a signal 0
//NOTE : syscall.Signal is not available in Windows
err = proc . Signal ( syscall . Signal ( 0 ) )
//the process doesn't exist anymore! we're free to go
2018-04-24 20:50:01 +02:00
if err != nil && ( err == syscall . ESRCH || err . Error ( ) == "os: process already finished" ) {
2018-04-20 21:06:55 +02:00
return nil
}
}
return errors . Err ( "timeout reached" )
}