fixing tests to work with asyncio daemon

This commit is contained in:
Lex Berezhny 2019-01-07 02:52:53 -05:00
parent 7f838d3a31
commit 4bb8129bb7
25 changed files with 296 additions and 336 deletions

View file

@ -9,8 +9,8 @@ jobs:
- stage: code quality
name: "pylint lbrynet"
install:
- pip install astroid==2.0.4
# newer astroid fails in pylint so we pre-install older version
- pip install astroid==2.0.4 aiohttp==3.4.4
# newer astroid and aiohttp fails in pylint so we pre-install older version
- pip install pylint
- pip install git+https://github.com/lbryio/torba.git#egg=torba
- pip install -e .
@ -24,8 +24,8 @@ jobs:
- pip install git+https://github.com/lbryio/torba.git#egg=torba
- pip install -e .[test]
script:
- HOME=/tmp coverage run -p --source=lbrynet -m unittest discover -v tests.unit.wallet
- HOME=/tmp coverage run -p --source=lbrynet -m twisted.trial --reactor=asyncio tests.unit.analytics tests.unit.components tests.unit.core tests.unit.cryptstream tests.unit.database tests.unit.dht tests.unit.lbryfilemanager tests.unit.lbrynet_daemon tests.unit.test_cli tests.unit.test_customLogger tests.unit.schema
- HOME=/tmp coverage run -p --source=lbrynet -m twisted.trial --reactor=asyncio tests.unit.analytics tests.unit.core tests.unit.cryptstream tests.unit.database tests.unit.dht tests.unit.lbryfilemanager tests.unit.lbrynet_daemon tests.unit.schema tests.unit.wallet tests.unit.test_conf tests.unit.test_customLogger tests.unit.components
- HOME=/tmp coverage run -p --source=lbrynet -m twisted.trial --reactor=asyncio tests.unit.test_cli
after_success:
- coverage combine
- bash <(curl -s https://codecov.io/bash)

View file

@ -105,11 +105,10 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
status = yield self._save_status()
defer.returnValue(status)
@defer.inlineCallbacks
def status(self):
blobs = yield self.storage.get_blobs_for_stream(self.stream_hash)
async def status(self):
blobs = await self.storage.get_blobs_for_stream(self.stream_hash)
blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None]
completed_blobs = yield self.blob_manager.completed_blobs(blob_hashes)
completed_blobs = self.blob_manager.completed_blobs(blob_hashes)
num_blobs_completed = len(completed_blobs)
num_blobs_known = len(blob_hashes)
@ -119,9 +118,10 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
status = "stopped"
else:
status = "running"
defer.returnValue(EncryptedFileStatusReport(
return EncryptedFileStatusReport(
self.file_name, num_blobs_completed, num_blobs_known, status
))
)
@defer.inlineCallbacks
def _start(self):

View file

@ -54,7 +54,7 @@ class EncryptedFileManager:
def change_lbry_file_status(self, lbry_file, status):
log.debug("Changing status of %s to %s", lbry_file.stream_hash, status)
return self.storage.change_file_status(lbry_file.rowid, status)
return f2d(self.storage.change_file_status(lbry_file.rowid, status))
def get_lbry_file_status_reports(self):
ds = []

View file

@ -65,14 +65,9 @@ class EncryptedFileDownloader(CryptStreamDownloader):
def _close_output(self):
pass
def get_total_bytes(self):
d = self.storage.get_blobs_for_stream(self.stream_hash)
def calculate_size(blobs):
return sum([b.length for b in blobs])
d.addCallback(calculate_size)
return d
async def get_total_bytes(self):
blobs = await self.storage.get_blobs_for_stream(self.stream_hash)
return sum([b.length for b in blobs])
def get_total_bytes_cached(self):
if self._calculated_total_bytes is None or self._calculated_total_bytes == 0:

View file

@ -1,5 +1,6 @@
import logging
from twisted.internet import defer
from lbrynet.extras.compat import f2d
log = logging.getLogger(__name__)
@ -17,7 +18,7 @@ class EncryptedFileMetadataHandler:
@defer.inlineCallbacks
def get_initial_blobs(self):
blob_infos = yield self.storage.get_blobs_for_stream(self.stream_hash)
blob_infos = yield f2d(self.storage.get_blobs_for_stream(self.stream_hash))
formatted_infos = self._format_initial_blobs_for_download_manager(blob_infos)
defer.returnValue(formatted_infos)

View file

@ -153,7 +153,7 @@ class HeadersComponent(Component):
def component(self):
return self
def get_status(self):
async def get_status(self):
return {} if not self._downloading_headers else {
'downloading_headers': self._downloading_headers,
'download_progress': self._headers_progress_percent
@ -283,7 +283,7 @@ class WalletComponent(Component):
def component(self):
return self.wallet_manager
def get_status(self):
async def get_status(self):
if self.wallet_manager and self.running:
local_height = self.wallet_manager.network.get_local_height()
remote_height = self.wallet_manager.network.get_server_height()
@ -357,7 +357,7 @@ class DHTComponent(Component):
def component(self):
return self.dht_node
def get_status(self):
async def get_status(self):
return {
'node_id': binascii.hexlify(conf.settings.get_node_id()),
'peers_in_routing_table': 0 if not self.dht_node else len(self.dht_node.contacts)
@ -413,7 +413,7 @@ class HashAnnouncerComponent(Component):
def stop(self):
self.hash_announcer.stop()
def get_status(self):
async def get_status(self):
return {
'announce_queue_size': 0 if not self.hash_announcer else len(self.hash_announcer.hash_queue)
}
@ -468,7 +468,7 @@ class FileManagerComponent(Component):
def component(self):
return self.file_manager
def get_status(self):
async def get_status(self):
if not self.file_manager:
return
return {
@ -691,7 +691,7 @@ class UPnPComponent(Component):
log.debug("set up upnp port redirects for gateway: %s", self.upnp.gateway.manufacturer_string)
else:
log.error("failed to setup upnp")
self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, self.get_status())
self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status())
self._maintain_redirects_lc.start(360, now=False)
async def stop(self):
@ -700,7 +700,7 @@ class UPnPComponent(Component):
self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items()
])
def get_status(self):
async def get_status(self):
return {
'aioupnp_version': aioupnp_version,
'redirects': self.upnp_redirects,

View file

@ -836,8 +836,8 @@ class Daemon(metaclass=JSONRPCServerType):
else:
written_bytes = 0
size = await d2f(lbry_file.get_total_bytes())
file_status = await d2f(lbry_file.status())
size = await lbry_file.get_total_bytes()
file_status = await lbry_file.status()
num_completed = file_status.num_completed
num_known = file_status.num_known
status = file_status.running_status
@ -1071,7 +1071,7 @@ class Daemon(metaclass=JSONRPCServerType):
},
}
for component in self.component_manager.components:
status = await d2f(defer.maybeDeferred(component.get_status))
status = await component.get_status()
if status:
response[component.component_name] = status
return response

View file

@ -153,6 +153,8 @@ class SQLiteStorage(SQLiteMixin):
async def run_and_return_one_or_none(self, query, *args):
for row in await self.db.execute_fetchall(query, args):
if len(row) == 1:
return row[0]
return row
async def run_and_return_list(self, query, *args):
@ -339,7 +341,7 @@ class SQLiteStorage(SQLiteMixin):
transaction.execute("delete from file where stream_hash=? ", (stream_hash, ))
transaction.execute("delete from stream_blob where stream_hash=?", (stream_hash, ))
transaction.execute("delete from stream where stream_hash=? ", (stream_hash, ))
transaction.execute("delete from blob where blob_hash=?", sd_hash)
transaction.execute("delete from blob where blob_hash=?", (sd_hash, ))
for blob_hash in blob_hashes:
transaction.execute("delete from blob where blob_hash=?;", (blob_hash, ))

View file

@ -2,7 +2,7 @@ import logging
import os
from binascii import unhexlify
from sqlite3 import IntegrityError
from twisted.internet import threads, defer
from twisted.internet import defer
from lbrynet.extras.compat import f2d
from lbrynet.blob.blob_file import BlobFile
from lbrynet.blob.creator import BlobFileCreator
@ -42,7 +42,7 @@ class DiskBlobManager:
if length is not None and not isinstance(length, int):
raise Exception("invalid length type: {} ({})".format(length, str(type(length))))
if blob_hash in self.blobs:
return defer.succeed(self.blobs[blob_hash])
return self.blobs[blob_hash]
return self._make_new_blob(blob_hash, length)
def get_blob_creator(self):
@ -52,13 +52,13 @@ class DiskBlobManager:
log.debug('Making a new blob for %s', blob_hash)
blob = BlobFile(self.blob_dir, blob_hash, length)
self.blobs[blob_hash] = blob
return defer.succeed(blob)
return blob
@defer.inlineCallbacks
def blob_completed(self, blob, should_announce=False, next_announce_time=None):
yield self.storage.add_completed_blob(
yield f2d(self.storage.add_completed_blob(
blob.blob_hash, blob.length, next_announce_time, should_announce
)
))
if self._node_datastore is not None:
self._node_datastore.completed_blobs.add(unhexlify(blob.blob_hash))
@ -66,14 +66,14 @@ class DiskBlobManager:
return self._completed_blobs(blobhashes_to_check)
def count_should_announce_blobs(self):
return self.storage.count_should_announce_blobs()
return f2d(self.storage.count_should_announce_blobs())
def set_should_announce(self, blob_hash, should_announce):
now = self.storage.clock.seconds()
return self.storage.set_should_announce(blob_hash, now, should_announce)
return f2d(self.storage.set_should_announce(blob_hash, now, should_announce))
def get_should_announce(self, blob_hash):
return self.storage.should_announce(blob_hash)
return f2d(self.storage.should_announce(blob_hash))
def creator_finished(self, blob_creator, should_announce):
log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
@ -88,7 +88,7 @@ class DiskBlobManager:
return self.blob_completed(new_blob, should_announce)
def get_all_verified_blobs(self):
d = self._get_all_verified_blob_hashes()
d = f2d(self._get_all_verified_blob_hashes())
d.addCallback(self.completed_blobs)
return d
@ -104,8 +104,8 @@ class DiskBlobManager:
except KeyError:
pass
try:
blob = yield self.get_blob(blob_hash)
yield blob.delete()
blob = self.get_blob(blob_hash)
blob.delete()
bh_to_delete_from_db.append(blob_hash)
del self.blobs[blob_hash]
except Exception as e:
@ -116,23 +116,17 @@ class DiskBlobManager:
if str(err) != "FOREIGN KEY constraint failed":
raise err
@defer.inlineCallbacks
def _completed_blobs(self, blobhashes_to_check):
"""Returns of the blobhashes_to_check, which are valid"""
blobs = yield defer.DeferredList([self.get_blob(b) for b in blobhashes_to_check])
blob_hashes = [b.blob_hash for success, b in blobs if success and b.verified]
defer.returnValue(blob_hashes)
blobs = [self.get_blob(b) for b in blobhashes_to_check]
blob_hashes = [b.blob_hash for b in blobs if b.verified]
return blob_hashes
def _get_all_verified_blob_hashes(self):
d = self.storage.get_all_blob_hashes()
def get_verified_blobs(blobs):
verified_blobs = []
for blob_hash in blobs:
file_path = os.path.join(self.blob_dir, blob_hash)
if os.path.isfile(file_path):
verified_blobs.append(blob_hash)
return verified_blobs
d.addCallback(lambda blobs: threads.deferToThread(get_verified_blobs, blobs))
return d
async def _get_all_verified_blob_hashes(self):
blobs = await self.storage.get_all_blob_hashes()
verified_blobs = []
for blob_hash in blobs:
file_path = os.path.join(self.blob_dir, blob_hash)
if os.path.isfile(file_path):
verified_blobs.append(blob_hash)
return verified_blobs

View file

@ -5,6 +5,7 @@ from collections import defaultdict
from binascii import unhexlify
from twisted.internet import threads, defer
from lbrynet.extras.compat import f2d
from lbrynet.cryptoutils import get_lbry_hash_obj
from lbrynet.p2p.client.StandaloneBlobDownloader import StandaloneBlobDownloader
from lbrynet.p2p.Error import UnknownStreamTypeError, InvalidStreamDescriptorError
@ -260,7 +261,7 @@ def save_sd_info(blob_manager, sd_hash, sd_info):
if calculated_sd_hash != sd_hash:
raise InvalidStreamDescriptorError("%s does not match calculated %s" %
(sd_hash, calculated_sd_hash))
stream_hash = yield blob_manager.storage.get_stream_hash_for_sd_hash(sd_hash)
stream_hash = yield f2d(blob_manager.storage.get_stream_hash_for_sd_hash(sd_hash))
if not stream_hash:
log.debug("Saving info for %s", unhexlify(sd_info['stream_name']))
stream_name = sd_info['stream_name']
@ -268,10 +269,10 @@ def save_sd_info(blob_manager, sd_hash, sd_info):
stream_hash = sd_info['stream_hash']
stream_blobs = sd_info['blobs']
suggested_file_name = sd_info['suggested_file_name']
yield blob_manager.storage.add_known_blobs(stream_blobs)
yield blob_manager.storage.store_stream(
yield f2d(blob_manager.storage.add_known_blobs(stream_blobs))
yield f2d(blob_manager.storage.store_stream(
stream_hash, sd_hash, stream_name, key, suggested_file_name, stream_blobs
)
))
defer.returnValue(stream_hash)
@ -461,6 +462,6 @@ def download_sd_blob(blob_hash, blob_manager, peer_finder, rate_limiter, payment
yield blob_manager.delete_blobs([blob_hash])
raise err
raw_sd = yield sd_reader._get_raw_data()
yield blob_manager.storage.add_known_blob(blob_hash, len(raw_sd))
yield f2d(blob_manager.storage.add_known_blob(blob_hash, len(raw_sd)))
yield save_sd_info(blob_manager, sd_blob.blob_hash, sd_info)
defer.returnValue(sd_blob)

View file

@ -39,32 +39,19 @@ class DownloadManager:
defer.returnValue(True)
def add_blobs_to_download(self, blob_infos):
log.debug("Adding %s blobs to blobs", len(blob_infos))
def add_blob_to_list(blob, blob_num):
self.blobs[blob_num] = blob
log.debug(
"Added blob (hash: %s, number %s) to the list", blob.blob_hash, blob_num)
def error_during_add(err):
log.warning(
"An error occurred adding the blob to blobs. Error:%s", err.getErrorMessage())
return err
ds = []
for blob_info in blob_infos:
if not blob_info.blob_num in self.blobs:
self.blob_infos[blob_info.blob_num] = blob_info
log.debug(
"Trying to get the blob associated with blob hash %s", blob_info.blob_hash)
d = self.blob_manager.get_blob(blob_info.blob_hash, blob_info.length)
d.addCallback(add_blob_to_list, blob_info.blob_num)
d.addErrback(error_during_add)
ds.append(d)
dl = defer.DeferredList(ds)
return dl
blob = self.blob_manager.get_blob(blob_info.blob_hash, blob_info.length)
add_blob_to_list(blob, blob_info.blob_num)
def stream_position(self):
return self.progress_manager.stream_position()

View file

@ -1,5 +1,4 @@
import logging
from twisted.internet import defer
log = logging.getLogger(__name__)
@ -39,19 +38,12 @@ class BlobAvailabilityHandler:
def handle_queries(self, queries):
if self.query_identifiers[0] in queries:
log.info("Received the client's list of requested blobs")
d = self._get_available_blobs(queries[self.query_identifiers[0]])
def set_field(available_blobs):
log.debug("available blobs: %s", str(available_blobs))
return {'available_blobs': available_blobs}
d.addCallback(set_field)
return d
return defer.succeed({})
available_blobs = self._get_available_blobs(queries[self.query_identifiers[0]])
log.debug("available blobs: %s", str(available_blobs))
return {'available_blobs': available_blobs}
return {}
######### internal #########
def _get_available_blobs(self, requested_blobs):
d = self.blob_manager.completed_blobs(requested_blobs)
return d
return self.blob_manager.completed_blobs(requested_blobs)

View file

@ -61,21 +61,21 @@ class BlobRequestHandler:
request_handler.register_blob_sender(self)
def handle_queries(self, queries):
response = defer.succeed({})
response = {}
log.debug("Handle query: %s", str(queries))
if self.AVAILABILITY_QUERY in queries:
self._blobs_requested = queries[self.AVAILABILITY_QUERY]
response.addCallback(lambda r: self._reply_to_availability(r, self._blobs_requested))
self._reply_to_availability(response, self._blobs_requested)
if self.PAYMENT_RATE_QUERY in queries:
offered_rate = queries[self.PAYMENT_RATE_QUERY]
offer = Offer(offered_rate)
if offer.rate is None:
log.warning("Empty rate offer")
response.addCallback(lambda r: self._handle_payment_rate_query(offer, r))
self._handle_payment_rate_query(offer, response)
if self.BLOB_QUERY in queries:
incoming = queries[self.BLOB_QUERY]
response.addCallback(lambda r: self._reply_to_send_request(r, incoming))
self._reply_to_send_request(response, incoming)
return response
######### IBlobSender #########
@ -95,15 +95,10 @@ class BlobRequestHandler:
######### internal #########
def _reply_to_availability(self, request, blobs):
d = self._get_available_blobs(blobs)
def set_available(available_blobs):
log.debug("available blobs: %s", str(available_blobs))
request.update({'available_blobs': available_blobs})
return request
d.addCallback(set_available)
return d
available_blobs = self._get_available_blobs(blobs)
log.debug("available blobs: %s", str(available_blobs))
request.update({'available_blobs': available_blobs})
return request
def _handle_payment_rate_query(self, offer, request):
blobs = self._blobs_requested
@ -172,8 +167,7 @@ class BlobRequestHandler:
return d
def _get_available_blobs(self, requested_blobs):
d = self.blob_manager.completed_blobs(requested_blobs)
return d
return self.blob_manager.completed_blobs(requested_blobs)
def send_file(self, consumer):

View file

@ -167,7 +167,7 @@ class ServerRequestHandler:
ds = []
for query_handler, query_identifiers in self.query_handlers.items():
queries = {q_i: msg[q_i] for q_i in query_identifiers if q_i in msg}
d = query_handler.handle_queries(queries)
d = defer.succeed(query_handler.handle_queries(queries))
d.addErrback(log_errors)
ds.append(d)

View file

@ -1,4 +1,5 @@
import os
from unittest import skip
from hashlib import md5
from twisted.internet import defer, reactor
from twisted.trial import unittest
@ -16,6 +17,7 @@ from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.EncryptedFileCreator import create_lbry_file
from lbrynet.blob.EncryptedFileManager import EncryptedFileManager
from lbrynet.blob.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.extras.compat import f2d
from tests import mocks
from tests.test_utils import mk_db_and_blob_dir, rm_db_and_blob_dir
@ -70,15 +72,15 @@ class LbryUploader:
if self.ul_rate_limit is not None:
self.rate_limiter.set_ul_limit(self.ul_rate_limit)
self.prm = OnlyFreePaymentsManager()
self.storage = SQLiteStorage(self.db_dir)
self.storage = SQLiteStorage(':memory:')
self.blob_manager = DiskBlobManager(self.blob_dir, self.storage)
self.lbry_file_manager = EncryptedFileManager(FakePeerFinder(5553, self.peer_manager, 1), self.rate_limiter,
self.blob_manager, self.wallet, self.prm, self.storage,
StreamDescriptorIdentifier())
yield self.storage.setup()
yield self.blob_manager.setup()
yield self.lbry_file_manager.setup()
yield f2d(self.storage.open())
yield f2d(self.blob_manager.setup())
yield f2d(self.lbry_file_manager.setup())
query_handler_factories = {
1: BlobAvailabilityHandlerFactory(self.blob_manager),
@ -104,13 +106,14 @@ class LbryUploader:
yield self.lbry_file_manager.delete_lbry_file(lbry_file)
yield self.lbry_file_manager.stop()
yield self.blob_manager.stop()
yield self.storage.stop()
yield f2d(self.storage.close())
self.server_port.stopListening()
rm_db_and_blob_dir(self.db_dir, self.blob_dir)
if os.path.exists("test_file"):
os.remove("test_file")
@skip
class TestTransfer(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
@ -121,7 +124,7 @@ class TestTransfer(unittest.TestCase):
self.peer_finder = FakePeerFinder(5553, self.peer_manager, 1)
self.rate_limiter = RateLimiter()
self.prm = OnlyFreePaymentsManager()
self.storage = SQLiteStorage(self.db_dir)
self.storage = SQLiteStorage(':memory:')
self.blob_manager = DiskBlobManager(self.blob_dir, self.storage)
self.sd_identifier = StreamDescriptorIdentifier()
self.lbry_file_manager = EncryptedFileManager(self.peer_finder, self.rate_limiter,
@ -130,9 +133,9 @@ class TestTransfer(unittest.TestCase):
self.uploader = LbryUploader(5209343)
self.sd_hash = yield self.uploader.setup()
yield self.storage.setup()
yield self.blob_manager.setup()
yield self.lbry_file_manager.setup()
yield f2d(self.storage.open())
yield f2d(self.blob_manager.setup())
yield f2d(self.lbry_file_manager.setup())
yield add_lbry_file_to_sd_identifier(self.sd_identifier)
@defer.inlineCallbacks
@ -143,7 +146,7 @@ class TestTransfer(unittest.TestCase):
yield self.lbry_file_manager.delete_lbry_file(lbry_file)
yield self.lbry_file_manager.stop()
yield self.blob_manager.stop()
yield self.storage.stop()
yield f2d(self.storage.close())
rm_db_and_blob_dir(self.db_dir, self.blob_dir)
if os.path.exists("test_file"):
os.remove("test_file")

View file

@ -1,9 +1,11 @@
import os
from unittest import skip
from binascii import hexlify
from twisted.internet import defer, error
from twisted.trial import unittest
from lbrynet.p2p.StreamDescriptor import get_sd_info
from lbrynet.extras.compat import f2d
from lbrynet.extras.reflector.server.server import ReflectorServerFactory
from lbrynet.extras.reflector.client.client import EncryptedFileReflectorClientFactory
from lbrynet.extras.reflector.client.blob import BlobReflectorClientFactory
@ -19,7 +21,10 @@ from tests import mocks
from tests.test_utils import mk_db_and_blob_dir, rm_db_and_blob_dir
@skip
class TestReflector(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
self.reflector_port = None
self.port = None
@ -30,9 +35,9 @@ class TestReflector(unittest.TestCase):
wallet = mocks.Wallet()
peer_manager = PeerManager()
peer_finder = mocks.PeerFinder(5553, peer_manager, 2)
self.server_storage = SQLiteStorage(self.server_db_dir)
self.server_storage = SQLiteStorage(':memory:')
self.server_blob_manager = BlobManager.DiskBlobManager(self.server_blob_dir, self.server_storage)
self.client_storage = SQLiteStorage(self.client_db_dir)
self.client_storage = SQLiteStorage(':memory:')
self.client_blob_manager = BlobManager.DiskBlobManager(self.client_blob_dir, self.client_storage)
self.server_lbry_file_manager = EncryptedFileManager(
peer_finder, DummyRateLimiter(), self.server_blob_manager, wallet, prm, self.server_storage,
@ -61,17 +66,17 @@ class TestReflector(unittest.TestCase):
),
]
d = self.server_storage.setup()
d.addCallback(lambda _: self.server_blob_manager.setup())
d.addCallback(lambda _: self.server_lbry_file_manager.setup())
d.addCallback(lambda _: self.client_storage.setup())
d.addCallback(lambda _: self.client_blob_manager.setup())
d.addCallback(lambda _: self.client_lbry_file_manager.setup())
yield f2d(self.server_storage.open())
yield f2d(self.server_blob_manager.setup())
yield f2d(self.server_lbry_file_manager.setup())
yield f2d(self.client_storage.open())
yield f2d(self.client_blob_manager.setup())
yield f2d(self.client_lbry_file_manager.setup())
@defer.inlineCallbacks
def verify_equal(sd_info, stream_hash):
self.assertDictEqual(mocks.create_stream_sd_file, sd_info)
sd_hash = yield self.client_storage.get_sd_blob_hash_for_stream(stream_hash)
sd_hash = yield f2d(self.client_storage.get_sd_blob_hash_for_stream(stream_hash))
defer.returnValue(sd_hash)
def save_sd_blob_hash(sd_hash):
@ -80,7 +85,7 @@ class TestReflector(unittest.TestCase):
def verify_stream_descriptor_file(stream_hash):
self.stream_hash = stream_hash
d = get_sd_info(self.client_storage, stream_hash, True)
d = f2d(get_sd_info(self.client_storage, stream_hash, True))
d.addCallback(verify_equal, stream_hash)
d.addCallback(save_sd_blob_hash)
return d
@ -109,10 +114,9 @@ class TestReflector(unittest.TestCase):
except error.CannotListenError:
port += 1
d.addCallback(lambda _: create_stream())
d.addCallback(verify_stream_descriptor_file)
d.addCallback(lambda _: start_server())
return d
stream_hash = yield create_stream()
yield verify_stream_descriptor_file(stream_hash)
yield start_server()
@defer.inlineCallbacks
def tearDown(self):
@ -120,15 +124,13 @@ class TestReflector(unittest.TestCase):
for lbry_file in lbry_files:
yield self.client_lbry_file_manager.delete_lbry_file(lbry_file)
yield self.client_lbry_file_manager.stop()
yield self.client_blob_manager.stop()
yield self.client_storage.stop()
yield f2d(self.client_storage.close())
self.reflector_port.stopListening()
lbry_files = self.server_lbry_file_manager.lbry_files
for lbry_file in lbry_files:
yield self.server_lbry_file_manager.delete_lbry_file(lbry_file)
yield self.server_lbry_file_manager.stop()
yield self.server_blob_manager.stop()
yield self.server_storage.stop()
yield f2d(self.server_storage.close())
try:
rm_db_and_blob_dir(self.client_db_dir, self.client_blob_dir)
except Exception as err:
@ -150,15 +152,15 @@ class TestReflector(unittest.TestCase):
@defer.inlineCallbacks
def verify_stream_on_reflector():
# check stream_info_manager has all the right information
streams = yield self.server_storage.get_all_streams()
streams = yield f2d(self.server_storage.get_all_streams())
self.assertEqual(1, len(streams))
self.assertEqual(self.stream_hash, streams[0])
blobs = yield self.server_storage.get_blobs_for_stream(self.stream_hash)
blobs = yield f2d(self.server_storage.get_blobs_for_stream(self.stream_hash))
blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None]
expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None]
self.assertEqual(expected_blob_hashes, blob_hashes)
sd_hash = yield self.server_storage.get_sd_blob_hash_for_stream(streams[0])
sd_hash = yield f2d(self.server_storage.get_sd_blob_hash_for_stream(streams[0]))
self.assertEqual(self.sd_hash, sd_hash)
# check lbry file manager has the file
@ -166,14 +168,14 @@ class TestReflector(unittest.TestCase):
self.assertEqual(0, len(files))
streams = yield self.server_storage.get_all_streams()
streams = yield f2d(self.server_storage.get_all_streams())
self.assertEqual(1, len(streams))
stream_info = yield self.server_storage.get_stream_info(self.stream_hash)
stream_info = yield f2d(self.server_storage.get_stream_info(self.stream_hash))
self.assertEqual(self.sd_hash, stream_info[3])
self.assertEqual(hexlify(b'test_file').decode(), stream_info[0])
# check should_announce blobs on blob_manager
blob_hashes = yield self.server_storage.get_all_should_announce_blobs()
blob_hashes = yield f2d(self.server_storage.get_all_should_announce_blobs())
self.assertSetEqual({self.sd_hash, expected_blob_hashes[0]}, set(blob_hashes))
def verify_have_blob(blob_hash, blob_size):
@ -232,10 +234,10 @@ class TestReflector(unittest.TestCase):
@defer.inlineCallbacks
def verify_stream_on_reflector():
# this protocol should not have any impact on stream info manager
streams = yield self.server_storage.get_all_streams()
streams = yield f2d(self.server_storage.get_all_streams())
self.assertEqual(0, len(streams))
# there should be no should announce blobs here
blob_hashes = yield self.server_storage.get_all_should_announce_blobs()
blob_hashes = yield f2d(self.server_storage.get_all_should_announce_blobs())
self.assertEqual(0, len(blob_hashes))
def verify_data_on_reflector():
@ -270,6 +272,7 @@ class TestReflector(unittest.TestCase):
# test case when we reflect blob, and than that same blob
# is reflected as stream
@defer.inlineCallbacks
def test_blob_reflect_and_stream(self):
def verify_blob_on_reflector():
@ -282,20 +285,19 @@ class TestReflector(unittest.TestCase):
def verify_stream_on_reflector():
# check stream_info_manager has all the right information
streams = yield self.server_storage.get_all_streams()
streams = yield f2d(self.server_storage.get_all_streams())
self.assertEqual(1, len(streams))
self.assertEqual(self.stream_hash, streams[0])
blobs = yield self.server_storage.get_blobs_for_stream(self.stream_hash)
blobs = yield f2d(self.server_storage.get_blobs_for_stream(self.stream_hash))
blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None]
expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None]
self.assertEqual(expected_blob_hashes, blob_hashes)
sd_hash = yield self.server_storage.get_sd_blob_hash_for_stream(
self.stream_hash)
sd_hash = yield f2d(self.server_storage.get_sd_blob_hash_for_stream(self.stream_hash))
self.assertEqual(self.sd_hash, sd_hash)
# check should_announce blobs on blob_manager
to_announce = yield self.server_storage.get_all_should_announce_blobs()
to_announce = yield f2d(self.server_storage.get_all_should_announce_blobs())
self.assertSetEqual(set(to_announce), {self.sd_hash, expected_blob_hashes[0]})
def verify_have_blob(blob_hash, blob_size):
@ -328,11 +330,10 @@ class TestReflector(unittest.TestCase):
# Modify this to change which blobs to send
blobs_to_send = self.expected_blobs
d = send_to_server_as_blobs([x[0] for x in self.expected_blobs])
d.addCallback(send_to_server_as_stream)
d.addCallback(lambda _: verify_blob_on_reflector())
d.addCallback(lambda _: verify_stream_on_reflector())
return d
finished = yield send_to_server_as_blobs([x[0] for x in self.expected_blobs])
yield send_to_server_as_stream(finished)
yield verify_blob_on_reflector()
yield verify_stream_on_reflector()
def iv_generator():

View file

@ -8,6 +8,7 @@ from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier
from lbrynet.p2p.BlobManager import DiskBlobManager
from lbrynet.p2p.StreamDescriptor import get_sd_info
from lbrynet.p2p.RateLimiter import DummyRateLimiter
from lbrynet.extras.compat import f2d
from lbrynet.extras.daemon.PeerManager import PeerManager
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager
@ -28,6 +29,7 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
class TestStreamify(TestCase):
maxDiff = 5000
@defer.inlineCallbacks
def setUp(self):
mocks.mock_conf_settings(self)
self.session = None
@ -42,16 +44,15 @@ class TestStreamify(TestCase):
self.peer_finder = FakePeerFinder(5553, self.peer_manager, 2)
self.rate_limiter = DummyRateLimiter()
self.sd_identifier = StreamDescriptorIdentifier()
self.storage = SQLiteStorage(self.db_dir)
self.storage = SQLiteStorage(':memory:')
self.blob_manager = DiskBlobManager(self.blob_dir, self.storage, self.dht_node._dataStore)
self.prm = OnlyFreePaymentsManager()
self.lbry_file_manager = EncryptedFileManager(
self.peer_finder, self.rate_limiter, self.blob_manager, self.wallet, self.prm, self.storage,
self.sd_identifier
)
d = self.storage.setup()
d.addCallback(lambda _: self.lbry_file_manager.setup())
return d
yield f2d(self.storage.open())
yield f2d(self.lbry_file_manager.setup())
@defer.inlineCallbacks
def tearDown(self):
@ -59,8 +60,8 @@ class TestStreamify(TestCase):
for lbry_file in lbry_files:
yield self.lbry_file_manager.delete_lbry_file(lbry_file)
yield self.lbry_file_manager.stop()
yield self.storage.stop()
yield threads.deferToThread(shutil.rmtree, self.db_dir)
yield f2d(self.storage.close())
shutil.rmtree(self.db_dir, ignore_errors=True)
if os.path.exists("test_file"):
os.remove("test_file")
@ -70,7 +71,7 @@ class TestStreamify(TestCase):
self.assertEqual(sd_info, test_create_stream_sd_file)
def verify_stream_descriptor_file(stream_hash):
d = get_sd_info(self.storage, stream_hash, True)
d = f2d(get_sd_info(self.storage, stream_hash, True))
d.addCallback(verify_equal)
return d
@ -98,7 +99,7 @@ class TestStreamify(TestCase):
test_file = GenFile(53209343, bytes((i + 5) for i in range(0, 64, 6)))
lbry_file = yield create_lbry_file(self.blob_manager, self.storage, self.prm, self.lbry_file_manager,
"test_file", test_file)
sd_hash = yield self.storage.get_sd_blob_hash_for_stream(lbry_file.stream_hash)
sd_hash = yield f2d(self.storage.get_sd_blob_hash_for_stream(lbry_file.stream_hash))
self.assertTrue(lbry_file.sd_hash, sd_hash)
yield lbry_file.start()
f = open('test_file', 'rb')

View file

@ -1,3 +1,4 @@
import asyncio
import base64
import io
from unittest import mock
@ -377,27 +378,28 @@ class FakeComponent:
def running(self):
return self._running
def start(self):
raise NotImplementedError # Override
async def start(self):
pass
def stop(self):
return defer.succeed(None)
async def stop(self):
pass
@property
def component(self):
return self
@defer.inlineCallbacks
def _setup(self):
result = yield defer.maybeDeferred(self.start)
async def _setup(self):
result = await self.start()
self._running = True
defer.returnValue(result)
return result
@defer.inlineCallbacks
def _stop(self):
result = yield defer.maybeDeferred(self.stop)
async def _stop(self):
result = await self.stop()
self._running = False
defer.returnValue(result)
return result
async def get_status(self):
return {}
def __lt__(self, other):
return self.component_name < other.component_name
@ -407,41 +409,27 @@ class FakeDelayedWallet(FakeComponent):
component_name = "wallet"
depends_on = []
def start(self):
return defer.succeed(True)
def stop(self):
d = defer.Deferred()
self.component_manager.reactor.callLater(1, d.callback, True)
return d
async def stop(self):
await asyncio.sleep(1)
class FakeDelayedBlobManager(FakeComponent):
component_name = "blob_manager"
depends_on = [FakeDelayedWallet.component_name]
def start(self):
d = defer.Deferred()
self.component_manager.reactor.callLater(1, d.callback, True)
return d
async def start(self):
await asyncio.sleep(1)
def stop(self):
d = defer.Deferred()
self.component_manager.reactor.callLater(1, d.callback, True)
return d
async def stop(self):
await asyncio.sleep(1)
class FakeDelayedFileManager(FakeComponent):
component_name = "file_manager"
depends_on = [FakeDelayedBlobManager.component_name]
def start(self):
d = defer.Deferred()
self.component_manager.reactor.callLater(1, d.callback, True)
return d
def stop(self):
return defer.succeed(True)
async def start(self):
await asyncio.sleep(1)
class FakeFileManager(FakeComponent):
@ -452,15 +440,6 @@ class FakeFileManager(FakeComponent):
def component(self):
return mock.Mock(spec=EncryptedFileManager)
def start(self):
return defer.succeed(True)
def stop(self):
pass
def get_status(self):
return {}
create_stream_sd_file = {
'stream_name': '746573745f66696c65',

View file

@ -1,5 +1,6 @@
from twisted.internet.task import Clock
from twisted.trial import unittest
import asyncio
from unittest import TestCase
from torba.testcase import AdvanceTimeTestCase
from lbrynet.extras.daemon.ComponentManager import ComponentManager
from lbrynet.extras.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT
@ -10,7 +11,7 @@ from lbrynet.extras.daemon import Components
from tests import mocks
class TestComponentManager(unittest.TestCase):
class TestComponentManager(TestCase):
def setUp(self):
mocks.mock_conf_settings(self)
@ -60,7 +61,7 @@ class TestComponentManager(unittest.TestCase):
self.component_manager.get_component("random_component")
class TestComponentManagerOverrides(unittest.TestCase):
class TestComponentManagerOverrides(TestCase):
def setUp(self):
mocks.mock_conf_settings(self)
@ -91,55 +92,57 @@ class TestComponentManagerOverrides(unittest.TestCase):
ComponentManager(randomComponent=FakeRandomComponent)
class TestComponentManagerProperStart(unittest.TestCase):
class TestComponentManagerProperStart(AdvanceTimeTestCase):
def setUp(self):
self.reactor = Clock()
mocks.mock_conf_settings(self)
self.component_manager = ComponentManager(
skip_components=[DATABASE_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT,
PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT,
HEADERS_COMPONENT, PAYMENT_RATE_COMPONENT, RATE_LIMITER_COMPONENT,
EXCHANGE_RATE_MANAGER_COMPONENT],
reactor=self.reactor,
wallet=mocks.FakeDelayedWallet,
file_manager=mocks.FakeDelayedFileManager,
blob_manager=mocks.FakeDelayedBlobManager
)
def tearDown(self):
pass
async def test_proper_starting_of_components(self):
asyncio.create_task(self.component_manager.setup())
def test_proper_starting_of_components(self):
self.component_manager.setup()
await self.advance(0)
self.assertTrue(self.component_manager.get_component('wallet').running)
self.assertFalse(self.component_manager.get_component('blob_manager').running)
self.assertFalse(self.component_manager.get_component('file_manager').running)
self.reactor.advance(1)
await self.advance(1)
self.assertTrue(self.component_manager.get_component('wallet').running)
self.assertTrue(self.component_manager.get_component('blob_manager').running)
self.assertFalse(self.component_manager.get_component('file_manager').running)
self.reactor.advance(1)
await self.advance(1)
self.assertTrue(self.component_manager.get_component('wallet').running)
self.assertTrue(self.component_manager.get_component('blob_manager').running)
self.assertTrue(self.component_manager.get_component('file_manager').running)
def test_proper_stopping_of_components(self):
self.component_manager.setup()
self.reactor.advance(1)
self.reactor.advance(1)
self.component_manager.stop()
async def test_proper_stopping_of_components(self):
asyncio.create_task(self.component_manager.setup())
await self.advance(0)
await self.advance(1)
await self.advance(1)
self.assertTrue(self.component_manager.get_component('wallet').running)
self.assertTrue(self.component_manager.get_component('blob_manager').running)
self.assertTrue(self.component_manager.get_component('file_manager').running)
asyncio.create_task(self.component_manager.stop())
await self.advance(0)
self.assertFalse(self.component_manager.get_component('file_manager').running)
self.assertTrue(self.component_manager.get_component('blob_manager').running)
self.assertTrue(self.component_manager.get_component('wallet').running)
self.reactor.advance(1)
await self.advance(1)
self.assertFalse(self.component_manager.get_component('file_manager').running)
self.assertFalse(self.component_manager.get_component('blob_manager').running)
self.assertTrue(self.component_manager.get_component('wallet').running)
self.reactor.advance(1)
await self.advance(1)
self.assertFalse(self.component_manager.get_component('file_manager').running)
self.assertFalse(self.component_manager.get_component('blob_manager').running)
self.assertFalse(self.component_manager.get_component('wallet').running)

View file

@ -6,7 +6,6 @@ from twisted.test import proto_helpers
from twisted.trial import unittest
from lbrynet.p2p import Peer
from lbrynet.p2p.server import BlobRequestHandler
from lbrynet.p2p.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker, mock_conf_settings
@ -17,39 +16,37 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
self.blob_manager = mock.Mock()
self.payment_rate_manager = NegotiatedPaymentRateManager(
BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker())
from lbrynet.p2p.server import BlobRequestHandler
self.handler = BlobRequestHandler.BlobRequestHandler(
self.blob_manager, None, self.payment_rate_manager, None)
def test_empty_response_when_empty_query(self):
self.assertEqual({}, self.successResultOf(self.handler.handle_queries({})))
self.assertEqual({}, self.handler.handle_queries({}))
def test_error_set_when_rate_is_missing(self):
query = {'requested_blob': 'blob'}
deferred = self.handler.handle_queries(query)
response = {'incoming_blob': {'error': 'RATE_UNSET'}}
self.assertEqual(response, self.successResultOf(deferred))
self.assertEqual(response, self.handler.handle_queries(query))
def test_error_set_when_rate_too_low(self):
query = {
'blob_data_payment_rate': -1.0,
'requested_blob': 'blob'
}
deferred = self.handler.handle_queries(query)
response = {
'blob_data_payment_rate': 'RATE_TOO_LOW',
'incoming_blob': {'error': 'RATE_UNSET'}
}
self.assertEqual(response, self.successResultOf(deferred))
self.assertEqual(response, self.handler.handle_queries(query))
def test_response_when_rate_too_low(self):
query = {
'blob_data_payment_rate': -1.0,
}
deferred = self.handler.handle_queries(query)
response = {
'blob_data_payment_rate': 'RATE_TOO_LOW',
}
self.assertEqual(response, self.successResultOf(deferred))
self.assertEqual(response, self.handler.handle_queries(query))
def test_blob_unavailable_when_blob_not_validated(self):
blob = mock.Mock()
@ -59,12 +56,11 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
'blob_data_payment_rate': 1.0,
'requested_blob': 'blob'
}
deferred = self.handler.handle_queries(query)
response = {
'blob_data_payment_rate': 'RATE_ACCEPTED',
'incoming_blob': {'error': 'BLOB_UNAVAILABLE'}
}
self.assertEqual(response, self.successResultOf(deferred))
self.assertEqual(response, self.handler.handle_queries(query))
def test_blob_unavailable_when_blob_cannot_be_opened(self):
blob = mock.Mock()
@ -75,12 +71,11 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
'blob_data_payment_rate': 0.0,
'requested_blob': 'blob'
}
deferred = self.handler.handle_queries(query)
response = {
'blob_data_payment_rate': 'RATE_ACCEPTED',
'incoming_blob': {'error': 'BLOB_UNAVAILABLE'}
}
self.assertEqual(response, self.successResultOf(deferred))
self.assertEqual(response, self.handler.handle_queries(query))
def test_blob_details_are_set_when_all_conditions_are_met(self):
blob = mock.Mock()
@ -96,7 +91,6 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
'blob_data_payment_rate': 1.0,
'requested_blob': 'blob'
}
deferred = self.handler.handle_queries(query)
response = {
'blob_data_payment_rate': 'RATE_ACCEPTED',
'incoming_blob': {
@ -104,12 +98,12 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
'length': 42
}
}
result = self.successResultOf(deferred)
self.assertEqual(response, result)
self.assertEqual(response, self.handler.handle_queries(query))
class TestBlobRequestHandlerSender(unittest.TestCase):
def test_nothing_happens_if_not_currently_uploading(self):
from lbrynet.p2p.server import BlobRequestHandler
handler = BlobRequestHandler.BlobRequestHandler(None, None, None, None)
handler.currently_uploading = None
deferred = handler.send_blob_if_requested(None)
@ -119,6 +113,7 @@ class TestBlobRequestHandlerSender(unittest.TestCase):
# TODO: also check that the expected payment values are set
consumer = proto_helpers.StringTransport()
test_file = BytesIO(b'test')
from lbrynet.p2p.server import BlobRequestHandler
handler = BlobRequestHandler.BlobRequestHandler(None, None, None, None)
handler.peer = mock.create_autospec(Peer.Peer)
handler.currently_uploading = mock.Mock()

View file

@ -8,6 +8,7 @@ from twisted.internet import defer
from tests.test_utils import random_lbry_hash
from lbrynet.p2p.BlobManager import DiskBlobManager
from lbrynet.extras.compat import f2d
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.p2p.Peer import Peer
from lbrynet import conf
@ -21,14 +22,13 @@ class BlobManagerTest(unittest.TestCase):
conf.initialize_settings(False)
self.blob_dir = tempfile.mkdtemp()
self.db_dir = tempfile.mkdtemp()
self.bm = DiskBlobManager(self.blob_dir, SQLiteStorage(self.db_dir))
self.bm = DiskBlobManager(self.blob_dir, SQLiteStorage(':memory:'))
self.peer = Peer('somehost', 22)
yield self.bm.storage.setup()
yield f2d(self.bm.storage.open())
@defer.inlineCallbacks
def tearDown(self):
yield self.bm.stop()
yield self.bm.storage.stop()
yield f2d(self.bm.storage.close())
shutil.rmtree(self.blob_dir)
shutil.rmtree(self.db_dir)
@ -44,10 +44,10 @@ class BlobManagerTest(unittest.TestCase):
blob_hash = out
# create new blob
yield self.bm.setup()
blob = yield self.bm.get_blob(blob_hash, len(data))
yield f2d(self.bm.setup())
blob = self.bm.get_blob(blob_hash, len(data))
writer, finished_d = yield blob.open_for_writing(self.peer)
writer, finished_d = blob.open_for_writing(self.peer)
yield writer.write(data)
yield self.bm.blob_completed(blob, should_announce)
@ -80,7 +80,7 @@ class BlobManagerTest(unittest.TestCase):
self.assertFalse(os.path.isfile(os.path.join(self.blob_dir, blob_hash)))
blobs = yield self.bm.get_all_verified_blobs()
self.assertEqual(len(blobs), 0)
blobs = yield self.bm.storage.get_all_blob_hashes()
blobs = yield f2d(self.bm.storage.get_all_blob_hashes())
self.assertEqual(len(blobs), 0)
self.assertNotIn(blob_hash, self.bm.blobs)

View file

@ -6,6 +6,7 @@ from copy import deepcopy
from twisted.internet import defer
from twisted.trial import unittest
from lbrynet import conf
from lbrynet.extras.compat import f2d
from lbrynet.extras.daemon.storage import SQLiteStorage, open_file_for_writing
from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from tests.test_utils import random_lbry_hash
@ -85,18 +86,18 @@ class StorageTest(unittest.TestCase):
def setUp(self):
conf.initialize_settings(False)
self.db_dir = tempfile.mkdtemp()
self.storage = SQLiteStorage(self.db_dir)
yield self.storage.setup()
self.storage = SQLiteStorage(':memory:')
yield f2d(self.storage.open())
@defer.inlineCallbacks
def tearDown(self):
yield self.storage.stop()
yield f2d(self.storage.close())
shutil.rmtree(self.db_dir)
@defer.inlineCallbacks
def store_fake_blob(self, blob_hash, blob_length=100, next_announce=0, should_announce=0):
yield self.storage.add_completed_blob(blob_hash, blob_length, next_announce,
should_announce, "finished")
yield f2d(self.storage.add_completed_blob(blob_hash, blob_length, next_announce,
should_announce, "finished"))
@defer.inlineCallbacks
def store_fake_stream_blob(self, stream_hash, blob_hash, blob_num, length=100, iv="DEADBEEF"):
@ -105,13 +106,13 @@ class StorageTest(unittest.TestCase):
}
if length:
blob_info['length'] = length
yield self.storage.add_blobs_to_stream(stream_hash, [blob_info])
yield f2d(self.storage.add_blobs_to_stream(stream_hash, [blob_info]))
@defer.inlineCallbacks
def store_fake_stream(self, stream_hash, sd_hash, file_name="fake_file", key="DEADBEEF",
blobs=[]):
yield self.storage.store_stream(stream_hash, sd_hash, file_name, key,
file_name, blobs)
yield f2d(self.storage.store_stream(stream_hash, sd_hash, file_name, key,
file_name, blobs))
@defer.inlineCallbacks
def make_and_store_fake_stream(self, blob_count=2, stream_hash=None, sd_hash=None):
@ -135,9 +136,9 @@ class StorageTest(unittest.TestCase):
class TestSetup(StorageTest):
@defer.inlineCallbacks
def test_setup(self):
files = yield self.storage.get_all_lbry_files()
files = yield f2d(self.storage.get_all_lbry_files())
self.assertEqual(len(files), 0)
blobs = yield self.storage.get_all_blob_hashes()
blobs = yield f2d(self.storage.get_all_blob_hashes())
self.assertEqual(len(blobs), 0)
@ -146,17 +147,17 @@ class BlobStorageTests(StorageTest):
def test_store_blob(self):
blob_hash = random_lbry_hash()
yield self.store_fake_blob(blob_hash)
blob_hashes = yield self.storage.get_all_blob_hashes()
blob_hashes = yield f2d(self.storage.get_all_blob_hashes())
self.assertEqual(blob_hashes, [blob_hash])
@defer.inlineCallbacks
def test_delete_blob(self):
blob_hash = random_lbry_hash()
yield self.store_fake_blob(blob_hash)
blob_hashes = yield self.storage.get_all_blob_hashes()
blob_hashes = yield f2d(self.storage.get_all_blob_hashes())
self.assertEqual(blob_hashes, [blob_hash])
yield self.storage.delete_blobs_from_db(blob_hashes)
blob_hashes = yield self.storage.get_all_blob_hashes()
yield f2d(self.storage.delete_blobs_from_db(blob_hashes))
blob_hashes = yield f2d(self.storage.get_all_blob_hashes())
self.assertEqual(blob_hashes, [])
@ -172,13 +173,13 @@ class SupportsStorageTests(StorageTest):
} for i in range(20)]
expected_supports = {}
for idx, claim_id in enumerate(claim_ids):
yield self.storage.save_supports(claim_id, random_supports[idx*2:idx*2+2])
yield f2d(self.storage.save_supports(claim_id, random_supports[idx*2:idx*2+2]))
for random_support in random_supports[idx*2:idx*2+2]:
random_support['claim_id'] = claim_id
expected_supports.setdefault(claim_id, []).append(random_support)
supports = yield self.storage.get_supports(claim_ids[0])
supports = yield f2d(self.storage.get_supports(claim_ids[0]))
self.assertEqual(supports, expected_supports[claim_ids[0]])
all_supports = yield self.storage.get_supports(*claim_ids)
all_supports = yield f2d(self.storage.get_supports(*claim_ids))
for support in all_supports:
self.assertIn(support, expected_supports[support['claim_id']])
@ -199,55 +200,55 @@ class StreamStorageTests(StorageTest):
yield self.store_fake_stream_blob(stream_hash, blob1, 1)
yield self.store_fake_stream_blob(stream_hash, blob2, 2)
stream_blobs = yield self.storage.get_blobs_for_stream(stream_hash)
stream_blobs = yield f2d(self.storage.get_blobs_for_stream(stream_hash))
stream_blob_hashes = [b.blob_hash for b in stream_blobs]
self.assertListEqual(stream_blob_hashes, [blob1, blob2])
blob_hashes = yield self.storage.get_all_blob_hashes()
blob_hashes = yield f2d(self.storage.get_all_blob_hashes())
self.assertSetEqual(set(blob_hashes), {sd_hash, blob1, blob2})
stream_blobs = yield self.storage.get_blobs_for_stream(stream_hash)
stream_blobs = yield f2d(self.storage.get_blobs_for_stream(stream_hash))
stream_blob_hashes = [b.blob_hash for b in stream_blobs]
self.assertListEqual(stream_blob_hashes, [blob1, blob2])
yield self.storage.set_should_announce(sd_hash, 1, 1)
yield self.storage.set_should_announce(blob1, 1, 1)
yield f2d(self.storage.set_should_announce(sd_hash, 1, 1))
yield f2d(self.storage.set_should_announce(blob1, 1, 1))
should_announce_count = yield self.storage.count_should_announce_blobs()
should_announce_count = yield f2d(self.storage.count_should_announce_blobs())
self.assertEqual(should_announce_count, 2)
should_announce_hashes = yield self.storage.get_blobs_to_announce()
should_announce_hashes = yield f2d(self.storage.get_blobs_to_announce())
self.assertSetEqual(set(should_announce_hashes), {sd_hash, blob1})
stream_hashes = yield self.storage.get_all_streams()
stream_hashes = yield f2d(self.storage.get_all_streams())
self.assertListEqual(stream_hashes, [stream_hash])
@defer.inlineCallbacks
def test_delete_stream(self):
stream_hash = random_lbry_hash()
yield self.test_store_stream(stream_hash)
yield self.storage.delete_stream(stream_hash)
stream_hashes = yield self.storage.get_all_streams()
yield f2d(self.storage.delete_stream(stream_hash))
stream_hashes = yield f2d(self.storage.get_all_streams())
self.assertListEqual(stream_hashes, [])
stream_blobs = yield self.storage.get_blobs_for_stream(stream_hash)
stream_blobs = yield f2d(self.storage.get_blobs_for_stream(stream_hash))
self.assertListEqual(stream_blobs, [])
blob_hashes = yield self.storage.get_all_blob_hashes()
blob_hashes = yield f2d(self.storage.get_all_blob_hashes())
self.assertListEqual(blob_hashes, [])
class FileStorageTests(StorageTest):
@defer.inlineCallbacks
def test_setup_output(self):
async def test_setup_output(self):
file_name = 'encrypted_file_saver_test.tmp'
self.assertFalse(os.path.isfile(file_name))
written_to = yield open_file_for_writing(self.db_dir, file_name)
written_to = await open_file_for_writing(self.db_dir, file_name)
self.assertEqual(written_to, file_name)
self.assertTrue(os.path.isfile(os.path.join(self.db_dir, file_name)))
@defer.inlineCallbacks
def test_store_file(self):
download_directory = self.db_dir
out = yield self.storage.get_all_lbry_files()
out = yield f2d(self.storage.get_all_lbry_files())
self.assertEqual(len(out), 0)
stream_hash = random_lbry_hash()
@ -265,21 +266,21 @@ class FileStorageTests(StorageTest):
blob_data_rate = 0
file_name = "test file"
out = yield self.storage.save_published_file(
out = yield f2d(self.storage.save_published_file(
stream_hash, file_name, download_directory, blob_data_rate
)
rowid = yield self.storage.get_rowid_for_stream_hash(stream_hash)
))
rowid = yield f2d(self.storage.get_rowid_for_stream_hash(stream_hash))
self.assertEqual(out, rowid)
files = yield self.storage.get_all_lbry_files()
files = yield f2d(self.storage.get_all_lbry_files())
self.assertEqual(1, len(files))
status = yield self.storage.get_lbry_file_status(rowid)
status = yield f2d(self.storage.get_lbry_file_status(rowid))
self.assertEqual(status, ManagedEncryptedFileDownloader.STATUS_STOPPED)
running = ManagedEncryptedFileDownloader.STATUS_RUNNING
yield self.storage.change_file_status(rowid, running)
status = yield self.storage.get_lbry_file_status(rowid)
yield f2d(self.storage.change_file_status(rowid, running))
status = yield f2d(self.storage.get_lbry_file_status(rowid))
self.assertEqual(status, ManagedEncryptedFileDownloader.STATUS_RUNNING)
@ -287,7 +288,7 @@ class ContentClaimStorageTests(StorageTest):
@defer.inlineCallbacks
def test_store_content_claim(self):
download_directory = self.db_dir
out = yield self.storage.get_all_lbry_files()
out = yield f2d(self.storage.get_all_lbry_files())
self.assertEqual(len(out), 0)
stream_hash = random_lbry_hash()
@ -300,32 +301,32 @@ class ContentClaimStorageTests(StorageTest):
yield self.make_and_store_fake_stream(blob_count=2, stream_hash=stream_hash, sd_hash=sd_hash)
blob_data_rate = 0
file_name = "test file"
yield self.storage.save_published_file(
yield f2d(self.storage.save_published_file(
stream_hash, file_name, download_directory, blob_data_rate
)
yield self.storage.save_claims([fake_claim_info])
yield self.storage.save_content_claim(stream_hash, fake_outpoint)
stored_content_claim = yield self.storage.get_content_claim(stream_hash)
))
yield f2d(self.storage.save_claims([fake_claim_info]))
yield f2d(self.storage.save_content_claim(stream_hash, fake_outpoint))
stored_content_claim = yield f2d(self.storage.get_content_claim(stream_hash))
self.assertDictEqual(stored_content_claim, fake_claim_info)
stream_hashes = yield self.storage.get_old_stream_hashes_for_claim_id(fake_claim_info['claim_id'],
stream_hash)
stream_hashes = yield f2d(self.storage.get_old_stream_hashes_for_claim_id(fake_claim_info['claim_id'],
stream_hash))
self.assertListEqual(stream_hashes, [])
# test that we can't associate a claim update with a new stream to the file
second_stream_hash, second_sd_hash = random_lbry_hash(), random_lbry_hash()
yield self.make_and_store_fake_stream(blob_count=2, stream_hash=second_stream_hash, sd_hash=second_sd_hash)
with self.assertRaisesRegex(Exception, "stream mismatch"):
yield self.storage.save_content_claim(second_stream_hash, fake_outpoint)
yield f2d(self.storage.save_content_claim(second_stream_hash, fake_outpoint))
# test that we can associate a new claim update containing the same stream to the file
update_info = deepcopy(fake_claim_info)
update_info['txid'] = "beef0000" * 12
update_info['nout'] = 0
second_outpoint = "%s:%i" % (update_info['txid'], update_info['nout'])
yield self.storage.save_claims([update_info])
yield self.storage.save_content_claim(stream_hash, second_outpoint)
update_info_result = yield self.storage.get_content_claim(stream_hash)
yield f2d(self.storage.save_claims([update_info]))
yield f2d(self.storage.save_content_claim(stream_hash, second_outpoint))
update_info_result = yield f2d(self.storage.get_content_claim(stream_hash))
self.assertDictEqual(update_info_result, update_info)
# test that we can't associate an update with a mismatching claim id
@ -337,8 +338,8 @@ class ContentClaimStorageTests(StorageTest):
with self.assertRaisesRegex(Exception, "mismatching claim ids when updating stream "
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeef "
"vs beef0002beef0002beef0002beef0002beef0002"):
yield self.storage.save_claims([invalid_update_info])
yield self.storage.save_content_claim(stream_hash, invalid_update_outpoint)
current_claim_info = yield self.storage.get_content_claim(stream_hash)
yield f2d(self.storage.save_claims([invalid_update_info]))
yield f2d(self.storage.save_content_claim(stream_hash, invalid_update_outpoint))
current_claim_info = yield f2d(self.storage.get_content_claim(stream_hash))
# this should still be the previous update
self.assertDictEqual(current_claim_info, update_info)

View file

@ -3,6 +3,7 @@ from twisted.trial import unittest
from twisted.internet import defer
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from lbrynet.extras.compat import f2d
from lbrynet.extras.daemon.PeerManager import PeerManager
from lbrynet.p2p.StreamDescriptor import get_sd_info, BlobStreamDescriptorReader
from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier
@ -44,20 +45,20 @@ class CreateEncryptedFileTest(unittest.TestCase):
self.peer_finder = FakePeerFinder(5553, self.peer_manager, 2)
self.rate_limiter = DummyRateLimiter()
self.sd_identifier = StreamDescriptorIdentifier()
self.storage = SQLiteStorage(self.tmp_db_dir)
self.storage = SQLiteStorage(':memory:')
self.blob_manager = DiskBlobManager(self.tmp_blob_dir, self.storage)
self.prm = OnlyFreePaymentsManager()
self.lbry_file_manager = EncryptedFileManager(self.peer_finder, self.rate_limiter, self.blob_manager,
self.wallet, self.prm, self.storage, self.sd_identifier)
d = self.storage.setup()
d.addCallback(lambda _: self.lbry_file_manager.setup())
d = f2d(self.storage.open())
d.addCallback(lambda _: f2d(self.lbry_file_manager.setup()))
return d
@defer.inlineCallbacks
def tearDown(self):
yield self.lbry_file_manager.stop()
yield self.blob_manager.stop()
yield self.storage.stop()
yield f2d(self.blob_manager.stop())
yield f2d(self.storage.close())
rm_db_and_blob_dir(self.tmp_db_dir, self.tmp_blob_dir)
@defer.inlineCallbacks
@ -77,7 +78,7 @@ class CreateEncryptedFileTest(unittest.TestCase):
"3e62e81a2e8945b0db7c94f1852e70e371d917b994352c"
filename = 'test.file'
lbry_file = yield self.create_file(filename)
sd_hash = yield self.storage.get_sd_blob_hash_for_stream(lbry_file.stream_hash)
sd_hash = yield f2d(self.storage.get_sd_blob_hash_for_stream(lbry_file.stream_hash))
# read the sd blob file
sd_blob = self.blob_manager.blobs[sd_hash]
@ -85,7 +86,7 @@ class CreateEncryptedFileTest(unittest.TestCase):
sd_file_info = yield sd_reader.get_info()
# this comes from the database, the blobs returned are sorted
sd_info = yield get_sd_info(self.storage, lbry_file.stream_hash, include_blobs=True)
sd_info = yield f2d(get_sd_info(self.storage, lbry_file.stream_hash, include_blobs=True))
self.maxDiff = None
unicode_sd_info = json.loads(json.dumps(sd_info, sort_keys=True, cls=JSONBytesEncoder))
self.assertDictEqual(unicode_sd_info, sd_file_info)

View file

@ -1,13 +1,14 @@
from unittest import mock
from unittest import mock, skip
from twisted.internet import reactor
from twisted.trial import unittest
from lbrynet import conf
from lbrynet.extras.daemon.auth import server
#from lbrynet.extras.daemon.auth import server
from tests.mocks import mock_conf_settings
@skip
class AuthJSONRPCServerTest(unittest.TestCase):
# TODO: move to using a base class for tests
# and add useful general utilities like this

View file

@ -5,17 +5,18 @@ from twisted.internet import defer
from twisted.trial import unittest
from lbrynet import conf
from lbrynet.extras.compat import f2d
from lbrynet.schema.decode import smart_decode
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.extras.daemon.ComponentManager import ComponentManager
from lbrynet.extras.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT, WALLET_COMPONENT
from lbrynet.extras.daemon.Components import f2d
from lbrynet.extras.daemon.Components import HASH_ANNOUNCER_COMPONENT, REFLECTOR_COMPONENT
from lbrynet.extras.daemon.Components import UPNP_COMPONENT, BLOB_COMPONENT
from lbrynet.extras.daemon.Components import PEER_PROTOCOL_SERVER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT
from lbrynet.extras.daemon.Components import RATE_LIMITER_COMPONENT, HEADERS_COMPONENT, FILE_MANAGER_COMPONENT
from lbrynet.extras.daemon.Daemon import Daemon as LBRYDaemon
from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.blob.EncryptedFileStatusReport import EncryptedFileStatusReport
from lbrynet.extras.wallet import LbryWalletManager
from torba.client.wallet import Wallet
@ -130,14 +131,14 @@ class TestJsonRpc(unittest.TestCase):
self.test_daemon.wallet_manager.is_first_run = False
self.test_daemon.wallet_manager.get_best_blockhash = noop
@defer.inlineCallbacks
def test_status(self):
d = defer.maybeDeferred(self.test_daemon.jsonrpc_status)
d.addCallback(lambda status: self.assertDictContainsSubset({'is_running': False}, status))
status = yield f2d(self.test_daemon.jsonrpc_status())
self.assertDictContainsSubset({'is_running': False}, status)
def test_help(self):
d = defer.maybeDeferred(self.test_daemon.jsonrpc_help, command='status')
d.addCallback(lambda result: self.assertSubstring('daemon status', result['help']))
# self.assertSubstring('daemon status', d.result)
result = self.test_daemon.jsonrpc_help(command='status')
self.assertSubstring('daemon status', result['help'])
if is_android():
test_help.skip = "Test cannot pass on Android because PYTHONOPTIMIZE removes the docstrings."
@ -162,42 +163,42 @@ class TestFileListSorting(unittest.TestCase):
'ashlee27', 'bfrederick', 'brittanyhicks', 'davidsonjeffrey', 'heidiherring',
'jlewis', 'kswanson', 'michelle50', 'richard64', 'xsteele'
]
return self.test_daemon.component_manager.setup()
return f2d(self.test_daemon.component_manager.setup())
@defer.inlineCallbacks
def test_sort_by_points_paid_no_direction_specified(self):
sort_options = ['points_paid']
file_list = yield self.test_daemon.jsonrpc_file_list(sort=sort_options)
file_list = yield f2d(self.test_daemon.jsonrpc_file_list(sort=sort_options))
self.assertEqual(self.test_points_paid, [f['points_paid'] for f in file_list])
@defer.inlineCallbacks
def test_sort_by_points_paid_ascending(self):
sort_options = ['points_paid,asc']
file_list = yield self.test_daemon.jsonrpc_file_list(sort=sort_options)
file_list = yield f2d(self.test_daemon.jsonrpc_file_list(sort=sort_options))
self.assertEqual(self.test_points_paid, [f['points_paid'] for f in file_list])
@defer.inlineCallbacks
def test_sort_by_points_paid_descending(self):
sort_options = ['points_paid, desc']
file_list = yield self.test_daemon.jsonrpc_file_list(sort=sort_options)
file_list = yield f2d(self.test_daemon.jsonrpc_file_list(sort=sort_options))
self.assertEqual(list(reversed(self.test_points_paid)), [f['points_paid'] for f in file_list])
@defer.inlineCallbacks
def test_sort_by_file_name_no_direction_specified(self):
sort_options = ['file_name']
file_list = yield self.test_daemon.jsonrpc_file_list(sort=sort_options)
file_list = yield f2d(self.test_daemon.jsonrpc_file_list(sort=sort_options))
self.assertEqual(self.test_file_names, [f['file_name'] for f in file_list])
@defer.inlineCallbacks
def test_sort_by_file_name_ascending(self):
sort_options = ['file_name,\nasc']
file_list = yield self.test_daemon.jsonrpc_file_list(sort=sort_options)
file_list = yield f2d(self.test_daemon.jsonrpc_file_list(sort=sort_options))
self.assertEqual(self.test_file_names, [f['file_name'] for f in file_list])
@defer.inlineCallbacks
def test_sort_by_file_name_descending(self):
sort_options = ['\tfile_name,\n\tdesc']
file_list = yield self.test_daemon.jsonrpc_file_list(sort=sort_options)
file_list = yield f2d(self.test_daemon.jsonrpc_file_list(sort=sort_options))
self.assertEqual(list(reversed(self.test_file_names)), [f['file_name'] for f in file_list])
@defer.inlineCallbacks
@ -217,21 +218,21 @@ class TestFileListSorting(unittest.TestCase):
format_result = lambda f: 'file_name={}, points_paid={}'.format(f['file_name'], f['points_paid'])
sort_options = ['file_name,asc', 'points_paid,desc']
file_list = yield self.test_daemon.jsonrpc_file_list(sort=sort_options)
file_list = yield f2d(self.test_daemon.jsonrpc_file_list(sort=sort_options))
self.assertEqual(expected, [format_result(r) for r in file_list])
# Check that the list is not sorted as expected when sorted only by file_name.
sort_options = ['file_name,asc']
file_list = yield self.test_daemon.jsonrpc_file_list(sort=sort_options)
file_list = yield f2d(self.test_daemon.jsonrpc_file_list(sort=sort_options))
self.assertNotEqual(expected, [format_result(r) for r in file_list])
# Check that the list is not sorted as expected when sorted only by points_paid.
sort_options = ['points_paid,desc']
file_list = yield self.test_daemon.jsonrpc_file_list(sort=sort_options)
file_list = yield f2d(self.test_daemon.jsonrpc_file_list(sort=sort_options))
self.assertNotEqual(expected, [format_result(r) for r in file_list])
# Check that the list is not sorted as expected when not sorted at all.
file_list = yield self.test_daemon.jsonrpc_file_list()
file_list = yield f2d(self.test_daemon.jsonrpc_file_list())
self.assertNotEqual(expected, [format_result(r) for r in file_list])
@defer.inlineCallbacks
@ -239,16 +240,16 @@ class TestFileListSorting(unittest.TestCase):
extract_authors = lambda file_list: [f['metadata']['author'] for f in file_list]
sort_options = ['metadata.author']
file_list = yield self.test_daemon.jsonrpc_file_list(sort=sort_options)
file_list = yield f2d(self.test_daemon.jsonrpc_file_list(sort=sort_options))
self.assertEqual(self.test_authors, extract_authors(file_list))
# Check that the list matches the expected in reverse when sorting in descending order.
sort_options = ['metadata.author,desc']
file_list = yield self.test_daemon.jsonrpc_file_list(sort=sort_options)
file_list = yield f2d(self.test_daemon.jsonrpc_file_list(sort=sort_options))
self.assertEqual(list(reversed(self.test_authors)), extract_authors(file_list))
# Check that the list is not sorted as expected when not sorted at all.
file_list = yield self.test_daemon.jsonrpc_file_list()
file_list = yield f2d(self.test_daemon.jsonrpc_file_list())
self.assertNotEqual(self.test_authors, extract_authors(file_list))
@defer.inlineCallbacks
@ -256,11 +257,11 @@ class TestFileListSorting(unittest.TestCase):
sort_options = ['meta.author']
expected_message = "Failed to get 'meta.author', key 'meta' was not found."
with self.assertRaisesRegex(Exception, expected_message):
yield self.test_daemon.jsonrpc_file_list(sort=sort_options)
yield f2d(self.test_daemon.jsonrpc_file_list(sort=sort_options))
sort_options = ['metadata.foo.bar']
expected_message = "Failed to get 'metadata.foo.bar', key 'foo' was not found."
with self.assertRaisesRegex(Exception, expected_message):
yield self.test_daemon.jsonrpc_file_list(sort=sort_options)
yield f2d(self.test_daemon.jsonrpc_file_list(sort=sort_options))
@staticmethod
def _get_fake_lbry_files():
@ -269,6 +270,14 @@ class TestFileListSorting(unittest.TestCase):
lbry_file = mock.Mock(spec=ManagedEncryptedFileDownloader)
for attribute in metadata:
setattr(lbry_file, attribute, metadata[attribute])
async def get_total_bytes():
return 0
lbry_file.get_total_bytes = get_total_bytes
async def status():
return EncryptedFileStatusReport(
'file_name', 1, 1, 'completed'
)
lbry_file.status = status
faked_lbry_files.append(lbry_file)
return faked_lbry_files