lbry-sdk/lbrynet/stream/managed_stream.py

370 lines
16 KiB
Python
Raw Normal View History

2019-01-22 18:54:17 +01:00
import os
import asyncio
import typing
import logging
2019-01-31 20:32:08 +01:00
import binascii
from lbrynet.utils import generate_id
2019-03-24 21:55:04 +01:00
from lbrynet.schema.mime_types import guess_media_type
2019-01-22 18:54:17 +01:00
from lbrynet.stream.downloader import StreamDownloader
2019-01-23 23:20:16 +01:00
from lbrynet.stream.descriptor import StreamDescriptor
2019-01-25 21:05:22 +01:00
from lbrynet.stream.reflector.client import StreamReflectorClient
2019-01-31 20:32:08 +01:00
from lbrynet.extras.daemon.storage import StoredStreamClaim
2019-01-22 18:54:17 +01:00
if typing.TYPE_CHECKING:
from lbrynet.conf import Config
2019-03-20 07:00:03 +01:00
from lbrynet.schema.claim import Claim
2019-03-28 19:51:55 +01:00
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.blob.blob_info import BlobInfo
2019-02-01 21:46:31 +01:00
from lbrynet.dht.node import Node
2019-03-31 19:42:27 +02:00
from lbrynet.extras.daemon.analytics import AnalyticsManager
2019-04-05 05:10:18 +02:00
from lbrynet.wallet.transaction import Transaction
2019-01-22 18:54:17 +01:00
log = logging.getLogger(__name__)
def _get_next_available_file_name(download_directory: str, file_name: str) -> str:
base_name, ext = os.path.splitext(os.path.basename(file_name))
i = 0
while os.path.isfile(os.path.join(download_directory, file_name)):
i += 1
file_name = "%s_%i%s" % (base_name, i, ext)
return file_name
async def get_next_available_file_name(loop: asyncio.BaseEventLoop, download_directory: str, file_name: str) -> str:
return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name)
2019-01-22 18:54:17 +01:00
class ManagedStream:
STATUS_RUNNING = "running"
STATUS_STOPPED = "stopped"
STATUS_FINISHED = "finished"
def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
sd_hash: str, download_directory: typing.Optional[str] = None, file_name: typing.Optional[str] = None,
2019-03-11 02:55:33 +01:00
status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None,
download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
2019-03-31 19:42:27 +02:00
descriptor: typing.Optional[StreamDescriptor] = None,
2019-04-05 05:10:18 +02:00
content_fee: typing.Optional['Transaction'] = None,
2019-03-31 19:42:27 +02:00
analytics_manager: typing.Optional['AnalyticsManager'] = None):
2019-01-22 18:54:17 +01:00
self.loop = loop
self.config = config
2019-01-22 18:54:17 +01:00
self.blob_manager = blob_manager
self.sd_hash = sd_hash
2019-01-22 18:54:17 +01:00
self.download_directory = download_directory
self._file_name = file_name
2019-01-22 18:54:17 +01:00
self._status = status
self.stream_claim_info = claim
2019-03-11 02:55:33 +01:00
self.download_id = download_id or binascii.hexlify(generate_id()).decode()
self.rowid = rowid
self.written_bytes = 0
2019-04-05 05:10:18 +02:00
self.content_fee = content_fee
self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
2019-03-31 19:42:27 +02:00
self.analytics_manager = analytics_manager
self.fully_reflected = asyncio.Event(loop=self.loop)
self.file_output_task: typing.Optional[asyncio.Task] = None
self.delayed_stop: typing.Optional[asyncio.Handle] = None
self.saving = asyncio.Event(loop=self.loop)
self.finished_writing = asyncio.Event(loop=self.loop)
2019-03-31 19:42:27 +02:00
self.started_writing = asyncio.Event(loop=self.loop)
@property
def descriptor(self) -> StreamDescriptor:
return self.downloader.descriptor
@property
def stream_hash(self) -> str:
return self.descriptor.stream_hash
2019-01-22 18:54:17 +01:00
@property
def file_name(self) -> typing.Optional[str]:
return self._file_name or (self.descriptor.suggested_file_name if self.descriptor else None)
2019-01-22 18:54:17 +01:00
@property
def status(self) -> str:
return self._status
def update_status(self, status: str):
assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
self._status = status
@property
def finished(self) -> bool:
return self.status == self.STATUS_FINISHED
@property
def running(self) -> bool:
return self.status == self.STATUS_RUNNING
@property
def claim_id(self) -> typing.Optional[str]:
return None if not self.stream_claim_info else self.stream_claim_info.claim_id
@property
def txid(self) -> typing.Optional[str]:
return None if not self.stream_claim_info else self.stream_claim_info.txid
@property
def nout(self) -> typing.Optional[int]:
return None if not self.stream_claim_info else self.stream_claim_info.nout
@property
def outpoint(self) -> typing.Optional[str]:
return None if not self.stream_claim_info else self.stream_claim_info.outpoint
@property
def claim_height(self) -> typing.Optional[int]:
return None if not self.stream_claim_info else self.stream_claim_info.height
@property
def channel_claim_id(self) -> typing.Optional[str]:
return None if not self.stream_claim_info else self.stream_claim_info.channel_claim_id
@property
def channel_name(self) -> typing.Optional[str]:
return None if not self.stream_claim_info else self.stream_claim_info.channel_name
@property
def claim_name(self) -> typing.Optional[str]:
return None if not self.stream_claim_info else self.stream_claim_info.claim_name
@property
2019-04-05 05:10:18 +02:00
def metadata(self) -> typing.Optional[typing.Dict]:
return None if not self.stream_claim_info else self.stream_claim_info.claim.stream.to_dict()
2019-01-22 18:54:17 +01:00
@property
def metadata_protobuf(self) -> bytes:
if self.stream_claim_info:
return binascii.hexlify(self.stream_claim_info.claim.to_bytes())
2019-01-22 18:54:17 +01:00
@property
def blobs_completed(self) -> int:
return sum([1 if self.blob_manager.is_blob_verified(b.blob_hash) else 0
2019-01-22 18:54:17 +01:00
for b in self.descriptor.blobs[:-1]])
@property
def blobs_in_stream(self) -> int:
return len(self.descriptor.blobs) - 1
@property
def blobs_remaining(self) -> int:
return self.blobs_in_stream - self.blobs_completed
@property
def full_path(self) -> typing.Optional[str]:
return os.path.join(self.download_directory, os.path.basename(self.file_name)) \
if self.file_name and self.download_directory else None
@property
def output_file_exists(self):
return os.path.isfile(self.full_path) if self.full_path else False
@property
def mime_type(self):
2019-04-05 05:10:18 +02:00
return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0]
2019-01-22 18:54:17 +01:00
def as_dict(self) -> typing.Dict:
if self.written_bytes:
written_bytes = self.written_bytes
2019-04-05 05:10:18 +02:00
elif self.output_file_exists:
written_bytes = os.stat(self.full_path).st_size
2019-01-25 15:50:01 +01:00
else:
written_bytes = None
2019-01-22 18:54:17 +01:00
return {
'completed': self.finished,
'file_name': self.file_name,
'download_directory': self.download_directory,
'points_paid': 0.0,
'stopped': not self.running,
'stream_hash': self.stream_hash,
'stream_name': self.descriptor.stream_name,
'suggested_file_name': self.descriptor.suggested_file_name,
'sd_hash': self.descriptor.sd_hash,
2019-04-05 05:10:18 +02:00
'download_path': self.full_path,
'mime_type': self.mime_type,
2019-01-22 18:54:17 +01:00
'key': self.descriptor.key,
'total_bytes_lower_bound': self.descriptor.lower_bound_decrypted_length(),
'total_bytes': self.descriptor.upper_bound_decrypted_length(),
2019-01-25 15:50:01 +01:00
'written_bytes': written_bytes,
2019-01-22 18:54:17 +01:00
'blobs_completed': self.blobs_completed,
'blobs_in_stream': self.blobs_in_stream,
'blobs_remaining': self.blobs_remaining,
2019-01-22 18:54:17 +01:00
'status': self.status,
'claim_id': self.claim_id,
'txid': self.txid,
'nout': self.nout,
'outpoint': self.outpoint,
'metadata': self.metadata,
'protobuf': self.metadata_protobuf,
2019-01-22 18:54:17 +01:00
'channel_claim_id': self.channel_claim_id,
'channel_name': self.channel_name,
2019-04-05 05:10:18 +02:00
'claim_name': self.claim_name,
'content_fee': self.content_fee # TODO: this isn't in the database
2019-01-22 18:54:17 +01:00
}
@classmethod
async def create(cls, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
2019-01-25 21:05:22 +01:00
file_path: str, key: typing.Optional[bytes] = None,
iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedStream':
2019-01-22 18:54:17 +01:00
descriptor = await StreamDescriptor.create_stream(
loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
blob_completed_callback=blob_manager.blob_completed
2019-01-22 18:54:17 +01:00
)
await blob_manager.storage.store_stream(
blob_manager.get_blob(descriptor.sd_hash), descriptor
)
2019-02-15 22:44:31 +01:00
row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path),
os.path.dirname(file_path), 0)
return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor)
2019-03-31 19:42:27 +02:00
async def setup(self, node: typing.Optional['Node'] = None, save_file: typing.Optional[bool] = True,
file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None):
await self.downloader.start(node)
2019-03-31 19:42:27 +02:00
if not save_file and not file_name:
if not await self.blob_manager.storage.file_exists(self.sd_hash):
2019-04-17 21:21:58 +02:00
self.rowid = await self.blob_manager.storage.save_downloaded_file(
self.stream_hash, None, None, 0.0
)
self.download_directory = None
self._file_name = None
2019-04-05 05:10:18 +02:00
self.update_status(ManagedStream.STATUS_RUNNING)
await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING)
self.update_delayed_stop()
2019-05-01 00:51:19 +02:00
else:
2019-03-31 19:42:27 +02:00
await self.save_file(file_name, download_directory)
await self.started_writing.wait()
def update_delayed_stop(self):
def _delayed_stop():
log.info("Stopping inactive download for stream %s", self.sd_hash)
self.stop_download()
if self.delayed_stop:
self.delayed_stop.cancel()
self.delayed_stop = self.loop.call_later(60, _delayed_stop)
async def aiter_read_stream(self, start_blob_num: typing.Optional[int] = 0) -> typing.AsyncIterator[
typing.Tuple['BlobInfo', bytes]]:
if start_blob_num >= len(self.descriptor.blobs[:-1]):
raise IndexError(start_blob_num)
for i, blob_info in enumerate(self.descriptor.blobs[start_blob_num:-1]):
assert i + start_blob_num == blob_info.blob_num
if self.delayed_stop:
self.delayed_stop.cancel()
try:
decrypted = await self.downloader.read_blob(blob_info)
yield (blob_info, decrypted)
except asyncio.CancelledError:
if not self.saving.is_set() and not self.finished_writing.is_set():
self.update_delayed_stop()
raise
async def _save_file(self, output_path: str):
2019-04-05 05:10:18 +02:00
log.debug("save file %s -> %s", self.sd_hash, output_path)
self.saving.set()
self.finished_writing.clear()
2019-03-31 19:42:27 +02:00
self.started_writing.clear()
try:
with open(output_path, 'wb') as file_write_handle:
async for blob_info, decrypted in self.aiter_read_stream():
log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
file_write_handle.write(decrypted)
file_write_handle.flush()
self.written_bytes += len(decrypted)
2019-03-31 19:42:27 +02:00
if not self.started_writing.is_set():
self.started_writing.set()
self.update_status(ManagedStream.STATUS_FINISHED)
await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_FINISHED)
if self.analytics_manager:
self.loop.create_task(self.analytics_manager.send_download_finished(
self.download_id, self.claim_name, self.sd_hash
))
self.finished_writing.set()
except Exception as err:
if os.path.isfile(output_path):
2019-03-31 19:42:27 +02:00
log.info("removing incomplete download %s for %s", output_path, self.sd_hash)
os.remove(output_path)
2019-03-31 19:42:27 +02:00
if not isinstance(err, asyncio.CancelledError):
log.exception("unexpected error encountered writing file for stream %s", self.sd_hash)
raise err
finally:
self.saving.clear()
async def save_file(self, file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None):
if self.file_output_task and not self.file_output_task.done():
self.file_output_task.cancel()
if self.delayed_stop:
self.delayed_stop.cancel()
self.delayed_stop = None
2019-03-31 19:42:27 +02:00
self.download_directory = download_directory or self.download_directory or self.config.download_dir
if not self.download_directory:
raise ValueError("no directory to download to")
if not (file_name or self._file_name or self.descriptor.suggested_file_name):
raise ValueError("no file name to download to")
if not os.path.isdir(self.download_directory):
log.warning("download directory '%s' does not exist, attempting to make it", self.download_directory)
os.mkdir(self.download_directory)
if not await self.blob_manager.storage.file_exists(self.sd_hash):
2019-05-01 18:16:45 +02:00
self._file_name = await get_next_available_file_name(
self.loop, self.download_directory,
file_name or self._file_name or self.descriptor.suggested_file_name
)
self.rowid = self.blob_manager.storage.save_downloaded_file(
self.stream_hash, self.file_name, self.download_directory, 0.0
)
else:
await self.blob_manager.storage.change_file_download_dir_and_file_name(
self.stream_hash, self.download_directory, self.file_name
)
2019-04-05 05:10:18 +02:00
self.update_status(ManagedStream.STATUS_RUNNING)
await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING)
self.written_bytes = 0
self.file_output_task = self.loop.create_task(self._save_file(self.full_path))
2019-02-01 21:46:31 +01:00
def stop_download(self):
if self.file_output_task and not self.file_output_task.done():
self.file_output_task.cancel()
self.file_output_task = None
self.downloader.stop()
2019-01-25 21:05:22 +01:00
async def upload_to_reflector(self, host: str, port: int) -> typing.List[str]:
sent = []
protocol = StreamReflectorClient(self.blob_manager, self.descriptor)
try:
await self.loop.create_connection(lambda: protocol, host, port)
await protocol.send_handshake()
sent_sd, needed = await protocol.send_descriptor()
if sent_sd:
sent.append(self.sd_hash)
if not sent_sd and not needed:
if not self.fully_reflected.is_set():
self.fully_reflected.set()
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
return []
we_have = [
blob_hash for blob_hash in needed if blob_hash in self.blob_manager.completed_blob_hashes
]
for blob_hash in we_have:
2019-02-04 17:34:18 +01:00
await protocol.send_blob(blob_hash)
sent.append(blob_hash)
2019-02-22 03:07:45 +01:00
except (asyncio.TimeoutError, ValueError):
2019-02-04 17:34:18 +01:00
return sent
except ConnectionRefusedError:
return sent
finally:
2019-01-25 21:05:22 +01:00
if protocol.transport:
protocol.transport.close()
if not self.fully_reflected.is_set():
self.fully_reflected.set()
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
2019-01-25 21:05:22 +01:00
return sent
2019-03-20 07:00:03 +01:00
def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
self.stream_claim_info = StoredStreamClaim(
self.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
2019-01-31 20:32:08 +01:00
claim_info['name'], claim_info['amount'], claim_info['height'],
2019-03-20 06:46:23 +01:00
binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
2019-01-31 20:32:08 +01:00
claim_info['claim_sequence'], claim_info.get('channel_name')
)