forked from LBRYCommunity/lbry-sdk
Merge branch 'remove_temp_blob_manager'
* remove_temp_blob_manager: download script now gets all blobs in stream if you give it an sd hash
This commit is contained in:
commit
21e653367f
6 changed files with 67 additions and 125 deletions
|
@ -81,6 +81,7 @@ at anytime.
|
|||
* Removed `auto_re_reflect` setting from the conf file, use the `reflect_uploads` setting instead
|
||||
* Removed `include_tip_info` argument from `transaction_list`, which will now always include tip information.
|
||||
* Removed `seccure` and `gmpy` dependencies
|
||||
* Removed TempBlobManager
|
||||
|
||||
|
||||
## [0.18.0] - 2017-11-08
|
||||
|
|
|
@ -34,7 +34,7 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
# TODO: consider using an LRU for blobs as there could potentially
|
||||
# be thousands of blobs loaded up, many stale
|
||||
self.blobs = {}
|
||||
self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)}
|
||||
self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def setup(self):
|
||||
|
|
|
@ -17,12 +17,6 @@ from lbrynet.core.client.ConnectionManager import ConnectionManager
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TempBlobManager(DiskBlobManager):
|
||||
def stop(self):
|
||||
self.db_conn.close()
|
||||
return defer.succeed(True)
|
||||
|
||||
|
||||
class SinglePeerFinder(DummyPeerFinder):
|
||||
def __init__(self, peer):
|
||||
DummyPeerFinder.__init__(self)
|
||||
|
@ -93,6 +87,7 @@ class SinglePeerDownloader(object):
|
|||
connection_manager = ConnectionManager(downloader, self._rate_limiter, [requester],
|
||||
[info_exchanger])
|
||||
connection_manager.start()
|
||||
|
||||
result = yield blob.callback
|
||||
if not result:
|
||||
log.debug("Failed to downloaded %s from %s", blob_hash[:16], peer.host)
|
||||
|
@ -102,7 +97,7 @@ class SinglePeerDownloader(object):
|
|||
@defer.inlineCallbacks
|
||||
def download_temp_blob_from_peer(self, peer, timeout, blob_hash):
|
||||
tmp_dir = yield threads.deferToThread(tempfile.mkdtemp)
|
||||
tmp_blob_manager = TempBlobManager(self._announcer, tmp_dir, tmp_dir)
|
||||
tmp_blob_manager = DiskBlobManager(self._announcer, tmp_dir, tmp_dir)
|
||||
try:
|
||||
result = yield self.download_blob_from_peer(peer, timeout, blob_hash, tmp_blob_manager)
|
||||
finally:
|
||||
|
|
|
@ -47,7 +47,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
|
|||
self._ask_for_request()
|
||||
|
||||
def dataReceived(self, data):
|
||||
log.debug("Data receieved from %s", self.peer)
|
||||
log.debug("Received %d bytes from %s", len(data), self.peer)
|
||||
self.setTimeout(None)
|
||||
self._rate_limiter.report_dl_bytes(len(data))
|
||||
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from lbrynet.blob.blob_file import MAX_BLOB_SIZE
|
||||
|
||||
|
||||
class ClientRequest(object):
|
||||
def __init__(self, request_dict, response_identifier=None):
|
||||
self.request_dict = request_dict
|
||||
|
|
|
@ -1,151 +1,96 @@
|
|||
"""A simple script that attempts to directly download a single blob from a given peer"""
|
||||
"""A simple script that attempts to directly download a single blob or stream from a given peer"""
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
import os
|
||||
import tempfile
|
||||
import time
|
||||
import shutil
|
||||
from pprint import pprint
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet import reactor
|
||||
from zope.interface import implements
|
||||
from twisted.internet import defer, reactor, threads
|
||||
|
||||
from lbrynet import interfaces
|
||||
from lbrynet import conf
|
||||
from lbrynet.core import log_support
|
||||
from lbrynet.core import BlobManager
|
||||
from lbrynet.core import HashAnnouncer
|
||||
from lbrynet.blob import BlobFile
|
||||
from lbrynet.core import RateLimiter
|
||||
from lbrynet.core import Peer
|
||||
from lbrynet.core import Wallet
|
||||
from lbrynet.core.client import BlobRequester
|
||||
from lbrynet.core.client import ConnectionManager
|
||||
|
||||
from lbrynet.core import log_support, Wallet, Peer
|
||||
from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader
|
||||
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
|
||||
from lbrynet.core.BlobManager import DiskBlobManager
|
||||
from lbrynet.core.HashAnnouncer import DummyHashAnnouncer
|
||||
|
||||
log = logging.getLogger()
|
||||
SUCCESS = False
|
||||
|
||||
|
||||
def main(args=None):
|
||||
conf.initialize_settings()
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--timeout', type=int, default=30)
|
||||
parser.add_argument('peer')
|
||||
parser.add_argument('blob_hash')
|
||||
parser.add_argument('directory', type=str, default=os.getcwd())
|
||||
parser.add_argument('--timeout', type=int, default=30)
|
||||
args = parser.parse_args(args)
|
||||
|
||||
log_support.configure_console(level='DEBUG')
|
||||
log_support.configure_twisted()
|
||||
|
||||
announcer = HashAnnouncer.DummyHashAnnouncer()
|
||||
blob_manager = MyBlobManager(announcer)
|
||||
blob = BlobFile(args.directory, args.blob_hash)
|
||||
download_manager = SingleBlobDownloadManager(blob)
|
||||
peer = Peer.Peer(*conf.server_port(args.peer))
|
||||
payment_rate_manager = DumbPaymentRateManager()
|
||||
wallet = getWallet()
|
||||
requester = SingleBlobRequester(
|
||||
peer, blob_manager, payment_rate_manager, wallet, download_manager)
|
||||
rate_limiter = RateLimiter.DummyRateLimiter()
|
||||
downloader = SingleBlobDownloader()
|
||||
connection_manager = ConnectionManager.ConnectionManager(
|
||||
downloader, rate_limiter, [requester], [wallet.get_info_exchanger()])
|
||||
reactor.callLater(args.timeout, reactor.stop)
|
||||
d = connection_manager.start()
|
||||
d.addErrback(log_support.failure, 'Something bad happened: %s')
|
||||
if ":" in str(args.peer):
|
||||
host, port = str(args.peer).strip().split(":")
|
||||
else:
|
||||
host = args.peer
|
||||
port = 3333
|
||||
|
||||
d = download_it(Peer.Peer(host, int(port)), args.timeout, args.blob_hash)
|
||||
d.addErrback(log.exception)
|
||||
d.addBoth(lambda _: reactor.callLater(0, reactor.stop))
|
||||
reactor.run()
|
||||
|
||||
if SUCCESS:
|
||||
sys.exit(0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def download_it(peer, timeout, blob_hash):
|
||||
tmp_dir = yield threads.deferToThread(tempfile.mkdtemp)
|
||||
announcer = DummyHashAnnouncer()
|
||||
tmp_blob_manager = DiskBlobManager(announcer, tmp_dir, tmp_dir)
|
||||
|
||||
class MyBlobManager(BlobManager.BlobManager):
|
||||
def blob_completed(self, blob):
|
||||
global SUCCESS
|
||||
log.info('Blob has been downloaded, we can stop')
|
||||
# this feels pretty hacky, but its as good of a stopping point as any
|
||||
SUCCESS = True
|
||||
reactor.stop()
|
||||
|
||||
|
||||
def getWallet():
|
||||
config = {'auto_connect': True}
|
||||
if conf.settings['lbryum_wallet_dir']:
|
||||
config['lbryum_path'] = conf.settings['lbryum_wallet_dir']
|
||||
storage = Wallet.InMemoryStorage()
|
||||
return Wallet.LBRYumWallet(storage, config)
|
||||
wallet = Wallet.LBRYumWallet(storage, config)
|
||||
|
||||
downloader = SinglePeerDownloader()
|
||||
downloader.setup(wallet)
|
||||
|
||||
class SingleBlobDownloader(object):
|
||||
def insufficientfunds(self, err):
|
||||
pass
|
||||
try:
|
||||
blob_downloaded = yield downloader.download_blob_from_peer(peer, timeout, blob_hash,
|
||||
tmp_blob_manager)
|
||||
if blob_downloaded:
|
||||
log.info("SUCCESS!")
|
||||
blob = yield tmp_blob_manager.get_blob(blob_hash)
|
||||
pprint(blob)
|
||||
if not blob.verified:
|
||||
log.error("except that its not verified....")
|
||||
else:
|
||||
reader = BlobStreamDescriptorReader(blob)
|
||||
info = None
|
||||
for x in range(0, 3):
|
||||
try:
|
||||
info = yield reader.get_info()
|
||||
except ValueError:
|
||||
pass
|
||||
if info:
|
||||
break
|
||||
time.sleep(
|
||||
0.1) # there's some kind of race condition where it sometimes doesnt write the blob to disk in time
|
||||
|
||||
|
||||
class SingleBlobDownloadManager(object):
|
||||
def __init__(self, blob):
|
||||
self.blob = blob
|
||||
|
||||
def needed_blobs(self):
|
||||
if self.blob.verified:
|
||||
return []
|
||||
if info is not None:
|
||||
pprint(info)
|
||||
for content_blob in info['blobs']:
|
||||
if 'blob_hash' in content_blob:
|
||||
yield download_it(peer, timeout, content_blob['blob_hash'])
|
||||
else:
|
||||
return [self.blob]
|
||||
log.error("Download failed")
|
||||
finally:
|
||||
yield tmp_blob_manager.stop()
|
||||
yield threads.deferToThread(shutil.rmtree, tmp_dir)
|
||||
|
||||
|
||||
class NullStrategy(object):
|
||||
def __init__(self):
|
||||
self.pending_sent_offers = {}
|
||||
|
||||
|
||||
class DumbPaymentRateManager(object):
|
||||
def __init__(self):
|
||||
self.strategy = NullStrategy()
|
||||
|
||||
def price_limit_reached(self, peer):
|
||||
return False
|
||||
|
||||
def get_rate_blob_data(self, *args):
|
||||
return 0.0
|
||||
|
||||
def record_offer_reply(self, peer, offer):
|
||||
pass
|
||||
|
||||
def record_points_paid(self, point_ammount):
|
||||
pass
|
||||
|
||||
|
||||
class FreeDownload(BlobRequester.DownloadRequest):
|
||||
def _pay_peer(self, *args):
|
||||
# TODO: somewhere I missed the part that is supposed to get
|
||||
# and address from the remote server for where to send
|
||||
# data fees to so we can't make payments. Probably has
|
||||
# to do with the wallet_info_exchanger
|
||||
pass
|
||||
|
||||
|
||||
class SingleBlobRequester(BlobRequester.BlobRequester):
|
||||
implements(interfaces.IRequestCreator)
|
||||
DownloadRequest = FreeDownload
|
||||
|
||||
def __init__(self, peer, blob_manager, payment_rate_manager, wallet, download_manager):
|
||||
self.peer = peer
|
||||
self.sent = False
|
||||
BlobRequester.BlobRequester.__init__(
|
||||
self, blob_manager, None, payment_rate_manager, wallet, download_manager)
|
||||
|
||||
def __repr__(self):
|
||||
return 'SingleBlobRequestor({!r})'.format(self.peer)
|
||||
|
||||
def get_new_peers(self):
|
||||
if self.sent:
|
||||
return defer.succeed([])
|
||||
else:
|
||||
self.sent = True
|
||||
return defer.succeed([self.peer])
|
||||
|
||||
def send_next_request(self, peer, protocol):
|
||||
return self._send_next_request(peer, protocol)
|
||||
defer.returnValue(True)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
Loading…
Reference in a new issue