From 2f8c645edc3d0963378d4b11120bf8805c975258 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Tue, 30 Jan 2018 20:16:25 -0500 Subject: [PATCH] download script now gets all blobs in stream if you give it an sd hash --- CHANGELOG.md | 1 + lbrynet/core/BlobManager.py | 2 +- lbrynet/core/SinglePeerDownloader.py | 9 +- lbrynet/core/client/ClientProtocol.py | 2 +- lbrynet/core/client/ClientRequest.py | 1 + scripts/download_blob_from_peer.py | 177 +++++++++----------------- 6 files changed, 67 insertions(+), 125 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4cf47f9ca..445f7faf2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,6 +81,7 @@ at anytime. * Removed `auto_re_reflect` setting from the conf file, use the `reflect_uploads` setting instead * Removed `include_tip_info` argument from `transaction_list`, which will now always include tip information. * Removed `seccure` and `gmpy` dependencies + * Removed TempBlobManager ## [0.18.0] - 2017-11-08 diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index 5d0556f13..7e5545745 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -34,7 +34,7 @@ class DiskBlobManager(DHTHashSupplier): # TODO: consider using an LRU for blobs as there could potentially # be thousands of blobs loaded up, many stale self.blobs = {} - self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)} + self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)} @defer.inlineCallbacks def setup(self): diff --git a/lbrynet/core/SinglePeerDownloader.py b/lbrynet/core/SinglePeerDownloader.py index 0f056d747..85ade1bb6 100644 --- a/lbrynet/core/SinglePeerDownloader.py +++ b/lbrynet/core/SinglePeerDownloader.py @@ -17,12 +17,6 @@ from lbrynet.core.client.ConnectionManager import ConnectionManager log = logging.getLogger(__name__) -class TempBlobManager(DiskBlobManager): - def stop(self): - self.db_conn.close() - return defer.succeed(True) - - class SinglePeerFinder(DummyPeerFinder): def __init__(self, peer): DummyPeerFinder.__init__(self) @@ -93,6 +87,7 @@ class SinglePeerDownloader(object): connection_manager = ConnectionManager(downloader, self._rate_limiter, [requester], [info_exchanger]) connection_manager.start() + result = yield blob.callback if not result: log.debug("Failed to downloaded %s from %s", blob_hash[:16], peer.host) @@ -102,7 +97,7 @@ class SinglePeerDownloader(object): @defer.inlineCallbacks def download_temp_blob_from_peer(self, peer, timeout, blob_hash): tmp_dir = yield threads.deferToThread(tempfile.mkdtemp) - tmp_blob_manager = TempBlobManager(self._announcer, tmp_dir, tmp_dir) + tmp_blob_manager = DiskBlobManager(self._announcer, tmp_dir, tmp_dir) try: result = yield self.download_blob_from_peer(peer, timeout, blob_hash, tmp_blob_manager) finally: diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index 061e87ad7..72024f7c7 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -47,7 +47,7 @@ class ClientProtocol(Protocol, TimeoutMixin): self._ask_for_request() def dataReceived(self, data): - log.debug("Data receieved from %s", self.peer) + log.debug("Received %d bytes from %s", len(data), self.peer) self.setTimeout(None) self._rate_limiter.report_dl_bytes(len(data)) diff --git a/lbrynet/core/client/ClientRequest.py b/lbrynet/core/client/ClientRequest.py index 1dee9b9d6..9f9854e6f 100644 --- a/lbrynet/core/client/ClientRequest.py +++ b/lbrynet/core/client/ClientRequest.py @@ -1,5 +1,6 @@ from lbrynet.blob.blob_file import MAX_BLOB_SIZE + class ClientRequest(object): def __init__(self, request_dict, response_identifier=None): self.request_dict = request_dict diff --git a/scripts/download_blob_from_peer.py b/scripts/download_blob_from_peer.py index 43a510328..c5263d29d 100644 --- a/scripts/download_blob_from_peer.py +++ b/scripts/download_blob_from_peer.py @@ -1,151 +1,96 @@ -"""A simple script that attempts to directly download a single blob from a given peer""" +"""A simple script that attempts to directly download a single blob or stream from a given peer""" import argparse import logging import sys -import os +import tempfile +import time +import shutil +from pprint import pprint -from twisted.internet import defer -from twisted.internet import reactor -from zope.interface import implements +from twisted.internet import defer, reactor, threads -from lbrynet import interfaces from lbrynet import conf -from lbrynet.core import log_support -from lbrynet.core import BlobManager -from lbrynet.core import HashAnnouncer -from lbrynet.blob import BlobFile -from lbrynet.core import RateLimiter -from lbrynet.core import Peer -from lbrynet.core import Wallet -from lbrynet.core.client import BlobRequester -from lbrynet.core.client import ConnectionManager - +from lbrynet.core import log_support, Wallet, Peer +from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader +from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader +from lbrynet.core.BlobManager import DiskBlobManager +from lbrynet.core.HashAnnouncer import DummyHashAnnouncer log = logging.getLogger() -SUCCESS = False def main(args=None): conf.initialize_settings() parser = argparse.ArgumentParser() - parser.add_argument('--timeout', type=int, default=30) parser.add_argument('peer') parser.add_argument('blob_hash') - parser.add_argument('directory', type=str, default=os.getcwd()) + parser.add_argument('--timeout', type=int, default=30) args = parser.parse_args(args) log_support.configure_console(level='DEBUG') + log_support.configure_twisted() - announcer = HashAnnouncer.DummyHashAnnouncer() - blob_manager = MyBlobManager(announcer) - blob = BlobFile(args.directory, args.blob_hash) - download_manager = SingleBlobDownloadManager(blob) - peer = Peer.Peer(*conf.server_port(args.peer)) - payment_rate_manager = DumbPaymentRateManager() - wallet = getWallet() - requester = SingleBlobRequester( - peer, blob_manager, payment_rate_manager, wallet, download_manager) - rate_limiter = RateLimiter.DummyRateLimiter() - downloader = SingleBlobDownloader() - connection_manager = ConnectionManager.ConnectionManager( - downloader, rate_limiter, [requester], [wallet.get_info_exchanger()]) - reactor.callLater(args.timeout, reactor.stop) - d = connection_manager.start() - d.addErrback(log_support.failure, 'Something bad happened: %s') + if ":" in str(args.peer): + host, port = str(args.peer).strip().split(":") + else: + host = args.peer + port = 3333 + + d = download_it(Peer.Peer(host, int(port)), args.timeout, args.blob_hash) + d.addErrback(log.exception) + d.addBoth(lambda _: reactor.callLater(0, reactor.stop)) reactor.run() - if SUCCESS: - sys.exit(0) - else: - sys.exit(1) +@defer.inlineCallbacks +def download_it(peer, timeout, blob_hash): + tmp_dir = yield threads.deferToThread(tempfile.mkdtemp) + announcer = DummyHashAnnouncer() + tmp_blob_manager = DiskBlobManager(announcer, tmp_dir, tmp_dir) -class MyBlobManager(BlobManager.BlobManager): - def blob_completed(self, blob): - global SUCCESS - log.info('Blob has been downloaded, we can stop') - # this feels pretty hacky, but its as good of a stopping point as any - SUCCESS = True - reactor.stop() - - -def getWallet(): config = {'auto_connect': True} if conf.settings['lbryum_wallet_dir']: config['lbryum_path'] = conf.settings['lbryum_wallet_dir'] storage = Wallet.InMemoryStorage() - return Wallet.LBRYumWallet(storage, config) + wallet = Wallet.LBRYumWallet(storage, config) + downloader = SinglePeerDownloader() + downloader.setup(wallet) -class SingleBlobDownloader(object): - def insufficientfunds(self, err): - pass + try: + blob_downloaded = yield downloader.download_blob_from_peer(peer, timeout, blob_hash, + tmp_blob_manager) + if blob_downloaded: + log.info("SUCCESS!") + blob = yield tmp_blob_manager.get_blob(blob_hash) + pprint(blob) + if not blob.verified: + log.error("except that its not verified....") + else: + reader = BlobStreamDescriptorReader(blob) + info = None + for x in range(0, 3): + try: + info = yield reader.get_info() + except ValueError: + pass + if info: + break + time.sleep( + 0.1) # there's some kind of race condition where it sometimes doesnt write the blob to disk in time - -class SingleBlobDownloadManager(object): - def __init__(self, blob): - self.blob = blob - - def needed_blobs(self): - if self.blob.verified: - return [] + if info is not None: + pprint(info) + for content_blob in info['blobs']: + if 'blob_hash' in content_blob: + yield download_it(peer, timeout, content_blob['blob_hash']) else: - return [self.blob] + log.error("Download failed") + finally: + yield tmp_blob_manager.stop() + yield threads.deferToThread(shutil.rmtree, tmp_dir) - -class NullStrategy(object): - def __init__(self): - self.pending_sent_offers = {} - - -class DumbPaymentRateManager(object): - def __init__(self): - self.strategy = NullStrategy() - - def price_limit_reached(self, peer): - return False - - def get_rate_blob_data(self, *args): - return 0.0 - - def record_offer_reply(self, peer, offer): - pass - - def record_points_paid(self, point_ammount): - pass - - -class FreeDownload(BlobRequester.DownloadRequest): - def _pay_peer(self, *args): - # TODO: somewhere I missed the part that is supposed to get - # and address from the remote server for where to send - # data fees to so we can't make payments. Probably has - # to do with the wallet_info_exchanger - pass - - -class SingleBlobRequester(BlobRequester.BlobRequester): - implements(interfaces.IRequestCreator) - DownloadRequest = FreeDownload - - def __init__(self, peer, blob_manager, payment_rate_manager, wallet, download_manager): - self.peer = peer - self.sent = False - BlobRequester.BlobRequester.__init__( - self, blob_manager, None, payment_rate_manager, wallet, download_manager) - - def __repr__(self): - return 'SingleBlobRequestor({!r})'.format(self.peer) - - def get_new_peers(self): - if self.sent: - return defer.succeed([]) - else: - self.sent = True - return defer.succeed([self.peer]) - - def send_next_request(self, peer, protocol): - return self._send_next_request(peer, protocol) + defer.returnValue(True) if __name__ == '__main__':