update scribe env and fix tests

This commit is contained in:
Jack Robison 2022-05-09 11:34:28 -04:00
parent 0ea8ba72dd
commit ea8adc5367
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 61 additions and 24 deletions

View file

@ -138,7 +138,7 @@ class BlobServerProtocol(asyncio.Protocol):
try: try:
request = BlobRequest.deserialize(self.buf + data) request = BlobRequest.deserialize(self.buf + data)
self.buf = remainder self.buf = remainder
except JSONDecodeError: except (UnicodeDecodeError, JSONDecodeError):
log.error("request from %s is not valid json (%i bytes): %s", self.peer_address_and_port, log.error("request from %s is not valid json (%i bytes): %s", self.peer_address_and_port,
len(self.buf + data), '' if not data else binascii.hexlify(self.buf + data).decode()) len(self.buf + data), '' if not data else binascii.hexlify(self.buf + data).decode())
self.close() self.close()

View file

@ -18,14 +18,6 @@ import urllib.request
from uuid import uuid4 from uuid import uuid4
try:
from scribe.hub.env import ServerEnv
from scribe.hub.service import HubServerService
from scribe.elasticsearch.service import ElasticSyncService
from scribe.blockchain.service import BlockchainProcessorService
except ImportError:
pass
import lbry import lbry
from lbry.wallet import Wallet, Ledger, RegTestLedger, WalletManager, Account, BlockHeightEvent from lbry.wallet import Wallet, Ledger, RegTestLedger, WalletManager, Account, BlockHeightEvent
from lbry.conf import KnownHubsList, Config from lbry.conf import KnownHubsList, Config
@ -33,6 +25,16 @@ from lbry.wallet.orchstr8 import __hub_url__
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
try:
from hub.herald.env import ServerEnv
from hub.scribe.env import BlockchainEnv
from hub.elastic_sync.env import ElasticEnv
from hub.herald.service import HubServerService
from hub.elastic_sync.service import ElasticSyncService
from hub.scribe.service import BlockchainProcessorService
except ImportError:
pass
def get_lbcd_node_from_ledger(ledger_module): def get_lbcd_node_from_ledger(ledger_module):
return LBCDNode( return LBCDNode(
@ -257,14 +259,24 @@ class SPVNode:
'session_timeout': self.session_timeout, 'session_timeout': self.session_timeout,
'max_query_workers': 0, 'max_query_workers': 0,
'es_index_prefix': self.index_name, 'es_index_prefix': self.index_name,
'chain': 'regtest' 'chain': 'regtest',
'index_address_status': False
} }
if extraconf: if extraconf:
conf.update(extraconf) conf.update(extraconf)
env = ServerEnv(**conf) self.writer = BlockchainProcessorService(
self.writer = BlockchainProcessorService(env) BlockchainEnv(db_dir=self.data_path, daemon_url=lbcwallet_node.rpc_url,
self.server = HubServerService(env) reorg_limit=100, max_query_workers=0, chain='regtest', index_address_status=False)
self.es_writer = ElasticSyncService(env) )
self.server = HubServerService(ServerEnv(**conf))
self.es_writer = ElasticSyncService(
ElasticEnv(
db_dir=self.data_path, reorg_limit=100, max_query_workers=0, chain='regtest',
elastic_notifier_port=self.elastic_notifier_port,
es_index_prefix=self.index_name, filtering_channel_ids=(extraconf or {}).get('filtering_channel_ids'),
blocking_channel_ids=(extraconf or {}).get('blocking_channel_ids')
)
)
await self.writer.start() await self.writer.start()
await self.es_writer.start() await self.es_writer.start()
await self.server.start() await self.server.start()

View file

@ -10,20 +10,21 @@ class BlockchainReorganizationTests(CommandTestCase):
async def assertBlockHash(self, height): async def assertBlockHash(self, height):
bp = self.conductor.spv_node.writer bp = self.conductor.spv_node.writer
reader = self.conductor.spv_node.server
def get_txids(): def get_txids():
return [ return [
bp.db.fs_tx_hash(tx_num)[0][::-1].hex() reader.db.fs_tx_hash(tx_num)[0][::-1].hex()
for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height]) for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height])
] ]
block_hash = await self.blockchain.get_block_hash(height) block_hash = await self.blockchain.get_block_hash(height)
self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode()) self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) self.assertEqual(block_hash, (await reader.db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids) txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
txs = await bp.db.get_transactions_and_merkles(txids) txs = await reader.db.get_transactions_and_merkles(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order') self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')

View file

@ -1,12 +1,12 @@
import asyncio import asyncio
import scribe import hub
from unittest.mock import Mock from unittest.mock import Mock
from scribe.hub import HUB_PROTOCOL_VERSION from hub.herald import HUB_PROTOCOL_VERSION
from scribe.hub.udp import StatusServer from hub.herald.udp import StatusServer
from scribe.hub.session import LBRYElectrumX from hub.herald.session import LBRYElectrumX
from scribe.blockchain.network import LBCRegTest from hub.scribe.network import LBCRegTest
from lbry.wallet.network import Network from lbry.wallet.network import Network
from lbry.wallet.orchstr8 import Conductor from lbry.wallet.orchstr8 import Conductor

View file

@ -1,7 +1,7 @@
import asyncio import asyncio
from scribe.hub import HUB_PROTOCOL_VERSION from hub.herald import HUB_PROTOCOL_VERSION
from scribe.hub.session import LBRYElectrumX from hub.herald.session import LBRYElectrumX
from lbry.error import ServerPaymentFeeAboveMaxAllowedError from lbry.error import ServerPaymentFeeAboveMaxAllowedError
from lbry.wallet.network import ClientSession from lbry.wallet.network import ClientSession

View file

@ -326,6 +326,30 @@ class ResolveCommand(BaseResolveTestCase):
await self.support_abandon(claim_id1) await self.support_abandon(claim_id1)
await self.assertResolvesToClaimId('@foo', claim_id2) await self.assertResolvesToClaimId('@foo', claim_id2)
async def test_resolve_duplicate_name_in_channel(self):
db_resolve = self.conductor.spv_node.server.db.resolve
# first one remains winner unless something else changes
channel_id = self.get_claim_id(await self.channel_create('@foo'))
file_path = self.create_upload_file(data=b'hi!')
tx = await self.daemon.jsonrpc_stream_create('duplicate', '0.1', file_path=file_path, allow_duplicate_name=True, channel_id=channel_id)
await self.ledger.wait(tx)
first_claim = tx.outputs[0].claim_id
file_path = self.create_upload_file(data=b'hi!')
tx = await self.daemon.jsonrpc_stream_create('duplicate', '0.1', file_path=file_path, allow_duplicate_name=True, channel_id=channel_id)
await self.ledger.wait(tx)
duplicate_claim = tx.outputs[0].claim_id
await self.generate(1)
stream, channel, _, _ = await db_resolve(f"@foo:{channel_id}/duplicate:{first_claim}")
self.assertEqual(stream.claim_hash.hex(), first_claim)
self.assertEqual(channel.claim_hash.hex(), channel_id)
stream, channel, _, _ = await db_resolve(f"@foo:{channel_id}/duplicate:{duplicate_claim}")
self.assertEqual(stream.claim_hash.hex(), duplicate_claim)
self.assertEqual(channel.claim_hash.hex(), channel_id)
async def test_advanced_resolve(self): async def test_advanced_resolve(self):
claim_id1 = self.get_claim_id( claim_id1 = self.get_claim_id(
await self.stream_create('foo', '0.7', allow_duplicate_name=True)) await self.stream_create('foo', '0.7', allow_duplicate_name=True))