Merge pull request #45 from lbryio/ytsync-refactor

Ytsync refactor
This commit is contained in:
Alex Grin 2018-10-08 15:44:58 -04:00 committed by GitHub
commit 7a6eb57280
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 460 additions and 390 deletions

56
Gopkg.lock generated
View file

@ -2,7 +2,7 @@
[[projects]] [[projects]]
digest = "1:90272eae3bf75d2aa681ff3eee6cf4f49e8b06db533dc9c830ef214e5abbaaf2" digest = "1:9a88883f474d09f1da61894cd8115c7f33988d6941e4f6236324c777aaff8f2c"
name = "github.com/PuerkitoBio/goquery" name = "github.com/PuerkitoBio/goquery"
packages = ["."] packages = ["."]
pruneopts = "" pruneopts = ""
@ -10,7 +10,7 @@
version = "v1.4.1" version = "v1.4.1"
[[projects]] [[projects]]
digest = "1:4d0cb5aec47a2aec8b8b211540dc59902f8f00d571281356597b41ae349f4885" digest = "1:e3726ad6f38f710e84c8dcd0e830014de6eaeea81f28d91ae898afecc078479a"
name = "github.com/andybalholm/cascadia" name = "github.com/andybalholm/cascadia"
packages = ["."] packages = ["."]
pruneopts = "" pruneopts = ""
@ -18,7 +18,7 @@
version = "v1.0.0" version = "v1.0.0"
[[projects]] [[projects]]
digest = "1:d4336a8cea03c9729c9be12c142938a28a0dbac23a316f019f6fee323cf03709" digest = "1:261d95f4464744d542759a7a33846f56f24113f5a93c7577f4cd7044f7cb3d76"
name = "github.com/aws/aws-sdk-go" name = "github.com/aws/aws-sdk-go"
packages = [ packages = [
"aws", "aws",
@ -55,7 +55,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:ea2251fa804d1b978feac8146d751b32ce2017eaf1f2915fde0df389bacaf383" digest = "1:cc8ebf0c6745d09f728f1fa4fbd29baaa2e3a65efb49b5fefb0c163171ee7863"
name = "github.com/btcsuite/btcd" name = "github.com/btcsuite/btcd"
packages = [ packages = [
"btcec", "btcec",
@ -78,7 +78,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:471ae435f9ad7fc2f6b7a2e91ca026a124792859a7033fa60579c3aa618161ed" digest = "1:b0f4d2431c167d7127a029210c1a7cdc33c9114c1b3fd3582347baad5e832588"
name = "github.com/btcsuite/btcutil" name = "github.com/btcsuite/btcutil"
packages = [ packages = [
".", ".",
@ -98,31 +98,20 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:57c911bbbf529465cf2ca5d43546cd5875a59054c41e2fe97791419959282aa1" digest = "1:dfc248d5e6e1582fdec83796d3d1d451aa6cae773c4e4ba1dac2838caef6d381"
name = "github.com/btcsuite/websocket" name = "github.com/btcsuite/websocket"
packages = ["."] packages = ["."]
pruneopts = "" pruneopts = ""
revision = "31079b6807923eb23992c421b114992b95131b55" revision = "31079b6807923eb23992c421b114992b95131b55"
[[projects]] [[projects]]
digest = "1:0a39ec8bf5629610a4bc7873a92039ee509246da3cef1a0ea60f1ed7e5f9cea5" digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b"
name = "github.com/davecgh/go-spew" name = "github.com/davecgh/go-spew"
packages = ["spew"] packages = ["spew"]
pruneopts = "" pruneopts = ""
revision = "346938d642f2ec3594ed81d874461961cd0faa76" revision = "346938d642f2ec3594ed81d874461961cd0faa76"
version = "v1.1.0" version = "v1.1.0"
[[projects]]
digest = "1:4b5f8c148e7fa094b73bcb6d16ea46eac7fdc726e55b81845ff96e29df534421"
name = "github.com/garyburd/redigo"
packages = [
"internal",
"redis",
]
pruneopts = ""
revision = "a69d19351219b6dd56f274f96d85a7014a2ec34e"
version = "v1.6.0"
[[projects]] [[projects]]
digest = "1:968d8903d598e3fae738325d3410f33f07ea6a2b9ee5591e9c262ee37df6845a" digest = "1:968d8903d598e3fae738325d3410f33f07ea6a2b9ee5591e9c262ee37df6845a"
name = "github.com/go-errors/errors" name = "github.com/go-errors/errors"
@ -133,14 +122,14 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:4d5221853226d8d4be594d52d885ddde38170d2e3159b82ed92ecde4dded2304" digest = "1:cd5bab9c9e23ffa6858eaa79dc827fd84bc24bc00b0cfb0b14036e393da2b1fa"
name = "github.com/go-ini/ini" name = "github.com/go-ini/ini"
packages = ["."] packages = ["."]
pruneopts = "" pruneopts = ""
revision = "5cf292cae48347c2490ac1a58fe36735fb78df7e" revision = "5cf292cae48347c2490ac1a58fe36735fb78df7e"
[[projects]] [[projects]]
digest = "1:b1d3041d568e065ab4d76f7477844458e9209c0bb241eaccdc0770bf0a13b120" digest = "1:f958a1c137db276e52f0b50efee41a1a389dcdded59a69711f3e872757dab34b"
name = "github.com/golang/protobuf" name = "github.com/golang/protobuf"
packages = ["proto"] packages = ["proto"]
pruneopts = "" pruneopts = ""
@ -148,7 +137,7 @@
version = "v1.1.0" version = "v1.1.0"
[[projects]] [[projects]]
digest = "1:fe1b4d4cbe48c0d55507c55f8663aa4185576cc58fa0c8be03bb8f19dfe17a9c" digest = "1:64d212c703a2b94054be0ce470303286b177ad260b2f89a307e3d1bb6c073ef6"
name = "github.com/gorilla/websocket" name = "github.com/gorilla/websocket"
packages = ["."] packages = ["."]
pruneopts = "" pruneopts = ""
@ -164,7 +153,7 @@
version = "v1.0" version = "v1.0"
[[projects]] [[projects]]
digest = "1:4f767a115bc8e08576f6d38ab73c376fc1b1cd3bb5041171c9e8668cc7739b52" digest = "1:6f49eae0c1e5dab1dafafee34b207aeb7a42303105960944828c2079b92fc88e"
name = "github.com/jmespath/go-jmespath" name = "github.com/jmespath/go-jmespath"
packages = ["."] packages = ["."]
pruneopts = "" pruneopts = ""
@ -172,14 +161,14 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:375104fd705791c50351e652a9d80321813fefc4f83a7871cb2f6111a5bc1dc3" digest = "1:d261f80387a38eeddc1d819ee9ee56d37ca10fc02e6e09ff400fb0ce146e13dc"
name = "github.com/lbryio/lbryschema.go" name = "github.com/lbryio/lbryschema.go"
packages = ["pb"] packages = ["pb"]
pruneopts = "" pruneopts = ""
revision = "185433f2fd0c732547654749b98b37e56223dd22" revision = "185433f2fd0c732547654749b98b37e56223dd22"
[[projects]] [[projects]]
digest = "1:daad05ffdae6e2cd9bd9bbc14440e7e8e841037141f26a775a5a31b1b61cb14d" digest = "1:5e30b8342813a6a85a647f9277e34ffcd5872dc57ab590dd9b251b145b6ec88f"
name = "github.com/lbryio/ozzo-validation" name = "github.com/lbryio/ozzo-validation"
packages = ["."] packages = ["."]
pruneopts = "" pruneopts = ""
@ -203,7 +192,7 @@
revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b" revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b"
[[projects]] [[projects]]
digest = "1:ba7d1dfde0c2142011332bffce4d8468310228afd49dd4425ac59fa9124fb7c4" digest = "1:3cb50c403fa46c85697dbc4e06a95008689e058f33466b7eb8d31ea0eb291ea3"
name = "github.com/nlopes/slack" name = "github.com/nlopes/slack"
packages = ["."] packages = ["."]
pruneopts = "" pruneopts = ""
@ -212,7 +201,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:1ee326e6da62d87b3a07a9303d2cbb70f974207d14e6d992c61b6e650ff37c50" digest = "1:8d6d81d0d9d8153e65d637bda77a7c4e6ba496c61efac3578d7d8c981ac31a7b"
name = "github.com/rylio/ytdl" name = "github.com/rylio/ytdl"
packages = ["."] packages = ["."]
pruneopts = "" pruneopts = ""
@ -228,7 +217,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:4cb1f758b69097d419a148e64c86b358a4f77c695504f99de1ee86617d64f74e" digest = "1:c92f01303e3ab3b5da92657841639cb53d1548f0d2733d12ef3b9fd9d47c869e"
name = "github.com/sirupsen/logrus" name = "github.com/sirupsen/logrus"
packages = ["."] packages = ["."]
pruneopts = "" pruneopts = ""
@ -244,7 +233,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:7bcdb212f21d3cf318699d50af69a9192ef73fedad0d94d9ed5616f349457881" digest = "1:bfbf4a9c265ef41f8d03c9d91e340aaddae835710eaed6cd2e6be889cbc05f56"
name = "github.com/spf13/cobra" name = "github.com/spf13/cobra"
packages = ["."] packages = ["."]
pruneopts = "" pruneopts = ""
@ -276,7 +265,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:e7dc596c84a908dd326d2c07453307f192976c3edb9482b88290c42076fe378f" digest = "1:8af4dda167d0ef21ab0affc797bff87ed0e87c57bd1d9bf57ad8f72d348c7932"
name = "golang.org/x/crypto" name = "golang.org/x/crypto"
packages = [ packages = [
"ripemd160", "ripemd160",
@ -288,7 +277,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:3edb9c19d0b874999053badbbcc08edab3cde0262d2beb36ad6c0d78391c19ac" digest = "1:5dc6753986b9eeba4abdf05dedc5ba06bb52dad43cc8aad35ffb42bb7adfa68f"
name = "golang.org/x/net" name = "golang.org/x/net"
packages = [ packages = [
"context", "context",
@ -301,7 +290,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:b1ac49fd3eae66e95230ea3423158b18374a5ad1c53caf89bc7fc1a441e9e0e7" digest = "1:baee54aa41cb93366e76a9c29f8dd2e4c4e6a35ff89551721d5275d2c858edc9"
name = "golang.org/x/sys" name = "golang.org/x/sys"
packages = [ packages = [
"unix", "unix",
@ -312,7 +301,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:b4f82373e582dff1470e66574ac664b548aa69cffce0943321797fd75f46ee6f" digest = "1:b064108d68f82d0201d9f812297c928e57488e82ccdb77ed06ac69f64519a890"
name = "google.golang.org/api" name = "google.golang.org/api"
packages = [ packages = [
"gensupport", "gensupport",
@ -325,7 +314,7 @@
revision = "ef86ce4234efee96020bde00391d6a9cfae66561" revision = "ef86ce4234efee96020bde00391d6a9cfae66561"
[[projects]] [[projects]]
digest = "1:05eca53b271663de74078b5484b1995a8d56668a51434a698dc5d0863035d575" digest = "1:f771bf87a3253de520c2af6fb6e75314dce0fedc0b30b208134fe502932bb15d"
name = "gopkg.in/nullbio/null.v6" name = "gopkg.in/nullbio/null.v6"
packages = ["convert"] packages = ["convert"]
pruneopts = "" pruneopts = ""
@ -348,7 +337,6 @@
"github.com/btcsuite/btcutil", "github.com/btcsuite/btcutil",
"github.com/btcsuite/btcutil/base58", "github.com/btcsuite/btcutil/base58",
"github.com/davecgh/go-spew/spew", "github.com/davecgh/go-spew/spew",
"github.com/garyburd/redigo/redis",
"github.com/go-errors/errors", "github.com/go-errors/errors",
"github.com/go-ini/ini", "github.com/go-ini/ini",
"github.com/lbryio/lbryschema.go/pb", "github.com/lbryio/lbryschema.go/pb",

View file

@ -2,10 +2,6 @@
name = "github.com/davecgh/go-spew" name = "github.com/davecgh/go-spew"
version = "1.1.0" version = "1.1.0"
[[constraint]]
name = "github.com/garyburd/redigo"
version = "1.1.0"
[[constraint]] [[constraint]]
name = "github.com/go-errors/errors" name = "github.com/go-errors/errors"
version = "1.0.0" version = "1.0.0"

View file

@ -2,6 +2,7 @@ package cmd
import ( import (
sync "github.com/lbryio/lbry.go/ytsync" sync "github.com/lbryio/lbry.go/ytsync"
"github.com/lbryio/lbry.go/ytsync/sdk"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -22,7 +23,9 @@ func ytcount(cmd *cobra.Command, args []string) {
channelID := args[1] channelID := args[1]
s := sync.Sync{ s := sync.Sync{
YoutubeAPIKey: ytAPIKey, APIConfig: &sdk.APIConfig{
YoutubeAPIKey: ytAPIKey,
},
YoutubeChannelID: channelID, YoutubeChannelID: channelID,
} }

View file

@ -9,6 +9,7 @@ import (
"github.com/lbryio/lbry.go/util" "github.com/lbryio/lbry.go/util"
sync "github.com/lbryio/lbry.go/ytsync" sync "github.com/lbryio/lbry.go/ytsync"
"github.com/lbryio/lbry.go/ytsync/sdk"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -141,35 +142,40 @@ func ytSync(cmd *cobra.Command, args []string) {
blobsDir = usr.HomeDir + "/.lbrynet/blobfiles/" blobsDir = usr.HomeDir + "/.lbrynet/blobfiles/"
} }
sm := sync.SyncManager{ syncProperties := &sdk.SyncProperties{
StopOnError: stopOnError, SyncFrom: syncFrom,
MaxTries: maxTries, SyncUntil: syncUntil,
TakeOverExistingChannel: takeOverExistingChannel, YoutubeChannelID: channelID,
Refill: refill,
Limit: limit,
SkipSpaceCheck: skipSpaceCheck,
SyncUpdate: syncUpdate,
SyncStatus: syncStatus,
SyncFrom: syncFrom,
SyncUntil: syncUntil,
ConcurrentJobs: concurrentJobs,
ConcurrentVideos: concurrentJobs,
HostName: hostname,
YoutubeChannelID: channelID,
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
BlobsDir: blobsDir,
VideosLimit: videosLimit,
MaxVideoSize: maxVideoSize,
LbrycrdString: lbrycrdString,
AwsS3ID: awsS3ID,
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
SingleRun: singleRun,
} }
apiConfig := &sdk.APIConfig{
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
HostName: hostname,
}
sm := sync.NewSyncManager(
stopOnError,
maxTries,
takeOverExistingChannel,
refill,
limit,
skipSpaceCheck,
syncUpdate,
concurrentJobs,
concurrentJobs,
blobsDir,
videosLimit,
maxVideoSize,
lbrycrdString,
awsS3ID,
awsS3Secret,
awsS3Region,
awsS3Bucket,
syncStatus,
singleRun,
syncProperties,
apiConfig,
)
err := sm.Start() err := sm.Start()
if err != nil { if err != nil {
sync.SendErrorToSlack(err.Error()) sync.SendErrorToSlack(err.Error())

View file

@ -11,7 +11,7 @@ import (
func (s *Sync) CountVideos() (uint64, error) { func (s *Sync) CountVideos() (uint64, error) {
client := &http.Client{ client := &http.Client{
Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
} }
service, err := youtube.New(client) service, err := youtube.New(client)

View file

@ -1,49 +1,71 @@
package ytsync package ytsync
import ( import (
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings" "strings"
"syscall" "syscall"
"time" "time"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/null"
"github.com/lbryio/lbry.go/util" "github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/namer"
"github.com/lbryio/lbry.go/ytsync/sdk"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
type SyncManager struct { type SyncManager struct {
StopOnError bool stopOnError bool
MaxTries int maxTries int
TakeOverExistingChannel bool takeOverExistingChannel bool
Refill int refill int
Limit int limit int
SkipSpaceCheck bool skipSpaceCheck bool
SyncUpdate bool syncUpdate bool
SyncStatus string concurrentJobs int
SyncFrom int64 concurrentVideos int
SyncUntil int64 blobsDir string
ConcurrentJobs int videosLimit int
ConcurrentVideos int maxVideoSize int
HostName string lbrycrdString string
YoutubeChannelID string awsS3ID string
YoutubeAPIKey string awsS3Secret string
ApiURL string awsS3Region string
ApiToken string syncStatus string
BlobsDir string awsS3Bucket string
VideosLimit int singleRun bool
MaxVideoSize int syncProperties *sdk.SyncProperties
LbrycrdString string apiConfig *sdk.APIConfig
AwsS3ID string namer *namer.Namer
AwsS3Secret string }
AwsS3Region string
AwsS3Bucket string func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int,
SingleRun bool skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int,
maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string,
syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig) *SyncManager {
return &SyncManager{
stopOnError: stopOnError,
maxTries: maxTries,
takeOverExistingChannel: takeOverExistingChannel,
refill: refill,
limit: limit,
skipSpaceCheck: skipSpaceCheck,
syncUpdate: syncUpdate,
concurrentJobs: concurrentJobs,
concurrentVideos: concurrentVideos,
blobsDir: blobsDir,
videosLimit: videosLimit,
maxVideoSize: maxVideoSize,
lbrycrdString: lbrycrdString,
awsS3ID: awsS3ID,
awsS3Secret: awsS3Secret,
awsS3Region: awsS3Region,
awsS3Bucket: awsS3Bucket,
syncStatus: syncStatus,
singleRun: singleRun,
syncProperties: syncProperties,
apiConfig: apiConfig,
namer: namer.NewNamer(),
}
} }
const ( const (
@ -57,143 +79,13 @@ const (
var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized} var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized}
type apiJobsResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []apiYoutubeChannel `json:"data"`
}
type apiYoutubeChannel struct {
ChannelId string `json:"channel_id"`
TotalVideos uint `json:"total_videos"`
DesiredChannelName string `json:"desired_channel_name"`
SyncServer null.String `json:"sync_server"`
}
func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
endpoint := s.ApiURL + "/yt/jobs"
res, _ := http.PostForm(endpoint, url.Values{
"auth_token": {s.ApiToken},
"sync_status": {status},
"min_videos": {strconv.Itoa(1)},
"after": {strconv.Itoa(int(s.SyncFrom))},
"before": {strconv.Itoa(int(s.SyncUntil))},
"sync_server": {s.HostName},
"channel_id": {s.YoutubeChannelID},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiJobsResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, err
}
if response.Data == nil {
return nil, errors.Err(response.Error)
}
log.Printf("Fetched channels: %d", len(response.Data))
return response.Data, nil
}
type apiChannelStatusResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []syncedVideo `json:"data"`
}
type syncedVideo struct {
VideoID string `json:"video_id"`
Published bool `json:"published"`
FailureReason string `json:"failure_reason"`
ClaimName string `json:"claim_name"`
}
func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) {
endpoint := s.ApiURL + "/yt/channel_status"
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength]
}
res, _ := http.PostForm(endpoint, url.Values{
"channel_id": {channelID},
"sync_server": {s.HostName},
"auth_token": {s.ApiToken},
"sync_status": {status},
"failure_reason": {failureReason},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiChannelStatusResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, nil, err
}
if !response.Error.IsNull() {
return nil, nil, errors.Err(response.Error.String)
}
if response.Data != nil {
svs := make(map[string]syncedVideo)
claimNames := make(map[string]bool)
for _, v := range response.Data {
svs[v.VideoID] = v
claimNames[v.ClaimName] = v.Published
}
return svs, claimNames, nil
}
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
const ( const (
VideoStatusPublished = "published" VideoStatusPublished = "published"
VideoStatusFailed = "failed" VideoStatusFailed = "failed"
) )
func (s *SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
endpoint := s.ApiURL + "/yt/video_status"
if len(failureReason) > maxReasonLength {
failureReason = failureReason[:maxReasonLength]
}
vals := url.Values{
"youtube_channel_id": {channelID},
"video_id": {videoID},
"status": {status},
"auth_token": {s.ApiToken},
}
if status == VideoStatusPublished {
if claimID == "" || claimName == "" {
return errors.Err("claimID or claimName missing")
}
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
vals.Add("claim_id", claimID)
vals.Add("claim_name", claimName)
if size != nil {
vals.Add("size", strconv.FormatInt(*size, 10))
}
}
if failureReason != "" {
vals.Add("failure_reason", failureReason)
}
res, _ := http.PostForm(endpoint, vals)
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data null.String `json:"data"`
}
err := json.Unmarshal(body, &response)
if err != nil {
return err
}
if !response.Error.IsNull() {
return errors.Err(response.Error.String)
}
if !response.Data.IsNull() && response.Data.String == "ok" {
return nil
}
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
func (s *SyncManager) Start() error { func (s *SyncManager) Start() error {
syncCount := 0 syncCount := 0
for { for {
err := s.checkUsedSpace() err := s.checkUsedSpace()
@ -204,9 +96,9 @@ func (s *SyncManager) Start() error {
var syncs []Sync var syncs []Sync
shouldInterruptLoop := false shouldInterruptLoop := false
isSingleChannelSync := s.YoutubeChannelID != "" isSingleChannelSync := s.syncProperties.YoutubeChannelID != ""
if isSingleChannelSync { if isSingleChannelSync {
channels, err := s.fetchChannels("") channels, err := s.apiConfig.FetchChannels("", s.syncProperties)
if err != nil { if err != nil {
return err return err
} }
@ -216,52 +108,53 @@ func (s *SyncManager) Start() error {
lbryChannelName := channels[0].DesiredChannelName lbryChannelName := channels[0].DesiredChannelName
syncs = make([]Sync, 1) syncs = make([]Sync, 1)
syncs[0] = Sync{ syncs[0] = Sync{
YoutubeAPIKey: s.YoutubeAPIKey, APIConfig: s.apiConfig,
YoutubeChannelID: s.YoutubeChannelID, YoutubeChannelID: s.syncProperties.YoutubeChannelID,
LbryChannelName: lbryChannelName, LbryChannelName: lbryChannelName,
StopOnError: s.StopOnError, StopOnError: s.stopOnError,
MaxTries: s.MaxTries, MaxTries: s.maxTries,
ConcurrentVideos: s.ConcurrentVideos, ConcurrentVideos: s.concurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel, TakeOverExistingChannel: s.takeOverExistingChannel,
Refill: s.Refill, Refill: s.refill,
Manager: s, Manager: s,
LbrycrdString: s.LbrycrdString, LbrycrdString: s.lbrycrdString,
AwsS3ID: s.AwsS3ID, AwsS3ID: s.awsS3ID,
AwsS3Secret: s.AwsS3Secret, AwsS3Secret: s.awsS3Secret,
AwsS3Region: s.AwsS3Region, AwsS3Region: s.awsS3Region,
AwsS3Bucket: s.AwsS3Bucket, AwsS3Bucket: s.awsS3Bucket,
namer: s.namer,
} }
shouldInterruptLoop = true shouldInterruptLoop = true
} else { } else {
var queuesToSync []string var queuesToSync []string
if s.SyncStatus != "" { if s.syncStatus != "" {
queuesToSync = append(queuesToSync, s.SyncStatus) queuesToSync = append(queuesToSync, s.syncStatus)
} else if s.SyncUpdate { } else if s.syncUpdate {
queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced) queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced)
} else { } else {
queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued) queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued)
} }
for _, q := range queuesToSync { for _, q := range queuesToSync {
channels, err := s.fetchChannels(q) channels, err := s.apiConfig.FetchChannels(q, s.syncProperties)
if err != nil { if err != nil {
return err return err
} }
for _, c := range channels { for _, c := range channels {
syncs = append(syncs, Sync{ syncs = append(syncs, Sync{
YoutubeAPIKey: s.YoutubeAPIKey, APIConfig: s.apiConfig,
YoutubeChannelID: c.ChannelId, YoutubeChannelID: c.ChannelId,
LbryChannelName: c.DesiredChannelName, LbryChannelName: c.DesiredChannelName,
StopOnError: s.StopOnError, StopOnError: s.stopOnError,
MaxTries: s.MaxTries, MaxTries: s.maxTries,
ConcurrentVideos: s.ConcurrentVideos, ConcurrentVideos: s.concurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel, TakeOverExistingChannel: s.takeOverExistingChannel,
Refill: s.Refill, Refill: s.refill,
Manager: s, Manager: s,
LbrycrdString: s.LbrycrdString, LbrycrdString: s.lbrycrdString,
AwsS3ID: s.AwsS3ID, AwsS3ID: s.awsS3ID,
AwsS3Secret: s.AwsS3Secret, AwsS3Secret: s.awsS3Secret,
AwsS3Region: s.AwsS3Region, AwsS3Region: s.awsS3Region,
AwsS3Bucket: s.AwsS3Bucket, AwsS3Bucket: s.awsS3Bucket,
}) })
} }
} }
@ -294,12 +187,12 @@ func (s *SyncManager) Start() error {
if !shouldNotCount { if !shouldNotCount {
syncCount++ syncCount++
} }
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) { if sync.IsInterrupted() || (s.limit != 0 && syncCount >= s.limit) {
shouldInterruptLoop = true shouldInterruptLoop = true
break break
} }
} }
if shouldInterruptLoop || s.SingleRun { if shouldInterruptLoop || s.singleRun {
break break
} }
} }
@ -307,11 +200,11 @@ func (s *SyncManager) Start() error {
} }
func (s *SyncManager) checkUsedSpace() error { func (s *SyncManager) checkUsedSpace() error {
usedPctile, err := GetUsedSpace(s.BlobsDir) usedPctile, err := GetUsedSpace(s.blobsDir)
if err != nil { if err != nil {
return err return err
} }
if usedPctile >= 0.90 && !s.SkipSpaceCheck { if usedPctile >= 0.90 && !s.skipSpaceCheck {
return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)) return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100))
} }
log.Infof("disk usage: %.1f%%", usedPctile*100) log.Infof("disk usage: %.1f%%", usedPctile*100)

83
ytsync/namer/names.go Normal file
View file

@ -0,0 +1,83 @@
package namer
import (
"crypto/md5"
"encoding/hex"
"fmt"
"regexp"
"strconv"
"strings"
"sync"
)
var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`)
type Namer struct {
mu *sync.Mutex
names map[string]bool
}
func NewNamer() *Namer {
return &Namer{
mu: &sync.Mutex{},
names: make(map[string]bool),
}
}
func (n *Namer) SetNames(names map[string]bool) {
n.names = names
}
func (n *Namer) GetNextName(prefix string) string {
n.mu.Lock()
defer n.mu.Unlock()
attempt := 1
var name string
for {
name = getClaimNameFromTitle(prefix, attempt)
if _, exists := n.names[name]; !exists {
break
}
attempt++
}
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
if len(name) < 2 {
sum := md5.Sum([]byte(prefix))
name = fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:])[:15], attempt)
}
n.names[name] = true
return name
}
// TODO: clean this up some
func getClaimNameFromTitle(title string, attempt int) string {
suffix := ""
if attempt > 1 {
suffix = "-" + strconv.Itoa(attempt)
}
maxLen := 40 - len(suffix)
chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-")
name := chunks[0]
if len(name) > maxLen {
return name[:maxLen]
}
for _, chunk := range chunks[1:] {
tmpName := name + "-" + chunk
if len(tmpName) > maxLen {
if len(name) < 20 {
name = tmpName[:maxLen]
}
break
}
name = tmpName
}
return name + suffix
}

170
ytsync/sdk/api.go Normal file
View file

@ -0,0 +1,170 @@
package sdk
import (
"encoding/json"
"io/ioutil"
"log"
"net/http"
"net/url"
"strconv"
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/null"
)
const (
MaxReasonLength = 500
)
type APIConfig struct {
YoutubeAPIKey string
ApiURL string
ApiToken string
HostName string
}
type SyncProperties struct {
SyncFrom int64
SyncUntil int64
YoutubeChannelID string
}
type YoutubeChannel struct {
ChannelId string `json:"channel_id"`
TotalVideos uint `json:"total_videos"`
DesiredChannelName string `json:"desired_channel_name"`
SyncServer null.String `json:"sync_server"`
Fee *struct {
Amount string `json:"amount"`
Address string `json:"address"`
Currency string `json:"currency"`
} `json:"fee"`
}
func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeChannel, error) {
type apiJobsResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []YoutubeChannel `json:"data"`
}
endpoint := a.ApiURL + "/yt/jobs"
res, _ := http.PostForm(endpoint, url.Values{
"auth_token": {a.ApiToken},
"sync_status": {status},
"min_videos": {strconv.Itoa(1)},
"after": {strconv.Itoa(int(cp.SyncFrom))},
"before": {strconv.Itoa(int(cp.SyncUntil))},
"sync_server": {a.HostName},
"channel_id": {cp.YoutubeChannelID},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiJobsResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, err
}
if response.Data == nil {
return nil, errors.Err(response.Error)
}
log.Printf("Fetched channels: %d", len(response.Data))
return response.Data, nil
}
type SyncedVideo struct {
VideoID string `json:"video_id"`
Published bool `json:"published"`
FailureReason string `json:"failure_reason"`
ClaimName string `json:"claim_name"`
}
func (a *APIConfig) SetChannelStatus(channelID string, status string, failureReason string) (map[string]SyncedVideo, map[string]bool, error) {
type apiChannelStatusResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []SyncedVideo `json:"data"`
}
endpoint := a.ApiURL + "/yt/channel_status"
if len(failureReason) > MaxReasonLength {
failureReason = failureReason[:MaxReasonLength]
}
res, _ := http.PostForm(endpoint, url.Values{
"channel_id": {channelID},
"sync_server": {a.HostName},
"auth_token": {a.ApiToken},
"sync_status": {status},
"failure_reason": {failureReason},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiChannelStatusResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, nil, err
}
if !response.Error.IsNull() {
return nil, nil, errors.Err(response.Error.String)
}
if response.Data != nil {
svs := make(map[string]SyncedVideo)
claimNames := make(map[string]bool)
for _, v := range response.Data {
svs[v.VideoID] = v
claimNames[v.ClaimName] = v.Published
}
return svs, claimNames, nil
}
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
const (
VideoStatusPublished = "published"
VideoStatusFailed = "failed"
)
func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
endpoint := a.ApiURL + "/yt/video_status"
if len(failureReason) > MaxReasonLength {
failureReason = failureReason[:MaxReasonLength]
}
vals := url.Values{
"youtube_channel_id": {channelID},
"video_id": {videoID},
"status": {status},
"auth_token": {a.ApiToken},
}
if status == VideoStatusPublished {
if claimID == "" || claimName == "" {
return errors.Err("claimID or claimName missing")
}
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
vals.Add("claim_id", claimID)
vals.Add("claim_name", claimName)
if size != nil {
vals.Add("size", strconv.FormatInt(*size, 10))
}
}
if failureReason != "" {
vals.Add("failure_reason", failureReason)
}
res, _ := http.PostForm(endpoint, vals)
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data null.String `json:"data"`
}
err := json.Unmarshal(body, &response)
if err != nil {
return err
}
if !response.Error.IsNull() {
return errors.Err(response.Error.String)
}
if !response.Data.IsNull() && response.Data.String == "ok" {
return nil
}
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
}

View file

@ -50,8 +50,8 @@ func (s *Sync) walletSetup() error {
s.syncedVideosMux.RUnlock() s.syncedVideosMux.RUnlock()
log.Debugf("We already allocated credits for %d videos", numPublished) log.Debugf("We already allocated credits for %d videos", numPublished)
if numOnSource-numPublished > s.Manager.VideosLimit { if numOnSource-numPublished > s.Manager.videosLimit {
numOnSource = s.Manager.VideosLimit numOnSource = s.Manager.videosLimit
} }
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount

View file

@ -1,90 +1,27 @@
package sources package sources
import ( import (
"fmt"
"regexp"
"strconv"
"strings" "strings"
"sync"
"crypto/md5"
"encoding/hex"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
log "github.com/sirupsen/logrus" "github.com/lbryio/lbry.go/ytsync/namer"
) )
var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`)
type SyncSummary struct { type SyncSummary struct {
ClaimID string ClaimID string
ClaimName string ClaimName string
} }
func getClaimNameFromTitle(title string, attempt int) string { func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, namer *namer.Namer) (*SyncSummary, error) {
suffix := ""
if attempt > 1 {
suffix = "-" + strconv.Itoa(attempt)
}
maxLen := 40 - len(suffix)
chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-")
name := chunks[0]
if len(name) > maxLen {
return name[:maxLen]
}
for _, chunk := range chunks[1:] {
tmpName := name + "-" + chunk
if len(tmpName) > maxLen {
if len(name) < 20 {
name = tmpName[:maxLen]
}
break
}
name = tmpName
}
return name + suffix
}
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
attempt := 0
for { for {
attempt++ name := namer.GetNextName(title)
name := getClaimNameFromTitle(title, attempt)
syncedVideosMux.Lock()
_, exists := claimNames[name]
if exists {
log.Printf("name exists, retrying (%d attempts so far)", attempt)
syncedVideosMux.Unlock()
continue
}
claimNames[name] = false
syncedVideosMux.Unlock()
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
if len(name) < 2 {
hasher := md5.New()
hasher.Write([]byte(title))
name = fmt.Sprintf("%s-%d", hex.EncodeToString(hasher.Sum(nil))[:15], attempt)
}
response, err := daemon.Publish(name, filename, amount, options) response, err := daemon.Publish(name, filename, amount, options)
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") { if err != nil {
syncedVideosMux.Lock() if strings.Contains(err.Error(), "failed: Multiple claims (") {
claimNames[name] = true
syncedVideosMux.Unlock()
if err == nil {
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
} else {
log.Printf("name exists, retrying (%d attempts so far)", attempt)
continue continue
} }
} else {
return nil, err return nil, err
} }
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
} }
} }

View file

@ -6,15 +6,15 @@ import (
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"time"
"sync" "sync"
"time"
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/ytsync/namer"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
@ -174,7 +174,7 @@ func (v *ucbVideo) saveThumbnail() error {
return err return err
} }
func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) {
options := jsonrpc.PublishOptions{ options := jsonrpc.PublishOptions{
Title: &v.title, Title: &v.title,
Author: strPtr("UC Berkeley"), Author: strPtr("UC Berkeley"),
@ -187,16 +187,14 @@ func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount f
ChangeAddress: &claimAddress, ChangeAddress: &claimAddress,
} }
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux) return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer)
} }
func (v *ucbVideo) Size() *int64 { func (v *ucbVideo) Size() *int64 {
return nil return nil
} }
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer) (*SyncSummary, error) {
v.claimNames = claimNames
v.syncedVideosMux = syncedVideosMux
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {
@ -210,7 +208,7 @@ func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount floa
//} //}
//log.Debugln("Created thumbnail for " + v.id) //log.Debugln("Created thumbnail for " + v.id)
summary, err := v.publish(daemon, claimAddress, amount, channelID) summary, err := v.publish(daemon, claimAddress, amount, channelID, namer)
if err != nil { if err != nil {
return nil, errors.Prefix("publish error", err) return nil, errors.Prefix("publish error", err)
} }

View file

@ -9,12 +9,12 @@ import (
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"time"
"sync" "sync"
"time"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/ytsync/namer"
"github.com/rylio/ytdl" "github.com/rylio/ytdl"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -155,6 +155,8 @@ func (v *YoutubeVideo) download() error {
err = videoInfo.Download(formats[formatIndex], downloadedFile) err = videoInfo.Download(formats[formatIndex], downloadedFile)
downloadedFile.Close() downloadedFile.Close()
if err != nil { if err != nil {
//delete the video and ignore the error
_ = v.delete()
break break
} }
fi, err := os.Stat(v.getFilename()) fi, err := os.Stat(v.getFilename())
@ -172,7 +174,6 @@ func (v *YoutubeVideo) download() error {
break break
} }
} }
return err return err
} }
@ -234,7 +235,7 @@ func (v *YoutubeVideo) triggerThumbnailSave() error {
func strPtr(s string) *string { return &s } func strPtr(s string) *string { return &s }
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) { func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) {
if channelID == "" { if channelID == "" {
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed? return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
} }
@ -249,18 +250,16 @@ func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amou
ChangeAddress: &claimAddress, ChangeAddress: &claimAddress,
ChannelID: &channelID, ChannelID: &channelID,
} }
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer)
} }
func (v *YoutubeVideo) Size() *int64 { func (v *YoutubeVideo) Size() *int64 {
return v.size return v.size
} }
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer) (*SyncSummary, error) {
v.claimNames = claimNames
v.syncedVideosMux = syncedVideosMux
v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024 v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {
@ -274,14 +273,11 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount
} }
log.Debugln("Created thumbnail for " + v.id) log.Debugln("Created thumbnail for " + v.id)
summary, err := v.publish(daemon, claimAddress, amount, channelID) summary, err := v.publish(daemon, claimAddress, amount, channelID, namer)
//delete the video in all cases (and ignore the error) //delete the video in all cases (and ignore the error)
_ = v.delete() _ = v.delete()
if err != nil {
return nil, errors.Prefix("publish error", err)
}
return summary, nil return summary, errors.Prefix("publish error", err)
} }
// sorting videos // sorting videos

View file

@ -17,17 +17,20 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/namer"
"github.com/lbryio/lbry.go/ytsync/sdk"
"github.com/lbryio/lbry.go/ytsync/sources"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/sources"
"github.com/mitchellh/go-ps" "github.com/mitchellh/go-ps"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport" "google.golang.org/api/googleapi/transport"
@ -46,7 +49,7 @@ type video interface {
IDAndNum() string IDAndNum() string
PlaylistPosition() int PlaylistPosition() int
PublishedAt() time.Time PublishedAt() time.Time
Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error) Sync(*jsonrpc.Client, string, float64, string, int, *namer.Namer) (*sources.SyncSummary, error)
} }
// sorting videos // sorting videos
@ -58,7 +61,7 @@ func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[
// Sync stores the options that control how syncing happens // Sync stores the options that control how syncing happens
type Sync struct { type Sync struct {
YoutubeAPIKey string APIConfig *sdk.APIConfig
YoutubeChannelID string YoutubeChannelID string
LbryChannelName string LbryChannelName string
StopOnError bool StopOnError bool
@ -77,10 +80,10 @@ type Sync struct {
claimAddress string claimAddress string
videoDirectory string videoDirectory string
syncedVideosMux *sync.RWMutex syncedVideosMux *sync.RWMutex
syncedVideos map[string]syncedVideo syncedVideos map[string]sdk.SyncedVideo
claimNames map[string]bool
grp *stop.Group grp *stop.Group
lbryChannelID string lbryChannelID string
namer *namer.Namer
walletMux *sync.Mutex walletMux *sync.Mutex
queue chan video queue chan video
@ -89,14 +92,11 @@ type Sync struct {
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) { func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
s.syncedVideosMux.Lock() s.syncedVideosMux.Lock()
defer s.syncedVideosMux.Unlock() defer s.syncedVideosMux.Unlock()
s.syncedVideos[videoID] = syncedVideo{ s.syncedVideos[videoID] = sdk.SyncedVideo{
VideoID: videoID, VideoID: videoID,
Published: published, Published: published,
FailureReason: failureReason, FailureReason: failureReason,
} }
if claimName != "" {
s.claimNames[claimName] = true
}
} }
// SendErrorToSlack Sends an error message to the default channel and to the process log. // SendErrorToSlack Sends an error message to the default channel and to the process log.
@ -221,13 +221,13 @@ func (s *Sync) uploadWallet() error {
} }
func (s *Sync) setStatusSyncing() error { func (s *Sync) setStatusSyncing() error {
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") syncedVideos, claimNames, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
if err != nil { if err != nil {
return err return err
} }
s.syncedVideosMux.Lock() s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos s.syncedVideos = syncedVideos
s.claimNames = claimNames s.Manager.namer.SetNames(claimNames)
s.syncedVideosMux.Unlock() s.syncedVideosMux.Unlock()
return nil return nil
} }
@ -260,7 +260,7 @@ func (s *Sync) FullCycle() (e error) {
err = s.downloadWallet() err = s.downloadWallet()
if err != nil && err.Error() != "wallet not on S3" { if err != nil && err.Error() != "wallet not on S3" {
return errors.Prefix("failure in downloading wallet: ", err) return errors.Prefix("failure in downloading wallet", err)
} else if err == nil { } else if err == nil {
log.Println("Continuing previous upload") log.Println("Continuing previous upload")
} else { } else {
@ -311,14 +311,13 @@ func (s *Sync) setChannelTerminationStatus(e *error) {
return return
} }
failureReason := (*e).Error() failureReason := (*e).Error()
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason) _, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) msg := fmt.Sprintf("Failed setting failed state for channel %s", s.LbryChannelName)
err = errors.Prefix(msg, err) *e = errors.Prefix(msg+err.Error(), *e)
*e = errors.Prefix(err.Error(), *e)
} }
} else if !s.IsInterrupted() { } else if !s.IsInterrupted() {
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "") _, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSynced, "")
if err != nil { if err != nil {
*e = err *e = err
} }
@ -359,7 +358,7 @@ func (s *Sync) stopAndUploadWallet(e *error) {
e = &err //not 100% sure e = &err //not 100% sure
return return
} else { } else {
*e = errors.Prefix("failure uploading wallet: ", *e) *e = errors.Prefix("failure uploading wallet", *e)
} }
} }
} }
@ -424,7 +423,7 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err
pv, ok := s.syncedVideos[videoID] pv, ok := s.syncedVideos[videoID]
if !ok || pv.ClaimName != c.Name { if !ok || pv.ClaimName != c.Name {
fixed++ fixed++
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil) err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
if err != nil { if err != nil {
return total, fixed, err return total, fixed, err
} }
@ -438,11 +437,11 @@ func (s *Sync) doSync() error {
var err error var err error
claims, err := s.daemon.ClaimListMine() claims, err := s.daemon.ClaimListMine()
if err != nil { if err != nil {
return errors.Prefix("cannot list claims: ", err) return errors.Prefix("cannot list claims", err)
} }
hasDupes, err := s.fixDupes(*claims) hasDupes, err := s.fixDupes(*claims)
if err != nil { if err != nil {
return errors.Prefix("error checking for duplicates: ", err) return errors.Prefix("error checking for duplicates", err)
} }
if hasDupes { if hasDupes {
SendInfoToSlack("Channel had dupes and was fixed!") SendInfoToSlack("Channel had dupes and was fixed!")
@ -452,13 +451,13 @@ func (s *Sync) doSync() error {
} }
claims, err = s.daemon.ClaimListMine() claims, err = s.daemon.ClaimListMine()
if err != nil { if err != nil {
return errors.Prefix("cannot list claims: ", err) return errors.Prefix("cannot list claims", err)
} }
} }
pubsOnWallet, nFixed, err := s.updateRemoteDB(*claims) pubsOnWallet, nFixed, err := s.updateRemoteDB(*claims)
if err != nil { if err != nil {
return errors.Prefix("error counting claims: ", err) return errors.Prefix("error counting claims", err)
} }
if nFixed > 0 { if nFixed > 0 {
err := s.setStatusSyncing() err := s.setStatusSyncing()
@ -466,7 +465,6 @@ func (s *Sync) doSync() error {
return err return err
} }
SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed) SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed)
} }
pubsOnDB := 0 pubsOnDB := 0
for _, sv := range s.syncedVideos { for _, sv := range s.syncedVideos {
@ -593,7 +591,7 @@ func (s *Sync) startWorker(workerNum int) {
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
} }
s.AppendSyncedVideo(v.ID(), false, err.Error(), "") s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size()) err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
if err != nil { if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
} }
@ -605,7 +603,7 @@ func (s *Sync) startWorker(workerNum int) {
func (s *Sync) enqueueYoutubeVideos() error { func (s *Sync) enqueueYoutubeVideos() error {
client := &http.Client{ client := &http.Client{
Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
} }
service, err := youtube.New(client) service, err := youtube.New(client)
@ -770,7 +768,7 @@ func (s *Sync) processVideo(v video) (err error) {
return nil return nil
} }
if v.PlaylistPosition() > s.Manager.VideosLimit { if v.PlaylistPosition() > s.Manager.videosLimit {
log.Println(v.ID() + " is old: skipping") log.Println(v.ID() + " is old: skipping")
return nil return nil
} }
@ -778,11 +776,13 @@ func (s *Sync) processVideo(v video) (err error) {
if err != nil { if err != nil {
return err return err
} }
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux)
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.maxVideoSize, s.namer)
if err != nil { if err != nil {
return err return err
} }
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
if err != nil { if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
} }