441d3c1220
Adds a daemon for lbrynet. Currently commands are limited the following: download_name(name) resolve_name(name) get_downloads stop This allows other programs to easily interact with lbrynet, such as LBRYURIHandler. LBRYURIHandler can be built with py2app, the resulting plist file must be edited in the same way the committed plist file has been edited. When built and installed to the /Applications folder lbry:// domain names will download and open the corresponding file so long as the daemon is running.
561 lines
No EOL
26 KiB
Python
561 lines
No EOL
26 KiB
Python
import binascii
|
|
import logging
|
|
import tkMessageBox
|
|
from Crypto import Random
|
|
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
|
|
from lbrynet.core import StreamDescriptor
|
|
from lbrynet.core.Error import UnknownNameError, UnknownStreamTypeError, InvalidStreamDescriptorError
|
|
from lbrynet.core.Error import InvalidStreamInfoError
|
|
from lbrynet.core.LBRYcrdWallet import LBRYcrdWallet
|
|
from lbrynet.core.PaymentRateManager import PaymentRateManager
|
|
from lbrynet.core.Session import LBRYSession
|
|
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
|
|
from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory
|
|
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
|
|
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
|
|
from lbrynet.lbryfile.LBRYFileMetadataManager import TempLBRYFileMetadataManager
|
|
from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType
|
|
from lbrynet.lbryfile.client.LBRYFileDownloader import LBRYFileSaverFactory, LBRYFileOpenerFactory
|
|
from lbrynet.lbryfile.client.LBRYFileOptions import add_lbry_file_to_sd_identifier
|
|
import os
|
|
import requests
|
|
import shutil
|
|
from twisted.internet import threads, defer, task
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class LBRYDownloader(object):
|
|
def __init__(self):
|
|
self.session = None
|
|
self.known_dht_nodes = [('104.236.42.182', 4000)]
|
|
self.db_dir = os.path.join(os.path.expanduser("~"), ".lbrydownloader")
|
|
self.blobfile_dir = os.path.join(self.db_dir, "blobfiles")
|
|
self.peer_port = 3333
|
|
self.dht_node_port = 4444
|
|
self.run_server = True
|
|
self.first_run = False
|
|
self.current_db_revision = 1
|
|
if os.name == "nt":
|
|
from lbrynet.winhelpers.knownpaths import get_path, FOLDERID, UserHandle
|
|
self.download_directory = get_path(FOLDERID.Downloads, UserHandle.current)
|
|
self.wallet_dir = os.path.join(get_path(FOLDERID.RoamingAppData, UserHandle.current), "lbrycrd")
|
|
else:
|
|
self.download_directory = os.getcwd()
|
|
self.wallet_dir = os.path.join(os.path.expanduser("~"), ".lbrycrd")
|
|
self.wallet_conf = os.path.join(self.wallet_dir, "lbrycrd.conf")
|
|
self.wallet_user = None
|
|
self.wallet_password = None
|
|
self.sd_identifier = StreamDescriptorIdentifier()
|
|
self.wallet_rpc_port = 8332
|
|
self.download_deferreds = []
|
|
self.stream_frames = []
|
|
self.default_blob_data_payment_rate = MIN_BLOB_DATA_PAYMENT_RATE
|
|
self.use_upnp = True
|
|
self.start_lbrycrdd = True
|
|
if os.name == "nt":
|
|
self.lbrycrdd_path = "lbrycrdd.exe"
|
|
else:
|
|
self.lbrycrdd_path = None
|
|
self.default_lbrycrdd_path = "./lbrycrdd"
|
|
self.delete_blobs_on_remove = True
|
|
self.blob_request_payment_rate_manager = None
|
|
|
|
def start(self):
|
|
d = self._load_conf_options()
|
|
d.addCallback(lambda _: threads.deferToThread(self._create_directory))
|
|
d.addCallback(lambda _: self._check_db_migration())
|
|
d.addCallback(lambda _: self._get_session())
|
|
d.addCallback(lambda _: self._setup_stream_info_manager())
|
|
d.addCallback(lambda _: self._setup_stream_identifier())
|
|
d.addCallback(lambda _: self.start_server())
|
|
return d
|
|
|
|
def stop(self):
|
|
dl = defer.DeferredList(self.download_deferreds)
|
|
for stream_frame in self.stream_frames:
|
|
stream_frame.cancel_func()
|
|
if self.session is not None:
|
|
dl.addBoth(lambda _: self.stop_server())
|
|
dl.addBoth(lambda _: self.session.shut_down())
|
|
return dl
|
|
|
|
def get_new_address(self):
|
|
return self.session.wallet.get_new_address()
|
|
|
|
def _check_db_migration(self):
|
|
old_revision = 0
|
|
db_revision_file = os.path.join(self.db_dir, "db_revision")
|
|
if os.path.exists(db_revision_file):
|
|
old_revision = int(open(db_revision_file).read().strip())
|
|
if old_revision < self.current_db_revision:
|
|
if os.name == "nt":
|
|
import subprocess
|
|
import sys
|
|
|
|
def run_migrator():
|
|
migrator_exe = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])),
|
|
"dbmigrator", "migrator.exe")
|
|
print "trying to find the migrator at", migrator_exe
|
|
si = subprocess.STARTUPINFO
|
|
si.dwFlags = subprocess.STARTF_USESHOWWINDOW
|
|
si.wShowWindow = subprocess.SW_HIDE
|
|
print "trying to run the migrator"
|
|
migrator_proc = subprocess.Popen([migrator_exe, self.db_dir, str(old_revision),
|
|
str(self.current_db_revision)], startupinfo=si)
|
|
print "started the migrator"
|
|
migrator_proc.wait()
|
|
print "migrator has returned"
|
|
|
|
return threads.deferToThread(run_migrator)
|
|
else:
|
|
from lbrynet.db_migrator import dbmigrator
|
|
return threads.deferToThread(dbmigrator.migrate_db, self.db_dir, old_revision,
|
|
self.current_db_revision)
|
|
return defer.succeed(True)
|
|
|
|
def _load_conf_options(self):
|
|
|
|
def get_lbrycrdd_path_conf_file():
|
|
if os.name == "nt":
|
|
return ""
|
|
lbrycrdd_path_conf_path = os.path.join(os.path.expanduser("~"), ".lbrycrddpath.conf")
|
|
if not os.path.exists(lbrycrdd_path_conf_path):
|
|
return ""
|
|
lbrycrdd_path_conf = open(lbrycrdd_path_conf_path)
|
|
lines = lbrycrdd_path_conf.readlines()
|
|
return lines
|
|
|
|
d = threads.deferToThread(get_lbrycrdd_path_conf_file)
|
|
|
|
def load_lbrycrdd_path(conf):
|
|
for line in conf:
|
|
if len(line.strip()) and line.strip()[0] != "#":
|
|
self.lbrycrdd_path = line.strip()
|
|
|
|
d.addCallback(load_lbrycrdd_path)
|
|
|
|
def get_configuration_file():
|
|
if os.name == "nt":
|
|
lbry_conf_path = "lbry.conf"
|
|
if not os.path.exists(lbry_conf_path):
|
|
log.debug("Could not read lbry.conf")
|
|
return ""
|
|
else:
|
|
lbry_conf_path = os.path.join(os.path.expanduser("~"), ".lbrynetgui.conf")
|
|
if not os.path.exists(lbry_conf_path):
|
|
clean_conf_path = os.path.join(os.path.dirname(__file__), "lbry.conf")
|
|
shutil.copy(clean_conf_path, lbry_conf_path)
|
|
lbry_conf = open(lbry_conf_path)
|
|
log.debug("Loading configuration options from %s", lbry_conf_path)
|
|
lines = lbry_conf.readlines()
|
|
log.debug("%s file contents:\n%s", lbry_conf_path, str(lines))
|
|
return lines
|
|
|
|
d.addCallback(lambda _: threads.deferToThread(get_configuration_file))
|
|
|
|
def load_configuration_file(conf):
|
|
for line in conf:
|
|
if len(line.strip()) and line.strip()[0] != "#":
|
|
try:
|
|
field_name, field_value = map(lambda x: x.strip(), line.strip().split("=", 1))
|
|
field_name = field_name.lower()
|
|
except ValueError:
|
|
raise ValueError("Invalid configuration line: %s" % line)
|
|
if field_name == "known_dht_nodes":
|
|
known_nodes = []
|
|
nodes = field_value.split(",")
|
|
for n in nodes:
|
|
if n.strip():
|
|
try:
|
|
ip_address, port_string = map(lambda x: x.strip(), n.split(":"))
|
|
ip_numbers = ip_address.split(".")
|
|
assert len(ip_numbers) == 4
|
|
for ip_num in ip_numbers:
|
|
num = int(ip_num)
|
|
assert 0 <= num <= 255
|
|
known_nodes.append((ip_address, int(port_string)))
|
|
except (ValueError, AssertionError):
|
|
raise ValueError("Expected known nodes in format 192.168.1.1:4000,192.168.1.2:4001. Got %s" % str(field_value))
|
|
log.debug("Setting known_dht_nodes to %s", str(known_nodes))
|
|
self.known_dht_nodes = known_nodes
|
|
elif field_name == "run_server":
|
|
if field_value.lower() == "true":
|
|
run_server = True
|
|
elif field_value.lower() == "false":
|
|
run_server = False
|
|
else:
|
|
raise ValueError("run_server must be set to True or False. Got %s" % field_value)
|
|
log.debug("Setting run_server to %s", str(run_server))
|
|
self.run_server = run_server
|
|
elif field_name == "data_dir":
|
|
log.debug("Setting data_dir to %s", str(field_value))
|
|
self.db_dir = field_value
|
|
self.blobfile_dir = os.path.join(self.db_dir, "blobfiles")
|
|
elif field_name == "wallet_dir":
|
|
log.debug("Setting wallet_dir to %s", str(field_value))
|
|
self.wallet_dir = field_value
|
|
elif field_name == "wallet_conf":
|
|
log.debug("Setting wallet_conf to %s", str(field_value))
|
|
self.wallet_conf = field_value
|
|
elif field_name == "peer_port":
|
|
try:
|
|
peer_port = int(field_value)
|
|
assert 0 <= peer_port <= 65535
|
|
log.debug("Setting peer_port to %s", str(peer_port))
|
|
self.peer_port = peer_port
|
|
except (ValueError, AssertionError):
|
|
raise ValueError("peer_port must be set to an integer between 1 and 65535. Got %s" % field_value)
|
|
elif field_name == "dht_port":
|
|
try:
|
|
dht_port = int(field_value)
|
|
assert 0 <= dht_port <= 65535
|
|
log.debug("Setting dht_node_port to %s", str(dht_port))
|
|
self.dht_node_port = dht_port
|
|
except (ValueError, AssertionError):
|
|
raise ValueError("dht_port must be set to an integer between 1 and 65535. Got %s" % field_value)
|
|
elif field_name == "use_upnp":
|
|
if field_value.lower() == "true":
|
|
use_upnp = True
|
|
elif field_value.lower() == "false":
|
|
use_upnp = False
|
|
else:
|
|
raise ValueError("use_upnp must be set to True or False. Got %s" % str(field_value))
|
|
log.debug("Setting use_upnp to %s", str(use_upnp))
|
|
self.use_upnp = use_upnp
|
|
elif field_name == "default_blob_data_payment_rate":
|
|
try:
|
|
rate = float(field_value)
|
|
assert rate >= 0.0
|
|
log.debug("Setting default_blob_data_payment_rate to %s", str(rate))
|
|
self.default_blob_data_payment_rate = rate
|
|
except (ValueError, AssertionError):
|
|
raise ValueError("default_blob_data_payment_rate must be a positive floating point number, e.g. 0.5. Got %s" % str(field_value))
|
|
elif field_name == "start_lbrycrdd":
|
|
if field_value.lower() == "true":
|
|
start_lbrycrdd = True
|
|
elif field_value.lower() == "false":
|
|
start_lbrycrdd = False
|
|
else:
|
|
raise ValueError("start_lbrycrdd must be set to True or False. Got %s" % field_value)
|
|
log.debug("Setting start_lbrycrdd to %s", str(start_lbrycrdd))
|
|
self.start_lbrycrdd = start_lbrycrdd
|
|
elif field_name == "lbrycrdd_path":
|
|
self.lbrycrdd_path = field_value
|
|
elif field_name == "download_directory":
|
|
log.debug("Setting download_directory to %s", str(field_value))
|
|
self.download_directory = field_value
|
|
elif field_name == "delete_blobs_on_stream_remove":
|
|
if field_value.lower() == "true":
|
|
self.delete_blobs_on_remove = True
|
|
elif field_value.lower() == "false":
|
|
self.delete_blobs_on_remove = False
|
|
else:
|
|
raise ValueError("delete_blobs_on_stream_remove must be set to True or False")
|
|
else:
|
|
log.warning("Got unknown configuration field: %s", field_name)
|
|
|
|
d.addCallback(load_configuration_file)
|
|
return d
|
|
|
|
def _create_directory(self):
|
|
if not os.path.exists(self.db_dir):
|
|
os.makedirs(self.db_dir)
|
|
db_revision = open(os.path.join(self.db_dir, "db_revision"), mode='w')
|
|
db_revision.write(str(self.current_db_revision))
|
|
db_revision.close()
|
|
log.debug("Created the configuration directory: %s", str(self.db_dir))
|
|
if not os.path.exists(self.blobfile_dir):
|
|
os.makedirs(self.blobfile_dir)
|
|
log.debug("Created the data directory: %s", str(self.blobfile_dir))
|
|
if os.name == "nt":
|
|
if not os.path.exists(self.wallet_dir):
|
|
os.makedirs(self.wallet_dir)
|
|
if not os.path.exists(self.wallet_conf):
|
|
lbrycrd_conf = open(self.wallet_conf, mode='w')
|
|
self.wallet_user = "rpcuser"
|
|
lbrycrd_conf.write("rpcuser=%s\n" % self.wallet_user)
|
|
self.wallet_password = binascii.hexlify(Random.new().read(20))
|
|
lbrycrd_conf.write("rpcpassword=%s\n" % self.wallet_password)
|
|
lbrycrd_conf.write("server=1\n")
|
|
lbrycrd_conf.close()
|
|
else:
|
|
lbrycrd_conf = open(self.wallet_conf)
|
|
for l in lbrycrd_conf:
|
|
if l.startswith("rpcuser="):
|
|
self.wallet_user = l[8:].rstrip('\n')
|
|
if l.startswith("rpcpassword="):
|
|
self.wallet_password = l[12:].rstrip('\n')
|
|
if l.startswith("rpcport="):
|
|
self.wallet_rpc_port = int(l[8:-1].rstrip('\n'))
|
|
|
|
def _get_session(self):
|
|
lbrycrdd_path = None
|
|
if self.start_lbrycrdd is True:
|
|
lbrycrdd_path = self.lbrycrdd_path
|
|
if not lbrycrdd_path:
|
|
lbrycrdd_path = self.default_lbrycrdd_path
|
|
|
|
wallet = LBRYcrdWallet(self.db_dir, wallet_dir=self.wallet_dir, wallet_conf=self.wallet_conf,
|
|
lbrycrdd_path=lbrycrdd_path)
|
|
|
|
peer_port = None
|
|
if self.run_server:
|
|
peer_port = self.peer_port
|
|
self.session = LBRYSession(self.default_blob_data_payment_rate, db_dir=self.db_dir,
|
|
blob_dir=self.blobfile_dir, use_upnp=self.use_upnp, wallet=wallet,
|
|
known_dht_nodes=self.known_dht_nodes, dht_node_port=self.dht_node_port,
|
|
peer_port=peer_port)
|
|
return self.session.setup()
|
|
|
|
def _setup_stream_info_manager(self):
|
|
self.stream_info_manager = TempLBRYFileMetadataManager()
|
|
return defer.succeed(True)
|
|
|
|
def start_server(self):
|
|
|
|
if self.run_server:
|
|
self.blob_request_payment_rate_manager = PaymentRateManager(
|
|
self.session.base_payment_rate_manager,
|
|
self.default_blob_data_payment_rate
|
|
)
|
|
handlers = [
|
|
BlobAvailabilityHandlerFactory(self.session.blob_manager),
|
|
self.session.wallet.get_wallet_info_query_handler_factory(),
|
|
BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet,
|
|
self.blob_request_payment_rate_manager)
|
|
]
|
|
|
|
server_factory = ServerProtocolFactory(self.session.rate_limiter,
|
|
handlers,
|
|
self.session.peer_manager)
|
|
from twisted.internet import reactor
|
|
self.lbry_server_port = reactor.listenTCP(self.peer_port, server_factory)
|
|
|
|
return defer.succeed(True)
|
|
|
|
def stop_server(self):
|
|
if self.lbry_server_port is not None:
|
|
self.lbry_server_port, p = None, self.lbry_server_port
|
|
return defer.maybeDeferred(p.stopListening)
|
|
else:
|
|
return defer.succeed(True)
|
|
|
|
def _setup_stream_identifier(self):
|
|
add_lbry_file_to_sd_identifier(self.sd_identifier)
|
|
file_saver_factory = LBRYFileSaverFactory(self.session.peer_finder, self.session.rate_limiter,
|
|
self.session.blob_manager, self.stream_info_manager,
|
|
self.session.wallet, self.download_directory)
|
|
self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, file_saver_factory)
|
|
file_opener_factory = LBRYFileOpenerFactory(self.session.peer_finder, self.session.rate_limiter,
|
|
self.session.blob_manager, self.stream_info_manager,
|
|
self.session.wallet)
|
|
self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, file_opener_factory)
|
|
|
|
def check_first_run(self):
|
|
d = self.session.wallet.check_first_run()
|
|
d.addCallback(lambda is_first_run: self._do_first_run() if is_first_run else 0.0)
|
|
return d
|
|
|
|
def _do_first_run(self):
|
|
d = self.session.wallet.get_new_address()
|
|
|
|
def send_request(url, data):
|
|
r = requests.post(url, json=data)
|
|
if r.status_code == 200:
|
|
return r.json()['credits_sent']
|
|
return 0.0
|
|
|
|
def log_error(err):
|
|
log.warning("unable to request free credits. %s", err.getErrorMessage())
|
|
return 0.0
|
|
|
|
def request_credits(address):
|
|
url = "http://credreq.lbry.io/requestcredits"
|
|
data = {"address": address}
|
|
d = threads.deferToThread(send_request, url, data)
|
|
d.addErrback(log_error)
|
|
return d
|
|
|
|
d.addCallback(request_credits)
|
|
return d
|
|
|
|
def _resolve_name(self, uri):
|
|
return self.session.wallet.get_stream_info_for_name(uri)
|
|
|
|
def download_stream(self, stream_frame, uri):
|
|
resolve_d = self._resolve_name(uri)
|
|
|
|
stream_frame.show_metadata_status("resolving name...")
|
|
|
|
stream_frame.cancel_func = resolve_d.cancel
|
|
payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager)
|
|
|
|
def update_stream_name(value):
|
|
if 'name' in value:
|
|
stream_frame.show_name(value['name'])
|
|
if 'description' in value:
|
|
stream_frame.show_description(value['description'])
|
|
return value
|
|
|
|
def get_sd_hash(value):
|
|
if 'stream_hash' in value:
|
|
return value['stream_hash']
|
|
raise UnknownNameError(uri)
|
|
|
|
def get_sd_blob(sd_hash):
|
|
stream_frame.show_metadata_status("name resolved, fetching metadata...")
|
|
get_sd_d = StreamDescriptor.download_sd_blob(self.session, sd_hash,
|
|
payment_rate_manager)
|
|
get_sd_d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
|
|
get_sd_d.addCallbacks(choose_download_factory, bad_sd_blob)
|
|
return get_sd_d
|
|
|
|
def get_info_from_validator(info_validator):
|
|
stream_name = None
|
|
stream_size = None
|
|
for field, val in info_validator.info_to_show():
|
|
if field == "suggested_file_name":
|
|
stream_name = val
|
|
elif field == "stream_name" and stream_name is None:
|
|
stream_name = val
|
|
elif field == "stream_size":
|
|
stream_size = int(val)
|
|
if stream_size is None:
|
|
stream_size = "unknown"
|
|
if stream_name is None:
|
|
stream_name = "unknown"
|
|
return stream_name, stream_size
|
|
|
|
def choose_download_factory(metadata):
|
|
#info_validator, options, factories = info_and_factories
|
|
stream_name, stream_size = get_info_from_validator(metadata.validator)
|
|
if isinstance(stream_size, (int, long)):
|
|
price = payment_rate_manager.get_effective_min_blob_data_payment_rate()
|
|
estimated_cost = stream_size * 1.0 / 2**20 * price
|
|
else:
|
|
estimated_cost = "unknown"
|
|
|
|
stream_frame.show_stream_metadata(stream_name, stream_size)
|
|
|
|
available_options = metadata.options.get_downloader_options(metadata.validator,
|
|
payment_rate_manager)
|
|
|
|
stream_frame.show_download_options(available_options)
|
|
|
|
get_downloader_d = defer.Deferred()
|
|
|
|
def create_downloader(f, chosen_options):
|
|
|
|
def fire_get_downloader_d(downloader):
|
|
if not get_downloader_d.called:
|
|
get_downloader_d.callback(downloader)
|
|
|
|
stream_frame.disable_download_buttons()
|
|
d = f.make_downloader(metadata, chosen_options,
|
|
payment_rate_manager)
|
|
d.addCallback(fire_get_downloader_d)
|
|
|
|
for factory in metadata.factories:
|
|
|
|
def choose_factory(f=factory):
|
|
chosen_options = stream_frame.get_chosen_options()
|
|
create_downloader(f, chosen_options)
|
|
|
|
stream_frame.add_download_factory(factory, choose_factory)
|
|
|
|
get_downloader_d.addCallback(start_download)
|
|
|
|
return get_downloader_d
|
|
|
|
def show_stream_status(downloader):
|
|
total_bytes = downloader.get_total_bytes_cached()
|
|
bytes_left_to_download = downloader.get_bytes_left_to_download()
|
|
points_paid = payment_rate_manager.points_paid
|
|
payment_rate = payment_rate_manager.get_effective_min_blob_data_payment_rate()
|
|
points_remaining = 1.0 * bytes_left_to_download * payment_rate / 2**20
|
|
stream_frame.show_progress(total_bytes, bytes_left_to_download,
|
|
points_paid, points_remaining)
|
|
|
|
def show_finished(arg, downloader):
|
|
show_stream_status(downloader)
|
|
stream_frame.show_download_done(payment_rate_manager.points_paid)
|
|
return arg
|
|
|
|
def start_download(downloader):
|
|
stream_frame.stream_hash = downloader.stream_hash
|
|
l = task.LoopingCall(show_stream_status, downloader)
|
|
l.start(1)
|
|
d = downloader.start()
|
|
stream_frame.cancel_func = downloader.stop
|
|
|
|
def stop_looping_call(arg):
|
|
l.stop()
|
|
stream_frame.cancel_func = resolve_d.cancel
|
|
return arg
|
|
|
|
d.addBoth(stop_looping_call)
|
|
d.addCallback(show_finished, downloader)
|
|
return d
|
|
|
|
def lookup_failed(err):
|
|
stream_frame.show_metadata_status("name lookup failed")
|
|
return err
|
|
|
|
def bad_sd_blob(err):
|
|
stream_frame.show_metadata_status("Unknown type or badly formed metadata")
|
|
return err
|
|
|
|
resolve_d.addCallback(update_stream_name)
|
|
resolve_d.addCallback(get_sd_hash)
|
|
resolve_d.addCallbacks(get_sd_blob, lookup_failed)
|
|
|
|
def show_err(err):
|
|
tkMessageBox.showerror(title="Download Error", message=err.getErrorMessage())
|
|
log.error(err.getErrorMessage())
|
|
stream_frame.show_download_done(payment_rate_manager.points_paid)
|
|
|
|
resolve_d.addErrback(lambda err: err.trap(defer.CancelledError, UnknownNameError,
|
|
UnknownStreamTypeError, InvalidStreamDescriptorError,
|
|
InvalidStreamInfoError))
|
|
resolve_d.addErrback(show_err)
|
|
|
|
def delete_associated_blobs():
|
|
if stream_frame.stream_hash is None or self.delete_blobs_on_remove is False:
|
|
return defer.succeed(True)
|
|
d1 = self.stream_info_manager.get_blobs_for_stream(stream_frame.stream_hash)
|
|
|
|
def get_blob_hashes(blob_infos):
|
|
return [b[0] for b in blob_infos if b[0] is not None]
|
|
|
|
d1.addCallback(get_blob_hashes)
|
|
d2 = self.stream_info_manager.get_sd_blob_hashes_for_stream(stream_frame.stream_hash)
|
|
|
|
def combine_blob_hashes(results):
|
|
blob_hashes = []
|
|
for success, result in results:
|
|
if success is True:
|
|
blob_hashes.extend(result)
|
|
return blob_hashes
|
|
|
|
def delete_blobs(blob_hashes):
|
|
return self.session.blob_manager.delete_blobs(blob_hashes)
|
|
|
|
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True)
|
|
dl.addCallback(combine_blob_hashes)
|
|
dl.addCallback(delete_blobs)
|
|
return dl
|
|
|
|
resolve_d.addCallback(lambda _: delete_associated_blobs())
|
|
self._add_download_deferred(resolve_d, stream_frame)
|
|
|
|
def _add_download_deferred(self, d, stream_frame):
|
|
self.download_deferreds.append(d)
|
|
self.stream_frames.append(stream_frame)
|
|
|
|
def remove_from_list():
|
|
self.download_deferreds.remove(d)
|
|
self.stream_frames.remove(stream_frame)
|
|
|
|
d.addBoth(lambda _: remove_from_list()) |