continued refactoring

This commit is contained in:
Niko Storni 2018-09-26 00:08:18 -04:00
parent 58fef45a5f
commit 95c4dde276
6 changed files with 111 additions and 121 deletions

View file

@ -11,7 +11,7 @@ import (
func (s *Sync) CountVideos() (uint64, error) { func (s *Sync) CountVideos() (uint64, error) {
client := &http.Client{ client := &http.Client{
Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
} }
service, err := youtube.New(client) service, err := youtube.New(client)

View file

@ -7,7 +7,6 @@ import (
"time" "time"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/null"
"github.com/lbryio/lbry.go/util" "github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/namer" "github.com/lbryio/lbry.go/ytsync/namer"
"github.com/lbryio/lbry.go/ytsync/sdk" "github.com/lbryio/lbry.go/ytsync/sdk"
@ -15,32 +14,56 @@ import (
) )
type SyncManager struct { type SyncManager struct {
StopOnError bool stopOnError bool
MaxTries int maxTries int
TakeOverExistingChannel bool takeOverExistingChannel bool
Refill int refill int
Limit int limit int
SkipSpaceCheck bool skipSpaceCheck bool
SyncUpdate bool syncUpdate bool
ConcurrentJobs int concurrentJobs int
ConcurrentVideos int concurrentVideos int
BlobsDir string blobsDir string
VideosLimit int videosLimit int
MaxVideoSize int maxVideoSize int
LbrycrdString string lbrycrdString string
AwsS3ID string awsS3ID string
AwsS3Secret string awsS3Secret string
AwsS3Region string awsS3Region string
SyncStatus string syncStatus string
AwsS3Bucket string awsS3Bucket string
SingleRun bool singleRun bool
ChannelProperties *sdk.ChannelProperties syncProperties *sdk.SyncProperties
APIConfig *sdk.APIConfig apiConfig *sdk.APIConfig
namer *namer.Namer namer *namer.Namer
} }
func NewSyncManager() *SyncManager { func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int,
skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int,
maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string,
syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig) *SyncManager {
return &SyncManager{ return &SyncManager{
stopOnError: stopOnError,
maxTries: maxTries,
takeOverExistingChannel: takeOverExistingChannel,
refill: refill,
limit: limit,
skipSpaceCheck: skipSpaceCheck,
syncUpdate: syncUpdate,
concurrentJobs: concurrentJobs,
concurrentVideos: concurrentVideos,
blobsDir: blobsDir,
videosLimit: videosLimit,
maxVideoSize: maxVideoSize,
lbrycrdString: lbrycrdString,
awsS3ID: awsS3ID,
awsS3Secret: awsS3Secret,
awsS3Region: awsS3Region,
awsS3Bucket: awsS3Bucket,
syncStatus: syncStatus,
singleRun: singleRun,
syncProperties: syncProperties,
apiConfig: apiConfig,
namer: namer.NewNamer(), namer: namer.NewNamer(),
} }
} }
@ -56,47 +79,12 @@ const (
var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized} var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized}
type apiJobsResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []apiYoutubeChannel `json:"data"`
}
type apiYoutubeChannel struct {
ChannelId string `json:"channel_id"`
TotalVideos uint `json:"total_videos"`
DesiredChannelName string `json:"desired_channel_name"`
SyncServer null.String `json:"sync_server"`
Fee *struct {
Amount string `json:"amount"`
Address string `json:"address"`
Currency string `json:"currency"`
} `json:"fee"`
}
type apiChannelStatusResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []syncedVideo `json:"data"`
}
type syncedVideo struct {
VideoID string `json:"video_id"`
Published bool `json:"published"`
FailureReason string `json:"failure_reason"`
ClaimName string `json:"claim_name"`
}
const ( const (
VideoStatusPublished = "published" VideoStatusPublished = "published"
VideoStatusFailed = "failed" VideoStatusFailed = "failed"
) )
func (s *SyncManager) Start() error { func (s *SyncManager) Start() error {
if s.namer == nil {
// TODO: fix me, use NewSyncManager instead
s.namer = namer.NewNamer()
}
syncCount := 0 syncCount := 0
for { for {
@ -108,9 +96,9 @@ func (s *SyncManager) Start() error {
var syncs []Sync var syncs []Sync
shouldInterruptLoop := false shouldInterruptLoop := false
isSingleChannelSync := s.ChannelProperties.YoutubeChannelID != "" isSingleChannelSync := s.syncProperties.YoutubeChannelID != ""
if isSingleChannelSync { if isSingleChannelSync {
channels, err := s.APIConfig.FetchChannels("", s.ChannelProperties) channels, err := s.apiConfig.FetchChannels("", s.syncProperties)
if err != nil { if err != nil {
return err return err
} }
@ -120,53 +108,53 @@ func (s *SyncManager) Start() error {
lbryChannelName := channels[0].DesiredChannelName lbryChannelName := channels[0].DesiredChannelName
syncs = make([]Sync, 1) syncs = make([]Sync, 1)
syncs[0] = Sync{ syncs[0] = Sync{
YoutubeAPIKey: s.YoutubeAPIKey, APIConfig: s.apiConfig,
YoutubeChannelID: s.YoutubeChannelID, YoutubeChannelID: s.syncProperties.YoutubeChannelID,
LbryChannelName: lbryChannelName, LbryChannelName: lbryChannelName,
StopOnError: s.StopOnError, StopOnError: s.stopOnError,
MaxTries: s.MaxTries, MaxTries: s.maxTries,
ConcurrentVideos: s.ConcurrentVideos, ConcurrentVideos: s.concurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel, TakeOverExistingChannel: s.takeOverExistingChannel,
Refill: s.Refill, Refill: s.refill,
Manager: s, Manager: s,
LbrycrdString: s.LbrycrdString, LbrycrdString: s.lbrycrdString,
AwsS3ID: s.AwsS3ID, AwsS3ID: s.awsS3ID,
AwsS3Secret: s.AwsS3Secret, AwsS3Secret: s.awsS3Secret,
AwsS3Region: s.AwsS3Region, AwsS3Region: s.awsS3Region,
AwsS3Bucket: s.AwsS3Bucket, AwsS3Bucket: s.awsS3Bucket,
namer: s.namer, namer: s.namer,
} }
shouldInterruptLoop = true shouldInterruptLoop = true
} else { } else {
var queuesToSync []string var queuesToSync []string
if s.SyncStatus != "" { if s.syncStatus != "" {
queuesToSync = append(queuesToSync, s.SyncStatus) queuesToSync = append(queuesToSync, s.syncStatus)
} else if s.SyncUpdate { } else if s.syncUpdate {
queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced) queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced)
} else { } else {
queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued) queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued)
} }
for _, q := range queuesToSync { for _, q := range queuesToSync {
channels, err := s.APIConfig.FetchChannels(q, s.ChannelProperties) channels, err := s.apiConfig.FetchChannels(q, s.syncProperties)
if err != nil { if err != nil {
return err return err
} }
for _, c := range channels { for _, c := range channels {
syncs = append(syncs, Sync{ syncs = append(syncs, Sync{
YoutubeAPIKey: s.YoutubeAPIKey, APIConfig: s.apiConfig,
YoutubeChannelID: c.ChannelId, YoutubeChannelID: c.ChannelId,
LbryChannelName: c.DesiredChannelName, LbryChannelName: c.DesiredChannelName,
StopOnError: s.StopOnError, StopOnError: s.stopOnError,
MaxTries: s.MaxTries, MaxTries: s.maxTries,
ConcurrentVideos: s.ConcurrentVideos, ConcurrentVideos: s.concurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel, TakeOverExistingChannel: s.takeOverExistingChannel,
Refill: s.Refill, Refill: s.refill,
Manager: s, Manager: s,
LbrycrdString: s.LbrycrdString, LbrycrdString: s.lbrycrdString,
AwsS3ID: s.AwsS3ID, AwsS3ID: s.awsS3ID,
AwsS3Secret: s.AwsS3Secret, AwsS3Secret: s.awsS3Secret,
AwsS3Region: s.AwsS3Region, AwsS3Region: s.awsS3Region,
AwsS3Bucket: s.AwsS3Bucket, AwsS3Bucket: s.awsS3Bucket,
}) })
} }
} }
@ -199,12 +187,12 @@ func (s *SyncManager) Start() error {
if !shouldNotCount { if !shouldNotCount {
syncCount++ syncCount++
} }
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) { if sync.IsInterrupted() || (s.limit != 0 && syncCount >= s.limit) {
shouldInterruptLoop = true shouldInterruptLoop = true
break break
} }
} }
if shouldInterruptLoop || s.SingleRun { if shouldInterruptLoop || s.singleRun {
break break
} }
} }
@ -212,11 +200,11 @@ func (s *SyncManager) Start() error {
} }
func (s *SyncManager) checkUsedSpace() error { func (s *SyncManager) checkUsedSpace() error {
usedPctile, err := GetUsedSpace(s.BlobsDir) usedPctile, err := GetUsedSpace(s.blobsDir)
if err != nil { if err != nil {
return err return err
} }
if usedPctile >= 0.90 && !s.SkipSpaceCheck { if usedPctile >= 0.90 && !s.skipSpaceCheck {
return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100)) return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100))
} }
log.Infof("disk usage: %.1f%%", usedPctile*100) log.Infof("disk usage: %.1f%%", usedPctile*100)

View file

@ -24,6 +24,10 @@ func NewNamer() *Namer {
} }
} }
func (n *Namer) SetNames(names map[string]bool) {
n.names = names
}
func (n *Namer) GetNextName(prefix string) string { func (n *Namer) GetNextName(prefix string) string {
n.mu.Lock() n.mu.Lock()
defer n.mu.Unlock() defer n.mu.Unlock()

View file

@ -24,7 +24,7 @@ type APIConfig struct {
HostName string HostName string
} }
type ChannelProperties struct { type SyncProperties struct {
SyncFrom int64 SyncFrom int64
SyncUntil int64 SyncUntil int64
YoutubeChannelID string YoutubeChannelID string
@ -42,7 +42,7 @@ type YoutubeChannel struct {
} `json:"fee"` } `json:"fee"`
} }
func (a *APIConfig) FetchChannels(status string, cp *ChannelProperties) ([]YoutubeChannel, error) { func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeChannel, error) {
type apiJobsResponse struct { type apiJobsResponse struct {
Success bool `json:"success"` Success bool `json:"success"`
Error null.String `json:"error"` Error null.String `json:"error"`
@ -79,7 +79,7 @@ type SyncedVideo struct {
ClaimName string `json:"claim_name"` ClaimName string `json:"claim_name"`
} }
func (a *APIConfig) setChannelStatus(channelID string, status string, failureReason string) (map[string]SyncedVideo, map[string]bool, error) { func (a *APIConfig) SetChannelStatus(channelID string, status string, failureReason string) (map[string]SyncedVideo, map[string]bool, error) {
type apiChannelStatusResponse struct { type apiChannelStatusResponse struct {
Success bool `json:"success"` Success bool `json:"success"`
Error null.String `json:"error"` Error null.String `json:"error"`

View file

@ -50,8 +50,8 @@ func (s *Sync) walletSetup() error {
s.syncedVideosMux.RUnlock() s.syncedVideosMux.RUnlock()
log.Debugf("We already allocated credits for %d videos", numPublished) log.Debugf("We already allocated credits for %d videos", numPublished)
if numOnSource-numPublished > s.Manager.VideosLimit { if numOnSource-numPublished > s.Manager.videosLimit {
numOnSource = s.Manager.VideosLimit numOnSource = s.Manager.videosLimit
} }
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount

View file

@ -17,18 +17,20 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/namer"
"github.com/lbryio/lbry.go/ytsync/sdk"
"github.com/lbryio/lbry.go/ytsync/sources"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/namer"
"github.com/lbryio/lbry.go/ytsync/sources"
"github.com/mitchellh/go-ps" "github.com/mitchellh/go-ps"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport" "google.golang.org/api/googleapi/transport"
@ -59,7 +61,7 @@ func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[
// Sync stores the options that control how syncing happens // Sync stores the options that control how syncing happens
type Sync struct { type Sync struct {
YoutubeAPIKey string APIConfig *sdk.APIConfig
YoutubeChannelID string YoutubeChannelID string
LbryChannelName string LbryChannelName string
StopOnError bool StopOnError bool
@ -78,8 +80,7 @@ type Sync struct {
claimAddress string claimAddress string
videoDirectory string videoDirectory string
syncedVideosMux *sync.RWMutex syncedVideosMux *sync.RWMutex
syncedVideos map[string]syncedVideo syncedVideos map[string]sdk.SyncedVideo
claimNames map[string]bool
grp *stop.Group grp *stop.Group
lbryChannelID string lbryChannelID string
namer *namer.Namer namer *namer.Namer
@ -91,14 +92,11 @@ type Sync struct {
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) { func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
s.syncedVideosMux.Lock() s.syncedVideosMux.Lock()
defer s.syncedVideosMux.Unlock() defer s.syncedVideosMux.Unlock()
s.syncedVideos[videoID] = syncedVideo{ s.syncedVideos[videoID] = sdk.SyncedVideo{
VideoID: videoID, VideoID: videoID,
Published: published, Published: published,
FailureReason: failureReason, FailureReason: failureReason,
} }
if claimName != "" {
s.claimNames[claimName] = true
}
} }
// SendErrorToSlack Sends an error message to the default channel and to the process log. // SendErrorToSlack Sends an error message to the default channel and to the process log.
@ -223,13 +221,13 @@ func (s *Sync) uploadWallet() error {
} }
func (s *Sync) setStatusSyncing() error { func (s *Sync) setStatusSyncing() error {
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") syncedVideos, claimNames, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
if err != nil { if err != nil {
return err return err
} }
s.syncedVideosMux.Lock() s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos s.syncedVideos = syncedVideos
s.claimNames = claimNames s.Manager.namer.SetNames(claimNames)
s.syncedVideosMux.Unlock() s.syncedVideosMux.Unlock()
return nil return nil
} }
@ -313,14 +311,14 @@ func (s *Sync) setChannelTerminationStatus(e *error) {
return return
} }
failureReason := (*e).Error() failureReason := (*e).Error()
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason) _, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName) msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err) err = errors.Prefix(msg, err)
*e = errors.Prefix(err.Error(), *e) *e = errors.Prefix(err.Error(), *e)
} }
} else if !s.IsInterrupted() { } else if !s.IsInterrupted() {
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "") _, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSynced, "")
if err != nil { if err != nil {
*e = err *e = err
} }
@ -426,7 +424,7 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err
pv, ok := s.syncedVideos[videoID] pv, ok := s.syncedVideos[videoID]
if !ok || pv.ClaimName != c.Name { if !ok || pv.ClaimName != c.Name {
fixed++ fixed++
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil) err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
if err != nil { if err != nil {
return total, fixed, err return total, fixed, err
} }
@ -595,7 +593,7 @@ func (s *Sync) startWorker(workerNum int) {
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg) SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
} }
s.AppendSyncedVideo(v.ID(), false, err.Error(), "") s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size()) err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
if err != nil { if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
} }
@ -607,7 +605,7 @@ func (s *Sync) startWorker(workerNum int) {
func (s *Sync) enqueueYoutubeVideos() error { func (s *Sync) enqueueYoutubeVideos() error {
client := &http.Client{ client := &http.Client{
Transport: &transport.APIKey{Key: s.YoutubeAPIKey}, Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
} }
service, err := youtube.New(client) service, err := youtube.New(client)
@ -772,7 +770,7 @@ func (s *Sync) processVideo(v video) (err error) {
return nil return nil
} }
if v.PlaylistPosition() > s.Manager.VideosLimit { if v.PlaylistPosition() > s.Manager.videosLimit {
log.Println(v.ID() + " is old: skipping") log.Println(v.ID() + " is old: skipping")
return nil return nil
} }
@ -781,12 +779,12 @@ func (s *Sync) processVideo(v video) (err error) {
return err return err
} }
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.namer) summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.maxVideoSize, s.namer)
if err != nil { if err != nil {
return err return err
} }
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size()) err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
if err != nil { if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error()) SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
} }