diff --git a/lbry/lbry/blob/blob_file.py b/lbry/lbry/blob/blob_file.py index 6b1218bd9..45ac2b26f 100644 --- a/lbry/lbry/blob/blob_file.py +++ b/lbry/lbry/blob/blob_file.py @@ -71,7 +71,7 @@ class AbstractBlob: 'readers' ] - def __init__(self, loop: asyncio.BaseEventLoop, blob_hash: str, length: typing.Optional[int] = None, + def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None, blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None, blob_directory: typing.Optional[str] = None): self.loop = loop @@ -175,7 +175,7 @@ class AbstractBlob: @classmethod async def create_from_unencrypted( - cls, loop: asyncio.BaseEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes, + cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes, unencrypted: bytes, blob_num: int, blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None) -> BlobInfo: """ @@ -237,7 +237,7 @@ class BlobBuffer(AbstractBlob): """ An in-memory only blob """ - def __init__(self, loop: asyncio.BaseEventLoop, blob_hash: str, length: typing.Optional[int] = None, + def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None, blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None, blob_directory: typing.Optional[str] = None): self._verified_bytes: typing.Optional[BytesIO] = None @@ -276,7 +276,7 @@ class BlobFile(AbstractBlob): """ A blob existing on the local file system """ - def __init__(self, loop: asyncio.BaseEventLoop, blob_hash: str, length: typing.Optional[int] = None, + def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None, blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None, blob_directory: typing.Optional[str] = None): super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory) @@ -324,7 +324,7 @@ class BlobFile(AbstractBlob): @classmethod async def create_from_unencrypted( - cls, loop: asyncio.BaseEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes, + cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes, unencrypted: bytes, blob_num: int, blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None) -> BlobInfo: diff --git a/lbry/lbry/blob/blob_manager.py b/lbry/lbry/blob/blob_manager.py index 64ef5f2cb..71c646484 100644 --- a/lbry/lbry/blob/blob_manager.py +++ b/lbry/lbry/blob/blob_manager.py @@ -16,7 +16,7 @@ log = logging.getLogger(__name__) class BlobManager: - def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, storage: 'SQLiteStorage', config: 'Config', + def __init__(self, loop: asyncio.AbstractEventLoop, blob_dir: str, storage: 'SQLiteStorage', config: 'Config', node_data_store: typing.Optional['DictDataStore'] = None): """ This class stores blobs on the hard disk diff --git a/lbry/lbry/blob_exchange/client.py b/lbry/lbry/blob_exchange/client.py index ff3516d32..35ef4284f 100644 --- a/lbry/lbry/blob_exchange/client.py +++ b/lbry/lbry/blob_exchange/client.py @@ -14,7 +14,7 @@ log = logging.getLogger(__name__) class BlobExchangeClientProtocol(asyncio.Protocol): - def __init__(self, loop: asyncio.BaseEventLoop, peer_timeout: typing.Optional[float] = 10, + def __init__(self, loop: asyncio.AbstractEventLoop, peer_timeout: typing.Optional[float] = 10, connection_manager: typing.Optional['ConnectionManager'] = None): self.loop = loop self.peer_port: typing.Optional[int] = None @@ -39,8 +39,6 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.peer_address, self.peer_port = addr_info # assert self.peer_address is not None self.connection_manager.received_data(f"{self.peer_address}:{self.peer_port}", len(data)) - #log.debug("%s:%d -- got %s bytes -- %s bytes on buffer -- %s blob bytes received", - # self.peer_address, self.peer_port, len(data), len(self.buf), self._blob_bytes_received) if not self.transport or self.transport.is_closing(): log.warning("transport closing, but got more bytes from %s:%i\n%s", self.peer_address, self.peer_port, binascii.hexlify(data)) @@ -91,7 +89,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): log.error("error downloading blob from %s:%i: %s", self.peer_address, self.peer_port, err) if self._response_fut and not self._response_fut.done(): self._response_fut.set_exception(err) - except (asyncio.TimeoutError) as err: # TODO: is this needed? + except asyncio.TimeoutError as err: # TODO: is this needed? log.error("%s downloading blob from %s:%i", str(err), self.peer_address, self.peer_port) if self._response_fut and not self._response_fut.done(): self._response_fut.set_exception(err) @@ -185,7 +183,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.blob, self.writer = blob, blob.get_blob_writer(self.peer_address, self.peer_port) self._response_fut = asyncio.Future(loop=self.loop) return await self._download_blob() - except OSError as e: + except OSError: # i'm not sure how to fix this race condition - jack log.warning("race happened downloading %s from %s:%s", blob_hash, self.peer_address, self.peer_port) # return self._blob_bytes_received, self.transport @@ -220,8 +218,8 @@ class BlobExchangeClientProtocol(asyncio.Protocol): @cache_concurrent -async def request_blob(loop: asyncio.BaseEventLoop, blob: typing.Optional['AbstractBlob'], address: str, tcp_port: int, - peer_connect_timeout: float, blob_download_timeout: float, +async def request_blob(loop: asyncio.AbstractEventLoop, blob: typing.Optional['AbstractBlob'], address: str, + tcp_port: int, peer_connect_timeout: float, blob_download_timeout: float, connected_transport: asyncio.Transport = None, connection_id: int = 0, connection_manager: typing.Optional['ConnectionManager'] = None)\ -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: diff --git a/lbry/lbry/blob_exchange/downloader.py b/lbry/lbry/blob_exchange/downloader.py index 3c60e5db1..98d626fa0 100644 --- a/lbry/lbry/blob_exchange/downloader.py +++ b/lbry/lbry/blob_exchange/downloader.py @@ -16,7 +16,7 @@ log = logging.getLogger(__name__) class BlobDownloader: BAN_FACTOR = 2.0 # fixme: when connection manager gets implemented, move it out from here - def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager', + def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', peer_queue: asyncio.Queue): self.loop = loop self.config = config diff --git a/lbry/lbry/blob_exchange/server.py b/lbry/lbry/blob_exchange/server.py index 07ee0ce39..70234acb8 100644 --- a/lbry/lbry/blob_exchange/server.py +++ b/lbry/lbry/blob_exchange/server.py @@ -14,7 +14,7 @@ log = logging.getLogger(__name__) class BlobServerProtocol(asyncio.Protocol): - def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str): + def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str): self.loop = loop self.blob_manager = blob_manager self.server_task: asyncio.Task = None @@ -103,7 +103,7 @@ class BlobServerProtocol(asyncio.Protocol): class BlobServer: - def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str): + def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str): self.loop = loop self.blob_manager = blob_manager self.server_task: typing.Optional[asyncio.Task] = None diff --git a/lbry/lbry/dht/blob_announcer.py b/lbry/lbry/dht/blob_announcer.py index d9d3a8233..22d12a550 100644 --- a/lbry/lbry/dht/blob_announcer.py +++ b/lbry/lbry/dht/blob_announcer.py @@ -9,7 +9,7 @@ log = logging.getLogger(__name__) class BlobAnnouncer: - def __init__(self, loop: asyncio.BaseEventLoop, node: 'Node', storage: 'SQLiteStorage'): + def __init__(self, loop: asyncio.AbstractEventLoop, node: 'Node', storage: 'SQLiteStorage'): self.loop = loop self.node = node self.storage = storage diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py index 1fba74863..d6d358db5 100644 --- a/lbry/lbry/dht/node.py +++ b/lbry/lbry/dht/node.py @@ -16,7 +16,7 @@ log = logging.getLogger(__name__) class Node: - def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int, + def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int, internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.rpc_timeout, split_buckets_under_index: int = constants.split_buckets_under_index): self.loop = loop diff --git a/lbry/lbry/dht/peer.py b/lbry/lbry/dht/peer.py index 5e6d1925b..3bc2f7e75 100644 --- a/lbry/lbry/dht/peer.py +++ b/lbry/lbry/dht/peer.py @@ -20,7 +20,7 @@ def is_valid_ipv4(address): class PeerManager: - def __init__(self, loop: asyncio.BaseEventLoop): + def __init__(self, loop: asyncio.AbstractEventLoop): self._loop = loop self._rpc_failures: typing.Dict[ typing.Tuple[str, int], typing.Tuple[typing.Optional[float], typing.Optional[float]] @@ -61,7 +61,7 @@ class PeerManager: self._node_tokens[node_id] = (now, token) def get_node_token(self, node_id: bytes) -> typing.Optional[bytes]: - ts, token = self._node_tokens.get(node_id, (None, None)) + ts, token = self._node_tokens.get(node_id, (0, None)) if ts and ts > self._loop.time() - constants.token_secret_refresh_interval: return token @@ -148,7 +148,7 @@ class KademliaPeer: 'protocol_version', ] - def __init__(self, loop: asyncio.BaseEventLoop, address: str, node_id: typing.Optional[bytes] = None, + def __init__(self, loop: asyncio.AbstractEventLoop, address: str, node_id: typing.Optional[bytes] = None, udp_port: typing.Optional[int] = None, tcp_port: typing.Optional[int] = None): if node_id is not None: if not len(node_id) == constants.hash_length: diff --git a/lbry/lbry/dht/protocol/data_store.py b/lbry/lbry/dht/protocol/data_store.py index d4678b0e3..73e69c80e 100644 --- a/lbry/lbry/dht/protocol/data_store.py +++ b/lbry/lbry/dht/protocol/data_store.py @@ -7,7 +7,7 @@ if typing.TYPE_CHECKING: class DictDataStore: - def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager'): + def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager'): # Dictionary format: # { : [(, ), ...] } self._data_store: typing.Dict[bytes, typing.List[typing.Tuple['KademliaPeer', float]]] = {} diff --git a/lbry/lbry/dht/protocol/iterative_find.py b/lbry/lbry/dht/protocol/iterative_find.py index 0d36cdd5c..8f0e26437 100644 --- a/lbry/lbry/dht/protocol/iterative_find.py +++ b/lbry/lbry/dht/protocol/iterative_find.py @@ -70,7 +70,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes, class IterativeFinder: - def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', + def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.k, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, @@ -254,7 +254,7 @@ class IterativeFinder: class IterativeNodeFinder(IterativeFinder): - def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', + def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.k, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, @@ -305,7 +305,7 @@ class IterativeNodeFinder(IterativeFinder): class IterativeValueFinder(IterativeFinder): - def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', + def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.k, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, diff --git a/lbry/lbry/dht/protocol/protocol.py b/lbry/lbry/dht/protocol/protocol.py index a60e8424b..68940f35b 100644 --- a/lbry/lbry/dht/protocol/protocol.py +++ b/lbry/lbry/dht/protocol/protocol.py @@ -29,7 +29,7 @@ old_protocol_errors = { class KademliaRPC: - def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.BaseEventLoop, peer_port: int = 3333): + def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.AbstractEventLoop, peer_port: int = 3333): self.protocol = protocol self.loop = loop self.peer_port = peer_port @@ -132,7 +132,7 @@ class RemoteKademliaRPC: Encapsulates RPC calls to remote Peers """ - def __init__(self, loop: asyncio.BaseEventLoop, peer_tracker: 'PeerManager', protocol: 'KademliaProtocol', + def __init__(self, loop: asyncio.AbstractEventLoop, peer_tracker: 'PeerManager', protocol: 'KademliaProtocol', peer: 'KademliaPeer'): self.loop = loop self.peer_tracker = peer_tracker @@ -195,7 +195,7 @@ class RemoteKademliaRPC: class PingQueue: - def __init__(self, loop: asyncio.BaseEventLoop, protocol: 'KademliaProtocol'): + def __init__(self, loop: asyncio.AbstractEventLoop, protocol: 'KademliaProtocol'): self._loop = loop self._protocol = protocol self._pending_contacts: typing.Dict['KademliaPeer', float] = {} @@ -258,7 +258,7 @@ class PingQueue: class KademliaProtocol(DatagramProtocol): - def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str, + def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str, udp_port: int, peer_port: int, rpc_timeout: float = constants.rpc_timeout, split_buckets_under_index: int = constants.split_buckets_under_index): self.peer_manager = peer_manager diff --git a/lbry/lbry/dht/protocol/routing_table.py b/lbry/lbry/dht/protocol/routing_table.py index 8775ae22f..d0d1415bf 100644 --- a/lbry/lbry/dht/protocol/routing_table.py +++ b/lbry/lbry/dht/protocol/routing_table.py @@ -163,7 +163,7 @@ class TreeRoutingTable: that paper. """ - def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes, + def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes, split_buckets_under_index: int = constants.split_buckets_under_index): self._loop = loop self._peer_manager = peer_manager diff --git a/lbry/lbry/stream/descriptor.py b/lbry/lbry/stream/descriptor.py index 5c8a9861d..1b7856d85 100644 --- a/lbry/lbry/stream/descriptor.py +++ b/lbry/lbry/stream/descriptor.py @@ -58,7 +58,7 @@ class StreamDescriptor: 'sd_hash' ] - def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, stream_name: str, key: str, + def __init__(self, loop: asyncio.AbstractEventLoop, blob_dir: str, stream_name: str, key: str, suggested_file_name: str, blobs: typing.List[BlobInfo], stream_hash: typing.Optional[str] = None, sd_hash: typing.Optional[str] = None): self.loop = loop @@ -139,7 +139,7 @@ class StreamDescriptor: return sd_blob @classmethod - def _from_stream_descriptor_blob(cls, loop: asyncio.BaseEventLoop, blob_dir: str, + def _from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_dir: str, blob: AbstractBlob) -> 'StreamDescriptor': with blob.reader_context() as blob_reader: json_bytes = blob_reader.read() @@ -171,7 +171,7 @@ class StreamDescriptor: return descriptor @classmethod - async def from_stream_descriptor_blob(cls, loop: asyncio.BaseEventLoop, blob_dir: str, + async def from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_dir: str, blob: AbstractBlob) -> 'StreamDescriptor': if not blob.is_readable(): raise InvalidStreamDescriptorError(f"unreadable/missing blob: {blob.blob_hash}") @@ -209,7 +209,7 @@ class StreamDescriptor: @classmethod async def create_stream( - cls, loop: asyncio.BaseEventLoop, blob_dir: str, file_path: str, key: typing.Optional[bytes] = None, + cls, loop: asyncio.AbstractEventLoop, blob_dir: str, file_path: str, key: typing.Optional[bytes] = None, iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None, old_sort: bool = False, blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], diff --git a/lbry/lbry/stream/downloader.py b/lbry/lbry/stream/downloader.py index 717299a77..df79b6a4f 100644 --- a/lbry/lbry/stream/downloader.py +++ b/lbry/lbry/stream/downloader.py @@ -114,7 +114,7 @@ class StreamDownloader: ) async def download_stream_blob(self, blob_info: 'BlobInfo', connection_id: int = 0) -> 'AbstractBlob': - if not filter(lambda blob: blob.blob_hash == blob_info.blob_hash, self.descriptor.blobs[:-1]): + if not filter(lambda b: b.blob_hash == blob_info.blob_hash, self.descriptor.blobs[:-1]): raise ValueError(f"blob {blob_info.blob_hash} is not part of stream with sd hash {self.sd_hash}") blob = await asyncio.wait_for( self.blob_downloader.download_blob(blob_info.blob_hash, blob_info.length, connection_id), diff --git a/lbry/lbry/stream/managed_stream.py b/lbry/lbry/stream/managed_stream.py index f82d6a51b..2f8ce85a5 100644 --- a/lbry/lbry/stream/managed_stream.py +++ b/lbry/lbry/stream/managed_stream.py @@ -33,7 +33,7 @@ def _get_next_available_file_name(download_directory: str, file_name: str) -> st return file_name -async def get_next_available_file_name(loop: asyncio.BaseEventLoop, download_directory: str, file_name: str) -> str: +async def get_next_available_file_name(loop: asyncio.AbstractEventLoop, download_directory: str, file_name: str) -> str: return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name) @@ -71,7 +71,7 @@ class ManagedStream: 'finished_write_attempt' ] - def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager', + def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', sd_hash: str, download_directory: typing.Optional[str] = None, file_name: typing.Optional[str] = None, status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None, download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None, @@ -245,7 +245,7 @@ class ManagedStream: } @classmethod - async def create(cls, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager', + async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', file_path: str, key: typing.Optional[bytes] = None, iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedStream': descriptor = await StreamDescriptor.create_stream( diff --git a/lbry/lbry/stream/stream_manager.py b/lbry/lbry/stream/stream_manager.py index f52661bcc..2e458bfe9 100644 --- a/lbry/lbry/stream/stream_manager.py +++ b/lbry/lbry/stream/stream_manager.py @@ -63,7 +63,7 @@ def path_or_none(p) -> typing.Optional[str]: class StreamManager: - def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager', + def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', wallet: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node'], analytics_manager: typing.Optional['AnalyticsManager'] = None): self.loop = loop diff --git a/lbry/tests/dht_mocks.py b/lbry/tests/dht_mocks.py index f08a2cc57..17b0d3daf 100644 --- a/lbry/tests/dht_mocks.py +++ b/lbry/tests/dht_mocks.py @@ -8,7 +8,7 @@ if typing.TYPE_CHECKING: from lbry.dht.protocol.protocol import KademliaProtocol -def get_time_accelerator(loop: asyncio.BaseEventLoop, +def get_time_accelerator(loop: asyncio.AbstractEventLoop, now: typing.Optional[float] = None) -> typing.Callable[[float], typing.Awaitable[None]]: """ Returns an async advance() function @@ -48,7 +48,7 @@ def get_time_accelerator(loop: asyncio.BaseEventLoop, @contextlib.contextmanager -def mock_network_loop(loop: asyncio.BaseEventLoop): +def mock_network_loop(loop: asyncio.AbstractEventLoop): dht_network: typing.Dict[typing.Tuple[str, int], 'KademliaProtocol'] = {} async def create_datagram_endpoint(proto_lam: typing.Callable[[], 'KademliaProtocol'],