From 4b88db1f550dc6439b11bc5dce294ea2c9576cca Mon Sep 17 00:00:00 2001 From: Mark Beamer Jr Date: Sat, 21 Sep 2019 18:47:23 -0400 Subject: [PATCH] Cleanup the sync Add syncstate to status call Remove old sync logic Signed-off-by: Mark Beamer Jr --- server/controllers/lighthouse.js | 5 +- server/utils/chainquery/index.js | 5 +- server/utils/{importer => chainquery}/util.js | 0 server/utils/importer/index.js | 235 ------------------ 4 files changed, 5 insertions(+), 240 deletions(-) rename server/utils/{importer => chainquery}/util.js (100%) delete mode 100644 server/utils/importer/index.js diff --git a/server/controllers/lighthouse.js b/server/controllers/lighthouse.js index a3c2b1e..15cf059 100644 --- a/server/controllers/lighthouse.js +++ b/server/controllers/lighthouse.js @@ -4,8 +4,7 @@ import winstonStream from 'winston-stream'; import elasticsearch from 'elasticsearch'; import rp from 'request-promise'; import pretty from 'prettysize'; -import {claimSync} from '../utils/chainquery'; -import {getStats} from '../utils/importer'; +import {claimSync, getStats} from '../utils/chainquery'; import crypto from 'crypto'; import got from 'got'; import {logToSlack} from '../index'; @@ -420,7 +419,7 @@ function getStatus () { rp(`http://localhost:9200/claims/_stats`) .then(function (data) { data = JSON.parse(data); - resolve({status: getStats().info, spaceUsed: pretty(data._all.total.store.size_in_bytes, true), claimsInIndex: data._all.total.indexing.index_total, totSearches: data._all.total.search.query_total}); + resolve({status: getStats(), spaceUsed: pretty(data._all.total.store.size_in_bytes, true), claimsInIndex: data._all.total.indexing.index_total, totSearches: data._all.total.search.query_total}); }) .catch(function (err) { reject(err); diff --git a/server/utils/chainquery/index.js b/server/utils/chainquery/index.js index a65db38..74f6820 100644 --- a/server/utils/chainquery/index.js +++ b/server/utils/chainquery/index.js @@ -11,7 +11,7 @@ import rp from 'request-promise'; import appRoot from 'app-root-path'; import fs from 'fs'; import fileExists from 'file-exists'; -import * as util from '../../utils/importer/util'; +import * as util from './util'; import {logErrorToSlack} from '../../index'; const elasticsearchloglevel = 'info'; @@ -37,7 +37,7 @@ fileExists(path.join(appRoot.path, 'syncState.json'), (err, exists) => { } }); -let status = {}; +let status = {info: 'startup successful'}; export async function claimSync () { try { @@ -93,6 +93,7 @@ export async function claimSync () { } await saveJSON(path.join(appRoot.path, 'syncState.json'), syncState); status.info = 'upToDate'; + status.syncState = syncState; await sleep(600000); claimSync(); } catch (err) { diff --git a/server/utils/importer/util.js b/server/utils/chainquery/util.js similarity index 100% rename from server/utils/importer/util.js rename to server/utils/chainquery/util.js diff --git a/server/utils/importer/index.js b/server/utils/importer/index.js deleted file mode 100644 index 7e69749..0000000 --- a/server/utils/importer/index.js +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Importer code, handles all the syncing with the blockchain into elasticsearch. - */ -import bitcoin from 'bitcoin-promise'; -import elasticsearch from 'elasticsearch'; -import ElasticQueue from 'elastic-queue'; -import winston from 'winston'; -import winstonStream from 'winston-stream'; -import jsonfile from 'jsonfile'; -import path from 'path'; -import rp from 'request-promise'; -import appRoot from 'app-root-path'; -import fs from 'fs'; -import fileExists from 'file-exists'; -import PropertiesReader from 'properties-reader'; -import os from 'os'; - -const loggerStream = winstonStream(winston, 'info'); -const eclient = new elasticsearch.Client({ - host: 'http://localhost:9200', - - log: { - level : 'info', - type : 'stream', - stream: loggerStream, - }, -}); -const queue = new ElasticQueue({elastic: eclient}); - -// Get the lbrycrd config from the .lbrycrd folder. -function getClient () { - return new Promise((resolve, reject) => { - fileExists(path.join(os.homedir(), '.lbrycrd/lbrycrd.conf'), (err, exists) => { - if (err) { reject(err) }; - let config = {'username': 'lbry', 'password': 'lbry', 'rpc_port': 9245}; - if (exists) { - let prop = PropertiesReader(path.join(os.homedir(), '.lbrycrd/lbrycrd.conf')); - config.username = prop.get('rpcuser'); - config.password = prop.get('rpcpassword'); - config.rpc_port = prop.get('rpcport'); - let client = new bitcoin.Client({ - host : 'localhost', - port : config.rpc_port, - user : config.username, - pass : config.password, - timeout: 30000, - }); - resolve(client); - } else { - let client = new bitcoin.Client({ - host : 'localhost', - port : config.rpc_port, - user : config.username, - pass : config.password, - timeout: 30000, - }); - resolve(client); - } - }); - }); -} - -// Check that our cache file exist. -fileExists(path.join(appRoot.path, 'claimTrieCache.json'), (err, exists) => { - if (err) { throw err }; - if (!exists) { - fs.writeFileSync(path.join(appRoot.path, 'claimTrieCache.json'), '[]'); - } -}); - -let status = {}; - -export async function sync () { - try { - let client = await getClient(); - status.info = 'gettingClaimTrie'; - let claimTrie = await client.getClaimsInTrie().then(claimtrie => { return claimtrie }).catch(err => { throw err }); - let txList = []; - let latestClaimTrie = []; - for (let i in claimTrie) { - for (let o in claimTrie[i].claims) { - txList.push({ - txid : claimTrie[i].claims[o].txid, - nOut : claimTrie[i].claims[o].n, - claimId: claimTrie[i].claims[o].claimId, - }); - latestClaimTrie.push(claimTrie[i].claims[o].claimId); - } - } - status.info = 'calculatingClaimTrie'; - let oldClaimTrie = await getJSON(path.join(appRoot.path, 'claimTrieCache.json')); // get our old claimTrieCache.... - let added = await getAddedClaims(oldClaimTrie, latestClaimTrie); // get all new that should be added - let removed = await getRemovedClaims(oldClaimTrie, latestClaimTrie); // get all old that should be removed - status.info = 'updatingIndex'; - for (let claimId of added) { // for all new get their tx info and add to database - let tx = txList.find(x => x.claimId === claimId); - if (typeof tx !== 'undefined') { - let value = await getValue(tx.txid, tx.nOut); - if ((value !== 'error when decoding claims')) { - value = JSON.parse(value); - if (value['is controlling'] && value['name'] !== '') { - if (value.name && value.value) { - value.suggest_name = { - input : value.name, - weight: 30, - }; - } - pushElastic(value); - } - } - } - } - for (let claimId of removed) { // Call elastic and remove claim by id if it exists. - eclient.delete({ - index: 'claims', - type : 'claim', - id : claimId, - }, function (error, response) { - if (error) { - winston.log(error); - } - }); - } - winston.log('info', '[Importer] Removing blocked claims from search!'); - var util = require('./util.js'); - var blockedOutputsResponse = await getBlockedOutpoints(); - var outpointlist = JSON.parse(blockedOutputsResponse); - for (let outpoint of outpointlist.data.outpoints) { - var claimid = util.OutpointToClaimId(outpoint); - console.log('Deleting ClaimId: ' + claimid); - eclient.delete({ - index: 'claims', - type : 'claim', - id : claimid, - }, function (error, response) { - if (error) { - winston.log(error); - } - }); - } - - // Done adding, update our claimTrie cache to latest and wait a bit... - await saveJSON(path.join(appRoot.path, 'claimTrieCache.json'), latestClaimTrie); - status.info = 'upToDate'; - await sleep(600000); - sync(); - } catch (err) { - winston.log(err); - status.err = err; - await sleep(600000); - sync(); - } -} - -export function getStats () { - return status; -} - -function getAddedClaims (oldClaimTrie, newClaimTrie) { - return new Promise((resolve, reject) => { - let a = new Set(oldClaimTrie); - let b = new Set(newClaimTrie); - resolve(new Set([...b].filter(x => !a.has(x)))); - }); -} - -function getRemovedClaims (oldClaimTrie, newClaimTrie) { - return new Promise((resolve, reject) => { - let a = new Set(oldClaimTrie); - let b = new Set(newClaimTrie); - resolve(new Set([...a].filter(x => !b.has(x)))); - }); -} - -function getBlockedOutpoints () { - return new Promise((resolve, reject) => { - rp(`http://api.lbry.com/file/list_blocked`) - .then(function (htmlString) { - resolve(htmlString); - }) - .catch(function (err) { - reject(err); - }); - }); -} - -export function getValue (tx, i) { - return new Promise((resolve, reject) => { - rp(`http://localhost:5000/claim_decode/${tx}/${i}`) - .then(function (htmlString) { - resolve(htmlString); - }) - .catch(function (err) { - reject(err); - }); - }); -} - -async function pushElastic (claim) { - return new Promise(async (resolve, reject) => { - queue.push({ - index: 'claims', - type : 'claim', - id : claim.claimId, - body : claim, - }); - }); -} - -function getJSON (path) { - return new Promise((resolve, reject) => { - jsonfile.readFile(path, function (err, jsoncontent) { - if (err) { - reject(err); - } else { - resolve(jsoncontent); - } - }); - }); -} -function saveJSON (path, obj) { - return new Promise((resolve, reject) => { - jsonfile.writeFile(path, obj, function (err, jsoncontent) { - if (err) { - reject(err); - } else { - resolve(); - } - }); - }); -} - -function sleep (ms) { - return new Promise(resolve => setTimeout(resolve, ms)); -}