2019-01-11 03:02:26 +01:00
package manager
2017-10-11 04:02:16 +02:00
import (
2018-05-17 01:42:06 +02:00
"fmt"
2017-10-11 04:02:16 +02:00
"io/ioutil"
"os"
2017-12-28 18:14:33 +01:00
"os/signal"
2018-10-09 21:57:07 +02:00
"runtime/debug"
2019-10-08 01:38:39 +02:00
"strconv"
2017-10-11 04:02:16 +02:00
"strings"
"sync"
2017-12-28 18:14:33 +01:00
"syscall"
2017-10-11 04:02:16 +02:00
"time"
2020-06-11 18:45:56 +02:00
"github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/sdk"
2020-08-08 01:12:55 +02:00
"github.com/lbryio/ytsync/v5/shared"
2020-06-11 18:45:56 +02:00
"github.com/lbryio/ytsync/v5/sources"
"github.com/lbryio/ytsync/v5/thumbs"
"github.com/lbryio/ytsync/v5/timing"
logUtils "github.com/lbryio/ytsync/v5/util"
2020-07-27 20:48:05 +02:00
"github.com/lbryio/ytsync/v5/ytapi"
2018-10-08 22:19:17 +02:00
2019-10-10 16:50:33 +02:00
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util"
2018-09-26 06:08:18 +02:00
2017-10-11 04:02:16 +02:00
log "github.com/sirupsen/logrus"
)
// Amounts (in LBC) and limits governing the publishing process.
const (
	channelClaimAmount    = 0.01 // LBC staked on the channel claim itself
	estimatedMaxTxFee     = 0.1  // upper-bound estimate for a single transaction fee
	minimumAccountBalance = 1.0  // balance below which the account needs refilling
	minimumRefillAmount   = 1    // smallest amount credited when refilling
	publishAmount         = 0.01 // LBC staked on each published video claim
	maxReasonLength       = 500  // maximum length kept for a failure reason string
)
// Sync stores the options that control how syncing happens
type Sync struct {
	DbChannelData   *shared.YoutubeChannel // channel record from the remote database
	Manager         *SyncManager
	daemon          *jsonrpc.Client // client for the lbrynet daemon
	videoDirectory  string          // temp dir where downloaded videos are stored
	syncedVideosMux *sync.RWMutex   // guards syncedVideos (and claim names via namer)
	syncedVideos    map[string]sdk.SyncedVideo
	grp             *stop.Group // stop group coordinating workers and shutdown
	namer           *namer.Namer
	walletMux       *sync.RWMutex
	queue           chan ytapi.Video // videos queued for the sync workers
	defaultAccountID string

	hardVideoFailure hardVideoFailure // records the first unrecoverable video failure
}
type hardVideoFailure struct {
lock * sync . Mutex
failed bool
failureReason string
}
func ( hv * hardVideoFailure ) flagFailure ( reason string ) {
hv . lock . Lock ( )
defer hv . lock . Unlock ( )
if hv . failed {
return
}
hv . failed = true
hv . failureReason = reason
2018-08-03 19:21:42 +02:00
}
2019-06-12 03:35:21 +02:00
func ( s * Sync ) AppendSyncedVideo ( videoID string , published bool , failureReason string , claimName string , claimID string , metadataVersion int8 , size int64 ) {
2018-08-14 16:48:55 +02:00
s . syncedVideosMux . Lock ( )
defer s . syncedVideosMux . Unlock ( )
2018-09-26 06:08:18 +02:00
s . syncedVideos [ videoID ] = sdk . SyncedVideo {
2019-06-12 03:35:21 +02:00
VideoID : videoID ,
Published : published ,
FailureReason : failureReason ,
ClaimID : claimID ,
ClaimName : claimName ,
MetadataVersion : metadataVersion ,
Size : size ,
2018-08-03 19:21:42 +02:00
}
2017-11-02 16:20:22 +01:00
}
2018-05-24 02:32:11 +02:00
// IsInterrupted can be queried to discover if the sync process was interrupted manually
func ( s * Sync ) IsInterrupted ( ) bool {
select {
2018-07-12 14:28:20 +02:00
case <- s . grp . Ch ( ) :
2018-05-24 02:32:11 +02:00
return true
default :
return false
}
}
2018-09-20 01:05:47 +02:00
func ( s * Sync ) setStatusSyncing ( ) error {
2020-08-08 01:12:55 +02:00
syncedVideos , claimNames , err := s . Manager . ApiConfig . SetChannelStatus ( s . DbChannelData . ChannelId , shared . StatusSyncing , "" , nil )
2018-09-20 01:05:47 +02:00
if err != nil {
return err
}
s . syncedVideosMux . Lock ( )
s . syncedVideos = syncedVideos
2018-10-09 21:57:07 +02:00
s . namer . SetNames ( claimNames )
2018-09-20 01:05:47 +02:00
s . syncedVideosMux . Unlock ( )
return nil
}
2019-12-27 01:27:29 +01:00
// stopGroup is shared at package level so a single interrupt stops the sync.
var stopGroup = stop.New()
2018-07-17 18:54:22 +02:00
// FullCycle runs one complete sync of a channel: it downloads the wallet and
// blockchain db from storage, starts the lbrynet daemon, syncs the videos,
// optionally processes the channel transfer, and (via deferred handlers)
// uploads state back and records the channel's termination status remotely.
// The named return e is read/updated by the deferred handlers.
func (s *Sync) FullCycle() (e error) {
	if os.Getenv("HOME") == "" {
		return errors.Err("no $HOME env var found")
	}
	defer timing.ClearTimings()
	s.syncedVideosMux = &sync.RWMutex{}
	s.walletMux = &sync.RWMutex{}
	s.grp = stopGroup
	s.queue = make(chan ytapi.Video)
	// relay SIGINT/SIGTERM into the stop group so workers can wind down
	interruptChan := make(chan os.Signal, 1)
	signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(interruptChan)
	go func() {
		<-interruptChan
		util.SendToSlack("got interrupt, shutting down")
		log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
		s.grp.Stop()
		time.Sleep(5 * time.Second)
		debug.PrintStack() // so we can figure out what's not stopping
	}()
	err := s.setStatusSyncing()
	if err != nil {
		return err
	}

	// runs last of the deferred handlers: records final channel status remotely
	defer s.setChannelTerminationStatus(&e)

	// "wallet not on S3" is the expected error for a brand-new channel
	err = s.downloadWallet()
	if err != nil && err.Error() != "wallet not on S3" {
		return errors.Prefix("failure in downloading wallet", err)
	} else if err == nil {
		log.Println("Continuing previous upload")
	} else {
		log.Println("Starting new wallet")
	}
	err = s.downloadBlockchainDB()
	if err != nil {
		return errors.Prefix("failure in downloading blockchain.db", err)
	}

	// registered before the daemon starts so state is persisted even on failure
	defer s.stopAndUploadWallet(&e)

	s.videoDirectory, err = ioutil.TempDir(os.Getenv("TMP_DIR"), "ytsync")
	if err != nil {
		return errors.Wrap(err, 0)
	}
	err = os.Chmod(s.videoDirectory, 0766)
	if err != nil {
		return errors.Err(err)
	}

	defer deleteSyncFolder(s.videoDirectory)
	log.Printf("Starting daemon")
	err = logUtils.StartDaemon()
	if err != nil {
		return err
	}
	log.Infoln("Waiting for daemon to finish starting...")
	s.daemon = jsonrpc.NewClient(os.Getenv("LBRYNET_ADDRESS"))
	s.daemon.SetRPCTimeout(5 * time.Minute)

	err = s.waitForDaemonStart()
	if err != nil {
		return err
	}

	err = s.doSync()
	if err != nil {
		return err
	}
	if s.shouldTransfer() {
		err = s.processTransfers()
	}
	timing.Report()
	return err
}
// processTransfers hands the channel and its videos over to the user's wallet:
// it waits for confirmations, abandons our supports, transfers the video and
// channel claims, and finally re-tips the channel with the abandoned LBC.
func (s *Sync) processTransfers() (e error) {
	log.Println("Processing transfers")
	// NOTE(review): 2 presumably corresponds to a shared.TransferState*
	// constant (transfer already complete) — confirm against shared package.
	if s.DbChannelData.TransferState != 2 {
		err := waitConfirmations(s)
		if err != nil {
			return err
		}
	}
	supportAmount, err := abandonSupports(s)
	if err != nil {
		return errors.Prefix(fmt.Sprintf("%.6f LBCs were abandoned before failing", supportAmount), err)
	}
	if supportAmount > 0 {
		logUtils.SendInfoToSlack("(%s) %.6f LBCs were abandoned and should be used as support", s.DbChannelData.ChannelId, supportAmount)
	}
	err = transferVideos(s)
	if err != nil {
		return err
	}
	err = transferChannel(s)
	if err != nil {
		return err
	}
	defaultAccount, err := s.getDefaultAccount()
	if err != nil {
		return err
	}
	// only bother re-tipping if the abandoned amount is worth a transaction
	reallocateSupports := supportAmount > 0.01
	if reallocateSupports {
		err = waitConfirmations(s)
		if err != nil {
			return err
		}
		isTip := true
		summary, err := s.daemon.SupportCreate(s.DbChannelData.ChannelClaimID, fmt.Sprintf("%.6f", supportAmount), &isTip, nil, []string{defaultAccount}, nil)
		if err != nil {
			if strings.Contains(err.Error(), "tx-size") { //TODO: this is a silly workaround...
				// consolidate UTXOs, wait for a block, then retry the tip once
				_, spendErr := s.daemon.TxoSpend(util.PtrToString("other"), nil, nil, nil, nil, &s.defaultAccountID)
				if spendErr != nil {
					return errors.Prefix(fmt.Sprintf("something went wrong while tipping the channel for %.6f LBCs", supportAmount), err)
				}
				err = s.waitForNewBlock()
				if err != nil {
					return errors.Prefix(fmt.Sprintf("something went wrong while tipping the channel for %.6f LBCs (waiting for new block)", supportAmount), err)
				}
				summary, err = s.daemon.SupportCreate(s.DbChannelData.ChannelClaimID, fmt.Sprintf("%.6f", supportAmount), &isTip, nil, []string{defaultAccount}, nil)
				if err != nil {
					return errors.Prefix(fmt.Sprintf("something went wrong while tipping the channel for %.6f LBCs", supportAmount), err)
				}
			} else {
				return errors.Prefix(fmt.Sprintf("something went wrong while tipping the channel for %.6f LBCs", supportAmount), err)
			}
		}
		if len(summary.Outputs) < 1 {
			return errors.Err("something went wrong while tipping the channel for %.6f LBCs", supportAmount)
		}
	}
	log.Println("Done processing transfers")
	return nil
}
2019-05-03 05:11:52 +02:00
2019-06-25 02:43:50 +02:00
// deleteSyncFolder removes the temporary directory where videos were
// downloaded. As a safety net it refuses to delete any path outside the
// expected /tmp/ytsync prefix: videoDirectory is passed around and a bug
// upstream could otherwise wipe an arbitrary directory.
func deleteSyncFolder(videoDirectory string) {
	if !strings.Contains(videoDirectory, "/tmp/ytsync") {
		_ = util.SendToSlack(errors.Err("Trying to delete an unexpected directory: %s", videoDirectory).Error())
		// BUG FIX: previously we alerted but deleted the directory anyway,
		// defeating the purpose of the check. Now we refuse to delete it.
		return
	}
	err := os.RemoveAll(videoDirectory)
	if err != nil {
		_ = util.SendToSlack(err.Error())
	}
}
2019-09-25 04:38:49 +02:00
2019-08-21 19:40:35 +02:00
func ( s * Sync ) shouldTransfer ( ) bool {
2021-03-23 01:18:26 +01:00
return s . DbChannelData . TransferState >= 1 && s . DbChannelData . PublishAddress . Address != "" && ! s . Manager . CliFlags . DisableTransfers && s . DbChannelData . TransferState != 3
2019-08-21 19:40:35 +02:00
}
2019-09-25 04:38:49 +02:00
2018-09-20 01:05:47 +02:00
// setChannelTerminationStatus reports the channel's final state to the remote
// API once a sync finishes. e points at FullCycle's named return value, so
// this deferred handler both reads the sync outcome and can augment it when
// the status update itself fails.
func (s *Sync) setChannelTerminationStatus(e *error) {
	var transferState *int
	if s.shouldTransfer() {
		// only mark the transfer complete if the sync itself succeeded
		if *e == nil {
			transferState = util.PtrToInt(shared.TransferStateComplete)
		}
	}
	if *e != nil {
		//conditions for which a channel shouldn't be marked as failed
		noFailConditions := []string{
			"this youtube channel is being managed by another server",
			"interrupted during daemon startup",
			"interrupted by user",
			"use --skip-space-check to ignore",
		}
		// errors that indicate corrupted local state: flag the channel so its
		// db/wallet is wiped instead of plain-failed
		dbWipeConditions := []string{
			"Missing inputs",
		}
		if util.SubstringInSlice((*e).Error(), noFailConditions) {
			return
		}
		channelStatus := shared.StatusFailed
		if util.SubstringInSlice((*e).Error(), dbWipeConditions) {
			channelStatus = shared.StatusWipeDb
		}
		failureReason := (*e).Error()
		_, _, err := s.Manager.ApiConfig.SetChannelStatus(s.DbChannelData.ChannelId, channelStatus, failureReason, transferState)
		if err != nil {
			msg := fmt.Sprintf("Failed setting failed state for channel %s", s.DbChannelData.DesiredChannelName)
			*e = errors.Prefix(msg+err.Error(), *e)
		}
	} else if !s.IsInterrupted() {
		_, _, err := s.Manager.ApiConfig.SetChannelStatus(s.DbChannelData.ChannelId, shared.StatusSynced, "", transferState)
		if err != nil {
			*e = err
		}
	}
}
func ( s * Sync ) waitForDaemonStart ( ) error {
2019-08-13 23:05:09 +02:00
beginTime := time . Now ( )
2020-05-19 23:13:01 +02:00
defer func ( start time . Time ) {
timing . TimedComponent ( "waitForDaemonStart" ) . Add ( time . Since ( start ) )
} ( beginTime )
2018-08-09 22:54:23 +02:00
for {
select {
case <- s . grp . Ch ( ) :
return errors . Err ( "interrupted during daemon startup" )
default :
2019-08-13 23:05:09 +02:00
status , err := s . daemon . Status ( )
if err == nil && status . StartupStatus . Wallet && status . IsRunning {
2018-08-09 22:54:23 +02:00
return nil
}
2020-08-01 05:28:19 +02:00
if time . Since ( beginTime ) . Minutes ( ) > 120 {
2019-08-13 23:05:09 +02:00
s . grp . Stop ( )
return errors . Err ( "the daemon is taking too long to start. Something is wrong" )
}
2018-08-09 22:54:23 +02:00
time . Sleep ( 5 * time . Second )
}
}
}
2018-09-21 16:26:27 +02:00
2018-08-09 22:54:23 +02:00
func ( s * Sync ) stopAndUploadWallet ( e * error ) {
log . Printf ( "Stopping daemon" )
2019-08-04 00:34:48 +02:00
shutdownErr := logUtils . StopDaemon ( )
2018-08-09 22:54:23 +02:00
if shutdownErr != nil {
logShutdownError ( shutdownErr )
} else {
// the cli will return long before the daemon effectively stops. we must observe the processes running
// before moving the wallet
waitTimeout := 8 * time . Minute
processDeathError := waitForDaemonProcess ( waitTimeout )
if processDeathError != nil {
logShutdownError ( processDeathError )
} else {
err := s . uploadWallet ( )
if err != nil {
if * e == nil {
2019-02-22 19:33:00 +01:00
e = & err
2020-08-04 00:55:26 +02:00
} else {
* e = errors . Prefix ( "failure uploading wallet" , * e )
}
}
err = s . uploadBlockchainDB ( )
if err != nil {
if * e == nil {
e = & err
2018-08-09 22:54:23 +02:00
} else {
2018-10-03 02:51:42 +02:00
* e = errors . Prefix ( "failure uploading wallet" , * e )
2018-08-09 22:54:23 +02:00
}
}
}
}
}
2018-04-26 12:12:54 +02:00
// logShutdownError reports a daemon-shutdown failure to slack, also warning
// that as a consequence the wallet was not moved to its backup directory.
func logShutdownError(shutdownErr error) {
	logUtils.SendErrorToSlack("error shutting down daemon: %s", errors.FullTrace(shutdownErr))
	logUtils.SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
}
2017-11-03 13:46:27 +01:00
2019-04-19 03:22:51 +02:00
// thumbnailHosts lists the hosts ytsync has used for video thumbnails over
// time; a claim whose thumbnail points at one of these is treated as
// ytsync-created (see isYtsyncClaim).
var thumbnailHosts = []string{
	"berk.ninja/thumbnails/",
	thumbs.ThumbnailEndpoint,
}
2019-12-14 06:43:01 +01:00
func isYtsyncClaim ( c jsonrpc . Claim , expectedChannelID string ) bool {
2019-04-19 03:22:51 +02:00
if ! util . InSlice ( c . Type , [ ] string { "claim" , "update" } ) || c . Value . GetStream ( ) == nil {
2019-01-03 17:01:00 +01:00
return false
}
2019-04-25 13:16:45 +02:00
if c . Value . GetThumbnail ( ) == nil || c . Value . GetThumbnail ( ) . GetUrl ( ) == "" {
2019-01-03 17:01:00 +01:00
//most likely a claim created outside of ytsync, ignore!
return false
}
2019-12-14 06:43:01 +01:00
if c . SigningChannel == nil {
return false
}
if c . SigningChannel . ClaimID != expectedChannelID {
return false
}
2019-05-08 23:12:13 +02:00
for _ , th := range thumbnailHosts {
if strings . Contains ( c . Value . GetThumbnail ( ) . GetUrl ( ) , th ) {
return true
}
}
return false
2019-01-03 17:01:00 +01:00
}
2018-09-20 01:05:47 +02:00
// fixDupes abandons duplicate claims
// For each video ID that has more than one claim on the wallet, the older
// claim (lower block height) is abandoned when it is ours to abandon.
// Returns true if at least one claim was abandoned.
func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
	start := time.Now()
	defer func(start time.Time) {
		timing.TimedComponent("fixDupes").Add(time.Since(start))
	}(start)
	abandonedClaims := false
	// videoIDs maps youtube video ID -> the claim we intend to keep
	videoIDs := make(map[string]jsonrpc.Claim)
	for _, c := range claims {
		if !isYtsyncClaim(c, s.DbChannelData.ChannelClaimID) {
			continue
		}
		// the video ID is the last path segment of the thumbnail URL
		tn := c.Value.GetThumbnail().GetUrl()
		videoID := tn[strings.LastIndex(tn, "/")+1:]
		cl, ok := videoIDs[videoID]
		// same claim ID means an update of the same claim, not a dupe
		if !ok || cl.ClaimID == c.ClaimID {
			videoIDs[videoID] = c
			continue
		}
		// only keep the most recent one
		claimToAbandon := c
		videoIDs[videoID] = cl
		if c.Height > cl.Height {
			claimToAbandon = cl
			videoIDs[videoID] = c
		}
		//it's likely that all we need is s.DbChannelData.PublishAddress.IsMine but better be safe than sorry I guess
		if (claimToAbandon.Address != s.DbChannelData.PublishAddress.Address || s.DbChannelData.PublishAddress.IsMine) && !s.syncedVideos[videoID].Transferred {
			log.Debugf("abandoning %+v", claimToAbandon)
			_, err := s.daemon.StreamAbandon(claimToAbandon.Txid, claimToAbandon.Nout, nil, true)
			if err != nil {
				return true, err
			}
			abandonedClaims = true
		} else {
			log.Debugf("claim is not ours. Have the user run this: lbrynet stream abandon --txid=%s --nout=%d", claimToAbandon.Txid, claimToAbandon.Nout)
		}
	}
	return abandonedClaims, nil
}
2019-10-08 01:38:39 +02:00
// ytsyncClaim is a flattened view of a blockchain claim made by ytsync,
// keyed by the youtube video it corresponds to.
type ytsyncClaim struct {
	ClaimID         string
	MetadataVersion uint // 1 for legacy thumbnails, 2 for thumbs.ThumbnailEndpoint
	ClaimName       string
	PublishAddress  string
	VideoID         string // youtube video ID (derived from the thumbnail URL)
	Claim           *jsonrpc.Claim
}
// mapFromClaims returns a map of videoIDs (youtube id) to ytsyncClaim which is a structure holding blockchain related
// information
func ( s * Sync ) mapFromClaims ( claims [ ] jsonrpc . Claim ) map [ string ] ytsyncClaim {
videoIDMap := make ( map [ string ] ytsyncClaim , len ( claims ) )
2018-08-31 17:42:15 +02:00
for _ , c := range claims {
2020-08-08 01:12:55 +02:00
if ! isYtsyncClaim ( c , s . DbChannelData . ChannelClaimID ) {
2018-08-31 17:42:15 +02:00
continue
}
2019-04-25 13:16:45 +02:00
tn := c . Value . GetThumbnail ( ) . GetUrl ( )
2018-09-20 01:05:47 +02:00
videoID := tn [ strings . LastIndex ( tn , "/" ) + 1 : ]
2019-06-10 01:41:52 +02:00
claimMetadataVersion := uint ( 1 )
if strings . Contains ( tn , thumbs . ThumbnailEndpoint ) {
claimMetadataVersion = 2
}
2019-10-08 01:38:39 +02:00
videoIDMap [ videoID ] = ytsyncClaim {
ClaimID : c . ClaimID ,
MetadataVersion : claimMetadataVersion ,
ClaimName : c . Name ,
PublishAddress : c . Address ,
VideoID : videoID ,
Claim : & c ,
}
}
return videoIDMap
}
//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published
//additionally it removes all entries in the database indicating that a video is published when it's actually not
// Returns (total claims on the wallet, records fixed, records removed, error).
func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim, ownClaims []jsonrpc.Claim) (total, fixed, removed int, err error) {
	allClaimsInfo := s.mapFromClaims(claims)
	ownClaimsInfo := s.mapFromClaims(ownClaims)
	count := len(allClaimsInfo)
	idsToRemove := make([]string, 0, count)
	// pass 1: reconcile every on-chain claim against the database record
	for videoID, chainInfo := range allClaimsInfo {
		s.syncedVideosMux.RLock()
		sv, claimInDatabase := s.syncedVideos[videoID]
		s.syncedVideosMux.RUnlock()
		metadataDiffers := claimInDatabase && sv.MetadataVersion != int8(chainInfo.MetadataVersion)
		claimIDDiffers := claimInDatabase && sv.ClaimID != chainInfo.ClaimID
		claimNameDiffers := claimInDatabase && sv.ClaimName != chainInfo.ClaimName
		claimMarkedUnpublished := claimInDatabase && !sv.Published
		_, isOwnClaim := ownClaimsInfo[videoID]
		// a claim not held by the default account (or a fully-transferred
		// channel, state 3) counts as transferred
		transferred := !isOwnClaim || s.DbChannelData.TransferState == 3
		transferStatusMismatch := sv.Transferred != transferred

		if metadataDiffers {
			log.Debugf("%s: Mismatch in database for metadata. DB: %d - Blockchain: %d", videoID, sv.MetadataVersion, chainInfo.MetadataVersion)
		}
		if claimIDDiffers {
			log.Debugf("%s: Mismatch in database for claimID. DB: %s - Blockchain: %s", videoID, sv.ClaimID, chainInfo.ClaimID)
		}
		if claimNameDiffers {
			log.Debugf("%s: Mismatch in database for claimName. DB: %s - Blockchain: %s", videoID, sv.ClaimName, chainInfo.ClaimName)
		}
		if claimMarkedUnpublished {
			log.Debugf("%s: Mismatch in database: published but marked as unpublished", videoID)
		}
		if !claimInDatabase {
			log.Debugf("%s: Published but is not in database (%s - %s)", videoID, chainInfo.ClaimName, chainInfo.ClaimID)
		}
		if transferStatusMismatch {
			log.Debugf("%s: is marked as transferred %t but it's actually %t", videoID, sv.Transferred, transferred)
		}
		// any discrepancy: rewrite the database record from the chain data
		if !claimInDatabase || metadataDiffers || claimIDDiffers || claimNameDiffers || claimMarkedUnpublished || transferStatusMismatch {
			claimSize, err := chainInfo.Claim.GetStreamSizeByMagic()
			if err != nil {
				claimSize = 0 // size is best-effort; 0 means unknown
			}
			fixed++
			log.Debugf("updating %s in the database", videoID)
			err = s.Manager.ApiConfig.MarkVideoStatus(shared.VideoStatus{
				ChannelID:       s.DbChannelData.ChannelId,
				VideoID:         videoID,
				Status:          shared.VideoStatusPublished,
				ClaimID:         chainInfo.ClaimID,
				ClaimName:       chainInfo.ClaimName,
				Size:            util.PtrToInt64(int64(claimSize)),
				MetaDataVersion: chainInfo.MetadataVersion,
				IsTransferred:   &transferred,
			})
			if err != nil {
				return count, fixed, 0, err
			}
		}
	}
	//reload the synced videos map before we use it for further processing
	if fixed > 0 {
		err := s.setStatusSyncing()
		if err != nil {
			return count, fixed, 0, err
		}
	}
	// pass 2: find db records claiming "published" with no matching claim
	// NOTE(review): this iteration reads s.syncedVideos without holding
	// syncedVideosMux — presumably safe here because workers aren't running
	// yet; confirm call sites.
	for vID, sv := range s.syncedVideos {
		if sv.Transferred || sv.IsLbryFirst {
			_, ok := allClaimsInfo[vID]
			if !ok && sv.Published {
				searchResponse, err := s.daemon.ClaimSearch(nil, &sv.ClaimID, nil, nil, 1, 20)
				if err != nil {
					log.Error(err.Error())
					continue
				}
				if len(searchResponse.Claims) == 0 {
					log.Debugf("%s: was transferred but appears abandoned! we should ignore this - claimID: %s", vID, sv.ClaimID)
					continue //TODO: we should flag these on the db
				} else {
					if sv.IsLbryFirst {
						log.Debugf("%s: was published using lbry-first so we don't want to do anything here! - claimID: %s", vID, sv.ClaimID)
					} else {
						log.Debugf("%s: was transferred and was then edited! we should ignore this - claimID: %s", vID, sv.ClaimID)
					}
					//return count, fixed, 0, errors.Err("%s: isn't our control but is on the database and on the blockchain. wtf is up? ClaimID: %s", vID, sv.ClaimID)
				}
			}
			continue
		}
		_, ok := ownClaimsInfo[vID]
		if !ok && sv.Published {
			log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified (%t)", vID, s.Manager.CliFlags.RemoveDBUnpublished)
			idsToRemove = append(idsToRemove, vID)
		}
	}
	if s.Manager.CliFlags.RemoveDBUnpublished && len(idsToRemove) > 0 {
		log.Infof("removing: %s", strings.Join(idsToRemove, ","))
		err := s.Manager.ApiConfig.DeleteVideos(idsToRemove)
		if err != nil {
			return count, fixed, len(idsToRemove), err
		}
		removed++
	}
	//reload the synced videos map before we use it for further processing
	if removed > 0 {
		err := s.setStatusSyncing()
		if err != nil {
			return count, fixed, removed, err
		}
	}
	return count, fixed, removed, nil
}
2019-10-08 01:38:39 +02:00
func ( s * Sync ) getClaims ( defaultOnly bool ) ( [ ] jsonrpc . Claim , error ) {
var account * string = nil
if defaultOnly {
a , err := s . getDefaultAccount ( )
if err != nil {
return nil , err
}
account = & a
}
2019-12-08 17:48:24 +01:00
claims , err := s . daemon . StreamList ( account , 1 , 30000 )
2019-10-15 00:29:26 +02:00
if err != nil {
return nil , errors . Prefix ( "cannot list claims" , err )
2019-01-30 13:42:23 +01:00
}
2019-12-13 18:20:41 +01:00
items := make ( [ ] jsonrpc . Claim , 0 , len ( claims . Items ) )
for _ , c := range claims . Items {
2020-08-08 01:12:55 +02:00
if c . SigningChannel != nil && c . SigningChannel . ClaimID == s . DbChannelData . ChannelClaimID {
2019-12-13 18:20:41 +01:00
items = append ( items , c )
}
}
return items , nil
2019-01-30 13:42:23 +01:00
}
2019-08-23 01:28:51 +02:00
// checkIntegrity reconciles wallet/blockchain state with the remote database:
// it abandons duplicate claims, repairs mismatched or missing database
// records, and sanity-checks the published-video counts.
func (s *Sync) checkIntegrity() error {
	start := time.Now()
	defer func(start time.Time) {
		timing.TimedComponent("checkIntegrity").Add(time.Since(start))
	}(start)
	allClaims, err := s.getClaims(false)
	if err != nil {
		return err
	}
	hasDupes, err := s.fixDupes(allClaims)
	if err != nil {
		return errors.Prefix("error checking for duplicates", err)
	}
	if hasDupes {
		logUtils.SendInfoToSlack("Channel had dupes and was fixed!")
		// wait for the abandons to confirm, then re-fetch the claim list
		err = s.waitForNewBlock()
		if err != nil {
			return err
		}
		allClaims, err = s.getClaims(false)
		if err != nil {
			return err
		}
	}

	ownClaims, err := s.getClaims(true)
	if err != nil {
		return err
	}
	pubsOnWallet, nFixed, nRemoved, err := s.updateRemoteDB(allClaims, ownClaims)
	if err != nil {
		return errors.Prefix("error updating remote database", err)
	}

	if nFixed > 0 || nRemoved > 0 {
		if nFixed > 0 {
			logUtils.SendInfoToSlack("%d claims had mismatched database info or were completely missing and were fixed", nFixed)
		}
		if nRemoved > 0 {
			logUtils.SendInfoToSlack("%d were marked as published but weren't actually published and thus removed from the database", nRemoved)
		}
	}
	// compare wallet-side publish count against the database's count
	pubsOnDB := 0
	for _, sv := range s.syncedVideos {
		if sv.Published {
			pubsOnDB++
		}
	}
	if pubsOnWallet > pubsOnDB { //This case should never happen
		logUtils.SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.DbChannelData.ChannelId)
		return errors.Err("not all published videos are in the database")
	}
	if pubsOnWallet < pubsOnDB {
		logUtils.SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.DbChannelData.ChannelId)
	}

	//_, err = s.getUnsentSupports() //TODO: use the returned value when it works
	//if err != nil {
	//	return err
	//}
	return nil
}
// doSync runs the core sync flow: wallet/address setup, integrity check
// against the remote database, channel certificate export, then spawns the
// concurrent workers that consume the video queue while the main goroutine
// enqueues videos from youtube.
func (s *Sync) doSync() error {
	err := s.enableAddressReuse()
	if err != nil {
		return errors.Prefix("could not set address reuse policy", err)
	}
	err = s.importPublicKey()
	if err != nil {
		return errors.Prefix("could not import the transferee public key", err)
	}
	// release any UTXOs left locked by a previous (possibly crashed) run
	_, err = s.daemon.UTXORelease(nil)
	if err != nil {
		return errors.Prefix("could not run uxo_release", err)
	}
	err = s.walletSetup()
	if err != nil {
		return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err)
	}
	err = s.checkIntegrity()
	if err != nil {
		return err
	}
	// until the transfer is complete, keep the channel cert backed up remotely
	if s.DbChannelData.TransferState < shared.TransferStateComplete {
		cert, err := s.daemon.ChannelExport(s.DbChannelData.ChannelClaimID, nil, nil)
		if err != nil {
			return errors.Prefix("error getting channel cert", err)
		}
		if cert != nil {
			err = s.Manager.ApiConfig.SetChannelCert(string(*cert), s.DbChannelData.ChannelClaimID)
			if err != nil {
				return errors.Prefix("error setting channel cert", err)
			}
		}
	}

	for i := 0; i < s.Manager.CliFlags.ConcurrentJobs; i++ {
		s.grp.Add(1)
		go func(i int) {
			defer s.grp.Done()
			s.startWorker(i)
		}(i)
	}

	if s.DbChannelData.DesiredChannelName == "@UCBerkeley" {
		err = errors.Err("UCB is not supported in this version of YTSYNC")
	} else {
		err = s.enqueueYoutubeVideos()
	}
	// closing the queue lets the workers drain and exit before we wait
	close(s.queue)
	s.grp.Wait()
	if err != nil {
		return err
	}
	// a hard video failure aborts the whole channel sync with its reason
	if s.hardVideoFailure.failed {
		return errors.Err(s.hardVideoFailure.failureReason)
	}
	return nil
}
2020-11-03 22:03:38 +01:00
func ( s * Sync ) startWorker ( workerNum int ) {
2020-07-27 20:48:05 +02:00
var v ytapi . Video
2017-12-28 18:14:33 +01:00
var more bool
2020-11-03 21:41:39 +01:00
fatalErrors := [ ] string {
":5279: read: connection reset by peer" ,
"no space left on device" ,
"NotEnoughFunds" ,
"Cannot publish using channel" ,
"cannot concatenate 'str' and 'NoneType' objects" ,
"more than 90% of the space has been used." ,
"Couldn't find private key for id" ,
"You already have a stream claim published under the name" ,
"Missing inputs" ,
}
errorsNoRetry := [ ] string {
"non 200 status code received" ,
"This video contains content from" ,
"dont know which claim to update" ,
"uploader has not made this video available in your country" ,
"download error: AccessDenied: Access Denied" ,
"Playback on other websites has been disabled by the video owner" ,
"Error in daemon: Cannot publish empty file" ,
"Error extracting sts from embedded url response" ,
"Unable to extract signature tokens" ,
"Client.Timeout exceeded while awaiting headers" ,
"the video is too big to sync, skipping for now" ,
"video is too long to process" ,
"video is too short to process" ,
"no compatible format available for this video" ,
"Watch this video on YouTube." ,
"have blocked it on copyright grounds" ,
"the video must be republished as we can't get the right size" ,
"HTTP Error 403" ,
"giving up after 0 fragment retries" ,
"Sorry about that" ,
"This video is not available" ,
"requested format not available" ,
"interrupted by user" ,
"Sign in to confirm your age" ,
"This video is unavailable" ,
"video is a live stream and hasn't completed yet" ,
}
walletErrors := [ ] string {
"Not enough funds to cover this transaction" ,
"failed: Not enough funds" ,
"Error in daemon: Insufficient funds, please deposit additional LBC" ,
//"Missing inputs",
}
blockchainErrors := [ ] string {
"txn-mempool-conflict" ,
"too-long-mempool-chain" ,
}
2017-11-02 16:20:22 +01:00
for {
2017-12-28 18:14:33 +01:00
select {
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-12-28 18:14:33 +01:00
log . Printf ( "Stopping worker %d" , workerNum )
2020-11-03 22:03:38 +01:00
return
2017-12-28 18:14:33 +01:00
default :
2017-11-02 16:20:22 +01:00
}
2017-12-28 18:14:33 +01:00
select {
case v , more = <- s . queue :
if ! more {
2020-11-03 22:03:38 +01:00
return
2017-12-28 18:14:33 +01:00
}
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-12-28 18:14:33 +01:00
log . Printf ( "Stopping worker %d" , workerNum )
2020-11-03 22:03:38 +01:00
return
2017-10-11 04:02:16 +02:00
}
2018-02-13 18:47:05 +01:00
log . Println ( "================================================================================" )
2017-10-11 04:02:16 +02:00
2017-12-28 18:14:33 +01:00
tryCount := 0
for {
2020-07-29 03:34:08 +02:00
select { // check again inside the loop so this dies faster
case <- s . grp . Ch ( ) :
log . Printf ( "Stopping worker %d" , workerNum )
2020-11-03 22:03:38 +01:00
return
2020-07-29 03:34:08 +02:00
default :
}
2017-12-28 18:14:33 +01:00
tryCount ++
err := s . processVideo ( v )
if err != nil {
2020-11-03 21:41:39 +01:00
logUtils . SendErrorToSlack ( "error processing video %s: %s" , v . ID ( ) , err . Error ( ) )
shouldRetry := s . Manager . CliFlags . MaxTries > 1 && ! util . SubstringInSlice ( err . Error ( ) , errorsNoRetry ) && tryCount < s . Manager . CliFlags . MaxTries
2019-07-12 23:20:01 +02:00
if strings . Contains ( strings . ToLower ( err . Error ( ) ) , "interrupted by user" ) {
2018-06-25 22:13:28 +02:00
s . grp . Stop ( )
2020-11-03 21:41:39 +01:00
} else if util . SubstringInSlice ( err . Error ( ) , fatalErrors ) {
2020-11-03 22:03:38 +01:00
s . hardVideoFailure . flagFailure ( err . Error ( ) )
2020-11-03 21:41:39 +01:00
s . grp . Stop ( )
} else if shouldRetry {
if util . SubstringInSlice ( err . Error ( ) , blockchainErrors ) {
log . Println ( "waiting for a block before retrying" )
err := s . waitForNewBlock ( )
if err != nil {
s . grp . Stop ( )
logUtils . SendErrorToSlack ( "something went wrong while waiting for a block: %s" , errors . FullTrace ( err ) )
break
}
} else if util . SubstringInSlice ( err . Error ( ) , walletErrors ) {
log . Println ( "checking funds and UTXOs before retrying..." )
err := s . walletSetup ( )
if err != nil {
s . grp . Stop ( )
logUtils . SendErrorToSlack ( "failed to setup the wallet for a refill: %s" , errors . FullTrace ( err ) )
break
2018-04-20 21:06:55 +02:00
}
2020-11-03 21:41:39 +01:00
} else if strings . Contains ( err . Error ( ) , "Error in daemon: 'str' object has no attribute 'get'" ) {
time . Sleep ( 5 * time . Second )
2017-12-28 18:14:33 +01:00
}
2020-11-03 21:41:39 +01:00
log . Println ( "Retrying" )
continue
2017-12-28 18:14:33 +01:00
}
2020-11-04 17:22:27 +01:00
logUtils . SendErrorToSlack ( "Video %s failed after %d retries, skipping. Stack: %s" , v . ID ( ) , tryCount , errors . FullTrace ( err ) )
2020-11-03 21:41:39 +01:00
2019-06-26 20:40:40 +02:00
s . syncedVideosMux . RLock ( )
2019-05-24 19:01:16 +02:00
existingClaim , ok := s . syncedVideos [ v . ID ( ) ]
2019-06-26 20:40:40 +02:00
s . syncedVideosMux . RUnlock ( )
2019-05-24 19:01:16 +02:00
existingClaimID := ""
existingClaimName := ""
2019-06-12 03:35:21 +02:00
existingClaimSize := int64 ( 0 )
if v . Size ( ) != nil {
existingClaimSize = * v . Size ( )
}
2019-05-24 19:01:16 +02:00
if ok {
existingClaimID = existingClaim . ClaimID
existingClaimName = existingClaim . ClaimName
if existingClaim . Size > 0 {
2019-06-12 03:35:21 +02:00
existingClaimSize = existingClaim . Size
2019-05-24 19:01:16 +02:00
}
}
2020-08-08 01:12:55 +02:00
videoStatus := shared . VideoStatusFailed
2019-06-10 21:59:42 +02:00
if strings . Contains ( err . Error ( ) , "upgrade failed" ) {
2020-08-08 01:12:55 +02:00
videoStatus = shared . VideoStatusUpgradeFailed
2019-06-12 03:35:21 +02:00
} else {
s . AppendSyncedVideo ( v . ID ( ) , false , err . Error ( ) , existingClaimName , existingClaimID , 0 , existingClaimSize )
2019-06-10 21:59:42 +02:00
}
2020-08-08 01:12:55 +02:00
err = s . Manager . ApiConfig . MarkVideoStatus ( shared . VideoStatus {
ChannelID : s . DbChannelData . ChannelId ,
2019-08-20 07:06:51 +02:00
VideoID : v . ID ( ) ,
Status : videoStatus ,
ClaimID : existingClaimID ,
ClaimName : existingClaimName ,
FailureReason : err . Error ( ) ,
Size : & existingClaimSize ,
} )
2018-07-21 01:56:36 +02:00
if err != nil {
2019-08-04 00:34:48 +02:00
logUtils . SendErrorToSlack ( "Failed to mark video on the database: %s" , errors . FullTrace ( err ) )
2018-07-25 19:08:28 +02:00
}
2017-12-28 18:14:33 +01:00
}
break
2017-11-06 22:42:52 +01:00
}
2017-10-11 04:02:16 +02:00
}
}
2018-02-13 18:47:05 +01:00
func ( s * Sync ) enqueueYoutubeVideos ( ) error {
2020-07-27 20:48:05 +02:00
defer func ( start time . Time ) { timing . TimedComponent ( "enqueueYoutubeVideos" ) . Add ( time . Since ( start ) ) } ( time . Now ( ) )
2017-10-11 04:02:16 +02:00
2019-12-18 18:22:15 +01:00
ipPool , err := ip_manager . GetIPPool ( s . grp )
2019-12-10 23:02:56 +01:00
if err != nil {
return err
}
2019-12-27 18:12:41 +01:00
2020-11-19 03:11:23 +01:00
videos , err := ytapi . GetVideosToSync ( s . Manager . ApiConfig , s . DbChannelData . ChannelId , s . syncedVideos , s . Manager . CliFlags . QuickSync , s . Manager . CliFlags . VideosToSync ( s . DbChannelData . TotalSubscribers ) , ytapi . VideoParams {
2020-07-27 20:48:05 +02:00
VideoDir : s . videoDirectory ,
2020-08-08 01:12:55 +02:00
S3Config : * s . Manager . AwsConfigs . GetS3AWSConfig ( ) ,
2020-07-29 03:34:08 +02:00
Stopper : s . grp ,
2020-07-27 20:48:05 +02:00
IPPool : ipPool ,
2020-08-08 01:12:55 +02:00
} , s . DbChannelData . LastUploadedVideo )
2020-07-27 20:48:05 +02:00
if err != nil {
return err
2019-05-31 16:38:31 +02:00
}
2017-10-11 04:02:16 +02:00
2017-10-11 19:13:47 +02:00
Enqueue :
2017-10-11 04:02:16 +02:00
for _ , v := range videos {
select {
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-12-28 18:14:33 +01:00
break Enqueue
default :
}
select {
case s . queue <- v :
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-10-11 19:13:47 +02:00
break Enqueue
2017-10-11 04:02:16 +02:00
}
}
return nil
}
2020-07-27 20:48:05 +02:00
func ( s * Sync ) processVideo ( v ytapi . Video ) ( err error ) {
2018-03-09 17:47:38 +01:00
defer func ( ) {
if p := recover ( ) ; p != nil {
2019-12-27 01:39:27 +01:00
logUtils . SendErrorToSlack ( "Video processing panic! %s" , debug . Stack ( ) )
2018-03-09 17:47:38 +01:00
var ok bool
err , ok = p . ( error )
if ! ok {
err = errors . Err ( "%v" , p )
}
err = errors . Wrap ( p , 2 )
}
} ( )
2017-12-28 18:14:33 +01:00
log . Println ( "Processing " + v . IDAndNum ( ) )
2017-11-02 16:20:22 +01:00
defer func ( start time . Time ) {
2017-12-28 18:14:33 +01:00
log . Println ( v . ID ( ) + " took " + time . Since ( start ) . String ( ) )
2017-11-02 16:20:22 +01:00
} ( time . Now ( ) )
2017-10-11 04:02:16 +02:00
2018-08-23 00:28:31 +02:00
s . syncedVideosMux . RLock ( )
2018-07-31 01:19:12 +02:00
sv , ok := s . syncedVideos [ v . ID ( ) ]
2018-08-23 00:28:31 +02:00
s . syncedVideosMux . RUnlock ( )
2019-06-06 16:24:20 +02:00
newMetadataVersion := int8 ( 2 )
2018-07-31 01:19:12 +02:00
alreadyPublished := ok && sv . Published
2020-08-08 01:12:55 +02:00
videoRequiresUpgrade := ok && s . Manager . CliFlags . UpgradeMetadata && sv . MetadataVersion < newMetadataVersion
2018-07-31 01:19:12 +02:00
2020-08-18 00:03:38 +02:00
neverRetryFailures := shared . NeverRetryFailures
2018-08-02 14:05:06 +02:00
if ok && ! sv . Published && util . SubstringInSlice ( sv . FailureReason , neverRetryFailures ) {
log . Println ( v . ID ( ) + " can't ever be published" )
2018-07-31 01:19:12 +02:00
return nil
2017-10-11 04:02:16 +02:00
}
2019-06-06 16:24:20 +02:00
if alreadyPublished && ! videoRequiresUpgrade {
2017-12-28 18:14:33 +01:00
log . Println ( v . ID ( ) + " already published" )
return nil
2017-10-11 04:02:16 +02:00
}
2019-05-09 17:06:56 +02:00
if ok && sv . MetadataVersion >= newMetadataVersion {
log . Println ( v . ID ( ) + " upgraded to the new metadata" )
return nil
}
2017-10-11 04:02:16 +02:00
2020-11-19 03:11:23 +01:00
if ! videoRequiresUpgrade && v . PlaylistPosition ( ) >= s . Manager . CliFlags . VideosToSync ( s . DbChannelData . TotalSubscribers ) {
2018-04-25 23:06:17 +02:00
log . Println ( v . ID ( ) + " is old: skipping" )
return nil
}
2018-08-17 20:05:39 +02:00
err = s . Manager . checkUsedSpace ( )
if err != nil {
return err
}
2020-01-12 04:01:40 +01:00
da , err := s . getDefaultAccount ( )
if err != nil {
return err
}
2019-05-03 05:11:52 +02:00
sp := sources . SyncParams {
2021-03-23 01:18:26 +01:00
ClaimAddress : s . DbChannelData . PublishAddress . Address ,
2019-05-03 05:11:52 +02:00
Amount : publishAmount ,
2020-08-08 01:12:55 +02:00
ChannelID : s . DbChannelData . ChannelClaimID ,
MaxVideoSize : s . DbChannelData . SizeLimit ,
2019-05-03 05:11:52 +02:00
Namer : s . namer ,
2020-08-08 01:12:55 +02:00
MaxVideoLength : time . Duration ( s . DbChannelData . LengthLimit ) * time . Minute ,
Fee : s . DbChannelData . Fee ,
2020-01-12 04:01:40 +01:00
DefaultAccount : da ,
2019-05-03 05:11:52 +02:00
}
2018-09-18 21:20:34 +02:00
2019-06-12 03:17:59 +02:00
summary , err := v . Sync ( s . daemon , sp , & sv , videoRequiresUpgrade , s . walletMux )
2017-10-11 04:02:16 +02:00
if err != nil {
return err
}
2018-09-18 21:20:34 +02:00
2019-06-12 03:35:21 +02:00
s . AppendSyncedVideo ( v . ID ( ) , true , "" , summary . ClaimName , summary . ClaimID , newMetadataVersion , * v . Size ( ) )
2020-08-08 01:12:55 +02:00
err = s . Manager . ApiConfig . MarkVideoStatus ( shared . VideoStatus {
ChannelID : s . DbChannelData . ChannelId ,
2019-08-20 07:06:51 +02:00
VideoID : v . ID ( ) ,
2020-08-08 01:12:55 +02:00
Status : shared . VideoStatusPublished ,
2019-08-20 07:06:51 +02:00
ClaimID : summary . ClaimID ,
ClaimName : summary . ClaimName ,
Size : v . Size ( ) ,
2020-08-08 01:12:55 +02:00
MetaDataVersion : shared . LatestMetadataVersion ,
2019-08-23 01:28:51 +02:00
IsTransferred : util . PtrToBool ( s . shouldTransfer ( ) ) ,
2019-08-20 07:06:51 +02:00
} )
2017-10-11 04:02:16 +02:00
if err != nil {
2019-08-04 00:34:48 +02:00
logUtils . SendErrorToSlack ( "Failed to mark video on the database: %s" , errors . FullTrace ( err ) )
2017-11-03 13:46:27 +01:00
}
return nil
}
2017-12-30 01:21:16 +01:00
2019-09-24 20:42:17 +02:00
func ( s * Sync ) importPublicKey ( ) error {
2020-08-08 01:12:55 +02:00
if s . DbChannelData . PublicKey != "" {
2019-12-08 17:48:24 +01:00
accountsResponse , err := s . daemon . AccountList ( 1 , 50 )
2019-09-24 20:42:17 +02:00
if err != nil {
return errors . Err ( err )
}
2019-12-08 17:48:24 +01:00
ledger := "lbc_mainnet"
2019-09-24 20:42:17 +02:00
if logUtils . IsRegTest ( ) {
2019-12-08 17:48:24 +01:00
ledger = "lbc_regtest"
2019-09-24 20:42:17 +02:00
}
2019-12-08 17:48:24 +01:00
for _ , a := range accountsResponse . Items {
if * a . Ledger == ledger {
2020-08-08 01:12:55 +02:00
if a . PublicKey == s . DbChannelData . PublicKey {
2019-12-08 17:48:24 +01:00
return nil
}
2019-09-24 20:42:17 +02:00
}
}
2020-08-08 01:12:55 +02:00
log . Infof ( "Could not find public key %s in the wallet. Importing it..." , s . DbChannelData . PublicKey )
_ , err = s . daemon . AccountAdd ( s . DbChannelData . DesiredChannelName , nil , nil , & s . DbChannelData . PublicKey , util . PtrToBool ( true ) , nil )
2019-09-24 20:42:17 +02:00
return errors . Err ( err )
}
return nil
}
2019-10-08 01:38:39 +02:00
//TODO: fully implement this once I find a way to reliably get the abandoned supports amount
func ( s * Sync ) getUnsentSupports ( ) ( float64 , error ) {
defaultAccount , err := s . getDefaultAccount ( )
if err != nil {
return 0 , errors . Err ( err )
}
2020-08-08 01:12:55 +02:00
if s . DbChannelData . TransferState == 2 {
2019-10-08 01:38:39 +02:00
balance , err := s . daemon . AccountBalance ( & defaultAccount )
if err != nil {
return 0 , err
} else if balance == nil {
return 0 , errors . Err ( "no response" )
}
balanceAmount , err := strconv . ParseFloat ( balance . Available . String ( ) , 64 )
if err != nil {
return 0 , errors . Err ( err )
}
2020-07-10 20:19:34 +02:00
transactionList , err := s . daemon . TransactionList ( & defaultAccount , nil , 1 , 90000 )
2019-10-08 01:38:39 +02:00
if err != nil {
return 0 , errors . Err ( err )
}
sentSupports := 0.0
2019-12-08 17:48:24 +01:00
for _ , t := range transactionList . Items {
2019-10-08 01:38:39 +02:00
if len ( t . SupportInfo ) == 0 {
continue
}
for _ , support := range t . SupportInfo {
supportAmount , err := strconv . ParseFloat ( support . BalanceDelta , 64 )
if err != nil {
return 0 , err
}
if supportAmount < 0 { // && support.IsTip TODO: re-enable this when transaction list shows correct information
sentSupports += - supportAmount
}
}
}
2020-08-08 01:12:55 +02:00
if balanceAmount > 10 && sentSupports < 1 && s . DbChannelData . TransferState > 1 {
logUtils . SendErrorToSlack ( "(%s) this channel has quite some LBCs in it (%.2f) and %.2f LBC in sent tips, it's likely that the tips weren't actually sent or the wallet has unnecessary extra credits in it" , s . DbChannelData . ChannelId , balanceAmount , sentSupports )
2019-10-08 01:38:39 +02:00
return balanceAmount - 10 , nil
}
}
return 0 , nil
}
2018-04-20 21:06:55 +02:00
// waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up
func waitForDaemonProcess ( timeout time . Duration ) error {
2020-07-27 20:48:05 +02:00
stopTime := time . Now ( ) . Add ( timeout * time . Second )
2018-04-20 21:06:55 +02:00
for ! time . Now ( ) . After ( stopTime ) {
wait := 10 * time . Second
log . Println ( "the daemon is still running, waiting for it to exit" )
time . Sleep ( wait )
2019-09-25 05:07:19 +02:00
running , err := logUtils . IsLbrynetRunning ( )
2018-04-20 21:06:55 +02:00
if err != nil {
2019-09-25 05:07:19 +02:00
return errors . Err ( err )
2018-04-20 21:06:55 +02:00
}
2019-09-25 05:07:19 +02:00
if ! running {
2020-11-04 18:13:30 +01:00
log . Println ( "daemon stopped" )
2018-04-20 21:06:55 +02:00
return nil
}
}
return errors . Err ( "timeout reached" )
}