Merge pull request #299 from lbryio/long-lines

Wrap Long lines
This commit is contained in:
Job Evers‐Meltzer 2016-12-07 11:41:21 -05:00 committed by GitHub
commit 9619b8dfca
76 changed files with 920 additions and 827 deletions

View file

@ -11,6 +11,11 @@
# paths.
ignore=CVS
# Add files or directories matching the regex patterns to the
# blacklist. The regex matches against base names, not paths.
# `\.#.*` - add emacs tmp files to the blacklist
ignore-patterns=\.#.*
# Pickle collected data for later comparisons.
persistent=yes
@ -121,7 +126,8 @@ logging-modules=logging
bad-functions=map,filter,input
# Good variable names which should always be accepted, separated by a comma
good-names=i,j,k,ex,Run,_
# allow `d` as its used frequently for deferred callback chains
good-names=i,j,k,ex,Run,_,d
# Bad variable names which should always be refused, separated by a comma
bad-names=foo,bar,baz,toto,tutu,tata

View file

@ -278,7 +278,10 @@ class Config(DefaultSettings):
return {k: self[k] for k in self}
def get_adjustable_settings_dict(self):
return {opt: val for opt, val in self.get_dict().iteritems() if opt in self.environ.original_schema}
return {
opt: val for opt, val in self.get_dict().iteritems()
if opt in self.environ.original_schema
}
def ensure_data_dir(self):
# although there is a risk of a race condition here we don't

View file

@ -107,8 +107,11 @@ class DiskBlobManager(BlobManager):
return defer.succeed(True)
def get_blob(self, blob_hash, upload_allowed, length=None):
"""Return a blob identified by blob_hash, which may be a new blob or a blob that is already on the hard disk"""
# TODO: if blob.upload_allowed and upload_allowed is False, change upload_allowed in blob and on disk
"""Return a blob identified by blob_hash, which may be a new blob or a
blob that is already on the hard disk
"""
# TODO: if blob.upload_allowed and upload_allowed is False,
# change upload_allowed in blob and on disk
if blob_hash in self.blobs:
return defer.succeed(self.blobs[blob_hash])
return self._make_new_blob(blob_hash, upload_allowed, length)
@ -228,7 +231,8 @@ class DiskBlobManager(BlobManager):
d = defer.succeed(True)
def log_error(err):
log.warning("Failed to delete completed blobs from the db: %s", err.getErrorMessage())
log.warning(
"Failed to delete completed blobs from the db: %s", err.getErrorMessage())
d.addErrback(log_error)
return d
@ -243,7 +247,9 @@ class DiskBlobManager(BlobManager):
if being_deleted is False:
self.blob_hashes_to_delete[blob_hash] = True
d = self.get_blob(blob_hash, True)
d.addCallbacks(delete, set_not_deleting, callbackArgs=(blob_hash,), errbackArgs=(blob_hash,))
d.addCallbacks(
delete, set_not_deleting,
callbackArgs=(blob_hash,), errbackArgs=(blob_hash,))
ds.append(d)
dl = defer.DeferredList(ds, consumeErrors=True)
dl.addCallback(delete_from_db)
@ -298,8 +304,9 @@ class DiskBlobManager(BlobManager):
def get_blobs_in_db(db_transaction):
blobs_in_db = [] # [(blob_hash, last_verified_time)]
for b in blobs_to_check:
result = db_transaction.execute("select last_verified_time from blobs where blob_hash = ?",
(b,))
result = db_transaction.execute(
"select last_verified_time from blobs where blob_hash = ?",
(b,))
row = result.fetchone()
if row is not None:
blobs_in_db.append((b, row[0]))
@ -355,8 +362,9 @@ class DiskBlobManager(BlobManager):
"where next_announce_time < ? and blob_hash is not null",
(timestamp,))
blobs = [b for b, in r.fetchall()]
transaction.execute("update blobs set next_announce_time = ? where next_announce_time < ?",
(next_announce_time, timestamp))
transaction.execute(
"update blobs set next_announce_time = ? where next_announce_time < ?",
(next_announce_time, timestamp))
return blobs
return self.db_conn.runInteraction(get_and_update)
@ -460,13 +468,17 @@ class DiskBlobManager(BlobManager):
@rerun_if_locked
def _add_blob_to_download_history(self, blob_hash, host, rate):
ts = int(time.time())
d = self.db_conn.runQuery("insert into download values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts))
d = self.db_conn.runQuery(
"insert into download values (null, ?, ?, ?, ?) ",
(blob_hash, str(host), float(rate), ts))
return d
@rerun_if_locked
def _add_blob_to_upload_history(self, blob_hash, host, rate):
ts = int(time.time())
d = self.db_conn.runQuery("insert into upload values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts))
d = self.db_conn.runQuery(
"insert into upload values (null, ?, ?, ?, ?) ",
(blob_hash, str(host), float(rate), ts))
return d
@ -510,7 +522,10 @@ class TempBlobManager(BlobManager):
return defer.succeed(True)
def completed_blobs(self, blobs_to_check):
blobs = [b.blob_hash for b in self.blobs.itervalues() if b.blob_hash in blobs_to_check and b.is_validated()]
blobs = [
b.blob_hash for b in self.blobs.itervalues()
if b.blob_hash in blobs_to_check and b.is_validated()
]
return defer.succeed(blobs)
def get_all_verified_blobs(self):
@ -519,7 +534,10 @@ class TempBlobManager(BlobManager):
def hashes_to_announce(self):
now = time.time()
blobs = [blob_hash for blob_hash, announce_time in self.blob_next_announces.iteritems() if announce_time < now]
blobs = [
blob_hash for blob_hash, announce_time in self.blob_next_announces.iteritems()
if announce_time < now
]
next_announce_time = now + self.hash_reannounce_time
for b in blobs:
self.blob_next_announces[b] = next_announce_time

View file

@ -55,9 +55,11 @@ class HashBlobWriter(object):
self.hashsum.update(data)
self.len_so_far += len(data)
if self.len_so_far > self.length_getter():
self.finished_cb(self, Failure(InvalidDataError("Length so far is greater than the expected length."
" %s to %s" % (str(self.len_so_far),
str(self.length_getter())))))
self.finished_cb(
self,
Failure(InvalidDataError("Length so far is greater than the expected length."
" %s to %s" % (self.len_so_far,
self.length_getter()))))
else:
if self.write_handle is None:
log.debug("Tried to write to a write_handle that was None.")
@ -90,7 +92,8 @@ class HashBlob(object):
if self.length is None and 0 <= length <= settings.BLOB_SIZE:
self.length = length
return True
log.warning("Got an invalid length. Previous length: %s, Invalid length: %s", str(self.length), str(length))
log.warning("Got an invalid length. Previous length: %s, Invalid length: %s",
self.length, length)
return False
def get_length(self):
@ -132,8 +135,9 @@ class HashBlob(object):
finished_deferred.callback(self)
del self.writers[p]
return True
log.warning("Somehow, the writer that was accepted as being valid was already removed. writer: %s",
str(writer))
log.warning(
"Somehow, the writer that was accepted as being valid was already removed: %s",
writer)
return False
def errback_finished_deferred(err):
@ -238,13 +242,15 @@ class BlobFile(HashBlob):
d = threads.deferToThread(delete_from_file_system)
def log_error(err):
log.warning("An error occurred deleting %s: %s", str(self.file_path), err.getErrorMessage())
log.warning("An error occurred deleting %s: %s",
str(self.file_path), err.getErrorMessage())
return err
d.addErrback(log_error)
return d
else:
return defer.fail(Failure(ValueError("File is currently being read or written and cannot be deleted")))
return defer.fail(Failure(
ValueError("File is currently being read or written and cannot be deleted")))
def close_read_handle(self, file_handle):
if file_handle is not None:
@ -303,7 +309,8 @@ class TempBlob(HashBlob):
self.data_buffer = ''
return defer.succeed(True)
else:
return defer.fail(Failure(ValueError("Blob is currently being read or written and cannot be deleted")))
return defer.fail(Failure(
ValueError("Blob is currently being read or written and cannot be deleted")))
def close_read_handle(self, file_handle):
file_handle.close()

View file

@ -111,14 +111,16 @@ class PTCWallet(object):
return PointTraderKeyQueryHandlerFactory(self)
def reserve_points(self, peer, amount):
"""
Ensure a certain amount of points are available to be sent as payment, before the service is rendered
"""Ensure a certain amount of points are available to be sent as
payment, before the service is rendered
@param peer: The peer to which the payment will ultimately be sent
@param amount: The amount of points to reserve
@return: A ReservedPoints object which is given to send_points once the service has been rendered
@return: A ReservedPoints object which is given to send_points
once the service has been rendered
"""
if self.wallet_balance >= self.total_reserved_points + amount:
self.total_reserved_points += amount
@ -157,7 +159,8 @@ class PTCWallet(object):
ds = []
for peer, points in self.queued_payments.items():
if peer in self.peer_pub_keys:
d = pointtraderclient.send_points(self.private_key, self.peer_pub_keys[peer], points)
d = pointtraderclient.send_points(
self.private_key, self.peer_pub_keys[peer], points)
self.wallet_balance -= points
self.total_reserved_points -= points
ds.append(d)
@ -208,10 +211,13 @@ class PTCWallet(object):
min_expected_balance = sum([a[0] for a in expected_payments if a[1] < expected_cutoff])
received_balance = 0
if self.peer_pub_keys[peer] in self.received_payments:
received_balance = sum([a[0] for a in self.received_payments[self.peer_pub_keys[peer]]])
received_balance = sum([
a[0] for a in self.received_payments[self.peer_pub_keys[peer]]])
if min_expected_balance > received_balance:
log.warning("Account in bad standing: %s (pub_key: %s), expected amount = %s, received_amount = %s",
str(peer), self.peer_pub_keys[peer], str(min_expected_balance), str(received_balance))
log.warning(
"Account in bad standing: %s (pub_key: %s), expected amount = %s, "
"received_amount = %s",
peer, self.peer_pub_keys[peer], min_expected_balance, received_balance)
def _open_db(self):
def open_db():
@ -285,7 +291,8 @@ class PointTraderKeyQueryHandlerFactory(object):
return 'public_key'
def get_description(self):
return "Point Trader Address - an address for receiving payments on the point trader testing network"
return ("Point Trader Address - an address for receiving payments on the "
"point trader testing network")
class PointTraderKeyQueryHandler(object):

View file

@ -77,5 +77,6 @@ class NegotiatedPaymentRateManager(object):
def price_limit_reached(self, peer):
if peer in self.strategy.pending_sent_offers:
offer = self.strategy.pending_sent_offers[peer]
return offer.is_too_low and round(Decimal.from_float(offer.rate), 5) >= round(self.strategy.max_rate, 5)
return (offer.is_too_low and
round(Decimal.from_float(offer.rate), 5) >= round(self.strategy.max_rate, 5))
return False

View file

@ -10,14 +10,15 @@ def get_default_price_model(blob_tracker, base_price, **kwargs):
class MeanAvailabilityWeightedPrice(object):
"""
Calculate mean-blob-availability and stream-position weighted price for a blob
"""Calculate mean-blob-availability and stream-position weighted price for a blob
Attributes:
base_price (float): base price
alpha (float): constant, > 0.0 and <= 1.0, used to more highly value blobs at the beginning of a stream.
alpha (float): constant, > 0.0 and <= 1.0, used to more highly
value blobs at the beginning of a stream.
alpha defaults to 1.0, which has a null effect
blob_tracker (BlobAvailabilityTracker): blob availability tracker
"""
implementer(IBlobPriceModel)
@ -30,21 +31,23 @@ class MeanAvailabilityWeightedPrice(object):
mean_availability = self.blob_tracker.last_mean_availability
availability = self.blob_tracker.availability.get(blob, [])
index = 0 # blob.index
price = self.base_price * self._get_availability_multiplier(mean_availability, availability) / self._frontload(index)
availability_mult = self._get_availability_multiplier(mean_availability, availability)
price = self.base_price * availability_mult / self._frontload(index)
return round(price, 5)
def _get_availability_multiplier(self, mean_availability, availability):
return Decimal(max(1, mean_availability) / Decimal(max(1, len(availability))))
def _frontload(self, index):
"""
Get front-load multiplier, used to weight prices of blobs in a stream towards the front of the stream.
"""Get front-load multiplier, used to weight prices of blobs in a
stream towards the front of the stream.
At index 0, returns 1.0
As index increases, return value approaches 2.0
@param index: blob position in stream
@return: front-load multiplier
"""
return Decimal(2.0) - (self.alpha ** index)

View file

@ -94,14 +94,16 @@ class RateLimiter(object):
#throttling
def check_dl(self):
if self.max_dl_bytes is not None and self.dl_bytes_this_interval > self.max_dl_bytes * self.tick_interval:
need_throttle = (self.max_dl_bytes is not None and
self.dl_bytes_this_interval > self.max_dl_bytes * self.tick_interval)
if need_throttle:
from twisted.internet import reactor
reactor.callLater(0, self.throttle_dl)
def check_ul(self):
if self.max_ul_bytes is not None and self.ul_bytes_this_interval > self.max_ul_bytes * self.tick_interval:
need_throttle = (self.max_ul_bytes is not None and
self.ul_bytes_this_interval > self.max_ul_bytes * self.tick_interval)
if need_throttle:
from twisted.internet import reactor
reactor.callLater(0, self.throttle_ul)

View file

@ -92,17 +92,22 @@ class Strategy(object):
class BasicAvailabilityWeightedStrategy(Strategy):
"""
Basic strategy to target blob prices based on supply relative to mean supply
"""Basic strategy to target blob prices based on supply relative to mean supply
Discount price target with each incoming request, and raise it
with each outgoing from the modeled price until the rate is
accepted or a threshold is reached
Discount price target with each incoming request, and raise it with each outgoing from the modeled price
until the rate is accepted or a threshold is reached
"""
implementer(INegotiationStrategy)
def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=None, min_rate=0.0,
is_generous=settings.is_generous_host, base_price=0.0001, alpha=1.0):
price_model = MeanAvailabilityWeightedPrice(blob_tracker, base_price=base_price, alpha=alpha)
def __init__(self, blob_tracker, acceleration=1.25,
deceleration=0.9, max_rate=None,
min_rate=0.0,
is_generous=settings.is_generous_host,
base_price=0.0001, alpha=1.0):
price_model = MeanAvailabilityWeightedPrice(
blob_tracker, base_price=base_price, alpha=alpha)
Strategy.__init__(self, price_model, max_rate, min_rate, is_generous)
self._acceleration = Decimal(acceleration) # rate of how quickly to ramp offer
self._deceleration = Decimal(deceleration)

View file

@ -73,7 +73,9 @@ class StreamDescriptorWriter(object):
return self._write_stream_descriptor(json.dumps(sd_info))
def _write_stream_descriptor(self, raw_data):
"""This method must be overridden by subclasses to write raw data to the stream descriptor"""
"""This method must be overridden by subclasses to write raw data to
the stream descriptor
"""
pass
@ -126,45 +128,55 @@ class StreamDescriptorIdentifier(object):
and returns the appropriate ones based on the type of the stream descriptor given
"""
def __init__(self):
self._sd_info_validators = {} # {stream_type: IStreamDescriptorValidator}
self._stream_options = {} # {stream_type: IStreamOptions}
self._stream_downloader_factories = defaultdict(list) # {stream_type: [IStreamDownloaderFactory]}
# {stream_type: IStreamDescriptorValidator}
self._sd_info_validators = {}
# {stream_type: IStreamOptions
self._stream_options = {}
# {stream_type: [IStreamDownloaderFactory]}
self._stream_downloader_factories = defaultdict(list)
def add_stream_type(self, stream_type, sd_info_validator, stream_options):
"""
This is how the StreamDescriptorIdentifier learns about new types of stream descriptors.
"""This is how the StreamDescriptorIdentifier learns about new types of stream descriptors.
There can only be one StreamDescriptorValidator for each type of stream.
@param stream_type: A string representing the type of stream descriptor. This must be unique to
this stream descriptor.
@param stream_type: A string representing the type of stream
descriptor. This must be unique to this stream descriptor.
@param sd_info_validator: A class implementing the IStreamDescriptorValidator interface. This class's
constructor will be passed the raw metadata in the stream descriptor file and its 'validate' method
will then be called. If the validation step fails, an exception will be thrown, preventing the stream
descriptor from being further processed.
@param sd_info_validator: A class implementing the
IStreamDescriptorValidator interface. This class's
constructor will be passed the raw metadata in the stream
descriptor file and its 'validate' method will then be
called. If the validation step fails, an exception will be
thrown, preventing the stream descriptor from being
further processed.
@param stream_options: A class implementing the IStreamOptions interface. This class's constructor will be
passed the sd_info_validator object containing the raw metadata from the stream descriptor file.
@param stream_options: A class implementing the IStreamOptions
interface. This class's constructor will be passed the
sd_info_validator object containing the raw metadata from
the stream descriptor file.
@return: None
"""
self._sd_info_validators[stream_type] = sd_info_validator
self._stream_options[stream_type] = stream_options
def add_stream_downloader_factory(self, stream_type, factory):
"""
Register a stream downloader factory with the StreamDescriptorIdentifier.
"""Register a stream downloader factory with the StreamDescriptorIdentifier.
This is how the StreamDescriptorIdentifier determines what factories may be used to process different stream
descriptor files. There must be at least one factory for each type of stream added via
"add_stream_info_validator".
This is how the StreamDescriptorIdentifier determines what
factories may be used to process different stream descriptor
files. There must be at least one factory for each type of
stream added via "add_stream_info_validator".
@param stream_type: A string representing the type of stream descriptor which the factory knows how to process.
@param stream_type: A string representing the type of stream
descriptor which the factory knows how to process.
@param factory: An object implementing the IStreamDownloaderFactory interface.
@return: None
"""
self._stream_downloader_factories[stream_type].append(factory)
@ -236,6 +248,10 @@ def download_sd_blob(session, blob_hash, payment_rate_manager):
@return: An object of type HashBlob
"""
downloader = StandaloneBlobDownloader(blob_hash, session.blob_manager, session.peer_finder,
session.rate_limiter, payment_rate_manager, session.wallet)
downloader = StandaloneBlobDownloader(blob_hash,
session.blob_manager,
session.peer_finder,
session.rate_limiter,
payment_rate_manager,
session.wallet)
return downloader.download()

View file

@ -1,3 +1,5 @@
# pylint: disable=line-too-long
# TODO: renable pylint check after lbrycrdd code is removed
import datetime
import logging
import json

View file

@ -122,7 +122,8 @@ class BlobRequester(object):
def choose_best_peers(peers):
bad_peers = self._get_bad_peers()
without_bad_peers = [p for p in peers if not p in bad_peers]
without_maxed_out_peers = [p for p in without_bad_peers if p not in self._maxed_out_peers]
without_maxed_out_peers = [
p for p in without_bad_peers if p not in self._maxed_out_peers]
return without_maxed_out_peers
d.addCallback(choose_best_peers)
@ -494,9 +495,8 @@ class DownloadRequest(RequestHelper):
def _pay_or_cancel_payment(self, arg, reserved_points, blob):
if self._can_pay_peer(blob, arg):
self._pay_peer(blob.length, reserved_points)
d = self.requestor.blob_manager.add_blob_to_download_history(str(blob),
str(self.peer.host),
float(self.protocol_prices[self.protocol]))
d = self.requestor.blob_manager.add_blob_to_download_history(
str(blob), str(self.peer.host), float(self.protocol_prices[self.protocol]))
else:
self._cancel_points(reserved_points)
return arg

View file

@ -64,8 +64,6 @@ class ClientProtocol(Protocol):
err = failure.Failure(ConnectionClosedBeforeResponseError())
else:
err = reason
#if self._response_deferreds:
# log.warning("Lost connection with active response deferreds. %s", str(self._response_deferreds))
for key, d in self._response_deferreds.items():
del self._response_deferreds[key]
d.errback(err)
@ -77,7 +75,7 @@ class ClientProtocol(Protocol):
def add_request(self, request):
if request.response_identifier in self._response_deferreds:
return defer.fail(failure.Failure(ValueError("There is already a request for that response active")))
raise ValueError("There is already a request for that response active")
self._next_request.update(request.request_dict)
d = defer.Deferred()
log.debug("Adding a request. Request: %s", str(request.request_dict))
@ -93,7 +91,7 @@ class ClientProtocol(Protocol):
blob_request.finished_deferred.addErrback(self._handle_response_error)
return d
else:
return defer.fail(failure.Failure(ValueError("There is already a blob download request active")))
raise ValueError("There is already a blob download request active")
def cancel_requests(self):
self.connection_closing = True
@ -112,8 +110,9 @@ class ClientProtocol(Protocol):
######### Internal request handling #########
def _handle_request_error(self, err):
log.error("An unexpected error occurred creating or sending a request to %s. Error message: %s",
str(self.peer), err.getTraceback())
log.error(
"An unexpected error occurred creating or sending a request to %s. Error message: %s",
self.peer, err.getTraceback())
self.transport.loseConnection()
def _ask_for_request(self):
@ -129,7 +128,9 @@ class ClientProtocol(Protocol):
self._send_request_message(request_msg)
else:
# The connection manager has indicated that this connection should be terminated
log.info("Closing the connection to %s due to having no further requests to send", str(self.peer))
log.info(
"Closing the connection to %s due to having no further requests to send",
self.peer)
self.transport.loseConnection()
d = self._connection_manager.get_next_request(self.peer, self)
@ -163,17 +164,19 @@ class ClientProtocol(Protocol):
def _handle_response_error(self, err):
# If an error gets to this point, log it and kill the connection.
if not err.check(MisbehavingPeerError, ConnectionClosedBeforeResponseError, DownloadCanceledError,
RequestCanceledError):
log.error("The connection to %s is closing due to an unexpected error: %s", str(self.peer),
err.getErrorMessage())
if not err.check(RequestCanceledError): # The connection manager is closing the connection, so
# there's no need to do it here.
expected_errors = (MisbehavingPeerError, ConnectionClosedBeforeResponseError,
DownloadCanceledError, RequestCanceledError)
if not err.check(expected_errors):
log.error("The connection to %s is closing due to an unexpected error: %s",
self.peer, err.getErrorMessage())
if not err.check(RequestCanceledError):
# The connection manager is closing the connection, so
# there's no need to do it here.
return err
def _handle_response(self, response):
ds = []
log.debug("Handling a response. Current expected responses: %s", str(self._response_deferreds))
log.debug("Handling a response. Current expected responses: %s", self._response_deferreds)
for key, val in response.items():
if key in self._response_deferreds:
d = self._response_deferreds[key]
@ -200,7 +203,8 @@ class ClientProtocol(Protocol):
failed = True
if not isinstance(result.value, DownloadCanceledError):
log.info(result.value)
log.info("The connection is closing due to an error: %s", str(result.getTraceback()))
log.info("The connection is closing due to an error: %s",
result.getTraceback())
if failed is False:
log.debug("Asking for another request.")
from twisted.internet import reactor

View file

@ -20,7 +20,8 @@ class PeerConnectionHandler(object):
class ConnectionManager(object):
implements(interfaces.IConnectionManager)
def __init__(self, downloader, rate_limiter, primary_request_creators, secondary_request_creators):
def __init__(self, downloader, rate_limiter,
primary_request_creators, secondary_request_creators):
self.downloader = downloader
self.rate_limiter = rate_limiter
self._primary_request_creators = primary_request_creators
@ -134,12 +135,14 @@ class ConnectionManager(object):
d.callback(True)
def _rank_request_creator_connections(self):
"""
@return: an ordered list of our request creators, ranked according to which has the least number of
connections open that it likes
"""Returns an ordered list of our request creators, ranked according
to which has the least number of connections open that it
likes
"""
def count_peers(request_creator):
return len([p for p in self._peer_connections.itervalues() if request_creator in p.request_creators])
return len([
p for p in self._peer_connections.itervalues()
if request_creator in p.request_creators])
return sorted(self._primary_request_creators, key=count_peers)

View file

@ -70,18 +70,22 @@ class DownloadManager(object):
def add_blob_to_list(blob, blob_num):
self.blobs[blob_num] = blob
log.debug("Added blob (hash: %s, number %s) to the list", str(blob.blob_hash), str(blob_num))
log.debug(
"Added blob (hash: %s, number %s) to the list", blob.blob_hash, blob_num)
def error_during_add(err):
log.warning("An error occurred adding the blob to blobs. Error:%s", err.getErrorMessage())
log.warning(
"An error occurred adding the blob to blobs. Error:%s", err.getErrorMessage())
return err
ds = []
for blob_info in blob_infos:
if not blob_info.blob_num in self.blobs:
self.blob_infos[blob_info.blob_num] = blob_info
log.debug("Trying to get the blob associated with blob hash %s", str(blob_info.blob_hash))
d = self.blob_manager.get_blob(blob_info.blob_hash, self.upload_allowed, blob_info.length)
log.debug(
"Trying to get the blob associated with blob hash %s", blob_info.blob_hash)
d = self.blob_manager.get_blob(
blob_info.blob_hash, self.upload_allowed, blob_info.length)
d.addCallback(add_blob_to_list, blob_info.blob_num)
d.addErrback(error_during_add)
ds.append(d)
@ -108,7 +112,10 @@ class DownloadManager(object):
if not self.blobs:
return self.calculate_total_bytes()
else:
to_be_outputted = [b for n, b in self.blobs.iteritems() if n >= self.progress_manager.last_blob_outputted]
to_be_outputted = [
b for n, b in self.blobs.iteritems()
if n >= self.progress_manager.last_blob_outputted
]
return sum([b.length for b in to_be_outputted if b.length is not None])
def calculate_bytes_left_to_download(self):

View file

@ -93,7 +93,8 @@ class DummyBlobHandler(object):
class StandaloneBlobDownloader(object):
def __init__(self, blob_hash, blob_manager, peer_finder, rate_limiter, payment_rate_manager, wallet):
def __init__(self, blob_hash, blob_manager, peer_finder,
rate_limiter, payment_rate_manager, wallet):
self.blob_hash = blob_hash
self.blob_manager = blob_manager
self.peer_finder = peer_finder

View file

@ -10,7 +10,8 @@ log = logging.getLogger(__name__)
class StreamProgressManager(object):
implements(IProgressManager)
def __init__(self, finished_callback, blob_manager, download_manager, delete_blob_after_finished=False):
def __init__(self, finished_callback, blob_manager,
download_manager, delete_blob_after_finished=False):
self.finished_callback = finished_callback
self.blob_manager = blob_manager
self.delete_blob_after_finished = delete_blob_after_finished
@ -69,8 +70,8 @@ class StreamProgressManager(object):
log.debug("delete_blob_after_finished is True")
blobs = self.download_manager.blobs
if blob_num in blobs:
log.debug("Telling the blob manager, %s, to delete blob %s", str(self.blob_manager),
blobs[blob_num].blob_hash)
log.debug("Telling the blob manager, %s, to delete blob %s",
self.blob_manager, blobs[blob_num].blob_hash)
self.blob_manager.delete_blobs([blobs[blob_num].blob_hash])
else:
log.debug("Blob number %s was not in blobs", str(blob_num))
@ -79,26 +80,40 @@ class StreamProgressManager(object):
class FullStreamProgressManager(StreamProgressManager):
def __init__(self, finished_callback, blob_manager, download_manager, delete_blob_after_finished=False):
def __init__(self, finished_callback, blob_manager,
download_manager, delete_blob_after_finished=False):
StreamProgressManager.__init__(self, finished_callback, blob_manager, download_manager,
delete_blob_after_finished)
self.outputting_d = None
######### IProgressManager #########
def _done(self, i, blobs):
"""Return true if `i` is a blob number we don't have"""
return (
i not in blobs or
(
not blobs[i].is_validated() and
i not in self.provided_blob_nums
)
)
def stream_position(self):
blobs = self.download_manager.blobs
if not blobs:
return 0
else:
for i in xrange(max(blobs.iterkeys())):
if not i in blobs or (not blobs[i].is_validated() and not i in self.provided_blob_nums):
if self._done(i, blobs):
return i
return max(blobs.iterkeys()) + 1
def needed_blobs(self):
blobs = self.download_manager.blobs
return [b for n, b in blobs.iteritems() if not b.is_validated() and not n in self.provided_blob_nums]
return [
b for n, b in blobs.iteritems()
if not b.is_validated() and not n in self.provided_blob_nums
]
######### internal #########

View file

@ -26,7 +26,8 @@ class BlobRequestHandlerFactory(object):
######### IQueryHandlerFactory #########
def build_query_handler(self):
q_h = BlobRequestHandler(self.blob_manager, self.wallet, self.payment_rate_manager, self.track)
q_h = BlobRequestHandler(
self.blob_manager, self.wallet, self.payment_rate_manager, self.track)
return q_h
def get_primary_query_identifier(self):
@ -162,7 +163,8 @@ class BlobRequestHandler(object):
return d
def record_transaction(self, blob):
d = self.blob_manager.add_blob_to_upload_history(str(blob), self.peer.host, self.blob_data_payment_rate)
d = self.blob_manager.add_blob_to_upload_history(
str(blob), self.peer.host, self.blob_data_payment_rate)
return d
def _reply_to_send_request(self, response, incoming):
@ -203,7 +205,8 @@ class BlobRequestHandler(object):
def start_transfer():
self.file_sender = FileSender()
log.debug("Starting the file upload")
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
assert self.read_handle is not None, \
"self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, consumer, count_bytes)
return d

View file

@ -93,7 +93,8 @@ class ServerRequestHandler(object):
if self.request_received is False:
return self._parse_data_and_maybe_send_blob(data)
else:
log.warning("The client sent data when we were uploading a file. This should not happen")
log.warning(
"The client sent data when we were uploading a file. This should not happen")
def _parse_data_and_maybe_send_blob(self, data):
self.request_buff = self.request_buff + data
@ -155,7 +156,9 @@ class ServerRequestHandler(object):
return response
def log_errors(err):
log.warning("An error occurred handling a client request. Error message: %s", err.getErrorMessage())
log.warning(
"An error occurred handling a client request. Error message: %s",
err.getErrorMessage())
return err
def send_response(response):

View file

@ -34,11 +34,12 @@ def main():
parser.add_argument("amount_of_nodes",
help="The number of nodes to create",
type=int)
parser.add_argument("--nic_ip_address",
help="The network interface on which these nodes will listen for connections "
"from each other and from other nodes. If omitted, an attempt will be "
"made to automatically determine the system's IP address, but this may "
"result in the nodes being reachable only from this system")
parser.add_argument(
"--nic_ip_address",
help=("The network interface on which these nodes will listen for connections "
"from each other and from other nodes. If omitted, an attempt will be "
"made to automatically determine the system's IP address, but this may "
"result in the nodes being reachable only from this system"))
args = parser.parse_args()
@ -56,14 +57,19 @@ def main():
nodes = []
print 'Creating Kademlia network...'
try:
nodes.append(os.spawnlp(os.P_NOWAIT, 'lbrynet-launch-node', 'lbrynet-launch-node', str(startPort)))
node = os.spawnlp(
os.P_NOWAIT, 'lbrynet-launch-node', 'lbrynet-launch-node', str(startPort))
nodes.append(node)
for i in range(amount-1):
time.sleep(0.15)
hashAmount = i*50/amount
hashbar = '#'*hashAmount
output = '\r[%-50s] %d/%d' % (hashbar, i, amount)
sys.stdout.write(output)
nodes.append(os.spawnlp(os.P_NOWAIT, 'lbrynet-launch-node', 'lbrynet-launch-node', str(port), ipAddress, str(startPort)))
node = os.spawnlp(
os.P_NOWAIT, 'lbrynet-launch-node', 'lbrynet-launch-node', str(port),
ipAddress, str(startPort))
nodes.append(node)
port += 1
except KeyboardInterrupt:
'\nNetwork creation cancelled.'

View file

@ -35,8 +35,8 @@ class StreamBlobDecryptor(object):
def write_bytes():
if self.len_read < self.length:
num_bytes_to_decrypt = (len(self.buff) // self.cipher.block_size) * self.cipher.block_size
data_to_decrypt, self.buff = self.buff[:num_bytes_to_decrypt], self.buff[num_bytes_to_decrypt:]
num_bytes_to_decrypt = greatest_multiple(len(self.buff), self.cipher.block_size)
data_to_decrypt, self.buff = split(self.buff, num_bytes_to_decrypt)
write_func(self.cipher.decrypt(data_to_decrypt))
def finish_decrypt():
@ -108,5 +108,11 @@ class CryptStreamBlobMaker(object):
def _return_info(self, blob_hash):
return CryptBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv))
def greatest_multiple(a, b):
"""return the largest value `c`, that is a multiple of `b` and is <= `a`"""
return (a // b) * b
def split(buff, cutoff):
return buff[:cutoff], buff[cutoff:]

View file

@ -14,6 +14,7 @@ class CryptBlobHandler(object):
######## IBlobHandler #########
def handle_blob(self, blob, blob_info):
blob_decryptor = StreamBlobDecryptor(blob, self.key, binascii.unhexlify(blob_info.iv), blob_info.length)
blob_decryptor = StreamBlobDecryptor(
blob, self.key, binascii.unhexlify(blob_info.iv), blob_info.length)
d = blob_decryptor.decrypt(self.write_func)
return d

View file

@ -39,10 +39,10 @@ class CryptStreamDownloader(object):
def __init__(self, peer_finder, rate_limiter, blob_manager,
payment_rate_manager, wallet, upload_allowed):
"""
Initialize a CryptStreamDownloader
"""Initialize a CryptStreamDownloader
@param peer_finder: An object which implements the IPeerFinder interface. Used to look up peers by a hashsum.
@param peer_finder: An object which implements the IPeerFinder
interface. Used to look up peers by a hashsum.
@param rate_limiter: An object which implements the IRateLimiter interface
@ -53,6 +53,7 @@ class CryptStreamDownloader(object):
@param wallet: An object which implements the IWallet interface
@return:
"""
self.peer_finder = peer_finder
@ -184,11 +185,13 @@ class CryptStreamDownloader(object):
pass
def _get_blob_requester(self, download_manager):
return BlobRequester(self.blob_manager, self.peer_finder, self.payment_rate_manager, self.wallet,
return BlobRequester(self.blob_manager, self.peer_finder,
self.payment_rate_manager, self.wallet,
download_manager)
def _get_progress_manager(self, download_manager):
return FullStreamProgressManager(self._finished_downloading, self.blob_manager, download_manager)
return FullStreamProgressManager(self._finished_downloading,
self.blob_manager, download_manager)
def _get_write_func(self):
pass

View file

@ -59,7 +59,8 @@ def migrate_blockchainname_db(db_dir):
" sd_hash text)")
for name, txid, n, sd_hash in new_name_metadata:
file_cursor.execute("insert into name_metadata values (?, ?, ?, ?) ", (name, txid, n, sd_hash))
file_cursor.execute(
"insert into name_metadata values (?, ?, ?, ?) ", (name, txid, n, sd_hash))
file_cursor.execute("drop table claim_ids")
file_cursor.execute("create table claim_ids ("

View file

@ -38,11 +38,13 @@ class Contact(object):
return True
def compact_ip(self):
compact_ip = reduce(lambda buff, x: buff + bytearray([int(x)]), self.address.split('.'), bytearray())
compact_ip = reduce(
lambda buff, x: buff + bytearray([int(x)]), self.address.split('.'), bytearray())
return str(compact_ip)
def __str__(self):
return '<%s.%s object; IP address: %s, UDP port: %d>' % (self.__module__, self.__class__.__name__, self.address, self.port)
return '<%s.%s object; IP address: %s, UDP port: %d>' % (
self.__module__, self.__class__.__name__, self.address, self.port)
def __getattr__(self, name):
""" This override allows the host node to call a method of the remote

View file

@ -22,44 +22,9 @@ class DataStore(UserDict.DictMixin):
def keys(self):
""" Return a list of the keys in this data store """
# def lastPublished(self, key):
# """ Get the time the C{(key, value)} pair identified by C{key}
# was last published """
# def originalPublisherID(self, key):
# """ Get the original publisher of the data's node ID
#
# @param key: The key that identifies the stored data
# @type key: str
#
# @return: Return the node ID of the original publisher of the
# C{(key, value)} pair identified by C{key}.
# """
# def originalPublishTime(self, key):
# """ Get the time the C{(key, value)} pair identified by C{key}
# was originally published """
# def setItem(self, key, value, lastPublished, originallyPublished, originalPublisherID):
# """ Set the value of the (key, value) pair identified by C{key};
# this should set the "last published" value for the (key, value)
# pair to the current time
# """
def addPeerToBlob(self, key, value, lastPublished, originallyPublished, originalPublisherID):
pass
# def __getitem__(self, key):
# """ Get the value identified by C{key} """
# def __setitem__(self, key, value):
# """ Convenience wrapper to C{setItem}; this accepts a tuple in the
# format: (value, lastPublished, originallyPublished, originalPublisherID) """
# self.setItem(key, *value)
# def __delitem__(self, key):
# """ Delete the specified key (and its value) """
class DictDataStore(DataStore):
""" A datastore using an in-memory Python dictionary """
def __init__(self):
@ -71,27 +36,6 @@ class DictDataStore(DataStore):
""" Return a list of the keys in this data store """
return self._dict.keys()
# def lastPublished(self, key):
# """ Get the time the C{(key, value)} pair identified by C{key}
# was last published """
# return self._dict[key][1]
# def originalPublisherID(self, key):
# """ Get the original publisher of the data's node ID
#
# @param key: The key that identifies the stored data
# @type key: str
#
# @return: Return the node ID of the original publisher of the
# C{(key, value)} pair identified by C{key}.
# """
# return self._dict[key][3]
# def originalPublishTime(self, key):
# """ Get the time the C{(key, value)} pair identified by C{key}
# was originally published """
# return self._dict[key][2]
def removeExpiredPeers(self):
now = int(time.time())
def notExpired(peer):
@ -116,95 +60,3 @@ class DictDataStore(DataStore):
def getPeersForBlob(self, key):
if key in self._dict:
return [val[0] for val in self._dict[key]]
# def setItem(self, key, value, lastPublished, originallyPublished, originalPublisherID):
# """ Set the value of the (key, value) pair identified by C{key};
# this should set the "last published" value for the (key, value)
# pair to the current time
# """
# self._dict[key] = (value, lastPublished, originallyPublished, originalPublisherID)
# def __getitem__(self, key):
# """ Get the value identified by C{key} """
# return self._dict[key][0]
# def __delitem__(self, key):
# """ Delete the specified key (and its value) """
# del self._dict[key]
#class SQLiteDataStore(DataStore):
# """ Example of a SQLite database-based datastore
# """
# def __init__(self, dbFile=':memory:'):
# """
# @param dbFile: The name of the file containing the SQLite database; if
# unspecified, an in-memory database is used.
# @type dbFile: str
# """
# createDB = not os.path.exists(dbFile)
# self._db = sqlite3.connect(dbFile)
# self._db.isolation_level = None
# self._db.text_factory = str
# if createDB:
# self._db.execute('CREATE TABLE data(key, value, lastPublished, originallyPublished, originalPublisherID)')
# self._cursor = self._db.cursor()
# def keys(self):
# """ Return a list of the keys in this data store """
# keys = []
# try:
# self._cursor.execute("SELECT key FROM data")
# for row in self._cursor:
# keys.append(row[0].decode('hex'))
# finally:
# return keys
# def lastPublished(self, key):
# """ Get the time the C{(key, value)} pair identified by C{key}
# was last published """
# return int(self._dbQuery(key, 'lastPublished'))
# def originalPublisherID(self, key):
# """ Get the original publisher of the data's node ID
# @param key: The key that identifies the stored data
# @type key: str
# @return: Return the node ID of the original publisher of the
# C{(key, value)} pair identified by C{key}.
# """
# return self._dbQuery(key, 'originalPublisherID')
# def originalPublishTime(self, key):
# """ Get the time the C{(key, value)} pair identified by C{key}
# was originally published """
# return int(self._dbQuery(key, 'originallyPublished'))
# def setItem(self, key, value, lastPublished, originallyPublished, originalPublisherID):
# # Encode the key so that it doesn't corrupt the database
# encodedKey = key.encode('hex')
# self._cursor.execute("select key from data where key=:reqKey", {'reqKey': encodedKey})
# if self._cursor.fetchone() == None:
# self._cursor.execute('INSERT INTO data(key, value, lastPublished, originallyPublished, originalPublisherID) VALUES (?, ?, ?, ?, ?)', (encodedKey, buffer(pickle.dumps(value, pickle.HIGHEST_PROTOCOL)), lastPublished, originallyPublished, originalPublisherID))
# else:
# self._cursor.execute('UPDATE data SET value=?, lastPublished=?, originallyPublished=?, originalPublisherID=? WHERE key=?', (buffer(pickle.dumps(value, pickle.HIGHEST_PROTOCOL)), lastPublished, originallyPublished, originalPublisherID, encodedKey))
# def _dbQuery(self, key, columnName, unpickle=False):
# try:
# self._cursor.execute("SELECT %s FROM data WHERE key=:reqKey" % columnName, {'reqKey': key.encode('hex')})
# row = self._cursor.fetchone()
# value = str(row[0])
# except TypeError:
# raise KeyError, key
# else:
# if unpickle:
# return pickle.loads(value)
# else:
# return value
# def __getitem__(self, key):
# return self._dbQuery(key, 'value', unpickle=True)
# def __delitem__(self, key):
# self._cursor.execute("DELETE FROM data WHERE key=:reqKey", {'reqKey': key.encode('hex')})

View file

@ -79,7 +79,8 @@ class Bencode(Encoding):
# This (float data type) is a non-standard extension to the original Bencode algorithm
return 'f%fe' % data
elif data == None:
# This (None/NULL data type) is a non-standard extension to the original Bencode algorithm
# This (None/NULL data type) is a non-standard extension
# to the original Bencode algorithm
return 'n'
else:
print data
@ -133,7 +134,8 @@ class Bencode(Encoding):
endPos = data[startIndex:].find('e')+startIndex
return (float(data[startIndex+1:endPos]), endPos+1)
elif data[startIndex] == 'n':
# This (None/NULL data type) is a non-standard extension to the original Bencode algorithm
# This (None/NULL data type) is a non-standard extension
# to the original Bencode algorithm
return (None, startIndex+1)
else:
splitPos = data[startIndex:].find(':')+startIndex

View file

@ -41,7 +41,8 @@ class KBucket(object):
"""
if contact in self._contacts:
# Move the existing contact to the end of the list
# - using the new contact to allow add-on data (e.g. optimization-specific stuff) to pe updated as well
# - using the new contact to allow add-on data
# (e.g. optimization-specific stuff) to pe updated as well
self._contacts.remove(contact)
self._contacts.append(contact)
elif len(self._contacts) < constants.k:

View file

@ -45,11 +45,19 @@ class DefaultFormat(MessageTranslator):
def fromPrimitive(self, msgPrimitive):
msgType = msgPrimitive[self.headerType]
if msgType == self.typeRequest:
msg = msgtypes.RequestMessage(msgPrimitive[self.headerNodeID], msgPrimitive[self.headerPayload], msgPrimitive[self.headerArgs], msgPrimitive[self.headerMsgID])
msg = msgtypes.RequestMessage(msgPrimitive[self.headerNodeID],
msgPrimitive[self.headerPayload],
msgPrimitive[self.headerArgs],
msgPrimitive[self.headerMsgID])
elif msgType == self.typeResponse:
msg = msgtypes.ResponseMessage(msgPrimitive[self.headerMsgID], msgPrimitive[self.headerNodeID], msgPrimitive[self.headerPayload])
msg = msgtypes.ResponseMessage(msgPrimitive[self.headerMsgID],
msgPrimitive[self.headerNodeID],
msgPrimitive[self.headerPayload])
elif msgType == self.typeError:
msg = msgtypes.ErrorMessage(msgPrimitive[self.headerMsgID], msgPrimitive[self.headerNodeID], msgPrimitive[self.headerPayload], msgPrimitive[self.headerArgs])
msg = msgtypes.ErrorMessage(msgPrimitive[self.headerMsgID],
msgPrimitive[self.headerNodeID],
msgPrimitive[self.headerPayload],
msgPrimitive[self.headerArgs])
else:
# Unknown message, no payload
msg = msgtypes.Message(msgPrimitive[self.headerMsgID], msgPrimitive[self.headerNodeID])

View file

@ -44,7 +44,9 @@ class Node(object):
In Entangled, all interactions with the Kademlia network by a client
application is performed via this class (or a subclass).
"""
def __init__(self, id=None, udpPort=4000, dataStore=None, routingTableClass=None, networkProtocol=None, lbryid=None, externalIP=None):
def __init__(self, id=None, udpPort=4000, dataStore=None,
routingTableClass=None, networkProtocol=None, lbryid=None,
externalIP=None):
"""
@param dataStore: The data store to use. This must be class inheriting
from the C{DataStore} interface (or providing the
@ -73,10 +75,12 @@ class Node(object):
self.id = self._generateID()
self.lbryid = lbryid
self.port = udpPort
self._listeningPort = None # object implementing Twisted IListeningPort
# This will contain a deferred created when joining the network, to enable publishing/retrieving information from
# the DHT as soon as the node is part of the network (add callbacks to this deferred if scheduling such operations
# before the node has finished joining the network)
self._listeningPort = None # object implementing Twisted
# IListeningPort This will contain a deferred created when
# joining the network, to enable publishing/retrieving
# information from the DHT as soon as the node is part of the
# network (add callbacks to this deferred if scheduling such
# operations before the node has finished joining the network)
self._joinDeferred = None
self.next_refresh_call = None
self.next_change_token_call = None
@ -107,7 +111,8 @@ class Node(object):
state = self._dataStore['nodeState']
self.id = state['id']
for contactTriple in state['closestNodes']:
contact = Contact(contactTriple[0], contactTriple[1], contactTriple[2], self._protocol)
contact = Contact(
contactTriple[0], contactTriple[1], contactTriple[2], self._protocol)
self._routingTable.addContact(contact)
self.externalIP = externalIP
self.hash_watcher = HashWatcher()
@ -171,7 +176,8 @@ class Node(object):
#self._joinDeferred.addCallback(self._persistState)
#self._joinDeferred.addCallback(self.printContacts)
# Start refreshing k-buckets periodically, if necessary
self.next_refresh_call = twisted.internet.reactor.callLater(constants.checkRefreshInterval, self._refreshNode) #IGNORE:E1101
self.next_refresh_call = twisted.internet.reactor.callLater(
constants.checkRefreshInterval, self._refreshNode) #IGNORE:E1101
self.hash_watcher.tick()
return self._joinDeferred
@ -250,7 +256,8 @@ class Node(object):
def announce_to_peer(responseTuple):
""" @type responseMsg: kademlia.msgtypes.ResponseMessage """
# The "raw response" tuple contains the response message, and the originating address info
# The "raw response" tuple contains the response message,
# and the originating address info
responseMsg = responseTuple[0]
originAddress = responseTuple[1] # tuple: (ip adress, udp port)
# Make sure the responding node is valid, and abort the operation if it isn't
@ -275,7 +282,10 @@ class Node(object):
def requestPeers(contacts):
if self.externalIP is not None and len(contacts) >= constants.k:
if self._routingTable.distance(blob_hash, self.id) < self._routingTable.distance(blob_hash, contacts[-1].id):
is_closer = (
self._routingTable.distance(blob_hash, self.id) <
self._routingTable.distance(blob_hash, contacts[-1].id))
if is_closer:
contacts.pop()
self.store(blob_hash, value, self_store=True, originalPublisherID=self.id)
elif self.externalIP is not None:
@ -298,7 +308,8 @@ class Node(object):
def change_token(self):
self.old_token_secret = self.token_secret
self.token_secret = self._generateID()
self.next_change_token_call = twisted.internet.reactor.callLater(constants.tokenSecretChangeInterval, self.change_token)
self.next_change_token_call = twisted.internet.reactor.callLater(
constants.tokenSecretChangeInterval, self.change_token)
def make_token(self, compact_ip):
h = hashlib.new('sha384')
@ -316,54 +327,6 @@ class Node(object):
return False
return True
# def iterativeStore(self, key, value, originalPublisherID=None, age=0):
# """ This is deprecated. Use iterativeAnnounceHaveBlob instead.
#
# The Kademlia store operation
#
# Call this to store/republish data in the DHT.
#
# @param key: The hashtable key of the data
# @type key: str
# @param value: The actual data (the value associated with C{key})
# @type value: str
# @param originalPublisherID: The node ID of the node that is the
# B{original} publisher of the data
# @type originalPublisherID: str
# @param age: The relative age of the data (time in seconds since it was
# originally published). Note that the original publish time
# isn't actually given, to compensate for clock skew between
# different nodes.
# @type age: int
# """
# #print ' iterativeStore called'
# if originalPublisherID == None:
# originalPublisherID = self.id
#
# def log_error(err):
# log.error(err.getErrorMessage())
#
# # Prepare a callback for doing "STORE" RPC calls
# def executeStoreRPCs(nodes):
# #print ' .....execStoreRPCs called'
# if len(nodes) >= constants.k:
# # If this node itself is closer to the key than the last (furthest) node in the list,
# # we should store the value at ourselves as well
# if self._routingTable.distance(key, self.id) < self._routingTable.distance(key, nodes[-1].id):
# nodes.pop()
# self.store(key, value, originalPublisherID=originalPublisherID, age=age)
# else:
# self.store(key, value, originalPublisherID=originalPublisherID, age=age)
# for contact in nodes:
# d = contact.store(key, value, originalPublisherID, age)
# d.addErrback(log_error)
# return nodes
# # Find k nodes closest to the key...
# df = self.iterativeFindNode(key)
# # ...and send them STORE RPCs as soon as they've been found
# df.addCallback(executeStoreRPCs)
# return df
def iterativeFindNode(self, key):
""" The basic Kademlia node lookup operation
@ -553,17 +516,11 @@ class Node(object):
else:
raise TypeError, 'No lbryid given'
#if originalPublisherID == None:
#if rpcSenderID != None:
# originalPublisherID = rpcSenderID
#else:
# raise TypeError, 'No publisher specifed, and RPC caller ID not available. Data requires an original publisher.'
#if self_store is True:
# print "got this far"
now = int(time.time())
originallyPublished = now# - age
#print compact_address
self._dataStore.addPeerToBlob(key, compact_address, now, originallyPublished, originalPublisherID)
self._dataStore.addPeerToBlob(
key, compact_address, now, originallyPublished, originalPublisherID)
#if self_store is True:
# print "looks like it was successful maybe"
return 'OK'
@ -617,16 +574,6 @@ class Node(object):
self.hash_watcher.add_requested_hash(key, compact_ip)
return rval
# def _distance(self, keyOne, keyTwo):
# """ Calculate the XOR result between two string variables
#
# @return: XOR result of two long variables
# @rtype: long
# """
# valKeyOne = long(keyOne.encode('hex'), 16)
# valKeyTwo = long(keyTwo.encode('hex'), 16)
# return valKeyOne ^ valKeyTwo
def _generateID(self):
""" Generates an n-bit pseudo-random identifier
@ -685,7 +632,10 @@ class Node(object):
shortlist = startupShortlist
# List of active queries; len() indicates number of active probes
# - using lists for these variables, because Python doesn't allow binding a new value to a name in an enclosing (non-global) scope
#
# n.b: using lists for these variables, because Python doesn't
# allow binding a new value to a name in an enclosing
# (non-global) scope
activeProbes = []
# List of contact IDs that have already been queried
alreadyContacted = []
@ -700,7 +650,8 @@ class Node(object):
def extendShortlist(responseTuple):
""" @type responseMsg: kademlia.msgtypes.ResponseMessage """
# The "raw response" tuple contains the response message, and the originating address info
# The "raw response" tuple contains the response message,
# and the originating address info
responseMsg = responseTuple[0]
originAddress = responseTuple[1] # tuple: (ip adress, udp port)
# Make sure the responding node is valid, and abort the operation if it isn't
@ -714,7 +665,8 @@ class Node(object):
else:
# If it's not in the shortlist; we probably used a fake ID to reach it
# - reconstruct the contact, using the real node ID this time
aContact = Contact(responseMsg.nodeID, originAddress[0], originAddress[1], self._protocol)
aContact = Contact(
responseMsg.nodeID, originAddress[0], originAddress[1], self._protocol)
activeContacts.append(aContact)
# This makes sure "bootstrap"-nodes with "fake" IDs don't get queried twice
if responseMsg.nodeID not in alreadyContacted:
@ -733,7 +685,10 @@ class Node(object):
# We are looking for a value, and the remote node didn't have it
# - mark it as the closest "empty" node, if it is
if 'closestNodeNoValue' in findValueResult:
if self._routingTable.distance(key, responseMsg.nodeID) < self._routingTable.distance(key, activeContacts[0].id):
is_closer = (
self._routingTable.distance(key, responseMsg.nodeID) <
self._routingTable.distance(key, activeContacts[0].id))
if is_closer:
findValueResult['closestNodeNoValue'] = aContact
else:
findValueResult['closestNodeNoValue'] = aContact
@ -742,7 +697,8 @@ class Node(object):
contactTriples = result
for contactTriple in contactTriples:
if isinstance(contactTriple, (list, tuple)) and len(contactTriple) == 3:
testContact = Contact(contactTriple[0], contactTriple[1], contactTriple[2], self._protocol)
testContact = Contact(
contactTriple[0], contactTriple[1], contactTriple[2], self._protocol)
if testContact not in shortlist:
shortlist.append(testContact)
return responseMsg.nodeID
@ -771,31 +727,42 @@ class Node(object):
def searchIteration():
#print '==> searchiteration'
slowNodeCount[0] = len(activeProbes)
# TODO: move sort_key to be a method on the class
def sort_key(firstContact, secondContact, targetKey=key):
return cmp(
self._routingTable.distance(firstContact.id, targetKey),
self._routingTable.distance(secondContact.id, targetKey)
)
# Sort the discovered active nodes from closest to furthest
activeContacts.sort(lambda firstContact, secondContact, targetKey=key: cmp(self._routingTable.distance(firstContact.id, targetKey), self._routingTable.distance(secondContact.id, targetKey)))
activeContacts.sort(sort_key)
# This makes sure a returning probe doesn't force calling this function by mistake
while len(pendingIterationCalls):
del pendingIterationCalls[0]
# See if should continue the search
if key in findValueResult:
#print '++++++++++++++ DONE (findValue found) +++++++++++++++\n\n'
outerDf.callback(findValueResult)
return
elif len(activeContacts) and findValue == False:
if (len(activeContacts) >= constants.k) or (activeContacts[0] == prevClosestNode[0] and len(activeProbes) == slowNodeCount[0]):
is_all_done = (
len(activeContacts) >= constants.k or
(
activeContacts[0] == prevClosestNode[0] and
len(activeProbes) == slowNodeCount[0]
)
)
if is_all_done:
# TODO: Re-send the FIND_NODEs to all of the k closest nodes not already queried
# Ok, we're done; either we have accumulated k active contacts or no improvement in closestNode has been noted
#if len(activeContacts) >= constants.k:
# print '++++++++++++++ DONE (test for k active contacts) +++++++++++++++\n\n'
#else:
# print '++++++++++++++ DONE (test for closest node) +++++++++++++++\n\n'
#
# Ok, we're done; either we have accumulated k
# active contacts or no improvement in closestNode
# has been noted
outerDf.callback(activeContacts)
return
# The search continues...
if len(activeContacts):
prevClosestNode[0] = activeContacts[0]
contactedNow = 0
shortlist.sort(lambda firstContact, secondContact, targetKey=key: cmp(self._routingTable.distance(firstContact.id, targetKey), self._routingTable.distance(secondContact.id, targetKey)))
shortlist.sort(sort_key)
# Store the current shortList length before contacting other nodes
prevShortlistLength = len(shortlist)
for contact in shortlist:
@ -811,11 +778,19 @@ class Node(object):
contactedNow += 1
if contactedNow == constants.alpha:
break
if len(activeProbes) > slowNodeCount[0] \
or (len(shortlist) < constants.k and len(activeContacts) < len(shortlist) and len(activeProbes) > 0):
#print '----------- scheduling next call -------------'
# Schedule the next iteration if there are any active calls (Kademlia uses loose parallelism)
call = twisted.internet.reactor.callLater(constants.iterativeLookupDelay, searchIteration) #IGNORE:E1101
should_lookup_active_calls = (
len(activeProbes) > slowNodeCount[0] or
(
len(shortlist) < constants.k and
len(activeContacts) < len(shortlist) and
len(activeProbes) > 0
)
)
if should_lookup_active_calls:
# Schedule the next iteration if there are any active
# calls (Kademlia uses loose parallelism)
call = twisted.internet.reactor.callLater(
constants.iterativeLookupDelay, searchIteration) #IGNORE:E1101
pendingIterationCalls.append(call)
# Check for a quick contact response that made an update to the shortList
elif prevShortlistLength < len(shortlist):
@ -831,89 +806,6 @@ class Node(object):
searchIteration()
return outerDf
# def _kbucketIndex(self, key):
# """ Calculate the index of the k-bucket which is responsible for the
# specified key
#
# @param key: The key for which to find the appropriate k-bucket index
# @type key: str
#
# @return: The index of the k-bucket responsible for the specified key
# @rtype: int
# """
# distance = self._distance(self.id, key)
# bucketIndex = int(math.log(distance, 2))
# return bucketIndex
# def _randomIDInBucketRange(self, bucketIndex):
# """ Returns a random ID in the specified k-bucket's range
#
# @param bucketIndex: The index of the k-bucket to use
# @type bucketIndex: int
# """
# def makeIDString(distance):
# id = hex(distance)[2:]
# if id[-1] == 'L':
# id = id[:-1]
# if len(id) % 2 != 0:
# id = '0' + id
# id = id.decode('hex')
# id = (20 - len(id))*'\x00' + id
# return id
# min = math.pow(2, bucketIndex)
# max = math.pow(2, bucketIndex+1)
# distance = random.randrange(min, max)
# distanceStr = makeIDString(distance)
# randomID = makeIDString(self._distance(distanceStr, self.id))
# return randomID
# def _refreshKBuckets(self, startIndex=0, force=False):
# """ Refreshes all k-buckets that need refreshing, starting at the
# k-bucket with the specified index
#
# @param startIndex: The index of the bucket to start refreshing at;
# this bucket and those further away from it will
# be refreshed. For example, when joining the
# network, this node will set this to the index of
# the bucket after the one containing it's closest
# neighbour.
# @type startIndex: index
# @param force: If this is C{True}, all buckets (in the specified range)
# will be refreshed, regardless of the time they were last
# accessed.
# @type force: bool
# """
# #print '_refreshKbuckets called with index:',startIndex
# bucketIndex = []
# bucketIndex.append(startIndex + 1)
# outerDf = defer.Deferred()
# def refreshNextKBucket(dfResult=None):
# #print ' refreshNexKbucket called; bucketindex is', bucketIndex[0]
# bucketIndex[0] += 1
# while bucketIndex[0] < 160:
# if force or (int(time.time()) - self._buckets[bucketIndex[0]].lastAccessed >= constants.refreshTimeout):
# searchID = self._randomIDInBucketRange(bucketIndex[0])
# self._buckets[bucketIndex[0]].lastAccessed = int(time.time())
# #print ' refreshing bucket',bucketIndex[0]
# df = self.iterativeFindNode(searchID)
# df.addCallback(refreshNextKBucket)
# return
# else:
# bucketIndex[0] += 1
# # If this is reached, we have refreshed all the buckets
# #print ' all buckets refreshed; initiating outer deferred callback'
# outerDf.callback(None)
# #print '_refreshKbuckets starting cycle'
# refreshNextKBucket()
# #print '_refreshKbuckets returning'
# return outerDf
#def _persistState(self, *args):
# state = {'id': self.id,
# 'closestNodes': self.findNode(self.id)}
# now = int(time.time())
# self._dataStore.setItem('nodeState', state, now, now, self.id)
def _refreshNode(self):
""" Periodically called to perform k-bucket refreshes and data
replication/republishing as necessary """
@ -945,56 +837,16 @@ class Node(object):
def _scheduleNextNodeRefresh(self, *args):
#print '==== sheduling next refresh'
self.next_refresh_call = twisted.internet.reactor.callLater(constants.checkRefreshInterval, self._refreshNode)
self.next_refresh_call = twisted.internet.reactor.callLater(
constants.checkRefreshInterval, self._refreshNode)
def _removeExpiredPeers(self, *args):#args put here because _refreshRoutingTable does outerDF.callback(None)
#args put here because _refreshRoutingTable does outerDF.callback(None)
def _removeExpiredPeers(self, *args):
df = twisted.internet.threads.deferToThread(self._dataStore.removeExpiredPeers)
return df
#def _threadedRepublishData(self, *args):
# """ Republishes and expires any stored data (i.e. stored
# C{(key, value pairs)} that need to be republished/expired
#
# This method should run in a deferred thread
# """
# #print '== republishData called, node:',ord(self.id[0])
# expiredKeys = []
# for key in self._dataStore:
# # Filter internal variables stored in the datastore
# if key == 'nodeState':
# continue
# now = int(time.time())
# originalPublisherID = self._dataStore.originalPublisherID(key)
# age = now - self._dataStore.originalPublishTime(key)
# #print ' node:',ord(self.id[0]),'key:',ord(key[0]),'orig publishing time:',self._dataStore.originalPublishTime(key),'now:',now,'age:',age,'lastPublished age:',now - self._dataStore.lastPublished(key),'original pubID:', ord(originalPublisherID[0])
# if originalPublisherID == self.id:
# # This node is the original publisher; it has to republish
# # the data before it expires (24 hours in basic Kademlia)
# if age >= constants.dataExpireTimeout:
# #print ' REPUBLISHING key:', key
# #self.iterativeStore(key, self._dataStore[key])
# twisted.internet.reactor.callFromThread(self.iterativeStore, key, self._dataStore[key])
# else:
# # This node needs to replicate the data at set intervals,
# # until it expires, without changing the metadata associated with it
# # First, check if the data has expired
# if age >= constants.dataExpireTimeout:
# # This key/value pair has expired (and it has not been republished by the original publishing node
# # - remove it
# expiredKeys.append(key)
# elif now - self._dataStore.lastPublished(key) >= constants.replicateInterval:
# # ...data has not yet expired, and we need to replicate it
# #print ' replicating key:', key,'age:',age
# #self.iterativeStore(key=key, value=self._dataStore[key], originalPublisherID=originalPublisherID, age=age)
# twisted.internet.reactor.callFromThread(self.iterativeStore, key=key, value=self._dataStore[key], originalPublisherID=originalPublisherID, age=age)
# for key in expiredKeys:
# #print ' expiring key:', key
# del self._dataStore[key]
# #print 'done with threadedDataRefresh()'
def main():
parser = argparse.ArgumentParser(description="Launch a dht node")
parser.add_argument("udp_port", help="The UDP port on which the node will listen",
type=int)

View file

@ -42,7 +42,8 @@ class KademliaProtocol(protocol.DatagramProtocol):
maxToSendDelay = 10**-3#0.05
minToSendDelay = 10**-5#0.01
def __init__(self, node, msgEncoder=encoding.Bencode(), msgTranslator=msgformat.DefaultFormat()):
def __init__(self, node, msgEncoder=encoding.Bencode(),
msgTranslator=msgformat.DefaultFormat()):
self._node = node
self._encoder = msgEncoder
self._translator = msgTranslator
@ -88,7 +89,8 @@ class KademliaProtocol(protocol.DatagramProtocol):
df._rpcRawResponse = True
# Set the RPC timeout timer
timeoutCall = reactor.callLater(constants.rpcTimeout, self._msgTimeout, msg.id) #IGNORE:E1101
timeoutCall = reactor.callLater(
constants.rpcTimeout, self._msgTimeout, msg.id) #IGNORE:E1101
# Transmit the data
self._send(encodedMsg, msg.id, (contact.address, contact.port))
self._sentMessages[msg.id] = (contact.id, df, timeoutCall)
@ -193,8 +195,12 @@ class KademliaProtocol(protocol.DatagramProtocol):
class (see C{kademlia.msgformat} and C{kademlia.encoding}).
"""
if len(data) > self.msgSizeLimit:
# We have to spread the data over multiple UDP datagrams, and provide sequencing information
# 1st byte is transmission type id, bytes 2 & 3 are the total number of packets in this transmission, bytes 4 & 5 are the sequence number for this specific packet
# We have to spread the data over multiple UDP datagrams,
# and provide sequencing information
#
# 1st byte is transmission type id, bytes 2 & 3 are the
# total number of packets in this transmission, bytes 4 &
# 5 are the sequence number for this specific packet
totalPackets = len(data) / self.msgSizeLimit
if len(data) % self.msgSizeLimit > 0:
totalPackets += 1
@ -288,14 +294,18 @@ class KademliaProtocol(protocol.DatagramProtocol):
# We are still receiving this message
# See if any progress has been made; if not, kill the message
if self._partialMessagesProgress.has_key(messageID):
if len(self._partialMessagesProgress[messageID]) == len(self._partialMessages[messageID]):
same_length = (
len(self._partialMessagesProgress[messageID]) ==
len(self._partialMessages[messageID]))
if same_length:
# No progress has been made
del self._partialMessagesProgress[messageID]
del self._partialMessages[messageID]
df.errback(failure.Failure(TimeoutError(remoteContactID)))
return
# Reset the RPC timeout timer
timeoutCall = reactor.callLater(constants.rpcTimeout, self._msgTimeout, messageID) #IGNORE:E1101
timeoutCall = reactor.callLater(
constants.rpcTimeout, self._msgTimeout, messageID) #IGNORE:E1101
self._sentMessages[messageID] = (remoteContactID, df, timeoutCall)
return
del self._sentMessages[messageID]

View file

@ -143,7 +143,8 @@ class TreeRoutingTable(RoutingTable):
try:
self._buckets[bucketIndex].addContact(contact)
except kbucket.BucketFull:
# The bucket is full; see if it can be split (by checking if its range includes the host node's id)
# The bucket is full; see if it can be split (by checking
# if its range includes the host node's id)
if self._buckets[bucketIndex].keyInRange(self._parentNodeID):
self._splitBucket(bucketIndex)
# Retry the insertion attempt
@ -151,12 +152,15 @@ class TreeRoutingTable(RoutingTable):
else:
# We can't split the k-bucket
# NOTE:
# In section 2.4 of the 13-page version of the Kademlia paper, it is specified that
# in this case, the new contact should simply be dropped. However, in section 2.2,
# it states that the head contact in the k-bucket (i.e. the least-recently seen node)
# should be pinged - if it does not reply, it should be dropped, and the new contact
# added to the tail of the k-bucket. This implementation follows section 2.2 regarding
# this point.
# In section 2.4 of the 13-page version of the
# Kademlia paper, it is specified that in this case,
# the new contact should simply be dropped. However,
# in section 2.2, it states that the head contact in
# the k-bucket (i.e. the least-recently seen node)
# should be pinged - if it does not reply, it should
# be dropped, and the new contact added to the tail of
# the k-bucket. This implementation follows section
# 2.2 regarding this point.
headContact = self._buckets[bucketIndex]._contacts[0]
def replaceContact(failure):
@ -180,7 +184,8 @@ class TreeRoutingTable(RoutingTable):
# Ping the least-recently seen contact in this k-bucket
headContact = self._buckets[bucketIndex]._contacts[0]
df = headContact.ping()
# If there's an error (i.e. timeout), remove the head contact, and append the new one
# If there's an error (i.e. timeout), remove the head
# contact, and append the new one
df.addErrback(replaceContact)
def findCloseNodes(self, key, count, _rpcNodeID=None):
@ -208,8 +213,9 @@ class TreeRoutingTable(RoutingTable):
#else:
bucketIndex = self._kbucketIndex(key)
closestNodes = self._buckets[bucketIndex].getContacts(constants.k, _rpcNodeID)
# This method must return k contacts (even if we have the node with the specified key as node ID),
# unless there is less than k remote nodes in the routing table
# This method must return k contacts (even if we have the node
# with the specified key as node ID), unless there is less
# than k remote nodes in the routing table
i = 1
canGoLower = bucketIndex-i >= 0
canGoHigher = bucketIndex+i < len(self._buckets)
@ -217,10 +223,14 @@ class TreeRoutingTable(RoutingTable):
while len(closestNodes) < constants.k and (canGoLower or canGoHigher):
#TODO: this may need to be optimized
if canGoLower:
closestNodes.extend(self._buckets[bucketIndex-i].getContacts(constants.k - len(closestNodes), _rpcNodeID))
closestNodes.extend(
self._buckets[bucketIndex-i].getContacts(
constants.k - len(closestNodes), _rpcNodeID))
canGoLower = bucketIndex-(i+1) >= 0
if canGoHigher:
closestNodes.extend(self._buckets[bucketIndex+i].getContacts(constants.k - len(closestNodes), _rpcNodeID))
closestNodes.extend(
self._buckets[bucketIndex+i].getContacts(
constants.k - len(closestNodes), _rpcNodeID))
canGoHigher = bucketIndex+(i+1) < len(self._buckets)
i += 1
return closestNodes
@ -318,7 +328,8 @@ class TreeRoutingTable(RoutingTable):
@param bucketIndex: The index of the k-bucket to use
@type bucketIndex: int
"""
idValue = random.randrange(self._buckets[bucketIndex].rangeMin, self._buckets[bucketIndex].rangeMax)
idValue = random.randrange(
self._buckets[bucketIndex].rangeMin, self._buckets[bucketIndex].rangeMax)
randomID = hex(idValue)[2:]
if randomID[-1] == 'L':
randomID = randomID[:-1]
@ -379,7 +390,8 @@ class OptimizedTreeRoutingTable(TreeRoutingTable):
try:
self._buckets[bucketIndex].addContact(contact)
except kbucket.BucketFull:
# The bucket is full; see if it can be split (by checking if its range includes the host node's id)
# The bucket is full; see if it can be split (by checking
# if its range includes the host node's id)
if self._buckets[bucketIndex].keyInRange(self._parentNodeID):
self._splitBucket(bucketIndex)
# Retry the insertion attempt
@ -390,12 +402,16 @@ class OptimizedTreeRoutingTable(TreeRoutingTable):
# of the Kademlia paper (optimized contact accounting without PINGs
#- results in much less network traffic, at the expense of some memory)
# Put the new contact in our replacement cache for the corresponding k-bucket (or update it's position if it exists already)
# Put the new contact in our replacement cache for the
# corresponding k-bucket (or update it's position if
# it exists already)
if not self._replacementCache.has_key(bucketIndex):
self._replacementCache[bucketIndex] = []
if contact in self._replacementCache[bucketIndex]:
self._replacementCache[bucketIndex].remove(contact)
#TODO: Using k to limit the size of the contact replacement cache - maybe define a seperate value for this in constants.py?
#TODO: Using k to limit the size of the contact
#replacement cache - maybe define a seperate value for
#this in constants.py?
elif len(self._replacementCache) >= constants.k:
self._replacementCache.pop(0)
self._replacementCache[bucketIndex].append(contact)

View file

@ -1,7 +1,9 @@
#!/usr/bin/env python
#
# This is a basic single-node example of how to use the Entangled DHT. It creates a Node and (optionally) joins an existing DHT.
# It then does a Kademlia store and find, and then it deletes the stored value (non-Kademlia method).
# This is a basic single-node example of how to use the Entangled
# DHT. It creates a Node and (optionally) joins an existing DHT. It
# then does a Kademlia store and find, and then it deletes the stored
# value (non-Kademlia method).
#
# No tuple space functionality is demonstrated by this script.
#
@ -42,12 +44,15 @@ def storeValue(key, value):
""" Stores the specified value in the DHT using the specified key """
global node
print '\nStoring value; Key: %s, Value: %s' % (key, value)
# Store the value in the DHT. This method returns a Twisted Deferred result, which we then add callbacks to
# Store the value in the DHT. This method returns a Twisted
# Deferred result, which we then add callbacks to
deferredResult = node.announceHaveHash(key, value)
# Add our callback; this method is called when the operation completes...
deferredResult.addCallback(storeValueCallback)
# ...and for error handling, add an "error callback" as well.
# For this example script, I use a generic error handler; usually you would need something more specific
#
# For this example script, I use a generic error handler; usually
# you would need something more specific
deferredResult.addErrback(genericErrorCallback)
@ -68,8 +73,10 @@ def getValue():
""" Retrieves the value of the specified key (KEY) from the DHT """
global node, KEY
# Get the value for the specified key (immediately returns a Twisted deferred result)
print '\nRetrieving value from DHT for key "%s"...' % binascii.unhexlify("f7d9dc4de674eaa2c5a022eb95bc0d33ec2e75c6")
deferredResult = node.iterativeFindValue(binascii.unhexlify("f7d9dc4de674eaa2c5a022eb95bc0d33ec2e75c6"))
print ('\nRetrieving value from DHT for key "%s"...' %
binascii.unhexlify("f7d9dc4de674eaa2c5a022eb95bc0d33ec2e75c6"))
deferredResult = node.iterativeFindValue(
binascii.unhexlify("f7d9dc4de674eaa2c5a022eb95bc0d33ec2e75c6"))
#deferredResult = node.iterativeFindValue(KEY)
# Add a callback to this result; this will be called as soon as the operation has completed
deferredResult.addCallback(getValueCallback)
@ -79,7 +86,9 @@ def getValue():
def getValueCallback(result):
""" Callback function that is invoked when the getValue() operation succeeds """
# Check if the key was found (result is a dict of format {key: value}) or not (in which case a list of "closest" Kademlia contacts would be returned instead")
# Check if the key was found (result is a dict of format {key:
# value}) or not (in which case a list of "closest" Kademlia
# contacts would be returned instead")
print "Got the value"
print result
#if type(result) == dict:
@ -105,12 +114,13 @@ def stop():
twisted.internet.reactor.stop()
if __name__ == '__main__':
import sys
if len(sys.argv) < 2:
print 'Usage:\n%s UDP_PORT [KNOWN_NODE_IP KNOWN_NODE_PORT]' % sys.argv[0]
print 'or:\n%s UDP_PORT [FILE_WITH_KNOWN_NODES]' % sys.argv[0]
print '\nIf a file is specified, it should containg one IP address and UDP port\nper line, seperated by a space.'
print
print 'If a file is specified, it should containg one IP address and UDP port'
print 'per line, seperated by a space.'
sys.exit(1)
try:
int(sys.argv[1])
@ -118,7 +128,9 @@ if __name__ == '__main__':
print '\nUDP_PORT must be an integer value.\n'
print 'Usage:\n%s UDP_PORT [KNOWN_NODE_IP KNOWN_NODE_PORT]' % sys.argv[0]
print 'or:\n%s UDP_PORT [FILE_WITH_KNOWN_NODES]' % sys.argv[0]
print '\nIf a file is specified, it should contain one IP address and UDP port\nper line, seperated by a space.'
print
print 'If a file is specified, it should contain one IP address and UDP port'
print 'per line, seperated by a space.'
sys.exit(1)
if len(sys.argv) == 4:
@ -134,25 +146,32 @@ if __name__ == '__main__':
else:
knownNodes = None
print '\nNOTE: You have not specified any remote DHT node(s) to connect to'
print 'It will thus not be aware of any existing DHT, but will still function as a self-contained DHT (until another node contacts it).'
print 'It will thus not be aware of any existing DHT, but will still function as'
print ' a self-contained DHT (until another node contacts it).'
print 'Run this script without any arguments for info.\n'
# Set up SQLite-based data store (you could use an in-memory store instead, for example)
#if os.path.isfile('/tmp/dbFile%s.db' % sys.argv[1]):
# os.remove('/tmp/dbFile%s.db' % sys.argv[1])
#dataStore = SQLiteDataStore(dbFile = '/tmp/dbFile%s.db' % sys.argv[1])
# Create the Entangled node. It extends the functionality of a basic Kademlia node (but is fully backwards-compatible with a Kademlia-only network)
# If you wish to have a pure Kademlia network, use the entangled.kademlia.node.Node class instead
#
# Create the Entangled node. It extends the functionality of a
# basic Kademlia node (but is fully backwards-compatible with a
# Kademlia-only network)
#
# If you wish to have a pure Kademlia network, use the
# entangled.kademlia.node.Node class instead
print 'Creating Node...'
#node = EntangledNode( udpPort=int(sys.argv[1]), dataStore=dataStore )
node = Node(udpPort=int(sys.argv[1]), lbryid=lbryid)
# Schedule the node to join the Kademlia/Entangled DHT
node.joinNetwork(knownNodes)
# Schedule the "storeValue() call to be invoked after 2.5 seconds, using KEY and VALUE as arguments
# Schedule the "storeValue() call to be invoked after 2.5 seconds,
#using KEY and VALUE as arguments
#twisted.internet.reactor.callLater(2.5, storeValue, KEY, VALUE)
twisted.internet.reactor.callLater(2.5, getValue)
# Start the Twisted reactor - this fires up all networking, and allows the scheduled join operation to take place
# Start the Twisted reactor - this fires up all networking, and
# allows the scheduled join operation to take place
print 'Twisted reactor started (script will commence in 2.5 seconds)'
twisted.internet.reactor.run()

View file

@ -36,11 +36,11 @@ class IRequestSender(Interface):
"""
def add_blob_request(self, blob_request):
"""
Add a request for a blob to the next message that will be sent to the peer.
"""Add a request for a blob to the next message that will be sent to the peer.
This will cause the protocol to call blob_request.write(data) for all incoming
data, after the response message has been parsed out, until blob_request.finished_deferred fires.
This will cause the protocol to call blob_request.write(data)
for all incoming data, after the response message has been
parsed out, until blob_request.finished_deferred fires.
@param blob_request: the request for the blob
@type blob_request: ClientBlobRequest
@ -56,8 +56,7 @@ class IRequestCreator(Interface):
"""
def send_next_request(self, peer, protocol):
"""
Create a Request object for the peer and then give the protocol that request.
"""Create a Request object for the peer and then give the protocol that request.
@param peer: the Peer object which the request will be sent to.
@type peer: Peer
@ -65,7 +64,8 @@ class IRequestCreator(Interface):
@param protocol: the protocol to pass the request to.
@type protocol: object which implements IRequestSender
@return: Deferred object which will callback with True or False depending on whether a Request was sent
@return: Deferred object which will callback with True or
False depending on whether a Request was sent
@rtype: Deferred which fires with boolean
"""
@ -83,12 +83,12 @@ class IMetadataHandler(Interface):
Get metadata for the IDownloadManager.
"""
def get_initial_blobs(self):
"""
Return metadata about blobs that are known to be associated with the stream at the time that the
stream is set up.
"""Return metadata about blobs that are known to be associated with
the stream at the time that the stream is set up.
@return: Deferred object which will call back with a list of BlobInfo objects
@rtype: Deferred which fires with [BlobInfo]
"""
def final_blob_num(self):
@ -156,11 +156,12 @@ class IDownloadManager(Interface):
"""
def needed_blobs(self):
"""
Returns a list of BlobInfos representing all of the blobs that the stream still needs to download.
"""Returns a list of BlobInfos representing all of the blobs that the
stream still needs to download.
@return: the list of BlobInfos representing blobs that the stream still needs to download.
@rtype: [BlobInfo]
"""
def final_blob_num(self):
@ -172,14 +173,15 @@ class IDownloadManager(Interface):
"""
def handle_blob(self, blob_num):
"""
This function is called when the next blob in the stream is ready to be handled, whatever that may mean.
"""This function is called when the next blob in the stream is ready
to be handled, whatever that may mean.
@param blob_num: The blob_num of the blob that is ready to be handled.
@type blob_num: integer
@return: A Deferred which fires when the blob has been 'handled'
@rtype: Deferred which can fire with anything
"""
@ -188,8 +190,8 @@ class IConnectionManager(Interface):
Connects to peers so that IRequestCreators can send their requests.
"""
def get_next_request(self, peer, protocol):
"""
Ask all IRequestCreators belonging to this object to create a Request for peer and give it to protocol
"""Ask all IRequestCreators belonging to this object to create a
Request for peer and give it to protocol
@param peer: the peer which the request will be sent to.
@type peer: Peer
@ -197,9 +199,11 @@ class IConnectionManager(Interface):
@param protocol: the protocol which the request should be sent to by the IRequestCreator.
@type protocol: IRequestSender
@return: Deferred object which will callback with True or False depending on whether the IRequestSender
should send the request or hang up
@return: Deferred object which will callback with True or
False depending on whether the IRequestSender should send
the request or hang up
@rtype: Deferred which fires with boolean
"""
def protocol_disconnected(self, peer, protocol):
@ -217,11 +221,12 @@ class IConnectionManager(Interface):
class IProgressManager(Interface):
"""
Responsible for keeping track of the progress of the download.
"""Responsible for keeping track of the progress of the download.
Specifically, it is their responsibility to decide which blobs
need to be downloaded and keep track of the progress of the
download
Specifically, it is their responsibility to decide which blobs need to be downloaded and keep track of
the progress of the download
"""
def stream_position(self):
"""
@ -235,11 +240,12 @@ class IProgressManager(Interface):
"""
def needed_blobs(self):
"""
Returns a list of BlobInfos representing all of the blobs that the stream still needs to download.
"""Returns a list of BlobInfos representing all of the blobs that the
stream still needs to download.
@return: the list of BlobInfos representing blobs that the stream still needs to download.
@rtype: [BlobInfo]
"""
def blob_downloaded(self, blob, blob_info):
@ -334,24 +340,26 @@ class IRateLimiter(Interface):
"""
def register_protocol(self, protocol):
"""
Register an IRateLimited object with the IRateLimiter so that the IRateLimiter can throttle it
"""Register an IRateLimited object with the IRateLimiter so that the
IRateLimiter can throttle it
@param protocol: An object implementing the interface IRateLimited
@type protocol: Object implementing IRateLimited
@return: None
"""
def unregister_protocol(self, protocol):
"""
Unregister an IRateLimited object so that it won't be throttled any more.
"""Unregister an IRateLimited object so that it won't be throttled any more.
@param protocol: An object implementing the interface IRateLimited, which was previously registered with this
@param protocol: An object implementing the interface
IRateLimited, which was previously registered with this
IRateLimiter via "register_protocol"
@type protocol: Object implementing IRateLimited
@return: None
"""
@ -360,11 +368,11 @@ class IRequestHandler(Interface):
Pass client queries on to IQueryHandlers
"""
def register_query_handler(self, query_handler, query_identifiers):
"""
Register a query handler, which will be passed any queries that
"""Register a query handler, which will be passed any queries that
match any of the identifiers in query_identifiers
@param query_handler: the object which will handle queries matching the given query_identifiers
@param query_handler: the object which will handle queries
matching the given query_identifiers
@type query_handler: Object implementing IQueryHandler
@param query_identifiers: A list of strings representing the query identifiers
@ -372,6 +380,7 @@ class IRequestHandler(Interface):
@type query_identifiers: [string]
@return: None
"""
def register_blob_sender(self, blob_sender):
@ -462,13 +471,14 @@ class IStreamDownloaderOptions(Interface):
class IStreamDownloaderFactory(Interface):
"""
Construct IStreamDownloaders and provide options that will be passed to those IStreamDownloaders.
"""Construct IStreamDownloaders and provide options that will be
passed to those IStreamDownloaders.
"""
def can_download(self, sd_validator, payment_rate_manager):
"""
Decide whether the downloaders created by this factory can download the stream described by sd_validator
"""Decide whether the downloaders created by this factory can
download the stream described by sd_validator
@param sd_validator: object containing stream metadata
@type sd_validator: object which implements IStreamDescriptorValidator interface
@ -478,13 +488,14 @@ class IStreamDownloaderFactory(Interface):
@return: True if the downloaders can download the stream, False otherwise
@rtype: bool
"""
def make_downloader(self, sd_validator, options, payment_rate_manager):
"""
Create an object that implements the IStreamDownloader interface
"""Create an object that implements the IStreamDownloader interface
@param sd_validator: object containing stream metadata which will be given to the IStreamDownloader
@param sd_validator: object containing stream metadata which
will be given to the IStreamDownloader
@type sd_validator: object which implements IStreamDescriptorValidator interface
@param options: a list of values that will be used by the IStreamDownloaderFactory to
@ -497,6 +508,7 @@ class IStreamDownloaderFactory(Interface):
@return: a Deferred which fires with the downloader object
@rtype: Deferred which fires with IStreamDownloader
"""
def get_description(self):
@ -513,12 +525,12 @@ class IStreamDownloader(Interface):
Use metadata and data from the network for some useful purpose.
"""
def start(self):
"""
start downloading the stream
"""start downloading the stream
@return: a Deferred which fires when the stream is finished downloading, or errbacks when the stream is
cancelled.
@return: a Deferred which fires when the stream is finished
downloading, or errbacks when the stream is cancelled.
@rtype: Deferred which fires with anything
"""
def insufficient_funds(self, err):
@ -542,26 +554,28 @@ class IStreamDescriptorValidator(Interface):
def info_to_show(self):
"""
@return: A list of tuples representing metadata that should be presented to the user before starting the
download
@return: A list of tuples representing metadata that should be
presented to the user before starting the download
@rtype: [(string, string)]
"""
class IWallet(Interface):
"""
Send and receive payments.
"""Send and receive payments.
To send a payment, a payment reservation must be obtained first. This guarantees that a payment
isn't promised if it can't be paid. When the service in question is rendered, the payment
reservation must be given to the IWallet along with the final price. The reservation can also
be canceled.
To send a payment, a payment reservation must be obtained
first. This guarantees that a payment isn't promised if it can't
be paid. When the service in question is rendered, the payment
reservation must be given to the IWallet along with the final
price. The reservation can also be canceled.
"""
def stop(self):
"""
Send out any unsent payments, close any connections, and stop checking for incoming payments.
"""Send out any unsent payments, close any connections, and stop
checking for incoming payments.
@return: None
"""
def start(self):
@ -590,8 +604,8 @@ class IWallet(Interface):
"""
def reserve_points(self, peer, amount):
"""
Ensure a certain amount of points are available to be sent as payment, before the service is rendered
"""Ensure a certain amount of points are available to be sent as
payment, before the service is rendered
@param peer: The peer to which the payment will ultimately be sent
@type peer: Peer
@ -599,8 +613,10 @@ class IWallet(Interface):
@param amount: The amount of points to reserve
@type amount: float
@return: A ReservedPoints object which is given to send_points once the service has been rendered
@return: A ReservedPoints object which is given to send_points
once the service has been rendered
@rtype: ReservedPoints
"""
def cancel_point_reservation(self, reserved_points):

View file

@ -47,7 +47,8 @@ class DBEncryptedFileMetadataManager(object):
def add_blobs_to_stream(self, stream_hash, blobs):
return self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True)
def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False):
def get_blobs_for_stream(self, stream_hash, start_blob=None,
end_blob=None, count=None, reverse=False):
log.debug("Getting blobs for a stream. Count is %s", str(count))
def get_positions_of_start_and_end():
@ -97,8 +98,10 @@ class DBEncryptedFileMetadataManager(object):
# to a bug in twisted, where the connection is closed by a different thread than the
# one that opened it. The individual connections in the pool are not used in multiple
# threads.
self.db_conn = adbapi.ConnectionPool("sqlite3", (os.path.join(self.db_dir, "lbryfile_info.db")),
check_same_thread=False)
self.db_conn = adbapi.ConnectionPool(
"sqlite3",
(os.path.join(self.db_dir, "lbryfile_info.db")),
check_same_thread=False)
def create_tables(transaction):
transaction.execute("create table if not exists lbry_files (" +
@ -125,8 +128,10 @@ class DBEncryptedFileMetadataManager(object):
@rerun_if_locked
def _delete_stream(self, stream_hash):
d = self.db_conn.runQuery("select stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHashError(stream_hash)))
d = self.db_conn.runQuery(
"select stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
d.addCallback(
lambda result: result[0][0] if result else Failure(NoSuchStreamHashError(stream_hash)))
def do_delete(transaction, s_h):
transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,))
@ -157,21 +162,30 @@ class DBEncryptedFileMetadataManager(object):
@rerun_if_locked
def _get_stream_info(self, stream_hash):
d = self.db_conn.runQuery("select key, stream_name, suggested_file_name from lbry_files where stream_hash = ?",
(stream_hash,))
d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHashError(stream_hash)))
def get_result(res):
if res:
return res[0]
else:
raise NoSuchStreamHashError(stream_hash)
d = self.db_conn.runQuery(
"select key, stream_name, suggested_file_name from lbry_files where stream_hash = ?",
(stream_hash,))
d.addCallback(get_result)
return d
@rerun_if_locked
def _check_if_stream_exists(self, stream_hash):
d = self.db_conn.runQuery("select stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
d = self.db_conn.runQuery(
"select stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
d.addCallback(lambda r: True if len(r) else False)
return d
@rerun_if_locked
def _get_blob_num_by_hash(self, stream_hash, blob_hash):
d = self.db_conn.runQuery("select position from lbry_file_blobs where stream_hash = ? and blob_hash = ?",
(stream_hash, blob_hash))
d = self.db_conn.runQuery(
"select position from lbry_file_blobs where stream_hash = ? and blob_hash = ?",
(stream_hash, blob_hash))
d.addCallback(lambda r: r[0][0] if len(r) else None)
return d
@ -237,8 +251,9 @@ class DBEncryptedFileMetadataManager(object):
@rerun_if_locked
def _get_sd_blob_hashes_for_stream(self, stream_hash):
log.debug("Looking up sd blob hashes for stream hash %s", str(stream_hash))
d = self.db_conn.runQuery("select sd_blob_hash from lbry_file_descriptors where stream_hash = ?",
(stream_hash,))
d = self.db_conn.runQuery(
"select sd_blob_hash from lbry_file_descriptors where stream_hash = ?",
(stream_hash,))
d.addCallback(lambda results: [r[0] for r in results])
return d
@ -291,7 +306,8 @@ class TempEncryptedFileMetadataManager(object):
self.stream_blobs[(stream_hash, blob.blob_hash)] = info
return defer.succeed(True)
def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False):
def get_blobs_for_stream(self, stream_hash, start_blob=None,
end_blob=None, count=None, reverse=False):
if start_blob is not None:
start_num = self._get_blob_num_by_hash(stream_hash, start_blob)
@ -333,4 +349,5 @@ class TempEncryptedFileMetadataManager(object):
return defer.succeed(True)
def get_sd_blob_hashes_for_stream(self, stream_hash):
return defer.succeed([sd_hash for sd_hash, s_h in self.sd_files.iteritems() if stream_hash == s_h])
return defer.succeed(
[sd_hash for sd_hash, s_h in self.sd_files.iteritems() if stream_hash == s_h])

View file

@ -137,7 +137,8 @@ class EncryptedFileStreamDescriptorValidator(object):
raise InvalidStreamDescriptorError("Missing '%s'" % (e.args[0]))
for c in hex_suggested_file_name:
if c not in '0123456789abcdef':
raise InvalidStreamDescriptorError("Suggested file name is not a hex-encoded string")
raise InvalidStreamDescriptorError(
"Suggested file name is not a hex-encoded string")
h = get_lbry_hash_obj()
h.update(hex_stream_name)
h.update(key)

View file

@ -77,7 +77,8 @@ class EncryptedFileDownloader(CryptStreamDownloader):
return d
def _get_progress_manager(self, download_manager):
return FullStreamProgressManager(self._finished_downloading, self.blob_manager, download_manager)
return FullStreamProgressManager(self._finished_downloading,
self.blob_manager, download_manager)
def _start(self):
d = self._setup_output()
@ -120,7 +121,8 @@ class EncryptedFileDownloader(CryptStreamDownloader):
return 0
def _get_metadata_handler(self, download_manager):
return EncryptedFileMetadataHandler(self.stream_hash, self.stream_info_manager, download_manager)
return EncryptedFileMetadataHandler(self.stream_hash,
self.stream_info_manager, download_manager)
class EncryptedFileDownloaderFactory(object):
@ -143,7 +145,8 @@ class EncryptedFileDownloaderFactory(object):
def save_source_if_blob(stream_hash):
if metadata.metadata_source == StreamMetadata.FROM_BLOB:
d = self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, metadata.source_blob_hash)
d = self.stream_info_manager.save_sd_blob_hash_to_stream(
stream_hash, metadata.source_blob_hash)
else:
d = defer.succeed(True)
d.addCallback(lambda _: stream_hash)
@ -168,8 +171,11 @@ class EncryptedFileDownloaderFactory(object):
class EncryptedFileSaver(EncryptedFileDownloader):
def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
payment_rate_manager, wallet, download_directory, upload_allowed, file_name=None):
EncryptedFileDownloader.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager,
stream_info_manager, payment_rate_manager, wallet, upload_allowed)
EncryptedFileDownloader.__init__(self, stream_hash,
peer_finder, rate_limiter,
blob_manager, stream_info_manager,
payment_rate_manager, wallet,
upload_allowed)
self.download_directory = download_directory
self.file_name = file_name
self.file_written_to = None
@ -200,8 +206,11 @@ class EncryptedFileSaver(EncryptedFileDownloader):
return d
def _get_progress_manager(self, download_manager):
return FullStreamProgressManager(self._finished_downloading, self.blob_manager, download_manager,
delete_blob_after_finished=not self.upload_allowed)
return FullStreamProgressManager(self._finished_downloading,
self.blob_manager,
download_manager,
delete_blob_after_finished=not
self.upload_allowed)
def _setup_output(self):
def open_file():
@ -230,9 +239,10 @@ class EncryptedFileSaver(EncryptedFileDownloader):
self.file_written_to = os.path.join(self.download_directory, file_name)
except IOError:
log.error(traceback.format_exc())
raise ValueError("Failed to open %s. Make sure you have permission to save files to that"
" location." % str(os.path.join(self.download_directory,
file_name)))
raise ValueError(
"Failed to open %s. Make sure you have permission to save files to that"
" location." %
os.path.join(self.download_directory, file_name))
return threads.deferToThread(open_file)
def _close_output(self):
@ -265,9 +275,11 @@ class EncryptedFileSaverFactory(EncryptedFileDownloaderFactory):
self.download_directory = download_directory
def _make_downloader(self, stream_hash, payment_rate_manager, stream_info, upload_allowed):
return EncryptedFileSaver(stream_hash, self.peer_finder, self.rate_limiter, self.blob_manager,
self.stream_info_manager, payment_rate_manager, self.wallet,
self.download_directory, upload_allowed)
return EncryptedFileSaver(stream_hash, self.peer_finder,
self.rate_limiter, self.blob_manager,
self.stream_info_manager,
payment_rate_manager, self.wallet,
self.download_directory, upload_allowed)
@staticmethod
def get_description():
@ -277,8 +289,11 @@ class EncryptedFileSaverFactory(EncryptedFileDownloaderFactory):
class EncryptedFileOpener(EncryptedFileDownloader):
def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
payment_rate_manager, wallet, upload_allowed):
EncryptedFileDownloader.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager,
stream_info_manager, payment_rate_manager, wallet, upload_allowed)
EncryptedFileDownloader.__init__(self, stream_hash,
peer_finder, rate_limiter,
blob_manager, stream_info_manager,
payment_rate_manager, wallet,
upload_allowed)
self.process = None
self.process_log = None
@ -288,8 +303,11 @@ class EncryptedFileOpener(EncryptedFileDownloader):
return d
def _get_progress_manager(self, download_manager):
return FullStreamProgressManager(self._finished_downloading, self.blob_manager, download_manager,
delete_blob_after_finished=not self.upload_allowed)
return FullStreamProgressManager(self._finished_downloading,
self.blob_manager,
download_manager,
delete_blob_after_finished=not
self.upload_allowed)
def _setup_output(self):
def start_process():
@ -346,8 +364,11 @@ class EncryptedFileOpenerFactory(EncryptedFileDownloaderFactory):
return False
def _make_downloader(self, stream_hash, payment_rate_manager, stream_info, upload_allowed):
return EncryptedFileOpener(stream_hash, self.peer_finder, self.rate_limiter, self.blob_manager,
self.stream_info_manager, payment_rate_manager, self.wallet, upload_allowed)
return EncryptedFileOpener(stream_hash, self.peer_finder,
self.rate_limiter, self.blob_manager,
self.stream_info_manager,
payment_rate_manager, self.wallet,
upload_allowed)
@staticmethod
def get_description():

View file

@ -1,9 +1,11 @@
from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType, EncryptedFileStreamDescriptorValidator
from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType
from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamDescriptorValidator
from lbrynet.core.DownloadOption import DownloadOption, DownloadOptionChoice
def add_lbry_file_to_sd_identifier(sd_identifier):
sd_identifier.add_stream_type(EncryptedFileStreamType, EncryptedFileStreamDescriptorValidator, EncryptedFileOptions())
sd_identifier.add_stream_type(
EncryptedFileStreamType, EncryptedFileStreamDescriptorValidator, EncryptedFileOptions())
class EncryptedFileOptions(object):
@ -20,13 +22,15 @@ class EncryptedFileOptions(object):
return "%f LBC/MB" % prm.base.min_blob_data_payment_rate
rate_choices = []
rate_choices.append(DownloadOptionChoice(prm.base.min_blob_data_payment_rate,
"No change - %s" % get_default_data_rate_description(),
"No change - %s" % get_default_data_rate_description()))
rate_choices.append(DownloadOptionChoice(
prm.base.min_blob_data_payment_rate,
"No change - %s" % get_default_data_rate_description(),
"No change - %s" % get_default_data_rate_description()))
if prm.base.min_blob_data_payment_rate is not None:
rate_choices.append(DownloadOptionChoice(None,
"Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate),
"Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate)))
rate_choices.append(DownloadOptionChoice(
None,
"Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate),
"Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate)))
rate_choices.append(DownloadOptionChoice(float,
"Enter rate in LBC/MB",
"Enter rate in LBC/MB"))

View file

@ -9,7 +9,8 @@ from twisted.internet import defer
from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager
from lbrynet.core.StreamDescriptor import StreamMetadata
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver, EncryptedFileDownloader
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileDownloader
from lbrynet.lbryfilemanager.EncryptedFileStatusReport import EncryptedFileStatusReport
from lbrynet.interfaces import IStreamDownloaderFactory
from lbrynet.lbryfile.StreamDescriptor import save_sd_info
@ -24,11 +25,15 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
STATUS_STOPPED = "stopped"
STATUS_FINISHED = "finished"
def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
lbry_file_manager, payment_rate_manager, wallet, download_directory, upload_allowed,
file_name=None):
EncryptedFileSaver.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager,
stream_info_manager, payment_rate_manager, wallet, download_directory,
def __init__(self, rowid, stream_hash, peer_finder, rate_limiter,
blob_manager, stream_info_manager, lbry_file_manager,
payment_rate_manager, wallet, download_directory,
upload_allowed, file_name=None):
EncryptedFileSaver.__init__(self, stream_hash, peer_finder,
rate_limiter, blob_manager,
stream_info_manager,
payment_rate_manager, wallet,
download_directory,
upload_allowed, file_name)
self.sd_hash = None
self.txid = None
@ -131,7 +136,8 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
def _start(self):
d = EncryptedFileSaver._start(self)
d.addCallback(lambda _: self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash))
d.addCallback(
lambda _: self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash))
def _save_sd_hash(sd_hash):
if len(sd_hash):
@ -170,7 +176,8 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
return self.lbry_file_manager.change_lbry_file_status(self, s)
def _get_progress_manager(self, download_manager):
return FullStreamProgressManager(self._finished_downloading, self.blob_manager, download_manager)
return FullStreamProgressManager(self._finished_downloading,
self.blob_manager, download_manager)
class ManagedEncryptedFileDownloaderFactory(object):
@ -182,14 +189,18 @@ class ManagedEncryptedFileDownloaderFactory(object):
def can_download(self, sd_validator):
return True
def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None, file_name=None):
def make_downloader(self, metadata, options, payment_rate_manager,
download_directory=None, file_name=None):
data_rate = options[0]
upload_allowed = options[1]
def save_source_if_blob(stream_hash):
if metadata.metadata_source == StreamMetadata.FROM_BLOB:
d = self.lbry_file_manager.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash,
metadata.source_blob_hash)
# TODO: should never have to dig this deep into a another classes
# members. lbry_file_manager should have a
# save_sd_blob_hash_to_stream method
d = self.lbry_file_manager.stream_info_manager.save_sd_blob_hash_to_stream(
stream_hash, metadata.source_blob_hash)
else:
d = defer.succeed(True)
d.addCallback(lambda _: stream_hash)
@ -197,12 +208,13 @@ class ManagedEncryptedFileDownloaderFactory(object):
d = save_sd_info(self.lbry_file_manager.stream_info_manager, metadata.validator.raw_info)
d.addCallback(save_source_if_blob)
d.addCallback(lambda stream_hash: self.lbry_file_manager.add_lbry_file(stream_hash,
payment_rate_manager,
data_rate,
upload_allowed,
download_directory=download_directory,
file_name=file_name))
d.addCallback(lambda stream_hash: self.lbry_file_manager.add_lbry_file(
stream_hash,
payment_rate_manager,
data_rate,
upload_allowed,
download_directory=download_directory,
file_name=file_name))
return d
@staticmethod

View file

@ -13,7 +13,8 @@ from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType
from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError, CurrentlyStoppingError
from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError
from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError
from lbrynet.core.sqlite_helpers import rerun_if_locked
@ -21,8 +22,9 @@ log = logging.getLogger(__name__)
class EncryptedFileManager(object):
"""
Keeps track of currently opened LBRY Files, their options, and their LBRY File specific metadata.
"""Keeps track of currently opened LBRY Files, their options, and
their LBRY File specific metadata.
"""
def __init__(self, session, stream_info_manager, sd_identifier, download_directory=None):
@ -69,12 +71,14 @@ class EncryptedFileManager(object):
def _add_to_sd_identifier(self):
downloader_factory = ManagedEncryptedFileDownloaderFactory(self)
self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, downloader_factory)
self.sd_identifier.add_stream_downloader_factory(
EncryptedFileStreamType, downloader_factory)
def _start_lbry_files(self):
def set_options_and_restore(rowid, stream_hash, options):
payment_rate_manager = NegotiatedPaymentRateManager(self.session.base_payment_rate_manager,
payment_rate_manager = NegotiatedPaymentRateManager(
self.session.base_payment_rate_manager,
self.session.blob_tracker)
d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
blob_data_rate=options)
@ -97,8 +101,9 @@ class EncryptedFileManager(object):
d.addCallback(start_lbry_files)
return d
def start_lbry_file(self, rowid, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True,
download_directory=None, file_name=None):
def start_lbry_file(self, rowid, stream_hash,
payment_rate_manager, blob_data_rate=None, upload_allowed=True,
download_directory=None, file_name=None):
if not download_directory:
download_directory = self.download_directory
payment_rate_manager.min_blob_data_payment_rate = blob_data_rate
@ -116,11 +121,16 @@ class EncryptedFileManager(object):
d.addCallback(lambda _: lbry_file_downloader)
return d
def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True,
download_directory=None, file_name=None):
def add_lbry_file(self, stream_hash, payment_rate_manager,
blob_data_rate=None,
upload_allowed=True,
download_directory=None,
file_name=None):
d = self._save_lbry_file(stream_hash, blob_data_rate)
d.addCallback(lambda rowid: self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
blob_data_rate, upload_allowed, download_directory, file_name))
d.addCallback(
lambda rowid: self.start_lbry_file(
rowid, stream_hash, payment_rate_manager,
blob_data_rate, upload_allowed, download_directory, file_name))
return d
def delete_lbry_file(self, lbry_file):
@ -195,14 +205,19 @@ class EncryptedFileManager(object):
# to a bug in twisted, where the connection is closed by a different thread than the
# one that opened it. The individual connections in the pool are not used in multiple
# threads.
self.sql_db = adbapi.ConnectionPool("sqlite3", os.path.join(self.session.db_dir, "lbryfile_info.db"),
check_same_thread=False)
return self.sql_db.runQuery("create table if not exists lbry_file_options (" +
" blob_data_rate real, " +
" status text," +
" stream_hash text,"
" foreign key(stream_hash) references lbry_files(stream_hash)" +
")")
self.sql_db = adbapi.ConnectionPool(
"sqlite3",
os.path.join(self.session.db_dir, "lbryfile_info.db"),
check_same_thread=False
)
return self.sql_db.runQuery(
"create table if not exists lbry_file_options (" +
" blob_data_rate real, " +
" status text," +
" stream_hash text,"
" foreign key(stream_hash) references lbry_files(stream_hash)" +
")"
)
@rerun_if_locked
def _save_lbry_file(self, stream_hash, data_payment_rate):
@ -220,8 +235,9 @@ class EncryptedFileManager(object):
@rerun_if_locked
def _set_lbry_file_payment_rate(self, rowid, new_rate):
return self.sql_db.runQuery("update lbry_file_options set blob_data_rate = ? where rowid = ?",
(new_rate, rowid))
return self.sql_db.runQuery(
"update lbry_file_options set blob_data_rate = ? where rowid = ?",
(new_rate, rowid))
@rerun_if_locked
def _get_all_lbry_files(self):
@ -237,7 +253,8 @@ class EncryptedFileManager(object):
def _get_lbry_file_status(self, rowid):
d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?",
(rowid,))
d.addCallback(lambda r: r[0][0] if len(r) else ManagedEncryptedFileDownloader.STATUS_STOPPED)
d.addCallback(lambda r: (
r[0][0] if len(r) else ManagedEncryptedFileDownloader.STATUS_STOPPED))
return d
@rerun_if_locked

View file

@ -1,3 +1,4 @@
# pylint: skip-file
from lbrynet.cryptstream.CryptBlob import CryptStreamBlobMaker, CryptBlobInfo
import binascii

View file

@ -1,3 +1,4 @@
# pylint: skip-file
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter
from lbrynet.lbrylive.StreamDescriptor import get_sd_info
from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator

View file

@ -1,3 +1,4 @@
# pylint: skip-file
import time
import logging
from twisted.enterprise import adbapi

View file

@ -1,3 +1,4 @@
# pylint: skip-file
class BaseLiveStreamPaymentRateManager(object):
def __init__(self, blob_info_rate, blob_data_rate=None):
self.min_live_blob_info_payment_rate = blob_info_rate

View file

@ -1,4 +1,5 @@
# pylint: skip-file
# pylint: skip-file
# This file is not maintained, but might be used in the future
#
import logging

View file

@ -1,4 +1,5 @@
# pylint: skip-file
# pylint: skip-file
# This file is not maintained, but might be used in the future
#
import logging

View file

@ -1,3 +1,4 @@
# pylint: skip-file
import binascii
import logging
from lbrynet.core.cryptoutils import get_lbry_hash_obj, verify_signature

View file

@ -1,3 +1,4 @@
# pylint: skip-file
import binascii
from lbrynet.core.StreamDescriptor import StreamMetadata
from lbrynet.cryptstream.client.CryptStreamDownloader import CryptStreamDownloader

View file

@ -1,3 +1,4 @@
# pylint: skip-file
from collections import defaultdict
import logging
from zope.interface import implements

View file

@ -1,3 +1,4 @@
# pylint: skip-file
from lbrynet.lbrylive.StreamDescriptor import LiveStreamType, LiveStreamDescriptorValidator
from lbrynet.core.DownloadOption import DownloadOption, DownloadOptionChoice

View file

@ -1,3 +1,4 @@
# pylint: skip-file
import logging
from lbrynet.core.client.StreamProgressManager import StreamProgressManager
from twisted.internet import defer

View file

@ -1,3 +1,4 @@
# pylint: skip-file
import logging
from twisted.internet import defer
from zope.interface import implements

View file

@ -26,7 +26,8 @@ from lbrynet import conf, reflector, analytics
from lbrynet.conf import LBRYCRD_WALLET, LBRYUM_WALLET, PTC_WALLET
from lbrynet.metadata.Fee import FeeValidator
from lbrynet.metadata.Metadata import Metadata, verify_name_characters
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory, EncryptedFileOpenerFactory
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory
from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
@ -39,7 +40,8 @@ from lbrynet.lbrynet_daemon.Publisher import Publisher
from lbrynet.lbrynet_daemon.ExchangeRateManager import ExchangeRateManager
from lbrynet.lbrynet_daemon.auth.server import AuthJSONRPCServer
from lbrynet.core import log_support, utils, Platform
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob, BlobStreamDescriptorReader
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
from lbrynet.core.Session import Session
from lbrynet.core.PTCWallet import PTCWallet
from lbrynet.core.Wallet import LBRYumWallet
@ -87,7 +89,8 @@ CONNECT_CODE_wallet = 'wallet_catchup_lag'
CONNECTION_PROBLEM_CODES = [
(CONNECT_CODE_VERSION_CHECK, "There was a problem checking for updates on github"),
(CONNECT_CODE_NETWORK, "Your internet connection appears to have been interrupted"),
(CONNECT_CODE_wallet, "Synchronization with the blockchain is lagging... if this continues try restarting LBRY")
(CONNECT_CODE_wallet,
"Synchronization with the blockchain is lagging... if this continues try restarting LBRY")
]
BAD_REQUEST = 400
@ -365,7 +368,8 @@ class Daemon(AuthJSONRPCServer):
def _load_caches(self):
if os.path.isfile(os.path.join(self.db_dir, "stream_info_cache.json")):
with open(os.path.join(self.db_dir, "stream_info_cache.json"), "r") as stream_info_cache:
filename = os.path.join(self.db_dir, "stream_info_cache.json")
with open(filename, "r") as stream_info_cache:
self.name_cache = json.loads(stream_info_cache.read())
log.info("Loaded claim info cache")
@ -382,7 +386,8 @@ class Daemon(AuthJSONRPCServer):
def _log_failure():
log.info("lbrynet connectivity test failed")
wonderfullife_sh = "6f3af0fa3924be98a54766aa2715d22c6c1509c3f7fa32566df4899a41f3530a9f97b2ecb817fa1dcbf1b30553aefaa7"
wonderfullife_sh = ("6f3af0fa3924be98a54766aa2715d22c6c1509c3f7fa32566df4899"
"a41f3530a9f97b2ecb817fa1dcbf1b30553aefaa7")
d = download_sd_blob(self.session, wonderfullife_sh, self.session.base_payment_rate_manager)
d.addCallbacks(lambda _: _log_success, lambda _: _log_failure)
@ -414,9 +419,14 @@ class Daemon(AuthJSONRPCServer):
return defer.succeed("Started LBRY file")
def _get_and_start_file(name):
def start_stopped_file(l):
if l.stopped:
return _start_file(l)
else:
return "LBRY file was already running"
d = defer.succeed(self.pending_claims.pop(name))
d.addCallback(lambda _: self._get_lbry_file(FileID.NAME, name, return_json=False))
d.addCallback(lambda l: _start_file(l) if l.stopped else "LBRY file was already running")
d.addCallback(start_stopped_file)
def re_add_to_pending_claims(name):
log.warning("Re-add %s to pending claims", name)
@ -468,11 +478,13 @@ class Daemon(AuthJSONRPCServer):
self.session.blob_manager
)
try:
self.reflector_server_port = reactor.listenTCP(self.reflector_port, reflector_factory)
self.reflector_server_port = reactor.listenTCP(
self.reflector_port, reflector_factory)
log.info('Started reflector on port %s', self.reflector_port)
except error.CannotListenError as e:
log.exception("Couldn't bind reflector to port %d", self.reflector_port)
raise ValueError("{} lbrynet may already be running on your computer.".format(e))
raise ValueError(
"{} lbrynet may already be running on your computer.".format(e))
return defer.succeed(True)
def _stop_reflector(self):
@ -613,12 +625,18 @@ class Daemon(AuthJSONRPCServer):
'search_timeout': float,
'cache_time': int
}
def can_update_key(settings, key, setting_type):
return (
isinstance(settings[key], setting_type) or
(
key == "max_key_fee" and
isinstance(FeeValidator(settings[key]).amount, setting_type)
)
)
for key, setting_type in setting_types.iteritems():
if key in settings:
if isinstance(settings[key], setting_type):
conf.settings.update({key: settings[key]})
elif key == "max_key_fee" and isinstance(FeeValidator(settings[key]).amount, setting_type):
if can_update_key(settings, key, setting_type):
conf.settings.update({key: settings[key]})
else:
try:
@ -666,7 +684,7 @@ class Daemon(AuthJSONRPCServer):
old_revision = int(open(self.db_revision_file).read().strip())
if old_revision > self.current_db_revision:
return defer.fail(Exception('This version of lbrynet is not compatible with the database'))
raise Exception('This version of lbrynet is not compatible with the database')
def update_version_file_and_print_success():
self._write_db_revision_file(self.current_db_revision)
@ -675,7 +693,8 @@ class Daemon(AuthJSONRPCServer):
if old_revision < self.current_db_revision:
from lbrynet.db_migrator import dbmigrator
log.info("Upgrading your databases...")
d = threads.deferToThread(dbmigrator.migrate_db, self.db_dir, old_revision, self.current_db_revision)
d = threads.deferToThread(
dbmigrator.migrate_db, self.db_dir, old_revision, self.current_db_revision)
d.addCallback(lambda _: update_version_file_and_print_success())
return d
return defer.succeed(True)
@ -773,11 +792,18 @@ class Daemon(AuthJSONRPCServer):
return r
def create_session(results):
self.session = Session(results['default_data_payment_rate'], db_dir=self.db_dir, lbryid=self.lbryid,
blob_dir=self.blobfile_dir, dht_node_port=self.dht_node_port,
known_dht_nodes=conf.settings.known_dht_nodes, peer_port=self.peer_port,
use_upnp=self.use_upnp, wallet=results['wallet'],
is_generous=conf.settings.is_generous_host)
self.session = Session(
results['default_data_payment_rate'],
db_dir=self.db_dir,
lbryid=self.lbryid,
blob_dir=self.blobfile_dir,
dht_node_port=self.dht_node_port,
known_dht_nodes=conf.settings.known_dht_nodes,
peer_port=self.peer_port,
use_upnp=self.use_upnp,
wallet=results['wallet'],
is_generous=conf.settings.is_generous_host
)
self.startup_status = STARTUP_STAGES[2]
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True)
@ -788,14 +814,25 @@ class Daemon(AuthJSONRPCServer):
return dl
def _setup_stream_identifier(self):
file_saver_factory = EncryptedFileSaverFactory(self.session.peer_finder, self.session.rate_limiter,
self.session.blob_manager, self.stream_info_manager,
self.session.wallet, self.download_directory)
self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_saver_factory)
file_opener_factory = EncryptedFileOpenerFactory(self.session.peer_finder, self.session.rate_limiter,
self.session.blob_manager, self.stream_info_manager,
self.session.wallet)
self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_opener_factory)
file_saver_factory = EncryptedFileSaverFactory(
self.session.peer_finder,
self.session.rate_limiter,
self.session.blob_manager,
self.stream_info_manager,
self.session.wallet,
self.download_directory
)
self.sd_identifier.add_stream_downloader_factory(
EncryptedFileStreamType, file_saver_factory)
file_opener_factory = EncryptedFileOpenerFactory(
self.session.peer_finder,
self.session.rate_limiter,
self.session.blob_manager,
self.stream_info_manager,
self.session.wallet
)
self.sd_identifier.add_stream_downloader_factory(
EncryptedFileStreamType, file_opener_factory)
return defer.succeed(None)
def _download_sd_blob(self, sd_hash, timeout=conf.settings.sd_download_timeout):
@ -890,8 +927,13 @@ class Daemon(AuthJSONRPCServer):
# TODO: could possibly be a timing issue here
d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True)
if delete_file:
d.addCallback(lambda _: os.remove(os.path.join(self.download_directory, lbry_file.file_name)) if
os.path.isfile(os.path.join(self.download_directory, lbry_file.file_name)) else defer.succeed(None))
def remove_if_file():
filename = os.path.join(self.download_directory, lbry_file.file_name)
if os.path.isfile(filename):
os.remove(filename)
else:
return defer.succeed(None)
d.addCallback(lambda _: remove_if_file)
return d
d.addCallback(lambda _: finish_deletion(lbry_file))
@ -908,14 +950,15 @@ class Daemon(AuthJSONRPCServer):
d = defer.succeed(None)
reactor.callLater(self.search_timeout, _check_est, d)
d.addCallback(lambda _: download_sd_blob(self.session, sd_hash, self.blob_request_payment_rate_manager))
d.addCallback(
lambda _: download_sd_blob(
self.session, sd_hash, self.blob_request_payment_rate_manager))
return d
def get_or_download_sd_blob(self, sd_hash):
"""Return previously downloaded sd blob if already in the blob
manager, otherwise download and return it
"""
Return previously downloaded sd blob if already in the blob manager, otherwise download and return it
"""
d = self.session.blob_manager.completed_blobs([sd_hash])
d.addCallback(self._get_or_download_sd_blob, sd_hash)
return d
@ -965,7 +1008,8 @@ class Daemon(AuthJSONRPCServer):
def _handle_err(err):
if isinstance(err, Failure):
log.warning("Timeout getting blob for cost est for lbry://%s, using only key fee", name)
log.warning(
"Timeout getting blob for cost est for lbry://%s, using only key fee", name)
return 0.0
raise err
@ -987,11 +1031,10 @@ class Daemon(AuthJSONRPCServer):
d.addCallback(self._get_est_cost_from_metadata, name)
return d
def get_est_cost(self, name, size=None):
"""
Get a cost estimate for a lbry stream, if size is not provided the sd blob will be downloaded
to determine the stream size
"""Get a cost estimate for a lbry stream, if size is not provided the
sd blob will be downloaded to determine the stream size
"""
if size is not None:
@ -1284,25 +1327,28 @@ class Daemon(AuthJSONRPCServer):
"""
def _log_settings_change():
log.info("Set daemon settings to " + json.dumps(conf.settings.get_adjustable_settings_dict()))
log.info(
"Set daemon settings to %s",
json.dumps(conf.settings.get_adjustable_settings_dict()))
d = self._update_settings(p)
d.addErrback(lambda err: log.info(err.getTraceback()))
d.addCallback(lambda _: _log_settings_change())
d.addCallback(lambda _: self._render_response(conf.settings.get_adjustable_settings_dict(), OK_CODE))
d.addCallback(
lambda _: self._regnder_response(conf.settings.get_adjustable_settings_dict(), OK_CODE))
return d
def jsonrpc_help(self, p=None):
"""
Function to retrieve docstring for API function
"""Function to retrieve docstring for API function
Args:
optional 'function': function to retrieve documentation for
optional 'callable_during_startup':
Returns:
if given a function, returns given documentation
if given callable_during_startup flag, returns list of functions callable during the startup sequence
if given callable_during_startup flag, returns list of
functions callable during the startup sequence
if no params are given, returns the list of callable functions
"""
@ -1777,7 +1823,6 @@ class Daemon(AuthJSONRPCServer):
txid : txid of resulting transaction if succesful
nout : nout of the resulting support claim if succesful
fee : fee paid for the transaction if succesful
"""
name = p[FileID.NAME]
@ -2095,14 +2140,16 @@ class Daemon(AuthJSONRPCServer):
return self._render_response(True, OK_CODE)
def jsonrpc_upload_log(self, p=None):
"""
Upload log
"""Upload log
Args, optional:
'name_prefix': prefix to indicate what is requesting the log upload
'exclude_previous': true/false, whether or not to exclude previous sessions from upload, defaults on true
'exclude_previous': true/false, whether or not to exclude
previous sessions from upload, defaults on true
Returns:
True
"""
if p:
@ -2313,7 +2360,8 @@ class Daemon(AuthJSONRPCServer):
def get_lbryum_version_from_github():
r = urlopen("https://raw.githubusercontent.com/lbryio/lbryum/master/lib/version.py").read().split('\n')
r = urlopen(
"https://raw.githubusercontent.com/lbryio/lbryum/master/lib/version.py").read().split('\n')
version = next(line.split("=")[1].split("#")[0].replace(" ", "")
for line in r if "LBRYUM_VERSION" in line)
version = version.replace("'", "")
@ -2358,8 +2406,10 @@ def get_output_callback(params):
class _DownloadNameHelper(object):
def __init__(self, daemon, name, timeout=conf.settings.download_timeout, download_directory=None,
file_name=None, wait_for_write=True):
def __init__(self, daemon, name,
timeout=conf.settings.download_timeout,
download_directory=None, file_name=None,
wait_for_write=True):
self.daemon = daemon
self.name = name
self.timeout = timeout

View file

@ -118,18 +118,19 @@ class GetStream(object):
log.warning("Insufficient funds to download lbry://%s", self.resolved_name)
return defer.fail(InsufficientFundsError())
if converted_fee > max_key_fee:
log.warning("Key fee %f above limit of %f didn't download lbry://%s", converted_fee,
max_key_fee,
self.resolved_name)
log.warning(
"Key fee %f above limit of %f didn't download lbry://%s",
converted_fee, max_key_fee, self.resolved_name)
return defer.fail(KeyFeeAboveMaxAllowed())
log.info("Key fee %f below limit of %f, downloading lbry://%s", converted_fee,
max_key_fee,
self.resolved_name)
log.info(
"Key fee %f below limit of %f, downloading lbry://%s",
converted_fee, max_key_fee, self.resolved_name)
self.checker.start(1)
self.d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE))
self.d.addCallback(lambda _: download_sd_blob(self.session, self.stream_hash, self.payment_rate_manager))
self.d.addCallback(lambda _: download_sd_blob(
self.session, self.stream_hash, self.payment_rate_manager))
self.d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
self.d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE))
self.d.addCallback(get_downloader_factory)
@ -145,7 +146,8 @@ class GetStream(object):
self.download_path = os.path.join(downloader.download_directory, downloader.file_name)
d = self._pay_key_fee()
d.addCallback(lambda _: log.info("Downloading %s --> %s", self.stream_hash, self.downloader.file_name))
d.addCallback(lambda _: log.info(
"Downloading %s --> %s", self.stream_hash, self.downloader.file_name))
d.addCallback(lambda _: self.downloader.start())
def _pay_key_fee(self):

View file

@ -59,7 +59,9 @@ class MarketFeed(object):
def _log_error(self, err):
log.error(err)
log.warning("There was a problem updating %s exchange rate information from %s", self.market, self.name)
log.warning(
"There was a problem updating %s exchange rate information from %s",
self.market, self.name)
def _update_price(self):
d = self._make_request()
@ -133,7 +135,8 @@ def get_default_market_feed(currency_pair):
class ExchangeRateManager(object):
def __init__(self):
reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
self.market_feeds = [get_default_market_feed(currency_pair) for currency_pair in CURRENCY_PAIRS]
self.market_feeds = [
get_default_market_feed(currency_pair) for currency_pair in CURRENCY_PAIRS]
def start(self):
log.info("Starting exchange rate manager")
@ -154,8 +157,10 @@ class ExchangeRateManager(object):
return amount * market.rate.spot
for market in self.market_feeds:
if market.rate.currency_pair[0] == from_currency:
return self.convert_currency(market.rate.currency_pair[1], to_currency, amount * market.rate.spot)
raise Exception('Unable to convert {} from {} to {}'.format(amount, from_currency, to_currency))
return self.convert_currency(
market.rate.currency_pair[1], to_currency, amount * market.rate.spot)
raise Exception(
'Unable to convert {} from {} to {}'.format(amount, from_currency, to_currency))
def fee_dict(self):
return {market: market.rate.as_dict() for market in self.market_feeds}
@ -168,11 +173,11 @@ class ExchangeRateManager(object):
else:
fee_in = fee
return FeeValidator({fee_in.currency_symbol:
{
'amount': self.convert_currency(fee_in.currency_symbol, "LBC", fee_in.amount),
'address': fee_in.address
}
return FeeValidator({
fee_in.currency_symbol: {
'amount': self.convert_currency(fee_in.currency_symbol, "LBC", fee_in.amount),
'address': fee_in.address
}
})
@ -204,7 +209,8 @@ class DummyExchangeRateManager(object):
def __init__(self, rates):
self.market_feeds = [DummyBTCLBCFeed(), DummyUSDBTCFeed()]
for feed in self.market_feeds:
feed.rate = ExchangeRate(feed.market, rates[feed.market]['spot'], rates[feed.market]['ts'])
feed.rate = ExchangeRate(
feed.market, rates[feed.market]['spot'], rates[feed.market]['ts'])
def convert_currency(self, from_currency, to_currency, amount):
log.debug("Converting %f %s to %s" % (amount, from_currency, to_currency))
@ -213,7 +219,8 @@ class DummyExchangeRateManager(object):
return amount * market.rate.spot
for market in self.market_feeds:
if market.rate.currency_pair[0] == from_currency:
return self.convert_currency(market.rate.currency_pair[1], to_currency, amount * market.rate.spot)
return self.convert_currency(
market.rate.currency_pair[1], to_currency, amount * market.rate.spot)
def to_lbc(self, fee):
if fee is None:
@ -223,9 +230,9 @@ class DummyExchangeRateManager(object):
else:
fee_in = fee
return FeeValidator({fee_in.currency_symbol:
{
'amount': self.convert_currency(fee_in.currency_symbol, "LBC", fee_in.amount),
'address': fee_in.address
}
return FeeValidator({
fee_in.currency_symbol: {
'amount': self.convert_currency(fee_in.currency_symbol, "LBC", fee_in.amount),
'address': fee_in.address
}
})

View file

@ -68,7 +68,9 @@ class EncryptedFileStreamer(object):
return
if stream_status != STATUS_FINISHED:
self._deferred.addCallback(lambda _: task.deferLater(reactor, self.new_data_check_interval, self._check_for_new_data))
self._deferred.addCallback(
lambda _: task.deferLater(
reactor, self.new_data_check_interval, self._check_for_new_data))
else:
self.stopProducing()
@ -82,9 +84,12 @@ class EncryptedFileStreamer(object):
if data:
self._request.write(data)
if self._running: # .write() can trigger a pause
self._deferred.addCallback(lambda _: task.deferLater(reactor, self.stream_interval, self._check_for_new_data))
self._deferred.addCallback(
lambda _: task.deferLater(
reactor, self.stream_interval, self._check_for_new_data))
else:
self._deferred.addCallback(lambda _: self._file_manager.get_lbry_file_status(self._stream))
self._deferred.addCallback(
lambda _: self._file_manager.get_lbry_file_status(self._stream))
self._deferred.addCallback(_recurse_or_stop)
def pauseProducing(self):

View file

@ -122,7 +122,8 @@ class Publisher(object):
def set_status(self):
log.debug('Setting status')
d = self.lbry_file_manager.change_lbry_file_status(self.lbry_file, ManagedEncryptedFileDownloader.STATUS_FINISHED)
d = self.lbry_file_manager.change_lbry_file_status(
self.lbry_file, ManagedEncryptedFileDownloader.STATUS_FINISHED)
d.addCallback(lambda _: self.lbry_file.restore())
return d

View file

@ -73,10 +73,15 @@ class HostedEncryptedFile(resource.Resource):
d.addErrback(self._responseFailed, d)
return d
def is_valid_request_name(self, request):
return (
request.args['name'][0] != 'lbry' and
request.args['name'][0] not in self._api.waiting_on.keys())
def render_GET(self, request):
request.setHeader("Content-Security-Policy", "sandbox")
if 'name' in request.args.keys():
if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys():
if self.is_valid_request_name(request):
d = self._api._download_name(request.args['name'][0])
d.addCallback(lambda stream: self._make_stream_producer(request, stream))
elif request.args['name'][0] in self._api.waiting_on.keys():

View file

@ -167,31 +167,30 @@ class UIManager(object):
else:
c = None
if c:
log_msg = "Local version %s of %s does not meet UI requirement for version %s"
if self.requirements[r]['operator'] == '==':
if not self.requirements[r]['version'] == c:
passed_requirements = False
log.info("Local version %s of %s does not meet UI requirement for version %s" % (
c, r, self.requirements[r]['version']))
log.info(log_msg, c, r, self.requirements[r]['version'])
else:
log.info("Local version of %s meets ui requirement" % r)
if self.requirements[r]['operator'] == '>=':
if not self.requirements[r]['version'] <= c:
passed_requirements = False
log.info("Local version %s of %s does not meet UI requirement for version %s" % (
c, r, self.requirements[r]['version']))
log.info(log_msg, c, r, self.requirements[r]['version'])
else:
log.info("Local version of %s meets ui requirement" % r)
if self.requirements[r]['operator'] == '<=':
if not self.requirements[r]['version'] >= c:
passed_requirements = False
log.info("Local version %s of %s does not meet UI requirement for version %s" % (
c, r, self.requirements[r]['version']))
log.info(log_msg, c, r, self.requirements[r]['version'])
else:
log.info("Local version of %s meets ui requirement" % r)
return defer.succeed(passed_requirements)
def _disp_failure():
log.info("Failed to satisfy requirements for branch '%s', update was not loaded" % self.branch)
log.info("Failed to satisfy requirements for branch '%s', update was not loaded",
self.branch)
return defer.succeed(False)
def _do_migrate():
@ -202,7 +201,11 @@ class UIManager(object):
log.info("Loaded UI update")
f = open(self.config, "w")
loaded_ui = {'commit': self.git_version, 'branch': self.branch, 'requirements': self.requirements}
loaded_ui = {
'commit': self.git_version,
'branch': self.branch,
'requirements': self.requirements
}
f.write(json.dumps(loaded_ui))
f.close()

View file

@ -98,8 +98,12 @@ class AuthAPIClient(object):
return response['result']
@classmethod
def config(cls, key_name=None, key=None, pw_path=None, timeout=HTTP_TIMEOUT, connection=None, count=0,
service=None, cookies=None, auth=None, url=None, login_url=None):
def config(cls, key_name=None, key=None, pw_path=None,
timeout=HTTP_TIMEOUT,
connection=None, count=0,
service=None,
cookies=None, auth=None,
url=None, login_url=None):
api_key_name = API_KEY_NAME if not key_name else key_name
pw_path = os.path.join(settings.data_dir, ".api_keys") if not pw_path else pw_path
@ -153,7 +157,8 @@ class AuthAPIClient(object):
assert cookies.get(LBRY_SECRET, False), "Missing cookie"
secret = cookies.get(LBRY_SECRET)
api_key = APIKey(secret, api_key_name)
return cls(api_key, timeout, conn, id_count, service, cookies, auth_header, url, service_url)
return cls(api_key, timeout, conn, id_count, service, cookies,
auth_header, url, service_url)
class LBRYAPIClient(object):

View file

@ -61,30 +61,34 @@ class AuthorizedBase(object):
class AuthJSONRPCServer(AuthorizedBase):
"""
Authorized JSONRPC server used as the base class for the LBRY API
"""Authorized JSONRPC server used as the base class for the LBRY API
API methods are named with a leading "jsonrpc_"
Decorators:
@AuthJSONRPCServer.auth_required: this requires the client include a valid hmac authentication token in their
request
@AuthJSONRPCServer.subhandler: include the tagged method in the processing of requests, to allow inheriting
classes to modify request handling. Tagged methods will be passed the request
object, and return True when finished to indicate success
@AuthJSONRPCServer.auth_required: this requires the client
include a valid hmac authentication token in their request
@AuthJSONRPCServer.subhandler: include the tagged method in
the processing of requests, to allow inheriting classes to
modify request handling. Tagged methods will be passed the
request object, and return True when finished to indicate
success
Attributes:
allowed_during_startup (list): list of api methods that are callable before the server has finished
startup
allowed_during_startup (list): list of api methods that are
callable before the server has finished startup
sessions (dict): dictionary of active session_id: lbrynet.lbrynet_daemon.auth.util.APIKey values
sessions (dict): dictionary of active session_id:
lbrynet.lbrynet_daemon.auth.util.APIKey values
authorized_functions (list): list of api methods that require authentication
subhandlers (list): list of subhandlers
callable_methods (dict): dictionary of api_callable_name: method values
"""
implements(resource.IResource)
@ -167,7 +171,8 @@ class AuthJSONRPCServer(AuthorizedBase):
self._verify_token(session_id, parsed, token)
except InvalidAuthenticationToken as err:
log.warning("API validation failed")
self._render_error(err, request, version=version, response_code=AuthJSONRPCServer.UNAUTHORIZED)
self._render_error(
err, request, version=version, response_code=AuthJSONRPCServer.UNAUTHORIZED)
return server.NOT_DONE_YET
self._update_session_secret(session_id)
reply_with_next_secret = True
@ -305,7 +310,8 @@ class AuthJSONRPCServer(AuthorizedBase):
result_for_return = (result_for_return,)
# Convert the result (python) to JSON-RPC
try:
encoded_message = jsonrpclib.dumps(result_for_return, version=version, default=default_decimal)
encoded_message = jsonrpclib.dumps(
result_for_return, version=version, default=default_decimal)
self._set_headers(request, encoded_message, auth_required)
self._render_message(request, encoded_message)
except Exception as err:

View file

@ -12,14 +12,19 @@ class migrator(object):
def start(self):
def _resolve_claims(claimtrie):
claims = [i for i in claimtrie if 'txid' in i.keys()]
r = defer.DeferredList([self._api._resolve_name(claim['name'], force_refresh=True) for claim in claims], consumeErrors=True)
r = defer.DeferredList(
[self._api._resolve_name(claim['name'], force_refresh=True) for claim in claims],
consumeErrors=True)
return r
def _restart_lbry_files():
def _restart_lbry_file(lbry_file):
return lbry_file.restore()
r = defer.DeferredList([_restart_lbry_file(lbry_file) for lbry_file in self._api.lbry_file_manager.lbry_files if not lbry_file.txid], consumeErrors=True)
lbry_files = self._api.lbry_file_manager.lbry_files
r = defer.DeferredList(
[_restart_lbry_file(lbry_file) for lbry_file in lbry_files if not lbry_file.txid],
consumeErrors=True)
return r
d = self._api.session.wallet.get_nametrie()

View file

@ -50,13 +50,15 @@ class StructuredDict(dict):
def migrate(self, target_version=None):
if target_version:
assert self._versions.index(target_version) > self.versions.index(self.version), "Current version is above target version"
assert self._versions.index(target_version) > self.versions.index(self.version), \
"Current version is above target version"
for version, schema, migration in self._upgrade_version_range(self.version, target_version):
migration(self)
try:
self.validate(version)
except ValidationError as e:
raise ValidationError, "Could not migrate to version %s due to validation error: %s" % (version, e.message)
raise ValidationError(
"Could not migrate to version %s due to validation error: %s" %
(version, e.message))
self.version = version

View file

@ -46,7 +46,9 @@ def get_body_from_request(path, data):
jsondata = FileBodyProducer(StringIO(json.dumps(data)))
agent = Agent(reactor)
d = agent.request('POST', settings.pointtrader_server + path, Headers({'Content-Type': ['application/json']}), jsondata)
d = agent.request(
'POST', settings.pointtrader_server + path,
Headers({'Content-Type': ['application/json']}), jsondata)
d.addCallback(get_body)
return d

View file

@ -104,7 +104,8 @@ class BlobReflectorClient(Protocol):
def start_transfer(self):
self.sent_blobs = True
self.write(json.dumps({}))
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
assert self.read_handle is not None, \
"self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self)
return d

View file

@ -168,7 +168,8 @@ class EncryptedFileReflectorClient(Protocol):
def start_transfer(self):
self.write(json.dumps({}))
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
assert self.read_handle is not None, \
"self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self)
return d

View file

@ -46,6 +46,7 @@ def _catch_error(err, uri):
def check_and_restore_availability(lbry_file, reflector_server):
d = _check_if_reflector_has_stream(lbry_file, reflector_server)
d.addCallbacks(lambda send_stream: _reflect_if_unavailable(send_stream, lbry_file, reflector_server),
lambda err: _catch_error(err, lbry_file.uri))
d.addCallbacks(
lambda send_stream: _reflect_if_unavailable(send_stream, lbry_file, reflector_server),
lambda err: _catch_error(err, lbry_file.uri))
return d

View file

@ -87,7 +87,7 @@ class ReflectorServer(Protocol):
return {'send_blob': False}
else:
self.incoming_blob = blob
self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer)
self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer) # pylint: disable=line-too-long
self.blob_finished_d.addCallback(lambda _: self.blob_manager.blob_completed(blob))
return {'send_blob': True}

View file

@ -42,7 +42,8 @@ def main():
parser = argparse.ArgumentParser(description="Launch a dht node which responds to rpc commands")
parser.add_argument("node_port",
help="The UDP port on which the node will listen for connections from other dht nodes",
help=("The UDP port on which the node will listen for connections "
"from other dht nodes"),
type=int)
parser.add_argument("rpc_port",
help="The TCP port on which the node will listen for rpc commands",

View file

@ -55,7 +55,8 @@ class LBRYDaemonApp(AppKit.NSApplication):
def openui_(self, sender):
webbrowser.open(settings.UI_ADDRESS)
# this code is from the example https://pythonhosted.org/pyobjc/examples/Cocoa/Twisted/WebServicesTool/index.html
# this code is from the example
# https://pythonhosted.org/pyobjc/examples/Cocoa/Twisted/WebServicesTool/index.html
def applicationShouldTerminate_(self, sender):
if reactor.running:
log.info('Stopping twisted event loop')

View file

@ -12,7 +12,8 @@ def LBRYNotify(message):
notification.setInformativeText_(message)
notification.setUserInfo_({})
notification.setSoundName_("NSUserNotificationDefaultSoundName")
notification.setDeliveryDate_(Foundation.NSDate.dateWithTimeInterval_sinceDate_(0, Foundation.NSDate.date()))
notification.setDeliveryDate_(
Foundation.NSDate.dateWithTimeInterval_sinceDate_(0, Foundation.NSDate.date()))
NSUserNotificationCenter.defaultUserNotificationCenter().scheduleNotification_(notification)
@ -25,5 +26,6 @@ def notify(title, subtitle, info_text, delay=0, sound=False, userInfo=None):
notification.setUserInfo_(userInfo)
if sound:
notification.setSoundName_("NSUserNotificationDefaultSoundName")
notification.setDeliveryDate_(Foundation.NSDate.dateWithTimeInterval_sinceDate_(delay, Foundation.NSDate.date()))
notification.setDeliveryDate_(
Foundation.NSDate.dateWithTimeInterval_sinceDate_(delay, Foundation.NSDate.date()))
NSUserNotificationCenter.defaultUserNotificationCenter().scheduleNotification_(notification)

View file

@ -60,8 +60,7 @@ pip install -r requirements.txt
pip install pyOpenSSL
pip install pylint
pylint -E --disable=inherit-non-class --disable=no-member --ignored-modules=distutils \
--enable=unused-import --enable=bad-whitespace lbrynet packaging/osx/lbry-osx-app/lbrygui/
./run_pylint.sh packaging/osx/lbry-osx-app/lbrygui/
python setup.py install

View file

@ -43,7 +43,4 @@ pip install mock pylint coveralls
coverage run --source=lbrynet `which trial` tests
coveralls
# Ignoring distutils because: https://github.com/PyCQA/pylint/issues/73
# TODO: as code quality improves, make pylint be more strict
pylint -E --disable=inherit-non-class --disable=no-member --ignored-modules=distutils \
--enable=unused-import --enable=bad-whitespace lbrynet
./run_pylint.sh

View file

@ -201,7 +201,8 @@ class SysTrayIcon(object):
# First load the icon.
ico_x = win32api.GetSystemMetrics(win32con.SM_CXSMICON)
ico_y = win32api.GetSystemMetrics(win32con.SM_CYSMICON)
hicon = win32gui.LoadImage(0, icon, win32con.IMAGE_ICON, ico_x, ico_y, win32con.LR_LOADFROMFILE)
hicon = win32gui.LoadImage(
0, icon, win32con.IMAGE_ICON, ico_x, ico_y, win32con.LR_LOADFROMFILE)
hdcBitmap = win32gui.CreateCompatibleDC(0)
hdcScreen = win32gui.GetDC(0)

View file

@ -2,5 +2,5 @@ C:\Python27\Scripts\pip.exe install mock
C:\Python27\Scripts\pip.exe install pylint
C:\Python27\python.exe C:\Python27\Scripts\trial.py C:\projects\lbry\tests\unit
if ($LastExitCode -ne 0) { $host.SetShouldExit($LastExitCode) }
C:\Python27\Scripts\pylint.exe -E --disable=inherit-non-class --disable=no-member --ignored-modules=distutils --enable=unused-import --enable=bad-whitespace lbrynet packaging/windows/lbry-win32-app/LBRYWin32App.py
C:\Python27\Scripts\pylint.exe -E --disable=inherit-non-class --disable=no-member --ignored-modules=distutils --enable=unused-import --enable=bad-whitespace --enable=line-too-long lbrynet packaging/windows/lbry-win32-app/LBRYWin32App.py
if ($LastExitCode -ne 0) { $host.SetShouldExit($LastExitCode) }

11
run_pylint.sh Executable file
View file

@ -0,0 +1,11 @@
#! /bin/bash
# Ignoring distutils because: https://github.com/PyCQA/pylint/issues/73
# TODO: as code quality improves, make pylint be more strict
pylint -E --disable=inherit-non-class --disable=no-member \
--ignored-modules=distutils \
--enable=unused-import \
--enable=bad-whitespace \
--enable=line-too-long \
lbrynet $@