* switch herald to herald.go * update ci/cd stuff * fix issues with binary name * we're no longer building dynamically, so turn CGO back on, and fix names * update package names in proto files
977 lines
23 KiB
977 lines
23 KiB
package db
// db.go contains basic functions for representing and accessing the state of
// a read-only version of the rocksdb database.
import (
pb "github.com/lbryio/herald.go/protobuf/go"
log "github.com/sirupsen/logrus"
// Constants
const (
// Blochchain height / expiration constants
OriginalClaimExpirationTime = 262974
ExtendedClaimExpirationTime = 2102400
ExtendedClaimExpirationForkHeight = 400155
NormalizedNameForkHeight = 539940 // targeting 21 March 2019
MinTakeoverWorkaroundHeight = 496850
MaxTakeoverWorkaroundHeight = 658300 // targeting 30 Oct 2019
WitnessForkHeight = 680770 // targeting 11 Dec 2019
AllClaimsInMerkleForkHeight = 658310 // targeting 30 Oct 2019
ProportionalDelayFactor = 32
MaxTakeoverDelay = 4032
// Initial size constants
InitialTxCountSize = 1200000
// Types and constructors, getters, setters, etc.
type ReadOnlyDBColumnFamily struct {
DB *grocksdb.DB
Handles map[string]*grocksdb.ColumnFamilyHandle
Opts *grocksdb.ReadOptions
TxCounts *stack.SliceBacked
Height uint32
LastState *prefixes.DBStateValue
Headers *stack.SliceBacked
BlockingChannelHashes [][]byte
FilteringChannelHashes [][]byte
BlockedStreams map[string][]byte
BlockedChannels map[string][]byte
FilteredStreams map[string][]byte
FilteredChannels map[string][]byte
ShutdownChan chan struct{}
DoneChan chan struct{}
Cleanup func()
type ResolveResult struct {
Name string
NormalizedName string
ClaimHash []byte
TxNum uint32
Position uint16
TxHash []byte
Height uint32
Amount uint64
ShortUrl string
IsControlling bool
CanonicalUrl string
CreationHeight uint32
ActivationHeight uint32
ExpirationHeight uint32
EffectiveAmount uint64
SupportAmount uint64
Reposted int
LastTakeoverHeight uint32
ClaimsInChannel uint32
ChannelHash []byte
RepostedClaimHash []byte
SignatureValid bool
RepostTxHash []byte
RepostTxPostition uint16
RepostHeight uint32
ChannelTxHash []byte
ChannelTxPostition uint16
ChannelHeight uint32
type ResolveError struct {
Error error
ErrorType uint8
type OptionalResolveResultOrError interface {
GetResult() *ResolveResult
GetError() *ResolveError
String() string
type optionalResolveResultOrError struct {
res *ResolveResult
err *ResolveError
func (x *optionalResolveResultOrError) GetResult() *ResolveResult {
return x.res
func (x *optionalResolveResultOrError) GetError() *ResolveError {
return x.err
func (x *ResolveResult) String() string {
return fmt.Sprintf("%#v", x)
func (x *ResolveError) String() string {
return fmt.Sprintf("ResolveError{Error: %#v}", x.Error)
func (x *optionalResolveResultOrError) String() string {
if x.res != nil {
return x.res.String()
if x.err != nil {
return x.err.String()
return fmt.Sprintf("%#v", x)
type ExpandedResolveResult struct {
Stream OptionalResolveResultOrError
Channel OptionalResolveResultOrError
Repost OptionalResolveResultOrError
RepostedChannel OptionalResolveResultOrError
func NewExpandedResolveResult() *ExpandedResolveResult {
return &ExpandedResolveResult{
Stream: &optionalResolveResultOrError{},
Channel: &optionalResolveResultOrError{},
Repost: &optionalResolveResultOrError{},
RepostedChannel: &optionalResolveResultOrError{},
func (x *ExpandedResolveResult) String() string {
return fmt.Sprintf("ExpandedResolveResult{Stream: %s, Channel: %s, Repost: %s, RepostedChannel: %s}", x.Stream, x.Channel, x.Repost, x.RepostedChannel)
func (res *ExpandedResolveResult) ToOutputs() ([]*pb.Output, []*pb.Output, error) {
txos := make([]*pb.Output, 0)
extraTxos := make([]*pb.Output, 0)
// Errors
if x := res.Channel.GetError(); x != nil {
log.Warn("Channel error: ", x)
outputErr := &pb.Output_Error{
Error: &pb.Error{
Text: x.Error.Error(),
Code: pb.Error_Code(x.ErrorType),
res := &pb.Output{Meta: outputErr}
txos = append(txos, res)
return txos, nil, nil
if x := res.Stream.GetError(); x != nil {
log.Warn("Stream error: ", x)
outputErr := &pb.Output_Error{
Error: &pb.Error{
Text: x.Error.Error(),
Code: pb.Error_Code(x.ErrorType),
res := &pb.Output{Meta: outputErr}
txos = append(txos, res)
return txos, nil, nil
// Not errors
var channel, stream, repost, repostedChannel *ResolveResult
channel = res.Channel.GetResult()
stream = res.Stream.GetResult()
repost = res.Repost.GetResult()
repostedChannel = res.RepostedChannel.GetResult()
if channel != nil && stream == nil {
// Channel
output := channel.ToOutput()
txos = append(txos, output)
if repost != nil {
output := repost.ToOutput()
extraTxos = append(extraTxos, output)
if repostedChannel != nil {
output := repostedChannel.ToOutput()
extraTxos = append(extraTxos, output)
return txos, extraTxos, nil
} else if stream != nil {
output := stream.ToOutput()
txos = append(txos, output)
if channel != nil {
output := channel.ToOutput()
extraTxos = append(extraTxos, output)
if repost != nil {
output := repost.ToOutput()
extraTxos = append(extraTxos, output)
if repostedChannel != nil {
output := repostedChannel.ToOutput()
extraTxos = append(extraTxos, output)
return txos, extraTxos, nil
return nil, nil, nil
// ToOutput
func (res *ResolveResult) ToOutput() *pb.Output {
// func ResolveResultToOutput(res *db.ResolveResult, outputType byte) *OutputWType {
// res.ClaimHash
var channelOutput *pb.Output
var repostOutput *pb.Output
if res.ChannelTxHash != nil {
channelOutput = &pb.Output{
TxHash: res.ChannelTxHash,
Nout: uint32(res.ChannelTxPostition),
Height: res.ChannelHeight,
if res.RepostTxHash != nil {
repostOutput = &pb.Output{
TxHash: res.RepostTxHash,
Nout: uint32(res.RepostTxPostition),
Height: res.RepostHeight,
claimMeta := &pb.ClaimMeta{
Channel: channelOutput,
Repost: repostOutput,
ShortUrl: res.ShortUrl,
Reposted: uint32(res.Reposted),
IsControlling: res.IsControlling,
CreationHeight: res.CreationHeight,
ExpirationHeight: res.ExpirationHeight,
ClaimsInChannel: res.ClaimsInChannel,
EffectiveAmount: res.EffectiveAmount,
SupportAmount: res.SupportAmount,
claim := &pb.Output_Claim{
Claim: claimMeta,
output := &pb.Output{
TxHash: res.TxHash,
Nout: uint32(res.Position),
Height: res.Height,
Meta: claim,
return output
type PathSegment struct {
name string
claimId string
amountOrder int
func (ps *PathSegment) Normalized() string {
return internal.NormalizeName(ps.name)
func (ps *PathSegment) IsShortId() bool {
return ps.claimId != "" && len(ps.claimId) < 40
func (ps *PathSegment) IsFullId() bool {
return len(ps.claimId) == 40
func (ps *PathSegment) String() string {
if ps.claimId != "" {
return fmt.Sprintf("%s:%s", ps.name, ps.claimId)
} else if ps.amountOrder != 0 {
return fmt.Sprintf("%s:%d", ps.name, ps.amountOrder)
return ps.name
// Iterators / db construction functions
func intMin(a, b int) int {
if a < b {
return a
return b
func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
ch := make(chan *prefixes.PrefixRowKV)
ro := grocksdb.NewDefaultReadOptions()
it := db.NewIteratorCF(ro, opts.CfHandle)
opts.It = it
if opts.Start != nil {
go func() {
defer func() {
var prevKey []byte
// FIXME: There's messy uses of kv being nil / not nil here.
var kv *prefixes.PrefixRowKV = nil
if !opts.IncludeStart {
kv = opts.ReadRow(&prevKey)
if !it.Valid() && opts.IncludeStop && kv != nil {
ch <- kv
kv = &prefixes.PrefixRowKV{}
for ; kv != nil && !opts.StopIteration(prevKey) && it.Valid(); it.Next() {
if kv = opts.ReadRow(&prevKey); kv != nil {
ch <- kv
return ch
func Iter(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
ch := make(chan *prefixes.PrefixRowKV)
ro := grocksdb.NewDefaultReadOptions()
ro.PrefixSameAsStart() -> false
ro.AutoPrefixMode() -> on
it := db.NewIterator(ro)
opts.It = it
if opts.Start != nil {
go func() {
defer it.Close()
defer close(ch)
var prevKey []byte
var kv *prefixes.PrefixRowKV = &prefixes.PrefixRowKV{}
if !opts.IncludeStart {
kv = opts.ReadRow(&prevKey)
if !it.Valid() && opts.IncludeStop && kv != nil {
ch <- kv
for ; kv != nil && !opts.StopIteration(prevKey) && it.Valid(); it.Next() {
if kv = opts.ReadRow(&prevKey); kv != nil {
ch <- kv
return ch
// GetDB functions that open and return a db
// GetWriteDBCF opens a db for writing with all columns families opened.
func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, error) {
opts := grocksdb.NewDefaultOptions()
cfOpt := grocksdb.NewDefaultOptions()
cfNames, err := grocksdb.ListColumnFamilies(opts, name)
if err != nil {
return nil, nil, err
cfOpts := make([]*grocksdb.Options, len(cfNames))
for i := range cfNames {
cfOpts[i] = cfOpt
db, handles, err := grocksdb.OpenDbColumnFamilies(opts, name, cfNames, cfOpts)
if err != nil {
return nil, nil, err
for i, handle := range handles {
log.Printf("%d: %s, %+v\n", i, cfNames[i], handle)
return db, handles, nil
// GetProdDB returns a db that is used for production.
func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func(), error) {
prefixNames := prefixes.GetPrefixes()
// additional prefixes that aren't in the code explicitly
cfNames := []string{"default", "e", "d", "c"}
for _, prefix := range prefixNames {
cfName := string(prefix)
cfNames = append(cfNames, cfName)
db, err := GetDBColumnFamilies(name, secondaryPath, cfNames)
cleanupFiles := func() {
err = os.RemoveAll(secondaryPath)
if err != nil {
if err != nil {
return nil, cleanupFiles, err
cleanupDB := func() {
db.Cleanup = cleanupDB
return db, cleanupDB, nil
// GetDBColumnFamilies gets a db with the specified column families and secondary path.
func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*ReadOnlyDBColumnFamily, error) {
opts := grocksdb.NewDefaultOptions()
roOpts := grocksdb.NewDefaultReadOptions()
cfOpt := grocksdb.NewDefaultOptions()
//cfNames := []string{"default", cf}
cfOpts := make([]*grocksdb.Options, len(cfNames))
for i := range cfNames {
cfOpts[i] = cfOpt
db, handles, err := grocksdb.OpenDbAsSecondaryColumnFamilies(opts, name, secondayPath, cfNames, cfOpts)
// db, handles, err := grocksdb.OpenDbColumnFamilies(opts, name, cfNames, cfOpts)
if err != nil {
return nil, err
var handlesMap = make(map[string]*grocksdb.ColumnFamilyHandle)
for i, handle := range handles {
log.Printf("%d: %+v\n", i, handle)
handlesMap[cfNames[i]] = handle
myDB := &ReadOnlyDBColumnFamily{
DB: db,
Handles: handlesMap,
Opts: roOpts,
BlockedStreams: make(map[string][]byte),
BlockedChannels: make(map[string][]byte),
FilteredStreams: make(map[string][]byte),
FilteredChannels: make(map[string][]byte),
TxCounts: nil,
LastState: nil,
Height: 0,
Headers: nil,
ShutdownChan: make(chan struct{}),
DoneChan: make(chan struct{}),
err = myDB.ReadDBState() //TODO: Figure out right place for this
if err != nil {
return nil, err
err = myDB.InitTxCounts()
if err != nil {
return nil, err
err = myDB.InitHeaders()
if err != nil {
return nil, err
err = myDB.GetBlocksAndFilters()
if err != nil {
return nil, err
return myDB, nil
// Advance advance the db to the given height.
func (db *ReadOnlyDBColumnFamily) Advance(height uint32) {
// DB wasn't created when we initialized headers, reinit
if db.TxCounts.Len() == 0 {
// TODO: assert tx_count not in self.db.tx_counts, f'boom {tx_count} in {len(self.db.tx_counts)} tx counts'
if db.TxCounts.Len() != height {
log.Error("tx count len:", db.TxCounts.Len(), "height:", height)
headerObj, err := db.GetHeader(height)
if err != nil {
log.Error("getting header:", err)
txCountObj, err := db.GetTxCount(height)
if err != nil {
log.Error("getting tx count:", err)
txCount := txCountObj.TxCount
if db.TxCounts.GetTip().(uint32) >= txCount {
log.Error("current tip should be less than new txCount",
"tx count tip:", db.TxCounts.GetTip(), "tx count:", txCount)
// Unwind unwinds the db one block height
func (db *ReadOnlyDBColumnFamily) Unwind() {
// Shutdown shuts down the db.
func (db *ReadOnlyDBColumnFamily) Shutdown() {
db.ShutdownChan <- struct{}{}
// RunDetectChanges Go routine the runs continuously while the hub is active
// to keep the db readonly view up to date and handle reorgs on the
// blockchain.
func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan *internal.HeightHash) {
go func() {
lastPrint := time.Now()
for {
// FIXME: Figure out best sleep interval
if time.Since(lastPrint) > time.Second {
log.Debug("DetectChanges:", db.LastState)
lastPrint = time.Now()
err := db.detectChanges(notifCh)
if err != nil {
log.Infof("Error detecting changes: %#v", err)
select {
case <-db.ShutdownChan:
db.DoneChan <- struct{}{}
case <-time.After(time.Millisecond * 10):
// DetectChanges keep the rocksdb db in sync and handle reorgs
func (db *ReadOnlyDBColumnFamily) detectChanges(notifCh chan *internal.HeightHash) error {
err := db.DB.TryCatchUpWithPrimary()
if err != nil {
return err
state, err := db.GetDBState()
if err != nil {
return err
if state == nil || state.Height <= 0 {
return nil
log.Debugf("db.LastState %#v, state: %#v", db.LastState, state)
if db.LastState != nil && db.LastState.Height > state.Height {
log.Info("reorg detected, waiting until the writer has flushed the new blocks to advance")
return nil
var lastHeight uint32 = 0
var rewound bool = false
if db.LastState != nil {
lastHeight = db.LastState.Height
for {
lastHeightHeader, err := db.GetHeader(lastHeight)
if err != nil {
return err
curHeaderObj := db.Headers.GetTip()
if curHeaderObj == nil {
curHeader := curHeaderObj.([]byte)
log.Debugln("lastHeightHeader: ", hex.EncodeToString(lastHeightHeader))
log.Debugln("curHeader: ", hex.EncodeToString(curHeader))
if bytes.Equal(curHeader, lastHeightHeader) {
log.Traceln("connects to block", lastHeight)
} else {
log.Infoln("disconnect block", lastHeight)
rewound = true
lastHeight -= 1
if rewound {
hash, err := db.GetBlockHash(lastHeight)
if err != nil {
return err
notifCh <- &internal.HeightHash{Height: uint64(lastHeight), BlockHash: hash}
err = db.ReadDBState()
if err != nil {
return err
if db.LastState == nil || lastHeight < state.Height {
for height := lastHeight + 1; height <= state.Height; height++ {
log.Info("advancing to: ", height)
hash, err := db.GetBlockHash(height)
if err != nil {
log.Info("error getting block hash: ", err)
return err
notifCh <- &internal.HeightHash{Height: uint64(height), BlockHash: hash}
//TODO: ClearCache
log.Warn("implement cache clearing")
db.LastState = state
//TODO: update blocked streams
//TODO: update filtered streams
log.Warn("implement updating blocked streams")
log.Warn("implement updating filtered streams")
return nil
func (db *ReadOnlyDBColumnFamily) ReadDBState() error {
state, err := db.GetDBState()
if err != nil {
return err
if state != nil {
db.LastState = state
} else {
db.LastState = prefixes.NewDBStateValue()
return nil
func (db *ReadOnlyDBColumnFamily) InitHeaders() error {
handle, err := db.EnsureHandle(prefixes.Header)
if err != nil {
return err
//TODO: figure out a reasonable default and make it a constant
db.Headers = stack.NewSliceBacked(12000)
startKey := prefixes.NewHeaderKey(0)
// endKey := prefixes.NewHeaderKey(db.LastState.Height)
startKeyRaw := startKey.PackKey()
// endKeyRaw := endKey.PackKey()
options := NewIterateOptions().WithPrefix([]byte{prefixes.Header}).WithCfHandle(handle)
options = options.WithIncludeKey(false).WithIncludeValue(true) //.WithIncludeStop(true)
options = options.WithStart(startKeyRaw) //.WithStop(endKeyRaw)
ch := IterCF(db.DB, options)
for header := range ch {
return nil
// InitTxCounts initializes the txCounts map
func (db *ReadOnlyDBColumnFamily) InitTxCounts() error {
start := time.Now()
handle, err := db.EnsureHandle(prefixes.TxCount)
if err != nil {
return err
db.TxCounts = stack.NewSliceBacked(InitialTxCountSize)
options := NewIterateOptions().WithPrefix([]byte{prefixes.TxCount}).WithCfHandle(handle)
options = options.WithIncludeKey(false).WithIncludeValue(true).WithIncludeStop(true)
ch := IterCF(db.DB, options)
for txCount := range ch {
duration := time.Since(start)
log.Println("len(db.TxCounts), cap(db.TxCounts):", db.TxCounts.Len(), db.TxCounts.Cap())
log.Println("Time to get txCounts:", duration)
return nil
// RunGetBlocksAndFilters Go routine that runs continuously while the hub is active
// to keep the blocked and filtered channels and streams up to date.
func (db *ReadOnlyDBColumnFamily) RunGetBlocksAndFilters() {
go func() {
for {
// FIXME: Figure out best sleep interval
err := db.GetBlocksAndFilters()
if err != nil {
log.Printf("Error getting blocked and filtered chanels: %#v\n", err)
time.Sleep(time.Second * 10)
// GetBlocksAndFilters gets the blocked and filtered channels and streams from the database.
func (db *ReadOnlyDBColumnFamily) GetBlocksAndFilters() error {
blockedChannels, blockedStreams, err := db.GetStreamsAndChannelRepostedByChannelHashes(db.BlockingChannelHashes)
if err != nil {
return err
db.BlockedChannels = blockedChannels
db.BlockedStreams = blockedStreams
filteredChannels, filteredStreams, err := db.GetStreamsAndChannelRepostedByChannelHashes(db.FilteringChannelHashes)
if err != nil {
return err
db.FilteredChannels = filteredChannels
db.FilteredStreams = filteredStreams
return nil
// GetDBCF Get the database and open given column families.
func GetDBCF(name string, cf string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, error) {
opts := grocksdb.NewDefaultOptions()
cfOpt := grocksdb.NewDefaultOptions()
cfNames := []string{"default", cf}
cfOpts := []*grocksdb.Options{cfOpt, cfOpt}
db, handles, err := grocksdb.OpenDbAsSecondaryColumnFamilies(opts, name, "asdf", cfNames, cfOpts)
for i, handle := range handles {
log.Printf("%d: %+v\n", i, handle)
if err != nil {
return nil, nil, err
return db, handles, nil
// GetDB Get the database.
func GetDB(name string) (*grocksdb.DB, error) {
opts := grocksdb.NewDefaultOptions()
db, err := grocksdb.OpenDbAsSecondary(opts, name, "asdf")
if err != nil {
return nil, err
return db, nil
// Reading utility functions
// ReadPrefixN Reads n entries from a rocksdb db starting at the given prefix
// Does not use column families
func ReadPrefixN(db *grocksdb.DB, prefix []byte, n int) []*prefixes.PrefixRowKV {
ro := grocksdb.NewDefaultReadOptions()
it := db.NewIterator(ro)
defer it.Close()
res := make([]*prefixes.PrefixRowKV, n)
var i = 0
for ; it.Valid(); it.Next() {
key := it.Key()
value := it.Value()
res[i] = &prefixes.PrefixRowKV{
Key: key.Data(),
Value: value.Data(),
if i >= n {
return res
// ReadWriteRawNColumnFamilies reads n entries from a given column famliy of a rocksdb db
// and writes then to a given file.
func ReadWriteRawNColumnFamilies(db *grocksdb.DB, options *IterOptions, out string, n int) {
readWriteRawNCF(db, options, out, n, 1)
// ReadWriteRawNColumnFamilies reads n entries from a given column famliy of a rocksdb db
// and writes then to a given file.
func ReadWriteRawNCF(db *grocksdb.DB, options *IterOptions, out string, n int) {
readWriteRawNCF(db, options, out, n, 0)
// readWriteRawNCF reads n entries from a given column famliy of a rocksdb db and
// writes them as a csv to a give file.
func readWriteRawNCF(db *grocksdb.DB, options *IterOptions, out string, n int, fileVersion int) {
var formatStr string = ""
switch fileVersion {
case 0:
formatStr = "%s,\n"
case 1:
formatStr = "%s,,\n"
options.RawKey = true
options.RawValue = true
ch := IterCF(db, options)
file, err := os.Create(out)
if err != nil {
defer file.Close()
var i = 0
cf := string(options.Prefix)
file.Write([]byte(fmt.Sprintf(formatStr, options.Prefix)))
for kv := range ch {
if i >= n {
key := kv.Key.([]byte)
value := kv.Value.([]byte)
keyHex := hex.EncodeToString(key)
valueHex := hex.EncodeToString(value)
if fileVersion == 1 {
// ReadWriteRawN reads n entries from a given rocksdb db and writes them as a
// csv to a give file.
func ReadWriteRawN(db *grocksdb.DB, options *IterOptions, out string, n int) {
options.RawKey = true
options.RawValue = true
ch := Iter(db, options)
file, err := os.Create(out)
if err != nil {
defer file.Close()
var i = 0
for kv := range ch {
if i >= n {
key := kv.Key.([]byte)
value := kv.Value.([]byte)
keyHex := hex.EncodeToString(key)
valueHex := hex.EncodeToString(value)
// GenerateTestData generates a test data file for a prefix.
func GenerateTestData(prefix byte, fileName string) {
dbVal, err := GetDB("/mnt/d/data/wallet/lbry-rocksdb/")
if err != nil {
options := NewIterateOptions()
ReadWriteRawN(dbVal, options, fileName, 10)