2019-01-11 03:02:26 +01:00
package manager
2017-10-11 04:02:16 +02:00
import (
2018-05-17 01:42:06 +02:00
"fmt"
2017-10-11 04:02:16 +02:00
"io/ioutil"
"net/http"
"os"
2017-12-28 18:14:33 +01:00
"os/signal"
2018-10-09 21:57:07 +02:00
"runtime/debug"
2017-10-11 04:02:16 +02:00
"sort"
2019-10-08 01:38:39 +02:00
"strconv"
2017-10-11 04:02:16 +02:00
"strings"
"sync"
2017-12-28 18:14:33 +01:00
"syscall"
2017-10-11 04:02:16 +02:00
"time"
2019-12-10 23:02:56 +01:00
"github.com/lbryio/ytsync/ip_manager"
2018-10-08 22:19:17 +02:00
"github.com/lbryio/ytsync/namer"
"github.com/lbryio/ytsync/sdk"
"github.com/lbryio/ytsync/sources"
2019-05-03 05:11:52 +02:00
"github.com/lbryio/ytsync/thumbs"
2019-07-15 22:16:02 +02:00
logUtils "github.com/lbryio/ytsync/util"
2018-10-08 22:19:17 +02:00
2019-10-10 16:50:33 +02:00
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util"
2018-09-26 06:08:18 +02:00
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
2017-10-11 04:02:16 +02:00
log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport"
2018-06-09 01:14:55 +02:00
"google.golang.org/api/youtube/v3"
2017-10-11 04:02:16 +02:00
)
const (
2019-06-12 03:17:59 +02:00
channelClaimAmount = 0.01
estimatedMaxTxFee = 0.1
2019-09-04 19:24:11 +02:00
minimumAccountBalance = 1.0
2019-06-12 03:35:21 +02:00
minimumRefillAmount = 1
2019-06-12 03:17:59 +02:00
publishAmount = 0.01
maxReasonLength = 500
2017-10-11 04:02:16 +02:00
)
2017-12-28 18:14:33 +01:00
type video interface {
2018-08-14 17:09:23 +02:00
Size ( ) * int64
2017-12-28 18:14:33 +01:00
ID ( ) string
IDAndNum ( ) string
2018-04-25 20:56:26 +02:00
PlaylistPosition ( ) int
2017-12-28 18:14:33 +01:00
PublishedAt ( ) time . Time
2019-06-12 03:17:59 +02:00
Sync ( * jsonrpc . Client , sources . SyncParams , * sdk . SyncedVideo , bool , * sync . RWMutex ) ( * sources . SyncSummary , error )
2017-12-28 18:14:33 +01:00
}
// sorting videos
type byPublishedAt [ ] video
func ( a byPublishedAt ) Len ( ) int { return len ( a ) }
func ( a byPublishedAt ) Swap ( i , j int ) { a [ i ] , a [ j ] = a [ j ] , a [ i ] }
func ( a byPublishedAt ) Less ( i , j int ) bool { return a [ i ] . PublishedAt ( ) . Before ( a [ j ] . PublishedAt ( ) ) }
2017-10-11 04:02:16 +02:00
// Sync stores the options that control how syncing happens
type Sync struct {
2019-10-08 01:59:18 +02:00
APIConfig * sdk . APIConfig
YoutubeChannelID string
LbryChannelName string
MaxTries int
ConcurrentVideos int
Refill int
Manager * SyncManager
LbrycrdString string
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
Fee * sdk . Fee
daemon * jsonrpc . Client
claimAddress string
videoDirectory string
syncedVideosMux * sync . RWMutex
syncedVideos map [ string ] sdk . SyncedVideo
grp * stop . Group
lbryChannelID string
namer * namer . Namer
walletMux * sync . RWMutex
queue chan video
transferState int
clientPublishAddress string
publicKey string
2019-10-16 19:38:45 +02:00
defaultAccountID string
2020-04-08 23:14:10 +02:00
MaxVideoLength float64
2018-08-03 19:21:42 +02:00
}
2019-06-12 03:35:21 +02:00
func ( s * Sync ) AppendSyncedVideo ( videoID string , published bool , failureReason string , claimName string , claimID string , metadataVersion int8 , size int64 ) {
2018-08-14 16:48:55 +02:00
s . syncedVideosMux . Lock ( )
defer s . syncedVideosMux . Unlock ( )
2018-09-26 06:08:18 +02:00
s . syncedVideos [ videoID ] = sdk . SyncedVideo {
2019-06-12 03:35:21 +02:00
VideoID : videoID ,
Published : published ,
FailureReason : failureReason ,
ClaimID : claimID ,
ClaimName : claimName ,
MetadataVersion : metadataVersion ,
Size : size ,
2018-08-03 19:21:42 +02:00
}
2017-11-02 16:20:22 +01:00
}
2018-05-24 02:32:11 +02:00
// IsInterrupted can be queried to discover if the sync process was interrupted manually
func ( s * Sync ) IsInterrupted ( ) bool {
select {
2018-07-12 14:28:20 +02:00
case <- s . grp . Ch ( ) :
2018-05-24 02:32:11 +02:00
return true
default :
return false
}
}
2018-08-09 22:54:23 +02:00
func ( s * Sync ) downloadWallet ( ) error {
2019-07-31 05:32:02 +02:00
defaultWalletDir , defaultTempWalletDir , key , err := s . getWalletPaths ( )
if err != nil {
return errors . Err ( err )
2018-08-09 22:54:23 +02:00
}
creds := credentials . NewStaticCredentials ( s . AwsS3ID , s . AwsS3Secret , "" )
s3Session , err := session . NewSession ( & aws . Config { Region : aws . String ( s . AwsS3Region ) , Credentials : creds } )
if err != nil {
2019-08-04 00:34:48 +02:00
return errors . Prefix ( "error starting session: " , err )
2018-08-09 22:54:23 +02:00
}
downloader := s3manager . NewDownloader ( s3Session )
out , err := os . Create ( defaultTempWalletDir )
if err != nil {
2019-08-04 00:34:48 +02:00
return errors . Prefix ( "error creating temp wallet: " , err )
2018-08-09 22:54:23 +02:00
}
defer out . Close ( )
bytesWritten , err := downloader . Download ( out , & s3 . GetObjectInput {
Bucket : aws . String ( s . AwsS3Bucket ) ,
Key : key ,
} )
if err != nil {
// Casting to the awserr.Error type will allow you to inspect the error
// code returned by the service in code. The error code can be used
// to switch on context specific functionality. In this case a context
// specific error message is printed to the user based on the bucket
// and key existing.
//
// For information on other S3 API error codes see:
// http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
if aerr , ok := err . ( awserr . Error ) ; ok {
code := aerr . Code ( )
if code == s3 . ErrCodeNoSuchKey {
return errors . Err ( "wallet not on S3" )
}
}
return err
} else if bytesWritten == 0 {
return errors . Err ( "zero bytes written" )
}
2019-08-04 00:34:48 +02:00
err = os . Rename ( defaultTempWalletDir , defaultWalletDir )
if err != nil {
return errors . Prefix ( "error replacing temp wallet for default wallet: " , err )
}
return nil
2018-08-09 22:54:23 +02:00
}
2019-07-31 05:32:02 +02:00
func ( s * Sync ) getWalletPaths ( ) ( defaultWallet , tempWallet string , key * string , err error ) {
defaultWallet = os . Getenv ( "HOME" ) + "/.lbryum/wallets/default_wallet"
tempWallet = os . Getenv ( "HOME" ) + "/.lbryum/wallets/tmp_wallet"
key = aws . String ( "/wallets/" + s . YoutubeChannelID )
2019-08-04 00:34:48 +02:00
if logUtils . IsRegTest ( ) {
2019-07-31 05:32:02 +02:00
defaultWallet = os . Getenv ( "HOME" ) + "/.lbryum_regtest/wallets/default_wallet"
tempWallet = os . Getenv ( "HOME" ) + "/.lbryum_regtest/wallets/tmp_wallet"
key = aws . String ( "/regtest/" + s . YoutubeChannelID )
}
walletPath := os . Getenv ( "LBRYNET_WALLETS_DIR" )
if walletPath != "" {
defaultWallet = walletPath + "/wallets/default_wallet"
tempWallet = walletPath + "/wallets/tmp_wallet"
}
if _ , err := os . Stat ( defaultWallet ) ; ! os . IsNotExist ( err ) {
return "" , "" , nil , errors . Err ( "default_wallet already exists" )
}
return
}
2018-08-09 22:54:23 +02:00
func ( s * Sync ) uploadWallet ( ) error {
2019-08-04 00:34:48 +02:00
defaultWalletDir := logUtils . GetDefaultWalletPath ( )
2018-08-09 22:54:23 +02:00
key := aws . String ( "/wallets/" + s . YoutubeChannelID )
2019-08-04 00:34:48 +02:00
if logUtils . IsRegTest ( ) {
2018-08-09 22:54:23 +02:00
key = aws . String ( "/regtest/" + s . YoutubeChannelID )
}
if _ , err := os . Stat ( defaultWalletDir ) ; os . IsNotExist ( err ) {
return errors . Err ( "default_wallet does not exist" )
}
creds := credentials . NewStaticCredentials ( s . AwsS3ID , s . AwsS3Secret , "" )
s3Session , err := session . NewSession ( & aws . Config { Region : aws . String ( s . AwsS3Region ) , Credentials : creds } )
if err != nil {
return err
}
uploader := s3manager . NewUploader ( s3Session )
file , err := os . Open ( defaultWalletDir )
if err != nil {
return err
}
defer file . Close ( )
_ , err = uploader . Upload ( & s3manager . UploadInput {
Bucket : aws . String ( s . AwsS3Bucket ) ,
Key : key ,
Body : file ,
} )
if err != nil {
return err
}
return os . Remove ( defaultWalletDir )
}
2018-09-20 01:05:47 +02:00
func ( s * Sync ) setStatusSyncing ( ) error {
2019-08-21 19:40:35 +02:00
syncedVideos , claimNames , err := s . Manager . apiConfig . SetChannelStatus ( s . YoutubeChannelID , StatusSyncing , "" , nil )
2018-09-20 01:05:47 +02:00
if err != nil {
return err
}
s . syncedVideosMux . Lock ( )
s . syncedVideos = syncedVideos
2018-10-09 21:57:07 +02:00
s . namer . SetNames ( claimNames )
2018-09-20 01:05:47 +02:00
s . syncedVideosMux . Unlock ( )
return nil
}
2019-01-03 19:55:27 +01:00
func ( s * Sync ) setExceptions ( ) {
if s . YoutubeChannelID == "UCwjQfNRW6sGYb__pd7d4nUg" { //@FreeTalkLive
s . Manager . maxVideoLength = 0.0 // skips max length checks
s . Manager . maxVideoSize = 0
}
}
2019-12-27 01:27:29 +01:00
var stopGroup = stop . New ( )
2018-07-17 18:54:22 +02:00
func ( s * Sync ) FullCycle ( ) ( e error ) {
2017-11-03 16:46:19 +01:00
if os . Getenv ( "HOME" ) == "" {
2018-03-09 17:47:38 +01:00
return errors . Err ( "no $HOME env var found" )
2017-11-03 16:46:19 +01:00
}
2017-12-30 01:21:16 +01:00
if s . YoutubeChannelID == "" {
2018-07-17 18:54:22 +02:00
return errors . Err ( "channel ID not provided" )
}
2019-01-03 19:55:27 +01:00
s . setExceptions ( )
2018-08-23 00:28:31 +02:00
s . syncedVideosMux = & sync . RWMutex { }
2019-06-12 03:17:59 +02:00
s . walletMux = & sync . RWMutex { }
2019-12-27 01:27:29 +01:00
s . grp = stopGroup
2018-08-14 16:48:55 +02:00
s . queue = make ( chan video )
interruptChan := make ( chan os . Signal , 1 )
signal . Notify ( interruptChan , os . Interrupt , syscall . SIGTERM )
2018-08-21 19:17:52 +02:00
defer signal . Stop ( interruptChan )
2018-08-14 16:48:55 +02:00
go func ( ) {
<- interruptChan
log . Println ( "Got interrupt signal, shutting down (if publishing, will shut down after current publish)" )
s . grp . Stop ( )
} ( )
2018-09-20 01:05:47 +02:00
err := s . setStatusSyncing ( )
2018-07-17 18:54:22 +02:00
if err != nil {
return err
2017-12-30 01:21:16 +01:00
}
2018-07-31 19:42:20 +02:00
2018-09-20 01:05:47 +02:00
defer s . setChannelTerminationStatus ( & e )
2017-11-03 16:46:19 +01:00
2018-08-09 22:54:23 +02:00
err = s . downloadWallet ( )
if err != nil && err . Error ( ) != "wallet not on S3" {
2018-10-03 02:51:42 +02:00
return errors . Prefix ( "failure in downloading wallet" , err )
2018-08-09 22:54:23 +02:00
} else if err == nil {
2018-08-08 23:59:59 +02:00
log . Println ( "Continuing previous upload" )
2018-08-09 22:54:23 +02:00
} else {
log . Println ( "Starting new wallet" )
2017-11-03 13:46:27 +01:00
}
2018-08-09 22:54:23 +02:00
defer s . stopAndUploadWallet ( & e )
2017-11-03 13:46:27 +01:00
2019-08-02 03:43:41 +02:00
s . videoDirectory , err = ioutil . TempDir ( os . Getenv ( "TMP_DIR" ) , "ytsync" )
2017-12-28 18:14:33 +01:00
if err != nil {
return errors . Wrap ( err , 0 )
}
2019-08-11 04:50:43 +02:00
err = os . Chmod ( s . videoDirectory , 0766 )
if err != nil {
return errors . Err ( err )
}
2017-11-03 13:46:27 +01:00
2019-06-25 02:43:50 +02:00
defer deleteSyncFolder ( s . videoDirectory )
2017-12-28 18:14:33 +01:00
log . Printf ( "Starting daemon" )
2019-08-04 00:34:48 +02:00
err = logUtils . StartDaemon ( )
2017-11-03 13:46:27 +01:00
if err != nil {
return err
}
2017-12-28 18:14:33 +01:00
log . Infoln ( "Waiting for daemon to finish starting..." )
2019-07-31 05:32:02 +02:00
s . daemon = jsonrpc . NewClient ( os . Getenv ( "LBRYNET_ADDRESS" ) )
2018-06-18 01:50:59 +02:00
s . daemon . SetRPCTimeout ( 40 * time . Minute )
2018-03-09 01:27:54 +01:00
2018-08-09 22:54:23 +02:00
err = s . waitForDaemonStart ( )
if err != nil {
return err
2018-03-09 01:27:54 +01:00
}
2017-11-03 13:46:27 +01:00
2019-08-16 05:34:25 +02:00
err = s . doSync ( )
if err != nil {
return err
}
2019-08-21 19:40:35 +02:00
if s . shouldTransfer ( ) {
2019-10-08 01:38:39 +02:00
return s . processTransfers ( )
}
return nil
}
func ( s * Sync ) processTransfers ( ) ( e error ) {
log . Println ( "Processing transfers" )
err := waitConfirmations ( s )
if err != nil {
return err
}
supportAmount , err := abandonSupports ( s )
if err != nil {
return errors . Prefix ( fmt . Sprintf ( "%.6f LBCs were abandoned before failing" , supportAmount ) , err )
}
if supportAmount > 0 {
2019-12-08 16:31:15 +01:00
logUtils . SendInfoToSlack ( "(%s) %.6f LBCs were abandoned and should be used as support" , s . YoutubeChannelID , supportAmount )
2019-10-08 01:38:39 +02:00
}
err = transferVideos ( s )
if err != nil {
return err
}
err = transferChannel ( s )
if err != nil {
return err
}
2019-10-16 19:38:45 +02:00
defaultAccount , err := s . getDefaultAccount ( )
if err != nil {
return err
}
2019-10-08 01:38:39 +02:00
reallocateSupports := supportAmount > 0.01
if reallocateSupports {
err = waitConfirmations ( s )
2019-08-27 00:31:27 +02:00
if err != nil {
return err
}
2019-10-08 01:38:39 +02:00
isTip := true
2019-12-08 17:48:24 +01:00
summary , err := s . daemon . SupportCreate ( s . lbryChannelID , fmt . Sprintf ( "%.6f" , supportAmount ) , & isTip , nil , [ ] string { defaultAccount } , nil )
2019-08-28 21:08:25 +02:00
if err != nil {
2019-12-24 05:00:16 +01:00
if strings . Contains ( err . Error ( ) , "tx-size" ) { //TODO: this is a silly workaround and should be written in an recursive function
summary , err = s . daemon . SupportCreate ( s . lbryChannelID , fmt . Sprintf ( "%.6f" , supportAmount / 2.0 ) , & isTip , nil , [ ] string { defaultAccount } , nil )
if err != nil {
return errors . Prefix ( fmt . Sprintf ( "something went wrong while tipping the channel for %.6f LBCs" , supportAmount ) , err )
}
summary , err = s . daemon . SupportCreate ( s . lbryChannelID , fmt . Sprintf ( "%.6f" , supportAmount / 2.0 ) , & isTip , nil , [ ] string { defaultAccount } , nil )
if err != nil {
return errors . Err ( err )
}
} else {
return errors . Err ( err )
}
2019-08-28 21:08:25 +02:00
}
2019-10-08 01:38:39 +02:00
if len ( summary . Outputs ) < 1 {
return errors . Err ( "something went wrong while tipping the channel for %.6f LBCs" , supportAmount )
2019-08-28 21:08:25 +02:00
}
2019-08-16 05:34:25 +02:00
}
2019-10-08 01:38:39 +02:00
log . Println ( "Done processing transfers" )
2019-08-16 05:34:25 +02:00
return nil
2017-11-03 13:46:27 +01:00
}
2019-05-03 05:11:52 +02:00
2019-06-25 02:43:50 +02:00
func deleteSyncFolder ( videoDirectory string ) {
if ! strings . Contains ( videoDirectory , "/tmp/ytsync" ) {
_ = util . SendToSlack ( errors . Err ( "Trying to delete an unexpected directory: %s" , videoDirectory ) . Error ( ) )
}
err := os . RemoveAll ( videoDirectory )
if err != nil {
_ = util . SendToSlack ( err . Error ( ) )
}
}
2019-09-25 04:38:49 +02:00
2019-08-21 19:40:35 +02:00
func ( s * Sync ) shouldTransfer ( ) bool {
2019-10-08 01:59:18 +02:00
return s . transferState >= 1 && s . clientPublishAddress != "" && ! s . Manager . SyncFlags . DisableTransfers
2019-08-21 19:40:35 +02:00
}
2019-09-25 04:38:49 +02:00
2018-09-20 01:05:47 +02:00
func ( s * Sync ) setChannelTerminationStatus ( e * error ) {
2019-08-21 19:40:35 +02:00
var transferState * int
if s . shouldTransfer ( ) {
2019-10-21 15:44:24 +02:00
if * e == nil {
2019-08-23 01:28:51 +02:00
transferState = util . PtrToInt ( TransferStateComplete )
2019-08-21 19:40:35 +02:00
}
}
2018-08-09 22:54:23 +02:00
if * e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := [ ] string {
"this youtube channel is being managed by another server" ,
2019-02-22 19:33:00 +01:00
"interrupted during daemon startup" ,
2018-08-09 22:54:23 +02:00
}
if util . SubstringInSlice ( ( * e ) . Error ( ) , noFailConditions ) {
return
}
2018-08-17 16:05:54 +02:00
failureReason := ( * e ) . Error ( )
2019-08-21 19:40:35 +02:00
_ , _ , err := s . Manager . apiConfig . SetChannelStatus ( s . YoutubeChannelID , StatusFailed , failureReason , transferState )
2018-08-09 22:54:23 +02:00
if err != nil {
2018-10-03 02:51:42 +02:00
msg := fmt . Sprintf ( "Failed setting failed state for channel %s" , s . LbryChannelName )
* e = errors . Prefix ( msg + err . Error ( ) , * e )
2018-08-09 22:54:23 +02:00
}
} else if ! s . IsInterrupted ( ) {
2019-08-21 19:40:35 +02:00
_ , _ , err := s . Manager . apiConfig . SetChannelStatus ( s . YoutubeChannelID , StatusSynced , "" , transferState )
2018-08-09 22:54:23 +02:00
if err != nil {
* e = err
}
}
}
func ( s * Sync ) waitForDaemonStart ( ) error {
2019-08-13 23:05:09 +02:00
beginTime := time . Now ( )
2018-08-09 22:54:23 +02:00
for {
select {
case <- s . grp . Ch ( ) :
return errors . Err ( "interrupted during daemon startup" )
default :
2019-08-13 23:05:09 +02:00
status , err := s . daemon . Status ( )
if err == nil && status . StartupStatus . Wallet && status . IsRunning {
2018-08-09 22:54:23 +02:00
return nil
}
2019-08-13 23:05:09 +02:00
if time . Since ( beginTime ) . Minutes ( ) > 60 {
s . grp . Stop ( )
return errors . Err ( "the daemon is taking too long to start. Something is wrong" )
}
2018-08-09 22:54:23 +02:00
time . Sleep ( 5 * time . Second )
}
}
}
2018-09-21 16:26:27 +02:00
2018-08-09 22:54:23 +02:00
func ( s * Sync ) stopAndUploadWallet ( e * error ) {
log . Printf ( "Stopping daemon" )
2019-08-04 00:34:48 +02:00
shutdownErr := logUtils . StopDaemon ( )
2018-08-09 22:54:23 +02:00
if shutdownErr != nil {
logShutdownError ( shutdownErr )
} else {
// the cli will return long before the daemon effectively stops. we must observe the processes running
// before moving the wallet
waitTimeout := 8 * time . Minute
processDeathError := waitForDaemonProcess ( waitTimeout )
if processDeathError != nil {
logShutdownError ( processDeathError )
} else {
err := s . uploadWallet ( )
if err != nil {
if * e == nil {
2019-02-22 19:33:00 +01:00
e = & err
2018-08-09 22:54:23 +02:00
return
} else {
2018-10-03 02:51:42 +02:00
* e = errors . Prefix ( "failure uploading wallet" , * e )
2018-08-09 22:54:23 +02:00
}
}
}
}
}
2018-04-26 12:12:54 +02:00
func logShutdownError ( shutdownErr error ) {
2019-08-04 00:34:48 +02:00
logUtils . SendErrorToSlack ( "error shutting down daemon: %s" , errors . FullTrace ( shutdownErr ) )
2019-07-15 22:16:02 +02:00
logUtils . SendErrorToSlack ( "WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR" )
2018-04-26 12:12:54 +02:00
}
2017-11-03 13:46:27 +01:00
2019-04-19 03:22:51 +02:00
var thumbnailHosts = [ ] string {
"berk.ninja/thumbnails/" ,
2019-05-03 05:11:52 +02:00
thumbs . ThumbnailEndpoint ,
2019-04-19 03:22:51 +02:00
}
2019-12-14 06:43:01 +01:00
func isYtsyncClaim ( c jsonrpc . Claim , expectedChannelID string ) bool {
2019-04-19 03:22:51 +02:00
if ! util . InSlice ( c . Type , [ ] string { "claim" , "update" } ) || c . Value . GetStream ( ) == nil {
2019-01-03 17:01:00 +01:00
return false
}
2019-04-25 13:16:45 +02:00
if c . Value . GetThumbnail ( ) == nil || c . Value . GetThumbnail ( ) . GetUrl ( ) == "" {
2019-01-03 17:01:00 +01:00
//most likely a claim created outside of ytsync, ignore!
return false
}
2019-12-14 06:43:01 +01:00
if c . SigningChannel == nil {
return false
}
if c . SigningChannel . ClaimID != expectedChannelID {
return false
}
2019-05-08 23:12:13 +02:00
for _ , th := range thumbnailHosts {
if strings . Contains ( c . Value . GetThumbnail ( ) . GetUrl ( ) , th ) {
return true
}
}
return false
2019-01-03 17:01:00 +01:00
}
2018-09-20 01:05:47 +02:00
// fixDupes abandons duplicate claims
func ( s * Sync ) fixDupes ( claims [ ] jsonrpc . Claim ) ( bool , error ) {
abandonedClaims := false
videoIDs := make ( map [ string ] jsonrpc . Claim )
2018-08-31 17:42:15 +02:00
for _ , c := range claims {
2019-12-14 06:43:01 +01:00
if ! isYtsyncClaim ( c , s . lbryChannelID ) {
2018-08-31 17:42:15 +02:00
continue
}
2019-04-25 13:16:45 +02:00
tn := c . Value . GetThumbnail ( ) . GetUrl ( )
2018-09-03 23:38:16 +02:00
videoID := tn [ strings . LastIndex ( tn , "/" ) + 1 : ]
2018-09-20 01:05:47 +02:00
cl , ok := videoIDs [ videoID ]
if ! ok || cl . ClaimID == c . ClaimID {
videoIDs [ videoID ] = c
2018-08-31 17:42:15 +02:00
continue
}
2018-09-20 01:05:47 +02:00
// only keep the most recent one
claimToAbandon := c
videoIDs [ videoID ] = cl
if c . Height > cl . Height {
claimToAbandon = cl
videoIDs [ videoID ] = c
}
2019-10-08 01:59:18 +02:00
if claimToAbandon . Address != s . clientPublishAddress && ! s . syncedVideos [ videoID ] . Transferred {
2019-10-01 16:55:43 +02:00
log . Debugf ( "abandoning %+v" , claimToAbandon )
_ , err := s . daemon . StreamAbandon ( claimToAbandon . Txid , claimToAbandon . Nout , nil , false )
if err != nil {
return true , err
}
} else {
log . Debugf ( "lbrynet stream abandon --txid=%s --nout=%d" , claimToAbandon . Txid , claimToAbandon . Nout )
2018-09-20 01:05:47 +02:00
}
abandonedClaims = true
//return true, nil
2018-08-31 17:42:15 +02:00
}
2018-09-20 01:05:47 +02:00
return abandonedClaims , nil
2018-08-31 17:42:15 +02:00
}
2019-10-08 01:38:39 +02:00
type ytsyncClaim struct {
ClaimID string
MetadataVersion uint
ClaimName string
PublishAddress string
VideoID string
Claim * jsonrpc . Claim
}
// mapFromClaims returns a map of videoIDs (youtube id) to ytsyncClaim which is a structure holding blockchain related
// information
func ( s * Sync ) mapFromClaims ( claims [ ] jsonrpc . Claim ) map [ string ] ytsyncClaim {
videoIDMap := make ( map [ string ] ytsyncClaim , len ( claims ) )
2018-08-31 17:42:15 +02:00
for _ , c := range claims {
2019-12-14 06:43:01 +01:00
if ! isYtsyncClaim ( c , s . lbryChannelID ) {
2018-08-31 17:42:15 +02:00
continue
}
2019-04-25 13:16:45 +02:00
tn := c . Value . GetThumbnail ( ) . GetUrl ( )
2018-09-20 01:05:47 +02:00
videoID := tn [ strings . LastIndex ( tn , "/" ) + 1 : ]
2019-06-10 01:41:52 +02:00
claimMetadataVersion := uint ( 1 )
if strings . Contains ( tn , thumbs . ThumbnailEndpoint ) {
claimMetadataVersion = 2
}
2019-10-08 01:38:39 +02:00
videoIDMap [ videoID ] = ytsyncClaim {
ClaimID : c . ClaimID ,
MetadataVersion : claimMetadataVersion ,
ClaimName : c . Name ,
PublishAddress : c . Address ,
VideoID : videoID ,
Claim : & c ,
}
}
return videoIDMap
}
//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published
//additionally it removes all entries in the database indicating that a video is published when it's actually not
func ( s * Sync ) updateRemoteDB ( claims [ ] jsonrpc . Claim , ownClaims [ ] jsonrpc . Claim ) ( total , fixed , removed int , err error ) {
allClaimsInfo := s . mapFromClaims ( claims )
ownClaimsInfo := s . mapFromClaims ( ownClaims )
count := len ( allClaimsInfo )
idsToRemove := make ( [ ] string , 0 , count )
for videoID , chainInfo := range allClaimsInfo {
s . syncedVideosMux . RLock ( )
sv , claimInDatabase := s . syncedVideos [ videoID ]
s . syncedVideosMux . RUnlock ( )
metadataDiffers := claimInDatabase && sv . MetadataVersion != int8 ( chainInfo . MetadataVersion )
claimIDDiffers := claimInDatabase && sv . ClaimID != chainInfo . ClaimID
claimNameDiffers := claimInDatabase && sv . ClaimName != chainInfo . ClaimName
claimMarkedUnpublished := claimInDatabase && ! sv . Published
_ , isOwnClaim := ownClaimsInfo [ videoID ]
tranferred := ! isOwnClaim
transferStatusMismatch := sv . Transferred != tranferred
2019-06-10 01:41:52 +02:00
if metadataDiffers {
2019-10-08 01:38:39 +02:00
log . Debugf ( "%s: Mismatch in database for metadata. DB: %d - Blockchain: %d" , videoID , sv . MetadataVersion , chainInfo . MetadataVersion )
2019-06-10 01:41:52 +02:00
}
if claimIDDiffers {
2019-10-08 01:38:39 +02:00
log . Debugf ( "%s: Mismatch in database for claimID. DB: %s - Blockchain: %s" , videoID , sv . ClaimID , chainInfo . ClaimID )
2019-06-10 01:41:52 +02:00
}
2019-06-20 21:45:17 +02:00
if claimNameDiffers {
2019-10-08 01:38:39 +02:00
log . Debugf ( "%s: Mismatch in database for claimName. DB: %s - Blockchain: %s" , videoID , sv . ClaimName , chainInfo . ClaimName )
2019-06-10 01:41:52 +02:00
}
if claimMarkedUnpublished {
log . Debugf ( "%s: Mismatch in database: published but marked as unpublished" , videoID )
}
if ! claimInDatabase {
2019-10-08 01:38:39 +02:00
log . Debugf ( "%s: Published but is not in database (%s - %s)" , videoID , chainInfo . ClaimName , chainInfo . ClaimID )
2019-06-10 01:41:52 +02:00
}
2019-10-08 01:38:39 +02:00
if transferStatusMismatch {
log . Debugf ( "%s: is marked as transferred %t on it's actually %t" , videoID , sv . Transferred , tranferred )
2019-09-04 19:24:11 +02:00
}
2019-10-08 01:38:39 +02:00
if ! claimInDatabase || metadataDiffers || claimIDDiffers || claimNameDiffers || claimMarkedUnpublished || transferStatusMismatch {
claimSize , err := chainInfo . Claim . GetStreamSizeByMagic ( )
2019-06-01 01:46:16 +02:00
if err != nil {
2019-06-10 01:41:52 +02:00
claimSize = 0
2019-06-01 01:46:16 +02:00
}
2019-06-10 01:41:52 +02:00
fixed ++
log . Debugf ( "updating %s in the database" , videoID )
2019-08-20 07:06:51 +02:00
err = s . Manager . apiConfig . MarkVideoStatus ( sdk . VideoStatus {
ChannelID : s . YoutubeChannelID ,
VideoID : videoID ,
Status : VideoStatusPublished ,
2019-10-08 01:38:39 +02:00
ClaimID : chainInfo . ClaimID ,
ClaimName : chainInfo . ClaimName ,
2019-08-20 07:06:51 +02:00
Size : util . PtrToInt64 ( int64 ( claimSize ) ) ,
2019-10-08 01:38:39 +02:00
MetaDataVersion : chainInfo . MetadataVersion ,
IsTransferred : & tranferred ,
2019-08-20 07:06:51 +02:00
} )
2018-09-20 01:05:47 +02:00
if err != nil {
2019-06-04 22:21:40 +02:00
return count , fixed , 0 , err
2018-09-20 01:05:47 +02:00
}
2018-08-31 17:42:15 +02:00
}
}
2019-10-08 01:38:39 +02:00
//reload the synced videos map before we use it for further processing
if fixed > 0 {
err := s . setStatusSyncing ( )
if err != nil {
return count , fixed , 0 , err
}
}
2019-06-04 22:21:40 +02:00
for vID , sv := range s . syncedVideos {
2019-08-23 01:28:51 +02:00
if sv . Transferred {
2019-10-08 01:38:39 +02:00
_ , ok := allClaimsInfo [ vID ]
if ! ok && sv . Published {
2019-12-19 03:13:22 +01:00
searchResponse , err := s . daemon . ClaimSearch ( nil , & sv . ClaimID , nil , nil , 1 , 20 )
if err != nil {
log . Error ( err . Error ( ) )
continue
}
if len ( searchResponse . Claims ) == 0 {
log . Debugf ( "%s: was transferred but appears abandoned! we should ignore this - claimID: %s" , vID , sv . ClaimID )
continue //TODO: we should flag these on the db
} else {
2020-01-14 04:14:14 +01:00
log . Debugf ( "%s: was transferred and was then edited! we should ignore this - claimID: %s" , vID , sv . ClaimID )
2020-01-12 04:01:40 +01:00
//return count, fixed, 0, errors.Err("%s: isn't our control but is on the database and on the blockchain. wtf is up? ClaimID: %s", vID, sv.ClaimID)
2019-12-19 03:13:22 +01:00
}
2019-10-08 01:38:39 +02:00
}
2019-08-23 01:28:51 +02:00
continue
}
2019-10-08 01:38:39 +02:00
_ , ok := ownClaimsInfo [ vID ]
2019-06-04 22:21:40 +02:00
if ! ok && sv . Published {
2019-08-30 21:08:28 +02:00
log . Debugf ( "%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified (%t)" , vID , s . Manager . SyncFlags . RemoveDBUnpublished )
2019-06-04 22:21:40 +02:00
idsToRemove = append ( idsToRemove , vID )
}
}
2019-08-30 21:08:28 +02:00
if s . Manager . SyncFlags . RemoveDBUnpublished && len ( idsToRemove ) > 0 {
2019-08-30 19:35:04 +02:00
log . Infof ( "removing: %s" , strings . Join ( idsToRemove , "," ) )
2019-06-04 22:21:40 +02:00
err := s . Manager . apiConfig . DeleteVideos ( idsToRemove )
if err != nil {
2019-08-23 01:28:51 +02:00
return count , fixed , len ( idsToRemove ) , err
2019-06-04 22:21:40 +02:00
}
2019-10-08 01:38:39 +02:00
removed ++
}
//reload the synced videos map before we use it for further processing
if removed > 0 {
err := s . setStatusSyncing ( )
if err != nil {
return count , fixed , removed , err
}
2019-06-04 22:21:40 +02:00
}
2019-10-08 01:38:39 +02:00
return count , fixed , removed , nil
2018-08-31 17:42:15 +02:00
}
2019-10-08 01:38:39 +02:00
func ( s * Sync ) getClaims ( defaultOnly bool ) ( [ ] jsonrpc . Claim , error ) {
var account * string = nil
if defaultOnly {
a , err := s . getDefaultAccount ( )
if err != nil {
return nil , err
}
account = & a
}
2019-12-08 17:48:24 +01:00
claims , err := s . daemon . StreamList ( account , 1 , 30000 )
2019-10-15 00:29:26 +02:00
if err != nil {
return nil , errors . Prefix ( "cannot list claims" , err )
2019-01-30 13:42:23 +01:00
}
2019-12-13 18:20:41 +01:00
items := make ( [ ] jsonrpc . Claim , 0 , len ( claims . Items ) )
for _ , c := range claims . Items {
if c . SigningChannel != nil && c . SigningChannel . ClaimID == s . lbryChannelID {
items = append ( items , c )
}
}
return items , nil
2019-01-30 13:42:23 +01:00
}
2019-08-23 01:28:51 +02:00
func ( s * Sync ) checkIntegrity ( ) error {
2019-10-08 01:38:39 +02:00
allClaims , err := s . getClaims ( false )
2018-08-31 17:42:15 +02:00
if err != nil {
2019-01-30 13:42:23 +01:00
return err
2018-08-31 17:42:15 +02:00
}
2019-01-30 13:42:23 +01:00
hasDupes , err := s . fixDupes ( allClaims )
2018-08-31 17:42:15 +02:00
if err != nil {
2018-10-03 02:51:42 +02:00
return errors . Prefix ( "error checking for duplicates" , err )
2018-08-31 17:42:15 +02:00
}
if hasDupes {
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "Channel had dupes and was fixed!" )
2018-09-21 16:26:27 +02:00
err = s . waitForNewBlock ( )
if err != nil {
return err
}
2019-10-08 01:38:39 +02:00
allClaims , err = s . getClaims ( false )
2018-09-20 01:05:47 +02:00
if err != nil {
2019-01-30 13:42:23 +01:00
return err
2018-09-20 01:05:47 +02:00
}
2018-08-31 17:42:15 +02:00
}
2018-09-20 01:05:47 +02:00
2019-10-08 01:38:39 +02:00
ownClaims , err := s . getClaims ( true )
if err != nil {
return err
}
pubsOnWallet , nFixed , nRemoved , err := s . updateRemoteDB ( allClaims , ownClaims )
2018-08-31 17:42:15 +02:00
if err != nil {
2019-06-06 16:31:35 +02:00
return errors . Prefix ( "error updating remote database" , err )
2018-08-31 17:42:15 +02:00
}
2019-08-14 03:31:41 +02:00
2019-06-04 22:21:40 +02:00
if nFixed > 0 || nRemoved > 0 {
if nFixed > 0 {
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "%d claims had mismatched database info or were completely missing and were fixed" , nFixed )
2019-06-04 22:21:40 +02:00
}
if nRemoved > 0 {
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "%d were marked as published but weren't actually published and thus removed from the database" , nRemoved )
2019-06-04 22:21:40 +02:00
}
2018-09-20 01:05:47 +02:00
}
2018-08-31 17:42:15 +02:00
pubsOnDB := 0
for _ , sv := range s . syncedVideos {
if sv . Published {
pubsOnDB ++
}
}
2018-09-20 01:05:47 +02:00
if pubsOnWallet > pubsOnDB { //This case should never happen
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "We're claiming to have published %d videos but in reality we published %d (%s)" , pubsOnDB , pubsOnWallet , s . YoutubeChannelID )
2018-09-20 01:05:47 +02:00
return errors . Err ( "not all published videos are in the database" )
}
2018-08-31 17:42:15 +02:00
if pubsOnWallet < pubsOnDB {
2019-07-15 22:16:02 +02:00
logUtils . SendInfoToSlack ( "we're claiming to have published %d videos but we only published %d (%s)" , pubsOnDB , pubsOnWallet , s . YoutubeChannelID )
2018-08-31 17:42:15 +02:00
}
2019-10-08 01:38:39 +02:00
_ , err = s . getUnsentSupports ( ) //TODO: use the returned value when it works
if err != nil {
return err
}
2019-08-23 01:28:51 +02:00
return nil
}
func ( s * Sync ) doSync ( ) error {
err := s . enableAddressReuse ( )
if err != nil {
return errors . Prefix ( "could not set address reuse policy" , err )
}
2019-09-24 20:42:17 +02:00
err = s . importPublicKey ( )
if err != nil {
return errors . Prefix ( "could not import the transferee public key" , err )
}
2020-04-01 04:49:24 +02:00
_ , err = s . daemon . UTXORelease ( nil )
if err != nil {
return errors . Prefix ( "could not run uxo_release" , err )
}
2019-08-23 01:28:51 +02:00
err = s . walletSetup ( )
if err != nil {
return errors . Prefix ( "Initial wallet setup failed! Manual Intervention is required." , err )
}
err = s . checkIntegrity ( )
if err != nil {
return err
}
2019-12-24 05:00:16 +01:00
if s . transferState < TransferStateComplete {
2019-08-23 01:28:51 +02:00
cert , err := s . daemon . ChannelExport ( s . lbryChannelID , nil , nil )
if err != nil {
return errors . Prefix ( "error getting channel cert" , err )
}
if cert != nil {
err = s . APIConfig . SetChannelCert ( string ( * cert ) , s . lbryChannelID )
if err != nil {
return errors . Prefix ( "error setting channel cert" , err )
}
}
}
2017-10-11 19:13:47 +02:00
2019-08-30 21:08:28 +02:00
if s . Manager . SyncFlags . StopOnError {
2017-10-11 04:02:16 +02:00
log . Println ( "Will stop publishing if an error is detected" )
}
for i := 0 ; i < s . ConcurrentVideos ; i ++ {
2018-08-14 16:48:55 +02:00
s . grp . Add ( 1 )
2018-08-31 17:42:15 +02:00
go func ( i int ) {
2018-08-14 16:48:55 +02:00
defer s . grp . Done ( )
s . startWorker ( i )
2018-08-31 17:42:15 +02:00
} ( i )
2017-10-11 04:02:16 +02:00
}
2018-02-13 18:47:05 +01:00
if s . LbryChannelName == "@UCBerkeley" {
2019-06-06 23:25:31 +02:00
err = errors . Err ( "UCB is not supported in this version of YTSYNC" )
2018-02-13 18:47:05 +01:00
} else {
err = s . enqueueYoutubeVideos ( )
}
2017-12-28 18:14:33 +01:00
close ( s . queue )
2018-08-14 16:48:55 +02:00
s . grp . Wait ( )
2017-10-11 04:02:16 +02:00
return err
}
2017-12-28 18:14:33 +01:00
func ( s * Sync ) startWorker ( workerNum int ) {
var v video
var more bool
2017-11-02 16:20:22 +01:00
for {
2017-12-28 18:14:33 +01:00
select {
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-12-28 18:14:33 +01:00
log . Printf ( "Stopping worker %d" , workerNum )
return
default :
2017-11-02 16:20:22 +01:00
}
2017-12-28 18:14:33 +01:00
select {
case v , more = <- s . queue :
if ! more {
return
}
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-12-28 18:14:33 +01:00
log . Printf ( "Stopping worker %d" , workerNum )
return
2017-10-11 04:02:16 +02:00
}
2018-02-13 18:47:05 +01:00
log . Println ( "================================================================================" )
2017-10-11 04:02:16 +02:00
2017-12-28 18:14:33 +01:00
tryCount := 0
for {
tryCount ++
err := s . processVideo ( v )
2017-10-11 04:02:16 +02:00
2017-12-28 18:14:33 +01:00
if err != nil {
2019-06-12 05:25:01 +02:00
logMsg := fmt . Sprintf ( "error processing video %s: %s" , v . ID ( ) , err . Error ( ) )
2018-04-26 12:12:54 +02:00
log . Errorln ( logMsg )
2019-07-12 23:20:01 +02:00
if strings . Contains ( strings . ToLower ( err . Error ( ) ) , "interrupted by user" ) {
2019-07-12 21:32:49 +02:00
return
}
2018-04-30 21:18:27 +02:00
fatalErrors := [ ] string {
":5279: read: connection reset by peer" ,
2018-06-06 23:47:28 +02:00
"no space left on device" ,
2018-06-15 23:03:28 +02:00
"NotEnoughFunds" ,
2018-06-18 01:50:59 +02:00
"Cannot publish using channel" ,
2018-08-09 22:54:23 +02:00
"cannot concatenate 'str' and 'NoneType' objects" ,
2018-08-17 20:05:39 +02:00
"more than 90% of the space has been used." ,
2019-07-12 02:21:27 +02:00
"Couldn't find private key for id" ,
2019-09-10 11:43:20 +02:00
"You already have a stream claim published under the name" ,
2018-04-30 21:18:27 +02:00
}
2019-08-30 21:08:28 +02:00
if util . SubstringInSlice ( err . Error ( ) , fatalErrors ) || s . Manager . SyncFlags . StopOnError {
2018-06-25 22:13:28 +02:00
s . grp . Stop ( )
2017-12-28 18:14:33 +01:00
} else if s . MaxTries > 1 {
2018-04-30 21:18:27 +02:00
errorsNoRetry := [ ] string {
"non 200 status code received" ,
2019-07-12 22:58:34 +02:00
"This video contains content from" ,
2018-04-30 21:18:27 +02:00
"dont know which claim to update" ,
"uploader has not made this video available in your country" ,
"download error: AccessDenied: Access Denied" ,
"Playback on other websites has been disabled by the video owner" ,
"Error in daemon: Cannot publish empty file" ,
2018-05-07 22:26:46 +02:00
"Error extracting sts from embedded url response" ,
2018-08-29 13:33:02 +02:00
"Unable to extract signature tokens" ,
2019-09-27 19:24:05 +02:00
"Client.Timeout exceeded while awaiting headers" ,
2018-08-01 14:56:04 +02:00
"the video is too big to sync, skipping for now" ,
2018-10-11 23:21:05 +02:00
"video is too long to process" ,
2019-02-15 14:11:38 +01:00
"no compatible format available for this video" ,
"Watch this video on YouTube." ,
"have blocked it on copyright grounds" ,
2019-05-07 16:01:11 +02:00
"the video must be republished as we can't get the right size" ,
2019-07-11 19:14:15 +02:00
"HTTP Error 403" ,
2019-07-22 02:45:38 +02:00
"giving up after 0 fragment retries" ,
2019-09-27 19:24:05 +02:00
"Sorry about that" ,
"This video is not available" ,
2019-12-14 14:58:04 +01:00
"requested format not available" ,
2019-12-18 18:22:15 +01:00
"interrupted by user" ,
2019-12-24 05:00:16 +01:00
"Sign in to confirm your age" ,
2019-12-26 17:52:44 +01:00
"This video is unavailable" ,
2018-04-30 21:18:27 +02:00
}
2019-07-10 15:46:54 +02:00
if util . SubstringInSlice ( err . Error ( ) , errorsNoRetry ) {
2017-12-28 18:14:33 +01:00
log . Println ( "This error should not be retried at all" )
2018-04-20 21:06:55 +02:00
} else if tryCount < s . MaxTries {
2019-12-18 20:24:50 +01:00
if util . SubstringInSlice ( err . Error ( ) , [ ] string {
"txn-mempool-conflict" ,
"too-long-mempool-chain" ,
} ) {
2019-07-10 15:46:54 +02:00
log . Println ( "waiting for a block before retrying" )
err := s . waitForNewBlock ( )
if err != nil {
s . grp . Stop ( )
2019-08-04 00:34:48 +02:00
logUtils . SendErrorToSlack ( "something went wrong while waiting for a block: %s" , errors . FullTrace ( err ) )
2019-07-10 15:46:54 +02:00
break
}
} else if util . SubstringInSlice ( err . Error ( ) , [ ] string {
2019-03-29 02:38:00 +01:00
"Not enough funds to cover this transaction" ,
"failed: Not enough funds" ,
2019-06-12 05:25:01 +02:00
"Error in daemon: Insufficient funds, please deposit additional LBC" ,
2019-12-18 20:24:50 +01:00
"Missing inputs" ,
2019-06-12 05:25:01 +02:00
} ) {
log . Println ( "checking funds and UTXOs before retrying..." )
2019-06-13 19:33:58 +02:00
err := s . walletSetup ( )
2018-04-20 21:06:55 +02:00
if err != nil {
2018-07-12 14:28:20 +02:00
s . grp . Stop ( )
2019-08-04 00:34:48 +02:00
logUtils . SendErrorToSlack ( "failed to setup the wallet for a refill: %s" , errors . FullTrace ( err ) )
2018-05-05 16:15:02 +02:00
break
2018-04-20 21:06:55 +02:00
}
2019-06-04 22:21:40 +02:00
} else if strings . Contains ( err . Error ( ) , "Error in daemon: 'str' object has no attribute 'get'" ) {
time . Sleep ( 5 * time . Second )
2018-04-20 21:06:55 +02:00
}
2017-12-28 18:14:33 +01:00
log . Println ( "Retrying" )
continue
}
2019-07-15 22:16:02 +02:00
logUtils . SendErrorToSlack ( "Video failed after %d retries, skipping. Stack: %s" , tryCount , logMsg )
2017-12-28 18:14:33 +01:00
}
2019-06-26 20:40:40 +02:00
s . syncedVideosMux . RLock ( )
2019-05-24 19:01:16 +02:00
existingClaim , ok := s . syncedVideos [ v . ID ( ) ]
2019-06-26 20:40:40 +02:00
s . syncedVideosMux . RUnlock ( )
2019-05-24 19:01:16 +02:00
existingClaimID := ""
existingClaimName := ""
2019-06-12 03:35:21 +02:00
existingClaimSize := int64 ( 0 )
if v . Size ( ) != nil {
existingClaimSize = * v . Size ( )
}
2019-05-24 19:01:16 +02:00
if ok {
existingClaimID = existingClaim . ClaimID
existingClaimName = existingClaim . ClaimName
if existingClaim . Size > 0 {
2019-06-12 03:35:21 +02:00
existingClaimSize = existingClaim . Size
2019-05-24 19:01:16 +02:00
}
}
2019-06-10 21:59:42 +02:00
videoStatus := VideoStatusFailed
if strings . Contains ( err . Error ( ) , "upgrade failed" ) {
videoStatus = VideoStatusUpgradeFailed
2019-06-12 03:35:21 +02:00
} else {
s . AppendSyncedVideo ( v . ID ( ) , false , err . Error ( ) , existingClaimName , existingClaimID , 0 , existingClaimSize )
2019-06-10 21:59:42 +02:00
}
2019-08-20 07:06:51 +02:00
err = s . Manager . apiConfig . MarkVideoStatus ( sdk . VideoStatus {
ChannelID : s . YoutubeChannelID ,
VideoID : v . ID ( ) ,
Status : videoStatus ,
ClaimID : existingClaimID ,
ClaimName : existingClaimName ,
FailureReason : err . Error ( ) ,
Size : & existingClaimSize ,
} )
2018-07-21 01:56:36 +02:00
if err != nil {
2019-08-04 00:34:48 +02:00
logUtils . SendErrorToSlack ( "Failed to mark video on the database: %s" , errors . FullTrace ( err ) )
2018-07-25 19:08:28 +02:00
}
2017-12-28 18:14:33 +01:00
}
break
2017-11-06 22:42:52 +01:00
}
2017-10-11 04:02:16 +02:00
}
}
2020-04-08 23:14:10 +02:00
var mostRecentlyFailedChannel string
2018-02-13 18:47:05 +01:00
func ( s * Sync ) enqueueYoutubeVideos ( ) error {
2017-10-11 04:02:16 +02:00
client := & http . Client {
2018-09-26 06:08:18 +02:00
Transport : & transport . APIKey { Key : s . APIConfig . YoutubeAPIKey } ,
2017-10-11 04:02:16 +02:00
}
service , err := youtube . New ( client )
if err != nil {
2018-03-09 17:47:38 +01:00
return errors . Prefix ( "error creating YouTube service" , err )
2017-10-11 04:02:16 +02:00
}
2017-12-28 18:14:33 +01:00
response , err := service . Channels . List ( "contentDetails" ) . Id ( s . YoutubeChannelID ) . Do ( )
2017-10-11 04:02:16 +02:00
if err != nil {
2018-03-09 17:47:38 +01:00
return errors . Prefix ( "error getting channels" , err )
2017-10-11 04:02:16 +02:00
}
if len ( response . Items ) < 1 {
2018-03-09 17:47:38 +01:00
return errors . Err ( "youtube channel not found" )
2017-10-11 04:02:16 +02:00
}
if response . Items [ 0 ] . ContentDetails . RelatedPlaylists == nil {
2018-03-09 17:47:38 +01:00
return errors . Err ( "no related playlists" )
2017-10-11 04:02:16 +02:00
}
playlistID := response . Items [ 0 ] . ContentDetails . RelatedPlaylists . Uploads
if playlistID == "" {
2018-03-09 17:47:38 +01:00
return errors . Err ( "no channel playlist" )
2017-10-11 04:02:16 +02:00
}
2017-12-28 18:14:33 +01:00
var videos [ ] video
2019-12-18 18:22:15 +01:00
ipPool , err := ip_manager . GetIPPool ( s . grp )
2019-12-10 23:02:56 +01:00
if err != nil {
return err
}
2019-12-27 18:12:41 +01:00
2019-05-31 16:38:31 +02:00
playlistMap := make ( map [ string ] * youtube . PlaylistItemSnippet , 50 )
2017-10-11 04:02:16 +02:00
nextPageToken := ""
for {
req := service . PlaylistItems . List ( "snippet" ) .
PlaylistId ( playlistID ) .
MaxResults ( 50 ) .
PageToken ( nextPageToken )
playlistResponse , err := req . Do ( )
if err != nil {
2018-03-09 17:47:38 +01:00
return errors . Prefix ( "error getting playlist items" , err )
2017-10-11 04:02:16 +02:00
}
if len ( playlistResponse . Items ) < 1 {
2019-01-11 20:15:17 +01:00
// If there are 50+ videos in a playlist but less than 50 are actually returned by the API, youtube will still redirect
// clients to a next page. Such next page will however be empty. This logic prevents ytsync from failing.
youtubeIsLying := len ( videos ) > 0
if youtubeIsLying {
break
}
2020-04-08 23:14:10 +02:00
if s . YoutubeChannelID == mostRecentlyFailedChannel {
return errors . Err ( "playlist items not found" )
}
mostRecentlyFailedChannel = s . YoutubeChannelID
2020-03-31 02:45:53 +02:00
break //return errors.Err("playlist items not found") //TODO: will this work?
2017-10-11 04:02:16 +02:00
}
2019-04-19 03:22:51 +02:00
videoIDs := make ( [ ] string , 50 )
for i , item := range playlistResponse . Items {
2017-10-11 04:02:16 +02:00
// normally we'd send the video into the channel here, but youtube api doesn't have sorting
// so we have to get ALL the videos, then sort them, then send them in
2019-04-19 03:22:51 +02:00
playlistMap [ item . Snippet . ResourceId . VideoId ] = item . Snippet
videoIDs [ i ] = item . Snippet . ResourceId . VideoId
}
2019-05-03 05:11:52 +02:00
req2 := service . Videos . List ( "snippet,contentDetails,recordingDetails" ) . Id ( strings . Join ( videoIDs [ : ] , "," ) )
2019-04-19 03:22:51 +02:00
videosListResponse , err := req2 . Do ( )
if err != nil {
return errors . Prefix ( "error getting videos info" , err )
}
for _ , item := range videosListResponse . Items {
2019-12-10 23:02:56 +01:00
videos = append ( videos , sources . NewYoutubeVideo ( s . videoDirectory , item , playlistMap [ item . Id ] . Position , s . Manager . GetS3AWSConfig ( ) , s . grp , ipPool ) )
2017-10-11 04:02:16 +02:00
}
2017-12-28 18:14:33 +01:00
log . Infof ( "Got info for %d videos from youtube API" , len ( videos ) )
2017-10-11 04:02:16 +02:00
nextPageToken = playlistResponse . NextPageToken
2019-12-27 18:12:41 +01:00
if nextPageToken == "" || s . Manager . SyncFlags . QuickSync || len ( videos ) >= s . Manager . videosLimit {
2017-10-11 04:02:16 +02:00
break
}
}
2019-05-31 16:38:31 +02:00
for k , v := range s . syncedVideos {
if ! v . Published {
continue
}
_ , ok := playlistMap [ k ]
if ! ok {
2019-12-10 23:02:56 +01:00
videos = append ( videos , sources . NewMockedVideo ( s . videoDirectory , k , s . YoutubeChannelID , s . Manager . GetS3AWSConfig ( ) , s . grp , ipPool ) )
2019-05-31 16:38:31 +02:00
}
2017-10-11 04:02:16 +02:00
2019-05-31 16:38:31 +02:00
}
2017-10-11 04:02:16 +02:00
sort . Sort ( byPublishedAt ( videos ) )
2017-10-11 19:13:47 +02:00
Enqueue :
2017-10-11 04:02:16 +02:00
for _ , v := range videos {
select {
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-12-28 18:14:33 +01:00
break Enqueue
default :
}
select {
case s . queue <- v :
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-10-11 19:13:47 +02:00
break Enqueue
2017-10-11 04:02:16 +02:00
}
}
return nil
}
2018-03-09 17:47:38 +01:00
func ( s * Sync ) processVideo ( v video ) ( err error ) {
defer func ( ) {
if p := recover ( ) ; p != nil {
2019-12-27 01:39:27 +01:00
logUtils . SendErrorToSlack ( "Video processing panic! %s" , debug . Stack ( ) )
2018-03-09 17:47:38 +01:00
var ok bool
err , ok = p . ( error )
if ! ok {
err = errors . Err ( "%v" , p )
}
err = errors . Wrap ( p , 2 )
}
} ( )
2017-12-28 18:14:33 +01:00
log . Println ( "Processing " + v . IDAndNum ( ) )
2017-11-02 16:20:22 +01:00
defer func ( start time . Time ) {
2017-12-28 18:14:33 +01:00
log . Println ( v . ID ( ) + " took " + time . Since ( start ) . String ( ) )
2017-11-02 16:20:22 +01:00
} ( time . Now ( ) )
2017-10-11 04:02:16 +02:00
2018-08-23 00:28:31 +02:00
s . syncedVideosMux . RLock ( )
2018-07-31 01:19:12 +02:00
sv , ok := s . syncedVideos [ v . ID ( ) ]
2018-08-23 00:28:31 +02:00
s . syncedVideosMux . RUnlock ( )
2019-06-06 16:24:20 +02:00
newMetadataVersion := int8 ( 2 )
2018-07-31 01:19:12 +02:00
alreadyPublished := ok && sv . Published
2019-08-30 21:08:28 +02:00
videoRequiresUpgrade := ok && s . Manager . SyncFlags . UpgradeMetadata && sv . MetadataVersion < newMetadataVersion
2018-07-31 01:19:12 +02:00
2018-08-02 14:05:06 +02:00
neverRetryFailures := [ ] string {
"Error extracting sts from embedded url response" ,
2018-08-29 13:33:02 +02:00
"Unable to extract signature tokens" ,
2018-08-02 14:05:06 +02:00
"the video is too big to sync, skipping for now" ,
2018-10-11 23:21:05 +02:00
"video is too long to process" ,
2019-07-12 23:20:01 +02:00
"This video contains content from" ,
2019-02-15 14:11:38 +01:00
"no compatible format available for this video" ,
"Watch this video on YouTube." ,
"have blocked it on copyright grounds" ,
2019-07-22 02:45:38 +02:00
"giving up after 0 fragment retries" ,
2019-12-24 05:00:16 +01:00
"Sign in to confirm your age" ,
2018-08-02 14:05:06 +02:00
}
if ok && ! sv . Published && util . SubstringInSlice ( sv . FailureReason , neverRetryFailures ) {
log . Println ( v . ID ( ) + " can't ever be published" )
2018-07-31 01:19:12 +02:00
return nil
2017-10-11 04:02:16 +02:00
}
2019-06-06 16:24:20 +02:00
if alreadyPublished && ! videoRequiresUpgrade {
2017-12-28 18:14:33 +01:00
log . Println ( v . ID ( ) + " already published" )
return nil
2017-10-11 04:02:16 +02:00
}
2019-05-09 17:06:56 +02:00
if ok && sv . MetadataVersion >= newMetadataVersion {
log . Println ( v . ID ( ) + " upgraded to the new metadata" )
return nil
}
2017-10-11 04:02:16 +02:00
2019-10-11 00:57:42 +02:00
if ! videoRequiresUpgrade && v . PlaylistPosition ( ) >= s . Manager . videosLimit {
2018-04-25 23:06:17 +02:00
log . Println ( v . ID ( ) + " is old: skipping" )
return nil
}
2018-08-17 20:05:39 +02:00
err = s . Manager . checkUsedSpace ( )
if err != nil {
return err
}
2020-01-12 04:01:40 +01:00
da , err := s . getDefaultAccount ( )
if err != nil {
return err
}
2019-05-03 05:11:52 +02:00
sp := sources . SyncParams {
ClaimAddress : s . claimAddress ,
Amount : publishAmount ,
ChannelID : s . lbryChannelID ,
MaxVideoSize : s . Manager . maxVideoSize ,
Namer : s . namer ,
2020-04-08 23:14:10 +02:00
MaxVideoLength : s . MaxVideoLength ,
2019-06-06 02:16:07 +02:00
Fee : s . Fee ,
2020-01-12 04:01:40 +01:00
DefaultAccount : da ,
2019-05-03 05:11:52 +02:00
}
2018-09-18 21:20:34 +02:00
2019-06-12 03:17:59 +02:00
summary , err := v . Sync ( s . daemon , sp , & sv , videoRequiresUpgrade , s . walletMux )
2017-10-11 04:02:16 +02:00
if err != nil {
return err
}
2018-09-18 21:20:34 +02:00
2019-06-12 03:35:21 +02:00
s . AppendSyncedVideo ( v . ID ( ) , true , "" , summary . ClaimName , summary . ClaimID , newMetadataVersion , * v . Size ( ) )
2019-08-20 07:06:51 +02:00
err = s . Manager . apiConfig . MarkVideoStatus ( sdk . VideoStatus {
ChannelID : s . YoutubeChannelID ,
VideoID : v . ID ( ) ,
Status : VideoStatusPublished ,
ClaimID : summary . ClaimID ,
ClaimName : summary . ClaimName ,
Size : v . Size ( ) ,
2019-08-21 20:01:24 +02:00
MetaDataVersion : LatestMetadataVersion ,
2019-08-23 01:28:51 +02:00
IsTransferred : util . PtrToBool ( s . shouldTransfer ( ) ) ,
2019-08-20 07:06:51 +02:00
} )
2017-10-11 04:02:16 +02:00
if err != nil {
2019-08-04 00:34:48 +02:00
logUtils . SendErrorToSlack ( "Failed to mark video on the database: %s" , errors . FullTrace ( err ) )
2017-11-03 13:46:27 +01:00
}
return nil
}
2017-12-30 01:21:16 +01:00
2019-09-24 20:42:17 +02:00
func ( s * Sync ) importPublicKey ( ) error {
if s . publicKey != "" {
2019-12-08 17:48:24 +01:00
accountsResponse , err := s . daemon . AccountList ( 1 , 50 )
2019-09-24 20:42:17 +02:00
if err != nil {
return errors . Err ( err )
}
2019-12-08 17:48:24 +01:00
ledger := "lbc_mainnet"
2019-09-24 20:42:17 +02:00
if logUtils . IsRegTest ( ) {
2019-12-08 17:48:24 +01:00
ledger = "lbc_regtest"
2019-09-24 20:42:17 +02:00
}
2019-12-08 17:48:24 +01:00
for _ , a := range accountsResponse . Items {
if * a . Ledger == ledger {
if a . PublicKey == s . publicKey {
return nil
}
2019-09-24 20:42:17 +02:00
}
}
2019-09-25 03:30:07 +02:00
log . Infof ( "Could not find public key %s in the wallet. Importing it..." , s . publicKey )
2019-09-24 20:42:17 +02:00
_ , err = s . daemon . AccountAdd ( s . LbryChannelName , nil , nil , & s . publicKey , util . PtrToBool ( true ) , nil )
return errors . Err ( err )
}
return nil
}
2019-10-08 01:38:39 +02:00
//TODO: fully implement this once I find a way to reliably get the abandoned supports amount
func ( s * Sync ) getUnsentSupports ( ) ( float64 , error ) {
defaultAccount , err := s . getDefaultAccount ( )
if err != nil {
return 0 , errors . Err ( err )
}
if s . transferState == 2 {
balance , err := s . daemon . AccountBalance ( & defaultAccount )
if err != nil {
return 0 , err
} else if balance == nil {
return 0 , errors . Err ( "no response" )
}
balanceAmount , err := strconv . ParseFloat ( balance . Available . String ( ) , 64 )
if err != nil {
return 0 , errors . Err ( err )
}
2019-12-08 17:48:24 +01:00
transactionList , err := s . daemon . TransactionList ( & defaultAccount , 1 , 90000 )
2019-10-08 01:38:39 +02:00
if err != nil {
return 0 , errors . Err ( err )
}
sentSupports := 0.0
2019-12-08 17:48:24 +01:00
for _ , t := range transactionList . Items {
2019-10-08 01:38:39 +02:00
if len ( t . SupportInfo ) == 0 {
continue
}
for _ , support := range t . SupportInfo {
supportAmount , err := strconv . ParseFloat ( support . BalanceDelta , 64 )
if err != nil {
return 0 , err
}
if supportAmount < 0 { // && support.IsTip TODO: re-enable this when transaction list shows correct information
sentSupports += - supportAmount
}
}
}
if balanceAmount > 10 && sentSupports < 1 {
2019-12-08 16:31:15 +01:00
logUtils . SendErrorToSlack ( "(%s) this channel has quite some LBCs in it (%.2f) and %.2f LBC in sent tips, it's likely that the tips weren't actually sent or the wallet has unnecessary extra credits in it" , s . YoutubeChannelID , balanceAmount , sentSupports )
2019-10-08 01:38:39 +02:00
return balanceAmount - 10 , nil
}
}
return 0 , nil
}
2018-04-20 21:06:55 +02:00
// waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up
func waitForDaemonProcess ( timeout time . Duration ) error {
then := time . Now ( )
stopTime := then . Add ( time . Duration ( timeout * time . Second ) )
for ! time . Now ( ) . After ( stopTime ) {
wait := 10 * time . Second
log . Println ( "the daemon is still running, waiting for it to exit" )
time . Sleep ( wait )
2019-09-25 05:07:19 +02:00
running , err := logUtils . IsLbrynetRunning ( )
2018-04-20 21:06:55 +02:00
if err != nil {
2019-09-25 05:07:19 +02:00
return errors . Err ( err )
2018-04-20 21:06:55 +02:00
}
2019-09-25 05:07:19 +02:00
if ! running {
2018-04-20 21:06:55 +02:00
return nil
}
}
return errors . Err ( "timeout reached" )
}