2017-08-17 12:06:24 +02:00
|
|
|
/*
|
|
|
|
* Importer code, handles all the syncing with the blockchain into elasticsearch.
|
|
|
|
*/
|
2017-09-03 14:47:40 +02:00
|
|
|
import bitcoin from 'bitcoin-promise';
|
|
|
|
import elasticsearch from 'elasticsearch';
|
|
|
|
import ElasticQueue from 'elastic-queue';
|
|
|
|
import winston from 'winston';
|
|
|
|
import winstonStream from 'winston-stream';
|
|
|
|
import jsonfile from 'jsonfile';
|
|
|
|
import path from 'path';
|
|
|
|
import rp from 'request-promise';
|
|
|
|
import appRoot from 'app-root-path';
|
2017-08-18 10:16:29 +02:00
|
|
|
|
|
|
|
const loggerStream = winstonStream(winston, 'info');
|
|
|
|
const eclient = new elasticsearch.Client({
|
2017-09-20 03:49:42 +02:00
|
|
|
host: 'http://localhost:9200',
|
|
|
|
|
2017-08-18 10:16:29 +02:00
|
|
|
log : {
|
|
|
|
level : 'info',
|
|
|
|
type : 'stream',
|
|
|
|
stream: loggerStream,
|
|
|
|
},
|
2017-08-16 19:49:27 +02:00
|
|
|
});
|
2017-09-03 14:47:40 +02:00
|
|
|
const queue = new ElasticQueue({elastic: eclient});
|
2017-08-16 19:49:27 +02:00
|
|
|
const client = new bitcoin.Client({
|
|
|
|
host : 'localhost',
|
|
|
|
port : 9245,
|
|
|
|
user : 'lbry',
|
|
|
|
pass : 'lbry',
|
|
|
|
timeout: 30000,
|
|
|
|
});
|
2017-08-17 12:06:24 +02:00
|
|
|
let status = {};
|
2017-08-16 19:49:27 +02:00
|
|
|
|
2017-09-03 14:47:40 +02:00
|
|
|
export async function sync () {
|
2017-08-16 19:49:27 +02:00
|
|
|
try {
|
2017-09-03 14:47:40 +02:00
|
|
|
status.info = 'Grabbing the claimTrie...';
|
|
|
|
let claimTrie = await client.getClaimsInTrie().then(claimtrie => { return claimtrie }).catch(err => { throw err });
|
|
|
|
let txList = [];
|
|
|
|
let latestClaimTrie = [];
|
|
|
|
for (let i in claimTrie) {
|
|
|
|
for (let o in claimTrie[i].claims) {
|
|
|
|
txList.push({
|
|
|
|
txid : claimTrie[i].claims[o].txid,
|
|
|
|
nOut : claimTrie[i].claims[o].n,
|
|
|
|
claimId: claimTrie[i].claims[o].claimId,
|
|
|
|
});
|
|
|
|
latestClaimTrie.push(claimTrie[i].claims[o].claimId);
|
|
|
|
}
|
2017-08-16 19:49:27 +02:00
|
|
|
}
|
2017-09-03 14:47:40 +02:00
|
|
|
let oldClaimTrie = await getJSON(path.join(appRoot.path, 'claimTrieCache.json')); // get our old claimTrieCache....
|
|
|
|
let added = await getAddedClaims(oldClaimTrie, latestClaimTrie); // get all new that should be added
|
|
|
|
let removed = await getRemovedClaims(oldClaimTrie, latestClaimTrie); // get all old that should be removed
|
|
|
|
status.info = 'Adding/Removing Claims, please wait...';
|
|
|
|
for (let claimId of added) { // for all new get their tx info and add to database
|
|
|
|
let tx = txList.find(x => x.claimId === claimId);
|
|
|
|
if (typeof tx !== 'undefined') {
|
|
|
|
let value = await getValue(tx.txid, tx.nOut);
|
|
|
|
if ((value !== 'error when decoding claims')) {
|
|
|
|
value = JSON.parse(value);
|
|
|
|
if (value['is controlling'] && value['name'] !== '') {
|
|
|
|
if (value.name && value.value) {
|
|
|
|
value.suggest_name = {
|
|
|
|
input : value.name,
|
|
|
|
weight: 30,
|
|
|
|
};
|
|
|
|
}
|
|
|
|
pushElastic(value);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for (let claimId of removed) { // Call elastic and remove claim by id if it exists.
|
|
|
|
eclient.delete({
|
|
|
|
index: 'claims',
|
|
|
|
type : 'claim',
|
|
|
|
id : claimId,
|
|
|
|
}, function (error, response) {
|
|
|
|
if (error) {
|
|
|
|
winston.log(error);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
// Done adding, update our claimTrie cache to latest and wait a bit...
|
|
|
|
await saveJSON(path.join(appRoot.path, 'claimTrieCache.json'), latestClaimTrie);
|
|
|
|
status.info = 'Done updating the claimTrieCache, waiting 5 minutes before doing a recheck..';
|
|
|
|
await sleep(300000);
|
|
|
|
sync();
|
2017-08-16 19:49:27 +02:00
|
|
|
} catch (err) {
|
2017-09-03 14:47:40 +02:00
|
|
|
winston.log(err);
|
2017-08-17 12:06:24 +02:00
|
|
|
status.err = err;
|
2017-08-16 19:49:27 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-08-17 12:06:24 +02:00
|
|
|
export function getStats () {
|
|
|
|
return status;
|
|
|
|
}
|
|
|
|
|
2017-09-03 14:47:40 +02:00
|
|
|
function getAddedClaims (oldClaimTrie, newClaimTrie) {
|
|
|
|
return new Promise((resolve, reject) => {
|
|
|
|
let a = new Set(oldClaimTrie);
|
|
|
|
let b = new Set(newClaimTrie);
|
|
|
|
resolve(new Set([...b].filter(x => !a.has(x))));
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
function getRemovedClaims (oldClaimTrie, newClaimTrie) {
|
|
|
|
return new Promise((resolve, reject) => {
|
|
|
|
let a = new Set(oldClaimTrie);
|
|
|
|
let b = new Set(newClaimTrie);
|
|
|
|
resolve(new Set([...a].filter(x => !b.has(x))));
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
function getValue (tx, i) {
|
|
|
|
return new Promise((resolve, reject) => {
|
|
|
|
rp(`http://localhost:5000/claim_decode/${tx}/${i}`)
|
|
|
|
.then(function (htmlString) {
|
|
|
|
resolve(htmlString);
|
|
|
|
})
|
|
|
|
.catch(function (err) {
|
|
|
|
reject(err);
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
async function pushElastic (claim) {
|
|
|
|
return new Promise(async(resolve, reject) => {
|
|
|
|
queue.push({
|
2017-08-16 19:49:27 +02:00
|
|
|
index: 'claims',
|
|
|
|
type : 'claim',
|
|
|
|
id : claim.claimId,
|
|
|
|
body : claim,
|
2017-09-03 14:47:40 +02:00
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
function getJSON (path) {
|
|
|
|
return new Promise((resolve, reject) => {
|
|
|
|
jsonfile.readFile(path, function (err, jsoncontent) {
|
|
|
|
if (err) {
|
|
|
|
reject(err);
|
|
|
|
} else {
|
|
|
|
resolve(jsoncontent);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
function saveJSON (path, obj) {
|
|
|
|
return new Promise((resolve, reject) => {
|
|
|
|
jsonfile.writeFile(path, obj, function (err, jsoncontent) {
|
|
|
|
if (err) {
|
|
|
|
reject(err);
|
|
|
|
} else {
|
|
|
|
resolve();
|
|
|
|
}
|
2017-08-16 19:49:27 +02:00
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2017-08-17 12:06:24 +02:00
|
|
|
function sleep (ms) {
|
|
|
|
return new Promise(resolve => setTimeout(resolve, ms));
|
|
|
|
}
|