Add support for new SDK (0.37.*) and support for upgrading channels and claims to new metadata #28
16 changed files with 2395 additions and 704 deletions
|
@ -9,7 +9,7 @@ env:
|
|||
global:
|
||||
- GO111MODULE=on
|
||||
#GITHUB_TOKEN
|
||||
- secure: "+G0TxyGyRYJb0L0qCCn6NPregigwhalkS9+/3mkV41xoAwUlEU9aclSJQVKKqeMrV0PkmJ6KrzYD78tnRj8Q5ZDe93LBDbc/WhXZ91JbRmIXIsVktz/78R2a2Q0DBrXMlyWXS84QouRfI3eyxf2BusERSYdr3H6BL4r15PIHJEmLGOchHsHwSI46PRGag+IFnbUb+fyUSs7er6gKeSp4+ZK1bSUgxiMUQrZfImYI9LS+cgqpDh67pX5jRbDNBihZZUwfreToYw3RbIAom4i2oix1rBBD/jpcTwO+WdyXFtur8RYGnsknePU88KiAB9bcXU4T2xVxR9wusK6jX5G2BChEOG9PjuVfeDqXBnskOEtAevwAKJRYW6ATmXqQZ+TAy1DFxsUWPVS7ykIA+69n7HzE7xu50AB1U2n6myT977i4jhBzw3VrB9NQz//4b2j6MCG1PhIgB38kffQWVsa/CF4RiiWi6a7ePRsPP0HdRUbXFQJWPevKgVJ9ve/ArQKiea1rP9zy9KdFAP30vESBkn4WMOCjfg7F6JbfV3e1ywBq9XMznj14iAZHwXR1tpPp0GekuMlVJHJWa0Ea/dfDGWshnr1Be71vc9T118w7sd0v1Dp4EQCFa4B0Hx7Ual7uWgeuweL/JLr1Am3BgWVcXOWrhzMT0b1tGFa9ZNz2Wwk="
|
||||
- secure: "Ps3KocRP5xnM3/uA99CeYhDTVxRIuW7fGyrtqBeRWZW0cXzeA4XCTKxqcFbrPUPw67XkrBVgE58JDdWoQEJ7tm67PjMm/ltp5Evhx/QAJDh+YSofXyGDVpG1mrTZFI66R3NVVJLkSGALMkuWWXvfYZeU//AworJbyRoaIK/CVt5OP23i5N4tdd5UXc5dfLuYqnKRynyMmCkz9c3yEIQMXoPhG2hx7l7L2BeMJvcKmVhkSN7nQayjnrbUXGm/IRqrb88lvkyBevN5E3IB2V5IKEieIPZjbD/N0IfcnAt89Z96tgDhtIbx3ZvXm92lsvHA8buqQpG9d2AmSi6GKs64lQcnGeM5o0wER2JHWl1OSa1Nr/UAo5Xb/PM65Yt3yZE8AuMKHBmbfDSBzdkTXx58AeDzFUd3kMXD/fFjeQQWyXFlOss3ygH9SObl827Txmz9OJqZaxabs5Q3AP6m3EjKjz7zfLfrgpcxJM2WBiU1bN0ZxUgZkImy/CHk5gCZ7vhcnaLiDO4HZnzY/aRJwKYQPE5i0O2nHpIfovqkc0DFBA7U/7Cjin7e1E0UZvF3meLOxMqkfc6X7QTxqQpt2Tej6jlpdxw4CTLwGUhGkAw9IAPkUB3L0EbZ1/ksGhNvGDvUeSTq8hYdMAPmA+k9jS6653V4SQ+qBMy5++tbr5AeZQI="
|
||||
deploy:
|
||||
provider: script
|
||||
skip_cleanup: true
|
||||
|
|
|
@ -55,7 +55,7 @@ Usage:
|
|||
|
||||
Flags:
|
||||
--after int Specify from when to pull jobs [Unix time](Default: 0)
|
||||
--before int Specify until when to pull jobs [Unix time](Default: current Unix time) (default 1582834707)
|
||||
--before int Specify until when to pull jobs [Unix time](Default: current Unix time) (default current timestamp)
|
||||
--channelID string If specified, only this channel will be synced.
|
||||
--concurrent-jobs int how many jobs to process concurrently (default 1)
|
||||
-h, --help help for ytsync
|
||||
|
@ -63,12 +63,14 @@ Flags:
|
|||
--max-length float Maximum video length to process (in hours) (default 2)
|
||||
--max-size int Maximum video size to process (in MB) (default 2048)
|
||||
--max-tries int Number of times to try a publish that fails (default 3)
|
||||
--remove-db-unpublished Remove videos from the database that are marked as published but aren't really published
|
||||
--run-once Whether the process should be stopped after one cycle or not
|
||||
--skip-space-check Do not perform free space check on startup
|
||||
--status string Specify which queue to pull from. Overrides --update
|
||||
--stop-on-error If a publish fails, stop all publishing and exit
|
||||
--takeover-existing-channel If channel exists and we don't own it, take over the channel
|
||||
--update Update previously synced channels instead of syncing new ones
|
||||
--upgrade-metadata Upgrade videos if they're on the old metadata version
|
||||
--videos-limit int how many videos to process per channel (default 1000)
|
||||
```
|
||||
|
||||
|
|
5
go.mod
5
go.mod
|
@ -5,8 +5,6 @@ require (
|
|||
github.com/ChannelMeter/iso8601duration v0.0.0-20150204201828-8da3af7a2a61
|
||||
github.com/PuerkitoBio/goquery v1.5.0 // indirect
|
||||
github.com/aws/aws-sdk-go v1.17.3
|
||||
github.com/btcsuite/btcd v0.0.0-20190410025418-9bfb2ca0346b // indirect
|
||||
github.com/btcsuite/btcutil v0.0.0-20190316010144-3ac1210f4b38 // indirect
|
||||
github.com/channelmeter/iso8601duration v0.0.0-20150204201828-8da3af7a2a61 // indirect
|
||||
github.com/go-ini/ini v1.42.0 // indirect
|
||||
github.com/golang/protobuf v1.3.1 // indirect
|
||||
|
@ -17,7 +15,7 @@ require (
|
|||
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
|
||||
github.com/kr/pretty v0.1.0 // indirect
|
||||
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c
|
||||
github.com/lbryio/lbry.go v0.0.0-20190422142237-ad33acfc936f
|
||||
github.com/lbryio/lbry.go v1.0.14
|
||||
github.com/lusis/slack-test v0.0.0-20190408224659-6cf59653add2 // indirect
|
||||
github.com/mitchellh/go-ps v0.0.0-20170309133038-4fdf99ab2936
|
||||
github.com/mitchellh/mapstructure v1.1.2 // indirect
|
||||
|
@ -36,7 +34,6 @@ require (
|
|||
golang.org/x/crypto v0.0.0-20190418165655-df01cb2cc480 // indirect
|
||||
golang.org/x/net v0.0.0-20190415214537-1da14a5a36f2 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a // indirect
|
||||
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be // indirect
|
||||
google.golang.org/api v0.3.2
|
||||
google.golang.org/appengine v1.5.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7 // indirect
|
||||
|
|
42
go.sum
42
go.sum
|
@ -9,6 +9,7 @@ github.com/PuerkitoBio/goquery v1.5.0 h1:uGvmFXOA73IKluu/F84Xd1tt/z07GYm8X49XKHP
|
|||
github.com/PuerkitoBio/goquery v1.5.0/go.mod h1:qD2PgZ9lccMbQlc7eEOjaeRlFQON7xY8kdmcsrnKqMg=
|
||||
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
|
||||
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
|
||||
github.com/aead/siphash v1.0.1 h1:FwHfE/T45KPKYuuSAKyyvE+oPWcaQ+CUmFW0bPlM+kg=
|
||||
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
|
||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
|
||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
|
@ -17,23 +18,22 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
|
|||
github.com/andybalholm/cascadia v1.0.0 h1:hOCXnnZ5A+3eVDX8pvgl4kofXv2ELss0bKcqRySc45o=
|
||||
github.com/andybalholm/cascadia v1.0.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
|
||||
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
||||
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf h1:eg0MeVzsP1G42dRafH3vf+al2vQIJU0YHX+1Tw87oco=
|
||||
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
|
||||
github.com/aws/aws-sdk-go v1.17.3 h1:KBXxg7Jh0TxE5zmpNB2DwKmJeDUqh0O6jhy25TuYOmc=
|
||||
github.com/aws/aws-sdk-go v1.17.3/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
||||
github.com/btcsuite/btcd v0.0.0-20180531025944-86fed781132a/go.mod h1:Dmm/EzmjnCiweXmzRIAiUWCInVmPgjkzgv5k4tVyXiQ=
|
||||
github.com/btcsuite/btcd v0.0.0-20190410025418-9bfb2ca0346b h1:7J7sEce3LgtbMgs7PKcN61gF3b4txM6SjaRoJTSk640=
|
||||
github.com/btcsuite/btcd v0.0.0-20190410025418-9bfb2ca0346b/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
|
||||
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78=
|
||||
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
|
||||
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo=
|
||||
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
|
||||
github.com/btcsuite/btcutil v0.0.0-20180524032703-d4cc87b86016/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
|
||||
github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803 h1:j3AgPKKZtZStM2nyhrDSLSYgT7YHrZKdSkq1OYeLjvM=
|
||||
github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
|
||||
github.com/btcsuite/btcutil v0.0.0-20190316010144-3ac1210f4b38 h1:GbQHMJ2u/geMPV1tbN7i7zARSoPAPuXWa44V0KYvJXU=
|
||||
github.com/btcsuite/btcutil v0.0.0-20190316010144-3ac1210f4b38/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
|
||||
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd h1:R/opQEbFEy9JGkIguV40SvRY1uliPX8ifOvi6ICsFCw=
|
||||
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
|
||||
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd h1:qdGvebPBDuYDPGi1WCPjy1tGyMpmDK8IEapSsszn7HE=
|
||||
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY=
|
||||
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723 h1:ZA/jbKoGcVAnER6pCHPEkGdZOV7U1oLUedErBHCUMs0=
|
||||
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
|
||||
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc=
|
||||
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
|
||||
|
@ -60,6 +60,7 @@ github.com/go-ini/ini v1.42.0 h1:TWr1wGj35+UiWHlBA8er89seFXxzwFn11spilrrj+38=
|
|||
github.com/go-ini/ini v1.42.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
|
||||
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
|
||||
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
|
||||
github.com/go-ozzo/ozzo-validation v3.5.0+incompatible h1:sUy/in/P6askYr16XJgTKq/0SZhiWsdg4WZGaLsGQkM=
|
||||
github.com/go-ozzo/ozzo-validation v3.5.0+incompatible/go.mod h1:gsEKFIVnabGBt6mXmxK0MoFy+cZoTJY6mu5Ll3LVLBU=
|
||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
|
||||
|
@ -101,9 +102,11 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
|
|||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
|
||||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89 h1:12K8AlpT0/6QUXSfV0yi4Q0jkbq8NDtIKFtF61AoqV0=
|
||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
|
||||
github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI=
|
||||
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
|
||||
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
|
||||
github.com/jtolds/gls v4.2.1+incompatible h1:fSuqC+Gmlu6l/ZYAoZzx2pyucC8Xza35fpRVWLVmUEE=
|
||||
|
@ -112,6 +115,7 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
|
|||
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE=
|
||||
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
|
@ -125,17 +129,12 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
|||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c h1:BhdcWGsuKif/XoSZnqVGNqJ1iEmH0czWR5upj+AuR8M=
|
||||
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c/go.mod h1:muH7wpUqE8hRA3OrYYosw9+Sl681BF9cwcjzE+OCNK8=
|
||||
github.com/lbryio/lbry.go v0.0.0-20190419005332-80b25b225e18 h1:ZhWjtvaq5r7julhcF9OSgx4bLv9UsdIx27zH1/fbrEc=
|
||||
github.com/lbryio/lbry.go v0.0.0-20190419005332-80b25b225e18/go.mod h1:kd08aOMCuBVYJ3EafY4Kx3dRAWWQYhobJ9beREgsaRI=
|
||||
github.com/lbryio/lbry.go v0.0.0-20190422142237-ad33acfc936f h1:o6EZ7bAdYrS6pKp85SEr6Ywyy2JDJS1CY3ChkVsvSM4=
|
||||
github.com/lbryio/lbry.go v0.0.0-20190422142237-ad33acfc936f/go.mod h1:FubnMAYvLt2jGasG7BrQsokYHZ2wpNtWethPHUVauMc=
|
||||
github.com/lbryio/lbryschema.go v0.0.0-20190422030648-322c658307e0 h1:/YWLlbbDefRGLs/ozyuRpvpwpFISYehwR4AAVlPthA8=
|
||||
github.com/lbryio/lbryschema.go v0.0.0-20190422030648-322c658307e0/go.mod h1:dAzPCBj3CKKWBGYBZxK6tKBP5SCgY2tqd9SnQd/OyKo=
|
||||
github.com/lbryio/lbry.go v1.0.14 h1:lpaO96YyP3d2RJzgl1WFkcyS15/ROd04OV3S1E5Av8E=
|
||||
github.com/lbryio/lbry.go v1.0.14/go.mod h1:JtyI30bU51rm0LZ/po3mQuzf++14OWb6kR/6mMRAmKU=
|
||||
github.com/lbryio/lbryschema.go v0.0.0-20190428231007-c54836bca002 h1:urfYK5ElpUrAv90auPLldoVC60LwiGAcY0OE6HJB9KI=
|
||||
github.com/lbryio/lbryschema.go v0.0.0-20190428231007-c54836bca002/go.mod h1:dAzPCBj3CKKWBGYBZxK6tKBP5SCgY2tqd9SnQd/OyKo=
|
||||
github.com/lbryio/ozzo-validation v0.0.0-20170323141101-d1008ad1fd04 h1:Nze+C2HbeKvhjI/kVn+9Poj/UuEW5sOQxcsxqO7L3GI=
|
||||
github.com/lbryio/ozzo-validation v0.0.0-20170323141101-d1008ad1fd04/go.mod h1:fbG/dzobG8r95KzMwckXiLMHfFjZaBRQqC9hPs2XAQ4=
|
||||
github.com/lbryio/types v0.0.0-20190405005919-54c3c28f676a h1:twWvrsBDvSb+qnmpSq3nvFrodgC5PpXUipyo4T/W790=
|
||||
github.com/lbryio/types v0.0.0-20190405005919-54c3c28f676a/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
|
||||
github.com/lbryio/types v0.0.0-20190415181811-35ddf1afe731 h1:iERWR8Dkng30eRInI7gzolTEJBW9nBSK/sT+Z9aSipI=
|
||||
github.com/lbryio/types v0.0.0-20190415181811-35ddf1afe731/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
|
||||
github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c h1:m3O7561xBQ00lfUVayW4c6SnpVbUDQtPUwGcGYSUYQA=
|
||||
github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
|
||||
github.com/lusis/go-slackbot v0.0.0-20180109053408-401027ccfef5 h1:AsEBgzv3DhuYHI/GiQh2HxvTP71HCCE9E/tzGUzGdtU=
|
||||
|
@ -184,11 +183,11 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
|
|||
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
github.com/sebdah/goldie v0.0.0-20180424091453-8784dd1ab561/go.mod h1:lvjGftC8oe7XPtyrOidaMi0rp5B9+XY/ZRUynGnuaxQ=
|
||||
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
|
||||
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
|
||||
github.com/shopspring/decimal v0.0.0-20180607144847-19e3cb6c2930/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
|
||||
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=
|
||||
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
|
||||
github.com/sirupsen/logrus v0.0.0-20180523074243-ea8897e79973/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
|
||||
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
|
||||
|
@ -198,6 +197,7 @@ github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3/go.mod h1
|
|||
github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s=
|
||||
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs=
|
||||
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
||||
github.com/spf13/cast v1.2.0 h1:HHl1DSRbEQN2i8tJmtS6ViPyHx35+p51amrdsiTCrkg=
|
||||
github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg=
|
||||
github.com/spf13/cobra v0.0.0-20190109003409-7547e83b2d85 h1:UQHWkFUuJBy5rWN1DxosG/efssLu7u0fXXSTC2HHKfQ=
|
||||
github.com/spf13/cobra v0.0.0-20190109003409-7547e83b2d85/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
|
||||
|
@ -217,7 +217,6 @@ go.opencensus.io v0.20.2 h1:NAfh7zF0/3/HqtMvJNZ/RFrSlCE6ZTlHmKfhL/Dm1Jk=
|
|||
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
|
||||
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20180608092829-8ac0e0d97ce4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20190418165655-df01cb2cc480 h1:O5YqonU5IWby+w98jVUG9h7zlCWCcH4RHyPVReBmhzk=
|
||||
|
@ -255,8 +254,8 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5h
|
|||
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be h1:mI+jhqkn68ybP0ORJqunXn+fq+Eeb4hHKqLQcFICjAc=
|
||||
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190520201301-c432e742b0af h1:NXfmMfXz6JqGfG3ikSxcz2N93j6DgScr19Oo2uwFu88=
|
||||
golang.org/x/sys v0.0.0-20190520201301-c432e742b0af/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 h1:z99zHgr7hKfrUcX/KsoJk5FJfjTceCKIp96+biqP4To=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
|
@ -285,7 +284,6 @@ google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3
|
|||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
google.golang.org/grpc v1.20.0 h1:DlsSIrgEBuZAUFJcta2B5i/lzeHHbnfkNFAfFXLVFYQ=
|
||||
google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM=
|
||||
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
|
||||
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
|
||||
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
@ -293,7 +291,6 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33
|
|||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo=
|
||||
gopkg.in/ini.v1 v1.41.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk=
|
||||
gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
|
@ -304,6 +301,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
|
|||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
|
||||
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
|
||||
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
|
|
6
main.go
6
main.go
|
@ -36,6 +36,8 @@ var (
|
|||
videosLimit int
|
||||
maxVideoSize int
|
||||
maxVideoLength float64
|
||||
removeDBUnpublished bool
|
||||
upgradeMetadata bool
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
@ -56,6 +58,8 @@ func main() {
|
|||
cmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
|
||||
cmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
|
||||
cmd.Flags().BoolVar(&singleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
|
||||
cmd.Flags().BoolVar(&removeDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
|
||||
cmd.Flags().BoolVar(&upgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
|
||||
cmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update")
|
||||
cmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.")
|
||||
cmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
|
||||
|
@ -188,6 +192,8 @@ func ytSync(cmd *cobra.Command, args []string) {
|
|||
syncProperties,
|
||||
apiConfig,
|
||||
maxVideoLength,
|
||||
removeDBUnpublished,
|
||||
upgradeMetadata,
|
||||
)
|
||||
err := sm.Start()
|
||||
if err != nil {
|
||||
|
|
|
@ -11,6 +11,9 @@ import (
|
|||
|
||||
"github.com/lbryio/lbry.go/extras/errors"
|
||||
"github.com/lbryio/lbry.go/extras/util"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
|
@ -37,12 +40,14 @@ type SyncManager struct {
|
|||
singleRun bool
|
||||
syncProperties *sdk.SyncProperties
|
||||
apiConfig *sdk.APIConfig
|
||||
removeDBUnpublished bool
|
||||
upgradeMetadata bool
|
||||
}
|
||||
|
||||
func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int,
|
||||
skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int,
|
||||
maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string,
|
||||
syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig, maxVideoLength float64) *SyncManager {
|
||||
syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig, maxVideoLength float64, removeDBUnpublished bool, upgradeMetadata bool) *SyncManager {
|
||||
return &SyncManager{
|
||||
stopOnError: stopOnError,
|
||||
maxTries: maxTries,
|
||||
|
@ -66,26 +71,30 @@ func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool
|
|||
singleRun: singleRun,
|
||||
syncProperties: syncProperties,
|
||||
apiConfig: apiConfig,
|
||||
removeDBUnpublished: removeDBUnpublished,
|
||||
upgradeMetadata: upgradeMetadata,
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
StatusPending = "pending" // waiting for permission to sync
|
||||
StatusPendingEmail = "pendingemail" // permission granted but missing email
|
||||
StatusQueued = "queued" // in sync queue. will be synced soon
|
||||
StatusSyncing = "syncing" // syncing now
|
||||
StatusSynced = "synced" // done
|
||||
StatusFailed = "failed"
|
||||
StatusFinalized = "finalized" // no more changes allowed
|
||||
StatusAbandoned = "abandoned" // deleted on youtube or banned
|
||||
StatusPending = "pending" // waiting for permission to sync
|
||||
StatusPendingEmail = "pendingemail" // permission granted but missing email
|
||||
StatusQueued = "queued" // in sync queue. will be synced soon
|
||||
StatusPendingUpgrade = "pendingupgrade" // in sync queue. will be synced soon
|
||||
StatusSyncing = "syncing" // syncing now
|
||||
StatusSynced = "synced" // done
|
||||
StatusFailed = "failed"
|
||||
StatusFinalized = "finalized" // no more changes allowed
|
||||
StatusAbandoned = "abandoned" // deleted on youtube or banned
|
||||
)
|
||||
|
||||
var SyncStatuses = []string{StatusPending, StatusPendingEmail, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized, StatusAbandoned}
|
||||
var SyncStatuses = []string{StatusPending, StatusPendingEmail, StatusPendingUpgrade, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized, StatusAbandoned}
|
||||
|
||||
const (
|
||||
VideoStatusPublished = "published"
|
||||
VideoStatusFailed = "failed"
|
||||
VideoStatusUnpublished = "unpublished"
|
||||
VideoStatusPublished = "published"
|
||||
VideoStatusFailed = "failed"
|
||||
VideoStatusUpgradeFailed = "upgradefailed"
|
||||
VideoStatusUnpublished = "unpublished"
|
||||
)
|
||||
|
||||
func (s *SyncManager) Start() error {
|
||||
|
@ -128,10 +137,12 @@ func (s *SyncManager) Start() error {
|
|||
AwsS3Region: s.awsS3Region,
|
||||
AwsS3Bucket: s.awsS3Bucket,
|
||||
namer: namer.NewNamer(),
|
||||
Fee: channels[0].Fee,
|
||||
}
|
||||
shouldInterruptLoop = true
|
||||
} else {
|
||||
var queuesToSync []string
|
||||
//TODO: implement scrambling to avoid starvation of queues
|
||||
if s.syncStatus != "" {
|
||||
queuesToSync = append(queuesToSync, s.syncStatus)
|
||||
} else if s.syncUpdate {
|
||||
|
@ -144,7 +155,9 @@ func (s *SyncManager) Start() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, c := range channels {
|
||||
log.Infof("There are %d channels in the \"%s\" queue", len(channels), q)
|
||||
if len(channels) > 0 {
|
||||
c := channels[0]
|
||||
syncs = append(syncs, Sync{
|
||||
APIConfig: s.apiConfig,
|
||||
YoutubeChannelID: c.ChannelId,
|
||||
|
@ -162,7 +175,9 @@ func (s *SyncManager) Start() error {
|
|||
AwsS3Region: s.awsS3Region,
|
||||
AwsS3Bucket: s.awsS3Bucket,
|
||||
namer: namer.NewNamer(),
|
||||
Fee: c.Fee,
|
||||
})
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -170,9 +185,9 @@ func (s *SyncManager) Start() error {
|
|||
log.Infoln("No channels to sync. Pausing 5 minutes!")
|
||||
time.Sleep(5 * time.Minute)
|
||||
}
|
||||
for i, sync := range syncs {
|
||||
for _, sync := range syncs {
|
||||
shouldNotCount := false
|
||||
SendInfoToSlack("Syncing %s (%s) to LBRY! (iteration %d/%d - total processed channels: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1)
|
||||
SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1)
|
||||
err := sync.FullCycle()
|
||||
if err != nil {
|
||||
fatalErrors := []string{
|
||||
|
@ -192,7 +207,7 @@ func (s *SyncManager) Start() error {
|
|||
SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
|
||||
}
|
||||
}
|
||||
SendInfoToSlack("Syncing %s (%s) reached an end. (iteration %d/%d - total processed channels: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1)
|
||||
SendInfoToSlack("Syncing %s (%s) reached an end. total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1)
|
||||
if !shouldNotCount {
|
||||
syncCount++
|
||||
}
|
||||
|
@ -207,7 +222,12 @@ func (s *SyncManager) Start() error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SyncManager) GetS3AWSConfig() aws.Config {
|
||||
return aws.Config{
|
||||
Credentials: credentials.NewStaticCredentials(s.awsS3ID, s.awsS3Secret, ""),
|
||||
Region: &s.awsS3Region,
|
||||
}
|
||||
}
|
||||
func (s *SyncManager) checkUsedSpace() error {
|
||||
usedPctile, err := GetUsedSpace(s.blobsDir)
|
||||
if err != nil {
|
||||
|
|
181
manager/setup.go
181
manager/setup.go
|
@ -2,17 +2,18 @@ package manager
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/lbryio/lbry.go/extras/errors"
|
||||
"github.com/lbryio/lbry.go/extras/jsonrpc"
|
||||
"github.com/lbryio/lbry.go/extras/util"
|
||||
"github.com/lbryio/lbry.go/lbrycrd"
|
||||
|
||||
"github.com/lbryio/ytsync/tagsManager"
|
||||
"github.com/lbryio/ytsync/thumbs"
|
||||
|
||||
"github.com/shopspring/decimal"
|
||||
|
@ -21,8 +22,28 @@ import (
|
|||
"google.golang.org/api/youtube/v3"
|
||||
)
|
||||
|
||||
func (s *Sync) enableAddressReuse() error {
|
||||
accountsResponse, err := s.daemon.AccountList()
|
||||
if err != nil {
|
||||
return errors.Err(err)
|
||||
}
|
||||
accounts := accountsResponse.LBCMainnet
|
||||
if os.Getenv("REGTEST") == "true" {
|
||||
accounts = accountsResponse.LBCRegtest
|
||||
}
|
||||
for _, a := range accounts {
|
||||
_, err = s.daemon.AccountSet(a.ID, jsonrpc.AccountSettings{
|
||||
ChangeMaxUses: util.PtrToInt(1000),
|
||||
ReceivingMaxUses: util.PtrToInt(100),
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Err(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (s *Sync) walletSetup() error {
|
||||
//prevent unnecessary concurrent execution
|
||||
//prevent unnecessary concurrent execution and publishing while refilling/reallocating UTXOs
|
||||
s.walletMux.Lock()
|
||||
defer s.walletMux.Unlock()
|
||||
err := s.ensureChannelOwnership()
|
||||
|
@ -42,51 +63,55 @@ func (s *Sync) walletSetup() error {
|
|||
}
|
||||
log.Debugf("Starting balance is %.4f", balance)
|
||||
|
||||
var numOnSource int
|
||||
if s.LbryChannelName == "@UCBerkeley" {
|
||||
numOnSource = 10104
|
||||
} else {
|
||||
n, err := s.CountVideos()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
numOnSource = int(n)
|
||||
n, err := s.CountVideos()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
videosOnYoutube := int(n)
|
||||
|
||||
log.Debugf("Source channel has %d videos", numOnSource)
|
||||
if numOnSource == 0 {
|
||||
log.Debugf("Source channel has %d videos", videosOnYoutube)
|
||||
if videosOnYoutube == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.syncedVideosMux.RLock()
|
||||
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
|
||||
publishedCount := 0
|
||||
notUpgradedCount := 0
|
||||
failedCount := 0
|
||||
for _, sv := range s.syncedVideos {
|
||||
if sv.Published {
|
||||
publishedCount++
|
||||
if sv.MetadataVersion < 2 {
|
||||
notUpgradedCount++
|
||||
}
|
||||
} else {
|
||||
failedCount++
|
||||
}
|
||||
}
|
||||
s.syncedVideosMux.RUnlock()
|
||||
log.Debugf("We already allocated credits for %d videos", numPublished)
|
||||
|
||||
if numOnSource-numPublished > s.Manager.videosLimit {
|
||||
numOnSource = s.Manager.videosLimit
|
||||
log.Debugf("We already allocated credits for %d videos", publishedCount)
|
||||
|
||||
if videosOnYoutube > s.Manager.videosLimit {
|
||||
videosOnYoutube = s.Manager.videosLimit
|
||||
}
|
||||
unallocatedVideos := videosOnYoutube - publishedCount
|
||||
requiredBalance := float64(unallocatedVideos)*(publishAmount+estimatedMaxTxFee) + channelClaimAmount
|
||||
if s.Manager.upgradeMetadata {
|
||||
requiredBalance += float64(notUpgradedCount) * 0.001
|
||||
}
|
||||
|
||||
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
|
||||
if numPublished > numOnSource && balance < 1 {
|
||||
SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource)
|
||||
minBalance = 1 //since we ended up in this function it means some juice is still needed
|
||||
refillAmount := 0.0
|
||||
if balance < requiredBalance || balance < minimumAccountBalance {
|
||||
refillAmount = math.Max(requiredBalance-requiredBalance, minimumRefillAmount)
|
||||
}
|
||||
amountToAdd := minBalance - balance
|
||||
|
||||
if s.Refill > 0 {
|
||||
if amountToAdd < 0 {
|
||||
amountToAdd = float64(s.Refill)
|
||||
} else {
|
||||
amountToAdd += float64(s.Refill)
|
||||
}
|
||||
refillAmount += float64(s.Refill)
|
||||
}
|
||||
|
||||
if amountToAdd > 0 {
|
||||
if amountToAdd < 1 {
|
||||
amountToAdd = 1 // no reason to bother adding less than 1 credit
|
||||
}
|
||||
err := s.addCredits(amountToAdd)
|
||||
if refillAmount > 0 {
|
||||
err := s.addCredits(refillAmount)
|
||||
if err != nil {
|
||||
return errors.Err(err)
|
||||
}
|
||||
|
@ -98,7 +123,7 @@ func (s *Sync) walletSetup() error {
|
|||
} else if claimAddress == nil {
|
||||
return errors.Err("could not get unused address")
|
||||
}
|
||||
s.claimAddress = string((*claimAddress)[0])
|
||||
s.claimAddress = string((*claimAddress)[0]) //TODO: remove claimAddress completely
|
||||
if s.claimAddress == "" {
|
||||
return errors.Err("found blank claim address")
|
||||
}
|
||||
|
@ -122,7 +147,7 @@ func (s *Sync) ensureEnoughUTXOs() error {
|
|||
}
|
||||
defaultAccount := ""
|
||||
for _, account := range accountsNet {
|
||||
if account.IsDefaultAccount {
|
||||
if account.IsDefault {
|
||||
defaultAccount = account.ID
|
||||
break
|
||||
}
|
||||
|
@ -161,12 +186,15 @@ func (s *Sync) ensureEnoughUTXOs() error {
|
|||
if err != nil {
|
||||
return errors.Err(err)
|
||||
}
|
||||
broadcastFee := 0.01
|
||||
amountToSplit := fmt.Sprintf("%.6f", balanceAmount-broadcastFee)
|
||||
maxUTXOs := uint64(500)
|
||||
desiredUTXOCount := uint64(math.Floor((balanceAmount) / 0.1))
|
||||
if desiredUTXOCount > maxUTXOs {
|
||||
desiredUTXOCount = maxUTXOs
|
||||
}
|
||||
log.Infof("Splitting balance of %s evenly between %d UTXOs", *balance, desiredUTXOCount)
|
||||
|
||||
log.Infof("Splitting balance of %s evenly between 40 UTXOs", *balance)
|
||||
|
||||
prefillTx, err := s.daemon.AccountFund(defaultAccount, defaultAccount, amountToSplit, uint64(target))
|
||||
broadcastFee := 0.1
|
||||
prefillTx, err := s.daemon.AccountFund(defaultAccount, defaultAccount, fmt.Sprintf("%.4f", balanceAmount-broadcastFee), desiredUTXOCount, false)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if prefillTx == nil {
|
||||
|
@ -220,7 +248,7 @@ func (s *Sync) ensureChannelOwnership() error {
|
|||
return errors.Err("no channel name set")
|
||||
}
|
||||
//@TODO: get rid of this when imported channels are supported
|
||||
if s.YoutubeChannelID == "UCkK9UDm_ZNrq_rIXCz3xCGA" || s.YoutubeChannelID == "UCW-thz5HxE-goYq8yPds1Gw" {
|
||||
if s.YoutubeChannelID == "UCW-thz5HxE-goYq8yPds1Gw" {
|
||||
return nil
|
||||
}
|
||||
channels, err := s.daemon.ChannelList(nil, 1, 50)
|
||||
|
@ -246,9 +274,11 @@ func (s *Sync) ensureChannelOwnership() error {
|
|||
}
|
||||
}
|
||||
}
|
||||
channelUsesOldMetadata := false
|
||||
if len((*channels).Items) == 1 {
|
||||
channel := ((*channels).Items)[0]
|
||||
if channel.Name == s.LbryChannelName {
|
||||
channelUsesOldMetadata = channel.Value.GetThumbnail() == nil
|
||||
//TODO: eventually get rid of this when the whole db is filled
|
||||
if s.lbryChannelID == "" {
|
||||
err = s.Manager.apiConfig.SetChannelClaimID(s.YoutubeChannelID, channel.ClaimID)
|
||||
|
@ -256,7 +286,9 @@ func (s *Sync) ensureChannelOwnership() error {
|
|||
return errors.Err("the channel in the wallet is different than the channel in the database")
|
||||
}
|
||||
s.lbryChannelID = channel.ClaimID
|
||||
return err
|
||||
if !channelUsesOldMetadata {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
return errors.Err("this channel does not belong to this wallet! Expected: %s, found: %s", s.LbryChannelName, channel.Name)
|
||||
}
|
||||
|
@ -290,7 +322,7 @@ func (s *Sync) ensureChannelOwnership() error {
|
|||
return errors.Prefix("error creating YouTube service", err)
|
||||
}
|
||||
|
||||
response, err := service.Channels.List("snippet").Id(s.YoutubeChannelID).Do()
|
||||
response, err := service.Channels.List("snippet,brandingSettings").Id(s.YoutubeChannelID).Do()
|
||||
if err != nil {
|
||||
return errors.Prefix("error getting channel details", err)
|
||||
}
|
||||
|
@ -300,24 +332,23 @@ func (s *Sync) ensureChannelOwnership() error {
|
|||
}
|
||||
|
||||
channelInfo := response.Items[0].Snippet
|
||||
channelBranding := response.Items[0].BrandingSettings
|
||||
|
||||
thumbnail := channelInfo.Thumbnails.Default
|
||||
if channelInfo.Thumbnails.Maxres != nil {
|
||||
thumbnail = channelInfo.Thumbnails.Maxres
|
||||
} else if channelInfo.Thumbnails.High != nil {
|
||||
thumbnail = channelInfo.Thumbnails.High
|
||||
} else if channelInfo.Thumbnails.Medium != nil {
|
||||
thumbnail = channelInfo.Thumbnails.Medium
|
||||
} else if channelInfo.Thumbnails.Standard != nil {
|
||||
thumbnail = channelInfo.Thumbnails.Standard
|
||||
}
|
||||
thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail.Url, s.YoutubeChannelID, aws.Config{
|
||||
Credentials: credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, ""),
|
||||
Region: &s.AwsS3Region,
|
||||
})
|
||||
thumbnail := thumbs.GetBestThumbnail(channelInfo.Thumbnails)
|
||||
thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail.Url, s.YoutubeChannelID, s.Manager.GetS3AWSConfig())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var bannerURL *string
|
||||
if channelBranding.Image != nil && channelBranding.Image.BannerImageUrl != "" {
|
||||
bURL, err := thumbs.MirrorThumbnail(channelBranding.Image.BannerImageUrl, "banner-"+s.YoutubeChannelID, s.Manager.GetS3AWSConfig())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bannerURL = &bURL
|
||||
}
|
||||
|
||||
var languages []string = nil
|
||||
if channelInfo.DefaultLanguage != "" {
|
||||
languages = []string{channelInfo.DefaultLanguage}
|
||||
|
@ -326,16 +357,32 @@ func (s *Sync) ensureChannelOwnership() error {
|
|||
if channelInfo.Country != "" {
|
||||
locations = []jsonrpc.Location{{Country: util.PtrToString(channelInfo.Country)}}
|
||||
}
|
||||
c, err := s.daemon.ChannelCreate(s.LbryChannelName, channelBidAmount, jsonrpc.ChannelCreateOptions{
|
||||
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
|
||||
Title: channelInfo.Title,
|
||||
Description: channelInfo.Description,
|
||||
Tags: nil,
|
||||
Languages: languages,
|
||||
Locations: locations,
|
||||
ThumbnailURL: &thumbnailURL,
|
||||
},
|
||||
})
|
||||
var c *jsonrpc.TransactionSummary
|
||||
claimCreateOptions := jsonrpc.ClaimCreateOptions{
|
||||
Title: &channelInfo.Title,
|
||||
Description: &channelInfo.Description,
|
||||
Tags: tagsManager.GetTagsForChannel(s.YoutubeChannelID),
|
||||
Languages: languages,
|
||||
Locations: locations,
|
||||
ThumbnailURL: &thumbnailURL,
|
||||
}
|
||||
if channelUsesOldMetadata {
|
||||
c, err = s.daemon.ChannelUpdate(s.lbryChannelID, jsonrpc.ChannelUpdateOptions{
|
||||
ClearTags: util.PtrToBool(true),
|
||||
ClearLocations: util.PtrToBool(true),
|
||||
ClearLanguages: util.PtrToBool(true),
|
||||
ChannelCreateOptions: jsonrpc.ChannelCreateOptions{
|
||||
ClaimCreateOptions: claimCreateOptions,
|
||||
CoverURL: bannerURL,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
c, err = s.daemon.ChannelCreate(s.LbryChannelName, channelBidAmount, jsonrpc.ChannelCreateOptions{
|
||||
ClaimCreateOptions: claimCreateOptions,
|
||||
CoverURL: bannerURL,
|
||||
})
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1,11 +1,7 @@
|
|||
package manager
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/csv"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
|
@ -21,6 +17,7 @@ import (
|
|||
"github.com/lbryio/ytsync/namer"
|
||||
"github.com/lbryio/ytsync/sdk"
|
||||
"github.com/lbryio/ytsync/sources"
|
||||
"github.com/lbryio/ytsync/thumbs"
|
||||
|
||||
"github.com/lbryio/lbry.go/extras/errors"
|
||||
"github.com/lbryio/lbry.go/extras/jsonrpc"
|
||||
|
@ -40,9 +37,12 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
channelClaimAmount = 0.01
|
||||
publishAmount = 0.01
|
||||
maxReasonLength = 500
|
||||
channelClaimAmount = 0.01
|
||||
estimatedMaxTxFee = 0.1
|
||||
minimumAccountBalance = 4.0
|
||||
minimumRefillAmount = 1
|
||||
publishAmount = 0.01
|
||||
maxReasonLength = 500
|
||||
)
|
||||
|
||||
type video interface {
|
||||
|
@ -51,7 +51,7 @@ type video interface {
|
|||
IDAndNum() string
|
||||
PlaylistPosition() int
|
||||
PublishedAt() time.Time
|
||||
Sync(*jsonrpc.Client, string, float64, string, int, *namer.Namer, float64) (*sources.SyncSummary, error)
|
||||
Sync(*jsonrpc.Client, sources.SyncParams, *sdk.SyncedVideo, bool, *sync.RWMutex) (*sources.SyncSummary, error)
|
||||
}
|
||||
|
||||
// sorting videos
|
||||
|
@ -77,27 +77,30 @@ type Sync struct {
|
|||
AwsS3Secret string
|
||||
AwsS3Region string
|
||||
AwsS3Bucket string
|
||||
|
||||
daemon *jsonrpc.Client
|
||||
claimAddress string
|
||||
videoDirectory string
|
||||
syncedVideosMux *sync.RWMutex
|
||||
syncedVideos map[string]sdk.SyncedVideo
|
||||
grp *stop.Group
|
||||
lbryChannelID string
|
||||
namer *namer.Namer
|
||||
|
||||
walletMux *sync.Mutex
|
||||
queue chan video
|
||||
Fee *sdk.Fee
|
||||
daemon *jsonrpc.Client
|
||||
claimAddress string
|
||||
videoDirectory string
|
||||
syncedVideosMux *sync.RWMutex
|
||||
syncedVideos map[string]sdk.SyncedVideo
|
||||
grp *stop.Group
|
||||
lbryChannelID string
|
||||
namer *namer.Namer
|
||||
walletMux *sync.RWMutex
|
||||
queue chan video
|
||||
}
|
||||
|
||||
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
|
||||
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string, claimID string, metadataVersion int8, size int64) {
|
||||
s.syncedVideosMux.Lock()
|
||||
defer s.syncedVideosMux.Unlock()
|
||||
s.syncedVideos[videoID] = sdk.SyncedVideo{
|
||||
VideoID: videoID,
|
||||
Published: published,
|
||||
FailureReason: failureReason,
|
||||
VideoID: videoID,
|
||||
Published: published,
|
||||
FailureReason: failureReason,
|
||||
ClaimID: claimID,
|
||||
ClaimName: claimName,
|
||||
MetadataVersion: metadataVersion,
|
||||
Size: size,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -258,7 +261,7 @@ func (s *Sync) FullCycle() (e error) {
|
|||
s.setExceptions()
|
||||
|
||||
s.syncedVideosMux = &sync.RWMutex{}
|
||||
s.walletMux = &sync.Mutex{}
|
||||
s.walletMux = &sync.RWMutex{}
|
||||
s.grp = stop.New()
|
||||
s.queue = make(chan video)
|
||||
interruptChan := make(chan os.Signal, 1)
|
||||
|
@ -319,12 +322,14 @@ func (s *Sync) FullCycle() (e error) {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Sync) setChannelTerminationStatus(e *error) {
|
||||
if *e != nil {
|
||||
//conditions for which a channel shouldn't be marked as failed
|
||||
noFailConditions := []string{
|
||||
"this youtube channel is being managed by another server",
|
||||
"interrupted during daemon startup",
|
||||
"playlist items not found",
|
||||
}
|
||||
if util.SubstringInSlice((*e).Error(), noFailConditions) {
|
||||
return
|
||||
|
@ -390,11 +395,10 @@ func logShutdownError(shutdownErr error) {
|
|||
|
||||
var thumbnailHosts = []string{
|
||||
"berk.ninja/thumbnails/",
|
||||
"https://thumbnails.lbry.com/",
|
||||
thumbs.ThumbnailEndpoint,
|
||||
}
|
||||
|
||||
func isYtsyncClaim(c jsonrpc.Claim) bool {
|
||||
|
||||
if !util.InSlice(c.Type, []string{"claim", "update"}) || c.Value.GetStream() == nil {
|
||||
return false
|
||||
}
|
||||
|
@ -403,7 +407,12 @@ func isYtsyncClaim(c jsonrpc.Claim) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
return util.InSlice(c.Value.GetThumbnail().GetUrl(), thumbnailHosts)
|
||||
for _, th := range thumbnailHosts {
|
||||
if strings.Contains(c.Value.GetThumbnail().GetUrl(), th) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// fixDupes abandons duplicate claims
|
||||
|
@ -442,8 +451,10 @@ func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
|
|||
}
|
||||
|
||||
//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published
|
||||
func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err error) {
|
||||
//additionally it removes all entries in the database indicating that a video is published when it's actually not
|
||||
func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total, fixed, removed int, err error) {
|
||||
count := 0
|
||||
videoIDMap := make(map[string]string, len(claims))
|
||||
for _, c := range claims {
|
||||
if !isYtsyncClaim(c) {
|
||||
continue
|
||||
|
@ -452,18 +463,60 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err
|
|||
//check if claimID is in remote db
|
||||
tn := c.Value.GetThumbnail().GetUrl()
|
||||
videoID := tn[strings.LastIndex(tn, "/")+1:]
|
||||
pv, ok := s.syncedVideos[videoID]
|
||||
if !ok || pv.ClaimName != c.Name {
|
||||
fixed++
|
||||
log.Debugf("adding %s to the database", c.Name)
|
||||
videoIDMap[videoID] = c.ClaimID
|
||||
pv, claimInDatabase := s.syncedVideos[videoID]
|
||||
claimMetadataVersion := uint(1)
|
||||
if strings.Contains(tn, thumbs.ThumbnailEndpoint) {
|
||||
claimMetadataVersion = 2
|
||||
}
|
||||
|
||||
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
|
||||
metadataDiffers := claimInDatabase && pv.MetadataVersion != int8(claimMetadataVersion)
|
||||
claimIDDiffers := claimInDatabase && pv.ClaimID != c.ClaimID
|
||||
claimNameDiffers := claimInDatabase && pv.ClaimName != c.Name
|
||||
claimMarkedUnpublished := claimInDatabase && !pv.Published
|
||||
if metadataDiffers {
|
||||
log.Debugf("%s: Mismatch in database for metadata. DB: %d - Blockchain: %d", videoID, pv.MetadataVersion, claimMetadataVersion)
|
||||
}
|
||||
if claimIDDiffers {
|
||||
log.Debugf("%s: Mismatch in database for claimID. DB: %s - Blockchain: %s", videoID, pv.ClaimID, c.ClaimID)
|
||||
}
|
||||
if claimIDDiffers {
|
||||
log.Debugf("%s: Mismatch in database for claimName. DB: %s - Blockchain: %s", videoID, pv.ClaimName, c.Name)
|
||||
}
|
||||
if claimMarkedUnpublished {
|
||||
log.Debugf("%s: Mismatch in database: published but marked as unpublished", videoID)
|
||||
}
|
||||
if !claimInDatabase {
|
||||
log.Debugf("%s: Published but is not in database (%s - %s)", videoID, c.Name, c.ClaimID)
|
||||
}
|
||||
if !claimInDatabase || metadataDiffers || claimIDDiffers || claimNameDiffers || claimMarkedUnpublished {
|
||||
claimSize, err := c.GetStreamSizeByMagic()
|
||||
if err != nil {
|
||||
return count, fixed, err
|
||||
claimSize = 0
|
||||
}
|
||||
fixed++
|
||||
log.Debugf("updating %s in the database", videoID)
|
||||
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", util.PtrToInt64(int64(claimSize)), claimMetadataVersion)
|
||||
if err != nil {
|
||||
return count, fixed, 0, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return count, fixed, nil
|
||||
idsToRemove := make([]string, 0, len(videoIDMap))
|
||||
for vID, sv := range s.syncedVideos {
|
||||
_, ok := videoIDMap[vID]
|
||||
if !ok && sv.Published {
|
||||
log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified", vID)
|
||||
idsToRemove = append(idsToRemove, vID)
|
||||
}
|
||||
}
|
||||
if s.Manager.removeDBUnpublished && len(idsToRemove) > 0 {
|
||||
err := s.Manager.apiConfig.DeleteVideos(idsToRemove)
|
||||
if err != nil {
|
||||
return count, fixed, 0, err
|
||||
}
|
||||
}
|
||||
return count, fixed, len(idsToRemove), nil
|
||||
}
|
||||
|
||||
func (s *Sync) getClaims() ([]jsonrpc.Claim, error) {
|
||||
|
@ -481,7 +534,10 @@ func (s *Sync) getClaims() ([]jsonrpc.Claim, error) {
|
|||
}
|
||||
|
||||
func (s *Sync) doSync() error {
|
||||
var err error
|
||||
err := s.enableAddressReuse()
|
||||
if err != nil {
|
||||
return errors.Prefix("could not set address reuse policy", err)
|
||||
}
|
||||
err = s.walletSetup()
|
||||
if err != nil {
|
||||
return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err)
|
||||
|
@ -506,16 +562,21 @@ func (s *Sync) doSync() error {
|
|||
}
|
||||
}
|
||||
|
||||
pubsOnWallet, nFixed, err := s.updateRemoteDB(allClaims)
|
||||
pubsOnWallet, nFixed, nRemoved, err := s.updateRemoteDB(allClaims)
|
||||
if err != nil {
|
||||
return errors.Prefix("error counting claims", err)
|
||||
return errors.Prefix("error updating remote database", err)
|
||||
}
|
||||
if nFixed > 0 {
|
||||
if nFixed > 0 || nRemoved > 0 {
|
||||
err := s.setStatusSyncing()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed)
|
||||
if nFixed > 0 {
|
||||
SendInfoToSlack("%d claims had mismatched database info or were completely missing and were fixed", nFixed)
|
||||
}
|
||||
if nRemoved > 0 {
|
||||
SendInfoToSlack("%d were marked as published but weren't actually published and thus removed from the database", nRemoved)
|
||||
}
|
||||
}
|
||||
pubsOnDB := 0
|
||||
for _, sv := range s.syncedVideos {
|
||||
|
@ -545,7 +606,7 @@ func (s *Sync) doSync() error {
|
|||
}
|
||||
|
||||
if s.LbryChannelName == "@UCBerkeley" {
|
||||
err = s.enqueueUCBVideos()
|
||||
err = errors.Err("UCB is not supported in this version of YTSYNC")
|
||||
} else {
|
||||
err = s.enqueueYoutubeVideos()
|
||||
}
|
||||
|
@ -584,7 +645,7 @@ func (s *Sync) startWorker(workerNum int) {
|
|||
err := s.processVideo(v)
|
||||
|
||||
if err != nil {
|
||||
logMsg := "error processing video: " + err.Error()
|
||||
logMsg := fmt.Sprintf("error processing video %s: %s", v.ID(), err.Error())
|
||||
log.Errorln(logMsg)
|
||||
fatalErrors := []string{
|
||||
":5279: read: connection reset by peer",
|
||||
|
@ -613,38 +674,63 @@ func (s *Sync) startWorker(workerNum int) {
|
|||
"no compatible format available for this video",
|
||||
"Watch this video on YouTube.",
|
||||
"have blocked it on copyright grounds",
|
||||
"the video must be republished as we can't get the right size",
|
||||
}
|
||||
if util.SubstringInSlice(err.Error(), errorsNoRetry) {
|
||||
if strings.Contains(err.Error(), "txn-mempool-conflict") ||
|
||||
strings.Contains(err.Error(), "too-long-mempool-chain") {
|
||||
log.Println("waiting for a block before retrying")
|
||||
err := s.waitForNewBlock()
|
||||
if err != nil {
|
||||
s.grp.Stop()
|
||||
SendErrorToSlack("something went wrong while waiting for a block: %v", err)
|
||||
break
|
||||
}
|
||||
} else if util.SubstringInSlice(err.Error(), errorsNoRetry) {
|
||||
log.Println("This error should not be retried at all")
|
||||
} else if tryCount < s.MaxTries {
|
||||
if strings.Contains(err.Error(), "txn-mempool-conflict") ||
|
||||
strings.Contains(err.Error(), "too-long-mempool-chain") {
|
||||
log.Println("waiting for a block before retrying")
|
||||
err = s.waitForNewBlock()
|
||||
if err != nil {
|
||||
s.grp.Stop()
|
||||
SendErrorToSlack("something went wrong while waiting for a block: %v", err)
|
||||
break
|
||||
}
|
||||
} else if util.SubstringInSlice(err.Error(), []string{
|
||||
if util.SubstringInSlice(err.Error(), []string{
|
||||
"Not enough funds to cover this transaction",
|
||||
"failed: Not enough funds",
|
||||
"Error in daemon: Insufficient funds, please deposit additional LBC"}) {
|
||||
log.Println("refilling addresses before retrying")
|
||||
err = s.walletSetup()
|
||||
"Error in daemon: Insufficient funds, please deposit additional LBC",
|
||||
// "txn-mempool-conflict", //TODO: uncomment the two lines when the SDK will start spending confirmed UTXOs before failing
|
||||
//"too-long-mempool-chain",
|
||||
}) {
|
||||
log.Println("checking funds and UTXOs before retrying...")
|
||||
err := s.walletSetup()
|
||||
if err != nil {
|
||||
s.grp.Stop()
|
||||
SendErrorToSlack("failed to setup the wallet for a refill: %v", err)
|
||||
break
|
||||
}
|
||||
} else if strings.Contains(err.Error(), "Error in daemon: 'str' object has no attribute 'get'") {
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
log.Println("Retrying")
|
||||
continue
|
||||
}
|
||||
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
|
||||
}
|
||||
s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
|
||||
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
|
||||
existingClaim, ok := s.syncedVideos[v.ID()]
|
||||
existingClaimID := ""
|
||||
existingClaimName := ""
|
||||
existingClaimSize := int64(0)
|
||||
if v.Size() != nil {
|
||||
existingClaimSize = *v.Size()
|
||||
}
|
||||
if ok {
|
||||
existingClaimID = existingClaim.ClaimID
|
||||
existingClaimName = existingClaim.ClaimName
|
||||
if existingClaim.Size > 0 {
|
||||
existingClaimSize = existingClaim.Size
|
||||
}
|
||||
}
|
||||
videoStatus := VideoStatusFailed
|
||||
if strings.Contains(err.Error(), "upgrade failed") {
|
||||
videoStatus = VideoStatusUpgradeFailed
|
||||
} else {
|
||||
s.AppendSyncedVideo(v.ID(), false, err.Error(), existingClaimName, existingClaimID, 0, existingClaimSize)
|
||||
}
|
||||
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), videoStatus, existingClaimID, existingClaimName, err.Error(), &existingClaimSize, 0)
|
||||
if err != nil {
|
||||
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
||||
}
|
||||
|
@ -683,7 +769,7 @@ func (s *Sync) enqueueYoutubeVideos() error {
|
|||
}
|
||||
|
||||
var videos []video
|
||||
|
||||
playlistMap := make(map[string]*youtube.PlaylistItemSnippet, 50)
|
||||
nextPageToken := ""
|
||||
for {
|
||||
req := service.PlaylistItems.List("snippet").
|
||||
|
@ -705,7 +791,7 @@ func (s *Sync) enqueueYoutubeVideos() error {
|
|||
}
|
||||
return errors.Err("playlist items not found")
|
||||
}
|
||||
playlistMap := make(map[string]*youtube.PlaylistItemSnippet, 50)
|
||||
//playlistMap := make(map[string]*youtube.PlaylistItemSnippet, 50)
|
||||
videoIDs := make([]string, 50)
|
||||
for i, item := range playlistResponse.Items {
|
||||
// normally we'd send the video into the channel here, but youtube api doesn't have sorting
|
||||
|
@ -713,14 +799,14 @@ func (s *Sync) enqueueYoutubeVideos() error {
|
|||
playlistMap[item.Snippet.ResourceId.VideoId] = item.Snippet
|
||||
videoIDs[i] = item.Snippet.ResourceId.VideoId
|
||||
}
|
||||
req2 := service.Videos.List("snippet,contentDetails").Id(strings.Join(videoIDs[:], ","))
|
||||
req2 := service.Videos.List("snippet,contentDetails,recordingDetails").Id(strings.Join(videoIDs[:], ","))
|
||||
|
||||
videosListResponse, err := req2.Do()
|
||||
if err != nil {
|
||||
return errors.Prefix("error getting videos info", err)
|
||||
}
|
||||
for _, item := range videosListResponse.Items {
|
||||
videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position))
|
||||
videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position, s.Manager.GetS3AWSConfig()))
|
||||
}
|
||||
|
||||
log.Infof("Got info for %d videos from youtube API", len(videos))
|
||||
|
@ -730,7 +816,16 @@ func (s *Sync) enqueueYoutubeVideos() error {
|
|||
break
|
||||
}
|
||||
}
|
||||
for k, v := range s.syncedVideos {
|
||||
if !v.Published {
|
||||
continue
|
||||
}
|
||||
_, ok := playlistMap[k]
|
||||
if !ok {
|
||||
videos = append(videos, sources.NewMockedVideo(s.videoDirectory, k, s.YoutubeChannelID, s.Manager.GetS3AWSConfig()))
|
||||
}
|
||||
|
||||
}
|
||||
sort.Sort(byPublishedAt(videos))
|
||||
//or sort.Sort(sort.Reverse(byPlaylistPosition(videos)))
|
||||
|
||||
|
@ -752,55 +847,6 @@ Enqueue:
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Sync) enqueueUCBVideos() error {
|
||||
var videos []video
|
||||
|
||||
csvFile, err := os.Open("ucb.csv")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reader := csv.NewReader(bufio.NewReader(csvFile))
|
||||
for {
|
||||
line, err := reader.Read()
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
data := struct {
|
||||
PublishedAt string `json:"publishedAt"`
|
||||
}{}
|
||||
err = json.Unmarshal([]byte(line[4]), &data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
videos = append(videos, sources.NewUCBVideo(line[0], line[2], line[1], line[3], data.PublishedAt, s.videoDirectory))
|
||||
}
|
||||
|
||||
log.Printf("Publishing %d videos\n", len(videos))
|
||||
|
||||
sort.Sort(byPublishedAt(videos))
|
||||
|
||||
Enqueue:
|
||||
for _, v := range videos {
|
||||
select {
|
||||
case <-s.grp.Ch():
|
||||
break Enqueue
|
||||
default:
|
||||
}
|
||||
|
||||
select {
|
||||
case s.queue <- v:
|
||||
case <-s.grp.Ch():
|
||||
break Enqueue
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Sync) processVideo(v video) (err error) {
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
|
@ -822,7 +868,9 @@ func (s *Sync) processVideo(v video) (err error) {
|
|||
s.syncedVideosMux.RLock()
|
||||
sv, ok := s.syncedVideos[v.ID()]
|
||||
s.syncedVideosMux.RUnlock()
|
||||
newMetadataVersion := int8(2)
|
||||
alreadyPublished := ok && sv.Published
|
||||
videoRequiresUpgrade := ok && s.Manager.upgradeMetadata && sv.MetadataVersion < newMetadataVersion
|
||||
|
||||
neverRetryFailures := []string{
|
||||
"Error extracting sts from embedded url response",
|
||||
|
@ -838,12 +886,16 @@ func (s *Sync) processVideo(v video) (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
if alreadyPublished {
|
||||
if alreadyPublished && !videoRequiresUpgrade {
|
||||
log.Println(v.ID() + " already published")
|
||||
return nil
|
||||
}
|
||||
if ok && sv.MetadataVersion >= newMetadataVersion {
|
||||
log.Println(v.ID() + " upgraded to the new metadata")
|
||||
return nil
|
||||
}
|
||||
|
||||
if v.PlaylistPosition() > s.Manager.videosLimit {
|
||||
if !videoRequiresUpgrade && v.PlaylistPosition() > s.Manager.videosLimit {
|
||||
log.Println(v.ID() + " is old: skipping")
|
||||
return nil
|
||||
}
|
||||
|
@ -851,19 +903,27 @@ func (s *Sync) processVideo(v video) (err error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sp := sources.SyncParams{
|
||||
ClaimAddress: s.claimAddress,
|
||||
Amount: publishAmount,
|
||||
ChannelID: s.lbryChannelID,
|
||||
MaxVideoSize: s.Manager.maxVideoSize,
|
||||
Namer: s.namer,
|
||||
MaxVideoLength: s.Manager.maxVideoLength,
|
||||
Fee: s.Fee,
|
||||
}
|
||||
|
||||
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.maxVideoSize, s.namer, s.Manager.maxVideoLength)
|
||||
summary, err := v.Sync(s.daemon, sp, &sv, videoRequiresUpgrade, s.walletMux)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
|
||||
s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName, summary.ClaimID, newMetadataVersion, *v.Size())
|
||||
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size(), 2)
|
||||
if err != nil {
|
||||
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
|
||||
}
|
||||
|
||||
s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
72
sdk/api.go
72
sdk/api.go
|
@ -2,6 +2,7 @@ package sdk
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
@ -33,16 +34,17 @@ type SyncProperties struct {
|
|||
YoutubeChannelID string
|
||||
}
|
||||
|
||||
type Fee struct {
|
||||
Amount string `json:"amount"`
|
||||
Address string `json:"address"`
|
||||
Currency string `json:"currency"`
|
||||
}
|
||||
type YoutubeChannel struct {
|
||||
ChannelId string `json:"channel_id"`
|
||||
TotalVideos uint `json:"total_videos"`
|
||||
DesiredChannelName string `json:"desired_channel_name"`
|
||||
Fee *struct {
|
||||
Amount string `json:"amount"`
|
||||
Address string `json:"address"`
|
||||
Currency string `json:"currency"`
|
||||
} `json:"fee"`
|
||||
ChannelClaimID string `json:"channel_claim_id"`
|
||||
Fee *Fee `json:"fee"`
|
||||
ChannelClaimID string `json:"channel_claim_id"`
|
||||
}
|
||||
|
||||
func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeChannel, error) {
|
||||
|
@ -76,10 +78,13 @@ func (a *APIConfig) FetchChannels(status string, cp *SyncProperties) ([]YoutubeC
|
|||
}
|
||||
|
||||
type SyncedVideo struct {
|
||||
VideoID string `json:"video_id"`
|
||||
Published bool `json:"published"`
|
||||
FailureReason string `json:"failure_reason"`
|
||||
ClaimName string `json:"claim_name"`
|
||||
VideoID string `json:"video_id"`
|
||||
Published bool `json:"published"`
|
||||
FailureReason string `json:"failure_reason"`
|
||||
ClaimName string `json:"claim_name"`
|
||||
ClaimID string `json:"claim_id"`
|
||||
Size int64 `json:"size"`
|
||||
MetadataVersion int8 `json:"metadata_version"`
|
||||
}
|
||||
|
||||
func sanitizeFailureReason(s *string) {
|
||||
|
@ -121,7 +126,9 @@ func (a *APIConfig) SetChannelStatus(channelID string, status string, failureRea
|
|||
claimNames := make(map[string]bool)
|
||||
for _, v := range response.Data {
|
||||
svs[v.VideoID] = v
|
||||
claimNames[v.ClaimName] = v.Published
|
||||
if v.ClaimName != "" {
|
||||
claimNames[v.ClaimName] = v.Published
|
||||
}
|
||||
}
|
||||
return svs, claimNames, nil
|
||||
}
|
||||
|
@ -157,11 +164,41 @@ func (a *APIConfig) SetChannelClaimID(channelID string, channelClaimID string) e
|
|||
}
|
||||
|
||||
const (
|
||||
VideoStatusPublished = "published"
|
||||
VideoStatusFailed = "failed"
|
||||
VideoStatusPublished = "published"
|
||||
VideoStatusUpgradeFailed = "upgradefailed"
|
||||
VideoStatusFailed = "failed"
|
||||
)
|
||||
|
||||
func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
|
||||
func (a *APIConfig) DeleteVideos(videos []string) error {
|
||||
endpoint := a.ApiURL + "/yt/video_delete"
|
||||
videoIDs := strings.Join(videos, ",")
|
||||
vals := url.Values{
|
||||
"video_ids": {videoIDs},
|
||||
"auth_token": {a.ApiToken},
|
||||
}
|
||||
res, _ := http.PostForm(endpoint, vals)
|
||||
defer res.Body.Close()
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
var response struct {
|
||||
Success bool `json:"success"`
|
||||
Error null.String `json:"error"`
|
||||
Data null.String `json:"data"`
|
||||
}
|
||||
err := json.Unmarshal(body, &response)
|
||||
if err != nil {
|
||||
return errors.Err(err)
|
||||
}
|
||||
if !response.Error.IsNull() {
|
||||
return errors.Err(response.Error.String)
|
||||
}
|
||||
|
||||
if !response.Data.IsNull() && response.Data.String == "ok" {
|
||||
return nil
|
||||
}
|
||||
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
|
||||
}
|
||||
|
||||
func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64, metadataVersion uint) error {
|
||||
endpoint := a.ApiURL + "/yt/video_status"
|
||||
|
||||
sanitizeFailureReason(&failureReason)
|
||||
|
@ -171,13 +208,16 @@ func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status str
|
|||
"status": {status},
|
||||
"auth_token": {a.ApiToken},
|
||||
}
|
||||
if status == VideoStatusPublished {
|
||||
if status == VideoStatusPublished || status == VideoStatusUpgradeFailed {
|
||||
if claimID == "" || claimName == "" {
|
||||
return errors.Err("claimID or claimName missing")
|
||||
return errors.Err("claimID (%s) or claimName (%s) missing", claimID, claimName)
|
||||
}
|
||||
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
|
||||
vals.Add("claim_id", claimID)
|
||||
vals.Add("claim_name", claimName)
|
||||
if metadataVersion > 0 {
|
||||
vals.Add("metadata_version", fmt.Sprintf("%d", metadataVersion))
|
||||
}
|
||||
if size != nil {
|
||||
vals.Add("size", strconv.FormatInt(*size, 10))
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package sources
|
|||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/lbryio/lbry.go/extras/jsonrpc"
|
||||
"github.com/lbryio/ytsync/namer"
|
||||
|
@ -12,7 +13,9 @@ type SyncSummary struct {
|
|||
ClaimName string
|
||||
}
|
||||
|
||||
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.StreamCreateOptions, namer *namer.Namer) (*SyncSummary, error) {
|
||||
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.StreamCreateOptions, namer *namer.Namer, walletLock *sync.RWMutex) (*SyncSummary, error) {
|
||||
walletLock.RLock()
|
||||
defer walletLock.RUnlock()
|
||||
for {
|
||||
name := namer.GetNextName(title)
|
||||
response, err := daemon.StreamCreate(name, filename, amount, options)
|
||||
|
|
|
@ -1,219 +0,0 @@
|
|||
package sources
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
"github.com/lbryio/lbry.go/extras/errors"
|
||||
"github.com/lbryio/lbry.go/extras/jsonrpc"
|
||||
"github.com/lbryio/ytsync/namer"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type ucbVideo struct {
|
||||
id string
|
||||
title string
|
||||
channel string
|
||||
description string
|
||||
publishedAt time.Time
|
||||
dir string
|
||||
claimNames map[string]bool
|
||||
syncedVideosMux *sync.RWMutex
|
||||
}
|
||||
|
||||
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo {
|
||||
p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors
|
||||
return &ucbVideo{
|
||||
id: id,
|
||||
title: title,
|
||||
description: description,
|
||||
channel: channel,
|
||||
dir: dir,
|
||||
publishedAt: p,
|
||||
}
|
||||
}
|
||||
|
||||
func (v *ucbVideo) ID() string {
|
||||
return v.id
|
||||
}
|
||||
|
||||
func (v *ucbVideo) PlaylistPosition() int {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (v *ucbVideo) IDAndNum() string {
|
||||
return v.ID() + " (?)"
|
||||
}
|
||||
|
||||
func (v *ucbVideo) PublishedAt() time.Time {
|
||||
return v.publishedAt
|
||||
//r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`)
|
||||
//matches := r.FindStringSubmatch(v.title)
|
||||
//if len(matches) > 0 {
|
||||
// year, _ := strconv.Atoi(matches[1])
|
||||
// month, _ := strconv.Atoi(matches[2])
|
||||
// day, _ := strconv.Atoi(matches[3])
|
||||
// return time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC)
|
||||
//}
|
||||
//return time.Now()
|
||||
}
|
||||
|
||||
func (v *ucbVideo) getFilename() string {
|
||||
return v.dir + "/" + v.id + ".mp4"
|
||||
}
|
||||
|
||||
func (v *ucbVideo) getClaimName(attempt int) string {
|
||||
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
|
||||
suffix := ""
|
||||
if attempt > 1 {
|
||||
suffix = "-" + strconv.Itoa(attempt)
|
||||
}
|
||||
maxLen := 40 - len(suffix)
|
||||
|
||||
chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(v.title, "-"), "-")), "-")
|
||||
|
||||
name := chunks[0]
|
||||
if len(name) > maxLen {
|
||||
return name[:maxLen]
|
||||
}
|
||||
|
||||
for _, chunk := range chunks[1:] {
|
||||
tmpName := name + "-" + chunk
|
||||
if len(tmpName) > maxLen {
|
||||
if len(name) < 20 {
|
||||
name = tmpName[:maxLen]
|
||||
}
|
||||
break
|
||||
}
|
||||
name = tmpName
|
||||
}
|
||||
|
||||
return name + suffix
|
||||
}
|
||||
|
||||
func (v *ucbVideo) getAbbrevDescription() string {
|
||||
maxLines := 10
|
||||
description := strings.TrimSpace(v.description)
|
||||
if strings.Count(description, "\n") < maxLines {
|
||||
return description
|
||||
}
|
||||
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
|
||||
}
|
||||
|
||||
func (v *ucbVideo) download() error {
|
||||
videoPath := v.getFilename()
|
||||
|
||||
_, err := os.Stat(videoPath)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
} else if err == nil {
|
||||
log.Debugln(v.id + " already exists at " + videoPath)
|
||||
return nil
|
||||
}
|
||||
|
||||
creds := credentials.NewStaticCredentials("ID-GOES-HERE", "SECRET-GOES-HERE", "")
|
||||
s, err := session.NewSession(&aws.Config{Region: aws.String("us-east-2"), Credentials: creds})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
downloader := s3manager.NewDownloader(s)
|
||||
|
||||
out, err := os.Create(videoPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
log.Println("lbry-niko2/videos/" + v.channel + "/" + v.id)
|
||||
|
||||
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
|
||||
Bucket: aws.String("lbry-niko2"),
|
||||
Key: aws.String("/videos/" + v.channel + "/" + v.id + ".mp4"),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
} else if bytesWritten == 0 {
|
||||
return errors.Err("zero bytes written")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *ucbVideo) saveThumbnail() error {
|
||||
resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
creds := credentials.NewStaticCredentials("ID-GOES-HERE", "SECRET-GOES-HERE", "")
|
||||
s, err := session.NewSession(&aws.Config{Region: aws.String("us-east-2"), Credentials: creds})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
uploader := s3manager.NewUploader(s)
|
||||
|
||||
_, err = uploader.Upload(&s3manager.UploadInput{
|
||||
Bucket: aws.String("berk.ninja"),
|
||||
Key: aws.String("thumbnails/" + v.id),
|
||||
ContentType: aws.String("image/jpeg"),
|
||||
Body: resp.Body,
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) {
|
||||
options := jsonrpc.StreamCreateOptions{
|
||||
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
|
||||
Title: v.title,
|
||||
Description: v.getAbbrevDescription(),
|
||||
ClaimAddress: &claimAddress,
|
||||
Languages: []string{"en"},
|
||||
ThumbnailURL: strPtr("https://berk.ninja/thumbnails/" + v.id),
|
||||
Tags: []string{},
|
||||
},
|
||||
Author: strPtr("UC Berkeley"),
|
||||
License: strPtr("see description"),
|
||||
StreamType: &jsonrpc.StreamTypeVideo,
|
||||
ChannelID: &channelID,
|
||||
}
|
||||
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer)
|
||||
}
|
||||
|
||||
func (v *ucbVideo) Size() *int64 {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer, maxVideoLength float64) (*SyncSummary, error) {
|
||||
//download and thumbnail can be done in parallel
|
||||
err := v.download()
|
||||
if err != nil {
|
||||
return nil, errors.Prefix("download error", err)
|
||||
}
|
||||
log.Debugln("Downloaded " + v.id)
|
||||
|
||||
//err = v.SaveThumbnail()
|
||||
//if err != nil {
|
||||
// return errors.WrapPrefix(err, "thumbnail error", 0)
|
||||
//}
|
||||
//log.Debugln("Created thumbnail for " + v.id)
|
||||
|
||||
summary, err := v.publish(daemon, claimAddress, amount, channelID, namer)
|
||||
if err != nil {
|
||||
return nil, errors.Prefix("publish error", err)
|
||||
}
|
||||
|
||||
return summary, nil
|
||||
}
|
|
@ -1,31 +1,37 @@
|
|||
package sources
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/extras/errors"
|
||||
"github.com/lbryio/lbry.go/extras/jsonrpc"
|
||||
"github.com/lbryio/lbry.go/extras/util"
|
||||
|
||||
"github.com/lbryio/ytsync/namer"
|
||||
"github.com/lbryio/ytsync/sdk"
|
||||
"github.com/lbryio/ytsync/tagsManager"
|
||||
"github.com/lbryio/ytsync/thumbs"
|
||||
|
||||
"github.com/ChannelMeter/iso8601duration"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/nikooo777/ytdl"
|
||||
"github.com/shopspring/decimal"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/api/youtube/v3"
|
||||
)
|
||||
|
||||
type YoutubeVideo struct {
|
||||
id string
|
||||
channelTitle string
|
||||
title string
|
||||
description string
|
||||
playlistPosition int64
|
||||
|
@ -35,20 +41,75 @@ type YoutubeVideo struct {
|
|||
publishedAt time.Time
|
||||
dir string
|
||||
youtubeInfo *youtube.Video
|
||||
youtubeChannelID string
|
||||
tags []string
|
||||
awsConfig aws.Config
|
||||
thumbnailURL string
|
||||
lbryChannelID string
|
||||
mocked bool
|
||||
walletLock *sync.RWMutex
|
||||
}
|
||||
|
||||
func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64) *YoutubeVideo {
|
||||
const reflectorURL = "http://blobs.lbry.io/"
|
||||
|
||||
var youtubeCategories = map[string]string{
|
||||
"1": "film & animation",
|
||||
"2": "autos & vehicles",
|
||||
"10": "music",
|
||||
"15": "pets & animals",
|
||||
"17": "sports",
|
||||
"18": "short movies",
|
||||
"19": "travel & events",
|
||||
"20": "gaming",
|
||||
"21": "videoblogging",
|
||||
"22": "people & blogs",
|
||||
"23": "comedy",
|
||||
"24": "entertainment",
|
||||
"25": "news & politics",
|
||||
"26": "howto & style",
|
||||
"27": "education",
|
||||
"28": "science & technology",
|
||||
"29": "nonprofits & activism",
|
||||
"30": "movies",
|
||||
"31": "anime/animation",
|
||||
"32": "action/adventure",
|
||||
"33": "classics",
|
||||
"34": "comedy",
|
||||
"35": "documentary",
|
||||
"36": "drama",
|
||||
"37": "family",
|
||||
"38": "foreign",
|
||||
"39": "horror",
|
||||
"40": "sci-fi/fantasy",
|
||||
"41": "thriller",
|
||||
"42": "shorts",
|
||||
"43": "shows",
|
||||
"44": "trailers",
|
||||
}
|
||||
|
||||
func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64, awsConfig aws.Config) *YoutubeVideo {
|
||||
publishedAt, _ := time.Parse(time.RFC3339Nano, videoData.Snippet.PublishedAt) // ignore parse errors
|
||||
return &YoutubeVideo{
|
||||
id: videoData.Id,
|
||||
title: videoData.Snippet.Title,
|
||||
description: videoData.Snippet.Description,
|
||||
channelTitle: videoData.Snippet.ChannelTitle,
|
||||
playlistPosition: playlistPosition,
|
||||
publishedAt: publishedAt,
|
||||
dir: directory,
|
||||
youtubeInfo: videoData,
|
||||
awsConfig: awsConfig,
|
||||
mocked: false,
|
||||
youtubeChannelID: videoData.Snippet.ChannelId,
|
||||
}
|
||||
}
|
||||
func NewMockedVideo(directory string, videoID string, youtubeChannelID string, awsConfig aws.Config) *YoutubeVideo {
|
||||
return &YoutubeVideo{
|
||||
id: videoID,
|
||||
playlistPosition: 0,
|
||||
dir: directory,
|
||||
awsConfig: awsConfig,
|
||||
mocked: true,
|
||||
youtubeChannelID: youtubeChannelID,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -65,6 +126,9 @@ func (v *YoutubeVideo) IDAndNum() string {
|
|||
}
|
||||
|
||||
func (v *YoutubeVideo) PublishedAt() time.Time {
|
||||
if v.mocked {
|
||||
return time.Unix(0, 0)
|
||||
}
|
||||
return v.publishedAt
|
||||
}
|
||||
|
||||
|
@ -98,10 +162,40 @@ func (v *YoutubeVideo) getFullPath() string {
|
|||
func (v *YoutubeVideo) getAbbrevDescription() string {
|
||||
maxLines := 10
|
||||
description := strings.TrimSpace(v.description)
|
||||
if strings.Count(description, "\n") < maxLines {
|
||||
return description
|
||||
additionalDescription := "\nhttps://www.youtube.com/watch?v=" + v.id
|
||||
khanAcademyClaimID := "5fc52291980268b82413ca4c0ace1b8d749f3ffb"
|
||||
if v.lbryChannelID == khanAcademyClaimID {
|
||||
additionalDescription = additionalDescription + "\nNote: All Khan Academy content is available for free at (www.khanacademy.org)"
|
||||
}
|
||||
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
|
||||
if strings.Count(description, "\n") < maxLines {
|
||||
return description + "\n..." + additionalDescription
|
||||
}
|
||||
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." + additionalDescription
|
||||
}
|
||||
|
||||
func (v *YoutubeVideo) fallbackDownload() error {
|
||||
cmd := exec.Command("youtube-dl",
|
||||
"--no-progress",
|
||||
"-fbestvideo[ext=mp4,height<=1080,filesize<2000M]+best[ext=mp4,height<=1080,filesize<2000M]",
|
||||
"-o"+strings.TrimRight(v.getFullPath(), ".mp4"),
|
||||
"--merge-output-format",
|
||||
"mp4",
|
||||
"https://www.youtube.com/watch?v="+v.ID())
|
||||
|
||||
log.Printf("Running command and waiting for it to finish...")
|
||||
output, err := cmd.CombinedOutput()
|
||||
log.Debugln(string(output))
|
||||
if err != nil {
|
||||
log.Printf("Command finished with error: %v", errors.Err(string(output)))
|
||||
return errors.Err(err)
|
||||
}
|
||||
fi, err := os.Stat(v.getFullPath())
|
||||
if err != nil {
|
||||
return errors.Err(err)
|
||||
}
|
||||
videoSize := fi.Size()
|
||||
v.size = &videoSize
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *YoutubeVideo) download() error {
|
||||
|
@ -190,108 +284,123 @@ func (v *YoutubeVideo) download() error {
|
|||
func (v *YoutubeVideo) videoDir() string {
|
||||
return v.dir + "/" + v.id
|
||||
}
|
||||
func (v *YoutubeVideo) getDownloadedPath() (string, error) {
|
||||
files, err := ioutil.ReadDir(v.videoDir())
|
||||
log.Infoln(v.videoDir())
|
||||
if err != nil {
|
||||
err = errors.Prefix("list error", err)
|
||||
log.Errorln(err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
if f.IsDir() {
|
||||
continue
|
||||
}
|
||||
if strings.Contains(v.getFullPath(), strings.TrimSuffix(f.Name(), filepath.Ext(f.Name()))) {
|
||||
return v.videoDir() + "/" + f.Name(), nil
|
||||
}
|
||||
}
|
||||
return "", errors.Err("could not find any downloaded videos")
|
||||
|
||||
}
|
||||
func (v *YoutubeVideo) delete() error {
|
||||
videoPath := v.getFullPath()
|
||||
err := os.Remove(videoPath)
|
||||
videoPath, err := v.getDownloadedPath()
|
||||
if err != nil {
|
||||
log.Errorln(errors.Prefix("delete error", err))
|
||||
log.Errorln(err)
|
||||
return err
|
||||
}
|
||||
log.Debugln(v.id + " deleted from disk (" + videoPath + ")")
|
||||
return nil
|
||||
}
|
||||
err = os.Remove(videoPath)
|
||||
log.Debugf("%s deleted from disk (%s)", v.id, videoPath)
|
||||
|
||||
func (v *YoutubeVideo) triggerThumbnailSave() error {
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
|
||||
params, err := json.Marshal(map[string]string{"videoid": v.id})
|
||||
if err != nil {
|
||||
err = errors.Prefix("delete error", err)
|
||||
log.Errorln(err)
|
||||
return err
|
||||
}
|
||||
|
||||
request, err := http.NewRequest(http.MethodPut, "https://jgp4g1qoud.execute-api.us-east-1.amazonaws.com/prod/thumbnail", bytes.NewBuffer(params))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
response, err := client.Do(request)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
contents, err := ioutil.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var decoded struct {
|
||||
Error int `json:"error"`
|
||||
Url string `json:"url,omitempty"`
|
||||
Message string `json:"message,omitempty"`
|
||||
}
|
||||
err = json.Unmarshal(contents, &decoded)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if decoded.Error != 0 {
|
||||
return errors.Err("error creating thumbnail: " + decoded.Message)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func strPtr(s string) *string { return &s }
|
||||
func (v *YoutubeVideo) triggerThumbnailSave() (err error) {
|
||||
thumbnail := thumbs.GetBestThumbnail(v.youtubeInfo.Snippet.Thumbnails)
|
||||
v.thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.Url, v.ID(), v.awsConfig)
|
||||
return err
|
||||
}
|
||||
|
||||
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) {
|
||||
additionalDescription := "\nhttps://www.youtube.com/watch?v=" + v.id
|
||||
khanAcademyClaimID := "5fc52291980268b82413ca4c0ace1b8d749f3ffb"
|
||||
if channelID == khanAcademyClaimID {
|
||||
additionalDescription = additionalDescription + "\nNote: All Khan Academy content is available for free at (www.khanacademy.org)"
|
||||
}
|
||||
var languages []string = nil
|
||||
if v.youtubeInfo.Snippet.DefaultLanguage != "" {
|
||||
languages = []string{v.youtubeInfo.Snippet.DefaultLanguage}
|
||||
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, params SyncParams) (*SyncSummary, error) {
|
||||
languages, locations, tags := v.getMetadata()
|
||||
|
||||
var fee *jsonrpc.Fee
|
||||
if params.Fee != nil {
|
||||
feeAmount, err := decimal.NewFromString(params.Fee.Amount)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
fee = &jsonrpc.Fee{
|
||||
FeeAddress: ¶ms.Fee.Address,
|
||||
FeeAmount: feeAmount,
|
||||
FeeCurrency: jsonrpc.Currency(params.Fee.Currency),
|
||||
}
|
||||
}
|
||||
|
||||
videoDuration, err := duration.FromString(v.youtubeInfo.ContentDetails.Duration)
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
options := jsonrpc.StreamCreateOptions{
|
||||
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
|
||||
Title: v.title,
|
||||
Description: v.getAbbrevDescription() + additionalDescription,
|
||||
ClaimAddress: &claimAddress,
|
||||
Title: &v.title,
|
||||
Description: util.PtrToString(v.getAbbrevDescription()),
|
||||
ClaimAddress: ¶ms.ClaimAddress,
|
||||
Languages: languages,
|
||||
ThumbnailURL: strPtr("https://thumbnails.lbry.com/" + v.id),
|
||||
Tags: v.youtubeInfo.Snippet.Tags,
|
||||
ThumbnailURL: &v.thumbnailURL,
|
||||
Tags: tags,
|
||||
Locations: locations,
|
||||
},
|
||||
Author: strPtr(v.channelTitle),
|
||||
License: strPtr("Copyrighted (contact author)"),
|
||||
StreamType: &jsonrpc.StreamTypeVideo,
|
||||
ReleaseTime: util.PtrToInt64(v.publishedAt.Unix()),
|
||||
VideoDuration: util.PtrToUint64(uint64(math.Ceil(videoDuration.ToDuration().Seconds()))),
|
||||
ChannelID: &channelID,
|
||||
Fee: fee,
|
||||
License: util.PtrToString("Copyrighted (contact publisher)"),
|
||||
ReleaseTime: util.PtrToInt64(v.publishedAt.Unix()),
|
||||
ChannelID: &v.lbryChannelID,
|
||||
}
|
||||
|
||||
return publishAndRetryExistingNames(daemon, v.title, v.getFullPath(), amount, options, namer)
|
||||
downloadPath, err := v.getDownloadedPath()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return publishAndRetryExistingNames(daemon, v.title, downloadPath, params.Amount, options, params.Namer, v.walletLock)
|
||||
}
|
||||
|
||||
func (v *YoutubeVideo) Size() *int64 {
|
||||
return v.size
|
||||
}
|
||||
|
||||
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer, maxVideoLength float64) (*SyncSummary, error) {
|
||||
v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024
|
||||
v.maxVideoLength = maxVideoLength
|
||||
//download and thumbnail can be done in parallel
|
||||
type SyncParams struct {
|
||||
ClaimAddress string
|
||||
Amount float64
|
||||
ChannelID string
|
||||
MaxVideoSize int
|
||||
Namer *namer.Namer
|
||||
MaxVideoLength float64
|
||||
Fee *sdk.Fee
|
||||
}
|
||||
|
||||
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, params SyncParams, existingVideoData *sdk.SyncedVideo, reprocess bool, walletLock *sync.RWMutex) (*SyncSummary, error) {
|
||||
v.maxVideoSize = int64(params.MaxVideoSize) * 1024 * 1024
|
||||
v.maxVideoLength = params.MaxVideoLength
|
||||
v.lbryChannelID = params.ChannelID
|
||||
v.walletLock = walletLock
|
||||
if reprocess && existingVideoData != nil && existingVideoData.Published {
|
||||
summary, err := v.reprocess(daemon, params, existingVideoData)
|
||||
return summary, errors.Prefix("upgrade failed", err)
|
||||
}
|
||||
return v.downloadAndPublish(daemon, params)
|
||||
}
|
||||
|
||||
func (v *YoutubeVideo) downloadAndPublish(daemon *jsonrpc.Client, params SyncParams) (*SyncSummary, error) {
|
||||
err := v.download()
|
||||
if err != nil {
|
||||
return nil, errors.Prefix("download error", err)
|
||||
log.Errorf("standard downloader failed: %s. Trying fallback downloader\n", err.Error())
|
||||
fallBackErr := v.fallbackDownload()
|
||||
if fallBackErr != nil {
|
||||
log.Errorf("fallback downloader failed: %s\n", fallBackErr.Error())
|
||||
return nil, errors.Prefix("download error", err) //return original error
|
||||
}
|
||||
}
|
||||
log.Debugln("Downloaded " + v.id)
|
||||
|
||||
|
@ -301,9 +410,143 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount
|
|||
}
|
||||
log.Debugln("Created thumbnail for " + v.id)
|
||||
|
||||
summary, err := v.publish(daemon, claimAddress, amount, channelID, namer)
|
||||
summary, err := v.publish(daemon, params)
|
||||
//delete the video in all cases (and ignore the error)
|
||||
_ = v.delete()
|
||||
|
||||
return summary, errors.Prefix("publish error", err)
|
||||
}
|
||||
|
||||
func (v *YoutubeVideo) getMetadata() (languages []string, locations []jsonrpc.Location, tags []string) {
|
||||
languages = nil
|
||||
locations = nil
|
||||
tags = nil
|
||||
if !v.mocked {
|
||||
if v.youtubeInfo.Snippet.DefaultLanguage != "" {
|
||||
languages = []string{v.youtubeInfo.Snippet.DefaultLanguage}
|
||||
}
|
||||
|
||||
if v.youtubeInfo.RecordingDetails != nil && v.youtubeInfo.RecordingDetails.Location != nil {
|
||||
locations = []jsonrpc.Location{{
|
||||
Latitude: util.PtrToString(fmt.Sprintf("%.7f", v.youtubeInfo.RecordingDetails.Location.Latitude)),
|
||||
Longitude: util.PtrToString(fmt.Sprintf("%.7f", v.youtubeInfo.RecordingDetails.Location.Longitude)),
|
||||
}}
|
||||
}
|
||||
tags = v.youtubeInfo.Snippet.Tags
|
||||
}
|
||||
tags, err := tagsManager.SanitizeTags(tags, v.youtubeChannelID)
|
||||
if err != nil {
|
||||
log.Errorln(err.Error())
|
||||
}
|
||||
if !v.mocked {
|
||||
tags = append(tags, youtubeCategories[v.youtubeInfo.Snippet.CategoryId])
|
||||
}
|
||||
|
||||
return languages, locations, tags
|
||||
}
|
||||
|
||||
func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, existingVideoData *sdk.SyncedVideo) (*SyncSummary, error) {
|
||||
c, err := daemon.ClaimSearch(nil, &existingVideoData.ClaimID, nil, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
if len(c.Claims) == 0 {
|
||||
return nil, errors.Err("cannot reprocess: no claim found for this video")
|
||||
} else if len(c.Claims) > 1 {
|
||||
return nil, errors.Err("cannot reprocess: too many claims. claimID: %s", existingVideoData.ClaimID)
|
||||
}
|
||||
|
||||
currentClaim := c.Claims[0]
|
||||
languages, locations, tags := v.getMetadata()
|
||||
|
||||
thumbnailURL := ""
|
||||
if currentClaim.Value.GetThumbnail() == nil {
|
||||
if v.mocked {
|
||||
return nil, errors.Err("could not find thumbnail for mocked video")
|
||||
}
|
||||
thumbnail := thumbs.GetBestThumbnail(v.youtubeInfo.Snippet.Thumbnails)
|
||||
thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.Url, v.ID(), v.awsConfig)
|
||||
} else {
|
||||
thumbnailURL = thumbs.ThumbnailEndpoint + v.ID()
|
||||
}
|
||||
|
||||
videoSize, err := currentClaim.GetStreamSizeByMagic()
|
||||
if err != nil {
|
||||
if existingVideoData.Size > 0 {
|
||||
videoSize = uint64(existingVideoData.Size)
|
||||
} else {
|
||||
log.Infof("%s: the video must be republished as we can't get the right size", v.ID())
|
||||
//return v.downloadAndPublish(daemon, params) //TODO: actually republish the video. NB: the current claim should be abandoned first
|
||||
return nil, errors.Err("the video must be republished as we can't get the right size")
|
||||
}
|
||||
}
|
||||
v.size = util.PtrToInt64(int64(videoSize))
|
||||
var fee *jsonrpc.Fee
|
||||
if params.Fee != nil {
|
||||
feeAmount, err := decimal.NewFromString(params.Fee.Amount)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
fee = &jsonrpc.Fee{
|
||||
FeeAddress: ¶ms.Fee.Address,
|
||||
FeeAmount: feeAmount,
|
||||
FeeCurrency: jsonrpc.Currency(params.Fee.Currency),
|
||||
}
|
||||
}
|
||||
streamCreateOptions := &jsonrpc.StreamCreateOptions{
|
||||
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
|
||||
Tags: tags,
|
||||
ThumbnailURL: &thumbnailURL,
|
||||
Languages: languages,
|
||||
Locations: locations,
|
||||
},
|
||||
Author: util.PtrToString(""),
|
||||
License: util.PtrToString("Copyrighted (contact publisher)"),
|
||||
ChannelID: &v.lbryChannelID,
|
||||
Height: util.PtrToUint(720),
|
||||
Width: util.PtrToUint(1280),
|
||||
Fee: fee,
|
||||
}
|
||||
|
||||
v.walletLock.RLock()
|
||||
defer v.walletLock.RUnlock()
|
||||
if v.mocked {
|
||||
pr, err := daemon.StreamUpdate(existingVideoData.ClaimID, jsonrpc.StreamUpdateOptions{
|
||||
StreamCreateOptions: streamCreateOptions,
|
||||
FileSize: &videoSize,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &SyncSummary{
|
||||
ClaimID: pr.Outputs[0].ClaimID,
|
||||
ClaimName: pr.Outputs[0].Name,
|
||||
}, nil
|
||||
}
|
||||
|
||||
videoDuration, err := duration.FromString(v.youtubeInfo.ContentDetails.Duration)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
|
||||
streamCreateOptions.ClaimCreateOptions.Title = &v.title
|
||||
streamCreateOptions.ClaimCreateOptions.Description = util.PtrToString(v.getAbbrevDescription())
|
||||
streamCreateOptions.Duration = util.PtrToUint64(uint64(math.Ceil(videoDuration.ToDuration().Seconds())))
|
||||
streamCreateOptions.ReleaseTime = util.PtrToInt64(v.publishedAt.Unix())
|
||||
pr, err := daemon.StreamUpdate(existingVideoData.ClaimID, jsonrpc.StreamUpdateOptions{
|
||||
ClearLanguages: util.PtrToBool(true),
|
||||
ClearLocations: util.PtrToBool(true),
|
||||
ClearTags: util.PtrToBool(true),
|
||||
StreamCreateOptions: streamCreateOptions,
|
||||
FileSize: &videoSize,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &SyncSummary{
|
||||
ClaimID: pr.Outputs[0].ClaimID,
|
||||
ClaimName: pr.Outputs[0].Name,
|
||||
}, nil
|
||||
}
|
||||
|
|
152
splitter.py
152
splitter.py
|
@ -1,152 +0,0 @@
|
|||
import os
|
||||
import sys
|
||||
from decimal import Decimal
|
||||
from bitcoinrpc.authproxy import AuthServiceProxy
|
||||
|
||||
from lbryum.wallet import Wallet, WalletStorage
|
||||
from lbryum.commands import known_commands, Commands
|
||||
from lbryum.simple_config import SimpleConfig
|
||||
from lbryum.blockchain import get_blockchain
|
||||
from lbryum.network import Network
|
||||
|
||||
|
||||
def get_lbrycrdd_connection_string(wallet_conf):
|
||||
settings = {"username": "rpcuser",
|
||||
"password": "rpcpassword",
|
||||
"rpc_port": 9245}
|
||||
if wallet_conf and os.path.exists(wallet_conf):
|
||||
with open(wallet_conf, "r") as conf:
|
||||
conf_lines = conf.readlines()
|
||||
for l in conf_lines:
|
||||
if l.startswith("rpcuser="):
|
||||
settings["username"] = l[8:].rstrip('\n')
|
||||
if l.startswith("rpcpassword="):
|
||||
settings["password"] = l[12:].rstrip('\n')
|
||||
if l.startswith("rpcport="):
|
||||
settings["rpc_port"] = int(l[8:].rstrip('\n'))
|
||||
|
||||
rpc_user = settings["username"]
|
||||
rpc_pass = settings["password"]
|
||||
rpc_port = settings["rpc_port"]
|
||||
rpc_url = "127.0.0.1"
|
||||
return "http://%s:%s@%s:%i" % (rpc_user, rpc_pass, rpc_url, rpc_port)
|
||||
|
||||
|
||||
class LBRYumWallet(object):
|
||||
def __init__(self, lbryum_path):
|
||||
self.config = SimpleConfig()
|
||||
self.config.set_key('chain', 'lbrycrd_main')
|
||||
self.storage = WalletStorage(lbryum_path)
|
||||
self.wallet = Wallet(self.storage)
|
||||
self.cmd_runner = Commands(self.config, self.wallet, None)
|
||||
if not self.wallet.has_seed():
|
||||
seed = self.wallet.make_seed()
|
||||
self.wallet.add_seed(seed, "derp")
|
||||
self.wallet.create_master_keys("derp")
|
||||
self.wallet.create_main_account()
|
||||
self.wallet.update_password("derp", "")
|
||||
self.network = Network(self.config)
|
||||
self.blockchain = get_blockchain(self.config, self.network)
|
||||
print self.config.get('chain'), self.blockchain
|
||||
self.wallet.storage.write()
|
||||
|
||||
def command(self, command_name, *args, **kwargs):
|
||||
cmd_runner = Commands(self.config, self.wallet, None)
|
||||
cmd = known_commands[command_name]
|
||||
func = getattr(cmd_runner, cmd.name)
|
||||
return func(*args, **kwargs)
|
||||
|
||||
def generate_address(self):
|
||||
address = self.wallet.create_new_address()
|
||||
self.wallet.storage.write()
|
||||
return address
|
||||
|
||||
|
||||
class LBRYcrd(object):
|
||||
def __init__(self, lbrycrdd_path):
|
||||
self.lbrycrdd_conn_str = get_lbrycrdd_connection_string(lbrycrdd_path)
|
||||
|
||||
def __call__(self, method, *args, **kwargs):
|
||||
return self.rpc(method)(*args, **kwargs)
|
||||
|
||||
def rpc(self, method):
|
||||
return AuthServiceProxy(self.lbrycrdd_conn_str, service_name=method)
|
||||
|
||||
|
||||
def get_wallet_path():
|
||||
cwd = os.getcwd()
|
||||
wallet_path = os.path.join(cwd, "wallet.json")
|
||||
if not os.path.exists(wallet_path):
|
||||
return wallet_path
|
||||
i = 1
|
||||
while True:
|
||||
wallet_path = os.path.join(cwd, "wallet_%i.json" % i)
|
||||
if not os.path.exists(wallet_path):
|
||||
return wallet_path
|
||||
i += 1
|
||||
|
||||
|
||||
def coin_chooser(lbrycrdd, amount, fee=0.001):
|
||||
def iter_txis():
|
||||
unspent = lbrycrdd("listunspent")
|
||||
unspent = sorted(unspent, key=lambda x: x['amount'], reverse=True)
|
||||
spendable = Decimal(0.0)
|
||||
for txi in unspent:
|
||||
if spendable >= amount:
|
||||
break
|
||||
else:
|
||||
spendable += txi['amount']
|
||||
yield txi
|
||||
if spendable < amount:
|
||||
print spendable, amount
|
||||
raise Exception("Not enough funds")
|
||||
|
||||
coins = list(iter(iter_txis()))
|
||||
total = sum(c['amount'] for c in coins)
|
||||
change = Decimal(total) - Decimal(amount) - Decimal(fee)
|
||||
|
||||
if change < 0:
|
||||
raise Exception("Not enough funds")
|
||||
if change:
|
||||
change_address = lbrycrdd("getnewaddress")
|
||||
else:
|
||||
change_address = None
|
||||
|
||||
print "Total: %f, amount: %f, change: %f" % (total, amount, change)
|
||||
|
||||
return coins, change, change_address
|
||||
|
||||
|
||||
def get_raw_tx(lbrycrdd, addresses, coins, amount, change, change_address):
|
||||
txi = [{'txid': c['txid'], 'vout': c['vout']} for c in coins]
|
||||
txo = {address: float(amount) for address in addresses}
|
||||
if change_address:
|
||||
txo[change_address] = float(change)
|
||||
return lbrycrdd("createrawtransaction", txi, txo)
|
||||
|
||||
|
||||
def main(count, value=None, lbryum_path=None, lbrycrdd_path=None):
|
||||
count = int(count)
|
||||
lbryum_path = lbryum_path or get_wallet_path()
|
||||
if sys.platform == "darwin":
|
||||
default_lbrycrdd = os.path.join(os.path.expanduser("~"),
|
||||
"Library/Application Support/lbrycrd/lbrycrd.conf")
|
||||
else:
|
||||
default_lbrycrdd = os.path.join(os.path.expanduser("~"), ".lbrycrd/lbrycrd.conf")
|
||||
lbrycrdd_path = lbrycrdd_path or default_lbrycrdd
|
||||
l = LBRYcrd(lbrycrdd_path=lbrycrdd_path)
|
||||
s = LBRYumWallet(lbryum_path)
|
||||
value = value or 1.0
|
||||
value = Decimal(value)
|
||||
|
||||
coins, change, change_address = coin_chooser(l, count * value)
|
||||
addresses = [s.generate_address() for i in range(count)]
|
||||
raw_tx = get_raw_tx(l, addresses, coins, value, change, change_address)
|
||||
signed = l("signrawtransaction", raw_tx)['hex']
|
||||
txid = l("sendrawtransaction", signed)
|
||||
print txid
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
args = sys.argv[1:]
|
||||
main(*args)
|
1486
tagsManager/tags_mapping.go
Normal file
1486
tagsManager/tags_mapping.go
Normal file
File diff suppressed because it is too large
Load diff
144
tagsManager/tags_mapping_test.go
Normal file
144
tagsManager/tags_mapping_test.go
Normal file
|
@ -0,0 +1,144 @@
|
|||
package tagsManager
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSanitizeTags(t *testing.T) {
|
||||
got, err := SanitizeTags([]string{"this", "super", "expensive", "test", "has", "a lot of", "crypto", "currency", "in it", "trump", "will build the", "wall"}, "UCNQfQvFMPnInwsU_iGYArJQ")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
expectedTags := []string{
|
||||
"blockchain",
|
||||
"switzerland",
|
||||
"news",
|
||||
"science & technology",
|
||||
"economics",
|
||||
"experiments",
|
||||
"this",
|
||||
"in it",
|
||||
"will build the",
|
||||
"has",
|
||||
"crypto",
|
||||
"trump",
|
||||
"wall",
|
||||
"expensive",
|
||||
"currency",
|
||||
"a lot of",
|
||||
}
|
||||
if len(expectedTags) != len(got) {
|
||||
t.Error("number of tags differ")
|
||||
return
|
||||
}
|
||||
outer:
|
||||
for _, et := range expectedTags {
|
||||
for _, t := range got {
|
||||
if et == t {
|
||||
continue outer
|
||||
}
|
||||
}
|
||||
t.Error("tag not found")
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
func TestNormalizeTag(t *testing.T) {
|
||||
tags := []string{
|
||||
"blockchain",
|
||||
"Switzerland",
|
||||
"news ",
|
||||
" science & Technology ",
|
||||
"economics",
|
||||
"experiments",
|
||||
"this",
|
||||
"in it",
|
||||
"will build the (WOOPS)",
|
||||
"~has",
|
||||
"crypto",
|
||||
"trump",
|
||||
"wall",
|
||||
"expensive",
|
||||
"!currency",
|
||||
" a lot of ",
|
||||
"#",
|
||||
"#whatever",
|
||||
"#123",
|
||||
"#123 Something else",
|
||||
"#123aaa",
|
||||
"!asdasd",
|
||||
"CASA BLANCA",
|
||||
"wwe 2k18 Elimination chamber!",
|
||||
"pero'",
|
||||
"però",
|
||||
"è proprio",
|
||||
"Ep 29",
|
||||
"sctest29 Keddr",
|
||||
"mortal kombat 11 shang tsung",
|
||||
"!asdasd!",
|
||||
}
|
||||
normalizedTags := make([]string, 0, len(tags))
|
||||
for _, tag := range tags {
|
||||
got, err := normalizeTag(tag)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if got != "" {
|
||||
normalizedTags = append(normalizedTags, got)
|
||||
}
|
||||
fmt.Printf("Got tag: '%s'\n", got)
|
||||
}
|
||||
expected := []string{
|
||||
"blockchain",
|
||||
"switzerland",
|
||||
"news",
|
||||
"science & technology",
|
||||
"economics",
|
||||
"experiments",
|
||||
"this",
|
||||
"in it",
|
||||
"will build the",
|
||||
"has",
|
||||
"crypto",
|
||||
"trump",
|
||||
"wall",
|
||||
"expensive",
|
||||
"currency",
|
||||
"a lot of",
|
||||
"whatever",
|
||||
"123",
|
||||
"something else",
|
||||
"123aaa",
|
||||
"asdasd",
|
||||
"casa blanca",
|
||||
"wwe 2k18 elimination chamber",
|
||||
"pero",
|
||||
"però",
|
||||
"è proprio",
|
||||
"ep 29",
|
||||
"sctest29 keddr",
|
||||
"mortal kombat 11 shang tsung",
|
||||
"asdasd",
|
||||
}
|
||||
if !Equal(normalizedTags, expected) {
|
||||
t.Error("result not as expected")
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
func Equal(a, b []string) bool {
|
||||
if len(a) != len(b) {
|
||||
fmt.Printf("expected length %d but got %d", len(b), len(a))
|
||||
return false
|
||||
}
|
||||
for i, v := range a {
|
||||
if v != b[i] {
|
||||
fmt.Printf("expected %s but bot %s\n", b[i], v)
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
package thumbs
|
||||
|
||||
import (
|
||||
"google.golang.org/api/youtube/v3"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
|
@ -21,6 +22,7 @@ type thumbnailUploader struct {
|
|||
}
|
||||
|
||||
const thumbnailPath = "/tmp/ytsync_thumbnails/"
|
||||
const ThumbnailEndpoint = "https://thumbnails.lbry.com/"
|
||||
|
||||
func (u *thumbnailUploader) downloadThumbnail() error {
|
||||
_ = os.Mkdir(thumbnailPath, 0750)
|
||||
|
@ -44,7 +46,7 @@ func (u *thumbnailUploader) downloadThumbnail() error {
|
|||
}
|
||||
|
||||
func (u *thumbnailUploader) uploadThumbnail() error {
|
||||
key := aws.String("/thumbnails/" + u.name)
|
||||
key := &u.name
|
||||
thumb, err := os.Open("/tmp/ytsync_thumbnails/" + u.name)
|
||||
if err != nil {
|
||||
return errors.Err(err)
|
||||
|
@ -62,8 +64,9 @@ func (u *thumbnailUploader) uploadThumbnail() error {
|
|||
Bucket: aws.String("thumbnails.lbry.com"),
|
||||
Key: key,
|
||||
Body: thumb,
|
||||
ACL: aws.String("public-read"),
|
||||
})
|
||||
u.mirroredUrl = "https://thumbnails.lbry.com/" + u.name
|
||||
u.mirroredUrl = ThumbnailEndpoint + u.name
|
||||
return errors.Err(err)
|
||||
}
|
||||
|
||||
|
@ -92,3 +95,16 @@ func MirrorThumbnail(url string, name string, s3Config aws.Config) (string, erro
|
|||
|
||||
return tu.mirroredUrl, nil
|
||||
}
|
||||
|
||||
func GetBestThumbnail(thumbnails *youtube.ThumbnailDetails) *youtube.Thumbnail {
|
||||
if thumbnails.Maxres != nil {
|
||||
return thumbnails.Maxres
|
||||
} else if thumbnails.High != nil {
|
||||
return thumbnails.High
|
||||
} else if thumbnails.Medium != nil {
|
||||
return thumbnails.Medium
|
||||
} else if thumbnails.Standard != nil {
|
||||
return thumbnails.Standard
|
||||
}
|
||||
return thumbnails.Default
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue