Compare commits
3 commits
master
...
ongoing_fi
Author | SHA1 | Date | |
---|---|---|---|
|
a394713171 | ||
|
368c6ab4a0 | ||
|
907276045b |
6 changed files with 59 additions and 171 deletions
|
@ -218,15 +218,45 @@ class JSONResponseEncoder(JSONEncoder):
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def encode_file(self, managed_stream):
|
def encode_file(self, managed_stream):
|
||||||
file = managed_stream.as_dict()
|
output_exists = managed_stream.output_file_exists
|
||||||
tx_height = managed_stream.stream_claim_info.height
|
tx_height = managed_stream.stream_claim_info.height
|
||||||
best_height = self.ledger.headers.height
|
best_height = self.ledger.headers.height
|
||||||
file.update({
|
return {
|
||||||
|
'streaming_url': managed_stream.stream_url,
|
||||||
|
'completed': managed_stream.completed,
|
||||||
|
'file_name': managed_stream.file_name if output_exists else None,
|
||||||
|
'download_directory': managed_stream.download_directory if output_exists else None,
|
||||||
|
'download_path': managed_stream.full_path if output_exists else None,
|
||||||
|
'claim_output': managed_stream.claim_output,
|
||||||
|
'points_paid': 0.0,
|
||||||
|
'stopped': not managed_stream.running,
|
||||||
|
'stream_hash': managed_stream.stream_hash,
|
||||||
|
'stream_name': managed_stream.descriptor.stream_name,
|
||||||
|
'suggested_file_name': managed_stream.descriptor.suggested_file_name,
|
||||||
|
'sd_hash': managed_stream.descriptor.sd_hash,
|
||||||
|
'mime_type': managed_stream.mime_type,
|
||||||
|
'key': managed_stream.descriptor.key,
|
||||||
|
'total_bytes_lower_bound': managed_stream.descriptor.lower_bound_decrypted_length(),
|
||||||
|
'total_bytes': managed_stream.descriptor.upper_bound_decrypted_length(),
|
||||||
|
'written_bytes': managed_stream.written_bytes,
|
||||||
|
'blobs_completed': managed_stream.blobs_completed,
|
||||||
|
'blobs_in_stream': managed_stream.blobs_in_stream,
|
||||||
|
'blobs_remaining': managed_stream.blobs_remaining,
|
||||||
|
'status': managed_stream.status,
|
||||||
|
'claim_id': managed_stream.claim_id,
|
||||||
|
'txid': managed_stream.txid,
|
||||||
|
'nout': managed_stream.nout,
|
||||||
|
'outpoint': managed_stream.outpoint,
|
||||||
|
'metadata': managed_stream.metadata,
|
||||||
|
'protobuf': managed_stream.metadata_protobuf,
|
||||||
|
'channel_claim_id': managed_stream.channel_claim_id,
|
||||||
|
'channel_name': managed_stream.channel_name,
|
||||||
|
'claim_name': managed_stream.claim_name,
|
||||||
|
'content_fee': managed_stream.content_fee,
|
||||||
'height': tx_height,
|
'height': tx_height,
|
||||||
'confirmations': (best_height+1) - tx_height if tx_height > 0 else tx_height,
|
'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height,
|
||||||
'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None
|
'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None
|
||||||
})
|
}
|
||||||
return file
|
|
||||||
|
|
||||||
def encode_claim(self, claim):
|
def encode_claim(self, claim):
|
||||||
encoded = getattr(claim, claim.claim_type).to_dict()
|
encoded = getattr(claim, claim.claim_type).to_dict()
|
||||||
|
|
|
@ -90,20 +90,6 @@ def get_claims_from_stream_hashes(transaction: sqlite3.Connection,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def get_content_claim_from_outpoint(transaction: sqlite3.Connection,
|
|
||||||
outpoint: str) -> typing.Optional[StoredStreamClaim]:
|
|
||||||
query = (
|
|
||||||
"select content_claim.stream_hash, c.*, case when c.channel_claim_id is not null then "
|
|
||||||
" (select claim_name from claim where claim_id==c.channel_claim_id) "
|
|
||||||
" else null end as channel_name "
|
|
||||||
" from content_claim "
|
|
||||||
" inner join claim c on c.claim_outpoint=content_claim.claim_outpoint and content_claim.claim_outpoint=?"
|
|
||||||
)
|
|
||||||
claim_fields = transaction.execute(query, (outpoint, )).fetchone()
|
|
||||||
if claim_fields:
|
|
||||||
return StoredStreamClaim(*claim_fields)
|
|
||||||
|
|
||||||
|
|
||||||
def _batched_select(transaction, query, parameters, batch_size=900):
|
def _batched_select(transaction, query, parameters, batch_size=900):
|
||||||
for start_index in range(0, len(parameters), batch_size):
|
for start_index in range(0, len(parameters), batch_size):
|
||||||
current_batch = parameters[start_index:start_index+batch_size]
|
current_batch = parameters[start_index:start_index+batch_size]
|
||||||
|
@ -325,31 +311,6 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
"select status from blob where blob_hash=?", blob_hash
|
"select status from blob where blob_hash=?", blob_hash
|
||||||
)
|
)
|
||||||
|
|
||||||
def should_announce(self, blob_hash: str):
|
|
||||||
return self.run_and_return_one_or_none(
|
|
||||||
"select should_announce from blob where blob_hash=?", blob_hash
|
|
||||||
)
|
|
||||||
|
|
||||||
def count_should_announce_blobs(self):
|
|
||||||
return self.run_and_return_one_or_none(
|
|
||||||
"select count(*) from blob where should_announce=1 and status='finished'"
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_all_should_announce_blobs(self):
|
|
||||||
return self.run_and_return_list(
|
|
||||||
"select blob_hash from blob where should_announce=1 and status='finished'"
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_all_finished_blobs(self):
|
|
||||||
return self.run_and_return_list(
|
|
||||||
"select blob_hash from blob where status='finished'"
|
|
||||||
)
|
|
||||||
|
|
||||||
def count_finished_blobs(self):
|
|
||||||
return self.run_and_return_one_or_none(
|
|
||||||
"select count(*) from blob where status='finished'"
|
|
||||||
)
|
|
||||||
|
|
||||||
def update_last_announced_blobs(self, blob_hashes: typing.List[str]):
|
def update_last_announced_blobs(self, blob_hashes: typing.List[str]):
|
||||||
def _update_last_announced_blobs(transaction: sqlite3.Connection):
|
def _update_last_announced_blobs(transaction: sqlite3.Connection):
|
||||||
last_announced = self.time_getter()
|
last_announced = self.time_getter()
|
||||||
|
@ -427,26 +388,6 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
}
|
}
|
||||||
return self.db.run(_sync_blobs)
|
return self.db.run(_sync_blobs)
|
||||||
|
|
||||||
def sync_files_to_blobs(self):
|
|
||||||
def _sync_blobs(transaction: sqlite3.Connection):
|
|
||||||
transaction.executemany(
|
|
||||||
"update file set status='stopped' where stream_hash=?",
|
|
||||||
transaction.execute(
|
|
||||||
"select distinct sb.stream_hash from stream_blob sb "
|
|
||||||
"inner join blob b on b.blob_hash=sb.blob_hash and b.status=='pending'"
|
|
||||||
).fetchall()
|
|
||||||
)
|
|
||||||
return self.db.run(_sync_blobs)
|
|
||||||
|
|
||||||
def set_files_as_streaming(self, stream_hashes: typing.List[str]):
|
|
||||||
def _set_streaming(transaction: sqlite3.Connection):
|
|
||||||
transaction.executemany(
|
|
||||||
"update file set file_name=null, download_directory=null where stream_hash=?",
|
|
||||||
[(stream_hash, ) for stream_hash in stream_hashes]
|
|
||||||
)
|
|
||||||
|
|
||||||
return self.db.run(_set_streaming)
|
|
||||||
|
|
||||||
# # # # # # # # # stream functions # # # # # # # # #
|
# # # # # # # # # stream functions # # # # # # # # #
|
||||||
|
|
||||||
async def stream_exists(self, sd_hash: str) -> bool:
|
async def stream_exists(self, sd_hash: str) -> bool:
|
||||||
|
@ -459,11 +400,6 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
"s.stream_hash=f.stream_hash and s.sd_hash=?", sd_hash)
|
"s.stream_hash=f.stream_hash and s.sd_hash=?", sd_hash)
|
||||||
return streams is not None
|
return streams is not None
|
||||||
|
|
||||||
def rowid_for_stream(self, stream_hash: str) -> typing.Awaitable[typing.Optional[int]]:
|
|
||||||
return self.run_and_return_one_or_none(
|
|
||||||
"select rowid from file where stream_hash=?", stream_hash
|
|
||||||
)
|
|
||||||
|
|
||||||
def store_stream(self, sd_blob: 'BlobFile', descriptor: 'StreamDescriptor'):
|
def store_stream(self, sd_blob: 'BlobFile', descriptor: 'StreamDescriptor'):
|
||||||
return self.db.run(store_stream, sd_blob, descriptor)
|
return self.db.run(store_stream, sd_blob, descriptor)
|
||||||
|
|
||||||
|
@ -509,12 +445,6 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
"select stream_hash from stream where sd_hash = ?", sd_blob_hash
|
"select stream_hash from stream where sd_hash = ?", sd_blob_hash
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_stream_info_for_sd_hash(self, sd_blob_hash):
|
|
||||||
return self.run_and_return_one_or_none(
|
|
||||||
"select stream_hash, stream_name, suggested_filename, stream_key from stream where sd_hash = ?",
|
|
||||||
sd_blob_hash
|
|
||||||
)
|
|
||||||
|
|
||||||
def delete_stream(self, descriptor: 'StreamDescriptor'):
|
def delete_stream(self, descriptor: 'StreamDescriptor'):
|
||||||
return self.db.run_with_foreign_keys_disabled(delete_stream, descriptor)
|
return self.db.run_with_foreign_keys_disabled(delete_stream, descriptor)
|
||||||
|
|
||||||
|
@ -788,55 +718,6 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
claim['effective_amount'] = calculate_effective_amount(claim['amount'], supports)
|
claim['effective_amount'] = calculate_effective_amount(claim['amount'], supports)
|
||||||
return claim
|
return claim
|
||||||
|
|
||||||
async def get_claims_from_stream_hashes(self, stream_hashes: typing.List[str],
|
|
||||||
include_supports: typing.Optional[bool] = True):
|
|
||||||
claims = await self.db.run(get_claims_from_stream_hashes, stream_hashes)
|
|
||||||
return {stream_hash: claim_info.as_dict() for stream_hash, claim_info in claims.items()}
|
|
||||||
|
|
||||||
async def get_claim(self, claim_outpoint, include_supports=True):
|
|
||||||
claim_info = await self.db.run(get_content_claim_from_outpoint, claim_outpoint)
|
|
||||||
if not claim_info:
|
|
||||||
return
|
|
||||||
result = claim_info.as_dict()
|
|
||||||
if include_supports:
|
|
||||||
supports = await self.get_supports(result['claim_id'])
|
|
||||||
result['supports'] = supports
|
|
||||||
result['effective_amount'] = calculate_effective_amount(result['amount'], supports)
|
|
||||||
return result
|
|
||||||
|
|
||||||
def get_unknown_certificate_ids(self):
|
|
||||||
def _get_unknown_certificate_claim_ids(transaction):
|
|
||||||
return [
|
|
||||||
claim_id for (claim_id,) in transaction.execute(
|
|
||||||
"select distinct c1.channel_claim_id from claim as c1 "
|
|
||||||
"where c1.channel_claim_id!='' "
|
|
||||||
"and c1.channel_claim_id not in "
|
|
||||||
"(select c2.claim_id from claim as c2)"
|
|
||||||
).fetchall()
|
|
||||||
]
|
|
||||||
return self.db.run(_get_unknown_certificate_claim_ids)
|
|
||||||
|
|
||||||
async def get_pending_claim_outpoints(self):
|
|
||||||
claim_outpoints = await self.run_and_return_list("select claim_outpoint from claim where height=-1")
|
|
||||||
results = {} # {txid: [nout, ...]}
|
|
||||||
for outpoint_str in claim_outpoints:
|
|
||||||
txid, nout = outpoint_str.split(":")
|
|
||||||
outputs = results.get(txid, [])
|
|
||||||
outputs.append(int(nout))
|
|
||||||
results[txid] = outputs
|
|
||||||
if results:
|
|
||||||
log.debug("missing transaction heights for %i claims", len(results))
|
|
||||||
return results
|
|
||||||
|
|
||||||
def save_claim_tx_heights(self, claim_tx_heights):
|
|
||||||
def _save_claim_heights(transaction):
|
|
||||||
for outpoint, height in claim_tx_heights.items():
|
|
||||||
transaction.execute(
|
|
||||||
"update claim set height=? where claim_outpoint=? and height=-1",
|
|
||||||
(height, outpoint)
|
|
||||||
)
|
|
||||||
return self.db.run(_save_claim_heights)
|
|
||||||
|
|
||||||
# # # # # # # # # reflector functions # # # # # # # # #
|
# # # # # # # # # reflector functions # # # # # # # # #
|
||||||
|
|
||||||
def update_reflected_stream(self, sd_hash, reflector_address, success=True):
|
def update_reflected_stream(self, sd_hash, reflector_address, success=True):
|
||||||
|
|
|
@ -18,7 +18,7 @@ if typing.TYPE_CHECKING:
|
||||||
from lbry.blob.blob_info import BlobInfo
|
from lbry.blob.blob_info import BlobInfo
|
||||||
from lbry.dht.node import Node
|
from lbry.dht.node import Node
|
||||||
from lbry.extras.daemon.analytics import AnalyticsManager
|
from lbry.extras.daemon.analytics import AnalyticsManager
|
||||||
from lbry.wallet.transaction import Transaction
|
from lbry.wallet.transaction import Transaction, Output
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -68,7 +68,8 @@ class ManagedStream:
|
||||||
'saving',
|
'saving',
|
||||||
'finished_writing',
|
'finished_writing',
|
||||||
'started_writing',
|
'started_writing',
|
||||||
'finished_write_attempt'
|
'finished_write_attempt',
|
||||||
|
'claim_output'
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
|
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
|
||||||
|
@ -77,7 +78,8 @@ class ManagedStream:
|
||||||
download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
|
download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
|
||||||
descriptor: typing.Optional[StreamDescriptor] = None,
|
descriptor: typing.Optional[StreamDescriptor] = None,
|
||||||
content_fee: typing.Optional['Transaction'] = None,
|
content_fee: typing.Optional['Transaction'] = None,
|
||||||
analytics_manager: typing.Optional['AnalyticsManager'] = None):
|
analytics_manager: typing.Optional['AnalyticsManager'] = None,
|
||||||
|
output: typing.Optional['Output'] = None):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.config = config
|
self.config = config
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
|
@ -91,6 +93,7 @@ class ManagedStream:
|
||||||
self.content_fee = content_fee
|
self.content_fee = content_fee
|
||||||
self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
|
self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
|
||||||
self.analytics_manager = analytics_manager
|
self.analytics_manager = analytics_manager
|
||||||
|
self.claim_output = output
|
||||||
|
|
||||||
self.fully_reflected = asyncio.Event(loop=self.loop)
|
self.fully_reflected = asyncio.Event(loop=self.loop)
|
||||||
self.file_output_task: typing.Optional[asyncio.Task] = None
|
self.file_output_task: typing.Optional[asyncio.Task] = None
|
||||||
|
@ -123,6 +126,14 @@ class ManagedStream:
|
||||||
def written_bytes(self) -> int:
|
def written_bytes(self) -> int:
|
||||||
return 0 if not self.output_file_exists else os.stat(self.full_path).st_size
|
return 0 if not self.output_file_exists else os.stat(self.full_path).st_size
|
||||||
|
|
||||||
|
@property
|
||||||
|
def completed(self):
|
||||||
|
return self.written_bytes >= self.descriptor.lower_bound_decrypted_length()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def stream_url(self):
|
||||||
|
return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}"
|
||||||
|
|
||||||
async def update_status(self, status: str):
|
async def update_status(self, status: str):
|
||||||
assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
|
assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
|
||||||
self._status = status
|
self._status = status
|
||||||
|
@ -203,47 +214,6 @@ class ManagedStream:
|
||||||
def mime_type(self):
|
def mime_type(self):
|
||||||
return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0]
|
return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0]
|
||||||
|
|
||||||
def as_dict(self) -> typing.Dict:
|
|
||||||
full_path = self.full_path
|
|
||||||
file_name = self.file_name
|
|
||||||
download_directory = self.download_directory
|
|
||||||
if not self.output_file_exists:
|
|
||||||
full_path = None
|
|
||||||
file_name = None
|
|
||||||
download_directory = None
|
|
||||||
return {
|
|
||||||
'streaming_url': f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}",
|
|
||||||
'completed': self.written_bytes >= self.descriptor.lower_bound_decrypted_length(),
|
|
||||||
'file_name': file_name,
|
|
||||||
'download_directory': download_directory,
|
|
||||||
'points_paid': 0.0,
|
|
||||||
'stopped': not self.running,
|
|
||||||
'stream_hash': self.stream_hash,
|
|
||||||
'stream_name': self.descriptor.stream_name,
|
|
||||||
'suggested_file_name': self.descriptor.suggested_file_name,
|
|
||||||
'sd_hash': self.descriptor.sd_hash,
|
|
||||||
'download_path': full_path,
|
|
||||||
'mime_type': self.mime_type,
|
|
||||||
'key': self.descriptor.key,
|
|
||||||
'total_bytes_lower_bound': self.descriptor.lower_bound_decrypted_length(),
|
|
||||||
'total_bytes': self.descriptor.upper_bound_decrypted_length(),
|
|
||||||
'written_bytes': self.written_bytes,
|
|
||||||
'blobs_completed': self.blobs_completed,
|
|
||||||
'blobs_in_stream': self.blobs_in_stream,
|
|
||||||
'blobs_remaining': self.blobs_remaining,
|
|
||||||
'status': self.status,
|
|
||||||
'claim_id': self.claim_id,
|
|
||||||
'txid': self.txid,
|
|
||||||
'nout': self.nout,
|
|
||||||
'outpoint': self.outpoint,
|
|
||||||
'metadata': self.metadata,
|
|
||||||
'protobuf': self.metadata_protobuf,
|
|
||||||
'channel_claim_id': self.channel_claim_id,
|
|
||||||
'channel_name': self.channel_name,
|
|
||||||
'claim_name': self.claim_name,
|
|
||||||
'content_fee': self.content_fee
|
|
||||||
}
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
|
async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
|
||||||
file_path: str, key: typing.Optional[bytes] = None,
|
file_path: str, key: typing.Optional[bytes] = None,
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import os
|
import os
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import time
|
||||||
import typing
|
import typing
|
||||||
import binascii
|
import binascii
|
||||||
import logging
|
import logging
|
||||||
|
@ -126,12 +127,15 @@ class StreamManager:
|
||||||
claim, content_fee=content_fee, rowid=rowid, descriptor=descriptor,
|
claim, content_fee=content_fee, rowid=rowid, descriptor=descriptor,
|
||||||
analytics_manager=self.analytics_manager
|
analytics_manager=self.analytics_manager
|
||||||
)
|
)
|
||||||
|
tx = await self.wallet.get_transaction(claim.txid, save_missing=True)
|
||||||
|
stream.claim_output = tx.outputs[claim.nout]
|
||||||
self.streams[sd_hash] = stream
|
self.streams[sd_hash] = stream
|
||||||
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
|
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
|
||||||
|
|
||||||
async def load_and_resume_streams_from_database(self):
|
async def load_and_resume_streams_from_database(self):
|
||||||
to_recover = []
|
to_recover = []
|
||||||
to_start = []
|
to_start = []
|
||||||
|
start = time.time()
|
||||||
|
|
||||||
await self.storage.update_manually_removed_files_since_last_run()
|
await self.storage.update_manually_removed_files_since_last_run()
|
||||||
|
|
||||||
|
@ -161,6 +165,7 @@ class StreamManager:
|
||||||
if add_stream_tasks:
|
if add_stream_tasks:
|
||||||
await asyncio.gather(*add_stream_tasks, loop=self.loop)
|
await asyncio.gather(*add_stream_tasks, loop=self.loop)
|
||||||
log.info("Started stream manager with %i files", len(self.streams))
|
log.info("Started stream manager with %i files", len(self.streams))
|
||||||
|
log.info("took %s seconds", time.time() - start)
|
||||||
if not self.node:
|
if not self.node:
|
||||||
log.warning("no DHT node given, resuming downloads trusting that we can contact reflector")
|
log.warning("no DHT node given, resuming downloads trusting that we can contact reflector")
|
||||||
if to_resume_saving:
|
if to_resume_saving:
|
||||||
|
|
|
@ -194,7 +194,7 @@ class LbryWalletManager(BaseWalletManager):
|
||||||
await account.ledger.broadcast(tx)
|
await account.ledger.broadcast(tx)
|
||||||
return tx
|
return tx
|
||||||
|
|
||||||
async def get_transaction(self, txid):
|
async def get_transaction(self, txid, save_missing=False):
|
||||||
tx = await self.db.get_transaction(txid=txid)
|
tx = await self.db.get_transaction(txid=txid)
|
||||||
if not tx:
|
if not tx:
|
||||||
try:
|
try:
|
||||||
|
@ -206,6 +206,8 @@ class LbryWalletManager(BaseWalletManager):
|
||||||
return {'success': False, 'code': e.code, 'message': e.message}
|
return {'success': False, 'code': e.code, 'message': e.message}
|
||||||
tx = self.ledger.transaction_class(unhexlify(raw))
|
tx = self.ledger.transaction_class(unhexlify(raw))
|
||||||
await self.ledger.maybe_verify_transaction(tx, height)
|
await self.ledger.maybe_verify_transaction(tx, height)
|
||||||
|
if save_missing:
|
||||||
|
await self.db.insert_transaction(tx)
|
||||||
return tx
|
return tx
|
||||||
|
|
||||||
def save(self):
|
def save(self):
|
||||||
|
|
|
@ -43,11 +43,11 @@ class TestManagedStream(BlobExchangeTestBase):
|
||||||
async def test_status_file_completed(self):
|
async def test_status_file_completed(self):
|
||||||
await self._test_transfer_stream(10)
|
await self._test_transfer_stream(10)
|
||||||
self.assertTrue(self.stream.output_file_exists)
|
self.assertTrue(self.stream.output_file_exists)
|
||||||
self.assertTrue(self.stream.as_dict()['completed'])
|
self.assertTrue(self.stream.completed)
|
||||||
with open(self.stream.full_path, 'w+b') as outfile:
|
with open(self.stream.full_path, 'w+b') as outfile:
|
||||||
outfile.truncate(1)
|
outfile.truncate(1)
|
||||||
self.assertTrue(self.stream.output_file_exists)
|
self.assertTrue(self.stream.output_file_exists)
|
||||||
self.assertFalse(self.stream.as_dict()['completed'])
|
self.assertFalse(self.stream.completed)
|
||||||
|
|
||||||
async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None, stop_when_done=True):
|
async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None, stop_when_done=True):
|
||||||
await self.setup_stream(blob_count)
|
await self.setup_stream(blob_count)
|
||||||
|
|
Loading…
Add table
Reference in a new issue