Compare commits

...

33 commits

Author SHA1 Message Date
Victor Shyba
dbe3ace812 pylint 2022-12-15 21:49:48 -03:00
Victor Shyba
636b7ed476 tests: enable logging lbry.torrent when verbosity changes 2022-12-15 20:24:37 -03:00
Victor Shyba
64aad14ba6 pick file from file name, fallback to largest 2022-12-15 20:24:37 -03:00
Victor Shyba
9d869820a3 test picking file from claim file name 2022-12-15 20:24:37 -03:00
Victor Shyba
5cf63fa03e restore torrent rowid on restart 2022-12-15 20:24:37 -03:00
Victor Shyba
9dc617f8e0 use a non-default port for streaming test so it can run with a live instance 2022-12-15 20:24:37 -03:00
Victor Shyba
2bea8f58e0 fix duplicated file entry on startup 2022-12-15 20:24:37 -03:00
Victor Shyba
8ce53069ad fix filtering for fields missing on torrents 2022-12-15 20:24:36 -03:00
Victor Shyba
39da718c28 remove dead code 2022-12-15 20:24:36 -03:00
Victor Shyba
1041a19467 deserialize torrent fields properly 2022-12-15 20:24:36 -03:00
Victor Shyba
651348f6e0 fix status for completed torrents 2022-12-15 20:24:36 -03:00
Victor Shyba
31c6e0e835 fix stream_name for torrent on json encoder 2022-12-15 20:24:36 -03:00
Victor Shyba
732b7e79d7 fix suggested_file_name for torrent on json encoder 2022-12-15 20:24:36 -03:00
Victor Shyba
2bf0ca6441 fix mime_type for torrent on json encoder 2022-12-15 20:24:36 -03:00
Victor Shyba
77d2c81a30 fix missing added_on for torrent files 2022-12-15 20:24:36 -03:00
Victor Shyba
b39971bf05 fix tests for changed error msg 2022-12-15 20:24:36 -03:00
Victor Shyba
af0ad417df generalize DownloadSDTimeout to DownloadMetadata timeout + fix usages 2022-12-15 20:24:36 -03:00
Victor Shyba
c8f25027fc start the stream after adding 2022-12-15 20:24:36 -03:00
Victor Shyba
e862c99f6c generate 3 files, check that streamed is the largest, add method to list files 2022-12-15 20:24:36 -03:00
Victor Shyba
f650e8f07e test and bugfixes for streaming multifile in a subfolder case 2022-12-15 20:24:36 -03:00
Victor Shyba
7c7e18534e refactor add_torrent, lints 2022-12-15 20:24:36 -03:00
Victor Shyba
37adc59b37 add tests for streaming, fix bugs 2022-12-15 20:24:36 -03:00
Victor Shyba
7746ded9b6 add test case for restart, fix torrent file update 2022-12-15 20:24:36 -03:00
Victor Shyba
dd103d0f95 save file-torrent association for file list 2022-12-15 20:24:36 -03:00
Victor Shyba
7410991123 save resume data on stop, remove/replace deprecated calls 2022-12-15 20:24:36 -03:00
Victor Shyba
df680e7225 fix tests and off by one error 2022-12-15 20:24:36 -03:00
Victor Shyba
b2f82070b0 piece prioritization and deadlines 2022-12-15 20:24:36 -03:00
Victor Shyba
b3bff39eea stream from torrent pieces, holding the response until the piece is completed 2022-12-15 20:24:36 -03:00
Victor Shyba
6efd4dd19a fix save path, fix prio, update deprecated calls 2022-12-15 20:24:36 -03:00
Victor Shyba
8ee5cee8c3 update flags, set sequential as a flag 2022-12-15 20:24:36 -03:00
Victor Shyba
7828041786 stream type independent stream_url 2022-12-15 20:24:36 -03:00
Victor Shyba
8212e73c2e stream torrent from file 2022-12-15 20:24:36 -03:00
Victor Shyba
63784622e9 locate stream for streaming API by identifier 2022-12-15 20:24:36 -03:00
16 changed files with 410 additions and 173 deletions

View file

@ -81,8 +81,8 @@ Code | Name | Message
511 | CorruptBlob | Blobs is corrupted.
520 | BlobFailedEncryption | Failed to encrypt blob.
531 | DownloadCancelled | Download was canceled.
532 | DownloadSDTimeout | Failed to download sd blob {download} within timeout.
533 | DownloadDataTimeout | Failed to download data blobs for sd hash {download} within timeout.
532 | DownloadMetadataTimeout | Failed to download metadata for {download} within timeout.
533 | DownloadDataTimeout | Failed to download data blobs for {download} within timeout.
534 | InvalidStreamDescriptor | {message}
535 | InvalidData | {message}
536 | InvalidBlobHash | {message}

View file

@ -411,18 +411,18 @@ class DownloadCancelledError(BlobError):
super().__init__("Download was canceled.")
class DownloadSDTimeoutError(BlobError):
class DownloadMetadataTimeoutError(BlobError):
def __init__(self, download):
self.download = download
super().__init__(f"Failed to download sd blob {download} within timeout.")
super().__init__(f"Failed to download metadata for {download} within timeout.")
class DownloadDataTimeoutError(BlobError):
def __init__(self, download):
self.download = download
super().__init__(f"Failed to download data blobs for sd hash {download} within timeout.")
super().__init__(f"Failed to download data blobs for {download} within timeout.")
class InvalidStreamDescriptorError(BlobError):

View file

@ -36,7 +36,7 @@ from lbry.blob.blob_file import is_valid_blobhash, BlobBuffer
from lbry.blob_exchange.downloader import download_blob
from lbry.dht.peer import make_kademlia_peer
from lbry.error import (
DownloadSDTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError,
DownloadMetadataTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError,
CommandDoesNotExistError, BaseError, WalletNotFoundError, WalletAlreadyLoadedError, WalletAlreadyExistsError,
ConflictingInputValueError, AlreadyPurchasedError, PrivateKeyNotFoundError, InputStringIsBlankError,
InputValueError
@ -639,7 +639,7 @@ class Daemon(metaclass=JSONRPCServerType):
stream = await self.jsonrpc_get(uri)
if isinstance(stream, dict):
raise web.HTTPServerError(text=stream['error'])
raise web.HTTPFound(f"/stream/{stream.sd_hash}")
raise web.HTTPFound(f"/stream/{stream.identifier}")
async def handle_stream_range_request(self, request: web.Request):
try:
@ -658,12 +658,13 @@ class Daemon(metaclass=JSONRPCServerType):
log.debug("finished handling /stream range request")
async def _handle_stream_range_request(self, request: web.Request):
sd_hash = request.path.split("/stream/")[1]
identifier = request.path.split("/stream/")[1]
if not self.file_manager.started.is_set():
await self.file_manager.started.wait()
if sd_hash not in self.file_manager.streams:
stream = self.file_manager.get_filtered(identifier=identifier)
if not stream:
return web.HTTPNotFound()
return await self.file_manager.stream_partial_content(request, sd_hash)
return await self.file_manager.stream_partial_content(request, identifier)
async def _process_rpc_call(self, data):
args = data.get('params', {})
@ -1139,7 +1140,7 @@ class Daemon(metaclass=JSONRPCServerType):
save_file=save_file, wallet=wallet
)
if not stream:
raise DownloadSDTimeoutError(uri)
raise DownloadMetadataTimeoutError(uri)
except Exception as e:
# TODO: use error from lbry.error
log.warning("Error downloading %s: %s", uri, str(e))

View file

@ -285,7 +285,7 @@ class JSONResponseEncoder(JSONEncoder):
else:
total_bytes_lower_bound = total_bytes = managed_stream.torrent_length
result = {
'streaming_url': None,
'streaming_url': managed_stream.stream_url,
'completed': managed_stream.completed,
'file_name': None,
'download_directory': None,
@ -293,10 +293,10 @@ class JSONResponseEncoder(JSONEncoder):
'points_paid': 0.0,
'stopped': not managed_stream.running,
'stream_hash': None,
'stream_name': None,
'suggested_file_name': None,
'stream_name': managed_stream.stream_name,
'suggested_file_name': managed_stream.suggested_file_name,
'sd_hash': None,
'mime_type': None,
'mime_type': managed_stream.mime_type,
'key': None,
'total_bytes_lower_bound': total_bytes_lower_bound,
'total_bytes': total_bytes,
@ -326,12 +326,8 @@ class JSONResponseEncoder(JSONEncoder):
}
if is_stream:
result.update({
'streaming_url': managed_stream.stream_url,
'stream_hash': managed_stream.stream_hash,
'stream_name': managed_stream.stream_name,
'suggested_file_name': managed_stream.suggested_file_name,
'sd_hash': managed_stream.descriptor.sd_hash,
'mime_type': managed_stream.mime_type,
'key': managed_stream.descriptor.key,
'blobs_completed': managed_stream.blobs_completed,
'blobs_in_stream': managed_stream.blobs_in_stream,
@ -340,10 +336,6 @@ class JSONResponseEncoder(JSONEncoder):
'reflector_progress': managed_stream.reflector_progress,
'uploading_to_reflector': managed_stream.uploading_to_reflector
})
else:
result.update({
'streaming_url': f'file://{managed_stream.full_path}',
})
if output_exists:
result.update({
'file_name': managed_stream.file_name,

View file

@ -5,6 +5,7 @@ import typing
import asyncio
import binascii
import time
from operator import itemgetter
from typing import Optional
from lbry.wallet import SQLiteMixin
from lbry.conf import Config
@ -211,7 +212,7 @@ def delete_torrent(transaction: sqlite3.Connection, bt_infohash: str):
transaction.execute("delete from torrent where bt_infohash=?", (bt_infohash,)).fetchall()
def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str],
def store_file(transaction: sqlite3.Connection, identifier_value: str, file_name: typing.Optional[str],
download_directory: typing.Optional[str], data_payment_rate: float, status: str,
content_fee: typing.Optional[Transaction], added_on: typing.Optional[int] = None) -> int:
if not file_name and not download_directory:
@ -219,15 +220,18 @@ def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typ
else:
encoded_file_name = binascii.hexlify(file_name.encode()).decode()
encoded_download_dir = binascii.hexlify(download_directory.encode()).decode()
is_torrent = len(identifier_value) == 40
time_added = added_on or int(time.time())
transaction.execute(
"insert or replace into file values (?, NULL, ?, ?, ?, ?, ?, ?, ?)",
(stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status,
f"insert or replace into file values ({'NULL, ?' if is_torrent else '?, NULL'}, ?, ?, ?, ?, ?, ?, ?)",
(identifier_value, encoded_file_name, encoded_download_dir, data_payment_rate, status,
1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0,
None if not content_fee else binascii.hexlify(content_fee.raw).decode(), time_added)
).fetchall()
return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0]
return transaction.execute(
f"select rowid from file where {'bt_infohash' if is_torrent else 'stream_hash'}=?",
(identifier_value, )).fetchone()[0]
class SQLiteStorage(SQLiteMixin):
@ -632,6 +636,13 @@ class SQLiteStorage(SQLiteMixin):
def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
return self.db.run(get_all_lbry_files)
async def get_all_torrent_files(self) -> typing.List[typing.Dict]:
def _get_all_torrent_files(transaction):
cursor = transaction.execute(
"select file.ROWID as rowid, * from file join torrent on file.bt_infohash=torrent.bt_infohash")
return map(lambda row: dict(zip(list(map(itemgetter(0), cursor.description)), row)), cursor.fetchall())
return list(await self.db.run(_get_all_torrent_files))
def change_file_status(self, stream_hash: str, new_status: str):
log.debug("update file status %s -> %s", stream_hash, new_status)
return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash))
@ -872,15 +883,20 @@ class SQLiteStorage(SQLiteMixin):
if stream_hash in self.content_claim_callbacks:
await self.content_claim_callbacks[stream_hash]()
async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name):
def _save_torrent(transaction):
async def add_torrent(self, bt_infohash, length, name):
def _save_torrent(transaction, bt_infohash, length, name):
transaction.execute(
"insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name)
).fetchall()
return await self.db.run(_save_torrent, bt_infohash, length, name)
async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name):
def _save_torrent_claim(transaction):
transaction.execute(
"insert or replace into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint)
).fetchall()
await self.db.run(_save_torrent)
await self.add_torrent(bt_infohash, length, name)
await self.db.run(_save_torrent_claim)
# update corresponding ManagedEncryptedFileDownloader object
if bt_infohash in self.content_claim_callbacks:
await self.content_claim_callbacks[bt_infohash]()
@ -898,7 +914,7 @@ class SQLiteStorage(SQLiteMixin):
async def get_content_claim_for_torrent(self, bt_infohash):
claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash])
return claims[bt_infohash].as_dict() if claims else None
return claims[bt_infohash] if claims else None
# # # # # # # # # reflector functions # # # # # # # # #

View file

@ -3,7 +3,7 @@ import logging
import typing
from typing import Optional
from aiohttp.web import Request
from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError
from lbry.error import ResolveError, DownloadMetadataTimeoutError, InsufficientFundsError
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
from lbry.error import InvalidStreamURLError
from lbry.stream.managed_stream import ManagedStream
@ -139,7 +139,7 @@ class FileManager:
existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name
)
claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier)
existing[0].set_claim(claim_info, claim)
existing[0].set_claim(claim_info.as_dict() if claim_info else None, claim)
else:
await self.storage.save_content_claim(
existing[0].stream_hash, outpoint
@ -242,15 +242,15 @@ class FileManager:
stream.identifier, outpoint, stream.torrent_length, stream.torrent_name
)
claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
stream.set_claim(claim_info, claim)
stream.set_claim(claim_info.as_dict() if claim_info else None, claim)
if save_file:
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download))
return stream
except asyncio.TimeoutError:
error = DownloadDataTimeoutError(stream.sd_hash)
error = DownloadDataTimeoutError(stream.identifier)
raise error
except Exception as err: # forgive data timeout, don't delete stream
expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
expected = (DownloadMetadataTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError)
if isinstance(err, expected):
log.warning("Failed to download %s: %s", uri, str(err))
@ -290,19 +290,24 @@ class FileManager:
)
)
async def stream_partial_content(self, request: Request, sd_hash: str):
return await self.source_managers['stream'].stream_partial_content(request, sd_hash)
async def stream_partial_content(self, request: Request, identifier: str):
for source_manager in self.source_managers.values():
if source_manager.get_filtered(identifier=identifier):
return await source_manager.stream_partial_content(request, identifier)
def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]:
"""
Get a list of filtered and sorted ManagedStream objects
:param sort_by: field to sort by
:param reverse: reverse sorting
:param comparison: comparison operator used for filtering
:param search_by: fields and values to filter by
Get a list of filtered and sorted ManagedDownloadSource objects from all available source managers
"""
return sum((manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), [])
result = last_error = None
for manager in self.source_managers.values():
try:
result = (result or []) + manager.get_filtered(*args, **kwargs)
except ValueError as error:
last_error = error
if result is not None:
return result
raise last_error
async def delete(self, source: ManagedDownloadSource, delete_file=False):
for manager in self.source_managers.values():

View file

@ -1,5 +1,6 @@
import os
import asyncio
import time
import typing
import logging
import binascii
@ -43,7 +44,7 @@ class ManagedDownloadSource:
self.rowid = rowid
self.content_fee = content_fee
self.purchase_receipt = None
self._added_on = added_on
self._added_on = added_on or int(time.time())
self.analytics_manager = analytics_manager
self.downloader = None
@ -91,6 +92,14 @@ class ManagedDownloadSource:
def added_on(self) -> Optional[int]:
return self._added_on
@property
def suggested_file_name(self):
return self._file_name
@property
def stream_name(self):
return self.suggested_file_name
@property
def status(self) -> str:
return self._status
@ -99,9 +108,9 @@ class ManagedDownloadSource:
def completed(self):
raise NotImplementedError()
# @property
# def stream_url(self):
# return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}
@property
def stream_url(self):
return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.identifier}"
@property
def finished(self) -> bool:

View file

@ -23,6 +23,7 @@ COMPARISON_OPERATORS = {
class SourceManager:
filter_fields = {
'identifier',
'rowid',
'status',
'file_name',
@ -83,6 +84,7 @@ class SourceManager:
raise NotImplementedError()
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
await self.storage.delete_torrent(source.identifier)
self.remove(source)
if delete_file and source.output_file_exists:
os.remove(source.full_path)

View file

@ -4,7 +4,7 @@ import logging
import binascii
from lbry.dht.node import get_kademlia_peers_from_hosts
from lbry.error import DownloadSDTimeoutError
from lbry.error import DownloadMetadataTimeoutError
from lbry.utils import lru_cache_concurrent
from lbry.stream.descriptor import StreamDescriptor
from lbry.blob_exchange.downloader import BlobDownloader
@ -77,7 +77,7 @@ class StreamDownloader:
log.info("downloaded sd blob %s", self.sd_hash)
self.time_to_descriptor = self.loop.time() - now
except asyncio.TimeoutError:
raise DownloadSDTimeoutError(self.sd_hash)
raise DownloadMetadataTimeoutError(self.sd_hash)
# parse the descriptor
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(

View file

@ -5,7 +5,7 @@ import typing
import logging
from typing import Optional
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
from lbry.error import DownloadSDTimeoutError
from lbry.error import DownloadMetadataTimeoutError
from lbry.schema.mime_types import guess_media_type
from lbry.stream.downloader import StreamDownloader
from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name
@ -104,10 +104,6 @@ class ManagedStream(ManagedDownloadSource):
def completed(self):
return self.written_bytes >= self.descriptor.lower_bound_decrypted_length()
@property
def stream_url(self):
return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}"
async def update_status(self, status: str):
assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
self._status = status
@ -164,7 +160,7 @@ class ManagedStream(ManagedDownloadSource):
await asyncio.wait_for(self.downloader.start(), timeout)
except asyncio.TimeoutError:
self._running.clear()
raise DownloadSDTimeoutError(self.sd_hash)
raise DownloadMetadataTimeoutError(self.identifier)
if self.delayed_stop_task and not self.delayed_stop_task.done():
self.delayed_stop_task.cancel()

View file

@ -32,7 +32,7 @@ def path_or_none(encoded_path) -> Optional[str]:
class StreamManager(SourceManager):
_sources: typing.Dict[str, ManagedStream]
filter_fields = SourceManager.filter_fields
filter_fields = set(SourceManager.filter_fields)
filter_fields.update({
'sd_hash',
'stream_hash',

View file

@ -394,6 +394,7 @@ class CommandTestCase(IntegrationTestCase):
logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY)
logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY)
logging.getLogger('lbry.stream').setLevel(self.VERBOSITY)
logging.getLogger('lbry.torrent').setLevel(self.VERBOSITY)
logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY)
await super().asyncSetUp()

View file

@ -3,9 +3,8 @@ import binascii
import os
import logging
import random
from hashlib import sha1
from tempfile import mkdtemp
from typing import Optional
from typing import Optional, Tuple, Dict
import libtorrent
@ -14,6 +13,8 @@ log = logging.getLogger(__name__)
DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted?
libtorrent.add_torrent_params_flags_t.flag_auto_managed
| libtorrent.add_torrent_params_flags_t.flag_update_subscribe
| libtorrent.add_torrent_params_flags_t.flag_sequential_download
| libtorrent.add_torrent_params_flags_t.flag_paused
)
@ -22,66 +23,102 @@ class TorrentHandle:
self._loop = loop
self._executor = executor
self._handle: libtorrent.torrent_handle = handle
self.started = asyncio.Event(loop=loop)
self.finished = asyncio.Event(loop=loop)
self.metadata_completed = asyncio.Event(loop=loop)
self.size = 0
self.size = handle.status().total_wanted
self.total_wanted_done = 0
self.name = ''
self.tasks = []
self.torrent_file: Optional[libtorrent.file_storage] = None
self._torrent_info: libtorrent.torrent_info = handle.torrent_file()
self._base_path = None
self._handle.set_sequential_download(1)
@property
def largest_file(self) -> Optional[str]:
if not self.torrent_file:
def torrent_file(self) -> Optional[libtorrent.file_storage]:
return self._torrent_info.files()
def full_path_at(self, file_num) -> Optional[str]:
if self.torrent_file is None:
return None
index = self.largest_file_index
return os.path.join(self._base_path, self.torrent_file.at(index).path)
return os.path.join(self.save_path, self.torrent_file.file_path(file_num))
def size_at(self, file_num) -> Optional[int]:
if self.torrent_file is not None:
return self.torrent_file.file_size(file_num)
@property
def largest_file_index(self):
largest_size, index = 0, 0
def save_path(self) -> Optional[str]:
if not self._base_path:
self._base_path = self._handle.status().save_path
return self._base_path
def index_from_name(self, file_name):
for file_num in range(self.torrent_file.num_files()):
if self.torrent_file.file_size(file_num) > largest_size:
largest_size = self.torrent_file.file_size(file_num)
index = file_num
return index
if '.pad' in self.torrent_file.file_path(file_num):
continue # ignore padding files
if file_name == os.path.basename(self.full_path_at(file_num)):
return file_num
def stop_tasks(self):
self._handle.save_resume_data()
while self.tasks:
self.tasks.pop().cancel()
def byte_range_to_piece_range(
self, file_index, start_offset, end_offset) -> Tuple[libtorrent.peer_request, libtorrent.peer_request]:
start_piece = self._torrent_info.map_file(file_index, start_offset, 0)
end_piece = self._torrent_info.map_file(file_index, end_offset, 0)
return start_piece, end_piece
async def stream_range_as_completed(self, file_name, start, end):
file_index = self.index_from_name(file_name)
if file_index is None:
raise ValueError(f"Attempt to stream from invalid file. Expected name: {file_name}")
first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end)
start_piece_offset = first_piece.start
piece_size = self._torrent_info.piece_length()
log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d, piece size: %d): %s",
first_piece.piece, final_piece.piece, start, end, piece_size, self.name)
self.prioritize(file_index, start, end)
for piece_index in range(first_piece.piece, final_piece.piece + 1):
while not self._handle.have_piece(piece_index):
log.info("Waiting for piece %d: %s", piece_index, self.name)
self._handle.set_piece_deadline(piece_index, 0)
await asyncio.sleep(0.2)
log.info("Streaming piece offset %d / %d for torrent %s", piece_index, final_piece.piece, self.name)
yield piece_size - start_piece_offset
def _show_status(self):
# fixme: cleanup
if not self._handle.is_valid():
return
status = self._handle.status()
self._base_path = status.save_path
if status.has_metadata:
self.size = status.total_wanted
self.total_wanted_done = status.total_wanted_done
self.name = status.name
if not self.metadata_completed.is_set():
self.metadata_completed.set()
self._torrent_info = self._handle.torrent_file()
log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name)
self.torrent_file = self._handle.get_torrent_info().files()
self._base_path = status.save_path
first_piece = self.torrent_file.at(self.largest_file_index).offset
if not self.started.is_set():
if self._handle.have_piece(first_piece):
self.started.set()
else:
# prioritize it
self._handle.set_piece_deadline(first_piece, 100)
if not status.is_seeding:
log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s',
status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
status.num_peers, status.num_seeds, status.state, status.save_path)
elif not self.finished.is_set():
log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s',
status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
status.num_peers, status.num_seeds, status.state, status.save_path)
if (status.is_finished or status.is_seeding) and not self.finished.is_set():
self.finished.set()
log.info("Torrent finished: %s", self.name)
def prioritize(self, file_index, start, end, cleanup=False):
first_piece, last_piece = self.byte_range_to_piece_range(file_index, start, end)
priorities = self._handle.get_piece_priorities()
priorities = [0 if cleanup else 1 for _ in priorities]
self._handle.clear_piece_deadlines()
for idx, piece_number in enumerate(range(first_piece.piece, last_piece.piece)):
priorities[piece_number] = 7 - idx if 0 <= idx <= 6 else 1
self._handle.set_piece_deadline(piece_number, idx)
log.debug("Prioritizing pieces for %s: %s", self.name, priorities)
self._handle.prioritize_pieces(priorities)
async def status_loop(self):
while True:
self._show_status()
@ -105,19 +142,21 @@ class TorrentSession:
self._loop = loop
self._executor = executor
self._session: Optional[libtorrent.session] = None
self._handles = {}
self._handles: Dict[str, TorrentHandle] = {}
self.tasks = []
self.wait_start = True
async def add_fake_torrent(self):
def add_peer(self, btih, addr, port):
self._handles[btih]._handle.connect_peer((addr, port))
async def add_fake_torrent(self, file_count=3):
tmpdir = mkdtemp()
info, btih = _create_fake_torrent(tmpdir)
info = _create_fake_torrent(tmpdir, file_count=file_count)
flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode
handle = self._session.add_torrent({
'ti': info, 'save_path': tmpdir, 'flags': flags
})
self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
return btih
self._handles[str(info.info_hash())] = TorrentHandle(self._loop, self._executor, handle)
return str(info.info_hash())
async def bind(self, interface: str = '0.0.0.0', port: int = 10889):
settings = {
@ -131,15 +170,14 @@ class TorrentSession:
self.tasks.append(self._loop.create_task(self.process_alerts()))
def stop(self):
while self._handles:
self._handles.popitem()[1].stop_tasks()
while self.tasks:
self.tasks.pop().cancel()
self._session.save_state()
self._session.pause()
self._session.stop_dht()
self._session.stop_lsd()
self._session.stop_natpmp()
self._session.stop_upnp()
self._session = None
if self._session:
self._session.save_state()
self._session.pause()
self._session = None
def _pop_alerts(self):
for alert in self._session.pop_alerts():
@ -173,18 +211,23 @@ class TorrentSession:
handle.force_dht_announce()
self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
def full_path(self, btih):
return self._handles[btih].largest_file
def full_path(self, btih, file_num) -> Optional[str]:
return self._handles[btih].full_path_at(file_num)
def save_path(self, btih):
return self._handles[btih].save_path
def has_torrent(self, btih):
return btih in self._handles
async def add_torrent(self, btih, download_path):
if btih in self._handles:
return await self._handles[btih].metadata_completed.wait()
await self._loop.run_in_executor(
self._executor, self._add_torrent, btih, download_path
)
self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop()))
await self._handles[btih].metadata_completed.wait()
if self.wait_start:
# fixme: temporary until we add streaming support, otherwise playback fails!
await self._handles[btih].started.wait()
def remove_torrent(self, btih, remove_files=False):
if btih in self._handles:
@ -197,9 +240,17 @@ class TorrentSession:
handle = self._handles[btih]
await handle.resume()
def get_size(self, btih):
def get_total_size(self, btih):
return self._handles[btih].size
def get_index_from_name(self, btih, file_name):
return self._handles[btih].index_from_name(file_name)
def get_size(self, btih, file_name) -> Optional[int]:
for (path, size) in self.get_files(btih).items():
if os.path.basename(path) == file_name:
return size
def get_name(self, btih):
return self._handles[btih].name
@ -209,23 +260,38 @@ class TorrentSession:
def is_completed(self, btih):
return self._handles[btih].finished.is_set()
def stream_file(self, btih, file_name, start, end):
handle = self._handles[btih]
return handle.stream_range_as_completed(file_name, start, end)
def get_files(self, btih) -> Dict:
handle = self._handles[btih]
return {
self.full_path(btih, file_num): handle.torrent_file.file_size(file_num)
for file_num in range(handle.torrent_file.num_files())
if '.pad' not in handle.torrent_file.file_path(file_num)
}
def get_magnet_uri(btih):
return f"magnet:?xt=urn:btih:{btih}"
def _create_fake_torrent(tmpdir):
# beware, that's just for testing
path = os.path.join(tmpdir, 'tmp')
with open(path, 'wb') as myfile:
size = myfile.write(bytes([random.randint(0, 255) for _ in range(40)]) * 1024)
def _create_fake_torrent(tmpdir, file_count=3, largest_index=1):
# layout: subdir/tmp{0..file_count-1} files. v1+v2. automatic piece size.
# largest_index: which file index {0 ... file_count} will be the largest file
file_storage = libtorrent.file_storage()
file_storage.add_file('tmp', size)
t = libtorrent.create_torrent(file_storage, 0, 4 * 1024 * 1024)
subfolder = os.path.join(tmpdir, "subdir")
os.mkdir(subfolder)
for file_number in range(file_count):
file_name = f"tmp{file_number}"
with open(os.path.join(subfolder, file_name), 'wb') as myfile:
size = myfile.write(
bytes([random.randint(0, 255) for _ in range(10 - abs(file_number - largest_index))]) * 1024)
file_storage.add_file(os.path.join("subdir", file_name), size)
t = libtorrent.create_torrent(file_storage, 0, 0)
libtorrent.set_piece_hashes(t, tmpdir)
info = libtorrent.torrent_info(t.generate())
btih = sha1(info.metadata()).hexdigest()
return info, btih
return libtorrent.torrent_info(t.generate())
async def main():
@ -238,17 +304,16 @@ async def main():
executor = None
session = TorrentSession(asyncio.get_event_loop(), executor)
session2 = TorrentSession(asyncio.get_event_loop(), executor)
await session.bind('localhost', port=4040)
await session2.bind('localhost', port=4041)
btih = await session.add_fake_torrent()
session2._session.add_dht_node(('localhost', 4040))
await session2.add_torrent(btih, "/tmp/down")
await session.bind()
await session.add_torrent(btih, os.path.expanduser("~/Downloads"))
while True:
await asyncio.sleep(100)
session.full_path(btih, 0)
await asyncio.sleep(1)
await session.pause()
executor.shutdown()
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
log = logging.getLogger(__name__)
asyncio.run(main())

View file

@ -1,12 +1,14 @@
import asyncio
import binascii
import logging
import os
import typing
from typing import Optional
from aiohttp.web import Request
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
from lbry.error import DownloadMetadataTimeoutError
from lbry.file.source_manager import SourceManager
from lbry.file.source import ManagedDownloadSource
from lbry.schema.mime_types import guess_media_type
if typing.TYPE_CHECKING:
from lbry.torrent.session import TorrentSession
@ -19,12 +21,6 @@ if typing.TYPE_CHECKING:
log = logging.getLogger(__name__)
def path_or_none(encoded_path) -> Optional[str]:
if not encoded_path:
return
return binascii.unhexlify(encoded_path).decode()
class TorrentSource(ManagedDownloadSource):
STATUS_STOPPED = "stopped"
filter_fields = SourceManager.filter_fields
@ -42,15 +38,55 @@ class TorrentSource(ManagedDownloadSource):
super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id,
rowid, content_fee, analytics_manager, added_on)
self.torrent_session = torrent_session
self._suggested_file_name = None
self._full_path = None
@property
def full_path(self) -> Optional[str]:
full_path = self.torrent_session.full_path(self.identifier)
self.download_directory = os.path.dirname(full_path)
return full_path
if not self._full_path:
self._full_path = self.select_path()
self._file_name = os.path.basename(self._full_path)
self.download_directory = self.torrent_session.save_path(self.identifier)
return self._full_path
def select_path(self):
wanted_name = (self.stream_claim_info.claim.stream.source.name or '') if self.stream_claim_info else ''
wanted_index = self.torrent_session.get_index_from_name(self.identifier, wanted_name)
if wanted_index is None:
# maybe warn?
largest = (None, -1)
for (path, size) in self.torrent_session.get_files(self.identifier).items():
largest = (path, size) if size > largest[1] else largest
return largest[0]
else:
return self.torrent_session.full_path(self.identifier, wanted_index or 0)
@property
def suggested_file_name(self):
self._suggested_file_name = self._suggested_file_name or os.path.basename(self.select_path())
return self._suggested_file_name
@property
def mime_type(self) -> Optional[str]:
return guess_media_type(os.path.basename(self.full_path))[0]
async def setup(self, timeout: Optional[float] = None):
try:
metadata_download = self.torrent_session.add_torrent(self.identifier, self.download_directory)
await asyncio.wait_for(metadata_download, timeout, loop=self.loop)
except asyncio.TimeoutError:
self.torrent_session.remove_torrent(btih=self.identifier)
raise DownloadMetadataTimeoutError(self.identifier)
self.download_directory = self.torrent_session.save_path(self.identifier)
self._file_name = os.path.basename(self.full_path)
async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
await self.torrent_session.add_torrent(self.identifier, self.download_directory)
await self.setup(timeout)
if not self.rowid:
await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name)
self.rowid = await self.storage.save_downloaded_file(
self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on
)
async def stop(self, finished: bool = False):
await self.torrent_session.remove_torrent(self.identifier)
@ -60,7 +96,11 @@ class TorrentSource(ManagedDownloadSource):
@property
def torrent_length(self):
return self.torrent_session.get_size(self.identifier)
return self.torrent_session.get_total_size(self.identifier)
@property
def stream_length(self):
return self.torrent_session.get_size(self.identifier, self.file_name)
@property
def written_bytes(self):
@ -81,6 +121,56 @@ class TorrentSource(ManagedDownloadSource):
def completed(self):
return self.torrent_session.is_completed(self.identifier)
@property
def status(self):
return self.STATUS_FINISHED if self.completed else self.STATUS_RUNNING
async def stream_file(self, request):
log.info("stream torrent to browser for lbry://%s#%s (btih %s...)", self.claim_name, self.claim_id,
self.identifier[:6])
headers, start, end = self._prepare_range_response_headers(
request.headers.get('range', 'bytes=0-')
)
target = self.suggested_file_name
await self.start()
response = StreamResponse(
status=206,
headers=headers
)
await response.prepare(request)
while not os.path.exists(self.full_path):
async for _ in self.torrent_session.stream_file(self.identifier, target, start, end):
break
with open(self.full_path, 'rb') as infile:
infile.seek(start)
async for read_size in self.torrent_session.stream_file(self.identifier, target, start, end):
if infile.tell() + read_size < end:
await response.write(infile.read(read_size))
else:
await response.write_eof(infile.read(end - infile.tell() + 1))
return response
def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
if '=' in get_range:
get_range = get_range.split('=')[1]
start, end = get_range.split('-')
size = self.stream_length
start = int(start)
end = int(end) if end else size - 1
if end >= size or not 0 <= start < size:
raise HTTPRequestRangeNotSatisfiable()
final_size = end - start + 1
headers = {
'Accept-Ranges': 'bytes',
'Content-Range': f'bytes {start}-{end}/{size}',
'Content-Length': str(final_size),
'Content-Type': self.mime_type
}
return headers, start, end
class TorrentManager(SourceManager):
_sources: typing.Dict[str, ManagedDownloadSource]
@ -103,7 +193,7 @@ class TorrentManager(SourceManager):
async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str],
download_directory: Optional[str], status: str,
claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
added_on: Optional[int]):
added_on: Optional[int], **kwargs):
stream = TorrentSource(
self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name,
download_directory=download_directory, status=status, claim=claim, rowid=rowid,
@ -111,9 +201,14 @@ class TorrentManager(SourceManager):
torrent_session=self.torrent_session
)
self.add(stream)
await stream.setup()
async def initialize_from_database(self):
pass
for file in await self.storage.get_all_torrent_files():
claim = await self.storage.get_content_claim_for_torrent(file['bt_infohash'])
file['download_directory'] = bytes.fromhex(file['download_directory'] or '').decode() or None
file['file_name'] = bytes.fromhex(file['file_name'] or '').decode() or None
await self._load_stream(claim=claim, **file)
async def start(self):
await super().start()
@ -132,9 +227,6 @@ class TorrentManager(SourceManager):
async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
raise NotImplementedError
# blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
# await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
# await self.storage.delete_stream(source.descriptor)
async def stream_partial_content(self, request: Request, sd_hash: str):
raise NotImplementedError
async def stream_partial_content(self, request: Request, identifier: str):
return await self._sources[identifier].stream_file(request)

View file

@ -1,14 +1,18 @@
import time
import unittest
from unittest import skipIf
import asyncio
import os
from binascii import hexlify
import aiohttp.web
from lbry.schema import Claim
from lbry.stream.background_downloader import BackgroundDownloader
from lbry.stream.descriptor import StreamDescriptor
from lbry.testcase import CommandTestCase
from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT
from lbry.utils import aiohttp_request
from lbry.wallet import Transaction
from lbry.torrent.tracker import UDPTrackerServerProtocol
@ -17,55 +21,104 @@ class FileCommands(CommandTestCase):
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self.skip_libtorrent = False
self.streaming_port = 60818
self.seeder_session = None
async def add_forever(self):
while True:
for handle in self.client_session._handles.values():
handle._handle.connect_peer(('127.0.0.1', 4040))
await asyncio.sleep(.1)
async def initialize_torrent(self, tx_to_update=None):
if not hasattr(self, 'seeder_session'):
async def initialize_torrent(self, tx_to_update=None, pick_a_file=True, name=None):
assert name is None or tx_to_update is None
if not self.seeder_session:
self.seeder_session = TorrentSession(self.loop, None)
self.addCleanup(self.seeder_session.stop)
await self.seeder_session.bind('127.0.0.1', port=4040)
btih = await self.seeder_session.add_fake_torrent()
btih = await self.seeder_session.add_fake_torrent(file_count=3)
files = [(size, path) for (path, size) in self.seeder_session.get_files(btih).items()]
files.sort()
# picking a file will pick something in the middle, while automatic selection will pick largest
self.expected_size, self.expected_path = files[1] if pick_a_file else files[-1]
address = await self.account.receiving.get_or_create_usable_address()
claim = tx_to_update.outputs[0].claim if tx_to_update else Claim()
claim.stream.update(bt_infohash=btih)
if pick_a_file:
claim.stream.source.name = os.path.basename(self.expected_path)
if not tx_to_update:
claim = Claim()
claim.stream.update(bt_infohash=btih)
tx = await Transaction.claim_create(
'torrent', claim, 1, address, [self.account], self.account
name or 'torrent', claim, 1, address, [self.account], self.account
)
else:
claim = tx_to_update.outputs[0].claim
claim.stream.update(bt_infohash=btih)
tx = await Transaction.claim_update(
tx_to_update.outputs[0], claim, 1, address, [self.account], self.account
)
await tx.sign([self.account])
await self.broadcast_and_confirm(tx)
self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session
self.client_session.wait_start = False # fixme: this is super slow on tests
task = asyncio.create_task(self.add_forever())
self.addCleanup(task.cancel)
return tx, btih
async def assert_torrent_streaming_works(self, btih):
url = f'http://{self.daemon.conf.streaming_host}:{self.streaming_port}/stream/{btih}'
if self.daemon.streaming_runner.server is None:
await self.daemon.streaming_runner.setup()
site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host,
self.streaming_port)
await site.start()
async with aiohttp_request('get', url) as req:
self.assertEqual(req.status, 206)
self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream')
content_range = req.headers.get('Content-Range')
content_length = int(req.headers.get('Content-Length'))
streamed_bytes = await req.content.read()
expected_size = self.expected_size
self.assertEqual(expected_size, len(streamed_bytes))
self.assertEqual(content_length, len(streamed_bytes))
self.assertEqual(f"bytes 0-{expected_size - 1}/{expected_size}", content_range)
@skipIf(TorrentSession is None, "libtorrent not installed")
async def test_download_torrent(self):
tx, btih = await self.initialize_torrent()
tx, btih = await self.initialize_torrent(pick_a_file=False)
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
# second call, see its there and move on
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, btih)
self.assertIn(btih, self.client_session._handles)
# stream over streaming API (full range of the largest file)
await self.assert_torrent_streaming_works(btih)
# check json encoder fields for torrent sources
file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0]
self.assertEqual(btih, file['metadata']['source']['bt_infohash'])
self.assertAlmostEqual(time.time(), file['added_on'], delta=12)
self.assertEqual("application/octet-stream", file['mime_type'])
self.assertEqual(os.path.basename(self.expected_path), file['suggested_file_name'])
self.assertEqual(os.path.basename(self.expected_path), file['stream_name'])
while not file['completed']: # improve that
await asyncio.sleep(0.5)
file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0]
self.assertTrue(file['completed'])
self.assertGreater(file['total_bytes_lower_bound'], 0)
self.assertEqual(file['total_bytes_lower_bound'], file['total_bytes'])
self.assertEqual(file['total_bytes'], file['written_bytes'])
self.assertEqual('finished', file['status'])
# filter by a field which is missing on torrent
self.assertItemCount(await self.daemon.jsonrpc_file_list(stream_hash="abc"), 0)
tx, new_btih = await self.initialize_torrent(tx)
self.assertNotEqual(btih, new_btih)
# claim now points to another torrent, update to it
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih)
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
# restart and verify that only one updated stream was recovered
self.daemon.file_manager.stop()
await self.daemon.file_manager.start()
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih)
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
# check it was saved properly, once
self.assertEqual(1, len(await self.daemon.storage.get_all_torrent_files()))
self.assertIn(new_btih, self.client_session._handles)
self.assertNotIn(btih, self.client_session._handles)
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
@ -73,6 +126,11 @@ class FileCommands(CommandTestCase):
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0)
self.assertNotIn(new_btih, self.client_session._handles)
await self.initialize_torrent(name='torrent2')
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent2')))
file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0]
self.assertEqual(os.path.basename(self.expected_path), file['stream_name'])
async def create_streams_in_range(self, *args, **kwargs):
self.stream_claim_ids = []
for i in range(*args, **kwargs):
@ -335,12 +393,12 @@ class FileCommands(CommandTestCase):
await self.server.blob_manager.delete_blobs(all_except_sd)
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True)
self.assertIn('error', resp)
self.assertEqual('Failed to download data blobs for sd hash %s within timeout.' % sd_hash, resp['error'])
self.assertEqual('Failed to download data blobs for %s within timeout.' % sd_hash, resp['error'])
self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo'), "data timeout didn't create a file")
await self.server.blob_manager.delete_blobs([sd_hash])
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True)
self.assertIn('error', resp)
self.assertEqual('Failed to download sd blob %s within timeout.' % sd_hash, resp['error'])
self.assertEqual('Failed to download metadata for %s within timeout.' % sd_hash, resp['error'])
async def wait_files_to_complete(self):
while await self.file_list(status='running'):

View file

@ -11,7 +11,7 @@ from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase
from lbry.testcase import get_fake_exchange_rate_manager
from lbry.utils import generate_id
from lbry.error import InsufficientFundsError
from lbry.error import KeyFeeAboveMaxAllowedError, ResolveError, DownloadSDTimeoutError, DownloadDataTimeoutError
from lbry.error import KeyFeeAboveMaxAllowedError, ResolveError, DownloadMetadataTimeoutError, DownloadDataTimeoutError
from lbry.wallet import WalletManager, Wallet, Ledger, Transaction, Input, Output, Database
from lbry.wallet.constants import CENT, NULL_HASH32
from lbry.wallet.network import ClientSession
@ -229,10 +229,10 @@ class TestStreamManager(BlobExchangeTestBase):
self.assertFalse(event['properties']['added_fixed_peers'])
self.assertEqual(event['properties']['connection_failures_count'], 1)
self.assertEqual(
event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout.'
event['properties']['error_message'], f'Failed to download metadata for {self.sd_hash} within timeout.'
)
await self._test_time_to_first_bytes(check_post, DownloadSDTimeoutError, after_setup=after_setup)
await self._test_time_to_first_bytes(check_post, DownloadMetadataTimeoutError, after_setup=after_setup)
async def test_override_fixed_peer_delay_dht_disabled(self):
self.client_config.fixed_peers = [(self.server_from_client.address, self.server_from_client.tcp_port)]
@ -266,18 +266,18 @@ class TestStreamManager(BlobExchangeTestBase):
def check_post(event):
self.assertEqual(event['event'], 'Time To First Bytes')
self.assertEqual(event['properties']['error'], 'DownloadSDTimeoutError')
self.assertEqual(event['properties']['error'], 'DownloadMetadataTimeoutError')
self.assertEqual(event['properties']['tried_peers_count'], 0)
self.assertEqual(event['properties']['active_peer_count'], 0)
self.assertFalse(event['properties']['use_fixed_peers'])
self.assertFalse(event['properties']['added_fixed_peers'])
self.assertIsNone(event['properties']['fixed_peer_delay'])
self.assertEqual(
event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout.'
event['properties']['error_message'], f'Failed to download metadata for {self.sd_hash} within timeout.'
)
start = self.loop.time()
await self._test_time_to_first_bytes(check_post, DownloadSDTimeoutError)
await self._test_time_to_first_bytes(check_post, DownloadMetadataTimeoutError)
duration = self.loop.time() - start
self.assertLessEqual(duration, 5)
self.assertGreaterEqual(duration, 3.0)
@ -387,7 +387,7 @@ class TestStreamManager(BlobExchangeTestBase):
self.server.stop_server()
await self.setup_stream_manager()
await self._test_download_error_analytics_on_start(
DownloadSDTimeoutError, f'Failed to download sd blob {self.sd_hash} within timeout.', timeout=1
DownloadMetadataTimeoutError, f'Failed to download metadata for {self.sd_hash} within timeout.', timeout=1
)
async def test_download_data_timeout(self):
@ -396,7 +396,7 @@ class TestStreamManager(BlobExchangeTestBase):
head_blob_hash = json.loads(sdf.read())['blobs'][0]['blob_hash']
self.server_blob_manager.delete_blob(head_blob_hash)
await self._test_download_error_analytics_on_start(
DownloadDataTimeoutError, f'Failed to download data blobs for sd hash {self.sd_hash} within timeout.', timeout=1
DownloadDataTimeoutError, f'Failed to download data blobs for {self.sd_hash} within timeout.', timeout=1
)
async def test_unexpected_error(self):