This commit is contained in:
Alex Grintsvayg 2018-10-08 16:26:45 -04:00
parent 7a6eb57280
commit aa401b2ff3
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
15 changed files with 78 additions and 2688 deletions

175
Gopkg.lock generated
View file

@ -1,58 +1,6 @@
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
digest = "1:9a88883f474d09f1da61894cd8115c7f33988d6941e4f6236324c777aaff8f2c"
name = "github.com/PuerkitoBio/goquery"
packages = ["."]
pruneopts = ""
revision = "dc2ec5c7ca4d9aae063b79b9f581dd3ea6afd2b2"
version = "v1.4.1"
[[projects]]
digest = "1:e3726ad6f38f710e84c8dcd0e830014de6eaeea81f28d91ae898afecc078479a"
name = "github.com/andybalholm/cascadia"
packages = ["."]
pruneopts = ""
revision = "901648c87902174f774fac311d7f176f8647bdaa"
version = "v1.0.0"
[[projects]]
digest = "1:261d95f4464744d542759a7a33846f56f24113f5a93c7577f4cd7044f7cb3d76"
name = "github.com/aws/aws-sdk-go"
packages = [
"aws",
"aws/awserr",
"aws/awsutil",
"aws/client",
"aws/client/metadata",
"aws/corehandlers",
"aws/credentials",
"aws/credentials/ec2rolecreds",
"aws/credentials/endpointcreds",
"aws/credentials/stscreds",
"aws/defaults",
"aws/ec2metadata",
"aws/endpoints",
"aws/request",
"aws/session",
"aws/signer/v4",
"internal/shareddefaults",
"private/protocol",
"private/protocol/query",
"private/protocol/query/queryutil",
"private/protocol/rest",
"private/protocol/restxml",
"private/protocol/xml/xmlutil",
"service/s3",
"service/s3/s3iface",
"service/s3/s3manager",
"service/sts",
]
pruneopts = ""
revision = "b69f447375c7fa0047ebcdd8ae5d585d5aac2f71"
version = "v1.10.51"
[[projects]]
branch = "master"
digest = "1:cc8ebf0c6745d09f728f1fa4fbd29baaa2e3a65efb49b5fefb0c163171ee7863"
@ -131,7 +79,13 @@
[[projects]]
digest = "1:f958a1c137db276e52f0b50efee41a1a389dcdded59a69711f3e872757dab34b"
name = "github.com/golang/protobuf"
packages = ["proto"]
packages = [
"proto",
"ptypes",
"ptypes/any",
"ptypes/duration",
"ptypes/timestamp",
]
pruneopts = ""
revision = "b4deda0973fb4c70b50d226b1af49f3da59f5265"
version = "v1.1.0"
@ -152,13 +106,6 @@
revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75"
version = "v1.0"
[[projects]]
digest = "1:6f49eae0c1e5dab1dafafee34b207aeb7a42303105960944828c2079b92fc88e"
name = "github.com/jmespath/go-jmespath"
packages = ["."]
pruneopts = ""
revision = "0b12d6b5"
[[projects]]
branch = "master"
digest = "1:d261f80387a38eeddc1d819ee9ee56d37ca10fc02e6e09ff400fb0ce146e13dc"
@ -175,14 +122,6 @@
revision = "d1008ad1fd04ceb5faedaf34881df0c504382706"
version = "v3.1"
[[projects]]
branch = "master"
digest = "1:1dee6133ab829c8559a39031ad1e0e3538e4a7b34d3e0509d1fc247737e928c1"
name = "github.com/mitchellh/go-ps"
packages = ["."]
pruneopts = ""
revision = "4fdf99ab29366514c69ccccddab5dc58b8d84062"
[[projects]]
branch = "master"
digest = "1:eb9117392ee8e7aa44f78e0db603f70b1050ee0ebda4bd40040befb5b218c546"
@ -199,14 +138,6 @@
revision = "8ab4d0b364ef1e9af5d102531da20d5ec902b6c4"
version = "v0.2.0"
[[projects]]
branch = "master"
digest = "1:8d6d81d0d9d8153e65d637bda77a7c4e6ba496c61efac3578d7d8c981ac31a7b"
name = "github.com/rylio/ytdl"
packages = ["."]
pruneopts = ""
revision = "06f6510946275931157f5fe73f55ec7d6fd65870"
[[projects]]
branch = "master"
digest = "1:67b7dcb3b7e67cb6f96fb38fe7358bc1210453189da210e40cf357a92d57c1c1"
@ -281,9 +212,12 @@
name = "golang.org/x/net"
packages = [
"context",
"context/ctxhttp",
"html",
"html/atom",
"http/httpguts",
"http2",
"http2/hpack",
"idna",
"internal/timeseries",
"trace",
]
pruneopts = ""
revision = "db08ff08e8622530d9ed3a0e8ac279f6d4c02196"
@ -300,18 +234,70 @@
revision = "bff228c7b664c5fce602223a05fb708fd8654986"
[[projects]]
branch = "master"
digest = "1:b064108d68f82d0201d9f812297c928e57488e82ccdb77ed06ac69f64519a890"
name = "google.golang.org/api"
digest = "1:5acd3512b047305d49e8763eef7ba423901e85d5dd2fd1e71778a0ea8de10bd4"
name = "golang.org/x/text"
packages = [
"gensupport",
"googleapi",
"googleapi/internal/uritemplates",
"googleapi/transport",
"youtube/v3",
"collate",
"collate/build",
"internal/colltab",
"internal/gen",
"internal/tag",
"internal/triegen",
"internal/ucd",
"language",
"secure/bidirule",
"transform",
"unicode/bidi",
"unicode/cldr",
"unicode/norm",
"unicode/rangetable",
]
pruneopts = ""
revision = "ef86ce4234efee96020bde00391d6a9cfae66561"
revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0"
version = "v0.3.0"
[[projects]]
branch = "master"
digest = "1:1b3b4ec811695907c4a3cb92e4f32834a4a42459bff7e02068b6b2b5344803cd"
name = "google.golang.org/genproto"
packages = ["googleapis/rpc/status"]
pruneopts = ""
revision = "af9cb2a35e7f169ec875002c1829c9b315cddc04"
[[projects]]
digest = "1:15656947b87a6a240e61dcfae9e71a55a8d5677f240d12ab48f02cdbabf1e309"
name = "google.golang.org/grpc"
packages = [
".",
"balancer",
"balancer/base",
"balancer/roundrobin",
"codes",
"connectivity",
"credentials",
"encoding",
"encoding/proto",
"grpclog",
"internal",
"internal/backoff",
"internal/channelz",
"internal/envconfig",
"internal/grpcrand",
"internal/transport",
"keepalive",
"metadata",
"naming",
"peer",
"resolver",
"resolver/dns",
"resolver/passthrough",
"stats",
"status",
"tap",
]
pruneopts = ""
revision = "8dea3dc473e90c8179e519d91302d0597c0ca1d1"
version = "v1.15.0"
[[projects]]
digest = "1:f771bf87a3253de520c2af6fb6e75314dce0fedc0b30b208134fe502932bb15d"
@ -325,12 +311,6 @@
analyzer-name = "dep"
analyzer-version = 1
input-imports = [
"github.com/aws/aws-sdk-go/aws",
"github.com/aws/aws-sdk-go/aws/awserr",
"github.com/aws/aws-sdk-go/aws/credentials",
"github.com/aws/aws-sdk-go/aws/session",
"github.com/aws/aws-sdk-go/service/s3",
"github.com/aws/aws-sdk-go/service/s3/s3manager",
"github.com/btcsuite/btcd/chaincfg",
"github.com/btcsuite/btcd/chaincfg/chainhash",
"github.com/btcsuite/btcd/rpcclient",
@ -339,12 +319,11 @@
"github.com/davecgh/go-spew/spew",
"github.com/go-errors/errors",
"github.com/go-ini/ini",
"github.com/golang/protobuf/proto",
"github.com/lbryio/lbryschema.go/pb",
"github.com/lbryio/ozzo-validation",
"github.com/mitchellh/go-ps",
"github.com/mitchellh/mapstructure",
"github.com/nlopes/slack",
"github.com/rylio/ytdl",
"github.com/shopspring/decimal",
"github.com/sirupsen/logrus",
"github.com/spf13/cast",
@ -353,8 +332,8 @@
"github.com/zeebo/bencode",
"golang.org/x/crypto/ripemd160",
"golang.org/x/crypto/sha3",
"google.golang.org/api/googleapi/transport",
"google.golang.org/api/youtube/v3",
"golang.org/x/net/context",
"google.golang.org/grpc",
"gopkg.in/nullbio/null.v6/convert",
]
solver-name = "gps-cdcl"

View file

@ -6,10 +6,6 @@
name = "github.com/go-errors/errors"
version = "1.0.0"
[[constraint]]
branch = "master"
name = "github.com/rylio/ytdl"
[[constraint]]
branch = "master"
name = "github.com/lbryio/lbryschema.go"
@ -42,10 +38,6 @@
branch = "master"
name = "github.com/zeebo/bencode"
[[constraint]]
branch = "master"
name = "google.golang.org/api"
[[constraint]]
branch = "master"
name = "github.com/btcsuite/btcd"
@ -56,7 +48,4 @@
[[constraint]]
branch = "master"
name = "github.com/btcsuite/btcutil"
[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "^1.10.51"
name = "github.com/btcsuite/btcutil"

View file

@ -1,38 +0,0 @@
package cmd
import (
sync "github.com/lbryio/lbry.go/ytsync"
"github.com/lbryio/lbry.go/ytsync/sdk"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
func init() {
var ytCountCmd = &cobra.Command{
Use: "ytcount <youtube_api_key> <youtube_channel_id>",
Args: cobra.ExactArgs(2),
Short: "Count videos in a youtube channel",
Run: ytcount,
}
RootCmd.AddCommand(ytCountCmd)
}
func ytcount(cmd *cobra.Command, args []string) {
ytAPIKey := args[0]
channelID := args[1]
s := sync.Sync{
APIConfig: &sdk.APIConfig{
YoutubeAPIKey: ytAPIKey,
},
YoutubeChannelID: channelID,
}
count, err := s.CountVideos()
if err != nil {
panic(err)
}
log.Printf("%d videos in channel %s\n", count, channelID)
}

View file

@ -1,184 +0,0 @@
package cmd
import (
"os"
"time"
"os/user"
"github.com/lbryio/lbry.go/util"
sync "github.com/lbryio/lbry.go/ytsync"
"github.com/lbryio/lbry.go/ytsync/sdk"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
const defaultMaxTries = 3
var (
stopOnError bool
maxTries int
takeOverExistingChannel bool
refill int
limit int
skipSpaceCheck bool
syncUpdate bool
singleRun bool
syncStatus string
channelID string
syncFrom int64
syncUntil int64
concurrentJobs int
videosLimit int
maxVideoSize int
)
func init() {
var ytSyncCmd = &cobra.Command{
Use: "ytsync",
Args: cobra.RangeArgs(0, 0),
Short: "Publish youtube channels into LBRY network automatically.",
Run: ytSync,
}
ytSyncCmd.Flags().BoolVar(&stopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit")
ytSyncCmd.Flags().IntVar(&maxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
ytSyncCmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
ytSyncCmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync")
ytSyncCmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
ytSyncCmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
ytSyncCmd.Flags().BoolVar(&singleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
ytSyncCmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update")
ytSyncCmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.")
ytSyncCmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
ytSyncCmd.Flags().Int64Var(&syncUntil, "before", time.Now().Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
ytSyncCmd.Flags().IntVar(&concurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently")
ytSyncCmd.Flags().IntVar(&videosLimit, "videos-limit", 1000, "how many videos to process per channel")
ytSyncCmd.Flags().IntVar(&maxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)")
RootCmd.AddCommand(ytSyncCmd)
}
func ytSync(cmd *cobra.Command, args []string) {
var hostname string
slackToken := os.Getenv("SLACK_TOKEN")
if slackToken == "" {
log.Error("A slack token was not present in env vars! Slack messages disabled!")
} else {
var err error
hostname, err = os.Hostname()
if err != nil {
log.Error("could not detect system hostname")
hostname = "ytsync-unknown"
}
util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
}
if syncStatus != "" && !util.InSlice(syncStatus, sync.SyncStatuses) {
log.Errorf("status must be one of the following: %v\n", sync.SyncStatuses)
return
}
if stopOnError && maxTries != defaultMaxTries {
log.Errorln("--stop-on-error and --max-tries are mutually exclusive")
return
}
if maxTries < 1 {
log.Errorln("setting --max-tries less than 1 doesn't make sense")
return
}
if limit < 0 {
log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
return
}
apiURL := os.Getenv("LBRY_API")
apiToken := os.Getenv("LBRY_API_TOKEN")
youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
blobsDir := os.Getenv("BLOBS_DIRECTORY")
lbrycrdString := os.Getenv("LBRYCRD_STRING")
awsS3ID := os.Getenv("AWS_S3_ID")
awsS3Secret := os.Getenv("AWS_S3_SECRET")
awsS3Region := os.Getenv("AWS_S3_REGION")
awsS3Bucket := os.Getenv("AWS_S3_BUCKET")
if apiURL == "" {
log.Errorln("An API URL was not defined. Please set the environment variable LBRY_API")
return
}
if apiToken == "" {
log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
return
}
if youtubeAPIKey == "" {
log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
return
}
if awsS3ID == "" {
log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID")
return
}
if awsS3Secret == "" {
log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET")
return
}
if awsS3Region == "" {
log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION")
return
}
if awsS3Bucket == "" {
log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
return
}
if lbrycrdString == "" {
log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
}
if blobsDir == "" {
usr, err := user.Current()
if err != nil {
log.Errorln(err.Error())
return
}
blobsDir = usr.HomeDir + "/.lbrynet/blobfiles/"
}
syncProperties := &sdk.SyncProperties{
SyncFrom: syncFrom,
SyncUntil: syncUntil,
YoutubeChannelID: channelID,
}
apiConfig := &sdk.APIConfig{
YoutubeAPIKey: youtubeAPIKey,
ApiURL: apiURL,
ApiToken: apiToken,
HostName: hostname,
}
sm := sync.NewSyncManager(
stopOnError,
maxTries,
takeOverExistingChannel,
refill,
limit,
skipSpaceCheck,
syncUpdate,
concurrentJobs,
concurrentJobs,
blobsDir,
videosLimit,
maxVideoSize,
lbrycrdString,
awsS3ID,
awsS3Secret,
awsS3Region,
awsS3Bucket,
syncStatus,
singleRun,
syncProperties,
apiConfig,
)
err := sm.Start()
if err != nil {
sync.SendErrorToSlack(err.Error())
}
sync.SendInfoToSlack("Syncing process terminated!")
}

View file

@ -1,13 +0,0 @@
# YT Sync Process
- make sure you don't have a `.lbryum/wallets/default_wallet`
- delete existing wallet if there's nothing you need there, or better yet, move it somewhere else in case you need it later
- make sure daemon is stopped and can be controlled with `systemctl`
- run `lbry ytsync YOUTUBE_KEY LBRY_CHANNEL_NAME YOUTUBE_CHANNEL_ID`
- after sync is complete, daemon will be stopped and wallet will be moved to `~/wallets/`
- now mark content as synced in doc
Running the sync command for a channel that was already started will resume the sync. This can also be used to update a channel with new
content that was put on Youtube since the last sync.
---

View file

@ -1,32 +0,0 @@
package ytsync
import (
"net/http"
"github.com/lbryio/lbry.go/errors"
"google.golang.org/api/googleapi/transport"
"google.golang.org/api/youtube/v3"
)
func (s *Sync) CountVideos() (uint64, error) {
client := &http.Client{
Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
}
service, err := youtube.New(client)
if err != nil {
return 0, errors.Prefix("error creating YouTube service", err)
}
response, err := service.Channels.List("statistics").Id(s.YoutubeChannelID).Do()
if err != nil {
return 0, errors.Prefix("error getting channels", err)
}
if len(response.Items) < 1 {
return 0, errors.Err("youtube channel not found")
}
return response.Items[0].Statistics.VideoCount, nil
}

View file

@ -1,227 +0,0 @@
package ytsync
import (
"fmt"
"strings"
"syscall"
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/namer"
"github.com/lbryio/lbry.go/ytsync/sdk"
log "github.com/sirupsen/logrus"
)
type SyncManager struct {
stopOnError bool
maxTries int
takeOverExistingChannel bool
refill int
limit int
skipSpaceCheck bool
syncUpdate bool
concurrentJobs int
concurrentVideos int
blobsDir string
videosLimit int
maxVideoSize int
lbrycrdString string
awsS3ID string
awsS3Secret string
awsS3Region string
syncStatus string
awsS3Bucket string
singleRun bool
syncProperties *sdk.SyncProperties
apiConfig *sdk.APIConfig
namer *namer.Namer
}
func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int,
skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int,
maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string,
syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig) *SyncManager {
return &SyncManager{
stopOnError: stopOnError,
maxTries: maxTries,
takeOverExistingChannel: takeOverExistingChannel,
refill: refill,
limit: limit,
skipSpaceCheck: skipSpaceCheck,
syncUpdate: syncUpdate,
concurrentJobs: concurrentJobs,
concurrentVideos: concurrentVideos,
blobsDir: blobsDir,
videosLimit: videosLimit,
maxVideoSize: maxVideoSize,
lbrycrdString: lbrycrdString,
awsS3ID: awsS3ID,
awsS3Secret: awsS3Secret,
awsS3Region: awsS3Region,
awsS3Bucket: awsS3Bucket,
syncStatus: syncStatus,
singleRun: singleRun,
syncProperties: syncProperties,
apiConfig: apiConfig,
namer: namer.NewNamer(),
}
}
const (
StatusPending = "pending" // waiting for permission to sync
StatusQueued = "queued" // in sync queue. will be synced soon
StatusSyncing = "syncing" // syncing now
StatusSynced = "synced" // done
StatusFailed = "failed"
StatusFinalized = "finalized" // no more changes allowed
)
var SyncStatuses = []string{StatusPending, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized}
const (
VideoStatusPublished = "published"
VideoStatusFailed = "failed"
)
func (s *SyncManager) Start() error {
syncCount := 0
for {
err := s.checkUsedSpace()
if err != nil {
return err
}
var syncs []Sync
shouldInterruptLoop := false
isSingleChannelSync := s.syncProperties.YoutubeChannelID != ""
if isSingleChannelSync {
channels, err := s.apiConfig.FetchChannels("", s.syncProperties)
if err != nil {
return err
}
if len(channels) != 1 {
return errors.Err("Expected 1 channel, %d returned", len(channels))
}
lbryChannelName := channels[0].DesiredChannelName
syncs = make([]Sync, 1)
syncs[0] = Sync{
APIConfig: s.apiConfig,
YoutubeChannelID: s.syncProperties.YoutubeChannelID,
LbryChannelName: lbryChannelName,
StopOnError: s.stopOnError,
MaxTries: s.maxTries,
ConcurrentVideos: s.concurrentVideos,
TakeOverExistingChannel: s.takeOverExistingChannel,
Refill: s.refill,
Manager: s,
LbrycrdString: s.lbrycrdString,
AwsS3ID: s.awsS3ID,
AwsS3Secret: s.awsS3Secret,
AwsS3Region: s.awsS3Region,
AwsS3Bucket: s.awsS3Bucket,
namer: s.namer,
}
shouldInterruptLoop = true
} else {
var queuesToSync []string
if s.syncStatus != "" {
queuesToSync = append(queuesToSync, s.syncStatus)
} else if s.syncUpdate {
queuesToSync = append(queuesToSync, StatusSyncing, StatusSynced)
} else {
queuesToSync = append(queuesToSync, StatusSyncing, StatusQueued)
}
for _, q := range queuesToSync {
channels, err := s.apiConfig.FetchChannels(q, s.syncProperties)
if err != nil {
return err
}
for _, c := range channels {
syncs = append(syncs, Sync{
APIConfig: s.apiConfig,
YoutubeChannelID: c.ChannelId,
LbryChannelName: c.DesiredChannelName,
StopOnError: s.stopOnError,
MaxTries: s.maxTries,
ConcurrentVideos: s.concurrentVideos,
TakeOverExistingChannel: s.takeOverExistingChannel,
Refill: s.refill,
Manager: s,
LbrycrdString: s.lbrycrdString,
AwsS3ID: s.awsS3ID,
AwsS3Secret: s.awsS3Secret,
AwsS3Region: s.awsS3Region,
AwsS3Bucket: s.awsS3Bucket,
})
}
}
}
if len(syncs) == 0 {
log.Infoln("No channels to sync. Pausing 5 minutes!")
time.Sleep(5 * time.Minute)
}
for i, sync := range syncs {
shouldNotCount := false
SendInfoToSlack("Syncing %s (%s) to LBRY! (iteration %d/%d - total processed channels: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1)
err := sync.FullCycle()
if err != nil {
fatalErrors := []string{
"default_wallet already exists",
"WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR",
"NotEnoughFunds",
"no space left on device",
"failure uploading wallet",
}
if util.SubstringInSlice(err.Error(), fatalErrors) {
return errors.Prefix("@Nikooo777 this requires manual intervention! Exiting...", err)
}
shouldNotCount = strings.Contains(err.Error(), "this youtube channel is being managed by another server")
if !shouldNotCount {
SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
}
}
SendInfoToSlack("Syncing %s (%s) reached an end. (iteration %d/%d - total processed channels: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1)
if !shouldNotCount {
syncCount++
}
if sync.IsInterrupted() || (s.limit != 0 && syncCount >= s.limit) {
shouldInterruptLoop = true
break
}
}
if shouldInterruptLoop || s.singleRun {
break
}
}
return nil
}
func (s *SyncManager) checkUsedSpace() error {
usedPctile, err := GetUsedSpace(s.blobsDir)
if err != nil {
return err
}
if usedPctile >= 0.90 && !s.skipSpaceCheck {
return errors.Err(fmt.Sprintf("more than 90%% of the space has been used. use --skip-space-check to ignore. Used: %.1f%%", usedPctile*100))
}
log.Infof("disk usage: %.1f%%", usedPctile*100)
return nil
}
// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path
func GetUsedSpace(path string) (float32, error) {
var stat syscall.Statfs_t
err := syscall.Statfs(path, &stat)
if err != nil {
return 0, err
}
// Available blocks * size per block = available space in bytes
all := stat.Blocks * uint64(stat.Bsize)
free := stat.Bfree * uint64(stat.Bsize)
used := all - free
return float32(used) / float32(all), nil
}

View file

@ -1,83 +0,0 @@
package namer
import (
"crypto/md5"
"encoding/hex"
"fmt"
"regexp"
"strconv"
"strings"
"sync"
)
var titleRegexp = regexp.MustCompile(`[^a-zA-Z0-9]+`)
type Namer struct {
mu *sync.Mutex
names map[string]bool
}
func NewNamer() *Namer {
return &Namer{
mu: &sync.Mutex{},
names: make(map[string]bool),
}
}
func (n *Namer) SetNames(names map[string]bool) {
n.names = names
}
func (n *Namer) GetNextName(prefix string) string {
n.mu.Lock()
defer n.mu.Unlock()
attempt := 1
var name string
for {
name = getClaimNameFromTitle(prefix, attempt)
if _, exists := n.names[name]; !exists {
break
}
attempt++
}
//if for some reasons the title can't be converted in a valid claim name (too short or not latin) then we use a hash
if len(name) < 2 {
sum := md5.Sum([]byte(prefix))
name = fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:])[:15], attempt)
}
n.names[name] = true
return name
}
// TODO: clean this up some
func getClaimNameFromTitle(title string, attempt int) string {
suffix := ""
if attempt > 1 {
suffix = "-" + strconv.Itoa(attempt)
}
maxLen := 40 - len(suffix)
chunks := strings.Split(strings.ToLower(strings.Trim(titleRegexp.ReplaceAllString(title, "-"), "-")), "-")
name := chunks[0]
if len(name) > maxLen {
return name[:maxLen]
}
for _, chunk := range chunks[1:] {
tmpName := name + "-" + chunk
if len(tmpName) > maxLen {
if len(name) < 20 {
name = tmpName[:maxLen]
}
break
}
name = tmpName
}
return name + suffix
}

View file

@ -1,170 +0,0 @@
package sdk
import (
"encoding/json"
"io/ioutil"
"log"
"net/http"
"net/url"
"strconv"
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/null"
)
const (
MaxReasonLength = 500
)
type APIConfig struct {
YoutubeAPIKey string
ApiURL string
ApiToken string
HostName string
}
type SyncProperties struct {
SyncFrom int64
SyncUntil int64
YoutubeChannelID string
}
type YoutubeChannel struct {
ChannelId string `json:"channel_id"`
TotalVideos uint `json:"total_videos"`
DesiredChannelName string `json:"desired_channel_name"`
SyncServer null.String `json:"sync_server"`
Fee *struct {
Amount string `json:"amount"`
Address string `json:"address"`
Currency string `json:"currency"`
} `json:"fee"`
}
func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeChannel, error) {
type apiJobsResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []YoutubeChannel `json:"data"`
}
endpoint := a.ApiURL + "/yt/jobs"
res, _ := http.PostForm(endpoint, url.Values{
"auth_token": {a.ApiToken},
"sync_status": {status},
"min_videos": {strconv.Itoa(1)},
"after": {strconv.Itoa(int(cp.SyncFrom))},
"before": {strconv.Itoa(int(cp.SyncUntil))},
"sync_server": {a.HostName},
"channel_id": {cp.YoutubeChannelID},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiJobsResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, err
}
if response.Data == nil {
return nil, errors.Err(response.Error)
}
log.Printf("Fetched channels: %d", len(response.Data))
return response.Data, nil
}
type SyncedVideo struct {
VideoID string `json:"video_id"`
Published bool `json:"published"`
FailureReason string `json:"failure_reason"`
ClaimName string `json:"claim_name"`
}
func (a *APIConfig) SetChannelStatus(channelID string, status string, failureReason string) (map[string]SyncedVideo, map[string]bool, error) {
type apiChannelStatusResponse struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data []SyncedVideo `json:"data"`
}
endpoint := a.ApiURL + "/yt/channel_status"
if len(failureReason) > MaxReasonLength {
failureReason = failureReason[:MaxReasonLength]
}
res, _ := http.PostForm(endpoint, url.Values{
"channel_id": {channelID},
"sync_server": {a.HostName},
"auth_token": {a.ApiToken},
"sync_status": {status},
"failure_reason": {failureReason},
})
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response apiChannelStatusResponse
err := json.Unmarshal(body, &response)
if err != nil {
return nil, nil, err
}
if !response.Error.IsNull() {
return nil, nil, errors.Err(response.Error.String)
}
if response.Data != nil {
svs := make(map[string]SyncedVideo)
claimNames := make(map[string]bool)
for _, v := range response.Data {
svs[v.VideoID] = v
claimNames[v.ClaimName] = v.Published
}
return svs, claimNames, nil
}
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
const (
VideoStatusPublished = "published"
VideoStatusFailed = "failed"
)
func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
endpoint := a.ApiURL + "/yt/video_status"
if len(failureReason) > MaxReasonLength {
failureReason = failureReason[:MaxReasonLength]
}
vals := url.Values{
"youtube_channel_id": {channelID},
"video_id": {videoID},
"status": {status},
"auth_token": {a.ApiToken},
}
if status == VideoStatusPublished {
if claimID == "" || claimName == "" {
return errors.Err("claimID or claimName missing")
}
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
vals.Add("claim_id", claimID)
vals.Add("claim_name", claimName)
if size != nil {
vals.Add("size", strconv.FormatInt(*size, 10))
}
}
if failureReason != "" {
vals.Add("failure_reason", failureReason)
}
res, _ := http.PostForm(endpoint, vals)
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data null.String `json:"data"`
}
err := json.Unmarshal(body, &response)
if err != nil {
return err
}
if !response.Error.IsNull() {
return errors.Err(response.Error.String)
}
if !response.Data.IsNull() && response.Data.String == "ok" {
return nil
}
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
}

View file

@ -1,293 +0,0 @@
package ytsync
import (
"strings"
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/lbrycrd"
"github.com/shopspring/decimal"
log "github.com/sirupsen/logrus"
)
func (s *Sync) walletSetup() error {
//prevent unnecessary concurrent execution
s.walletMux.Lock()
defer s.walletMux.Unlock()
err := s.ensureChannelOwnership()
if err != nil {
return err
}
balanceResp, err := s.daemon.WalletBalance()
if err != nil {
return err
} else if balanceResp == nil {
return errors.Err("no response")
}
balance := decimal.Decimal(*balanceResp)
log.Debugf("Starting balance is %s", balance.String())
var numOnSource int
if s.LbryChannelName == "@UCBerkeley" {
numOnSource = 10104
} else {
n, err := s.CountVideos()
if err != nil {
return err
}
numOnSource = int(n)
}
log.Debugf("Source channel has %d videos", numOnSource)
if numOnSource == 0 {
return nil
}
s.syncedVideosMux.RLock()
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
s.syncedVideosMux.RUnlock()
log.Debugf("We already allocated credits for %d videos", numPublished)
if numOnSource-numPublished > s.Manager.videosLimit {
numOnSource = s.Manager.videosLimit
}
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
if numPublished > numOnSource && balance.LessThan(decimal.NewFromFloat(1)) {
SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource)
minBalance = 1 //since we ended up in this function it means some juice is still needed
}
amountToAdd, _ := decimal.NewFromFloat(minBalance).Sub(balance).Float64()
if s.Refill > 0 {
if amountToAdd < 0 {
amountToAdd = float64(s.Refill)
} else {
amountToAdd += float64(s.Refill)
}
}
if amountToAdd > 0 {
if amountToAdd < 1 {
amountToAdd = 1 // no reason to bother adding less than 1 credit
}
s.addCredits(amountToAdd)
}
claimAddress, err := s.daemon.WalletUnusedAddress()
if err != nil {
return err
} else if claimAddress == nil {
return errors.Err("could not get unused address")
}
s.claimAddress = string(*claimAddress)
if s.claimAddress == "" {
return errors.Err("found blank claim address")
}
err = s.ensureEnoughUTXOs()
if err != nil {
return err
}
return nil
}
func (s *Sync) ensureEnoughUTXOs() error {
utxolist, err := s.daemon.UTXOList()
if err != nil {
return err
} else if utxolist == nil {
return errors.Err("no response")
}
target := 40
slack := int(float32(0.1) * float32(target))
count := 0
for _, utxo := range *utxolist {
if !utxo.IsClaim && !utxo.IsSupport && !utxo.IsUpdate && utxo.Amount.Cmp(decimal.New(0, 0)) == 1 {
count++
}
}
if count < target-slack {
newAddresses := target - count
balance, err := s.daemon.WalletBalance()
if err != nil {
return err
} else if balance == nil {
return errors.Err("no response")
}
log.Println("balance is " + decimal.Decimal(*balance).String())
amountPerAddress := decimal.Decimal(*balance).Div(decimal.NewFromFloat(float64(target)))
log.Infof("Putting %s credits into each of %d new addresses", amountPerAddress.String(), newAddresses)
prefillTx, err := s.daemon.WalletPrefillAddresses(newAddresses, amountPerAddress, true)
if err != nil {
return err
} else if prefillTx == nil {
return errors.Err("no response")
}
err = s.waitForNewBlock()
if err != nil {
return err
}
} else if !allUTXOsConfirmed(utxolist) {
log.Println("Waiting for previous txns to confirm")
err := s.waitForNewBlock()
if err != nil {
return err
}
}
return nil
}
func (s *Sync) waitForNewBlock() error {
status, err := s.daemon.Status()
if err != nil {
return err
}
for status.Wallet.Blocks == 0 || status.Wallet.BlocksBehind != 0 {
time.Sleep(5 * time.Second)
status, err = s.daemon.Status()
if err != nil {
return err
}
}
currentBlock := status.Wallet.Blocks
for i := 0; status.Wallet.Blocks <= currentBlock; i++ {
if i%3 == 0 {
log.Printf("Waiting for new block (%d)...", currentBlock+1)
}
time.Sleep(10 * time.Second)
status, err = s.daemon.Status()
if err != nil {
return err
}
}
return nil
}
func (s *Sync) ensureChannelOwnership() error {
if s.LbryChannelName == "" {
return errors.Err("no channel name set")
}
channels, err := s.daemon.ChannelList()
if err != nil {
return err
} else if channels == nil {
return errors.Err("no channel response")
}
isChannelMine := false
for _, channel := range *channels {
if channel.Name == s.LbryChannelName {
s.lbryChannelID = channel.ClaimID
isChannelMine = true
} else {
return errors.Err("this wallet has multiple channels. maybe something went wrong during setup?")
}
}
if isChannelMine {
return nil
}
resolveResp, err := s.daemon.Resolve(s.LbryChannelName)
if err != nil {
return err
}
channel := (*resolveResp)[s.LbryChannelName]
channelBidAmount := channelClaimAmount
channelNotFound := channel.Error != nil && strings.Contains(*(channel.Error), "cannot be resolved")
if !channelNotFound {
if !s.TakeOverExistingChannel {
return errors.Err("Channel exists and we don't own it. Pick another channel.")
}
log.Println("Channel exists and we don't own it. Outbidding existing claim.")
channelBidAmount, _ = channel.Certificate.Amount.Add(decimal.NewFromFloat(channelClaimAmount)).Float64()
}
balanceResp, err := s.daemon.WalletBalance()
if err != nil {
return err
} else if balanceResp == nil {
return errors.Err("no response")
}
balance := decimal.Decimal(*balanceResp)
if balance.LessThan(decimal.NewFromFloat(channelBidAmount)) {
s.addCredits(channelBidAmount + 0.1)
}
c, err := s.daemon.ChannelNew(s.LbryChannelName, channelBidAmount)
if err != nil {
return err
}
s.lbryChannelID = c.ClaimID
return nil
}
func allUTXOsConfirmed(utxolist *jsonrpc.UTXOListResponse) bool {
if utxolist == nil {
return false
}
if len(*utxolist) < 1 {
return false
}
for _, utxo := range *utxolist {
if utxo.Height == 0 {
return false
}
}
return true
}
func (s *Sync) addCredits(amountToAdd float64) error {
log.Printf("Adding %f credits", amountToAdd)
var lbrycrdd *lbrycrd.Client
var err error
if s.LbrycrdString == "" {
lbrycrdd, err = lbrycrd.NewWithDefaultURL()
if err != nil {
return err
}
} else {
lbrycrdd, err = lbrycrd.New(s.LbrycrdString)
if err != nil {
return err
}
}
addressResp, err := s.daemon.WalletUnusedAddress()
if err != nil {
return err
} else if addressResp == nil {
return errors.Err("no response")
}
address := string(*addressResp)
_, err = lbrycrdd.SimpleSend(address, amountToAdd)
if err != nil {
return err
}
wait := 15 * time.Second
log.Println("Waiting " + wait.String() + " for lbryum to let us know we have the new transaction")
time.Sleep(wait)
return nil
}

View file

@ -1,27 +0,0 @@
package sources
import (
"strings"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/ytsync/namer"
)
type SyncSummary struct {
ClaimID string
ClaimName string
}
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.PublishOptions, namer *namer.Namer) (*SyncSummary, error) {
for {
name := namer.GetNextName(title)
response, err := daemon.Publish(name, filename, amount, options)
if err != nil {
if strings.Contains(err.Error(), "failed: Multiple claims (") {
continue
}
return nil, err
}
return &SyncSummary{ClaimID: response.ClaimID, ClaimName: name}, nil
}
}

View file

@ -1,217 +0,0 @@
package sources
import (
"net/http"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/ytsync/namer"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
log "github.com/sirupsen/logrus"
)
type ucbVideo struct {
id string
title string
channel string
description string
publishedAt time.Time
dir string
claimNames map[string]bool
syncedVideosMux *sync.RWMutex
}
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo {
p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors
return &ucbVideo{
id: id,
title: title,
description: description,
channel: channel,
dir: dir,
publishedAt: p,
}
}
func (v *ucbVideo) ID() string {
return v.id
}
func (v *ucbVideo) PlaylistPosition() int {
return 0
}
func (v *ucbVideo) IDAndNum() string {
return v.ID() + " (?)"
}
func (v *ucbVideo) PublishedAt() time.Time {
return v.publishedAt
//r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`)
//matches := r.FindStringSubmatch(v.title)
//if len(matches) > 0 {
// year, _ := strconv.Atoi(matches[1])
// month, _ := strconv.Atoi(matches[2])
// day, _ := strconv.Atoi(matches[3])
// return time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC)
//}
//return time.Now()
}
func (v *ucbVideo) getFilename() string {
return v.dir + "/" + v.id + ".mp4"
}
func (v *ucbVideo) getClaimName(attempt int) string {
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
suffix := ""
if attempt > 1 {
suffix = "-" + strconv.Itoa(attempt)
}
maxLen := 40 - len(suffix)
chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(v.title, "-"), "-")), "-")
name := chunks[0]
if len(name) > maxLen {
return name[:maxLen]
}
for _, chunk := range chunks[1:] {
tmpName := name + "-" + chunk
if len(tmpName) > maxLen {
if len(name) < 20 {
name = tmpName[:maxLen]
}
break
}
name = tmpName
}
return name + suffix
}
func (v *ucbVideo) getAbbrevDescription() string {
maxLines := 10
description := strings.TrimSpace(v.description)
if strings.Count(description, "\n") < maxLines {
return description
}
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
}
func (v *ucbVideo) download() error {
videoPath := v.getFilename()
_, err := os.Stat(videoPath)
if err != nil && !os.IsNotExist(err) {
return err
} else if err == nil {
log.Debugln(v.id + " already exists at " + videoPath)
return nil
}
creds := credentials.NewStaticCredentials("ID-GOES-HERE", "SECRET-GOES-HERE", "")
s, err := session.NewSession(&aws.Config{Region: aws.String("us-east-2"), Credentials: creds})
if err != nil {
return err
}
downloader := s3manager.NewDownloader(s)
out, err := os.Create(videoPath)
if err != nil {
return err
}
defer out.Close()
log.Println("lbry-niko2/videos/" + v.channel + "/" + v.id)
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
Bucket: aws.String("lbry-niko2"),
Key: aws.String("/videos/" + v.channel + "/" + v.id + ".mp4"),
})
if err != nil {
return err
} else if bytesWritten == 0 {
return errors.Err("zero bytes written")
}
return nil
}
func (v *ucbVideo) saveThumbnail() error {
resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id)
if err != nil {
return err
}
defer resp.Body.Close()
creds := credentials.NewStaticCredentials("ID-GOES-HERE", "SECRET-GOES-HERE", "")
s, err := session.NewSession(&aws.Config{Region: aws.String("us-east-2"), Credentials: creds})
if err != nil {
return err
}
uploader := s3manager.NewUploader(s)
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String("berk.ninja"),
Key: aws.String("thumbnails/" + v.id),
ContentType: aws.String("image/jpeg"),
Body: resp.Body,
})
return err
}
func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) {
options := jsonrpc.PublishOptions{
Title: &v.title,
Author: strPtr("UC Berkeley"),
Description: strPtr(v.getAbbrevDescription()),
Language: strPtr("en"),
ClaimAddress: &claimAddress,
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
License: strPtr("see description"),
ChannelID: &channelID,
ChangeAddress: &claimAddress,
}
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer)
}
func (v *ucbVideo) Size() *int64 {
return nil
}
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer) (*SyncSummary, error) {
//download and thumbnail can be done in parallel
err := v.download()
if err != nil {
return nil, errors.Prefix("download error", err)
}
log.Debugln("Downloaded " + v.id)
//err = v.SaveThumbnail()
//if err != nil {
// return errors.WrapPrefix(err, "thumbnail error", 0)
//}
//log.Debugln("Created thumbnail for " + v.id)
summary, err := v.publish(daemon, claimAddress, amount, channelID, namer)
if err != nil {
return nil, errors.Prefix("publish error", err)
}
return summary, nil
}

View file

@ -1,294 +0,0 @@
package sources
import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/ytsync/namer"
"github.com/rylio/ytdl"
log "github.com/sirupsen/logrus"
"google.golang.org/api/youtube/v3"
)
type YoutubeVideo struct {
id string
channelTitle string
title string
description string
playlistPosition int64
size *int64
maxVideoSize int64
publishedAt time.Time
dir string
claimNames map[string]bool
syncedVideosMux *sync.RWMutex
}
func NewYoutubeVideo(directory string, snippet *youtube.PlaylistItemSnippet) *YoutubeVideo {
publishedAt, _ := time.Parse(time.RFC3339Nano, snippet.PublishedAt) // ignore parse errors
return &YoutubeVideo{
id: snippet.ResourceId.VideoId,
title: snippet.Title,
description: snippet.Description,
channelTitle: snippet.ChannelTitle,
playlistPosition: snippet.Position,
publishedAt: publishedAt,
dir: directory,
}
}
func (v *YoutubeVideo) ID() string {
return v.id
}
func (v *YoutubeVideo) PlaylistPosition() int {
return int(v.playlistPosition)
}
func (v *YoutubeVideo) IDAndNum() string {
return v.ID() + " (" + strconv.Itoa(int(v.playlistPosition)) + " in channel)"
}
func (v *YoutubeVideo) PublishedAt() time.Time {
return v.publishedAt
}
func (v *YoutubeVideo) getFilename() string {
maxLen := 30
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(v.title, "-"), "-")), "-")
name := chunks[0]
if len(name) > maxLen {
name = name[:maxLen]
}
for _, chunk := range chunks[1:] {
tmpName := name + "-" + chunk
if len(tmpName) > maxLen {
if len(name) < 20 {
name = tmpName[:maxLen]
}
break
}
name = tmpName
}
if len(name) < 1 {
name = v.id
}
return v.videoDir() + "/" + name + ".mp4"
}
func (v *YoutubeVideo) getAbbrevDescription() string {
maxLines := 10
description := strings.TrimSpace(v.description)
if strings.Count(description, "\n") < maxLines {
return description
}
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
}
func (v *YoutubeVideo) download() error {
videoPath := v.getFilename()
err := os.Mkdir(v.videoDir(), 0750)
if err != nil && !strings.Contains(err.Error(), "file exists") {
return errors.Wrap(err, 0)
}
_, err = os.Stat(videoPath)
if err != nil && !os.IsNotExist(err) {
return err
} else if err == nil {
log.Debugln(v.id + " already exists at " + videoPath)
return nil
}
videoUrl := "https://www.youtube.com/watch?v=" + v.id
videoInfo, err := ytdl.GetVideoInfo(videoUrl)
if err != nil {
return err
}
codec := []string{"H.264"}
ext := []string{"mp4"}
//Filter requires a [] interface{}
codecFilter := make([]interface{}, len(codec))
for i, v := range codec {
codecFilter[i] = v
}
//Filter requires a [] interface{}
extFilter := make([]interface{}, len(ext))
for i, v := range ext {
extFilter[i] = v
}
formats := videoInfo.Formats.Filter(ytdl.FormatVideoEncodingKey, codecFilter).Filter(ytdl.FormatExtensionKey, extFilter)
if len(formats) == 0 {
return errors.Err("no compatible format available for this video")
}
maxRetryAttempts := 5
for i := 0; i < len(formats) && i < maxRetryAttempts; i++ {
formatIndex := i
if i == maxRetryAttempts-1 {
formatIndex = len(formats) - 1
}
var downloadedFile *os.File
downloadedFile, err = os.Create(videoPath)
if err != nil {
return err
}
err = videoInfo.Download(formats[formatIndex], downloadedFile)
downloadedFile.Close()
if err != nil {
//delete the video and ignore the error
_ = v.delete()
break
}
fi, err := os.Stat(v.getFilename())
if err != nil {
return err
}
videoSize := fi.Size()
v.size = &videoSize
if videoSize > v.maxVideoSize {
//delete the video and ignore the error
_ = v.delete()
err = errors.Err("file is too big and there is no other format available")
} else {
break
}
}
return err
}
func (v *YoutubeVideo) videoDir() string {
return v.dir + "/" + v.id
}
func (v *YoutubeVideo) delete() error {
videoPath := v.getFilename()
err := os.Remove(videoPath)
if err != nil {
log.Errorln(errors.Prefix("delete error", err))
return err
}
log.Debugln(v.id + " deleted from disk (" + videoPath + ")")
return nil
}
func (v *YoutubeVideo) triggerThumbnailSave() error {
client := &http.Client{Timeout: 30 * time.Second}
params, err := json.Marshal(map[string]string{"videoid": v.id})
if err != nil {
return err
}
request, err := http.NewRequest(http.MethodPut, "https://jgp4g1qoud.execute-api.us-east-1.amazonaws.com/prod/thumbnail", bytes.NewBuffer(params))
if err != nil {
return err
}
response, err := client.Do(request)
if err != nil {
return err
}
defer response.Body.Close()
contents, err := ioutil.ReadAll(response.Body)
if err != nil {
return err
}
var decoded struct {
Error int `json:"error"`
Url string `json:"url,omitempty"`
Message string `json:"message,omitempty"`
}
err = json.Unmarshal(contents, &decoded)
if err != nil {
return err
}
if decoded.Error != 0 {
return errors.Err("error creating thumbnail: " + decoded.Message)
}
return nil
}
func strPtr(s string) *string { return &s }
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) {
if channelID == "" {
return nil, errors.Err("a claim_id for the channel wasn't provided") //TODO: this is probably not needed?
}
options := jsonrpc.PublishOptions{
Title: &v.title,
Author: &v.channelTitle,
Description: strPtr(v.getAbbrevDescription() + "\nhttps://www.youtube.com/watch?v=" + v.id),
Language: strPtr("en"),
ClaimAddress: &claimAddress,
Thumbnail: strPtr("https://berk.ninja/thumbnails/" + v.id),
License: strPtr("Copyrighted (contact author)"),
ChangeAddress: &claimAddress,
ChannelID: &channelID,
}
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer)
}
func (v *YoutubeVideo) Size() *int64 {
return v.size
}
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer) (*SyncSummary, error) {
v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024
//download and thumbnail can be done in parallel
err := v.download()
if err != nil {
return nil, errors.Prefix("download error", err)
}
log.Debugln("Downloaded " + v.id)
err = v.triggerThumbnailSave()
if err != nil {
return nil, errors.Prefix("thumbnail error", err)
}
log.Debugln("Created thumbnail for " + v.id)
summary, err := v.publish(daemon, claimAddress, amount, channelID, namer)
//delete the video in all cases (and ignore the error)
_ = v.delete()
return summary, errors.Prefix("publish error", err)
}
// sorting videos
//type ByPublishedAt []YoutubeVideo
//
//func (a ByPublishedAt) Len() int { return len(a) }
//func (a ByPublishedAt) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
//func (a ByPublishedAt) Less(i, j int) bool { return a[i].publishedAt.Before(a[j].publishedAt) }
//
//type ByPlaylistPosition []YoutubeVideo
//
//func (a ByPlaylistPosition) Len() int { return len(a) }
//func (a ByPlaylistPosition) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
//func (a ByPlaylistPosition) Less(i, j int) bool { return a[i].playlistPosition < a[j].playlistPosition }

View file

@ -1,152 +0,0 @@
import os
import sys
from decimal import Decimal
from bitcoinrpc.authproxy import AuthServiceProxy
from lbryum.wallet import Wallet, WalletStorage
from lbryum.commands import known_commands, Commands
from lbryum.simple_config import SimpleConfig
from lbryum.blockchain import get_blockchain
from lbryum.network import Network
def get_lbrycrdd_connection_string(wallet_conf):
settings = {"username": "rpcuser",
"password": "rpcpassword",
"rpc_port": 9245}
if wallet_conf and os.path.exists(wallet_conf):
with open(wallet_conf, "r") as conf:
conf_lines = conf.readlines()
for l in conf_lines:
if l.startswith("rpcuser="):
settings["username"] = l[8:].rstrip('\n')
if l.startswith("rpcpassword="):
settings["password"] = l[12:].rstrip('\n')
if l.startswith("rpcport="):
settings["rpc_port"] = int(l[8:].rstrip('\n'))
rpc_user = settings["username"]
rpc_pass = settings["password"]
rpc_port = settings["rpc_port"]
rpc_url = "127.0.0.1"
return "http://%s:%s@%s:%i" % (rpc_user, rpc_pass, rpc_url, rpc_port)
class LBRYumWallet(object):
def __init__(self, lbryum_path):
self.config = SimpleConfig()
self.config.set_key('chain', 'lbrycrd_main')
self.storage = WalletStorage(lbryum_path)
self.wallet = Wallet(self.storage)
self.cmd_runner = Commands(self.config, self.wallet, None)
if not self.wallet.has_seed():
seed = self.wallet.make_seed()
self.wallet.add_seed(seed, "derp")
self.wallet.create_master_keys("derp")
self.wallet.create_main_account()
self.wallet.update_password("derp", "")
self.network = Network(self.config)
self.blockchain = get_blockchain(self.config, self.network)
print self.config.get('chain'), self.blockchain
self.wallet.storage.write()
def command(self, command_name, *args, **kwargs):
cmd_runner = Commands(self.config, self.wallet, None)
cmd = known_commands[command_name]
func = getattr(cmd_runner, cmd.name)
return func(*args, **kwargs)
def generate_address(self):
address = self.wallet.create_new_address()
self.wallet.storage.write()
return address
class LBRYcrd(object):
def __init__(self, lbrycrdd_path):
self.lbrycrdd_conn_str = get_lbrycrdd_connection_string(lbrycrdd_path)
def __call__(self, method, *args, **kwargs):
return self.rpc(method)(*args, **kwargs)
def rpc(self, method):
return AuthServiceProxy(self.lbrycrdd_conn_str, service_name=method)
def get_wallet_path():
cwd = os.getcwd()
wallet_path = os.path.join(cwd, "wallet.json")
if not os.path.exists(wallet_path):
return wallet_path
i = 1
while True:
wallet_path = os.path.join(cwd, "wallet_%i.json" % i)
if not os.path.exists(wallet_path):
return wallet_path
i += 1
def coin_chooser(lbrycrdd, amount, fee=0.001):
def iter_txis():
unspent = lbrycrdd("listunspent")
unspent = sorted(unspent, key=lambda x: x['amount'], reverse=True)
spendable = Decimal(0.0)
for txi in unspent:
if spendable >= amount:
break
else:
spendable += txi['amount']
yield txi
if spendable < amount:
print spendable, amount
raise Exception("Not enough funds")
coins = list(iter(iter_txis()))
total = sum(c['amount'] for c in coins)
change = Decimal(total) - Decimal(amount) - Decimal(fee)
if change < 0:
raise Exception("Not enough funds")
if change:
change_address = lbrycrdd("getnewaddress")
else:
change_address = None
print "Total: %f, amount: %f, change: %f" % (total, amount, change)
return coins, change, change_address
def get_raw_tx(lbrycrdd, addresses, coins, amount, change, change_address):
txi = [{'txid': c['txid'], 'vout': c['vout']} for c in coins]
txo = {address: float(amount) for address in addresses}
if change_address:
txo[change_address] = float(change)
return lbrycrdd("createrawtransaction", txi, txo)
def main(count, value=None, lbryum_path=None, lbrycrdd_path=None):
count = int(count)
lbryum_path = lbryum_path or get_wallet_path()
if sys.platform == "darwin":
default_lbrycrdd = os.path.join(os.path.expanduser("~"),
"Library/Application Support/lbrycrd/lbrycrd.conf")
else:
default_lbrycrdd = os.path.join(os.path.expanduser("~"), ".lbrycrd/lbrycrd.conf")
lbrycrdd_path = lbrycrdd_path or default_lbrycrdd
l = LBRYcrd(lbrycrdd_path=lbrycrdd_path)
s = LBRYumWallet(lbryum_path)
value = value or 1.0
value = Decimal(value)
coins, change, change_address = coin_chooser(l, count * value)
addresses = [s.generate_address() for i in range(count)]
raw_tx = get_raw_tx(l, addresses, coins, value, change, change_address)
signed = l("signrawtransaction", raw_tx)['hex']
txid = l("sendrawtransaction", signed)
print txid
if __name__ == "__main__":
args = sys.argv[1:]
main(*args)

View file

@ -1,848 +0,0 @@
package ytsync
import (
"bufio"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"os/signal"
"sort"
"strings"
"sync"
"syscall"
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/jsonrpc"
"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util"
"github.com/lbryio/lbry.go/ytsync/namer"
"github.com/lbryio/lbry.go/ytsync/sdk"
"github.com/lbryio/lbry.go/ytsync/sources"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/mitchellh/go-ps"
log "github.com/sirupsen/logrus"
"google.golang.org/api/googleapi/transport"
"google.golang.org/api/youtube/v3"
)
const (
channelClaimAmount = 0.01
publishAmount = 0.01
maxReasonLength = 500
)
type video interface {
Size() *int64
ID() string
IDAndNum() string
PlaylistPosition() int
PublishedAt() time.Time
Sync(*jsonrpc.Client, string, float64, string, int, *namer.Namer) (*sources.SyncSummary, error)
}
// sorting videos
type byPublishedAt []video
func (a byPublishedAt) Len() int { return len(a) }
func (a byPublishedAt) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byPublishedAt) Less(i, j int) bool { return a[i].PublishedAt().Before(a[j].PublishedAt()) }
// Sync stores the options that control how syncing happens
type Sync struct {
APIConfig *sdk.APIConfig
YoutubeChannelID string
LbryChannelName string
StopOnError bool
MaxTries int
ConcurrentVideos int
TakeOverExistingChannel bool
Refill int
Manager *SyncManager
LbrycrdString string
AwsS3ID string
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
daemon *jsonrpc.Client
claimAddress string
videoDirectory string
syncedVideosMux *sync.RWMutex
syncedVideos map[string]sdk.SyncedVideo
grp *stop.Group
lbryChannelID string
namer *namer.Namer
walletMux *sync.Mutex
queue chan video
}
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
s.syncedVideosMux.Lock()
defer s.syncedVideosMux.Unlock()
s.syncedVideos[videoID] = sdk.SyncedVideo{
VideoID: videoID,
Published: published,
FailureReason: failureReason,
}
}
// SendErrorToSlack Sends an error message to the default channel and to the process log.
func SendErrorToSlack(format string, a ...interface{}) error {
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
log.Errorln(message)
return util.SendToSlack(":sos: " + message)
}
// SendInfoToSlack Sends an info message to the default channel and to the process log.
func SendInfoToSlack(format string, a ...interface{}) error {
message := format
if len(a) > 0 {
message = fmt.Sprintf(format, a...)
}
log.Infoln(message)
return util.SendToSlack(":information_source: " + message)
}
// IsInterrupted can be queried to discover if the sync process was interrupted manually
func (s *Sync) IsInterrupted() bool {
select {
case <-s.grp.Ch():
return true
default:
return false
}
}
func (s *Sync) downloadWallet() error {
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
defaultTempWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/tmp_wallet"
key := aws.String("/wallets/" + s.YoutubeChannelID)
if os.Getenv("REGTEST") == "true" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
defaultTempWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/tmp_wallet"
key = aws.String("/regtest/" + s.YoutubeChannelID)
}
if _, err := os.Stat(defaultWalletDir); !os.IsNotExist(err) {
return errors.Err("default_wallet already exists")
}
creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "")
s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds})
if err != nil {
return err
}
downloader := s3manager.NewDownloader(s3Session)
out, err := os.Create(defaultTempWalletDir)
if err != nil {
return err
}
defer out.Close()
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
Bucket: aws.String(s.AwsS3Bucket),
Key: key,
})
if err != nil {
// Casting to the awserr.Error type will allow you to inspect the error
// code returned by the service in code. The error code can be used
// to switch on context specific functionality. In this case a context
// specific error message is printed to the user based on the bucket
// and key existing.
//
// For information on other S3 API error codes see:
// http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
if aerr, ok := err.(awserr.Error); ok {
code := aerr.Code()
if code == s3.ErrCodeNoSuchKey {
return errors.Err("wallet not on S3")
}
}
return err
} else if bytesWritten == 0 {
return errors.Err("zero bytes written")
}
return os.Rename(defaultTempWalletDir, defaultWalletDir)
}
func (s *Sync) uploadWallet() error {
defaultWalletDir := os.Getenv("HOME") + "/.lbryum/wallets/default_wallet"
key := aws.String("/wallets/" + s.YoutubeChannelID)
if os.Getenv("REGTEST") == "true" {
defaultWalletDir = os.Getenv("HOME") + "/.lbryum_regtest/wallets/default_wallet"
key = aws.String("/regtest/" + s.YoutubeChannelID)
}
if _, err := os.Stat(defaultWalletDir); os.IsNotExist(err) {
return errors.Err("default_wallet does not exist")
}
creds := credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, "")
s3Session, err := session.NewSession(&aws.Config{Region: aws.String(s.AwsS3Region), Credentials: creds})
if err != nil {
return err
}
uploader := s3manager.NewUploader(s3Session)
file, err := os.Open(defaultWalletDir)
if err != nil {
return err
}
defer file.Close()
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.AwsS3Bucket),
Key: key,
Body: file,
})
if err != nil {
return err
}
return os.Remove(defaultWalletDir)
}
func (s *Sync) setStatusSyncing() error {
syncedVideos, claimNames, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSyncing, "")
if err != nil {
return err
}
s.syncedVideosMux.Lock()
s.syncedVideos = syncedVideos
s.Manager.namer.SetNames(claimNames)
s.syncedVideosMux.Unlock()
return nil
}
func (s *Sync) FullCycle() (e error) {
if os.Getenv("HOME") == "" {
return errors.Err("no $HOME env var found")
}
if s.YoutubeChannelID == "" {
return errors.Err("channel ID not provided")
}
s.syncedVideosMux = &sync.RWMutex{}
s.walletMux = &sync.Mutex{}
s.grp = stop.New()
s.queue = make(chan video)
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(interruptChan)
go func() {
<-interruptChan
log.Println("Got interrupt signal, shutting down (if publishing, will shut down after current publish)")
s.grp.Stop()
}()
err := s.setStatusSyncing()
if err != nil {
return err
}
defer s.setChannelTerminationStatus(&e)
err = s.downloadWallet()
if err != nil && err.Error() != "wallet not on S3" {
return errors.Prefix("failure in downloading wallet", err)
} else if err == nil {
log.Println("Continuing previous upload")
} else {
log.Println("Starting new wallet")
}
defer s.stopAndUploadWallet(&e)
s.videoDirectory, err = ioutil.TempDir("", "ytsync")
if err != nil {
return errors.Wrap(err, 0)
}
log.Printf("Starting daemon")
err = startDaemonViaSystemd()
if err != nil {
return err
}
log.Infoln("Waiting for daemon to finish starting...")
s.daemon = jsonrpc.NewClient("")
s.daemon.SetRPCTimeout(40 * time.Minute)
err = s.waitForDaemonStart()
if err != nil {
return err
}
err = s.doSync()
if err != nil {
return err
} else {
// wait for reflection to finish???
wait := 15 * time.Second // should bump this up to a few min, but keeping it low for testing
log.Println("Waiting " + wait.String() + " to finish reflecting everything")
time.Sleep(wait)
}
return nil
}
func (s *Sync) setChannelTerminationStatus(e *error) {
if *e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{
"this youtube channel is being managed by another server",
}
if util.SubstringInSlice((*e).Error(), noFailConditions) {
return
}
failureReason := (*e).Error()
_, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusFailed, failureReason)
if err != nil {
msg := fmt.Sprintf("Failed setting failed state for channel %s", s.LbryChannelName)
*e = errors.Prefix(msg+err.Error(), *e)
}
} else if !s.IsInterrupted() {
_, _, err := s.Manager.apiConfig.SetChannelStatus(s.YoutubeChannelID, StatusSynced, "")
if err != nil {
*e = err
}
}
}
func (s *Sync) waitForDaemonStart() error {
for {
select {
case <-s.grp.Ch():
return errors.Err("interrupted during daemon startup")
default:
s, err := s.daemon.Status()
if err == nil && s.StartupStatus.Wallet && s.StartupStatus.FileManager {
return nil
}
time.Sleep(5 * time.Second)
}
}
}
func (s *Sync) stopAndUploadWallet(e *error) {
log.Printf("Stopping daemon")
shutdownErr := stopDaemonViaSystemd()
if shutdownErr != nil {
logShutdownError(shutdownErr)
} else {
// the cli will return long before the daemon effectively stops. we must observe the processes running
// before moving the wallet
waitTimeout := 8 * time.Minute
processDeathError := waitForDaemonProcess(waitTimeout)
if processDeathError != nil {
logShutdownError(processDeathError)
} else {
err := s.uploadWallet()
if err != nil {
if *e == nil {
e = &err //not 100% sure
return
} else {
*e = errors.Prefix("failure uploading wallet", *e)
}
}
}
}
}
func logShutdownError(shutdownErr error) {
SendErrorToSlack("error shutting down daemon: %v", shutdownErr)
SendErrorToSlack("WALLET HAS NOT BEEN MOVED TO THE WALLET BACKUP DIR")
}
// fixDupes abandons duplicate claims
func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
abandonedClaims := false
videoIDs := make(map[string]jsonrpc.Claim)
for _, c := range claims {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
continue
}
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
return false, errors.Err("something is wrong with this claim: %s", c.ClaimID)
}
tn := *c.Value.Stream.Metadata.Thumbnail
videoID := tn[strings.LastIndex(tn, "/")+1:]
log.Infof("claimid: %s, claimName: %s, videoID: %s", c.ClaimID, c.Name, videoID)
cl, ok := videoIDs[videoID]
if !ok || cl.ClaimID == c.ClaimID {
videoIDs[videoID] = c
continue
}
// only keep the most recent one
claimToAbandon := c
videoIDs[videoID] = cl
if c.Height > cl.Height {
claimToAbandon = cl
videoIDs[videoID] = c
}
_, err := s.daemon.ClaimAbandon(claimToAbandon.Txid, claimToAbandon.Nout)
if err != nil {
return true, err
}
log.Debugf("abandoning %+v", claimToAbandon)
abandonedClaims = true
//return true, nil
}
return abandonedClaims, nil
}
//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published
func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err error) {
count := 0
for _, c := range claims {
if !util.InSlice(c.Category, []string{"claim", "update"}) || c.Value.Stream == nil {
continue
}
if c.Value.Stream.Metadata == nil || c.Value.Stream.Metadata.Thumbnail == nil {
return count, fixed, errors.Err("something is wrong with the this claim: %s", c.ClaimID)
}
//check if claimID is in remote db
tn := *c.Value.Stream.Metadata.Thumbnail
videoID := tn[strings.LastIndex(tn, "/")+1:]
pv, ok := s.syncedVideos[videoID]
if !ok || pv.ClaimName != c.Name {
fixed++
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
if err != nil {
return total, fixed, err
}
}
total++
}
return total, fixed, nil
}
func (s *Sync) doSync() error {
var err error
claims, err := s.daemon.ClaimListMine()
if err != nil {
return errors.Prefix("cannot list claims", err)
}
hasDupes, err := s.fixDupes(*claims)
if err != nil {
return errors.Prefix("error checking for duplicates", err)
}
if hasDupes {
SendInfoToSlack("Channel had dupes and was fixed!")
err = s.waitForNewBlock()
if err != nil {
return err
}
claims, err = s.daemon.ClaimListMine()
if err != nil {
return errors.Prefix("cannot list claims", err)
}
}
pubsOnWallet, nFixed, err := s.updateRemoteDB(*claims)
if err != nil {
return errors.Prefix("error counting claims", err)
}
if nFixed > 0 {
err := s.setStatusSyncing()
if err != nil {
return err
}
SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed)
}
pubsOnDB := 0
for _, sv := range s.syncedVideos {
if sv.Published {
pubsOnDB++
}
}
if pubsOnWallet > pubsOnDB { //This case should never happen
SendInfoToSlack("We're claiming to have published %d videos but in reality we published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
return errors.Err("not all published videos are in the database")
}
if pubsOnWallet < pubsOnDB {
SendInfoToSlack("we're claiming to have published %d videos but we only published %d (%s)", pubsOnDB, pubsOnWallet, s.YoutubeChannelID)
}
err = s.walletSetup()
if err != nil {
return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err)
}
if s.StopOnError {
log.Println("Will stop publishing if an error is detected")
}
for i := 0; i < s.ConcurrentVideos; i++ {
s.grp.Add(1)
go func(i int) {
defer s.grp.Done()
s.startWorker(i)
}(i)
}
if s.LbryChannelName == "@UCBerkeley" {
err = s.enqueueUCBVideos()
} else {
err = s.enqueueYoutubeVideos()
}
close(s.queue)
s.grp.Wait()
return err
}
func (s *Sync) startWorker(workerNum int) {
var v video
var more bool
for {
select {
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return
default:
}
select {
case v, more = <-s.queue:
if !more {
return
}
case <-s.grp.Ch():
log.Printf("Stopping worker %d", workerNum)
return
}
log.Println("================================================================================")
tryCount := 0
for {
tryCount++
err := s.processVideo(v)
if err != nil {
logMsg := fmt.Sprintf("error processing video: " + err.Error())
log.Errorln(logMsg)
fatalErrors := []string{
":5279: read: connection reset by peer",
"no space left on device",
"NotEnoughFunds",
"Cannot publish using channel",
"cannot concatenate 'str' and 'NoneType' objects",
"more than 90% of the space has been used.",
}
if util.SubstringInSlice(err.Error(), fatalErrors) || s.StopOnError {
s.grp.Stop()
} else if s.MaxTries > 1 {
errorsNoRetry := []string{
"non 200 status code received",
" reason: 'This video contains content from",
"dont know which claim to update",
"uploader has not made this video available in your country",
"download error: AccessDenied: Access Denied",
"Playback on other websites has been disabled by the video owner",
"Error in daemon: Cannot publish empty file",
"Error extracting sts from embedded url response",
"Unable to extract signature tokens",
"Client.Timeout exceeded while awaiting headers)",
"the video is too big to sync, skipping for now",
}
if util.SubstringInSlice(err.Error(), errorsNoRetry) {
log.Println("This error should not be retried at all")
} else if tryCount < s.MaxTries {
if strings.Contains(err.Error(), "txn-mempool-conflict") ||
strings.Contains(err.Error(), "too-long-mempool-chain") {
log.Println("waiting for a block before retrying")
err = s.waitForNewBlock()
if err != nil {
s.grp.Stop()
SendErrorToSlack("something went wrong while waiting for a block: %v", err)
break
}
} else if strings.Contains(err.Error(), "failed: Not enough funds") ||
strings.Contains(err.Error(), "Error in daemon: Insufficient funds, please deposit additional LBC") {
log.Println("refilling addresses before retrying")
err = s.walletSetup()
if err != nil {
s.grp.Stop()
SendErrorToSlack("failed to setup the wallet for a refill: %v", err)
break
}
}
log.Println("Retrying")
continue
}
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
}
s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
}
break
}
}
}
func (s *Sync) enqueueYoutubeVideos() error {
client := &http.Client{
Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
}
service, err := youtube.New(client)
if err != nil {
return errors.Prefix("error creating YouTube service", err)
}
response, err := service.Channels.List("contentDetails").Id(s.YoutubeChannelID).Do()
if err != nil {
return errors.Prefix("error getting channels", err)
}
if len(response.Items) < 1 {
return errors.Err("youtube channel not found")
}
if response.Items[0].ContentDetails.RelatedPlaylists == nil {
return errors.Err("no related playlists")
}
playlistID := response.Items[0].ContentDetails.RelatedPlaylists.Uploads
if playlistID == "" {
return errors.Err("no channel playlist")
}
var videos []video
nextPageToken := ""
for {
req := service.PlaylistItems.List("snippet").
PlaylistId(playlistID).
MaxResults(50).
PageToken(nextPageToken)
playlistResponse, err := req.Do()
if err != nil {
return errors.Prefix("error getting playlist items", err)
}
if len(playlistResponse.Items) < 1 {
return errors.Err("playlist items not found")
}
for _, item := range playlistResponse.Items {
// normally we'd send the video into the channel here, but youtube api doesn't have sorting
// so we have to get ALL the videos, then sort them, then send them in
videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item.Snippet))
}
log.Infof("Got info for %d videos from youtube API", len(videos))
nextPageToken = playlistResponse.NextPageToken
if nextPageToken == "" {
break
}
}
sort.Sort(byPublishedAt(videos))
//or sort.Sort(sort.Reverse(byPlaylistPosition(videos)))
Enqueue:
for _, v := range videos {
select {
case <-s.grp.Ch():
break Enqueue
default:
}
select {
case s.queue <- v:
case <-s.grp.Ch():
break Enqueue
}
}
return nil
}
func (s *Sync) enqueueUCBVideos() error {
var videos []video
csvFile, err := os.Open("ucb.csv")
if err != nil {
return err
}
reader := csv.NewReader(bufio.NewReader(csvFile))
for {
line, err := reader.Read()
if err == io.EOF {
break
} else if err != nil {
return err
}
data := struct {
PublishedAt string `json:"publishedAt"`
}{}
err = json.Unmarshal([]byte(line[4]), &data)
if err != nil {
return err
}
videos = append(videos, sources.NewUCBVideo(line[0], line[2], line[1], line[3], data.PublishedAt, s.videoDirectory))
}
log.Printf("Publishing %d videos\n", len(videos))
sort.Sort(byPublishedAt(videos))
Enqueue:
for _, v := range videos {
select {
case <-s.grp.Ch():
break Enqueue
default:
}
select {
case s.queue <- v:
case <-s.grp.Ch():
break Enqueue
}
}
return nil
}
func (s *Sync) processVideo(v video) (err error) {
defer func() {
if p := recover(); p != nil {
var ok bool
err, ok = p.(error)
if !ok {
err = errors.Err("%v", p)
}
err = errors.Wrap(p, 2)
}
}()
log.Println("Processing " + v.IDAndNum())
defer func(start time.Time) {
log.Println(v.ID() + " took " + time.Since(start).String())
}(time.Now())
s.syncedVideosMux.RLock()
sv, ok := s.syncedVideos[v.ID()]
s.syncedVideosMux.RUnlock()
alreadyPublished := ok && sv.Published
neverRetryFailures := []string{
"Error extracting sts from embedded url response",
"Unable to extract signature tokens",
"the video is too big to sync, skipping for now",
}
if ok && !sv.Published && util.SubstringInSlice(sv.FailureReason, neverRetryFailures) {
log.Println(v.ID() + " can't ever be published")
return nil
}
if alreadyPublished {
log.Println(v.ID() + " already published")
return nil
}
if v.PlaylistPosition() > s.Manager.videosLimit {
log.Println(v.ID() + " is old: skipping")
return nil
}
err = s.Manager.checkUsedSpace()
if err != nil {
return err
}
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.maxVideoSize, s.namer)
if err != nil {
return err
}
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName)
return nil
}
func startDaemonViaSystemd() error {
err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "start", "lbrynet.service").Run()
if err != nil {
return errors.Err(err)
}
return nil
}
func stopDaemonViaSystemd() error {
err := exec.Command("/usr/bin/sudo", "/bin/systemctl", "stop", "lbrynet.service").Run()
if err != nil {
return errors.Err(err)
}
return nil
}
// waitForDaemonProcess observes the running processes and returns when the process is no longer running or when the timeout is up
func waitForDaemonProcess(timeout time.Duration) error {
processes, err := ps.Processes()
if err != nil {
return err
}
var daemonProcessId = -1
for _, p := range processes {
if p.Executable() == "lbrynet-daemon" {
daemonProcessId = p.Pid()
break
}
}
if daemonProcessId == -1 {
return nil
}
then := time.Now()
stopTime := then.Add(time.Duration(timeout * time.Second))
for !time.Now().After(stopTime) {
wait := 10 * time.Second
log.Println("the daemon is still running, waiting for it to exit")
time.Sleep(wait)
proc, err := os.FindProcess(daemonProcessId)
if err != nil {
// couldn't find the process, that means the daemon is stopped and can continue
return nil
}
//double check if process is running and alive
//by sending a signal 0
//NOTE : syscall.Signal is not available in Windows
err = proc.Signal(syscall.Signal(0))
//the process doesn't exist anymore! we're free to go
if err != nil && (err == syscall.ESRCH || err.Error() == "os: process already finished") {
return nil
}
}
return errors.Err("timeout reached")
}