rename BlobFileManager -> BlobManager

This commit is contained in:
Jack Robison 2019-03-28 14:51:55 -04:00
parent 7d2428fd17
commit b230d8fbcb
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
20 changed files with 56 additions and 53 deletions

View file

@ -2,18 +2,18 @@ import os
import typing
import asyncio
import logging
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.blob_file import BlobFile, is_valid_blobhash
from lbrynet.stream.descriptor import StreamDescriptor
if typing.TYPE_CHECKING:
from lbrynet.dht.protocol.data_store import DictDataStore
from lbrynet.extras.daemon.storage import SQLiteStorage
log = logging.getLogger(__name__)
class BlobFileManager:
def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, storage: SQLiteStorage,
class BlobManager:
def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, storage: 'SQLiteStorage',
node_data_store: typing.Optional['DictDataStore'] = None):
"""
This class stores blobs on the hard disk

View file

@ -7,7 +7,7 @@ if typing.TYPE_CHECKING:
from lbrynet.conf import Config
from lbrynet.dht.node import Node
from lbrynet.dht.peer import KademliaPeer
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.blob.blob_file import BlobFile
log = logging.getLogger(__name__)
@ -15,7 +15,8 @@ log = logging.getLogger(__name__)
class BlobDownloader:
BAN_TIME = 10.0 # fixme: when connection manager gets implemented, move it out from here
def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager',
def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
peer_queue: asyncio.Queue):
self.loop = loop
self.config = config
@ -130,7 +131,7 @@ class BlobDownloader:
transport.close()
async def download_blob(loop, config: 'Config', blob_manager: 'BlobFileManager', node: 'Node',
async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', node: 'Node',
blob_hash: str) -> 'BlobFile':
search_queue = asyncio.Queue(loop=loop, maxsize=config.max_connections_per_download)
search_queue.put_nowait(blob_hash)

View file

@ -8,13 +8,13 @@ from lbrynet.blob_exchange.serialization import BlobAvailabilityResponse, BlobPr
BlobPaymentAddressResponse
if typing.TYPE_CHECKING:
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
log = logging.getLogger(__name__)
class BlobServerProtocol(asyncio.Protocol):
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', lbrycrd_address: str):
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str):
self.loop = loop
self.blob_manager = blob_manager
self.server_task: asyncio.Task = None
@ -94,7 +94,7 @@ class BlobServerProtocol(asyncio.Protocol):
class BlobServer:
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', lbrycrd_address: str):
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str):
self.loop = loop
self.blob_manager = blob_manager
self.server_task: asyncio.Task = None

View file

@ -16,7 +16,7 @@ from lbrynet import utils
from lbrynet.conf import HEADERS_FILE_SHA256_CHECKSUM
from lbrynet.dht.node import Node
from lbrynet.dht.blob_announcer import BlobAnnouncer
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.blob_exchange.server import BlobServer
from lbrynet.stream.stream_manager import StreamManager
from lbrynet.extras.daemon.Component import Component
@ -278,10 +278,10 @@ class BlobComponent(Component):
def __init__(self, component_manager):
super().__init__(component_manager)
self.blob_manager: BlobFileManager = None
self.blob_manager: BlobManager = None
@property
def component(self) -> typing.Optional[BlobFileManager]:
def component(self) -> typing.Optional[BlobManager]:
return self.blob_manager
async def start(self):
@ -291,8 +291,10 @@ class BlobComponent(Component):
dht_node: Node = self.component_manager.get_component(DHT_COMPONENT)
if dht_node:
data_store = dht_node.protocol.data_store
self.blob_manager = BlobFileManager(asyncio.get_event_loop(), os.path.join(self.conf.data_dir, "blobfiles"),
storage, data_store)
blob_dir = os.path.join(self.conf.data_dir, 'blobfiles')
if not os.path.isdir(blob_dir):
os.mkdir(blob_dir)
self.blob_manager = BlobManager(asyncio.get_event_loop(), blob_dir, storage, data_store)
return await self.blob_manager.setup()
async def stop(self):
@ -451,7 +453,7 @@ class PeerProtocolServerComponent(Component):
async def start(self):
log.info("start blob server")
upnp = self.component_manager.get_component(UPNP_COMPONENT)
blob_manager: BlobFileManager = self.component_manager.get_component(BLOB_COMPONENT)
blob_manager: BlobManager = self.component_manager.get_component(BLOB_COMPONENT)
wallet: LbryWalletManager = self.component_manager.get_component(WALLET_COMPONENT)
peer_port = self.conf.tcp_port
address = await wallet.get_unused_address()

View file

@ -40,7 +40,7 @@ from lbrynet.extras.daemon.comment_client import jsonrpc_batch, jsonrpc_post, rp
if typing.TYPE_CHECKING:
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.dht.node import Node
from lbrynet.extras.daemon.Components import UPnPComponent
from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager
@ -296,7 +296,7 @@ class Daemon(metaclass=JSONRPCServerType):
return self.component_manager.get_component(EXCHANGE_RATE_MANAGER_COMPONENT)
@property
def blob_manager(self) -> typing.Optional['BlobFileManager']:
def blob_manager(self) -> typing.Optional['BlobManager']:
return self.component_manager.get_component(BLOB_COMPONENT)
@property

View file

@ -10,7 +10,7 @@ from lbrynet.dht.peer import KademliaPeer
if typing.TYPE_CHECKING:
from lbrynet.conf import Config
from lbrynet.dht.node import Node
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.blob.blob_file import BlobFile
log = logging.getLogger(__name__)

View file

@ -11,7 +11,7 @@ from lbrynet.stream.reflector.client import StreamReflectorClient
from lbrynet.extras.daemon.storage import StoredStreamClaim
if typing.TYPE_CHECKING:
from lbrynet.schema.claim import Claim
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.dht.node import Node
log = logging.getLogger(__name__)
@ -22,7 +22,7 @@ class ManagedStream:
STATUS_STOPPED = "stopped"
STATUS_FINISHED = "finished"
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', rowid: int,
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobManager', rowid: int,
descriptor: 'StreamDescriptor', download_directory: str, file_name: typing.Optional[str],
downloader: typing.Optional[StreamDownloader] = None,
status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None,

View file

@ -4,7 +4,7 @@ import logging
import typing
if typing.TYPE_CHECKING:
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.stream.descriptor import StreamDescriptor
REFLECTOR_V1 = 0
@ -14,7 +14,7 @@ log = logging.getLogger(__name__)
class StreamReflectorClient(asyncio.Protocol):
def __init__(self, blob_manager: 'BlobFileManager', descriptor: 'StreamDescriptor'):
def __init__(self, blob_manager: 'BlobManager', descriptor: 'StreamDescriptor'):
self.loop = asyncio.get_event_loop()
self.transport: asyncio.StreamWriter = None
self.blob_manager = blob_manager

View file

@ -7,7 +7,7 @@ from lbrynet.stream.descriptor import StreamDescriptor
if typing.TYPE_CHECKING:
from lbrynet.blob.blob_file import BlobFile
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.blob.writer import HashBlobWriter
@ -15,7 +15,7 @@ log = logging.getLogger(__name__)
class ReflectorServerProtocol(asyncio.Protocol):
def __init__(self, blob_manager: 'BlobFileManager'):
def __init__(self, blob_manager: 'BlobManager'):
self.loop = asyncio.get_event_loop()
self.blob_manager = blob_manager
self.server_task: asyncio.Task = None
@ -121,7 +121,7 @@ class ReflectorServerProtocol(asyncio.Protocol):
class ReflectorServer:
def __init__(self, blob_manager: 'BlobFileManager'):
def __init__(self, blob_manager: 'BlobManager'):
self.loop = asyncio.get_event_loop()
self.blob_manager = blob_manager
self.server_task: asyncio.Task = None

View file

@ -16,7 +16,7 @@ from lbrynet.schema.uri import parse_lbry_uri
from lbrynet.extras.daemon.storage import lbc_to_dewies
if typing.TYPE_CHECKING:
from lbrynet.conf import Config
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.dht.node import Node
from lbrynet.extras.daemon.analytics import AnalyticsManager
from lbrynet.extras.daemon.storage import SQLiteStorage, StoredStreamClaim

View file

@ -18,7 +18,7 @@ from lbrynet.extras.daemon.Components import (
)
from lbrynet.extras.daemon.ComponentManager import ComponentManager
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.stream.reflector.server import ReflectorServer
from lbrynet.blob_exchange.server import BlobServer
@ -109,7 +109,7 @@ class CommandTestCase(IntegrationTestCase):
self.addCleanup(shutil.rmtree, server_tmp_dir)
self.server_storage = SQLiteStorage(Config(), ':memory:')
await self.server_storage.open()
self.server_blob_manager = BlobFileManager(self.loop, server_tmp_dir, self.server_storage)
self.server_blob_manager = BlobManager(self.loop, server_tmp_dir, self.server_storage)
self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')
self.server.start_server(5567, '127.0.0.1')
await self.server.started_listening.wait()

View file

@ -5,7 +5,7 @@ import socket
import ipaddress
from lbrynet.conf import Config
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.blob_exchange.client import BlobExchangeClientProtocol, request_blob
import logging
@ -32,7 +32,7 @@ async def main(blob_hash: str, url: str):
host = host_info[0][4][0]
storage = SQLiteStorage(conf, os.path.join(conf.data_dir, "lbrynet.sqlite"))
blob_manager = BlobFileManager(loop, os.path.join(conf.data_dir, "blobfiles"), storage)
blob_manager = BlobManager(loop, os.path.join(conf.data_dir, "blobfiles"), storage)
await storage.open()
await blob_manager.setup()

View file

@ -1,7 +1,7 @@
import sys
import os
import asyncio
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.blob_exchange.server import BlobServer
from lbrynet.schema.address import decode_address
from lbrynet.extras.daemon.storage import SQLiteStorage
@ -17,7 +17,7 @@ async def main(address: str):
storage = SQLiteStorage(os.path.expanduser("~/.lbrynet/lbrynet.sqlite"))
await storage.open()
blob_manager = BlobFileManager(loop, os.path.expanduser("~/.lbrynet/blobfiles"), storage)
blob_manager = BlobManager(loop, os.path.expanduser("~/.lbrynet/blobfiles"), storage)
await blob_manager.setup()
server = await loop.create_server(

View file

@ -5,7 +5,7 @@ import os
from torba.testcase import AsyncioTestCase
from lbrynet.conf import Config
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
class TestBlobfile(AsyncioTestCase):
@ -18,7 +18,7 @@ class TestBlobfile(AsyncioTestCase):
self.addCleanup(lambda: shutil.rmtree(tmp_dir))
storage = SQLiteStorage(Config(), os.path.join(tmp_dir, "lbrynet.sqlite"))
blob_manager = BlobFileManager(loop, tmp_dir, storage)
blob_manager = BlobManager(loop, tmp_dir, storage)
await storage.open()
await blob_manager.setup()

View file

@ -5,7 +5,7 @@ import os
from torba.testcase import AsyncioTestCase
from lbrynet.conf import Config
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
class TestBlobManager(AsyncioTestCase):
@ -15,7 +15,7 @@ class TestBlobManager(AsyncioTestCase):
self.addCleanup(lambda: shutil.rmtree(tmp_dir))
storage = SQLiteStorage(Config(), os.path.join(tmp_dir, "lbrynet.sqlite"))
blob_manager = BlobFileManager(loop, tmp_dir, storage)
blob_manager = BlobManager(loop, tmp_dir, storage)
# add a blob file
blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed"

View file

@ -9,7 +9,7 @@ from lbrynet.blob_exchange.serialization import BlobRequest
from torba.testcase import AsyncioTestCase
from lbrynet.conf import Config
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.blob_exchange.server import BlobServer, BlobServerProtocol
from lbrynet.blob_exchange.client import BlobExchangeClientProtocol, request_blob
from lbrynet.dht.peer import KademliaPeer, PeerManager
@ -35,13 +35,13 @@ class BlobExchangeTestBase(AsyncioTestCase):
self.server_config = Config(data_dir=self.server_dir, download_dir=self.server_dir, wallet=self.server_dir,
reflector_servers=[])
self.server_storage = SQLiteStorage(self.server_config, os.path.join(self.server_dir, "lbrynet.sqlite"))
self.server_blob_manager = BlobFileManager(self.loop, self.server_dir, self.server_storage)
self.server_blob_manager = BlobManager(self.loop, self.server_dir, self.server_storage)
self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')
self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir, wallet=self.client_dir,
reflector_servers=[])
self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite"))
self.client_blob_manager = BlobFileManager(self.loop, self.client_dir, self.client_storage)
self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage)
self.client_peer_manager = PeerManager(self.loop)
self.server_from_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333)
@ -94,7 +94,7 @@ class TestBlobExchange(BlobExchangeTestBase):
self.addCleanup(shutil.rmtree, second_client_dir)
second_client_storage = SQLiteStorage(Config(), os.path.join(second_client_dir, "lbrynet.sqlite"))
second_client_blob_manager = BlobFileManager(self.loop, second_client_dir, second_client_storage)
second_client_blob_manager = BlobManager(self.loop, second_client_dir, second_client_storage)
server_from_second_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333)
await second_client_storage.open()
@ -126,7 +126,7 @@ class TestBlobExchange(BlobExchangeTestBase):
self.addCleanup(shutil.rmtree, second_client_dir)
second_client_storage = SQLiteStorage(Config(), os.path.join(second_client_dir, "lbrynet.sqlite"))
second_client_blob_manager = BlobFileManager(self.loop, second_client_dir, second_client_storage)
second_client_blob_manager = BlobManager(self.loop, second_client_dir, second_client_storage)
server_from_second_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333)
await second_client_storage.open()

View file

@ -7,7 +7,7 @@ from torba.testcase import AsyncioTestCase
from lbrynet.conf import Config
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.blob_info import BlobInfo
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.stream.descriptor import StreamDescriptor
from tests.test_utils import random_lbry_hash
@ -71,7 +71,7 @@ class StorageTest(AsyncioTestCase):
self.storage = SQLiteStorage(Config(), ':memory:')
self.blob_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.blob_dir)
self.blob_manager = BlobFileManager(asyncio.get_event_loop(), self.blob_dir, self.storage)
self.blob_manager = BlobManager(asyncio.get_event_loop(), self.blob_dir, self.storage)
await self.storage.open()
async def asyncTearDown(self):

View file

@ -7,7 +7,7 @@ from torba.testcase import AsyncioTestCase
from lbrynet.conf import Config
from lbrynet.blob.blob_file import MAX_BLOB_SIZE
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.stream.assembler import StreamAssembler
from lbrynet.stream.descriptor import StreamDescriptor
from lbrynet.stream.stream_manager import StreamManager
@ -24,7 +24,7 @@ class TestStreamAssembler(AsyncioTestCase):
self.addCleanup(lambda: shutil.rmtree(tmp_dir))
self.storage = SQLiteStorage(Config(), ":memory:")
await self.storage.open()
self.blob_manager = BlobFileManager(self.loop, tmp_dir, self.storage)
self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage)
download_dir = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(download_dir))
@ -51,7 +51,7 @@ class TestStreamAssembler(AsyncioTestCase):
await downloader_storage.open()
# add the blobs to the blob table (this would happen upon a blob download finishing)
downloader_blob_manager = BlobFileManager(self.loop, download_dir, downloader_storage)
downloader_blob_manager = BlobManager(self.loop, download_dir, downloader_storage)
descriptor = await downloader_blob_manager.get_stream_descriptor(sd_hash)
# assemble the decrypted file
@ -97,7 +97,7 @@ class TestStreamAssembler(AsyncioTestCase):
await storage.open()
tmp_dir = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(tmp_dir))
blob_manager = BlobFileManager(self.loop, tmp_dir, storage)
blob_manager = BlobManager(self.loop, tmp_dir, storage)
stream_manager = StreamManager(self.loop, Config(), blob_manager, None, storage, None)
# create the stream
download_dir = tempfile.mkdtemp()

View file

@ -5,7 +5,7 @@ import shutil
from torba.testcase import AsyncioTestCase
from lbrynet.conf import Config
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.stream.stream_manager import StreamManager
from lbrynet.stream.reflector.server import ReflectorServer
@ -20,14 +20,14 @@ class TestStreamAssembler(AsyncioTestCase):
self.addCleanup(lambda: shutil.rmtree(tmp_dir))
self.storage = SQLiteStorage(Config(), os.path.join(tmp_dir, "lbrynet.sqlite"))
await self.storage.open()
self.blob_manager = BlobFileManager(self.loop, tmp_dir, self.storage)
self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage)
self.stream_manager = StreamManager(self.loop, Config(), self.blob_manager, None, self.storage, None)
server_tmp_dir = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(server_tmp_dir))
self.server_storage = SQLiteStorage(Config(), os.path.join(server_tmp_dir, "lbrynet.sqlite"))
await self.server_storage.open()
self.server_blob_manager = BlobFileManager(self.loop, server_tmp_dir, self.server_storage)
self.server_blob_manager = BlobManager(self.loop, server_tmp_dir, self.server_storage)
download_dir = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(download_dir))

View file

@ -9,7 +9,7 @@ from torba.testcase import AsyncioTestCase
from lbrynet.conf import Config
from lbrynet.error import InvalidStreamDescriptorError
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob.blob_manager import BlobManager
from lbrynet.stream.descriptor import StreamDescriptor
@ -22,7 +22,7 @@ class TestStreamDescriptor(AsyncioTestCase):
self.addCleanup(lambda: shutil.rmtree(self.tmp_dir))
self.storage = SQLiteStorage(Config(), ":memory:")
await self.storage.open()
self.blob_manager = BlobFileManager(self.loop, self.tmp_dir, self.storage)
self.blob_manager = BlobManager(self.loop, self.tmp_dir, self.storage)
self.file_path = os.path.join(self.tmp_dir, "test_file")
with open(self.file_path, 'wb') as f:
@ -85,7 +85,7 @@ class TestRecoverOldStreamDescriptors(AsyncioTestCase):
self.addCleanup(lambda: shutil.rmtree(tmp_dir))
storage = SQLiteStorage(Config(), ":memory:")
await storage.open()
blob_manager = BlobFileManager(loop, tmp_dir, storage)
blob_manager = BlobManager(loop, tmp_dir, storage)
sd_bytes = b'{"stream_name": "4f62616d6120446f6e6b65792d322e73746c", "blobs": [{"length": 1153488, "blob_num' \
b'": 0, "blob_hash": "9fa32a249ce3f2d4e46b78599800f368b72f2a7f22b81df443c7f6bdbef496bd61b4c0079c7' \