ytsync/manager/ytsync.go

907 lines
24 KiB
Go
Raw Normal View History

package manager
2017-10-11 04:02:16 +02:00
import (
2018-02-13 18:47:05 +01:00
"bufio"
"encoding/csv"
"encoding/json"
"fmt"
2018-02-13 18:47:05 +01:00
"io"
2017-10-11 04:02:16 +02:00
"io/ioutil"
"net/http"
"os"
"os/exec"
"os/signal"
2018-10-09 21:57:07 +02:00
"runtime/debug"
2017-10-11 04:02:16 +02:00
"sort"
"strings"
"sync"
"syscall"
2017-10-11 04:02:16 +02:00
"time"
"github.com/lbryio/ytsync/namer"
"github.com/lbryio/ytsync/sdk"
"github.com/lbryio/ytsync/sources"
2019-01-11 02:34:34 +01:00
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/jsonrpc"
"github.com/lbryio/lbry.go/extras/stop"
"github.com/lbryio/lbry.go/extras/util"
2018-09-26 06:08:18 +02:00
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
2018-06-13 18:44:09 +02:00
"github.com/mitchellh/go-ps"
2017-10-11 04:02:16 +02:00
log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport"
"google.golang.org/api/youtube/v3"
2017-10-11 04:02:16 +02:00
)
const (
	// maxReasonLength is not referenced in the visible part of this file;
	// presumably it caps the length of a failure reason — TODO confirm.
	maxReasonLength = 500
	// publishAmount is the amount handed to video.Sync for every publish.
	publishAmount = 0.01
	// channelClaimAmount is not referenced in the visible part of this file;
	// presumably the amount used when claiming the channel — TODO confirm.
	channelClaimAmount = 0.01
)
type video interface {
2018-08-14 17:09:23 +02:00
Size() *int64
ID() string
IDAndNum() string
PlaylistPosition() int
PublishedAt() time.Time
Sync(*jsonrpc.Client, string, float64, string, int, *namer.Namer, float64) (*sources.SyncSummary, error)
}
// byPublishedAt implements sort.Interface, ordering videos from the earliest
// PublishedAt value to the latest.
type byPublishedAt []video

// Len reports how many videos are in the collection.
func (vs byPublishedAt) Len() int {
	return len(vs)
}

// Swap exchanges the videos at positions i and j.
func (vs byPublishedAt) Swap(i, j int) {
	vs[j], vs[i] = vs[i], vs[j]
}

// Less reports whether the video at i was published before the one at j.
func (vs byPublishedAt) Less(i, j int) bool {
	left, right := vs[i].PublishedAt(), vs[j].PublishedAt()
	return left.Before(right)
}
2017-10-11 04:02:16 +02:00
// Sync stores the options that control how syncing happens
type Sync struct {
2018-09-26 06:08:18 +02:00
APIConfig *sdk.APIConfig
2017-11-06 22:42:52 +01:00
YoutubeChannelID string
LbryChannelName string
StopOnError bool
MaxTries int
ConcurrentVideos int
TakeOverExistingChannel bool
Refill int
Manager *SyncManager
LbrycrdString string
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
2017-10-11 04:02:16 +02:00
daemon *jsonrpc.Client
claimAddress string
videoDirectory string
syncedVideosMux *sync.RWMutex
2018-09-26 06:08:18 +02:00
syncedVideos map[string]sdk.SyncedVideo
grp *stop.Group
lbryChannelID string
2018-09-18 22:57:25 +02:00
namer *namer.Namer
walletMux *sync.Mutex
queue chan video
}
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
s.syncedVideosMux.Lock()
defer s.syncedVideosMux.Unlock()
2018-09-26 06:08:18 +02:00
s.syncedVideos[videoID] = sdk.SyncedVideo{
VideoID: videoID,
Published: published,
FailureReason: failureReason,
}
}
2018-07-24 02:01:35 +02:00
// SendErrorToSlack Sends an error message to the default channel and to the process log.
func SendErrorToSlack(format string, a ...interface{}) {
2018-07-24 02:01:35 +02:00
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
log.Errorln(message)
err := util.SendToSlack(":sos: " + message)
if err != nil {
log.Errorln(err)
}
2018-07-24 02:01:35 +02:00
}
// SendInfoToSlack Sends an info message to the default channel and to the process log.
func SendInfoToSlack(format string, a ...interface{}) {
2018-07-24 02:01:35 +02:00
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
log.Infoln(message)
err := util.SendToSlack(":information_source: " + message)
if err != nil {
log.Errorln(err)
}
2018-07-24 02:01:35 +02:00
}
// IsInterrupted can be queried to discover if the sync process was interrupted manually
func (s *Sync) IsInterrupted() bool {
select {
2018-07-12 14:28:20 +02:00
case <-s.grp.Ch():
return true
default:
return false
}
}
// downloadWallet fetches this channel's wallet from S3 into the local lbryum
// wallet directory (the regtest directory and key prefix are used when
// REGTEST=true).
//
// The object is first written to a temporary file and only renamed to
// default_wallet once the download succeeded, produced at least one byte and
// the file was closed cleanly; on every failure path the temporary file is
// removed again.
//
// It returns an error when a default_wallet is already present, when the AWS
// session or the temporary file cannot be created, and when the download
// fails. A missing S3 object is reported with the exact message
// "wallet not on S3", which FullCycle matches by string.
func (s *Sync) downloadWallet() error {
	defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
	defaultTempWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet"
	key := aws.String("/wallets/" + s.YoutubeChannelID)
	if os.Getenv("REGTEST") == "true" {
		defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
		defaultTempWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet"
		key = aws.String("/regtest/" + s.YoutubeChannelID)
	}

	// Anything other than "does not exist" (including stat failures) is
	// treated as an existing wallet so that it is never overwritten.
	if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
		return errors.Err("default_wallet already exists")
	}

	creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "")
	s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds})
	if err != nil {
		return err
	}
	downloader := s3manager.NewDownloader(s3Session)

	out, err := os.Create(defaultTempWalletDir)
	if err != nil {
		return err
	}
	// discardTemp is best effort: the download error is the one worth
	// reporting, so a failure to remove the leftover file is ignored.
	discardTemp := func() { _ = os.Remove(defaultTempWalletDir) }

	bytesWritten, downloadErr := downloader.Download(out, &s3.GetObjectInput{
		Bucket: aws.String(s.AwsS3Bucket),
		Key:    key,
	})
	// Close before inspecting the result so the file is flushed and no
	// longer open by the time it is renamed or removed.
	closeErr := out.Close()

	if downloadErr != nil {
		discardTemp()
		// awserr.Error exposes the service error code; a missing key simply
		// means this channel has no wallet stored yet.
		// See http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
		if aerr, ok := downloadErr.(awserr.Error); ok && aerr.Code() == s3.ErrCodeNoSuchKey {
			return errors.Err("wallet not on S3")
		}
		return downloadErr
	}
	if bytesWritten == 0 {
		discardTemp()
		return errors.Err("zero bytes written")
	}
	if closeErr != nil {
		discardTemp()
		return closeErr
	}
	return os.Rename(defaultTempWalletDir, defaultWalletDir)
}
// uploadWallet stores the local default_wallet on S3 under this channel's
// key and deletes the local copy once the upload succeeded. The regtest
// wallet directory and key prefix are used when REGTEST=true.
//
// It returns an error when the wallet file is missing, when the AWS session
// cannot be created, and when opening, uploading or removing the file fails.
func (s *Sync) uploadWallet() error {
	walletPath := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
	objectKey := "/wallets/" + s.YoutubeChannelID
	if os.Getenv("REGTEST") == "true" {
		walletPath = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
		objectKey = "/regtest/" + s.YoutubeChannelID
	}

	if _, statErr := os.Stat(walletPath); os.IsNotExist(statErr) {
		return errors.Err("default_wallet does not exist")
	}

	awsSession, err := session.NewSession(&aws.Config{
		Region:      aws.String(s.AwsS3Region),
		Credentials: credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, ""),
	})
	if err != nil {
		return err
	}
	uploader := s3manager.NewUploader(awsSession)

	wallet, err := os.Open(walletPath)
	if err != nil {
		return err
	}
	defer wallet.Close()

	input := &s3manager.UploadInput{
		Bucket: aws.String(s.AwsS3Bucket),
		Key:    aws.String(objectKey),
		Body:   wallet,
	}
	if _, err = uploader.Upload(input); err != nil {
		return err
	}
	// The wallet now lives on S3; drop the local copy.
	return os.Remove(walletPath)
}
func (s *Sync) setStatusSyncing() error {
2018-09-26 06:08:18 +02:00
syncedVideos, claimNames, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
if err != nil {
return err
}
s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos
2018-10-09 21:57:07 +02:00
s.namer.SetNames(claimNames)
s.syncedVideosMux.Unlock()
return nil
}
// setExceptions applies per-channel overrides to the manager's limits.
// Currently only @FreeTalkLive is special-cased: both its maximum video
// length and maximum video size are set to zero.
func (s *Sync) setExceptions() {
	const freeTalkLiveChannelID = "UCwjQfNRW6sGYb__pd7d4nUg" // @FreeTalkLive
	if s.YoutubeChannelID != freeTalkLiveChannelID {
		return
	}
	s.Manager.maxVideoSize = 0
	s.Manager.maxVideoLength = 0.0 // skips max length checks
}
func (s *Sync) FullCycle() (e error) {
2017-11-03 16:46:19 +01:00
if os.Getenv("HOME") == "" {
return errors.Err("no $HOME env var found")
2017-11-03 16:46:19 +01:00
}
if s.YoutubeChannelID == "" {
return errors.Err("channel ID not provided")
}
s.setExceptions()
s.syncedVideosMux = &sync.RWMutex{}
s.walletMux = &sync.Mutex{}
s.grp = stop.New()
s.queue = make(chan video)
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(interruptChan)
go func() {
<-interruptChan
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop()
}()
err := s.setStatusSyncing()
if err != nil {
return err
}
defer s.setChannelTerminationStatus(&e)
2017-11-03 16:46:19 +01:00
err = s.downloadWallet()
if err != nil && err.Error() != "wallet not on S3" {
return errors.Prefix("failure in downloading wallet", err)
} else if err == nil {
log.Println("Continuing previous upload")
} else {
log.Println("Starting new wallet")
}
defer s.stopAndUploadWallet(&e)
s.videoDirectory, err = ioutil.TempDir("", "ytsync")
if err != nil {
return errors.Wrap(err, 0)
}
log.Printf("Starting daemon")
err = startDaemonViaSystemd()
if err != nil {
return err
}
log.Infoln("Waiting for daemon to finish starting...")
s.daemon = jsonrpc.NewClient("")
s.daemon.SetRPCTimeout(40 * time.Minute)
err = s.waitForDaemonStart()
if err != nil {
return err
}
err = s.doSync()
if err != nil {
return err
} else {
// wait for reflection to finish???
wait := 15 * time.Second // should bump this up to a few min, but keeping it low for testing
log.Println("Waiting " + wait.String() + " to finish reflecting everything")
time.Sleep(wait)
}
return nil
}
func (s *Sync) setChannelTerminationStatus(e *error) {
if *e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{
"this youtube channel is being managed by another server",
"interrupted during daemon startup",
}
if util.SubstringInSlice((*e).Error(), noFailConditions) {
return
}
failureReason := (*e).Error()
2018-09-26 06:08:18 +02:00
_, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s", s.LbryChannelName)
*e = errors.Prefix(msg+err.Error(), *e)
}
} else if !s.IsInterrupted() {
2018-09-26 06:08:18 +02:00
_, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSynced, "")
if err != nil {
*e = err
}
}
}
func (s *Sync) waitForDaemonStart() error {
for {
select {
case <-s.grp.Ch():
return errors.Err("interrupted during daemon startup")
default:
2018-08-20 21:26:52 +02:00
s, err := s.daemon.Status()
if err == nil && s.StartupStatus.Wallet && s.IsRunning {
return nil
}
time.Sleep(5 * time.Second)
}
}
}
2018-09-21 16:26:27 +02:00
// stopAndUploadWallet stops the daemon, waits for its process to exit and
// then uploads the wallet to S3. It is meant to be deferred with a pointer to
// the caller's named error.
//
// If the daemon cannot be stopped, or does not exit within the timeout, the
// problem is reported through logShutdownError and the wallet is left where
// it is. If the upload fails, the failure is propagated through *e: it
// becomes the error when there was none, otherwise the existing error is
// prefixed with the upload failure.
func (s *Sync) stopAndUploadWallet(e *error) {
	log.Printf("Stopping daemon")
	if shutdownErr := stopDaemonViaSystemd(); shutdownErr != nil {
		logShutdownError(shutdownErr)
		return
	}

	// the cli will return long before the daemon effectively stops. we must
	// observe the processes running before moving the wallet
	const waitTimeout = 8 * time.Minute
	if processDeathError := waitForDaemonProcess(waitTimeout); processDeathError != nil {
		logShutdownError(processDeathError)
		return
	}

	err := s.uploadWallet()
	if err == nil {
		return
	}
	if *e == nil {
		// Write through the pointer: rebinding the local e (as the code
		// used to do) never reaches the caller and loses the error.
		*e = err
		return
	}
	*e = errors.Prefix("failure uploading wallet: "+err.Error(), *e)
}
func logShutdownError(shutdownErr error) {
2018-07-24 02:01:35 +02:00
SendErrorToSlack("error shutting down daemon: %v", shutdownErr)
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
}
func isYtsyncClaim(c jsonrpc.Claim) bool {
2019-01-30 13:42:23 +01:00
if !util.InSlice(c.Type, []string{"claim", "update"}) || c.Value.Stream == nil {
return false
}
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
//most likely a claim created outside of ytsync, ignore!
return false
}
return strings.Contains(*c.Value.Stream.Metadata.Thumbnail, "https://berk.ninja/thumbnails/")
}
// fixDupes abandons duplicate claims
func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
abandonedClaims := false
videoIDs := make(map[string]jsonrpc.Claim)
for _, c := range claims {
if !isYtsyncClaim(c) {
continue
}
tn := *c.Value.Stream.Metadata.Thumbnail
2018-09-03 23:38:16 +02:00
videoID := tn[strings.LastIndex(tn, "/")+1:]
log.Infof("claimid: %s, claimName: %s, videoID: %s", c.ClaimID, c.Name, videoID)
cl, ok := videoIDs[videoID]
if !ok || cl.ClaimID == c.ClaimID {
videoIDs[videoID] = c
continue
}
// only keep the most recent one
claimToAbandon := c
videoIDs[videoID] = cl
if c.Height > cl.Height {
claimToAbandon = cl
videoIDs[videoID] = c
}
log.Debugf("abandoning %+v", claimToAbandon)
2019-01-30 13:42:23 +01:00
_, err := s.daemon.ClaimAbandon(claimToAbandon.Txid, claimToAbandon.Nout, nil, false)
if err != nil {
return true, err
}
abandonedClaims = true
//return true, nil
}
return abandonedClaims, nil
}
//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published
func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err error) {
count := 0
for _, c := range claims {
if !isYtsyncClaim(c) {
continue
}
count++
//check if claimID is in remote db
tn := *c.Value.Stream.Metadata.Thumbnail
videoID := tn[strings.LastIndex(tn, "/")+1:]
pv, ok := s.syncedVideos[videoID]
if !ok || pv.ClaimName != c.Name {
fixed++
log.Debugf("adding %s to the database", c.Name)
2018-09-26 06:08:18 +02:00
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
if err != nil {
return count, fixed, err
}
}
}
return count, fixed, nil
}
2019-01-30 13:42:23 +01:00
// getClaims returns every claim in the wallet, fetching them from the daemon
// 50 at a time. The page count reported by each response decides whether
// another page is requested.
func (s *Sync) getClaims() ([]jsonrpc.Claim, error) {
	var collected []jsonrpc.Claim

	page, lastPage := uint64(1), uint64(1)
	for page <= lastPage {
		resp, err := s.daemon.ClaimListMine(nil, page, 50)
		if err != nil {
			return nil, errors.Prefix("cannot list claims", err)
		}
		collected = append(collected, resp.Claims...)
		lastPage = resp.TotalPages
		page++
	}
	return collected, nil
}
func (s *Sync) doSync() error {
2017-10-11 19:13:47 +02:00
var err error
err = s.walletSetup()
if err != nil {
return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err)
}
2019-01-30 13:42:23 +01:00
allClaims, err := s.getClaims()
if err != nil {
2019-01-30 13:42:23 +01:00
return err
}
2019-01-30 13:42:23 +01:00
hasDupes, err := s.fixDupes(allClaims)
if err != nil {
return errors.Prefix("error checking for duplicates", err)
}
if hasDupes {
SendInfoToSlack("Channel had dupes and was fixed!")
2018-09-21 16:26:27 +02:00
err = s.waitForNewBlock()
if err != nil {
return err
}
2019-01-30 13:42:23 +01:00
allClaims, err = s.getClaims()
if err != nil {
2019-01-30 13:42:23 +01:00
return err
}
}
2019-01-30 13:42:23 +01:00
pubsOnWallet, nFixed, err := s.updateRemoteDB(allClaims)
if err != nil {
return errors.Prefix("error counting claims", err)
}
if nFixed > 0 {
err := s.setStatusSyncing()
if err != nil {
return err
}
SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed)
}
pubsOnDB := 0
for _, sv := range s.syncedVideos {
if sv.Published {
pubsOnDB++
}
}
if pubsOnWallet > pubsOnDB { //This case should never happen
SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
return errors.Err("not all published videos are in the database")
}
if pubsOnWallet < pubsOnDB {
SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
}
2017-10-11 19:13:47 +02:00
2017-10-11 04:02:16 +02:00
if s.StopOnError {
log.Println("Will stop publishing if an error is detected")
}
for i := 0; i < s.ConcurrentVideos; i++ {
s.grp.Add(1)
go func(i int) {
defer s.grp.Done()
s.startWorker(i)
}(i)
2017-10-11 04:02:16 +02:00
}
2018-02-13 18:47:05 +01:00
if s.LbryChannelName == "@UCBerkeley" {
err = s.enqueueUCBVideos()
} else {
err = s.enqueueYoutubeVideos()
}
close(s.queue)
s.grp.Wait()
2017-10-11 04:02:16 +02:00
return err
}
func (s *Sync) startWorker(workerNum int) {
var v video
var more bool
for {
select {
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return
default:
}
select {
case v, more = <-s.queue:
if !more {
return
}
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return
2017-10-11 04:02:16 +02:00
}
2018-02-13 18:47:05 +01:00
log.Println("================================================================================")
2017-10-11 04:02:16 +02:00
tryCount := 0
for {
tryCount++
err := s.processVideo(v)
2017-10-11 04:02:16 +02:00
if err != nil {
2018-12-28 16:28:01 +01:00
logMsg := "error processing video: " + err.Error()
log.Errorln(logMsg)
fatalErrors := []string{
":5279: read: connection reset by peer",
"no space left on device",
"NotEnoughFunds",
"Cannot publish using channel",
"cannot concatenate 'str' and 'NoneType' objects",
"more than 90% of the space has been used.",
}
2018-07-24 02:01:35 +02:00
if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError {
s.grp.Stop()
} else if s.MaxTries > 1 {
errorsNoRetry := []string{
"non 200 status code received",
" reason: 'This video contains content from",
"dont know which claim to update",
"uploader has not made this video available in your country",
"download error: AccessDenied: Access Denied",
"Playback on other websites has been disabled by the video owner",
"Error in daemon: Cannot publish empty file",
"Error extracting sts from embedded url response",
2018-08-29 13:33:02 +02:00
"Unable to extract signature tokens",
"Client.Timeout exceeded while awaiting headers)",
2018-08-01 14:56:04 +02:00
"the video is too big to sync, skipping for now",
"video is too long to process",
"no compatible format available for this video",
"Watch this video on YouTube.",
"have blocked it on copyright grounds",
}
2018-07-24 02:01:35 +02:00
if util.SubstringInSlice(err.Error(), errorsNoRetry) {
log.Println("This error should not be retried at all")
} else if tryCount < s.MaxTries {
if strings.Contains(err.Error(), "txn-mempool-conflict") ||
strings.Contains(err.Error(), "too-long-mempool-chain") {
2018-08-10 02:04:39 +02:00
log.Println("waiting for a block before retrying")
err = s.waitForNewBlock()
if err != nil {
s.grp.Stop()
SendErrorToSlack("something went wrong while waiting for a block: %v", err)
break
}
} else if util.SubstringInSlice(err.Error(), []string{
"Not enough funds to cover this transaction",
"failed: Not enough funds",
"Error in daemon: Insufficient funds, please deposit additional LBC"}) {
2018-08-10 02:04:39 +02:00
log.Println("refilling addresses before retrying")
err = s.walletSetup()
if err != nil {
2018-07-12 14:28:20 +02:00
s.grp.Stop()
2018-08-10 02:04:39 +02:00
SendErrorToSlack("failed to setup the wallet for a refill: %v", err)
break
}
}
log.Println("Retrying")
continue
}
2018-07-24 02:01:35 +02:00
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
}
s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
2018-09-26 06:08:18 +02:00
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
if err != nil {
2018-07-24 02:01:35 +02:00
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
}
break
2017-11-06 22:42:52 +01:00
}
2017-10-11 04:02:16 +02:00
}
}
2018-02-13 18:47:05 +01:00
func (s *Sync) enqueueYoutubeVideos() error {
2017-10-11 04:02:16 +02:00
client := &http.Client{
2018-09-26 06:08:18 +02:00
Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
2017-10-11 04:02:16 +02:00
}
service, err := youtube.New(client)
if err != nil {
return errors.Prefix("error creating YouTube service", err)
2017-10-11 04:02:16 +02:00
}
response, err := service.Channels.List("contentDetails").Id(s.YoutubeChannelID).Do()
2017-10-11 04:02:16 +02:00
if err != nil {
return errors.Prefix("error getting channels", err)
2017-10-11 04:02:16 +02:00
}
if len(response.Items) < 1 {
return errors.Err("youtube channel not found")
2017-10-11 04:02:16 +02:00
}
if response.Items[0].ContentDetails.RelatedPlaylists == nil {
return errors.Err("no related playlists")
2017-10-11 04:02:16 +02:00
}
playlistID := response.Items[0].ContentDetails.RelatedPlaylists.Uploads
if playlistID == "" {
return errors.Err("no channel playlist")
2017-10-11 04:02:16 +02:00
}
var videos []video
2017-10-11 04:02:16 +02:00
nextPageToken := ""
for {
req := service.PlaylistItems.List("snippet").
PlaylistId(playlistID).
MaxResults(50).
PageToken(nextPageToken)
playlistResponse, err := req.Do()
if err != nil {
return errors.Prefix("error getting playlist items", err)
2017-10-11 04:02:16 +02:00
}
if len(playlistResponse.Items) < 1 {
2019-01-11 20:15:17 +01:00
// If there are 50+ videos in a playlist but less than 50 are actually returned by the API, youtube will still redirect
// clients to a next page. Such next page will however be empty. This logic prevents ytsync from failing.
youtubeIsLying := len(videos) > 0
if youtubeIsLying {
break
}
return errors.Err("playlist items not found")
2017-10-11 04:02:16 +02:00
}
for _, item := range playlistResponse.Items {
// normally we'd send the video into the channel here, but youtube api doesn't have sorting
// so we have to get ALL the videos, then sort them, then send them in
videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item.Snippet))
2017-10-11 04:02:16 +02:00
}
log.Infof("Got info for %d videos from youtube API", len(videos))
2017-10-11 04:02:16 +02:00
nextPageToken = playlistResponse.NextPageToken
if nextPageToken == "" {
break
}
}
sort.Sort(byPublishedAt(videos))
//or sort.Sort(sort.Reverse(byPlaylistPosition(videos)))
2017-10-11 19:13:47 +02:00
Enqueue:
2017-10-11 04:02:16 +02:00
for _, v := range videos {
select {
case <-s.grp.Ch():
break Enqueue
default:
}
select {
case s.queue <- v:
case <-s.grp.Ch():
2017-10-11 19:13:47 +02:00
break Enqueue
2017-10-11 04:02:16 +02:00
}
}
return nil
}
2018-02-13 18:47:05 +01:00
func (s *Sync) enqueueUCBVideos() error {
var videos []video
csvFile, err := os.Open("ucb.csv")
if err != nil {
return err
}
reader := csv.NewReader(bufio.NewReader(csvFile))
for {
line, err := reader.Read()
if err == io.EOF {
break
} else if err != nil {
return err
}
data := struct {
PublishedAt string `json:"publishedAt"`
}{}
err = json.Unmarshal([]byte(line[4]), &data)
if err != nil {
return err
}
videos = append(videos, sources.NewUCBVideo(line[0], line[2], line[1], line[3], data.PublishedAt, s.videoDirectory))
}
log.Printf("Publishing %d videos\n", len(videos))
sort.Sort(byPublishedAt(videos))
Enqueue:
for _, v := range videos {
select {
case <-s.grp.Ch():
2018-02-13 18:47:05 +01:00
break Enqueue
default:
}
select {
case s.queue <- v:
case <-s.grp.Ch():
2018-02-13 18:47:05 +01:00
break Enqueue
}
}
return nil
}
func (s *Sync) processVideo(v video) (err error) {
defer func() {
if p := recover(); p != nil {
2018-10-09 21:57:07 +02:00
log.Printf("stack: %s", debug.Stack())
var ok bool
err, ok = p.(error)
if !ok {
err = errors.Err("%v", p)
}
err = errors.Wrap(p, 2)
}
}()
log.Println("Processing " + v.IDAndNum())
defer func(start time.Time) {
log.Println(v.ID() + " took " + time.Since(start).String())
}(time.Now())
2017-10-11 04:02:16 +02:00
s.syncedVideosMux.RLock()
sv, ok := s.syncedVideos[v.ID()]
s.syncedVideosMux.RUnlock()
alreadyPublished := ok && sv.Published
2018-08-02 14:05:06 +02:00
neverRetryFailures := []string{
"Error extracting sts from embedded url response",
2018-08-29 13:33:02 +02:00
"Unable to extract signature tokens",
2018-08-02 14:05:06 +02:00
"the video is too big to sync, skipping for now",
"video is too long to process",
"no compatible format available for this video",
"Watch this video on YouTube.",
"have blocked it on copyright grounds",
2018-08-02 14:05:06 +02:00
}
if ok && !sv.Published && util.SubstringInSlice(sv.FailureReason, neverRetryFailures) {
log.Println(v.ID() + " can't ever be published")
return nil
2017-10-11 04:02:16 +02:00
}
if alreadyPublished {
log.Println(v.ID() + " already published")
return nil
2017-10-11 04:02:16 +02:00
}
2018-09-26 06:08:18 +02:00
if v.PlaylistPosition() > s.Manager.videosLimit {
2018-04-25 23:06:17 +02:00
log.Println(v.ID() + " is old: skipping")
return nil
}
err = s.Manager.checkUsedSpace()
if err != nil {
return err
}
2018-09-18 21:20:34 +02:00
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.maxVideoSize, s.namer, s.Manager.maxVideoLength)
2017-10-11 04:02:16 +02:00
if err != nil {
return err
}
2018-09-18 21:20:34 +02:00
2018-09-26 06:08:18 +02:00
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
2017-10-11 04:02:16 +02:00
if err != nil {
2018-08-14 17:09:23 +02:00
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
2017-10-11 04:02:16 +02:00
}
2018-08-14 17:09:23 +02:00
s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName)
2017-10-11 04:02:16 +02:00
return nil
}
// startDaemonViaSystemd starts lbrynet.service through sudo systemctl and
// waits for the command to finish, wrapping any failure.
func startDaemonViaSystemd() error {
	cmd := exec.Command("/usr/bin/sudo", "/bin/systemctl", "start", "lbrynet.service")
	if err := cmd.Run(); err != nil {
		return errors.Err(err)
	}
	return nil
}
// stopDaemonViaSystemd stops lbrynet.service through sudo systemctl and
// waits for the command to finish, wrapping any failure.
func stopDaemonViaSystemd() error {
	cmd := exec.Command("/usr/bin/sudo", "/bin/systemctl", "stop", "lbrynet.service")
	if err := cmd.Run(); err != nil {
		return errors.Err(err)
	}
	return nil
}
// waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up
func waitForDaemonProcess(timeout time.Duration) error {
processes, err := ps.Processes()
if err != nil {
return err
}
var daemonProcessId = -1
for _, p := range processes {
2019-01-30 13:42:23 +01:00
if p.Executable() == "lbrynet" {
daemonProcessId = p.Pid()
break
}
}
if daemonProcessId == -1 {
return nil
}
then := time.Now()
stopTime := then.Add(time.Duration(timeout * time.Second))
for !time.Now().After(stopTime) {
wait := 10 * time.Second
log.Println("the daemon is still running, waiting for it to exit")
time.Sleep(wait)
proc, err := os.FindProcess(daemonProcessId)
if err != nil {
// couldn't find the process, that means the daemon is stopped and can continue
return nil
}
//double check if process is running and alive
//by sending a signal 0
//NOTE : syscall.Signal is not available in Windows
err = proc.Signal(syscall.Signal(0))
//the process doesn't exist anymore! we're free to go
2018-04-24 20:50:01 +02:00
if err != nil && (err == syscall.ESRCH || err.Error() == "os: process already finished") {
return nil
}
}
return errors.Err("timeout reached")
}