# lbry-sdk/lbrynet/dht/protocol/data_store.py
# (source-viewer header: 71 lines, 2.6 KiB, Python; blame date 2019-01-22 18:49:43 +01:00)

import asyncio
import typing

from lbrynet.dht import constants

if typing.TYPE_CHECKING:
    from lbrynet.dht.peer import KademliaPeer, PeerManager


class DictDataStore:
    """
    In-memory store mapping each key to the peers that were added for it.

    Every entry is a ``(peer, timestamp)`` tuple, where the timestamp is the
    value of ``loop.time()`` when the peer was added or last refreshed. An
    entry is treated as expired once ``timestamp + constants.data_expiration``
    is no longer ahead of the loop clock.
    """

    def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager'):
        """
        :param loop: event loop; this class only reads its ``time()`` clock
        :param peer_manager: consulted through ``peer_is_good(peer)`` to weed out bad peers
        """
        # Dictionary format:
        # { <key>: [(<contact>, <timestamp from loop.time()>), ...] }
        self._data_store: typing.Dict[bytes, typing.List[typing.Tuple['KademliaPeer', float]]] = {}
        self.loop = loop
        self._peer_manager = peer_manager
        # NOTE(review): no method of this class reads or updates this set;
        # presumably it is maintained by callers - confirm.
        self.completed_blobs: typing.Set[str] = set()

    def removed_expired_peers(self):
        """
        Purge entries that have expired or whose peer is known to be bad.

        Keys left without any entries are deleted from the store.
        """
        now = self.loop.time()
        # Iterate over a snapshot so keys can be deleted while looping.
        for key, entries in list(self._data_store.items()):
            # One filtering pass instead of repeated list.remove() calls.
            # peer_is_good() is only consulted for entries that have not
            # expired, and only an explicit False verdict drops the peer.
            entries[:] = [
                (peer, ts) for (peer, ts) in entries
                if ts + constants.data_expiration >= now
                and self._peer_manager.peer_is_good(peer) is not False
            ]
            if not entries:
                del self._data_store[key]

    def filter_bad_and_expired_peers(self, key: bytes) -> typing.Iterator['KademliaPeer']:
        """
        Returns only non-expired and unknown/good peers
        """
        for peer in self.filter_expired_peers(key):
            # Anything other than an explicit False counts as acceptable.
            if self._peer_manager.peer_is_good(peer) is not False:
                yield peer

    def filter_expired_peers(self, key: bytes) -> typing.Iterator['KademliaPeer']:
        """
        Returns only non-expired peers
        """
        now = self.loop.time()
        for peer, ts in self._data_store.get(key, []):
            if ts + constants.data_expiration > now:
                yield peer

    def has_peers_for_blob(self, key: bytes) -> bool:
        """
        Return True if the key is present in the store.

        This is a plain membership test: it does not check whether the stored
        entries are expired or whether their peers are good.
        """
        return key in self._data_store

    def add_peer_to_blob(self, contact: 'KademliaPeer', key: bytes) -> None:
        """
        Record ``contact`` for ``key`` with the current loop time.

        If the contact is already stored for the key, its first matching entry
        is replaced in place with a refreshed timestamp; otherwise a new entry
        is appended.
        """
        now = self.loop.time()
        entries = self._data_store.setdefault(key, [])
        for index, (peer, _) in enumerate(entries):
            if peer == contact:
                entries[index] = (contact, now)
                return
        entries.append((contact, now))

    def get_peers_for_blob(self, key: bytes) -> typing.List['KademliaPeer']:
        """
        Return the non-expired peers stored for ``key`` that are not known to be bad.
        """
        return list(self.filter_bad_and_expired_peers(key))

    def get_storing_contacts(self) -> typing.List['KademliaPeer']:
        """
        Return each distinct peer that has at least one stored entry.

        Expiry is not checked here, and the order of the result is unspecified
        because the peers are de-duplicated through a set.
        """
        return list({peer for stored in self._data_store.values() for peer, _ in stored})