continued refactoring

This commit is contained in:
Niko Storni 2018-09-26 00:08:18 -04:00
parent 0c268039c1
commit 6175202405
No known key found for this signature in database
GPG key ID: F37FE63398800368
8 changed files with 174 additions and 142 deletions

View file

@ -2,6 +2,7 @@ package cmd
import (
sync "github.com/lbryio/lbry.go/ytsync"
"github.com/lbryio/lbry.go/ytsync/sdk"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -22,7 +23,9 @@ func ytcount(cmd *cobra.Command, args []string) {
channelID := args[1]
s := sync.Sync{
YoutubeAPIKey: ytAPIKey,
APIConfig: &sdk.APIConfig{
YoutubeAPIKey: ytAPIKey,
},
YoutubeChannelID: channelID,
}

View file

@ -9,6 +9,7 @@ import (
"github.com/lbryio/lbry.go/util"
sync "github.com/lbryio/lbry.go/ytsync"
"github.com/lbryio/lbry.go/ytsync/sdk"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
@ -141,7 +142,7 @@ func ytSync(cmd *cobra.Command, args []string) {
blobsDir = usr.HomeDir + "/.lbrynet/blobfiles/"
}
sm := sync.SyncManager{
/*sm := sync.SyncManager{
StopOnError: stopOnError,
MaxTries: maxTries,
TakeOverExistingChannel: takeOverExistingChannel,
@ -150,26 +151,64 @@ func ytSync(cmd *cobra.Command, args []string) {
SkipSpaceCheck: skipSpaceCheck,
SyncUpdate: syncUpdate,
SyncStatus: syncStatus,
SyncFrom: syncFrom,
SyncUntil: syncUntil,
ConcurrentJobs: concurrentJobs,
ConcurrentVideos: concurrentJobs,
HostName: hostname,
YoutubeChannelID: channelID,
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
BlobsDir: blobsDir,
VideosLimit: videosLimit,
MaxVideoSize: maxVideoSize,
LbrycrdString: lbrycrdString,
AwsS3ID: awsS3ID,
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
SingleRun: singleRun,
SyncProperties: &sdk.SyncProperties{
SyncFrom: syncFrom,
SyncUntil: syncUntil,
YoutubeChannelID: channelID,
},
APIConfig: &sdk.APIConfig{
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
HostName: hostname,
},
ConcurrentJobs: concurrentJobs,
ConcurrentVideos: concurrentJobs,
BlobsDir: blobsDir,
VideosLimit: videosLimit,
MaxVideoSize: maxVideoSize,
LbrycrdString: lbrycrdString,
AwsS3ID: awsS3ID,
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
SingleRun: singleRun,
}
*/
syncProperties := &sdk.SyncProperties{
SyncFrom: syncFrom,
SyncUntil: syncUntil,
YoutubeChannelID: channelID,
}
apiConfig := &sdk.APIConfig{
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
HostName: hostname,
}
sm := sync.NewSyncManager(
stopOnError,
maxTries,
takeOverExistingChannel,
refill,
limit,
skipSpaceCheck,
syncUpdate,
concurrentJobs,
concurrentJobs,
blobsDir,
videosLimit,
maxVideoSize,
lbrycrdString,
awsS3ID,
awsS3Secret,
awsS3Region,
awsS3Bucket,
syncStatus,
singleRun,
syncProperties,
apiConfig,
)
err := sm.Start()
if err != nil {
sync.SendErrorToSlack(err.Error())

View file

@ -11,7 +11,7 @@ import (
func (s *Sync) CountVideos() (uint64, error) {
client := &http.Client{
Transport: &transport.APIKey{Key: s.YoutubeAPIKey},
Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
}
service, err := youtube.New(client)

View file

@ -7,7 +7,6 @@ import (
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/null"
"github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/namer"
"github.com/lbryio/lbry.go/ytsync/sdk"
@ -15,33 +14,57 @@ import (
)
type SyncManager struct {
StopOnError bool
MaxTries int
TakeOverExistingChannel bool
Refill int
Limit int
SkipSpaceCheck bool
SyncUpdate bool
ConcurrentJobs int
ConcurrentVideos int
BlobsDir string
VideosLimit int
MaxVideoSize int
LbrycrdString string
AwsS3ID string
AwsS3Secret string
AwsS3Region string
SyncStatus string
AwsS3Bucket string
SingleRun bool
ChannelProperties *sdk.ChannelProperties
APIConfig *sdk.APIConfig
stopOnError bool
maxTries int
takeOverExistingChannel bool
refill int
limit int
skipSpaceCheck bool
syncUpdate bool
concurrentJobs int
concurrentVideos int
blobsDir string
videosLimit int
maxVideoSize int
lbrycrdString string
awsS3ID string
awsS3Secret string
awsS3Region string
syncStatus string
awsS3Bucket string
singleRun bool
syncProperties *sdk.SyncProperties
apiConfig *sdk.APIConfig
namer *namer.Namer
}
func NewSyncManager() *SyncManager {
func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int,
skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int,
maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string,
syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig) *SyncManager {
return &SyncManager{
namer: namer.NewNamer(),
stopOnError: stopOnError,
maxTries: maxTries,
takeOverExistingChannel: takeOverExistingChannel,
refill: refill,
limit: limit,
skipSpaceCheck: skipSpaceCheck,
syncUpdate: syncUpdate,
concurrentJobs: concurrentJobs,
concurrentVideos: concurrentVideos,
blobsDir: blobsDir,
videosLimit: videosLimit,
maxVideoSize: maxVideoSize,
lbrycrdString: lbrycrdString,
awsS3ID: awsS3ID,
awsS3Secret: awsS3Secret,
awsS3Region: awsS3Region,
awsS3Bucket: awsS3Bucket,
syncStatus: syncStatus,
singleRun: singleRun,
syncProperties: syncProperties,
apiConfig: apiConfig,
namer: namer.NewNamer(),
}
}
@ -56,47 +79,12 @@ const (
var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized}
type apiJobsResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []apiYoutubeChannel `json:"data"`
}
type apiYoutubeChannel struct {
ChannelId string `json:"channel_id"`
TotalVideos uint `json:"total_videos"`
DesiredChannelName string `json:"desired_channel_name"`
SyncServer null.String `json:"sync_server"`
Fee *struct {
Amount string `json:"amount"`
Address string `json:"address"`
Currency string `json:"currency"`
} `json:"fee"`
}
type apiChannelStatusResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []syncedVideo `json:"data"`
}
type syncedVideo struct {
VideoID string `json:"video_id"`
Published bool `json:"published"`
FailureReason string `json:"failure_reason"`
ClaimName string `json:"claim_name"`
}
const (
VideoStatusPublished = "published"
VideoStatusFailed = "failed"
)
func (s *SyncManager) Start() error {
if s.namer == nil {
// TODO: fix me, use NewSyncManager instead
s.namer = namer.NewNamer()
}
syncCount := 0
for {
@ -108,9 +96,9 @@ func (s *SyncManager) Start() error {
var syncs []Sync
shouldInterruptLoop := false
isSingleChannelSync := s.ChannelProperties.YoutubeChannelID != ""
isSingleChannelSync := s.syncProperties.YoutubeChannelID != ""
if isSingleChannelSync {
channels, err := s.APIConfig.FetchChannels("", s.ChannelProperties)
channels, err := s.apiConfig.FetchChannels("", s.syncProperties)
if err != nil {
return err
}
@ -120,53 +108,53 @@ func (s *SyncManager) Start() error {
lbryChannelName := channels[0].DesiredChannelName
syncs = make([]Sync, 1)
syncs[0] = Sync{
YoutubeAPIKey: s.YoutubeAPIKey,
YoutubeChannelID: s.YoutubeChannelID,
APIConfig: s.apiConfig,
YoutubeChannelID: s.syncProperties.YoutubeChannelID,
LbryChannelName: lbryChannelName,
StopOnError: s.StopOnError,
MaxTries: s.MaxTries,
ConcurrentVideos: s.ConcurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill,
StopOnError: s.stopOnError,
MaxTries: s.maxTries,
ConcurrentVideos: s.concurrentVideos,
TakeOverExistingChannel: s.takeOverExistingChannel,
Refill: s.refill,
Manager: s,
LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret,
AwsS3Region: s.AwsS3Region,
AwsS3Bucket: s.AwsS3Bucket,
LbrycrdString: s.lbrycrdString,
AwsS3ID: s.awsS3ID,
AwsS3Secret: s.awsS3Secret,
AwsS3Region: s.awsS3Region,
AwsS3Bucket: s.awsS3Bucket,
namer: s.namer,
}
shouldInterruptLoop = true
} else {
var queuesToSync []string
if s.SyncStatus != "" {
queuesToSync = append(queuesToSync, s.SyncStatus)
} else if s.SyncUpdate {
if s.syncStatus != "" {
queuesToSync = append(queuesToSync, s.syncStatus)
} else if s.syncUpdate {
queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced)
} else {
queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued)
}
for _, q := range queuesToSync {
channels, err := s.APIConfig.FetchChannels(q, s.ChannelProperties)
channels, err := s.apiConfig.FetchChannels(q, s.syncProperties)
if err != nil {
return err
}
for _, c := range channels {
syncs = append(syncs, Sync{
YoutubeAPIKey: s.YoutubeAPIKey,
APIConfig: s.apiConfig,
YoutubeChannelID: c.ChannelId,
LbryChannelName: c.DesiredChannelName,
StopOnError: s.StopOnError,
MaxTries: s.MaxTries,
ConcurrentVideos: s.ConcurrentVideos,
TakeOverExistingChannel: s.TakeOverExistingChannel,
Refill: s.Refill,
StopOnError: s.stopOnError,
MaxTries: s.maxTries,
ConcurrentVideos: s.concurrentVideos,
TakeOverExistingChannel: s.takeOverExistingChannel,
Refill: s.refill,
Manager: s,
LbrycrdString: s.LbrycrdString,
AwsS3ID: s.AwsS3ID,
AwsS3Secret: s.AwsS3Secret,
AwsS3Region: s.AwsS3Region,
AwsS3Bucket: s.AwsS3Bucket,
LbrycrdString: s.lbrycrdString,
AwsS3ID: s.awsS3ID,
AwsS3Secret: s.awsS3Secret,
AwsS3Region: s.awsS3Region,
AwsS3Bucket: s.awsS3Bucket,
})
}
}
@ -199,12 +187,12 @@ func (s *SyncManager) Start() error {
if !shouldNotCount {
syncCount++
}
if sync.IsInterrupted() || (s.Limit != 0 && syncCount >= s.Limit) {
if sync.IsInterrupted() || (s.limit != 0 && syncCount >= s.limit) {
shouldInterruptLoop = true
break
}
}
if shouldInterruptLoop || s.SingleRun {
if shouldInterruptLoop || s.singleRun {
break
}
}
@ -212,11 +200,11 @@ func (s *SyncManager) Start() error {
}
func (s *SyncManager) checkUsedSpace() error {
usedPctile, err := GetUsedSpace(s.BlobsDir)
usedPctile, err := GetUsedSpace(s.blobsDir)
if err != nil {
return err
}
if usedPctile >= 0.90 && !s.SkipSpaceCheck {
if usedPctile >= 0.90 && !s.skipSpaceCheck {
return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100))
}
log.Infof("disk usage: %.1f%%", usedPctile*100)

View file

@ -24,6 +24,10 @@ func NewNamer() *Namer {
}
}
func (n *Namer) SetNames(names map[string]bool) {
n.names = names
}
func (n *Namer) GetNextName(prefix string) string {
n.mu.Lock()
defer n.mu.Unlock()

View file

@ -24,7 +24,7 @@ type APIConfig struct {
HostName string
}
type ChannelProperties struct {
type SyncProperties struct {
SyncFrom int64
SyncUntil int64
YoutubeChannelID string
@ -42,7 +42,7 @@ type YoutubeChannel struct {
} `json:"fee"`
}
func (a *APIConfig) FetchChannels(status string, cp *ChannelProperties) ([]YoutubeChannel, error) {
func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeChannel, error) {
type apiJobsResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
@ -79,7 +79,7 @@ type SyncedVideo struct {
ClaimName string `json:"claim_name"`
}
func (a *APIConfig) setChannelStatus(channelID string, status string, failureReason string) (map[string]SyncedVideo, map[string]bool, error) {
func (a *APIConfig) SetChannelStatus(channelID string, status string, failureReason string) (map[string]SyncedVideo, map[string]bool, error) {
type apiChannelStatusResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`

View file

@ -50,8 +50,8 @@ func (s *Sync) walletSetup() error {
s.syncedVideosMux.RUnlock()
log.Debugf("We already allocated credits for %d videos", numPublished)
if numOnSource-numPublished > s.Manager.VideosLimit {
numOnSource = s.Manager.VideosLimit
if numOnSource-numPublished > s.Manager.videosLimit {
numOnSource = s.Manager.videosLimit
}
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount

View file

@ -17,18 +17,20 @@ import (
"syscall"
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/namer"
"github.com/lbryio/lbry.go/ytsync/sdk"
"github.com/lbryio/lbry.go/ytsync/sources"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/namer"
"github.com/lbryio/lbry.go/ytsync/sources"
"github.com/mitchellh/go-ps"
log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport"
@ -59,7 +61,7 @@ func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[
// Sync stores the options that control how syncing happens
type Sync struct {
YoutubeAPIKey string
APIConfig *sdk.APIConfig
YoutubeChannelID string
LbryChannelName string
StopOnError bool
@ -78,8 +80,7 @@ type Sync struct {
claimAddress string
videoDirectory string
syncedVideosMux *sync.RWMutex
syncedVideos map[string]syncedVideo
claimNames map[string]bool
syncedVideos map[string]sdk.SyncedVideo
grp *stop.Group
lbryChannelID string
namer *namer.Namer
@ -91,14 +92,11 @@ type Sync struct {
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
s.syncedVideosMux.Lock()
defer s.syncedVideosMux.Unlock()
s.syncedVideos[videoID] = syncedVideo{
s.syncedVideos[videoID] = sdk.SyncedVideo{
VideoID: videoID,
Published: published,
FailureReason: failureReason,
}
if claimName != "" {
s.claimNames[claimName] = true
}
}
// SendErrorToSlack Sends an error message to the default channel and to the process log.
@ -223,13 +221,13 @@ func (s *Sync) uploadWallet() error {
}
func (s *Sync) setStatusSyncing() error {
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
syncedVideos, claimNames, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
if err != nil {
return err
}
s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos
s.claimNames = claimNames
s.Manager.namer.SetNames(claimNames)
s.syncedVideosMux.Unlock()
return nil
}
@ -313,14 +311,14 @@ func (s *Sync) setChannelTerminationStatus(e *error) {
return
}
failureReason := (*e).Error()
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
_, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s.", s.LbryChannelName)
err = errors.Prefix(msg, err)
*e = errors.Prefix(err.Error(), *e)
}
} else if !s.IsInterrupted() {
_, _, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSynced, "")
_, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSynced, "")
if err != nil {
*e = err
}
@ -426,7 +424,7 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err
pv, ok := s.syncedVideos[videoID]
if !ok || pv.ClaimName != c.Name {
fixed++
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
if err != nil {
return total, fixed, err
}
@ -595,7 +593,7 @@ func (s *Sync) startWorker(workerNum int) {
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
}
s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
@ -607,7 +605,7 @@ func (s *Sync) startWorker(workerNum int) {
func (s *Sync) enqueueYoutubeVideos() error {
client := &http.Client{
Transport: &transport.APIKey{Key: s.YoutubeAPIKey},
Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
}
service, err := youtube.New(client)
@ -772,7 +770,7 @@ func (s *Sync) processVideo(v video) (err error) {
return nil
}
if v.PlaylistPosition() > s.Manager.VideosLimit {
if v.PlaylistPosition() > s.Manager.videosLimit {
log.Println(v.ID() + " is old: skipping")
return nil
}
@ -781,12 +779,12 @@ func (s *Sync) processVideo(v video) (err error) {
return err
}
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.MaxVideoSize, s.namer)
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.maxVideoSize, s.namer)
if err != nil {
return err
}
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}