From a8a7473714795f4d4ac08833e0bf0b489d81ae50 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Fri, 3 Nov 2017 08:46:27 -0400 Subject: [PATCH] start-to-finish sync of a single channel --- cmd/ytsync.go | 2 +- jsonrpc/daemon.go | 30 ++++ jsonrpc/daemon_types.go | 4 + lbrycrd/client.go | 160 ++++++++++++++++++++ lbrycrd/client_types.go | 313 ++++++++++++++++++++++++++++++++++++++++ ytsync/ytsync.go | 136 +++++++++++++++-- 6 files changed, 634 insertions(+), 11 deletions(-) create mode 100644 lbrycrd/client.go create mode 100644 lbrycrd/client_types.go diff --git a/cmd/ytsync.go b/cmd/ytsync.go index 5a23630..b3500b6 100644 --- a/cmd/ytsync.go +++ b/cmd/ytsync.go @@ -53,7 +53,7 @@ func ytsync(cmd *cobra.Command, args []string) { ConcurrentVideos: 1, } - err := s.Go() + err := s.FullCycle() if err != nil { if wrappedError, ok := err.(*errors.Error); ok { log.Error(wrappedError.Error() + "\n" + string(wrappedError.Stack())) diff --git a/jsonrpc/daemon.go b/jsonrpc/daemon.go index 89cc9e6..04abf2a 100644 --- a/jsonrpc/daemon.go +++ b/jsonrpc/daemon.go @@ -372,3 +372,33 @@ func (d *Client) WalletPrefillAddresses(numAddresses int, amount decimal.Decimal "no_broadcast": !broadcast, }) } + +func (d *Client) WalletNewAddress() (*WalletNewAddressResponse, error) { + rawResponse, err := d.callNoDecode("wallet_new_address", map[string]interface{}{}) + if err != nil { + return nil, err + } + + address, ok := rawResponse.(string) + if !ok { + return nil, errors.New("unexpected response") + } + + response := WalletNewAddressResponse(address) + return &response, nil +} + +func (d *Client) WalletUnusedAddress() (*WalletUnusedAddressResponse, error) { + rawResponse, err := d.callNoDecode("wallet_unused_address", map[string]interface{}{}) + if err != nil { + return nil, err + } + + address, ok := rawResponse.(string) + if !ok { + return nil, errors.New("unexpected response") + } + + response := WalletUnusedAddressResponse(address) + return &response, nil +} diff --git a/jsonrpc/daemon_types.go b/jsonrpc/daemon_types.go index f113701..b29ba24 100644 --- a/jsonrpc/daemon_types.go +++ b/jsonrpc/daemon_types.go @@ -307,3 +307,7 @@ type UTXOListResponse []struct { Nout int `json:"nout"` Txid string `json:"txid"` } + +type WalletNewAddressResponse string + +type WalletUnusedAddressResponse string diff --git a/lbrycrd/client.go b/lbrycrd/client.go new file mode 100644 index 0000000..d6ef233 --- /dev/null +++ b/lbrycrd/client.go @@ -0,0 +1,160 @@ +package lbrycrd + +import ( + e "errors" + "net/url" + "os" + "strconv" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcrpcclient" + "github.com/btcsuite/btcutil" + "github.com/go-errors/errors" + "github.com/go-ini/ini" +) + +const DefaultPort = 9245 + +// MainNetParams define the lbrycrd network. See https://github.com/lbryio/lbrycrd/blob/master/src/chainparams.cpp +var MainNetParams = chaincfg.Params{ + PubKeyHashAddrID: 0x55, + ScriptHashAddrID: 0x7a, + PrivateKeyID: 0x1c, +} + +func init() { + // Register lbrycrd network + err := chaincfg.Register(&MainNetParams) + if err != nil { + panic("failed to register lbrycrd network: " + err.Error()) + } +} + +// Client connects to a lbrycrd instance +type Client struct { + *btcrpcclient.Client +} + +// New initializes a new Client +func New(lbrycrdURL string) (*Client, error) { + // Connect to local bitcoin core RPC server using HTTP POST mode. + + u, err := url.Parse(lbrycrdURL) + if err != nil { + return nil, errors.Wrap(err, 0) + } + + if u.User == nil { + return nil, errors.New("no userinfo") + } + + password, _ := u.User.Password() + + connCfg := &btcrpcclient.ConnConfig{ + Host: u.Host, + User: u.User.Username(), + Pass: password, + HTTPPostMode: true, // Bitcoin core only supports HTTP POST mode + DisableTLS: true, // Bitcoin core does not provide TLS by default + } + // Notice the notification parameter is nil since notifications are not supported in HTTP POST mode. + client, err := btcrpcclient.New(connCfg, nil) + if err != nil { + return nil, errors.Wrap(err, 0) + } + + return &Client{client}, nil +} + +func NewWithDefaultURL() (*Client, error) { + url, err := getLbrycrdURLFromConfFile() + if err != nil { + return nil, err + } + return New(url) +} + +var errInsufficientFunds = e.New("insufficient funds") + +// SimpleSend is a convenience function to send credits to an address (0 min confirmations) +func (c *Client) SimpleSend(toAddress string, amount float64) (*chainhash.Hash, error) { + decodedAddress, err := btcutil.DecodeAddress(toAddress, &MainNetParams) + if err != nil { + return nil, errors.Wrap(err, 0) + } + + lbcAmount, err := btcutil.NewAmount(amount) + if err != nil { + return nil, errors.Wrap(err, 0) + } + + hash, err := c.Client.SendFromMinConf("", decodedAddress, lbcAmount, 0) + if err != nil && err.Error() == "-6: Insufficient funds" { + err = errors.Wrap(errInsufficientFunds, 0) + } + return hash, errors.Wrap(err, 0) +} + +//func (c *Client) SendWithSplit(toAddress string, amount float64, numUTXOs int) (*chainhash.Hash, error) { +// decodedAddress, err := btcutil.DecodeAddress(toAddress, &MainNetParams) +// if err != nil { +// return nil, errors.Wrap(err, 0) +// } +// +// amountPerAddress, err := btcutil.NewAmount(amount / float64(numUTXOs)) +// if err != nil { +// return nil, errors.Wrap(err, 0) +// } +// +// amounts := map[btcutil.Address]btcutil.Amount{} +// for i := 0; i < numUTXOs; i++ { +// addr := decodedAddress // to give it a new address, so +// amounts[addr] = amountPerAddress +// } +// +// hash, err := c.Client.SendManyMinConf("", amounts, 0) +// if err != nil && err.Error() == "-6: Insufficient funds" { +// err = errors.Wrap(errInsufficientFunds, 0) +// } +// return hash, errors.Wrap(err, 0) +//} + +func getLbrycrdURLFromConfFile() (string, error) { + if os.Getenv("HOME") == "" { + return "", errors.New("no $HOME var found") + } + + defaultConfFile := os.Getenv("HOME") + "/.lbrycrd/lbrycrd.conf" + if _, err := os.Stat(defaultConfFile); os.IsNotExist(err) { + return "", errors.New("default lbrycrd conf file not found") + } + + cfg, err := ini.Load(defaultConfFile) + if err != nil { + return "", errors.Wrap(err, 0) + } + + section, err := cfg.GetSection("") + if err != nil { + return "", errors.Wrap(err, 0) + } + + username := section.Key("rpcuser").String() + password := section.Key("rpcpassword").String() + host := section.Key("rpchost").String() + if host == "" { + host = "localhost" + } + port := section.Key("rpcport").String() + if port == "" { + port = strconv.Itoa(DefaultPort) + } + + userpass := "" + if username != "" || password != "" { + userpass = username + ":" + password + "@" + } + + return "rpc://" + userpass + host + ":" + port, nil +} diff --git a/lbrycrd/client_types.go b/lbrycrd/client_types.go new file mode 100644 index 0000000..09fa887 --- /dev/null +++ b/lbrycrd/client_types.go @@ -0,0 +1,313 @@ +package lbrycrd + +import ( + "encoding/json" + "reflect" + + lbryschema "github.com/lbryio/lbryschema.go/pb" + + "github.com/go-errors/errors" + "github.com/shopspring/decimal" +) + +type Currency string + +const ( + CurrencyLBC = Currency("LBC") + CurrencyUSD = Currency("USD") + CurrencyBTC = Currency("BTC") +) + +type Fee struct { + Currency Currency `json:"currency"` + Amount decimal.Decimal `json:"amount"` + Address *string `json:"address"` +} + +type Support struct { + Amount decimal.Decimal `json:"amount"` + Nout int `json:"nout"` + Txid string `json:"txid"` +} + +type Claim struct { + Address string `json:"address"` + Amount decimal.Decimal `json:"amount"` + ClaimID string `json:"claim_id"` + ClaimSequence int `json:"claim_sequence"` + DecodedClaim bool `json:"decoded_claim"` + Depth int `json:"depth"` + EffectiveAmount decimal.Decimal `json:"effective_amount"` + Height int `json:"height"` + Hex string `json:"hex"` + Name string `json:"name"` + Nout int `json:"nout"` + Supports []Support `json:"supports"` + Txid string `json:"txid"` + ValidAtHeight int `json:"valid_at_height"` + Value lbryschema.Claim `json:"value"` + Error *string `json:"error,omitempty"` + ChannelName *string `json:"channel_name,omitempty"` + HasSignature *bool `json:"has_signature,omitempty"` + SignatureIsValid *bool `json:"signature_is_valid,omitempty"` +} + +type File struct { + ClaimID string `json:"claim_id"` + Completed bool `json:"completed"` + DownloadDirectory string `json:"download_directory"` + DownloadPath string `json:"download_path"` + FileName string `json:"file_name"` + Key string `json:"key"` + Message string `json:"message"` + Metadata *lbryschema.Claim `json:"metadata"` + MimeType string `json:"mime_type"` + Name string `json:"name"` + Outpoint string `json:"outpoint"` + PointsPaid decimal.Decimal `json:"points_paid"` + SdHash string `json:"sd_hash"` + Stopped bool `json:"stopped"` + StreamHash string `json:"stream_hash"` + StreamName string `json:"stream_name"` + SuggestedFileName string `json:"suggested_file_name"` + TotalBytes uint64 `json:"total_bytes"` + WrittenBytes uint64 `json:"written_bytes"` + ChannelName *string `json:"channel_name,omitempty"` + HasSignature *bool `json:"has_signature,omitempty"` + SignatureIsValid *bool `json:"signature_is_valid,omitempty"` +} + +func getEnumVal(enum map[string]int32, data interface{}) (int32, error) { + s, ok := data.(string) + if !ok { + return 0, errors.New("expected a string") + } + val, ok := enum[s] + if !ok { + return 0, errors.New("invalid enum key") + } + return val, nil +} + +func fixDecodeProto(src, dest reflect.Type, data interface{}) (interface{}, error) { + switch dest { + case reflect.TypeOf(uint64(0)): + if n, ok := data.(json.Number); ok { + val, err := n.Int64() + if err != nil { + return nil, errors.Wrap(err, 0) + } else if val < 0 { + return nil, errors.New("must be unsigned int") + } + return uint64(val), nil + } + case reflect.TypeOf([]byte{}): + if s, ok := data.(string); ok { + return []byte(s), nil + } + + case reflect.TypeOf(decimal.Decimal{}): + if n, ok := data.(json.Number); ok { + val, err := n.Float64() + if err != nil { + return nil, errors.Wrap(err, 0) + } + return decimal.NewFromFloat(val), nil + } else if s, ok := data.(string); ok { + d, err := decimal.NewFromString(s) + if err != nil { + return nil, errors.Wrap(err, 0) + } + return d, nil + } + + case reflect.TypeOf(lbryschema.Metadata_Version(0)): + val, err := getEnumVal(lbryschema.Metadata_Version_value, data) + return lbryschema.Metadata_Version(val), err + case reflect.TypeOf(lbryschema.Metadata_Language(0)): + val, err := getEnumVal(lbryschema.Metadata_Language_value, data) + return lbryschema.Metadata_Language(val), err + + case reflect.TypeOf(lbryschema.Stream_Version(0)): + val, err := getEnumVal(lbryschema.Stream_Version_value, data) + return lbryschema.Stream_Version(val), err + + case reflect.TypeOf(lbryschema.Claim_Version(0)): + val, err := getEnumVal(lbryschema.Claim_Version_value, data) + return lbryschema.Claim_Version(val), err + case reflect.TypeOf(lbryschema.Claim_ClaimType(0)): + val, err := getEnumVal(lbryschema.Claim_ClaimType_value, data) + return lbryschema.Claim_ClaimType(val), err + + case reflect.TypeOf(lbryschema.Fee_Version(0)): + val, err := getEnumVal(lbryschema.Fee_Version_value, data) + return lbryschema.Fee_Version(val), err + case reflect.TypeOf(lbryschema.Fee_Currency(0)): + val, err := getEnumVal(lbryschema.Fee_Currency_value, data) + return lbryschema.Fee_Currency(val), err + + case reflect.TypeOf(lbryschema.Source_Version(0)): + val, err := getEnumVal(lbryschema.Source_Version_value, data) + return lbryschema.Source_Version(val), err + case reflect.TypeOf(lbryschema.Source_SourceTypes(0)): + val, err := getEnumVal(lbryschema.Source_SourceTypes_value, data) + return lbryschema.Source_SourceTypes(val), err + + case reflect.TypeOf(lbryschema.KeyType(0)): + val, err := getEnumVal(lbryschema.KeyType_value, data) + return lbryschema.KeyType(val), err + + case reflect.TypeOf(lbryschema.Signature_Version(0)): + val, err := getEnumVal(lbryschema.Signature_Version_value, data) + return lbryschema.Signature_Version(val), err + + case reflect.TypeOf(lbryschema.Certificate_Version(0)): + val, err := getEnumVal(lbryschema.Certificate_Version_value, data) + return lbryschema.Certificate_Version(val), err + } + + return data, nil +} + +type CommandsResponse []string + +type WalletBalanceResponse decimal.Decimal + +type VersionResponse struct { + Build string `json:"build"` + LbrynetVersion string `json:"lbrynet_version"` + LbryschemaVersion string `json:"lbryschema_version"` + LbryumVersion string `json:"lbryum_version"` + OsRelease string `json:"os_release"` + OsSystem string `json:"os_system"` + Platform string `json:"platform"` + Processor string `json:"processor"` + PythonVersion string `json:"python_version"` +} +type StatusResponse struct { + BlockchainStatus struct { + BestBlockhash string `json:"best_blockhash"` + Blocks int `json:"blocks"` + BlocksBehind int `json:"blocks_behind"` + } `json:"blockchain_status"` + BlocksBehind int `json:"blocks_behind"` + ConnectionStatus struct { + Code string `json:"code"` + Message string `json:"message"` + } `json:"connection_status"` + InstallationID string `json:"installation_id"` + IsFirstRun bool `json:"is_first_run"` + IsRunning bool `json:"is_running"` + LbryID string `json:"lbry_id"` + StartupStatus struct { + Code string `json:"code"` + Message string `json:"message"` + } `json:"startup_status"` +} + +type ClaimListResponse struct { + Claims []Claim `json:"claims"` + LastTakeoverHeight int `json:"last_takeover_height"` + SupportsWithoutClaims []Support `json:"supports_without_claims"` +} + +type ClaimShowResponse Claim + +type PeerListResponsePeer struct { + IP string + Port uint + IsAvailable bool +} +type PeerListResponse []PeerListResponsePeer + +type BlobGetResponse struct { + Blobs []struct { + BlobHash string `json:"blob_hash,omitempty"` + BlobNum int `json:"blob_num"` + IV string `json:"iv"` + Length int `json:"length"` + } `json:"blobs"` + Key string `json:"key"` + StreamHash string `json:"stream_hash"` + StreamName string `json:"stream_name"` + StreamType string `json:"stream_type"` + SuggestedFileName string `json:"suggested_file_name"` +} + +type StreamCostEstimateResponse decimal.Decimal + +type GetResponse File +type FileListResponse []File + +type ResolveResponse map[string]ResolveResponseItem +type ResolveResponseItem struct { + Certificate *Claim `json:"certificate,omitempty"` + Claim *Claim `json:"claim,omitempty"` + ClaimsInChannel *uint64 `json:"claims_in_channel,omitempty"` + Error *string `json:"error,omitempty"` +} + +type ChannelNewResponse struct { + ClaimID string `json:"claim_id"` + Fee decimal.Decimal `json:"fee"` + Nout int `json:"nout"` + Success bool `json:"success"` + Tx string `json:"tx"` + Txid string `json:"txid"` +} + +type ChannelListMineResponse []struct { + Address string `json:"address"` + Amount decimal.Decimal `json:"amount"` + BlocksToExpiration int `json:"blocks_to_expiration"` + CanSign bool `json:"can_sign"` + Category string `json:"category"` + ClaimID string `json:"claim_id"` + Confirmations int `json:"confirmations"` + DecodedClaim bool `json:"decoded_claim"` + ExpirationHeight int `json:"expiration_height"` + Expired bool `json:"expired"` + HasSignature bool `json:"has_signature"` + Height int `json:"height"` + Hex string `json:"hex"` + IsPending bool `json:"is_pending"` + IsSpent bool `json:"is_spent"` + Name string `json:"name"` + Nout int `json:"nout"` + Txid string `json:"txid"` + Value *lbryschema.Claim `json:"value"` +} + +type WalletListResponse []string + +type PublishResponse struct { + ClaimID string `json:"claim_id"` + Fee decimal.Decimal `json:"fee"` + Nout int `json:"nout"` + Tx string `json:"tx"` + Txid string `json:"txid"` +} + +type BlobAnnounceResponse bool + +type WalletPrefillAddressesResponse struct { + Broadcast bool `json:"broadcast"` + Complete bool `json:"complete"` + Hex string `json:"hex"` +} + +type UTXOListResponse []struct { + Address string `json:"address"` + Amount decimal.Decimal `json:"amount"` + Height int `json:"height"` + IsClaim bool `json:"is_claim"` + IsCoinbase bool `json:"is_coinbase"` + IsSupport bool `json:"is_support"` + IsUpdate bool `json:"is_update"` + Nout int `json:"nout"` + Txid string `json:"txid"` +} + +type WalletNewAddressResponse string + +type WalletUnusedAddressResponse string diff --git a/ytsync/ytsync.go b/ytsync/ytsync.go index 94e0170..29b4e66 100644 --- a/ytsync/ytsync.go +++ b/ytsync/ytsync.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "net/http" "os" + "os/exec" "sort" "strconv" "strings" @@ -18,6 +19,7 @@ import ( "github.com/garyburd/redigo/redis" "github.com/go-errors/errors" ytdl "github.com/kkdai/youtube" + "github.com/lbryio/lbry.go/lbrycrd" "github.com/shopspring/decimal" log "github.com/sirupsen/logrus" "google.golang.org/api/googleapi/transport" @@ -46,6 +48,20 @@ type Sync struct { redisPool *redis.Pool } +func (s *Sync) initDaemon() { + if s.daemon == nil { + s.daemon = jsonrpc.NewClient("") + log.Infoln("Waiting for daemon to finish starting...") + for { + _, err := s.daemon.WalletBalance() + if err == nil { + break + } + time.Sleep(2 * time.Second) + } + } +} + func (s *Sync) init() error { var err error @@ -67,15 +83,15 @@ func (s *Sync) init() error { return errors.Wrap(err, 0) } - s.daemon = jsonrpc.NewClient("") + s.initDaemon() - addresses, err := s.daemon.WalletList() + address, err := s.daemon.WalletUnusedAddress() if err != nil { return err - } else if addresses == nil || len(*addresses) == 0 { - return errors.New("could not find an address in wallet") + } else if address == nil { + return errors.New("could not get unused address") } - s.claimAddress = (*addresses)[0] + s.claimAddress = string(*address) if s.claimAddress == "" { return errors.New("found blank claim address") } @@ -117,6 +133,81 @@ func (s *Sync) CountVideos() (uint64, error) { return response.Items[0].Statistics.VideoCount, nil } +func (s *Sync) FullCycle() error { + walletDir := os.Getenv("HOME") + "/wallets/" + strings.Replace(s.LbryChannelName, "@", "", 1) + if _, err := os.Stat(walletDir); !os.IsNotExist(err) { + return errors.New("channel has already been uploaded, and wallet is in " + walletDir) + } + + err := s.startDaemonViaSystemd() + if err != nil { + return err + } + + s.initDaemon() + + addressResp, err := s.daemon.WalletUnusedAddress() + if err != nil { + return err + } else if addressResp == nil { + return errors.New("no response") + } + address := string(*addressResp) + + count, err := s.CountVideos() + if err != nil { + return err + } + initialAmount := float64(count)*publishAmount + channelClaimAmount + 5 // +5 for fees and such + + log.Printf("Loading wallet with %f initial credits", initialAmount) + lbrycrdd, err := lbrycrd.NewWithDefaultURL() + if err != nil { + return err + } + lbrycrdd.SimpleSend(address, initialAmount) + //lbrycrdd.SendWithSplit(address, initialAmount, 50) + + wait := 15 * time.Second + log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new transaction") + time.Sleep(wait) + + log.Println("Waiting for transaction to be confirmed") + err = s.waitUntilUTXOsConfirmed() + if err != nil { + return err + } + + err = s.Go() + if err != nil { + return err + } + + // wait for reflection to finish??? + wait = 15 * time.Second // should bump this up to a few min, but keeping it low for testing + log.Println("Waiting " + wait.String() + " to finish reflecting everything") + time.Sleep(wait) + + log.Printf("Stopping daemon") + err = s.stopDaemonViaSystemd() + if err != nil { + return err + } + + // move wallet + if os.Getenv("HOME") == "" { + return errors.New("could not move lbryum dir - no $HOME var found") + } + + lbryumDir := os.Getenv("HOME") + "/.lbryum" + err = os.Rename(lbryumDir, walletDir) + if err != nil { + return errors.Wrap(err, 0) + } + + return nil +} + func (s *Sync) Go() error { var err error @@ -221,6 +312,8 @@ func (s *Sync) ensureEnoughUTXOs() error { return errors.New("no response") } + log.Println("balance is " + decimal.Decimal(*balance).String()) + amountPerAddress := decimal.Decimal(*balance).Div(decimal.NewFromFloat(float64(target))) log.Infof("Putting %s credits into each of %d new addresses", amountPerAddress.String(), newAddresses) prefillTx, err := s.daemon.WalletPrefillAddresses(newAddresses, amountPerAddress, true) @@ -237,7 +330,10 @@ func (s *Sync) ensureEnoughUTXOs() error { time.Sleep(wait) log.Println("Creating UTXOs and waiting for them to be confirmed") - s.waitUntilUTXOsConfirmed() + err = s.waitUntilUTXOsConfirmed() + if err != nil { + return err + } } return nil @@ -253,10 +349,14 @@ func (s *Sync) waitUntilUTXOsConfirmed() error { } allConfirmed := true - for _, utxo := range *r { - if utxo.Height == 0 { - allConfirmed = false - break + if len(*r) < 1 { + allConfirmed = false + } else { + for _, utxo := range *r { + if utxo.Height == 0 { + allConfirmed = false + break + } } } @@ -532,3 +632,19 @@ func (s *Sync) publish(v video, conn redis.Conn) error { return nil } + +func (s *Sync) startDaemonViaSystemd() error { + err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "start", "lbrynet.service").Run() + if err != nil { + return errors.New(err) + } + return nil +} + +func (s *Sync) stopDaemonViaSystemd() error { + err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "stop", "lbrynet.service").Run() + if err != nil { + return errors.New(err) + } + return nil +}