diff --git a/db/db.go b/db/db.go index 24e2419..b34316a 100644 --- a/db/db.go +++ b/db/db.go @@ -7,27 +7,37 @@ import ( "log" "math" "os" + "strings" + "time" "github.com/lbryio/hub/db/prefixes" "github.com/lbryio/lbry.go/v2/extras/util" + lbryurl "github.com/lbryio/lbry.go/v2/url" "github.com/linxGnu/grocksdb" ) +type ReadOnlyDBColumnFamily struct { + DB *grocksdb.DB + Handles map[string]*grocksdb.ColumnFamilyHandle + Opts *grocksdb.ReadOptions + TxCounts []uint32 +} + type ResolveResult struct { Name string NormalizedName string ClaimHash []byte - TxNum int - Position int + TxNum uint32 + Position uint16 TxHash []byte - Height int + Height uint32 Amount int ShortUrl string IsControlling bool CanonicalUrl string - CreationHeight int - ActivationHeight int - ExpirationHeight int + CreationHeight uint32 + ActivationHeight uint32 + ExpirationHeight uint32 EffectiveAmount int SupportAmount int Reposted int @@ -217,7 +227,514 @@ func ParseURL(url string) (parsed *URL, err error) { return NewURL(), nil } -func Resolve(db *grocksdb.DB, url string) *ExpandedResolveResult { +func PrepareResolveResult( + db *ReadOnlyDBColumnFamily, + txNum uint32, + position uint16, + claimHash []byte, + name string, + rootTxNum uint32, + rootPosition uint16, + activationHeight uint32, + signatureValid bool) (*ResolveResult, error) { + + normalizedName := util.NormalizeName(name) + controllingClaim, err := GetControllingClaim(db, normalizedName) + if err != nil { + return nil, err + } + /* + tx_hash = self.get_tx_hash(tx_num) + */ + txHash, err := GetTxHash(db, txNum) + if err != nil { + return nil, err + } + /* + height = bisect_right(self.tx_counts, tx_num) + created_height = bisect_right(self.tx_counts, root_tx_num) + last_take_over_height = controlling_claim.height + */ + height := txNum + createdHeight := rootTxNum + lastTakeoverHeight := controllingClaim.Height + + /* + expiration_height = self.coin.get_expiration_height(height) + support_amount = self.get_support_amount(claim_hash) + claim_amount = self.get_cached_claim_txo(claim_hash).amount + */ + //expirationHeight := activationHeight + int(controllingClaim.Expiration) + expirationHeight := activationHeight + height // FIXME + supportAmount, err := GetSupportAmount(db, claimHash) + if err != nil { + return nil, err + } + claimToTxo, err := GetCachedClaimTxo(db, claimHash) + if err != nil { + return nil, err + } + claimAmount := claimToTxo.Amount + + /* + effective_amount = self.get_effective_amount(claim_hash) + channel_hash = self.get_channel_for_claim(claim_hash, tx_num, position) + reposted_claim_hash = self.get_repost(claim_hash) + short_url = self.get_short_claim_id_url(name, normalized_name, claim_hash, root_tx_num, root_position) + canonical_url = short_url + claims_in_channel = self.get_claims_in_channel_count(claim_hash) + + */ + effectiveAmount, err := GetEffectiveAmount(db, claimHash, false) + if err != nil { + return nil, err + } + channelHash, err := GetChannelForClaim(db, claimHash, txNum, position) + if err != nil { + return nil, err + } + repostedClaimHash, err := GetRepost(db, claimHash) + if err != nil { + return nil, err + } + shortUrl, err := GetShortClaimIdUrl(db, name, normalizedName, claimHash, txNum, rootPosition) + if err != nil { + return nil, err + } + var canonicalUrl string = shortUrl + claimsInChannel, err := GetClaimsInChannelCount(db, claimHash) + if err != nil { + return nil, err + } + /* + if channel_hash: + channel_vals = self.get_cached_claim_txo(channel_hash) + if channel_vals: + channel_short_url = self.get_short_claim_id_url( + channel_vals.name, channel_vals.normalized_name, channel_hash, channel_vals.root_tx_num, + channel_vals.root_position + ) + canonical_url = f'{channel_short_url}/{short_url}' + + */ + if channelHash != nil { + //FIXME + // Ignore error because we already have this set if this doesn't work + channelVals, _ := GetCachedClaimTxo(db, channelHash) + if channelVals != nil { + channelShortUrl, _ := GetShortClaimIdUrl(db, channelVals.Name, channelVals.NormalizedName(), channelHash, channelVals.RootTxNum, channelVals.RootPosition) + canonicalUrl = fmt.Sprintf("%s/%s", channelShortUrl, shortUrl) + } + } + reposted, err := GetRepostedCount(db, claimHash) + if err != nil { + return nil, err + } + return &ResolveResult{ + Name: name, + NormalizedName: normalizedName, + ClaimHash: claimHash, + TxNum: txNum, + Position: position, + TxHash: txHash, + Height: height, + Amount: int(claimAmount), + ShortUrl: shortUrl, + IsControlling: bytes.Equal(controllingClaim.ClaimHash, claimHash), + CanonicalUrl: canonicalUrl, + CreationHeight: createdHeight, + ActivationHeight: activationHeight, + ExpirationHeight: expirationHeight, + EffectiveAmount: int(effectiveAmount), + SupportAmount: int(supportAmount), + Reposted: reposted, + LastTakeoverHeight: int(lastTakeoverHeight), + ClaimsInChannel: claimsInChannel, + ChannelHash: channelHash, + RepostedClaimHash: repostedClaimHash, + SignatureValid: signatureValid, + }, nil + /* + + return ResolveResult( + name, normalized_name, claim_hash, tx_num, position, tx_hash, height, claim_amount, short_url=short_url, + is_controlling=controlling_claim.claim_hash == claim_hash, canonical_url=canonical_url, + last_takeover_height=last_take_over_height, claims_in_channel=claims_in_channel, + creation_height=created_height, activation_height=activation_height, + expiration_height=expiration_height, effective_amount=effective_amount, support_amount=support_amount, + channel_hash=channel_hash, reposted_claim_hash=reposted_claim_hash, + reposted=self.get_reposted_count(claim_hash), + signature_valid=None if not channel_hash else signature_valid + ) + + + */ + // return nil, nil +} + +func GetClaimsInChannelCount(db *ReadOnlyDBColumnFamily, channelHash []byte) (int, error) { + /* + def get_claims_in_channel_count(self, channel_hash) -> int: + channel_count_val = self.prefix_db.channel_count.get(channel_hash) + if channel_count_val is None: + return 0 + return channel_count_val.count + */ + return 0, nil +} + +func GetShortClaimIdUrl(db *ReadOnlyDBColumnFamily, name string, normalizedName string, claimHash []byte, rootTxNum uint32, rootPosition uint16) (string, error) { + /* + def get_short_claim_id_url(self, name: str, normalized_name: str, claim_hash: bytes, + root_tx_num: int, root_position: int) -> str: + claim_id = claim_hash.hex() + for prefix_len in range(10): + for k in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:prefix_len+1]), + include_value=False): + if k.root_tx_num == root_tx_num and k.root_position == root_position: + return f'{name}#{k.partial_claim_id}' + break + print(f"{claim_id} has a collision") + return f'{name}#{claim_id}' + */ + // prefix := []byte{prefixes.ClaimShortIdPrefix} + // cfName := string(prefix) + // handle := db.Handles[cfName] + // claimId := hex.EncodeToString(claimHash) + // for prefixLen := 0; prefixLen < 10; prefixLen++ { + // // Prefix and handle + // options := NewIterateOptions().WithPrefix(prefix).WithCfHandle(handle) + // // Start and stop bounds + // options = options.WithStart(startKeyRaw).WithStop(endKeyRaw) + // // Don't include the key + // options = options.WithIncludeKey(false) + + // IterCF(db.DB) + // // for k, _ := range db.ClaimShortId.Iterate(prefix=(normalizedName, claimId[:prefixLen+1]), includeValue=false) { + // // if k.RootTxNum == rootTxNum && k.RootPosition == rootPosition { + // // return fmt.Sprintf("%s#%s", name, k.PartialClaimId), nil + // // } + // // break + // // } + // } + return "", nil +} + +func GetRepost(db *ReadOnlyDBColumnFamily, claimHash []byte) ([]byte, error) { + /* + def get_repost(self, claim_hash) -> Optional[bytes]: + repost = self.prefix_db.repost.get(claim_hash) + if repost: + return repost.reposted_claim_hash + return + */ + return nil, nil +} + +func GetRepostedCount(db *ReadOnlyDBColumnFamily, claimHash []byte) (int, error) { + /* + def get_reposted_count(self, claim_hash: bytes) -> int: + return sum( + 1 for _ in self.prefix_db.reposted_claim.iterate(prefix=(claim_hash,), include_value=False) + ) + */ + return 0, nil +} + +func GetChannelForClaim(db *ReadOnlyDBColumnFamily, claimHash []byte, txNum uint32, position uint16) ([]byte, error) { + /* + def get_channel_for_claim(self, claim_hash, tx_num, position) -> Optional[bytes]: + v = self.prefix_db.claim_to_channel.get(claim_hash, tx_num, position) + if v: + return v.signing_hash + */ + key := prefixes.NewClaimToChannelKey(claimHash, txNum, position) + cfName := string(prefixes.ClaimToChannel) + handle := db.Handles[cfName] + rawKey := key.PackKey() + slice, err := db.DB.GetCF(db.Opts, handle, rawKey) + if err != nil { + return nil, err + } else if slice == nil { + return nil, nil + } + rawValue := make([]byte, len(slice.Data())) + copy(rawValue, slice.Data()) + value := prefixes.ClaimToChannelValueUnpack(rawValue) + return value.SigningHash, nil +} + +func GetActiveAmount(db *ReadOnlyDBColumnFamily, claimHash []byte, txoType uint8, height uint32) (uint64, error) { + cfName := string(prefixes.ActiveAmount) + handle := db.Handles[cfName] + startKey := &prefixes.ActiveAmountKey{ + Prefix: []byte{prefixes.ActiveAmount}, + ClaimHash: claimHash, + TxoType: txoType, + ActivationHeight: 0, + } + endKey := &prefixes.ActiveAmountKey{ + Prefix: []byte{prefixes.ActiveAmount}, + ClaimHash: claimHash, + TxoType: txoType, + ActivationHeight: height, + } + startKeyRaw := prefixes.ActiveAmountKeyPackPartial(startKey, 3) + endKeyRaw := prefixes.ActiveAmountKeyPackPartial(endKey, 3) + // Prefix and handle + options := NewIterateOptions().WithPrefix([]byte{prefixes.ActiveAmount}).WithCfHandle(handle) + // Start and stop bounds + options = options.WithStart(startKeyRaw).WithStop(endKeyRaw) + // Don't include the key + options = options.WithIncludeKey(false).WithIncludeValue(true) + + ch := IterCF(db.DB, options) + var sum uint64 = 0 + for kv := range ch { + sum += kv.Value.(*prefixes.ActiveAmountValue).Amount + } + + return sum, nil +} + +func GetEffectiveAmount(db *ReadOnlyDBColumnFamily, claimHash []byte, supportOnly bool) (uint64, error) { + /* + def get_effective_amount(self, claim_hash: bytes, support_only=False) -> int: + support_amount = self._get_active_amount(claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, self.db_height + 1) + if support_only: + return support_only + return support_amount + self._get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, self.db_height + 1) + */ + // key := prefixes.NewSupportAmountKey(claimHash) + // cfName := string(prefixes.ActiveAmount) + // handle := db.Handles[cfName] + // rawKey := key.PackKey() + // slice, err := db.DB.GetCF(db.Opts, handle, rawKey) + // if err != nil { + // return 0, err + // } else if slice == nil { + // return 0, nil + // } + // rawValue := make([]byte, len(slice.Data())) + // copy(rawValue, slice.Data()) + // value := prefixes.SupportAmountValueUnpack(rawValue) + return 0, nil +} + +func GetSupportAmount(db *ReadOnlyDBColumnFamily, claimHash []byte) (uint64, error) { + /* + support_amount_val = self.prefix_db.support_amount.get(claim_hash) + if support_amount_val is None: + return 0 + return support_amount_val.amount + */ + key := prefixes.NewSupportAmountKey(claimHash) + cfName := string(prefixes.SupportAmount) + handle := db.Handles[cfName] + rawKey := key.PackKey() + slice, err := db.DB.GetCF(db.Opts, handle, rawKey) + if err != nil { + return 0, err + } else if slice == nil { + return 0, nil + } + rawValue := make([]byte, len(slice.Data())) + copy(rawValue, slice.Data()) + value := prefixes.SupportAmountValueUnpack(rawValue) + return value.Amount, nil +} + +func GetTxHash(db *ReadOnlyDBColumnFamily, txNum uint32) ([]byte, error) { + /* + if self._cache_all_tx_hashes: + return self.total_transactions[tx_num] + return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False) + */ + // TODO: caching + key := prefixes.NewTxHashKey(txNum) + cfName := string(prefixes.TxHash) + handle := db.Handles[cfName] + rawKey := key.PackKey() + slice, err := db.DB.GetCF(db.Opts, handle, rawKey) + if err != nil { + return nil, err + } + rawValue := make([]byte, len(slice.Data())) + copy(rawValue, slice.Data()) + return rawValue, nil +} + +func GetActivation(db *ReadOnlyDBColumnFamily, txNum uint32, postition uint16, isSupport bool) (uint32, error) { + /* + def get_activation(self, tx_num, position, is_support=False) -> int: + activation = self.prefix_db.activated.get( + ACTIVATED_SUPPORT_TXO_TYPE if is_support else ACTIVATED_CLAIM_TXO_TYPE, tx_num, position + ) + if activation: + return activation.height + return -1 + */ + var typ uint8 + if isSupport { + typ = prefixes.ACTIVATED_SUPPORT_TXO_TYPE + } else { + typ = prefixes.ACTIVATED_CLAIM_TXO_TYPE + } + + key := prefixes.NewActivationKey(typ, txNum, postition) + cfName := string(prefixes.ActivatedClaimAndSupport) + handle := db.Handles[cfName] + rawKey := key.PackKey() + slice, err := db.DB.GetCF(db.Opts, handle, rawKey) + if err != nil { + return 0, err + } + rawValue := make([]byte, len(slice.Data())) + copy(rawValue, slice.Data()) + value := prefixes.ActivationValueUnpack(rawValue) + // Does this need to explicitly return an int64, in case the uint32 overflows the max of an int? + return value.Height, nil +} + +func GetCachedClaimTxo(db *ReadOnlyDBColumnFamily, claim []byte) (*prefixes.ClaimToTXOValue, error) { + // TODO: implement cache? + key := prefixes.NewClaimToTXOKey(claim) + cfName := string(prefixes.ClaimToTXO) + handle := db.Handles[cfName] + rawKey := key.PackKey() + slice, err := db.DB.GetCF(db.Opts, handle, rawKey) + if err != nil { + return nil, err + } + rawValue := make([]byte, len(slice.Data())) + copy(rawValue, slice.Data()) + value := prefixes.ClaimToTXOValueUnpack(rawValue) + return value, nil +} + +func GetControllingClaim(db *ReadOnlyDBColumnFamily, name string) (*prefixes.ClaimTakeoverValue, error) { + key := prefixes.NewClaimTakeoverKey(name) + cfName := string(prefixes.ClaimTakeover) + handle := db.Handles[cfName] + rawKey := key.PackKey() + slice, err := db.DB.GetCF(db.Opts, handle, rawKey) + if err != nil { + return nil, err + } + rawValue := make([]byte, len(slice.Data())) + copy(rawValue, slice.Data()) + value := prefixes.ClaimTakeoverValueUnpack(rawValue) + return value, nil +} + +func FsGetClaimByHash(db *ReadOnlyDBColumnFamily, claimHash []byte) (*ResolveResult, error) { + claim, err := GetCachedClaimTxo(db, claimHash) + if err != nil { + return nil, err + } + activation, err := GetActivation(db, claim.TxNum, claim.Position, false) + if err != nil { + return nil, err + } + /* + return self._prepare_resolve_result( + claim.tx_num, claim.position, claim_hash, claim.name, claim.root_tx_num, claim.root_position, + activation, claim.channel_signature_is_valid + ) + */ + //return PrepareResolveResult(db, 0, 0, nil, "", 0, 0, 0, false) + return PrepareResolveResult( + db, + claim.TxNum, + claim.Position, + claimHash, + claim.Name, + claim.RootTxNum, + claim.RootPosition, + activation, + claim.ChannelSignatureIsValid, + ) +} + +func ClaimShortIdIter(db *ReadOnlyDBColumnFamily, normalizedName string, claimId string) <-chan *prefixes.PrefixRowKV { + prefix := []byte{prefixes.ClaimShortIdPrefix} + handle := db.Handles[string(prefix)] + key := prefixes.NewClaimShortIDKey(normalizedName, claimId) + rawKeyPrefix := prefixes.ClaimShortIDKeyPackPartial(key, 2) + options := NewIterateOptions().WithCfHandle(handle).WithPrefix(rawKeyPrefix) + options = options.WithIncludeValue(true) + ch := IterCF(db.DB, options) + return ch +} + +func GetCachedClaimHash(db *ReadOnlyDBColumnFamily, txNum uint32, position uint16) ([]byte, error) { + /* + if self._cache_all_claim_txos: + if tx_num not in self.txo_to_claim: + return + return self.txo_to_claim[tx_num].get(position, None) + v = self.prefix_db.txo_to_claim.get_pending(tx_num, position) + return None if not v else v.claim_hash + */ + return nil, nil +} + +func ResolveParsedUrl(db *ReadOnlyDBColumnFamily, parsed *PathSegment) (*ResolveResult, error) { + normalizedName := util.NormalizeName(parsed.name) + if (parsed.amountOrder == 0 && parsed.claimId == "") || parsed.amountOrder == 1 { + controlling, err := GetControllingClaim(db, normalizedName) + if err != nil { + return nil, err + } + return FsGetClaimByHash(db, controlling.ClaimHash) + } + + var amountOrder int = int(math.Max(float64(parsed.amountOrder), 1)) + + log.Println(amountOrder) + + if parsed.claimId != "" { + if len(parsed.claimId) == 40 { + // Do full claim stuff + log.Println("TODO") + } + ch := ClaimShortIdIter(db, normalizedName, parsed.claimId) + for row := range ch { + key := row.Value.(*prefixes.ClaimShortIDKey) + claimTxo := row.Value.(*prefixes.ClaimShortIDValue) + fullClaimHash, err := GetCachedClaimHash(db, claimTxo.TxNum, claimTxo.Position) + if err != nil { + return nil, err + } + c, err := GetCachedClaimTxo(db, fullClaimHash) + if err != nil { + return nil, err + } + nonNormalizedName := c.Name + signatureIsValid := c.ChannelSignatureIsValid + activation, err := GetActivation(db, claimTxo.TxNum, claimTxo.Position, false) + if err != nil { + return nil, err + } + // LOL how do we ignore this static-check from Go? + return PrepareResolveResult( + db, + claimTxo.TxNum, + claimTxo.Position, + fullClaimHash, + nonNormalizedName, + key.RootTxNum, + key.RootPosition, + activation, + signatureIsValid, + ) + } + } + + return nil, nil +} + +func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult { var res = &ExpandedResolveResult{ Stream: nil, Channel: nil, @@ -225,7 +742,9 @@ func Resolve(db *grocksdb.DB, url string) *ExpandedResolveResult { RepostedChannel: nil, } - parsed, err := ParseURL(url) + var channel *PathSegment = nil + var stream *PathSegment = nil + parsed, err := lbryurl.Parse(url, false) if err != nil { res.Stream = &optionalResolveResultOrError{ err: &ResolveError{err}, @@ -233,6 +752,65 @@ func Resolve(db *grocksdb.DB, url string) *ExpandedResolveResult { return res } + log.Printf("parsed: %#v\n", parsed) + + // has stream in channel + if strings.Compare(parsed.StreamName, "") != 0 && strings.Compare(parsed.ChannelName, "") != 0 { + channel = &PathSegment{ + name: parsed.ChannelName, + claimId: parsed.ChannelClaimId, + amountOrder: parsed.PrimaryBidPosition, + } + stream = &PathSegment{ + name: parsed.StreamName, + claimId: parsed.StreamClaimId, + amountOrder: parsed.SecondaryBidPosition, + } + } else if strings.Compare(parsed.ChannelName, "") != 0 { + channel = &PathSegment{ + name: parsed.ChannelName, + claimId: parsed.ChannelClaimId, + amountOrder: parsed.PrimaryBidPosition, + } + } else if strings.Compare(parsed.StreamName, "") != 0 { + stream = &PathSegment{ + name: parsed.StreamName, + claimId: parsed.StreamClaimId, + amountOrder: parsed.SecondaryBidPosition, + } + } + + log.Printf("channel: %#v\n", channel) + log.Printf("stream: %#v\n", stream) + + var resolvedChannel *ResolveResult = nil + var resolvedStream *ResolveResult = nil + if channel != nil { + resolvedChannel, err = ResolveParsedUrl(db, channel) + if err != nil { + res.Channel = &optionalResolveResultOrError{ + err: &ResolveError{err}, + } + return res + } + } + log.Println("resolvedChannel:", resolvedChannel) + return nil + if stream != nil { + if resolvedChannel != nil { + + } else { + + } + } + + res.Channel = &optionalResolveResultOrError{ + res: resolvedChannel, + } + res.Stream = &optionalResolveResultOrError{ + res: resolvedStream, + } + log.Printf("parsed: %+v\n", parsed) return res } @@ -333,6 +911,13 @@ func Iter(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { ch := make(chan *prefixes.PrefixRowKV) ro := grocksdb.NewDefaultReadOptions() + /* + FIXME: + ro.SetIterateLowerBound() + ro.SetIterateUpperBound() + ro.PrefixSameAsStart() -> false + ro.AutoPrefixMode() -> on + */ ro.SetFillCache(opts.FillCache) it := db.NewIterator(ro) opts.It = it @@ -384,6 +969,72 @@ func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, er return db, handles, nil } +func GetDBColumnFamlies(name string, cfNames []string) (*ReadOnlyDBColumnFamily, error) { + opts := grocksdb.NewDefaultOptions() + roOpts := grocksdb.NewDefaultReadOptions() + cfOpt := grocksdb.NewDefaultOptions() + + //cfNames := []string{"default", cf} + cfOpts := make([]*grocksdb.Options, len(cfNames)) + for i := range cfNames { + cfOpts[i] = cfOpt + } + + db, handles, err := grocksdb.OpenDbAsSecondaryColumnFamilies(opts, name, "asdf", cfNames, cfOpts) + // db, handles, err := grocksdb.OpenDbColumnFamilies(opts, name, cfNames, cfOpts) + + if err != nil { + return nil, err + } + + var handlesMap = make(map[string]*grocksdb.ColumnFamilyHandle) + for i, handle := range handles { + log.Printf("%d: %+v\n", i, handle) + handlesMap[cfNames[i]] = handle + } + + myDB := &ReadOnlyDBColumnFamily{ + DB: db, + Handles: handlesMap, + Opts: roOpts, + } + + err = InitTxCounts(myDB) + if err != nil { + return nil, err + } + + return myDB, nil +} + +func InitTxCounts(db *ReadOnlyDBColumnFamily) error { + start := time.Now() + handle, ok := db.Handles[string([]byte{prefixes.TxCount})] + if !ok { + return fmt.Errorf("TxCount prefix not found") + } + + //txCounts := make([]uint32, 1200000) + txCounts := make([]uint32, 100000) + + options := NewIterateOptions().WithPrefix([]byte{prefixes.TxCount}).WithCfHandle(handle) + options = options.WithIncludeKey(false).WithIncludeValue(true) + + ch := IterCF(db.DB, options) + + for txCount := range ch { + //log.Println(txCount) + txCounts = append(txCounts, txCount.Value.(*prefixes.TxCountValue).TxCount) + } + + duration := time.Since(start) + log.Println("len(txCounts), cap(txCounts):", len(txCounts), cap(txCounts)) + log.Println("Time to get txCounts:", duration) + + db.TxCounts = txCounts + return nil +} + func GetDBCF(name string, cf string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, error) { opts := grocksdb.NewDefaultOptions() cfOpt := grocksdb.NewDefaultOptions() @@ -444,6 +1095,45 @@ func ReadPrefixN(db *grocksdb.DB, prefix []byte, n int) []*prefixes.PrefixRowKV return res } +func ReadWriteRawNColumnFamilies(db *grocksdb.DB, options *IterOptions, out string, n int) { + + options.RawKey = true + options.RawValue = true + ch := IterCF(db, options) + + file, err := os.Create(out) + if err != nil { + log.Println(err) + return + } + defer file.Close() + + var i = 0 + log.Println(options.Prefix) + cf := string(options.Prefix) + file.Write([]byte(fmt.Sprintf("%s,,\n", options.Prefix))) + for kv := range ch { + log.Println(i) + if i >= n { + return + } + key := kv.Key.([]byte) + value := kv.Value.([]byte) + keyHex := hex.EncodeToString(key) + valueHex := hex.EncodeToString(value) + //log.Println(keyHex) + //log.Println(valueHex) + file.WriteString(cf) + file.WriteString(",") + file.WriteString(keyHex) + file.WriteString(",") + file.WriteString(valueHex) + file.WriteString("\n") + + i++ + } +} + func ReadWriteRawNCF(db *grocksdb.DB, options *IterOptions, out string, n int) { options.RawKey = true diff --git a/db/db_test.go b/db/db_test.go index 4a8f929..fcecbfb 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -13,6 +13,70 @@ import ( "github.com/linxGnu/grocksdb" ) +func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, func(), error) { + + log.Println(filePath) + file, err := os.Open(filePath) + if err != nil { + log.Println(err) + } + reader := csv.NewReader(file) + records, err := reader.ReadAll() + if err != nil { + return nil, nil, nil, err + } + + wOpts := grocksdb.NewDefaultWriteOptions() + opts := grocksdb.NewDefaultOptions() + opts.SetCreateIfMissing(true) + db, err := grocksdb.OpenDb(opts, "tmp") + if err != nil { + return nil, nil, nil, err + } + var handleMap map[string]*grocksdb.ColumnFamilyHandle = make(map[string]*grocksdb.ColumnFamilyHandle) + + for _, cfNameRune := range records[0][0] { + cfName := string(cfNameRune) + log.Println(cfName) + handle, err := db.CreateColumnFamily(opts, cfName) + if err != nil { + return nil, nil, nil, err + } + handleMap[cfName] = handle + } + toDefer := func() { + db.Close() + err = os.RemoveAll("./tmp") + if err != nil { + log.Println(err) + } + } + for _, record := range records[1:] { + cf := record[0] + if err != nil { + return nil, nil, nil, err + } + handle := handleMap[string(cf)] + key, err := hex.DecodeString(record[1]) + if err != nil { + return nil, nil, nil, err + } + val, err := hex.DecodeString(record[2]) + if err != nil { + return nil, nil, nil, err + } + db.PutCF(wOpts, handle, key, val) + } + + myDB := &dbpkg.ReadOnlyDBColumnFamily{ + DB: db, + Handles: handleMap, + Opts: grocksdb.NewDefaultReadOptions(), + } + + return myDB, records, toDefer, nil +} + func OpenAndFillTmpDBCF(filePath string) (*grocksdb.DB, [][]string, func(), *grocksdb.ColumnFamilyHandle, error) { log.Println(filePath) @@ -101,9 +165,68 @@ func OpenAndFillTmpDB(filePath string) (*grocksdb.DB, [][]string, func(), error) return db, records, toDefer, nil } +func CatCSV(filePath string) { + log.Println(filePath) + file, err := os.Open(filePath) + if err != nil { + log.Println(err) + } + reader := csv.NewReader(file) + records, err := reader.ReadAll() + if err != nil { + log.Println(err) + return + } + for _, record := range records[1:] { + log.Println(record[1]) + keyRaw, err := hex.DecodeString(record[1]) + key, _ := prefixes.UnpackGenericKey(keyRaw) + log.Println(key) + if err != nil { + log.Println(err) + return + } + valRaw, err := hex.DecodeString(record[2]) + // val := prefixes.ClaimTakeoverValueUnpack(valRaw) + val, _ := prefixes.UnpackGenericValue(keyRaw, valRaw) + log.Println(val) + if err != nil { + log.Println(err) + return + } + } +} + +func TestOpenFullDB(t *testing.T) { + url := "lbry://@lothrop#2/lothrop-livestream-games-and-code#c" + dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" + prefixes := prefixes.GetPrefixes() + cfNames := []string{"default", "e", "d", "c"} + for _, prefix := range prefixes { + cfName := string(prefix) + cfNames = append(cfNames, cfName) + } + db, err := dbpkg.GetDBColumnFamlies(dbPath, cfNames) + toDefer := func() { + db.DB.Close() + err = os.RemoveAll("./asdf") + if err != nil { + log.Println(err) + } + } + defer toDefer() + if err != nil { + t.Error(err) + return + } + expandedResolveResult := dbpkg.Resolve(db, url) + log.Println(expandedResolveResult) +} + +// FIXME: Needs new data format func TestResolve(t *testing.T) { - filePath := "../testdata/W.csv" - db, _, toDefer, _, err := OpenAndFillTmpDBCF(filePath) + filePath := "../testdata/P_cat.csv" + db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) if err != nil { t.Error(err) return @@ -113,6 +236,81 @@ func TestResolve(t *testing.T) { log.Println(expandedResolveResult) } +func TestPrintClaimShortId(t *testing.T) { + filePath := "../testdata/F_cat.csv" + CatCSV(filePath) +} + +func TestClaimShortIdIter(t *testing.T) { + filePath := "../testdata/F_cat.csv" + normalName := "cat" + claimId := "0" + db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + if err != nil { + t.Error(err) + return + } + defer toDefer() + + ch := dbpkg.ClaimShortIdIter(db, normalName, claimId) + + for row := range ch { + key := row.Key.(*prefixes.ClaimShortIDKey) + log.Println(key) + if key.NormalizedName != normalName { + t.Errorf("Expected %s, got %s", normalName, key.NormalizedName) + } + } +} + +func TestPrintClaimToTXO(t *testing.T) { + filePath := "../testdata/E_2.csv" + CatCSV(filePath) +} + +func TestGetClaimToTXO(t *testing.T) { + claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a" + claimHash, err := hex.DecodeString(claimHashStr) + if err != nil { + t.Error(err) + return + } + filePath := "../testdata/E_2.csv" + db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + if err != nil { + t.Error(err) + return + } + defer toDefer() + res, err := dbpkg.GetCachedClaimTxo(db, claimHash) + if err != nil { + t.Error(err) + return + } + log.Println(res) +} + +func TestPrintClaimTakeover(t *testing.T) { + filePath := "../testdata/P_cat.csv" + CatCSV(filePath) +} + +func TestGetControllingClaim(t *testing.T) { + filePath := "../testdata/P_cat.csv" + db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath) + if err != nil { + t.Error(err) + return + } + defer toDefer() + res, err := dbpkg.GetControllingClaim(db, "cat") + if err != nil { + t.Error(err) + return + } + log.Println(res) +} + func TestIter(t *testing.T) { filePath := "../testdata/W.csv" diff --git a/db/prefixes/prefixes.go b/db/prefixes/prefixes.go index 15172de..03e8a3d 100644 --- a/db/prefixes/prefixes.go +++ b/db/prefixes/prefixes.go @@ -911,6 +911,13 @@ type TxHashValue struct { TxHash []byte `json:"tx_hash"` } +func NewTxHashKey(txNum uint32) *TxHashKey { + return &TxHashKey{ + Prefix: []byte{TxHash}, + TxNum: txNum, + } +} + func (k *TxHashKey) PackKey() []byte { prefixLen := 1 // b'>L' @@ -1329,6 +1336,13 @@ type ClaimToTXOValue struct { Name string `json:"name"` } +func NewClaimToTXOKey(claimHash []byte) *ClaimToTXOKey { + return &ClaimToTXOKey{ + Prefix: []byte{ClaimToTXO}, + ClaimHash: claimHash, + } +} + func (v *ClaimToTXOValue) NormalizedName() string { //TODO implement? Might not need to do anything. return v.Name @@ -1577,6 +1591,14 @@ type ClaimShortIDValue struct { Position uint16 `json:"position"` } +func NewClaimShortIDKey(normalizedName, partialClaimId string) *ClaimShortIDKey { + return &ClaimShortIDKey{ + Prefix: []byte{ClaimShortIdPrefix}, + NormalizedName: normalizedName, + PartialClaimId: partialClaimId, + } +} + func (k *ClaimShortIDKey) PackKey() []byte { prefixLen := 1 nameLen := len(k.NormalizedName) @@ -1715,6 +1737,15 @@ type ClaimToChannelValue struct { SigningHash []byte `json:"signing_hash"` } +func NewClaimToChannelKey(claimHash []byte, txNum uint32, position uint16) *ClaimToChannelKey { + return &ClaimToChannelKey{ + Prefix: []byte{ClaimToChannel}, + ClaimHash: claimHash, + TxNum: txNum, + Position: position, + } +} + func (k *ClaimToChannelKey) PackKey() []byte { prefixLen := 1 // b'>20sLH' @@ -2055,6 +2086,13 @@ type SupportAmountValue struct { Amount uint64 `json:"amount"` } +func NewSupportAmountKey(claimHash []byte) *SupportAmountKey { + return &SupportAmountKey{ + Prefix: []byte{SupportAmount}, + ClaimHash: claimHash, + } +} + func (k *SupportAmountKey) PackKey() []byte { prefixLen := 1 // b'>20sLH' @@ -2501,6 +2539,13 @@ type ClaimTakeoverValue struct { Height uint32 `json:"height"` } +func NewClaimTakeoverKey(normalizedName string) *ClaimTakeoverKey { + return &ClaimTakeoverKey{ + Prefix: []byte{ClaimTakeover}, + NormalizedName: normalizedName, + } +} + func (v *ClaimTakeoverValue) String() string { return fmt.Sprintf( "%s(claim_hash=%s, height=%d)", @@ -2772,6 +2817,15 @@ type ActivationValue struct { NormalizedName string `json:"normalized_name"` } +func NewActivationKey(txoType uint8, txNum uint32, position uint16) *ActivationKey { + return &ActivationKey{ + Prefix: []byte{ActivatedClaimAndSupport}, + TxoType: txoType, + TxNum: txNum, + Position: position, + } +} + func (k *ActivationKey) PackKey() []byte { prefixLen := 1 // b'>BLH' diff --git a/main.go b/main.go index d8e57f9..b44401f 100644 --- a/main.go +++ b/main.go @@ -87,8 +87,75 @@ func main() { db.ReadWriteRawNCF(dbVal, options, fmt.Sprintf("./testdata/%s.csv", columnFamily), n) } + return + } else if args.CmdType == server.DBCmd3 { + var rawPrefix byte = prefixes.ClaimShortIdPrefix + prefix := []byte{rawPrefix} + columnFamily := string(prefix) + start := &prefixes.ClaimShortIDKey{ + Prefix: []byte{prefixes.ClaimShortIdPrefix}, + NormalizedName: "cat", + PartialClaimId: "", + RootTxNum: 0, + RootPosition: 0, + } + startRaw := prefixes.ClaimShortIDKeyPackPartial(start, 1) + options := &db.IterOptions{ + FillCache: false, + Prefix: prefix, + Start: startRaw, + Stop: nil, + IncludeStart: true, + IncludeStop: false, + IncludeKey: true, + IncludeValue: true, + RawKey: true, + RawValue: true, + } + + dbVal, handles, err := db.GetDBCF("/mnt/d/data/snapshot_1072108/lbry-rocksdb/", columnFamily) + if err != nil { + log.Fatalln(err) + } + + options.CfHandle = handles[1] + + db.ReadWriteRawNColumnFamilies(dbVal, options, fmt.Sprintf("./testdata/%s_cat.csv", columnFamily), 10) return } + // } else if args.CmdType == server.DBCmd4 { + // var rawPrefix byte = prefixes.TxCount + + // prefix := []byte{rawPrefix} + // columnFamily := string(prefix) + // options := &db.IterOptions{ + // FillCache: false, + // Prefix: prefix, + // Start: nil, + // Stop: nil, + // IncludeStart: true, + // IncludeStop: false, + // IncludeKey: true, + // IncludeValue: true, + // RawKey: true, + // RawValue: true, + // } + + // dbVal, handles, err := db.GetDBCF("/mnt/d/data/snapshot_1072108/lbry-rocksdb/", columnFamily) + // if err != nil { + // log.Fatalln(err) + // } + + // options.CfHandle = handles[1] + // var n = 10 + // if bytes.Equal(prefix, []byte{prefixes.Undo}) || bytes.Equal(prefix, []byte{prefixes.DBState}) { + // n = 1 + // } + + // db.ReadWriteRawNCF(dbVal, options, fmt.Sprintf("./testdata/%s.csv", columnFamily), n) + + // return + // } conn, err := grpc.Dial("localhost:"+args.Port, grpc.WithInsecure(), diff --git a/server/args.go b/server/args.go index 8b8cb3e..e0e9827 100644 --- a/server/args.go +++ b/server/args.go @@ -14,6 +14,7 @@ const ( SearchCmd = iota DBCmd = iota DBCmd2 = iota + DBCmd3 = iota ) // Args struct contains the arguments to the hub server. @@ -87,6 +88,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { searchCmd := parser.NewCommand("search", "claim search") dbCmd := parser.NewCommand("db", "db testing") dbCmd2 := parser.NewCommand("db2", "db testing") + dbCmd3 := parser.NewCommand("db3", "db testing") host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost}) port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort}) @@ -175,6 +177,8 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { args.CmdType = DBCmd } else if dbCmd2.Happened() { args.CmdType = DBCmd2 + } else if dbCmd3.Happened() { + args.CmdType = DBCmd3 } if *text != "" { diff --git a/server/federation.go b/server/federation.go index 288561f..5fb6e29 100644 --- a/server/federation.go +++ b/server/federation.go @@ -246,7 +246,7 @@ func (s *Server) writePeers() { } writer := bufio.NewWriter(f) - for key, _ := range s.PeerServers { + for key := range s.PeerServers { line := key + "\n" _, err := writer.WriteString(line) if err != nil { diff --git a/testdata/E_2.csv b/testdata/E_2.csv new file mode 100644 index 0000000..af15e6b --- /dev/null +++ b/testdata/E_2.csv @@ -0,0 +1,11 @@ +E,, +E,45000000a420c44374f4f399ab4807fa1901eefc87,0297ec2100000297ec21000000000000000f42400100246167656e63652d64c3a974727569742c2d6e6f7576656175782d736b696e732d6c65616b +E,45000000c27eef5ea69e0d73f118826c7e326bb469,00371d66000000371d660000000000001dcd650001000e4d696e696174757265486f757365 +E,4500000110e40894573f528c393fbcec7a472ec853,01516b32000001516b3200000000000000989680010021696f2d3137372d4e6f616d2d43686f6d736b792d6f6e2d434f494e54454c50524f +E,4500000324e40fcb63a0b517a3660645e9bd99244a,030bb6ba0000030bb6ba000000000000000f424001001b436f6e616e2d4578696c65732d526169642d4561726c792d457261 +E,45000003d1538a0f19f5cd4bc1a62cc294f5c89934,011c7c990000011c7c99000000000000000f424001001130322d636172726167652d68616e646c65 +E,45000008d47beeff8325e795a8604226145b01702b,02dbb2a2000002dbb2a2000000000000000186a001001039643336363661336161313231376138 +E,4500000906499e073e94370ceff37cb21c28212444,0369842d00000369842d000000000000000186a001001033333465356465363139306534323466 +E,45000009c3172e034a255f3c03566dca84bb9f046a,0225c69c000002251b0f0000000000000007a120010028617574686f722d73746f726965732d706f64636173742d657069736f64652d3734332d6b6172656e +E,45000009ca6e0caaaef16872b4bd4f6f1b8c2363e2,02b16956000002b16956000000000000000f4240010027554e2d504f55522d43454e542d28312d292d28536f72616c2c2d4162c3a963c3a9646169726529 +E,4500000ad9ded2e15d18987900d09e9b29ef33d03e,02c972b3000002c972b3000000000000000186a0010006313331333333 diff --git a/testdata/F_cat.csv b/testdata/F_cat.csv new file mode 100644 index 0000000..04eb35e --- /dev/null +++ b/testdata/F_cat.csv @@ -0,0 +1,11 @@ +F,, +F,460003636174013000201c740000,00201c740000 +F,4600036361740130002d4eb10000,002d4eb10000 +F,46000363617401300035f0460000,0035f0460000 +F,460003636174013000817df60000,00817df60000 +F,46000363617401300090d0f30000,009102470000 +F,460003636174013000a009af0000,00a009af0000 +F,460003636174013000a082a60000,00a082a60000 +F,460003636174013000a6a0d60000,00f2c75d0000 +F,460003636174013000a9111e0000,00b72e480000 +F,460003636174013000ab038e0000,00ab0ba60000 diff --git a/testdata/P_cat.csv b/testdata/P_cat.csv new file mode 100644 index 0000000..53c4405 --- /dev/null +++ b/testdata/P_cat.csv @@ -0,0 +1,11 @@ +P,, +P,500003636174,7b07396275ce6b5fd30bfbc84f69dd7818c4e119000d695f +P,500003636176,6950f0e12745c690b28365738e0095c776d48dd2000d7fc0 +P,50000363617a,9991c150e0e7df76163cbda34bd9bca59cc5a032000c2c3e +P,500003636231,3cd83b2b8e06867c3d8b066427896005881c5db9000bb158 +P,500003636232,e7e1aa9d2e5d7c8e6349f06dfb0ff1fd8e69038f000fdd49 +P,500003636233,d8d7099f4db8be475ed9e4ee2cd062fb6ce9533e000fc62b +P,500003636234,b696eb09a7529365015a033ebf4708101a1e1313000f17dd +P,500003636235,33b755873917f6c6d78a3777c3828827cc3a1e7e000fc68e +P,500003636236,ec06a840aaa04c2c09cc7f0b122ac3bbc2697e5b000fd4e8 +P,500003636237,e1d0cdb9bd6f230f2d0eb860f1d634c737c52c95000f006d diff --git a/testdata/ab.csv b/testdata/ab.csv new file mode 100644 index 0000000..417bbe6 --- /dev/null +++ b/testdata/ab.csv @@ -0,0 +1,21 @@ +ab,, +a,6100000324e40fcb63a0b517a3660645e9bd99244a,0000000001312d00 +a,6100000e474ea919000015b80ccb7f4e6cc73e2f52,0000000000000000 +a,61000023415fc7ba8a470f0cdf4a66bffacd5ba979,000000005c18bd6f +a,610000298e7db49c1f582e316bb3706fc3c71193cf,0000000001f0f430 +a,6100002c5bca153faaf3c644304f8a259340064f6c,0000000000000000 +a,6100002e6db2ae2c415a34d2d36d3cf61ac7133196,000000000bebc200 +a,6100002ea3970f0f658f50dbdb27abbb716ed01c80,0000000000000000 +a,610000345ff10a01448c42bf1a89a4399e8b82c1aa,0000000001ceef65 +a,610000437bd840e2463d3dfc8c80e66e2585dd02b3,00000000000eb7a7 +a,6100004cd6a62be5ccbfbef3ffca345c8f58595f87,000000001feadeb0 +b,6200000001,ba888e2f9c037f831046f8ad09f6d378f79c728d003b177a64d29621f481da5d +b,6200000002,09d8734d81b5f2eb1b653caf17491544ddfbc72f2f4c0c3f22a3362db5ba9d47 +b,6200000003,e285dbf24334585b9a924536a717160ee185a86d1eeb7b19684538685eca761a +b,6200000004,d83cf1408debbd631950b7a95b0c940772119cd8a615a3d44601568713fec80c +b,6200000005,47638e54178dbdddf2e81a3b7566860e5264df6066755f9760a893f5caecc579 +b,6200000006,ec91627e0dba856b933983425d7f72958e8f974682632a0fa2acee9cfd819401 +b,6200000007,a3c4a19948a1263722c45c5601fd10a7aea7cf73bfa45e060508f109155e80ab +b,6200000008,0fc2da46cf0de0057c1b9fc93d997105ff6cf2c8c43269b446c1dbf5ac18be8c +b,6200000009,7356a733f87e592ea133328792dd9d676ed83771c8ff0f519928ce752f159ba6 +b,620000000a,54a598c4356ce620a604004929af14f4c03c42eba017288a4a1d186aedfdd8f4 \ No newline at end of file