Compare commits

...

6 commits

Author SHA1 Message Date
Roy Lee
a0ff51b84a claimtrie: allows '*' in claim name 2022-11-23 08:50:17 -08:00
Roy Lee
4c39a9842c rpcclient: update rescanblockchain support 2022-10-31 00:23:46 -07:00
Roy Lee
f513fca6a7 lbcdblocknotify: reorganize the code with a few updates
1. Fixed a bug, which reads certs even TLS is disabled

2. Persists Stratum TCP connection with auto-reconnect.
   (retry backoff increases from 1s to 60s maximum)

3. Stratum update jobs on previous notifications are canceled
   when a new notification arrives.

   Usually, the jobs are so short and completed immediately.
   However, if the Stratum connection is broken, this prevents
   the bridge from accumulating stale jobs.
2022-10-17 00:03:13 -07:00
Alex Grintsvayg
6728bf4b08 error properly when lbcd fails to connect in HTTP POST mode
in the case where you're e.g. trying to connect to an
invalid address, the err vars in handleSendPostMessage()
were being shadowed inside the for loop. if c.httpClient.Do()
returned an error, that error never got returned upstream.
then ioutil.ReadAll(httpResponse.Body) would get a nil pointer
dereference. this fixes that case.
2022-10-14 11:40:46 -07:00
Roy Lee
979d643594 [lbry] claimtrie: created node cache 2022-09-29 16:45:42 -07:00
Roy Lee
cbc4d489e8 lbcctl: support --timed, --quiet options 2022-09-29 16:45:42 -07:00
18 changed files with 609 additions and 233 deletions

View file

@ -977,8 +977,8 @@ func NewImportMultiCmd(requests []ImportMultiRequest, options *ImportMultiOption
// RescanBlockchainCmd defines the RescanBlockchain JSON-RPC command. // RescanBlockchainCmd defines the RescanBlockchain JSON-RPC command.
type RescanBlockchainCmd struct { type RescanBlockchainCmd struct {
StartHeight *int64 `jsonrpcdefault:"0"` StartHeight *int32 `jsonrpcdefault:"0"`
StopHeight *int64 `jsonrpcdefault:"0"` StopHeight *int32
} }
// NewRescanBlockchainCmd returns a new instance which can be used to issue // NewRescanBlockchainCmd returns a new instance which can be used to issue
@ -986,7 +986,7 @@ type RescanBlockchainCmd struct {
// //
// The parameters which are pointers indicate they are optional. Passing nil // The parameters which are pointers indicate they are optional. Passing nil
// for optional parameters will use the default value. // for optional parameters will use the default value.
func NewRescanBlockchainCmd(startHeight *int64, stopHeight *int64) *RescanBlockchainCmd { func NewRescanBlockchainCmd(startHeight *int32, stopHeight *int32) *RescanBlockchainCmd {
return &RescanBlockchainCmd{ return &RescanBlockchainCmd{
StartHeight: startHeight, StartHeight: startHeight,
StopHeight: stopHeight, StopHeight: stopHeight,

View file

@ -319,8 +319,8 @@ type ListUnspentResult struct {
// RescanBlockchainResult models the data returned from the rescanblockchain command. // RescanBlockchainResult models the data returned from the rescanblockchain command.
type RescanBlockchainResult struct { type RescanBlockchainResult struct {
StartHeight int64 `json:"start_height"` StartHeight int32 `json:"start_height"`
StoptHeight int64 `json:"stop_height"` StoptHeight int32 `json:"stop_height"`
} }
// SignRawTransactionError models the data that contains script verification // SignRawTransactionError models the data that contains script verification

View file

@ -4,9 +4,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"path/filepath" "path/filepath"
"runtime"
"sort" "sort"
"sync"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -249,17 +247,17 @@ func (ct *ClaimTrie) AppendBlock(temporary bool) error {
names = append(names, expirations...) names = append(names, expirations...)
names = removeDuplicates(names) names = removeDuplicates(names)
nhns := ct.makeNameHashNext(names, false, nil) for _, name := range names {
for nhn := range nhns {
ct.merkleTrie.Update(nhn.Name, nhn.Hash, true) hash, next := ct.nodeManager.Hash(name)
if nhn.Next <= 0 { ct.merkleTrie.Update(name, hash, true)
if next <= 0 {
continue continue
} }
newName := normalization.NormalizeIfNecessary(nhn.Name, nhn.Next) newName := normalization.NormalizeIfNecessary(name, next)
updateNames = append(updateNames, newName) updateNames = append(updateNames, newName)
updateHeights = append(updateHeights, nhn.Next) updateHeights = append(updateHeights, next)
} }
if !temporary && len(updateNames) > 0 { if !temporary && len(updateNames) > 0 {
err = ct.temporalRepo.SetNodesAt(updateNames, updateHeights) err = ct.temporalRepo.SetNodesAt(updateNames, updateHeights)
@ -356,22 +354,29 @@ func (ct *ClaimTrie) ResetHeight(height int32) error {
} }
func (ct *ClaimTrie) runFullTrieRebuild(names [][]byte, interrupt <-chan struct{}) { func (ct *ClaimTrie) runFullTrieRebuild(names [][]byte, interrupt <-chan struct{}) {
var nhns chan NameHashNext
if names == nil { if names == nil {
node.Log("Building the entire claim trie in RAM...") node.Log("Building the entire claim trie in RAM...")
ct.claimLogger = newClaimProgressLogger("Processed", node.GetLogger()) ct.claimLogger = newClaimProgressLogger("Processed", node.GetLogger())
nhns = ct.makeNameHashNext(nil, true, interrupt)
} else {
ct.claimLogger = nil
nhns = ct.makeNameHashNext(names, false, interrupt)
}
for nhn := range nhns { ct.nodeManager.IterateNames(func(name []byte) bool {
ct.merkleTrie.Update(nhn.Name, nhn.Hash, false) if interruptRequested(interrupt) {
if ct.claimLogger != nil { return false
ct.claimLogger.LogName(nhn.Name) }
clone := make([]byte, len(name))
copy(clone, name)
hash, _ := ct.nodeManager.Hash(clone)
ct.merkleTrie.Update(clone, hash, false)
ct.claimLogger.LogName(name)
return true
})
} else {
for _, name := range names {
hash, _ := ct.nodeManager.Hash(name)
ct.merkleTrie.Update(name, hash, false)
} }
} }
} }
// MerkleHash returns the Merkle Hash of the claimTrie. // MerkleHash returns the Merkle Hash of the claimTrie.
@ -437,12 +442,6 @@ func (ct *ClaimTrie) FlushToDisk() {
} }
} }
type NameHashNext struct {
Name []byte
Hash *chainhash.Hash
Next int32
}
func interruptRequested(interrupted <-chan struct{}) bool { func interruptRequested(interrupted <-chan struct{}) bool {
select { select {
case <-interrupted: // should never block on nil case <-interrupted: // should never block on nil
@ -452,53 +451,3 @@ func interruptRequested(interrupted <-chan struct{}) bool {
return false return false
} }
func (ct *ClaimTrie) makeNameHashNext(names [][]byte, all bool, interrupt <-chan struct{}) chan NameHashNext {
inputs := make(chan []byte, 512)
outputs := make(chan NameHashNext, 512)
var wg sync.WaitGroup
hashComputationWorker := func() {
for name := range inputs {
hash, next := ct.nodeManager.Hash(name)
outputs <- NameHashNext{name, hash, next}
}
wg.Done()
}
threads := int(0.8 * float32(runtime.GOMAXPROCS(0)))
if threads < 1 {
threads = 1
}
for threads > 0 {
threads--
wg.Add(1)
go hashComputationWorker()
}
go func() {
if all {
ct.nodeManager.IterateNames(func(name []byte) bool {
if interruptRequested(interrupt) {
return false
}
clone := make([]byte, len(name))
copy(clone, name) // iteration name buffer is reused on future loops
inputs <- clone
return true
})
} else {
for _, name := range names {
if interruptRequested(interrupt) {
break
}
inputs <- name
}
}
close(inputs)
}()
go func() {
wg.Wait()
close(outputs)
}()
return outputs
}

85
claimtrie/node/cache.go Normal file
View file

@ -0,0 +1,85 @@
package node
import (
"container/list"
"github.com/lbryio/lbcd/claimtrie/change"
)
type cacheLeaf struct {
node *Node
element *list.Element
changes []change.Change
height int32
}
type Cache struct {
nodes map[string]*cacheLeaf
order *list.List
limit int
}
func (nc *Cache) insert(name []byte, n *Node, height int32) {
key := string(name)
existing := nc.nodes[key]
if existing != nil {
existing.node = n
existing.height = height
existing.changes = nil
nc.order.MoveToFront(existing.element)
return
}
for nc.order.Len() >= nc.limit {
// TODO: maybe ensure that we don't remove nodes that have a lot of changes?
delete(nc.nodes, nc.order.Back().Value.(string))
nc.order.Remove(nc.order.Back())
}
element := nc.order.PushFront(key)
nc.nodes[key] = &cacheLeaf{node: n, element: element, height: height}
}
func (nc *Cache) fetch(name []byte, height int32) (*Node, []change.Change, int32) {
key := string(name)
existing := nc.nodes[key]
if existing != nil && existing.height <= height {
nc.order.MoveToFront(existing.element)
return existing.node, existing.changes, existing.height
}
return nil, nil, -1
}
func (nc *Cache) addChanges(changes []change.Change, height int32) {
for _, c := range changes {
key := string(c.Name)
existing := nc.nodes[key]
if existing != nil && existing.height <= height {
existing.changes = append(existing.changes, c)
}
}
}
func (nc *Cache) drop(names [][]byte) {
for _, name := range names {
key := string(name)
existing := nc.nodes[key]
if existing != nil {
// we can't roll it backwards because we don't know its previous height value; just toast it
delete(nc.nodes, key)
nc.order.Remove(existing.element)
}
}
}
func (nc *Cache) clear() {
nc.nodes = map[string]*cacheLeaf{}
nc.order = list.New()
// we'll let the GC sort out the remains...
}
func NewCache(limit int) *Cache {
return &Cache{limit: limit, nodes: map[string]*cacheLeaf{}, order: list.New()}
}

View file

@ -21,6 +21,7 @@ type Manager interface {
IterateNames(predicate func(name []byte) bool) IterateNames(predicate func(name []byte) bool)
Hash(name []byte) (*chainhash.Hash, int32) Hash(name []byte) (*chainhash.Hash, int32)
Flush() error Flush() error
ClearCache()
} }
type BaseManager struct { type BaseManager struct {
@ -30,31 +31,62 @@ type BaseManager struct {
changes []change.Change changes []change.Change
tempChanges map[string][]change.Change tempChanges map[string][]change.Change
cache *Cache
} }
func NewBaseManager(repo Repo) (*BaseManager, error) { func NewBaseManager(repo Repo) (*BaseManager, error) {
nm := &BaseManager{ nm := &BaseManager{
repo: repo, repo: repo,
cache: NewCache(10000), // TODO: how many should we cache?
} }
return nm, nil return nm, nil
} }
func (nm *BaseManager) ClearCache() {
nm.cache.clear()
}
func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) { func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) {
changes, err := nm.repo.LoadChanges(name) n, changes, oldHeight := nm.cache.fetch(name, height)
if err != nil { if n == nil {
return nil, errors.Wrap(err, "in load changes") changes, err := nm.repo.LoadChanges(name)
} if err != nil {
return nil, errors.Wrap(err, "in load changes")
}
if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
changes = append(changes, nm.tempChanges[string(name)]...) changes = append(changes, nm.tempChanges[string(name)]...)
} }
n, err := nm.newNodeFromChanges(changes, height) n, err = nm.newNodeFromChanges(changes, height)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "in new node") return nil, errors.Wrap(err, "in new node")
}
// TODO: how can we tell what needs to be cached?
if nm.tempChanges == nil && height == nm.height && n != nil && (len(changes) > 4 || len(name) < 12) {
nm.cache.insert(name, n, height)
}
} else {
if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
changes = append(changes, nm.tempChanges[string(name)]...)
n = n.Clone()
} else if height != nm.height {
n = n.Clone()
}
updated, err := nm.updateFromChanges(n, changes, height)
if err != nil {
return nil, errors.Wrap(err, "in update from changes")
}
if !updated {
n.AdjustTo(oldHeight, height, name)
}
if nm.tempChanges == nil && height == nm.height {
nm.cache.insert(name, n, height)
}
} }
return n, nil return n, nil
@ -66,17 +98,13 @@ func (nm *BaseManager) node(name []byte) (*Node, error) {
return nm.NodeAt(nm.height, name) return nm.NodeAt(nm.height, name)
} }
// newNodeFromChanges returns a new Node constructed from the changes. func (nm *BaseManager) updateFromChanges(n *Node, changes []change.Change, height int32) (bool, error) {
// The changes must preserve their order received.
func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) (*Node, error) {
if len(changes) == 0 {
return nil, nil
}
n := New()
previous := changes[0].Height
count := len(changes) count := len(changes)
if count == 0 {
return false, nil
}
previous := changes[0].Height
for i, chg := range changes { for i, chg := range changes {
if chg.Height < previous { if chg.Height < previous {
@ -95,15 +123,37 @@ func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32)
delay := nm.getDelayForName(n, chg) delay := nm.getDelayForName(n, chg)
err := n.ApplyChange(chg, delay) err := n.ApplyChange(chg, delay)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "in apply change") return false, errors.Wrap(err, "in apply change")
} }
} }
if count <= 0 { if count <= 0 {
return nil, nil // we applied no changes, which means we shouldn't exist if we had all the changes
// or might mean nothing significant if we are applying a partial changeset
return false, nil
} }
lastChange := changes[count-1] lastChange := changes[count-1]
return n.AdjustTo(lastChange.Height, height, lastChange.Name), nil n.AdjustTo(lastChange.Height, height, lastChange.Name)
return true, nil
}
// newNodeFromChanges returns a new Node constructed from the changes.
// The changes must preserve their order received.
func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) (*Node, error) {
if len(changes) == 0 {
return nil, nil
}
n := New()
updated, err := nm.updateFromChanges(n, changes, height)
if err != nil {
return nil, errors.Wrap(err, "in update from changes")
}
if updated {
return n, nil
}
return nil, nil
} }
func (nm *BaseManager) AppendChange(chg change.Change) { func (nm *BaseManager) AppendChange(chg change.Change) {
@ -220,6 +270,7 @@ func (nm *BaseManager) IncrementHeightTo(height int32, temporary bool) ([][]byte
} }
if !temporary { if !temporary {
nm.cache.addChanges(nm.changes, height)
if err := nm.repo.AppendChanges(nm.changes); err != nil { // destroys names if err := nm.repo.AppendChanges(nm.changes); err != nil { // destroys names
return nil, errors.Wrap(err, "in append changes") return nil, errors.Wrap(err, "in append changes")
} }
@ -255,6 +306,8 @@ func (nm *BaseManager) DecrementHeightTo(affectedNames [][]byte, height int32) (
return affectedNames, errors.Wrap(err, "in drop changes") return affectedNames, errors.Wrap(err, "in drop changes")
} }
} }
nm.cache.drop(affectedNames)
} }
nm.height = height nm.height = height

View file

@ -110,7 +110,7 @@ func (n *Node) ApplyChange(chg change.Change, delay int32) error {
} }
// AdjustTo activates claims and computes takeovers until it reaches the specified height. // AdjustTo activates claims and computes takeovers until it reaches the specified height.
func (n *Node) AdjustTo(height, maxHeight int32, name []byte) *Node { func (n *Node) AdjustTo(height, maxHeight int32, name []byte) {
changed := n.handleExpiredAndActivated(height) > 0 changed := n.handleExpiredAndActivated(height) > 0
n.updateTakeoverHeight(height, name, changed) n.updateTakeoverHeight(height, name, changed)
if maxHeight > height { if maxHeight > height {
@ -120,7 +120,6 @@ func (n *Node) AdjustTo(height, maxHeight int32, name []byte) *Node {
height = h height = h
} }
} }
return n
} }
func (n *Node) updateTakeoverHeight(height int32, name []byte, refindBest bool) { func (n *Node) updateTakeoverHeight(height int32, name []byte, refindBest bool) {
@ -340,3 +339,28 @@ func (n *Node) SortClaimsByBid() {
return OutPointLess(n.Claims[j].OutPoint, n.Claims[i].OutPoint) return OutPointLess(n.Claims[j].OutPoint, n.Claims[i].OutPoint)
}) })
} }
func (n *Node) Clone() *Node {
clone := New()
if n.SupportSums != nil {
clone.SupportSums = map[string]int64{}
for key, value := range n.SupportSums {
clone.SupportSums[key] = value
}
}
clone.Supports = make(ClaimList, len(n.Supports))
for i, support := range n.Supports {
clone.Supports[i] = &Claim{}
*clone.Supports[i] = *support
}
clone.Claims = make(ClaimList, len(n.Claims))
for i, claim := range n.Claims {
clone.Claims[i] = &Claim{}
*clone.Claims[i] = *claim
}
clone.TakenOverAt = n.TakenOverAt
if n.BestClaim != nil {
clone.BestClaim = clone.Claims.find(byID(n.BestClaim.ClaimID))
}
return clone
}

View file

@ -34,6 +34,7 @@ func (nm *NormalizingManager) IncrementHeightTo(height int32, temporary bool) ([
func (nm *NormalizingManager) DecrementHeightTo(affectedNames [][]byte, height int32) ([][]byte, error) { func (nm *NormalizingManager) DecrementHeightTo(affectedNames [][]byte, height int32) ([][]byte, error) {
if nm.normalizedAt > height { if nm.normalizedAt > height {
nm.normalizedAt = -1 nm.normalizedAt = -1
nm.ClearCache()
} }
return nm.Manager.DecrementHeightTo(affectedNames, height) return nm.Manager.DecrementHeightTo(affectedNames, height)
} }
@ -110,5 +111,7 @@ func (nm *NormalizingManager) addNormalizationForkChangesIfNecessary(height int3
return true return true
} }
nm.Manager.ClearCache()
nm.Manager.IterateNames(predicate) nm.Manager.IterateNames(predicate)
} }

View file

@ -111,6 +111,8 @@ type config struct {
SigNet bool `long:"signet" description:"Connect to signet (default RPC server: localhost:49245)"` SigNet bool `long:"signet" description:"Connect to signet (default RPC server: localhost:49245)"`
Wallet bool `long:"wallet" description:"Connect to wallet RPC server instead (default: localhost:9244, testnet: localhost:19244, regtest: localhost:29244)"` Wallet bool `long:"wallet" description:"Connect to wallet RPC server instead (default: localhost:9244, testnet: localhost:19244, regtest: localhost:29244)"`
ShowVersion bool `short:"V" long:"version" description:"Display version information and exit"` ShowVersion bool `short:"V" long:"version" description:"Display version information and exit"`
Timed bool `short:"t" long:"timed" description:"Display RPC response time"`
Quiet bool `short:"q" long:"quiet" description:"Do not output results to stdout"`
} }
// normalizeAddress returns addr with the passed default port appended if // normalizeAddress returns addr with the passed default port appended if

View file

@ -9,6 +9,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
"github.com/lbryio/lbcd/btcjson" "github.com/lbryio/lbcd/btcjson"
) )
@ -133,6 +134,8 @@ func main() {
os.Exit(1) os.Exit(1)
} }
started := time.Now()
// Send the JSON-RPC request to the server using the user-specified // Send the JSON-RPC request to the server using the user-specified
// connection configuration. // connection configuration.
result, err := sendPostRequest(marshalledJSON, cfg) result, err := sendPostRequest(marshalledJSON, cfg)
@ -141,6 +144,16 @@ func main() {
os.Exit(1) os.Exit(1)
} }
if cfg.Timed {
elapsed := time.Since(started)
defer fmt.Fprintf(os.Stderr, "%s\n", elapsed)
}
var output io.Writer = os.Stdout
if cfg.Quiet {
output = io.Discard
}
// Choose how to display the result based on its type. // Choose how to display the result based on its type.
strResult := string(result) strResult := string(result)
if strings.HasPrefix(strResult, "{") || strings.HasPrefix(strResult, "[") { if strings.HasPrefix(strResult, "{") || strings.HasPrefix(strResult, "[") {
@ -150,7 +163,7 @@ func main() {
err) err)
os.Exit(1) os.Exit(1)
} }
fmt.Println(dst.String()) fmt.Fprintln(output, dst.String())
} else if strings.HasPrefix(strResult, `"`) { } else if strings.HasPrefix(strResult, `"`) {
var str string var str string
@ -159,9 +172,9 @@ func main() {
err) err)
os.Exit(1) os.Exit(1)
} }
fmt.Println(str) fmt.Fprintln(output, str)
} else if strResult != "null" { } else if strResult != "null" {
fmt.Println(strResult) fmt.Fprintln(output, strResult)
} }
} }

View file

@ -1,15 +1,21 @@
# lbcd Websockets Example # lbcdbloknotify
This example shows how to use the rpcclient package to connect to a btcd RPC This bridge program subscribes to lbcd's notifications over websockets using the rpcclient package.
server using TLS-secured websockets, register for block connected and block Users can specify supported actions upon receiving this notifications.
disconnected notifications, and get the current block count.
## Running the Example ## Building(or Running) the Program
The first step is to clone the lbcd package: Clone the lbcd package:
```bash ```bash
$ git clone github.com/lbryio/lbcd $ git clone github.com/lbryio/lbcd
$ cd lbcd/rpcclient/examples
# build the program
$ go build .
# or directly run it (build implicitly behind the scene)
$ go run .
``` ```
Display available options: Display available options:
@ -30,19 +36,30 @@ $ go run . -h
-stratumpass string -stratumpass string
Stratum server password (default "password") Stratum server password (default "password")
-quiet -quiet
Do not print logs Do not print periodic logs
``` ```
Start the program: Running the program:
```bash ```bash
$ go run . -stratumpass <STRATUM PASSWD> -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD> # Send stratum mining.update_block mesage upon receving block connected notifiations.
$ go run . -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD> --notls -stratum <STRATUM SERVER> -stratumpass <STRATUM PASSWD>
2022/01/10 23:16:21 NotifyBlocks: Registration Complete 2022/01/10 23:16:21 Current block count: 1093112
2022/01/10 23:16:21 Block count: 1093112
... ...
# Execute a custome command (with blockhash) upon receving block connected notifiations.
$ go run . -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD> --notls -run "echo %s"
``` ```
## Notes
* Stratum TCP connection is persisted with auto-reconnect. (retry backoff increases from 1s to 60s maximum)
* Stratum update_block jobs on previous notifications are canceled when a new notification arrives.
Usually, the jobs are so short and completed immediately. However, if the Stratum connection is broken, this
prevents the bridge from accumulating stale jobs.
## License ## License
This example is licensed under the [copyfree](http://copyfree.org) ISC License. This example is licensed under the [copyfree](http://copyfree.org) ISC License.

View file

@ -0,0 +1,20 @@
package main
import (
"github.com/lbryio/lbcd/wire"
"github.com/lbryio/lbcutil"
)
type eventBlockConected struct {
height int32
header *wire.BlockHeader
txns []*lbcutil.Tx
}
type adapter struct {
*bridge
}
func (a *adapter) onFilteredBlockConnected(height int32, header *wire.BlockHeader, txns []*lbcutil.Tx) {
a.eventCh <- &eventBlockConected{height, header, txns}
}

View file

@ -0,0 +1,172 @@
package main
import (
"context"
"errors"
"fmt"
"log"
"net"
"os"
"os/exec"
"strings"
"sync"
"syscall"
"time"
)
type bridge struct {
ctx context.Context
prevJobContext context.Context
prevJobCancel context.CancelFunc
eventCh chan interface{}
errorc chan error
wg sync.WaitGroup
stratum *stratumClient
customCmd string
}
func newBridge(stratumServer, stratumPass, coinid string) *bridge {
s := &bridge{
ctx: context.Background(),
eventCh: make(chan interface{}),
errorc: make(chan error),
}
if len(stratumServer) > 0 {
s.stratum = newStratumClient(stratumServer, stratumPass, coinid)
}
return s
}
func (b *bridge) start() {
if b.stratum != nil {
backoff := time.Second
for {
err := b.stratum.dial()
if err == nil {
break
}
log.Printf("WARN: stratum.dial() error: %s, retry in %s", err, backoff)
time.Sleep(backoff)
if backoff < 60*time.Second {
backoff += time.Second
}
}
}
for e := range b.eventCh {
switch e := e.(type) {
case *eventBlockConected:
b.handleFilteredBlockConnected(e)
default:
b.errorc <- fmt.Errorf("unknown event type: %T", e)
return
}
}
}
func (b *bridge) handleFilteredBlockConnected(e *eventBlockConected) {
if !*quiet {
log.Printf("Block connected: %s (%d) %v", e.header.BlockHash(), e.height, e.header.Timestamp)
}
hash := e.header.BlockHash().String()
height := e.height
// Cancel jobs on previous block. It's safe if they are already done.
if b.prevJobContext != nil {
select {
case <-b.prevJobContext.Done():
log.Printf("prev one canceled")
default:
b.prevJobCancel()
}
}
// Wait until all previous jobs are done or canceled.
b.wg.Wait()
// Create and save cancelable subcontext for new jobs.
ctx, cancel := context.WithCancel(b.ctx)
b.prevJobContext, b.prevJobCancel = ctx, cancel
if len(b.customCmd) > 0 {
go b.execCustomCommand(ctx, hash, height)
}
// Send stratum update block message
if b.stratum != nil {
go b.stratumUpdateBlock(ctx, hash, height)
}
}
func (s *bridge) stratumUpdateBlock(ctx context.Context, hash string, height int32) {
s.wg.Add(1)
defer s.wg.Done()
backoff := time.Second
retry := func(err error) {
if backoff < 60*time.Second {
backoff += time.Second
}
log.Printf("WARN: stratum.send() on block %d error: %s", height, err)
time.Sleep(backoff)
s.stratum.dial()
}
msg := stratumUpdateBlockMsg(*stratumPass, *coinid, hash)
for {
switch err := s.stratum.send(ctx, msg); {
case err == nil:
return
case errors.Is(err, context.Canceled):
log.Printf("INFO: stratum.send() on block %d: %s.", height, err)
return
case errors.Is(err, syscall.EPIPE):
errClose := s.stratum.conn.Close()
if errClose != nil {
log.Printf("WARN: stratum.conn.Close() on block %d: %s.", height, errClose)
}
retry(err)
case errors.Is(err, net.ErrClosed):
retry(err)
default:
retry(err)
}
}
}
func (s *bridge) execCustomCommand(ctx context.Context, hash string, height int32) {
s.wg.Add(1)
defer s.wg.Done()
cmd := strings.ReplaceAll(s.customCmd, "%s", hash)
err := doExecCustomCommand(ctx, cmd)
if err != nil {
log.Printf("ERROR: execCustomCommand on block %s(%d): %s", hash, height, err)
}
}
func doExecCustomCommand(ctx context.Context, cmd string) error {
strs := strings.Split(cmd, " ")
path, err := exec.LookPath(strs[0])
if errors.Is(err, exec.ErrDot) {
err = nil
}
if err != nil {
return err
}
c := exec.CommandContext(ctx, path, strs[1:]...)
c.Stdout = os.Stdout
return c.Run()
}

View file

@ -0,0 +1,53 @@
package main
import (
"io/ioutil"
"log"
"path/filepath"
"github.com/lbryio/lbcd/rpcclient"
)
func newLbcdClient(server, user, pass string, notls bool, adpt adapter) *rpcclient.Client {
ntfnHandlers := rpcclient.NotificationHandlers{
OnFilteredBlockConnected: adpt.onFilteredBlockConnected,
}
// Config lbcd RPC client with websockets.
connCfg := &rpcclient.ConnConfig{
Host: server,
Endpoint: "ws",
User: user,
Pass: pass,
DisableTLS: true,
}
if !notls {
cert, err := ioutil.ReadFile(filepath.Join(lbcdHomeDir, "rpc.cert"))
if err != nil {
log.Fatalf("can't read lbcd certificate: %s", err)
}
connCfg.Certificates = cert
connCfg.DisableTLS = false
}
client, err := rpcclient.New(connCfg, &ntfnHandlers)
if err != nil {
log.Fatalf("can't create rpc client: %s", err)
}
// Register for block connect and disconnect notifications.
if err = client.NotifyBlocks(); err != nil {
log.Fatalf("can't register block notification: %s", err)
}
// Get the current block count.
blockCount, err := client.GetBlockCount()
if err != nil {
log.Fatalf("can't get block count: %s", err)
}
log.Printf("Current block count: %d", blockCount)
return client
}

View file

@ -1,136 +1,63 @@
// Copyright (c) 2014-2017 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main package main
import ( import (
"errors"
"flag" "flag"
"fmt"
"io/ioutil"
"log" "log"
"net"
"os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"strings" "strings"
"github.com/lbryio/lbcd/rpcclient"
"github.com/lbryio/lbcd/wire"
"github.com/lbryio/lbcutil" "github.com/lbryio/lbcutil"
) )
var ( var (
coinid = flag.String("coinid", "1425", "Coin ID") lbcdHomeDir = lbcutil.AppDataDir("lbcd", false)
stratum = flag.String("stratum", "", "Stratum server") defaultCert = filepath.Join(lbcdHomeDir, "rpc.cert")
stratumPass = flag.String("stratumpass", "", "Stratum server password") )
rpcserver = flag.String("rpcserver", "localhost:9245", "LBCD RPC server") var (
rpcuser = flag.String("rpcuser", "rpcuser", "LBCD RPC username") coinid = flag.String("coinid", "1425", "Coin ID")
rpcpass = flag.String("rpcpass", "rpcpass", "LBCD RPC password") stratumServer = flag.String("stratum", "", "Stratum server")
notls = flag.Bool("notls", false, "Connect to LBCD with TLS disabled") stratumPass = flag.String("stratumpass", "", "Stratum server password")
run = flag.String("run", "", "Run custom shell command") rpcserver = flag.String("rpcserver", "localhost:9245", "LBCD RPC server")
quiet = flag.Bool("quiet", false, "Do not print logs") rpcuser = flag.String("rpcuser", "rpcuser", "LBCD RPC username")
rpcpass = flag.String("rpcpass", "rpcpass", "LBCD RPC password")
rpccert = flag.String("rpccert", defaultCert, "LBCD RPC certificate")
notls = flag.Bool("notls", false, "Connect to LBCD with TLS disabled")
run = flag.String("run", "", "Run custom shell command")
quiet = flag.Bool("quiet", false, "Do not print logs")
) )
func onFilteredBlockConnected(height int32, header *wire.BlockHeader, txns []*lbcutil.Tx) {
blockHash := header.BlockHash().String()
if !*quiet {
log.Printf("Block connected: %v (%d) %v", blockHash, height, header.Timestamp)
}
if cmd := *run; len(cmd) != 0 {
cmd = strings.ReplaceAll(cmd, "%s", blockHash)
err := execCustomCommand(cmd)
if err != nil {
log.Printf("ERROR: execCustomCommand: %s", err)
}
}
if len(*stratum) > 0 && len(*stratumPass) > 0 {
err := stratumUpdateBlock(*stratum, *stratumPass, *coinid, blockHash)
if err != nil {
log.Printf("ERROR: stratumUpdateBlock: %s", err)
}
}
}
func execCustomCommand(cmd string) error {
strs := strings.Split(cmd, " ")
path, err := exec.LookPath(strs[0])
if errors.Is(err, exec.ErrDot) {
err = nil
}
if err != nil {
return err
}
c := exec.Command(path, strs[1:]...)
c.Stdout = os.Stdout
return c.Run()
}
func stratumUpdateBlock(stratum, stratumPass, coinid, blockHash string) error {
addr, err := net.ResolveTCPAddr("tcp", stratum)
if err != nil {
return fmt.Errorf("can't resolve addr: %w", err)
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
return fmt.Errorf("can't dial tcp: %w", err)
}
defer conn.Close()
msg := fmt.Sprintf(`{"id":1,"method":"mining.update_block","params":[%q,%s,%q]}`,
stratumPass, coinid, blockHash)
_, err = conn.Write([]byte(msg))
if err != nil {
return fmt.Errorf("can't write message: %w", err)
}
return nil
}
func main() { func main() {
flag.Parse() flag.Parse()
ntfnHandlers := rpcclient.NotificationHandlers{ // Setup notification handler
OnFilteredBlockConnected: onFilteredBlockConnected, b := newBridge(*stratumServer, *stratumPass, *coinid)
if len(*run) > 0 {
// Check if ccommand exists.
strs := strings.Split(*run, " ")
cmd := strs[0]
_, err := exec.LookPath(cmd)
if err != nil {
log.Fatalf("ERROR: %s not found: %s", cmd, err)
}
b.customCmd = *run
} }
// Connect to local lbcd RPC server using websockets. // Start the eventt handler.
lbcdHomeDir := lbcutil.AppDataDir("lbcd", false) go b.start()
certs, err := ioutil.ReadFile(filepath.Join(lbcdHomeDir, "rpc.cert"))
if err != nil {
log.Fatalf("can't read lbcd certificate: %s", err)
}
connCfg := &rpcclient.ConnConfig{
Host: *rpcserver,
Endpoint: "ws",
User: *rpcuser,
Pass: *rpcpass,
Certificates: certs,
DisableTLS: *notls,
}
client, err := rpcclient.New(connCfg, &ntfnHandlers)
if err != nil {
log.Fatalf("can't create rpc client: %s", err)
}
// Register for block connect and disconnect notifications. // Adaptater receives lbcd notifications, and emit events.
if err = client.NotifyBlocks(); err != nil { adpt := adapter{b}
log.Fatalf("can't register block notification: %s", err)
}
log.Printf("NotifyBlocks: Registration Complete")
// Get the current block count. client := newLbcdClient(*rpcserver, *rpcuser, *rpcpass, *notls, adpt)
blockCount, err := client.GetBlockCount()
if err != nil { go func() {
log.Fatalf("can't get block count: %s", err) err := <-b.errorc
} log.Fatalf("ERROR: %s", err)
log.Printf("Block count: %d", blockCount) client.Shutdown()
}()
// Wait until the client either shuts down gracefully (or the user // Wait until the client either shuts down gracefully (or the user
// terminates the process with Ctrl+C). // terminates the process with Ctrl+C).

View file

@ -0,0 +1,56 @@
package main
import (
"context"
"fmt"
"net"
)
type stratumClient struct {
server string
passwd string
coinid string
conn *net.TCPConn
}
func newStratumClient(server, passwd, coinid string) *stratumClient {
return &stratumClient{
server: server,
}
}
func (c *stratumClient) dial() error {
addr, err := net.ResolveTCPAddr("tcp", c.server)
if err != nil {
return fmt.Errorf("resolve tcp addr: %w", err)
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
return fmt.Errorf("dial tcp: %w", err)
}
c.conn = conn
return nil
}
func (c *stratumClient) send(ctx context.Context, msg string) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
_, err := c.conn.Write([]byte(msg))
return err
}
func stratumUpdateBlockMsg(stratumPass, coinid, blockHash string) string {
return fmt.Sprintf(`{"id":1,"method":"mining.update_block","params":[%q,%s,%q]}`,
stratumPass, coinid, blockHash)
}

View file

@ -774,7 +774,8 @@ func (c *Client) handleSendPostMessage(jReq *jsonRequest) {
tries := 10 tries := 10
for i := 0; tries == 0 || i < tries; i++ { for i := 0; tries == 0 || i < tries; i++ {
bodyReader := bytes.NewReader(jReq.marshalledJSON) bodyReader := bytes.NewReader(jReq.marshalledJSON)
httpReq, err := http.NewRequest("POST", url, bodyReader) var httpReq *http.Request
httpReq, err = http.NewRequest("POST", url, bodyReader)
if err != nil { if err != nil {
jReq.responseChan <- &Response{result: nil, err: err} jReq.responseChan <- &Response{result: nil, err: err}
return return
@ -786,7 +787,8 @@ func (c *Client) handleSendPostMessage(jReq *jsonRequest) {
} }
// Configure basic access authorization. // Configure basic access authorization.
user, pass, err := c.config.getAuth() var user, pass string
user, pass, err = c.config.getAuth()
if err != nil { if err != nil {
jReq.responseChan <- &Response{result: nil, err: err} jReq.responseChan <- &Response{result: nil, err: err}
return return

View file

@ -2062,14 +2062,14 @@ func (r FutureRescanBlockchainResult) Receive() (*btcjson.RescanBlockchainResult
// returned instance. // returned instance.
// //
// See RescanBlockchain for the blocking version and more details. // See RescanBlockchain for the blocking version and more details.
func (c *Client) RescanBlockchainAsync(startHeight *int64, stopHeight *int64) FutureRescanBlockchainResult { func (c *Client) RescanBlockchainAsync(startHeight *int32, stopHeight *int32) FutureRescanBlockchainResult {
cmd := btcjson.NewRescanBlockchainCmd(startHeight, stopHeight) cmd := btcjson.NewRescanBlockchainCmd(startHeight, stopHeight)
return c.SendCmd(cmd) return c.SendCmd(cmd)
} }
// RescanBlockchain rescans the local blockchain for wallet related // RescanBlockchain rescans the local blockchain for wallet related
// transactions from the startHeight to the the inclusive stopHeight. // transactions from the startHeight to the the inclusive stopHeight.
func (c *Client) RescanBlockchain(startHeight *int64, stopHeight *int64) (*btcjson.RescanBlockchainResult, error) { func (c *Client) RescanBlockchain(startHeight *int32, stopHeight *int32) (*btcjson.RescanBlockchainResult, error) {
return c.RescanBlockchainAsync(startHeight, stopHeight).Receive() return c.RescanBlockchainAsync(startHeight, stopHeight).Receive()
} }

View file

@ -206,7 +206,7 @@ func StripClaimScriptPrefix(script []byte) []byte {
return script[cs.Size:] return script[cs.Size:]
} }
const illegalChars = "=&#:*$%?/;\\\b\n\t\r\x00" const illegalChars = "=&#:$%?/;\\\b\n\t\r\x00"
func AllClaimsAreSane(script []byte, enforceSoftFork bool) error { func AllClaimsAreSane(script []byte, enforceSoftFork bool) error {
cs, err := ExtractClaimScript(script) cs, err := ExtractClaimScript(script)