split up HashBlob.py into lbrynet.blob

This commit is contained in:
Jack Robison 2017-09-13 15:46:39 -04:00
parent 7d6e62eb77
commit 14636a5d38
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF
9 changed files with 141 additions and 117 deletions

4
lbrynet/blob/__init__.py Normal file
View file

@ -0,0 +1,4 @@
from blob_file import BlobFile
from creator import BlobFileCreator
from writer import HashBlobWriter
from reader import HashBlobReader

View file

@ -1,93 +1,20 @@
from io import BytesIO
import logging import logging
import os import os
import threading import threading
from twisted.internet import interfaces, defer, threads from twisted.internet import defer, threads
from twisted.protocols.basic import FileSender from twisted.protocols.basic import FileSender
from twisted.web.client import FileBodyProducer from twisted.web.client import FileBodyProducer
from twisted.python.failure import Failure from twisted.python.failure import Failure
from zope.interface import implements
from lbrynet import conf from lbrynet import conf
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError
from lbrynet.core.cryptoutils import get_lbry_hash_obj
from lbrynet.core.utils import is_valid_blobhash from lbrynet.core.utils import is_valid_blobhash
from lbrynet.blob.writer import HashBlobWriter
from lbrynet.blob.reader import HashBlobReader
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class HashBlobReader(object):
implements(interfaces.IConsumer)
def __init__(self, write_func):
self.write_func = write_func
def registerProducer(self, producer, streaming):
from twisted.internet import reactor
self.producer = producer
self.streaming = streaming
if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
pass
def write(self, data):
from twisted.internet import reactor
self.write_func(data)
if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
class HashBlobWriter(object):
def __init__(self, length_getter, finished_cb):
self.write_handle = BytesIO()
self.length_getter = length_getter
self.finished_cb = finished_cb
self.finished_cb_d = None
self._hashsum = get_lbry_hash_obj()
self.len_so_far = 0
@property
def blob_hash(self):
return self._hashsum.hexdigest()
def write(self, data):
if self.write_handle is None:
log.info("writer has already been closed")
# can this be changed to IOError?
raise ValueError('I/O operation on closed file')
self._hashsum.update(data)
self.len_so_far += len(data)
if self.len_so_far > self.length_getter():
self.finished_cb_d = self.finished_cb(
self,
Failure(InvalidDataError("Length so far is greater than the expected length."
" %s to %s" % (self.len_so_far,
self.length_getter()))))
else:
self.write_handle.write(data)
if self.len_so_far == self.length_getter():
self.finished_cb_d = self.finished_cb(self)
def close_handle(self):
if self.write_handle is not None:
self.write_handle.close()
self.write_handle = None
def close(self, reason=None):
# if we've already called finished_cb because we either finished writing
# or closed already, do nothing
if self.finished_cb_d is not None:
return
if reason is None:
reason = Failure(DownloadCanceledError())
self.finished_cb_d = self.finished_cb(self, reason)
class BlobFile(object): class BlobFile(object):
""" """
A chunk of data available on the network which is specified by a hashsum A chunk of data available on the network which is specified by a hashsum
@ -299,38 +226,3 @@ class BlobFile(object):
defer.returnValue(True) defer.returnValue(True)
else: else:
raise DownloadCanceledError() raise DownloadCanceledError()
class BlobFileCreator(object):
"""
This class is used to create blobs on the local filesystem
when we do not know the blob hash beforehand (i.e, when creating
a new stream)
"""
def __init__(self, blob_dir):
self.blob_dir = blob_dir
self.buffer = BytesIO()
self._is_open = True
self._hashsum = get_lbry_hash_obj()
self.len_so_far = 0
self.blob_hash = None
self.length = None
@defer.inlineCallbacks
def close(self):
self.length = self.len_so_far
self.blob_hash = self._hashsum.hexdigest()
if self.blob_hash and self._is_open:
self.buffer.seek(0)
out_path = os.path.join(self.blob_dir, self.blob_hash)
producer = FileBodyProducer(self.buffer)
yield producer.startProducing(open(out_path, 'wb'))
self._is_open = False
defer.returnValue(self.blob_hash)
def write(self, data):
if not self._is_open:
raise IOError
self._hashsum.update(data)
self.len_so_far += len(data)
self.buffer.write(data)

43
lbrynet/blob/creator.py Normal file
View file

@ -0,0 +1,43 @@
import os
import logging
from io import BytesIO
from twisted.internet import defer
from twisted.web.client import FileBodyProducer
from lbrynet.core.cryptoutils import get_lbry_hash_obj
log = logging.getLogger(__name__)
class BlobFileCreator(object):
"""
This class is used to create blobs on the local filesystem
when we do not know the blob hash beforehand (i.e, when creating
a new stream)
"""
def __init__(self, blob_dir):
self.blob_dir = blob_dir
self.buffer = BytesIO()
self._is_open = True
self._hashsum = get_lbry_hash_obj()
self.len_so_far = 0
self.blob_hash = None
self.length = None
@defer.inlineCallbacks
def close(self):
self.length = self.len_so_far
self.blob_hash = self._hashsum.hexdigest()
if self.blob_hash and self._is_open:
self.buffer.seek(0)
out_path = os.path.join(self.blob_dir, self.blob_hash)
producer = FileBodyProducer(self.buffer)
yield producer.startProducing(open(out_path, 'wb'))
self._is_open = False
defer.returnValue(self.blob_hash)
def write(self, data):
if not self._is_open:
raise IOError
self._hashsum.update(data)
self.len_so_far += len(data)
self.buffer.write(data)

30
lbrynet/blob/reader.py Normal file
View file

@ -0,0 +1,30 @@
import logging
from twisted.internet import interfaces
from zope.interface import implements
log = logging.getLogger(__name__)
class HashBlobReader(object):
implements(interfaces.IConsumer)
def __init__(self, write_func):
self.write_func = write_func
def registerProducer(self, producer, streaming):
from twisted.internet import reactor
self.producer = producer
self.streaming = streaming
if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
pass
def write(self, data):
from twisted.internet import reactor
self.write_func(data)
if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)

54
lbrynet/blob/writer.py Normal file
View file

@ -0,0 +1,54 @@
import logging
from io import BytesIO
from twisted.python.failure import Failure
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError
from lbrynet.core.cryptoutils import get_lbry_hash_obj
log = logging.getLogger(__name__)
class HashBlobWriter(object):
def __init__(self, length_getter, finished_cb):
self.write_handle = BytesIO()
self.length_getter = length_getter
self.finished_cb = finished_cb
self.finished_cb_d = None
self._hashsum = get_lbry_hash_obj()
self.len_so_far = 0
@property
def blob_hash(self):
return self._hashsum.hexdigest()
def write(self, data):
if self.write_handle is None:
log.info("writer has already been closed")
# can this be changed to IOError?
raise ValueError('I/O operation on closed file')
self._hashsum.update(data)
self.len_so_far += len(data)
if self.len_so_far > self.length_getter():
self.finished_cb_d = self.finished_cb(
self,
Failure(InvalidDataError("Length so far is greater than the expected length."
" %s to %s" % (self.len_so_far,
self.length_getter()))))
else:
self.write_handle.write(data)
if self.len_so_far == self.length_getter():
self.finished_cb_d = self.finished_cb(self)
def close_handle(self):
if self.write_handle is not None:
self.write_handle.close()
self.write_handle = None
def close(self, reason=None):
# if we've already called finished_cb because we either finished writing
# or closed already, do nothing
if self.finished_cb_d is not None:
return
if reason is None:
reason = Failure(DownloadCanceledError())
self.finished_cb_d = self.finished_cb(self, reason)

View file

@ -6,7 +6,8 @@ import sqlite3
from twisted.internet import threads, defer, reactor from twisted.internet import threads, defer, reactor
from twisted.enterprise import adbapi from twisted.enterprise import adbapi
from lbrynet import conf from lbrynet import conf
from lbrynet.core.HashBlob import BlobFile, BlobFileCreator from lbrynet.blob.blob_file import BlobFile
from lbrynet.blob.creator import BlobFileCreator
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
from lbrynet.core.sqlite_helpers import rerun_if_locked from lbrynet.core.sqlite_helpers import rerun_if_locked

View file

@ -10,7 +10,7 @@ from twisted.internet import reactor
from lbrynet import conf from lbrynet import conf
from lbrynet.cryptstream import CryptBlob from lbrynet.cryptstream import CryptBlob
from lbrynet.core import HashBlob from lbrynet.blob import BlobFile
from lbrynet.core import log_support from lbrynet.core import log_support
@ -46,7 +46,7 @@ def decrypt_blob(blob_file, key, iv, output):
filename = os.path.abspath(blob_file) filename = os.path.abspath(blob_file)
length = os.path.getsize(filename) length = os.path.getsize(filename)
directory, blob_hash = os.path.split(filename) directory, blob_hash = os.path.split(filename)
blob = HashBlob.BlobFile(directory, blob_hash, True, length) blob = BlobFile(directory, blob_hash, length)
decryptor = CryptBlob.StreamBlobDecryptor( decryptor = CryptBlob.StreamBlobDecryptor(
blob, binascii.unhexlify(key), binascii.unhexlify(iv), length) blob, binascii.unhexlify(key), binascii.unhexlify(iv), length)
with open(output, 'w') as f: with open(output, 'w') as f:

View file

@ -13,7 +13,7 @@ from lbrynet import conf
from lbrynet.core import log_support from lbrynet.core import log_support
from lbrynet.core import BlobManager from lbrynet.core import BlobManager
from lbrynet.core import HashAnnouncer from lbrynet.core import HashAnnouncer
from lbrynet.core import HashBlob from lbrynet.blob import BlobFile
from lbrynet.core import RateLimiter from lbrynet.core import RateLimiter
from lbrynet.core import Peer from lbrynet.core import Peer
from lbrynet.core import Wallet from lbrynet.core import Wallet
@ -38,7 +38,7 @@ def main(args=None):
announcer = HashAnnouncer.DummyHashAnnouncer() announcer = HashAnnouncer.DummyHashAnnouncer()
blob_manager = MyBlobManager(announcer) blob_manager = MyBlobManager(announcer)
blob = HashBlob.BlobFile(args.directory, args.blob_hash) blob = BlobFile(args.directory, args.blob_hash)
download_manager = SingleBlobDownloadManager(blob) download_manager = SingleBlobDownloadManager(blob)
peer = Peer.Peer(*conf.server_port(args.peer)) peer = Peer.Peer(*conf.server_port(args.peer))
payment_rate_manager = DumbPaymentRateManager() payment_rate_manager = DumbPaymentRateManager()

View file

@ -1,4 +1,4 @@
from lbrynet.core.HashBlob import BlobFile from lbrynet.blob import BlobFile
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError from lbrynet.core.Error import DownloadCanceledError, InvalidDataError