Merge branch 'master' into master

This commit is contained in:
endes123321 2020-04-19 19:54:24 +01:00 committed by GitHub
commit 35e8ce60a9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 1046 additions and 327 deletions

View file

@ -6,7 +6,7 @@ jobs:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.7'
@ -17,7 +17,7 @@ jobs:
name: "tests / unit"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.7'
@ -37,12 +37,14 @@ jobs:
- blockchain
- other
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.7'
- if: matrix.test == 'other'
run: sudo apt install -y --no-install-recommends ffmpeg
run: |
sudo apt-get update
sudo apt-get install -y --no-install-recommends ffmpeg
- run: pip install tox-travis
- run: tox -e ${{ matrix.test }}
@ -57,7 +59,7 @@ jobs:
- windows-latest
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.7'

File diff suppressed because one or more lines are too long

View file

@ -1,2 +1,2 @@
__version__ = "0.66.0"
__version__ = "0.69.1"
version = tuple(map(int, __version__.split('.'))) # pylint: disable=invalid-name

View file

@ -602,8 +602,6 @@ class Config(CLIConfig):
# blockchain
blockchain_name = String("Blockchain name - lbrycrd_main, lbrycrd_regtest, or lbrycrd_testnet", 'lbrycrd_main')
s3_headers_depth = Integer("download headers from s3 when the local height is more than 10 chunks behind", 96 * 10)
cache_time = Integer("Time to cache resolved claims", 150) # TODO: use this
# daemon
save_files = Toggle("Save downloaded files when calling `get` by default", True)

View file

@ -158,11 +158,14 @@ class ComponentManager:
for component in self.components
}
def get_component(self, component_name):
def get_actual_component(self, component_name):
for component in self.components:
if component.component_name == component_name:
return component.component
return component
raise NameError(component_name)
def get_component(self, component_name):
return self.get_actual_component(component_name).component
def has_component(self, component_name):
return any(component for component in self.components if component_name == component.component_name)

View file

@ -329,6 +329,9 @@ class Daemon(metaclass=JSONRPCServerType):
prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
self.metrics_runner = web.AppRunner(prom_app)
self.need_connection_status_refresh = asyncio.Event()
self._connection_status_task: Optional[asyncio.Task] = None
@property
def dht_node(self) -> typing.Optional['Node']:
return self.component_manager.get_component(DHT_COMPONENT)
@ -441,18 +444,25 @@ class Daemon(metaclass=JSONRPCServerType):
log.warning("detected internet connection was lost")
self._connection_status = (self.component_manager.loop.time(), connected)
async def get_connection_status(self) -> str:
if self._connection_status[0] + 300 > self.component_manager.loop.time():
if not self._connection_status[1]:
await self.update_connection_status()
else:
async def keep_connection_status_up_to_date(self):
while True:
try:
await asyncio.wait_for(self.need_connection_status_refresh.wait(), 300)
except asyncio.TimeoutError:
pass
await self.update_connection_status()
return CONNECTION_STATUS_CONNECTED if self._connection_status[1] else CONNECTION_STATUS_NETWORK
self.need_connection_status_refresh.clear()
async def start(self):
log.info("Starting LBRYNet Daemon")
log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2))
log.info("Platform: %s", json.dumps(self.platform_info, indent=2))
self.need_connection_status_refresh.set()
self._connection_status_task = self.component_manager.loop.create_task(
self.keep_connection_status_up_to_date()
)
await self.analytics_manager.send_server_startup()
await self.rpc_runner.setup()
await self.streaming_runner.setup()
@ -511,6 +521,10 @@ class Daemon(metaclass=JSONRPCServerType):
await self.component_startup_task
async def stop(self):
if self._connection_status_task:
if not self._connection_status_task.done():
self._connection_status_task.cancel()
self._connection_status_task = None
if self.component_startup_task is not None:
if self.component_startup_task.done():
await self.component_manager.stop()
@ -785,7 +799,7 @@ class Daemon(metaclass=JSONRPCServerType):
'analyze_audio_volume': (bool) should ffmpeg analyze audio
}
"""
return await self._video_file_analyzer.status(reset=True)
return await self._video_file_analyzer.status(reset=True, recheck=True)
async def jsonrpc_status(self):
"""
@ -875,14 +889,16 @@ class Daemon(metaclass=JSONRPCServerType):
}
"""
connection_code = await self.get_connection_status()
if not self._connection_status[1]:
self.need_connection_status_refresh.set()
connection_code = CONNECTION_STATUS_CONNECTED if self._connection_status[1] else CONNECTION_STATUS_NETWORK
ffmpeg_status = await self._video_file_analyzer.status()
running_components = self.component_manager.get_components_status()
response = {
'installation_id': self.installation_id,
'is_running': all(self.component_manager.get_components_status().values()),
'is_running': all(running_components.values()),
'skipped_components': self.component_manager.skip_components,
'startup_status': self.component_manager.get_components_status(),
'startup_status': running_components,
'connection_status': {
'code': connection_code,
'message': CONNECTION_MESSAGES[connection_code],
@ -1325,6 +1341,8 @@ class Daemon(metaclass=JSONRPCServerType):
Returns:
Dictionary of wallet status information.
"""
if self.wallet_manager is None:
return {'is_encrypted': None, 'is_syncing': None, 'is_locked': None}
wallet = self.wallet_manager.get_wallet_or_default(wallet_id)
return {
'is_encrypted': wallet.is_encrypted,
@ -1899,9 +1917,8 @@ class Daemon(metaclass=JSONRPCServerType):
"""
@requires(STREAM_MANAGER_COMPONENT)
async def jsonrpc_file_list(
self, sort=None, reverse=False, comparison=None,
wallet_id=None, page=None, page_size=None, **kwargs):
async def jsonrpc_file_list(self, sort=None, reverse=False, comparison=None, wallet_id=None, page=None,
page_size=None, **kwargs):
"""
List files limited by optional filters
@ -1922,17 +1939,17 @@ class Daemon(metaclass=JSONRPCServerType):
--stream_hash=<stream_hash> : (str) get file with matching stream hash
--rowid=<rowid> : (int) get file with matching row id
--added_on=<added_on> : (int) get file with matching time of insertion
--claim_id=<claim_id> : (str) get file with matching claim id
--outpoint=<outpoint> : (str) get file with matching claim outpoint
--claim_id=<claim_id> : (str) get file with matching claim id(s)
--outpoint=<outpoint> : (str) get file with matching claim outpoint(s)
--txid=<txid> : (str) get file with matching claim txid
--nout=<nout> : (int) get file with matching claim nout
--channel_claim_id=<channel_claim_id> : (str) get file with matching channel claim id
--channel_claim_id=<channel_claim_id> : (str) get file with matching channel claim id(s)
--channel_name=<channel_name> : (str) get file with matching channel name
--claim_name=<claim_name> : (str) get file with matching claim name
--blobs_in_stream<blobs_in_stream> : (int) get file with matching blobs in stream
--blobs_remaining=<blobs_remaining> : (int) amount of remaining blobs to download
--sort=<sort_by> : (str) field to sort by (one of the above filter fields)
--comparison=<comparison> : (str) logical comparison, (eq | ne | g | ge | l | le)
--comparison=<comparison> : (str) logical comparison, (eq | ne | g | ge | l | le | in)
--page=<page> : (int) page to return during paginating
--page_size=<page_size> : (int) number of items on page during pagination
--wallet_id=<wallet_id> : (str) add purchase receipts from this wallet
@ -1942,6 +1959,7 @@ class Daemon(metaclass=JSONRPCServerType):
wallet = self.wallet_manager.get_wallet_or_default(wallet_id)
sort = sort or 'rowid'
comparison = comparison or 'eq'
paginated = paginate_list(
self.stream_manager.get_filtered_streams(sort, reverse, comparison, **kwargs), page, page_size
)
@ -2195,7 +2213,7 @@ class Daemon(metaclass=JSONRPCServerType):
List my stream and channel claims.
Usage:
claim_list [--claim_type=<claim_type>...] [--claim_id=<claim_id>...] [--name=<name>...]
claim_list [--claim_type=<claim_type>...] [--claim_id=<claim_id>...] [--name=<name>...] [--is_spent]
[--channel_id=<channel_id>...] [--account_id=<account_id>] [--wallet_id=<wallet_id>]
[--page=<page>] [--page_size=<page_size>]
[--resolve] [--order_by=<order_by>] [--no_totals] [--include_received_tips]
@ -2205,6 +2223,7 @@ class Daemon(metaclass=JSONRPCServerType):
--claim_id=<claim_id> : (str or list) claim id
--channel_id=<channel_id> : (str or list) streams in this channel
--name=<name> : (str or list) claim name
--is_spent : (bool) shows previous claim updates and abandons
--account_id=<account_id> : (str) id of the account to query
--wallet_id=<wallet_id> : (str) restrict results to specific wallet
--page=<page> : (int) page to return during paginating
@ -2218,7 +2237,8 @@ class Daemon(metaclass=JSONRPCServerType):
Returns: {Paginated[Output]}
"""
kwargs['type'] = claim_type or CLAIM_TYPE_NAMES
kwargs['unspent'] = True
if 'is_spent' not in kwargs:
kwargs['is_not_spent'] = True
return self.jsonrpc_txo_list(**kwargs)
@requires(WALLET_COMPONENT)
@ -2732,12 +2752,13 @@ class Daemon(metaclass=JSONRPCServerType):
Usage:
channel_list [<account_id> | --account_id=<account_id>] [--wallet_id=<wallet_id>]
[--name=<name>...] [--claim_id=<claim_id>...]
[--name=<name>...] [--claim_id=<claim_id>...] [--is_spent]
[--page=<page>] [--page_size=<page_size>] [--resolve] [--no_totals]
Options:
--name=<name> : (str or list) channel name
--claim_id=<claim_id> : (str or list) channel id
--is_spent : (bool) shows previous channel updates and abandons
--account_id=<account_id> : (str) id of the account to use
--wallet_id=<wallet_id> : (str) restrict results to specific wallet
--page=<page> : (int) page to return during paginating
@ -2749,7 +2770,8 @@ class Daemon(metaclass=JSONRPCServerType):
Returns: {Paginated[Output]}
"""
kwargs['type'] = 'channel'
kwargs['unspent'] = True
if 'is_spent' not in kwargs:
kwargs['is_not_spent'] = True
return self.jsonrpc_txo_list(*args, **kwargs)
@requires(WALLET_COMPONENT)
@ -3486,12 +3508,13 @@ class Daemon(metaclass=JSONRPCServerType):
Usage:
stream_list [<account_id> | --account_id=<account_id>] [--wallet_id=<wallet_id>]
[--name=<name>...] [--claim_id=<claim_id>...]
[--name=<name>...] [--claim_id=<claim_id>...] [--is_spent]
[--page=<page>] [--page_size=<page_size>] [--resolve] [--no_totals]
Options:
--name=<name> : (str or list) stream name
--claim_id=<claim_id> : (str or list) stream id
--is_spent : (bool) shows previous stream updates and abandons
--account_id=<account_id> : (str) id of the account to query
--wallet_id=<wallet_id> : (str) restrict results to specific wallet
--page=<page> : (int) page to return during paginating
@ -3503,7 +3526,8 @@ class Daemon(metaclass=JSONRPCServerType):
Returns: {Paginated[Output]}
"""
kwargs['type'] = 'stream'
kwargs['unspent'] = True
if 'is_spent' not in kwargs:
kwargs['is_not_spent'] = True
return self.jsonrpc_txo_list(*args, **kwargs)
@requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT,
@ -3950,19 +3974,23 @@ class Daemon(metaclass=JSONRPCServerType):
return tx
@requires(WALLET_COMPONENT)
def jsonrpc_support_list(self, *args, tips=None, **kwargs):
def jsonrpc_support_list(self, *args, received=False, sent=False, staked=False, **kwargs):
"""
List supports and tips in my control.
List staked supports and sent/received tips.
Usage:
support_list [<account_id> | --account_id=<account_id>] [--wallet_id=<wallet_id>]
[--name=<name>...] [--claim_id=<claim_id>...] [--tips]
[--name=<name>...] [--claim_id=<claim_id>...]
[--received | --sent | --staked] [--is_spent]
[--page=<page>] [--page_size=<page_size>] [--no_totals]
Options:
--name=<name> : (str or list) claim name
--claim_id=<claim_id> : (str or list) claim id
--tips : (bool) only show tips
--received : (bool) only show received (tips)
--sent : (bool) only show sent (tips)
--staked : (bool) only show my staked supports
--is_spent : (bool) show abandoned supports
--account_id=<account_id> : (str) id of the account to query
--wallet_id=<wallet_id> : (str) restrict results to specific wallet
--page=<page> : (int) page to return during paginating
@ -3973,9 +4001,20 @@ class Daemon(metaclass=JSONRPCServerType):
Returns: {Paginated[Output]}
"""
kwargs['type'] = 'support'
kwargs['unspent'] = True
if tips is True:
if 'is_spent' not in kwargs:
kwargs['is_not_spent'] = True
if received:
kwargs['is_not_my_input'] = True
kwargs['is_my_output'] = True
elif sent:
kwargs['is_my_input'] = True
kwargs['is_not_my_output'] = True
# spent for not my outputs is undetermined
kwargs.pop('is_spent', None)
kwargs.pop('is_not_spent', None)
elif staked:
kwargs['is_my_input'] = True
kwargs['is_my_output'] = True
return self.jsonrpc_txo_list(*args, **kwargs)
@requires(WALLET_COMPONENT)
@ -4150,11 +4189,15 @@ class Daemon(metaclass=JSONRPCServerType):
@staticmethod
def _constrain_txo_from_kwargs(
constraints, type=None, txid=None, # pylint: disable=redefined-builtin
claim_id=None, channel_id=None, name=None, unspent=False, reposted_claim_id=None,
claim_id=None, channel_id=None, name=None, reposted_claim_id=None,
is_spent=False, is_not_spent=False,
is_my_input_or_output=None, exclude_internal_transfers=False,
is_my_output=None, is_not_my_output=None,
is_my_input=None, is_not_my_input=None):
constraints['unspent'] = unspent
if is_spent:
constraints['is_spent'] = True
elif is_not_spent:
constraints['is_spent'] = False
constraints['exclude_internal_transfers'] = exclude_internal_transfers
if is_my_input_or_output is True:
constraints['is_my_input_or_output'] = True
@ -4183,8 +4226,9 @@ class Daemon(metaclass=JSONRPCServerType):
List my transaction outputs.
Usage:
txo_list [--account_id=<account_id>] [--type=<type>...] [--txid=<txid>...] [--unspent]
txo_list [--account_id=<account_id>] [--type=<type>...] [--txid=<txid>...]
[--claim_id=<claim_id>...] [--channel_id=<channel_id>...] [--name=<name>...]
[--is_spent | --is_not_spent]
[--is_my_input_or_output |
[[--is_my_output | --is_not_my_output] [--is_my_input | --is_not_my_input]]
]
@ -4199,7 +4243,8 @@ class Daemon(metaclass=JSONRPCServerType):
--claim_id=<claim_id> : (str or list) claim id
--channel_id=<channel_id> : (str or list) claims in this channel
--name=<name> : (str or list) claim name
--unspent : (bool) hide spent outputs, show only unspent ones
--is_spent : (bool) only show spent txos
--is_not_spent : (bool) only show not spent txos
--is_my_input_or_output : (bool) txos which have your inputs or your outputs,
if using this flag the other related flags
are ignored (--is_my_output, --is_my_input, etc)
@ -4248,6 +4293,63 @@ class Daemon(metaclass=JSONRPCServerType):
self._constrain_txo_from_kwargs(constraints, **kwargs)
return paginate_rows(claims, None if no_totals else claim_count, page, page_size, **constraints)
@requires(WALLET_COMPONENT)
async def jsonrpc_txo_spend(
self, account_id=None, wallet_id=None, batch_size=500,
include_full_tx=False, preview=False, blocking=False, **kwargs):
"""
Spend transaction outputs, batching into multiple transactions as necessary.
Usage:
txo_spend [--account_id=<account_id>] [--type=<type>...] [--txid=<txid>...]
[--claim_id=<claim_id>...] [--channel_id=<channel_id>...] [--name=<name>...]
[--is_my_input | --is_not_my_input]
[--exclude_internal_transfers] [--wallet_id=<wallet_id>]
[--preview] [--blocking] [--batch_size=<batch_size>] [--include_full_tx]
Options:
--type=<type> : (str or list) claim type: stream, channel, support,
purchase, collection, repost, other
--txid=<txid> : (str or list) transaction id of outputs
--claim_id=<claim_id> : (str or list) claim id
--channel_id=<channel_id> : (str or list) claims in this channel
--name=<name> : (str or list) claim name
--is_my_input : (bool) show outputs created by you
--is_not_my_input : (bool) show outputs not created by you
--exclude_internal_transfers: (bool) excludes any outputs that are exactly this combination:
"--is_my_input --is_my_output --type=other"
this allows to exclude "change" payments, this
flag can be used in combination with any of the other flags
--account_id=<account_id> : (str) id of the account to query
--wallet_id=<wallet_id> : (str) restrict results to specific wallet
--preview : (bool) do not broadcast the transaction
--blocking : (bool) wait until abandon is in mempool
--batch_size=<batch_size> : (int) number of txos to spend per transactions
--include_full_tx : (bool) include entire tx in output and not just the txid
Returns: {List[Transaction]}
"""
wallet = self.wallet_manager.get_wallet_or_default(wallet_id)
accounts = [wallet.get_account_or_error(account_id)] if account_id else wallet.accounts
txos = await self.ledger.get_txos(
wallet=wallet, accounts=accounts, read_only=True,
**self._constrain_txo_from_kwargs({}, is_not_spent=True, is_my_output=True, **kwargs)
)
txs = []
while txos:
txs.append(
await Transaction.create(
[Input.spend(txos.pop()) for _ in range(min(len(txos), batch_size))],
[], accounts, accounts[0]
)
)
if not preview:
for tx in txs:
await self.broadcast_or_release(tx, blocking)
if include_full_tx:
return txs
return [{'txid': tx.id} for tx in txs]
@requires(WALLET_COMPONENT)
def jsonrpc_txo_sum(self, account_id=None, wallet_id=None, **kwargs):
"""
@ -4255,7 +4357,8 @@ class Daemon(metaclass=JSONRPCServerType):
Usage:
txo_list [--account_id=<account_id>] [--type=<type>...] [--txid=<txid>...]
[--claim_id=<claim_id>...] [--name=<name>...] [--unspent]
[--claim_id=<claim_id>...] [--name=<name>...]
[--is_spent] [--is_not_spent]
[--is_my_input_or_output |
[[--is_my_output | --is_not_my_output] [--is_my_input | --is_not_my_input]]
]
@ -4267,7 +4370,8 @@ class Daemon(metaclass=JSONRPCServerType):
--txid=<txid> : (str or list) transaction id of outputs
--claim_id=<claim_id> : (str or list) claim id
--name=<name> : (str or list) claim name
--unspent : (bool) hide spent outputs, show only unspent ones
--is_spent : (bool) only show spent txos
--is_not_spent : (bool) only show not spent txos
--is_my_input_or_output : (bool) txos which have your inputs or your outputs,
if using this flag the other related flags
are ignored (--is_my_output, --is_my_input, etc)
@ -4299,7 +4403,7 @@ class Daemon(metaclass=JSONRPCServerType):
Usage:
txo_plot [--account_id=<account_id>] [--type=<type>...] [--txid=<txid>...]
[--claim_id=<claim_id>...] [--name=<name>...] [--unspent]
[--claim_id=<claim_id>...] [--name=<name>...] [--is_spent] [--is_not_spent]
[--is_my_input_or_output |
[[--is_my_output | --is_not_my_output] [--is_my_input | --is_not_my_input]]
]
@ -4314,7 +4418,8 @@ class Daemon(metaclass=JSONRPCServerType):
--txid=<txid> : (str or list) transaction id of outputs
--claim_id=<claim_id> : (str or list) claim id
--name=<name> : (str or list) claim name
--unspent : (bool) hide spent outputs, show only unspent ones
--is_spent : (bool) only show spent txos
--is_not_spent : (bool) only show not spent txos
--is_my_input_or_output : (bool) txos which have your inputs or your outputs,
if using this flag the other related flags
are ignored (--is_my_output, --is_my_input, etc)
@ -4371,7 +4476,7 @@ class Daemon(metaclass=JSONRPCServerType):
Returns: {Paginated[Output]}
"""
kwargs['type'] = ['other', 'purchase']
kwargs['unspent'] = True
kwargs['is_not_spent'] = True
return self.jsonrpc_txo_list(*args, **kwargs)
@requires(WALLET_COMPONENT)
@ -5049,10 +5154,11 @@ class Daemon(metaclass=JSONRPCServerType):
--comment_ids=<comment_ids> : (str, list) one or more comment_id to hide.
--wallet_id=<wallet_id> : (str) restrict operation to specific wallet
Returns:
(dict) keyed by comment_id, containing success info
'<comment_id>': {
"hidden": (bool) flag indicating if comment_id was hidden
Returns: lists containing the ids comments that are hidden and visible.
{
"hidden": (list) IDs of hidden comments.
"visible": (list) IDs of visible comments.
}
"""
wallet = self.wallet_manager.get_wallet_or_default(wallet_id)
@ -5063,6 +5169,7 @@ class Daemon(metaclass=JSONRPCServerType):
comments = await comment_client.jsonrpc_post(
self.conf.comment_server, 'get_comments_by_id', comment_ids=comment_ids
)
comments = comments['items']
claim_ids = {comment['claim_id'] for comment in comments}
claims = {cid: await self.ledger.get_claim_by_claim_id(wallet.accounts, claim_id=cid) for cid in claim_ids}
pieces = []

View file

@ -108,7 +108,8 @@ def encode_file_doc():
'metadata': '(dict) None if claim is not found else the claim metadata',
'channel_claim_id': '(str) None if claim is not found or not signed',
'channel_name': '(str) None if claim is not found or not signed',
'claim_name': '(str) None if claim is not found else the claim name'
'claim_name': '(str) None if claim is not found else the claim name',
'reflector_progress': '(int) reflector upload progress, 0 to 100'
}
@ -307,7 +308,8 @@ class JSONResponseEncoder(JSONEncoder):
'height': tx_height,
'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height,
'timestamp': self.ledger.headers.estimated_timestamp(tx_height),
'is_fully_reflected': managed_stream.is_fully_reflected
'is_fully_reflected': managed_stream.is_fully_reflected,
'reflector_progress': managed_stream.reflector_progress
}
def encode_claim(self, claim):

View file

@ -8,6 +8,7 @@ import re
import shlex
import shutil
import subprocess
from math import ceil
import lbry.utils
from lbry.conf import TranscodeConfig
@ -30,6 +31,7 @@ class VideoFileAnalyzer:
self._which_ffmpeg = None
self._which_ffprobe = None
self._env_copy = dict(os.environ)
self._checked_ffmpeg = False
if lbry.utils.is_running_from_bundle():
# handle the situation where PyInstaller overrides our runtime environment:
self._replace_or_pop_env('LD_LIBRARY_PATH')
@ -72,6 +74,10 @@ class VideoFileAnalyzer:
log.debug("Using %s at %s", version.splitlines()[0].split(" Copyright")[0], self._which_ffmpeg)
return version
@staticmethod
def _which_ffmpeg_and_ffmprobe(path):
return shutil.which("ffmpeg", path=path), shutil.which("ffprobe", path=path)
async def _verify_ffmpeg_installed(self):
if self._ffmpeg_installed:
return
@ -80,29 +86,33 @@ class VideoFileAnalyzer:
if hasattr(self._conf, "data_dir"):
path += os.path.pathsep + os.path.join(getattr(self._conf, "data_dir"), "ffmpeg", "bin")
path += os.path.pathsep + self._env_copy.get("PATH", "")
self._which_ffmpeg = shutil.which("ffmpeg", path=path)
self._which_ffmpeg, self._which_ffprobe = await asyncio.get_running_loop().run_in_executor(
None, self._which_ffmpeg_and_ffmprobe, path
)
if not self._which_ffmpeg:
log.warning("Unable to locate ffmpeg executable. Path: %s", path)
raise FileNotFoundError(f"Unable to locate ffmpeg executable. Path: {path}")
self._which_ffprobe = shutil.which("ffprobe", path=path)
if not self._which_ffprobe:
log.warning("Unable to locate ffprobe executable. Path: %s", path)
raise FileNotFoundError(f"Unable to locate ffprobe executable. Path: {path}")
if os.path.dirname(self._which_ffmpeg) != os.path.dirname(self._which_ffprobe):
log.warning("ffmpeg and ffprobe are in different folders!")
await self._verify_executables()
self._ffmpeg_installed = True
async def status(self, reset=False):
async def status(self, reset=False, recheck=False):
if reset:
self._available_encoders = ""
self._ffmpeg_installed = None
if self._ffmpeg_installed is None:
if self._checked_ffmpeg and not recheck:
pass
elif self._ffmpeg_installed is None:
try:
await self._verify_ffmpeg_installed()
except FileNotFoundError:
pass
self._checked_ffmpeg = True
return {
"available": self._ffmpeg_installed,
"which": self._which_ffmpeg,
@ -345,7 +355,7 @@ class VideoFileAnalyzer:
def _build_spec(scan_data):
assert scan_data
duration = float(scan_data["format"]["duration"]) # existence verified when scan_data made
duration = ceil(float(scan_data["format"]["duration"])) # existence verified when scan_data made
width = -1
height = -1
for stream in scan_data["streams"]:
@ -354,7 +364,7 @@ class VideoFileAnalyzer:
width = max(width, int(stream["width"]))
height = max(height, int(stream["height"]))
log.debug(" Detected duration: %f sec. with resolution: %d x %d", duration, width, height)
log.debug(" Detected duration: %d sec. with resolution: %d x %d", duration, width, height)
spec = {"duration": duration}
if height >= 0:

View file

@ -65,6 +65,7 @@ class ManagedStream:
'downloader',
'analytics_manager',
'fully_reflected',
'reflector_progress',
'file_output_task',
'delayed_stop_task',
'streaming_responses',
@ -101,6 +102,7 @@ class ManagedStream:
self.analytics_manager = analytics_manager
self.fully_reflected = asyncio.Event(loop=self.loop)
self.reflector_progress = 0
self.file_output_task: typing.Optional[asyncio.Task] = None
self.delayed_stop_task: typing.Optional[asyncio.Task] = None
self.streaming_responses: typing.List[typing.Tuple[Request, StreamResponse]] = []
@ -445,9 +447,10 @@ class ManagedStream:
]
log.info("we have %i/%i needed blobs needed by reflector for lbry://%s#%s", len(we_have), len(needed),
self.claim_name, self.claim_id)
for blob_hash in we_have:
for i, blob_hash in enumerate(we_have):
await protocol.send_blob(blob_hash)
sent.append(blob_hash)
self.reflector_progress = int((i + 1) / len(we_have) * 100)
except (asyncio.TimeoutError, ValueError):
return sent
except ConnectionRefusedError:

View file

@ -23,6 +23,9 @@ if typing.TYPE_CHECKING:
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
from lbry.wallet.transaction import Transaction
from lbry.wallet.manager import WalletManager
from lbry.wallet.wallet import Wallet
log = logging.getLogger(__name__)
@ -46,6 +49,12 @@ FILTER_FIELDS = [
'blobs_in_stream'
]
SET_FILTER_FIELDS = {
"claim_ids": "claim_id",
"channel_claim_ids": "channel_claim_id",
"outpoints": "outpoint"
}
COMPARISON_OPERATORS = {
'eq': lambda a, b: a == b,
'ne': lambda a, b: a != b,
@ -53,6 +62,7 @@ COMPARISON_OPERATORS = {
'l': lambda a, b: a < b,
'ge': lambda a, b: a >= b,
'le': lambda a, b: a <= b,
'in': lambda a, b: a in b
}
@ -276,15 +286,34 @@ class StreamManager:
raise ValueError(f"'{comparison}' is not a valid comparison")
if 'full_status' in search_by:
del search_by['full_status']
for search in search_by:
if search not in FILTER_FIELDS:
raise ValueError(f"'{search}' is not a valid search operation")
compare_sets = {}
if isinstance(search_by.get('claim_id'), list):
compare_sets['claim_ids'] = search_by.pop('claim_id')
if isinstance(search_by.get('outpoint'), list):
compare_sets['outpoints'] = search_by.pop('outpoint')
if isinstance(search_by.get('channel_claim_id'), list):
compare_sets['channel_claim_ids'] = search_by.pop('channel_claim_id')
if search_by:
comparison = comparison or 'eq'
streams = []
for stream in self.streams.values():
matched = False
for set_search, val in compare_sets.items():
if COMPARISON_OPERATORS[comparison](getattr(stream, SET_FILTER_FIELDS[set_search]), val):
streams.append(stream)
matched = True
break
if matched:
continue
for search, val in search_by.items():
if COMPARISON_OPERATORS[comparison](getattr(stream, search), val):
this_stream = getattr(stream, search)
if COMPARISON_OPERATORS[comparison](this_stream, val):
streams.append(stream)
break
else:

View file

@ -565,6 +565,14 @@ class CommandTestCase(IntegrationTestCase):
self.daemon.jsonrpc_wallet_send(*args, **kwargs), confirm
)
async def txo_spend(self, *args, confirm=True, **kwargs):
txs = await self.daemon.jsonrpc_txo_spend(*args, **kwargs)
if confirm:
await asyncio.wait([self.ledger.wait(tx) for tx in txs])
await self.generate(1)
await asyncio.wait([self.ledger.wait(tx, self.blockchain.block_expected) for tx in txs])
return self.sout(txs)
async def resolve(self, uri, **kwargs):
return (await self.out(self.daemon.jsonrpc_resolve(uri, **kwargs)))[uri]

View file

@ -734,4 +734,10 @@ HASHES = {
732000: '53e1b373805f3236c7725415e872d5635b8679894c4fb630c62b6b75b4ec9d9c',
733000: '43e9ab6cf54fde5dcdc4c473af26b256435f4af4254d96fa728f2af9b078d630',
734000: 'a3ef7f9257d591c7dcc0f82346cb162a768ee5fe1228353ec485e69be1bf585f',
735000: '9bc81abb6c9294463d7fa12b9ceea4f929a5491cf4b6ff8e47e0a95b02c6d355',
736000: 'a3b391ecba546ebbbe6e05c5222beca269e5dce6e508028ea41725fef138b687',
737000: '0f2e4e43c76b3bf6fc6db9b87adb9a17a05e85110dcb923442746a00446e513a',
738000: 'aebdf15b23eb7a37600f67d45bf6586b1d5bff3d5f3459adc2f6211ab3dd0bcb',
739000: '3f5a894ac42f95f7d54ce25c42ea0baf1a05b2da0e9406978de0dc53484d8b04',
740000: '55debc22f995d844eafa0a90296c9f4f433e2b7f38456fff45dd3c66cef04e37',
}

View file

@ -694,7 +694,7 @@ class Database(SQLiteMixin):
self, cols, accounts=None, is_my_input=None, is_my_output=True,
is_my_input_or_output=None, exclude_internal_transfers=False,
include_is_spent=False, include_is_my_input=False,
read_only=False, **constraints):
is_spent=None, read_only=False, **constraints):
for rename_col in ('txid', 'txoid'):
for rename_constraint in (rename_col, rename_col+'__in', rename_col+'__not_in'):
if rename_constraint in constraints:
@ -733,27 +733,23 @@ class Database(SQLiteMixin):
include_is_my_input = True
constraints['exclude_internal_payments__or'] = {
'txo.txo_type__not': TXO_TYPES['other'],
'txo.address__not_in': my_addresses,
'txi.address__is_null': True,
'txi.address__not_in': my_addresses
'txi.address__not_in': my_addresses,
}
sql = [f"SELECT {cols} FROM txo JOIN tx ON (tx.txid=txo.txid)"]
if include_is_spent:
if is_spent:
constraints['spent.txoid__is_not_null'] = True
elif is_spent is False:
constraints['is_reserved'] = False
constraints['spent.txoid__is_null'] = True
if include_is_spent or is_spent is not None:
sql.append("LEFT JOIN txi AS spent ON (spent.txoid=txo.txoid)")
if include_is_my_input:
sql.append("LEFT JOIN txi ON (txi.position=0 AND txi.txid=txo.txid)")
return await self.db.execute_fetchall(*query(' '.join(sql), **constraints), read_only=read_only)
@staticmethod
def constrain_unspent(constraints):
constraints['is_reserved'] = False
constraints['include_is_spent'] = True
constraints['spent.txoid__is_null'] = True
async def get_txos(self, wallet=None, no_tx=False, unspent=False, read_only=False, **constraints):
if unspent:
self.constrain_unspent(constraints)
async def get_txos(self, wallet=None, no_tx=False, read_only=False, **constraints):
include_is_spent = constraints.get('include_is_spent', False)
include_is_my_input = constraints.get('include_is_my_input', False)
include_is_my_output = constraints.pop('include_is_my_output', False)
@ -869,7 +865,8 @@ class Database(SQLiteMixin):
return txos
def _clean_txo_constraints_for_aggregation(self, unspent, constraints):
@staticmethod
def _clean_txo_constraints_for_aggregation(constraints):
constraints.pop('include_is_spent', None)
constraints.pop('include_is_my_input', None)
constraints.pop('include_is_my_output', None)
@ -879,22 +876,19 @@ class Database(SQLiteMixin):
constraints.pop('offset', None)
constraints.pop('limit', None)
constraints.pop('order_by', None)
if unspent:
self.constrain_unspent(constraints)
async def get_txo_count(self, unspent=False, **constraints):
self._clean_txo_constraints_for_aggregation(unspent, constraints)
async def get_txo_count(self, **constraints):
self._clean_txo_constraints_for_aggregation(constraints)
count = await self.select_txos('COUNT(*) AS total', **constraints)
return count[0]['total'] or 0
async def get_txo_sum(self, unspent=False, **constraints):
self._clean_txo_constraints_for_aggregation(unspent, constraints)
async def get_txo_sum(self, **constraints):
self._clean_txo_constraints_for_aggregation(constraints)
result = await self.select_txos('SUM(amount) AS total', **constraints)
return result[0]['total'] or 0
async def get_txo_plot(
self, unspent=False, start_day=None, days_back=0, end_day=None, days_after=None, **constraints):
self._clean_txo_constraints_for_aggregation(unspent, constraints)
async def get_txo_plot(self, start_day=None, days_back=0, end_day=None, days_after=None, **constraints):
self._clean_txo_constraints_for_aggregation(constraints)
if start_day is None:
constraints['day__gte'] = self.ledger.headers.estimated_julian_day(
self.ledger.headers.height
@ -915,17 +909,18 @@ class Database(SQLiteMixin):
)
def get_utxos(self, read_only=False, **constraints):
return self.get_txos(unspent=True, read_only=read_only, **constraints)
return self.get_txos(is_spent=False, read_only=read_only, **constraints)
def get_utxo_count(self, **constraints):
return self.get_txo_count(unspent=True, **constraints)
return self.get_txo_count(is_spent=False, **constraints)
async def get_balance(self, wallet=None, accounts=None, read_only=False, **constraints):
assert wallet or accounts, \
"'wallet' or 'accounts' constraints required to calculate balance"
constraints['accounts'] = accounts or wallet.accounts
self.constrain_unspent(constraints)
balance = await self.select_txos('SUM(amount) as total', read_only=read_only, **constraints)
balance = await self.select_txos(
'SUM(amount) as total', is_spent=False, read_only=read_only, **constraints
)
return balance[0]['total'] or 0
async def select_addresses(self, cols, read_only=False, **constraints):
@ -1084,7 +1079,7 @@ class Database(SQLiteMixin):
def get_supports_summary(self, read_only=False, **constraints):
return self.get_txos(
txo_type=TXO_TYPES['support'],
unspent=True, is_my_output=True,
is_spent=False, is_my_output=True,
include_is_my_input=True,
no_tx=True, read_only=read_only,
**constraints

View file

@ -59,7 +59,15 @@ class Headers:
self.io = open(self.path, 'w+b')
else:
self.io = open(self.path, 'r+b')
self._size = self.io.seek(0, os.SEEK_END) // self.header_size
bytes_size = self.io.seek(0, os.SEEK_END)
self._size = bytes_size // self.header_size
max_checkpointed_height = max(self.checkpoints.keys() or [-1]) + 1000
if bytes_size % self.header_size:
log.warning("Reader file size doesnt match header size. Repairing, might take a while.")
await self.repair()
else:
# try repairing any incomplete write on tip from previous runs (outside of checkpoints, that are ok)
await self.repair(start_height=max_checkpointed_height)
await self.ensure_checkpointed_size()
await self.get_all_missing_headers()
@ -128,7 +136,9 @@ class Headers:
raise IndexError(f"failed to get {height}, at {len(self)}")
def estimated_timestamp(self, height):
return self.first_block_timestamp + (height * self.timestamp_average_offset)
if height <= 0:
return
return int(self.first_block_timestamp + (height * self.timestamp_average_offset))
def estimated_julian_day(self, height):
return date_to_julian_day(date.fromtimestamp(self.estimated_timestamp(height)))
@ -292,23 +302,26 @@ class Headers:
height, f"insufficient proof of work: {proof_of_work.value} vs target {target.value}"
)
async def repair(self):
async def repair(self, start_height=0):
previous_header_hash = fail = None
batch_size = 36
for start_height in range(0, self.height, batch_size):
for height in range(start_height, self.height, batch_size):
headers = await asyncio.get_running_loop().run_in_executor(
self.executor, self._read, start_height, batch_size
self.executor, self._read, height, batch_size
)
if len(headers) % self.header_size != 0:
headers = headers[:(len(headers) // self.header_size) * self.header_size]
for header_hash, header in self._iterate_headers(start_height, headers):
for header_hash, header in self._iterate_headers(height, headers):
height = header['block_height']
if height:
if previous_header_hash:
if header['prev_block_hash'] != previous_header_hash:
fail = True
else:
elif height == 0:
if header_hash != self.genesis_hash:
fail = True
else:
# for sanity and clarity, since it is the only way we can end up here
assert start_height > 0 and height == start_height
if fail:
log.warning("Header file corrupted at height %s, truncating it.", height - 1)
def __truncate(at_height):

View file

@ -24,6 +24,7 @@ from .account import Account, AddressManager, SingleKey
from .network import Network
from .transaction import Transaction, Output
from .header import Headers, UnvalidatedHeaders
from .checkpoints import HASHES
from .constants import TXO_TYPES, CLAIM_TYPES, COIN, NULL_HASH32
from .bip32 import PubKey, PrivateKey
from .coinselection import CoinSelector
@ -108,6 +109,8 @@ class Ledger(metaclass=LedgerRegistry):
default_fee_per_byte = 50
default_fee_per_name_char = 200000
checkpoints = HASHES
def __init__(self, config=None):
self.config = config or {}
self.db: Database = self.config.get('db') or Database(
@ -117,6 +120,7 @@ class Ledger(metaclass=LedgerRegistry):
self.headers: Headers = self.config.get('headers') or self.headers_class(
os.path.join(self.path, "headers")
)
self.headers.checkpoints = self.checkpoints
self.network: Network = self.config.get('network') or Network(self)
self.network.on_header.listen(self.receive_header)
self.network.on_status.listen(self.process_status_update)
@ -266,7 +270,7 @@ class Ledger(metaclass=LedgerRegistry):
self.constraint_spending_utxos(constraints)
return self.db.get_utxo_count(**constraints)
async def get_txos(self, resolve=False, **constraints):
async def get_txos(self, resolve=False, **constraints) -> List[Output]:
txos = await self.db.get_txos(**constraints)
if resolve:
return await self._resolve_for_local_results(constraints.get('accounts', []), txos)
@ -316,11 +320,12 @@ class Ledger(metaclass=LedgerRegistry):
self.db.open(),
self.headers.open()
])
first_connection = self.network.on_connected.first
asyncio.ensure_future(self.network.start())
await first_connection
fully_synced = self.on_ready.first
asyncio.create_task(self.network.start())
await self.network.on_connected.first
async with self._header_processing_lock:
await self._update_tasks.add(self.initial_headers_sync())
await fully_synced
await asyncio.gather(*(a.maybe_migrate_certificates() for a in self.accounts))
await asyncio.gather(*(a.save_max_gap() for a in self.accounts))
if len(self.accounts) > 10:
@ -328,12 +333,9 @@ class Ledger(metaclass=LedgerRegistry):
else:
await self._report_state()
self.on_transaction.listen(self._reset_balance_cache)
await self.on_ready.first
async def join_network(self, *_):
log.info("Subscribing and updating accounts.")
async with self._header_processing_lock:
await self._update_tasks.add(self.initial_headers_sync())
await self._update_tasks.add(self.subscribe_accounts())
await self._update_tasks.done.wait()
self._on_ready_controller.add(True)
@ -356,8 +358,8 @@ class Ledger(metaclass=LedgerRegistry):
self.headers.chunk_getter = get_chunk
async def doit():
async with self._header_processing_lock:
for height in reversed(sorted(self.headers.known_missing_checkpointed_chunks)):
for height in reversed(sorted(self.headers.known_missing_checkpointed_chunks)):
async with self._header_processing_lock:
await self.headers.ensure_chunk_at(height)
self._other_tasks.add(doit())
await self.update_headers()
@ -716,7 +718,7 @@ class Ledger(metaclass=LedgerRegistry):
if include_is_my_output:
mine = await self.db.get_txo_count(
claim_id=txo.claim_id, txo_type__in=CLAIM_TYPES, is_my_output=True,
unspent=True, accounts=accounts
is_spent=False, accounts=accounts
)
if mine:
txo_copy.is_my_output = True
@ -726,7 +728,7 @@ class Ledger(metaclass=LedgerRegistry):
supports = await self.db.get_txo_sum(
claim_id=txo.claim_id, txo_type=TXO_TYPES['support'],
is_my_input=True, is_my_output=True,
unspent=True, accounts=accounts
is_spent=False, accounts=accounts
)
txo_copy.sent_supports = supports
if include_sent_tips:
@ -750,7 +752,11 @@ class Ledger(metaclass=LedgerRegistry):
async def resolve(self, accounts, urls, **kwargs):
resolve = partial(self.network.retriable_call, self.network.resolve)
txos = (await self._inflate_outputs(resolve(urls), accounts, **kwargs))[0]
urls_copy = list(urls)
txos = []
while urls_copy:
batch, urls_copy = urls_copy[:500], urls_copy[500:]
txos.extend((await self._inflate_outputs(resolve(batch), accounts, **kwargs))[0])
assert len(urls) == len(txos), "Mismatch between urls requested for resolve and responses received."
result = {}
for url, txo in zip(urls, txos):
@ -1058,6 +1064,7 @@ class TestNetLedger(Ledger):
script_address_prefix = bytes((196,))
extended_public_key_prefix = unhexlify('043587cf')
extended_private_key_prefix = unhexlify('04358394')
checkpoints = {}
class RegTestLedger(Ledger):
@ -1072,3 +1079,4 @@ class RegTestLedger(Ledger):
genesis_hash = '6e3fcf1299d4ec5d79c3a4c91d624a4acf9e2e173d95a1a0504f677669687556'
genesis_bits = 0x207fffff
target_timespan = 1
checkpoints = {}

View file

@ -55,7 +55,8 @@ class Conductor:
async def start_blockchain(self):
if not self.blockchain_started:
await self.blockchain_node.start()
asyncio.create_task(self.blockchain_node.start())
await self.blockchain_node.running.wait()
await self.blockchain_node.generate(200)
self.blockchain_started = True
@ -255,6 +256,10 @@ class BlockchainNode:
self.rpcport = 9245 + 2 # avoid conflict with default rpc port
self.rpcuser = 'rpcuser'
self.rpcpassword = 'rpcpassword'
self.stopped = False
self.restart_ready = asyncio.Event()
self.restart_ready.set()
self.running = asyncio.Event()
@property
def rpc_url(self):
@ -315,13 +320,27 @@ class BlockchainNode:
f'-port={self.peerport}'
]
self.log.info(' '.join(command))
self.transport, self.protocol = await loop.subprocess_exec(
BlockchainProcess, *command
)
await self.protocol.ready.wait()
assert not self.protocol.stopped.is_set()
while not self.stopped:
if self.running.is_set():
await asyncio.sleep(1)
continue
await self.restart_ready.wait()
try:
self.transport, self.protocol = await loop.subprocess_exec(
BlockchainProcess, *command
)
await self.protocol.ready.wait()
assert not self.protocol.stopped.is_set()
self.running.set()
except asyncio.CancelledError:
self.running.clear()
raise
except Exception as e:
self.running.clear()
log.exception('failed to start lbrycrdd', exc_info=e)
async def stop(self, cleanup=True):
self.stopped = True
try:
self.transport.terminate()
await self.protocol.stopped.wait()
@ -330,6 +349,16 @@ class BlockchainNode:
if cleanup:
self.cleanup()
async def clear_mempool(self):
self.restart_ready.clear()
self.transport.terminate()
await self.protocol.stopped.wait()
self.transport.close()
self.running.clear()
os.remove(os.path.join(self.data_path, 'regtest', 'mempool.dat'))
self.restart_ready.set()
await self.running.wait()
def cleanup(self):
shutil.rmtree(self.data_path, ignore_errors=True)
@ -361,6 +390,12 @@ class BlockchainNode:
def get_block_hash(self, block):
return self._cli_cmnd('getblockhash', str(block))
def sendrawtransaction(self, tx):
return self._cli_cmnd('sendrawtransaction', tx)
async def get_block(self, block_hash):
return json.loads(await self._cli_cmnd('getblock', block_hash, '1'))
def get_raw_change_address(self):
return self._cli_cmnd('getrawchangeaddress')

View file

@ -39,8 +39,7 @@ from lbry.wallet.tasks import TaskGroup
from .jsonrpc import Request, JSONRPCConnection, JSONRPCv2, JSONRPC, Batch, Notification
from .jsonrpc import RPCError, ProtocolError
from .framing import BadMagicError, BadChecksumError, OversizedPayloadError, BitcoinFramer, NewlineFramer
from .util import Concurrency
from lbry.wallet.server.prometheus import NOTIFICATION_COUNT, RESPONSE_TIMES, REQUEST_ERRORS_COUNT
from lbry.wallet.server.prometheus import NOTIFICATION_COUNT, RESPONSE_TIMES, REQUEST_ERRORS_COUNT, RESET_CONNECTIONS
class Connector:
@ -389,6 +388,7 @@ class RPCSession(SessionBase):
except MemoryError:
self.logger.warning('received oversized message from %s:%s, dropping connection',
self._address[0], self._address[1])
RESET_CONNECTIONS.labels(version=self.client_version).inc()
self._close()
return

View file

@ -2,7 +2,7 @@ import time
import asyncio
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional
import lbry
from lbry.schema.claim import Claim
from lbry.wallet.server.db.writer import SQLDB
@ -10,7 +10,7 @@ from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.util import chunks, class_logger
from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.server.prometheus import BLOCK_COUNT, BLOCK_UPDATE_TIMES
from lbry.wallet.server.prometheus import BLOCK_COUNT, BLOCK_UPDATE_TIMES, REORG_COUNT
class Prefetcher:
@ -219,7 +219,7 @@ class BlockProcessor:
'resetting the prefetcher')
await self.prefetcher.reset_height(self.height)
async def reorg_chain(self, count=None):
async def reorg_chain(self, count: Optional[int] = None):
"""Handle a chain reorganisation.
Count is the number of blocks to simulate a reorg, or None for
@ -253,7 +253,9 @@ class BlockProcessor:
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
await self.run_in_thread_with_lock(flush_backup)
last -= len(raw_blocks)
await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
await self.prefetcher.reset_height(self.height)
REORG_COUNT.inc()
async def reorg_hashes(self, count):
"""Return a pair (start, last, hashes) of blocks to back up during a
@ -270,7 +272,7 @@ class BlockProcessor:
return start, last, await self.db.fs_block_hashes(start, count)
async def calc_reorg_range(self, count):
async def calc_reorg_range(self, count: Optional[int]):
"""Calculate the reorg range"""
def diff_pos(hashes1, hashes2):

View file

@ -545,11 +545,19 @@ def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_coun
f':$any_{attr}{i}' for i in range(len(any_items))
)
if for_count or attr == 'tag':
any_queries[f'#_any_{attr}'] = f"""
{CLAIM_HASH_OR_REPOST_HASH_SQL} IN (
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
)
"""
if attr == 'tag':
any_queries[f'#_any_{attr}'] = f"""
(claim.claim_type != {CLAIM_TYPES['repost']}
AND claim.claim_hash IN (SELECT claim_hash FROM tag WHERE tag IN ({values}))) OR
(claim.claim_type == {CLAIM_TYPES['repost']} AND
claim.reposted_claim_hash IN (SELECT claim_hash FROM tag WHERE tag IN ({values})))
"""
else:
any_queries[f'#_any_{attr}'] = f"""
{CLAIM_HASH_OR_REPOST_HASH_SQL} IN (
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
)
"""
else:
any_queries[f'#_any_{attr}'] = f"""
EXISTS(
@ -596,11 +604,19 @@ def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_coun
f':$not_{attr}{i}' for i in range(len(not_items))
)
if for_count:
constraints[f'#_not_{attr}'] = f"""
{CLAIM_HASH_OR_REPOST_HASH_SQL} NOT IN (
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
)
"""
if attr == 'tag':
constraints[f'#_not_{attr}'] = f"""
(claim.claim_type != {CLAIM_TYPES['repost']}
AND claim.claim_hash NOT IN (SELECT claim_hash FROM tag WHERE tag IN ({values}))) AND
(claim.claim_type == {CLAIM_TYPES['repost']} AND
claim.reposted_claim_hash NOT IN (SELECT claim_hash FROM tag WHERE tag IN ({values})))
"""
else:
constraints[f'#_not_{attr}'] = f"""
{CLAIM_HASH_OR_REPOST_HASH_SQL} NOT IN (
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
)
"""
else:
constraints[f'#_not_{attr}'] = f"""
NOT EXISTS(

View file

@ -433,6 +433,15 @@ class SQLDB:
return {r.channel_hash for r in affected_channels}
return set()
def delete_claims_above_height(self, height: int):
claim_hashes = [x[0] for x in self.execute(
"SELECT claim_hash FROM claim WHERE height>?", (height, )
).fetchall()]
while claim_hashes:
batch = set(claim_hashes[:500])
claim_hashes = claim_hashes[500:]
self.delete_claims(batch)
def _clear_claim_metadata(self, claim_hashes: Set[bytes]):
if claim_hashes:
for table in ('tag',): # 'language', 'location', etc

View file

@ -51,6 +51,13 @@ BLOCK_COUNT = Gauge(
"block_count", "Number of processed blocks", namespace=NAMESPACE
)
BLOCK_UPDATE_TIMES = Histogram("block_time", "Block update times", namespace=NAMESPACE)
REORG_COUNT = Gauge(
"reorg_count", "Number of reorgs", namespace=NAMESPACE
)
RESET_CONNECTIONS = Counter(
"reset_clients", "Number of reset connections by client version",
namespace=NAMESPACE, labelnames=("version",)
)
class PrometheusServer:

View file

@ -7,6 +7,7 @@ class TaskGroup:
self._loop = loop or get_event_loop()
self._tasks = set()
self.done = Event()
self.started = Event()
def __len__(self):
return len(self._tasks)
@ -14,6 +15,7 @@ class TaskGroup:
def add(self, coro):
task = self._loop.create_task(coro)
self._tasks.add(task)
self.started.set()
self.done.clear()
task.add_done_callback(self._remove)
return task
@ -22,8 +24,10 @@ class TaskGroup:
self._tasks.remove(task)
if len(self._tasks) < 1:
self.done.set()
self.started.clear()
def cancel(self):
for task in self._tasks:
task.cancel()
self.done.set()
self.started.clear()

View file

@ -1,8 +1,11 @@
import logging
from lbry.testcase import IntegrationTestCase
import asyncio
from binascii import hexlify
from lbry.testcase import CommandTestCase
from lbry.wallet.server.prometheus import REORG_COUNT
class BlockchainReorganizationTests(IntegrationTestCase):
class BlockchainReorganizationTests(CommandTestCase):
VERBOSITY = logging.WARN
@ -13,21 +16,105 @@ class BlockchainReorganizationTests(IntegrationTestCase):
)
async def test_reorg(self):
REORG_COUNT.set(0)
# invalidate current block, move forward 2
self.assertEqual(self.ledger.headers.height, 200)
await self.assertBlockHash(200)
await self.blockchain.invalidate_block((await self.ledger.headers.hash(200)).decode())
self.assertEqual(self.ledger.headers.height, 206)
await self.assertBlockHash(206)
await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode())
await self.blockchain.generate(2)
await self.ledger.on_header.where(lambda e: e.height == 201)
self.assertEqual(self.ledger.headers.height, 201)
await self.assertBlockHash(200)
await self.assertBlockHash(201)
await self.ledger.on_header.where(lambda e: e.height == 207)
self.assertEqual(self.ledger.headers.height, 207)
await self.assertBlockHash(206)
await self.assertBlockHash(207)
self.assertEqual(1, REORG_COUNT._samples()[0][2])
# invalidate current block, move forward 3
await self.blockchain.invalidate_block((await self.ledger.headers.hash(200)).decode())
await self.blockchain.invalidate_block((await self.ledger.headers.hash(206)).decode())
await self.blockchain.generate(3)
await self.ledger.on_header.where(lambda e: e.height == 202)
self.assertEqual(self.ledger.headers.height, 202)
await self.assertBlockHash(200)
await self.assertBlockHash(201)
await self.assertBlockHash(202)
await self.ledger.on_header.where(lambda e: e.height == 208)
self.assertEqual(self.ledger.headers.height, 208)
await self.assertBlockHash(206)
await self.assertBlockHash(207)
await self.assertBlockHash(208)
self.assertEqual(2, REORG_COUNT._samples()[0][2])
async def test_reorg_change_claim_height(self):
# sanity check
txos, _, _, _ = await self.ledger.claim_search([], name='hovercraft')
self.assertListEqual(txos, [])
still_valid = await self.daemon.jsonrpc_stream_create(
'still-valid', '1.0', file_path=self.create_upload_file(data=b'hi!')
)
await self.ledger.wait(still_valid)
await self.generate(1)
# create a claim and verify it's returned by claim_search
self.assertEqual(self.ledger.headers.height, 207)
broadcast_tx = await self.daemon.jsonrpc_stream_create(
'hovercraft', '1.0', file_path=self.create_upload_file(data=b'hi!')
)
await self.ledger.wait(broadcast_tx)
await self.generate(1)
await self.ledger.wait(broadcast_tx, self.blockchain.block_expected)
self.assertEqual(self.ledger.headers.height, 208)
txos, _, _, _ = await self.ledger.claim_search([], name='hovercraft')
self.assertEqual(1, len(txos))
txo = txos[0]
self.assertEqual(txo.tx_ref.id, broadcast_tx.id)
self.assertEqual(txo.tx_ref.height, 208)
# check that our tx is in block 208 as returned by lbrycrdd
invalidated_block_hash = (await self.ledger.headers.hash(208)).decode()
block_207 = await self.blockchain.get_block(invalidated_block_hash)
self.assertIn(txo.tx_ref.id, block_207['tx'])
self.assertEqual(208, txos[0].tx_ref.height)
# reorg the last block dropping our claim tx
await self.blockchain.invalidate_block(invalidated_block_hash)
await self.blockchain.clear_mempool()
await self.blockchain.generate(2)
# verify the claim was dropped from block 208 as returned by lbrycrdd
reorg_block_hash = await self.blockchain.get_block_hash(208)
self.assertNotEqual(invalidated_block_hash, reorg_block_hash)
block_207 = await self.blockchain.get_block(reorg_block_hash)
self.assertNotIn(txo.tx_ref.id, block_207['tx'])
# wait for the client to catch up and verify the reorg
await asyncio.wait_for(self.on_header(209), 3.0)
await self.assertBlockHash(207)
await self.assertBlockHash(208)
await self.assertBlockHash(209)
client_reorg_block_hash = (await self.ledger.headers.hash(208)).decode()
self.assertEqual(client_reorg_block_hash, reorg_block_hash)
# verify the dropped claim is no longer returned by claim search
txos, _, _, _ = await self.ledger.claim_search([], name='hovercraft')
self.assertListEqual(txos, [])
# verify the claim published a block earlier wasn't also reverted
txos, _, _, _ = await self.ledger.claim_search([], name='still-valid')
self.assertEqual(1, len(txos))
self.assertEqual(207, txos[0].tx_ref.height)
# broadcast the claim in a different block
new_txid = await self.blockchain.sendrawtransaction(hexlify(broadcast_tx.raw).decode())
self.assertEqual(broadcast_tx.id, new_txid)
await self.blockchain.generate(1)
# wait for the client to catch up
await asyncio.wait_for(self.on_header(210), 1.0)
# verify the claim is in the new block and that it is returned by claim_search
block_210 = await self.blockchain.get_block((await self.ledger.headers.hash(210)).decode())
self.assertIn(txo.tx_ref.id, block_210['tx'])
txos, _, _, _ = await self.ledger.claim_search([], name='hovercraft')
self.assertEqual(1, len(txos))
self.assertEqual(txos[0].tx_ref.id, new_txid)
self.assertEqual(210, txos[0].tx_ref.height)
# this should still be unchanged
txos, _, _, _ = await self.ledger.claim_search([], name='still-valid')
self.assertEqual(1, len(txos))
self.assertEqual(207, txos[0].tx_ref.height)

View file

@ -1,6 +1,7 @@
import os.path
import tempfile
import logging
import asyncio
from binascii import unhexlify
from urllib.request import urlopen
@ -79,6 +80,12 @@ class ClaimSearchCommand(ClaimTestCase):
] * 23828
self.assertListEqual([], await self.claim_search(claim_ids=claim_ids))
# this should do nothing... if the resolve (which is retried) results in the server disconnecting,
# it kerplodes
await asyncio.wait_for(self.daemon.jsonrpc_resolve([
f'0000000000000000000000000000000000000000{i}' for i in range(30000)
]), 30)
# 23829 claim ids makes the request just large enough
claim_ids = [
'0000000000000000000000000000000000000000',
@ -423,18 +430,18 @@ class TransactionOutputCommands(ClaimTestCase):
async def test_txo_list_and_sum_filtering(self):
channel_id = self.get_claim_id(await self.channel_create())
self.assertEqual('1.0', lbc(await self.txo_sum(type='channel', unspent=True)))
self.assertEqual('1.0', lbc(await self.txo_sum(type='channel', is_not_spent=True)))
await self.channel_update(channel_id, bid='0.5')
self.assertEqual('0.5', lbc(await self.txo_sum(type='channel', unspent=True)))
self.assertEqual('0.5', lbc(await self.txo_sum(type='channel', is_not_spent=True)))
self.assertEqual('1.5', lbc(await self.txo_sum(type='channel')))
stream_id = self.get_claim_id(await self.stream_create(bid='1.3'))
self.assertEqual('1.3', lbc(await self.txo_sum(type='stream', unspent=True)))
self.assertEqual('1.3', lbc(await self.txo_sum(type='stream', is_not_spent=True)))
await self.stream_update(stream_id, bid='0.7')
self.assertEqual('0.7', lbc(await self.txo_sum(type='stream', unspent=True)))
self.assertEqual('0.7', lbc(await self.txo_sum(type='stream', is_not_spent=True)))
self.assertEqual('2.0', lbc(await self.txo_sum(type='stream')))
self.assertEqual('1.2', lbc(await self.txo_sum(type=['stream', 'channel'], unspent=True)))
self.assertEqual('1.2', lbc(await self.txo_sum(type=['stream', 'channel'], is_not_spent=True)))
self.assertEqual('3.5', lbc(await self.txo_sum(type=['stream', 'channel'])))
# type filtering
@ -496,22 +503,35 @@ class TransactionOutputCommands(ClaimTestCase):
address2 = await self.daemon.jsonrpc_address_unused(wallet_id=wallet2.id)
await self.channel_create('@kept-channel')
await self.channel_create('@sent-channel', claim_address=address2)
await self.wallet_send('2.9', address2)
# all txos on second wallet
received_channel, = await self.txo_list(wallet_id=wallet2.id, is_my_input_or_output=True)
received_payment, received_channel = await self.txo_list(
wallet_id=wallet2.id, is_my_input_or_output=True)
self.assertEqual('1.0', received_channel['amount'])
self.assertFalse(received_channel['is_my_input'])
self.assertTrue(received_channel['is_my_output'])
self.assertFalse(received_channel['is_internal_transfer'])
self.assertEqual('2.9', received_payment['amount'])
self.assertFalse(received_payment['is_my_input'])
self.assertTrue(received_payment['is_my_output'])
self.assertFalse(received_payment['is_internal_transfer'])
# all txos on default wallet
r = await self.txo_list(is_my_input_or_output=True)
self.assertEqual(
['1.0', '7.947786', '1.0', '8.973893', '10.0'],
['2.9', '5.047662', '1.0', '7.947786', '1.0', '8.973893', '10.0'],
[t['amount'] for t in r]
)
sent_channel, change2, kept_channel, change1, initial_funds = r
sent_payment, change3, sent_channel, change2, kept_channel, change1, initial_funds = r
self.assertTrue(sent_payment['is_my_input'])
self.assertFalse(sent_payment['is_my_output'])
self.assertFalse(sent_payment['is_internal_transfer'])
self.assertTrue(change3['is_my_input'])
self.assertTrue(change3['is_my_output'])
self.assertTrue(change3['is_internal_transfer'])
self.assertTrue(sent_channel['is_my_input'])
self.assertFalse(sent_channel['is_my_output'])
@ -533,27 +553,31 @@ class TransactionOutputCommands(ClaimTestCase):
# my stuff and stuff i sent excluding "change"
r = await self.txo_list(is_my_input_or_output=True, exclude_internal_transfers=True)
self.assertEqual([sent_channel, kept_channel, initial_funds], r)
self.assertEqual([sent_payment, sent_channel, kept_channel, initial_funds], r)
# my unspent stuff and stuff i sent excluding "change"
r = await self.txo_list(is_my_input_or_output=True, unspent=True, exclude_internal_transfers=True)
self.assertEqual([sent_channel, kept_channel], r)
r = await self.txo_list(is_my_input_or_output=True, is_not_spent=True, exclude_internal_transfers=True)
self.assertEqual([sent_payment, sent_channel, kept_channel], r)
# only "change"
r = await self.txo_list(is_my_input=True, is_my_output=True, type="other")
self.assertEqual([change2, change1], r)
self.assertEqual([change3, change2, change1], r)
# only unspent "change"
r = await self.txo_list(is_my_input=True, is_my_output=True, type="other", unspent=True)
self.assertEqual([change2], r)
r = await self.txo_list(is_my_input=True, is_my_output=True, type="other", is_not_spent=True)
self.assertEqual([change3], r)
# only spent "change"
r = await self.txo_list(is_my_input=True, is_my_output=True, type="other", is_spent=True)
self.assertEqual([change2, change1], r)
# all my unspent stuff
r = await self.txo_list(is_my_output=True, unspent=True)
self.assertEqual([change2, kept_channel], r)
r = await self.txo_list(is_my_output=True, is_not_spent=True)
self.assertEqual([change3, kept_channel], r)
# stuff i sent
r = await self.txo_list(is_not_my_output=True)
self.assertEqual([sent_channel], r)
self.assertEqual([sent_payment, sent_channel], r)
async def test_txo_plot(self):
day_blocks = int((24 * 60 * 60) / self.ledger.headers.timestamp_average_offset)
@ -610,6 +634,26 @@ class TransactionOutputCommands(ClaimTestCase):
{'day': '2016-06-25', 'total': '0.6'},
], plot)
async def test_txo_spend(self):
stream_id = self.get_claim_id(await self.stream_create())
for _ in range(10):
await self.support_create(stream_id, '0.1')
await self.assertBalance(self.account, '7.978478')
self.assertEqual('1.0', lbc(await self.txo_sum(type='support', is_not_spent=True)))
txs = await self.txo_spend(type='support', batch_size=3, include_full_tx=True)
self.assertEqual(4, len(txs))
self.assertEqual(3, len(txs[0]['inputs']))
self.assertEqual(3, len(txs[1]['inputs']))
self.assertEqual(3, len(txs[2]['inputs']))
self.assertEqual(1, len(txs[3]['inputs']))
self.assertEqual('0.0', lbc(await self.txo_sum(type='support', is_not_spent=True)))
await self.assertBalance(self.account, '8.977606')
await self.support_create(stream_id, '0.1')
txs = await self.daemon.jsonrpc_txo_spend(type='support', batch_size=3)
self.assertEqual(1, len(txs))
self.assertEqual({'txid'}, set(txs[0]))
class ClaimCommands(ClaimTestCase):

View file

@ -1,4 +1,3 @@
import logging
import asyncio
import lbry

View file

@ -1,6 +1,5 @@
import asyncio
import json
import os
from lbry.wallet import ENCRYPT_ON_DISK
from lbry.error import InvalidPasswordError
@ -22,14 +21,26 @@ class WalletCommands(CommandTestCase):
async def test_wallet_syncing_status(self):
address = await self.daemon.jsonrpc_address_unused()
sendtxid = await self.blockchain.send_to_address(address, 1)
self.assertFalse(self.daemon.jsonrpc_wallet_status()['is_syncing'])
await self.blockchain.send_to_address(address, 1)
await self.ledger._update_tasks.started.wait()
self.assertTrue(self.daemon.jsonrpc_wallet_status()['is_syncing'])
await self.ledger._update_tasks.done.wait()
self.assertFalse(self.daemon.jsonrpc_wallet_status()['is_syncing'])
async def eventually_will_sync():
while not self.daemon.jsonrpc_wallet_status()['is_syncing']:
await asyncio.sleep(0)
check_sync = asyncio.create_task(eventually_will_sync())
await self.confirm_tx(sendtxid, self.ledger)
await asyncio.wait_for(check_sync, timeout=10)
wallet = self.daemon.component_manager.get_actual_component('wallet')
wallet_manager = wallet.wallet_manager
# when component manager hasn't started yet
wallet.wallet_manager = None
self.assertEqual(
{'is_encrypted': None, 'is_syncing': None, 'is_locked': None},
self.daemon.jsonrpc_wallet_status()
)
wallet.wallet_manager = wallet_manager
self.assertEqual(
{'is_encrypted': False, 'is_syncing': False, 'is_locked': False},
self.daemon.jsonrpc_wallet_status()
)
async def test_wallet_reconnect(self):
await self.conductor.spv_node.stop(True)

View file

@ -45,14 +45,6 @@ class TestSessions(IntegrationTestCase):
await self.ledger.network.broadcast('13370042004200')
class TestSegwitServer(IntegrationTestCase):
LEDGER = lbry.wallet
ENABLE_SEGWIT = True
async def test_at_least_it_starts(self):
await asyncio.wait_for(self.ledger.network.get_headers(0, 1), 1.0)
class TestUsagePayment(CommandTestCase):
async def test_single_server_payment(self):
wallet_pay_service = self.daemon.component_manager.get_component('wallet_server_payments')
@ -81,7 +73,7 @@ class TestUsagePayment(CommandTestCase):
self.assertEqual(features["payment_address"], address)
self.assertEqual(features["daily_fee"], "1.1")
with self.assertRaises(ServerPaymentFeeAboveMaxAllowedError):
await asyncio.wait_for(wallet_pay_service.on_payment.first, timeout=3)
await asyncio.wait_for(wallet_pay_service.on_payment.first, timeout=8)
await node.stop(False)
await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.0"})

View file

@ -21,6 +21,11 @@ class FileCommands(CommandTestCase):
self.assertEqual(file1['claim_name'], 'foo')
self.assertEqual(file2['claim_name'], 'foo2')
self.assertItemCount(await self.daemon.jsonrpc_file_list(claim_id=[file1['claim_id'], file2['claim_id']]), 2)
self.assertItemCount(await self.daemon.jsonrpc_file_list(claim_id=file1['claim_id']), 1)
self.assertItemCount(await self.daemon.jsonrpc_file_list(outpoint=[file1['outpoint'], file2['outpoint']]), 2)
self.assertItemCount(await self.daemon.jsonrpc_file_list(outpoint=file1['outpoint']), 1)
await self.daemon.jsonrpc_file_delete(claim_name='foo')
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
await self.daemon.jsonrpc_file_delete(claim_name='foo2')

View file

@ -106,11 +106,16 @@ class MockedCommentServer:
return False
def hide_comments(self, pieces: list):
comments_hidden = []
hidden = []
for p in pieces:
if self.hide_comment(**p):
comments_hidden.append(p['comment_id'])
return {'hidden': comments_hidden}
hidden.append(p['comment_id'])
comment_ids = {c['comment_id'] for c in pieces}
return {
'hidden': hidden,
'visible': list(comment_ids - set(hidden))
}
def get_claim_comments(self, claim_id, page=1, page_size=50,**kwargs):
comments = list(filter(lambda c: c['claim_id'] == claim_id, self.comments))
@ -138,12 +143,19 @@ class MockedCommentServer:
def get_comment_channel_by_id(self, comment_id: int, **kwargs):
comment = self.comments[self.get_comment_id(comment_id)]
return {
'channel_id': comment.get('channel_id'),
'channel_name': comment.get('channel_name')
'channel_id': comment['channel_id'],
'channel_name': comment['channel_name'],
}
def get_comments_by_id(self, comment_ids: list):
return [self.comments[self.get_comment_id(cid)] for cid in comment_ids]
comments = [self.comments[self.get_comment_id(cid)] for cid in comment_ids]
return {
'page': 1,
'page_size': len(comment_ids),
'total_pages': 1,
'items': comments,
'has_hidden_comments': bool({c for c in comments if c['is_hidden']})
}
methods = {
'get_claim_comments': get_claim_comments,

View file

@ -60,7 +60,7 @@ class TranscodeValidation(ClaimTestCase):
self.assertEqual(self.video_file_webm, new_file_name)
self.assertEqual(spec["width"], 1280)
self.assertEqual(spec["height"], 720)
self.assertEqual(spec["duration"], 15.054)
self.assertEqual(spec["duration"], 16)
async def test_volume(self):
self.conf.volume_analysis_time = 200
@ -160,3 +160,26 @@ class TranscodeValidation(ClaimTestCase):
await self.analyzer.status(reset=True)
with self.assertRaisesRegex(Exception, "Unable to locate"):
await self.analyzer.verify_or_repair(True, False, self.video_file_name)
async def test_dont_recheck_ffmpeg_installation(self):
call_count = 0
original = self.daemon._video_file_analyzer._verify_ffmpeg_installed
def _verify_ffmpeg_installed():
nonlocal call_count
call_count += 1
return original()
self.daemon._video_file_analyzer._verify_ffmpeg_installed = _verify_ffmpeg_installed
self.assertEqual(0, call_count)
await self.daemon.jsonrpc_status()
self.assertEqual(1, call_count)
# counter should not go up again
await self.daemon.jsonrpc_status()
self.assertEqual(1, call_count)
# this should force rechecking the installation
await self.daemon.jsonrpc_ffmpeg_find()
self.assertEqual(2, call_count)

View file

@ -46,7 +46,9 @@ class TestStreamAssembler(AsyncioTestCase):
reflector.start_server(5566, '127.0.0.1')
await reflector.started_listening.wait()
self.addCleanup(reflector.stop_server)
self.assertEqual(0, self.stream.reflector_progress)
sent = await self.stream.upload_to_reflector('127.0.0.1', 5566)
self.assertEqual(100, self.stream.reflector_progress)
self.assertSetEqual(
set(sent),
set(map(lambda b: b.blob_hash,

View file

@ -143,6 +143,37 @@ class TestHeaders(AsyncioTestCase):
self.assertEqual(7, headers.height)
await headers.connect(len(headers), HEADERS[block_bytes(8):])
self.assertEqual(19, headers.height)
# verify from middle
await headers.repair(start_height=10)
self.assertEqual(19, headers.height)
def test_do_not_estimate_unconfirmed(self):
headers = Headers(':memory:')
self.assertIsNone(headers.estimated_timestamp(-1))
self.assertIsNone(headers.estimated_timestamp(0))
self.assertIsNotNone(headers.estimated_timestamp(1))
async def test_misalignment_triggers_repair_on_open(self):
headers = Headers(':memory:')
headers.io.seek(0)
headers.io.write(HEADERS)
with self.assertLogs(level='WARN') as cm:
await headers.open()
self.assertEqual(cm.output, [])
headers.io.seek(0)
headers.io.truncate()
headers.io.write(HEADERS[:block_bytes(10)])
headers.io.write(b'ops')
headers.io.write(HEADERS[block_bytes(10):])
await headers.open()
self.assertEqual(
cm.output, [
'WARNING:lbry.wallet.header:Reader file size doesnt match header size. '
'Repairing, might take a while.',
'WARNING:lbry.wallet.header:Header file corrupted at height 9, truncating '
'it.'
]
)
async def test_concurrency(self):
BLOCKS = 19