refactor integrity check

add checks for improper transfer state
add checks for unsent supports
only use default account for operations that only work on it
add more logging
This commit is contained in:
Niko Storni 2019-10-08 01:38:39 +02:00
parent 946314da94
commit 5171acc007
2 changed files with 199 additions and 75 deletions

View file

@ -11,10 +11,14 @@ import (
)
func waitConfirmations(s *Sync) error {
defaultAccount, err := s.getDefaultAccount()
if err != nil {
return err
}
allConfirmed := false
waiting:
for !allConfirmed {
utxolist, err := s.daemon.UTXOList(nil)
utxolist, err := s.daemon.UTXOList(&defaultAccount)
if err != nil {
return err
} else if utxolist == nil {
@ -165,5 +169,5 @@ func transferChannel(s *Sync) error {
}
log.Infof("TRANSFERRED %t", len(result.Outputs) != 0)
return errors.Err(err)
return nil
}

View file

@ -8,6 +8,7 @@ import (
"os/signal"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
"syscall"
@ -314,44 +315,53 @@ func (s *Sync) FullCycle() (e error) {
}
if s.shouldTransfer() {
err := waitConfirmations(s)
if err != nil {
return err
}
supportAmount, err := abandonSupports(s)
if err != nil {
return errors.Prefix(fmt.Sprintf("%.6f LBCs were abandoned before failing", supportAmount), err)
}
err = transferVideos(s)
if err != nil {
return err
}
err = transferChannel(s)
if err != nil {
return err
}
reallocateSupports := supportAmount > 0.01
if reallocateSupports {
err = waitConfirmations(s)
if err != nil {
return err
}
isTip := true
summary, err := s.daemon.SupportCreate(s.lbryChannelID, fmt.Sprintf("%.6f", supportAmount), &isTip, nil, nil)
if err != nil {
return errors.Err(err)
}
if len(summary.Outputs) < 1 {
return errors.Err("something went wrong while tipping the channel for %.6f LBCs", supportAmount)
}
}
return nil
return s.processTransfers()
}
return nil
}
func (s *Sync) processTransfers() (e error) {
log.Println("Processing transfers")
err := waitConfirmations(s)
if err != nil {
return err
}
supportAmount, err := abandonSupports(s)
if err != nil {
return errors.Prefix(fmt.Sprintf("%.6f LBCs were abandoned before failing", supportAmount), err)
}
if supportAmount > 0 {
logUtils.SendInfoToSlack("%.6f LBCs were abandoned and should be used as support", supportAmount)
}
err = transferVideos(s)
if err != nil {
return err
}
err = transferChannel(s)
if err != nil {
return err
}
reallocateSupports := supportAmount > 0.01
if reallocateSupports {
err = waitConfirmations(s)
if err != nil {
return err
}
isTip := true
summary, err := s.daemon.SupportCreate(s.lbryChannelID, fmt.Sprintf("%.6f", supportAmount), &isTip, nil, nil)
if err != nil {
return errors.Err(err)
}
if len(summary.Outputs) < 1 {
return errors.Err("something went wrong while tipping the channel for %.6f LBCs", supportAmount)
}
}
log.Println("Done processing transfers")
return nil
}
func deleteSyncFolder(videoDirectory string) {
if !strings.Contains(videoDirectory, "/tmp/ytsync") {
_ = util.SendToSlack(errors.Err("Trying to delete an unexpected directory: %s", videoDirectory).Error())
@ -510,52 +520,84 @@ func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
return abandonedClaims, nil
}
//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published
//additionally it removes all entries in the database indicating that a video is published when it's actually not
func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total, fixed, removed int, err error) {
count := 0
videoIDMap := make(map[string]string, len(claims))
type ytsyncClaim struct {
ClaimID string
MetadataVersion uint
ClaimName string
PublishAddress string
VideoID string
Claim *jsonrpc.Claim
}
// mapFromClaims returns a map of videoIDs (youtube id) to ytsyncClaim which is a structure holding blockchain related
// information
func (s *Sync) mapFromClaims(claims []jsonrpc.Claim) map[string]ytsyncClaim {
videoIDMap := make(map[string]ytsyncClaim, len(claims))
for _, c := range claims {
if !isYtsyncClaim(c) {
continue
}
count++
//check if claimID is in remote db
tn := c.Value.GetThumbnail().GetUrl()
videoID := tn[strings.LastIndex(tn, "/")+1:]
videoIDMap[videoID] = c.ClaimID
s.syncedVideosMux.RLock()
pv, claimInDatabase := s.syncedVideos[videoID]
s.syncedVideosMux.RUnlock()
claimMetadataVersion := uint(1)
if strings.Contains(tn, thumbs.ThumbnailEndpoint) {
claimMetadataVersion = 2
}
metadataDiffers := claimInDatabase && pv.MetadataVersion != int8(claimMetadataVersion)
claimIDDiffers := claimInDatabase && pv.ClaimID != c.ClaimID
claimNameDiffers := claimInDatabase && pv.ClaimName != c.Name
claimMarkedUnpublished := claimInDatabase && !pv.Published
videoIDMap[videoID] = ytsyncClaim{
ClaimID: c.ClaimID,
MetadataVersion: claimMetadataVersion,
ClaimName: c.Name,
PublishAddress: c.Address,
VideoID: videoID,
Claim: &c,
}
}
return videoIDMap
}
//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published
//additionally it removes all entries in the database indicating that a video is published when it's actually not
func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim, ownClaims []jsonrpc.Claim) (total, fixed, removed int, err error) {
allClaimsInfo := s.mapFromClaims(claims)
ownClaimsInfo := s.mapFromClaims(ownClaims)
count := len(allClaimsInfo)
idsToRemove := make([]string, 0, count)
for videoID, chainInfo := range allClaimsInfo {
s.syncedVideosMux.RLock()
sv, claimInDatabase := s.syncedVideos[videoID]
s.syncedVideosMux.RUnlock()
metadataDiffers := claimInDatabase && sv.MetadataVersion != int8(chainInfo.MetadataVersion)
claimIDDiffers := claimInDatabase && sv.ClaimID != chainInfo.ClaimID
claimNameDiffers := claimInDatabase && sv.ClaimName != chainInfo.ClaimName
claimMarkedUnpublished := claimInDatabase && !sv.Published
_, isOwnClaim := ownClaimsInfo[videoID]
tranferred := !isOwnClaim
transferStatusMismatch := sv.Transferred != tranferred
if metadataDiffers {
log.Debugf("%s: Mismatch in database for metadata. DB: %d - Blockchain: %d", videoID, pv.MetadataVersion, claimMetadataVersion)
log.Debugf("%s: Mismatch in database for metadata. DB: %d - Blockchain: %d", videoID, sv.MetadataVersion, chainInfo.MetadataVersion)
}
if claimIDDiffers {
log.Debugf("%s: Mismatch in database for claimID. DB: %s - Blockchain: %s", videoID, pv.ClaimID, c.ClaimID)
log.Debugf("%s: Mismatch in database for claimID. DB: %s - Blockchain: %s", videoID, sv.ClaimID, chainInfo.ClaimID)
}
if claimNameDiffers {
log.Debugf("%s: Mismatch in database for claimName. DB: %s - Blockchain: %s", videoID, pv.ClaimName, c.Name)
log.Debugf("%s: Mismatch in database for claimName. DB: %s - Blockchain: %s", videoID, sv.ClaimName, chainInfo.ClaimName)
}
if claimMarkedUnpublished {
log.Debugf("%s: Mismatch in database: published but marked as unpublished", videoID)
}
if !claimInDatabase {
log.Debugf("%s: Published but is not in database (%s - %s)", videoID, c.Name, c.ClaimID)
log.Debugf("%s: Published but is not in database (%s - %s)", videoID, chainInfo.ClaimName, chainInfo.ClaimID)
}
if s.syncedVideos[videoID].Transferred && s.publishAddress != c.Address {
log.Debugf("%s: Marked as transferred while in fact it's not (%s - %s). Publish address: %s, expected: %s", videoID, c.Name, c.ClaimID, c.Address, s.publishAddress)
if transferStatusMismatch {
log.Debugf("%s: is marked as transferred %t on it's actually %t", videoID, sv.Transferred, tranferred)
}
if !claimInDatabase || metadataDiffers || claimIDDiffers || claimNameDiffers || claimMarkedUnpublished {
claimSize, err := c.GetStreamSizeByMagic()
if !claimInDatabase || metadataDiffers || claimIDDiffers || claimNameDiffers || claimMarkedUnpublished || transferStatusMismatch {
claimSize, err := chainInfo.Claim.GetStreamSizeByMagic()
if err != nil {
claimSize = 0
}
@ -565,24 +607,36 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total, fixed, removed int
ChannelID: s.YoutubeChannelID,
VideoID: videoID,
Status: VideoStatusPublished,
ClaimID: c.ClaimID,
ClaimName: c.Name,
ClaimID: chainInfo.ClaimID,
ClaimName: chainInfo.ClaimName,
Size: util.PtrToInt64(int64(claimSize)),
MetaDataVersion: claimMetadataVersion,
IsTransferred: util.PtrToBool(s.publishAddress == c.Address),
MetaDataVersion: chainInfo.MetadataVersion,
IsTransferred: &tranferred,
})
if err != nil {
return count, fixed, 0, err
}
}
}
idsToRemove := make([]string, 0, len(videoIDMap))
//reload the synced videos map before we use it for further processing
if fixed > 0 {
err := s.setStatusSyncing()
if err != nil {
return count, fixed, 0, err
}
}
for vID, sv := range s.syncedVideos {
if sv.Transferred {
log.Infof("%s: claim was transferred, ignoring", vID)
_, ok := allClaimsInfo[vID]
if !ok && sv.Published {
log.Warnf("%s: claims to be published and transferred but wasn't found in the list of claims", vID)
idsToRemove = append(idsToRemove, vID)
}
continue
}
_, ok := videoIDMap[vID]
_, ok := ownClaimsInfo[vID]
if !ok && sv.Published {
log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified (%t)", vID, s.Manager.SyncFlags.RemoveDBUnpublished)
idsToRemove = append(idsToRemove, vID)
@ -594,15 +648,31 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total, fixed, removed int
if err != nil {
return count, fixed, len(idsToRemove), err
}
removed++
}
return count, fixed, 0, nil
//reload the synced videos map before we use it for further processing
if removed > 0 {
err := s.setStatusSyncing()
if err != nil {
return count, fixed, removed, err
}
}
return count, fixed, removed, nil
}
func (s *Sync) getClaims() ([]jsonrpc.Claim, error) {
func (s *Sync) getClaims(defaultOnly bool) ([]jsonrpc.Claim, error) {
totalPages := uint64(1)
var allClaims []jsonrpc.Claim
var account *string = nil
if defaultOnly {
a, err := s.getDefaultAccount()
if err != nil {
return nil, err
}
account = &a
}
for page := uint64(1); page <= totalPages; page++ {
claims, err := s.daemon.ClaimList(nil, page, 50)
claims, err := s.daemon.ClaimList(account, page, 50)
if err != nil {
return nil, errors.Prefix("cannot list claims", err)
}
@ -613,7 +683,7 @@ func (s *Sync) getClaims() ([]jsonrpc.Claim, error) {
}
func (s *Sync) checkIntegrity() error {
allClaims, err := s.getClaims()
allClaims, err := s.getClaims(false)
if err != nil {
return err
}
@ -627,22 +697,22 @@ func (s *Sync) checkIntegrity() error {
if err != nil {
return err
}
allClaims, err = s.getClaims()
allClaims, err = s.getClaims(false)
if err != nil {
return err
}
}
pubsOnWallet, nFixed, nRemoved, err := s.updateRemoteDB(allClaims)
ownClaims, err := s.getClaims(true)
if err != nil {
return err
}
pubsOnWallet, nFixed, nRemoved, err := s.updateRemoteDB(allClaims, ownClaims)
if err != nil {
return errors.Prefix("error updating remote database", err)
}
if nFixed > 0 || nRemoved > 0 {
err := s.setStatusSyncing()
if err != nil {
return err
}
if nFixed > 0 {
logUtils.SendInfoToSlack("%d claims had mismatched database info or were completely missing and were fixed", nFixed)
}
@ -664,6 +734,11 @@ func (s *Sync) checkIntegrity() error {
if pubsOnWallet < pubsOnDB {
logUtils.SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
}
_, err = s.getUnsentSupports() //TODO: use the returned value when it works
if err != nil {
return err
}
return nil
}
@ -1083,6 +1158,51 @@ func (s *Sync) importPublicKey() error {
return nil
}
//TODO: fully implement this once I find a way to reliably get the abandoned supports amount
func (s *Sync) getUnsentSupports() (float64, error) {
defaultAccount, err := s.getDefaultAccount()
if err != nil {
return 0, errors.Err(err)
}
if s.transferState == 2 {
balance, err := s.daemon.AccountBalance(&defaultAccount)
if err != nil {
return 0, err
} else if balance == nil {
return 0, errors.Err("no response")
}
balanceAmount, err := strconv.ParseFloat(balance.Available.String(), 64)
if err != nil {
return 0, errors.Err(err)
}
transactionList, err := s.daemon.TransactionList(&defaultAccount)
if err != nil {
return 0, errors.Err(err)
}
sentSupports := 0.0
for _, t := range *transactionList {
if len(t.SupportInfo) == 0 {
continue
}
for _, support := range t.SupportInfo {
supportAmount, err := strconv.ParseFloat(support.BalanceDelta, 64)
if err != nil {
return 0, err
}
if supportAmount < 0 { // && support.IsTip TODO: re-enable this when transaction list shows correct information
sentSupports += -supportAmount
}
}
}
if balanceAmount > 10 && sentSupports < 1 {
logUtils.SendErrorToSlack("this channel has quite some LBCs in it (%.2f) and %.2f LBC in sent tips, it's likely that the tips weren't actually sent or the wallet has unnecessary extra credits in it", balanceAmount, sentSupports)
return balanceAmount - 10, nil
}
}
return 0, nil
}
// waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up
func waitForDaemonProcess(timeout time.Duration) error {
then := time.Now()