remove TempBlob, clean up blob classes, use FBP and BytesIO in BlobFile

Jack Robison 2017-09-07 11:53:02 -04:00
parent 6022aa925b
commit c53a189605
GPG key ID: 284699E7404E3CFF
3 changed files with 40 additions and 134 deletions
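
For context on the "use FBP and BytesIO" part of the summary: FBP is Twisted's FileBodyProducer, which feeds a file-like object to a consumer in chunks through the reactor's cooperator rather than in one long blocking write. Below is a minimal sketch of the pattern the HashBlob diff adopts; the helper name and its arguments are illustrative, not part of the lbrynet API.

import os
from io import BytesIO

from twisted.internet import defer
from twisted.web.client import FileBodyProducer


@defer.inlineCallbacks
def write_buffer_to_blob_dir(blob_dir, blob_hash, blob_bytes):
    # Illustrative helper: buffer the verified blob bytes in memory, then
    # stream them to <blob_dir>/<blob_hash> instead of writing a temp file.
    buff = BytesIO(blob_bytes)
    out_path = os.path.join(blob_dir, blob_hash)
    producer = FileBodyProducer(buff)
    # startProducing() returns a Deferred that fires once the whole buffer
    # has been written to the consumer (here, the open file).
    with open(out_path, 'wb') as out_file:
        yield producer.startProducing(out_file)
    defer.returnValue(out_path)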

@@ -1,10 +1,7 @@
-from StringIO import StringIO
 from io import BytesIO
 import logging
 import os
-import tempfile
 import threading
-import shutil
 from twisted.internet import interfaces, defer, threads
 from twisted.protocols.basic import FileSender
 from twisted.web.client import FileBodyProducer
@@ -120,7 +117,6 @@ class HashBlob(object):
             return False
 
     def read(self, write_func):
         def close_self(*args):
             self.close_read_handle(file_handle)
             return args[0]
@@ -182,22 +178,22 @@ class HashBlob(object):
         return d
 
     def open_for_writing(self, peer):
-        pass
+        raise NotImplementedError()
 
     def open_for_reading(self):
-        pass
+        raise NotImplementedError()
 
     def delete(self):
-        pass
+        raise NotImplementedError()
 
     def close_read_handle(self, file_handle):
-        pass
+        raise NotImplementedError()
 
     def _close_writer(self, writer):
-        pass
+        raise NotImplementedError()
 
     def _save_verified_blob(self, writer):
-        pass
+        raise NotImplementedError()
 
     def __str__(self):
         return self.blob_hash[:16]
@@ -209,12 +205,13 @@ class HashBlob(object):
 class BlobFile(HashBlob):
     """A HashBlob which will be saved to the hard disk of the downloader"""
 
-    def __init__(self, blob_dir, *args):
-        HashBlob.__init__(self, *args)
+    def __init__(self, blob_dir, blob_hash, length=None):
+        HashBlob.__init__(self, blob_hash, length)
         self.blob_dir = blob_dir
         self.file_path = os.path.join(blob_dir, self.blob_hash)
         self.setting_verified_blob_lock = threading.Lock()
         self.moved_verified_blob = False
+        self.buffer = BytesIO()
         if os.path.isfile(self.file_path):
             self.set_length(os.path.getsize(self.file_path))
             # This assumes that the hash of the blob has already been
@@ -227,10 +224,8 @@ class BlobFile(HashBlob):
     def open_for_writing(self, peer):
         if not peer in self.writers:
             log.debug("Opening %s to be written by %s", str(self), str(peer))
-            write_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir)
             finished_deferred = defer.Deferred()
-            writer = HashBlobWriter(write_file, self.get_length, self.writer_finished)
+            writer = HashBlobWriter(self.buffer, self.get_length, self.writer_finished)
             self.writers[peer] = (writer, finished_deferred)
             return finished_deferred, writer.write, writer.cancel
         log.warning("Tried to download the same file twice simultaneously from the same peer")
@@ -278,137 +273,48 @@ class BlobFile(HashBlob):
     def _close_writer(self, writer):
         if writer.write_handle is not None:
             log.debug("Closing %s", str(self))
-            name = writer.write_handle.name
             writer.write_handle.close()
-            threads.deferToThread(os.remove, name)
             writer.write_handle = None
 
+    @defer.inlineCallbacks
     def _save_verified_blob(self, writer):
-        def move_file():
-            with self.setting_verified_blob_lock:
-                if self.moved_verified_blob is False:
-                    temp_file_name = writer.write_handle.name
-                    writer.write_handle.close()
-                    shutil.move(temp_file_name, self.file_path)
-                    writer.write_handle = None
-                    self.moved_verified_blob = True
-                    return True
-                else:
-                    raise DownloadCanceledError()
-        return threads.deferToThread(move_file)
-
-
-class TempBlob(HashBlob):
-    """A HashBlob which will only exist in memory"""
-    def __init__(self, *args):
-        HashBlob.__init__(self, *args)
-        self.data_buffer = ""
-
-    def open_for_writing(self, peer):
-        if not peer in self.writers:
-            temp_buffer = StringIO()
-            finished_deferred = defer.Deferred()
-            writer = HashBlobWriter(temp_buffer, self.get_length, self.writer_finished)
-            self.writers[peer] = (writer, finished_deferred)
-            return finished_deferred, writer.write, writer.cancel
-        return None, None, None
-
-    def open_for_reading(self):
-        if self._verified is True:
-            return StringIO(self.data_buffer)
-        return None
-
-    def delete(self):
-        if not self.writers and not self.readers:
-            self._verified = False
-            self.data_buffer = ''
-            return defer.succeed(True)
-        else:
-            return defer.fail(Failure(
-                ValueError("Blob is currently being read or written and cannot be deleted")))
-
-    def close_read_handle(self, file_handle):
-        file_handle.close()
-
-    def _close_writer(self, writer):
-        if writer.write_handle is not None:
-            writer.write_handle.close()
-            writer.write_handle = None
-
-    def _save_verified_blob(self, writer):
-        if not self.data_buffer:
-            self.data_buffer = writer.write_handle.getvalue()
-            writer.write_handle.close()
-            writer.write_handle = None
-            return defer.succeed(True)
-        else:
-            return defer.fail(Failure(DownloadCanceledError()))
-
-
-class HashBlobCreator(object):
-    def __init__(self):
+        with self.setting_verified_blob_lock:
+            if self.moved_verified_blob is False:
+                self.buffer.seek(0)
+                out_path = os.path.join(self.blob_dir, self.blob_hash)
+                producer = FileBodyProducer(self.buffer)
+                yield producer.startProducing(open(out_path, 'wb'))
+                self.moved_verified_blob = True
+                defer.returnValue(True)
+            else:
+                raise DownloadCanceledError()
+
+
+class BlobFileCreator(object):
+    def __init__(self, blob_dir):
+        self.blob_dir = blob_dir
+        self.buffer = BytesIO()
+        self._is_open = True
         self._hashsum = get_lbry_hash_obj()
         self.len_so_far = 0
         self.blob_hash = None
         self.length = None
 
-    def open(self):
-        pass
-
+    @defer.inlineCallbacks
     def close(self):
         self.length = self.len_so_far
-        if self.length == 0:
-            self.blob_hash = None
-        else:
-            self.blob_hash = self._hashsum.hexdigest()
-        d = self._close()
-        if self.blob_hash is not None:
-            d.addCallback(lambda _: self.blob_hash)
-        else:
-            d.addCallback(lambda _: None)
-        return d
-
-    def write(self, data):
-        self._hashsum.update(data)
-        self.len_so_far += len(data)
-        self._write(data)
-
-    def _close(self):
-        pass
-
-    def _write(self, data):
-        pass
-
-
-class BlobFileCreator(HashBlobCreator):
-    def __init__(self, blob_dir):
-        HashBlobCreator.__init__(self)
-        self.blob_dir = blob_dir
-        self.buffer = BytesIO()
-
-    def _close(self):
-        if self.blob_hash is not None:
-            self.buffer.seek(0)
-            out_path = os.path.join(self.blob_dir, self.blob_hash)
-            producer = FileBodyProducer(self.buffer)
-            return producer.startProducing(open(out_path, 'wb'))
-        return defer.succeed(True)
+        self.blob_hash = self._hashsum.hexdigest()
+        if self.blob_hash and self._is_open:
+            self.buffer.seek(0)
+            out_path = os.path.join(self.blob_dir, self.blob_hash)
+            producer = FileBodyProducer(self.buffer)
+            yield producer.startProducing(open(out_path, 'wb'))
+            self._is_open = False
+        defer.returnValue(self.blob_hash)
 
-    def _write(self, data):
+    def write(self, data):
+        if not self._is_open:
+            raise IOError
+        self._hashsum.update(data)
+        self.len_so_far += len(data)
         self.buffer.write(data)
-
-
-class TempBlobCreator(HashBlobCreator):
-    def __init__(self):
-        HashBlobCreator.__init__(self)
-        # TODO: use StringIO
-        self.data_buffer = ''
-
-    def _close(self):
-        return defer.succeed(True)
-
-    def _write(self, data):
-        self.data_buffer += data
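
The BlobFileCreator that replaces the HashBlobCreator hierarchy above buffers writes in a BytesIO and flushes them to disk when closed. A hedged usage sketch, assuming the lbrynet.core.HashBlob module path seen in the test diff further down; the surrounding setup (blob_dir, data) is illustrative only.

from twisted.internet import defer

from lbrynet.core.HashBlob import BlobFileCreator


@defer.inlineCallbacks
def create_blob(blob_dir, data):
    creator = BlobFileCreator(blob_dir)  # writes accumulate in an in-memory BytesIO
    creator.write(data)                  # updates the running hash and the buffer
    blob_hash = yield creator.close()    # streams the buffer to blob_dir/<hash> via FileBodyProducer
    defer.returnValue(blob_hash)         # close() fires with the hex digest of the blob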

@@ -2,7 +2,7 @@
 import argparse
 import logging
 import sys
-import tempfile
+import os
 
 from twisted.internet import defer
 from twisted.internet import reactor
@@ -31,13 +31,14 @@ def main(args=None):
     parser.add_argument('--timeout', type=int, default=30)
     parser.add_argument('peer')
     parser.add_argument('blob_hash')
+    parser.add_argument('directory', type=str, default=os.getcwd())
     args = parser.parse_args(args)
     log_support.configure_console(level='DEBUG')
 
     announcer = HashAnnouncer.DummyHashAnnouncer()
     blob_manager = MyBlobManager(announcer)
-    blob = HashBlob.TempBlob(args.blob_hash, False)
+    blob = HashBlob.BlobFile(args.directory, args.blob_hash)
     download_manager = SingleBlobDownloadManager(blob)
     peer = Peer.Peer(*conf.server_port(args.peer))
     payment_rate_manager = DumbPaymentRateManager()

@@ -1,7 +1,6 @@
 from twisted.trial import unittest
 from twisted.internet import defer
 from lbrynet.cryptstream import CryptBlob
-from lbrynet.core.HashBlob import TempBlobCreator
 from lbrynet import conf
 from tests.mocks import mock_conf_settings