Ytsync refactor #45
13 changed files with 460 additions and 390 deletions
56
Gopkg.lock
generated
56
Gopkg.lock
generated
|
@ -2,7 +2,7 @@
|
|||
|
||||
|
||||
[[projects]]
|
||||
digest = "1:90272eae3bf75d2aa681ff3eee6cf4f49e8b06db533dc9c830ef214e5abbaaf2"
|
||||
digest = "1:9a88883f474d09f1da61894cd8115c7f33988d6941e4f6236324c777aaff8f2c"
|
||||
name = "github.com/PuerkitoBio/goquery"
|
||||
packages = ["."]
|
||||
pruneopts = ""
|
||||
|
@ -10,7 +10,7 @@
|
|||
version = "v1.4.1"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:4d0cb5aec47a2aec8b8b211540dc59902f8f00d571281356597b41ae349f4885"
|
||||
digest = "1:e3726ad6f38f710e84c8dcd0e830014de6eaeea81f28d91ae898afecc078479a"
|
||||
name = "github.com/andybalholm/cascadia"
|
||||
packages = ["."]
|
||||
pruneopts = ""
|
||||
|
@ -18,7 +18,7 @@
|
|||
version = "v1.0.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:d4336a8cea03c9729c9be12c142938a28a0dbac23a316f019f6fee323cf03709"
|
||||
digest = "1:261d95f4464744d542759a7a33846f56f24113f5a93c7577f4cd7044f7cb3d76"
|
||||
name = "github.com/aws/aws-sdk-go"
|
||||
packages = [
|
||||
"aws",
|
||||
|
@ -55,7 +55,7 @@
|
|||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:ea2251fa804d1b978feac8146d751b32ce2017eaf1f2915fde0df389bacaf383"
|
||||
digest = "1:cc8ebf0c6745d09f728f1fa4fbd29baaa2e3a65efb49b5fefb0c163171ee7863"
|
||||
name = "github.com/btcsuite/btcd"
|
||||
packages = [
|
||||
"btcec",
|
||||
|
@ -78,7 +78,7 @@
|
|||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:471ae435f9ad7fc2f6b7a2e91ca026a124792859a7033fa60579c3aa618161ed"
|
||||
digest = "1:b0f4d2431c167d7127a029210c1a7cdc33c9114c1b3fd3582347baad5e832588"
|
||||
name = "github.com/btcsuite/btcutil"
|
||||
packages = [
|
||||
".",
|
||||
|
@ -98,31 +98,20 @@
|
|||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:57c911bbbf529465cf2ca5d43546cd5875a59054c41e2fe97791419959282aa1"
|
||||
digest = "1:dfc248d5e6e1582fdec83796d3d1d451aa6cae773c4e4ba1dac2838caef6d381"
|
||||
name = "github.com/btcsuite/websocket"
|
||||
packages = ["."]
|
||||
pruneopts = ""
|
||||
revision = "31079b6807923eb23992c421b114992b95131b55"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:0a39ec8bf5629610a4bc7873a92039ee509246da3cef1a0ea60f1ed7e5f9cea5"
|
||||
digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b"
|
||||
name = "github.com/davecgh/go-spew"
|
||||
packages = ["spew"]
|
||||
pruneopts = ""
|
||||
revision = "346938d642f2ec3594ed81d874461961cd0faa76"
|
||||
version = "v1.1.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:4b5f8c148e7fa094b73bcb6d16ea46eac7fdc726e55b81845ff96e29df534421"
|
||||
name = "github.com/garyburd/redigo"
|
||||
packages = [
|
||||
"internal",
|
||||
"redis",
|
||||
]
|
||||
pruneopts = ""
|
||||
revision = "a69d19351219b6dd56f274f96d85a7014a2ec34e"
|
||||
version = "v1.6.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:968d8903d598e3fae738325d3410f33f07ea6a2b9ee5591e9c262ee37df6845a"
|
||||
name = "github.com/go-errors/errors"
|
||||
|
@ -133,14 +122,14 @@
|
|||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:4d5221853226d8d4be594d52d885ddde38170d2e3159b82ed92ecde4dded2304"
|
||||
digest = "1:cd5bab9c9e23ffa6858eaa79dc827fd84bc24bc00b0cfb0b14036e393da2b1fa"
|
||||
name = "github.com/go-ini/ini"
|
||||
packages = ["."]
|
||||
pruneopts = ""
|
||||
revision = "5cf292cae48347c2490ac1a58fe36735fb78df7e"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:b1d3041d568e065ab4d76f7477844458e9209c0bb241eaccdc0770bf0a13b120"
|
||||
digest = "1:f958a1c137db276e52f0b50efee41a1a389dcdded59a69711f3e872757dab34b"
|
||||
name = "github.com/golang/protobuf"
|
||||
packages = ["proto"]
|
||||
pruneopts = ""
|
||||
|
@ -148,7 +137,7 @@
|
|||
version = "v1.1.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:fe1b4d4cbe48c0d55507c55f8663aa4185576cc58fa0c8be03bb8f19dfe17a9c"
|
||||
digest = "1:64d212c703a2b94054be0ce470303286b177ad260b2f89a307e3d1bb6c073ef6"
|
||||
name = "github.com/gorilla/websocket"
|
||||
packages = ["."]
|
||||
pruneopts = ""
|
||||
|
@ -164,7 +153,7 @@
|
|||
version = "v1.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:4f767a115bc8e08576f6d38ab73c376fc1b1cd3bb5041171c9e8668cc7739b52"
|
||||
digest = "1:6f49eae0c1e5dab1dafafee34b207aeb7a42303105960944828c2079b92fc88e"
|
||||
name = "github.com/jmespath/go-jmespath"
|
||||
packages = ["."]
|
||||
pruneopts = ""
|
||||
|
@ -172,14 +161,14 @@
|
|||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:375104fd705791c50351e652a9d80321813fefc4f83a7871cb2f6111a5bc1dc3"
|
||||
digest = "1:d261f80387a38eeddc1d819ee9ee56d37ca10fc02e6e09ff400fb0ce146e13dc"
|
||||
name = "github.com/lbryio/lbryschema.go"
|
||||
packages = ["pb"]
|
||||
pruneopts = ""
|
||||
revision = "185433f2fd0c732547654749b98b37e56223dd22"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:daad05ffdae6e2cd9bd9bbc14440e7e8e841037141f26a775a5a31b1b61cb14d"
|
||||
digest = "1:5e30b8342813a6a85a647f9277e34ffcd5872dc57ab590dd9b251b145b6ec88f"
|
||||
name = "github.com/lbryio/ozzo-validation"
|
||||
packages = ["."]
|
||||
pruneopts = ""
|
||||
|
@ -203,7 +192,7 @@
|
|||
revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:ba7d1dfde0c2142011332bffce4d8468310228afd49dd4425ac59fa9124fb7c4"
|
||||
digest = "1:3cb50c403fa46c85697dbc4e06a95008689e058f33466b7eb8d31ea0eb291ea3"
|
||||
name = "github.com/nlopes/slack"
|
||||
packages = ["."]
|
||||
pruneopts = ""
|
||||
|
@ -212,7 +201,7 @@
|
|||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:1ee326e6da62d87b3a07a9303d2cbb70f974207d14e6d992c61b6e650ff37c50"
|
||||
digest = "1:8d6d81d0d9d8153e65d637bda77a7c4e6ba496c61efac3578d7d8c981ac31a7b"
|
||||
name = "github.com/rylio/ytdl"
|
||||
packages = ["."]
|
||||
pruneopts = ""
|
||||
|
@ -228,7 +217,7 @@
|
|||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:4cb1f758b69097d419a148e64c86b358a4f77c695504f99de1ee86617d64f74e"
|
||||
digest = "1:c92f01303e3ab3b5da92657841639cb53d1548f0d2733d12ef3b9fd9d47c869e"
|
||||
name = "github.com/sirupsen/logrus"
|
||||
packages = ["."]
|
||||
pruneopts = ""
|
||||
|
@ -244,7 +233,7 @@
|
|||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:7bcdb212f21d3cf318699d50af69a9192ef73fedad0d94d9ed5616f349457881"
|
||||
digest = "1:bfbf4a9c265ef41f8d03c9d91e340aaddae835710eaed6cd2e6be889cbc05f56"
|
||||
name = "github.com/spf13/cobra"
|
||||
packages = ["."]
|
||||
pruneopts = ""
|
||||
|
@ -276,7 +265,7 @@
|
|||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:e7dc596c84a908dd326d2c07453307f192976c3edb9482b88290c42076fe378f"
|
||||
digest = "1:8af4dda167d0ef21ab0affc797bff87ed0e87c57bd1d9bf57ad8f72d348c7932"
|
||||
name = "golang.org/x/crypto"
|
||||
packages = [
|
||||
"ripemd160",
|
||||
|
@ -288,7 +277,7 @@
|
|||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:3edb9c19d0b874999053badbbcc08edab3cde0262d2beb36ad6c0d78391c19ac"
|
||||
digest = "1:5dc6753986b9eeba4abdf05dedc5ba06bb52dad43cc8aad35ffb42bb7adfa68f"
|
||||
name = "golang.org/x/net"
|
||||
packages = [
|
||||
"context",
|
||||
|
@ -301,7 +290,7 @@
|
|||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:b1ac49fd3eae66e95230ea3423158b18374a5ad1c53caf89bc7fc1a441e9e0e7"
|
||||
digest = "1:baee54aa41cb93366e76a9c29f8dd2e4c4e6a35ff89551721d5275d2c858edc9"
|
||||
name = "golang.org/x/sys"
|
||||
packages = [
|
||||
"unix",
|
||||
|
@ -312,7 +301,7 @@
|
|||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:b4f82373e582dff1470e66574ac664b548aa69cffce0943321797fd75f46ee6f"
|
||||
digest = "1:b064108d68f82d0201d9f812297c928e57488e82ccdb77ed06ac69f64519a890"
|
||||
name = "google.golang.org/api"
|
||||
packages = [
|
||||
"gensupport",
|
||||
|
@ -325,7 +314,7 @@
|
|||
revision = "ef86ce4234efee96020bde00391d6a9cfae66561"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:05eca53b271663de74078b5484b1995a8d56668a51434a698dc5d0863035d575"
|
||||
digest = "1:f771bf87a3253de520c2af6fb6e75314dce0fedc0b30b208134fe502932bb15d"
|
||||
name = "gopkg.in/nullbio/null.v6"
|
||||
packages = ["convert"]
|
||||
pruneopts = ""
|
||||
|
@ -348,7 +337,6 @@
|
|||
"github.com/btcsuite/btcutil",
|
||||
"github.com/btcsuite/btcutil/base58",
|
||||
"github.com/davecgh/go-spew/spew",
|
||||
"github.com/garyburd/redigo/redis",
|
||||
"github.com/go-errors/errors",
|
||||
"github.com/go-ini/ini",
|
||||
"github.com/lbryio/lbryschema.go/pb",
|
||||
|
|
|
@ -2,10 +2,6 @@
|
|||
name = "github.com/davecgh/go-spew"
|
||||
version = "1.1.0"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/garyburd/redigo"
|
||||
version = "1.1.0"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/go-errors/errors"
|
||||
version = "1.0.0"
|
||||
|
|
|
@ -2,6 +2,7 @@ package cmd
|
|||
|
||||
import (
|
||||
sync "github.com/lbryio/lbry.go/ytsync"
|
||||
"github.com/lbryio/lbry.go/ytsync/sdk"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
|
@ -22,7 +23,9 @@ func ytcount(cmd *cobra.Command, args []string) {
|
|||
channelID := args[1]
|
||||
|
||||
s := sync.Sync{
|
||||
APIConfig: &sdk.APIConfig{
|
||||
YoutubeAPIKey: ytAPIKey,
|
||||
},
|
||||
YoutubeChannelID: channelID,
|
||||
}
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/lbryio/lbry.go/util"
|
||||
sync "github.com/lbryio/lbry.go/ytsync"
|
||||
"github.com/lbryio/lbry.go/ytsync/sdk"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
@ -141,35 +142,40 @@ func ytSync(cmd *cobra.Command, args []string) {
|
|||
blobsDir = usr.HomeDir + "/.lbrynet/blobfiles/"
|
||||
}
|
||||
|
||||
sm := sync.SyncManager{
|
||||
StopOnError: stopOnError,
|
||||
MaxTries: maxTries,
|
||||
TakeOverExistingChannel: takeOverExistingChannel,
|
||||
Refill: refill,
|
||||
Limit: limit,
|
||||
SkipSpaceCheck: skipSpaceCheck,
|
||||
SyncUpdate: syncUpdate,
|
||||
SyncStatus: syncStatus,
|
||||
syncProperties := &sdk.SyncProperties{
|
||||
SyncFrom: syncFrom,
|
||||
SyncUntil: syncUntil,
|
||||
ConcurrentJobs: concurrentJobs,
|
||||
ConcurrentVideos: concurrentJobs,
|
||||
HostName: hostname,
|
||||
YoutubeChannelID: channelID,
|
||||
}
|
||||
apiConfig := &sdk.APIConfig{
|
||||
YoutubeAPIKey: youtubeAPIKey,
|
||||
ApiURL: apiURL,
|
||||
ApiToken: apiToken,
|
||||
BlobsDir: blobsDir,
|
||||
VideosLimit: videosLimit,
|
||||
MaxVideoSize: maxVideoSize,
|
||||
LbrycrdString: lbrycrdString,
|
||||
AwsS3ID: awsS3ID,
|
||||
AwsS3Secret: awsS3Secret,
|
||||
AwsS3Region: awsS3Region,
|
||||
AwsS3Bucket: awsS3Bucket,
|
||||
SingleRun: singleRun,
|
||||
HostName: hostname,
|
||||
}
|
||||
|
||||
sm := sync.NewSyncManager(
|
||||
stopOnError,
|
||||
maxTries,
|
||||
takeOverExistingChannel,
|
||||
refill,
|
||||
limit,
|
||||
skipSpaceCheck,
|
||||
syncUpdate,
|
||||
concurrentJobs,
|
||||
concurrentJobs,
|
||||
blobsDir,
|
||||
videosLimit,
|
||||
maxVideoSize,
|
||||
lbrycrdString,
|
||||
awsS3ID,
|
||||
awsS3Secret,
|
||||
awsS3Region,
|
||||
awsS3Bucket,
|
||||
syncStatus,
|
||||
singleRun,
|
||||
syncProperties,
|
||||
apiConfig,
|
||||
)
|
||||
err := sm.Start()
|
||||
if err != nil {
|
||||
sync.SendErrorToSlack(err.Error())
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
|
||||
func (s *Sync) CountVideos() (uint64, error) {
|
||||
client := &http.Client{
|
||||
Transport: &transport.APIKey{Key: s.YoutubeAPIKey},
|
||||
Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
|
||||
}
|
||||
|
||||
service, err := youtube.New(client)
|
||||
|
|
|
@ -1,49 +1,71 @@
|
|||
package ytsync
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/errors"
|
||||
"github.com/lbryio/lbry.go/null"
|
||||
"github.com/lbryio/lbry.go/util"
|
||||
"github.com/lbryio/lbry.go/ytsync/namer"
|
||||
"github.com/lbryio/lbry.go/ytsync/sdk"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type SyncManager struct {
|
||||
StopOnError bool
|
||||
MaxTries int
|
||||
TakeOverExistingChannel bool
|
||||
Refill int
|
||||
Limit int
|
||||
SkipSpaceCheck bool
|
||||
SyncUpdate bool
|
||||
SyncStatus string
|
||||
SyncFrom int64
|
||||
SyncUntil int64
|
||||
ConcurrentJobs int
|
||||
ConcurrentVideos int
|
||||
HostName string
|
||||
YoutubeChannelID string
|
||||
YoutubeAPIKey string
|
||||
ApiURL string
|
||||
ApiToken string
|
||||
BlobsDir string
|
||||
VideosLimit int
|
||||
MaxVideoSize int
|
||||
LbrycrdString string
|
||||
AwsS3ID string
|
||||
AwsS3Secret string
|
||||
AwsS3Region string
|
||||
AwsS3Bucket string
|
||||
SingleRun bool
|
||||
stopOnError bool
|
||||
maxTries int
|
||||
takeOverExistingChannel bool
|
||||
refill int
|
||||
limit int
|
||||
skipSpaceCheck bool
|
||||
syncUpdate bool
|
||||
concurrentJobs int
|
||||
concurrentVideos int
|
||||
blobsDir string
|
||||
videosLimit int
|
||||
maxVideoSize int
|
||||
lbrycrdString string
|
||||
awsS3ID string
|
||||
awsS3Secret string
|
||||
awsS3Region string
|
||||
syncStatus string
|
||||
awsS3Bucket string
|
||||
singleRun bool
|
||||
syncProperties *sdk.SyncProperties
|
||||
apiConfig *sdk.APIConfig
|
||||
namer *namer.Namer
|
||||
}
|
||||
|
||||
func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int,
|
||||
skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int,
|
||||
maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string,
|
||||
syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig) *SyncManager {
|
||||
return &SyncManager{
|
||||
stopOnError: stopOnError,
|
||||
maxTries: maxTries,
|
||||
takeOverExistingChannel: takeOverExistingChannel,
|
||||
refill: refill,
|
||||
limit: limit,
|
||||
skipSpaceCheck: skipSpaceCheck,
|
||||
syncUpdate: syncUpdate,
|
||||
concurrentJobs: concurrentJobs,
|
||||
concurrentVideos: concurrentVideos,
|
||||
blobsDir: blobsDir,
|
||||
videosLimit: videosLimit,
|
||||
maxVideoSize: maxVideoSize,
|
||||
lbrycrdString: lbrycrdString,
|
||||
awsS3ID: awsS3ID,
|
||||
awsS3Secret: awsS3Secret,
|
||||
awsS3Region: awsS3Region,
|
||||
awsS3Bucket: awsS3Bucket,
|
||||
syncStatus: syncStatus,
|
||||
singleRun: singleRun,
|
||||
syncProperties: syncProperties,
|
||||
apiConfig: apiConfig,
|
||||
namer: namer.NewNamer(),
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -57,143 +79,13 @@ const (
|
|||
|
||||
var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized}
|
||||
|
||||
type apiJobsResponse struct {
|
||||
Success bool `json:"success"`
|
||||
Error null.String `json:"error"`
|
||||
Data []apiYoutubeChannel `json:"data"`
|
||||
}
|
||||
|
||||
type apiYoutubeChannel struct {
|
||||
ChannelId string `json:"channel_id"`
|
||||
TotalVideos uint `json:"total_videos"`
|
||||
DesiredChannelName string `json:"desired_channel_name"`
|
||||
SyncServer null.String `json:"sync_server"`
|
||||
}
|
||||
|
||||
func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
|
||||
endpoint := s.ApiURL + "/yt/jobs"
|
||||
res, _ := http.PostForm(endpoint, url.Values{
|
||||
"auth_token": {s.ApiToken},
|
||||
"sync_status": {status},
|
||||
"min_videos": {strconv.Itoa(1)},
|
||||
"after": {strconv.Itoa(int(s.SyncFrom))},
|
||||
"before": {strconv.Itoa(int(s.SyncUntil))},
|
||||
"sync_server": {s.HostName},
|
||||
"channel_id": {s.YoutubeChannelID},
|
||||
})
|
||||
defer res.Body.Close()
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
var response apiJobsResponse
|
||||
err := json.Unmarshal(body, &response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if response.Data == nil {
|
||||
return nil, errors.Err(response.Error)
|
||||
}
|
||||
log.Printf("Fetched channels: %d", len(response.Data))
|
||||
return response.Data, nil
|
||||
}
|
||||
|
||||
type apiChannelStatusResponse struct {
|
||||
Success bool `json:"success"`
|
||||
Error null.String `json:"error"`
|
||||
Data []syncedVideo `json:"data"`
|
||||
}
|
||||
|
||||
type syncedVideo struct {
|
||||
VideoID string `json:"video_id"`
|
||||
Published bool `json:"published"`
|
||||
FailureReason string `json:"failure_reason"`
|
||||
ClaimName string `json:"claim_name"`
|
||||
}
|
||||
|
||||
func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) {
|
||||
endpoint := s.ApiURL + "/yt/channel_status"
|
||||
if len(failureReason) > maxReasonLength {
|
||||
failureReason = failureReason[:maxReasonLength]
|
||||
}
|
||||
res, _ := http.PostForm(endpoint, url.Values{
|
||||
"channel_id": {channelID},
|
||||
"sync_server": {s.HostName},
|
||||
"auth_token": {s.ApiToken},
|
||||
"sync_status": {status},
|
||||
"failure_reason": {failureReason},
|
||||
})
|
||||
defer res.Body.Close()
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
var response apiChannelStatusResponse
|
||||
err := json.Unmarshal(body, &response)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if !response.Error.IsNull() {
|
||||
return nil, nil, errors.Err(response.Error.String)
|
||||
}
|
||||
if response.Data != nil {
|
||||
svs := make(map[string]syncedVideo)
|
||||
claimNames := make(map[string]bool)
|
||||
for _, v := range response.Data {
|
||||
svs[v.VideoID] = v
|
||||
claimNames[v.ClaimName] = v.Published
|
||||
}
|
||||
return svs, claimNames, nil
|
||||
}
|
||||
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
||||
}
|
||||
|
||||
const (
|
||||
VideoStatusPublished = "published"
|
||||
VideoStatusFailed = "failed"
|
||||
)
|
||||
|
||||
func (s *SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
|
||||
endpoint := s.ApiURL + "/yt/video_status"
|
||||
if len(failureReason) > maxReasonLength {
|
||||
failureReason = failureReason[:maxReasonLength]
|
||||
}
|
||||
vals := url.Values{
|
||||
"youtube_channel_id": {channelID},
|
||||
"video_id": {videoID},
|
||||
"status": {status},
|
||||
"auth_token": {s.ApiToken},
|
||||
}
|
||||
if status == VideoStatusPublished {
|
||||
if claimID == "" || claimName == "" {
|
||||
return errors.Err("claimID or claimName missing")
|
||||
}
|
||||
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
|
||||
vals.Add("claim_id", claimID)
|
||||
vals.Add("claim_name", claimName)
|
||||
if size != nil {
|
||||
vals.Add("size", strconv.FormatInt(*size, 10))
|
||||
}
|
||||
}
|
||||
if failureReason != "" {
|
||||
vals.Add("failure_reason", failureReason)
|
||||
}
|
||||
res, _ := http.PostForm(endpoint, vals)
|
||||
defer res.Body.Close()
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
var response struct {
|
||||
Success bool `json:"success"`
|
||||
Error null.String `json:"error"`
|
||||
Data null.String `json:"data"`
|
||||
}
|
||||
err := json.Unmarshal(body, &response)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !response.Error.IsNull() {
|
||||
return errors.Err(response.Error.String)
|
||||
}
|
||||
if !response.Data.IsNull() && response.Data.String == "ok" {
|
||||
return nil
|
||||
}
|
||||
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
||||
}
|
||||
|
||||
func (s *SyncManager) Start() error {
|
||||
|
||||
syncCount := 0
|
||||
for {
|
||||
err := s.checkUsedSpace()
|
||||
|
@ -204,9 +96,9 @@ func (s *SyncManager) Start() error {
|
|||
var syncs []Sync
|
||||
shouldInterruptLoop := false
|
||||
|
||||
isSingleChannelSync := s.YoutubeChannelID != ""
|
||||
isSingleChannelSync := s.syncProperties.YoutubeChannelID != ""
|
||||
if isSingleChannelSync {
|
||||
channels, err := s.fetchChannels("")
|
||||
channels, err := s.apiConfig.FetchChannels("", s.syncProperties)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -216,52 +108,53 @@ func (s *SyncManager) Start() error {
|
|||
lbryChannelName := channels[0].DesiredChannelName
|
||||
syncs = make([]Sync, 1)
|
||||
syncs[0] = Sync{
|
||||
YoutubeAPIKey: s.YoutubeAPIKey,
|
||||
YoutubeChannelID: s.YoutubeChannelID,
|
||||
APIConfig: s.apiConfig,
|
||||
YoutubeChannelID: s.syncProperties.YoutubeChannelID,
|
||||
LbryChannelName: lbryChannelName,
|
||||
StopOnError: s.StopOnError,
|
||||
MaxTries: s.MaxTries,
|
||||
ConcurrentVideos: s.ConcurrentVideos,
|
||||
TakeOverExistingChannel: s.TakeOverExistingChannel,
|
||||
Refill: s.Refill,
|
||||
StopOnError: s.stopOnError,
|
||||
MaxTries: s.maxTries,
|
||||
ConcurrentVideos: s.concurrentVideos,
|
||||
TakeOverExistingChannel: s.takeOverExistingChannel,
|
||||
Refill: s.refill,
|
||||
Manager: s,
|
||||
LbrycrdString: s.LbrycrdString,
|
||||
AwsS3ID: s.AwsS3ID,
|
||||
AwsS3Secret: s.AwsS3Secret,
|
||||
AwsS3Region: s.AwsS3Region,
|
||||
AwsS3Bucket: s.AwsS3Bucket,
|
||||
LbrycrdString: s.lbrycrdString,
|
||||
AwsS3ID: s.awsS3ID,
|
||||
AwsS3Secret: s.awsS3Secret,
|
||||
AwsS3Region: s.awsS3Region,
|
||||
AwsS3Bucket: s.awsS3Bucket,
|
||||
namer: s.namer,
|
||||
}
|
||||
shouldInterruptLoop = true
|
||||
} else {
|
||||
var queuesToSync []string
|
||||
if s.SyncStatus != "" {
|
||||
queuesToSync = append(queuesToSync, s.SyncStatus)
|
||||
} else if s.SyncUpdate {
|
||||
if s.syncStatus != "" {
|
||||
queuesToSync = append(queuesToSync, s.syncStatus)
|
||||
} else if s.syncUpdate {
|
||||
queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced)
|
||||
} else {
|
||||
queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued)
|
||||
}
|
||||
for _, q := range queuesToSync {
|
||||
channels, err := s.fetchChannels(q)
|
||||
channels, err := s.apiConfig.FetchChannels(q, s.syncProperties)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, c := range channels {
|
||||
syncs = append(syncs, Sync{
|
||||
YoutubeAPIKey: s.YoutubeAPIKey,
|
||||
APIConfig: s.apiConfig,
|
||||
YoutubeChannelID: c.ChannelId,
|
||||
LbryChannelName: c.DesiredChannelName,
|
||||
StopOnError: s.StopOnError,
|
||||
MaxTries: s.MaxTries,
|
||||
ConcurrentVideos: s.ConcurrentVideos,
|
||||
TakeOverExistingChannel: s.TakeOverExistingChannel,
|
||||
Refill: s.Refill,
|
||||
StopOnError: s.stopOnError,
|
||||
MaxTries: s.maxTries,
|
||||
ConcurrentVideos: s.concurrentVideos,
|
||||
TakeOverExistingChannel: s.takeOverExistingChannel,
|
||||
Refill: s.refill,
|
||||
Manager: s,
|
||||
LbrycrdString: s.LbrycrdString,
|
||||
AwsS3ID: s.AwsS3ID,
|
||||
AwsS3Secret: s.AwsS3Secret,
|
||||
AwsS3Region: s.AwsS3Region,
|
||||
AwsS3Bucket: s.AwsS3Bucket,
|
||||
LbrycrdString: s.lbrycrdString,
|
||||
AwsS3ID: s.awsS3ID,
|
||||
AwsS3Secret: s.awsS3Secret,
|
||||
AwsS3Region: s.awsS3Region,
|
||||
AwsS3Bucket: s.awsS3Bucket,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -294,12 +187,12 @@ func (s *SyncManager) Start() error {
|
|||
if !shouldNotCount {
|
||||
syncCount++
|
||||
}
|
||||
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
|
||||
if sync.IsInterrupted() || (s.limit != 0 && syncCount >= s.limit) {
|
||||
shouldInterruptLoop = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if shouldInterruptLoop || s.SingleRun {
|
||||
if shouldInterruptLoop || s.singleRun {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -307,11 +200,11 @@ func (s *SyncManager) Start() error {
|
|||
}
|
||||
|
||||
func (s *SyncManager) checkUsedSpace() error {
|
||||
usedPctile, err := GetUsedSpace(s.BlobsDir)
|
||||
usedPctile, err := GetUsedSpace(s.blobsDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if usedPctile >= 0.90 && !s.SkipSpaceCheck {
|
||||
if usedPctile >= 0.90 && !s.skipSpaceCheck {
|
||||
return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100))
|
||||
}
|
||||
log.Infof("disk usage: %.1f%%", usedPctile*100)
|
||||
|
|
83
ytsync/namer/names.go
Normal file
83
ytsync/namer/names.go
Normal file
|
@ -0,0 +1,83 @@
|
|||
package namer
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`)
|
||||
|
||||
type Namer struct {
|
||||
mu *sync.Mutex
|
||||
names map[string]bool
|
||||
}
|
||||
|
||||
func NewNamer() *Namer {
|
||||
return &Namer{
|
||||
mu: &sync.Mutex{},
|
||||
names: make(map[string]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Namer) SetNames(names map[string]bool) {
|
||||
n.names = names
|
||||
}
|
||||
|
||||
func (n *Namer) GetNextName(prefix string) string {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
attempt := 1
|
||||
var name string
|
||||
for {
|
||||
name = getClaimNameFromTitle(prefix, attempt)
|
||||
if _, exists := n.names[name]; !exists {
|
||||
break
|
||||
}
|
||||
attempt++
|
||||
}
|
||||
|
||||
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
|
||||
if len(name) < 2 {
|
||||
sum := md5.Sum([]byte(prefix))
|
||||
name = fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:])[:15], attempt)
|
||||
}
|
||||
|
||||
n.names[name] = true
|
||||
|
||||
return name
|
||||
}
|
||||
|
||||
// TODO: clean this up some
|
||||
func getClaimNameFromTitle(title string, attempt int) string {
|
||||
suffix := ""
|
||||
if attempt > 1 {
|
||||
suffix = "-" + strconv.Itoa(attempt)
|
||||
}
|
||||
maxLen := 40 - len(suffix)
|
||||
|
||||
chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-")
|
||||
|
||||
name := chunks[0]
|
||||
if len(name) > maxLen {
|
||||
return name[:maxLen]
|
||||
}
|
||||
|
||||
for _, chunk := range chunks[1:] {
|
||||
tmpName := name + "-" + chunk
|
||||
if len(tmpName) > maxLen {
|
||||
if len(name) < 20 {
|
||||
name = tmpName[:maxLen]
|
||||
}
|
||||
break
|
||||
}
|
||||
name = tmpName
|
||||
}
|
||||
|
||||
return name + suffix
|
||||
}
|
170
ytsync/sdk/api.go
Normal file
170
ytsync/sdk/api.go
Normal file
|
@ -0,0 +1,170 @@
|
|||
package sdk
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/errors"
|
||||
"github.com/lbryio/lbry.go/null"
|
||||
)
|
||||
|
||||
const (
|
||||
MaxReasonLength = 500
|
||||
)
|
||||
|
||||
type APIConfig struct {
|
||||
YoutubeAPIKey string
|
||||
ApiURL string
|
||||
ApiToken string
|
||||
HostName string
|
||||
}
|
||||
|
||||
type SyncProperties struct {
|
||||
SyncFrom int64
|
||||
SyncUntil int64
|
||||
YoutubeChannelID string
|
||||
}
|
||||
|
||||
type YoutubeChannel struct {
|
||||
ChannelId string `json:"channel_id"`
|
||||
TotalVideos uint `json:"total_videos"`
|
||||
DesiredChannelName string `json:"desired_channel_name"`
|
||||
SyncServer null.String `json:"sync_server"`
|
||||
Fee *struct {
|
||||
Amount string `json:"amount"`
|
||||
Address string `json:"address"`
|
||||
Currency string `json:"currency"`
|
||||
} `json:"fee"`
|
||||
}
|
||||
|
||||
func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeChannel, error) {
|
||||
type apiJobsResponse struct {
|
||||
Success bool `json:"success"`
|
||||
Error null.String `json:"error"`
|
||||
Data []YoutubeChannel `json:"data"`
|
||||
}
|
||||
endpoint := a.ApiURL + "/yt/jobs"
|
||||
res, _ := http.PostForm(endpoint, url.Values{
|
||||
"auth_token": {a.ApiToken},
|
||||
"sync_status": {status},
|
||||
"min_videos": {strconv.Itoa(1)},
|
||||
"after": {strconv.Itoa(int(cp.SyncFrom))},
|
||||
"before": {strconv.Itoa(int(cp.SyncUntil))},
|
||||
"sync_server": {a.HostName},
|
||||
"channel_id": {cp.YoutubeChannelID},
|
||||
})
|
||||
defer res.Body.Close()
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
var response apiJobsResponse
|
||||
err := json.Unmarshal(body, &response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if response.Data == nil {
|
||||
return nil, errors.Err(response.Error)
|
||||
}
|
||||
log.Printf("Fetched channels: %d", len(response.Data))
|
||||
return response.Data, nil
|
||||
}
|
||||
|
||||
type SyncedVideo struct {
|
||||
VideoID string `json:"video_id"`
|
||||
Published bool `json:"published"`
|
||||
FailureReason string `json:"failure_reason"`
|
||||
ClaimName string `json:"claim_name"`
|
||||
}
|
||||
|
||||
func (a *APIConfig) SetChannelStatus(channelID string, status string, failureReason string) (map[string]SyncedVideo, map[string]bool, error) {
|
||||
type apiChannelStatusResponse struct {
|
||||
Success bool `json:"success"`
|
||||
Error null.String `json:"error"`
|
||||
Data []SyncedVideo `json:"data"`
|
||||
}
|
||||
endpoint := a.ApiURL + "/yt/channel_status"
|
||||
if len(failureReason) > MaxReasonLength {
|
||||
failureReason = failureReason[:MaxReasonLength]
|
||||
}
|
||||
res, _ := http.PostForm(endpoint, url.Values{
|
||||
"channel_id": {channelID},
|
||||
"sync_server": {a.HostName},
|
||||
"auth_token": {a.ApiToken},
|
||||
"sync_status": {status},
|
||||
"failure_reason": {failureReason},
|
||||
})
|
||||
defer res.Body.Close()
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
var response apiChannelStatusResponse
|
||||
err := json.Unmarshal(body, &response)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if !response.Error.IsNull() {
|
||||
return nil, nil, errors.Err(response.Error.String)
|
||||
}
|
||||
if response.Data != nil {
|
||||
svs := make(map[string]SyncedVideo)
|
||||
claimNames := make(map[string]bool)
|
||||
for _, v := range response.Data {
|
||||
svs[v.VideoID] = v
|
||||
claimNames[v.ClaimName] = v.Published
|
||||
}
|
||||
return svs, claimNames, nil
|
||||
}
|
||||
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
||||
}
|
||||
|
||||
const (
|
||||
VideoStatusPublished = "published"
|
||||
VideoStatusFailed = "failed"
|
||||
)
|
||||
|
||||
func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
|
||||
endpoint := a.ApiURL + "/yt/video_status"
|
||||
if len(failureReason) > MaxReasonLength {
|
||||
failureReason = failureReason[:MaxReasonLength]
|
||||
}
|
||||
vals := url.Values{
|
||||
"youtube_channel_id": {channelID},
|
||||
"video_id": {videoID},
|
||||
"status": {status},
|
||||
"auth_token": {a.ApiToken},
|
||||
}
|
||||
if status == VideoStatusPublished {
|
||||
if claimID == "" || claimName == "" {
|
||||
return errors.Err("claimID or claimName missing")
|
||||
}
|
||||
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
|
||||
vals.Add("claim_id", claimID)
|
||||
vals.Add("claim_name", claimName)
|
||||
if size != nil {
|
||||
vals.Add("size", strconv.FormatInt(*size, 10))
|
||||
}
|
||||
}
|
||||
if failureReason != "" {
|
||||
vals.Add("failure_reason", failureReason)
|
||||
}
|
||||
res, _ := http.PostForm(endpoint, vals)
|
||||
defer res.Body.Close()
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
var response struct {
|
||||
Success bool `json:"success"`
|
||||
Error null.String `json:"error"`
|
||||
Data null.String `json:"data"`
|
||||
}
|
||||
err := json.Unmarshal(body, &response)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !response.Error.IsNull() {
|
||||
return errors.Err(response.Error.String)
|
||||
}
|
||||
if !response.Data.IsNull() && response.Data.String == "ok" {
|
||||
return nil
|
||||
}
|
||||
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
||||
}
|
|
@ -50,8 +50,8 @@ func (s *Sync) walletSetup() error {
|
|||
s.syncedVideosMux.RUnlock()
|
||||
log.Debugf("We already allocated credits for %d videos", numPublished)
|
||||
|
||||
if numOnSource-numPublished > s.Manager.VideosLimit {
|
||||
numOnSource = s.Manager.VideosLimit
|
||||
if numOnSource-numPublished > s.Manager.videosLimit {
|
||||
numOnSource = s.Manager.videosLimit
|
||||
}
|
||||
|
||||
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
|
||||
|
|
|
@ -1,90 +1,27 @@
|
|||
package sources
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
|
||||
"github.com/lbryio/lbry.go/jsonrpc"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/lbryio/lbry.go/ytsync/namer"
|
||||
)
|
||||
|
||||
var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`)
|
||||
|
||||
type SyncSummary struct {
|
||||
ClaimID string
|
||||
ClaimName string
|
||||
}
|
||||
|
||||
func getClaimNameFromTitle(title string, attempt int) string {
|
||||
suffix := ""
|
||||
if attempt > 1 {
|
||||
suffix = "-" + strconv.Itoa(attempt)
|
||||
}
|
||||
maxLen := 40 - len(suffix)
|
||||
|
||||
chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-")
|
||||
|
||||
name := chunks[0]
|
||||
if len(name) > maxLen {
|
||||
return name[:maxLen]
|
||||
}
|
||||
|
||||
for _, chunk := range chunks[1:] {
|
||||
tmpName := name + "-" + chunk
|
||||
if len(tmpName) > maxLen {
|
||||
if len(name) < 20 {
|
||||
name = tmpName[:maxLen]
|
||||
}
|
||||
break
|
||||
}
|
||||
name = tmpName
|
||||
}
|
||||
|
||||
return name + suffix
|
||||
}
|
||||
|
||||
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
|
||||
attempt := 0
|
||||
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, namer *namer.Namer) (*SyncSummary, error) {
|
||||
for {
|
||||
attempt++
|
||||
name := getClaimNameFromTitle(title, attempt)
|
||||
|
||||
syncedVideosMux.Lock()
|
||||
_, exists := claimNames[name]
|
||||
if exists {
|
||||
log.Printf("name exists, retrying (%d attempts so far)", attempt)
|
||||
syncedVideosMux.Unlock()
|
||||
continue
|
||||
}
|
||||
claimNames[name] = false
|
||||
syncedVideosMux.Unlock()
|
||||
|
||||
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
|
||||
if len(name) < 2 {
|
||||
hasher := md5.New()
|
||||
hasher.Write([]byte(title))
|
||||
name = fmt.Sprintf("%s-%d", hex.EncodeToString(hasher.Sum(nil))[:15], attempt)
|
||||
}
|
||||
|
||||
name := namer.GetNextName(title)
|
||||
response, err := daemon.Publish(name, filename, amount, options)
|
||||
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
|
||||
syncedVideosMux.Lock()
|
||||
claimNames[name] = true
|
||||
syncedVideosMux.Unlock()
|
||||
if err == nil {
|
||||
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
|
||||
} else {
|
||||
log.Printf("name exists, retrying (%d attempts so far)", attempt)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "failed: Multiple claims (") {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,15 +6,15 @@ import (
|
|||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
"github.com/lbryio/lbry.go/errors"
|
||||
"github.com/lbryio/lbry.go/jsonrpc"
|
||||
"github.com/lbryio/lbry.go/ytsync/namer"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
|
@ -174,7 +174,7 @@ func (v *ucbVideo) saveThumbnail() error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
|
||||
func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) {
|
||||
options := jsonrpc.PublishOptions{
|
||||
Title: &v.title,
|
||||
Author: strPtr("UC Berkeley"),
|
||||
|
@ -187,16 +187,14 @@ func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount f
|
|||
ChangeAddress: &claimAddress,
|
||||
}
|
||||
|
||||
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
|
||||
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer)
|
||||
}
|
||||
|
||||
func (v *ucbVideo) Size() *int64 {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
|
||||
v.claimNames = claimNames
|
||||
v.syncedVideosMux = syncedVideosMux
|
||||
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer) (*SyncSummary, error) {
|
||||
//download and thumbnail can be done in parallel
|
||||
err := v.download()
|
||||
if err != nil {
|
||||
|
@ -210,7 +208,7 @@ func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount floa
|
|||
//}
|
||||
//log.Debugln("Created thumbnail for " + v.id)
|
||||
|
||||
summary, err := v.publish(daemon, claimAddress, amount, channelID)
|
||||
summary, err := v.publish(daemon, claimAddress, amount, channelID, namer)
|
||||
if err != nil {
|
||||
return nil, errors.Prefix("publish error", err)
|
||||
}
|
||||
|
|
|
@ -9,12 +9,12 @@ import (
|
|||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/errors"
|
||||
"github.com/lbryio/lbry.go/jsonrpc"
|
||||
"github.com/lbryio/lbry.go/ytsync/namer"
|
||||
|
||||
"github.com/rylio/ytdl"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
@ -155,6 +155,8 @@ func (v *YoutubeVideo) download() error {
|
|||
err = videoInfo.Download(formats[formatIndex], downloadedFile)
|
||||
downloadedFile.Close()
|
||||
if err != nil {
|
||||
//delete the video and ignore the error
|
||||
_ = v.delete()
|
||||
break
|
||||
}
|
||||
fi, err := os.Stat(v.getFilename())
|
||||
|
@ -172,7 +174,6 @@ func (v *YoutubeVideo) download() error {
|
|||
break
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -234,7 +235,7 @@ func (v *YoutubeVideo) triggerThumbnailSave() error {
|
|||
|
||||
func strPtr(s string) *string { return &s }
|
||||
|
||||
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
|
||||
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) {
|
||||
if channelID == "" {
|
||||
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
|
||||
}
|
||||
|
@ -249,18 +250,16 @@ func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amou
|
|||
ChangeAddress: &claimAddress,
|
||||
ChannelID: &channelID,
|
||||
}
|
||||
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
|
||||
|
||||
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer)
|
||||
}
|
||||
|
||||
func (v *YoutubeVideo) Size() *int64 {
|
||||
return v.size
|
||||
}
|
||||
|
||||
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
|
||||
v.claimNames = claimNames
|
||||
v.syncedVideosMux = syncedVideosMux
|
||||
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer) (*SyncSummary, error) {
|
||||
v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024
|
||||
|
||||
//download and thumbnail can be done in parallel
|
||||
err := v.download()
|
||||
if err != nil {
|
||||
|
@ -274,14 +273,11 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount
|
|||
}
|
||||
log.Debugln("Created thumbnail for " + v.id)
|
||||
|
||||
summary, err := v.publish(daemon, claimAddress, amount, channelID)
|
||||
summary, err := v.publish(daemon, claimAddress, amount, channelID, namer)
|
||||
//delete the video in all cases (and ignore the error)
|
||||
_ = v.delete()
|
||||
if err != nil {
|
||||
return nil, errors.Prefix("publish error", err)
|
||||
}
|
||||
|
||||
return summary, nil
|
||||
return summary, errors.Prefix("publish error", err)
|
||||
}
|
||||
|
||||
// sorting videos
|
||||
|
|
|
@ -17,17 +17,20 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/errors"
|
||||
"github.com/lbryio/lbry.go/jsonrpc"
|
||||
"github.com/lbryio/lbry.go/stop"
|
||||
"github.com/lbryio/lbry.go/util"
|
||||
"github.com/lbryio/lbry.go/ytsync/namer"
|
||||
"github.com/lbryio/lbry.go/ytsync/sdk"
|
||||
"github.com/lbryio/lbry.go/ytsync/sources"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
"github.com/lbryio/lbry.go/errors"
|
||||
"github.com/lbryio/lbry.go/jsonrpc"
|
||||
"github.com/lbryio/lbry.go/stop"
|
||||
"github.com/lbryio/lbry.go/util"
|
||||
"github.com/lbryio/lbry.go/ytsync/sources"
|
||||
"github.com/mitchellh/go-ps"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/api/googleapi/transport"
|
||||
|
@ -46,7 +49,7 @@ type video interface {
|
|||
IDAndNum() string
|
||||
PlaylistPosition() int
|
||||
PublishedAt() time.Time
|
||||
Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error)
|
||||
Sync(*jsonrpc.Client, string, float64, string, int, *namer.Namer) (*sources.SyncSummary, error)
|
||||
}
|
||||
|
||||
// sorting videos
|
||||
|
@ -58,7 +61,7 @@ func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[
|
|||
|
||||
// Sync stores the options that control how syncing happens
|
||||
type Sync struct {
|
||||
YoutubeAPIKey string
|
||||
APIConfig *sdk.APIConfig
|
||||
YoutubeChannelID string
|
||||
LbryChannelName string
|
||||
StopOnError bool
|
||||
|
@ -77,10 +80,10 @@ type Sync struct {
|
|||
claimAddress string
|
||||
videoDirectory string
|
||||
syncedVideosMux *sync.RWMutex
|
||||
syncedVideos map[string]syncedVideo
|
||||
claimNames map[string]bool
|
||||
syncedVideos map[string]sdk.SyncedVideo
|
||||
grp *stop.Group
|
||||
lbryChannelID string
|
||||
namer *namer.Namer
|
||||
|
||||
walletMux *sync.Mutex
|
||||
queue chan video
|
||||
|
@ -89,14 +92,11 @@ type Sync struct {
|
|||
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
|
||||
s.syncedVideosMux.Lock()
|
||||
defer s.syncedVideosMux.Unlock()
|
||||
s.syncedVideos[videoID] = syncedVideo{
|
||||
s.syncedVideos[videoID] = sdk.SyncedVideo{
|
||||
VideoID: videoID,
|
||||
Published: published,
|
||||
FailureReason: failureReason,
|
||||
}
|
||||
if claimName != "" {
|
||||
s.claimNames[claimName] = true
|
||||
}
|
||||
}
|
||||
|
||||
// SendErrorToSlack Sends an error message to the default channel and to the process log.
|
||||
|
@ -221,13 +221,13 @@ func (s *Sync) uploadWallet() error {
|
|||
}
|
||||
|
||||
func (s *Sync) setStatusSyncing() error {
|
||||
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
|
||||
syncedVideos, claimNames, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.syncedVideosMux.Lock()
|
||||
s.syncedVideos = syncedVideos
|
||||
s.claimNames = claimNames
|
||||
s.Manager.namer.SetNames(claimNames)
|
||||
s.syncedVideosMux.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
@ -260,7 +260,7 @@ func (s *Sync) FullCycle() (e error) {
|
|||
|
||||
err = s.downloadWallet()
|
||||
if err != nil && err.Error() != "wallet not on S3" {
|
||||
return errors.Prefix("failure in downloading wallet: ", err)
|
||||
return errors.Prefix("failure in downloading wallet", err)
|
||||
} else if err == nil {
|
||||
log.Println("Continuing previous upload")
|
||||
} else {
|
||||
|
@ -311,14 +311,13 @@ func (s *Sync) setChannelTerminationStatus(e *error) {
|
|||
return
|
||||
}
|
||||
failureReason := (*e).Error()
|
||||
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
|
||||
_, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
|
||||
err = errors.Prefix(msg, err)
|
||||
*e = errors.Prefix(err.Error(), *e)
|
||||
msg := fmt.Sprintf("Failed setting failed state for channel %s", s.LbryChannelName)
|
||||
*e = errors.Prefix(msg+err.Error(), *e)
|
||||
}
|
||||
} else if !s.IsInterrupted() {
|
||||
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "")
|
||||
_, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSynced, "")
|
||||
if err != nil {
|
||||
*e = err
|
||||
}
|
||||
|
@ -359,7 +358,7 @@ func (s *Sync) stopAndUploadWallet(e *error) {
|
|||
e = &err //not 100% sure
|
||||
return
|
||||
} else {
|
||||
*e = errors.Prefix("failure uploading wallet: ", *e)
|
||||
*e = errors.Prefix("failure uploading wallet", *e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -424,7 +423,7 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err
|
|||
pv, ok := s.syncedVideos[videoID]
|
||||
if !ok || pv.ClaimName != c.Name {
|
||||
fixed++
|
||||
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
|
||||
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
|
||||
if err != nil {
|
||||
return total, fixed, err
|
||||
}
|
||||
|
@ -438,11 +437,11 @@ func (s *Sync) doSync() error {
|
|||
var err error
|
||||
claims, err := s.daemon.ClaimListMine()
|
||||
if err != nil {
|
||||
return errors.Prefix("cannot list claims: ", err)
|
||||
return errors.Prefix("cannot list claims", err)
|
||||
}
|
||||
hasDupes, err := s.fixDupes(*claims)
|
||||
if err != nil {
|
||||
return errors.Prefix("error checking for duplicates: ", err)
|
||||
return errors.Prefix("error checking for duplicates", err)
|
||||
}
|
||||
if hasDupes {
|
||||
SendInfoToSlack("Channel had dupes and was fixed!")
|
||||
|
@ -452,13 +451,13 @@ func (s *Sync) doSync() error {
|
|||
}
|
||||
claims, err = s.daemon.ClaimListMine()
|
||||
if err != nil {
|
||||
return errors.Prefix("cannot list claims: ", err)
|
||||
return errors.Prefix("cannot list claims", err)
|
||||
}
|
||||
}
|
||||
|
||||
pubsOnWallet, nFixed, err := s.updateRemoteDB(*claims)
|
||||
if err != nil {
|
||||
return errors.Prefix("error counting claims: ", err)
|
||||
return errors.Prefix("error counting claims", err)
|
||||
}
|
||||
if nFixed > 0 {
|
||||
err := s.setStatusSyncing()
|
||||
|
@ -466,7 +465,6 @@ func (s *Sync) doSync() error {
|
|||
return err
|
||||
}
|
||||
SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed)
|
||||
|
||||
}
|
||||
pubsOnDB := 0
|
||||
for _, sv := range s.syncedVideos {
|
||||
|
@ -593,7 +591,7 @@ func (s *Sync) startWorker(workerNum int) {
|
|||
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
|
||||
}
|
||||
s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
|
||||
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
|
||||
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
|
||||
if err != nil {
|
||||
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
||||
}
|
||||
|
@ -605,7 +603,7 @@ func (s *Sync) startWorker(workerNum int) {
|
|||
|
||||
func (s *Sync) enqueueYoutubeVideos() error {
|
||||
client := &http.Client{
|
||||
Transport: &transport.APIKey{Key: s.YoutubeAPIKey},
|
||||
Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
|
||||
}
|
||||
|
||||
service, err := youtube.New(client)
|
||||
|
@ -770,7 +768,7 @@ func (s *Sync) processVideo(v video) (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
if v.PlaylistPosition() > s.Manager.VideosLimit {
|
||||
if v.PlaylistPosition() > s.Manager.videosLimit {
|
||||
log.Println(v.ID() + " is old: skipping")
|
||||
return nil
|
||||
}
|
||||
|
@ -778,11 +776,13 @@ func (s *Sync) processVideo(v video) (err error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux)
|
||||
|
||||
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.maxVideoSize, s.namer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
|
||||
|
||||
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
|
||||
if err != nil {
|
||||
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue