Drop comment_* apis.

Refactored dangling functions.
Added unit test.
This commit is contained in:
Cristian Vicas 2021-08-31 14:54:04 +03:00
parent fef0cc764d
commit 45bf6c3bf3
5 changed files with 53 additions and 48 deletions

View file

@ -1,41 +0,0 @@
import logging
import time
import hashlib
import binascii
import ecdsa
from lbry.crypto.hash import sha256
from lbry.wallet.transaction import Output
log = logging.getLogger(__name__)
def get_encoded_signature(signature):
signature = signature.encode() if isinstance(signature, str) else signature
r = int(signature[:int(len(signature) / 2)], 16)
s = int(signature[int(len(signature) / 2):], 16)
return ecdsa.util.sigencode_der(r, s, len(signature) * 4)
def verify(channel, data, signature, channel_hash=None):
pieces = [
signature['signing_ts'].encode(),
channel_hash or channel.claim_hash,
data
]
return Output.is_signature_valid(
get_encoded_signature(signature['signature']),
sha256(b''.join(pieces)),
channel.claim.channel.public_key_bytes
)
def sign(channel, data):
timestamp = str(int(time.time()))
pieces = [timestamp.encode(), channel.claim_hash, data]
digest = sha256(b''.join(pieces))
signature = channel.private_key.sign_digest_deterministic(digest, hashfunc=hashlib.sha256)
return {
'signature': binascii.hexlify(signature).decode(),
'signing_ts': timestamp
}

View file

@ -46,7 +46,6 @@ from lbry.extras.daemon.components import EXCHANGE_RATE_MANAGER_COMPONENT, UPNP_
from lbry.extras.daemon.componentmanager import RequiredCondition from lbry.extras.daemon.componentmanager import RequiredCondition
from lbry.extras.daemon.componentmanager import ComponentManager from lbry.extras.daemon.componentmanager import ComponentManager
from lbry.extras.daemon.json_response_encoder import JSONResponseEncoder from lbry.extras.daemon.json_response_encoder import JSONResponseEncoder
from lbry.extras.daemon import comment_client
from lbry.extras.daemon.undecorated import undecorated from lbry.extras.daemon.undecorated import undecorated
from lbry.extras.daemon.security import ensure_request_allowed from lbry.extras.daemon.security import ensure_request_allowed
from lbry.file_analysis import VideoFileAnalyzer from lbry.file_analysis import VideoFileAnalyzer
@ -2837,7 +2836,12 @@ class Daemon(metaclass=JSONRPCServerType):
signing_channel = await self.get_channel_or_error( signing_channel = await self.get_channel_or_error(
wallet, channel_account_id, channel_id, channel_name, for_signing=True wallet, channel_account_id, channel_id, channel_name, for_signing=True
) )
return comment_client.sign(signing_channel, unhexlify(hexdata)) timestamp = str(int(time.time()))
signature = signing_channel.sign_data(unhexlify(hexdata), timestamp)
return {
'signature': signature,
'signing_ts': timestamp
}
@requires(WALLET_COMPONENT) @requires(WALLET_COMPONENT)
async def jsonrpc_channel_abandon( async def jsonrpc_channel_abandon(

View file

@ -459,6 +459,12 @@ class Output(InputOutput):
self.signable.signature = channel.private_key.sign_digest_deterministic(digest, hashfunc=hashlib.sha256) self.signable.signature = channel.private_key.sign_digest_deterministic(digest, hashfunc=hashlib.sha256)
self.script.generate() self.script.generate()
def sign_data(self, data:bytes, timestamp:str) -> str:
pieces = [timestamp.encode(), self.claim_hash, data]
digest = sha256(b''.join(pieces))
signature = self.private_key.sign_digest_deterministic(digest, hashfunc=hashlib.sha256)
return hexlify(signature).decode()
def clear_signature(self): def clear_signature(self):
self.channel = None self.channel = None
self.signable.clear_signature() self.signable.clear_signature()

View file

@ -5,19 +5,38 @@ import asyncio
from binascii import unhexlify from binascii import unhexlify
from unittest import skip from unittest import skip
from urllib.request import urlopen from urllib.request import urlopen
import ecdsa
from lbry.error import InsufficientFundsError from lbry.error import InsufficientFundsError
from lbry.extras.daemon.comment_client import verify
from lbry.extras.daemon.daemon import DEFAULT_PAGE_SIZE from lbry.extras.daemon.daemon import DEFAULT_PAGE_SIZE
from lbry.testcase import CommandTestCase from lbry.testcase import CommandTestCase
from lbry.wallet.orchstr8.node import SPVNode from lbry.wallet.orchstr8.node import SPVNode
from lbry.wallet.transaction import Transaction from lbry.wallet.transaction import Transaction, Output
from lbry.wallet.util import satoshis_to_coins as lbc from lbry.wallet.util import satoshis_to_coins as lbc
from lbry.crypto.hash import sha256
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def get_encoded_signature(signature):
signature = signature.encode() if isinstance(signature, str) else signature
r = int(signature[:int(len(signature) / 2)], 16)
s = int(signature[int(len(signature) / 2):], 16)
return ecdsa.util.sigencode_der(r, s, len(signature) * 4)
def verify(channel, data, signature, channel_hash=None):
pieces = [
signature['signing_ts'].encode(),
channel_hash or channel.claim_hash,
data
]
return Output.is_signature_valid(
get_encoded_signature(signature['signature']),
sha256(b''.join(pieces)),
channel.claim.channel.public_key_bytes
)
class ClaimTestCase(CommandTestCase): class ClaimTestCase(CommandTestCase):

View file

@ -2,10 +2,9 @@ from binascii import unhexlify
from lbry.testcase import AsyncioTestCase from lbry.testcase import AsyncioTestCase
from lbry.wallet.constants import CENT, NULL_HASH32 from lbry.wallet.constants import CENT, NULL_HASH32
from lbry.wallet import Ledger, Database, Headers, Transaction, Input, Output from lbry.wallet import Ledger, Database, Headers, Transaction, Input, Output
from lbry.schema.claim import Claim from lbry.schema.claim import Claim
from lbry.crypto.hash import sha256
def get_output(amount=CENT, pubkey_hash=NULL_HASH32): def get_output(amount=CENT, pubkey_hash=NULL_HASH32):
return Transaction() \ return Transaction() \
@ -114,3 +113,21 @@ class TestValidatingOldSignatures(AsyncioTestCase):
}) })
self.assertTrue(stream.is_signed_by(channel, ledger)) self.assertTrue(stream.is_signed_by(channel, ledger))
class TestValidateSignContent(AsyncioTestCase):
async def test_sign_some_content(self):
some_content = "MEANINGLESS CONTENT AEE3353320".encode()
timestamp_str = "1630564175"
channel = await get_channel()
stream = get_stream()
signature = channel.sign_data(some_content, timestamp_str)
stream.signable.signature = unhexlify(signature.encode())
encoded_signature = stream.get_encoded_signature()
pieces = [timestamp_str.encode(), channel.claim_hash, some_content]
self.assertTrue(Output.is_signature_valid(
encoded_signature,
sha256(b''.join(pieces)),
channel.claim.channel.public_key_bytes
))