Ytsync refactor #45
13 changed files with 460 additions and 390 deletions
56
Gopkg.lock
generated
56
Gopkg.lock
generated
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:90272eae3bf75d2aa681ff3eee6cf4f49e8b06db533dc9c830ef214e5abbaaf2"
|
digest = "1:9a88883f474d09f1da61894cd8115c7f33988d6941e4f6236324c777aaff8f2c"
|
||||||
name = "github.com/PuerkitoBio/goquery"
|
name = "github.com/PuerkitoBio/goquery"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
|
@ -10,7 +10,7 @@
|
||||||
version = "v1.4.1"
|
version = "v1.4.1"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:4d0cb5aec47a2aec8b8b211540dc59902f8f00d571281356597b41ae349f4885"
|
digest = "1:e3726ad6f38f710e84c8dcd0e830014de6eaeea81f28d91ae898afecc078479a"
|
||||||
name = "github.com/andybalholm/cascadia"
|
name = "github.com/andybalholm/cascadia"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
|
@ -18,7 +18,7 @@
|
||||||
version = "v1.0.0"
|
version = "v1.0.0"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:d4336a8cea03c9729c9be12c142938a28a0dbac23a316f019f6fee323cf03709"
|
digest = "1:261d95f4464744d542759a7a33846f56f24113f5a93c7577f4cd7044f7cb3d76"
|
||||||
name = "github.com/aws/aws-sdk-go"
|
name = "github.com/aws/aws-sdk-go"
|
||||||
packages = [
|
packages = [
|
||||||
"aws",
|
"aws",
|
||||||
|
@ -55,7 +55,7 @@
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
digest = "1:ea2251fa804d1b978feac8146d751b32ce2017eaf1f2915fde0df389bacaf383"
|
digest = "1:cc8ebf0c6745d09f728f1fa4fbd29baaa2e3a65efb49b5fefb0c163171ee7863"
|
||||||
name = "github.com/btcsuite/btcd"
|
name = "github.com/btcsuite/btcd"
|
||||||
packages = [
|
packages = [
|
||||||
"btcec",
|
"btcec",
|
||||||
|
@ -78,7 +78,7 @@
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
digest = "1:471ae435f9ad7fc2f6b7a2e91ca026a124792859a7033fa60579c3aa618161ed"
|
digest = "1:b0f4d2431c167d7127a029210c1a7cdc33c9114c1b3fd3582347baad5e832588"
|
||||||
name = "github.com/btcsuite/btcutil"
|
name = "github.com/btcsuite/btcutil"
|
||||||
packages = [
|
packages = [
|
||||||
".",
|
".",
|
||||||
|
@ -98,31 +98,20 @@
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
digest = "1:57c911bbbf529465cf2ca5d43546cd5875a59054c41e2fe97791419959282aa1"
|
digest = "1:dfc248d5e6e1582fdec83796d3d1d451aa6cae773c4e4ba1dac2838caef6d381"
|
||||||
name = "github.com/btcsuite/websocket"
|
name = "github.com/btcsuite/websocket"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
revision = "31079b6807923eb23992c421b114992b95131b55"
|
revision = "31079b6807923eb23992c421b114992b95131b55"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:0a39ec8bf5629610a4bc7873a92039ee509246da3cef1a0ea60f1ed7e5f9cea5"
|
digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b"
|
||||||
name = "github.com/davecgh/go-spew"
|
name = "github.com/davecgh/go-spew"
|
||||||
packages = ["spew"]
|
packages = ["spew"]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
revision = "346938d642f2ec3594ed81d874461961cd0faa76"
|
revision = "346938d642f2ec3594ed81d874461961cd0faa76"
|
||||||
version = "v1.1.0"
|
version = "v1.1.0"
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
digest = "1:4b5f8c148e7fa094b73bcb6d16ea46eac7fdc726e55b81845ff96e29df534421"
|
|
||||||
name = "github.com/garyburd/redigo"
|
|
||||||
packages = [
|
|
||||||
"internal",
|
|
||||||
"redis",
|
|
||||||
]
|
|
||||||
pruneopts = ""
|
|
||||||
revision = "a69d19351219b6dd56f274f96d85a7014a2ec34e"
|
|
||||||
version = "v1.6.0"
|
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:968d8903d598e3fae738325d3410f33f07ea6a2b9ee5591e9c262ee37df6845a"
|
digest = "1:968d8903d598e3fae738325d3410f33f07ea6a2b9ee5591e9c262ee37df6845a"
|
||||||
name = "github.com/go-errors/errors"
|
name = "github.com/go-errors/errors"
|
||||||
|
@ -133,14 +122,14 @@
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
digest = "1:4d5221853226d8d4be594d52d885ddde38170d2e3159b82ed92ecde4dded2304"
|
digest = "1:cd5bab9c9e23ffa6858eaa79dc827fd84bc24bc00b0cfb0b14036e393da2b1fa"
|
||||||
name = "github.com/go-ini/ini"
|
name = "github.com/go-ini/ini"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
revision = "5cf292cae48347c2490ac1a58fe36735fb78df7e"
|
revision = "5cf292cae48347c2490ac1a58fe36735fb78df7e"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:b1d3041d568e065ab4d76f7477844458e9209c0bb241eaccdc0770bf0a13b120"
|
digest = "1:f958a1c137db276e52f0b50efee41a1a389dcdded59a69711f3e872757dab34b"
|
||||||
name = "github.com/golang/protobuf"
|
name = "github.com/golang/protobuf"
|
||||||
packages = ["proto"]
|
packages = ["proto"]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
|
@ -148,7 +137,7 @@
|
||||||
version = "v1.1.0"
|
version = "v1.1.0"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:fe1b4d4cbe48c0d55507c55f8663aa4185576cc58fa0c8be03bb8f19dfe17a9c"
|
digest = "1:64d212c703a2b94054be0ce470303286b177ad260b2f89a307e3d1bb6c073ef6"
|
||||||
name = "github.com/gorilla/websocket"
|
name = "github.com/gorilla/websocket"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
|
@ -164,7 +153,7 @@
|
||||||
version = "v1.0"
|
version = "v1.0"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:4f767a115bc8e08576f6d38ab73c376fc1b1cd3bb5041171c9e8668cc7739b52"
|
digest = "1:6f49eae0c1e5dab1dafafee34b207aeb7a42303105960944828c2079b92fc88e"
|
||||||
name = "github.com/jmespath/go-jmespath"
|
name = "github.com/jmespath/go-jmespath"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
|
@ -172,14 +161,14 @@
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
digest = "1:375104fd705791c50351e652a9d80321813fefc4f83a7871cb2f6111a5bc1dc3"
|
digest = "1:d261f80387a38eeddc1d819ee9ee56d37ca10fc02e6e09ff400fb0ce146e13dc"
|
||||||
name = "github.com/lbryio/lbryschema.go"
|
name = "github.com/lbryio/lbryschema.go"
|
||||||
packages = ["pb"]
|
packages = ["pb"]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
revision = "185433f2fd0c732547654749b98b37e56223dd22"
|
revision = "185433f2fd0c732547654749b98b37e56223dd22"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:daad05ffdae6e2cd9bd9bbc14440e7e8e841037141f26a775a5a31b1b61cb14d"
|
digest = "1:5e30b8342813a6a85a647f9277e34ffcd5872dc57ab590dd9b251b145b6ec88f"
|
||||||
name = "github.com/lbryio/ozzo-validation"
|
name = "github.com/lbryio/ozzo-validation"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
|
@ -203,7 +192,7 @@
|
||||||
revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b"
|
revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:ba7d1dfde0c2142011332bffce4d8468310228afd49dd4425ac59fa9124fb7c4"
|
digest = "1:3cb50c403fa46c85697dbc4e06a95008689e058f33466b7eb8d31ea0eb291ea3"
|
||||||
name = "github.com/nlopes/slack"
|
name = "github.com/nlopes/slack"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
|
@ -212,7 +201,7 @@
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
digest = "1:1ee326e6da62d87b3a07a9303d2cbb70f974207d14e6d992c61b6e650ff37c50"
|
digest = "1:8d6d81d0d9d8153e65d637bda77a7c4e6ba496c61efac3578d7d8c981ac31a7b"
|
||||||
name = "github.com/rylio/ytdl"
|
name = "github.com/rylio/ytdl"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
|
@ -228,7 +217,7 @@
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
digest = "1:4cb1f758b69097d419a148e64c86b358a4f77c695504f99de1ee86617d64f74e"
|
digest = "1:c92f01303e3ab3b5da92657841639cb53d1548f0d2733d12ef3b9fd9d47c869e"
|
||||||
name = "github.com/sirupsen/logrus"
|
name = "github.com/sirupsen/logrus"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
|
@ -244,7 +233,7 @@
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
digest = "1:7bcdb212f21d3cf318699d50af69a9192ef73fedad0d94d9ed5616f349457881"
|
digest = "1:bfbf4a9c265ef41f8d03c9d91e340aaddae835710eaed6cd2e6be889cbc05f56"
|
||||||
name = "github.com/spf13/cobra"
|
name = "github.com/spf13/cobra"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
|
@ -276,7 +265,7 @@
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
digest = "1:e7dc596c84a908dd326d2c07453307f192976c3edb9482b88290c42076fe378f"
|
digest = "1:8af4dda167d0ef21ab0affc797bff87ed0e87c57bd1d9bf57ad8f72d348c7932"
|
||||||
name = "golang.org/x/crypto"
|
name = "golang.org/x/crypto"
|
||||||
packages = [
|
packages = [
|
||||||
"ripemd160",
|
"ripemd160",
|
||||||
|
@ -288,7 +277,7 @@
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
digest = "1:3edb9c19d0b874999053badbbcc08edab3cde0262d2beb36ad6c0d78391c19ac"
|
digest = "1:5dc6753986b9eeba4abdf05dedc5ba06bb52dad43cc8aad35ffb42bb7adfa68f"
|
||||||
name = "golang.org/x/net"
|
name = "golang.org/x/net"
|
||||||
packages = [
|
packages = [
|
||||||
"context",
|
"context",
|
||||||
|
@ -301,7 +290,7 @@
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
digest = "1:b1ac49fd3eae66e95230ea3423158b18374a5ad1c53caf89bc7fc1a441e9e0e7"
|
digest = "1:baee54aa41cb93366e76a9c29f8dd2e4c4e6a35ff89551721d5275d2c858edc9"
|
||||||
name = "golang.org/x/sys"
|
name = "golang.org/x/sys"
|
||||||
packages = [
|
packages = [
|
||||||
"unix",
|
"unix",
|
||||||
|
@ -312,7 +301,7 @@
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
digest = "1:b4f82373e582dff1470e66574ac664b548aa69cffce0943321797fd75f46ee6f"
|
digest = "1:b064108d68f82d0201d9f812297c928e57488e82ccdb77ed06ac69f64519a890"
|
||||||
name = "google.golang.org/api"
|
name = "google.golang.org/api"
|
||||||
packages = [
|
packages = [
|
||||||
"gensupport",
|
"gensupport",
|
||||||
|
@ -325,7 +314,7 @@
|
||||||
revision = "ef86ce4234efee96020bde00391d6a9cfae66561"
|
revision = "ef86ce4234efee96020bde00391d6a9cfae66561"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
digest = "1:05eca53b271663de74078b5484b1995a8d56668a51434a698dc5d0863035d575"
|
digest = "1:f771bf87a3253de520c2af6fb6e75314dce0fedc0b30b208134fe502932bb15d"
|
||||||
name = "gopkg.in/nullbio/null.v6"
|
name = "gopkg.in/nullbio/null.v6"
|
||||||
packages = ["convert"]
|
packages = ["convert"]
|
||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
|
@ -348,7 +337,6 @@
|
||||||
"github.com/btcsuite/btcutil",
|
"github.com/btcsuite/btcutil",
|
||||||
"github.com/btcsuite/btcutil/base58",
|
"github.com/btcsuite/btcutil/base58",
|
||||||
"github.com/davecgh/go-spew/spew",
|
"github.com/davecgh/go-spew/spew",
|
||||||
"github.com/garyburd/redigo/redis",
|
|
||||||
"github.com/go-errors/errors",
|
"github.com/go-errors/errors",
|
||||||
"github.com/go-ini/ini",
|
"github.com/go-ini/ini",
|
||||||
"github.com/lbryio/lbryschema.go/pb",
|
"github.com/lbryio/lbryschema.go/pb",
|
||||||
|
|
|
@ -2,10 +2,6 @@
|
||||||
name = "github.com/davecgh/go-spew"
|
name = "github.com/davecgh/go-spew"
|
||||||
version = "1.1.0"
|
version = "1.1.0"
|
||||||
|
|
||||||
[[constraint]]
|
|
||||||
name = "github.com/garyburd/redigo"
|
|
||||||
version = "1.1.0"
|
|
||||||
|
|
||||||
[[constraint]]
|
[[constraint]]
|
||||||
name = "github.com/go-errors/errors"
|
name = "github.com/go-errors/errors"
|
||||||
version = "1.0.0"
|
version = "1.0.0"
|
||||||
|
|
|
@ -2,6 +2,7 @@ package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
sync "github.com/lbryio/lbry.go/ytsync"
|
sync "github.com/lbryio/lbry.go/ytsync"
|
||||||
|
"github.com/lbryio/lbry.go/ytsync/sdk"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
@ -22,7 +23,9 @@ func ytcount(cmd *cobra.Command, args []string) {
|
||||||
channelID := args[1]
|
channelID := args[1]
|
||||||
|
|
||||||
s := sync.Sync{
|
s := sync.Sync{
|
||||||
YoutubeAPIKey: ytAPIKey,
|
APIConfig: &sdk.APIConfig{
|
||||||
|
YoutubeAPIKey: ytAPIKey,
|
||||||
|
},
|
||||||
YoutubeChannelID: channelID,
|
YoutubeChannelID: channelID,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/util"
|
"github.com/lbryio/lbry.go/util"
|
||||||
sync "github.com/lbryio/lbry.go/ytsync"
|
sync "github.com/lbryio/lbry.go/ytsync"
|
||||||
|
"github.com/lbryio/lbry.go/ytsync/sdk"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
@ -141,35 +142,40 @@ func ytSync(cmd *cobra.Command, args []string) {
|
||||||
blobsDir = usr.HomeDir + "/.lbrynet/blobfiles/"
|
blobsDir = usr.HomeDir + "/.lbrynet/blobfiles/"
|
||||||
}
|
}
|
||||||
|
|
||||||
sm := sync.SyncManager{
|
syncProperties := &sdk.SyncProperties{
|
||||||
StopOnError: stopOnError,
|
SyncFrom: syncFrom,
|
||||||
MaxTries: maxTries,
|
SyncUntil: syncUntil,
|
||||||
TakeOverExistingChannel: takeOverExistingChannel,
|
YoutubeChannelID: channelID,
|
||||||
Refill: refill,
|
|
||||||
Limit: limit,
|
|
||||||
SkipSpaceCheck: skipSpaceCheck,
|
|
||||||
SyncUpdate: syncUpdate,
|
|
||||||
SyncStatus: syncStatus,
|
|
||||||
SyncFrom: syncFrom,
|
|
||||||
SyncUntil: syncUntil,
|
|
||||||
ConcurrentJobs: concurrentJobs,
|
|
||||||
ConcurrentVideos: concurrentJobs,
|
|
||||||
HostName: hostname,
|
|
||||||
YoutubeChannelID: channelID,
|
|
||||||
YoutubeAPIKey: youtubeAPIKey,
|
|
||||||
ApiURL: apiURL,
|
|
||||||
ApiToken: apiToken,
|
|
||||||
BlobsDir: blobsDir,
|
|
||||||
VideosLimit: videosLimit,
|
|
||||||
MaxVideoSize: maxVideoSize,
|
|
||||||
LbrycrdString: lbrycrdString,
|
|
||||||
AwsS3ID: awsS3ID,
|
|
||||||
AwsS3Secret: awsS3Secret,
|
|
||||||
AwsS3Region: awsS3Region,
|
|
||||||
AwsS3Bucket: awsS3Bucket,
|
|
||||||
SingleRun: singleRun,
|
|
||||||
}
|
}
|
||||||
|
apiConfig := &sdk.APIConfig{
|
||||||
|
YoutubeAPIKey: youtubeAPIKey,
|
||||||
|
ApiURL: apiURL,
|
||||||
|
ApiToken: apiToken,
|
||||||
|
HostName: hostname,
|
||||||
|
}
|
||||||
|
sm := sync.NewSyncManager(
|
||||||
|
stopOnError,
|
||||||
|
maxTries,
|
||||||
|
takeOverExistingChannel,
|
||||||
|
refill,
|
||||||
|
limit,
|
||||||
|
skipSpaceCheck,
|
||||||
|
syncUpdate,
|
||||||
|
concurrentJobs,
|
||||||
|
concurrentJobs,
|
||||||
|
blobsDir,
|
||||||
|
videosLimit,
|
||||||
|
maxVideoSize,
|
||||||
|
lbrycrdString,
|
||||||
|
awsS3ID,
|
||||||
|
awsS3Secret,
|
||||||
|
awsS3Region,
|
||||||
|
awsS3Bucket,
|
||||||
|
syncStatus,
|
||||||
|
singleRun,
|
||||||
|
syncProperties,
|
||||||
|
apiConfig,
|
||||||
|
)
|
||||||
err := sm.Start()
|
err := sm.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sync.SendErrorToSlack(err.Error())
|
sync.SendErrorToSlack(err.Error())
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
|
|
||||||
func (s *Sync) CountVideos() (uint64, error) {
|
func (s *Sync) CountVideos() (uint64, error) {
|
||||||
client := &http.Client{
|
client := &http.Client{
|
||||||
Transport: &transport.APIKey{Key: s.YoutubeAPIKey},
|
Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
|
||||||
}
|
}
|
||||||
|
|
||||||
service, err := youtube.New(client)
|
service, err := youtube.New(client)
|
||||||
|
|
|
@ -1,49 +1,71 @@
|
||||||
package ytsync
|
package ytsync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/errors"
|
"github.com/lbryio/lbry.go/errors"
|
||||||
"github.com/lbryio/lbry.go/null"
|
|
||||||
"github.com/lbryio/lbry.go/util"
|
"github.com/lbryio/lbry.go/util"
|
||||||
|
"github.com/lbryio/lbry.go/ytsync/namer"
|
||||||
|
"github.com/lbryio/lbry.go/ytsync/sdk"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SyncManager struct {
|
type SyncManager struct {
|
||||||
StopOnError bool
|
stopOnError bool
|
||||||
MaxTries int
|
maxTries int
|
||||||
TakeOverExistingChannel bool
|
takeOverExistingChannel bool
|
||||||
Refill int
|
refill int
|
||||||
Limit int
|
limit int
|
||||||
SkipSpaceCheck bool
|
skipSpaceCheck bool
|
||||||
SyncUpdate bool
|
syncUpdate bool
|
||||||
SyncStatus string
|
concurrentJobs int
|
||||||
SyncFrom int64
|
concurrentVideos int
|
||||||
SyncUntil int64
|
blobsDir string
|
||||||
ConcurrentJobs int
|
videosLimit int
|
||||||
ConcurrentVideos int
|
maxVideoSize int
|
||||||
HostName string
|
lbrycrdString string
|
||||||
YoutubeChannelID string
|
awsS3ID string
|
||||||
YoutubeAPIKey string
|
awsS3Secret string
|
||||||
ApiURL string
|
awsS3Region string
|
||||||
ApiToken string
|
syncStatus string
|
||||||
BlobsDir string
|
awsS3Bucket string
|
||||||
VideosLimit int
|
singleRun bool
|
||||||
MaxVideoSize int
|
syncProperties *sdk.SyncProperties
|
||||||
LbrycrdString string
|
apiConfig *sdk.APIConfig
|
||||||
AwsS3ID string
|
namer *namer.Namer
|
||||||
AwsS3Secret string
|
}
|
||||||
AwsS3Region string
|
|
||||||
AwsS3Bucket string
|
func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int,
|
||||||
SingleRun bool
|
skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int,
|
||||||
|
maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string,
|
||||||
|
syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig) *SyncManager {
|
||||||
|
return &SyncManager{
|
||||||
|
stopOnError: stopOnError,
|
||||||
|
maxTries: maxTries,
|
||||||
|
takeOverExistingChannel: takeOverExistingChannel,
|
||||||
|
refill: refill,
|
||||||
|
limit: limit,
|
||||||
|
skipSpaceCheck: skipSpaceCheck,
|
||||||
|
syncUpdate: syncUpdate,
|
||||||
|
concurrentJobs: concurrentJobs,
|
||||||
|
concurrentVideos: concurrentVideos,
|
||||||
|
blobsDir: blobsDir,
|
||||||
|
videosLimit: videosLimit,
|
||||||
|
maxVideoSize: maxVideoSize,
|
||||||
|
lbrycrdString: lbrycrdString,
|
||||||
|
awsS3ID: awsS3ID,
|
||||||
|
awsS3Secret: awsS3Secret,
|
||||||
|
awsS3Region: awsS3Region,
|
||||||
|
awsS3Bucket: awsS3Bucket,
|
||||||
|
syncStatus: syncStatus,
|
||||||
|
singleRun: singleRun,
|
||||||
|
syncProperties: syncProperties,
|
||||||
|
apiConfig: apiConfig,
|
||||||
|
namer: namer.NewNamer(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -57,143 +79,13 @@ const (
|
||||||
|
|
||||||
var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized}
|
var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized}
|
||||||
|
|
||||||
type apiJobsResponse struct {
|
|
||||||
Success bool `json:"success"`
|
|
||||||
Error null.String `json:"error"`
|
|
||||||
Data []apiYoutubeChannel `json:"data"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type apiYoutubeChannel struct {
|
|
||||||
ChannelId string `json:"channel_id"`
|
|
||||||
TotalVideos uint `json:"total_videos"`
|
|
||||||
DesiredChannelName string `json:"desired_channel_name"`
|
|
||||||
SyncServer null.String `json:"sync_server"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) {
|
|
||||||
endpoint := s.ApiURL + "/yt/jobs"
|
|
||||||
res, _ := http.PostForm(endpoint, url.Values{
|
|
||||||
"auth_token": {s.ApiToken},
|
|
||||||
"sync_status": {status},
|
|
||||||
"min_videos": {strconv.Itoa(1)},
|
|
||||||
"after": {strconv.Itoa(int(s.SyncFrom))},
|
|
||||||
"before": {strconv.Itoa(int(s.SyncUntil))},
|
|
||||||
"sync_server": {s.HostName},
|
|
||||||
"channel_id": {s.YoutubeChannelID},
|
|
||||||
})
|
|
||||||
defer res.Body.Close()
|
|
||||||
body, _ := ioutil.ReadAll(res.Body)
|
|
||||||
var response apiJobsResponse
|
|
||||||
err := json.Unmarshal(body, &response)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if response.Data == nil {
|
|
||||||
return nil, errors.Err(response.Error)
|
|
||||||
}
|
|
||||||
log.Printf("Fetched channels: %d", len(response.Data))
|
|
||||||
return response.Data, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type apiChannelStatusResponse struct {
|
|
||||||
Success bool `json:"success"`
|
|
||||||
Error null.String `json:"error"`
|
|
||||||
Data []syncedVideo `json:"data"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type syncedVideo struct {
|
|
||||||
VideoID string `json:"video_id"`
|
|
||||||
Published bool `json:"published"`
|
|
||||||
FailureReason string `json:"failure_reason"`
|
|
||||||
ClaimName string `json:"claim_name"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SyncManager) setChannelStatus(channelID string, status string, failureReason string) (map[string]syncedVideo, map[string]bool, error) {
|
|
||||||
endpoint := s.ApiURL + "/yt/channel_status"
|
|
||||||
if len(failureReason) > maxReasonLength {
|
|
||||||
failureReason = failureReason[:maxReasonLength]
|
|
||||||
}
|
|
||||||
res, _ := http.PostForm(endpoint, url.Values{
|
|
||||||
"channel_id": {channelID},
|
|
||||||
"sync_server": {s.HostName},
|
|
||||||
"auth_token": {s.ApiToken},
|
|
||||||
"sync_status": {status},
|
|
||||||
"failure_reason": {failureReason},
|
|
||||||
})
|
|
||||||
defer res.Body.Close()
|
|
||||||
body, _ := ioutil.ReadAll(res.Body)
|
|
||||||
var response apiChannelStatusResponse
|
|
||||||
err := json.Unmarshal(body, &response)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
if !response.Error.IsNull() {
|
|
||||||
return nil, nil, errors.Err(response.Error.String)
|
|
||||||
}
|
|
||||||
if response.Data != nil {
|
|
||||||
svs := make(map[string]syncedVideo)
|
|
||||||
claimNames := make(map[string]bool)
|
|
||||||
for _, v := range response.Data {
|
|
||||||
svs[v.VideoID] = v
|
|
||||||
claimNames[v.ClaimName] = v.Published
|
|
||||||
}
|
|
||||||
return svs, claimNames, nil
|
|
||||||
}
|
|
||||||
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
VideoStatusPublished = "published"
|
VideoStatusPublished = "published"
|
||||||
VideoStatusFailed = "failed"
|
VideoStatusFailed = "failed"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *SyncManager) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
|
|
||||||
endpoint := s.ApiURL + "/yt/video_status"
|
|
||||||
if len(failureReason) > maxReasonLength {
|
|
||||||
failureReason = failureReason[:maxReasonLength]
|
|
||||||
}
|
|
||||||
vals := url.Values{
|
|
||||||
"youtube_channel_id": {channelID},
|
|
||||||
"video_id": {videoID},
|
|
||||||
"status": {status},
|
|
||||||
"auth_token": {s.ApiToken},
|
|
||||||
}
|
|
||||||
if status == VideoStatusPublished {
|
|
||||||
if claimID == "" || claimName == "" {
|
|
||||||
return errors.Err("claimID or claimName missing")
|
|
||||||
}
|
|
||||||
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
|
|
||||||
vals.Add("claim_id", claimID)
|
|
||||||
vals.Add("claim_name", claimName)
|
|
||||||
if size != nil {
|
|
||||||
vals.Add("size", strconv.FormatInt(*size, 10))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if failureReason != "" {
|
|
||||||
vals.Add("failure_reason", failureReason)
|
|
||||||
}
|
|
||||||
res, _ := http.PostForm(endpoint, vals)
|
|
||||||
defer res.Body.Close()
|
|
||||||
body, _ := ioutil.ReadAll(res.Body)
|
|
||||||
var response struct {
|
|
||||||
Success bool `json:"success"`
|
|
||||||
Error null.String `json:"error"`
|
|
||||||
Data null.String `json:"data"`
|
|
||||||
}
|
|
||||||
err := json.Unmarshal(body, &response)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if !response.Error.IsNull() {
|
|
||||||
return errors.Err(response.Error.String)
|
|
||||||
}
|
|
||||||
if !response.Data.IsNull() && response.Data.String == "ok" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SyncManager) Start() error {
|
func (s *SyncManager) Start() error {
|
||||||
|
|
||||||
syncCount := 0
|
syncCount := 0
|
||||||
for {
|
for {
|
||||||
err := s.checkUsedSpace()
|
err := s.checkUsedSpace()
|
||||||
|
@ -204,9 +96,9 @@ func (s *SyncManager) Start() error {
|
||||||
var syncs []Sync
|
var syncs []Sync
|
||||||
shouldInterruptLoop := false
|
shouldInterruptLoop := false
|
||||||
|
|
||||||
isSingleChannelSync := s.YoutubeChannelID != ""
|
isSingleChannelSync := s.syncProperties.YoutubeChannelID != ""
|
||||||
if isSingleChannelSync {
|
if isSingleChannelSync {
|
||||||
channels, err := s.fetchChannels("")
|
channels, err := s.apiConfig.FetchChannels("", s.syncProperties)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -216,52 +108,53 @@ func (s *SyncManager) Start() error {
|
||||||
lbryChannelName := channels[0].DesiredChannelName
|
lbryChannelName := channels[0].DesiredChannelName
|
||||||
syncs = make([]Sync, 1)
|
syncs = make([]Sync, 1)
|
||||||
syncs[0] = Sync{
|
syncs[0] = Sync{
|
||||||
YoutubeAPIKey: s.YoutubeAPIKey,
|
APIConfig: s.apiConfig,
|
||||||
YoutubeChannelID: s.YoutubeChannelID,
|
YoutubeChannelID: s.syncProperties.YoutubeChannelID,
|
||||||
LbryChannelName: lbryChannelName,
|
LbryChannelName: lbryChannelName,
|
||||||
StopOnError: s.StopOnError,
|
StopOnError: s.stopOnError,
|
||||||
MaxTries: s.MaxTries,
|
MaxTries: s.maxTries,
|
||||||
ConcurrentVideos: s.ConcurrentVideos,
|
ConcurrentVideos: s.concurrentVideos,
|
||||||
TakeOverExistingChannel: s.TakeOverExistingChannel,
|
TakeOverExistingChannel: s.takeOverExistingChannel,
|
||||||
Refill: s.Refill,
|
Refill: s.refill,
|
||||||
Manager: s,
|
Manager: s,
|
||||||
LbrycrdString: s.LbrycrdString,
|
LbrycrdString: s.lbrycrdString,
|
||||||
AwsS3ID: s.AwsS3ID,
|
AwsS3ID: s.awsS3ID,
|
||||||
AwsS3Secret: s.AwsS3Secret,
|
AwsS3Secret: s.awsS3Secret,
|
||||||
AwsS3Region: s.AwsS3Region,
|
AwsS3Region: s.awsS3Region,
|
||||||
AwsS3Bucket: s.AwsS3Bucket,
|
AwsS3Bucket: s.awsS3Bucket,
|
||||||
|
namer: s.namer,
|
||||||
}
|
}
|
||||||
shouldInterruptLoop = true
|
shouldInterruptLoop = true
|
||||||
} else {
|
} else {
|
||||||
var queuesToSync []string
|
var queuesToSync []string
|
||||||
if s.SyncStatus != "" {
|
if s.syncStatus != "" {
|
||||||
queuesToSync = append(queuesToSync, s.SyncStatus)
|
queuesToSync = append(queuesToSync, s.syncStatus)
|
||||||
} else if s.SyncUpdate {
|
} else if s.syncUpdate {
|
||||||
queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced)
|
queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced)
|
||||||
} else {
|
} else {
|
||||||
queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued)
|
queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued)
|
||||||
}
|
}
|
||||||
for _, q := range queuesToSync {
|
for _, q := range queuesToSync {
|
||||||
channels, err := s.fetchChannels(q)
|
channels, err := s.apiConfig.FetchChannels(q, s.syncProperties)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, c := range channels {
|
for _, c := range channels {
|
||||||
syncs = append(syncs, Sync{
|
syncs = append(syncs, Sync{
|
||||||
YoutubeAPIKey: s.YoutubeAPIKey,
|
APIConfig: s.apiConfig,
|
||||||
YoutubeChannelID: c.ChannelId,
|
YoutubeChannelID: c.ChannelId,
|
||||||
LbryChannelName: c.DesiredChannelName,
|
LbryChannelName: c.DesiredChannelName,
|
||||||
StopOnError: s.StopOnError,
|
StopOnError: s.stopOnError,
|
||||||
MaxTries: s.MaxTries,
|
MaxTries: s.maxTries,
|
||||||
ConcurrentVideos: s.ConcurrentVideos,
|
ConcurrentVideos: s.concurrentVideos,
|
||||||
TakeOverExistingChannel: s.TakeOverExistingChannel,
|
TakeOverExistingChannel: s.takeOverExistingChannel,
|
||||||
Refill: s.Refill,
|
Refill: s.refill,
|
||||||
Manager: s,
|
Manager: s,
|
||||||
LbrycrdString: s.LbrycrdString,
|
LbrycrdString: s.lbrycrdString,
|
||||||
AwsS3ID: s.AwsS3ID,
|
AwsS3ID: s.awsS3ID,
|
||||||
AwsS3Secret: s.AwsS3Secret,
|
AwsS3Secret: s.awsS3Secret,
|
||||||
AwsS3Region: s.AwsS3Region,
|
AwsS3Region: s.awsS3Region,
|
||||||
AwsS3Bucket: s.AwsS3Bucket,
|
AwsS3Bucket: s.awsS3Bucket,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -294,12 +187,12 @@ func (s *SyncManager) Start() error {
|
||||||
if !shouldNotCount {
|
if !shouldNotCount {
|
||||||
syncCount++
|
syncCount++
|
||||||
}
|
}
|
||||||
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
|
if sync.IsInterrupted() || (s.limit != 0 && syncCount >= s.limit) {
|
||||||
shouldInterruptLoop = true
|
shouldInterruptLoop = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if shouldInterruptLoop || s.SingleRun {
|
if shouldInterruptLoop || s.singleRun {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -307,11 +200,11 @@ func (s *SyncManager) Start() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SyncManager) checkUsedSpace() error {
|
func (s *SyncManager) checkUsedSpace() error {
|
||||||
usedPctile, err := GetUsedSpace(s.BlobsDir)
|
usedPctile, err := GetUsedSpace(s.blobsDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if usedPctile >= 0.90 && !s.SkipSpaceCheck {
|
if usedPctile >= 0.90 && !s.skipSpaceCheck {
|
||||||
return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100))
|
return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100))
|
||||||
}
|
}
|
||||||
log.Infof("disk usage: %.1f%%", usedPctile*100)
|
log.Infof("disk usage: %.1f%%", usedPctile*100)
|
||||||
|
|
83
ytsync/namer/names.go
Normal file
83
ytsync/namer/names.go
Normal file
|
@ -0,0 +1,83 @@
|
||||||
|
package namer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/md5"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"regexp"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`)
|
||||||
|
|
||||||
|
type Namer struct {
|
||||||
|
mu *sync.Mutex
|
||||||
|
names map[string]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewNamer() *Namer {
|
||||||
|
return &Namer{
|
||||||
|
mu: &sync.Mutex{},
|
||||||
|
names: make(map[string]bool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Namer) SetNames(names map[string]bool) {
|
||||||
|
n.names = names
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Namer) GetNextName(prefix string) string {
|
||||||
|
n.mu.Lock()
|
||||||
|
defer n.mu.Unlock()
|
||||||
|
|
||||||
|
attempt := 1
|
||||||
|
var name string
|
||||||
|
for {
|
||||||
|
name = getClaimNameFromTitle(prefix, attempt)
|
||||||
|
if _, exists := n.names[name]; !exists {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
attempt++
|
||||||
|
}
|
||||||
|
|
||||||
|
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
|
||||||
|
if len(name) < 2 {
|
||||||
|
sum := md5.Sum([]byte(prefix))
|
||||||
|
name = fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:])[:15], attempt)
|
||||||
|
}
|
||||||
|
|
||||||
|
n.names[name] = true
|
||||||
|
|
||||||
|
return name
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: clean this up some
|
||||||
|
func getClaimNameFromTitle(title string, attempt int) string {
|
||||||
|
suffix := ""
|
||||||
|
if attempt > 1 {
|
||||||
|
suffix = "-" + strconv.Itoa(attempt)
|
||||||
|
}
|
||||||
|
maxLen := 40 - len(suffix)
|
||||||
|
|
||||||
|
chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-")
|
||||||
|
|
||||||
|
name := chunks[0]
|
||||||
|
if len(name) > maxLen {
|
||||||
|
return name[:maxLen]
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, chunk := range chunks[1:] {
|
||||||
|
tmpName := name + "-" + chunk
|
||||||
|
if len(tmpName) > maxLen {
|
||||||
|
if len(name) < 20 {
|
||||||
|
name = tmpName[:maxLen]
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
name = tmpName
|
||||||
|
}
|
||||||
|
|
||||||
|
return name + suffix
|
||||||
|
}
|
170
ytsync/sdk/api.go
Normal file
170
ytsync/sdk/api.go
Normal file
|
@ -0,0 +1,170 @@
|
||||||
|
package sdk
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/errors"
|
||||||
|
"github.com/lbryio/lbry.go/null"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
MaxReasonLength = 500
|
||||||
|
)
|
||||||
|
|
||||||
|
type APIConfig struct {
|
||||||
|
YoutubeAPIKey string
|
||||||
|
ApiURL string
|
||||||
|
ApiToken string
|
||||||
|
HostName string
|
||||||
|
}
|
||||||
|
|
||||||
|
type SyncProperties struct {
|
||||||
|
SyncFrom int64
|
||||||
|
SyncUntil int64
|
||||||
|
YoutubeChannelID string
|
||||||
|
}
|
||||||
|
|
||||||
|
type YoutubeChannel struct {
|
||||||
|
ChannelId string `json:"channel_id"`
|
||||||
|
TotalVideos uint `json:"total_videos"`
|
||||||
|
DesiredChannelName string `json:"desired_channel_name"`
|
||||||
|
SyncServer null.String `json:"sync_server"`
|
||||||
|
Fee *struct {
|
||||||
|
Amount string `json:"amount"`
|
||||||
|
Address string `json:"address"`
|
||||||
|
Currency string `json:"currency"`
|
||||||
|
} `json:"fee"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeChannel, error) {
|
||||||
|
type apiJobsResponse struct {
|
||||||
|
Success bool `json:"success"`
|
||||||
|
Error null.String `json:"error"`
|
||||||
|
Data []YoutubeChannel `json:"data"`
|
||||||
|
}
|
||||||
|
endpoint := a.ApiURL + "/yt/jobs"
|
||||||
|
res, _ := http.PostForm(endpoint, url.Values{
|
||||||
|
"auth_token": {a.ApiToken},
|
||||||
|
"sync_status": {status},
|
||||||
|
"min_videos": {strconv.Itoa(1)},
|
||||||
|
"after": {strconv.Itoa(int(cp.SyncFrom))},
|
||||||
|
"before": {strconv.Itoa(int(cp.SyncUntil))},
|
||||||
|
"sync_server": {a.HostName},
|
||||||
|
"channel_id": {cp.YoutubeChannelID},
|
||||||
|
})
|
||||||
|
defer res.Body.Close()
|
||||||
|
body, _ := ioutil.ReadAll(res.Body)
|
||||||
|
var response apiJobsResponse
|
||||||
|
err := json.Unmarshal(body, &response)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if response.Data == nil {
|
||||||
|
return nil, errors.Err(response.Error)
|
||||||
|
}
|
||||||
|
log.Printf("Fetched channels: %d", len(response.Data))
|
||||||
|
return response.Data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type SyncedVideo struct {
|
||||||
|
VideoID string `json:"video_id"`
|
||||||
|
Published bool `json:"published"`
|
||||||
|
FailureReason string `json:"failure_reason"`
|
||||||
|
ClaimName string `json:"claim_name"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *APIConfig) SetChannelStatus(channelID string, status string, failureReason string) (map[string]SyncedVideo, map[string]bool, error) {
|
||||||
|
type apiChannelStatusResponse struct {
|
||||||
|
Success bool `json:"success"`
|
||||||
|
Error null.String `json:"error"`
|
||||||
|
Data []SyncedVideo `json:"data"`
|
||||||
|
}
|
||||||
|
endpoint := a.ApiURL + "/yt/channel_status"
|
||||||
|
if len(failureReason) > MaxReasonLength {
|
||||||
|
failureReason = failureReason[:MaxReasonLength]
|
||||||
|
}
|
||||||
|
res, _ := http.PostForm(endpoint, url.Values{
|
||||||
|
"channel_id": {channelID},
|
||||||
|
"sync_server": {a.HostName},
|
||||||
|
"auth_token": {a.ApiToken},
|
||||||
|
"sync_status": {status},
|
||||||
|
"failure_reason": {failureReason},
|
||||||
|
})
|
||||||
|
defer res.Body.Close()
|
||||||
|
body, _ := ioutil.ReadAll(res.Body)
|
||||||
|
var response apiChannelStatusResponse
|
||||||
|
err := json.Unmarshal(body, &response)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
if !response.Error.IsNull() {
|
||||||
|
return nil, nil, errors.Err(response.Error.String)
|
||||||
|
}
|
||||||
|
if response.Data != nil {
|
||||||
|
svs := make(map[string]SyncedVideo)
|
||||||
|
claimNames := make(map[string]bool)
|
||||||
|
for _, v := range response.Data {
|
||||||
|
svs[v.VideoID] = v
|
||||||
|
claimNames[v.ClaimName] = v.Published
|
||||||
|
}
|
||||||
|
return svs, claimNames, nil
|
||||||
|
}
|
||||||
|
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
VideoStatusPublished = "published"
|
||||||
|
VideoStatusFailed = "failed"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
|
||||||
|
endpoint := a.ApiURL + "/yt/video_status"
|
||||||
|
if len(failureReason) > MaxReasonLength {
|
||||||
|
failureReason = failureReason[:MaxReasonLength]
|
||||||
|
}
|
||||||
|
vals := url.Values{
|
||||||
|
"youtube_channel_id": {channelID},
|
||||||
|
"video_id": {videoID},
|
||||||
|
"status": {status},
|
||||||
|
"auth_token": {a.ApiToken},
|
||||||
|
}
|
||||||
|
if status == VideoStatusPublished {
|
||||||
|
if claimID == "" || claimName == "" {
|
||||||
|
return errors.Err("claimID or claimName missing")
|
||||||
|
}
|
||||||
|
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
|
||||||
|
vals.Add("claim_id", claimID)
|
||||||
|
vals.Add("claim_name", claimName)
|
||||||
|
if size != nil {
|
||||||
|
vals.Add("size", strconv.FormatInt(*size, 10))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if failureReason != "" {
|
||||||
|
vals.Add("failure_reason", failureReason)
|
||||||
|
}
|
||||||
|
res, _ := http.PostForm(endpoint, vals)
|
||||||
|
defer res.Body.Close()
|
||||||
|
body, _ := ioutil.ReadAll(res.Body)
|
||||||
|
var response struct {
|
||||||
|
Success bool `json:"success"`
|
||||||
|
Error null.String `json:"error"`
|
||||||
|
Data null.String `json:"data"`
|
||||||
|
}
|
||||||
|
err := json.Unmarshal(body, &response)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !response.Error.IsNull() {
|
||||||
|
return errors.Err(response.Error.String)
|
||||||
|
}
|
||||||
|
if !response.Data.IsNull() && response.Data.String == "ok" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
||||||
|
}
|
|
@ -50,8 +50,8 @@ func (s *Sync) walletSetup() error {
|
||||||
s.syncedVideosMux.RUnlock()
|
s.syncedVideosMux.RUnlock()
|
||||||
log.Debugf("We already allocated credits for %d videos", numPublished)
|
log.Debugf("We already allocated credits for %d videos", numPublished)
|
||||||
|
|
||||||
if numOnSource-numPublished > s.Manager.VideosLimit {
|
if numOnSource-numPublished > s.Manager.videosLimit {
|
||||||
numOnSource = s.Manager.VideosLimit
|
numOnSource = s.Manager.videosLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
|
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
|
||||||
|
|
|
@ -1,90 +1,27 @@
|
||||||
package sources
|
package sources
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"regexp"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"crypto/md5"
|
|
||||||
"encoding/hex"
|
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/jsonrpc"
|
"github.com/lbryio/lbry.go/jsonrpc"
|
||||||
log "github.com/sirupsen/logrus"
|
"github.com/lbryio/lbry.go/ytsync/namer"
|
||||||
)
|
)
|
||||||
|
|
||||||
var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`)
|
|
||||||
|
|
||||||
type SyncSummary struct {
|
type SyncSummary struct {
|
||||||
ClaimID string
|
ClaimID string
|
||||||
ClaimName string
|
ClaimName string
|
||||||
}
|
}
|
||||||
|
|
||||||
func getClaimNameFromTitle(title string, attempt int) string {
|
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, namer *namer.Namer) (*SyncSummary, error) {
|
||||||
suffix := ""
|
|
||||||
if attempt > 1 {
|
|
||||||
suffix = "-" + strconv.Itoa(attempt)
|
|
||||||
}
|
|
||||||
maxLen := 40 - len(suffix)
|
|
||||||
|
|
||||||
chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-")
|
|
||||||
|
|
||||||
name := chunks[0]
|
|
||||||
if len(name) > maxLen {
|
|
||||||
return name[:maxLen]
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, chunk := range chunks[1:] {
|
|
||||||
tmpName := name + "-" + chunk
|
|
||||||
if len(tmpName) > maxLen {
|
|
||||||
if len(name) < 20 {
|
|
||||||
name = tmpName[:maxLen]
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
name = tmpName
|
|
||||||
}
|
|
||||||
|
|
||||||
return name + suffix
|
|
||||||
}
|
|
||||||
|
|
||||||
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
|
|
||||||
attempt := 0
|
|
||||||
for {
|
for {
|
||||||
attempt++
|
name := namer.GetNextName(title)
|
||||||
name := getClaimNameFromTitle(title, attempt)
|
|
||||||
|
|
||||||
syncedVideosMux.Lock()
|
|
||||||
_, exists := claimNames[name]
|
|
||||||
if exists {
|
|
||||||
log.Printf("name exists, retrying (%d attempts so far)", attempt)
|
|
||||||
syncedVideosMux.Unlock()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
claimNames[name] = false
|
|
||||||
syncedVideosMux.Unlock()
|
|
||||||
|
|
||||||
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
|
|
||||||
if len(name) < 2 {
|
|
||||||
hasher := md5.New()
|
|
||||||
hasher.Write([]byte(title))
|
|
||||||
name = fmt.Sprintf("%s-%d", hex.EncodeToString(hasher.Sum(nil))[:15], attempt)
|
|
||||||
}
|
|
||||||
|
|
||||||
response, err := daemon.Publish(name, filename, amount, options)
|
response, err := daemon.Publish(name, filename, amount, options)
|
||||||
if err == nil || strings.Contains(err.Error(), "failed: Multiple claims (") {
|
if err != nil {
|
||||||
syncedVideosMux.Lock()
|
if strings.Contains(err.Error(), "failed: Multiple claims (") {
|
||||||
claimNames[name] = true
|
|
||||||
syncedVideosMux.Unlock()
|
|
||||||
if err == nil {
|
|
||||||
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
|
|
||||||
} else {
|
|
||||||
log.Printf("name exists, retrying (%d attempts so far)", attempt)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,15 +6,15 @@ import (
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||||
"github.com/aws/aws-sdk-go/service/s3"
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||||
"github.com/lbryio/lbry.go/errors"
|
"github.com/lbryio/lbry.go/errors"
|
||||||
"github.com/lbryio/lbry.go/jsonrpc"
|
"github.com/lbryio/lbry.go/jsonrpc"
|
||||||
|
"github.com/lbryio/lbry.go/ytsync/namer"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
|
@ -174,7 +174,7 @@ func (v *ucbVideo) saveThumbnail() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
|
func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) {
|
||||||
options := jsonrpc.PublishOptions{
|
options := jsonrpc.PublishOptions{
|
||||||
Title: &v.title,
|
Title: &v.title,
|
||||||
Author: strPtr("UC Berkeley"),
|
Author: strPtr("UC Berkeley"),
|
||||||
|
@ -187,16 +187,14 @@ func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount f
|
||||||
ChangeAddress: &claimAddress,
|
ChangeAddress: &claimAddress,
|
||||||
}
|
}
|
||||||
|
|
||||||
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
|
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *ucbVideo) Size() *int64 {
|
func (v *ucbVideo) Size() *int64 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
|
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer) (*SyncSummary, error) {
|
||||||
v.claimNames = claimNames
|
|
||||||
v.syncedVideosMux = syncedVideosMux
|
|
||||||
//download and thumbnail can be done in parallel
|
//download and thumbnail can be done in parallel
|
||||||
err := v.download()
|
err := v.download()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -210,7 +208,7 @@ func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount floa
|
||||||
//}
|
//}
|
||||||
//log.Debugln("Created thumbnail for " + v.id)
|
//log.Debugln("Created thumbnail for " + v.id)
|
||||||
|
|
||||||
summary, err := v.publish(daemon, claimAddress, amount, channelID)
|
summary, err := v.publish(daemon, claimAddress, amount, channelID, namer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Prefix("publish error", err)
|
return nil, errors.Prefix("publish error", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,12 +9,12 @@ import (
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/errors"
|
"github.com/lbryio/lbry.go/errors"
|
||||||
"github.com/lbryio/lbry.go/jsonrpc"
|
"github.com/lbryio/lbry.go/jsonrpc"
|
||||||
|
"github.com/lbryio/lbry.go/ytsync/namer"
|
||||||
|
|
||||||
"github.com/rylio/ytdl"
|
"github.com/rylio/ytdl"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
@ -155,6 +155,8 @@ func (v *YoutubeVideo) download() error {
|
||||||
err = videoInfo.Download(formats[formatIndex], downloadedFile)
|
err = videoInfo.Download(formats[formatIndex], downloadedFile)
|
||||||
downloadedFile.Close()
|
downloadedFile.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
//delete the video and ignore the error
|
||||||
|
_ = v.delete()
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
fi, err := os.Stat(v.getFilename())
|
fi, err := os.Stat(v.getFilename())
|
||||||
|
@ -172,7 +174,6 @@ func (v *YoutubeVideo) download() error {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -234,7 +235,7 @@ func (v *YoutubeVideo) triggerThumbnailSave() error {
|
||||||
|
|
||||||
func strPtr(s string) *string { return &s }
|
func strPtr(s string) *string { return &s }
|
||||||
|
|
||||||
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string) (*SyncSummary, error) {
|
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) {
|
||||||
if channelID == "" {
|
if channelID == "" {
|
||||||
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
|
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
|
||||||
}
|
}
|
||||||
|
@ -249,18 +250,16 @@ func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amou
|
||||||
ChangeAddress: &claimAddress,
|
ChangeAddress: &claimAddress,
|
||||||
ChannelID: &channelID,
|
ChannelID: &channelID,
|
||||||
}
|
}
|
||||||
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, v.claimNames, v.syncedVideosMux)
|
|
||||||
|
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *YoutubeVideo) Size() *int64 {
|
func (v *YoutubeVideo) Size() *int64 {
|
||||||
return v.size
|
return v.size
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
|
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer) (*SyncSummary, error) {
|
||||||
v.claimNames = claimNames
|
|
||||||
v.syncedVideosMux = syncedVideosMux
|
|
||||||
v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024
|
v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024
|
||||||
|
|
||||||
//download and thumbnail can be done in parallel
|
//download and thumbnail can be done in parallel
|
||||||
err := v.download()
|
err := v.download()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -274,14 +273,11 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount
|
||||||
}
|
}
|
||||||
log.Debugln("Created thumbnail for " + v.id)
|
log.Debugln("Created thumbnail for " + v.id)
|
||||||
|
|
||||||
summary, err := v.publish(daemon, claimAddress, amount, channelID)
|
summary, err := v.publish(daemon, claimAddress, amount, channelID, namer)
|
||||||
//delete the video in all cases (and ignore the error)
|
//delete the video in all cases (and ignore the error)
|
||||||
_ = v.delete()
|
_ = v.delete()
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Prefix("publish error", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return summary, nil
|
return summary, errors.Prefix("publish error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// sorting videos
|
// sorting videos
|
||||||
|
|
|
@ -17,17 +17,20 @@ import (
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/errors"
|
||||||
|
"github.com/lbryio/lbry.go/jsonrpc"
|
||||||
|
"github.com/lbryio/lbry.go/stop"
|
||||||
|
"github.com/lbryio/lbry.go/util"
|
||||||
|
"github.com/lbryio/lbry.go/ytsync/namer"
|
||||||
|
"github.com/lbryio/lbry.go/ytsync/sdk"
|
||||||
|
"github.com/lbryio/lbry.go/ytsync/sources"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
"github.com/aws/aws-sdk-go/service/s3"
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||||
"github.com/lbryio/lbry.go/errors"
|
|
||||||
"github.com/lbryio/lbry.go/jsonrpc"
|
|
||||||
"github.com/lbryio/lbry.go/stop"
|
|
||||||
"github.com/lbryio/lbry.go/util"
|
|
||||||
"github.com/lbryio/lbry.go/ytsync/sources"
|
|
||||||
"github.com/mitchellh/go-ps"
|
"github.com/mitchellh/go-ps"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"google.golang.org/api/googleapi/transport"
|
"google.golang.org/api/googleapi/transport"
|
||||||
|
@ -46,7 +49,7 @@ type video interface {
|
||||||
IDAndNum() string
|
IDAndNum() string
|
||||||
PlaylistPosition() int
|
PlaylistPosition() int
|
||||||
PublishedAt() time.Time
|
PublishedAt() time.Time
|
||||||
Sync(*jsonrpc.Client, string, float64, string, int, map[string]bool, *sync.RWMutex) (*sources.SyncSummary, error)
|
Sync(*jsonrpc.Client, string, float64, string, int, *namer.Namer) (*sources.SyncSummary, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// sorting videos
|
// sorting videos
|
||||||
|
@ -58,7 +61,7 @@ func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[
|
||||||
|
|
||||||
// Sync stores the options that control how syncing happens
|
// Sync stores the options that control how syncing happens
|
||||||
type Sync struct {
|
type Sync struct {
|
||||||
YoutubeAPIKey string
|
APIConfig *sdk.APIConfig
|
||||||
YoutubeChannelID string
|
YoutubeChannelID string
|
||||||
LbryChannelName string
|
LbryChannelName string
|
||||||
StopOnError bool
|
StopOnError bool
|
||||||
|
@ -77,10 +80,10 @@ type Sync struct {
|
||||||
claimAddress string
|
claimAddress string
|
||||||
videoDirectory string
|
videoDirectory string
|
||||||
syncedVideosMux *sync.RWMutex
|
syncedVideosMux *sync.RWMutex
|
||||||
syncedVideos map[string]syncedVideo
|
syncedVideos map[string]sdk.SyncedVideo
|
||||||
claimNames map[string]bool
|
|
||||||
grp *stop.Group
|
grp *stop.Group
|
||||||
lbryChannelID string
|
lbryChannelID string
|
||||||
|
namer *namer.Namer
|
||||||
|
|
||||||
walletMux *sync.Mutex
|
walletMux *sync.Mutex
|
||||||
queue chan video
|
queue chan video
|
||||||
|
@ -89,14 +92,11 @@ type Sync struct {
|
||||||
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
|
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
|
||||||
s.syncedVideosMux.Lock()
|
s.syncedVideosMux.Lock()
|
||||||
defer s.syncedVideosMux.Unlock()
|
defer s.syncedVideosMux.Unlock()
|
||||||
s.syncedVideos[videoID] = syncedVideo{
|
s.syncedVideos[videoID] = sdk.SyncedVideo{
|
||||||
VideoID: videoID,
|
VideoID: videoID,
|
||||||
Published: published,
|
Published: published,
|
||||||
FailureReason: failureReason,
|
FailureReason: failureReason,
|
||||||
}
|
}
|
||||||
if claimName != "" {
|
|
||||||
s.claimNames[claimName] = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendErrorToSlack Sends an error message to the default channel and to the process log.
|
// SendErrorToSlack Sends an error message to the default channel and to the process log.
|
||||||
|
@ -221,13 +221,13 @@ func (s *Sync) uploadWallet() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Sync) setStatusSyncing() error {
|
func (s *Sync) setStatusSyncing() error {
|
||||||
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
|
syncedVideos, claimNames, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.syncedVideosMux.Lock()
|
s.syncedVideosMux.Lock()
|
||||||
s.syncedVideos = syncedVideos
|
s.syncedVideos = syncedVideos
|
||||||
s.claimNames = claimNames
|
s.Manager.namer.SetNames(claimNames)
|
||||||
s.syncedVideosMux.Unlock()
|
s.syncedVideosMux.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -260,7 +260,7 @@ func (s *Sync) FullCycle() (e error) {
|
||||||
|
|
||||||
err = s.downloadWallet()
|
err = s.downloadWallet()
|
||||||
if err != nil && err.Error() != "wallet not on S3" {
|
if err != nil && err.Error() != "wallet not on S3" {
|
||||||
return errors.Prefix("failure in downloading wallet: ", err)
|
return errors.Prefix("failure in downloading wallet", err)
|
||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
log.Println("Continuing previous upload")
|
log.Println("Continuing previous upload")
|
||||||
} else {
|
} else {
|
||||||
|
@ -311,14 +311,13 @@ func (s *Sync) setChannelTerminationStatus(e *error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
failureReason := (*e).Error()
|
failureReason := (*e).Error()
|
||||||
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
|
_, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
|
msg := fmt.Sprintf("Failed setting failed state for channel %s", s.LbryChannelName)
|
||||||
err = errors.Prefix(msg, err)
|
*e = errors.Prefix(msg+err.Error(), *e)
|
||||||
*e = errors.Prefix(err.Error(), *e)
|
|
||||||
}
|
}
|
||||||
} else if !s.IsInterrupted() {
|
} else if !s.IsInterrupted() {
|
||||||
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "")
|
_, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSynced, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
*e = err
|
*e = err
|
||||||
}
|
}
|
||||||
|
@ -359,7 +358,7 @@ func (s *Sync) stopAndUploadWallet(e *error) {
|
||||||
e = &err //not 100% sure
|
e = &err //not 100% sure
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
*e = errors.Prefix("failure uploading wallet: ", *e)
|
*e = errors.Prefix("failure uploading wallet", *e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -424,7 +423,7 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err
|
||||||
pv, ok := s.syncedVideos[videoID]
|
pv, ok := s.syncedVideos[videoID]
|
||||||
if !ok || pv.ClaimName != c.Name {
|
if !ok || pv.ClaimName != c.Name {
|
||||||
fixed++
|
fixed++
|
||||||
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
|
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return total, fixed, err
|
return total, fixed, err
|
||||||
}
|
}
|
||||||
|
@ -438,11 +437,11 @@ func (s *Sync) doSync() error {
|
||||||
var err error
|
var err error
|
||||||
claims, err := s.daemon.ClaimListMine()
|
claims, err := s.daemon.ClaimListMine()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Prefix("cannot list claims: ", err)
|
return errors.Prefix("cannot list claims", err)
|
||||||
}
|
}
|
||||||
hasDupes, err := s.fixDupes(*claims)
|
hasDupes, err := s.fixDupes(*claims)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Prefix("error checking for duplicates: ", err)
|
return errors.Prefix("error checking for duplicates", err)
|
||||||
}
|
}
|
||||||
if hasDupes {
|
if hasDupes {
|
||||||
SendInfoToSlack("Channel had dupes and was fixed!")
|
SendInfoToSlack("Channel had dupes and was fixed!")
|
||||||
|
@ -452,13 +451,13 @@ func (s *Sync) doSync() error {
|
||||||
}
|
}
|
||||||
claims, err = s.daemon.ClaimListMine()
|
claims, err = s.daemon.ClaimListMine()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Prefix("cannot list claims: ", err)
|
return errors.Prefix("cannot list claims", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pubsOnWallet, nFixed, err := s.updateRemoteDB(*claims)
|
pubsOnWallet, nFixed, err := s.updateRemoteDB(*claims)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Prefix("error counting claims: ", err)
|
return errors.Prefix("error counting claims", err)
|
||||||
}
|
}
|
||||||
if nFixed > 0 {
|
if nFixed > 0 {
|
||||||
err := s.setStatusSyncing()
|
err := s.setStatusSyncing()
|
||||||
|
@ -466,7 +465,6 @@ func (s *Sync) doSync() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed)
|
SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed)
|
||||||
|
|
||||||
}
|
}
|
||||||
pubsOnDB := 0
|
pubsOnDB := 0
|
||||||
for _, sv := range s.syncedVideos {
|
for _, sv := range s.syncedVideos {
|
||||||
|
@ -593,7 +591,7 @@ func (s *Sync) startWorker(workerNum int) {
|
||||||
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
|
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
|
||||||
}
|
}
|
||||||
s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
|
s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
|
||||||
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
|
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
@ -605,7 +603,7 @@ func (s *Sync) startWorker(workerNum int) {
|
||||||
|
|
||||||
func (s *Sync) enqueueYoutubeVideos() error {
|
func (s *Sync) enqueueYoutubeVideos() error {
|
||||||
client := &http.Client{
|
client := &http.Client{
|
||||||
Transport: &transport.APIKey{Key: s.YoutubeAPIKey},
|
Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
|
||||||
}
|
}
|
||||||
|
|
||||||
service, err := youtube.New(client)
|
service, err := youtube.New(client)
|
||||||
|
@ -770,7 +768,7 @@ func (s *Sync) processVideo(v video) (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if v.PlaylistPosition() > s.Manager.VideosLimit {
|
if v.PlaylistPosition() > s.Manager.videosLimit {
|
||||||
log.Println(v.ID() + " is old: skipping")
|
log.Println(v.ID() + " is old: skipping")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -778,11 +776,13 @@ func (s *Sync) processVideo(v video) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.claimNames, s.syncedVideosMux)
|
|
||||||
|
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.maxVideoSize, s.namer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
|
|
||||||
|
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue