diff --git a/Gopkg.lock b/Gopkg.lock index c8e627f..27863b6 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,18 +2,23 @@ [[projects]] + digest = "1:90272eae3bf75d2aa681ff3eee6cf4f49e8b06db533dc9c830ef214e5abbaaf2" name = "github.com/PuerkitoBio/goquery" packages = ["."] + pruneopts = "" revision = "dc2ec5c7ca4d9aae063b79b9f581dd3ea6afd2b2" version = "v1.4.1" [[projects]] + digest = "1:4d0cb5aec47a2aec8b8b211540dc59902f8f00d571281356597b41ae349f4885" name = "github.com/andybalholm/cascadia" packages = ["."] + pruneopts = "" revision = "901648c87902174f774fac311d7f176f8647bdaa" version = "v1.0.0" [[projects]] + digest = "1:d4336a8cea03c9729c9be12c142938a28a0dbac23a316f019f6fee323cf03709" name = "github.com/aws/aws-sdk-go" packages = [ "aws", @@ -26,19 +31,14 @@ "aws/credentials/ec2rolecreds", "aws/credentials/endpointcreds", "aws/credentials/stscreds", - "aws/csm", "aws/defaults", "aws/ec2metadata", "aws/endpoints", "aws/request", "aws/session", "aws/signer/v4", - "internal/sdkio", - "internal/sdkrand", "internal/shareddefaults", "private/protocol", - "private/protocol/eventstream", - "private/protocol/eventstream/eventstreamapi", "private/protocol/query", "private/protocol/query/queryutil", "private/protocol/rest", @@ -47,13 +47,15 @@ "service/s3", "service/s3/s3iface", "service/s3/s3manager", - "service/sts" + "service/sts", ] - revision = "d2d4fca90395039f60821c231da759cab8f318c7" - version = "v1.14.4" + pruneopts = "" + revision = "b69f447375c7fa0047ebcdd8ae5d585d5aac2f71" + version = "v1.10.51" [[projects]] branch = "master" + digest = "1:ea2251fa804d1b978feac8146d751b32ce2017eaf1f2915fde0df389bacaf383" name = "github.com/btcsuite/btcd" packages = [ "btcec", @@ -61,217 +63,311 @@ "chaincfg", "chaincfg/chainhash", "rpcclient", - "wire" + "wire", ] + pruneopts = "" revision = "86fed781132ac890ee03e906e4ecd5d6fa180c64" [[projects]] branch = "master" + digest = "1:30d4a548e09bca4a0c77317c58e7407e2a65c15325e944f9c08a7b7992f8a59e" name = "github.com/btcsuite/btclog" packages = ["."] + pruneopts = "" revision = "84c8d2346e9fc8c7b947e243b9c24e6df9fd206a" [[projects]] branch = "master" + digest = "1:471ae435f9ad7fc2f6b7a2e91ca026a124792859a7033fa60579c3aa618161ed" name = "github.com/btcsuite/btcutil" packages = [ ".", "base58", - "bech32" + "bech32", ] + pruneopts = "" revision = "d4cc87b860166d00d6b5b9e0d3b3d71d6088d4d4" [[projects]] branch = "master" + digest = "1:422f38d57f1bc0fdc34f26d0f1026869a3710400b09b5478c9288efa13573cfa" name = "github.com/btcsuite/go-socks" packages = ["socks"] + pruneopts = "" revision = "4720035b7bfd2a9bb130b1c184f8bbe41b6f0d0f" [[projects]] branch = "master" + digest = "1:57c911bbbf529465cf2ca5d43546cd5875a59054c41e2fe97791419959282aa1" name = "github.com/btcsuite/websocket" packages = ["."] + pruneopts = "" revision = "31079b6807923eb23992c421b114992b95131b55" [[projects]] + digest = "1:0a39ec8bf5629610a4bc7873a92039ee509246da3cef1a0ea60f1ed7e5f9cea5" name = "github.com/davecgh/go-spew" packages = ["spew"] + pruneopts = "" revision = "346938d642f2ec3594ed81d874461961cd0faa76" version = "v1.1.0" [[projects]] + digest = "1:4b5f8c148e7fa094b73bcb6d16ea46eac7fdc726e55b81845ff96e29df534421" name = "github.com/garyburd/redigo" packages = [ "internal", - "redis" + "redis", ] + pruneopts = "" revision = "a69d19351219b6dd56f274f96d85a7014a2ec34e" version = "v1.6.0" [[projects]] + digest = "1:968d8903d598e3fae738325d3410f33f07ea6a2b9ee5591e9c262ee37df6845a" name = "github.com/go-errors/errors" packages = ["."] + pruneopts = "" revision = "a6af135bd4e28680facf08a3d206b454abc877a4" version = "v1.0.1" [[projects]] + branch = "master" + digest = "1:4d5221853226d8d4be594d52d885ddde38170d2e3159b82ed92ecde4dded2304" name = "github.com/go-ini/ini" packages = ["."] - revision = "06f5f3d67269ccec1fe5fe4134ba6e982984f7f5" - version = "v1.37.0" + pruneopts = "" + revision = "5cf292cae48347c2490ac1a58fe36735fb78df7e" [[projects]] + digest = "1:b1d3041d568e065ab4d76f7477844458e9209c0bb241eaccdc0770bf0a13b120" name = "github.com/golang/protobuf" packages = ["proto"] + pruneopts = "" revision = "b4deda0973fb4c70b50d226b1af49f3da59f5265" version = "v1.1.0" [[projects]] + digest = "1:fe1b4d4cbe48c0d55507c55f8663aa4185576cc58fa0c8be03bb8f19dfe17a9c" name = "github.com/gorilla/websocket" packages = ["."] + pruneopts = "" revision = "ea4d1f681babbce9545c9c5f3d5194a789c89f5b" version = "v1.2.0" [[projects]] + digest = "1:870d441fe217b8e689d7949fef6e43efbc787e50f200cb1e70dbca9204a1d6be" name = "github.com/inconshreveable/mousetrap" packages = ["."] + pruneopts = "" revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75" version = "v1.0" [[projects]] + digest = "1:4f767a115bc8e08576f6d38ab73c376fc1b1cd3bb5041171c9e8668cc7739b52" name = "github.com/jmespath/go-jmespath" packages = ["."] + pruneopts = "" revision = "0b12d6b5" [[projects]] branch = "master" + digest = "1:375104fd705791c50351e652a9d80321813fefc4f83a7871cb2f6111a5bc1dc3" name = "github.com/lbryio/lbryschema.go" packages = ["pb"] + pruneopts = "" revision = "185433f2fd0c732547654749b98b37e56223dd22" [[projects]] + digest = "1:daad05ffdae6e2cd9bd9bbc14440e7e8e841037141f26a775a5a31b1b61cb14d" name = "github.com/lbryio/ozzo-validation" packages = ["."] + pruneopts = "" revision = "d1008ad1fd04ceb5faedaf34881df0c504382706" version = "v3.1" [[projects]] branch = "master" + digest = "1:1dee6133ab829c8559a39031ad1e0e3538e4a7b34d3e0509d1fc247737e928c1" name = "github.com/mitchellh/go-ps" packages = ["."] + pruneopts = "" revision = "4fdf99ab29366514c69ccccddab5dc58b8d84062" [[projects]] branch = "master" + digest = "1:eb9117392ee8e7aa44f78e0db603f70b1050ee0ebda4bd40040befb5b218c546" name = "github.com/mitchellh/mapstructure" packages = ["."] + pruneopts = "" revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b" [[projects]] - branch = "master" - name = "github.com/nikooo777/ytdl" - packages = ["."] - revision = "64fe19bb69f66746824eb7812caf9f382c0e0bd8" - -[[projects]] + digest = "1:ba7d1dfde0c2142011332bffce4d8468310228afd49dd4425ac59fa9124fb7c4" name = "github.com/nlopes/slack" packages = ["."] + pruneopts = "" revision = "8ab4d0b364ef1e9af5d102531da20d5ec902b6c4" version = "v0.2.0" [[projects]] branch = "master" + digest = "1:1ee326e6da62d87b3a07a9303d2cbb70f974207d14e6d992c61b6e650ff37c50" + name = "github.com/rylio/ytdl" + packages = ["."] + pruneopts = "" + revision = "06f6510946275931157f5fe73f55ec7d6fd65870" + +[[projects]] + branch = "master" + digest = "1:67b7dcb3b7e67cb6f96fb38fe7358bc1210453189da210e40cf357a92d57c1c1" name = "github.com/shopspring/decimal" packages = ["."] + pruneopts = "" revision = "19e3cb6c29303990525b56f51acf77c5630dd88a" [[projects]] branch = "master" + digest = "1:4cb1f758b69097d419a148e64c86b358a4f77c695504f99de1ee86617d64f74e" name = "github.com/sirupsen/logrus" packages = ["."] + pruneopts = "" revision = "ea8897e79973357ba785ac2533559a6297e83c44" [[projects]] + branch = "master" + digest = "1:d0b38ba6da419a6d4380700218eeec8623841d44a856bb57369c172fbf692ab4" name = "github.com/spf13/cast" packages = ["."] + pruneopts = "" revision = "8965335b8c7107321228e3e3702cab9832751bac" - version = "v1.2.0" [[projects]] branch = "master" + digest = "1:7bcdb212f21d3cf318699d50af69a9192ef73fedad0d94d9ed5616f349457881" name = "github.com/spf13/cobra" packages = ["."] + pruneopts = "" revision = "1e58aa3361fd650121dceeedc399e7189c05674a" [[projects]] + digest = "1:8e243c568f36b09031ec18dff5f7d2769dcf5ca4d624ea511c8e3197dc3d352d" name = "github.com/spf13/pflag" packages = ["."] + pruneopts = "" revision = "583c0c0531f06d5278b7d917446061adc344b5cd" version = "v1.0.1" [[projects]] branch = "master" + digest = "1:22d3674d44ee93f52a9c0b6a22d1f736a0ad9ac3f9d2c1ca8648f3c9ce9910bd" name = "github.com/ybbus/jsonrpc" packages = ["."] + pruneopts = "" revision = "2a548b7d822dd62717337a6b1e817fae1b14660a" [[projects]] branch = "master" + digest = "1:3610c577942fbfd2c8975d70a2342bbd13f30cf214237fb8f920c9a6cec0f14a" name = "github.com/zeebo/bencode" packages = ["."] + pruneopts = "" revision = "d522839ac797fc43269dae6a04a1f8be475a915d" [[projects]] branch = "master" + digest = "1:e7dc596c84a908dd326d2c07453307f192976c3edb9482b88290c42076fe378f" name = "golang.org/x/crypto" packages = [ "ripemd160", "sha3", - "ssh/terminal" + "ssh/terminal", ] + pruneopts = "" revision = "8ac0e0d97ce45cd83d1d7243c060cb8461dda5e9" [[projects]] branch = "master" + digest = "1:3edb9c19d0b874999053badbbcc08edab3cde0262d2beb36ad6c0d78391c19ac" name = "golang.org/x/net" packages = [ "context", "context/ctxhttp", "html", - "html/atom" + "html/atom", ] + pruneopts = "" revision = "db08ff08e8622530d9ed3a0e8ac279f6d4c02196" [[projects]] branch = "master" + digest = "1:b1ac49fd3eae66e95230ea3423158b18374a5ad1c53caf89bc7fc1a441e9e0e7" name = "golang.org/x/sys" packages = [ "unix", - "windows" + "windows", ] + pruneopts = "" revision = "bff228c7b664c5fce602223a05fb708fd8654986" [[projects]] branch = "master" + digest = "1:b4f82373e582dff1470e66574ac664b548aa69cffce0943321797fd75f46ee6f" name = "google.golang.org/api" packages = [ "gensupport", "googleapi", "googleapi/internal/uritemplates", "googleapi/transport", - "youtube/v3" + "youtube/v3", ] + pruneopts = "" revision = "ef86ce4234efee96020bde00391d6a9cfae66561" [[projects]] + digest = "1:05eca53b271663de74078b5484b1995a8d56668a51434a698dc5d0863035d575" name = "gopkg.in/nullbio/null.v6" packages = ["convert"] + pruneopts = "" revision = "40264a2e6b7972d183906cf17663983c23231c82" version = "v6.3" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "adebfeec40e94ad17cba1f18d8f47da60af4106a50386ebce51650d036e3d4ac" + input-imports = [ + "github.com/aws/aws-sdk-go/aws", + "github.com/aws/aws-sdk-go/aws/awserr", + "github.com/aws/aws-sdk-go/aws/credentials", + "github.com/aws/aws-sdk-go/aws/session", + "github.com/aws/aws-sdk-go/service/s3", + "github.com/aws/aws-sdk-go/service/s3/s3manager", + "github.com/btcsuite/btcd/chaincfg", + "github.com/btcsuite/btcd/chaincfg/chainhash", + "github.com/btcsuite/btcd/rpcclient", + "github.com/btcsuite/btcutil", + "github.com/btcsuite/btcutil/base58", + "github.com/davecgh/go-spew/spew", + "github.com/garyburd/redigo/redis", + "github.com/go-errors/errors", + "github.com/go-ini/ini", + "github.com/lbryio/lbryschema.go/pb", + "github.com/lbryio/ozzo-validation", + "github.com/mitchellh/go-ps", + "github.com/mitchellh/mapstructure", + "github.com/nlopes/slack", + "github.com/rylio/ytdl", + "github.com/shopspring/decimal", + "github.com/sirupsen/logrus", + "github.com/spf13/cast", + "github.com/spf13/cobra", + "github.com/ybbus/jsonrpc", + "github.com/zeebo/bencode", + "golang.org/x/crypto/ripemd160", + "golang.org/x/crypto/sha3", + "google.golang.org/api/googleapi/transport", + "google.golang.org/api/youtube/v3", + "gopkg.in/nullbio/null.v6/convert", + ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index c2db89a..1eeae38 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -12,7 +12,7 @@ [[constraint]] branch = "master" - name = "github.com/nikooo777/ytdl" + name = "github.com/rylio/ytdl" [[constraint]] branch = "master" diff --git a/jsonrpc/daemon.go b/jsonrpc/daemon.go index 07d7374..db7d0c1 100644 --- a/jsonrpc/daemon.go +++ b/jsonrpc/daemon.go @@ -468,3 +468,18 @@ func (d *Client) ClaimListMine() (*ClaimListMineResponse, error) { return response, nil } + +func (d *Client) ClaimAbandon(txID string, nOut int) (*ClaimAbandonResponse, error) { + response := new(ClaimAbandonResponse) + err := d.call(response, "claim_abandon", map[string]interface{}{ + "txid": txID, + "nout": nOut, + }) + if err != nil { + return nil, err + } else if response == nil { + return nil, errors.Err("no response") + } + + return response, nil +} diff --git a/jsonrpc/daemon_types.go b/jsonrpc/daemon_types.go index 84212ee..b4e7bf4 100644 --- a/jsonrpc/daemon_types.go +++ b/jsonrpc/daemon_types.go @@ -243,6 +243,10 @@ type ClaimListResponse struct { } type ClaimListMineResponse []Claim type ClaimShowResponse Claim +type ClaimAbandonResponse struct { + Txid string `json:"txid"` + Fee float64 `json:"fee"` +} type PeerListResponsePeer struct { IP string `json:"host"` diff --git a/ytsync/manager.go b/ytsync/manager.go index f923f4f..8d45e25 100644 --- a/ytsync/manager.go +++ b/ytsync/manager.go @@ -78,8 +78,8 @@ func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error) "min_videos": {strconv.Itoa(1)}, "after": {strconv.Itoa(int(s.SyncFrom))}, "before": {strconv.Itoa(int(s.SyncUntil))}, - //"sync_server": {s.HostName}, - "channel_id": {s.YoutubeChannelID}, + "sync_server": {s.HostName}, + "channel_id": {s.YoutubeChannelID}, }) defer res.Body.Close() body, _ := ioutil.ReadAll(res.Body) @@ -214,9 +214,6 @@ func (s *SyncManager) Start() error { return errors.Err("Expected 1 channel, %d returned", len(channels)) } lbryChannelName := channels[0].DesiredChannelName - if !s.isWorthProcessing(channels[0]) { - break - } syncs = make([]Sync, 1) syncs[0] = Sync{ YoutubeAPIKey: s.YoutubeAPIKey, @@ -250,9 +247,6 @@ func (s *SyncManager) Start() error { return err } for _, c := range channels { - if !s.isWorthProcessing(c) { - continue - } syncs = append(syncs, Sync{ YoutubeAPIKey: s.YoutubeAPIKey, YoutubeChannelID: c.ChannelId, @@ -312,10 +306,6 @@ func (s *SyncManager) Start() error { return nil } -func (s *SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool { - return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName) -} - func (s *SyncManager) checkUsedSpace() error { usedPctile, err := GetUsedSpace(s.BlobsDir) if err != nil { diff --git a/ytsync/redisdb/redisdb.go b/ytsync/redisdb/redisdb.go deleted file mode 100644 index 207d7c2..0000000 --- a/ytsync/redisdb/redisdb.go +++ /dev/null @@ -1,63 +0,0 @@ -package redisdb - -import ( - "time" - - "github.com/lbryio/lbry.go/errors" - - "github.com/garyburd/redigo/redis" -) - -const ( - redisHashKey = "ytsync" - redisSyncedVal = "t" -) - -type DB struct { - pool *redis.Pool -} - -func New() *DB { - var r DB - r.pool = &redis.Pool{ - MaxIdle: 3, - IdleTimeout: 5 * time.Minute, - Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") }, - TestOnBorrow: func(c redis.Conn, t time.Time) error { - if time.Since(t) < time.Minute { - return nil - } - _, err := c.Do("PING") - return err - }, - } - return &r -} - -func (r DB) IsPublished(id string) (bool, error) { - conn := r.pool.Get() - defer conn.Close() - - alreadyPublished, err := redis.String(conn.Do("HGET", redisHashKey, id)) - if err != nil && err != redis.ErrNil { - return false, errors.Prefix("redis error", err) - - } - - if alreadyPublished == redisSyncedVal { - return true, nil - } - - return false, nil -} - -func (r DB) SetPublished(id string) error { - conn := r.pool.Get() - defer conn.Close() - - _, err := redis.Bool(conn.Do("HSET", redisHashKey, id, redisSyncedVal)) - if err != nil { - return errors.Prefix("redis error", err) - } - return nil -} diff --git a/ytsync/setup.go b/ytsync/setup.go index 6d72a03..113bbb5 100644 --- a/ytsync/setup.go +++ b/ytsync/setup.go @@ -48,7 +48,7 @@ func (s *Sync) walletSetup() error { s.syncedVideosMux.RLock() numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones... s.syncedVideosMux.RUnlock() - log.Debugf("We already published %d videos", numPublished) + log.Debugf("We already allocated credits for %d videos", numPublished) if numOnSource-numPublished > s.Manager.VideosLimit { numOnSource = s.Manager.VideosLimit diff --git a/ytsync/sources/youtubeVideo.go b/ytsync/sources/youtubeVideo.go index 19a5620..d67587d 100644 --- a/ytsync/sources/youtubeVideo.go +++ b/ytsync/sources/youtubeVideo.go @@ -16,7 +16,7 @@ import ( "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/jsonrpc" - "github.com/nikooo777/ytdl" + "github.com/rylio/ytdl" log "github.com/sirupsen/logrus" "google.golang.org/api/youtube/v3" ) @@ -28,6 +28,7 @@ type YoutubeVideo struct { description string playlistPosition int64 size *int64 + maxVideoSize int64 publishedAt time.Time dir string claimNames map[string]bool @@ -121,14 +122,58 @@ func (v *YoutubeVideo) download() error { return err } - var downloadedFile *os.File - downloadedFile, err = os.Create(videoPath) - defer downloadedFile.Close() - if err != nil { - return err + codec := []string{"H.264"} + ext := []string{"mp4"} + + //Filter requires a [] interface{} + codecFilter := make([]interface{}, len(codec)) + for i, v := range codec { + codecFilter[i] = v } - return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[1], downloadedFile) + //Filter requires a [] interface{} + extFilter := make([]interface{}, len(ext)) + for i, v := range ext { + extFilter[i] = v + } + + formats := videoInfo.Formats.Filter(ytdl.FormatVideoEncodingKey, codecFilter).Filter(ytdl.FormatExtensionKey, extFilter) + if len(formats) == 0 { + return errors.Err("no compatible format available for this video") + } + maxRetryAttempts := 5 + for i := 0; i < len(formats) && i < maxRetryAttempts; i++ { + formatIndex := i + if i == maxRetryAttempts-1 { + formatIndex = len(formats) - 1 + } + var downloadedFile *os.File + downloadedFile, err = os.Create(videoPath) + if err != nil { + return err + } + err = videoInfo.Download(formats[formatIndex], downloadedFile) + downloadedFile.Close() + if err != nil { + break + } + fi, err := os.Stat(v.getFilename()) + if err != nil { + return err + } + videoSize := fi.Size() + v.size = &videoSize + + if videoSize > v.maxVideoSize { + //delete the video and ignore the error + _ = v.delete() + err = errors.Err("file is too big and there is no other format available") + } else { + break + } + } + + return err } func (v *YoutubeVideo) videoDir() string { @@ -214,6 +259,8 @@ func (v *YoutubeVideo) Size() *int64 { func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) { v.claimNames = claimNames v.syncedVideosMux = syncedVideosMux + v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024 + //download and thumbnail can be done in parallel err := v.download() if err != nil { @@ -221,19 +268,6 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount } log.Debugln("Downloaded " + v.id) - fi, err := os.Stat(v.getFilename()) - if err != nil { - return nil, err - } - videoSize := fi.Size() - v.size = &videoSize - - if videoSize > int64(maxVideoSize)*1024*1024 { - //delete the video and ignore the error - _ = v.delete() - return nil, errors.Err("the video is too big to sync, skipping for now") - } - err = v.triggerThumbnailSave() if err != nil { return nil, errors.Prefix("thumbnail error", err) diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 9ece285..59c3864 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -27,7 +27,6 @@ import ( "github.com/lbryio/lbry.go/jsonrpc" "github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/util" - "github.com/lbryio/lbry.go/ytsync/redisdb" "github.com/lbryio/lbry.go/ytsync/sources" "github.com/mitchellh/go-ps" log "github.com/sirupsen/logrus" @@ -77,7 +76,6 @@ type Sync struct { daemon *jsonrpc.Client claimAddress string videoDirectory string - db *redisdb.DB syncedVideosMux *sync.RWMutex syncedVideos map[string]syncedVideo claimNames map[string]bool @@ -222,6 +220,18 @@ func (s *Sync) uploadWallet() error { return os.Remove(defaultWalletDir) } +func (s *Sync) setStatusSyncing() error { + syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") + if err != nil { + return err + } + s.syncedVideosMux.Lock() + s.syncedVideos = syncedVideos + s.claimNames = claimNames + s.syncedVideosMux.Unlock() + return nil +} + func (s *Sync) FullCycle() (e error) { if os.Getenv("HOME") == "" { return errors.Err("no $HOME env var found") @@ -231,7 +241,6 @@ func (s *Sync) FullCycle() (e error) { } s.syncedVideosMux = &sync.RWMutex{} s.walletMux = &sync.Mutex{} - s.db = redisdb.New() s.grp = stop.New() s.queue = make(chan video) interruptChan := make(chan os.Signal, 1) @@ -242,16 +251,12 @@ func (s *Sync) FullCycle() (e error) { log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") s.grp.Stop() }() - syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") + err := s.setStatusSyncing() if err != nil { return err } - s.syncedVideosMux.Lock() - s.syncedVideos = syncedVideos - s.claimNames = claimNames - s.syncedVideosMux.Unlock() - defer s.updateChannelStatus(&e) + defer s.setChannelTerminationStatus(&e) err = s.downloadWallet() if err != nil && err.Error() != "wallet not on S3" { @@ -296,7 +301,7 @@ func (s *Sync) FullCycle() (e error) { return nil } -func (s *Sync) updateChannelStatus(e *error) { +func (s *Sync) setChannelTerminationStatus(e *error) { if *e != nil { //conditions for which a channel shouldn't be marked as failed noFailConditions := []string{ @@ -321,20 +326,20 @@ func (s *Sync) updateChannelStatus(e *error) { } func (s *Sync) waitForDaemonStart() error { - for { select { case <-s.grp.Ch(): return errors.Err("interrupted during daemon startup") default: s, err := s.daemon.Status() - if err == nil && s.StartupStatus.Wallet { + if err == nil && s.StartupStatus.Wallet && s.StartupStatus.FileManager { return nil } time.Sleep(5 * time.Second) } } } + func (s *Sync) stopAndUploadWallet(e *error) { log.Printf("Stopping daemon") shutdownErr := stopDaemonViaSystemd() @@ -365,40 +370,68 @@ func logShutdownError(shutdownErr error) { SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") } -func hasDupes(claims []jsonrpc.Claim) (bool, error) { - videoIDs := make(map[string]interface{}) +// fixDupes abandons duplicate claims +func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) { + abandonedClaims := false + videoIDs := make(map[string]jsonrpc.Claim) for _, c := range claims { if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil { continue } if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil { - return false, errors.Err("something is wrong with the this claim: %s", c.ClaimID) + return false, errors.Err("something is wrong with this claim: %s", c.ClaimID) } tn := *c.Value.Stream.Metadata.Thumbnail - videoID := tn[:strings.LastIndex(tn, "/")+1] - _, ok := videoIDs[videoID] - if !ok { - videoIDs[videoID] = nil + videoID := tn[strings.LastIndex(tn, "/")+1:] + + log.Infof("claimid: %s, claimName: %s, videoID: %s", c.ClaimID, c.Name, videoID) + cl, ok := videoIDs[videoID] + if !ok || cl.ClaimID == c.ClaimID { + videoIDs[videoID] = c continue } - return true, nil + // only keep the most recent one + claimToAbandon := c + videoIDs[videoID] = cl + if c.Height > cl.Height { + claimToAbandon = cl + videoIDs[videoID] = c + } + _, err := s.daemon.ClaimAbandon(claimToAbandon.Txid, claimToAbandon.Nout) + if err != nil { + return true, err + } + log.Debugf("abandoning %+v", claimToAbandon) + abandonedClaims = true + //return true, nil } - return false, nil + return abandonedClaims, nil } -//publishesCount counts the amount of videos published so far -func publishesCount(claims []jsonrpc.Claim) (int, error) { +//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published +func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err error) { count := 0 for _, c := range claims { if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil { continue } if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil { - return count, errors.Err("something is wrong with the this claim: %s", c.ClaimID) + return count, fixed, errors.Err("something is wrong with the this claim: %s", c.ClaimID) } - count++ + //check if claimID is in remote db + tn := *c.Value.Stream.Metadata.Thumbnail + videoID := tn[strings.LastIndex(tn, "/")+1:] + pv, ok := s.syncedVideos[videoID] + if !ok || pv.ClaimName != c.Name { + fixed++ + err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil) + if err != nil { + return total, fixed, err + } + } + total++ } - return count, nil + return total, fixed, nil } func (s *Sync) doSync() error { @@ -407,28 +440,47 @@ func (s *Sync) doSync() error { if err != nil { return errors.Prefix("cannot list claims: ", err) } - hasDupes, err := hasDupes(*claims) + hasDupes, err := s.fixDupes(*claims) if err != nil { return errors.Prefix("error checking for duplicates: ", err) } if hasDupes { - return errors.Err("channel has duplicates! Manual fix required") + SendInfoToSlack("Channel had dupes and was fixed!") + err = s.waitForNewBlock() + if err != nil { + return err + } + claims, err = s.daemon.ClaimListMine() + if err != nil { + return errors.Prefix("cannot list claims: ", err) + } } - pubsOnWallet, err := publishesCount(*claims) + + pubsOnWallet, nFixed, err := s.updateRemoteDB(*claims) if err != nil { return errors.Prefix("error counting claims: ", err) } + if nFixed > 0 { + err := s.setStatusSyncing() + if err != nil { + return err + } + SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed) + + } pubsOnDB := 0 for _, sv := range s.syncedVideos { if sv.Published { pubsOnDB++ } } - if pubsOnWallet > pubsOnDB { + + if pubsOnWallet > pubsOnDB { //This case should never happen + SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) return errors.Err("not all published videos are in the database") } if pubsOnWallet < pubsOnDB { - SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) + SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) } err = s.walletSetup() if err != nil { @@ -509,6 +561,7 @@ func (s *Sync) startWorker(workerNum int) { "Playback on other websites has been disabled by the video owner", "Error in daemon: Cannot publish empty file", "Error extracting sts from embedded url response", + "Unable to extract signature tokens", "Client.Timeout exceeded while awaiting headers)", "the video is too big to sync, skipping for now", } @@ -704,6 +757,7 @@ func (s *Sync) processVideo(v video) (err error) { neverRetryFailures := []string{ "Error extracting sts from embedded url response", + "Unable to extract signature tokens", "the video is too big to sync, skipping for now", } if ok && !sv.Published && util.SubstringInSlice(sv.FailureReason, neverRetryFailures) { @@ -711,19 +765,6 @@ func (s *Sync) processVideo(v video) (err error) { return nil } - //TODO: remove this after a few runs... - alreadyPublishedOld, err := s.db.IsPublished(v.ID()) - if err != nil { - return err - } - //TODO: remove this after a few runs... - if alreadyPublishedOld && !alreadyPublished { - //seems like something in the migration of blobs didn't go perfectly right so warn about it! - SendInfoToSlack("A video that was previously published is on the local database but isn't on the remote db! fix it @Nikooo777! \nchannelID: %s, videoID: %s", - s.YoutubeChannelID, v.ID()) - return nil - } - if alreadyPublished { log.Println(v.ID() + " already published") return nil