add flag to remove unpublished videos

refactor code
add workaround for SDK bug
add code to remove unpublished videos
fix alt downloader to only produce mp4
add missing height and width
add tags sorting
add a few channel wide tags
This commit is contained in:
Niko Storni 2019-06-04 22:21:40 +02:00
parent 3389a8be93
commit cfe8ff0879
No known key found for this signature in database
GPG key ID: F37FE63398800368
9 changed files with 109 additions and 36 deletions

2
go.mod
View file

@ -15,7 +15,7 @@ require (
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c
github.com/lbryio/lbry.go v1.0.7
github.com/lbryio/lbry.go v1.0.9
github.com/lusis/slack-test v0.0.0-20190408224659-6cf59653add2 // indirect
github.com/mitchellh/go-ps v0.0.0-20170309133038-4fdf99ab2936
github.com/mitchellh/mapstructure v1.1.2 // indirect

6
go.sum
View file

@ -127,10 +127,11 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c h1:BhdcWGsuKif/XoSZnqVGNqJ1iEmH0czWR5upj+AuR8M=
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c/go.mod h1:muH7wpUqE8hRA3OrYYosw9+Sl681BF9cwcjzE+OCNK8=
github.com/lbryio/lbry.go v1.0.7 h1:CO9wnH/grsrX1O3YpeSBpLVPtc3lOOVTvZCcPFZ2Os0=
github.com/lbryio/lbry.go v1.0.7/go.mod h1:JtyI30bU51rm0LZ/po3mQuzf++14OWb6kR/6mMRAmKU=
github.com/lbryio/lbry.go v1.0.9 h1:SAqopNiISazYuF38quA3WEfqOCzFtUed6QfCkAYQoJo=
github.com/lbryio/lbry.go v1.0.9/go.mod h1:JtyI30bU51rm0LZ/po3mQuzf++14OWb6kR/6mMRAmKU=
github.com/lbryio/lbryschema.go v0.0.0-20190428231007-c54836bca002 h1:urfYK5ElpUrAv90auPLldoVC60LwiGAcY0OE6HJB9KI=
github.com/lbryio/lbryschema.go v0.0.0-20190428231007-c54836bca002/go.mod h1:dAzPCBj3CKKWBGYBZxK6tKBP5SCgY2tqd9SnQd/OyKo=
github.com/lbryio/ozzo-validation v0.0.0-20170323141101-d1008ad1fd04 h1:Nze+C2HbeKvhjI/kVn+9Poj/UuEW5sOQxcsxqO7L3GI=
github.com/lbryio/ozzo-validation v0.0.0-20170323141101-d1008ad1fd04/go.mod h1:fbG/dzobG8r95KzMwckXiLMHfFjZaBRQqC9hPs2XAQ4=
github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c h1:m3O7561xBQ00lfUVayW4c6SnpVbUDQtPUwGcGYSUYQA=
github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
@ -194,6 +195,7 @@ github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3/go.mod h1
github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/spf13/cast v1.2.0 h1:HHl1DSRbEQN2i8tJmtS6ViPyHx35+p51amrdsiTCrkg=
github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg=
github.com/spf13/cobra v0.0.0-20190109003409-7547e83b2d85 h1:UQHWkFUuJBy5rWN1DxosG/efssLu7u0fXXSTC2HHKfQ=
github.com/spf13/cobra v0.0.0-20190109003409-7547e83b2d85/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=

View file

@ -36,6 +36,7 @@ var (
videosLimit int
maxVideoSize int
maxVideoLength float64
removeDBUnpublished bool
)
func main() {
@ -56,6 +57,7 @@ func main() {
cmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
cmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
cmd.Flags().BoolVar(&singleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
cmd.Flags().BoolVar(&singleRun, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
cmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update")
cmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.")
cmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
@ -188,6 +190,7 @@ func ytSync(cmd *cobra.Command, args []string) {
syncProperties,
apiConfig,
maxVideoLength,
removeDBUnpublished,
)
err := sm.Start()
if err != nil {

View file

@ -39,12 +39,13 @@ type SyncManager struct {
singleRun bool
syncProperties *sdk.SyncProperties
apiConfig *sdk.APIConfig
removeDBUnpublished bool
}
func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int,
skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int,
maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string,
syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig, maxVideoLength float64) *SyncManager {
syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig, maxVideoLength float64, removeDBUnpublished bool) *SyncManager {
return &SyncManager{
stopOnError: stopOnError,
maxTries: maxTries,
@ -68,6 +69,7 @@ func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool
singleRun: singleRun,
syncProperties: syncProperties,
apiConfig: apiConfig,
removeDBUnpublished: removeDBUnpublished,
}
}

View file

@ -355,34 +355,28 @@ func (s *Sync) ensureChannelOwnership() error {
locations = []jsonrpc.Location{{Country: util.PtrToString(channelInfo.Country)}}
}
var c *jsonrpc.TransactionSummary
claimCreateOptions := jsonrpc.ClaimCreateOptions{
Title: &channelInfo.Title,
Description: &channelInfo.Description,
Tags: tagsManager.GetTagsForChannel(s.YoutubeChannelID),
Languages: languages,
Locations: locations,
ThumbnailURL: &thumbnailURL,
}
if channelUsesOldMetadata {
c, err = s.daemon.ChannelUpdate(s.lbryChannelID, jsonrpc.ChannelUpdateOptions{
ClearTags: util.PtrToBool(true),
ClearLocations: util.PtrToBool(true),
ClearLanguages: util.PtrToBool(true),
ChannelCreateOptions: jsonrpc.ChannelCreateOptions{
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
Title: channelInfo.Title,
Description: channelInfo.Description,
Tags: tagsManager.GetTagsForChannel(s.YoutubeChannelID),
Languages: languages,
Locations: locations,
ThumbnailURL: &thumbnailURL,
},
CoverURL: bannerURL,
ClaimCreateOptions: claimCreateOptions,
CoverURL: bannerURL,
},
})
} else {
c, err = s.daemon.ChannelCreate(s.LbryChannelName, channelBidAmount, jsonrpc.ChannelCreateOptions{
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
Title: channelInfo.Title,
Description: channelInfo.Description,
Tags: tagsManager.GetTagsForChannel(s.YoutubeChannelID),
Languages: languages,
Locations: locations,
ThumbnailURL: &thumbnailURL,
},
CoverURL: bannerURL,
ClaimCreateOptions: claimCreateOptions,
CoverURL: bannerURL,
})
}

View file

@ -445,8 +445,9 @@ func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
}
//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published
func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err error) {
func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total, fixed, removed int, err error) {
count := 0
videoIDMap := make(map[string]string, len(claims))
for _, c := range claims {
if !isYtsyncClaim(c) {
continue
@ -455,6 +456,7 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err
//check if claimID is in remote db
tn := c.Value.GetThumbnail().GetUrl()
videoID := tn[strings.LastIndex(tn, "/")+1:]
videoIDMap[videoID] = c.ClaimID
pv, ok := s.syncedVideos[videoID]
if !ok || pv.ClaimName != c.Name {
fixed++
@ -464,13 +466,33 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err
size = 0
}
metadataVersion := uint(1)
claimIsUpgraded := strings.Contains(tn, thumbs.ThumbnailEndpoint)
if claimIsUpgraded {
metadataVersion = 2
}
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", util.PtrToInt64(int64(size)), metadataVersion)
if err != nil {
return count, fixed, err
return count, fixed, 0, err
}
}
}
return count, fixed, nil
idsToRemove := make([]string, 0, len(videoIDMap))
for vID, sv := range s.syncedVideos {
_, ok := videoIDMap[vID]
if !ok && sv.Published {
log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified", vID)
idsToRemove = append(idsToRemove, vID)
}
}
removeCount := 0
if s.Manager.removeDBUnpublished {
err := s.Manager.apiConfig.DeleteVideos(idsToRemove)
if err != nil {
return count, fixed, 0, err
}
removeCount++
}
return count, fixed, removeCount, nil
}
func (s *Sync) getClaims() ([]jsonrpc.Claim, error) {
@ -516,16 +538,21 @@ func (s *Sync) doSync() error {
}
}
pubsOnWallet, nFixed, err := s.updateRemoteDB(allClaims)
pubsOnWallet, nFixed, nRemoved, err := s.updateRemoteDB(allClaims)
if err != nil {
return errors.Prefix("error counting claims", err)
}
if nFixed > 0 {
if nFixed > 0 || nRemoved > 0 {
err := s.setStatusSyncing()
if err != nil {
return err
}
SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed)
if nFixed > 0 {
SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed)
}
if nRemoved > 0 {
SendInfoToSlack("%d were marked as published but weren't actually published and thus removed from the database", nRemoved)
}
}
pubsOnDB := 0
for _, sv := range s.syncedVideos {
@ -648,6 +675,8 @@ func (s *Sync) startWorker(workerNum int) {
SendErrorToSlack("failed to setup the wallet for a refill: %v", err)
break
}
} else if strings.Contains(err.Error(), "Error in daemon: 'str' object has no attribute 'get'") {
time.Sleep(5 * time.Second)
}
log.Println("Retrying")
continue

View file

@ -11,6 +11,7 @@ import (
"strings"
"time"
"github.com/lbryio/lbry.go/extras/api"
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/null"
@ -167,6 +168,34 @@ const (
VideoStatusFailed = "failed"
)
func (a *APIConfig) DeleteVideos(videos []string) error {
endpoint := a.ApiURL + "/yt/video_status"
videoIDs := strings.Join(videos, ",")
vals := url.Values{
"video_id": {videoIDs},
"auth_token": {a.ApiToken},
}
res, _ := http.PostForm(endpoint, vals)
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
response := api.Response{}
err := json.Unmarshal(body, response)
if err != nil {
return errors.Err(err)
}
if response.Error != nil {
return errors.Err(response.Error)
}
str, ok := response.Data.(string)
if !ok {
return errors.Err("%x", response.Data)
}
if str != "ok" {
return errors.Err(str)
}
return nil
}
func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64, metadataVersion uint) error {
endpoint := a.ApiURL + "/yt/video_status"

View file

@ -174,8 +174,11 @@ func (v *YoutubeVideo) fallbackDownload() error {
cmd := exec.Command("youtube-dl",
v.ID(),
"--no-progress",
"-fbestvideo[ext=mp4,height<=1080,filesize<2000M]+bestaudio/best[ext=mp4,height<=1080,filesize<2000M]",
"-o"+strings.TrimRight(v.getFullPath(), ".mp4"))
"-fbestvideo[ext=mp4,height<=1080,filesize<2000M]+best[ext=mp4,height<=1080,filesize<2000M]",
"-o"+strings.TrimRight(v.getFullPath(), ".mp4"),
"--merge-output-format",
"mp4")
log.Printf("Running command and waiting for it to finish...")
output, err := cmd.CombinedOutput()
log.Debugln(string(output))
@ -321,8 +324,8 @@ func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amou
options := jsonrpc.StreamCreateOptions{
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
Title: v.title,
Description: v.getAbbrevDescription(),
Title: &v.title,
Description: util.PtrToString(v.getAbbrevDescription()),
ClaimAddress: &claimAddress,
Languages: languages,
ThumbnailURL: &v.thumbnailURL,
@ -463,6 +466,8 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis
Author: util.PtrToString(""),
License: util.PtrToString("Copyrighted (contact publisher)"),
ChannelID: &v.lbryChannelID,
Height: util.PtrToUint(720),
Width: util.PtrToUint(1280),
},
FileSize: &videoSize,
})
@ -487,8 +492,8 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis
ClearTags: util.PtrToBool(true),
StreamCreateOptions: &jsonrpc.StreamCreateOptions{
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
Title: v.title,
Description: v.getAbbrevDescription(),
Title: &v.title,
Description: util.PtrToString(v.getAbbrevDescription()),
Tags: tags,
Languages: languages,
Locations: locations,
@ -496,8 +501,8 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis
},
Author: util.PtrToString(""),
License: util.PtrToString("Copyrighted (contact publisher)"),
VideoHeight: util.PtrToUint(720),
VideoWidth: util.PtrToUint(1280),
Height: util.PtrToUint(720),
Width: util.PtrToUint(1280),
ReleaseTime: util.PtrToInt64(v.publishedAt.Unix()),
Duration: util.PtrToUint64(uint64(math.Ceil(videoDuration.ToDuration().Seconds()))),
ChannelID: &v.lbryChannelID,

View file

@ -1,6 +1,7 @@
package tagsManager
import (
"sort"
"strings"
)
@ -60,6 +61,8 @@ func SanitizeTags(tags []string, youtubeChannelID string) ([]string, error) {
}
}
sanitizedTags := make([]string, 0, len(originalTags)+len(curatedTags))
sort.Strings(curatedTags)
sort.Strings(originalTags)
sanitizedTags = append(sanitizedTags, curatedTags...)
sanitizedTags = append(sanitizedTags, originalTags...)
return sanitizedTags, nil
@ -130,11 +133,17 @@ func (ts *tagsSanitizer) add() {
const (
Lunduke = "UCkK9UDm_ZNrq_rIXCz3xCGA"
SwissExperiments = "UCNQfQvFMPnInwsU_iGYArJQ"
Juggling = "UC2fhTIbnQlFYaFzyTcmPkXg"
JustJuggling = "UCftqelpjmbFrUwr3VVzzVwA"
JordanBPeterson = "UCL_f53ZEJxp8TtlOkHwMV9Q"
)
var channelWideTags = map[string][]string{
Lunduke: {"linux", "technology"},
SwissExperiments: {"science & technology", "experiments", "switzerland"},
Juggling: {"juggling", "circus arts", "malabares"},
JustJuggling: {"juggling", "circus arts", "malabares"},
JordanBPeterson: {"postmodernism", "psychology", "news"},
}
var tagsToSkip = map[string]*struct{}{
"#hangoutsonair": nil,