forked from LBRYCommunity/lbry-sdk
changes from review
This commit is contained in:
parent
e54cc8850c
commit
629812337b
2 changed files with 18 additions and 18 deletions
|
@ -733,14 +733,14 @@ class TrackerAnnouncerComponent(Component):
|
||||||
|
|
||||||
async def announce_forever(self):
|
async def announce_forever(self):
|
||||||
while True:
|
while True:
|
||||||
to_sleep = 60.0
|
sleep_seconds = 60.0
|
||||||
to_announce = []
|
announce_sd_hashes = []
|
||||||
for file in self.file_manager.get_filtered():
|
for file in self.file_manager.get_filtered():
|
||||||
if not file.downloader:
|
if not file.downloader:
|
||||||
continue
|
continue
|
||||||
to_announce.append(bytes.fromhex(file.sd_hash))
|
announce_sd_hashes.append(bytes.fromhex(file.sd_hash))
|
||||||
await self.tracker_client.announce_many(*to_announce)
|
await self.tracker_client.announce_many(*announce_sd_hashes)
|
||||||
await asyncio.sleep(to_sleep)
|
await asyncio.sleep(sleep_seconds)
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
node = self.component_manager.get_component(DHT_COMPONENT) \
|
node = self.component_manager.get_component(DHT_COMPONENT) \
|
||||||
|
|
|
@ -31,7 +31,7 @@ ScrapeRequest = namedtuple("ScrapeRequest", ["connection_id", "action", "transac
|
||||||
ScrapeResponse = namedtuple("ScrapeResponse", ["action", "transaction_id", "items"])
|
ScrapeResponse = namedtuple("ScrapeResponse", ["action", "transaction_id", "items"])
|
||||||
ScrapeResponseItem = namedtuple("ScrapeResponseItem", ["seeders", "completed", "leechers"])
|
ScrapeResponseItem = namedtuple("ScrapeResponseItem", ["seeders", "completed", "leechers"])
|
||||||
ErrorResponse = namedtuple("ErrorResponse", ["action", "transaction_id", "message"])
|
ErrorResponse = namedtuple("ErrorResponse", ["action", "transaction_id", "message"])
|
||||||
STRUCTS = {
|
structs = {
|
||||||
ConnectRequest: struct.Struct(">QII"),
|
ConnectRequest: struct.Struct(">QII"),
|
||||||
ConnectResponse: struct.Struct(">IIQ"),
|
ConnectResponse: struct.Struct(">IIQ"),
|
||||||
AnnounceRequest: struct.Struct(">QII20s20sQQQIIIiH"),
|
AnnounceRequest: struct.Struct(">QII20s20sQQQIIIiH"),
|
||||||
|
@ -45,26 +45,26 @@ STRUCTS = {
|
||||||
|
|
||||||
|
|
||||||
def decode(cls, data, offset=0):
|
def decode(cls, data, offset=0):
|
||||||
decoder = STRUCTS[cls]
|
decoder = structs[cls]
|
||||||
if cls == AnnounceResponse:
|
if cls is AnnounceResponse:
|
||||||
return AnnounceResponse(*decoder.unpack_from(data, offset),
|
return AnnounceResponse(*decoder.unpack_from(data, offset),
|
||||||
peers=[decode(CompactIPv4Peer, data, index) for index in range(20, len(data), 6)])
|
peers=[decode(CompactIPv4Peer, data, index) for index in range(20, len(data), 6)])
|
||||||
elif cls == ScrapeResponse:
|
elif cls is ScrapeResponse:
|
||||||
return ScrapeResponse(*decoder.unpack_from(data, offset),
|
return ScrapeResponse(*decoder.unpack_from(data, offset),
|
||||||
items=[decode(ScrapeResponseItem, data, index) for index in range(8, len(data), 12)])
|
items=[decode(ScrapeResponseItem, data, index) for index in range(8, len(data), 12)])
|
||||||
elif cls == ErrorResponse:
|
elif cls is ErrorResponse:
|
||||||
return ErrorResponse(*decoder.unpack_from(data, offset), data[decoder.size:])
|
return ErrorResponse(*decoder.unpack_from(data, offset), data[decoder.size:])
|
||||||
return cls(*decoder.unpack_from(data, offset))
|
return cls(*decoder.unpack_from(data, offset))
|
||||||
|
|
||||||
|
|
||||||
def encode(obj):
|
def encode(obj):
|
||||||
if isinstance(obj, ScrapeRequest):
|
if isinstance(obj, ScrapeRequest):
|
||||||
return STRUCTS[ScrapeRequest].pack(*obj[:-1]) + b''.join(obj.infohashes)
|
return structs[ScrapeRequest].pack(*obj[:-1]) + b''.join(obj.infohashes)
|
||||||
elif isinstance(obj, ErrorResponse):
|
elif isinstance(obj, ErrorResponse):
|
||||||
return STRUCTS[ErrorResponse].pack(*obj[:-1]) + obj.message
|
return structs[ErrorResponse].pack(*obj[:-1]) + obj.message
|
||||||
elif isinstance(obj, AnnounceResponse):
|
elif isinstance(obj, AnnounceResponse):
|
||||||
return STRUCTS[AnnounceResponse].pack(*obj[:-1]) + b''.join([encode(peer) for peer in obj.peers])
|
return structs[AnnounceResponse].pack(*obj[:-1]) + b''.join([encode(peer) for peer in obj.peers])
|
||||||
return STRUCTS[type(obj)].pack(*obj)
|
return structs[type(obj)].pack(*obj)
|
||||||
|
|
||||||
|
|
||||||
def make_peer_id(random_part: Optional[str] = None) -> bytes:
|
def make_peer_id(random_part: Optional[str] = None) -> bytes:
|
||||||
|
@ -136,7 +136,7 @@ class UDPTrackerClientProtocol(asyncio.DatagramProtocol):
|
||||||
|
|
||||||
|
|
||||||
class TrackerClient:
|
class TrackerClient:
|
||||||
EVENT_CONTROLLER = StreamController()
|
event_controller = StreamController()
|
||||||
|
|
||||||
def __init__(self, node_id, announce_port, get_servers, timeout=10.0):
|
def __init__(self, node_id, announce_port, get_servers, timeout=10.0):
|
||||||
self.client = UDPTrackerClientProtocol(timeout=timeout)
|
self.client = UDPTrackerClientProtocol(timeout=timeout)
|
||||||
|
@ -150,7 +150,7 @@ class TrackerClient:
|
||||||
async def start(self):
|
async def start(self):
|
||||||
self.transport, _ = await asyncio.get_running_loop().create_datagram_endpoint(
|
self.transport, _ = await asyncio.get_running_loop().create_datagram_endpoint(
|
||||||
lambda: self.client, local_addr=("0.0.0.0", 0))
|
lambda: self.client, local_addr=("0.0.0.0", 0))
|
||||||
self.EVENT_CONTROLLER.stream.listen(
|
self.event_controller.stream.listen(
|
||||||
lambda request: self.on_hash(request[1], request[2]) if request[0] == 'search' else None)
|
lambda request: self.on_hash(request[1], request[2]) if request[0] == 'search' else None)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
@ -160,7 +160,7 @@ class TrackerClient:
|
||||||
self.transport.close()
|
self.transport.close()
|
||||||
self.client = None
|
self.client = None
|
||||||
self.transport = None
|
self.transport = None
|
||||||
self.EVENT_CONTROLLER.close()
|
self.event_controller.close()
|
||||||
|
|
||||||
def on_hash(self, info_hash, on_announcement=None):
|
def on_hash(self, info_hash, on_announcement=None):
|
||||||
if info_hash not in self.tasks:
|
if info_hash not in self.tasks:
|
||||||
|
@ -230,7 +230,7 @@ def enqueue_tracker_search(info_hash: bytes, peer_q: asyncio.Queue):
|
||||||
peers = await announcement_to_kademlia_peers(announcement)
|
peers = await announcement_to_kademlia_peers(announcement)
|
||||||
log.info("Found %d peers from tracker for %s", len(peers), info_hash.hex()[:8])
|
log.info("Found %d peers from tracker for %s", len(peers), info_hash.hex()[:8])
|
||||||
peer_q.put_nowait(peers)
|
peer_q.put_nowait(peers)
|
||||||
TrackerClient.EVENT_CONTROLLER.add(('search', info_hash, on_announcement))
|
TrackerClient.event_controller.add(('search', info_hash, on_announcement))
|
||||||
|
|
||||||
|
|
||||||
def announcement_to_kademlia_peers(*announcements: AnnounceResponse):
|
def announcement_to_kademlia_peers(*announcements: AnnounceResponse):
|
||||||
|
|
Loading…
Reference in a new issue