Use named loggers, fix some pep8 formatting mistakes

This commit is contained in:
Jimmy Kiselak 2015-09-08 15:42:56 -04:00
parent 6f40ef6015
commit bb5cd49327
46 changed files with 407 additions and 270 deletions

View file

@ -0,0 +1,4 @@
import logging
logging.getLogger(__name__).addHandler(logging.NullHandler())

View file

@ -2,7 +2,7 @@ import logging
import os
import time
import sqlite3
from twisted.internet import threads, defer, reactor, task
from twisted.internet import threads, defer
from twisted.python.failure import Failure
from twisted.enterprise import adbapi
from lbrynet.core.HashBlob import BlobFile, TempBlob, BlobFileCreator, TempBlobCreator
@ -13,6 +13,9 @@ from lbrynet.core.Error import NoSuchBlobError
from lbrynet.core.sqlite_helpers import rerun_if_locked
log = logging.getLogger(__name__)
class BlobManager(DHTHashSupplier):
"""This class is subclassed by classes which keep track of which blobs are available
and which give access to new/existing blobs"""
@ -137,7 +140,7 @@ class DiskBlobManager(BlobManager):
return self._get_blobs_to_announce(next_announce_time)
def creator_finished(self, blob_creator):
logging.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
assert blob_creator.blob_hash is not None
assert blob_creator.blob_hash not in self.blobs
assert blob_creator.length is not None
@ -188,7 +191,7 @@ class DiskBlobManager(BlobManager):
return b_h
def set_not_deleting(err, b_h):
logging.warning("Failed to delete blob %s. Reason: %s", str(b_h), err.getErrorMessage())
log.warning("Failed to delete blob %s. Reason: %s", str(b_h), err.getErrorMessage())
self.blob_hashes_to_delete[b_h] = False
return err
@ -200,7 +203,7 @@ class DiskBlobManager(BlobManager):
d = defer.succeed(True)
def log_error(err):
logging.warning("Failed to delete completed blobs from the db: %s", err.getErrorMessage())
log.warning("Failed to delete completed blobs from the db: %s", err.getErrorMessage())
d.addErrback(log_error)
return d
@ -238,7 +241,7 @@ class DiskBlobManager(BlobManager):
@rerun_if_locked
def _add_completed_blob(self, blob_hash, length, timestamp, next_announce_time=None):
logging.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length))
log.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length))
if next_announce_time is None:
next_announce_time = timestamp
d = self.db_conn.runQuery("insert into blobs values (?, ?, ?, ?)",
@ -502,7 +505,7 @@ class TempBlobManager(BlobManager):
d = self._delete_blobs_marked_for_deletion()
def set_next_manage_call():
logging.info("Setting the next manage call in %s", str(self))
log.info("Setting the next manage call in %s", str(self))
self._next_manage_call = reactor.callLater(1, self._manage)
d.addCallback(lambda _: set_next_manage_call())
@ -511,11 +514,11 @@ class TempBlobManager(BlobManager):
def remove_from_list(b_h):
del self.blob_hashes_to_delete[b_h]
logging.info("Deleted blob %s", blob_hash)
log.info("Deleted blob %s", blob_hash)
return b_h
def set_not_deleting(err, b_h):
logging.warning("Failed to delete blob %s. Reason: %s", str(b_h), err.getErrorMessage())
log.warning("Failed to delete blob %s. Reason: %s", str(b_h), err.getErrorMessage())
self.blob_hashes_to_delete[b_h] = False
return b_h
@ -524,7 +527,7 @@ class TempBlobManager(BlobManager):
if being_deleted is False:
if blob_hash in self.blobs:
self.blob_hashes_to_delete[blob_hash] = True
logging.info("Found a blob marked for deletion: %s", blob_hash)
log.info("Found a blob marked for deletion: %s", blob_hash)
blob = self.blobs[blob_hash]
d = blob.delete()
@ -535,6 +538,6 @@ class TempBlobManager(BlobManager):
else:
remove_from_list(blob_hash)
d = defer.fail(Failure(NoSuchBlobError(blob_hash)))
logging.warning("Blob %s cannot be deleted because it is unknown")
log.warning("Blob %s cannot be deleted because it is unknown")
ds.append(d)
return defer.DeferredList(ds)

View file

@ -13,6 +13,9 @@ from lbrynet.core.Error import DownloadCanceledError, InvalidDataError
from lbrynet.core.cryptoutils import get_lbry_hash_obj
log = logging.getLogger(__name__)
class HashBlobReader(object):
implements(interfaces.IConsumer)
@ -84,7 +87,7 @@ class HashBlob(object):
if self.length is None and 0 <= length <= BLOB_SIZE:
self.length = length
return True
logging.warning("Got an invalid length. Previous length: %s, Invalid length: %s", str(self.length), str(length))
log.warning("Got an invalid length. Previous length: %s, Invalid length: %s", str(self.length), str(length))
return False
def get_length(self):
@ -126,8 +129,8 @@ class HashBlob(object):
finished_deferred.callback(self)
del self.writers[p]
return True
logging.warning("Somehow, the writer that was accepted as being valid was already removed. writer: %s",
str(writer))
log.warning("Somehow, the writer that was accepted as being valid was already removed. writer: %s",
str(writer))
return False
def errback_finished_deferred(err):
@ -199,14 +202,14 @@ class BlobFile(HashBlob):
def open_for_writing(self, peer):
if not peer in self.writers:
logging.debug("Opening %s to be written by %s", str(self), str(peer))
log.debug("Opening %s to be written by %s", str(self), str(peer))
write_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir)
finished_deferred = defer.Deferred()
writer = HashBlobWriter(write_file, self.get_length, self.writer_finished)
self.writers[peer] = (writer, finished_deferred)
return finished_deferred, writer.write, writer.cancel
logging.warning("Tried to download the same file twice simultaneously from the same peer")
log.warning("Tried to download the same file twice simultaneously from the same peer")
return None, None, None
def open_for_reading(self):
@ -232,7 +235,7 @@ class BlobFile(HashBlob):
d = threads.deferToThread(delete_from_file_system)
def log_error(err):
logging.warning("An error occurred deleting %s: %s", str(self.file_path), err.getErrorMessage())
log.warning("An error occurred deleting %s: %s", str(self.file_path), err.getErrorMessage())
return err
d.addErrback(log_error)
@ -247,7 +250,7 @@ class BlobFile(HashBlob):
def _close_writer(self, writer):
if writer.write_handle is not None:
logging.debug("Closing %s", str(self))
log.debug("Closing %s", str(self))
name = writer.write_handle.name
writer.write_handle.close()
threads.deferToThread(os.remove, name)

View file

@ -16,6 +16,9 @@ import time
import os
log = logging.getLogger(__name__)
class ReservedPoints(object):
def __init__(self, identifier, amount):
self.identifier = identifier
@ -53,7 +56,7 @@ class LBRYcrdWallet(object):
if self.start_lbrycrdd is True:
self._start_daemon()
self._get_info()
logging.info("Connected!")
log.info("Connected!")
def start_manage():
self.stopped = False
@ -67,7 +70,7 @@ class LBRYcrdWallet(object):
def stop(self):
def log_stop_error(err):
logging.error("An error occurred stopping the wallet. %s", err.getTraceback())
log.error("An error occurred stopping the wallet. %s", err.getTraceback())
self.stopped = True
# If self.next_manage_call is None, then manage is currently running or else
@ -84,7 +87,7 @@ class LBRYcrdWallet(object):
return d
def manage(self):
logging.info("Doing manage")
log.info("Doing manage")
self.next_manage_call = None
have_set_manage_running = [False]
@ -123,7 +126,7 @@ class LBRYcrdWallet(object):
d.addCallback(lambda _: set_next_manage_call())
def log_error(err):
logging.error("Something went wrong during manage. Error message: %s", err.getErrorMessage())
log.error("Something went wrong during manage. Error message: %s", err.getErrorMessage())
return err
d.addErrback(log_error)
@ -191,8 +194,8 @@ class LBRYcrdWallet(object):
self.queued_payments[self.peer_addresses[peer]] += rounded_amount
# make any unused points available
self.total_reserved_points -= (reserved_points.amount - rounded_amount)
logging.info("ordering that %s points be sent to %s", str(rounded_amount),
str(self.peer_addresses[peer]))
log.info("ordering that %s points be sent to %s", str(rounded_amount),
str(self.peer_addresses[peer]))
peer.update_stats('points_sent', amount)
return defer.succeed(True)
@ -201,7 +204,7 @@ class LBRYcrdWallet(object):
rounded_amount = Decimal(str(round(amount, 8)))
assert(peer in self.current_address_given_to_peer)
address = self.current_address_given_to_peer[peer]
logging.info("expecting a payment at address %s in the amount of %s", str(address), str(rounded_amount))
log.info("expecting a payment at address %s in the amount of %s", str(address), str(rounded_amount))
self.expected_balances[address] += rounded_amount
expected_balance = self.expected_balances[address]
expected_time = datetime.datetime.now() + self.max_expected_payment_time
@ -286,12 +289,12 @@ class LBRYcrdWallet(object):
break
except (socket.error, JSONRPCException):
tries += 1
logging.warning("Failed to connect to lbrycrdd.")
log.warning("Failed to connect to lbrycrdd.")
if tries < 5:
time.sleep(2 ** tries)
logging.warning("Trying again in %d seconds", 2 ** tries)
log.warning("Trying again in %d seconds", 2 ** tries)
else:
logging.warning("Giving up.")
log.warning("Giving up.")
else:
self.lbrycrdd.terminate()
raise ValueError("Couldn't open lbrycrdd")
@ -338,10 +341,10 @@ class LBRYcrdWallet(object):
peer.update_score(balance[5])
peer.update_stats('points_received', balance[5])
else:
logging.warning("Something went wrong checking a balance. Peer: %s, account: %s,"
"expected balance: %s, expected time: %s, count: %s, error: %s",
str(balance[0]), str(balance[1]), str(balance[2]), str(balance[3]),
str(balance[4]), str(result.getErrorMessage()))
log.warning("Something went wrong checking a balance. Peer: %s, account: %s,"
"expected balance: %s, expected time: %s, count: %s, error: %s",
str(balance[0]), str(balance[1]), str(balance[2]), str(balance[3]),
str(balance[4]), str(result.getErrorMessage()))
dl.addCallback(handle_checks)
return dl
@ -349,14 +352,14 @@ class LBRYcrdWallet(object):
@_catch_connection_error
def _check_expected_balance(self, expected_balance):
rpc_conn = self._get_rpc_conn()
logging.info("Checking balance of address %s", str(expected_balance[1]))
log.info("Checking balance of address %s", str(expected_balance[1]))
balance = rpc_conn.getreceivedbyaddress(expected_balance[1])
logging.debug("received balance: %s", str(balance))
logging.debug("expected balance: %s", str(expected_balance[2]))
log.debug("received balance: %s", str(balance))
log.debug("expected balance: %s", str(expected_balance[2]))
return balance >= expected_balance[2]
def _send_payments(self):
logging.info("Trying to send payments, if there are any to be sent")
log.info("Trying to send payments, if there are any to be sent")
def do_send(payments):
rpc_conn = self._get_rpc_conn()
@ -364,15 +367,15 @@ class LBRYcrdWallet(object):
payments_to_send = {}
for address, points in self.queued_payments.items():
logging.info("Should be sending %s points to %s", str(points), str(address))
log.info("Should be sending %s points to %s", str(points), str(address))
payments_to_send[address] = float(points)
self.total_reserved_points -= points
self.wallet_balance -= points
del self.queued_payments[address]
if payments_to_send:
logging.info("Creating a transaction with outputs %s", str(payments_to_send))
log.info("Creating a transaction with outputs %s", str(payments_to_send))
return threads.deferToThread(do_send, payments_to_send)
logging.info("There were no payments to send")
log.info("There were no payments to send")
return defer.succeed(True)
@_catch_connection_error
@ -439,8 +442,8 @@ class LBRYcrdAddressRequester(object):
def _request_failed(self, err, peer):
if not err.check(RequestCanceledError):
logging.warning("A peer failed to send a valid public key response. Error: %s, peer: %s",
err.getErrorMessage(), str(peer))
log.warning("A peer failed to send a valid public key response. Error: %s, peer: %s",
err.getErrorMessage(), str(peer))
#return err
@ -490,7 +493,7 @@ class LBRYcrdAddressQueryHandler(object):
d.addCallback(create_response)
return d
if self.address is None:
logging.warning("Expected a request for an address, but did not receive one")
log.warning("Expected a request for an address, but did not receive one")
return defer.fail(Failure(ValueError("Expected but did not receive an address request")))
else:
return defer.succeed({})

View file

@ -15,6 +15,9 @@ from twisted.python.failure import Failure
from lbrynet.core.LBRYcrdWallet import ReservedPoints
log = logging.getLogger(__name__)
class PTCWallet(object):
"""This class sends payments to peers and also ensures that expected payments are received.
This class is only intended to be used for testing."""
@ -160,8 +163,8 @@ class PTCWallet(object):
ds.append(d)
del self.queued_payments[peer]
else:
logging.warning("Don't have a payment address for peer %s. Can't send %s points.",
str(peer), str(points))
log.warning("Don't have a payment address for peer %s. Can't send %s points.",
str(peer), str(points))
return defer.DeferredList(ds)
def get_balance(self):
@ -207,8 +210,8 @@ class PTCWallet(object):
if self.peer_pub_keys[peer] in self.received_payments:
received_balance = sum([a[0] for a in self.received_payments[self.peer_pub_keys[peer]]])
if min_expected_balance > received_balance:
logging.warning("Account in bad standing: %s (pub_key: %s), expected amount = %s, received_amount = %s",
str(peer), self.peer_pub_keys[peer], str(min_expected_balance), str(received_balance))
log.warning("Account in bad standing: %s (pub_key: %s), expected amount = %s, received_amount = %s",
str(peer), self.peer_pub_keys[peer], str(min_expected_balance), str(received_balance))
def _open_db(self):
def open_db():
@ -261,8 +264,8 @@ class PointTraderKeyExchanger(object):
def _request_failed(self, err, peer):
if not err.check(RequestCanceledError):
logging.warning("A peer failed to send a valid public key response. Error: %s, peer: %s",
err.getErrorMessage(), str(peer))
log.warning("A peer failed to send a valid public key response. Error: %s, peer: %s",
err.getErrorMessage(), str(peer))
#return err
@ -306,15 +309,15 @@ class PointTraderKeyQueryHandler(object):
try:
RSA.importKey(new_encoded_pub_key)
except (ValueError, TypeError, IndexError):
logging.warning("Client sent an invalid public key.")
log.warning("Client sent an invalid public key.")
return defer.fail(Failure(ValueError("Client sent an invalid public key")))
self.public_key = new_encoded_pub_key
self.wallet.set_public_key_for_peer(self.peer, self.public_key)
logging.debug("Received the client's public key: %s", str(self.public_key))
log.debug("Received the client's public key: %s", str(self.public_key))
fields = {'public_key': self.wallet.encoded_public_key}
return defer.succeed(fields)
if self.public_key is None:
logging.warning("Expected a public key, but did not receive one")
log.warning("Expected a public key, but did not receive one")
return defer.fail(Failure(ValueError("Expected but did not receive a public key")))
else:
return defer.succeed({})

View file

@ -13,6 +13,9 @@ from lbrynet.core.PaymentRateManager import BasePaymentRateManager
from twisted.internet import threads, defer
log = logging.getLogger(__name__)
class LBRYSession(object):
"""This class manages all important services common to any application that uses the network:
the hash announcer, which informs other peers that this peer is associated with some hash. Usually,
@ -104,7 +107,7 @@ class LBRYSession(object):
def setup(self):
"""Create the blob directory and database if necessary, start all desired services"""
logging.debug("Setting up the lbry session")
log.debug("Setting up the lbry session")
if self.lbryid is None:
self.lbryid = generate_id()
@ -124,7 +127,7 @@ class LBRYSession(object):
d.addCallback(lambda _: self._setup_dht())
else:
if self.hash_announcer is None and self.peer_port is not None:
logging.warning("The server has no way to advertise its available blobs.")
log.warning("The server has no way to advertise its available blobs.")
self.hash_announcer = DummyHashAnnouncer()
d.addCallback(lambda _: self._setup_other_components())
@ -146,11 +149,11 @@ class LBRYSession(object):
def _try_upnp(self):
logging.debug("In _try_upnp")
log.debug("In _try_upnp")
def threaded_try_upnp():
if self.use_upnp is False:
logging.debug("Not using upnp")
log.debug("Not using upnp")
return False
u = miniupnpc.UPnP()
num_devices_found = u.discover()
@ -169,7 +172,7 @@ class LBRYSession(object):
return False
def upnp_failed(err):
logging.warning("UPnP failed. Reason: %s", err.getErrorMessage())
log.warning("UPnP failed. Reason: %s", err.getErrorMessage())
return False
d = threads.deferToThread(threaded_try_upnp)
@ -180,7 +183,7 @@ class LBRYSession(object):
from twisted.internet import reactor
logging.debug("Starting the dht")
log.debug("Starting the dht")
def match_port(h, p):
return h, p
@ -216,7 +219,7 @@ class LBRYSession(object):
return dl
def _setup_other_components(self):
logging.debug("Setting up the rest of the components")
log.debug("Setting up the rest of the components")
if self.rate_limiter is None:
self.rate_limiter = RateLimiter()

View file

@ -3,6 +3,9 @@ from twisted.internet import interfaces, defer
from zope.interface import implements
log = logging.getLogger(__name__)
class StreamCreator(object):
"""Classes which derive from this class create a 'stream', which can be any
collection of associated blobs and associated metadata. These classes
@ -42,7 +45,7 @@ class StreamCreator(object):
def stop(self):
"""Stop creating the stream. Create the terminating zero-length blob."""
logging.debug("stop has been called for StreamCreator")
log.debug("stop has been called for StreamCreator")
self.stopped = True
if self.current_blob is not None:
current_blob = self.current_blob

View file

@ -6,6 +6,9 @@ from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloade
from lbrynet.core.Error import UnknownStreamTypeError, InvalidStreamDescriptorError
log = logging.getLogger(__name__)
class StreamDescriptorReader(object):
"""Classes which derive from this class read a stream descriptor file return
a dictionary containing the fields in the file"""
@ -82,7 +85,7 @@ class PlainStreamDescriptorWriter(StreamDescriptorWriter):
def _write_stream_descriptor(self, raw_data):
def write_file():
logging.debug("Writing the sd file to disk")
log.debug("Writing the sd file to disk")
with open(self.sd_file_name, 'w') as sd_file:
sd_file.write(raw_data)
return self.sd_file_name
@ -97,10 +100,10 @@ class BlobStreamDescriptorWriter(StreamDescriptorWriter):
self.blob_manager = blob_manager
def _write_stream_descriptor(self, raw_data):
logging.debug("Creating the new blob for the stream descriptor")
log.debug("Creating the new blob for the stream descriptor")
blob_creator = self.blob_manager.get_blob_creator()
blob_creator.write(raw_data)
logging.debug("Wrote the data to the new blob")
log.debug("Wrote the data to the new blob")
return blob_creator.close()

View file

@ -10,6 +10,9 @@ from lbrynet.core.client.ClientRequest import ClientRequest, ClientBlobRequest
from lbrynet.interfaces import IRequestCreator
log = logging.getLogger(__name__)
class BlobRequester(object):
implements(IRequestCreator)
@ -85,7 +88,7 @@ class BlobRequester(object):
######### internal calls #########
def _download_succeeded(self, arg, peer, blob):
logging.info("Blob %s has been successfully downloaded from %s", str(blob), str(peer))
log.info("Blob %s has been successfully downloaded from %s", str(blob), str(peer))
self._update_local_score(peer, 5.0)
peer.update_stats('blobs_downloaded', 1)
peer.update_score(5.0)
@ -106,8 +109,8 @@ class BlobRequester(object):
def _handle_download_error(self, err, peer, blob_to_download):
if not err.check(DownloadCanceledError, PriceDisagreementError, RequestCanceledError):
logging.warning("An error occurred while downloading %s from %s. Error: %s",
blob_to_download.blob_hash, str(peer), err.getTraceback())
log.warning("An error occurred while downloading %s from %s. Error: %s",
blob_to_download.blob_hash, str(peer), err.getTraceback())
if err.check(PriceDisagreementError):
# Don't kill the whole connection just because a price couldn't be agreed upon.
# Other information might be desired by other request creators at a better rate.
@ -124,7 +127,7 @@ class BlobRequester(object):
else:
blob_hash = blobs_without_sources[0].blob_hash
r = blob_hash
logging.debug("Blob requester peer search response: %s", str(r))
log.debug("Blob requester peer search response: %s", str(r))
return defer.succeed(r)
def _find_peers_for_hash(self, h):
@ -202,7 +205,7 @@ class BlobRequester(object):
request = ClientBlobRequest(request_dict, response_identifier, counting_write_func, d,
cancel_func, blob_to_download)
logging.info("Requesting blob %s from %s", str(blob_to_download), str(peer))
log.info("Requesting blob %s from %s", str(blob_to_download), str(peer))
return request
def _price_settled(self, protocol):
@ -239,11 +242,11 @@ class BlobRequester(object):
def _handle_availability(self, response_dict, peer, request):
if not request.response_identifier in response_dict:
raise InvalidResponseError("response identifier not in response")
logging.debug("Received a response to the availability request")
log.debug("Received a response to the availability request")
blob_hashes = response_dict[request.response_identifier]
for blob_hash in blob_hashes:
if blob_hash in request.request_dict['requested_blobs']:
logging.debug("The server has indicated it has the following blob available: %s", blob_hash)
log.debug("The server has indicated it has the following blob available: %s", blob_hash)
self._available_blobs[peer].append(blob_hash)
if blob_hash in self._unavailable_blobs[peer]:
self._unavailable_blobs[peer].remove(blob_hash)
@ -298,8 +301,8 @@ class BlobRequester(object):
if reason.check(NoResponseError):
self._incompatible_peers.append(peer)
return
logging.warning("Blob requester: a request of type '%s' failed. Reason: %s, Error type: %s",
str(request_type), reason.getErrorMessage(), reason.type)
log.warning("Blob requester: a request of type '%s' failed. Reason: %s, Error type: %s",
str(request_type), reason.getErrorMessage(), reason.type)
self._update_local_score(peer, -10.0)
if isinstance(reason, InvalidResponseError):
peer.update_score(-10.0)

View file

@ -11,6 +11,9 @@ from lbrynet.interfaces import IRequestSender, IRateLimited
from zope.interface import implements
log = logging.getLogger(__name__)
class ClientProtocol(Protocol):
implements(IRequestSender, IRateLimited)
@ -39,7 +42,7 @@ class ClientProtocol(Protocol):
else:
self._response_buff += data
if len(self._response_buff) > MAX_RESPONSE_SIZE:
logging.warning("Response is too large. Size %s", len(self._response_buff))
log.warning("Response is too large. Size %s", len(self._response_buff))
self.transport.loseConnection()
response, extra_data = self._get_valid_response(self._response_buff)
if response is not None:
@ -55,7 +58,7 @@ class ClientProtocol(Protocol):
else:
err = reason
#if self._response_deferreds:
# logging.warning("Lost connection with active response deferreds. %s", str(self._response_deferreds))
# log.warning("Lost connection with active response deferreds. %s", str(self._response_deferreds))
for key, d in self._response_deferreds.items():
del self._response_deferreds[key]
d.errback(err)
@ -70,7 +73,7 @@ class ClientProtocol(Protocol):
return defer.fail(failure.Failure(ValueError("There is already a request for that response active")))
self._next_request.update(request.request_dict)
d = defer.Deferred()
logging.debug("Adding a request. Request: %s", str(request))
log.debug("Adding a request. Request: %s", str(request))
self._response_deferreds[request.response_identifier] = d
return d
@ -102,8 +105,8 @@ class ClientProtocol(Protocol):
######### Internal request handling #########
def _handle_request_error(self, err):
logging.error("An unexpected error occurred creating or sending a request to %s. Error message: %s",
str(self.peer), err.getTraceback())
log.error("An unexpected error occurred creating or sending a request to %s. Error message: %s",
str(self.peer), err.getTraceback())
self.transport.loseConnection()
def _ask_for_request(self):
@ -117,7 +120,7 @@ class ClientProtocol(Protocol):
self._send_request_message(request_msg)
else:
# The connection manager has indicated that this connection should be terminated
logging.info("Closing the connection to %s due to having no further requests to send", str(self.peer))
log.info("Closing the connection to %s due to having no further requests to send", str(self.peer))
self.transport.loseConnection()
d = self._connection_manager.get_next_request(self.peer, self)
@ -153,14 +156,14 @@ class ClientProtocol(Protocol):
# If an error gets to this point, log it and kill the connection.
if not err.check(MisbehavingPeerError, ConnectionClosedBeforeResponseError, DownloadCanceledError,
RequestCanceledError):
logging.error("The connection to %s is closing due to an unexpected error: %s", str(self.peer),
err.getErrorMessage())
log.error("The connection to %s is closing due to an unexpected error: %s", str(self.peer),
err.getErrorMessage())
if not err.check(RequestCanceledError):
self.transport.loseConnection()
def _handle_response(self, response):
ds = []
logging.debug("Handling a response. Current expected responses: %s", str(self._response_deferreds))
log.debug("Handling a response. Current expected responses: %s", str(self._response_deferreds))
for key, val in response.items():
if key in self._response_deferreds:
d = self._response_deferreds[key]
@ -183,7 +186,7 @@ class ClientProtocol(Protocol):
dl.addCallback(lambda _: self._ask_for_request())
def _downloading_finished(self, arg):
logging.debug("The blob has finished downloading")
log.debug("The blob has finished downloading")
self._blob_download_request = None
self._downloading_blob = False
return arg
@ -194,8 +197,8 @@ class ClientProtocol(Protocol):
# TODO: always be this way. it's done this way now because the client has no other way
# TODO: of telling the server it wants the download to stop. It would be great if the
# TODO: protocol had such a mechanism.
logging.info("Closing the connection to %s because the download of blob %s was canceled",
str(self.peer), str(self._blob_download_request.blob))
log.info("Closing the connection to %s because the download of blob %s was canceled",
str(self.peer), str(self._blob_download_request.blob))
#self.transport.loseConnection()
#return True
return err

View file

@ -7,6 +7,9 @@ from lbrynet.core.client.ClientProtocol import ClientProtocolFactory
from lbrynet.core.Error import InsufficientFundsError
log = logging.getLogger(__name__)
class PeerConnectionHandler(object):
def __init__(self, request_creators, factory):
self.request_creators = request_creators
@ -42,8 +45,8 @@ class ConnectionManager(object):
for peer in self._peer_connections.keys():
def close_connection(p):
logging.info("Abruptly closing a connection to %s due to downloading being paused",
str(p))
log.info("Abruptly closing a connection to %s due to downloading being paused",
str(p))
if self._peer_connections[p].factory.p is not None:
d = self._peer_connections[p].factory.p.cancel_requests()
@ -66,10 +69,10 @@ class ConnectionManager(object):
def get_next_request(self, peer, protocol):
logging.debug("Trying to get the next request for peer %s", str(peer))
log.debug("Trying to get the next request for peer %s", str(peer))
if not peer in self._peer_connections:
logging.debug("The peer has already been told to shut down.")
log.debug("The peer has already been told to shut down.")
return defer.succeed(False)
def handle_error(err):
@ -140,10 +143,10 @@ class ConnectionManager(object):
from twisted.internet import reactor
if peer is not None:
logging.debug("Trying to connect to %s", str(peer))
log.debug("Trying to connect to %s", str(peer))
factory = ClientProtocolFactory(peer, self.rate_limiter, self)
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
factory)
factory)
connection = reactor.connectTCP(peer.host, peer.port, factory)
self._peer_connections[peer].connection = connection
@ -152,9 +155,9 @@ class ConnectionManager(object):
from twisted.internet import reactor
def get_new_peers(request_creators):
logging.debug("Trying to get a new peer to connect to")
log.debug("Trying to get a new peer to connect to")
if len(request_creators) > 0:
logging.debug("Got a creator to check: %s", str(request_creators[0]))
log.debug("Got a creator to check: %s", str(request_creators[0]))
d = request_creators[0].get_new_peers()
d.addCallback(lambda h: h if h is not None else get_new_peers(request_creators[1:]))
return d
@ -164,14 +167,14 @@ class ConnectionManager(object):
def pick_best_peer(peers):
# TODO: Eventually rank them based on past performance/reputation. For now
# TODO: just pick the first to which we don't have an open connection
logging.debug("Got a list of peers to choose from: %s", str(peers))
log.debug("Got a list of peers to choose from: %s", str(peers))
if peers is None:
return None
for peer in peers:
if not peer in self._peer_connections:
logging.debug("Got a good peer. Returning peer %s", str(peer))
log.debug("Got a good peer. Returning peer %s", str(peer))
return peer
logging.debug("Couldn't find a good peer to connect to")
log.debug("Couldn't find a good peer to connect to")
return None
if len(self._peer_connections) < MAX_CONNECTIONS_PER_STREAM:

View file

@ -5,6 +5,9 @@ from zope.interface import implements
from lbrynet import interfaces
log = logging.getLogger(__name__)
class DownloadManager(object):
implements(interfaces.IDownloadManager)
@ -24,7 +27,7 @@ class DownloadManager(object):
def start_downloading(self):
d = self.blob_info_finder.get_initial_blobs()
logging.debug("Requested the initial blobs from the info finder")
log.debug("Requested the initial blobs from the info finder")
d.addCallback(self.add_blobs_to_download)
d.addCallback(lambda _: self.resume_downloading())
return d
@ -33,7 +36,7 @@ class DownloadManager(object):
def check_start(result, manager):
if isinstance(result, failure.Failure):
logging.error("Failed to start the %s: %s", manager, result.getErrorMessage())
log.error("Failed to start the %s: %s", manager, result.getErrorMessage())
return False
return True
@ -49,7 +52,7 @@ class DownloadManager(object):
def check_stop(result, manager):
if isinstance(result, failure.Failure):
logging.error("Failed to stop the %s: %s", manager. result.getErrorMessage())
log.error("Failed to stop the %s: %s", manager. result.getErrorMessage())
return False
return True
@ -63,21 +66,21 @@ class DownloadManager(object):
def add_blobs_to_download(self, blob_infos):
logging.debug("Adding %s to blobs", str(blob_infos))
log.debug("Adding %s to blobs", str(blob_infos))
def add_blob_to_list(blob, blob_num):
self.blobs[blob_num] = blob
logging.info("Added blob (hash: %s, number %s) to the list", str(blob.blob_hash), str(blob_num))
log.info("Added blob (hash: %s, number %s) to the list", str(blob.blob_hash), str(blob_num))
def error_during_add(err):
logging.warning("An error occurred adding the blob to blobs. Error:%s", err.getErrorMessage())
log.warning("An error occurred adding the blob to blobs. Error:%s", err.getErrorMessage())
return err
ds = []
for blob_info in blob_infos:
if not blob_info.blob_num in self.blobs:
self.blob_infos[blob_info.blob_num] = blob_info
logging.debug("Trying to get the blob associated with blob hash %s", str(blob_info.blob_hash))
log.debug("Trying to get the blob associated with blob hash %s", str(blob_info.blob_hash))
d = self.blob_manager.get_blob(blob_info.blob_hash, self.upload_allowed, blob_info.length)
d.addCallback(add_blob_to_list, blob_info.blob_num)
d.addErrback(error_during_add)

View file

@ -8,6 +8,9 @@ from lbrynet.core.client.DownloadManager import DownloadManager
from twisted.internet import defer
log = logging.getLogger(__name__)
class SingleBlobMetadataHandler(object):
implements(interfaces.IMetadataHandler)
@ -18,7 +21,7 @@ class SingleBlobMetadataHandler(object):
######## IMetadataHandler #########
def get_initial_blobs(self):
logging.debug("Returning the blob info")
log.debug("Returning the blob info")
return defer.succeed([BlobInfo(self.blob_hash, 0, None)])
def final_blob_num(self):
@ -72,7 +75,7 @@ class SingleProgressManager(object):
from twisted.internet import reactor
logging.debug("The blob %s has been downloaded. Calling the finished callback", str(blob))
log.debug("The blob %s has been downloaded. Calling the finished callback", str(blob))
if self.finished is False:
self.finished = True
reactor.callLater(0, self.finished_callback, blob)

View file

@ -4,6 +4,9 @@ from twisted.internet import defer
from zope.interface import implements
log = logging.getLogger(__name__)
class StreamProgressManager(object):
implements(IProgressManager)
@ -61,18 +64,18 @@ class StreamProgressManager(object):
return defer.succeed(None)
def _finished_with_blob(self, blob_num):
logging.debug("In _finished_with_blob, blob_num = %s", str(blob_num))
log.debug("In _finished_with_blob, blob_num = %s", str(blob_num))
if self.delete_blob_after_finished is True:
logging.debug("delete_blob_after_finished is True")
log.debug("delete_blob_after_finished is True")
blobs = self.download_manager.blobs
if blob_num in blobs:
logging.debug("Telling the blob manager, %s, to delete blob %s", str(self.blob_manager),
blobs[blob_num].blob_hash)
log.debug("Telling the blob manager, %s, to delete blob %s", str(self.blob_manager),
blobs[blob_num].blob_hash)
self.blob_manager.delete_blobs([blobs[blob_num].blob_hash])
else:
logging.debug("Blob number %s was not in blobs", str(blob_num))
log.debug("Blob number %s was not in blobs", str(blob_num))
else:
logging.debug("delete_blob_after_finished is False")
log.debug("delete_blob_after_finished is False")
class FullStreamProgressManager(StreamProgressManager):
@ -126,14 +129,14 @@ class FullStreamProgressManager(StreamProgressManager):
current_blob_num = self.last_blob_outputted + 1
if current_blob_num in blobs and blobs[current_blob_num].is_validated():
logging.info("Outputting blob %s", str(self.last_blob_outputted + 1))
log.info("Outputting blob %s", str(self.last_blob_outputted + 1))
self.provided_blob_nums.append(self.last_blob_outputted + 1)
d = self.download_manager.handle_blob(self.last_blob_outputted + 1)
d.addCallback(lambda _: finished_outputting_blob())
d.addCallback(lambda _: self._finished_with_blob(current_blob_num))
def log_error(err):
logging.warning("Error occurred in the output loop. Error: %s", err.getErrorMessage())
log.warning("Error occurred in the output loop. Error: %s", err.getErrorMessage())
d.addErrback(log_error)
else:

View file

@ -4,6 +4,9 @@ from zope.interface import implements
from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler
log = logging.getLogger(__name__)
class BlobAvailabilityHandlerFactory(object):
implements(IQueryHandlerFactory)
@ -37,7 +40,7 @@ class BlobAvailabilityHandler(object):
def handle_queries(self, queries):
if self.query_identifiers[0] in queries:
logging.debug("Received the client's list of requested blobs")
log.debug("Received the client's list of requested blobs")
d = self._get_available_blobs(queries[self.query_identifiers[0]])
def set_field(available_blobs):

View file

@ -6,6 +6,9 @@ from zope.interface import implements
from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender
log = logging.getLogger(__name__)
class BlobRequestHandlerFactory(object):
implements(IQueryHandlerFactory)
@ -58,7 +61,7 @@ class BlobRequestHandler(object):
response['blob_data_payment_rate'] = 'RATE_ACCEPTED'
if self.query_identifiers[1] in queries:
logging.debug("Received the client's request to send a blob")
log.debug("Received the client's request to send a blob")
response_fields = {}
response['incoming_blob'] = response_fields
@ -75,11 +78,11 @@ class BlobRequestHandler(object):
if read_handle is not None:
self.currently_uploading = blob
self.read_handle = read_handle
logging.debug("Sending %s to client", str(blob))
log.debug("Sending %s to client", str(blob))
response_fields['blob_hash'] = blob.blob_hash
response_fields['length'] = blob.length
return response
logging.debug("We can not send %s", str(blob))
log.debug("We can not send %s", str(blob))
response_fields['error'] = "BLOB_UNAVAILABLE"
return response
@ -129,13 +132,13 @@ class BlobRequestHandler(object):
def start_transfer():
self.file_sender = FileSender()
logging.info("Starting the file upload")
log.info("Starting the file upload")
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, consumer, count_bytes)
return d
def set_expected_payment():
logging.info("Setting expected payment")
log.info("Setting expected payment")
if self.blob_bytes_uploaded != 0 and self.blob_data_payment_rate is not None:
self.wallet.add_expected_payment(self.peer,
self.currently_uploading.length * 1.0 *
@ -151,6 +154,6 @@ class BlobRequestHandler(object):
self.currently_uploading = None
self.file_sender = None
if reason is not None and isinstance(reason, Failure):
logging.warning("Upload has failed. Reason: %s", reason.getErrorMessage())
log.warning("Upload has failed. Reason: %s", reason.getErrorMessage())
return _send_file()

View file

@ -6,6 +6,9 @@ from zope.interface import implements
from lbrynet.core.server.ServerRequestHandler import ServerRequestHandler
log = logging.getLogger(__name__)
class ServerProtocol(Protocol):
"""ServerProtocol needs to:
@ -26,7 +29,7 @@ class ServerProtocol(Protocol):
#Protocol stuff
def connectionMade(self):
logging.debug("Got a connection")
log.debug("Got a connection")
peer_info = self.transport.getPeer()
self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port)
self.request_handler = ServerRequestHandler(self)
@ -34,7 +37,7 @@ class ServerProtocol(Protocol):
if enabled is True:
query_handler = query_handler_factory.build_query_handler()
query_handler.register_with_request_handler(self.request_handler, self.peer)
logging.debug("Setting the request handler")
log.debug("Setting the request handler")
self.factory.rate_limiter.register_protocol(self)
def connectionLost(self, reason=failure.Failure(error.ConnectionDone())):
@ -42,10 +45,10 @@ class ServerProtocol(Protocol):
self.request_handler.stopProducing()
self.factory.rate_limiter.unregister_protocol(self)
if not reason.check(error.ConnectionDone):
logging.warning("Closing a connection. Reason: %s", reason.getErrorMessage())
log.warning("Closing a connection. Reason: %s", reason.getErrorMessage())
def dataReceived(self, data):
logging.debug("Receiving %s bytes of data from the transport", str(len(data)))
log.debug("Receiving %s bytes of data from the transport", str(len(data)))
self.factory.rate_limiter.report_dl_bytes(len(data))
if self.request_handler is not None:
self.request_handler.data_received(data)
@ -53,7 +56,7 @@ class ServerProtocol(Protocol):
#IConsumer stuff
def registerProducer(self, producer, streaming):
logging.debug("Registering the producer")
log.debug("Registering the producer")
assert streaming is True
def unregisterProducer(self):
@ -61,7 +64,7 @@ class ServerProtocol(Protocol):
self.transport.loseConnection()
def write(self, data):
logging.debug("Writing %s bytes of data to the transport", str(len(data)))
log.debug("Writing %s bytes of data to the transport", str(len(data)))
self.transport.write(data)
self.factory.rate_limiter.report_ul_bytes(len(data))

View file

@ -5,6 +5,9 @@ from zope.interface import implements
from lbrynet.interfaces import IRequestHandler
log = logging.getLogger(__name__)
class ServerRequestHandler(object):
"""This class handles requests from clients. It can upload blobs and return request for information about
more blobs that are associated with streams"""
@ -52,7 +55,7 @@ class ServerRequestHandler(object):
chunk = self.response_buff[:self.CHUNK_SIZE]
self.response_buff = self.response_buff[self.CHUNK_SIZE:]
if chunk != '':
logging.debug("writing %s bytes to the client", str(len(chunk)))
log.debug("writing %s bytes to the client", str(len(chunk)))
self.consumer.write(chunk)
reactor.callLater(0, self._produce_more)
@ -76,7 +79,7 @@ class ServerRequestHandler(object):
def get_more_data():
if self.producer is not None:
logging.debug("Requesting more data from the producer")
log.debug("Requesting more data from the producer")
self.producer.resumeProducing()
reactor.callLater(0, get_more_data)
@ -84,8 +87,8 @@ class ServerRequestHandler(object):
#From Protocol
def data_received(self, data):
logging.debug("Received data")
logging.debug("%s", str(data))
log.debug("Received data")
log.debug("%s", str(data))
if self.request_received is False:
self.request_buff = self.request_buff + data
msg = self.try_to_parse_request(self.request_buff)
@ -96,10 +99,10 @@ class ServerRequestHandler(object):
d.addCallback(lambda _: self.blob_sender.send_blob_if_requested(self))
d.addCallbacks(lambda _: self.finished_response(), self.request_failure_handler)
else:
logging.info("Request buff not a valid json message")
logging.info("Request buff: %s", str(self.request_buff))
log.info("Request buff not a valid json message")
log.info("Request buff: %s", str(self.request_buff))
else:
logging.warning("The client sent data when we were uploading a file. This should not happen")
log.warning("The client sent data when we were uploading a file. This should not happen")
######### IRequestHandler #########
@ -112,7 +115,7 @@ class ServerRequestHandler(object):
#response handling
def request_failure_handler(self, err):
logging.warning("An error occurred handling a request. Error: %s", err.getErrorMessage())
log.warning("An error occurred handling a request. Error: %s", err.getErrorMessage())
self.stopProducing()
return err
@ -122,15 +125,15 @@ class ServerRequestHandler(object):
def send_response(self, msg):
m = json.dumps(msg)
logging.info("Sending a response of length %s", str(len(m)))
logging.debug("Response: %s", str(m))
log.info("Sending a response of length %s", str(len(m)))
log.debug("Response: %s", str(m))
self.response_buff = self.response_buff + m
self._produce_more()
return True
def handle_request(self, msg):
logging.debug("Handling a request")
logging.debug(str(msg))
log.debug("Handling a request")
log.debug(str(msg))
def create_response_message(results):
response = {}
@ -140,11 +143,11 @@ class ServerRequestHandler(object):
else:
# result is a Failure
return result
logging.debug("Finished making the response message. Response: %s", str(response))
log.debug("Finished making the response message. Response: %s", str(response))
return response
def log_errors(err):
logging.warning("An error occurred handling a client request. Error message: %s", err.getErrorMessage())
log.warning("An error occurred handling a client request. Error message: %s", err.getErrorMessage())
return err
def send_response(response):

View file

@ -3,12 +3,15 @@ from twisted.internet import task, reactor
import logging
log = logging.getLogger(__name__)
def rerun_if_locked(f):
def rerun(err, *args, **kwargs):
if err.check(sqlite3.OperationalError) and err.value.message == "database is locked":
logging.warning("database was locked. rerunning %s with args %s, kwargs %s",
str(f), str(args), str(kwargs))
log.warning("database was locked. rerunning %s with args %s, kwargs %s",
str(f), str(args), str(kwargs))
return task.deferLater(reactor, 0, wrapper, *args, **kwargs)
return err

View file

@ -5,6 +5,9 @@ from lbrynet.conf import BLOB_SIZE
from lbrynet.core.BlobInfo import BlobInfo
log = logging.getLogger(__name__)
class CryptBlobInfo(BlobInfo):
def __init__(self, blob_hash, blob_num, length, iv):
BlobInfo.__init__(self, blob_hash, blob_num, length)
@ -78,12 +81,12 @@ class CryptStreamBlobMaker(object):
return done, num_bytes_to_write
def close(self):
logging.debug("closing blob %s with plaintext len %s", str(self.blob_num), str(self.length))
log.debug("closing blob %s with plaintext len %s", str(self.blob_num), str(self.length))
if self.length != 0:
self._close_buffer()
d = self.blob.close()
d.addCallback(self._return_info)
logging.debug("called the finished_callback from CryptStreamBlobMaker.close")
log.debug("called the finished_callback from CryptStreamBlobMaker.close")
return d
def _write_buffer(self):

View file

@ -12,6 +12,9 @@ from lbrynet.core.StreamCreator import StreamCreator
from lbrynet.cryptstream.CryptBlob import CryptStreamBlobMaker
log = logging.getLogger(__name__)
class CryptStreamCreator(StreamCreator):
"""Create a new stream with blobs encrypted by a symmetric cipher.
@ -57,18 +60,18 @@ class CryptStreamCreator(StreamCreator):
return defer.succeed(True)
def _finalize(self):
logging.debug("_finalize has been called")
log.debug("_finalize has been called")
self.blob_count += 1
iv = self.iv_generator.next()
final_blob_creator = self.blob_manager.get_blob_creator()
logging.debug("Created the finished_deferred")
log.debug("Created the finished_deferred")
final_blob = self._get_blob_maker(iv, final_blob_creator)
logging.debug("Created the final blob")
logging.debug("Calling close on final blob")
log.debug("Created the final blob")
log.debug("Calling close on final blob")
d = final_blob.close()
d.addCallback(self._blob_finished)
self.finished_deferreds.append(d)
logging.debug("called close on final blob, returning from make_final_blob")
log.debug("called close on final blob, returning from make_final_blob")
return d
def _write(self, data):

View file

@ -7,6 +7,9 @@ import logging
import json
log = logging.getLogger(__name__)
known_dbs = ['lbryfile_desc.db', 'lbryfiles.db', 'valuable_blobs.db', 'blobs.db',
'lbryfile_blob.db', 'lbryfile_info.db', 'settings.db', 'blind_settings.db',
'blind_peers.db', 'blind_info.db', 'lbryfile_info.db', 'lbryfile_manager.db',
@ -17,33 +20,33 @@ def do_move(from_dir, to_dir):
for known_db in known_dbs:
known_db_path = os.path.join(from_dir, known_db)
if os.path.exists(known_db_path):
logging.debug("Moving %s to %s",
os.path.abspath(known_db_path),
os.path.abspath(os.path.join(to_dir, known_db)))
log.debug("Moving %s to %s",
os.path.abspath(known_db_path),
os.path.abspath(os.path.join(to_dir, known_db)))
shutil.move(known_db_path, os.path.join(to_dir, known_db))
else:
logging.debug("Did not find %s", os.path.abspath(known_db_path))
log.debug("Did not find %s", os.path.abspath(known_db_path))
def do_migration(db_dir):
old_dir = os.path.join(db_dir, "_0_to_1_old")
new_dir = os.path.join(db_dir, "_0_to_1_new")
try:
logging.info("Moving dbs from the real directory to %s", os.path.abspath(old_dir))
log.info("Moving dbs from the real directory to %s", os.path.abspath(old_dir))
os.makedirs(old_dir)
do_move(db_dir, old_dir)
except:
logging.error("An error occurred moving the old db files.")
log.error("An error occurred moving the old db files.")
raise
try:
logging.info("Creating the new directory in %s", os.path.abspath(new_dir))
log.info("Creating the new directory in %s", os.path.abspath(new_dir))
os.makedirs(new_dir)
except:
logging.error("An error occurred creating the new directory.")
log.error("An error occurred creating the new directory.")
raise
try:
logging.info("Doing the migration")
log.info("Doing the migration")
migrate_blob_db(old_dir, new_dir)
migrate_lbryfile_db(old_dir, new_dir)
migrate_livestream_db(old_dir, new_dir)
@ -51,20 +54,20 @@ def do_migration(db_dir):
migrate_lbryfile_manager_db(old_dir, new_dir)
migrate_settings_db(old_dir, new_dir)
migrate_repeater_db(old_dir, new_dir)
logging.info("Migration succeeded")
log.info("Migration succeeded")
except:
logging.error("An error occurred during the migration. Restoring.")
log.error("An error occurred during the migration. Restoring.")
do_move(old_dir, db_dir)
raise
try:
logging.info("Moving dbs in the new directory to the real directory")
log.info("Moving dbs in the new directory to the real directory")
do_move(new_dir, db_dir)
db_revision = open(os.path.join(db_dir, 'db_revision'), mode='w+')
db_revision.write("1")
db_revision.close()
os.rmdir(new_dir)
except:
logging.error("An error occurred moving the new db files.")
log.error("An error occurred moving the new db files.")
raise
return old_dir

View file

@ -22,6 +22,9 @@ from hashwatcher import HashWatcher
import logging
log = logging.getLogger(__name__)
def rpcmethod(func):
""" Decorator to expose Node methods as remote procedure calls
@ -231,12 +234,12 @@ class Node(object):
known_nodes = {}
def log_error(err, n):
logging.error("error storing blob_hash %s at %s", binascii.hexlify(blob_hash), str(n))
logging.error(binascii.hexlify(err.getErrorMessage()))
logging.error(err.getTraceback())
log.error("error storing blob_hash %s at %s", binascii.hexlify(blob_hash), str(n))
log.error(binascii.hexlify(err.getErrorMessage()))
log.error(err.getTraceback())
def log_success(res):
logging.debug("Response to store request: %s", str(res))
log.debug("Response to store request: %s", str(res))
return res
def announce_to_peer(responseTuple):
@ -332,7 +335,7 @@ class Node(object):
# originalPublisherID = self.id
#
# def log_error(err):
# logging.error(err.getErrorMessage())
# log.error(err.getErrorMessage())
#
# # Prepare a callback for doing "STORE" RPC calls
# def executeStoreRPCs(nodes):
@ -756,7 +759,7 @@ class Node(object):
searchIteration()
def log_error(err):
logging.error(err.getErrorMessage())
log.error(err.getErrorMessage())
# Send parallel, asynchronous FIND_NODE RPCs to the shortlist of contacts
def searchIteration():

View file

@ -6,6 +6,9 @@ import sys
from lbrynet.core.utils import generate_id
log = logging.getLogger(__name__)
def print_usage():
print "Usage:\n%s UDP_PORT KNOWN_NODE_IP KNOWN_NODE_PORT HASH"
@ -13,15 +16,15 @@ def print_usage():
def join_network(udp_port, known_nodes):
lbryid = generate_id()
logging.info('Creating Node...')
log.info('Creating Node...')
node = Node(udpPort=udp_port, lbryid=lbryid)
logging.info('Joining network...')
log.info('Joining network...')
d = node.joinNetwork(known_nodes)
def log_network_size():
logging.info("Approximate number of nodes in DHT: %s", str(node.getApproximateTotalDHTNodes()))
logging.info("Approximate number of blobs in DHT: %s", str(node.getApproximateTotalHashes()))
log.info("Approximate number of nodes in DHT: %s", str(node.getApproximateTotalDHTNodes()))
log.info("Approximate number of blobs in DHT: %s", str(node.getApproximateTotalHashes()))
d.addCallback(lambda _: log_network_size())
@ -36,7 +39,7 @@ def get_hosts(node, h):
print "Hosts returned from the DHT: "
print hosts
logging.info("Looking up %s", h)
log.info("Looking up %s", h)
d = node.getPeersForBlob(h)
d.addCallback(print_hosts)
return d
@ -48,9 +51,9 @@ def announce_hash(node, h):
def log_results(results):
for success, result in results:
if success:
logging.info("Succeeded: %s", str(result))
log.info("Succeeded: %s", str(result))
else:
logging.info("Failed: %s", str(result.getErrorMessage()))
log.info("Failed: %s", str(result.getErrorMessage()))
d.addCallback(log_results)
return d
@ -80,11 +83,11 @@ def run_dht_script(dht_func):
d.addCallback(run_dht_func)
def log_err(err):
logging.error("An error occurred: %s", err.getTraceback())
log.error("An error occurred: %s", err.getTraceback())
return err
def shut_down():
logging.info("Shutting down")
log.info("Shutting down")
reactor.stop()
d.addErrback(log_err)

View file

@ -1,13 +1,16 @@
import logging
import sqlite3
import os
from twisted.internet import threads, defer
from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.enterprise import adbapi
from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError
from lbrynet.core.sqlite_helpers import rerun_if_locked
log = logging.getLogger(__name__)
class DBLBRYFileMetadataManager(object):
"""Store and provide access to LBRY file metadata using leveldb files"""
@ -45,7 +48,7 @@ class DBLBRYFileMetadataManager(object):
return self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True)
def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False):
logging.info("Getting blobs for a stream. Count is %s", str(count))
log.info("Getting blobs for a stream. Count is %s", str(count))
def get_positions_of_start_and_end():
if start_blob is not None:

View file

@ -6,11 +6,14 @@ from twisted.internet import defer
from lbrynet.core.Error import DuplicateStreamHashError, InvalidStreamDescriptorError
log = logging.getLogger(__name__)
LBRYFileStreamType = "lbryfile"
def save_sd_info(stream_info_manager, sd_info, ignore_duplicate=False):
logging.debug("Saving info for %s", str(sd_info['stream_name']))
log.debug("Saving info for %s", str(sd_info['stream_name']))
hex_stream_name = sd_info['stream_name']
key = sd_info['key']
stream_hash = sd_info['stream_hash']
@ -26,7 +29,7 @@ def save_sd_info(stream_info_manager, sd_info, ignore_duplicate=False):
blob_num = blob['blob_num']
iv = blob['iv']
crypt_blobs.append(CryptBlobInfo(blob_hash, blob_num, length, iv))
logging.debug("Trying to save stream info for %s", str(hex_stream_name))
log.debug("Trying to save stream info for %s", str(hex_stream_name))
d = stream_info_manager.save_stream(stream_hash, hex_stream_name, key,
suggested_file_name, crypt_blobs)
@ -80,7 +83,7 @@ class LBRYFileStreamDescriptorValidator(object):
self.raw_info = raw_info
def validate(self):
logging.debug("Trying to validate stream descriptor for %s", str(self.raw_info['stream_name']))
log.debug("Trying to validate stream descriptor for %s", str(self.raw_info['stream_name']))
try:
hex_stream_name = self.raw_info['stream_name']
key = self.raw_info['key']

View file

@ -4,6 +4,9 @@ from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
from lbrynet.interfaces import IMetadataHandler
log = logging.getLogger(__name__)
class LBRYFileMetadataHandler(object):
implements(IMetadataHandler)
@ -31,6 +34,6 @@ class LBRYFileMetadataHandler(object):
if blob_hash is not None:
infos.append(CryptBlobInfo(blob_hash, blob_num, length, iv))
else:
logging.debug("Setting _final_blob_num to %s", str(blob_num - 1))
log.debug("Setting _final_blob_num to %s", str(blob_num - 1))
self._final_blob_num = blob_num - 1
return infos

View file

@ -14,6 +14,9 @@ from twisted.protocols.basic import FileSender
from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloader
log = logging.getLogger(__name__)
class LBRYFileStreamCreator(CryptStreamCreator):
"""
A CryptStreamCreator which adds itself and its additional metadata to an LBRYFileManager
@ -30,7 +33,7 @@ class LBRYFileStreamCreator(CryptStreamCreator):
self.blob_infos = []
def _blob_finished(self, blob_info):
logging.debug("length: %s", str(blob_info.length))
log.debug("length: %s", str(blob_info.length))
self.blob_infos.append(blob_info)
def _save_lbry_file_info(self):
@ -128,11 +131,11 @@ def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=Non
"""
def stop_file(creator):
logging.debug("the file sender has triggered its deferred. stopping the stream writer")
log.debug("the file sender has triggered its deferred. stopping the stream writer")
return creator.stop()
def make_stream_desc_file(stream_hash):
logging.debug("creating the stream descriptor file")
log.debug("creating the stream descriptor file")
descriptor_writer = PlainStreamDescriptorWriter(file_name + conf.CRYPTSD_FILE_EXTENSION)
d = get_sd_info(lbry_file_manager.stream_info_manager, stream_hash, True)

View file

@ -3,7 +3,6 @@ Keep track of which LBRY Files are downloading and store their LBRY File specifi
"""
import logging
import json
from twisted.enterprise import adbapi
@ -12,12 +11,15 @@ from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloader
from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloaderFactory
from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType
from lbrynet.core.PaymentRateManager import PaymentRateManager
from twisted.internet import threads, defer, task, reactor
from twisted.internet import defer, task, reactor
from twisted.python.failure import Failure
from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError, CurrentlyStoppingError
from lbrynet.core.sqlite_helpers import rerun_if_locked
log = logging.getLogger(__name__)
class LBRYFileManager(object):
"""
Keeps track of currently opened LBRY Files, their options, and their LBRY File specific metadata.
@ -70,7 +72,7 @@ class LBRYFileManager(object):
return self._set_lbry_file_payment_rate(stream_hash, new_rate)
def change_lbry_file_status(self, stream_hash, status):
logging.debug("Changing status of %s to %s", stream_hash, status)
log.debug("Changing status of %s to %s", stream_hash, status)
return self._change_file_status(stream_hash, status)
def get_lbry_file_status_reports(self):
@ -100,7 +102,7 @@ class LBRYFileManager(object):
return d
def log_error(err):
logging.error("An error occurred while starting a lbry file: %s", err.getErrorMessage())
log.error("An error occurred while starting a lbry file: %s", err.getErrorMessage())
def start_lbry_files(stream_hashes_and_options):
for stream_hash, options in stream_hashes_and_options:

View file

@ -12,6 +12,9 @@ from twisted.protocols.basic import FileSender
from zope.interface import implements
log = logging.getLogger(__name__)
class LiveStreamCreator(CryptStreamCreator):
def __init__(self, blob_manager, stream_info_manager, name=None, key=None, iv_generator=None,
delete_after_num=None, secret_pass_phrase=None):
@ -30,8 +33,8 @@ class LiveStreamCreator(CryptStreamCreator):
return d
def _blob_finished(self, blob_info):
logging.debug("In blob_finished")
logging.debug("length: %s", str(blob_info.length))
log.debug("In blob_finished")
log.debug("length: %s", str(blob_info.length))
sig_hash = get_lbry_hash_obj()
sig_hash.update(self.stream_hash)
if blob_info.length != 0:
@ -48,11 +51,11 @@ class LiveStreamCreator(CryptStreamCreator):
d = self.stream_info_manager.add_blobs_to_stream(self.stream_hash, [blob_info])
def log_add_error(err):
logging.error("An error occurred adding a blob info to the stream info manager: %s", err.getErrorMessage())
log.error("An error occurred adding a blob info to the stream info manager: %s", err.getErrorMessage())
return err
d.addErrback(log_add_error)
logging.debug("returning from blob_finished")
log.debug("returning from blob_finished")
return d
def setup(self):

View file

@ -3,13 +3,16 @@ import logging
from twisted.enterprise import adbapi
import os
import sqlite3
from twisted.internet import threads, defer
from twisted.internet import defer
from twisted.python.failure import Failure
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError
from lbrynet.core.sqlite_helpers import rerun_if_locked
log = logging.getLogger(__name__)
class DBLiveStreamMetadataManager(DHTHashSupplier):
"""This class stores all stream info in a leveldb database stored in the same directory as the blobfiles"""
@ -58,7 +61,7 @@ class DBLiveStreamMetadataManager(DHTHashSupplier):
return self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True)
def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False):
logging.info("Getting blobs for a stream. Count is %s", str(count))
log.info("Getting blobs for a stream. Count is %s", str(count))
def get_positions_of_start_and_end():
if start_blob is not None:

View file

@ -8,11 +8,14 @@ from lbrynet.interfaces import IStreamDescriptorValidator
from zope.interface import implements
log = logging.getLogger(__name__)
LiveStreamType = "lbrylive"
def save_sd_info(stream_info_manager, sd_info, ignore_duplicate=False):
logging.debug("Saving info for %s", str(sd_info['stream_name']))
log.debug("Saving info for %s", str(sd_info['stream_name']))
hex_stream_name = sd_info['stream_name']
public_key = sd_info['public_key']
key = sd_info['key']
@ -30,7 +33,7 @@ def save_sd_info(stream_info_manager, sd_info, ignore_duplicate=False):
iv = blob['iv']
signature = blob['signature']
crypt_blobs.append(LiveBlobInfo(blob_hash, blob_num, length, iv, revision, signature))
logging.debug("Trying to save stream info for %s", str(hex_stream_name))
log.debug("Trying to save stream info for %s", str(hex_stream_name))
d = stream_info_manager.save_stream(stream_hash, public_key, hex_stream_name,
key, crypt_blobs)
@ -88,7 +91,7 @@ class LBRYLiveStreamDescriptorValidator(object):
self.raw_info = raw_info
def validate(self):
logging.debug("Trying to validate stream descriptor for %s", str(self.raw_info['stream_name']))
log.debug("Trying to validate stream descriptor for %s", str(self.raw_info['stream_name']))
hex_stream_name = self.raw_info['stream_name']
public_key = self.raw_info['public_key']
key = self.raw_info['key']

View file

@ -8,8 +8,6 @@ from lbrynet.lbrylive.StreamDescriptor import save_sd_info
from lbrynet.lbrylive.PaymentRateManager import LiveStreamPaymentRateManager
from twisted.internet import defer, threads # , process
from lbrynet.interfaces import IStreamDownloaderFactory
from lbrynet.lbrylive.PaymentRateManager import BaseLiveStreamPaymentRateManager
from lbrynet.conf import MIN_BLOB_INFO_PAYMENT_RATE
from lbrynet.lbrylive.StreamDescriptor import LiveStreamType

View file

@ -12,6 +12,9 @@ from lbrynet.core.Error import InsufficientFundsError, InvalidResponseError, Req
from lbrynet.core.Error import NoResponseError, ConnectionClosedBeforeResponseError
log = logging.getLogger(__name__)
class LiveStreamMetadataHandler(object):
implements(IRequestCreator, IMetadataHandler)
@ -79,7 +82,7 @@ class LiveStreamMetadataHandler(object):
r = None
if self._finished_discovery() is False:
r = self.stream_hash
logging.debug("Info finder peer search response for stream %s: %s", str(self.stream_hash), str(r))
log.debug("Info finder peer search response for stream %s: %s", str(self.stream_hash), str(r))
return defer.succeed(r)
def _find_peers_for_hash(self, h):
@ -101,7 +104,7 @@ class LiveStreamMetadataHandler(object):
if blob_hash is not None:
infos.append(LiveBlobInfo(blob_hash, blob_num, length, iv, revision, signature))
else:
logging.debug("Setting _final_blob_num to %s", str(blob_num - 1))
log.debug("Setting _final_blob_num to %s", str(blob_num - 1))
self._final_blob_num = blob_num - 1
return infos
@ -134,7 +137,7 @@ class LiveStreamMetadataHandler(object):
further_blobs_request['count'] = count
else:
further_blobs_request['count'] = MAX_BLOB_INFOS_TO_REQUEST
logging.debug("Requesting %s blob infos from %s", str(further_blobs_request['count']), str(peer))
log.debug("Requesting %s blob infos from %s", str(further_blobs_request['count']), str(peer))
r_dict = {'further_blobs': further_blobs_request}
response_identifier = 'further_blobs'
request = ClientPaidRequest(r_dict, response_identifier, further_blobs_request['count'])
@ -142,7 +145,7 @@ class LiveStreamMetadataHandler(object):
return None
def _get_discovery_params(self):
logging.debug("In _get_discovery_params")
log.debug("In _get_discovery_params")
stream_position = self.download_manager.stream_position()
blobs = self.download_manager.blobs
if blobs:
@ -153,7 +156,7 @@ class LiveStreamMetadataHandler(object):
if final_blob_num is not None:
last_blob_num = final_blob_num
if self.download_whole is False:
logging.debug("download_whole is False")
log.debug("download_whole is False")
if final_blob_num is not None:
for i in xrange(stream_position, final_blob_num + 1):
if not i in blobs:
@ -171,7 +174,7 @@ class LiveStreamMetadataHandler(object):
return self.stream_hash, blobs[last_blob_num].blob_hash, 'end', -1 * self.max_before_skip_ahead
else:
return self.stream_hash, None, 'end', -1 * self.max_before_skip_ahead
logging.debug("download_whole is True")
log.debug("download_whole is True")
beginning = None
end = None
for i in xrange(stream_position, last_blob_num + 1):
@ -187,17 +190,17 @@ class LiveStreamMetadataHandler(object):
break
if beginning is None:
if final_blob_num is not None:
logging.debug("Discovery is finished. stream_position: %s, last_blob_num + 1: %s", str(stream_position),
str(last_blob_num + 1))
log.debug("Discovery is finished. stream_position: %s, last_blob_num + 1: %s", str(stream_position),
str(last_blob_num + 1))
return None
else:
logging.debug("Discovery is not finished. final blob num is unknown.")
log.debug("Discovery is not finished. final blob num is unknown.")
if last_blob_num != -1:
return self.stream_hash, blobs[last_blob_num].blob_hash, None, None
else:
return self.stream_hash, 'beginning', None, None
else:
logging.info("Discovery is not finished. Not all blobs are known.")
log.info("Discovery is not finished. Not all blobs are known.")
return self.stream_hash, beginning, end, None
def _price_settled(self, protocol):
@ -244,7 +247,7 @@ class LiveStreamMetadataHandler(object):
if response == "RATE_ACCEPTED":
return True
else:
logging.info("Rate offer has been rejected by %s", str(peer))
log.info("Rate offer has been rejected by %s", str(peer))
del self._protocol_prices[protocol]
self._price_disagreements.append(peer)
return True
@ -263,8 +266,8 @@ class LiveStreamMetadataHandler(object):
if not 'blob_infos' in response:
return InvalidResponseError("Missing the required field 'blob_infos'")
raw_blob_infos = response['blob_infos']
logging.info("Handling %s further blobs from %s", str(len(raw_blob_infos)), str(peer))
logging.debug("blobs: %s", str(raw_blob_infos))
log.info("Handling %s further blobs from %s", str(len(raw_blob_infos)), str(peer))
log.debug("blobs: %s", str(raw_blob_infos))
for raw_blob_info in raw_blob_infos:
length = raw_blob_info['length']
if length != 0:
@ -276,10 +279,10 @@ class LiveStreamMetadataHandler(object):
iv = raw_blob_info['iv']
signature = raw_blob_info['signature']
blob_info = LiveBlobInfo(blob_hash, num, length, iv, revision, signature)
logging.debug("Learned about a potential blob: %s", str(blob_hash))
log.debug("Learned about a potential blob: %s", str(blob_hash))
if self._verify_blob(blob_info):
if blob_hash is None:
logging.info("Setting _final_blob_num to %s", str(num - 1))
log.info("Setting _final_blob_num to %s", str(num - 1))
self._final_blob_num = num - 1
else:
blob_infos.append(blob_info)
@ -289,7 +292,7 @@ class LiveStreamMetadataHandler(object):
def add_blobs_to_download_manager():
blob_nums = [b.blob_num for b in blob_infos]
logging.info("Adding the following blob nums to the download manager: %s", str(blob_nums))
log.info("Adding the following blob nums to the download manager: %s", str(blob_nums))
self.download_manager.add_blobs_to_download(blob_infos)
d.addCallback(lambda _: add_blobs_to_download_manager())
@ -308,12 +311,12 @@ class LiveStreamMetadataHandler(object):
return d
def _verify_blob(self, blob):
logging.debug("Got an unverified blob to check:")
logging.debug("blob_hash: %s", blob.blob_hash)
logging.debug("blob_num: %s", str(blob.blob_num))
logging.debug("revision: %s", str(blob.revision))
logging.debug("iv: %s", blob.iv)
logging.debug("length: %s", str(blob.length))
log.debug("Got an unverified blob to check:")
log.debug("blob_hash: %s", blob.blob_hash)
log.debug("blob_num: %s", str(blob.blob_num))
log.debug("revision: %s", str(blob.revision))
log.debug("iv: %s", blob.iv)
log.debug("length: %s", str(blob.length))
hashsum = get_lbry_hash_obj()
hashsum.update(self.stream_hash)
if blob.length != 0:
@ -322,12 +325,12 @@ class LiveStreamMetadataHandler(object):
hashsum.update(str(blob.revision))
hashsum.update(blob.iv)
hashsum.update(str(blob.length))
logging.debug("hexdigest to be verified: %s", hashsum.hexdigest())
log.debug("hexdigest to be verified: %s", hashsum.hexdigest())
if verify_signature(hashsum.digest(), blob.signature, self.stream_pub_key):
logging.debug("Blob info is valid")
log.debug("Blob info is valid")
return True
else:
logging.debug("The blob info is invalid")
log.debug("The blob info is invalid")
return False
def _request_failed(self, reason, peer):
@ -336,7 +339,7 @@ class LiveStreamMetadataHandler(object):
if reason.check(NoResponseError):
self._incompatible_peers.append(peer)
return
logging.warning("Crypt stream info finder: a request failed. Reason: %s", reason.getErrorMessage())
log.warning("Crypt stream info finder: a request failed. Reason: %s", reason.getErrorMessage())
self._update_local_score(peer, -5.0)
peer.update_score(-10.0)
if reason.check(ConnectionClosedBeforeResponseError):

View file

@ -3,6 +3,9 @@ from lbrynet.core.client.StreamProgressManager import StreamProgressManager
from twisted.internet import defer
log = logging.getLogger(__name__)
class LiveStreamProgressManager(StreamProgressManager):
def __init__(self, finished_callback, blob_manager, download_manager, delete_blob_after_finished=False,
download_whole=True, max_before_skip_ahead=5):
@ -52,9 +55,9 @@ class LiveStreamProgressManager(StreamProgressManager):
return
blobs = self.download_manager.blobs
logging.info("In _output_loop. last_blob_outputted: %s", str(self.last_blob_outputted))
log.info("In _output_loop. last_blob_outputted: %s", str(self.last_blob_outputted))
if blobs:
logging.debug("Newest blob number: %s", str(max(blobs.iterkeys())))
log.debug("Newest blob number: %s", str(max(blobs.iterkeys())))
if self.outputting_d is None:
self.outputting_d = defer.Deferred()
@ -71,15 +74,15 @@ class LiveStreamProgressManager(StreamProgressManager):
reactor.callLater(0, self._output_loop)
if current_blob_num in blobs and blobs[current_blob_num].is_validated():
logging.info("Outputting blob %s", str(current_blob_num))
log.info("Outputting blob %s", str(current_blob_num))
self.provided_blob_nums.append(current_blob_num)
d = self.download_manager.handle_blob(current_blob_num)
d.addCallback(lambda _: finished_outputting_blob())
d.addCallback(lambda _: self._finished_with_blob(current_blob_num))
elif blobs and max(blobs.iterkeys()) > self.last_blob_outputted + self.max_before_skip_ahead - 1:
self.last_blob_outputted += 1
logging.info("Skipping blob number %s due to knowing about blob number %s",
str(self.last_blob_outputted), str(max(blobs.iterkeys())))
log.info("Skipping blob number %s due to knowing about blob number %s",
str(self.last_blob_outputted), str(max(blobs.iterkeys())))
self._finished_with_blob(current_blob_num)
reactor.callLater(0, self._output_loop)
else:

View file

@ -4,6 +4,9 @@ from zope.interface import implements
from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler
log = logging.getLogger(__name__)
class CryptBlobInfoQueryHandlerFactory(object):
implements(IQueryHandlerFactory)
@ -54,7 +57,7 @@ class CryptBlobInfoQueryHandler(object):
if self.query_identifiers[1] in queries:
further_blobs_request = queries[self.query_identifiers[1]]
logging.debug("Received the client's request for additional blob information")
log.debug("Received the client's request for additional blob information")
if self.blob_info_payment_rate is None:
response['further_blobs'] = {'error': 'RATE_UNSET'}
@ -62,7 +65,7 @@ class CryptBlobInfoQueryHandler(object):
def count_and_charge(blob_infos):
if len(blob_infos) != 0:
logging.info("Responding with %s infos", str(len(blob_infos)))
log.info("Responding with %s infos", str(len(blob_infos)))
expected_payment = 1.0 * len(blob_infos) * self.blob_info_payment_rate / 1000.0
self.wallet.add_expected_payment(self.peer, expected_payment)
self.peer.update_stats('uploaded_crypt_blob_infos', len(blob_infos))

View file

@ -9,6 +9,9 @@ from lbrynet.core.StreamDescriptor import download_sd_blob
from twisted.internet import defer
log = logging.getLogger(__name__)
class InvalidChoiceError(Exception):
pass
@ -335,7 +338,7 @@ class AddStream(ControlHandler):
def _handle_load_failed(self, err):
self.loading_failed = True
logging.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback())
log.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback())
return defer.succeed("Encountered a problem while loading the stream descriptor: %s\n"
"See console.log for further details.\n"
"Press enter to continue" % err.getErrorMessage())

View file

@ -36,6 +36,9 @@ from lbrynet.lbrynet_console.ControlHandlers import PeerStatsAndSettingsChooserF
from lbrynet.core.LBRYcrdWallet import LBRYcrdWallet
log = logging.getLogger(__name__)
class LBRYConsole():
"""A class which can upload and download file streams to and from the network"""
def __init__(self, peer_port, dht_node_port, known_dht_nodes, control_class, wallet_type, lbrycrd_conf,
@ -132,10 +135,10 @@ class LBRYConsole():
db_revision = open(os.path.join(self.conf_dir, "db_revision"), mode='w')
db_revision.write(str(self.current_db_revision))
db_revision.close()
logging.debug("Created the configuration directory: %s", str(self.conf_dir))
log.debug("Created the configuration directory: %s", str(self.conf_dir))
if not os.path.exists(self.data_dir):
os.makedirs(self.data_dir)
logging.debug("Created the data directory: %s", str(self.data_dir))
log.debug("Created the data directory: %s", str(self.data_dir))
def _check_db_migration(self):
old_revision = 0
@ -494,8 +497,14 @@ def launch_lbry_console():
log_format = "(%(asctime)s)[%(filename)s:%(lineno)s] %(funcName)s(): %(message)s"
logging.basicConfig(level=logging.DEBUG, filename=os.path.join(conf_dir, "console.log"),
format=log_format)
formatter = logging.Formatter(log_format)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler(os.path.join(conf_dir, "console.log"))
file_handler.setFormatter(formatter)
file_handler.addFilter(logging.Filter("lbrynet"))
logger.addHandler(file_handler)
console = LBRYConsole(peer_port, dht_node_port, bootstrap_nodes, StdIOControl, wallet_type=args.wallet_type,
lbrycrd_conf=lbrycrd_conf, use_upnp=args.use_upnp,

View file

@ -6,6 +6,9 @@ import os
from twisted.internet import threads, defer
log = logging.getLogger(__name__)
class LBRYSettings(object):
def __init__(self, db_dir):
self.db_dir = db_dir
@ -19,7 +22,7 @@ class LBRYSettings(object):
return defer.succeed(True)
def _open_db(self):
logging.debug("Opening %s as the settings database", str(os.path.join(self.db_dir, "settings.db")))
log.debug("Opening %s as the settings database", str(os.path.join(self.db_dir, "settings.db")))
self.db = unqlite.UnQLite(os.path.join(self.db_dir, "settings.db"))
return defer.succeed(True)

View file

@ -12,6 +12,9 @@ from twisted.python.failure import Failure
from collections import defaultdict
log = logging.getLogger(__name__)
class BlindMetadataHandler(object):
implements(IMetadataHandler, IRequestCreator)
@ -195,7 +198,7 @@ class BlindMetadataHandler(object):
if not 'valuable_blob_hashes' in response:
return InvalidResponseError("Missing the required field 'valuable_blob_hashes'")
hashes = response['valuable_blob_hashes']
logging.info("Handling %s valuable blob hashes from %s", str(len(hashes)), str(peer))
log.info("Handling %s valuable blob hashes from %s", str(len(hashes)), str(peer))
expire_time = datetime.datetime.now() + datetime.timedelta(minutes=10)
reference = None
unique_hashes = set()
@ -230,8 +233,8 @@ class BlindMetadataHandler(object):
if not 'blob_lengths' in response:
return InvalidResponseError("Missing the required field 'blob_lengths'")
raw_blob_lengths = response['blob_lengths']
logging.info("Handling %s blob lengths from %s", str(len(raw_blob_lengths)), str(peer))
logging.debug("blobs: %s", str(raw_blob_lengths))
log.info("Handling %s blob lengths from %s", str(len(raw_blob_lengths)), str(peer))
log.debug("blobs: %s", str(raw_blob_lengths))
infos = []
unique_hashes = set()
for blob_hash, length in raw_blob_lengths:
@ -288,8 +291,8 @@ class BlindMetadataHandler(object):
if reason.check(NoResponseError):
self._incompatible_peers.append(peer)
return
logging.warning("Valuable blob info requester: a request of type %s has failed. Reason: %s",
str(request_type), str(reason.getErrorMessage()))
log.warning("Valuable blob info requester: a request of type %s has failed. Reason: %s",
str(request_type), str(reason.getErrorMessage()))
self._update_local_score(peer, -10.0)
peer.update_score(-5.0)
if reason.check(ConnectionClosedBeforeResponseError):

View file

@ -4,6 +4,9 @@ from twisted.internet import defer
import logging
log = logging.getLogger(__name__)
class ValuableQueryHandler(object):
implements(IQueryHandler)
@ -98,8 +101,8 @@ class ValuableBlobHashQueryHandler(ValuableQueryHandler):
for blob_hash, count in valuable_hashes:
hashes_and_scores.append((blob_hash, 1.0 * count / 10.0))
if len(hashes_and_scores) != 0:
logging.info("Responding to a valuable blob hashes request with %s blob hashes: %s",
str(len(hashes_and_scores)))
log.info("Responding to a valuable blob hashes request with %s blob hashes: %s",
str(len(hashes_and_scores)))
expected_payment = 1.0 * len(hashes_and_scores) * self.valuable_blob_hash_payment_rate / 1000.0
self.wallet.add_expected_payment(self.peer, expected_payment)
self.peer.update_stats('uploaded_valuable_blob_hashes', len(hashes_and_scores))
@ -190,7 +193,7 @@ class ValuableBlobLengthQueryHandler(ValuableQueryHandler):
if success is True:
lengths.append(response_pair)
if len(lengths) > 0:
logging.info("Responding with %s blob lengths: %s", str(len(lengths)))
log.info("Responding with %s blob lengths: %s", str(len(lengths)))
expected_payment = 1.0 * len(lengths) * self.blob_length_payment_rate / 1000.0
self.wallet.add_expected_payment(self.peer, expected_payment)
self.peer.update_stats('uploaded_valuable_blob_infos', len(lengths))

View file

@ -122,4 +122,4 @@ class BlindRepeaterPlugin(LBRYPlugin.LBRYPlugin):
def _add_to_lbry_console(self, lbry_console):
lbry_console.add_control_handlers(self.control_handlers)
lbry_console.add_query_handlers([self.valuable_blob_length_query_handler,
self.valuable_blob_hash_query_handler])
self.valuable_blob_hash_query_handler])

View file

@ -11,6 +11,9 @@ import os
from twisted.internet import defer, reactor, tksupport, task
log = logging.getLogger(__name__)
class DownloaderApp(object):
def __init__(self):
self.master = None
@ -27,7 +30,7 @@ class DownloaderApp(object):
d.addCallback(lambda _: self._enable_lookup())
def show_error_and_stop(err):
logging.error(err.getErrorMessage())
log.error(err.getErrorMessage())
tkMessageBox.showerror(title="Start Error", message=err.getErrorMessage())
return self.stop()
@ -37,7 +40,7 @@ class DownloaderApp(object):
def stop(self):
def log_error(err):
logging.error(err.getErrorMessage())
log.error(err.getErrorMessage())
if self.downloader is not None:
d = self.downloader.stop()

View file

@ -22,6 +22,9 @@ import requests
from twisted.internet import threads, defer, task
log = logging.getLogger(__name__)
class LBRYDownloader(object):
def __init__(self):
self.session = None
@ -109,13 +112,13 @@ class LBRYDownloader(object):
def get_configuration():
if not os.path.exists("lbry.conf"):
logging.debug("Could not read lbry.conf")
log.debug("Could not read lbry.conf")
return ""
else:
lbry_conf = open("lbry.conf")
logging.debug("Loading configuration options from lbry.conf")
log.debug("Loading configuration options from lbry.conf")
lines = lbry_conf.readlines()
logging.debug("lbry.conf file contents:\n%s", str(lines))
log.debug("lbry.conf file contents:\n%s", str(lines))
return lines
d = threads.deferToThread(get_configuration)
@ -143,7 +146,7 @@ class LBRYDownloader(object):
known_nodes.append((ip_address, int(port_string)))
except (ValueError, AssertionError):
raise ValueError("Expected known nodes in format 192.168.1.1:4000,192.168.1.2:4001. Got %s" % str(field_value))
logging.debug("Setting known_dht_nodes to %s", str(known_nodes))
log.debug("Setting known_dht_nodes to %s", str(known_nodes))
self.known_dht_nodes = known_nodes
elif field_name == "run_server":
if field_value.lower() == "true":
@ -152,25 +155,25 @@ class LBRYDownloader(object):
run_server = False
else:
raise ValueError("run_server must be set to True or False. Got %s" % field_value)
logging.debug("Setting run_server to %s", str(run_server))
log.debug("Setting run_server to %s", str(run_server))
self.run_server = run_server
elif field_name == "db_dir":
logging.debug("Setting conf_dir to %s", str(field_value))
log.debug("Setting conf_dir to %s", str(field_value))
self.conf_dir = field_value
elif field_name == "data_dir":
logging.debug("Setting data_dir to %s", str(field_value))
log.debug("Setting data_dir to %s", str(field_value))
self.data_dir = field_value
elif field_name == "wallet_dir":
logging.debug("Setting wallet_dir to %s", str(field_value))
log.debug("Setting wallet_dir to %s", str(field_value))
self.wallet_dir = field_value
elif field_name == "wallet_conf":
logging.debug("Setting wallet_conf to %s", str(field_value))
log.debug("Setting wallet_conf to %s", str(field_value))
self.wallet_conf = field_value
elif field_name == "peer_port":
try:
peer_port = int(field_value)
assert 0 <= peer_port <= 65535
logging.debug("Setting peer_port to %s", str(peer_port))
log.debug("Setting peer_port to %s", str(peer_port))
self.peer_port = peer_port
except (ValueError, AssertionError):
raise ValueError("peer_port must be set to an integer between 1 and 65535. Got %s" % field_value)
@ -178,7 +181,7 @@ class LBRYDownloader(object):
try:
dht_port = int(field_value)
assert 0 <= dht_port <= 65535
logging.debug("Setting dht_node_port to %s", str(dht_port))
log.debug("Setting dht_node_port to %s", str(dht_port))
self.dht_node_port = dht_port
except (ValueError, AssertionError):
raise ValueError("dht_port must be set to an integer between 1 and 65535. Got %s" % field_value)
@ -189,13 +192,13 @@ class LBRYDownloader(object):
use_upnp = False
else:
raise ValueError("use_upnp must be set to True or False. Got %s" % str(field_value))
logging.debug("Setting use_upnp to %s", str(use_upnp))
log.debug("Setting use_upnp to %s", str(use_upnp))
self.use_upnp = use_upnp
elif field_name == "default_blob_data_payment_rate":
try:
rate = float(field_value)
assert rate >= 0.0
logging.debug("Setting default_blob_data_payment_rate to %s", str(rate))
log.debug("Setting default_blob_data_payment_rate to %s", str(rate))
self.default_blob_data_payment_rate = rate
except (ValueError, AssertionError):
raise ValueError("default_blob_data_payment_rate must be a positive floating point number, e.g. 0.5. Got %s" % str(field_value))
@ -206,10 +209,10 @@ class LBRYDownloader(object):
start_lbrycrdd = False
else:
raise ValueError("start_lbrycrdd must be set to True or False. Got %s" % field_value)
logging.debug("Setting start_lbrycrdd to %s", str(start_lbrycrdd))
log.debug("Setting start_lbrycrdd to %s", str(start_lbrycrdd))
self.start_lbrycrdd = start_lbrycrdd
elif field_name == "download_directory":
logging.debug("Setting download_directory to %s", str(field_value))
log.debug("Setting download_directory to %s", str(field_value))
self.download_directory = field_value
elif field_name == "delete_blobs_on_stream_remove":
if field_value.lower() == "true":
@ -219,7 +222,7 @@ class LBRYDownloader(object):
else:
raise ValueError("delete_blobs_on_stream_remove must be set to True or False")
else:
logging.warning("Got unknown configuration field: %s", field_name)
log.warning("Got unknown configuration field: %s", field_name)
d.addCallback(load_configuration)
return d
@ -230,10 +233,10 @@ class LBRYDownloader(object):
db_revision = open(os.path.join(self.conf_dir, "db_revision"), mode='w')
db_revision.write(str(self.current_db_revision))
db_revision.close()
logging.debug("Created the configuration directory: %s", str(self.conf_dir))
log.debug("Created the configuration directory: %s", str(self.conf_dir))
if not os.path.exists(self.data_dir):
os.makedirs(self.data_dir)
logging.debug("Created the data directory: %s", str(self.data_dir))
log.debug("Created the data directory: %s", str(self.data_dir))
if not os.path.exists(self.wallet_dir):
os.makedirs(self.wallet_dir)
if not os.path.exists(self.wallet_conf):
@ -323,7 +326,7 @@ class LBRYDownloader(object):
return 0.0
def log_error(err):
logging.warning("unable to request free credits. %s", err.getErrorMessage())
log.warning("unable to request free credits. %s", err.getErrorMessage())
return 0.0
def request_credits(address):
@ -468,7 +471,7 @@ class LBRYDownloader(object):
def show_err(err):
tkMessageBox.showerror(title="Download Error", message=err.getErrorMessage())
logging.error(err.getErrorMessage())
log.error(err.getErrorMessage())
stream_frame.show_download_done(payment_rate_manager.points_paid)
resolve_d.addErrback(lambda err: err.trap(defer.CancelledError, UnknownNameError,

View file

@ -9,7 +9,15 @@ import locale
def start_downloader():
log_format = "(%(asctime)s)[%(filename)s:%(lineno)s] %(funcName)s(): %(message)s"
logging.basicConfig(level=logging.DEBUG, format=log_format, filename="downloader.log")
formatter = logging.Formatter(log_format)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler("downloader.log")
file_handler.setFormatter(formatter)
file_handler.addFilter(logging.Filter("lbrynet"))
logger.addHandler(file_handler)
sys.stdout = open("downloader.out.log", 'w')
sys.stderr = open("downloader.err.log", 'w')

View file

@ -42,7 +42,6 @@ from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_do
log_format = "%(funcName)s(): %(message)s"
logging.basicConfig(level=logging.WARNING, format=log_format)
logging.debug("test")
class FakeNode(object):