2017-10-11 04:02:16 +02:00
package ytsync
import (
2018-02-13 18:47:05 +01:00
"bufio"
"encoding/csv"
2017-12-30 01:21:16 +01:00
"encoding/json"
2018-05-17 01:42:06 +02:00
"fmt"
2018-02-13 18:47:05 +01:00
"io"
2017-10-11 04:02:16 +02:00
"io/ioutil"
"net/http"
"os"
2017-11-03 13:46:27 +01:00
"os/exec"
2017-12-28 18:14:33 +01:00
"os/signal"
2017-10-11 04:02:16 +02:00
"sort"
"strings"
"sync"
2017-12-28 18:14:33 +01:00
"syscall"
2017-10-11 04:02:16 +02:00
"time"
2018-08-09 22:54:23 +02:00
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
2018-03-09 17:47:38 +01:00
"github.com/lbryio/lbry.go/errors"
2017-10-11 04:02:16 +02:00
"github.com/lbryio/lbry.go/jsonrpc"
2018-06-25 22:13:28 +02:00
"github.com/lbryio/lbry.go/stop"
2018-05-26 02:43:16 +02:00
"github.com/lbryio/lbry.go/util"
2018-07-31 19:42:20 +02:00
"github.com/lbryio/lbry.go/ytsync/redisdb"
2017-12-28 18:14:33 +01:00
"github.com/lbryio/lbry.go/ytsync/sources"
2018-06-13 18:44:09 +02:00
"github.com/mitchellh/go-ps"
2017-10-11 04:02:16 +02:00
log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport"
2018-06-09 01:14:55 +02:00
"google.golang.org/api/youtube/v3"
2017-10-11 04:02:16 +02:00
)
// channelClaimAmount is not referenced in the visible part of this file;
// presumably the amount staked when claiming the LBRY channel — TODO confirm.
const channelClaimAmount = 0.01

// publishAmount is the amount handed to video.Sync for every published video.
const publishAmount = 0.01
2017-12-28 18:14:33 +01:00
type video interface {
ID ( ) string
IDAndNum ( ) string
2018-04-25 20:56:26 +02:00
PlaylistPosition ( ) int
2017-12-28 18:14:33 +01:00
PublishedAt ( ) time . Time
2018-08-01 14:56:04 +02:00
Sync ( * jsonrpc . Client , string , float64 , string , int ) ( * sources . SyncSummary , error )
2017-12-28 18:14:33 +01:00
}
// byPublishedAt implements sort.Interface over a slice of videos, ordering
// them from the earliest to the latest publication time.
type byPublishedAt []video

// Len reports how many videos are in the slice.
func (vs byPublishedAt) Len() int {
	return len(vs)
}

// Swap exchanges the videos at positions i and j.
func (vs byPublishedAt) Swap(i, j int) {
	vs[j], vs[i] = vs[i], vs[j]
}

// Less reports whether the video at i was published before the video at j.
func (vs byPublishedAt) Less(i, j int) bool {
	left, right := vs[i].PublishedAt(), vs[j].PublishedAt()
	return left.Before(right)
}
2017-10-11 04:02:16 +02:00
// Sync stores the options that control how syncing happens
type Sync struct {
2017-11-06 22:42:52 +01:00
YoutubeAPIKey string
YoutubeChannelID string
LbryChannelName string
StopOnError bool
MaxTries int
ConcurrentVideos int
TakeOverExistingChannel bool
2018-03-12 21:58:37 +01:00
Refill int
2018-07-17 18:54:22 +02:00
Manager * SyncManager
2018-08-08 23:59:59 +02:00
LbrycrdString string
2018-08-09 22:54:23 +02:00
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
2017-10-11 04:02:16 +02:00
2018-08-14 16:48:55 +02:00
daemon * jsonrpc . Client
claimAddress string
videoDirectory string
db * redisdb . DB
syncedVideos map [ string ] syncedVideo
syncedVideosMux * sync . Mutex
grp * stop . Group
lbryChannelID string
walletMux * sync . Mutex
queue chan video
2018-08-03 19:21:42 +02:00
}
func ( s * Sync ) AppendSyncedVideo ( videoID string , published bool , failureReason string ) {
2018-08-14 16:48:55 +02:00
s . syncedVideosMux . Lock ( )
defer s . syncedVideosMux . Unlock ( )
2018-08-03 19:21:42 +02:00
s . syncedVideos [ videoID ] = syncedVideo {
VideoID : videoID ,
Published : published ,
FailureReason : failureReason ,
}
2017-11-02 16:20:22 +01:00
}
2018-07-24 02:01:35 +02:00
// SendErrorToSlack logs a message at error level and forwards it, prefixed
// with the ":sos:" emoji, to Slack.
//
// format is passed through fmt.Sprintf only when arguments are supplied;
// without arguments it is used verbatim. The error returned by the Slack
// call is handed back to the caller.
func SendErrorToSlack(format string, a ...interface{}) error {
	var message string
	if len(a) == 0 {
		message = format
	} else {
		message = fmt.Sprintf(format, a...)
	}
	log.Errorln(message)
	return util.SendToSlack(":sos: " + message)
}
// SendInfoToSlack logs a message at info level and forwards it, prefixed with
// the ":information_source:" emoji, to Slack.
//
// The format string is expanded with fmt.Sprintf only if at least one
// argument is given. The result of the Slack call is returned.
func SendInfoToSlack(format string, a ...interface{}) error {
	text := format
	if hasArgs := len(a) > 0; hasArgs {
		text = fmt.Sprintf(format, a...)
	}
	log.Infoln(text)

	const prefix = ":information_source: "
	return util.SendToSlack(prefix + text)
}
2018-05-24 02:32:11 +02:00
// IsInterrupted can be queried to discover if the sync process was interrupted manually
func ( s * Sync ) IsInterrupted ( ) bool {
select {
2018-07-12 14:28:20 +02:00
case <- s . grp . Ch ( ) :
2018-05-24 02:32:11 +02:00
return true
default :
return false
}
}
2018-08-09 22:54:23 +02:00
// downloadWallet fetches the wallet of the current YouTube channel from S3 and
// installs it as the default wallet.
//
// The object is first written to a temporary file next to the final location
// and renamed into place only after a successful, non-empty download. When the
// REGTEST environment variable is "true" the regtest wallet directory and the
// "/regtest/" key prefix are used instead of the regular ones.
//
// It returns an error with the message "wallet not on S3" when the key does
// not exist (FullCycle matches on that text), "default_wallet already exists"
// when a wallet is already in place, and the underlying error otherwise.
func (s *Sync) downloadWallet() error {
	walletsDir := os.Getenv("HOME") + "/.lbryum/wallets/"
	key := aws.String("/wallets/" + s.YoutubeChannelID)
	if os.Getenv("REGTEST") == "true" {
		walletsDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/"
		key = aws.String("/regtest/" + s.YoutubeChannelID)
	}
	defaultWalletDir := walletsDir + "default_wallet"
	defaultTempWalletDir := walletsDir + "tmp_wallet"

	if _, err := os.Stat(defaultWalletDir); err == nil {
		return errors.Err("default_wallet already exists")
	} else if !os.IsNotExist(err) {
		// e.g. permission problems: report what actually happened instead of
		// claiming that the wallet exists
		return errors.Err(err)
	}

	creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "")
	s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds})
	if err != nil {
		return err
	}
	downloader := s3manager.NewDownloader(s3Session)

	out, err := os.Create(defaultTempWalletDir)
	if err != nil {
		return err
	}
	// discardTemp removes the partial download; it is best effort because the
	// error that caused the cleanup is the one worth reporting.
	discardTemp := func() {
		_ = os.Remove(defaultTempWalletDir)
	}

	bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
		Bucket: aws.String(s.AwsS3Bucket),
		Key:    key,
	})
	// Close before inspecting the result so the file is complete on disk (and
	// no longer open) by the time it is renamed.
	closeErr := out.Close()
	if err != nil {
		discardTemp()
		// awserr.Error exposes the S3 error code; a missing key is an expected
		// situation (first sync of a channel) and gets its own message.
		// See http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
		if aerr, ok := err.(awserr.Error); ok && aerr.Code() == s3.ErrCodeNoSuchKey {
			return errors.Err("wallet not on S3")
		}
		return err
	}
	if bytesWritten == 0 {
		discardTemp()
		return errors.Err("zero bytes written")
	}
	if closeErr != nil {
		discardTemp()
		return errors.Err(closeErr)
	}

	return os.Rename(defaultTempWalletDir, defaultWalletDir)
}
// uploadWallet stores the default wallet on S3 under a key derived from the
// YouTube channel ID and deletes the local copy once the upload succeeded.
//
// With REGTEST set to "true" the regtest wallet directory and the "/regtest/"
// key prefix are used. An error is returned when the wallet file is missing
// or when creating the session, opening the file, uploading or removing the
// file fails.
func (s *Sync) uploadWallet() error {
	home := os.Getenv("HOME")
	walletPath := home + "/.lbryum/wallets/default_wallet"
	objectKey := "/wallets/" + s.YoutubeChannelID
	if os.Getenv("REGTEST") == "true" {
		walletPath = home + "/.lbryum_regtest/wallets/default_wallet"
		objectKey = "/regtest/" + s.YoutubeChannelID
	}

	if _, statErr := os.Stat(walletPath); os.IsNotExist(statErr) {
		return errors.Err("default_wallet does not exist")
	}

	awsConfig := &aws.Config{
		Region:      aws.String(s.AwsS3Region),
		Credentials: credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, ""),
	}
	s3Session, err := session.NewSession(awsConfig)
	if err != nil {
		return err
	}
	uploader := s3manager.NewUploader(s3Session)

	wallet, err := os.Open(walletPath)
	if err != nil {
		return err
	}
	defer wallet.Close()

	input := &s3manager.UploadInput{
		Bucket: aws.String(s.AwsS3Bucket),
		Key:    aws.String(objectKey),
		Body:   wallet,
	}
	if _, err = uploader.Upload(input); err != nil {
		return err
	}

	// the wallet now lives on S3: drop the local copy
	return os.Remove(walletPath)
}
2018-07-17 18:54:22 +02:00
func ( s * Sync ) FullCycle ( ) ( e error ) {
2017-11-03 16:46:19 +01:00
if os . Getenv ( "HOME" ) == "" {
2018-03-09 17:47:38 +01:00
return errors . Err ( "no $HOME env var found" )
2017-11-03 16:46:19 +01:00
}
2017-12-30 01:21:16 +01:00
if s . YoutubeChannelID == "" {
2018-07-17 18:54:22 +02:00
return errors . Err ( "channel ID not provided" )
}
2018-08-14 16:48:55 +02:00
s . syncedVideosMux = & sync . Mutex { }
s . walletMux = & sync . Mutex { }
s . db = redisdb . New ( )
s . grp = stop . New ( )
s . queue = make ( chan video )
interruptChan := make ( chan os . Signal , 1 )
signal . Notify ( interruptChan , os . Interrupt , syscall . SIGTERM )
go func ( ) {
<- interruptChan
log . Println ( "Got interrupt signal, shutting down (if publishing, will shut down after current publish)" )
s . grp . Stop ( )
} ( )
2018-07-31 19:42:20 +02:00
syncedVideos , err := s . Manager . setChannelStatus ( s . YoutubeChannelID , StatusSyncing )
2018-07-17 18:54:22 +02:00
if err != nil {
return err
2017-12-30 01:21:16 +01:00
}
2018-08-14 16:48:55 +02:00
s . syncedVideosMux . Lock ( )
s . syncedVideos = syncedVideos
s . syncedVideosMux . Unlock ( )
2018-07-31 19:42:20 +02:00
2018-08-09 22:54:23 +02:00
defer s . updateChannelStatus ( & e )
2017-11-03 16:46:19 +01:00
2018-08-09 22:54:23 +02:00
err = s . downloadWallet ( )
if err != nil && err . Error ( ) != "wallet not on S3" {
return errors . Prefix ( "failure in downloading wallet: " , err )
} else if err == nil {
2018-08-08 23:59:59 +02:00
log . Println ( "Continuing previous upload" )
2018-08-09 22:54:23 +02:00
} else {
log . Println ( "Starting new wallet" )
2017-11-03 13:46:27 +01:00
}
2018-08-09 22:54:23 +02:00
defer s . stopAndUploadWallet ( & e )
2017-11-03 13:46:27 +01:00
2017-12-28 18:14:33 +01:00
s . videoDirectory , err = ioutil . TempDir ( "" , "ytsync" )
if err != nil {
return errors . Wrap ( err , 0 )
}
2017-11-03 13:46:27 +01:00
2017-12-28 18:14:33 +01:00
log . Printf ( "Starting daemon" )
err = startDaemonViaSystemd ( )
2017-11-03 13:46:27 +01:00
if err != nil {
return err
}
2017-12-28 18:14:33 +01:00
log . Infoln ( "Waiting for daemon to finish starting..." )
2018-03-09 01:27:54 +01:00
s . daemon = jsonrpc . NewClient ( "" )
2018-06-18 01:50:59 +02:00
s . daemon . SetRPCTimeout ( 40 * time . Minute )
2018-03-09 01:27:54 +01:00
2018-08-09 22:54:23 +02:00
err = s . waitForDaemonStart ( )
if err != nil {
return err
2018-03-09 01:27:54 +01:00
}
2017-11-03 13:46:27 +01:00
2017-12-28 18:14:33 +01:00
err = s . doSync ( )
2017-11-03 13:46:27 +01:00
if err != nil {
return err
2017-12-28 18:14:33 +01:00
} else {
// wait for reflection to finish???
wait := 15 * time . Second // should bump this up to a few min, but keeping it low for testing
log . Println ( "Waiting " + wait . String ( ) + " to finish reflecting everything" )
time . Sleep ( wait )
2017-11-03 13:46:27 +01:00
}
return nil
}
2018-08-09 22:54:23 +02:00
// updateChannelStatus reports the outcome of a sync cycle to the manager.
//
// e points at the error the cycle is about to return. When it is nil and the
// sync was not interrupted the channel is marked as synced; a failure of that
// call becomes the cycle's error. When it is non-nil the channel is marked as
// failed, unless the error text matches one of the conditions that must not
// flag the channel; a failure to set the failed state is prepended to *e.
func (s *Sync) updateChannelStatus(e *error) {
	if *e == nil {
		if s.IsInterrupted() {
			return
		}
		if _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced); err != nil {
			*e = err
		}
		return
	}

	// conditions for which a channel shouldn't be marked as failed
	noFailConditions := []string{
		"this youtube channel is being managed by another server",
	}
	if util.SubstringInSlice((*e).Error(), noFailConditions) {
		return
	}

	_, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed)
	if err == nil {
		return
	}
	msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
	err = errors.Prefix(msg, err)
	*e = errors.Prefix(err.Error(), *e)
}
// waitForDaemonStart polls the daemon's wallet balance every five seconds
// until the call succeeds. It gives up with an error as soon as the stop
// group is found stopped at the beginning of a polling round.
func (s *Sync) waitForDaemonStart() error {
	const pollInterval = 5 * time.Second
	for {
		select {
		case <-s.grp.Ch():
			return errors.Err("interrupted during daemon startup")
		default:
		}

		if _, err := s.daemon.WalletBalance(); err == nil {
			return nil
		}
		time.Sleep(pollInterval)
	}
}
// stopAndUploadWallet stops the daemon, waits for its process to exit and then
// uploads the wallet to S3.
//
// e points at the error the sync cycle is about to return. An upload failure
// is written through that pointer: it becomes the cycle's error when there was
// none, otherwise it is prepended to the existing one. Failures to stop the
// daemon are only reported via logShutdownError; the wallet is not uploaded
// in that case.
func (s *Sync) stopAndUploadWallet(e *error) {
	log.Printf("Stopping daemon")
	if shutdownErr := stopDaemonViaSystemd(); shutdownErr != nil {
		logShutdownError(shutdownErr)
		return
	}

	// the cli will return long before the daemon effectively stops. we must observe the processes running
	// before moving the wallet
	waitTimeout := 8 * time.Minute
	if processDeathError := waitForDaemonProcess(waitTimeout); processDeathError != nil {
		logShutdownError(processDeathError)
		return
	}

	err := s.uploadWallet()
	if err == nil {
		return
	}
	if *e == nil {
		// assign through the pointer: rebinding e itself would never reach the caller
		*e = err
		return
	}
	// keep both errors: the upload failure and the one the cycle already had
	*e = errors.Prefix("failure uploading wallet: "+err.Error(), *e)
}
2018-04-26 12:12:54 +02:00
func logShutdownError ( shutdownErr error ) {
2018-07-24 02:01:35 +02:00
SendErrorToSlack ( "error shutting down daemon: %v" , shutdownErr )
SendErrorToSlack ( "WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR" )
2018-04-26 12:12:54 +02:00
}
2017-11-03 13:46:27 +01:00
2017-12-28 18:14:33 +01:00
func ( s * Sync ) doSync ( ) error {
2017-10-11 19:13:47 +02:00
var err error
2017-12-28 18:14:33 +01:00
err = s . walletSetup ( )
2017-10-11 19:13:47 +02:00
if err != nil {
2018-07-17 20:58:47 +02:00
return errors . Prefix ( "Initial wallet setup failed! Manual Intervention is required." , err )
2017-10-11 19:13:47 +02:00
}
2017-10-11 04:02:16 +02:00
if s . StopOnError {
log . Println ( "Will stop publishing if an error is detected" )
}
for i := 0 ; i < s . ConcurrentVideos ; i ++ {
2018-08-14 16:48:55 +02:00
s . grp . Add ( 1 )
go func ( ) {
defer s . grp . Done ( )
s . startWorker ( i )
} ( )
2017-10-11 04:02:16 +02:00
}
2018-02-13 18:47:05 +01:00
if s . LbryChannelName == "@UCBerkeley" {
err = s . enqueueUCBVideos ( )
} else {
err = s . enqueueYoutubeVideos ( )
}
2017-12-28 18:14:33 +01:00
close ( s . queue )
2018-08-14 16:48:55 +02:00
s . grp . Wait ( )
2017-10-11 04:02:16 +02:00
return err
}
2017-12-28 18:14:33 +01:00
func ( s * Sync ) startWorker ( workerNum int ) {
var v video
var more bool
2017-11-02 16:20:22 +01:00
for {
2017-12-28 18:14:33 +01:00
select {
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-12-28 18:14:33 +01:00
log . Printf ( "Stopping worker %d" , workerNum )
return
default :
2017-11-02 16:20:22 +01:00
}
2017-12-28 18:14:33 +01:00
select {
case v , more = <- s . queue :
if ! more {
return
}
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-12-28 18:14:33 +01:00
log . Printf ( "Stopping worker %d" , workerNum )
return
2017-10-11 04:02:16 +02:00
}
2018-02-13 18:47:05 +01:00
log . Println ( "================================================================================" )
2017-10-11 04:02:16 +02:00
2017-12-28 18:14:33 +01:00
tryCount := 0
for {
tryCount ++
err := s . processVideo ( v )
2017-10-11 04:02:16 +02:00
2017-12-28 18:14:33 +01:00
if err != nil {
2018-04-26 12:12:54 +02:00
logMsg := fmt . Sprintf ( "error processing video: " + err . Error ( ) )
log . Errorln ( logMsg )
2018-04-30 21:18:27 +02:00
fatalErrors := [ ] string {
":5279: read: connection reset by peer" ,
2018-06-06 23:47:28 +02:00
"no space left on device" ,
2018-06-15 23:03:28 +02:00
"NotEnoughFunds" ,
2018-06-18 01:50:59 +02:00
"Cannot publish using channel" ,
2018-08-09 22:54:23 +02:00
"cannot concatenate 'str' and 'NoneType' objects" ,
2018-08-17 20:05:39 +02:00
"more than 90% of the space has been used." ,
2018-04-30 21:18:27 +02:00
}
2018-07-24 02:01:35 +02:00
if util . SubstringInSlice ( err . Error ( ) , fatalErrors ) || s . StopOnError {
2018-06-25 22:13:28 +02:00
s . grp . Stop ( )
2017-12-28 18:14:33 +01:00
} else if s . MaxTries > 1 {
2018-04-30 21:18:27 +02:00
errorsNoRetry := [ ] string {
"non 200 status code received" ,
" reason: 'This video contains content from" ,
"dont know which claim to update" ,
"uploader has not made this video available in your country" ,
"download error: AccessDenied: Access Denied" ,
"Playback on other websites has been disabled by the video owner" ,
"Error in daemon: Cannot publish empty file" ,
2018-05-07 22:26:46 +02:00
"Error extracting sts from embedded url response" ,
2018-06-06 23:47:28 +02:00
"Client.Timeout exceeded while awaiting headers)" ,
2018-08-01 14:56:04 +02:00
"the video is too big to sync, skipping for now" ,
2018-04-30 21:18:27 +02:00
}
2018-07-24 02:01:35 +02:00
if util . SubstringInSlice ( err . Error ( ) , errorsNoRetry ) {
2017-12-28 18:14:33 +01:00
log . Println ( "This error should not be retried at all" )
2018-04-20 21:06:55 +02:00
} else if tryCount < s . MaxTries {
2018-06-18 01:50:59 +02:00
if strings . Contains ( err . Error ( ) , "txn-mempool-conflict" ) ||
2018-04-30 21:18:27 +02:00
strings . Contains ( err . Error ( ) , "failed: Not enough funds" ) ||
2018-05-17 01:42:06 +02:00
strings . Contains ( err . Error ( ) , "Error in daemon: Insufficient funds, please deposit additional LBC" ) ||
2018-06-18 01:50:59 +02:00
strings . Contains ( err . Error ( ) , "too-long-mempool-chain" ) {
2018-04-20 21:06:55 +02:00
log . Println ( "waiting for a block and refilling addresses before retrying" )
2018-04-26 12:12:54 +02:00
err = s . walletSetup ( )
2018-04-20 21:06:55 +02:00
if err != nil {
2018-07-12 14:28:20 +02:00
s . grp . Stop ( )
2018-07-24 02:01:35 +02:00
SendErrorToSlack ( "Failed to setup the wallet for a refill: %v" , err )
2018-05-05 16:15:02 +02:00
break
2018-04-20 21:06:55 +02:00
}
}
2017-12-28 18:14:33 +01:00
log . Println ( "Retrying" )
continue
}
2018-07-24 02:01:35 +02:00
SendErrorToSlack ( "Video failed after %d retries, skipping. Stack: %s" , tryCount , logMsg )
2017-12-28 18:14:33 +01:00
}
2018-08-03 23:19:36 +02:00
s . AppendSyncedVideo ( v . ID ( ) , false , err . Error ( ) )
2018-08-02 16:37:22 +02:00
err = s . Manager . MarkVideoStatus ( s . YoutubeChannelID , v . ID ( ) , VideoStatusFailed , "" , "" , err . Error ( ) )
2018-07-21 01:56:36 +02:00
if err != nil {
2018-07-24 02:01:35 +02:00
SendErrorToSlack ( "Failed to mark video on the database: %s" , err . Error ( ) )
2018-07-25 19:08:28 +02:00
}
2017-12-28 18:14:33 +01:00
}
break
2017-11-06 22:42:52 +01:00
}
2017-10-11 04:02:16 +02:00
}
}
2018-02-13 18:47:05 +01:00
func ( s * Sync ) enqueueYoutubeVideos ( ) error {
2017-10-11 04:02:16 +02:00
client := & http . Client {
Transport : & transport . APIKey { Key : s . YoutubeAPIKey } ,
}
service , err := youtube . New ( client )
if err != nil {
2018-03-09 17:47:38 +01:00
return errors . Prefix ( "error creating YouTube service" , err )
2017-10-11 04:02:16 +02:00
}
2017-12-28 18:14:33 +01:00
response , err := service . Channels . List ( "contentDetails" ) . Id ( s . YoutubeChannelID ) . Do ( )
2017-10-11 04:02:16 +02:00
if err != nil {
2018-03-09 17:47:38 +01:00
return errors . Prefix ( "error getting channels" , err )
2017-10-11 04:02:16 +02:00
}
if len ( response . Items ) < 1 {
2018-03-09 17:47:38 +01:00
return errors . Err ( "youtube channel not found" )
2017-10-11 04:02:16 +02:00
}
if response . Items [ 0 ] . ContentDetails . RelatedPlaylists == nil {
2018-03-09 17:47:38 +01:00
return errors . Err ( "no related playlists" )
2017-10-11 04:02:16 +02:00
}
playlistID := response . Items [ 0 ] . ContentDetails . RelatedPlaylists . Uploads
if playlistID == "" {
2018-03-09 17:47:38 +01:00
return errors . Err ( "no channel playlist" )
2017-10-11 04:02:16 +02:00
}
2017-12-28 18:14:33 +01:00
var videos [ ] video
2017-10-11 04:02:16 +02:00
nextPageToken := ""
for {
req := service . PlaylistItems . List ( "snippet" ) .
PlaylistId ( playlistID ) .
MaxResults ( 50 ) .
PageToken ( nextPageToken )
playlistResponse , err := req . Do ( )
if err != nil {
2018-03-09 17:47:38 +01:00
return errors . Prefix ( "error getting playlist items" , err )
2017-10-11 04:02:16 +02:00
}
if len ( playlistResponse . Items ) < 1 {
2018-03-09 17:47:38 +01:00
return errors . Err ( "playlist items not found" )
2017-10-11 04:02:16 +02:00
}
for _ , item := range playlistResponse . Items {
// normally we'd send the video into the channel here, but youtube api doesn't have sorting
// so we have to get ALL the videos, then sort them, then send them in
2017-12-28 18:14:33 +01:00
videos = append ( videos , sources . NewYoutubeVideo ( s . videoDirectory , item . Snippet ) )
2017-10-11 04:02:16 +02:00
}
2017-12-28 18:14:33 +01:00
log . Infof ( "Got info for %d videos from youtube API" , len ( videos ) )
2017-10-11 04:02:16 +02:00
nextPageToken = playlistResponse . NextPageToken
if nextPageToken == "" {
break
}
}
sort . Sort ( byPublishedAt ( videos ) )
//or sort.Sort(sort.Reverse(byPlaylistPosition(videos)))
2017-10-11 19:13:47 +02:00
Enqueue :
2017-10-11 04:02:16 +02:00
for _ , v := range videos {
select {
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-12-28 18:14:33 +01:00
break Enqueue
default :
}
select {
case s . queue <- v :
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2017-10-11 19:13:47 +02:00
break Enqueue
2017-10-11 04:02:16 +02:00
}
}
return nil
}
2018-02-13 18:47:05 +01:00
func ( s * Sync ) enqueueUCBVideos ( ) error {
var videos [ ] video
csvFile , err := os . Open ( "ucb.csv" )
if err != nil {
return err
}
reader := csv . NewReader ( bufio . NewReader ( csvFile ) )
for {
line , err := reader . Read ( )
if err == io . EOF {
break
} else if err != nil {
return err
}
data := struct {
PublishedAt string ` json:"publishedAt" `
} { }
err = json . Unmarshal ( [ ] byte ( line [ 4 ] ) , & data )
if err != nil {
return err
}
videos = append ( videos , sources . NewUCBVideo ( line [ 0 ] , line [ 2 ] , line [ 1 ] , line [ 3 ] , data . PublishedAt , s . videoDirectory ) )
}
log . Printf ( "Publishing %d videos\n" , len ( videos ) )
sort . Sort ( byPublishedAt ( videos ) )
Enqueue :
for _ , v := range videos {
select {
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2018-02-13 18:47:05 +01:00
break Enqueue
default :
}
select {
case s . queue <- v :
2018-06-25 22:13:28 +02:00
case <- s . grp . Ch ( ) :
2018-02-13 18:47:05 +01:00
break Enqueue
}
}
return nil
}
2018-03-09 17:47:38 +01:00
func ( s * Sync ) processVideo ( v video ) ( err error ) {
defer func ( ) {
if p := recover ( ) ; p != nil {
var ok bool
err , ok = p . ( error )
if ! ok {
err = errors . Err ( "%v" , p )
}
err = errors . Wrap ( p , 2 )
}
} ( )
2017-12-28 18:14:33 +01:00
log . Println ( "Processing " + v . IDAndNum ( ) )
2017-11-02 16:20:22 +01:00
defer func ( start time . Time ) {
2017-12-28 18:14:33 +01:00
log . Println ( v . ID ( ) + " took " + time . Since ( start ) . String ( ) )
2017-11-02 16:20:22 +01:00
} ( time . Now ( ) )
2017-10-11 04:02:16 +02:00
2018-08-14 16:48:55 +02:00
s . syncedVideosMux . Lock ( )
2018-07-31 01:19:12 +02:00
sv , ok := s . syncedVideos [ v . ID ( ) ]
2018-08-14 16:48:55 +02:00
s . syncedVideosMux . Unlock ( )
2018-07-31 01:19:12 +02:00
alreadyPublished := ok && sv . Published
2018-08-02 14:05:06 +02:00
neverRetryFailures := [ ] string {
"Error extracting sts from embedded url response" ,
"the video is too big to sync, skipping for now" ,
}
if ok && ! sv . Published && util . SubstringInSlice ( sv . FailureReason , neverRetryFailures ) {
log . Println ( v . ID ( ) + " can't ever be published" )
2018-07-31 01:19:12 +02:00
return nil
2017-10-11 04:02:16 +02:00
}
2018-08-02 14:05:06 +02:00
//TODO: remove this after a few runs...
2018-07-31 19:42:20 +02:00
alreadyPublishedOld , err := s . db . IsPublished ( v . ID ( ) )
if err != nil {
return err
}
2018-08-02 14:05:06 +02:00
//TODO: remove this after a few runs...
2018-07-31 19:42:20 +02:00
if alreadyPublishedOld && ! alreadyPublished {
//seems like something in the migration of blobs didn't go perfectly right so warn about it!
SendInfoToSlack ( "A video that was previously published is on the local database but isn't on the remote db! fix it @Nikooo777! \nchannelID: %s, videoID: %s" ,
s . YoutubeChannelID , v . ID ( ) )
return nil
}
2017-12-28 18:14:33 +01:00
if alreadyPublished {
log . Println ( v . ID ( ) + " already published" )
return nil
2017-10-11 04:02:16 +02:00
}
2018-08-01 14:56:04 +02:00
if v . PlaylistPosition ( ) > s . Manager . VideosLimit {
2018-04-25 23:06:17 +02:00
log . Println ( v . ID ( ) + " is old: skipping" )
return nil
}
2018-08-17 20:05:39 +02:00
err = s . Manager . checkUsedSpace ( )
if err != nil {
return err
}
2018-08-03 23:19:36 +02:00
summary , err := v . Sync ( s . daemon , s . claimAddress , publishAmount , s . lbryChannelID , s . Manager . MaxVideoSize )
2017-10-11 04:02:16 +02:00
if err != nil {
return err
}
2018-07-25 19:08:28 +02:00
err = s . Manager . MarkVideoStatus ( s . YoutubeChannelID , v . ID ( ) , VideoStatusPublished , summary . ClaimID , summary . ClaimName , "" )
2017-10-11 04:02:16 +02:00
if err != nil {
return err
}
2018-08-03 19:21:42 +02:00
s . AppendSyncedVideo ( v . ID ( ) , true , "" )
2017-10-11 04:02:16 +02:00
return nil
}
2017-11-03 13:46:27 +01:00
2017-12-28 18:14:33 +01:00
func startDaemonViaSystemd ( ) error {
2017-11-03 13:46:27 +01:00
err := exec . Command ( "/usr/bin/sudo" , "/bin/systemctl" , "start" , "lbrynet.service" ) . Run ( )
if err != nil {
2018-03-09 17:47:38 +01:00
return errors . Err ( err )
2017-11-03 13:46:27 +01:00
}
return nil
}
2017-12-28 18:14:33 +01:00
func stopDaemonViaSystemd ( ) error {
2017-11-03 13:46:27 +01:00
err := exec . Command ( "/usr/bin/sudo" , "/bin/systemctl" , "stop" , "lbrynet.service" ) . Run ( )
if err != nil {
2018-03-09 17:47:38 +01:00
return errors . Err ( err )
2017-11-03 13:46:27 +01:00
}
return nil
}
2017-12-30 01:21:16 +01:00
2018-04-20 21:06:55 +02:00
// waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up
func waitForDaemonProcess ( timeout time . Duration ) error {
processes , err := ps . Processes ( )
if err != nil {
return err
}
var daemonProcessId = - 1
for _ , p := range processes {
if p . Executable ( ) == "lbrynet-daemon" {
daemonProcessId = p . Pid ( )
break
}
}
if daemonProcessId == - 1 {
return nil
}
then := time . Now ( )
stopTime := then . Add ( time . Duration ( timeout * time . Second ) )
for ! time . Now ( ) . After ( stopTime ) {
wait := 10 * time . Second
log . Println ( "the daemon is still running, waiting for it to exit" )
time . Sleep ( wait )
proc , err := os . FindProcess ( daemonProcessId )
if err != nil {
// couldn't find the process, that means the daemon is stopped and can continue
return nil
}
//double check if process is running and alive
//by sending a signal 0
//NOTE : syscall.Signal is not available in Windows
err = proc . Signal ( syscall . Signal ( 0 ) )
//the process doesn't exist anymore! we're free to go
2018-04-24 20:50:01 +02:00
if err != nil && ( err == syscall . ESRCH || err . Error ( ) == "os: process already finished" ) {
2018-04-20 21:06:55 +02:00
return nil
}
}
return errors . Err ( "timeout reached" )
}