Merge branch 'add_lbry_file_manager_to_reflector_2'

This commit is contained in:
Jack Robison 2017-12-29 12:39:50 -05:00
commit bf46f5616b
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF
8 changed files with 77 additions and 51 deletions

View file

@ -63,6 +63,7 @@ at anytime.
* Removed claim related filter arguments `name`, `claim_id`, and `outpoint` from `file_list`, `file_delete`, `file_set_status`, and `file_reflect`
* Removed unused files
* Removed old and unused UI related code
* Removed claim information from lbry file internals
## [0.18.0] - 2017-11-08

View file

@ -324,7 +324,8 @@ class Daemon(AuthJSONRPCServer):
reflector_factory = reflector_server_factory(
self.session.peer_manager,
self.session.blob_manager,
self.stream_info_manager
self.stream_info_manager,
self.lbry_file_manager
)
try:
self.reflector_server_port = reactor.listenTCP(self.reflector_port,

View file

@ -34,10 +34,9 @@ class Publisher(object):
with file_utils.get_read_handle(file_path) as read_handle:
stream_hash = yield create_lbry_file(self.session, self.lbry_file_manager, file_name,
read_handle)
prm = self.session.payment_rate_manager
self.lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, prm)
sd_hash = yield publish_sd_blob(self.lbry_file_manager.stream_info_manager,
self.session.blob_manager, self.lbry_file.stream_hash)
self.session.blob_manager, stream_hash)
self.lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash)
if 'source' not in claim_dict['stream']:
claim_dict['stream']['source'] = {}
claim_dict['stream']['source']['source'] = sd_hash
@ -47,7 +46,6 @@ class Publisher(object):
claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address)
self.lbry_file.completed = True
yield self.lbry_file.load_file_attributes(sd_hash)
yield self.lbry_file.save_status()
defer.returnValue(claim_out)

View file

@ -7,7 +7,6 @@ from zope.interface import implements
from twisted.internet import defer
from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager
from lbrynet.core.Error import NoSuchStreamHash
from lbrynet.core.utils import short_hash
from lbrynet.core.StreamDescriptor import StreamMetadata
from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaver
@ -35,18 +34,21 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
STATUS_RUNNING = "running"
STATUS_STOPPED = "stopped"
STATUS_FINISHED = "finished"
"""
These are started by EncryptedFileManager, aka, file_manager
"""
def __init__(self, rowid, stream_hash, peer_finder, rate_limiter,
blob_manager, stream_info_manager, lbry_file_manager,
payment_rate_manager, wallet, download_directory,
file_name=None):
EncryptedFileSaver.__init__(self, stream_hash, peer_finder,
rate_limiter, blob_manager,
stream_info_manager,
payment_rate_manager, wallet,
download_directory,
file_name)
self.sd_hash = None
self.rowid = rowid
self.lbry_file_manager = lbry_file_manager
self._saving_status = False
@ -57,7 +59,6 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
@defer.inlineCallbacks
def restore(self):
yield self.load_file_attributes()
status = yield self.lbry_file_manager.get_lbry_file_status(self)
log_status(self.sd_hash, status)
@ -101,23 +102,9 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
defer.returnValue(EncryptedFileStatusReport(self.file_name, num_blobs_completed,
num_blobs_known, status))
@defer.inlineCallbacks
def load_file_attributes(self, sd_hash=None):
if not sd_hash:
sd_hash = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
if sd_hash:
self.sd_hash = sd_hash[0]
else:
raise NoSuchStreamHash(self.stream_hash)
else:
self.sd_hash = sd_hash
defer.returnValue(None)
@defer.inlineCallbacks
def _start(self):
yield EncryptedFileSaver._start(self)
yield self.load_file_attributes()
status = yield self._save_status()
log_status(self.sd_hash, status)
defer.returnValue(status)

View file

@ -180,8 +180,10 @@ class EncryptedFileManager(object):
yield self._stop_lbry_file(lbry_file)
@defer.inlineCallbacks
def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None,
def add_lbry_file(self, stream_hash, payment_rate_manager=None, blob_data_rate=None,
download_directory=None, file_name=None):
if not payment_rate_manager:
payment_rate_manager = self.session.payment_rate_manager
rowid = yield self._save_lbry_file(stream_hash, blob_data_rate)
lbry_file = yield self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
blob_data_rate, download_directory,

View file

@ -6,6 +6,7 @@ from lbrynet.lbry_file.StreamDescriptor import save_sd_info
from lbrynet.cryptstream.client.CryptStreamDownloader import CryptStreamDownloader
from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager
from lbrynet.core.StreamDescriptor import StreamMetadata
from lbrynet.core.Error import NoSuchStreamHash
from lbrynet.interfaces import IStreamDownloaderFactory
from lbrynet.lbry_file.client.EncryptedFileMetadataHandler import EncryptedFileMetadataHandler
import os
@ -28,21 +29,22 @@ class EncryptedFileDownloader(CryptStreamDownloader):
self.stream_info_manager = stream_info_manager
self.suggested_file_name = None
self._calculated_total_bytes = None
self.sd_hash = None
@defer.inlineCallbacks
def set_stream_info(self):
if self.key is None:
d = self.stream_info_manager.get_stream_info(self.stream_hash)
out = yield self.stream_info_manager.get_stream_info(self.stream_hash)
key, stream_name, suggested_file_name = out
self.key = binascii.unhexlify(key)
self.stream_name = binascii.unhexlify(stream_name)
self.suggested_file_name = binascii.unhexlify(suggested_file_name)
def set_stream_info(stream_info):
key, stream_name, suggested_file_name = stream_info
self.key = binascii.unhexlify(key)
self.stream_name = binascii.unhexlify(stream_name)
self.suggested_file_name = binascii.unhexlify(suggested_file_name)
d.addCallback(set_stream_info)
return d
else:
return defer.succeed(True)
out = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
if out:
self.sd_hash = out[0]
else:
raise NoSuchStreamHash(self.stream_hash)
def delete_data(self):
d1 = self.stream_info_manager.get_blobs_for_stream(self.stream_hash)

View file

@ -33,6 +33,7 @@ class ReflectorServer(Protocol):
self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port)
self.blob_manager = self.factory.blob_manager
self.stream_info_manager = self.factory.stream_info_manager
self.lbry_file_manager = self.factory.lbry_file_manager
self.protocol_version = self.factory.protocol_version
self.received_handshake = False
self.peer_version = None
@ -107,6 +108,7 @@ class ReflectorServer(Protocol):
yield save_sd_info(self.stream_info_manager, sd_info)
yield self.stream_info_manager.save_sd_blob_hash_to_stream(sd_info['stream_hash'],
blob.blob_hash)
self.lbry_file_manager.add_lbry_file(sd_info['stream_hash'])
should_announce = True
# if we already have the head blob, set it to be announced now that we know it's
@ -399,10 +401,11 @@ class ReflectorServer(Protocol):
class ReflectorServerFactory(ServerFactory):
protocol = ReflectorServer
def __init__(self, peer_manager, blob_manager, stream_info_manager):
def __init__(self, peer_manager, blob_manager, stream_info_manager, lbry_file_manager):
self.peer_manager = peer_manager
self.blob_manager = blob_manager
self.stream_info_manager = stream_info_manager
self.lbry_file_manager = lbry_file_manager
self.protocol_version = REFLECTOR_V2
def buildProtocol(self, addr):

View file

@ -6,7 +6,6 @@ from lbrynet import lbry_file
from lbrynet import reflector
from lbrynet.core import BlobManager
from lbrynet.core import PeerManager
from lbrynet.core import RateLimiter
from lbrynet.core import Session
from lbrynet.core import StreamDescriptor
from lbrynet.lbry_file import EncryptedFileMetadataManager
@ -32,7 +31,6 @@ class TestReflector(unittest.TestCase):
peer_manager = PeerManager.PeerManager()
peer_finder = mocks.PeerFinder(5553, peer_manager, 2)
hash_announcer = mocks.Announcer()
rate_limiter = RateLimiter.DummyRateLimiter()
sd_identifier = StreamDescriptor.StreamDescriptorIdentifier()
self.expected_blobs = [
@ -52,7 +50,7 @@ class TestReflector(unittest.TestCase):
1015056
),
]
## Setup reflector client classes ##
self.db_dir, self.blob_dir = mk_db_and_blob_dir()
self.session = Session.Session(
conf.settings['data_rate'],
@ -63,7 +61,6 @@ class TestReflector(unittest.TestCase):
blob_dir=self.blob_dir,
peer_port=5553,
use_upnp=False,
rate_limiter=rate_limiter,
wallet=wallet,
blob_tracker_class=mocks.BlobAvailabilityTracker,
external_ip="127.0.0.1"
@ -75,19 +72,39 @@ class TestReflector(unittest.TestCase):
self.lbry_file_manager = EncryptedFileManager.EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier)
## Setup reflector server classes ##
self.server_db_dir, self.server_blob_dir = mk_db_and_blob_dir()
self.server_session = Session.Session(
conf.settings['data_rate'],
db_dir=self.server_db_dir,
node_id="abcd",
peer_finder=peer_finder,
hash_announcer=hash_announcer,
blob_dir=self.server_blob_dir,
peer_port=5553,
use_upnp=False,
wallet=wallet,
blob_tracker_class=mocks.BlobAvailabilityTracker,
external_ip="127.0.0.1"
)
self.server_blob_manager = BlobManager.DiskBlobManager(
hash_announcer, self.server_blob_dir, self.server_db_dir)
self.server_stream_info_manager = \
EncryptedFileMetadataManager.DBEncryptedFileMetadataManager(self.server_db_dir)
self.server_lbry_file_manager = EncryptedFileManager.EncryptedFileManager(
self.server_session, self.server_stream_info_manager,
sd_identifier)
d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: EncryptedFileOptions.add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: self.server_session.setup())
d.addCallback(lambda _: self.server_blob_manager.setup())
d.addCallback(lambda _: self.server_stream_info_manager.setup())
d.addCallback(lambda _: self.server_lbry_file_manager.setup())
def verify_equal(sd_info):
self.assertEqual(mocks.create_stream_sd_file, sd_info)
@ -123,7 +140,8 @@ class TestReflector(unittest.TestCase):
def start_server():
server_factory = reflector.ServerFactory(
peer_manager, self.server_blob_manager, self.server_stream_info_manager)
peer_manager, self.server_blob_manager, self.server_stream_info_manager,
self.server_lbry_file_manager)
from twisted.internet import reactor
port = 8943
while self.reflector_port is None:
@ -140,16 +158,18 @@ class TestReflector(unittest.TestCase):
def take_down_env(self):
d = defer.succeed(True)
if self.lbry_file_manager is not None:
d.addCallback(lambda _: self.lbry_file_manager.stop())
if self.session is not None:
d.addCallback(lambda _: self.session.shut_down())
if self.stream_info_manager is not None:
d.addCallback(lambda _: self.stream_info_manager.stop())
if self.server_blob_manager is not None:
d.addCallback(lambda _: self.server_blob_manager.stop())
if self.reflector_port is not None:
d.addCallback(lambda _: self.reflector_port.stopListening())
## Close client classes ##
d.addCallback(lambda _: self.lbry_file_manager.stop())
d.addCallback(lambda _: self.session.shut_down())
d.addCallback(lambda _: self.stream_info_manager.stop())
## Close server classes ##
d.addCallback(lambda _: self.server_blob_manager.stop())
d.addCallback(lambda _: self.server_lbry_file_manager.stop())
d.addCallback(lambda _: self.server_session.shut_down())
d.addCallback(lambda _: self.server_stream_info_manager.stop())
d.addCallback(lambda _: self.reflector_port.stopListening())
def delete_test_env():
try:
@ -186,6 +206,18 @@ class TestReflector(unittest.TestCase):
expected_sd_hash = self.expected_blobs[-1][0]
self.assertEqual(self.sd_hash, sd_hashes[0])
# check lbry file manager has the file
files = yield self.server_lbry_file_manager.lbry_files
self.assertEqual(1, len(files))
self.assertEqual(self.sd_hash, files[0].sd_hash)
self.assertEqual('test_file', files[0].file_name)
status = yield files[0].status()
self.assertEqual('stopped', status.running_status)
num_blobs = len(self.expected_blobs) -1 # subtract sd hash
self.assertEqual(num_blobs, status.num_completed)
self.assertEqual(num_blobs, status.num_known)
# check should_announce blobs on blob_manager
blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes()
self.assertEqual(2, len(blob_hashes))