Merge pull request #3608 from lbryio/fix_ci

upgraded SDK to use the new LBRY hub project
This commit is contained in:
Lex Berezhny 2022-06-06 09:01:57 -04:00 committed by GitHub
commit 39fcfcccfb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 67 additions and 28 deletions

View file

@ -138,7 +138,7 @@ class BlobServerProtocol(asyncio.Protocol):
try: try:
request = BlobRequest.deserialize(self.buf + data) request = BlobRequest.deserialize(self.buf + data)
self.buf = remainder self.buf = remainder
except JSONDecodeError: except (UnicodeDecodeError, JSONDecodeError):
log.error("request from %s is not valid json (%i bytes): %s", self.peer_address_and_port, log.error("request from %s is not valid json (%i bytes): %s", self.peer_address_and_port,
len(self.buf + data), '' if not data else binascii.hexlify(self.buf + data).decode()) len(self.buf + data), '' if not data else binascii.hexlify(self.buf + data).decode())
self.close() self.close()

View file

@ -348,7 +348,6 @@ class Network:
await self._keepalive_task await self._keepalive_task
if self._urgent_need_reconnect.is_set(): if self._urgent_need_reconnect.is_set():
log.warning("urgent reconnect needed") log.warning("urgent reconnect needed")
self._urgent_need_reconnect.clear()
if self._keepalive_task and not self._keepalive_task.done(): if self._keepalive_task and not self._keepalive_task.done():
self._keepalive_task.cancel() self._keepalive_task.cancel()
except asyncio.CancelledError: except asyncio.CancelledError:

View file

@ -17,13 +17,6 @@ from typing import Type, Optional
import urllib.request import urllib.request
from uuid import uuid4 from uuid import uuid4
try:
from scribe.env import Env
from scribe.hub.service import HubServerService
from scribe.elasticsearch.service import ElasticSyncService
from scribe.blockchain.service import BlockchainProcessorService
except ImportError:
pass
import lbry import lbry
from lbry.wallet import Wallet, Ledger, RegTestLedger, WalletManager, Account, BlockHeightEvent from lbry.wallet import Wallet, Ledger, RegTestLedger, WalletManager, Account, BlockHeightEvent
@ -32,6 +25,16 @@ from lbry.wallet.orchstr8 import __hub_url__
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
try:
from hub.herald.env import ServerEnv
from hub.scribe.env import BlockchainEnv
from hub.elastic_sync.env import ElasticEnv
from hub.herald.service import HubServerService
from hub.elastic_sync.service import ElasticSyncService
from hub.scribe.service import BlockchainProcessorService
except ImportError:
pass
def get_lbcd_node_from_ledger(ledger_module): def get_lbcd_node_from_ledger(ledger_module):
return LBCDNode( return LBCDNode(
@ -256,14 +259,25 @@ class SPVNode:
'session_timeout': self.session_timeout, 'session_timeout': self.session_timeout,
'max_query_workers': 0, 'max_query_workers': 0,
'es_index_prefix': self.index_name, 'es_index_prefix': self.index_name,
'chain': 'regtest' 'chain': 'regtest',
'index_address_status': False
} }
if extraconf: if extraconf:
conf.update(extraconf) conf.update(extraconf)
env = Env(**conf) self.writer = BlockchainProcessorService(
self.writer = BlockchainProcessorService(env) BlockchainEnv(db_dir=self.data_path, daemon_url=lbcwallet_node.rpc_url,
self.server = HubServerService(env) reorg_limit=100, max_query_workers=0, chain='regtest', index_address_status=False)
self.es_writer = ElasticSyncService(env) )
self.server = HubServerService(ServerEnv(**conf))
self.es_writer = ElasticSyncService(
ElasticEnv(
db_dir=self.data_path, reorg_limit=100, max_query_workers=0, chain='regtest',
elastic_notifier_port=self.elastic_notifier_port,
es_index_prefix=self.index_name,
filtering_channel_ids=(extraconf or {}).get('filtering_channel_ids'),
blocking_channel_ids=(extraconf or {}).get('blocking_channel_ids')
)
)
await self.writer.start() await self.writer.start()
await self.es_writer.start() await self.es_writer.start()
await self.server.start() await self.server.start()

View file

@ -68,8 +68,8 @@ setup(
'coverage', 'coverage',
'jsonschema==4.4.0', 'jsonschema==4.4.0',
], ],
'scribe': [ 'hub': [
'scribe @ git+https://github.com/lbryio/scribe.git@311db529a03de7fce43ed8579f51ac23a1a884ea' 'hub@git+https://github.com/lbryio/hub.git@76dd9c392b776a2823015762814f375794120076'
] ]
}, },
classifiers=[ classifiers=[

View file

@ -10,20 +10,21 @@ class BlockchainReorganizationTests(CommandTestCase):
async def assertBlockHash(self, height): async def assertBlockHash(self, height):
bp = self.conductor.spv_node.writer bp = self.conductor.spv_node.writer
reader = self.conductor.spv_node.server
def get_txids(): def get_txids():
return [ return [
bp.db.fs_tx_hash(tx_num)[0][::-1].hex() reader.db.fs_tx_hash(tx_num)[0][::-1].hex()
for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height]) for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height])
] ]
block_hash = await self.blockchain.get_block_hash(height) block_hash = await self.blockchain.get_block_hash(height)
self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode()) self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) self.assertEqual(block_hash, (await reader.db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids) txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
txs = await bp.db.get_transactions_and_merkles(txids) txs = await reader.db.get_transactions_and_merkles(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order') self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')

View file

@ -1,12 +1,12 @@
import asyncio import asyncio
import scribe import hub
from unittest.mock import Mock from unittest.mock import Mock
from scribe.hub import HUB_PROTOCOL_VERSION from hub.herald import HUB_PROTOCOL_VERSION
from scribe.hub.udp import StatusServer from hub.herald.udp import StatusServer
from scribe.hub.session import LBRYElectrumX from hub.herald.session import LBRYElectrumX
from scribe.blockchain.network import LBCRegTest from hub.scribe.network import LBCRegTest
from lbry.wallet.network import Network from lbry.wallet.network import Network
from lbry.wallet.orchstr8 import Conductor from lbry.wallet.orchstr8 import Conductor
@ -116,7 +116,7 @@ class ReconnectTests(IntegrationTestCase):
# disconnect and send a new tx, should reconnect and get it # disconnect and send a new tx, should reconnect and get it
self.ledger.network.client.transport.close() self.ledger.network.client.transport.close()
self.assertFalse(self.ledger.network.is_connected) self.assertFalse(self.ledger.network.is_connected)
await self.ledger.resolve([], 'derp') await self.ledger.resolve([], ['derp'])
sendtxid = await self.send_to_address_and_wait(address1, 1.1337, 1) sendtxid = await self.send_to_address_and_wait(address1, 1.1337, 1)
self.assertLess(self.ledger.network.client.response_time, 1) # response time properly set lower, we are fine self.assertLess(self.ledger.network.client.response_time, 1) # response time properly set lower, we are fine
@ -139,6 +139,7 @@ class ReconnectTests(IntegrationTestCase):
if not self.ledger.network.is_connected: if not self.ledger.network.is_connected:
await asyncio.wait_for(self.ledger.network.on_connected.first, timeout=10.0) await asyncio.wait_for(self.ledger.network.on_connected.first, timeout=10.0)
# omg, the burned cable still works! torba is fire proof! # omg, the burned cable still works! torba is fire proof!
await self.ledger.on_header.where(self.blockchain.is_expected_block)
await self.ledger.network.get_transaction(sendtxid) await self.ledger.network.get_transaction(sendtxid)
async def test_timeout_then_reconnect(self): async def test_timeout_then_reconnect(self):

View file

@ -1,7 +1,7 @@
import asyncio import asyncio
from scribe.hub import HUB_PROTOCOL_VERSION from hub.herald import HUB_PROTOCOL_VERSION
from scribe.hub.session import LBRYElectrumX from hub.herald.session import LBRYElectrumX
from lbry.error import ServerPaymentFeeAboveMaxAllowedError from lbry.error import ServerPaymentFeeAboveMaxAllowedError
from lbry.wallet.network import ClientSession from lbry.wallet.network import ClientSession

View file

@ -326,6 +326,30 @@ class ResolveCommand(BaseResolveTestCase):
await self.support_abandon(claim_id1) await self.support_abandon(claim_id1)
await self.assertResolvesToClaimId('@foo', claim_id2) await self.assertResolvesToClaimId('@foo', claim_id2)
async def test_resolve_duplicate_name_in_channel(self):
db_resolve = self.conductor.spv_node.server.db.resolve
# first one remains winner unless something else changes
channel_id = self.get_claim_id(await self.channel_create('@foo'))
file_path = self.create_upload_file(data=b'hi!')
tx = await self.daemon.jsonrpc_stream_create('duplicate', '0.1', file_path=file_path, allow_duplicate_name=True, channel_id=channel_id)
await self.ledger.wait(tx)
first_claim = tx.outputs[0].claim_id
file_path = self.create_upload_file(data=b'hi!')
tx = await self.daemon.jsonrpc_stream_create('duplicate', '0.1', file_path=file_path, allow_duplicate_name=True, channel_id=channel_id)
await self.ledger.wait(tx)
duplicate_claim = tx.outputs[0].claim_id
await self.generate(1)
stream, channel, _, _ = await db_resolve(f"@foo:{channel_id}/duplicate:{first_claim}")
self.assertEqual(stream.claim_hash.hex(), first_claim)
self.assertEqual(channel.claim_hash.hex(), channel_id)
stream, channel, _, _ = await db_resolve(f"@foo:{channel_id}/duplicate:{duplicate_claim}")
self.assertEqual(stream.claim_hash.hex(), duplicate_claim)
self.assertEqual(channel.claim_hash.hex(), channel_id)
async def test_advanced_resolve(self): async def test_advanced_resolve(self):
claim_id1 = self.get_claim_id( claim_id1 = self.get_claim_id(
await self.stream_create('foo', '0.7', allow_duplicate_name=True)) await self.stream_create('foo', '0.7', allow_duplicate_name=True))

View file

@ -4,7 +4,7 @@ deps =
coverage coverage
extras = extras =
test test
scribe hub
torrent torrent
changedir = {toxinidir}/tests changedir = {toxinidir}/tests
setenv = setenv =