download from DHT
This commit is contained in:
parent
abbd8473bb
commit
8d6a7101f6
4 changed files with 22 additions and 3 deletions
|
@ -1,5 +1,6 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import typing
|
import typing
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
from lbry.dht import constants
|
from lbry.dht import constants
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
|
@ -15,6 +16,7 @@ class DictDataStore:
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self._peer_manager = peer_manager
|
self._peer_manager = peer_manager
|
||||||
self.completed_blobs: typing.Set[str] = set()
|
self.completed_blobs: typing.Set[str] = set()
|
||||||
|
self.requested_blobs: typing.Deque = deque(maxlen=10)
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
return self._data_store.__len__()
|
return self._data_store.__len__()
|
||||||
|
|
|
@ -95,6 +95,7 @@ class KademliaRPC:
|
||||||
for peer in self.protocol.data_store.get_peers_for_blob(key)
|
for peer in self.protocol.data_store.get_peers_for_blob(key)
|
||||||
if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp()
|
if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp()
|
||||||
]
|
]
|
||||||
|
self.protocol.data_store.requested_blobs.append(key.hex())
|
||||||
# if we don't have k storing peers to return and we have this hash locally, include our contact information
|
# if we don't have k storing peers to return and we have this hash locally, include our contact information
|
||||||
if len(peers) < constants.K and key.hex() in self.protocol.data_store.completed_blobs:
|
if len(peers) < constants.K and key.hex() in self.protocol.data_store.completed_blobs:
|
||||||
peers.append(self.compact_address())
|
peers.append(self.compact_address())
|
||||||
|
|
|
@ -4,6 +4,8 @@ import asyncio
|
||||||
import logging
|
import logging
|
||||||
import binascii
|
import binascii
|
||||||
import typing
|
import typing
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
import base58
|
import base58
|
||||||
|
|
||||||
from aioupnp import __version__ as aioupnp_version
|
from aioupnp import __version__ as aioupnp_version
|
||||||
|
@ -385,21 +387,26 @@ class BackgroundDownloader(Component):
|
||||||
|
|
||||||
def __init__(self, component_manager):
|
def __init__(self, component_manager):
|
||||||
super().__init__(component_manager)
|
super().__init__(component_manager)
|
||||||
self.status = {'pending': 0, 'ongoing': 0}
|
|
||||||
self.task: typing.Optional[asyncio.Task] = None
|
self.task: typing.Optional[asyncio.Task] = None
|
||||||
self.download_loop_delay_seconds = 60
|
self.download_loop_delay_seconds = 60
|
||||||
self.finished_iteration = asyncio.Event()
|
self.finished_iteration = asyncio.Event()
|
||||||
|
self.requested_blobs = deque(maxlen=10)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def component(self) -> 'BackgroundDownloader':
|
def component(self) -> 'BackgroundDownloader':
|
||||||
return self
|
return self
|
||||||
|
|
||||||
async def get_status(self):
|
async def get_status(self):
|
||||||
self.status['running'] = self.task is not None and not self.task.done()
|
return {'running': self.task is not None and not self.task.done(), 'enqueued': len(self.requested_blobs)}
|
||||||
return self.status
|
|
||||||
|
|
||||||
async def loop(self):
|
async def loop(self):
|
||||||
while True:
|
while True:
|
||||||
|
if self.component_manager.has_component(DHT_COMPONENT):
|
||||||
|
node = self.component_manager.get_component(DHT_COMPONENT)
|
||||||
|
self.requested_blobs = node.protocol.data_store.requested_blobs
|
||||||
|
if self.requested_blobs:
|
||||||
|
blob_hash = self.requested_blobs.pop()
|
||||||
|
await self.download_blobs(blob_hash)
|
||||||
self.finished_iteration.set()
|
self.finished_iteration.set()
|
||||||
self.finished_iteration.clear()
|
self.finished_iteration.clear()
|
||||||
await asyncio.sleep(self.download_loop_delay_seconds)
|
await asyncio.sleep(self.download_loop_delay_seconds)
|
||||||
|
|
|
@ -618,3 +618,12 @@ class TestProactiveDownloaderComponent(CommandTestCase):
|
||||||
await self.clear()
|
await self.clear()
|
||||||
await proactive_downloader.download_blobs(blobs[0].blob_hash)
|
await proactive_downloader.download_blobs(blobs[0].blob_hash)
|
||||||
self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes)
|
self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes)
|
||||||
|
|
||||||
|
# trigger from requested blobs
|
||||||
|
await self.clear()
|
||||||
|
await proactive_downloader.stop()
|
||||||
|
proactive_downloader.requested_blobs.append(content1)
|
||||||
|
finished = proactive_downloader.finished_iteration.wait()
|
||||||
|
await proactive_downloader.start()
|
||||||
|
await finished
|
||||||
|
await self.assertBlobs(content1)
|
||||||
|
|
Loading…
Add table
Reference in a new issue