refactor mirroring

This commit is contained in:
Victor Shyba 2018-07-26 22:49:35 -03:00
parent 9ab256df30
commit ff4aba9423
3 changed files with 64 additions and 42 deletions

View file

@ -27,7 +27,7 @@ class HashBlobWriter(object):
def write(self, data):
if self.write_handle is None:
log.exception("writer has already been closed")
log.warning("writer has already been closed")
raise IOError('I/O operation on closed file')
self._hashsum.update(data)

View file

@ -3,7 +3,8 @@ import logging
from twisted.internet import defer
import treq
from twisted.internet.task import LoopingCall
from lbrynet.core.Error import DownloadCanceledError
log = logging.getLogger(__name__)
@ -21,65 +22,76 @@ class HTTPBlobDownloader(object):
self.servers = servers or []
self.client = client or treq
self.blob_hashes = blob_hashes or []
self.looping_call = LoopingCall(self._download_next_blob_hash_for_file)
self.failures = 0
self.max_failures = 3
self.interval = 1
@property
def running(self):
return self.looping_call.running
self.running = False
self.semaphore = defer.DeferredSemaphore(2)
self.deferreds = []
self.writers = []
def start(self):
if not self.running and self.blob_hashes and self.servers:
return self.looping_call.start(self.interval, now=True)
return self._start()
defer.succeed(None)
def stop(self):
if self.running:
for d in reversed(self.deferreds):
d.cancel()
for writer in self.writers:
writer.close(DownloadCanceledError())
self.running = False
self.blob_hashes = []
return self.looping_call.stop()
@defer.inlineCallbacks
def _download_next_blob_hash_for_file(self):
def _start(self):
self.running = True
dl = []
for blob_hash in self.blob_hashes:
blob = yield self.blob_manager.get_blob(blob_hash)
if not blob.verified:
self.download_blob(blob)
return
self.stop()
dl.append(self.semaphore.run(self.download_blob, blob))
self.deferreds = dl
yield defer.DeferredList(dl, consumeErrors=True)
@defer.inlineCallbacks
def download_blob(self, blob):
try:
yield self._download_blob(blob)
self.failures = 0
except Exception as exception:
self.failures += 1
log.exception('Mirror failed downloading')
if self.failures >= self.max_failures:
self.stop()
self.failures = 0
for _ in range(self.max_failures):
writer, finished_deferred = blob.open_for_writing('mirror')
self.writers.append(writer)
try:
downloaded = yield self._write_blob(writer, blob)
if downloaded:
yield finished_deferred # yield for verification errors, so we log them
if blob.verified:
log.info('Mirror completed download for %s', blob.blob_hash)
break
except (IOError, Exception) as e:
if isinstance(e, DownloadCanceledError) or 'closed file' in str(e):
# some other downloader finished first or it was simply cancelled
log.info("Mirror download cancelled: %s", blob.blob_hash)
break
else:
log.exception('Mirror failed downloading')
finally:
finished_deferred.addBoth(lambda _: None) # suppress echoed errors
if 'mirror' in blob.writers:
writer.close()
self.writers.remove(writer)
@defer.inlineCallbacks
def _download_blob(self, blob):
if not blob.get_is_verified() and not blob.is_downloading() and 'mirror' not in blob.writers:
response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash))
if response.code != 200:
log.debug('[Mirror] Missing a blob: %s', blob.blob_hash)
if blob.blob_hash in self.blob_hashes:
self.blob_hashes.remove(blob.blob_hash)
defer.returnValue(blob.blob_hash)
log.debug('[Mirror] Download started: %s', blob.blob_hash)
blob.set_length(response.length)
writer, finished_deferred = blob.open_for_writing('mirror')
try:
yield self.client.collect(response, writer.write)
log.info('Mirror completed download for %s', blob.blob_hash)
except Exception as e:
writer.close(e)
yield finished_deferred
defer.returnValue(blob.blob_hash)
def _write_blob(self, writer, blob):
response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash))
if response.code != 200:
log.debug('[Mirror] Missing a blob: %s', blob.blob_hash)
if blob.blob_hash in self.blob_hashes:
self.blob_hashes.remove(blob.blob_hash)
defer.returnValue(False)
log.debug('[Mirror] Download started: %s', blob.blob_hash)
blob.set_length(response.length)
yield self.client.collect(response, writer.write)
defer.returnValue(True)
def url_for(server, blob_hash=''):

View file

@ -36,6 +36,16 @@ class HTTPBlobDownloaderTest(unittest.TestCase):
self.assertEqual(self.blob.get_is_verified(), True)
self.assertEqual(self.blob.writers, {})
@defer.inlineCallbacks
def test_peer_finished_first_causing_a_write_on_closed_handle(self):
self.client.collect.side_effect = lambda response, write: defer.fail(IOError('I/O operation on closed file'))
yield self.downloader.start()
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash))
self.client.collect.assert_called()
self.assertEqual(self.blob.get_length(), self.response.length)
self.assertEqual(self.blob.writers, {})
@defer.inlineCallbacks
def test_download_transfer_failed(self):
self.client.collect.side_effect = lambda response, write: defer.fail(Exception())