Add support for new SDK (0.37.*) and support for upgrading channels and claims to new metadata #28

Merged
nikooo777 merged 37 commits from metadata_upgrade into master 2019-06-13 20:14:14 +02:00
16 changed files with 2395 additions and 704 deletions

View file

@ -9,7 +9,7 @@ env:
global:
- GO111MODULE=on
#GITHUB_TOKEN
- secure: "+G0TxyGyRYJb0L0qCCn6NPregigwhalkS9+/3mkV41xoAwUlEU9aclSJQVKKqeMrV0PkmJ6KrzYD78tnRj8Q5ZDe93LBDbc/WhXZ91JbRmIXIsVktz/78R2a2Q0DBrXMlyWXS84QouRfI3eyxf2BusERSYdr3H6BL4r15PIHJEmLGOchHsHwSI46PRGag+IFnbUb+fyUSs7er6gKeSp4+ZK1bSUgxiMUQrZfImYI9LS+cgqpDh67pX5jRbDNBihZZUwfreToYw3RbIAom4i2oix1rBBD/jpcTwO+WdyXFtur8RYGnsknePU88KiAB9bcXU4T2xVxR9wusK6jX5G2BChEOG9PjuVfeDqXBnskOEtAevwAKJRYW6ATmXqQZ+TAy1DFxsUWPVS7ykIA+69n7HzE7xu50AB1U2n6myT977i4jhBzw3VrB9NQz//4b2j6MCG1PhIgB38kffQWVsa/CF4RiiWi6a7ePRsPP0HdRUbXFQJWPevKgVJ9ve/ArQKiea1rP9zy9KdFAP30vESBkn4WMOCjfg7F6JbfV3e1ywBq9XMznj14iAZHwXR1tpPp0GekuMlVJHJWa0Ea/dfDGWshnr1Be71vc9T118w7sd0v1Dp4EQCFa4B0Hx7Ual7uWgeuweL/JLr1Am3BgWVcXOWrhzMT0b1tGFa9ZNz2Wwk="
- secure: "Ps3KocRP5xnM3/uA99CeYhDTVxRIuW7fGyrtqBeRWZW0cXzeA4XCTKxqcFbrPUPw67XkrBVgE58JDdWoQEJ7tm67PjMm/ltp5Evhx/QAJDh+YSofXyGDVpG1mrTZFI66R3NVVJLkSGALMkuWWXvfYZeU//AworJbyRoaIK/CVt5OP23i5N4tdd5UXc5dfLuYqnKRynyMmCkz9c3yEIQMXoPhG2hx7l7L2BeMJvcKmVhkSN7nQayjnrbUXGm/IRqrb88lvkyBevN5E3IB2V5IKEieIPZjbD/N0IfcnAt89Z96tgDhtIbx3ZvXm92lsvHA8buqQpG9d2AmSi6GKs64lQcnGeM5o0wER2JHWl1OSa1Nr/UAo5Xb/PM65Yt3yZE8AuMKHBmbfDSBzdkTXx58AeDzFUd3kMXD/fFjeQQWyXFlOss3ygH9SObl827Txmz9OJqZaxabs5Q3AP6m3EjKjz7zfLfrgpcxJM2WBiU1bN0ZxUgZkImy/CHk5gCZ7vhcnaLiDO4HZnzY/aRJwKYQPE5i0O2nHpIfovqkc0DFBA7U/7Cjin7e1E0UZvF3meLOxMqkfc6X7QTxqQpt2Tej6jlpdxw4CTLwGUhGkAw9IAPkUB3L0EbZ1/ksGhNvGDvUeSTq8hYdMAPmA+k9jS6653V4SQ+qBMy5++tbr5AeZQI="
deploy:
provider: script
skip_cleanup: true

View file

@ -55,7 +55,7 @@ Usage:
Flags:
--after int Specify from when to pull jobs [Unix time](Default: 0)
--before int Specify until when to pull jobs [Unix time](Default: current Unix time) (default 1582834707)
--before int Specify until when to pull jobs [Unix time](Default: current Unix time) (default current timestamp)
--channelID string If specified, only this channel will be synced.
--concurrent-jobs int how many jobs to process concurrently (default 1)
-h, --help help for ytsync
@ -63,12 +63,14 @@ Flags:
--max-length float Maximum video length to process (in hours) (default 2)
--max-size int Maximum video size to process (in MB) (default 2048)
--max-tries int Number of times to try a publish that fails (default 3)
--remove-db-unpublished Remove videos from the database that are marked as published but aren't really published
--run-once Whether the process should be stopped after one cycle or not
--skip-space-check Do not perform free space check on startup
--status string Specify which queue to pull from. Overrides --update
--stop-on-error If a publish fails, stop all publishing and exit
--takeover-existing-channel If channel exists and we don't own it, take over the channel
--update Update previously synced channels instead of syncing new ones
--upgrade-metadata Upgrade videos if they're on the old metadata version
--videos-limit int how many videos to process per channel (default 1000)
```

5
go.mod
View file

@ -5,8 +5,6 @@ require (
github.com/ChannelMeter/iso8601duration v0.0.0-20150204201828-8da3af7a2a61
github.com/PuerkitoBio/goquery v1.5.0 // indirect
github.com/aws/aws-sdk-go v1.17.3
github.com/btcsuite/btcd v0.0.0-20190410025418-9bfb2ca0346b // indirect
github.com/btcsuite/btcutil v0.0.0-20190316010144-3ac1210f4b38 // indirect
github.com/channelmeter/iso8601duration v0.0.0-20150204201828-8da3af7a2a61 // indirect
github.com/go-ini/ini v1.42.0 // indirect
github.com/golang/protobuf v1.3.1 // indirect
@ -17,7 +15,7 @@ require (
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c
github.com/lbryio/lbry.go v0.0.0-20190422142237-ad33acfc936f
github.com/lbryio/lbry.go v1.0.14
github.com/lusis/slack-test v0.0.0-20190408224659-6cf59653add2 // indirect
github.com/mitchellh/go-ps v0.0.0-20170309133038-4fdf99ab2936
github.com/mitchellh/mapstructure v1.1.2 // indirect
@ -36,7 +34,6 @@ require (
golang.org/x/crypto v0.0.0-20190418165655-df01cb2cc480 // indirect
golang.org/x/net v0.0.0-20190415214537-1da14a5a36f2 // indirect
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a // indirect
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be // indirect
google.golang.org/api v0.3.2
google.golang.org/appengine v1.5.0 // indirect
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7 // indirect

42
go.sum
View file

@ -9,6 +9,7 @@ github.com/PuerkitoBio/goquery v1.5.0 h1:uGvmFXOA73IKluu/F84Xd1tt/z07GYm8X49XKHP
github.com/PuerkitoBio/goquery v1.5.0/go.mod h1:qD2PgZ9lccMbQlc7eEOjaeRlFQON7xY8kdmcsrnKqMg=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/aead/siphash v1.0.1 h1:FwHfE/T45KPKYuuSAKyyvE+oPWcaQ+CUmFW0bPlM+kg=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@ -17,23 +18,22 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/andybalholm/cascadia v1.0.0 h1:hOCXnnZ5A+3eVDX8pvgl4kofXv2ELss0bKcqRySc45o=
github.com/andybalholm/cascadia v1.0.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf h1:eg0MeVzsP1G42dRafH3vf+al2vQIJU0YHX+1Tw87oco=
github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.17.3 h1:KBXxg7Jh0TxE5zmpNB2DwKmJeDUqh0O6jhy25TuYOmc=
github.com/aws/aws-sdk-go v1.17.3/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/btcsuite/btcd v0.0.0-20180531025944-86fed781132a/go.mod h1:Dmm/EzmjnCiweXmzRIAiUWCInVmPgjkzgv5k4tVyXiQ=
github.com/btcsuite/btcd v0.0.0-20190410025418-9bfb2ca0346b h1:7J7sEce3LgtbMgs7PKcN61gF3b4txM6SjaRoJTSk640=
github.com/btcsuite/btcd v0.0.0-20190410025418-9bfb2ca0346b/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
github.com/btcsuite/btcutil v0.0.0-20180524032703-d4cc87b86016/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803 h1:j3AgPKKZtZStM2nyhrDSLSYgT7YHrZKdSkq1OYeLjvM=
github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/btcutil v0.0.0-20190316010144-3ac1210f4b38 h1:GbQHMJ2u/geMPV1tbN7i7zARSoPAPuXWa44V0KYvJXU=
github.com/btcsuite/btcutil v0.0.0-20190316010144-3ac1210f4b38/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd h1:R/opQEbFEy9JGkIguV40SvRY1uliPX8ifOvi6ICsFCw=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd h1:qdGvebPBDuYDPGi1WCPjy1tGyMpmDK8IEapSsszn7HE=
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY=
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723 h1:ZA/jbKoGcVAnER6pCHPEkGdZOV7U1oLUedErBHCUMs0=
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
@ -60,6 +60,7 @@ github.com/go-ini/ini v1.42.0 h1:TWr1wGj35+UiWHlBA8er89seFXxzwFn11spilrrj+38=
github.com/go-ini/ini v1.42.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-ozzo/ozzo-validation v3.5.0+incompatible h1:sUy/in/P6askYr16XJgTKq/0SZhiWsdg4WZGaLsGQkM=
github.com/go-ozzo/ozzo-validation v3.5.0+incompatible/go.mod h1:gsEKFIVnabGBt6mXmxK0MoFy+cZoTJY6mu5Ll3LVLBU=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
@ -101,9 +102,11 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89 h1:12K8AlpT0/6QUXSfV0yi4Q0jkbq8NDtIKFtF61AoqV0=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jtolds/gls v4.2.1+incompatible h1:fSuqC+Gmlu6l/ZYAoZzx2pyucC8Xza35fpRVWLVmUEE=
@ -112,6 +115,7 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@ -125,17 +129,12 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c h1:BhdcWGsuKif/XoSZnqVGNqJ1iEmH0czWR5upj+AuR8M=
github.com/lbryio/errors.go v0.0.0-20180223142025-ad03d3cc6a5c/go.mod h1:muH7wpUqE8hRA3OrYYosw9+Sl681BF9cwcjzE+OCNK8=
github.com/lbryio/lbry.go v0.0.0-20190419005332-80b25b225e18 h1:ZhWjtvaq5r7julhcF9OSgx4bLv9UsdIx27zH1/fbrEc=
github.com/lbryio/lbry.go v0.0.0-20190419005332-80b25b225e18/go.mod h1:kd08aOMCuBVYJ3EafY4Kx3dRAWWQYhobJ9beREgsaRI=
github.com/lbryio/lbry.go v0.0.0-20190422142237-ad33acfc936f h1:o6EZ7bAdYrS6pKp85SEr6Ywyy2JDJS1CY3ChkVsvSM4=
github.com/lbryio/lbry.go v0.0.0-20190422142237-ad33acfc936f/go.mod h1:FubnMAYvLt2jGasG7BrQsokYHZ2wpNtWethPHUVauMc=
github.com/lbryio/lbryschema.go v0.0.0-20190422030648-322c658307e0 h1:/YWLlbbDefRGLs/ozyuRpvpwpFISYehwR4AAVlPthA8=
github.com/lbryio/lbryschema.go v0.0.0-20190422030648-322c658307e0/go.mod h1:dAzPCBj3CKKWBGYBZxK6tKBP5SCgY2tqd9SnQd/OyKo=
github.com/lbryio/lbry.go v1.0.14 h1:lpaO96YyP3d2RJzgl1WFkcyS15/ROd04OV3S1E5Av8E=
github.com/lbryio/lbry.go v1.0.14/go.mod h1:JtyI30bU51rm0LZ/po3mQuzf++14OWb6kR/6mMRAmKU=
github.com/lbryio/lbryschema.go v0.0.0-20190428231007-c54836bca002 h1:urfYK5ElpUrAv90auPLldoVC60LwiGAcY0OE6HJB9KI=
github.com/lbryio/lbryschema.go v0.0.0-20190428231007-c54836bca002/go.mod h1:dAzPCBj3CKKWBGYBZxK6tKBP5SCgY2tqd9SnQd/OyKo=
github.com/lbryio/ozzo-validation v0.0.0-20170323141101-d1008ad1fd04 h1:Nze+C2HbeKvhjI/kVn+9Poj/UuEW5sOQxcsxqO7L3GI=
github.com/lbryio/ozzo-validation v0.0.0-20170323141101-d1008ad1fd04/go.mod h1:fbG/dzobG8r95KzMwckXiLMHfFjZaBRQqC9hPs2XAQ4=
github.com/lbryio/types v0.0.0-20190405005919-54c3c28f676a h1:twWvrsBDvSb+qnmpSq3nvFrodgC5PpXUipyo4T/W790=
github.com/lbryio/types v0.0.0-20190405005919-54c3c28f676a/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
github.com/lbryio/types v0.0.0-20190415181811-35ddf1afe731 h1:iERWR8Dkng30eRInI7gzolTEJBW9nBSK/sT+Z9aSipI=
github.com/lbryio/types v0.0.0-20190415181811-35ddf1afe731/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c h1:m3O7561xBQ00lfUVayW4c6SnpVbUDQtPUwGcGYSUYQA=
github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
github.com/lusis/go-slackbot v0.0.0-20180109053408-401027ccfef5 h1:AsEBgzv3DhuYHI/GiQh2HxvTP71HCCE9E/tzGUzGdtU=
@ -184,11 +183,11 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/sebdah/goldie v0.0.0-20180424091453-8784dd1ab561/go.mod h1:lvjGftC8oe7XPtyrOidaMi0rp5B9+XY/ZRUynGnuaxQ=
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shopspring/decimal v0.0.0-20180607144847-19e3cb6c2930/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/sirupsen/logrus v0.0.0-20180523074243-ea8897e79973/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
@ -198,6 +197,7 @@ github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3/go.mod h1
github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/spf13/cast v1.2.0 h1:HHl1DSRbEQN2i8tJmtS6ViPyHx35+p51amrdsiTCrkg=
github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg=
github.com/spf13/cobra v0.0.0-20190109003409-7547e83b2d85 h1:UQHWkFUuJBy5rWN1DxosG/efssLu7u0fXXSTC2HHKfQ=
github.com/spf13/cobra v0.0.0-20190109003409-7547e83b2d85/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
@ -217,7 +217,6 @@ go.opencensus.io v0.20.2 h1:NAfh7zF0/3/HqtMvJNZ/RFrSlCE6ZTlHmKfhL/Dm1Jk=
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180608092829-8ac0e0d97ce4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190418165655-df01cb2cc480 h1:O5YqonU5IWby+w98jVUG9h7zlCWCcH4RHyPVReBmhzk=
@ -255,8 +254,8 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be h1:mI+jhqkn68ybP0ORJqunXn+fq+Eeb4hHKqLQcFICjAc=
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190520201301-c432e742b0af h1:NXfmMfXz6JqGfG3ikSxcz2N93j6DgScr19Oo2uwFu88=
golang.org/x/sys v0.0.0-20190520201301-c432e742b0af/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 h1:z99zHgr7hKfrUcX/KsoJk5FJfjTceCKIp96+biqP4To=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -285,7 +284,6 @@ google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.0 h1:DlsSIrgEBuZAUFJcta2B5i/lzeHHbnfkNFAfFXLVFYQ=
google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@ -293,7 +291,6 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo=
gopkg.in/ini.v1 v1.41.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk=
gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
@ -304,6 +301,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View file

@ -36,6 +36,8 @@ var (
videosLimit int
maxVideoSize int
maxVideoLength float64
removeDBUnpublished bool
upgradeMetadata bool
)
func main() {
@ -56,6 +58,8 @@ func main() {
cmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
cmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
cmd.Flags().BoolVar(&singleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
cmd.Flags().BoolVar(&removeDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
cmd.Flags().BoolVar(&upgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
cmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update")
cmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.")
cmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
@ -188,6 +192,8 @@ func ytSync(cmd *cobra.Command, args []string) {
syncProperties,
apiConfig,
maxVideoLength,
removeDBUnpublished,
upgradeMetadata,
)
err := sm.Start()
if err != nil {

View file

@ -11,6 +11,9 @@ import (
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/util"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
log "github.com/sirupsen/logrus"
)
@ -37,12 +40,14 @@ type SyncManager struct {
singleRun bool
syncProperties *sdk.SyncProperties
apiConfig *sdk.APIConfig
removeDBUnpublished bool
upgradeMetadata bool
}
func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool, refill int, limit int,
skipSpaceCheck bool, syncUpdate bool, concurrentJobs int, concurrentVideos int, blobsDir string, videosLimit int,
maxVideoSize int, lbrycrdString string, awsS3ID string, awsS3Secret string, awsS3Region string, awsS3Bucket string,
syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig, maxVideoLength float64) *SyncManager {
syncStatus string, singleRun bool, syncProperties *sdk.SyncProperties, apiConfig *sdk.APIConfig, maxVideoLength float64, removeDBUnpublished bool, upgradeMetadata bool) *SyncManager {
return &SyncManager{
stopOnError: stopOnError,
maxTries: maxTries,
@ -66,6 +71,8 @@ func NewSyncManager(stopOnError bool, maxTries int, takeOverExistingChannel bool
singleRun: singleRun,
syncProperties: syncProperties,
apiConfig: apiConfig,
removeDBUnpublished: removeDBUnpublished,
upgradeMetadata: upgradeMetadata,
}
}
@ -73,6 +80,7 @@ const (
StatusPending = "pending" // waiting for permission to sync
StatusPendingEmail = "pendingemail" // permission granted but missing email
StatusQueued = "queued" // in sync queue. will be synced soon
StatusPendingUpgrade = "pendingupgrade" // in sync queue. will be synced soon
StatusSyncing = "syncing" // syncing now
StatusSynced = "synced" // done
StatusFailed = "failed"
@ -80,11 +88,12 @@ const (
StatusAbandoned = "abandoned" // deleted on youtube or banned
)
var SyncStatuses = []string{StatusPending, StatusPendingEmail, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized, StatusAbandoned}
var SyncStatuses = []string{StatusPending, StatusPendingEmail, StatusPendingUpgrade, StatusQueued, StatusSyncing, StatusSynced, StatusFailed, StatusFinalized, StatusAbandoned}
const (
VideoStatusPublished = "published"
VideoStatusFailed = "failed"
VideoStatusUpgradeFailed = "upgradefailed"
VideoStatusUnpublished = "unpublished"
)
@ -128,10 +137,12 @@ func (s *SyncManager) Start() error {
AwsS3Region: s.awsS3Region,
AwsS3Bucket: s.awsS3Bucket,
namer: namer.NewNamer(),
Fee: channels[0].Fee,
}
shouldInterruptLoop = true
} else {
var queuesToSync []string
//TODO: implement scrambling to avoid starvation of queues
if s.syncStatus != "" {
queuesToSync = append(queuesToSync, s.syncStatus)
} else if s.syncUpdate {
@ -144,7 +155,9 @@ func (s *SyncManager) Start() error {
if err != nil {
return err
}
for _, c := range channels {
log.Infof("There are %d channels in the \"%s\" queue", len(channels), q)
if len(channels) > 0 {
c := channels[0]
syncs = append(syncs, Sync{
APIConfig: s.apiConfig,
YoutubeChannelID: c.ChannelId,
@ -162,7 +175,9 @@ func (s *SyncManager) Start() error {
AwsS3Region: s.awsS3Region,
AwsS3Bucket: s.awsS3Bucket,
namer: namer.NewNamer(),
Fee: c.Fee,
})
break
}
}
}
@ -170,9 +185,9 @@ func (s *SyncManager) Start() error {
log.Infoln("No channels to sync. Pausing 5 minutes!")
time.Sleep(5 * time.Minute)
}
for i, sync := range syncs {
for _, sync := range syncs {
shouldNotCount := false
SendInfoToSlack("Syncing %s (%s) to LBRY! (iteration %d/%d - total processed channels: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1)
SendInfoToSlack("Syncing %s (%s) to LBRY! total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1)
err := sync.FullCycle()
if err != nil {
fatalErrors := []string{
@ -192,7 +207,7 @@ func (s *SyncManager) Start() error {
SendInfoToSlack("A non fatal error was reported by the sync process. %s\nContinuing...", err.Error())
}
}
SendInfoToSlack("Syncing %s (%s) reached an end. (iteration %d/%d - total processed channels: %d)", sync.LbryChannelName, sync.YoutubeChannelID, i+1, len(syncs), syncCount+1)
SendInfoToSlack("Syncing %s (%s) reached an end. total processed channels since startup: %d", sync.LbryChannelName, sync.YoutubeChannelID, syncCount+1)
if !shouldNotCount {
syncCount++
}
@ -207,7 +222,12 @@ func (s *SyncManager) Start() error {
}
return nil
}
func (s *SyncManager) GetS3AWSConfig() aws.Config {
return aws.Config{
Credentials: credentials.NewStaticCredentials(s.awsS3ID, s.awsS3Secret, ""),
Region: &s.awsS3Region,
}
}
func (s *SyncManager) checkUsedSpace() error {
usedPctile, err := GetUsedSpace(s.blobsDir)
if err != nil {

View file

@ -2,17 +2,18 @@ package manager
import (
"fmt"
"math"
"net/http"
"os"
"strconv"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/jsonrpc"
"github.com/lbryio/lbry.go/extras/util"
"github.com/lbryio/lbry.go/lbrycrd"
"github.com/lbryio/ytsync/tagsManager"
"github.com/lbryio/ytsync/thumbs"
"github.com/shopspring/decimal"
@ -21,8 +22,28 @@ import (
"google.golang.org/api/youtube/v3"
)
func (s *Sync) enableAddressReuse() error {
accountsResponse, err := s.daemon.AccountList()
if err != nil {
return errors.Err(err)
}
accounts := accountsResponse.LBCMainnet
if os.Getenv("REGTEST") == "true" {
accounts = accountsResponse.LBCRegtest
}
for _, a := range accounts {
_, err = s.daemon.AccountSet(a.ID, jsonrpc.AccountSettings{
ChangeMaxUses: util.PtrToInt(1000),
ReceivingMaxUses: util.PtrToInt(100),
})
if err != nil {
return errors.Err(err)
}
}
return nil
}
func (s *Sync) walletSetup() error {
//prevent unnecessary concurrent execution
//prevent unnecessary concurrent execution and publishing while refilling/reallocating UTXOs
s.walletMux.Lock()
defer s.walletMux.Unlock()
err := s.ensureChannelOwnership()
@ -42,51 +63,55 @@ func (s *Sync) walletSetup() error {
}
log.Debugf("Starting balance is %.4f", balance)
var numOnSource int
if s.LbryChannelName == "@UCBerkeley" {
numOnSource = 10104
} else {
n, err := s.CountVideos()
if err != nil {
return err
}
numOnSource = int(n)
}
videosOnYoutube := int(n)
log.Debugf("Source channel has %d videos", numOnSource)
if numOnSource == 0 {
log.Debugf("Source channel has %d videos", videosOnYoutube)
if videosOnYoutube == 0 {
return nil
}
s.syncedVideosMux.RLock()
numPublished := len(s.syncedVideos) //should we only count published videos? Credits are allocated even for failed ones...
publishedCount := 0
notUpgradedCount := 0
failedCount := 0
for _, sv := range s.syncedVideos {
if sv.Published {
publishedCount++
if sv.MetadataVersion < 2 {
notUpgradedCount++
}
} else {
failedCount++
}
}
s.syncedVideosMux.RUnlock()
log.Debugf("We already allocated credits for %d videos", numPublished)
if numOnSource-numPublished > s.Manager.videosLimit {
numOnSource = s.Manager.videosLimit
log.Debugf("We already allocated credits for %d videos", publishedCount)
if videosOnYoutube > s.Manager.videosLimit {
videosOnYoutube = s.Manager.videosLimit
}
unallocatedVideos := videosOnYoutube - publishedCount
requiredBalance := float64(unallocatedVideos)*(publishAmount+estimatedMaxTxFee) + channelClaimAmount
if s.Manager.upgradeMetadata {
requiredBalance += float64(notUpgradedCount) * 0.001
}
minBalance := (float64(numOnSource)-float64(numPublished))*(publishAmount+0.1) + channelClaimAmount
if numPublished > numOnSource && balance < 1 {
SendErrorToSlack("something is going on as we published more videos than those available on source: %d/%d", numPublished, numOnSource)
minBalance = 1 //since we ended up in this function it means some juice is still needed
refillAmount := 0.0
if balance < requiredBalance || balance < minimumAccountBalance {
refillAmount = math.Max(requiredBalance-requiredBalance, minimumRefillAmount)
}
amountToAdd := minBalance - balance
if s.Refill > 0 {
if amountToAdd < 0 {
amountToAdd = float64(s.Refill)
} else {
amountToAdd += float64(s.Refill)
}
refillAmount += float64(s.Refill)
}
if amountToAdd > 0 {
if amountToAdd < 1 {
amountToAdd = 1 // no reason to bother adding less than 1 credit
}
err := s.addCredits(amountToAdd)
if refillAmount > 0 {
err := s.addCredits(refillAmount)
if err != nil {
return errors.Err(err)
}
@ -98,7 +123,7 @@ func (s *Sync) walletSetup() error {
} else if claimAddress == nil {
return errors.Err("could not get unused address")
}
s.claimAddress = string((*claimAddress)[0])
s.claimAddress = string((*claimAddress)[0]) //TODO: remove claimAddress completely
if s.claimAddress == "" {
return errors.Err("found blank claim address")
}
@ -122,7 +147,7 @@ func (s *Sync) ensureEnoughUTXOs() error {
}
defaultAccount := ""
for _, account := range accountsNet {
if account.IsDefaultAccount {
if account.IsDefault {
defaultAccount = account.ID
break
}
@ -161,12 +186,15 @@ func (s *Sync) ensureEnoughUTXOs() error {
if err != nil {
return errors.Err(err)
}
broadcastFee := 0.01
amountToSplit := fmt.Sprintf("%.6f", balanceAmount-broadcastFee)
maxUTXOs := uint64(500)
desiredUTXOCount := uint64(math.Floor((balanceAmount) / 0.1))
if desiredUTXOCount > maxUTXOs {
desiredUTXOCount = maxUTXOs
}
log.Infof("Splitting balance of %s evenly between %d UTXOs", *balance, desiredUTXOCount)
log.Infof("Splitting balance of %s evenly between 40 UTXOs", *balance)
prefillTx, err := s.daemon.AccountFund(defaultAccount, defaultAccount, amountToSplit, uint64(target))
broadcastFee := 0.1
prefillTx, err := s.daemon.AccountFund(defaultAccount, defaultAccount, fmt.Sprintf("%.4f", balanceAmount-broadcastFee), desiredUTXOCount, false)
if err != nil {
return err
} else if prefillTx == nil {
@ -220,7 +248,7 @@ func (s *Sync) ensureChannelOwnership() error {
return errors.Err("no channel name set")
}
//@TODO: get rid of this when imported channels are supported
if s.YoutubeChannelID == "UCkK9UDm_ZNrq_rIXCz3xCGA" || s.YoutubeChannelID == "UCW-thz5HxE-goYq8yPds1Gw" {
if s.YoutubeChannelID == "UCW-thz5HxE-goYq8yPds1Gw" {
return nil
}
channels, err := s.daemon.ChannelList(nil, 1, 50)
@ -246,9 +274,11 @@ func (s *Sync) ensureChannelOwnership() error {
}
}
}
channelUsesOldMetadata := false
if len((*channels).Items) == 1 {
channel := ((*channels).Items)[0]
if channel.Name == s.LbryChannelName {
channelUsesOldMetadata = channel.Value.GetThumbnail() == nil
//TODO: eventually get rid of this when the whole db is filled
if s.lbryChannelID == "" {
err = s.Manager.apiConfig.SetChannelClaimID(s.YoutubeChannelID, channel.ClaimID)
@ -256,7 +286,9 @@ func (s *Sync) ensureChannelOwnership() error {
return errors.Err("the channel in the wallet is different than the channel in the database")
}
s.lbryChannelID = channel.ClaimID
if !channelUsesOldMetadata {
return err
}
} else {
return errors.Err("this channel does not belong to this wallet! Expected: %s, found: %s", s.LbryChannelName, channel.Name)
}
@ -290,7 +322,7 @@ func (s *Sync) ensureChannelOwnership() error {
return errors.Prefix("error creating YouTube service", err)
}
response, err := service.Channels.List("snippet").Id(s.YoutubeChannelID).Do()
response, err := service.Channels.List("snippet,brandingSettings").Id(s.YoutubeChannelID).Do()
if err != nil {
return errors.Prefix("error getting channel details", err)
}
@ -300,24 +332,23 @@ func (s *Sync) ensureChannelOwnership() error {
}
channelInfo := response.Items[0].Snippet
channelBranding := response.Items[0].BrandingSettings
thumbnail := channelInfo.Thumbnails.Default
if channelInfo.Thumbnails.Maxres != nil {
thumbnail = channelInfo.Thumbnails.Maxres
} else if channelInfo.Thumbnails.High != nil {
thumbnail = channelInfo.Thumbnails.High
} else if channelInfo.Thumbnails.Medium != nil {
thumbnail = channelInfo.Thumbnails.Medium
} else if channelInfo.Thumbnails.Standard != nil {
thumbnail = channelInfo.Thumbnails.Standard
}
thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail.Url, s.YoutubeChannelID, aws.Config{
Credentials: credentials.NewStaticCredentials(s.AwsS3ID, s.AwsS3Secret, ""),
Region: &s.AwsS3Region,
})
thumbnail := thumbs.GetBestThumbnail(channelInfo.Thumbnails)
thumbnailURL, err := thumbs.MirrorThumbnail(thumbnail.Url, s.YoutubeChannelID, s.Manager.GetS3AWSConfig())
if err != nil {
return err
}
var bannerURL *string
if channelBranding.Image != nil && channelBranding.Image.BannerImageUrl != "" {
bURL, err := thumbs.MirrorThumbnail(channelBranding.Image.BannerImageUrl, "banner-"+s.YoutubeChannelID, s.Manager.GetS3AWSConfig())
if err != nil {
return err
}
bannerURL = &bURL
}
var languages []string = nil
if channelInfo.DefaultLanguage != "" {
languages = []string{channelInfo.DefaultLanguage}
@ -326,16 +357,32 @@ func (s *Sync) ensureChannelOwnership() error {
if channelInfo.Country != "" {
locations = []jsonrpc.Location{{Country: util.PtrToString(channelInfo.Country)}}
}
c, err := s.daemon.ChannelCreate(s.LbryChannelName, channelBidAmount, jsonrpc.ChannelCreateOptions{
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
Title: channelInfo.Title,
Description: channelInfo.Description,
Tags: nil,
var c *jsonrpc.TransactionSummary
claimCreateOptions := jsonrpc.ClaimCreateOptions{
Title: &channelInfo.Title,
Description: &channelInfo.Description,
Tags: tagsManager.GetTagsForChannel(s.YoutubeChannelID),
Languages: languages,
Locations: locations,
ThumbnailURL: &thumbnailURL,
}
if channelUsesOldMetadata {
c, err = s.daemon.ChannelUpdate(s.lbryChannelID, jsonrpc.ChannelUpdateOptions{
ClearTags: util.PtrToBool(true),
ClearLocations: util.PtrToBool(true),
ClearLanguages: util.PtrToBool(true),
ChannelCreateOptions: jsonrpc.ChannelCreateOptions{
ClaimCreateOptions: claimCreateOptions,
CoverURL: bannerURL,
},
})
} else {
c, err = s.daemon.ChannelCreate(s.LbryChannelName, channelBidAmount, jsonrpc.ChannelCreateOptions{
ClaimCreateOptions: claimCreateOptions,
CoverURL: bannerURL,
})
}
if err != nil {
return err
}

View file

@ -1,11 +1,7 @@
package manager
import (
"bufio"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
@ -21,6 +17,7 @@ import (
"github.com/lbryio/ytsync/namer"
"github.com/lbryio/ytsync/sdk"
"github.com/lbryio/ytsync/sources"
"github.com/lbryio/ytsync/thumbs"
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/jsonrpc"
@ -41,6 +38,9 @@ import (
const (
channelClaimAmount = 0.01
estimatedMaxTxFee = 0.1
minimumAccountBalance = 4.0
minimumRefillAmount = 1
publishAmount = 0.01
maxReasonLength = 500
)
@ -51,7 +51,7 @@ type video interface {
IDAndNum() string
PlaylistPosition() int
PublishedAt() time.Time
Sync(*jsonrpc.Client, string, float64, string, int, *namer.Namer, float64) (*sources.SyncSummary, error)
Sync(*jsonrpc.Client, sources.SyncParams, *sdk.SyncedVideo, bool, *sync.RWMutex) (*sources.SyncSummary, error)
}
// sorting videos
@ -77,7 +77,7 @@ type Sync struct {
AwsS3Secret string
AwsS3Region string
AwsS3Bucket string
Fee *sdk.Fee
daemon *jsonrpc.Client
claimAddress string
videoDirectory string
@ -86,18 +86,21 @@ type Sync struct {
grp *stop.Group
lbryChannelID string
namer *namer.Namer
walletMux *sync.Mutex
walletMux *sync.RWMutex
queue chan video
}
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string) {
func (s *Sync) AppendSyncedVideo(videoID string, published bool, failureReason string, claimName string, claimID string, metadataVersion int8, size int64) {
s.syncedVideosMux.Lock()
defer s.syncedVideosMux.Unlock()
s.syncedVideos[videoID] = sdk.SyncedVideo{
VideoID: videoID,
Published: published,
FailureReason: failureReason,
ClaimID: claimID,
ClaimName: claimName,
MetadataVersion: metadataVersion,
Size: size,
}
}
@ -258,7 +261,7 @@ func (s *Sync) FullCycle() (e error) {
s.setExceptions()
s.syncedVideosMux = &sync.RWMutex{}
s.walletMux = &sync.Mutex{}
s.walletMux = &sync.RWMutex{}
s.grp = stop.New()
s.queue = make(chan video)
interruptChan := make(chan os.Signal, 1)
@ -319,12 +322,14 @@ func (s *Sync) FullCycle() (e error) {
return nil
}
func (s *Sync) setChannelTerminationStatus(e *error) {
if *e != nil {
//conditions for which a channel shouldn't be marked as failed
noFailConditions := []string{
"this youtube channel is being managed by another server",
"interrupted during daemon startup",
"playlist items not found",
}
if util.SubstringInSlice((*e).Error(), noFailConditions) {
return
@ -390,11 +395,10 @@ func logShutdownError(shutdownErr error) {
var thumbnailHosts = []string{
"berk.ninja/thumbnails/",
"https://thumbnails.lbry.com/",
thumbs.ThumbnailEndpoint,
}
func isYtsyncClaim(c jsonrpc.Claim) bool {
if !util.InSlice(c.Type, []string{"claim", "update"}) || c.Value.GetStream() == nil {
return false
}
@ -403,7 +407,12 @@ func isYtsyncClaim(c jsonrpc.Claim) bool {
return false
}
return util.InSlice(c.Value.GetThumbnail().GetUrl(), thumbnailHosts)
for _, th := range thumbnailHosts {
if strings.Contains(c.Value.GetThumbnail().GetUrl(), th) {
return true
}
}
return false
}
// fixDupes abandons duplicate claims
@ -442,8 +451,10 @@ func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
}
//updateRemoteDB counts the amount of videos published so far and updates the remote db if some videos weren't marked as published
func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err error) {
//additionally it removes all entries in the database indicating that a video is published when it's actually not
func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total, fixed, removed int, err error) {
count := 0
videoIDMap := make(map[string]string, len(claims))
for _, c := range claims {
if !isYtsyncClaim(c) {
continue
@ -452,18 +463,60 @@ func (s *Sync) updateRemoteDB(claims []jsonrpc.Claim) (total int, fixed int, err
//check if claimID is in remote db
tn := c.Value.GetThumbnail().GetUrl()
videoID := tn[strings.LastIndex(tn, "/")+1:]
pv, ok := s.syncedVideos[videoID]
if !ok || pv.ClaimName != c.Name {
fixed++
log.Debugf("adding %s to the database", c.Name)
videoIDMap[videoID] = c.ClaimID
pv, claimInDatabase := s.syncedVideos[videoID]
claimMetadataVersion := uint(1)
if strings.Contains(tn, thumbs.ThumbnailEndpoint) {
claimMetadataVersion = 2
}
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", nil)
metadataDiffers := claimInDatabase && pv.MetadataVersion != int8(claimMetadataVersion)
claimIDDiffers := claimInDatabase && pv.ClaimID != c.ClaimID
claimNameDiffers := claimInDatabase && pv.ClaimName != c.Name
claimMarkedUnpublished := claimInDatabase && !pv.Published
if metadataDiffers {
log.Debugf("%s: Mismatch in database for metadata. DB: %d - Blockchain: %d", videoID, pv.MetadataVersion, claimMetadataVersion)
}
if claimIDDiffers {
log.Debugf("%s: Mismatch in database for claimID. DB: %s - Blockchain: %s", videoID, pv.ClaimID, c.ClaimID)
}
if claimIDDiffers {
log.Debugf("%s: Mismatch in database for claimName. DB: %s - Blockchain: %s", videoID, pv.ClaimName, c.Name)
}
if claimMarkedUnpublished {
log.Debugf("%s: Mismatch in database: published but marked as unpublished", videoID)
}
if !claimInDatabase {
log.Debugf("%s: Published but is not in database (%s - %s)", videoID, c.Name, c.ClaimID)
}
if !claimInDatabase || metadataDiffers || claimIDDiffers || claimNameDiffers || claimMarkedUnpublished {
claimSize, err := c.GetStreamSizeByMagic()
if err != nil {
return count, fixed, err
claimSize = 0
}
fixed++
log.Debugf("updating %s in the database", videoID)
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, videoID, VideoStatusPublished, c.ClaimID, c.Name, "", util.PtrToInt64(int64(claimSize)), claimMetadataVersion)
if err != nil {
return count, fixed, 0, err
}
}
}
return count, fixed, nil
idsToRemove := make([]string, 0, len(videoIDMap))
for vID, sv := range s.syncedVideos {
_, ok := videoIDMap[vID]
if !ok && sv.Published {
log.Debugf("%s: claims to be published but wasn't found in the list of claims and will be removed if --remove-db-unpublished was specified", vID)
idsToRemove = append(idsToRemove, vID)
}
}
if s.Manager.removeDBUnpublished && len(idsToRemove) > 0 {
err := s.Manager.apiConfig.DeleteVideos(idsToRemove)
if err != nil {
return count, fixed, 0, err
}
}
return count, fixed, len(idsToRemove), nil
}
func (s *Sync) getClaims() ([]jsonrpc.Claim, error) {
@ -481,7 +534,10 @@ func (s *Sync) getClaims() ([]jsonrpc.Claim, error) {
}
func (s *Sync) doSync() error {
var err error
err := s.enableAddressReuse()
if err != nil {
return errors.Prefix("could not set address reuse policy", err)
}
err = s.walletSetup()
if err != nil {
return errors.Prefix("Initial wallet setup failed! Manual Intervention is required.", err)
@ -506,16 +562,21 @@ func (s *Sync) doSync() error {
}
}
pubsOnWallet, nFixed, err := s.updateRemoteDB(allClaims)
pubsOnWallet, nFixed, nRemoved, err := s.updateRemoteDB(allClaims)
if err != nil {
return errors.Prefix("error counting claims", err)
return errors.Prefix("error updating remote database", err)
}
if nFixed > 0 {
if nFixed > 0 || nRemoved > 0 {
err := s.setStatusSyncing()
if err != nil {
return err
}
SendInfoToSlack("%d claims were not on the remote database and were fixed", nFixed)
if nFixed > 0 {
SendInfoToSlack("%d claims had mismatched database info or were completely missing and were fixed", nFixed)
}
if nRemoved > 0 {
SendInfoToSlack("%d were marked as published but weren't actually published and thus removed from the database", nRemoved)
}
}
pubsOnDB := 0
for _, sv := range s.syncedVideos {
@ -545,7 +606,7 @@ func (s *Sync) doSync() error {
}
if s.LbryChannelName == "@UCBerkeley" {
err = s.enqueueUCBVideos()
err = errors.Err("UCB is not supported in this version of YTSYNC")
} else {
err = s.enqueueYoutubeVideos()
}
@ -584,7 +645,7 @@ func (s *Sync) startWorker(workerNum int) {
err := s.processVideo(v)
if err != nil {
logMsg := "error processing video: " + err.Error()
logMsg := fmt.Sprintf("error processing video %s: %s", v.ID(), err.Error())
log.Errorln(logMsg)
fatalErrors := []string{
":5279: read: connection reset by peer",
@ -613,38 +674,63 @@ func (s *Sync) startWorker(workerNum int) {
"no compatible format available for this video",
"Watch this video on YouTube.",
"have blocked it on copyright grounds",
"the video must be republished as we can't get the right size",
}
if util.SubstringInSlice(err.Error(), errorsNoRetry) {
log.Println("This error should not be retried at all")
} else if tryCount < s.MaxTries {
if strings.Contains(err.Error(), "txn-mempool-conflict") ||
strings.Contains(err.Error(), "too-long-mempool-chain") {
log.Println("waiting for a block before retrying")
err = s.waitForNewBlock()
err := s.waitForNewBlock()
if err != nil {
s.grp.Stop()
SendErrorToSlack("something went wrong while waiting for a block: %v", err)
break
}
} else if util.SubstringInSlice(err.Error(), []string{
} else if util.SubstringInSlice(err.Error(), errorsNoRetry) {
log.Println("This error should not be retried at all")
} else if tryCount < s.MaxTries {
if util.SubstringInSlice(err.Error(), []string{
"Not enough funds to cover this transaction",
"failed: Not enough funds",
"Error in daemon: Insufficient funds, please deposit additional LBC"}) {
log.Println("refilling addresses before retrying")
err = s.walletSetup()
"Error in daemon: Insufficient funds, please deposit additional LBC",
// "txn-mempool-conflict", //TODO: uncomment the two lines when the SDK will start spending confirmed UTXOs before failing
//"too-long-mempool-chain",
}) {
log.Println("checking funds and UTXOs before retrying...")
err := s.walletSetup()
if err != nil {
s.grp.Stop()
SendErrorToSlack("failed to setup the wallet for a refill: %v", err)
break
}
} else if strings.Contains(err.Error(), "Error in daemon: 'str' object has no attribute 'get'") {
time.Sleep(5 * time.Second)
}
log.Println("Retrying")
continue
}
SendErrorToSlack("Video failed after %d retries, skipping. Stack: %s", tryCount, logMsg)
}
s.AppendSyncedVideo(v.ID(), false, err.Error(), "")
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusFailed, "", "", err.Error(), v.Size())
existingClaim, ok := s.syncedVideos[v.ID()]
existingClaimID := ""
existingClaimName := ""
existingClaimSize := int64(0)
if v.Size() != nil {
existingClaimSize = *v.Size()
}
if ok {
existingClaimID = existingClaim.ClaimID
existingClaimName = existingClaim.ClaimName
if existingClaim.Size > 0 {
existingClaimSize = existingClaim.Size
}
}
videoStatus := VideoStatusFailed
if strings.Contains(err.Error(), "upgrade failed") {
videoStatus = VideoStatusUpgradeFailed
} else {
s.AppendSyncedVideo(v.ID(), false, err.Error(), existingClaimName, existingClaimID, 0, existingClaimSize)
}
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), videoStatus, existingClaimID, existingClaimName, err.Error(), &existingClaimSize, 0)
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
@ -683,7 +769,7 @@ func (s *Sync) enqueueYoutubeVideos() error {
}
var videos []video
playlistMap := make(map[string]*youtube.PlaylistItemSnippet, 50)
nextPageToken := ""
for {
req := service.PlaylistItems.List("snippet").
@ -705,7 +791,7 @@ func (s *Sync) enqueueYoutubeVideos() error {
}
return errors.Err("playlist items not found")
}
playlistMap := make(map[string]*youtube.PlaylistItemSnippet, 50)
//playlistMap := make(map[string]*youtube.PlaylistItemSnippet, 50)
videoIDs := make([]string, 50)
for i, item := range playlistResponse.Items {
// normally we'd send the video into the channel here, but youtube api doesn't have sorting
@ -713,14 +799,14 @@ func (s *Sync) enqueueYoutubeVideos() error {
playlistMap[item.Snippet.ResourceId.VideoId] = item.Snippet
videoIDs[i] = item.Snippet.ResourceId.VideoId
}
req2 := service.Videos.List("snippet,contentDetails").Id(strings.Join(videoIDs[:], ","))
req2 := service.Videos.List("snippet,contentDetails,recordingDetails").Id(strings.Join(videoIDs[:], ","))
videosListResponse, err := req2.Do()
if err != nil {
return errors.Prefix("error getting videos info", err)
}
for _, item := range videosListResponse.Items {
videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position))
videos = append(videos, sources.NewYoutubeVideo(s.videoDirectory, item, playlistMap[item.Id].Position, s.Manager.GetS3AWSConfig()))
}
log.Infof("Got info for %d videos from youtube API", len(videos))
@ -730,7 +816,16 @@ func (s *Sync) enqueueYoutubeVideos() error {
break
}
}
for k, v := range s.syncedVideos {
if !v.Published {
continue
}
_, ok := playlistMap[k]
if !ok {
videos = append(videos, sources.NewMockedVideo(s.videoDirectory, k, s.YoutubeChannelID, s.Manager.GetS3AWSConfig()))
}
}
sort.Sort(byPublishedAt(videos))
//or sort.Sort(sort.Reverse(byPlaylistPosition(videos)))
@ -752,55 +847,6 @@ Enqueue:
return nil
}
func (s *Sync) enqueueUCBVideos() error {
var videos []video
csvFile, err := os.Open("ucb.csv")
if err != nil {
return err
}
reader := csv.NewReader(bufio.NewReader(csvFile))
for {
line, err := reader.Read()
if err == io.EOF {
break
} else if err != nil {
return err
}
data := struct {
PublishedAt string `json:"publishedAt"`
}{}
err = json.Unmarshal([]byte(line[4]), &data)
if err != nil {
return err
}
videos = append(videos, sources.NewUCBVideo(line[0], line[2], line[1], line[3], data.PublishedAt, s.videoDirectory))
}
log.Printf("Publishing %d videos\n", len(videos))
sort.Sort(byPublishedAt(videos))
Enqueue:
for _, v := range videos {
select {
case <-s.grp.Ch():
break Enqueue
default:
}
select {
case s.queue <- v:
case <-s.grp.Ch():
break Enqueue
}
}
return nil
}
func (s *Sync) processVideo(v video) (err error) {
defer func() {
if p := recover(); p != nil {
@ -822,7 +868,9 @@ func (s *Sync) processVideo(v video) (err error) {
s.syncedVideosMux.RLock()
sv, ok := s.syncedVideos[v.ID()]
s.syncedVideosMux.RUnlock()
newMetadataVersion := int8(2)
alreadyPublished := ok && sv.Published
videoRequiresUpgrade := ok && s.Manager.upgradeMetadata && sv.MetadataVersion < newMetadataVersion
neverRetryFailures := []string{
"Error extracting sts from embedded url response",
@ -838,12 +886,16 @@ func (s *Sync) processVideo(v video) (err error) {
return nil
}
if alreadyPublished {
if alreadyPublished && !videoRequiresUpgrade {
log.Println(v.ID() + " already published")
return nil
}
if ok && sv.MetadataVersion >= newMetadataVersion {
log.Println(v.ID() + " upgraded to the new metadata")
return nil
}
if v.PlaylistPosition() > s.Manager.videosLimit {
if !videoRequiresUpgrade && v.PlaylistPosition() > s.Manager.videosLimit {
log.Println(v.ID() + " is old: skipping")
return nil
}
@ -851,19 +903,27 @@ func (s *Sync) processVideo(v video) (err error) {
if err != nil {
return err
}
sp := sources.SyncParams{
ClaimAddress: s.claimAddress,
Amount: publishAmount,
ChannelID: s.lbryChannelID,
MaxVideoSize: s.Manager.maxVideoSize,
Namer: s.namer,
MaxVideoLength: s.Manager.maxVideoLength,
Fee: s.Fee,
}
summary, err := v.Sync(s.daemon, s.claimAddress, publishAmount, s.lbryChannelID, s.Manager.maxVideoSize, s.namer, s.Manager.maxVideoLength)
summary, err := v.Sync(s.daemon, sp, &sv, videoRequiresUpgrade, s.walletMux)
if err != nil {
return err
}
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size())
s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName, summary.ClaimID, newMetadataVersion, *v.Size())
err = s.Manager.apiConfig.MarkVideoStatus(s.YoutubeChannelID, v.ID(), VideoStatusPublished, summary.ClaimID, summary.ClaimName, "", v.Size(), 2)
if err != nil {
SendErrorToSlack("Failed to mark video on the database: %s", err.Error())
}
s.AppendSyncedVideo(v.ID(), true, "", summary.ClaimName)
return nil
}

View file

@ -2,6 +2,7 @@ package sdk
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
@ -33,15 +34,16 @@ type SyncProperties struct {
YoutubeChannelID string
}
type Fee struct {
Amount string `json:"amount"`
Address string `json:"address"`
Currency string `json:"currency"`
}
type YoutubeChannel struct {
ChannelId string `json:"channel_id"`
TotalVideos uint `json:"total_videos"`
DesiredChannelName string `json:"desired_channel_name"`
Fee *struct {
Amount string `json:"amount"`
Address string `json:"address"`
Currency string `json:"currency"`
} `json:"fee"`
Fee *Fee `json:"fee"`
ChannelClaimID string `json:"channel_claim_id"`
}
@ -80,6 +82,9 @@ type SyncedVideo struct {
Published bool `json:"published"`
FailureReason string `json:"failure_reason"`
ClaimName string `json:"claim_name"`
ClaimID string `json:"claim_id"`
Size int64 `json:"size"`
MetadataVersion int8 `json:"metadata_version"`
}
func sanitizeFailureReason(s *string) {
@ -121,8 +126,10 @@ func (a *APIConfig) SetChannelStatus(channelID string, status string, failureRea
claimNames := make(map[string]bool)
for _, v := range response.Data {
svs[v.VideoID] = v
if v.ClaimName != "" {
claimNames[v.ClaimName] = v.Published
}
}
return svs, claimNames, nil
}
return nil, nil, errors.Err("invalid API response. Status code: %d", res.StatusCode)
@ -158,10 +165,40 @@ func (a *APIConfig) SetChannelClaimID(channelID string, channelClaimID string) e
const (
VideoStatusPublished = "published"
VideoStatusUpgradeFailed = "upgradefailed"
VideoStatusFailed = "failed"
)
func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64) error {
func (a *APIConfig) DeleteVideos(videos []string) error {
endpoint := a.ApiURL + "/yt/video_delete"
videoIDs := strings.Join(videos, ",")
vals := url.Values{
"video_ids": {videoIDs},
"auth_token": {a.ApiToken},
}
res, _ := http.PostForm(endpoint, vals)
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
var response struct {
Success bool `json:"success"`
Error null.String `json:"error"`
Data null.String `json:"data"`
}
err := json.Unmarshal(body, &response)
if err != nil {
return errors.Err(err)
}
if !response.Error.IsNull() {
return errors.Err(response.Error.String)
}
if !response.Data.IsNull() && response.Data.String == "ok" {
return nil
}
return errors.Err("invalid API response. Status code: %d", res.StatusCode)
}
func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status string, claimID string, claimName string, failureReason string, size *int64, metadataVersion uint) error {
endpoint := a.ApiURL + "/yt/video_status"
sanitizeFailureReason(&failureReason)
@ -171,13 +208,16 @@ func (a *APIConfig) MarkVideoStatus(channelID string, videoID string, status str
"status": {status},
"auth_token": {a.ApiToken},
}
if status == VideoStatusPublished {
if status == VideoStatusPublished || status == VideoStatusUpgradeFailed {
if claimID == "" || claimName == "" {
return errors.Err("claimID or claimName missing")
return errors.Err("claimID (%s) or claimName (%s) missing", claimID, claimName)
}
vals.Add("published_at", strconv.FormatInt(time.Now().Unix(), 10))
vals.Add("claim_id", claimID)
vals.Add("claim_name", claimName)
if metadataVersion > 0 {
vals.Add("metadata_version", fmt.Sprintf("%d", metadataVersion))
}
if size != nil {
vals.Add("size", strconv.FormatInt(*size, 10))
}

View file

@ -2,6 +2,7 @@ package sources
import (
"strings"
"sync"
"github.com/lbryio/lbry.go/extras/jsonrpc"
"github.com/lbryio/ytsync/namer"
@ -12,7 +13,9 @@ type SyncSummary struct {
ClaimName string
}
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.StreamCreateOptions, namer *namer.Namer) (*SyncSummary, error) {
func publishAndRetryExistingNames(daemon *jsonrpc.Client, title, filename string, amount float64, options jsonrpc.StreamCreateOptions, namer *namer.Namer, walletLock *sync.RWMutex) (*SyncSummary, error) {
walletLock.RLock()
defer walletLock.RUnlock()
for {
name := namer.GetNextName(title)
response, err := daemon.StreamCreate(name, filename, amount, options)

View file

@ -1,219 +0,0 @@
package sources
import (
"net/http"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/jsonrpc"
"github.com/lbryio/ytsync/namer"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
log "github.com/sirupsen/logrus"
)
type ucbVideo struct {
id string
title string
channel string
description string
publishedAt time.Time
dir string
claimNames map[string]bool
syncedVideosMux *sync.RWMutex
}
func NewUCBVideo(id, title, channel, description, publishedAt, dir string) *ucbVideo {
p, _ := time.Parse(time.RFC3339Nano, publishedAt) // ignore parse errors
return &ucbVideo{
id: id,
title: title,
description: description,
channel: channel,
dir: dir,
publishedAt: p,
}
}
func (v *ucbVideo) ID() string {
return v.id
}
func (v *ucbVideo) PlaylistPosition() int {
return 0
}
func (v *ucbVideo) IDAndNum() string {
return v.ID() + " (?)"
}
func (v *ucbVideo) PublishedAt() time.Time {
return v.publishedAt
//r := regexp.MustCompile(`(\d\d\d\d)-(\d\d)-(\d\d)`)
//matches := r.FindStringSubmatch(v.title)
//if len(matches) > 0 {
// year, _ := strconv.Atoi(matches[1])
// month, _ := strconv.Atoi(matches[2])
// day, _ := strconv.Atoi(matches[3])
// return time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC)
//}
//return time.Now()
}
func (v *ucbVideo) getFilename() string {
return v.dir + "/" + v.id + ".mp4"
}
func (v *ucbVideo) getClaimName(attempt int) string {
reg := regexp.MustCompile(`[^a-zA-Z0-9]+`)
suffix := ""
if attempt > 1 {
suffix = "-" + strconv.Itoa(attempt)
}
maxLen := 40 - len(suffix)
chunks := strings.Split(strings.ToLower(strings.Trim(reg.ReplaceAllString(v.title, "-"), "-")), "-")
name := chunks[0]
if len(name) > maxLen {
return name[:maxLen]
}
for _, chunk := range chunks[1:] {
tmpName := name + "-" + chunk
if len(tmpName) > maxLen {
if len(name) < 20 {
name = tmpName[:maxLen]
}
break
}
name = tmpName
}
return name + suffix
}
func (v *ucbVideo) getAbbrevDescription() string {
maxLines := 10
description := strings.TrimSpace(v.description)
if strings.Count(description, "\n") < maxLines {
return description
}
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
}
func (v *ucbVideo) download() error {
videoPath := v.getFilename()
_, err := os.Stat(videoPath)
if err != nil && !os.IsNotExist(err) {
return err
} else if err == nil {
log.Debugln(v.id + " already exists at " + videoPath)
return nil
}
creds := credentials.NewStaticCredentials("ID-GOES-HERE", "SECRET-GOES-HERE", "")
s, err := session.NewSession(&aws.Config{Region: aws.String("us-east-2"), Credentials: creds})
if err != nil {
return err
}
downloader := s3manager.NewDownloader(s)
out, err := os.Create(videoPath)
if err != nil {
return err
}
defer out.Close()
log.Println("lbry-niko2/videos/" + v.channel + "/" + v.id)
bytesWritten, err := downloader.Download(out, &s3.GetObjectInput{
Bucket: aws.String("lbry-niko2"),
Key: aws.String("/videos/" + v.channel + "/" + v.id + ".mp4"),
})
if err != nil {
return err
} else if bytesWritten == 0 {
return errors.Err("zero bytes written")
}
return nil
}
func (v *ucbVideo) saveThumbnail() error {
resp, err := http.Get("https://s3.us-east-2.amazonaws.com/lbry-niko2/thumbnails/" + v.id)
if err != nil {
return err
}
defer resp.Body.Close()
creds := credentials.NewStaticCredentials("ID-GOES-HERE", "SECRET-GOES-HERE", "")
s, err := session.NewSession(&aws.Config{Region: aws.String("us-east-2"), Credentials: creds})
if err != nil {
return err
}
uploader := s3manager.NewUploader(s)
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String("berk.ninja"),
Key: aws.String("thumbnails/" + v.id),
ContentType: aws.String("image/jpeg"),
Body: resp.Body,
})
return err
}
func (v *ucbVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) {
options := jsonrpc.StreamCreateOptions{
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
Title: v.title,
Description: v.getAbbrevDescription(),
ClaimAddress: &claimAddress,
Languages: []string{"en"},
ThumbnailURL: strPtr("https://berk.ninja/thumbnails/" + v.id),
Tags: []string{},
},
Author: strPtr("UC Berkeley"),
License: strPtr("see description"),
StreamType: &jsonrpc.StreamTypeVideo,
ChannelID: &channelID,
}
return publishAndRetryExistingNames(daemon, v.title, v.getFilename(), amount, options, namer)
}
func (v *ucbVideo) Size() *int64 {
return nil
}
func (v *ucbVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer, maxVideoLength float64) (*SyncSummary, error) {
//download and thumbnail can be done in parallel
err := v.download()
if err != nil {
return nil, errors.Prefix("download error", err)
}
log.Debugln("Downloaded " + v.id)
//err = v.SaveThumbnail()
//if err != nil {
// return errors.WrapPrefix(err, "thumbnail error", 0)
//}
//log.Debugln("Created thumbnail for " + v.id)
summary, err := v.publish(daemon, claimAddress, amount, channelID, namer)
if err != nil {
return nil, errors.Prefix("publish error", err)
}
return summary, nil
}

View file

@ -1,31 +1,37 @@
package sources
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/jsonrpc"
"github.com/lbryio/lbry.go/extras/util"
"github.com/lbryio/ytsync/namer"
"github.com/lbryio/ytsync/sdk"
"github.com/lbryio/ytsync/tagsManager"
"github.com/lbryio/ytsync/thumbs"
"github.com/ChannelMeter/iso8601duration"
"github.com/aws/aws-sdk-go/aws"
"github.com/nikooo777/ytdl"
"github.com/shopspring/decimal"
log "github.com/sirupsen/logrus"
"google.golang.org/api/youtube/v3"
)
type YoutubeVideo struct {
id string
channelTitle string
title string
description string
playlistPosition int64
@ -35,20 +41,75 @@ type YoutubeVideo struct {
publishedAt time.Time
dir string
youtubeInfo *youtube.Video
youtubeChannelID string
tags []string
awsConfig aws.Config
thumbnailURL string
lbryChannelID string
mocked bool
walletLock *sync.RWMutex
}
func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64) *YoutubeVideo {
const reflectorURL = "http://blobs.lbry.io/"
var youtubeCategories = map[string]string{
"1": "film & animation",
"2": "autos & vehicles",
"10": "music",
"15": "pets & animals",
"17": "sports",
"18": "short movies",
"19": "travel & events",
"20": "gaming",
"21": "videoblogging",
"22": "people & blogs",
"23": "comedy",
"24": "entertainment",
"25": "news & politics",
"26": "howto & style",
"27": "education",
"28": "science & technology",
"29": "nonprofits & activism",
"30": "movies",
"31": "anime/animation",
"32": "action/adventure",
"33": "classics",
"34": "comedy",
"35": "documentary",
"36": "drama",
"37": "family",
"38": "foreign",
"39": "horror",
"40": "sci-fi/fantasy",
"41": "thriller",
"42": "shorts",
"43": "shows",
"44": "trailers",
}
func NewYoutubeVideo(directory string, videoData *youtube.Video, playlistPosition int64, awsConfig aws.Config) *YoutubeVideo {
publishedAt, _ := time.Parse(time.RFC3339Nano, videoData.Snippet.PublishedAt) // ignore parse errors
return &YoutubeVideo{
id: videoData.Id,
title: videoData.Snippet.Title,
description: videoData.Snippet.Description,
channelTitle: videoData.Snippet.ChannelTitle,
playlistPosition: playlistPosition,
publishedAt: publishedAt,
dir: directory,
youtubeInfo: videoData,
awsConfig: awsConfig,
mocked: false,
youtubeChannelID: videoData.Snippet.ChannelId,
}
}
func NewMockedVideo(directory string, videoID string, youtubeChannelID string, awsConfig aws.Config) *YoutubeVideo {
return &YoutubeVideo{
id: videoID,
playlistPosition: 0,
dir: directory,
awsConfig: awsConfig,
mocked: true,
youtubeChannelID: youtubeChannelID,
}
}
@ -65,6 +126,9 @@ func (v *YoutubeVideo) IDAndNum() string {
}
func (v *YoutubeVideo) PublishedAt() time.Time {
if v.mocked {
return time.Unix(0, 0)
}
return v.publishedAt
}
@ -98,10 +162,40 @@ func (v *YoutubeVideo) getFullPath() string {
func (v *YoutubeVideo) getAbbrevDescription() string {
maxLines := 10
description := strings.TrimSpace(v.description)
if strings.Count(description, "\n") < maxLines {
return description
additionalDescription := "\nhttps://www.youtube.com/watch?v=" + v.id
khanAcademyClaimID := "5fc52291980268b82413ca4c0ace1b8d749f3ffb"
if v.lbryChannelID == khanAcademyClaimID {
additionalDescription = additionalDescription + "\nNote: All Khan Academy content is available for free at (www.khanacademy.org)"
}
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..."
if strings.Count(description, "\n") < maxLines {
return description + "\n..." + additionalDescription
}
return strings.Join(strings.Split(description, "\n")[:maxLines], "\n") + "\n..." + additionalDescription
}
func (v *YoutubeVideo) fallbackDownload() error {
cmd := exec.Command("youtube-dl",
"--no-progress",
"-fbestvideo[ext=mp4,height<=1080,filesize<2000M]+best[ext=mp4,height<=1080,filesize<2000M]",
"-o"+strings.TrimRight(v.getFullPath(), ".mp4"),
"--merge-output-format",
"mp4",
"https://www.youtube.com/watch?v="+v.ID())
log.Printf("Running command and waiting for it to finish...")
output, err := cmd.CombinedOutput()
log.Debugln(string(output))
if err != nil {
log.Printf("Command finished with error: %v", errors.Err(string(output)))
return errors.Err(err)
}
fi, err := os.Stat(v.getFullPath())
if err != nil {
return errors.Err(err)
}
videoSize := fi.Size()
v.size = &videoSize
return nil
}
func (v *YoutubeVideo) download() error {
@ -190,108 +284,123 @@ func (v *YoutubeVideo) download() error {
func (v *YoutubeVideo) videoDir() string {
return v.dir + "/" + v.id
}
func (v *YoutubeVideo) getDownloadedPath() (string, error) {
files, err := ioutil.ReadDir(v.videoDir())
log.Infoln(v.videoDir())
if err != nil {
err = errors.Prefix("list error", err)
log.Errorln(err)
return "", err
}
for _, f := range files {
if f.IsDir() {
continue
}
if strings.Contains(v.getFullPath(), strings.TrimSuffix(f.Name(), filepath.Ext(f.Name()))) {
return v.videoDir() + "/" + f.Name(), nil
}
}
return "", errors.Err("could not find any downloaded videos")
}
func (v *YoutubeVideo) delete() error {
videoPath := v.getFullPath()
err := os.Remove(videoPath)
videoPath, err := v.getDownloadedPath()
if err != nil {
log.Errorln(errors.Prefix("delete error", err))
log.Errorln(err)
return err
}
log.Debugln(v.id + " deleted from disk (" + videoPath + ")")
return nil
}
err = os.Remove(videoPath)
log.Debugf("%s deleted from disk (%s)", v.id, videoPath)
func (v *YoutubeVideo) triggerThumbnailSave() error {
client := &http.Client{Timeout: 30 * time.Second}
params, err := json.Marshal(map[string]string{"videoid": v.id})
if err != nil {
err = errors.Prefix("delete error", err)
log.Errorln(err)
return err
}
request, err := http.NewRequest(http.MethodPut, "https://jgp4g1qoud.execute-api.us-east-1.amazonaws.com/prod/thumbnail", bytes.NewBuffer(params))
if err != nil {
return err
}
response, err := client.Do(request)
if err != nil {
return err
}
defer response.Body.Close()
contents, err := ioutil.ReadAll(response.Body)
if err != nil {
return err
}
var decoded struct {
Error int `json:"error"`
Url string `json:"url,omitempty"`
Message string `json:"message,omitempty"`
}
err = json.Unmarshal(contents, &decoded)
if err != nil {
return err
}
if decoded.Error != 0 {
return errors.Err("error creating thumbnail: " + decoded.Message)
}
return nil
}
func strPtr(s string) *string { return &s }
func (v *YoutubeVideo) triggerThumbnailSave() (err error) {
thumbnail := thumbs.GetBestThumbnail(v.youtubeInfo.Snippet.Thumbnails)
v.thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.Url, v.ID(), v.awsConfig)
return err
}
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, namer *namer.Namer) (*SyncSummary, error) {
additionalDescription := "\nhttps://www.youtube.com/watch?v=" + v.id
khanAcademyClaimID := "5fc52291980268b82413ca4c0ace1b8d749f3ffb"
if channelID == khanAcademyClaimID {
additionalDescription = additionalDescription + "\nNote: All Khan Academy content is available for free at (www.khanacademy.org)"
}
var languages []string = nil
if v.youtubeInfo.Snippet.DefaultLanguage != "" {
languages = []string{v.youtubeInfo.Snippet.DefaultLanguage}
}
videoDuration, err := duration.FromString(v.youtubeInfo.ContentDetails.Duration)
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, params SyncParams) (*SyncSummary, error) {
languages, locations, tags := v.getMetadata()
var fee *jsonrpc.Fee
if params.Fee != nil {
feeAmount, err := decimal.NewFromString(params.Fee.Amount)
if err != nil {
return nil, errors.Err(err)
}
options := jsonrpc.StreamCreateOptions{
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
Title: v.title,
Description: v.getAbbrevDescription() + additionalDescription,
ClaimAddress: &claimAddress,
Languages: languages,
ThumbnailURL: strPtr("https://thumbnails.lbry.com/" + v.id),
Tags: v.youtubeInfo.Snippet.Tags,
},
Author: strPtr(v.channelTitle),
License: strPtr("Copyrighted (contact author)"),
StreamType: &jsonrpc.StreamTypeVideo,
ReleaseTime: util.PtrToInt64(v.publishedAt.Unix()),
VideoDuration: util.PtrToUint64(uint64(math.Ceil(videoDuration.ToDuration().Seconds()))),
ChannelID: &channelID,
fee = &jsonrpc.Fee{
FeeAddress: &params.Fee.Address,
FeeAmount: feeAmount,
FeeCurrency: jsonrpc.Currency(params.Fee.Currency),
}
}
return publishAndRetryExistingNames(daemon, v.title, v.getFullPath(), amount, options, namer)
options := jsonrpc.StreamCreateOptions{
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
Title: &v.title,
Description: util.PtrToString(v.getAbbrevDescription()),
ClaimAddress: &params.ClaimAddress,
Languages: languages,
ThumbnailURL: &v.thumbnailURL,
Tags: tags,
Locations: locations,
},
Fee: fee,
License: util.PtrToString("Copyrighted (contact publisher)"),
ReleaseTime: util.PtrToInt64(v.publishedAt.Unix()),
ChannelID: &v.lbryChannelID,
}
downloadPath, err := v.getDownloadedPath()
if err != nil {
return nil, err
}
return publishAndRetryExistingNames(daemon, v.title, downloadPath, params.Amount, options, params.Namer, v.walletLock)
}
func (v *YoutubeVideo) Size() *int64 {
return v.size
}
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount float64, channelID string, maxVideoSize int, namer *namer.Namer, maxVideoLength float64) (*SyncSummary, error) {
v.maxVideoSize = int64(maxVideoSize) * 1024 * 1024
v.maxVideoLength = maxVideoLength
//download and thumbnail can be done in parallel
type SyncParams struct {
ClaimAddress string
Amount float64
ChannelID string
MaxVideoSize int
Namer *namer.Namer
MaxVideoLength float64
Fee *sdk.Fee
}
func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, params SyncParams, existingVideoData *sdk.SyncedVideo, reprocess bool, walletLock *sync.RWMutex) (*SyncSummary, error) {
v.maxVideoSize = int64(params.MaxVideoSize) * 1024 * 1024
v.maxVideoLength = params.MaxVideoLength
v.lbryChannelID = params.ChannelID
v.walletLock = walletLock
if reprocess && existingVideoData != nil && existingVideoData.Published {
summary, err := v.reprocess(daemon, params, existingVideoData)
return summary, errors.Prefix("upgrade failed", err)
}
return v.downloadAndPublish(daemon, params)
}
func (v *YoutubeVideo) downloadAndPublish(daemon *jsonrpc.Client, params SyncParams) (*SyncSummary, error) {
err := v.download()
if err != nil {
return nil, errors.Prefix("download error", err)
log.Errorf("standard downloader failed: %s. Trying fallback downloader\n", err.Error())
fallBackErr := v.fallbackDownload()
if fallBackErr != nil {
log.Errorf("fallback downloader failed: %s\n", fallBackErr.Error())
return nil, errors.Prefix("download error", err) //return original error
}
}
log.Debugln("Downloaded " + v.id)
@ -301,9 +410,143 @@ func (v *YoutubeVideo) Sync(daemon *jsonrpc.Client, claimAddress string, amount
}
log.Debugln("Created thumbnail for " + v.id)
summary, err := v.publish(daemon, claimAddress, amount, channelID, namer)
summary, err := v.publish(daemon, params)
//delete the video in all cases (and ignore the error)
_ = v.delete()
return summary, errors.Prefix("publish error", err)
}
func (v *YoutubeVideo) getMetadata() (languages []string, locations []jsonrpc.Location, tags []string) {
languages = nil
locations = nil
tags = nil
if !v.mocked {
if v.youtubeInfo.Snippet.DefaultLanguage != "" {
languages = []string{v.youtubeInfo.Snippet.DefaultLanguage}
}
if v.youtubeInfo.RecordingDetails != nil && v.youtubeInfo.RecordingDetails.Location != nil {
locations = []jsonrpc.Location{{
Latitude: util.PtrToString(fmt.Sprintf("%.7f", v.youtubeInfo.RecordingDetails.Location.Latitude)),
Longitude: util.PtrToString(fmt.Sprintf("%.7f", v.youtubeInfo.RecordingDetails.Location.Longitude)),
}}
}
tags = v.youtubeInfo.Snippet.Tags
}
tags, err := tagsManager.SanitizeTags(tags, v.youtubeChannelID)
if err != nil {
log.Errorln(err.Error())
}
if !v.mocked {
tags = append(tags, youtubeCategories[v.youtubeInfo.Snippet.CategoryId])
}
return languages, locations, tags
}
func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, existingVideoData *sdk.SyncedVideo) (*SyncSummary, error) {
c, err := daemon.ClaimSearch(nil, &existingVideoData.ClaimID, nil, nil)
if err != nil {
return nil, errors.Err(err)
}
if len(c.Claims) == 0 {
return nil, errors.Err("cannot reprocess: no claim found for this video")
} else if len(c.Claims) > 1 {
return nil, errors.Err("cannot reprocess: too many claims. claimID: %s", existingVideoData.ClaimID)
}
currentClaim := c.Claims[0]
languages, locations, tags := v.getMetadata()
thumbnailURL := ""
if currentClaim.Value.GetThumbnail() == nil {
if v.mocked {
return nil, errors.Err("could not find thumbnail for mocked video")
}
thumbnail := thumbs.GetBestThumbnail(v.youtubeInfo.Snippet.Thumbnails)
thumbnailURL, err = thumbs.MirrorThumbnail(thumbnail.Url, v.ID(), v.awsConfig)
} else {
thumbnailURL = thumbs.ThumbnailEndpoint + v.ID()
}
videoSize, err := currentClaim.GetStreamSizeByMagic()
if err != nil {
if existingVideoData.Size > 0 {
videoSize = uint64(existingVideoData.Size)
} else {
log.Infof("%s: the video must be republished as we can't get the right size", v.ID())
//return v.downloadAndPublish(daemon, params) //TODO: actually republish the video. NB: the current claim should be abandoned first
return nil, errors.Err("the video must be republished as we can't get the right size")
}
}
v.size = util.PtrToInt64(int64(videoSize))
var fee *jsonrpc.Fee
if params.Fee != nil {
feeAmount, err := decimal.NewFromString(params.Fee.Amount)
if err != nil {
return nil, errors.Err(err)
}
fee = &jsonrpc.Fee{
FeeAddress: &params.Fee.Address,
FeeAmount: feeAmount,
FeeCurrency: jsonrpc.Currency(params.Fee.Currency),
}
}
streamCreateOptions := &jsonrpc.StreamCreateOptions{
ClaimCreateOptions: jsonrpc.ClaimCreateOptions{
Tags: tags,
ThumbnailURL: &thumbnailURL,
Languages: languages,
Locations: locations,
},
Author: util.PtrToString(""),
License: util.PtrToString("Copyrighted (contact publisher)"),
ChannelID: &v.lbryChannelID,
Height: util.PtrToUint(720),
Width: util.PtrToUint(1280),
Fee: fee,
}
v.walletLock.RLock()
defer v.walletLock.RUnlock()
if v.mocked {
pr, err := daemon.StreamUpdate(existingVideoData.ClaimID, jsonrpc.StreamUpdateOptions{
StreamCreateOptions: streamCreateOptions,
FileSize: &videoSize,
})
if err != nil {
return nil, err
}
return &SyncSummary{
ClaimID: pr.Outputs[0].ClaimID,
ClaimName: pr.Outputs[0].Name,
}, nil
}
videoDuration, err := duration.FromString(v.youtubeInfo.ContentDetails.Duration)
if err != nil {
return nil, errors.Err(err)
}
streamCreateOptions.ClaimCreateOptions.Title = &v.title
streamCreateOptions.ClaimCreateOptions.Description = util.PtrToString(v.getAbbrevDescription())
streamCreateOptions.Duration = util.PtrToUint64(uint64(math.Ceil(videoDuration.ToDuration().Seconds())))
streamCreateOptions.ReleaseTime = util.PtrToInt64(v.publishedAt.Unix())
pr, err := daemon.StreamUpdate(existingVideoData.ClaimID, jsonrpc.StreamUpdateOptions{
ClearLanguages: util.PtrToBool(true),
ClearLocations: util.PtrToBool(true),
ClearTags: util.PtrToBool(true),
StreamCreateOptions: streamCreateOptions,
FileSize: &videoSize,
})
if err != nil {
return nil, err
}
return &SyncSummary{
ClaimID: pr.Outputs[0].ClaimID,
ClaimName: pr.Outputs[0].Name,
}, nil
}

View file

@ -1,152 +0,0 @@
import os
import sys
from decimal import Decimal
from bitcoinrpc.authproxy import AuthServiceProxy
from lbryum.wallet import Wallet, WalletStorage
from lbryum.commands import known_commands, Commands
from lbryum.simple_config import SimpleConfig
from lbryum.blockchain import get_blockchain
from lbryum.network import Network
def get_lbrycrdd_connection_string(wallet_conf):
settings = {"username": "rpcuser",
"password": "rpcpassword",
"rpc_port": 9245}
if wallet_conf and os.path.exists(wallet_conf):
with open(wallet_conf, "r") as conf:
conf_lines = conf.readlines()
for l in conf_lines:
if l.startswith("rpcuser="):
settings["username"] = l[8:].rstrip('\n')
if l.startswith("rpcpassword="):
settings["password"] = l[12:].rstrip('\n')
if l.startswith("rpcport="):
settings["rpc_port"] = int(l[8:].rstrip('\n'))
rpc_user = settings["username"]
rpc_pass = settings["password"]
rpc_port = settings["rpc_port"]
rpc_url = "127.0.0.1"
return "http://%s:%s@%s:%i" % (rpc_user, rpc_pass, rpc_url, rpc_port)
class LBRYumWallet(object):
def __init__(self, lbryum_path):
self.config = SimpleConfig()
self.config.set_key('chain', 'lbrycrd_main')
self.storage = WalletStorage(lbryum_path)
self.wallet = Wallet(self.storage)
self.cmd_runner = Commands(self.config, self.wallet, None)
if not self.wallet.has_seed():
seed = self.wallet.make_seed()
self.wallet.add_seed(seed, "derp")
self.wallet.create_master_keys("derp")
self.wallet.create_main_account()
self.wallet.update_password("derp", "")
self.network = Network(self.config)
self.blockchain = get_blockchain(self.config, self.network)
print self.config.get('chain'), self.blockchain
self.wallet.storage.write()
def command(self, command_name, *args, **kwargs):
cmd_runner = Commands(self.config, self.wallet, None)
cmd = known_commands[command_name]
func = getattr(cmd_runner, cmd.name)
return func(*args, **kwargs)
def generate_address(self):
address = self.wallet.create_new_address()
self.wallet.storage.write()
return address
class LBRYcrd(object):
def __init__(self, lbrycrdd_path):
self.lbrycrdd_conn_str = get_lbrycrdd_connection_string(lbrycrdd_path)
def __call__(self, method, *args, **kwargs):
return self.rpc(method)(*args, **kwargs)
def rpc(self, method):
return AuthServiceProxy(self.lbrycrdd_conn_str, service_name=method)
def get_wallet_path():
cwd = os.getcwd()
wallet_path = os.path.join(cwd, "wallet.json")
if not os.path.exists(wallet_path):
return wallet_path
i = 1
while True:
wallet_path = os.path.join(cwd, "wallet_%i.json" % i)
if not os.path.exists(wallet_path):
return wallet_path
i += 1
def coin_chooser(lbrycrdd, amount, fee=0.001):
def iter_txis():
unspent = lbrycrdd("listunspent")
unspent = sorted(unspent, key=lambda x: x['amount'], reverse=True)
spendable = Decimal(0.0)
for txi in unspent:
if spendable >= amount:
break
else:
spendable += txi['amount']
yield txi
if spendable < amount:
print spendable, amount
raise Exception("Not enough funds")
coins = list(iter(iter_txis()))
total = sum(c['amount'] for c in coins)
change = Decimal(total) - Decimal(amount) - Decimal(fee)
if change < 0:
raise Exception("Not enough funds")
if change:
change_address = lbrycrdd("getnewaddress")
else:
change_address = None
print "Total: %f, amount: %f, change: %f" % (total, amount, change)
return coins, change, change_address
def get_raw_tx(lbrycrdd, addresses, coins, amount, change, change_address):
txi = [{'txid': c['txid'], 'vout': c['vout']} for c in coins]
txo = {address: float(amount) for address in addresses}
if change_address:
txo[change_address] = float(change)
return lbrycrdd("createrawtransaction", txi, txo)
def main(count, value=None, lbryum_path=None, lbrycrdd_path=None):
count = int(count)
lbryum_path = lbryum_path or get_wallet_path()
if sys.platform == "darwin":
default_lbrycrdd = os.path.join(os.path.expanduser("~"),
"Library/Application Support/lbrycrd/lbrycrd.conf")
else:
default_lbrycrdd = os.path.join(os.path.expanduser("~"), ".lbrycrd/lbrycrd.conf")
lbrycrdd_path = lbrycrdd_path or default_lbrycrdd
l = LBRYcrd(lbrycrdd_path=lbrycrdd_path)
s = LBRYumWallet(lbryum_path)
value = value or 1.0
value = Decimal(value)
coins, change, change_address = coin_chooser(l, count * value)
addresses = [s.generate_address() for i in range(count)]
raw_tx = get_raw_tx(l, addresses, coins, value, change, change_address)
signed = l("signrawtransaction", raw_tx)['hex']
txid = l("sendrawtransaction", signed)
print txid
if __name__ == "__main__":
args = sys.argv[1:]
main(*args)

1486
tagsManager/tags_mapping.go Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,144 @@
package tagsManager
import (
"fmt"
"testing"
)
func TestSanitizeTags(t *testing.T) {
got, err := SanitizeTags([]string{"this", "super", "expensive", "test", "has", "a lot of", "crypto", "currency", "in it", "trump", "will build the", "wall"}, "UCNQfQvFMPnInwsU_iGYArJQ")
if err != nil {
t.Error(err)
return
}
expectedTags := []string{
"blockchain",
"switzerland",
"news",
"science & technology",
"economics",
"experiments",
"this",
"in it",
"will build the",
"has",
"crypto",
"trump",
"wall",
"expensive",
"currency",
"a lot of",
}
if len(expectedTags) != len(got) {
t.Error("number of tags differ")
return
}
outer:
for _, et := range expectedTags {
for _, t := range got {
if et == t {
continue outer
}
}
t.Error("tag not found")
return
}
}
func TestNormalizeTag(t *testing.T) {
tags := []string{
"blockchain",
"Switzerland",
"news ",
" science & Technology ",
"economics",
"experiments",
"this",
"in it",
"will build the (WOOPS)",
"~has",
"crypto",
"trump",
"wall",
"expensive",
"!currency",
" a lot of ",
"#",
"#whatever",
"#123",
"#123 Something else",
"#123aaa",
"!asdasd",
"CASA BLANCA",
"wwe 2k18 Elimination chamber!",
"pero'",
"però",
"è proprio",
"Ep 29",
"sctest29 Keddr",
"mortal kombat 11 shang tsung",
"!asdasd!",
}
normalizedTags := make([]string, 0, len(tags))
for _, tag := range tags {
got, err := normalizeTag(tag)
if err != nil {
t.Error(err)
return
}
if got != "" {
normalizedTags = append(normalizedTags, got)
}
fmt.Printf("Got tag: '%s'\n", got)
}
expected := []string{
"blockchain",
"switzerland",
"news",
"science & technology",
"economics",
"experiments",
"this",
"in it",
"will build the",
"has",
"crypto",
"trump",
"wall",
"expensive",
"currency",
"a lot of",
"whatever",
"123",
"something else",
"123aaa",
"asdasd",
"casa blanca",
"wwe 2k18 elimination chamber",
"pero",
"però",
"è proprio",
"ep 29",
"sctest29 keddr",
"mortal kombat 11 shang tsung",
"asdasd",
}
if !Equal(normalizedTags, expected) {
t.Error("result not as expected")
return
}
}
func Equal(a, b []string) bool {
if len(a) != len(b) {
fmt.Printf("expected length %d but got %d", len(b), len(a))
return false
}
for i, v := range a {
if v != b[i] {
fmt.Printf("expected %s but bot %s\n", b[i], v)
return false
}
}
return true
}

View file

@ -1,6 +1,7 @@
package thumbs
import (
"google.golang.org/api/youtube/v3"
"io"
"net/http"
"os"
@ -21,6 +22,7 @@ type thumbnailUploader struct {
}
const thumbnailPath = "/tmp/ytsync_thumbnails/"
const ThumbnailEndpoint = "https://thumbnails.lbry.com/"
func (u *thumbnailUploader) downloadThumbnail() error {
_ = os.Mkdir(thumbnailPath, 0750)
@ -44,7 +46,7 @@ func (u *thumbnailUploader) downloadThumbnail() error {
}
func (u *thumbnailUploader) uploadThumbnail() error {
key := aws.String("/thumbnails/" + u.name)
key := &u.name
thumb, err := os.Open("/tmp/ytsync_thumbnails/" + u.name)
if err != nil {
return errors.Err(err)
@ -62,8 +64,9 @@ func (u *thumbnailUploader) uploadThumbnail() error {
Bucket: aws.String("thumbnails.lbry.com"),
Key: key,
Body: thumb,
ACL: aws.String("public-read"),
})
u.mirroredUrl = "https://thumbnails.lbry.com/" + u.name
u.mirroredUrl = ThumbnailEndpoint + u.name
return errors.Err(err)
}
@ -92,3 +95,16 @@ func MirrorThumbnail(url string, name string, s3Config aws.Config) (string, erro
return tu.mirroredUrl, nil
}
func GetBestThumbnail(thumbnails *youtube.ThumbnailDetails) *youtube.Thumbnail {
if thumbnails.Maxres != nil {
return thumbnails.Maxres
} else if thumbnails.High != nil {
return thumbnails.High
} else if thumbnails.Medium != nil {
return thumbnails.Medium
} else if thumbnails.Standard != nil {
return thumbnails.Standard
}
return thumbnails.Default
}