From 5171acc007785dc20ecb5bfcc5635c43e89ea2ea Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Tue, 8 Oct 2019 01:38:39 +0200 Subject: [PATCH] refactor integrity check add checks for improper transfer state add checks for unsent supports only use default account for operations that only work on it add more logging --- manager/transfer.go | 8 +- manager/ytsync.go | 266 ++++++++++++++++++++++++++++++++------------ 2 files changed, 199 insertions(+), 75 deletions(-) diff --git a/manager/transfer.go b/manager/transfer.go index bbde3bc..6b3f37f 100644 --- a/manager/transfer.go +++ b/manager/transfer.go @@ -11,10 +11,14 @@ import ( ) func waitConfirmations(s *Sync) error { + defaultAccount, err := s.getDefaultAccount() + if err != nil { + return err + } allConfirmed := false waiting: for !allConfirmed { - utxolist, err := s.daemon.UTXOList(nil) + utxolist, err := s.daemon.UTXOList(&defaultAccount) if err != nil { return err } else if utxolist == nil { @@ -165,5 +169,5 @@ func transferChannel(s *Sync) error { } log.Infof("TRANSFERRED %t", len(result.Outputs) != 0) - return errors.Err(err) + return nil } diff --git a/manager/ytsync.go b/manager/ytsync.go index f5fc422..da38421 100644 --- a/manager/ytsync.go +++ b/manager/ytsync.go @@ -8,6 +8,7 @@ import ( "os/signal" "runtime/debug" "sort" + "strconv" "strings" "sync" "syscall" @@ -314,44 +315,53 @@ func (s *Sync) FullCycle() (e error) { } if s.shouldTransfer() { - err := waitConfirmations(s) - if err != nil { - return err - } - supportAmount, err := abandonSupports(s) - if err != nil { - return errors.Prefix(fmt.Sprintf("%.6f LBCs were abandoned before failing", supportAmount), err) - } - err = transferVideos(s) - if err != nil { - return err - } - err = transferChannel(s) - if err != nil { - return err - } - - reallocateSupports := supportAmount > 0.01 - if reallocateSupports { - err = waitConfirmations(s) - if err != nil { - return err - } - isTip := true - summary, err := s.daemon.SupportCreate(s.lbryChannelID, fmt.Sprintf("%.6f", supportAmount), &isTip, nil, nil) - if err != nil { - return errors.Err(err) - } - if len(summary.Outputs) < 1 { - return errors.Err("something went wrong while tipping the channel for %.6f LBCs", supportAmount) - } - } - return nil + return s.processTransfers() } return nil } +func (s *Sync) processTransfers() (e error) { + log.Println("Processing transfers") + err := waitConfirmations(s) + if err != nil { + return err + } + supportAmount, err := abandonSupports(s) + if err != nil { + return errors.Prefix(fmt.Sprintf("%.6f LBCs were abandoned before failing", supportAmount), err) + } + if supportAmount > 0 { + logUtils.SendInfoToSlack("%.6f LBCs were abandoned and should be used as support", supportAmount) + } + err = transferVideos(s) + if err != nil { + return err + } + err = transferChannel(s) + if err != nil { + return err + } + + reallocateSupports := supportAmount > 0.01 + if reallocateSupports { + err = waitConfirmations(s) + if err != nil { + return err + } + isTip := true + summary, err := s.daemon.SupportCreate(s.lbryChannelID, fmt.Sprintf("%.6f", supportAmount), &isTip, nil, nil) + if err != nil { + return errors.Err(err) + } + if len(summary.Outputs) < 1 { + return errors.Err("something went wrong while tipping the channel for %.6f LBCs", supportAmount) + } + } + log.Println("Done processing transfers") + return nil +} + func deleteSyncFolder(videoDirectory string) { if !strings.Contains(videoDirectory, "/tmp/ytsync") { _ = util.SendToSlack(errors.Err("Trying to delete an unexpected directory: %s", videoDirectory).Error()) @@ -510,52 +520,84 @@ func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) { return abandonedClaims, nil } -//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published -//additionally it removes all entries in the database indicating that a video is published when it's actually not -func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total, fixed, removed int, err error) { - count := 0 - videoIDMap := make(map[string]string, len(claims)) +type ytsyncClaim struct { + ClaimID string + MetadataVersion uint + ClaimName string + PublishAddress string + VideoID string + Claim *jsonrpc.Claim +} + +// mapFromClaims returns a map of videoIDs (youtube id) to ytsyncClaim which is a structure holding blockchain related +// information +func (s *Sync) mapFromClaims(claims []jsonrpc.Claim) map[string]ytsyncClaim { + videoIDMap := make(map[string]ytsyncClaim, len(claims)) for _, c := range claims { if !isYtsyncClaim(c) { continue } - count++ - //check if claimID is in remote db tn := c.Value.GetThumbnail().GetUrl() videoID := tn[strings.LastIndex(tn, "/")+1:] - videoIDMap[videoID] = c.ClaimID - s.syncedVideosMux.RLock() - pv, claimInDatabase := s.syncedVideos[videoID] - s.syncedVideosMux.RUnlock() claimMetadataVersion := uint(1) if strings.Contains(tn, thumbs.ThumbnailEndpoint) { claimMetadataVersion = 2 } - metadataDiffers := claimInDatabase && pv.MetadataVersion != int8(claimMetadataVersion) - claimIDDiffers := claimInDatabase && pv.ClaimID != c.ClaimID - claimNameDiffers := claimInDatabase && pv.ClaimName != c.Name - claimMarkedUnpublished := claimInDatabase && !pv.Published + videoIDMap[videoID] = ytsyncClaim{ + ClaimID: c.ClaimID, + MetadataVersion: claimMetadataVersion, + ClaimName: c.Name, + PublishAddress: c.Address, + VideoID: videoID, + Claim: &c, + } + } + return videoIDMap +} + +//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published +//additionally it removes all entries in the database indicating that a video is published when it's actually not +func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim, ownClaims []jsonrpc.Claim) (total, fixed, removed int, err error) { + allClaimsInfo := s.mapFromClaims(claims) + ownClaimsInfo := s.mapFromClaims(ownClaims) + count := len(allClaimsInfo) + idsToRemove := make([]string, 0, count) + + for videoID, chainInfo := range allClaimsInfo { + s.syncedVideosMux.RLock() + sv, claimInDatabase := s.syncedVideos[videoID] + s.syncedVideosMux.RUnlock() + + metadataDiffers := claimInDatabase && sv.MetadataVersion != int8(chainInfo.MetadataVersion) + claimIDDiffers := claimInDatabase && sv.ClaimID != chainInfo.ClaimID + claimNameDiffers := claimInDatabase && sv.ClaimName != chainInfo.ClaimName + claimMarkedUnpublished := claimInDatabase && !sv.Published + _, isOwnClaim := ownClaimsInfo[videoID] + tranferred := !isOwnClaim + transferStatusMismatch := sv.Transferred != tranferred + if metadataDiffers { - log.Debugf("%s: Mismatch in database for metadata. DB: %d - Blockchain: %d", videoID, pv.MetadataVersion, claimMetadataVersion) + log.Debugf("%s: Mismatch in database for metadata. DB: %d - Blockchain: %d", videoID, sv.MetadataVersion, chainInfo.MetadataVersion) } if claimIDDiffers { - log.Debugf("%s: Mismatch in database for claimID. DB: %s - Blockchain: %s", videoID, pv.ClaimID, c.ClaimID) + log.Debugf("%s: Mismatch in database for claimID. DB: %s - Blockchain: %s", videoID, sv.ClaimID, chainInfo.ClaimID) } if claimNameDiffers { - log.Debugf("%s: Mismatch in database for claimName. DB: %s - Blockchain: %s", videoID, pv.ClaimName, c.Name) + log.Debugf("%s: Mismatch in database for claimName. DB: %s - Blockchain: %s", videoID, sv.ClaimName, chainInfo.ClaimName) } if claimMarkedUnpublished { log.Debugf("%s: Mismatch in database: published but marked as unpublished", videoID) } if !claimInDatabase { - log.Debugf("%s: Published but is not in database (%s - %s)", videoID, c.Name, c.ClaimID) + log.Debugf("%s: Published but is not in database (%s - %s)", videoID, chainInfo.ClaimName, chainInfo.ClaimID) } - if s.syncedVideos[videoID].Transferred && s.publishAddress != c.Address { - log.Debugf("%s: Marked as transferred while in fact it's not (%s - %s). Publish address: %s, expected: %s", videoID, c.Name, c.ClaimID, c.Address, s.publishAddress) + if transferStatusMismatch { + log.Debugf("%s: is marked as transferred %t on it's actually %t", videoID, sv.Transferred, tranferred) } - if !claimInDatabase || metadataDiffers || claimIDDiffers || claimNameDiffers || claimMarkedUnpublished { - claimSize, err := c.GetStreamSizeByMagic() + + if !claimInDatabase || metadataDiffers || claimIDDiffers || claimNameDiffers || claimMarkedUnpublished || transferStatusMismatch { + claimSize, err := chainInfo.Claim.GetStreamSizeByMagic() if err != nil { claimSize = 0 } @@ -565,24 +607,36 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total, fixed, removed int ChannelID: s.YoutubeChannelID, VideoID: videoID, Status: VideoStatusPublished, - ClaimID: c.ClaimID, - ClaimName: c.Name, + ClaimID: chainInfo.ClaimID, + ClaimName: chainInfo.ClaimName, Size: util.PtrToInt64(int64(claimSize)), - MetaDataVersion: claimMetadataVersion, - IsTransferred: util.PtrToBool(s.publishAddress == c.Address), + MetaDataVersion: chainInfo.MetadataVersion, + IsTransferred: &tranferred, }) if err != nil { return count, fixed, 0, err } } } - idsToRemove := make([]string, 0, len(videoIDMap)) + + //reload the synced videos map before we use it for further processing + if fixed > 0 { + err := s.setStatusSyncing() + if err != nil { + return count, fixed, 0, err + } + } + for vID, sv := range s.syncedVideos { if sv.Transferred { - log.Infof("%s: claim was transferred, ignoring", vID) + _, ok := allClaimsInfo[vID] + if !ok && sv.Published { + log.Warnf("%s: claims to be published and transferred but wasn't found in the list of claims", vID) + idsToRemove = append(idsToRemove, vID) + } continue } - _, ok := videoIDMap[vID] + _, ok := ownClaimsInfo[vID] if !ok && sv.Published { log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified (%t)", vID, s.Manager.SyncFlags.RemoveDBUnpublished) idsToRemove = append(idsToRemove, vID) @@ -594,15 +648,31 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total, fixed, removed int if err != nil { return count, fixed, len(idsToRemove), err } + removed++ } - return count, fixed, 0, nil + //reload the synced videos map before we use it for further processing + if removed > 0 { + err := s.setStatusSyncing() + if err != nil { + return count, fixed, removed, err + } + } + return count, fixed, removed, nil } -func (s *Sync) getClaims() ([]jsonrpc.Claim, error) { +func (s *Sync) getClaims(defaultOnly bool) ([]jsonrpc.Claim, error) { totalPages := uint64(1) var allClaims []jsonrpc.Claim + var account *string = nil + if defaultOnly { + a, err := s.getDefaultAccount() + if err != nil { + return nil, err + } + account = &a + } for page := uint64(1); page <= totalPages; page++ { - claims, err := s.daemon.ClaimList(nil, page, 50) + claims, err := s.daemon.ClaimList(account, page, 50) if err != nil { return nil, errors.Prefix("cannot list claims", err) } @@ -613,7 +683,7 @@ func (s *Sync) getClaims() ([]jsonrpc.Claim, error) { } func (s *Sync) checkIntegrity() error { - allClaims, err := s.getClaims() + allClaims, err := s.getClaims(false) if err != nil { return err } @@ -627,22 +697,22 @@ func (s *Sync) checkIntegrity() error { if err != nil { return err } - allClaims, err = s.getClaims() + allClaims, err = s.getClaims(false) if err != nil { return err } } - pubsOnWallet, nFixed, nRemoved, err := s.updateRemoteDB(allClaims) + ownClaims, err := s.getClaims(true) + if err != nil { + return err + } + pubsOnWallet, nFixed, nRemoved, err := s.updateRemoteDB(allClaims, ownClaims) if err != nil { return errors.Prefix("error updating remote database", err) } if nFixed > 0 || nRemoved > 0 { - err := s.setStatusSyncing() - if err != nil { - return err - } if nFixed > 0 { logUtils.SendInfoToSlack("%d claims had mismatched database info or were completely missing and were fixed", nFixed) } @@ -664,6 +734,11 @@ func (s *Sync) checkIntegrity() error { if pubsOnWallet < pubsOnDB { logUtils.SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) } + + _, err = s.getUnsentSupports() //TODO: use the returned value when it works + if err != nil { + return err + } return nil } @@ -1083,6 +1158,51 @@ func (s *Sync) importPublicKey() error { return nil } +//TODO: fully implement this once I find a way to reliably get the abandoned supports amount +func (s *Sync) getUnsentSupports() (float64, error) { + defaultAccount, err := s.getDefaultAccount() + if err != nil { + return 0, errors.Err(err) + } + if s.transferState == 2 { + balance, err := s.daemon.AccountBalance(&defaultAccount) + if err != nil { + return 0, err + } else if balance == nil { + return 0, errors.Err("no response") + } + + balanceAmount, err := strconv.ParseFloat(balance.Available.String(), 64) + if err != nil { + return 0, errors.Err(err) + } + transactionList, err := s.daemon.TransactionList(&defaultAccount) + if err != nil { + return 0, errors.Err(err) + } + sentSupports := 0.0 + for _, t := range *transactionList { + if len(t.SupportInfo) == 0 { + continue + } + for _, support := range t.SupportInfo { + supportAmount, err := strconv.ParseFloat(support.BalanceDelta, 64) + if err != nil { + return 0, err + } + if supportAmount < 0 { // && support.IsTip TODO: re-enable this when transaction list shows correct information + sentSupports += -supportAmount + } + } + } + if balanceAmount > 10 && sentSupports < 1 { + logUtils.SendErrorToSlack("this channel has quite some LBCs in it (%.2f) and %.2f LBC in sent tips, it's likely that the tips weren't actually sent or the wallet has unnecessary extra credits in it", balanceAmount, sentSupports) + return balanceAmount - 10, nil + } + } + return 0, nil +} + // waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up func waitForDaemonProcess(timeout time.Duration) error { then := time.Now()