forked from LBRYCommunity/lbry-sdk
Merge branch 'fix_stream_tail_blob'
This commit is contained in:
commit
79924f2c9e
3 changed files with 26 additions and 23 deletions
|
@ -26,6 +26,7 @@ at anytime.
|
||||||
* Fixed hanging delayedCall in dht node class
|
* Fixed hanging delayedCall in dht node class
|
||||||
* Fixed logging error in dht when calling or receiving methods with no arguments
|
* Fixed logging error in dht when calling or receiving methods with no arguments
|
||||||
* Fixed IndexError in routingTable.findCloseNodes which would cause an empty list to be returned
|
* Fixed IndexError in routingTable.findCloseNodes which would cause an empty list to be returned
|
||||||
|
* Fixed bug where last blob in a stream was not saved to blob manager
|
||||||
|
|
||||||
### Deprecated
|
### Deprecated
|
||||||
* Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.
|
* Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.
|
||||||
|
|
|
@ -69,21 +69,26 @@ class CryptStreamCreator(object):
|
||||||
self.stopped = True
|
self.stopped = True
|
||||||
self.producer = None
|
self.producer = None
|
||||||
|
|
||||||
|
def _close_current_blob(self):
|
||||||
|
# close the blob that was being written to
|
||||||
|
# and save it to blob manager
|
||||||
|
should_announce = self.blob_count == 0
|
||||||
|
d = self.current_blob.close()
|
||||||
|
d.addCallback(self._blob_finished)
|
||||||
|
d.addCallback(lambda blob_info: self.blob_manager.creator_finished(blob_info,
|
||||||
|
should_announce))
|
||||||
|
self.finished_deferreds.append(d)
|
||||||
|
self.current_blob = None
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
"""Stop creating the stream. Create the terminating zero-length blob."""
|
"""Stop creating the stream. Create the terminating zero-length blob."""
|
||||||
log.debug("stop has been called for StreamCreator")
|
log.debug("stop has been called for StreamCreator")
|
||||||
self.stopped = True
|
self.stopped = True
|
||||||
if self.current_blob is not None:
|
if self.current_blob is not None:
|
||||||
current_blob = self.current_blob
|
self._close_current_blob()
|
||||||
d = current_blob.close()
|
|
||||||
d.addCallback(self._blob_finished)
|
|
||||||
d.addErrback(self._error)
|
|
||||||
self.finished_deferreds.append(d)
|
|
||||||
self.current_blob = None
|
|
||||||
self._finalize()
|
self._finalize()
|
||||||
dl = defer.DeferredList(self.finished_deferreds)
|
dl = defer.DeferredList(self.finished_deferreds)
|
||||||
dl.addCallback(lambda _: self._finished())
|
dl.addCallback(lambda _: self._finished())
|
||||||
dl.addErrback(self._error)
|
|
||||||
return dl
|
return dl
|
||||||
|
|
||||||
# TODO: move the stream creation process to its own thread and
|
# TODO: move the stream creation process to its own thread and
|
||||||
|
@ -123,6 +128,7 @@ class CryptStreamCreator(object):
|
||||||
d.addCallback(self._blob_finished)
|
d.addCallback(self._blob_finished)
|
||||||
self.finished_deferreds.append(d)
|
self.finished_deferreds.append(d)
|
||||||
|
|
||||||
|
|
||||||
def _write(self, data):
|
def _write(self, data):
|
||||||
while len(data) > 0:
|
while len(data) > 0:
|
||||||
if self.current_blob is None:
|
if self.current_blob is None:
|
||||||
|
@ -133,20 +139,11 @@ class CryptStreamCreator(object):
|
||||||
done, num_bytes_written = self.current_blob.write(data)
|
done, num_bytes_written = self.current_blob.write(data)
|
||||||
data = data[num_bytes_written:]
|
data = data[num_bytes_written:]
|
||||||
if done is True:
|
if done is True:
|
||||||
should_announce = self.blob_count == 0
|
self._close_current_blob()
|
||||||
d = self.current_blob.close()
|
|
||||||
d.addCallback(self._blob_finished)
|
|
||||||
d.addCallback(lambda blob_info: self.blob_manager.creator_finished(blob_info,
|
|
||||||
should_announce))
|
|
||||||
self.finished_deferreds.append(d)
|
|
||||||
self.current_blob = None
|
|
||||||
|
|
||||||
def _get_blob_maker(self, iv, blob_creator):
|
def _get_blob_maker(self, iv, blob_creator):
|
||||||
return CryptStreamBlobMaker(self.key, iv, self.blob_count, blob_creator)
|
return CryptStreamBlobMaker(self.key, iv, self.blob_count, blob_creator)
|
||||||
|
|
||||||
def _error(self, error):
|
|
||||||
log.error(error)
|
|
||||||
|
|
||||||
def _finished(self):
|
def _finished(self):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
|
|
@ -46,18 +46,23 @@ class CreateEncryptedFileTest(unittest.TestCase):
|
||||||
session, manager, filename, handle, key, iv_generator())
|
session, manager, filename, handle, key, iv_generator())
|
||||||
defer.returnValue(out)
|
defer.returnValue(out)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def test_can_create_file(self):
|
def test_can_create_file(self):
|
||||||
expected_stream_hash = ('41e6b247d923d191b154fb6f1b8529d6ddd6a73d65c357b1acb7'
|
expected_stream_hash = ('41e6b247d923d191b154fb6f1b8529d6ddd6a73d65c357b1acb7'
|
||||||
'42dd83151fb66393a7709e9f346260a4f4db6de10c25')
|
'42dd83151fb66393a7709e9f346260a4f4db6de10c25')
|
||||||
filename = 'test.file'
|
filename = 'test.file'
|
||||||
d = self.create_file(filename)
|
stream_hash = yield self.create_file(filename)
|
||||||
d.addCallback(self.assertEqual, expected_stream_hash)
|
self.assertEqual(expected_stream_hash, stream_hash)
|
||||||
return d
|
|
||||||
|
|
||||||
|
blobs = yield self.blob_manager.get_all_verified_blobs()
|
||||||
|
self.assertEqual(2, len(blobs))
|
||||||
|
num_should_announce_blobs = yield self.blob_manager.count_should_announce_blobs()
|
||||||
|
self.assertEqual(1, num_should_announce_blobs)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def test_can_create_file_with_unicode_filename(self):
|
def test_can_create_file_with_unicode_filename(self):
|
||||||
expected_stream_hash = ('d1da4258f3ce12edb91d7e8e160d091d3ab1432c2e55a6352dce0'
|
expected_stream_hash = ('d1da4258f3ce12edb91d7e8e160d091d3ab1432c2e55a6352dce0'
|
||||||
'2fd5adb86fe144e93e110075b5865fff8617776c6c0')
|
'2fd5adb86fe144e93e110075b5865fff8617776c6c0')
|
||||||
filename = u'☃.file'
|
filename = u'☃.file'
|
||||||
d = self.create_file(filename)
|
stream_hash = yield self.create_file(filename)
|
||||||
d.addCallback(self.assertEqual, expected_stream_hash)
|
self.assertEqual(expected_stream_hash, stream_hash)
|
||||||
return d
|
|
||||||
|
|
Loading…
Reference in a new issue