2017-10-11 04:02:16 +02:00
|
|
|
package ytsync
|
|
|
|
|
|
|
|
import (
|
2018-02-13 18:47:05 +01:00
|
|
|
"bufio"
|
|
|
|
"encoding/csv"
|
2017-12-30 01:21:16 +01:00
|
|
|
"encoding/json"
|
2018-05-17 01:42:06 +02:00
|
|
|
"fmt"
|
2018-02-13 18:47:05 +01:00
|
|
|
"io"
|
2017-10-11 04:02:16 +02:00
|
|
|
"io/ioutil"
|
|
|
|
"net/http"
|
|
|
|
"os"
|
2017-11-03 13:46:27 +01:00
|
|
|
"os/exec"
|
2017-12-28 18:14:33 +01:00
|
|
|
"os/signal"
|
2017-10-11 04:02:16 +02:00
|
|
|
"sort"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
2017-12-28 18:14:33 +01:00
|
|
|
"syscall"
|
2017-10-11 04:02:16 +02:00
|
|
|
"time"
|
|
|
|
|
2018-03-09 17:47:38 +01:00
|
|
|
"github.com/lbryio/lbry.go/errors"
|
2017-10-11 04:02:16 +02:00
|
|
|
"github.com/lbryio/lbry.go/jsonrpc"
|
2018-06-25 22:13:28 +02:00
|
|
|
"github.com/lbryio/lbry.go/stop"
|
2018-05-26 02:43:16 +02:00
|
|
|
"github.com/lbryio/lbry.go/util"
|
2017-12-28 18:14:33 +01:00
|
|
|
"github.com/lbryio/lbry.go/ytsync/redisdb"
|
|
|
|
"github.com/lbryio/lbry.go/ytsync/sources"
|
2018-06-13 18:44:09 +02:00
|
|
|
"github.com/mitchellh/go-ps"
|
2017-10-11 04:02:16 +02:00
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"google.golang.org/api/googleapi/transport"
|
2018-06-09 01:14:55 +02:00
|
|
|
"google.golang.org/api/youtube/v3"
|
2017-10-11 04:02:16 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
// channelClaimAmount is not referenced in this part of the file; presumably
// it is the amount used when claiming the LBRY channel — TODO confirm.
const channelClaimAmount = 0.01

// publishAmount is the amount handed to video.Sync for every published video.
const publishAmount = 0.01
|
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
type video interface {
|
|
|
|
ID() string
|
|
|
|
IDAndNum() string
|
2018-04-25 20:56:26 +02:00
|
|
|
PlaylistPosition() int
|
2017-12-28 18:14:33 +01:00
|
|
|
PublishedAt() time.Time
|
|
|
|
Sync(*jsonrpc.Client, string, float64, string) error
|
|
|
|
}
|
|
|
|
|
|
|
|
// sorting videos
|
|
|
|
type byPublishedAt []video
|
|
|
|
|
|
|
|
func (a byPublishedAt) Len() int { return len(a) }
|
|
|
|
func (a byPublishedAt) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
|
|
func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[j].PublishedAt()) }
|
|
|
|
|
2017-10-11 04:02:16 +02:00
|
|
|
// Sync stores the options that control how syncing happens
|
|
|
|
type Sync struct {
|
2017-11-06 22:42:52 +01:00
|
|
|
YoutubeAPIKey string
|
|
|
|
YoutubeChannelID string
|
|
|
|
LbryChannelName string
|
|
|
|
StopOnError bool
|
|
|
|
MaxTries int
|
|
|
|
ConcurrentVideos int
|
|
|
|
TakeOverExistingChannel bool
|
2018-03-12 21:58:37 +01:00
|
|
|
Refill int
|
2018-07-17 18:54:22 +02:00
|
|
|
Manager *SyncManager
|
2017-10-11 04:02:16 +02:00
|
|
|
|
|
|
|
daemon *jsonrpc.Client
|
|
|
|
claimAddress string
|
|
|
|
videoDirectory string
|
2017-12-28 18:14:33 +01:00
|
|
|
db *redisdb.DB
|
2017-10-11 04:02:16 +02:00
|
|
|
|
2018-06-25 22:13:28 +02:00
|
|
|
grp *stop.Group
|
2017-11-02 16:20:22 +01:00
|
|
|
|
2018-06-06 23:47:28 +02:00
|
|
|
mux sync.Mutex
|
2017-12-28 18:14:33 +01:00
|
|
|
wg sync.WaitGroup
|
|
|
|
queue chan video
|
2017-11-02 16:20:22 +01:00
|
|
|
}
|
|
|
|
|
2018-05-24 02:32:11 +02:00
|
|
|
// IsInterrupted can be queried to discover if the sync process was interrupted manually
|
|
|
|
func (s *Sync) IsInterrupted() bool {
|
|
|
|
select {
|
2018-07-12 14:28:20 +02:00
|
|
|
case <-s.grp.Ch():
|
2018-05-24 02:32:11 +02:00
|
|
|
return true
|
|
|
|
default:
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-07-17 18:54:22 +02:00
|
|
|
func (s *Sync) FullCycle() (e error) {
|
2017-11-03 16:46:19 +01:00
|
|
|
if os.Getenv("HOME") == "" {
|
2018-03-09 17:47:38 +01:00
|
|
|
return errors.Err("no $HOME env var found")
|
2017-11-03 16:46:19 +01:00
|
|
|
}
|
2017-12-30 01:21:16 +01:00
|
|
|
if s.YoutubeChannelID == "" {
|
2018-07-17 18:54:22 +02:00
|
|
|
return errors.Err("channel ID not provided")
|
|
|
|
}
|
|
|
|
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSyncing)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2017-12-30 01:21:16 +01:00
|
|
|
}
|
2018-07-17 18:54:22 +02:00
|
|
|
defer func() {
|
|
|
|
if e != nil {
|
|
|
|
//conditions for which a channel shouldn't be marked as failed
|
|
|
|
noFailConditions := []string{
|
|
|
|
"this youtube channel is being managed by another server",
|
|
|
|
}
|
2018-07-17 20:58:47 +02:00
|
|
|
if util.ContainedInSlice(e.Error(), noFailConditions) {
|
2018-07-17 18:54:22 +02:00
|
|
|
return
|
|
|
|
}
|
|
|
|
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusFailed)
|
|
|
|
if err != nil {
|
|
|
|
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
|
|
|
|
err = errors.Prefix(msg, err)
|
|
|
|
e = errors.Prefix(err.Error(), e)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
err := s.Manager.setChannelSyncStatus(s.YoutubeChannelID, StatusSynced)
|
|
|
|
if err != nil {
|
|
|
|
e = err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2017-12-08 21:11:44 +01:00
|
|
|
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
|
2018-05-17 19:37:12 +02:00
|
|
|
if os.Getenv("REGTEST") == "true" {
|
|
|
|
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
|
|
|
|
}
|
2017-12-08 21:11:44 +01:00
|
|
|
walletBackupDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1)
|
|
|
|
|
2018-02-13 18:47:05 +01:00
|
|
|
if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
|
2018-03-09 17:47:38 +01:00
|
|
|
return errors.Err("default_wallet already exists")
|
2018-02-13 18:47:05 +01:00
|
|
|
}
|
2017-11-03 16:46:19 +01:00
|
|
|
|
2018-02-13 18:47:05 +01:00
|
|
|
if _, err = os.Stat(walletBackupDir); !os.IsNotExist(err) {
|
2017-12-08 21:11:44 +01:00
|
|
|
err = os.Rename(walletBackupDir, defaultWalletDir)
|
2017-11-03 16:46:19 +01:00
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, 0)
|
|
|
|
}
|
|
|
|
log.Println("Continuing previous upload")
|
2017-11-03 13:46:27 +01:00
|
|
|
}
|
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
defer func() {
|
|
|
|
log.Printf("Stopping daemon")
|
|
|
|
shutdownErr := stopDaemonViaSystemd()
|
|
|
|
if shutdownErr != nil {
|
2018-04-26 12:12:54 +02:00
|
|
|
logShutdownError(shutdownErr)
|
2017-12-28 18:14:33 +01:00
|
|
|
} else {
|
2018-04-20 21:06:55 +02:00
|
|
|
// the cli will return long before the daemon effectively stops. we must observe the processes running
|
|
|
|
// before moving the wallet
|
2018-05-07 22:26:46 +02:00
|
|
|
var waitTimeout time.Duration = 60 * 8
|
2018-04-20 21:06:55 +02:00
|
|
|
processDeathError := waitForDaemonProcess(waitTimeout)
|
|
|
|
if processDeathError != nil {
|
2018-04-26 12:12:54 +02:00
|
|
|
logShutdownError(processDeathError)
|
2018-04-20 21:06:55 +02:00
|
|
|
} else {
|
|
|
|
walletErr := os.Rename(defaultWalletDir, walletBackupDir)
|
|
|
|
if walletErr != nil {
|
|
|
|
log.Errorf("error moving wallet to backup dir: %v", walletErr)
|
|
|
|
}
|
2017-12-28 18:14:33 +01:00
|
|
|
}
|
2017-11-03 16:46:19 +01:00
|
|
|
}
|
2017-12-28 18:14:33 +01:00
|
|
|
}()
|
2017-11-03 13:46:27 +01:00
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
s.videoDirectory, err = ioutil.TempDir("", "ytsync")
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, 0)
|
|
|
|
}
|
2017-11-03 13:46:27 +01:00
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
s.db = redisdb.New()
|
2018-06-25 22:13:28 +02:00
|
|
|
s.grp = stop.New()
|
2017-12-28 18:14:33 +01:00
|
|
|
s.queue = make(chan video)
|
2017-11-03 13:46:27 +01:00
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
interruptChan := make(chan os.Signal, 1)
|
|
|
|
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
|
|
|
|
go func() {
|
|
|
|
<-interruptChan
|
2018-03-12 21:58:37 +01:00
|
|
|
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
|
2018-06-25 22:13:28 +02:00
|
|
|
s.grp.Stop()
|
2017-12-28 18:14:33 +01:00
|
|
|
}()
|
2017-11-03 13:46:27 +01:00
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
log.Printf("Starting daemon")
|
|
|
|
err = startDaemonViaSystemd()
|
2017-11-03 13:46:27 +01:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
log.Infoln("Waiting for daemon to finish starting...")
|
2018-03-09 01:27:54 +01:00
|
|
|
s.daemon = jsonrpc.NewClient("")
|
2018-06-18 01:50:59 +02:00
|
|
|
s.daemon.SetRPCTimeout(40 * time.Minute)
|
2018-03-09 01:27:54 +01:00
|
|
|
|
|
|
|
WaitForDaemonStart:
|
|
|
|
for {
|
|
|
|
select {
|
2018-06-25 22:13:28 +02:00
|
|
|
case <-s.grp.Ch():
|
2018-03-09 01:27:54 +01:00
|
|
|
return nil
|
|
|
|
default:
|
|
|
|
_, err := s.daemon.WalletBalance()
|
|
|
|
if err == nil {
|
|
|
|
break WaitForDaemonStart
|
|
|
|
}
|
|
|
|
time.Sleep(5 * time.Second)
|
|
|
|
}
|
|
|
|
}
|
2017-11-03 13:46:27 +01:00
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
err = s.doSync()
|
2017-11-03 13:46:27 +01:00
|
|
|
if err != nil {
|
|
|
|
return err
|
2017-12-28 18:14:33 +01:00
|
|
|
} else {
|
|
|
|
// wait for reflection to finish???
|
|
|
|
wait := 15 * time.Second // should bump this up to a few min, but keeping it low for testing
|
|
|
|
log.Println("Waiting " + wait.String() + " to finish reflecting everything")
|
|
|
|
time.Sleep(wait)
|
2017-11-03 13:46:27 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2018-04-26 12:12:54 +02:00
|
|
|
func logShutdownError(shutdownErr error) {
|
2018-06-15 23:03:28 +02:00
|
|
|
util.SendErrorToSlack("error shutting down daemon: %v", shutdownErr)
|
|
|
|
util.SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
|
2018-04-26 12:12:54 +02:00
|
|
|
}
|
2017-11-03 13:46:27 +01:00
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
func (s *Sync) doSync() error {
|
2017-10-11 19:13:47 +02:00
|
|
|
var err error
|
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
err = s.walletSetup()
|
2017-10-11 19:13:47 +02:00
|
|
|
if err != nil {
|
2018-07-17 20:58:47 +02:00
|
|
|
return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err)
|
2017-10-11 19:13:47 +02:00
|
|
|
}
|
|
|
|
|
2017-10-11 04:02:16 +02:00
|
|
|
if s.StopOnError {
|
|
|
|
log.Println("Will stop publishing if an error is detected")
|
|
|
|
}
|
|
|
|
|
|
|
|
for i := 0; i < s.ConcurrentVideos; i++ {
|
2017-12-28 18:14:33 +01:00
|
|
|
go s.startWorker(i)
|
2017-10-11 04:02:16 +02:00
|
|
|
}
|
|
|
|
|
2018-02-13 18:47:05 +01:00
|
|
|
if s.LbryChannelName == "@UCBerkeley" {
|
|
|
|
err = s.enqueueUCBVideos()
|
|
|
|
} else {
|
|
|
|
err = s.enqueueYoutubeVideos()
|
|
|
|
}
|
2017-12-28 18:14:33 +01:00
|
|
|
close(s.queue)
|
|
|
|
s.wg.Wait()
|
2017-10-11 04:02:16 +02:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
func (s *Sync) startWorker(workerNum int) {
|
|
|
|
s.wg.Add(1)
|
|
|
|
defer s.wg.Done()
|
2017-11-02 16:20:22 +01:00
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
var v video
|
|
|
|
var more bool
|
2017-11-02 16:20:22 +01:00
|
|
|
|
|
|
|
for {
|
2017-12-28 18:14:33 +01:00
|
|
|
select {
|
2018-06-25 22:13:28 +02:00
|
|
|
case <-s.grp.Ch():
|
2017-12-28 18:14:33 +01:00
|
|
|
log.Printf("Stopping worker %d", workerNum)
|
|
|
|
return
|
|
|
|
default:
|
2017-11-02 16:20:22 +01:00
|
|
|
}
|
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
select {
|
|
|
|
case v, more = <-s.queue:
|
|
|
|
if !more {
|
|
|
|
return
|
|
|
|
}
|
2018-06-25 22:13:28 +02:00
|
|
|
case <-s.grp.Ch():
|
2017-12-28 18:14:33 +01:00
|
|
|
log.Printf("Stopping worker %d", workerNum)
|
|
|
|
return
|
2017-10-11 04:02:16 +02:00
|
|
|
}
|
|
|
|
|
2018-02-13 18:47:05 +01:00
|
|
|
log.Println("================================================================================")
|
2017-10-11 04:02:16 +02:00
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
tryCount := 0
|
|
|
|
for {
|
|
|
|
tryCount++
|
|
|
|
err := s.processVideo(v)
|
2017-10-11 04:02:16 +02:00
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
if err != nil {
|
2018-04-26 12:12:54 +02:00
|
|
|
logMsg := fmt.Sprintf("error processing video: " + err.Error())
|
|
|
|
log.Errorln(logMsg)
|
2018-04-30 21:18:27 +02:00
|
|
|
fatalErrors := []string{
|
|
|
|
":5279: read: connection reset by peer",
|
2018-06-06 23:47:28 +02:00
|
|
|
"no space left on device",
|
2018-06-15 23:03:28 +02:00
|
|
|
"NotEnoughFunds",
|
2018-06-18 01:50:59 +02:00
|
|
|
"Cannot publish using channel",
|
2018-04-30 21:18:27 +02:00
|
|
|
}
|
2018-07-17 20:58:47 +02:00
|
|
|
if util.ContainedInSlice(err.Error(), fatalErrors) || s.StopOnError {
|
2018-06-25 22:13:28 +02:00
|
|
|
s.grp.Stop()
|
2017-12-28 18:14:33 +01:00
|
|
|
} else if s.MaxTries > 1 {
|
2018-04-30 21:18:27 +02:00
|
|
|
errorsNoRetry := []string{
|
|
|
|
"non 200 status code received",
|
|
|
|
" reason: 'This video contains content from",
|
|
|
|
"dont know which claim to update",
|
|
|
|
"uploader has not made this video available in your country",
|
|
|
|
"download error: AccessDenied: Access Denied",
|
|
|
|
"Playback on other websites has been disabled by the video owner",
|
|
|
|
"Error in daemon: Cannot publish empty file",
|
2018-05-07 22:26:46 +02:00
|
|
|
"Error extracting sts from embedded url response",
|
2018-06-06 23:47:28 +02:00
|
|
|
"Client.Timeout exceeded while awaiting headers)",
|
2018-06-18 01:50:59 +02:00
|
|
|
"video is bigger than 2GB, skipping for now",
|
2018-04-30 21:18:27 +02:00
|
|
|
}
|
2018-07-17 20:58:47 +02:00
|
|
|
if util.ContainedInSlice(err.Error(), errorsNoRetry) {
|
2017-12-28 18:14:33 +01:00
|
|
|
log.Println("This error should not be retried at all")
|
2018-04-20 21:06:55 +02:00
|
|
|
} else if tryCount < s.MaxTries {
|
2018-06-18 01:50:59 +02:00
|
|
|
if strings.Contains(err.Error(), "txn-mempool-conflict") ||
|
2018-04-30 21:18:27 +02:00
|
|
|
strings.Contains(err.Error(), "failed: Not enough funds") ||
|
2018-05-17 01:42:06 +02:00
|
|
|
strings.Contains(err.Error(), "Error in daemon: Insufficient funds, please deposit additional LBC") ||
|
2018-06-18 01:50:59 +02:00
|
|
|
strings.Contains(err.Error(), "too-long-mempool-chain") {
|
2018-04-20 21:06:55 +02:00
|
|
|
log.Println("waiting for a block and refilling addresses before retrying")
|
2018-04-26 12:12:54 +02:00
|
|
|
err = s.walletSetup()
|
2018-04-20 21:06:55 +02:00
|
|
|
if err != nil {
|
2018-07-12 14:28:20 +02:00
|
|
|
s.grp.Stop()
|
2018-06-15 23:03:28 +02:00
|
|
|
util.SendErrorToSlack("Failed to setup the wallet for a refill: %v", err)
|
2018-05-05 16:15:02 +02:00
|
|
|
break
|
2018-04-20 21:06:55 +02:00
|
|
|
}
|
|
|
|
}
|
2017-12-28 18:14:33 +01:00
|
|
|
log.Println("Retrying")
|
|
|
|
continue
|
|
|
|
}
|
2018-06-15 23:03:28 +02:00
|
|
|
util.SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
|
2017-12-28 18:14:33 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
break
|
2017-11-06 22:42:52 +01:00
|
|
|
}
|
2017-10-11 04:02:16 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-02-13 18:47:05 +01:00
|
|
|
func (s *Sync) enqueueYoutubeVideos() error {
|
2017-10-11 04:02:16 +02:00
|
|
|
client := &http.Client{
|
|
|
|
Transport: &transport.APIKey{Key: s.YoutubeAPIKey},
|
|
|
|
}
|
|
|
|
|
|
|
|
service, err := youtube.New(client)
|
|
|
|
if err != nil {
|
2018-03-09 17:47:38 +01:00
|
|
|
return errors.Prefix("error creating YouTube service", err)
|
2017-10-11 04:02:16 +02:00
|
|
|
}
|
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
response, err := service.Channels.List("contentDetails").Id(s.YoutubeChannelID).Do()
|
2017-10-11 04:02:16 +02:00
|
|
|
if err != nil {
|
2018-03-09 17:47:38 +01:00
|
|
|
return errors.Prefix("error getting channels", err)
|
2017-10-11 04:02:16 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
if len(response.Items) < 1 {
|
2018-03-09 17:47:38 +01:00
|
|
|
return errors.Err("youtube channel not found")
|
2017-10-11 04:02:16 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
if response.Items[0].ContentDetails.RelatedPlaylists == nil {
|
2018-03-09 17:47:38 +01:00
|
|
|
return errors.Err("no related playlists")
|
2017-10-11 04:02:16 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
playlistID := response.Items[0].ContentDetails.RelatedPlaylists.Uploads
|
|
|
|
if playlistID == "" {
|
2018-03-09 17:47:38 +01:00
|
|
|
return errors.Err("no channel playlist")
|
2017-10-11 04:02:16 +02:00
|
|
|
}
|
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
var videos []video
|
2017-10-11 04:02:16 +02:00
|
|
|
|
|
|
|
nextPageToken := ""
|
|
|
|
for {
|
|
|
|
req := service.PlaylistItems.List("snippet").
|
|
|
|
PlaylistId(playlistID).
|
|
|
|
MaxResults(50).
|
|
|
|
PageToken(nextPageToken)
|
|
|
|
|
|
|
|
playlistResponse, err := req.Do()
|
|
|
|
if err != nil {
|
2018-03-09 17:47:38 +01:00
|
|
|
return errors.Prefix("error getting playlist items", err)
|
2017-10-11 04:02:16 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
if len(playlistResponse.Items) < 1 {
|
2018-03-09 17:47:38 +01:00
|
|
|
return errors.Err("playlist items not found")
|
2017-10-11 04:02:16 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
for _, item := range playlistResponse.Items {
|
|
|
|
// normally we'd send the video into the channel here, but youtube api doesn't have sorting
|
|
|
|
// so we have to get ALL the videos, then sort them, then send them in
|
2017-12-28 18:14:33 +01:00
|
|
|
videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item.Snippet))
|
2017-10-11 04:02:16 +02:00
|
|
|
}
|
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
log.Infof("Got info for %d videos from youtube API", len(videos))
|
2017-10-11 04:02:16 +02:00
|
|
|
|
|
|
|
nextPageToken = playlistResponse.NextPageToken
|
|
|
|
if nextPageToken == "" {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
sort.Sort(byPublishedAt(videos))
|
|
|
|
//or sort.Sort(sort.Reverse(byPlaylistPosition(videos)))
|
|
|
|
|
2017-10-11 19:13:47 +02:00
|
|
|
Enqueue:
|
2017-10-11 04:02:16 +02:00
|
|
|
for _, v := range videos {
|
|
|
|
select {
|
2018-06-25 22:13:28 +02:00
|
|
|
case <-s.grp.Ch():
|
2017-12-28 18:14:33 +01:00
|
|
|
break Enqueue
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case s.queue <- v:
|
2018-06-25 22:13:28 +02:00
|
|
|
case <-s.grp.Ch():
|
2017-10-11 19:13:47 +02:00
|
|
|
break Enqueue
|
2017-10-11 04:02:16 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-02-13 18:47:05 +01:00
|
|
|
func (s *Sync) enqueueUCBVideos() error {
|
|
|
|
var videos []video
|
|
|
|
|
|
|
|
csvFile, err := os.Open("ucb.csv")
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
reader := csv.NewReader(bufio.NewReader(csvFile))
|
|
|
|
for {
|
|
|
|
line, err := reader.Read()
|
|
|
|
if err == io.EOF {
|
|
|
|
break
|
|
|
|
} else if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
data := struct {
|
|
|
|
PublishedAt string `json:"publishedAt"`
|
|
|
|
}{}
|
|
|
|
err = json.Unmarshal([]byte(line[4]), &data)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
videos = append(videos, sources.NewUCBVideo(line[0], line[2], line[1], line[3], data.PublishedAt, s.videoDirectory))
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Printf("Publishing %d videos\n", len(videos))
|
|
|
|
|
|
|
|
sort.Sort(byPublishedAt(videos))
|
|
|
|
|
|
|
|
Enqueue:
|
|
|
|
for _, v := range videos {
|
|
|
|
select {
|
2018-06-25 22:13:28 +02:00
|
|
|
case <-s.grp.Ch():
|
2018-02-13 18:47:05 +01:00
|
|
|
break Enqueue
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case s.queue <- v:
|
2018-06-25 22:13:28 +02:00
|
|
|
case <-s.grp.Ch():
|
2018-02-13 18:47:05 +01:00
|
|
|
break Enqueue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-03-09 17:47:38 +01:00
|
|
|
func (s *Sync) processVideo(v video) (err error) {
|
|
|
|
defer func() {
|
|
|
|
if p := recover(); p != nil {
|
|
|
|
var ok bool
|
|
|
|
err, ok = p.(error)
|
|
|
|
if !ok {
|
|
|
|
err = errors.Err("%v", p)
|
|
|
|
}
|
|
|
|
err = errors.Wrap(p, 2)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
log.Println("Processing " + v.IDAndNum())
|
2017-11-02 16:20:22 +01:00
|
|
|
defer func(start time.Time) {
|
2017-12-28 18:14:33 +01:00
|
|
|
log.Println(v.ID() + " took " + time.Since(start).String())
|
2017-11-02 16:20:22 +01:00
|
|
|
}(time.Now())
|
2017-10-11 04:02:16 +02:00
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
alreadyPublished, err := s.db.IsPublished(v.ID())
|
2017-10-11 04:02:16 +02:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
if alreadyPublished {
|
|
|
|
log.Println(v.ID() + " already published")
|
|
|
|
return nil
|
2017-10-11 04:02:16 +02:00
|
|
|
}
|
|
|
|
|
2018-05-05 13:22:33 +02:00
|
|
|
if v.PlaylistPosition() > 1000 {
|
2018-04-25 23:06:17 +02:00
|
|
|
log.Println(v.ID() + " is old: skipping")
|
|
|
|
return nil
|
|
|
|
}
|
2017-12-28 18:14:33 +01:00
|
|
|
err = v.Sync(s.daemon, s.claimAddress, publishAmount, s.LbryChannelName)
|
2017-10-11 04:02:16 +02:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
err = s.db.SetPublished(v.ID())
|
2017-10-11 04:02:16 +02:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2017-11-03 13:46:27 +01:00
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
func startDaemonViaSystemd() error {
|
2017-11-03 13:46:27 +01:00
|
|
|
err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "start", "lbrynet.service").Run()
|
|
|
|
if err != nil {
|
2018-03-09 17:47:38 +01:00
|
|
|
return errors.Err(err)
|
2017-11-03 13:46:27 +01:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-12-28 18:14:33 +01:00
|
|
|
func stopDaemonViaSystemd() error {
|
2017-11-03 13:46:27 +01:00
|
|
|
err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "stop", "lbrynet.service").Run()
|
|
|
|
if err != nil {
|
2018-03-09 17:47:38 +01:00
|
|
|
return errors.Err(err)
|
2017-11-03 13:46:27 +01:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
2017-12-30 01:21:16 +01:00
|
|
|
|
2018-04-20 21:06:55 +02:00
|
|
|
// waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up
|
|
|
|
func waitForDaemonProcess(timeout time.Duration) error {
|
|
|
|
processes, err := ps.Processes()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
var daemonProcessId = -1
|
|
|
|
for _, p := range processes {
|
|
|
|
if p.Executable() == "lbrynet-daemon" {
|
|
|
|
daemonProcessId = p.Pid()
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if daemonProcessId == -1 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
then := time.Now()
|
|
|
|
stopTime := then.Add(time.Duration(timeout * time.Second))
|
|
|
|
for !time.Now().After(stopTime) {
|
|
|
|
wait := 10 * time.Second
|
|
|
|
log.Println("the daemon is still running, waiting for it to exit")
|
|
|
|
time.Sleep(wait)
|
|
|
|
proc, err := os.FindProcess(daemonProcessId)
|
|
|
|
if err != nil {
|
|
|
|
// couldn't find the process, that means the daemon is stopped and can continue
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
//double check if process is running and alive
|
|
|
|
//by sending a signal 0
|
|
|
|
//NOTE : syscall.Signal is not available in Windows
|
|
|
|
err = proc.Signal(syscall.Signal(0))
|
|
|
|
//the process doesn't exist anymore! we're free to go
|
2018-04-24 20:50:01 +02:00
|
|
|
if err != nil && (err == syscall.ESRCH || err.Error() == "os: process already finished") {
|
2018-04-20 21:06:55 +02:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return errors.Err("timeout reached")
|
|
|
|
}
|