lighthouse.js/server/utils/importer/index.js

235 lines
6.7 KiB
JavaScript

/*
* Importer code, handles all the syncing with the blockchain into elasticsearch.
*/
import bitcoin from 'bitcoin-promise';
import elasticsearch from 'elasticsearch';
import ElasticQueue from 'elastic-queue';
import winston from 'winston';
import winstonStream from 'winston-stream';
import jsonfile from 'jsonfile';
import path from 'path';
import rp from 'request-promise';
import appRoot from 'app-root-path';
import fs from 'fs';
import fileExists from 'file-exists';
import PropertiesReader from 'properties-reader';
import os from 'os';
const loggerStream = winstonStream(winston, 'info');
const eclient = new elasticsearch.Client({
host: 'http://localhost:9200',
log: {
level : 'info',
type : 'stream',
stream: loggerStream,
},
});
const queue = new ElasticQueue({elastic: eclient});
// Get the lbrycrd config from the .lbrycrd folder.
function getClient () {
return new Promise((resolve, reject) => {
fileExists(path.join(os.homedir(), '.lbrycrd/lbrycrd.conf'), (err, exists) => {
if (err) { reject(err) };
let config = {'username': 'lbry', 'password': 'lbry', 'rpc_port': 9245};
if (exists) {
let prop = PropertiesReader(path.join(os.homedir(), '.lbrycrd/lbrycrd.conf'));
config.username = prop.get('rpcuser');
config.password = prop.get('rpcpassword');
config.rpc_port = prop.get('rpcport');
let client = new bitcoin.Client({
host : 'localhost',
port : config.rpc_port,
user : config.username,
pass : config.password,
timeout: 30000,
});
resolve(client);
} else {
let client = new bitcoin.Client({
host : 'localhost',
port : config.rpc_port,
user : config.username,
pass : config.password,
timeout: 30000,
});
resolve(client);
}
});
});
}
// Check that our cache file exist.
fileExists(path.join(appRoot.path, 'claimTrieCache.json'), (err, exists) => {
if (err) { throw err };
if (!exists) {
fs.writeFileSync(path.join(appRoot.path, 'claimTrieCache.json'), '[]');
}
});
let status = {};
export async function sync () {
try {
let client = await getClient();
status.info = 'gettingClaimTrie';
let claimTrie = await client.getClaimsInTrie().then(claimtrie => { return claimtrie }).catch(err => { throw err });
let txList = [];
let latestClaimTrie = [];
for (let i in claimTrie) {
for (let o in claimTrie[i].claims) {
txList.push({
txid : claimTrie[i].claims[o].txid,
nOut : claimTrie[i].claims[o].n,
claimId: claimTrie[i].claims[o].claimId,
});
latestClaimTrie.push(claimTrie[i].claims[o].claimId);
}
}
status.info = 'calculatingClaimTrie';
let oldClaimTrie = await getJSON(path.join(appRoot.path, 'claimTrieCache.json')); // get our old claimTrieCache....
let added = await getAddedClaims(oldClaimTrie, latestClaimTrie); // get all new that should be added
let removed = await getRemovedClaims(oldClaimTrie, latestClaimTrie); // get all old that should be removed
status.info = 'updatingIndex';
for (let claimId of added) { // for all new get their tx info and add to database
let tx = txList.find(x => x.claimId === claimId);
if (typeof tx !== 'undefined') {
let value = await getValue(tx.txid, tx.nOut);
if ((value !== 'error when decoding claims')) {
value = JSON.parse(value);
if (value['is controlling'] && value['name'] !== '') {
if (value.name && value.value) {
value.suggest_name = {
input : value.name,
weight: 30,
};
}
pushElastic(value);
}
}
}
}
for (let claimId of removed) { // Call elastic and remove claim by id if it exists.
eclient.delete({
index: 'claims',
type : 'claim',
id : claimId,
}, function (error, response) {
if (error) {
winston.log(error);
}
});
}
winston.log('info', '[Importer] Removing blocked claims from search!');
var util = require('./util.js');
var blockedOutputsResponse = await getBlockedOutpoints();
var outpointlist = JSON.parse(blockedOutputsResponse);
for (let outpoint of outpointlist.data.outpoints) {
var claimid = util.OutpointToClaimId(outpoint);
console.log('Deleting ClaimId: ' + claimid);
eclient.delete({
index: 'claims',
type : 'claim',
id : claimid,
}, function (error, response) {
if (error) {
winston.log(error);
}
});
}
// Done adding, update our claimTrie cache to latest and wait a bit...
await saveJSON(path.join(appRoot.path, 'claimTrieCache.json'), latestClaimTrie);
status.info = 'upToDate';
await sleep(600000);
sync();
} catch (err) {
winston.log(err);
status.err = err;
await sleep(600000);
sync();
}
}
export function getStats () {
return status;
}
function getAddedClaims (oldClaimTrie, newClaimTrie) {
return new Promise((resolve, reject) => {
let a = new Set(oldClaimTrie);
let b = new Set(newClaimTrie);
resolve(new Set([...b].filter(x => !a.has(x))));
});
}
function getRemovedClaims (oldClaimTrie, newClaimTrie) {
return new Promise((resolve, reject) => {
let a = new Set(oldClaimTrie);
let b = new Set(newClaimTrie);
resolve(new Set([...a].filter(x => !b.has(x))));
});
}
function getBlockedOutpoints () {
return new Promise((resolve, reject) => {
rp(`http://api.lbry.io/file/list_blocked`)
.then(function (htmlString) {
resolve(htmlString);
})
.catch(function (err) {
reject(err);
});
});
}
export function getValue (tx, i) {
return new Promise((resolve, reject) => {
rp(`http://localhost:5000/claim_decode/${tx}/${i}`)
.then(function (htmlString) {
resolve(htmlString);
})
.catch(function (err) {
reject(err);
});
});
}
async function pushElastic (claim) {
return new Promise(async(resolve, reject) => {
queue.push({
index: 'claims',
type : 'claim',
id : claim.claimId,
body : claim,
});
});
}
function getJSON (path) {
return new Promise((resolve, reject) => {
jsonfile.readFile(path, function (err, jsoncontent) {
if (err) {
reject(err);
} else {
resolve(jsoncontent);
}
});
});
}
function saveJSON (path, obj) {
return new Promise((resolve, reject) => {
jsonfile.writeFile(path, obj, function (err, jsoncontent) {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
function sleep (ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}