automatically split balance into multiple addresses

This commit is contained in:
Alex Grintsvayg 2017-11-02 11:20:22 -04:00
parent ffa93be2c5
commit 44417d12d9
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
7 changed files with 255 additions and 24 deletions

View file

@ -3,10 +3,8 @@ BINARY=lbry
DIR = $(shell cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd)
VENDOR_DIR = vendor
VERSION=$(shell git --git-dir=${DIR}/.git describe --dirty --always)
COMMIT=$(shell git --git-dir=${DIR}/.git rev-parse --short HEAD)
BRANCH=$(shell git --git-dir=${DIR}/.git rev-parse --abbrev-ref HEAD)
LDFLAGS = -ldflags "-X main.VERSION=${VERSION} -X main.COMMIT=${COMMIT} -X main.BRANCH=${BRANCH}"
VERSION=$(shell git --git-dir=${DIR}/.git describe --dirty --always --long --abbrev=7)
LDFLAGS = -ldflags "-X main.Version=${VERSION}"
.PHONY: build dep clean

35
cmd/count.go Normal file
View file

@ -0,0 +1,35 @@
package cmd
import (
sync "github.com/lbryio/lbry.go/ytsync"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
func init() {
var ytCountCmd = &cobra.Command{
Use: "ytcount <youtube_api_key> <youtube_channel_id>",
Args: cobra.ExactArgs(2),
Short: "Count videos in a youtube channel",
Run: ytcount,
}
RootCmd.AddCommand(ytCountCmd)
}
func ytcount(cmd *cobra.Command, args []string) {
ytAPIKey := args[0]
channelID := args[1]
s := sync.Sync{
YoutubeAPIKey: ytAPIKey,
YoutubeChannelID: channelID,
}
count, err := s.CountVideos()
if err != nil {
panic(err)
}
log.Printf("%d videos in channel %s\n", count, channelID)
}

View file

@ -3,6 +3,7 @@ package cmd
import (
sync "github.com/lbryio/lbry.go/ytsync"
"github.com/go-errors/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
@ -54,6 +55,10 @@ func ytsync(cmd *cobra.Command, args []string) {
err := s.Go()
if err != nil {
panic(err)
if wrappedError, ok := err.(*errors.Error); ok {
log.Error(wrappedError.Error() + "\n" + string(wrappedError.Stack()))
} else {
panic(err)
}
}
}

View file

@ -2,14 +2,15 @@ package jsonrpc
import (
"encoding/json"
"errors"
"fmt"
"reflect"
"sort"
"strconv"
"strings"
"github.com/go-errors/errors"
"github.com/mitchellh/mapstructure"
"github.com/shopspring/decimal"
log "github.com/sirupsen/logrus"
"github.com/ybbus/jsonrpc"
)
@ -43,10 +44,34 @@ func decode(data interface{}, targetStruct interface{}) error {
decoder, err := mapstructure.NewDecoder(config)
if err != nil {
return err
return errors.Wrap(err, 0)
}
return decoder.Decode(data)
err = decoder.Decode(data)
if err != nil {
return errors.Wrap(err, 0)
}
return nil
}
func decodeNumber(data interface{}) (decimal.Decimal, error) {
var number string
switch d := data.(type) {
case json.Number:
number = d.String()
case string:
number = d
default:
return decimal.Decimal{}, errors.New("unexpected number type")
}
dec, err := decimal.NewFromString(number)
if err != nil {
return decimal.Decimal{}, errors.Wrap(err, 0)
}
return dec, nil
}
func debugParams(params map[string]interface{}) string {
@ -69,7 +94,7 @@ func (d *Client) callNoDecode(command string, params map[string]interface{}) (in
log.Debugln("jsonrpc: " + command + " " + debugParams(params))
r, err := d.conn.CallNamed(command, params)
if err != nil {
return nil, err
return nil, errors.Wrap(err, 0)
}
if r.Error != nil {
@ -98,8 +123,28 @@ func (d *Client) Status() (*StatusResponse, error) {
}
func (d *Client) WalletBalance() (*WalletBalanceResponse, error) {
response := new(WalletBalanceResponse)
return response, d.call(response, "wallet_balance", map[string]interface{}{})
rawResponse, err := d.callNoDecode("wallet_balance", map[string]interface{}{})
if err != nil {
return nil, err
}
dec, err := decodeNumber(rawResponse)
if err != nil {
return nil, err
}
response := WalletBalanceResponse(dec)
return &response, nil
}
func (d *Client) WalletList() (*WalletListResponse, error) {
response := new(WalletListResponse)
return response, d.call(response, "wallet_list", map[string]interface{}{})
}
func (d *Client) UTXOList() (*UTXOListResponse, error) {
response := new(UTXOListResponse)
return response, d.call(response, "utxo_list", map[string]interface{}{})
}
func (d *Client) Version() (*VersionResponse, error) {
@ -172,7 +217,7 @@ func (d *Client) PeerList(blobHash string, timeout *uint) (*PeerListResponse, er
portNum, err := port.Int64()
if err != nil {
return nil, err
return nil, errors.Wrap(err, 0)
} else if portNum < 0 {
return nil, errors.New("invalid port in peer_list response")
}
@ -207,11 +252,21 @@ func (d *Client) BlobGet(blobHash string, encoding *string, timeout *uint) (*Blo
}
func (d *Client) StreamCostEstimate(url string, size *uint64) (*StreamCostEstimateResponse, error) {
response := new(StreamCostEstimateResponse)
return response, d.call(response, "stream_cost_estimate", map[string]interface{}{
rawResponse, err := d.callNoDecode("stream_cost_estimate", map[string]interface{}{
"uri": url,
"size": size,
})
if err != nil {
return nil, err
}
dec, err := decodeNumber(rawResponse)
if err != nil {
return nil, err
}
response := StreamCostEstimateResponse(dec)
return &response, nil
}
type FileListOptions struct {
@ -257,11 +312,6 @@ func (d *Client) ChannelListMine() (*ChannelListMineResponse, error) {
return response, d.call(response, "channel_list_mine", map[string]interface{}{})
}
func (d *Client) WalletList() (*WalletListResponse, error) {
response := new(WalletListResponse)
return response, d.call(response, "wallet_list", map[string]interface{}{})
}
type PublishOptions struct {
Fee *Fee
Title *string
@ -310,3 +360,15 @@ func (d *Client) BlobAnnounce(blobHash, sdHash, streamHash *string) (*BlobAnnoun
"sd_hash": sdHash,
})
}
func (d *Client) WalletPrefillAddresses(numAddresses int, amount decimal.Decimal, broadcast bool) (*WalletPrefillAddressesResponse, error) {
if numAddresses < 1 {
return nil, errors.New("must create at least 1 address")
}
response := new(WalletPrefillAddressesResponse)
return response, d.call(response, "wallet_prefill_addresses", map[string]interface{}{
"num_addresses": numAddresses,
"amount": amount,
"no_broadcast": !broadcast,
})
}

View file

@ -95,7 +95,7 @@ func fixDecodeProto(src, dest reflect.Type, data interface{}) (interface{}, erro
if n, ok := data.(json.Number); ok {
val, err := n.Int64()
if err != nil {
return nil, err
return nil, errors.Wrap(err, 0)
} else if val < 0 {
return nil, errors.New("must be unsigned int")
}
@ -110,13 +110,13 @@ func fixDecodeProto(src, dest reflect.Type, data interface{}) (interface{}, erro
if n, ok := data.(json.Number); ok {
val, err := n.Float64()
if err != nil {
return nil, err
return nil, errors.Wrap(err, 0)
}
return decimal.NewFromFloat(val), nil
} else if s, ok := data.(string); ok {
d, err := decimal.NewFromString(s)
if err != nil {
return nil, err
return nil, errors.Wrap(err, 0)
}
return d, nil
}
@ -289,3 +289,21 @@ type PublishResponse struct {
}
type BlobAnnounceResponse bool
type WalletPrefillAddressesResponse struct {
Broadcast bool `json:"broadcast"`
Complete bool `json:"complete"`
Hex string `json:"hex"`
}
type UTXOListResponse []struct {
Address string `json:"address"`
Amount decimal.Decimal `json:"amount"`
Height int `json:"height"`
IsClaim bool `json:"is_claim"`
IsCoinbase bool `json:"is_coinbase"`
IsSupport bool `json:"is_support"`
IsUpdate bool `json:"is_update"`
Nout int `json:"nout"`
Txid string `json:"txid"`
}

View file

@ -6,6 +6,8 @@ import (
log "github.com/sirupsen/logrus"
)
var Version string
func main() {
log.SetLevel(log.DebugLevel)
cmd.Execute()

View file

@ -18,6 +18,7 @@ import (
"github.com/garyburd/redigo/redis"
"github.com/go-errors/errors"
ytdl "github.com/kkdai/youtube"
"github.com/shopspring/decimal"
log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport"
"google.golang.org/api/youtube/v3"
@ -63,7 +64,7 @@ func (s *Sync) init() error {
s.videoDirectory, err = ioutil.TempDir("", "ytsync")
if err != nil {
return err
return errors.Wrap(err, 0)
}
s.daemon = jsonrpc.NewClient("")
@ -79,6 +80,11 @@ func (s *Sync) init() error {
return errors.New("found blank claim address")
}
err = s.ensureEnoughUTXOs()
if err != nil {
return err
}
if s.LbryChannelName != "" {
err = s.ensureChannelOwnership()
if err != nil {
@ -89,6 +95,28 @@ func (s *Sync) init() error {
return nil
}
func (s *Sync) CountVideos() (uint64, error) {
client := &http.Client{
Transport: &transport.APIKey{Key: s.YoutubeAPIKey},
}
service, err := youtube.New(client)
if err != nil {
return 0, errors.WrapPrefix(err, "error creating YouTube service", 0)
}
response, err := service.Channels.List("statistics").Id(s.YoutubeChannelID).Do()
if err != nil {
return 0, errors.WrapPrefix(err, "error getting channels", 0)
}
if len(response.Items) < 1 {
return 0, errors.New("youtube channel not found")
}
return response.Items[0].Statistics.VideoCount, nil
}
func (s *Sync) Go() error {
var err error
@ -143,7 +171,11 @@ func (s *Sync) Go() error {
strings.Contains(err.Error(), " reason: 'This video contains content from") {
log.Println("This error should not be retried at all")
} else if tryCount >= s.MaxTries {
log.Println("Video failed after " + strconv.Itoa(s.MaxTries) + " retries, moving on")
log.Println("Video failed after " + strconv.Itoa(s.MaxTries) + " retries, exiting")
videoErrored.Store(true)
sendStopEnqueuing.Do(func() {
queueStopChan <- struct{}{}
})
} else {
log.Println("Retrying")
continue
@ -162,6 +194,82 @@ func (s *Sync) Go() error {
return err
}
func (s *Sync) ensureEnoughUTXOs() error {
utxolist, err := s.daemon.UTXOList()
if err != nil {
return err
} else if utxolist == nil {
return errors.New("no response")
}
target := 50
count := 0
for _, utxo := range *utxolist {
if !utxo.IsClaim && !utxo.IsSupport && !utxo.IsUpdate && utxo.Amount.Cmp(decimal.New(0, 0)) == 1 {
count++
}
}
if count < target {
newAddresses := target - count
balance, err := s.daemon.WalletBalance()
if err != nil {
return err
} else if balance == nil {
return errors.New("no response")
}
amountPerAddress := decimal.Decimal(*balance).Div(decimal.NewFromFloat(float64(target)))
log.Infof("Putting %s credits into each of %d new addresses", amountPerAddress.String(), newAddresses)
prefillTx, err := s.daemon.WalletPrefillAddresses(newAddresses, amountPerAddress, true)
if err != nil {
return err
} else if prefillTx == nil {
return errors.New("no response")
} else if !prefillTx.Complete || !prefillTx.Broadcast {
return errors.New("failed to prefill addresses")
}
wait := 15 * time.Second
log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new addresses")
time.Sleep(wait)
log.Println("Creating UTXOs and waiting for them to be confirmed")
s.waitUntilUTXOsConfirmed()
}
return nil
}
func (s *Sync) waitUntilUTXOsConfirmed() error {
for {
r, err := s.daemon.UTXOList()
if err != nil {
return err
} else if r == nil {
return errors.New("no response")
}
allConfirmed := true
for _, utxo := range *r {
if utxo.Height == 0 {
allConfirmed = false
break
}
}
if allConfirmed {
return nil
}
wait := 30 * time.Second
log.Println("Waiting " + wait.String() + "...")
time.Sleep(wait)
}
}
func (s *Sync) ensureChannelOwnership() error {
channels, err := s.daemon.ChannelListMine()
if err != nil {
@ -292,6 +400,9 @@ Enqueue:
func (s *Sync) processVideo(v video) error {
log.Println("Processing " + v.id + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)")
defer func(start time.Time) {
log.Println(v.id + " took " + time.Since(start).String())
}(time.Now())
conn := s.redisPool.Get()
defer conn.Close()