import os
import asyncio
import typing
import logging
import binascii
from aiohttp.web import Request, StreamResponse
from lbrynet.utils import generate_id
from lbrynet.error import DownloadSDTimeout
from lbrynet.schema.mime_types import guess_media_type
from lbrynet.stream.downloader import StreamDownloader
from lbrynet.stream.descriptor import StreamDescriptor
from lbrynet.stream.reflector.client import StreamReflectorClient
from lbrynet.extras.daemon.storage import StoredStreamClaim
if typing.TYPE_CHECKING:
    from lbrynet.conf import Config
    from lbrynet.schema.claim import Claim
    from lbrynet.blob.blob_manager import BlobManager
    from lbrynet.blob.blob_info import BlobInfo
    from lbrynet.dht.node import Node
    from lbrynet.extras.daemon.analytics import AnalyticsManager
    from lbrynet.wallet.transaction import Transaction

log = logging.getLogger(__name__)


def _get_next_available_file_name(download_directory: str, file_name: str) -> str:
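    """
    Return the first available variant of ``file_name`` in ``download_directory``, appending
    ``_1``, ``_2``, ... before the extension (``video.mp4`` -> ``video_1.mp4``) on collisions.
    Blocking; intended to be run in an executor.
    """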
    base_name, ext = os.path.splitext(os.path.basename(file_name))
    i = 0
    while os.path.isfile(os.path.join(download_directory, file_name)):
        i += 1
        file_name = "%s_%i%s" % (base_name, i, ext)
    return file_name


async def get_next_available_file_name(loop: asyncio.BaseEventLoop, download_directory: str, file_name: str) -> str:
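    """Executor wrapper for the blocking ``_get_next_available_file_name``."""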
    return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name)


class ManagedStream:
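    """
    A single stream tracked by the stream manager: wraps a StreamDownloader together
    with its database row, resolved claim metadata, and the asyncio events used to
    coordinate downloading, saving to disk, and HTTP range streaming.

    Minimal usage sketch (assumes an initialized Config and BlobManager; the host and
    port below are placeholders for a reachable reflector server):

        stream = await ManagedStream.create(loop, config, blob_manager, "/tmp/video.mp4")
        sent_blob_hashes = await stream.upload_to_reflector("reflector.example.com", 5566)
    """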
    STATUS_RUNNING = "running"
    STATUS_STOPPED = "stopped"
    STATUS_FINISHED = "finished"

    __slots__ = [
        'loop',
        'config',
        'blob_manager',
        'sd_hash',
        'download_directory',
        '_file_name',
        '_status',
        'stream_claim_info',
        'download_id',
        'rowid',
        'written_bytes',
        'content_fee',
        'downloader',
        'analytics_manager',
        'fully_reflected',
        'file_output_task',
        'delayed_stop_task',
        'streaming_responses',
        'streaming',
        '_running',
        'saving',
        'finished_writing',
        'started_writing',
        'finished_write_attempt'
    ]

    def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
                 sd_hash: str, download_directory: typing.Optional[str] = None, file_name: typing.Optional[str] = None,
                 status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None,
                 download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
                 descriptor: typing.Optional[StreamDescriptor] = None,
                 content_fee: typing.Optional['Transaction'] = None,
                 analytics_manager: typing.Optional['AnalyticsManager'] = None):
        self.loop = loop
        self.config = config
        self.blob_manager = blob_manager
        self.sd_hash = sd_hash
        self.download_directory = download_directory
        self._file_name = file_name
        self._status = status
        self.stream_claim_info = claim
        self.download_id = download_id or binascii.hexlify(generate_id()).decode()
        self.rowid = rowid
        self.written_bytes = 0
        self.content_fee = content_fee
        self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
        self.analytics_manager = analytics_manager

        self.fully_reflected = asyncio.Event(loop=self.loop)
        self.file_output_task: typing.Optional[asyncio.Task] = None
        self.delayed_stop_task: typing.Optional[asyncio.Task] = None
        self.streaming_responses: typing.List[typing.Tuple[Request, StreamResponse]] = []
        self.streaming = asyncio.Event(loop=self.loop)
        self._running = asyncio.Event(loop=self.loop)
        self.saving = asyncio.Event(loop=self.loop)
        self.finished_writing = asyncio.Event(loop=self.loop)
        self.started_writing = asyncio.Event(loop=self.loop)
        self.finished_write_attempt = asyncio.Event(loop=self.loop)

    @property
    def descriptor(self) -> StreamDescriptor:
        return self.downloader.descriptor

    @property
    def stream_hash(self) -> str:
        return self.descriptor.stream_hash

    @property
    def file_name(self) -> typing.Optional[str]:
        return self._file_name or (self.descriptor.suggested_file_name if self.descriptor else None)

    @property
    def status(self) -> str:
        return self._status

    async def update_status(self, status: str):
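        """Set the in-memory status and persist it to the file row in the database."""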
        assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
        self._status = status
        await self.blob_manager.storage.change_file_status(self.stream_hash, status)

    @property
    def finished(self) -> bool:
        return self.status == self.STATUS_FINISHED

    @property
    def running(self) -> bool:
        return self.status == self.STATUS_RUNNING

    @property
    def claim_id(self) -> typing.Optional[str]:
        return None if not self.stream_claim_info else self.stream_claim_info.claim_id

    @property
    def txid(self) -> typing.Optional[str]:
        return None if not self.stream_claim_info else self.stream_claim_info.txid

    @property
    def nout(self) -> typing.Optional[int]:
        return None if not self.stream_claim_info else self.stream_claim_info.nout

    @property
    def outpoint(self) -> typing.Optional[str]:
        return None if not self.stream_claim_info else self.stream_claim_info.outpoint

    @property
    def claim_height(self) -> typing.Optional[int]:
        return None if not self.stream_claim_info else self.stream_claim_info.height

    @property
    def channel_claim_id(self) -> typing.Optional[str]:
        return None if not self.stream_claim_info else self.stream_claim_info.channel_claim_id

    @property
    def channel_name(self) -> typing.Optional[str]:
        return None if not self.stream_claim_info else self.stream_claim_info.channel_name

    @property
    def claim_name(self) -> typing.Optional[str]:
        return None if not self.stream_claim_info else self.stream_claim_info.claim_name

    @property
    def metadata(self) -> typing.Optional[typing.Dict]:
        return None if not self.stream_claim_info else self.stream_claim_info.claim.stream.to_dict()

    @property
    def metadata_protobuf(self) -> typing.Optional[bytes]:
        if self.stream_claim_info:
            return binascii.hexlify(self.stream_claim_info.claim.to_bytes())

    @property
    def blobs_completed(self) -> int:
        return sum(1 for b in self.descriptor.blobs[:-1]
                   if self.blob_manager.is_blob_verified(b.blob_hash))

    @property
    def blobs_in_stream(self) -> int:
        return len(self.descriptor.blobs) - 1

    @property
    def blobs_remaining(self) -> int:
        return self.blobs_in_stream - self.blobs_completed

    @property
    def full_path(self) -> typing.Optional[str]:
        return os.path.join(self.download_directory, os.path.basename(self.file_name)) \
            if self.file_name and self.download_directory else None

    @property
    def output_file_exists(self):
        return os.path.isfile(self.full_path) if self.full_path else False

    @property
    def mime_type(self):
        return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0]

    def as_dict(self) -> typing.Dict:
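        """Build the JSON-serializable dict representation used by the daemon's file APIs."""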
        full_path = self.full_path
        file_name = self.file_name
        download_directory = self.download_directory
        if self.full_path and self.output_file_exists:
            if self.written_bytes:
                written_bytes = self.written_bytes
            else:
                written_bytes = os.stat(self.full_path).st_size
        else:
            full_path = None
            file_name = None
            download_directory = None
            written_bytes = None
        return {
            'streaming_url': f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}",
            'completed': (self.output_file_exists and self.status in ('stopped', 'finished')) or all(
                self.blob_manager.is_blob_verified(b.blob_hash) for b in self.descriptor.blobs[:-1]),
            'file_name': file_name,
            'download_directory': download_directory,
            'points_paid': 0.0,
            'stopped': not self.running,
            'stream_hash': self.stream_hash,
            'stream_name': self.descriptor.stream_name,
            'suggested_file_name': self.descriptor.suggested_file_name,
            'sd_hash': self.descriptor.sd_hash,
            'download_path': full_path,
            'mime_type': self.mime_type,
            'key': self.descriptor.key,
            'total_bytes_lower_bound': self.descriptor.lower_bound_decrypted_length(),
            'total_bytes': self.descriptor.upper_bound_decrypted_length(),
            'written_bytes': written_bytes,
            'blobs_completed': self.blobs_completed,
            'blobs_in_stream': self.blobs_in_stream,
            'blobs_remaining': self.blobs_remaining,
            'status': self.status,
            'claim_id': self.claim_id,
            'txid': self.txid,
            'nout': self.nout,
            'outpoint': self.outpoint,
            'metadata': self.metadata,
            'protobuf': self.metadata_protobuf,
            'channel_claim_id': self.channel_claim_id,
            'channel_name': self.channel_name,
            'claim_name': self.claim_name,
            'content_fee': self.content_fee
        }

    @classmethod
    async def create(cls, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
                     file_path: str, key: typing.Optional[bytes] = None,
                     iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedStream':
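        """
        Create a new stream by encrypting ``file_path`` into blobs, persist the descriptor
        and a published-file row to storage, and return the resulting ManagedStream in the
        finished state.
        """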
        descriptor = await StreamDescriptor.create_stream(
            loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
            blob_completed_callback=blob_manager.blob_completed
        )
        await blob_manager.storage.store_stream(
            blob_manager.get_blob(descriptor.sd_hash), descriptor
        )
        row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path),
                                                                os.path.dirname(file_path), 0)
        return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
                   os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor)

    async def start(self, node: typing.Optional['Node'] = None, timeout: typing.Optional[float] = None,
                    save_now: bool = False):
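        """
        Start the downloader, raising DownloadSDTimeout if the sd blob is not received within
        ``timeout`` seconds, then schedule the inactivity watchdog and add a row for the file
        to the database if one does not exist yet.
        """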
        timeout = timeout or self.config.download_timeout
        if self._running.is_set():
            return
        log.info("start downloader for stream (sd hash: %s)", self.sd_hash)
        self._running.set()
        try:
            await asyncio.wait_for(self.downloader.start(node), timeout, loop=self.loop)
        except asyncio.TimeoutError:
            self._running.clear()
            raise DownloadSDTimeout(self.sd_hash)

        if self.delayed_stop_task and not self.delayed_stop_task.done():
            self.delayed_stop_task.cancel()
        self.delayed_stop_task = self.loop.create_task(self._delayed_stop())
        if not await self.blob_manager.storage.file_exists(self.sd_hash):
            if save_now:
                file_name, download_dir = self._file_name, self.download_directory
            else:
                file_name, download_dir = None, None
            self.rowid = await self.blob_manager.storage.save_downloaded_file(
                self.stream_hash, file_name, download_dir, 0.0
            )
        if self.status != self.STATUS_RUNNING:
            await self.update_status(self.STATUS_RUNNING)

    async def stop(self, finished: bool = False):
        """
        Stop any running save/stream tasks as well as the downloader and update the status in the database
        """
        self.stop_tasks()
        if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
            await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)

    async def _aiter_read_stream(self, start_blob_num: int = 0, connection_id: int = 0)\
            -> typing.AsyncIterator[typing.Tuple['BlobInfo', bytes]]:
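        """Yield ``(blob_info, decrypted bytes)`` for each data blob, starting at ``start_blob_num``."""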
        if start_blob_num >= len(self.descriptor.blobs[:-1]):
            raise IndexError(start_blob_num)
        for i, blob_info in enumerate(self.descriptor.blobs[start_blob_num:-1]):
            assert i + start_blob_num == blob_info.blob_num
            decrypted = await self.downloader.read_blob(blob_info, connection_id)
            yield (blob_info, decrypted)

    async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse:
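        """Stream the decrypted file to an HTTP client as a 206 partial-content range response."""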
        log.info("stream file to browser for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id,
                 self.sd_hash[:6])
        await self.start(node)
        headers, size, skip_blobs = self._prepare_range_response_headers(request.headers.get('range', 'bytes=0-'))
        response = StreamResponse(
            status=206,
            headers=headers
        )
        await response.prepare(request)
        self.streaming_responses.append((request, response))
        self.streaming.set()
        try:
            wrote = 0
            async for blob_info, decrypted in self._aiter_read_stream(skip_blobs, connection_id=2):
                if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size):
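                    # zero-pad the final write so the body length matches the Content-Length header;
                    # 2097151 is assumed to be the decrypted payload size of one full (2 MiB) blob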
                    decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * 2097151)))
                    await response.write_eof(decrypted)
                else:
                    await response.write(decrypted)
                wrote += len(decrypted)
                log.info("sent browser %sblob %i/%i", "(final) " if response._eof_sent else "",
                         blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
                if response._eof_sent:
                    break
            return response
        finally:
            response.force_close()
            if (request, response) in self.streaming_responses:
                self.streaming_responses.remove((request, response))
            if not self.streaming_responses:
                self.streaming.clear()

    @staticmethod
    def _write_decrypted_blob(handle: typing.IO, data: bytes):
        handle.write(data)
        handle.flush()

    async def _save_file(self, output_path: str):
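        """
        Write the decrypted blobs to ``output_path`` as they arrive, marking the stream finished
        on success; on a download timeout the file row is detached and the stream stopped, and on
        any other error the partial file is deleted and the error re-raised.
        """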
        log.info("save file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id, self.sd_hash[:6],
                 output_path)
        self.saving.set()
        self.finished_write_attempt.clear()
        self.finished_writing.clear()
        self.started_writing.clear()
        try:
            with open(output_path, 'wb') as file_write_handle:
                async for blob_info, decrypted in self._aiter_read_stream(connection_id=1):
                    log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
                    await self.loop.run_in_executor(None, self._write_decrypted_blob, file_write_handle, decrypted)
                    self.written_bytes += len(decrypted)
                    if not self.started_writing.is_set():
                        self.started_writing.set()
            await self.update_status(ManagedStream.STATUS_FINISHED)
            if self.analytics_manager:
                self.loop.create_task(self.analytics_manager.send_download_finished(
                    self.download_id, self.claim_name, self.sd_hash
                ))
            self.finished_writing.set()
            log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id,
                     self.sd_hash[:6], self.full_path)
            await self.blob_manager.storage.set_saved_file(self.stream_hash)
        except Exception as err:
            if os.path.isfile(output_path):
                log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)
                os.remove(output_path)
            self.written_bytes = 0
            if isinstance(err, asyncio.TimeoutError):
                self.downloader.stop()
                await self.blob_manager.storage.change_file_download_dir_and_file_name(
                    self.stream_hash, None, None
                )
                self._file_name, self.download_directory = None, None
                await self.blob_manager.storage.clear_saved_file(self.stream_hash)
                await self.update_status(self.STATUS_STOPPED)
                return
            elif not isinstance(err, asyncio.CancelledError):
                log.exception("unexpected error encountered writing file for stream %s", self.sd_hash)
            raise err
        finally:
            self.saving.clear()
            self.finished_write_attempt.set()

    async def save_file(self, file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None,
                        node: typing.Optional['Node'] = None):
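        """
        Begin saving the stream to disk: choose a non-colliding file name, persist the target
        path, launch ``_save_file`` as a background task, and return once writing has started.
        """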
        await self.start(node)
        if self.file_output_task and not self.file_output_task.done():  # cancel an already running save task
            self.file_output_task.cancel()
        self.download_directory = download_directory or self.download_directory or self.config.download_dir
        if not self.download_directory:
            raise ValueError("no directory to download to")
        if not (file_name or self._file_name or self.descriptor.suggested_file_name):
            raise ValueError("no file name to download to")
        if not os.path.isdir(self.download_directory):
            log.warning("download directory '%s' does not exist, attempting to make it", self.download_directory)
            os.mkdir(self.download_directory)
        self._file_name = await get_next_available_file_name(
            self.loop, self.download_directory,
            file_name or self.descriptor.suggested_file_name
        )
        await self.blob_manager.storage.change_file_download_dir_and_file_name(
            self.stream_hash, self.download_directory, self.file_name
        )
        await self.update_status(ManagedStream.STATUS_RUNNING)
        self.written_bytes = 0
        self.file_output_task = self.loop.create_task(self._save_file(self.full_path))
        await self.started_writing.wait()

    def stop_tasks(self):
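        """Cancel the save task, close any open streaming responses, and stop the downloader."""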
        if self.file_output_task and not self.file_output_task.done():
            self.file_output_task.cancel()
        self.file_output_task = None
        while self.streaming_responses:
            req, response = self.streaming_responses.pop()
            response.force_close()
            req.transport.close()
        self.downloader.stop()
        self._running.clear()

    async def upload_to_reflector(self, host: str, port: int) -> typing.List[str]:
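        """
        Send the sd blob and whichever data blobs the reflector reports missing (and we have
        locally), returning the list of blob hashes that were sent.
        """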
        sent = []
        protocol = StreamReflectorClient(self.blob_manager, self.descriptor)
        try:
            await self.loop.create_connection(lambda: protocol, host, port)
            await protocol.send_handshake()
            sent_sd, needed = await protocol.send_descriptor()
            if sent_sd:
                sent.append(self.sd_hash)
            if not sent_sd and not needed:
                if not self.fully_reflected.is_set():
                    self.fully_reflected.set()
                    await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
                return []
            we_have = [
                blob_hash for blob_hash in needed if blob_hash in self.blob_manager.completed_blob_hashes
            ]
            for blob_hash in we_have:
                await protocol.send_blob(blob_hash)
                sent.append(blob_hash)
        except (asyncio.TimeoutError, ValueError, ConnectionRefusedError):
            return sent
        finally:
            if protocol.transport:
                protocol.transport.close()
        if not self.fully_reflected.is_set():
            self.fully_reflected.set()
            await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
        return sent

    def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
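        """Attach resolved claim metadata (txid, name, channel, raw protobuf) to the stream."""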
        self.stream_claim_info = StoredStreamClaim(
            self.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
            claim_info['name'], claim_info['amount'], claim_info['height'],
            binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
            claim_info['claim_sequence'], claim_info.get('channel_name')
        )

    async def update_content_claim(self, claim_info: typing.Optional[typing.Dict] = None):
        if not claim_info:
            claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
        self.set_claim(claim_info, claim_info['value'])

    async def _delayed_stop(self):
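        """Stop the download if it stays inactive (neither saving nor streaming) for ~2 seconds."""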
        stalled_count = 0
        while self._running.is_set():
            if self.saving.is_set() or self.streaming.is_set():
                stalled_count = 0
            else:
                stalled_count += 1
            if stalled_count > 1:
                log.info("stopping inactive download for lbry://%s#%s (%s...)", self.claim_name, self.claim_id,
                         self.sd_hash[:6])
                await self.stop()
                return
            await asyncio.sleep(1, loop=self.loop)

    def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
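        """
        Parse an HTTP Range header (e.g. ``bytes=0-``) and return the response headers, the
        total decrypted size, and how many whole blobs can be skipped before the range start.
        """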
        if '=' in get_range:
            get_range = get_range.split('=')[1]
        start, end = get_range.split('-')
        size = 0

        for blob in self.descriptor.blobs[:-1]:
            size += blob.length - 1
        if self.stream_claim_info and self.stream_claim_info.claim.stream.source.size:
            size_from_claim = int(self.stream_claim_info.claim.stream.source.size)
            if not size_from_claim <= size <= size_from_claim + 16:
                raise ValueError("claim contains implausible stream size")
            log.debug("using stream size from claim")
            size = size_from_claim
        elif self.stream_claim_info:
            log.debug("estimating stream size")

        start = int(start)
        end = int(end) if end else size - 1
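        # 2097150/2097151 are assumed to derive from the 2 MiB max blob size; seeking is only
        # possible in whole-blob steps, so align the range start down to a blob boundary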
        skip_blobs = start // 2097150
        skip = skip_blobs * 2097151
        start = skip
        final_size = end - start + 1

        headers = {
            'Accept-Ranges': 'bytes',
            'Content-Range': f'bytes {start}-{end}/{size}',
            'Content-Length': str(final_size),
            'Content-Type': self.mime_type
        }
        return headers, size, skip_blobs