From aa401b2ff39a5cba0a69fa0d3009e6540f0ed568 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Mon, 8 Oct 2018 16:26:45 -0400 Subject: [PATCH] split out ytsync to https://github.com/lbryio/ytsync --- Gopkg.lock | 175 +++---- Gopkg.toml | 13 +- cmd/count.go | 38 -- cmd/ytsync.go | 184 ------- ytsync/README.md | 13 - ytsync/count.go | 32 -- ytsync/manager.go | 227 --------- ytsync/namer/names.go | 83 ---- ytsync/sdk/api.go | 170 ------- ytsync/setup.go | 293 ------------ ytsync/sources/shared.go | 27 -- ytsync/sources/ucbVideo.go | 217 --------- ytsync/sources/youtubeVideo.go | 294 ------------ ytsync/splitter.py | 152 ------ ytsync/ytsync.go | 848 --------------------------------- 15 files changed, 78 insertions(+), 2688 deletions(-) delete mode 100644 cmd/count.go delete mode 100644 cmd/ytsync.go delete mode 100644 ytsync/README.md delete mode 100644 ytsync/count.go delete mode 100644 ytsync/manager.go delete mode 100644 ytsync/namer/names.go delete mode 100644 ytsync/sdk/api.go delete mode 100644 ytsync/setup.go delete mode 100644 ytsync/sources/shared.go delete mode 100644 ytsync/sources/ucbVideo.go delete mode 100644 ytsync/sources/youtubeVideo.go delete mode 100644 ytsync/splitter.py delete mode 100644 ytsync/ytsync.go diff --git a/Gopkg.lock b/Gopkg.lock index 2a72ba3..5f50686 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1,58 +1,6 @@ # This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. -[[projects]] - digest = "1:9a88883f474d09f1da61894cd8115c7f33988d6941e4f6236324c777aaff8f2c" - name = "github.com/PuerkitoBio/goquery" - packages = ["."] - pruneopts = "" - revision = "dc2ec5c7ca4d9aae063b79b9f581dd3ea6afd2b2" - version = "v1.4.1" - -[[projects]] - digest = "1:e3726ad6f38f710e84c8dcd0e830014de6eaeea81f28d91ae898afecc078479a" - name = "github.com/andybalholm/cascadia" - packages = ["."] - pruneopts = "" - revision = "901648c87902174f774fac311d7f176f8647bdaa" - version = "v1.0.0" - -[[projects]] - digest = "1:261d95f4464744d542759a7a33846f56f24113f5a93c7577f4cd7044f7cb3d76" - name = "github.com/aws/aws-sdk-go" - packages = [ - "aws", - "aws/awserr", - "aws/awsutil", - "aws/client", - "aws/client/metadata", - "aws/corehandlers", - "aws/credentials", - "aws/credentials/ec2rolecreds", - "aws/credentials/endpointcreds", - "aws/credentials/stscreds", - "aws/defaults", - "aws/ec2metadata", - "aws/endpoints", - "aws/request", - "aws/session", - "aws/signer/v4", - "internal/shareddefaults", - "private/protocol", - "private/protocol/query", - "private/protocol/query/queryutil", - "private/protocol/rest", - "private/protocol/restxml", - "private/protocol/xml/xmlutil", - "service/s3", - "service/s3/s3iface", - "service/s3/s3manager", - "service/sts", - ] - pruneopts = "" - revision = "b69f447375c7fa0047ebcdd8ae5d585d5aac2f71" - version = "v1.10.51" - [[projects]] branch = "master" digest = "1:cc8ebf0c6745d09f728f1fa4fbd29baaa2e3a65efb49b5fefb0c163171ee7863" @@ -131,7 +79,13 @@ [[projects]] digest = "1:f958a1c137db276e52f0b50efee41a1a389dcdded59a69711f3e872757dab34b" name = "github.com/golang/protobuf" - packages = ["proto"] + packages = [ + "proto", + "ptypes", + "ptypes/any", + "ptypes/duration", + "ptypes/timestamp", + ] pruneopts = "" revision = "b4deda0973fb4c70b50d226b1af49f3da59f5265" version = "v1.1.0" @@ -152,13 +106,6 @@ revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75" version = "v1.0" -[[projects]] - digest = "1:6f49eae0c1e5dab1dafafee34b207aeb7a42303105960944828c2079b92fc88e" - name = "github.com/jmespath/go-jmespath" - packages = ["."] - pruneopts = "" - revision = "0b12d6b5" - [[projects]] branch = "master" digest = "1:d261f80387a38eeddc1d819ee9ee56d37ca10fc02e6e09ff400fb0ce146e13dc" @@ -175,14 +122,6 @@ revision = "d1008ad1fd04ceb5faedaf34881df0c504382706" version = "v3.1" -[[projects]] - branch = "master" - digest = "1:1dee6133ab829c8559a39031ad1e0e3538e4a7b34d3e0509d1fc247737e928c1" - name = "github.com/mitchellh/go-ps" - packages = ["."] - pruneopts = "" - revision = "4fdf99ab29366514c69ccccddab5dc58b8d84062" - [[projects]] branch = "master" digest = "1:eb9117392ee8e7aa44f78e0db603f70b1050ee0ebda4bd40040befb5b218c546" @@ -199,14 +138,6 @@ revision = "8ab4d0b364ef1e9af5d102531da20d5ec902b6c4" version = "v0.2.0" -[[projects]] - branch = "master" - digest = "1:8d6d81d0d9d8153e65d637bda77a7c4e6ba496c61efac3578d7d8c981ac31a7b" - name = "github.com/rylio/ytdl" - packages = ["."] - pruneopts = "" - revision = "06f6510946275931157f5fe73f55ec7d6fd65870" - [[projects]] branch = "master" digest = "1:67b7dcb3b7e67cb6f96fb38fe7358bc1210453189da210e40cf357a92d57c1c1" @@ -281,9 +212,12 @@ name = "golang.org/x/net" packages = [ "context", - "context/ctxhttp", - "html", - "html/atom", + "http/httpguts", + "http2", + "http2/hpack", + "idna", + "internal/timeseries", + "trace", ] pruneopts = "" revision = "db08ff08e8622530d9ed3a0e8ac279f6d4c02196" @@ -300,18 +234,70 @@ revision = "bff228c7b664c5fce602223a05fb708fd8654986" [[projects]] - branch = "master" - digest = "1:b064108d68f82d0201d9f812297c928e57488e82ccdb77ed06ac69f64519a890" - name = "google.golang.org/api" + digest = "1:5acd3512b047305d49e8763eef7ba423901e85d5dd2fd1e71778a0ea8de10bd4" + name = "golang.org/x/text" packages = [ - "gensupport", - "googleapi", - "googleapi/internal/uritemplates", - "googleapi/transport", - "youtube/v3", + "collate", + "collate/build", + "internal/colltab", + "internal/gen", + "internal/tag", + "internal/triegen", + "internal/ucd", + "language", + "secure/bidirule", + "transform", + "unicode/bidi", + "unicode/cldr", + "unicode/norm", + "unicode/rangetable", ] pruneopts = "" - revision = "ef86ce4234efee96020bde00391d6a9cfae66561" + revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0" + version = "v0.3.0" + +[[projects]] + branch = "master" + digest = "1:1b3b4ec811695907c4a3cb92e4f32834a4a42459bff7e02068b6b2b5344803cd" + name = "google.golang.org/genproto" + packages = ["googleapis/rpc/status"] + pruneopts = "" + revision = "af9cb2a35e7f169ec875002c1829c9b315cddc04" + +[[projects]] + digest = "1:15656947b87a6a240e61dcfae9e71a55a8d5677f240d12ab48f02cdbabf1e309" + name = "google.golang.org/grpc" + packages = [ + ".", + "balancer", + "balancer/base", + "balancer/roundrobin", + "codes", + "connectivity", + "credentials", + "encoding", + "encoding/proto", + "grpclog", + "internal", + "internal/backoff", + "internal/channelz", + "internal/envconfig", + "internal/grpcrand", + "internal/transport", + "keepalive", + "metadata", + "naming", + "peer", + "resolver", + "resolver/dns", + "resolver/passthrough", + "stats", + "status", + "tap", + ] + pruneopts = "" + revision = "8dea3dc473e90c8179e519d91302d0597c0ca1d1" + version = "v1.15.0" [[projects]] digest = "1:f771bf87a3253de520c2af6fb6e75314dce0fedc0b30b208134fe502932bb15d" @@ -325,12 +311,6 @@ analyzer-name = "dep" analyzer-version = 1 input-imports = [ - "github.com/aws/aws-sdk-go/aws", - "github.com/aws/aws-sdk-go/aws/awserr", - "github.com/aws/aws-sdk-go/aws/credentials", - "github.com/aws/aws-sdk-go/aws/session", - "github.com/aws/aws-sdk-go/service/s3", - "github.com/aws/aws-sdk-go/service/s3/s3manager", "github.com/btcsuite/btcd/chaincfg", "github.com/btcsuite/btcd/chaincfg/chainhash", "github.com/btcsuite/btcd/rpcclient", @@ -339,12 +319,11 @@ "github.com/davecgh/go-spew/spew", "github.com/go-errors/errors", "github.com/go-ini/ini", + "github.com/golang/protobuf/proto", "github.com/lbryio/lbryschema.go/pb", "github.com/lbryio/ozzo-validation", - "github.com/mitchellh/go-ps", "github.com/mitchellh/mapstructure", "github.com/nlopes/slack", - "github.com/rylio/ytdl", "github.com/shopspring/decimal", "github.com/sirupsen/logrus", "github.com/spf13/cast", @@ -353,8 +332,8 @@ "github.com/zeebo/bencode", "golang.org/x/crypto/ripemd160", "golang.org/x/crypto/sha3", - "google.golang.org/api/googleapi/transport", - "google.golang.org/api/youtube/v3", + "golang.org/x/net/context", + "google.golang.org/grpc", "gopkg.in/nullbio/null.v6/convert", ] solver-name = "gps-cdcl" diff --git a/Gopkg.toml b/Gopkg.toml index d1e4711..9b0e8c2 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -6,10 +6,6 @@ name = "github.com/go-errors/errors" version = "1.0.0" -[[constraint]] - branch = "master" - name = "github.com/rylio/ytdl" - [[constraint]] branch = "master" name = "github.com/lbryio/lbryschema.go" @@ -42,10 +38,6 @@ branch = "master" name = "github.com/zeebo/bencode" -[[constraint]] - branch = "master" - name = "google.golang.org/api" - [[constraint]] branch = "master" name = "github.com/btcsuite/btcd" @@ -56,7 +48,4 @@ [[constraint]] branch = "master" - name = "github.com/btcsuite/btcutil" -[[constraint]] - name = "github.com/aws/aws-sdk-go" - version = "^1.10.51" + name = "github.com/btcsuite/btcutil" \ No newline at end of file diff --git a/cmd/count.go b/cmd/count.go deleted file mode 100644 index 77f6952..0000000 --- a/cmd/count.go +++ /dev/null @@ -1,38 +0,0 @@ -package cmd - -import ( - sync "github.com/lbryio/lbry.go/ytsync" - "github.com/lbryio/lbry.go/ytsync/sdk" - - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" -) - -func init() { - var ytCountCmd = &cobra.Command{ - Use: "ytcount ", - Args: cobra.ExactArgs(2), - Short: "Count videos in a youtube channel", - Run: ytcount, - } - RootCmd.AddCommand(ytCountCmd) -} - -func ytcount(cmd *cobra.Command, args []string) { - ytAPIKey := args[0] - channelID := args[1] - - s := sync.Sync{ - APIConfig: &sdk.APIConfig{ - YoutubeAPIKey: ytAPIKey, - }, - YoutubeChannelID: channelID, - } - - count, err := s.CountVideos() - if err != nil { - panic(err) - } - - log.Printf("%d videos in channel %s\n", count, channelID) -} diff --git a/cmd/ytsync.go b/cmd/ytsync.go deleted file mode 100644 index affd9fe..0000000 --- a/cmd/ytsync.go +++ /dev/null @@ -1,184 +0,0 @@ -package cmd - -import ( - "os" - - "time" - - "os/user" - - "github.com/lbryio/lbry.go/util" - sync "github.com/lbryio/lbry.go/ytsync" - "github.com/lbryio/lbry.go/ytsync/sdk" - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" -) - -const defaultMaxTries = 3 - -var ( - stopOnError bool - maxTries int - takeOverExistingChannel bool - refill int - limit int - skipSpaceCheck bool - syncUpdate bool - singleRun bool - syncStatus string - channelID string - syncFrom int64 - syncUntil int64 - concurrentJobs int - videosLimit int - maxVideoSize int -) - -func init() { - var ytSyncCmd = &cobra.Command{ - Use: "ytsync", - Args: cobra.RangeArgs(0, 0), - Short: "Publish youtube channels into LBRY network automatically.", - Run: ytSync, - } - ytSyncCmd.Flags().BoolVar(&stopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit") - ytSyncCmd.Flags().IntVar(&maxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails") - ytSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel") - ytSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync") - ytSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup") - ytSyncCmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones") - ytSyncCmd.Flags().BoolVar(&singleRun, "run-once", false, "Whether the process should be stopped after one cycle or not") - ytSyncCmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update") - ytSyncCmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.") - ytSyncCmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)") - ytSyncCmd.Flags().Int64Var(&syncUntil, "before", time.Now().Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)") - ytSyncCmd.Flags().IntVar(&concurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently") - ytSyncCmd.Flags().IntVar(&videosLimit, "videos-limit", 1000, "how many videos to process per channel") - ytSyncCmd.Flags().IntVar(&maxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)") - - RootCmd.AddCommand(ytSyncCmd) -} - -func ytSync(cmd *cobra.Command, args []string) { - var hostname string - slackToken := os.Getenv("SLACK_TOKEN") - if slackToken == "" { - log.Error("A slack token was not present in env vars! Slack messages disabled!") - } else { - var err error - hostname, err = os.Hostname() - if err != nil { - log.Error("could not detect system hostname") - hostname = "ytsync-unknown" - } - util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname) - } - - if syncStatus != "" && !util.InSlice(syncStatus, sync.SyncStatuses) { - log.Errorf("status must be one of the following: %v\n", sync.SyncStatuses) - return - } - - if stopOnError && maxTries != defaultMaxTries { - log.Errorln("--stop-on-error and --max-tries are mutually exclusive") - return - } - if maxTries < 1 { - log.Errorln("setting --max-tries less than 1 doesn't make sense") - return - } - - if limit < 0 { - log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense") - return - } - - apiURL := os.Getenv("LBRY_API") - apiToken := os.Getenv("LBRY_API_TOKEN") - youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY") - blobsDir := os.Getenv("BLOBS_DIRECTORY") - lbrycrdString := os.Getenv("LBRYCRD_STRING") - awsS3ID := os.Getenv("AWS_S3_ID") - awsS3Secret := os.Getenv("AWS_S3_SECRET") - awsS3Region := os.Getenv("AWS_S3_REGION") - awsS3Bucket := os.Getenv("AWS_S3_BUCKET") - if apiURL == "" { - log.Errorln("An API URL was not defined. Please set the environment variable LBRY_API") - return - } - if apiToken == "" { - log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN") - return - } - if youtubeAPIKey == "" { - log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY") - return - } - if awsS3ID == "" { - log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID") - return - } - if awsS3Secret == "" { - log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET") - return - } - if awsS3Region == "" { - log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION") - return - } - if awsS3Bucket == "" { - log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET") - return - } - if lbrycrdString == "" { - log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else") - } - if blobsDir == "" { - usr, err := user.Current() - if err != nil { - log.Errorln(err.Error()) - return - } - blobsDir = usr.HomeDir + "/.lbrynet/blobfiles/" - } - - syncProperties := &sdk.SyncProperties{ - SyncFrom: syncFrom, - SyncUntil: syncUntil, - YoutubeChannelID: channelID, - } - apiConfig := &sdk.APIConfig{ - YoutubeAPIKey: youtubeAPIKey, - ApiURL: apiURL, - ApiToken: apiToken, - HostName: hostname, - } - sm := sync.NewSyncManager( - stopOnError, - maxTries, - takeOverExistingChannel, - refill, - limit, - skipSpaceCheck, - syncUpdate, - concurrentJobs, - concurrentJobs, - blobsDir, - videosLimit, - maxVideoSize, - lbrycrdString, - awsS3ID, - awsS3Secret, - awsS3Region, - awsS3Bucket, - syncStatus, - singleRun, - syncProperties, - apiConfig, - ) - err := sm.Start() - if err != nil { - sync.SendErrorToSlack(err.Error()) - } - sync.SendInfoToSlack("Syncing process terminated!") -} diff --git a/ytsync/README.md b/ytsync/README.md deleted file mode 100644 index be2b461..0000000 --- a/ytsync/README.md +++ /dev/null @@ -1,13 +0,0 @@ -# YT Sync Process - -- make sure you don't have a `.lbryum/wallets/default_wallet` - - delete existing wallet if there's nothing you need there, or better yet, move it somewhere else in case you need it later -- make sure daemon is stopped and can be controlled with `systemctl` -- run `lbry ytsync YOUTUBE_KEY LBRY_CHANNEL_NAME YOUTUBE_CHANNEL_ID` -- after sync is complete, daemon will be stopped and wallet will be moved to `~/wallets/` -- now mark content as synced in doc - -Running the sync command for a channel that was already started will resume the sync. This can also be used to update a channel with new -content that was put on Youtube since the last sync. - ---- diff --git a/ytsync/count.go b/ytsync/count.go deleted file mode 100644 index dbba141..0000000 --- a/ytsync/count.go +++ /dev/null @@ -1,32 +0,0 @@ -package ytsync - -import ( - "net/http" - - "github.com/lbryio/lbry.go/errors" - - "google.golang.org/api/googleapi/transport" - "google.golang.org/api/youtube/v3" -) - -func (s *Sync) CountVideos() (uint64, error) { - client := &http.Client{ - Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey}, - } - - service, err := youtube.New(client) - if err != nil { - return 0, errors.Prefix("error creating YouTube service", err) - } - - response, err := service.Channels.List("statistics").Id(s.YoutubeChannelID).Do() - if err != nil { - return 0, errors.Prefix("error getting channels", err) - } - - if len(response.Items) < 1 { - return 0, errors.Err("youtube channel not found") - } - - return response.Items[0].Statistics.VideoCount, nil -} diff --git a/ytsync/manager.go b/ytsync/manager.go deleted file mode 100644 index de9e67f..0000000 --- a/ytsync/manager.go +++ /dev/null @@ -1,227 +0,0 @@ -package ytsync - -import ( - "fmt" - "strings" - "syscall" - "time" - - "github.com/lbryio/lbry.go/errors" - "github.com/lbryio/lbry.go/util" - "github.com/lbryio/lbry.go/ytsync/namer" - "github.com/lbryio/lbry.go/ytsync/sdk" - log "github.com/sirupsen/logrus" -) - -type SyncManager struct { - stopOnError bool - maxTries int - takeOverExistingChannel bool - refill int - limit int - skipSpaceCheck bool - syncUpdate bool - concurrentJobs int - concurrentVideos int - blobsDir string - videosLimit int - maxVideoSize int - lbrycrdString string - awsS3ID string - awsS3Secret string - awsS3Region string - syncStatus string - awsS3Bucket string - singleRun bool - syncProperties *sdk.SyncProperties - apiConfig *sdk.APIConfig - namer *namer.Namer -} - -func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int, - skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int, - maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string, - syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig) *SyncManager { - return &SyncManager{ - stopOnError: stopOnError, - maxTries: maxTries, - takeOverExistingChannel: takeOverExistingChannel, - refill: refill, - limit: limit, - skipSpaceCheck: skipSpaceCheck, - syncUpdate: syncUpdate, - concurrentJobs: concurrentJobs, - concurrentVideos: concurrentVideos, - blobsDir: blobsDir, - videosLimit: videosLimit, - maxVideoSize: maxVideoSize, - lbrycrdString: lbrycrdString, - awsS3ID: awsS3ID, - awsS3Secret: awsS3Secret, - awsS3Region: awsS3Region, - awsS3Bucket: awsS3Bucket, - syncStatus: syncStatus, - singleRun: singleRun, - syncProperties: syncProperties, - apiConfig: apiConfig, - namer: namer.NewNamer(), - } -} - -const ( - StatusPending = "pending" // waiting for permission to sync - StatusQueued = "queued" // in sync queue. will be synced soon - StatusSyncing = "syncing" // syncing now - StatusSynced = "synced" // done - StatusFailed = "failed" - StatusFinalized = "finalized" // no more changes allowed -) - -var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized} - -const ( - VideoStatusPublished = "published" - VideoStatusFailed = "failed" -) - -func (s *SyncManager) Start() error { - - syncCount := 0 - for { - err := s.checkUsedSpace() - if err != nil { - return err - } - - var syncs []Sync - shouldInterruptLoop := false - - isSingleChannelSync := s.syncProperties.YoutubeChannelID != "" - if isSingleChannelSync { - channels, err := s.apiConfig.FetchChannels("", s.syncProperties) - if err != nil { - return err - } - if len(channels) != 1 { - return errors.Err("Expected 1 channel, %d returned", len(channels)) - } - lbryChannelName := channels[0].DesiredChannelName - syncs = make([]Sync, 1) - syncs[0] = Sync{ - APIConfig: s.apiConfig, - YoutubeChannelID: s.syncProperties.YoutubeChannelID, - LbryChannelName: lbryChannelName, - StopOnError: s.stopOnError, - MaxTries: s.maxTries, - ConcurrentVideos: s.concurrentVideos, - TakeOverExistingChannel: s.takeOverExistingChannel, - Refill: s.refill, - Manager: s, - LbrycrdString: s.lbrycrdString, - AwsS3ID: s.awsS3ID, - AwsS3Secret: s.awsS3Secret, - AwsS3Region: s.awsS3Region, - AwsS3Bucket: s.awsS3Bucket, - namer: s.namer, - } - shouldInterruptLoop = true - } else { - var queuesToSync []string - if s.syncStatus != "" { - queuesToSync = append(queuesToSync, s.syncStatus) - } else if s.syncUpdate { - queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced) - } else { - queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued) - } - for _, q := range queuesToSync { - channels, err := s.apiConfig.FetchChannels(q, s.syncProperties) - if err != nil { - return err - } - for _, c := range channels { - syncs = append(syncs, Sync{ - APIConfig: s.apiConfig, - YoutubeChannelID: c.ChannelId, - LbryChannelName: c.DesiredChannelName, - StopOnError: s.stopOnError, - MaxTries: s.maxTries, - ConcurrentVideos: s.concurrentVideos, - TakeOverExistingChannel: s.takeOverExistingChannel, - Refill: s.refill, - Manager: s, - LbrycrdString: s.lbrycrdString, - AwsS3ID: s.awsS3ID, - AwsS3Secret: s.awsS3Secret, - AwsS3Region: s.awsS3Region, - AwsS3Bucket: s.awsS3Bucket, - }) - } - } - } - if len(syncs) == 0 { - log.Infoln("No channels to sync. Pausing 5 minutes!") - time.Sleep(5 * time.Minute) - } - for i, sync := range syncs { - shouldNotCount := false - SendInfoToSlack("Syncing %s (%s) to LBRY! (iteration %d/%d - total processed channels: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1) - err := sync.FullCycle() - if err != nil { - fatalErrors := []string{ - "default_wallet already exists", - "WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR", - "NotEnoughFunds", - "no space left on device", - "failure uploading wallet", - } - if util.SubstringInSlice(err.Error(), fatalErrors) { - return errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err) - } - shouldNotCount = strings.Contains(err.Error(), "this youtube channel is being managed by another server") - if !shouldNotCount { - SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error()) - } - } - SendInfoToSlack("Syncing %s (%s) reached an end. (iteration %d/%d - total processed channels: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1) - if !shouldNotCount { - syncCount++ - } - if sync.IsInterrupted() || (s.limit != 0 && syncCount >= s.limit) { - shouldInterruptLoop = true - break - } - } - if shouldInterruptLoop || s.singleRun { - break - } - } - return nil -} - -func (s *SyncManager) checkUsedSpace() error { - usedPctile, err := GetUsedSpace(s.blobsDir) - if err != nil { - return err - } - if usedPctile >= 0.90 && !s.skipSpaceCheck { - return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)) - } - log.Infof("disk usage: %.1f%%", usedPctile*100) - return nil -} - -// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path -func GetUsedSpace(path string) (float32, error) { - var stat syscall.Statfs_t - err := syscall.Statfs(path, &stat) - if err != nil { - return 0, err - } - // Available blocks * size per block = available space in bytes - all := stat.Blocks * uint64(stat.Bsize) - free := stat.Bfree * uint64(stat.Bsize) - used := all - free - - return float32(used) / float32(all), nil -} diff --git a/ytsync/namer/names.go b/ytsync/namer/names.go deleted file mode 100644 index 531bb67..0000000 --- a/ytsync/namer/names.go +++ /dev/null @@ -1,83 +0,0 @@ -package namer - -import ( - "crypto/md5" - "encoding/hex" - "fmt" - "regexp" - "strconv" - "strings" - "sync" -) - -var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`) - -type Namer struct { - mu *sync.Mutex - names map[string]bool -} - -func NewNamer() *Namer { - return &Namer{ - mu: &sync.Mutex{}, - names: make(map[string]bool), - } -} - -func (n *Namer) SetNames(names map[string]bool) { - n.names = names -} - -func (n *Namer) GetNextName(prefix string) string { - n.mu.Lock() - defer n.mu.Unlock() - - attempt := 1 - var name string - for { - name = getClaimNameFromTitle(prefix, attempt) - if _, exists := n.names[name]; !exists { - break - } - attempt++ - } - - //if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash - if len(name) < 2 { - sum := md5.Sum([]byte(prefix)) - name = fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:])[:15], attempt) - } - - n.names[name] = true - - return name -} - -// TODO: clean this up some -func getClaimNameFromTitle(title string, attempt int) string { - suffix := "" - if attempt > 1 { - suffix = "-" + strconv.Itoa(attempt) - } - maxLen := 40 - len(suffix) - - chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-") - - name := chunks[0] - if len(name) > maxLen { - return name[:maxLen] - } - - for _, chunk := range chunks[1:] { - tmpName := name + "-" + chunk - if len(tmpName) > maxLen { - if len(name) < 20 { - name = tmpName[:maxLen] - } - break - } - name = tmpName - } - - return name + suffix -} diff --git a/ytsync/sdk/api.go b/ytsync/sdk/api.go deleted file mode 100644 index 1223b0f..0000000 --- a/ytsync/sdk/api.go +++ /dev/null @@ -1,170 +0,0 @@ -package sdk - -import ( - "encoding/json" - "io/ioutil" - "log" - "net/http" - "net/url" - "strconv" - "time" - - "github.com/lbryio/lbry.go/errors" - "github.com/lbryio/lbry.go/null" -) - -const ( - MaxReasonLength = 500 -) - -type APIConfig struct { - YoutubeAPIKey string - ApiURL string - ApiToken string - HostName string -} - -type SyncProperties struct { - SyncFrom int64 - SyncUntil int64 - YoutubeChannelID string -} - -type YoutubeChannel struct { - ChannelId string `json:"channel_id"` - TotalVideos uint `json:"total_videos"` - DesiredChannelName string `json:"desired_channel_name"` - SyncServer null.String `json:"sync_server"` - Fee *struct { - Amount string `json:"amount"` - Address string `json:"address"` - Currency string `json:"currency"` - } `json:"fee"` -} - -func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeChannel, error) { - type apiJobsResponse struct { - Success bool `json:"success"` - Error null.String `json:"error"` - Data []YoutubeChannel `json:"data"` - } - endpoint := a.ApiURL + "/yt/jobs" - res, _ := http.PostForm(endpoint, url.Values{ - "auth_token": {a.ApiToken}, - "sync_status": {status}, - "min_videos": {strconv.Itoa(1)}, - "after": {strconv.Itoa(int(cp.SyncFrom))}, - "before": {strconv.Itoa(int(cp.SyncUntil))}, - "sync_server": {a.HostName}, - "channel_id": {cp.YoutubeChannelID}, - }) - defer res.Body.Close() - body, _ := ioutil.ReadAll(res.Body) - var response apiJobsResponse - err := json.Unmarshal(body, &response) - if err != nil { - return nil, err - } - if response.Data == nil { - return nil, errors.Err(response.Error) - } - log.Printf("Fetched channels: %d", len(response.Data)) - return response.Data, nil -} - -type SyncedVideo struct { - VideoID string `json:"video_id"` - Published bool `json:"published"` - FailureReason string `json:"failure_reason"` - ClaimName string `json:"claim_name"` -} - -func (a *APIConfig) SetChannelStatus(channelID string, status string, failureReason string) (map[string]SyncedVideo, map[string]bool, error) { - type apiChannelStatusResponse struct { - Success bool `json:"success"` - Error null.String `json:"error"` - Data []SyncedVideo `json:"data"` - } - endpoint := a.ApiURL + "/yt/channel_status" - if len(failureReason) > MaxReasonLength { - failureReason = failureReason[:MaxReasonLength] - } - res, _ := http.PostForm(endpoint, url.Values{ - "channel_id": {channelID}, - "sync_server": {a.HostName}, - "auth_token": {a.ApiToken}, - "sync_status": {status}, - "failure_reason": {failureReason}, - }) - defer res.Body.Close() - body, _ := ioutil.ReadAll(res.Body) - var response apiChannelStatusResponse - err := json.Unmarshal(body, &response) - if err != nil { - return nil, nil, err - } - if !response.Error.IsNull() { - return nil, nil, errors.Err(response.Error.String) - } - if response.Data != nil { - svs := make(map[string]SyncedVideo) - claimNames := make(map[string]bool) - for _, v := range response.Data { - svs[v.VideoID] = v - claimNames[v.ClaimName] = v.Published - } - return svs, claimNames, nil - } - return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode) -} - -const ( - VideoStatusPublished = "published" - VideoStatusFailed = "failed" -) - -func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error { - endpoint := a.ApiURL + "/yt/video_status" - if len(failureReason) > MaxReasonLength { - failureReason = failureReason[:MaxReasonLength] - } - vals := url.Values{ - "youtube_channel_id": {channelID}, - "video_id": {videoID}, - "status": {status}, - "auth_token": {a.ApiToken}, - } - if status == VideoStatusPublished { - if claimID == "" || claimName == "" { - return errors.Err("claimID or claimName missing") - } - vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10)) - vals.Add("claim_id", claimID) - vals.Add("claim_name", claimName) - if size != nil { - vals.Add("size", strconv.FormatInt(*size, 10)) - } - } - if failureReason != "" { - vals.Add("failure_reason", failureReason) - } - res, _ := http.PostForm(endpoint, vals) - defer res.Body.Close() - body, _ := ioutil.ReadAll(res.Body) - var response struct { - Success bool `json:"success"` - Error null.String `json:"error"` - Data null.String `json:"data"` - } - err := json.Unmarshal(body, &response) - if err != nil { - return err - } - if !response.Error.IsNull() { - return errors.Err(response.Error.String) - } - if !response.Data.IsNull() && response.Data.String == "ok" { - return nil - } - return errors.Err("invalid API response. Status code: %d", res.StatusCode) -} diff --git a/ytsync/setup.go b/ytsync/setup.go deleted file mode 100644 index 05d2c18..0000000 --- a/ytsync/setup.go +++ /dev/null @@ -1,293 +0,0 @@ -package ytsync - -import ( - "strings" - "time" - - "github.com/lbryio/lbry.go/errors" - "github.com/lbryio/lbry.go/jsonrpc" - "github.com/lbryio/lbry.go/lbrycrd" - - "github.com/shopspring/decimal" - log "github.com/sirupsen/logrus" -) - -func (s *Sync) walletSetup() error { - //prevent unnecessary concurrent execution - s.walletMux.Lock() - defer s.walletMux.Unlock() - err := s.ensureChannelOwnership() - if err != nil { - return err - } - - balanceResp, err := s.daemon.WalletBalance() - if err != nil { - return err - } else if balanceResp == nil { - return errors.Err("no response") - } - balance := decimal.Decimal(*balanceResp) - log.Debugf("Starting balance is %s", balance.String()) - - var numOnSource int - if s.LbryChannelName == "@UCBerkeley" { - numOnSource = 10104 - } else { - n, err := s.CountVideos() - if err != nil { - return err - } - numOnSource = int(n) - } - log.Debugf("Source channel has %d videos", numOnSource) - if numOnSource == 0 { - return nil - } - - s.syncedVideosMux.RLock() - numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones... - s.syncedVideosMux.RUnlock() - log.Debugf("We already allocated credits for %d videos", numPublished) - - if numOnSource-numPublished > s.Manager.videosLimit { - numOnSource = s.Manager.videosLimit - } - - minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount - if numPublished > numOnSource && balance.LessThan(decimal.NewFromFloat(1)) { - SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource) - minBalance = 1 //since we ended up in this function it means some juice is still needed - } - amountToAdd, _ := decimal.NewFromFloat(minBalance).Sub(balance).Float64() - - if s.Refill > 0 { - if amountToAdd < 0 { - amountToAdd = float64(s.Refill) - } else { - amountToAdd += float64(s.Refill) - } - } - - if amountToAdd > 0 { - if amountToAdd < 1 { - amountToAdd = 1 // no reason to bother adding less than 1 credit - } - s.addCredits(amountToAdd) - } - - claimAddress, err := s.daemon.WalletUnusedAddress() - if err != nil { - return err - } else if claimAddress == nil { - return errors.Err("could not get unused address") - } - s.claimAddress = string(*claimAddress) - if s.claimAddress == "" { - return errors.Err("found blank claim address") - } - - err = s.ensureEnoughUTXOs() - if err != nil { - return err - } - - return nil -} - -func (s *Sync) ensureEnoughUTXOs() error { - utxolist, err := s.daemon.UTXOList() - if err != nil { - return err - } else if utxolist == nil { - return errors.Err("no response") - } - - target := 40 - slack := int(float32(0.1) * float32(target)) - count := 0 - - for _, utxo := range *utxolist { - if !utxo.IsClaim && !utxo.IsSupport && !utxo.IsUpdate && utxo.Amount.Cmp(decimal.New(0, 0)) == 1 { - count++ - } - } - - if count < target-slack { - newAddresses := target - count - - balance, err := s.daemon.WalletBalance() - if err != nil { - return err - } else if balance == nil { - return errors.Err("no response") - } - - log.Println("balance is " + decimal.Decimal(*balance).String()) - - amountPerAddress := decimal.Decimal(*balance).Div(decimal.NewFromFloat(float64(target))) - log.Infof("Putting %s credits into each of %d new addresses", amountPerAddress.String(), newAddresses) - prefillTx, err := s.daemon.WalletPrefillAddresses(newAddresses, amountPerAddress, true) - if err != nil { - return err - } else if prefillTx == nil { - return errors.Err("no response") - } - - err = s.waitForNewBlock() - if err != nil { - return err - } - } else if !allUTXOsConfirmed(utxolist) { - log.Println("Waiting for previous txns to confirm") - err := s.waitForNewBlock() - if err != nil { - return err - } - } - - return nil -} - -func (s *Sync) waitForNewBlock() error { - status, err := s.daemon.Status() - if err != nil { - return err - } - - for status.Wallet.Blocks == 0 || status.Wallet.BlocksBehind != 0 { - time.Sleep(5 * time.Second) - status, err = s.daemon.Status() - if err != nil { - return err - } - } - currentBlock := status.Wallet.Blocks - for i := 0; status.Wallet.Blocks <= currentBlock; i++ { - if i%3 == 0 { - log.Printf("Waiting for new block (%d)...", currentBlock+1) - } - time.Sleep(10 * time.Second) - status, err = s.daemon.Status() - if err != nil { - return err - } - } - return nil -} - -func (s *Sync) ensureChannelOwnership() error { - if s.LbryChannelName == "" { - return errors.Err("no channel name set") - } - - channels, err := s.daemon.ChannelList() - if err != nil { - return err - } else if channels == nil { - return errors.Err("no channel response") - } - - isChannelMine := false - for _, channel := range *channels { - if channel.Name == s.LbryChannelName { - s.lbryChannelID = channel.ClaimID - isChannelMine = true - } else { - return errors.Err("this wallet has multiple channels. maybe something went wrong during setup?") - } - } - if isChannelMine { - return nil - } - - resolveResp, err := s.daemon.Resolve(s.LbryChannelName) - if err != nil { - return err - } - - channel := (*resolveResp)[s.LbryChannelName] - channelBidAmount := channelClaimAmount - - channelNotFound := channel.Error != nil && strings.Contains(*(channel.Error), "cannot be resolved") - if !channelNotFound { - if !s.TakeOverExistingChannel { - return errors.Err("Channel exists and we don't own it. Pick another channel.") - } - log.Println("Channel exists and we don't own it. Outbidding existing claim.") - channelBidAmount, _ = channel.Certificate.Amount.Add(decimal.NewFromFloat(channelClaimAmount)).Float64() - } - - balanceResp, err := s.daemon.WalletBalance() - if err != nil { - return err - } else if balanceResp == nil { - return errors.Err("no response") - } - balance := decimal.Decimal(*balanceResp) - - if balance.LessThan(decimal.NewFromFloat(channelBidAmount)) { - s.addCredits(channelBidAmount + 0.1) - } - - c, err := s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount) - if err != nil { - return err - } - s.lbryChannelID = c.ClaimID - return nil -} - -func allUTXOsConfirmed(utxolist *jsonrpc.UTXOListResponse) bool { - if utxolist == nil { - return false - } - - if len(*utxolist) < 1 { - return false - } - - for _, utxo := range *utxolist { - if utxo.Height == 0 { - return false - } - } - - return true -} - -func (s *Sync) addCredits(amountToAdd float64) error { - log.Printf("Adding %f credits", amountToAdd) - var lbrycrdd *lbrycrd.Client - var err error - if s.LbrycrdString == "" { - lbrycrdd, err = lbrycrd.NewWithDefaultURL() - if err != nil { - return err - } - } else { - lbrycrdd, err = lbrycrd.New(s.LbrycrdString) - if err != nil { - return err - } - } - - addressResp, err := s.daemon.WalletUnusedAddress() - if err != nil { - return err - } else if addressResp == nil { - return errors.Err("no response") - } - address := string(*addressResp) - - _, err = lbrycrdd.SimpleSend(address, amountToAdd) - if err != nil { - return err - } - - wait := 15 * time.Second - log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new transaction") - time.Sleep(wait) - - return nil -} diff --git a/ytsync/sources/shared.go b/ytsync/sources/shared.go deleted file mode 100644 index 96d5121..0000000 --- a/ytsync/sources/shared.go +++ /dev/null @@ -1,27 +0,0 @@ -package sources - -import ( - "strings" - - "github.com/lbryio/lbry.go/jsonrpc" - "github.com/lbryio/lbry.go/ytsync/namer" -) - -type SyncSummary struct { - ClaimID string - ClaimName string -} - -func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, namer *namer.Namer) (*SyncSummary, error) { - for { - name := namer.GetNextName(title) - response, err := daemon.Publish(name, filename, amount, options) - if err != nil { - if strings.Contains(err.Error(), "failed: Multiple claims (") { - continue - } - return nil, err - } - return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil - } -} diff --git a/ytsync/sources/ucbVideo.go b/ytsync/sources/ucbVideo.go deleted file mode 100644 index 57fdc33..0000000 --- a/ytsync/sources/ucbVideo.go +++ /dev/null @@ -1,217 +0,0 @@ -package sources - -import ( - "net/http" - "os" - "regexp" - "strconv" - "strings" - "sync" - "time" - - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/s3/s3manager" - "github.com/lbryio/lbry.go/errors" - "github.com/lbryio/lbry.go/jsonrpc" - "github.com/lbryio/lbry.go/ytsync/namer" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - log "github.com/sirupsen/logrus" -) - -type ucbVideo struct { - id string - title string - channel string - description string - publishedAt time.Time - dir string - claimNames map[string]bool - syncedVideosMux *sync.RWMutex -} - -func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo { - p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors - return &ucbVideo{ - id: id, - title: title, - description: description, - channel: channel, - dir: dir, - publishedAt: p, - } -} - -func (v *ucbVideo) ID() string { - return v.id -} - -func (v *ucbVideo) PlaylistPosition() int { - return 0 -} - -func (v *ucbVideo) IDAndNum() string { - return v.ID() + " (?)" -} - -func (v *ucbVideo) PublishedAt() time.Time { - return v.publishedAt - //r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`) - //matches := r.FindStringSubmatch(v.title) - //if len(matches) > 0 { - // year, _ := strconv.Atoi(matches[1]) - // month, _ := strconv.Atoi(matches[2]) - // day, _ := strconv.Atoi(matches[3]) - // return time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC) - //} - //return time.Now() -} - -func (v *ucbVideo) getFilename() string { - return v.dir + "/" + v.id + ".mp4" -} - -func (v *ucbVideo) getClaimName(attempt int) string { - reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) - suffix := "" - if attempt > 1 { - suffix = "-" + strconv.Itoa(attempt) - } - maxLen := 40 - len(suffix) - - chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(v.title, "-"), "-")), "-") - - name := chunks[0] - if len(name) > maxLen { - return name[:maxLen] - } - - for _, chunk := range chunks[1:] { - tmpName := name + "-" + chunk - if len(tmpName) > maxLen { - if len(name) < 20 { - name = tmpName[:maxLen] - } - break - } - name = tmpName - } - - return name + suffix -} - -func (v *ucbVideo) getAbbrevDescription() string { - maxLines := 10 - description := strings.TrimSpace(v.description) - if strings.Count(description, "\n") < maxLines { - return description - } - return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." -} - -func (v *ucbVideo) download() error { - videoPath := v.getFilename() - - _, err := os.Stat(videoPath) - if err != nil && !os.IsNotExist(err) { - return err - } else if err == nil { - log.Debugln(v.id + " already exists at " + videoPath) - return nil - } - - creds := credentials.NewStaticCredentials("ID-GOES-HERE", "SECRET-GOES-HERE", "") - s, err := session.NewSession(&aws.Config{Region: aws.String("us-east-2"), Credentials: creds}) - if err != nil { - return err - } - downloader := s3manager.NewDownloader(s) - - out, err := os.Create(videoPath) - if err != nil { - return err - } - defer out.Close() - - log.Println("lbry-niko2/videos/" + v.channel + "/" + v.id) - - bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{ - Bucket: aws.String("lbry-niko2"), - Key: aws.String("/videos/" + v.channel + "/" + v.id + ".mp4"), - }) - if err != nil { - return err - } else if bytesWritten == 0 { - return errors.Err("zero bytes written") - } - - return nil -} - -func (v *ucbVideo) saveThumbnail() error { - resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id) - if err != nil { - return err - } - defer resp.Body.Close() - - creds := credentials.NewStaticCredentials("ID-GOES-HERE", "SECRET-GOES-HERE", "") - s, err := session.NewSession(&aws.Config{Region: aws.String("us-east-2"), Credentials: creds}) - if err != nil { - return err - } - uploader := s3manager.NewUploader(s) - - _, err = uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String("berk.ninja"), - Key: aws.String("thumbnails/" + v.id), - ContentType: aws.String("image/jpeg"), - Body: resp.Body, - }) - - return err -} - -func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) { - options := jsonrpc.PublishOptions{ - Title: &v.title, - Author: strPtr("UC Berkeley"), - Description: strPtr(v.getAbbrevDescription()), - Language: strPtr("en"), - ClaimAddress: &claimAddress, - Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id), - License: strPtr("see description"), - ChannelID: &channelID, - ChangeAddress: &claimAddress, - } - - return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer) -} - -func (v *ucbVideo) Size() *int64 { - return nil -} - -func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer) (*SyncSummary, error) { - //download and thumbnail can be done in parallel - err := v.download() - if err != nil { - return nil, errors.Prefix("download error", err) - } - log.Debugln("Downloaded " + v.id) - - //err = v.SaveThumbnail() - //if err != nil { - // return errors.WrapPrefix(err, "thumbnail error", 0) - //} - //log.Debugln("Created thumbnail for " + v.id) - - summary, err := v.publish(daemon, claimAddress, amount, channelID, namer) - if err != nil { - return nil, errors.Prefix("publish error", err) - } - - return summary, nil -} diff --git a/ytsync/sources/youtubeVideo.go b/ytsync/sources/youtubeVideo.go deleted file mode 100644 index 925dfc3..0000000 --- a/ytsync/sources/youtubeVideo.go +++ /dev/null @@ -1,294 +0,0 @@ -package sources - -import ( - "bytes" - "encoding/json" - "io/ioutil" - "net/http" - "os" - "regexp" - "strconv" - "strings" - "sync" - "time" - - "github.com/lbryio/lbry.go/errors" - "github.com/lbryio/lbry.go/jsonrpc" - "github.com/lbryio/lbry.go/ytsync/namer" - - "github.com/rylio/ytdl" - log "github.com/sirupsen/logrus" - "google.golang.org/api/youtube/v3" -) - -type YoutubeVideo struct { - id string - channelTitle string - title string - description string - playlistPosition int64 - size *int64 - maxVideoSize int64 - publishedAt time.Time - dir string - claimNames map[string]bool - syncedVideosMux *sync.RWMutex -} - -func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo { - publishedAt, _ := time.Parse(time.RFC3339Nano, snippet.PublishedAt) // ignore parse errors - return &YoutubeVideo{ - id: snippet.ResourceId.VideoId, - title: snippet.Title, - description: snippet.Description, - channelTitle: snippet.ChannelTitle, - playlistPosition: snippet.Position, - publishedAt: publishedAt, - dir: directory, - } -} - -func (v *YoutubeVideo) ID() string { - return v.id -} - -func (v *YoutubeVideo) PlaylistPosition() int { - return int(v.playlistPosition) -} - -func (v *YoutubeVideo) IDAndNum() string { - return v.ID() + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)" -} - -func (v *YoutubeVideo) PublishedAt() time.Time { - return v.publishedAt -} - -func (v *YoutubeVideo) getFilename() string { - maxLen := 30 - reg := regexp.MustCompile(`[^a-zA-Z0-9]+`) - - chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(v.title, "-"), "-")), "-") - - name := chunks[0] - if len(name) > maxLen { - name = name[:maxLen] - } - - for _, chunk := range chunks[1:] { - tmpName := name + "-" + chunk - if len(tmpName) > maxLen { - if len(name) < 20 { - name = tmpName[:maxLen] - } - break - } - name = tmpName - } - if len(name) < 1 { - name = v.id - } - return v.videoDir() + "/" + name + ".mp4" -} - -func (v *YoutubeVideo) getAbbrevDescription() string { - maxLines := 10 - description := strings.TrimSpace(v.description) - if strings.Count(description, "\n") < maxLines { - return description - } - return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." -} - -func (v *YoutubeVideo) download() error { - videoPath := v.getFilename() - - err := os.Mkdir(v.videoDir(), 0750) - if err != nil && !strings.Contains(err.Error(), "file exists") { - return errors.Wrap(err, 0) - } - - _, err = os.Stat(videoPath) - if err != nil && !os.IsNotExist(err) { - return err - } else if err == nil { - log.Debugln(v.id + " already exists at " + videoPath) - return nil - } - - videoUrl := "https://www.youtube.com/watch?v=" + v.id - videoInfo, err := ytdl.GetVideoInfo(videoUrl) - if err != nil { - return err - } - - codec := []string{"H.264"} - ext := []string{"mp4"} - - //Filter requires a [] interface{} - codecFilter := make([]interface{}, len(codec)) - for i, v := range codec { - codecFilter[i] = v - } - - //Filter requires a [] interface{} - extFilter := make([]interface{}, len(ext)) - for i, v := range ext { - extFilter[i] = v - } - - formats := videoInfo.Formats.Filter(ytdl.FormatVideoEncodingKey, codecFilter).Filter(ytdl.FormatExtensionKey, extFilter) - if len(formats) == 0 { - return errors.Err("no compatible format available for this video") - } - maxRetryAttempts := 5 - for i := 0; i < len(formats) && i < maxRetryAttempts; i++ { - formatIndex := i - if i == maxRetryAttempts-1 { - formatIndex = len(formats) - 1 - } - var downloadedFile *os.File - downloadedFile, err = os.Create(videoPath) - if err != nil { - return err - } - err = videoInfo.Download(formats[formatIndex], downloadedFile) - downloadedFile.Close() - if err != nil { - //delete the video and ignore the error - _ = v.delete() - break - } - fi, err := os.Stat(v.getFilename()) - if err != nil { - return err - } - videoSize := fi.Size() - v.size = &videoSize - - if videoSize > v.maxVideoSize { - //delete the video and ignore the error - _ = v.delete() - err = errors.Err("file is too big and there is no other format available") - } else { - break - } - } - return err -} - -func (v *YoutubeVideo) videoDir() string { - return v.dir + "/" + v.id -} - -func (v *YoutubeVideo) delete() error { - videoPath := v.getFilename() - err := os.Remove(videoPath) - if err != nil { - log.Errorln(errors.Prefix("delete error", err)) - return err - } - log.Debugln(v.id + " deleted from disk (" + videoPath + ")") - return nil -} - -func (v *YoutubeVideo) triggerThumbnailSave() error { - client := &http.Client{Timeout: 30 * time.Second} - - params, err := json.Marshal(map[string]string{"videoid": v.id}) - if err != nil { - return err - } - - request, err := http.NewRequest(http.MethodPut, "https://jgp4g1qoud.execute-api.us-east-1.amazonaws.com/prod/thumbnail", bytes.NewBuffer(params)) - if err != nil { - return err - } - - response, err := client.Do(request) - if err != nil { - return err - } - defer response.Body.Close() - - contents, err := ioutil.ReadAll(response.Body) - if err != nil { - return err - } - - var decoded struct { - Error int `json:"error"` - Url string `json:"url,omitempty"` - Message string `json:"message,omitempty"` - } - err = json.Unmarshal(contents, &decoded) - if err != nil { - return err - } - - if decoded.Error != 0 { - return errors.Err("error creating thumbnail: " + decoded.Message) - } - - return nil -} - -func strPtr(s string) *string { return &s } - -func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) { - if channelID == "" { - return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed? - } - options := jsonrpc.PublishOptions{ - Title: &v.title, - Author: &v.channelTitle, - Description: strPtr(v.getAbbrevDescription() + "\nhttps://www.youtube.com/watch?v=" + v.id), - Language: strPtr("en"), - ClaimAddress: &claimAddress, - Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id), - License: strPtr("Copyrighted (contact author)"), - ChangeAddress: &claimAddress, - ChannelID: &channelID, - } - - return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer) -} - -func (v *YoutubeVideo) Size() *int64 { - return v.size -} - -func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer) (*SyncSummary, error) { - v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024 - //download and thumbnail can be done in parallel - err := v.download() - if err != nil { - return nil, errors.Prefix("download error", err) - } - log.Debugln("Downloaded " + v.id) - - err = v.triggerThumbnailSave() - if err != nil { - return nil, errors.Prefix("thumbnail error", err) - } - log.Debugln("Created thumbnail for " + v.id) - - summary, err := v.publish(daemon, claimAddress, amount, channelID, namer) - //delete the video in all cases (and ignore the error) - _ = v.delete() - - return summary, errors.Prefix("publish error", err) -} - -// sorting videos -//type ByPublishedAt []YoutubeVideo -// -//func (a ByPublishedAt) Len() int { return len(a) } -//func (a ByPublishedAt) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -//func (a ByPublishedAt) Less(i, j int) bool { return a[i].publishedAt.Before(a[j].publishedAt) } -// -//type ByPlaylistPosition []YoutubeVideo -// -//func (a ByPlaylistPosition) Len() int { return len(a) } -//func (a ByPlaylistPosition) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -//func (a ByPlaylistPosition) Less(i, j int) bool { return a[i].playlistPosition < a[j].playlistPosition } diff --git a/ytsync/splitter.py b/ytsync/splitter.py deleted file mode 100644 index dc991f5..0000000 --- a/ytsync/splitter.py +++ /dev/null @@ -1,152 +0,0 @@ -import os -import sys -from decimal import Decimal -from bitcoinrpc.authproxy import AuthServiceProxy - -from lbryum.wallet import Wallet, WalletStorage -from lbryum.commands import known_commands, Commands -from lbryum.simple_config import SimpleConfig -from lbryum.blockchain import get_blockchain -from lbryum.network import Network - - -def get_lbrycrdd_connection_string(wallet_conf): - settings = {"username": "rpcuser", - "password": "rpcpassword", - "rpc_port": 9245} - if wallet_conf and os.path.exists(wallet_conf): - with open(wallet_conf, "r") as conf: - conf_lines = conf.readlines() - for l in conf_lines: - if l.startswith("rpcuser="): - settings["username"] = l[8:].rstrip('\n') - if l.startswith("rpcpassword="): - settings["password"] = l[12:].rstrip('\n') - if l.startswith("rpcport="): - settings["rpc_port"] = int(l[8:].rstrip('\n')) - - rpc_user = settings["username"] - rpc_pass = settings["password"] - rpc_port = settings["rpc_port"] - rpc_url = "127.0.0.1" - return "http://%s:%s@%s:%i" % (rpc_user, rpc_pass, rpc_url, rpc_port) - - -class LBRYumWallet(object): - def __init__(self, lbryum_path): - self.config = SimpleConfig() - self.config.set_key('chain', 'lbrycrd_main') - self.storage = WalletStorage(lbryum_path) - self.wallet = Wallet(self.storage) - self.cmd_runner = Commands(self.config, self.wallet, None) - if not self.wallet.has_seed(): - seed = self.wallet.make_seed() - self.wallet.add_seed(seed, "derp") - self.wallet.create_master_keys("derp") - self.wallet.create_main_account() - self.wallet.update_password("derp", "") - self.network = Network(self.config) - self.blockchain = get_blockchain(self.config, self.network) - print self.config.get('chain'), self.blockchain - self.wallet.storage.write() - - def command(self, command_name, *args, **kwargs): - cmd_runner = Commands(self.config, self.wallet, None) - cmd = known_commands[command_name] - func = getattr(cmd_runner, cmd.name) - return func(*args, **kwargs) - - def generate_address(self): - address = self.wallet.create_new_address() - self.wallet.storage.write() - return address - - -class LBRYcrd(object): - def __init__(self, lbrycrdd_path): - self.lbrycrdd_conn_str = get_lbrycrdd_connection_string(lbrycrdd_path) - - def __call__(self, method, *args, **kwargs): - return self.rpc(method)(*args, **kwargs) - - def rpc(self, method): - return AuthServiceProxy(self.lbrycrdd_conn_str, service_name=method) - - -def get_wallet_path(): - cwd = os.getcwd() - wallet_path = os.path.join(cwd, "wallet.json") - if not os.path.exists(wallet_path): - return wallet_path - i = 1 - while True: - wallet_path = os.path.join(cwd, "wallet_%i.json" % i) - if not os.path.exists(wallet_path): - return wallet_path - i += 1 - - -def coin_chooser(lbrycrdd, amount, fee=0.001): - def iter_txis(): - unspent = lbrycrdd("listunspent") - unspent = sorted(unspent, key=lambda x: x['amount'], reverse=True) - spendable = Decimal(0.0) - for txi in unspent: - if spendable >= amount: - break - else: - spendable += txi['amount'] - yield txi - if spendable < amount: - print spendable, amount - raise Exception("Not enough funds") - - coins = list(iter(iter_txis())) - total = sum(c['amount'] for c in coins) - change = Decimal(total) - Decimal(amount) - Decimal(fee) - - if change < 0: - raise Exception("Not enough funds") - if change: - change_address = lbrycrdd("getnewaddress") - else: - change_address = None - - print "Total: %f, amount: %f, change: %f" % (total, amount, change) - - return coins, change, change_address - - -def get_raw_tx(lbrycrdd, addresses, coins, amount, change, change_address): - txi = [{'txid': c['txid'], 'vout': c['vout']} for c in coins] - txo = {address: float(amount) for address in addresses} - if change_address: - txo[change_address] = float(change) - return lbrycrdd("createrawtransaction", txi, txo) - - -def main(count, value=None, lbryum_path=None, lbrycrdd_path=None): - count = int(count) - lbryum_path = lbryum_path or get_wallet_path() - if sys.platform == "darwin": - default_lbrycrdd = os.path.join(os.path.expanduser("~"), - "Library/Application Support/lbrycrd/lbrycrd.conf") - else: - default_lbrycrdd = os.path.join(os.path.expanduser("~"), ".lbrycrd/lbrycrd.conf") - lbrycrdd_path = lbrycrdd_path or default_lbrycrdd - l = LBRYcrd(lbrycrdd_path=lbrycrdd_path) - s = LBRYumWallet(lbryum_path) - value = value or 1.0 - value = Decimal(value) - - coins, change, change_address = coin_chooser(l, count * value) - addresses = [s.generate_address() for i in range(count)] - raw_tx = get_raw_tx(l, addresses, coins, value, change, change_address) - signed = l("signrawtransaction", raw_tx)['hex'] - txid = l("sendrawtransaction", signed) - print txid - - -if __name__ == "__main__": - args = sys.argv[1:] - main(*args) diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go deleted file mode 100644 index 784b06b..0000000 --- a/ytsync/ytsync.go +++ /dev/null @@ -1,848 +0,0 @@ -package ytsync - -import ( - "bufio" - "encoding/csv" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "net/http" - "os" - "os/exec" - "os/signal" - "sort" - "strings" - "sync" - "syscall" - "time" - - "github.com/lbryio/lbry.go/errors" - "github.com/lbryio/lbry.go/jsonrpc" - "github.com/lbryio/lbry.go/stop" - "github.com/lbryio/lbry.go/util" - "github.com/lbryio/lbry.go/ytsync/namer" - "github.com/lbryio/lbry.go/ytsync/sdk" - "github.com/lbryio/lbry.go/ytsync/sources" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/s3/s3manager" - "github.com/mitchellh/go-ps" - log "github.com/sirupsen/logrus" - "google.golang.org/api/googleapi/transport" - "google.golang.org/api/youtube/v3" -) - -const ( - channelClaimAmount = 0.01 - publishAmount = 0.01 - maxReasonLength = 500 -) - -type video interface { - Size() *int64 - ID() string - IDAndNum() string - PlaylistPosition() int - PublishedAt() time.Time - Sync(*jsonrpc.Client, string, float64, string, int, *namer.Namer) (*sources.SyncSummary, error) -} - -// sorting videos -type byPublishedAt []video - -func (a byPublishedAt) Len() int { return len(a) } -func (a byPublishedAt) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[j].PublishedAt()) } - -// Sync stores the options that control how syncing happens -type Sync struct { - APIConfig *sdk.APIConfig - YoutubeChannelID string - LbryChannelName string - StopOnError bool - MaxTries int - ConcurrentVideos int - TakeOverExistingChannel bool - Refill int - Manager *SyncManager - LbrycrdString string - AwsS3ID string - AwsS3Secret string - AwsS3Region string - AwsS3Bucket string - - daemon *jsonrpc.Client - claimAddress string - videoDirectory string - syncedVideosMux *sync.RWMutex - syncedVideos map[string]sdk.SyncedVideo - grp *stop.Group - lbryChannelID string - namer *namer.Namer - - walletMux *sync.Mutex - queue chan video -} - -func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) { - s.syncedVideosMux.Lock() - defer s.syncedVideosMux.Unlock() - s.syncedVideos[videoID] = sdk.SyncedVideo{ - VideoID: videoID, - Published: published, - FailureReason: failureReason, - } -} - -// SendErrorToSlack Sends an error message to the default channel and to the process log. -func SendErrorToSlack(format string, a ...interface{}) error { - message := format - if len(a) > 0 { - message = fmt.Sprintf(format, a...) - } - log.Errorln(message) - return util.SendToSlack(":sos: " + message) -} - -// SendInfoToSlack Sends an info message to the default channel and to the process log. -func SendInfoToSlack(format string, a ...interface{}) error { - message := format - if len(a) > 0 { - message = fmt.Sprintf(format, a...) - } - log.Infoln(message) - return util.SendToSlack(":information_source: " + message) -} - -// IsInterrupted can be queried to discover if the sync process was interrupted manually -func (s *Sync) IsInterrupted() bool { - select { - case <-s.grp.Ch(): - return true - default: - return false - } -} - -func (s *Sync) downloadWallet() error { - defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" - defaultTempWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet" - key := aws.String("/wallets/" + s.YoutubeChannelID) - if os.Getenv("REGTEST") == "true" { - defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" - defaultTempWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet" - key = aws.String("/regtest/" + s.YoutubeChannelID) - } - - if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) { - return errors.Err("default_wallet already exists") - } - - creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "") - s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds}) - if err != nil { - return err - } - downloader := s3manager.NewDownloader(s3Session) - out, err := os.Create(defaultTempWalletDir) - if err != nil { - return err - } - defer out.Close() - - bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{ - Bucket: aws.String(s.AwsS3Bucket), - Key: key, - }) - if err != nil { - // Casting to the awserr.Error type will allow you to inspect the error - // code returned by the service in code. The error code can be used - // to switch on context specific functionality. In this case a context - // specific error message is printed to the user based on the bucket - // and key existing. - // - // For information on other S3 API error codes see: - // http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html - if aerr, ok := err.(awserr.Error); ok { - code := aerr.Code() - if code == s3.ErrCodeNoSuchKey { - return errors.Err("wallet not on S3") - } - } - return err - } else if bytesWritten == 0 { - return errors.Err("zero bytes written") - } - - return os.Rename(defaultTempWalletDir, defaultWalletDir) -} - -func (s *Sync) uploadWallet() error { - defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet" - key := aws.String("/wallets/" + s.YoutubeChannelID) - if os.Getenv("REGTEST") == "true" { - defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet" - key = aws.String("/regtest/" + s.YoutubeChannelID) - } - - if _, err := os.Stat(defaultWalletDir); os.IsNotExist(err) { - return errors.Err("default_wallet does not exist") - } - - creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "") - s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds}) - if err != nil { - return err - } - - uploader := s3manager.NewUploader(s3Session) - - file, err := os.Open(defaultWalletDir) - if err != nil { - return err - } - defer file.Close() - - _, err = uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(s.AwsS3Bucket), - Key: key, - Body: file, - }) - if err != nil { - return err - } - - return os.Remove(defaultWalletDir) -} - -func (s *Sync) setStatusSyncing() error { - syncedVideos, claimNames, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSyncing, "") - if err != nil { - return err - } - s.syncedVideosMux.Lock() - s.syncedVideos = syncedVideos - s.Manager.namer.SetNames(claimNames) - s.syncedVideosMux.Unlock() - return nil -} - -func (s *Sync) FullCycle() (e error) { - if os.Getenv("HOME") == "" { - return errors.Err("no $HOME env var found") - } - if s.YoutubeChannelID == "" { - return errors.Err("channel ID not provided") - } - s.syncedVideosMux = &sync.RWMutex{} - s.walletMux = &sync.Mutex{} - s.grp = stop.New() - s.queue = make(chan video) - interruptChan := make(chan os.Signal, 1) - signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) - defer signal.Stop(interruptChan) - go func() { - <-interruptChan - log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") - s.grp.Stop() - }() - err := s.setStatusSyncing() - if err != nil { - return err - } - - defer s.setChannelTerminationStatus(&e) - - err = s.downloadWallet() - if err != nil && err.Error() != "wallet not on S3" { - return errors.Prefix("failure in downloading wallet", err) - } else if err == nil { - log.Println("Continuing previous upload") - } else { - log.Println("Starting new wallet") - } - - defer s.stopAndUploadWallet(&e) - - s.videoDirectory, err = ioutil.TempDir("", "ytsync") - if err != nil { - return errors.Wrap(err, 0) - } - - log.Printf("Starting daemon") - err = startDaemonViaSystemd() - if err != nil { - return err - } - - log.Infoln("Waiting for daemon to finish starting...") - s.daemon = jsonrpc.NewClient("") - s.daemon.SetRPCTimeout(40 * time.Minute) - - err = s.waitForDaemonStart() - if err != nil { - return err - } - - err = s.doSync() - if err != nil { - return err - } else { - // wait for reflection to finish??? - wait := 15 * time.Second // should bump this up to a few min, but keeping it low for testing - log.Println("Waiting " + wait.String() + " to finish reflecting everything") - time.Sleep(wait) - } - - return nil -} -func (s *Sync) setChannelTerminationStatus(e *error) { - if *e != nil { - //conditions for which a channel shouldn't be marked as failed - noFailConditions := []string{ - "this youtube channel is being managed by another server", - } - if util.SubstringInSlice((*e).Error(), noFailConditions) { - return - } - failureReason := (*e).Error() - _, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason) - if err != nil { - msg := fmt.Sprintf("Failed setting failed state for channel %s", s.LbryChannelName) - *e = errors.Prefix(msg+err.Error(), *e) - } - } else if !s.IsInterrupted() { - _, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSynced, "") - if err != nil { - *e = err - } - } -} - -func (s *Sync) waitForDaemonStart() error { - for { - select { - case <-s.grp.Ch(): - return errors.Err("interrupted during daemon startup") - default: - s, err := s.daemon.Status() - if err == nil && s.StartupStatus.Wallet && s.StartupStatus.FileManager { - return nil - } - time.Sleep(5 * time.Second) - } - } -} - -func (s *Sync) stopAndUploadWallet(e *error) { - log.Printf("Stopping daemon") - shutdownErr := stopDaemonViaSystemd() - if shutdownErr != nil { - logShutdownError(shutdownErr) - } else { - // the cli will return long before the daemon effectively stops. we must observe the processes running - // before moving the wallet - waitTimeout := 8 * time.Minute - processDeathError := waitForDaemonProcess(waitTimeout) - if processDeathError != nil { - logShutdownError(processDeathError) - } else { - err := s.uploadWallet() - if err != nil { - if *e == nil { - e = &err //not 100% sure - return - } else { - *e = errors.Prefix("failure uploading wallet", *e) - } - } - } - } -} -func logShutdownError(shutdownErr error) { - SendErrorToSlack("error shutting down daemon: %v", shutdownErr) - SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") -} - -// fixDupes abandons duplicate claims -func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) { - abandonedClaims := false - videoIDs := make(map[string]jsonrpc.Claim) - for _, c := range claims { - if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil { - continue - } - if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil { - return false, errors.Err("something is wrong with this claim: %s", c.ClaimID) - } - tn := *c.Value.Stream.Metadata.Thumbnail - videoID := tn[strings.LastIndex(tn, "/")+1:] - - log.Infof("claimid: %s, claimName: %s, videoID: %s", c.ClaimID, c.Name, videoID) - cl, ok := videoIDs[videoID] - if !ok || cl.ClaimID == c.ClaimID { - videoIDs[videoID] = c - continue - } - // only keep the most recent one - claimToAbandon := c - videoIDs[videoID] = cl - if c.Height > cl.Height { - claimToAbandon = cl - videoIDs[videoID] = c - } - _, err := s.daemon.ClaimAbandon(claimToAbandon.Txid, claimToAbandon.Nout) - if err != nil { - return true, err - } - log.Debugf("abandoning %+v", claimToAbandon) - abandonedClaims = true - //return true, nil - } - return abandonedClaims, nil -} - -//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published -func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err error) { - count := 0 - for _, c := range claims { - if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil { - continue - } - if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil { - return count, fixed, errors.Err("something is wrong with the this claim: %s", c.ClaimID) - } - //check if claimID is in remote db - tn := *c.Value.Stream.Metadata.Thumbnail - videoID := tn[strings.LastIndex(tn, "/")+1:] - pv, ok := s.syncedVideos[videoID] - if !ok || pv.ClaimName != c.Name { - fixed++ - err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil) - if err != nil { - return total, fixed, err - } - } - total++ - } - return total, fixed, nil -} - -func (s *Sync) doSync() error { - var err error - claims, err := s.daemon.ClaimListMine() - if err != nil { - return errors.Prefix("cannot list claims", err) - } - hasDupes, err := s.fixDupes(*claims) - if err != nil { - return errors.Prefix("error checking for duplicates", err) - } - if hasDupes { - SendInfoToSlack("Channel had dupes and was fixed!") - err = s.waitForNewBlock() - if err != nil { - return err - } - claims, err = s.daemon.ClaimListMine() - if err != nil { - return errors.Prefix("cannot list claims", err) - } - } - - pubsOnWallet, nFixed, err := s.updateRemoteDB(*claims) - if err != nil { - return errors.Prefix("error counting claims", err) - } - if nFixed > 0 { - err := s.setStatusSyncing() - if err != nil { - return err - } - SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed) - } - pubsOnDB := 0 - for _, sv := range s.syncedVideos { - if sv.Published { - pubsOnDB++ - } - } - - if pubsOnWallet > pubsOnDB { //This case should never happen - SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) - return errors.Err("not all published videos are in the database") - } - if pubsOnWallet < pubsOnDB { - SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) - } - err = s.walletSetup() - if err != nil { - return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err) - } - - if s.StopOnError { - log.Println("Will stop publishing if an error is detected") - } - - for i := 0; i < s.ConcurrentVideos; i++ { - s.grp.Add(1) - go func(i int) { - defer s.grp.Done() - s.startWorker(i) - }(i) - } - - if s.LbryChannelName == "@UCBerkeley" { - err = s.enqueueUCBVideos() - } else { - err = s.enqueueYoutubeVideos() - } - close(s.queue) - s.grp.Wait() - return err -} - -func (s *Sync) startWorker(workerNum int) { - var v video - var more bool - - for { - select { - case <-s.grp.Ch(): - log.Printf("Stopping worker %d", workerNum) - return - default: - } - - select { - case v, more = <-s.queue: - if !more { - return - } - case <-s.grp.Ch(): - log.Printf("Stopping worker %d", workerNum) - return - } - - log.Println("================================================================================") - - tryCount := 0 - for { - tryCount++ - err := s.processVideo(v) - - if err != nil { - logMsg := fmt.Sprintf("error processing video: " + err.Error()) - log.Errorln(logMsg) - fatalErrors := []string{ - ":5279: read: connection reset by peer", - "no space left on device", - "NotEnoughFunds", - "Cannot publish using channel", - "cannot concatenate 'str' and 'NoneType' objects", - "more than 90% of the space has been used.", - } - if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError { - s.grp.Stop() - } else if s.MaxTries > 1 { - errorsNoRetry := []string{ - "non 200 status code received", - " reason: 'This video contains content from", - "dont know which claim to update", - "uploader has not made this video available in your country", - "download error: AccessDenied: Access Denied", - "Playback on other websites has been disabled by the video owner", - "Error in daemon: Cannot publish empty file", - "Error extracting sts from embedded url response", - "Unable to extract signature tokens", - "Client.Timeout exceeded while awaiting headers)", - "the video is too big to sync, skipping for now", - } - if util.SubstringInSlice(err.Error(), errorsNoRetry) { - log.Println("This error should not be retried at all") - } else if tryCount < s.MaxTries { - if strings.Contains(err.Error(), "txn-mempool-conflict") || - strings.Contains(err.Error(), "too-long-mempool-chain") { - log.Println("waiting for a block before retrying") - err = s.waitForNewBlock() - if err != nil { - s.grp.Stop() - SendErrorToSlack("something went wrong while waiting for a block: %v", err) - break - } - } else if strings.Contains(err.Error(), "failed: Not enough funds") || - strings.Contains(err.Error(), "Error in daemon: Insufficient funds, please deposit additional LBC") { - log.Println("refilling addresses before retrying") - err = s.walletSetup() - if err != nil { - s.grp.Stop() - SendErrorToSlack("failed to setup the wallet for a refill: %v", err) - break - } - } - log.Println("Retrying") - continue - } - SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) - } - s.AppendSyncedVideo(v.ID(), false, err.Error(), "") - err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size()) - if err != nil { - SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) - } - } - break - } - } -} - -func (s *Sync) enqueueYoutubeVideos() error { - client := &http.Client{ - Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey}, - } - - service, err := youtube.New(client) - if err != nil { - return errors.Prefix("error creating YouTube service", err) - } - - response, err := service.Channels.List("contentDetails").Id(s.YoutubeChannelID).Do() - if err != nil { - return errors.Prefix("error getting channels", err) - } - - if len(response.Items) < 1 { - return errors.Err("youtube channel not found") - } - - if response.Items[0].ContentDetails.RelatedPlaylists == nil { - return errors.Err("no related playlists") - } - - playlistID := response.Items[0].ContentDetails.RelatedPlaylists.Uploads - if playlistID == "" { - return errors.Err("no channel playlist") - } - - var videos []video - - nextPageToken := "" - for { - req := service.PlaylistItems.List("snippet"). - PlaylistId(playlistID). - MaxResults(50). - PageToken(nextPageToken) - - playlistResponse, err := req.Do() - if err != nil { - return errors.Prefix("error getting playlist items", err) - } - - if len(playlistResponse.Items) < 1 { - return errors.Err("playlist items not found") - } - - for _, item := range playlistResponse.Items { - // normally we'd send the video into the channel here, but youtube api doesn't have sorting - // so we have to get ALL the videos, then sort them, then send them in - videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item.Snippet)) - } - - log.Infof("Got info for %d videos from youtube API", len(videos)) - - nextPageToken = playlistResponse.NextPageToken - if nextPageToken == "" { - break - } - } - - sort.Sort(byPublishedAt(videos)) - //or sort.Sort(sort.Reverse(byPlaylistPosition(videos))) - -Enqueue: - for _, v := range videos { - select { - case <-s.grp.Ch(): - break Enqueue - default: - } - - select { - case s.queue <- v: - case <-s.grp.Ch(): - break Enqueue - } - } - - return nil -} - -func (s *Sync) enqueueUCBVideos() error { - var videos []video - - csvFile, err := os.Open("ucb.csv") - if err != nil { - return err - } - - reader := csv.NewReader(bufio.NewReader(csvFile)) - for { - line, err := reader.Read() - if err == io.EOF { - break - } else if err != nil { - return err - } - data := struct { - PublishedAt string `json:"publishedAt"` - }{} - err = json.Unmarshal([]byte(line[4]), &data) - if err != nil { - return err - } - - videos = append(videos, sources.NewUCBVideo(line[0], line[2], line[1], line[3], data.PublishedAt, s.videoDirectory)) - } - - log.Printf("Publishing %d videos\n", len(videos)) - - sort.Sort(byPublishedAt(videos)) - -Enqueue: - for _, v := range videos { - select { - case <-s.grp.Ch(): - break Enqueue - default: - } - - select { - case s.queue <- v: - case <-s.grp.Ch(): - break Enqueue - } - } - - return nil -} - -func (s *Sync) processVideo(v video) (err error) { - defer func() { - if p := recover(); p != nil { - var ok bool - err, ok = p.(error) - if !ok { - err = errors.Err("%v", p) - } - err = errors.Wrap(p, 2) - } - }() - - log.Println("Processing " + v.IDAndNum()) - defer func(start time.Time) { - log.Println(v.ID() + " took " + time.Since(start).String()) - }(time.Now()) - - s.syncedVideosMux.RLock() - sv, ok := s.syncedVideos[v.ID()] - s.syncedVideosMux.RUnlock() - alreadyPublished := ok && sv.Published - - neverRetryFailures := []string{ - "Error extracting sts from embedded url response", - "Unable to extract signature tokens", - "the video is too big to sync, skipping for now", - } - if ok && !sv.Published && util.SubstringInSlice(sv.FailureReason, neverRetryFailures) { - log.Println(v.ID() + " can't ever be published") - return nil - } - - if alreadyPublished { - log.Println(v.ID() + " already published") - return nil - } - - if v.PlaylistPosition() > s.Manager.videosLimit { - log.Println(v.ID() + " is old: skipping") - return nil - } - err = s.Manager.checkUsedSpace() - if err != nil { - return err - } - - summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.maxVideoSize, s.namer) - if err != nil { - return err - } - - err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size()) - if err != nil { - SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) - } - - s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName) - - return nil -} - -func startDaemonViaSystemd() error { - err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "start", "lbrynet.service").Run() - if err != nil { - return errors.Err(err) - } - return nil -} - -func stopDaemonViaSystemd() error { - err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "stop", "lbrynet.service").Run() - if err != nil { - return errors.Err(err) - } - return nil -} - -// waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up -func waitForDaemonProcess(timeout time.Duration) error { - processes, err := ps.Processes() - if err != nil { - return err - } - var daemonProcessId = -1 - for _, p := range processes { - if p.Executable() == "lbrynet-daemon" { - daemonProcessId = p.Pid() - break - } - } - if daemonProcessId == -1 { - return nil - } - then := time.Now() - stopTime := then.Add(time.Duration(timeout * time.Second)) - for !time.Now().After(stopTime) { - wait := 10 * time.Second - log.Println("the daemon is still running, waiting for it to exit") - time.Sleep(wait) - proc, err := os.FindProcess(daemonProcessId) - if err != nil { - // couldn't find the process, that means the daemon is stopped and can continue - return nil - } - //double check if process is running and alive - //by sending a signal 0 - //NOTE : syscall.Signal is not available in Windows - err = proc.Signal(syscall.Signal(0)) - //the process doesn't exist anymore! we're free to go - if err != nil && (err == syscall.ESRCH || err.Error() == "os: process already finished") { - return nil - } - } - return errors.Err("timeout reached") -}