changed forever mempool resolution to 1 second and added redis tx hash caching to prevent db hits

This commit is contained in:
Akinwale Ariwodola 2017-06-13 21:20:41 +01:00
parent a7ef82fd69
commit dacac5b8e4

View file

@ -10,12 +10,16 @@ use Mdanter\Ecc\EccFactory;
class BlockShell extends Shell {
const mempooltxkey = 'lbc.mempooltx';
const pubKeyAddress = [0, 85];
const scriptAddress = [5, 122];
const rpcurl = 'http://lrpc:lrpc@127.0.0.1:9245';
const redisurl = 'tcp://127.0.0.1:6379';
public function initialize() {
parent::initialize();
$this->loadModel('Blocks');
@ -26,7 +30,6 @@ class BlockShell extends Shell {
}
public function main() {
//$this->parsenewblocks();
$this->out('No arguments specified');
}
@ -38,7 +41,7 @@ class BlockShell extends Shell {
public function fixzerooutputs() {
self::lock('zerooutputs');
$redis = new \Predis\Client('tcp://127.0.0.1:6379');
$redis = new \Predis\Client(self::redisurl);
$conn = ConnectionManager::get('default');
/** 2017-06-12 21:38:07 **/
@ -517,6 +520,13 @@ class BlockShell extends Shell {
//$conn->execute('SET foreign_key_checks = 0');
//$conn->execute('SET unique_checks = 0');
$redis = null;
try {
$redis = new \Predis\Client(self::redisurl);
} catch (\Exception $e) {
// redis unavailable
}
try {
$unproc_blocks = $this->Blocks->find()->select(['Id', 'Height', 'Hash', 'TransactionHashes', 'BlockTime'])->where(['TransactionsProcessed' => 0])->order(['Height' => 'asc'])->toArray();
foreach ($unproc_blocks as $min_block) {
@ -556,6 +566,11 @@ class BlockShell extends Shell {
$total_diff += $diff_ms;
echo "tx took {$diff_ms}ms. Total {$total_diff}ms. ";
if (!$data_error && $redis && $redis->sismember(self::mempooltxkey, $tx_hash)) {
$redis->srem(self::mempooltxkey, $tx_hash);
echo "Removed $tx_hash from redis mempooltx.\n";
}
echo "Done.\n";
}
@ -591,6 +606,11 @@ class BlockShell extends Shell {
$upd_entity = $this->Transactions->newEntity($upd_tx);
$this->Transactions->save($upd_entity);
echo "Done.\n";
if ($redis && $redis->sismember(self::mempooltxkey, $tx_hash)) {
$redis->srem(self::mempooltxkey, $tx_hash);
echo "Removed $tx_hash from redis mempooltx.\n";
}
} else {
echo "Block not found.\n";
}
@ -610,7 +630,14 @@ class BlockShell extends Shell {
self::lock('parsenewblocks');
echo "Parsing new blocks...\n";
$redis = null;
try {
try {
$redis = new \Predis\Client();
} catch (\Exception $e) {
// redis not available
}
// Get the best block hash
$req = ['method' => 'getbestblockhash', 'params' => []];
$response = self::curl_json_post(self::rpcurl, json_encode($req));
@ -690,7 +717,7 @@ class BlockShell extends Shell {
$curr_block_entity->Bits,
$curr_block_entity->Chainwork,
$curr_block_entity->Confirmations,
$curr_block_ins['Difficulty'],
$curr_block_ins['Difficulty'], // cakephp 3 why?
$curr_block_entity->Hash,
$curr_block_entity->Height,
$curr_block_entity->MedianTime,
@ -737,7 +764,6 @@ class BlockShell extends Shell {
echo "Updated tx $tx_hash with block hash and time $block_data->time.\n";
} else {
// Doesn't exist, create a new transaction
echo "Inserting tx $tx_hash for block height $block_data->height... ";
$conn->begin();
@ -752,6 +778,12 @@ class BlockShell extends Shell {
echo "Done.\n";
}
}
// Remove from redis if present
if (!$data_error && $redis && $redis->sismember(self::mempooltxkey, $tx_hash)) {
$redis->srem(self::mempooltxkey, $tx_hash);
echo "Removed $tx_hash from redis mempooltx.\n";
}
}
if (!$data_error && $block_id > -1) {
@ -767,46 +799,17 @@ class BlockShell extends Shell {
self::unlock('parsenewblocks');
}
public function parsemempool() {
self::lock('parsemempool');
$data = ['method' => 'getrawmempool', 'params' => []];
$res = self::curl_json_post(self::rpcurl, json_encode($data));
$json = json_decode($res);
$txs = $json->result;
$now = new \DateTime('now', new \DateTimeZone('UTC'));
$data_error = false;
$conn = ConnectionManager::get('default');
foreach ($txs as $tx_hash) {
echo "Processing tx hash: $tx_hash... ";
$exist_tx = $this->Transactions->find()->select(['Id'])->where(['Hash' => $tx_hash])->first();
if ($exist_tx) {
echo "Exists. Skipping.\n";
continue;
}
$conn->begin();
$block_ts = new \DateTime('now', new \DateTimeZone('UTC'));
$this->processtx($tx_hash, $block_ts, null, $data_error);
if ($data_error) {
echo "Rolling back!\n";
$conn->rollback();
throw new \Exception('Data save failed!');
} else {
echo "Data committed.\n";
$conn->commit();
}
}
self::unlock('parsemempool');
}
public function forevermempool() {
self::lock('forevermempool');
$conn = ConnectionManager::get('default');
$redis = null;
try {
$redis = new \Predis\Client(self::redisurl);
} catch (\Exception $e) {
// redis not available
}
while (true) {
try {
$data = ['method' => 'getrawmempool', 'params' => []];
@ -815,10 +818,25 @@ class BlockShell extends Shell {
$txs = $json->result;
$now = new \DateTime('now', new \DateTimeZone('UTC'));
$data_error = false;
$conn = ConnectionManager::get('default');
foreach ($txs as $tx_hash) {
echo "Processing tx hash: $tx_hash... ";
if (count($txs) === 0) {
// If no transactions found, that means there's nothing in the mempool. Clear redis
if ($redis) {
$redis->del(self::mempooltxkey);
echo "Empty rawmempool. Cleared mempool txs from redis.\n";
}
}
foreach ($txs as $tx_hash) {
// Check redis mempool txs
if ($redis && $redis->exists(self::mempooltxkey)) {
if ($redis->sismember(self::mempooltxkey, $tx_hash)) {
echo "Found processed tx hash: $tx_hash. Skipping.\n";
continue;
}
}
echo "Processing tx hash: $tx_hash... ";
$exist_tx = $this->Transactions->find()->select(['Id'])->where(['Hash' => $tx_hash])->first();
if ($exist_tx) {
echo "Exists. Skipping.\n";
@ -837,14 +855,19 @@ class BlockShell extends Shell {
} else {
echo "Data committed.\n";
$conn->commit();
// Save to redis to prevent the DB from behing hit again
if ($redis) {
$redis->sadd(self::mempooltxkey, $tx_hash);
}
}
}
} catch (\Exception $e) {
echo "Error occurred processing mempool: " . $e->getMessage() . "\n";
}
echo "Sleeping for 15 seconds before next iteration.\n";
sleep(15);
echo "*******************\n";
sleep(1);
}
self::unlock('forevermempool');