WIP: blockchain.transaction.yyy JSON RPC implementations (#78)

* Partial blockchain.transaction.yyy RPC implementations.

* Register RPC service object.

* Move session manager start/stop to a better place.

* Attempt to fill in the details of transaction.get_batch,
including merkle path.

* Correct interpretation of DBStateValue Genesis hash.

* Convert Args.Port to int and validate. Run UDP ping server on JSONRPCPort too.

* Add BlockHeader to HeightHash notification.

* Limit session-based JSON RPC service to IPv4.
Client not ready for IPv6.

* Adapt to new HeightHash struct.

* Fine tune JSON RPC handlers and types to match lbry-sdk expectations.
Implement UnmarshalJSON()/MarshalJSON() for several types.

* Add more special handling of DBStateValue.Genesis hash.

* Set IncludeStop=false generally to avoid returning extra rows.
Other misc fixes.
This commit is contained in:
Jonathan Moody 2022-12-06 16:14:28 -05:00 committed by GitHub
parent e070e8a51e
commit 317cdf7129
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 682 additions and 165 deletions

View file

@ -441,8 +441,8 @@ func (db *ReadOnlyDBColumnFamily) selectFrom(prefix []byte, startKey, stopKey pr
// Prefix and handle // Prefix and handle
options := NewIterateOptions().WithDB(db).WithPrefix(prefix).WithCfHandle(handle) options := NewIterateOptions().WithDB(db).WithPrefix(prefix).WithCfHandle(handle)
// Start and stop bounds // Start and stop bounds
options = options.WithStart(startKey.PackKey()).WithStop(stopKey.PackKey()).WithIncludeStop(true) options = options.WithStart(startKey.PackKey()).WithStop(stopKey.PackKey()).WithIncludeStop(false)
// Don't include the key // Include the key and value
options = options.WithIncludeKey(true).WithIncludeValue(true) options = options.WithIncludeKey(true).WithIncludeValue(true)
return []*IterOptions{options}, nil return []*IterOptions{options}, nil
} }
@ -455,7 +455,7 @@ func iterate(db *grocksdb.DB, opts []*IterOptions) <-chan []*prefixes.PrefixRowK
for kv := range IterCF(db, o) { for kv := range IterCF(db, o) {
row := make([]*prefixes.PrefixRowKV, 0, 1) row := make([]*prefixes.PrefixRowKV, 0, 1)
row = append(row, kv) row = append(row, kv)
log.Debugf("iterate[%v][%v] %#v", i, j, kv) log.Debugf("iterate[%v][%v] %#v -> %#v", i, j, kv.Key, kv.Value)
out <- row out <- row
j++ j++
} }
@ -481,7 +481,7 @@ func innerJoin(db *grocksdb.DB, in <-chan []*prefixes.PrefixRowKV, selectFn func
row = append(row, kvs...) row = append(row, kvs...)
row = append(row, kv...) row = append(row, kv...)
for i, kv := range row { for i, kv := range row {
log.Debugf("row[%v] %#v", i, kv) log.Debugf("row[%v] %#v -> %#v", i, kv.Key, kv.Value)
} }
out <- row out <- row
} }
@ -579,6 +579,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string, grp
// db, handles, err := grocksdb.OpenDbColumnFamilies(opts, name, cfNames, cfOpts) // db, handles, err := grocksdb.OpenDbColumnFamilies(opts, name, cfNames, cfOpts)
if err != nil { if err != nil {
log.Errorf("open db as secondary failed: %v", err)
return nil, err return nil, err
} }
@ -685,7 +686,7 @@ func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
for { for {
// FIXME: Figure out best sleep interval // FIXME: Figure out best sleep interval
if time.Since(lastPrint) > time.Second { if time.Since(lastPrint) > time.Second {
log.Debug("DetectChanges:", db.LastState) log.Debugf("DetectChanges: %#v", db.LastState)
lastPrint = time.Now() lastPrint = time.Now()
} }
err := db.detectChanges(notifCh) err := db.detectChanges(notifCh)
@ -775,7 +776,12 @@ func (db *ReadOnlyDBColumnFamily) detectChanges(notifCh chan<- interface{}) erro
log.Info("error getting block hash: ", err) log.Info("error getting block hash: ", err)
return err return err
} }
notifCh <- &internal.HeightHash{Height: uint64(height), BlockHash: hash} header, err := db.GetHeader(height)
if err != nil {
log.Info("error getting block header: ", err)
return err
}
notifCh <- &internal.HeightHash{Height: uint64(height), BlockHash: hash, BlockHeader: header}
} }
//TODO: ClearCache //TODO: ClearCache
log.Warn("implement cache clearing") log.Warn("implement cache clearing")

View file

@ -3,16 +3,19 @@ package db
// db_get.go contains the basic access functions to the database. // db_get.go contains the basic access functions to the database.
import ( import (
"bytes"
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"log"
"math" "math"
"github.com/lbryio/herald.go/db/prefixes" "github.com/lbryio/herald.go/db/prefixes"
"github.com/lbryio/herald.go/db/stack" "github.com/lbryio/herald.go/db/stack"
"github.com/lbryio/lbcd/chaincfg/chainhash" "github.com/lbryio/lbcd/chaincfg/chainhash"
"github.com/lbryio/lbcd/wire"
"github.com/linxGnu/grocksdb" "github.com/linxGnu/grocksdb"
log "github.com/sirupsen/logrus"
) )
// GetExpirationHeight returns the expiration height for the given height. Uses // GetExpirationHeight returns the expiration height for the given height. Uses
@ -65,6 +68,31 @@ func (db *ReadOnlyDBColumnFamily) GetBlockHash(height uint32) ([]byte, error) {
return rawValue, nil return rawValue, nil
} }
func (db *ReadOnlyDBColumnFamily) GetBlockTXs(height uint32) ([]*chainhash.Hash, error) {
handle, err := db.EnsureHandle(prefixes.BlockTXs)
if err != nil {
return nil, err
}
key := prefixes.BlockTxsKey{
Prefix: []byte{prefixes.BlockTXs},
Height: height,
}
slice, err := db.DB.GetCF(db.Opts, handle, key.PackKey())
defer slice.Free()
if err != nil {
return nil, err
}
if slice.Size() == 0 {
return nil, nil
}
rawValue := make([]byte, len(slice.Data()))
copy(rawValue, slice.Data())
value := prefixes.BlockTxsValueUnpack(rawValue)
return value.TxHashes, nil
}
func (db *ReadOnlyDBColumnFamily) GetHeader(height uint32) ([]byte, error) { func (db *ReadOnlyDBColumnFamily) GetHeader(height uint32) ([]byte, error) {
handle, err := db.EnsureHandle(prefixes.Header) handle, err := db.EnsureHandle(prefixes.Header)
if err != nil { if err != nil {
@ -271,6 +299,7 @@ func (db *ReadOnlyDBColumnFamily) GetStatus(hashX []byte) ([]byte, error) {
// Lookup in HashXMempoolStatus first. // Lookup in HashXMempoolStatus first.
status, err := db.getMempoolStatus(hashX) status, err := db.getMempoolStatus(hashX)
if err == nil && status != nil { if err == nil && status != nil {
log.Debugf("(mempool) status(%#v) -> %#v", hashX, status)
return status, err return status, err
} }
@ -291,6 +320,7 @@ func (db *ReadOnlyDBColumnFamily) GetStatus(hashX []byte) ([]byte, error) {
copy(rawValue, slice.Data()) copy(rawValue, slice.Data())
value := prefixes.HashXStatusValue{} value := prefixes.HashXStatusValue{}
value.UnpackValue(rawValue) value.UnpackValue(rawValue)
log.Debugf("status(%#v) -> %#v", hashX, value.Status)
return value.Status, nil return value.Status, nil
} }
@ -299,6 +329,11 @@ func (db *ReadOnlyDBColumnFamily) GetStatus(hashX []byte) ([]byte, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(txs) == 0 {
return []byte{}, err
}
hash := sha256.New() hash := sha256.New()
for _, tx := range txs { for _, tx := range txs {
hash.Write([]byte(fmt.Sprintf("%s:%d:", tx.TxHash.String(), tx.Height))) hash.Write([]byte(fmt.Sprintf("%s:%d:", tx.TxHash.String(), tx.Height)))
@ -731,6 +766,70 @@ func (db *ReadOnlyDBColumnFamily) FsGetClaimByHash(claimHash []byte) (*ResolveRe
) )
} }
func (db *ReadOnlyDBColumnFamily) GetTx(txhash *chainhash.Hash) ([]byte, *wire.MsgTx, error) {
// Lookup in MempoolTx first.
raw, tx, err := db.getMempoolTx(txhash)
if err == nil && raw != nil && tx != nil {
return raw, tx, err
}
handle, err := db.EnsureHandle(prefixes.Tx)
if err != nil {
return nil, nil, err
}
key := prefixes.TxKey{Prefix: []byte{prefixes.Tx}, TxHash: txhash}
rawKey := key.PackKey()
slice, err := db.DB.GetCF(db.Opts, handle, rawKey)
defer slice.Free()
if err != nil {
return nil, nil, err
}
if slice.Size() == 0 {
return nil, nil, nil
}
rawValue := make([]byte, len(slice.Data()))
copy(rawValue, slice.Data())
value := prefixes.TxValue{}
value.UnpackValue(rawValue)
var msgTx wire.MsgTx
err = msgTx.Deserialize(bytes.NewReader(value.RawTx))
if err != nil {
return nil, nil, err
}
return value.RawTx, &msgTx, nil
}
func (db *ReadOnlyDBColumnFamily) getMempoolTx(txhash *chainhash.Hash) ([]byte, *wire.MsgTx, error) {
handle, err := db.EnsureHandle(prefixes.MempoolTx)
if err != nil {
return nil, nil, err
}
key := prefixes.MempoolTxKey{Prefix: []byte{prefixes.Tx}, TxHash: txhash}
rawKey := key.PackKey()
slice, err := db.DB.GetCF(db.Opts, handle, rawKey)
defer slice.Free()
if err != nil {
return nil, nil, err
}
if slice.Size() == 0 {
return nil, nil, nil
}
rawValue := make([]byte, len(slice.Data()))
copy(rawValue, slice.Data())
value := prefixes.MempoolTxValue{}
value.UnpackValue(rawValue)
var msgTx wire.MsgTx
err = msgTx.Deserialize(bytes.NewReader(value.RawTx))
if err != nil {
return nil, nil, err
}
return value.RawTx, &msgTx, nil
}
func (db *ReadOnlyDBColumnFamily) GetTxCount(height uint32) (*prefixes.TxCountValue, error) { func (db *ReadOnlyDBColumnFamily) GetTxCount(height uint32) (*prefixes.TxCountValue, error) {
handle, err := db.EnsureHandle(prefixes.TxCount) handle, err := db.EnsureHandle(prefixes.TxCount)
if err != nil { if err != nil {
@ -754,6 +853,123 @@ func (db *ReadOnlyDBColumnFamily) GetTxCount(height uint32) (*prefixes.TxCountVa
return value, nil return value, nil
} }
func (db *ReadOnlyDBColumnFamily) GetTxHeight(txhash *chainhash.Hash) (uint32, error) {
handle, err := db.EnsureHandle(prefixes.TxNum)
if err != nil {
return 0, err
}
key := prefixes.TxNumKey{Prefix: []byte{prefixes.TxNum}, TxHash: txhash}
rawKey := key.PackKey()
slice, err := db.DB.GetCF(db.Opts, handle, rawKey)
defer slice.Free()
if err != nil {
return 0, err
}
if slice.Size() == 0 {
return 0, nil
}
// No slice copy needed. Value will be abandoned.
value := prefixes.TxNumValueUnpack(slice.Data())
height := stack.BisectRight(db.TxCounts, []uint32{value.TxNum})[0]
return height, nil
}
type TxMerkle struct {
TxHash *chainhash.Hash
RawTx []byte
Height int
Pos uint32
Merkle []*chainhash.Hash
}
// merklePath selects specific transactions by position within blockTxs.
// The resulting merkle path (aka merkle branch, or merkle) is a list of TX hashes
// which are in sibling relationship with TX nodes on the path to the root.
func merklePath(pos uint32, blockTxs, partial []*chainhash.Hash) []*chainhash.Hash {
parent := func(p uint32) uint32 {
return p >> 1
}
sibling := func(p uint32) uint32 {
if p%2 == 0 {
return p + 1
} else {
return p - 1
}
}
p := parent(pos)
if p == 0 {
// No parent, path is complete.
return partial
}
// Add sibling to partial path and proceed to parent TX.
return merklePath(p, blockTxs, append(partial, blockTxs[sibling(pos)]))
}
func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxMerkle, error) {
selectedTxNum := make([]*IterOptions, 0, len(tx_hashes))
for _, txhash := range tx_hashes {
key := prefixes.TxNumKey{Prefix: []byte{prefixes.TxNum}, TxHash: &txhash}
log.Debugf("%v", key)
opt, err := db.selectFrom(key.Prefix, &key, &key)
if err != nil {
return nil, err
}
selectedTxNum = append(selectedTxNum, opt...)
}
selectTxByTxNum := func(in []*prefixes.PrefixRowKV) ([]*IterOptions, error) {
txNumKey := in[0].Key.(*prefixes.TxNumKey)
log.Debugf("%v", txNumKey.TxHash.String())
out := make([]*IterOptions, 0, 100)
startKey := &prefixes.TxKey{
Prefix: []byte{prefixes.Tx},
TxHash: txNumKey.TxHash,
}
endKey := &prefixes.TxKey{
Prefix: []byte{prefixes.Tx},
TxHash: txNumKey.TxHash,
}
selectedTx, err := db.selectFrom([]byte{prefixes.Tx}, startKey, endKey)
if err != nil {
return nil, err
}
out = append(out, selectedTx...)
return out, nil
}
blockTxsCache := make(map[uint32][]*chainhash.Hash)
results := make([]TxMerkle, 0, 500)
for kvs := range innerJoin(db.DB, iterate(db.DB, selectedTxNum), selectTxByTxNum) {
if err := checkForError(kvs); err != nil {
return results, err
}
txNumKey, txNumVal := kvs[0].Key.(*prefixes.TxNumKey), kvs[0].Value.(*prefixes.TxNumValue)
_, txVal := kvs[1].Key.(*prefixes.TxKey), kvs[1].Value.(*prefixes.TxValue)
txHeight := stack.BisectRight(db.TxCounts, []uint32{txNumVal.TxNum})[0]
txPos := txNumVal.TxNum - db.TxCounts.Get(txHeight-1)
// We need all the TX hashes in order to select out the relevant ones.
if _, ok := blockTxsCache[txHeight]; !ok {
txs, err := db.GetBlockTXs(txHeight)
if err != nil {
return results, err
}
blockTxsCache[txHeight] = txs
}
blockTxs, _ := blockTxsCache[txHeight]
results = append(results, TxMerkle{
TxHash: txNumKey.TxHash,
RawTx: txVal.RawTx,
Height: int(txHeight),
Pos: txPos,
Merkle: merklePath(txPos, blockTxs, []*chainhash.Hash{}),
})
}
return results, nil
}
func (db *ReadOnlyDBColumnFamily) GetDBState() (*prefixes.DBStateValue, error) { func (db *ReadOnlyDBColumnFamily) GetDBState() (*prefixes.DBStateValue, error) {
handle, err := db.EnsureHandle(prefixes.DBState) handle, err := db.EnsureHandle(prefixes.DBState)
if err != nil { if err != nil {

View file

@ -7,6 +7,7 @@ import (
"strings" "strings"
"github.com/go-restruct/restruct" "github.com/go-restruct/restruct"
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbcd/chaincfg/chainhash" "github.com/lbryio/lbcd/chaincfg/chainhash"
) )
@ -59,6 +60,34 @@ func (kv *BlockTxsValue) Unpack(buf []byte, order binary.ByteOrder) ([]byte, err
return buf[offset:], nil return buf[offset:], nil
} }
// Struct BigEndianChainHash is a chainhash.Hash stored in external
// byte-order (opposite of other 32 byte chainhash.Hash values). In order
// to reuse chainhash.Hash we need to correct the byte-order.
// Currently this type is used for field Genesis of DBStateValue.
func (kv *BigEndianChainHash) SizeOf() int {
return chainhash.HashSize
}
func (kv *BigEndianChainHash) Pack(buf []byte, order binary.ByteOrder) ([]byte, error) {
offset := 0
hash := kv.CloneBytes()
// HACK: Instances of chainhash.Hash use the internal byte-order.
// Python scribe writes bytes of genesis hash in external byte-order.
internal.ReverseBytesInPlace(hash)
offset += copy(buf[offset:chainhash.HashSize], hash[:])
return buf[offset:], nil
}
func (kv *BigEndianChainHash) Unpack(buf []byte, order binary.ByteOrder) ([]byte, error) {
offset := 0
offset += copy(kv.Hash[:], buf[offset:32])
// HACK: Instances of chainhash.Hash use the internal byte-order.
// Python scribe writes bytes of genesis hash in external byte-order.
internal.ReverseBytesInPlace(kv.Hash[:])
return buf[offset:], nil
}
func genericNew(prefix []byte, key bool) (interface{}, error) { func genericNew(prefix []byte, key bool) (interface{}, error) {
t, ok := prefixRegistry[prefix[0]] t, ok := prefixRegistry[prefix[0]]
if !ok { if !ok {

View file

@ -182,12 +182,25 @@ func NewLengthEncodedPartialClaimId(s string) LengthEncodedPartialClaimId {
} }
} }
type BigEndianChainHash struct {
chainhash.Hash
}
func NewBigEndianChainHash(hash *chainhash.Hash) BigEndianChainHash {
if hash != nil {
return BigEndianChainHash{
*hash,
}
}
return BigEndianChainHash{}
}
type DBStateKey struct { type DBStateKey struct {
Prefix []byte `struct:"[1]byte" json:"prefix"` Prefix []byte `struct:"[1]byte" json:"prefix"`
} }
type DBStateValue struct { type DBStateValue struct {
Genesis *chainhash.Hash Genesis BigEndianChainHash
Height uint32 Height uint32
TxCount uint32 TxCount uint32
Tip *chainhash.Hash Tip *chainhash.Hash
@ -203,7 +216,7 @@ type DBStateValue struct {
func NewDBStateValue() *DBStateValue { func NewDBStateValue() *DBStateValue {
return &DBStateValue{ return &DBStateValue{
Genesis: new(chainhash.Hash), Genesis: NewBigEndianChainHash(nil),
Height: 0, Height: 0,
TxCount: 0, TxCount: 0,
Tip: new(chainhash.Hash), Tip: new(chainhash.Hash),
@ -237,7 +250,11 @@ func (v *DBStateValue) PackValue() []byte {
// b'>32sLL32sLLBBlllL' // b'>32sLL32sLLBBlllL'
n := 32 + 4 + 4 + 32 + 4 + 4 + 1 + 1 + 4 + 4 + 4 + 4 n := 32 + 4 + 4 + 32 + 4 + 4 + 1 + 1 + 4 + 4 + 4 + 4
value := make([]byte, n) value := make([]byte, n)
copy(value, v.Genesis[:32]) genesis := v.Genesis.CloneBytes()
// HACK: Instances of chainhash.Hash use the internal byte-order.
// Python scribe writes bytes of genesis hash in external byte-order.
internal.ReverseBytesInPlace(genesis)
copy(value, genesis[:32])
binary.BigEndian.PutUint32(value[32:], v.Height) binary.BigEndian.PutUint32(value[32:], v.Height)
binary.BigEndian.PutUint32(value[32+4:], v.TxCount) binary.BigEndian.PutUint32(value[32+4:], v.TxCount)
copy(value[32+4+4:], v.Tip[:32]) copy(value[32+4+4:], v.Tip[:32])
@ -282,8 +299,11 @@ func DBStateKeyUnpack(key []byte) *DBStateKey {
func DBStateValueUnpack(value []byte) *DBStateValue { func DBStateValueUnpack(value []byte) *DBStateValue {
genesis := (*chainhash.Hash)(value[:32]) genesis := (*chainhash.Hash)(value[:32])
tip := (*chainhash.Hash)(value[32+4+4 : 32+4+4+32]) tip := (*chainhash.Hash)(value[32+4+4 : 32+4+4+32])
// HACK: Python scribe writes bytes of genesis hash in external byte-order.
// Instances of chainhash.Hash should use the internal byte-order.
internal.ReverseBytesInPlace(genesis[:])
x := &DBStateValue{ x := &DBStateValue{
Genesis: genesis, Genesis: NewBigEndianChainHash(genesis),
Height: binary.BigEndian.Uint32(value[32:]), Height: binary.BigEndian.Uint32(value[32:]),
TxCount: binary.BigEndian.Uint32(value[32+4:]), TxCount: binary.BigEndian.Uint32(value[32+4:]),
Tip: tip, Tip: tip,
@ -708,7 +728,7 @@ type BlockTxsKey struct {
} }
type BlockTxsValue struct { type BlockTxsValue struct {
TxHashes []*chainhash.Hash `struct-while:"!_eof" json:"tx_hashes"` TxHashes []*chainhash.Hash `struct:"*[32]byte" struct-while:"!_eof" json:"tx_hashes"`
} }
func (k *BlockTxsKey) NewBlockTxsKey(height uint32) *BlockTxsKey { func (k *BlockTxsKey) NewBlockTxsKey(height uint32) *BlockTxsKey {
@ -1050,84 +1070,6 @@ func TxNumValueUnpack(value []byte) *TxNumValue {
} }
} }
type TxKey struct {
Prefix []byte `struct:"[1]byte" json:"prefix"`
TxHash *chainhash.Hash `struct:"*[32]byte" json:"tx_hash"`
}
type TxValue struct {
RawTx []byte `struct-while:"!_eof" json:"raw_tx"`
}
func (k *TxKey) PackKey() []byte {
prefixLen := 1
// b'>L'
n := prefixLen + 32
key := make([]byte, n)
copy(key, k.Prefix)
copy(key[prefixLen:], k.TxHash[:32])
return key
}
func (v *TxValue) PackValue() []byte {
value := make([]byte, len(v.RawTx))
copy(value, v.RawTx)
return value
}
func (kv *TxKey) NumFields() int {
return 1
}
func (k *TxKey) PartialPack(fields int) []byte {
// Limit fields between 0 and number of fields, we always at least need
// the prefix, and we never need to iterate past the number of fields.
if fields > 1 {
fields = 1
}
if fields < 0 {
fields = 0
}
prefixLen := 1
var n = prefixLen
for i := 0; i <= fields; i++ {
switch i {
case 1:
n += 32
}
}
key := make([]byte, n)
for i := 0; i <= fields; i++ {
switch i {
case 0:
copy(key, k.Prefix)
case 1:
copy(key[prefixLen:], k.TxHash[:32])
}
}
return key
}
func TxKeyUnpack(key []byte) *TxKey {
prefixLen := 1
return &TxKey{
Prefix: key[:prefixLen],
TxHash: (*chainhash.Hash)(key[prefixLen : prefixLen+32]),
}
}
func TxValueUnpack(value []byte) *TxValue {
return &TxValue{
RawTx: value,
}
}
type BlockHeaderKey struct { type BlockHeaderKey struct {
Prefix []byte `struct:"[1]byte" json:"prefix"` Prefix []byte `struct:"[1]byte" json:"prefix"`
Height uint32 `json:"height"` Height uint32 `json:"height"`
@ -3351,9 +3293,12 @@ func (kv *TrendingNotificationValue) UnpackValue(buf []byte) {
offset += 8 offset += 8
} }
type TxKey = MempoolTxKey
type TxValue = MempoolTxValue
type MempoolTxKey struct { type MempoolTxKey struct {
Prefix []byte `struct:"[1]byte" json:"prefix"` Prefix []byte `struct:"[1]byte" json:"prefix"`
TxHash []byte `struct:"[32]byte" json:"tx_hash"` TxHash *chainhash.Hash `struct:"*[32]byte" json:"tx_hash"`
} }
type MempoolTxValue struct { type MempoolTxValue struct {
@ -3386,7 +3331,7 @@ func (kv *MempoolTxKey) UnpackKey(buf []byte) {
offset := 0 offset := 0
kv.Prefix = buf[offset : offset+1] kv.Prefix = buf[offset : offset+1]
offset += 1 offset += 1
kv.TxHash = buf[offset : offset+32] kv.TxHash = (*chainhash.Hash)(buf[offset : offset+32])
offset += 32 offset += 32
} }
@ -3925,12 +3870,6 @@ var prefixRegistry = map[byte]prefixMeta{
newValue: func() interface{} { newValue: func() interface{} {
return &TxValue{} return &TxValue{}
}, },
newKeyUnpack: func(buf []byte) interface{} {
return TxKeyUnpack(buf)
},
newValueUnpack: func(buf []byte) interface{} {
return TxValueUnpack(buf)
},
}, },
BlockHash: { BlockHash: {
newKey: func() interface{} { newKey: func() interface{} {

View file

@ -6,4 +6,5 @@ package internal
type HeightHash struct { type HeightHash struct {
Height uint64 Height uint64
BlockHash []byte BlockHash []byte
BlockHeader []byte
} }

View file

@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"strconv"
"time" "time"
_ "net/http/pprof" _ "net/http/pprof"
@ -63,7 +64,7 @@ func main() {
return return
} }
conn, err := grpc.Dial("localhost:"+args.Port, conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithBlock(), grpc.WithBlock(),
) )

View file

@ -22,7 +22,7 @@ const (
type Args struct { type Args struct {
CmdType int CmdType int
Host string Host string
Port string Port int
DBPath string DBPath string
Chain *string Chain *string
EsHost string EsHost string
@ -67,7 +67,7 @@ type Args struct {
const ( const (
DefaultHost = "0.0.0.0" DefaultHost = "0.0.0.0"
DefaultPort = "50051" DefaultPort = 50051
DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME
DefaultEsHost = "http://localhost" DefaultEsHost = "http://localhost"
DefaultEsIndex = "claims" DefaultEsIndex = "claims"
@ -214,7 +214,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
// main server config arguments // main server config arguments
host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost}) host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost})
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort}) port := parser.Int("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Validate: validatePort, Default: DefaultPort})
dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath}) dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath})
chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"}, chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"},
&argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name}) &argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name})

View file

@ -7,6 +7,7 @@ import (
"math" "math"
"net" "net"
"os" "os"
"strconv"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
@ -86,7 +87,7 @@ func (s *Server) getAndSetExternalIp(ip, port string) error {
// storing them as known peers. Returns a map of peerKey -> object // storing them as known peers. Returns a map of peerKey -> object
func (s *Server) loadPeers() error { func (s *Server) loadPeers() error {
peerFile := s.Args.PeerFile peerFile := s.Args.PeerFile
port := s.Args.Port port := strconv.Itoa(s.Args.Port)
// First we make sure our server has come up, so we can answer back to peers. // First we make sure our server has come up, so we can answer back to peers.
var failures = 0 var failures = 0
@ -181,12 +182,12 @@ func (s *Server) subscribeToPeer(peer *Peer) error {
msg := &pb.ServerMessage{ msg := &pb.ServerMessage{
Address: s.ExternalIP.String(), Address: s.ExternalIP.String(),
Port: s.Args.Port, Port: strconv.Itoa(s.Args.Port),
} }
c := pb.NewHubClient(conn) c := pb.NewHubClient(conn)
log.Printf("%s:%s subscribing to %+v\n", s.ExternalIP, s.Args.Port, peer) log.Printf("%s:%d subscribing to %+v\n", s.ExternalIP, s.Args.Port, peer)
_, err = c.PeerSubscribe(ctx, msg) _, err = c.PeerSubscribe(ctx, msg)
if err != nil { if err != nil {
return err return err
@ -219,12 +220,12 @@ func (s *Server) helloPeer(peer *Peer) (*pb.HelloMessage, error) {
c := pb.NewHubClient(conn) c := pb.NewHubClient(conn)
msg := &pb.HelloMessage{ msg := &pb.HelloMessage{
Port: s.Args.Port, Port: strconv.Itoa(s.Args.Port),
Host: s.ExternalIP.String(), Host: s.ExternalIP.String(),
Servers: []*pb.ServerMessage{}, Servers: []*pb.ServerMessage{},
} }
log.Printf("%s:%s saying hello to %+v\n", s.ExternalIP, s.Args.Port, peer) log.Printf("%s:%d saying hello to %+v\n", s.ExternalIP, s.Args.Port, peer)
res, err := c.Hello(ctx, msg) res, err := c.Hello(ctx, msg)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
@ -345,15 +346,15 @@ func (s *Server) addPeer(newPeer *Peer, ping bool, subscribe bool) error {
} }
} }
if s.Args.Port == newPeer.Port && if strconv.Itoa(s.Args.Port) == newPeer.Port &&
(localHosts[newPeer.Address] || newPeer.Address == s.ExternalIP.String()) { (localHosts[newPeer.Address] || newPeer.Address == s.ExternalIP.String()) {
log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port) log.Printf("%s:%d addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port)
return nil return nil
} }
k := peerKey(newPeer) k := peerKey(newPeer)
log.Printf("%s:%s adding peer %+v\n", s.ExternalIP, s.Args.Port, newPeer) log.Printf("%s:%d adding peer %+v\n", s.ExternalIP, s.Args.Port, newPeer)
if oldServer, loaded := s.PeerServersLoadOrStore(newPeer); !loaded { if oldServer, loaded := s.PeerServersLoadOrStore(newPeer); !loaded {
if ping { if ping {
_, err := s.helloPeer(newPeer) _, err := s.helloPeer(newPeer)
@ -415,7 +416,7 @@ func (s *Server) makeHelloMessage() *pb.HelloMessage {
s.PeerServersMut.RUnlock() s.PeerServersMut.RUnlock()
return &pb.HelloMessage{ return &pb.HelloMessage{
Port: s.Args.Port, Port: strconv.Itoa(s.Args.Port),
Host: s.ExternalIP.String(), Host: s.ExternalIP.String(),
Servers: servers, Servers: servers,
} }

View file

@ -7,6 +7,7 @@ import (
"log" "log"
"net" "net"
"os" "os"
"strconv"
"strings" "strings"
"testing" "testing"
@ -167,7 +168,7 @@ func TestAddPeerEndpoint(t *testing.T) {
ctx := stop.NewDebug() ctx := stop.NewDebug()
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs()
args2.Port = "50052" args2.Port = 50052
tests := []struct { tests := []struct {
name string name string
@ -198,7 +199,7 @@ func TestAddPeerEndpoint(t *testing.T) {
go hubServer.Run() go hubServer.Run()
go hubServer2.Run() go hubServer2.Run()
//go hubServer.Run() //go hubServer.Run()
conn, err := grpc.Dial("localhost:"+args.Port, conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithBlock(), grpc.WithBlock(),
) )
@ -240,8 +241,8 @@ func TestAddPeerEndpoint2(t *testing.T) {
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs()
args3 := server.MakeDefaultTestArgs() args3 := server.MakeDefaultTestArgs()
args2.Port = "50052" args2.Port = 50052
args3.Port = "50053" args3.Port = 50053
tests := []struct { tests := []struct {
name string name string
@ -266,7 +267,7 @@ func TestAddPeerEndpoint2(t *testing.T) {
go hubServer.Run() go hubServer.Run()
go hubServer2.Run() go hubServer2.Run()
go hubServer3.Run() go hubServer3.Run()
conn, err := grpc.Dial("localhost:"+args.Port, conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithBlock(), grpc.WithBlock(),
) )
@ -322,8 +323,8 @@ func TestAddPeerEndpoint3(t *testing.T) {
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
args2 := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs()
args3 := server.MakeDefaultTestArgs() args3 := server.MakeDefaultTestArgs()
args2.Port = "50052" args2.Port = 50052
args3.Port = "50053" args3.Port = 50053
tests := []struct { tests := []struct {
name string name string
@ -348,7 +349,7 @@ func TestAddPeerEndpoint3(t *testing.T) {
go hubServer.Run() go hubServer.Run()
go hubServer2.Run() go hubServer2.Run()
go hubServer3.Run() go hubServer3.Run()
conn, err := grpc.Dial("localhost:"+args.Port, conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port),
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithBlock(), grpc.WithBlock(),
) )
@ -412,7 +413,7 @@ func TestUDPServer(t *testing.T) {
args := server.MakeDefaultTestArgs() args := server.MakeDefaultTestArgs()
args.DisableStartUDP = false args.DisableStartUDP = false
args2 := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs()
args2.Port = "50052" args2.Port = 50052
args2.DisableStartUDP = false args2.DisableStartUDP = false
tests := []struct { tests := []struct {
@ -449,12 +450,12 @@ func TestUDPServer(t *testing.T) {
got1 := hubServer.ExternalIP.String() got1 := hubServer.ExternalIP.String()
if got1 != tt.want { if got1 != tt.want {
t.Errorf("hubServer.ExternalIP = %s, want %s\n", got1, tt.want) t.Errorf("hubServer.ExternalIP = %s, want %s\n", got1, tt.want)
t.Errorf("hubServer.Args.Port = %s\n", hubServer.Args.Port) t.Errorf("hubServer.Args.Port = %d\n", hubServer.Args.Port)
} }
got2 := hubServer2.ExternalIP.String() got2 := hubServer2.ExternalIP.String()
if got2 != tt.want { if got2 != tt.want {
t.Errorf("hubServer2.ExternalIP = %s, want %s\n", got2, tt.want) t.Errorf("hubServer2.ExternalIP = %s, want %s\n", got2, tt.want)
t.Errorf("hubServer2.Args.Port = %s\n", hubServer2.Args.Port) t.Errorf("hubServer2.Args.Port = %d\n", hubServer2.Args.Port)
} }
}) })
} }

View file

@ -7,6 +7,7 @@ import (
"encoding/base64" "encoding/base64"
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -55,6 +56,14 @@ type BlockchainScripthashService struct {
session *session session *session
} }
// BlockchainTransactionService methods handle "blockchain.transaction.*" RPCs
type BlockchainTransactionService struct {
DB *db.ReadOnlyDBColumnFamily
Chain *chaincfg.Params
// needed for broadcast TX
sessionMgr *sessionManager
}
const CHUNK_SIZE = 96 const CHUNK_SIZE = 96
const MAX_CHUNK_SIZE = 40960 const MAX_CHUNK_SIZE = 40960
const HEADER_SIZE = wire.MaxBlockHeaderPayload const HEADER_SIZE = wire.MaxBlockHeaderPayload
@ -162,9 +171,42 @@ type BlockHeadersReq struct {
B64 bool `json:"b64"` B64 bool `json:"b64"`
} }
func (req *BlockHeadersReq) UnmarshalJSON(b []byte) error {
var params [4]interface{}
err := json.Unmarshal(b, &params)
if err != nil {
return err
}
switch params[0].(type) {
case float64:
req.StartHeight = uint32(params[0].(float64))
default:
return fmt.Errorf("expected numeric argument #0 (start_height)")
}
switch params[1].(type) {
case float64:
req.Count = uint32(params[1].(float64))
default:
return fmt.Errorf("expected numeric argument #1 (count)")
}
switch params[2].(type) {
case float64:
req.CpHeight = uint32(params[2].(float64))
default:
return fmt.Errorf("expected numeric argument #2 (cp_height)")
}
switch params[3].(type) {
case bool:
req.B64 = params[3].(bool)
default:
return fmt.Errorf("expected boolean argument #3 (b64)")
}
return nil
}
type BlockHeadersResp struct { type BlockHeadersResp struct {
Base64 string `json:"base64,omitempty"` Base64 string `json:"base64,omitempty"`
Hex string `json:"hex,omitempty"` Hex string `json:"hex"`
Count uint32 `json:"count"` Count uint32 `json:"count"`
Max uint32 `json:"max"` Max uint32 `json:"max"`
Branch string `json:"branch,omitempty"` Branch string `json:"branch,omitempty"`
@ -209,6 +251,21 @@ type HeadersSubscribeReq struct {
Raw bool `json:"raw"` Raw bool `json:"raw"`
} }
func (req *HeadersSubscribeReq) UnmarshalJSON(b []byte) error {
var params [1]interface{}
err := json.Unmarshal(b, &params)
if err != nil {
return err
}
switch params[0].(type) {
case bool:
req.Raw = params[0].(bool)
default:
return fmt.Errorf("expected bool argument #0 (raw)")
}
return nil
}
type HeadersSubscribeResp struct { type HeadersSubscribeResp struct {
BlockHeaderElectrum BlockHeaderElectrum
} }
@ -336,6 +393,23 @@ func (s *BlockchainScripthashService) Get_balance(req *scripthashGetBalanceReq,
type AddressGetHistoryReq struct { type AddressGetHistoryReq struct {
Address string `json:"address"` Address string `json:"address"`
} }
func (req *AddressGetHistoryReq) UnmarshalJSON(b []byte) error {
var params [1]interface{}
json.Unmarshal(b, &params)
err := json.Unmarshal(b, &params)
if err != nil {
return err
}
switch params[0].(type) {
case string:
req.Address = params[0].(string)
default:
return fmt.Errorf("expected string argument #0 (address)")
}
return nil
}
type TxInfo struct { type TxInfo struct {
TxHash string `json:"tx_hash"` TxHash string `json:"tx_hash"`
Height uint32 `json:"height"` Height uint32 `json:"height"`
@ -344,10 +418,7 @@ type TxInfoFee struct {
TxInfo TxInfo
Fee uint64 `json:"fee"` Fee uint64 `json:"fee"`
} }
type AddressGetHistoryResp struct { type AddressGetHistoryResp []TxInfoFee
Confirmed []TxInfo `json:"confirmed"`
Unconfirmed []TxInfoFee `json:"unconfirmed"`
}
// 'blockchain.address.get_history' // 'blockchain.address.get_history'
func (s *BlockchainAddressService) Get_history(req *AddressGetHistoryReq, resp **AddressGetHistoryResp) error { func (s *BlockchainAddressService) Get_history(req *AddressGetHistoryReq, resp **AddressGetHistoryResp) error {
@ -375,11 +446,18 @@ func (s *BlockchainAddressService) Get_history(req *AddressGetHistoryReq, resp *
Height: tx.Height, Height: tx.Height,
}) })
} }
result := &AddressGetHistoryResp{ unconfirmed := []TxInfoFee{} // TODO
Confirmed: confirmed, result := make(AddressGetHistoryResp, len(confirmed)+len(unconfirmed))
Unconfirmed: []TxInfoFee{}, // TODO i := 0
for _, tx := range confirmed {
result[i].TxInfo = tx
i += 1
} }
*resp = result for _, tx := range unconfirmed {
result[i] = tx
i += 1
}
*resp = &result
return err return err
} }
@ -626,3 +704,221 @@ func (s *BlockchainScripthashService) Unsubscribe(req *ScripthashSubscribeReq, r
*resp = (*ScripthashSubscribeResp)(nil) *resp = (*ScripthashSubscribeResp)(nil)
return nil return nil
} }
type TransactionBroadcastReq string
type TransactionBroadcastResp string
// 'blockchain.transaction.broadcast'
func (s *BlockchainTransactionService) Broadcast(req *TransactionBroadcastReq, resp **TransactionBroadcastResp) error {
if s.sessionMgr == nil {
return errors.New("no session manager, rpc not supported")
}
strTx := string(*req)
rawTx, err := hex.DecodeString(strTx)
if err != nil {
return err
}
txhash, err := s.sessionMgr.broadcastTx(rawTx)
if err != nil {
return err
}
result := txhash.String()
*resp = (*TransactionBroadcastResp)(&result)
return nil
}
type TransactionGetReq string
type TXFullDetail struct {
Height int `json:"block_height"`
Pos uint32 `json:"pos"`
Merkle []string `json:"merkle"`
}
type TXDetail struct {
Height int `json:"block_height"`
}
type TXGetItem struct {
TxHash string
TxRaw string
Detail interface{} // TXFullDetail or TXDetail struct
}
type TransactionGetResp TXGetItem
// 'blockchain.transaction.get'
func (s *BlockchainTransactionService) Get(req *TransactionGetReq, resp **TransactionGetResp) error {
txids := [1]string{string(*req)}
request := TransactionGetBatchReq(txids[:])
var response *TransactionGetBatchResp
err := s.Get_batch(&request, &response)
if err != nil {
return err
}
if len(*response) < 1 {
return errors.New("tx not found")
}
switch (*response)[0].Detail.(type) {
case TXFullDetail:
break
case TXDetail:
default:
return errors.New("tx not confirmed")
}
*resp = (*TransactionGetResp)(&(*response)[0])
return err
}
type TransactionGetBatchReq []string
func (req *TransactionGetBatchReq) UnmarshalJSON(b []byte) error {
var params []interface{}
json.Unmarshal(b, &params)
if len(params) > 100 {
return fmt.Errorf("too many tx hashes in request: %v", len(params))
}
for i, txhash := range params {
switch params[0].(type) {
case string:
*req = append(*req, txhash.(string))
default:
return fmt.Errorf("expected string argument #%d (tx_hash)", i)
}
}
return nil
}
type TransactionGetBatchResp []TXGetItem
func (resp *TransactionGetBatchResp) MarshalJSON() ([]byte, error) {
// encode key/value pairs as variable length JSON object
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
buf.WriteString("{")
for i, r := range *resp {
if i > 0 {
buf.WriteString(",")
}
txhash, raw, detail := r.TxHash, r.TxRaw, r.Detail
err := enc.Encode(txhash)
if err != nil {
return nil, err
}
buf.WriteString(":[")
err = enc.Encode(raw)
if err != nil {
return nil, err
}
buf.WriteString(",")
err = enc.Encode(detail)
if err != nil {
return nil, err
}
buf.WriteString("]")
}
buf.WriteString("}")
return buf.Bytes(), nil
}
// 'blockchain.transaction.get_batch'
func (s *BlockchainTransactionService) Get_batch(req *TransactionGetBatchReq, resp **TransactionGetBatchResp) error {
if len(*req) > 100 {
return fmt.Errorf("too many tx hashes in request: %v", len(*req))
}
tx_hashes := make([]chainhash.Hash, 0, len(*req))
for i, txid := range *req {
tx_hashes = append(tx_hashes, chainhash.Hash{})
if err := chainhash.Decode(&tx_hashes[i], txid); err != nil {
return err
}
}
dbResult, err := s.DB.GetTxMerkle(tx_hashes)
if err != nil {
return err
}
result := make([]TXGetItem, 0, len(dbResult))
for _, r := range dbResult {
merkles := make([]string, len(r.Merkle))
for i, m := range r.Merkle {
merkles[i] = m.String()
}
detail := TXFullDetail{
Height: r.Height,
Pos: r.Pos,
Merkle: merkles,
}
result = append(result, TXGetItem{r.TxHash.String(), hex.EncodeToString(r.RawTx), &detail})
}
*resp = (*TransactionGetBatchResp)(&result)
return err
}
type TransactionGetMerkleReq struct {
TxHash string `json:"tx_hash"`
Height uint32 `json:"height"`
}
type TransactionGetMerkleResp TXGetItem
// 'blockchain.transaction.get_merkle'
func (s *BlockchainTransactionService) Get_merkle(req *TransactionGetMerkleReq, resp **TransactionGetMerkleResp) error {
txids := [1]string{string(req.TxHash)}
request := TransactionGetBatchReq(txids[:])
var response *TransactionGetBatchResp
err := s.Get_batch(&request, &response)
if err != nil {
return err
}
if len(*response) < 1 {
return errors.New("tx not found")
}
switch (*response)[0].Detail.(type) {
case TXFullDetail:
break
case TXDetail:
default:
return errors.New("tx not confirmed")
}
*resp = (*TransactionGetMerkleResp)(&(*response)[0])
return err
}
type TransactionGetHeightReq string
type TransactionGetHeightResp uint32
// 'blockchain.transaction.get_height'
func (s *BlockchainTransactionService) Get_height(req *TransactionGetHeightReq, resp **TransactionGetHeightResp) error {
txid := string(*(req))
txhash, err := chainhash.NewHashFromStr(txid)
if err != nil {
return err
}
height, err := s.DB.GetTxHeight(txhash)
*resp = (*TransactionGetHeightResp)(&height)
return err
}
type TransactionInfoReq string
type TransactionInfoResp TXGetItem
// 'blockchain.transaction.info'
func (s *BlockchainTransactionService) Info(req *TransactionInfoReq, resp **TransactionInfoResp) error {
txids := [1]string{string(*req)}
request := TransactionGetBatchReq(txids[:])
var response *TransactionGetBatchResp
err := s.Get_batch(&request, &response)
if err != nil {
return err
}
if len(*response) < 1 {
return errors.New("tx not found")
}
switch (*response)[0].Detail.(type) {
case TXFullDetail:
break
case TXDetail:
default:
if (*response)[0].TxHash == "" {
return errors.New("no such mempool or blockchain transaction")
}
}
*resp = (*TransactionInfoResp)(&(*response)[0])
return err
}

View file

@ -273,12 +273,10 @@ func TestHeadersSubscribe(t *testing.T) {
t.Errorf("decode err: %v", err) t.Errorf("decode err: %v", err)
} }
note1 := headerNotification{ note1 := headerNotification{
HeightHash: internal.HeightHash{Height: 500}, HeightHash: internal.HeightHash{Height: 500, BlockHeader: header500},
blockHeader: [112]byte{},
blockHeaderElectrum: nil, blockHeaderElectrum: nil,
blockHeaderStr: "", blockHeaderStr: "",
} }
copy(note1.blockHeader[:], header500)
t.Logf("sending notification") t.Logf("sending notification")
sm.doNotify(note1) sm.doNotify(note1)

View file

@ -52,24 +52,23 @@ func (cr *gorillaRpcCodecRequest) Method() (string, error) {
// StartJsonRPC starts the json rpc server and registers the endpoints. // StartJsonRPC starts the json rpc server and registers the endpoints.
func (s *Server) StartJsonRPC() error { func (s *Server) StartJsonRPC() error {
s.sessionManager.start()
defer s.sessionManager.stop()
// Set up the pure JSONRPC server with persistent connections/sessions. // Set up the pure JSONRPC server with persistent connections/sessions.
if s.Args.JSONRPCPort != 0 { if s.Args.JSONRPCPort != 0 {
port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCPort), 10) port := ":" + strconv.Itoa(s.Args.JSONRPCPort)
laddr, err := net.ResolveTCPAddr("tcp", port) laddr, err := net.ResolveTCPAddr("tcp4", port)
if err != nil { if err != nil {
log.Errorf("ResoveIPAddr: %v\n", err) log.Errorf("ResoveIPAddr: %v\n", err)
goto fail1 goto fail1
} }
listener, err := net.ListenTCP("tcp", laddr) listener, err := net.ListenTCP("tcp4", laddr)
if err != nil { if err != nil {
log.Errorf("ListenTCP: %v\n", err) log.Errorf("ListenTCP: %v\n", err)
goto fail1 goto fail1
} }
log.Infof("JSONRPC server listening on %s", listener.Addr().String()) log.Infof("JSONRPC server listening on %s", listener.Addr().String())
s.sessionManager.start()
acceptConnections := func(listener net.Listener) { acceptConnections := func(listener net.Listener) {
defer s.sessionManager.stop()
for { for {
conn, err := listener.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
@ -78,6 +77,7 @@ func (s *Server) StartJsonRPC() error {
} }
log.Infof("Accepted: %v", conn.RemoteAddr()) log.Infof("Accepted: %v", conn.RemoteAddr())
s.sessionManager.addSession(conn) s.sessionManager.addSession(conn)
} }
} }
go acceptConnections(netutil.LimitListener(listener, s.sessionManager.sessionsMax)) go acceptConnections(netutil.LimitListener(listener, s.sessionManager.sessionsMax))
@ -98,24 +98,29 @@ fail1:
goto fail2 goto fail2
} }
// Register other "blockchain.{block,address,scripthash}.*" handlers. // Register "blockchain.{block,address,scripthash,transaction}.*" handlers.
blockchainSvc := &BlockchainBlockService{s.DB, s.Chain} blockchainSvc := &BlockchainBlockService{s.DB, s.Chain}
err = s1.RegisterTCPService(blockchainSvc, "blockchain_block") err = s1.RegisterTCPService(blockchainSvc, "blockchain_block")
if err != nil { if err != nil {
log.Errorf("RegisterTCPService: %v\n", err) log.Errorf("RegisterTCPService: %v\n", err)
goto fail2 goto fail2
} }
err = s1.RegisterTCPService(&BlockchainHeadersService{s.DB, s.Chain, nil, nil}, "blockchain_headers") err = s1.RegisterTCPService(&BlockchainHeadersService{s.DB, s.Chain, s.sessionManager, nil}, "blockchain_headers")
if err != nil { if err != nil {
log.Errorf("RegisterTCPService: %v\n", err) log.Errorf("RegisterTCPService: %v\n", err)
goto fail2 goto fail2
} }
err = s1.RegisterTCPService(&BlockchainAddressService{s.DB, s.Chain, nil, nil}, "blockchain_address") err = s1.RegisterTCPService(&BlockchainAddressService{s.DB, s.Chain, s.sessionManager, nil}, "blockchain_address")
if err != nil { if err != nil {
log.Errorf("RegisterTCPService: %v\n", err) log.Errorf("RegisterTCPService: %v\n", err)
goto fail2 goto fail2
} }
err = s1.RegisterTCPService(&BlockchainScripthashService{s.DB, s.Chain, nil, nil}, "blockchain_scripthash") err = s1.RegisterTCPService(&BlockchainScripthashService{s.DB, s.Chain, s.sessionManager, nil}, "blockchain_scripthash")
if err != nil {
log.Errorf("RegisterTCPService: %v\n", err)
goto fail2
}
err = s1.RegisterTCPService(&BlockchainTransactionService{s.DB, s.Chain, s.sessionManager}, "blockchain_transaction")
if err != nil { if err != nil {
log.Errorf("RegisterTCPService: %v\n", err) log.Errorf("RegisterTCPService: %v\n", err)
goto fail2 goto fail2

View file

@ -13,6 +13,7 @@ import (
"net/http" "net/http"
"os" "os"
"regexp" "regexp"
"strconv"
"sync" "sync"
"time" "time"
@ -136,7 +137,8 @@ func (s *Server) PeerServersLoadOrStore(peer *Peer) (actual *Peer, loaded bool)
// Run "main" function for starting the server. This blocks. // Run "main" function for starting the server. This blocks.
func (s *Server) Run() { func (s *Server) Run() {
l, err := net.Listen("tcp", ":"+s.Args.Port) address := ":" + strconv.Itoa(s.Args.Port)
l, err := net.Listen("tcp", address)
if err != nil { if err != nil {
log.Fatalf("failed to listen: %v", err) log.Fatalf("failed to listen: %v", err)
} }
@ -171,6 +173,7 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro
if myDB.LastState != nil { if myDB.LastState != nil {
logrus.Infof("DB version: %v", myDB.LastState.DBVersion) logrus.Infof("DB version: %v", myDB.LastState.DBVersion)
logrus.Infof("height: %v", myDB.LastState.Height) logrus.Infof("height: %v", myDB.LastState.Height)
logrus.Infof("genesis: %v", myDB.LastState.Genesis.String())
logrus.Infof("tip: %v", myDB.LastState.Tip.String()) logrus.Infof("tip: %v", myDB.LastState.Tip.String())
logrus.Infof("tx count: %v", myDB.LastState.TxCount) logrus.Infof("tx count: %v", myDB.LastState.TxCount)
} }
@ -274,9 +277,9 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
// Determine which chain to use based on db and cli values // Determine which chain to use based on db and cli values
dbChain := (*chaincfg.Params)(nil) dbChain := (*chaincfg.Params)(nil)
if myDB != nil && myDB.LastState != nil && myDB.LastState.Genesis != nil { if myDB != nil && myDB.LastState != nil {
// The chain params can be inferred from DBStateValue. // The chain params can be inferred from DBStateValue.
switch *myDB.LastState.Genesis { switch myDB.LastState.Genesis.Hash {
case *chaincfg.MainNetParams.GenesisHash: case *chaincfg.MainNetParams.GenesisHash:
dbChain = &chaincfg.MainNetParams dbChain = &chaincfg.MainNetParams
case *chaincfg.TestNet3Params.GenesisHash: case *chaincfg.TestNet3Params.GenesisHash:
@ -353,11 +356,19 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server {
} }
if !args.DisableStartUDP { if !args.DisableStartUDP {
go func() { go func() {
err := s.UDPServer() err := s.UDPServer(s.Args.Port)
if err != nil { if err != nil {
log.Println("UDP Server failed!", err) logrus.Errorf("UDP Server (%d) failed! %v", s.Args.Port, err)
} }
}() }()
if s.Args.JSONRPCPort != 0 {
go func() {
err := s.UDPServer(s.Args.JSONRPCPort)
if err != nil {
logrus.Errorf("UDP Server (%d) failed! %v", s.Args.JSONRPCPort, err)
}
}()
}
} }
if !args.DisableStartNotifier { if !args.DisableStartNotifier {
go func() { go func() {

View file

@ -16,13 +16,13 @@ import (
"github.com/lbryio/herald.go/db" "github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbcd/chaincfg" "github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbcd/chaincfg/chainhash"
"github.com/lbryio/lbry.go/v3/extras/stop" "github.com/lbryio/lbry.go/v3/extras/stop"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
type headerNotification struct { type headerNotification struct {
internal.HeightHash internal.HeightHash
blockHeader [HEADER_SIZE]byte
blockHeaderElectrum *BlockHeaderElectrum blockHeaderElectrum *BlockHeaderElectrum
blockHeaderStr string blockHeaderStr string
} }
@ -66,7 +66,7 @@ func (s *session) doNotify(notification interface{}) {
if s.headersSubRaw { if s.headersSubRaw {
header := note.blockHeaderStr header := note.blockHeaderStr
if len(header) == 0 { if len(header) == 0 {
header = hex.EncodeToString(note.blockHeader[:]) header = hex.EncodeToString(note.BlockHeader[:])
} }
params = &HeadersSubscribeRawResp{ params = &HeadersSubscribeRawResp{
Hex: header, Hex: header,
@ -75,7 +75,7 @@ func (s *session) doNotify(notification interface{}) {
} else { } else {
header := note.blockHeaderElectrum header := note.blockHeaderElectrum
if header == nil { // not initialized if header == nil { // not initialized
header = newBlockHeaderElectrum(&note.blockHeader, uint32(heightHash.Height)) header = newBlockHeaderElectrum((*[HEADER_SIZE]byte)(note.BlockHeader), uint32(heightHash.Height))
} }
params = header params = header
} }
@ -218,7 +218,7 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
log.Errorf("RegisterName: %v\n", err) log.Errorf("RegisterName: %v\n", err)
} }
// Register other "blockchain.{block,address,scripthash}.*" handlers. // Register "blockchain.{block,address,scripthash,transaction}.*" handlers.
blockchainSvc := &BlockchainBlockService{sm.db, sm.chain} blockchainSvc := &BlockchainBlockService{sm.db, sm.chain}
err = s1.RegisterName("blockchain.block", blockchainSvc) err = s1.RegisterName("blockchain.block", blockchainSvc)
if err != nil { if err != nil {
@ -240,6 +240,11 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
log.Errorf("RegisterName: %v\n", err) log.Errorf("RegisterName: %v\n", err)
goto fail goto fail
} }
err = s1.RegisterName("blockchain.transaction", &BlockchainTransactionService{sm.db, sm.chain, sm})
if err != nil {
log.Errorf("RegisterName: %v\n", err)
goto fail
}
sm.grp.Add(1) sm.grp.Add(1)
go func() { go func() {
@ -276,6 +281,11 @@ func (sm *sessionManager) removeSessionLocked(sess *session) {
sess.conn.Close() sess.conn.Close()
} }
func (sm *sessionManager) broadcastTx(rawTx []byte) (*chainhash.Hash, error) {
// TODO
return nil, nil
}
func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bool) { func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bool) {
sm.sessionsMut.Lock() sm.sessionsMut.Lock()
defer sm.sessionsMut.Unlock() defer sm.sessionsMut.Unlock()
@ -315,6 +325,11 @@ func (sm *sessionManager) hashXSubscribe(sess *session, hashX []byte, original s
} }
func (sm *sessionManager) doNotify(notification interface{}) { func (sm *sessionManager) doNotify(notification interface{}) {
switch notification.(type) {
case internal.HeightHash:
// The HeightHash notification translates to headerNotification.
notification = &headerNotification{HeightHash: notification.(internal.HeightHash)}
}
sm.sessionsMut.RLock() sm.sessionsMut.RLock()
var subsCopy sessionMap var subsCopy sessionMap
switch notification.(type) { switch notification.(type) {
@ -322,8 +337,10 @@ func (sm *sessionManager) doNotify(notification interface{}) {
note, _ := notification.(headerNotification) note, _ := notification.(headerNotification)
subsCopy = sm.headerSubs subsCopy = sm.headerSubs
if len(subsCopy) > 0 { if len(subsCopy) > 0 {
note.blockHeaderElectrum = newBlockHeaderElectrum(&note.blockHeader, uint32(note.Height)) hdr := [HEADER_SIZE]byte{}
note.blockHeaderStr = hex.EncodeToString(note.blockHeader[:]) copy(hdr[:], note.BlockHeader)
note.blockHeaderElectrum = newBlockHeaderElectrum(&hdr, uint32(note.Height))
note.blockHeaderStr = hex.EncodeToString(note.BlockHeader[:])
} }
case hashXNotification: case hashXNotification:
note, _ := notification.(hashXNotification) note, _ := notification.(hashXNotification)
@ -489,11 +506,6 @@ func (c *jsonPatchingCodec) Read(p []byte) (n int, err error) {
// Probable single object or list argument. // Probable single object or list argument.
goto encode goto encode
} }
args := strings.Split(string(bracketed), ",")
if len(args) <= 1 {
// No commas at all. Definitely a single argument.
goto encode
}
// The params look like ["arg1", "arg2", "arg3", ...]. // The params look like ["arg1", "arg2", "arg3", ...].
// We're in trouble because our jsonrpc library does not // We're in trouble because our jsonrpc library does not
// handle this. So pack these args in an inner list. // handle this. So pack these args in an inner list.

View file

@ -219,8 +219,9 @@ func UDPPing(ip, port string) (*SPVPong, error) {
// UDPServer is a goroutine that starts an udp server that implements the hubs // UDPServer is a goroutine that starts an udp server that implements the hubs
// Ping/Pong protocol to find out about each other without making full TCP // Ping/Pong protocol to find out about each other without making full TCP
// connections. // connections.
func (s *Server) UDPServer() error { func (s *Server) UDPServer(port int) error {
address := ":" + s.Args.Port address := ":" + strconv.Itoa(port)
tip := make([]byte, 32) tip := make([]byte, 32)
addr, err := net.ResolveUDPAddr("udp", address) addr, err := net.ResolveUDPAddr("udp", address)
if err != nil { if err != nil {