diff --git a/CHANGELOG.md b/CHANGELOG.md index bc92c2705..f8f2bb925 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,28 +13,50 @@ at anytime. * ### Fixed - * fixed the inconsistencies in API and CLI docstrings - * `blob_announce` error when announcing a single blob - * `blob_list` error when looking up blobs by stream or sd hash - * issue#1107 whereing claiming a channel with the exact amount present in wallet would give out proper error - * Fix channel creation to use same bid logic as claim ([1148])(https://github.com/lbryio/lbry/pull/1148) + * incorrectly raised download cancelled error for already verified blob files + * infinite loop where reflector client keeps trying to send failing blobs, which may be failing because they are invalid and thus will never be successfully received ### Deprecated - * `report_bug` jsonrpc command + * * ### Changed - * reflector server to periodically check and set `should_announce` for sd and head blobs instead of during each request - * reflector server to use `SQLiteStorage` to find needed blob hashes for a stream + * + * ### Added - * scripts to autogenerate documentation - * now updating new channel also takes into consideration the original bid amount, so now channel could be updated for wallet balance + the original bid amount - * forward-compaitibility for upcoming DHT bencoding changes + * `blob_reflect` command to send specific blobs to a reflector server + * ### Removed - * short(single dashed) arguments for `lbrynet-cli` * `flags` decorator from server.py as short flags are no longer used when using api/cli methods + * + + +## [0.19.1] - 2018-03-20 +### Fixed + * Fixed the inconsistencies in API and CLI docstrings + * `blob_announce` error when announcing a single blob + * `blob_list` error when looking up blobs by stream or sd hash ([1126](https://github.com/lbryio/lbry/pull/1126)) + * Claiming a channel with the exact amount present in wallet would return a confusing error ([1107](https://github.com/lbryio/lbry/issues/1107)) + * Channel creation to use same bid logic as for claims ([1148](https://github.com/lbryio/lbry/pull/1148)) + +### Deprecated + * `report_bug` jsonrpc command + +### Changed + * Bumped `lbryschema` requirement to 0.0.15 [see changelog](https://github.com/lbryio/lbryschema/blob/master/CHANGELOG.md#0015---2018-03-20) + * Bumped `lbryum` requirement to 3.2.0 [see changelog](https://github.com/lbryio/lbryum/blob/master/CHANGELOG.md#320---2018-03-20) + * Reflector server to periodically check and set `should_announce` for sd and head blobs instead of during each request + * Reflector server to use `SQLiteStorage` to find needed blob hashes for a stream + +### Added + * Scripts to auto-generate documentation ([1128](https://github.com/lbryio/lbry/pull/1128)) + * Now updating new channel also takes into consideration the original bid amount, so now channel could be updated for wallet balance + the original bid amount ([1137](https://github.com/lbryio/lbry/pull/1137)) + * Forward-compatibility for upcoming DHT bencoding changes + +### Removed + * Short(single dashed) arguments for `lbrynet-cli` ## [0.19.0] - 2018-03-02 diff --git a/INSTALL.md b/INSTALL.md index f06352b97..06a090dd1 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -101,4 +101,6 @@ Then run: pip install --editable . ``` -This will update `lbrynet-daemon` and other executables. +This will install `lbrynet-daemon` in such a way that the changes you make to the code will be used as soon as you restart the daemon. + +Happy hacking! diff --git a/docs/404.html b/docs/404.html index 285a7053a..d7554115d 100644 --- a/docs/404.html +++ b/docs/404.html @@ -32,7 +32,7 @@ - + @@ -295,7 +295,7 @@ - + diff --git a/docs/cli/index.html b/docs/cli/index.html index 880591af8..5c7dee6a9 100644 --- a/docs/cli/index.html +++ b/docs/cli/index.html @@ -32,7 +32,7 @@ - + @@ -2482,7 +2482,7 @@ Returns: - + diff --git a/docs/index.html b/docs/index.html index 3a27aad61..87e403ec6 100644 --- a/docs/index.html +++ b/docs/index.html @@ -32,7 +32,7 @@ - + @@ -2218,7 +2218,7 @@ Returns: - + diff --git a/lbrynet/__init__.py b/lbrynet/__init__.py index 228eace97..604a6d1c3 100644 --- a/lbrynet/__init__.py +++ b/lbrynet/__init__.py @@ -1,6 +1,6 @@ import logging -__version__ = "0.19.1rc6" +__version__ = "0.19.2rc1" version = tuple(__version__.split('.')) logging.getLogger(__name__).addHandler(logging.NullHandler()) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index df17b25b5..709a33df0 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -176,17 +176,18 @@ class BlobFile(object): d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred) d.addCallback(lambda _: cancel_other_downloads()) else: - errback_finished_deferred(Failure(DownloadCanceledError())) - d = defer.succeed(True) + d = defer.succeed(None) + fire_finished_deferred() else: - err_string = "length vs expected: {0}, {1}, hash vs expected: {2}, {3}" - err_string = err_string.format(self.length, writer.len_so_far, self.blob_hash, - writer.blob_hash) + if writer.len_so_far != self.length: + err_string = "blob length is %i vs expected %i" % (writer.len_so_far, self.length) + else: + err_string = "blob hash is %s vs expected %s" % (writer.blob_hash, self.blob_hash) errback_finished_deferred(Failure(InvalidDataError(err_string))) - d = defer.succeed(True) + d = defer.succeed(None) else: errback_finished_deferred(err) - d = defer.succeed(True) + d = defer.succeed(None) d.addBoth(lambda _: writer.close_handle()) return d diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 801c15ef5..7294d44aa 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -719,7 +719,7 @@ class Daemon(AuthJSONRPCServer): claim_out = yield publisher.create_and_publish_stream(name, bid, claim_dict, file_path, claim_address, change_address) if conf.settings['reflect_uploads']: - d = reupload.reflect_stream(publisher.lbry_file) + d = reupload.reflect_file(publisher.lbry_file) d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name), log.exception) self.analytics_manager.send_claim_action('publish') @@ -3009,7 +3009,7 @@ class Daemon(AuthJSONRPCServer): raise Exception('No file found') lbry_file = lbry_files[0] - results = yield reupload.reflect_stream(lbry_file, reflector_server=reflector_server) + results = yield reupload.reflect_file(lbry_file, reflector_server=reflector_server) defer.returnValue(results) @defer.inlineCallbacks @@ -3073,6 +3073,24 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(blob_hashes_for_return) defer.returnValue(response) + def jsonrpc_blob_reflect(self, blob_hashes, reflector_server=None): + """ + Reflects specified blobs + + Usage: + blob_reflect (...) [--reflector_server=] + + Options: + --reflector_server= (str) : reflector address + + Returns: + (list) reflected blob hashes + """ + + d = reupload.reflect_blob_hashes(blob_hashes, self.session.blob_manager, reflector_server) + d.addCallback(lambda r: self._render_response(r)) + return d + def jsonrpc_blob_reflect_all(self): """ Reflects all saved blobs diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py index 4dce4b477..a77a8dae8 100644 --- a/lbrynet/database/storage.py +++ b/lbrynet/database/storage.py @@ -423,12 +423,14 @@ class SQLiteStorage(object): if only_completed: lengths = transaction.execute( "select b.blob_hash, b.blob_length from blob b " - "inner join stream_blob s ON b.blob_hash=s.blob_hash and b.status='finished'" + "inner join stream_blob s ON b.blob_hash=s.blob_hash and b.status='finished' and s.stream_hash=?", + (stream_hash, ) ).fetchall() else: lengths = transaction.execute( "select b.blob_hash, b.blob_length from blob b " - "inner join stream_blob s ON b.blob_hash=s.blob_hash" + "inner join stream_blob s ON b.blob_hash=s.blob_hash and s.stream_hash=?", + (stream_hash, ) ).fetchall() blob_length_dict = {} diff --git a/lbrynet/dht/msgformat.py b/lbrynet/dht/msgformat.py index 7a9dd5378..2cc79f29c 100644 --- a/lbrynet/dht/msgformat.py +++ b/lbrynet/dht/msgformat.py @@ -53,7 +53,7 @@ class DefaultFormat(MessageTranslator): return primitive[str(key)] # TODO: switch to int() def fromPrimitive(self, msgPrimitive): - msgType = msgPrimitive[self.headerType] + msgType = self.get(msgPrimitive, self.headerType) if msgType == self.typeRequest: msg = msgtypes.RequestMessage(self.get(msgPrimitive, self.headerNodeID), self.get(msgPrimitive, self.headerPayload), diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index 96b56c0ab..5f91eae01 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -7,7 +7,7 @@ import logging from twisted.internet import defer, task, reactor from twisted.python.failure import Failure from lbrynet.core.Error import InvalidStreamDescriptorError -from lbrynet.reflector.reupload import reflect_stream +from lbrynet.reflector.reupload import reflect_file from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory @@ -96,41 +96,48 @@ class EncryptedFileManager(object): suggested_file_name=suggested_file_name ) + @defer.inlineCallbacks + def _start_lbry_file(self, file_info, payment_rate_manager): + lbry_file = self._get_lbry_file( + file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'], + file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'], + file_info['suggested_file_name'] + ) + yield lbry_file.get_claim_info() + try: + # verify the stream is valid (we might have downloaded an invalid stream + # in the past when the validation check didn't work) + stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True) + validate_descriptor(stream_info) + except InvalidStreamDescriptorError as err: + log.warning("Stream for descriptor %s is invalid (%s), cleaning it up", + lbry_file.sd_hash, err.message) + yield lbry_file.delete_data() + yield self.session.storage.delete_stream(lbry_file.stream_hash) + else: + try: + # restore will raise an Exception if status is unknown + lbry_file.restore(file_info['status']) + self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info + self.lbry_files.append(lbry_file) + if len(self.lbry_files) % 500 == 0: + log.info("Started %i files", len(self.lbry_files)) + except Exception: + log.warning("Failed to start %i", file_info.get('rowid')) + @defer.inlineCallbacks def _start_lbry_files(self): files = yield self.session.storage.get_all_lbry_files() b_prm = self.session.base_payment_rate_manager payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) - log.info("Trying to start %i files", len(files)) - for i, file_info in enumerate(files): - if len(files) > 500 and i % 500 == 0: - log.info("Started %i/%i files", i, len(files)) + log.info("Starting %i files", len(files)) + dl = [] + for file_info in files: + dl.append(self._start_lbry_file(file_info, payment_rate_manager)) + + yield defer.DeferredList(dl) - lbry_file = self._get_lbry_file( - file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'], - file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'], - file_info['suggested_file_name'] - ) - yield lbry_file.get_claim_info() - try: - # verify the stream is valid (we might have downloaded an invalid stream - # in the past when the validation check didn't work) - stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True) - validate_descriptor(stream_info) - except InvalidStreamDescriptorError as err: - log.warning("Stream for descriptor %s is invalid (%s), cleaning it up", - lbry_file.sd_hash, err.message) - yield lbry_file.delete_data() - yield self.session.storage.delete_stream(lbry_file.stream_hash) - else: - try: - # restore will raise an Exception if status is unknown - lbry_file.restore(file_info['status']) - self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info - self.lbry_files.append(lbry_file) - except Exception: - log.warning("Failed to start %i", file_info.get('rowid')) log.info("Started %i lbry files", len(self.lbry_files)) if self.auto_re_reflect is True: safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) @@ -247,7 +254,7 @@ class EncryptedFileManager(object): sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS) ds = [] for lbry_file in self.lbry_files: - ds.append(sem.run(reflect_stream, lbry_file)) + ds.append(sem.run(reflect_file, lbry_file)) yield defer.DeferredList(ds) @defer.inlineCallbacks diff --git a/lbrynet/reflector/client/blob.py b/lbrynet/reflector/client/blob.py index d2c41ce30..d2533cb02 100644 --- a/lbrynet/reflector/client/blob.py +++ b/lbrynet/reflector/client/blob.py @@ -26,7 +26,7 @@ class BlobReflectorClient(Protocol): self.file_sender = None self.producer = None self.streaming = False - self.sent_blobs = False + self.reflected_blobs = [] d = self.send_handshake() d.addErrback( lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) @@ -46,10 +46,9 @@ class BlobReflectorClient(Protocol): def connectionLost(self, reason): if reason.check(error.ConnectionDone): - self.factory.sent_blobs = self.sent_blobs - if self.factory.sent_blobs: + if self.reflected_blobs: log.info('Finished sending data via reflector') - self.factory.finished_deferred.callback(True) + self.factory.finished_deferred.callback(self.reflected_blobs) else: log.info('Reflector finished: %s', reason) self.factory.finished_deferred.callback(reason) @@ -101,7 +100,6 @@ class BlobReflectorClient(Protocol): return defer.succeed(None) def start_transfer(self): - self.sent_blobs = True assert self.read_handle is not None, \ "self.read_handle was None when trying to start the transfer" d = self.file_sender.beginFileTransfer(self.read_handle, self) @@ -130,6 +128,8 @@ class BlobReflectorClient(Protocol): if 'received_blob' not in response_dict: raise ValueError("I don't know if the blob made it to the intended destination!") else: + if response_dict['received_blob']: + self.reflected_blobs.append(self.next_blob_to_send.blob_hash) return self.set_not_uploading() def open_blob_for_reading(self, blob): @@ -188,7 +188,6 @@ class BlobReflectorClientFactory(ClientFactory): self.blob_manager = blob_manager self.blobs = blobs self.p = None - self.sent_blobs = False self.finished_deferred = defer.Deferred() def buildProtocol(self, addr): diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index d70e531a2..7c5bc3e3e 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -33,26 +33,14 @@ class EncryptedFileReflectorClient(Protocol): self.file_sender = None self.producer = None self.streaming = False + + self.blob_manager = self.factory.blob_manager + self.protocol_version = self.factory.protocol_version + self.stream_hash = self.factory.stream_hash + d = self.load_descriptor() d.addCallback(lambda _: self.send_handshake()) - d.addErrback( - lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) - - @property - def file_name(self): - return self.factory.file_name - - @property - def blob_manager(self): - return self.factory.blob_manager - - @property - def protocol_version(self): - return self.factory.protocol_version - - @property - def stream_hash(self): - return self.factory.stream_hash + d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) def dataReceived(self, data): self.response_buff += data @@ -131,7 +119,7 @@ class EncryptedFileReflectorClient(Protocol): len(filtered)) return filtered - d = self.factory.blob_manager.storage.get_blobs_for_stream(self.factory.stream_hash) + d = self.factory.blob_manager.storage.get_blobs_for_stream(self.stream_hash) d.addCallback(self.get_validated_blobs) if not self.descriptor_needed: d.addCallback(lambda filtered: @@ -151,7 +139,7 @@ class EncryptedFileReflectorClient(Protocol): def _save_descriptor_blob(sd_blob): self.stream_descriptor = sd_blob - d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.factory.stream_hash) + d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.stream_hash) d.addCallback(self.factory.blob_manager.get_blob) d.addCallback(_save_descriptor_blob) return d @@ -216,14 +204,18 @@ class EncryptedFileReflectorClient(Protocol): raise ValueError("I don't know if the sd blob made it to the intended destination!") else: self.received_descriptor_response = True + disconnect = False if response_dict['received_sd_blob']: self.reflected_blobs.append(self.next_blob_to_send.blob_hash) log.info("Sent reflector descriptor %s", self.next_blob_to_send) else: log.warning("Reflector failed to receive descriptor %s", self.next_blob_to_send) - self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash) - return self.set_not_uploading() + disconnect = True + d = self.set_not_uploading() + if disconnect: + d.addCallback(lambda _: self.transport.loseConnection()) + return d def handle_normal_response(self, response_dict): if self.file_sender is None: # Expecting Server Info Response @@ -244,7 +236,6 @@ class EncryptedFileReflectorClient(Protocol): log.debug("Sent reflector blob %s", self.next_blob_to_send) else: log.warning("Reflector failed to receive blob %s", self.next_blob_to_send) - self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash) return self.set_not_uploading() def open_blob_for_reading(self, blob): @@ -312,28 +303,14 @@ class EncryptedFileReflectorClient(Protocol): class EncryptedFileReflectorClientFactory(ClientFactory): protocol = EncryptedFileReflectorClient + protocol_version = REFLECTOR_V2 - def __init__(self, lbry_file): - self._lbry_file = lbry_file + def __init__(self, blob_manager, stream_hash): + self.blob_manager = blob_manager + self.stream_hash = stream_hash self.p = None self.finished_deferred = defer.Deferred() - @property - def blob_manager(self): - return self._lbry_file.blob_manager - - @property - def stream_hash(self): - return self._lbry_file.stream_hash - - @property - def file_name(self): - return self._lbry_file.file_name - - @property - def protocol_version(self): - return REFLECTOR_V2 - def buildProtocol(self, addr): p = self.protocol() p.factory = self diff --git a/lbrynet/reflector/reupload.py b/lbrynet/reflector/reupload.py index 39c6c1ba1..5f31b5c32 100644 --- a/lbrynet/reflector/reupload.py +++ b/lbrynet/reflector/reupload.py @@ -24,15 +24,19 @@ def resolve(host): @defer.inlineCallbacks -def _reflect_stream(lbry_file, reflector_server): +def _reflect_stream(blob_manager, stream_hash, reflector_server): reflector_address, reflector_port = reflector_server[0], reflector_server[1] - factory = ClientFactory(lbry_file) + factory = ClientFactory(blob_manager, stream_hash) ip = yield resolve(reflector_address) yield reactor.connectTCP(ip, reflector_port, factory) result = yield factory.finished_deferred defer.returnValue(result) +def _reflect_file(lbry_file, reflector_server): + return _reflect_stream(lbry_file.blob_manager, lbry_file.stream_hash, reflector_server) + + @defer.inlineCallbacks def _reflect_blobs(blob_manager, blob_hashes, reflector_server): reflector_address, reflector_port = reflector_server[0], reflector_server[1] @@ -43,7 +47,7 @@ def _reflect_blobs(blob_manager, blob_hashes, reflector_server): defer.returnValue(result) -def reflect_stream(lbry_file, reflector_server=None): +def reflect_file(lbry_file, reflector_server=None): if reflector_server: if len(reflector_server.split(":")) == 2: host, port = tuple(reflector_server.split(":")) @@ -52,7 +56,19 @@ def reflect_stream(lbry_file, reflector_server=None): reflector_server = reflector_server, 5566 else: reflector_server = random.choice(conf.settings['reflector_servers']) - return _reflect_stream(lbry_file, reflector_server) + return _reflect_file(lbry_file, reflector_server) + + +def reflect_stream(blob_manager, stream_hash, reflector_server=None): + if reflector_server: + if len(reflector_server.split(":")) == 2: + host, port = tuple(reflector_server.split(":")) + reflector_server = host, int(port) + else: + reflector_server = reflector_server, 5566 + else: + reflector_server = random.choice(conf.settings['reflector_servers']) + return _reflect_stream(blob_manager, stream_hash, reflector_server) def reflect_blob_hashes(blob_hashes, blob_manager, reflector_server=None): diff --git a/lbrynet/tests/functional/test_reflector.py b/lbrynet/tests/functional/test_reflector.py index a73dbee96..41d24902a 100644 --- a/lbrynet/tests/functional/test_reflector.py +++ b/lbrynet/tests/functional/test_reflector.py @@ -212,10 +212,7 @@ class TestReflector(unittest.TestCase): return d def send_to_server(): - fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, - self.server_session.storage, - self.stream_hash) - factory = reflector.ClientFactory(fake_lbry_file) + factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash) from twisted.internet import reactor reactor.connectTCP('localhost', self.port, factory) @@ -348,10 +345,7 @@ class TestReflector(unittest.TestCase): return factory.finished_deferred def send_to_server_as_stream(result): - fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, - self.server_session.storage, - self.stream_hash) - factory = reflector.ClientFactory(fake_lbry_file) + factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash) from twisted.internet import reactor reactor.connectTCP('localhost', self.port, factory) diff --git a/requirements.txt b/requirements.txt index 4a0015bf8..cbca7a5c5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,8 +12,8 @@ GitPython==2.1.3 jsonrpc==1.2 jsonrpclib==0.1.7 keyring==10.4.0 -git+https://github.com/lbryio/lbryschema.git@v0.0.15rc3#egg=lbryschema -git+https://github.com/lbryio/lbryum.git@v3.2.0rc20#egg=lbryum +git+https://github.com/lbryio/lbryschema.git@v0.0.15#egg=lbryschema +git+https://github.com/lbryio/lbryum.git@v3.2.0#egg=lbryum miniupnpc==1.9 pbkdf2==1.3 pycrypto==2.6.1 diff --git a/scripts/gen_docs.py b/scripts/gen_docs.py index d1c0827f7..b4317269f 100755 --- a/scripts/gen_docs.py +++ b/scripts/gen_docs.py @@ -172,7 +172,7 @@ def _api_doc(obj): def main(): - root_dir = os.path.dirname(os.path.dirname(__file__)) + root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) build_dir = os.path.realpath(os.path.join(root_dir, DOCS_BUILD_DIR)) if not os.path.exists(build_dir): os.makedirs(build_dir) diff --git a/setup.py b/setup.py index 4c5074a11..d341584e5 100644 --- a/setup.py +++ b/setup.py @@ -20,8 +20,8 @@ requires = [ 'base58', 'envparse', 'jsonrpc', - 'lbryschema==0.0.15rc3', - 'lbryum==3.2.0rc20', + 'lbryschema==0.0.15', + 'lbryum==3.2.0', 'miniupnpc', 'pycrypto', 'pyyaml',