import binascii
import logging
import tkMessageBox
from Crypto import Random
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
from lbrynet.core import StreamDescriptor
from lbrynet.core.Error import UnknownNameError, UnknownStreamTypeError, InvalidStreamDescriptorError
from lbrynet.core.Error import InvalidStreamInfoError
from lbrynet.core.LBRYcrdWallet import LBRYcrdWallet
from lbrynet.core.PaymentRateManager import PaymentRateManager
from lbrynet.core.Session import LBRYSession
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.lbryfile.LBRYFileMetadataManager import TempLBRYFileMetadataManager
from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType
from lbrynet.lbryfile.client.LBRYFileDownloader import LBRYFileSaverFactory, LBRYFileOpenerFactory
from lbrynet.lbryfile.client.LBRYFileOptions import add_lbry_file_to_sd_identifier
import os
import requests
import shutil
from twisted.internet import threads, defer, task


log = logging.getLogger(__name__)
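

# LBRYDownloader backs the downloader GUI: it owns the LBRYSession (wallet,
# blob manager, DHT node), the stream descriptor identifier and the optional
# blob server, and tracks one deferred/stream frame pair per in-progress
# download.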
class LBRYDownloader(object):
    def __init__(self):
        self.session = None
        self.known_dht_nodes = [('104.236.42.182', 4000)]
        self.db_dir = os.path.join(os.path.expanduser("~"), ".lbrydownloader")
        self.blobfile_dir = os.path.join(self.db_dir, "blobfiles")
        self.peer_port = 3333
        self.dht_node_port = 4444
        self.run_server = True
        self.first_run = False
        self.current_db_revision = 1
        if os.name == "nt":
            from lbrynet.winhelpers.knownpaths import get_path, FOLDERID, UserHandle
            self.download_directory = get_path(FOLDERID.Downloads, UserHandle.current)
            self.wallet_dir = os.path.join(get_path(FOLDERID.RoamingAppData, UserHandle.current), "lbrycrd")
        else:
            self.download_directory = os.getcwd()
            # assumed default: lbrycrdd keeps its data in ~/.lbrycrd on
            # non-Windows platforms; without this, wallet_conf below would
            # reference an undefined wallet_dir
            self.wallet_dir = os.path.join(os.path.expanduser("~"), ".lbrycrd")
        self.wallet_conf = os.path.join(self.wallet_dir, "lbrycrd.conf")
        self.wallet_user = None
        self.wallet_password = None
        self.sd_identifier = StreamDescriptorIdentifier()
        self.wallet_rpc_port = 8332
        self.download_deferreds = []
        self.stream_frames = []
        self.default_blob_data_payment_rate = MIN_BLOB_DATA_PAYMENT_RATE
        self.use_upnp = True
        self.start_lbrycrdd = True
        if os.name == "nt":
            self.lbrycrdd_path = "lbrycrdd.exe"
        else:
            self.lbrycrdd_path = "./lbrycrdd"
        self.delete_blobs_on_remove = True
        self.blob_request_payment_rate_manager = None
        # initialized here so stop_server() can run safely even if
        # start_server() never listened (e.g. run_server is False)
        self.lbry_server_port = None
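
    # start() runs the setup pipeline in order: load config, create the data
    # directories, migrate the database if needed, build the LBRYSession,
    # set up the stream info manager and stream identifier, then start the
    # blob server.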
    def start(self):
        d = self._load_conf_options()
        d.addCallback(lambda _: threads.deferToThread(self._create_directory))
        d.addCallback(lambda _: self._check_db_migration())
        d.addCallback(lambda _: self._get_session())
        d.addCallback(lambda _: self._setup_stream_info_manager())
        d.addCallback(lambda _: self._setup_stream_identifier())
        d.addCallback(lambda _: self.start_server())
        return d
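
    # stop() cancels any in-progress downloads via their stream frames, then
    # stops the blob server and shuts the session down once all download
    # deferreds have fired.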
    def stop(self):
        dl = defer.DeferredList(self.download_deferreds)
        for stream_frame in self.stream_frames:
            stream_frame.cancel_func()
        if self.session is not None:
            dl.addBoth(lambda _: self.stop_server())
            dl.addBoth(lambda _: self.session.shut_down())
        return dl

    def get_new_address(self):
        return self.session.wallet.get_new_address()
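
    # Compares the on-disk db_revision file against current_db_revision and,
    # if it is older, runs the migrator in a thread (migrator.exe on Windows,
    # the db_migrator module elsewhere).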
    def _check_db_migration(self):
        old_revision = 0
        db_revision_file = os.path.join(self.db_dir, "db_revision")
        if os.path.exists(db_revision_file):
            old_revision = int(open(db_revision_file).read().strip())
        if old_revision < self.current_db_revision:
            if os.name == "nt":
                import subprocess
                import sys

                def run_migrator():
                    migrator_exe = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])),
                                                "dbmigrator", "migrator.exe")
                    print "trying to find the migrator at", migrator_exe
                    # STARTUPINFO must be instantiated before its fields are set
                    si = subprocess.STARTUPINFO()
                    si.dwFlags = subprocess.STARTF_USESHOWWINDOW
                    si.wShowWindow = subprocess.SW_HIDE
                    print "trying to run the migrator"
                    migrator_proc = subprocess.Popen([migrator_exe, self.db_dir, str(old_revision),
                                                      str(self.current_db_revision)], startupinfo=si)
                    print "started the migrator"
                    migrator_proc.wait()
                    print "migrator has returned"

                return threads.deferToThread(run_migrator)
            else:
                from lbrynet.db_migrator import dbmigrator
                return threads.deferToThread(dbmigrator.migrate_db, self.db_dir, old_revision,
                                             self.current_db_revision)
        return defer.succeed(True)
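
    # Reads ~/.lbrycrddpath.conf (non-Windows) for an lbrycrdd path override,
    # then reads lbry.conf / ~/.lbrynetgui.conf for "key = value" settings.
    # A minimal example of the recognized fields (values are illustrative):
    #
    #   known_dht_nodes = 192.168.1.1:4000,192.168.1.2:4001
    #   run_server = True
    #   peer_port = 3333
    #   dht_port = 4444
    #   use_upnp = True
    #   default_blob_data_payment_rate = 0.5
    #   start_lbrycrdd = True
    #   delete_blobs_on_stream_remove = True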
    def _load_conf_options(self):

        def get_lbrycrdd_path_conf_file():
            if os.name == "nt":
                return ""
            lbrycrdd_path_conf_path = os.path.join(os.path.expanduser("~"), ".lbrycrddpath.conf")
            if not os.path.exists(lbrycrdd_path_conf_path):
                return ""
            lbrycrdd_path_conf = open(lbrycrdd_path_conf_path)
            # readlines(), not readline(): load_lbrycrdd_path iterates over
            # the returned value line by line
            lines = lbrycrdd_path_conf.readlines()
            return lines

        d = threads.deferToThread(get_lbrycrdd_path_conf_file)

        def load_lbrycrdd_path(conf):
            for line in conf:
                if len(line.strip()) and line.strip()[0] != "#":
                    self.lbrycrdd_path = line.strip()

        d.addCallback(load_lbrycrdd_path)

        def get_configuration_file():
            if os.name == "nt":
                lbry_conf_path = "lbry.conf"
                if not os.path.exists(lbry_conf_path):
                    log.debug("Could not read lbry.conf")
                    return ""
            else:
                lbry_conf_path = os.path.join(os.path.expanduser("~"), ".lbrynetgui.conf")
                if not os.path.exists(lbry_conf_path):
                    clean_conf_path = os.path.join(os.path.dirname(__file__), "lbry.conf")
                    shutil.copy(clean_conf_path, lbry_conf_path)
            lbry_conf = open(lbry_conf_path)
            log.debug("Loading configuration options from %s", lbry_conf_path)
            lines = lbry_conf.readlines()
            log.debug("%s file contents:\n%s", lbry_conf_path, str(lines))
            return lines

        d.addCallback(lambda _: threads.deferToThread(get_configuration_file))

        def load_configuration_file(conf):
            for line in conf:
                if len(line.strip()) and line.strip()[0] != "#":
                    try:
                        field_name, field_value = map(lambda x: x.strip(), line.strip().split("=", 1))
                        field_name = field_name.lower()
                    except ValueError:
                        raise ValueError("Invalid configuration line: %s" % line)
                    if field_name == "known_dht_nodes":
                        known_nodes = []
                        nodes = field_value.split(",")
                        for n in nodes:
                            if n.strip():
                                try:
                                    ip_address, port_string = map(lambda x: x.strip(), n.split(":"))
                                    ip_numbers = ip_address.split(".")
                                    assert len(ip_numbers) == 4
                                    for ip_num in ip_numbers:
                                        num = int(ip_num)
                                        assert 0 <= num <= 255
                                    known_nodes.append((ip_address, int(port_string)))
                                except (ValueError, AssertionError):
                                    raise ValueError("Expected known nodes in format 192.168.1.1:4000,192.168.1.2:4001. Got %s" % str(field_value))
                        log.debug("Setting known_dht_nodes to %s", str(known_nodes))
                        self.known_dht_nodes = known_nodes
                    elif field_name == "run_server":
                        if field_value.lower() == "true":
                            run_server = True
                        elif field_value.lower() == "false":
                            run_server = False
                        else:
                            raise ValueError("run_server must be set to True or False. Got %s" % field_value)
                        log.debug("Setting run_server to %s", str(run_server))
                        self.run_server = run_server
                    elif field_name == "data_dir":
                        log.debug("Setting data_dir to %s", str(field_value))
                        self.db_dir = field_value
                        self.blobfile_dir = os.path.join(self.db_dir, "blobfiles")
                    elif field_name == "wallet_dir":
                        log.debug("Setting wallet_dir to %s", str(field_value))
                        self.wallet_dir = field_value
                    elif field_name == "wallet_conf":
                        log.debug("Setting wallet_conf to %s", str(field_value))
                        self.wallet_conf = field_value
                    elif field_name == "peer_port":
                        try:
                            peer_port = int(field_value)
                            assert 0 <= peer_port <= 65535
                            log.debug("Setting peer_port to %s", str(peer_port))
                            self.peer_port = peer_port
                        except (ValueError, AssertionError):
                            raise ValueError("peer_port must be set to an integer between 1 and 65535. Got %s" % field_value)
                    elif field_name == "dht_port":
                        try:
                            dht_port = int(field_value)
                            assert 0 <= dht_port <= 65535
                            log.debug("Setting dht_node_port to %s", str(dht_port))
                            self.dht_node_port = dht_port
                        except (ValueError, AssertionError):
                            raise ValueError("dht_port must be set to an integer between 1 and 65535. Got %s" % field_value)
                    elif field_name == "use_upnp":
                        if field_value.lower() == "true":
                            use_upnp = True
                        elif field_value.lower() == "false":
                            use_upnp = False
                        else:
                            raise ValueError("use_upnp must be set to True or False. Got %s" % str(field_value))
                        log.debug("Setting use_upnp to %s", str(use_upnp))
                        self.use_upnp = use_upnp
                    elif field_name == "default_blob_data_payment_rate":
                        try:
                            rate = float(field_value)
                            assert rate >= 0.0
                            log.debug("Setting default_blob_data_payment_rate to %s", str(rate))
                            self.default_blob_data_payment_rate = rate
                        except (ValueError, AssertionError):
                            raise ValueError("default_blob_data_payment_rate must be a positive floating point number, e.g. 0.5. Got %s" % str(field_value))
                    elif field_name == "start_lbrycrdd":
                        if field_value.lower() == "true":
                            start_lbrycrdd = True
                        elif field_value.lower() == "false":
                            start_lbrycrdd = False
                        else:
                            raise ValueError("start_lbrycrdd must be set to True or False. Got %s" % field_value)
                        log.debug("Setting start_lbrycrdd to %s", str(start_lbrycrdd))
                        self.start_lbrycrdd = start_lbrycrdd
                    elif field_name == "lbrycrdd_path":
                        self.lbrycrdd_path = field_value
                    elif field_name == "download_directory":
                        log.debug("Setting download_directory to %s", str(field_value))
                        self.download_directory = field_value
                    elif field_name == "delete_blobs_on_stream_remove":
                        if field_value.lower() == "true":
                            self.delete_blobs_on_remove = True
                        elif field_value.lower() == "false":
                            self.delete_blobs_on_remove = False
                        else:
                            raise ValueError("delete_blobs_on_stream_remove must be set to True or False")
                    else:
                        log.warning("Got unknown configuration field: %s", field_name)

        d.addCallback(load_configuration_file)
        return d
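
    # Creates the db/blobfile directories on first run and, on Windows, writes
    # a default lbrycrd.conf (or reads rpc credentials from an existing one).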
    def _create_directory(self):
        if not os.path.exists(self.db_dir):
            os.makedirs(self.db_dir)
            db_revision = open(os.path.join(self.db_dir, "db_revision"), mode='w')
            db_revision.write(str(self.current_db_revision))
            db_revision.close()
            log.debug("Created the configuration directory: %s", str(self.db_dir))
        if not os.path.exists(self.blobfile_dir):
            os.makedirs(self.blobfile_dir)
            log.debug("Created the data directory: %s", str(self.blobfile_dir))
        if os.name == "nt":
            if not os.path.exists(self.wallet_dir):
                os.makedirs(self.wallet_dir)
            if not os.path.exists(self.wallet_conf):
                lbrycrd_conf = open(self.wallet_conf, mode='w')
                self.wallet_user = "rpcuser"
                lbrycrd_conf.write("rpcuser=%s\n" % self.wallet_user)
                self.wallet_password = binascii.hexlify(Random.new().read(20))
                lbrycrd_conf.write("rpcpassword=%s\n" % self.wallet_password)
                lbrycrd_conf.write("server=1\n")
                lbrycrd_conf.close()
            else:
                lbrycrd_conf = open(self.wallet_conf)
                for l in lbrycrd_conf:
                    if l.startswith("rpcuser="):
                        self.wallet_user = l[8:].rstrip('\n')
                    if l.startswith("rpcpassword="):
                        self.wallet_password = l[12:].rstrip('\n')
                    if l.startswith("rpcport="):
                        # strip only the newline, as with rpcuser/rpcpassword,
                        # so the last digit is kept when the file has no
                        # trailing newline
                        self.wallet_rpc_port = int(l[8:].rstrip('\n'))
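
    # Builds the LBRYcrdWallet and LBRYSession from the configured paths and
    # ports; peer_port is only passed through when run_server is enabled.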
    def _get_session(self):
        lbrycrdd_path = None
        if self.start_lbrycrdd is True:
            lbrycrdd_path = self.lbrycrdd_path

        wallet = LBRYcrdWallet(self.db_dir, wallet_dir=self.wallet_dir, wallet_conf=self.wallet_conf,
                               lbrycrdd_path=lbrycrdd_path)

        peer_port = None
        if self.run_server:
            peer_port = self.peer_port
        self.session = LBRYSession(self.default_blob_data_payment_rate, db_dir=self.db_dir,
                                   blob_dir=self.blobfile_dir, use_upnp=self.use_upnp, wallet=wallet,
                                   known_dht_nodes=self.known_dht_nodes, dht_node_port=self.dht_node_port,
                                   peer_port=peer_port)
        return self.session.setup()

    def _setup_stream_info_manager(self):
        self.stream_info_manager = TempLBRYFileMetadataManager()
        return defer.succeed(True)
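
    # When run_server is enabled, registers the blob availability, wallet info
    # and blob request query handlers and starts listening on peer_port.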
    def start_server(self):
        if self.run_server:
            self.blob_request_payment_rate_manager = PaymentRateManager(
                self.session.base_payment_rate_manager,
                self.default_blob_data_payment_rate
            )
            handlers = [
                BlobAvailabilityHandlerFactory(self.session.blob_manager),
                self.session.wallet.get_wallet_info_query_handler_factory(),
                BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet,
                                          self.blob_request_payment_rate_manager)
            ]

            server_factory = ServerProtocolFactory(self.session.rate_limiter,
                                                   handlers,
                                                   self.session.peer_manager)
            from twisted.internet import reactor
            self.lbry_server_port = reactor.listenTCP(self.peer_port, server_factory)

        return defer.succeed(True)

    def stop_server(self):
        if self.lbry_server_port is not None:
            self.lbry_server_port, p = None, self.lbry_server_port
            return defer.maybeDeferred(p.stopListening)
        else:
            return defer.succeed(True)
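
    # Registers both downloader factories (save to disk, open in place) for
    # the LBRY file stream type with the stream descriptor identifier.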
    def _setup_stream_identifier(self):
        add_lbry_file_to_sd_identifier(self.sd_identifier)
        file_saver_factory = LBRYFileSaverFactory(self.session.peer_finder, self.session.rate_limiter,
                                                  self.session.blob_manager, self.stream_info_manager,
                                                  self.session.wallet, self.download_directory)
        self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, file_saver_factory)
        file_opener_factory = LBRYFileOpenerFactory(self.session.peer_finder, self.session.rate_limiter,
                                                    self.session.blob_manager, self.stream_info_manager,
                                                    self.session.wallet)
        self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, file_opener_factory)
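
    # On the wallet's first run, asks the credit request service to send free
    # credits to a newly generated address; failures are logged and treated
    # as 0.0 credits.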
    def check_first_run(self):
        d = self.session.wallet.check_first_run()
        d.addCallback(lambda is_first_run: self._do_first_run() if is_first_run else 0.0)
        return d

    def _do_first_run(self):
        d = self.session.wallet.get_new_address()

        def send_request(url, data):
            r = requests.post(url, json=data)
            if r.status_code == 200:
                return r.json()['credits_sent']
            return 0.0

        def log_error(err):
            log.warning("unable to request free credits. %s", err.getErrorMessage())
            return 0.0

        def request_credits(address):
            url = "http://credreq.lbry.io/requestcredits"
            data = {"address": address}
            d = threads.deferToThread(send_request, url, data)
            d.addErrback(log_error)
            return d

        d.addCallback(request_credits)
        return d

    def _resolve_name(self, uri):
        return self.session.wallet.get_stream_info_for_name(uri)
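
    # download_stream() drives one download end to end: resolve the name via
    # the wallet, fetch and parse the stream descriptor blob, let the user
    # pick a downloader factory and options in the stream frame, then start
    # the downloader while a LoopingCall updates progress once per second.
    # Errors surface in a tkMessageBox, and the stream's blobs are deleted
    # afterwards when delete_blobs_on_remove is set.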
    def download_stream(self, stream_frame, uri):
        resolve_d = self._resolve_name(uri)

        stream_frame.show_metadata_status("resolving name...")

        stream_frame.cancel_func = resolve_d.cancel
        payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager)

        def update_stream_name(value):
            if 'name' in value:
                stream_frame.show_name(value['name'])
            if 'description' in value:
                stream_frame.show_description(value['description'])
            return value

        def get_sd_hash(value):
            if 'stream_hash' in value:
                return value['stream_hash']
            raise UnknownNameError(uri)

        def get_sd_blob(sd_hash):
            stream_frame.show_metadata_status("name resolved, fetching metadata...")
            get_sd_d = StreamDescriptor.download_sd_blob(self.session, sd_hash,
                                                         payment_rate_manager)
            get_sd_d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
            get_sd_d.addCallbacks(choose_download_factory, bad_sd_blob)
            return get_sd_d

        def get_info_from_validator(info_validator):
            stream_name = None
            stream_size = None
            for field, val in info_validator.info_to_show():
                if field == "suggested_file_name":
                    stream_name = val
                elif field == "stream_name" and stream_name is None:
                    stream_name = val
                elif field == "stream_size":
                    stream_size = int(val)
            if stream_size is None:
                stream_size = "unknown"
            if stream_name is None:
                stream_name = "unknown"
            return stream_name, stream_size

        def choose_download_factory(metadata):
            #info_validator, options, factories = info_and_factories
            stream_name, stream_size = get_info_from_validator(metadata.validator)
            if isinstance(stream_size, (int, long)):
                price = payment_rate_manager.get_effective_min_blob_data_payment_rate()
                estimated_cost = stream_size * 1.0 / 2**20 * price
            else:
                estimated_cost = "unknown"

            stream_frame.show_stream_metadata(stream_name, stream_size)

            available_options = metadata.options.get_downloader_options(metadata.validator,
                                                                        payment_rate_manager)

            stream_frame.show_download_options(available_options)

            get_downloader_d = defer.Deferred()

            def create_downloader(f, chosen_options):

                def fire_get_downloader_d(downloader):
                    if not get_downloader_d.called:
                        get_downloader_d.callback(downloader)

                stream_frame.disable_download_buttons()
                d = f.make_downloader(metadata, chosen_options,
                                      payment_rate_manager)
                d.addCallback(fire_get_downloader_d)

            for factory in metadata.factories:

                def choose_factory(f=factory):
                    chosen_options = stream_frame.get_chosen_options()
                    create_downloader(f, chosen_options)

                stream_frame.add_download_factory(factory, choose_factory)

            get_downloader_d.addCallback(start_download)

            return get_downloader_d

        def show_stream_status(downloader):
            total_bytes = downloader.get_total_bytes_cached()
            bytes_left_to_download = downloader.get_bytes_left_to_download()
            points_paid = payment_rate_manager.points_paid
            payment_rate = payment_rate_manager.get_effective_min_blob_data_payment_rate()
            points_remaining = 1.0 * bytes_left_to_download * payment_rate / 2**20
            stream_frame.show_progress(total_bytes, bytes_left_to_download,
                                       points_paid, points_remaining)

        def show_finished(arg, downloader):
            show_stream_status(downloader)
            stream_frame.show_download_done(payment_rate_manager.points_paid)
            return arg

        def start_download(downloader):
            stream_frame.stream_hash = downloader.stream_hash
            l = task.LoopingCall(show_stream_status, downloader)
            l.start(1)
            d = downloader.start()
            stream_frame.cancel_func = downloader.stop

            def stop_looping_call(arg):
                l.stop()
                stream_frame.cancel_func = resolve_d.cancel
                return arg

            d.addBoth(stop_looping_call)
            d.addCallback(show_finished, downloader)
            return d

        def lookup_failed(err):
            stream_frame.show_metadata_status("name lookup failed")
            return err

        def bad_sd_blob(err):
            stream_frame.show_metadata_status("Unknown type or badly formed metadata")
            return err

        resolve_d.addCallback(update_stream_name)
        resolve_d.addCallback(get_sd_hash)
        resolve_d.addCallbacks(get_sd_blob, lookup_failed)

        def show_err(err):
            tkMessageBox.showerror(title="Download Error", message=err.getErrorMessage())
            log.error(err.getErrorMessage())
            stream_frame.show_download_done(payment_rate_manager.points_paid)

        resolve_d.addErrback(lambda err: err.trap(defer.CancelledError, UnknownNameError,
                                                  UnknownStreamTypeError, InvalidStreamDescriptorError,
                                                  InvalidStreamInfoError))
        resolve_d.addErrback(show_err)

        def delete_associated_blobs():
            if stream_frame.stream_hash is None or self.delete_blobs_on_remove is False:
                return defer.succeed(True)
            d1 = self.stream_info_manager.get_blobs_for_stream(stream_frame.stream_hash)

            def get_blob_hashes(blob_infos):
                return [b[0] for b in blob_infos if b[0] is not None]

            d1.addCallback(get_blob_hashes)
            d2 = self.stream_info_manager.get_sd_blob_hashes_for_stream(stream_frame.stream_hash)

            def combine_blob_hashes(results):
                blob_hashes = []
                for success, result in results:
                    if success is True:
                        blob_hashes.extend(result)
                return blob_hashes

            def delete_blobs(blob_hashes):
                return self.session.blob_manager.delete_blobs(blob_hashes)

            dl = defer.DeferredList([d1, d2], fireOnOneErrback=True)
            dl.addCallback(combine_blob_hashes)
            dl.addCallback(delete_blobs)
            return dl

        resolve_d.addCallback(lambda _: delete_associated_blobs())
        self._add_download_deferred(resolve_d, stream_frame)

    def _add_download_deferred(self, d, stream_frame):
        self.download_deferreds.append(d)
        self.stream_frames.append(stream_frame)

        def remove_from_list():
            self.download_deferreds.remove(d)
            self.stream_frames.remove(stream_frame)

        d.addBoth(lambda _: remove_from_list())
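

# A minimal usage sketch, not part of the GUI flow: the Tkinter frames normally
# create an LBRYDownloader and call download_stream() with a stream frame per
# download. This only shows the start/stop lifecycle under a Twisted reactor.
if __name__ == "__main__":
    from twisted.internet import reactor

    downloader = LBRYDownloader()
    setup_d = downloader.start()
    setup_d.addCallback(lambda _: downloader.check_first_run())
    setup_d.addErrback(lambda err: log.error(err.getErrorMessage()))
    # stop() returns a deferred, so shutdown waits for downloads to wind down
    reactor.addSystemEventTrigger('before', 'shutdown', downloader.stop)
    reactor.run()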