Merge pull request #39 from lbryio/drop-sync-server-dep

Drop sync_server dependency & Update ytdl lib
This commit is contained in:
Niko 2018-09-26 22:29:10 +02:00 committed by GitHub
commit 52d087e920
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 283 additions and 166 deletions

146
Gopkg.lock generated
View file

@ -2,18 +2,23 @@
[[projects]]
digest = "1:90272eae3bf75d2aa681ff3eee6cf4f49e8b06db533dc9c830ef214e5abbaaf2"
name = "github.com/PuerkitoBio/goquery"
packages = ["."]
pruneopts = ""
revision = "dc2ec5c7ca4d9aae063b79b9f581dd3ea6afd2b2"
version = "v1.4.1"
[[projects]]
digest = "1:4d0cb5aec47a2aec8b8b211540dc59902f8f00d571281356597b41ae349f4885"
name = "github.com/andybalholm/cascadia"
packages = ["."]
pruneopts = ""
revision = "901648c87902174f774fac311d7f176f8647bdaa"
version = "v1.0.0"
[[projects]]
digest = "1:d4336a8cea03c9729c9be12c142938a28a0dbac23a316f019f6fee323cf03709"
name = "github.com/aws/aws-sdk-go"
packages = [
"aws",
@ -26,19 +31,14 @@
"aws/credentials/ec2rolecreds",
"aws/credentials/endpointcreds",
"aws/credentials/stscreds",
"aws/csm",
"aws/defaults",
"aws/ec2metadata",
"aws/endpoints",
"aws/request",
"aws/session",
"aws/signer/v4",
"internal/sdkio",
"internal/sdkrand",
"internal/shareddefaults",
"private/protocol",
"private/protocol/eventstream",
"private/protocol/eventstream/eventstreamapi",
"private/protocol/query",
"private/protocol/query/queryutil",
"private/protocol/rest",
@ -47,13 +47,15 @@
"service/s3",
"service/s3/s3iface",
"service/s3/s3manager",
"service/sts"
"service/sts",
]
revision = "d2d4fca90395039f60821c231da759cab8f318c7"
version = "v1.14.4"
pruneopts = ""
revision = "b69f447375c7fa0047ebcdd8ae5d585d5aac2f71"
version = "v1.10.51"
[[projects]]
branch = "master"
digest = "1:ea2251fa804d1b978feac8146d751b32ce2017eaf1f2915fde0df389bacaf383"
name = "github.com/btcsuite/btcd"
packages = [
"btcec",
@ -61,217 +63,311 @@
"chaincfg",
"chaincfg/chainhash",
"rpcclient",
"wire"
"wire",
]
pruneopts = ""
revision = "86fed781132ac890ee03e906e4ecd5d6fa180c64"
[[projects]]
branch = "master"
digest = "1:30d4a548e09bca4a0c77317c58e7407e2a65c15325e944f9c08a7b7992f8a59e"
name = "github.com/btcsuite/btclog"
packages = ["."]
pruneopts = ""
revision = "84c8d2346e9fc8c7b947e243b9c24e6df9fd206a"
[[projects]]
branch = "master"
digest = "1:471ae435f9ad7fc2f6b7a2e91ca026a124792859a7033fa60579c3aa618161ed"
name = "github.com/btcsuite/btcutil"
packages = [
".",
"base58",
"bech32"
"bech32",
]
pruneopts = ""
revision = "d4cc87b860166d00d6b5b9e0d3b3d71d6088d4d4"
[[projects]]
branch = "master"
digest = "1:422f38d57f1bc0fdc34f26d0f1026869a3710400b09b5478c9288efa13573cfa"
name = "github.com/btcsuite/go-socks"
packages = ["socks"]
pruneopts = ""
revision = "4720035b7bfd2a9bb130b1c184f8bbe41b6f0d0f"
[[projects]]
branch = "master"
digest = "1:57c911bbbf529465cf2ca5d43546cd5875a59054c41e2fe97791419959282aa1"
name = "github.com/btcsuite/websocket"
packages = ["."]
pruneopts = ""
revision = "31079b6807923eb23992c421b114992b95131b55"
[[projects]]
digest = "1:0a39ec8bf5629610a4bc7873a92039ee509246da3cef1a0ea60f1ed7e5f9cea5"
name = "github.com/davecgh/go-spew"
packages = ["spew"]
pruneopts = ""
revision = "346938d642f2ec3594ed81d874461961cd0faa76"
version = "v1.1.0"
[[projects]]
digest = "1:4b5f8c148e7fa094b73bcb6d16ea46eac7fdc726e55b81845ff96e29df534421"
name = "github.com/garyburd/redigo"
packages = [
"internal",
"redis"
"redis",
]
pruneopts = ""
revision = "a69d19351219b6dd56f274f96d85a7014a2ec34e"
version = "v1.6.0"
[[projects]]
digest = "1:968d8903d598e3fae738325d3410f33f07ea6a2b9ee5591e9c262ee37df6845a"
name = "github.com/go-errors/errors"
packages = ["."]
pruneopts = ""
revision = "a6af135bd4e28680facf08a3d206b454abc877a4"
version = "v1.0.1"
[[projects]]
branch = "master"
digest = "1:4d5221853226d8d4be594d52d885ddde38170d2e3159b82ed92ecde4dded2304"
name = "github.com/go-ini/ini"
packages = ["."]
revision = "06f5f3d67269ccec1fe5fe4134ba6e982984f7f5"
version = "v1.37.0"
pruneopts = ""
revision = "5cf292cae48347c2490ac1a58fe36735fb78df7e"
[[projects]]
digest = "1:b1d3041d568e065ab4d76f7477844458e9209c0bb241eaccdc0770bf0a13b120"
name = "github.com/golang/protobuf"
packages = ["proto"]
pruneopts = ""
revision = "b4deda0973fb4c70b50d226b1af49f3da59f5265"
version = "v1.1.0"
[[projects]]
digest = "1:fe1b4d4cbe48c0d55507c55f8663aa4185576cc58fa0c8be03bb8f19dfe17a9c"
name = "github.com/gorilla/websocket"
packages = ["."]
pruneopts = ""
revision = "ea4d1f681babbce9545c9c5f3d5194a789c89f5b"
version = "v1.2.0"
[[projects]]
digest = "1:870d441fe217b8e689d7949fef6e43efbc787e50f200cb1e70dbca9204a1d6be"
name = "github.com/inconshreveable/mousetrap"
packages = ["."]
pruneopts = ""
revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75"
version = "v1.0"
[[projects]]
digest = "1:4f767a115bc8e08576f6d38ab73c376fc1b1cd3bb5041171c9e8668cc7739b52"
name = "github.com/jmespath/go-jmespath"
packages = ["."]
pruneopts = ""
revision = "0b12d6b5"
[[projects]]
branch = "master"
digest = "1:375104fd705791c50351e652a9d80321813fefc4f83a7871cb2f6111a5bc1dc3"
name = "github.com/lbryio/lbryschema.go"
packages = ["pb"]
pruneopts = ""
revision = "185433f2fd0c732547654749b98b37e56223dd22"
[[projects]]
digest = "1:daad05ffdae6e2cd9bd9bbc14440e7e8e841037141f26a775a5a31b1b61cb14d"
name = "github.com/lbryio/ozzo-validation"
packages = ["."]
pruneopts = ""
revision = "d1008ad1fd04ceb5faedaf34881df0c504382706"
version = "v3.1"
[[projects]]
branch = "master"
digest = "1:1dee6133ab829c8559a39031ad1e0e3538e4a7b34d3e0509d1fc247737e928c1"
name = "github.com/mitchellh/go-ps"
packages = ["."]
pruneopts = ""
revision = "4fdf99ab29366514c69ccccddab5dc58b8d84062"
[[projects]]
branch = "master"
digest = "1:eb9117392ee8e7aa44f78e0db603f70b1050ee0ebda4bd40040befb5b218c546"
name = "github.com/mitchellh/mapstructure"
packages = ["."]
pruneopts = ""
revision = "bb74f1db0675b241733089d5a1faa5dd8b0ef57b"
[[projects]]
branch = "master"
name = "github.com/nikooo777/ytdl"
packages = ["."]
revision = "64fe19bb69f66746824eb7812caf9f382c0e0bd8"
[[projects]]
digest = "1:ba7d1dfde0c2142011332bffce4d8468310228afd49dd4425ac59fa9124fb7c4"
name = "github.com/nlopes/slack"
packages = ["."]
pruneopts = ""
revision = "8ab4d0b364ef1e9af5d102531da20d5ec902b6c4"
version = "v0.2.0"
[[projects]]
branch = "master"
digest = "1:1ee326e6da62d87b3a07a9303d2cbb70f974207d14e6d992c61b6e650ff37c50"
name = "github.com/rylio/ytdl"
packages = ["."]
pruneopts = ""
revision = "06f6510946275931157f5fe73f55ec7d6fd65870"
[[projects]]
branch = "master"
digest = "1:67b7dcb3b7e67cb6f96fb38fe7358bc1210453189da210e40cf357a92d57c1c1"
name = "github.com/shopspring/decimal"
packages = ["."]
pruneopts = ""
revision = "19e3cb6c29303990525b56f51acf77c5630dd88a"
[[projects]]
branch = "master"
digest = "1:4cb1f758b69097d419a148e64c86b358a4f77c695504f99de1ee86617d64f74e"
name = "github.com/sirupsen/logrus"
packages = ["."]
pruneopts = ""
revision = "ea8897e79973357ba785ac2533559a6297e83c44"
[[projects]]
branch = "master"
digest = "1:d0b38ba6da419a6d4380700218eeec8623841d44a856bb57369c172fbf692ab4"
name = "github.com/spf13/cast"
packages = ["."]
pruneopts = ""
revision = "8965335b8c7107321228e3e3702cab9832751bac"
version = "v1.2.0"
[[projects]]
branch = "master"
digest = "1:7bcdb212f21d3cf318699d50af69a9192ef73fedad0d94d9ed5616f349457881"
name = "github.com/spf13/cobra"
packages = ["."]
pruneopts = ""
revision = "1e58aa3361fd650121dceeedc399e7189c05674a"
[[projects]]
digest = "1:8e243c568f36b09031ec18dff5f7d2769dcf5ca4d624ea511c8e3197dc3d352d"
name = "github.com/spf13/pflag"
packages = ["."]
pruneopts = ""
revision = "583c0c0531f06d5278b7d917446061adc344b5cd"
version = "v1.0.1"
[[projects]]
branch = "master"
digest = "1:22d3674d44ee93f52a9c0b6a22d1f736a0ad9ac3f9d2c1ca8648f3c9ce9910bd"
name = "github.com/ybbus/jsonrpc"
packages = ["."]
pruneopts = ""
revision = "2a548b7d822dd62717337a6b1e817fae1b14660a"
[[projects]]
branch = "master"
digest = "1:3610c577942fbfd2c8975d70a2342bbd13f30cf214237fb8f920c9a6cec0f14a"
name = "github.com/zeebo/bencode"
packages = ["."]
pruneopts = ""
revision = "d522839ac797fc43269dae6a04a1f8be475a915d"
[[projects]]
branch = "master"
digest = "1:e7dc596c84a908dd326d2c07453307f192976c3edb9482b88290c42076fe378f"
name = "golang.org/x/crypto"
packages = [
"ripemd160",
"sha3",
"ssh/terminal"
"ssh/terminal",
]
pruneopts = ""
revision = "8ac0e0d97ce45cd83d1d7243c060cb8461dda5e9"
[[projects]]
branch = "master"
digest = "1:3edb9c19d0b874999053badbbcc08edab3cde0262d2beb36ad6c0d78391c19ac"
name = "golang.org/x/net"
packages = [
"context",
"context/ctxhttp",
"html",
"html/atom"
"html/atom",
]
pruneopts = ""
revision = "db08ff08e8622530d9ed3a0e8ac279f6d4c02196"
[[projects]]
branch = "master"
digest = "1:b1ac49fd3eae66e95230ea3423158b18374a5ad1c53caf89bc7fc1a441e9e0e7"
name = "golang.org/x/sys"
packages = [
"unix",
"windows"
"windows",
]
pruneopts = ""
revision = "bff228c7b664c5fce602223a05fb708fd8654986"
[[projects]]
branch = "master"
digest = "1:b4f82373e582dff1470e66574ac664b548aa69cffce0943321797fd75f46ee6f"
name = "google.golang.org/api"
packages = [
"gensupport",
"googleapi",
"googleapi/internal/uritemplates",
"googleapi/transport",
"youtube/v3"
"youtube/v3",
]
pruneopts = ""
revision = "ef86ce4234efee96020bde00391d6a9cfae66561"
[[projects]]
digest = "1:05eca53b271663de74078b5484b1995a8d56668a51434a698dc5d0863035d575"
name = "gopkg.in/nullbio/null.v6"
packages = ["convert"]
pruneopts = ""
revision = "40264a2e6b7972d183906cf17663983c23231c82"
version = "v6.3"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "adebfeec40e94ad17cba1f18d8f47da60af4106a50386ebce51650d036e3d4ac"
input-imports = [
"github.com/aws/aws-sdk-go/aws",
"github.com/aws/aws-sdk-go/aws/awserr",
"github.com/aws/aws-sdk-go/aws/credentials",
"github.com/aws/aws-sdk-go/aws/session",
"github.com/aws/aws-sdk-go/service/s3",
"github.com/aws/aws-sdk-go/service/s3/s3manager",
"github.com/btcsuite/btcd/chaincfg",
"github.com/btcsuite/btcd/chaincfg/chainhash",
"github.com/btcsuite/btcd/rpcclient",
"github.com/btcsuite/btcutil",
"github.com/btcsuite/btcutil/base58",
"github.com/davecgh/go-spew/spew",
"github.com/garyburd/redigo/redis",
"github.com/go-errors/errors",
"github.com/go-ini/ini",
"github.com/lbryio/lbryschema.go/pb",
"github.com/lbryio/ozzo-validation",
"github.com/mitchellh/go-ps",
"github.com/mitchellh/mapstructure",
"github.com/nlopes/slack",
"github.com/rylio/ytdl",
"github.com/shopspring/decimal",
"github.com/sirupsen/logrus",
"github.com/spf13/cast",
"github.com/spf13/cobra",
"github.com/ybbus/jsonrpc",
"github.com/zeebo/bencode",
"golang.org/x/crypto/ripemd160",
"golang.org/x/crypto/sha3",
"google.golang.org/api/googleapi/transport",
"google.golang.org/api/youtube/v3",
"gopkg.in/nullbio/null.v6/convert",
]
solver-name = "gps-cdcl"
solver-version = 1

View file

@ -12,7 +12,7 @@
[[constraint]]
branch = "master"
name = "github.com/nikooo777/ytdl"
name = "github.com/rylio/ytdl"
[[constraint]]
branch = "master"

View file

@ -468,3 +468,18 @@ func (d *Client) ClaimListMine() (*ClaimListMineResponse, error) {
return response, nil
}
func (d *Client) ClaimAbandon(txID string, nOut int) (*ClaimAbandonResponse, error) {
response := new(ClaimAbandonResponse)
err := d.call(response, "claim_abandon", map[string]interface{}{
"txid": txID,
"nout": nOut,
})
if err != nil {
return nil, err
} else if response == nil {
return nil, errors.Err("no response")
}
return response, nil
}

View file

@ -243,6 +243,10 @@ type ClaimListResponse struct {
}
type ClaimListMineResponse []Claim
type ClaimShowResponse Claim
type ClaimAbandonResponse struct {
Txid string `json:"txid"`
Fee float64 `json:"fee"`
}
type PeerListResponsePeer struct {
IP string `json:"host"`

View file

@ -78,8 +78,8 @@ func (s *SyncManager) fetchChannels(status string) ([]apiYoutubeChannel, error)
"min_videos": {strconv.Itoa(1)},
"after": {strconv.Itoa(int(s.SyncFrom))},
"before": {strconv.Itoa(int(s.SyncUntil))},
//"sync_server": {s.HostName},
"channel_id": {s.YoutubeChannelID},
"sync_server": {s.HostName},
"channel_id": {s.YoutubeChannelID},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
@ -214,9 +214,6 @@ func (s *SyncManager) Start() error {
return errors.Err("Expected 1 channel, %d returned", len(channels))
}
lbryChannelName := channels[0].DesiredChannelName
if !s.isWorthProcessing(channels[0]) {
break
}
syncs = make([]Sync, 1)
syncs[0] = Sync{
YoutubeAPIKey: s.YoutubeAPIKey,
@ -250,9 +247,6 @@ func (s *SyncManager) Start() error {
return err
}
for _, c := range channels {
if !s.isWorthProcessing(c) {
continue
}
syncs = append(syncs, Sync{
YoutubeAPIKey: s.YoutubeAPIKey,
YoutubeChannelID: c.ChannelId,
@ -312,10 +306,6 @@ func (s *SyncManager) Start() error {
return nil
}
func (s *SyncManager) isWorthProcessing(channel apiYoutubeChannel) bool {
return channel.TotalVideos > 0 && (channel.SyncServer.IsNull() || channel.SyncServer.String == s.HostName)
}
func (s *SyncManager) checkUsedSpace() error {
usedPctile, err := GetUsedSpace(s.BlobsDir)
if err != nil {

View file

@ -1,63 +0,0 @@
package redisdb
import (
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/garyburd/redigo/redis"
)
const (
redisHashKey = "ytsync"
redisSyncedVal = "t"
)
type DB struct {
pool *redis.Pool
}
func New() *DB {
var r DB
r.pool = &redis.Pool{
MaxIdle: 3,
IdleTimeout: 5 * time.Minute,
Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") },
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
}
return &r
}
func (r DB) IsPublished(id string) (bool, error) {
conn := r.pool.Get()
defer conn.Close()
alreadyPublished, err := redis.String(conn.Do("HGET", redisHashKey, id))
if err != nil && err != redis.ErrNil {
return false, errors.Prefix("redis error", err)
}
if alreadyPublished == redisSyncedVal {
return true, nil
}
return false, nil
}
func (r DB) SetPublished(id string) error {
conn := r.pool.Get()
defer conn.Close()
_, err := redis.Bool(conn.Do("HSET", redisHashKey, id, redisSyncedVal))
if err != nil {
return errors.Prefix("redis error", err)
}
return nil
}

View file

@ -48,7 +48,7 @@ func (s *Sync) walletSetup() error {
s.syncedVideosMux.RLock()
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
s.syncedVideosMux.RUnlock()
log.Debugf("We already published %d videos", numPublished)
log.Debugf("We already allocated credits for %d videos", numPublished)
if numOnSource-numPublished > s.Manager.VideosLimit {
numOnSource = s.Manager.VideosLimit

View file

@ -16,7 +16,7 @@ import (
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/nikooo777/ytdl"
"github.com/rylio/ytdl"
log "github.com/sirupsen/logrus"
"google.golang.org/api/youtube/v3"
)
@ -28,6 +28,7 @@ type YoutubeVideo struct {
description string
playlistPosition int64
size *int64
maxVideoSize int64
publishedAt time.Time
dir string
claimNames map[string]bool
@ -121,14 +122,58 @@ func (v *YoutubeVideo) download() error {
return err
}
var downloadedFile *os.File
downloadedFile, err = os.Create(videoPath)
defer downloadedFile.Close()
if err != nil {
return err
codec := []string{"H.264"}
ext := []string{"mp4"}
//Filter requires a [] interface{}
codecFilter := make([]interface{}, len(codec))
for i, v := range codec {
codecFilter[i] = v
}
return videoInfo.Download(videoInfo.Formats.Best(ytdl.FormatAudioEncodingKey)[1], downloadedFile)
//Filter requires a [] interface{}
extFilter := make([]interface{}, len(ext))
for i, v := range ext {
extFilter[i] = v
}
formats := videoInfo.Formats.Filter(ytdl.FormatVideoEncodingKey, codecFilter).Filter(ytdl.FormatExtensionKey, extFilter)
if len(formats) == 0 {
return errors.Err("no compatible format available for this video")
}
maxRetryAttempts := 5
for i := 0; i < len(formats) && i < maxRetryAttempts; i++ {
formatIndex := i
if i == maxRetryAttempts-1 {
formatIndex = len(formats) - 1
}
var downloadedFile *os.File
downloadedFile, err = os.Create(videoPath)
if err != nil {
return err
}
err = videoInfo.Download(formats[formatIndex], downloadedFile)
downloadedFile.Close()
if err != nil {
break
}
fi, err := os.Stat(v.getFilename())
if err != nil {
return err
}
videoSize := fi.Size()
v.size = &videoSize
if videoSize > v.maxVideoSize {
//delete the video and ignore the error
_ = v.delete()
err = errors.Err("file is too big and there is no other format available")
} else {
break
}
}
return err
}
func (v *YoutubeVideo) videoDir() string {
@ -214,6 +259,8 @@ func (v *YoutubeVideo) Size() *int64 {
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, claimNames map[string]bool, syncedVideosMux *sync.RWMutex) (*SyncSummary, error) {
v.claimNames = claimNames
v.syncedVideosMux = syncedVideosMux
v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024
//download and thumbnail can be done in parallel
err := v.download()
if err != nil {
@ -221,19 +268,6 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount
}
log.Debugln("Downloaded " + v.id)
fi, err := os.Stat(v.getFilename())
if err != nil {
return nil, err
}
videoSize := fi.Size()
v.size = &videoSize
if videoSize > int64(maxVideoSize)*1024*1024 {
//delete the video and ignore the error
_ = v.delete()
return nil, errors.Err("the video is too big to sync, skipping for now")
}
err = v.triggerThumbnailSave()
if err != nil {
return nil, errors.Prefix("thumbnail error", err)

View file

@ -27,7 +27,6 @@ import (
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/redisdb"
"github.com/lbryio/lbry.go/ytsync/sources"
"github.com/mitchellh/go-ps"
log "github.com/sirupsen/logrus"
@ -77,7 +76,6 @@ type Sync struct {
daemon *jsonrpc.Client
claimAddress string
videoDirectory string
db *redisdb.DB
syncedVideosMux *sync.RWMutex
syncedVideos map[string]syncedVideo
claimNames map[string]bool
@ -222,6 +220,18 @@ func (s *Sync) uploadWallet() error {
return os.Remove(defaultWalletDir)
}
func (s *Sync) setStatusSyncing() error {
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
if err != nil {
return err
}
s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos
s.claimNames = claimNames
s.syncedVideosMux.Unlock()
return nil
}
func (s *Sync) FullCycle() (e error) {
if os.Getenv("HOME") == "" {
return errors.Err("no $HOME env var found")
@ -231,7 +241,6 @@ func (s *Sync) FullCycle() (e error) {
}
s.syncedVideosMux = &sync.RWMutex{}
s.walletMux = &sync.Mutex{}
s.db = redisdb.New()
s.grp = stop.New()
s.queue = make(chan video)
interruptChan := make(chan os.Signal, 1)
@ -242,16 +251,12 @@ func (s *Sync) FullCycle() (e error) {
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop()
}()
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
err := s.setStatusSyncing()
if err != nil {
return err
}
s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos
s.claimNames = claimNames
s.syncedVideosMux.Unlock()
defer s.updateChannelStatus(&e)
defer s.setChannelTerminationStatus(&e)
err = s.downloadWallet()
if err != nil && err.Error() != "wallet not on S3" {
@ -296,7 +301,7 @@ func (s *Sync) FullCycle() (e error) {
return nil
}
func (s *Sync) updateChannelStatus(e *error) {
func (s *Sync) setChannelTerminationStatus(e *error) {
if *e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{
@ -321,20 +326,20 @@ func (s *Sync) updateChannelStatus(e *error) {
}
func (s *Sync) waitForDaemonStart() error {
for {
select {
case <-s.grp.Ch():
return errors.Err("interrupted during daemon startup")
default:
s, err := s.daemon.Status()
if err == nil && s.StartupStatus.Wallet {
if err == nil && s.StartupStatus.Wallet && s.StartupStatus.FileManager {
return nil
}
time.Sleep(5 * time.Second)
}
}
}
func (s *Sync) stopAndUploadWallet(e *error) {
log.Printf("Stopping daemon")
shutdownErr := stopDaemonViaSystemd()
@ -365,40 +370,68 @@ func logShutdownError(shutdownErr error) {
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
}
func hasDupes(claims []jsonrpc.Claim) (bool, error) {
videoIDs := make(map[string]interface{})
// fixDupes abandons duplicate claims
func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
abandonedClaims := false
videoIDs := make(map[string]jsonrpc.Claim)
for _, c := range claims {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
continue
}
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
return false, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
return false, errors.Err("something is wrong with this claim: %s", c.ClaimID)
}
tn := *c.Value.Stream.Metadata.Thumbnail
videoID := tn[:strings.LastIndex(tn, "/")+1]
_, ok := videoIDs[videoID]
if !ok {
videoIDs[videoID] = nil
videoID := tn[strings.LastIndex(tn, "/")+1:]
log.Infof("claimid: %s, claimName: %s, videoID: %s", c.ClaimID, c.Name, videoID)
cl, ok := videoIDs[videoID]
if !ok || cl.ClaimID == c.ClaimID {
videoIDs[videoID] = c
continue
}
return true, nil
// only keep the most recent one
claimToAbandon := c
videoIDs[videoID] = cl
if c.Height > cl.Height {
claimToAbandon = cl
videoIDs[videoID] = c
}
_, err := s.daemon.ClaimAbandon(claimToAbandon.Txid, claimToAbandon.Nout)
if err != nil {
return true, err
}
log.Debugf("abandoning %+v", claimToAbandon)
abandonedClaims = true
//return true, nil
}
return false, nil
return abandonedClaims, nil
}
//publishesCount counts the amount of videos published so far
func publishesCount(claims []jsonrpc.Claim) (int, error) {
//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published
func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err error) {
count := 0
for _, c := range claims {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
continue
}
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
return count, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
return count, fixed, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
}
count++
//check if claimID is in remote db
tn := *c.Value.Stream.Metadata.Thumbnail
videoID := tn[strings.LastIndex(tn, "/")+1:]
pv, ok := s.syncedVideos[videoID]
if !ok || pv.ClaimName != c.Name {
fixed++
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
if err != nil {
return total, fixed, err
}
}
total++
}
return count, nil
return total, fixed, nil
}
func (s *Sync) doSync() error {
@ -407,28 +440,47 @@ func (s *Sync) doSync() error {
if err != nil {
return errors.Prefix("cannot list claims: ", err)
}
hasDupes, err := hasDupes(*claims)
hasDupes, err := s.fixDupes(*claims)
if err != nil {
return errors.Prefix("error checking for duplicates: ", err)
}
if hasDupes {
return errors.Err("channel has duplicates! Manual fix required")
SendInfoToSlack("Channel had dupes and was fixed!")
err = s.waitForNewBlock()
if err != nil {
return err
}
claims, err = s.daemon.ClaimListMine()
if err != nil {
return errors.Prefix("cannot list claims: ", err)
}
}
pubsOnWallet, err := publishesCount(*claims)
pubsOnWallet, nFixed, err := s.updateRemoteDB(*claims)
if err != nil {
return errors.Prefix("error counting claims: ", err)
}
if nFixed > 0 {
err := s.setStatusSyncing()
if err != nil {
return err
}
SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed)
}
pubsOnDB := 0
for _, sv := range s.syncedVideos {
if sv.Published {
pubsOnDB++
}
}
if pubsOnWallet > pubsOnDB {
if pubsOnWallet > pubsOnDB { //This case should never happen
SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
return errors.Err("not all published videos are in the database")
}
if pubsOnWallet < pubsOnDB {
SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
}
err = s.walletSetup()
if err != nil {
@ -509,6 +561,7 @@ func (s *Sync) startWorker(workerNum int) {
"Playback on other websites has been disabled by the video owner",
"Error in daemon: Cannot publish empty file",
"Error extracting sts from embedded url response",
"Unable to extract signature tokens",
"Client.Timeout exceeded while awaiting headers)",
"the video is too big to sync, skipping for now",
}
@ -704,6 +757,7 @@ func (s *Sync) processVideo(v video) (err error) {
neverRetryFailures := []string{
"Error extracting sts from embedded url response",
"Unable to extract signature tokens",
"the video is too big to sync, skipping for now",
}
if ok && !sv.Published && util.SubstringInSlice(sv.FailureReason, neverRetryFailures) {
@ -711,19 +765,6 @@ func (s *Sync) processVideo(v video) (err error) {
return nil
}
//TODO: remove this after a few runs...
alreadyPublishedOld, err := s.db.IsPublished(v.ID())
if err != nil {
return err
}
//TODO: remove this after a few runs...
if alreadyPublishedOld && !alreadyPublished {
//seems like something in the migration of blobs didn't go perfectly right so warn about it!
SendInfoToSlack("A video that was previously published is on the local database but isn't on the remote db! fix it @Nikooo777! \nchannelID: %s, videoID: %s",
s.YoutubeChannelID, v.ID())
return nil
}
if alreadyPublished {
log.Println(v.ID() + " already published")
return nil