Merge pull request #116 from lbryio/reflector

Backend for pushing newly created files to a designated server
This commit is contained in:
Jack Robison 2016-08-21 03:43:03 -04:00 committed by GitHub
commit 02b19402ae
26 changed files with 1084 additions and 21 deletions

View file

@ -28,6 +28,8 @@ SEARCH_SERVERS = ["http://lighthouse1.lbry.io:50005",
"http://lighthouse2.lbry.io:50005",
"http://lighthouse3.lbry.io:50005"]
REFLECTOR_SERVERS = [("reflector.lbry.io", 5566)]
LOG_FILE_NAME = "lbrynet.log"
LOG_POST_URL = "https://lbry.io/log-upload"

View file

@ -73,7 +73,7 @@ class ClientProtocol(Protocol):
return defer.fail(failure.Failure(ValueError("There is already a request for that response active")))
self._next_request.update(request.request_dict)
d = defer.Deferred()
log.debug("Adding a request. Request: %s", str(request))
log.debug("Adding a request. Request: %s", str(request.request_dict))
self._response_deferreds[request.response_identifier] = d
return d

View file

@ -172,7 +172,7 @@ class ConnectionManager(object):
def pick_best_peer(peers):
# TODO: Eventually rank them based on past performance/reputation. For now
# TODO: just pick the first to which we don't have an open connection
log.debug("Got a list of peers to choose from: %s", str(peers))
log.debug("Got a list of peers to choose from: %s", str(["%s:%i" % (p.host, p.port) for p in peers]))
if peers is None:
return None
for peer in peers:

View file

@ -80,6 +80,7 @@ def disable_noisy_loggers():
logging.getLogger('lbrynet.core.client.ConnectionManager').setLevel(logging.INFO)
logging.getLogger('lbrynet.core.client.BlobRequester').setLevel(logging.INFO)
logging.getLogger('lbrynet.core.client.ClientProtocol').setLevel(logging.INFO)
logging.getLogger('lbrynet.analytics.api').setLevel(logging.INFO)
@_log_decorator

View file

@ -44,6 +44,7 @@ class BlobAvailabilityHandler(object):
d = self._get_available_blobs(queries[self.query_identifiers[0]])
def set_field(available_blobs):
log.debug("available blobs: %s", str(available_blobs))
return {'available_blobs': available_blobs}
d.addCallback(set_field)

View file

@ -0,0 +1,2 @@
from lbrynet.lbryfile.StreamDescriptor import get_sd_info
from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob

View file

@ -49,7 +49,7 @@ from lbrynet.lbrynet_console.LBRYSettings import LBRYSettings
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, DEFAULT_MAX_SEARCH_RESULTS, \
KNOWN_DHT_NODES, DEFAULT_MAX_KEY_FEE, DEFAULT_WALLET, \
DEFAULT_SEARCH_TIMEOUT, DEFAULT_CACHE_TIME, DEFAULT_UI_BRANCH, \
LOG_POST_URL, LOG_FILE_NAME
LOG_POST_URL, LOG_FILE_NAME, REFLECTOR_SERVERS
from lbrynet.conf import DEFAULT_SD_DOWNLOAD_TIMEOUT
from lbrynet.conf import DEFAULT_TIMEOUT
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob, BlobStreamDescriptorReader
@ -58,7 +58,7 @@ from lbrynet.core.PTCWallet import PTCWallet
from lbrynet.core.LBRYWallet import LBRYcrdWallet, LBRYumWallet
from lbrynet.lbryfilemanager.LBRYFileManager import LBRYFileManager
from lbrynet.lbryfile.LBRYFileMetadataManager import DBLBRYFileMetadataManager, TempLBRYFileMetadataManager
# from lbryum import LOG_PATH as lbryum_log
from lbrynet import reflector
# TODO: this code snippet is everywhere. Make it go away
@ -211,9 +211,11 @@ class LBRYDaemon(jsonrpc.JSONRPC):
'delete_blobs_on_remove': True,
'peer_port': 3333,
'dht_node_port': 4444,
'reflector_port': 5566,
'use_upnp': True,
'start_lbrycrdd': True,
'requested_first_run_credits': False,
'run_reflector_server': False,
'cache_time': DEFAULT_CACHE_TIME,
'startup_scripts': [],
'last_version': {'lbrynet': lbrynet_version, 'lbryum': lbryum_version}
@ -273,6 +275,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
self.search_timeout = self.session_settings['search_timeout']
self.download_timeout = self.session_settings['download_timeout']
self.max_search_results = self.session_settings['max_search_results']
self.run_reflector_server = self.session_settings['run_reflector_server']
####
#
# Ignore the saved wallet type. Some users will have their wallet type
@ -299,6 +302,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
####
self.delete_blobs_on_remove = self.session_settings['delete_blobs_on_remove']
self.peer_port = self.session_settings['peer_port']
self.reflector_port = self.session_settings['reflector_port']
self.dht_node_port = self.session_settings['dht_node_port']
self.use_upnp = self.session_settings['use_upnp']
self.start_lbrycrdd = self.session_settings['start_lbrycrdd']
@ -683,10 +687,10 @@ class LBRYDaemon(jsonrpc.JSONRPC):
def _start_server(self):
if self.peer_port is not None:
server_factory = ServerProtocolFactory(self.session.rate_limiter,
self.query_handlers,
self.session.peer_manager)
try:
self.lbry_server_port = reactor.listenTCP(self.peer_port, server_factory)
except error.CannotListenError as e:
@ -695,6 +699,33 @@ class LBRYDaemon(jsonrpc.JSONRPC):
raise ValueError("%s lbrynet may already be running on your computer.", str(e))
return defer.succeed(True)
def _start_reflector(self):
if self.run_reflector_server:
log.info("Starting reflector server")
if self.reflector_port is not None:
reflector_factory = reflector.ServerFactory(
self.session.peer_manager,
self.session.blob_manager
)
try:
self.reflector_server_port = reactor.listenTCP(self.reflector_port, reflector_factory)
log.info('Started reflector on port %s', self.reflector_port)
except error.CannotListenError as e:
log.exception("Couldn't bind reflector to port %d", self.reflector_port)
raise ValueError("{} lbrynet may already be running on your computer.".format(e))
return defer.succeed(True)
def _stop_reflector(self):
if self.run_reflector_server:
log.info("Stopping reflector server")
try:
if self.reflector_server_port is not None:
self.reflector_server_port, p = None, self.reflector_server_port
return defer.maybeDeferred(p.stopListening)
except AttributeError:
return defer.succeed(True)
return defer.succeed(True)
def _stop_server(self):
try:
if self.lbry_server_port is not None:
@ -708,7 +739,8 @@ class LBRYDaemon(jsonrpc.JSONRPC):
def _setup_server(self):
def restore_running_status(running):
if running is True:
return self._start_server()
d = self._start_server()
d.addCallback(lambda _: self._start_reflector())
return defer.succeed(True)
self.startup_status = STARTUP_STAGES[4]
@ -808,6 +840,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
d = self._upload_log(log_type="close", exclude_previous=False if self.first_run else True)
d.addCallback(lambda _: self._stop_server())
d.addCallback(lambda _: self._stop_reflector())
d.addErrback(lambda err: True)
d.addCallback(lambda _: self.lbry_file_manager.stop())
d.addErrback(lambda err: True)
@ -1307,6 +1340,30 @@ class LBRYDaemon(jsonrpc.JSONRPC):
d = defer.DeferredList([self._get_lbry_file('sd_hash', l.sd_hash) for l in self.lbry_file_manager.lbry_files])
return d
def _reflect(self, lbry_file):
if not lbry_file:
return defer.fail(Exception("no lbry file given to reflect"))
stream_hash = lbry_file.stream_hash
if stream_hash is None:
return defer.fail(Exception("no stream hash"))
log.info("Reflecting stream: %s" % stream_hash)
reflector_server = random.choice(REFLECTOR_SERVERS)
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
log.info("Start reflector client")
factory = reflector.ClientFactory(
self.session.blob_manager,
self.lbry_file_manager.stream_info_manager,
stream_hash
)
d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: factory.finished_deferred)
return d
def _log_to_slack(self, msg):
URL = "https://hooks.slack.com/services/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"
msg = platform.platform() + ": " + base58.b58encode(self.lbryid)[:20] + ", " + msg
@ -1881,6 +1938,12 @@ class LBRYDaemon(jsonrpc.JSONRPC):
m['fee'][currency]['address'] = address
return m
def _reflect_if_possible(sd_hash, txid):
d = self._get_lbry_file('sd_hash', sd_hash, return_json=False)
d.addCallback(self._reflect)
d.addCallback(lambda _: txid)
return d
name = p['name']
log.info("Publish: ")
@ -1897,8 +1960,10 @@ class LBRYDaemon(jsonrpc.JSONRPC):
try:
metadata = Metadata(p['metadata'])
make_lbry_file = False
sd_hash = metadata['sources']['lbry_sd_hash']
except AssertionError:
make_lbry_file = True
sd_hash = None
metadata = p['metadata']
file_path = p['file_path']
@ -1922,6 +1987,9 @@ class LBRYDaemon(jsonrpc.JSONRPC):
d.addCallback(lambda meta: pub.start(name, file_path, bid, meta))
else:
d.addCallback(lambda meta: self.session.wallet.claim_name(name, bid, meta))
if sd_hash:
d.addCallback(lambda txid: _reflect_if_possible(sd_hash, txid))
d.addCallback(lambda txid: self._add_to_pending_claims(name, txid))
d.addCallback(lambda r: self._render_response(r, OK_CODE))
@ -2351,6 +2419,45 @@ class LBRYDaemon(jsonrpc.JSONRPC):
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d
def jsonrpc_announce_all_blobs_to_dht(self):
"""
Announce all blobs to the dht
Args:
None
Returns:
"""
d = self.session.blob_manager.immediate_announce_all_blobs()
d.addCallback(lambda _: self._render_response("Announced", OK_CODE))
return d
def jsonrpc_reflect(self, p):
"""
Reflect a stream
Args:
sd_hash
Returns:
True or traceback
"""
sd_hash = p['sd_hash']
d = self._get_lbry_file('sd_hash', sd_hash, return_json=False)
d.addCallback(self._reflect)
d.addCallbacks(lambda _: self._render_response(True, OK_CODE), lambda err: self._render_response(err.getTraceback(), OK_CODE))
return d
def jsonrpc_get_blobs(self):
"""
return all blobs
"""
d = defer.succeed(self.session.blob_manager.blobs)
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d
def get_lbrynet_version_from_github():
"""Return the latest released version from github."""

View file

@ -2,6 +2,7 @@ import logging
import mimetypes
import os
import sys
import random
from appdirs import user_data_dir
@ -11,8 +12,9 @@ from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob
from lbrynet.core.PaymentRateManager import PaymentRateManager
from lbrynet.core.LBRYMetadata import Metadata, CURRENT_METADATA_VERSION
from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloader
from lbrynet.conf import LOG_FILE_NAME
from twisted.internet import threads, defer
from lbrynet import reflector
from lbrynet.conf import LOG_FILE_NAME, REFLECTOR_SERVERS
from twisted.internet import threads, defer, reactor
if sys.platform != "darwin":
log_dir = os.path.join(os.path.expanduser("~"), ".lbrynet")
@ -40,12 +42,14 @@ class Publisher(object):
self.lbry_file = None
self.txid = None
self.stream_hash = None
reflector_server = random.choice(REFLECTOR_SERVERS)
self.reflector_server, self.reflector_port = reflector_server[0], reflector_server[1]
self.metadata = {}
def start(self, name, file_path, bid, metadata):
log.info('Starting publish for %s', name)
def _show_result():
log.info("Published %s --> lbry://%s txid: %s", self.file_name, self.publish_name, self.txid)
log.info("Success! Published %s --> lbry://%s txid: %s", self.file_name, self.publish_name, self.txid)
return defer.succeed(self.txid)
self.publish_name = name
@ -60,10 +64,25 @@ class Publisher(object):
d.addCallback(lambda _: self._create_sd_blob())
d.addCallback(lambda _: self._claim_name())
d.addCallback(lambda _: self.set_status())
d.addCallback(lambda _: self.start_reflector())
d.addCallbacks(lambda _: _show_result(), self._show_publish_error)
return d
def start_reflector(self):
reflector_server = random.choice(REFLECTOR_SERVERS)
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
log.info("Reflecting new publication")
factory = reflector.ClientFactory(
self.session.blob_manager,
self.lbry_file_manager.stream_info_manager,
self.stream_hash
)
d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: factory.finished_deferred)
return d
def _check_file_path(self, file_path):
def check_file_threaded():
f = open(file_path)
@ -84,10 +103,13 @@ class Publisher(object):
return d
def _create_sd_blob(self):
d = publish_sd_blob(self.lbry_file_manager.stream_info_manager, self.session.blob_manager,
log.debug('Creating stream descriptor blob')
d = publish_sd_blob(self.lbry_file_manager.stream_info_manager,
self.session.blob_manager,
self.lbry_file.stream_hash)
def set_sd_hash(sd_hash):
log.debug('stream descriptor hash: %s', sd_hash)
if 'sources' not in self.metadata:
self.metadata['sources'] = {}
self.metadata['sources']['lbry_sd_hash'] = sd_hash
@ -96,23 +118,29 @@ class Publisher(object):
return d
def set_status(self):
log.debug('Setting status')
d = self.lbry_file_manager.change_lbry_file_status(self.lbry_file, ManagedLBRYFileDownloader.STATUS_FINISHED)
d.addCallback(lambda _: self.lbry_file.restore())
return d
def _claim_name(self):
self.metadata['content-type'] = mimetypes.guess_type(os.path.join(self.lbry_file.download_directory,
self.lbry_file.file_name))[0]
self.metadata['ver'] = CURRENT_METADATA_VERSION
log.debug('Claiming name')
self._update_metadata()
m = Metadata(self.metadata)
def set_tx_hash(txid):
log.debug('Name claimed using txid: %s', txid)
self.txid = txid
d = self.wallet.claim_name(self.publish_name, self.bid_amount, m)
d.addCallback(set_tx_hash)
return d
def _update_metadata(self):
filename = os.path.join(self.lbry_file.download_directory, self.lbry_file.file_name)
self.metadata['content-type'] = get_content_type(filename)
self.metadata['ver'] = CURRENT_METADATA_VERSION
def _show_publish_error(self, err):
log.info(err.getTraceback())
message = "An error occurred publishing %s to %s. Error: %s."
@ -125,3 +153,7 @@ class Publisher(object):
log.error(message, str(self.file_name), str(self.publish_name), err.getTraceback())
return defer.fail(Exception("Publish failed"))
def get_content_type(filename):
return mimetypes.guess_type(filename)[0]

View file

@ -0,0 +1,2 @@
from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory
from lbrynet.reflector.client.client import LBRYFileReflectorClientFactory as ClientFactory

View file

@ -0,0 +1,269 @@
"""
The reflector protocol (all dicts encoded in json):
Client Handshake (sent once per connection, at the start of the connection):
{
'version': 0,
}
Server Handshake (sent once per connection, after receiving the client handshake):
{
'version': 0,
}
Client Info Request:
{
'blob_hash': "<blob_hash>",
'blob_size': <blob_size>
}
Server Info Response (sent in response to Client Info Request):
{
'send_blob': True|False
}
If response is 'YES', client may send a Client Blob Request or a Client Info Request.
If response is 'NO', client may only send a Client Info Request
Client Blob Request:
{} # Yes, this is an empty dictionary, in case something needs to go here in the future
<raw blob_data> # this blob data must match the info sent in the most recent Client Info Request
Server Blob Response (sent in response to Client Blob Request):
{
'received_blob': True
}
Client may now send another Client Info Request
"""
import json
import logging
from twisted.protocols.basic import FileSender
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.internet import defer, error
log = logging.getLogger(__name__)
class IncompleteResponseError(Exception):
pass
class LBRYFileReflectorClient(Protocol):
# Protocol stuff
def connectionMade(self):
self.blob_manager = self.factory.blob_manager
self.response_buff = ''
self.outgoing_buff = ''
self.blob_hashes_to_send = []
self.next_blob_to_send = None
self.blob_read_handle = None
self.received_handshake_response = False
self.protocol_version = None
self.file_sender = None
self.producer = None
self.streaming = False
d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash)
d.addCallback(lambda _: self.send_handshake())
d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
def dataReceived(self, data):
log.debug('Recieved %s', data)
self.response_buff += data
try:
msg = self.parse_response(self.response_buff)
except IncompleteResponseError:
pass
else:
self.response_buff = ''
d = self.handle_response(msg)
d.addCallback(lambda _: self.send_next_request())
d.addErrback(self.response_failure_handler)
def connectionLost(self, reason):
if reason.check(error.ConnectionDone):
log.debug('Finished sending data via reflector')
self.factory.finished_deferred.callback(True)
else:
log.debug('reflector finished: %s', reason)
self.factory.finished_deferred.callback(reason)
# IConsumer stuff
def registerProducer(self, producer, streaming):
self.producer = producer
self.streaming = streaming
if self.streaming is False:
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
self.producer = None
def write(self, data):
self.transport.write(data)
if self.producer is not None and self.streaming is False:
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)
def get_blobs_to_send(self, stream_info_manager, stream_hash):
log.debug('Getting blobs from stream hash: %s', stream_hash)
d = stream_info_manager.get_blobs_for_stream(stream_hash)
def set_blobs(blob_hashes):
for blob_hash, position, iv, length in blob_hashes:
log.info("Preparing to send %s", blob_hash)
if blob_hash is not None:
self.blob_hashes_to_send.append(blob_hash)
d.addCallback(set_blobs)
d.addCallback(lambda _: stream_info_manager.get_sd_blob_hashes_for_stream(stream_hash))
def set_sd_blobs(sd_blob_hashes):
for sd_blob_hash in sd_blob_hashes:
self.blob_hashes_to_send.append(sd_blob_hash)
d.addCallback(set_sd_blobs)
return d
def send_handshake(self):
log.debug('Sending handshake')
self.write(json.dumps({'version': 0}))
def parse_response(self, buff):
try:
return json.loads(buff)
except ValueError:
raise IncompleteResponseError()
def response_failure_handler(self, err):
log.warning("An error occurred handling the response: %s", err.getTraceback())
def handle_response(self, response_dict):
if self.received_handshake_response is False:
return self.handle_handshake_response(response_dict)
else:
return self.handle_normal_response(response_dict)
def set_not_uploading(self):
if self.next_blob_to_send is not None:
self.next_blob_to_send.close_read_handle(self.read_handle)
self.read_handle = None
self.next_blob_to_send = None
self.file_sender = None
return defer.succeed(None)
def start_transfer(self):
self.write(json.dumps({}))
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self)
return d
def handle_handshake_response(self, response_dict):
if 'version' not in response_dict:
raise ValueError("Need protocol version number!")
self.protocol_version = int(response_dict['version'])
if self.protocol_version != 0:
raise ValueError("I can't handle protocol version {}!".format(self.protocol_version))
self.received_handshake_response = True
return defer.succeed(True)
def handle_normal_response(self, response_dict):
if self.file_sender is None: # Expecting Server Info Response
if 'send_blob' not in response_dict:
raise ValueError("I don't know whether to send the blob or not!")
if response_dict['send_blob'] is True:
self.file_sender = FileSender()
return defer.succeed(True)
else:
return self.set_not_uploading()
else: # Expecting Server Blob Response
if 'received_blob' not in response_dict:
raise ValueError("I don't know if the blob made it to the intended destination!")
else:
return self.set_not_uploading()
def open_blob_for_reading(self, blob):
if blob.is_validated():
read_handle = blob.open_for_reading()
if read_handle is not None:
log.debug('Getting ready to send %s', blob.blob_hash)
self.next_blob_to_send = blob
self.read_handle = read_handle
return None
raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))
def send_blob_info(self):
log.info("Send blob info for %s", self.next_blob_to_send.blob_hash)
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
log.debug('sending blob info')
self.write(json.dumps({
'blob_hash': self.next_blob_to_send.blob_hash,
'blob_size': self.next_blob_to_send.length
}))
def send_next_request(self):
if self.file_sender is not None:
# send the blob
log.debug('Sending the blob')
return self.start_transfer()
elif self.blob_hashes_to_send:
# open the next blob to send
blob_hash = self.blob_hashes_to_send[0]
log.debug('No current blob, sending the next one: %s', blob_hash)
self.blob_hashes_to_send = self.blob_hashes_to_send[1:]
d = self.blob_manager.get_blob(blob_hash, True)
d.addCallback(self.open_blob_for_reading)
# send the server the next blob hash + length
d.addCallback(lambda _: self.send_blob_info())
return d
else:
# close connection
log.debug('No more blob hashes, closing connection')
self.transport.loseConnection()
class LBRYFileReflectorClientFactory(ClientFactory):
protocol = LBRYFileReflectorClient
def __init__(self, blob_manager, stream_info_manager, stream_hash):
self.blob_manager = blob_manager
self.stream_info_manager = stream_info_manager
self.stream_hash = stream_hash
self.p = None
self.finished_deferred = defer.Deferred()
def buildProtocol(self, addr):
p = self.protocol()
p.factory = self
self.p = p
return p
def startFactory(self):
log.debug('Starting reflector factory')
ClientFactory.startFactory(self)
def startedConnecting(self, connector):
log.debug('Started connecting')
def clientConnectionLost(self, connector, reason):
"""If we get disconnected, reconnect to server."""
log.debug("connection lost: %s", reason)
def clientConnectionFailed(self, connector, reason):
log.debug("connection failed: %s", reason)

View file

@ -0,0 +1,148 @@
import logging
from twisted.python import failure
from twisted.internet import error, defer
from twisted.internet.protocol import Protocol, ServerFactory
import json
from lbrynet.core.utils import is_valid_blobhash
log = logging.getLogger(__name__)
class ReflectorServer(Protocol):
def connectionMade(self):
peer_info = self.transport.getPeer()
log.debug('Connection made to %s', peer_info)
self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port)
self.blob_manager = self.factory.blob_manager
self.received_handshake = False
self.peer_version = None
self.receiving_blob = False
self.incoming_blob = None
self.blob_write = None
self.blob_finished_d = None
self.cancel_write = None
self.request_buff = ""
def connectionLost(self, reason=failure.Failure(error.ConnectionDone())):
log.info("Reflector upload from %s finished" % self.peer.host)
def dataReceived(self, data):
if self.receiving_blob:
# log.debug('Writing data to blob')
self.blob_write(data)
else:
log.debug('Not yet recieving blob, data needs further processing')
self.request_buff += data
msg, extra_data = self._get_valid_response(self.request_buff)
if msg is not None:
self.request_buff = ''
d = self.handle_request(msg)
d.addCallbacks(self.send_response, self.handle_error)
if self.receiving_blob and extra_data:
log.debug('Writing extra data to blob')
self.blob_write(extra_data)
def _get_valid_response(self, response_msg):
extra_data = None
response = None
curr_pos = 0
while True:
next_close_paren = response_msg.find('}', curr_pos)
if next_close_paren != -1:
curr_pos = next_close_paren + 1
try:
response = json.loads(response_msg[:curr_pos])
except ValueError:
if curr_pos > 100:
raise Exception("error decoding response")
else:
pass
else:
extra_data = response_msg[curr_pos:]
break
else:
break
return response, extra_data
def handle_request(self, request_dict):
if self.received_handshake is False:
return self.handle_handshake(request_dict)
else:
return self.handle_normal_request(request_dict)
def handle_handshake(self, request_dict):
log.debug('Handling handshake')
if 'version' not in request_dict:
raise ValueError("Client should send version")
self.peer_version = int(request_dict['version'])
if self.peer_version != 0:
raise ValueError("I don't know that version!")
self.received_handshake = True
return defer.succeed({'version': 0})
def determine_blob_needed(self, blob):
if blob.is_validated():
return {'send_blob': False}
else:
self.incoming_blob = blob
self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer)
self.blob_finished_d.addCallback(lambda _ :self.blob_manager.blob_completed(blob))
return {'send_blob': True}
def close_blob(self):
self.blob_finished_d = None
self.blob_write = None
self.cancel_write = None
self.incoming_blob = None
self.receiving_blob = False
def handle_normal_request(self, request_dict):
if self.blob_write is None:
# we haven't opened a blob yet, meaning we must be waiting for the
# next message containing a blob hash and a length. this message
# should be it. if it's one we want, open the blob for writing, and
# return a nice response dict (in a Deferred) saying go ahead
if not 'blob_hash' in request_dict or not 'blob_size' in request_dict:
raise ValueError("Expected a blob hash and a blob size")
if not is_valid_blobhash(request_dict['blob_hash']):
raise ValueError("Got a bad blob hash: {}".format(request_dict['blob_hash']))
log.debug('Recieved info for blob: %s', request_dict['blob_hash'])
d = self.blob_manager.get_blob(
request_dict['blob_hash'],
True,
int(request_dict['blob_size'])
)
d.addCallback(self.determine_blob_needed)
else:
# we have a blob open already, so this message should have nothing
# important in it. to the deferred that fires when the blob is done,
# add a callback which returns a nice response dict saying to keep
# sending, and then return that deferred
log.debug('blob is already open')
self.receiving_blob = True
d = self.blob_finished_d
d.addCallback(lambda _: self.close_blob())
d.addCallback(lambda _: {'received_blob': True})
return d
def send_response(self, response_dict):
self.transport.write(json.dumps(response_dict))
def handle_error(self, err):
log.error(err.getTraceback())
self.transport.loseConnection()
class ReflectorServerFactory(ServerFactory):
protocol = ReflectorServer
def __init__(self, peer_manager, blob_manager):
self.peer_manager = peer_manager
self.blob_manager = blob_manager
def buildProtocol(self, addr):
log.debug('Creating a protocol for %s', addr)
return ServerFactory.buildProtocol(self, addr)

View file

@ -0,0 +1,190 @@
import os
import shutil
from twisted.internet import defer, threads, error
from twisted.trial import unittest
from lbrynet import conf
from lbrynet import lbryfile
from lbrynet import reflector
from lbrynet.core import BlobManager
from lbrynet.core import PeerManager
from lbrynet.core import RateLimiter
from lbrynet.core import Session
from lbrynet.core import StreamDescriptor
from lbrynet.lbryfile import LBRYFileMetadataManager
from lbrynet.lbryfile.client import LBRYFileOptions
from lbrynet.lbryfilemanager import LBRYFileCreator
from lbrynet.lbryfilemanager import LBRYFileManager
from tests import mocks
class TestReflector(unittest.TestCase):
def setUp(self):
self.session = None
self.stream_info_manager = None
self.lbry_file_manager = None
self.server_blob_manager = None
self.reflector_port = None
self.addCleanup(self.take_down_env)
def take_down_env(self):
d = defer.succeed(True)
if self.lbry_file_manager is not None:
d.addCallback(lambda _: self.lbry_file_manager.stop())
if self.session is not None:
d.addCallback(lambda _: self.session.shut_down())
if self.stream_info_manager is not None:
d.addCallback(lambda _: self.stream_info_manager.stop())
if self.server_blob_manager is not None:
d.addCallback(lambda _: self.server_blob_manager.stop())
if self.reflector_port is not None:
d.addCallback(lambda _: self.reflector_port.stopListening())
def delete_test_env():
shutil.rmtree('client')
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
return d
def test_reflector(self):
wallet = mocks.Wallet()
peer_manager = PeerManager.PeerManager()
peer_finder = mocks.PeerFinder(5553, peer_manager, 2)
hash_announcer = mocks.Announcer()
rate_limiter = RateLimiter.DummyRateLimiter()
sd_identifier = StreamDescriptor.StreamDescriptorIdentifier()
self.expected_blobs = [
(
'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b'
'441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586',
2097152
),
(
'f4067522c1b49432a2a679512e3917144317caa1abba0c04'
'1e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70',
2097152
),
(
'305486c434260484fcb2968ce0e963b72f81ba56c11b08b1'
'af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9',
1015056
),
]
db_dir = "client"
os.mkdir(db_dir)
self.session = Session.LBRYSession(
conf.MIN_BLOB_DATA_PAYMENT_RATE,
db_dir=db_dir,
lbryid="abcd",
peer_finder=peer_finder,
hash_announcer=hash_announcer,
blob_dir=None,
peer_port=5553,
use_upnp=False,
rate_limiter=rate_limiter,
wallet=wallet
)
self.stream_info_manager = LBRYFileMetadataManager.TempLBRYFileMetadataManager()
self.lbry_file_manager = LBRYFileManager.LBRYFileManager(
self.session, self.stream_info_manager, sd_identifier)
self.server_blob_manager = BlobManager.TempBlobManager(hash_announcer)
d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: LBRYFileOptions.add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: self.server_blob_manager.setup())
def verify_equal(sd_info):
self.assertEqual(mocks.create_stream_sd_file, sd_info)
def save_sd_blob_hash(sd_hash):
self.expected_blobs.append((sd_hash, 923))
def verify_stream_descriptor_file(stream_hash):
d = lbryfile.get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
d.addCallback(verify_equal)
d.addCallback(
lambda _: lbryfile.publish_sd_blob(
self.lbry_file_manager.stream_info_manager,
self.session.blob_manager, stream_hash
)
)
d.addCallback(save_sd_blob_hash)
d.addCallback(lambda _: stream_hash)
return d
def create_stream():
test_file = mocks.GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)]))
d = LBRYFileCreator.create_lbry_file(
self.session,
self.lbry_file_manager,
"test_file",
test_file,
key="0123456701234567",
iv_generator=iv_generator()
)
return d
def start_server():
server_factory = reflector.ServerFactory(peer_manager, self.server_blob_manager)
from twisted.internet import reactor
port = 8943
while self.reflector_port is None:
try:
self.reflector_port = reactor.listenTCP(port, server_factory)
except error.CannotListenError:
port += 1
return defer.succeed(port)
def send_to_server(port, stream_hash):
factory = reflector.ClientFactory(
self.session.blob_manager,
self.stream_info_manager,
stream_hash
)
from twisted.internet import reactor
reactor.connectTCP('localhost', port, factory)
return factory.finished_deferred
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.is_validated())
self.assertEqual(blob_size, blob.length)
def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash, True)
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
return d
def verify_data_on_reflector():
check_blob_ds = []
for blob_hash, blob_size in self.expected_blobs:
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
return defer.DeferredList(check_blob_ds)
def upload_to_reflector(stream_hash):
d = start_server()
d.addCallback(lambda port: send_to_server(port, stream_hash))
d.addCallback(lambda _: verify_data_on_reflector())
return d
d.addCallback(lambda _: create_stream())
d.addCallback(verify_stream_descriptor_file)
d.addCallback(upload_to_reflector)
return d
def iv_generator():
iv = 0
while True:
iv += 1
yield "%016d" % iv

View file

@ -26,7 +26,7 @@ from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.lbryfilemanager.LBRYFileCreator import create_lbry_file
from lbrynet.lbryfile.client.LBRYFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.lbryfile.StreamDescriptor import get_sd_info
from twisted.internet import defer, threads, task
from twisted.internet import defer, threads, task, error
from twisted.trial.unittest import TestCase
from twisted.python.failure import Failure
import os
@ -38,6 +38,10 @@ from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory
from lbrynet.lbrylive.client.LiveStreamOptions import add_live_stream_to_sd_identifier
from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_downloader_to_sd_identifier
from lbrynet.core.BlobManager import TempBlobManager
from lbrynet.reflector.client.client import LBRYFileReflectorClientFactory
from lbrynet.reflector.server.server import ReflectorServerFactory
from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob
log_format = "%(funcName)s(): %(message)s"
@ -1369,4 +1373,147 @@ class TestStreamify(TestCase):
d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: create_stream())
d.addCallback(combine_stream)
return d
class TestReflector(TestCase):
def setUp(self):
self.session = None
self.stream_info_manager = None
self.lbry_file_manager = None
self.server_blob_manager = None
self.reflector_port = None
self.addCleanup(self.take_down_env)
def take_down_env(self):
d = defer.succeed(True)
if self.lbry_file_manager is not None:
d.addCallback(lambda _: self.lbry_file_manager.stop())
if self.session is not None:
d.addCallback(lambda _: self.session.shut_down())
if self.stream_info_manager is not None:
d.addCallback(lambda _: self.stream_info_manager.stop())
if self.server_blob_manager is not None:
d.addCallback(lambda _: self.server_blob_manager.stop())
if self.reflector_port is not None:
d.addCallback(lambda _: self.reflector_port.stopListening())
def delete_test_env():
shutil.rmtree('client')
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
return d
def test_reflector(self):
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
self.expected_blobs = [
('dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b'
'441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586', 2097152),
('f4067522c1b49432a2a679512e3917144317caa1abba0c04'
'1e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70', 2097152),
('305486c434260484fcb2968ce0e963b72f81ba56c11b08b1'
'af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9', 1015056),
]
db_dir = "client"
os.mkdir(db_dir)
self.session = LBRYSession(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=None, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet)
self.stream_info_manager = TempLBRYFileMetadataManager()
self.lbry_file_manager = LBRYFileManager(self.session, self.stream_info_manager, sd_identifier)
self.server_blob_manager = TempBlobManager(hash_announcer)
d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: self.server_blob_manager.setup())
def verify_equal(sd_info):
self.assertEqual(sd_info, test_create_stream_sd_file)
def save_sd_blob_hash(sd_hash):
self.expected_blobs.append((sd_hash, 923))
def verify_stream_descriptor_file(stream_hash):
d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
d.addCallback(verify_equal)
d.addCallback(lambda _: publish_sd_blob(self.lbry_file_manager.stream_info_manager, self.session.blob_manager, stream_hash))
d.addCallback(save_sd_blob_hash)
d.addCallback(lambda _: stream_hash)
return d
def iv_generator():
iv = 0
while 1:
iv += 1
yield "%016d" % iv
def create_stream():
test_file = GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)]))
d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file,
key="0123456701234567", iv_generator=iv_generator())
return d
def start_server():
server_factory = ReflectorServerFactory(peer_manager, self.server_blob_manager)
from twisted.internet import reactor
port = 8943
while self.reflector_port is None:
try:
self.reflector_port = reactor.listenTCP(port, server_factory)
except error.CannotListenError:
port += 1
return defer.succeed(port)
def send_to_server(port, stream_hash):
factory = LBRYFileReflectorClientFactory(
self.session.blob_manager,
self.stream_info_manager,
stream_hash
)
from twisted.internet import reactor
reactor.connectTCP('localhost', port, factory)
return factory.finished_deferred
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.is_validated())
self.assertEqual(blob_size, blob.length)
def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash, True)
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
return d
def verify_data_on_reflector():
check_blob_ds = []
for blob_hash, blob_size in self.expected_blobs:
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
return defer.DeferredList(check_blob_ds)
def upload_to_reflector(stream_hash):
d = start_server()
d.addCallback(lambda port: send_to_server(port, stream_hash))
d.addCallback(lambda _: verify_data_on_reflector())
return d
d.addCallback(lambda _: create_stream())
d.addCallback(verify_stream_descriptor_file)
d.addCallback(upload_to_reflector)
return d

164
tests/mocks.py Normal file
View file

@ -0,0 +1,164 @@
import io
from Crypto.PublicKey import RSA
from twisted.internet import defer, threads, task, error
from lbrynet.core import PTCWallet
class Node(object):
def __init__(self, *args, **kwargs):
pass
def joinNetwork(self, *args):
pass
def stop(self):
pass
class Wallet(object):
def __init__(self):
self.private_key = RSA.generate(1024)
self.encoded_public_key = self.private_key.publickey().exportKey()
def start(self):
return defer.succeed(True)
def stop(self):
return defer.succeed(True)
def get_info_exchanger(self):
return PTCWallet.PointTraderKeyExchanger(self)
def get_wallet_info_query_handler_factory(self):
return PTCWallet.PointTraderKeyQueryHandlerFactory(self)
def reserve_points(self, *args):
return True
def cancel_point_reservation(self, *args):
pass
def send_points(self, *args):
return defer.succeed(True)
def add_expected_payment(self, *args):
pass
def get_balance(self):
return defer.succeed(1000)
def set_public_key_for_peer(self, peer, public_key):
pass
class PeerFinder(object):
def __init__(self, start_port, peer_manager, num_peers):
self.start_port = start_port
self.peer_manager = peer_manager
self.num_peers = num_peers
self.count = 0
def find_peers_for_blob(self, *args):
peer_port = self.start_port + self.count
self.count += 1
if self.count >= self.num_peers:
self.count = 0
return defer.succeed([self.peer_manager.get_peer("127.0.0.1", peer_port)])
def run_manage_loop(self):
pass
def stop(self):
pass
class Announcer(object):
def __init__(self, *args):
pass
def add_supplier(self, supplier):
pass
def immediate_announce(self, *args):
pass
def run_manage_loop(self):
pass
def stop(self):
pass
class GenFile(io.RawIOBase):
def __init__(self, size, pattern):
io.RawIOBase.__init__(self)
self.size = size
self.pattern = pattern
self.read_so_far = 0
self.buff = b''
self.last_offset = 0
def readable(self):
return True
def writable(self):
return False
def read(self, n=-1):
if n > -1:
bytes_to_read = min(n, self.size - self.read_so_far)
else:
bytes_to_read = self.size - self.read_so_far
output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:]
bytes_to_read -= len(output)
while bytes_to_read > 0:
self.buff = self._generate_chunk()
new_output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:]
bytes_to_read -= len(new_output)
output += new_output
self.read_so_far += len(output)
return output
def readall(self):
return self.read()
def _generate_chunk(self, n=2**10):
output = self.pattern[self.last_offset:self.last_offset + n]
n_left = n - len(output)
whole_patterns = n_left / len(self.pattern)
output += self.pattern * whole_patterns
self.last_offset = n - len(output)
output += self.pattern[:self.last_offset]
return output
create_stream_sd_file = {
'stream_name': '746573745f66696c65',
'blobs': [
{
'length': 2097152,
'blob_num': 0,
'blob_hash': 'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586',
'iv': '30303030303030303030303030303031'
},
{
'length': 2097152,
'blob_num': 1,
'blob_hash': 'f4067522c1b49432a2a679512e3917144317caa1abba0c041e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70',
'iv': '30303030303030303030303030303032'
},
{
'length': 1015056,
'blob_num': 2,
'blob_hash': '305486c434260484fcb2968ce0e963b72f81ba56c11b08b1af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9',
'iv': '30303030303030303030303030303033'
},
{'length': 0, 'blob_num': 3, 'iv': '30303030303030303030303030303034'}
],
'stream_type': 'lbryfile',
'key': '30313233343536373031323334353637',
'suggested_file_name': '746573745f66696c65',
'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d315c0f998c4b3545c2dc60906122d94653c23b1898229e3f'
}

View file

View file

View file

@ -19,12 +19,10 @@ class LBRYFeeFormatTest(unittest.TestCase):
class LBRYFeeTest(unittest.TestCase):
def setUp(self):
self.patcher = mock.patch('time.time')
self.time = self.patcher.start()
patcher = mock.patch('time.time')
self.time = patcher.start()
self.time.return_value = 0
def tearDown(self):
self.time.stop()
self.addCleanup(patcher.stop)
def test_fee_converts_to_lbc(self):
fee_dict = {
@ -35,4 +33,4 @@ class LBRYFeeTest(unittest.TestCase):
}
rates = {'BTCLBC': {'spot': 3.0, 'ts': 2}, 'USDBTC': {'spot': 2.0, 'ts': 3}}
manager = LBRYExchangeRateManager.DummyExchangeRateManager(rates)
self.assertEqual(60.0, manager.to_lbc(fee_dict).amount)
self.assertEqual(60.0, manager.to_lbc(fee_dict).amount)

View file