WIP: blockchain.transaction.yyy JSON RPC implementations #78

Merged
moodyjon merged 12 commits from blockchain_tx_rpc1 into master 2022-12-06 22:14:28 +01:00
4 changed files with 245 additions and 87 deletions
Showing only changes of commit 8c2df7e62a - Show all commits

View file

@ -3,6 +3,7 @@ package db
// db_get.go contains the basic access functions to the database.
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"fmt"
@ -12,6 +13,7 @@ import (
"github.com/lbryio/herald.go/db/prefixes"
"github.com/lbryio/herald.go/db/stack"
"github.com/lbryio/lbcd/chaincfg/chainhash"
"github.com/lbryio/lbcd/wire"
"github.com/linxGnu/grocksdb"
)
@ -731,6 +733,71 @@ func (db *ReadOnlyDBColumnFamily) FsGetClaimByHash(claimHash []byte) (*ResolveRe
)
}
func (db *ReadOnlyDBColumnFamily) GetTx(txhash *chainhash.Hash) ([]byte, *wire.MsgTx, error) {
// Lookup in MempoolTx first.
raw, tx, err := db.getMempoolTx(txhash)
if err == nil && raw != nil && tx != nil {
return raw, tx, err
}
handle, err := db.EnsureHandle(prefixes.Tx)
if err != nil {
return nil, nil, err
}
key := prefixes.TxKey{Prefix: []byte{prefixes.Tx}, TxHash: txhash}
rawKey := key.PackKey()
slice, err := db.DB.GetCF(db.Opts, handle, rawKey)
defer slice.Free()
if err != nil {
return nil, nil, err
}
if slice.Size() == 0 {
return nil, nil, nil
}
rawValue := make([]byte, len(slice.Data()))
copy(rawValue, slice.Data())
value := prefixes.TxValue{}
value.UnpackValue(rawValue)
var msgTx wire.MsgTx
err = msgTx.Deserialize(bytes.NewReader(value.RawTx))
if err != nil {
return nil, nil, err
}
return value.RawTx, &msgTx, nil
}
func (db *ReadOnlyDBColumnFamily) getMempoolTx(txhash *chainhash.Hash) ([]byte, *wire.MsgTx, error) {
handle, err := db.EnsureHandle(prefixes.MempoolTx)
if err != nil {
return nil, nil, err
}
key := prefixes.MempoolTxKey{Prefix: []byte{prefixes.Tx}, TxHash: txhash}
rawKey := key.PackKey()
slice, err := db.DB.GetCF(db.Opts, handle, rawKey)
defer slice.Free()
if err != nil {
return nil, nil, err
}
if slice.Size() == 0 {
return nil, nil, nil
}
rawValue := make([]byte, len(slice.Data()))
copy(rawValue, slice.Data())
value := prefixes.MempoolTxValue{}
value.UnpackValue(rawValue)
var msgTx wire.MsgTx
err = msgTx.Deserialize(bytes.NewReader(value.RawTx))
if err != nil {
return nil, nil, err
}
return value.RawTx, &msgTx, nil
}
func (db *ReadOnlyDBColumnFamily) GetTxCount(height uint32) (*prefixes.TxCountValue, error) {
handle, err := db.EnsureHandle(prefixes.TxCount)
if err != nil {
@ -754,6 +821,29 @@ func (db *ReadOnlyDBColumnFamily) GetTxCount(height uint32) (*prefixes.TxCountVa
return value, nil
}
func (db *ReadOnlyDBColumnFamily) GetTxHeight(txhash *chainhash.Hash) (uint32, error) {
handle, err := db.EnsureHandle(prefixes.TxNum)
if err != nil {
return 0, err
}
key := prefixes.TxNumKey{Prefix: []byte{prefixes.TxNum}, TxHash: txhash}
rawKey := key.PackKey()
slice, err := db.DB.GetCF(db.Opts, handle, rawKey)
defer slice.Free()
if err != nil {
return 0, err
}
if slice.Size() == 0 {
return 0, nil
}
// No slice copy needed. Value will be abandoned.
value := prefixes.TxNumValueUnpack(slice.Data())
height := stack.BisectRight(db.TxCounts, []uint32{value.TxNum})[0]
return height, nil
}
func (db *ReadOnlyDBColumnFamily) GetDBState() (*prefixes.DBStateValue, error) {
handle, err := db.EnsureHandle(prefixes.DBState)
if err != nil {

View file

@ -1050,84 +1050,6 @@ func TxNumValueUnpack(value []byte) *TxNumValue {
}
}
type TxKey struct {
Prefix []byte `struct:"[1]byte" json:"prefix"`
TxHash *chainhash.Hash `struct:"*[32]byte" json:"tx_hash"`
}
type TxValue struct {
RawTx []byte `struct-while:"!_eof" json:"raw_tx"`
}
func (k *TxKey) PackKey() []byte {
prefixLen := 1
// b'>L'
n := prefixLen + 32
key := make([]byte, n)
copy(key, k.Prefix)
copy(key[prefixLen:], k.TxHash[:32])
return key
}
func (v *TxValue) PackValue() []byte {
value := make([]byte, len(v.RawTx))
copy(value, v.RawTx)
return value
}
func (kv *TxKey) NumFields() int {
return 1
}
func (k *TxKey) PartialPack(fields int) []byte {
// Limit fields between 0 and number of fields, we always at least need
// the prefix, and we never need to iterate past the number of fields.
if fields > 1 {
fields = 1
}
if fields < 0 {
fields = 0
}
prefixLen := 1
var n = prefixLen
for i := 0; i <= fields; i++ {
switch i {
case 1:
n += 32
}
}
key := make([]byte, n)
for i := 0; i <= fields; i++ {
switch i {
case 0:
copy(key, k.Prefix)
case 1:
copy(key[prefixLen:], k.TxHash[:32])
}
}
return key
}
func TxKeyUnpack(key []byte) *TxKey {
prefixLen := 1
return &TxKey{
Prefix: key[:prefixLen],
TxHash: (*chainhash.Hash)(key[prefixLen : prefixLen+32]),
}
}
func TxValueUnpack(value []byte) *TxValue {
return &TxValue{
RawTx: value,
}
}
type BlockHeaderKey struct {
Prefix []byte `struct:"[1]byte" json:"prefix"`
Height uint32 `json:"height"`
@ -3351,9 +3273,12 @@ func (kv *TrendingNotificationValue) UnpackValue(buf []byte) {
offset += 8
}
type TxKey = MempoolTxKey
type TxValue = MempoolTxValue
type MempoolTxKey struct {
Prefix []byte `struct:"[1]byte" json:"prefix"`
TxHash []byte `struct:"[32]byte" json:"tx_hash"`
Prefix []byte `struct:"[1]byte" json:"prefix"`
TxHash *chainhash.Hash `struct:"*[32]byte" json:"tx_hash"`
}
type MempoolTxValue struct {
@ -3386,7 +3311,7 @@ func (kv *MempoolTxKey) UnpackKey(buf []byte) {
offset := 0
kv.Prefix = buf[offset : offset+1]
offset += 1
kv.TxHash = buf[offset : offset+32]
kv.TxHash = (*chainhash.Hash)(buf[offset : offset+32])
offset += 32
}
@ -3925,12 +3850,6 @@ var prefixRegistry = map[byte]prefixMeta{
newValue: func() interface{} {
return &TxValue{}
},
newKeyUnpack: func(buf []byte) interface{} {
return TxKeyUnpack(buf)
},
newValueUnpack: func(buf []byte) interface{} {
return TxValueUnpack(buf)
},
},
BlockHash: {
newKey: func() interface{} {

View file

@ -55,6 +55,14 @@ type BlockchainScripthashService struct {
session *session
}
// BlockchainTransactionService methods handle "blockchain.transaction.*" RPCs
type BlockchainTransactionService struct {
DB *db.ReadOnlyDBColumnFamily
Chain *chaincfg.Params
// needed for broadcast TX
sessionMgr *sessionManager
}
const CHUNK_SIZE = 96
const MAX_CHUNK_SIZE = 40960
const HEADER_SIZE = wire.MaxBlockHeaderPayload
@ -626,3 +634,138 @@ func (s *BlockchainScripthashService) Unsubscribe(req *ScripthashSubscribeReq, r
*resp = (*ScripthashSubscribeResp)(nil)
return nil
}
type TransactionBroadcastReq string
type TransactionBroadcastResp string
// 'blockchain.transaction.broadcast'
func (s *BlockchainTransactionService) Broadcast(req *TransactionBroadcastReq, resp **TransactionBroadcastResp) error {
strTx := string(*req)
rawTx, err := hex.DecodeString(strTx)
if err != nil {
return err
}
txhash, err := s.sessionMgr.broadcastTx(rawTx)
if err != nil {
return err
}
result := txhash.String()
*resp = (*TransactionBroadcastResp)(&result)
return nil
}
type TransactionGetReq string
type TXFullDetail struct {
Height uint32 `json:"block_height"`
Merkle string `json:"merkle"`
Pos uint64 `json:"pos"`
}
type TXDetail struct {
Height uint32 `json:"block_height"`
}
// TransactionResp is a pair consisting of:
// resp[0]: Raw transaction as hex string
// resp[1]: TXFullDetail or TXDetail structure
type TXGetResp [2]interface{}
type TransactionGetResp TXGetResp
// 'blockchain.transaction.get'
func (s *BlockchainTransactionService) Get(req *TransactionGetReq, resp **TransactionGetResp) error {
txids := [1]string{string(*req)}
request := TransactionGetBatchReq(txids[:])
var response *TransactionGetBatchResp
err := s.Get_batch(&request, &response)
if err != nil {
return err
}
if len(*response) < 1 {
return errors.New("tx not found")
}
switch (*response)[0][1].(type) {
case TXFullDetail:
break
case TXDetail:
default:
return errors.New("tx not confirmed")
}
*resp = (*TransactionGetResp)(&(*response)[0])
return err
}
type TransactionGetBatchReq []string
type TransactionGetBatchResp []TXGetResp
// 'blockchain.transaction.get_batch'
func (s *BlockchainTransactionService) Get_batch(req *TransactionGetBatchReq, resp **TransactionGetBatchResp) error {
return nil
}
type TransactionGetMerkleReq string
type TransactionGetMerkleResp TXGetResp
// 'blockchain.transaction.get_merkle'
func (s *BlockchainTransactionService) Get_merkle(req *TransactionGetMerkleReq, resp **TransactionGetMerkleResp) error {
txids := [1]string{string(*req)}
request := TransactionGetBatchReq(txids[:])
var response *TransactionGetBatchResp
err := s.Get_batch(&request, &response)
if err != nil {
return err
}
if len(*response) < 1 {
return errors.New("tx not found")
}
switch (*response)[0][1].(type) {
case TXFullDetail:
break
case TXDetail:
default:
return errors.New("tx not confirmed")
}
*resp = (*TransactionGetMerkleResp)(&(*response)[0])
return err
}
type TransactionGetHeightReq string
type TransactionGetHeightResp uint32
// 'blockchain.transaction.get_height'
func (s *BlockchainTransactionService) Get_height(req *TransactionGetHeightReq, resp **TransactionGetHeightResp) error {
txid := string(*(req))
txhash, err := chainhash.NewHashFromStr(txid)
if err != nil {
return err
}
height, err := s.DB.GetTxHeight(txhash)
*resp = (*TransactionGetHeightResp)(&height)
return err
}
type TransactionInfoReq string
type TransactionInfoResp TXGetResp
// 'blockchain.transaction.info'
func (s *BlockchainTransactionService) Info(req *TransactionInfoReq, resp **TransactionInfoResp) error {
txids := [1]string{string(*req)}
request := TransactionGetBatchReq(txids[:])
var response *TransactionGetBatchResp
err := s.Get_batch(&request, &response)
if err != nil {
return err
}
if len(*response) < 1 {
return errors.New("tx not found")
}
switch (*response)[0][1].(type) {
case TXFullDetail:
break
case TXDetail:
default:
if (*response)[0][0] == nil {
return errors.New("no such mempool or blockchain transaction")
}
}
*resp = (*TransactionInfoResp)(&(*response)[0])
return err
}

View file

@ -16,6 +16,7 @@ import (
"github.com/lbryio/herald.go/db"
"github.com/lbryio/herald.go/internal"
"github.com/lbryio/lbcd/chaincfg"
"github.com/lbryio/lbcd/chaincfg/chainhash"
"github.com/lbryio/lbry.go/v3/extras/stop"
log "github.com/sirupsen/logrus"
)
@ -276,6 +277,11 @@ func (sm *sessionManager) removeSessionLocked(sess *session) {
sess.conn.Close()
}
func (sm *sessionManager) broadcastTx(rawTx []byte) (*chainhash.Hash, error) {
// TODO
return nil, nil
}
func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bool) {
sm.sessionsMut.Lock()
defer sm.sessionsMut.Unlock()