WIP: blockchain.transaction.yyy JSON RPC implementations #78
3 changed files with 18 additions and 7 deletions
11
db/db.go
11
db/db.go
|
@ -441,8 +441,8 @@ func (db *ReadOnlyDBColumnFamily) selectFrom(prefix []byte, startKey, stopKey pr
|
||||||
// Prefix and handle
|
// Prefix and handle
|
||||||
options := NewIterateOptions().WithDB(db).WithPrefix(prefix).WithCfHandle(handle)
|
options := NewIterateOptions().WithDB(db).WithPrefix(prefix).WithCfHandle(handle)
|
||||||
// Start and stop bounds
|
// Start and stop bounds
|
||||||
options = options.WithStart(startKey.PackKey()).WithStop(stopKey.PackKey()).WithIncludeStop(true)
|
options = options.WithStart(startKey.PackKey()).WithStop(stopKey.PackKey()).WithIncludeStop(false)
|
||||||
// Don't include the key
|
// Include the key and value
|
||||||
options = options.WithIncludeKey(true).WithIncludeValue(true)
|
options = options.WithIncludeKey(true).WithIncludeValue(true)
|
||||||
return []*IterOptions{options}, nil
|
return []*IterOptions{options}, nil
|
||||||
}
|
}
|
||||||
|
@ -455,7 +455,7 @@ func iterate(db *grocksdb.DB, opts []*IterOptions) <-chan []*prefixes.PrefixRowK
|
||||||
for kv := range IterCF(db, o) {
|
for kv := range IterCF(db, o) {
|
||||||
row := make([]*prefixes.PrefixRowKV, 0, 1)
|
row := make([]*prefixes.PrefixRowKV, 0, 1)
|
||||||
row = append(row, kv)
|
row = append(row, kv)
|
||||||
log.Debugf("iterate[%v][%v] %#v", i, j, kv)
|
log.Debugf("iterate[%v][%v] %#v -> %#v", i, j, kv.Key, kv.Value)
|
||||||
out <- row
|
out <- row
|
||||||
j++
|
j++
|
||||||
}
|
}
|
||||||
|
@ -481,7 +481,7 @@ func innerJoin(db *grocksdb.DB, in <-chan []*prefixes.PrefixRowKV, selectFn func
|
||||||
row = append(row, kvs...)
|
row = append(row, kvs...)
|
||||||
row = append(row, kv...)
|
row = append(row, kv...)
|
||||||
for i, kv := range row {
|
for i, kv := range row {
|
||||||
log.Debugf("row[%v] %#v", i, kv)
|
log.Debugf("row[%v] %#v -> %#v", i, kv.Key, kv.Value)
|
||||||
}
|
}
|
||||||
out <- row
|
out <- row
|
||||||
}
|
}
|
||||||
|
@ -579,6 +579,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string, grp
|
||||||
// db, handles, err := grocksdb.OpenDbColumnFamilies(opts, name, cfNames, cfOpts)
|
// db, handles, err := grocksdb.OpenDbColumnFamilies(opts, name, cfNames, cfOpts)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Errorf("open db as secondary failed: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -685,7 +686,7 @@ func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
|
||||||
for {
|
for {
|
||||||
// FIXME: Figure out best sleep interval
|
// FIXME: Figure out best sleep interval
|
||||||
if time.Since(lastPrint) > time.Second {
|
if time.Since(lastPrint) > time.Second {
|
||||||
log.Debug("DetectChanges:", db.LastState)
|
log.Debugf("DetectChanges: %#v", db.LastState)
|
||||||
lastPrint = time.Now()
|
lastPrint = time.Now()
|
||||||
}
|
}
|
||||||
err := db.detectChanges(notifCh)
|
err := db.detectChanges(notifCh)
|
||||||
|
|
12
db/db_get.go
12
db/db_get.go
|
@ -7,7 +7,6 @@ import (
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"math"
|
"math"
|
||||||
|
|
||||||
"github.com/lbryio/herald.go/db/prefixes"
|
"github.com/lbryio/herald.go/db/prefixes"
|
||||||
|
@ -15,6 +14,8 @@ import (
|
||||||
"github.com/lbryio/lbcd/chaincfg/chainhash"
|
"github.com/lbryio/lbcd/chaincfg/chainhash"
|
||||||
"github.com/lbryio/lbcd/wire"
|
"github.com/lbryio/lbcd/wire"
|
||||||
"github.com/linxGnu/grocksdb"
|
"github.com/linxGnu/grocksdb"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetExpirationHeight returns the expiration height for the given height. Uses
|
// GetExpirationHeight returns the expiration height for the given height. Uses
|
||||||
|
@ -298,6 +299,7 @@ func (db *ReadOnlyDBColumnFamily) GetStatus(hashX []byte) ([]byte, error) {
|
||||||
// Lookup in HashXMempoolStatus first.
|
// Lookup in HashXMempoolStatus first.
|
||||||
status, err := db.getMempoolStatus(hashX)
|
status, err := db.getMempoolStatus(hashX)
|
||||||
if err == nil && status != nil {
|
if err == nil && status != nil {
|
||||||
|
log.Debugf("(mempool) status(%#v) -> %#v", hashX, status)
|
||||||
return status, err
|
return status, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -318,6 +320,7 @@ func (db *ReadOnlyDBColumnFamily) GetStatus(hashX []byte) ([]byte, error) {
|
||||||
copy(rawValue, slice.Data())
|
copy(rawValue, slice.Data())
|
||||||
value := prefixes.HashXStatusValue{}
|
value := prefixes.HashXStatusValue{}
|
||||||
value.UnpackValue(rawValue)
|
value.UnpackValue(rawValue)
|
||||||
|
log.Debugf("status(%#v) -> %#v", hashX, value.Status)
|
||||||
return value.Status, nil
|
return value.Status, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -326,6 +329,11 @@ func (db *ReadOnlyDBColumnFamily) GetStatus(hashX []byte) ([]byte, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(txs) == 0 {
|
||||||
|
return []byte{}, err
|
||||||
|
}
|
||||||
|
|
||||||
hash := sha256.New()
|
hash := sha256.New()
|
||||||
for _, tx := range txs {
|
for _, tx := range txs {
|
||||||
hash.Write([]byte(fmt.Sprintf("%s:%d:", tx.TxHash.String(), tx.Height)))
|
hash.Write([]byte(fmt.Sprintf("%s:%d:", tx.TxHash.String(), tx.Height)))
|
||||||
|
@ -904,6 +912,7 @@ func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxM
|
||||||
selectedTxNum := make([]*IterOptions, 0, len(tx_hashes))
|
selectedTxNum := make([]*IterOptions, 0, len(tx_hashes))
|
||||||
for _, txhash := range tx_hashes {
|
for _, txhash := range tx_hashes {
|
||||||
key := prefixes.TxNumKey{Prefix: []byte{prefixes.TxNum}, TxHash: &txhash}
|
key := prefixes.TxNumKey{Prefix: []byte{prefixes.TxNum}, TxHash: &txhash}
|
||||||
|
log.Debugf("%v", key)
|
||||||
opt, err := db.selectFrom(key.Prefix, &key, &key)
|
opt, err := db.selectFrom(key.Prefix, &key, &key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -913,6 +922,7 @@ func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxM
|
||||||
|
|
||||||
selectTxByTxNum := func(in []*prefixes.PrefixRowKV) ([]*IterOptions, error) {
|
selectTxByTxNum := func(in []*prefixes.PrefixRowKV) ([]*IterOptions, error) {
|
||||||
txNumKey := in[0].Key.(*prefixes.TxNumKey)
|
txNumKey := in[0].Key.(*prefixes.TxNumKey)
|
||||||
|
log.Debugf("%v", txNumKey.TxHash.String())
|
||||||
out := make([]*IterOptions, 0, 100)
|
out := make([]*IterOptions, 0, 100)
|
||||||
startKey := &prefixes.TxKey{
|
startKey := &prefixes.TxKey{
|
||||||
Prefix: []byte{prefixes.Tx},
|
Prefix: []byte{prefixes.Tx},
|
||||||
|
|
|
@ -823,7 +823,7 @@ func (s *BlockchainTransactionService) Get_batch(req *TransactionGetBatchReq, re
|
||||||
if len(*req) > 100 {
|
if len(*req) > 100 {
|
||||||
return fmt.Errorf("too many tx hashes in request: %v", len(*req))
|
return fmt.Errorf("too many tx hashes in request: %v", len(*req))
|
||||||
}
|
}
|
||||||
tx_hashes := make([]chainhash.Hash, len(*req))
|
tx_hashes := make([]chainhash.Hash, 0, len(*req))
|
||||||
for i, txid := range *req {
|
for i, txid := range *req {
|
||||||
tx_hashes = append(tx_hashes, chainhash.Hash{})
|
tx_hashes = append(tx_hashes, chainhash.Hash{})
|
||||||
if err := chainhash.Decode(&tx_hashes[i], txid); err != nil {
|
if err := chainhash.Decode(&tx_hashes[i], txid); err != nil {
|
||||||
|
|
Loading…
Reference in a new issue