diff --git a/src/Shell/BlockShell.php b/src/Shell/BlockShell.php index c5102e7..b568cf7 100644 --- a/src/Shell/BlockShell.php +++ b/src/Shell/BlockShell.php @@ -10,12 +10,16 @@ use Mdanter\Ecc\EccFactory; class BlockShell extends Shell { + const mempooltxkey = 'lbc.mempooltx'; + const pubKeyAddress = [0, 85]; const scriptAddress = [5, 122]; const rpcurl = 'http://lrpc:lrpc@127.0.0.1:9245'; + const redisurl = 'tcp://127.0.0.1:6379'; + public function initialize() { parent::initialize(); $this->loadModel('Blocks'); @@ -26,7 +30,6 @@ class BlockShell extends Shell { } public function main() { - //$this->parsenewblocks(); $this->out('No arguments specified'); } @@ -38,7 +41,7 @@ class BlockShell extends Shell { public function fixzerooutputs() { self::lock('zerooutputs'); - $redis = new \Predis\Client('tcp://127.0.0.1:6379'); + $redis = new \Predis\Client(self::redisurl); $conn = ConnectionManager::get('default'); /** 2017-06-12 21:38:07 **/ @@ -517,6 +520,13 @@ class BlockShell extends Shell { //$conn->execute('SET foreign_key_checks = 0'); //$conn->execute('SET unique_checks = 0'); + $redis = null; + try { + $redis = new \Predis\Client(self::redisurl); + } catch (\Exception $e) { + // redis unavailable + } + try { $unproc_blocks = $this->Blocks->find()->select(['Id', 'Height', 'Hash', 'TransactionHashes', 'BlockTime'])->where(['TransactionsProcessed' => 0])->order(['Height' => 'asc'])->toArray(); foreach ($unproc_blocks as $min_block) { @@ -556,6 +566,11 @@ class BlockShell extends Shell { $total_diff += $diff_ms; echo "tx took {$diff_ms}ms. Total {$total_diff}ms. "; + if (!$data_error && $redis && $redis->sismember(self::mempooltxkey, $tx_hash)) { + $redis->srem(self::mempooltxkey, $tx_hash); + echo "Removed $tx_hash from redis mempooltx.\n"; + } + echo "Done.\n"; } @@ -591,6 +606,11 @@ class BlockShell extends Shell { $upd_entity = $this->Transactions->newEntity($upd_tx); $this->Transactions->save($upd_entity); echo "Done.\n"; + + if ($redis && $redis->sismember(self::mempooltxkey, $tx_hash)) { + $redis->srem(self::mempooltxkey, $tx_hash); + echo "Removed $tx_hash from redis mempooltx.\n"; + } } else { echo "Block not found.\n"; } @@ -610,7 +630,14 @@ class BlockShell extends Shell { self::lock('parsenewblocks'); echo "Parsing new blocks...\n"; + $redis = null; try { + try { + $redis = new \Predis\Client(); + } catch (\Exception $e) { + // redis not available + } + // Get the best block hash $req = ['method' => 'getbestblockhash', 'params' => []]; $response = self::curl_json_post(self::rpcurl, json_encode($req)); @@ -690,7 +717,7 @@ class BlockShell extends Shell { $curr_block_entity->Bits, $curr_block_entity->Chainwork, $curr_block_entity->Confirmations, - $curr_block_ins['Difficulty'], + $curr_block_ins['Difficulty'], // cakephp 3 why? $curr_block_entity->Hash, $curr_block_entity->Height, $curr_block_entity->MedianTime, @@ -737,7 +764,6 @@ class BlockShell extends Shell { echo "Updated tx $tx_hash with block hash and time $block_data->time.\n"; } else { // Doesn't exist, create a new transaction - echo "Inserting tx $tx_hash for block height $block_data->height... "; $conn->begin(); @@ -752,6 +778,12 @@ class BlockShell extends Shell { echo "Done.\n"; } } + + // Remove from redis if present + if (!$data_error && $redis && $redis->sismember(self::mempooltxkey, $tx_hash)) { + $redis->srem(self::mempooltxkey, $tx_hash); + echo "Removed $tx_hash from redis mempooltx.\n"; + } } if (!$data_error && $block_id > -1) { @@ -767,46 +799,17 @@ class BlockShell extends Shell { self::unlock('parsenewblocks'); } - public function parsemempool() { - self::lock('parsemempool'); - - $data = ['method' => 'getrawmempool', 'params' => []]; - $res = self::curl_json_post(self::rpcurl, json_encode($data)); - $json = json_decode($res); - $txs = $json->result; - $now = new \DateTime('now', new \DateTimeZone('UTC')); - $data_error = false; - $conn = ConnectionManager::get('default'); - - foreach ($txs as $tx_hash) { - echo "Processing tx hash: $tx_hash... "; - - $exist_tx = $this->Transactions->find()->select(['Id'])->where(['Hash' => $tx_hash])->first(); - if ($exist_tx) { - echo "Exists. Skipping.\n"; - continue; - } - - $conn->begin(); - $block_ts = new \DateTime('now', new \DateTimeZone('UTC')); - $this->processtx($tx_hash, $block_ts, null, $data_error); - - if ($data_error) { - echo "Rolling back!\n"; - $conn->rollback(); - throw new \Exception('Data save failed!'); - } else { - echo "Data committed.\n"; - $conn->commit(); - } - } - - self::unlock('parsemempool'); - } - public function forevermempool() { self::lock('forevermempool'); + $conn = ConnectionManager::get('default'); + $redis = null; + try { + $redis = new \Predis\Client(self::redisurl); + } catch (\Exception $e) { + // redis not available + } + while (true) { try { $data = ['method' => 'getrawmempool', 'params' => []]; @@ -815,10 +818,25 @@ class BlockShell extends Shell { $txs = $json->result; $now = new \DateTime('now', new \DateTimeZone('UTC')); $data_error = false; - $conn = ConnectionManager::get('default'); - foreach ($txs as $tx_hash) { - echo "Processing tx hash: $tx_hash... "; + if (count($txs) === 0) { + // If no transactions found, that means there's nothing in the mempool. Clear redis + if ($redis) { + $redis->del(self::mempooltxkey); + echo "Empty rawmempool. Cleared mempool txs from redis.\n"; + } + } + + foreach ($txs as $tx_hash) { + // Check redis mempool txs + if ($redis && $redis->exists(self::mempooltxkey)) { + if ($redis->sismember(self::mempooltxkey, $tx_hash)) { + echo "Found processed tx hash: $tx_hash. Skipping.\n"; + continue; + } + } + + echo "Processing tx hash: $tx_hash... "; $exist_tx = $this->Transactions->find()->select(['Id'])->where(['Hash' => $tx_hash])->first(); if ($exist_tx) { echo "Exists. Skipping.\n"; @@ -837,14 +855,19 @@ class BlockShell extends Shell { } else { echo "Data committed.\n"; $conn->commit(); + + // Save to redis to prevent the DB from behing hit again + if ($redis) { + $redis->sadd(self::mempooltxkey, $tx_hash); + } } } } catch (\Exception $e) { echo "Error occurred processing mempool: " . $e->getMessage() . "\n"; } - echo "Sleeping for 15 seconds before next iteration.\n"; - sleep(15); + echo "*******************\n"; + sleep(1); } self::unlock('forevermempool');