Drop sync_server dependency & Update ytdl lib #39

Merged
nikooo777 merged 7 commits from drop-sync-server-dep into master 2018-09-26 22:29:11 +02:00
9 changed files with 283 additions and 166 deletions

146
Gopkg.lock generated
View file

@ -2,18 +2,23 @@
[[projects]] [[projects]]
digest = "1:90272eae3bf75d2aa681ff3eee6cf4f49e8b06db533dc9c830ef214e5abbaaf2"
name = "github.com/PuerkitoBio/goquery" name = "github.com/PuerkitoBio/goquery"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "dc2ec5c7ca4d9aae063b79b9f581dd3ea6afd2b2" revision = "dc2ec5c7ca4d9aae063b79b9f581dd3ea6afd2b2"
version = "v1.4.1" version = "v1.4.1"
[[projects]] [[projects]]
digest = "1:4d0cb5aec47a2aec8b8b211540dc59902f8f00d571281356597b41ae349f4885"
name = "github.com/andybalholm/cascadia" name = "github.com/andybalholm/cascadia"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "901648c87902174f774fac311d7f176f8647bdaa" revision = "901648c87902174f774fac311d7f176f8647bdaa"
version = "v1.0.0" version = "v1.0.0"
[[projects]] [[projects]]
digest = "1:d4336a8cea03c9729c9be12c142938a28a0dbac23a316f019f6fee323cf03709"
name = "github.com/aws/aws-sdk-go" name = "github.com/aws/aws-sdk-go"
packages = [ packages = [
"aws", "aws",
@ -26,19 +31,14 @@
"aws/credentials/ec2rolecreds", "aws/credentials/ec2rolecreds",
"aws/credentials/endpointcreds", "aws/credentials/endpointcreds",
"aws/credentials/stscreds", "aws/credentials/stscreds",
"aws/csm",
"aws/defaults", "aws/defaults",
"aws/ec2metadata", "aws/ec2metadata",
"aws/endpoints", "aws/endpoints",
"aws/request", "aws/request",
"aws/session", "aws/session",
"aws/signer/v4", "aws/signer/v4",
"internal/sdkio",
"internal/sdkrand",
"internal/shareddefaults", "internal/shareddefaults",
"private/protocol", "private/protocol",
"private/protocol/eventstream",
"private/protocol/eventstream/eventstreamapi",
"private/protocol/query", "private/protocol/query",
"private/protocol/query/queryutil", "private/protocol/query/queryutil",
"private/protocol/rest", "private/protocol/rest",
@ -47,13 +47,15 @@
"service/s3", "service/s3",
"service/s3/s3iface", "service/s3/s3iface",
"service/s3/s3manager", "service/s3/s3manager",
"service/sts" "service/sts",
] ]
revision = "d2d4fca90395039f60821c231da759cab8f318c7" pruneopts = ""
version = "v1.14.4" revision = "b69f447375c7fa0047ebcdd8ae5d585d5aac2f71"
version = "v1.10.51"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:ea2251fa804d1b978feac8146d751b32ce2017eaf1f2915fde0df389bacaf383"
name = "github.com/btcsuite/btcd" name = "github.com/btcsuite/btcd"
packages = [ packages = [
"btcec", "btcec",
@ -61,217 +63,311 @@
"chaincfg", "chaincfg",
"chaincfg/chainhash", "chaincfg/chainhash",
"rpcclient", "rpcclient",
"wire" "wire",
] ]
pruneopts = ""
revision = "86fed781132ac890ee03e906e4ecd5d6fa180c64" revision = "86fed781132ac890ee03e906e4ecd5d6fa180c64"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:30d4a548e09bca4a0c77317c58e7407e2a65c15325e944f9c08a7b7992f8a59e"
name = "github.com/btcsuite/btclog" name = "github.com/btcsuite/btclog"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "84c8d2346e9fc8c7b947e243b9c24e6df9fd206a" revision = "84c8d2346e9fc8c7b947e243b9c24e6df9fd206a"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:471ae435f9ad7fc2f6b7a2e91ca026a124792859a7033fa60579c3aa618161ed"
name = "github.com/btcsuite/btcutil" name = "github.com/btcsuite/btcutil"
packages = [ packages = [
".", ".",
"base58", "base58",
"bech32" "bech32",
] ]
pruneopts = ""
revision = "d4cc87b860166d00d6b5b9e0d3b3d71d6088d4d4" revision = "d4cc87b860166d00d6b5b9e0d3b3d71d6088d4d4"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:422f38d57f1bc0fdc34f26d0f1026869a3710400b09b5478c9288efa13573cfa"
name = "github.com/btcsuite/go-socks" name = "github.com/btcsuite/go-socks"
packages = ["socks"] packages = ["socks"]
pruneopts = ""
revision = "4720035b7bfd2a9bb130b1c184f8bbe41b6f0d0f" revision = "4720035b7bfd2a9bb130b1c184f8bbe41b6f0d0f"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:57c911bbbf529465cf2ca5d43546cd5875a59054c41e2fe97791419959282aa1"
name = "github.com/btcsuite/websocket" name = "github.com/btcsuite/websocket"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "31079b6807923eb23992c421b114992b95131b55" revision = "31079b6807923eb23992c421b114992b95131b55"
[[projects]] [[projects]]
digest = "1:0a39ec8bf5629610a4bc7873a92039ee509246da3cef1a0ea60f1ed7e5f9cea5"
name = "github.com/davecgh/go-spew" name = "github.com/davecgh/go-spew"
packages = ["spew"] packages = ["spew"]
pruneopts = ""
revision = "346938d642f2ec3594ed81d874461961cd0faa76" revision = "346938d642f2ec3594ed81d874461961cd0faa76"
version = "v1.1.0" version = "v1.1.0"
[[projects]] [[projects]]
digest = "1:4b5f8c148e7fa094b73bcb6d16ea46eac7fdc726e55b81845ff96e29df534421"
name = "github.com/garyburd/redigo" name = "github.com/garyburd/redigo"
packages = [ packages = [
"internal", "internal",
"redis" "redis",
] ]
pruneopts = ""
revision = "a69d19351219b6dd56f274f96d85a7014a2ec34e" revision = "a69d19351219b6dd56f274f96d85a7014a2ec34e"
version = "v1.6.0" version = "v1.6.0"
[[projects]] [[projects]]
digest = "1:968d8903d598e3fae738325d3410f33f07ea6a2b9ee5591e9c262ee37df6845a"
name = "github.com/go-errors/errors" name = "github.com/go-errors/errors"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "a6af135bd4e28680facf08a3d206b454abc877a4" revision = "a6af135bd4e28680facf08a3d206b454abc877a4"
version = "v1.0.1" version = "v1.0.1"
[[projects]] [[projects]]
branch = "master"
digest = "1:4d5221853226d8d4be594d52d885ddde38170d2e3159b82ed92ecde4dded2304"
name = "github.com/go-ini/ini" name = "github.com/go-ini/ini"
packages = ["."] packages = ["."]
revision = "06f5f3d67269ccec1fe5fe4134ba6e982984f7f5" pruneopts = ""
version = "v1.37.0" revision = "5cf292cae48347c2490ac1a58fe36735fb78df7e"
[[projects]] [[projects]]
digest = "1:b1d3041d568e065ab4d76f7477844458e9209c0bb241eaccdc0770bf0a13b120"
name = "github.com/golang/protobuf" name = "github.com/golang/protobuf"
packages = ["proto"] packages = ["proto"]
pruneopts = ""
revision = "b4deda0973fb4c70b50d226b1af49f3da59f5265" revision = "b4deda0973fb4c70b50d226b1af49f3da59f5265"
version = "v1.1.0" version = "v1.1.0"
[[projects]] [[projects]]
digest = "1:fe1b4d4cbe48c0d55507c55f8663aa4185576cc58fa0c8be03bb8f19dfe17a9c"
name = "github.com/gorilla/websocket" name = "github.com/gorilla/websocket"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "ea4d1f681babbce9545c9c5f3d5194a789c89f5b" revision = "ea4d1f681babbce9545c9c5f3d5194a789c89f5b"
version = "v1.2.0" version = "v1.2.0"
[[projects]] [[projects]]
digest = "1:870d441fe217b8e689d7949fef6e43efbc787e50f200cb1e70dbca9204a1d6be"
name = "github.com/inconshreveable/mousetrap" name = "github.com/inconshreveable/mousetrap"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75" revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75"
version = "v1.0" version = "v1.0"
[[projects]] [[projects]]
digest = "1:4f767a115bc8e08576f6d38ab73c376fc1b1cd3bb5041171c9e8668cc7739b52"
name = "github.com/jmespath/go-jmespath" name = "github.com/jmespath/go-jmespath"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "0b12d6b5" revision = "0b12d6b5"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:375104fd705791c50351e652a9d80321813fefc4f83a7871cb2f6111a5bc1dc3"
name = "github.com/lbryio/lbryschema.go" name = "github.com/lbryio/lbryschema.go"
packages = ["pb"] packages = ["pb"]
pruneopts = ""
revision = "185433f2fd0c732547654749b98b37e56223dd22" revision = "185433f2fd0c732547654749b98b37e56223dd22"
[[projects]] [[projects]]
digest = "1:daad05ffdae6e2cd9bd9bbc14440e7e8e841037141f26a775a5a31b1b61cb14d"
name = "github.com/lbryio/ozzo-validation" name = "github.com/lbryio/ozzo-validation"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "d1008ad1fd04ceb5faedaf34881df0c504382706" revision = "d1008ad1fd04ceb5faedaf34881df0c504382706"
version = "v3.1" version = "v3.1"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:1dee6133ab829c8559a39031ad1e0e3538e4a7b34d3e0509d1fc247737e928c1"
name = "github.com/mitchellh/go-ps" name = "github.com/mitchellh/go-ps"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "4fdf99ab29366514c69ccccddab5dc58b8d84062" revision = "4fdf99ab29366514c69ccccddab5dc58b8d84062"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:eb9117392ee8e7aa44f78e0db603f70b1050ee0ebda4bd40040befb5b218c546"
name = "github.com/mitchellh/mapstructure" name = "github.com/mitchellh/mapstructure"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b" revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b"
[[projects]] [[projects]]
branch = "master" digest = "1:ba7d1dfde0c2142011332bffce4d8468310228afd49dd4425ac59fa9124fb7c4"
name = "github.com/nikooo777/ytdl"
packages = ["."]
revision = "64fe19bb69f66746824eb7812caf9f382c0e0bd8"
[[projects]]
name = "github.com/nlopes/slack" name = "github.com/nlopes/slack"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "8ab4d0b364ef1e9af5d102531da20d5ec902b6c4" revision = "8ab4d0b364ef1e9af5d102531da20d5ec902b6c4"
version = "v0.2.0" version = "v0.2.0"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:1ee326e6da62d87b3a07a9303d2cbb70f974207d14e6d992c61b6e650ff37c50"
name = "github.com/rylio/ytdl"
packages = ["."]
pruneopts = ""
revision = "06f6510946275931157f5fe73f55ec7d6fd65870"
[[projects]]
branch = "master"
digest = "1:67b7dcb3b7e67cb6f96fb38fe7358bc1210453189da210e40cf357a92d57c1c1"
name = "github.com/shopspring/decimal" name = "github.com/shopspring/decimal"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "19e3cb6c29303990525b56f51acf77c5630dd88a" revision = "19e3cb6c29303990525b56f51acf77c5630dd88a"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:4cb1f758b69097d419a148e64c86b358a4f77c695504f99de1ee86617d64f74e"
name = "github.com/sirupsen/logrus" name = "github.com/sirupsen/logrus"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "ea8897e79973357ba785ac2533559a6297e83c44" revision = "ea8897e79973357ba785ac2533559a6297e83c44"
[[projects]] [[projects]]
branch = "master"
digest = "1:d0b38ba6da419a6d4380700218eeec8623841d44a856bb57369c172fbf692ab4"
name = "github.com/spf13/cast" name = "github.com/spf13/cast"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "8965335b8c7107321228e3e3702cab9832751bac" revision = "8965335b8c7107321228e3e3702cab9832751bac"
version = "v1.2.0"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:7bcdb212f21d3cf318699d50af69a9192ef73fedad0d94d9ed5616f349457881"
name = "github.com/spf13/cobra" name = "github.com/spf13/cobra"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "1e58aa3361fd650121dceeedc399e7189c05674a" revision = "1e58aa3361fd650121dceeedc399e7189c05674a"
[[projects]] [[projects]]
digest = "1:8e243c568f36b09031ec18dff5f7d2769dcf5ca4d624ea511c8e3197dc3d352d"
name = "github.com/spf13/pflag" name = "github.com/spf13/pflag"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "583c0c0531f06d5278b7d917446061adc344b5cd" revision = "583c0c0531f06d5278b7d917446061adc344b5cd"
version = "v1.0.1" version = "v1.0.1"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:22d3674d44ee93f52a9c0b6a22d1f736a0ad9ac3f9d2c1ca8648f3c9ce9910bd"
name = "github.com/ybbus/jsonrpc" name = "github.com/ybbus/jsonrpc"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "2a548b7d822dd62717337a6b1e817fae1b14660a" revision = "2a548b7d822dd62717337a6b1e817fae1b14660a"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:3610c577942fbfd2c8975d70a2342bbd13f30cf214237fb8f920c9a6cec0f14a"
name = "github.com/zeebo/bencode" name = "github.com/zeebo/bencode"
packages = ["."] packages = ["."]
pruneopts = ""
revision = "d522839ac797fc43269dae6a04a1f8be475a915d" revision = "d522839ac797fc43269dae6a04a1f8be475a915d"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:e7dc596c84a908dd326d2c07453307f192976c3edb9482b88290c42076fe378f"
name = "golang.org/x/crypto" name = "golang.org/x/crypto"
packages = [ packages = [
"ripemd160", "ripemd160",
"sha3", "sha3",
"ssh/terminal" "ssh/terminal",
] ]
pruneopts = ""
revision = "8ac0e0d97ce45cd83d1d7243c060cb8461dda5e9" revision = "8ac0e0d97ce45cd83d1d7243c060cb8461dda5e9"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:3edb9c19d0b874999053badbbcc08edab3cde0262d2beb36ad6c0d78391c19ac"
name = "golang.org/x/net" name = "golang.org/x/net"
packages = [ packages = [
"context", "context",
"context/ctxhttp", "context/ctxhttp",
"html", "html",
"html/atom" "html/atom",
] ]
pruneopts = ""
revision = "db08ff08e8622530d9ed3a0e8ac279f6d4c02196" revision = "db08ff08e8622530d9ed3a0e8ac279f6d4c02196"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:b1ac49fd3eae66e95230ea3423158b18374a5ad1c53caf89bc7fc1a441e9e0e7"
name = "golang.org/x/sys" name = "golang.org/x/sys"
packages = [ packages = [
"unix", "unix",
"windows" "windows",
] ]
pruneopts = ""
revision = "bff228c7b664c5fce602223a05fb708fd8654986" revision = "bff228c7b664c5fce602223a05fb708fd8654986"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:b4f82373e582dff1470e66574ac664b548aa69cffce0943321797fd75f46ee6f"
name = "google.golang.org/api" name = "google.golang.org/api"
packages = [ packages = [
"gensupport", "gensupport",
"googleapi", "googleapi",
"googleapi/internal/uritemplates", "googleapi/internal/uritemplates",
"googleapi/transport", "googleapi/transport",
"youtube/v3" "youtube/v3",
] ]
pruneopts = ""
revision = "ef86ce4234efee96020bde00391d6a9cfae66561" revision = "ef86ce4234efee96020bde00391d6a9cfae66561"
[[projects]] [[projects]]
digest = "1:05eca53b271663de74078b5484b1995a8d56668a51434a698dc5d0863035d575"
name = "gopkg.in/nullbio/null.v6" name = "gopkg.in/nullbio/null.v6"
packages = ["convert"] packages = ["convert"]
pruneopts = ""
revision = "40264a2e6b7972d183906cf17663983c23231c82" revision = "40264a2e6b7972d183906cf17663983c23231c82"
version = "v6.3" version = "v6.3"
[solve-meta] [solve-meta]
analyzer-name = "dep" analyzer-name = "dep"
analyzer-version = 1 analyzer-version = 1
inputs-digest = "adebfeec40e94ad17cba1f18d8f47da60af4106a50386ebce51650d036e3d4ac" input-imports = [
"github.com/aws/aws-sdk-go/aws",
"github.com/aws/aws-sdk-go/aws/awserr",
"github.com/aws/aws-sdk-go/aws/credentials",
"github.com/aws/aws-sdk-go/aws/session",
"github.com/aws/aws-sdk-go/service/s3",
"github.com/aws/aws-sdk-go/service/s3/s3manager",
"github.com/btcsuite/btcd/chaincfg",
"github.com/btcsuite/btcd/chaincfg/chainhash",
"github.com/btcsuite/btcd/rpcclient",
"github.com/btcsuite/btcutil",
"github.com/btcsuite/btcutil/base58",
"github.com/davecgh/go-spew/spew",
"github.com/garyburd/redigo/redis",
"github.com/go-errors/errors",
"github.com/go-ini/ini",
"github.com/lbryio/lbryschema.go/pb",
"github.com/lbryio/ozzo-validation",
"github.com/mitchellh/go-ps",
"github.com/mitchellh/mapstructure",
"github.com/nlopes/slack",
"github.com/rylio/ytdl",
"github.com/shopspring/decimal",
"github.com/sirupsen/logrus",
"github.com/spf13/cast",
"github.com/spf13/cobra",
"github.com/ybbus/jsonrpc",
"github.com/zeebo/bencode",
"golang.org/x/crypto/ripemd160",
"golang.org/x/crypto/sha3",
"google.golang.org/api/googleapi/transport",
"google.golang.org/api/youtube/v3",
"gopkg.in/nullbio/null.v6/convert",
]
solver-name = "gps-cdcl" solver-name = "gps-cdcl"
solver-version = 1 solver-version = 1

View file

@ -12,7 +12,7 @@
[[constraint]] [[constraint]]
branch = "master" branch = "master"
name = "github.com/nikooo777/ytdl" name = "github.com/rylio/ytdl"
[[constraint]] [[constraint]]
branch = "master" branch = "master"

View file

@ -468,3 +468,18 @@ func (d *Client) ClaimListMine() (*ClaimListMineResponse, error) {
return response, nil return response, nil
} }
func (d *Client) ClaimAbandon(txID string, nOut int) (*ClaimAbandonResponse, error) {
response := new(ClaimAbandonResponse)
err := d.call(response, "claim_abandon", map[string]interface{}{
"txid": txID,
"nout": nOut,
})
if err != nil {
return nil, err
} else if response == nil {
return nil, errors.Err("no response")
}
return response, nil
}

View file

@ -243,6 +243,10 @@ type ClaimListResponse struct {
} }
type ClaimListMineResponse []Claim type ClaimListMineResponse []Claim
type ClaimShowResponse Claim type ClaimShowResponse Claim
type ClaimAbandonResponse struct {
Txid string `json:"txid"`
Fee float64 `json:"fee"`
}
type PeerListResponsePeer struct { type PeerListResponsePeer struct {
IP string `json:"host"` IP string `json:"host"`

View file

@ -78,8 +78,8 @@ func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error)
"min_videos": {strconv.Itoa(1)}, "min_videos": {strconv.Itoa(1)},
"after": {strconv.Itoa(int(s.SyncFrom))}, "after": {strconv.Itoa(int(s.SyncFrom))},
"before": {strconv.Itoa(int(s.SyncUntil))}, "before": {strconv.Itoa(int(s.SyncUntil))},
//"sync_server": {s.HostName}, "sync_server": {s.HostName},
"channel_id": {s.YoutubeChannelID}, "channel_id": {s.YoutubeChannelID},
}) })
defer res.Body.Close() defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
@ -214,9 +214,6 @@ func (s *SyncManager) Start() error {
return errors.Err("Expected 1 channel, %d returned", len(channels)) return errors.Err("Expected 1 channel, %d returned", len(channels))
} }
lbryChannelName := channels[0].DesiredChannelName lbryChannelName := channels[0].DesiredChannelName
if !s.isWorthProcessing(channels[0]) {
break
}
syncs = make([]Sync, 1) syncs = make([]Sync, 1)
syncs[0] = Sync{ syncs[0] = Sync{
YoutubeAPIKey: s.YoutubeAPIKey, YoutubeAPIKey: s.YoutubeAPIKey,
@ -250,9 +247,6 @@ func (s *SyncManager) Start() error {
return err return err
} }
for _, c := range channels { for _, c := range channels {
if !s.isWorthProcessing(c) {
continue
}
syncs = append(syncs, Sync{ syncs = append(syncs, Sync{
YoutubeAPIKey: s.YoutubeAPIKey, YoutubeAPIKey: s.YoutubeAPIKey,
YoutubeChannelID: c.ChannelId, YoutubeChannelID: c.ChannelId,
@ -312,10 +306,6 @@ func (s *SyncManager) Start() error {
return nil return nil
} }
func (s *SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName)
}
func (s *SyncManager) checkUsedSpace() error { func (s *SyncManager) checkUsedSpace() error {
usedPctile, err := GetUsedSpace(s.BlobsDir) usedPctile, err := GetUsedSpace(s.BlobsDir)
if err != nil { if err != nil {

View file

@ -1,63 +0,0 @@
package redisdb
import (
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/garyburd/redigo/redis"
)
const (
redisHashKey = "ytsync"
redisSyncedVal = "t"
)
type DB struct {
pool *redis.Pool
}
func New() *DB {
var r DB
r.pool = &redis.Pool{
MaxIdle: 3,
IdleTimeout: 5 * time.Minute,
Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") },
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
}
return &r
}
func (r DB) IsPublished(id string) (bool, error) {
conn := r.pool.Get()
defer conn.Close()
alreadyPublished, err := redis.String(conn.Do("HGET", redisHashKey, id))
if err != nil && err != redis.ErrNil {
return false, errors.Prefix("redis error", err)
}
if alreadyPublished == redisSyncedVal {
return true, nil
}
return false, nil
}
func (r DB) SetPublished(id string) error {
conn := r.pool.Get()
defer conn.Close()
_, err := redis.Bool(conn.Do("HSET", redisHashKey, id, redisSyncedVal))
if err != nil {
return errors.Prefix("redis error", err)
}
return nil
}

View file

@ -48,7 +48,7 @@ func (s *Sync) walletSetup() error {
s.syncedVideosMux.RLock() s.syncedVideosMux.RLock()
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones... numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
s.syncedVideosMux.RUnlock() s.syncedVideosMux.RUnlock()
log.Debugf("We already published %d videos", numPublished) log.Debugf("We already allocated credits for %d videos", numPublished)
if numOnSource-numPublished > s.Manager.VideosLimit { if numOnSource-numPublished > s.Manager.VideosLimit {
numOnSource = s.Manager.VideosLimit numOnSource = s.Manager.VideosLimit

View file

@ -16,7 +16,7 @@ import (
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/nikooo777/ytdl" "github.com/rylio/ytdl"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/api/youtube/v3" "google.golang.org/api/youtube/v3"
) )
@ -28,6 +28,7 @@ type YoutubeVideo struct {
description string description string
playlistPosition int64 playlistPosition int64
size *int64 size *int64
maxVideoSize int64
publishedAt time.Time publishedAt time.Time
dir string dir string
claimNames map[string]bool claimNames map[string]bool
@ -121,14 +122,58 @@ func (v *YoutubeVideo) download() error {
return err return err
} }
var downloadedFile *os.File codec := []string{"H.264"}
downloadedFile, err = os.Create(videoPath) ext := []string{"mp4"}
defer downloadedFile.Close()
if err != nil { //Filter requires a [] interface{}
return err codecFilter := make([]interface{}, len(codec))
for i, v := range codec {
codecFilter[i] = v
} }
return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[1], downloadedFile) //Filter requires a [] interface{}
extFilter := make([]interface{}, len(ext))
for i, v := range ext {
extFilter[i] = v
}
formats := videoInfo.Formats.Filter(ytdl.FormatVideoEncodingKey, codecFilter).Filter(ytdl.FormatExtensionKey, extFilter)
if len(formats) == 0 {
return errors.Err("no compatible format available for this video")
}
maxRetryAttempts := 5
for i := 0; i < len(formats) && i < maxRetryAttempts; i++ {
formatIndex := i
if i == maxRetryAttempts-1 {
formatIndex = len(formats) - 1
}
var downloadedFile *os.File
downloadedFile, err = os.Create(videoPath)
if err != nil {
return err
}
err = videoInfo.Download(formats[formatIndex], downloadedFile)
downloadedFile.Close()
if err != nil {
break
}
fi, err := os.Stat(v.getFilename())
if err != nil {
return err
}
videoSize := fi.Size()
v.size = &videoSize
if videoSize > v.maxVideoSize {
//delete the video and ignore the error
_ = v.delete()
err = errors.Err("file is too big and there is no other format available")
} else {
break
}
}
return err
} }
func (v *YoutubeVideo) videoDir() string { func (v *YoutubeVideo) videoDir() string {
@ -214,6 +259,8 @@ func (v *YoutubeVideo) Size() *int64 {
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
v.claimNames = claimNames v.claimNames = claimNames
v.syncedVideosMux = syncedVideosMux v.syncedVideosMux = syncedVideosMux
v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024
//download and thumbnail can be done in parallel //download and thumbnail can be done in parallel
err := v.download() err := v.download()
if err != nil { if err != nil {
@ -221,19 +268,6 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount
} }
log.Debugln("Downloaded " + v.id) log.Debugln("Downloaded " + v.id)
fi, err := os.Stat(v.getFilename())
if err != nil {
return nil, err
}
videoSize := fi.Size()
v.size = &videoSize
if videoSize > int64(maxVideoSize)*1024*1024 {
//delete the video and ignore the error
_ = v.delete()
return nil, errors.Err("the video is too big to sync, skipping for now")
}
err = v.triggerThumbnailSave() err = v.triggerThumbnailSave()
if err != nil { if err != nil {
return nil, errors.Prefix("thumbnail error", err) return nil, errors.Prefix("thumbnail error", err)

View file

@ -27,7 +27,6 @@ import (
"github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util" "github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/redisdb"
"github.com/lbryio/lbry.go/ytsync/sources" "github.com/lbryio/lbry.go/ytsync/sources"
"github.com/mitchellh/go-ps" "github.com/mitchellh/go-ps"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -77,7 +76,6 @@ type Sync struct {
daemon *jsonrpc.Client daemon *jsonrpc.Client
claimAddress string claimAddress string
videoDirectory string videoDirectory string
db *redisdb.DB
syncedVideosMux *sync.RWMutex syncedVideosMux *sync.RWMutex
syncedVideos map[string]syncedVideo syncedVideos map[string]syncedVideo
claimNames map[string]bool claimNames map[string]bool
@ -222,6 +220,18 @@ func (s *Sync) uploadWallet() error {
return os.Remove(defaultWalletDir) return os.Remove(defaultWalletDir)
} }
func (s *Sync) setStatusSyncing() error {
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
if err != nil {
return err
}
s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos
s.claimNames = claimNames
s.syncedVideosMux.Unlock()
return nil
}
func (s *Sync) FullCycle() (e error) { func (s *Sync) FullCycle() (e error) {
if os.Getenv("HOME") == "" { if os.Getenv("HOME") == "" {
return errors.Err("no $HOME env var found") return errors.Err("no $HOME env var found")
@ -231,7 +241,6 @@ func (s *Sync) FullCycle() (e error) {
} }
s.syncedVideosMux = &sync.RWMutex{} s.syncedVideosMux = &sync.RWMutex{}
s.walletMux = &sync.Mutex{} s.walletMux = &sync.Mutex{}
s.db = redisdb.New()
s.grp = stop.New() s.grp = stop.New()
s.queue = make(chan video) s.queue = make(chan video)
interruptChan := make(chan os.Signal, 1) interruptChan := make(chan os.Signal, 1)
@ -242,16 +251,12 @@ func (s *Sync) FullCycle() (e error) {
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop() s.grp.Stop()
}() }()
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") err := s.setStatusSyncing()
if err != nil { if err != nil {
return err return err
} }
s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos
s.claimNames = claimNames
s.syncedVideosMux.Unlock()
defer s.updateChannelStatus(&e) defer s.setChannelTerminationStatus(&e)
err = s.downloadWallet() err = s.downloadWallet()
if err != nil && err.Error() != "wallet not on S3" { if err != nil && err.Error() != "wallet not on S3" {
@ -296,7 +301,7 @@ func (s *Sync) FullCycle() (e error) {
return nil return nil
} }
func (s *Sync) updateChannelStatus(e *error) { func (s *Sync) setChannelTerminationStatus(e *error) {
if *e != nil { if *e != nil {
//conditions for which a channel shouldn't be marked as failed //conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{ noFailConditions := []string{
@ -321,20 +326,20 @@ func (s *Sync) updateChannelStatus(e *error) {
} }
func (s *Sync) waitForDaemonStart() error { func (s *Sync) waitForDaemonStart() error {
for { for {
select { select {
case <-s.grp.Ch(): case <-s.grp.Ch():
return errors.Err("interrupted during daemon startup") return errors.Err("interrupted during daemon startup")
default: default:
s, err := s.daemon.Status() s, err := s.daemon.Status()
if err == nil && s.StartupStatus.Wallet { if err == nil && s.StartupStatus.Wallet && s.StartupStatus.FileManager {
return nil return nil
} }
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
} }
} }
} }
func (s *Sync) stopAndUploadWallet(e *error) { func (s *Sync) stopAndUploadWallet(e *error) {
log.Printf("Stopping daemon") log.Printf("Stopping daemon")
shutdownErr := stopDaemonViaSystemd() shutdownErr := stopDaemonViaSystemd()
@ -365,40 +370,68 @@ func logShutdownError(shutdownErr error) {
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
} }
func hasDupes(claims []jsonrpc.Claim) (bool, error) { // fixDupes abandons duplicate claims
videoIDs := make(map[string]interface{}) func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
abandonedClaims := false
videoIDs := make(map[string]jsonrpc.Claim)
for _, c := range claims { for _, c := range claims {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil { if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
continue continue
} }
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil { if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
return false, errors.Err("something is wrong with the this claim: %s", c.ClaimID) return false, errors.Err("something is wrong with this claim: %s", c.ClaimID)
} }
tn := *c.Value.Stream.Metadata.Thumbnail tn := *c.Value.Stream.Metadata.Thumbnail
videoID := tn[:strings.LastIndex(tn, "/")+1] videoID := tn[strings.LastIndex(tn, "/")+1:]
_, ok := videoIDs[videoID]
if !ok { log.Infof("claimid: %s, claimName: %s, videoID: %s", c.ClaimID, c.Name, videoID)
videoIDs[videoID] = nil cl, ok := videoIDs[videoID]
if !ok || cl.ClaimID == c.ClaimID {
videoIDs[videoID] = c
continue continue
} }
return true, nil // only keep the most recent one
claimToAbandon := c
videoIDs[videoID] = cl
if c.Height > cl.Height {
claimToAbandon = cl
videoIDs[videoID] = c
}
_, err := s.daemon.ClaimAbandon(claimToAbandon.Txid, claimToAbandon.Nout)
if err != nil {
return true, err
}
log.Debugf("abandoning %+v", claimToAbandon)
abandonedClaims = true
//return true, nil
} }
return false, nil return abandonedClaims, nil
} }
//publishesCount counts the amount of videos published so far //updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published
func publishesCount(claims []jsonrpc.Claim) (int, error) { func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err error) {
count := 0 count := 0
for _, c := range claims { for _, c := range claims {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil { if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
continue continue
} }
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil { if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
return count, errors.Err("something is wrong with the this claim: %s", c.ClaimID) return count, fixed, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
} }
count++ //check if claimID is in remote db
tn := *c.Value.Stream.Metadata.Thumbnail
videoID := tn[strings.LastIndex(tn, "/")+1:]
pv, ok := s.syncedVideos[videoID]
if !ok || pv.ClaimName != c.Name {
fixed++
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
if err != nil {
return total, fixed, err
}
}
total++
} }
return count, nil return total, fixed, nil
} }
func (s *Sync) doSync() error { func (s *Sync) doSync() error {
@ -407,28 +440,47 @@ func (s *Sync) doSync() error {
if err != nil { if err != nil {
return errors.Prefix("cannot list claims: ", err) return errors.Prefix("cannot list claims: ", err)
} }
hasDupes, err := hasDupes(*claims) hasDupes, err := s.fixDupes(*claims)
if err != nil { if err != nil {
return errors.Prefix("error checking for duplicates: ", err) return errors.Prefix("error checking for duplicates: ", err)
} }
if hasDupes { if hasDupes {
return errors.Err("channel has duplicates! Manual fix required") SendInfoToSlack("Channel had dupes and was fixed!")
err = s.waitForNewBlock()
if err != nil {
return err
}
claims, err = s.daemon.ClaimListMine()
if err != nil {
return errors.Prefix("cannot list claims: ", err)
}
} }
pubsOnWallet, err := publishesCount(*claims)
pubsOnWallet, nFixed, err := s.updateRemoteDB(*claims)
if err != nil { if err != nil {
return errors.Prefix("error counting claims: ", err) return errors.Prefix("error counting claims: ", err)
} }
if nFixed > 0 {
err := s.setStatusSyncing()
if err != nil {
return err
}
SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed)
}
pubsOnDB := 0 pubsOnDB := 0
for _, sv := range s.syncedVideos { for _, sv := range s.syncedVideos {
if sv.Published { if sv.Published {
pubsOnDB++ pubsOnDB++
} }
} }
if pubsOnWallet > pubsOnDB {
if pubsOnWallet > pubsOnDB { //This case should never happen
SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
return errors.Err("not all published videos are in the database") return errors.Err("not all published videos are in the database")
} }
if pubsOnWallet < pubsOnDB { if pubsOnWallet < pubsOnDB {
SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
} }
err = s.walletSetup() err = s.walletSetup()
if err != nil { if err != nil {
@ -509,6 +561,7 @@ func (s *Sync) startWorker(workerNum int) {
"Playback on other websites has been disabled by the video owner", "Playback on other websites has been disabled by the video owner",
"Error in daemon: Cannot publish empty file", "Error in daemon: Cannot publish empty file",
"Error extracting sts from embedded url response", "Error extracting sts from embedded url response",
"Unable to extract signature tokens",
"Client.Timeout exceeded while awaiting headers)", "Client.Timeout exceeded while awaiting headers)",
"the video is too big to sync, skipping for now", "the video is too big to sync, skipping for now",
} }
@ -704,6 +757,7 @@ func (s *Sync) processVideo(v video) (err error) {
neverRetryFailures := []string{ neverRetryFailures := []string{
"Error extracting sts from embedded url response", "Error extracting sts from embedded url response",
"Unable to extract signature tokens",
"the video is too big to sync, skipping for now", "the video is too big to sync, skipping for now",
} }
if ok && !sv.Published && util.SubstringInSlice(sv.FailureReason, neverRetryFailures) { if ok && !sv.Published && util.SubstringInSlice(sv.FailureReason, neverRetryFailures) {
@ -711,19 +765,6 @@ func (s *Sync) processVideo(v video) (err error) {
return nil return nil
} }
//TODO: remove this after a few runs...
alreadyPublishedOld, err := s.db.IsPublished(v.ID())
if err != nil {
return err
}
//TODO: remove this after a few runs...
if alreadyPublishedOld && !alreadyPublished {
//seems like something in the migration of blobs didn't go perfectly right so warn about it!
SendInfoToSlack("A video that was previously published is on the local database but isn't on the remote db! fix it @Nikooo777! \nchannelID: %s, videoID: %s",
s.YoutubeChannelID, v.ID())
return nil
}
if alreadyPublished { if alreadyPublished {
log.Println(v.ID() + " already published") log.Println(v.ID() + " already published")
return nil return nil