ytsync/manager/ytsync.go

885 lines
23 KiB
Go
Raw Normal View History

package manager
2017-10-11 04:02:16 +02:00
import (
2018-02-13 18:47:05 +01:00
"bufio"
"encoding/csv"
"encoding/json"
"fmt"
2018-02-13 18:47:05 +01:00
"io"
2017-10-11 04:02:16 +02:00
"io/ioutil"
"net/http"
"os"
"os/exec"
"os/signal"
2018-10-09 21:57:07 +02:00
"runtime/debug"
2017-10-11 04:02:16 +02:00
"sort"
"strings"
"sync"
"syscall"
2017-10-11 04:02:16 +02:00
"time"
"github.com/lbryio/ytsync/namer"
"github.com/lbryio/ytsync/sdk"
"github.com/lbryio/ytsync/sources"
2019-01-11 02:34:34 +01:00
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/jsonrpc"
"github.com/lbryio/lbry.go/extras/stop"
"github.com/lbryio/lbry.go/extras/util"
2018-09-26 06:08:18 +02:00
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
2018-06-13 18:44:09 +02:00
"github.com/mitchellh/go-ps"
2017-10-11 04:02:16 +02:00
log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport"
"google.golang.org/api/youtube/v3"
2017-10-11 04:02:16 +02:00
)
const (
2018-08-01 14:56:04 +02:00
channelClaimAmount = 0.01
publishAmount = 0.01
maxReasonLength = 500
2017-10-11 04:02:16 +02:00
)
type video interface {
2018-08-14 17:09:23 +02:00
Size() *int64
ID() string
IDAndNum() string
PlaylistPosition() int
PublishedAt() time.Time
Sync(*jsonrpc.Client, string, float64, string, int, *namer.Namer, float64) (*sources.SyncSummary, error)
}
// sorting videos
type byPublishedAt []video
func (a byPublishedAt) Len() int { return len(a) }
func (a byPublishedAt) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[j].PublishedAt()) }
2017-10-11 04:02:16 +02:00
// Sync stores the options that control how syncing happens
type Sync struct {
2018-09-26 06:08:18 +02:00
APIConfig *sdk.APIConfig
2017-11-06 22:42:52 +01:00
YoutubeChannelID string
LbryChannelName string
StopOnError bool
MaxTries int
ConcurrentVideos int
TakeOverExistingChannel bool
Refill int
Manager *SyncManager
LbrycrdString string
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
2017-10-11 04:02:16 +02:00
daemon *jsonrpc.Client
claimAddress string
videoDirectory string
syncedVideosMux *sync.RWMutex
2018-09-26 06:08:18 +02:00
syncedVideos map[string]sdk.SyncedVideo
grp *stop.Group
lbryChannelID string
2018-09-18 22:57:25 +02:00
namer *namer.Namer
walletMux *sync.Mutex
queue chan video
}
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
s.syncedVideosMux.Lock()
defer s.syncedVideosMux.Unlock()
2018-09-26 06:08:18 +02:00
s.syncedVideos[videoID] = sdk.SyncedVideo{
VideoID: videoID,
Published: published,
FailureReason: failureReason,
}
}
2018-07-24 02:01:35 +02:00
// SendErrorToSlack Sends an error message to the default channel and to the process log.
func SendErrorToSlack(format string, a ...interface{}) {
2018-07-24 02:01:35 +02:00
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
log.Errorln(message)
err := util.SendToSlack(":sos: " + message)
if err != nil {
log.Errorln(err)
}
2018-07-24 02:01:35 +02:00
}
// SendInfoToSlack Sends an info message to the default channel and to the process log.
func SendInfoToSlack(format string, a ...interface{}) {
2018-07-24 02:01:35 +02:00
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
log.Infoln(message)
err := util.SendToSlack(":information_source: " + message)
if err != nil {
log.Errorln(err)
}
2018-07-24 02:01:35 +02:00
}
// IsInterrupted can be queried to discover if the sync process was interrupted manually
func (s *Sync) IsInterrupted() bool {
select {
2018-07-12 14:28:20 +02:00
case <-s.grp.Ch():
return true
default:
return false
}
}
func (s *Sync) downloadWallet() error {
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
defaultTempWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet"
key := aws.String("/wallets/" + s.YoutubeChannelID)
if os.Getenv("REGTEST") == "true" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
defaultTempWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet"
key = aws.String("/regtest/" + s.YoutubeChannelID)
}
if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
return errors.Err("default_wallet already exists")
}
creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "")
s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds})
if err != nil {
return err
}
downloader := s3manager.NewDownloader(s3Session)
out, err := os.Create(defaultTempWalletDir)
if err != nil {
return err
}
defer out.Close()
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
Bucket: aws.String(s.AwsS3Bucket),
Key: key,
})
if err != nil {
// Casting to the awserr.Error type will allow you to inspect the error
// code returned by the service in code. The error code can be used
// to switch on context specific functionality. In this case a context
// specific error message is printed to the user based on the bucket
// and key existing.
//
// For information on other S3 API error codes see:
// http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
if aerr, ok := err.(awserr.Error); ok {
code := aerr.Code()
if code == s3.ErrCodeNoSuchKey {
return errors.Err("wallet not on S3")
}
}
return err
} else if bytesWritten == 0 {
return errors.Err("zero bytes written")
}
return os.Rename(defaultTempWalletDir, defaultWalletDir)
}
func (s *Sync) uploadWallet() error {
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
key := aws.String("/wallets/" + s.YoutubeChannelID)
if os.Getenv("REGTEST") == "true" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
key = aws.String("/regtest/" + s.YoutubeChannelID)
}
if _, err := os.Stat(defaultWalletDir); os.IsNotExist(err) {
return errors.Err("default_wallet does not exist")
}
creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "")
s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds})
if err != nil {
return err
}
uploader := s3manager.NewUploader(s3Session)
file, err := os.Open(defaultWalletDir)
if err != nil {
return err
}
defer file.Close()
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.AwsS3Bucket),
Key: key,
Body: file,
})
if err != nil {
return err
}
return os.Remove(defaultWalletDir)
}
func (s *Sync) setStatusSyncing() error {
2018-09-26 06:08:18 +02:00
syncedVideos, claimNames, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
if err != nil {
return err
}
s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos
2018-10-09 21:57:07 +02:00
s.namer.SetNames(claimNames)
s.syncedVideosMux.Unlock()
return nil
}
func (s *Sync) setExceptions() {
if s.YoutubeChannelID == "UCwjQfNRW6sGYb__pd7d4nUg" { //@FreeTalkLive
s.Manager.maxVideoLength = 0.0 // skips max length checks
s.Manager.maxVideoSize = 0
}
}
func (s *Sync) FullCycle() (e error) {
2017-11-03 16:46:19 +01:00
if os.Getenv("HOME") == "" {
return errors.Err("no $HOME env var found")
2017-11-03 16:46:19 +01:00
}
if s.YoutubeChannelID == "" {
return errors.Err("channel ID not provided")
}
s.setExceptions()
s.syncedVideosMux = &sync.RWMutex{}
s.walletMux = &sync.Mutex{}
s.grp = stop.New()
s.queue = make(chan video)
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(interruptChan)
go func() {
<-interruptChan
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop()
}()
err := s.setStatusSyncing()
if err != nil {
return err
}
defer s.setChannelTerminationStatus(&e)
2017-11-03 16:46:19 +01:00
err = s.downloadWallet()
if err != nil && err.Error() != "wallet not on S3" {
return errors.Prefix("failure in downloading wallet", err)
} else if err == nil {
log.Println("Continuing previous upload")
} else {
log.Println("Starting new wallet")
}
defer s.stopAndUploadWallet(&e)
s.videoDirectory, err = ioutil.TempDir("", "ytsync")
if err != nil {
return errors.Wrap(err, 0)
}
log.Printf("Starting daemon")
err = startDaemonViaSystemd()
if err != nil {
return err
}
log.Infoln("Waiting for daemon to finish starting...")
s.daemon = jsonrpc.NewClient("")
s.daemon.SetRPCTimeout(40 * time.Minute)
err = s.waitForDaemonStart()
if err != nil {
return err
}
err = s.doSync()
if err != nil {
return err
} else {
// wait for reflection to finish???
wait := 15 * time.Second // should bump this up to a few min, but keeping it low for testing
log.Println("Waiting " + wait.String() + " to finish reflecting everything")
time.Sleep(wait)
}
return nil
}
func (s *Sync) setChannelTerminationStatus(e *error) {
if *e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{
"this youtube channel is being managed by another server",
}
if util.SubstringInSlice((*e).Error(), noFailConditions) {
return
}
failureReason := (*e).Error()
2018-09-26 06:08:18 +02:00
_, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s", s.LbryChannelName)
*e = errors.Prefix(msg+err.Error(), *e)
}
} else if !s.IsInterrupted() {
2018-09-26 06:08:18 +02:00
_, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSynced, "")
if err != nil {
*e = err
}
}
}
func (s *Sync) waitForDaemonStart() error {
for {
select {
case <-s.grp.Ch():
return errors.Err("interrupted during daemon startup")
default:
2018-08-20 21:26:52 +02:00
s, err := s.daemon.Status()
2018-09-21 16:26:27 +02:00
if err == nil && s.StartupStatus.Wallet && s.StartupStatus.FileManager {
return nil
}
time.Sleep(5 * time.Second)
}
}
}
2018-09-21 16:26:27 +02:00
func (s *Sync) stopAndUploadWallet(e *error) {
log.Printf("Stopping daemon")
shutdownErr := stopDaemonViaSystemd()
if shutdownErr != nil {
logShutdownError(shutdownErr)
} else {
// the cli will return long before the daemon effectively stops. we must observe the processes running
// before moving the wallet
waitTimeout := 8 * time.Minute
processDeathError := waitForDaemonProcess(waitTimeout)
if processDeathError != nil {
logShutdownError(processDeathError)
} else {
err := s.uploadWallet()
if err != nil {
if *e == nil {
e = &err //not 100% sure
return
} else {
*e = errors.Prefix("failure uploading wallet", *e)
}
}
}
}
}
func logShutdownError(shutdownErr error) {
2018-07-24 02:01:35 +02:00
SendErrorToSlack("error shutting down daemon: %v", shutdownErr)
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
}
func isYtsyncClaim(c jsonrpc.Claim) bool {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
return false
}
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
//most likely a claim created outside of ytsync, ignore!
return false
}
return strings.Contains(*c.Value.Stream.Metadata.Thumbnail, "https://berk.ninja/thumbnails/")
}
// fixDupes abandons duplicate claims
func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
abandonedClaims := false
videoIDs := make(map[string]jsonrpc.Claim)
for _, c := range claims {
if !isYtsyncClaim(c) {
continue
}
tn := *c.Value.Stream.Metadata.Thumbnail
2018-09-03 23:38:16 +02:00
videoID := tn[strings.LastIndex(tn, "/")+1:]
log.Infof("claimid: %s, claimName: %s, videoID: %s", c.ClaimID, c.Name, videoID)
cl, ok := videoIDs[videoID]
if !ok || cl.ClaimID == c.ClaimID {
videoIDs[videoID] = c
continue
}
// only keep the most recent one
claimToAbandon := c
videoIDs[videoID] = cl
if c.Height > cl.Height {
claimToAbandon = cl
videoIDs[videoID] = c
}
log.Debugf("abandoning %+v", claimToAbandon)
_, err := s.daemon.ClaimAbandon(claimToAbandon.Txid, claimToAbandon.Nout)
if err != nil {
return true, err
}
abandonedClaims = true
//return true, nil
}
return abandonedClaims, nil
}
//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published
func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err error) {
count := 0
for _, c := range claims {
if !isYtsyncClaim(c) {
continue
}
count++
//check if claimID is in remote db
tn := *c.Value.Stream.Metadata.Thumbnail
videoID := tn[strings.LastIndex(tn, "/")+1:]
pv, ok := s.syncedVideos[videoID]
if !ok || pv.ClaimName != c.Name {
fixed++
log.Debugf("adding %s to the database", c.Name)
2018-09-26 06:08:18 +02:00
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
if err != nil {
return count, fixed, err
}
}
}
return count, fixed, nil
}
func (s *Sync) doSync() error {
2017-10-11 19:13:47 +02:00
var err error
err = s.walletSetup()
if err != nil {
return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err)
}
claims, err := s.daemon.ClaimListMine()
if err != nil {
return errors.Prefix("cannot list claims", err)
}
hasDupes, err := s.fixDupes(*claims)
if err != nil {
return errors.Prefix("error checking for duplicates", err)
}
if hasDupes {
SendInfoToSlack("Channel had dupes and was fixed!")
2018-09-21 16:26:27 +02:00
err = s.waitForNewBlock()
if err != nil {
return err
}
claims, err = s.daemon.ClaimListMine()
if err != nil {
return errors.Prefix("cannot list claims", err)
}
}
pubsOnWallet, nFixed, err := s.updateRemoteDB(*claims)
if err != nil {
return errors.Prefix("error counting claims", err)
}
if nFixed > 0 {
err := s.setStatusSyncing()
if err != nil {
return err
}
SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed)
}
pubsOnDB := 0
for _, sv := range s.syncedVideos {
if sv.Published {
pubsOnDB++
}
}
if pubsOnWallet > pubsOnDB { //This case should never happen
SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
return errors.Err("not all published videos are in the database")
}
if pubsOnWallet < pubsOnDB {
SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
}
2017-10-11 19:13:47 +02:00
2017-10-11 04:02:16 +02:00
if s.StopOnError {
log.Println("Will stop publishing if an error is detected")
}
for i := 0; i < s.ConcurrentVideos; i++ {
s.grp.Add(1)
go func(i int) {
defer s.grp.Done()
s.startWorker(i)
}(i)
2017-10-11 04:02:16 +02:00
}
2018-02-13 18:47:05 +01:00
if s.LbryChannelName == "@UCBerkeley" {
err = s.enqueueUCBVideos()
} else {
err = s.enqueueYoutubeVideos()
}
close(s.queue)
s.grp.Wait()
2017-10-11 04:02:16 +02:00
return err
}
func (s *Sync) startWorker(workerNum int) {
var v video
var more bool
for {
select {
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return
default:
}
select {
case v, more = <-s.queue:
if !more {
return
}
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return
2017-10-11 04:02:16 +02:00
}
2018-02-13 18:47:05 +01:00
log.Println("================================================================================")
2017-10-11 04:02:16 +02:00
tryCount := 0
for {
tryCount++
err := s.processVideo(v)
2017-10-11 04:02:16 +02:00
if err != nil {
2018-12-28 16:28:01 +01:00
logMsg := "error processing video: " + err.Error()
log.Errorln(logMsg)
fatalErrors := []string{
":5279: read: connection reset by peer",
"no space left on device",
"NotEnoughFunds",
"Cannot publish using channel",
"cannot concatenate 'str' and 'NoneType' objects",
"more than 90% of the space has been used.",
}
2018-07-24 02:01:35 +02:00
if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError {
s.grp.Stop()
} else if s.MaxTries > 1 {
errorsNoRetry := []string{
"non 200 status code received",
" reason: 'This video contains content from",
"dont know which claim to update",
"uploader has not made this video available in your country",
"download error: AccessDenied: Access Denied",
"Playback on other websites has been disabled by the video owner",
"Error in daemon: Cannot publish empty file",
"Error extracting sts from embedded url response",
2018-08-29 13:33:02 +02:00
"Unable to extract signature tokens",
"Client.Timeout exceeded while awaiting headers)",
2018-08-01 14:56:04 +02:00
"the video is too big to sync, skipping for now",
"video is too long to process",
}
2018-07-24 02:01:35 +02:00
if util.SubstringInSlice(err.Error(), errorsNoRetry) {
log.Println("This error should not be retried at all")
} else if tryCount < s.MaxTries {
if strings.Contains(err.Error(), "txn-mempool-conflict") ||
strings.Contains(err.Error(), "too-long-mempool-chain") {
2018-08-10 02:04:39 +02:00
log.Println("waiting for a block before retrying")
err = s.waitForNewBlock()
if err != nil {
s.grp.Stop()
SendErrorToSlack("something went wrong while waiting for a block: %v", err)
break
}
} else if strings.Contains(err.Error(), "failed: Not enough funds") ||
strings.Contains(err.Error(), "Error in daemon: Insufficient funds, please deposit additional LBC") {
log.Println("refilling addresses before retrying")
err = s.walletSetup()
if err != nil {
2018-07-12 14:28:20 +02:00
s.grp.Stop()
2018-08-10 02:04:39 +02:00
SendErrorToSlack("failed to setup the wallet for a refill: %v", err)
break
}
}
log.Println("Retrying")
continue
}
2018-07-24 02:01:35 +02:00
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
}
s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
2018-09-26 06:08:18 +02:00
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
if err != nil {
2018-07-24 02:01:35 +02:00
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
}
break
2017-11-06 22:42:52 +01:00
}
2017-10-11 04:02:16 +02:00
}
}
2018-02-13 18:47:05 +01:00
func (s *Sync) enqueueYoutubeVideos() error {
2017-10-11 04:02:16 +02:00
client := &http.Client{
2018-09-26 06:08:18 +02:00
Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
2017-10-11 04:02:16 +02:00
}
service, err := youtube.New(client)
if err != nil {
return errors.Prefix("error creating YouTube service", err)
2017-10-11 04:02:16 +02:00
}
response, err := service.Channels.List("contentDetails").Id(s.YoutubeChannelID).Do()
2017-10-11 04:02:16 +02:00
if err != nil {
return errors.Prefix("error getting channels", err)
2017-10-11 04:02:16 +02:00
}
if len(response.Items) < 1 {
return errors.Err("youtube channel not found")
2017-10-11 04:02:16 +02:00
}
if response.Items[0].ContentDetails.RelatedPlaylists == nil {
return errors.Err("no related playlists")
2017-10-11 04:02:16 +02:00
}
playlistID := response.Items[0].ContentDetails.RelatedPlaylists.Uploads
if playlistID == "" {
return errors.Err("no channel playlist")
2017-10-11 04:02:16 +02:00
}
var videos []video
2017-10-11 04:02:16 +02:00
nextPageToken := ""
for {
req := service.PlaylistItems.List("snippet").
PlaylistId(playlistID).
MaxResults(50).
PageToken(nextPageToken)
playlistResponse, err := req.Do()
if err != nil {
return errors.Prefix("error getting playlist items", err)
2017-10-11 04:02:16 +02:00
}
if len(playlistResponse.Items) < 1 {
2019-01-11 20:15:17 +01:00
// If there are 50+ videos in a playlist but less than 50 are actually returned by the API, youtube will still redirect
// clients to a next page. Such next page will however be empty. This logic prevents ytsync from failing.
youtubeIsLying := len(videos) > 0
if youtubeIsLying {
break
}
return errors.Err("playlist items not found")
2017-10-11 04:02:16 +02:00
}
for _, item := range playlistResponse.Items {
// normally we'd send the video into the channel here, but youtube api doesn't have sorting
// so we have to get ALL the videos, then sort them, then send them in
videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item.Snippet))
2017-10-11 04:02:16 +02:00
}
log.Infof("Got info for %d videos from youtube API", len(videos))
2017-10-11 04:02:16 +02:00
nextPageToken = playlistResponse.NextPageToken
if nextPageToken == "" {
break
}
}
sort.Sort(byPublishedAt(videos))
//or sort.Sort(sort.Reverse(byPlaylistPosition(videos)))
2017-10-11 19:13:47 +02:00
Enqueue:
2017-10-11 04:02:16 +02:00
for _, v := range videos {
select {
case <-s.grp.Ch():
break Enqueue
default:
}
select {
case s.queue <- v:
case <-s.grp.Ch():
2017-10-11 19:13:47 +02:00
break Enqueue
2017-10-11 04:02:16 +02:00
}
}
return nil
}
2018-02-13 18:47:05 +01:00
func (s *Sync) enqueueUCBVideos() error {
var videos []video
csvFile, err := os.Open("ucb.csv")
if err != nil {
return err
}
reader := csv.NewReader(bufio.NewReader(csvFile))
for {
line, err := reader.Read()
if err == io.EOF {
break
} else if err != nil {
return err
}
data := struct {
PublishedAt string `json:"publishedAt"`
}{}
err = json.Unmarshal([]byte(line[4]), &data)
if err != nil {
return err
}
videos = append(videos, sources.NewUCBVideo(line[0], line[2], line[1], line[3], data.PublishedAt, s.videoDirectory))
}
log.Printf("Publishing %d videos\n", len(videos))
sort.Sort(byPublishedAt(videos))
Enqueue:
for _, v := range videos {
select {
case <-s.grp.Ch():
2018-02-13 18:47:05 +01:00
break Enqueue
default:
}
select {
case s.queue <- v:
case <-s.grp.Ch():
2018-02-13 18:47:05 +01:00
break Enqueue
}
}
return nil
}
func (s *Sync) processVideo(v video) (err error) {
defer func() {
if p := recover(); p != nil {
2018-10-09 21:57:07 +02:00
log.Printf("stack: %s", debug.Stack())
var ok bool
err, ok = p.(error)
if !ok {
err = errors.Err("%v", p)
}
err = errors.Wrap(p, 2)
}
}()
log.Println("Processing " + v.IDAndNum())
defer func(start time.Time) {
log.Println(v.ID() + " took " + time.Since(start).String())
}(time.Now())
2017-10-11 04:02:16 +02:00
s.syncedVideosMux.RLock()
sv, ok := s.syncedVideos[v.ID()]
s.syncedVideosMux.RUnlock()
alreadyPublished := ok && sv.Published
2018-08-02 14:05:06 +02:00
neverRetryFailures := []string{
"Error extracting sts from embedded url response",
2018-08-29 13:33:02 +02:00
"Unable to extract signature tokens",
2018-08-02 14:05:06 +02:00
"the video is too big to sync, skipping for now",
"video is too long to process",
2018-08-02 14:05:06 +02:00
}
if ok && !sv.Published && util.SubstringInSlice(sv.FailureReason, neverRetryFailures) {
log.Println(v.ID() + " can't ever be published")
return nil
2017-10-11 04:02:16 +02:00
}
if alreadyPublished {
log.Println(v.ID() + " already published")
return nil
2017-10-11 04:02:16 +02:00
}
2018-09-26 06:08:18 +02:00
if v.PlaylistPosition() > s.Manager.videosLimit {
2018-04-25 23:06:17 +02:00
log.Println(v.ID() + " is old: skipping")
return nil
}
err = s.Manager.checkUsedSpace()
if err != nil {
return err
}
2018-09-18 21:20:34 +02:00
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.maxVideoSize, s.namer, s.Manager.maxVideoLength)
2017-10-11 04:02:16 +02:00
if err != nil {
return err
}
2018-09-18 21:20:34 +02:00
2018-09-26 06:08:18 +02:00
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
2017-10-11 04:02:16 +02:00
if err != nil {
2018-08-14 17:09:23 +02:00
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
2017-10-11 04:02:16 +02:00
}
2018-08-14 17:09:23 +02:00
s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName)
2017-10-11 04:02:16 +02:00
return nil
}
func startDaemonViaSystemd() error {
err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "start", "lbrynet.service").Run()
if err != nil {
return errors.Err(err)
}
return nil
}
func stopDaemonViaSystemd() error {
err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "stop", "lbrynet.service").Run()
if err != nil {
return errors.Err(err)
}
return nil
}
// waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up
func waitForDaemonProcess(timeout time.Duration) error {
processes, err := ps.Processes()
if err != nil {
return err
}
var daemonProcessId = -1
for _, p := range processes {
if p.Executable() == "lbrynet-daemon" {
daemonProcessId = p.Pid()
break
}
}
if daemonProcessId == -1 {
return nil
}
then := time.Now()
stopTime := then.Add(time.Duration(timeout * time.Second))
for !time.Now().After(stopTime) {
wait := 10 * time.Second
log.Println("the daemon is still running, waiting for it to exit")
time.Sleep(wait)
proc, err := os.FindProcess(daemonProcessId)
if err != nil {
// couldn't find the process, that means the daemon is stopped and can continue
return nil
}
//double check if process is running and alive
//by sending a signal 0
//NOTE : syscall.Signal is not available in Windows
err = proc.Signal(syscall.Signal(0))
//the process doesn't exist anymore! we're free to go
2018-04-24 20:50:01 +02:00
if err != nil && (err == syscall.ESRCH || err.Error() == "os: process already finished") {
return nil
}
}
return errors.Err("timeout reached")
}