diff --git a/go.mod b/go.mod index 9b0f980..669a9ec 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/kr/pretty v0.1.0 // indirect github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c - github.com/lbryio/lbry.go v1.0.7 + github.com/lbryio/lbry.go v1.0.9 github.com/lusis/slack-test v0.0.0-20190408224659-6cf59653add2 // indirect github.com/mitchellh/go-ps v0.0.0-20170309133038-4fdf99ab2936 github.com/mitchellh/mapstructure v1.1.2 // indirect diff --git a/go.sum b/go.sum index 8e2159e..6cb066f 100644 --- a/go.sum +++ b/go.sum @@ -127,10 +127,11 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c h1:BhdcWGsuKif/XoSZnqVGNqJ1iEmH0czWR5upj+AuR8M= github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c/go.mod h1:muH7wpUqE8hRA3OrYYosw9+Sl681BF9cwcjzE+OCNK8= -github.com/lbryio/lbry.go v1.0.7 h1:CO9wnH/grsrX1O3YpeSBpLVPtc3lOOVTvZCcPFZ2Os0= -github.com/lbryio/lbry.go v1.0.7/go.mod h1:JtyI30bU51rm0LZ/po3mQuzf++14OWb6kR/6mMRAmKU= +github.com/lbryio/lbry.go v1.0.9 h1:SAqopNiISazYuF38quA3WEfqOCzFtUed6QfCkAYQoJo= +github.com/lbryio/lbry.go v1.0.9/go.mod h1:JtyI30bU51rm0LZ/po3mQuzf++14OWb6kR/6mMRAmKU= github.com/lbryio/lbryschema.go v0.0.0-20190428231007-c54836bca002 h1:urfYK5ElpUrAv90auPLldoVC60LwiGAcY0OE6HJB9KI= github.com/lbryio/lbryschema.go v0.0.0-20190428231007-c54836bca002/go.mod h1:dAzPCBj3CKKWBGYBZxK6tKBP5SCgY2tqd9SnQd/OyKo= +github.com/lbryio/ozzo-validation v0.0.0-20170323141101-d1008ad1fd04 h1:Nze+C2HbeKvhjI/kVn+9Poj/UuEW5sOQxcsxqO7L3GI= github.com/lbryio/ozzo-validation v0.0.0-20170323141101-d1008ad1fd04/go.mod h1:fbG/dzobG8r95KzMwckXiLMHfFjZaBRQqC9hPs2XAQ4= github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c h1:m3O7561xBQ00lfUVayW4c6SnpVbUDQtPUwGcGYSUYQA= github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE= @@ -194,6 +195,7 @@ github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3/go.mod h1 github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/spf13/cast v1.2.0 h1:HHl1DSRbEQN2i8tJmtS6ViPyHx35+p51amrdsiTCrkg= github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg= github.com/spf13/cobra v0.0.0-20190109003409-7547e83b2d85 h1:UQHWkFUuJBy5rWN1DxosG/efssLu7u0fXXSTC2HHKfQ= github.com/spf13/cobra v0.0.0-20190109003409-7547e83b2d85/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= diff --git a/main.go b/main.go index 59a4166..ef0915a 100644 --- a/main.go +++ b/main.go @@ -36,6 +36,7 @@ var ( videosLimit int maxVideoSize int maxVideoLength float64 + removeDBUnpublished bool ) func main() { @@ -56,6 +57,7 @@ func main() { cmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup") cmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones") cmd.Flags().BoolVar(&singleRun, "run-once", false, "Whether the process should be stopped after one cycle or not") + cmd.Flags().BoolVar(&singleRun, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published") cmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update") cmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.") cmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)") @@ -188,6 +190,7 @@ func ytSync(cmd *cobra.Command, args []string) { syncProperties, apiConfig, maxVideoLength, + removeDBUnpublished, ) err := sm.Start() if err != nil { diff --git a/manager/manager.go b/manager/manager.go index b56c4f8..ed8c3cf 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -39,12 +39,13 @@ type SyncManager struct { singleRun bool syncProperties *sdk.SyncProperties apiConfig *sdk.APIConfig + removeDBUnpublished bool } func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int, skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int, maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string, - syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig, maxVideoLength float64) *SyncManager { + syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig, maxVideoLength float64, removeDBUnpublished bool) *SyncManager { return &SyncManager{ stopOnError: stopOnError, maxTries: maxTries, @@ -68,6 +69,7 @@ func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool singleRun: singleRun, syncProperties: syncProperties, apiConfig: apiConfig, + removeDBUnpublished: removeDBUnpublished, } } diff --git a/manager/setup.go b/manager/setup.go index 4f4adc8..be1d84d 100644 --- a/manager/setup.go +++ b/manager/setup.go @@ -355,34 +355,28 @@ func (s *Sync) ensureChannelOwnership() error { locations = []jsonrpc.Location{{Country: util.PtrToString(channelInfo.Country)}} } var c *jsonrpc.TransactionSummary + claimCreateOptions := jsonrpc.ClaimCreateOptions{ + Title: &channelInfo.Title, + Description: &channelInfo.Description, + Tags: tagsManager.GetTagsForChannel(s.YoutubeChannelID), + Languages: languages, + Locations: locations, + ThumbnailURL: &thumbnailURL, + } if channelUsesOldMetadata { c, err = s.daemon.ChannelUpdate(s.lbryChannelID, jsonrpc.ChannelUpdateOptions{ ClearTags: util.PtrToBool(true), ClearLocations: util.PtrToBool(true), ClearLanguages: util.PtrToBool(true), ChannelCreateOptions: jsonrpc.ChannelCreateOptions{ - ClaimCreateOptions: jsonrpc.ClaimCreateOptions{ - Title: channelInfo.Title, - Description: channelInfo.Description, - Tags: tagsManager.GetTagsForChannel(s.YoutubeChannelID), - Languages: languages, - Locations: locations, - ThumbnailURL: &thumbnailURL, - }, - CoverURL: bannerURL, + ClaimCreateOptions: claimCreateOptions, + CoverURL: bannerURL, }, }) } else { c, err = s.daemon.ChannelCreate(s.LbryChannelName, channelBidAmount, jsonrpc.ChannelCreateOptions{ - ClaimCreateOptions: jsonrpc.ClaimCreateOptions{ - Title: channelInfo.Title, - Description: channelInfo.Description, - Tags: tagsManager.GetTagsForChannel(s.YoutubeChannelID), - Languages: languages, - Locations: locations, - ThumbnailURL: &thumbnailURL, - }, - CoverURL: bannerURL, + ClaimCreateOptions: claimCreateOptions, + CoverURL: bannerURL, }) } diff --git a/manager/ytsync.go b/manager/ytsync.go index fa88307..1fe04d7 100644 --- a/manager/ytsync.go +++ b/manager/ytsync.go @@ -445,8 +445,9 @@ func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) { } //updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published -func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err error) { +func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total, fixed, removed int, err error) { count := 0 + videoIDMap := make(map[string]string, len(claims)) for _, c := range claims { if !isYtsyncClaim(c) { continue @@ -455,6 +456,7 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err //check if claimID is in remote db tn := c.Value.GetThumbnail().GetUrl() videoID := tn[strings.LastIndex(tn, "/")+1:] + videoIDMap[videoID] = c.ClaimID pv, ok := s.syncedVideos[videoID] if !ok || pv.ClaimName != c.Name { fixed++ @@ -464,13 +466,33 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err size = 0 } metadataVersion := uint(1) + claimIsUpgraded := strings.Contains(tn, thumbs.ThumbnailEndpoint) + if claimIsUpgraded { + metadataVersion = 2 + } err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", util.PtrToInt64(int64(size)), metadataVersion) if err != nil { - return count, fixed, err + return count, fixed, 0, err } } } - return count, fixed, nil + idsToRemove := make([]string, 0, len(videoIDMap)) + for vID, sv := range s.syncedVideos { + _, ok := videoIDMap[vID] + if !ok && sv.Published { + log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified", vID) + idsToRemove = append(idsToRemove, vID) + } + } + removeCount := 0 + if s.Manager.removeDBUnpublished { + err := s.Manager.apiConfig.DeleteVideos(idsToRemove) + if err != nil { + return count, fixed, 0, err + } + removeCount++ + } + return count, fixed, removeCount, nil } func (s *Sync) getClaims() ([]jsonrpc.Claim, error) { @@ -516,16 +538,21 @@ func (s *Sync) doSync() error { } } - pubsOnWallet, nFixed, err := s.updateRemoteDB(allClaims) + pubsOnWallet, nFixed, nRemoved, err := s.updateRemoteDB(allClaims) if err != nil { return errors.Prefix("error counting claims", err) } - if nFixed > 0 { + if nFixed > 0 || nRemoved > 0 { err := s.setStatusSyncing() if err != nil { return err } - SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed) + if nFixed > 0 { + SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed) + } + if nRemoved > 0 { + SendInfoToSlack("%d were marked as published but weren't actually published and thus removed from the database", nRemoved) + } } pubsOnDB := 0 for _, sv := range s.syncedVideos { @@ -648,6 +675,8 @@ func (s *Sync) startWorker(workerNum int) { SendErrorToSlack("failed to setup the wallet for a refill: %v", err) break } + } else if strings.Contains(err.Error(), "Error in daemon: 'str' object has no attribute 'get'") { + time.Sleep(5 * time.Second) } log.Println("Retrying") continue diff --git a/sdk/api.go b/sdk/api.go index 603a1f9..1c19974 100644 --- a/sdk/api.go +++ b/sdk/api.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/lbryio/lbry.go/extras/api" "github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/lbry.go/extras/null" @@ -167,6 +168,34 @@ const ( VideoStatusFailed = "failed" ) +func (a *APIConfig) DeleteVideos(videos []string) error { + endpoint := a.ApiURL + "/yt/video_status" + videoIDs := strings.Join(videos, ",") + vals := url.Values{ + "video_id": {videoIDs}, + "auth_token": {a.ApiToken}, + } + res, _ := http.PostForm(endpoint, vals) + defer res.Body.Close() + body, _ := ioutil.ReadAll(res.Body) + response := api.Response{} + err := json.Unmarshal(body, response) + if err != nil { + return errors.Err(err) + } + if response.Error != nil { + return errors.Err(response.Error) + } + str, ok := response.Data.(string) + if !ok { + return errors.Err("%x", response.Data) + } + if str != "ok" { + return errors.Err(str) + } + return nil +} + func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64, metadataVersion uint) error { endpoint := a.ApiURL + "/yt/video_status" diff --git a/sources/youtubeVideo.go b/sources/youtubeVideo.go index 5be0a5d..bb259d6 100644 --- a/sources/youtubeVideo.go +++ b/sources/youtubeVideo.go @@ -174,8 +174,11 @@ func (v *YoutubeVideo) fallbackDownload() error { cmd := exec.Command("youtube-dl", v.ID(), "--no-progress", - "-fbestvideo[ext=mp4,height<=1080,filesize<2000M]+bestaudio/best[ext=mp4,height<=1080,filesize<2000M]", - "-o"+strings.TrimRight(v.getFullPath(), ".mp4")) + "-fbestvideo[ext=mp4,height<=1080,filesize<2000M]+best[ext=mp4,height<=1080,filesize<2000M]", + "-o"+strings.TrimRight(v.getFullPath(), ".mp4"), + "--merge-output-format", + "mp4") + log.Printf("Running command and waiting for it to finish...") output, err := cmd.CombinedOutput() log.Debugln(string(output)) @@ -321,8 +324,8 @@ func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amou options := jsonrpc.StreamCreateOptions{ ClaimCreateOptions: jsonrpc.ClaimCreateOptions{ - Title: v.title, - Description: v.getAbbrevDescription(), + Title: &v.title, + Description: util.PtrToString(v.getAbbrevDescription()), ClaimAddress: &claimAddress, Languages: languages, ThumbnailURL: &v.thumbnailURL, @@ -463,6 +466,8 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis Author: util.PtrToString(""), License: util.PtrToString("Copyrighted (contact publisher)"), ChannelID: &v.lbryChannelID, + Height: util.PtrToUint(720), + Width: util.PtrToUint(1280), }, FileSize: &videoSize, }) @@ -487,8 +492,8 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis ClearTags: util.PtrToBool(true), StreamCreateOptions: &jsonrpc.StreamCreateOptions{ ClaimCreateOptions: jsonrpc.ClaimCreateOptions{ - Title: v.title, - Description: v.getAbbrevDescription(), + Title: &v.title, + Description: util.PtrToString(v.getAbbrevDescription()), Tags: tags, Languages: languages, Locations: locations, @@ -496,8 +501,8 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis }, Author: util.PtrToString(""), License: util.PtrToString("Copyrighted (contact publisher)"), - VideoHeight: util.PtrToUint(720), - VideoWidth: util.PtrToUint(1280), + Height: util.PtrToUint(720), + Width: util.PtrToUint(1280), ReleaseTime: util.PtrToInt64(v.publishedAt.Unix()), Duration: util.PtrToUint64(uint64(math.Ceil(videoDuration.ToDuration().Seconds()))), ChannelID: &v.lbryChannelID, diff --git a/tagsManager/tags_mapping.go b/tagsManager/tags_mapping.go index 79f45d5..adb1024 100644 --- a/tagsManager/tags_mapping.go +++ b/tagsManager/tags_mapping.go @@ -1,6 +1,7 @@ package tagsManager import ( + "sort" "strings" ) @@ -60,6 +61,8 @@ func SanitizeTags(tags []string, youtubeChannelID string) ([]string, error) { } } sanitizedTags := make([]string, 0, len(originalTags)+len(curatedTags)) + sort.Strings(curatedTags) + sort.Strings(originalTags) sanitizedTags = append(sanitizedTags, curatedTags...) sanitizedTags = append(sanitizedTags, originalTags...) return sanitizedTags, nil @@ -130,11 +133,17 @@ func (ts *tagsSanitizer) add() { const ( Lunduke = "UCkK9UDm_ZNrq_rIXCz3xCGA" SwissExperiments = "UCNQfQvFMPnInwsU_iGYArJQ" + Juggling = "UC2fhTIbnQlFYaFzyTcmPkXg" + JustJuggling = "UCftqelpjmbFrUwr3VVzzVwA" + JordanBPeterson = "UCL_f53ZEJxp8TtlOkHwMV9Q" ) var channelWideTags = map[string][]string{ Lunduke: {"linux", "technology"}, SwissExperiments: {"science & technology", "experiments", "switzerland"}, + Juggling: {"juggling", "circus arts", "malabares"}, + JustJuggling: {"juggling", "circus arts", "malabares"}, + JordanBPeterson: {"postmodernism", "psychology", "news"}, } var tagsToSkip = map[string]*struct{}{ "#hangoutsonair": nil,