Compare commits

..

53 commits
py39 ... master

Author SHA1 Message Date
Jonathan Moody
eb5da9511e Revert "TEMP: Try python 3.8."
This reverts commit 8def4d5177.
2023-04-03 13:34:36 -04:00
Jonathan Moody
8722ef840e Bump python_requires >= 3.8.
Code to handle CancelledError (as subclass of Exception) was removed.
2023-04-03 13:34:36 -04:00
Jonathan Moody
6e75a1a89b TEMP: Try python 3.8. 2023-04-03 13:34:36 -04:00
Jonathan Moody
ef3189de1d Work on some DeprecationWarnings: The explicit passing of coroutine objects to asyncio.wait() is deprecated since Python 3.8. 2023-04-03 13:34:36 -04:00
Jonathan Moody
c2d2080034 Try to suppress asyncio.CancelledError in a different way in test_streaming.py. 2023-04-03 13:34:36 -04:00
Jonathan Moody
d0b5a0a8fd TEMP: Add workflow_dispatch. 2023-04-03 13:34:36 -04:00
Jonathan Moody
1d0e17be21 Another place generalized to Exception or asyncio.CancelledError. 2023-04-03 13:34:36 -04:00
Jonathan Moody
4ef03bb1f4 Try separate file_manager.stop() and start() calls to better
control order of events in test.
While file_manager is stopped, we get no response to file_list().
2023-04-03 13:34:36 -04:00
Jonathan Moody
4bd4bcdc27 Try ubuntu-20.04 to resolve missing libffi.so.7 issue. 2023-04-03 13:34:36 -04:00
Jonathan Moody
e5ca967fa2 Make FileManager.stop() async because SourceManager.stop() is now async. 2023-04-03 13:34:36 -04:00
Jonathan Moody
eed7d02e8b Tweak aiohttp version to be compatible with hub repository. 2023-04-03 13:34:36 -04:00
Jonathan Moody
02aecad52b CancelledError derives from BaseException in Python >= 3.8. The significant functional
change here is in upload_to_reflector(). Unit tests in TestReflector were failing.
Deal with lint related to CancelledError cleanup.
2023-04-03 13:34:36 -04:00
Jonathan Moody
585962d930 Make stop(), stop_tasks() consistently async routines, and have stop_tasks()
wait for file_output_task completion. This fixes a problem with
test_download_stop_resume_delete.
2023-04-03 13:34:36 -04:00
Jonathan Moody
ea4fba39a6 Fix Transport, DatagramTransport mockup issues. 2023-04-03 13:34:36 -04:00
Jonathan Moody
7a86406746 Fix and enable lint no-self-use & try-except-raise. 2023-04-03 13:34:36 -04:00
Jonathan Moody
c8a3eb97a4 Bump pylint version. Old pylint did not find standard library stuff on 3.9.12. 2023-04-03 13:34:36 -04:00
Lex Berezhny
20213628d7 upgrade cryptography 2023-04-03 13:34:36 -04:00
Lex Berezhny
2d1649f972 pylint disable shuffle() arg check 2023-04-03 13:34:36 -04:00
Lex Berezhny
5cb04b86a0 shuffle() needs custom random, removed loop from Event()/Queue() 2023-04-03 13:34:36 -04:00
Lex Berezhny
93ab6b3be3 passing loop to asyncio functions is deprecated 2023-04-03 13:34:36 -04:00
Lex Berezhny
b9762c3e64 update plyvel 2023-04-03 13:34:36 -04:00
Lex Berezhny
82592d00ef try building 3.9 2023-04-03 13:34:36 -04:00
Jonathan Moody
c118174c1a Try shell: bash to simplify. 2023-02-02 14:16:07 -05:00
Jonathan Moody
d284acd8b8 Remove "debug pip cache". 2023-02-02 14:16:07 -05:00
Jonathan Moody
235c98372d Fix syntax. 2023-02-02 14:16:07 -05:00
Jonathan Moody
d2f5073ef4 Single "set pip cache dir" task with conditional inside. 2023-02-02 14:16:07 -05:00
Jonathan Moody
84e5e43117 Bump upload-artifact version too. 2023-02-02 14:16:07 -05:00
Jonathan Moody
7bd025ae54 Upgrade change-string-case. Use startsWith() to test runner.os.
Bump change-string-case-action version again.
2023-02-02 14:16:07 -05:00
Jonathan Moody
8f28ce65b0 Switch to environment vars in $GITHUB_ENV. 2023-02-02 14:16:07 -05:00
Jonathan Moody
d36e305129 Functions save-state, set-output deprecated. Use new mechanism. 2023-02-02 14:16:07 -05:00
Jonathan Moody
2609dee8fb Bump checkout, setup-python, cache action verions. 2023-02-02 14:16:07 -05:00
Lex Berezhny
a2da86d4b5 v0.113.0 2023-01-23 10:43:02 -05:00
Alex Grin
aa16c7fee5 Update conf.py 2023-01-23 10:30:25 -05:00
Alex Grin
3266f72b82 add s1.lbry.network 2023-01-23 10:30:25 -05:00
Jack Robison
77cd2a3f8a add more non lbry.com hubs/bootstrap dht nodes 2023-01-23 10:30:25 -05:00
Alex Grin
308e586e9a add grin's domain to bootstrap hubs list 2023-01-23 10:30:25 -05:00
84beddfd77 Added tracker and dht from pigg.es
Added tracker and dht from pigg.es
2023-01-22 19:09:17 -05:00
Victor Shyba
6258651650
Merge pull request #3716 from lbryio/dht_exceptions
handle remote exceptions on routing table ping
2022-12-13 17:18:47 -03:00
Victor Shyba
cc5f0b6630 handle remote exception on routing table ping 2022-12-13 16:56:58 -03:00
Jonathan Moody
f64d507d39 TEMP: Pin workflows to ubuntu-20.04 to work around missing ripemd160 issue. 2022-12-12 21:47:41 -05:00
Jonathan Moody
001819d5c2 Bump Hub to include fix for supports with wrong names. 2022-11-20 20:34:30 -05:00
Jonathan Moody
8b4c046d28 Try pyinstaller==4.6 to fix MacOS build failure. 2022-11-20 20:34:30 -05:00
Jonathan Moody
2c20ad6c43 Add another zlib.error mapped to InvalidPasswordError. 2022-11-20 20:34:30 -05:00
Jonathan Moody
9e610cc54c Update test for Hub rename of method stage_put() -> stash_put(). 2022-11-20 20:34:30 -05:00
Jonathan Moody
b9d25c6d01 Bump hub to latest, getting fix for TX negative caching issue and others. 2022-11-20 20:34:30 -05:00
Jonathan Moody
419b5b45f2 Allow a few initial "transaction not found" responses from Hub. 2022-11-20 20:34:30 -05:00
Jonathan Moody
516c2dd5d0 Bump hub to fix subscribe race + EADDRINUSE issue. 2022-11-20 20:34:30 -05:00
Jonathan Moody
b99102f9c9 Bump max_misuse_attempts by 50% to 120000. 2022-11-20 20:34:30 -05:00
Lex Berezhny
8c6c7b655c v0.112.0 2022-10-30 21:56:17 -04:00
Lex Berezhny
48c6873fc4 channel_sign command has customizeable salt 2022-10-30 21:53:53 -04:00
Victor Shyba
15dc52bd9a
Merge pull request #3695 from lbryio/3690
Fix claim fields fallback raising errors before download is saved on database
2022-10-28 11:16:52 -03:00
Victor Shyba
52d555078f initialize stored claim field for fallback earlier 2022-10-19 15:13:47 -03:00
Victor Shyba
cc976bd010 test for early fallback of suggested_file_name 2022-10-19 15:13:47 -03:00
33 changed files with 204 additions and 125 deletions

View file

@ -1,18 +1,18 @@
name: ci name: ci
on: ["push", "pull_request"] on: ["push", "pull_request", "workflow_dispatch"]
jobs: jobs:
lint: lint:
name: lint name: lint
runs-on: ubuntu-latest runs-on: ubuntu-20.04
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v3
- uses: actions/setup-python@v1 - uses: actions/setup-python@v4
with: with:
python-version: '3.9' python-version: '3.9'
- name: extract pip cache - name: extract pip cache
uses: actions/cache@v2 uses: actions/cache@v3
with: with:
path: ~/.cache/pip path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }} key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
@ -26,26 +26,26 @@ jobs:
strategy: strategy:
matrix: matrix:
os: os:
- ubuntu-latest - ubuntu-20.04
- macos-latest - macos-latest
- windows-latest - windows-latest
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v3
- uses: actions/setup-python@v1 - uses: actions/setup-python@v4
with: with:
python-version: '3.9' python-version: '3.9'
- name: set pip cache dir - name: set pip cache dir
id: pip-cache shell: bash
run: echo "::set-output name=dir::$(pip cache dir)" run: echo "PIP_CACHE_DIR=$(pip cache dir)" >> $GITHUB_ENV
- name: extract pip cache - name: extract pip cache
uses: actions/cache@v2 uses: actions/cache@v3
with: with:
path: ${{ steps.pip-cache.outputs.dir }} path: ${{ env.PIP_CACHE_DIR }}
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }} key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
restore-keys: ${{ runner.os }}-pip- restore-keys: ${{ runner.os }}-pip-
- id: os-name - id: os-name
uses: ASzc/change-string-case-action@v1 uses: ASzc/change-string-case-action@v5
with: with:
string: ${{ runner.os }} string: ${{ runner.os }}
- run: python -m pip install --user --upgrade pip wheel - run: python -m pip install --user --upgrade pip wheel
@ -72,7 +72,7 @@ jobs:
tests-integration: tests-integration:
name: "tests / integration" name: "tests / integration"
runs-on: ubuntu-latest runs-on: ubuntu-20.04
strategy: strategy:
matrix: matrix:
test: test:
@ -93,8 +93,8 @@ jobs:
uses: elastic/elastic-github-actions/elasticsearch@master uses: elastic/elastic-github-actions/elasticsearch@master
with: with:
stack-version: 7.12.1 stack-version: 7.12.1
- uses: actions/checkout@v2 - uses: actions/checkout@v3
- uses: actions/setup-python@v1 - uses: actions/setup-python@v4
with: with:
python-version: '3.9' python-version: '3.9'
- if: matrix.test == 'other' - if: matrix.test == 'other'
@ -102,7 +102,7 @@ jobs:
sudo apt-get update sudo apt-get update
sudo apt-get install -y --no-install-recommends ffmpeg sudo apt-get install -y --no-install-recommends ffmpeg
- name: extract pip cache - name: extract pip cache
uses: actions/cache@v2 uses: actions/cache@v3
with: with:
path: ./.tox path: ./.tox
key: tox-integration-${{ matrix.test }}-${{ hashFiles('setup.py') }} key: tox-integration-${{ matrix.test }}-${{ hashFiles('setup.py') }}
@ -123,7 +123,7 @@ jobs:
coverage: coverage:
needs: ["tests-unit", "tests-integration"] needs: ["tests-unit", "tests-integration"]
runs-on: ubuntu-latest runs-on: ubuntu-20.04
steps: steps:
- name: finalize coverage report submission - name: finalize coverage report submission
env: env:
@ -143,24 +143,24 @@ jobs:
- windows-latest - windows-latest
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v3
- uses: actions/setup-python@v1 - uses: actions/setup-python@v4
with: with:
python-version: '3.9' python-version: '3.9'
- id: os-name - id: os-name
uses: ASzc/change-string-case-action@v1 uses: ASzc/change-string-case-action@v5
with: with:
string: ${{ runner.os }} string: ${{ runner.os }}
- name: set pip cache dir - name: set pip cache dir
id: pip-cache shell: bash
run: echo "::set-output name=dir::$(pip cache dir)" run: echo "PIP_CACHE_DIR=$(pip cache dir)" >> $GITHUB_ENV
- name: extract pip cache - name: extract pip cache
uses: actions/cache@v2 uses: actions/cache@v3
with: with:
path: ${{ steps.pip-cache.outputs.dir }} path: ${{ env.PIP_CACHE_DIR }}
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }} key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
restore-keys: ${{ runner.os }}-pip- restore-keys: ${{ runner.os }}-pip-
- run: pip install pyinstaller==5.3 - run: pip install pyinstaller==4.6
- run: pip install -e . - run: pip install -e .
- if: startsWith(github.ref, 'refs/tags/v') - if: startsWith(github.ref, 'refs/tags/v')
run: python docker/set_build.py run: python docker/set_build.py
@ -175,7 +175,7 @@ jobs:
pip install pywin32==301 pip install pywin32==301
pyinstaller --additional-hooks-dir=scripts/. --icon=icons/lbry256.ico --onefile --name lbrynet lbry/extras/cli.py pyinstaller --additional-hooks-dir=scripts/. --icon=icons/lbry256.ico --onefile --name lbrynet lbry/extras/cli.py
dist/lbrynet.exe --version dist/lbrynet.exe --version
- uses: actions/upload-artifact@v2 - uses: actions/upload-artifact@v3
with: with:
name: lbrynet-${{ steps.os-name.outputs.lowercase }} name: lbrynet-${{ steps.os-name.outputs.lowercase }}
path: dist/ path: dist/
@ -184,7 +184,7 @@ jobs:
name: "release" name: "release"
if: startsWith(github.ref, 'refs/tags/v') if: startsWith(github.ref, 'refs/tags/v')
needs: ["build"] needs: ["build"]
runs-on: ubuntu-latest runs-on: ubuntu-20.04
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- uses: actions/download-artifact@v2 - uses: actions/download-artifact@v2

View file

@ -7,7 +7,7 @@ on:
jobs: jobs:
release: release:
name: "slack notification" name: "slack notification"
runs-on: ubuntu-latest runs-on: ubuntu-20.04
steps: steps:
- uses: LoveToKnow/slackify-markdown-action@v1.0.0 - uses: LoveToKnow/slackify-markdown-action@v1.0.0
id: markdown id: markdown

View file

@ -1,2 +1,2 @@
__version__ = "0.111.0" __version__ = "0.113.0"
version = tuple(map(int, __version__.split('.'))) # pylint: disable=invalid-name version = tuple(map(int, __version__.split('.'))) # pylint: disable=invalid-name

View file

@ -64,7 +64,7 @@ class BlobDownloader:
self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1 self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1
async def new_peer_or_finished(self): async def new_peer_or_finished(self):
active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)] active_tasks = list(self.active_connections.values()) + [asyncio.create_task(asyncio.sleep(1))]
await asyncio.wait(active_tasks, return_when='FIRST_COMPLETED') await asyncio.wait(active_tasks, return_when='FIRST_COMPLETED')
def cleanup_active(self): def cleanup_active(self):

View file

@ -688,6 +688,9 @@ class Config(CLIConfig):
tracker_servers = Servers("BitTorrent-compatible (BEP15) UDP trackers for helping P2P discovery", [ tracker_servers = Servers("BitTorrent-compatible (BEP15) UDP trackers for helping P2P discovery", [
('tracker.lbry.com', 9252), ('tracker.lbry.com', 9252),
('tracker.lbry.grin.io', 9252), ('tracker.lbry.grin.io', 9252),
('tracker.lbry.pigg.es', 9252),
('tracker.lizard.technology', 9252),
('s1.lbry.network', 9252),
]) ])
lbryum_servers = Servers("SPV wallet servers", [ lbryum_servers = Servers("SPV wallet servers", [
@ -700,14 +703,20 @@ class Config(CLIConfig):
('spv17.lbry.com', 50001), ('spv17.lbry.com', 50001),
('spv18.lbry.com', 50001), ('spv18.lbry.com', 50001),
('spv19.lbry.com', 50001), ('spv19.lbry.com', 50001),
('hub.lbry.grin.io', 50001),
('hub.lizard.technology', 50001),
('s1.lbry.network', 50001),
]) ])
known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [ known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [
('dht.lbry.grin.io', 4444), # Grin ('dht.lbry.grin.io', 4444), # Grin
('dht.lbry.madiator.com', 4444), # Madiator ('dht.lbry.madiator.com', 4444), # Madiator
('dht.lbry.pigg.es', 4444), # Pigges
('lbrynet1.lbry.com', 4444), # US EAST ('lbrynet1.lbry.com', 4444), # US EAST
('lbrynet2.lbry.com', 4444), # US WEST ('lbrynet2.lbry.com', 4444), # US WEST
('lbrynet3.lbry.com', 4444), # EU ('lbrynet3.lbry.com', 4444), # EU
('lbrynet4.lbry.com', 4444) # ASIA ('lbrynet4.lbry.com', 4444), # ASIA
('dht.lizard.technology', 4444), # Jack
('s2.lbry.network', 4444),
]) ])
# blockchain # blockchain

View file

@ -8,6 +8,7 @@ from prometheus_client import Gauge
from lbry import utils from lbry import utils
from lbry.dht import constants from lbry.dht import constants
from lbry.dht.error import RemoteException
from lbry.dht.protocol.distance import Distance from lbry.dht.protocol.distance import Distance
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.dht.peer import KademliaPeer, PeerManager from lbry.dht.peer import KademliaPeer, PeerManager
@ -395,7 +396,7 @@ class TreeRoutingTable:
try: try:
await probe(to_replace) await probe(to_replace)
return False return False
except asyncio.TimeoutError: except (asyncio.TimeoutError, RemoteException):
log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index, log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index,
to_replace.address, to_replace.udp_port, peer.address, peer.udp_port) to_replace.address, to_replace.udp_port, peer.address, peer.udp_port)
if to_replace in self.buckets[bucket_index]: if to_replace in self.buckets[bucket_index]:

View file

@ -37,7 +37,7 @@ class Component(metaclass=ComponentType):
def running(self): def running(self):
return self._running return self._running
async def get_status(self): async def get_status(self): # pylint: disable=no-self-use
return return
async def start(self): async def start(self):

View file

@ -118,7 +118,7 @@ class ComponentManager:
component._setup() for component in stage if not component.running component._setup() for component in stage if not component.running
] ]
if needing_start: if needing_start:
await asyncio.wait(needing_start) await asyncio.wait(map(asyncio.create_task, needing_start))
self.started.set() self.started.set()
async def stop(self): async def stop(self):
@ -131,7 +131,7 @@ class ComponentManager:
component._stop() for component in stage if component.running component._stop() for component in stage if component.running
] ]
if needing_stop: if needing_stop:
await asyncio.wait(needing_stop) await asyncio.wait(map(asyncio.create_task, needing_stop))
def all_components_running(self, *component_names): def all_components_running(self, *component_names):
""" """

View file

@ -374,7 +374,7 @@ class FileManagerComponent(Component):
log.info('Done setting up file manager') log.info('Done setting up file manager')
async def stop(self): async def stop(self):
self.file_manager.stop() await self.file_manager.stop()
class BackgroundDownloaderComponent(Component): class BackgroundDownloaderComponent(Component):

View file

@ -614,7 +614,8 @@ class Daemon(metaclass=JSONRPCServerType):
content_type='application/json' content_type='application/json'
) )
async def handle_metrics_get_request(self, request: web.Request): @staticmethod
async def handle_metrics_get_request(request: web.Request):
try: try:
return web.Response( return web.Response(
text=prom_generate_latest().decode(), text=prom_generate_latest().decode(),
@ -2943,19 +2944,21 @@ class Daemon(metaclass=JSONRPCServerType):
@requires(WALLET_COMPONENT) @requires(WALLET_COMPONENT)
async def jsonrpc_channel_sign( async def jsonrpc_channel_sign(
self, channel_name=None, channel_id=None, hexdata=None, channel_account_id=None, wallet_id=None): self, channel_name=None, channel_id=None, hexdata=None, salt=None,
channel_account_id=None, wallet_id=None):
""" """
Signs data using the specified channel signing key. Signs data using the specified channel signing key.
Usage: Usage:
channel_sign [<channel_name> | --channel_name=<channel_name>] channel_sign [<channel_name> | --channel_name=<channel_name>] [<channel_id> | --channel_id=<channel_id>]
[<channel_id> | --channel_id=<channel_id>] [<hexdata> | --hexdata=<hexdata>] [<hexdata> | --hexdata=<hexdata>] [<salt> | --salt=<salt>]
[--channel_account_id=<channel_account_id>...] [--wallet_id=<wallet_id>] [--channel_account_id=<channel_account_id>...] [--wallet_id=<wallet_id>]
Options: Options:
--channel_name=<channel_name> : (str) name of channel used to sign (or use channel id) --channel_name=<channel_name> : (str) name of channel used to sign (or use channel id)
--channel_id=<channel_id> : (str) claim id of channel used to sign (or use channel name) --channel_id=<channel_id> : (str) claim id of channel used to sign (or use channel name)
--hexdata=<hexdata> : (str) data to sign, encoded as hexadecimal --hexdata=<hexdata> : (str) data to sign, encoded as hexadecimal
--salt=<salt> : (str) salt to use for signing, default is to use timestamp
--channel_account_id=<channel_account_id>: (str) one or more account ids for accounts to look in --channel_account_id=<channel_account_id>: (str) one or more account ids for accounts to look in
for channel certificates, defaults to all accounts. for channel certificates, defaults to all accounts.
--wallet_id=<wallet_id> : (str) restrict operation to specific wallet --wallet_id=<wallet_id> : (str) restrict operation to specific wallet
@ -2972,11 +2975,13 @@ class Daemon(metaclass=JSONRPCServerType):
signing_channel = await self.get_channel_or_error( signing_channel = await self.get_channel_or_error(
wallet, channel_account_id, channel_id, channel_name, for_signing=True wallet, channel_account_id, channel_id, channel_name, for_signing=True
) )
timestamp = str(int(time.time())) if salt is None:
signature = signing_channel.sign_data(unhexlify(str(hexdata)), timestamp) salt = str(int(time.time()))
signature = signing_channel.sign_data(unhexlify(str(hexdata)), salt)
return { return {
'signature': signature, 'signature': signature,
'signing_ts': timestamp 'signing_ts': salt, # DEPRECATED
'salt': salt,
} }
@requires(WALLET_COMPONENT) @requires(WALLET_COMPONENT)

View file

@ -793,7 +793,7 @@ class SQLiteStorage(SQLiteMixin):
await self.db.run(_save_claims) await self.db.run(_save_claims)
if update_file_callbacks: if update_file_callbacks:
await asyncio.wait(update_file_callbacks) await asyncio.wait(map(asyncio.create_task, update_file_callbacks))
if claim_id_to_supports: if claim_id_to_supports:
await self.save_supports(claim_id_to_supports) await self.save_supports(claim_id_to_supports)

View file

@ -13,11 +13,12 @@ from lbry.schema.url import URL
from lbry.wallet.dewies import dewies_to_lbc from lbry.wallet.dewies import dewies_to_lbc
from lbry.file.source_manager import SourceManager from lbry.file.source_manager import SourceManager
from lbry.file.source import ManagedDownloadSource from lbry.file.source import ManagedDownloadSource
from lbry.extras.daemon.storage import StoredContentClaim
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.conf import Config from lbry.conf import Config
from lbry.extras.daemon.analytics import AnalyticsManager from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.extras.daemon.storage import SQLiteStorage from lbry.extras.daemon.storage import SQLiteStorage
from lbry.wallet import WalletManager, Output from lbry.wallet import WalletManager
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -49,10 +50,10 @@ class FileManager:
await manager.started.wait() await manager.started.wait()
self.started.set() self.started.set()
def stop(self): async def stop(self):
for manager in self.source_managers.values(): for manager in self.source_managers.values():
# fixme: pop or not? # fixme: pop or not?
manager.stop() await manager.stop()
self.started.clear() self.started.clear()
@cache_concurrent @cache_concurrent
@ -192,21 +193,24 @@ class FileManager:
#################### ####################
# make downloader and wait for start # make downloader and wait for start
#################### ####################
# temporary with fields we know so downloader can start. Missing fields are populated later.
stored_claim = StoredContentClaim(outpoint=outpoint, claim_id=txo.claim_id, name=txo.claim_name,
amount=txo.amount, height=txo.tx_ref.height,
serialized=claim.to_bytes().hex())
if not claim.stream.source.bt_infohash: if not claim.stream.source.bt_infohash:
# fixme: this shouldnt be here # fixme: this shouldnt be here
stream = ManagedStream( stream = ManagedStream(
self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash, self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash,
download_directory, file_name, ManagedStream.STATUS_RUNNING, content_fee=payment, download_directory, file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
analytics_manager=self.analytics_manager analytics_manager=self.analytics_manager, claim=stored_claim
) )
stream.downloader.node = source_manager.node stream.downloader.node = source_manager.node
else: else:
stream = TorrentSource( stream = TorrentSource(
self.loop, self.config, self.storage, identifier=claim.stream.source.bt_infohash, self.loop, self.config, self.storage, identifier=claim.stream.source.bt_infohash,
file_name=file_name, download_directory=download_directory or self.config.download_dir, file_name=file_name, download_directory=download_directory or self.config.download_dir,
status=ManagedStream.STATUS_RUNNING, status=ManagedStream.STATUS_RUNNING, claim=stored_claim, analytics_manager=self.analytics_manager,
analytics_manager=self.analytics_manager,
torrent_session=source_manager.torrent_session torrent_session=source_manager.torrent_session
) )
log.info("starting download for %s", uri) log.info("starting download for %s", uri)

View file

@ -67,7 +67,7 @@ class ManagedDownloadSource:
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None): async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
raise NotImplementedError() raise NotImplementedError()
def stop_tasks(self): async def stop_tasks(self):
raise NotImplementedError() raise NotImplementedError()
def set_claim(self, claim_info: typing.Dict, claim: 'Claim'): def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):

View file

@ -59,11 +59,11 @@ class SourceManager:
def add(self, source: ManagedDownloadSource): def add(self, source: ManagedDownloadSource):
self._sources[source.identifier] = source self._sources[source.identifier] = source
def remove(self, source: ManagedDownloadSource): async def remove(self, source: ManagedDownloadSource):
if source.identifier not in self._sources: if source.identifier not in self._sources:
return return
self._sources.pop(source.identifier) self._sources.pop(source.identifier)
source.stop_tasks() await source.stop_tasks()
async def initialize_from_database(self): async def initialize_from_database(self):
raise NotImplementedError() raise NotImplementedError()
@ -72,10 +72,10 @@ class SourceManager:
await self.initialize_from_database() await self.initialize_from_database()
self.started.set() self.started.set()
def stop(self): async def stop(self):
while self._sources: while self._sources:
_, source = self._sources.popitem() _, source = self._sources.popitem()
source.stop_tasks() await source.stop_tasks()
self.started.clear() self.started.clear()
async def create(self, file_path: str, key: Optional[bytes] = None, async def create(self, file_path: str, key: Optional[bytes] = None,
@ -83,7 +83,7 @@ class SourceManager:
raise NotImplementedError() raise NotImplementedError()
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
self.remove(source) await self.remove(source)
if delete_file and source.output_file_exists: if delete_file and source.output_file_exists:
os.remove(source.full_path) os.remove(source.full_path)

View file

@ -22,6 +22,9 @@ class BackgroundDownloader:
await downloader.download_stream_blob(blob_info) await downloader.download_stream_blob(blob_info)
except ValueError: except ValueError:
return return
except asyncio.CancelledError:
log.debug("Cancelled background downloader")
raise
except Exception: except Exception:
log.error("Unexpected download error on background downloader") log.error("Unexpected download error on background downloader")
finally: finally:

View file

@ -191,7 +191,7 @@ class ManagedStream(ManagedDownloadSource):
Stop any running save/stream tasks as well as the downloader and update the status in the database Stop any running save/stream tasks as well as the downloader and update the status in the database
""" """
self.stop_tasks() await self.stop_tasks()
if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING: if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED) await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)
@ -324,12 +324,13 @@ class ManagedStream(ManagedDownloadSource):
await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout) await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id) log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id)
self.stop_tasks() await self.stop_tasks()
await self.update_status(ManagedStream.STATUS_STOPPED) await self.update_status(ManagedStream.STATUS_STOPPED)
def stop_tasks(self): async def stop_tasks(self):
if self.file_output_task and not self.file_output_task.done(): if self.file_output_task and not self.file_output_task.done():
self.file_output_task.cancel() self.file_output_task.cancel()
await asyncio.gather(self.file_output_task, return_exceptions=True)
self.file_output_task = None self.file_output_task = None
while self.streaming_responses: while self.streaming_responses:
req, response = self.streaming_responses.pop() req, response = self.streaming_responses.pop()

View file

@ -196,8 +196,8 @@ class StreamManager(SourceManager):
await super().start() await super().start()
self.re_reflect_task = self.loop.create_task(self.reflect_streams()) self.re_reflect_task = self.loop.create_task(self.reflect_streams())
def stop(self): async def stop(self):
super().stop() await super().stop()
if self.resume_saving_task and not self.resume_saving_task.done(): if self.resume_saving_task and not self.resume_saving_task.done():
self.resume_saving_task.cancel() self.resume_saving_task.cancel()
if self.re_reflect_task and not self.re_reflect_task.done(): if self.re_reflect_task and not self.re_reflect_task.done():
@ -224,7 +224,8 @@ class StreamManager(SourceManager):
) )
return task return task
async def _retriable_reflect_stream(self, stream, host, port): @staticmethod
async def _retriable_reflect_stream(stream, host, port):
sent = await stream.upload_to_reflector(host, port) sent = await stream.upload_to_reflector(host, port)
while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0: while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0:
stream.reflector_progress = 0 stream.reflector_progress = 0
@ -259,7 +260,7 @@ class StreamManager(SourceManager):
return return
if source.identifier in self.running_reflector_uploads: if source.identifier in self.running_reflector_uploads:
self.running_reflector_uploads[source.identifier].cancel() self.running_reflector_uploads[source.identifier].cancel()
source.stop_tasks() await source.stop_tasks()
if source.identifier in self.streams: if source.identifier in self.streams:
del self.streams[source.identifier] del self.streams[source.identifier]
blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]] blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]]

View file

@ -22,9 +22,9 @@ class TorrentHandle:
self._loop = loop self._loop = loop
self._executor = executor self._executor = executor
self._handle: libtorrent.torrent_handle = handle self._handle: libtorrent.torrent_handle = handle
self.started = asyncio.Event() self.started = asyncio.Event(loop=loop)
self.finished = asyncio.Event() self.finished = asyncio.Event(loop=loop)
self.metadata_completed = asyncio.Event() self.metadata_completed = asyncio.Event(loop=loop)
self.size = 0 self.size = 0
self.total_wanted_done = 0 self.total_wanted_done = 0
self.name = '' self.name = ''

View file

@ -74,7 +74,7 @@ class TorrentSource(ManagedDownloadSource):
def bt_infohash(self): def bt_infohash(self):
return self.identifier return self.identifier
def stop_tasks(self): async def stop_tasks(self):
pass pass
@property @property
@ -118,8 +118,8 @@ class TorrentManager(SourceManager):
async def start(self): async def start(self):
await super().start() await super().start()
def stop(self): async def stop(self):
super().stop() await super().stop()
log.info("finished stopping the torrent manager") log.info("finished stopping the torrent manager")
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):

View file

@ -329,10 +329,10 @@ class Ledger(metaclass=LedgerRegistry):
async def start(self): async def start(self):
if not os.path.exists(self.path): if not os.path.exists(self.path):
os.mkdir(self.path) os.mkdir(self.path)
await asyncio.wait([ await asyncio.wait(map(asyncio.create_task, [
self.db.open(), self.db.open(),
self.headers.open() self.headers.open()
]) ]))
fully_synced = self.on_ready.first fully_synced = self.on_ready.first
asyncio.create_task(self.network.start()) asyncio.create_task(self.network.start())
await self.network.on_connected.first await self.network.on_connected.first
@ -466,9 +466,9 @@ class Ledger(metaclass=LedgerRegistry):
async def subscribe_accounts(self): async def subscribe_accounts(self):
if self.network.is_connected and self.accounts: if self.network.is_connected and self.accounts:
log.info("Subscribe to %i accounts", len(self.accounts)) log.info("Subscribe to %i accounts", len(self.accounts))
await asyncio.wait([ await asyncio.wait(map(asyncio.create_task, [
self.subscribe_account(a) for a in self.accounts self.subscribe_account(a) for a in self.accounts
]) ]))
async def subscribe_account(self, account: Account): async def subscribe_account(self, account: Account):
for address_manager in account.address_managers.values(): for address_manager in account.address_managers.values():

View file

@ -214,7 +214,7 @@ class Network:
def loop_task_done_callback(f): def loop_task_done_callback(f):
try: try:
f.result() f.result()
except Exception: except (Exception, asyncio.CancelledError):
if self.running: if self.running:
log.exception("wallet server connection loop crashed") log.exception("wallet server connection loop crashed")
@ -312,7 +312,8 @@ class Network:
sleep_delay = 30 sleep_delay = 30
while self.running: while self.running:
await asyncio.wait( await asyncio.wait(
[asyncio.sleep(30), self._urgent_need_reconnect.wait()], return_when=asyncio.FIRST_COMPLETED map(asyncio.create_task, [asyncio.sleep(30), self._urgent_need_reconnect.wait()]),
return_when=asyncio.FIRST_COMPLETED
) )
if self._urgent_need_reconnect.is_set(): if self._urgent_need_reconnect.is_set():
sleep_delay = 30 sleep_delay = 30
@ -338,7 +339,7 @@ class Network:
try: try:
if not self._urgent_need_reconnect.is_set(): if not self._urgent_need_reconnect.is_set():
await asyncio.wait( await asyncio.wait(
[self._keepalive_task, self._urgent_need_reconnect.wait()], [self._keepalive_task, asyncio.create_task(self._urgent_need_reconnect.wait())],
return_when=asyncio.FIRST_COMPLETED return_when=asyncio.FIRST_COMPLETED
) )
else: else:

View file

@ -214,6 +214,7 @@ class SPVNode:
self.port = 50001 + node_number # avoid conflict with default daemon self.port = 50001 + node_number # avoid conflict with default daemon
self.udp_port = self.port self.udp_port = self.port
self.elastic_notifier_port = 19080 + node_number self.elastic_notifier_port = 19080 + node_number
self.elastic_services = f'localhost:9200/localhost:{self.elastic_notifier_port}'
self.session_timeout = 600 self.session_timeout = 600
self.stopped = True self.stopped = True
self.index_name = uuid4().hex self.index_name = uuid4().hex
@ -235,7 +236,7 @@ class SPVNode:
'host': self.hostname, 'host': self.hostname,
'tcp_port': self.port, 'tcp_port': self.port,
'udp_port': self.udp_port, 'udp_port': self.udp_port,
'elastic_notifier_port': self.elastic_notifier_port, 'elastic_services': self.elastic_services,
'session_timeout': self.session_timeout, 'session_timeout': self.session_timeout,
'max_query_workers': 0, 'max_query_workers': 0,
'es_index_prefix': self.index_name, 'es_index_prefix': self.index_name,
@ -263,8 +264,7 @@ class SPVNode:
await self.server.start() await self.server.start()
except Exception as e: except Exception as e:
self.stopped = True self.stopped = True
if not isinstance(e, asyncio.CancelledError): log.exception("failed to start spv node")
log.exception("failed to start spv node")
raise e raise e
async def stop(self, cleanup=True): async def stop(self, cleanup=True):

View file

@ -182,6 +182,8 @@ class Wallet:
raise InvalidPasswordError() raise InvalidPasswordError()
if "unknown compression method" in e.args[0].lower(): if "unknown compression method" in e.args[0].lower():
raise InvalidPasswordError() raise InvalidPasswordError()
if "invalid window size" in e.args[0].lower():
raise InvalidPasswordError()
raise raise
return json.loads(decompressed) return json.loads(decompressed)

View file

@ -28,6 +28,7 @@ disable=
no-else-return, no-else-return,
cyclic-import, cyclic-import,
missing-docstring, missing-docstring,
consider-using-f-string,
duplicate-code, duplicate-code,
expression-not-assigned, expression-not-assigned,
inconsistent-return-statements, inconsistent-return-statements,

View file

@ -18,7 +18,7 @@ setup(
long_description_content_type="text/markdown", long_description_content_type="text/markdown",
keywords="lbry protocol media", keywords="lbry protocol media",
license='MIT', license='MIT',
python_requires='>=3.9', python_requires='>=3.8',
packages=find_packages(exclude=('tests',)), packages=find_packages(exclude=('tests',)),
zip_safe=False, zip_safe=False,
entry_points={ entry_points={
@ -40,7 +40,7 @@ setup(
'protobuf==3.17.2', 'protobuf==3.17.2',
'prometheus_client==0.7.1', 'prometheus_client==0.7.1',
'ecdsa==0.13.3', 'ecdsa==0.13.3',
'pyyaml==5.4', 'pyyaml==5.3.1',
'docopt==0.6.2', 'docopt==0.6.2',
'hachoir==3.1.2', 'hachoir==3.1.2',
'coincurve==15.0.0', 'coincurve==15.0.0',
@ -50,14 +50,14 @@ setup(
], ],
extras_require={ extras_require={
'lint': [ 'lint': [
'pylint==2.10.0' 'pylint==2.13.9'
], ],
'test': [ 'test': [
'coverage', 'coverage',
'jsonschema==4.4.0', 'jsonschema==4.4.0',
], ],
'hub': [ 'hub': [
'hub@git+https://github.com/lbryio/hub.git@024aceda53fe6d1ab8d519b73584437c25de6975' 'hub@git+https://github.com/lbryio/hub.git@929448d64bcbe6c5e476757ec78456beaa85e56a'
] ]
}, },
classifiers=[ classifiers=[

View file

@ -61,16 +61,14 @@ def mock_network_loop(loop: asyncio.AbstractEventLoop,
dht_network[from_addr] = protocol dht_network[from_addr] = protocol
return transport, protocol return transport, protocol
with mock.patch('socket.socket') as mock_socket: mock_sock = mock.Mock(spec=socket.socket)
mock_sock = mock.Mock(spec=socket.socket) mock_sock.setsockopt = lambda *_: None
mock_sock.setsockopt = lambda *_: None mock_sock.bind = lambda *_: None
mock_sock.bind = lambda *_: None mock_sock.setblocking = lambda *_: None
mock_sock.setblocking = lambda *_: None mock_sock.getsockname = lambda: "0.0.0.0"
mock_sock.getsockname = lambda: "0.0.0.0" mock_sock.getpeername = lambda: ""
mock_sock.getpeername = lambda: "" mock_sock.close = lambda: None
mock_sock.close = lambda: None mock_sock.type = socket.SOCK_DGRAM
mock_sock.type = socket.SOCK_DGRAM mock_sock.fileno = lambda: 7
mock_sock.fileno = lambda: 7 loop.create_datagram_endpoint = create_datagram_endpoint
mock_socket.return_value = mock_sock yield
loop.create_datagram_endpoint = create_datagram_endpoint
yield

View file

@ -31,7 +31,7 @@ STREAM_TYPES = {
def verify(channel, data, signature, channel_hash=None): def verify(channel, data, signature, channel_hash=None):
pieces = [ pieces = [
signature['signing_ts'].encode(), signature['salt'].encode(),
channel_hash or channel.claim_hash, channel_hash or channel.claim_hash,
data data
] ]
@ -1239,8 +1239,13 @@ class ChannelCommands(CommandTestCase):
channel = channel_tx.outputs[0] channel = channel_tx.outputs[0]
signature1 = await self.out(self.daemon.jsonrpc_channel_sign(channel_name='@signer', hexdata=data_to_sign)) signature1 = await self.out(self.daemon.jsonrpc_channel_sign(channel_name='@signer', hexdata=data_to_sign))
signature2 = await self.out(self.daemon.jsonrpc_channel_sign(channel_id=channel.claim_id, hexdata=data_to_sign)) signature2 = await self.out(self.daemon.jsonrpc_channel_sign(channel_id=channel.claim_id, hexdata=data_to_sign))
signature3 = await self.out(self.daemon.jsonrpc_channel_sign(channel_id=channel.claim_id, hexdata=data_to_sign, salt='beef'))
signature4 = await self.out(self.daemon.jsonrpc_channel_sign(channel_id=channel.claim_id, hexdata=data_to_sign, salt='beef'))
self.assertNotEqual(signature2, signature3)
self.assertEqual(signature3, signature4)
self.assertTrue(verify(channel, unhexlify(data_to_sign), signature1)) self.assertTrue(verify(channel, unhexlify(data_to_sign), signature1))
self.assertTrue(verify(channel, unhexlify(data_to_sign), signature2)) self.assertTrue(verify(channel, unhexlify(data_to_sign), signature2))
self.assertTrue(verify(channel, unhexlify(data_to_sign), signature3))
signature3 = await self.out(self.daemon.jsonrpc_channel_sign(channel_id=channel.claim_id, hexdata=99)) signature3 = await self.out(self.daemon.jsonrpc_channel_sign(channel_id=channel.claim_id, hexdata=99))
self.assertTrue(verify(channel, unhexlify('99'), signature3)) self.assertTrue(verify(channel, unhexlify('99'), signature3))

View file

@ -1,5 +1,5 @@
import unittest import unittest
from unittest import skipIf, skip from unittest import skipIf
import asyncio import asyncio
import os import os
from binascii import hexlify from binascii import hexlify
@ -51,8 +51,7 @@ class FileCommands(CommandTestCase):
self.addCleanup(task.cancel) self.addCleanup(task.cancel)
return tx, btih return tx, btih
#@skipIf(TorrentSession is None, "libtorrent not installed") @skipIf(TorrentSession is None, "libtorrent not installed")
@skip
async def test_download_torrent(self): async def test_download_torrent(self):
tx, btih = await self.initialize_torrent() tx, btih = await self.initialize_torrent()
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
@ -90,6 +89,21 @@ class FileCommands(CommandTestCase):
await self.reflector.blob_manager.delete_blobs(all_except_sd) await self.reflector.blob_manager.delete_blobs(all_except_sd)
self.assertEqual(all_except_sd, await self.daemon.jsonrpc_file_reflect(sd_hash=sd_hash)) self.assertEqual(all_except_sd, await self.daemon.jsonrpc_file_reflect(sd_hash=sd_hash))
async def test_sd_blob_fields_fallback(self):
claim_id = self.get_claim_id(await self.stream_create('foo', '0.01', suffix='.txt'))
stream = (await self.daemon.jsonrpc_file_list())["items"][0]
stream.descriptor.suggested_file_name = ' '
stream.descriptor.stream_name = ' '
stream.descriptor.stream_hash = stream.descriptor.get_stream_hash()
sd_hash = stream.descriptor.sd_hash = stream.descriptor.calculate_sd_hash()
await stream.descriptor.make_sd_blob()
await self.daemon.jsonrpc_file_delete(claim_name='foo')
await self.stream_update(claim_id=claim_id, sd_hash=sd_hash)
file_dict = await self.out(self.daemon.jsonrpc_get('lbry://foo', save_file=True))
self.assertEqual(file_dict['suggested_file_name'], stream.file_name)
self.assertEqual(file_dict['stream_name'], stream.file_name)
self.assertEqual(file_dict['mime_type'], 'text/plain')
async def test_file_management(self): async def test_file_management(self):
await self.stream_create('foo', '0.01') await self.stream_create('foo', '0.01')
await self.stream_create('foo2', '0.01') await self.stream_create('foo2', '0.01')
@ -340,7 +354,7 @@ class FileCommands(CommandTestCase):
await self.daemon.jsonrpc_get('lbry://foo') await self.daemon.jsonrpc_get('lbry://foo')
with open(original_path, 'wb') as handle: with open(original_path, 'wb') as handle:
handle.write(b'some other stuff was there instead') handle.write(b'some other stuff was there instead')
self.daemon.file_manager.stop() await self.daemon.file_manager.stop()
await self.daemon.file_manager.start() await self.daemon.file_manager.start()
await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) # if this hangs, file didn't get set completed await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) # if this hangs, file didn't get set completed
# check that internal state got through up to the file list API # check that internal state got through up to the file list API
@ -368,8 +382,7 @@ class FileCommands(CommandTestCase):
resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2)) resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2))
self.assertNotIn('error', resp) self.assertNotIn('error', resp)
self.assertTrue(os.path.isfile(path)) self.assertTrue(os.path.isfile(path))
self.daemon.file_manager.stop() await self.daemon.file_manager.stop()
await asyncio.sleep(0.01) # FIXME: this sleep should not be needed
self.assertFalse(os.path.isfile(path)) self.assertFalse(os.path.isfile(path))
async def test_incomplete_downloads_retry(self): async def test_incomplete_downloads_retry(self):
@ -464,7 +477,7 @@ class FileCommands(CommandTestCase):
# restart the daemon and make sure the fee is still there # restart the daemon and make sure the fee is still there
self.daemon.file_manager.stop() await self.daemon.file_manager.stop()
await self.daemon.file_manager.start() await self.daemon.file_manager.start()
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee) self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee)

View file

@ -3,7 +3,9 @@ import hashlib
import aiohttp import aiohttp
import aiohttp.web import aiohttp.web
import asyncio import asyncio
import contextlib
from lbry.file.source import ManagedDownloadSource
from lbry.utils import aiohttp_request from lbry.utils import aiohttp_request
from lbry.blob.blob_file import MAX_BLOB_SIZE from lbry.blob.blob_file import MAX_BLOB_SIZE
from lbry.testcase import CommandTestCase from lbry.testcase import CommandTestCase
@ -21,7 +23,7 @@ def get_random_bytes(n: int) -> bytes:
class RangeRequests(CommandTestCase): class RangeRequests(CommandTestCase):
async def _restart_stream_manager(self): async def _restart_stream_manager(self):
self.daemon.file_manager.stop() await self.daemon.file_manager.stop()
await self.daemon.file_manager.start() await self.daemon.file_manager.start()
return return
@ -352,14 +354,21 @@ class RangeRequests(CommandTestCase):
path = stream.full_path path = stream.full_path
self.assertIsNotNone(path) self.assertIsNotNone(path)
if wait_for_start_writing: if wait_for_start_writing:
await stream.started_writing.wait() with contextlib.suppress(asyncio.CancelledError):
await stream.started_writing.wait()
self.assertTrue(os.path.isfile(path)) self.assertTrue(os.path.isfile(path))
await self._restart_stream_manager() await self.daemon.file_manager.stop()
# while stopped, we get no response to query and no file is present
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'], [])
self.assertEqual(os.path.isfile(path), stream.status == ManagedDownloadSource.STATUS_FINISHED)
await self.daemon.file_manager.start()
# after restart, we get a response to query and same file path
stream = (await self.daemon.jsonrpc_file_list())['items'][0] stream = (await self.daemon.jsonrpc_file_list())['items'][0]
self.assertIsNotNone(stream.full_path) self.assertIsNotNone(stream.full_path)
self.assertFalse(os.path.isfile(path)) self.assertEqual(stream.full_path, path)
if wait_for_start_writing: if wait_for_start_writing:
await stream.started_writing.wait() with contextlib.suppress(asyncio.CancelledError):
await stream.started_writing.wait()
self.assertTrue(os.path.isfile(path)) self.assertTrue(os.path.isfile(path))
async def test_file_save_stop_before_finished_streaming_only_wait_for_start(self): async def test_file_save_stop_before_finished_streaming_only_wait_for_start(self):

View file

@ -1508,27 +1508,27 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
COIN = int(1E8) COIN = int(1E8)
self.assertEqual(self.conductor.spv_node.writer.height, 207) self.assertEqual(self.conductor.spv_node.writer.height, 207)
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put( self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put(
(208, bytes.fromhex(claim_id1)), (0, 10 * COIN) (208, bytes.fromhex(claim_id1)), (0, 10 * COIN)
) )
await self.generate(1) await self.generate(1)
self.assertEqual(self.conductor.spv_node.writer.height, 208) self.assertEqual(self.conductor.spv_node.writer.height, 208)
self.assertEqual(1.7090807854206793, await get_trending_score(claim_id1)) self.assertEqual(1.7090807854206793, await get_trending_score(claim_id1))
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put( self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put(
(209, bytes.fromhex(claim_id1)), (10 * COIN, 100 * COIN) (209, bytes.fromhex(claim_id1)), (10 * COIN, 100 * COIN)
) )
await self.generate(1) await self.generate(1)
self.assertEqual(self.conductor.spv_node.writer.height, 209) self.assertEqual(self.conductor.spv_node.writer.height, 209)
self.assertEqual(2.2437974397778886, await get_trending_score(claim_id1)) self.assertEqual(2.2437974397778886, await get_trending_score(claim_id1))
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put( self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put(
(309, bytes.fromhex(claim_id1)), (100 * COIN, 1000000 * COIN) (309, bytes.fromhex(claim_id1)), (100 * COIN, 1000000 * COIN)
) )
await self.generate(100) await self.generate(100)
self.assertEqual(self.conductor.spv_node.writer.height, 309) self.assertEqual(self.conductor.spv_node.writer.height, 309)
self.assertEqual(5.157053472135866, await get_trending_score(claim_id1)) self.assertEqual(5.157053472135866, await get_trending_score(claim_id1))
self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put( self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put(
(409, bytes.fromhex(claim_id1)), (1000000 * COIN, 1 * COIN) (409, bytes.fromhex(claim_id1)), (1000000 * COIN, 1 * COIN)
) )

View file

@ -2,7 +2,7 @@ import asyncio
import unittest import unittest
from lbry.testcase import CommandTestCase from lbry.testcase import CommandTestCase
from lbry.wallet import Transaction
class TransactionCommandsTestCase(CommandTestCase): class TransactionCommandsTestCase(CommandTestCase):
@ -29,17 +29,42 @@ class TransactionCommandsTestCase(CommandTestCase):
# someone's tx # someone's tx
change_address = await self.blockchain.get_raw_change_address() change_address = await self.blockchain.get_raw_change_address()
sendtxid = await self.blockchain.send_to_address(change_address, 10) sendtxid = await self.blockchain.send_to_address(change_address, 10)
await asyncio.sleep(0.2) # After a few tries, Hub should have the transaction (in mempool).
tx = await self.daemon.jsonrpc_transaction_show(sendtxid) for i in range(5):
self.assertEqual(tx.id, sendtxid) tx = await self.daemon.jsonrpc_transaction_show(sendtxid)
self.assertEqual(tx.height, -1) # Retry if Hub is not aware of the transaction.
if isinstance(tx, dict):
# Fields: 'success', 'code', 'message'
self.assertFalse(tx['success'], tx)
self.assertEqual(tx['code'], 404, tx)
self.assertEqual(tx['message'], "transaction not found", tx)
await asyncio.sleep(0.1)
continue
break
# verify transaction show (in mempool)
self.assertTrue(isinstance(tx, Transaction), str(tx))
# Fields: 'txid', 'raw', 'height', 'position', 'is_verified', and more.
self.assertEqual(tx.id, sendtxid, vars(tx))
self.assertEqual(tx.height, -1, vars(tx))
self.assertEqual(tx.is_verified, False, vars(tx))
# transaction is confirmed and leaves mempool
await self.generate(1) await self.generate(1)
# verify transaction show
tx = await self.daemon.jsonrpc_transaction_show(sendtxid) tx = await self.daemon.jsonrpc_transaction_show(sendtxid)
self.assertEqual(tx.height, self.ledger.headers.height) self.assertTrue(isinstance(tx, Transaction), str(tx))
self.assertEqual(tx.id, sendtxid, vars(tx))
self.assertEqual(tx.height, self.ledger.headers.height, vars(tx))
self.assertEqual(tx.is_verified, True, vars(tx))
# inexistent # inexistent
result = await self.daemon.jsonrpc_transaction_show('0'*64) result = await self.daemon.jsonrpc_transaction_show('0'*64)
self.assertFalse(result['success']) self.assertTrue(isinstance(result, dict), result)
# Fields: 'success', 'code', 'message'
self.assertFalse(result['success'], result)
self.assertEqual(result['code'], 404, result)
self.assertEqual(result['message'], "transaction not found", result)
async def test_utxo_release(self): async def test_utxo_release(self):
await self.send_to_address_and_wait( await self.send_to_address_and_wait(

View file

@ -305,7 +305,6 @@ class TestStreamManager(BlobExchangeTestBase):
self.assertEqual(stored_status, "running") self.assertEqual(stored_status, "running")
await stream.stop() await stream.stop()
await asyncio.sleep(1) # TODO: should not be needed
self.assertFalse(stream.finished) self.assertFalse(stream.finished)
self.assertFalse(stream.running) self.assertFalse(stream.running)
@ -341,6 +340,8 @@ class TestStreamManager(BlobExchangeTestBase):
try: try:
await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager, timeout) await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager, timeout)
except Exception as err: except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
error = err error = err
self.assertEqual(expected_error, type(error)) self.assertEqual(expected_error, type(error))
@ -423,7 +424,7 @@ class TestStreamManager(BlobExchangeTestBase):
self.assertIsNone(stream.full_path) self.assertIsNone(stream.full_path)
self.assertEqual(0, stream.written_bytes) self.assertEqual(0, stream.written_bytes)
self.stream_manager.stop() await self.stream_manager.stop()
await self.stream_manager.start() await self.stream_manager.start()
self.assertEqual(1, len(self.stream_manager.streams)) self.assertEqual(1, len(self.stream_manager.streams))
stream = list(self.stream_manager.streams.values())[0] stream = list(self.stream_manager.streams.values())[0]
@ -448,7 +449,7 @@ class TestStreamManager(BlobExchangeTestBase):
stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager) stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
await stream.finished_writing.wait() await stream.finished_writing.wait()
await asyncio.sleep(0) await asyncio.sleep(0)
self.stream_manager.stop() await self.stream_manager.stop()
self.client_blob_manager.stop() self.client_blob_manager.stop()
# partial removal, only sd blob is missing. # partial removal, only sd blob is missing.
# in this case, we recover the sd blob while the other blobs are kept untouched as 'finished' # in this case, we recover the sd blob while the other blobs are kept untouched as 'finished'

View file

@ -470,7 +470,7 @@ class TestUpgrade(AsyncioTestCase):
class TestSQLiteRace(AsyncioTestCase): class TestSQLiteRace(AsyncioTestCase):
max_misuse_attempts = 80000 max_misuse_attempts = 120000
def setup_db(self): def setup_db(self):
self.db = sqlite3.connect(":memory:", isolation_level=None) self.db = sqlite3.connect(":memory:", isolation_level=None)