This commit is contained in:
Jack Robison 2019-08-02 13:14:41 -04:00 committed by Lex Berezhny
parent 266aa1b15d
commit e2b3141536
17 changed files with 39 additions and 41 deletions

View file

@ -71,7 +71,7 @@ class AbstractBlob:
'readers'
]
def __init__(self, loop: asyncio.BaseEventLoop, blob_hash: str, length: typing.Optional[int] = None,
def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
blob_directory: typing.Optional[str] = None):
self.loop = loop
@ -175,7 +175,7 @@ class AbstractBlob:
@classmethod
async def create_from_unencrypted(
cls, loop: asyncio.BaseEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
unencrypted: bytes, blob_num: int,
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None) -> BlobInfo:
"""
@ -237,7 +237,7 @@ class BlobBuffer(AbstractBlob):
"""
An in-memory only blob
"""
def __init__(self, loop: asyncio.BaseEventLoop, blob_hash: str, length: typing.Optional[int] = None,
def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
blob_directory: typing.Optional[str] = None):
self._verified_bytes: typing.Optional[BytesIO] = None
@ -276,7 +276,7 @@ class BlobFile(AbstractBlob):
"""
A blob existing on the local file system
"""
def __init__(self, loop: asyncio.BaseEventLoop, blob_hash: str, length: typing.Optional[int] = None,
def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
blob_directory: typing.Optional[str] = None):
super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory)
@ -324,7 +324,7 @@ class BlobFile(AbstractBlob):
@classmethod
async def create_from_unencrypted(
cls, loop: asyncio.BaseEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
unencrypted: bytes, blob_num: int,
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'],
asyncio.Task]] = None) -> BlobInfo:

View file

@ -16,7 +16,7 @@ log = logging.getLogger(__name__)
class BlobManager:
def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, storage: 'SQLiteStorage', config: 'Config',
def __init__(self, loop: asyncio.AbstractEventLoop, blob_dir: str, storage: 'SQLiteStorage', config: 'Config',
node_data_store: typing.Optional['DictDataStore'] = None):
"""
This class stores blobs on the hard disk

View file

@ -14,7 +14,7 @@ log = logging.getLogger(__name__)
class BlobExchangeClientProtocol(asyncio.Protocol):
def __init__(self, loop: asyncio.BaseEventLoop, peer_timeout: typing.Optional[float] = 10,
def __init__(self, loop: asyncio.AbstractEventLoop, peer_timeout: typing.Optional[float] = 10,
connection_manager: typing.Optional['ConnectionManager'] = None):
self.loop = loop
self.peer_port: typing.Optional[int] = None
@ -39,8 +39,6 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
self.peer_address, self.peer_port = addr_info
# assert self.peer_address is not None
self.connection_manager.received_data(f"{self.peer_address}:{self.peer_port}", len(data))
#log.debug("%s:%d -- got %s bytes -- %s bytes on buffer -- %s blob bytes received",
# self.peer_address, self.peer_port, len(data), len(self.buf), self._blob_bytes_received)
if not self.transport or self.transport.is_closing():
log.warning("transport closing, but got more bytes from %s:%i\n%s", self.peer_address, self.peer_port,
binascii.hexlify(data))
@ -91,7 +89,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
log.error("error downloading blob from %s:%i: %s", self.peer_address, self.peer_port, err)
if self._response_fut and not self._response_fut.done():
self._response_fut.set_exception(err)
except (asyncio.TimeoutError) as err: # TODO: is this needed?
except asyncio.TimeoutError as err: # TODO: is this needed?
log.error("%s downloading blob from %s:%i", str(err), self.peer_address, self.peer_port)
if self._response_fut and not self._response_fut.done():
self._response_fut.set_exception(err)
@ -185,7 +183,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
self.blob, self.writer = blob, blob.get_blob_writer(self.peer_address, self.peer_port)
self._response_fut = asyncio.Future(loop=self.loop)
return await self._download_blob()
except OSError as e:
except OSError:
# i'm not sure how to fix this race condition - jack
log.warning("race happened downloading %s from %s:%s", blob_hash, self.peer_address, self.peer_port)
# return self._blob_bytes_received, self.transport
@ -220,8 +218,8 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
@cache_concurrent
async def request_blob(loop: asyncio.BaseEventLoop, blob: typing.Optional['AbstractBlob'], address: str, tcp_port: int,
peer_connect_timeout: float, blob_download_timeout: float,
async def request_blob(loop: asyncio.AbstractEventLoop, blob: typing.Optional['AbstractBlob'], address: str,
tcp_port: int, peer_connect_timeout: float, blob_download_timeout: float,
connected_transport: asyncio.Transport = None, connection_id: int = 0,
connection_manager: typing.Optional['ConnectionManager'] = None)\
-> typing.Tuple[int, typing.Optional[asyncio.Transport]]:

View file

@ -16,7 +16,7 @@ log = logging.getLogger(__name__)
class BlobDownloader:
BAN_FACTOR = 2.0 # fixme: when connection manager gets implemented, move it out from here
def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
peer_queue: asyncio.Queue):
self.loop = loop
self.config = config

View file

@ -14,7 +14,7 @@ log = logging.getLogger(__name__)
class BlobServerProtocol(asyncio.Protocol):
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str):
def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str):
self.loop = loop
self.blob_manager = blob_manager
self.server_task: asyncio.Task = None
@ -103,7 +103,7 @@ class BlobServerProtocol(asyncio.Protocol):
class BlobServer:
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str):
def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str):
self.loop = loop
self.blob_manager = blob_manager
self.server_task: typing.Optional[asyncio.Task] = None

View file

@ -9,7 +9,7 @@ log = logging.getLogger(__name__)
class BlobAnnouncer:
def __init__(self, loop: asyncio.BaseEventLoop, node: 'Node', storage: 'SQLiteStorage'):
def __init__(self, loop: asyncio.AbstractEventLoop, node: 'Node', storage: 'SQLiteStorage'):
self.loop = loop
self.node = node
self.storage = storage

View file

@ -16,7 +16,7 @@ log = logging.getLogger(__name__)
class Node:
def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.rpc_timeout,
split_buckets_under_index: int = constants.split_buckets_under_index):
self.loop = loop

View file

@ -20,7 +20,7 @@ def is_valid_ipv4(address):
class PeerManager:
def __init__(self, loop: asyncio.BaseEventLoop):
def __init__(self, loop: asyncio.AbstractEventLoop):
self._loop = loop
self._rpc_failures: typing.Dict[
typing.Tuple[str, int], typing.Tuple[typing.Optional[float], typing.Optional[float]]
@ -61,7 +61,7 @@ class PeerManager:
self._node_tokens[node_id] = (now, token)
def get_node_token(self, node_id: bytes) -> typing.Optional[bytes]:
ts, token = self._node_tokens.get(node_id, (None, None))
ts, token = self._node_tokens.get(node_id, (0, None))
if ts and ts > self._loop.time() - constants.token_secret_refresh_interval:
return token
@ -148,7 +148,7 @@ class KademliaPeer:
'protocol_version',
]
def __init__(self, loop: asyncio.BaseEventLoop, address: str, node_id: typing.Optional[bytes] = None,
def __init__(self, loop: asyncio.AbstractEventLoop, address: str, node_id: typing.Optional[bytes] = None,
udp_port: typing.Optional[int] = None, tcp_port: typing.Optional[int] = None):
if node_id is not None:
if not len(node_id) == constants.hash_length:

View file

@ -7,7 +7,7 @@ if typing.TYPE_CHECKING:
class DictDataStore:
def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager'):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager'):
# Dictionary format:
# { <key>: [(<contact>, <age>), ...] }
self._data_store: typing.Dict[bytes, typing.List[typing.Tuple['KademliaPeer', float]]] = {}

View file

@ -70,7 +70,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
class IterativeFinder:
def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager',
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.k,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
@ -254,7 +254,7 @@ class IterativeFinder:
class IterativeNodeFinder(IterativeFinder):
def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager',
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.k,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
@ -305,7 +305,7 @@ class IterativeNodeFinder(IterativeFinder):
class IterativeValueFinder(IterativeFinder):
def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager',
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.k,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,

View file

@ -29,7 +29,7 @@ old_protocol_errors = {
class KademliaRPC:
def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.BaseEventLoop, peer_port: int = 3333):
def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.AbstractEventLoop, peer_port: int = 3333):
self.protocol = protocol
self.loop = loop
self.peer_port = peer_port
@ -132,7 +132,7 @@ class RemoteKademliaRPC:
Encapsulates RPC calls to remote Peers
"""
def __init__(self, loop: asyncio.BaseEventLoop, peer_tracker: 'PeerManager', protocol: 'KademliaProtocol',
def __init__(self, loop: asyncio.AbstractEventLoop, peer_tracker: 'PeerManager', protocol: 'KademliaProtocol',
peer: 'KademliaPeer'):
self.loop = loop
self.peer_tracker = peer_tracker
@ -195,7 +195,7 @@ class RemoteKademliaRPC:
class PingQueue:
def __init__(self, loop: asyncio.BaseEventLoop, protocol: 'KademliaProtocol'):
def __init__(self, loop: asyncio.AbstractEventLoop, protocol: 'KademliaProtocol'):
self._loop = loop
self._protocol = protocol
self._pending_contacts: typing.Dict['KademliaPeer', float] = {}
@ -258,7 +258,7 @@ class PingQueue:
class KademliaProtocol(DatagramProtocol):
def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
udp_port: int, peer_port: int, rpc_timeout: float = constants.rpc_timeout,
split_buckets_under_index: int = constants.split_buckets_under_index):
self.peer_manager = peer_manager

View file

@ -163,7 +163,7 @@ class TreeRoutingTable:
that paper.
"""
def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes,
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes,
split_buckets_under_index: int = constants.split_buckets_under_index):
self._loop = loop
self._peer_manager = peer_manager

View file

@ -58,7 +58,7 @@ class StreamDescriptor:
'sd_hash'
]
def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, stream_name: str, key: str,
def __init__(self, loop: asyncio.AbstractEventLoop, blob_dir: str, stream_name: str, key: str,
suggested_file_name: str, blobs: typing.List[BlobInfo], stream_hash: typing.Optional[str] = None,
sd_hash: typing.Optional[str] = None):
self.loop = loop
@ -139,7 +139,7 @@ class StreamDescriptor:
return sd_blob
@classmethod
def _from_stream_descriptor_blob(cls, loop: asyncio.BaseEventLoop, blob_dir: str,
def _from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_dir: str,
blob: AbstractBlob) -> 'StreamDescriptor':
with blob.reader_context() as blob_reader:
json_bytes = blob_reader.read()
@ -171,7 +171,7 @@ class StreamDescriptor:
return descriptor
@classmethod
async def from_stream_descriptor_blob(cls, loop: asyncio.BaseEventLoop, blob_dir: str,
async def from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_dir: str,
blob: AbstractBlob) -> 'StreamDescriptor':
if not blob.is_readable():
raise InvalidStreamDescriptorError(f"unreadable/missing blob: {blob.blob_hash}")
@ -209,7 +209,7 @@ class StreamDescriptor:
@classmethod
async def create_stream(
cls, loop: asyncio.BaseEventLoop, blob_dir: str, file_path: str, key: typing.Optional[bytes] = None,
cls, loop: asyncio.AbstractEventLoop, blob_dir: str, file_path: str, key: typing.Optional[bytes] = None,
iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None,
old_sort: bool = False,
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'],

View file

@ -114,7 +114,7 @@ class StreamDownloader:
)
async def download_stream_blob(self, blob_info: 'BlobInfo', connection_id: int = 0) -> 'AbstractBlob':
if not filter(lambda blob: blob.blob_hash == blob_info.blob_hash, self.descriptor.blobs[:-1]):
if not filter(lambda b: b.blob_hash == blob_info.blob_hash, self.descriptor.blobs[:-1]):
raise ValueError(f"blob {blob_info.blob_hash} is not part of stream with sd hash {self.sd_hash}")
blob = await asyncio.wait_for(
self.blob_downloader.download_blob(blob_info.blob_hash, blob_info.length, connection_id),

View file

@ -33,7 +33,7 @@ def _get_next_available_file_name(download_directory: str, file_name: str) -> st
return file_name
async def get_next_available_file_name(loop: asyncio.BaseEventLoop, download_directory: str, file_name: str) -> str:
async def get_next_available_file_name(loop: asyncio.AbstractEventLoop, download_directory: str, file_name: str) -> str:
return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name)
@ -71,7 +71,7 @@ class ManagedStream:
'finished_write_attempt'
]
def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
sd_hash: str, download_directory: typing.Optional[str] = None, file_name: typing.Optional[str] = None,
status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None,
download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
@ -245,7 +245,7 @@ class ManagedStream:
}
@classmethod
async def create(cls, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
file_path: str, key: typing.Optional[bytes] = None,
iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedStream':
descriptor = await StreamDescriptor.create_stream(

View file

@ -63,7 +63,7 @@ def path_or_none(p) -> typing.Optional[str]:
class StreamManager:
def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
wallet: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node'],
analytics_manager: typing.Optional['AnalyticsManager'] = None):
self.loop = loop

View file

@ -8,7 +8,7 @@ if typing.TYPE_CHECKING:
from lbry.dht.protocol.protocol import KademliaProtocol
def get_time_accelerator(loop: asyncio.BaseEventLoop,
def get_time_accelerator(loop: asyncio.AbstractEventLoop,
now: typing.Optional[float] = None) -> typing.Callable[[float], typing.Awaitable[None]]:
"""
Returns an async advance() function
@ -48,7 +48,7 @@ def get_time_accelerator(loop: asyncio.BaseEventLoop,
@contextlib.contextmanager
def mock_network_loop(loop: asyncio.BaseEventLoop):
def mock_network_loop(loop: asyncio.AbstractEventLoop):
dht_network: typing.Dict[typing.Tuple[str, int], 'KademliaProtocol'] = {}
async def create_datagram_endpoint(proto_lam: typing.Callable[[], 'KademliaProtocol'],