Compare commits


1 commit

Author:  Lex Berezhny
SHA1:    677d9053de
Message: claim_list resolve of blocked claims
Date:    2022-10-03 10:44:27 -04:00
59 changed files with 285 additions and 582 deletions

View file

@@ -1,18 +1,18 @@
 name: ci
-on: ["push", "pull_request", "workflow_dispatch"]
+on: ["push", "pull_request"]

 jobs:

   lint:
     name: lint
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
-      - uses: actions/setup-python@v4
+      - uses: actions/checkout@v2
+      - uses: actions/setup-python@v1
         with:
-          python-version: '3.9'
+          python-version: '3.7'
       - name: extract pip cache
-        uses: actions/cache@v3
+        uses: actions/cache@v2
         with:
           path: ~/.cache/pip
           key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
@@ -26,26 +26,26 @@ jobs:
     strategy:
       matrix:
         os:
-          - ubuntu-20.04
+          - ubuntu-latest
           - macos-latest
           - windows-latest
     runs-on: ${{ matrix.os }}
     steps:
-      - uses: actions/checkout@v3
-      - uses: actions/setup-python@v4
+      - uses: actions/checkout@v2
+      - uses: actions/setup-python@v1
         with:
-          python-version: '3.9'
+          python-version: '3.7'
       - name: set pip cache dir
-        shell: bash
-        run: echo "PIP_CACHE_DIR=$(pip cache dir)" >> $GITHUB_ENV
+        id: pip-cache
+        run: echo "::set-output name=dir::$(pip cache dir)"
       - name: extract pip cache
-        uses: actions/cache@v3
+        uses: actions/cache@v2
         with:
-          path: ${{ env.PIP_CACHE_DIR }}
+          path: ${{ steps.pip-cache.outputs.dir }}
           key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
           restore-keys: ${{ runner.os }}-pip-
      - id: os-name
-        uses: ASzc/change-string-case-action@v5
+        uses: ASzc/change-string-case-action@v1
        with:
          string: ${{ runner.os }}
      - run: python -m pip install --user --upgrade pip wheel
@@ -72,7 +72,7 @@ jobs:
   tests-integration:
     name: "tests / integration"
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-latest
     strategy:
       matrix:
         test:
@@ -93,16 +93,16 @@ jobs:
         uses: elastic/elastic-github-actions/elasticsearch@master
         with:
           stack-version: 7.12.1
-      - uses: actions/checkout@v3
-      - uses: actions/setup-python@v4
+      - uses: actions/checkout@v2
+      - uses: actions/setup-python@v1
        with:
-          python-version: '3.9'
+          python-version: '3.7'
      - if: matrix.test == 'other'
        run: |
          sudo apt-get update
          sudo apt-get install -y --no-install-recommends ffmpeg
      - name: extract pip cache
-        uses: actions/cache@v3
+        uses: actions/cache@v2
        with:
          path: ./.tox
          key: tox-integration-${{ matrix.test }}-${{ hashFiles('setup.py') }}
@@ -123,7 +123,7 @@ jobs:
   coverage:
     needs: ["tests-unit", "tests-integration"]
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-latest
     steps:
       - name: finalize coverage report submission
         env:
@@ -138,29 +138,29 @@
     strategy:
       matrix:
         os:
-          - ubuntu-20.04
+          - ubuntu-18.04
           - macos-latest
           - windows-latest
     runs-on: ${{ matrix.os }}
     steps:
-      - uses: actions/checkout@v3
-      - uses: actions/setup-python@v4
+      - uses: actions/checkout@v2
+      - uses: actions/setup-python@v1
        with:
-          python-version: '3.9'
+          python-version: '3.7'
      - id: os-name
-        uses: ASzc/change-string-case-action@v5
+        uses: ASzc/change-string-case-action@v1
        with:
          string: ${{ runner.os }}
      - name: set pip cache dir
-        shell: bash
-        run: echo "PIP_CACHE_DIR=$(pip cache dir)" >> $GITHUB_ENV
+        id: pip-cache
+        run: echo "::set-output name=dir::$(pip cache dir)"
      - name: extract pip cache
-        uses: actions/cache@v3
+        uses: actions/cache@v2
        with:
-          path: ${{ env.PIP_CACHE_DIR }}
+          path: ${{ steps.pip-cache.outputs.dir }}
          key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
          restore-keys: ${{ runner.os }}-pip-
-      - run: pip install pyinstaller==4.6
+      - run: pip install pyinstaller==4.4
      - run: pip install -e .
      - if: startsWith(github.ref, 'refs/tags/v')
        run: python docker/set_build.py
@@ -175,7 +175,7 @@ jobs:
           pip install pywin32==301
           pyinstaller --additional-hooks-dir=scripts/. --icon=icons/lbry256.ico --onefile --name lbrynet lbry/extras/cli.py
           dist/lbrynet.exe --version
-      - uses: actions/upload-artifact@v3
+      - uses: actions/upload-artifact@v2
        with:
          name: lbrynet-${{ steps.os-name.outputs.lowercase }}
          path: dist/
@@ -184,7 +184,7 @@
     name: "release"
     if: startsWith(github.ref, 'refs/tags/v')
     needs: ["build"]
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v1
       - uses: actions/download-artifact@v2

View file

@@ -7,7 +7,7 @@ on:
 jobs:
   release:
     name: "slack notification"
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-latest
     steps:
       - uses: LoveToKnow/slackify-markdown-action@v1.0.0
         id: markdown

View file

@@ -1,2 +1,2 @@
-__version__ = "0.113.0"
+__version__ = "0.110.0"
 version = tuple(map(int, __version__.split('.')))  # pylint: disable=invalid-name

View file

@@ -87,8 +87,8 @@ class AbstractBlob:
         self.blob_completed_callback = blob_completed_callback
         self.blob_directory = blob_directory
         self.writers: typing.Dict[typing.Tuple[typing.Optional[str], typing.Optional[int]], HashBlobWriter] = {}
-        self.verified: asyncio.Event = asyncio.Event()
-        self.writing: asyncio.Event = asyncio.Event()
+        self.verified: asyncio.Event = asyncio.Event(loop=self.loop)
+        self.writing: asyncio.Event = asyncio.Event(loop=self.loop)
         self.readers: typing.List[typing.BinaryIO] = []
         self.added_on = added_on or time.time()
         self.is_mine = is_mine
@@ -222,7 +222,7 @@ class AbstractBlob:
                          peer_port: typing.Optional[int] = None) -> HashBlobWriter:
         if (peer_address, peer_port) in self.writers and not self.writers[(peer_address, peer_port)].closed():
             raise OSError(f"attempted to download blob twice from {peer_address}:{peer_port}")
-        fut = asyncio.Future()
+        fut = asyncio.Future(loop=self.loop)
         writer = HashBlobWriter(self.blob_hash, self.get_length, fut)
         self.writers[(peer_address, peer_port)] = writer
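Note: the recurring change in this and most of the files below is the loop= argument to asyncio primitives. Passing an explicit event loop was deprecated in Python 3.8 and removed in 3.10; modern asyncio objects bind to the running loop on first use, which is why the left-hand column drops the argument. A minimal sketch of the loop-free style, assuming Python 3.10 or newer:

    import asyncio

    async def main():
        event = asyncio.Event()  # binds to the running loop on first use
        fut = asyncio.get_running_loop().create_future()
        asyncio.get_running_loop().call_later(0.1, fut.set_result, None)
        await fut                # resolved by the call_later callback
        event.set()
        await event.wait()       # returns immediately once set

    asyncio.run(main())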

View file

@@ -32,7 +32,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
         self.buf = b''

         # this is here to handle the race when the downloader is closed right as response_fut gets a result
-        self.closed = asyncio.Event()
+        self.closed = asyncio.Event(loop=self.loop)

     def data_received(self, data: bytes):
         if self.connection_manager:
@@ -111,7 +111,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
             self.transport.write(msg)
             if self.connection_manager:
                 self.connection_manager.sent_data(f"{self.peer_address}:{self.peer_port}", len(msg))
-            response: BlobResponse = await asyncio.wait_for(self._response_fut, self.peer_timeout)
+            response: BlobResponse = await asyncio.wait_for(self._response_fut, self.peer_timeout, loop=self.loop)
             availability_response = response.get_availability_response()
             price_response = response.get_price_response()
             blob_response = response.get_blob_response()
@@ -151,7 +151,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
                       f" timeout in {self.peer_timeout}"
                 log.debug(msg)
                 msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}"
-                await asyncio.wait_for(self.writer.finished, self.peer_timeout)
+                await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop)
                 # wait for the io to finish
                 await self.blob.verified.wait()
                 log.info("%s at %fMB/s", msg,
@@ -187,7 +187,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
         try:
             self._blob_bytes_received = 0
             self.blob, self.writer = blob, blob.get_blob_writer(self.peer_address, self.peer_port)
-            self._response_fut = asyncio.Future()
+            self._response_fut = asyncio.Future(loop=self.loop)
             return await self._download_blob()
         except OSError:
             # i'm not sure how to fix this race condition - jack
@@ -244,7 +244,7 @@ async def request_blob(loop: asyncio.AbstractEventLoop, blob: Optional['Abstract
     try:
         if not connected_protocol:
             await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port),
-                                   peer_connect_timeout)
+                                   peer_connect_timeout, loop=loop)
             connected_protocol = protocol
         if blob is None or blob.get_is_verified() or not blob.is_writeable():
             # blob is None happens when we are just opening a connection

View file

@@ -30,7 +30,7 @@ class BlobDownloader:
         self.failures: typing.Dict['KademliaPeer', int] = {}
         self.connection_failures: typing.Set['KademliaPeer'] = set()
         self.connections: typing.Dict['KademliaPeer', 'BlobExchangeClientProtocol'] = {}
-        self.is_running = asyncio.Event()
+        self.is_running = asyncio.Event(loop=self.loop)

     def should_race_continue(self, blob: 'AbstractBlob'):
         max_probes = self.config.max_connections_per_download * (1 if self.connections else 10)
@@ -64,8 +64,8 @@ class BlobDownloader:
             self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1

     async def new_peer_or_finished(self):
-        active_tasks = list(self.active_connections.values()) + [asyncio.create_task(asyncio.sleep(1))]
-        await asyncio.wait(active_tasks, return_when='FIRST_COMPLETED')
+        active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)]
+        await asyncio.wait(active_tasks, loop=self.loop, return_when='FIRST_COMPLETED')

     def cleanup_active(self):
         if not self.active_connections and not self.connections:
@@ -126,7 +126,7 @@ class BlobDownloader:
 async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', dht_node: 'Node',
                         blob_hash: str) -> 'AbstractBlob':
-    search_queue = asyncio.Queue(maxsize=config.max_connections_per_download)
+    search_queue = asyncio.Queue(loop=loop, maxsize=config.max_connections_per_download)
     search_queue.put_nowait(blob_hash)
     peer_queue, accumulate_task = dht_node.accumulate_peers(search_queue)
     fixed_peers = None if not config.fixed_peers else await get_kademlia_peers_from_hosts(config.fixed_peers)
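Note: the new_peer_or_finished hunk above also reflects a second asyncio change: asyncio.wait() only accepts Tasks and Futures, not bare coroutines (deprecated in 3.8, an error since 3.11), so the left-hand column wraps the sleep in asyncio.create_task(). A minimal sketch of the pattern, assuming Python 3.8 or newer:

    import asyncio

    async def main():
        # wrap coroutines in tasks before handing them to asyncio.wait()
        tasks = [asyncio.create_task(asyncio.sleep(d)) for d in (0.01, 1.0)]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in pending:  # cancel the slower sleeper
            task.cancel()

    asyncio.run(main())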

View file

@@ -25,19 +25,19 @@ class BlobServerProtocol(asyncio.Protocol):
         self.idle_timeout = idle_timeout
         self.transfer_timeout = transfer_timeout
         self.server_task: typing.Optional[asyncio.Task] = None
-        self.started_listening = asyncio.Event()
+        self.started_listening = asyncio.Event(loop=self.loop)
         self.buf = b''
         self.transport: typing.Optional[asyncio.Transport] = None
         self.lbrycrd_address = lbrycrd_address
         self.peer_address_and_port: typing.Optional[str] = None
-        self.started_transfer = asyncio.Event()
-        self.transfer_finished = asyncio.Event()
+        self.started_transfer = asyncio.Event(loop=self.loop)
+        self.transfer_finished = asyncio.Event(loop=self.loop)
         self.close_on_idle_task: typing.Optional[asyncio.Task] = None

     async def close_on_idle(self):
         while self.transport:
             try:
-                await asyncio.wait_for(self.started_transfer.wait(), self.idle_timeout)
+                await asyncio.wait_for(self.started_transfer.wait(), self.idle_timeout, loop=self.loop)
             except asyncio.TimeoutError:
                 log.debug("closing idle connection from %s", self.peer_address_and_port)
                 return self.close()
@@ -101,7 +101,7 @@ class BlobServerProtocol(asyncio.Protocol):
                 log.debug("send %s to %s:%i", blob_hash, peer_address, peer_port)
                 self.started_transfer.set()
                 try:
-                    sent = await asyncio.wait_for(blob.sendfile(self), self.transfer_timeout)
+                    sent = await asyncio.wait_for(blob.sendfile(self), self.transfer_timeout, loop=self.loop)
                     if sent and sent > 0:
                         self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent)
                         log.info("sent %s (%i bytes) to %s:%i", blob_hash, sent, peer_address, peer_port)
@@ -157,7 +157,7 @@ class BlobServer:
         self.loop = loop
         self.blob_manager = blob_manager
         self.server_task: typing.Optional[asyncio.Task] = None
-        self.started_listening = asyncio.Event()
+        self.started_listening = asyncio.Event(loop=self.loop)
         self.lbrycrd_address = lbrycrd_address
         self.idle_timeout = idle_timeout
         self.transfer_timeout = transfer_timeout

View file

@@ -688,9 +688,6 @@ class Config(CLIConfig):
     tracker_servers = Servers("BitTorrent-compatible (BEP15) UDP trackers for helping P2P discovery", [
         ('tracker.lbry.com', 9252),
         ('tracker.lbry.grin.io', 9252),
-        ('tracker.lbry.pigg.es', 9252),
-        ('tracker.lizard.technology', 9252),
-        ('s1.lbry.network', 9252),
     ])

     lbryum_servers = Servers("SPV wallet servers", [
@@ -703,20 +700,14 @@ class Config(CLIConfig):
         ('spv17.lbry.com', 50001),
         ('spv18.lbry.com', 50001),
         ('spv19.lbry.com', 50001),
-        ('hub.lbry.grin.io', 50001),
-        ('hub.lizard.technology', 50001),
-        ('s1.lbry.network', 50001),
     ])
     known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [
         ('dht.lbry.grin.io', 4444),  # Grin
         ('dht.lbry.madiator.com', 4444),  # Madiator
-        ('dht.lbry.pigg.es', 4444),  # Pigges
         ('lbrynet1.lbry.com', 4444),  # US EAST
         ('lbrynet2.lbry.com', 4444),  # US WEST
         ('lbrynet3.lbry.com', 4444),  # EU
-        ('lbrynet4.lbry.com', 4444),  # ASIA
-        ('dht.lizard.technology', 4444),  # Jack
-        ('s2.lbry.network', 4444),
+        ('lbrynet4.lbry.com', 4444)  # ASIA
     ])

     # blockchain

View file

@@ -67,7 +67,7 @@ class ConnectionManager:
         while True:
             last = time.perf_counter()
-            await asyncio.sleep(0.1)
+            await asyncio.sleep(0.1, loop=self.loop)
             self._status['incoming_bps'].clear()
             self._status['outgoing_bps'].clear()
             now = time.perf_counter()

View file

@@ -42,13 +42,15 @@ class BlobAnnouncer:
                 log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
             except Exception as err:
                 self.announcements_sent_metric.labels(peers=0, error=True).inc()
+                if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
+                    raise err
                 log.warning("error announcing %s: %s", blob_hash[:8], str(err))

     async def _announce(self, batch_size: typing.Optional[int] = 10):
         while batch_size:
             if not self.node.joined.is_set():
                 await self.node.joined.wait()
-            await asyncio.sleep(60)
+            await asyncio.sleep(60, loop=self.loop)
             if not self.node.protocol.routing_table.get_peers():
                 log.warning("No peers in DHT, announce round skipped")
                 continue
@@ -57,7 +59,7 @@ class BlobAnnouncer:
             log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
             while len(self.announce_queue) > 0:
                 log.info("%i blobs to announce", len(self.announce_queue))
-                await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)])
+                await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop)
             announced = list(filter(None, self.announced))
             if announced:
                 await self.storage.update_last_announced_blobs(announced)
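Note: the isinstance(err, asyncio.CancelledError) re-raise that the right-hand column adds is a Python 3.7 workaround: on 3.7, CancelledError subclasses Exception, so a broad except Exception would silently swallow task cancellation. From 3.8 on it subclasses BaseException and never reaches such handlers, which is why the left-hand column drops the check. A sketch of the pattern:

    import asyncio

    async def announce_round():
        try:
            await asyncio.sleep(60)  # stand-in for the real announce work
        except Exception as err:
            if isinstance(err, asyncio.CancelledError):  # only reachable on 3.7
                raise
            print(f"error announcing: {err}")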

View file

@@ -37,7 +37,7 @@ class Node:
         self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port, rpc_timeout,
                                          split_buckets_under_index, is_bootstrap_node)
         self.listening_port: asyncio.DatagramTransport = None
-        self.joined = asyncio.Event()
+        self.joined = asyncio.Event(loop=self.loop)
         self._join_task: asyncio.Task = None
         self._refresh_task: asyncio.Task = None
         self._storage = storage
@@ -79,7 +79,7 @@ class Node:
             else:
                 if force_once:
                     break
-                fut = asyncio.Future()
+                fut = asyncio.Future(loop=self.loop)
                 self.loop.call_later(constants.REFRESH_INTERVAL // 4, fut.set_result, None)
                 await fut
                 continue
@@ -93,7 +93,7 @@ class Node:
             if force_once:
                 break

-            fut = asyncio.Future()
+            fut = asyncio.Future(loop=self.loop)
             self.loop.call_later(constants.REFRESH_INTERVAL, fut.set_result, None)
             await fut
@@ -108,7 +108,7 @@ class Node:
             for peer in peers:
                 log.debug("store to %s %s %s", peer.address, peer.udp_port, peer.tcp_port)
             stored_to_tup = await asyncio.gather(
-                *(self.protocol.store_to_peer(hash_value, peer) for peer in peers)
+                *(self.protocol.store_to_peer(hash_value, peer) for peer in peers), loop=self.loop
             )
             stored_to = [node_id for node_id, contacted in stored_to_tup if contacted]
             if stored_to:
@@ -182,14 +182,14 @@ class Node:
                     for address, udp_port in known_node_urls or []
                 ]))
             except socket.gaierror:
-                await asyncio.sleep(30)
+                await asyncio.sleep(30, loop=self.loop)
                 continue

             self.protocol.peer_manager.reset()
             self.protocol.ping_queue.enqueue_maybe_ping(*seed_peers, delay=0.0)
             await self.peer_search(self.protocol.node_id, shortlist=seed_peers, count=32)

-            await asyncio.sleep(1)
+            await asyncio.sleep(1, loop=self.loop)

     def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None):
         self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
@@ -271,7 +271,7 @@ class Node:
     def accumulate_peers(self, search_queue: asyncio.Queue,
                          peer_queue: typing.Optional[asyncio.Queue] = None
                          ) -> typing.Tuple[asyncio.Queue, asyncio.Task]:
-        queue = peer_queue or asyncio.Queue()
+        queue = peer_queue or asyncio.Queue(loop=self.loop)
         return queue, self.loop.create_task(self._accumulate_peers_for_value(search_queue, queue))

View file

@@ -83,7 +83,7 @@ class IterativeFinder(AsyncIterator):
         self.contacted: typing.Set['KademliaPeer'] = set()
         self.distance = Distance(key)
-        self.iteration_queue = asyncio.Queue()
+        self.iteration_queue = asyncio.Queue(loop=self.loop)
         self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
         self.iteration_count = 0

View file

@@ -253,7 +253,7 @@ class PingQueue:
                     del self._pending_contacts[peer]
                     self.maybe_ping(peer)
                 break
-            await asyncio.sleep(1)
+            await asyncio.sleep(1, loop=self._loop)

     def start(self):
         assert not self._running
@@ -319,10 +319,10 @@ class KademliaProtocol(DatagramProtocol):
         self.ping_queue = PingQueue(self.loop, self)
         self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
         self.rpc_timeout = rpc_timeout
-        self._split_lock = asyncio.Lock()
+        self._split_lock = asyncio.Lock(loop=self.loop)
         self._to_remove: typing.Set['KademliaPeer'] = set()
         self._to_add: typing.Set['KademliaPeer'] = set()
-        self._wakeup_routing_task = asyncio.Event()
+        self._wakeup_routing_task = asyncio.Event(loop=self.loop)
         self.maintaing_routing_task: typing.Optional[asyncio.Task] = None

     @functools.lru_cache(128)
@@ -385,7 +385,7 @@ class KademliaProtocol(DatagramProtocol):
             while self._to_add:
                 async with self._split_lock:
                     await self._add_peer(self._to_add.pop())
-            await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1))
+            await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1, loop=self.loop), loop=self.loop)
             self._wakeup_routing_task.clear()

     def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):

View file

@@ -8,7 +8,6 @@ from prometheus_client import Gauge
 from lbry import utils
 from lbry.dht import constants
-from lbry.dht.error import RemoteException
 from lbry.dht.protocol.distance import Distance

 if typing.TYPE_CHECKING:
     from lbry.dht.peer import KademliaPeer, PeerManager
@@ -396,7 +395,7 @@ class TreeRoutingTable:
             try:
                 await probe(to_replace)
                 return False
-            except (asyncio.TimeoutError, RemoteException):
+            except asyncio.TimeoutError:
                 log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index,
                           to_replace.address, to_replace.udp_port, peer.address, peer.udp_port)
                 if to_replace in self.buckets[bucket_index]:

View file

@@ -37,7 +37,7 @@ class Component(metaclass=ComponentType):
     def running(self):
         return self._running

-    async def get_status(self):  # pylint: disable=no-self-use
+    async def get_status(self):
         return

     async def start(self):

View file

@@ -42,7 +42,7 @@ class ComponentManager:
         self.analytics_manager = analytics_manager
         self.component_classes = {}
         self.components = set()
-        self.started = asyncio.Event()
+        self.started = asyncio.Event(loop=self.loop)
         self.peer_manager = peer_manager or PeerManager(asyncio.get_event_loop_policy().get_event_loop())

         for component_name, component_class in self.default_component_classes.items():
@@ -118,7 +118,7 @@ class ComponentManager:
                 component._setup() for component in stage if not component.running
             ]
             if needing_start:
-                await asyncio.wait(map(asyncio.create_task, needing_start))
+                await asyncio.wait(needing_start)
         self.started.set()

     async def stop(self):
@@ -131,7 +131,7 @@ class ComponentManager:
                 component._stop() for component in stage if component.running
             ]
             if needing_stop:
-                await asyncio.wait(map(asyncio.create_task, needing_stop))
+                await asyncio.wait(needing_stop)

     def all_components_running(self, *component_names):
         """

View file

@@ -374,7 +374,7 @@ class FileManagerComponent(Component):
         log.info('Done setting up file manager')

     async def stop(self):
-        await self.file_manager.stop()
+        self.file_manager.stop()

 class BackgroundDownloaderComponent(Component):
@@ -551,7 +551,7 @@ class UPnPComponent(Component):
         while True:
             if now:
                 await self._maintain_redirects()
-            await asyncio.sleep(360)
+            await asyncio.sleep(360, loop=self.component_manager.loop)

     async def _maintain_redirects(self):
         # setup the gateway if necessary
@@ -560,6 +560,8 @@ class UPnPComponent(Component):
                 self.upnp = await UPnP.discover(loop=self.component_manager.loop)
                 log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
             except Exception as err:
+                if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
+                    raise
                 log.warning("upnp discovery failed: %s", err)
                 self.upnp = None
@@ -671,7 +673,7 @@ class UPnPComponent(Component):
             log.info("Removing upnp redirects: %s", self.upnp_redirects)
             await asyncio.wait([
                 self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items()
-            ])
+            ], loop=self.component_manager.loop)
         if self._maintain_redirects_task and not self._maintain_redirects_task.done():
             self._maintain_redirects_task.cancel()

View file

@@ -614,8 +614,7 @@ class Daemon(metaclass=JSONRPCServerType):
             content_type='application/json'
         )

-    @staticmethod
-    async def handle_metrics_get_request(request: web.Request):
+    async def handle_metrics_get_request(self, request: web.Request):
         try:
             return web.Response(
                 text=prom_generate_latest().decode(),
@@ -1331,7 +1330,7 @@ class Daemon(metaclass=JSONRPCServerType):
     @requires("wallet")
     async def jsonrpc_wallet_export(self, password=None, wallet_id=None):
         """
-        Exports encrypted wallet data if password is supplied; otherwise plain JSON.
+        Export wallet data

         Wallet must be unlocked to perform this operation.
@@ -1343,19 +1342,18 @@ class Daemon(metaclass=JSONRPCServerType):
             --wallet_id=<wallet_id> : (str) wallet being exported

         Returns:
-            (str) data: base64-encoded encrypted wallet, or cleartext JSON
+            (str) data
         """
+        assert password is not None, "passwordless use is not implemented yet"
         wallet = self.wallet_manager.get_wallet_or_default(wallet_id)
-        if password is None:
-            return wallet.to_json()
-        return wallet.pack(password).decode()
+        encrypted = wallet.pack(password)
+        return encrypted.decode()

     @requires("wallet")
     async def jsonrpc_wallet_import(self, data, password=None, wallet_id=None, blocking=False):
         """
-        Import wallet data and merge accounts and preferences. Data is expected to be JSON if
-        password is not supplied.
+        Import wallet data and merge accounts, preferences.

         Wallet must be unlocked to perform this operation.
@@ -1370,8 +1368,9 @@ class Daemon(metaclass=JSONRPCServerType):
             --blocking : (bool) wait until any new accounts have merged

         Returns:
-            (str) base64-encoded encrypted wallet, or cleartext JSON
+            (str) data
         """
+        assert not data.strip().startswith("{"), "unencrypted wallet import is not implemented yet"
         wallet = self.wallet_manager.get_wallet_or_default(wallet_id)
         added_accounts, merged_accounts = wallet.merge(self.wallet_manager, password, data)
         for new_account in itertools.chain(added_accounts, merged_accounts):
@@ -1385,7 +1384,8 @@ class Daemon(metaclass=JSONRPCServerType):
         for new_account in added_accounts:
             asyncio.create_task(self.ledger.subscribe_account(new_account))
         wallet.save()
-        return await self.jsonrpc_wallet_export(password=password, wallet_id=wallet_id)
+        encrypted = wallet.pack(password)
+        return encrypted.decode()

     @requires("wallet")
     async def jsonrpc_wallet_add(self, wallet_id):
@@ -2410,7 +2410,6 @@ class Daemon(metaclass=JSONRPCServerType):
         Usage:
             claim_list [--claim_type=<claim_type>...] [--claim_id=<claim_id>...] [--name=<name>...] [--is_spent]
-                       [--reposted_claim_id=<reposted_claim_id>...]
                        [--channel_id=<channel_id>...] [--account_id=<account_id>] [--wallet_id=<wallet_id>]
                        [--has_source | --has_no_source] [--page=<page>] [--page_size=<page_size>]
                        [--resolve] [--order_by=<order_by>] [--no_totals] [--include_received_tips]
@@ -2421,7 +2420,6 @@ class Daemon(metaclass=JSONRPCServerType):
             --channel_id=<channel_id> : (str or list) streams in this channel
             --name=<name> : (str or list) claim name
             --is_spent : (bool) shows previous claim updates and abandons
-            --reposted_claim_id=<reposted_claim_id> : (str or list) reposted claim id
             --account_id=<account_id> : (str) id of the account to query
             --wallet_id=<wallet_id> : (str) restrict results to specific wallet
             --has_source : (bool) list claims containing a source field
@@ -2944,21 +2942,19 @@ class Daemon(metaclass=JSONRPCServerType):
     @requires(WALLET_COMPONENT)
     async def jsonrpc_channel_sign(
-            self, channel_name=None, channel_id=None, hexdata=None, salt=None,
-            channel_account_id=None, wallet_id=None):
+            self, channel_name=None, channel_id=None, hexdata=None, channel_account_id=None, wallet_id=None):
         """
         Signs data using the specified channel signing key.

         Usage:
-            channel_sign [<channel_name> | --channel_name=<channel_name>] [<channel_id> | --channel_id=<channel_id>]
-                         [<hexdata> | --hexdata=<hexdata>] [<salt> | --salt=<salt>]
+            channel_sign [<channel_name> | --channel_name=<channel_name>]
+                         [<channel_id> | --channel_id=<channel_id>] [<hexdata> | --hexdata=<hexdata>]
                          [--channel_account_id=<channel_account_id>...] [--wallet_id=<wallet_id>]

         Options:
             --channel_name=<channel_name> : (str) name of channel used to sign (or use channel id)
             --channel_id=<channel_id> : (str) claim id of channel used to sign (or use channel name)
             --hexdata=<hexdata> : (str) data to sign, encoded as hexadecimal
-            --salt=<salt> : (str) salt to use for signing, default is to use timestamp
             --channel_account_id=<channel_account_id>: (str) one or more account ids for accounts to look in
                                                        for channel certificates, defaults to all accounts.
             --wallet_id=<wallet_id> : (str) restrict operation to specific wallet
@@ -2975,13 +2971,11 @@ class Daemon(metaclass=JSONRPCServerType):
         signing_channel = await self.get_channel_or_error(
             wallet, channel_account_id, channel_id, channel_name, for_signing=True
         )
-        if salt is None:
-            salt = str(int(time.time()))
-        signature = signing_channel.sign_data(unhexlify(str(hexdata)), salt)
+        timestamp = str(int(time.time()))
+        signature = signing_channel.sign_data(unhexlify(str(hexdata)), timestamp)
         return {
             'signature': signature,
-            'signing_ts': salt,  # DEPRECATED
-            'salt': salt,
+            'signing_ts': timestamp
         }

     @requires(WALLET_COMPONENT)
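Note: in the channel_sign hunks above, the left-hand column lets the caller supply a salt (falling back to a timestamp) and returns it under both a new 'salt' key and the deprecated 'signing_ts' key, while the right-hand column always signs with a fresh timestamp. A condensed sketch of the two behaviors; signing_channel.sign_data is the SDK helper used in the diff, and the sign() wrapper below is hypothetical, for illustration only:

    import time
    from binascii import unhexlify

    def sign(signing_channel, hexdata, salt=None):
        if salt is None:                  # right column: always a fresh timestamp
            salt = str(int(time.time()))  # left column: caller-supplied salt wins
        signature = signing_channel.sign_data(unhexlify(str(hexdata)), salt)
        return {'signature': signature, 'signing_ts': salt, 'salt': salt}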

View file

@@ -80,6 +80,8 @@ class MarketFeed:
             self.rate = ExchangeRate(self.market, rate, int(time.time()))
             self.last_check = time.time()
             return self.rate
+        except asyncio.CancelledError:
+            raise
         except asyncio.TimeoutError:
             log.warning("Timed out fetching exchange rate from %s.", self.name)
         except json.JSONDecodeError as e:

View file

@@ -328,8 +328,8 @@ class JSONResponseEncoder(JSONEncoder):
             result.update({
                 'streaming_url': managed_stream.stream_url,
                 'stream_hash': managed_stream.stream_hash,
-                'stream_name': managed_stream.stream_name,
-                'suggested_file_name': managed_stream.suggested_file_name,
+                'stream_name': managed_stream.descriptor.stream_name,
+                'suggested_file_name': managed_stream.descriptor.suggested_file_name,
                 'sd_hash': managed_stream.descriptor.sd_hash,
                 'mime_type': managed_stream.mime_type,
                 'key': managed_stream.descriptor.key,

View file

@@ -793,7 +793,7 @@ class SQLiteStorage(SQLiteMixin):

         await self.db.run(_save_claims)
         if update_file_callbacks:
-            await asyncio.wait(map(asyncio.create_task, update_file_callbacks))
+            await asyncio.wait(update_file_callbacks)
         if claim_id_to_supports:
             await self.save_supports(claim_id_to_supports)

View file

@@ -13,12 +13,11 @@ from lbry.schema.url import URL
 from lbry.wallet.dewies import dewies_to_lbc
 from lbry.file.source_manager import SourceManager
 from lbry.file.source import ManagedDownloadSource
-from lbry.extras.daemon.storage import StoredContentClaim

 if typing.TYPE_CHECKING:
     from lbry.conf import Config
     from lbry.extras.daemon.analytics import AnalyticsManager
     from lbry.extras.daemon.storage import SQLiteStorage
-    from lbry.wallet import WalletManager
+    from lbry.wallet import WalletManager, Output
     from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager

 log = logging.getLogger(__name__)
@@ -50,10 +49,10 @@ class FileManager:
             await manager.started.wait()
         self.started.set()

-    async def stop(self):
+    def stop(self):
         for manager in self.source_managers.values():
             # fixme: pop or not?
-            await manager.stop()
+            manager.stop()
         self.started.clear()

     @cache_concurrent
@@ -99,6 +98,8 @@ class FileManager:
             except asyncio.TimeoutError:
                 raise ResolveTimeoutError(uri)
             except Exception as err:
+                if isinstance(err, asyncio.CancelledError):
+                    raise
                 log.exception("Unexpected error resolving stream:")
                 raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
             if 'error' in resolved_result:
@@ -193,24 +194,21 @@ class FileManager:
             ####################
             # make downloader and wait for start
             ####################
-            # temporary with fields we know so downloader can start. Missing fields are populated later.
-            stored_claim = StoredContentClaim(outpoint=outpoint, claim_id=txo.claim_id, name=txo.claim_name,
-                                              amount=txo.amount, height=txo.tx_ref.height,
-                                              serialized=claim.to_bytes().hex())

             if not claim.stream.source.bt_infohash:
                 # fixme: this shouldnt be here
                 stream = ManagedStream(
                     self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash,
                     download_directory, file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
-                    analytics_manager=self.analytics_manager, claim=stored_claim
+                    analytics_manager=self.analytics_manager
                 )
                 stream.downloader.node = source_manager.node
             else:
                 stream = TorrentSource(
                     self.loop, self.config, self.storage, identifier=claim.stream.source.bt_infohash,
                     file_name=file_name, download_directory=download_directory or self.config.download_dir,
-                    status=ManagedStream.STATUS_RUNNING, claim=stored_claim, analytics_manager=self.analytics_manager,
+                    status=ManagedStream.STATUS_RUNNING,
+                    analytics_manager=self.analytics_manager,
                     torrent_session=source_manager.torrent_session
                 )
             log.info("starting download for %s", uri)
@@ -242,12 +240,13 @@ class FileManager:
                     claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
                     stream.set_claim(claim_info, claim)
                 if save_file:
-                    await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download))
+                    await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download),
+                                           loop=self.loop)
             return stream
         except asyncio.TimeoutError:
             error = DownloadDataTimeoutError(stream.sd_hash)
             raise error
-        except (Exception, asyncio.CancelledError) as err:  # forgive data timeout, don't delete stream
+        except Exception as err:  # forgive data timeout, don't delete stream
             expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
                         KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError)
             if isinstance(err, expected):

View file

@@ -47,10 +47,10 @@ class ManagedDownloadSource:
         self.analytics_manager = analytics_manager
         self.downloader = None

-        self.saving = asyncio.Event()
-        self.finished_writing = asyncio.Event()
-        self.started_writing = asyncio.Event()
-        self.finished_write_attempt = asyncio.Event()
+        self.saving = asyncio.Event(loop=self.loop)
+        self.finished_writing = asyncio.Event(loop=self.loop)
+        self.started_writing = asyncio.Event(loop=self.loop)
+        self.finished_write_attempt = asyncio.Event(loop=self.loop)

     # @classmethod
     # async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', file_path: str,
@@ -67,7 +67,7 @@ class ManagedDownloadSource:
     async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
         raise NotImplementedError()

-    async def stop_tasks(self):
+    def stop_tasks(self):
         raise NotImplementedError()

     def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):

View file

@@ -54,16 +54,16 @@ class SourceManager:
         self.storage = storage
         self.analytics_manager = analytics_manager
         self._sources: typing.Dict[str, ManagedDownloadSource] = {}
-        self.started = asyncio.Event()
+        self.started = asyncio.Event(loop=self.loop)

     def add(self, source: ManagedDownloadSource):
         self._sources[source.identifier] = source

-    async def remove(self, source: ManagedDownloadSource):
+    def remove(self, source: ManagedDownloadSource):
         if source.identifier not in self._sources:
             return
         self._sources.pop(source.identifier)
-        await source.stop_tasks()
+        source.stop_tasks()

     async def initialize_from_database(self):
         raise NotImplementedError()
@@ -72,10 +72,10 @@ class SourceManager:
         await self.initialize_from_database()
         self.started.set()

-    async def stop(self):
+    def stop(self):
         while self._sources:
             _, source = self._sources.popitem()
-            await source.stop_tasks()
+            source.stop_tasks()
         self.started.clear()

     async def create(self, file_path: str, key: Optional[bytes] = None,
@@ -83,7 +83,7 @@ class SourceManager:
         raise NotImplementedError()

     async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
-        await self.remove(source)
+        self.remove(source)
         if delete_file and source.output_file_exists:
             os.remove(source.full_path)
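Note: this file and the surrounding ones flip remove()/stop()/stop_tasks() between coroutines (left column) and plain methods (right column). The async variant exists so a cancelled file-output task can actually be awaited before cleanup continues, as in the stop_tasks hunk of the last file below. A minimal sketch of that awaited-cancellation pattern, assuming Python 3.8 or newer:

    import asyncio
    import typing

    class Source:
        def __init__(self) -> None:
            self.file_output_task: typing.Optional[asyncio.Task] = None

        async def stop_tasks(self) -> None:
            if self.file_output_task and not self.file_output_task.done():
                self.file_output_task.cancel()
                # await the cancelled task so its cleanup (finally blocks,
                # removal of partial files) finishes before returning
                await asyncio.gather(self.file_output_task, return_exceptions=True)
            self.file_output_task = None

    async def main():
        src = Source()
        src.file_output_task = asyncio.create_task(asyncio.sleep(10))
        await src.stop_tasks()  # returns promptly; the sleep task is cancelled

    asyncio.run(main())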

View file

@@ -398,12 +398,6 @@ class Repost(BaseClaim):

     claim_type = Claim.REPOST

-    def to_dict(self):
-        claim = super().to_dict()
-        if claim.pop('claim_hash', None):
-            claim['claim_id'] = self.reference.claim_id
-        return claim
-
     @property
     def reference(self) -> ClaimReference:
         return ClaimReference(self.message)

View file

@@ -23,7 +23,6 @@ class BackgroundDownloader:
         except ValueError:
             return
         except asyncio.CancelledError:
-            log.debug("Cancelled background downloader")
             raise
         except Exception:
             log.error("Unexpected download error on background downloader")

View file

@@ -27,8 +27,8 @@ class StreamDownloader:
         self.config = config
         self.blob_manager = blob_manager
         self.sd_hash = sd_hash
-        self.search_queue = asyncio.Queue()  # blob hashes to feed into the iterative finder
-        self.peer_queue = asyncio.Queue()  # new peers to try
+        self.search_queue = asyncio.Queue(loop=loop)  # blob hashes to feed into the iterative finder
+        self.peer_queue = asyncio.Queue(loop=loop)  # new peers to try
         self.blob_downloader = BlobDownloader(self.loop, self.config, self.blob_manager, self.peer_queue)
         self.descriptor: typing.Optional[StreamDescriptor] = descriptor
         self.node: typing.Optional['Node'] = None
@@ -72,7 +72,7 @@ class StreamDownloader:
             now = self.loop.time()
             sd_blob = await asyncio.wait_for(
                 self.blob_downloader.download_blob(self.sd_hash, connection_id),
-                self.config.blob_download_timeout
+                self.config.blob_download_timeout, loop=self.loop
             )
             log.info("downloaded sd blob %s", self.sd_hash)
             self.time_to_descriptor = self.loop.time() - now
@@ -111,7 +111,7 @@ class StreamDownloader:
             raise ValueError(f"blob {blob_info.blob_hash} is not part of stream with sd hash {self.sd_hash}")
         blob = await asyncio.wait_for(
             self.blob_downloader.download_blob(blob_info.blob_hash, blob_info.length, connection_id),
-            self.config.blob_download_timeout * 10
+            self.config.blob_download_timeout * 10, loop=self.loop
         )
         return blob

View file

@@ -60,9 +60,9 @@ class ManagedStream(ManagedDownloadSource):
         self.file_output_task: typing.Optional[asyncio.Task] = None
         self.delayed_stop_task: typing.Optional[asyncio.Task] = None
         self.streaming_responses: typing.List[typing.Tuple[Request, StreamResponse]] = []
-        self.fully_reflected = asyncio.Event()
-        self.streaming = asyncio.Event()
-        self._running = asyncio.Event()
+        self.fully_reflected = asyncio.Event(loop=self.loop)
+        self.streaming = asyncio.Event(loop=self.loop)
+        self._running = asyncio.Event(loop=self.loop)

     @property
     def sd_hash(self) -> str:
@@ -82,19 +82,7 @@ class ManagedStream(ManagedDownloadSource):

     @property
     def file_name(self) -> Optional[str]:
-        return self._file_name or self.suggested_file_name
-
-    @property
-    def suggested_file_name(self) -> Optional[str]:
-        first_option = ((self.descriptor and self.descriptor.suggested_file_name) or '').strip()
-        return sanitize_file_name(first_option or (self.stream_claim_info and self.stream_claim_info.claim and
-                                                   self.stream_claim_info.claim.stream.source.name))
-
-    @property
-    def stream_name(self) -> Optional[str]:
-        first_option = ((self.descriptor and self.descriptor.stream_name) or '').strip()
-        return first_option or (self.stream_claim_info and self.stream_claim_info.claim and
-                                self.stream_claim_info.claim.stream.source.name)
+        return self._file_name or (self.descriptor.suggested_file_name if self.descriptor else None)

     @property
     def written_bytes(self) -> int:
@@ -128,7 +116,7 @@ class ManagedStream(ManagedDownloadSource):

     @property
     def mime_type(self):
-        return guess_media_type(os.path.basename(self.suggested_file_name))[0]
+        return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0]

     @property
     def download_path(self):
@@ -161,7 +149,7 @@ class ManagedStream(ManagedDownloadSource):
         log.info("start downloader for stream (sd hash: %s)", self.sd_hash)
         self._running.set()
         try:
-            await asyncio.wait_for(self.downloader.start(), timeout)
+            await asyncio.wait_for(self.downloader.start(), timeout, loop=self.loop)
         except asyncio.TimeoutError:
             self._running.clear()
             raise DownloadSDTimeoutError(self.sd_hash)
@@ -174,7 +162,7 @@ class ManagedStream(ManagedDownloadSource):
             if not self._file_name:
                 self._file_name = await get_next_available_file_name(
                     self.loop, self.download_directory,
-                    self._file_name or sanitize_file_name(self.suggested_file_name)
+                    self._file_name or sanitize_file_name(self.descriptor.suggested_file_name)
                 )
             file_name, download_dir = self._file_name, self.download_directory
         else:
@@ -191,7 +179,7 @@ class ManagedStream(ManagedDownloadSource):
         Stop any running save/stream tasks as well as the downloader and update the status in the database
         """
-        await self.stop_tasks()
+        self.stop_tasks()
         if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
             await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)
@@ -279,7 +267,7 @@ class ManagedStream(ManagedDownloadSource):
             log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id,
                      self.sd_hash[:6], self.full_path)
             await self.blob_manager.storage.set_saved_file(self.stream_hash)
-        except (Exception, asyncio.CancelledError) as err:
+        except Exception as err:
             if os.path.isfile(output_path):
                 log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)
                 os.remove(output_path)
@@ -306,14 +294,14 @@ class ManagedStream(ManagedDownloadSource):
         self.download_directory = download_directory or self.download_directory or self.config.download_dir
         if not self.download_directory:
             raise ValueError("no directory to download to")
-        if not (file_name or self._file_name or self.suggested_file_name):
+        if not (file_name or self._file_name or self.descriptor.suggested_file_name):
             raise ValueError("no file name to download to")
         if not os.path.isdir(self.download_directory):
             log.warning("download directory '%s' does not exist, attempting to make it", self.download_directory)
             os.mkdir(self.download_directory)
         self._file_name = await get_next_available_file_name(
             self.loop, self.download_directory,
-            file_name or self._file_name or sanitize_file_name(self.suggested_file_name)
+            file_name or self._file_name or sanitize_file_name(self.descriptor.suggested_file_name)
         )
         await self.blob_manager.storage.change_file_download_dir_and_file_name(
             self.stream_hash, self.download_directory, self.file_name
@@ -321,16 +309,15 @@ class ManagedStream(ManagedDownloadSource):
         await self.update_status(ManagedStream.STATUS_RUNNING)
         self.file_output_task = self.loop.create_task(self._save_file(self.full_path))
         try:
-            await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout)
+            await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout, loop=self.loop)
         except asyncio.TimeoutError:
             log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id)
-            await self.stop_tasks()
+            self.stop_tasks()
             await self.update_status(ManagedStream.STATUS_STOPPED)

-    async def stop_tasks(self):
+    def stop_tasks(self):
         if self.file_output_task and not self.file_output_task.done():
             self.file_output_task.cancel()
-            await asyncio.gather(self.file_output_task, return_exceptions=True)
         self.file_output_task = None
         while self.streaming_responses:
             req, response = self.streaming_responses.pop()
@@ -367,7 +354,7 @@ class ManagedStream(ManagedDownloadSource):
                 return sent
             except ConnectionError:
return sent return sent
except (OSError, Exception, asyncio.CancelledError) as err: except (OSError, Exception) as err:
if isinstance(err, asyncio.CancelledError): if isinstance(err, asyncio.CancelledError):
log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id) log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id)
elif isinstance(err, OSError): elif isinstance(err, OSError):
@ -402,7 +389,7 @@ class ManagedStream(ManagedDownloadSource):
self.sd_hash[:6]) self.sd_hash[:6])
await self.stop() await self.stop()
return return
await asyncio.sleep(1) await asyncio.sleep(1, loop=self.loop)
def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]: def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]:
if '=' in get_range: if '=' in get_range:
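The except-clause churn in this file tracks a CPython change: through Python 3.7, asyncio.CancelledError subclassed Exception, but from 3.8 it subclasses BaseException, so a plain "except Exception" no longer intercepts cancellation. That is why the removed side lists CancelledError explicitly while the restored 3.7-era code does not need to. A minimal, runnable sketch of the difference (names are illustrative only):

    import asyncio

    async def worker():
        await asyncio.sleep(3600)

    async def main():
        task = asyncio.create_task(worker())
        await asyncio.sleep(0)          # let the worker start
        task.cancel()
        try:
            await task
        except Exception:
            # Python 3.7: CancelledError is an Exception, so it lands here,
            # which is why 3.7-era code only needs "except Exception".
            print("caught by except Exception (3.7)")
        except asyncio.CancelledError:
            # Python 3.8+: CancelledError is a BaseException and falls through
            # to this clause; hence the explicit tuple on the removed side.
            print("caught by except asyncio.CancelledError (3.8+)")

    asyncio.run(main())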

View file

@@ -21,7 +21,7 @@ class ReflectorServerProtocol(asyncio.Protocol):
         self.loop = asyncio.get_event_loop()
         self.blob_manager = blob_manager
         self.server_task: asyncio.Task = None
-        self.started_listening = asyncio.Event()
+        self.started_listening = asyncio.Event(loop=self.loop)
         self.buf = b''
         self.transport: asyncio.StreamWriter = None
         self.writer: typing.Optional['HashBlobWriter'] = None

@@ -29,9 +29,9 @@ class ReflectorServerProtocol(asyncio.Protocol):
         self.descriptor: typing.Optional['StreamDescriptor'] = None
         self.sd_blob: typing.Optional['BlobFile'] = None
         self.received = []
-        self.incoming = incoming_event or asyncio.Event()
-        self.not_incoming = not_incoming_event or asyncio.Event()
-        self.stop_event = stop_event or asyncio.Event()
+        self.incoming = incoming_event or asyncio.Event(loop=self.loop)
+        self.not_incoming = not_incoming_event or asyncio.Event(loop=self.loop)
+        self.stop_event = stop_event or asyncio.Event(loop=self.loop)
         self.chunk_size = response_chunk_size
         self.wait_for_stop_task: typing.Optional[asyncio.Task] = None
         self.partial_event = partial_event

@@ -94,7 +94,7 @@ class ReflectorServerProtocol(asyncio.Protocol):
             self.incoming.set()
             self.send_response({"send_sd_blob": True})
             try:
-                await asyncio.wait_for(self.sd_blob.verified.wait(), 30)
+                await asyncio.wait_for(self.sd_blob.verified.wait(), 30, loop=self.loop)
                 self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
                     self.loop, self.blob_manager.blob_dir, self.sd_blob
                 )

@@ -140,7 +140,7 @@ class ReflectorServerProtocol(asyncio.Protocol):
             self.incoming.set()
             self.send_response({"send_blob": True})
             try:
-                await asyncio.wait_for(blob.verified.wait(), 30)
+                await asyncio.wait_for(blob.verified.wait(), 30, loop=self.loop)
                 self.send_response({"received_blob": True})
             except asyncio.TimeoutError:
                 self.send_response({"received_blob": False})

@@ -162,10 +162,10 @@ class ReflectorServer:
         self.loop = asyncio.get_event_loop()
         self.blob_manager = blob_manager
         self.server_task: typing.Optional[asyncio.Task] = None
-        self.started_listening = asyncio.Event()
-        self.stopped_listening = asyncio.Event()
-        self.incoming_event = incoming_event or asyncio.Event()
-        self.not_incoming_event = not_incoming_event or asyncio.Event()
+        self.started_listening = asyncio.Event(loop=self.loop)
+        self.stopped_listening = asyncio.Event(loop=self.loop)
+        self.incoming_event = incoming_event or asyncio.Event(loop=self.loop)
+        self.not_incoming_event = not_incoming_event or asyncio.Event(loop=self.loop)
         self.response_chunk_size = response_chunk_size
         self.stop_event = stop_event
         self.partial_needs = partial_needs  # for testing cases where it doesn't know what it wants
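Every asyncio.Event(loop=self.loop) restored in this file is 3.7-only spelling: the loop argument to asyncio primitives was deprecated in Python 3.8 and removed in 3.10, where an event simply binds to the loop that is running when it is first awaited. A small sketch of the two idioms (assuming CPython; the commented line raises TypeError on 3.10+):

    import asyncio

    async def main():
        event = asyncio.Event()   # 3.10+ style: binds to the running loop
        # event = asyncio.Event(loop=asyncio.get_event_loop())   # 3.7 style, removed in 3.10
        asyncio.get_running_loop().call_later(0.1, event.set)
        await event.wait()
        print("event fired")

    asyncio.run(main())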

View file

@@ -54,7 +54,7 @@ class StreamManager(SourceManager):
         self.re_reflect_task: Optional[asyncio.Task] = None
         self.update_stream_finished_futs: typing.List[asyncio.Future] = []
         self.running_reflector_uploads: typing.Dict[str, asyncio.Task] = {}
-        self.started = asyncio.Event()
+        self.started = asyncio.Event(loop=self.loop)

     @property
     def streams(self):

@@ -150,7 +150,7 @@ class StreamManager(SourceManager):
                 file_info['added_on'], file_info['fully_reflected']
             )))
         if add_stream_tasks:
-            await asyncio.gather(*add_stream_tasks)
+            await asyncio.gather(*add_stream_tasks, loop=self.loop)
         log.info("Started stream manager with %i files", len(self._sources))
         if not self.node:
             log.info("no DHT node given, resuming downloads trusting that we can contact reflector")

@@ -159,11 +159,14 @@ class StreamManager(SourceManager):
         self.resume_saving_task = asyncio.ensure_future(asyncio.gather(
             *(self._sources[sd_hash].save_file(file_name, download_directory)
               for (file_name, download_directory, sd_hash) in to_resume_saving),
+            loop=self.loop
         ))

     async def reflect_streams(self):
         try:
             return await self._reflect_streams()
+        except asyncio.CancelledError:
+            raise
         except Exception:
             log.exception("reflector task encountered an unexpected error!")

@@ -183,21 +186,21 @@ class StreamManager(SourceManager):
                 batch.append(self.reflect_stream(stream))
                 if len(batch) >= self.config.concurrent_reflector_uploads:
                     log.debug("waiting for batch of %s reflecting streams", len(batch))
-                    await asyncio.gather(*batch)
+                    await asyncio.gather(*batch, loop=self.loop)
                     log.debug("done processing %s streams", len(batch))
                     batch = []
             if batch:
                 log.debug("waiting for batch of %s reflecting streams", len(batch))
-                await asyncio.gather(*batch)
+                await asyncio.gather(*batch, loop=self.loop)
                 log.debug("done processing %s streams", len(batch))
-            await asyncio.sleep(300)
+            await asyncio.sleep(300, loop=self.loop)

     async def start(self):
         await super().start()
         self.re_reflect_task = self.loop.create_task(self.reflect_streams())

-    async def stop(self):
-        await super().stop()
+    def stop(self):
+        super().stop()
         if self.resume_saving_task and not self.resume_saving_task.done():
             self.resume_saving_task.cancel()
         if self.re_reflect_task and not self.re_reflect_task.done():

@@ -224,8 +227,7 @@ class StreamManager(SourceManager):
         )
         return task

-    @staticmethod
-    async def _retriable_reflect_stream(stream, host, port):
+    async def _retriable_reflect_stream(self, stream, host, port):
         sent = await stream.upload_to_reflector(host, port)
         while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0:
             stream.reflector_progress = 0

@@ -260,7 +262,7 @@ class StreamManager(SourceManager):
             return
         if source.identifier in self.running_reflector_uploads:
             self.running_reflector_uploads[source.identifier].cancel()
-        await source.stop_tasks()
+        source.stop_tasks()
         if source.identifier in self.streams:
             del self.streams[source.identifier]
             blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
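The dropped "await asyncio.gather(self.file_output_task, return_exceptions=True)" in stop_tasks() (and the async stop() built on it) is what made shutdown deterministic on the removed side: Task.cancel() only requests cancellation, while gathering with return_exceptions=True waits for the task to actually unwind without re-raising CancelledError. A runnable sketch of that pattern, with a hypothetical stand-in task:

    import asyncio

    async def save_file():
        await asyncio.sleep(3600)   # stand-in for a long-running writer

    async def stop_tasks():
        task = asyncio.create_task(save_file())
        await asyncio.sleep(0)                               # let it start
        task.cancel()                                        # only *requests* cancellation
        await asyncio.gather(task, return_exceptions=True)   # waits until it has unwound
        assert task.cancelled()

    asyncio.run(stop_tasks())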

View file

@@ -87,7 +87,7 @@ class TorrentHandle:
             self._show_status()
             if self.finished.is_set():
                 break
-            await asyncio.sleep(0.1)
+            await asyncio.sleep(0.1, loop=self._loop)

     async def pause(self):
         await self._loop.run_in_executor(

@@ -150,7 +150,7 @@ class TorrentSession:
             await self._loop.run_in_executor(
                 self._executor, self._pop_alerts
             )
-            await asyncio.sleep(1)
+            await asyncio.sleep(1, loop=self._loop)

     async def pause(self):
         await self._loop.run_in_executor(

View file

@@ -36,7 +36,7 @@ class Torrent:
     def __init__(self, loop, handle):
         self._loop = loop
         self._handle = handle
-        self.finished = asyncio.Event()
+        self.finished = asyncio.Event(loop=loop)

     def _threaded_update_status(self):
         status = self._handle.status()

@@ -58,7 +58,7 @@ class Torrent:
                 log.info("finished downloading torrent!")
                 await self.pause()
                 break
-            await asyncio.sleep(1)
+            await asyncio.sleep(1, loop=self._loop)

     async def pause(self):
         log.info("pause torrent")

View file

@@ -74,7 +74,7 @@ class TorrentSource(ManagedDownloadSource):
     def bt_infohash(self):
         return self.identifier

-    async def stop_tasks(self):
+    def stop_tasks(self):
         pass

     @property

@@ -118,8 +118,8 @@ class TorrentManager(SourceManager):
     async def start(self):
         await super().start()

-    async def stop(self):
-        await super().stop()
+    def stop(self):
+        super().stop()
         log.info("finished stopping the torrent manager")

     async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
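stop() flips between async and sync across these managers for the same reason as stop_tasks() above: teardown that itself awaits can only be chained through an async stop() via "await super().stop()". A toy sketch of the chained form on the removed side (class names here are hypothetical):

    import asyncio

    class SourceManagerSketch:
        async def stop(self):
            await asyncio.sleep(0)   # stand-in for awaiting cancelled tasks

    class TorrentManagerSketch(SourceManagerSketch):
        async def stop(self):
            await super().stop()     # async cleanup chains through the base class
            print("finished stopping the torrent manager")

    asyncio.run(TorrentManagerSketch().stop())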

View file

@@ -450,8 +450,8 @@ def is_running_from_bundle():
 class LockWithMetrics(asyncio.Lock):
-    def __init__(self, acquire_metric, held_time_metric):
-        super().__init__()
+    def __init__(self, acquire_metric, held_time_metric, loop=None):
+        super().__init__(loop=loop)
         self._acquire_metric = acquire_metric
         self._lock_held_time_metric = held_time_metric
         self._lock_acquired_time = None
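For context, LockWithMetrics times how long acquire() blocks and how long the lock is then held; the loop= parameter reintroduced here is only forwarded for 3.7 compatibility and is gone from asyncio.Lock in 3.10+. A self-contained sketch of the idea, assuming metric objects that expose .observe() the way prometheus_client histograms do:

    import asyncio
    import time

    class TimedLock(asyncio.Lock):
        """Sketch: record acquire latency and hold time via metric.observe()."""

        def __init__(self, acquire_metric, held_time_metric):
            super().__init__()
            self._acquire_metric = acquire_metric
            self._held_time_metric = held_time_metric
            self._acquired_at = None

        async def acquire(self):
            start = time.perf_counter()
            result = await super().acquire()
            self._acquire_metric.observe(time.perf_counter() - start)
            self._acquired_at = time.perf_counter()
            return result

        def release(self):
            if self._acquired_at is not None:
                self._held_time_metric.observe(time.perf_counter() - self._acquired_at)
                self._acquired_at = None
            super().release()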

View file

@@ -1064,182 +1064,4 @@ HASHES = {
     1062000: 'c44d02a890aa66979b10d1cfa597c877f498841b4e12dd9a7bdf8d4a5fccab80',
     1063000: '1c093734f5f241b36c1b9971e2759983f88f4033405a2588b4ebfd6998ac7465',
     1064000: '9e354a83b71bbb9704053bfeea038a9c3d5daad080c6406c698b047c634706a6',
1065000: '563188accc4a6e311bd5046516a92a233f11f891b2304d37f151c5a6002b6958',
1066000: '333f1b4e996fac87e32dec667533715b31f1736b4342806a81d568b5c5238456',
1067000: 'df59a0b7319d5269bdf55043d91ec62bbb30829bb7054da623717a394b6ed678',
1068000: '06d8b674a205393edaf20c1d837baadc9caf0b0a675645246263cc163302241d',
1069000: 'ac065c48fad1383039d39e23c8367bad7cf9a37e07a5294cd7b04af5827b9961',
1070000: '90cd8b50f94208bc459081356474a961f6b764a1217f8fd291f5e4828081b730',
1071000: '3c0aa207ba9eea45458ab4fa26d6a027862592adb9bcce30915816e777dc6cfc',
1072000: '3d556c08f2300b67b704d3cbf46e22866e3ac164472b5930e2ada23b08475a0f',
1073000: 'a39b5c54c24efe3066aa203358b96baea405cd59aac6b0b48930e77799b4dd7d',
1074000: 'e8c8273d5a50a60e8744716c9f31496fb29eca87b4d68643f4ecd7ec4e400e23',
1075000: 'b8043ae41a1d0d7d4310c85764fcba1424733df347ffc2e8cbda1fe6ccbb5153',
1076000: '58468db1f91805e767d334824d6bffe54e0f900d1fb2a89b105086a493053b3d',
1077000: '04a78749b58465efa3a56d1735cd082c1f0f796e26486c7136950dbaf6effaa4',
1078000: 'e1dd6b58c75b01a67d4a4594dc7b4b2ee9e7d7fa7b25fd6246ce0e86eff33c75',
1079000: 'd239af017a6bb664485b14ad15e0eb703775e43018a045a8612b3697794460da',
1080000: '29ae5503f8c1249fefeb63fd967a71a70588ee0db1c97497e16366163a684341',
1081000: '05103ab27469e0859cbcd3daf42faa2bae798f522534697c7f2b34f7a050ee0f',
1082000: '4553d2cb7e90b6db11d242e287fe96822e6cd60e6388b94bf9006411f202ba03',
1083000: '97995acd178b2a142d571d5ae1c2a3deaf93a909fd91fb9c541d57f73e32dc99',
1084000: '9e3f23376af14d76ab24cd54e321dec019af73ad61067d959ff90043acc5ffcc',
1085000: '81c056b14f13cee0d6d6c8079fdd5a1a84c3a5c76cc9448612e8ef6d3531300e',
1086000: '8a0004f6809bdd075915a804e43991dfe8f22e05679d2fdaf8e373f101bac5c2',
1087000: '27c45a4c9ad24e038f2ebe40835a1c49ac7221d7185082866ee354351ba87c7a',
1088000: 'fd27e21747117b00b4ada1cba161ac49edb57cca540f86ac5ba885050f08f824',
1089000: 'bff867335767103bc3ed15ede5b9fde88016f8ede15dc5bf3e81ea40dcfc61ae',
1090000: '608f75016d1db08888dd59640f63e838c19bdfa833c0cc177ad3d2b818b0db5b',
1091000: '90750b452bd4dedaab6b57fecbfe88f71ce3d5437fad7f9ec0fdd270445c7526',
1092000: '98287b39f9f1233017dc5d932e5c77f0521ca84587eb3f39f0e7b6c297c749af',
1093000: '68a5846ed05c9bb142197849106838765f90f15c10b2cc938eef49b95eaa9d33',
1094000: '5660a1aac2fc763a417fc656c8887fc8186bf613ae1ccbb1a664fb43ce1fa1d6',
1095000: '62bad3db418b3f4cad3596881b645b72479c71deb0d39c7a4c8bd1577dc225fd',
1096000: 'e0e4b2b183591f10dd5614c289412f2fb5e320b7d3278f7c028f42f591872666',
1097000: 'a233a233fc2aa5dab9e75106d91388343ef969458ea974f1409a2ab5fc441911',
1098000: '16dfa5fa6cbd1188e562697b5f00ac206960d0851ed84adf37ae975fd5ffdd6a',
1099000: 'b8a870b7dc6d3263730c00f59d52aa6cce35dc59aa8fba715034cc2d14927260',
1100000: 'a3cd7749743da22a3846dcc2edbf1df21b938e829419389e3bc09284797c5b43',
1101000: '75b14c2a95e2a095949729b7c0b624bd725a2de98404a8e3247b60c977d0198e',
1102000: '4d3af64d37064dd5f57e25d61f248a1e21c1b1cadd7bb1404e35c9fbe06f1fd4',
1103000: 'd73c92bfed358dfcd7659228974ab75ea2fc86f2301ee47133adad8075203872',
1104000: '30cd82354f37bc0b412123867c7e1835206022a7501853bf8c0d3df02f291645',
1105000: '1d2ef984f26693dce77460cd2694e5da46e675077e91a1cea26051733b01a7ef',
1106000: '51c076c304222fe3ca308ba6968c46fef448f85be13a095cecb75b90e7954698',
1107000: '99e2221339e16acc34c9816f2ef7b866c2dd753aa3cbe484ae831959a23ece68',
1108000: '0f1227c250296bfe88eb7eb41703f99f633cfe02870816111e0cadfe778ddb19',
1109000: 'b35447f1ad76f95bc4f5886e4028d33acb3ad7b5000dd15516d3f11ce4baa990',
1110000: 'ac7baff996062bfaaaddd7d496b17e3ec1c8d34b2143095645ff22fb3888ae00',
1111000: '430bbbdcca36b2d69b6a2dd8b07c583a060a467e5f9acbc6de62462e1f7c7036',
1112000: 'e5274dea029dc44baff55c05b0555f91b74d29ffd40e3a8c4e2c5b57f9d40bef',
1113000: 'cf43863249fa42cfe108220dd40169dac702b0dd9cf5cb699cf2fc96feda8371',
1114000: 'fa1c0e551784d21c451564124d2d730e616724f3e535de3c186bcdeb47e80a8f',
1115000: '49fe6ecee35a397b83b5a704e950ad028cfb4b7e7a524021e789f4acc0fd6ffe',
1116000: '74ecded36751aa8b7901b31f0d16d75d111fc3c40b567f649c04f74ed028aa5c',
1117000: 'd9ca760a22190bdf545766b47d963c738a4edcc27f4d15ca801b35751577cfa7',
1118000: 'c28d42f871682800ac4e867608227cfb6bc4c00b618e83a8556f201a1c28813c',
1119000: 'c5fafc4e1785b0b9e84bb052e392154a5ba1aefe612998017e90772bcd554e08',
1120000: 'aa054d428bc9ccee0761da92163817163413065fe1e67ef79a056c5233ea3476',
1121000: '0df295bb944218503bd1bf66d2ece0c50fd22dae3391b80673a7ad1e4e5c3934',
1122000: 'a13abb350a26673b3933b1de307a60a6845ca594d502599548c6253e21a6d8e8',
1123000: 'a4bc6a3abf9ed1f4b14338ff0f03f83456312bc91a93fa89ae6db493050115e1',
1124000: '65869938df99adf0dda76200291ce09a54c9bcc787e4bb62cd72c367db58f4f0',
1125000: 'ea5e918233b14c3c73d488a906e3741c61bdcafe0393bd0404168fe80c950a46',
1126000: 'ce88cd35104fcec51bcee77302e03162dc694802536f5b668786b2245e61bca5',
1127000: 'ea19c0c8d205be4be87d02c5301c9ed331e7d75e25b93d1c2137c248882af515',
1128000: '006f32d63c2a3adcf4fbad0b0629c97f1beab6446a9c27fbde9472f2d066219e',
1129000: '218e5392e1ecf471c3bbc3d79c24dee30ac8db315dbeb61317318efb3f221163',
1130000: '30b9da0bd8364e9cd5551b2529341a01a3b7257a238d15b2560e2c99fdb324e8',
1131000: '8a7f382cfa023d2eba6639443e67206f8883b57d23ce7e1339234b8bb3098a82',
1132000: 'bf9af68a6fe2112d8fe311dfd52334ae2e7b0bac6675c9ebfddb1f386c212668',
1133000: '1a30951e2be633502a47c255a93ddbb9ed231d6bb4c55a807c0e910b437766b3',
1134000: 'a9bcaf3300b7915e701a8e396eb13f0c7287576323420be7aab3c3ba48020f76',
1135000: '337eed9ed072b5ad862af2d3d651f1b49fa852abc590b7e1c2dc381b496f438a',
1136000: '208761dbc29ec58302d722a05e937a3cf9e78bfb6495be395dd7b54f02e169dc',
1137000: '4e5b67ff3324b64e268049fdc3d82982b847ee359d409ade6368864c38a111e5',
1138000: '55d1d0833021a664e85eec8cc90a0985e67cc80d28841aaa8c2231ec28087ebb',
1139000: 'e750ada1ec9fa0f2f2461ed68958c7d116a699a82ec12911da5563139f8df19e',
1140000: '9cf81407b6ccc8046f0233f97484166945758f7392bb54841c912fcb34cf205c',
1141000: 'fccf32b2fae03e3b6b562483776625f9843cd68734c55659e2069cde7e383170',
1142000: 'c3608c215dd6569da6c1871c4d72a09ab1caa9663647f2a9454b5693d5d72a65',
1143000: 'bd39cb8c4e529d15bbea6baeec66afe52ca18afe32bd812f28fbb0676647cdff',
1144000: '6e42d02538565ce7e2d9bf31a304f1fd0ac122d35d17a030160575815901b0b1',
1145000: 'b9722e1de2904ce1219140fffb1f4f9f5a041f885faa634404238d103c738b4c',
1146000: 'd4de4271459966cee774f538a243d7db0689b213b296463d42e45c93194d7861',
1147000: '51fadf109f22bb85574d0fbcbd0b20992983e89aee3d415a7b1c37c44775d9a9',
1148000: '137e1fe8da31680d21a42e7421eb608a883a497314e4404625ce44b0edadde6a',
1149000: 'cb87867eb04203ce15e0763a2f4389376cea75e0a2877f55e2911c575bef07a8',
1150000: '977528ca7953a2c9c19fefaa3aab7ebdec3ac324d74a07d83764ba25d9be0689',
1151000: 'a09c51c832600ded63a19201df008075273ea248fd406886e93a2cbaa3bba46b',
1152000: '0e5367cfa0f00dd932a5bcc00dcc807fa6825161806bed588e16a57947b4b32d',
1153000: '55a9de3dcde2efb56a3c5fea7d22b98c1e180db9a4d4f4f6be7aae1f1cbd7608',
1154000: 'abc58cf71c4691ebfaef920252730cf69abbe9de88b424c03051b9b03e85d45a',
1155000: '4f074ce73c8a096620b8a32498362eb66a072eae95d561f2d53557cd513ae785',
1156000: '540a838a0f0a8834466b17dd456d35b8acae2ec8419f8bd9a704d9ea439062ac',
1157000: 'd5310ac671abdb658ea028db86c23fc729af965f91d67a37218c1412cf32a1f5',
1158000: '162d906a07e6c35e7c3ebf7069a200521605a97920f5b589d31b19bfd7766ee2',
1159000: '600bd8f5e1e62219e220f4dcb650db5812e79956f95ae8a50e83126932685ee0',
1160000: '91319398d1a805fac8582c8485e6d84e7490d6cfa6e44e2c630665b6bce0e6b8',
1161000: 'f7ad3cff6ee76e1e3df4abe70c600e4af66e1df55bf7b03aee12251d4455a1d4',
1162000: '85b9fbba669c2a4d3f85cdb5123f9538c05bd66172b7236d756703f99258454d',
1163000: '966085d767d1e5e2e8baf8eda8c11472ec5351181c418b503585284009aaea79',
1164000: '1c94e1b531215c019b12caf407296d8868481f49524b7180c7161b0363c1f789',
1165000: '803b6bf93735aeae2cf607824e2adf0d754b58da2516c2da1e485c697e472143',
1166000: '872561a82f7991633d0927d25cb659d096bbe556fe6dac7a0b6a679820733069',
1167000: '6bd7cdd605a3179b54c8af88d1638bf8133fab12cbf0a78d37cf21eddf4395a1',
1168000: '79946f5758c1817239cc642d27298bd710983551a8236e49832c6d818b097337',
1169000: 'b0994c60728e74de4aa361f37fa85e5296ce3188ae4e0b66d7b34fe86a239c9c',
1170000: 'a54188a5a64e0cf8da2406d16a0ac3983b087fc7d6231b6f8abf92cf11dc78cd',
1171000: 'ec2924d98e470cc6359821e6468df2c15d60301861d443188730342581230ef2',
1172000: 'b4ac11116aa73ce19428009a80e583e19dc9bcd380f7f7ce272a92921d5868d2',
1173000: '501d3551f762999dd5a799f3c5658fff2a7f3aff0511488272cd7693fefb8f9d',
1174000: '4660074ea48a78ae453cb14b694b2844cc0fb63ed9352ed20d11158bbb5c1f28',
1175000: '0727f6b1d9f8fe5677a9ffa0d475f53f5a419ef90b80896c22c2c95de22175de',
1176000: '150633d6a35496c24a93c9e19817e90f649c56b7e2558f99e97325bfd5df8b17',
1177000: '0849e19f22571b62dba8ff02f6b5a064a7ac36e7ed491321b3663567e8e17294',
1178000: '770dd463e7bad80f689f12934e4ae06e24378d1545dcf211fd143beaef49464e',
1179000: '059d383dcc60a49b658b674d92fc35cab07b06329c58d73818b6387cb0c06534',
1180000: 'e547cb3c636243ca9ae4cfb92c30a0f583eda84e329a5c1e5f64a26fc6fc791e',
1181000: '4521a4396ab02f73d45d7a3393ea1c602d255778d52c12079c88bfbad32aab43',
1182000: '051cfe993e4b0b34233403a9e8c397dd50e8b78a30fb07e9c260604ee9e624a9',
1183000: '44a69c99bb8b85e84ae279f2d8e5400d51cb3d5f0bcd178db49d55548cd66191',
1184000: '2a1d23c9bb3c71a533e0c9d25b03bfa7e9db8e014645f3e7fbede6d99fff0191',
1185000: 'bb90d6c6d77819163a9e909ee621d874707cdb21c91b1d9e861b204cf37d0ffa',
1186000: '4a92051b738ea0e28c64c64f1eb6f0405bc7c3427bef91ff20f4c43cf084d750',
1187000: 'f782ac330ca20fb5d8a094ee0f0f8c086a76e3f03ecc6a2c42f8fd07e52e0f41',
1188000: '94cb7b653dd3d838c186420158cf0e73db73ec28deaf67d9a2ca902caba4141a',
1189000: 'c8128e59b9ec948de890184578a113478ea63f7d57cb75c2c8d5c001a5a724c0',
1190000: '4da643bd35e5b98932ae21515a6bffb9c72f2cd8d514cd2d7eac1922af785c3f',
1191000: '0f922d86658ac3f53c5f9db360c68ab3f3253a925f23e1323820e3384214719a',
1192000: '4c3ab631cf5ba0c236f7c64af6f790fc24448319de6f75dbd28df4e2648d0b7d',
1193000: 'eda118d1fac3470a1f8f01f5c78108c8ecdcd6420be30f6d20f1d1831e7b6975',
1194000: '5723fff88abd9bb5088476fa5f4221a61c6f8a718703a92f13248ad350abeea2',
1195000: '1715846f82d011919e3446c6ce675a65fb80338bd791d4e735702c4767d9adc4',
1196000: 'b497667996aee2db61e88f442e728be15ab0b2b64cfd43198691fcf6cdafacc8',
1197000: '309a6170d837b8cb334fb888a64ed4e47e6592747e93c8e9d1bf7d608cfef87d',
1198000: '3ea918ef64a67dec20051519e6aefaeb7aca2d8583baca9ad5c5bd07073e513a',
1199000: '4ec7b7361b0243e5b2996a16e3b27acd662126b95fe542a487c7030e47ea3667',
1200000: 'b829c742686fcd642d0f9443336d7e2c4eab81667c90ce553df1350ed10b4233',
1201000: '44c022887f1e126fd281b1cae26b2017fa6415a64b105762c87643204ce165a5',
1202000: 'b11cc739eb28a14f4e47be125aa7e62d6d6f90c8f8014ee70044ed506d53d938',
1203000: '997a7c5fd7a98b39c9ca0790519924d73c3567656b605c97a6fdb7b406c3c64d',
1204000: '7d25d872e17195ee277243f7a5a39aa64d8750cec62e4777146acf61a8e76b04',
1205000: 'ce8486ae745a4645bee081ef3291d9505174bed05b0668d963b2998b7643dbb0',
1206000: '46a0bcea3c411c600dffe3e06e3d1dfbf5879a7ec4dcf3848e794cefcbf2bc0b',
1207000: '37e6297bf6e4e2bdd40401d4d7f95e3e3bdafd4a7f76b9c52865cefc6b82b20b',
1208000: 'd09e3982a9827b8cf56a5a2f4031dc6b082926c1fd57b63beaaa6cfd534eb902',
1209000: '54ae9010a9f146c83464e7ee60b30d9dbee36418561abc4e8d61bce9baa2d21d',
1210000: '5dcfd33f8e5ac21c9ba8553758b8cd8afae7961cad428530b5109c2db2ebf39f',
1211000: '91c952348bb2c3dfac0d6531a3dac770ea6dab571af257530e9c55493c96bdd9',
1212000: 'e62cc3fe044a7f5de4c04a8aed5619548f9d5c6fad9f989d3382cb96de1d780d',
1213000: '66b46ffdca8acf1dd04528dadb28b6ac4ce38807c1b84abd685d4ddb3dc59a34',
1214000: '2ce4091756ad23746bab4906f46545953cadaf61deae0d78e8a10d4eb51866b1',
1215000: '83ce3ca087799cdc4b4c5e7cfeb4a127708724a7ca76aa5f7f4ec1ed48b5fca6',
1216000: '7d07b739b7991fbd74926281bf51bba9d5721afab39598720f9ff5f7410a6721',
1217000: '76adf49491670d0e8379058eacf0228f330f3c18955dfea1ebe43bc11ee065f3',
1218000: '77f422e7301a81692dec69e5c6d35fa988a00a4d820ad0ebb1d595add36558cc',
1219000: '8ba9d944f8c468c81799294aeea8dc05ed1bb90bb26552fcd190bd88fedcddf2',
1220000: '00330367c255e0fe51b374597995c53353bc5700ad7d603cbd4197141933fe9c',
1221000: '3ba8b316b7964f31fdf628ed869a6fd023680cca6611257a31efe22e4d17e578',
1222000: '016e58d3fb6a29a3f9281789359460e776e9feb2f0db500482b6e231e1272aef',
1223000: 'fdfe767c29a3de7acd913b627d1e5fa887a1af9974f6a8a6474db822468c785c',
1224000: '92239f6207bff3689c554e92b24fe2e7be4a2203104ad8ef08b2c6bedd9aeccf',
1225000: '9a2f2dd9527b533d3d743efc55236e73e15192171bc8d0cd910918d1ab00aef7',
1226000: 'eb8269c75b8c5f66e6ea88ad70883dddcf8a75a45198ca7a46eb0ec606a791bb',
1227000: '5c82e624390cd57942dc9d64344eaa3d8991e0437e01802473053245b706290c',
1228000: '51e9a7d727f07fc01be7c03e3dd854eb666697f05bf89259baac628520d4402c',
1229000: 'c4bfdb651c9abdeda717fb9c8a4c8a6c9c0f78c13d3e6cae3f24f504d734c643',
1230000: '9f1ce781d16f2334567cbfb22fff42c14d2b9290cc2883746f435a1fb127021d',
1231000: '5c996634b377412ae0a3d8f541f3cc4a354aab72c198aa23a5cfc2678cbabf09',
1232000: '86702316a2d1730fbae01a08f36fffe5bf6d3ebb7d76b35a1617713766698b46',
1233000: 'fb16b63916c0287cb9b01d0c5aad626ced1b73c49a374c9009703aa90fd27a82',
1234000: '7c6f7904602ccd86bfb05cb8d6b5547c989c57cb2e214e93f1220fa4fe29bcb0',
1235000: '898b0f20811f52aa5a6bd0c35eff86fca3fbe3b066e423644fa77b2e269d9513',
1236000: '39128910ef624b6a8bbd390a311b5587c0991cda834eed996d814fe410cac352',
1237000: 'a0709afeedb64af4168ce8cf3dbda667a248df8e91da96acb2333686a2b89325',
1238000: 'e00075e7ba8c18cc277bfc5115ae6ff6b9678e6e99efd6e45f549ef8a3981a3d',
1239000: '3fba891600738f2d37e279209d52bbe6dc7ce005eeed62048247c96f370e7cd5',
1240000: 'def9bf1bec9325db90bb070f532972cfdd74e814c2b5e74a4d5a7c09a963a5f1',
1241000: '6a5d187e32bc189ac786959e1fe846031b97ae1ce202c22e1bdb1d2a963005fd',
1242000: 'a74d7c0b104eaf76c53a3a31ce51b75bbd8e05b5e84c31f593f505a13d83634c',
 }

View file

@@ -141,7 +141,7 @@ class CoinSelector:
               _) -> List[OutputEffectiveAmountEstimator]:
         """ Accumulate UTXOs at random until there is enough to cover the target. """
         target = self.target + self.cost_of_change
-        self.random.shuffle(txos, random=self.random.random)  # pylint: disable=deprecated-argument
+        self.random.shuffle(txos, self.random.random)
         selection = []
         amount = 0
         for coin in txos:
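The pylint pragma on the removed line exists because the second argument of random.shuffle was deprecated in Python 3.9 and removed in 3.11. Since self.random is already a dedicated Random instance, the same generator state backs both spellings, so on 3.11+ the argument can simply be dropped:

    import random

    rng = random.Random(42)     # deterministic source, like CoinSelector's self.random
    txos = list(range(10))

    # 3.7-3.10 (deprecated from 3.9): rng.shuffle(txos, rng.random)
    rng.shuffle(txos)           # 3.11+: the instance supplies its own randomness
    print(txos)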

View file

@@ -329,10 +329,10 @@ class Ledger(metaclass=LedgerRegistry):
     async def start(self):
         if not os.path.exists(self.path):
             os.mkdir(self.path)
-        await asyncio.wait(map(asyncio.create_task, [
+        await asyncio.wait([
             self.db.open(),
             self.headers.open()
-        ]))
+        ])
         fully_synced = self.on_ready.first
         asyncio.create_task(self.network.start())
         await self.network.on_connected.first

@@ -466,9 +466,9 @@ class Ledger(metaclass=LedgerRegistry):
     async def subscribe_accounts(self):
         if self.network.is_connected and self.accounts:
             log.info("Subscribe to %i accounts", len(self.accounts))
-            await asyncio.wait(map(asyncio.create_task, [
+            await asyncio.wait([
                 self.subscribe_account(a) for a in self.accounts
-            ]))
+            ])

     async def subscribe_account(self, account: Account):
         for address_manager in account.address_managers.values():

@@ -938,7 +938,9 @@ class Ledger(metaclass=LedgerRegistry):
                 "%d change addresses (gap: %d), %d channels, %d certificates and %d claims. ",
                 account.id, balance, total_receiving, account.receiving.gap, total_change,
                 account.change.gap, channel_count, len(account.channel_keys), claim_count)
-        except Exception:
+        except Exception as err:
+            if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
+                raise
             log.exception(
                 'Failed to display wallet state, please file issue '
                 'for this bug along with the traceback you see below:')

@@ -961,7 +963,9 @@ class Ledger(metaclass=LedgerRegistry):
             claim_ids = [p.purchased_claim_id for p in purchases]
             try:
                 resolved, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
-            except Exception:
+            except Exception as err:
+                if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
+                    raise
                 log.exception("Resolve failed while looking up purchased claim ids:")
                 resolved = []
             lookup = {claim.claim_id: claim for claim in resolved}

@@ -989,6 +993,12 @@ class Ledger(metaclass=LedgerRegistry):
                 results.append(resolved)
             else:
                 if isinstance(resolved, dict) and 'error' in resolved:
-                    txo.meta['error'] = resolved['error']
+                    if resolved['error'].get('name') == 'BLOCKED' and txo.is_claim and txo.claim.is_repost:
+                        txo.meta['blocked_repost'] = await self.get_claim_by_claim_id(
+                            txo.claim.repost.reference.claim_id,
+                            accounts=accounts
+                        )
+                    else:
+                        txo.meta['error'] = resolved['error']
                 results.append(txo)
         return results

@@ -1041,7 +1051,9 @@ class Ledger(metaclass=LedgerRegistry):
             claim_ids = collection.claim.collection.claims.ids[offset:page_size + offset]
             try:
                 resolve_results, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
-            except Exception:
+            except Exception as err:
+                if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
+                    raise
                 log.exception("Resolve failed while looking up collection claim ids:")
                 return []
             claims = []
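The BLOCKED branch added above is the heart of this commit: when resolve reports a blocked claim for a txo that is itself a repost, claim_list attaches the resolved original under meta['blocked_repost'] instead of a bare error. A condensed sketch of that control flow, not the full method; resolve_claim is a hypothetical stand-in for the ledger's get_claim_by_claim_id:

    async def attach_resolution(txo, resolved, resolve_claim, results):
        if isinstance(resolved, dict) and 'error' in resolved:
            blocked_repost = (
                resolved['error'].get('name') == 'BLOCKED'
                and txo.is_claim and txo.claim.is_repost
            )
            if blocked_repost:
                # surface the blocked original claim instead of only an error
                txo.meta['blocked_repost'] = await resolve_claim(
                    txo.claim.repost.reference.claim_id)
            else:
                txo.meta['error'] = resolved['error']
        results.append(txo)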

View file

@@ -117,7 +117,7 @@ class ClientSession(BaseClientSession):
                     )
                 else:
                     await asyncio.sleep(max(0, max_idle - (now - self.last_send)))
-        except (Exception, asyncio.CancelledError) as err:
+        except Exception as err:
             if isinstance(err, asyncio.CancelledError):
                 log.info("closing connection to %s:%i", *self.server)
             else:

@@ -214,7 +214,7 @@ class Network:
         def loop_task_done_callback(f):
             try:
                 f.result()
-            except (Exception, asyncio.CancelledError):
+            except Exception:
                 if self.running:
                     log.exception("wallet server connection loop crashed")

@@ -312,8 +312,7 @@ class Network:
         sleep_delay = 30
         while self.running:
             await asyncio.wait(
-                map(asyncio.create_task, [asyncio.sleep(30), self._urgent_need_reconnect.wait()]),
-                return_when=asyncio.FIRST_COMPLETED
+                [asyncio.sleep(30), self._urgent_need_reconnect.wait()], return_when=asyncio.FIRST_COMPLETED
             )
             if self._urgent_need_reconnect.is_set():
                 sleep_delay = 30

@@ -339,7 +338,7 @@ class Network:
             try:
                 if not self._urgent_need_reconnect.is_set():
                     await asyncio.wait(
-                        [self._keepalive_task, asyncio.create_task(self._urgent_need_reconnect.wait())],
+                        [self._keepalive_task, self._urgent_need_reconnect.wait()],
                         return_when=asyncio.FIRST_COMPLETED
                     )
                 else:
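The map(asyncio.create_task, ...) wrappers on the removed side exist because passing bare coroutines to asyncio.wait was deprecated in Python 3.8 and became a TypeError in 3.11; only the 3.7 target restored by this commit still accepts them. A portable sketch of the reconnect race (illustrative names), including cancelling the loser so it does not leak:

    import asyncio

    async def reconnect_or_timeout(urgent: asyncio.Event) -> bool:
        done, pending = await asyncio.wait(
            [asyncio.create_task(asyncio.sleep(30)),
             asyncio.create_task(urgent.wait())],
            return_when=asyncio.FIRST_COMPLETED,
        )
        for task in pending:     # drop whichever of the two lost the race
            task.cancel()
        return urgent.is_set()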

View file

@@ -214,7 +214,6 @@ class SPVNode:
         self.port = 50001 + node_number  # avoid conflict with default daemon
         self.udp_port = self.port
         self.elastic_notifier_port = 19080 + node_number
-        self.elastic_services = f'localhost:9200/localhost:{self.elastic_notifier_port}'
         self.session_timeout = 600
         self.stopped = True
         self.index_name = uuid4().hex

@@ -236,7 +235,7 @@ class SPVNode:
             'host': self.hostname,
             'tcp_port': self.port,
             'udp_port': self.udp_port,
-            'elastic_services': self.elastic_services,
+            'elastic_notifier_port': self.elastic_notifier_port,
             'session_timeout': self.session_timeout,
             'max_query_workers': 0,
             'es_index_prefix': self.index_name,

@@ -264,6 +263,7 @@ class SPVNode:
             await self.server.start()
         except Exception as e:
             self.stopped = True
-            log.exception("failed to start spv node")
+            if not isinstance(e, asyncio.CancelledError):
+                log.exception("failed to start spv node")
             raise e

View file

@@ -395,8 +395,8 @@ class RPCSession(SessionBase):
         namespace=NAMESPACE, labelnames=("version",)
     )

-    def __init__(self, *, framer=None, connection=None):
-        super().__init__(framer=framer)
+    def __init__(self, *, framer=None, loop=None, connection=None):
+        super().__init__(framer=framer, loop=loop)
         self.connection = connection or self.default_connection()
         self.client_version = 'unknown'

View file

@@ -139,10 +139,6 @@ class Wallet:
             'accounts': [a.to_dict(encrypt_password) for a in self.accounts]
         }

-    def to_json(self):
-        assert not self.is_locked, "Cannot serialize a wallet with locked/encrypted accounts."
-        return json.dumps(self.to_dict())
-
     def save(self):
         if self.preferences.get(ENCRYPT_ON_DISK, False):
             if self.encryption_password is not None:

@@ -169,7 +165,8 @@ class Wallet:
     def pack(self, password):
         assert not self.is_locked, "Cannot pack a wallet with locked/encrypted accounts."
-        new_data_compressed = zlib.compress(self.to_json().encode())
+        new_data = json.dumps(self.to_dict())
+        new_data_compressed = zlib.compress(new_data.encode())
         return better_aes_encrypt(password, new_data_compressed)

     @classmethod

@@ -182,8 +179,6 @@ class Wallet:
                 raise InvalidPasswordError()
             if "unknown compression method" in e.args[0].lower():
                 raise InvalidPasswordError()
-            if "invalid window size" in e.args[0].lower():
-                raise InvalidPasswordError()
             raise
         return json.loads(decompressed)

@@ -191,9 +186,6 @@ class Wallet:
                    password: str, data: str) -> (List['Account'], List['Account']):
         assert not self.is_locked, "Cannot sync apply on a locked wallet."
         added_accounts, merged_accounts = [], []
-        if password is None:
-            decrypted_data = json.loads(data)
-        else:
-            decrypted_data = self.unpack(password, data)
+        decrypted_data = self.unpack(password, data)
         self.preferences.merge(decrypted_data.get('preferences', {}))
         for account_dict in decrypted_data['accounts']:
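The removed lines are the unencrypted wallet-sync path: to_json() dumps the plain wallet dict, pack() compresses that JSON before AES-encrypting it, and sync-apply treats a missing password as "data is plain JSON". A condensed sketch of both directions, with encrypt/decrypt standing in for better_aes_encrypt and its counterpart (the real code splits this between the export command and Wallet.pack/unpack):

    import json
    import zlib

    def pack(wallet_dict, password, encrypt):
        data = json.dumps(wallet_dict)
        if password is None:
            return data                                  # plain JSON export
        return encrypt(password, zlib.compress(data.encode()))

    def apply_sync(data, password, decrypt):
        if password is None:
            return json.loads(data)                      # plain JSON import
        return json.loads(zlib.decompress(decrypt(password, data)))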

View file

@@ -28,7 +28,6 @@ disable=
     no-else-return,
     cyclic-import,
     missing-docstring,
-    consider-using-f-string,
     duplicate-code,
     expression-not-assigned,
     inconsistent-return-statements,

View file

@@ -18,7 +18,7 @@ setup(
     long_description_content_type="text/markdown",
     keywords="lbry protocol media",
     license='MIT',
-    python_requires='>=3.8',
+    python_requires='>=3.7',
     packages=find_packages(exclude=('tests',)),
     zip_safe=False,
     entry_points={

@@ -36,7 +36,7 @@ setup(
         'distro==1.4.0',
         'base58==1.0.0',
         'cffi==1.13.2',
-        'cryptography==3.4.7',
+        'cryptography==2.5',
         'protobuf==3.17.2',
         'prometheus_client==0.7.1',
         'ecdsa==0.13.3',

@@ -50,14 +50,14 @@ setup(
     ],
     extras_require={
         'lint': [
-            'pylint==2.13.9'
+            'pylint==2.10.0'
         ],
         'test': [
            'coverage',
            'jsonschema==4.4.0',
         ],
         'hub': [
-            'hub@git+https://github.com/lbryio/hub.git@929448d64bcbe6c5e476757ec78456beaa85e56a'
+            'hub@git+https://github.com/lbryio/hub.git@024aceda53fe6d1ab8d519b73584437c25de6975'
         ]
     },
     classifiers=[

View file

@@ -51,8 +51,7 @@ def mock_network_loop(loop: asyncio.AbstractEventLoop,
             return rx.datagram_received(data, from_addr)

         protocol = proto_lam()
-        transport = mock.Mock(spec=asyncio.DatagramTransport)
-        transport.get_extra_info = lambda k: {'socket': mock_sock}[k]
+        transport = asyncio.DatagramTransport(extra={'socket': mock_sock})
         transport.is_closing = lambda: False
         transport.close = lambda: mock_sock.close()
         mock_sock.sendto = sendto

@@ -61,6 +60,7 @@ def mock_network_loop(loop: asyncio.AbstractEventLoop,
         dht_network[from_addr] = protocol
         return transport, protocol

+    with mock.patch('socket.socket') as mock_socket:
         mock_sock = mock.Mock(spec=socket.socket)
         mock_sock.setsockopt = lambda *_: None
         mock_sock.bind = lambda *_: None

@@ -70,5 +70,6 @@ def mock_network_loop(loop: asyncio.AbstractEventLoop,
         mock_sock.close = lambda: None
         mock_sock.type = socket.SOCK_DGRAM
         mock_sock.fileno = lambda: 7
+        mock_socket.return_value = mock_sock
         loop.create_datagram_endpoint = create_datagram_endpoint
         yield
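The restored "with mock.patch('socket.socket')" context makes any socket constructed by code under test come back as the fake, and pairing it with Mock(spec=socket.socket) keeps attribute access honest. A minimal illustration of that patching pattern:

    import socket
    from unittest import mock

    with mock.patch('socket.socket') as mock_socket:
        fake = mock.Mock(spec=socket.socket)   # spec keeps attribute typos loud
        fake.fileno = lambda: 7
        mock_socket.return_value = fake
        assert socket.socket() is fake         # code under test receives the fake
        assert socket.socket().fileno() == 7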

View file

@@ -491,21 +491,3 @@ class WalletEncryptionAndSynchronization(CommandTestCase):
             daemon2.wallet_manager.default_account.channel_keys,
             daemon.wallet_manager.default_wallet.accounts[1].channel_keys
         )
-
-        # test without passwords
-        data = await daemon2.jsonrpc_wallet_export()
-        json_data = json.loads(data)
-        self.assertEqual(json_data["name"], "Wallet")
-        self.assertNotIn("four", json_data["preferences"])
-        json_data["preferences"]["four"] = {"value": 4, "ts": 0}
-        await daemon.jsonrpc_wallet_import(data=json.dumps(json_data), blocking=True)
-        self.assertEqual(daemon.jsonrpc_preference_get("four"), {"four": 4})
-
-        # if password is empty string, export is encrypted
-        data = await daemon2.jsonrpc_wallet_export(password="")
-        self.assertNotEqual(data[0], "{")
-
-        # if password is empty string, import is decrypted
-        await daemon.jsonrpc_wallet_import(data, password="")

View file

@@ -31,7 +31,7 @@ STREAM_TYPES = {
 def verify(channel, data, signature, channel_hash=None):
     pieces = [
-        signature['salt'].encode(),
+        signature['signing_ts'].encode(),
         channel_hash or channel.claim_hash,
         data
     ]

@@ -1239,13 +1239,8 @@ class ChannelCommands(CommandTestCase):
         channel = channel_tx.outputs[0]
         signature1 = await self.out(self.daemon.jsonrpc_channel_sign(channel_name='@signer', hexdata=data_to_sign))
         signature2 = await self.out(self.daemon.jsonrpc_channel_sign(channel_id=channel.claim_id, hexdata=data_to_sign))
-        signature3 = await self.out(self.daemon.jsonrpc_channel_sign(channel_id=channel.claim_id, hexdata=data_to_sign, salt='beef'))
-        signature4 = await self.out(self.daemon.jsonrpc_channel_sign(channel_id=channel.claim_id, hexdata=data_to_sign, salt='beef'))
-        self.assertNotEqual(signature2, signature3)
-        self.assertEqual(signature3, signature4)
         self.assertTrue(verify(channel, unhexlify(data_to_sign), signature1))
         self.assertTrue(verify(channel, unhexlify(data_to_sign), signature2))
-        self.assertTrue(verify(channel, unhexlify(data_to_sign), signature3))
         signature3 = await self.out(self.daemon.jsonrpc_channel_sign(channel_id=channel.claim_id, hexdata=99))
         self.assertTrue(verify(channel, unhexlify('99'), signature3))

@@ -1605,13 +1600,6 @@ class StreamCommands(ClaimTestCase):
         self.assertTrue(error['text'].startswith(f"Resolve of 'lbry://@some_channel#{some_channel_id[:1]}/bad_content#{bad_content_id[:1]}' was blocked"))
         self.assertTrue(error['censor']['short_url'].startswith('lbry://@blocking#'))

-        # local claim list still finds local reposted content that's blocked
-        claims = await self.claim_list(reposted_claim_id=bad_content_id)
-        self.assertEqual(claims[0]['name'], 'block1')
-        self.assertEqual(claims[0]['value']['claim_id'], bad_content_id)
-        self.assertEqual(claims[1]['name'], 'filter1')
-        self.assertEqual(claims[1]['value']['claim_id'], bad_content_id)
-
         # a filtered/blocked channel impacts all content inside it
         bad_channel_id = self.get_claim_id(
             await self.channel_create('@bad_channel', '0.1', tags=['bad-stuff'])

@@ -1655,7 +1643,9 @@ class StreamCommands(ClaimTestCase):
         # block channel
         self.assertEqual(0, len(self.conductor.spv_node.server.db.blocked_channels))
-        await self.stream_repost(bad_channel_id, 'block2', '0.1', channel_name='@blocking')
+        blocked_repost_id = self.get_claim_id(
+            await self.stream_repost(bad_channel_id, 'block2', '0.1', channel_name='@blocking')
+        )
         self.assertEqual(1, len(self.conductor.spv_node.server.db.blocked_channels))

         # channel, claim in channel or claim individually no longer resolve

@@ -1668,6 +1658,10 @@ class StreamCommands(ClaimTestCase):
         self.assertEqual((await self.resolve('lbry://worse_content'))['error']['name'], 'BLOCKED')
         self.assertEqual((await self.resolve('lbry://@bad_channel/worse_content'))['error']['name'], 'BLOCKED')

+        # blocked claim should still show in local claim list
+        claims = await self.claim_list(claim_id=blocked_repost_id, resolve=True)
+        self.assertIn('blocked_repost', claims[0]['meta'])
+
     async def test_publish_updates_file_list(self):
         tx = await self.stream_create(title='created')
         txo = tx['outputs'][0]

View file

@@ -89,21 +89,6 @@ class FileCommands(CommandTestCase):
         await self.reflector.blob_manager.delete_blobs(all_except_sd)
         self.assertEqual(all_except_sd, await self.daemon.jsonrpc_file_reflect(sd_hash=sd_hash))

-    async def test_sd_blob_fields_fallback(self):
-        claim_id = self.get_claim_id(await self.stream_create('foo', '0.01', suffix='.txt'))
-        stream = (await self.daemon.jsonrpc_file_list())["items"][0]
-        stream.descriptor.suggested_file_name = ' '
-        stream.descriptor.stream_name = ' '
-        stream.descriptor.stream_hash = stream.descriptor.get_stream_hash()
-        sd_hash = stream.descriptor.sd_hash = stream.descriptor.calculate_sd_hash()
-        await stream.descriptor.make_sd_blob()
-        await self.daemon.jsonrpc_file_delete(claim_name='foo')
-        await self.stream_update(claim_id=claim_id, sd_hash=sd_hash)
-        file_dict = await self.out(self.daemon.jsonrpc_get('lbry://foo', save_file=True))
-        self.assertEqual(file_dict['suggested_file_name'], stream.file_name)
-        self.assertEqual(file_dict['stream_name'], stream.file_name)
-        self.assertEqual(file_dict['mime_type'], 'text/plain')
-
     async def test_file_management(self):
         await self.stream_create('foo', '0.01')
         await self.stream_create('foo2', '0.01')

@@ -354,7 +339,7 @@ class FileCommands(CommandTestCase):
         await self.daemon.jsonrpc_get('lbry://foo')
         with open(original_path, 'wb') as handle:
             handle.write(b'some other stuff was there instead')
-        await self.daemon.file_manager.stop()
+        self.daemon.file_manager.stop()
         await self.daemon.file_manager.start()
         await asyncio.wait_for(self.wait_files_to_complete(), timeout=5)  # if this hangs, file didn't get set completed
         # check that internal state got through up to the file list API

@@ -382,7 +367,8 @@ class FileCommands(CommandTestCase):
         resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2))
         self.assertNotIn('error', resp)
         self.assertTrue(os.path.isfile(path))
-        await self.daemon.file_manager.stop()
+        self.daemon.file_manager.stop()
+        await asyncio.sleep(0.01, loop=self.loop)  # FIXME: this sleep should not be needed
         self.assertFalse(os.path.isfile(path))

     async def test_incomplete_downloads_retry(self):

@@ -477,7 +463,7 @@ class FileCommands(CommandTestCase):
         # restart the daemon and make sure the fee is still there
-        await self.daemon.file_manager.stop()
+        self.daemon.file_manager.stop()
         await self.daemon.file_manager.start()
         self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
         self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee)

View file

@@ -3,9 +3,7 @@ import hashlib
 import aiohttp
 import aiohttp.web
 import asyncio
-import contextlib

-from lbry.file.source import ManagedDownloadSource
 from lbry.utils import aiohttp_request
 from lbry.blob.blob_file import MAX_BLOB_SIZE
 from lbry.testcase import CommandTestCase

@@ -23,7 +21,7 @@ def get_random_bytes(n: int) -> bytes:
 class RangeRequests(CommandTestCase):
     async def _restart_stream_manager(self):
-        await self.daemon.file_manager.stop()
+        self.daemon.file_manager.stop()
         await self.daemon.file_manager.start()
         return

@@ -354,20 +352,13 @@ class RangeRequests(CommandTestCase):
         path = stream.full_path
         self.assertIsNotNone(path)
         if wait_for_start_writing:
-            with contextlib.suppress(asyncio.CancelledError):
-                await stream.started_writing.wait()
+            await stream.started_writing.wait()
         self.assertTrue(os.path.isfile(path))

-        await self.daemon.file_manager.stop()
-        # while stopped, we get no response to query and no file is present
-        self.assertEqual((await self.daemon.jsonrpc_file_list())['items'], [])
-        self.assertEqual(os.path.isfile(path), stream.status == ManagedDownloadSource.STATUS_FINISHED)
-        await self.daemon.file_manager.start()
-        # after restart, we get a response to query and same file path
+        await self._restart_stream_manager()
         stream = (await self.daemon.jsonrpc_file_list())['items'][0]
         self.assertIsNotNone(stream.full_path)
-        self.assertEqual(stream.full_path, path)
+        self.assertFalse(os.path.isfile(path))
         if wait_for_start_writing:
-            with contextlib.suppress(asyncio.CancelledError):
-                await stream.started_writing.wait()
+            await stream.started_writing.wait()
         self.assertTrue(os.path.isfile(path))

@@ -423,6 +414,6 @@ class RangeRequestsLRUCache(CommandTestCase):
         # running with cache size 0 gets through without errors without
         # this since the server doesn't stop immediately
-        await asyncio.sleep(1)
+        await asyncio.sleep(1, loop=self.loop)
         await self._request_stream()

View file

@@ -1508,27 +1508,27 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
         COIN = int(1E8)
         self.assertEqual(self.conductor.spv_node.writer.height, 207)
-        self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put(
+        self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put(
             (208, bytes.fromhex(claim_id1)), (0, 10 * COIN)
         )
         await self.generate(1)
         self.assertEqual(self.conductor.spv_node.writer.height, 208)
         self.assertEqual(1.7090807854206793, await get_trending_score(claim_id1))
-        self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put(
+        self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put(
             (209, bytes.fromhex(claim_id1)), (10 * COIN, 100 * COIN)
         )
         await self.generate(1)
         self.assertEqual(self.conductor.spv_node.writer.height, 209)
         self.assertEqual(2.2437974397778886, await get_trending_score(claim_id1))
-        self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put(
+        self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put(
             (309, bytes.fromhex(claim_id1)), (100 * COIN, 1000000 * COIN)
         )
         await self.generate(100)
         self.assertEqual(self.conductor.spv_node.writer.height, 309)
         self.assertEqual(5.157053472135866, await get_trending_score(claim_id1))
-        self.conductor.spv_node.writer.db.prefix_db.trending_notification.stash_put(
+        self.conductor.spv_node.writer.db.prefix_db.trending_notification.stage_put(
             (409, bytes.fromhex(claim_id1)), (1000000 * COIN, 1 * COIN)
         )
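`stash_put` versus `stage_put` is a rename of the same prefix-DB operation; either name suggests the write is buffered and applied later as part of a batch rather than hitting storage immediately. The actual lbry-hub API is not shown in this diff, so the following is only an illustrative stage/commit sketch with hypothetical names:

    # Hypothetical stage/commit pattern, not the real prefix_db API.
    class StagedDB:
        def __init__(self):
            self._data = {}
            self._staged = []

        def stage_put(self, key, value):
            self._staged.append((key, value))  # buffered; nothing written yet

        def commit(self):
            for key, value in self._staged:    # applied together, in order
                self._data[key] = value
            self._staged.clear()

    db = StagedDB()
    db.stage_put((208, b"claim"), (0, 10))
    assert (208, b"claim") not in db._data     # still pending
    db.commit()
    assert db._data[(208, b"claim")] == (0, 10)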
View file
@@ -2,7 +2,7 @@ import asyncio
 import unittest
 from lbry.testcase import CommandTestCase
-from lbry.wallet import Transaction

 class TransactionCommandsTestCase(CommandTestCase):
@@ -29,42 +29,17 @@ class TransactionCommandsTestCase(CommandTestCase):
         # someone's tx
         change_address = await self.blockchain.get_raw_change_address()
         sendtxid = await self.blockchain.send_to_address(change_address, 10)
-        # After a few tries, Hub should have the transaction (in mempool).
-        for i in range(5):
-            tx = await self.daemon.jsonrpc_transaction_show(sendtxid)
-            # Retry if Hub is not aware of the transaction.
-            if isinstance(tx, dict):
-                # Fields: 'success', 'code', 'message'
-                self.assertFalse(tx['success'], tx)
-                self.assertEqual(tx['code'], 404, tx)
-                self.assertEqual(tx['message'], "transaction not found", tx)
-                await asyncio.sleep(0.1)
-                continue
-            break
-        # verify transaction show (in mempool)
-        self.assertTrue(isinstance(tx, Transaction), str(tx))
-        # Fields: 'txid', 'raw', 'height', 'position', 'is_verified', and more.
-        self.assertEqual(tx.id, sendtxid, vars(tx))
-        self.assertEqual(tx.height, -1, vars(tx))
-        self.assertEqual(tx.is_verified, False, vars(tx))
-        # transaction is confirmed and leaves mempool
+        await asyncio.sleep(0.2)
+        tx = await self.daemon.jsonrpc_transaction_show(sendtxid)
+        self.assertEqual(tx.id, sendtxid)
+        self.assertEqual(tx.height, -1)
         await self.generate(1)
-        # verify transaction show
         tx = await self.daemon.jsonrpc_transaction_show(sendtxid)
-        self.assertTrue(isinstance(tx, Transaction), str(tx))
-        self.assertEqual(tx.id, sendtxid, vars(tx))
-        self.assertEqual(tx.height, self.ledger.headers.height, vars(tx))
-        self.assertEqual(tx.is_verified, True, vars(tx))
+        self.assertEqual(tx.height, self.ledger.headers.height)
         # inexistent
         result = await self.daemon.jsonrpc_transaction_show('0'*64)
-        self.assertTrue(isinstance(result, dict), result)
-        # Fields: 'success', 'code', 'message'
-        self.assertFalse(result['success'], result)
-        self.assertEqual(result['code'], 404, result)
-        self.assertEqual(result['message'], "transaction not found", result)
+        self.assertFalse(result['success'])

     async def test_utxo_release(self):
         await self.send_to_address_and_wait(
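The `-` side replaces a fixed `asyncio.sleep(0.2)` with a bounded retry loop: it polls `transaction_show` until the Hub reports the transaction (a `dict` with `success`/`code`/`message` means "not found yet"), which is both faster on average and less flaky than guessing a delay. The pattern, generically (`fetch` is a hypothetical async callable, not the daemon API):

    import asyncio

    async def wait_for_result(fetch, attempts=5, delay=0.1):
        for _ in range(attempts):
            result = await fetch()
            if result is not None:      # got it; stop polling early
                return result
            await asyncio.sleep(delay)  # brief pause, then retry
        raise TimeoutError("result never appeared")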
View file
@@ -36,7 +36,7 @@ class TestBlob(AsyncioTestCase):
         writer.write(self.blob_bytes)
         await blob.verified.wait()
         self.assertTrue(blob.get_is_verified())
-        await asyncio.sleep(0)  # wait for the db save task
+        await asyncio.sleep(0, loop=self.loop)  # wait for the db save task
         return blob

     async def _test_close_writers_on_finished(self, blob_class=AbstractBlob, blob_directory=None):
@@ -48,7 +48,7 @@ class TestBlob(AsyncioTestCase):
         with self.assertRaises(InvalidDataError):
             writers[1].write(self.blob_bytes * 2)
             await writers[1].finished
-        await asyncio.sleep(0)
+        await asyncio.sleep(0, loop=self.loop)
         self.assertEqual(4, len(blob.writers))

         # write the blob
@@ -208,7 +208,7 @@ class TestBlob(AsyncioTestCase):
         async def read_blob_buffer():
             with reader as read_handle:
                 self.assertEqual(1, len(blob.readers))
-                await asyncio.sleep(2)
+                await asyncio.sleep(2, loop=self.loop)
             self.assertEqual(0, len(blob.readers))
             return read_handle.read()
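The bare `await asyncio.sleep(0)` in these tests is a pure yield point: it suspends the coroutine for exactly one event-loop iteration so that already-scheduled work (here, the db save task) gets a chance to run. A minimal demonstration:

    import asyncio

    async def main():
        flag = []
        asyncio.get_running_loop().call_soon(flag.append, "ran")
        assert flag == []        # scheduled, but the loop hasn't run it yet
        await asyncio.sleep(0)   # yield one loop iteration
        assert flag == ["ran"]   # now it has run

    asyncio.run(main())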
View file
@@ -183,7 +183,7 @@ class TestBlobExchange(BlobExchangeTestBase):
                 writer.write(mock_blob_bytes)
             return self.loop.create_task(_inner())

-        await asyncio.gather(write_task(writer1), write_task(writer2))
+        await asyncio.gather(write_task(writer1), write_task(writer2), loop=self.loop)

         self.assertDictEqual({1: mock_blob_bytes, 2: mock_blob_bytes}, results)
         self.assertEqual(1, write_called_count)
@@ -239,8 +239,7 @@ class TestBlobExchange(BlobExchangeTestBase):
     async def test_server_chunked_request(self):
         blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed"
         server_protocol = BlobServerProtocol(self.loop, self.server_blob_manager, self.server.lbrycrd_address)
-        transport = mock.Mock(spec=asyncio.Transport)
-        transport.get_extra_info = lambda k: {'peername': ('ip', 90)}[k]
+        transport = asyncio.Transport(extra={'peername': ('ip', 90)})
         received_data = BytesIO()
         transport.is_closing = lambda: received_data.closed
         transport.write = received_data.write
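The `-` side swaps a directly instantiated `asyncio.Transport` for `mock.Mock(spec=asyncio.Transport)`. A spec'd mock does not depend on the stdlib class being usefully instantiable (its concrete methods are placeholders), rejects misspelled attribute access, and still lets the test stub just the methods it needs, e.g.:

    import asyncio
    from unittest import mock

    transport = mock.Mock(spec=asyncio.Transport)
    transport.write(b"data")         # fine: write() is a real Transport method
    try:
        transport.wrte(b"data")      # typo: spec'd mocks raise AttributeError
    except AttributeError:
        pass
    # individual methods can still be replaced with test doubles:
    transport.get_extra_info = lambda key: {'peername': ('ip', 90)}[key]
    assert transport.get_extra_info('peername') == ('ip', 90)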
@@ -270,7 +269,7 @@ class TestBlobExchange(BlobExchangeTestBase):
         client_blob.delete()

         # wait for less than the idle timeout
-        await asyncio.sleep(0.5)
+        await asyncio.sleep(0.5, loop=self.loop)

         # download the blob again
         downloaded, protocol2 = await request_blob(self.loop, client_blob, self.server_from_client.address,
@@ -284,10 +283,10 @@ class TestBlobExchange(BlobExchangeTestBase):
         client_blob.delete()

         # check that the connection times out from the server side
-        await asyncio.sleep(0.9)
+        await asyncio.sleep(0.9, loop=self.loop)
         self.assertFalse(protocol.transport.is_closing())
         self.assertIsNotNone(protocol.transport._sock)
-        await asyncio.sleep(0.1)
+        await asyncio.sleep(0.1, loop=self.loop)
         self.assertIsNone(protocol.transport)

     def test_max_request_size(self):
@@ -323,7 +322,7 @@ class TestBlobExchange(BlobExchangeTestBase):
         server_blob = self.server_blob_manager.get_blob(blob_hash)

         async def sendfile(writer):
-            await asyncio.sleep(2)
+            await asyncio.sleep(2, loop=self.loop)
             return 0

         server_blob.sendfile = sendfile
@@ -347,7 +346,7 @@ class TestBlobExchange(BlobExchangeTestBase):
         def _mock_accumulate_peers(q1, q2=None):
             async def _task():
                 pass
-            q2 = q2 or asyncio.Queue()
+            q2 = q2 or asyncio.Queue(loop=self.loop)
             return q2, self.loop.create_task(_task())

         mock_node.accumulate_peers = _mock_accumulate_peers
View file
@@ -72,14 +72,14 @@ class CacheConcurrentDecoratorTests(AsyncioTestCase):
     @utils.cache_concurrent
     async def foo(self, arg1, arg2=None, delay=1):
         self.called.append((arg1, arg2, delay))
-        await asyncio.sleep(delay)
+        await asyncio.sleep(delay, loop=self.loop)
         self.counter += 1
         self.finished.append((arg1, arg2, delay))
         return object()

     async def test_gather_duplicates(self):
         result = await asyncio.gather(
-            self.loop.create_task(self.foo(1)), self.loop.create_task(self.foo(1))
+            self.loop.create_task(self.foo(1)), self.loop.create_task(self.foo(1)), loop=self.loop
         )
         self.assertEqual(1, len(self.called))
         self.assertEqual(1, len(self.finished))
@@ -93,7 +93,7 @@ class CacheConcurrentDecoratorTests(AsyncioTestCase):
         with self.assertRaises(asyncio.CancelledError):
             await asyncio.gather(
-                t1, self.loop.create_task(self.foo(1))
+                t1, self.loop.create_task(self.foo(1)), loop=self.loop
             )
         self.assertEqual(1, len(self.called))
         self.assertEqual(0, len(self.finished))
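`utils.cache_concurrent` evidently deduplicates concurrent calls with identical arguments: while one `foo(1)` is in flight, a second `foo(1)` awaits the same pending result, so `called` ends up with length 1 in the gather test above, and cancelling the shared task raises `CancelledError` in every waiter. A hedged sketch of such a decorator (not lbry's actual implementation):

    import asyncio
    import functools

    def cache_concurrent(fn):
        in_flight = {}

        @functools.wraps(fn)
        async def wrapper(*args):
            if args not in in_flight:
                in_flight[args] = asyncio.ensure_future(fn(*args))
                # forget the entry once it finishes, so later calls run again
                in_flight[args].add_done_callback(lambda _, k=args: in_flight.pop(k))
            return await in_flight[args]

        return wrapper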
View file
@@ -128,7 +128,7 @@ class TestBlobAnnouncer(AsyncioTestCase):
         await self.chain_peer(constants.generate_id(current + 4), '1.2.3.13')
         last = await self.chain_peer(constants.generate_id(current + 5), '1.2.3.14')
-        search_q, peer_q = asyncio.Queue(), asyncio.Queue()
+        search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop)
         search_q.put_nowait(blob1)
         _, task = last.accumulate_peers(search_q, peer_q)
View file
@@ -2,6 +2,7 @@ import unittest
 from unittest import mock
 import json

+from lbry.conf import Config
 from lbry.extras.daemon.storage import SQLiteStorage
 from lbry.extras.daemon.componentmanager import ComponentManager
 from lbry.extras.daemon.components import DATABASE_COMPONENT, DHT_COMPONENT, WALLET_COMPONENT
@@ -10,7 +11,6 @@ from lbry.extras.daemon.components import UPNP_COMPONENT, BLOB_COMPONENT
 from lbry.extras.daemon.components import PEER_PROTOCOL_SERVER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT
 from lbry.extras.daemon.daemon import Daemon as LBRYDaemon
 from lbry.wallet import WalletManager, Wallet
-from lbry.conf import Config

 from tests import test_utils
 # from tests.mocks import mock_conf_settings, FakeNetwork, FakeFileManager
View file
@@ -8,8 +8,6 @@ from lbry.blob_exchange.serialization import BlobResponse
 from lbry.blob_exchange.server import BlobServerProtocol
 from lbry.dht.node import Node
 from lbry.dht.peer import make_kademlia_peer
-from lbry.extras.daemon.storage import StoredContentClaim
-from lbry.schema import Claim
 from lbry.stream.managed_stream import ManagedStream
 from lbry.stream.descriptor import StreamDescriptor
 from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase
@@ -25,10 +23,7 @@ class TestManagedStream(BlobExchangeTestBase):
         with open(file_path, 'wb') as f:
             f.write(self.stream_bytes)
         descriptor = await StreamDescriptor.create_stream(self.loop, self.server_blob_manager.blob_dir, file_path)
-        descriptor.suggested_file_name = file_name
-        descriptor.stream_hash = descriptor.get_stream_hash()
-        self.sd_hash = descriptor.sd_hash = descriptor.calculate_sd_hash()
-        await descriptor.make_sd_blob()
+        self.sd_hash = descriptor.calculate_sd_hash()
         return descriptor

     async def setup_stream(self, blob_count: int = 10):
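The `-` side mutates the descriptor after creating it, then recomputes `sd_hash` and rewrites the sd blob. That bookkeeping is unavoidable because the sd blob is content-addressed: its hash is a digest of the serialized descriptor, so changing any field (such as `suggested_file_name`) invalidates the old hash. Illustrative only, not the real `StreamDescriptor` serialization:

    import hashlib
    import json

    descriptor = {"suggested_file_name": "tt_f", "stream_hash": "ab" * 48}
    serialized = json.dumps(descriptor, sort_keys=True).encode()
    sd_hash = hashlib.sha384(serialized).hexdigest()

    descriptor["suggested_file_name"] = "cool.mp4"   # any field change...
    reserialized = json.dumps(descriptor, sort_keys=True).encode()
    assert hashlib.sha384(reserialized).hexdigest() != sd_hash  # ...new blob id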
@@ -52,21 +47,6 @@ class TestManagedStream(BlobExchangeTestBase):
         self.assertEqual(self.stream.full_path, os.path.join(self.client_dir, 'tt_f'))
         self.assertTrue(os.path.isfile(os.path.join(self.client_dir, 'tt_f')))

-    async def test_empty_name_fallback(self):
-        descriptor = await self.create_stream(file_name=" ")
-        descriptor.suggested_file_name = " "
-        claim = Claim()
-        claim.stream.source.name = "cool.mp4"
-        self.stream = ManagedStream(
-            self.loop, self.client_config, self.client_blob_manager, self.sd_hash, self.client_dir,
-            claim=StoredContentClaim(serialized=claim.to_bytes().hex())
-        )
-        await self._test_transfer_stream(10, skip_setup=True)
-        self.assertTrue(self.stream.completed)
-        self.assertEqual(self.stream.suggested_file_name, "cool.mp4")
-        self.assertEqual(self.stream.stream_name, "cool.mp4")
-        self.assertEqual(self.stream.mime_type, "video/mp4")
-
     async def test_status_file_completed(self):
         await self._test_transfer_stream(10)
         self.assertTrue(self.stream.output_file_exists)
@@ -109,9 +89,9 @@ class TestManagedStream(BlobExchangeTestBase):
         await self._test_transfer_stream(10, stop_when_done=False)
         self.assertEqual(self.stream.status, "finished")
         self.assertTrue(self.stream._running.is_set())
-        await asyncio.sleep(0.5)
+        await asyncio.sleep(0.5, loop=self.loop)
         self.assertTrue(self.stream._running.is_set())
-        await asyncio.sleep(2)
+        await asyncio.sleep(2, loop=self.loop)
         self.assertEqual(self.stream.status, "finished")
         self.assertFalse(self.stream._running.is_set())
View file
@@ -86,13 +86,13 @@ class TestReflector(AsyncioTestCase):
         self.assertListEqual(sent, [])

     async def test_reflect_stream(self):
-        return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=50), 3)
+        return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=50), 3, loop=self.loop)

     async def test_reflect_stream_but_reflector_changes_its_mind(self):
-        return await asyncio.wait_for(self._test_reflect_stream(partial_needs=True), 3)
+        return await asyncio.wait_for(self._test_reflect_stream(partial_needs=True), 3, loop=self.loop)

     async def test_reflect_stream_small_response_chunks(self):
-        return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=30), 3)
+        return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=30), 3, loop=self.loop)

     async def test_announces(self):
         to_announce = await self.storage.get_blobs_to_announce()
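Each reflector test is capped at three seconds via `asyncio.wait_for`, which cancels the wrapped coroutine and raises `asyncio.TimeoutError` once the deadline passes; the trailing `loop=` is the same removed parameter noted earlier. For example:

    import asyncio

    async def main():
        try:
            await asyncio.wait_for(asyncio.sleep(10), timeout=0.01)
        except asyncio.TimeoutError:
            print("timed out; the inner sleep was cancelled")

    asyncio.run(main())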
View file
@@ -174,7 +174,7 @@ class TestStreamManager(BlobExchangeTestBase):
             await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
         else:
             await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
-        await asyncio.sleep(0)
+        await asyncio.sleep(0, loop=self.loop)
         self.assertTrue(checked_analytics_event)

     async def test_time_to_first_bytes(self):
@@ -317,7 +317,7 @@ class TestStreamManager(BlobExchangeTestBase):
         stream.downloader.node = self.stream_manager.node
         await stream.save_file()
         await stream.finished_writing.wait()
-        await asyncio.sleep(0)
+        await asyncio.sleep(0, loop=self.loop)
         self.assertTrue(stream.finished)
         self.assertFalse(stream.running)
         self.assertTrue(os.path.isfile(os.path.join(self.client_dir, "test_file")))
@@ -355,7 +355,7 @@ class TestStreamManager(BlobExchangeTestBase):
         self.stream_manager.analytics_manager._post = check_post

         await self._test_download_error_on_start(expected_error, timeout)
-        await asyncio.sleep(0)
+        await asyncio.sleep(0, loop=self.loop)
         self.assertListEqual([expected_error.__name__], received)

     async def test_insufficient_funds(self):
@@ -424,7 +424,7 @@ class TestStreamManager(BlobExchangeTestBase):
         self.assertIsNone(stream.full_path)
         self.assertEqual(0, stream.written_bytes)

-        await self.stream_manager.stop()
+        self.stream_manager.stop()
         await self.stream_manager.start()
         self.assertEqual(1, len(self.stream_manager.streams))
         stream = list(self.stream_manager.streams.values())[0]
@@ -448,8 +448,8 @@ class TestStreamManager(BlobExchangeTestBase):
         self.assertDictEqual(self.stream_manager.streams, {})
         stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
         await stream.finished_writing.wait()
-        await asyncio.sleep(0)
-        await self.stream_manager.stop()
+        await asyncio.sleep(0, loop=self.loop)
+        self.stream_manager.stop()
         self.client_blob_manager.stop()
         # partial removal, only sd blob is missing.
         # in this case, we recover the sd blob while the other blobs are kept untouched as 'finished'
View file
@@ -470,7 +470,7 @@ class TestUpgrade(AsyncioTestCase):

 class TestSQLiteRace(AsyncioTestCase):
-    max_misuse_attempts = 120000
+    max_misuse_attempts = 80000

     def setup_db(self):
         self.db = sqlite3.connect(":memory:", isolation_level=None)
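`isolation_level=None` puts the `sqlite3` connection into autocommit mode: every statement commits as it executes and the module never opens implicit transactions, which is the behavior a connection-misuse race test wants. For example:

    import sqlite3

    db = sqlite3.connect(":memory:", isolation_level=None)  # autocommit mode
    db.execute("create table t (x integer)")
    db.execute("insert into t values (1)")  # committed immediately
    print(db.in_transaction)                # False: no implicit transaction open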