diff --git a/db/db.go b/db/db.go index b34316a..352785f 100644 --- a/db/db.go +++ b/db/db.go @@ -7,6 +7,7 @@ import ( "log" "math" "os" + "sort" "strings" "time" @@ -16,11 +17,28 @@ import ( "github.com/linxGnu/grocksdb" ) +const ( + nOriginalClaimExpirationTime = 262974 + nExtendedClaimExpirationTime = 2102400 + nExtendedClaimExpirationForkHeight = 400155 + nNormalizedNameForkHeight = 539940 // targeting 21 March 2019 + nMinTakeoverWorkaroundHeight = 496850 + nMaxTakeoverWorkaroundHeight = 658300 // targeting 30 Oct 2019 + nWitnessForkHeight = 680770 // targeting 11 Dec 2019 + nAllClaimsInMerkleForkHeight = 658310 // targeting 30 Oct 2019 + proportionalDelayFactor = 32 + maxTakeoverDelay = 4032 +) + type ReadOnlyDBColumnFamily struct { - DB *grocksdb.DB - Handles map[string]*grocksdb.ColumnFamilyHandle - Opts *grocksdb.ReadOptions - TxCounts []uint32 + DB *grocksdb.DB + Handles map[string]*grocksdb.ColumnFamilyHandle + Opts *grocksdb.ReadOptions + TxCounts []uint32 + BlockedStreams map[string][]byte + BlockedChannels map[string][]byte + FilteredStreams map[string][]byte + FilteredChannels map[string][]byte } type ResolveResult struct { @@ -227,6 +245,27 @@ func ParseURL(url string) (parsed *URL, err error) { return NewURL(), nil } +// BisectRight returns the index of the first element in the list that is greater than or equal to the value. +func BisectRight(arr []uint32, val uint32) uint32 { + i := sort.Search(len(arr), func(i int) bool { return arr[i] >= val }) + return uint32(i) +} + +func GetExpirationHeight(lastUpdatedHeight uint32) uint32 { + return GetExpirationHeightFull(lastUpdatedHeight, false) +} + +func GetExpirationHeightFull(lastUpdatedHeight uint32, extended bool) uint32 { + if extended { + return lastUpdatedHeight + nExtendedClaimExpirationTime + } + if lastUpdatedHeight < nExtendedClaimExpirationForkHeight { + return lastUpdatedHeight + nOriginalClaimExpirationTime + } + return lastUpdatedHeight + nExtendedClaimExpirationTime +} + +// PrepareResolveResult prepares a ResolveResult for use. func PrepareResolveResult( db *ReadOnlyDBColumnFamily, txNum uint32, @@ -255,8 +294,9 @@ func PrepareResolveResult( created_height = bisect_right(self.tx_counts, root_tx_num) last_take_over_height = controlling_claim.height */ - height := txNum - createdHeight := rootTxNum + // FIXME: Actually do this + height := BisectRight(db.TxCounts, txNum) + createdHeight := BisectRight(db.TxCounts, rootTxNum) lastTakeoverHeight := controllingClaim.Height /* @@ -264,8 +304,7 @@ func PrepareResolveResult( support_amount = self.get_support_amount(claim_hash) claim_amount = self.get_cached_claim_txo(claim_hash).amount */ - //expirationHeight := activationHeight + int(controllingClaim.Expiration) - expirationHeight := activationHeight + height // FIXME + expirationHeight := GetExpirationHeight(height) supportAmount, err := GetSupportAmount(db, claimHash) if err != nil { return nil, err @@ -397,26 +436,34 @@ func GetShortClaimIdUrl(db *ReadOnlyDBColumnFamily, name string, normalizedName print(f"{claim_id} has a collision") return f'{name}#{claim_id}' */ - // prefix := []byte{prefixes.ClaimShortIdPrefix} - // cfName := string(prefix) - // handle := db.Handles[cfName] - // claimId := hex.EncodeToString(claimHash) - // for prefixLen := 0; prefixLen < 10; prefixLen++ { - // // Prefix and handle - // options := NewIterateOptions().WithPrefix(prefix).WithCfHandle(handle) - // // Start and stop bounds - // options = options.WithStart(startKeyRaw).WithStop(endKeyRaw) - // // Don't include the key - // options = options.WithIncludeKey(false) + prefix := []byte{prefixes.ClaimShortIdPrefix} + handle := db.Handles[string(prefix)] + claimId := hex.EncodeToString(claimHash) + claimIdLen := len(claimId) + for prefixLen := 0; prefixLen < 10; prefixLen++ { + var j int = prefixLen + 1 + if j > claimIdLen { + j = claimIdLen + } + partialClaimId := claimId[:j] + key := prefixes.NewClaimShortIDKey(normalizedName, partialClaimId) + keyPrefix := prefixes.ClaimShortIDKeyPackPartial(key, 2) + // Prefix and handle + options := NewIterateOptions().WithPrefix(prefix).WithCfHandle(handle) + // Start and stop bounds + options = options.WithStart(keyPrefix) + // Don't include the key + options = options.WithIncludeValue(false) - // IterCF(db.DB) - // // for k, _ := range db.ClaimShortId.Iterate(prefix=(normalizedName, claimId[:prefixLen+1]), includeValue=false) { - // // if k.RootTxNum == rootTxNum && k.RootPosition == rootPosition { - // // return fmt.Sprintf("%s#%s", name, k.PartialClaimId), nil - // // } - // // break - // // } - // } + ch := IterCF(db.DB, options) + for row := range ch { + key := row.Key.(*prefixes.ClaimShortIDKey) + if key.RootTxNum == rootTxNum && key.RootPosition == rootPosition { + return fmt.Sprintf("%s#%s", name, key.PartialClaimId), nil + } + break + } + } return "", nil } @@ -455,7 +502,7 @@ func GetChannelForClaim(db *ReadOnlyDBColumnFamily, claimHash []byte, txNum uint slice, err := db.DB.GetCF(db.Opts, handle, rawKey) if err != nil { return nil, err - } else if slice == nil { + } else if slice.Size() == 0 { return nil, nil } rawValue := make([]byte, len(slice.Data())) @@ -535,7 +582,7 @@ func GetSupportAmount(db *ReadOnlyDBColumnFamily, claimHash []byte) (uint64, err slice, err := db.DB.GetCF(db.Opts, handle, rawKey) if err != nil { return 0, err - } else if slice == nil { + } else if slice.Size() == 0 { return 0, nil } rawValue := make([]byte, len(slice.Data())) @@ -564,7 +611,11 @@ func GetTxHash(db *ReadOnlyDBColumnFamily, txNum uint32) ([]byte, error) { return rawValue, nil } -func GetActivation(db *ReadOnlyDBColumnFamily, txNum uint32, postition uint16, isSupport bool) (uint32, error) { +func GetActivation(db *ReadOnlyDBColumnFamily, txNum uint32, postition uint16) (uint32, error) { + return GetActivationFull(db, txNum, postition, false) +} + +func GetActivationFull(db *ReadOnlyDBColumnFamily, txNum uint32, postition uint16, isSupport bool) (uint32, error) { /* def get_activation(self, tx_num, position, is_support=False) -> int: activation = self.prefix_db.activated.get( @@ -597,7 +648,7 @@ func GetActivation(db *ReadOnlyDBColumnFamily, txNum uint32, postition uint16, i } func GetCachedClaimTxo(db *ReadOnlyDBColumnFamily, claim []byte) (*prefixes.ClaimToTXOValue, error) { - // TODO: implement cache? + // TODO: implement cache key := prefixes.NewClaimToTXOKey(claim) cfName := string(prefixes.ClaimToTXO) handle := db.Handles[cfName] @@ -621,6 +672,9 @@ func GetControllingClaim(db *ReadOnlyDBColumnFamily, name string) (*prefixes.Cla if err != nil { return nil, err } + if slice.Size() == 0 { + return nil, nil + } rawValue := make([]byte, len(slice.Data())) copy(rawValue, slice.Data()) value := prefixes.ClaimTakeoverValueUnpack(rawValue) @@ -632,7 +686,7 @@ func FsGetClaimByHash(db *ReadOnlyDBColumnFamily, claimHash []byte) (*ResolveRes if err != nil { return nil, err } - activation, err := GetActivation(db, claim.TxNum, claim.Position, false) + activation, err := GetActivation(db, claim.TxNum, claim.Position) if err != nil { return nil, err } @@ -642,7 +696,6 @@ func FsGetClaimByHash(db *ReadOnlyDBColumnFamily, claimHash []byte) (*ResolveRes activation, claim.channel_signature_is_valid ) */ - //return PrepareResolveResult(db, 0, 0, nil, "", 0, 0, 0, false) return PrepareResolveResult( db, claim.TxNum, @@ -660,80 +713,207 @@ func ClaimShortIdIter(db *ReadOnlyDBColumnFamily, normalizedName string, claimId prefix := []byte{prefixes.ClaimShortIdPrefix} handle := db.Handles[string(prefix)] key := prefixes.NewClaimShortIDKey(normalizedName, claimId) - rawKeyPrefix := prefixes.ClaimShortIDKeyPackPartial(key, 2) + var rawKeyPrefix []byte = nil + if claimId != "" { + rawKeyPrefix = prefixes.ClaimShortIDKeyPackPartial(key, 2) + } else { + rawKeyPrefix = prefixes.ClaimShortIDKeyPackPartial(key, 1) + } options := NewIterateOptions().WithCfHandle(handle).WithPrefix(rawKeyPrefix) - options = options.WithIncludeValue(true) + options = options.WithIncludeValue(true) //.WithIncludeStop(true) ch := IterCF(db.DB, options) return ch } -func GetCachedClaimHash(db *ReadOnlyDBColumnFamily, txNum uint32, position uint16) ([]byte, error) { +func GetCachedClaimHash(db *ReadOnlyDBColumnFamily, txNum uint32, position uint16) (*prefixes.TXOToClaimValue, error) { /* - if self._cache_all_claim_txos: + if self._cache_all_claim_txos: if tx_num not in self.txo_to_claim: return return self.txo_to_claim[tx_num].get(position, None) v = self.prefix_db.txo_to_claim.get_pending(tx_num, position) return None if not v else v.claim_hash */ - return nil, nil + // TODO: implement cache + key := prefixes.NewTXOToClaimKey(txNum, position) + cfName := string(prefixes.TXOToClaim) + handle := db.Handles[cfName] + rawKey := key.PackKey() + slice, err := db.DB.GetCF(db.Opts, handle, rawKey) + if err != nil { + return nil, err + } + rawValue := make([]byte, len(slice.Data())) + copy(rawValue, slice.Data()) + value := prefixes.TXOToClaimValueUnpack(rawValue) + return value, nil } func ResolveParsedUrl(db *ReadOnlyDBColumnFamily, parsed *PathSegment) (*ResolveResult, error) { normalizedName := util.NormalizeName(parsed.name) - if (parsed.amountOrder == 0 && parsed.claimId == "") || parsed.amountOrder == 1 { + if (parsed.amountOrder == -1 && parsed.claimId == "") || parsed.amountOrder == 1 { controlling, err := GetControllingClaim(db, normalizedName) if err != nil { return nil, err } + if controlling == nil { + return nil, nil + } return FsGetClaimByHash(db, controlling.ClaimHash) } var amountOrder int = int(math.Max(float64(parsed.amountOrder), 1)) - log.Println(amountOrder) + log.Println("amountOrder:", amountOrder) if parsed.claimId != "" { if len(parsed.claimId) == 40 { - // Do full claim stuff - log.Println("TODO") - } - ch := ClaimShortIdIter(db, normalizedName, parsed.claimId) - for row := range ch { - key := row.Value.(*prefixes.ClaimShortIDKey) - claimTxo := row.Value.(*prefixes.ClaimShortIDValue) - fullClaimHash, err := GetCachedClaimHash(db, claimTxo.TxNum, claimTxo.Position) + claimHash, err := hex.DecodeString(parsed.claimId) if err != nil { return nil, err } - c, err := GetCachedClaimTxo(db, fullClaimHash) + // Maybe don't use caching version, when I actually implement the cache + claimTxo, err := GetCachedClaimTxo(db, claimHash) if err != nil { return nil, err } - nonNormalizedName := c.Name - signatureIsValid := c.ChannelSignatureIsValid - activation, err := GetActivation(db, claimTxo.TxNum, claimTxo.Position, false) + if claimTxo == nil || claimTxo.NormalizedName() != normalizedName { + return nil, nil + } + activation, err := GetActivation(db, claimTxo.TxNum, claimTxo.Position) if err != nil { return nil, err } - // LOL how do we ignore this static-check from Go? return PrepareResolveResult( db, claimTxo.TxNum, claimTxo.Position, - fullClaimHash, - nonNormalizedName, - key.RootTxNum, - key.RootPosition, + claimHash, + claimTxo.Name, + claimTxo.RootTxNum, + claimTxo.RootPosition, activation, - signatureIsValid, + claimTxo.ChannelSignatureIsValid, ) } + log.Println("nomalizedName:", normalizedName) + log.Println("claimId:", parsed.claimId) + var j int = 10 + if len(parsed.claimId) < j { + j = len(parsed.claimId) + } + ch := ClaimShortIdIter(db, normalizedName, parsed.claimId[:j]) + log.Printf("ch: %#v\n", ch) + row := <-ch + key := row.Key.(*prefixes.ClaimShortIDKey) + claimTxo := row.Value.(*prefixes.ClaimShortIDValue) + log.Println("key:", key) + log.Println("claimTxo:", claimTxo) + fullClaimHash, err := GetCachedClaimHash(db, claimTxo.TxNum, claimTxo.Position) + log.Println("fullClaimHash:", fullClaimHash) + if err != nil { + return nil, err + } + c, err := GetCachedClaimTxo(db, fullClaimHash.ClaimHash) + log.Println("c:", c) + if err != nil { + return nil, err + } + nonNormalizedName := c.Name + signatureIsValid := c.ChannelSignatureIsValid + activation, err := GetActivation(db, claimTxo.TxNum, claimTxo.Position) + if err != nil { + return nil, err + } + return PrepareResolveResult( + db, + claimTxo.TxNum, + claimTxo.Position, + fullClaimHash.ClaimHash, + nonNormalizedName, + key.RootTxNum, + key.RootPosition, + activation, + signatureIsValid, + ) } return nil, nil } +func ResolveClaimInChannel(db *ReadOnlyDBColumnFamily, claimHash []byte, normalizedName string) (*ResolveResult, error) { + return nil, nil +} + +/* +TODO: Implement blocking / filtering loading + async def reload_blocking_filtering_streams(self): + def reload(): + self.blocked_streams, self.blocked_channels = self.get_streams_and_channels_reposted_by_channel_hashes( + self.blocking_channel_hashes + ) + self.filtered_streams, self.filtered_channels = self.get_streams_and_channels_reposted_by_channel_hashes( + self.filtering_channel_hashes + ) + await asyncio.get_event_loop().run_in_executor(self._executor, reload) + + def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: Set[bytes]): + streams, channels = {}, {} + for reposter_channel_hash in reposter_channel_hashes: + for stream in self.prefix_db.channel_to_claim.iterate((reposter_channel_hash, ), include_key=False): + repost = self.get_repost(stream.claim_hash) + if repost: + txo = self.get_claim_txo(repost) + if txo: + if txo.normalized_name.startswith('@'): + channels[repost] = reposter_channel_hash + else: + streams[repost] = reposter_channel_hash + return streams, channels +*/ + +// GetBlockerHash get the hash of the blocker or filterer of the claim. +// TODO: this currently converts the byte arrays to strings, which is not +// very efficient. Might want to figure out a better way to do this. +func GetBlockerHash(db *ReadOnlyDBColumnFamily, claimHash, repostedClaimHash, channelHash []byte) ([]byte, []byte, error) { + claimHashStr := string(claimHash) + respostedClaimHashStr := string(repostedClaimHash) + channelHashStr := string(channelHash) + + var blockedHash []byte = nil + var filteredHash []byte = nil + + blockedHash = db.BlockedStreams[claimHashStr] + if blockedHash == nil { + blockedHash = db.BlockedStreams[respostedClaimHashStr] + } + if blockedHash == nil { + blockedHash = db.BlockedChannels[claimHashStr] + } + if blockedHash == nil { + blockedHash = db.BlockedChannels[respostedClaimHashStr] + } + if blockedHash == nil { + blockedHash = db.BlockedChannels[channelHashStr] + } + + filteredHash = db.FilteredStreams[claimHashStr] + if filteredHash == nil { + filteredHash = db.FilteredStreams[respostedClaimHashStr] + } + if filteredHash == nil { + filteredHash = db.FilteredChannels[claimHashStr] + } + if filteredHash == nil { + filteredHash = db.FilteredChannels[respostedClaimHashStr] + } + if filteredHash == nil { + filteredHash = db.FilteredChannels[channelHashStr] + } + + return blockedHash, filteredHash, nil +} + func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult { var res = &ExpandedResolveResult{ Stream: nil, @@ -755,9 +935,9 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult { log.Printf("parsed: %#v\n", parsed) // has stream in channel - if strings.Compare(parsed.StreamName, "") != 0 && strings.Compare(parsed.ChannelName, "") != 0 { + if strings.Compare(parsed.StreamName, "") != 0 && strings.Compare(parsed.ClaimName, "") != 0 { channel = &PathSegment{ - name: parsed.ChannelName, + name: parsed.ClaimName, claimId: parsed.ChannelClaimId, amountOrder: parsed.PrimaryBidPosition, } @@ -766,9 +946,9 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult { claimId: parsed.StreamClaimId, amountOrder: parsed.SecondaryBidPosition, } - } else if strings.Compare(parsed.ChannelName, "") != 0 { + } else if strings.Compare(parsed.ClaimName, "") != 0 { channel = &PathSegment{ - name: parsed.ChannelName, + name: parsed.ClaimName, claimId: parsed.ChannelClaimId, amountOrder: parsed.PrimaryBidPosition, } @@ -792,15 +972,138 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult { err: &ResolveError{err}, } return res + } else if resolvedChannel == nil { + res.Channel = &optionalResolveResultOrError{ + err: &ResolveError{fmt.Errorf("could not find channel in \"%s\"", url)}, + } + return res } } - log.Println("resolvedChannel:", resolvedChannel) - return nil + log.Printf("resolvedChannel: %#v\n", resolvedChannel) + log.Printf("resolvedChannel.TxHash: %s\n", hex.EncodeToString(resolvedChannel.TxHash)) + log.Printf("resolvedChannel.ClaimHash: %s\n", hex.EncodeToString(resolvedChannel.ClaimHash)) + log.Printf("resolvedChannel.ChannelHash: %s\n", hex.EncodeToString(resolvedChannel.ChannelHash)) + // s := + // log.Printf("resolvedChannel.TxHash: %s\n", hex.EncodeToString(sort.IntSlice(resolvedChannel.TxHash))) if stream != nil { if resolvedChannel != nil { - + streamClaim, err := ResolveClaimInChannel(db, resolvedChannel.ClaimHash, stream.Normalized()) + // TODO: Confirm error case + if err != nil { + res.Stream = &optionalResolveResultOrError{ + err: &ResolveError{err}, + } + return res + } + if streamClaim != nil { + resolvedStream, err = FsGetClaimByHash(db, streamClaim.ClaimHash) + // TODO: Confirm error case + if err != nil { + res.Stream = &optionalResolveResultOrError{ + err: &ResolveError{err}, + } + return res + } + } } else { + resolvedStream, err = ResolveParsedUrl(db, stream) + // TODO: Confirm error case + if err != nil { + res.Stream = &optionalResolveResultOrError{ + err: &ResolveError{err}, + } + return res + } + if channel == nil && resolvedChannel == nil && resolvedStream != nil && len(resolvedStream.ChannelHash) > 0 { + resolvedChannel, err = FsGetClaimByHash(db, resolvedStream.ChannelHash) + // TODO: Confirm error case + if err != nil { + res.Channel = &optionalResolveResultOrError{ + err: &ResolveError{err}, + } + return res + } + } + } + if resolvedStream == nil { + res.Stream = &optionalResolveResultOrError{ + err: &ResolveError{fmt.Errorf("could not find stream in \"%s\"", url)}, + } + return res + } + } + /* + repost = None + reposted_channel = None + if resolved_stream or resolved_channel: + claim_hash = resolved_stream.claim_hash if resolved_stream else resolved_channel.claim_hash + claim = resolved_stream if resolved_stream else resolved_channel + reposted_claim_hash = resolved_stream.reposted_claim_hash if resolved_stream else None + + if blocker_hash: + reason_row = self._fs_get_claim_by_hash(blocker_hash) + return ExpandedResolveResult( + None, ResolveCensoredError(url, blocker_hash, censor_row=reason_row), None, None + ) + if claim.reposted_claim_hash: + repost = self._fs_get_claim_by_hash(claim.reposted_claim_hash) + if repost and repost.channel_hash and repost.signature_valid: + reposted_channel = self._fs_get_claim_by_hash(repost.channel_hash) + */ + var repost *ResolveResult = nil + var repostedChannel *ResolveResult = nil + + if resolvedStream != nil || resolvedChannel != nil { + var claim *ResolveResult = nil + var claimHash []byte = nil + var respostedClaimHash []byte = nil + var blockerHash []byte = nil + if resolvedStream != nil { + claim = resolvedStream + claimHash = resolvedStream.ClaimHash + respostedClaimHash = resolvedStream.RepostedClaimHash + } else { + claim = resolvedChannel + claimHash = resolvedChannel.ClaimHash + } + blockerHash, _, err = GetBlockerHash(db, claimHash, respostedClaimHash, claim.ChannelHash) + if err != nil { + res.Channel = &optionalResolveResultOrError{ + err: &ResolveError{err}, + } + return res + } + if blockerHash != nil { + reasonRow, err := FsGetClaimByHash(db, blockerHash) + if err != nil { + res.Channel = &optionalResolveResultOrError{ + err: &ResolveError{err}, + } + return res + } + res.Channel = &optionalResolveResultOrError{ + err: &ResolveError{fmt.Errorf("%s, %v, %v", url, blockerHash, reasonRow)}, + } + return res + } + if claim.RepostedClaimHash != nil { + repost, err = FsGetClaimByHash(db, claim.RepostedClaimHash) + if err != nil { + res.Channel = &optionalResolveResultOrError{ + err: &ResolveError{err}, + } + return res + } + if repost != nil && repost.ChannelHash != nil && repost.SignatureValid { + repostedChannel, err = FsGetClaimByHash(db, repost.ChannelHash) + if err != nil { + res.Channel = &optionalResolveResultOrError{ + err: &ResolveError{err}, + } + return res + } + } } } @@ -810,6 +1113,12 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult { res.Stream = &optionalResolveResultOrError{ res: resolvedStream, } + res.Repost = &optionalResolveResultOrError{ + res: repost, + } + res.RepostedChannel = &optionalResolveResultOrError{ + res: repostedChannel, + } log.Printf("parsed: %+v\n", parsed) return res @@ -828,9 +1137,13 @@ func (opts *IterOptions) ReadRow(ch chan *prefixes.PrefixRowKV, prevKey *[]byte) var outValue interface{} = nil var err error = nil - // We need to check the current key is we're not including the stop + // log.Println("keyData:", keyData) + // log.Println("valueData:", valueData) + + // We need to check the current key if we're not including the stop // key. if !opts.IncludeStop && opts.StopIteration(keyData) { + log.Println("returning false") return false } @@ -865,11 +1178,13 @@ func (opts *IterOptions) ReadRow(ch chan *prefixes.PrefixRowKV, prevKey *[]byte) key.Free() value.Free() + // log.Println("sending to channel") ch <- &prefixes.PrefixRowKV{ Key: outKey, Value: outValue, } *prevKey = newKeyData + // log.Println("*prevKey:", *prevKey) return true } @@ -885,6 +1200,7 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { it.Seek(opts.Prefix) if opts.Start != nil { + log.Println("Seeking to start") it.Seek(opts.Start) } @@ -894,13 +1210,17 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { var prevKey []byte = nil if !opts.IncludeStart { + log.Println("Not including start") it.Next() } if !it.Valid() && opts.IncludeStop { + log.Println("Not valid, but including stop") opts.ReadRow(ch, &prevKey) } - for ; !opts.StopIteration(prevKey) && it.Valid(); it.Next() { - opts.ReadRow(ch, &prevKey) + var continueIter bool = true + for ; continueIter && !opts.StopIteration(prevKey) && it.Valid(); it.Next() { + //log.Println("Main loop") + continueIter = opts.ReadRow(ch, &prevKey) } }() @@ -954,7 +1274,7 @@ func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, er return nil, nil, err } cfOpts := make([]*grocksdb.Options, len(cfNames)) - for i, _ := range cfNames { + for i := range cfNames { cfOpts[i] = cfOpt } db, handles, err := grocksdb.OpenDbColumnFamilies(opts, name, cfNames, cfOpts) @@ -994,9 +1314,13 @@ func GetDBColumnFamlies(name string, cfNames []string) (*ReadOnlyDBColumnFamily, } myDB := &ReadOnlyDBColumnFamily{ - DB: db, - Handles: handlesMap, - Opts: roOpts, + DB: db, + Handles: handlesMap, + Opts: roOpts, + BlockedStreams: make(map[string][]byte), + BlockedChannels: make(map[string][]byte), + FilteredStreams: make(map[string][]byte), + FilteredChannels: make(map[string][]byte), } err = InitTxCounts(myDB) diff --git a/db/db_test.go b/db/db_test.go index fcecbfb..806c1e1 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -13,6 +13,11 @@ import ( "github.com/linxGnu/grocksdb" ) +//////////////////////////////////////////////////////////////////////////////// +// Utility functions for testing +//////////////////////////////////////////////////////////////////////////////// + +// OpenAndFillTmpDBColumnFamlies opens a db and fills it with data from a csv file using the given column family names func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, func(), error) { log.Println(filePath) @@ -77,6 +82,8 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami return myDB, records, toDefer, nil } +// OpenAndFillTmpDBCF opens a db and fills it with data from a csv file +// using the given column family handle. Old version, should probably remove. func OpenAndFillTmpDBCF(filePath string) (*grocksdb.DB, [][]string, func(), *grocksdb.ColumnFamilyHandle, error) { log.Println(filePath) @@ -123,6 +130,8 @@ func OpenAndFillTmpDBCF(filePath string) (*grocksdb.DB, [][]string, func(), *gro return db, records, toDefer, handle, nil } +// OpenAndFillTmpDB opens a db and fills it with data from a csv file. +// Old funciont, should probably remove. func OpenAndFillTmpDB(filePath string) (*grocksdb.DB, [][]string, func(), error) { log.Println(filePath) @@ -165,6 +174,8 @@ func OpenAndFillTmpDB(filePath string) (*grocksdb.DB, [][]string, func(), error) return db, records, toDefer, nil } +// CatCSV Reads a csv version of the db and prints it to stdout, +// while decoding types. func CatCSV(filePath string) { log.Println(filePath) file, err := os.Open(filePath) @@ -197,12 +208,56 @@ func CatCSV(filePath string) { } } -func TestOpenFullDB(t *testing.T) { - url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c" +func TestCatFullDB(t *testing.T) { + // url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c" + // "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1" + // url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" + // url := "lbry://@lbry" + // url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a" dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" - prefixes := prefixes.GetPrefixes() + prefixNames := prefixes.GetPrefixes() cfNames := []string{"default", "e", "d", "c"} - for _, prefix := range prefixes { + for _, prefix := range prefixNames { + cfName := string(prefix) + cfNames = append(cfNames, cfName) + } + db, err := dbpkg.GetDBColumnFamlies(dbPath, cfNames) + toDefer := func() { + db.DB.Close() + err = os.RemoveAll("./asdf") + if err != nil { + log.Println(err) + } + } + defer toDefer() + if err != nil { + t.Error(err) + return + } + ch := dbpkg.ClaimShortIdIter(db, "@lbry", "") + for row := range ch { + key := row.Key.(*prefixes.ClaimShortIDKey) + val := row.Value.(*prefixes.ClaimShortIDValue) + log.Printf("%#v, %#v\n", key, val) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// End utility functions +//////////////////////////////////////////////////////////////////////////////// + +// TestOpenFullDB Tests running a resolve on a full db. +func TestOpenFullDB(t *testing.T) { + // url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c" + // "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1" + // url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" + // url := "lbry://@lbry" + // url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a" + url := "lbry://@lbry$1" + dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" + prefixNames := prefixes.GetPrefixes() + cfNames := []string{"default", "e", "d", "c"} + for _, prefix := range prefixNames { cfName := string(prefix) cfNames = append(cfNames, cfName) } @@ -220,7 +275,7 @@ func TestOpenFullDB(t *testing.T) { return } expandedResolveResult := dbpkg.Resolve(db, url) - log.Println(expandedResolveResult) + log.Printf("expandedResolveResult: %#v\n", expandedResolveResult) } // FIXME: Needs new data format @@ -236,11 +291,35 @@ func TestResolve(t *testing.T) { log.Println(expandedResolveResult) } +// TestPrintClaimShortId Utility function to cat the ClaimShortId csv func TestPrintClaimShortId(t *testing.T) { filePath := "../testdata/F_cat.csv" CatCSV(filePath) } +// TestGetShortClaimIdUrl tests resolving a claim to a short url. +func TestGetShortClaimIdUrl(t *testing.T) { + // &{[70] cat 0 2104436 0} + name := "cat" + normalName := "cat" + claimHash := []byte{} + var rootTxNum uint32 = 2104436 + var position uint16 = 0 + filePath := "../testdata/F_cat.csv" + db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + if err != nil { + t.Error(err) + } + defer toDefer() + shortUrl, err := dbpkg.GetShortClaimIdUrl(db, name, normalName, claimHash, rootTxNum, position) + if err != nil { + t.Error(err) + } + log.Println(shortUrl) +} + +// TestClaimShortIdIter Tests the function to get an iterator of ClaimShortIds +// with a noramlized name and a partial claim id. func TestClaimShortIdIter(t *testing.T) { filePath := "../testdata/F_cat.csv" normalName := "cat" @@ -248,7 +327,6 @@ func TestClaimShortIdIter(t *testing.T) { db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) if err != nil { t.Error(err) - return } defer toDefer() @@ -263,11 +341,39 @@ func TestClaimShortIdIter(t *testing.T) { } } +// TestPrintTXOToCLaim Utility function to cat the TXOToClaim csv. +func TestPrintTXOToClaim(t *testing.T) { + filePath := "../testdata/G_2.csv" + CatCSV(filePath) +} + +// TestGetTXOToClaim Tests getting a claim hash from the db given +// a txNum and position. +func TestGetTXOToClaim(t *testing.T) { + //&{[71] 1456296 0} + var txNum uint32 = 1456296 + var position uint16 = 0 + filePath := "../testdata/G_2.csv" + db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + if err != nil { + t.Error(err) + } + defer toDefer() + val, err := dbpkg.GetCachedClaimHash(db, txNum, position) + if err != nil { + t.Error(err) + } else if val.Name != "one" { + t.Error(err) + } +} + +// TestPrintClaimToTXO Utility function to cat the ClaimToTXO csv. func TestPrintClaimToTXO(t *testing.T) { filePath := "../testdata/E_2.csv" CatCSV(filePath) } +// TestGetClaimToTXO Tests getting a ClaimToTXO value from the db. func TestGetClaimToTXO(t *testing.T) { claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a" claimHash, err := hex.DecodeString(claimHashStr) @@ -290,11 +396,14 @@ func TestGetClaimToTXO(t *testing.T) { log.Println(res) } +// TestPrintClaimTakeover Utility function to cat the ClaimTakeover csv. func TestPrintClaimTakeover(t *testing.T) { filePath := "../testdata/P_cat.csv" CatCSV(filePath) } +// TestGetControlingClaim Tests getting a controlling claim value from the db +// based on a name. func TestGetControllingClaim(t *testing.T) { filePath := "../testdata/P_cat.csv" db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) @@ -311,6 +420,7 @@ func TestGetControllingClaim(t *testing.T) { log.Println(res) } +// TestIter Tests the db iterator. Probably needs data format updated. func TestIter(t *testing.T) { filePath := "../testdata/W.csv" diff --git a/db/prefixes/prefixes.go b/db/prefixes/prefixes.go index 03e8a3d..a4f99ed 100644 --- a/db/prefixes/prefixes.go +++ b/db/prefixes/prefixes.go @@ -1471,6 +1471,14 @@ type TXOToClaimValue struct { Name string `json:"name"` } +func NewTXOToClaimKey(txNum uint32, position uint16) *TXOToClaimKey { + return &TXOToClaimKey{ + Prefix: []byte{TXOToClaim}, + TxNum: txNum, + Position: position, + } +} + func (k *TXOToClaimKey) PackKey() []byte { prefixLen := 1 // b'>LH' diff --git a/main.go b/main.go index b44401f..4fd3063 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,9 @@ import ( "context" "fmt" "log" + "os" + "os/signal" + "syscall" "time" "github.com/lbryio/hub/db" @@ -27,9 +30,27 @@ func main() { ctxWCancel, cancel := context.WithCancel(ctx) defer cancel() + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) + defer signal.Stop(interrupt) + s := server.MakeHubServer(ctxWCancel, args) s.Run() + select { + case <-interrupt: + break + case <-ctx.Done(): + break + } + + log.Println("Shutting down server...") + + s.EsClient.Stop() + s.GrpcServer.GracefulStop() + + log.Println("Returning from main...") + return } else if args.CmdType == server.DBCmd { options := &db.IterOptions{ @@ -89,21 +110,21 @@ func main() { return } else if args.CmdType == server.DBCmd3 { - var rawPrefix byte = prefixes.ClaimShortIdPrefix + var rawPrefix byte = prefixes.TXOToClaim prefix := []byte{rawPrefix} columnFamily := string(prefix) - start := &prefixes.ClaimShortIDKey{ - Prefix: []byte{prefixes.ClaimShortIdPrefix}, - NormalizedName: "cat", - PartialClaimId: "", - RootTxNum: 0, - RootPosition: 0, - } - startRaw := prefixes.ClaimShortIDKeyPackPartial(start, 1) + // start := &prefixes.ClaimShortIDKey{ + // Prefix: []byte{prefixes.ClaimShortIdPrefix}, + // NormalizedName: "cat", + // PartialClaimId: "", + // RootTxNum: 0, + // RootPosition: 0, + // } + // startRaw := prefixes.ClaimShortIDKeyPackPartial(start, 1) options := &db.IterOptions{ FillCache: false, Prefix: prefix, - Start: startRaw, + Start: nil, Stop: nil, IncludeStart: true, IncludeStop: false, @@ -120,42 +141,9 @@ func main() { options.CfHandle = handles[1] - db.ReadWriteRawNColumnFamilies(dbVal, options, fmt.Sprintf("./testdata/%s_cat.csv", columnFamily), 10) + db.ReadWriteRawNColumnFamilies(dbVal, options, fmt.Sprintf("./testdata/%s_2.csv", columnFamily), 10) return } - // } else if args.CmdType == server.DBCmd4 { - // var rawPrefix byte = prefixes.TxCount - - // prefix := []byte{rawPrefix} - // columnFamily := string(prefix) - // options := &db.IterOptions{ - // FillCache: false, - // Prefix: prefix, - // Start: nil, - // Stop: nil, - // IncludeStart: true, - // IncludeStop: false, - // IncludeKey: true, - // IncludeValue: true, - // RawKey: true, - // RawValue: true, - // } - - // dbVal, handles, err := db.GetDBCF("/mnt/d/data/snapshot_1072108/lbry-rocksdb/", columnFamily) - // if err != nil { - // log.Fatalln(err) - // } - - // options.CfHandle = handles[1] - // var n = 10 - // if bytes.Equal(prefix, []byte{prefixes.Undo}) || bytes.Equal(prefix, []byte{prefixes.DBState}) { - // n = 1 - // } - - // db.ReadWriteRawNCF(dbVal, options, fmt.Sprintf("./testdata/%s.csv", columnFamily), n) - - // return - // } conn, err := grpc.Dial("localhost:"+args.Port, grpc.WithInsecure(), diff --git a/testdata/G_2.csv b/testdata/G_2.csv new file mode 100644 index 0000000..b527e7b --- /dev/null +++ b/testdata/G_2.csv @@ -0,0 +1,11 @@ +G,, +G,4700162aa70000,c78ac4c326cd43cdc0c844b7cea13659449ab3e40015746573742d70686f746f2d7374726173626f757267 +G,4700162f600000,ebf95f7fdb89db5467bb1b88ea3b0f0f7ee5ce360003636e63 +G,47001630960000,a6f91a86837ab84a4cf0d2dcbe94704a528cf820000f776f6e646572776f6d616e31393933 +G,47001635e60000,9673cc2a1aac64d7b2742705abfb09fca30d7e0500056d6d61736b +G,47001638a80000,c39342066646dc50f1a9954b41684d157b035dac00036f6e65 +G,47001645ef0001,4689c1ccb4420309f93ab98799b28c49fa4d3809000a65617379737472656574 +G,470016529a0000,f1628d66ae52295590b72b9a0b3a3527642a532600137465737470756230332d32312d323031372d32 +G,470016529d0000,a4c61ced261ab571bdb3410ae140bec6c31f14ce00117465737470756230332d32312d32303137 +G,47001655960000,2327bcb6d7578a2669e416b5aa185fe14ee8e03e00056569676874 +G,47001664200000,f69099600bdca9b062ba60432dba3c0ca2241167002c686973746f72792d6f662d6672696564726963682d69692d6f662d707275737369612d766f6c756d652d3137