lbry-sdk/tests/integration/wallet/test_transactions.py

94 lines
3.4 KiB
Python
Raw Normal View History

2018-06-12 17:53:29 +02:00
import asyncio
2018-06-14 21:18:36 +02:00
from binascii import hexlify
from orchstr8.testcase import IntegrationTestCase, d2f
2018-06-12 17:53:29 +02:00
from lbryschema.claim import ClaimDict
2018-06-14 07:32:11 +02:00
from torba.constants import COIN
2018-06-14 21:18:36 +02:00
from lbrynet.wallet.transaction import Transaction
from lbrynet.wallet.account import generate_certificate
import lbryschema
# Override lbryschema's module-level BLOCKCHAIN_NAME at import time, i.e.
# before any test in this module runs.
# NOTE(review): presumably this switches lbryschema to regtest network
# parameters so addresses/claims from the local lbrycrd validate - confirm.
lbryschema.BLOCKCHAIN_NAME = 'lbrycrd_regtest'
2018-06-12 17:53:29 +02:00
# Stream-type claim fixture: a nested dict with a "source" section (an
# lbry_sd_hash reference to a video/mp4) and a "metadata" section.
# Key order is kept exactly as authored in case a consumer serializes it.
example_claim_dict = {
    "version": "_0_0_1",
    "claimType": "streamType",
    "stream": {
        "source": {
            "source": "d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b",
            "version": "_0_0_1",
            "contentType": "video/mp4",
            "sourceType": "lbry_sd_hash",
        },
        "version": "_0_0_1",
        "metadata": {
            "license": "LBRY Inc",
            "description": "What is LBRY? An introduction with Alex Tabarrok",
            "language": "en",
            "title": "What is LBRY?",
            "author": "Samuel Bryan",
            "version": "_0_1_0",
            "nsfw": False,
            "licenseUrl": "",
            "preview": "",
            "thumbnail": "https://s3.amazonaws.com/files.lbry.io/logo.png",
        },
    },
}
class BasicTransactionTest(IntegrationTestCase):
    """End-to-end check of claim transactions using the harness blockchain.

    The single test funds the account, publishes a channel certificate and
    a claim signed by it, resolves the resulting URI, and finally abandons
    the claim.
    """

    # NOTE(review): presumably read by IntegrationTestCase to toggle verbose
    # harness output - confirm against orchstr8.testcase.
    VERBOSE = False

    async def test_creating_updating_and_abandoning_claim_with_channel(self):
        """Create a channel and a signed claim, resolve it, then abandon it.

        NOTE(review): despite the method name, no claim *update* is
        exercised in this body - only creation and abandonment.
        """
        await d2f(self.account.ensure_address_gap())

        # Fund two receiving addresses with 5 coins each and confirm them.
        address1, address2 = await d2f(self.account.receiving.get_usable_addresses(2))
        sendtxid1 = await self.blockchain.send_to_address(address1.decode(), 5)
        sendtxid2 = await self.blockchain.send_to_address(address2.decode(), 5)

        await self.blockchain.generate(1)
        # gather() re-raises a failure from either notification; the previous
        # asyncio.wait() call only stored it in the discarded "done" set.
        await asyncio.gather(
            self.on_transaction_id(sendtxid1),
            self.on_transaction_id(sendtxid2),
        )

        self.assertEqual(round(await self.get_balance(self.account)/COIN, 1), 10.0)

        # Channel certificate claim under the name '@bar', 1 coin attached.
        cert, key = generate_certificate()
        cert_tx = await d2f(Transaction.claim(b'@bar', cert, 1*COIN, address1, [self.account], self.account))

        # Stream claim 'foo', signed with the channel key and tied to the
        # claim id of the certificate transaction's first output.
        claim = ClaimDict.load_dict(example_claim_dict)
        claim = claim.sign(key, address1, hexlify(cert_tx.get_claim_id(0)))
        claim_tx = await d2f(Transaction.claim(b'foo', claim, 1*COIN, address1, [self.account], self.account))

        await self.broadcast(cert_tx)
        await self.broadcast(claim_tx)
        await asyncio.gather(  # mempool
            self.on_transaction(claim_tx),
            self.on_transaction(cert_tx),
        )

        await self.blockchain.generate(1)
        await asyncio.gather(  # confirmed
            self.on_transaction(claim_tx),
            self.on_transaction(cert_tx),
        )

        # Two 1-coin claims were made from a ~10 coin balance: the default
        # balance reads ~8 while get_balance(True) still reads ~10.
        # NOTE(review): presumably the True flag includes amounts locked in
        # claims - confirm against Account.get_balance.
        self.assertEqual(round(await d2f(self.account.get_balance())/COIN, 1), 8.0)
        self.assertEqual(round(await d2f(self.account.get_balance(True))/COIN, 1), 10.0)

        response = await d2f(self.ledger.resolve('lbry://@bar/foo'))
        self.assertIn('lbry://@bar/foo', response)

        # Abandon the stream claim; the txoid is looked up in the ledger db
        # and assigned to the output before it is passed to abandon().
        claim_txo = claim_tx.outputs[0]
        claim_txo.txoid = await d2f(self.ledger.db.get_txoid_for_txo(claim_txo))
        abandon_tx = await d2f(Transaction.abandon(claim_txo, [self.account], self.account))
        await self.broadcast(abandon_tx)
        await self.on_transaction(abandon_tx)
        await self.blockchain.generate(1)
        await self.on_transaction(abandon_tx)

        # NOTE(review): this is the same assertion as before the abandon, so
        # it does not distinguish an abandoned claim from a live one; confirm
        # what resolve() returns for an abandoned claim and tighten if needed.
        response = await d2f(self.ledger.resolve('lbry://@bar/foo'))
        self.assertIn('lbry://@bar/foo', response)