diff --git a/rpcclient/examples/lbcdblocknotify/README.md b/rpcclient/examples/lbcdblocknotify/README.md index 55fc19b0..ea1fc777 100644 --- a/rpcclient/examples/lbcdblocknotify/README.md +++ b/rpcclient/examples/lbcdblocknotify/README.md @@ -1,15 +1,21 @@ -# lbcd Websockets Example +# lbcdbloknotify -This example shows how to use the rpcclient package to connect to a btcd RPC -server using TLS-secured websockets, register for block connected and block -disconnected notifications, and get the current block count. +This bridge program subscribes to lbcd's notifications over websockets using the rpcclient package. +Users can specify supported actions upon receiving this notifications. -## Running the Example +## Building(or Running) the Program -The first step is to clone the lbcd package: +Clone the lbcd package: ```bash $ git clone github.com/lbryio/lbcd +$ cd lbcd/rpcclient/examples + +# build the program +$ go build . + +# or directly run it (build implicitly behind the scene) +$ go run . ``` Display available options: @@ -30,19 +36,30 @@ $ go run . -h -stratumpass string Stratum server password (default "password") -quiet - Do not print logs + Do not print periodic logs ``` -Start the program: +Running the program: ```bash -$ go run . -stratumpass -rpcuser -rpcpass +# Send stratum mining.update_block mesage upon receving block connected notifiations. +$ go run . -rpcuser -rpcpass --notls -stratum -stratumpass -2022/01/10 23:16:21 NotifyBlocks: Registration Complete -2022/01/10 23:16:21 Block count: 1093112 +2022/01/10 23:16:21 Current block count: 1093112 ... + +# Execute a custome command (with blockhash) upon receving block connected notifiations. +$ go run . -rpcuser -rpcpass --notls -run "echo %s" ``` +## Notes + +* Stratum TCP connection is persisted with auto-reconnect. (retry backoff increases from 1s to 60s maximum) + +* Stratum update_block jobs on previous notifications are canceled when a new notification arrives. + Usually, the jobs are so short and completed immediately. However, if the Stratum connection is broken, this + prevents the bridge from accumulating stale jobs. + ## License This example is licensed under the [copyfree](http://copyfree.org) ISC License. diff --git a/rpcclient/examples/lbcdblocknotify/adapter.go b/rpcclient/examples/lbcdblocknotify/adapter.go new file mode 100644 index 00000000..b12b0d90 --- /dev/null +++ b/rpcclient/examples/lbcdblocknotify/adapter.go @@ -0,0 +1,20 @@ +package main + +import ( + "github.com/lbryio/lbcd/wire" + "github.com/lbryio/lbcutil" +) + +type eventBlockConected struct { + height int32 + header *wire.BlockHeader + txns []*lbcutil.Tx +} + +type adapter struct { + *bridge +} + +func (a *adapter) onFilteredBlockConnected(height int32, header *wire.BlockHeader, txns []*lbcutil.Tx) { + a.eventCh <- &eventBlockConected{height, header, txns} +} diff --git a/rpcclient/examples/lbcdblocknotify/bridge.go b/rpcclient/examples/lbcdblocknotify/bridge.go new file mode 100644 index 00000000..db5e8cc0 --- /dev/null +++ b/rpcclient/examples/lbcdblocknotify/bridge.go @@ -0,0 +1,172 @@ +package main + +import ( + "context" + "errors" + "fmt" + "log" + "net" + "os" + "os/exec" + "strings" + "sync" + "syscall" + "time" +) + +type bridge struct { + ctx context.Context + + prevJobContext context.Context + prevJobCancel context.CancelFunc + + eventCh chan interface{} + errorc chan error + wg sync.WaitGroup + + stratum *stratumClient + + customCmd string +} + +func newBridge(stratumServer, stratumPass, coinid string) *bridge { + + s := &bridge{ + ctx: context.Background(), + eventCh: make(chan interface{}), + errorc: make(chan error), + } + + if len(stratumServer) > 0 { + s.stratum = newStratumClient(stratumServer, stratumPass, coinid) + } + + return s +} + +func (b *bridge) start() { + + if b.stratum != nil { + backoff := time.Second + for { + err := b.stratum.dial() + if err == nil { + break + } + log.Printf("WARN: stratum.dial() error: %s, retry in %s", err, backoff) + time.Sleep(backoff) + if backoff < 60*time.Second { + backoff += time.Second + } + } + } + + for e := range b.eventCh { + switch e := e.(type) { + case *eventBlockConected: + b.handleFilteredBlockConnected(e) + default: + b.errorc <- fmt.Errorf("unknown event type: %T", e) + return + } + } +} + +func (b *bridge) handleFilteredBlockConnected(e *eventBlockConected) { + + if !*quiet { + log.Printf("Block connected: %s (%d) %v", e.header.BlockHash(), e.height, e.header.Timestamp) + } + + hash := e.header.BlockHash().String() + height := e.height + + // Cancel jobs on previous block. It's safe if they are already done. + if b.prevJobContext != nil { + select { + case <-b.prevJobContext.Done(): + log.Printf("prev one canceled") + default: + b.prevJobCancel() + } + } + + // Wait until all previous jobs are done or canceled. + b.wg.Wait() + + // Create and save cancelable subcontext for new jobs. + ctx, cancel := context.WithCancel(b.ctx) + b.prevJobContext, b.prevJobCancel = ctx, cancel + + if len(b.customCmd) > 0 { + go b.execCustomCommand(ctx, hash, height) + } + + // Send stratum update block message + if b.stratum != nil { + go b.stratumUpdateBlock(ctx, hash, height) + } +} + +func (s *bridge) stratumUpdateBlock(ctx context.Context, hash string, height int32) { + s.wg.Add(1) + defer s.wg.Done() + + backoff := time.Second + retry := func(err error) { + if backoff < 60*time.Second { + backoff += time.Second + } + log.Printf("WARN: stratum.send() on block %d error: %s", height, err) + time.Sleep(backoff) + s.stratum.dial() + } + + msg := stratumUpdateBlockMsg(*stratumPass, *coinid, hash) + + for { + switch err := s.stratum.send(ctx, msg); { + case err == nil: + return + case errors.Is(err, context.Canceled): + log.Printf("INFO: stratum.send() on block %d: %s.", height, err) + return + case errors.Is(err, syscall.EPIPE): + errClose := s.stratum.conn.Close() + if errClose != nil { + log.Printf("WARN: stratum.conn.Close() on block %d: %s.", height, errClose) + } + retry(err) + case errors.Is(err, net.ErrClosed): + retry(err) + default: + retry(err) + } + } + +} + +func (s *bridge) execCustomCommand(ctx context.Context, hash string, height int32) { + s.wg.Add(1) + defer s.wg.Done() + + cmd := strings.ReplaceAll(s.customCmd, "%s", hash) + err := doExecCustomCommand(ctx, cmd) + if err != nil { + log.Printf("ERROR: execCustomCommand on block %s(%d): %s", hash, height, err) + } +} + +func doExecCustomCommand(ctx context.Context, cmd string) error { + strs := strings.Split(cmd, " ") + path, err := exec.LookPath(strs[0]) + if errors.Is(err, exec.ErrDot) { + err = nil + } + if err != nil { + return err + } + c := exec.CommandContext(ctx, path, strs[1:]...) + c.Stdout = os.Stdout + return c.Run() +} diff --git a/rpcclient/examples/lbcdblocknotify/lbcdclient.go b/rpcclient/examples/lbcdblocknotify/lbcdclient.go new file mode 100644 index 00000000..91580fd8 --- /dev/null +++ b/rpcclient/examples/lbcdblocknotify/lbcdclient.go @@ -0,0 +1,53 @@ +package main + +import ( + "io/ioutil" + "log" + "path/filepath" + + "github.com/lbryio/lbcd/rpcclient" +) + +func newLbcdClient(server, user, pass string, notls bool, adpt adapter) *rpcclient.Client { + + ntfnHandlers := rpcclient.NotificationHandlers{ + OnFilteredBlockConnected: adpt.onFilteredBlockConnected, + } + + // Config lbcd RPC client with websockets. + connCfg := &rpcclient.ConnConfig{ + Host: server, + Endpoint: "ws", + User: user, + Pass: pass, + DisableTLS: true, + } + + if !notls { + cert, err := ioutil.ReadFile(filepath.Join(lbcdHomeDir, "rpc.cert")) + if err != nil { + log.Fatalf("can't read lbcd certificate: %s", err) + } + connCfg.Certificates = cert + connCfg.DisableTLS = false + } + + client, err := rpcclient.New(connCfg, &ntfnHandlers) + if err != nil { + log.Fatalf("can't create rpc client: %s", err) + } + + // Register for block connect and disconnect notifications. + if err = client.NotifyBlocks(); err != nil { + log.Fatalf("can't register block notification: %s", err) + } + + // Get the current block count. + blockCount, err := client.GetBlockCount() + if err != nil { + log.Fatalf("can't get block count: %s", err) + } + log.Printf("Current block count: %d", blockCount) + + return client +} diff --git a/rpcclient/examples/lbcdblocknotify/main.go b/rpcclient/examples/lbcdblocknotify/main.go index 9b672a59..27cdf329 100644 --- a/rpcclient/examples/lbcdblocknotify/main.go +++ b/rpcclient/examples/lbcdblocknotify/main.go @@ -1,136 +1,63 @@ -// Copyright (c) 2014-2017 The btcsuite developers -// Use of this source code is governed by an ISC -// license that can be found in the LICENSE file. - package main import ( - "errors" "flag" - "fmt" - "io/ioutil" "log" - "net" - "os" "os/exec" "path/filepath" "strings" - "github.com/lbryio/lbcd/rpcclient" - "github.com/lbryio/lbcd/wire" "github.com/lbryio/lbcutil" ) var ( - coinid = flag.String("coinid", "1425", "Coin ID") - stratum = flag.String("stratum", "", "Stratum server") - stratumPass = flag.String("stratumpass", "", "Stratum server password") - rpcserver = flag.String("rpcserver", "localhost:9245", "LBCD RPC server") - rpcuser = flag.String("rpcuser", "rpcuser", "LBCD RPC username") - rpcpass = flag.String("rpcpass", "rpcpass", "LBCD RPC password") - notls = flag.Bool("notls", false, "Connect to LBCD with TLS disabled") - run = flag.String("run", "", "Run custom shell command") - quiet = flag.Bool("quiet", false, "Do not print logs") + lbcdHomeDir = lbcutil.AppDataDir("lbcd", false) + defaultCert = filepath.Join(lbcdHomeDir, "rpc.cert") +) +var ( + coinid = flag.String("coinid", "1425", "Coin ID") + stratumServer = flag.String("stratum", "", "Stratum server") + stratumPass = flag.String("stratumpass", "", "Stratum server password") + rpcserver = flag.String("rpcserver", "localhost:9245", "LBCD RPC server") + rpcuser = flag.String("rpcuser", "rpcuser", "LBCD RPC username") + rpcpass = flag.String("rpcpass", "rpcpass", "LBCD RPC password") + rpccert = flag.String("rpccert", defaultCert, "LBCD RPC certificate") + notls = flag.Bool("notls", false, "Connect to LBCD with TLS disabled") + run = flag.String("run", "", "Run custom shell command") + quiet = flag.Bool("quiet", false, "Do not print logs") ) -func onFilteredBlockConnected(height int32, header *wire.BlockHeader, txns []*lbcutil.Tx) { - - blockHash := header.BlockHash().String() - - if !*quiet { - log.Printf("Block connected: %v (%d) %v", blockHash, height, header.Timestamp) - } - - if cmd := *run; len(cmd) != 0 { - cmd = strings.ReplaceAll(cmd, "%s", blockHash) - err := execCustomCommand(cmd) - if err != nil { - log.Printf("ERROR: execCustomCommand: %s", err) - } - } - - if len(*stratum) > 0 && len(*stratumPass) > 0 { - err := stratumUpdateBlock(*stratum, *stratumPass, *coinid, blockHash) - if err != nil { - log.Printf("ERROR: stratumUpdateBlock: %s", err) - } - } -} -func execCustomCommand(cmd string) error { - strs := strings.Split(cmd, " ") - path, err := exec.LookPath(strs[0]) - if errors.Is(err, exec.ErrDot) { - err = nil - } - if err != nil { - return err - } - c := exec.Command(path, strs[1:]...) - c.Stdout = os.Stdout - return c.Run() -} - -func stratumUpdateBlock(stratum, stratumPass, coinid, blockHash string) error { - addr, err := net.ResolveTCPAddr("tcp", stratum) - if err != nil { - return fmt.Errorf("can't resolve addr: %w", err) - } - - conn, err := net.DialTCP("tcp", nil, addr) - if err != nil { - return fmt.Errorf("can't dial tcp: %w", err) - } - defer conn.Close() - - msg := fmt.Sprintf(`{"id":1,"method":"mining.update_block","params":[%q,%s,%q]}`, - stratumPass, coinid, blockHash) - - _, err = conn.Write([]byte(msg)) - if err != nil { - return fmt.Errorf("can't write message: %w", err) - } - - return nil -} func main() { flag.Parse() - ntfnHandlers := rpcclient.NotificationHandlers{ - OnFilteredBlockConnected: onFilteredBlockConnected, + // Setup notification handler + b := newBridge(*stratumServer, *stratumPass, *coinid) + + if len(*run) > 0 { + // Check if ccommand exists. + strs := strings.Split(*run, " ") + cmd := strs[0] + _, err := exec.LookPath(cmd) + if err != nil { + log.Fatalf("ERROR: %s not found: %s", cmd, err) + } + b.customCmd = *run } - // Connect to local lbcd RPC server using websockets. - lbcdHomeDir := lbcutil.AppDataDir("lbcd", false) - certs, err := ioutil.ReadFile(filepath.Join(lbcdHomeDir, "rpc.cert")) - if err != nil { - log.Fatalf("can't read lbcd certificate: %s", err) - } - connCfg := &rpcclient.ConnConfig{ - Host: *rpcserver, - Endpoint: "ws", - User: *rpcuser, - Pass: *rpcpass, - Certificates: certs, - DisableTLS: *notls, - } - client, err := rpcclient.New(connCfg, &ntfnHandlers) - if err != nil { - log.Fatalf("can't create rpc client: %s", err) - } + // Start the eventt handler. + go b.start() - // Register for block connect and disconnect notifications. - if err = client.NotifyBlocks(); err != nil { - log.Fatalf("can't register block notification: %s", err) - } - log.Printf("NotifyBlocks: Registration Complete") + // Adaptater receives lbcd notifications, and emit events. + adpt := adapter{b} - // Get the current block count. - blockCount, err := client.GetBlockCount() - if err != nil { - log.Fatalf("can't get block count: %s", err) - } - log.Printf("Block count: %d", blockCount) + client := newLbcdClient(*rpcserver, *rpcuser, *rpcpass, *notls, adpt) + + go func() { + err := <-b.errorc + log.Fatalf("ERROR: %s", err) + client.Shutdown() + }() // Wait until the client either shuts down gracefully (or the user // terminates the process with Ctrl+C). diff --git a/rpcclient/examples/lbcdblocknotify/stratumclient.go b/rpcclient/examples/lbcdblocknotify/stratumclient.go new file mode 100644 index 00000000..449135ce --- /dev/null +++ b/rpcclient/examples/lbcdblocknotify/stratumclient.go @@ -0,0 +1,56 @@ +package main + +import ( + "context" + "fmt" + "net" +) + +type stratumClient struct { + server string + passwd string + coinid string + conn *net.TCPConn +} + +func newStratumClient(server, passwd, coinid string) *stratumClient { + + return &stratumClient{ + server: server, + } +} + +func (c *stratumClient) dial() error { + + addr, err := net.ResolveTCPAddr("tcp", c.server) + if err != nil { + return fmt.Errorf("resolve tcp addr: %w", err) + } + + conn, err := net.DialTCP("tcp", nil, addr) + if err != nil { + return fmt.Errorf("dial tcp: %w", err) + } + c.conn = conn + + return nil +} + +func (c *stratumClient) send(ctx context.Context, msg string) error { + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + _, err := c.conn.Write([]byte(msg)) + + return err +} + +func stratumUpdateBlockMsg(stratumPass, coinid, blockHash string) string { + + return fmt.Sprintf(`{"id":1,"method":"mining.update_block","params":[%q,%s,%q]}`, + stratumPass, coinid, blockHash) +}