Compare commits
4 commits
Author | SHA1 | Date | |
---|---|---|---|
|
6deeb67585 | ||
|
c53e76011f | ||
|
87c30c5b05 | ||
|
359e36a5b8 |
31 changed files with 288 additions and 137 deletions
64
.github/workflows/main.yml
vendored
64
.github/workflows/main.yml
vendored
|
@ -1,18 +1,18 @@
|
|||
name: ci
|
||||
on: ["push", "pull_request", "workflow_dispatch"]
|
||||
on: ["push", "pull_request"]
|
||||
|
||||
jobs:
|
||||
|
||||
lint:
|
||||
name: lint
|
||||
runs-on: ubuntu-20.04
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/setup-python@v4
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/setup-python@v1
|
||||
with:
|
||||
python-version: '3.9'
|
||||
python-version: '3.7'
|
||||
- name: extract pip cache
|
||||
uses: actions/cache@v3
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ~/.cache/pip
|
||||
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
|
||||
|
@ -26,26 +26,26 @@ jobs:
|
|||
strategy:
|
||||
matrix:
|
||||
os:
|
||||
- ubuntu-20.04
|
||||
- ubuntu-latest
|
||||
- macos-latest
|
||||
- windows-latest
|
||||
runs-on: ${{ matrix.os }}
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/setup-python@v4
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/setup-python@v1
|
||||
with:
|
||||
python-version: '3.9'
|
||||
python-version: '3.7'
|
||||
- name: set pip cache dir
|
||||
shell: bash
|
||||
run: echo "PIP_CACHE_DIR=$(pip cache dir)" >> $GITHUB_ENV
|
||||
id: pip-cache
|
||||
run: echo "::set-output name=dir::$(pip cache dir)"
|
||||
- name: extract pip cache
|
||||
uses: actions/cache@v3
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ env.PIP_CACHE_DIR }}
|
||||
path: ${{ steps.pip-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
|
||||
restore-keys: ${{ runner.os }}-pip-
|
||||
- id: os-name
|
||||
uses: ASzc/change-string-case-action@v5
|
||||
uses: ASzc/change-string-case-action@v1
|
||||
with:
|
||||
string: ${{ runner.os }}
|
||||
- run: python -m pip install --user --upgrade pip wheel
|
||||
|
@ -72,7 +72,7 @@ jobs:
|
|||
|
||||
tests-integration:
|
||||
name: "tests / integration"
|
||||
runs-on: ubuntu-20.04
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
test:
|
||||
|
@ -93,16 +93,16 @@ jobs:
|
|||
uses: elastic/elastic-github-actions/elasticsearch@master
|
||||
with:
|
||||
stack-version: 7.12.1
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/setup-python@v4
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/setup-python@v1
|
||||
with:
|
||||
python-version: '3.9'
|
||||
python-version: '3.7'
|
||||
- if: matrix.test == 'other'
|
||||
run: |
|
||||
sudo apt-get update
|
||||
sudo apt-get install -y --no-install-recommends ffmpeg
|
||||
- name: extract pip cache
|
||||
uses: actions/cache@v3
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ./.tox
|
||||
key: tox-integration-${{ matrix.test }}-${{ hashFiles('setup.py') }}
|
||||
|
@ -123,7 +123,7 @@ jobs:
|
|||
|
||||
coverage:
|
||||
needs: ["tests-unit", "tests-integration"]
|
||||
runs-on: ubuntu-20.04
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: finalize coverage report submission
|
||||
env:
|
||||
|
@ -138,26 +138,26 @@ jobs:
|
|||
strategy:
|
||||
matrix:
|
||||
os:
|
||||
- ubuntu-20.04
|
||||
- ubuntu-18.04
|
||||
- macos-latest
|
||||
- windows-latest
|
||||
runs-on: ${{ matrix.os }}
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/setup-python@v4
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/setup-python@v1
|
||||
with:
|
||||
python-version: '3.9'
|
||||
python-version: '3.7'
|
||||
- id: os-name
|
||||
uses: ASzc/change-string-case-action@v5
|
||||
uses: ASzc/change-string-case-action@v1
|
||||
with:
|
||||
string: ${{ runner.os }}
|
||||
- name: set pip cache dir
|
||||
shell: bash
|
||||
run: echo "PIP_CACHE_DIR=$(pip cache dir)" >> $GITHUB_ENV
|
||||
id: pip-cache
|
||||
run: echo "::set-output name=dir::$(pip cache dir)"
|
||||
- name: extract pip cache
|
||||
uses: actions/cache@v3
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ env.PIP_CACHE_DIR }}
|
||||
path: ${{ steps.pip-cache.outputs.dir }}
|
||||
key: ${{ runner.os }}-pip-${{ hashFiles('setup.py') }}
|
||||
restore-keys: ${{ runner.os }}-pip-
|
||||
- run: pip install pyinstaller==4.6
|
||||
|
@ -175,7 +175,7 @@ jobs:
|
|||
pip install pywin32==301
|
||||
pyinstaller --additional-hooks-dir=scripts/. --icon=icons/lbry256.ico --onefile --name lbrynet lbry/extras/cli.py
|
||||
dist/lbrynet.exe --version
|
||||
- uses: actions/upload-artifact@v3
|
||||
- uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: lbrynet-${{ steps.os-name.outputs.lowercase }}
|
||||
path: dist/
|
||||
|
@ -184,7 +184,7 @@ jobs:
|
|||
name: "release"
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
needs: ["build"]
|
||||
runs-on: ubuntu-20.04
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v1
|
||||
- uses: actions/download-artifact@v2
|
||||
|
|
2
.github/workflows/release.yml
vendored
2
.github/workflows/release.yml
vendored
|
@ -7,7 +7,7 @@ on:
|
|||
jobs:
|
||||
release:
|
||||
name: "slack notification"
|
||||
runs-on: ubuntu-20.04
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: LoveToKnow/slackify-markdown-action@v1.0.0
|
||||
id: markdown
|
||||
|
|
2
Makefile
2
Makefile
|
@ -23,4 +23,4 @@ idea:
|
|||
cp -r scripts/idea/* .idea
|
||||
|
||||
elastic-docker:
|
||||
docker run -d -v lbryhub:/usr/share/elasticsearch/data -p 9200:9200 -p 9300:9300 -e"ES_JAVA_OPTS=-Xms512m -Xmx512m" -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.12.1
|
||||
docker run -d --env network.publish_host=127.0.0.1 -v lbryhub:/usr/share/elasticsearch/data -p 9200:9200 -p 9300:9300 -e"ES_JAVA_OPTS=-Xms512m -Xmx512m" -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.12.1
|
||||
|
|
|
@ -1,2 +1,2 @@
|
|||
__version__ = "0.113.0"
|
||||
__version__ = "0.112.0"
|
||||
version = tuple(map(int, __version__.split('.'))) # pylint: disable=invalid-name
|
||||
|
|
|
@ -64,7 +64,7 @@ class BlobDownloader:
|
|||
self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1
|
||||
|
||||
async def new_peer_or_finished(self):
|
||||
active_tasks = list(self.active_connections.values()) + [asyncio.create_task(asyncio.sleep(1))]
|
||||
active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)]
|
||||
await asyncio.wait(active_tasks, return_when='FIRST_COMPLETED')
|
||||
|
||||
def cleanup_active(self):
|
||||
|
|
11
lbry/conf.py
11
lbry/conf.py
|
@ -688,9 +688,6 @@ class Config(CLIConfig):
|
|||
tracker_servers = Servers("BitTorrent-compatible (BEP15) UDP trackers for helping P2P discovery", [
|
||||
('tracker.lbry.com', 9252),
|
||||
('tracker.lbry.grin.io', 9252),
|
||||
('tracker.lbry.pigg.es', 9252),
|
||||
('tracker.lizard.technology', 9252),
|
||||
('s1.lbry.network', 9252),
|
||||
])
|
||||
|
||||
lbryum_servers = Servers("SPV wallet servers", [
|
||||
|
@ -703,20 +700,14 @@ class Config(CLIConfig):
|
|||
('spv17.lbry.com', 50001),
|
||||
('spv18.lbry.com', 50001),
|
||||
('spv19.lbry.com', 50001),
|
||||
('hub.lbry.grin.io', 50001),
|
||||
('hub.lizard.technology', 50001),
|
||||
('s1.lbry.network', 50001),
|
||||
])
|
||||
known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [
|
||||
('dht.lbry.grin.io', 4444), # Grin
|
||||
('dht.lbry.madiator.com', 4444), # Madiator
|
||||
('dht.lbry.pigg.es', 4444), # Pigges
|
||||
('lbrynet1.lbry.com', 4444), # US EAST
|
||||
('lbrynet2.lbry.com', 4444), # US WEST
|
||||
('lbrynet3.lbry.com', 4444), # EU
|
||||
('lbrynet4.lbry.com', 4444), # ASIA
|
||||
('dht.lizard.technology', 4444), # Jack
|
||||
('s2.lbry.network', 4444),
|
||||
('lbrynet4.lbry.com', 4444) # ASIA
|
||||
])
|
||||
|
||||
# blockchain
|
||||
|
|
|
@ -42,6 +42,8 @@ class BlobAnnouncer:
|
|||
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
|
||||
except Exception as err:
|
||||
self.announcements_sent_metric.labels(peers=0, error=True).inc()
|
||||
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||
raise err
|
||||
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
|
||||
|
||||
async def _announce(self, batch_size: typing.Optional[int] = 10):
|
||||
|
|
|
@ -8,7 +8,6 @@ from prometheus_client import Gauge
|
|||
|
||||
from lbry import utils
|
||||
from lbry.dht import constants
|
||||
from lbry.dht.error import RemoteException
|
||||
from lbry.dht.protocol.distance import Distance
|
||||
if typing.TYPE_CHECKING:
|
||||
from lbry.dht.peer import KademliaPeer, PeerManager
|
||||
|
@ -396,7 +395,7 @@ class TreeRoutingTable:
|
|||
try:
|
||||
await probe(to_replace)
|
||||
return False
|
||||
except (asyncio.TimeoutError, RemoteException):
|
||||
except asyncio.TimeoutError:
|
||||
log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index,
|
||||
to_replace.address, to_replace.udp_port, peer.address, peer.udp_port)
|
||||
if to_replace in self.buckets[bucket_index]:
|
||||
|
|
|
@ -37,7 +37,7 @@ class Component(metaclass=ComponentType):
|
|||
def running(self):
|
||||
return self._running
|
||||
|
||||
async def get_status(self): # pylint: disable=no-self-use
|
||||
async def get_status(self):
|
||||
return
|
||||
|
||||
async def start(self):
|
||||
|
|
|
@ -118,7 +118,7 @@ class ComponentManager:
|
|||
component._setup() for component in stage if not component.running
|
||||
]
|
||||
if needing_start:
|
||||
await asyncio.wait(map(asyncio.create_task, needing_start))
|
||||
await asyncio.wait(needing_start)
|
||||
self.started.set()
|
||||
|
||||
async def stop(self):
|
||||
|
@ -131,7 +131,7 @@ class ComponentManager:
|
|||
component._stop() for component in stage if component.running
|
||||
]
|
||||
if needing_stop:
|
||||
await asyncio.wait(map(asyncio.create_task, needing_stop))
|
||||
await asyncio.wait(needing_stop)
|
||||
|
||||
def all_components_running(self, *component_names):
|
||||
"""
|
||||
|
|
|
@ -374,7 +374,7 @@ class FileManagerComponent(Component):
|
|||
log.info('Done setting up file manager')
|
||||
|
||||
async def stop(self):
|
||||
await self.file_manager.stop()
|
||||
self.file_manager.stop()
|
||||
|
||||
|
||||
class BackgroundDownloaderComponent(Component):
|
||||
|
@ -560,6 +560,8 @@ class UPnPComponent(Component):
|
|||
self.upnp = await UPnP.discover(loop=self.component_manager.loop)
|
||||
log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
|
||||
except Exception as err:
|
||||
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||
raise
|
||||
log.warning("upnp discovery failed: %s", err)
|
||||
self.upnp = None
|
||||
|
||||
|
|
|
@ -614,8 +614,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
content_type='application/json'
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
async def handle_metrics_get_request(request: web.Request):
|
||||
async def handle_metrics_get_request(self, request: web.Request):
|
||||
try:
|
||||
return web.Response(
|
||||
text=prom_generate_latest().decode(),
|
||||
|
|
|
@ -80,6 +80,8 @@ class MarketFeed:
|
|||
self.rate = ExchangeRate(self.market, rate, int(time.time()))
|
||||
self.last_check = time.time()
|
||||
return self.rate
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except asyncio.TimeoutError:
|
||||
log.warning("Timed out fetching exchange rate from %s.", self.name)
|
||||
except json.JSONDecodeError as e:
|
||||
|
|
|
@ -793,7 +793,7 @@ class SQLiteStorage(SQLiteMixin):
|
|||
|
||||
await self.db.run(_save_claims)
|
||||
if update_file_callbacks:
|
||||
await asyncio.wait(map(asyncio.create_task, update_file_callbacks))
|
||||
await asyncio.wait(update_file_callbacks)
|
||||
if claim_id_to_supports:
|
||||
await self.save_supports(claim_id_to_supports)
|
||||
|
||||
|
|
|
@ -50,10 +50,10 @@ class FileManager:
|
|||
await manager.started.wait()
|
||||
self.started.set()
|
||||
|
||||
async def stop(self):
|
||||
def stop(self):
|
||||
for manager in self.source_managers.values():
|
||||
# fixme: pop or not?
|
||||
await manager.stop()
|
||||
manager.stop()
|
||||
self.started.clear()
|
||||
|
||||
@cache_concurrent
|
||||
|
@ -99,6 +99,8 @@ class FileManager:
|
|||
except asyncio.TimeoutError:
|
||||
raise ResolveTimeoutError(uri)
|
||||
except Exception as err:
|
||||
if isinstance(err, asyncio.CancelledError):
|
||||
raise
|
||||
log.exception("Unexpected error resolving stream:")
|
||||
raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
|
||||
if 'error' in resolved_result:
|
||||
|
@ -247,7 +249,7 @@ class FileManager:
|
|||
except asyncio.TimeoutError:
|
||||
error = DownloadDataTimeoutError(stream.sd_hash)
|
||||
raise error
|
||||
except (Exception, asyncio.CancelledError) as err: # forgive data timeout, don't delete stream
|
||||
except Exception as err: # forgive data timeout, don't delete stream
|
||||
expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
|
||||
KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError)
|
||||
if isinstance(err, expected):
|
||||
|
|
|
@ -67,7 +67,7 @@ class ManagedDownloadSource:
|
|||
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
|
||||
raise NotImplementedError()
|
||||
|
||||
async def stop_tasks(self):
|
||||
def stop_tasks(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
|
||||
|
|
|
@ -59,11 +59,11 @@ class SourceManager:
|
|||
def add(self, source: ManagedDownloadSource):
|
||||
self._sources[source.identifier] = source
|
||||
|
||||
async def remove(self, source: ManagedDownloadSource):
|
||||
def remove(self, source: ManagedDownloadSource):
|
||||
if source.identifier not in self._sources:
|
||||
return
|
||||
self._sources.pop(source.identifier)
|
||||
await source.stop_tasks()
|
||||
source.stop_tasks()
|
||||
|
||||
async def initialize_from_database(self):
|
||||
raise NotImplementedError()
|
||||
|
@ -72,10 +72,10 @@ class SourceManager:
|
|||
await self.initialize_from_database()
|
||||
self.started.set()
|
||||
|
||||
async def stop(self):
|
||||
def stop(self):
|
||||
while self._sources:
|
||||
_, source = self._sources.popitem()
|
||||
await source.stop_tasks()
|
||||
source.stop_tasks()
|
||||
self.started.clear()
|
||||
|
||||
async def create(self, file_path: str, key: Optional[bytes] = None,
|
||||
|
@ -83,7 +83,7 @@ class SourceManager:
|
|||
raise NotImplementedError()
|
||||
|
||||
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
|
||||
await self.remove(source)
|
||||
self.remove(source)
|
||||
if delete_file and source.output_file_exists:
|
||||
os.remove(source.full_path)
|
||||
|
||||
|
|
|
@ -23,7 +23,6 @@ class BackgroundDownloader:
|
|||
except ValueError:
|
||||
return
|
||||
except asyncio.CancelledError:
|
||||
log.debug("Cancelled background downloader")
|
||||
raise
|
||||
except Exception:
|
||||
log.error("Unexpected download error on background downloader")
|
||||
|
|
|
@ -191,7 +191,7 @@ class ManagedStream(ManagedDownloadSource):
|
|||
Stop any running save/stream tasks as well as the downloader and update the status in the database
|
||||
"""
|
||||
|
||||
await self.stop_tasks()
|
||||
self.stop_tasks()
|
||||
if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
|
||||
await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)
|
||||
|
||||
|
@ -279,7 +279,7 @@ class ManagedStream(ManagedDownloadSource):
|
|||
log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id,
|
||||
self.sd_hash[:6], self.full_path)
|
||||
await self.blob_manager.storage.set_saved_file(self.stream_hash)
|
||||
except (Exception, asyncio.CancelledError) as err:
|
||||
except Exception as err:
|
||||
if os.path.isfile(output_path):
|
||||
log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)
|
||||
os.remove(output_path)
|
||||
|
@ -324,13 +324,12 @@ class ManagedStream(ManagedDownloadSource):
|
|||
await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout)
|
||||
except asyncio.TimeoutError:
|
||||
log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id)
|
||||
await self.stop_tasks()
|
||||
self.stop_tasks()
|
||||
await self.update_status(ManagedStream.STATUS_STOPPED)
|
||||
|
||||
async def stop_tasks(self):
|
||||
def stop_tasks(self):
|
||||
if self.file_output_task and not self.file_output_task.done():
|
||||
self.file_output_task.cancel()
|
||||
await asyncio.gather(self.file_output_task, return_exceptions=True)
|
||||
self.file_output_task = None
|
||||
while self.streaming_responses:
|
||||
req, response = self.streaming_responses.pop()
|
||||
|
@ -367,7 +366,7 @@ class ManagedStream(ManagedDownloadSource):
|
|||
return sent
|
||||
except ConnectionError:
|
||||
return sent
|
||||
except (OSError, Exception, asyncio.CancelledError) as err:
|
||||
except (OSError, Exception) as err:
|
||||
if isinstance(err, asyncio.CancelledError):
|
||||
log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id)
|
||||
elif isinstance(err, OSError):
|
||||
|
|
|
@ -164,6 +164,8 @@ class StreamManager(SourceManager):
|
|||
async def reflect_streams(self):
|
||||
try:
|
||||
return await self._reflect_streams()
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception:
|
||||
log.exception("reflector task encountered an unexpected error!")
|
||||
|
||||
|
@ -196,8 +198,8 @@ class StreamManager(SourceManager):
|
|||
await super().start()
|
||||
self.re_reflect_task = self.loop.create_task(self.reflect_streams())
|
||||
|
||||
async def stop(self):
|
||||
await super().stop()
|
||||
def stop(self):
|
||||
super().stop()
|
||||
if self.resume_saving_task and not self.resume_saving_task.done():
|
||||
self.resume_saving_task.cancel()
|
||||
if self.re_reflect_task and not self.re_reflect_task.done():
|
||||
|
@ -224,8 +226,7 @@ class StreamManager(SourceManager):
|
|||
)
|
||||
return task
|
||||
|
||||
@staticmethod
|
||||
async def _retriable_reflect_stream(stream, host, port):
|
||||
async def _retriable_reflect_stream(self, stream, host, port):
|
||||
sent = await stream.upload_to_reflector(host, port)
|
||||
while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0:
|
||||
stream.reflector_progress = 0
|
||||
|
@ -260,7 +261,7 @@ class StreamManager(SourceManager):
|
|||
return
|
||||
if source.identifier in self.running_reflector_uploads:
|
||||
self.running_reflector_uploads[source.identifier].cancel()
|
||||
await source.stop_tasks()
|
||||
source.stop_tasks()
|
||||
if source.identifier in self.streams:
|
||||
del self.streams[source.identifier]
|
||||
blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
|
||||
|
|
|
@ -74,7 +74,7 @@ class TorrentSource(ManagedDownloadSource):
|
|||
def bt_infohash(self):
|
||||
return self.identifier
|
||||
|
||||
async def stop_tasks(self):
|
||||
def stop_tasks(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
|
@ -118,8 +118,8 @@ class TorrentManager(SourceManager):
|
|||
async def start(self):
|
||||
await super().start()
|
||||
|
||||
async def stop(self):
|
||||
await super().stop()
|
||||
def stop(self):
|
||||
super().stop()
|
||||
log.info("finished stopping the torrent manager")
|
||||
|
||||
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
|
||||
|
|
|
@ -141,7 +141,7 @@ class CoinSelector:
|
|||
_) -> List[OutputEffectiveAmountEstimator]:
|
||||
""" Accumulate UTXOs at random until there is enough to cover the target. """
|
||||
target = self.target + self.cost_of_change
|
||||
self.random.shuffle(txos, random=self.random.random) # pylint: disable=deprecated-argument
|
||||
self.random.shuffle(txos, self.random.random)
|
||||
selection = []
|
||||
amount = 0
|
||||
for coin in txos:
|
||||
|
|
|
@ -30,6 +30,7 @@ from lbry.wallet.bip32 import PublicKey, PrivateKey
|
|||
from lbry.wallet.coinselection import CoinSelector
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
log.setLevel("DEBUG")
|
||||
|
||||
LedgerType = Type['BaseLedger']
|
||||
|
||||
|
@ -329,10 +330,10 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
async def start(self):
|
||||
if not os.path.exists(self.path):
|
||||
os.mkdir(self.path)
|
||||
await asyncio.wait(map(asyncio.create_task, [
|
||||
await asyncio.wait([
|
||||
self.db.open(),
|
||||
self.headers.open()
|
||||
]))
|
||||
])
|
||||
fully_synced = self.on_ready.first
|
||||
asyncio.create_task(self.network.start())
|
||||
await self.network.on_connected.first
|
||||
|
@ -466,9 +467,9 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
async def subscribe_accounts(self):
|
||||
if self.network.is_connected and self.accounts:
|
||||
log.info("Subscribe to %i accounts", len(self.accounts))
|
||||
await asyncio.wait(map(asyncio.create_task, [
|
||||
await asyncio.wait([
|
||||
self.subscribe_account(a) for a in self.accounts
|
||||
]))
|
||||
])
|
||||
|
||||
async def subscribe_account(self, account: Account):
|
||||
for address_manager in account.address_managers.values():
|
||||
|
@ -506,6 +507,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
|
||||
def process_status_update(self, update):
|
||||
address, remote_status = update
|
||||
print(f"**** status update {address} {remote_status}")
|
||||
self._update_tasks.add(self.update_history(address, remote_status))
|
||||
|
||||
async def update_history(self, address, remote_status, address_manager: AddressManager = None,
|
||||
|
@ -518,6 +520,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
return True
|
||||
|
||||
remote_history = await self.network.retriable_call(self.network.get_history, address)
|
||||
print(f'>>>>>> {remote_history}')
|
||||
remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history))
|
||||
we_need = set(remote_history) - set(local_history)
|
||||
if not we_need:
|
||||
|
@ -549,7 +552,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
continue
|
||||
to_request[i] = (txid, remote_height)
|
||||
|
||||
log.debug(
|
||||
log.warning(
|
||||
"request %i transactions, %i/%i for %s are already synced", len(to_request), len(already_synced),
|
||||
len(remote_history), address
|
||||
)
|
||||
|
@ -558,8 +561,8 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
self.maybe_has_channel_key(tx)
|
||||
pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:"
|
||||
if len(pending_synced_history) % 100 == 0:
|
||||
log.info("Syncing address %s: %d/%d", address, len(pending_synced_history), len(to_request))
|
||||
log.info("Sync finished for address %s: %d/%d", address, len(pending_synced_history), len(to_request))
|
||||
log.warning("Syncing address %s: %d/%d", address, len(pending_synced_history), len(to_request))
|
||||
log.warning("Sync finished for address %s: %d/%d", address, len(pending_synced_history), len(to_request))
|
||||
|
||||
assert len(pending_synced_history) == len(remote_history), \
|
||||
f"{len(pending_synced_history)} vs {len(remote_history)} for {address}"
|
||||
|
@ -606,7 +609,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
self._known_addresses_out_of_sync.add(address)
|
||||
return False
|
||||
else:
|
||||
log.debug("finished syncing transaction history for %s, %i known txs", address, len(local_history))
|
||||
log.warning("finished syncing transaction history for %s, %i known txs", address, len(local_history))
|
||||
return True
|
||||
|
||||
async def maybe_verify_transaction(self, tx, remote_height, merkle=None):
|
||||
|
@ -621,6 +624,8 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
header = await self.headers.get(remote_height)
|
||||
tx.position = merkle['pos']
|
||||
tx.is_verified = merkle_root == header['merkle_root']
|
||||
if not tx.is_verified:
|
||||
print(f"&&&&&&& {tx.height}: {merkle_root} != {header['merkle_root']}")
|
||||
return tx
|
||||
|
||||
def maybe_has_channel_key(self, tx):
|
||||
|
@ -938,7 +943,9 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
"%d change addresses (gap: %d), %d channels, %d certificates and %d claims. ",
|
||||
account.id, balance, total_receiving, account.receiving.gap, total_change,
|
||||
account.change.gap, channel_count, len(account.channel_keys), claim_count)
|
||||
except Exception:
|
||||
except Exception as err:
|
||||
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||
raise
|
||||
log.exception(
|
||||
'Failed to display wallet state, please file issue '
|
||||
'for this bug along with the traceback you see below:')
|
||||
|
@ -961,7 +968,9 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
claim_ids = [p.purchased_claim_id for p in purchases]
|
||||
try:
|
||||
resolved, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
|
||||
except Exception:
|
||||
except Exception as err:
|
||||
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||
raise
|
||||
log.exception("Resolve failed while looking up purchased claim ids:")
|
||||
resolved = []
|
||||
lookup = {claim.claim_id: claim for claim in resolved}
|
||||
|
@ -1041,7 +1050,9 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
claim_ids = collection.claim.collection.claims.ids[offset:page_size + offset]
|
||||
try:
|
||||
resolve_results, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
|
||||
except Exception:
|
||||
except Exception as err:
|
||||
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||
raise
|
||||
log.exception("Resolve failed while looking up collection claim ids:")
|
||||
return []
|
||||
claims = []
|
||||
|
|
|
@ -76,7 +76,9 @@ class ClientSession(BaseClientSession):
|
|||
raise asyncio.TimeoutError
|
||||
if done:
|
||||
try:
|
||||
return request.result()
|
||||
result = request.result()
|
||||
log.warning("sent %s%s to %s:%i (%i timeout) result: %s", method, tuple(args), self.server[0], self.server[1], self.timeout, result)
|
||||
return result
|
||||
except ConnectionResetError:
|
||||
log.error(
|
||||
"wallet server (%s) reset connection upon our %s request, json of %i args is %i bytes",
|
||||
|
@ -117,7 +119,7 @@ class ClientSession(BaseClientSession):
|
|||
)
|
||||
else:
|
||||
await asyncio.sleep(max(0, max_idle - (now - self.last_send)))
|
||||
except (Exception, asyncio.CancelledError) as err:
|
||||
except Exception as err:
|
||||
if isinstance(err, asyncio.CancelledError):
|
||||
log.info("closing connection to %s:%i", *self.server)
|
||||
else:
|
||||
|
@ -214,7 +216,7 @@ class Network:
|
|||
def loop_task_done_callback(f):
|
||||
try:
|
||||
f.result()
|
||||
except (Exception, asyncio.CancelledError):
|
||||
except Exception:
|
||||
if self.running:
|
||||
log.exception("wallet server connection loop crashed")
|
||||
|
||||
|
@ -304,7 +306,7 @@ class Network:
|
|||
await client.ensure_server_version()
|
||||
return client
|
||||
except (asyncio.TimeoutError, ConnectionError, OSError, IncompatibleWalletServerError, RPCError):
|
||||
log.warning("Connecting to %s:%d failed", host, port)
|
||||
log.exception("Connecting to %s:%d failed", host, port)
|
||||
client._close()
|
||||
return
|
||||
|
||||
|
@ -312,8 +314,7 @@ class Network:
|
|||
sleep_delay = 30
|
||||
while self.running:
|
||||
await asyncio.wait(
|
||||
map(asyncio.create_task, [asyncio.sleep(30), self._urgent_need_reconnect.wait()]),
|
||||
return_when=asyncio.FIRST_COMPLETED
|
||||
[asyncio.sleep(30), self._urgent_need_reconnect.wait()], return_when=asyncio.FIRST_COMPLETED
|
||||
)
|
||||
if self._urgent_need_reconnect.is_set():
|
||||
sleep_delay = 30
|
||||
|
@ -329,7 +330,8 @@ class Network:
|
|||
features = await client.send_request('server.features', [])
|
||||
self.client, self.server_features = client, features
|
||||
log.debug("discover other hubs %s:%i", *client.server)
|
||||
await self._update_hubs(await client.send_request('server.peers.subscribe', []))
|
||||
# TODO: Enable this after herald.go supports 'server.peers.subscribe'.
|
||||
#await self._update_hubs(await client.send_request('server.peers.subscribe', []))
|
||||
log.info("subscribe to headers %s:%i", *client.server)
|
||||
self._update_remote_height((await self.subscribe_headers(),))
|
||||
self._on_connected_controller.add(True)
|
||||
|
@ -339,7 +341,7 @@ class Network:
|
|||
try:
|
||||
if not self._urgent_need_reconnect.is_set():
|
||||
await asyncio.wait(
|
||||
[self._keepalive_task, asyncio.create_task(self._urgent_need_reconnect.wait())],
|
||||
[self._keepalive_task, self._urgent_need_reconnect.wait()],
|
||||
return_when=asyncio.FIRST_COMPLETED
|
||||
)
|
||||
else:
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
# pylint: disable=import-error
|
||||
import os
|
||||
import signal
|
||||
import json
|
||||
import shutil
|
||||
import asyncio
|
||||
|
@ -30,7 +31,7 @@ try:
|
|||
from hub.elastic_sync.service import ElasticSyncService
|
||||
from hub.scribe.service import BlockchainProcessorService
|
||||
except ImportError:
|
||||
pass
|
||||
raise
|
||||
|
||||
|
||||
def get_lbcd_node_from_ledger(ledger_module):
|
||||
|
@ -226,6 +227,7 @@ class SPVNode:
|
|||
self.stopped = False
|
||||
try:
|
||||
self.data_path = tempfile.mkdtemp()
|
||||
#self.data_path = '/Users/swdev1/herald/test_db'
|
||||
conf = {
|
||||
'description': '',
|
||||
'payment_address': '',
|
||||
|
@ -249,7 +251,9 @@ class SPVNode:
|
|||
BlockchainEnv(db_dir=self.data_path, daemon_url=lbcwallet_node.rpc_url,
|
||||
reorg_limit=100, max_query_workers=0, chain='regtest', index_address_status=False)
|
||||
)
|
||||
self.server = HubServerService(ServerEnv(**conf))
|
||||
# Select Herald variant:
|
||||
self.server = HubNode("", "herald", self) # Go Herald
|
||||
#self.server = HubServerService(ServerEnv(**conf)) # Python Herald
|
||||
self.es_writer = ElasticSyncService(
|
||||
ElasticEnv(
|
||||
db_dir=self.data_path, reorg_limit=100, max_query_workers=0, chain='regtest',
|
||||
|
@ -264,7 +268,8 @@ class SPVNode:
|
|||
await self.server.start()
|
||||
except Exception as e:
|
||||
self.stopped = True
|
||||
log.exception("failed to start spv node")
|
||||
if not isinstance(e, asyncio.CancelledError):
|
||||
log.exception("failed to start spv node")
|
||||
raise e
|
||||
|
||||
async def stop(self, cleanup=True):
|
||||
|
@ -284,7 +289,8 @@ class SPVNode:
|
|||
cleanup and self.cleanup()
|
||||
|
||||
def cleanup(self):
|
||||
shutil.rmtree(self.data_path, ignore_errors=True)
|
||||
log.error("skipping cleanup of data_path: %s", self.data_path)
|
||||
#shutil.rmtree(self.data_path, ignore_errors=True)
|
||||
|
||||
|
||||
class LBCDProcess(asyncio.SubprocessProtocol):
|
||||
|
@ -673,3 +679,146 @@ class LBCWalletNode:
|
|||
|
||||
def get_raw_transaction(self, txid):
|
||||
return self._cli_cmnd('getrawtransaction', txid, '1')
|
||||
|
||||
|
||||
class HubProcess(asyncio.SubprocessProtocol):
|
||||
def __init__(self, ready, stopped):
|
||||
self.ready = ready
|
||||
self.stopped = stopped
|
||||
self.log = log.getChild('hub')
|
||||
self.transport = None
|
||||
|
||||
def pipe_data_received(self, fd, data):
|
||||
self.stopped.clear()
|
||||
self.ready.set()
|
||||
if self.log:
|
||||
self.log.warning(data.decode())
|
||||
#if b'error' in data.lower():
|
||||
# self.ready.set()
|
||||
# raise SystemError(data.decode())
|
||||
if b'listening on' in data:
|
||||
self.ready.set()
|
||||
str_lines = str(data.decode()).split("\n")
|
||||
for line in str_lines:
|
||||
if 'releaseTime' in line:
|
||||
print(line)
|
||||
|
||||
def process_exited(self):
|
||||
self.ready.clear()
|
||||
self.stopped.set()
|
||||
|
||||
async def stop(self):
|
||||
t = asyncio.create_task(self.stopped.wait())
|
||||
try:
|
||||
self.transport.send_signal(signal.SIGINT)
|
||||
await asyncio.wait_for(t, 3)
|
||||
# log.warning("stopped go hub")
|
||||
except asyncio.TimeoutError:
|
||||
if not t.done():
|
||||
t.cancel()
|
||||
self.transport.terminate()
|
||||
await self.stopped.wait()
|
||||
log.warning("terminated go hub")
|
||||
|
||||
|
||||
class HubNode:
|
||||
def __init__(self, url, daemon, spv_node):
|
||||
self.spv_node = spv_node
|
||||
self.latest_release_url = url
|
||||
self.project_dir = os.path.dirname(os.path.dirname(__file__))
|
||||
self.bin_dir = os.path.join(self.project_dir, 'bin')
|
||||
self.daemon_bin = os.path.join(self.bin_dir, daemon)
|
||||
self.cli_bin = os.path.join(self.bin_dir, daemon)
|
||||
self.log = log.getChild('hub')
|
||||
self.transport = None
|
||||
self.protocol = None
|
||||
self.hostname = 'localhost'
|
||||
self.rpcport = 50051 # avoid conflict with default rpc port
|
||||
self._stopped = asyncio.Event()
|
||||
self.running = asyncio.Event()
|
||||
|
||||
@property
|
||||
def stopped(self):
|
||||
return not self.running.is_set()
|
||||
|
||||
@property
|
||||
def exists(self):
|
||||
return (
|
||||
os.path.exists(self.cli_bin) and
|
||||
os.path.exists(self.daemon_bin)
|
||||
)
|
||||
|
||||
def download(self):
|
||||
downloaded_file = os.path.join(
|
||||
self.bin_dir,
|
||||
self.latest_release_url[self.latest_release_url.rfind('/')+1:]
|
||||
)
|
||||
|
||||
if not os.path.exists(self.bin_dir):
|
||||
os.mkdir(self.bin_dir)
|
||||
|
||||
if not os.path.exists(downloaded_file):
|
||||
self.log.info('Downloading: %s', self.latest_release_url)
|
||||
with urllib.request.urlopen(self.latest_release_url) as response:
|
||||
with open(downloaded_file, 'wb') as out_file:
|
||||
shutil.copyfileobj(response, out_file)
|
||||
|
||||
self.log.info('Extracting: %s', downloaded_file)
|
||||
|
||||
if downloaded_file.endswith('.zip'):
|
||||
with zipfile.ZipFile(downloaded_file) as dotzip:
|
||||
dotzip.extractall(self.bin_dir)
|
||||
# zipfile bug https://bugs.python.org/issue15795
|
||||
os.chmod(self.cli_bin, 0o755)
|
||||
os.chmod(self.daemon_bin, 0o755)
|
||||
|
||||
elif downloaded_file.endswith('.tar.gz'):
|
||||
with tarfile.open(downloaded_file) as tar:
|
||||
tar.extractall(self.bin_dir)
|
||||
|
||||
os.chmod(self.daemon_bin, 0o755)
|
||||
|
||||
return self.exists
|
||||
|
||||
def ensure(self):
|
||||
return self.exists or self.download()
|
||||
|
||||
async def start(self):
|
||||
assert self.ensure()
|
||||
loop = asyncio.get_event_loop()
|
||||
asyncio.get_child_watcher().attach_loop(loop)
|
||||
command = [
|
||||
self.daemon_bin, 'serve',
|
||||
'--db-path', self.spv_node.data_path + '/lbry-rocksdb',
|
||||
'--chain', 'regtest',
|
||||
'--json-rpc-port', str(self.spv_node.port),
|
||||
'--json-rpc-http-port', '0', # disabled
|
||||
'--esindex', self.spv_node.index_name + 'claims',
|
||||
'--notifier-port', str(self.spv_node.elastic_notifier_port),
|
||||
'--debug'
|
||||
]
|
||||
self.log.info(' '.join(command))
|
||||
self.protocol = HubProcess(self.running, self._stopped)
|
||||
try:
|
||||
self.transport, _ = await loop.subprocess_exec(
|
||||
lambda: self.protocol, *command
|
||||
)
|
||||
self.protocol.transport = self.transport
|
||||
except Exception as e:
|
||||
log.exception('failed to start go hub', exc_info=e)
|
||||
raise e
|
||||
await self.protocol.ready.wait()
|
||||
|
||||
async def stop(self, cleanup=True):
|
||||
try:
|
||||
if self.protocol:
|
||||
await self.protocol.stop()
|
||||
except Exception as e:
|
||||
log.exception('failed to stop go hub', exc_info=e)
|
||||
raise e
|
||||
finally:
|
||||
if cleanup:
|
||||
self.cleanup()
|
||||
|
||||
def cleanup(self):
|
||||
pass
|
||||
|
|
|
@ -28,7 +28,6 @@ disable=
|
|||
no-else-return,
|
||||
cyclic-import,
|
||||
missing-docstring,
|
||||
consider-using-f-string,
|
||||
duplicate-code,
|
||||
expression-not-assigned,
|
||||
inconsistent-return-statements,
|
||||
|
|
6
setup.py
6
setup.py
|
@ -18,7 +18,7 @@ setup(
|
|||
long_description_content_type="text/markdown",
|
||||
keywords="lbry protocol media",
|
||||
license='MIT',
|
||||
python_requires='>=3.8',
|
||||
python_requires='>=3.7',
|
||||
packages=find_packages(exclude=('tests',)),
|
||||
zip_safe=False,
|
||||
entry_points={
|
||||
|
@ -36,7 +36,7 @@ setup(
|
|||
'distro==1.4.0',
|
||||
'base58==1.0.0',
|
||||
'cffi==1.13.2',
|
||||
'cryptography==3.4.7',
|
||||
'cryptography==2.5',
|
||||
'protobuf==3.17.2',
|
||||
'prometheus_client==0.7.1',
|
||||
'ecdsa==0.13.3',
|
||||
|
@ -50,7 +50,7 @@ setup(
|
|||
],
|
||||
extras_require={
|
||||
'lint': [
|
||||
'pylint==2.13.9'
|
||||
'pylint==2.10.0'
|
||||
],
|
||||
'test': [
|
||||
'coverage',
|
||||
|
|
|
@ -61,14 +61,16 @@ def mock_network_loop(loop: asyncio.AbstractEventLoop,
|
|||
dht_network[from_addr] = protocol
|
||||
return transport, protocol
|
||||
|
||||
mock_sock = mock.Mock(spec=socket.socket)
|
||||
mock_sock.setsockopt = lambda *_: None
|
||||
mock_sock.bind = lambda *_: None
|
||||
mock_sock.setblocking = lambda *_: None
|
||||
mock_sock.getsockname = lambda: "0.0.0.0"
|
||||
mock_sock.getpeername = lambda: ""
|
||||
mock_sock.close = lambda: None
|
||||
mock_sock.type = socket.SOCK_DGRAM
|
||||
mock_sock.fileno = lambda: 7
|
||||
loop.create_datagram_endpoint = create_datagram_endpoint
|
||||
yield
|
||||
with mock.patch('socket.socket') as mock_socket:
|
||||
mock_sock = mock.Mock(spec=socket.socket)
|
||||
mock_sock.setsockopt = lambda *_: None
|
||||
mock_sock.bind = lambda *_: None
|
||||
mock_sock.setblocking = lambda *_: None
|
||||
mock_sock.getsockname = lambda: "0.0.0.0"
|
||||
mock_sock.getpeername = lambda: ""
|
||||
mock_sock.close = lambda: None
|
||||
mock_sock.type = socket.SOCK_DGRAM
|
||||
mock_sock.fileno = lambda: 7
|
||||
mock_socket.return_value = mock_sock
|
||||
loop.create_datagram_endpoint = create_datagram_endpoint
|
||||
yield
|
||||
|
|
|
@ -354,7 +354,7 @@ class FileCommands(CommandTestCase):
|
|||
await self.daemon.jsonrpc_get('lbry://foo')
|
||||
with open(original_path, 'wb') as handle:
|
||||
handle.write(b'some other stuff was there instead')
|
||||
await self.daemon.file_manager.stop()
|
||||
self.daemon.file_manager.stop()
|
||||
await self.daemon.file_manager.start()
|
||||
await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) # if this hangs, file didn't get set completed
|
||||
# check that internal state got through up to the file list API
|
||||
|
@ -382,7 +382,8 @@ class FileCommands(CommandTestCase):
|
|||
resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2))
|
||||
self.assertNotIn('error', resp)
|
||||
self.assertTrue(os.path.isfile(path))
|
||||
await self.daemon.file_manager.stop()
|
||||
self.daemon.file_manager.stop()
|
||||
await asyncio.sleep(0.01) # FIXME: this sleep should not be needed
|
||||
self.assertFalse(os.path.isfile(path))
|
||||
|
||||
async def test_incomplete_downloads_retry(self):
|
||||
|
@ -477,7 +478,7 @@ class FileCommands(CommandTestCase):
|
|||
|
||||
# restart the daemon and make sure the fee is still there
|
||||
|
||||
await self.daemon.file_manager.stop()
|
||||
self.daemon.file_manager.stop()
|
||||
await self.daemon.file_manager.start()
|
||||
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
||||
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee)
|
||||
|
|
|
@ -3,9 +3,7 @@ import hashlib
|
|||
import aiohttp
|
||||
import aiohttp.web
|
||||
import asyncio
|
||||
import contextlib
|
||||
|
||||
from lbry.file.source import ManagedDownloadSource
|
||||
from lbry.utils import aiohttp_request
|
||||
from lbry.blob.blob_file import MAX_BLOB_SIZE
|
||||
from lbry.testcase import CommandTestCase
|
||||
|
@ -23,7 +21,7 @@ def get_random_bytes(n: int) -> bytes:
|
|||
|
||||
class RangeRequests(CommandTestCase):
|
||||
async def _restart_stream_manager(self):
|
||||
await self.daemon.file_manager.stop()
|
||||
self.daemon.file_manager.stop()
|
||||
await self.daemon.file_manager.start()
|
||||
return
|
||||
|
||||
|
@ -354,21 +352,14 @@ class RangeRequests(CommandTestCase):
|
|||
path = stream.full_path
|
||||
self.assertIsNotNone(path)
|
||||
if wait_for_start_writing:
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await stream.started_writing.wait()
|
||||
await stream.started_writing.wait()
|
||||
self.assertTrue(os.path.isfile(path))
|
||||
await self.daemon.file_manager.stop()
|
||||
# while stopped, we get no response to query and no file is present
|
||||
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'], [])
|
||||
self.assertEqual(os.path.isfile(path), stream.status == ManagedDownloadSource.STATUS_FINISHED)
|
||||
await self.daemon.file_manager.start()
|
||||
# after restart, we get a response to query and same file path
|
||||
await self._restart_stream_manager()
|
||||
stream = (await self.daemon.jsonrpc_file_list())['items'][0]
|
||||
self.assertIsNotNone(stream.full_path)
|
||||
self.assertEqual(stream.full_path, path)
|
||||
self.assertFalse(os.path.isfile(path))
|
||||
if wait_for_start_writing:
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await stream.started_writing.wait()
|
||||
await stream.started_writing.wait()
|
||||
self.assertTrue(os.path.isfile(path))
|
||||
|
||||
async def test_file_save_stop_before_finished_streaming_only_wait_for_start(self):
|
||||
|
|
|
@ -424,7 +424,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
|||
self.assertIsNone(stream.full_path)
|
||||
self.assertEqual(0, stream.written_bytes)
|
||||
|
||||
await self.stream_manager.stop()
|
||||
self.stream_manager.stop()
|
||||
await self.stream_manager.start()
|
||||
self.assertEqual(1, len(self.stream_manager.streams))
|
||||
stream = list(self.stream_manager.streams.values())[0]
|
||||
|
@ -449,7 +449,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
|||
stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
|
||||
await stream.finished_writing.wait()
|
||||
await asyncio.sleep(0)
|
||||
await self.stream_manager.stop()
|
||||
self.stream_manager.stop()
|
||||
self.client_blob_manager.stop()
|
||||
# partial removal, only sd blob is missing.
|
||||
# in this case, we recover the sd blob while the other blobs are kept untouched as 'finished'
|
||||
|
|
Loading…
Reference in a new issue