Add some blockchain RPC handlers and DB fetching routines #55

Merged
moodyjon merged 15 commits from blockchain_rpc1 into master 2022-09-14 17:23:35 +02:00
4 changed files with 47 additions and 10 deletions
Showing only changes of commit 50f7e91ead - Show all commits

View file

@ -550,7 +550,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*R
var handlesMap = make(map[string]*grocksdb.ColumnFamilyHandle) var handlesMap = make(map[string]*grocksdb.ColumnFamilyHandle)
for i, handle := range handles { for i, handle := range handles {
log.Printf("%d: %+v\n", i, handle) log.Printf("handle %d(%s): %+v\n", i, cfNames[i], handle)
handlesMap[cfNames[i]] = handle handlesMap[cfNames[i]] = handle
} }

View file

@ -194,7 +194,7 @@ type DBStateValue struct {
UtxoFlushCount uint32 UtxoFlushCount uint32
WallTime uint32 WallTime uint32
FirstSync bool FirstSync bool
DDVersion uint8 DBVersion uint8
HistFlushCount int32 HistFlushCount int32
CompFlushCount int32 CompFlushCount int32
CompCursor int32 CompCursor int32
@ -210,7 +210,7 @@ func NewDBStateValue() *DBStateValue {
UtxoFlushCount: 0, UtxoFlushCount: 0,
WallTime: 0, WallTime: 0,
FirstSync: true, FirstSync: true,
DDVersion: 0, DBVersion: 0,
HistFlushCount: 0, HistFlushCount: 0,
CompFlushCount: -1, CompFlushCount: -1,
CompCursor: -1, CompCursor: -1,
@ -248,7 +248,7 @@ func (v *DBStateValue) PackValue() []byte {
bitSetVar = 1 bitSetVar = 1
} }
value[32+4+4+32+4+4] = bitSetVar value[32+4+4+32+4+4] = bitSetVar
value[32+4+4+32+4+4+1] = v.DDVersion value[32+4+4+32+4+4+1] = v.DBVersion
binary.BigEndian.PutUint32(value[32+4+4+32+4+4+1+1:], uint32(v.HistFlushCount)) binary.BigEndian.PutUint32(value[32+4+4+32+4+4+1+1:], uint32(v.HistFlushCount))
binary.BigEndian.PutUint32(value[32+4+4+32+4+4+1+1+4:], uint32(v.CompFlushCount)) binary.BigEndian.PutUint32(value[32+4+4+32+4+4+1+1+4:], uint32(v.CompFlushCount))
@ -290,7 +290,7 @@ func DBStateValueUnpack(value []byte) *DBStateValue {
UtxoFlushCount: binary.BigEndian.Uint32(value[32+4+4+32:]), UtxoFlushCount: binary.BigEndian.Uint32(value[32+4+4+32:]),
WallTime: binary.BigEndian.Uint32(value[32+4+4+32+4:]), WallTime: binary.BigEndian.Uint32(value[32+4+4+32+4:]),
FirstSync: value[32+4+4+32+4+4] == 1, FirstSync: value[32+4+4+32+4+4] == 1,
DDVersion: value[32+4+4+32+4+4+1], DBVersion: value[32+4+4+32+4+4+1],
HistFlushCount: int32(binary.BigEndian.Uint32(value[32+4+4+32+4+4+1+1:])), HistFlushCount: int32(binary.BigEndian.Uint32(value[32+4+4+32+4+4+1+1:])),
CompFlushCount: int32(binary.BigEndian.Uint32(value[32+4+4+32+4+4+1+1+4:])), CompFlushCount: int32(binary.BigEndian.Uint32(value[32+4+4+32+4+4+1+1+4:])),
CompCursor: int32(binary.BigEndian.Uint32(value[32+4+4+32+4+4+1+1+4+4:])), CompCursor: int32(binary.BigEndian.Uint32(value[32+4+4+32+4+4+1+1+4+4:])),

View file

@ -39,11 +39,13 @@ func main() {
defer func() { defer func() {
log.Println("Shutting down server...") log.Println("Shutting down server...")
if !s.Args.DisableEs { if s.EsClient != nil {
s.EsClient.Stop() s.EsClient.Stop()
} }
if s.GrpcServer != nil {
s.GrpcServer.GracefulStop() s.GrpcServer.GracefulStop()
if !s.Args.DisableResolve { }
if s.DB != nil {
s.DB.Shutdown() s.DB.Shutdown()
} }

View file

@ -37,9 +37,9 @@ type Server struct {
MultiSpaceRe *regexp.Regexp MultiSpaceRe *regexp.Regexp
WeirdCharsRe *regexp.Regexp WeirdCharsRe *regexp.Regexp
DB *db.ReadOnlyDBColumnFamily DB *db.ReadOnlyDBColumnFamily
Chain *chaincfg.Params
EsClient *elastic.Client EsClient *elastic.Client
QueryCache *ttlcache.Cache QueryCache *ttlcache.Cache
Coin *chaincfg.Params
S256 *hash.Hash S256 *hash.Hash
LastRefreshCheck time.Time LastRefreshCheck time.Time
RefreshDelta time.Duration RefreshDelta time.Duration
@ -169,27 +169,48 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
log.Fatalln(err) log.Fatalln(err)
} }
if myDB.LastState != nil {
logrus.Infof("DB version: %v", myDB.LastState.DBVersion)
logrus.Infof("height: %v", myDB.LastState.Height)
logrus.Infof("tip: %v", myDB.LastState.Tip.String())
logrus.Infof("tx count: %v", myDB.LastState.TxCount)
}
blockingChannelHashes := make([][]byte, 0, 10) blockingChannelHashes := make([][]byte, 0, 10)
blockingIds := make([]string, 0, 10)
filteringChannelHashes := make([][]byte, 0, 10) filteringChannelHashes := make([][]byte, 0, 10)
filteringIds := make([]string, 0, 10)
for _, id := range args.BlockingChannelIds { for _, id := range args.BlockingChannelIds {
hash, err := hex.DecodeString(id) hash, err := hex.DecodeString(id)
if err != nil { if err != nil {
logrus.Warn("Invalid channel id: ", id) logrus.Warn("Invalid channel id: ", id)
continue
} }
blockingChannelHashes = append(blockingChannelHashes, hash) blockingChannelHashes = append(blockingChannelHashes, hash)
blockingIds = append(blockingIds, id)
} }
for _, id := range args.FilteringChannelIds { for _, id := range args.FilteringChannelIds {
hash, err := hex.DecodeString(id) hash, err := hex.DecodeString(id)
if err != nil { if err != nil {
logrus.Warn("Invalid channel id: ", id) logrus.Warn("Invalid channel id: ", id)
continue
} }
filteringChannelHashes = append(filteringChannelHashes, hash) filteringChannelHashes = append(filteringChannelHashes, hash)
filteringIds = append(filteringIds, id)
} }
myDB.BlockingChannelHashes = blockingChannelHashes myDB.BlockingChannelHashes = blockingChannelHashes
myDB.FilteringChannelHashes = filteringChannelHashes myDB.FilteringChannelHashes = filteringChannelHashes
if len(filteringIds) > 0 {
logrus.Infof("filtering claims reposted by channels: %+s", filteringIds)
}
if len(blockingIds) > 0 {
logrus.Infof("blocking claims reposted by channels: %+s", blockingIds)
}
return myDB, nil return myDB, nil
} }
@ -253,16 +274,30 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
} }
} }
chain := chaincfg.MainNetParams
if myDB != nil && myDB.LastState != nil && myDB.LastState.Genesis != nil {
// The chain params can be inferred from DBStateValue.
switch *myDB.LastState.Genesis {
case *chaincfg.MainNetParams.GenesisHash:
chain = chaincfg.MainNetParams
case *chaincfg.TestNet3Params.GenesisHash:
chain = chaincfg.TestNet3Params
case *chaincfg.RegressionNetParams.GenesisHash:
chain = chaincfg.RegressionNetParams
}
}
logrus.Infof("network: %v", chain.Name)
s := &Server{ s := &Server{
GrpcServer: grpcServer, GrpcServer: grpcServer,
Args: args, Args: args,
MultiSpaceRe: multiSpaceRe, MultiSpaceRe: multiSpaceRe,
WeirdCharsRe: weirdCharsRe, WeirdCharsRe: weirdCharsRe,
DB: myDB, DB: myDB,
Chain: &chain,
EsClient: client, EsClient: client,
QueryCache: cache, QueryCache: cache,
S256: &s256, S256: &s256,
Coin: &chaincfg.MainNetParams,
LastRefreshCheck: time.Now(), LastRefreshCheck: time.Now(),
RefreshDelta: refreshDelta, RefreshDelta: refreshDelta,
NumESRefreshes: 0, NumESRefreshes: 0,