forked from LBRYCommunity/lbry-sdk
Compare commits
33 commits
master
...
torrent_st
Author | SHA1 | Date | |
---|---|---|---|
|
dbe3ace812 | ||
|
636b7ed476 | ||
|
64aad14ba6 | ||
|
9d869820a3 | ||
|
5cf63fa03e | ||
|
9dc617f8e0 | ||
|
2bea8f58e0 | ||
|
8ce53069ad | ||
|
39da718c28 | ||
|
1041a19467 | ||
|
651348f6e0 | ||
|
31c6e0e835 | ||
|
732b7e79d7 | ||
|
2bf0ca6441 | ||
|
77d2c81a30 | ||
|
b39971bf05 | ||
|
af0ad417df | ||
|
c8f25027fc | ||
|
e862c99f6c | ||
|
f650e8f07e | ||
|
7c7e18534e | ||
|
37adc59b37 | ||
|
7746ded9b6 | ||
|
dd103d0f95 | ||
|
7410991123 | ||
|
df680e7225 | ||
|
b2f82070b0 | ||
|
b3bff39eea | ||
|
6efd4dd19a | ||
|
8ee5cee8c3 | ||
|
7828041786 | ||
|
8212e73c2e | ||
|
63784622e9 |
34 changed files with 524 additions and 291 deletions
54
.github/workflows/main.yml
vendored
54
.github/workflows/main.yml
vendored
|
@ -1,5 +1,5 @@
|
||||||
name: ci
|
name: ci
|
||||||
on: ["push", "pull_request", "workflow_dispatch"]
|
on: ["push", "pull_request"]
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
|
|
||||||
|
@ -7,12 +7,12 @@ jobs:
|
||||||
name: lint
|
name: lint
|
||||||
runs-on: ubuntu-20.04
|
runs-on: ubuntu-20.04
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v2
|
||||||
- uses: actions/setup-python@v4
|
- uses: actions/setup-python@v1
|
||||||
with:
|
with:
|
||||||
python-version: '3.9'
|
python-version: '3.7'
|
||||||
- name: extract pip cache
|
- name: extract pip cache
|
||||||
uses: actions/cache@v3
|
uses: actions/cache@v2
|
||||||
with:
|
with:
|
||||||
path: ~/.cache/pip
|
path: ~/.cache/pip
|
||||||
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
|
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
|
||||||
|
@ -31,21 +31,21 @@ jobs:
|
||||||
- windows-latest
|
- windows-latest
|
||||||
runs-on: ${{ matrix.os }}
|
runs-on: ${{ matrix.os }}
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v2
|
||||||
- uses: actions/setup-python@v4
|
- uses: actions/setup-python@v1
|
||||||
with:
|
with:
|
||||||
python-version: '3.9'
|
python-version: '3.7'
|
||||||
- name: set pip cache dir
|
- name: set pip cache dir
|
||||||
shell: bash
|
id: pip-cache
|
||||||
run: echo "PIP_CACHE_DIR=$(pip cache dir)" >> $GITHUB_ENV
|
run: echo "::set-output name=dir::$(pip cache dir)"
|
||||||
- name: extract pip cache
|
- name: extract pip cache
|
||||||
uses: actions/cache@v3
|
uses: actions/cache@v2
|
||||||
with:
|
with:
|
||||||
path: ${{ env.PIP_CACHE_DIR }}
|
path: ${{ steps.pip-cache.outputs.dir }}
|
||||||
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
|
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
|
||||||
restore-keys: ${{ runner.os }}-pip-
|
restore-keys: ${{ runner.os }}-pip-
|
||||||
- id: os-name
|
- id: os-name
|
||||||
uses: ASzc/change-string-case-action@v5
|
uses: ASzc/change-string-case-action@v1
|
||||||
with:
|
with:
|
||||||
string: ${{ runner.os }}
|
string: ${{ runner.os }}
|
||||||
- run: python -m pip install --user --upgrade pip wheel
|
- run: python -m pip install --user --upgrade pip wheel
|
||||||
|
@ -93,16 +93,16 @@ jobs:
|
||||||
uses: elastic/elastic-github-actions/elasticsearch@master
|
uses: elastic/elastic-github-actions/elasticsearch@master
|
||||||
with:
|
with:
|
||||||
stack-version: 7.12.1
|
stack-version: 7.12.1
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v2
|
||||||
- uses: actions/setup-python@v4
|
- uses: actions/setup-python@v1
|
||||||
with:
|
with:
|
||||||
python-version: '3.9'
|
python-version: '3.7'
|
||||||
- if: matrix.test == 'other'
|
- if: matrix.test == 'other'
|
||||||
run: |
|
run: |
|
||||||
sudo apt-get update
|
sudo apt-get update
|
||||||
sudo apt-get install -y --no-install-recommends ffmpeg
|
sudo apt-get install -y --no-install-recommends ffmpeg
|
||||||
- name: extract pip cache
|
- name: extract pip cache
|
||||||
uses: actions/cache@v3
|
uses: actions/cache@v2
|
||||||
with:
|
with:
|
||||||
path: ./.tox
|
path: ./.tox
|
||||||
key: tox-integration-${{ matrix.test }}-${{ hashFiles('setup.py') }}
|
key: tox-integration-${{ matrix.test }}-${{ hashFiles('setup.py') }}
|
||||||
|
@ -138,26 +138,26 @@ jobs:
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
os:
|
os:
|
||||||
- ubuntu-20.04
|
- ubuntu-18.04
|
||||||
- macos-latest
|
- macos-latest
|
||||||
- windows-latest
|
- windows-latest
|
||||||
runs-on: ${{ matrix.os }}
|
runs-on: ${{ matrix.os }}
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v2
|
||||||
- uses: actions/setup-python@v4
|
- uses: actions/setup-python@v1
|
||||||
with:
|
with:
|
||||||
python-version: '3.9'
|
python-version: '3.7'
|
||||||
- id: os-name
|
- id: os-name
|
||||||
uses: ASzc/change-string-case-action@v5
|
uses: ASzc/change-string-case-action@v1
|
||||||
with:
|
with:
|
||||||
string: ${{ runner.os }}
|
string: ${{ runner.os }}
|
||||||
- name: set pip cache dir
|
- name: set pip cache dir
|
||||||
shell: bash
|
id: pip-cache
|
||||||
run: echo "PIP_CACHE_DIR=$(pip cache dir)" >> $GITHUB_ENV
|
run: echo "::set-output name=dir::$(pip cache dir)"
|
||||||
- name: extract pip cache
|
- name: extract pip cache
|
||||||
uses: actions/cache@v3
|
uses: actions/cache@v2
|
||||||
with:
|
with:
|
||||||
path: ${{ env.PIP_CACHE_DIR }}
|
path: ${{ steps.pip-cache.outputs.dir }}
|
||||||
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
|
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
|
||||||
restore-keys: ${{ runner.os }}-pip-
|
restore-keys: ${{ runner.os }}-pip-
|
||||||
- run: pip install pyinstaller==4.6
|
- run: pip install pyinstaller==4.6
|
||||||
|
@ -175,7 +175,7 @@ jobs:
|
||||||
pip install pywin32==301
|
pip install pywin32==301
|
||||||
pyinstaller --additional-hooks-dir=scripts/. --icon=icons/lbry256.ico --onefile --name lbrynet lbry/extras/cli.py
|
pyinstaller --additional-hooks-dir=scripts/. --icon=icons/lbry256.ico --onefile --name lbrynet lbry/extras/cli.py
|
||||||
dist/lbrynet.exe --version
|
dist/lbrynet.exe --version
|
||||||
- uses: actions/upload-artifact@v3
|
- uses: actions/upload-artifact@v2
|
||||||
with:
|
with:
|
||||||
name: lbrynet-${{ steps.os-name.outputs.lowercase }}
|
name: lbrynet-${{ steps.os-name.outputs.lowercase }}
|
||||||
path: dist/
|
path: dist/
|
||||||
|
|
|
@ -1,2 +1,2 @@
|
||||||
__version__ = "0.113.0"
|
__version__ = "0.112.0"
|
||||||
version = tuple(map(int, __version__.split('.'))) # pylint: disable=invalid-name
|
version = tuple(map(int, __version__.split('.'))) # pylint: disable=invalid-name
|
||||||
|
|
|
@ -64,7 +64,7 @@ class BlobDownloader:
|
||||||
self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1
|
self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1
|
||||||
|
|
||||||
async def new_peer_or_finished(self):
|
async def new_peer_or_finished(self):
|
||||||
active_tasks = list(self.active_connections.values()) + [asyncio.create_task(asyncio.sleep(1))]
|
active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)]
|
||||||
await asyncio.wait(active_tasks, return_when='FIRST_COMPLETED')
|
await asyncio.wait(active_tasks, return_when='FIRST_COMPLETED')
|
||||||
|
|
||||||
def cleanup_active(self):
|
def cleanup_active(self):
|
||||||
|
|
11
lbry/conf.py
11
lbry/conf.py
|
@ -688,9 +688,6 @@ class Config(CLIConfig):
|
||||||
tracker_servers = Servers("BitTorrent-compatible (BEP15) UDP trackers for helping P2P discovery", [
|
tracker_servers = Servers("BitTorrent-compatible (BEP15) UDP trackers for helping P2P discovery", [
|
||||||
('tracker.lbry.com', 9252),
|
('tracker.lbry.com', 9252),
|
||||||
('tracker.lbry.grin.io', 9252),
|
('tracker.lbry.grin.io', 9252),
|
||||||
('tracker.lbry.pigg.es', 9252),
|
|
||||||
('tracker.lizard.technology', 9252),
|
|
||||||
('s1.lbry.network', 9252),
|
|
||||||
])
|
])
|
||||||
|
|
||||||
lbryum_servers = Servers("SPV wallet servers", [
|
lbryum_servers = Servers("SPV wallet servers", [
|
||||||
|
@ -703,20 +700,14 @@ class Config(CLIConfig):
|
||||||
('spv17.lbry.com', 50001),
|
('spv17.lbry.com', 50001),
|
||||||
('spv18.lbry.com', 50001),
|
('spv18.lbry.com', 50001),
|
||||||
('spv19.lbry.com', 50001),
|
('spv19.lbry.com', 50001),
|
||||||
('hub.lbry.grin.io', 50001),
|
|
||||||
('hub.lizard.technology', 50001),
|
|
||||||
('s1.lbry.network', 50001),
|
|
||||||
])
|
])
|
||||||
known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [
|
known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [
|
||||||
('dht.lbry.grin.io', 4444), # Grin
|
('dht.lbry.grin.io', 4444), # Grin
|
||||||
('dht.lbry.madiator.com', 4444), # Madiator
|
('dht.lbry.madiator.com', 4444), # Madiator
|
||||||
('dht.lbry.pigg.es', 4444), # Pigges
|
|
||||||
('lbrynet1.lbry.com', 4444), # US EAST
|
('lbrynet1.lbry.com', 4444), # US EAST
|
||||||
('lbrynet2.lbry.com', 4444), # US WEST
|
('lbrynet2.lbry.com', 4444), # US WEST
|
||||||
('lbrynet3.lbry.com', 4444), # EU
|
('lbrynet3.lbry.com', 4444), # EU
|
||||||
('lbrynet4.lbry.com', 4444), # ASIA
|
('lbrynet4.lbry.com', 4444) # ASIA
|
||||||
('dht.lizard.technology', 4444), # Jack
|
|
||||||
('s2.lbry.network', 4444),
|
|
||||||
])
|
])
|
||||||
|
|
||||||
# blockchain
|
# blockchain
|
||||||
|
|
|
@ -42,6 +42,8 @@ class BlobAnnouncer:
|
||||||
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
|
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
self.announcements_sent_metric.labels(peers=0, error=True).inc()
|
self.announcements_sent_metric.labels(peers=0, error=True).inc()
|
||||||
|
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||||
|
raise err
|
||||||
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
|
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
|
||||||
|
|
||||||
async def _announce(self, batch_size: typing.Optional[int] = 10):
|
async def _announce(self, batch_size: typing.Optional[int] = 10):
|
||||||
|
|
|
@ -81,8 +81,8 @@ Code | Name | Message
|
||||||
511 | CorruptBlob | Blobs is corrupted.
|
511 | CorruptBlob | Blobs is corrupted.
|
||||||
520 | BlobFailedEncryption | Failed to encrypt blob.
|
520 | BlobFailedEncryption | Failed to encrypt blob.
|
||||||
531 | DownloadCancelled | Download was canceled.
|
531 | DownloadCancelled | Download was canceled.
|
||||||
532 | DownloadSDTimeout | Failed to download sd blob {download} within timeout.
|
532 | DownloadMetadataTimeout | Failed to download metadata for {download} within timeout.
|
||||||
533 | DownloadDataTimeout | Failed to download data blobs for sd hash {download} within timeout.
|
533 | DownloadDataTimeout | Failed to download data blobs for {download} within timeout.
|
||||||
534 | InvalidStreamDescriptor | {message}
|
534 | InvalidStreamDescriptor | {message}
|
||||||
535 | InvalidData | {message}
|
535 | InvalidData | {message}
|
||||||
536 | InvalidBlobHash | {message}
|
536 | InvalidBlobHash | {message}
|
||||||
|
|
|
@ -411,18 +411,18 @@ class DownloadCancelledError(BlobError):
|
||||||
super().__init__("Download was canceled.")
|
super().__init__("Download was canceled.")
|
||||||
|
|
||||||
|
|
||||||
class DownloadSDTimeoutError(BlobError):
|
class DownloadMetadataTimeoutError(BlobError):
|
||||||
|
|
||||||
def __init__(self, download):
|
def __init__(self, download):
|
||||||
self.download = download
|
self.download = download
|
||||||
super().__init__(f"Failed to download sd blob {download} within timeout.")
|
super().__init__(f"Failed to download metadata for {download} within timeout.")
|
||||||
|
|
||||||
|
|
||||||
class DownloadDataTimeoutError(BlobError):
|
class DownloadDataTimeoutError(BlobError):
|
||||||
|
|
||||||
def __init__(self, download):
|
def __init__(self, download):
|
||||||
self.download = download
|
self.download = download
|
||||||
super().__init__(f"Failed to download data blobs for sd hash {download} within timeout.")
|
super().__init__(f"Failed to download data blobs for {download} within timeout.")
|
||||||
|
|
||||||
|
|
||||||
class InvalidStreamDescriptorError(BlobError):
|
class InvalidStreamDescriptorError(BlobError):
|
||||||
|
|
|
@ -37,7 +37,7 @@ class Component(metaclass=ComponentType):
|
||||||
def running(self):
|
def running(self):
|
||||||
return self._running
|
return self._running
|
||||||
|
|
||||||
async def get_status(self): # pylint: disable=no-self-use
|
async def get_status(self):
|
||||||
return
|
return
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
|
|
|
@ -118,7 +118,7 @@ class ComponentManager:
|
||||||
component._setup() for component in stage if not component.running
|
component._setup() for component in stage if not component.running
|
||||||
]
|
]
|
||||||
if needing_start:
|
if needing_start:
|
||||||
await asyncio.wait(map(asyncio.create_task, needing_start))
|
await asyncio.wait(needing_start)
|
||||||
self.started.set()
|
self.started.set()
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
|
@ -131,7 +131,7 @@ class ComponentManager:
|
||||||
component._stop() for component in stage if component.running
|
component._stop() for component in stage if component.running
|
||||||
]
|
]
|
||||||
if needing_stop:
|
if needing_stop:
|
||||||
await asyncio.wait(map(asyncio.create_task, needing_stop))
|
await asyncio.wait(needing_stop)
|
||||||
|
|
||||||
def all_components_running(self, *component_names):
|
def all_components_running(self, *component_names):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -374,7 +374,7 @@ class FileManagerComponent(Component):
|
||||||
log.info('Done setting up file manager')
|
log.info('Done setting up file manager')
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
await self.file_manager.stop()
|
self.file_manager.stop()
|
||||||
|
|
||||||
|
|
||||||
class BackgroundDownloaderComponent(Component):
|
class BackgroundDownloaderComponent(Component):
|
||||||
|
@ -560,6 +560,8 @@ class UPnPComponent(Component):
|
||||||
self.upnp = await UPnP.discover(loop=self.component_manager.loop)
|
self.upnp = await UPnP.discover(loop=self.component_manager.loop)
|
||||||
log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
|
log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||||
|
raise
|
||||||
log.warning("upnp discovery failed: %s", err)
|
log.warning("upnp discovery failed: %s", err)
|
||||||
self.upnp = None
|
self.upnp = None
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ from lbry.blob.blob_file import is_valid_blobhash, BlobBuffer
|
||||||
from lbry.blob_exchange.downloader import download_blob
|
from lbry.blob_exchange.downloader import download_blob
|
||||||
from lbry.dht.peer import make_kademlia_peer
|
from lbry.dht.peer import make_kademlia_peer
|
||||||
from lbry.error import (
|
from lbry.error import (
|
||||||
DownloadSDTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError,
|
DownloadMetadataTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError,
|
||||||
CommandDoesNotExistError, BaseError, WalletNotFoundError, WalletAlreadyLoadedError, WalletAlreadyExistsError,
|
CommandDoesNotExistError, BaseError, WalletNotFoundError, WalletAlreadyLoadedError, WalletAlreadyExistsError,
|
||||||
ConflictingInputValueError, AlreadyPurchasedError, PrivateKeyNotFoundError, InputStringIsBlankError,
|
ConflictingInputValueError, AlreadyPurchasedError, PrivateKeyNotFoundError, InputStringIsBlankError,
|
||||||
InputValueError
|
InputValueError
|
||||||
|
@ -614,8 +614,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
content_type='application/json'
|
content_type='application/json'
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
async def handle_metrics_get_request(self, request: web.Request):
|
||||||
async def handle_metrics_get_request(request: web.Request):
|
|
||||||
try:
|
try:
|
||||||
return web.Response(
|
return web.Response(
|
||||||
text=prom_generate_latest().decode(),
|
text=prom_generate_latest().decode(),
|
||||||
|
@ -640,7 +639,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
stream = await self.jsonrpc_get(uri)
|
stream = await self.jsonrpc_get(uri)
|
||||||
if isinstance(stream, dict):
|
if isinstance(stream, dict):
|
||||||
raise web.HTTPServerError(text=stream['error'])
|
raise web.HTTPServerError(text=stream['error'])
|
||||||
raise web.HTTPFound(f"/stream/{stream.sd_hash}")
|
raise web.HTTPFound(f"/stream/{stream.identifier}")
|
||||||
|
|
||||||
async def handle_stream_range_request(self, request: web.Request):
|
async def handle_stream_range_request(self, request: web.Request):
|
||||||
try:
|
try:
|
||||||
|
@ -659,12 +658,13 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
log.debug("finished handling /stream range request")
|
log.debug("finished handling /stream range request")
|
||||||
|
|
||||||
async def _handle_stream_range_request(self, request: web.Request):
|
async def _handle_stream_range_request(self, request: web.Request):
|
||||||
sd_hash = request.path.split("/stream/")[1]
|
identifier = request.path.split("/stream/")[1]
|
||||||
if not self.file_manager.started.is_set():
|
if not self.file_manager.started.is_set():
|
||||||
await self.file_manager.started.wait()
|
await self.file_manager.started.wait()
|
||||||
if sd_hash not in self.file_manager.streams:
|
stream = self.file_manager.get_filtered(identifier=identifier)
|
||||||
|
if not stream:
|
||||||
return web.HTTPNotFound()
|
return web.HTTPNotFound()
|
||||||
return await self.file_manager.stream_partial_content(request, sd_hash)
|
return await self.file_manager.stream_partial_content(request, identifier)
|
||||||
|
|
||||||
async def _process_rpc_call(self, data):
|
async def _process_rpc_call(self, data):
|
||||||
args = data.get('params', {})
|
args = data.get('params', {})
|
||||||
|
@ -1140,7 +1140,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
save_file=save_file, wallet=wallet
|
save_file=save_file, wallet=wallet
|
||||||
)
|
)
|
||||||
if not stream:
|
if not stream:
|
||||||
raise DownloadSDTimeoutError(uri)
|
raise DownloadMetadataTimeoutError(uri)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# TODO: use error from lbry.error
|
# TODO: use error from lbry.error
|
||||||
log.warning("Error downloading %s: %s", uri, str(e))
|
log.warning("Error downloading %s: %s", uri, str(e))
|
||||||
|
|
|
@ -80,6 +80,8 @@ class MarketFeed:
|
||||||
self.rate = ExchangeRate(self.market, rate, int(time.time()))
|
self.rate = ExchangeRate(self.market, rate, int(time.time()))
|
||||||
self.last_check = time.time()
|
self.last_check = time.time()
|
||||||
return self.rate
|
return self.rate
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
log.warning("Timed out fetching exchange rate from %s.", self.name)
|
log.warning("Timed out fetching exchange rate from %s.", self.name)
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
|
|
|
@ -285,7 +285,7 @@ class JSONResponseEncoder(JSONEncoder):
|
||||||
else:
|
else:
|
||||||
total_bytes_lower_bound = total_bytes = managed_stream.torrent_length
|
total_bytes_lower_bound = total_bytes = managed_stream.torrent_length
|
||||||
result = {
|
result = {
|
||||||
'streaming_url': None,
|
'streaming_url': managed_stream.stream_url,
|
||||||
'completed': managed_stream.completed,
|
'completed': managed_stream.completed,
|
||||||
'file_name': None,
|
'file_name': None,
|
||||||
'download_directory': None,
|
'download_directory': None,
|
||||||
|
@ -293,10 +293,10 @@ class JSONResponseEncoder(JSONEncoder):
|
||||||
'points_paid': 0.0,
|
'points_paid': 0.0,
|
||||||
'stopped': not managed_stream.running,
|
'stopped': not managed_stream.running,
|
||||||
'stream_hash': None,
|
'stream_hash': None,
|
||||||
'stream_name': None,
|
'stream_name': managed_stream.stream_name,
|
||||||
'suggested_file_name': None,
|
'suggested_file_name': managed_stream.suggested_file_name,
|
||||||
'sd_hash': None,
|
'sd_hash': None,
|
||||||
'mime_type': None,
|
'mime_type': managed_stream.mime_type,
|
||||||
'key': None,
|
'key': None,
|
||||||
'total_bytes_lower_bound': total_bytes_lower_bound,
|
'total_bytes_lower_bound': total_bytes_lower_bound,
|
||||||
'total_bytes': total_bytes,
|
'total_bytes': total_bytes,
|
||||||
|
@ -326,12 +326,8 @@ class JSONResponseEncoder(JSONEncoder):
|
||||||
}
|
}
|
||||||
if is_stream:
|
if is_stream:
|
||||||
result.update({
|
result.update({
|
||||||
'streaming_url': managed_stream.stream_url,
|
|
||||||
'stream_hash': managed_stream.stream_hash,
|
'stream_hash': managed_stream.stream_hash,
|
||||||
'stream_name': managed_stream.stream_name,
|
|
||||||
'suggested_file_name': managed_stream.suggested_file_name,
|
|
||||||
'sd_hash': managed_stream.descriptor.sd_hash,
|
'sd_hash': managed_stream.descriptor.sd_hash,
|
||||||
'mime_type': managed_stream.mime_type,
|
|
||||||
'key': managed_stream.descriptor.key,
|
'key': managed_stream.descriptor.key,
|
||||||
'blobs_completed': managed_stream.blobs_completed,
|
'blobs_completed': managed_stream.blobs_completed,
|
||||||
'blobs_in_stream': managed_stream.blobs_in_stream,
|
'blobs_in_stream': managed_stream.blobs_in_stream,
|
||||||
|
@ -340,10 +336,6 @@ class JSONResponseEncoder(JSONEncoder):
|
||||||
'reflector_progress': managed_stream.reflector_progress,
|
'reflector_progress': managed_stream.reflector_progress,
|
||||||
'uploading_to_reflector': managed_stream.uploading_to_reflector
|
'uploading_to_reflector': managed_stream.uploading_to_reflector
|
||||||
})
|
})
|
||||||
else:
|
|
||||||
result.update({
|
|
||||||
'streaming_url': f'file://{managed_stream.full_path}',
|
|
||||||
})
|
|
||||||
if output_exists:
|
if output_exists:
|
||||||
result.update({
|
result.update({
|
||||||
'file_name': managed_stream.file_name,
|
'file_name': managed_stream.file_name,
|
||||||
|
|
|
@ -5,6 +5,7 @@ import typing
|
||||||
import asyncio
|
import asyncio
|
||||||
import binascii
|
import binascii
|
||||||
import time
|
import time
|
||||||
|
from operator import itemgetter
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from lbry.wallet import SQLiteMixin
|
from lbry.wallet import SQLiteMixin
|
||||||
from lbry.conf import Config
|
from lbry.conf import Config
|
||||||
|
@ -211,7 +212,7 @@ def delete_torrent(transaction: sqlite3.Connection, bt_infohash: str):
|
||||||
transaction.execute("delete from torrent where bt_infohash=?", (bt_infohash,)).fetchall()
|
transaction.execute("delete from torrent where bt_infohash=?", (bt_infohash,)).fetchall()
|
||||||
|
|
||||||
|
|
||||||
def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str],
|
def store_file(transaction: sqlite3.Connection, identifier_value: str, file_name: typing.Optional[str],
|
||||||
download_directory: typing.Optional[str], data_payment_rate: float, status: str,
|
download_directory: typing.Optional[str], data_payment_rate: float, status: str,
|
||||||
content_fee: typing.Optional[Transaction], added_on: typing.Optional[int] = None) -> int:
|
content_fee: typing.Optional[Transaction], added_on: typing.Optional[int] = None) -> int:
|
||||||
if not file_name and not download_directory:
|
if not file_name and not download_directory:
|
||||||
|
@ -219,15 +220,18 @@ def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typ
|
||||||
else:
|
else:
|
||||||
encoded_file_name = binascii.hexlify(file_name.encode()).decode()
|
encoded_file_name = binascii.hexlify(file_name.encode()).decode()
|
||||||
encoded_download_dir = binascii.hexlify(download_directory.encode()).decode()
|
encoded_download_dir = binascii.hexlify(download_directory.encode()).decode()
|
||||||
|
is_torrent = len(identifier_value) == 40
|
||||||
time_added = added_on or int(time.time())
|
time_added = added_on or int(time.time())
|
||||||
transaction.execute(
|
transaction.execute(
|
||||||
"insert or replace into file values (?, NULL, ?, ?, ?, ?, ?, ?, ?)",
|
f"insert or replace into file values ({'NULL, ?' if is_torrent else '?, NULL'}, ?, ?, ?, ?, ?, ?, ?)",
|
||||||
(stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status,
|
(identifier_value, encoded_file_name, encoded_download_dir, data_payment_rate, status,
|
||||||
1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0,
|
1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0,
|
||||||
None if not content_fee else binascii.hexlify(content_fee.raw).decode(), time_added)
|
None if not content_fee else binascii.hexlify(content_fee.raw).decode(), time_added)
|
||||||
).fetchall()
|
).fetchall()
|
||||||
|
|
||||||
return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0]
|
return transaction.execute(
|
||||||
|
f"select rowid from file where {'bt_infohash' if is_torrent else 'stream_hash'}=?",
|
||||||
|
(identifier_value, )).fetchone()[0]
|
||||||
|
|
||||||
|
|
||||||
class SQLiteStorage(SQLiteMixin):
|
class SQLiteStorage(SQLiteMixin):
|
||||||
|
@ -632,6 +636,13 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
|
def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
|
||||||
return self.db.run(get_all_lbry_files)
|
return self.db.run(get_all_lbry_files)
|
||||||
|
|
||||||
|
async def get_all_torrent_files(self) -> typing.List[typing.Dict]:
|
||||||
|
def _get_all_torrent_files(transaction):
|
||||||
|
cursor = transaction.execute(
|
||||||
|
"select file.ROWID as rowid, * from file join torrent on file.bt_infohash=torrent.bt_infohash")
|
||||||
|
return map(lambda row: dict(zip(list(map(itemgetter(0), cursor.description)), row)), cursor.fetchall())
|
||||||
|
return list(await self.db.run(_get_all_torrent_files))
|
||||||
|
|
||||||
def change_file_status(self, stream_hash: str, new_status: str):
|
def change_file_status(self, stream_hash: str, new_status: str):
|
||||||
log.debug("update file status %s -> %s", stream_hash, new_status)
|
log.debug("update file status %s -> %s", stream_hash, new_status)
|
||||||
return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash))
|
return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash))
|
||||||
|
@ -793,7 +804,7 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
|
|
||||||
await self.db.run(_save_claims)
|
await self.db.run(_save_claims)
|
||||||
if update_file_callbacks:
|
if update_file_callbacks:
|
||||||
await asyncio.wait(map(asyncio.create_task, update_file_callbacks))
|
await asyncio.wait(update_file_callbacks)
|
||||||
if claim_id_to_supports:
|
if claim_id_to_supports:
|
||||||
await self.save_supports(claim_id_to_supports)
|
await self.save_supports(claim_id_to_supports)
|
||||||
|
|
||||||
|
@ -872,15 +883,20 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
if stream_hash in self.content_claim_callbacks:
|
if stream_hash in self.content_claim_callbacks:
|
||||||
await self.content_claim_callbacks[stream_hash]()
|
await self.content_claim_callbacks[stream_hash]()
|
||||||
|
|
||||||
async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name):
|
async def add_torrent(self, bt_infohash, length, name):
|
||||||
def _save_torrent(transaction):
|
def _save_torrent(transaction, bt_infohash, length, name):
|
||||||
transaction.execute(
|
transaction.execute(
|
||||||
"insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name)
|
"insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name)
|
||||||
).fetchall()
|
).fetchall()
|
||||||
|
return await self.db.run(_save_torrent, bt_infohash, length, name)
|
||||||
|
|
||||||
|
async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name):
|
||||||
|
def _save_torrent_claim(transaction):
|
||||||
transaction.execute(
|
transaction.execute(
|
||||||
"insert or replace into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint)
|
"insert or replace into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint)
|
||||||
).fetchall()
|
).fetchall()
|
||||||
await self.db.run(_save_torrent)
|
await self.add_torrent(bt_infohash, length, name)
|
||||||
|
await self.db.run(_save_torrent_claim)
|
||||||
# update corresponding ManagedEncryptedFileDownloader object
|
# update corresponding ManagedEncryptedFileDownloader object
|
||||||
if bt_infohash in self.content_claim_callbacks:
|
if bt_infohash in self.content_claim_callbacks:
|
||||||
await self.content_claim_callbacks[bt_infohash]()
|
await self.content_claim_callbacks[bt_infohash]()
|
||||||
|
@ -898,7 +914,7 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
|
|
||||||
async def get_content_claim_for_torrent(self, bt_infohash):
|
async def get_content_claim_for_torrent(self, bt_infohash):
|
||||||
claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash])
|
claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash])
|
||||||
return claims[bt_infohash].as_dict() if claims else None
|
return claims[bt_infohash] if claims else None
|
||||||
|
|
||||||
# # # # # # # # # reflector functions # # # # # # # # #
|
# # # # # # # # # reflector functions # # # # # # # # #
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,7 @@ import logging
|
||||||
import typing
|
import typing
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from aiohttp.web import Request
|
from aiohttp.web import Request
|
||||||
from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError
|
from lbry.error import ResolveError, DownloadMetadataTimeoutError, InsufficientFundsError
|
||||||
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
|
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
|
||||||
from lbry.error import InvalidStreamURLError
|
from lbry.error import InvalidStreamURLError
|
||||||
from lbry.stream.managed_stream import ManagedStream
|
from lbry.stream.managed_stream import ManagedStream
|
||||||
|
@ -50,10 +50,10 @@ class FileManager:
|
||||||
await manager.started.wait()
|
await manager.started.wait()
|
||||||
self.started.set()
|
self.started.set()
|
||||||
|
|
||||||
async def stop(self):
|
def stop(self):
|
||||||
for manager in self.source_managers.values():
|
for manager in self.source_managers.values():
|
||||||
# fixme: pop or not?
|
# fixme: pop or not?
|
||||||
await manager.stop()
|
manager.stop()
|
||||||
self.started.clear()
|
self.started.clear()
|
||||||
|
|
||||||
@cache_concurrent
|
@cache_concurrent
|
||||||
|
@ -99,6 +99,8 @@ class FileManager:
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
raise ResolveTimeoutError(uri)
|
raise ResolveTimeoutError(uri)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
if isinstance(err, asyncio.CancelledError):
|
||||||
|
raise
|
||||||
log.exception("Unexpected error resolving stream:")
|
log.exception("Unexpected error resolving stream:")
|
||||||
raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
|
raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
|
||||||
if 'error' in resolved_result:
|
if 'error' in resolved_result:
|
||||||
|
@ -137,7 +139,7 @@ class FileManager:
|
||||||
existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name
|
existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name
|
||||||
)
|
)
|
||||||
claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier)
|
claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier)
|
||||||
existing[0].set_claim(claim_info, claim)
|
existing[0].set_claim(claim_info.as_dict() if claim_info else None, claim)
|
||||||
else:
|
else:
|
||||||
await self.storage.save_content_claim(
|
await self.storage.save_content_claim(
|
||||||
existing[0].stream_hash, outpoint
|
existing[0].stream_hash, outpoint
|
||||||
|
@ -240,15 +242,15 @@ class FileManager:
|
||||||
stream.identifier, outpoint, stream.torrent_length, stream.torrent_name
|
stream.identifier, outpoint, stream.torrent_length, stream.torrent_name
|
||||||
)
|
)
|
||||||
claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
|
claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
|
||||||
stream.set_claim(claim_info, claim)
|
stream.set_claim(claim_info.as_dict() if claim_info else None, claim)
|
||||||
if save_file:
|
if save_file:
|
||||||
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download))
|
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download))
|
||||||
return stream
|
return stream
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
error = DownloadDataTimeoutError(stream.sd_hash)
|
error = DownloadDataTimeoutError(stream.identifier)
|
||||||
raise error
|
raise error
|
||||||
except (Exception, asyncio.CancelledError) as err: # forgive data timeout, don't delete stream
|
except Exception as err: # forgive data timeout, don't delete stream
|
||||||
expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
|
expected = (DownloadMetadataTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
|
||||||
KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError)
|
KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError)
|
||||||
if isinstance(err, expected):
|
if isinstance(err, expected):
|
||||||
log.warning("Failed to download %s: %s", uri, str(err))
|
log.warning("Failed to download %s: %s", uri, str(err))
|
||||||
|
@ -288,19 +290,24 @@ class FileManager:
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
async def stream_partial_content(self, request: Request, sd_hash: str):
|
async def stream_partial_content(self, request: Request, identifier: str):
|
||||||
return await self.source_managers['stream'].stream_partial_content(request, sd_hash)
|
for source_manager in self.source_managers.values():
|
||||||
|
if source_manager.get_filtered(identifier=identifier):
|
||||||
|
return await source_manager.stream_partial_content(request, identifier)
|
||||||
|
|
||||||
def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]:
|
def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]:
|
||||||
"""
|
"""
|
||||||
Get a list of filtered and sorted ManagedStream objects
|
Get a list of filtered and sorted ManagedDownloadSource objects from all available source managers
|
||||||
|
|
||||||
:param sort_by: field to sort by
|
|
||||||
:param reverse: reverse sorting
|
|
||||||
:param comparison: comparison operator used for filtering
|
|
||||||
:param search_by: fields and values to filter by
|
|
||||||
"""
|
"""
|
||||||
return sum((manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), [])
|
result = last_error = None
|
||||||
|
for manager in self.source_managers.values():
|
||||||
|
try:
|
||||||
|
result = (result or []) + manager.get_filtered(*args, **kwargs)
|
||||||
|
except ValueError as error:
|
||||||
|
last_error = error
|
||||||
|
if result is not None:
|
||||||
|
return result
|
||||||
|
raise last_error
|
||||||
|
|
||||||
async def delete(self, source: ManagedDownloadSource, delete_file=False):
|
async def delete(self, source: ManagedDownloadSource, delete_file=False):
|
||||||
for manager in self.source_managers.values():
|
for manager in self.source_managers.values():
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import os
|
import os
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import time
|
||||||
import typing
|
import typing
|
||||||
import logging
|
import logging
|
||||||
import binascii
|
import binascii
|
||||||
|
@ -43,7 +44,7 @@ class ManagedDownloadSource:
|
||||||
self.rowid = rowid
|
self.rowid = rowid
|
||||||
self.content_fee = content_fee
|
self.content_fee = content_fee
|
||||||
self.purchase_receipt = None
|
self.purchase_receipt = None
|
||||||
self._added_on = added_on
|
self._added_on = added_on or int(time.time())
|
||||||
self.analytics_manager = analytics_manager
|
self.analytics_manager = analytics_manager
|
||||||
self.downloader = None
|
self.downloader = None
|
||||||
|
|
||||||
|
@ -67,7 +68,7 @@ class ManagedDownloadSource:
|
||||||
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
|
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
async def stop_tasks(self):
|
def stop_tasks(self):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
|
def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
|
||||||
|
@ -91,6 +92,14 @@ class ManagedDownloadSource:
|
||||||
def added_on(self) -> Optional[int]:
|
def added_on(self) -> Optional[int]:
|
||||||
return self._added_on
|
return self._added_on
|
||||||
|
|
||||||
|
@property
|
||||||
|
def suggested_file_name(self):
|
||||||
|
return self._file_name
|
||||||
|
|
||||||
|
@property
|
||||||
|
def stream_name(self):
|
||||||
|
return self.suggested_file_name
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def status(self) -> str:
|
def status(self) -> str:
|
||||||
return self._status
|
return self._status
|
||||||
|
@ -99,9 +108,9 @@ class ManagedDownloadSource:
|
||||||
def completed(self):
|
def completed(self):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
# @property
|
@property
|
||||||
# def stream_url(self):
|
def stream_url(self):
|
||||||
# return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}
|
return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.identifier}"
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def finished(self) -> bool:
|
def finished(self) -> bool:
|
||||||
|
|
|
@ -23,6 +23,7 @@ COMPARISON_OPERATORS = {
|
||||||
|
|
||||||
class SourceManager:
|
class SourceManager:
|
||||||
filter_fields = {
|
filter_fields = {
|
||||||
|
'identifier',
|
||||||
'rowid',
|
'rowid',
|
||||||
'status',
|
'status',
|
||||||
'file_name',
|
'file_name',
|
||||||
|
@ -59,11 +60,11 @@ class SourceManager:
|
||||||
def add(self, source: ManagedDownloadSource):
|
def add(self, source: ManagedDownloadSource):
|
||||||
self._sources[source.identifier] = source
|
self._sources[source.identifier] = source
|
||||||
|
|
||||||
async def remove(self, source: ManagedDownloadSource):
|
def remove(self, source: ManagedDownloadSource):
|
||||||
if source.identifier not in self._sources:
|
if source.identifier not in self._sources:
|
||||||
return
|
return
|
||||||
self._sources.pop(source.identifier)
|
self._sources.pop(source.identifier)
|
||||||
await source.stop_tasks()
|
source.stop_tasks()
|
||||||
|
|
||||||
async def initialize_from_database(self):
|
async def initialize_from_database(self):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
@ -72,10 +73,10 @@ class SourceManager:
|
||||||
await self.initialize_from_database()
|
await self.initialize_from_database()
|
||||||
self.started.set()
|
self.started.set()
|
||||||
|
|
||||||
async def stop(self):
|
def stop(self):
|
||||||
while self._sources:
|
while self._sources:
|
||||||
_, source = self._sources.popitem()
|
_, source = self._sources.popitem()
|
||||||
await source.stop_tasks()
|
source.stop_tasks()
|
||||||
self.started.clear()
|
self.started.clear()
|
||||||
|
|
||||||
async def create(self, file_path: str, key: Optional[bytes] = None,
|
async def create(self, file_path: str, key: Optional[bytes] = None,
|
||||||
|
@ -83,7 +84,8 @@ class SourceManager:
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
|
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
|
||||||
await self.remove(source)
|
await self.storage.delete_torrent(source.identifier)
|
||||||
|
self.remove(source)
|
||||||
if delete_file and source.output_file_exists:
|
if delete_file and source.output_file_exists:
|
||||||
os.remove(source.full_path)
|
os.remove(source.full_path)
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,6 @@ class BackgroundDownloader:
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return
|
return
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
log.debug("Cancelled background downloader")
|
|
||||||
raise
|
raise
|
||||||
except Exception:
|
except Exception:
|
||||||
log.error("Unexpected download error on background downloader")
|
log.error("Unexpected download error on background downloader")
|
||||||
|
|
|
@ -4,7 +4,7 @@ import logging
|
||||||
import binascii
|
import binascii
|
||||||
|
|
||||||
from lbry.dht.node import get_kademlia_peers_from_hosts
|
from lbry.dht.node import get_kademlia_peers_from_hosts
|
||||||
from lbry.error import DownloadSDTimeoutError
|
from lbry.error import DownloadMetadataTimeoutError
|
||||||
from lbry.utils import lru_cache_concurrent
|
from lbry.utils import lru_cache_concurrent
|
||||||
from lbry.stream.descriptor import StreamDescriptor
|
from lbry.stream.descriptor import StreamDescriptor
|
||||||
from lbry.blob_exchange.downloader import BlobDownloader
|
from lbry.blob_exchange.downloader import BlobDownloader
|
||||||
|
@ -77,7 +77,7 @@ class StreamDownloader:
|
||||||
log.info("downloaded sd blob %s", self.sd_hash)
|
log.info("downloaded sd blob %s", self.sd_hash)
|
||||||
self.time_to_descriptor = self.loop.time() - now
|
self.time_to_descriptor = self.loop.time() - now
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
raise DownloadSDTimeoutError(self.sd_hash)
|
raise DownloadMetadataTimeoutError(self.sd_hash)
|
||||||
|
|
||||||
# parse the descriptor
|
# parse the descriptor
|
||||||
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
|
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
|
||||||
|
|
|
@ -5,7 +5,7 @@ import typing
|
||||||
import logging
|
import logging
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
|
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
|
||||||
from lbry.error import DownloadSDTimeoutError
|
from lbry.error import DownloadMetadataTimeoutError
|
||||||
from lbry.schema.mime_types import guess_media_type
|
from lbry.schema.mime_types import guess_media_type
|
||||||
from lbry.stream.downloader import StreamDownloader
|
from lbry.stream.downloader import StreamDownloader
|
||||||
from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name
|
from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name
|
||||||
|
@ -104,10 +104,6 @@ class ManagedStream(ManagedDownloadSource):
|
||||||
def completed(self):
|
def completed(self):
|
||||||
return self.written_bytes >= self.descriptor.lower_bound_decrypted_length()
|
return self.written_bytes >= self.descriptor.lower_bound_decrypted_length()
|
||||||
|
|
||||||
@property
|
|
||||||
def stream_url(self):
|
|
||||||
return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}"
|
|
||||||
|
|
||||||
async def update_status(self, status: str):
|
async def update_status(self, status: str):
|
||||||
assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
|
assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
|
||||||
self._status = status
|
self._status = status
|
||||||
|
@ -164,7 +160,7 @@ class ManagedStream(ManagedDownloadSource):
|
||||||
await asyncio.wait_for(self.downloader.start(), timeout)
|
await asyncio.wait_for(self.downloader.start(), timeout)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
self._running.clear()
|
self._running.clear()
|
||||||
raise DownloadSDTimeoutError(self.sd_hash)
|
raise DownloadMetadataTimeoutError(self.identifier)
|
||||||
|
|
||||||
if self.delayed_stop_task and not self.delayed_stop_task.done():
|
if self.delayed_stop_task and not self.delayed_stop_task.done():
|
||||||
self.delayed_stop_task.cancel()
|
self.delayed_stop_task.cancel()
|
||||||
|
@ -191,7 +187,7 @@ class ManagedStream(ManagedDownloadSource):
|
||||||
Stop any running save/stream tasks as well as the downloader and update the status in the database
|
Stop any running save/stream tasks as well as the downloader and update the status in the database
|
||||||
"""
|
"""
|
||||||
|
|
||||||
await self.stop_tasks()
|
self.stop_tasks()
|
||||||
if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
|
if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
|
||||||
await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)
|
await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)
|
||||||
|
|
||||||
|
@ -279,7 +275,7 @@ class ManagedStream(ManagedDownloadSource):
|
||||||
log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id,
|
log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id,
|
||||||
self.sd_hash[:6], self.full_path)
|
self.sd_hash[:6], self.full_path)
|
||||||
await self.blob_manager.storage.set_saved_file(self.stream_hash)
|
await self.blob_manager.storage.set_saved_file(self.stream_hash)
|
||||||
except (Exception, asyncio.CancelledError) as err:
|
except Exception as err:
|
||||||
if os.path.isfile(output_path):
|
if os.path.isfile(output_path):
|
||||||
log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)
|
log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)
|
||||||
os.remove(output_path)
|
os.remove(output_path)
|
||||||
|
@ -324,13 +320,12 @@ class ManagedStream(ManagedDownloadSource):
|
||||||
await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout)
|
await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id)
|
log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id)
|
||||||
await self.stop_tasks()
|
self.stop_tasks()
|
||||||
await self.update_status(ManagedStream.STATUS_STOPPED)
|
await self.update_status(ManagedStream.STATUS_STOPPED)
|
||||||
|
|
||||||
async def stop_tasks(self):
|
def stop_tasks(self):
|
||||||
if self.file_output_task and not self.file_output_task.done():
|
if self.file_output_task and not self.file_output_task.done():
|
||||||
self.file_output_task.cancel()
|
self.file_output_task.cancel()
|
||||||
await asyncio.gather(self.file_output_task, return_exceptions=True)
|
|
||||||
self.file_output_task = None
|
self.file_output_task = None
|
||||||
while self.streaming_responses:
|
while self.streaming_responses:
|
||||||
req, response = self.streaming_responses.pop()
|
req, response = self.streaming_responses.pop()
|
||||||
|
@ -367,7 +362,7 @@ class ManagedStream(ManagedDownloadSource):
|
||||||
return sent
|
return sent
|
||||||
except ConnectionError:
|
except ConnectionError:
|
||||||
return sent
|
return sent
|
||||||
except (OSError, Exception, asyncio.CancelledError) as err:
|
except (OSError, Exception) as err:
|
||||||
if isinstance(err, asyncio.CancelledError):
|
if isinstance(err, asyncio.CancelledError):
|
||||||
log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id)
|
log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id)
|
||||||
elif isinstance(err, OSError):
|
elif isinstance(err, OSError):
|
||||||
|
|
|
@ -32,7 +32,7 @@ def path_or_none(encoded_path) -> Optional[str]:
|
||||||
class StreamManager(SourceManager):
|
class StreamManager(SourceManager):
|
||||||
_sources: typing.Dict[str, ManagedStream]
|
_sources: typing.Dict[str, ManagedStream]
|
||||||
|
|
||||||
filter_fields = SourceManager.filter_fields
|
filter_fields = set(SourceManager.filter_fields)
|
||||||
filter_fields.update({
|
filter_fields.update({
|
||||||
'sd_hash',
|
'sd_hash',
|
||||||
'stream_hash',
|
'stream_hash',
|
||||||
|
@ -164,6 +164,8 @@ class StreamManager(SourceManager):
|
||||||
async def reflect_streams(self):
|
async def reflect_streams(self):
|
||||||
try:
|
try:
|
||||||
return await self._reflect_streams()
|
return await self._reflect_streams()
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
except Exception:
|
except Exception:
|
||||||
log.exception("reflector task encountered an unexpected error!")
|
log.exception("reflector task encountered an unexpected error!")
|
||||||
|
|
||||||
|
@ -196,8 +198,8 @@ class StreamManager(SourceManager):
|
||||||
await super().start()
|
await super().start()
|
||||||
self.re_reflect_task = self.loop.create_task(self.reflect_streams())
|
self.re_reflect_task = self.loop.create_task(self.reflect_streams())
|
||||||
|
|
||||||
async def stop(self):
|
def stop(self):
|
||||||
await super().stop()
|
super().stop()
|
||||||
if self.resume_saving_task and not self.resume_saving_task.done():
|
if self.resume_saving_task and not self.resume_saving_task.done():
|
||||||
self.resume_saving_task.cancel()
|
self.resume_saving_task.cancel()
|
||||||
if self.re_reflect_task and not self.re_reflect_task.done():
|
if self.re_reflect_task and not self.re_reflect_task.done():
|
||||||
|
@ -224,8 +226,7 @@ class StreamManager(SourceManager):
|
||||||
)
|
)
|
||||||
return task
|
return task
|
||||||
|
|
||||||
@staticmethod
|
async def _retriable_reflect_stream(self, stream, host, port):
|
||||||
async def _retriable_reflect_stream(stream, host, port):
|
|
||||||
sent = await stream.upload_to_reflector(host, port)
|
sent = await stream.upload_to_reflector(host, port)
|
||||||
while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0:
|
while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0:
|
||||||
stream.reflector_progress = 0
|
stream.reflector_progress = 0
|
||||||
|
@ -260,7 +261,7 @@ class StreamManager(SourceManager):
|
||||||
return
|
return
|
||||||
if source.identifier in self.running_reflector_uploads:
|
if source.identifier in self.running_reflector_uploads:
|
||||||
self.running_reflector_uploads[source.identifier].cancel()
|
self.running_reflector_uploads[source.identifier].cancel()
|
||||||
await source.stop_tasks()
|
source.stop_tasks()
|
||||||
if source.identifier in self.streams:
|
if source.identifier in self.streams:
|
||||||
del self.streams[source.identifier]
|
del self.streams[source.identifier]
|
||||||
blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
|
blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
|
||||||
|
|
|
@ -394,6 +394,7 @@ class CommandTestCase(IntegrationTestCase):
|
||||||
logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY)
|
logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY)
|
||||||
logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY)
|
logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY)
|
||||||
logging.getLogger('lbry.stream').setLevel(self.VERBOSITY)
|
logging.getLogger('lbry.stream').setLevel(self.VERBOSITY)
|
||||||
|
logging.getLogger('lbry.torrent').setLevel(self.VERBOSITY)
|
||||||
logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY)
|
logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY)
|
||||||
|
|
||||||
await super().asyncSetUp()
|
await super().asyncSetUp()
|
||||||
|
|
|
@ -3,9 +3,8 @@ import binascii
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
from hashlib import sha1
|
|
||||||
from tempfile import mkdtemp
|
from tempfile import mkdtemp
|
||||||
from typing import Optional
|
from typing import Optional, Tuple, Dict
|
||||||
|
|
||||||
import libtorrent
|
import libtorrent
|
||||||
|
|
||||||
|
@ -14,6 +13,8 @@ log = logging.getLogger(__name__)
|
||||||
DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted?
|
DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted?
|
||||||
libtorrent.add_torrent_params_flags_t.flag_auto_managed
|
libtorrent.add_torrent_params_flags_t.flag_auto_managed
|
||||||
| libtorrent.add_torrent_params_flags_t.flag_update_subscribe
|
| libtorrent.add_torrent_params_flags_t.flag_update_subscribe
|
||||||
|
| libtorrent.add_torrent_params_flags_t.flag_sequential_download
|
||||||
|
| libtorrent.add_torrent_params_flags_t.flag_paused
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -22,66 +23,102 @@ class TorrentHandle:
|
||||||
self._loop = loop
|
self._loop = loop
|
||||||
self._executor = executor
|
self._executor = executor
|
||||||
self._handle: libtorrent.torrent_handle = handle
|
self._handle: libtorrent.torrent_handle = handle
|
||||||
self.started = asyncio.Event(loop=loop)
|
|
||||||
self.finished = asyncio.Event(loop=loop)
|
self.finished = asyncio.Event(loop=loop)
|
||||||
self.metadata_completed = asyncio.Event(loop=loop)
|
self.metadata_completed = asyncio.Event(loop=loop)
|
||||||
self.size = 0
|
self.size = handle.status().total_wanted
|
||||||
self.total_wanted_done = 0
|
self.total_wanted_done = 0
|
||||||
self.name = ''
|
self.name = ''
|
||||||
self.tasks = []
|
self.tasks = []
|
||||||
self.torrent_file: Optional[libtorrent.file_storage] = None
|
self._torrent_info: libtorrent.torrent_info = handle.torrent_file()
|
||||||
self._base_path = None
|
self._base_path = None
|
||||||
self._handle.set_sequential_download(1)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def largest_file(self) -> Optional[str]:
|
def torrent_file(self) -> Optional[libtorrent.file_storage]:
|
||||||
if not self.torrent_file:
|
return self._torrent_info.files()
|
||||||
|
|
||||||
|
def full_path_at(self, file_num) -> Optional[str]:
|
||||||
|
if self.torrent_file is None:
|
||||||
return None
|
return None
|
||||||
index = self.largest_file_index
|
return os.path.join(self.save_path, self.torrent_file.file_path(file_num))
|
||||||
return os.path.join(self._base_path, self.torrent_file.at(index).path)
|
|
||||||
|
def size_at(self, file_num) -> Optional[int]:
|
||||||
|
if self.torrent_file is not None:
|
||||||
|
return self.torrent_file.file_size(file_num)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def largest_file_index(self):
|
def save_path(self) -> Optional[str]:
|
||||||
largest_size, index = 0, 0
|
if not self._base_path:
|
||||||
|
self._base_path = self._handle.status().save_path
|
||||||
|
return self._base_path
|
||||||
|
|
||||||
|
def index_from_name(self, file_name):
|
||||||
for file_num in range(self.torrent_file.num_files()):
|
for file_num in range(self.torrent_file.num_files()):
|
||||||
if self.torrent_file.file_size(file_num) > largest_size:
|
if '.pad' in self.torrent_file.file_path(file_num):
|
||||||
largest_size = self.torrent_file.file_size(file_num)
|
continue # ignore padding files
|
||||||
index = file_num
|
if file_name == os.path.basename(self.full_path_at(file_num)):
|
||||||
return index
|
return file_num
|
||||||
|
|
||||||
def stop_tasks(self):
|
def stop_tasks(self):
|
||||||
|
self._handle.save_resume_data()
|
||||||
while self.tasks:
|
while self.tasks:
|
||||||
self.tasks.pop().cancel()
|
self.tasks.pop().cancel()
|
||||||
|
|
||||||
|
def byte_range_to_piece_range(
|
||||||
|
self, file_index, start_offset, end_offset) -> Tuple[libtorrent.peer_request, libtorrent.peer_request]:
|
||||||
|
start_piece = self._torrent_info.map_file(file_index, start_offset, 0)
|
||||||
|
end_piece = self._torrent_info.map_file(file_index, end_offset, 0)
|
||||||
|
return start_piece, end_piece
|
||||||
|
|
||||||
|
async def stream_range_as_completed(self, file_name, start, end):
|
||||||
|
file_index = self.index_from_name(file_name)
|
||||||
|
if file_index is None:
|
||||||
|
raise ValueError(f"Attempt to stream from invalid file. Expected name: {file_name}")
|
||||||
|
first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end)
|
||||||
|
start_piece_offset = first_piece.start
|
||||||
|
piece_size = self._torrent_info.piece_length()
|
||||||
|
log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d, piece size: %d): %s",
|
||||||
|
first_piece.piece, final_piece.piece, start, end, piece_size, self.name)
|
||||||
|
self.prioritize(file_index, start, end)
|
||||||
|
for piece_index in range(first_piece.piece, final_piece.piece + 1):
|
||||||
|
while not self._handle.have_piece(piece_index):
|
||||||
|
log.info("Waiting for piece %d: %s", piece_index, self.name)
|
||||||
|
self._handle.set_piece_deadline(piece_index, 0)
|
||||||
|
await asyncio.sleep(0.2)
|
||||||
|
log.info("Streaming piece offset %d / %d for torrent %s", piece_index, final_piece.piece, self.name)
|
||||||
|
yield piece_size - start_piece_offset
|
||||||
|
|
||||||
def _show_status(self):
|
def _show_status(self):
|
||||||
# fixme: cleanup
|
# fixme: cleanup
|
||||||
if not self._handle.is_valid():
|
if not self._handle.is_valid():
|
||||||
return
|
return
|
||||||
status = self._handle.status()
|
status = self._handle.status()
|
||||||
|
self._base_path = status.save_path
|
||||||
if status.has_metadata:
|
if status.has_metadata:
|
||||||
self.size = status.total_wanted
|
self.size = status.total_wanted
|
||||||
self.total_wanted_done = status.total_wanted_done
|
self.total_wanted_done = status.total_wanted_done
|
||||||
self.name = status.name
|
self.name = status.name
|
||||||
if not self.metadata_completed.is_set():
|
if not self.metadata_completed.is_set():
|
||||||
self.metadata_completed.set()
|
self.metadata_completed.set()
|
||||||
|
self._torrent_info = self._handle.torrent_file()
|
||||||
log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name)
|
log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name)
|
||||||
self.torrent_file = self._handle.get_torrent_info().files()
|
log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s',
|
||||||
self._base_path = status.save_path
|
status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
|
||||||
first_piece = self.torrent_file.at(self.largest_file_index).offset
|
status.num_peers, status.num_seeds, status.state, status.save_path)
|
||||||
if not self.started.is_set():
|
if (status.is_finished or status.is_seeding) and not self.finished.is_set():
|
||||||
if self._handle.have_piece(first_piece):
|
|
||||||
self.started.set()
|
|
||||||
else:
|
|
||||||
# prioritize it
|
|
||||||
self._handle.set_piece_deadline(first_piece, 100)
|
|
||||||
if not status.is_seeding:
|
|
||||||
log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s',
|
|
||||||
status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
|
|
||||||
status.num_peers, status.num_seeds, status.state, status.save_path)
|
|
||||||
elif not self.finished.is_set():
|
|
||||||
self.finished.set()
|
self.finished.set()
|
||||||
log.info("Torrent finished: %s", self.name)
|
log.info("Torrent finished: %s", self.name)
|
||||||
|
|
||||||
|
def prioritize(self, file_index, start, end, cleanup=False):
|
||||||
|
first_piece, last_piece = self.byte_range_to_piece_range(file_index, start, end)
|
||||||
|
priorities = self._handle.get_piece_priorities()
|
||||||
|
priorities = [0 if cleanup else 1 for _ in priorities]
|
||||||
|
self._handle.clear_piece_deadlines()
|
||||||
|
for idx, piece_number in enumerate(range(first_piece.piece, last_piece.piece)):
|
||||||
|
priorities[piece_number] = 7 - idx if 0 <= idx <= 6 else 1
|
||||||
|
self._handle.set_piece_deadline(piece_number, idx)
|
||||||
|
log.debug("Prioritizing pieces for %s: %s", self.name, priorities)
|
||||||
|
self._handle.prioritize_pieces(priorities)
|
||||||
|
|
||||||
async def status_loop(self):
|
async def status_loop(self):
|
||||||
while True:
|
while True:
|
||||||
self._show_status()
|
self._show_status()
|
||||||
|
@ -105,19 +142,21 @@ class TorrentSession:
|
||||||
self._loop = loop
|
self._loop = loop
|
||||||
self._executor = executor
|
self._executor = executor
|
||||||
self._session: Optional[libtorrent.session] = None
|
self._session: Optional[libtorrent.session] = None
|
||||||
self._handles = {}
|
self._handles: Dict[str, TorrentHandle] = {}
|
||||||
self.tasks = []
|
self.tasks = []
|
||||||
self.wait_start = True
|
|
||||||
|
|
||||||
async def add_fake_torrent(self):
|
def add_peer(self, btih, addr, port):
|
||||||
|
self._handles[btih]._handle.connect_peer((addr, port))
|
||||||
|
|
||||||
|
async def add_fake_torrent(self, file_count=3):
|
||||||
tmpdir = mkdtemp()
|
tmpdir = mkdtemp()
|
||||||
info, btih = _create_fake_torrent(tmpdir)
|
info = _create_fake_torrent(tmpdir, file_count=file_count)
|
||||||
flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode
|
flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode
|
||||||
handle = self._session.add_torrent({
|
handle = self._session.add_torrent({
|
||||||
'ti': info, 'save_path': tmpdir, 'flags': flags
|
'ti': info, 'save_path': tmpdir, 'flags': flags
|
||||||
})
|
})
|
||||||
self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
|
self._handles[str(info.info_hash())] = TorrentHandle(self._loop, self._executor, handle)
|
||||||
return btih
|
return str(info.info_hash())
|
||||||
|
|
||||||
async def bind(self, interface: str = '0.0.0.0', port: int = 10889):
|
async def bind(self, interface: str = '0.0.0.0', port: int = 10889):
|
||||||
settings = {
|
settings = {
|
||||||
|
@ -131,15 +170,14 @@ class TorrentSession:
|
||||||
self.tasks.append(self._loop.create_task(self.process_alerts()))
|
self.tasks.append(self._loop.create_task(self.process_alerts()))
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
while self._handles:
|
||||||
|
self._handles.popitem()[1].stop_tasks()
|
||||||
while self.tasks:
|
while self.tasks:
|
||||||
self.tasks.pop().cancel()
|
self.tasks.pop().cancel()
|
||||||
self._session.save_state()
|
if self._session:
|
||||||
self._session.pause()
|
self._session.save_state()
|
||||||
self._session.stop_dht()
|
self._session.pause()
|
||||||
self._session.stop_lsd()
|
self._session = None
|
||||||
self._session.stop_natpmp()
|
|
||||||
self._session.stop_upnp()
|
|
||||||
self._session = None
|
|
||||||
|
|
||||||
def _pop_alerts(self):
|
def _pop_alerts(self):
|
||||||
for alert in self._session.pop_alerts():
|
for alert in self._session.pop_alerts():
|
||||||
|
@ -173,18 +211,23 @@ class TorrentSession:
|
||||||
handle.force_dht_announce()
|
handle.force_dht_announce()
|
||||||
self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
|
self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
|
||||||
|
|
||||||
def full_path(self, btih):
|
def full_path(self, btih, file_num) -> Optional[str]:
|
||||||
return self._handles[btih].largest_file
|
return self._handles[btih].full_path_at(file_num)
|
||||||
|
|
||||||
|
def save_path(self, btih):
|
||||||
|
return self._handles[btih].save_path
|
||||||
|
|
||||||
|
def has_torrent(self, btih):
|
||||||
|
return btih in self._handles
|
||||||
|
|
||||||
async def add_torrent(self, btih, download_path):
|
async def add_torrent(self, btih, download_path):
|
||||||
|
if btih in self._handles:
|
||||||
|
return await self._handles[btih].metadata_completed.wait()
|
||||||
await self._loop.run_in_executor(
|
await self._loop.run_in_executor(
|
||||||
self._executor, self._add_torrent, btih, download_path
|
self._executor, self._add_torrent, btih, download_path
|
||||||
)
|
)
|
||||||
self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop()))
|
self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop()))
|
||||||
await self._handles[btih].metadata_completed.wait()
|
await self._handles[btih].metadata_completed.wait()
|
||||||
if self.wait_start:
|
|
||||||
# fixme: temporary until we add streaming support, otherwise playback fails!
|
|
||||||
await self._handles[btih].started.wait()
|
|
||||||
|
|
||||||
def remove_torrent(self, btih, remove_files=False):
|
def remove_torrent(self, btih, remove_files=False):
|
||||||
if btih in self._handles:
|
if btih in self._handles:
|
||||||
|
@ -197,9 +240,17 @@ class TorrentSession:
|
||||||
handle = self._handles[btih]
|
handle = self._handles[btih]
|
||||||
await handle.resume()
|
await handle.resume()
|
||||||
|
|
||||||
def get_size(self, btih):
|
def get_total_size(self, btih):
|
||||||
return self._handles[btih].size
|
return self._handles[btih].size
|
||||||
|
|
||||||
|
def get_index_from_name(self, btih, file_name):
|
||||||
|
return self._handles[btih].index_from_name(file_name)
|
||||||
|
|
||||||
|
def get_size(self, btih, file_name) -> Optional[int]:
|
||||||
|
for (path, size) in self.get_files(btih).items():
|
||||||
|
if os.path.basename(path) == file_name:
|
||||||
|
return size
|
||||||
|
|
||||||
def get_name(self, btih):
|
def get_name(self, btih):
|
||||||
return self._handles[btih].name
|
return self._handles[btih].name
|
||||||
|
|
||||||
|
@ -209,23 +260,38 @@ class TorrentSession:
|
||||||
def is_completed(self, btih):
|
def is_completed(self, btih):
|
||||||
return self._handles[btih].finished.is_set()
|
return self._handles[btih].finished.is_set()
|
||||||
|
|
||||||
|
def stream_file(self, btih, file_name, start, end):
|
||||||
|
handle = self._handles[btih]
|
||||||
|
return handle.stream_range_as_completed(file_name, start, end)
|
||||||
|
|
||||||
|
def get_files(self, btih) -> Dict:
|
||||||
|
handle = self._handles[btih]
|
||||||
|
return {
|
||||||
|
self.full_path(btih, file_num): handle.torrent_file.file_size(file_num)
|
||||||
|
for file_num in range(handle.torrent_file.num_files())
|
||||||
|
if '.pad' not in handle.torrent_file.file_path(file_num)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def get_magnet_uri(btih):
|
def get_magnet_uri(btih):
|
||||||
return f"magnet:?xt=urn:btih:{btih}"
|
return f"magnet:?xt=urn:btih:{btih}"
|
||||||
|
|
||||||
|
|
||||||
def _create_fake_torrent(tmpdir):
|
def _create_fake_torrent(tmpdir, file_count=3, largest_index=1):
|
||||||
# beware, that's just for testing
|
# layout: subdir/tmp{0..file_count-1} files. v1+v2. automatic piece size.
|
||||||
path = os.path.join(tmpdir, 'tmp')
|
# largest_index: which file index {0 ... file_count} will be the largest file
|
||||||
with open(path, 'wb') as myfile:
|
|
||||||
size = myfile.write(bytes([random.randint(0, 255) for _ in range(40)]) * 1024)
|
|
||||||
file_storage = libtorrent.file_storage()
|
file_storage = libtorrent.file_storage()
|
||||||
file_storage.add_file('tmp', size)
|
subfolder = os.path.join(tmpdir, "subdir")
|
||||||
t = libtorrent.create_torrent(file_storage, 0, 4 * 1024 * 1024)
|
os.mkdir(subfolder)
|
||||||
|
for file_number in range(file_count):
|
||||||
|
file_name = f"tmp{file_number}"
|
||||||
|
with open(os.path.join(subfolder, file_name), 'wb') as myfile:
|
||||||
|
size = myfile.write(
|
||||||
|
bytes([random.randint(0, 255) for _ in range(10 - abs(file_number - largest_index))]) * 1024)
|
||||||
|
file_storage.add_file(os.path.join("subdir", file_name), size)
|
||||||
|
t = libtorrent.create_torrent(file_storage, 0, 0)
|
||||||
libtorrent.set_piece_hashes(t, tmpdir)
|
libtorrent.set_piece_hashes(t, tmpdir)
|
||||||
info = libtorrent.torrent_info(t.generate())
|
return libtorrent.torrent_info(t.generate())
|
||||||
btih = sha1(info.metadata()).hexdigest()
|
|
||||||
return info, btih
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
@ -238,17 +304,16 @@ async def main():
|
||||||
|
|
||||||
executor = None
|
executor = None
|
||||||
session = TorrentSession(asyncio.get_event_loop(), executor)
|
session = TorrentSession(asyncio.get_event_loop(), executor)
|
||||||
session2 = TorrentSession(asyncio.get_event_loop(), executor)
|
await session.bind()
|
||||||
await session.bind('localhost', port=4040)
|
await session.add_torrent(btih, os.path.expanduser("~/Downloads"))
|
||||||
await session2.bind('localhost', port=4041)
|
|
||||||
btih = await session.add_fake_torrent()
|
|
||||||
session2._session.add_dht_node(('localhost', 4040))
|
|
||||||
await session2.add_torrent(btih, "/tmp/down")
|
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(100)
|
session.full_path(btih, 0)
|
||||||
|
await asyncio.sleep(1)
|
||||||
await session.pause()
|
await session.pause()
|
||||||
executor.shutdown()
|
executor.shutdown()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
|
|
@ -1,12 +1,14 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import binascii
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import typing
|
import typing
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from aiohttp.web import Request
|
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
|
||||||
|
|
||||||
|
from lbry.error import DownloadMetadataTimeoutError
|
||||||
from lbry.file.source_manager import SourceManager
|
from lbry.file.source_manager import SourceManager
|
||||||
from lbry.file.source import ManagedDownloadSource
|
from lbry.file.source import ManagedDownloadSource
|
||||||
|
from lbry.schema.mime_types import guess_media_type
|
||||||
|
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from lbry.torrent.session import TorrentSession
|
from lbry.torrent.session import TorrentSession
|
||||||
|
@ -19,12 +21,6 @@ if typing.TYPE_CHECKING:
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def path_or_none(encoded_path) -> Optional[str]:
|
|
||||||
if not encoded_path:
|
|
||||||
return
|
|
||||||
return binascii.unhexlify(encoded_path).decode()
|
|
||||||
|
|
||||||
|
|
||||||
class TorrentSource(ManagedDownloadSource):
|
class TorrentSource(ManagedDownloadSource):
|
||||||
STATUS_STOPPED = "stopped"
|
STATUS_STOPPED = "stopped"
|
||||||
filter_fields = SourceManager.filter_fields
|
filter_fields = SourceManager.filter_fields
|
||||||
|
@ -42,15 +38,55 @@ class TorrentSource(ManagedDownloadSource):
|
||||||
super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id,
|
super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id,
|
||||||
rowid, content_fee, analytics_manager, added_on)
|
rowid, content_fee, analytics_manager, added_on)
|
||||||
self.torrent_session = torrent_session
|
self.torrent_session = torrent_session
|
||||||
|
self._suggested_file_name = None
|
||||||
|
self._full_path = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def full_path(self) -> Optional[str]:
|
def full_path(self) -> Optional[str]:
|
||||||
full_path = self.torrent_session.full_path(self.identifier)
|
if not self._full_path:
|
||||||
self.download_directory = os.path.dirname(full_path)
|
self._full_path = self.select_path()
|
||||||
return full_path
|
self._file_name = os.path.basename(self._full_path)
|
||||||
|
self.download_directory = self.torrent_session.save_path(self.identifier)
|
||||||
|
return self._full_path
|
||||||
|
|
||||||
|
def select_path(self):
|
||||||
|
wanted_name = (self.stream_claim_info.claim.stream.source.name or '') if self.stream_claim_info else ''
|
||||||
|
wanted_index = self.torrent_session.get_index_from_name(self.identifier, wanted_name)
|
||||||
|
if wanted_index is None:
|
||||||
|
# maybe warn?
|
||||||
|
largest = (None, -1)
|
||||||
|
for (path, size) in self.torrent_session.get_files(self.identifier).items():
|
||||||
|
largest = (path, size) if size > largest[1] else largest
|
||||||
|
return largest[0]
|
||||||
|
else:
|
||||||
|
return self.torrent_session.full_path(self.identifier, wanted_index or 0)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def suggested_file_name(self):
|
||||||
|
self._suggested_file_name = self._suggested_file_name or os.path.basename(self.select_path())
|
||||||
|
return self._suggested_file_name
|
||||||
|
|
||||||
|
@property
|
||||||
|
def mime_type(self) -> Optional[str]:
|
||||||
|
return guess_media_type(os.path.basename(self.full_path))[0]
|
||||||
|
|
||||||
|
async def setup(self, timeout: Optional[float] = None):
|
||||||
|
try:
|
||||||
|
metadata_download = self.torrent_session.add_torrent(self.identifier, self.download_directory)
|
||||||
|
await asyncio.wait_for(metadata_download, timeout, loop=self.loop)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
self.torrent_session.remove_torrent(btih=self.identifier)
|
||||||
|
raise DownloadMetadataTimeoutError(self.identifier)
|
||||||
|
self.download_directory = self.torrent_session.save_path(self.identifier)
|
||||||
|
self._file_name = os.path.basename(self.full_path)
|
||||||
|
|
||||||
async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
|
async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
|
||||||
await self.torrent_session.add_torrent(self.identifier, self.download_directory)
|
await self.setup(timeout)
|
||||||
|
if not self.rowid:
|
||||||
|
await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name)
|
||||||
|
self.rowid = await self.storage.save_downloaded_file(
|
||||||
|
self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on
|
||||||
|
)
|
||||||
|
|
||||||
async def stop(self, finished: bool = False):
|
async def stop(self, finished: bool = False):
|
||||||
await self.torrent_session.remove_torrent(self.identifier)
|
await self.torrent_session.remove_torrent(self.identifier)
|
||||||
|
@ -60,7 +96,11 @@ class TorrentSource(ManagedDownloadSource):
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def torrent_length(self):
|
def torrent_length(self):
|
||||||
return self.torrent_session.get_size(self.identifier)
|
return self.torrent_session.get_total_size(self.identifier)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def stream_length(self):
|
||||||
|
return self.torrent_session.get_size(self.identifier, self.file_name)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def written_bytes(self):
|
def written_bytes(self):
|
||||||
|
@ -74,13 +114,63 @@ class TorrentSource(ManagedDownloadSource):
|
||||||
def bt_infohash(self):
|
def bt_infohash(self):
|
||||||
return self.identifier
|
return self.identifier
|
||||||
|
|
||||||
async def stop_tasks(self):
|
def stop_tasks(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def completed(self):
|
def completed(self):
|
||||||
return self.torrent_session.is_completed(self.identifier)
|
return self.torrent_session.is_completed(self.identifier)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def status(self):
|
||||||
|
return self.STATUS_FINISHED if self.completed else self.STATUS_RUNNING
|
||||||
|
|
||||||
|
async def stream_file(self, request):
|
||||||
|
log.info("stream torrent to browser for lbry://%s#%s (btih %s...)", self.claim_name, self.claim_id,
|
||||||
|
self.identifier[:6])
|
||||||
|
headers, start, end = self._prepare_range_response_headers(
|
||||||
|
request.headers.get('range', 'bytes=0-')
|
||||||
|
)
|
||||||
|
target = self.suggested_file_name
|
||||||
|
await self.start()
|
||||||
|
response = StreamResponse(
|
||||||
|
status=206,
|
||||||
|
headers=headers
|
||||||
|
)
|
||||||
|
await response.prepare(request)
|
||||||
|
while not os.path.exists(self.full_path):
|
||||||
|
async for _ in self.torrent_session.stream_file(self.identifier, target, start, end):
|
||||||
|
break
|
||||||
|
with open(self.full_path, 'rb') as infile:
|
||||||
|
infile.seek(start)
|
||||||
|
async for read_size in self.torrent_session.stream_file(self.identifier, target, start, end):
|
||||||
|
if infile.tell() + read_size < end:
|
||||||
|
await response.write(infile.read(read_size))
|
||||||
|
else:
|
||||||
|
await response.write_eof(infile.read(end - infile.tell() + 1))
|
||||||
|
return response
|
||||||
|
|
||||||
|
def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
|
||||||
|
if '=' in get_range:
|
||||||
|
get_range = get_range.split('=')[1]
|
||||||
|
start, end = get_range.split('-')
|
||||||
|
size = self.stream_length
|
||||||
|
|
||||||
|
start = int(start)
|
||||||
|
end = int(end) if end else size - 1
|
||||||
|
|
||||||
|
if end >= size or not 0 <= start < size:
|
||||||
|
raise HTTPRequestRangeNotSatisfiable()
|
||||||
|
|
||||||
|
final_size = end - start + 1
|
||||||
|
headers = {
|
||||||
|
'Accept-Ranges': 'bytes',
|
||||||
|
'Content-Range': f'bytes {start}-{end}/{size}',
|
||||||
|
'Content-Length': str(final_size),
|
||||||
|
'Content-Type': self.mime_type
|
||||||
|
}
|
||||||
|
return headers, start, end
|
||||||
|
|
||||||
|
|
||||||
class TorrentManager(SourceManager):
|
class TorrentManager(SourceManager):
|
||||||
_sources: typing.Dict[str, ManagedDownloadSource]
|
_sources: typing.Dict[str, ManagedDownloadSource]
|
||||||
|
@ -103,7 +193,7 @@ class TorrentManager(SourceManager):
|
||||||
async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str],
|
async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str],
|
||||||
download_directory: Optional[str], status: str,
|
download_directory: Optional[str], status: str,
|
||||||
claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
|
claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
|
||||||
added_on: Optional[int]):
|
added_on: Optional[int], **kwargs):
|
||||||
stream = TorrentSource(
|
stream = TorrentSource(
|
||||||
self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name,
|
self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name,
|
||||||
download_directory=download_directory, status=status, claim=claim, rowid=rowid,
|
download_directory=download_directory, status=status, claim=claim, rowid=rowid,
|
||||||
|
@ -111,15 +201,20 @@ class TorrentManager(SourceManager):
|
||||||
torrent_session=self.torrent_session
|
torrent_session=self.torrent_session
|
||||||
)
|
)
|
||||||
self.add(stream)
|
self.add(stream)
|
||||||
|
await stream.setup()
|
||||||
|
|
||||||
async def initialize_from_database(self):
|
async def initialize_from_database(self):
|
||||||
pass
|
for file in await self.storage.get_all_torrent_files():
|
||||||
|
claim = await self.storage.get_content_claim_for_torrent(file['bt_infohash'])
|
||||||
|
file['download_directory'] = bytes.fromhex(file['download_directory'] or '').decode() or None
|
||||||
|
file['file_name'] = bytes.fromhex(file['file_name'] or '').decode() or None
|
||||||
|
await self._load_stream(claim=claim, **file)
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
await super().start()
|
await super().start()
|
||||||
|
|
||||||
async def stop(self):
|
def stop(self):
|
||||||
await super().stop()
|
super().stop()
|
||||||
log.info("finished stopping the torrent manager")
|
log.info("finished stopping the torrent manager")
|
||||||
|
|
||||||
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
|
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
|
||||||
|
@ -132,9 +227,6 @@ class TorrentManager(SourceManager):
|
||||||
|
|
||||||
async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
|
async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
# blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
|
|
||||||
# await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
|
|
||||||
# await self.storage.delete_stream(source.descriptor)
|
|
||||||
|
|
||||||
async def stream_partial_content(self, request: Request, sd_hash: str):
|
async def stream_partial_content(self, request: Request, identifier: str):
|
||||||
raise NotImplementedError
|
return await self._sources[identifier].stream_file(request)
|
||||||
|
|
|
@ -141,7 +141,7 @@ class CoinSelector:
|
||||||
_) -> List[OutputEffectiveAmountEstimator]:
|
_) -> List[OutputEffectiveAmountEstimator]:
|
||||||
""" Accumulate UTXOs at random until there is enough to cover the target. """
|
""" Accumulate UTXOs at random until there is enough to cover the target. """
|
||||||
target = self.target + self.cost_of_change
|
target = self.target + self.cost_of_change
|
||||||
self.random.shuffle(txos, random=self.random.random) # pylint: disable=deprecated-argument
|
self.random.shuffle(txos, self.random.random)
|
||||||
selection = []
|
selection = []
|
||||||
amount = 0
|
amount = 0
|
||||||
for coin in txos:
|
for coin in txos:
|
||||||
|
|
|
@ -329,10 +329,10 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
async def start(self):
|
async def start(self):
|
||||||
if not os.path.exists(self.path):
|
if not os.path.exists(self.path):
|
||||||
os.mkdir(self.path)
|
os.mkdir(self.path)
|
||||||
await asyncio.wait(map(asyncio.create_task, [
|
await asyncio.wait([
|
||||||
self.db.open(),
|
self.db.open(),
|
||||||
self.headers.open()
|
self.headers.open()
|
||||||
]))
|
])
|
||||||
fully_synced = self.on_ready.first
|
fully_synced = self.on_ready.first
|
||||||
asyncio.create_task(self.network.start())
|
asyncio.create_task(self.network.start())
|
||||||
await self.network.on_connected.first
|
await self.network.on_connected.first
|
||||||
|
@ -466,9 +466,9 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
async def subscribe_accounts(self):
|
async def subscribe_accounts(self):
|
||||||
if self.network.is_connected and self.accounts:
|
if self.network.is_connected and self.accounts:
|
||||||
log.info("Subscribe to %i accounts", len(self.accounts))
|
log.info("Subscribe to %i accounts", len(self.accounts))
|
||||||
await asyncio.wait(map(asyncio.create_task, [
|
await asyncio.wait([
|
||||||
self.subscribe_account(a) for a in self.accounts
|
self.subscribe_account(a) for a in self.accounts
|
||||||
]))
|
])
|
||||||
|
|
||||||
async def subscribe_account(self, account: Account):
|
async def subscribe_account(self, account: Account):
|
||||||
for address_manager in account.address_managers.values():
|
for address_manager in account.address_managers.values():
|
||||||
|
@ -938,7 +938,9 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
"%d change addresses (gap: %d), %d channels, %d certificates and %d claims. ",
|
"%d change addresses (gap: %d), %d channels, %d certificates and %d claims. ",
|
||||||
account.id, balance, total_receiving, account.receiving.gap, total_change,
|
account.id, balance, total_receiving, account.receiving.gap, total_change,
|
||||||
account.change.gap, channel_count, len(account.channel_keys), claim_count)
|
account.change.gap, channel_count, len(account.channel_keys), claim_count)
|
||||||
except Exception:
|
except Exception as err:
|
||||||
|
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||||
|
raise
|
||||||
log.exception(
|
log.exception(
|
||||||
'Failed to display wallet state, please file issue '
|
'Failed to display wallet state, please file issue '
|
||||||
'for this bug along with the traceback you see below:')
|
'for this bug along with the traceback you see below:')
|
||||||
|
@ -961,7 +963,9 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
claim_ids = [p.purchased_claim_id for p in purchases]
|
claim_ids = [p.purchased_claim_id for p in purchases]
|
||||||
try:
|
try:
|
||||||
resolved, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
|
resolved, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
|
||||||
except Exception:
|
except Exception as err:
|
||||||
|
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||||
|
raise
|
||||||
log.exception("Resolve failed while looking up purchased claim ids:")
|
log.exception("Resolve failed while looking up purchased claim ids:")
|
||||||
resolved = []
|
resolved = []
|
||||||
lookup = {claim.claim_id: claim for claim in resolved}
|
lookup = {claim.claim_id: claim for claim in resolved}
|
||||||
|
@ -1041,7 +1045,9 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
claim_ids = collection.claim.collection.claims.ids[offset:page_size + offset]
|
claim_ids = collection.claim.collection.claims.ids[offset:page_size + offset]
|
||||||
try:
|
try:
|
||||||
resolve_results, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
|
resolve_results, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
|
||||||
except Exception:
|
except Exception as err:
|
||||||
|
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||||
|
raise
|
||||||
log.exception("Resolve failed while looking up collection claim ids:")
|
log.exception("Resolve failed while looking up collection claim ids:")
|
||||||
return []
|
return []
|
||||||
claims = []
|
claims = []
|
||||||
|
|
|
@ -117,7 +117,7 @@ class ClientSession(BaseClientSession):
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
await asyncio.sleep(max(0, max_idle - (now - self.last_send)))
|
await asyncio.sleep(max(0, max_idle - (now - self.last_send)))
|
||||||
except (Exception, asyncio.CancelledError) as err:
|
except Exception as err:
|
||||||
if isinstance(err, asyncio.CancelledError):
|
if isinstance(err, asyncio.CancelledError):
|
||||||
log.info("closing connection to %s:%i", *self.server)
|
log.info("closing connection to %s:%i", *self.server)
|
||||||
else:
|
else:
|
||||||
|
@ -214,7 +214,7 @@ class Network:
|
||||||
def loop_task_done_callback(f):
|
def loop_task_done_callback(f):
|
||||||
try:
|
try:
|
||||||
f.result()
|
f.result()
|
||||||
except (Exception, asyncio.CancelledError):
|
except Exception:
|
||||||
if self.running:
|
if self.running:
|
||||||
log.exception("wallet server connection loop crashed")
|
log.exception("wallet server connection loop crashed")
|
||||||
|
|
||||||
|
@ -312,8 +312,7 @@ class Network:
|
||||||
sleep_delay = 30
|
sleep_delay = 30
|
||||||
while self.running:
|
while self.running:
|
||||||
await asyncio.wait(
|
await asyncio.wait(
|
||||||
map(asyncio.create_task, [asyncio.sleep(30), self._urgent_need_reconnect.wait()]),
|
[asyncio.sleep(30), self._urgent_need_reconnect.wait()], return_when=asyncio.FIRST_COMPLETED
|
||||||
return_when=asyncio.FIRST_COMPLETED
|
|
||||||
)
|
)
|
||||||
if self._urgent_need_reconnect.is_set():
|
if self._urgent_need_reconnect.is_set():
|
||||||
sleep_delay = 30
|
sleep_delay = 30
|
||||||
|
@ -339,7 +338,7 @@ class Network:
|
||||||
try:
|
try:
|
||||||
if not self._urgent_need_reconnect.is_set():
|
if not self._urgent_need_reconnect.is_set():
|
||||||
await asyncio.wait(
|
await asyncio.wait(
|
||||||
[self._keepalive_task, asyncio.create_task(self._urgent_need_reconnect.wait())],
|
[self._keepalive_task, self._urgent_need_reconnect.wait()],
|
||||||
return_when=asyncio.FIRST_COMPLETED
|
return_when=asyncio.FIRST_COMPLETED
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -264,7 +264,8 @@ class SPVNode:
|
||||||
await self.server.start()
|
await self.server.start()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.stopped = True
|
self.stopped = True
|
||||||
log.exception("failed to start spv node")
|
if not isinstance(e, asyncio.CancelledError):
|
||||||
|
log.exception("failed to start spv node")
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
async def stop(self, cleanup=True):
|
async def stop(self, cleanup=True):
|
||||||
|
|
|
@ -28,7 +28,6 @@ disable=
|
||||||
no-else-return,
|
no-else-return,
|
||||||
cyclic-import,
|
cyclic-import,
|
||||||
missing-docstring,
|
missing-docstring,
|
||||||
consider-using-f-string,
|
|
||||||
duplicate-code,
|
duplicate-code,
|
||||||
expression-not-assigned,
|
expression-not-assigned,
|
||||||
inconsistent-return-statements,
|
inconsistent-return-statements,
|
||||||
|
|
6
setup.py
6
setup.py
|
@ -18,7 +18,7 @@ setup(
|
||||||
long_description_content_type="text/markdown",
|
long_description_content_type="text/markdown",
|
||||||
keywords="lbry protocol media",
|
keywords="lbry protocol media",
|
||||||
license='MIT',
|
license='MIT',
|
||||||
python_requires='>=3.8',
|
python_requires='>=3.7',
|
||||||
packages=find_packages(exclude=('tests',)),
|
packages=find_packages(exclude=('tests',)),
|
||||||
zip_safe=False,
|
zip_safe=False,
|
||||||
entry_points={
|
entry_points={
|
||||||
|
@ -36,7 +36,7 @@ setup(
|
||||||
'distro==1.4.0',
|
'distro==1.4.0',
|
||||||
'base58==1.0.0',
|
'base58==1.0.0',
|
||||||
'cffi==1.13.2',
|
'cffi==1.13.2',
|
||||||
'cryptography==3.4.7',
|
'cryptography==2.5',
|
||||||
'protobuf==3.17.2',
|
'protobuf==3.17.2',
|
||||||
'prometheus_client==0.7.1',
|
'prometheus_client==0.7.1',
|
||||||
'ecdsa==0.13.3',
|
'ecdsa==0.13.3',
|
||||||
|
@ -50,7 +50,7 @@ setup(
|
||||||
],
|
],
|
||||||
extras_require={
|
extras_require={
|
||||||
'lint': [
|
'lint': [
|
||||||
'pylint==2.13.9'
|
'pylint==2.10.0'
|
||||||
],
|
],
|
||||||
'test': [
|
'test': [
|
||||||
'coverage',
|
'coverage',
|
||||||
|
|
|
@ -61,14 +61,16 @@ def mock_network_loop(loop: asyncio.AbstractEventLoop,
|
||||||
dht_network[from_addr] = protocol
|
dht_network[from_addr] = protocol
|
||||||
return transport, protocol
|
return transport, protocol
|
||||||
|
|
||||||
mock_sock = mock.Mock(spec=socket.socket)
|
with mock.patch('socket.socket') as mock_socket:
|
||||||
mock_sock.setsockopt = lambda *_: None
|
mock_sock = mock.Mock(spec=socket.socket)
|
||||||
mock_sock.bind = lambda *_: None
|
mock_sock.setsockopt = lambda *_: None
|
||||||
mock_sock.setblocking = lambda *_: None
|
mock_sock.bind = lambda *_: None
|
||||||
mock_sock.getsockname = lambda: "0.0.0.0"
|
mock_sock.setblocking = lambda *_: None
|
||||||
mock_sock.getpeername = lambda: ""
|
mock_sock.getsockname = lambda: "0.0.0.0"
|
||||||
mock_sock.close = lambda: None
|
mock_sock.getpeername = lambda: ""
|
||||||
mock_sock.type = socket.SOCK_DGRAM
|
mock_sock.close = lambda: None
|
||||||
mock_sock.fileno = lambda: 7
|
mock_sock.type = socket.SOCK_DGRAM
|
||||||
loop.create_datagram_endpoint = create_datagram_endpoint
|
mock_sock.fileno = lambda: 7
|
||||||
yield
|
mock_socket.return_value = mock_sock
|
||||||
|
loop.create_datagram_endpoint = create_datagram_endpoint
|
||||||
|
yield
|
||||||
|
|
|
@ -1,14 +1,18 @@
|
||||||
|
import time
|
||||||
import unittest
|
import unittest
|
||||||
from unittest import skipIf
|
from unittest import skipIf
|
||||||
import asyncio
|
import asyncio
|
||||||
import os
|
import os
|
||||||
from binascii import hexlify
|
from binascii import hexlify
|
||||||
|
|
||||||
|
import aiohttp.web
|
||||||
|
|
||||||
from lbry.schema import Claim
|
from lbry.schema import Claim
|
||||||
from lbry.stream.background_downloader import BackgroundDownloader
|
from lbry.stream.background_downloader import BackgroundDownloader
|
||||||
from lbry.stream.descriptor import StreamDescriptor
|
from lbry.stream.descriptor import StreamDescriptor
|
||||||
from lbry.testcase import CommandTestCase
|
from lbry.testcase import CommandTestCase
|
||||||
from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT
|
from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT
|
||||||
|
from lbry.utils import aiohttp_request
|
||||||
from lbry.wallet import Transaction
|
from lbry.wallet import Transaction
|
||||||
from lbry.torrent.tracker import UDPTrackerServerProtocol
|
from lbry.torrent.tracker import UDPTrackerServerProtocol
|
||||||
|
|
||||||
|
@ -17,55 +21,104 @@ class FileCommands(CommandTestCase):
|
||||||
def __init__(self, *a, **kw):
|
def __init__(self, *a, **kw):
|
||||||
super().__init__(*a, **kw)
|
super().__init__(*a, **kw)
|
||||||
self.skip_libtorrent = False
|
self.skip_libtorrent = False
|
||||||
|
self.streaming_port = 60818
|
||||||
|
self.seeder_session = None
|
||||||
|
|
||||||
async def add_forever(self):
|
async def initialize_torrent(self, tx_to_update=None, pick_a_file=True, name=None):
|
||||||
while True:
|
assert name is None or tx_to_update is None
|
||||||
for handle in self.client_session._handles.values():
|
if not self.seeder_session:
|
||||||
handle._handle.connect_peer(('127.0.0.1', 4040))
|
|
||||||
await asyncio.sleep(.1)
|
|
||||||
|
|
||||||
async def initialize_torrent(self, tx_to_update=None):
|
|
||||||
if not hasattr(self, 'seeder_session'):
|
|
||||||
self.seeder_session = TorrentSession(self.loop, None)
|
self.seeder_session = TorrentSession(self.loop, None)
|
||||||
self.addCleanup(self.seeder_session.stop)
|
self.addCleanup(self.seeder_session.stop)
|
||||||
await self.seeder_session.bind('127.0.0.1', port=4040)
|
await self.seeder_session.bind('127.0.0.1', port=4040)
|
||||||
btih = await self.seeder_session.add_fake_torrent()
|
btih = await self.seeder_session.add_fake_torrent(file_count=3)
|
||||||
|
files = [(size, path) for (path, size) in self.seeder_session.get_files(btih).items()]
|
||||||
|
files.sort()
|
||||||
|
# picking a file will pick something in the middle, while automatic selection will pick largest
|
||||||
|
self.expected_size, self.expected_path = files[1] if pick_a_file else files[-1]
|
||||||
|
|
||||||
address = await self.account.receiving.get_or_create_usable_address()
|
address = await self.account.receiving.get_or_create_usable_address()
|
||||||
|
claim = tx_to_update.outputs[0].claim if tx_to_update else Claim()
|
||||||
|
claim.stream.update(bt_infohash=btih)
|
||||||
|
if pick_a_file:
|
||||||
|
claim.stream.source.name = os.path.basename(self.expected_path)
|
||||||
if not tx_to_update:
|
if not tx_to_update:
|
||||||
claim = Claim()
|
|
||||||
claim.stream.update(bt_infohash=btih)
|
|
||||||
tx = await Transaction.claim_create(
|
tx = await Transaction.claim_create(
|
||||||
'torrent', claim, 1, address, [self.account], self.account
|
name or 'torrent', claim, 1, address, [self.account], self.account
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
claim = tx_to_update.outputs[0].claim
|
|
||||||
claim.stream.update(bt_infohash=btih)
|
|
||||||
tx = await Transaction.claim_update(
|
tx = await Transaction.claim_update(
|
||||||
tx_to_update.outputs[0], claim, 1, address, [self.account], self.account
|
tx_to_update.outputs[0], claim, 1, address, [self.account], self.account
|
||||||
)
|
)
|
||||||
await tx.sign([self.account])
|
await tx.sign([self.account])
|
||||||
await self.broadcast_and_confirm(tx)
|
await self.broadcast_and_confirm(tx)
|
||||||
self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session
|
self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session
|
||||||
self.client_session.wait_start = False # fixme: this is super slow on tests
|
|
||||||
task = asyncio.create_task(self.add_forever())
|
|
||||||
self.addCleanup(task.cancel)
|
|
||||||
return tx, btih
|
return tx, btih
|
||||||
|
|
||||||
|
async def assert_torrent_streaming_works(self, btih):
|
||||||
|
url = f'http://{self.daemon.conf.streaming_host}:{self.streaming_port}/stream/{btih}'
|
||||||
|
if self.daemon.streaming_runner.server is None:
|
||||||
|
await self.daemon.streaming_runner.setup()
|
||||||
|
site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host,
|
||||||
|
self.streaming_port)
|
||||||
|
await site.start()
|
||||||
|
async with aiohttp_request('get', url) as req:
|
||||||
|
self.assertEqual(req.status, 206)
|
||||||
|
self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream')
|
||||||
|
content_range = req.headers.get('Content-Range')
|
||||||
|
content_length = int(req.headers.get('Content-Length'))
|
||||||
|
streamed_bytes = await req.content.read()
|
||||||
|
expected_size = self.expected_size
|
||||||
|
self.assertEqual(expected_size, len(streamed_bytes))
|
||||||
|
self.assertEqual(content_length, len(streamed_bytes))
|
||||||
|
self.assertEqual(f"bytes 0-{expected_size - 1}/{expected_size}", content_range)
|
||||||
|
|
||||||
@skipIf(TorrentSession is None, "libtorrent not installed")
|
@skipIf(TorrentSession is None, "libtorrent not installed")
|
||||||
async def test_download_torrent(self):
|
async def test_download_torrent(self):
|
||||||
tx, btih = await self.initialize_torrent()
|
tx, btih = await self.initialize_torrent(pick_a_file=False)
|
||||||
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
|
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
|
||||||
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
||||||
# second call, see its there and move on
|
# second call, see its there and move on
|
||||||
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
|
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
|
||||||
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
||||||
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, btih)
|
|
||||||
self.assertIn(btih, self.client_session._handles)
|
self.assertIn(btih, self.client_session._handles)
|
||||||
|
|
||||||
|
# stream over streaming API (full range of the largest file)
|
||||||
|
await self.assert_torrent_streaming_works(btih)
|
||||||
|
|
||||||
|
# check json encoder fields for torrent sources
|
||||||
|
file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0]
|
||||||
|
self.assertEqual(btih, file['metadata']['source']['bt_infohash'])
|
||||||
|
self.assertAlmostEqual(time.time(), file['added_on'], delta=12)
|
||||||
|
self.assertEqual("application/octet-stream", file['mime_type'])
|
||||||
|
self.assertEqual(os.path.basename(self.expected_path), file['suggested_file_name'])
|
||||||
|
self.assertEqual(os.path.basename(self.expected_path), file['stream_name'])
|
||||||
|
while not file['completed']: # improve that
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0]
|
||||||
|
self.assertTrue(file['completed'])
|
||||||
|
self.assertGreater(file['total_bytes_lower_bound'], 0)
|
||||||
|
self.assertEqual(file['total_bytes_lower_bound'], file['total_bytes'])
|
||||||
|
self.assertEqual(file['total_bytes'], file['written_bytes'])
|
||||||
|
self.assertEqual('finished', file['status'])
|
||||||
|
|
||||||
|
# filter by a field which is missing on torrent
|
||||||
|
self.assertItemCount(await self.daemon.jsonrpc_file_list(stream_hash="abc"), 0)
|
||||||
|
|
||||||
tx, new_btih = await self.initialize_torrent(tx)
|
tx, new_btih = await self.initialize_torrent(tx)
|
||||||
self.assertNotEqual(btih, new_btih)
|
self.assertNotEqual(btih, new_btih)
|
||||||
# claim now points to another torrent, update to it
|
# claim now points to another torrent, update to it
|
||||||
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
|
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
|
||||||
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih)
|
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih)
|
||||||
|
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
||||||
|
|
||||||
|
# restart and verify that only one updated stream was recovered
|
||||||
|
self.daemon.file_manager.stop()
|
||||||
|
await self.daemon.file_manager.start()
|
||||||
|
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih)
|
||||||
|
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
||||||
|
# check it was saved properly, once
|
||||||
|
self.assertEqual(1, len(await self.daemon.storage.get_all_torrent_files()))
|
||||||
|
|
||||||
self.assertIn(new_btih, self.client_session._handles)
|
self.assertIn(new_btih, self.client_session._handles)
|
||||||
self.assertNotIn(btih, self.client_session._handles)
|
self.assertNotIn(btih, self.client_session._handles)
|
||||||
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
||||||
|
@ -73,6 +126,11 @@ class FileCommands(CommandTestCase):
|
||||||
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0)
|
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0)
|
||||||
self.assertNotIn(new_btih, self.client_session._handles)
|
self.assertNotIn(new_btih, self.client_session._handles)
|
||||||
|
|
||||||
|
await self.initialize_torrent(name='torrent2')
|
||||||
|
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent2')))
|
||||||
|
file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0]
|
||||||
|
self.assertEqual(os.path.basename(self.expected_path), file['stream_name'])
|
||||||
|
|
||||||
async def create_streams_in_range(self, *args, **kwargs):
|
async def create_streams_in_range(self, *args, **kwargs):
|
||||||
self.stream_claim_ids = []
|
self.stream_claim_ids = []
|
||||||
for i in range(*args, **kwargs):
|
for i in range(*args, **kwargs):
|
||||||
|
@ -335,12 +393,12 @@ class FileCommands(CommandTestCase):
|
||||||
await self.server.blob_manager.delete_blobs(all_except_sd)
|
await self.server.blob_manager.delete_blobs(all_except_sd)
|
||||||
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True)
|
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True)
|
||||||
self.assertIn('error', resp)
|
self.assertIn('error', resp)
|
||||||
self.assertEqual('Failed to download data blobs for sd hash %s within timeout.' % sd_hash, resp['error'])
|
self.assertEqual('Failed to download data blobs for %s within timeout.' % sd_hash, resp['error'])
|
||||||
self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo'), "data timeout didn't create a file")
|
self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo'), "data timeout didn't create a file")
|
||||||
await self.server.blob_manager.delete_blobs([sd_hash])
|
await self.server.blob_manager.delete_blobs([sd_hash])
|
||||||
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True)
|
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True)
|
||||||
self.assertIn('error', resp)
|
self.assertIn('error', resp)
|
||||||
self.assertEqual('Failed to download sd blob %s within timeout.' % sd_hash, resp['error'])
|
self.assertEqual('Failed to download metadata for %s within timeout.' % sd_hash, resp['error'])
|
||||||
|
|
||||||
async def wait_files_to_complete(self):
|
async def wait_files_to_complete(self):
|
||||||
while await self.file_list(status='running'):
|
while await self.file_list(status='running'):
|
||||||
|
@ -354,7 +412,7 @@ class FileCommands(CommandTestCase):
|
||||||
await self.daemon.jsonrpc_get('lbry://foo')
|
await self.daemon.jsonrpc_get('lbry://foo')
|
||||||
with open(original_path, 'wb') as handle:
|
with open(original_path, 'wb') as handle:
|
||||||
handle.write(b'some other stuff was there instead')
|
handle.write(b'some other stuff was there instead')
|
||||||
await self.daemon.file_manager.stop()
|
self.daemon.file_manager.stop()
|
||||||
await self.daemon.file_manager.start()
|
await self.daemon.file_manager.start()
|
||||||
await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) # if this hangs, file didn't get set completed
|
await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) # if this hangs, file didn't get set completed
|
||||||
# check that internal state got through up to the file list API
|
# check that internal state got through up to the file list API
|
||||||
|
@ -382,7 +440,8 @@ class FileCommands(CommandTestCase):
|
||||||
resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2))
|
resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2))
|
||||||
self.assertNotIn('error', resp)
|
self.assertNotIn('error', resp)
|
||||||
self.assertTrue(os.path.isfile(path))
|
self.assertTrue(os.path.isfile(path))
|
||||||
await self.daemon.file_manager.stop()
|
self.daemon.file_manager.stop()
|
||||||
|
await asyncio.sleep(0.01) # FIXME: this sleep should not be needed
|
||||||
self.assertFalse(os.path.isfile(path))
|
self.assertFalse(os.path.isfile(path))
|
||||||
|
|
||||||
async def test_incomplete_downloads_retry(self):
|
async def test_incomplete_downloads_retry(self):
|
||||||
|
@ -477,7 +536,7 @@ class FileCommands(CommandTestCase):
|
||||||
|
|
||||||
# restart the daemon and make sure the fee is still there
|
# restart the daemon and make sure the fee is still there
|
||||||
|
|
||||||
await self.daemon.file_manager.stop()
|
self.daemon.file_manager.stop()
|
||||||
await self.daemon.file_manager.start()
|
await self.daemon.file_manager.start()
|
||||||
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
||||||
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee)
|
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee)
|
||||||
|
|
|
@ -3,9 +3,7 @@ import hashlib
|
||||||
import aiohttp
|
import aiohttp
|
||||||
import aiohttp.web
|
import aiohttp.web
|
||||||
import asyncio
|
import asyncio
|
||||||
import contextlib
|
|
||||||
|
|
||||||
from lbry.file.source import ManagedDownloadSource
|
|
||||||
from lbry.utils import aiohttp_request
|
from lbry.utils import aiohttp_request
|
||||||
from lbry.blob.blob_file import MAX_BLOB_SIZE
|
from lbry.blob.blob_file import MAX_BLOB_SIZE
|
||||||
from lbry.testcase import CommandTestCase
|
from lbry.testcase import CommandTestCase
|
||||||
|
@ -23,7 +21,7 @@ def get_random_bytes(n: int) -> bytes:
|
||||||
|
|
||||||
class RangeRequests(CommandTestCase):
|
class RangeRequests(CommandTestCase):
|
||||||
async def _restart_stream_manager(self):
|
async def _restart_stream_manager(self):
|
||||||
await self.daemon.file_manager.stop()
|
self.daemon.file_manager.stop()
|
||||||
await self.daemon.file_manager.start()
|
await self.daemon.file_manager.start()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -354,21 +352,14 @@ class RangeRequests(CommandTestCase):
|
||||||
path = stream.full_path
|
path = stream.full_path
|
||||||
self.assertIsNotNone(path)
|
self.assertIsNotNone(path)
|
||||||
if wait_for_start_writing:
|
if wait_for_start_writing:
|
||||||
with contextlib.suppress(asyncio.CancelledError):
|
await stream.started_writing.wait()
|
||||||
await stream.started_writing.wait()
|
|
||||||
self.assertTrue(os.path.isfile(path))
|
self.assertTrue(os.path.isfile(path))
|
||||||
await self.daemon.file_manager.stop()
|
await self._restart_stream_manager()
|
||||||
# while stopped, we get no response to query and no file is present
|
|
||||||
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'], [])
|
|
||||||
self.assertEqual(os.path.isfile(path), stream.status == ManagedDownloadSource.STATUS_FINISHED)
|
|
||||||
await self.daemon.file_manager.start()
|
|
||||||
# after restart, we get a response to query and same file path
|
|
||||||
stream = (await self.daemon.jsonrpc_file_list())['items'][0]
|
stream = (await self.daemon.jsonrpc_file_list())['items'][0]
|
||||||
self.assertIsNotNone(stream.full_path)
|
self.assertIsNotNone(stream.full_path)
|
||||||
self.assertEqual(stream.full_path, path)
|
self.assertFalse(os.path.isfile(path))
|
||||||
if wait_for_start_writing:
|
if wait_for_start_writing:
|
||||||
with contextlib.suppress(asyncio.CancelledError):
|
await stream.started_writing.wait()
|
||||||
await stream.started_writing.wait()
|
|
||||||
self.assertTrue(os.path.isfile(path))
|
self.assertTrue(os.path.isfile(path))
|
||||||
|
|
||||||
async def test_file_save_stop_before_finished_streaming_only_wait_for_start(self):
|
async def test_file_save_stop_before_finished_streaming_only_wait_for_start(self):
|
||||||
|
|
|
@ -11,7 +11,7 @@ from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase
|
||||||
from lbry.testcase import get_fake_exchange_rate_manager
|
from lbry.testcase import get_fake_exchange_rate_manager
|
||||||
from lbry.utils import generate_id
|
from lbry.utils import generate_id
|
||||||
from lbry.error import InsufficientFundsError
|
from lbry.error import InsufficientFundsError
|
||||||
from lbry.error import KeyFeeAboveMaxAllowedError, ResolveError, DownloadSDTimeoutError, DownloadDataTimeoutError
|
from lbry.error import KeyFeeAboveMaxAllowedError, ResolveError, DownloadMetadataTimeoutError, DownloadDataTimeoutError
|
||||||
from lbry.wallet import WalletManager, Wallet, Ledger, Transaction, Input, Output, Database
|
from lbry.wallet import WalletManager, Wallet, Ledger, Transaction, Input, Output, Database
|
||||||
from lbry.wallet.constants import CENT, NULL_HASH32
|
from lbry.wallet.constants import CENT, NULL_HASH32
|
||||||
from lbry.wallet.network import ClientSession
|
from lbry.wallet.network import ClientSession
|
||||||
|
@ -229,10 +229,10 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
self.assertFalse(event['properties']['added_fixed_peers'])
|
self.assertFalse(event['properties']['added_fixed_peers'])
|
||||||
self.assertEqual(event['properties']['connection_failures_count'], 1)
|
self.assertEqual(event['properties']['connection_failures_count'], 1)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout.'
|
event['properties']['error_message'], f'Failed to download metadata for {self.sd_hash} within timeout.'
|
||||||
)
|
)
|
||||||
|
|
||||||
await self._test_time_to_first_bytes(check_post, DownloadSDTimeoutError, after_setup=after_setup)
|
await self._test_time_to_first_bytes(check_post, DownloadMetadataTimeoutError, after_setup=after_setup)
|
||||||
|
|
||||||
async def test_override_fixed_peer_delay_dht_disabled(self):
|
async def test_override_fixed_peer_delay_dht_disabled(self):
|
||||||
self.client_config.fixed_peers = [(self.server_from_client.address, self.server_from_client.tcp_port)]
|
self.client_config.fixed_peers = [(self.server_from_client.address, self.server_from_client.tcp_port)]
|
||||||
|
@ -266,18 +266,18 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
|
|
||||||
def check_post(event):
|
def check_post(event):
|
||||||
self.assertEqual(event['event'], 'Time To First Bytes')
|
self.assertEqual(event['event'], 'Time To First Bytes')
|
||||||
self.assertEqual(event['properties']['error'], 'DownloadSDTimeoutError')
|
self.assertEqual(event['properties']['error'], 'DownloadMetadataTimeoutError')
|
||||||
self.assertEqual(event['properties']['tried_peers_count'], 0)
|
self.assertEqual(event['properties']['tried_peers_count'], 0)
|
||||||
self.assertEqual(event['properties']['active_peer_count'], 0)
|
self.assertEqual(event['properties']['active_peer_count'], 0)
|
||||||
self.assertFalse(event['properties']['use_fixed_peers'])
|
self.assertFalse(event['properties']['use_fixed_peers'])
|
||||||
self.assertFalse(event['properties']['added_fixed_peers'])
|
self.assertFalse(event['properties']['added_fixed_peers'])
|
||||||
self.assertIsNone(event['properties']['fixed_peer_delay'])
|
self.assertIsNone(event['properties']['fixed_peer_delay'])
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout.'
|
event['properties']['error_message'], f'Failed to download metadata for {self.sd_hash} within timeout.'
|
||||||
)
|
)
|
||||||
|
|
||||||
start = self.loop.time()
|
start = self.loop.time()
|
||||||
await self._test_time_to_first_bytes(check_post, DownloadSDTimeoutError)
|
await self._test_time_to_first_bytes(check_post, DownloadMetadataTimeoutError)
|
||||||
duration = self.loop.time() - start
|
duration = self.loop.time() - start
|
||||||
self.assertLessEqual(duration, 5)
|
self.assertLessEqual(duration, 5)
|
||||||
self.assertGreaterEqual(duration, 3.0)
|
self.assertGreaterEqual(duration, 3.0)
|
||||||
|
@ -387,7 +387,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
self.server.stop_server()
|
self.server.stop_server()
|
||||||
await self.setup_stream_manager()
|
await self.setup_stream_manager()
|
||||||
await self._test_download_error_analytics_on_start(
|
await self._test_download_error_analytics_on_start(
|
||||||
DownloadSDTimeoutError, f'Failed to download sd blob {self.sd_hash} within timeout.', timeout=1
|
DownloadMetadataTimeoutError, f'Failed to download metadata for {self.sd_hash} within timeout.', timeout=1
|
||||||
)
|
)
|
||||||
|
|
||||||
async def test_download_data_timeout(self):
|
async def test_download_data_timeout(self):
|
||||||
|
@ -396,7 +396,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
head_blob_hash = json.loads(sdf.read())['blobs'][0]['blob_hash']
|
head_blob_hash = json.loads(sdf.read())['blobs'][0]['blob_hash']
|
||||||
self.server_blob_manager.delete_blob(head_blob_hash)
|
self.server_blob_manager.delete_blob(head_blob_hash)
|
||||||
await self._test_download_error_analytics_on_start(
|
await self._test_download_error_analytics_on_start(
|
||||||
DownloadDataTimeoutError, f'Failed to download data blobs for sd hash {self.sd_hash} within timeout.', timeout=1
|
DownloadDataTimeoutError, f'Failed to download data blobs for {self.sd_hash} within timeout.', timeout=1
|
||||||
)
|
)
|
||||||
|
|
||||||
async def test_unexpected_error(self):
|
async def test_unexpected_error(self):
|
||||||
|
@ -424,7 +424,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
self.assertIsNone(stream.full_path)
|
self.assertIsNone(stream.full_path)
|
||||||
self.assertEqual(0, stream.written_bytes)
|
self.assertEqual(0, stream.written_bytes)
|
||||||
|
|
||||||
await self.stream_manager.stop()
|
self.stream_manager.stop()
|
||||||
await self.stream_manager.start()
|
await self.stream_manager.start()
|
||||||
self.assertEqual(1, len(self.stream_manager.streams))
|
self.assertEqual(1, len(self.stream_manager.streams))
|
||||||
stream = list(self.stream_manager.streams.values())[0]
|
stream = list(self.stream_manager.streams.values())[0]
|
||||||
|
@ -449,7 +449,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
|
stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
|
||||||
await stream.finished_writing.wait()
|
await stream.finished_writing.wait()
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
await self.stream_manager.stop()
|
self.stream_manager.stop()
|
||||||
self.client_blob_manager.stop()
|
self.client_blob_manager.stop()
|
||||||
# partial removal, only sd blob is missing.
|
# partial removal, only sd blob is missing.
|
||||||
# in this case, we recover the sd blob while the other blobs are kept untouched as 'finished'
|
# in this case, we recover the sd blob while the other blobs are kept untouched as 'finished'
|
||||||
|
|
Loading…
Add table
Reference in a new issue