Merge pull request #55 from moodyjon/blockchain_rpc1

Add some blockchain RPC handlers and DB fetching routines
Jonathan Moody 2022-09-14 10:23:34 -05:00 committed by GitHub
commit 7d24ff82bf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 10754 additions and 22 deletions
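For illustration only (not part of this commit): the new handlers become reachable through the JSON-RPC endpoint that StartJsonRPC mounts at /rpc, with Electrum-style method names such as blockchain.address.get_balance rewritten by the new BlockchainCodec into the registered blockchain_address.Get_balance handler. A minimal client sketch in Go, assuming the gorilla/rpc JSON codec's convention of passing the argument struct as the single element of "params"; the URL and port are placeholders (use whatever JSONRPCPort the server was configured with), and the address comes from the regtest fixtures added in this PR:

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// "blockchain.address.get_balance" is rewritten by BlockchainCodec to the
	// "blockchain_address.Get_balance" handler registered in StartJsonRPC.
	body := `{"id": 1, "method": "blockchain.address.get_balance", "params": [{"address": "mtgiQkd35xpx3TaZ4RBNirf3uSMQ8tXQ7z"}]}`
	// Placeholder URL: point this at the host and JSONRPCPort the server is using.
	resp, err := http.Post("http://127.0.0.1:8080/rpc", "application/json", bytes.NewBufferString(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	out, _ := io.ReadAll(resp.Body)
	fmt.Println(string(out)) // e.g. {"result":{"confirmed":...,"unconfirmed":0},"error":null,"id":1}
}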


@@ -406,6 +406,75 @@ func Iter(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
return ch
}
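// selectFrom builds iterator options for the column family identified by the
// first byte of prefix, bounded to the inclusive key range [startKey, stopKey].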
func (db *ReadOnlyDBColumnFamily) selectFrom(prefix []byte, startKey, stopKey prefixes.BaseKey) ([]*IterOptions, error) {
handle, err := db.EnsureHandle(prefix[0])
if err != nil {
return nil, err
}
// Prefix and handle
options := NewIterateOptions().WithPrefix(prefix).WithCfHandle(handle)
// Start and stop bounds
options = options.WithStart(startKey.PackKey()).WithStop(stopKey.PackKey()).WithIncludeStop(true)
// Include both the key and the value
options = options.WithIncludeKey(true).WithIncludeValue(true)
return []*IterOptions{options}, nil
}
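// iterate runs each set of iterator options in turn and emits every matching
// row on the returned channel as a single-element slice.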
func iterate(db *grocksdb.DB, opts []*IterOptions) <-chan []*prefixes.PrefixRowKV {
out := make(chan []*prefixes.PrefixRowKV)
routine := func() {
for i, o := range opts {
j := 0
for kv := range IterCF(db, o) {
row := make([]*prefixes.PrefixRowKV, 0, 1)
row = append(row, kv)
log.Debugf("iterate[%v][%v] %#v", i, j, kv)
out <- row
j++
}
}
close(out)
}
go routine()
return out
}
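// innerJoin consumes rows from in, derives further iterator options from each
// row via selectFn, and emits the input row joined with every row produced by
// those options. Errors are delivered on the channel via PrefixRowKV.Error.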
func innerJoin(db *grocksdb.DB, in <-chan []*prefixes.PrefixRowKV, selectFn func([]*prefixes.PrefixRowKV) ([]*IterOptions, error)) <-chan []*prefixes.PrefixRowKV {
out := make(chan []*prefixes.PrefixRowKV)
routine := func() {
for kvs := range in {
selected, err := selectFn(kvs)
if err != nil {
out <- []*prefixes.PrefixRowKV{{Error: err}}
close(out)
return
}
for kv := range iterate(db, selected) {
row := make([]*prefixes.PrefixRowKV, 0, len(kvs)+1)
row = append(row, kvs...)
row = append(row, kv...)
for i, kv := range row {
log.Debugf("row[%v] %#v", i, kv)
}
out <- row
}
}
close(out)
return
}
go routine()
return out
}
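// checkForError returns the first error embedded in a joined row, if any.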
func checkForError(kvs []*prefixes.PrefixRowKV) error {
for _, kv := range kvs {
if kv.Error != nil {
return kv.Error
}
}
return nil
}
//
// GetDB functions that open and return a db
//
@@ -487,7 +556,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*R
var handlesMap = make(map[string]*grocksdb.ColumnFamilyHandle)
for i, handle := range handles {
-log.Printf("%d: %+v\n", i, handle)
+log.Printf("handle %d(%s): %+v\n", i, cfNames[i], handle)
handlesMap[cfNames[i]] = handle
}


@@ -6,8 +6,11 @@ import (
"encoding/hex"
"fmt"
"log"
"math"
"github.com/lbryio/herald.go/db/prefixes"
"github.com/lbryio/herald.go/db/stack"
"github.com/lbryio/lbcd/chaincfg/chainhash"
"github.com/linxGnu/grocksdb"
)
@@ -82,6 +85,187 @@ func (db *ReadOnlyDBColumnFamily) GetHeader(height uint32) ([]byte, error) {
return rawValue, nil
}
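// GetHeaders returns up to count raw 112-byte block headers starting at the given height.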
func (db *ReadOnlyDBColumnFamily) GetHeaders(height uint32, count uint32) ([][112]byte, error) {
handle, err := db.EnsureHandle(prefixes.Header)
if err != nil {
return nil, err
}
startKeyRaw := prefixes.NewHeaderKey(height).PackKey()
endKeyRaw := prefixes.NewHeaderKey(height + count).PackKey()
options := NewIterateOptions().WithPrefix([]byte{prefixes.Header}).WithCfHandle(handle)
options = options.WithIncludeKey(false).WithIncludeValue(true) //.WithIncludeStop(true)
options = options.WithStart(startKeyRaw).WithStop(endKeyRaw)
result := make([][112]byte, 0, count)
for kv := range IterCF(db.DB, options) {
h := [112]byte{}
copy(h[:], kv.Value.(*prefixes.BlockHeaderValue).Header[:112])
result = append(result, h)
}
return result, nil
}
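// GetBalance sums the confirmed UTXO amounts for hashX; the unconfirmed total is not tracked yet (TODO).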
func (db *ReadOnlyDBColumnFamily) GetBalance(hashX []byte) (uint64, uint64, error) {
handle, err := db.EnsureHandle(prefixes.UTXO)
if err != nil {
return 0, 0, err
}
startKey := prefixes.UTXOKey{
Prefix: []byte{prefixes.UTXO},
HashX: hashX,
TxNum: 0,
Nout: 0,
}
endKey := prefixes.UTXOKey{
Prefix: []byte{prefixes.UTXO},
HashX: hashX,
TxNum: math.MaxUint32,
Nout: math.MaxUint16,
}
startKeyRaw := startKey.PackKey()
endKeyRaw := endKey.PackKey()
// Prefix and handle
options := NewIterateOptions().WithPrefix([]byte{prefixes.UTXO}).WithCfHandle(handle)
// Start and stop bounds
options = options.WithStart(startKeyRaw).WithStop(endKeyRaw).WithIncludeStop(true)
// Don't include the key
options = options.WithIncludeKey(false).WithIncludeValue(true)
ch := IterCF(db.DB, options)
var confirmed uint64 = 0
var unconfirmed uint64 = 0 // TODO
for kv := range ch {
confirmed += kv.Value.(*prefixes.UTXOValue).Amount
}
return confirmed, unconfirmed, nil
}
type TXOInfo struct {
TxHash *chainhash.Hash
TxPos uint16
Height uint32
Value uint64
}
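// GetUnspent lists the UTXOs belonging to hashX, joining each UTXO row against
// the TxHash table to recover the transaction hash and confirmation height.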
func (db *ReadOnlyDBColumnFamily) GetUnspent(hashX []byte) ([]TXOInfo, error) {
startKey := &prefixes.UTXOKey{
Prefix: []byte{prefixes.UTXO},
HashX: hashX,
TxNum: 0,
Nout: 0,
}
endKey := &prefixes.UTXOKey{
Prefix: []byte{prefixes.UTXO},
HashX: hashX,
TxNum: math.MaxUint32,
Nout: math.MaxUint16,
}
selectedUTXO, err := db.selectFrom([]byte{prefixes.UTXO}, startKey, endKey)
if err != nil {
return nil, err
}
selectTxHashByTxNum := func(in []*prefixes.PrefixRowKV) ([]*IterOptions, error) {
historyKey := in[0].Key.(*prefixes.UTXOKey)
out := make([]*IterOptions, 0, 100)
startKey := &prefixes.TxHashKey{
Prefix: []byte{prefixes.TxHash},
TxNum: historyKey.TxNum,
}
endKey := &prefixes.TxHashKey{
Prefix: []byte{prefixes.TxHash},
TxNum: historyKey.TxNum,
}
selectedTxHash, err := db.selectFrom([]byte{prefixes.TxHash}, startKey, endKey)
if err != nil {
return nil, err
}
out = append(out, selectedTxHash...)
return out, nil
}
results := make([]TXOInfo, 0, 1000)
for kvs := range innerJoin(db.DB, iterate(db.DB, selectedUTXO), selectTxHashByTxNum) {
if err := checkForError(kvs); err != nil {
return results, err
}
utxoKey := kvs[0].Key.(*prefixes.UTXOKey)
utxoValue := kvs[0].Value.(*prefixes.UTXOValue)
txhashValue := kvs[1].Value.(*prefixes.TxHashValue)
results = append(results,
TXOInfo{
TxHash: txhashValue.TxHash,
TxPos: utxoKey.Nout,
Height: stack.BisectRight(db.TxCounts, []uint32{utxoKey.TxNum})[0],
Value: utxoValue.Amount,
},
)
}
return results, nil
}
type TxInfo struct {
TxHash *chainhash.Hash
Height uint32
}
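// GetHistory returns the confirmed transaction history of hashX by joining
// HashXHistory rows against the TxHash table.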
func (db *ReadOnlyDBColumnFamily) GetHistory(hashX []byte) ([]TxInfo, error) {
startKey := &prefixes.HashXHistoryKey{
Prefix: []byte{prefixes.HashXHistory},
HashX: hashX,
Height: 0,
}
endKey := &prefixes.HashXHistoryKey{
Prefix: []byte{prefixes.HashXHistory},
HashX: hashX,
Height: math.MaxUint32,
}
selectedHistory, err := db.selectFrom([]byte{prefixes.HashXHistory}, startKey, endKey)
if err != nil {
return nil, err
}
selectTxHashByTxNums := func(in []*prefixes.PrefixRowKV) ([]*IterOptions, error) {
historyValue := in[0].Value.(*prefixes.HashXHistoryValue)
out := make([]*IterOptions, 0, 100)
for _, txnum := range historyValue.TxNums {
startKey := &prefixes.TxHashKey{
Prefix: []byte{prefixes.TxHash},
TxNum: txnum,
}
endKey := &prefixes.TxHashKey{
Prefix: []byte{prefixes.TxHash},
TxNum: txnum,
}
selectedTxHash, err := db.selectFrom([]byte{prefixes.TxHash}, startKey, endKey)
if err != nil {
return nil, err
}
out = append(out, selectedTxHash...)
}
return out, nil
}
results := make([]TxInfo, 0, 1000)
for kvs := range innerJoin(db.DB, iterate(db.DB, selectedHistory), selectTxHashByTxNums) {
if err := checkForError(kvs); err != nil {
return results, err
}
historyKey := kvs[0].Key.(*prefixes.HashXHistoryKey)
txHashValue := kvs[1].Value.(*prefixes.TxHashValue)
results = append(results, TxInfo{
TxHash: txHashValue.TxHash,
Height: historyKey.Height,
})
}
return results, nil
}
// GetStreamsAndChannelRepostedByChannelHashes returns a map of streams and channel hashes that are reposted by the given channel hashes.
func (db *ReadOnlyDBColumnFamily) GetStreamsAndChannelRepostedByChannelHashes(reposterChannelHashes [][]byte) (map[string][]byte, map[string][]byte, error) {
handle, err := db.EnsureHandle(prefixes.ChannelToClaim)


@@ -125,6 +125,7 @@ type PrefixRowKV struct {
Value BaseValue
RawKey []byte
RawValue []byte
Error error
}
type BaseKey interface {
@@ -193,7 +194,7 @@ type DBStateValue struct {
UtxoFlushCount uint32
WallTime uint32
FirstSync bool
-DDVersion uint8
+DBVersion uint8
HistFlushCount int32
CompFlushCount int32
CompCursor int32
@@ -209,7 +210,7 @@ func NewDBStateValue() *DBStateValue {
UtxoFlushCount: 0,
WallTime: 0,
FirstSync: true,
-DDVersion: 0,
+DBVersion: 0,
HistFlushCount: 0,
CompFlushCount: -1,
CompCursor: -1,
@@ -247,7 +248,7 @@ func (v *DBStateValue) PackValue() []byte {
bitSetVar = 1
}
value[32+4+4+32+4+4] = bitSetVar
-value[32+4+4+32+4+4+1] = v.DDVersion
+value[32+4+4+32+4+4+1] = v.DBVersion
binary.BigEndian.PutUint32(value[32+4+4+32+4+4+1+1:], uint32(v.HistFlushCount))
binary.BigEndian.PutUint32(value[32+4+4+32+4+4+1+1+4:], uint32(v.CompFlushCount))
@@ -289,7 +290,7 @@ func DBStateValueUnpack(value []byte) *DBStateValue {
UtxoFlushCount: binary.BigEndian.Uint32(value[32+4+4+32:]),
WallTime: binary.BigEndian.Uint32(value[32+4+4+32+4:]),
FirstSync: value[32+4+4+32+4+4] == 1,
-DDVersion: value[32+4+4+32+4+4+1],
+DBVersion: value[32+4+4+32+4+4+1],
HistFlushCount: int32(binary.BigEndian.Uint32(value[32+4+4+32+4+4+1+1:])),
CompFlushCount: int32(binary.BigEndian.Uint32(value[32+4+4+32+4+4+1+1+4:])),
CompCursor: int32(binary.BigEndian.Uint32(value[32+4+4+32+4+4+1+1+4+4:])),

go.mod

@@ -10,6 +10,7 @@ require (
github.com/go-restruct/restruct v1.2.0-alpha
github.com/gorilla/mux v1.7.3
github.com/gorilla/rpc v1.2.0
github.com/lbryio/lbcutil v1.0.202
github.com/lbryio/lbry.go/v3 v3.0.1-beta
github.com/linxGnu/grocksdb v1.6.42
github.com/olivere/elastic/v7 v7.0.24
@@ -23,15 +24,17 @@ require (
gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c
)
-require golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect
-)
-require (
+require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/josharian/intern v1.0.0 // indirect
-github.com/lbryio/lbcd v0.22.201-beta-rc1
+github.com/lbryio/lbcd v0.22.201-beta-rc4
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
@@ -39,6 +42,7 @@ require (
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect

go.sum

@@ -63,6 +63,7 @@ github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJm
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
github.com/btcsuite/btcd v0.22.0-beta/go.mod h1:9n5ntfhhHQBIhUvlhDvD3Qg6fRUj4jkN0VB8L8svzOA=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
@@ -359,8 +360,12 @@ github.com/lbryio/lbcd v0.22.100-beta-rc5/go.mod h1:9PbFSlHYX7WlnDQwcTxHVf1W35VA
github.com/lbryio/lbcd v0.22.200-beta/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA=
github.com/lbryio/lbcd v0.22.201-beta-rc1 h1:FmzzApVj2RBXloLM2w9tLvN2xyTZjeyh+QC7GIw/wwo=
github.com/lbryio/lbcd v0.22.201-beta-rc1/go.mod h1:kNuzGWf808ipTGB0y0WogzsGv5BVM4Qv85Z+JYwC9FA=
github.com/lbryio/lbcd v0.22.201-beta-rc4 h1:Xh751Bh/GWRcP5bI6NJ2+zueo2otTcTWapFvFbryP5c=
github.com/lbryio/lbcd v0.22.201-beta-rc4/go.mod h1:Jgo48JDINhdOgHHR83J70Q6G42x3WAo9DI//QogcL+E=
github.com/lbryio/lbcutil v1.0.201/go.mod h1:gDHc/b+Rdz3J7+VB8e5/Bl9roVf8Q5/8FQCyuK9dXD0=
github.com/lbryio/lbcutil v1.0.202-rc3/go.mod h1:LGPtVBBzh4cFXfLFb8ginlFcbA2QwumLNFd0yk/as2o=
github.com/lbryio/lbcutil v1.0.202 h1:L0aRMs2bdCUAicD8Xe4NmUEvevDDea3qkIpCSACnftI=
github.com/lbryio/lbcutil v1.0.202/go.mod h1:LGPtVBBzh4cFXfLFb8ginlFcbA2QwumLNFd0yk/as2o=
github.com/lbryio/lbry.go/v2 v2.7.1/go.mod h1:sUhhSKqPNkiwgBqvBzJIqfLLzGH8hkDGrrO/HcaXzFc=
github.com/lbryio/lbry.go/v3 v3.0.1-beta h1:oIpQ5czhtdVSoWZCiOHE9SrqnNsahyCnMhXvXsd2IiM=
github.com/lbryio/lbry.go/v3 v3.0.1-beta/go.mod h1:v03OVXSBGNZNDfGoAVyjQV/ZOzBGQyTnWs3jpkssxGM=
@@ -370,6 +375,8 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/linxGnu/grocksdb v1.6.42 h1:nJLoXFuzwBwQQQrXTUgRGRz1QRm7y8pR6CNV/gwrbqs=
github.com/linxGnu/grocksdb v1.6.42/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0=
github.com/linxGnu/grocksdb v1.7.0 h1:UyFDykX0CUfxDN10cqlFho/rwt9K6KoDaLXL9Ej5z9g=
github.com/linxGnu/grocksdb v1.7.0/go.mod h1:JcMMDBFaDNhRXFYcYXmgQwb/RarSld1PulTI7UzE+w0=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5/go.mod h1:H0aPCWffGOaDcjkw1iB7W9DVLp6GXmfcJY/7YZCWPA4=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=


@@ -39,11 +39,13 @@ func main() {
defer func() {
log.Println("Shutting down server...")
-if !s.Args.DisableEs {
+if s.EsClient != nil {
s.EsClient.Stop()
}
-s.GrpcServer.GracefulStop()
-if !s.Args.DisableResolve {
+if s.GrpcServer != nil {
+s.GrpcServer.GracefulStop()
+}
+if s.DB != nil {
s.DB.Shutdown()
}


@@ -7,6 +7,7 @@ import (
"github.com/akamensky/argparse"
pb "github.com/lbryio/herald.go/protobuf/go"
"github.com/lbryio/lbcd/chaincfg"
)
const (
@@ -21,6 +22,7 @@ type Args struct {
Host string
Port string
DBPath string
Chain *string
EsHost string
EsPort string
PrometheusPort string
@@ -112,6 +114,8 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost})
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort})
dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath})
chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"},
&argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name})
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost})
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
@@ -159,6 +163,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
Host: *host,
Port: *port,
DBPath: *dbPath,
Chain: chain,
EsHost: *esHost,
EsPort: *esPort,
PrometheusPort: *prometheusPort,


@@ -0,0 +1,483 @@
package server
import (
"bytes"
"compress/zlib"
"crypto/sha256"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"net/http"
"strings"
"github.com/gorilla/rpc"
"github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbcd/chaincfg/chainhash"
"github.com/lbryio/lbcd/txscript"
"github.com/lbryio/lbcd/wire"
"github.com/lbryio/lbcutil"
"golang.org/x/exp/constraints"
)
type BlockchainCodec struct {
rpc.Codec
}
func (c *BlockchainCodec) NewRequest(r *http.Request) rpc.CodecRequest {
return &BlockchainCodecRequest{c.Codec.NewRequest(r)}
}
// BlockchainCodecRequest provides ability to rewrite the incoming
// request "method" field. For example:
// blockchain.block.get_header -> blockchain_block.Get_header
// blockchain.address.listunspent -> blockchain_address.Listunspent
// This makes the "method" string compatible with Gorilla/RPC
// requirements.
type BlockchainCodecRequest struct {
rpc.CodecRequest
}
func (cr *BlockchainCodecRequest) Method() (string, error) {
rawMethod, err := cr.CodecRequest.Method()
if err != nil {
return rawMethod, err
}
parts := strings.Split(rawMethod, ".")
if len(parts) < 2 {
return rawMethod, fmt.Errorf("blockchain rpc: service/method ill-formed: %q", rawMethod)
}
service := strings.Join(parts[0:len(parts)-1], "_")
method := parts[len(parts)-1]
if len(method) < 1 {
return rawMethod, fmt.Errorf("blockchain rpc: method ill-formed: %q", method)
}
method = strings.ToUpper(string(method[0])) + string(method[1:])
return service + "." + method, err
}
// BlockchainService methods handle "blockchain.block.*" RPCs
type BlockchainService struct {
DB *db.ReadOnlyDBColumnFamily
Chain *chaincfg.Params
}
// BlockchainAddressService methods handle "blockchain.address.*" RPCs
type BlockchainAddressService struct {
BlockchainService
}
// BlockchainScripthashService methods handle "blockchain.scripthash.*" RPCs
type BlockchainScripthashService struct {
BlockchainService
}
const CHUNK_SIZE = 96
const MAX_CHUNK_SIZE = 40960
const HEADER_SIZE = wire.MaxBlockHeaderPayload
const HASHX_LEN = 11
func min[Ord constraints.Ordered](x, y Ord) Ord {
if x < y {
return x
}
return y
}
type BlockGetServerHeightReq struct{}
type BlockGetServerHeightResp uint32
func (s *BlockchainService) Get_server_height(r *http.Request, req *BlockGetServerHeightReq, resp **BlockGetServerHeightResp) error {
if s.DB == nil || s.DB.LastState == nil {
return fmt.Errorf("unknown height")
}
result := BlockGetServerHeightResp(s.DB.LastState.Height)
*resp = &result
return nil
}
type BlockGetChunkReq uint32
type BlockGetChunkResp string
// 'blockchain.block.get_chunk'
func (s *BlockchainService) Get_chunk(r *http.Request, req *BlockGetChunkReq, resp **BlockGetChunkResp) error {
index := uint32(*req)
db_headers, err := s.DB.GetHeaders(index*CHUNK_SIZE, CHUNK_SIZE)
if err != nil {
return err
}
raw := make([]byte, 0, HEADER_SIZE*len(db_headers))
for _, h := range db_headers {
raw = append(raw, h[:]...)
}
headers := BlockGetChunkResp(hex.EncodeToString(raw))
*resp = &headers
return err
}
type BlockGetHeaderReq uint32
type BlockGetHeaderResp struct {
Version uint32 `json:"version"`
PrevBlockHash string `json:"prev_block_hash"`
MerkleRoot string `json:"merkle_root"`
ClaimTrieRoot string `json:"claim_trie_root"`
Timestamp uint32 `json:"timestamp"`
Bits uint32 `json:"bits"`
Nonce uint32 `json:"nonce"`
BlockHeight uint32 `json:"block_height"`
}
// 'blockchain.block.get_header'
func (s *BlockchainService) Get_header(r *http.Request, req *BlockGetHeaderReq, resp **BlockGetHeaderResp) error {
height := uint32(*req)
headers, err := s.DB.GetHeaders(height, 1)
if err != nil {
return err
}
if len(headers) < 1 {
return errors.New("not found")
}
decode := func(header *[HEADER_SIZE]byte, height uint32) *BlockGetHeaderResp {
var h1, h2, h3 chainhash.Hash
h1.SetBytes(header[4:36])
h2.SetBytes(header[36:68])
h3.SetBytes(header[68:100])
return &BlockGetHeaderResp{
Version: binary.LittleEndian.Uint32(header[0:]),
PrevBlockHash: h1.String(),
MerkleRoot: h2.String(),
ClaimTrieRoot: h3.String(),
Timestamp: binary.LittleEndian.Uint32(header[100:]),
Bits: binary.LittleEndian.Uint32(header[104:]),
Nonce: binary.LittleEndian.Uint32(header[108:]),
BlockHeight: height,
}
}
*resp = decode(&headers[0], height)
return err
}
type BlockHeadersReq struct {
StartHeight uint32 `json:"start_height"`
Count uint32 `json:"count"`
CpHeight uint32 `json:"cp_height"`
B64 bool `json:"b64"`
}
type BlockHeadersResp struct {
Base64 string `json:"base64,omitempty"`
Hex string `json:"hex,omitempty"`
Count uint32 `json:"count"`
Max uint32 `json:"max"`
Branch string `json:"branch,omitempty"`
Root string `json:"root,omitempty"`
}
// 'blockchain.block.headers'
func (s *BlockchainService) Headers(r *http.Request, req *BlockHeadersReq, resp **BlockHeadersResp) error {
count := min(req.Count, MAX_CHUNK_SIZE)
db_headers, err := s.DB.GetHeaders(req.StartHeight, count)
if err != nil {
return err
}
count = uint32(len(db_headers))
raw := make([]byte, 0, HEADER_SIZE*count)
for _, h := range db_headers {
raw = append(raw, h[:]...)
}
result := &BlockHeadersResp{
Count: count,
Max: MAX_CHUNK_SIZE,
}
if req.B64 {
zipped := bytes.Buffer{}
w := zlib.NewWriter(&zipped)
w.Write(raw)
w.Close()
result.Base64 = base64.StdEncoding.EncodeToString(zipped.Bytes())
} else {
result.Hex = hex.EncodeToString(raw)
}
if count > 0 && req.CpHeight > 0 {
// TODO
//last_height := height + count - 1
}
*resp = result
return err
}
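// decodeScriptHash decodes the hex-encoded scripthash argument and reverses it
// into the internal byte order used by the DB.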
func decodeScriptHash(scripthash string) ([]byte, error) {
sh, err := hex.DecodeString(scripthash)
if err != nil {
return nil, err
}
if len(sh) != chainhash.HashSize {
return nil, fmt.Errorf("invalid scripthash: %v (length %v)", scripthash, len(sh))
}
internal.ReverseBytesInPlace(sh)
return sh, nil
}
func hashX(scripthash []byte) []byte {
return scripthash[:HASHX_LEN]
}
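// hashXScript derives the 11-byte HashX of a pkscript. Claim scripts are first
// reduced to their underlying payment script, so a claim and a plain payment
// to the same address map to the same HashX.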
func hashXScript(script []byte, coin *chaincfg.Params) []byte {
if _, err := txscript.ExtractClaimScript(script); err == nil {
baseScript := txscript.StripClaimScriptPrefix(script)
if class, addrs, _, err := txscript.ExtractPkScriptAddrs(baseScript, coin); err == nil {
switch class {
case txscript.PubKeyHashTy, txscript.ScriptHashTy, txscript.PubKeyTy:
script, _ := txscript.PayToAddrScript(addrs[0])
return hashXScript(script, coin)
}
}
}
sum := sha256.Sum256(script)
return sum[:HASHX_LEN]
}
type AddressGetBalanceReq struct {
Address string `json:"address"`
}
type AddressGetBalanceResp struct {
Confirmed uint64 `json:"confirmed"`
Unconfirmed uint64 `json:"unconfirmed"`
}
// 'blockchain.address.get_balance'
func (s *BlockchainAddressService) Get_balance(r *http.Request, req *AddressGetBalanceReq, resp **AddressGetBalanceResp) error {
address, err := lbcutil.DecodeAddress(req.Address, s.Chain)
if err != nil {
return err
}
script, err := txscript.PayToAddrScript(address)
if err != nil {
return err
}
hashX := hashXScript(script, s.Chain)
confirmed, unconfirmed, err := s.DB.GetBalance(hashX)
if err != nil {
return err
}
*resp = &AddressGetBalanceResp{confirmed, unconfirmed}
return err
}
type scripthashGetBalanceReq struct {
ScriptHash string `json:"scripthash"`
}
type ScripthashGetBalanceResp struct {
Confirmed uint64 `json:"confirmed"`
Unconfirmed uint64 `json:"unconfirmed"`
}
// 'blockchain.scripthash.get_balance'
func (s *BlockchainScripthashService) Get_balance(r *http.Request, req *scripthashGetBalanceReq, resp **ScripthashGetBalanceResp) error {
scripthash, err := decodeScriptHash(req.ScriptHash)
if err != nil {
return err
}
hashX := hashX(scripthash)
confirmed, unconfirmed, err := s.DB.GetBalance(hashX)
if err != nil {
return err
}
*resp = &ScripthashGetBalanceResp{confirmed, unconfirmed}
return err
}
type AddressGetHistoryReq struct {
Address string `json:"address"`
}
type TxInfo struct {
TxHash string `json:"tx_hash"`
Height uint32 `json:"height"`
}
type TxInfoFee struct {
TxInfo
Fee uint64 `json:"fee"`
}
type AddressGetHistoryResp struct {
Confirmed []TxInfo `json:"confirmed"`
Unconfirmed []TxInfoFee `json:"unconfirmed"`
}
// 'blockchain.address.get_history'
func (s *BlockchainAddressService) Get_history(r *http.Request, req *AddressGetHistoryReq, resp **AddressGetHistoryResp) error {
address, err := lbcutil.DecodeAddress(req.Address, s.Chain)
if err != nil {
return err
}
script, err := txscript.PayToAddrScript(address)
if err != nil {
return err
}
hashX := hashXScript(script, s.Chain)
dbTXs, err := s.DB.GetHistory(hashX)
if err != nil {
return err
}
confirmed := make([]TxInfo, 0, len(dbTXs))
for _, tx := range dbTXs {
confirmed = append(confirmed,
TxInfo{
TxHash: tx.TxHash.String(),
Height: tx.Height,
})
}
result := &AddressGetHistoryResp{
Confirmed: confirmed,
Unconfirmed: []TxInfoFee{}, // TODO
}
*resp = result
return err
}
type ScripthashGetHistoryReq struct {
ScriptHash string `json:"scripthash"`
}
type ScripthashGetHistoryResp struct {
Confirmed []TxInfo `json:"confirmed"`
Unconfirmed []TxInfoFee `json:"unconfirmed"`
}
// 'blockchain.scripthash.get_history'
func (s *BlockchainScripthashService) Get_history(r *http.Request, req *ScripthashGetHistoryReq, resp **ScripthashGetHistoryResp) error {
scripthash, err := decodeScriptHash(req.ScriptHash)
if err != nil {
return err
}
hashX := hashX(scripthash)
dbTXs, err := s.DB.GetHistory(hashX)
if err != nil {
return err
}
confirmed := make([]TxInfo, 0, len(dbTXs))
for _, tx := range dbTXs {
confirmed = append(confirmed,
TxInfo{
TxHash: tx.TxHash.String(),
Height: tx.Height,
})
}
result := &ScripthashGetHistoryResp{
Confirmed: confirmed,
Unconfirmed: []TxInfoFee{}, // TODO
}
*resp = result
return err
}
type AddressGetMempoolReq struct {
Address string `json:"address"`
}
type AddressGetMempoolResp []TxInfoFee
// 'blockchain.address.get_mempool'
func (s *BlockchainAddressService) Get_mempool(r *http.Request, req *AddressGetMempoolReq, resp **AddressGetMempoolResp) error {
address, err := lbcutil.DecodeAddress(req.Address, s.Chain)
if err != nil {
return err
}
script, err := txscript.PayToAddrScript(address)
if err != nil {
return err
}
hashX := hashXScript(script, s.Chain)
// TODO...
internal.ReverseBytesInPlace(hashX)
unconfirmed := make([]TxInfoFee, 0, 100)
result := AddressGetMempoolResp(unconfirmed)
*resp = &result
return err
}
type ScripthashGetMempoolReq struct {
ScriptHash string `json:"scripthash"`
}
type ScripthashGetMempoolResp []TxInfoFee
// 'blockchain.scripthash.get_mempool'
func (s *BlockchainScripthashService) Get_mempool(r *http.Request, req *ScripthashGetMempoolReq, resp **ScripthashGetMempoolResp) error {
scripthash, err := decodeScriptHash(req.ScriptHash)
if err != nil {
return err
}
hashX := hashX(scripthash)
// TODO...
internal.ReverseBytesInPlace(hashX)
unconfirmed := make([]TxInfoFee, 0, 100)
result := ScripthashGetMempoolResp(unconfirmed)
*resp = &result
return err
}
type AddressListUnspentReq struct {
Address string `json:"address"`
}
type TXOInfo struct {
TxHash string `json:"tx_hash"`
TxPos uint16 `json:"tx_pos"`
Height uint32 `json:"height"`
Value uint64 `json:"value"`
}
type AddressListUnspentResp []TXOInfo
// 'blockchain.address.listunspent'
func (s *BlockchainAddressService) Listunspent(r *http.Request, req *AddressListUnspentReq, resp **AddressListUnspentResp) error {
address, err := lbcutil.DecodeAddress(req.Address, s.Chain)
if err != nil {
return err
}
script, err := txscript.PayToAddrScript(address)
if err != nil {
return err
}
hashX := hashXScript(script, s.Chain)
dbTXOs, err := s.DB.GetUnspent(hashX)
unspent := make([]TXOInfo, 0, len(dbTXOs))
for _, txo := range dbTXOs {
unspent = append(unspent,
TXOInfo{
TxHash: txo.TxHash.String(),
TxPos: txo.TxPos,
Height: txo.Height,
Value: txo.Value,
})
}
result := AddressListUnspentResp(unspent)
*resp = &result
return err
}
type ScripthashListUnspentReq struct {
ScriptHash string `json:"scripthash"`
}
type ScripthashListUnspentResp []TXOInfo
// 'blockchain.scripthash.listunspent'
func (s *BlockchainScripthashService) Listunspent(r *http.Request, req *ScripthashListUnspentReq, resp **ScripthashListUnspentResp) error {
scripthash, err := decodeScriptHash(req.ScriptHash)
if err != nil {
return err
}
hashX := hashX(scripthash)
dbTXOs, err := s.DB.GetUnspent(hashX)
unspent := make([]TXOInfo, 0, len(dbTXOs))
for _, txo := range dbTXOs {
unspent = append(unspent,
TXOInfo{
TxHash: txo.TxHash.String(),
TxPos: txo.TxPos,
Height: txo.Height,
Value: txo.Value,
})
}
result := ScripthashListUnspentResp(unspent)
*resp = &result
return err
}


@@ -0,0 +1,245 @@
package server
import (
"encoding/json"
"strconv"
"testing"
"github.com/lbryio/herald.go/db"
"github.com/lbryio/lbcd/chaincfg"
)
// Source: test_variety_of_transactions_and_longish_history (lbry-sdk/tests/integration/transactions)
const regTestDBPath = "../testdata/test_variety_of_transactions/lbry-rocksdb"
const regTestHeight = 502
var regTestAddrs = [30]string{
"mtgiQkd35xpx3TaZ4RBNirf3uSMQ8tXQ7z",
"mqMjBtzGTtRty7Y54RqeNLk9QE8rYUfpm3",
"n2q8ASDZmib4adu2eU4dPvVvjYeU97pks4",
"mzxYWTJogAtduNaeyH9pSSmBSPkJj33HDJ",
"mweCKeZkeUUi8RQdHry3Mziphb87vCwiiW",
"mp7ZuiZgBNJHFX6DVmeZrCj8SuzVQNDLwb",
"n2zZoBocGCcxe6jFo1anbbAsUFMPXdYfnY",
"msps28KwRJF77DxhzqD98prdwCrZwdUxJc",
"mjvkjuss63pq2mpsRn4Q5tsNKVMLG9qUt7",
"miF9cJn8HiX6vsorRDXtZEgcW7BeWowqkX",
"mx87wRYFchYaLjXyNaboMuEMRLRboFSPDD",
"mhvb94idtQvTSCQk9EB16wLLkSrbWizPRG",
"mx3Fu8FDM4nKR9VYtHWPtSGKVt1D588Ay1",
"mhqvhX7kLNQ2bUNWZxMhE1z6QEJKrqdV8T",
"mgekw8L4xEezFtkYdSarL4sk5Sc8n9UtzG",
"myhFrTz99ZHwbGo7qV4D7fJKfji7YJ3vZ8",
"mnf8UCVoo6DBq6Tg4QpnFFdV1mFVHi43TF",
"mn7hKyh6EA8oLAPkvTd9vPEgzLRejLxkj2",
"msfarwFff7LX6DkXk295x3YMnJtR5Yw8uy",
"mn8sUv6ryiLn4kzssBTqNaB1oL6qcKDzJ4",
"mhwgeQFyi1z1RxNR1CphE8PcwG2xBWcxDp",
"n2jKpDXhVaQHiKqhdQYwwykhoYtKtbh8P1",
"mhnt4btqpAuiNwjAfFxPEaA4ekCE8faRYN",
"mmTFCt6Du1VsdxSKc7f21vYsT75KnRy7NM",
"mm1nx1xSmgRponM5tmdq15KREa7f6M36La",
"mxMXmMKUqoj19hxEA5r3hZJgirT6nCQh14",
"mx2L4iqNGzpuNNsDmjvCpcomefDWLAjdv1",
"mohJcUzQdCYL7nEySKNQC8PUzowNS5gGvo",
"mjv1vErZiDXsh9TvBDGCBpzobZx7aVYuy7",
"mwDPTZzHsM6p1DfDnBeojDLRCDceTcejkT",
}
// const dbPath := "/Users/swdev1/hub/scribe_db.599529/lbry-rocksdb"
// const dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb"
func TestServerGetHeight(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
if err != nil {
t.Error(err)
return
}
s := &BlockchainService{
DB: db,
Chain: &chaincfg.RegressionNetParams,
}
req := BlockGetServerHeightReq{}
var resp *BlockGetServerHeightResp
err = s.Get_server_height(nil, &req, &resp)
if err != nil {
t.Errorf("handler err: %v", err)
}
marshalled, err := json.MarshalIndent(resp, "", " ")
if err != nil {
t.Errorf("unmarshal err: %v", err)
}
t.Logf("resp: %v", string(marshalled))
if string(marshalled) != strconv.FormatInt(regTestHeight, 10) {
t.Errorf("bad height: %v", string(marshalled))
}
}
func TestGetChunk(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
if err != nil {
t.Error(err)
return
}
s := &BlockchainService{
DB: db,
Chain: &chaincfg.RegressionNetParams,
}
for index := 0; index < 10; index++ {
req := BlockGetChunkReq(index)
var resp *BlockGetChunkResp
err := s.Get_chunk(nil, &req, &resp)
if err != nil {
t.Errorf("index: %v handler err: %v", index, err)
}
marshalled, err := json.MarshalIndent(resp, "", " ")
if err != nil {
t.Errorf("index: %v unmarshal err: %v", index, err)
}
t.Logf("index: %v resp: %v", index, string(marshalled))
switch index {
case 0, 1, 2, 3, 4:
if len(*resp) != (CHUNK_SIZE * HEADER_SIZE * 2) {
t.Errorf("index: %v bad length: %v", index, len(*resp))
}
case 5:
if len(*resp) != 23*112*2 {
t.Errorf("index: %v bad length: %v", index, len(*resp))
}
default:
if len(*resp) != 0 {
t.Errorf("index: %v bad length: %v", index, len(*resp))
}
}
}
}
func TestGetHeader(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
if err != nil {
t.Error(err)
return
}
s := &BlockchainService{
DB: db,
Chain: &chaincfg.RegressionNetParams,
}
for height := 0; height < 700; height += 100 {
req := BlockGetHeaderReq(height)
var resp *BlockGetHeaderResp
err := s.Get_header(nil, &req, &resp)
if err != nil && height <= 500 {
t.Errorf("height: %v handler err: %v", height, err)
}
marshalled, err := json.MarshalIndent(resp, "", " ")
if err != nil {
t.Errorf("height: %v unmarshal err: %v", height, err)
}
t.Logf("height: %v resp: %v", height, string(marshalled))
}
}
func TestGetBalance(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
if err != nil {
t.Error(err)
return
}
s := &BlockchainAddressService{
BlockchainService{
DB: db,
Chain: &chaincfg.RegressionNetParams,
},
}
for _, addr := range regTestAddrs {
req := AddressGetBalanceReq{addr}
var resp *AddressGetBalanceResp
err := s.Get_balance(nil, &req, &resp)
if err != nil {
t.Errorf("address: %v handler err: %v", addr, err)
}
marshalled, err := json.MarshalIndent(resp, "", " ")
if err != nil {
t.Errorf("address: %v unmarshal err: %v", addr, err)
}
t.Logf("address: %v resp: %v", addr, string(marshalled))
}
}
func TestGetHistory(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
if err != nil {
t.Error(err)
return
}
s := &BlockchainAddressService{
BlockchainService{
DB: db,
Chain: &chaincfg.RegressionNetParams,
},
}
for _, addr := range regTestAddrs {
req := AddressGetHistoryReq{addr}
var resp *AddressGetHistoryResp
err := s.Get_history(nil, &req, &resp)
if err != nil {
t.Errorf("address: %v handler err: %v", addr, err)
}
marshalled, err := json.MarshalIndent(resp, "", " ")
if err != nil {
t.Errorf("address: %v unmarshal err: %v", addr, err)
}
t.Logf("address: %v resp: %v", addr, string(marshalled))
}
}
func TestListUnspent(t *testing.T) {
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath)
defer toDefer()
if err != nil {
t.Error(err)
return
}
s := &BlockchainAddressService{
BlockchainService{
DB: db,
Chain: &chaincfg.RegressionNetParams,
},
}
for _, addr := range regTestAddrs {
req := AddressListUnspentReq{addr}
var resp *AddressListUnspentResp
err := s.Listunspent(nil, &req, &resp)
if err != nil {
t.Errorf("address: %v handler err: %v", addr, err)
}
marshalled, err := json.MarshalIndent(resp, "", " ")
if err != nil {
t.Errorf("address: %v unmarshal err: %v", addr, err)
}
t.Logf("address: %v resp: %v", addr, string(marshalled))
}
}


@@ -1,7 +1,6 @@
package server
import (
-"log"
"net/http"
"github.com/gorilla/mux"
@@ -9,9 +8,10 @@ import (
"github.com/gorilla/rpc/json"
"github.com/lbryio/herald.go/db"
pb "github.com/lbryio/herald.go/protobuf/go"
log "github.com/sirupsen/logrus"
)
-type JSONServer struct {
+type ClaimtrieService struct {
DB *db.ReadOnlyDBColumnFamily
}
@@ -23,8 +23,8 @@ type Result struct {
Data string `json:"data"`
}
-// Resolve is the json rpc endpoint for resolve
-func (t *JSONServer) Resolve(r *http.Request, args *ResolveData, result **pb.Outputs) error {
+// Resolve is the json rpc endpoint for 'blockchain.claimtrie.resolve'.
+func (t *ClaimtrieService) Resolve(r *http.Request, args *ResolveData, result **pb.Outputs) error {
log.Println("Resolve")
res, err := InternalResolve(args.Data, t.DB)
*result = res
@@ -33,14 +33,33 @@ func (t *JSONServer) Resolve(r *http.Request, args *ResolveData, result **pb.Out
// StartJsonRPC starts the json rpc server and registers the endpoints.
func (s *Server) StartJsonRPC() error {
-server := new(JSONServer)
-server.DB = s.DB
port := ":" + s.Args.JSONRPCPort
s1 := rpc.NewServer() // Create a new RPC server
-s1.RegisterCodec(json.NewCodec(), "application/json") // Register the type of data requested as JSON
-s1.RegisterService(server, "") // Register the service by creating a new JSON server
+// Register the type of data requested as JSON, with custom codec.
+s1.RegisterCodec(&BlockchainCodec{json.NewCodec()}, "application/json")
// Register "blockchain.claimtrie.*"" handlers.
claimtrieSvc := &ClaimtrieService{s.DB}
err := s1.RegisterService(claimtrieSvc, "blockchain_claimtrie")
if err != nil {
log.Errorf("RegisterService: %v\n", err)
}
// Register other "blockchain.{block,address,scripthash}.*" handlers.
blockchainSvc := &BlockchainService{s.DB, s.Chain}
err = s1.RegisterService(blockchainSvc, "blockchain_block")
if err != nil {
log.Errorf("RegisterService: %v\n", err)
}
err = s1.RegisterService(&BlockchainAddressService{*blockchainSvc}, "blockchain_address")
if err != nil {
log.Errorf("RegisterService: %v\n", err)
}
err = s1.RegisterService(&BlockchainScripthashService{*blockchainSvc}, "blockchain_scripthash")
if err != nil {
log.Errorf("RegisterService: %v\n", err)
}
r := mux.NewRouter()
r.Handle("/rpc", s1)


@@ -22,6 +22,7 @@ import (
"github.com/lbryio/herald.go/internal/metrics"
"github.com/lbryio/herald.go/meta"
pb "github.com/lbryio/herald.go/protobuf/go"
"github.com/lbryio/lbcd/chaincfg"
"github.com/olivere/elastic/v7"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -36,6 +37,7 @@ type Server struct {
MultiSpaceRe *regexp.Regexp
WeirdCharsRe *regexp.Regexp
DB *db.ReadOnlyDBColumnFamily
Chain *chaincfg.Params
EsClient *elastic.Client
QueryCache *ttlcache.Cache
S256 *hash.Hash
@@ -167,27 +169,48 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
log.Fatalln(err)
}
if myDB.LastState != nil {
logrus.Infof("DB version: %v", myDB.LastState.DBVersion)
logrus.Infof("height: %v", myDB.LastState.Height)
logrus.Infof("tip: %v", myDB.LastState.Tip.String())
logrus.Infof("tx count: %v", myDB.LastState.TxCount)
}
blockingChannelHashes := make([][]byte, 0, 10)
blockingIds := make([]string, 0, 10)
filteringChannelHashes := make([][]byte, 0, 10)
filteringIds := make([]string, 0, 10)
for _, id := range args.BlockingChannelIds {
hash, err := hex.DecodeString(id)
if err != nil {
logrus.Warn("Invalid channel id: ", id)
continue
}
blockingChannelHashes = append(blockingChannelHashes, hash)
blockingIds = append(blockingIds, id)
}
for _, id := range args.FilteringChannelIds {
hash, err := hex.DecodeString(id)
if err != nil {
logrus.Warn("Invalid channel id: ", id)
continue
}
filteringChannelHashes = append(filteringChannelHashes, hash)
filteringIds = append(filteringIds, id)
}
myDB.BlockingChannelHashes = blockingChannelHashes
myDB.FilteringChannelHashes = filteringChannelHashes
if len(filteringIds) > 0 {
logrus.Infof("filtering claims reposted by channels: %+s", filteringIds)
}
if len(blockingIds) > 0 {
logrus.Infof("blocking claims reposted by channels: %+s", blockingIds)
}
return myDB, nil
}
@@ -251,12 +274,49 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
}
}
dbChain := (*chaincfg.Params)(nil)
if myDB != nil && myDB.LastState != nil && myDB.LastState.Genesis != nil {
// The chain params can be inferred from DBStateValue.
switch *myDB.LastState.Genesis {
case *chaincfg.MainNetParams.GenesisHash:
dbChain = &chaincfg.MainNetParams
case *chaincfg.TestNet3Params.GenesisHash:
dbChain = &chaincfg.TestNet3Params
case *chaincfg.RegressionNetParams.GenesisHash:
dbChain = &chaincfg.RegressionNetParams
}
}
cliChain := (*chaincfg.Params)(nil)
if args.Chain != nil {
switch *args.Chain {
case chaincfg.MainNetParams.Name:
cliChain = &chaincfg.MainNetParams
case chaincfg.TestNet3Params.Name, "testnet":
cliChain = &chaincfg.TestNet3Params
case chaincfg.RegressionNetParams.Name:
cliChain = &chaincfg.RegressionNetParams
}
}
chain := chaincfg.MainNetParams
if dbChain != nil && cliChain != nil {
if dbChain != cliChain {
logrus.Warnf("network: %v (from db) conflicts with %v (from cli)", dbChain.Name, cliChain.Name)
}
chain = *dbChain
} else if dbChain != nil {
chain = *dbChain
} else if cliChain != nil {
chain = *cliChain
}
logrus.Infof("network: %v", chain.Name)
s := &Server{
GrpcServer: grpcServer,
Args: args,
MultiSpaceRe: multiSpaceRe,
WeirdCharsRe: weirdCharsRe,
DB: myDB,
Chain: &chain,
EsClient: client,
QueryCache: cache,
S256: &s256,


@@ -39,10 +39,10 @@ func TestUDPPing(t *testing.T) {
toPort := "50001"
pong, err := server.UDPPing(toAddr, toPort)
-gotCountry := pong.DecodeCountry()
if err != nil {
-log.Println(err)
+t.Skipf("ping failed: %v", err)
}
+gotCountry := pong.DecodeCountry()
res, err := exec.Command("dig", "@resolver4.opendns.com", "myip.opendns.com", "+short").Output()

Binary file not shown.


@@ -0,0 +1 @@
MANIFEST-000004


@@ -0,0 +1 @@
2c7866e6-c325-4a0e-bfac-4cb01a746a45

File diff suppressed because it is too large.

Binary file not shown.

File diff suppressed because it is too large.

File diff suppressed because it is too large.