From c8dd1987e6bfdd0a2614a5fdcb478e77f56ff323 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Sun, 2 Jun 2019 23:50:17 -0400
Subject: [PATCH] add connection status

---
 lbrynet/blob/blob_manager.py        |  4 ++
 lbrynet/blob_exchange/client.py     | 35 ++++++++---
 lbrynet/blob_exchange/downloader.py |  4 +-
 lbrynet/blob_exchange/server.py     | 13 ++++-
 lbrynet/connection_manager.py       | 90 +++++++++++++++++++++++++++++
 lbrynet/extras/daemon/Components.py | 15 +++--
 lbrynet/extras/daemon/Daemon.py     | 11 ++++
 7 files changed, 156 insertions(+), 16 deletions(-)
 create mode 100644 lbrynet/connection_manager.py

diff --git a/lbrynet/blob/blob_manager.py b/lbrynet/blob/blob_manager.py
index cc68d01b0..793a4d9d1 100644
--- a/lbrynet/blob/blob_manager.py
+++ b/lbrynet/blob/blob_manager.py
@@ -5,6 +5,7 @@ import logging
 from lbrynet.utils import LRUCache
 from lbrynet.blob.blob_file import is_valid_blobhash, BlobFile, BlobBuffer, AbstractBlob
 from lbrynet.stream.descriptor import StreamDescriptor
+from lbrynet.connection_manager import ConnectionManager

 if typing.TYPE_CHECKING:
     from lbrynet.conf import Config
@@ -33,6 +34,7 @@ class BlobManager:
         self.config = config
         self.decrypted_blob_lru_cache = None if not self.config.blob_lru_cache_size else LRUCache(
             self.config.blob_lru_cache_size)
+        self.connection_manager = ConnectionManager(loop)

     def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None):
         if self.config.save_blobs:
@@ -84,9 +86,11 @@ class BlobManager:
         to_add = await self.storage.sync_missing_blobs(in_blobfiles_dir)
         if to_add:
             self.completed_blob_hashes.update(to_add)
+        self.connection_manager.start()
         return True

     def stop(self):
+        self.connection_manager.stop()
         while self.blobs:
             _, blob = self.blobs.popitem()
             blob.close()
diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py
index 4051d8dc3..7a67b2e39 100644
--- a/lbrynet/blob_exchange/client.py
+++ b/lbrynet/blob_exchange/client.py
@@ -8,18 +8,20 @@ from lbrynet.utils import cache_concurrent
 if typing.TYPE_CHECKING:
     from lbrynet.blob.blob_file import AbstractBlob
     from lbrynet.blob.writer import HashBlobWriter
+    from lbrynet.connection_manager import ConnectionManager

 log = logging.getLogger(__name__)


 class BlobExchangeClientProtocol(asyncio.Protocol):
-    def __init__(self, loop: asyncio.BaseEventLoop, peer_timeout: typing.Optional[float] = 10):
+    def __init__(self, loop: asyncio.BaseEventLoop, peer_timeout: typing.Optional[float] = 10,
+                 connection_manager: typing.Optional['ConnectionManager'] = None):
         self.loop = loop
         self.peer_port: typing.Optional[int] = None
         self.peer_address: typing.Optional[str] = None
-        self.peer_timeout = peer_timeout
         self.transport: typing.Optional[asyncio.Transport] = None
-
+        self.peer_timeout = peer_timeout
+        self.connection_manager = connection_manager
         self.writer: typing.Optional['HashBlobWriter'] = None
         self.blob: typing.Optional['AbstractBlob'] = None

@@ -31,6 +33,12 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
         self.closed = asyncio.Event(loop=self.loop)

     def data_received(self, data: bytes):
+        if self.connection_manager:
+            if not self.peer_address:
+                addr_info = self.transport.get_extra_info('peername')
+                self.peer_address, self.peer_port = addr_info
+            # assert self.peer_address is not None
+            self.connection_manager.received_data(f"{self.peer_address}:{self.peer_port}", len(data))
         #log.debug("%s:%d -- got %s bytes -- %s bytes on buffer -- %s blob bytes received",
         #          self.peer_address, self.peer_port, len(data), len(self.buf), self._blob_bytes_received)
         if not self.transport or self.transport.is_closing():
@@ -94,10 +102,15 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
         """
         request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash)
         blob_hash = self.blob.blob_hash
+        if not self.peer_address:
+            addr_info = self.transport.get_extra_info('peername')
+            self.peer_address, self.peer_port = addr_info
         try:
             msg = request.serialize()
             log.debug("send request to %s:%i -> %s", self.peer_address, self.peer_port, msg.decode())
             self.transport.write(msg)
+            if self.connection_manager:
+                self.connection_manager.sent_data(f"{self.peer_address}:{self.peer_port}", len(msg))
             response: BlobResponse = await asyncio.wait_for(self._response_fut, self.peer_timeout, loop=self.loop)
             availability_response = response.get_availability_response()
             price_response = response.get_price_response()
@@ -186,11 +199,16 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
         self.writer = None

     def connection_made(self, transport: asyncio.Transport):
+        addr = transport.get_extra_info('peername')
+        self.peer_address, self.peer_port = addr[0], addr[1]
         self.transport = transport
-        self.peer_address, self.peer_port = self.transport.get_extra_info('peername')
+        if self.connection_manager:
+            self.connection_manager.connection_made(f"{self.peer_address}:{self.peer_port}")
         log.debug("connection made to %s:%i", self.peer_address, self.peer_port)

     def connection_lost(self, reason):
+        if self.connection_manager:
+            self.connection_manager.outgoing_connection_lost(f"{self.peer_address}:{self.peer_port}")
         log.debug("connection lost to %s:%i (reason: %s, %s)", self.peer_address, self.peer_port, str(reason),
                   str(type(reason)))
         self.close()
@@ -199,16 +217,19 @@

 @cache_concurrent
 async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', address: str, tcp_port: int,
                        peer_connect_timeout: float, blob_download_timeout: float,
-                       connected_transport: asyncio.Transport = None, connection_id: int = 0)\
+                       connected_transport: asyncio.Transport = None, connection_id: int = 0,
+                       connection_manager: typing.Optional['ConnectionManager'] = None)\
         -> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
     """
     Returns [<bytes received>, <transport>]
     """
-    protocol = BlobExchangeClientProtocol(loop, blob_download_timeout)
+    protocol = BlobExchangeClientProtocol(
+        loop, blob_download_timeout, connection_manager
+    )
     if connected_transport and not connected_transport.is_closing():
         connected_transport.set_protocol(protocol)
-        protocol.connection_made(connected_transport)
+        protocol.transport = connected_transport
         log.debug("reusing connection for %s:%d", address, tcp_port)
     else:
         connected_transport = None
diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py
index 364a22f70..87723e5db 100644
--- a/lbrynet/blob_exchange/downloader.py
+++ b/lbrynet/blob_exchange/downloader.py
@@ -41,7 +41,9 @@ class BlobDownloader:
             start = self.loop.time()
             bytes_received, transport = await request_blob(
                 self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
-                self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id
+                self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id,
+                connection_manager=self.blob_manager.connection_manager
+
             )
             if not transport and peer not in self.ignored:
                 self.ignored[peer] = self.loop.time()
diff --git a/lbrynet/blob_exchange/server.py b/lbrynet/blob_exchange/server.py
index da9bd3bf1..d7b0c49bb 100644
--- a/lbrynet/blob_exchange/server.py
+++ b/lbrynet/blob_exchange/server.py
@@ -22,15 +22,23 @@ class BlobServerProtocol(asyncio.Protocol):
         self.buf = b''
         self.transport = None
         self.lbrycrd_address = lbrycrd_address
+        self.peer_address_and_port: typing.Optional[str] = None

     def connection_made(self, transport):
         self.transport = transport
+        self.peer_address_and_port = "%s:%i" % self.transport.get_extra_info('peername')
+        self.blob_manager.connection_manager.connection_received(self.peer_address_and_port)
+
+    def connection_lost(self, exc: typing.Optional[Exception]) -> None:
+        self.blob_manager.connection_manager.incoming_connection_lost(self.peer_address_and_port)

     def send_response(self, responses: typing.List[blob_response_types]):
         to_send = []
         while responses:
             to_send.append(responses.pop())
-        self.transport.write(BlobResponse(to_send).serialize())
+        serialized = BlobResponse(to_send).serialize()
+        self.transport.write(serialized)
+        self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, len(serialized))

     async def handle_request(self, request: BlobRequest):
         addr = self.transport.get_extra_info('peername')
@@ -72,6 +80,7 @@ class BlobServerProtocol(asyncio.Protocol):
     def data_received(self, data):
         request = None
         if data:
+            self.blob_manager.connection_manager.received_data(self.peer_address_and_port, len(data))
             message, separator, remainder = data.rpartition(b'}')
             if not separator:
                 self.buf += data
@@ -97,7 +106,7 @@ class BlobServer:
     def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str):
         self.loop = loop
         self.blob_manager = blob_manager
-        self.server_task: asyncio.Task = None
+        self.server_task: typing.Optional[asyncio.Task] = None
         self.started_listening = asyncio.Event(loop=self.loop)
         self.lbrycrd_address = lbrycrd_address
         self.server_protocol_class = BlobServerProtocol
diff --git a/lbrynet/connection_manager.py b/lbrynet/connection_manager.py
new file mode 100644
index 000000000..342989a7e
--- /dev/null
+++ b/lbrynet/connection_manager.py
@@ -0,0 +1,90 @@
+import asyncio
+import typing
+import collections
+import logging
+
+log = logging.getLogger(__name__)
+
+
+CONNECTED_EVENT = "connected"
+DISCONNECTED_EVENT = "disconnected"
+TRANSFERRED_EVENT = "transferred"
+
+
+class ConnectionManager:
+    def __init__(self, loop: asyncio.AbstractEventLoop):
+        self.loop = loop
+        self.incoming_connected: typing.Set[str] = set()
+        self.incoming: typing.DefaultDict[str, int] = collections.defaultdict(int)
+        self.outgoing_connected: typing.Set[str] = set()
+        self.outgoing: typing.DefaultDict[str, int] = collections.defaultdict(int)
+        self._status = {}
+
+        self._task: typing.Optional[asyncio.Task] = None
+
+    @property
+    def status(self):
+        return self._status
+
+    def sent_data(self, host_and_port: str, size: int):
+        self.outgoing[host_and_port] += size
+
+    def received_data(self, host_and_port: str, size: int):
+        self.incoming[host_and_port] += size
+
+    def connection_made(self, host_and_port: str):
+        self.outgoing_connected.add(host_and_port)
+
+    def connection_received(self, host_and_port: str):
+        # self.incoming_connected.add(host_and_port)
+        pass
+
+    def outgoing_connection_lost(self, host_and_port: str):
+        if host_and_port in self.outgoing_connected:
+            self.outgoing_connected.remove(host_and_port)
+
+    def incoming_connection_lost(self, host_and_port: str):
+        if host_and_port in self.incoming_connected:
+            self.incoming_connected.remove(host_and_port)
+
+    async def _update(self):
+
+        self._status = {
+            'incoming_bps': {},
+            'outgoing_bps': {},
+            'total_incoming_mbs': 0.0,
+            'total_outgoing_mbs': 0.0,
+            'time': self.loop.time()
+        }
+
+        while True:
+            last = self.loop.time()
+            await asyncio.sleep(1, loop=self.loop)
+            self._status['incoming_bps'].clear()
+            self._status['outgoing_bps'].clear()
+            while self.outgoing:
+                k, v = self.outgoing.popitem()
+                self._status['outgoing_bps'][k] = v
+            while self.incoming:
+                k, v = self.incoming.popitem()
+                self._status['incoming_bps'][k] = v
+            now = self.loop.time()
+            self._status['total_outgoing_mbs'] = int(sum(list(self._status['outgoing_bps'].values())
+                                                         ) / (now - last)) / 1000000.0
+            self._status['total_incoming_mbs'] = int(sum(list(self._status['incoming_bps'].values())
+                                                         ) / (now - last)) / 1000000.0
+            self._status['time'] = now
+
+    def stop(self):
+        if self._task:
+            self._task.cancel()
+            self._task = None
+        self.outgoing.clear()
+        self.outgoing_connected.clear()
+        self.incoming.clear()
+        self.incoming_connected.clear()
+        self._status.clear()
+
+    def start(self):
+        self.stop()
+        self._task = self.loop.create_task(self._update())
diff --git a/lbrynet/extras/daemon/Components.py b/lbrynet/extras/daemon/Components.py
index 359ed97f9..73a7be8c3 100644
--- a/lbrynet/extras/daemon/Components.py
+++ b/lbrynet/extras/daemon/Components.py
@@ -278,7 +278,7 @@ class BlobComponent(Component):

     def __init__(self, component_manager):
         super().__init__(component_manager)
-        self.blob_manager: BlobManager = None
+        self.blob_manager: typing.Optional[BlobManager] = None

     @property
     def component(self) -> typing.Optional[BlobManager]:
@@ -294,7 +294,7 @@ class BlobComponent(Component):
         blob_dir = os.path.join(self.conf.data_dir, 'blobfiles')
         if not os.path.isdir(blob_dir):
             os.mkdir(blob_dir)
-        self.blob_manager = BlobManager(asyncio.get_event_loop(), blob_dir, storage, self.conf, data_store)
+        self.blob_manager = BlobManager(self.component_manager.loop, blob_dir, storage, self.conf, data_store)
         return await self.blob_manager.setup()

     async def stop(self):
@@ -304,7 +304,10 @@ class BlobComponent(Component):
         count = 0
         if self.blob_manager:
             count = len(self.blob_manager.completed_blob_hashes)
-        return {'finished_blobs': count}
+        return {
+            'finished_blobs': count,
+            'connections': self.blob_manager.connection_manager.status
+        }


 class DHTComponent(Component):
@@ -405,7 +408,7 @@ class StreamManagerComponent(Component):

     def __init__(self, component_manager):
         super().__init__(component_manager)
-        self.stream_manager: StreamManager = None
+        self.stream_manager: typing.Optional[StreamManager] = None

     @property
     def component(self) -> typing.Optional[StreamManager]:
@@ -415,7 +418,7 @@ class StreamManagerComponent(Component):
         if not self.stream_manager:
             return
         return {
-            'managed_files': len(self.stream_manager.streams)
+            'managed_files': len(self.stream_manager.streams),
         }

     async def start(self):
@@ -444,7 +447,7 @@ class PeerProtocolServerComponent(Component):

     def __init__(self, component_manager):
         super().__init__(component_manager)
-        self.blob_server: BlobServer = None
+        self.blob_server: typing.Optional[BlobServer] = None

     @property
     def component(self) -> typing.Optional[BlobServer]:
diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py
index fcecd227e..390ef004a 100644
--- a/lbrynet/extras/daemon/Daemon.py
+++ b/lbrynet/extras/daemon/Daemon.py
@@ -746,6 +746,17 @@ class Daemon(metaclass=JSONRPCServerType):
             },
             'blob_manager': {
                 'finished_blobs': (int) number of finished blobs in the blob manager,
+                'connections': {
+                    'incoming_bps': {
+                        <source address and port>: (int) bytes per second received,
+                    },
+                    'outgoing_bps': {
+                        <destination address and port>: (int) bytes per second sent,
+                    },
+                    'total_outgoing_mbs': (float) megabytes per second sent,
+                    'total_incoming_mbs': (float) megabytes per second received,
+                    'time': (float) timestamp
+                }
             },
             'hash_announcer': {
                 'announce_queue_size': (int) number of blobs currently queued to be announced
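
A minimal usage sketch of the ConnectionManager added above (not part of the patch). It assumes the patched lbrynet package is importable; the peer address and byte counts are made up for illustration. It drives the same calls the blob client and server make (connection_made(), sent_data(), received_data()) and prints the status dict that BlobComponent.get_status() exposes under 'connections':

import asyncio

from lbrynet.connection_manager import ConnectionManager


async def main():
    loop = asyncio.get_event_loop()
    manager = ConnectionManager(loop)
    manager.start()  # schedules the once-per-second _update() task

    peer = "127.0.0.1:3333"  # hypothetical peer, keyed as "<address>:<port>"
    manager.connection_made(peer)

    for _ in range(3):
        manager.sent_data(peer, 400)                  # bytes written to the peer
        manager.received_data(peer, 2 * 1000 * 1000)  # bytes read from the peer
        await asyncio.sleep(1.1)                      # let one update tick run
        print(manager.status)

    manager.outgoing_connection_lost(peer)
    manager.stop()


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())

Because _update() drains the per-peer counters once per second, 'incoming_bps' and 'outgoing_bps' hold roughly the bytes moved during the last tick, while 'total_incoming_mbs' and 'total_outgoing_mbs' are those sums divided by the elapsed time, in megabytes per second.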