Compare commits


37 commits

Author SHA1 Message Date
Jonathan Moody
eb5da9511e Revert "TEMP: Try python 3.8."
This reverts commit 8def4d5177.
2023-04-03 13:34:36 -04:00
Jonathan Moody
8722ef840e Bump python_requires >= 3.8.
Code to handle CancelledError (as subclass of Exception) was removed.
2023-04-03 13:34:36 -04:00
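
Note: Python 3.8 moved asyncio.CancelledError under BaseException, so broad "except Exception" handlers no longer intercept task cancellation and the old compatibility re-raise code became dead weight. A minimal sketch of the behaviour this commit relies on (names are illustrative, not from this changeset):

import asyncio

async def worker():
    try:
        await asyncio.sleep(3600)
    except Exception as err:
        # Python 3.7: CancelledError subclassed Exception and was
        # swallowed here unless explicitly re-raised.
        # Python 3.8+: CancelledError subclasses BaseException and
        # bypasses this handler entirely.
        print("swallowed:", err)

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("cancellation propagated")  # reached on 3.8+

asyncio.run(main())
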
Jonathan Moody
6e75a1a89b TEMP: Try python 3.8. 2023-04-03 13:34:36 -04:00
Jonathan Moody
ef3189de1d Work on some DeprecationWarnings: The explicit passing of coroutine objects to asyncio.wait() is deprecated since Python 3.8. 2023-04-03 13:34:36 -04:00
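
For context, the deprecated pattern and its fix, as a hedged sketch (function names invented for illustration): since Python 3.8, asyncio.wait() warns when handed bare coroutine objects, and Python 3.11 rejects them, so callers wrap each coroutine in a Task first.

import asyncio

async def fetch(n):
    await asyncio.sleep(0.01)
    return n * 2

async def main():
    # Deprecated since 3.8: await asyncio.wait([fetch(1), fetch(2)])
    tasks = [asyncio.create_task(fetch(n)) for n in (1, 2)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # don't leak tasks that lost the race
    for task in done:
        print(task.result())

asyncio.run(main())
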
Jonathan Moody
c2d2080034 Try to suppress asyncio.CancelledError in a different way in test_streaming.py. 2023-04-03 13:34:36 -04:00
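
The commit message does not say which suppression approach it settled on; one conventional pattern for tests, offered only as an assumption-labelled sketch:

import asyncio
import contextlib

async def main():
    task = asyncio.create_task(asyncio.sleep(3600))
    task.cancel()
    # Swallow only the expected cancellation, nothing else.
    with contextlib.suppress(asyncio.CancelledError):
        await task
    print("cancellation suppressed")

asyncio.run(main())
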
Jonathan Moody
d0b5a0a8fd TEMP: Add workflow_dispatch. 2023-04-03 13:34:36 -04:00
Jonathan Moody
1d0e17be21 Another place generalized to Exception or asyncio.CancelledError. 2023-04-03 13:34:36 -04:00
Jonathan Moody
4ef03bb1f4 Try separate file_manager.stop() and start() calls to better
control order of events in test.
While file_manager is stopped, we get no response to file_list().
2023-04-03 13:34:36 -04:00
Jonathan Moody
4bd4bcdc27 Try ubuntu-20.04 to resolve missing libffi.so.7 issue. 2023-04-03 13:34:36 -04:00
Jonathan Moody
e5ca967fa2 Make FileManager.stop() async because SourceManager.stop() is now async. 2023-04-03 13:34:36 -04:00
Jonathan Moody
eed7d02e8b Tweak aiohttp version to be compatible with hub repository. 2023-04-03 13:34:36 -04:00
Jonathan Moody
02aecad52b CancelledError derives from BaseException in Python >= 3.8. The significant functional
change here is in upload_to_reflector(). Unit tests in TestReflector were failing.
Deal with lint related to CancelledError cleanup.
2023-04-03 13:34:36 -04:00
Jonathan Moody
585962d930 Make stop(), stop_tasks() consistently async routines, and have stop_tasks()
wait for file_output_task completion. This fixes a problem with
test_download_stop_resume_delete.
2023-04-03 13:34:36 -04:00
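
The pattern behind this fix is visible in the ManagedStream.stop_tasks() hunk further down: cancel the output task, then await it, so teardown cannot race the task's cleanup. A simplified, self-contained sketch (class and attribute names are stand-ins):

import asyncio

class Download:
    def __init__(self):
        self.file_output_task = None

    async def start(self):
        self.file_output_task = asyncio.create_task(asyncio.sleep(3600))

    async def stop_tasks(self):
        if self.file_output_task and not self.file_output_task.done():
            self.file_output_task.cancel()
            # gather(return_exceptions=True) waits for the cancellation
            # to complete instead of merely requesting it.
            await asyncio.gather(self.file_output_task, return_exceptions=True)
        self.file_output_task = None

async def main():
    d = Download()
    await d.start()
    await d.stop_tasks()
    print("stopped cleanly")

asyncio.run(main())
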
Jonathan Moody
ea4fba39a6 Fix Transport, DatagramTransport mockup issues. 2023-04-03 13:34:36 -04:00
Jonathan Moody
7a86406746 Fix and enable lint no-self-use & try-except-raise. 2023-04-03 13:34:36 -04:00
Jonathan Moody
c8a3eb97a4 Bump pylint version. Old pylint did not find standard library stuff on 3.9.12. 2023-04-03 13:34:36 -04:00
Lex Berezhny
20213628d7 upgrade cryptography 2023-04-03 13:34:36 -04:00
Lex Berezhny
2d1649f972 pylint disable shuffle() arg check 2023-04-03 13:34:36 -04:00
Lex Berezhny
5cb04b86a0 shuffle() needs custom random, removed loop from Event()/Queue() 2023-04-03 13:34:36 -04:00
Lex Berezhny
93ab6b3be3 passing loop to asyncio functions is deprecated 2023-04-03 13:34:36 -04:00
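
These three commits track the same cleanup: the loop= argument to asyncio primitives such as Event() and Queue() is deprecated since 3.8 (removed in 3.10), and shuffle() gets its determinism from a dedicated random.Random instance rather than the random= callback argument that 3.9 deprecated. A small sketch under those assumptions:

import asyncio
import random

async def main():
    event = asyncio.Event()   # formerly asyncio.Event(loop=loop)
    queue = asyncio.Queue()   # formerly asyncio.Queue(loop=loop)

    rng = random.Random(42)   # seedable generator for deterministic tests
    peers = ["a", "b", "c", "d"]
    rng.shuffle(peers)        # formerly random.shuffle(peers, random=...)

    event.set()
    await event.wait()
    await queue.put(peers[0])
    print(await queue.get())

asyncio.run(main())
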
Lex Berezhny
b9762c3e64 update plyvel 2023-04-03 13:34:36 -04:00
Lex Berezhny
82592d00ef try building 3.9 2023-04-03 13:34:36 -04:00
Jonathan Moody
c118174c1a Try shell: bash to simplify. 2023-02-02 14:16:07 -05:00
Jonathan Moody
d284acd8b8 Remove "debug pip cache". 2023-02-02 14:16:07 -05:00
Jonathan Moody
235c98372d Fix syntax. 2023-02-02 14:16:07 -05:00
Jonathan Moody
d2f5073ef4 Single "set pip cache dir" task with conditional inside. 2023-02-02 14:16:07 -05:00
Jonathan Moody
84e5e43117 Bump upload-artifact version too. 2023-02-02 14:16:07 -05:00
Jonathan Moody
7bd025ae54 Upgrade change-string-case. Use startsWith() to test runner.os.
Bump change-string-case-action version again.
2023-02-02 14:16:07 -05:00
Jonathan Moody
8f28ce65b0 Switch to environment vars in $GITHUB_ENV. 2023-02-02 14:16:07 -05:00
Jonathan Moody
d36e305129 Functions save-state, set-output deprecated. Use new mechanism. 2023-02-02 14:16:07 -05:00
Jonathan Moody
2609dee8fb Bump checkout, setup-python, cache action versions. 2023-02-02 14:16:07 -05:00
Lex Berezhny
a2da86d4b5 v0.113.0 2023-01-23 10:43:02 -05:00
Alex Grin
aa16c7fee5 Update conf.py 2023-01-23 10:30:25 -05:00
Alex Grin
3266f72b82 add s1.lbry.network 2023-01-23 10:30:25 -05:00
Jack Robison
77cd2a3f8a add more non lbry.com hubs/bootstrap dht nodes 2023-01-23 10:30:25 -05:00
Alex Grin
308e586e9a add grin's domain to bootstrap hubs list 2023-01-23 10:30:25 -05:00
84beddfd77 Added tracker and dht from pigg.es
2023-01-22 19:09:17 -05:00
34 changed files with 291 additions and 524 deletions

View file

@ -1,5 +1,5 @@
name: ci name: ci
on: ["push", "pull_request"] on: ["push", "pull_request", "workflow_dispatch"]
jobs: jobs:
@ -7,12 +7,12 @@ jobs:
name: lint name: lint
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v3
- uses: actions/setup-python@v1 - uses: actions/setup-python@v4
with: with:
python-version: '3.7' python-version: '3.9'
- name: extract pip cache - name: extract pip cache
uses: actions/cache@v2 uses: actions/cache@v3
with: with:
path: ~/.cache/pip path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }} key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
@ -31,21 +31,21 @@ jobs:
- windows-latest - windows-latest
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v3
- uses: actions/setup-python@v1 - uses: actions/setup-python@v4
with: with:
python-version: '3.7' python-version: '3.9'
- name: set pip cache dir - name: set pip cache dir
id: pip-cache shell: bash
run: echo "::set-output name=dir::$(pip cache dir)" run: echo "PIP_CACHE_DIR=$(pip cache dir)" >> $GITHUB_ENV
- name: extract pip cache - name: extract pip cache
uses: actions/cache@v2 uses: actions/cache@v3
with: with:
path: ${{ steps.pip-cache.outputs.dir }} path: ${{ env.PIP_CACHE_DIR }}
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }} key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
restore-keys: ${{ runner.os }}-pip- restore-keys: ${{ runner.os }}-pip-
- id: os-name - id: os-name
uses: ASzc/change-string-case-action@v1 uses: ASzc/change-string-case-action@v5
with: with:
string: ${{ runner.os }} string: ${{ runner.os }}
- run: python -m pip install --user --upgrade pip wheel - run: python -m pip install --user --upgrade pip wheel
@ -93,16 +93,16 @@ jobs:
uses: elastic/elastic-github-actions/elasticsearch@master uses: elastic/elastic-github-actions/elasticsearch@master
with: with:
stack-version: 7.12.1 stack-version: 7.12.1
- uses: actions/checkout@v2 - uses: actions/checkout@v3
- uses: actions/setup-python@v1 - uses: actions/setup-python@v4
with: with:
python-version: '3.7' python-version: '3.9'
- if: matrix.test == 'other' - if: matrix.test == 'other'
run: | run: |
sudo apt-get update sudo apt-get update
sudo apt-get install -y --no-install-recommends ffmpeg sudo apt-get install -y --no-install-recommends ffmpeg
- name: extract pip cache - name: extract pip cache
uses: actions/cache@v2 uses: actions/cache@v3
with: with:
path: ./.tox path: ./.tox
key: tox-integration-${{ matrix.test }}-${{ hashFiles('setup.py') }} key: tox-integration-${{ matrix.test }}-${{ hashFiles('setup.py') }}
@ -138,26 +138,26 @@ jobs:
strategy: strategy:
matrix: matrix:
os: os:
- ubuntu-18.04 - ubuntu-20.04
- macos-latest - macos-latest
- windows-latest - windows-latest
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v3
- uses: actions/setup-python@v1 - uses: actions/setup-python@v4
with: with:
python-version: '3.7' python-version: '3.9'
- id: os-name - id: os-name
uses: ASzc/change-string-case-action@v1 uses: ASzc/change-string-case-action@v5
with: with:
string: ${{ runner.os }} string: ${{ runner.os }}
- name: set pip cache dir - name: set pip cache dir
id: pip-cache shell: bash
run: echo "::set-output name=dir::$(pip cache dir)" run: echo "PIP_CACHE_DIR=$(pip cache dir)" >> $GITHUB_ENV
- name: extract pip cache - name: extract pip cache
uses: actions/cache@v2 uses: actions/cache@v3
with: with:
path: ${{ steps.pip-cache.outputs.dir }} path: ${{ env.PIP_CACHE_DIR }}
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }} key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
restore-keys: ${{ runner.os }}-pip- restore-keys: ${{ runner.os }}-pip-
- run: pip install pyinstaller==4.6 - run: pip install pyinstaller==4.6
@ -175,7 +175,7 @@ jobs:
pip install pywin32==301 pip install pywin32==301
pyinstaller --additional-hooks-dir=scripts/. --icon=icons/lbry256.ico --onefile --name lbrynet lbry/extras/cli.py pyinstaller --additional-hooks-dir=scripts/. --icon=icons/lbry256.ico --onefile --name lbrynet lbry/extras/cli.py
dist/lbrynet.exe --version dist/lbrynet.exe --version
- uses: actions/upload-artifact@v2 - uses: actions/upload-artifact@v3
with: with:
name: lbrynet-${{ steps.os-name.outputs.lowercase }} name: lbrynet-${{ steps.os-name.outputs.lowercase }}
path: dist/ path: dist/

View file

@ -1,2 +1,2 @@
__version__ = "0.112.0" __version__ = "0.113.0"
version = tuple(map(int, __version__.split('.'))) # pylint: disable=invalid-name version = tuple(map(int, __version__.split('.'))) # pylint: disable=invalid-name

View file

@ -64,7 +64,7 @@ class BlobDownloader:
self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1 self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1
async def new_peer_or_finished(self): async def new_peer_or_finished(self):
active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)] active_tasks = list(self.active_connections.values()) + [asyncio.create_task(asyncio.sleep(1))]
await asyncio.wait(active_tasks, return_when='FIRST_COMPLETED') await asyncio.wait(active_tasks, return_when='FIRST_COMPLETED')
def cleanup_active(self): def cleanup_active(self):

View file

@ -688,6 +688,9 @@ class Config(CLIConfig):
tracker_servers = Servers("BitTorrent-compatible (BEP15) UDP trackers for helping P2P discovery", [ tracker_servers = Servers("BitTorrent-compatible (BEP15) UDP trackers for helping P2P discovery", [
('tracker.lbry.com', 9252), ('tracker.lbry.com', 9252),
('tracker.lbry.grin.io', 9252), ('tracker.lbry.grin.io', 9252),
('tracker.lbry.pigg.es', 9252),
('tracker.lizard.technology', 9252),
('s1.lbry.network', 9252),
]) ])
lbryum_servers = Servers("SPV wallet servers", [ lbryum_servers = Servers("SPV wallet servers", [
@ -700,14 +703,20 @@ class Config(CLIConfig):
('spv17.lbry.com', 50001), ('spv17.lbry.com', 50001),
('spv18.lbry.com', 50001), ('spv18.lbry.com', 50001),
('spv19.lbry.com', 50001), ('spv19.lbry.com', 50001),
('hub.lbry.grin.io', 50001),
('hub.lizard.technology', 50001),
('s1.lbry.network', 50001),
]) ])
known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [ known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [
('dht.lbry.grin.io', 4444), # Grin ('dht.lbry.grin.io', 4444), # Grin
('dht.lbry.madiator.com', 4444), # Madiator ('dht.lbry.madiator.com', 4444), # Madiator
('dht.lbry.pigg.es', 4444), # Pigges
('lbrynet1.lbry.com', 4444), # US EAST ('lbrynet1.lbry.com', 4444), # US EAST
('lbrynet2.lbry.com', 4444), # US WEST ('lbrynet2.lbry.com', 4444), # US WEST
('lbrynet3.lbry.com', 4444), # EU ('lbrynet3.lbry.com', 4444), # EU
('lbrynet4.lbry.com', 4444) # ASIA ('lbrynet4.lbry.com', 4444), # ASIA
('dht.lizard.technology', 4444), # Jack
('s2.lbry.network', 4444),
]) ])
# blockchain # blockchain

View file

@ -42,8 +42,6 @@ class BlobAnnouncer:
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
except Exception as err: except Exception as err:
self.announcements_sent_metric.labels(peers=0, error=True).inc() self.announcements_sent_metric.labels(peers=0, error=True).inc()
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise err
log.warning("error announcing %s: %s", blob_hash[:8], str(err)) log.warning("error announcing %s: %s", blob_hash[:8], str(err))
async def _announce(self, batch_size: typing.Optional[int] = 10): async def _announce(self, batch_size: typing.Optional[int] = 10):

View file

@ -81,8 +81,8 @@ Code | Name | Message
511 | CorruptBlob | Blobs is corrupted. 511 | CorruptBlob | Blobs is corrupted.
520 | BlobFailedEncryption | Failed to encrypt blob. 520 | BlobFailedEncryption | Failed to encrypt blob.
531 | DownloadCancelled | Download was canceled. 531 | DownloadCancelled | Download was canceled.
532 | DownloadMetadataTimeout | Failed to download metadata for {download} within timeout. 532 | DownloadSDTimeout | Failed to download sd blob {download} within timeout.
533 | DownloadDataTimeout | Failed to download data blobs for {download} within timeout. 533 | DownloadDataTimeout | Failed to download data blobs for sd hash {download} within timeout.
534 | InvalidStreamDescriptor | {message} 534 | InvalidStreamDescriptor | {message}
535 | InvalidData | {message} 535 | InvalidData | {message}
536 | InvalidBlobHash | {message} 536 | InvalidBlobHash | {message}

View file

@ -411,18 +411,18 @@ class DownloadCancelledError(BlobError):
super().__init__("Download was canceled.") super().__init__("Download was canceled.")
class DownloadMetadataTimeoutError(BlobError): class DownloadSDTimeoutError(BlobError):
def __init__(self, download): def __init__(self, download):
self.download = download self.download = download
super().__init__(f"Failed to download metadata for {download} within timeout.") super().__init__(f"Failed to download sd blob {download} within timeout.")
class DownloadDataTimeoutError(BlobError): class DownloadDataTimeoutError(BlobError):
def __init__(self, download): def __init__(self, download):
self.download = download self.download = download
super().__init__(f"Failed to download data blobs for {download} within timeout.") super().__init__(f"Failed to download data blobs for sd hash {download} within timeout.")
class InvalidStreamDescriptorError(BlobError): class InvalidStreamDescriptorError(BlobError):

View file

@ -37,7 +37,7 @@ class Component(metaclass=ComponentType):
def running(self): def running(self):
return self._running return self._running
async def get_status(self): async def get_status(self): # pylint: disable=no-self-use
return return
async def start(self): async def start(self):

View file

@ -118,7 +118,7 @@ class ComponentManager:
component._setup() for component in stage if not component.running component._setup() for component in stage if not component.running
] ]
if needing_start: if needing_start:
await asyncio.wait(needing_start) await asyncio.wait(map(asyncio.create_task, needing_start))
self.started.set() self.started.set()
async def stop(self): async def stop(self):
@ -131,7 +131,7 @@ class ComponentManager:
component._stop() for component in stage if component.running component._stop() for component in stage if component.running
] ]
if needing_stop: if needing_stop:
await asyncio.wait(needing_stop) await asyncio.wait(map(asyncio.create_task, needing_stop))
def all_components_running(self, *component_names): def all_components_running(self, *component_names):
""" """

View file

@ -374,7 +374,7 @@ class FileManagerComponent(Component):
log.info('Done setting up file manager') log.info('Done setting up file manager')
async def stop(self): async def stop(self):
self.file_manager.stop() await self.file_manager.stop()
class BackgroundDownloaderComponent(Component): class BackgroundDownloaderComponent(Component):
@ -560,8 +560,6 @@ class UPnPComponent(Component):
self.upnp = await UPnP.discover(loop=self.component_manager.loop) self.upnp = await UPnP.discover(loop=self.component_manager.loop)
log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string) log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
except Exception as err: except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
log.warning("upnp discovery failed: %s", err) log.warning("upnp discovery failed: %s", err)
self.upnp = None self.upnp = None

View file

@ -36,7 +36,7 @@ from lbry.blob.blob_file import is_valid_blobhash, BlobBuffer
from lbry.blob_exchange.downloader import download_blob from lbry.blob_exchange.downloader import download_blob
from lbry.dht.peer import make_kademlia_peer from lbry.dht.peer import make_kademlia_peer
from lbry.error import ( from lbry.error import (
DownloadMetadataTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError, DownloadSDTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError,
CommandDoesNotExistError, BaseError, WalletNotFoundError, WalletAlreadyLoadedError, WalletAlreadyExistsError, CommandDoesNotExistError, BaseError, WalletNotFoundError, WalletAlreadyLoadedError, WalletAlreadyExistsError,
ConflictingInputValueError, AlreadyPurchasedError, PrivateKeyNotFoundError, InputStringIsBlankError, ConflictingInputValueError, AlreadyPurchasedError, PrivateKeyNotFoundError, InputStringIsBlankError,
InputValueError InputValueError
@ -614,7 +614,8 @@ class Daemon(metaclass=JSONRPCServerType):
content_type='application/json' content_type='application/json'
) )
async def handle_metrics_get_request(self, request: web.Request): @staticmethod
async def handle_metrics_get_request(request: web.Request):
try: try:
return web.Response( return web.Response(
text=prom_generate_latest().decode(), text=prom_generate_latest().decode(),
@ -639,7 +640,7 @@ class Daemon(metaclass=JSONRPCServerType):
stream = await self.jsonrpc_get(uri) stream = await self.jsonrpc_get(uri)
if isinstance(stream, dict): if isinstance(stream, dict):
raise web.HTTPServerError(text=stream['error']) raise web.HTTPServerError(text=stream['error'])
raise web.HTTPFound(f"/stream/{stream.identifier}") raise web.HTTPFound(f"/stream/{stream.sd_hash}")
async def handle_stream_range_request(self, request: web.Request): async def handle_stream_range_request(self, request: web.Request):
try: try:
@ -658,13 +659,12 @@ class Daemon(metaclass=JSONRPCServerType):
log.debug("finished handling /stream range request") log.debug("finished handling /stream range request")
async def _handle_stream_range_request(self, request: web.Request): async def _handle_stream_range_request(self, request: web.Request):
identifier = request.path.split("/stream/")[1] sd_hash = request.path.split("/stream/")[1]
if not self.file_manager.started.is_set(): if not self.file_manager.started.is_set():
await self.file_manager.started.wait() await self.file_manager.started.wait()
stream = self.file_manager.get_filtered(identifier=identifier) if sd_hash not in self.file_manager.streams:
if not stream:
return web.HTTPNotFound() return web.HTTPNotFound()
return await self.file_manager.stream_partial_content(request, identifier) return await self.file_manager.stream_partial_content(request, sd_hash)
async def _process_rpc_call(self, data): async def _process_rpc_call(self, data):
args = data.get('params', {}) args = data.get('params', {})
@ -1140,7 +1140,7 @@ class Daemon(metaclass=JSONRPCServerType):
save_file=save_file, wallet=wallet save_file=save_file, wallet=wallet
) )
if not stream: if not stream:
raise DownloadMetadataTimeoutError(uri) raise DownloadSDTimeoutError(uri)
except Exception as e: except Exception as e:
# TODO: use error from lbry.error # TODO: use error from lbry.error
log.warning("Error downloading %s: %s", uri, str(e)) log.warning("Error downloading %s: %s", uri, str(e))

View file

@ -80,8 +80,6 @@ class MarketFeed:
self.rate = ExchangeRate(self.market, rate, int(time.time())) self.rate = ExchangeRate(self.market, rate, int(time.time()))
self.last_check = time.time() self.last_check = time.time()
return self.rate return self.rate
except asyncio.CancelledError:
raise
except asyncio.TimeoutError: except asyncio.TimeoutError:
log.warning("Timed out fetching exchange rate from %s.", self.name) log.warning("Timed out fetching exchange rate from %s.", self.name)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:

View file

@ -285,7 +285,7 @@ class JSONResponseEncoder(JSONEncoder):
else: else:
total_bytes_lower_bound = total_bytes = managed_stream.torrent_length total_bytes_lower_bound = total_bytes = managed_stream.torrent_length
result = { result = {
'streaming_url': managed_stream.stream_url, 'streaming_url': None,
'completed': managed_stream.completed, 'completed': managed_stream.completed,
'file_name': None, 'file_name': None,
'download_directory': None, 'download_directory': None,
@ -293,10 +293,10 @@ class JSONResponseEncoder(JSONEncoder):
'points_paid': 0.0, 'points_paid': 0.0,
'stopped': not managed_stream.running, 'stopped': not managed_stream.running,
'stream_hash': None, 'stream_hash': None,
'stream_name': managed_stream.stream_name, 'stream_name': None,
'suggested_file_name': managed_stream.suggested_file_name, 'suggested_file_name': None,
'sd_hash': None, 'sd_hash': None,
'mime_type': managed_stream.mime_type, 'mime_type': None,
'key': None, 'key': None,
'total_bytes_lower_bound': total_bytes_lower_bound, 'total_bytes_lower_bound': total_bytes_lower_bound,
'total_bytes': total_bytes, 'total_bytes': total_bytes,
@ -326,8 +326,12 @@ class JSONResponseEncoder(JSONEncoder):
} }
if is_stream: if is_stream:
result.update({ result.update({
'streaming_url': managed_stream.stream_url,
'stream_hash': managed_stream.stream_hash, 'stream_hash': managed_stream.stream_hash,
'stream_name': managed_stream.stream_name,
'suggested_file_name': managed_stream.suggested_file_name,
'sd_hash': managed_stream.descriptor.sd_hash, 'sd_hash': managed_stream.descriptor.sd_hash,
'mime_type': managed_stream.mime_type,
'key': managed_stream.descriptor.key, 'key': managed_stream.descriptor.key,
'blobs_completed': managed_stream.blobs_completed, 'blobs_completed': managed_stream.blobs_completed,
'blobs_in_stream': managed_stream.blobs_in_stream, 'blobs_in_stream': managed_stream.blobs_in_stream,
@ -336,6 +340,10 @@ class JSONResponseEncoder(JSONEncoder):
'reflector_progress': managed_stream.reflector_progress, 'reflector_progress': managed_stream.reflector_progress,
'uploading_to_reflector': managed_stream.uploading_to_reflector 'uploading_to_reflector': managed_stream.uploading_to_reflector
}) })
else:
result.update({
'streaming_url': f'file://{managed_stream.full_path}',
})
if output_exists: if output_exists:
result.update({ result.update({
'file_name': managed_stream.file_name, 'file_name': managed_stream.file_name,

View file

@ -5,7 +5,6 @@ import typing
import asyncio import asyncio
import binascii import binascii
import time import time
from operator import itemgetter
from typing import Optional from typing import Optional
from lbry.wallet import SQLiteMixin from lbry.wallet import SQLiteMixin
from lbry.conf import Config from lbry.conf import Config
@ -212,7 +211,7 @@ def delete_torrent(transaction: sqlite3.Connection, bt_infohash: str):
transaction.execute("delete from torrent where bt_infohash=?", (bt_infohash,)).fetchall() transaction.execute("delete from torrent where bt_infohash=?", (bt_infohash,)).fetchall()
def store_file(transaction: sqlite3.Connection, identifier_value: str, file_name: typing.Optional[str], def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str],
download_directory: typing.Optional[str], data_payment_rate: float, status: str, download_directory: typing.Optional[str], data_payment_rate: float, status: str,
content_fee: typing.Optional[Transaction], added_on: typing.Optional[int] = None) -> int: content_fee: typing.Optional[Transaction], added_on: typing.Optional[int] = None) -> int:
if not file_name and not download_directory: if not file_name and not download_directory:
@ -220,18 +219,15 @@ def store_file(transaction: sqlite3.Connection, identifier_value: str, file_name
else: else:
encoded_file_name = binascii.hexlify(file_name.encode()).decode() encoded_file_name = binascii.hexlify(file_name.encode()).decode()
encoded_download_dir = binascii.hexlify(download_directory.encode()).decode() encoded_download_dir = binascii.hexlify(download_directory.encode()).decode()
is_torrent = len(identifier_value) == 40
time_added = added_on or int(time.time()) time_added = added_on or int(time.time())
transaction.execute( transaction.execute(
f"insert or replace into file values ({'NULL, ?' if is_torrent else '?, NULL'}, ?, ?, ?, ?, ?, ?, ?)", "insert or replace into file values (?, NULL, ?, ?, ?, ?, ?, ?, ?)",
(identifier_value, encoded_file_name, encoded_download_dir, data_payment_rate, status, (stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status,
1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0, 1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0,
None if not content_fee else binascii.hexlify(content_fee.raw).decode(), time_added) None if not content_fee else binascii.hexlify(content_fee.raw).decode(), time_added)
).fetchall() ).fetchall()
return transaction.execute( return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0]
f"select rowid from file where {'bt_infohash' if is_torrent else 'stream_hash'}=?",
(identifier_value, )).fetchone()[0]
class SQLiteStorage(SQLiteMixin): class SQLiteStorage(SQLiteMixin):
@ -636,13 +632,6 @@ class SQLiteStorage(SQLiteMixin):
def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]: def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
return self.db.run(get_all_lbry_files) return self.db.run(get_all_lbry_files)
async def get_all_torrent_files(self) -> typing.List[typing.Dict]:
def _get_all_torrent_files(transaction):
cursor = transaction.execute(
"select file.ROWID as rowid, * from file join torrent on file.bt_infohash=torrent.bt_infohash")
return map(lambda row: dict(zip(list(map(itemgetter(0), cursor.description)), row)), cursor.fetchall())
return list(await self.db.run(_get_all_torrent_files))
def change_file_status(self, stream_hash: str, new_status: str): def change_file_status(self, stream_hash: str, new_status: str):
log.debug("update file status %s -> %s", stream_hash, new_status) log.debug("update file status %s -> %s", stream_hash, new_status)
return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash)) return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash))
@ -804,7 +793,7 @@ class SQLiteStorage(SQLiteMixin):
await self.db.run(_save_claims) await self.db.run(_save_claims)
if update_file_callbacks: if update_file_callbacks:
await asyncio.wait(update_file_callbacks) await asyncio.wait(map(asyncio.create_task, update_file_callbacks))
if claim_id_to_supports: if claim_id_to_supports:
await self.save_supports(claim_id_to_supports) await self.save_supports(claim_id_to_supports)
@ -883,20 +872,15 @@ class SQLiteStorage(SQLiteMixin):
if stream_hash in self.content_claim_callbacks: if stream_hash in self.content_claim_callbacks:
await self.content_claim_callbacks[stream_hash]() await self.content_claim_callbacks[stream_hash]()
async def add_torrent(self, bt_infohash, length, name): async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name):
def _save_torrent(transaction, bt_infohash, length, name): def _save_torrent(transaction):
transaction.execute( transaction.execute(
"insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name) "insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name)
).fetchall() ).fetchall()
return await self.db.run(_save_torrent, bt_infohash, length, name)
async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name):
def _save_torrent_claim(transaction):
transaction.execute( transaction.execute(
"insert or replace into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint) "insert or replace into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint)
).fetchall() ).fetchall()
await self.add_torrent(bt_infohash, length, name) await self.db.run(_save_torrent)
await self.db.run(_save_torrent_claim)
# update corresponding ManagedEncryptedFileDownloader object # update corresponding ManagedEncryptedFileDownloader object
if bt_infohash in self.content_claim_callbacks: if bt_infohash in self.content_claim_callbacks:
await self.content_claim_callbacks[bt_infohash]() await self.content_claim_callbacks[bt_infohash]()
@ -914,7 +898,7 @@ class SQLiteStorage(SQLiteMixin):
async def get_content_claim_for_torrent(self, bt_infohash): async def get_content_claim_for_torrent(self, bt_infohash):
claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash]) claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash])
return claims[bt_infohash] if claims else None return claims[bt_infohash].as_dict() if claims else None
# # # # # # # # # reflector functions # # # # # # # # # # # # # # # # # # reflector functions # # # # # # # # #

View file

@ -3,7 +3,7 @@ import logging
import typing import typing
from typing import Optional from typing import Optional
from aiohttp.web import Request from aiohttp.web import Request
from lbry.error import ResolveError, DownloadMetadataTimeoutError, InsufficientFundsError from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
from lbry.error import InvalidStreamURLError from lbry.error import InvalidStreamURLError
from lbry.stream.managed_stream import ManagedStream from lbry.stream.managed_stream import ManagedStream
@ -50,10 +50,10 @@ class FileManager:
await manager.started.wait() await manager.started.wait()
self.started.set() self.started.set()
def stop(self): async def stop(self):
for manager in self.source_managers.values(): for manager in self.source_managers.values():
# fixme: pop or not? # fixme: pop or not?
manager.stop() await manager.stop()
self.started.clear() self.started.clear()
@cache_concurrent @cache_concurrent
@ -99,8 +99,6 @@ class FileManager:
except asyncio.TimeoutError: except asyncio.TimeoutError:
raise ResolveTimeoutError(uri) raise ResolveTimeoutError(uri)
except Exception as err: except Exception as err:
if isinstance(err, asyncio.CancelledError):
raise
log.exception("Unexpected error resolving stream:") log.exception("Unexpected error resolving stream:")
raise ResolveError(f"Unexpected error resolving stream: {str(err)}") raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
if 'error' in resolved_result: if 'error' in resolved_result:
@ -139,7 +137,7 @@ class FileManager:
existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name
) )
claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier) claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier)
existing[0].set_claim(claim_info.as_dict() if claim_info else None, claim) existing[0].set_claim(claim_info, claim)
else: else:
await self.storage.save_content_claim( await self.storage.save_content_claim(
existing[0].stream_hash, outpoint existing[0].stream_hash, outpoint
@ -242,15 +240,15 @@ class FileManager:
stream.identifier, outpoint, stream.torrent_length, stream.torrent_name stream.identifier, outpoint, stream.torrent_length, stream.torrent_name
) )
claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier) claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
stream.set_claim(claim_info.as_dict() if claim_info else None, claim) stream.set_claim(claim_info, claim)
if save_file: if save_file:
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download)) await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download))
return stream return stream
except asyncio.TimeoutError: except asyncio.TimeoutError:
error = DownloadDataTimeoutError(stream.identifier) error = DownloadDataTimeoutError(stream.sd_hash)
raise error raise error
except Exception as err: # forgive data timeout, don't delete stream except (Exception, asyncio.CancelledError) as err: # forgive data timeout, don't delete stream
expected = (DownloadMetadataTimeoutError, DownloadDataTimeoutError, InsufficientFundsError, expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError) KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError)
if isinstance(err, expected): if isinstance(err, expected):
log.warning("Failed to download %s: %s", uri, str(err)) log.warning("Failed to download %s: %s", uri, str(err))
@ -290,24 +288,19 @@ class FileManager:
) )
) )
async def stream_partial_content(self, request: Request, identifier: str): async def stream_partial_content(self, request: Request, sd_hash: str):
for source_manager in self.source_managers.values(): return await self.source_managers['stream'].stream_partial_content(request, sd_hash)
if source_manager.get_filtered(identifier=identifier):
return await source_manager.stream_partial_content(request, identifier)
def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]: def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]:
""" """
Get a list of filtered and sorted ManagedDownloadSource objects from all available source managers Get a list of filtered and sorted ManagedStream objects
:param sort_by: field to sort by
:param reverse: reverse sorting
:param comparison: comparison operator used for filtering
:param search_by: fields and values to filter by
""" """
result = last_error = None return sum((manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), [])
for manager in self.source_managers.values():
try:
result = (result or []) + manager.get_filtered(*args, **kwargs)
except ValueError as error:
last_error = error
if result is not None:
return result
raise last_error
async def delete(self, source: ManagedDownloadSource, delete_file=False): async def delete(self, source: ManagedDownloadSource, delete_file=False):
for manager in self.source_managers.values(): for manager in self.source_managers.values():

View file

@ -1,6 +1,5 @@
import os import os
import asyncio import asyncio
import time
import typing import typing
import logging import logging
import binascii import binascii
@ -44,7 +43,7 @@ class ManagedDownloadSource:
self.rowid = rowid self.rowid = rowid
self.content_fee = content_fee self.content_fee = content_fee
self.purchase_receipt = None self.purchase_receipt = None
self._added_on = added_on or int(time.time()) self._added_on = added_on
self.analytics_manager = analytics_manager self.analytics_manager = analytics_manager
self.downloader = None self.downloader = None
@ -68,7 +67,7 @@ class ManagedDownloadSource:
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None): async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
raise NotImplementedError() raise NotImplementedError()
def stop_tasks(self): async def stop_tasks(self):
raise NotImplementedError() raise NotImplementedError()
def set_claim(self, claim_info: typing.Dict, claim: 'Claim'): def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
@ -92,14 +91,6 @@ class ManagedDownloadSource:
def added_on(self) -> Optional[int]: def added_on(self) -> Optional[int]:
return self._added_on return self._added_on
@property
def suggested_file_name(self):
return self._file_name
@property
def stream_name(self):
return self.suggested_file_name
@property @property
def status(self) -> str: def status(self) -> str:
return self._status return self._status
@ -108,9 +99,9 @@ class ManagedDownloadSource:
def completed(self): def completed(self):
raise NotImplementedError() raise NotImplementedError()
@property # @property
def stream_url(self): # def stream_url(self):
return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.identifier}" # return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}
@property @property
def finished(self) -> bool: def finished(self) -> bool:

View file

@ -23,7 +23,6 @@ COMPARISON_OPERATORS = {
class SourceManager: class SourceManager:
filter_fields = { filter_fields = {
'identifier',
'rowid', 'rowid',
'status', 'status',
'file_name', 'file_name',
@ -60,11 +59,11 @@ class SourceManager:
def add(self, source: ManagedDownloadSource): def add(self, source: ManagedDownloadSource):
self._sources[source.identifier] = source self._sources[source.identifier] = source
def remove(self, source: ManagedDownloadSource): async def remove(self, source: ManagedDownloadSource):
if source.identifier not in self._sources: if source.identifier not in self._sources:
return return
self._sources.pop(source.identifier) self._sources.pop(source.identifier)
source.stop_tasks() await source.stop_tasks()
async def initialize_from_database(self): async def initialize_from_database(self):
raise NotImplementedError() raise NotImplementedError()
@ -73,10 +72,10 @@ class SourceManager:
await self.initialize_from_database() await self.initialize_from_database()
self.started.set() self.started.set()
def stop(self): async def stop(self):
while self._sources: while self._sources:
_, source = self._sources.popitem() _, source = self._sources.popitem()
source.stop_tasks() await source.stop_tasks()
self.started.clear() self.started.clear()
async def create(self, file_path: str, key: Optional[bytes] = None, async def create(self, file_path: str, key: Optional[bytes] = None,
@ -84,8 +83,7 @@ class SourceManager:
raise NotImplementedError() raise NotImplementedError()
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
await self.storage.delete_torrent(source.identifier) await self.remove(source)
self.remove(source)
if delete_file and source.output_file_exists: if delete_file and source.output_file_exists:
os.remove(source.full_path) os.remove(source.full_path)

View file

@ -23,6 +23,7 @@ class BackgroundDownloader:
except ValueError: except ValueError:
return return
except asyncio.CancelledError: except asyncio.CancelledError:
log.debug("Cancelled background downloader")
raise raise
except Exception: except Exception:
log.error("Unexpected download error on background downloader") log.error("Unexpected download error on background downloader")

View file

@ -4,7 +4,7 @@ import logging
import binascii import binascii
from lbry.dht.node import get_kademlia_peers_from_hosts from lbry.dht.node import get_kademlia_peers_from_hosts
from lbry.error import DownloadMetadataTimeoutError from lbry.error import DownloadSDTimeoutError
from lbry.utils import lru_cache_concurrent from lbry.utils import lru_cache_concurrent
from lbry.stream.descriptor import StreamDescriptor from lbry.stream.descriptor import StreamDescriptor
from lbry.blob_exchange.downloader import BlobDownloader from lbry.blob_exchange.downloader import BlobDownloader
@ -77,7 +77,7 @@ class StreamDownloader:
log.info("downloaded sd blob %s", self.sd_hash) log.info("downloaded sd blob %s", self.sd_hash)
self.time_to_descriptor = self.loop.time() - now self.time_to_descriptor = self.loop.time() - now
except asyncio.TimeoutError: except asyncio.TimeoutError:
raise DownloadMetadataTimeoutError(self.sd_hash) raise DownloadSDTimeoutError(self.sd_hash)
# parse the descriptor # parse the descriptor
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob( self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(

View file

@ -5,7 +5,7 @@ import typing
import logging import logging
from typing import Optional from typing import Optional
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
from lbry.error import DownloadMetadataTimeoutError from lbry.error import DownloadSDTimeoutError
from lbry.schema.mime_types import guess_media_type from lbry.schema.mime_types import guess_media_type
from lbry.stream.downloader import StreamDownloader from lbry.stream.downloader import StreamDownloader
from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name
@ -104,6 +104,10 @@ class ManagedStream(ManagedDownloadSource):
def completed(self): def completed(self):
return self.written_bytes >= self.descriptor.lower_bound_decrypted_length() return self.written_bytes >= self.descriptor.lower_bound_decrypted_length()
@property
def stream_url(self):
return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}"
async def update_status(self, status: str): async def update_status(self, status: str):
assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED] assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
self._status = status self._status = status
@ -160,7 +164,7 @@ class ManagedStream(ManagedDownloadSource):
await asyncio.wait_for(self.downloader.start(), timeout) await asyncio.wait_for(self.downloader.start(), timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
self._running.clear() self._running.clear()
raise DownloadMetadataTimeoutError(self.identifier) raise DownloadSDTimeoutError(self.sd_hash)
if self.delayed_stop_task and not self.delayed_stop_task.done(): if self.delayed_stop_task and not self.delayed_stop_task.done():
self.delayed_stop_task.cancel() self.delayed_stop_task.cancel()
@ -187,7 +191,7 @@ class ManagedStream(ManagedDownloadSource):
Stop any running save/stream tasks as well as the downloader and update the status in the database Stop any running save/stream tasks as well as the downloader and update the status in the database
""" """
self.stop_tasks() await self.stop_tasks()
if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING: if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED) await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)
@ -275,7 +279,7 @@ class ManagedStream(ManagedDownloadSource):
log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id, log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id,
self.sd_hash[:6], self.full_path) self.sd_hash[:6], self.full_path)
await self.blob_manager.storage.set_saved_file(self.stream_hash) await self.blob_manager.storage.set_saved_file(self.stream_hash)
except Exception as err: except (Exception, asyncio.CancelledError) as err:
if os.path.isfile(output_path): if os.path.isfile(output_path):
log.warning("removing incomplete download %s for %s", output_path, self.sd_hash) log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)
os.remove(output_path) os.remove(output_path)
@ -320,12 +324,13 @@ class ManagedStream(ManagedDownloadSource):
await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout) await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id) log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id)
self.stop_tasks() await self.stop_tasks()
await self.update_status(ManagedStream.STATUS_STOPPED) await self.update_status(ManagedStream.STATUS_STOPPED)
def stop_tasks(self): async def stop_tasks(self):
if self.file_output_task and not self.file_output_task.done(): if self.file_output_task and not self.file_output_task.done():
self.file_output_task.cancel() self.file_output_task.cancel()
await asyncio.gather(self.file_output_task, return_exceptions=True)
self.file_output_task = None self.file_output_task = None
while self.streaming_responses: while self.streaming_responses:
req, response = self.streaming_responses.pop() req, response = self.streaming_responses.pop()
@ -362,7 +367,7 @@ class ManagedStream(ManagedDownloadSource):
return sent return sent
except ConnectionError: except ConnectionError:
return sent return sent
except (OSError, Exception) as err: except (OSError, Exception, asyncio.CancelledError) as err:
if isinstance(err, asyncio.CancelledError): if isinstance(err, asyncio.CancelledError):
log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id) log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id)
elif isinstance(err, OSError): elif isinstance(err, OSError):

View file

@ -32,7 +32,7 @@ def path_or_none(encoded_path) -> Optional[str]:
class StreamManager(SourceManager): class StreamManager(SourceManager):
_sources: typing.Dict[str, ManagedStream] _sources: typing.Dict[str, ManagedStream]
filter_fields = set(SourceManager.filter_fields) filter_fields = SourceManager.filter_fields
filter_fields.update({ filter_fields.update({
'sd_hash', 'sd_hash',
'stream_hash', 'stream_hash',
@ -164,8 +164,6 @@ class StreamManager(SourceManager):
async def reflect_streams(self): async def reflect_streams(self):
try: try:
return await self._reflect_streams() return await self._reflect_streams()
except asyncio.CancelledError:
raise
except Exception: except Exception:
log.exception("reflector task encountered an unexpected error!") log.exception("reflector task encountered an unexpected error!")
@ -198,8 +196,8 @@ class StreamManager(SourceManager):
await super().start() await super().start()
self.re_reflect_task = self.loop.create_task(self.reflect_streams()) self.re_reflect_task = self.loop.create_task(self.reflect_streams())
def stop(self): async def stop(self):
super().stop() await super().stop()
if self.resume_saving_task and not self.resume_saving_task.done(): if self.resume_saving_task and not self.resume_saving_task.done():
self.resume_saving_task.cancel() self.resume_saving_task.cancel()
if self.re_reflect_task and not self.re_reflect_task.done(): if self.re_reflect_task and not self.re_reflect_task.done():
@ -226,7 +224,8 @@ class StreamManager(SourceManager):
) )
return task return task
async def _retriable_reflect_stream(self, stream, host, port): @staticmethod
async def _retriable_reflect_stream(stream, host, port):
sent = await stream.upload_to_reflector(host, port) sent = await stream.upload_to_reflector(host, port)
while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0: while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0:
stream.reflector_progress = 0 stream.reflector_progress = 0
@ -261,7 +260,7 @@ class StreamManager(SourceManager):
return return
if source.identifier in self.running_reflector_uploads: if source.identifier in self.running_reflector_uploads:
self.running_reflector_uploads[source.identifier].cancel() self.running_reflector_uploads[source.identifier].cancel()
source.stop_tasks() await source.stop_tasks()
if source.identifier in self.streams: if source.identifier in self.streams:
del self.streams[source.identifier] del self.streams[source.identifier]
blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]] blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]]

View file

@ -394,7 +394,6 @@ class CommandTestCase(IntegrationTestCase):
logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY) logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY)
logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY) logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY)
logging.getLogger('lbry.stream').setLevel(self.VERBOSITY) logging.getLogger('lbry.stream').setLevel(self.VERBOSITY)
logging.getLogger('lbry.torrent').setLevel(self.VERBOSITY)
logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY) logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY)
await super().asyncSetUp() await super().asyncSetUp()

View file

@ -3,8 +3,9 @@ import binascii
import os import os
import logging import logging
import random import random
from hashlib import sha1
from tempfile import mkdtemp from tempfile import mkdtemp
from typing import Optional, Tuple, Dict from typing import Optional
import libtorrent import libtorrent
@ -13,8 +14,6 @@ log = logging.getLogger(__name__)
DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted? DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted?
libtorrent.add_torrent_params_flags_t.flag_auto_managed libtorrent.add_torrent_params_flags_t.flag_auto_managed
| libtorrent.add_torrent_params_flags_t.flag_update_subscribe | libtorrent.add_torrent_params_flags_t.flag_update_subscribe
| libtorrent.add_torrent_params_flags_t.flag_sequential_download
| libtorrent.add_torrent_params_flags_t.flag_paused
) )
@ -23,102 +22,66 @@ class TorrentHandle:
self._loop = loop self._loop = loop
self._executor = executor self._executor = executor
self._handle: libtorrent.torrent_handle = handle self._handle: libtorrent.torrent_handle = handle
self.started = asyncio.Event(loop=loop)
self.finished = asyncio.Event(loop=loop) self.finished = asyncio.Event(loop=loop)
self.metadata_completed = asyncio.Event(loop=loop) self.metadata_completed = asyncio.Event(loop=loop)
self.size = handle.status().total_wanted self.size = 0
self.total_wanted_done = 0 self.total_wanted_done = 0
self.name = '' self.name = ''
self.tasks = [] self.tasks = []
self._torrent_info: libtorrent.torrent_info = handle.torrent_file() self.torrent_file: Optional[libtorrent.file_storage] = None
self._base_path = None self._base_path = None
self._handle.set_sequential_download(1)
@property @property
def torrent_file(self) -> Optional[libtorrent.file_storage]: def largest_file(self) -> Optional[str]:
return self._torrent_info.files() if not self.torrent_file:
def full_path_at(self, file_num) -> Optional[str]:
if self.torrent_file is None:
return None return None
return os.path.join(self.save_path, self.torrent_file.file_path(file_num)) index = self.largest_file_index
return os.path.join(self._base_path, self.torrent_file.at(index).path)
def size_at(self, file_num) -> Optional[int]:
if self.torrent_file is not None:
return self.torrent_file.file_size(file_num)
@property @property
def save_path(self) -> Optional[str]: def largest_file_index(self):
if not self._base_path: largest_size, index = 0, 0
self._base_path = self._handle.status().save_path
return self._base_path
def index_from_name(self, file_name):
for file_num in range(self.torrent_file.num_files()): for file_num in range(self.torrent_file.num_files()):
if '.pad' in self.torrent_file.file_path(file_num): if self.torrent_file.file_size(file_num) > largest_size:
continue # ignore padding files largest_size = self.torrent_file.file_size(file_num)
if file_name == os.path.basename(self.full_path_at(file_num)): index = file_num
return file_num return index
def stop_tasks(self): def stop_tasks(self):
self._handle.save_resume_data()
while self.tasks: while self.tasks:
self.tasks.pop().cancel() self.tasks.pop().cancel()
def byte_range_to_piece_range(
self, file_index, start_offset, end_offset) -> Tuple[libtorrent.peer_request, libtorrent.peer_request]:
start_piece = self._torrent_info.map_file(file_index, start_offset, 0)
end_piece = self._torrent_info.map_file(file_index, end_offset, 0)
return start_piece, end_piece
async def stream_range_as_completed(self, file_name, start, end):
file_index = self.index_from_name(file_name)
if file_index is None:
raise ValueError(f"Attempt to stream from invalid file. Expected name: {file_name}")
first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end)
start_piece_offset = first_piece.start
piece_size = self._torrent_info.piece_length()
log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d, piece size: %d): %s",
first_piece.piece, final_piece.piece, start, end, piece_size, self.name)
self.prioritize(file_index, start, end)
for piece_index in range(first_piece.piece, final_piece.piece + 1):
while not self._handle.have_piece(piece_index):
log.info("Waiting for piece %d: %s", piece_index, self.name)
self._handle.set_piece_deadline(piece_index, 0)
await asyncio.sleep(0.2)
log.info("Streaming piece offset %d / %d for torrent %s", piece_index, final_piece.piece, self.name)
yield piece_size - start_piece_offset
def _show_status(self): def _show_status(self):
# fixme: cleanup # fixme: cleanup
if not self._handle.is_valid(): if not self._handle.is_valid():
return return
status = self._handle.status() status = self._handle.status()
self._base_path = status.save_path
if status.has_metadata: if status.has_metadata:
self.size = status.total_wanted self.size = status.total_wanted
self.total_wanted_done = status.total_wanted_done self.total_wanted_done = status.total_wanted_done
self.name = status.name self.name = status.name
if not self.metadata_completed.is_set(): if not self.metadata_completed.is_set():
self.metadata_completed.set() self.metadata_completed.set()
self._torrent_info = self._handle.torrent_file()
log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name)
log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s', self.torrent_file = self._handle.get_torrent_info().files()
status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, self._base_path = status.save_path
status.num_peers, status.num_seeds, status.state, status.save_path) first_piece = self.torrent_file.at(self.largest_file_index).offset
if (status.is_finished or status.is_seeding) and not self.finished.is_set(): if not self.started.is_set():
if self._handle.have_piece(first_piece):
self.started.set()
else:
# prioritize it
self._handle.set_piece_deadline(first_piece, 100)
if not status.is_seeding:
log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s',
status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
status.num_peers, status.num_seeds, status.state, status.save_path)
elif not self.finished.is_set():
self.finished.set() self.finished.set()
log.info("Torrent finished: %s", self.name) log.info("Torrent finished: %s", self.name)
def prioritize(self, file_index, start, end, cleanup=False):
first_piece, last_piece = self.byte_range_to_piece_range(file_index, start, end)
priorities = self._handle.get_piece_priorities()
priorities = [0 if cleanup else 1 for _ in priorities]
self._handle.clear_piece_deadlines()
for idx, piece_number in enumerate(range(first_piece.piece, last_piece.piece)):
priorities[piece_number] = 7 - idx if 0 <= idx <= 6 else 1
self._handle.set_piece_deadline(piece_number, idx)
log.debug("Prioritizing pieces for %s: %s", self.name, priorities)
self._handle.prioritize_pieces(priorities)
async def status_loop(self): async def status_loop(self):
while True: while True:
self._show_status() self._show_status()
@@ -142,21 +105,19 @@ class TorrentSession:
         self._loop = loop
         self._executor = executor
         self._session: Optional[libtorrent.session] = None
-        self._handles: Dict[str, TorrentHandle] = {}
+        self._handles = {}
         self.tasks = []
-        self.wait_start = True
 
-    def add_peer(self, btih, addr, port):
-        self._handles[btih]._handle.connect_peer((addr, port))
-
-    async def add_fake_torrent(self, file_count=3):
+    async def add_fake_torrent(self):
         tmpdir = mkdtemp()
-        info = _create_fake_torrent(tmpdir, file_count=file_count)
+        info, btih = _create_fake_torrent(tmpdir)
         flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode
         handle = self._session.add_torrent({
             'ti': info, 'save_path': tmpdir, 'flags': flags
         })
-        self._handles[str(info.info_hash())] = TorrentHandle(self._loop, self._executor, handle)
-        return str(info.info_hash())
+        self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
+        return btih
 
     async def bind(self, interface: str = '0.0.0.0', port: int = 10889):
         settings = {
@@ -170,14 +131,15 @@ class TorrentSession:
         self.tasks.append(self._loop.create_task(self.process_alerts()))
 
     def stop(self):
-        while self._handles:
-            self._handles.popitem()[1].stop_tasks()
         while self.tasks:
             self.tasks.pop().cancel()
-        if self._session:
-            self._session.save_state()
-            self._session.pause()
-            self._session = None
+        self._session.save_state()
+        self._session.pause()
+        self._session.stop_dht()
+        self._session.stop_lsd()
+        self._session.stop_natpmp()
+        self._session.stop_upnp()
+        self._session = None
 
     def _pop_alerts(self):
         for alert in self._session.pop_alerts():
@@ -211,23 +173,18 @@ class TorrentSession:
         handle.force_dht_announce()
         self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
 
-    def full_path(self, btih, file_num) -> Optional[str]:
-        return self._handles[btih].full_path_at(file_num)
-
-    def save_path(self, btih):
-        return self._handles[btih].save_path
-
-    def has_torrent(self, btih):
-        return btih in self._handles
+    def full_path(self, btih):
+        return self._handles[btih].largest_file
 
     async def add_torrent(self, btih, download_path):
-        if btih in self._handles:
-            return await self._handles[btih].metadata_completed.wait()
         await self._loop.run_in_executor(
             self._executor, self._add_torrent, btih, download_path
         )
         self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop()))
         await self._handles[btih].metadata_completed.wait()
-        if self.wait_start:
-            # fixme: temporary until we add streaming support, otherwise playback fails!
-            await self._handles[btih].started.wait()
 
     def remove_torrent(self, btih, remove_files=False):
         if btih in self._handles:
@@ -240,17 +197,9 @@ class TorrentSession:
         handle = self._handles[btih]
         await handle.resume()
 
-    def get_total_size(self, btih):
+    def get_size(self, btih):
         return self._handles[btih].size
 
-    def get_index_from_name(self, btih, file_name):
-        return self._handles[btih].index_from_name(file_name)
-
-    def get_size(self, btih, file_name) -> Optional[int]:
-        for (path, size) in self.get_files(btih).items():
-            if os.path.basename(path) == file_name:
-                return size
-
     def get_name(self, btih):
         return self._handles[btih].name
 
@@ -260,38 +209,23 @@ class TorrentSession:
     def is_completed(self, btih):
         return self._handles[btih].finished.is_set()
 
-    def stream_file(self, btih, file_name, start, end):
-        handle = self._handles[btih]
-        return handle.stream_range_as_completed(file_name, start, end)
-
-    def get_files(self, btih) -> Dict:
-        handle = self._handles[btih]
-        return {
-            self.full_path(btih, file_num): handle.torrent_file.file_size(file_num)
-            for file_num in range(handle.torrent_file.num_files())
-            if '.pad' not in handle.torrent_file.file_path(file_num)
-        }
-
 
 def get_magnet_uri(btih):
     return f"magnet:?xt=urn:btih:{btih}"
 
 
-def _create_fake_torrent(tmpdir, file_count=3, largest_index=1):
-    # layout: subdir/tmp{0..file_count-1} files. v1+v2. automatic piece size.
-    # largest_index: which file index {0 ... file_count} will be the largest file
+def _create_fake_torrent(tmpdir):
+    # beware, that's just for testing
+    path = os.path.join(tmpdir, 'tmp')
+    with open(path, 'wb') as myfile:
+        size = myfile.write(bytes([random.randint(0, 255) for _ in range(40)]) * 1024)
     file_storage = libtorrent.file_storage()
-    subfolder = os.path.join(tmpdir, "subdir")
-    os.mkdir(subfolder)
-    for file_number in range(file_count):
-        file_name = f"tmp{file_number}"
-        with open(os.path.join(subfolder, file_name), 'wb') as myfile:
-            size = myfile.write(
-                bytes([random.randint(0, 255) for _ in range(10 - abs(file_number - largest_index))]) * 1024)
-            file_storage.add_file(os.path.join("subdir", file_name), size)
-    t = libtorrent.create_torrent(file_storage, 0, 0)
+    file_storage.add_file('tmp', size)
+    t = libtorrent.create_torrent(file_storage, 0, 4 * 1024 * 1024)
     libtorrent.set_piece_hashes(t, tmpdir)
-    return libtorrent.torrent_info(t.generate())
+    info = libtorrent.torrent_info(t.generate())
+    btih = sha1(info.metadata()).hexdigest()
+    return info, btih
 async def main():
@@ -304,16 +238,17 @@ async def main():
     executor = None
     session = TorrentSession(asyncio.get_event_loop(), executor)
-    await session.bind()
-    await session.add_torrent(btih, os.path.expanduser("~/Downloads"))
+    session2 = TorrentSession(asyncio.get_event_loop(), executor)
+    await session.bind('localhost', port=4040)
+    await session2.bind('localhost', port=4041)
+    btih = await session.add_fake_torrent()
+    session2._session.add_dht_node(('localhost', 4040))
+    await session2.add_torrent(btih, "/tmp/down")
     while True:
-        session.full_path(btih, 0)
-        await asyncio.sleep(1)
+        await asyncio.sleep(100)
     await session.pause()
     executor.shutdown()
 
 if __name__ == "__main__":
-    logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
-    log = logging.getLogger(__name__)
     asyncio.run(main())
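Note: the new `_create_fake_torrent()` derives the infohash as `sha1(info.metadata()).hexdigest()`. That works because a v1 btih is, by definition, the SHA-1 of the bencoded `info` dictionary, which `torrent_info.metadata()` exposes. A standalone sketch of the same idea (the `.torrent` path is a placeholder):

    from hashlib import sha1
    import libtorrent

    def infohash_v1(torrent_path: str) -> str:
        info = libtorrent.torrent_info(torrent_path)
        # metadata() returns the bencoded info dict; its SHA-1 is the btih
        return sha1(info.metadata()).hexdigest()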

View file

@@ -1,14 +1,12 @@
 import asyncio
+import binascii
 import logging
 import os
 import typing
 from typing import Optional
-from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
-from lbry.error import DownloadMetadataTimeoutError
+from aiohttp.web import Request
 from lbry.file.source_manager import SourceManager
 from lbry.file.source import ManagedDownloadSource
-from lbry.schema.mime_types import guess_media_type
 
 if typing.TYPE_CHECKING:
     from lbry.torrent.session import TorrentSession
@@ -21,6 +19,12 @@ if typing.TYPE_CHECKING:
 log = logging.getLogger(__name__)
 
 
+def path_or_none(encoded_path) -> Optional[str]:
+    if not encoded_path:
+        return
+    return binascii.unhexlify(encoded_path).decode()
+
+
 class TorrentSource(ManagedDownloadSource):
     STATUS_STOPPED = "stopped"
     filter_fields = SourceManager.filter_fields
@@ -38,55 +42,15 @@ class TorrentSource(ManagedDownloadSource):
         super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id,
                          rowid, content_fee, analytics_manager, added_on)
         self.torrent_session = torrent_session
-        self._suggested_file_name = None
-        self._full_path = None
 
     @property
     def full_path(self) -> Optional[str]:
-        if not self._full_path:
-            self._full_path = self.select_path()
-            self._file_name = os.path.basename(self._full_path)
-            self.download_directory = self.torrent_session.save_path(self.identifier)
-        return self._full_path
-
-    def select_path(self):
-        wanted_name = (self.stream_claim_info.claim.stream.source.name or '') if self.stream_claim_info else ''
-        wanted_index = self.torrent_session.get_index_from_name(self.identifier, wanted_name)
-        if wanted_index is None:
-            # maybe warn?
-            largest = (None, -1)
-            for (path, size) in self.torrent_session.get_files(self.identifier).items():
-                largest = (path, size) if size > largest[1] else largest
-            return largest[0]
-        else:
-            return self.torrent_session.full_path(self.identifier, wanted_index or 0)
-
-    @property
-    def suggested_file_name(self):
-        self._suggested_file_name = self._suggested_file_name or os.path.basename(self.select_path())
-        return self._suggested_file_name
-
-    @property
-    def mime_type(self) -> Optional[str]:
-        return guess_media_type(os.path.basename(self.full_path))[0]
-
-    async def setup(self, timeout: Optional[float] = None):
-        try:
-            metadata_download = self.torrent_session.add_torrent(self.identifier, self.download_directory)
-            await asyncio.wait_for(metadata_download, timeout, loop=self.loop)
-        except asyncio.TimeoutError:
-            self.torrent_session.remove_torrent(btih=self.identifier)
-            raise DownloadMetadataTimeoutError(self.identifier)
-        self.download_directory = self.torrent_session.save_path(self.identifier)
-        self._file_name = os.path.basename(self.full_path)
+        full_path = self.torrent_session.full_path(self.identifier)
+        self.download_directory = os.path.dirname(full_path)
+        return full_path
 
     async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
-        await self.setup(timeout)
-        if not self.rowid:
-            await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name)
-            self.rowid = await self.storage.save_downloaded_file(
-                self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on
-            )
+        await self.torrent_session.add_torrent(self.identifier, self.download_directory)
 
     async def stop(self, finished: bool = False):
         await self.torrent_session.remove_torrent(self.identifier)
@@ -96,11 +60,7 @@ class TorrentSource(ManagedDownloadSource):
     @property
     def torrent_length(self):
-        return self.torrent_session.get_total_size(self.identifier)
-
-    @property
-    def stream_length(self):
-        return self.torrent_session.get_size(self.identifier, self.file_name)
+        return self.torrent_session.get_size(self.identifier)
 
     @property
     def written_bytes(self):
@@ -114,63 +74,13 @@ class TorrentSource(ManagedDownloadSource):
     def bt_infohash(self):
         return self.identifier
 
-    def stop_tasks(self):
+    async def stop_tasks(self):
         pass
 
     @property
     def completed(self):
         return self.torrent_session.is_completed(self.identifier)
 
-    @property
-    def status(self):
-        return self.STATUS_FINISHED if self.completed else self.STATUS_RUNNING
-
-    async def stream_file(self, request):
-        log.info("stream torrent to browser for lbry://%s#%s (btih %s...)", self.claim_name, self.claim_id,
-                 self.identifier[:6])
-        headers, start, end = self._prepare_range_response_headers(
-            request.headers.get('range', 'bytes=0-')
-        )
-        target = self.suggested_file_name
-        await self.start()
-        response = StreamResponse(
-            status=206,
-            headers=headers
-        )
-        await response.prepare(request)
-        while not os.path.exists(self.full_path):
-            async for _ in self.torrent_session.stream_file(self.identifier, target, start, end):
-                break
-        with open(self.full_path, 'rb') as infile:
-            infile.seek(start)
-            async for read_size in self.torrent_session.stream_file(self.identifier, target, start, end):
-                if infile.tell() + read_size < end:
-                    await response.write(infile.read(read_size))
-                else:
-                    await response.write_eof(infile.read(end - infile.tell() + 1))
-        return response
-
-    def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
-        if '=' in get_range:
-            get_range = get_range.split('=')[1]
-        start, end = get_range.split('-')
-        size = self.stream_length
-        start = int(start)
-        end = int(end) if end else size - 1
-        if end >= size or not 0 <= start < size:
-            raise HTTPRequestRangeNotSatisfiable()
-        final_size = end - start + 1
-        headers = {
-            'Accept-Ranges': 'bytes',
-            'Content-Range': f'bytes {start}-{end}/{size}',
-            'Content-Length': str(final_size),
-            'Content-Type': self.mime_type
-        }
-        return headers, start, end
-
 
 class TorrentManager(SourceManager):
     _sources: typing.Dict[str, ManagedDownloadSource]
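Note: the removed `_prepare_range_response_headers()` is a standard `Range: bytes=start-end` parser backing HTTP 206 responses. Roughly, with the same loose clamping as the deleted code (the function name here is illustrative):

    from typing import Tuple

    def parse_range(header: str, size: int) -> Tuple[int, int]:
        # "bytes=0-499" -> (0, 499); an open end ("bytes=500-") runs to the last byte
        if '=' in header:
            header = header.split('=')[1]
        start_str, end_str = header.split('-')
        start = int(start_str)
        end = int(end_str) if end_str else size - 1
        if end >= size or not 0 <= start < size:
            raise ValueError("unsatisfiable range")
        return start, end

The deleted handler raised `HTTPRequestRangeNotSatisfiable` instead of `ValueError` and fed the result into the `Content-Range` and `Content-Length` headers.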
@@ -193,7 +103,7 @@ class TorrentManager(SourceManager):
     async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str],
                            download_directory: Optional[str], status: str,
                            claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
-                           added_on: Optional[int], **kwargs):
+                           added_on: Optional[int]):
         stream = TorrentSource(
             self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name,
             download_directory=download_directory, status=status, claim=claim, rowid=rowid,
@@ -201,20 +111,15 @@ class TorrentManager(SourceManager):
             torrent_session=self.torrent_session
         )
         self.add(stream)
-        await stream.setup()
 
     async def initialize_from_database(self):
-        for file in await self.storage.get_all_torrent_files():
-            claim = await self.storage.get_content_claim_for_torrent(file['bt_infohash'])
-            file['download_directory'] = bytes.fromhex(file['download_directory'] or '').decode() or None
-            file['file_name'] = bytes.fromhex(file['file_name'] or '').decode() or None
-            await self._load_stream(claim=claim, **file)
+        pass
 
     async def start(self):
         await super().start()
 
-    def stop(self):
-        super().stop()
+    async def stop(self):
+        await super().stop()
         log.info("finished stopping the torrent manager")
 
     async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
@@ -227,6 +132,9 @@ class TorrentManager(SourceManager):
     async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
         raise NotImplementedError
+        # blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
+        # await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
+        # await self.storage.delete_stream(source.descriptor)
 
-    async def stream_partial_content(self, request: Request, identifier: str):
-        return await self._sources[identifier].stream_file(request)
+    async def stream_partial_content(self, request: Request, sd_hash: str):
+        raise NotImplementedError

View file

@@ -141,7 +141,7 @@ class CoinSelector:
                       _) -> List[OutputEffectiveAmountEstimator]:
         """ Accumulate UTXOs at random until there is enough to cover the target. """
         target = self.target + self.cost_of_change
-        self.random.shuffle(txos, self.random.random)
+        self.random.shuffle(txos, random=self.random.random)  # pylint: disable=deprecated-argument
         selection = []
         amount = 0
         for coin in txos:
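Note on the pylint pragma: the `random` parameter of `random.shuffle()` is deprecated since Python 3.9 and removed in 3.11, hence the warning being silenced here. If the shuffler is already a seeded `random.Random` instance, its own `shuffle()` is deterministic without that parameter, for example:

    import random

    rng = random.Random(42)   # deterministic, seedable source
    items = list(range(10))
    rng.shuffle(items)        # uses rng's own state; no deprecated argument needed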

View file

@@ -329,10 +329,10 @@ class Ledger(metaclass=LedgerRegistry):
     async def start(self):
         if not os.path.exists(self.path):
             os.mkdir(self.path)
-        await asyncio.wait([
+        await asyncio.wait(map(asyncio.create_task, [
             self.db.open(),
             self.headers.open()
-        ])
+        ]))
         fully_synced = self.on_ready.first
         asyncio.create_task(self.network.start())
         await self.network.on_connected.first
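Note: the `map(asyncio.create_task, ...)` wrapper addresses a DeprecationWarning: since Python 3.8, handing bare coroutines to `asyncio.wait()` is deprecated, and 3.11 rejects them outright. A minimal sketch of the pattern (the coroutine names are illustrative):

    import asyncio

    async def open_db():
        await asyncio.sleep(0.1)

    async def open_headers():
        await asyncio.sleep(0.1)

    async def main():
        # wrap each coroutine in a Task before asyncio.wait() sees it
        done, pending = await asyncio.wait(map(asyncio.create_task, [open_db(), open_headers()]))

    asyncio.run(main())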
@@ -466,9 +466,9 @@ class Ledger(metaclass=LedgerRegistry):
     async def subscribe_accounts(self):
         if self.network.is_connected and self.accounts:
             log.info("Subscribe to %i accounts", len(self.accounts))
-            await asyncio.wait([
+            await asyncio.wait(map(asyncio.create_task, [
                 self.subscribe_account(a) for a in self.accounts
-            ])
+            ]))
 
     async def subscribe_account(self, account: Account):
         for address_manager in account.address_managers.values():
@@ -938,9 +938,7 @@ class Ledger(metaclass=LedgerRegistry):
                  "%d change addresses (gap: %d), %d channels, %d certificates and %d claims. ",
                  account.id, balance, total_receiving, account.receiving.gap, total_change,
                  account.change.gap, channel_count, len(account.channel_keys), claim_count)
-        except Exception as err:
-            if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
-                raise
+        except Exception:
             log.exception(
                 'Failed to display wallet state, please file issue '
                 'for this bug along with the traceback you see below:')
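Note: the dropped guard dates from Python < 3.8, where `asyncio.CancelledError` still subclassed `Exception` and had to be re-raised by hand. From 3.8 on it derives from `BaseException`, so `except Exception` no longer swallows cancellation, as this small illustration shows:

    import asyncio

    async def worker():
        try:
            await asyncio.sleep(3600)
        except Exception:
            # never reached on 3.8+: CancelledError is a BaseException
            print("swallowed")

    async def main():
        task = asyncio.create_task(worker())
        await asyncio.sleep(0)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            print("cancelled as expected")

    asyncio.run(main())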
@@ -963,9 +961,7 @@ class Ledger(metaclass=LedgerRegistry):
             claim_ids = [p.purchased_claim_id for p in purchases]
             try:
                 resolved, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
-            except Exception as err:
-                if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
-                    raise
+            except Exception:
                 log.exception("Resolve failed while looking up purchased claim ids:")
                 resolved = []
             lookup = {claim.claim_id: claim for claim in resolved}
@@ -1045,9 +1041,7 @@ class Ledger(metaclass=LedgerRegistry):
         claim_ids = collection.claim.collection.claims.ids[offset:page_size + offset]
         try:
             resolve_results, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
-        except Exception as err:
-            if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
-                raise
+        except Exception:
             log.exception("Resolve failed while looking up collection claim ids:")
             return []
         claims = []

View file

@@ -117,7 +117,7 @@ class ClientSession(BaseClientSession):
                     )
                 else:
                     await asyncio.sleep(max(0, max_idle - (now - self.last_send)))
-        except Exception as err:
+        except (Exception, asyncio.CancelledError) as err:
             if isinstance(err, asyncio.CancelledError):
                 log.info("closing connection to %s:%i", *self.server)
             else:
@@ -214,7 +214,7 @@ class Network:
         def loop_task_done_callback(f):
             try:
                 f.result()
-            except Exception:
+            except (Exception, asyncio.CancelledError):
                 if self.running:
                     log.exception("wallet server connection loop crashed")
@@ -312,7 +312,8 @@ class Network:
             sleep_delay = 30
             while self.running:
                 await asyncio.wait(
-                    [asyncio.sleep(30), self._urgent_need_reconnect.wait()], return_when=asyncio.FIRST_COMPLETED
+                    map(asyncio.create_task, [asyncio.sleep(30), self._urgent_need_reconnect.wait()]),
+                    return_when=asyncio.FIRST_COMPLETED
                 )
                 if self._urgent_need_reconnect.is_set():
                     sleep_delay = 30
@@ -338,7 +339,7 @@ class Network:
             try:
                 if not self._urgent_need_reconnect.is_set():
                     await asyncio.wait(
-                        [self._keepalive_task, self._urgent_need_reconnect.wait()],
+                        [self._keepalive_task, asyncio.create_task(self._urgent_need_reconnect.wait())],
                         return_when=asyncio.FIRST_COMPLETED
                     )
                 else:

View file

@@ -264,8 +264,7 @@ class SPVNode:
             await self.server.start()
         except Exception as e:
             self.stopped = True
-            if not isinstance(e, asyncio.CancelledError):
-                log.exception("failed to start spv node")
+            log.exception("failed to start spv node")
             raise e
 
     async def stop(self, cleanup=True):

View file

@@ -28,6 +28,7 @@ disable=
         no-else-return,
         cyclic-import,
         missing-docstring,
+        consider-using-f-string,
         duplicate-code,
         expression-not-assigned,
         inconsistent-return-statements,

View file

@@ -18,7 +18,7 @@ setup(
     long_description_content_type="text/markdown",
     keywords="lbry protocol media",
     license='MIT',
-    python_requires='>=3.7',
+    python_requires='>=3.8',
     packages=find_packages(exclude=('tests',)),
     zip_safe=False,
     entry_points={
@@ -36,7 +36,7 @@ setup(
         'distro==1.4.0',
         'base58==1.0.0',
         'cffi==1.13.2',
-        'cryptography==2.5',
+        'cryptography==3.4.7',
         'protobuf==3.17.2',
         'prometheus_client==0.7.1',
         'ecdsa==0.13.3',
@@ -50,7 +50,7 @@ setup(
     ],
     extras_require={
         'lint': [
-            'pylint==2.10.0'
+            'pylint==2.13.9'
         ],
         'test': [
             'coverage',

View file

@@ -61,16 +61,14 @@ def mock_network_loop(loop: asyncio.AbstractEventLoop,
         dht_network[from_addr] = protocol
         return transport, protocol
 
-    with mock.patch('socket.socket') as mock_socket:
-        mock_sock = mock.Mock(spec=socket.socket)
-        mock_sock.setsockopt = lambda *_: None
-        mock_sock.bind = lambda *_: None
-        mock_sock.setblocking = lambda *_: None
-        mock_sock.getsockname = lambda: "0.0.0.0"
-        mock_sock.getpeername = lambda: ""
-        mock_sock.close = lambda: None
-        mock_sock.type = socket.SOCK_DGRAM
-        mock_sock.fileno = lambda: 7
-        mock_socket.return_value = mock_sock
-        loop.create_datagram_endpoint = create_datagram_endpoint
-        yield
+    mock_sock = mock.Mock(spec=socket.socket)
+    mock_sock.setsockopt = lambda *_: None
+    mock_sock.bind = lambda *_: None
+    mock_sock.setblocking = lambda *_: None
+    mock_sock.getsockname = lambda: "0.0.0.0"
+    mock_sock.getpeername = lambda: ""
+    mock_sock.close = lambda: None
+    mock_sock.type = socket.SOCK_DGRAM
+    mock_sock.fileno = lambda: 7
+    loop.create_datagram_endpoint = create_datagram_endpoint
+    yield
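Note: building the fake socket with `mock.Mock(spec=socket.socket)` keeps the mock honest: only attributes that exist on a real socket may be touched, so typos fail immediately instead of silently returning child mocks. An illustrative sketch:

    import socket
    from unittest import mock

    sock = mock.Mock(spec=socket.socket)
    sock.fileno = lambda: 7        # stub out just what the test needs
    assert sock.fileno() == 7
    sock.not_a_real_method()       # raises AttributeError thanks to spec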

View file

@ -1,18 +1,14 @@
import time
import unittest import unittest
from unittest import skipIf from unittest import skipIf
import asyncio import asyncio
import os import os
from binascii import hexlify from binascii import hexlify
import aiohttp.web
from lbry.schema import Claim from lbry.schema import Claim
from lbry.stream.background_downloader import BackgroundDownloader from lbry.stream.background_downloader import BackgroundDownloader
from lbry.stream.descriptor import StreamDescriptor from lbry.stream.descriptor import StreamDescriptor
from lbry.testcase import CommandTestCase from lbry.testcase import CommandTestCase
from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT
from lbry.utils import aiohttp_request
from lbry.wallet import Transaction from lbry.wallet import Transaction
from lbry.torrent.tracker import UDPTrackerServerProtocol from lbry.torrent.tracker import UDPTrackerServerProtocol
@@ -21,104 +17,55 @@ class FileCommands(CommandTestCase):
     def __init__(self, *a, **kw):
         super().__init__(*a, **kw)
         self.skip_libtorrent = False
-        self.streaming_port = 60818
-        self.seeder_session = None
 
-    async def initialize_torrent(self, tx_to_update=None, pick_a_file=True, name=None):
-        assert name is None or tx_to_update is None
-        if not self.seeder_session:
+    async def add_forever(self):
+        while True:
+            for handle in self.client_session._handles.values():
+                handle._handle.connect_peer(('127.0.0.1', 4040))
+            await asyncio.sleep(.1)
+
+    async def initialize_torrent(self, tx_to_update=None):
+        if not hasattr(self, 'seeder_session'):
             self.seeder_session = TorrentSession(self.loop, None)
             self.addCleanup(self.seeder_session.stop)
             await self.seeder_session.bind('127.0.0.1', port=4040)
-        btih = await self.seeder_session.add_fake_torrent(file_count=3)
-        files = [(size, path) for (path, size) in self.seeder_session.get_files(btih).items()]
-        files.sort()
-        # picking a file will pick something in the middle, while automatic selection will pick largest
-        self.expected_size, self.expected_path = files[1] if pick_a_file else files[-1]
+        btih = await self.seeder_session.add_fake_torrent()
         address = await self.account.receiving.get_or_create_usable_address()
-        claim = tx_to_update.outputs[0].claim if tx_to_update else Claim()
-        claim.stream.update(bt_infohash=btih)
-        if pick_a_file:
-            claim.stream.source.name = os.path.basename(self.expected_path)
         if not tx_to_update:
+            claim = Claim()
+            claim.stream.update(bt_infohash=btih)
             tx = await Transaction.claim_create(
-                name or 'torrent', claim, 1, address, [self.account], self.account
+                'torrent', claim, 1, address, [self.account], self.account
             )
         else:
+            claim = tx_to_update.outputs[0].claim
+            claim.stream.update(bt_infohash=btih)
             tx = await Transaction.claim_update(
                 tx_to_update.outputs[0], claim, 1, address, [self.account], self.account
             )
         await tx.sign([self.account])
         await self.broadcast_and_confirm(tx)
         self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session
-        self.client_session.wait_start = False  # fixme: this is super slow on tests
+        task = asyncio.create_task(self.add_forever())
+        self.addCleanup(task.cancel)
         return tx, btih
 
-    async def assert_torrent_streaming_works(self, btih):
-        url = f'http://{self.daemon.conf.streaming_host}:{self.streaming_port}/stream/{btih}'
-        if self.daemon.streaming_runner.server is None:
-            await self.daemon.streaming_runner.setup()
-            site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host,
-                                       self.streaming_port)
-            await site.start()
-        async with aiohttp_request('get', url) as req:
-            self.assertEqual(req.status, 206)
-            self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream')
-            content_range = req.headers.get('Content-Range')
-            content_length = int(req.headers.get('Content-Length'))
-            streamed_bytes = await req.content.read()
-        expected_size = self.expected_size
-        self.assertEqual(expected_size, len(streamed_bytes))
-        self.assertEqual(content_length, len(streamed_bytes))
-        self.assertEqual(f"bytes 0-{expected_size - 1}/{expected_size}", content_range)
-
     @skipIf(TorrentSession is None, "libtorrent not installed")
     async def test_download_torrent(self):
-        tx, btih = await self.initialize_torrent(pick_a_file=False)
+        tx, btih = await self.initialize_torrent()
         self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
         self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
         # second call, see its there and move on
         self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
         self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
-        self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, btih)
         self.assertIn(btih, self.client_session._handles)
-        # stream over streaming API (full range of the largest file)
-        await self.assert_torrent_streaming_works(btih)
-        # check json encoder fields for torrent sources
-        file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0]
-        self.assertEqual(btih, file['metadata']['source']['bt_infohash'])
-        self.assertAlmostEqual(time.time(), file['added_on'], delta=12)
-        self.assertEqual("application/octet-stream", file['mime_type'])
-        self.assertEqual(os.path.basename(self.expected_path), file['suggested_file_name'])
-        self.assertEqual(os.path.basename(self.expected_path), file['stream_name'])
-        while not file['completed']:  # improve that
-            await asyncio.sleep(0.5)
-            file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0]
-        self.assertTrue(file['completed'])
-        self.assertGreater(file['total_bytes_lower_bound'], 0)
-        self.assertEqual(file['total_bytes_lower_bound'], file['total_bytes'])
-        self.assertEqual(file['total_bytes'], file['written_bytes'])
-        self.assertEqual('finished', file['status'])
-        # filter by a field which is missing on torrent
-        self.assertItemCount(await self.daemon.jsonrpc_file_list(stream_hash="abc"), 0)
 
         tx, new_btih = await self.initialize_torrent(tx)
         self.assertNotEqual(btih, new_btih)
         # claim now points to another torrent, update to it
         self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
         self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih)
-        self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
-        # restart and verify that only one updated stream was recovered
-        self.daemon.file_manager.stop()
-        await self.daemon.file_manager.start()
-        self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih)
-        self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
-        # check it was saved properly, once
-        self.assertEqual(1, len(await self.daemon.storage.get_all_torrent_files()))
         self.assertIn(new_btih, self.client_session._handles)
         self.assertNotIn(btih, self.client_session._handles)
         self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
@@ -126,11 +73,6 @@ class FileCommands(CommandTestCase):
         self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0)
         self.assertNotIn(new_btih, self.client_session._handles)
 
-        await self.initialize_torrent(name='torrent2')
-        self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent2')))
-        file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0]
-        self.assertEqual(os.path.basename(self.expected_path), file['stream_name'])
-
     async def create_streams_in_range(self, *args, **kwargs):
         self.stream_claim_ids = []
         for i in range(*args, **kwargs):
@@ -393,12 +335,12 @@ class FileCommands(CommandTestCase):
         await self.server.blob_manager.delete_blobs(all_except_sd)
         resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True)
         self.assertIn('error', resp)
-        self.assertEqual('Failed to download data blobs for %s within timeout.' % sd_hash, resp['error'])
+        self.assertEqual('Failed to download data blobs for sd hash %s within timeout.' % sd_hash, resp['error'])
         self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo'), "data timeout didn't create a file")
         await self.server.blob_manager.delete_blobs([sd_hash])
         resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True)
         self.assertIn('error', resp)
-        self.assertEqual('Failed to download metadata for %s within timeout.' % sd_hash, resp['error'])
+        self.assertEqual('Failed to download sd blob %s within timeout.' % sd_hash, resp['error'])
     async def wait_files_to_complete(self):
         while await self.file_list(status='running'):
@@ -412,7 +354,7 @@ class FileCommands(CommandTestCase):
         await self.daemon.jsonrpc_get('lbry://foo')
         with open(original_path, 'wb') as handle:
             handle.write(b'some other stuff was there instead')
-        self.daemon.file_manager.stop()
+        await self.daemon.file_manager.stop()
         await self.daemon.file_manager.start()
         await asyncio.wait_for(self.wait_files_to_complete(), timeout=5)  # if this hangs, file didn't get set completed
         # check that internal state got through up to the file list API
@@ -440,8 +382,7 @@ class FileCommands(CommandTestCase):
         resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2))
         self.assertNotIn('error', resp)
         self.assertTrue(os.path.isfile(path))
-        self.daemon.file_manager.stop()
-        await asyncio.sleep(0.01)  # FIXME: this sleep should not be needed
+        await self.daemon.file_manager.stop()
         self.assertFalse(os.path.isfile(path))
     async def test_incomplete_downloads_retry(self):
@@ -536,7 +477,7 @@ class FileCommands(CommandTestCase):
         # restart the daemon and make sure the fee is still there
-        self.daemon.file_manager.stop()
+        await self.daemon.file_manager.stop()
         await self.daemon.file_manager.start()
         self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
         self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee)

View file

@@ -3,7 +3,9 @@ import hashlib
 import aiohttp
 import aiohttp.web
 import asyncio
+import contextlib
 
+from lbry.file.source import ManagedDownloadSource
 from lbry.utils import aiohttp_request
 from lbry.blob.blob_file import MAX_BLOB_SIZE
 from lbry.testcase import CommandTestCase
@@ -21,7 +23,7 @@ def get_random_bytes(n: int) -> bytes:
 class RangeRequests(CommandTestCase):
     async def _restart_stream_manager(self):
-        self.daemon.file_manager.stop()
+        await self.daemon.file_manager.stop()
         await self.daemon.file_manager.start()
         return
@@ -352,14 +354,21 @@ class RangeRequests(CommandTestCase):
         path = stream.full_path
         self.assertIsNotNone(path)
         if wait_for_start_writing:
-            await stream.started_writing.wait()
+            with contextlib.suppress(asyncio.CancelledError):
+                await stream.started_writing.wait()
         self.assertTrue(os.path.isfile(path))
-        await self._restart_stream_manager()
+        await self.daemon.file_manager.stop()
+        # while stopped, we get no response to query and no file is present
+        self.assertEqual((await self.daemon.jsonrpc_file_list())['items'], [])
+        self.assertEqual(os.path.isfile(path), stream.status == ManagedDownloadSource.STATUS_FINISHED)
+        await self.daemon.file_manager.start()
+        # after restart, we get a response to query and same file path
         stream = (await self.daemon.jsonrpc_file_list())['items'][0]
         self.assertIsNotNone(stream.full_path)
-        self.assertFalse(os.path.isfile(path))
+        self.assertEqual(stream.full_path, path)
         if wait_for_start_writing:
-            await stream.started_writing.wait()
+            with contextlib.suppress(asyncio.CancelledError):
+                await stream.started_writing.wait()
         self.assertTrue(os.path.isfile(path))
 
     async def test_file_save_stop_before_finished_streaming_only_wait_for_start(self):
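Note: `contextlib.suppress` is simply a tidier try/except-pass, used here because stopping the file manager can cancel the writer the test is waiting on. A minimal equivalent, assuming an `asyncio.Event` stand-in for `started_writing`:

    import asyncio
    import contextlib

    async def wait_quietly(event: asyncio.Event) -> None:
        # same as try: await event.wait() / except asyncio.CancelledError: pass
        with contextlib.suppress(asyncio.CancelledError):
            await event.wait()

Swallowing cancellation is normally a smell in library code; a test tearing down its own tasks is one of the few places it reads as intended.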

View file

@@ -11,7 +11,7 @@ from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase
 from lbry.testcase import get_fake_exchange_rate_manager
 from lbry.utils import generate_id
 from lbry.error import InsufficientFundsError
-from lbry.error import KeyFeeAboveMaxAllowedError, ResolveError, DownloadMetadataTimeoutError, DownloadDataTimeoutError
+from lbry.error import KeyFeeAboveMaxAllowedError, ResolveError, DownloadSDTimeoutError, DownloadDataTimeoutError
 from lbry.wallet import WalletManager, Wallet, Ledger, Transaction, Input, Output, Database
 from lbry.wallet.constants import CENT, NULL_HASH32
 from lbry.wallet.network import ClientSession
@@ -229,10 +229,10 @@ class TestStreamManager(BlobExchangeTestBase):
             self.assertFalse(event['properties']['added_fixed_peers'])
             self.assertEqual(event['properties']['connection_failures_count'], 1)
             self.assertEqual(
-                event['properties']['error_message'], f'Failed to download metadata for {self.sd_hash} within timeout.'
+                event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout.'
             )
 
-        await self._test_time_to_first_bytes(check_post, DownloadMetadataTimeoutError, after_setup=after_setup)
+        await self._test_time_to_first_bytes(check_post, DownloadSDTimeoutError, after_setup=after_setup)
 
     async def test_override_fixed_peer_delay_dht_disabled(self):
         self.client_config.fixed_peers = [(self.server_from_client.address, self.server_from_client.tcp_port)]
@@ -266,18 +266,18 @@ class TestStreamManager(BlobExchangeTestBase):
         def check_post(event):
             self.assertEqual(event['event'], 'Time To First Bytes')
-            self.assertEqual(event['properties']['error'], 'DownloadMetadataTimeoutError')
+            self.assertEqual(event['properties']['error'], 'DownloadSDTimeoutError')
             self.assertEqual(event['properties']['tried_peers_count'], 0)
             self.assertEqual(event['properties']['active_peer_count'], 0)
             self.assertFalse(event['properties']['use_fixed_peers'])
             self.assertFalse(event['properties']['added_fixed_peers'])
             self.assertIsNone(event['properties']['fixed_peer_delay'])
             self.assertEqual(
-                event['properties']['error_message'], f'Failed to download metadata for {self.sd_hash} within timeout.'
+                event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout.'
             )
 
         start = self.loop.time()
-        await self._test_time_to_first_bytes(check_post, DownloadMetadataTimeoutError)
+        await self._test_time_to_first_bytes(check_post, DownloadSDTimeoutError)
         duration = self.loop.time() - start
         self.assertLessEqual(duration, 5)
         self.assertGreaterEqual(duration, 3.0)
@@ -387,7 +387,7 @@ class TestStreamManager(BlobExchangeTestBase):
         self.server.stop_server()
         await self.setup_stream_manager()
         await self._test_download_error_analytics_on_start(
-            DownloadMetadataTimeoutError, f'Failed to download metadata for {self.sd_hash} within timeout.', timeout=1
+            DownloadSDTimeoutError, f'Failed to download sd blob {self.sd_hash} within timeout.', timeout=1
         )
 
     async def test_download_data_timeout(self):
@@ -396,7 +396,7 @@ class TestStreamManager(BlobExchangeTestBase):
             head_blob_hash = json.loads(sdf.read())['blobs'][0]['blob_hash']
         self.server_blob_manager.delete_blob(head_blob_hash)
         await self._test_download_error_analytics_on_start(
-            DownloadDataTimeoutError, f'Failed to download data blobs for {self.sd_hash} within timeout.', timeout=1
+            DownloadDataTimeoutError, f'Failed to download data blobs for sd hash {self.sd_hash} within timeout.', timeout=1
         )
 
     async def test_unexpected_error(self):
@@ -424,7 +424,7 @@ class TestStreamManager(BlobExchangeTestBase):
         self.assertIsNone(stream.full_path)
         self.assertEqual(0, stream.written_bytes)
 
-        self.stream_manager.stop()
+        await self.stream_manager.stop()
         await self.stream_manager.start()
         self.assertEqual(1, len(self.stream_manager.streams))
         stream = list(self.stream_manager.streams.values())[0]
@@ -449,7 +449,7 @@ class TestStreamManager(BlobExchangeTestBase):
         stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
         await stream.finished_writing.wait()
         await asyncio.sleep(0)
-        self.stream_manager.stop()
+        await self.stream_manager.stop()
         self.client_blob_manager.stop()
         # partial removal, only sd blob is missing.
         # in this case, we recover the sd blob while the other blobs are kept untouched as 'finished'