diff --git a/db/db.go b/db/db.go index 352785f..8605c48 100644 --- a/db/db.go +++ b/db/db.go @@ -17,6 +17,10 @@ import ( "github.com/linxGnu/grocksdb" ) +// +// Constants +// + const ( nOriginalClaimExpirationTime = 262974 nExtendedClaimExpirationTime = 2102400 @@ -30,11 +34,16 @@ const ( maxTakeoverDelay = 4032 ) +// +// Types and constructors, getters, setters, etc. +// + type ReadOnlyDBColumnFamily struct { DB *grocksdb.DB Handles map[string]*grocksdb.ColumnFamilyHandle Opts *grocksdb.ReadOptions TxCounts []uint32 + Height uint32 BlockedStreams map[string][]byte BlockedChannels map[string][]byte FilteredStreams map[string][]byte @@ -49,18 +58,18 @@ type ResolveResult struct { Position uint16 TxHash []byte Height uint32 - Amount int + Amount uint64 ShortUrl string IsControlling bool CanonicalUrl string CreationHeight uint32 ActivationHeight uint32 ExpirationHeight uint32 - EffectiveAmount int - SupportAmount int + EffectiveAmount uint64 + SupportAmount uint64 Reposted int - LastTakeoverHeight int - ClaimsInChannel int + LastTakeoverHeight uint32 + ClaimsInChannel uint32 ChannelHash []byte RepostedClaimHash []byte SignatureValid bool @@ -183,25 +192,6 @@ func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions { return o } -func (o *IterOptions) StopIteration(key []byte) bool { - if key == nil { - return false - } - - maxLen := int(math.Min(float64(len(key)), float64(len(o.Stop)))) - if o.Stop != nil && - (bytes.HasPrefix(key, o.Stop) || bytes.Compare(o.Stop, key[:maxLen]) < 0) { - return true - } else if o.Start != nil && - bytes.Compare(o.Start, key[:len(o.Start)]) > 0 { - return true - } else if o.Prefix != nil && !bytes.HasPrefix(key, o.Prefix) { - return true - } - - return false -} - type PathSegment struct { name string claimId string @@ -229,22 +219,6 @@ func (ps *PathSegment) String() string { return ps.name } -type URL struct { - stream *PathSegment - channel *PathSegment -} - -func NewURL() *URL { - return &URL{ - stream: nil, - channel: nil, - } -} - -func ParseURL(url string) (parsed *URL, err error) { - return NewURL(), nil -} - // BisectRight returns the index of the first element in the list that is greater than or equal to the value. func BisectRight(arr []uint32, val uint32) uint32 { i := sort.Search(len(arr), func(i int) bool { return arr[i] >= val }) @@ -265,177 +239,31 @@ func GetExpirationHeightFull(lastUpdatedHeight uint32, extended bool) uint32 { return lastUpdatedHeight + nExtendedClaimExpirationTime } -// PrepareResolveResult prepares a ResolveResult for use. -func PrepareResolveResult( - db *ReadOnlyDBColumnFamily, - txNum uint32, - position uint16, - claimHash []byte, - name string, - rootTxNum uint32, - rootPosition uint16, - activationHeight uint32, - signatureValid bool) (*ResolveResult, error) { +// +// DB Get functions +// - normalizedName := util.NormalizeName(name) - controllingClaim, err := GetControllingClaim(db, normalizedName) - if err != nil { - return nil, err - } - /* - tx_hash = self.get_tx_hash(tx_num) - */ - txHash, err := GetTxHash(db, txNum) - if err != nil { - return nil, err - } - /* - height = bisect_right(self.tx_counts, tx_num) - created_height = bisect_right(self.tx_counts, root_tx_num) - last_take_over_height = controlling_claim.height - */ - // FIXME: Actually do this - height := BisectRight(db.TxCounts, txNum) - createdHeight := BisectRight(db.TxCounts, rootTxNum) - lastTakeoverHeight := controllingClaim.Height +func GetClaimsInChannelCount(db *ReadOnlyDBColumnFamily, channelHash []byte) (uint32, error) { + prefix := []byte{prefixes.ChannelCount} + handle := db.Handles[string(prefix)] + key := prefixes.NewChannelCountKey(channelHash) + rawKey := key.PackKey() - /* - expiration_height = self.coin.get_expiration_height(height) - support_amount = self.get_support_amount(claim_hash) - claim_amount = self.get_cached_claim_txo(claim_hash).amount - */ - expirationHeight := GetExpirationHeight(height) - supportAmount, err := GetSupportAmount(db, claimHash) + slice, err := db.DB.GetCF(db.Opts, handle, rawKey) if err != nil { - return nil, err + return 0, err + } else if slice.Size() == 0 { + return 0, nil } - claimToTxo, err := GetCachedClaimTxo(db, claimHash) - if err != nil { - return nil, err - } - claimAmount := claimToTxo.Amount - /* - effective_amount = self.get_effective_amount(claim_hash) - channel_hash = self.get_channel_for_claim(claim_hash, tx_num, position) - reposted_claim_hash = self.get_repost(claim_hash) - short_url = self.get_short_claim_id_url(name, normalized_name, claim_hash, root_tx_num, root_position) - canonical_url = short_url - claims_in_channel = self.get_claims_in_channel_count(claim_hash) + rawValue := make([]byte, len(slice.Data())) + copy(rawValue, slice.Data()) + value := prefixes.ChannelCountValueUnpack(rawValue) - */ - effectiveAmount, err := GetEffectiveAmount(db, claimHash, false) - if err != nil { - return nil, err - } - channelHash, err := GetChannelForClaim(db, claimHash, txNum, position) - if err != nil { - return nil, err - } - repostedClaimHash, err := GetRepost(db, claimHash) - if err != nil { - return nil, err - } - shortUrl, err := GetShortClaimIdUrl(db, name, normalizedName, claimHash, txNum, rootPosition) - if err != nil { - return nil, err - } - var canonicalUrl string = shortUrl - claimsInChannel, err := GetClaimsInChannelCount(db, claimHash) - if err != nil { - return nil, err - } - /* - if channel_hash: - channel_vals = self.get_cached_claim_txo(channel_hash) - if channel_vals: - channel_short_url = self.get_short_claim_id_url( - channel_vals.name, channel_vals.normalized_name, channel_hash, channel_vals.root_tx_num, - channel_vals.root_position - ) - canonical_url = f'{channel_short_url}/{short_url}' - - */ - if channelHash != nil { - //FIXME - // Ignore error because we already have this set if this doesn't work - channelVals, _ := GetCachedClaimTxo(db, channelHash) - if channelVals != nil { - channelShortUrl, _ := GetShortClaimIdUrl(db, channelVals.Name, channelVals.NormalizedName(), channelHash, channelVals.RootTxNum, channelVals.RootPosition) - canonicalUrl = fmt.Sprintf("%s/%s", channelShortUrl, shortUrl) - } - } - reposted, err := GetRepostedCount(db, claimHash) - if err != nil { - return nil, err - } - return &ResolveResult{ - Name: name, - NormalizedName: normalizedName, - ClaimHash: claimHash, - TxNum: txNum, - Position: position, - TxHash: txHash, - Height: height, - Amount: int(claimAmount), - ShortUrl: shortUrl, - IsControlling: bytes.Equal(controllingClaim.ClaimHash, claimHash), - CanonicalUrl: canonicalUrl, - CreationHeight: createdHeight, - ActivationHeight: activationHeight, - ExpirationHeight: expirationHeight, - EffectiveAmount: int(effectiveAmount), - SupportAmount: int(supportAmount), - Reposted: reposted, - LastTakeoverHeight: int(lastTakeoverHeight), - ClaimsInChannel: claimsInChannel, - ChannelHash: channelHash, - RepostedClaimHash: repostedClaimHash, - SignatureValid: signatureValid, - }, nil - /* - - return ResolveResult( - name, normalized_name, claim_hash, tx_num, position, tx_hash, height, claim_amount, short_url=short_url, - is_controlling=controlling_claim.claim_hash == claim_hash, canonical_url=canonical_url, - last_takeover_height=last_take_over_height, claims_in_channel=claims_in_channel, - creation_height=created_height, activation_height=activation_height, - expiration_height=expiration_height, effective_amount=effective_amount, support_amount=support_amount, - channel_hash=channel_hash, reposted_claim_hash=reposted_claim_hash, - reposted=self.get_reposted_count(claim_hash), - signature_valid=None if not channel_hash else signature_valid - ) - - - */ - // return nil, nil -} - -func GetClaimsInChannelCount(db *ReadOnlyDBColumnFamily, channelHash []byte) (int, error) { - /* - def get_claims_in_channel_count(self, channel_hash) -> int: - channel_count_val = self.prefix_db.channel_count.get(channel_hash) - if channel_count_val is None: - return 0 - return channel_count_val.count - */ - return 0, nil + return value.Count, nil } func GetShortClaimIdUrl(db *ReadOnlyDBColumnFamily, name string, normalizedName string, claimHash []byte, rootTxNum uint32, rootPosition uint16) (string, error) { - /* - def get_short_claim_id_url(self, name: str, normalized_name: str, claim_hash: bytes, - root_tx_num: int, root_position: int) -> str: - claim_id = claim_hash.hex() - for prefix_len in range(10): - for k in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:prefix_len+1]), - include_value=False): - if k.root_tx_num == root_tx_num and k.root_position == root_position: - return f'{name}#{k.partial_claim_id}' - break - print(f"{claim_id} has a collision") - return f'{name}#{claim_id}' - */ prefix := []byte{prefixes.ClaimShortIdPrefix} handle := db.Handles[string(prefix)] claimId := hex.EncodeToString(claimHash) @@ -446,8 +274,8 @@ func GetShortClaimIdUrl(db *ReadOnlyDBColumnFamily, name string, normalizedName j = claimIdLen } partialClaimId := claimId[:j] - key := prefixes.NewClaimShortIDKey(normalizedName, partialClaimId) - keyPrefix := prefixes.ClaimShortIDKeyPackPartial(key, 2) + partialKey := prefixes.NewClaimShortIDKey(normalizedName, partialClaimId) + keyPrefix := prefixes.ClaimShortIDKeyPackPartial(partialKey, 2) // Prefix and handle options := NewIterateOptions().WithPrefix(prefix).WithCfHandle(handle) // Start and stop bounds @@ -456,45 +284,56 @@ func GetShortClaimIdUrl(db *ReadOnlyDBColumnFamily, name string, normalizedName options = options.WithIncludeValue(false) ch := IterCF(db.DB, options) - for row := range ch { - key := row.Key.(*prefixes.ClaimShortIDKey) - if key.RootTxNum == rootTxNum && key.RootPosition == rootPosition { - return fmt.Sprintf("%s#%s", name, key.PartialClaimId), nil - } - break + row := <-ch + key := row.Key.(*prefixes.ClaimShortIDKey) + if key.RootTxNum == rootTxNum && key.RootPosition == rootPosition { + return fmt.Sprintf("%s#%s", name, key.PartialClaimId), nil } } return "", nil } func GetRepost(db *ReadOnlyDBColumnFamily, claimHash []byte) ([]byte, error) { - /* - def get_repost(self, claim_hash) -> Optional[bytes]: - repost = self.prefix_db.repost.get(claim_hash) - if repost: - return repost.reposted_claim_hash - return - */ - return nil, nil + prefix := []byte{prefixes.Repost} + handle := db.Handles[string(prefix)] + key := prefixes.NewRepostKey(claimHash) + rawKey := key.PackKey() + slice, err := db.DB.GetCF(db.Opts, handle, rawKey) + if err != nil { + return nil, err + } else if slice.Size() == 0 { + return nil, nil + } + + rawValue := make([]byte, len(slice.Data())) + copy(rawValue, slice.Data()) + value := prefixes.RepostValueUnpack(rawValue) + return value.RepostedClaimHash, nil } func GetRepostedCount(db *ReadOnlyDBColumnFamily, claimHash []byte) (int, error) { - /* - def get_reposted_count(self, claim_hash: bytes) -> int: - return sum( - 1 for _ in self.prefix_db.reposted_claim.iterate(prefix=(claim_hash,), include_value=False) - ) - */ - return 0, nil + prefix := []byte{prefixes.RepostedClaim} + handle := db.Handles[string(prefix)] + key := prefixes.NewRepostedKey(claimHash) + keyPrefix := prefixes.RepostedKeyPackPartial(key, 1) + // Prefix and handle + options := NewIterateOptions().WithPrefix(prefix).WithCfHandle(handle) + // Start and stop bounds + options = options.WithStart(keyPrefix) + // Don't include the key + options = options.WithIncludeValue(false) + + var i int = 0 + ch := IterCF(db.DB, options) + + for range ch { + i++ + } + + return i, nil } func GetChannelForClaim(db *ReadOnlyDBColumnFamily, claimHash []byte, txNum uint32, position uint16) ([]byte, error) { - /* - def get_channel_for_claim(self, claim_hash, tx_num, position) -> Optional[bytes]: - v = self.prefix_db.claim_to_channel.get(claim_hash, tx_num, position) - if v: - return v.signing_hash - */ key := prefixes.NewClaimToChannelKey(claimHash, txNum, position) cfName := string(prefixes.ClaimToChannel) handle := db.Handles[cfName] @@ -505,6 +344,7 @@ func GetChannelForClaim(db *ReadOnlyDBColumnFamily, claimHash []byte, txNum uint } else if slice.Size() == 0 { return nil, nil } + rawValue := make([]byte, len(slice.Data())) copy(rawValue, slice.Data()) value := prefixes.ClaimToChannelValueUnpack(rawValue) @@ -545,36 +385,24 @@ func GetActiveAmount(db *ReadOnlyDBColumnFamily, claimHash []byte, txoType uint8 } func GetEffectiveAmount(db *ReadOnlyDBColumnFamily, claimHash []byte, supportOnly bool) (uint64, error) { - /* - def get_effective_amount(self, claim_hash: bytes, support_only=False) -> int: - support_amount = self._get_active_amount(claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, self.db_height + 1) - if support_only: - return support_only - return support_amount + self._get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, self.db_height + 1) - */ - // key := prefixes.NewSupportAmountKey(claimHash) - // cfName := string(prefixes.ActiveAmount) - // handle := db.Handles[cfName] - // rawKey := key.PackKey() - // slice, err := db.DB.GetCF(db.Opts, handle, rawKey) - // if err != nil { - // return 0, err - // } else if slice == nil { - // return 0, nil - // } - // rawValue := make([]byte, len(slice.Data())) - // copy(rawValue, slice.Data()) - // value := prefixes.SupportAmountValueUnpack(rawValue) - return 0, nil + supportAmount, err := GetActiveAmount(db, claimHash, prefixes.ACTIVATED_SUPPORT_TXO_TYPE, db.Height+1) + if err != nil { + return 0, err + } + + if supportOnly { + return supportAmount, nil + } + + activationAmount, err := GetActiveAmount(db, claimHash, prefixes.ACTIVATED_CLAIM_TXO_TYPE, db.Height+1) + if err != nil { + return 0, err + } + + return activationAmount + supportAmount, nil } func GetSupportAmount(db *ReadOnlyDBColumnFamily, claimHash []byte) (uint64, error) { - /* - support_amount_val = self.prefix_db.support_amount.get(claim_hash) - if support_amount_val is None: - return 0 - return support_amount_val.amount - */ key := prefixes.NewSupportAmountKey(claimHash) cfName := string(prefixes.SupportAmount) handle := db.Handles[cfName] @@ -585,6 +413,7 @@ func GetSupportAmount(db *ReadOnlyDBColumnFamily, claimHash []byte) (uint64, err } else if slice.Size() == 0 { return 0, nil } + rawValue := make([]byte, len(slice.Data())) copy(rawValue, slice.Data()) value := prefixes.SupportAmountValueUnpack(rawValue) @@ -606,6 +435,10 @@ func GetTxHash(db *ReadOnlyDBColumnFamily, txNum uint32) ([]byte, error) { if err != nil { return nil, err } + if slice.Size() == 0 { + return nil, nil + } + rawValue := make([]byte, len(slice.Data())) copy(rawValue, slice.Data()) return rawValue, nil @@ -616,15 +449,6 @@ func GetActivation(db *ReadOnlyDBColumnFamily, txNum uint32, postition uint16) ( } func GetActivationFull(db *ReadOnlyDBColumnFamily, txNum uint32, postition uint16, isSupport bool) (uint32, error) { - /* - def get_activation(self, tx_num, position, is_support=False) -> int: - activation = self.prefix_db.activated.get( - ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, tx_num, position - ) - if activation: - return activation.height - return -1 - */ var typ uint8 if isSupport { typ = prefixes.ACTIVATED_SUPPORT_TXO_TYPE @@ -657,6 +481,10 @@ func GetCachedClaimTxo(db *ReadOnlyDBColumnFamily, claim []byte) (*prefixes.Clai if err != nil { return nil, err } + if slice.Size() == 0 { + return nil, nil + } + rawValue := make([]byte, len(slice.Data())) copy(rawValue, slice.Data()) value := prefixes.ClaimToTXOValueUnpack(rawValue) @@ -675,6 +503,7 @@ func GetControllingClaim(db *ReadOnlyDBColumnFamily, name string) (*prefixes.Cla if slice.Size() == 0 { return nil, nil } + rawValue := make([]byte, len(slice.Data())) copy(rawValue, slice.Data()) value := prefixes.ClaimTakeoverValueUnpack(rawValue) @@ -686,16 +515,13 @@ func FsGetClaimByHash(db *ReadOnlyDBColumnFamily, claimHash []byte) (*ResolveRes if err != nil { return nil, err } + activation, err := GetActivation(db, claim.TxNum, claim.Position) if err != nil { return nil, err } - /* - return self._prepare_resolve_result( - claim.tx_num, claim.position, claim_hash, claim.name, claim.root_tx_num, claim.root_position, - activation, claim.channel_signature_is_valid - ) - */ + + log.Printf("%#v\n%#v\n%#v\n", claim, hex.EncodeToString(claimHash), activation) return PrepareResolveResult( db, claim.TxNum, @@ -709,6 +535,24 @@ func FsGetClaimByHash(db *ReadOnlyDBColumnFamily, claimHash []byte) (*ResolveRes ) } +func GetDBState(db *ReadOnlyDBColumnFamily) (*prefixes.DBStateValue, error) { + prefix := []byte{prefixes.DBState} + key := prefixes.NewDBStateKey() + handle := db.Handles[string(prefix)] + rawKey := key.PackKey() + slice, err := db.DB.GetCF(db.Opts, handle, rawKey) + if err != nil { + return nil, err + } else if slice.Size() == 0 { + return nil, nil + } + + rawValue := make([]byte, len(slice.Data())) + copy(rawValue, slice.Data()) + value := prefixes.DBStateValueUnpack(rawValue) + return value, nil +} + func ClaimShortIdIter(db *ReadOnlyDBColumnFamily, normalizedName string, claimId string) <-chan *prefixes.PrefixRowKV { prefix := []byte{prefixes.ClaimShortIdPrefix} handle := db.Handles[string(prefix)] @@ -726,14 +570,6 @@ func ClaimShortIdIter(db *ReadOnlyDBColumnFamily, normalizedName string, claimId } func GetCachedClaimHash(db *ReadOnlyDBColumnFamily, txNum uint32, position uint16) (*prefixes.TXOToClaimValue, error) { - /* - if self._cache_all_claim_txos: - if tx_num not in self.txo_to_claim: - return - return self.txo_to_claim[tx_num].get(position, None) - v = self.prefix_db.txo_to_claim.get_pending(tx_num, position) - return None if not v else v.claim_hash - */ // TODO: implement cache key := prefixes.NewTXOToClaimKey(txNum, position) cfName := string(prefixes.TXOToClaim) @@ -742,136 +578,16 @@ func GetCachedClaimHash(db *ReadOnlyDBColumnFamily, txNum uint32, position uint1 slice, err := db.DB.GetCF(db.Opts, handle, rawKey) if err != nil { return nil, err + } else if slice.Size() == 0 { + return nil, nil } + rawValue := make([]byte, len(slice.Data())) copy(rawValue, slice.Data()) value := prefixes.TXOToClaimValueUnpack(rawValue) return value, nil } -func ResolveParsedUrl(db *ReadOnlyDBColumnFamily, parsed *PathSegment) (*ResolveResult, error) { - normalizedName := util.NormalizeName(parsed.name) - if (parsed.amountOrder == -1 && parsed.claimId == "") || parsed.amountOrder == 1 { - controlling, err := GetControllingClaim(db, normalizedName) - if err != nil { - return nil, err - } - if controlling == nil { - return nil, nil - } - return FsGetClaimByHash(db, controlling.ClaimHash) - } - - var amountOrder int = int(math.Max(float64(parsed.amountOrder), 1)) - - log.Println("amountOrder:", amountOrder) - - if parsed.claimId != "" { - if len(parsed.claimId) == 40 { - claimHash, err := hex.DecodeString(parsed.claimId) - if err != nil { - return nil, err - } - // Maybe don't use caching version, when I actually implement the cache - claimTxo, err := GetCachedClaimTxo(db, claimHash) - if err != nil { - return nil, err - } - if claimTxo == nil || claimTxo.NormalizedName() != normalizedName { - return nil, nil - } - activation, err := GetActivation(db, claimTxo.TxNum, claimTxo.Position) - if err != nil { - return nil, err - } - return PrepareResolveResult( - db, - claimTxo.TxNum, - claimTxo.Position, - claimHash, - claimTxo.Name, - claimTxo.RootTxNum, - claimTxo.RootPosition, - activation, - claimTxo.ChannelSignatureIsValid, - ) - } - log.Println("nomalizedName:", normalizedName) - log.Println("claimId:", parsed.claimId) - var j int = 10 - if len(parsed.claimId) < j { - j = len(parsed.claimId) - } - ch := ClaimShortIdIter(db, normalizedName, parsed.claimId[:j]) - log.Printf("ch: %#v\n", ch) - row := <-ch - key := row.Key.(*prefixes.ClaimShortIDKey) - claimTxo := row.Value.(*prefixes.ClaimShortIDValue) - log.Println("key:", key) - log.Println("claimTxo:", claimTxo) - fullClaimHash, err := GetCachedClaimHash(db, claimTxo.TxNum, claimTxo.Position) - log.Println("fullClaimHash:", fullClaimHash) - if err != nil { - return nil, err - } - c, err := GetCachedClaimTxo(db, fullClaimHash.ClaimHash) - log.Println("c:", c) - if err != nil { - return nil, err - } - nonNormalizedName := c.Name - signatureIsValid := c.ChannelSignatureIsValid - activation, err := GetActivation(db, claimTxo.TxNum, claimTxo.Position) - if err != nil { - return nil, err - } - return PrepareResolveResult( - db, - claimTxo.TxNum, - claimTxo.Position, - fullClaimHash.ClaimHash, - nonNormalizedName, - key.RootTxNum, - key.RootPosition, - activation, - signatureIsValid, - ) - } - - return nil, nil -} - -func ResolveClaimInChannel(db *ReadOnlyDBColumnFamily, claimHash []byte, normalizedName string) (*ResolveResult, error) { - return nil, nil -} - -/* -TODO: Implement blocking / filtering loading - async def reload_blocking_filtering_streams(self): - def reload(): - self.blocked_streams, self.blocked_channels = self.get_streams_and_channels_reposted_by_channel_hashes( - self.blocking_channel_hashes - ) - self.filtered_streams, self.filtered_channels = self.get_streams_and_channels_reposted_by_channel_hashes( - self.filtering_channel_hashes - ) - await asyncio.get_event_loop().run_in_executor(self._executor, reload) - - def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: Set[bytes]): - streams, channels = {}, {} - for reposter_channel_hash in reposter_channel_hashes: - for stream in self.prefix_db.channel_to_claim.iterate((reposter_channel_hash, ), include_key=False): - repost = self.get_repost(stream.claim_hash) - if repost: - txo = self.get_claim_txo(repost) - if txo: - if txo.normalized_name.startswith('@'): - channels[repost] = reposter_channel_hash - else: - streams[repost] = reposter_channel_hash - return streams, channels -*/ - // GetBlockerHash get the hash of the blocker or filterer of the claim. // TODO: this currently converts the byte arrays to strings, which is not // very efficient. Might want to figure out a better way to do this. @@ -914,6 +630,269 @@ func GetBlockerHash(db *ReadOnlyDBColumnFamily, claimHash, repostedClaimHash, ch return blockedHash, filteredHash, nil } +// +// Resolve functions +// + +// PrepareResolveResult prepares a ResolveResult to return +func PrepareResolveResult( + db *ReadOnlyDBColumnFamily, + txNum uint32, + position uint16, + claimHash []byte, + name string, + rootTxNum uint32, + rootPosition uint16, + activationHeight uint32, + signatureValid bool) (*ResolveResult, error) { + + normalizedName := util.NormalizeName(name) + controllingClaim, err := GetControllingClaim(db, normalizedName) + if err != nil { + return nil, err + } + + txHash, err := GetTxHash(db, txNum) + if err != nil { + return nil, err + } + + height := BisectRight(db.TxCounts, txNum) + createdHeight := BisectRight(db.TxCounts, rootTxNum) + lastTakeoverHeight := controllingClaim.Height + + expirationHeight := GetExpirationHeight(height) + + supportAmount, err := GetSupportAmount(db, claimHash) + if err != nil { + return nil, err + } + + claimToTxo, err := GetCachedClaimTxo(db, claimHash) + if err != nil { + return nil, err + } + claimAmount := claimToTxo.Amount + + effectiveAmount, err := GetEffectiveAmount(db, claimHash, false) + if err != nil { + return nil, err + } + + channelHash, err := GetChannelForClaim(db, claimHash, txNum, position) + if err != nil { + return nil, err + } + + repostedClaimHash, err := GetRepost(db, claimHash) + if err != nil { + return nil, err + } + + shortUrl, err := GetShortClaimIdUrl(db, name, normalizedName, claimHash, txNum, rootPosition) + if err != nil { + return nil, err + } + + var canonicalUrl string = shortUrl + claimsInChannel, err := GetClaimsInChannelCount(db, claimHash) + if err != nil { + return nil, err + } + + if channelHash != nil { + // Ignore error because we already have this set if this doesn't work + channelVals, _ := GetCachedClaimTxo(db, channelHash) + log.Printf("channelVals: %#v\n", channelVals) + if channelVals != nil { + channelShortUrl, _ := GetShortClaimIdUrl( + db, + channelVals.Name, + channelVals.NormalizedName(), + channelHash, channelVals.RootTxNum, + channelVals.RootPosition, + ) + canonicalUrl = fmt.Sprintf("%s/%s", channelShortUrl, shortUrl) + } + } + + reposted, err := GetRepostedCount(db, claimHash) + if err != nil { + return nil, err + } + + isControlling := bytes.Equal(controllingClaim.ClaimHash, claimHash) + + return &ResolveResult{ + Name: name, + NormalizedName: normalizedName, + ClaimHash: claimHash, + TxNum: txNum, + Position: position, + TxHash: txHash, + Height: height, + Amount: claimAmount, + ShortUrl: shortUrl, + IsControlling: isControlling, + CanonicalUrl: canonicalUrl, + CreationHeight: createdHeight, + ActivationHeight: activationHeight, + ExpirationHeight: expirationHeight, + EffectiveAmount: effectiveAmount, + SupportAmount: supportAmount, + Reposted: reposted, + LastTakeoverHeight: lastTakeoverHeight, + ClaimsInChannel: claimsInChannel, + ChannelHash: channelHash, + RepostedClaimHash: repostedClaimHash, + SignatureValid: signatureValid, + }, nil +} + +func ResolveParsedUrl(db *ReadOnlyDBColumnFamily, parsed *PathSegment) (*ResolveResult, error) { + normalizedName := util.NormalizeName(parsed.name) + if (parsed.amountOrder == -1 && parsed.claimId == "") || parsed.amountOrder == 1 { + controlling, err := GetControllingClaim(db, normalizedName) + if err != nil { + return nil, err + } + if controlling == nil { + return nil, nil + } + return FsGetClaimByHash(db, controlling.ClaimHash) + } + + var amountOrder int = int(math.Max(float64(parsed.amountOrder), 1)) + + log.Println("amountOrder:", amountOrder) + + if parsed.claimId != "" { + if len(parsed.claimId) == 40 { + claimHash, err := hex.DecodeString(parsed.claimId) + if err != nil { + return nil, err + } + + // Maybe don't use caching version, when I actually implement the cache + claimTxo, err := GetCachedClaimTxo(db, claimHash) + if err != nil { + return nil, err + } + + if claimTxo == nil || claimTxo.NormalizedName() != normalizedName { + return nil, nil + } + + activation, err := GetActivation(db, claimTxo.TxNum, claimTxo.Position) + if err != nil { + return nil, err + } + + return PrepareResolveResult( + db, + claimTxo.TxNum, + claimTxo.Position, + claimHash, + claimTxo.Name, + claimTxo.RootTxNum, + claimTxo.RootPosition, + activation, + claimTxo.ChannelSignatureIsValid, + ) + } + log.Println("nomalizedName:", normalizedName) + log.Println("claimId:", parsed.claimId) + var j int = 10 + if len(parsed.claimId) < j { + j = len(parsed.claimId) + } + + ch := ClaimShortIdIter(db, normalizedName, parsed.claimId[:j]) + row := <-ch + key := row.Key.(*prefixes.ClaimShortIDKey) + claimTxo := row.Value.(*prefixes.ClaimShortIDValue) + + fullClaimHash, err := GetCachedClaimHash(db, claimTxo.TxNum, claimTxo.Position) + if err != nil { + return nil, err + } + + c, err := GetCachedClaimTxo(db, fullClaimHash.ClaimHash) + if err != nil { + return nil, err + } + + nonNormalizedName := c.Name + signatureIsValid := c.ChannelSignatureIsValid + activation, err := GetActivation(db, claimTxo.TxNum, claimTxo.Position) + + if err != nil { + return nil, err + } + + return PrepareResolveResult( + db, + claimTxo.TxNum, + claimTxo.Position, + fullClaimHash.ClaimHash, + nonNormalizedName, + key.RootTxNum, + key.RootPosition, + activation, + signatureIsValid, + ) + } + + return nil, nil +} + +func ResolveClaimInChannel(db *ReadOnlyDBColumnFamily, channelHash []byte, normalizedName string) (*ResolveResult, error) { + prefix := []byte{prefixes.ChannelToClaim} + handle := db.Handles[string(prefix)] + key := prefixes.NewChannelToClaimKey(channelHash, normalizedName) + rawKeyPrefix := prefixes.ChannelToClaimKeyPackPartial(key, 2) + options := NewIterateOptions().WithCfHandle(handle).WithPrefix(rawKeyPrefix) + options = options.WithIncludeValue(true) //.WithIncludeStop(true) + ch := IterCF(db.DB, options) + // TODO: what's a good default size for this? + var candidates []*ResolveResult = make([]*ResolveResult, 0, 100) + var i = 0 + for row := range ch { + key := row.Key.(*prefixes.ChannelToClaimKey) + stream := row.Value.(*prefixes.ChannelToClaimValue) + effectiveAmount, err := GetEffectiveAmount(db, stream.ClaimHash, false) + if err != nil { + return nil, err + } + if i == 0 || candidates[i-1].Amount == effectiveAmount { + candidates = append( + candidates, + &ResolveResult{ + TxNum: key.TxNum, + Position: key.Position, + ClaimHash: stream.ClaimHash, + Amount: effectiveAmount, + ChannelHash: channelHash, + NormalizedName: normalizedName, + }, + ) + i++ + } else { + break + } + } + log.Printf("candidates: %#v\n", candidates) + if len(candidates) == 0 { + return nil, nil + } else { + // return list(sorted(candidates, key=lambda item: item[1]))[0] + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].Amount < candidates[j].Amount + }) + return candidates[0], nil + } +} + func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult { var res = &ExpandedResolveResult{ Stream: nil, @@ -983,11 +962,13 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult { log.Printf("resolvedChannel.TxHash: %s\n", hex.EncodeToString(resolvedChannel.TxHash)) log.Printf("resolvedChannel.ClaimHash: %s\n", hex.EncodeToString(resolvedChannel.ClaimHash)) log.Printf("resolvedChannel.ChannelHash: %s\n", hex.EncodeToString(resolvedChannel.ChannelHash)) - // s := - // log.Printf("resolvedChannel.TxHash: %s\n", hex.EncodeToString(sort.IntSlice(resolvedChannel.TxHash))) + log.Printf("stream %#v\n", stream) if stream != nil { if resolvedChannel != nil { streamClaim, err := ResolveClaimInChannel(db, resolvedChannel.ClaimHash, stream.Normalized()) + log.Printf("streamClaim %#v\n", streamClaim) + log.Printf("streamClaim.ClaimHash: %s\n", hex.EncodeToString(streamClaim.ClaimHash)) + log.Printf("streamClaim.ChannelHash: %s\n", hex.EncodeToString(streamClaim.ChannelHash)) // TODO: Confirm error case if err != nil { res.Stream = &optionalResolveResultOrError{ @@ -1033,26 +1014,10 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult { } } - /* - repost = None - reposted_channel = None - if resolved_stream or resolved_channel: - claim_hash = resolved_stream.claim_hash if resolved_stream else resolved_channel.claim_hash - claim = resolved_stream if resolved_stream else resolved_channel - reposted_claim_hash = resolved_stream.reposted_claim_hash if resolved_stream else None - - if blocker_hash: - reason_row = self._fs_get_claim_by_hash(blocker_hash) - return ExpandedResolveResult( - None, ResolveCensoredError(url, blocker_hash, censor_row=reason_row), None, None - ) - if claim.reposted_claim_hash: - repost = self._fs_get_claim_by_hash(claim.reposted_claim_hash) - if repost and repost.channel_hash and repost.signature_valid: - reposted_channel = self._fs_get_claim_by_hash(repost.channel_hash) - */ + // Getting blockers and filters var repost *ResolveResult = nil var repostedChannel *ResolveResult = nil + log.Printf("about to get blockers and filters: %#v, %#v\n", resolvedChannel, resolvedStream) if resolvedStream != nil || resolvedChannel != nil { var claim *ResolveResult = nil @@ -1068,6 +1033,7 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult { claimHash = resolvedChannel.ClaimHash } blockerHash, _, err = GetBlockerHash(db, claimHash, respostedClaimHash, claim.ChannelHash) + log.Printf("blockerHash: %s\n", hex.EncodeToString(blockerHash)) if err != nil { res.Channel = &optionalResolveResultOrError{ err: &ResolveError{err}, @@ -1124,6 +1090,29 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult { return res } +// +// Iterators / db construction functions +// + +func (o *IterOptions) StopIteration(key []byte) bool { + if key == nil { + return false + } + + maxLen := int(math.Min(float64(len(key)), float64(len(o.Stop)))) + if o.Stop != nil && + (bytes.HasPrefix(key, o.Stop) || bytes.Compare(o.Stop, key[:maxLen]) < 0) { + return true + } else if o.Start != nil && + bytes.Compare(o.Start, key[:len(o.Start)]) > 0 { + return true + } else if o.Prefix != nil && !bytes.HasPrefix(key, o.Prefix) { + return true + } + + return false +} + func (opts *IterOptions) ReadRow(ch chan *prefixes.PrefixRowKV, prevKey *[]byte) bool { it := opts.It key := it.Key() @@ -1321,6 +1310,8 @@ func GetDBColumnFamlies(name string, cfNames []string) (*ReadOnlyDBColumnFamily, BlockedChannels: make(map[string][]byte), FilteredStreams: make(map[string][]byte), FilteredChannels: make(map[string][]byte), + TxCounts: nil, + Height: 0, } err = InitTxCounts(myDB) @@ -1331,6 +1322,17 @@ func GetDBColumnFamlies(name string, cfNames []string) (*ReadOnlyDBColumnFamily, return myDB, nil } +// DetectChanges keep the rocksdb db in sync +func DetectChanges(db *ReadOnlyDBColumnFamily) error { + err := db.DB.TryCatchUpWithPrimary() + if err != nil { + log.Printf("error trying to catch up with primary: %#v", err) + return err + } + + return nil +} + func InitTxCounts(db *ReadOnlyDBColumnFamily) error { start := time.Now() handle, ok := db.Handles[string([]byte{prefixes.TxCount})] @@ -1338,8 +1340,8 @@ func InitTxCounts(db *ReadOnlyDBColumnFamily) error { return fmt.Errorf("TxCount prefix not found") } - //txCounts := make([]uint32, 1200000) - txCounts := make([]uint32, 100000) + //TODO: figure out a reasonable default and make it a constant + txCounts := make([]uint32, 0, 1200000) options := NewIterateOptions().WithPrefix([]byte{prefixes.TxCount}).WithCfHandle(handle) options = options.WithIncludeKey(false).WithIncludeValue(true) @@ -1347,7 +1349,6 @@ func InitTxCounts(db *ReadOnlyDBColumnFamily) error { ch := IterCF(db.DB, options) for txCount := range ch { - //log.Println(txCount) txCounts = append(txCounts, txCount.Value.(*prefixes.TxCountValue).TxCount) } @@ -1356,6 +1357,7 @@ func InitTxCounts(db *ReadOnlyDBColumnFamily) error { log.Println("Time to get txCounts:", duration) db.TxCounts = txCounts + db.Height = uint32(len(txCounts)) return nil } diff --git a/db/db_test.go b/db/db_test.go index 806c1e1..b4c15f4 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -209,6 +209,7 @@ func CatCSV(filePath string) { } func TestCatFullDB(t *testing.T) { + t.Skip("Skipping full db test") // url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c" // "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1" // url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" @@ -248,12 +249,13 @@ func TestCatFullDB(t *testing.T) { // TestOpenFullDB Tests running a resolve on a full db. func TestOpenFullDB(t *testing.T) { + t.Skip("Skipping full db test") // url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c" // "lbry://@lbry", "lbry://@lbry#3", "lbry://@lbry3f", "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a", "lbry://@lbry:1", "lbry://@lbry$1" - // url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" + url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9" // url := "lbry://@lbry" // url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a" - url := "lbry://@lbry$1" + // url := "lbry://@lbry$1" dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" prefixNames := prefixes.GetPrefixes() cfNames := []string{"default", "e", "d", "c"} @@ -291,6 +293,49 @@ func TestResolve(t *testing.T) { log.Println(expandedResolveResult) } +// TestGetDBState Tests reading the db state from rocksdb +func TestGetDBState(t *testing.T) { + filePath := "../testdata/s_resolve.csv" + want := uint32(1072108) + db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + if err != nil { + t.Error(err) + } + defer toDefer() + state, err := dbpkg.GetDBState(db) + if err != nil { + t.Error(err) + } + log.Printf("state: %#v\n", state) + if state.Height != want { + t.Errorf("Expected %d, got %d", want, state.Height) + } +} + +// TestPrintChannelCount Utility function to cat the ClaimShortId csv +func TestPrintChannelCount(t *testing.T) { + filePath := "../testdata/Z_resolve.csv" + CatCSV(filePath) +} + +func TestGetClaimsInChannelCount(t *testing.T) { + channelHash, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bd") + filePath := "../testdata/Z_resolve.csv" + want := uint32(3670) + db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + if err != nil { + t.Error(err) + } + defer toDefer() + count, err := dbpkg.GetClaimsInChannelCount(db, channelHash) + if err != nil { + t.Error(err) + } + if count != want { + t.Errorf("Expected %d, got %d", want, count) + } +} + // TestPrintClaimShortId Utility function to cat the ClaimShortId csv func TestPrintClaimShortId(t *testing.T) { filePath := "../testdata/F_cat.csv" diff --git a/db/prefixes/prefixes.go b/db/prefixes/prefixes.go index a4f99ed..16a3093 100644 --- a/db/prefixes/prefixes.go +++ b/db/prefixes/prefixes.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/extras/util" ) const ( @@ -136,6 +137,12 @@ type DBStateValue struct { EsSyncHeight uint32 } +func NewDBStateKey() *DBStateKey { + return &DBStateKey{ + Prefix: []byte{DBState}, + } +} + func (k *DBStateKey) PackKey() []byte { prefixLen := 1 n := prefixLen @@ -1345,7 +1352,7 @@ func NewClaimToTXOKey(claimHash []byte) *ClaimToTXOKey { func (v *ClaimToTXOValue) NormalizedName() string { //TODO implement? Might not need to do anything. - return v.Name + return util.NormalizeName(v.Name) } func (k *ClaimToTXOKey) PackKey() []byte { @@ -1875,6 +1882,14 @@ type ChannelToClaimValue struct { ClaimHash []byte `json:"claim_hash"` } +func NewChannelToClaimKey(channelHash []byte, normalizedName string) *ChannelToClaimKey { + return &ChannelToClaimKey{ + Prefix: []byte{ChannelToClaim}, + SigningHash: channelHash, + Name: normalizedName, + } +} + func (k *ChannelToClaimKey) PackKey() []byte { prefixLen := 1 nameLen := len(k.Name) @@ -1996,6 +2011,13 @@ type ChannelCountValue struct { Count uint32 `json:"count"` } +func NewChannelCountKey(channelHash []byte) *ChannelCountKey { + return &ChannelCountKey{ + Prefix: []byte{ChannelCount}, + ChannelHash: channelHash, + } +} + func (k *ChannelCountKey) PackKey() []byte { prefixLen := 1 // b'>20sLH' @@ -3225,6 +3247,13 @@ type RepostValue struct { RepostedClaimHash []byte `json:"reposted_claim_hash"` } +func NewRepostKey(claimHash []byte) *RepostKey { + return &RepostKey{ + Prefix: []byte{Repost}, + ClaimHash: claimHash, + } +} + func (k *RepostKey) PackKey() []byte { prefixLen := 1 // b'>20s' @@ -3334,6 +3363,13 @@ type RepostedValue struct { ClaimHash []byte `json:"claim_hash"` } +func NewRepostedKey(claimHash []byte) *RepostedKey { + return &RepostedKey{ + Prefix: []byte{RepostedClaim}, + RepostedClaimHash: claimHash, + } +} + func (k *RepostedKey) PackKey() []byte { prefixLen := 1 // b'>20sLH' diff --git a/main.go b/main.go index 4fd3063..bf3c379 100644 --- a/main.go +++ b/main.go @@ -110,17 +110,16 @@ func main() { return } else if args.CmdType == server.DBCmd3 { - var rawPrefix byte = prefixes.TXOToClaim + // channelHash, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bd") + var rawPrefix byte = prefixes.DBState prefix := []byte{rawPrefix} columnFamily := string(prefix) - // start := &prefixes.ClaimShortIDKey{ - // Prefix: []byte{prefixes.ClaimShortIdPrefix}, - // NormalizedName: "cat", - // PartialClaimId: "", - // RootTxNum: 0, - // RootPosition: 0, + // start := &prefixes.ChannelCountKey{ + // Prefix: prefix, + // ChannelHash: channelHash, // } - // startRaw := prefixes.ClaimShortIDKeyPackPartial(start, 1) + // startRaw := prefixes.ChannelCountKeyPackPartial(start, 1) + // startRaw := start.PackKey() options := &db.IterOptions{ FillCache: false, Prefix: prefix, @@ -141,7 +140,7 @@ func main() { options.CfHandle = handles[1] - db.ReadWriteRawNColumnFamilies(dbVal, options, fmt.Sprintf("./testdata/%s_2.csv", columnFamily), 10) + db.ReadWriteRawNColumnFamilies(dbVal, options, fmt.Sprintf("./testdata/%s_resolve.csv", columnFamily), 1) return } diff --git a/testdata/Z_resolve.csv b/testdata/Z_resolve.csv new file mode 100644 index 0000000..d82960f --- /dev/null +++ b/testdata/Z_resolve.csv @@ -0,0 +1,2 @@ +Z,, +Z,5a2556ed1cab9d17f2a9392030a9ad7f5d138f11bd,00000e56 diff --git a/testdata/s_resolve.csv b/testdata/s_resolve.csv new file mode 100644 index 0000000..936f795 --- /dev/null +++ b/testdata/s_resolve.csv @@ -0,0 +1,2 @@ +s,, +s,73,9c89283ba0f3227f6c03b70216b9f665f0118d5e0fa729cedf4fb34d6a34f46300105bec03f782718ccd27260ce980e7d3d0b5c5f7be1517027b68104109128a34d1cc562f32008e00105bef0014f734000700105befffffffffffffffff00105bec