forked from LBRYCommunity/lbry-sdk
Merge branch 'master' into no-short-flags
This commit is contained in:
commit
f0742675b8
18 changed files with 164 additions and 126 deletions
46
CHANGELOG.md
46
CHANGELOG.md
|
@ -13,28 +13,50 @@ at anytime.
|
||||||
*
|
*
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
* fixed the inconsistencies in API and CLI docstrings
|
* incorrectly raised download cancelled error for already verified blob files
|
||||||
* `blob_announce` error when announcing a single blob
|
* infinite loop where reflector client keeps trying to send failing blobs, which may be failing because they are invalid and thus will never be successfully received
|
||||||
* `blob_list` error when looking up blobs by stream or sd hash
|
|
||||||
* issue#1107 whereing claiming a channel with the exact amount present in wallet would give out proper error
|
|
||||||
* Fix channel creation to use same bid logic as claim ([1148])(https://github.com/lbryio/lbry/pull/1148)
|
|
||||||
|
|
||||||
### Deprecated
|
### Deprecated
|
||||||
* `report_bug` jsonrpc command
|
*
|
||||||
*
|
*
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
* reflector server to periodically check and set `should_announce` for sd and head blobs instead of during each request
|
*
|
||||||
* reflector server to use `SQLiteStorage` to find needed blob hashes for a stream
|
*
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
* scripts to autogenerate documentation
|
* `blob_reflect` command to send specific blobs to a reflector server
|
||||||
* now updating new channel also takes into consideration the original bid amount, so now channel could be updated for wallet balance + the original bid amount
|
*
|
||||||
* forward-compaitibility for upcoming DHT bencoding changes
|
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
* short(single dashed) arguments for `lbrynet-cli`
|
|
||||||
* `flags` decorator from server.py as short flags are no longer used when using api/cli methods
|
* `flags` decorator from server.py as short flags are no longer used when using api/cli methods
|
||||||
|
*
|
||||||
|
|
||||||
|
|
||||||
|
## [0.19.1] - 2018-03-20
|
||||||
|
### Fixed
|
||||||
|
* Fixed the inconsistencies in API and CLI docstrings
|
||||||
|
* `blob_announce` error when announcing a single blob
|
||||||
|
* `blob_list` error when looking up blobs by stream or sd hash ([1126](https://github.com/lbryio/lbry/pull/1126))
|
||||||
|
* Claiming a channel with the exact amount present in wallet would return a confusing error ([1107](https://github.com/lbryio/lbry/issues/1107))
|
||||||
|
* Channel creation to use same bid logic as for claims ([1148](https://github.com/lbryio/lbry/pull/1148))
|
||||||
|
|
||||||
|
### Deprecated
|
||||||
|
* `report_bug` jsonrpc command
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
* Bumped `lbryschema` requirement to 0.0.15 [see changelog](https://github.com/lbryio/lbryschema/blob/master/CHANGELOG.md#0015---2018-03-20)
|
||||||
|
* Bumped `lbryum` requirement to 3.2.0 [see changelog](https://github.com/lbryio/lbryum/blob/master/CHANGELOG.md#320---2018-03-20)
|
||||||
|
* Reflector server to periodically check and set `should_announce` for sd and head blobs instead of during each request
|
||||||
|
* Reflector server to use `SQLiteStorage` to find needed blob hashes for a stream
|
||||||
|
|
||||||
|
### Added
|
||||||
|
* Scripts to auto-generate documentation ([1128](https://github.com/lbryio/lbry/pull/1128))
|
||||||
|
* Now updating new channel also takes into consideration the original bid amount, so now channel could be updated for wallet balance + the original bid amount ([1137](https://github.com/lbryio/lbry/pull/1137))
|
||||||
|
* Forward-compatibility for upcoming DHT bencoding changes
|
||||||
|
|
||||||
|
### Removed
|
||||||
|
* Short(single dashed) arguments for `lbrynet-cli`
|
||||||
|
|
||||||
|
|
||||||
## [0.19.0] - 2018-03-02
|
## [0.19.0] - 2018-03-02
|
||||||
|
|
|
@ -101,4 +101,6 @@ Then run:
|
||||||
pip install --editable .
|
pip install --editable .
|
||||||
```
|
```
|
||||||
|
|
||||||
This will update `lbrynet-daemon` and other executables.
|
This will install `lbrynet-daemon` in such a way that the changes you make to the code will be used as soon as you restart the daemon.
|
||||||
|
|
||||||
|
Happy hacking!
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
<meta name="lang:search.tokenizer" content="[\s\-]+">
|
<meta name="lang:search.tokenizer" content="[\s\-]+">
|
||||||
|
|
||||||
<link rel="shortcut icon" href="/assets/images/favicon.png">
|
<link rel="shortcut icon" href="/assets/images/favicon.png">
|
||||||
<meta name="generator" content="mkdocs-0.17.2, mkdocs-material-2.7.0">
|
<meta name="generator" content="mkdocs-0.17.3, mkdocs-material-2.7.0">
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -295,7 +295,7 @@
|
||||||
|
|
||||||
<script src="/assets/javascripts/application.8eb9be28.js"></script>
|
<script src="/assets/javascripts/application.8eb9be28.js"></script>
|
||||||
|
|
||||||
<script>app.initialize({version:"0.17.2",url:{base:""}})</script>
|
<script>app.initialize({version:"0.17.3",url:{base:""}})</script>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
<meta name="lang:search.tokenizer" content="[\s\-]+">
|
<meta name="lang:search.tokenizer" content="[\s\-]+">
|
||||||
|
|
||||||
<link rel="shortcut icon" href="../assets/images/favicon.png">
|
<link rel="shortcut icon" href="../assets/images/favicon.png">
|
||||||
<meta name="generator" content="mkdocs-0.17.2, mkdocs-material-2.7.0">
|
<meta name="generator" content="mkdocs-0.17.3, mkdocs-material-2.7.0">
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -2482,7 +2482,7 @@ Returns:
|
||||||
|
|
||||||
<script src="../assets/javascripts/application.8eb9be28.js"></script>
|
<script src="../assets/javascripts/application.8eb9be28.js"></script>
|
||||||
|
|
||||||
<script>app.initialize({version:"0.17.2",url:{base:".."}})</script>
|
<script>app.initialize({version:"0.17.3",url:{base:".."}})</script>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
<meta name="lang:search.tokenizer" content="[\s\-]+">
|
<meta name="lang:search.tokenizer" content="[\s\-]+">
|
||||||
|
|
||||||
<link rel="shortcut icon" href="./assets/images/favicon.png">
|
<link rel="shortcut icon" href="./assets/images/favicon.png">
|
||||||
<meta name="generator" content="mkdocs-0.17.2, mkdocs-material-2.7.0">
|
<meta name="generator" content="mkdocs-0.17.3, mkdocs-material-2.7.0">
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -2218,7 +2218,7 @@ Returns:
|
||||||
|
|
||||||
<script src="./assets/javascripts/application.8eb9be28.js"></script>
|
<script src="./assets/javascripts/application.8eb9be28.js"></script>
|
||||||
|
|
||||||
<script>app.initialize({version:"0.17.2",url:{base:"."}})</script>
|
<script>app.initialize({version:"0.17.3",url:{base:"."}})</script>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
__version__ = "0.19.1rc6"
|
__version__ = "0.19.2rc1"
|
||||||
version = tuple(__version__.split('.'))
|
version = tuple(__version__.split('.'))
|
||||||
|
|
||||||
logging.getLogger(__name__).addHandler(logging.NullHandler())
|
logging.getLogger(__name__).addHandler(logging.NullHandler())
|
||||||
|
|
|
@ -176,17 +176,18 @@ class BlobFile(object):
|
||||||
d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
|
d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
|
||||||
d.addCallback(lambda _: cancel_other_downloads())
|
d.addCallback(lambda _: cancel_other_downloads())
|
||||||
else:
|
else:
|
||||||
errback_finished_deferred(Failure(DownloadCanceledError()))
|
d = defer.succeed(None)
|
||||||
d = defer.succeed(True)
|
fire_finished_deferred()
|
||||||
else:
|
else:
|
||||||
err_string = "length vs expected: {0}, {1}, hash vs expected: {2}, {3}"
|
if writer.len_so_far != self.length:
|
||||||
err_string = err_string.format(self.length, writer.len_so_far, self.blob_hash,
|
err_string = "blob length is %i vs expected %i" % (writer.len_so_far, self.length)
|
||||||
writer.blob_hash)
|
else:
|
||||||
|
err_string = "blob hash is %s vs expected %s" % (writer.blob_hash, self.blob_hash)
|
||||||
errback_finished_deferred(Failure(InvalidDataError(err_string)))
|
errback_finished_deferred(Failure(InvalidDataError(err_string)))
|
||||||
d = defer.succeed(True)
|
d = defer.succeed(None)
|
||||||
else:
|
else:
|
||||||
errback_finished_deferred(err)
|
errback_finished_deferred(err)
|
||||||
d = defer.succeed(True)
|
d = defer.succeed(None)
|
||||||
d.addBoth(lambda _: writer.close_handle())
|
d.addBoth(lambda _: writer.close_handle())
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
|
@ -719,7 +719,7 @@ class Daemon(AuthJSONRPCServer):
|
||||||
claim_out = yield publisher.create_and_publish_stream(name, bid, claim_dict, file_path,
|
claim_out = yield publisher.create_and_publish_stream(name, bid, claim_dict, file_path,
|
||||||
claim_address, change_address)
|
claim_address, change_address)
|
||||||
if conf.settings['reflect_uploads']:
|
if conf.settings['reflect_uploads']:
|
||||||
d = reupload.reflect_stream(publisher.lbry_file)
|
d = reupload.reflect_file(publisher.lbry_file)
|
||||||
d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name),
|
d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name),
|
||||||
log.exception)
|
log.exception)
|
||||||
self.analytics_manager.send_claim_action('publish')
|
self.analytics_manager.send_claim_action('publish')
|
||||||
|
@ -3009,7 +3009,7 @@ class Daemon(AuthJSONRPCServer):
|
||||||
raise Exception('No file found')
|
raise Exception('No file found')
|
||||||
lbry_file = lbry_files[0]
|
lbry_file = lbry_files[0]
|
||||||
|
|
||||||
results = yield reupload.reflect_stream(lbry_file, reflector_server=reflector_server)
|
results = yield reupload.reflect_file(lbry_file, reflector_server=reflector_server)
|
||||||
defer.returnValue(results)
|
defer.returnValue(results)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -3073,6 +3073,24 @@ class Daemon(AuthJSONRPCServer):
|
||||||
response = yield self._render_response(blob_hashes_for_return)
|
response = yield self._render_response(blob_hashes_for_return)
|
||||||
defer.returnValue(response)
|
defer.returnValue(response)
|
||||||
|
|
||||||
|
def jsonrpc_blob_reflect(self, blob_hashes, reflector_server=None):
|
||||||
|
"""
|
||||||
|
Reflects specified blobs
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
blob_reflect (<blob_hashes>...) [--reflector_server=<reflector_server>]
|
||||||
|
|
||||||
|
Options:
|
||||||
|
--reflector_server=<reflector_server> (str) : reflector address
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(list) reflected blob hashes
|
||||||
|
"""
|
||||||
|
|
||||||
|
d = reupload.reflect_blob_hashes(blob_hashes, self.session.blob_manager, reflector_server)
|
||||||
|
d.addCallback(lambda r: self._render_response(r))
|
||||||
|
return d
|
||||||
|
|
||||||
def jsonrpc_blob_reflect_all(self):
|
def jsonrpc_blob_reflect_all(self):
|
||||||
"""
|
"""
|
||||||
Reflects all saved blobs
|
Reflects all saved blobs
|
||||||
|
|
|
@ -423,12 +423,14 @@ class SQLiteStorage(object):
|
||||||
if only_completed:
|
if only_completed:
|
||||||
lengths = transaction.execute(
|
lengths = transaction.execute(
|
||||||
"select b.blob_hash, b.blob_length from blob b "
|
"select b.blob_hash, b.blob_length from blob b "
|
||||||
"inner join stream_blob s ON b.blob_hash=s.blob_hash and b.status='finished'"
|
"inner join stream_blob s ON b.blob_hash=s.blob_hash and b.status='finished' and s.stream_hash=?",
|
||||||
|
(stream_hash, )
|
||||||
).fetchall()
|
).fetchall()
|
||||||
else:
|
else:
|
||||||
lengths = transaction.execute(
|
lengths = transaction.execute(
|
||||||
"select b.blob_hash, b.blob_length from blob b "
|
"select b.blob_hash, b.blob_length from blob b "
|
||||||
"inner join stream_blob s ON b.blob_hash=s.blob_hash"
|
"inner join stream_blob s ON b.blob_hash=s.blob_hash and s.stream_hash=?",
|
||||||
|
(stream_hash, )
|
||||||
).fetchall()
|
).fetchall()
|
||||||
|
|
||||||
blob_length_dict = {}
|
blob_length_dict = {}
|
||||||
|
|
|
@ -53,7 +53,7 @@ class DefaultFormat(MessageTranslator):
|
||||||
return primitive[str(key)] # TODO: switch to int()
|
return primitive[str(key)] # TODO: switch to int()
|
||||||
|
|
||||||
def fromPrimitive(self, msgPrimitive):
|
def fromPrimitive(self, msgPrimitive):
|
||||||
msgType = msgPrimitive[self.headerType]
|
msgType = self.get(msgPrimitive, self.headerType)
|
||||||
if msgType == self.typeRequest:
|
if msgType == self.typeRequest:
|
||||||
msg = msgtypes.RequestMessage(self.get(msgPrimitive, self.headerNodeID),
|
msg = msgtypes.RequestMessage(self.get(msgPrimitive, self.headerNodeID),
|
||||||
self.get(msgPrimitive, self.headerPayload),
|
self.get(msgPrimitive, self.headerPayload),
|
||||||
|
|
|
@ -7,7 +7,7 @@ import logging
|
||||||
from twisted.internet import defer, task, reactor
|
from twisted.internet import defer, task, reactor
|
||||||
from twisted.python.failure import Failure
|
from twisted.python.failure import Failure
|
||||||
from lbrynet.core.Error import InvalidStreamDescriptorError
|
from lbrynet.core.Error import InvalidStreamDescriptorError
|
||||||
from lbrynet.reflector.reupload import reflect_stream
|
from lbrynet.reflector.reupload import reflect_file
|
||||||
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
|
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
|
||||||
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
|
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
|
||||||
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
|
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
|
||||||
|
@ -96,41 +96,48 @@ class EncryptedFileManager(object):
|
||||||
suggested_file_name=suggested_file_name
|
suggested_file_name=suggested_file_name
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _start_lbry_file(self, file_info, payment_rate_manager):
|
||||||
|
lbry_file = self._get_lbry_file(
|
||||||
|
file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
|
||||||
|
file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
|
||||||
|
file_info['suggested_file_name']
|
||||||
|
)
|
||||||
|
yield lbry_file.get_claim_info()
|
||||||
|
try:
|
||||||
|
# verify the stream is valid (we might have downloaded an invalid stream
|
||||||
|
# in the past when the validation check didn't work)
|
||||||
|
stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
|
||||||
|
validate_descriptor(stream_info)
|
||||||
|
except InvalidStreamDescriptorError as err:
|
||||||
|
log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
|
||||||
|
lbry_file.sd_hash, err.message)
|
||||||
|
yield lbry_file.delete_data()
|
||||||
|
yield self.session.storage.delete_stream(lbry_file.stream_hash)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
# restore will raise an Exception if status is unknown
|
||||||
|
lbry_file.restore(file_info['status'])
|
||||||
|
self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
|
||||||
|
self.lbry_files.append(lbry_file)
|
||||||
|
if len(self.lbry_files) % 500 == 0:
|
||||||
|
log.info("Started %i files", len(self.lbry_files))
|
||||||
|
except Exception:
|
||||||
|
log.warning("Failed to start %i", file_info.get('rowid'))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _start_lbry_files(self):
|
def _start_lbry_files(self):
|
||||||
files = yield self.session.storage.get_all_lbry_files()
|
files = yield self.session.storage.get_all_lbry_files()
|
||||||
b_prm = self.session.base_payment_rate_manager
|
b_prm = self.session.base_payment_rate_manager
|
||||||
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
|
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
|
||||||
|
|
||||||
log.info("Trying to start %i files", len(files))
|
log.info("Starting %i files", len(files))
|
||||||
for i, file_info in enumerate(files):
|
dl = []
|
||||||
if len(files) > 500 and i % 500 == 0:
|
for file_info in files:
|
||||||
log.info("Started %i/%i files", i, len(files))
|
dl.append(self._start_lbry_file(file_info, payment_rate_manager))
|
||||||
|
|
||||||
|
yield defer.DeferredList(dl)
|
||||||
|
|
||||||
lbry_file = self._get_lbry_file(
|
|
||||||
file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
|
|
||||||
file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
|
|
||||||
file_info['suggested_file_name']
|
|
||||||
)
|
|
||||||
yield lbry_file.get_claim_info()
|
|
||||||
try:
|
|
||||||
# verify the stream is valid (we might have downloaded an invalid stream
|
|
||||||
# in the past when the validation check didn't work)
|
|
||||||
stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
|
|
||||||
validate_descriptor(stream_info)
|
|
||||||
except InvalidStreamDescriptorError as err:
|
|
||||||
log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
|
|
||||||
lbry_file.sd_hash, err.message)
|
|
||||||
yield lbry_file.delete_data()
|
|
||||||
yield self.session.storage.delete_stream(lbry_file.stream_hash)
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
# restore will raise an Exception if status is unknown
|
|
||||||
lbry_file.restore(file_info['status'])
|
|
||||||
self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
|
|
||||||
self.lbry_files.append(lbry_file)
|
|
||||||
except Exception:
|
|
||||||
log.warning("Failed to start %i", file_info.get('rowid'))
|
|
||||||
log.info("Started %i lbry files", len(self.lbry_files))
|
log.info("Started %i lbry files", len(self.lbry_files))
|
||||||
if self.auto_re_reflect is True:
|
if self.auto_re_reflect is True:
|
||||||
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)
|
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)
|
||||||
|
@ -247,7 +254,7 @@ class EncryptedFileManager(object):
|
||||||
sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS)
|
sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS)
|
||||||
ds = []
|
ds = []
|
||||||
for lbry_file in self.lbry_files:
|
for lbry_file in self.lbry_files:
|
||||||
ds.append(sem.run(reflect_stream, lbry_file))
|
ds.append(sem.run(reflect_file, lbry_file))
|
||||||
yield defer.DeferredList(ds)
|
yield defer.DeferredList(ds)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
|
|
@ -26,7 +26,7 @@ class BlobReflectorClient(Protocol):
|
||||||
self.file_sender = None
|
self.file_sender = None
|
||||||
self.producer = None
|
self.producer = None
|
||||||
self.streaming = False
|
self.streaming = False
|
||||||
self.sent_blobs = False
|
self.reflected_blobs = []
|
||||||
d = self.send_handshake()
|
d = self.send_handshake()
|
||||||
d.addErrback(
|
d.addErrback(
|
||||||
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
|
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
|
||||||
|
@ -46,10 +46,9 @@ class BlobReflectorClient(Protocol):
|
||||||
|
|
||||||
def connectionLost(self, reason):
|
def connectionLost(self, reason):
|
||||||
if reason.check(error.ConnectionDone):
|
if reason.check(error.ConnectionDone):
|
||||||
self.factory.sent_blobs = self.sent_blobs
|
if self.reflected_blobs:
|
||||||
if self.factory.sent_blobs:
|
|
||||||
log.info('Finished sending data via reflector')
|
log.info('Finished sending data via reflector')
|
||||||
self.factory.finished_deferred.callback(True)
|
self.factory.finished_deferred.callback(self.reflected_blobs)
|
||||||
else:
|
else:
|
||||||
log.info('Reflector finished: %s', reason)
|
log.info('Reflector finished: %s', reason)
|
||||||
self.factory.finished_deferred.callback(reason)
|
self.factory.finished_deferred.callback(reason)
|
||||||
|
@ -101,7 +100,6 @@ class BlobReflectorClient(Protocol):
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
|
||||||
def start_transfer(self):
|
def start_transfer(self):
|
||||||
self.sent_blobs = True
|
|
||||||
assert self.read_handle is not None, \
|
assert self.read_handle is not None, \
|
||||||
"self.read_handle was None when trying to start the transfer"
|
"self.read_handle was None when trying to start the transfer"
|
||||||
d = self.file_sender.beginFileTransfer(self.read_handle, self)
|
d = self.file_sender.beginFileTransfer(self.read_handle, self)
|
||||||
|
@ -130,6 +128,8 @@ class BlobReflectorClient(Protocol):
|
||||||
if 'received_blob' not in response_dict:
|
if 'received_blob' not in response_dict:
|
||||||
raise ValueError("I don't know if the blob made it to the intended destination!")
|
raise ValueError("I don't know if the blob made it to the intended destination!")
|
||||||
else:
|
else:
|
||||||
|
if response_dict['received_blob']:
|
||||||
|
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
|
||||||
return self.set_not_uploading()
|
return self.set_not_uploading()
|
||||||
|
|
||||||
def open_blob_for_reading(self, blob):
|
def open_blob_for_reading(self, blob):
|
||||||
|
@ -188,7 +188,6 @@ class BlobReflectorClientFactory(ClientFactory):
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.blobs = blobs
|
self.blobs = blobs
|
||||||
self.p = None
|
self.p = None
|
||||||
self.sent_blobs = False
|
|
||||||
self.finished_deferred = defer.Deferred()
|
self.finished_deferred = defer.Deferred()
|
||||||
|
|
||||||
def buildProtocol(self, addr):
|
def buildProtocol(self, addr):
|
||||||
|
|
|
@ -33,26 +33,14 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
self.file_sender = None
|
self.file_sender = None
|
||||||
self.producer = None
|
self.producer = None
|
||||||
self.streaming = False
|
self.streaming = False
|
||||||
|
|
||||||
|
self.blob_manager = self.factory.blob_manager
|
||||||
|
self.protocol_version = self.factory.protocol_version
|
||||||
|
self.stream_hash = self.factory.stream_hash
|
||||||
|
|
||||||
d = self.load_descriptor()
|
d = self.load_descriptor()
|
||||||
d.addCallback(lambda _: self.send_handshake())
|
d.addCallback(lambda _: self.send_handshake())
|
||||||
d.addErrback(
|
d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
|
||||||
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
|
|
||||||
|
|
||||||
@property
|
|
||||||
def file_name(self):
|
|
||||||
return self.factory.file_name
|
|
||||||
|
|
||||||
@property
|
|
||||||
def blob_manager(self):
|
|
||||||
return self.factory.blob_manager
|
|
||||||
|
|
||||||
@property
|
|
||||||
def protocol_version(self):
|
|
||||||
return self.factory.protocol_version
|
|
||||||
|
|
||||||
@property
|
|
||||||
def stream_hash(self):
|
|
||||||
return self.factory.stream_hash
|
|
||||||
|
|
||||||
def dataReceived(self, data):
|
def dataReceived(self, data):
|
||||||
self.response_buff += data
|
self.response_buff += data
|
||||||
|
@ -131,7 +119,7 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
len(filtered))
|
len(filtered))
|
||||||
return filtered
|
return filtered
|
||||||
|
|
||||||
d = self.factory.blob_manager.storage.get_blobs_for_stream(self.factory.stream_hash)
|
d = self.factory.blob_manager.storage.get_blobs_for_stream(self.stream_hash)
|
||||||
d.addCallback(self.get_validated_blobs)
|
d.addCallback(self.get_validated_blobs)
|
||||||
if not self.descriptor_needed:
|
if not self.descriptor_needed:
|
||||||
d.addCallback(lambda filtered:
|
d.addCallback(lambda filtered:
|
||||||
|
@ -151,7 +139,7 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
def _save_descriptor_blob(sd_blob):
|
def _save_descriptor_blob(sd_blob):
|
||||||
self.stream_descriptor = sd_blob
|
self.stream_descriptor = sd_blob
|
||||||
|
|
||||||
d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.factory.stream_hash)
|
d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.stream_hash)
|
||||||
d.addCallback(self.factory.blob_manager.get_blob)
|
d.addCallback(self.factory.blob_manager.get_blob)
|
||||||
d.addCallback(_save_descriptor_blob)
|
d.addCallback(_save_descriptor_blob)
|
||||||
return d
|
return d
|
||||||
|
@ -216,14 +204,18 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
raise ValueError("I don't know if the sd blob made it to the intended destination!")
|
raise ValueError("I don't know if the sd blob made it to the intended destination!")
|
||||||
else:
|
else:
|
||||||
self.received_descriptor_response = True
|
self.received_descriptor_response = True
|
||||||
|
disconnect = False
|
||||||
if response_dict['received_sd_blob']:
|
if response_dict['received_sd_blob']:
|
||||||
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
|
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
|
||||||
log.info("Sent reflector descriptor %s", self.next_blob_to_send)
|
log.info("Sent reflector descriptor %s", self.next_blob_to_send)
|
||||||
else:
|
else:
|
||||||
log.warning("Reflector failed to receive descriptor %s",
|
log.warning("Reflector failed to receive descriptor %s",
|
||||||
self.next_blob_to_send)
|
self.next_blob_to_send)
|
||||||
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
|
disconnect = True
|
||||||
return self.set_not_uploading()
|
d = self.set_not_uploading()
|
||||||
|
if disconnect:
|
||||||
|
d.addCallback(lambda _: self.transport.loseConnection())
|
||||||
|
return d
|
||||||
|
|
||||||
def handle_normal_response(self, response_dict):
|
def handle_normal_response(self, response_dict):
|
||||||
if self.file_sender is None: # Expecting Server Info Response
|
if self.file_sender is None: # Expecting Server Info Response
|
||||||
|
@ -244,7 +236,6 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
log.debug("Sent reflector blob %s", self.next_blob_to_send)
|
log.debug("Sent reflector blob %s", self.next_blob_to_send)
|
||||||
else:
|
else:
|
||||||
log.warning("Reflector failed to receive blob %s", self.next_blob_to_send)
|
log.warning("Reflector failed to receive blob %s", self.next_blob_to_send)
|
||||||
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
|
|
||||||
return self.set_not_uploading()
|
return self.set_not_uploading()
|
||||||
|
|
||||||
def open_blob_for_reading(self, blob):
|
def open_blob_for_reading(self, blob):
|
||||||
|
@ -312,28 +303,14 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
|
|
||||||
class EncryptedFileReflectorClientFactory(ClientFactory):
|
class EncryptedFileReflectorClientFactory(ClientFactory):
|
||||||
protocol = EncryptedFileReflectorClient
|
protocol = EncryptedFileReflectorClient
|
||||||
|
protocol_version = REFLECTOR_V2
|
||||||
|
|
||||||
def __init__(self, lbry_file):
|
def __init__(self, blob_manager, stream_hash):
|
||||||
self._lbry_file = lbry_file
|
self.blob_manager = blob_manager
|
||||||
|
self.stream_hash = stream_hash
|
||||||
self.p = None
|
self.p = None
|
||||||
self.finished_deferred = defer.Deferred()
|
self.finished_deferred = defer.Deferred()
|
||||||
|
|
||||||
@property
|
|
||||||
def blob_manager(self):
|
|
||||||
return self._lbry_file.blob_manager
|
|
||||||
|
|
||||||
@property
|
|
||||||
def stream_hash(self):
|
|
||||||
return self._lbry_file.stream_hash
|
|
||||||
|
|
||||||
@property
|
|
||||||
def file_name(self):
|
|
||||||
return self._lbry_file.file_name
|
|
||||||
|
|
||||||
@property
|
|
||||||
def protocol_version(self):
|
|
||||||
return REFLECTOR_V2
|
|
||||||
|
|
||||||
def buildProtocol(self, addr):
|
def buildProtocol(self, addr):
|
||||||
p = self.protocol()
|
p = self.protocol()
|
||||||
p.factory = self
|
p.factory = self
|
||||||
|
|
|
@ -24,15 +24,19 @@ def resolve(host):
|
||||||
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _reflect_stream(lbry_file, reflector_server):
|
def _reflect_stream(blob_manager, stream_hash, reflector_server):
|
||||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
||||||
factory = ClientFactory(lbry_file)
|
factory = ClientFactory(blob_manager, stream_hash)
|
||||||
ip = yield resolve(reflector_address)
|
ip = yield resolve(reflector_address)
|
||||||
yield reactor.connectTCP(ip, reflector_port, factory)
|
yield reactor.connectTCP(ip, reflector_port, factory)
|
||||||
result = yield factory.finished_deferred
|
result = yield factory.finished_deferred
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
|
||||||
|
|
||||||
|
def _reflect_file(lbry_file, reflector_server):
|
||||||
|
return _reflect_stream(lbry_file.blob_manager, lbry_file.stream_hash, reflector_server)
|
||||||
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _reflect_blobs(blob_manager, blob_hashes, reflector_server):
|
def _reflect_blobs(blob_manager, blob_hashes, reflector_server):
|
||||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
||||||
|
@ -43,7 +47,7 @@ def _reflect_blobs(blob_manager, blob_hashes, reflector_server):
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
|
||||||
|
|
||||||
def reflect_stream(lbry_file, reflector_server=None):
|
def reflect_file(lbry_file, reflector_server=None):
|
||||||
if reflector_server:
|
if reflector_server:
|
||||||
if len(reflector_server.split(":")) == 2:
|
if len(reflector_server.split(":")) == 2:
|
||||||
host, port = tuple(reflector_server.split(":"))
|
host, port = tuple(reflector_server.split(":"))
|
||||||
|
@ -52,7 +56,19 @@ def reflect_stream(lbry_file, reflector_server=None):
|
||||||
reflector_server = reflector_server, 5566
|
reflector_server = reflector_server, 5566
|
||||||
else:
|
else:
|
||||||
reflector_server = random.choice(conf.settings['reflector_servers'])
|
reflector_server = random.choice(conf.settings['reflector_servers'])
|
||||||
return _reflect_stream(lbry_file, reflector_server)
|
return _reflect_file(lbry_file, reflector_server)
|
||||||
|
|
||||||
|
|
||||||
|
def reflect_stream(blob_manager, stream_hash, reflector_server=None):
|
||||||
|
if reflector_server:
|
||||||
|
if len(reflector_server.split(":")) == 2:
|
||||||
|
host, port = tuple(reflector_server.split(":"))
|
||||||
|
reflector_server = host, int(port)
|
||||||
|
else:
|
||||||
|
reflector_server = reflector_server, 5566
|
||||||
|
else:
|
||||||
|
reflector_server = random.choice(conf.settings['reflector_servers'])
|
||||||
|
return _reflect_stream(blob_manager, stream_hash, reflector_server)
|
||||||
|
|
||||||
|
|
||||||
def reflect_blob_hashes(blob_hashes, blob_manager, reflector_server=None):
|
def reflect_blob_hashes(blob_hashes, blob_manager, reflector_server=None):
|
||||||
|
|
|
@ -212,10 +212,7 @@ class TestReflector(unittest.TestCase):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def send_to_server():
|
def send_to_server():
|
||||||
fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager,
|
factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash)
|
||||||
self.server_session.storage,
|
|
||||||
self.stream_hash)
|
|
||||||
factory = reflector.ClientFactory(fake_lbry_file)
|
|
||||||
|
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
reactor.connectTCP('localhost', self.port, factory)
|
reactor.connectTCP('localhost', self.port, factory)
|
||||||
|
@ -348,10 +345,7 @@ class TestReflector(unittest.TestCase):
|
||||||
return factory.finished_deferred
|
return factory.finished_deferred
|
||||||
|
|
||||||
def send_to_server_as_stream(result):
|
def send_to_server_as_stream(result):
|
||||||
fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager,
|
factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash)
|
||||||
self.server_session.storage,
|
|
||||||
self.stream_hash)
|
|
||||||
factory = reflector.ClientFactory(fake_lbry_file)
|
|
||||||
|
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
reactor.connectTCP('localhost', self.port, factory)
|
reactor.connectTCP('localhost', self.port, factory)
|
||||||
|
|
|
@ -12,8 +12,8 @@ GitPython==2.1.3
|
||||||
jsonrpc==1.2
|
jsonrpc==1.2
|
||||||
jsonrpclib==0.1.7
|
jsonrpclib==0.1.7
|
||||||
keyring==10.4.0
|
keyring==10.4.0
|
||||||
git+https://github.com/lbryio/lbryschema.git@v0.0.15rc3#egg=lbryschema
|
git+https://github.com/lbryio/lbryschema.git@v0.0.15#egg=lbryschema
|
||||||
git+https://github.com/lbryio/lbryum.git@v3.2.0rc20#egg=lbryum
|
git+https://github.com/lbryio/lbryum.git@v3.2.0#egg=lbryum
|
||||||
miniupnpc==1.9
|
miniupnpc==1.9
|
||||||
pbkdf2==1.3
|
pbkdf2==1.3
|
||||||
pycrypto==2.6.1
|
pycrypto==2.6.1
|
||||||
|
|
|
@ -172,7 +172,7 @@ def _api_doc(obj):
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
root_dir = os.path.dirname(os.path.dirname(__file__))
|
root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||||
build_dir = os.path.realpath(os.path.join(root_dir, DOCS_BUILD_DIR))
|
build_dir = os.path.realpath(os.path.join(root_dir, DOCS_BUILD_DIR))
|
||||||
if not os.path.exists(build_dir):
|
if not os.path.exists(build_dir):
|
||||||
os.makedirs(build_dir)
|
os.makedirs(build_dir)
|
||||||
|
|
4
setup.py
4
setup.py
|
@ -20,8 +20,8 @@ requires = [
|
||||||
'base58',
|
'base58',
|
||||||
'envparse',
|
'envparse',
|
||||||
'jsonrpc',
|
'jsonrpc',
|
||||||
'lbryschema==0.0.15rc3',
|
'lbryschema==0.0.15',
|
||||||
'lbryum==3.2.0rc20',
|
'lbryum==3.2.0',
|
||||||
'miniupnpc',
|
'miniupnpc',
|
||||||
'pycrypto',
|
'pycrypto',
|
||||||
'pyyaml',
|
'pyyaml',
|
||||||
|
|
Loading…
Reference in a new issue