forked from LBRYCommunity/lbry-sdk
lots o bugs fixed discovered while using desktop app
This commit is contained in:
parent
3fdcb80960
commit
16e596ec87
7 changed files with 29 additions and 25 deletions
|
@ -149,7 +149,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
||||||
status = ManagedEncryptedFileDownloader.STATUS_RUNNING
|
status = ManagedEncryptedFileDownloader.STATUS_RUNNING
|
||||||
status = yield self.lbry_file_manager.change_lbry_file_status(self, status)
|
status = yield self.lbry_file_manager.change_lbry_file_status(self, status)
|
||||||
self._saving_status = False
|
self._saving_status = False
|
||||||
defer.returnValue(status)
|
return status
|
||||||
|
|
||||||
def save_status(self):
|
def save_status(self):
|
||||||
return self._save_status()
|
return self._save_status()
|
||||||
|
|
|
@ -236,7 +236,7 @@ class EncryptedFileManager:
|
||||||
def reflect_lbry_files(self):
|
def reflect_lbry_files(self):
|
||||||
sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS)
|
sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS)
|
||||||
ds = []
|
ds = []
|
||||||
sd_hashes_to_reflect = yield self.storage.get_streams_to_re_reflect()
|
sd_hashes_to_reflect = yield f2d(self.storage.get_streams_to_re_reflect())
|
||||||
for lbry_file in self.lbry_files:
|
for lbry_file in self.lbry_files:
|
||||||
if lbry_file.sd_hash in sd_hashes_to_reflect:
|
if lbry_file.sd_hash in sd_hashes_to_reflect:
|
||||||
ds.append(sem.run(reflect_file, lbry_file))
|
ds.append(sem.run(reflect_file, lbry_file))
|
||||||
|
|
|
@ -57,7 +57,7 @@ def start_daemon(settings: typing.Optional[typing.Dict] = None,
|
||||||
|
|
||||||
if check_connection():
|
if check_connection():
|
||||||
daemon = Daemon()
|
daemon = Daemon()
|
||||||
asyncio.create_task(daemon.start_listening())
|
reactor._asyncioEventloop.create_task(daemon.start_listening())
|
||||||
reactor.run()
|
reactor.run()
|
||||||
else:
|
else:
|
||||||
log.info("Not connected to internet, unable to start")
|
log.info("Not connected to internet, unable to start")
|
||||||
|
|
|
@ -7,7 +7,7 @@ import math
|
||||||
import binascii
|
import binascii
|
||||||
from hashlib import sha256
|
from hashlib import sha256
|
||||||
from types import SimpleNamespace
|
from types import SimpleNamespace
|
||||||
from twisted.internet import defer, reactor, error, task
|
from twisted.internet import defer, reactor, error
|
||||||
|
|
||||||
from aioupnp import __version__ as aioupnp_version
|
from aioupnp import __version__ as aioupnp_version
|
||||||
from aioupnp.upnp import UPnP
|
from aioupnp.upnp import UPnP
|
||||||
|
@ -404,7 +404,7 @@ class HashAnnouncerComponent(Component):
|
||||||
def component(self):
|
def component(self):
|
||||||
return self.hash_announcer
|
return self.hash_announcer
|
||||||
|
|
||||||
def start(self):
|
async def start(self):
|
||||||
storage = self.component_manager.get_component(DATABASE_COMPONENT)
|
storage = self.component_manager.get_component(DATABASE_COMPONENT)
|
||||||
dht_node = self.component_manager.get_component(DHT_COMPONENT)
|
dht_node = self.component_manager.get_component(DHT_COMPONENT)
|
||||||
self.hash_announcer = DHTHashAnnouncer(dht_node, storage)
|
self.hash_announcer = DHTHashAnnouncer(dht_node, storage)
|
||||||
|
@ -539,7 +539,7 @@ class PeerProtocolServerComponent(Component):
|
||||||
try:
|
try:
|
||||||
log.info("Peer protocol listening on TCP %i (ext port %i)", peer_port,
|
log.info("Peer protocol listening on TCP %i (ext port %i)", peer_port,
|
||||||
upnp.upnp_redirects.get("TCP", peer_port))
|
upnp.upnp_redirects.get("TCP", peer_port))
|
||||||
self.lbry_server_port = await d2f(reactor.listenTCP(peer_port, server_factory))
|
self.lbry_server_port = reactor.listenTCP(peer_port, server_factory)
|
||||||
except error.CannotListenError as e:
|
except error.CannotListenError as e:
|
||||||
import traceback
|
import traceback
|
||||||
log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for"
|
log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for"
|
||||||
|
@ -597,13 +597,18 @@ class UPnPComponent(Component):
|
||||||
self.upnp = None
|
self.upnp = None
|
||||||
self.upnp_redirects = {}
|
self.upnp_redirects = {}
|
||||||
self.external_ip = None
|
self.external_ip = None
|
||||||
self._maintain_redirects_lc = task.LoopingCall(self._maintain_redirects)
|
self._maintain_redirects_task = None
|
||||||
self._maintain_redirects_lc.clock = self.component_manager.reactor
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def component(self):
|
def component(self):
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
async def _repeatedly_maintain_redirects(self, now=True):
|
||||||
|
while True:
|
||||||
|
if now:
|
||||||
|
await self._maintain_redirects()
|
||||||
|
await asyncio.sleep(360)
|
||||||
|
|
||||||
async def _maintain_redirects(self):
|
async def _maintain_redirects(self):
|
||||||
# setup the gateway if necessary
|
# setup the gateway if necessary
|
||||||
if not self.upnp:
|
if not self.upnp:
|
||||||
|
@ -692,13 +697,15 @@ class UPnPComponent(Component):
|
||||||
else:
|
else:
|
||||||
log.error("failed to setup upnp")
|
log.error("failed to setup upnp")
|
||||||
self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status())
|
self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status())
|
||||||
self._maintain_redirects_lc.start(360, now=False)
|
self._maintain_redirects_task = asyncio.create_task(self._repeatedly_maintain_redirects(now=False))
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
if self.upnp_redirects:
|
if self.upnp_redirects:
|
||||||
await asyncio.wait([
|
await asyncio.wait([
|
||||||
self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items()
|
self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items()
|
||||||
])
|
])
|
||||||
|
if self._maintain_redirects_task is not None and not self._maintain_redirects_task.done():
|
||||||
|
self._maintain_redirects_task.cancel()
|
||||||
|
|
||||||
async def get_status(self):
|
async def get_status(self):
|
||||||
return {
|
return {
|
||||||
|
|
|
@ -6,6 +6,7 @@ from twisted.protocols.basic import FileSender
|
||||||
from twisted.internet.protocol import Protocol, ClientFactory
|
from twisted.internet.protocol import Protocol, ClientFactory
|
||||||
from twisted.internet import defer, error
|
from twisted.internet import defer, error
|
||||||
|
|
||||||
|
from lbrynet.extras.compat import f2d
|
||||||
from lbrynet.extras.reflector.common import IncompleteResponse, ReflectorRequestError
|
from lbrynet.extras.reflector.common import IncompleteResponse, ReflectorRequestError
|
||||||
from lbrynet.extras.reflector.common import REFLECTOR_V1, REFLECTOR_V2
|
from lbrynet.extras.reflector.common import REFLECTOR_V1, REFLECTOR_V2
|
||||||
|
|
||||||
|
@ -61,7 +62,9 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
else:
|
else:
|
||||||
reflected = False
|
reflected = False
|
||||||
|
|
||||||
d = self.blob_manager.storage.update_reflected_stream(self.sd_hash, self.transport.getPeer().host, reflected)
|
d = f2d(self.blob_manager.storage.update_reflected_stream(
|
||||||
|
self.sd_hash, self.transport.getPeer().host, reflected
|
||||||
|
))
|
||||||
d.addCallback(lambda _: result)
|
d.addCallback(lambda _: result)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
@ -113,10 +116,7 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
for crypt_blob in blobs:
|
for crypt_blob in blobs:
|
||||||
if crypt_blob.blob_hash and crypt_blob.length:
|
if crypt_blob.blob_hash and crypt_blob.length:
|
||||||
yield self.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length)
|
yield self.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length)
|
||||||
|
return [blob for r, blob in get_blobs(blobs_in_stream) if r and blob.get_is_verified()]
|
||||||
dl = defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True)
|
|
||||||
dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.get_is_verified()])
|
|
||||||
return dl
|
|
||||||
|
|
||||||
def set_blobs_to_send(self, blobs_to_send):
|
def set_blobs_to_send(self, blobs_to_send):
|
||||||
for blob in blobs_to_send:
|
for blob in blobs_to_send:
|
||||||
|
@ -132,7 +132,7 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
len(filtered))
|
len(filtered))
|
||||||
return filtered
|
return filtered
|
||||||
|
|
||||||
d = self.factory.blob_manager.storage.get_blobs_for_stream(self.stream_hash)
|
d = f2d(self.factory.blob_manager.storage.get_blobs_for_stream(self.stream_hash))
|
||||||
d.addCallback(self.get_validated_blobs)
|
d.addCallback(self.get_validated_blobs)
|
||||||
if not self.descriptor_needed:
|
if not self.descriptor_needed:
|
||||||
d.addCallback(lambda filtered:
|
d.addCallback(lambda filtered:
|
||||||
|
|
|
@ -5,7 +5,7 @@ from twisted.internet import defer, task
|
||||||
from twisted.internet.error import ConnectingCancelledError
|
from twisted.internet.error import ConnectingCancelledError
|
||||||
from twisted.web._newclient import ResponseNeverReceived
|
from twisted.web._newclient import ResponseNeverReceived
|
||||||
|
|
||||||
from lbrynet.utils import DeferredDict
|
from lbrynet.extras.compat import f2d
|
||||||
from lbrynet.p2p.Error import DownloadCanceledError
|
from lbrynet.p2p.Error import DownloadCanceledError
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
@ -104,9 +104,7 @@ class HTTPBlobDownloader:
|
||||||
log.debug("trying to download stream from mirror (sd %s)", self.sd_hashes[0][:8])
|
log.debug("trying to download stream from mirror (sd %s)", self.sd_hashes[0][:8])
|
||||||
else:
|
else:
|
||||||
log.debug("trying to download %i blobs from mirror", len(self.blob_hashes))
|
log.debug("trying to download %i blobs from mirror", len(self.blob_hashes))
|
||||||
blobs = yield DeferredDict(
|
blobs = {blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes}
|
||||||
{blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes}
|
|
||||||
)
|
|
||||||
self.deferreds = [self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes]
|
self.deferreds = [self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes]
|
||||||
yield defer.DeferredList(self.deferreds)
|
yield defer.DeferredList(self.deferreds)
|
||||||
if self.retry and self.missing_blob_hashes:
|
if self.retry and self.missing_blob_hashes:
|
||||||
|
@ -175,7 +173,7 @@ class HTTPBlobDownloader:
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def download_stream(self, stream_hash, sd_hash):
|
def download_stream(self, stream_hash, sd_hash):
|
||||||
stream_crypt_blobs = yield self.blob_manager.storage.get_blobs_for_stream(stream_hash)
|
stream_crypt_blobs = yield f2d(self.blob_manager.storage.get_blobs_for_stream(stream_hash))
|
||||||
self.blob_hashes.extend([
|
self.blob_hashes.extend([
|
||||||
b.blob_hash for b in stream_crypt_blobs
|
b.blob_hash for b in stream_crypt_blobs
|
||||||
if b.blob_hash and b.blob_hash not in self.blob_hashes
|
if b.blob_hash and b.blob_hash not in self.blob_hashes
|
||||||
|
|
|
@ -152,16 +152,15 @@ class PointTraderKeyQueryHandler:
|
||||||
try:
|
try:
|
||||||
decode_rsa_key(new_encoded_pub_key)
|
decode_rsa_key(new_encoded_pub_key)
|
||||||
except (ValueError, TypeError, IndexError):
|
except (ValueError, TypeError, IndexError):
|
||||||
value_error = ValueError(f"Client sent an invalid public key: {new_encoded_pub_key}")
|
raise ValueError(f"Client sent an invalid public key: {new_encoded_pub_key}")
|
||||||
return defer.fail(Failure(value_error))
|
|
||||||
self.public_key = new_encoded_pub_key
|
self.public_key = new_encoded_pub_key
|
||||||
self.wallet.set_public_key_for_peer(self.peer, self.public_key)
|
self.wallet.set_public_key_for_peer(self.peer, self.public_key)
|
||||||
fields = {'public_key': self.wallet.encoded_public_key.decode()}
|
fields = {'public_key': self.wallet.encoded_public_key.decode()}
|
||||||
return defer.succeed(fields)
|
return fields
|
||||||
if self.public_key is None:
|
if self.public_key is None:
|
||||||
return defer.fail(Failure(ValueError("Expected but did not receive a public key")))
|
raise ValueError("Expected but did not receive a public key")
|
||||||
else:
|
else:
|
||||||
return defer.succeed({})
|
return {}
|
||||||
|
|
||||||
|
|
||||||
class Wallet:
|
class Wallet:
|
||||||
|
|
Loading…
Reference in a new issue