add dupes patcher and database patcher

add claim_abandon function
refactor logic
This commit is contained in:
Niko Storni 2018-09-19 19:05:47 -04:00
parent 10f9cc1ba0
commit 6eca7bfb76
2 changed files with 76 additions and 28 deletions

View file

@ -48,7 +48,7 @@ func (s *Sync) walletSetup() error {
s.syncedVideosMux.RLock() s.syncedVideosMux.RLock()
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones... numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
s.syncedVideosMux.RUnlock() s.syncedVideosMux.RUnlock()
log.Debugf("We already published %d videos", numPublished) log.Debugf("We already allocated credits for %d videos", numPublished)
if numOnSource-numPublished > s.Manager.VideosLimit { if numOnSource-numPublished > s.Manager.VideosLimit {
numOnSource = s.Manager.VideosLimit numOnSource = s.Manager.VideosLimit

102
ytsync.go
View file

@ -220,6 +220,18 @@ func (s *Sync) uploadWallet() error {
return os.Remove(defaultWalletDir) return os.Remove(defaultWalletDir)
} }
func (s *Sync) setStatusSyncing() error {
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
if err != nil {
return err
}
s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos
s.claimNames = claimNames
s.syncedVideosMux.Unlock()
return nil
}
func (s *Sync) FullCycle() (e error) { func (s *Sync) FullCycle() (e error) {
if os.Getenv("HOME") == "" { if os.Getenv("HOME") == "" {
return errors.Err("no $HOME env var found") return errors.Err("no $HOME env var found")
@ -239,16 +251,12 @@ func (s *Sync) FullCycle() (e error) {
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)") log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop() s.grp.Stop()
}() }()
syncedVideos, claimNames, err := s.Manager.setChannelStatus(s.YoutubeChannelID, StatusSyncing, "") err := s.setStatusSyncing()
if err != nil { if err != nil {
return err return err
} }
s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos
s.claimNames = claimNames
s.syncedVideosMux.Unlock()
defer s.updateChannelStatus(&e) defer s.setChannelTerminationStatus(&e)
err = s.downloadWallet() err = s.downloadWallet()
if err != nil && err.Error() != "wallet not on S3" { if err != nil && err.Error() != "wallet not on S3" {
@ -293,7 +301,7 @@ func (s *Sync) FullCycle() (e error) {
return nil return nil
} }
func (s *Sync) updateChannelStatus(e *error) { func (s *Sync) setChannelTerminationStatus(e *error) {
if *e != nil { if *e != nil {
//conditions for which a channel shouldn't be marked as failed //conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{ noFailConditions := []string{
@ -362,8 +370,10 @@ func logShutdownError(shutdownErr error) {
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR") SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
} }
func hasDupes(claims []jsonrpc.Claim) (bool, error) { // fixDupes abandons duplicate claims
videoIDs := make(map[string]string) func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
abandonedClaims := false
videoIDs := make(map[string]jsonrpc.Claim)
for _, c := range claims { for _, c := range claims {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil { if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
continue continue
@ -375,29 +385,53 @@ func hasDupes(claims []jsonrpc.Claim) (bool, error) {
videoID := tn[strings.LastIndex(tn, "/")+1:] videoID := tn[strings.LastIndex(tn, "/")+1:]
log.Infof("claimid: %s, claimName: %s, videoID: %s", c.ClaimID, c.Name, videoID) log.Infof("claimid: %s, claimName: %s, videoID: %s", c.ClaimID, c.Name, videoID)
cID, ok := videoIDs[videoID] cl, ok := videoIDs[videoID]
if !ok || cID == c.ClaimID { if !ok || cl.ClaimID == c.ClaimID {
videoIDs[videoID] = c.ClaimID videoIDs[videoID] = c
continue continue
} }
return true, nil // only keep the most recent one
claimToAbandon := c
videoIDs[videoID] = cl
if c.Height > cl.Height {
claimToAbandon = cl
videoIDs[videoID] = c
} }
return false, nil _, err := s.daemon.ClaimAbandon(claimToAbandon.Txid, claimToAbandon.Nout)
if err != nil {
return true, err
}
log.Debugf("abandoning %+v", claimToAbandon)
abandonedClaims = true
//return true, nil
}
return abandonedClaims, nil
} }
//publishesCount counts the amount of videos published so far //updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published
func publishesCount(claims []jsonrpc.Claim) (int, error) { func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err error) {
count := 0 count := 0
for _, c := range claims { for _, c := range claims {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil { if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
continue continue
} }
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil { if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
return count, errors.Err("something is wrong with the this claim: %s", c.ClaimID) return count, fixed, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
} }
count++ //check if claimID is in remote db
tn := *c.Value.Stream.Metadata.Thumbnail
videoID := tn[strings.LastIndex(tn, "/")+1:]
pv, ok := s.syncedVideos[videoID]
if !ok || pv.ClaimName != c.Name {
fixed++
err = s.Manager.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
if err != nil {
return total, fixed, err
} }
return count, nil }
total++
}
return total, fixed, nil
} }
func (s *Sync) doSync() error { func (s *Sync) doSync() error {
@ -406,29 +440,43 @@ func (s *Sync) doSync() error {
if err != nil { if err != nil {
return errors.Prefix("cannot list claims: ", err) return errors.Prefix("cannot list claims: ", err)
} }
hasDupes, err := hasDupes(*claims) hasDupes, err := s.fixDupes(*claims)
if err != nil { if err != nil {
return errors.Prefix("error checking for duplicates: ", err) return errors.Prefix("error checking for duplicates: ", err)
} }
if hasDupes { if hasDupes {
return errors.Err("channel has duplicates! Manual fix required") SendInfoToSlack("Channel had dupes and was fixed!")
claims, err = s.daemon.ClaimListMine()
if err != nil {
return errors.Prefix("cannot list claims: ", err)
} }
pubsOnWallet, err := publishesCount(*claims) }
pubsOnWallet, nFixed, err := s.updateRemoteDB(*claims)
if err != nil { if err != nil {
return errors.Prefix("error counting claims: ", err) return errors.Prefix("error counting claims: ", err)
} }
if nFixed > 0 {
err := s.setStatusSyncing()
if err != nil {
return err
}
SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed)
}
pubsOnDB := 0 pubsOnDB := 0
for _, sv := range s.syncedVideos { for _, sv := range s.syncedVideos {
if sv.Published { if sv.Published {
pubsOnDB++ pubsOnDB++
} }
} }
//if pubsOnWallet > pubsOnDB {
// SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) if pubsOnWallet > pubsOnDB { //This case should never happen
// return errors.Err("not all published videos are in the database") SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
// } return errors.Err("not all published videos are in the database")
}
if pubsOnWallet < pubsOnDB { if pubsOnWallet < pubsOnDB {
SendInfoToSlack("We're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID) SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
} }
err = s.walletSetup() err = s.walletSetup()
if err != nil { if err != nil {