Compare commits

...

41 commits

Author SHA1 Message Date
Niko Storni
78d5c8c6fa never retry those hardcoded errors 2022-08-16 03:06:39 +02:00
Niko Storni
caca92a6bc idk... let's work around this for now 2022-08-16 02:51:35 +02:00
Niko Storni
9ef1b7800b fix nil ptr? 2022-08-16 02:41:14 +02:00
Niko Storni
ea3315d1d6 this is actually necessary 2022-08-16 02:14:33 +02:00
Niko Storni
68132c65a9 fix channel update process 2022-08-15 22:42:06 +02:00
Niko Storni
57e017ec8f fix some logic
reduce verbosity of debug logs
2022-08-15 21:19:20 +02:00
Niko Storni
42db3782ec fix issue with yt not returning a date 2022-08-11 18:01:55 +02:00
Niko Storni
9d93799d86 account for nil struct 2022-08-11 05:28:06 +02:00
Niko
d93f463386
fix odd error with morty's date 2022-08-11 00:33:40 +02:00
Niko Storni
77988c1682 woops 2022-08-10 22:01:50 +02:00
Niko Storni
c79e07c9fa change recent livestreams logic slightly 2022-08-10 21:55:29 +02:00
Niko Storni
e454cdb4c9 fix post live detection
prevent unlisted videos from ever publishing (even if they were public before and we know about them)
fix timestamp on videos
update user agent
2022-08-10 21:26:36 +02:00
Niko Storni
98a10d1269 fix for channel creation bug 2022-08-10 18:23:37 +02:00
Niko Storni
4f6748ae83 fix bug in channel updates 2022-08-10 17:27:27 +02:00
Niko Storni
c1b2117df5 fix vuln 2022-08-09 22:27:00 +02:00
Niko Storni
c4207338c8 fix const bug
update dependencies
2022-08-09 22:17:03 +02:00
Niko Storni
5a01983203 add language to channels
improve logging (timestamps)
retry wallet uploads for 30 minutes
don't fail if the db isn't tracking all publishes
2022-08-09 22:11:42 +02:00
Niko Storni
ee8eb83d07 fix dependency vuln 2022-05-10 23:36:00 +02:00
Niko Storni
8d0f762067 change description length 2022-05-10 23:09:23 +02:00
Niko Storni
8fa1482d18 adjust limits 2022-05-09 20:23:44 +02:00
Niko Storni
00ae404642 exclude audio ac-3 codec not working in chrome 2022-05-09 20:05:05 +02:00
Niko Storni
230bfe4a41 fix go version in travis 2022-05-04 19:22:19 +02:00
Niko Storni
df33fb9263 fix api failures
update dependencies
fix e2e
2022-05-04 19:17:36 +02:00
Niko Storni
d72be1d920 add new status 2022-03-03 18:59:32 +01:00
Niko Storni
7d12a90139 avoid ec-3 audio codec 2022-02-09 17:06:51 +01:00
Niko
e1689a2a6c
Merge pull request #116 from e4drcf/synced-video-statuses
add shared video sync statuses array for validation purposes
2022-02-07 20:03:29 +01:00
Ivan
a8a6347d52 add shared video sync statuses array for validation purposes 2022-02-07 20:17:41 +02:00
Niko Storni
bdee1b4092 fix math to avoid negative balances 2022-01-26 07:43:09 +01:00
Niko Storni
0d0d39380c fix spend amounts to save credits 2022-01-26 07:11:26 +01:00
Niko Storni
7ff1a009da remove unused params 2022-01-14 18:49:02 +01:00
Niko Storni
e3a332c7e1 upgrade dependencies 2022-01-14 18:10:58 +01:00
Niko Storni
33ee6e4b94 Merge remote-tracking branch 'origin/metadata_fix' 2022-01-14 17:20:22 +01:00
Niko Storni
f6cde976a6 fix metadata? 2022-01-06 15:15:04 +01:00
Niko Storni
17944fa46a refactor get video time
remove broken time lookup
refactor quite some code
2021-12-30 13:17:11 -05:00
Niko Storni
3c18ae8de2 add checks for buggy livestreams 2021-12-29 17:47:46 -05:00
Niko Storni
84790720ff improve error handling
retry wallet uploads on failure
2021-12-02 16:59:14 +01:00
Niko
23690731af
fix bug when uploading wallet 2021-11-30 02:53:59 +01:00
Niko Storni
75628d8530 delete tars after use 2021-11-25 04:26:04 +01:00
Niko Storni
6e819b20f6 update readme 2021-11-24 18:58:38 +01:00
Niko Storni
da0b6e5b79 add example config 2021-11-24 18:39:24 +01:00
Niko Storni
28791f317b update gitignore
improve error logging to slack
fix regression in dev builds
2021-11-24 18:37:04 +01:00
23 changed files with 1061 additions and 575 deletions

4
.gitignore vendored
View file

@ -4,3 +4,7 @@ e2e/supporty/supporty
.env .env
blobsfiles blobsfiles
ytsync_docker ytsync_docker
e2e/config.json
e2e/cookies.txt

View file

@ -2,7 +2,7 @@ os: linux
dist: bionic dist: bionic
language: go language: go
go: go:
- 1.16.3 - 1.17.x
install: true install: true

View file

@ -8,23 +8,17 @@ With the support of said database, the tool is also able to keep all the channel
# Requirements # Requirements
- lbrynet SDK https://github.com/lbryio/lbry/releases (We strive to keep the latest release of ytsync compatible with the latest major release of the SDK) - lbrynet SDK https://github.com/lbryio/lbry-sdk/releases (We strive to keep the latest release of ytsync compatible with the latest major release of the SDK)
- a lbrycrd node running (localhost or on a remote machine) with credits in it - a lbrycrd node running (localhost or on a remote machine) with credits in it
- internal-apis (you cannot run this one yourself)
- python3-pip
- yt-dlp (`pip3 install -U yt-dlp`)
- ffmpeg (latest)
# Setup # Setup
- make sure daemon is stopped and can be controlled through `systemctl` (find example below) - make sure daemon is stopped and can be controlled through `systemctl` (find example below)
- extract the ytsync binary anywhere - extract the ytsync binary anywhere
- add the environment variables necessary to the tool - create and fill `config.json` using [this example](config.json.example)
- export SLACK_TOKEN="a-token-to-spam-your-slack"
- export SLACK_CHANNEL="youtube-status"
- export YOUTUBE_API_KEY="youtube-api-key"
- export LBRY_WEB_API="https://lbry-api-url-here"
- export LBRY_API_TOKEN="internal-apis-token-for-ytsync-user"
- export LBRYCRD_STRING="tcp://user:password@host:5429"
- export AWS_S3_ID="THE-ID-LIES-HERE"
- export AWS_S3_SECRET="THE-SECRET-LIES-HERE"
- export AWS_S3_REGION="us-east-1"
- export AWS_S3_BUCKET="ytsync-wallets"
## systemd script example ## systemd script example
`/etc/systemd/system/lbrynet.service` `/etc/systemd/system/lbrynet.service`
@ -55,23 +49,26 @@ Usage:
Flags: Flags:
--after int Specify from when to pull jobs [Unix time](Default: 0) --after int Specify from when to pull jobs [Unix time](Default: 0)
--before int Specify until when to pull jobs [Unix time](Default: current Unix time) (default current timestamp) --before int Specify until when to pull jobs [Unix time](Default: current Unix time) (default 1669311891)
--channelID string If specified, only this channel will be synced. --channelID string If specified, only this channel will be synced.
--concurrent-jobs int how many jobs to process concurrently (default 1) --concurrent-jobs int how many jobs to process concurrently (default 1)
-h, --help help for ytsync -h, --help help for ytsync
--limit int limit the amount of channels to sync --limit int limit the amount of channels to sync
--max-length float Maximum video length to process (in hours) (default 2) --max-length int Maximum video length to process (in hours) (default 2)
--max-size int Maximum video size to process (in MB) (default 2048) --max-size int Maximum video size to process (in MB) (default 2048)
--max-tries int Number of times to try a publish that fails (default 3) --max-tries int Number of times to try a publish that fails (default 3)
--no-transfers Skips the transferring process of videos, channels and supports
--quick Look up only the last 50 videos from youtube
--remove-db-unpublished Remove videos from the database that are marked as published but aren't really published --remove-db-unpublished Remove videos from the database that are marked as published but aren't really published
--run-once Whether the process should be stopped after one cycle or not --run-once Whether the process should be stopped after one cycle or not
--skip-space-check Do not perform free space check on startup --skip-space-check Do not perform free space check on startup
--status string Specify which queue to pull from. Overrides --update --status string Specify which queue to pull from. Overrides --update
--stop-on-error If a publish fails, stop all publishing and exit --status2 string Specify which secondary queue to pull from.
--takeover-existing-channel If channel exists and we don't own it, take over the channel --takeover-existing-channel If channel exists and we don't own it, take over the channel
--update Update previously synced channels instead of syncing new ones --update Update previously synced channels instead of syncing new ones
--upgrade-metadata Upgrade videos if they're on the old metadata version --upgrade-metadata Upgrade videos if they're on the old metadata version
--videos-limit int how many videos to process per channel (default 1000) --videos-limit int how many videos to process per channel (leave 0 for automatic detection)
``` ```
## Running from Source ## Running from Source
@ -92,13 +89,13 @@ We take security seriously. Please contact [security@lbry.io](mailto:security@lb
## Contact ## Contact
The primary contact for this project is [Niko Storni](https://github.com/nikooo777) (niko@lbry.io). The primary contact for this project is [Niko Storni](https://github.com/nikooo777) (niko@lbry.com).
## Additional Info and Links ## Additional Info and Links
- [https://lbry.io](https://lbry.io) - The live LBRY website - [https://lbry.com](https://lbry.com) - The live LBRY website
- [Discord Chat](https://chat.lbry.io) - A chat room for the LBRYians - [Discord Chat](https://chat.lbry.com) - A chat room for the LBRYians
- [Email us](mailto:hello@lbry.io) - LBRY Support email - [Email us](mailto:hello@lbry.com) - LBRY Support email
- [Twitter](https://twitter.com/@lbryio) - LBRY Twitter page - [Twitter](https://twitter.com/@lbryio) - LBRY Twitter page
- [Facebook](https://www.facebook.com/lbryio/) - LBRY Facebook page - [Facebook](https://www.facebook.com/lbryio/) - LBRY Facebook page
- [Reddit](https://reddit.com/r/lbry) - LBRY Reddit page - [Reddit](https://reddit.com/r/lbry) - LBRY Reddit page

35
config.json.example Normal file
View file

@ -0,0 +1,35 @@
{
"slack_token": "",
"slack_channel": "ytsync-dev",
"internal_apis_endpoint": "http://localhost:15400",
"internal_apis_auth_token": "ytsyntoken",
"lbrycrd_string": "tcp://lbry:lbry@localhost:15200",
"wallet_s3_config": {
"id": "",
"secret": "",
"region": "us-east-1",
"bucket": "ytsync-wallets",
"endpoint": ""
},
"blockchaindb_s3_config": {
"id": "",
"secret": "",
"region": "us-east-1",
"bucket": "blockchaindbs",
"endpoint": ""
},
"thumbnails_s3_config": {
"id": "",
"secret": "",
"region": "us-east-1",
"bucket": "thumbnails.lbry.com",
"endpoint": ""
},
"aws_thumbnails_s3_config": {
"id": "",
"secret": "",
"region": "us-east-1",
"bucket": "thumbnails.lbry.com",
"endpoint": ""
}
}

View file

@ -1,10 +1,14 @@
package configs package configs
import ( import (
"os"
"regexp"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
log "github.com/sirupsen/logrus"
"github.com/tkanos/gonfig" "github.com/tkanos/gonfig"
) )
@ -50,3 +54,22 @@ func (s *S3Configs) GetS3AWSConfig() *aws.Config {
S3ForcePathStyle: aws.Bool(true), S3ForcePathStyle: aws.Bool(true),
} }
} }
func (c *Configs) GetHostname() string {
var hostname string
var err error
hostname, err = os.Hostname()
if err != nil {
log.Error("could not detect system hostname")
hostname = "ytsync_unknown"
}
reg, err := regexp.Compile("[^a-zA-Z0-9_]+")
if err == nil {
hostname = reg.ReplaceAllString(hostname, "_")
}
if len(hostname) > 30 {
hostname = hostname[0:30]
}
return hostname
}

View file

@ -39,7 +39,6 @@ func GetPlaylistVideoIDs(channelName string, maxVideos int, stopChan stop.Chan,
if v == "" { if v == "" {
continue continue
} }
logrus.Debugf("%d - video id %s", i, v)
if i >= maxVideos { if i >= maxVideos {
break break
} }
@ -50,7 +49,7 @@ func GetPlaylistVideoIDs(channelName string, maxVideos int, stopChan stop.Chan,
const releaseTimeFormat = "2006-01-02, 15:04:05 (MST)" const releaseTimeFormat = "2006-01-02, 15:04:05 (MST)"
func GetVideoInformation(config *sdk.APIConfig, videoID string, stopChan stop.Chan, ip *net.TCPAddr, pool *ip_manager.IPPool) (*ytdl.YtdlVideo, error) { func GetVideoInformation(videoID string, stopChan stop.Chan, pool *ip_manager.IPPool) (*ytdl.YtdlVideo, error) {
args := []string{ args := []string{
"--skip-download", "--skip-download",
"--write-info-json", "--write-info-json",
@ -80,50 +79,6 @@ func GetVideoInformation(config *sdk.APIConfig, videoID string, stopChan stop.Ch
return nil, errors.Err(err) return nil, errors.Err(err)
} }
// now get an accurate time
const maxTries = 5
tries := 0
GetTime:
tries++
t, err := getUploadTime(config, videoID, ip, video.UploadDate)
if err != nil {
//slack(":warning: Upload time error: %v", err)
if tries <= maxTries && (errors.Is(err, errNotScraped) || errors.Is(err, errUploadTimeEmpty) || errors.Is(err, errStatusParse) || errors.Is(err, errConnectionIssue)) {
err := triggerScrape(videoID, ip)
if err == nil {
time.Sleep(2 * time.Second) // let them scrape it
goto GetTime
} else {
//slack("triggering scrape returned error: %v", err)
}
} else if !errors.Is(err, errNotScraped) && !errors.Is(err, errUploadTimeEmpty) {
//slack(":warning: Error while trying to get accurate upload time for %s: %v", videoID, err)
if t == "" {
return nil, errors.Err(err)
} else {
t = "" //TODO: get rid of the other piece below?
}
}
// do fallback below
}
//slack("After all that, upload time for %s is %s", videoID, t)
if t != "" {
parsed, err := time.Parse("2006-01-02, 15:04:05 (MST)", t) // this will probably be UTC, but Go's timezone parsing is fucked up. it ignores the timezone in the date
if err != nil {
return nil, errors.Err(err)
}
//slack(":exclamation: Got an accurate time for %s", videoID)
video.UploadDateForReal = parsed
} else { //TODO: this is the piece that isn't needed!
slack(":warning: Could not get accurate time for %s. Falling back to time from upload ytdl: %s.", videoID, video.UploadDate)
// fall back to UploadDate from youtube-dl
video.UploadDateForReal, err = time.Parse("20060102", video.UploadDate)
if err != nil {
return nil, err
}
}
return video, nil return video, nil
} }
@ -213,46 +168,8 @@ func getUploadTime(config *sdk.APIConfig, videoID string, ip *net.TCPAddr, uploa
} }
} }
if time.Now().AddDate(0, 0, -3).After(ytdlUploadDate) {
return ytdlUploadDate.Format(releaseTimeFormat), nil return ytdlUploadDate.Format(releaseTimeFormat), nil
} }
client := getClient(ip)
req, err := http.NewRequest(http.MethodGet, "https://caa.iti.gr/get_verificationV3?url=https://www.youtube.com/watch?v="+videoID, nil)
if err != nil {
return ytdlUploadDate.Format(releaseTimeFormat), errors.Err(err)
}
req.Header.Set("User-Agent", ChromeUA)
res, err := client.Do(req)
if err != nil {
return ytdlUploadDate.Format(releaseTimeFormat), errors.Err(err)
}
defer res.Body.Close()
var uploadTime struct {
Time string `json:"video_upload_time"`
Message string `json:"message"`
Status string `json:"status"`
}
err = json.NewDecoder(res.Body).Decode(&uploadTime)
if err != nil {
return ytdlUploadDate.Format(releaseTimeFormat), errors.Err(err)
}
if uploadTime.Status == "ERROR1" {
return ytdlUploadDate.Format(releaseTimeFormat), errNotScraped
}
if uploadTime.Status == "" && strings.HasPrefix(uploadTime.Message, "CANNOT_RETRIEVE_REPORT_FOR_VIDEO_") {
return ytdlUploadDate.Format(releaseTimeFormat), errors.Err("cannot retrieve report for video")
}
if uploadTime.Time == "" {
return ytdlUploadDate.Format(releaseTimeFormat), errUploadTimeEmpty
}
return uploadTime.Time, nil
}
func getClient(ip *net.TCPAddr) *http.Client { func getClient(ip *net.TCPAddr) *http.Client {
if ip == nil { if ip == nil {
@ -277,7 +194,7 @@ func getClient(ip *net.TCPAddr) *http.Client {
const ( const (
GoogleBotUA = "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)" GoogleBotUA = "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
ChromeUA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36" ChromeUA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36"
maxAttempts = 3 maxAttempts = 3
extractionError = "YouTube said: Unable to extract video data" extractionError = "YouTube said: Unable to extract video data"
throttledError = "HTTP Error 429" throttledError = "HTTP Error 429"
@ -368,7 +285,8 @@ func runCmd(cmd *exec.Cmd, stopChan stop.Chan) ([]string, error) {
return nil, errors.Err("interrupted by user") return nil, errors.Err("interrupted by user")
case err := <-done: case err := <-done:
if err != nil { if err != nil {
return nil, errors.Prefix("yt-dlp "+strings.Join(cmd.Args, " ")+" ["+string(errorLog)+"]", err) //return nil, errors.Prefix("yt-dlp "+strings.Join(cmd.Args, " ")+" ["+string(errorLog)+"]", err)
return nil, errors.Prefix(string(errorLog), err)
} }
return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil
} }

View file

@ -3,7 +3,11 @@ package downloader
import ( import (
"testing" "testing"
"github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/sdk" "github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -19,19 +23,18 @@ func TestGetPlaylistVideoIDs(t *testing.T) {
} }
func TestGetVideoInformation(t *testing.T) { func TestGetVideoInformation(t *testing.T) {
video, err := GetVideoInformation(nil, "zj7pXM9gE5M", nil, nil, nil) s := stop.New()
if err != nil { ip, err := ip_manager.GetIPPool(s)
logrus.Error(err) assert.NoError(t, err)
} video, err := GetVideoInformation("kDGOHNpRjzc", s.Ch(), ip)
if video != nil { assert.NoError(t, err)
assert.NotNil(t, video)
logrus.Info(video.ID) logrus.Info(video.ID)
} }
}
func Test_getUploadTime(t *testing.T) { func Test_getUploadTime(t *testing.T) {
configs := sdk.APIConfig{} configs := sdk.APIConfig{}
got, err := getUploadTime(&configs, "kDGOHNpRjzc", nil, "20060102") got, err := getUploadTime(&configs, "kDGOHNpRjzc", nil, "20060102")
assert.NoError(t, err) assert.NoError(t, err)
t.Log(got) t.Log(got)
} }

View file

@ -2,149 +2,136 @@ package ytdl
import ( import (
"time" "time"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/sirupsen/logrus"
) )
type YtdlVideo struct { type YtdlVideo struct {
UploadDate string `json:"upload_date"`
UploadDateForReal time.Time // you need to manually set this since the value in the API doesn't include the time
Extractor string `json:"extractor"`
Series interface{} `json:"series"`
Format string `json:"format"`
Vbr interface{} `json:"vbr"`
Chapters interface{} `json:"chapters"`
Height int `json:"height"`
LikeCount interface{} `json:"like_count"`
Duration int `json:"duration"`
Fulltitle string `json:"fulltitle"`
PlaylistIndex interface{} `json:"playlist_index"`
Album interface{} `json:"album"`
ViewCount int `json:"view_count"`
Playlist interface{} `json:"playlist"`
Title string `json:"title"`
Filename string `json:"_filename"`
Creator interface{} `json:"creator"`
Ext string `json:"ext"`
ID string `json:"id"` ID string `json:"id"`
DislikeCount interface{} `json:"dislike_count"` Title string `json:"title"`
AverageRating float64 `json:"average_rating"`
Abr float64 `json:"abr"`
UploaderURL string `json:"uploader_url"`
Categories []string `json:"categories"`
Fps float64 `json:"fps"`
StretchedRatio interface{} `json:"stretched_ratio"`
SeasonNumber interface{} `json:"season_number"`
Annotations interface{} `json:"annotations"`
WebpageURLBasename string `json:"webpage_url_basename"`
Acodec string `json:"acodec"`
DisplayID string `json:"display_id"`
//RequestedFormats []RequestedFormat `json:"requested_formats"`
//AutomaticCaptions struct{} `json:"automatic_captions"`
Description string `json:"description"`
Tags []string `json:"tags"`
Track interface{} `json:"track"`
RequestedSubtitles interface{} `json:"requested_subtitles"`
StartTime interface{} `json:"start_time"`
Uploader string `json:"uploader"`
ExtractorKey string `json:"extractor_key"`
FormatID string `json:"format_id"`
EpisodeNumber interface{} `json:"episode_number"`
UploaderID string `json:"uploader_id"`
//Subtitles struct{} `json:"subtitles"`
ReleaseYear interface{} `json:"release_year"`
Thumbnails []Thumbnail `json:"thumbnails"` Thumbnails []Thumbnail `json:"thumbnails"`
License interface{} `json:"license"` Description string `json:"description"`
Artist interface{} `json:"artist"`
AgeLimit int `json:"age_limit"`
ReleaseDate interface{} `json:"release_date"`
AltTitle interface{} `json:"alt_title"`
Thumbnail string `json:"thumbnail"`
ChannelID string `json:"channel_id"` ChannelID string `json:"channel_id"`
IsLive interface{} `json:"is_live"` Duration int `json:"duration"`
Width int `json:"width"` Categories []string `json:"categories"`
EndTime interface{} `json:"end_time"` Tags []string `json:"tags"`
WebpageURL string `json:"webpage_url"` IsLive bool `json:"is_live"`
Formats []Format `json:"formats"` LiveStatus string `json:"live_status"`
ChannelURL string `json:"channel_url"` ReleaseTimestamp *int64 `json:"release_timestamp"`
Resolution interface{} `json:"resolution"` uploadDateForReal *time.Time
Vcodec string `json:"vcodec"` Availability string `json:"availability"`
} ReleaseDate string `json:"release_date"`
UploadDate string `json:"upload_date"`
type RequestedFormat struct { //WasLive bool `json:"was_live"`
Asr interface{} `json:"asr"` //Formats interface{} `json:"formats"`
Tbr float64 `json:"tbr"` //Thumbnail string `json:"thumbnail"`
Container string `json:"container"` //Uploader string `json:"uploader"`
Language interface{} `json:"language"` //UploaderID string `json:"uploader_id"`
Format string `json:"format"` //UploaderURL string `json:"uploader_url"`
URL string `json:"url"` //ChannelURL string `json:"channel_url"`
Vcodec string `json:"vcodec"` //ViewCount int `json:"view_count"`
FormatNote string `json:"format_note"` //AverageRating interface{} `json:"average_rating"`
Height int `json:"height"` //AgeLimit int `json:"age_limit"`
Width int `json:"width"` //WebpageURL string `json:"webpage_url"`
Ext string `json:"ext"` //PlayableInEmbed bool `json:"playable_in_embed"`
FragmentBaseURL string `json:"fragment_base_url"` //AutomaticCaptions interface{} `json:"automatic_captions"`
Filesize interface{} `json:"filesize"` //Subtitles interface{} `json:"subtitles"`
Fps float64 `json:"fps"` //Chapters interface{} `json:"chapters"`
ManifestURL string `json:"manifest_url"` //LikeCount int `json:"like_count"`
Protocol string `json:"protocol"` //Channel string `json:"channel"`
FormatID string `json:"format_id"` //ChannelFollowerCount int `json:"channel_follower_count"`
HTTPHeaders struct { //OriginalURL string `json:"original_url"`
AcceptCharset string `json:"Accept-Charset"` //WebpageURLBasename string `json:"webpage_url_basename"`
AcceptLanguage string `json:"Accept-Language"` //WebpageURLDomain string `json:"webpage_url_domain"`
AcceptEncoding string `json:"Accept-Encoding"` //Extractor string `json:"extractor"`
Accept string `json:"Accept"` //ExtractorKey string `json:"extractor_key"`
UserAgent string `json:"User-Agent"` //Playlist interface{} `json:"playlist"`
} `json:"http_headers"` //PlaylistIndex interface{} `json:"playlist_index"`
Fragments []struct { //DisplayID string `json:"display_id"`
Path string `json:"path"` //Fulltitle string `json:"fulltitle"`
Duration float64 `json:"duration,omitempty"` //DurationString string `json:"duration_string"`
} `json:"fragments"` //RequestedSubtitles interface{} `json:"requested_subtitles"`
Acodec string `json:"acodec"` //HasDrm bool `json:"__has_drm"`
Abr int `json:"abr,omitempty"` //RequestedFormats interface{} `json:"requested_formats"`
} //Format string `json:"format"`
//FormatID string `json:"format_id"`
type Format struct { //Ext string `json:"ext"`
Asr int `json:"asr"` //Protocol string `json:"protocol"`
Filesize int `json:"filesize"` //Language interface{} `json:"language"`
FormatID string `json:"format_id"` //FormatNote string `json:"format_note"`
FormatNote string `json:"format_note"` //FilesizeApprox int `json:"filesize_approx"`
Fps interface{} `json:"fps"` //Tbr float64 `json:"tbr"`
Height interface{} `json:"height"` //Width int `json:"width"`
Quality int `json:"quality"` //Height int `json:"height"`
Tbr float64 `json:"tbr"` //Resolution string `json:"resolution"`
URL string `json:"url"` //Fps int `json:"fps"`
Width interface{} `json:"width"` //DynamicRange string `json:"dynamic_range"`
Ext string `json:"ext"` //Vcodec string `json:"vcodec"`
Vcodec string `json:"vcodec"` //Vbr float64 `json:"vbr"`
Acodec string `json:"acodec"` //StretchedRatio interface{} `json:"stretched_ratio"`
Abr float64 `json:"abr,omitempty"` //Acodec string `json:"acodec"`
DownloaderOptions struct { //Abr float64 `json:"abr"`
HTTPChunkSize int `json:"http_chunk_size"` //Asr int `json:"asr"`
} `json:"downloader_options,omitempty"` //Epoch int `json:"epoch"`
Container string `json:"container,omitempty"` //Filename string `json:"filename"`
Format string `json:"format"` //Urls string `json:"urls"`
Protocol string `json:"protocol"` //Type string `json:"_type"`
HTTPHeaders struct {
UserAgent string `json:"User-Agent"`
AcceptCharset string `json:"Accept-Charset"`
Accept string `json:"Accept"`
AcceptEncoding string `json:"Accept-Encoding"`
AcceptLanguage string `json:"Accept-Language"`
} `json:"http_headers"`
Vbr float64 `json:"vbr,omitempty"`
} }
type Thumbnail struct { type Thumbnail struct {
URL string `json:"url"` URL string `json:"url"`
Width int `json:"width"` Preference int `json:"preference"`
Resolution string `json:"resolution"`
ID string `json:"id"` ID string `json:"id"`
Height int `json:"height"` Height int `json:"height,omitempty"`
Width int `json:"width,omitempty"`
Resolution string `json:"resolution,omitempty"`
} }
type HTTPHeaders struct { func (v *YtdlVideo) GetUploadTime() time.Time {
AcceptCharset string `json:"Accept-Charset"` //priority list:
AcceptLanguage string `json:"Accept-Language"` // release timestamp from yt
AcceptEncoding string `json:"Accept-Encoding"` // release timestamp from morty
Accept string `json:"Accept"` // release date from yt
UserAgent string `json:"User-Agent"` // upload date from yt
if v.uploadDateForReal != nil {
return *v.uploadDateForReal
}
var ytdlReleaseTimestamp time.Time
if v.ReleaseTimestamp != nil && *v.ReleaseTimestamp > 0 {
ytdlReleaseTimestamp = time.Unix(*v.ReleaseTimestamp, 0).UTC()
}
//get morty timestamp
var mortyReleaseTimestamp time.Time
mortyRelease, err := sdk.GetAPIsConfigs().GetReleasedDate(v.ID)
if err != nil {
logrus.Error(err)
} else if mortyRelease != nil {
mortyReleaseTimestamp, err = time.ParseInLocation(time.RFC3339, mortyRelease.ReleaseTime, time.UTC)
if err != nil {
logrus.Error(err)
}
}
ytdlReleaseDate, err := time.Parse("20060102", v.ReleaseDate)
if err != nil {
logrus.Error(err)
}
ytdlUploadDate, err := time.Parse("20060102", v.UploadDate)
if err != nil {
logrus.Error(err)
}
if !ytdlReleaseTimestamp.IsZero() {
v.uploadDateForReal = &ytdlReleaseTimestamp
} else if !mortyReleaseTimestamp.IsZero() {
v.uploadDateForReal = &mortyReleaseTimestamp
} else if !ytdlReleaseDate.IsZero() {
v.uploadDateForReal = &ytdlReleaseDate
} else {
v.uploadDateForReal = &ytdlUploadDate
}
return *v.uploadDateForReal
} }

View file

@ -21,7 +21,7 @@ services:
## Wallet Server ## ## Wallet Server ##
################### ###################
walletserver: walletserver:
image: lbry/wallet-server:latest-release image: lbry/wallet-server:v0.101.1
restart: always restart: always
environment: environment:
- DB_DIRECTORY=/database - DB_DIRECTORY=/database
@ -81,6 +81,7 @@ services:
- walletserver - walletserver
environment: environment:
- LBRY_STREAMING_SERVER=0.0.0.0:5280 - LBRY_STREAMING_SERVER=0.0.0.0:5280
- LBRY_FEE_PER_NAME_CHAR=0
volumes: volumes:
- "./persist/.lbrynet:/home/lbrynet" - "./persist/.lbrynet:/home/lbrynet"
- ".:/etc/lbry" #Put your daemon_settings.yml here - ".:/etc/lbry" #Put your daemon_settings.yml here
@ -109,7 +110,7 @@ services:
## Internal APIs ## ## Internal APIs ##
################### ###################
internalapis: internalapis:
image: lbry/internal-apis:master image: odyseeteam/internal-apis:master
restart: "no" restart: "no"
ports: ports:
- "15400:8080" - "15400:8080"
@ -127,7 +128,7 @@ services:
## Chainquery ## ## Chainquery ##
################ ################
chainquery: chainquery:
image: lbry/chainquery:master image: odyseeteam/chainquery:master
restart: "no" restart: "no"
ports: ports:
- 6300:6300 - 6300:6300

166
go.mod
View file

@ -1,3 +1,5 @@
go 1.17
module github.com/lbryio/ytsync/v5 module github.com/lbryio/ytsync/v5
replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203050410-e1076f12bf19 replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203050410-e1076f12bf19
@ -6,38 +8,144 @@ replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203
//replace github.com/lbryio/reflector.go => /home/niko/go/src/github.com/lbryio/reflector.go/ //replace github.com/lbryio/reflector.go => /home/niko/go/src/github.com/lbryio/reflector.go/
require ( require (
github.com/Microsoft/go-winio v0.4.14 // indirect
github.com/abadojack/whatlanggo v1.0.1 github.com/abadojack/whatlanggo v1.0.1
github.com/asaskevich/govalidator v0.0.0-20200819183940-29e1ff8eb0bb github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/aws/aws-sdk-go v1.25.9 github.com/aws/aws-sdk-go v1.44.6
github.com/davecgh/go-spew v1.1.1 github.com/davecgh/go-spew v1.1.1
github.com/docker/distribution v2.7.1+incompatible // indirect github.com/docker/docker v20.10.17+incompatible
github.com/docker/docker v1.13.1 github.com/lbryio/lbry.go/v2 v2.7.2-0.20220815204100-2adb8af5b68c
github.com/docker/go-connections v0.4.0 // indirect github.com/lbryio/reflector.go v1.1.3-0.20220730181028-f5d30b1a6e79
github.com/docker/go-units v0.4.0 // indirect github.com/mitchellh/go-ps v1.0.0
github.com/hashicorp/go-immutable-radix v1.1.0 // indirect github.com/prometheus/client_golang v1.12.1
github.com/hashicorp/go-sockaddr v1.0.2 // indirect github.com/shopspring/decimal v1.3.1
github.com/hashicorp/memberlist v0.1.5 // indirect github.com/sirupsen/logrus v1.9.0
github.com/hashicorp/serf v0.8.5 // indirect github.com/spf13/cobra v1.4.0
github.com/kr/pretty v0.2.1 // indirect github.com/stretchr/testify v1.7.1
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210824154606-3e18b74da08b
github.com/lbryio/reflector.go v1.1.3-0.20210412225256-4392c9724262
github.com/miekg/dns v1.1.22 // indirect
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 // indirect
github.com/prometheus/client_golang v0.9.3
github.com/shopspring/decimal v0.0.0-20191009025716-f1972eb1d1f5
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.7.0
github.com/tkanos/gonfig v0.0.0-20210106201359-53e13348de2f github.com/tkanos/gonfig v0.0.0-20210106201359-53e13348de2f
github.com/vbauerster/mpb/v7 v7.0.2 github.com/vbauerster/mpb/v7 v7.4.1
google.golang.org/appengine v1.6.5 // indirect gopkg.in/vansante/go-ffprobe.v2 v2.0.3
gopkg.in/ini.v1 v1.60.2 // indirect
gopkg.in/vansante/go-ffprobe.v2 v2.0.2
gotest.tools v2.2.0+incompatible gotest.tools v2.2.0+incompatible
) )
go 1.13 require (
github.com/Microsoft/go-winio v0.5.1 // indirect
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bluele/gcache v0.0.2 // indirect
github.com/brk0v/directio v0.0.0-20190225130936-69406e757cf7 // indirect
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3 // indirect
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect
github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cheekybits/genny v1.0.0 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/ekyoung/gin-nice-recovery v0.0.0-20160510022553-1654dca486db // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/gin-gonic/gin v1.7.7 // indirect
github.com/go-errors/errors v1.1.1 // indirect
github.com/go-ini/ini v1.48.0 // indirect
github.com/go-playground/locales v0.13.0 // indirect
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-playground/validator/v10 v10.4.1 // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/gofrs/uuid v3.2.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/rpc v1.2.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-immutable-radix v1.1.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/memberlist v0.3.0 // indirect
github.com/hashicorp/serf v0.9.7 // indirect
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/johntdyer/slack-go v0.0.0-20180213144715-95fac1160b22 // indirect
github.com/johntdyer/slackrus v0.0.0-20211215141436-33e4a270affb // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/karrick/godirwalk v1.17.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lbryio/chainquery v1.9.0 // indirect
github.com/lbryio/lbry.go v1.1.2 // indirect
github.com/lbryio/types v0.0.0-20220224142228-73610f6654a6 // indirect
github.com/leodido/go-urn v1.2.0 // indirect
github.com/lucas-clemente/quic-go v0.28.1 // indirect
github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5 // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/marten-seemann/qpack v0.2.1 // indirect
github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect
github.com/marten-seemann/qtls-go1-17 v0.1.2 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect
github.com/marten-seemann/qtls-go1-19 v0.1.0-beta.1 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.41 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
github.com/onsi/gomega v1.17.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/slack-go/slack v0.10.3 // indirect
github.com/spf13/afero v1.4.1 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.7.1 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/ugorji/go/codec v1.1.7 // indirect
github.com/volatiletech/inflect v0.0.0-20170731032912-e7201282ae8d // indirect
github.com/volatiletech/null v8.0.0+incompatible // indirect
github.com/volatiletech/sqlboiler v3.4.0+incompatible // indirect
github.com/ybbus/jsonrpc v2.1.2+incompatible // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
golang.org/x/tools v0.1.5 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/ini.v1 v1.60.2 // indirect
gopkg.in/nullbio/null.v6 v6.0.0-20161116030900-40264a2e6b79 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
gotest.tools/v3 v3.2.0 // indirect
)

602
go.sum

File diff suppressed because it is too large Load diff

26
main.go
View file

@ -9,7 +9,6 @@ import (
"github.com/lbryio/ytsync/v5/configs" "github.com/lbryio/ytsync/v5/configs"
"github.com/lbryio/ytsync/v5/manager" "github.com/lbryio/ytsync/v5/manager"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared" "github.com/lbryio/ytsync/v5/shared"
ytUtils "github.com/lbryio/ytsync/v5/util" ytUtils "github.com/lbryio/ytsync/v5/util"
@ -33,6 +32,10 @@ var (
func main() { func main() {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
log.SetLevel(log.DebugLevel) log.SetLevel(log.DebugLevel)
customFormatter := new(log.TextFormatter)
customFormatter.TimestampFormat = "2006-01-02 15:04:05"
customFormatter.FullTimestamp = true
log.SetFormatter(customFormatter)
http.Handle("/metrics", promhttp.Handler()) http.Handle("/metrics", promhttp.Handler())
go func() { go func() {
log.Error(http.ListenAndServe(":2112", nil)) log.Error(http.ListenAndServe(":2112", nil))
@ -75,22 +78,11 @@ func ytSync(cmd *cobra.Command, args []string) {
if err != nil { if err != nil {
log.Fatalf("could not parse configuration file: %s", errors.FullTrace(err)) log.Fatalf("could not parse configuration file: %s", errors.FullTrace(err))
} }
var hostname string
if configs.Configuration.SlackToken == "" { if configs.Configuration.SlackToken == "" {
log.Error("A slack token was not present in the config! Slack messages disabled!") log.Error("A slack token was not present in the config! Slack messages disabled!")
} else { } else {
var err error util.InitSlack(configs.Configuration.SlackToken, configs.Configuration.SlackChannel, configs.Configuration.GetHostname())
hostname, err = os.Hostname()
if err != nil {
log.Error("could not detect system hostname")
hostname = "ytsync-unknown"
}
if len(hostname) > 30 {
hostname = hostname[0:30]
}
util.InitSlack(configs.Configuration.SlackToken, configs.Configuration.SlackChannel, hostname)
} }
if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) { if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) {
@ -131,17 +123,9 @@ func ytSync(cmd *cobra.Command, args []string) {
blobsDir := ytUtils.GetBlobsDir() blobsDir := ytUtils.GetBlobsDir()
apiConfig := &sdk.APIConfig{
ApiURL: configs.Configuration.InternalApisEndpoint,
ApiToken: configs.Configuration.InternalApisAuthToken,
HostName: hostname,
}
sm := manager.NewSyncManager( sm := manager.NewSyncManager(
cliFlags, cliFlags,
blobsDir, blobsDir,
configs.Configuration.LbrycrdString,
apiConfig,
) )
err = sm.Start() err = sm.Start()
if err != nil { if err != nil {

View file

@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/lbryio/ytsync/v5/blobs_reflector" "github.com/lbryio/ytsync/v5/blobs_reflector"
"github.com/lbryio/ytsync/v5/configs"
"github.com/lbryio/ytsync/v5/ip_manager" "github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/namer" "github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/sdk" "github.com/lbryio/ytsync/v5/sdk"
@ -29,12 +30,12 @@ type SyncManager struct {
channelsToSync []Sync channelsToSync []Sync
} }
func NewSyncManager(cliFlags shared.SyncFlags, blobsDir, lbrycrdDsn string, apiConfig *sdk.APIConfig) *SyncManager { func NewSyncManager(cliFlags shared.SyncFlags, blobsDir string) *SyncManager {
return &SyncManager{ return &SyncManager{
CliFlags: cliFlags, CliFlags: cliFlags,
blobsDir: blobsDir, blobsDir: blobsDir,
LbrycrdDsn: lbrycrdDsn, LbrycrdDsn: configs.Configuration.LbrycrdString,
ApiConfig: apiConfig, ApiConfig: sdk.GetAPIsConfigs(),
} }
} }
func (s *SyncManager) enqueueChannel(channel *shared.YoutubeChannel) { func (s *SyncManager) enqueueChannel(channel *shared.YoutubeChannel) {
@ -137,7 +138,7 @@ func (s *SyncManager) Start() error {
"WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR", "WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR",
"NotEnoughFunds", "NotEnoughFunds",
"no space left on device", "no space left on device",
"failure uploading wallet", "there was a problem uploading the wallet",
"the channel in the wallet is different than the channel in the database", "the channel in the wallet is different than the channel in the database",
"this channel does not belong to this wallet!", "this channel does not belong to this wallet!",
"You already have a stream claim published under the name", "You already have a stream claim published under the name",

View file

@ -4,6 +4,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
"github.com/lbryio/ytsync/v5/configs" "github.com/lbryio/ytsync/v5/configs"
"github.com/lbryio/ytsync/v5/util" "github.com/lbryio/ytsync/v5/util"
@ -84,9 +85,9 @@ func (s *Sync) downloadWallet() error {
} }
func (s *Sync) downloadBlockchainDB() error { func (s *Sync) downloadBlockchainDB() error {
//if util.IsRegTest() { if util.IsRegTest() {
// return nil // tests fail if we re-use the same blockchain DB return nil // tests fail if we re-use the same blockchain DB
//} }
defaultBDBPath, defaultTempBDBPath, key, err := s.getBlockchainDBPaths() defaultBDBPath, defaultTempBDBPath, key, err := s.getBlockchainDBPaths()
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
@ -143,7 +144,10 @@ func (s *Sync) downloadBlockchainDB() error {
if err != nil { if err != nil {
return errors.Prefix("error extracting blockchain.db files", err) return errors.Prefix("error extracting blockchain.db files", err)
} }
err = os.Remove(defaultTempBDBPath)
if err != nil {
return errors.Err(err)
}
log.Printf("blockchain.db data downloaded and extracted to %s", blockchainDbDir) log.Printf("blockchain.db data downloaded and extracted to %s", blockchainDbDir)
return nil return nil
} }
@ -212,13 +216,22 @@ func (s *Sync) uploadWallet() error {
} }
defer file.Close() defer file.Close()
start := time.Now()
for time.Since(start) < 30*time.Minute {
_, err = uploader.Upload(&s3manager.UploadInput{ _, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(configs.Configuration.WalletS3Config.Bucket), Bucket: aws.String(configs.Configuration.WalletS3Config.Bucket),
Key: key, Key: key,
Body: file, Body: file,
}) })
if err != nil { if err != nil {
return err time.Sleep(30 * time.Second)
continue
}
break
}
if err != nil {
return errors.Prefix("there was a problem uploading the wallet to S3", errors.Err(err))
} }
log.Println("wallet uploaded to S3") log.Println("wallet uploaded to S3")
@ -264,5 +277,9 @@ func (s *Sync) uploadBlockchainDB() error {
return err return err
} }
log.Println("blockchain.db files uploaded to S3") log.Println("blockchain.db files uploaded to S3")
err = os.Remove(tarPath)
if err != nil {
return errors.Err(err)
}
return os.Remove(defaultBDBDir) return os.Remove(defaultBDBDir)
} }

View file

@ -103,6 +103,9 @@ func (s *Sync) walletSetup() error {
videosOnYoutube = s.Manager.CliFlags.VideosToSync(s.DbChannelData.TotalSubscribers) videosOnYoutube = s.Manager.CliFlags.VideosToSync(s.DbChannelData.TotalSubscribers)
} }
unallocatedVideos := videosOnYoutube - (publishedCount + failedCount) unallocatedVideos := videosOnYoutube - (publishedCount + failedCount)
if unallocatedVideos < 0 {
unallocatedVideos = 0
}
channelFee := channelClaimAmount channelFee := channelClaimAmount
channelAlreadyClaimed := s.DbChannelData.ChannelClaimID != "" channelAlreadyClaimed := s.DbChannelData.ChannelClaimID != ""
if channelAlreadyClaimed { if channelAlreadyClaimed {
@ -110,7 +113,7 @@ func (s *Sync) walletSetup() error {
} }
requiredBalance := float64(unallocatedVideos)*(publishAmount+estimatedMaxTxFee) + channelFee requiredBalance := float64(unallocatedVideos)*(publishAmount+estimatedMaxTxFee) + channelFee
if s.Manager.CliFlags.UpgradeMetadata { if s.Manager.CliFlags.UpgradeMetadata {
requiredBalance += float64(notUpgradedCount) * 0.001 requiredBalance += float64(notUpgradedCount) * estimatedMaxTxFee
} }
refillAmount := 0.0 refillAmount := 0.0
@ -127,6 +130,12 @@ func (s *Sync) walletSetup() error {
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }
} else if balance > requiredBalance {
extraLBC := balance - requiredBalance
if extraLBC > 5 {
sendBackAmount := extraLBC - 1
logUtils.SendInfoToSlack("channel %s has %.1f credits which is %.1f more than it requires (%.1f). We should send at least %.1f that back.", s.DbChannelData.ChannelId, balance, extraLBC, requiredBalance, sendBackAmount)
}
} }
claimAddress, err := s.daemon.AddressList(nil, nil, 1, 20) claimAddress, err := s.daemon.AddressList(nil, nil, 1, 20)
@ -366,14 +375,12 @@ func (s *Sync) ensureChannelOwnership() error {
channelUsesOldMetadata := false channelUsesOldMetadata := false
if channelToUse != nil { if channelToUse != nil {
channelUsesOldMetadata = channelToUse.Value.GetThumbnail() == nil channelUsesOldMetadata = channelToUse.Value.GetThumbnail() == nil || (len(channelToUse.Value.GetLanguages()) == 0 && s.DbChannelData.Language != "")
if !channelUsesOldMetadata { if !channelUsesOldMetadata {
return nil return nil
} }
} }
channelBidAmount := channelClaimAmount
balanceResp, err := s.daemon.AccountBalance(nil) balanceResp, err := s.daemon.AccountBalance(nil)
if err != nil { if err != nil {
return err return err
@ -385,8 +392,8 @@ func (s *Sync) ensureChannelOwnership() error {
return errors.Err(err) return errors.Err(err)
} }
if balance.LessThan(decimal.NewFromFloat(channelBidAmount)) { if balance.LessThan(decimal.NewFromFloat(channelClaimAmount)) {
err = s.addCredits(channelBidAmount + 0.3) err = s.addCredits(channelClaimAmount + estimatedMaxTxFee*3)
if err != nil { if err != nil {
return err return err
} }
@ -424,18 +431,16 @@ func (s *Sync) ensureChannelOwnership() error {
} }
var languages []string = nil var languages []string = nil
//we don't have this data without the API if s.DbChannelData.Language != "" {
//if channelInfo.DefaultLanguage != "" { languages = []string{s.DbChannelData.Language}
// if channelInfo.DefaultLanguage == "iw" { }
// channelInfo.DefaultLanguage = "he"
// }
// languages = []string{channelInfo.DefaultLanguage}
//}
var locations []jsonrpc.Location = nil var locations []jsonrpc.Location = nil
if channelInfo.Topbar.DesktopTopbarRenderer.CountryCode != "" { if channelInfo.Topbar.DesktopTopbarRenderer.CountryCode != "" {
locations = []jsonrpc.Location{{Country: &channelInfo.Topbar.DesktopTopbarRenderer.CountryCode}} locations = []jsonrpc.Location{{Country: &channelInfo.Topbar.DesktopTopbarRenderer.CountryCode}}
} }
var c *jsonrpc.TransactionSummary var c *jsonrpc.TransactionSummary
var recoveredChannelClaimID string
claimCreateOptions := jsonrpc.ClaimCreateOptions{ claimCreateOptions := jsonrpc.ClaimCreateOptions{
Title: &channelInfo.Microformat.MicroformatDataRenderer.Title, Title: &channelInfo.Microformat.MicroformatDataRenderer.Title,
Description: &channelInfo.Metadata.ChannelMetadataRenderer.Description, Description: &channelInfo.Metadata.ChannelMetadataRenderer.Description,
@ -445,12 +450,20 @@ func (s *Sync) ensureChannelOwnership() error {
ThumbnailURL: &thumbnailURL, ThumbnailURL: &thumbnailURL,
} }
if channelUsesOldMetadata { if channelUsesOldMetadata {
da, err := s.getDefaultAccount()
if err != nil {
return err
}
if s.DbChannelData.TransferState <= 1 { if s.DbChannelData.TransferState <= 1 {
c, err = s.daemon.ChannelUpdate(s.DbChannelData.ChannelClaimID, jsonrpc.ChannelUpdateOptions{ c, err = s.daemon.ChannelUpdate(s.DbChannelData.ChannelClaimID, jsonrpc.ChannelUpdateOptions{
ClearTags: util.PtrToBool(true), ClearTags: util.PtrToBool(true),
ClearLocations: util.PtrToBool(true), ClearLocations: util.PtrToBool(true),
ClearLanguages: util.PtrToBool(true), ClearLanguages: util.PtrToBool(true),
ChannelCreateOptions: jsonrpc.ChannelCreateOptions{ ChannelCreateOptions: jsonrpc.ChannelCreateOptions{
AccountID: &da,
FundingAccountIDs: []string{
da,
},
ClaimCreateOptions: claimCreateOptions, ClaimCreateOptions: claimCreateOptions,
CoverURL: bannerURL, CoverURL: bannerURL,
}, },
@ -460,20 +473,50 @@ func (s *Sync) ensureChannelOwnership() error {
return nil return nil
} }
} else { } else {
c, err = s.daemon.ChannelCreate(s.DbChannelData.DesiredChannelName, channelBidAmount, jsonrpc.ChannelCreateOptions{ c, err = s.daemon.ChannelCreate(s.DbChannelData.DesiredChannelName, channelClaimAmount, jsonrpc.ChannelCreateOptions{
ClaimCreateOptions: claimCreateOptions, ClaimCreateOptions: claimCreateOptions,
CoverURL: bannerURL, CoverURL: bannerURL,
}) })
if err != nil {
claimId, err2 := s.getChannelClaimIDForTimedOutCreation()
if err2 != nil {
err = errors.Prefix(err2.Error(), err)
} else {
recoveredChannelClaimID = claimId
}
}
} }
if err != nil { if err != nil {
return err return err
} }
if recoveredChannelClaimID != "" {
s.DbChannelData.ChannelClaimID = recoveredChannelClaimID
} else {
s.DbChannelData.ChannelClaimID = c.Outputs[0].ClaimID s.DbChannelData.ChannelClaimID = c.Outputs[0].ClaimID
}
return s.Manager.ApiConfig.SetChannelClaimID(s.DbChannelData.ChannelId, s.DbChannelData.ChannelClaimID) return s.Manager.ApiConfig.SetChannelClaimID(s.DbChannelData.ChannelId, s.DbChannelData.ChannelClaimID)
} }
//getChannelClaimIDForTimedOutCreation is a raw function that returns the only channel that exists in the wallet
// this is used because the SDK sucks and can't figure out when to return when creating a claim...
func (s *Sync) getChannelClaimIDForTimedOutCreation() (string, error) {
channels, err := s.daemon.ChannelList(nil, 1, 500, nil)
if err != nil {
return "", err
} else if channels == nil {
return "", errors.Err("no channel response")
}
if len((*channels).Items) != 1 {
return "", errors.Err("more than one channel found when trying to recover from SDK failure in creating the channel")
}
desiredChannel := (*channels).Items[0]
if desiredChannel.Name != s.DbChannelData.DesiredChannelName {
return "", errors.Err("the channel found in the wallet has a different name than the one we expected")
}
return desiredChannel.ClaimID, nil
}
func (s *Sync) addCredits(amountToAdd float64) error { func (s *Sync) addCredits(amountToAdd float64) error {
start := time.Now() start := time.Now()
defer func(start time.Time) { defer func(start time.Time) {

View file

@ -241,7 +241,7 @@ func transferVideos(s *Sync) error {
}, },
}, },
}, },
Bid: util.PtrToString("0.005"), // Todo - Dont hardcode Bid: util.PtrToString(fmt.Sprintf("%.5f", publishAmount/2.)),
} }
videoStatus := shared.VideoStatus{ videoStatus := shared.VideoStatus{
ChannelID: s.DbChannelData.ChannelId, ChannelID: s.DbChannelData.ChannelId,
@ -293,7 +293,7 @@ func (s *Sync) streamUpdate(ui *updateInfo) error {
timing.TimedComponent("transferStreamUpdate").Add(time.Since(start)) timing.TimedComponent("transferStreamUpdate").Add(time.Since(start))
if updateError != nil { if updateError != nil {
ui.videoStatus.FailureReason = updateError.Error() ui.videoStatus.FailureReason = updateError.Error()
ui.videoStatus.Status = shared.VideoStatusTranferFailed ui.videoStatus.Status = shared.VideoStatusTransferFailed
ui.videoStatus.IsTransferred = util.PtrToBool(false) ui.videoStatus.IsTransferred = util.PtrToBool(false)
} else { } else {
ui.videoStatus.IsTransferred = util.PtrToBool(len(result.Outputs) != 0) ui.videoStatus.IsTransferred = util.PtrToBool(len(result.Outputs) != 0)

View file

@ -33,11 +33,10 @@ import (
const ( const (
channelClaimAmount = 0.01 channelClaimAmount = 0.01
estimatedMaxTxFee = 0.1 estimatedMaxTxFee = 0.0015
minimumAccountBalance = 1.0 minimumAccountBalance = 1.0
minimumRefillAmount = 1 minimumRefillAmount = 1
publishAmount = 0.01 publishAmount = 0.002
maxReasonLength = 500
) )
// Sync stores the options that control how syncing happens // Sync stores the options that control how syncing happens
@ -286,6 +285,8 @@ func (s *Sync) setChannelTerminationStatus(e *error) {
"interrupted during daemon startup", "interrupted during daemon startup",
"interrupted by user", "interrupted by user",
"use --skip-space-check to ignore", "use --skip-space-check to ignore",
"failure uploading blockchain DB",
"default_wallet already exists",
} }
dbWipeConditions := []string{ dbWipeConditions := []string{
"Missing inputs", "Missing inputs",
@ -335,7 +336,7 @@ func (s *Sync) waitForDaemonStart() error {
} }
func (s *Sync) stopAndUploadWallet(e *error) { func (s *Sync) stopAndUploadWallet(e *error) {
log.Printf("Stopping daemon") log.Println("Stopping daemon")
shutdownErr := logUtils.StopDaemon() shutdownErr := logUtils.StopDaemon()
if shutdownErr != nil { if shutdownErr != nil {
logShutdownError(shutdownErr) logShutdownError(shutdownErr)
@ -350,9 +351,9 @@ func (s *Sync) stopAndUploadWallet(e *error) {
err := s.uploadWallet() err := s.uploadWallet()
if err != nil { if err != nil {
if *e == nil { if *e == nil {
e = &err *e = err
} else { } else {
*e = errors.Prefix("failure uploading wallet", *e) *e = errors.Prefix(fmt.Sprintf("%s + original error", errors.FullTrace(err)), *e)
} }
} }
err = s.uploadBlockchainDB() err = s.uploadBlockchainDB()
@ -495,7 +496,7 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim, ownClaims []jsonrpc.Claim)
claimMarkedUnpublished := claimInDatabase && !sv.Published claimMarkedUnpublished := claimInDatabase && !sv.Published
_, isOwnClaim := ownClaimsInfo[videoID] _, isOwnClaim := ownClaimsInfo[videoID]
transferred := !isOwnClaim || s.DbChannelData.TransferState == 3 transferred := !isOwnClaim || s.DbChannelData.TransferState == 3
transferStatusMismatch := sv.Transferred != transferred transferStatusMismatch := claimInDatabase && sv.Transferred != transferred
if metadataDiffers { if metadataDiffers {
log.Debugf("%s: Mismatch in database for metadata. DB: %d - Blockchain: %d", videoID, sv.MetadataVersion, chainInfo.MetadataVersion) log.Debugf("%s: Mismatch in database for metadata. DB: %d - Blockchain: %d", videoID, sv.MetadataVersion, chainInfo.MetadataVersion)
@ -556,7 +557,11 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim, ownClaims []jsonrpc.Claim)
if sv.Transferred || sv.IsLbryFirst { if sv.Transferred || sv.IsLbryFirst {
_, ok := allClaimsInfo[vID] _, ok := allClaimsInfo[vID]
if !ok && sv.Published { if !ok && sv.Published {
searchResponse, err := s.daemon.ClaimSearch(nil, &sv.ClaimID, nil, nil, 1, 20) searchResponse, err := s.daemon.ClaimSearch(jsonrpc.ClaimSearchArgs{
ClaimID: &sv.ClaimID,
Page: 1,
PageSize: 20,
})
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
continue continue
@ -672,7 +677,8 @@ func (s *Sync) checkIntegrity() error {
if pubsOnWallet > pubsOnDB { //This case should never happen if pubsOnWallet > pubsOnDB { //This case should never happen
logUtils.SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.DbChannelData.ChannelId) logUtils.SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.DbChannelData.ChannelId)
return errors.Err("not all published videos are in the database") //we never really done anything about those. it happens when a user updates the channel for a publish to another ytsync channel
//return errors.Err("not all published videos are in the database")
} }
if pubsOnWallet < pubsOnDB { if pubsOnWallet < pubsOnDB {
logUtils.SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.DbChannelData.ChannelId) logUtils.SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.DbChannelData.ChannelId)
@ -861,7 +867,7 @@ func (s *Sync) enqueueYoutubeVideos() error {
return err return err
} }
videos, err := ytapi.GetVideosToSync(s.Manager.ApiConfig, s.DbChannelData.ChannelId, s.syncedVideos, s.Manager.CliFlags.QuickSync, s.Manager.CliFlags.VideosToSync(s.DbChannelData.TotalSubscribers), ytapi.VideoParams{ videos, err := ytapi.GetVideosToSync(s.DbChannelData.ChannelId, s.syncedVideos, s.Manager.CliFlags.QuickSync, s.Manager.CliFlags.VideosToSync(s.DbChannelData.TotalSubscribers), ytapi.VideoParams{
VideoDir: s.videoDirectory, VideoDir: s.videoDirectory,
Stopper: s.grp, Stopper: s.grp,
IPPool: ipPool, IPPool: ipPool,

View file

@ -1,31 +1,17 @@
package metrics package metrics
import ( import (
"os" "github.com/lbryio/ytsync/v5/configs"
"regexp"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
) )
var ( var (
Durations = promauto.NewHistogramVec(prometheus.HistogramOpts{ Durations = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "ytsync", Namespace: "ytsync",
Subsystem: getHostname(), Subsystem: configs.Configuration.GetHostname(),
Name: "duration", Name: "duration",
Help: "The durations of the individual modules", Help: "The durations of the individual modules",
}, []string{"path"}) }, []string{"path"})
) )
func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
hostname = "ytsync_unknown"
}
reg, err := regexp.Compile("[^a-zA-Z0-9_]+")
if err != nil {
log.Fatal(err)
}
return reg.ReplaceAllString(hostname, "_")
}

View file

@ -13,6 +13,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/null" "github.com/lbryio/lbry.go/v2/extras/null"
"github.com/lbryio/ytsync/v5/configs"
"github.com/lbryio/ytsync/v5/shared" "github.com/lbryio/ytsync/v5/shared"
"github.com/lbryio/ytsync/v5/util" "github.com/lbryio/ytsync/v5/util"
@ -30,6 +31,19 @@ type APIConfig struct {
HostName string HostName string
} }
var instance *APIConfig
func GetAPIsConfigs() *APIConfig {
if instance == nil {
instance = &APIConfig{
ApiURL: configs.Configuration.InternalApisEndpoint,
ApiToken: configs.Configuration.InternalApisAuthToken,
HostName: configs.Configuration.GetHostname(),
}
}
return instance
}
func (a *APIConfig) FetchChannels(status string, cliFlags *shared.SyncFlags) ([]shared.YoutubeChannel, error) { func (a *APIConfig) FetchChannels(status string, cliFlags *shared.SyncFlags) ([]shared.YoutubeChannel, error) {
type apiJobsResponse struct { type apiJobsResponse struct {
Success bool `json:"success"` Success bool `json:"success"`
@ -47,13 +61,10 @@ func (a *APIConfig) FetchChannels(status string, cliFlags *shared.SyncFlags) ([]
"channel_id": {cliFlags.ChannelID}, "channel_id": {cliFlags.ChannelID},
}) })
if err != nil { if err != nil {
if strings.Contains(err.Error(), "EOF") { util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
util.SendErrorToSlack("EOF error while trying to call %s. Waiting to retry", endpoint)
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
return a.FetchChannels(status, cliFlags) return a.FetchChannels(status, cliFlags)
} }
return nil, errors.Err(err)
}
defer res.Body.Close() defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
@ -110,13 +121,10 @@ func (a *APIConfig) SetChannelCert(certHex string, channelID string) error {
"auth_token": {a.ApiToken}, "auth_token": {a.ApiToken},
}) })
if err != nil { if err != nil {
if strings.Contains(err.Error(), "EOF") { util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
util.SendErrorToSlack("EOF error while trying to call %s. Waiting to retry", endpoint)
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
return a.SetChannelCert(certHex, channelID) return a.SetChannelCert(certHex, channelID)
} }
return errors.Err(err)
}
defer res.Body.Close() defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
@ -158,13 +166,10 @@ func (a *APIConfig) SetChannelStatus(channelID string, status string, failureRea
} }
res, err := http.PostForm(endpoint, params) res, err := http.PostForm(endpoint, params)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "EOF") { util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
util.SendErrorToSlack("EOF error while trying to call %s. Waiting to retry", endpoint)
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
return a.SetChannelStatus(channelID, status, failureReason, transferState) return a.SetChannelStatus(channelID, status, failureReason, transferState)
} }
return nil, nil, errors.Err(err)
}
defer res.Body.Close() defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode >= http.StatusInternalServerError { if res.StatusCode >= http.StatusInternalServerError {
@ -208,13 +213,10 @@ func (a *APIConfig) SetChannelClaimID(channelID string, channelClaimID string) e
"channel_claim_id": {channelClaimID}, "channel_claim_id": {channelClaimID},
}) })
if err != nil { if err != nil {
if strings.Contains(err.Error(), "EOF") { util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
util.SendErrorToSlack("EOF error while trying to call %s. Waiting to retry", endpoint)
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
return a.SetChannelClaimID(channelID, channelClaimID) return a.SetChannelClaimID(channelID, channelClaimID)
} }
return errors.Err(err)
}
defer res.Body.Close() defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
@ -252,13 +254,10 @@ func (a *APIConfig) DeleteVideos(videos []string) error {
} }
res, err := http.PostForm(endpoint, vals) res, err := http.PostForm(endpoint, vals)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "EOF") { util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
util.SendErrorToSlack("EOF error while trying to call %s. Waiting to retry", endpoint)
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
return a.DeleteVideos(videos) return a.DeleteVideos(videos)
} }
return errors.Err(err)
}
defer res.Body.Close() defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
@ -318,13 +317,10 @@ func (a *APIConfig) MarkVideoStatus(status shared.VideoStatus) error {
} }
res, err := http.PostForm(endpoint, vals) res, err := http.PostForm(endpoint, vals)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "EOF") { util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
util.SendErrorToSlack("EOF error while trying to call %s for %s. Waiting to retry", endpoint, status.ClaimName)
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
return a.MarkVideoStatus(status) return a.MarkVideoStatus(status)
} }
return errors.Err(err)
}
defer res.Body.Close() defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
@ -360,13 +356,10 @@ func (a *APIConfig) VideoState(videoID string) (string, error) {
res, err := http.PostForm(endpoint, vals) res, err := http.PostForm(endpoint, vals)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "EOF") { util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
util.SendErrorToSlack("EOF error while trying to call %s. Waiting to retry", endpoint)
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
return a.VideoState(videoID) return a.VideoState(videoID)
} }
return "", errors.Err(err)
}
defer res.Body.Close() defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode == http.StatusNotFound { if res.StatusCode == http.StatusNotFound {
@ -414,13 +407,10 @@ func (a *APIConfig) GetReleasedDate(videoID string) (*VideoRelease, error) {
res, err := http.PostForm(endpoint, vals) res, err := http.PostForm(endpoint, vals)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "EOF") { util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
util.SendErrorToSlack("EOF error while trying to call %s. Waiting to retry", endpoint)
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
return a.GetReleasedDate(videoID) return a.GetReleasedDate(videoID)
} }
return nil, errors.Err(err)
}
defer res.Body.Close() defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body) body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode == http.StatusNotFound { if res.StatusCode == http.StatusNotFound {

View file

@ -26,6 +26,7 @@ type YoutubeChannel struct {
SizeLimit int `json:"size_limit"` SizeLimit int `json:"size_limit"`
LastUploadedVideo string `json:"last_uploaded_video"` LastUploadedVideo string `json:"last_uploaded_video"`
WipeDB bool `json:"wipe_db"` WipeDB bool `json:"wipe_db"`
Language string `json:"language"`
} }
type PublishAddress struct { type PublishAddress struct {
@ -77,6 +78,7 @@ var ErrorsNoRetry = []string{
"giving up after 0 fragment retries", "giving up after 0 fragment retries",
"Sorry about that", "Sorry about that",
"This video is not available", "This video is not available",
"Video unavailable",
"requested format not available", "requested format not available",
"interrupted by user", "interrupted by user",
"Sign in to confirm your age", "Sign in to confirm your age",
@ -89,6 +91,7 @@ var ErrorsNoRetry = []string{
"Premiere will begin shortly", "Premiere will begin shortly",
"cannot unmarshal number 0.0", "cannot unmarshal number 0.0",
"default youtube thumbnail found", "default youtube thumbnail found",
"livestream is likely bugged",
} }
var WalletErrors = []string{ var WalletErrors = []string{
"Not enough funds to cover this transaction", "Not enough funds to cover this transaction",
@ -115,6 +118,8 @@ var NeverRetryFailures = []string{
"Playback on other websites has been disabled by the video owner", "Playback on other websites has been disabled by the video owner",
"uploader has not made this video available in your country", "uploader has not made this video available in your country",
"This video has been removed by the uploader", "This video has been removed by the uploader",
"Video unavailable",
"Video is not available - hardcoded fix",
} }
type SyncFlags struct { type SyncFlags struct {
@ -152,7 +157,7 @@ func (f *SyncFlags) VideosToSync(totalSubscribers uint) int {
800: 250, 800: 250,
600: 200, 600: 200,
200: 80, 200: 80,
100: 50, 100: 20,
1: 10, 1: 10,
} }
videosToSync := 0 videosToSync := 0
@ -191,9 +196,10 @@ const (
StatusFailed = "failed" StatusFailed = "failed"
StatusFinalized = "finalized" // no more changes allowed StatusFinalized = "finalized" // no more changes allowed
StatusAbandoned = "abandoned" // deleted on youtube or banned StatusAbandoned = "abandoned" // deleted on youtube or banned
StatusAgeRestricted = "agerestricted" // one or more videos are age restricted and should be reprocessed with special keys
) )
var SyncStatuses = []string{StatusPending, StatusPendingEmail, StatusPendingUpgrade, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized, StatusAbandoned, StatusWipeDb} var SyncStatuses = []string{StatusPending, StatusPendingEmail, StatusPendingUpgrade, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized, StatusAbandoned, StatusWipeDb, StatusAgeRestricted}
const LatestMetadataVersion = 2 const LatestMetadataVersion = 2
@ -202,9 +208,11 @@ const (
VideoStatusFailed = "failed" VideoStatusFailed = "failed"
VideoStatusUpgradeFailed = "upgradefailed" VideoStatusUpgradeFailed = "upgradefailed"
VideoStatusUnpublished = "unpublished" VideoStatusUnpublished = "unpublished"
VideoStatusTranferFailed = "transferfailed" VideoStatusTransferFailed = "transferfailed"
) )
var VideoSyncStatuses = []string{VideoStatusPublished, VideoStatusFailed, VideoStatusUpgradeFailed, VideoStatusUnpublished, VideoStatusTransferFailed}
const ( const (
TransferStateNotTouched = iota TransferStateNotTouched = iota
TransferStatePending TransferStatePending

View file

@ -16,17 +16,12 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/abadojack/whatlanggo"
"github.com/lbryio/ytsync/v5/downloader" "github.com/lbryio/ytsync/v5/downloader"
"github.com/lbryio/ytsync/v5/downloader/ytdl" "github.com/lbryio/ytsync/v5/downloader/ytdl"
"github.com/lbryio/ytsync/v5/shared"
"github.com/vbauerster/mpb/v7"
"github.com/vbauerster/mpb/v7/decor"
"gopkg.in/vansante/go-ffprobe.v2"
"github.com/lbryio/ytsync/v5/ip_manager" "github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/namer" "github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/sdk" "github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared"
"github.com/lbryio/ytsync/v5/tags_manager" "github.com/lbryio/ytsync/v5/tags_manager"
"github.com/lbryio/ytsync/v5/thumbs" "github.com/lbryio/ytsync/v5/thumbs"
"github.com/lbryio/ytsync/v5/timing" "github.com/lbryio/ytsync/v5/timing"
@ -37,8 +32,12 @@ import (
"github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util" "github.com/lbryio/lbry.go/v2/extras/util"
"github.com/abadojack/whatlanggo"
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vbauerster/mpb/v7"
"github.com/vbauerster/mpb/v7/decor"
"gopkg.in/vansante/go-ffprobe.v2"
) )
type YoutubeVideo struct { type YoutubeVideo struct {
@ -107,7 +106,7 @@ func NewYoutubeVideo(directory string, videoData *ytdl.YtdlVideo, playlistPositi
title: videoData.Title, title: videoData.Title,
description: videoData.Description, description: videoData.Description,
playlistPosition: playlistPosition, playlistPosition: playlistPosition,
publishedAt: videoData.UploadDateForReal, publishedAt: videoData.GetUploadTime(),
dir: directory, dir: directory,
youtubeInfo: videoData, youtubeInfo: videoData,
mocked: false, mocked: false,
@ -176,7 +175,7 @@ func (v *YoutubeVideo) getFullPath() string {
} }
func (v *YoutubeVideo) getAbbrevDescription() string { func (v *YoutubeVideo) getAbbrevDescription() string {
maxLength := 2800 maxLength := 6500
description := strings.TrimSpace(v.description) description := strings.TrimSpace(v.description)
additionalDescription := "\nhttps://www.youtube.com/watch?v=" + v.id additionalDescription := "\nhttps://www.youtube.com/watch?v=" + v.id
khanAcademyClaimID := "5fc52291980268b82413ca4c0ace1b8d749f3ffb" khanAcademyClaimID := "5fc52291980268b82413ca4c0ace1b8d749f3ffb"
@ -321,7 +320,7 @@ func (v *YoutubeVideo) download() error {
//speedThrottleRetries := 3 //speedThrottleRetries := 3
for i := 0; i < len(qualities); i++ { for i := 0; i < len(qualities); i++ {
quality := qualities[i] quality := qualities[i]
argsWithFilters := append(ytdlArgs, "-fbestvideo[ext=mp4][vcodec!*=av01][height<="+quality+"]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]") argsWithFilters := append(ytdlArgs, "-fbestvideo[ext=mp4][vcodec!*=av01][height<="+quality+"]+bestaudio[ext!=webm][format_id!=258][format_id!=380][format_id!=251][format_id!=256][format_id!=327][format_id!=328]")
argsWithFilters = append(argsWithFilters, userAgent...) argsWithFilters = append(argsWithFilters, userAgent...)
//if speedThrottleRetries > 0 { //if speedThrottleRetries > 0 {
// speedThrottleRetries-- // speedThrottleRetries--
@ -515,21 +514,28 @@ func (v *YoutubeVideo) trackProgressBar(argsWithFilters []string, ticker *time.T
bar.Completed() bar.Completed()
bar.Abort(true) bar.Abort(true)
}() }()
origSize := int64(0)
lastUpdate := time.Now()
for { for {
select { select {
case <-done.Ch(): case <-done.Ch():
return return
case <-ticker.C: case <-ticker.C:
var err error
size, err := logUtils.DirSize(v.videoDir()) size, err := logUtils.DirSize(v.videoDir())
if err != nil { if err != nil {
log.Errorf("error while getting size of download directory: %s", errors.FullTrace(err)) log.Errorf("error while getting size of download directory: %s", errors.FullTrace(err))
return return
} }
if size > origSize {
origSize = size
bar.SetCurrent(size) bar.SetCurrent(size)
if size > int64(videoSize+audioSize) { if size > int64(videoSize+audioSize) {
bar.SetTotal(size+2048, false) bar.SetTotal(size+2048, false)
} }
bar.DecoratorEwmaUpdate(400 * time.Millisecond) bar.DecoratorEwmaUpdate(time.Since(lastUpdate))
lastUpdate = time.Now()
}
} }
} }
}() }()
@ -774,6 +780,9 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, params SyncParams, existingV
func (v *YoutubeVideo) downloadAndPublish(daemon *jsonrpc.Client, params SyncParams) (*SyncSummary, error) { func (v *YoutubeVideo) downloadAndPublish(daemon *jsonrpc.Client, params SyncParams) (*SyncSummary, error) {
var err error var err error
if v.youtubeInfo == nil {
return nil, errors.Err("Video is not available - hardcoded fix")
}
dur := time.Duration(v.youtubeInfo.Duration) * time.Second dur := time.Duration(v.youtubeInfo.Duration) * time.Second
minDuration := 7 * time.Second minDuration := 7 * time.Second
@ -781,6 +790,9 @@ func (v *YoutubeVideo) downloadAndPublish(daemon *jsonrpc.Client, params SyncPar
if v.youtubeInfo.IsLive == true { if v.youtubeInfo.IsLive == true {
return nil, errors.Err("video is a live stream and hasn't completed yet") return nil, errors.Err("video is a live stream and hasn't completed yet")
} }
if v.youtubeInfo.Availability != "public" {
return nil, errors.Err("video is not public")
}
if dur > v.maxVideoLength { if dur > v.maxVideoLength {
logUtils.SendErrorToSlack("%s is %s long and the limit is %s", v.id, dur.String(), v.maxVideoLength.String()) logUtils.SendErrorToSlack("%s is %s long and the limit is %s", v.id, dur.String(), v.maxVideoLength.String())
return nil, errors.Err("video is too long to process") return nil, errors.Err("video is too long to process")
@ -789,6 +801,11 @@ func (v *YoutubeVideo) downloadAndPublish(daemon *jsonrpc.Client, params SyncPar
logUtils.SendErrorToSlack("%s is %s long and the minimum is %s", v.id, dur.String(), minDuration.String()) logUtils.SendErrorToSlack("%s is %s long and the minimum is %s", v.id, dur.String(), minDuration.String())
return nil, errors.Err("video is too short to process") return nil, errors.Err("video is too short to process")
} }
buggedLivestream := v.youtubeInfo.LiveStatus == "post_live"
if buggedLivestream && dur >= 2*time.Hour {
return nil, errors.Err("livestream is likely bugged as it was recently published and has a length of %s which is more than 2 hours", dur.String())
}
for { for {
err = v.download() err = v.download()
if err != nil && strings.Contains(err.Error(), "HTTP Error 429") { if err != nil && strings.Contains(err.Error(), "HTTP Error 429") {
@ -858,7 +875,11 @@ func (v *YoutubeVideo) getMetadata() (languages []string, locations []jsonrpc.Lo
} }
func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, existingVideoData *sdk.SyncedVideo) (*SyncSummary, error) { func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, existingVideoData *sdk.SyncedVideo) (*SyncSummary, error) {
c, err := daemon.ClaimSearch(nil, &existingVideoData.ClaimID, nil, nil, 1, 20) c, err := daemon.ClaimSearch(jsonrpc.ClaimSearchArgs{
ClaimID: &existingVideoData.ClaimID,
Page: 1,
PageSize: 20,
})
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
@ -927,6 +948,7 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis
Height: util.PtrToUint(720), Height: util.PtrToUint(720),
Width: util.PtrToUint(1280), Width: util.PtrToUint(1280),
Fee: fee, Fee: fee,
ReleaseTime: util.PtrToInt64(v.publishedAt.Unix()),
} }
v.walletLock.RLock() v.walletLock.RLock()

View file

@ -14,7 +14,9 @@ func SendErrorToSlack(format string, a ...interface{}) {
message = fmt.Sprintf(format, a...) message = fmt.Sprintf(format, a...)
} }
log.Errorln(message) log.Errorln(message)
err := util.SendToSlack(":sos: " + message) log.SetLevel(log.InfoLevel) //I don't want to change the underlying lib so this will do...
err := util.SendToSlack(":sos: ```" + message + "```")
log.SetLevel(log.DebugLevel)
if err != nil { if err != nil {
log.Errorln(err) log.Errorln(err)
} }
@ -27,7 +29,9 @@ func SendInfoToSlack(format string, a ...interface{}) {
message = fmt.Sprintf(format, a...) message = fmt.Sprintf(format, a...)
} }
log.Infoln(message) log.Infoln(message)
log.SetLevel(log.InfoLevel) //I don't want to change the underlying lib so this will do...
err := util.SendToSlack(":information_source: " + message) err := util.SendToSlack(":information_source: " + message)
log.SetLevel(log.DebugLevel)
if err != nil { if err != nil {
log.Errorln(err) log.Errorln(err)
} }

View file

@ -55,7 +55,7 @@ type VideoParams struct {
var mostRecentlyFailedChannel string // TODO: fix this hack! var mostRecentlyFailedChannel string // TODO: fix this hack!
func GetVideosToSync(config *sdk.APIConfig, channelID string, syncedVideos map[string]sdk.SyncedVideo, quickSync bool, maxVideos int, videoParams VideoParams, lastUploadedVideo string) ([]Video, error) { func GetVideosToSync(channelID string, syncedVideos map[string]sdk.SyncedVideo, quickSync bool, maxVideos int, videoParams VideoParams, lastUploadedVideo string) ([]Video, error) {
var videos []Video var videos []Video
if quickSync && maxVideos > 50 { if quickSync && maxVideos > 50 {
maxVideos = 50 maxVideos = 50
@ -94,7 +94,7 @@ func GetVideosToSync(config *sdk.APIConfig, channelID string, syncedVideos map[s
mostRecentlyFailedChannel = channelID mostRecentlyFailedChannel = channelID
} }
vids, err := getVideos(config, channelID, videoIDs, videoParams.Stopper.Ch(), videoParams.IPPool) vids, err := getVideos(channelID, videoIDs, videoParams.Stopper.Ch(), videoParams.IPPool)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -109,7 +109,8 @@ func GetVideosToSync(config *sdk.APIConfig, channelID string, syncedVideos map[s
} }
for k, v := range syncedVideos { for k, v := range syncedVideos {
if !v.Published { newMetadataVersion := int8(2)
if !v.Published && v.MetadataVersion >= newMetadataVersion {
continue continue
} }
if _, ok := playlistMap[k]; !ok { if _, ok := playlistMap[k]; !ok {
@ -203,7 +204,8 @@ func ChannelInfo(channelID string) (*YoutubeStatsResponse, error) {
return &decodedResponse, nil return &decodedResponse, nil
} }
func getVideos(config *sdk.APIConfig, channelID string, videoIDs []string, stopChan stop.Chan, ipPool *ip_manager.IPPool) ([]*ytdl.YtdlVideo, error) { func getVideos(channelID string, videoIDs []string, stopChan stop.Chan, ipPool *ip_manager.IPPool) ([]*ytdl.YtdlVideo, error) {
config := sdk.GetAPIsConfigs()
var videos []*ytdl.YtdlVideo var videos []*ytdl.YtdlVideo
for _, videoID := range videoIDs { for _, videoID := range videoIDs {
if len(videoID) < 5 { if len(videoID) < 5 {
@ -215,11 +217,6 @@ func getVideos(config *sdk.APIConfig, channelID string, videoIDs []string, stopC
default: default:
} }
//ip, err := ipPool.GetIP(videoID)
//if err != nil {
// return nil, err
//}
//video, err := downloader.GetVideoInformation(videoID, &net.TCPAddr{IP: net.ParseIP(ip)})
state, err := config.VideoState(videoID) state, err := config.VideoState(videoID)
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
@ -227,7 +224,7 @@ func getVideos(config *sdk.APIConfig, channelID string, videoIDs []string, stopC
if state == "published" { if state == "published" {
continue continue
} }
video, err := downloader.GetVideoInformation(config, videoID, stopChan, nil, ipPool) video, err := downloader.GetVideoInformation(videoID, stopChan, ipPool)
if err != nil { if err != nil {
errSDK := config.MarkVideoStatus(shared.VideoStatus{ errSDK := config.MarkVideoStatus(shared.VideoStatus{
ChannelID: channelID, ChannelID: channelID,