rewrite account funding algorithm

lock publishing when UTXOs management is in progress
spend everything when funding UTXOs
update lbry.go
This commit is contained in:
Niko Storni 2019-06-12 03:17:59 +02:00
parent ae3dad944d
commit f4e75cf221
7 changed files with 60 additions and 54 deletions

2
go.mod
View file

@ -15,7 +15,7 @@ require (
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c
github.com/lbryio/lbry.go v1.0.10
github.com/lbryio/lbry.go v1.0.11
github.com/lusis/slack-test v0.0.0-20190408224659-6cf59653add2 // indirect
github.com/mitchellh/go-ps v0.0.0-20170309133038-4fdf99ab2936
github.com/mitchellh/mapstructure v1.1.2 // indirect

4
go.sum
View file

@ -129,8 +129,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c h1:BhdcWGsuKif/XoSZnqVGNqJ1iEmH0czWR5upj+AuR8M=
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c/go.mod h1:muH7wpUqE8hRA3OrYYosw9+Sl681BF9cwcjzE+OCNK8=
github.com/lbryio/lbry.go v1.0.10 h1:9TKfk0R9q7obMDefURpd0/Z4wzx//jtyeTKaHPeD6pU=
github.com/lbryio/lbry.go v1.0.10/go.mod h1:JtyI30bU51rm0LZ/po3mQuzf++14OWb6kR/6mMRAmKU=
github.com/lbryio/lbry.go v1.0.11 h1:dTaTNWF5wyWX9WQrgpIolfDEfGLl/ay2c2f2WOMayY4=
github.com/lbryio/lbry.go v1.0.11/go.mod h1:JtyI30bU51rm0LZ/po3mQuzf++14OWb6kR/6mMRAmKU=
github.com/lbryio/lbryschema.go v0.0.0-20190428231007-c54836bca002 h1:urfYK5ElpUrAv90auPLldoVC60LwiGAcY0OE6HJB9KI=
github.com/lbryio/lbryschema.go v0.0.0-20190428231007-c54836bca002/go.mod h1:dAzPCBj3CKKWBGYBZxK6tKBP5SCgY2tqd9SnQd/OyKo=
github.com/lbryio/ozzo-validation v0.0.0-20170323141101-d1008ad1fd04 h1:Nze+C2HbeKvhjI/kVn+9Poj/UuEW5sOQxcsxqO7L3GI=

View file

@ -1,7 +1,6 @@
package manager
import (
"fmt"
"math"
"net/http"
"os"
@ -22,8 +21,6 @@ import (
"google.golang.org/api/youtube/v3"
)
const minimumRefillAmount = 4
func (s *Sync) enableAddressReuse() error {
accountsResponse, err := s.daemon.AccountList()
if err != nil {
@ -44,7 +41,7 @@ func (s *Sync) enableAddressReuse() error {
return nil
}
func (s *Sync) walletSetup() error {
//prevent unnecessary concurrent execution
//prevent unnecessary concurrent execution and publishing while refilling/reallocating UTXOs
s.walletMux.Lock()
defer s.walletMux.Unlock()
err := s.ensureChannelOwnership()
@ -68,51 +65,51 @@ func (s *Sync) walletSetup() error {
if err != nil {
return err
}
numOnSource := int(n)
videosOnYoutube := int(n)
log.Debugf("Source channel has %d videos", numOnSource)
if numOnSource == 0 {
log.Debugf("Source channel has %d videos", videosOnYoutube)
if videosOnYoutube == 0 {
return nil
}
s.syncedVideosMux.RLock()
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
publishedCount := 0
notUpgradedCount := 0
failedCount := 0
for _, sv := range s.syncedVideos {
if sv.Published {
publishedCount++
if sv.MetadataVersion < 2 {
notUpgradedCount++
}
} else {
failedCount++
}
}
s.syncedVideosMux.RUnlock()
log.Debugf("We already allocated credits for %d videos", numPublished)
if numOnSource-numPublished > s.Manager.videosLimit {
numOnSource = s.Manager.videosLimit
log.Debugf("We already allocated credits for %d videos", publishedCount)
if videosOnYoutube > s.Manager.videosLimit {
videosOnYoutube = s.Manager.videosLimit
}
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
unallocatedVideos := videosOnYoutube - publishedCount
requiredBalance := float64(unallocatedVideos)*(publishAmount+estimatedMaxTxFee) + channelClaimAmount
if s.Manager.upgradeMetadata {
videosToUpgrade := 0
for _, v := range s.syncedVideos {
if v.Published && v.MetadataVersion < 2 {
videosToUpgrade++
requiredBalance += float64(notUpgradedCount) * 0.001
}
refillAmount := 0.0
if balance < requiredBalance || balance < minimumAccountBalance {
refillAmount = math.Max(requiredBalance-requiredBalance, minimumRefillAmount)
}
minBalance += float64(videosToUpgrade) * 0.001
}
if numPublished > numOnSource && balance < minimumRefillAmount {
SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource)
minBalance = minimumRefillAmount
}
amountToAdd := minBalance - balance
if s.Refill > 0 {
if amountToAdd < 0 {
amountToAdd = float64(s.Refill)
} else {
amountToAdd += float64(s.Refill)
}
refillAmount += float64(s.Refill)
}
if amountToAdd > 0 {
if amountToAdd < minimumRefillAmount {
amountToAdd = minimumRefillAmount
}
err := s.addCredits(amountToAdd)
if refillAmount > 0 {
err := s.addCredits(refillAmount)
if err != nil {
return errors.Err(err)
}
@ -187,13 +184,10 @@ func (s *Sync) ensureEnoughUTXOs() error {
if err != nil {
return errors.Err(err)
}
broadcastFee := 0.01
amountToSplit := fmt.Sprintf("%.6f", balanceAmount-broadcastFee)
desiredUTXOCount := uint64(math.Floor((balanceAmount - broadcastFee) / 0.1))
desiredUTXOCount := uint64(math.Floor((balanceAmount) / 0.1))
log.Infof("Splitting balance of %s evenly between %d UTXOs", *balance, desiredUTXOCount)
prefillTx, err := s.daemon.AccountFund(defaultAccount, defaultAccount, amountToSplit, desiredUTXOCount)
prefillTx, err := s.daemon.AccountFund(defaultAccount, defaultAccount, "0.0", desiredUTXOCount, true)
if err != nil {
return err
} else if prefillTx == nil {

View file

@ -38,6 +38,9 @@ import (
const (
channelClaimAmount = 0.01
estimatedMaxTxFee = 0.1
minimumAccountBalance = 4.0
minimumRefillAmount = 4
publishAmount = 0.01
maxReasonLength = 500
)
@ -48,7 +51,7 @@ type video interface {
IDAndNum() string
PlaylistPosition() int
PublishedAt() time.Time
Sync(*jsonrpc.Client, sources.SyncParams, *sdk.SyncedVideo, bool) (*sources.SyncSummary, error)
Sync(*jsonrpc.Client, sources.SyncParams, *sdk.SyncedVideo, bool, *sync.RWMutex) (*sources.SyncSummary, error)
}
// sorting videos
@ -83,7 +86,7 @@ type Sync struct {
grp *stop.Group
lbryChannelID string
namer *namer.Namer
walletMux *sync.Mutex
walletMux *sync.RWMutex
queue chan video
}
@ -254,7 +257,7 @@ func (s *Sync) FullCycle() (e error) {
s.setExceptions()
s.syncedVideosMux = &sync.RWMutex{}
s.walletMux = &sync.Mutex{}
s.walletMux = &sync.RWMutex{}
s.grp = stop.New()
s.queue = make(chan video)
interruptChan := make(chan os.Signal, 1)
@ -899,7 +902,7 @@ func (s *Sync) processVideo(v video) (err error) {
Fee: s.Fee,
}
summary, err := v.Sync(s.daemon, sp, &sv, videoRequiresUpgrade)
summary, err := v.Sync(s.daemon, sp, &sv, videoRequiresUpgrade, s.walletMux)
if err != nil {
return err
}

View file

@ -2,6 +2,7 @@ package sources
import (
"strings"
"sync"
"github.com/lbryio/lbry.go/extras/jsonrpc"
"github.com/lbryio/ytsync/namer"
@ -12,7 +13,9 @@ type SyncSummary struct {
ClaimName string
}
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.StreamCreateOptions, namer *namer.Namer) (*SyncSummary, error) {
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.StreamCreateOptions, namer *namer.Namer, walletLock *sync.RWMutex) (*SyncSummary, error) {
walletLock.RLock()
defer walletLock.RUnlock()
for {
name := namer.GetNextName(title)
response, err := daemon.StreamCreate(name, filename, amount, options)

View file

@ -10,6 +10,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/lbryio/lbry.go/extras/errors"
@ -46,6 +47,7 @@ type YoutubeVideo struct {
thumbnailURL string
lbryChannelID string
mocked bool
walletLock *sync.RWMutex
}
const reflectorURL = "http://blobs.lbry.io/"
@ -355,7 +357,7 @@ func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, params SyncParams) (*Sync
if err != nil {
return nil, err
}
return publishAndRetryExistingNames(daemon, v.title, downloadPath, params.Amount, options, params.Namer)
return publishAndRetryExistingNames(daemon, v.title, downloadPath, params.Amount, options, params.Namer, v.walletLock)
}
func (v *YoutubeVideo) Size() *int64 {
@ -372,7 +374,7 @@ type SyncParams struct {
Fee *sdk.Fee
}
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, params SyncParams, existingVideoData *sdk.SyncedVideo, reprocess bool) (*SyncSummary, error) {
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, params SyncParams, existingVideoData *sdk.SyncedVideo, reprocess bool, walletLock *sync.RWMutex) (*SyncSummary, error) {
v.maxVideoSize = int64(params.MaxVideoSize) * 1024 * 1024
v.maxVideoLength = params.MaxVideoLength
v.lbryChannelID = params.ChannelID
@ -499,6 +501,8 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis
Fee: fee,
}
v.walletLock.RLock()
defer v.walletLock.RUnlock()
if v.mocked {
pr, err := daemon.StreamUpdate(existingVideoData.ClaimID, jsonrpc.StreamUpdateOptions{
StreamCreateOptions: streamCreateOptions,

View file

@ -392,11 +392,13 @@ const (
SEIJIHITO = "UCNqUrLE6dI8fWw_u3HQkpXA"
anvithavlogs = "UCsP9pYat2DEBvnvF_iFGG_w"
YoelRekts = "UCZ_BcFyhIo6GdtTSrqXXepg"
TechFox = "UCIp-oTSdFO7BhAJpW2d5HMQ"
)
var channelWideTags = map[string][]string{
JustJuggling: {"juggling", "circus arts", "malabares"},
SwissExperiments: {"science & technology", "experiments", "switzerland"},
TechFox: {"technology", "reviews"},
misszizi: {"art", "pop culture"},
creationshub: {"art"},
AlicePandora: {"art"},