move params to env vars

consider fees of 0.1 while calculating setup funds
add change address to speed up publishing
rebase
This commit is contained in:
Niko Storni 2018-05-25 20:43:16 -04:00
parent 0df643a550
commit 6fb792f80c
No known key found for this signature in database
GPG key ID: F37FE63398800368
5 changed files with 65 additions and 34 deletions

View file

@ -10,6 +10,10 @@ import (
"os/user"
"time"
"strconv"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/null"
"github.com/lbryio/lbry.go/util"
@ -18,6 +22,9 @@ import (
"github.com/spf13/cobra"
)
var APIURL string
var APIToken string
func init() {
var selfSyncCmd = &cobra.Command{
Use: "selfsync <youtube_api_key> <auth_token>",
@ -34,6 +41,16 @@ func init() {
selfSyncCmd.Flags().StringVar(&syncStatus, "status", StatusQueued, "Specify which queue to pull from. Overrides --update (Default: queued)")
RootCmd.AddCommand(selfSyncCmd)
APIURL = os.Getenv("LBRY_API")
APIToken = os.Getenv("LBRY_API_TOKEN")
if APIURL == "" {
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_API")
return
}
if APIToken == "" {
log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
return
}
}
type APIJobsResponse struct {
@ -49,11 +66,12 @@ type APIYoutubeChannel struct {
SyncServer null.String `json:"sync_server"`
}
func fetchChannels(authToken string, status string) ([]APIYoutubeChannel, error) {
url := "http://localhost:8080/yt/jobs"
func fetchChannels(status string) ([]APIYoutubeChannel, error) {
url := APIURL + "/yt/jobs"
res, _ := http.PostForm(url, url2.Values{
"auth_token": {authToken},
"auth_token": {APIToken},
"sync_status": {status},
"after": {strconv.Itoa(int(time.Now().AddDate(0, 0, -1).Unix()))},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
@ -62,6 +80,9 @@ func fetchChannels(authToken string, status string) ([]APIYoutubeChannel, error)
if err != nil {
return nil, err
}
if response.Data == nil {
return nil, errors.Err(response.Error)
}
return response.Data, nil
}
@ -71,17 +92,17 @@ type APISyncUpdateResponse struct {
Data null.String `json:"data"`
}
func setChannelSyncStatus(authToken string, channelID string, status string) error {
func setChannelSyncStatus(channelID string, status string) error {
host, err := os.Hostname()
if err != nil {
return errors.Err("could not detect system hostname")
}
url := "http://localhost:8080/yt/sync_update"
url := APIURL + "/yt/sync_update"
res, _ := http.PostForm(url, url2.Values{
"channel_id": {channelID},
"sync_server": {host},
"auth_token": {authToken},
"auth_token": {APIToken},
"sync_status": {status},
})
defer res.Body.Close()
@ -123,17 +144,19 @@ func selfSync(cmd *cobra.Command, args []string) {
} else {
util.InitSlack(os.Getenv("SLACK_TOKEN"))
}
err := spaceCheck()
if err != nil {
util.SendToSlackError(err.Error())
return
}
ytAPIKey := args[0]
authToken := args[1]
//authToken := args[1]
if !util.InSlice(syncStatus, SyncStatuses) {
log.Errorf("status must be one of the following: %v\n", SyncStatuses)
return
}
if syncUpdate {
syncStatus = StatusSynced
}
if stopOnError && maxTries != defaultMaxTries {
log.Errorln("--stop-on-error and --max-tries are mutually exclusive")
return
@ -161,19 +184,20 @@ func selfSync(cmd *cobra.Command, args []string) {
queuesToSync = append(queuesToSync, StatusSynced)
}
for _, v := range queuesToSync {
interruptedByUser, err := processQueue(v, authToken, ytAPIKey, &syncCount)
interruptedByUser, err := processQueue(v, ytAPIKey, &syncCount)
if err != nil {
util.SendToSlackError(err.Error())
break mainLoop
}
if interruptedByUser {
util.SendToSlackInfo("interrupted by user!")
break mainLoop
}
}
}
} else {
// sync whatever was specified
_, err := processQueue(syncStatus, authToken, ytAPIKey, &syncCount)
_, err := processQueue(syncStatus, ytAPIKey, &syncCount)
if err != nil {
util.SendToSlackError(err.Error())
return
@ -182,18 +206,19 @@ func selfSync(cmd *cobra.Command, args []string) {
util.SendToSlackInfo("Syncing process terminated!")
}
func processQueue(queueStatus string, authToken string, ytAPIKey string, syncCount *int) (bool, error) {
syncingChannels, err := fetchChannels(authToken, queueStatus)
func processQueue(queueStatus string, ytAPIKey string, syncCount *int) (bool, error) {
util.SendToSlackInfo("Syncing %s channels", queueStatus)
channelsToSync, err := fetchChannels(queueStatus)
if err != nil {
return false, errors.Prefix("failed to fetch channels", err)
}
util.SendToSlackInfo("Finished syncing %s channels", queueStatus)
return syncChannels(syncingChannels, authToken, ytAPIKey, syncCount)
return syncChannels(channelsToSync, ytAPIKey, syncCount)
}
// syncChannels processes a slice of youtube channels (channelsToSync) and returns a bool that indicates whether
// the execution finished by itself or was interrupted by the user and an error if anything happened
func syncChannels(channelsToSync []APIYoutubeChannel, authToken string, ytAPIKey string, syncCount *int) (bool, error) {
func syncChannels(channelsToSync []APIYoutubeChannel, ytAPIKey string, syncCount *int) (bool, error) {
host, err := os.Hostname()
if err != nil {
host = ""
@ -217,7 +242,7 @@ func syncChannels(channelsToSync []APIYoutubeChannel, authToken string, ytAPIKey
}
//acquire the lock on the channel
err := setChannelSyncStatus(authToken, channelID, StatusSyncing)
err := setChannelSyncStatus(channelID, StatusSyncing)
if err != nil {
util.SendToSlackError("Failed acquiring sync rights for channel %s: %v", lbryChannelName, err)
continue
@ -247,7 +272,7 @@ func syncChannels(channelsToSync []APIYoutubeChannel, authToken string, ytAPIKey
return s.IsInterrupted(), errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err)
}
//mark video as failed
err := setChannelSyncStatus(authToken, channelID, StatusFailed)
err := setChannelSyncStatus(channelID, StatusFailed)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s. \n@Nikooo777 this requires manual intervention! Exiting...", lbryChannelName)
return s.IsInterrupted(), errors.Prefix(msg, err)
@ -258,7 +283,7 @@ func syncChannels(channelsToSync []APIYoutubeChannel, authToken string, ytAPIKey
return true, nil
}
//mark video as synced
err = setChannelSyncStatus(authToken, channelID, StatusSynced)
err = setChannelSyncStatus(channelID, StatusSynced)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s. \n@Nikooo777 this requires manual intervention! Exiting...", lbryChannelName)
return false, errors.Prefix(msg, err)

View file

@ -5,8 +5,14 @@ import (
"testing"
)
/*
func TestMain(m *testing.M) {
APIURL = os.Getenv("LBRY_API")
APIToken = os.Getenv("LBRY_API_TOKEN")
}
*/
func TestFetchChannels(t *testing.T) {
res, err := fetchChannels("620280", StatusSynced)
res, err := fetchChannels(StatusQueued)
if err != nil {
t.Error(err)
}
@ -20,11 +26,11 @@ func TestFetchChannels(t *testing.T) {
// such field should be reset to null if the test must be run on a different machine (different hostname)
// and obviously the auth token must be appropriate
func TestSetChannelSyncStatus(t *testing.T) {
err := setChannelSyncStatus("620280", "UCNQfQvFMPnInwsU_iGYArJQ", StatusSyncing)
err := setChannelSyncStatus("UCNQfQvFMPnInwsU_iGYArJQ", StatusSyncing)
if err != nil {
t.Error(err)
}
err = setChannelSyncStatus("620280", "UCNQfQvFMPnInwsU_iGYArJQ", StatusQueued)
err = setChannelSyncStatus("UCNQfQvFMPnInwsU_iGYArJQ", StatusQueued)
if err != nil {
t.Error(err)
}

View file

@ -45,7 +45,7 @@ func (s *Sync) walletSetup() error {
}
log.Debugf("We already published %d videos", numPublished)
minBalance := (float64(numOnSource)-float64(numPublished))*publishAmount + channelClaimAmount
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
if numPublished > numOnSource {
util.SendToSlackError("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource)
minBalance = 1 //since we ended up in this function it means some juice is still needed

View file

@ -179,13 +179,14 @@ func strPtr(s string) *string { return &s }
func (v YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelName string) error {
options := jsonrpc.PublishOptions{
Title: &v.title,
Author: &v.channelTitle,
Description: strPtr(v.getAbbrevDescription() + "\nhttps://www.youtube.com/watch?v=" + v.id),
Language: strPtr("en"),
ClaimAddress: &claimAddress,
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
License: strPtr("Copyrighted (contact author)"),
Title: &v.title,
Author: &v.channelTitle,
Description: strPtr(v.getAbbrevDescription() + "\nhttps://www.youtube.com/watch?v=" + v.id),
Language: strPtr("en"),
ClaimAddress: &claimAddress,
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
License: strPtr("Copyrighted (contact author)"),
ChangeAddress: &claimAddress,
}
if channelName != "" {
options.ChannelName = &channelName

View file

@ -20,10 +20,9 @@ import (
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/redisdb"
"github.com/lbryio/lbry.go/ytsync/sources"
"github.com/lbryio/lbry.go/util"
"github.com/mitchellh/go-ps"
log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport"
@ -75,7 +74,7 @@ type Sync struct {
// IsInterrupted can be queried to discover if the sync process was interrupted manually
func (s *Sync) IsInterrupted() bool {
select {
case <-s.stop.Chan():
case <-s.stop.Ch():
return true
default:
return false