diff --git a/db/db.go b/db/db.go index 4051c50..ef3da42 100644 --- a/db/db.go +++ b/db/db.go @@ -441,8 +441,8 @@ func (db *ReadOnlyDBColumnFamily) selectFrom(prefix []byte, startKey, stopKey pr // Prefix and handle options := NewIterateOptions().WithDB(db).WithPrefix(prefix).WithCfHandle(handle) // Start and stop bounds - options = options.WithStart(startKey.PackKey()).WithStop(stopKey.PackKey()).WithIncludeStop(true) - // Don't include the key + options = options.WithStart(startKey.PackKey()).WithStop(stopKey.PackKey()).WithIncludeStop(false) + // Include the key and value options = options.WithIncludeKey(true).WithIncludeValue(true) return []*IterOptions{options}, nil } @@ -455,7 +455,7 @@ func iterate(db *grocksdb.DB, opts []*IterOptions) <-chan []*prefixes.PrefixRowK for kv := range IterCF(db, o) { row := make([]*prefixes.PrefixRowKV, 0, 1) row = append(row, kv) - log.Debugf("iterate[%v][%v] %#v", i, j, kv) + log.Debugf("iterate[%v][%v] %#v -> %#v", i, j, kv.Key, kv.Value) out <- row j++ } @@ -481,7 +481,7 @@ func innerJoin(db *grocksdb.DB, in <-chan []*prefixes.PrefixRowKV, selectFn func row = append(row, kvs...) row = append(row, kv...) for i, kv := range row { - log.Debugf("row[%v] %#v", i, kv) + log.Debugf("row[%v] %#v -> %#v", i, kv.Key, kv.Value) } out <- row } @@ -579,6 +579,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string, grp // db, handles, err := grocksdb.OpenDbColumnFamilies(opts, name, cfNames, cfOpts) if err != nil { + log.Errorf("open db as secondary failed: %v", err) return nil, err } @@ -685,7 +686,7 @@ func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) { for { // FIXME: Figure out best sleep interval if time.Since(lastPrint) > time.Second { - log.Debug("DetectChanges:", db.LastState) + log.Debugf("DetectChanges: %#v", db.LastState) lastPrint = time.Now() } err := db.detectChanges(notifCh) @@ -775,7 +776,12 @@ func (db *ReadOnlyDBColumnFamily) detectChanges(notifCh chan<- interface{}) erro log.Info("error getting block hash: ", err) return err } - notifCh <- &internal.HeightHash{Height: uint64(height), BlockHash: hash} + header, err := db.GetHeader(height) + if err != nil { + log.Info("error getting block header: ", err) + return err + } + notifCh <- &internal.HeightHash{Height: uint64(height), BlockHash: hash, BlockHeader: header} } //TODO: ClearCache log.Warn("implement cache clearing") diff --git a/db/db_get.go b/db/db_get.go index 478aaf9..b6f0c10 100644 --- a/db/db_get.go +++ b/db/db_get.go @@ -3,16 +3,19 @@ package db // db_get.go contains the basic access functions to the database. import ( + "bytes" "crypto/sha256" "encoding/hex" "fmt" - "log" "math" "github.com/lbryio/herald.go/db/prefixes" "github.com/lbryio/herald.go/db/stack" "github.com/lbryio/lbcd/chaincfg/chainhash" + "github.com/lbryio/lbcd/wire" "github.com/linxGnu/grocksdb" + + log "github.com/sirupsen/logrus" ) // GetExpirationHeight returns the expiration height for the given height. 
Uses @@ -65,6 +68,31 @@ func (db *ReadOnlyDBColumnFamily) GetBlockHash(height uint32) ([]byte, error) { return rawValue, nil } +func (db *ReadOnlyDBColumnFamily) GetBlockTXs(height uint32) ([]*chainhash.Hash, error) { + handle, err := db.EnsureHandle(prefixes.BlockTXs) + if err != nil { + return nil, err + } + + key := prefixes.BlockTxsKey{ + Prefix: []byte{prefixes.BlockTXs}, + Height: height, + } + slice, err := db.DB.GetCF(db.Opts, handle, key.PackKey()) + defer slice.Free() + if err != nil { + return nil, err + } + if slice.Size() == 0 { + return nil, nil + } + + rawValue := make([]byte, len(slice.Data())) + copy(rawValue, slice.Data()) + value := prefixes.BlockTxsValueUnpack(rawValue) + return value.TxHashes, nil +} + func (db *ReadOnlyDBColumnFamily) GetHeader(height uint32) ([]byte, error) { handle, err := db.EnsureHandle(prefixes.Header) if err != nil { @@ -271,6 +299,7 @@ func (db *ReadOnlyDBColumnFamily) GetStatus(hashX []byte) ([]byte, error) { // Lookup in HashXMempoolStatus first. status, err := db.getMempoolStatus(hashX) if err == nil && status != nil { + log.Debugf("(mempool) status(%#v) -> %#v", hashX, status) return status, err } @@ -291,6 +320,7 @@ func (db *ReadOnlyDBColumnFamily) GetStatus(hashX []byte) ([]byte, error) { copy(rawValue, slice.Data()) value := prefixes.HashXStatusValue{} value.UnpackValue(rawValue) + log.Debugf("status(%#v) -> %#v", hashX, value.Status) return value.Status, nil } @@ -299,6 +329,11 @@ func (db *ReadOnlyDBColumnFamily) GetStatus(hashX []byte) ([]byte, error) { if err != nil { return nil, err } + + if len(txs) == 0 { + return []byte{}, err + } + hash := sha256.New() for _, tx := range txs { hash.Write([]byte(fmt.Sprintf("%s:%d:", tx.TxHash.String(), tx.Height))) @@ -731,6 +766,70 @@ func (db *ReadOnlyDBColumnFamily) FsGetClaimByHash(claimHash []byte) (*ResolveRe ) } +func (db *ReadOnlyDBColumnFamily) GetTx(txhash *chainhash.Hash) ([]byte, *wire.MsgTx, error) { + // Lookup in MempoolTx first. 
+ raw, tx, err := db.getMempoolTx(txhash) + if err == nil && raw != nil && tx != nil { + return raw, tx, err + } + + handle, err := db.EnsureHandle(prefixes.Tx) + if err != nil { + return nil, nil, err + } + + key := prefixes.TxKey{Prefix: []byte{prefixes.Tx}, TxHash: txhash} + rawKey := key.PackKey() + slice, err := db.DB.GetCF(db.Opts, handle, rawKey) + defer slice.Free() + if err != nil { + return nil, nil, err + } + if slice.Size() == 0 { + return nil, nil, nil + } + + rawValue := make([]byte, len(slice.Data())) + copy(rawValue, slice.Data()) + value := prefixes.TxValue{} + value.UnpackValue(rawValue) + var msgTx wire.MsgTx + err = msgTx.Deserialize(bytes.NewReader(value.RawTx)) + if err != nil { + return nil, nil, err + } + return value.RawTx, &msgTx, nil +} + +func (db *ReadOnlyDBColumnFamily) getMempoolTx(txhash *chainhash.Hash) ([]byte, *wire.MsgTx, error) { + handle, err := db.EnsureHandle(prefixes.MempoolTx) + if err != nil { + return nil, nil, err + } + + key := prefixes.MempoolTxKey{Prefix: []byte{prefixes.Tx}, TxHash: txhash} + rawKey := key.PackKey() + slice, err := db.DB.GetCF(db.Opts, handle, rawKey) + defer slice.Free() + if err != nil { + return nil, nil, err + } + if slice.Size() == 0 { + return nil, nil, nil + } + + rawValue := make([]byte, len(slice.Data())) + copy(rawValue, slice.Data()) + value := prefixes.MempoolTxValue{} + value.UnpackValue(rawValue) + var msgTx wire.MsgTx + err = msgTx.Deserialize(bytes.NewReader(value.RawTx)) + if err != nil { + return nil, nil, err + } + return value.RawTx, &msgTx, nil +} + func (db *ReadOnlyDBColumnFamily) GetTxCount(height uint32) (*prefixes.TxCountValue, error) { handle, err := db.EnsureHandle(prefixes.TxCount) if err != nil { @@ -754,6 +853,123 @@ func (db *ReadOnlyDBColumnFamily) GetTxCount(height uint32) (*prefixes.TxCountVa return value, nil } +func (db *ReadOnlyDBColumnFamily) GetTxHeight(txhash *chainhash.Hash) (uint32, error) { + handle, err := db.EnsureHandle(prefixes.TxNum) + if err != nil { + return 0, err + } + + key := prefixes.TxNumKey{Prefix: []byte{prefixes.TxNum}, TxHash: txhash} + rawKey := key.PackKey() + slice, err := db.DB.GetCF(db.Opts, handle, rawKey) + defer slice.Free() + if err != nil { + return 0, err + } + if slice.Size() == 0 { + return 0, nil + } + + // No slice copy needed. Value will be abandoned. + value := prefixes.TxNumValueUnpack(slice.Data()) + height := stack.BisectRight(db.TxCounts, []uint32{value.TxNum})[0] + return height, nil +} + +type TxMerkle struct { + TxHash *chainhash.Hash + RawTx []byte + Height int + Pos uint32 + Merkle []*chainhash.Hash +} + +// merklePath selects specific transactions by position within blockTxs. +// The resulting merkle path (aka merkle branch, or merkle) is a list of TX hashes +// which are in sibling relationship with TX nodes on the path to the root. +func merklePath(pos uint32, blockTxs, partial []*chainhash.Hash) []*chainhash.Hash { + parent := func(p uint32) uint32 { + return p >> 1 + } + sibling := func(p uint32) uint32 { + if p%2 == 0 { + return p + 1 + } else { + return p - 1 + } + } + p := parent(pos) + if p == 0 { + // No parent, path is complete. + return partial + } + // Add sibling to partial path and proceed to parent TX. 
+ return merklePath(p, blockTxs, append(partial, blockTxs[sibling(pos)])) +} + +func (db *ReadOnlyDBColumnFamily) GetTxMerkle(tx_hashes []chainhash.Hash) ([]TxMerkle, error) { + + selectedTxNum := make([]*IterOptions, 0, len(tx_hashes)) + for _, txhash := range tx_hashes { + key := prefixes.TxNumKey{Prefix: []byte{prefixes.TxNum}, TxHash: &txhash} + log.Debugf("%v", key) + opt, err := db.selectFrom(key.Prefix, &key, &key) + if err != nil { + return nil, err + } + selectedTxNum = append(selectedTxNum, opt...) + } + + selectTxByTxNum := func(in []*prefixes.PrefixRowKV) ([]*IterOptions, error) { + txNumKey := in[0].Key.(*prefixes.TxNumKey) + log.Debugf("%v", txNumKey.TxHash.String()) + out := make([]*IterOptions, 0, 100) + startKey := &prefixes.TxKey{ + Prefix: []byte{prefixes.Tx}, + TxHash: txNumKey.TxHash, + } + endKey := &prefixes.TxKey{ + Prefix: []byte{prefixes.Tx}, + TxHash: txNumKey.TxHash, + } + selectedTx, err := db.selectFrom([]byte{prefixes.Tx}, startKey, endKey) + if err != nil { + return nil, err + } + out = append(out, selectedTx...) + return out, nil + } + + blockTxsCache := make(map[uint32][]*chainhash.Hash) + results := make([]TxMerkle, 0, 500) + for kvs := range innerJoin(db.DB, iterate(db.DB, selectedTxNum), selectTxByTxNum) { + if err := checkForError(kvs); err != nil { + return results, err + } + txNumKey, txNumVal := kvs[0].Key.(*prefixes.TxNumKey), kvs[0].Value.(*prefixes.TxNumValue) + _, txVal := kvs[1].Key.(*prefixes.TxKey), kvs[1].Value.(*prefixes.TxValue) + txHeight := stack.BisectRight(db.TxCounts, []uint32{txNumVal.TxNum})[0] + txPos := txNumVal.TxNum - db.TxCounts.Get(txHeight-1) + // We need all the TX hashes in order to select out the relevant ones. + if _, ok := blockTxsCache[txHeight]; !ok { + txs, err := db.GetBlockTXs(txHeight) + if err != nil { + return results, err + } + blockTxsCache[txHeight] = txs + } + blockTxs, _ := blockTxsCache[txHeight] + results = append(results, TxMerkle{ + TxHash: txNumKey.TxHash, + RawTx: txVal.RawTx, + Height: int(txHeight), + Pos: txPos, + Merkle: merklePath(txPos, blockTxs, []*chainhash.Hash{}), + }) + } + return results, nil +} + func (db *ReadOnlyDBColumnFamily) GetDBState() (*prefixes.DBStateValue, error) { handle, err := db.EnsureHandle(prefixes.DBState) if err != nil { diff --git a/db/prefixes/generic.go b/db/prefixes/generic.go index 2b8d685..509c1dc 100644 --- a/db/prefixes/generic.go +++ b/db/prefixes/generic.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/go-restruct/restruct" + "github.com/lbryio/herald.go/internal" "github.com/lbryio/lbcd/chaincfg/chainhash" ) @@ -59,6 +60,34 @@ func (kv *BlockTxsValue) Unpack(buf []byte, order binary.ByteOrder) ([]byte, err return buf[offset:], nil } +// Struct BigEndianChainHash is a chainhash.Hash stored in external +// byte-order (opposite of other 32 byte chainhash.Hash values). In order +// to reuse chainhash.Hash we need to correct the byte-order. +// Currently this type is used for field Genesis of DBStateValue. + +func (kv *BigEndianChainHash) SizeOf() int { + return chainhash.HashSize +} + +func (kv *BigEndianChainHash) Pack(buf []byte, order binary.ByteOrder) ([]byte, error) { + offset := 0 + hash := kv.CloneBytes() + // HACK: Instances of chainhash.Hash use the internal byte-order. + // Python scribe writes bytes of genesis hash in external byte-order. 
+ internal.ReverseBytesInPlace(hash) + offset += copy(buf[offset:chainhash.HashSize], hash[:]) + return buf[offset:], nil +} + +func (kv *BigEndianChainHash) Unpack(buf []byte, order binary.ByteOrder) ([]byte, error) { + offset := 0 + offset += copy(kv.Hash[:], buf[offset:32]) + // HACK: Instances of chainhash.Hash use the internal byte-order. + // Python scribe writes bytes of genesis hash in external byte-order. + internal.ReverseBytesInPlace(kv.Hash[:]) + return buf[offset:], nil +} + func genericNew(prefix []byte, key bool) (interface{}, error) { t, ok := prefixRegistry[prefix[0]] if !ok { diff --git a/db/prefixes/prefixes.go b/db/prefixes/prefixes.go index 76ac64d..73e529f 100644 --- a/db/prefixes/prefixes.go +++ b/db/prefixes/prefixes.go @@ -182,12 +182,25 @@ func NewLengthEncodedPartialClaimId(s string) LengthEncodedPartialClaimId { } } +type BigEndianChainHash struct { + chainhash.Hash +} + +func NewBigEndianChainHash(hash *chainhash.Hash) BigEndianChainHash { + if hash != nil { + return BigEndianChainHash{ + *hash, + } + } + return BigEndianChainHash{} +} + type DBStateKey struct { Prefix []byte `struct:"[1]byte" json:"prefix"` } type DBStateValue struct { - Genesis *chainhash.Hash + Genesis BigEndianChainHash Height uint32 TxCount uint32 Tip *chainhash.Hash @@ -203,7 +216,7 @@ type DBStateValue struct { func NewDBStateValue() *DBStateValue { return &DBStateValue{ - Genesis: new(chainhash.Hash), + Genesis: NewBigEndianChainHash(nil), Height: 0, TxCount: 0, Tip: new(chainhash.Hash), @@ -237,7 +250,11 @@ func (v *DBStateValue) PackValue() []byte { // b'>32sLL32sLLBBlllL' n := 32 + 4 + 4 + 32 + 4 + 4 + 1 + 1 + 4 + 4 + 4 + 4 value := make([]byte, n) - copy(value, v.Genesis[:32]) + genesis := v.Genesis.CloneBytes() + // HACK: Instances of chainhash.Hash use the internal byte-order. + // Python scribe writes bytes of genesis hash in external byte-order. + internal.ReverseBytesInPlace(genesis) + copy(value, genesis[:32]) binary.BigEndian.PutUint32(value[32:], v.Height) binary.BigEndian.PutUint32(value[32+4:], v.TxCount) copy(value[32+4+4:], v.Tip[:32]) @@ -282,8 +299,11 @@ func DBStateKeyUnpack(key []byte) *DBStateKey { func DBStateValueUnpack(value []byte) *DBStateValue { genesis := (*chainhash.Hash)(value[:32]) tip := (*chainhash.Hash)(value[32+4+4 : 32+4+4+32]) + // HACK: Python scribe writes bytes of genesis hash in external byte-order. + // Instances of chainhash.Hash should use the internal byte-order. 
+ internal.ReverseBytesInPlace(genesis[:]) x := &DBStateValue{ - Genesis: genesis, + Genesis: NewBigEndianChainHash(genesis), Height: binary.BigEndian.Uint32(value[32:]), TxCount: binary.BigEndian.Uint32(value[32+4:]), Tip: tip, @@ -708,7 +728,7 @@ type BlockTxsKey struct { } type BlockTxsValue struct { - TxHashes []*chainhash.Hash `struct-while:"!_eof" json:"tx_hashes"` + TxHashes []*chainhash.Hash `struct:"*[32]byte" struct-while:"!_eof" json:"tx_hashes"` } func (k *BlockTxsKey) NewBlockTxsKey(height uint32) *BlockTxsKey { @@ -1050,84 +1070,6 @@ func TxNumValueUnpack(value []byte) *TxNumValue { } } -type TxKey struct { - Prefix []byte `struct:"[1]byte" json:"prefix"` - TxHash *chainhash.Hash `struct:"*[32]byte" json:"tx_hash"` -} - -type TxValue struct { - RawTx []byte `struct-while:"!_eof" json:"raw_tx"` -} - -func (k *TxKey) PackKey() []byte { - prefixLen := 1 - // b'>L' - n := prefixLen + 32 - key := make([]byte, n) - copy(key, k.Prefix) - copy(key[prefixLen:], k.TxHash[:32]) - - return key -} - -func (v *TxValue) PackValue() []byte { - value := make([]byte, len(v.RawTx)) - copy(value, v.RawTx) - - return value -} - -func (kv *TxKey) NumFields() int { - return 1 -} - -func (k *TxKey) PartialPack(fields int) []byte { - // Limit fields between 0 and number of fields, we always at least need - // the prefix, and we never need to iterate past the number of fields. - if fields > 1 { - fields = 1 - } - if fields < 0 { - fields = 0 - } - - prefixLen := 1 - var n = prefixLen - for i := 0; i <= fields; i++ { - switch i { - case 1: - n += 32 - } - } - - key := make([]byte, n) - - for i := 0; i <= fields; i++ { - switch i { - case 0: - copy(key, k.Prefix) - case 1: - copy(key[prefixLen:], k.TxHash[:32]) - } - } - - return key -} - -func TxKeyUnpack(key []byte) *TxKey { - prefixLen := 1 - return &TxKey{ - Prefix: key[:prefixLen], - TxHash: (*chainhash.Hash)(key[prefixLen : prefixLen+32]), - } -} - -func TxValueUnpack(value []byte) *TxValue { - return &TxValue{ - RawTx: value, - } -} - type BlockHeaderKey struct { Prefix []byte `struct:"[1]byte" json:"prefix"` Height uint32 `json:"height"` @@ -3351,9 +3293,12 @@ func (kv *TrendingNotificationValue) UnpackValue(buf []byte) { offset += 8 } +type TxKey = MempoolTxKey +type TxValue = MempoolTxValue + type MempoolTxKey struct { - Prefix []byte `struct:"[1]byte" json:"prefix"` - TxHash []byte `struct:"[32]byte" json:"tx_hash"` + Prefix []byte `struct:"[1]byte" json:"prefix"` + TxHash *chainhash.Hash `struct:"*[32]byte" json:"tx_hash"` } type MempoolTxValue struct { @@ -3386,7 +3331,7 @@ func (kv *MempoolTxKey) UnpackKey(buf []byte) { offset := 0 kv.Prefix = buf[offset : offset+1] offset += 1 - kv.TxHash = buf[offset : offset+32] + kv.TxHash = (*chainhash.Hash)(buf[offset : offset+32]) offset += 32 } @@ -3925,12 +3870,6 @@ var prefixRegistry = map[byte]prefixMeta{ newValue: func() interface{} { return &TxValue{} }, - newKeyUnpack: func(buf []byte) interface{} { - return TxKeyUnpack(buf) - }, - newValueUnpack: func(buf []byte) interface{} { - return TxValueUnpack(buf) - }, }, BlockHash: { newKey: func() interface{} { diff --git a/internal/types.go b/internal/types.go index 4e0977d..40ffa7c 100644 --- a/internal/types.go +++ b/internal/types.go @@ -4,6 +4,7 @@ package internal // HeightHash struct for the height subscription endpoint. 
type HeightHash struct { - Height uint64 - BlockHash []byte + Height uint64 + BlockHash []byte + BlockHeader []byte } diff --git a/main.go b/main.go index e78be35..df2a62b 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "strconv" "time" _ "net/http/pprof" @@ -63,7 +64,7 @@ func main() { return } - conn, err := grpc.Dial("localhost:"+args.Port, + conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), grpc.WithInsecure(), grpc.WithBlock(), ) diff --git a/server/args.go b/server/args.go index 1f6438c..491cbd4 100644 --- a/server/args.go +++ b/server/args.go @@ -22,7 +22,7 @@ const ( type Args struct { CmdType int Host string - Port string + Port int DBPath string Chain *string EsHost string @@ -67,7 +67,7 @@ type Args struct { const ( DefaultHost = "0.0.0.0" - DefaultPort = "50051" + DefaultPort = 50051 DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME DefaultEsHost = "http://localhost" DefaultEsIndex = "claims" @@ -214,7 +214,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { // main server config arguments host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost}) - port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort}) + port := parser.Int("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Validate: validatePort, Default: DefaultPort}) dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath}) chain := parser.Selector("", "chain", []string{chaincfg.MainNetParams.Name, chaincfg.TestNet3Params.Name, chaincfg.RegressionNetParams.Name, "testnet"}, &argparse.Options{Required: false, Help: "Which chain to use, default is 'mainnet'. Values 'regtest' and 'testnet' are for testing", Default: chaincfg.MainNetParams.Name}) diff --git a/server/federation.go b/server/federation.go index 2e8560d..6bd9ae5 100644 --- a/server/federation.go +++ b/server/federation.go @@ -7,6 +7,7 @@ import ( "math" "net" "os" + "strconv" "strings" "sync/atomic" "time" @@ -86,7 +87,7 @@ func (s *Server) getAndSetExternalIp(ip, port string) error { // storing them as known peers. Returns a map of peerKey -> object func (s *Server) loadPeers() error { peerFile := s.Args.PeerFile - port := s.Args.Port + port := strconv.Itoa(s.Args.Port) // First we make sure our server has come up, so we can answer back to peers. 
var failures = 0 @@ -181,12 +182,12 @@ func (s *Server) subscribeToPeer(peer *Peer) error { msg := &pb.ServerMessage{ Address: s.ExternalIP.String(), - Port: s.Args.Port, + Port: strconv.Itoa(s.Args.Port), } c := pb.NewHubClient(conn) - log.Printf("%s:%s subscribing to %+v\n", s.ExternalIP, s.Args.Port, peer) + log.Printf("%s:%d subscribing to %+v\n", s.ExternalIP, s.Args.Port, peer) _, err = c.PeerSubscribe(ctx, msg) if err != nil { return err @@ -219,12 +220,12 @@ func (s *Server) helloPeer(peer *Peer) (*pb.HelloMessage, error) { c := pb.NewHubClient(conn) msg := &pb.HelloMessage{ - Port: s.Args.Port, + Port: strconv.Itoa(s.Args.Port), Host: s.ExternalIP.String(), Servers: []*pb.ServerMessage{}, } - log.Printf("%s:%s saying hello to %+v\n", s.ExternalIP, s.Args.Port, peer) + log.Printf("%s:%d saying hello to %+v\n", s.ExternalIP, s.Args.Port, peer) res, err := c.Hello(ctx, msg) if err != nil { log.Println(err) @@ -345,15 +346,15 @@ func (s *Server) addPeer(newPeer *Peer, ping bool, subscribe bool) error { } } - if s.Args.Port == newPeer.Port && + if strconv.Itoa(s.Args.Port) == newPeer.Port && (localHosts[newPeer.Address] || newPeer.Address == s.ExternalIP.String()) { - log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port) + log.Printf("%s:%d addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port) return nil } k := peerKey(newPeer) - log.Printf("%s:%s adding peer %+v\n", s.ExternalIP, s.Args.Port, newPeer) + log.Printf("%s:%d adding peer %+v\n", s.ExternalIP, s.Args.Port, newPeer) if oldServer, loaded := s.PeerServersLoadOrStore(newPeer); !loaded { if ping { _, err := s.helloPeer(newPeer) @@ -415,7 +416,7 @@ func (s *Server) makeHelloMessage() *pb.HelloMessage { s.PeerServersMut.RUnlock() return &pb.HelloMessage{ - Port: s.Args.Port, + Port: strconv.Itoa(s.Args.Port), Host: s.ExternalIP.String(), Servers: servers, } diff --git a/server/federation_test.go b/server/federation_test.go index 743c849..a332650 100644 --- a/server/federation_test.go +++ b/server/federation_test.go @@ -7,6 +7,7 @@ import ( "log" "net" "os" + "strconv" "strings" "testing" @@ -167,7 +168,7 @@ func TestAddPeerEndpoint(t *testing.T) { ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs() - args2.Port = "50052" + args2.Port = 50052 tests := []struct { name string @@ -198,7 +199,7 @@ func TestAddPeerEndpoint(t *testing.T) { go hubServer.Run() go hubServer2.Run() //go hubServer.Run() - conn, err := grpc.Dial("localhost:"+args.Port, + conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), grpc.WithInsecure(), grpc.WithBlock(), ) @@ -240,8 +241,8 @@ func TestAddPeerEndpoint2(t *testing.T) { args := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs() args3 := server.MakeDefaultTestArgs() - args2.Port = "50052" - args3.Port = "50053" + args2.Port = 50052 + args3.Port = 50053 tests := []struct { name string @@ -266,7 +267,7 @@ func TestAddPeerEndpoint2(t *testing.T) { go hubServer.Run() go hubServer2.Run() go hubServer3.Run() - conn, err := grpc.Dial("localhost:"+args.Port, + conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), grpc.WithInsecure(), grpc.WithBlock(), ) @@ -322,8 +323,8 @@ func TestAddPeerEndpoint3(t *testing.T) { args := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs() args3 := server.MakeDefaultTestArgs() - args2.Port = "50052" - args3.Port = "50053" + args2.Port = 50052 + args3.Port = 50053 tests := []struct { name string @@ -348,7 +349,7 @@ func TestAddPeerEndpoint3(t *testing.T) { 
go hubServer.Run() go hubServer2.Run() go hubServer3.Run() - conn, err := grpc.Dial("localhost:"+args.Port, + conn, err := grpc.Dial("localhost:"+strconv.Itoa(args.Port), grpc.WithInsecure(), grpc.WithBlock(), ) @@ -412,7 +413,7 @@ func TestUDPServer(t *testing.T) { args := server.MakeDefaultTestArgs() args.DisableStartUDP = false args2 := server.MakeDefaultTestArgs() - args2.Port = "50052" + args2.Port = 50052 args2.DisableStartUDP = false tests := []struct { @@ -449,12 +450,12 @@ got1 := hubServer.ExternalIP.String() if got1 != tt.want { t.Errorf("hubServer.ExternalIP = %s, want %s\n", got1, tt.want) - t.Errorf("hubServer.Args.Port = %s\n", hubServer.Args.Port) + t.Errorf("hubServer.Args.Port = %d\n", hubServer.Args.Port) } got2 := hubServer2.ExternalIP.String() if got2 != tt.want { t.Errorf("hubServer2.ExternalIP = %s, want %s\n", got2, tt.want) - t.Errorf("hubServer2.Args.Port = %s\n", hubServer2.Args.Port) + t.Errorf("hubServer2.Args.Port = %d\n", hubServer2.Args.Port) } }) } diff --git a/server/jsonrpc_blockchain.go b/server/jsonrpc_blockchain.go index 64ce66c..771040f 100644 --- a/server/jsonrpc_blockchain.go +++ b/server/jsonrpc_blockchain.go @@ -7,6 +7,7 @@ import ( "encoding/base64" "encoding/binary" "encoding/hex" + "encoding/json" "errors" "fmt" @@ -55,6 +56,14 @@ type BlockchainScripthashService struct { session *session } +// BlockchainTransactionService methods handle "blockchain.transaction.*" RPCs +type BlockchainTransactionService struct { + DB *db.ReadOnlyDBColumnFamily + Chain *chaincfg.Params + // needed for broadcast TX + sessionMgr *sessionManager +} + const CHUNK_SIZE = 96 const MAX_CHUNK_SIZE = 40960 const HEADER_SIZE = wire.MaxBlockHeaderPayload @@ -162,9 +171,42 @@ type BlockHeadersReq struct { B64 bool `json:"b64"` } +func (req *BlockHeadersReq) UnmarshalJSON(b []byte) error { + var params [4]interface{} + err := json.Unmarshal(b, &params) + if err != nil { + return err + } + switch params[0].(type) { + case float64: + req.StartHeight = uint32(params[0].(float64)) + default: + return fmt.Errorf("expected numeric argument #0 (start_height)") + } + switch params[1].(type) { + case float64: + req.Count = uint32(params[1].(float64)) + default: + return fmt.Errorf("expected numeric argument #1 (count)") + } + switch params[2].(type) { + case float64: + req.CpHeight = uint32(params[2].(float64)) + default: + return fmt.Errorf("expected numeric argument #2 (cp_height)") + } + switch params[3].(type) { + case bool: + req.B64 = params[3].(bool) + default: + return fmt.Errorf("expected boolean argument #3 (b64)") + } + return nil +} + type BlockHeadersResp struct { Base64 string `json:"base64,omitempty"` - Hex string `json:"hex,omitempty"` + Hex string `json:"hex"` Count uint32 `json:"count"` Max uint32 `json:"max"` Branch string `json:"branch,omitempty"` @@ -209,6 +251,21 @@ type HeadersSubscribeReq struct { Raw bool `json:"raw"` } +func (req *HeadersSubscribeReq) UnmarshalJSON(b []byte) error { + var params [1]interface{} + err := json.Unmarshal(b, &params) + if err != nil { + return err + } + switch params[0].(type) { + case bool: + req.Raw = params[0].(bool) + default: + return fmt.Errorf("expected bool argument #0 (raw)") + } + return nil +} + type HeadersSubscribeResp struct { BlockHeaderElectrum } @@ -336,6 +393,23 @@ func (s *BlockchainScripthashService) Get_balance(req *scripthashGetBalanceReq, type AddressGetHistoryReq struct { Address string `json:"address"` } + +func (req *AddressGetHistoryReq) UnmarshalJSON(b []byte) error { + var
params [1]interface{} + json.Unmarshal(b, &params) + err := json.Unmarshal(b, &params) + if err != nil { + return err + } + switch params[0].(type) { + case string: + req.Address = params[0].(string) + default: + return fmt.Errorf("expected string argument #0 (address)") + } + return nil +} + type TxInfo struct { TxHash string `json:"tx_hash"` Height uint32 `json:"height"` @@ -344,10 +418,7 @@ type TxInfoFee struct { TxInfo Fee uint64 `json:"fee"` } -type AddressGetHistoryResp struct { - Confirmed []TxInfo `json:"confirmed"` - Unconfirmed []TxInfoFee `json:"unconfirmed"` -} +type AddressGetHistoryResp []TxInfoFee // 'blockchain.address.get_history' func (s *BlockchainAddressService) Get_history(req *AddressGetHistoryReq, resp **AddressGetHistoryResp) error { @@ -375,11 +446,18 @@ func (s *BlockchainAddressService) Get_history(req *AddressGetHistoryReq, resp * Height: tx.Height, }) } - result := &AddressGetHistoryResp{ - Confirmed: confirmed, - Unconfirmed: []TxInfoFee{}, // TODO + unconfirmed := []TxInfoFee{} // TODO + result := make(AddressGetHistoryResp, len(confirmed)+len(unconfirmed)) + i := 0 + for _, tx := range confirmed { + result[i].TxInfo = tx + i += 1 } - *resp = result + for _, tx := range unconfirmed { + result[i] = tx + i += 1 + } + *resp = &result return err } @@ -626,3 +704,221 @@ func (s *BlockchainScripthashService) Unsubscribe(req *ScripthashSubscribeReq, r *resp = (*ScripthashSubscribeResp)(nil) return nil } + +type TransactionBroadcastReq string +type TransactionBroadcastResp string + +// 'blockchain.transaction.broadcast' +func (s *BlockchainTransactionService) Broadcast(req *TransactionBroadcastReq, resp **TransactionBroadcastResp) error { + if s.sessionMgr == nil { + return errors.New("no session manager, rpc not supported") + } + strTx := string(*req) + rawTx, err := hex.DecodeString(strTx) + if err != nil { + return err + } + txhash, err := s.sessionMgr.broadcastTx(rawTx) + if err != nil { + return err + } + result := txhash.String() + *resp = (*TransactionBroadcastResp)(&result) + return nil +} + +type TransactionGetReq string +type TXFullDetail struct { + Height int `json:"block_height"` + Pos uint32 `json:"pos"` + Merkle []string `json:"merkle"` +} +type TXDetail struct { + Height int `json:"block_height"` +} + +type TXGetItem struct { + TxHash string + TxRaw string + Detail interface{} // TXFullDetail or TXDetail struct +} +type TransactionGetResp TXGetItem + +// 'blockchain.transaction.get' +func (s *BlockchainTransactionService) Get(req *TransactionGetReq, resp **TransactionGetResp) error { + txids := [1]string{string(*req)} + request := TransactionGetBatchReq(txids[:]) + var response *TransactionGetBatchResp + err := s.Get_batch(&request, &response) + if err != nil { + return err + } + if len(*response) < 1 { + return errors.New("tx not found") + } + switch (*response)[0].Detail.(type) { + case TXFullDetail: + break + case TXDetail: + default: + return errors.New("tx not confirmed") + } + *resp = (*TransactionGetResp)(&(*response)[0]) + return err +} + +type TransactionGetBatchReq []string + +func (req *TransactionGetBatchReq) UnmarshalJSON(b []byte) error { + var params []interface{} + json.Unmarshal(b, &params) + if len(params) > 100 { + return fmt.Errorf("too many tx hashes in request: %v", len(params)) + } + for i, txhash := range params { + switch params[0].(type) { + case string: + *req = append(*req, txhash.(string)) + default: + return fmt.Errorf("expected string argument #%d (tx_hash)", i) + } + } + return nil +} + +type TransactionGetBatchResp []TXGetItem +
+func (resp *TransactionGetBatchResp) MarshalJSON() ([]byte, error) { + // encode key/value pairs as variable length JSON object + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + buf.WriteString("{") + for i, r := range *resp { + if i > 0 { + buf.WriteString(",") + } + txhash, raw, detail := r.TxHash, r.TxRaw, r.Detail + err := enc.Encode(txhash) + if err != nil { + return nil, err + } + buf.WriteString(":[") + err = enc.Encode(raw) + if err != nil { + return nil, err + } + buf.WriteString(",") + err = enc.Encode(detail) + if err != nil { + return nil, err + } + buf.WriteString("]") + } + buf.WriteString("}") + return buf.Bytes(), nil +} + +// 'blockchain.transaction.get_batch' +func (s *BlockchainTransactionService) Get_batch(req *TransactionGetBatchReq, resp **TransactionGetBatchResp) error { + if len(*req) > 100 { + return fmt.Errorf("too many tx hashes in request: %v", len(*req)) + } + tx_hashes := make([]chainhash.Hash, 0, len(*req)) + for i, txid := range *req { + tx_hashes = append(tx_hashes, chainhash.Hash{}) + if err := chainhash.Decode(&tx_hashes[i], txid); err != nil { + return err + } + } + dbResult, err := s.DB.GetTxMerkle(tx_hashes) + if err != nil { + return err + } + result := make([]TXGetItem, 0, len(dbResult)) + for _, r := range dbResult { + merkles := make([]string, len(r.Merkle)) + for i, m := range r.Merkle { + merkles[i] = m.String() + } + detail := TXFullDetail{ + Height: r.Height, + Pos: r.Pos, + Merkle: merkles, + } + result = append(result, TXGetItem{r.TxHash.String(), hex.EncodeToString(r.RawTx), &detail}) + } + *resp = (*TransactionGetBatchResp)(&result) + return err +} + +type TransactionGetMerkleReq struct { + TxHash string `json:"tx_hash"` + Height uint32 `json:"height"` +} +type TransactionGetMerkleResp TXGetItem + +// 'blockchain.transaction.get_merkle' +func (s *BlockchainTransactionService) Get_merkle(req *TransactionGetMerkleReq, resp **TransactionGetMerkleResp) error { + txids := [1]string{string(req.TxHash)} + request := TransactionGetBatchReq(txids[:]) + var response *TransactionGetBatchResp + err := s.Get_batch(&request, &response) + if err != nil { + return err + } + if len(*response) < 1 { + return errors.New("tx not found") + } + switch (*response)[0].Detail.(type) { + case TXFullDetail: + break + case TXDetail: + default: + return errors.New("tx not confirmed") + } + *resp = (*TransactionGetMerkleResp)(&(*response)[0]) + return err +} + +type TransactionGetHeightReq string +type TransactionGetHeightResp uint32 + +// 'blockchain.transaction.get_height' +func (s *BlockchainTransactionService) Get_height(req *TransactionGetHeightReq, resp **TransactionGetHeightResp) error { + txid := string(*(req)) + txhash, err := chainhash.NewHashFromStr(txid) + if err != nil { + return err + } + height, err := s.DB.GetTxHeight(txhash) + *resp = (*TransactionGetHeightResp)(&height) + return err +} + +type TransactionInfoReq string +type TransactionInfoResp TXGetItem + +// 'blockchain.transaction.info' +func (s *BlockchainTransactionService) Info(req *TransactionInfoReq, resp **TransactionInfoResp) error { + txids := [1]string{string(*req)} + request := TransactionGetBatchReq(txids[:]) + var response *TransactionGetBatchResp + err := s.Get_batch(&request, &response) + if err != nil { + return err + } + if len(*response) < 1 { + return errors.New("tx not found") + } + switch (*response)[0].Detail.(type) { + case TXFullDetail: + break + case TXDetail: + default: + if (*response)[0].TxHash == "" { + return errors.New("no such mempool or blockchain transaction") 
+ } + } + *resp = (*TransactionInfoResp)(&(*response)[0]) + return err +} diff --git a/server/jsonrpc_blockchain_test.go b/server/jsonrpc_blockchain_test.go index aab7678..4616d20 100644 --- a/server/jsonrpc_blockchain_test.go +++ b/server/jsonrpc_blockchain_test.go @@ -273,12 +273,10 @@ func TestHeadersSubscribe(t *testing.T) { t.Errorf("decode err: %v", err) } note1 := headerNotification{ - HeightHash: internal.HeightHash{Height: 500}, - blockHeader: [112]byte{}, + HeightHash: internal.HeightHash{Height: 500, BlockHeader: header500}, blockHeaderElectrum: nil, blockHeaderStr: "", } - copy(note1.blockHeader[:], header500) t.Logf("sending notification") sm.doNotify(note1) diff --git a/server/jsonrpc_service.go b/server/jsonrpc_service.go index ae0d757..b846c00 100644 --- a/server/jsonrpc_service.go +++ b/server/jsonrpc_service.go @@ -52,24 +52,23 @@ func (cr *gorillaRpcCodecRequest) Method() (string, error) { // StartJsonRPC starts the json rpc server and registers the endpoints. func (s *Server) StartJsonRPC() error { - s.sessionManager.start() - defer s.sessionManager.stop() - // Set up the pure JSONRPC server with persistent connections/sessions. if s.Args.JSONRPCPort != 0 { - port := ":" + strconv.FormatUint(uint64(s.Args.JSONRPCPort), 10) - laddr, err := net.ResolveTCPAddr("tcp", port) + port := ":" + strconv.Itoa(s.Args.JSONRPCPort) + laddr, err := net.ResolveTCPAddr("tcp4", port) if err != nil { log.Errorf("ResoveIPAddr: %v\n", err) goto fail1 } - listener, err := net.ListenTCP("tcp", laddr) + listener, err := net.ListenTCP("tcp4", laddr) if err != nil { log.Errorf("ListenTCP: %v\n", err) goto fail1 } log.Infof("JSONRPC server listening on %s", listener.Addr().String()) + s.sessionManager.start() acceptConnections := func(listener net.Listener) { + defer s.sessionManager.stop() for { conn, err := listener.Accept() if err != nil { @@ -78,6 +77,7 @@ func (s *Server) StartJsonRPC() error { } log.Infof("Accepted: %v", conn.RemoteAddr()) s.sessionManager.addSession(conn) + } } go acceptConnections(netutil.LimitListener(listener, s.sessionManager.sessionsMax)) @@ -98,24 +98,29 @@ fail1: goto fail2 } - // Register other "blockchain.{block,address,scripthash}.*" handlers. + // Register "blockchain.{block,address,scripthash,transaction}.*" handlers. 
blockchainSvc := &BlockchainBlockService{s.DB, s.Chain} err = s1.RegisterTCPService(blockchainSvc, "blockchain_block") if err != nil { log.Errorf("RegisterTCPService: %v\n", err) goto fail2 } - err = s1.RegisterTCPService(&BlockchainHeadersService{s.DB, s.Chain, nil, nil}, "blockchain_headers") + err = s1.RegisterTCPService(&BlockchainHeadersService{s.DB, s.Chain, s.sessionManager, nil}, "blockchain_headers") if err != nil { log.Errorf("RegisterTCPService: %v\n", err) goto fail2 } - err = s1.RegisterTCPService(&BlockchainAddressService{s.DB, s.Chain, nil, nil}, "blockchain_address") + err = s1.RegisterTCPService(&BlockchainAddressService{s.DB, s.Chain, s.sessionManager, nil}, "blockchain_address") if err != nil { log.Errorf("RegisterTCPService: %v\n", err) goto fail2 } - err = s1.RegisterTCPService(&BlockchainScripthashService{s.DB, s.Chain, nil, nil}, "blockchain_scripthash") + err = s1.RegisterTCPService(&BlockchainScripthashService{s.DB, s.Chain, s.sessionManager, nil}, "blockchain_scripthash") + if err != nil { + log.Errorf("RegisterTCPService: %v\n", err) + goto fail2 + } + err = s1.RegisterTCPService(&BlockchainTransactionService{s.DB, s.Chain, s.sessionManager}, "blockchain_transaction") if err != nil { log.Errorf("RegisterTCPService: %v\n", err) goto fail2 diff --git a/server/server.go b/server/server.go index 44f4bdf..d128089 100644 --- a/server/server.go +++ b/server/server.go @@ -13,6 +13,7 @@ import ( "net/http" "os" "regexp" + "strconv" "sync" "time" @@ -136,7 +137,8 @@ func (s *Server) PeerServersLoadOrStore(peer *Peer) (actual *Peer, loaded bool) // Run "main" function for starting the server. This blocks. func (s *Server) Run() { - l, err := net.Listen("tcp", ":"+s.Args.Port) + address := ":" + strconv.Itoa(s.Args.Port) + l, err := net.Listen("tcp", address) if err != nil { log.Fatalf("failed to listen: %v", err) } @@ -171,6 +173,7 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro if myDB.LastState != nil { logrus.Infof("DB version: %v", myDB.LastState.DBVersion) logrus.Infof("height: %v", myDB.LastState.Height) + logrus.Infof("genesis: %v", myDB.LastState.Genesis.String()) logrus.Infof("tip: %v", myDB.LastState.Tip.String()) logrus.Infof("tx count: %v", myDB.LastState.TxCount) } @@ -274,9 +277,9 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { // Determine which chain to use based on db and cli values dbChain := (*chaincfg.Params)(nil) - if myDB != nil && myDB.LastState != nil && myDB.LastState.Genesis != nil { + if myDB != nil && myDB.LastState != nil { // The chain params can be inferred from DBStateValue. - switch *myDB.LastState.Genesis { + switch myDB.LastState.Genesis.Hash { case *chaincfg.MainNetParams.GenesisHash: dbChain = &chaincfg.MainNetParams case *chaincfg.TestNet3Params.GenesisHash: @@ -353,11 +356,19 @@ func MakeHubServer(grp *stop.Group, args *Args) *Server { } if !args.DisableStartUDP { go func() { - err := s.UDPServer() + err := s.UDPServer(s.Args.Port) if err != nil { - log.Println("UDP Server failed!", err) + logrus.Errorf("UDP Server (%d) failed! %v", s.Args.Port, err) } }() + if s.Args.JSONRPCPort != 0 { + go func() { + err := s.UDPServer(s.Args.JSONRPCPort) + if err != nil { + logrus.Errorf("UDP Server (%d) failed! 
%v", s.Args.JSONRPCPort, err) + } + }() + } } if !args.DisableStartNotifier { go func() { diff --git a/server/session.go b/server/session.go index 03e2254..0b9136b 100644 --- a/server/session.go +++ b/server/session.go @@ -16,13 +16,13 @@ import ( "github.com/lbryio/herald.go/db" "github.com/lbryio/herald.go/internal" "github.com/lbryio/lbcd/chaincfg" + "github.com/lbryio/lbcd/chaincfg/chainhash" "github.com/lbryio/lbry.go/v3/extras/stop" log "github.com/sirupsen/logrus" ) type headerNotification struct { internal.HeightHash - blockHeader [HEADER_SIZE]byte blockHeaderElectrum *BlockHeaderElectrum blockHeaderStr string } @@ -66,7 +66,7 @@ func (s *session) doNotify(notification interface{}) { if s.headersSubRaw { header := note.blockHeaderStr if len(header) == 0 { - header = hex.EncodeToString(note.blockHeader[:]) + header = hex.EncodeToString(note.BlockHeader[:]) } params = &HeadersSubscribeRawResp{ Hex: header, @@ -75,7 +75,7 @@ func (s *session) doNotify(notification interface{}) { } else { header := note.blockHeaderElectrum if header == nil { // not initialized - header = newBlockHeaderElectrum(¬e.blockHeader, uint32(heightHash.Height)) + header = newBlockHeaderElectrum((*[HEADER_SIZE]byte)(note.BlockHeader), uint32(heightHash.Height)) } params = header } @@ -218,7 +218,7 @@ func (sm *sessionManager) addSession(conn net.Conn) *session { log.Errorf("RegisterName: %v\n", err) } - // Register other "blockchain.{block,address,scripthash}.*" handlers. + // Register "blockchain.{block,address,scripthash,transaction}.*" handlers. blockchainSvc := &BlockchainBlockService{sm.db, sm.chain} err = s1.RegisterName("blockchain.block", blockchainSvc) if err != nil { @@ -240,6 +240,11 @@ func (sm *sessionManager) addSession(conn net.Conn) *session { log.Errorf("RegisterName: %v\n", err) goto fail } + err = s1.RegisterName("blockchain.transaction", &BlockchainTransactionService{sm.db, sm.chain, sm}) + if err != nil { + log.Errorf("RegisterName: %v\n", err) + goto fail + } sm.grp.Add(1) go func() { @@ -276,6 +281,11 @@ func (sm *sessionManager) removeSessionLocked(sess *session) { sess.conn.Close() } +func (sm *sessionManager) broadcastTx(rawTx []byte) (*chainhash.Hash, error) { + // TODO + return nil, nil +} + func (sm *sessionManager) headersSubscribe(sess *session, raw bool, subscribe bool) { sm.sessionsMut.Lock() defer sm.sessionsMut.Unlock() @@ -315,6 +325,11 @@ func (sm *sessionManager) hashXSubscribe(sess *session, hashX []byte, original s } func (sm *sessionManager) doNotify(notification interface{}) { + switch notification.(type) { + case internal.HeightHash: + // The HeightHash notification translates to headerNotification. 
+ notification = &headerNotification{HeightHash: notification.(internal.HeightHash)} + } sm.sessionsMut.RLock() var subsCopy sessionMap switch notification.(type) { @@ -322,8 +337,10 @@ note, _ := notification.(headerNotification) subsCopy = sm.headerSubs if len(subsCopy) > 0 { - note.blockHeaderElectrum = newBlockHeaderElectrum(&note.blockHeader, uint32(note.Height)) - note.blockHeaderStr = hex.EncodeToString(note.blockHeader[:]) + hdr := [HEADER_SIZE]byte{} + copy(hdr[:], note.BlockHeader) + note.blockHeaderElectrum = newBlockHeaderElectrum(&hdr, uint32(note.Height)) + note.blockHeaderStr = hex.EncodeToString(note.BlockHeader[:]) } case hashXNotification: note, _ := notification.(hashXNotification) @@ -489,11 +506,6 @@ func (c *jsonPatchingCodec) Read(p []byte) (n int, err error) { // Probable single object or list argument. goto encode } - args := strings.Split(string(bracketed), ",") - if len(args) <= 1 { - // No commas at all. Definitely a single argument. - goto encode - } // The params look like ["arg1", "arg2", "arg3", ...]. // We're in trouble because our jsonrpc library does not // handle this. So pack these args in an inner list. diff --git a/server/udp.go b/server/udp.go index 07f6e55..1b75d57 100644 --- a/server/udp.go +++ b/server/udp.go @@ -219,8 +219,9 @@ func UDPPing(ip, port string) (*SPVPong, error) { // UDPServer is a goroutine that starts an udp server that implements the hubs // Ping/Pong protocol to find out about each other without making full TCP // connections. -func (s *Server) UDPServer() error { - address := ":" + s.Args.Port +func (s *Server) UDPServer(port int) error { + address := ":" + strconv.Itoa(port) + tip := make([]byte, 32) addr, err := net.ResolveUDPAddr("udp", address) if err != nil {