This commit is contained in:
Jack Robison 2019-01-29 20:47:02 -05:00
parent 8f7cf0b38f
commit 933d58d49b
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 9 additions and 7 deletions

View file

@@ -1,6 +1,7 @@
import asyncio import asyncio
import logging import logging
import typing import typing
import binascii
from lbrynet.blob_exchange.serialization import BlobResponse, BlobRequest from lbrynet.blob_exchange.serialization import BlobResponse, BlobRequest
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbrynet.blob.blob_file import BlobFile from lbrynet.blob.blob_file import BlobFile
@@ -25,7 +26,9 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
self._request_lock = asyncio.Lock(loop=self.loop) self._request_lock = asyncio.Lock(loop=self.loop)
def data_received(self, data: bytes): def data_received(self, data: bytes):
if self.transport.is_closing(): # TODO: is this needed? if not self.transport or self.transport.is_closing():
log.warning("transport closing, but got more bytes from %s:%i\n%s", self.peer_address, self.peer_port,
binascii.hexlify(data))
if self._response_fut and not self._response_fut.done(): if self._response_fut and not self._response_fut.done():
self._response_fut.cancel() self._response_fut.cancel()
return return

View file

@@ -59,7 +59,7 @@ class StreamAssembler:
decrypted = await self.loop.run_in_executor(None, _decrypt_and_write) decrypted = await self.loop.run_in_executor(None, _decrypt_and_write)
if decrypted: if decrypted:
log.info("decrypted %s", blob.blob_hash[:8]) log.debug("decrypted %s", blob.blob_hash[:8])
return return
async def assemble_decrypted_stream(self, output_dir: str, output_file_name: typing.Optional[str] = None): async def assemble_decrypted_stream(self, output_dir: str, output_file_name: typing.Optional[str] = None):

View file

@@ -29,7 +29,6 @@ class StreamDownloader(StreamAssembler):  # TODO: reduce duplication, refactor t
self.peer_timeout = peer_timeout self.peer_timeout = peer_timeout
self.peer_connect_timeout = peer_connect_timeout self.peer_connect_timeout = peer_connect_timeout
self.current_blob: 'BlobFile' = None self.current_blob: 'BlobFile' = None
self.download_task: asyncio.Task = None self.download_task: asyncio.Task = None
self.accumulate_connections_task: asyncio.Task = None self.accumulate_connections_task: asyncio.Task = None
self.new_peer_event = asyncio.Event(loop=self.loop) self.new_peer_event = asyncio.Event(loop=self.loop)
@@ -106,13 +105,16 @@ class StreamDownloader(StreamAssembler):  # TODO: reduce duplication, refactor t
loop=self.loop) loop=self.loop)
if got_new_peer and not got_new_peer.done(): if got_new_peer and not got_new_peer.done():
got_new_peer.cancel() got_new_peer.cancel()
async with self._lock: async with self._lock:
if self.current_blob.get_is_verified(): if self.current_blob.get_is_verified():
# a download attempt finished
if got_new_peer and not got_new_peer.done(): if got_new_peer and not got_new_peer.done():
got_new_peer.cancel() got_new_peer.cancel()
drain_tasks(download_tasks) drain_tasks(download_tasks)
return self.current_blob return self.current_blob
else: else:
# we got a new peer, re add the other pending download attempts
for task in download_tasks: for task in download_tasks:
if task and not task.done(): if task and not task.done():
self.running_download_requests.append(task) self.running_download_requests.append(task)
@@ -147,14 +149,13 @@ class StreamDownloader(StreamAssembler):  # TODO: reduce duplication, refactor t
added += 1 added += 1
if added: if added:
if not self.new_peer_event.is_set(): if not self.new_peer_event.is_set():
log.info("added %i new peers", len(peers)) log.debug("added %i new peers", len(peers))
self.new_peer_event.set() self.new_peer_event.set()
async def _accumulate_connections(self, node: 'Node'): async def _accumulate_connections(self, node: 'Node'):
blob_queue = asyncio.Queue(loop=self.loop) blob_queue = asyncio.Queue(loop=self.loop)
blob_queue.put_nowait(self.sd_hash) blob_queue.put_nowait(self.sd_hash)
task = asyncio.create_task(self.got_descriptor.wait()) task = asyncio.create_task(self.got_descriptor.wait())
added_peers = asyncio.Event(loop=self.loop)
add_fixed_peers_timer: typing.Optional[asyncio.Handle] = None add_fixed_peers_timer: typing.Optional[asyncio.Handle] = None
if self.fixed_peers: if self.fixed_peers:
@@ -178,8 +179,6 @@ class StreamDownloader(StreamAssembler):  # TODO: reduce duplication, refactor t
async for peers in search_junction: async for peers in search_junction:
if peers: if peers:
self._add_peer_protocols(peers) self._add_peer_protocols(peers)
if not added_peers.is_set():
added_peers.set()
return return
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass