Compare commits

...

59 commits

Author SHA1 Message Date
Niko Storni 78d5c8c6fa never retry those hardcoded errors 2022-08-16 03:06:39 +02:00
Niko Storni caca92a6bc idk... let's work around this for now 2022-08-16 02:51:35 +02:00
Niko Storni 9ef1b7800b fix nil ptr? 2022-08-16 02:41:14 +02:00
Niko Storni ea3315d1d6 this is actually necessary 2022-08-16 02:14:33 +02:00
Niko Storni 68132c65a9 fix channel update process 2022-08-15 22:42:06 +02:00
Niko Storni 57e017ec8f fix some logic
reduce verbosity of debug logs
2022-08-15 21:19:20 +02:00
Niko Storni 42db3782ec fix issue with yt not returning a date 2022-08-11 18:01:55 +02:00
Niko Storni 9d93799d86 account for nil struct 2022-08-11 05:28:06 +02:00
Niko d93f463386
fix odd error with morty's date 2022-08-11 00:33:40 +02:00
Niko Storni 77988c1682 woops 2022-08-10 22:01:50 +02:00
Niko Storni c79e07c9fa change recent livestreams logic slightly 2022-08-10 21:55:29 +02:00
Niko Storni e454cdb4c9 fix post live detection
prevent unlisted videos from ever publishing (even if they were public before and we know about them)
fix timestamp on videos
update user agent
2022-08-10 21:26:36 +02:00
Niko Storni 98a10d1269 fix for channel creation bug 2022-08-10 18:23:37 +02:00
Niko Storni 4f6748ae83 fix bug in channel updates 2022-08-10 17:27:27 +02:00
Niko Storni c1b2117df5 fix vuln 2022-08-09 22:27:00 +02:00
Niko Storni c4207338c8 fix const bug
update dependencies
2022-08-09 22:17:03 +02:00
Niko Storni 5a01983203 add language to channels
improve logging (timestamps)
retry wallet uploads for 30 minutes
don't fail if the db isn't tracking all publishes
2022-08-09 22:11:42 +02:00
Niko Storni ee8eb83d07 fix dependency vuln 2022-05-10 23:36:00 +02:00
Niko Storni 8d0f762067 change description length 2022-05-10 23:09:23 +02:00
Niko Storni 8fa1482d18 adjust limits 2022-05-09 20:23:44 +02:00
Niko Storni 00ae404642 exclude audio ac-3 codec not working in chrome 2022-05-09 20:05:05 +02:00
Niko Storni 230bfe4a41 fix go version in travis 2022-05-04 19:22:19 +02:00
Niko Storni df33fb9263 fix api failures
update dependencies
fix e2e
2022-05-04 19:17:36 +02:00
Niko Storni d72be1d920 add new status 2022-03-03 18:59:32 +01:00
Niko Storni 7d12a90139 avoid ec-3 audio codec 2022-02-09 17:06:51 +01:00
Niko e1689a2a6c
Merge pull request #116 from e4drcf/synced-video-statuses
add shared video sync statuses array for validation purposes
2022-02-07 20:03:29 +01:00
Ivan a8a6347d52 add shared video sync statuses array for validation purposes 2022-02-07 20:17:41 +02:00
Niko Storni bdee1b4092 fix math to avoid negative balances 2022-01-26 07:43:09 +01:00
Niko Storni 0d0d39380c fix spend amounts to save credits 2022-01-26 07:11:26 +01:00
Niko Storni 7ff1a009da remove unused params 2022-01-14 18:49:02 +01:00
Niko Storni e3a332c7e1 upgrade dependencies 2022-01-14 18:10:58 +01:00
Niko Storni 33ee6e4b94 Merge remote-tracking branch 'origin/metadata_fix' 2022-01-14 17:20:22 +01:00
Niko Storni f6cde976a6 fix metadata? 2022-01-06 15:15:04 +01:00
Niko Storni 17944fa46a refactor get video time
remove broken time lookup
refactor quite some code
2021-12-30 13:17:11 -05:00
Niko Storni 3c18ae8de2 add checks for buggy livestreams 2021-12-29 17:47:46 -05:00
Niko Storni 84790720ff improve error handling
retry wallet uploads on failure
2021-12-02 16:59:14 +01:00
Niko 23690731af
fix bug when uploading wallet 2021-11-30 02:53:59 +01:00
Niko Storni 75628d8530 delete tars after use 2021-11-25 04:26:04 +01:00
Niko Storni 6e819b20f6 update readme 2021-11-24 18:58:38 +01:00
Niko Storni da0b6e5b79 add example config 2021-11-24 18:39:24 +01:00
Niko Storni 28791f317b update gitignore
improve error logging to slack
fix regression in dev builds
2021-11-24 18:37:04 +01:00
Niko Storni 070287716b switch from env vars to a config file
get rid of stupid stuff
simplify S3 configuration
split wallets from blockchain.db and use separate S3 store
fix bugs
2021-11-24 05:54:08 +01:00
Alex Grin 41054e77a6
Update README.md 2021-09-28 10:20:17 -04:00
Niko Storni 6944e17f43 fix livestreams failing for wrong reason 2021-08-24 12:03:03 -04:00
Niko Storni a224fe44c2 fix stream by magic 2021-08-24 11:54:38 -04:00
Niko Storni fa9dc35123 fix patched bug 2021-07-26 18:03:37 +02:00
Niko Storni 3a0882230a bypass throttling 2021-07-13 22:49:27 -04:00
Niko Storni fbd683e094 don't panic 2021-07-08 01:47:57 +02:00
Niko Storni 2f15c920d4 speed up playlist listing 2021-06-25 19:16:01 +02:00
Niko Storni 01f6448e72 revert aria2c usage 2021-06-25 19:09:00 +02:00
Niko Storni 8fb1e2ead0 fix empty thumbnails 2021-06-25 19:04:40 +02:00
Niko Storni 3b84db382c attempt using aria2c for some servers 2021-06-24 23:06:27 +02:00
Niko Storni 4fe6840a4e refactor code
fix bugs
add stub for speed checker
2021-06-18 04:47:08 +02:00
Niko Storni 69e6fb51d1 improve error handling 2021-06-18 03:09:19 +02:00
Niko Storni f17110ab7f fix nilptr 2021-06-18 01:23:25 +02:00
Niko Storni 768743a200 fix deadlock 2021-06-17 22:02:42 +02:00
Niko Storni 7c652b22a1 better output 2021-06-17 19:43:30 +02:00
Niko Storni a0fb4e579e fix progressbar
fix videos with leading dash
2021-06-17 19:13:44 +02:00
Niko Storni 519e1e4648 switch to yt-dlp
add progressbars
avoid unnecessary calls to youtube
update user agents
cookies fixes
bug fixes
introduction of new bugs
2021-06-17 17:51:21 +02:00
28 changed files with 1847 additions and 809 deletions

4
.gitignore vendored
View file

@ -4,3 +4,7 @@ e2e/supporty/supporty
.env
blobsfiles
ytsync_docker
e2e/config.json
e2e/cookies.txt

View file

@ -2,7 +2,7 @@ os: linux
dist: bionic
language: go
go:
- 1.16.3
- 1.17.x
install: true
@ -23,7 +23,7 @@ addons:
- python3-pip
before_script:
- sudo pip3 install -U youtube-dl
- sudo pip3 install -U yt-dlp
- sudo add-apt-repository -y ppa:savoury1/ffmpeg4
env:

View file

@ -8,23 +8,17 @@ With the support of said database, the tool is also able to keep all the channel
# Requirements
- lbrynet SDK https://github.com/lbryio/lbry/releases (We strive to keep the latest release of ytsync compatible with the latest major release of the SDK)
- lbrynet SDK https://github.com/lbryio/lbry-sdk/releases (We strive to keep the latest release of ytsync compatible with the latest major release of the SDK)
- a lbrycrd node running (localhost or on a remote machine) with credits in it
- internal-apis (you cannot run this one yourself)
- python3-pip
- yt-dlp (`pip3 install -U yt-dlp`)
- ffmpeg (latest)
# Setup
- make sure daemon is stopped and can be controlled through `systemctl` (find example below)
- extract the ytsync binary anywhere
- add the environment variables necessary to the tool
- export SLACK_TOKEN="a-token-to-spam-your-slack"
- export SLACK_CHANNEL="youtube-status"
- export YOUTUBE_API_KEY="youtube-api-key"
- export LBRY_WEB_API="https://lbry-api-url-here"
- export LBRY_API_TOKEN="internal-apis-token-for-ytsync-user"
- export LBRYCRD_STRING="tcp://user:password@host:5429"
- export AWS_S3_ID="THE-ID-LIES-HERE"
- export AWS_S3_SECRET="THE-SECRET-LIES-HERE"
- export AWS_S3_REGION="us-east-1"
- export AWS_S3_BUCKET="ytsync-wallets"
- create and fill `config.json` using [this example](config.json.example)
## systemd script example
`/etc/systemd/system/lbrynet.service`
@ -55,23 +49,26 @@ Usage:
Flags:
--after int Specify from when to pull jobs [Unix time](Default: 0)
--before int Specify until when to pull jobs [Unix time](Default: current Unix time) (default current timestamp)
--before int Specify until when to pull jobs [Unix time](Default: current Unix time) (default 1669311891)
--channelID string If specified, only this channel will be synced.
--concurrent-jobs int how many jobs to process concurrently (default 1)
-h, --help help for ytsync
--limit int limit the amount of channels to sync
--max-length float Maximum video length to process (in hours) (default 2)
--max-length int Maximum video length to process (in hours) (default 2)
--max-size int Maximum video size to process (in MB) (default 2048)
--max-tries int Number of times to try a publish that fails (default 3)
--no-transfers Skips the transferring process of videos, channels and supports
--quick Look up only the last 50 videos from youtube
--remove-db-unpublished Remove videos from the database that are marked as published but aren't really published
--run-once Whether the process should be stopped after one cycle or not
--skip-space-check Do not perform free space check on startup
--status string Specify which queue to pull from. Overrides --update
--stop-on-error If a publish fails, stop all publishing and exit
--status2 string Specify which secondary queue to pull from.
--takeover-existing-channel If channel exists and we don't own it, take over the channel
--update Update previously synced channels instead of syncing new ones
--upgrade-metadata Upgrade videos if they're on the old metadata version
--videos-limit int how many videos to process per channel (default 1000)
--videos-limit int how many videos to process per channel (leave 0 for automatic detection)
```
## Running from Source
@ -88,17 +85,17 @@ Contributions to this project are welcome, encouraged, and compensated. For more
## Security
We take security seriously. Please contact [security@lbry.io](mailto:security@lbry.io) regarding any security issues. Our PGP key is [here](https://keybase.io/lbry/key.asc) if you need it.
We take security seriously. Please contact [security@lbry.io](mailto:security@lbry.io) regarding any security issues. Our PGP key is [here](https://lbry.com/faq/pgp-key) if you need it.
## Contact
The primary contact for this project is [Niko Storni](https://github.com/nikooo777) (niko@lbry.io).
The primary contact for this project is [Niko Storni](https://github.com/nikooo777) (niko@lbry.com).
## Additional Info and Links
- [https://lbry.io](https://lbry.io) - The live LBRY website
- [Discord Chat](https://chat.lbry.io) - A chat room for the LBRYians
- [Email us](mailto:hello@lbry.io) - LBRY Support email
- [https://lbry.com](https://lbry.com) - The live LBRY website
- [Discord Chat](https://chat.lbry.com) - A chat room for the LBRYians
- [Email us](mailto:hello@lbry.com) - LBRY Support email
- [Twitter](https://twitter.com/@lbryio) - LBRY Twitter page
- [Facebook](https://www.facebook.com/lbryio/) - LBRY Facebook page
- [Reddit](https://reddit.com/r/lbry) - LBRY Reddit page

35
config.json.example Normal file
View file

@ -0,0 +1,35 @@
{
"slack_token": "",
"slack_channel": "ytsync-dev",
"internal_apis_endpoint": "http://localhost:15400",
"internal_apis_auth_token": "ytsyntoken",
"lbrycrd_string": "tcp://lbry:lbry@localhost:15200",
"wallet_s3_config": {
"id": "",
"secret": "",
"region": "us-east-1",
"bucket": "ytsync-wallets",
"endpoint": ""
},
"blockchaindb_s3_config": {
"id": "",
"secret": "",
"region": "us-east-1",
"bucket": "blockchaindbs",
"endpoint": ""
},
"thumbnails_s3_config": {
"id": "",
"secret": "",
"region": "us-east-1",
"bucket": "thumbnails.lbry.com",
"endpoint": ""
},
"aws_thumbnails_s3_config": {
"id": "",
"secret": "",
"region": "us-east-1",
"bucket": "thumbnails.lbry.com",
"endpoint": ""
}
}

75
configs/configs.go Normal file
View file

@ -0,0 +1,75 @@
package configs
import (
"os"
"regexp"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
log "github.com/sirupsen/logrus"
"github.com/tkanos/gonfig"
)
type S3Configs struct {
ID string `json:"id"`
Secret string `json:"secret"`
Region string `json:"region"`
Bucket string `json:"bucket"`
Endpoint string `json:"endpoint"`
}
type Configs struct {
SlackToken string `json:"slack_token"`
SlackChannel string `json:"slack_channel"`
InternalApisEndpoint string `json:"internal_apis_endpoint"`
InternalApisAuthToken string `json:"internal_apis_auth_token"`
LbrycrdString string `json:"lbrycrd_string"`
WalletS3Config S3Configs `json:"wallet_s3_config"`
BlockchaindbS3Config S3Configs `json:"blockchaindb_s3_config"`
AWSThumbnailsS3Config S3Configs `json:"aws_thumbnails_s3_config"`
ThumbnailsS3Config S3Configs `json:"thumbnails_s3_config"`
}
var Configuration *Configs
func Init(configPath string) error {
if Configuration != nil {
return nil
}
c := Configs{}
err := gonfig.GetConf(configPath, &c)
if err != nil {
return errors.Err(err)
}
Configuration = &c
return nil
}
func (s *S3Configs) GetS3AWSConfig() *aws.Config {
return &aws.Config{
Credentials: credentials.NewStaticCredentials(s.ID, s.Secret, ""),
Region: &s.Region,
Endpoint: &s.Endpoint,
S3ForcePathStyle: aws.Bool(true),
}
}
func (c *Configs) GetHostname() string {
var hostname string
var err error
hostname, err = os.Hostname()
if err != nil {
log.Error("could not detect system hostname")
hostname = "ytsync_unknown"
}
reg, err := regexp.Compile("[^a-zA-Z0-9_]+")
if err == nil {
hostname = reg.ReplaceAllString(hostname, "_")
}
if len(hostname) > 30 {
hostname = hostname[0:30]
}
return hostname
}

View file

@ -8,7 +8,9 @@ import (
"net"
"net/http"
"net/url"
"os"
"os/exec"
"path"
"strings"
"time"
@ -16,6 +18,8 @@ import (
"github.com/lbryio/ytsync/v5/downloader/ytdl"
"github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared"
util2 "github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
@ -25,14 +29,16 @@ import (
)
func GetPlaylistVideoIDs(channelName string, maxVideos int, stopChan stop.Chan, pool *ip_manager.IPPool) ([]string, error) {
args := []string{"--skip-download", "https://www.youtube.com/channel/" + channelName, "--get-id", "--flat-playlist", "--cookies", "cookies.txt"}
ids, err := run(channelName, args, stopChan, pool, true)
args := []string{"--skip-download", "https://www.youtube.com/channel/" + channelName + "/videos", "--get-id", "--flat-playlist", "--cookies", "cookies.txt", "--playlist-end", fmt.Sprintf("%d", maxVideos)}
ids, err := run(channelName, args, stopChan, pool)
if err != nil {
return nil, errors.Err(err)
}
videoIDs := make([]string, 0, maxVideos)
for i, v := range ids {
logrus.Debugf("%d - video id %s", i, v)
if v == "" {
continue
}
if i >= maxVideos {
break
}
@ -43,62 +49,36 @@ func GetPlaylistVideoIDs(channelName string, maxVideos int, stopChan stop.Chan,
const releaseTimeFormat = "2006-01-02, 15:04:05 (MST)"
func GetVideoInformation(config *sdk.APIConfig, videoID string, stopChan stop.Chan, ip *net.TCPAddr, pool *ip_manager.IPPool) (*ytdl.YtdlVideo, error) {
args := []string{"--skip-download", "--print-json", "https://www.youtube.com/watch?v=" + videoID, "--cookies", "cookies.txt"}
results, err := run(videoID, args, stopChan, pool, false)
func GetVideoInformation(videoID string, stopChan stop.Chan, pool *ip_manager.IPPool) (*ytdl.YtdlVideo, error) {
args := []string{
"--skip-download",
"--write-info-json",
fmt.Sprintf("https://www.youtube.com/watch?v=%s", videoID),
"--cookies",
"cookies.txt",
"-o",
path.Join(util2.GetVideoMetadataDir(), videoID),
}
_, err := run(videoID, args, stopChan, pool)
if err != nil {
return nil, errors.Err(err)
}
f, err := os.Open(path.Join(util2.GetVideoMetadataDir(), videoID+".info.json"))
if err != nil {
return nil, errors.Err(err)
}
// defer the closing of our jsonFile so that we can parse it later on
defer f.Close()
// read our opened jsonFile as a byte array.
byteValue, _ := ioutil.ReadAll(f)
var video *ytdl.YtdlVideo
err = json.Unmarshal([]byte(results[0]), &video)
err = json.Unmarshal(byteValue, &video)
if err != nil {
return nil, errors.Err(err)
}
// now get an accurate time
const maxTries = 5
tries := 0
GetTime:
tries++
t, err := getUploadTime(config, videoID, ip, video.UploadDate)
if err != nil {
//slack(":warning: Upload time error: %v", err)
if tries <= maxTries && (errors.Is(err, errNotScraped) || errors.Is(err, errUploadTimeEmpty) || errors.Is(err, errStatusParse) || errors.Is(err, errConnectionIssue)) {
err := triggerScrape(videoID, ip)
if err == nil {
time.Sleep(2 * time.Second) // let them scrape it
goto GetTime
} else {
//slack("triggering scrape returned error: %v", err)
}
} else if !errors.Is(err, errNotScraped) && !errors.Is(err, errUploadTimeEmpty) {
//slack(":warning: Error while trying to get accurate upload time for %s: %v", videoID, err)
if t == "" {
return nil, errors.Err(err)
} else {
t = "" //TODO: get rid of the other piece below?
}
}
// do fallback below
}
//slack("After all that, upload time for %s is %s", videoID, t)
if t != "" {
parsed, err := time.Parse("2006-01-02, 15:04:05 (MST)", t) // this will probably be UTC, but Go's timezone parsing is fucked up. it ignores the timezone in the date
if err != nil {
return nil, errors.Err(err)
}
//slack(":exclamation: Got an accurate time for %s", videoID)
video.UploadDateForReal = parsed
} else { //TODO: this is the piece that isn't needed!
slack(":warning: Could not get accurate time for %s. Falling back to time from upload ytdl: %s.", videoID, video.UploadDate)
// fall back to UploadDate from youtube-dl
video.UploadDateForReal, err = time.Parse("20060102", video.UploadDate)
if err != nil {
return nil, err
}
}
return video, nil
}
@ -126,7 +106,7 @@ func triggerScrape(videoID string, ip *net.TCPAddr) error {
if err != nil {
return errors.Err(err)
}
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36")
req.Header.Set("User-Agent", ChromeUA)
res, err := client.Do(req)
if err != nil {
@ -188,45 +168,7 @@ func getUploadTime(config *sdk.APIConfig, videoID string, ip *net.TCPAddr, uploa
}
}
if time.Now().AddDate(0, 0, -3).After(ytdlUploadDate) {
return ytdlUploadDate.Format(releaseTimeFormat), nil
}
client := getClient(ip)
req, err := http.NewRequest(http.MethodGet, "https://caa.iti.gr/get_verificationV3?url=https://www.youtube.com/watch?v="+videoID, nil)
if err != nil {
return ytdlUploadDate.Format(releaseTimeFormat), errors.Err(err)
}
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36")
res, err := client.Do(req)
if err != nil {
return ytdlUploadDate.Format(releaseTimeFormat), errors.Err(err)
}
defer res.Body.Close()
var uploadTime struct {
Time string `json:"video_upload_time"`
Message string `json:"message"`
Status string `json:"status"`
}
err = json.NewDecoder(res.Body).Decode(&uploadTime)
if err != nil {
return ytdlUploadDate.Format(releaseTimeFormat), errors.Err(err)
}
if uploadTime.Status == "ERROR1" {
return ytdlUploadDate.Format(releaseTimeFormat), errNotScraped
}
if uploadTime.Status == "" && strings.HasPrefix(uploadTime.Message, "CANNOT_RETRIEVE_REPORT_FOR_VIDEO_") {
return ytdlUploadDate.Format(releaseTimeFormat), errors.Err("cannot retrieve report for video")
}
if uploadTime.Time == "" {
return ytdlUploadDate.Format(releaseTimeFormat), errUploadTimeEmpty
}
return uploadTime.Time, nil
return ytdlUploadDate.Format(releaseTimeFormat), nil
}
func getClient(ip *net.TCPAddr) *http.Client {
@ -251,16 +193,18 @@ func getClient(ip *net.TCPAddr) *http.Client {
}
const (
googleBotUA = "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
chromeUA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"
GoogleBotUA = "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
ChromeUA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36"
maxAttempts = 3
extractionError = "YouTube said: Unable to extract video data"
throttledError = "HTTP Error 429"
AlternateThrottledError = "returned non-zero exit status 8"
youtubeDlError = "exit status 1"
videoPremiereError = "Premieres in"
liveEventError = "This live event will begin in"
)
func run(use string, args []string, stopChan stop.Chan, pool *ip_manager.IPPool, dlc bool) ([]string, error) {
func run(use string, args []string, stopChan stop.Chan, pool *ip_manager.IPPool) ([]string, error) {
var useragent []string
var lastError error
for attempts := 0; attempts < maxAttempts; attempts++ {
@ -270,10 +214,7 @@ func run(use string, args []string, stopChan stop.Chan, pool *ip_manager.IPPool,
}
argsForCommand := append(args, "--source-address", sourceAddress)
argsForCommand = append(argsForCommand, useragent...)
binary := "youtube-dl"
if dlc {
binary = "youtube-dlc"
}
binary := "yt-dlp"
cmd := exec.Command(binary, argsForCommand...)
res, err := runCmd(cmd, stopChan)
@ -283,6 +224,9 @@ func run(use string, args []string, stopChan stop.Chan, pool *ip_manager.IPPool,
}
lastError = err
if strings.Contains(err.Error(), youtubeDlError) {
if util.SubstringInSlice(err.Error(), shared.ErrorsNoRetry) {
break
}
if strings.Contains(err.Error(), extractionError) {
logrus.Warnf("known extraction error: %s", errors.FullTrace(err))
useragent = nextUA(useragent)
@ -299,13 +243,13 @@ func run(use string, args []string, stopChan stop.Chan, pool *ip_manager.IPPool,
func nextUA(current []string) []string {
if len(current) == 0 {
return []string{"--user-agent", googleBotUA}
return []string{"--user-agent", GoogleBotUA}
}
return []string{"--user-agent", chromeUA}
return []string{"--user-agent", ChromeUA}
}
func runCmd(cmd *exec.Cmd, stopChan stop.Chan) ([]string, error) {
logrus.Infof("running youtube-dl(c) cmd: %s", strings.Join(cmd.Args, " "))
logrus.Infof("running yt-dlp cmd: %s", strings.Join(cmd.Args, " "))
var err error
stderr, err := cmd.StderrPipe()
if err != nil {
@ -341,7 +285,8 @@ func runCmd(cmd *exec.Cmd, stopChan stop.Chan) ([]string, error) {
return nil, errors.Err("interrupted by user")
case err := <-done:
if err != nil {
return nil, errors.Prefix("youtube-dl(c) "+strings.Join(cmd.Args, " ")+" ["+string(errorLog)+"]", err)
//return nil, errors.Prefix("yt-dlp "+strings.Join(cmd.Args, " ")+" ["+string(errorLog)+"]", err)
return nil, errors.Prefix(string(errorLog), err)
}
return strings.Split(strings.Replace(string(outLog), "\r\n", "\n", -1), "\n"), nil
}

View file

@ -3,7 +3,11 @@ package downloader
import (
"testing"
"github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)
@ -19,24 +23,18 @@ func TestGetPlaylistVideoIDs(t *testing.T) {
}
func TestGetVideoInformation(t *testing.T) {
video, err := GetVideoInformation(nil, "zj7pXM9gE5M", nil, nil, nil)
if err != nil {
logrus.Error(err)
}
if video != nil {
logrus.Info(video.ID)
}
s := stop.New()
ip, err := ip_manager.GetIPPool(s)
assert.NoError(t, err)
video, err := GetVideoInformation("kDGOHNpRjzc", s.Ch(), ip)
assert.NoError(t, err)
assert.NotNil(t, video)
logrus.Info(video.ID)
}
func Test_getUploadTime(t *testing.T) {
configs := sdk.APIConfig{
YoutubeAPIKey: "",
ApiURL: "https://api.lbry.com",
ApiToken: "Ht4NETrL5oWKyAaZkuSV68BKhtXkiLh5",
HostName: "test",
}
configs := sdk.APIConfig{}
got, err := getUploadTime(&configs, "kDGOHNpRjzc", nil, "20060102")
assert.NoError(t, err)
t.Log(got)
}

View file

@ -2,146 +2,136 @@ package ytdl
import (
"time"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/sirupsen/logrus"
)
type YtdlVideo struct {
UploadDate string `json:"upload_date"`
UploadDateForReal time.Time // you need to manually set this since the value in the API doesn't include the time
Extractor string `json:"extractor"`
Series interface{} `json:"series"`
Format string `json:"format"`
Vbr interface{} `json:"vbr"`
Chapters interface{} `json:"chapters"`
Height int `json:"height"`
LikeCount interface{} `json:"like_count"`
Duration int `json:"duration"`
Fulltitle string `json:"fulltitle"`
PlaylistIndex interface{} `json:"playlist_index"`
Album interface{} `json:"album"`
ViewCount int `json:"view_count"`
Playlist interface{} `json:"playlist"`
Title string `json:"title"`
Filename string `json:"_filename"`
Creator interface{} `json:"creator"`
Ext string `json:"ext"`
ID string `json:"id"`
DislikeCount interface{} `json:"dislike_count"`
AverageRating float64 `json:"average_rating"`
Abr float64 `json:"abr"`
UploaderURL string `json:"uploader_url"`
Categories []string `json:"categories"`
Fps float64 `json:"fps"`
StretchedRatio interface{} `json:"stretched_ratio"`
SeasonNumber interface{} `json:"season_number"`
Annotations interface{} `json:"annotations"`
WebpageURLBasename string `json:"webpage_url_basename"`
Acodec string `json:"acodec"`
DisplayID string `json:"display_id"`
//RequestedFormats []RequestedFormat `json:"requested_formats"`
//AutomaticCaptions struct{} `json:"automatic_captions"`
Description string `json:"description"`
Tags []string `json:"tags"`
Track interface{} `json:"track"`
RequestedSubtitles interface{} `json:"requested_subtitles"`
StartTime interface{} `json:"start_time"`
Uploader string `json:"uploader"`
ExtractorKey string `json:"extractor_key"`
FormatID string `json:"format_id"`
EpisodeNumber interface{} `json:"episode_number"`
UploaderID string `json:"uploader_id"`
//Subtitles struct{} `json:"subtitles"`
ReleaseYear interface{} `json:"release_year"`
Thumbnails []Thumbnail `json:"thumbnails"`
License interface{} `json:"license"`
Artist interface{} `json:"artist"`
AgeLimit int `json:"age_limit"`
ReleaseDate interface{} `json:"release_date"`
AltTitle interface{} `json:"alt_title"`
Thumbnail string `json:"thumbnail"`
ChannelID string `json:"channel_id"`
IsLive interface{} `json:"is_live"`
Width int `json:"width"`
EndTime interface{} `json:"end_time"`
WebpageURL string `json:"webpage_url"`
//Formats []Format `json:"formats"`
ChannelURL string `json:"channel_url"`
Resolution interface{} `json:"resolution"`
Vcodec string `json:"vcodec"`
}
ID string `json:"id"`
Title string `json:"title"`
Thumbnails []Thumbnail `json:"thumbnails"`
Description string `json:"description"`
ChannelID string `json:"channel_id"`
Duration int `json:"duration"`
Categories []string `json:"categories"`
Tags []string `json:"tags"`
IsLive bool `json:"is_live"`
LiveStatus string `json:"live_status"`
ReleaseTimestamp *int64 `json:"release_timestamp"`
uploadDateForReal *time.Time
Availability string `json:"availability"`
ReleaseDate string `json:"release_date"`
UploadDate string `json:"upload_date"`
type RequestedFormat struct {
Asr interface{} `json:"asr"`
Tbr float64 `json:"tbr"`
Container string `json:"container"`
Language interface{} `json:"language"`
Format string `json:"format"`
URL string `json:"url"`
Vcodec string `json:"vcodec"`
FormatNote string `json:"format_note"`
Height int `json:"height"`
Width int `json:"width"`
Ext string `json:"ext"`
FragmentBaseURL string `json:"fragment_base_url"`
Filesize interface{} `json:"filesize"`
Fps float64 `json:"fps"`
ManifestURL string `json:"manifest_url"`
Protocol string `json:"protocol"`
FormatID string `json:"format_id"`
HTTPHeaders struct {
AcceptCharset string `json:"Accept-Charset"`
AcceptLanguage string `json:"Accept-Language"`
AcceptEncoding string `json:"Accept-Encoding"`
Accept string `json:"Accept"`
UserAgent string `json:"User-Agent"`
} `json:"http_headers"`
Fragments []struct {
Path string `json:"path"`
Duration float64 `json:"duration,omitempty"`
} `json:"fragments"`
Acodec string `json:"acodec"`
Abr int `json:"abr,omitempty"`
}
type Format struct {
Asr int `json:"asr"`
Tbr float64 `json:"tbr"`
Protocol string `json:"protocol"`
Format string `json:"format"`
FormatNote string `json:"format_note"`
Height interface{} `json:"height"`
ManifestURL string `json:"manifest_url,omitempty"`
FormatID string `json:"format_id"`
Container string `json:"container,omitempty"`
Language interface{} `json:"language,omitempty"`
HTTPHeaders HTTPHeaders `json:"http_headers"`
URL string `json:"url"`
Vcodec string `json:"vcodec"`
Abr int `json:"abr,omitempty"`
Width interface{} `json:"width"`
Ext string `json:"ext"`
FragmentBaseURL string `json:"fragment_base_url,omitempty"`
Filesize interface{} `json:"filesize"`
Fps float64 `json:"fps"`
Fragments []struct {
Path string `json:"path"`
Duration float64 `json:"duration,omitempty"`
} `json:"fragments,omitempty"`
Acodec string `json:"acodec"`
PlayerURL interface{} `json:"player_url,omitempty"`
//WasLive bool `json:"was_live"`
//Formats interface{} `json:"formats"`
//Thumbnail string `json:"thumbnail"`
//Uploader string `json:"uploader"`
//UploaderID string `json:"uploader_id"`
//UploaderURL string `json:"uploader_url"`
//ChannelURL string `json:"channel_url"`
//ViewCount int `json:"view_count"`
//AverageRating interface{} `json:"average_rating"`
//AgeLimit int `json:"age_limit"`
//WebpageURL string `json:"webpage_url"`
//PlayableInEmbed bool `json:"playable_in_embed"`
//AutomaticCaptions interface{} `json:"automatic_captions"`
//Subtitles interface{} `json:"subtitles"`
//Chapters interface{} `json:"chapters"`
//LikeCount int `json:"like_count"`
//Channel string `json:"channel"`
//ChannelFollowerCount int `json:"channel_follower_count"`
//OriginalURL string `json:"original_url"`
//WebpageURLBasename string `json:"webpage_url_basename"`
//WebpageURLDomain string `json:"webpage_url_domain"`
//Extractor string `json:"extractor"`
//ExtractorKey string `json:"extractor_key"`
//Playlist interface{} `json:"playlist"`
//PlaylistIndex interface{} `json:"playlist_index"`
//DisplayID string `json:"display_id"`
//Fulltitle string `json:"fulltitle"`
//DurationString string `json:"duration_string"`
//RequestedSubtitles interface{} `json:"requested_subtitles"`
//HasDrm bool `json:"__has_drm"`
//RequestedFormats interface{} `json:"requested_formats"`
//Format string `json:"format"`
//FormatID string `json:"format_id"`
//Ext string `json:"ext"`
//Protocol string `json:"protocol"`
//Language interface{} `json:"language"`
//FormatNote string `json:"format_note"`
//FilesizeApprox int `json:"filesize_approx"`
//Tbr float64 `json:"tbr"`
//Width int `json:"width"`
//Height int `json:"height"`
//Resolution string `json:"resolution"`
//Fps int `json:"fps"`
//DynamicRange string `json:"dynamic_range"`
//Vcodec string `json:"vcodec"`
//Vbr float64 `json:"vbr"`
//StretchedRatio interface{} `json:"stretched_ratio"`
//Acodec string `json:"acodec"`
//Abr float64 `json:"abr"`
//Asr int `json:"asr"`
//Epoch int `json:"epoch"`
//Filename string `json:"filename"`
//Urls string `json:"urls"`
//Type string `json:"_type"`
}
type Thumbnail struct {
URL string `json:"url"`
Width int `json:"width"`
Resolution string `json:"resolution"`
Preference int `json:"preference"`
ID string `json:"id"`
Height int `json:"height"`
Height int `json:"height,omitempty"`
Width int `json:"width,omitempty"`
Resolution string `json:"resolution,omitempty"`
}
type HTTPHeaders struct {
AcceptCharset string `json:"Accept-Charset"`
AcceptLanguage string `json:"Accept-Language"`
AcceptEncoding string `json:"Accept-Encoding"`
Accept string `json:"Accept"`
UserAgent string `json:"User-Agent"`
func (v *YtdlVideo) GetUploadTime() time.Time {
//priority list:
// release timestamp from yt
// release timestamp from morty
// release date from yt
// upload date from yt
if v.uploadDateForReal != nil {
return *v.uploadDateForReal
}
var ytdlReleaseTimestamp time.Time
if v.ReleaseTimestamp != nil && *v.ReleaseTimestamp > 0 {
ytdlReleaseTimestamp = time.Unix(*v.ReleaseTimestamp, 0).UTC()
}
//get morty timestamp
var mortyReleaseTimestamp time.Time
mortyRelease, err := sdk.GetAPIsConfigs().GetReleasedDate(v.ID)
if err != nil {
logrus.Error(err)
} else if mortyRelease != nil {
mortyReleaseTimestamp, err = time.ParseInLocation(time.RFC3339, mortyRelease.ReleaseTime, time.UTC)
if err != nil {
logrus.Error(err)
}
}
ytdlReleaseDate, err := time.Parse("20060102", v.ReleaseDate)
if err != nil {
logrus.Error(err)
}
ytdlUploadDate, err := time.Parse("20060102", v.UploadDate)
if err != nil {
logrus.Error(err)
}
if !ytdlReleaseTimestamp.IsZero() {
v.uploadDateForReal = &ytdlReleaseTimestamp
} else if !mortyReleaseTimestamp.IsZero() {
v.uploadDateForReal = &mortyReleaseTimestamp
} else if !ytdlReleaseDate.IsZero() {
v.uploadDateForReal = &ytdlReleaseDate
} else {
v.uploadDateForReal = &ytdlUploadDate
}
return *v.uploadDateForReal
}

View file

@ -21,7 +21,7 @@ services:
## Wallet Server ##
###################
walletserver:
image: lbry/wallet-server:latest-release
image: lbry/wallet-server:v0.101.1
restart: always
environment:
- DB_DIRECTORY=/database
@ -31,12 +31,14 @@ services:
- BANDWIDTH_LIMIT=80000000000
- SESSION_TIMEOUT=10000000000000000000000000
- TCP_PORT=50001
- ELASTIC_HOST=es01
ports:
- "15300:50001"
expose:
- "50001"
depends_on:
- lbrycrd
- es01
ulimits:
nofile:
soft: 90000
@ -44,10 +46,30 @@ services:
#command: lbry.wallet.server.coin.LBC
command: lbry.wallet.server.coin.LBCRegTest
#############
## elasticsearch ##
#############
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
container_name: es01
environment:
- node.name=es01
- discovery.type=single-node
- indices.query.bool.max_clause_count=8196
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms4g -Xmx4g"
ulimits:
memlock:
soft: -1
hard: -1
ports:
- "9200:9200"
expose:
- "9200"
#############
## Lbrynet ##
#############
lbrynet:
image: lbry/lbrynet:v0.90.1
image: lbry/lbrynet:v0.99.0
restart: always
ports:
- "15100:5279"
@ -59,6 +81,7 @@ services:
- walletserver
environment:
- LBRY_STREAMING_SERVER=0.0.0.0:5280
- LBRY_FEE_PER_NAME_CHAR=0
volumes:
- "./persist/.lbrynet:/home/lbrynet"
- ".:/etc/lbry" #Put your daemon_settings.yml here
@ -87,7 +110,7 @@ services:
## Internal APIs ##
###################
internalapis:
image: lbry/internal-apis:master
image: odyseeteam/internal-apis:master
restart: "no"
ports:
- "15400:8080"
@ -105,7 +128,7 @@ services:
## Chainquery ##
################
chainquery:
image: lbry/chainquery:master
image: odyseeteam/chainquery:master
restart: "no"
ports:
- 6300:6300

View file

@ -14,11 +14,8 @@ export LOCAL_TMP_DIR="/var/tmp:/var/tmp"
touch -a .env && set -o allexport; source ./.env; set +o allexport
echo "LOCAL_TMP_DIR=$LOCAL_TMP_DIR"
# Compose settings - docker only
export SLACK_CHANNEL="ytsync-travis"
export LBRY_API_TOKEN="ytsyntoken"
export LBRY_WEB_API="http://localhost:15400"
export LBRYNET_ADDRESS="http://localhost:15100"
export LBRYCRD_STRING="tcp://lbry:lbry@localhost:15200"
export LBRYCRD_STRING="tcp://lbry:lbry@localhost:15200" #required for supporty
export LBRYNET_USE_DOCKER=true
export REFLECT_BLOBS=false
export CLEAN_ON_STARTUP=true
@ -50,9 +47,9 @@ until curl --output /dev/null --silent --head --fail http://localhost:15400; do
done
echo "successfully started..."
channelToSync="UCGyoEsIRjmnmzrsB67DhrOA"
channelName=@Alaminemoh11"$(date +%s)"
latestVideoID="ejWF7Jjdgmc"
channelToSync="UCMn-zv1SE-2y6vyewscfFqw"
channelName=@whatever"$(date +%s)"
latestVideoID="yPJgjiMbmX0"
#Data Setup for test
./data_setup.sh "$channelName" "$channelToSync" "$latestVideoID"
@ -96,3 +93,4 @@ if [[ $status != "synced" || $videoStatus != "published" || $channelTransferStat
else
echo "SUCCESSSSSSSSSSSSS!"
fi;
docker-compose down

View file

@ -8,7 +8,7 @@ services:
## Lbrynet ##
#############
lbrynet:
image: lbry/lbrynet:v0.90.1
image: lbry/lbrynet:v0.99.0
restart: "no"
networks:
lbry-network:

166
go.mod
View file

@ -1,3 +1,5 @@
go 1.17
module github.com/lbryio/ytsync/v5
replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203050410-e1076f12bf19
@ -6,36 +8,144 @@ replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203
//replace github.com/lbryio/reflector.go => /home/niko/go/src/github.com/lbryio/reflector.go/
require (
github.com/Microsoft/go-winio v0.4.14 // indirect
github.com/abadojack/whatlanggo v1.0.1
github.com/asaskevich/govalidator v0.0.0-20200819183940-29e1ff8eb0bb
github.com/aws/aws-sdk-go v1.25.9
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/aws/aws-sdk-go v1.44.6
github.com/davecgh/go-spew v1.1.1
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v1.13.1
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/hashicorp/go-immutable-radix v1.1.0 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/memberlist v0.1.5 // indirect
github.com/hashicorp/serf v0.8.5 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/lbryio/lbry.go/v2 v2.7.2-0.20210412222918-ed51ece75c3d
github.com/lbryio/reflector.go v1.1.3-0.20210412225256-4392c9724262
github.com/miekg/dns v1.1.22 // indirect
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 // indirect
github.com/prometheus/client_golang v0.9.3
github.com/shopspring/decimal v0.0.0-20191009025716-f1972eb1d1f5
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.7.0
google.golang.org/appengine v1.6.5 // indirect
gopkg.in/ini.v1 v1.60.2 // indirect
gopkg.in/vansante/go-ffprobe.v2 v2.0.2
github.com/docker/docker v20.10.17+incompatible
github.com/lbryio/lbry.go/v2 v2.7.2-0.20220815204100-2adb8af5b68c
github.com/lbryio/reflector.go v1.1.3-0.20220730181028-f5d30b1a6e79
github.com/mitchellh/go-ps v1.0.0
github.com/prometheus/client_golang v1.12.1
github.com/shopspring/decimal v1.3.1
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.4.0
github.com/stretchr/testify v1.7.1
github.com/tkanos/gonfig v0.0.0-20210106201359-53e13348de2f
github.com/vbauerster/mpb/v7 v7.4.1
gopkg.in/vansante/go-ffprobe.v2 v2.0.3
gotest.tools v2.2.0+incompatible
)
go 1.13
require (
github.com/Microsoft/go-winio v0.5.1 // indirect
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bluele/gcache v0.0.2 // indirect
github.com/brk0v/directio v0.0.0-20190225130936-69406e757cf7 // indirect
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3 // indirect
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect
github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cheekybits/genny v1.0.0 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/ekyoung/gin-nice-recovery v0.0.0-20160510022553-1654dca486db // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/gin-gonic/gin v1.7.7 // indirect
github.com/go-errors/errors v1.1.1 // indirect
github.com/go-ini/ini v1.48.0 // indirect
github.com/go-playground/locales v0.13.0 // indirect
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-playground/validator/v10 v10.4.1 // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/gofrs/uuid v3.2.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/rpc v1.2.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-immutable-radix v1.1.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/memberlist v0.3.0 // indirect
github.com/hashicorp/serf v0.9.7 // indirect
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/johntdyer/slack-go v0.0.0-20180213144715-95fac1160b22 // indirect
github.com/johntdyer/slackrus v0.0.0-20211215141436-33e4a270affb // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/karrick/godirwalk v1.17.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lbryio/chainquery v1.9.0 // indirect
github.com/lbryio/lbry.go v1.1.2 // indirect
github.com/lbryio/types v0.0.0-20220224142228-73610f6654a6 // indirect
github.com/leodido/go-urn v1.2.0 // indirect
github.com/lucas-clemente/quic-go v0.28.1 // indirect
github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5 // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/marten-seemann/qpack v0.2.1 // indirect
github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect
github.com/marten-seemann/qtls-go1-17 v0.1.2 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect
github.com/marten-seemann/qtls-go1-19 v0.1.0-beta.1 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.41 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
github.com/onsi/gomega v1.17.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/slack-go/slack v0.10.3 // indirect
github.com/spf13/afero v1.4.1 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.7.1 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/ugorji/go/codec v1.1.7 // indirect
github.com/volatiletech/inflect v0.0.0-20170731032912-e7201282ae8d // indirect
github.com/volatiletech/null v8.0.0+incompatible // indirect
github.com/volatiletech/sqlboiler v3.4.0+incompatible // indirect
github.com/ybbus/jsonrpc v2.1.2+incompatible // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
golang.org/x/tools v0.1.5 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/ini.v1 v1.60.2 // indirect
gopkg.in/nullbio/null.v6 v6.0.0-20161116030900-40264a2e6b79 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
gotest.tools/v3 v3.2.0 // indirect
)

609
go.sum

File diff suppressed because it is too large Load diff

93
main.go
View file

@ -7,14 +7,15 @@ import (
"os"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/v5/configs"
"github.com/lbryio/ytsync/v5/manager"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared"
ytUtils "github.com/lbryio/ytsync/v5/util"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
@ -31,6 +32,10 @@ var (
func main() {
rand.Seed(time.Now().UnixNano())
log.SetLevel(log.DebugLevel)
customFormatter := new(log.TextFormatter)
customFormatter.TimestampFormat = "2006-01-02 15:04:05"
customFormatter.FullTimestamp = true
log.SetFormatter(customFormatter)
http.Handle("/metrics", promhttp.Handler())
go func() {
log.Error(http.ListenAndServe(":2112", nil))
@ -69,22 +74,15 @@ func main() {
}
func ytSync(cmd *cobra.Command, args []string) {
var hostname string
slackToken := os.Getenv("SLACK_TOKEN")
if slackToken == "" {
log.Error("A slack token was not present in env vars! Slack messages disabled!")
} else {
var err error
hostname, err = os.Hostname()
if err != nil {
log.Error("could not detect system hostname")
hostname = "ytsync-unknown"
}
if len(hostname) > 30 {
hostname = hostname[0:30]
}
err := configs.Init("./config.json")
if err != nil {
log.Fatalf("could not parse configuration file: %s", errors.FullTrace(err))
}
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
if configs.Configuration.SlackToken == "" {
log.Error("A slack token was not present in the config! Slack messages disabled!")
} else {
util.InitSlack(configs.Configuration.SlackToken, configs.Configuration.SlackChannel, configs.Configuration.GetHostname())
}
if cliFlags.Status != "" && !util.InSlice(cliFlags.Status, shared.SyncStatuses) {
@ -103,68 +101,33 @@ func ytSync(cmd *cobra.Command, args []string) {
}
cliFlags.MaxVideoLength = time.Duration(maxVideoLength) * time.Hour
apiURL := os.Getenv("LBRY_WEB_API")
apiToken := os.Getenv("LBRY_API_TOKEN")
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
lbrycrdDsn := os.Getenv("LBRYCRD_STRING")
awsS3ID := os.Getenv("AWS_S3_ID")
awsS3Secret := os.Getenv("AWS_S3_SECRET")
awsS3Region := os.Getenv("AWS_S3_REGION")
awsS3Bucket := os.Getenv("AWS_S3_BUCKET")
if apiURL == "" {
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_WEB_API")
if configs.Configuration.InternalApisEndpoint == "" {
log.Errorln("An Internal APIs Endpoint was not defined")
return
}
if apiToken == "" {
log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
if configs.Configuration.InternalApisAuthToken == "" {
log.Errorln("An Internal APIs auth token was not defined")
return
}
if youtubeAPIKey == "" {
log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
if configs.Configuration.WalletS3Config.ID == "" || configs.Configuration.WalletS3Config.Region == "" || configs.Configuration.WalletS3Config.Bucket == "" || configs.Configuration.WalletS3Config.Secret == "" || configs.Configuration.WalletS3Config.Endpoint == "" {
log.Errorln("Wallet S3 configuration is incomplete")
return
}
if awsS3ID == "" {
log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID")
if configs.Configuration.BlockchaindbS3Config.ID == "" || configs.Configuration.BlockchaindbS3Config.Region == "" || configs.Configuration.BlockchaindbS3Config.Bucket == "" || configs.Configuration.BlockchaindbS3Config.Secret == "" || configs.Configuration.BlockchaindbS3Config.Endpoint == "" {
log.Errorln("Blockchain DBs S3 configuration is incomplete")
return
}
if awsS3Secret == "" {
log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET")
return
}
if awsS3Region == "" {
log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION")
return
}
if awsS3Bucket == "" {
log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
return
}
if lbrycrdDsn == "" {
log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
if configs.Configuration.LbrycrdString == "" {
log.Infoln("Using default (local) lbrycrd instance. Set lbrycrd_string if you want to use something else")
}
blobsDir := ytUtils.GetBlobsDir()
apiConfig := &sdk.APIConfig{
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
HostName: hostname,
}
awsConfig := &shared.AwsConfigs{
AwsS3ID: awsS3ID,
AwsS3Secret: awsS3Secret,
AwsS3Region: awsS3Region,
AwsS3Bucket: awsS3Bucket,
}
sm := manager.NewSyncManager(
cliFlags,
blobsDir,
lbrycrdDsn,
awsConfig,
apiConfig,
)
err := sm.Start()
err = sm.Start()
if err != nil {
ytUtils.SendErrorToSlack(errors.FullTrace(err))
}

View file

@ -8,6 +8,7 @@ import (
"time"
"github.com/lbryio/ytsync/v5/blobs_reflector"
"github.com/lbryio/ytsync/v5/configs"
"github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/sdk"
@ -24,19 +25,17 @@ type SyncManager struct {
CliFlags shared.SyncFlags
ApiConfig *sdk.APIConfig
LbrycrdDsn string
AwsConfigs *shared.AwsConfigs
blobsDir string
channelsToSync []Sync
}
func NewSyncManager(cliFlags shared.SyncFlags, blobsDir, lbrycrdDsn string, awsConfigs *shared.AwsConfigs, apiConfig *sdk.APIConfig) *SyncManager {
func NewSyncManager(cliFlags shared.SyncFlags, blobsDir string) *SyncManager {
return &SyncManager{
CliFlags: cliFlags,
blobsDir: blobsDir,
LbrycrdDsn: lbrycrdDsn,
AwsConfigs: awsConfigs,
ApiConfig: apiConfig,
LbrycrdDsn: configs.Configuration.LbrycrdString,
ApiConfig: sdk.GetAPIsConfigs(),
}
}
func (s *SyncManager) enqueueChannel(channel *shared.YoutubeChannel) {
@ -139,7 +138,7 @@ func (s *SyncManager) Start() error {
"WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR",
"NotEnoughFunds",
"no space left on device",
"failure uploading wallet",
"there was a problem uploading the wallet",
"the channel in the wallet is different than the channel in the database",
"this channel does not belong to this wallet!",
"You already have a stream claim published under the name",
@ -153,6 +152,10 @@ func (s *SyncManager) Start() error {
logUtils.SendInfoToSlack("A non fatal error was reported by the sync process.\n%s", errors.FullTrace(err))
}
}
err = logUtils.CleanupMetadata()
if err != nil {
log.Errorf("something went wrong while trying to clear out the video metadata directory: %s", errors.FullTrace(err))
}
err = blobs_reflector.ReflectAndClean()
if err != nil {
return errors.Prefix("@Nikooo777 something went wrong while reflecting blobs", err)

View file

@ -3,6 +3,13 @@ package manager
import (
"os"
"path/filepath"
"strings"
"time"
"github.com/lbryio/ytsync/v5/configs"
"github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
@ -10,24 +17,21 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/v2/extras/errors"
logUtils "github.com/lbryio/ytsync/v5/util"
)
func (s *Sync) getS3Downloader() (*s3manager.Downloader, error) {
s3Session, err := session.NewSession(s.Manager.AwsConfigs.GetS3AWSConfig())
func (s *Sync) getS3Downloader(config *aws.Config) (*s3manager.Downloader, error) {
s3Session, err := session.NewSession(config)
if err != nil {
return nil, errors.Prefix("error starting session: ", err)
return nil, errors.Prefix("error starting session", err)
}
downloader := s3manager.NewDownloader(s3Session)
return downloader, nil
}
func (s *Sync) getS3Uploader() (*s3manager.Uploader, error) {
s3Session, err := session.NewSession(s.Manager.AwsConfigs.GetS3AWSConfig())
func (s *Sync) getS3Uploader(config *aws.Config) (*s3manager.Uploader, error) {
s3Session, err := session.NewSession(config)
if err != nil {
return nil, errors.Prefix("error starting session: ", err)
return nil, errors.Prefix("error starting session", err)
}
uploader := s3manager.NewUploader(s3Session)
return uploader, nil
@ -38,18 +42,18 @@ func (s *Sync) downloadWallet() error {
if err != nil {
return errors.Err(err)
}
downloader, err := s.getS3Downloader()
downloader, err := s.getS3Downloader(configs.Configuration.WalletS3Config.GetS3AWSConfig())
if err != nil {
return err
}
out, err := os.Create(defaultTempWalletDir)
if err != nil {
return errors.Prefix("error creating temp wallet: ", err)
return errors.Prefix("error creating temp wallet", err)
}
defer out.Close()
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket),
Bucket: aws.String(configs.Configuration.WalletS3Config.Bucket),
Key: key,
})
if err != nil {
@ -74,21 +78,21 @@ func (s *Sync) downloadWallet() error {
err = os.Rename(defaultTempWalletDir, defaultWalletDir)
if err != nil {
return errors.Prefix("error replacing temp wallet for default wallet: ", err)
return errors.Prefix("error replacing temp wallet for default wallet", err)
}
return nil
}
func (s *Sync) downloadBlockchainDB() error {
if logUtils.IsRegTest() {
if util.IsRegTest() {
return nil // tests fail if we re-use the same blockchain DB
}
defaultBDBDir, defaultTempBDBDir, key, err := s.getBlockchainDBPaths()
defaultBDBPath, defaultTempBDBPath, key, err := s.getBlockchainDBPaths()
if err != nil {
return errors.Err(err)
}
files, err := filepath.Glob(defaultBDBDir + "*")
files, err := filepath.Glob(defaultBDBPath + "*")
if err != nil {
return errors.Err(err)
}
@ -101,18 +105,18 @@ func (s *Sync) downloadBlockchainDB() error {
if s.DbChannelData.WipeDB {
return nil
}
downloader, err := s.getS3Downloader()
downloader, err := s.getS3Downloader(configs.Configuration.BlockchaindbS3Config.GetS3AWSConfig())
if err != nil {
return errors.Err(err)
}
out, err := os.Create(defaultTempBDBDir)
out, err := os.Create(defaultTempBDBPath)
if err != nil {
return errors.Prefix("error creating temp wallet: ", err)
return errors.Prefix("error creating temp blockchain DB file", err)
}
defer out.Close()
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket),
Bucket: aws.String(configs.Configuration.BlockchaindbS3Config.Bucket),
Key: key,
})
if err != nil {
@ -135,11 +139,16 @@ func (s *Sync) downloadBlockchainDB() error {
return errors.Err("zero bytes written")
}
err = os.Rename(defaultTempBDBDir, defaultBDBDir)
blockchainDbDir := strings.Replace(defaultBDBPath, "blockchain.db", "", -1)
err = util.Untar(defaultTempBDBPath, blockchainDbDir)
if err != nil {
return errors.Prefix("error replacing temp blockchain.db for default blockchain.db: ", err)
return errors.Prefix("error extracting blockchain.db files", err)
}
log.Printf("blockchain.db downloaded to %s", defaultBDBDir)
err = os.Remove(defaultTempBDBPath)
if err != nil {
return errors.Err(err)
}
log.Printf("blockchain.db data downloaded and extracted to %s", blockchainDbDir)
return nil
}
@ -147,7 +156,7 @@ func (s *Sync) getWalletPaths() (defaultWallet, tempWallet string, key *string,
defaultWallet = os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
tempWallet = os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet"
key = aws.String("/wallets/" + s.DbChannelData.ChannelId)
if logUtils.IsRegTest() {
if util.IsRegTest() {
defaultWallet = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
tempWallet = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet"
key = aws.String("/regtest/" + s.DbChannelData.ChannelId)
@ -168,27 +177,27 @@ func (s *Sync) getWalletPaths() (defaultWallet, tempWallet string, key *string,
func (s *Sync) getBlockchainDBPaths() (defaultDB, tempDB string, key *string, err error) {
lbryumDir := os.Getenv("LBRYUM_DIR")
if lbryumDir == "" {
if logUtils.IsRegTest() {
if util.IsRegTest() {
lbryumDir = os.Getenv("HOME") + "/.lbryum_regtest"
} else {
lbryumDir = os.Getenv("HOME") + "/.lbryum"
}
}
defaultDB = lbryumDir + "/lbc_mainnet/blockchain.db"
tempDB = lbryumDir + "/lbc_mainnet/tmp_blockchain.db"
key = aws.String("/blockchain_dbs/" + s.DbChannelData.ChannelId)
if logUtils.IsRegTest() {
tempDB = lbryumDir + "/lbc_mainnet/tmp_blockchain.tar"
key = aws.String("/blockchain_dbs/" + s.DbChannelData.ChannelId + ".tar")
if util.IsRegTest() {
defaultDB = lbryumDir + "/lbc_regtest/blockchain.db"
tempDB = lbryumDir + "/lbc_regtest/tmp_blockchain.db"
key = aws.String("/regtest_dbs/" + s.DbChannelData.ChannelId)
tempDB = lbryumDir + "/lbc_regtest/tmp_blockchain.tar"
key = aws.String("/regtest_dbs/" + s.DbChannelData.ChannelId + ".tar")
}
return
}
func (s *Sync) uploadWallet() error {
defaultWalletDir := logUtils.GetDefaultWalletPath()
defaultWalletDir := util.GetDefaultWalletPath()
key := aws.String("/wallets/" + s.DbChannelData.ChannelId)
if logUtils.IsRegTest() {
if util.IsRegTest() {
key = aws.String("/regtest/" + s.DbChannelData.ChannelId)
}
@ -196,7 +205,7 @@ func (s *Sync) uploadWallet() error {
return errors.Err("default_wallet does not exist")
}
uploader, err := s.getS3Uploader()
uploader, err := s.getS3Uploader(configs.Configuration.WalletS3Config.GetS3AWSConfig())
if err != nil {
return err
}
@ -207,13 +216,22 @@ func (s *Sync) uploadWallet() error {
}
defer file.Close()
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket),
Key: key,
Body: file,
})
start := time.Now()
for time.Since(start) < 30*time.Minute {
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(configs.Configuration.WalletS3Config.Bucket),
Key: key,
Body: file,
})
if err != nil {
time.Sleep(30 * time.Second)
continue
}
break
}
if err != nil {
return err
return errors.Prefix("there was a problem uploading the wallet to S3", errors.Err(err))
}
log.Println("wallet uploaded to S3")
@ -229,26 +247,39 @@ func (s *Sync) uploadBlockchainDB() error {
if _, err := os.Stat(defaultBDBDir); os.IsNotExist(err) {
return errors.Err("blockchain.db does not exist")
}
uploader, err := s.getS3Uploader()
files, err := filepath.Glob(defaultBDBDir + "*")
if err != nil {
return errors.Err(err)
}
tarPath := strings.Replace(defaultBDBDir, "blockchain.db", "", -1) + s.DbChannelData.ChannelId + ".tar"
err = util.CreateTarball(tarPath, files)
if err != nil {
return err
}
file, err := os.Open(defaultBDBDir)
uploader, err := s.getS3Uploader(configs.Configuration.BlockchaindbS3Config.GetS3AWSConfig())
if err != nil {
return err
}
file, err := os.Open(tarPath)
if err != nil {
return err
}
defer file.Close()
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.Manager.AwsConfigs.AwsS3Bucket),
Bucket: aws.String(configs.Configuration.BlockchaindbS3Config.Bucket),
Key: key,
Body: file,
})
if err != nil {
return err
}
log.Println("blockchain.db uploaded to S3")
log.Println("blockchain.db files uploaded to S3")
err = os.Remove(tarPath)
if err != nil {
return errors.Err(err)
}
return os.Remove(defaultBDBDir)
}

View file

@ -103,6 +103,9 @@ func (s *Sync) walletSetup() error {
videosOnYoutube = s.Manager.CliFlags.VideosToSync(s.DbChannelData.TotalSubscribers)
}
unallocatedVideos := videosOnYoutube - (publishedCount + failedCount)
if unallocatedVideos < 0 {
unallocatedVideos = 0
}
channelFee := channelClaimAmount
channelAlreadyClaimed := s.DbChannelData.ChannelClaimID != ""
if channelAlreadyClaimed {
@ -110,7 +113,7 @@ func (s *Sync) walletSetup() error {
}
requiredBalance := float64(unallocatedVideos)*(publishAmount+estimatedMaxTxFee) + channelFee
if s.Manager.CliFlags.UpgradeMetadata {
requiredBalance += float64(notUpgradedCount) * 0.001
requiredBalance += float64(notUpgradedCount) * estimatedMaxTxFee
}
refillAmount := 0.0
@ -127,6 +130,12 @@ func (s *Sync) walletSetup() error {
if err != nil {
return errors.Err(err)
}
} else if balance > requiredBalance {
extraLBC := balance - requiredBalance
if extraLBC > 5 {
sendBackAmount := extraLBC - 1
logUtils.SendInfoToSlack("channel %s has %.1f credits which is %.1f more than it requires (%.1f). We should send at least %.1f that back.", s.DbChannelData.ChannelId, balance, extraLBC, requiredBalance, sendBackAmount)
}
}
claimAddress, err := s.daemon.AddressList(nil, nil, 1, 20)
@ -311,12 +320,12 @@ func (s *Sync) waitForNewBlock() error {
func (s *Sync) GenerateRegtestBlock() error {
lbrycrd, err := logUtils.GetLbrycrdClient(s.Manager.LbrycrdDsn)
if err != nil {
return errors.Prefix("error getting lbrycrd client: ", err)
return errors.Prefix("error getting lbrycrd client", err)
}
txs, err := lbrycrd.Generate(1)
if err != nil {
return errors.Prefix("error generating new block: ", err)
return errors.Prefix("error generating new block", err)
}
for _, tx := range txs {
@ -366,14 +375,12 @@ func (s *Sync) ensureChannelOwnership() error {
channelUsesOldMetadata := false
if channelToUse != nil {
channelUsesOldMetadata = channelToUse.Value.GetThumbnail() == nil
channelUsesOldMetadata = channelToUse.Value.GetThumbnail() == nil || (len(channelToUse.Value.GetLanguages()) == 0 && s.DbChannelData.Language != "")
if !channelUsesOldMetadata {
return nil
}
}
channelBidAmount := channelClaimAmount
balanceResp, err := s.daemon.AccountBalance(nil)
if err != nil {
return err
@ -385,8 +392,8 @@ func (s *Sync) ensureChannelOwnership() error {
return errors.Err(err)
}
if balance.LessThan(decimal.NewFromFloat(channelBidAmount)) {
err = s.addCredits(channelBidAmount + 0.3)
if balance.LessThan(decimal.NewFromFloat(channelClaimAmount)) {
err = s.addCredits(channelClaimAmount + estimatedMaxTxFee*3)
if err != nil {
return err
}
@ -407,7 +414,7 @@ func (s *Sync) ensureChannelOwnership() error {
}
thumbnail := channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Avatar.Thumbnails)-1].URL
thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail, s.DbChannelData.ChannelId, *s.Manager.AwsConfigs.GetS3AWSConfig())
thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail, s.DbChannelData.ChannelId)
if err != nil {
return err
}
@ -416,7 +423,6 @@ func (s *Sync) ensureChannelOwnership() error {
if channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails != nil {
bURL, err := thumbs.MirrorThumbnail(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails[len(channelInfo.Header.C4TabbedHeaderRenderer.Banner.Thumbnails)-1].URL,
"banner-"+s.DbChannelData.ChannelId,
*s.Manager.AwsConfigs.GetS3AWSConfig(),
)
if err != nil {
return err
@ -425,18 +431,16 @@ func (s *Sync) ensureChannelOwnership() error {
}
var languages []string = nil
//we don't have this data without the API
//if channelInfo.DefaultLanguage != "" {
// if channelInfo.DefaultLanguage == "iw" {
// channelInfo.DefaultLanguage = "he"
// }
// languages = []string{channelInfo.DefaultLanguage}
//}
if s.DbChannelData.Language != "" {
languages = []string{s.DbChannelData.Language}
}
var locations []jsonrpc.Location = nil
if channelInfo.Topbar.DesktopTopbarRenderer.CountryCode != "" {
locations = []jsonrpc.Location{{Country: &channelInfo.Topbar.DesktopTopbarRenderer.CountryCode}}
}
var c *jsonrpc.TransactionSummary
var recoveredChannelClaimID string
claimCreateOptions := jsonrpc.ClaimCreateOptions{
Title: &channelInfo.Microformat.MicroformatDataRenderer.Title,
Description: &channelInfo.Metadata.ChannelMetadataRenderer.Description,
@ -446,12 +450,20 @@ func (s *Sync) ensureChannelOwnership() error {
ThumbnailURL: &thumbnailURL,
}
if channelUsesOldMetadata {
da, err := s.getDefaultAccount()
if err != nil {
return err
}
if s.DbChannelData.TransferState <= 1 {
c, err = s.daemon.ChannelUpdate(s.DbChannelData.ChannelClaimID, jsonrpc.ChannelUpdateOptions{
ClearTags: util.PtrToBool(true),
ClearLocations: util.PtrToBool(true),
ClearLanguages: util.PtrToBool(true),
ChannelCreateOptions: jsonrpc.ChannelCreateOptions{
AccountID: &da,
FundingAccountIDs: []string{
da,
},
ClaimCreateOptions: claimCreateOptions,
CoverURL: bannerURL,
},
@ -461,20 +473,50 @@ func (s *Sync) ensureChannelOwnership() error {
return nil
}
} else {
c, err = s.daemon.ChannelCreate(s.DbChannelData.DesiredChannelName, channelBidAmount, jsonrpc.ChannelCreateOptions{
c, err = s.daemon.ChannelCreate(s.DbChannelData.DesiredChannelName, channelClaimAmount, jsonrpc.ChannelCreateOptions{
ClaimCreateOptions: claimCreateOptions,
CoverURL: bannerURL,
})
if err != nil {
claimId, err2 := s.getChannelClaimIDForTimedOutCreation()
if err2 != nil {
err = errors.Prefix(err2.Error(), err)
} else {
recoveredChannelClaimID = claimId
}
}
}
if err != nil {
return err
}
s.DbChannelData.ChannelClaimID = c.Outputs[0].ClaimID
if recoveredChannelClaimID != "" {
s.DbChannelData.ChannelClaimID = recoveredChannelClaimID
} else {
s.DbChannelData.ChannelClaimID = c.Outputs[0].ClaimID
}
return s.Manager.ApiConfig.SetChannelClaimID(s.DbChannelData.ChannelId, s.DbChannelData.ChannelClaimID)
}
//getChannelClaimIDForTimedOutCreation is a raw function that returns the only channel that exists in the wallet
// this is used because the SDK sucks and can't figure out when to return when creating a claim...
func (s *Sync) getChannelClaimIDForTimedOutCreation() (string, error) {
channels, err := s.daemon.ChannelList(nil, 1, 500, nil)
if err != nil {
return "", err
} else if channels == nil {
return "", errors.Err("no channel response")
}
if len((*channels).Items) != 1 {
return "", errors.Err("more than one channel found when trying to recover from SDK failure in creating the channel")
}
desiredChannel := (*channels).Items[0]
if desiredChannel.Name != s.DbChannelData.DesiredChannelName {
return "", errors.Err("the channel found in the wallet has a different name than the one we expected")
}
return desiredChannel.ClaimID, nil
}
func (s *Sync) addCredits(amountToAdd float64) error {
start := time.Now()
defer func(start time.Time) {

View file

@ -241,7 +241,7 @@ func transferVideos(s *Sync) error {
},
},
},
Bid: util.PtrToString("0.005"), // Todo - Dont hardcode
Bid: util.PtrToString(fmt.Sprintf("%.5f", publishAmount/2.)),
}
videoStatus := shared.VideoStatus{
ChannelID: s.DbChannelData.ChannelId,
@ -293,7 +293,7 @@ func (s *Sync) streamUpdate(ui *updateInfo) error {
timing.TimedComponent("transferStreamUpdate").Add(time.Since(start))
if updateError != nil {
ui.videoStatus.FailureReason = updateError.Error()
ui.videoStatus.Status = shared.VideoStatusTranferFailed
ui.videoStatus.Status = shared.VideoStatusTransferFailed
ui.videoStatus.IsTransferred = util.PtrToBool(false)
} else {
ui.videoStatus.IsTransferred = util.PtrToBool(len(result.Outputs) != 0)

View file

@ -21,6 +21,7 @@ import (
"github.com/lbryio/ytsync/v5/timing"
logUtils "github.com/lbryio/ytsync/v5/util"
"github.com/lbryio/ytsync/v5/ytapi"
"github.com/vbauerster/mpb/v7"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
@ -32,11 +33,10 @@ import (
const (
channelClaimAmount = 0.01
estimatedMaxTxFee = 0.1
estimatedMaxTxFee = 0.0015
minimumAccountBalance = 1.0
minimumRefillAmount = 1
publishAmount = 0.01
maxReasonLength = 500
publishAmount = 0.002
)
// Sync stores the options that control how syncing happens
@ -54,6 +54,9 @@ type Sync struct {
queue chan ytapi.Video
defaultAccountID string
hardVideoFailure hardVideoFailure
progressBarWg *sync.WaitGroup
progressBar *mpb.Progress
}
type hardVideoFailure struct {
@ -128,7 +131,6 @@ func (s *Sync) FullCycle() (e error) {
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop()
time.Sleep(5 * time.Second)
debug.PrintStack() // so we can figure out what's not stopping
}()
err := s.setStatusSyncing()
if err != nil {
@ -176,7 +178,12 @@ func (s *Sync) FullCycle() (e error) {
return err
}
s.progressBarWg = &sync.WaitGroup{}
s.progressBar = mpb.New(mpb.WithWaitGroup(s.progressBarWg))
err = s.doSync()
// Waiting for passed &wg and for all bars to complete and flush
s.progressBar.Wait()
if err != nil {
return err
}
@ -278,6 +285,8 @@ func (s *Sync) setChannelTerminationStatus(e *error) {
"interrupted during daemon startup",
"interrupted by user",
"use --skip-space-check to ignore",
"failure uploading blockchain DB",
"default_wallet already exists",
}
dbWipeConditions := []string{
"Missing inputs",
@ -327,7 +336,7 @@ func (s *Sync) waitForDaemonStart() error {
}
func (s *Sync) stopAndUploadWallet(e *error) {
log.Printf("Stopping daemon")
log.Println("Stopping daemon")
shutdownErr := logUtils.StopDaemon()
if shutdownErr != nil {
logShutdownError(shutdownErr)
@ -342,17 +351,17 @@ func (s *Sync) stopAndUploadWallet(e *error) {
err := s.uploadWallet()
if err != nil {
if *e == nil {
e = &err
*e = err
} else {
*e = errors.Prefix("failure uploading wallet", *e)
*e = errors.Prefix(fmt.Sprintf("%s + original error", errors.FullTrace(err)), *e)
}
}
err = s.uploadBlockchainDB()
if err != nil {
if *e == nil {
e = &err
*e = err
} else {
*e = errors.Prefix("failure uploading wallet", *e)
*e = errors.Prefix(fmt.Sprintf("failure uploading blockchain DB: %s + original error", errors.FullTrace(err)), *e)
}
}
}
@ -487,7 +496,7 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim, ownClaims []jsonrpc.Claim)
claimMarkedUnpublished := claimInDatabase && !sv.Published
_, isOwnClaim := ownClaimsInfo[videoID]
transferred := !isOwnClaim || s.DbChannelData.TransferState == 3
transferStatusMismatch := sv.Transferred != transferred
transferStatusMismatch := claimInDatabase && sv.Transferred != transferred
if metadataDiffers {
log.Debugf("%s: Mismatch in database for metadata. DB: %d - Blockchain: %d", videoID, sv.MetadataVersion, chainInfo.MetadataVersion)
@ -509,9 +518,14 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim, ownClaims []jsonrpc.Claim)
}
if !claimInDatabase || metadataDiffers || claimIDDiffers || claimNameDiffers || claimMarkedUnpublished || transferStatusMismatch {
claimSize, err := chainInfo.Claim.GetStreamSizeByMagic()
if err != nil {
claimSize = 0
claimSize := uint64(0)
if chainInfo.Claim.Value.GetStream().Source != nil {
claimSize, err = chainInfo.Claim.GetStreamSizeByMagic()
if err != nil {
claimSize = 0
}
} else {
util.SendToSlack("[%s] video with claimID %s has no source?! panic prevented...", s.DbChannelData.ChannelId, chainInfo.ClaimID)
}
fixed++
log.Debugf("updating %s in the database", videoID)
@ -543,7 +557,11 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim, ownClaims []jsonrpc.Claim)
if sv.Transferred || sv.IsLbryFirst {
_, ok := allClaimsInfo[vID]
if !ok && sv.Published {
searchResponse, err := s.daemon.ClaimSearch(nil, &sv.ClaimID, nil, nil, 1, 20)
searchResponse, err := s.daemon.ClaimSearch(jsonrpc.ClaimSearchArgs{
ClaimID: &sv.ClaimID,
Page: 1,
PageSize: 20,
})
if err != nil {
log.Error(err.Error())
continue
@ -659,7 +677,8 @@ func (s *Sync) checkIntegrity() error {
if pubsOnWallet > pubsOnDB { //This case should never happen
logUtils.SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.DbChannelData.ChannelId)
return errors.Err("not all published videos are in the database")
//we never really done anything about those. it happens when a user updates the channel for a publish to another ytsync channel
//return errors.Err("not all published videos are in the database")
}
if pubsOnWallet < pubsOnDB {
logUtils.SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.DbChannelData.ChannelId)
@ -735,55 +754,7 @@ func (s *Sync) doSync() error {
func (s *Sync) startWorker(workerNum int) {
var v ytapi.Video
var more bool
fatalErrors := []string{
":5279: read: connection reset by peer",
"no space left on device",
"NotEnoughFunds",
"Cannot publish using channel",
"cannot concatenate 'str' and 'NoneType' objects",
"more than 90% of the space has been used.",
"Couldn't find private key for id",
"You already have a stream claim published under the name",
"Missing inputs",
}
errorsNoRetry := []string{
"non 200 status code received",
"This video contains content from",
"dont know which claim to update",
"uploader has not made this video available in your country",
"download error: AccessDenied: Access Denied",
"Playback on other websites has been disabled by the video owner",
"Error in daemon: Cannot publish empty file",
"Error extracting sts from embedded url response",
"Unable to extract signature tokens",
"Client.Timeout exceeded while awaiting headers",
"the video is too big to sync, skipping for now",
"video is too long to process",
"video is too short to process",
"no compatible format available for this video",
"Watch this video on YouTube.",
"have blocked it on copyright grounds",
"the video must be republished as we can't get the right size",
"HTTP Error 403",
"giving up after 0 fragment retries",
"Sorry about that",
"This video is not available",
"requested format not available",
"interrupted by user",
"Sign in to confirm your age",
"This video is unavailable",
"video is a live stream and hasn't completed yet",
}
walletErrors := []string{
"Not enough funds to cover this transaction",
"failed: Not enough funds",
"Error in daemon: Insufficient funds, please deposit additional LBC",
//"Missing inputs",
}
blockchainErrors := []string{
"txn-mempool-conflict",
"too-long-mempool-chain",
}
for {
select {
case <-s.grp.Ch():
@ -813,17 +784,18 @@ func (s *Sync) startWorker(workerNum int) {
default:
}
tryCount++
err := s.processVideo(v)
if err != nil {
logUtils.SendErrorToSlack("error processing video %s: %s", v.ID(), err.Error())
shouldRetry := s.Manager.CliFlags.MaxTries > 1 && !util.SubstringInSlice(err.Error(), errorsNoRetry) && tryCount < s.Manager.CliFlags.MaxTries
shouldRetry := s.Manager.CliFlags.MaxTries > 1 && !util.SubstringInSlice(err.Error(), shared.ErrorsNoRetry) && tryCount < s.Manager.CliFlags.MaxTries
if strings.Contains(strings.ToLower(err.Error()), "interrupted by user") {
s.grp.Stop()
} else if util.SubstringInSlice(err.Error(), fatalErrors) {
} else if util.SubstringInSlice(err.Error(), shared.FatalErrors) {
s.hardVideoFailure.flagFailure(err.Error())
s.grp.Stop()
} else if shouldRetry {
if util.SubstringInSlice(err.Error(), blockchainErrors) {
if util.SubstringInSlice(err.Error(), shared.BlockchainErrors) {
log.Println("waiting for a block before retrying")
err := s.waitForNewBlock()
if err != nil {
@ -831,7 +803,7 @@ func (s *Sync) startWorker(workerNum int) {
logUtils.SendErrorToSlack("something went wrong while waiting for a block: %s", errors.FullTrace(err))
break
}
} else if util.SubstringInSlice(err.Error(), walletErrors) {
} else if util.SubstringInSlice(err.Error(), shared.WalletErrors) {
log.Println("checking funds and UTXOs before retrying...")
err := s.walletSetup()
if err != nil {
@ -895,9 +867,8 @@ func (s *Sync) enqueueYoutubeVideos() error {
return err
}
videos, err := ytapi.GetVideosToSync(s.Manager.ApiConfig, s.DbChannelData.ChannelId, s.syncedVideos, s.Manager.CliFlags.QuickSync, s.Manager.CliFlags.VideosToSync(s.DbChannelData.TotalSubscribers), ytapi.VideoParams{
videos, err := ytapi.GetVideosToSync(s.DbChannelData.ChannelId, s.syncedVideos, s.Manager.CliFlags.QuickSync, s.Manager.CliFlags.VideosToSync(s.DbChannelData.TotalSubscribers), ytapi.VideoParams{
VideoDir: s.videoDirectory,
S3Config: *s.Manager.AwsConfigs.GetS3AWSConfig(),
Stopper: s.grp,
IPPool: ipPool,
}, s.DbChannelData.LastUploadedVideo)
@ -986,7 +957,7 @@ func (s *Sync) processVideo(v ytapi.Video) (err error) {
DefaultAccount: da,
}
summary, err := v.Sync(s.daemon, sp, &sv, videoRequiresUpgrade, s.walletMux)
summary, err := v.Sync(s.daemon, sp, &sv, videoRequiresUpgrade, s.walletMux, s.progressBarWg, s.progressBar)
if err != nil {
return err
}

View file

@ -1,31 +1,17 @@
package metrics
import (
"os"
"regexp"
"github.com/lbryio/ytsync/v5/configs"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
)
var (
Durations = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "ytsync",
Subsystem: getHostname(),
Subsystem: configs.Configuration.GetHostname(),
Name: "duration",
Help: "The durations of the individual modules",
}, []string{"path"})
)
func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
hostname = "ytsync_unknown"
}
reg, err := regexp.Compile("[^a-zA-Z0-9_]+")
if err != nil {
log.Fatal(err)
}
return reg.ReplaceAllString(hostname, "_")
}

View file

@ -13,6 +13,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/null"
"github.com/lbryio/ytsync/v5/configs"
"github.com/lbryio/ytsync/v5/shared"
"github.com/lbryio/ytsync/v5/util"
@ -25,10 +26,22 @@ const (
)
type APIConfig struct {
YoutubeAPIKey string
ApiURL string
ApiToken string
HostName string
ApiURL string
ApiToken string
HostName string
}
var instance *APIConfig
func GetAPIsConfigs() *APIConfig {
if instance == nil {
instance = &APIConfig{
ApiURL: configs.Configuration.InternalApisEndpoint,
ApiToken: configs.Configuration.InternalApisAuthToken,
HostName: configs.Configuration.GetHostname(),
}
}
return instance
}
func (a *APIConfig) FetchChannels(status string, cliFlags *shared.SyncFlags) ([]shared.YoutubeChannel, error) {
@ -48,12 +61,9 @@ func (a *APIConfig) FetchChannels(status string, cliFlags *shared.SyncFlags) ([]
"channel_id": {cliFlags.ChannelID},
})
if err != nil {
if strings.Contains(err.Error(), "EOF") {
util.SendErrorToSlack("EOF error while trying to call %s. Waiting to retry", endpoint)
time.Sleep(30 * time.Second)
return a.FetchChannels(status, cliFlags)
}
return nil, errors.Err(err)
util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
time.Sleep(30 * time.Second)
return a.FetchChannels(status, cliFlags)
}
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
@ -111,12 +121,9 @@ func (a *APIConfig) SetChannelCert(certHex string, channelID string) error {
"auth_token": {a.ApiToken},
})
if err != nil {
if strings.Contains(err.Error(), "EOF") {
util.SendErrorToSlack("EOF error while trying to call %s. Waiting to retry", endpoint)
time.Sleep(30 * time.Second)
return a.SetChannelCert(certHex, channelID)
}
return errors.Err(err)
util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
time.Sleep(30 * time.Second)
return a.SetChannelCert(certHex, channelID)
}
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
@ -159,12 +166,9 @@ func (a *APIConfig) SetChannelStatus(channelID string, status string, failureRea
}
res, err := http.PostForm(endpoint, params)
if err != nil {
if strings.Contains(err.Error(), "EOF") {
util.SendErrorToSlack("EOF error while trying to call %s. Waiting to retry", endpoint)
time.Sleep(30 * time.Second)
return a.SetChannelStatus(channelID, status, failureReason, transferState)
}
return nil, nil, errors.Err(err)
util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
time.Sleep(30 * time.Second)
return a.SetChannelStatus(channelID, status, failureReason, transferState)
}
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
@ -209,12 +213,9 @@ func (a *APIConfig) SetChannelClaimID(channelID string, channelClaimID string) e
"channel_claim_id": {channelClaimID},
})
if err != nil {
if strings.Contains(err.Error(), "EOF") {
util.SendErrorToSlack("EOF error while trying to call %s. Waiting to retry", endpoint)
time.Sleep(30 * time.Second)
return a.SetChannelClaimID(channelID, channelClaimID)
}
return errors.Err(err)
util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
time.Sleep(30 * time.Second)
return a.SetChannelClaimID(channelID, channelClaimID)
}
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
@ -253,12 +254,9 @@ func (a *APIConfig) DeleteVideos(videos []string) error {
}
res, err := http.PostForm(endpoint, vals)
if err != nil {
if strings.Contains(err.Error(), "EOF") {
util.SendErrorToSlack("EOF error while trying to call %s. Waiting to retry", endpoint)
time.Sleep(30 * time.Second)
return a.DeleteVideos(videos)
}
return errors.Err(err)
util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
time.Sleep(30 * time.Second)
return a.DeleteVideos(videos)
}
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
@ -319,12 +317,9 @@ func (a *APIConfig) MarkVideoStatus(status shared.VideoStatus) error {
}
res, err := http.PostForm(endpoint, vals)
if err != nil {
if strings.Contains(err.Error(), "EOF") {
util.SendErrorToSlack("EOF error while trying to call %s for %s. Waiting to retry", endpoint, status.ClaimName)
time.Sleep(30 * time.Second)
return a.MarkVideoStatus(status)
}
return errors.Err(err)
util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
time.Sleep(30 * time.Second)
return a.MarkVideoStatus(status)
}
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
@ -361,12 +356,9 @@ func (a *APIConfig) VideoState(videoID string) (string, error) {
res, err := http.PostForm(endpoint, vals)
if err != nil {
if strings.Contains(err.Error(), "EOF") {
util.SendErrorToSlack("EOF error while trying to call %s. Waiting to retry", endpoint)
time.Sleep(30 * time.Second)
return a.VideoState(videoID)
}
return "", errors.Err(err)
util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
time.Sleep(30 * time.Second)
return a.VideoState(videoID)
}
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
@ -415,12 +407,9 @@ func (a *APIConfig) GetReleasedDate(videoID string) (*VideoRelease, error) {
res, err := http.PostForm(endpoint, vals)
if err != nil {
if strings.Contains(err.Error(), "EOF") {
util.SendErrorToSlack("EOF error while trying to call %s. Waiting to retry", endpoint)
time.Sleep(30 * time.Second)
return a.GetReleasedDate(videoID)
}
return nil, errors.Err(err)
util.SendErrorToSlack("error while trying to call %s. Waiting to retry: %s", endpoint, err.Error())
time.Sleep(30 * time.Second)
return a.GetReleasedDate(videoID)
}
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)

View file

@ -4,8 +4,6 @@ import (
"encoding/json"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/lbryio/lbry.go/v2/extras/errors"
)
@ -28,6 +26,7 @@ type YoutubeChannel struct {
SizeLimit int `json:"size_limit"`
LastUploadedVideo string `json:"last_uploaded_video"`
WipeDB bool `json:"wipe_db"`
Language string `json:"language"`
}
type PublishAddress struct {
@ -45,6 +44,65 @@ func (p *PublishAddress) UnmarshalJSON(data []byte) error {
return nil
}
var FatalErrors = []string{
":5279: read: connection reset by peer",
"no space left on device",
"NotEnoughFunds",
"Cannot publish using channel",
"cannot concatenate 'str' and 'NoneType' objects",
"more than 90% of the space has been used.",
"Couldn't find private key for id",
"You already have a stream claim published under the name",
"Missing inputs",
}
var ErrorsNoRetry = []string{
"Requested format is not available",
"non 200 status code received",
"This video contains content from",
"dont know which claim to update",
"uploader has not made this video available in your country",
"download error: AccessDenied: Access Denied",
"Playback on other websites has been disabled by the video owner",
"Error in daemon: Cannot publish empty file",
"Error extracting sts from embedded url response",
"Unable to extract signature tokens",
"Client.Timeout exceeded while awaiting headers",
"the video is too big to sync, skipping for now",
"video is too long to process",
"video is too short to process",
"no compatible format available for this video",
"Watch this video on YouTube.",
"have blocked it on copyright grounds",
"the video must be republished as we can't get the right size",
"HTTP Error 403",
"giving up after 0 fragment retries",
"Sorry about that",
"This video is not available",
"Video unavailable",
"requested format not available",
"interrupted by user",
"Sign in to confirm your age",
"This video is unavailable",
"video is a live stream and hasn't completed yet",
"Premieres in",
"Private video",
"This live event will begin in",
"This video has been removed by the uploader",
"Premiere will begin shortly",
"cannot unmarshal number 0.0",
"default youtube thumbnail found",
"livestream is likely bugged",
}
var WalletErrors = []string{
"Not enough funds to cover this transaction",
"failed: Not enough funds",
"Error in daemon: Insufficient funds, please deposit additional LBC",
//"Missing inputs",
}
var BlockchainErrors = []string{
"txn-mempool-conflict",
"too-long-mempool-chain",
}
var NeverRetryFailures = []string{
"Error extracting sts from embedded url response",
"Unable to extract signature tokens",
@ -57,6 +115,11 @@ var NeverRetryFailures = []string{
"have blocked it on copyright grounds",
"giving up after 0 fragment retries",
"Sign in to confirm your age",
"Playback on other websites has been disabled by the video owner",
"uploader has not made this video available in your country",
"This video has been removed by the uploader",
"Video unavailable",
"Video is not available - hardcoded fix",
}
type SyncFlags struct {
@ -94,7 +157,7 @@ func (f *SyncFlags) VideosToSync(totalSubscribers uint) int {
800: 250,
600: 200,
200: 80,
100: 50,
100: 20,
1: 10,
}
videosToSync := 0
@ -131,39 +194,28 @@ const (
StatusSynced = "synced" // done
StatusWipeDb = "pendingdbwipe" // in sync queue. lbryum database will be pruned
StatusFailed = "failed"
StatusFinalized = "finalized" // no more changes allowed
StatusAbandoned = "abandoned" // deleted on youtube or banned
StatusFinalized = "finalized" // no more changes allowed
StatusAbandoned = "abandoned" // deleted on youtube or banned
StatusAgeRestricted = "agerestricted" // one or more videos are age restricted and should be reprocessed with special keys
)
var SyncStatuses = []string{StatusPending, StatusPendingEmail, StatusPendingUpgrade, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized, StatusAbandoned, StatusWipeDb}
var SyncStatuses = []string{StatusPending, StatusPendingEmail, StatusPendingUpgrade, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized, StatusAbandoned, StatusWipeDb, StatusAgeRestricted}
const LatestMetadataVersion = 2
const (
VideoStatusPublished = "published"
VideoStatusFailed = "failed"
VideoStatusUpgradeFailed = "upgradefailed"
VideoStatusUnpublished = "unpublished"
VideoStatusTranferFailed = "transferfailed"
VideoStatusPublished = "published"
VideoStatusFailed = "failed"
VideoStatusUpgradeFailed = "upgradefailed"
VideoStatusUnpublished = "unpublished"
VideoStatusTransferFailed = "transferfailed"
)
var VideoSyncStatuses = []string{VideoStatusPublished, VideoStatusFailed, VideoStatusUpgradeFailed, VideoStatusUnpublished, VideoStatusTransferFailed}
const (
TransferStateNotTouched = iota
TransferStatePending
TransferStateComplete
TransferStateManual
)
type AwsConfigs struct {
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
}
func (a *AwsConfigs) GetS3AWSConfig() *aws.Config {
return &aws.Config{
Credentials: credentials.NewStaticCredentials(a.AwsS3ID, a.AwsS3Secret, ""),
Region: &a.AwsS3Region,
}
}

View file

@ -2,25 +2,26 @@ package sources
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/abadojack/whatlanggo"
"github.com/lbryio/ytsync/v5/downloader"
"github.com/lbryio/ytsync/v5/downloader/ytdl"
"github.com/lbryio/ytsync/v5/shared"
"gopkg.in/vansante/go-ffprobe.v2"
"github.com/lbryio/ytsync/v5/ip_manager"
"github.com/lbryio/ytsync/v5/namer"
"github.com/lbryio/ytsync/v5/sdk"
"github.com/lbryio/ytsync/v5/shared"
"github.com/lbryio/ytsync/v5/tags_manager"
"github.com/lbryio/ytsync/v5/thumbs"
"github.com/lbryio/ytsync/v5/timing"
@ -31,9 +32,12 @@ import (
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/aws/aws-sdk-go/aws"
"github.com/abadojack/whatlanggo"
"github.com/shopspring/decimal"
log "github.com/sirupsen/logrus"
"github.com/vbauerster/mpb/v7"
"github.com/vbauerster/mpb/v7/decor"
"gopkg.in/vansante/go-ffprobe.v2"
)
type YoutubeVideo struct {
@ -49,13 +53,14 @@ type YoutubeVideo struct {
youtubeInfo *ytdl.YtdlVideo
youtubeChannelID string
tags []string
awsConfig aws.Config
thumbnailURL string
lbryChannelID string
mocked bool
walletLock *sync.RWMutex
stopGroup *stop.Group
pool *ip_manager.IPPool
progressBars *mpb.Progress
progressBarWg *sync.WaitGroup
}
var youtubeCategories = map[string]string{
@ -93,7 +98,7 @@ var youtubeCategories = map[string]string{
"44": "trailers",
}
func NewYoutubeVideo(directory string, videoData *ytdl.YtdlVideo, playlistPosition int64, awsConfig aws.Config, stopGroup *stop.Group, pool *ip_manager.IPPool) (*YoutubeVideo, error) {
func NewYoutubeVideo(directory string, videoData *ytdl.YtdlVideo, playlistPosition int64, stopGroup *stop.Group, pool *ip_manager.IPPool) (*YoutubeVideo, error) {
// youtube-dl returns times in local timezone sometimes. this could break in the future
// maybe we can file a PR to choose the timezone we want from youtube-dl
return &YoutubeVideo{
@ -101,22 +106,21 @@ func NewYoutubeVideo(directory string, videoData *ytdl.YtdlVideo, playlistPositi
title: videoData.Title,
description: videoData.Description,
playlistPosition: playlistPosition,
publishedAt: videoData.UploadDateForReal,
publishedAt: videoData.GetUploadTime(),
dir: directory,
youtubeInfo: videoData,
awsConfig: awsConfig,
mocked: false,
youtubeChannelID: videoData.ChannelID,
stopGroup: stopGroup,
pool: pool,
}, nil
}
func NewMockedVideo(directory string, videoID string, youtubeChannelID string, awsConfig aws.Config, stopGroup *stop.Group, pool *ip_manager.IPPool) *YoutubeVideo {
func NewMockedVideo(directory string, videoID string, youtubeChannelID string, stopGroup *stop.Group, pool *ip_manager.IPPool) *YoutubeVideo {
return &YoutubeVideo{
id: videoID,
playlistPosition: 0,
dir: directory,
awsConfig: awsConfig,
mocked: true,
youtubeChannelID: youtubeChannelID,
stopGroup: stopGroup,
@ -171,7 +175,7 @@ func (v *YoutubeVideo) getFullPath() string {
}
func (v *YoutubeVideo) getAbbrevDescription() string {
maxLength := 2800
maxLength := 6500
description := strings.TrimSpace(v.description)
additionalDescription := "\nhttps://www.youtube.com/watch?v=" + v.id
khanAcademyClaimID := "5fc52291980268b82413ca4c0ace1b8d749f3ffb"
@ -183,15 +187,32 @@ func (v *YoutubeVideo) getAbbrevDescription() string {
}
return description + "\n..." + additionalDescription
}
func checkCookiesIntegrity() error {
fi, err := os.Stat("cookies.txt")
if err != nil {
return errors.Err(err)
}
if fi.Size() == 0 {
log.Errorf("cookies were cleared out. Attempting a restore from cookies-backup.txt")
input, err := ioutil.ReadFile("cookies-backup.txt")
if err != nil {
return errors.Err(err)
}
err = ioutil.WriteFile("cookies.txt", input, 0644)
if err != nil {
return errors.Err(err)
}
}
return nil
}
func (v *YoutubeVideo) download() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("download").Add(time.Since(start))
}(start)
if v.youtubeInfo.IsLive != nil {
return errors.Err("video is a live stream and hasn't completed yet")
}
videoPath := v.getFullPath()
err := os.Mkdir(v.videoDir(), 0777)
@ -210,32 +231,54 @@ func (v *YoutubeVideo) download() error {
"1080",
"720",
"480",
"320",
"360",
}
dur := time.Duration(v.youtubeInfo.Duration) * time.Second
if dur.Hours() > 2 { //for videos longer than 2 hours only sync up to 720p
if dur.Hours() > 1 { //for videos longer than 1 hour only sync up to 720p
qualities = []string{
"720",
"480",
"320",
"360",
}
}
metadataPath := path.Join(logUtils.GetVideoMetadataDir(), v.id+".info.json")
_, err = os.Stat(metadataPath)
if err != nil {
if os.IsNotExist(err) {
return errors.Err("metadata information for video %s is missing! Why?", v.id)
}
return errors.Err(err)
}
metadata, err := parseVideoMetadata(metadataPath)
err = checkCookiesIntegrity()
if err != nil {
return err
}
ytdlArgs := []string{
"--no-progress",
"-o" + strings.TrimSuffix(v.getFullPath(), ".mp4"),
"--merge-output-format",
"mp4",
"--rm-cache-dir",
"--postprocessor-args",
"-movflags faststart",
"ffmpeg:-movflags faststart",
"--abort-on-unavailable-fragment",
"--fragment-retries",
"1",
"--cookies",
"cookies.txt",
"--extractor-args",
"youtube:player_client=android",
//"--concurrent-fragments",
//"2",
"--load-info-json",
metadataPath,
}
userAgent := []string{"--user-agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"}
userAgent := []string{"--user-agent", downloader.ChromeUA}
if v.maxVideoSize > 0 {
ytdlArgs = append(ytdlArgs,
"--max-filesize",
@ -272,15 +315,19 @@ func (v *YoutubeVideo) download() error {
ytdlArgs = append(ytdlArgs,
"--source-address",
sourceAddress,
"https://www.youtube.com/watch?v="+v.ID(),
fmt.Sprintf("https://www.youtube.com/watch?v=%s", v.id),
)
//speedThrottleRetries := 3
for i := 0; i < len(qualities); i++ {
quality := qualities[i]
argsWithFilters := append(ytdlArgs, "-fbestvideo[ext=mp4][vcodec!*=av01][height<="+quality+"]+bestaudio[ext!=webm][format_id!=258][format_id!=251][format_id!=256][format_id!=327]")
argsWithFilters := append(ytdlArgs, "-fbestvideo[ext=mp4][vcodec!*=av01][height<="+quality+"]+bestaudio[ext!=webm][format_id!=258][format_id!=380][format_id!=251][format_id!=256][format_id!=327][format_id!=328]")
argsWithFilters = append(argsWithFilters, userAgent...)
cmd := exec.Command("youtube-dl", argsWithFilters...)
log.Printf("Running command youtube-dl %s", strings.Join(argsWithFilters, " "))
//if speedThrottleRetries > 0 {
// speedThrottleRetries--
// argsWithFilters = append(argsWithFilters, "--throttled-rate", "180K")
//}
cmd := exec.Command("yt-dlp", argsWithFilters...)
log.Printf("Running command yt-dlp %s", strings.Join(argsWithFilters, " "))
stderr, err := cmd.StderrPipe()
if err != nil {
@ -295,9 +342,22 @@ func (v *YoutubeVideo) download() error {
return errors.Err(err)
}
dlStopGrp := stop.New()
ticker := time.NewTicker(400 * time.Millisecond)
go v.trackProgressBar(argsWithFilters, ticker, metadata, dlStopGrp, sourceAddress)
//ticker2 := time.NewTicker(10 * time.Second)
//v.monitorSlowDownload(ticker, dlStopGrp, sourceAddress, cmd)
errorLog, _ := ioutil.ReadAll(stderr)
outLog, _ := ioutil.ReadAll(stdout)
err = cmd.Wait()
//stop the progress bar
ticker.Stop()
dlStopGrp.Stop()
if err != nil {
if strings.Contains(err.Error(), "exit status 1") {
if strings.Contains(string(errorLog), "HTTP Error 429") || strings.Contains(string(errorLog), "returned non-zero exit status 8") {
@ -309,9 +369,13 @@ func (v *YoutubeVideo) download() error {
continue //this bypasses the yt throttling IP redistribution... TODO: don't
} else if strings.Contains(string(errorLog), "YouTube said: Unable to extract video data") && !strings.Contains(userAgent[1], "Googlebot") {
i-- //do not lower quality when trying a different user agent
userAgent = []string{"--user-agent", "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"}
userAgent = []string{downloader.GoogleBotUA}
log.Infof("trying different user agent for video %s", v.ID())
continue
//} else if strings.Contains(string(errorLog), "yt_dlp.utils.ThrottledDownload") {
// log.Infof("throttled download speed for video %s. Retrying", v.ID())
// i-- //do not lower quality when we're retrying a throttled download
// continue
}
return errors.Err(string(errorLog))
}
@ -349,10 +413,241 @@ func (v *YoutubeVideo) download() error {
}
return nil
}
func (v *YoutubeVideo) monitorSlowDownload(ticker *time.Ticker, stop *stop.Group, address string, cmd *exec.Cmd) {
count := 0
lastSize := int64(0)
for {
select {
case <-stop.Ch():
return
case <-ticker.C:
size, err := logUtils.DirSize(v.videoDir())
if err != nil {
log.Errorf("error while getting size of download directory: %s", errors.FullTrace(err))
continue
}
delta := size - lastSize
avgSpeed := delta / 10
if avgSpeed < 200*1024 { //200 KB/s
count++
} else {
count--
}
if count > 3 {
err := cmd.Process.Signal(syscall.SIGKILL)
if err != nil {
log.Errorf("failure in killing slow download: %s", errors.Err(err))
return
}
}
}
}
}
func (v *YoutubeVideo) trackProgressBar(argsWithFilters []string, ticker *time.Ticker, metadata *ytMetadata, done *stop.Group, sourceAddress string) {
v.progressBarWg.Add(1)
go func() {
defer v.progressBarWg.Done()
//get size of the video before downloading
cmd := exec.Command("yt-dlp", append(argsWithFilters, "-s")...)
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Errorf("error while getting final file size: %s", errors.FullTrace(err))
return
}
if err := cmd.Start(); err != nil {
log.Errorf("error while getting final file size: %s", errors.FullTrace(err))
return
}
outLog, _ := ioutil.ReadAll(stdout)
err = cmd.Wait()
output := string(outLog)
parts := strings.Split(output, ": ")
if len(parts) != 3 {
log.Errorf("couldn't parse audio and video parts from the output (%s)", output)
return
}
formats := strings.Split(parts[2], "+")
if len(formats) != 2 {
log.Errorf("couldn't parse formats from the output (%s)", output)
return
}
log.Debugf("'%s'", output)
videoFormat := formats[0]
audioFormat := strings.Replace(formats[1], "\n", "", -1)
videoSize := 0
audioSize := 0
if metadata != nil {
for _, f := range metadata.Formats {
if f.FormatID == videoFormat {
videoSize = f.Filesize
}
if f.FormatID == audioFormat {
audioSize = f.Filesize
}
}
}
log.Debugf("(%s) - videoSize: %d (%s), audiosize: %d (%s)", v.id, videoSize, videoFormat, audioSize, audioFormat)
bar := v.progressBars.AddBar(int64(videoSize+audioSize),
mpb.PrependDecorators(
decor.CountersKibiByte("% .2f / % .2f "),
// simple name decorator
decor.Name(fmt.Sprintf("id: %s src-ip: (%s)", v.id, sourceAddress)),
// decor.DSyncWidth bit enables column width synchronization
decor.Percentage(decor.WCSyncSpace),
),
mpb.AppendDecorators(
decor.EwmaETA(decor.ET_STYLE_GO, 90),
decor.Name(" ] "),
decor.EwmaSpeed(decor.UnitKiB, "% .2f ", 60),
decor.OnComplete(
// ETA decorator with ewma age of 60
decor.EwmaETA(decor.ET_STYLE_GO, 60), "done",
),
),
mpb.BarRemoveOnComplete(),
)
defer func() {
bar.Completed()
bar.Abort(true)
}()
origSize := int64(0)
lastUpdate := time.Now()
for {
select {
case <-done.Ch():
return
case <-ticker.C:
var err error
size, err := logUtils.DirSize(v.videoDir())
if err != nil {
log.Errorf("error while getting size of download directory: %s", errors.FullTrace(err))
return
}
if size > origSize {
origSize = size
bar.SetCurrent(size)
if size > int64(videoSize+audioSize) {
bar.SetTotal(size+2048, false)
}
bar.DecoratorEwmaUpdate(time.Since(lastUpdate))
lastUpdate = time.Now()
}
}
}
}()
}
type ytMetadata struct {
ID string `json:"id"`
Title string `json:"title"`
Formats []struct {
Asr int `json:"asr"`
Filesize int `json:"filesize"`
FormatID string `json:"format_id"`
FormatNote string `json:"format_note"`
Fps interface{} `json:"fps"`
Height interface{} `json:"height"`
Quality int `json:"quality"`
Tbr float64 `json:"tbr"`
URL string `json:"url"`
Width interface{} `json:"width"`
Ext string `json:"ext"`
Vcodec string `json:"vcodec"`
Acodec string `json:"acodec"`
Abr float64 `json:"abr,omitempty"`
DownloaderOptions struct {
HTTPChunkSize int `json:"http_chunk_size"`
} `json:"downloader_options,omitempty"`
Container string `json:"container,omitempty"`
Format string `json:"format"`
Protocol string `json:"protocol"`
HTTPHeaders struct {
UserAgent string `json:"User-Agent"`
AcceptCharset string `json:"Accept-Charset"`
Accept string `json:"Accept"`
AcceptEncoding string `json:"Accept-Encoding"`
AcceptLanguage string `json:"Accept-Language"`
} `json:"http_headers"`
Vbr float64 `json:"vbr,omitempty"`
} `json:"formats"`
Thumbnails []struct {
Height int `json:"height"`
URL string `json:"url"`
Width int `json:"width"`
Resolution string `json:"resolution"`
ID string `json:"id"`
} `json:"thumbnails"`
Description string `json:"description"`
UploadDate string `json:"upload_date"`
Uploader string `json:"uploader"`
UploaderID string `json:"uploader_id"`
UploaderURL string `json:"uploader_url"`
ChannelID string `json:"channel_id"`
ChannelURL string `json:"channel_url"`
Duration int `json:"duration"`
ViewCount int `json:"view_count"`
AverageRating float64 `json:"average_rating"`
AgeLimit int `json:"age_limit"`
WebpageURL string `json:"webpage_url"`
Categories []string `json:"categories"`
Tags []interface{} `json:"tags"`
IsLive interface{} `json:"is_live"`
LikeCount int `json:"like_count"`
DislikeCount int `json:"dislike_count"`
Channel string `json:"channel"`
Extractor string `json:"extractor"`
WebpageURLBasename string `json:"webpage_url_basename"`
ExtractorKey string `json:"extractor_key"`
Playlist interface{} `json:"playlist"`
PlaylistIndex interface{} `json:"playlist_index"`
Thumbnail string `json:"thumbnail"`
DisplayID string `json:"display_id"`
Format string `json:"format"`
FormatID string `json:"format_id"`
Width int `json:"width"`
Height int `json:"height"`
Resolution interface{} `json:"resolution"`
Fps int `json:"fps"`
Vcodec string `json:"vcodec"`
Vbr float64 `json:"vbr"`
StretchedRatio interface{} `json:"stretched_ratio"`
Acodec string `json:"acodec"`
Abr float64 `json:"abr"`
Ext string `json:"ext"`
Fulltitle string `json:"fulltitle"`
Filename string `json:"_filename"`
}
func parseVideoMetadata(metadataPath string) (*ytMetadata, error) {
f, err := os.Open(metadataPath)
if err != nil {
return nil, errors.Err(err)
}
// defer the closing of our jsonFile so that we can parse it later on
defer f.Close()
// read our opened jsonFile as a byte array.
byteValue, _ := ioutil.ReadAll(f)
// we initialize our Users array
var m ytMetadata
// we unmarshal our byteArray which contains our
// jsonFile's content into 'users' which we defined above
err = json.Unmarshal(byteValue, &m)
if err != nil {
return nil, errors.Err(err)
}
return &m, nil
}
func (v *YoutubeVideo) videoDir() string {
return v.dir + "/" + v.id
return path.Join(v.dir, v.id)
}
func (v *YoutubeVideo) getDownloadedPath() (string, error) {
files, err := ioutil.ReadDir(v.videoDir())
log.Infoln(v.videoDir())
@ -367,7 +662,7 @@ func (v *YoutubeVideo) getDownloadedPath() (string, error) {
continue
}
if strings.Contains(v.getFullPath(), strings.TrimSuffix(f.Name(), filepath.Ext(f.Name()))) {
return v.videoDir() + "/" + f.Name(), nil
return path.Join(v.videoDir(), f.Name()), nil
}
}
return "", errors.Err("could not find any downloaded videos")
@ -393,7 +688,10 @@ func (v *YoutubeVideo) delete(reason string) error {
func (v *YoutubeVideo) triggerThumbnailSave() (err error) {
thumbnail := thumbs.GetBestThumbnail(v.youtubeInfo.Thumbnails)
v.thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.URL, v.ID(), v.awsConfig)
if thumbnail.Width == 0 {
return errors.Err("default youtube thumbnail found")
}
v.thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.URL, v.ID())
return err
}
@ -466,11 +764,13 @@ type SyncParams struct {
DefaultAccount string
}
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, params SyncParams, existingVideoData *sdk.SyncedVideo, reprocess bool, walletLock *sync.RWMutex) (*SyncSummary, error) {
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, params SyncParams, existingVideoData *sdk.SyncedVideo, reprocess bool, walletLock *sync.RWMutex, pbWg *sync.WaitGroup, pb *mpb.Progress) (*SyncSummary, error) {
v.maxVideoSize = int64(params.MaxVideoSize)
v.maxVideoLength = params.MaxVideoLength
v.lbryChannelID = params.ChannelID
v.walletLock = walletLock
v.progressBars = pb
v.progressBarWg = pbWg
if reprocess && existingVideoData != nil && existingVideoData.Published {
summary, err := v.reprocess(daemon, params, existingVideoData)
return summary, errors.Prefix("upgrade failed", err)
@ -480,10 +780,19 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, params SyncParams, existingV
func (v *YoutubeVideo) downloadAndPublish(daemon *jsonrpc.Client, params SyncParams) (*SyncSummary, error) {
var err error
if v.youtubeInfo == nil {
return nil, errors.Err("Video is not available - hardcoded fix")
}
dur := time.Duration(v.youtubeInfo.Duration) * time.Second
minDuration := 7 * time.Second
if v.youtubeInfo.IsLive == true {
return nil, errors.Err("video is a live stream and hasn't completed yet")
}
if v.youtubeInfo.Availability != "public" {
return nil, errors.Err("video is not public")
}
if dur > v.maxVideoLength {
logUtils.SendErrorToSlack("%s is %s long and the limit is %s", v.id, dur.String(), v.maxVideoLength.String())
return nil, errors.Err("video is too long to process")
@ -492,6 +801,11 @@ func (v *YoutubeVideo) downloadAndPublish(daemon *jsonrpc.Client, params SyncPar
logUtils.SendErrorToSlack("%s is %s long and the minimum is %s", v.id, dur.String(), minDuration.String())
return nil, errors.Err("video is too short to process")
}
buggedLivestream := v.youtubeInfo.LiveStatus == "post_live"
if buggedLivestream && dur >= 2*time.Hour {
return nil, errors.Err("livestream is likely bugged as it was recently published and has a length of %s which is more than 2 hours", dur.String())
}
for {
err = v.download()
if err != nil && strings.Contains(err.Error(), "HTTP Error 429") {
@ -502,7 +816,6 @@ func (v *YoutubeVideo) downloadAndPublish(daemon *jsonrpc.Client, params SyncPar
break
}
log.Debugln("Downloaded " + v.id)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
@ -562,7 +875,11 @@ func (v *YoutubeVideo) getMetadata() (languages []string, locations []jsonrpc.Lo
}
func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, existingVideoData *sdk.SyncedVideo) (*SyncSummary, error) {
c, err := daemon.ClaimSearch(nil, &existingVideoData.ClaimID, nil, nil, 1, 20)
c, err := daemon.ClaimSearch(jsonrpc.ClaimSearchArgs{
ClaimID: &existingVideoData.ClaimID,
Page: 1,
PageSize: 20,
})
if err != nil {
return nil, errors.Err(err)
}
@ -581,7 +898,7 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis
return nil, errors.Err("could not find thumbnail for mocked video")
}
thumbnail := thumbs.GetBestThumbnail(v.youtubeInfo.Thumbnails)
thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.URL, v.ID(), v.awsConfig)
thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.URL, v.ID())
} else {
thumbnailURL = thumbs.ThumbnailEndpoint + v.ID()
}
@ -598,9 +915,8 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis
return nil, errors.Err(err)
}
return v.downloadAndPublish(daemon, params)
}
return nil, errors.Err("the video must be republished as we can't get the right size but it doesn't exist on youtube anymore")
return nil, errors.Prefix("the video must be republished as we can't get the right size and it doesn't exist on youtube anymore", err)
}
}
v.size = util.PtrToInt64(int64(videoSize))
@ -626,12 +942,13 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis
params.DefaultAccount,
},
},
Author: util.PtrToString(""),
License: util.PtrToString("Copyrighted (contact publisher)"),
ChannelID: &v.lbryChannelID,
Height: util.PtrToUint(720),
Width: util.PtrToUint(1280),
Fee: fee,
Author: util.PtrToString(""),
License: util.PtrToString("Copyrighted (contact publisher)"),
ChannelID: &v.lbryChannelID,
Height: util.PtrToUint(720),
Width: util.PtrToUint(1280),
Fee: fee,
ReleaseTime: util.PtrToInt64(v.publishedAt.Unix()),
}
v.walletLock.RLock()

View file

@ -6,6 +6,7 @@ import (
"os"
"strings"
"github.com/lbryio/ytsync/v5/configs"
"github.com/lbryio/ytsync/v5/downloader/ytdl"
"github.com/lbryio/lbry.go/v2/extras/errors"
@ -83,11 +84,11 @@ func (u *thumbnailUploader) deleteTmpFile() {
log.Infof("failed to delete local thumbnail file: %s", err.Error())
}
}
func MirrorThumbnail(url string, name string, s3Config aws.Config) (string, error) {
func MirrorThumbnail(url string, name string) (string, error) {
tu := thumbnailUploader{
originalUrl: url,
name: name,
s3Config: s3Config,
s3Config: *configs.Configuration.AWSThumbnailsS3Config.GetS3AWSConfig(),
}
err := tu.downloadThumbnail()
if err != nil {
@ -100,14 +101,12 @@ func MirrorThumbnail(url string, name string, s3Config aws.Config) (string, erro
return "", err
}
ownS3Config := s3Config.Copy(&aws.Config{Endpoint: aws.String("s3.lbry.tech")})
//this is our own S3 storage
tu2 := thumbnailUploader{
originalUrl: url,
name: name,
s3Config: *ownS3Config,
s3Config: *configs.Configuration.ThumbnailsS3Config.GetS3AWSConfig(),
}
//own S3
err = tu2.uploadThumbnail()
if err != nil {
return "", err
@ -117,13 +116,11 @@ func MirrorThumbnail(url string, name string, s3Config aws.Config) (string, erro
}
func GetBestThumbnail(thumbnails []ytdl.Thumbnail) *ytdl.Thumbnail {
var bestWidth *ytdl.Thumbnail
var bestWidth ytdl.Thumbnail
for _, thumbnail := range thumbnails {
if bestWidth == nil {
bestWidth = &thumbnail
} else if bestWidth.Width < thumbnail.Width {
bestWidth = &thumbnail
if bestWidth.Width < thumbnail.Width {
bestWidth = thumbnail
}
}
return bestWidth
return &bestWidth
}

109
util/archive.go Normal file
View file

@ -0,0 +1,109 @@
package util
import (
"archive/tar"
"io"
"io/fs"
"os"
"path/filepath"
"github.com/lbryio/lbry.go/v2/extras/errors"
)
func CreateTarball(tarballFilePath string, filePaths []string) error {
file, err := os.Create(tarballFilePath)
if err != nil {
return errors.Err("Could not create tarball file '%s', got error '%s'", tarballFilePath, err.Error())
}
defer file.Close()
tarWriter := tar.NewWriter(file)
defer tarWriter.Close()
for _, filePath := range filePaths {
err := addFileToTarWriter(filePath, tarWriter)
if err != nil {
return errors.Err("Could not add file '%s', to tarball, got error '%s'", filePath, err.Error())
}
}
return nil
}
func addFileToTarWriter(filePath string, tarWriter *tar.Writer) error {
file, err := os.Open(filePath)
if err != nil {
return errors.Err("Could not open file '%s', got error '%s'", filePath, err.Error())
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
return errors.Err("Could not get stat for file '%s', got error '%s'", filePath, err.Error())
}
header := &tar.Header{
Name: stat.Name(),
Size: stat.Size(),
Mode: int64(stat.Mode()),
ModTime: stat.ModTime(),
}
err = tarWriter.WriteHeader(header)
if err != nil {
return errors.Err("Could not write header for file '%s', got error '%s'", filePath, err.Error())
}
_, err = io.Copy(tarWriter, file)
if err != nil {
return errors.Err("Could not copy the file '%s' data to the tarball, got error '%s'", filePath, err.Error())
}
return nil
}
func Untar(tarball, target string) error {
reader, err := os.Open(tarball)
if err != nil {
return errors.Err(err)
}
defer reader.Close()
tarReader := tar.NewReader(reader)
for {
header, err := tarReader.Next()
if err == io.EOF {
break
} else if err != nil {
return errors.Err(err)
}
path := filepath.Join(target, header.Name)
info := header.FileInfo()
if info.IsDir() {
if err = os.MkdirAll(path, info.Mode()); err != nil {
return errors.Err(err)
}
continue
}
err = extractFile(path, info, tarReader)
if err != nil {
return err
}
}
return nil
}
func extractFile(path string, info fs.FileInfo, tarReader *tar.Reader) error {
file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, info.Mode())
if err != nil {
return errors.Err(err)
}
defer file.Close()
_, err = io.Copy(file, tarReader)
if err != nil {
return errors.Err(err)
}
return nil
}

View file

@ -14,7 +14,9 @@ func SendErrorToSlack(format string, a ...interface{}) {
message = fmt.Sprintf(format, a...)
}
log.Errorln(message)
err := util.SendToSlack(":sos: " + message)
log.SetLevel(log.InfoLevel) //I don't want to change the underlying lib so this will do...
err := util.SendToSlack(":sos: ```" + message + "```")
log.SetLevel(log.DebugLevel)
if err != nil {
log.Errorln(err)
}
@ -27,7 +29,9 @@ func SendInfoToSlack(format string, a ...interface{}) {
message = fmt.Sprintf(format, a...)
}
log.Infoln(message)
log.SetLevel(log.InfoLevel) //I don't want to change the underlying lib so this will do...
err := util.SendToSlack(":information_source: " + message)
log.SetLevel(log.DebugLevel)
if err != nil {
log.Errorln(err)
}

View file

@ -11,6 +11,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/lbrycrd"
"github.com/lbryio/ytsync/v5/configs"
"github.com/lbryio/ytsync/v5/timing"
"github.com/docker/docker/api/types"
@ -185,9 +186,9 @@ func CleanForStartup() error {
return errors.Err(err)
}
lbrycrd, err := GetLbrycrdClient(os.Getenv("LBRYCRD_STRING"))
lbrycrd, err := GetLbrycrdClient(configs.Configuration.LbrycrdString)
if err != nil {
return errors.Prefix("error getting lbrycrd client: ", err)
return errors.Prefix("error getting lbrycrd client", err)
}
height, err := lbrycrd.GetBlockCount()
if err != nil {
@ -261,6 +262,27 @@ func CleanupLbrynet() error {
return nil
}
var metadataDirInitialized = false
func GetVideoMetadataDir() string {
dir := "./videos_metadata"
if !metadataDirInitialized {
metadataDirInitialized = true
_ = os.MkdirAll(dir, 0755)
}
return dir
}
func CleanupMetadata() error {
dir := GetVideoMetadataDir()
err := os.RemoveAll(dir)
if err != nil {
return errors.Err(err)
}
metadataDirInitialized = false
return nil
}
func SleepUntilQuotaReset() {
PST, _ := time.LoadLocation("America/Los_Angeles")
t := time.Now().In(PST)
@ -384,3 +406,17 @@ func GetBlockchainDirectoryName() string {
}
return ledger
}
func DirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
size += info.Size()
}
return err
})
return size, err
}

View file

@ -15,6 +15,7 @@ import (
"github.com/lbryio/ytsync/v5/shared"
logUtils "github.com/lbryio/ytsync/v5/util"
"github.com/vbauerster/mpb/v7"
"github.com/lbryio/ytsync/v5/downloader/ytdl"
@ -28,7 +29,6 @@ import (
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/aws/aws-sdk-go/aws"
log "github.com/sirupsen/logrus"
)
@ -38,7 +38,7 @@ type Video interface {
IDAndNum() string
PlaylistPosition() int
PublishedAt() time.Time
Sync(*jsonrpc.Client, sources.SyncParams, *sdk.SyncedVideo, bool, *sync.RWMutex) (*sources.SyncSummary, error)
Sync(*jsonrpc.Client, sources.SyncParams, *sdk.SyncedVideo, bool, *sync.RWMutex, *sync.WaitGroup, *mpb.Progress) (*sources.SyncSummary, error)
}
type byPublishedAt []Video
@ -49,14 +49,13 @@ func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[
type VideoParams struct {
VideoDir string
S3Config aws.Config
Stopper *stop.Group
IPPool *ip_manager.IPPool
}
var mostRecentlyFailedChannel string // TODO: fix this hack!
func GetVideosToSync(config *sdk.APIConfig, channelID string, syncedVideos map[string]sdk.SyncedVideo, quickSync bool, maxVideos int, videoParams VideoParams, lastUploadedVideo string) ([]Video, error) {
func GetVideosToSync(channelID string, syncedVideos map[string]sdk.SyncedVideo, quickSync bool, maxVideos int, videoParams VideoParams, lastUploadedVideo string) ([]Video, error) {
var videos []Video
if quickSync && maxVideos > 50 {
maxVideos = 50
@ -95,14 +94,14 @@ func GetVideosToSync(config *sdk.APIConfig, channelID string, syncedVideos map[s
mostRecentlyFailedChannel = channelID
}
vids, err := getVideos(config, channelID, videoIDs, videoParams.Stopper.Ch(), videoParams.IPPool)
vids, err := getVideos(channelID, videoIDs, videoParams.Stopper.Ch(), videoParams.IPPool)
if err != nil {
return nil, err
}
for _, item := range vids {
positionInList := playlistMap[item.ID]
videoToAdd, err := sources.NewYoutubeVideo(videoParams.VideoDir, item, positionInList, videoParams.S3Config, videoParams.Stopper, videoParams.IPPool)
videoToAdd, err := sources.NewYoutubeVideo(videoParams.VideoDir, item, positionInList, videoParams.Stopper, videoParams.IPPool)
if err != nil {
return nil, errors.Err(err)
}
@ -110,11 +109,12 @@ func GetVideosToSync(config *sdk.APIConfig, channelID string, syncedVideos map[s
}
for k, v := range syncedVideos {
if !v.Published {
newMetadataVersion := int8(2)
if !v.Published && v.MetadataVersion >= newMetadataVersion {
continue
}
if _, ok := playlistMap[k]; !ok {
videos = append(videos, sources.NewMockedVideo(videoParams.VideoDir, k, channelID, videoParams.S3Config, videoParams.Stopper, videoParams.IPPool))
videos = append(videos, sources.NewMockedVideo(videoParams.VideoDir, k, channelID, videoParams.Stopper, videoParams.IPPool))
}
}
@ -129,7 +129,7 @@ func CountVideosInChannel(channelID string) (int, error) {
req, _ := http.NewRequest("GET", url, nil)
req.Header.Add("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36")
req.Header.Add("User-Agent", downloader.ChromeUA)
req.Header.Add("Accept", "*/*")
req.Header.Add("Host", "socialblade.com")
@ -173,7 +173,7 @@ func ChannelInfo(channelID string) (*YoutubeStatsResponse, error) {
req, _ := http.NewRequest("GET", url, nil)
req.Header.Add("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36")
req.Header.Add("User-Agent", downloader.ChromeUA)
req.Header.Add("Accept", "*/*")
res, err := http.DefaultClient.Do(req)
@ -204,7 +204,8 @@ func ChannelInfo(channelID string) (*YoutubeStatsResponse, error) {
return &decodedResponse, nil
}
func getVideos(config *sdk.APIConfig, channelID string, videoIDs []string, stopChan stop.Chan, ipPool *ip_manager.IPPool) ([]*ytdl.YtdlVideo, error) {
func getVideos(channelID string, videoIDs []string, stopChan stop.Chan, ipPool *ip_manager.IPPool) ([]*ytdl.YtdlVideo, error) {
config := sdk.GetAPIsConfigs()
var videos []*ytdl.YtdlVideo
for _, videoID := range videoIDs {
if len(videoID) < 5 {
@ -216,11 +217,6 @@ func getVideos(config *sdk.APIConfig, channelID string, videoIDs []string, stopC
default:
}
//ip, err := ipPool.GetIP(videoID)
//if err != nil {
// return nil, err
//}
//video, err := downloader.GetVideoInformation(videoID, &net.TCPAddr{IP: net.ParseIP(ip)})
state, err := config.VideoState(videoID)
if err != nil {
return nil, errors.Err(err)
@ -228,7 +224,7 @@ func getVideos(config *sdk.APIConfig, channelID string, videoIDs []string, stopC
if state == "published" {
continue
}
video, err := downloader.GetVideoInformation(config, videoID, stopChan, nil, ipPool)
video, err := downloader.GetVideoInformation(videoID, stopChan, ipPool)
if err != nil {
errSDK := config.MarkVideoStatus(shared.VideoStatus{
ChannelID: channelID,