Compare commits

..

1 commit

Author SHA1 Message Date
Roy Lee
be69ea2803 [lbry] claimtrie: created node cache 2022-09-29 11:32:27 -07:00
13 changed files with 134 additions and 396 deletions

View file

@ -977,8 +977,8 @@ func NewImportMultiCmd(requests []ImportMultiRequest, options *ImportMultiOption
// RescanBlockchainCmd defines the RescanBlockchain JSON-RPC command.
type RescanBlockchainCmd struct {
StartHeight *int32 `jsonrpcdefault:"0"`
StopHeight *int32
StartHeight *int64 `jsonrpcdefault:"0"`
StopHeight *int64 `jsonrpcdefault:"0"`
}
// NewRescanBlockchainCmd returns a new instance which can be used to issue
@ -986,7 +986,7 @@ type RescanBlockchainCmd struct {
//
// The parameters which are pointers indicate they are optional. Passing nil
// for optional parameters will use the default value.
func NewRescanBlockchainCmd(startHeight *int32, stopHeight *int32) *RescanBlockchainCmd {
func NewRescanBlockchainCmd(startHeight *int64, stopHeight *int64) *RescanBlockchainCmd {
return &RescanBlockchainCmd{
StartHeight: startHeight,
StopHeight: stopHeight,

View file

@ -319,8 +319,8 @@ type ListUnspentResult struct {
// RescanBlockchainResult models the data returned from the rescanblockchain command.
type RescanBlockchainResult struct {
StartHeight int32 `json:"start_height"`
StoptHeight int32 `json:"stop_height"`
StartHeight int64 `json:"start_height"`
StoptHeight int64 `json:"stop_height"`
}
// SignRawTransactionError models the data that contains script verification

View file

@ -111,8 +111,6 @@ type config struct {
SigNet bool `long:"signet" description:"Connect to signet (default RPC server: localhost:49245)"`
Wallet bool `long:"wallet" description:"Connect to wallet RPC server instead (default: localhost:9244, testnet: localhost:19244, regtest: localhost:29244)"`
ShowVersion bool `short:"V" long:"version" description:"Display version information and exit"`
Timed bool `short:"t" long:"timed" description:"Display RPC response time"`
Quiet bool `short:"q" long:"quiet" description:"Do not output results to stdout"`
}
// normalizeAddress returns addr with the passed default port appended if

View file

@ -9,7 +9,6 @@ import (
"os"
"path/filepath"
"strings"
"time"
"github.com/lbryio/lbcd/btcjson"
)
@ -134,8 +133,6 @@ func main() {
os.Exit(1)
}
started := time.Now()
// Send the JSON-RPC request to the server using the user-specified
// connection configuration.
result, err := sendPostRequest(marshalledJSON, cfg)
@ -144,16 +141,6 @@ func main() {
os.Exit(1)
}
if cfg.Timed {
elapsed := time.Since(started)
defer fmt.Fprintf(os.Stderr, "%s\n", elapsed)
}
var output io.Writer = os.Stdout
if cfg.Quiet {
output = io.Discard
}
// Choose how to display the result based on its type.
strResult := string(result)
if strings.HasPrefix(strResult, "{") || strings.HasPrefix(strResult, "[") {
@ -163,7 +150,7 @@ func main() {
err)
os.Exit(1)
}
fmt.Fprintln(output, dst.String())
fmt.Println(dst.String())
} else if strings.HasPrefix(strResult, `"`) {
var str string
@ -172,9 +159,9 @@ func main() {
err)
os.Exit(1)
}
fmt.Fprintln(output, str)
fmt.Println(str)
} else if strResult != "null" {
fmt.Fprintln(output, strResult)
fmt.Println(strResult)
}
}

View file

@ -1,21 +1,15 @@
# lbcdbloknotify
# lbcd Websockets Example
This bridge program subscribes to lbcd's notifications over websockets using the rpcclient package.
Users can specify supported actions upon receiving this notifications.
This example shows how to use the rpcclient package to connect to a btcd RPC
server using TLS-secured websockets, register for block connected and block
disconnected notifications, and get the current block count.
## Building(or Running) the Program
## Running the Example
Clone the lbcd package:
The first step is to clone the lbcd package:
```bash
$ git clone github.com/lbryio/lbcd
$ cd lbcd/rpcclient/examples
# build the program
$ go build .
# or directly run it (build implicitly behind the scene)
$ go run .
```
Display available options:
@ -36,30 +30,19 @@ $ go run . -h
-stratumpass string
Stratum server password (default "password")
-quiet
Do not print periodic logs
Do not print logs
```
Running the program:
Start the program:
```bash
# Send stratum mining.update_block mesage upon receving block connected notifiations.
$ go run . -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD> --notls -stratum <STRATUM SERVER> -stratumpass <STRATUM PASSWD>
$ go run . -stratumpass <STRATUM PASSWD> -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD>
2022/01/10 23:16:21 Current block count: 1093112
2022/01/10 23:16:21 NotifyBlocks: Registration Complete
2022/01/10 23:16:21 Block count: 1093112
...
# Execute a custome command (with blockhash) upon receving block connected notifiations.
$ go run . -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD> --notls -run "echo %s"
```
## Notes
* Stratum TCP connection is persisted with auto-reconnect. (retry backoff increases from 1s to 60s maximum)
* Stratum update_block jobs on previous notifications are canceled when a new notification arrives.
Usually, the jobs are so short and completed immediately. However, if the Stratum connection is broken, this
prevents the bridge from accumulating stale jobs.
## License
This example is licensed under the [copyfree](http://copyfree.org) ISC License.

View file

@ -1,20 +0,0 @@
package main
import (
"github.com/lbryio/lbcd/wire"
"github.com/lbryio/lbcutil"
)
type eventBlockConected struct {
height int32
header *wire.BlockHeader
txns []*lbcutil.Tx
}
type adapter struct {
*bridge
}
func (a *adapter) onFilteredBlockConnected(height int32, header *wire.BlockHeader, txns []*lbcutil.Tx) {
a.eventCh <- &eventBlockConected{height, header, txns}
}

View file

@ -1,172 +0,0 @@
package main
import (
"context"
"errors"
"fmt"
"log"
"net"
"os"
"os/exec"
"strings"
"sync"
"syscall"
"time"
)
type bridge struct {
ctx context.Context
prevJobContext context.Context
prevJobCancel context.CancelFunc
eventCh chan interface{}
errorc chan error
wg sync.WaitGroup
stratum *stratumClient
customCmd string
}
func newBridge(stratumServer, stratumPass, coinid string) *bridge {
s := &bridge{
ctx: context.Background(),
eventCh: make(chan interface{}),
errorc: make(chan error),
}
if len(stratumServer) > 0 {
s.stratum = newStratumClient(stratumServer, stratumPass, coinid)
}
return s
}
func (b *bridge) start() {
if b.stratum != nil {
backoff := time.Second
for {
err := b.stratum.dial()
if err == nil {
break
}
log.Printf("WARN: stratum.dial() error: %s, retry in %s", err, backoff)
time.Sleep(backoff)
if backoff < 60*time.Second {
backoff += time.Second
}
}
}
for e := range b.eventCh {
switch e := e.(type) {
case *eventBlockConected:
b.handleFilteredBlockConnected(e)
default:
b.errorc <- fmt.Errorf("unknown event type: %T", e)
return
}
}
}
func (b *bridge) handleFilteredBlockConnected(e *eventBlockConected) {
if !*quiet {
log.Printf("Block connected: %s (%d) %v", e.header.BlockHash(), e.height, e.header.Timestamp)
}
hash := e.header.BlockHash().String()
height := e.height
// Cancel jobs on previous block. It's safe if they are already done.
if b.prevJobContext != nil {
select {
case <-b.prevJobContext.Done():
log.Printf("prev one canceled")
default:
b.prevJobCancel()
}
}
// Wait until all previous jobs are done or canceled.
b.wg.Wait()
// Create and save cancelable subcontext for new jobs.
ctx, cancel := context.WithCancel(b.ctx)
b.prevJobContext, b.prevJobCancel = ctx, cancel
if len(b.customCmd) > 0 {
go b.execCustomCommand(ctx, hash, height)
}
// Send stratum update block message
if b.stratum != nil {
go b.stratumUpdateBlock(ctx, hash, height)
}
}
func (s *bridge) stratumUpdateBlock(ctx context.Context, hash string, height int32) {
s.wg.Add(1)
defer s.wg.Done()
backoff := time.Second
retry := func(err error) {
if backoff < 60*time.Second {
backoff += time.Second
}
log.Printf("WARN: stratum.send() on block %d error: %s", height, err)
time.Sleep(backoff)
s.stratum.dial()
}
msg := stratumUpdateBlockMsg(*stratumPass, *coinid, hash)
for {
switch err := s.stratum.send(ctx, msg); {
case err == nil:
return
case errors.Is(err, context.Canceled):
log.Printf("INFO: stratum.send() on block %d: %s.", height, err)
return
case errors.Is(err, syscall.EPIPE):
errClose := s.stratum.conn.Close()
if errClose != nil {
log.Printf("WARN: stratum.conn.Close() on block %d: %s.", height, errClose)
}
retry(err)
case errors.Is(err, net.ErrClosed):
retry(err)
default:
retry(err)
}
}
}
func (s *bridge) execCustomCommand(ctx context.Context, hash string, height int32) {
s.wg.Add(1)
defer s.wg.Done()
cmd := strings.ReplaceAll(s.customCmd, "%s", hash)
err := doExecCustomCommand(ctx, cmd)
if err != nil {
log.Printf("ERROR: execCustomCommand on block %s(%d): %s", hash, height, err)
}
}
func doExecCustomCommand(ctx context.Context, cmd string) error {
strs := strings.Split(cmd, " ")
path, err := exec.LookPath(strs[0])
if errors.Is(err, exec.ErrDot) {
err = nil
}
if err != nil {
return err
}
c := exec.CommandContext(ctx, path, strs[1:]...)
c.Stdout = os.Stdout
return c.Run()
}

View file

@ -1,53 +0,0 @@
package main
import (
"io/ioutil"
"log"
"path/filepath"
"github.com/lbryio/lbcd/rpcclient"
)
func newLbcdClient(server, user, pass string, notls bool, adpt adapter) *rpcclient.Client {
ntfnHandlers := rpcclient.NotificationHandlers{
OnFilteredBlockConnected: adpt.onFilteredBlockConnected,
}
// Config lbcd RPC client with websockets.
connCfg := &rpcclient.ConnConfig{
Host: server,
Endpoint: "ws",
User: user,
Pass: pass,
DisableTLS: true,
}
if !notls {
cert, err := ioutil.ReadFile(filepath.Join(lbcdHomeDir, "rpc.cert"))
if err != nil {
log.Fatalf("can't read lbcd certificate: %s", err)
}
connCfg.Certificates = cert
connCfg.DisableTLS = false
}
client, err := rpcclient.New(connCfg, &ntfnHandlers)
if err != nil {
log.Fatalf("can't create rpc client: %s", err)
}
// Register for block connect and disconnect notifications.
if err = client.NotifyBlocks(); err != nil {
log.Fatalf("can't register block notification: %s", err)
}
// Get the current block count.
blockCount, err := client.GetBlockCount()
if err != nil {
log.Fatalf("can't get block count: %s", err)
}
log.Printf("Current block count: %d", blockCount)
return client
}

View file

@ -1,63 +1,136 @@
// Copyright (c) 2014-2017 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"errors"
"flag"
"fmt"
"io/ioutil"
"log"
"net"
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/lbryio/lbcd/rpcclient"
"github.com/lbryio/lbcd/wire"
"github.com/lbryio/lbcutil"
)
var (
lbcdHomeDir = lbcutil.AppDataDir("lbcd", false)
defaultCert = filepath.Join(lbcdHomeDir, "rpc.cert")
)
var (
coinid = flag.String("coinid", "1425", "Coin ID")
stratumServer = flag.String("stratum", "", "Stratum server")
stratumPass = flag.String("stratumpass", "", "Stratum server password")
rpcserver = flag.String("rpcserver", "localhost:9245", "LBCD RPC server")
rpcuser = flag.String("rpcuser", "rpcuser", "LBCD RPC username")
rpcpass = flag.String("rpcpass", "rpcpass", "LBCD RPC password")
rpccert = flag.String("rpccert", defaultCert, "LBCD RPC certificate")
notls = flag.Bool("notls", false, "Connect to LBCD with TLS disabled")
run = flag.String("run", "", "Run custom shell command")
quiet = flag.Bool("quiet", false, "Do not print logs")
coinid = flag.String("coinid", "1425", "Coin ID")
stratum = flag.String("stratum", "", "Stratum server")
stratumPass = flag.String("stratumpass", "", "Stratum server password")
rpcserver = flag.String("rpcserver", "localhost:9245", "LBCD RPC server")
rpcuser = flag.String("rpcuser", "rpcuser", "LBCD RPC username")
rpcpass = flag.String("rpcpass", "rpcpass", "LBCD RPC password")
notls = flag.Bool("notls", false, "Connect to LBCD with TLS disabled")
run = flag.String("run", "", "Run custom shell command")
quiet = flag.Bool("quiet", false, "Do not print logs")
)
func onFilteredBlockConnected(height int32, header *wire.BlockHeader, txns []*lbcutil.Tx) {
blockHash := header.BlockHash().String()
if !*quiet {
log.Printf("Block connected: %v (%d) %v", blockHash, height, header.Timestamp)
}
if cmd := *run; len(cmd) != 0 {
cmd = strings.ReplaceAll(cmd, "%s", blockHash)
err := execCustomCommand(cmd)
if err != nil {
log.Printf("ERROR: execCustomCommand: %s", err)
}
}
if len(*stratum) > 0 && len(*stratumPass) > 0 {
err := stratumUpdateBlock(*stratum, *stratumPass, *coinid, blockHash)
if err != nil {
log.Printf("ERROR: stratumUpdateBlock: %s", err)
}
}
}
func execCustomCommand(cmd string) error {
strs := strings.Split(cmd, " ")
path, err := exec.LookPath(strs[0])
if errors.Is(err, exec.ErrDot) {
err = nil
}
if err != nil {
return err
}
c := exec.Command(path, strs[1:]...)
c.Stdout = os.Stdout
return c.Run()
}
func stratumUpdateBlock(stratum, stratumPass, coinid, blockHash string) error {
addr, err := net.ResolveTCPAddr("tcp", stratum)
if err != nil {
return fmt.Errorf("can't resolve addr: %w", err)
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
return fmt.Errorf("can't dial tcp: %w", err)
}
defer conn.Close()
msg := fmt.Sprintf(`{"id":1,"method":"mining.update_block","params":[%q,%s,%q]}`,
stratumPass, coinid, blockHash)
_, err = conn.Write([]byte(msg))
if err != nil {
return fmt.Errorf("can't write message: %w", err)
}
return nil
}
func main() {
flag.Parse()
// Setup notification handler
b := newBridge(*stratumServer, *stratumPass, *coinid)
if len(*run) > 0 {
// Check if ccommand exists.
strs := strings.Split(*run, " ")
cmd := strs[0]
_, err := exec.LookPath(cmd)
if err != nil {
log.Fatalf("ERROR: %s not found: %s", cmd, err)
}
b.customCmd = *run
ntfnHandlers := rpcclient.NotificationHandlers{
OnFilteredBlockConnected: onFilteredBlockConnected,
}
// Start the eventt handler.
go b.start()
// Connect to local lbcd RPC server using websockets.
lbcdHomeDir := lbcutil.AppDataDir("lbcd", false)
certs, err := ioutil.ReadFile(filepath.Join(lbcdHomeDir, "rpc.cert"))
if err != nil {
log.Fatalf("can't read lbcd certificate: %s", err)
}
connCfg := &rpcclient.ConnConfig{
Host: *rpcserver,
Endpoint: "ws",
User: *rpcuser,
Pass: *rpcpass,
Certificates: certs,
DisableTLS: *notls,
}
client, err := rpcclient.New(connCfg, &ntfnHandlers)
if err != nil {
log.Fatalf("can't create rpc client: %s", err)
}
// Adaptater receives lbcd notifications, and emit events.
adpt := adapter{b}
// Register for block connect and disconnect notifications.
if err = client.NotifyBlocks(); err != nil {
log.Fatalf("can't register block notification: %s", err)
}
log.Printf("NotifyBlocks: Registration Complete")
client := newLbcdClient(*rpcserver, *rpcuser, *rpcpass, *notls, adpt)
go func() {
err := <-b.errorc
log.Fatalf("ERROR: %s", err)
client.Shutdown()
}()
// Get the current block count.
blockCount, err := client.GetBlockCount()
if err != nil {
log.Fatalf("can't get block count: %s", err)
}
log.Printf("Block count: %d", blockCount)
// Wait until the client either shuts down gracefully (or the user
// terminates the process with Ctrl+C).

View file

@ -1,56 +0,0 @@
package main
import (
"context"
"fmt"
"net"
)
type stratumClient struct {
server string
passwd string
coinid string
conn *net.TCPConn
}
func newStratumClient(server, passwd, coinid string) *stratumClient {
return &stratumClient{
server: server,
}
}
func (c *stratumClient) dial() error {
addr, err := net.ResolveTCPAddr("tcp", c.server)
if err != nil {
return fmt.Errorf("resolve tcp addr: %w", err)
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
return fmt.Errorf("dial tcp: %w", err)
}
c.conn = conn
return nil
}
func (c *stratumClient) send(ctx context.Context, msg string) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
_, err := c.conn.Write([]byte(msg))
return err
}
func stratumUpdateBlockMsg(stratumPass, coinid, blockHash string) string {
return fmt.Sprintf(`{"id":1,"method":"mining.update_block","params":[%q,%s,%q]}`,
stratumPass, coinid, blockHash)
}

View file

@ -774,8 +774,7 @@ func (c *Client) handleSendPostMessage(jReq *jsonRequest) {
tries := 10
for i := 0; tries == 0 || i < tries; i++ {
bodyReader := bytes.NewReader(jReq.marshalledJSON)
var httpReq *http.Request
httpReq, err = http.NewRequest("POST", url, bodyReader)
httpReq, err := http.NewRequest("POST", url, bodyReader)
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
@ -787,8 +786,7 @@ func (c *Client) handleSendPostMessage(jReq *jsonRequest) {
}
// Configure basic access authorization.
var user, pass string
user, pass, err = c.config.getAuth()
user, pass, err := c.config.getAuth()
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return

View file

@ -2062,14 +2062,14 @@ func (r FutureRescanBlockchainResult) Receive() (*btcjson.RescanBlockchainResult
// returned instance.
//
// See RescanBlockchain for the blocking version and more details.
func (c *Client) RescanBlockchainAsync(startHeight *int32, stopHeight *int32) FutureRescanBlockchainResult {
func (c *Client) RescanBlockchainAsync(startHeight *int64, stopHeight *int64) FutureRescanBlockchainResult {
cmd := btcjson.NewRescanBlockchainCmd(startHeight, stopHeight)
return c.SendCmd(cmd)
}
// RescanBlockchain rescans the local blockchain for wallet related
// transactions from the startHeight to the the inclusive stopHeight.
func (c *Client) RescanBlockchain(startHeight *int32, stopHeight *int32) (*btcjson.RescanBlockchainResult, error) {
func (c *Client) RescanBlockchain(startHeight *int64, stopHeight *int64) (*btcjson.RescanBlockchainResult, error) {
return c.RescanBlockchainAsync(startHeight, stopHeight).Receive()
}

View file

@ -206,7 +206,7 @@ func StripClaimScriptPrefix(script []byte) []byte {
return script[cs.Size:]
}
const illegalChars = "=&#:$%?/;\\\b\n\t\r\x00"
const illegalChars = "=&#:*$%?/;\\\b\n\t\r\x00"
func AllClaimsAreSane(script []byte, enforceSoftFork bool) error {
cs, err := ExtractClaimScript(script)