add connection status
This commit is contained in:
parent
8d2c9e5785
commit
c8dd1987e6
7 changed files with 156 additions and 16 deletions
|
@ -5,6 +5,7 @@ import logging
|
||||||
from lbrynet.utils import LRUCache
|
from lbrynet.utils import LRUCache
|
||||||
from lbrynet.blob.blob_file import is_valid_blobhash, BlobFile, BlobBuffer, AbstractBlob
|
from lbrynet.blob.blob_file import is_valid_blobhash, BlobFile, BlobBuffer, AbstractBlob
|
||||||
from lbrynet.stream.descriptor import StreamDescriptor
|
from lbrynet.stream.descriptor import StreamDescriptor
|
||||||
|
from lbrynet.connection_manager import ConnectionManager
|
||||||
|
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from lbrynet.conf import Config
|
from lbrynet.conf import Config
|
||||||
|
@ -33,6 +34,7 @@ class BlobManager:
|
||||||
self.config = config
|
self.config = config
|
||||||
self.decrypted_blob_lru_cache = None if not self.config.blob_lru_cache_size else LRUCache(
|
self.decrypted_blob_lru_cache = None if not self.config.blob_lru_cache_size else LRUCache(
|
||||||
self.config.blob_lru_cache_size)
|
self.config.blob_lru_cache_size)
|
||||||
|
self.connection_manager = ConnectionManager(loop)
|
||||||
|
|
||||||
def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None):
|
def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None):
|
||||||
if self.config.save_blobs:
|
if self.config.save_blobs:
|
||||||
|
@ -84,9 +86,11 @@ class BlobManager:
|
||||||
to_add = await self.storage.sync_missing_blobs(in_blobfiles_dir)
|
to_add = await self.storage.sync_missing_blobs(in_blobfiles_dir)
|
||||||
if to_add:
|
if to_add:
|
||||||
self.completed_blob_hashes.update(to_add)
|
self.completed_blob_hashes.update(to_add)
|
||||||
|
self.connection_manager.start()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
self.connection_manager.stop()
|
||||||
while self.blobs:
|
while self.blobs:
|
||||||
_, blob = self.blobs.popitem()
|
_, blob = self.blobs.popitem()
|
||||||
blob.close()
|
blob.close()
|
||||||
|
|
|
@ -8,18 +8,20 @@ from lbrynet.utils import cache_concurrent
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from lbrynet.blob.blob_file import AbstractBlob
|
from lbrynet.blob.blob_file import AbstractBlob
|
||||||
from lbrynet.blob.writer import HashBlobWriter
|
from lbrynet.blob.writer import HashBlobWriter
|
||||||
|
from lbrynet.connection_manager import ConnectionManager
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class BlobExchangeClientProtocol(asyncio.Protocol):
|
class BlobExchangeClientProtocol(asyncio.Protocol):
|
||||||
def __init__(self, loop: asyncio.BaseEventLoop, peer_timeout: typing.Optional[float] = 10):
|
def __init__(self, loop: asyncio.BaseEventLoop, peer_timeout: typing.Optional[float] = 10,
|
||||||
|
connection_manager: typing.Optional['ConnectionManager'] = None):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.peer_port: typing.Optional[int] = None
|
self.peer_port: typing.Optional[int] = None
|
||||||
self.peer_address: typing.Optional[str] = None
|
self.peer_address: typing.Optional[str] = None
|
||||||
self.peer_timeout = peer_timeout
|
|
||||||
self.transport: typing.Optional[asyncio.Transport] = None
|
self.transport: typing.Optional[asyncio.Transport] = None
|
||||||
|
self.peer_timeout = peer_timeout
|
||||||
|
self.connection_manager = connection_manager
|
||||||
self.writer: typing.Optional['HashBlobWriter'] = None
|
self.writer: typing.Optional['HashBlobWriter'] = None
|
||||||
self.blob: typing.Optional['AbstractBlob'] = None
|
self.blob: typing.Optional['AbstractBlob'] = None
|
||||||
|
|
||||||
|
@ -31,6 +33,12 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
|
||||||
self.closed = asyncio.Event(loop=self.loop)
|
self.closed = asyncio.Event(loop=self.loop)
|
||||||
|
|
||||||
def data_received(self, data: bytes):
|
def data_received(self, data: bytes):
|
||||||
|
if self.connection_manager:
|
||||||
|
if not self.peer_address:
|
||||||
|
addr_info = self.transport.get_extra_info('peername')
|
||||||
|
self.peer_address, self.peer_port = addr_info
|
||||||
|
# assert self.peer_address is not None
|
||||||
|
self.connection_manager.received_data(f"{self.peer_address}:{self.peer_port}", len(data))
|
||||||
#log.debug("%s:%d -- got %s bytes -- %s bytes on buffer -- %s blob bytes received",
|
#log.debug("%s:%d -- got %s bytes -- %s bytes on buffer -- %s blob bytes received",
|
||||||
# self.peer_address, self.peer_port, len(data), len(self.buf), self._blob_bytes_received)
|
# self.peer_address, self.peer_port, len(data), len(self.buf), self._blob_bytes_received)
|
||||||
if not self.transport or self.transport.is_closing():
|
if not self.transport or self.transport.is_closing():
|
||||||
|
@ -94,10 +102,15 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
|
||||||
"""
|
"""
|
||||||
request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash)
|
request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash)
|
||||||
blob_hash = self.blob.blob_hash
|
blob_hash = self.blob.blob_hash
|
||||||
|
if not self.peer_address:
|
||||||
|
addr_info = self.transport.get_extra_info('peername')
|
||||||
|
self.peer_address, self.peer_port = addr_info
|
||||||
try:
|
try:
|
||||||
msg = request.serialize()
|
msg = request.serialize()
|
||||||
log.debug("send request to %s:%i -> %s", self.peer_address, self.peer_port, msg.decode())
|
log.debug("send request to %s:%i -> %s", self.peer_address, self.peer_port, msg.decode())
|
||||||
self.transport.write(msg)
|
self.transport.write(msg)
|
||||||
|
if self.connection_manager:
|
||||||
|
self.connection_manager.sent_data(f"{self.peer_address}:{self.peer_port}", len(msg))
|
||||||
response: BlobResponse = await asyncio.wait_for(self._response_fut, self.peer_timeout, loop=self.loop)
|
response: BlobResponse = await asyncio.wait_for(self._response_fut, self.peer_timeout, loop=self.loop)
|
||||||
availability_response = response.get_availability_response()
|
availability_response = response.get_availability_response()
|
||||||
price_response = response.get_price_response()
|
price_response = response.get_price_response()
|
||||||
|
@ -186,11 +199,16 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
|
||||||
self.writer = None
|
self.writer = None
|
||||||
|
|
||||||
def connection_made(self, transport: asyncio.Transport):
|
def connection_made(self, transport: asyncio.Transport):
|
||||||
|
addr = transport.get_extra_info('peername')
|
||||||
|
self.peer_address, self.peer_port = addr[0], addr[1]
|
||||||
self.transport = transport
|
self.transport = transport
|
||||||
self.peer_address, self.peer_port = self.transport.get_extra_info('peername')
|
if self.connection_manager:
|
||||||
|
self.connection_manager.connection_made(f"{self.peer_address}:{self.peer_port}")
|
||||||
log.debug("connection made to %s:%i", self.peer_address, self.peer_port)
|
log.debug("connection made to %s:%i", self.peer_address, self.peer_port)
|
||||||
|
|
||||||
def connection_lost(self, reason):
|
def connection_lost(self, reason):
|
||||||
|
if self.connection_manager:
|
||||||
|
self.connection_manager.outgoing_connection_lost(f"{self.peer_address}:{self.peer_port}")
|
||||||
log.debug("connection lost to %s:%i (reason: %s, %s)", self.peer_address, self.peer_port, str(reason),
|
log.debug("connection lost to %s:%i (reason: %s, %s)", self.peer_address, self.peer_port, str(reason),
|
||||||
str(type(reason)))
|
str(type(reason)))
|
||||||
self.close()
|
self.close()
|
||||||
|
@ -199,16 +217,19 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
|
||||||
@cache_concurrent
|
@cache_concurrent
|
||||||
async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', address: str, tcp_port: int,
|
async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', address: str, tcp_port: int,
|
||||||
peer_connect_timeout: float, blob_download_timeout: float,
|
peer_connect_timeout: float, blob_download_timeout: float,
|
||||||
connected_transport: asyncio.Transport = None, connection_id: int = 0)\
|
connected_transport: asyncio.Transport = None, connection_id: int = 0,
|
||||||
|
connection_manager: typing.Optional['ConnectionManager'] = None)\
|
||||||
-> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
|
-> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
|
||||||
"""
|
"""
|
||||||
Returns [<downloaded blob>, <keep connection>]
|
Returns [<downloaded blob>, <keep connection>]
|
||||||
"""
|
"""
|
||||||
|
|
||||||
protocol = BlobExchangeClientProtocol(loop, blob_download_timeout)
|
protocol = BlobExchangeClientProtocol(
|
||||||
|
loop, blob_download_timeout, connection_manager
|
||||||
|
)
|
||||||
if connected_transport and not connected_transport.is_closing():
|
if connected_transport and not connected_transport.is_closing():
|
||||||
connected_transport.set_protocol(protocol)
|
connected_transport.set_protocol(protocol)
|
||||||
protocol.connection_made(connected_transport)
|
protocol.transport = connected_transport
|
||||||
log.debug("reusing connection for %s:%d", address, tcp_port)
|
log.debug("reusing connection for %s:%d", address, tcp_port)
|
||||||
else:
|
else:
|
||||||
connected_transport = None
|
connected_transport = None
|
||||||
|
|
|
@ -41,7 +41,9 @@ class BlobDownloader:
|
||||||
start = self.loop.time()
|
start = self.loop.time()
|
||||||
bytes_received, transport = await request_blob(
|
bytes_received, transport = await request_blob(
|
||||||
self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
|
self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
|
||||||
self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id
|
self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id,
|
||||||
|
connection_manager=self.blob_manager.connection_manager
|
||||||
|
|
||||||
)
|
)
|
||||||
if not transport and peer not in self.ignored:
|
if not transport and peer not in self.ignored:
|
||||||
self.ignored[peer] = self.loop.time()
|
self.ignored[peer] = self.loop.time()
|
||||||
|
|
|
@ -22,15 +22,23 @@ class BlobServerProtocol(asyncio.Protocol):
|
||||||
self.buf = b''
|
self.buf = b''
|
||||||
self.transport = None
|
self.transport = None
|
||||||
self.lbrycrd_address = lbrycrd_address
|
self.lbrycrd_address = lbrycrd_address
|
||||||
|
self.peer_address_and_port: typing.Optional[str] = None
|
||||||
|
|
||||||
def connection_made(self, transport):
|
def connection_made(self, transport):
|
||||||
self.transport = transport
|
self.transport = transport
|
||||||
|
self.peer_address_and_port = "%s:%i" % self.transport.get_extra_info('peername')
|
||||||
|
self.blob_manager.connection_manager.connection_received(self.peer_address_and_port)
|
||||||
|
|
||||||
|
def connection_lost(self, exc: typing.Optional[Exception]) -> None:
|
||||||
|
self.blob_manager.connection_manager.incoming_connection_lost(self.peer_address_and_port)
|
||||||
|
|
||||||
def send_response(self, responses: typing.List[blob_response_types]):
|
def send_response(self, responses: typing.List[blob_response_types]):
|
||||||
to_send = []
|
to_send = []
|
||||||
while responses:
|
while responses:
|
||||||
to_send.append(responses.pop())
|
to_send.append(responses.pop())
|
||||||
self.transport.write(BlobResponse(to_send).serialize())
|
serialized = BlobResponse(to_send).serialize()
|
||||||
|
self.transport.write(serialized)
|
||||||
|
self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, len(serialized))
|
||||||
|
|
||||||
async def handle_request(self, request: BlobRequest):
|
async def handle_request(self, request: BlobRequest):
|
||||||
addr = self.transport.get_extra_info('peername')
|
addr = self.transport.get_extra_info('peername')
|
||||||
|
@ -72,6 +80,7 @@ class BlobServerProtocol(asyncio.Protocol):
|
||||||
def data_received(self, data):
|
def data_received(self, data):
|
||||||
request = None
|
request = None
|
||||||
if data:
|
if data:
|
||||||
|
self.blob_manager.connection_manager.received_data(self.peer_address_and_port, len(data))
|
||||||
message, separator, remainder = data.rpartition(b'}')
|
message, separator, remainder = data.rpartition(b'}')
|
||||||
if not separator:
|
if not separator:
|
||||||
self.buf += data
|
self.buf += data
|
||||||
|
@ -97,7 +106,7 @@ class BlobServer:
|
||||||
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str):
|
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.server_task: asyncio.Task = None
|
self.server_task: typing.Optional[asyncio.Task] = None
|
||||||
self.started_listening = asyncio.Event(loop=self.loop)
|
self.started_listening = asyncio.Event(loop=self.loop)
|
||||||
self.lbrycrd_address = lbrycrd_address
|
self.lbrycrd_address = lbrycrd_address
|
||||||
self.server_protocol_class = BlobServerProtocol
|
self.server_protocol_class = BlobServerProtocol
|
||||||
|
|
90
lbrynet/connection_manager.py
Normal file
90
lbrynet/connection_manager.py
Normal file
|
@ -0,0 +1,90 @@
|
||||||
|
import asyncio
|
||||||
|
import typing
|
||||||
|
import collections
|
||||||
|
import logging
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
CONNECTED_EVENT = "connected"
|
||||||
|
DISCONNECTED_EVENT = "disconnected"
|
||||||
|
TRANSFERRED_EVENT = "transferred"
|
||||||
|
|
||||||
|
|
||||||
|
class ConnectionManager:
|
||||||
|
def __init__(self, loop: asyncio.AbstractEventLoop):
|
||||||
|
self.loop = loop
|
||||||
|
self.incoming_connected: typing.Set[str] = set()
|
||||||
|
self.incoming: typing.DefaultDict[str, int] = collections.defaultdict(int)
|
||||||
|
self.outgoing_connected: typing.Set[str] = set()
|
||||||
|
self.outgoing: typing.DefaultDict[str, int] = collections.defaultdict(int)
|
||||||
|
self._status = {}
|
||||||
|
|
||||||
|
self._task: typing.Optional[asyncio.Task] = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def status(self):
|
||||||
|
return self._status
|
||||||
|
|
||||||
|
def sent_data(self, host_and_port: str, size: int):
|
||||||
|
self.outgoing[host_and_port] += size
|
||||||
|
|
||||||
|
def received_data(self, host_and_port: str, size: int):
|
||||||
|
self.incoming[host_and_port] += size
|
||||||
|
|
||||||
|
def connection_made(self, host_and_port: str):
|
||||||
|
self.outgoing_connected.add(host_and_port)
|
||||||
|
|
||||||
|
def connection_received(self, host_and_port: str):
|
||||||
|
# self.incoming_connected.add(host_and_port)
|
||||||
|
pass
|
||||||
|
|
||||||
|
def outgoing_connection_lost(self, host_and_port: str):
|
||||||
|
if host_and_port in self.outgoing_connected:
|
||||||
|
self.outgoing_connected.remove(host_and_port)
|
||||||
|
|
||||||
|
def incoming_connection_lost(self, host_and_port: str):
|
||||||
|
if host_and_port in self.incoming_connected:
|
||||||
|
self.incoming_connected.remove(host_and_port)
|
||||||
|
|
||||||
|
async def _update(self):
|
||||||
|
|
||||||
|
self._status = {
|
||||||
|
'incoming_bps': {},
|
||||||
|
'outgoing_bps': {},
|
||||||
|
'total_incoming_mbs': 0.0,
|
||||||
|
'total_outgoing_mbs': 0.0,
|
||||||
|
'time': self.loop.time()
|
||||||
|
}
|
||||||
|
|
||||||
|
while True:
|
||||||
|
last = self.loop.time()
|
||||||
|
await asyncio.sleep(1, loop=self.loop)
|
||||||
|
self._status['incoming_bps'].clear()
|
||||||
|
self._status['outgoing_bps'].clear()
|
||||||
|
while self.outgoing:
|
||||||
|
k, v = self.outgoing.popitem()
|
||||||
|
self._status['outgoing_bps'][k] = v
|
||||||
|
while self.incoming:
|
||||||
|
k, v = self.incoming.popitem()
|
||||||
|
self._status['incoming_bps'][k] = v
|
||||||
|
now = self.loop.time()
|
||||||
|
self._status['total_outgoing_mbs'] = int(sum(list(self._status['outgoing_bps'].values())
|
||||||
|
) / (now - last)) / 1000000.0
|
||||||
|
self._status['total_incoming_mbs'] = int(sum(list(self._status['incoming_bps'].values())
|
||||||
|
) / (now - last)) / 1000000.0
|
||||||
|
self._status['time'] = now
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
if self._task:
|
||||||
|
self._task.cancel()
|
||||||
|
self._task = None
|
||||||
|
self.outgoing.clear()
|
||||||
|
self.outgoing_connected.clear()
|
||||||
|
self.incoming.clear()
|
||||||
|
self.incoming_connected.clear()
|
||||||
|
self._status.clear()
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self.stop()
|
||||||
|
self._task = self.loop.create_task(self._update())
|
|
@ -278,7 +278,7 @@ class BlobComponent(Component):
|
||||||
|
|
||||||
def __init__(self, component_manager):
|
def __init__(self, component_manager):
|
||||||
super().__init__(component_manager)
|
super().__init__(component_manager)
|
||||||
self.blob_manager: BlobManager = None
|
self.blob_manager: typing.Optional[BlobManager] = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def component(self) -> typing.Optional[BlobManager]:
|
def component(self) -> typing.Optional[BlobManager]:
|
||||||
|
@ -294,7 +294,7 @@ class BlobComponent(Component):
|
||||||
blob_dir = os.path.join(self.conf.data_dir, 'blobfiles')
|
blob_dir = os.path.join(self.conf.data_dir, 'blobfiles')
|
||||||
if not os.path.isdir(blob_dir):
|
if not os.path.isdir(blob_dir):
|
||||||
os.mkdir(blob_dir)
|
os.mkdir(blob_dir)
|
||||||
self.blob_manager = BlobManager(asyncio.get_event_loop(), blob_dir, storage, self.conf, data_store)
|
self.blob_manager = BlobManager(self.component_manager.loop, blob_dir, storage, self.conf, data_store)
|
||||||
return await self.blob_manager.setup()
|
return await self.blob_manager.setup()
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
|
@ -304,7 +304,10 @@ class BlobComponent(Component):
|
||||||
count = 0
|
count = 0
|
||||||
if self.blob_manager:
|
if self.blob_manager:
|
||||||
count = len(self.blob_manager.completed_blob_hashes)
|
count = len(self.blob_manager.completed_blob_hashes)
|
||||||
return {'finished_blobs': count}
|
return {
|
||||||
|
'finished_blobs': count,
|
||||||
|
'connections': self.blob_manager.connection_manager.status
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class DHTComponent(Component):
|
class DHTComponent(Component):
|
||||||
|
@ -405,7 +408,7 @@ class StreamManagerComponent(Component):
|
||||||
|
|
||||||
def __init__(self, component_manager):
|
def __init__(self, component_manager):
|
||||||
super().__init__(component_manager)
|
super().__init__(component_manager)
|
||||||
self.stream_manager: StreamManager = None
|
self.stream_manager: typing.Optional[StreamManager] = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def component(self) -> typing.Optional[StreamManager]:
|
def component(self) -> typing.Optional[StreamManager]:
|
||||||
|
@ -415,7 +418,7 @@ class StreamManagerComponent(Component):
|
||||||
if not self.stream_manager:
|
if not self.stream_manager:
|
||||||
return
|
return
|
||||||
return {
|
return {
|
||||||
'managed_files': len(self.stream_manager.streams)
|
'managed_files': len(self.stream_manager.streams),
|
||||||
}
|
}
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
|
@ -444,7 +447,7 @@ class PeerProtocolServerComponent(Component):
|
||||||
|
|
||||||
def __init__(self, component_manager):
|
def __init__(self, component_manager):
|
||||||
super().__init__(component_manager)
|
super().__init__(component_manager)
|
||||||
self.blob_server: BlobServer = None
|
self.blob_server: typing.Optional[BlobServer] = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def component(self) -> typing.Optional[BlobServer]:
|
def component(self) -> typing.Optional[BlobServer]:
|
||||||
|
|
|
@ -746,6 +746,17 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
},
|
},
|
||||||
'blob_manager': {
|
'blob_manager': {
|
||||||
'finished_blobs': (int) number of finished blobs in the blob manager,
|
'finished_blobs': (int) number of finished blobs in the blob manager,
|
||||||
|
'connections': {
|
||||||
|
'incoming_bps': {
|
||||||
|
<source ip and tcp port>: (int) bytes per second received,
|
||||||
|
},
|
||||||
|
'outgoing_bps': {
|
||||||
|
<destination ip and tcp port>: (int) bytes per second sent,
|
||||||
|
},
|
||||||
|
'total_outgoing_mps': (float) megabytes per second sent,
|
||||||
|
'total_incoming_mps': (float) megabytes per second received,
|
||||||
|
'time': (float) timestamp
|
||||||
|
}
|
||||||
},
|
},
|
||||||
'hash_announcer': {
|
'hash_announcer': {
|
||||||
'announce_queue_size': (int) number of blobs currently queued to be announced
|
'announce_queue_size': (int) number of blobs currently queued to be announced
|
||||||
|
|
Loading…
Reference in a new issue