Added synctoolv2, claimTrie resolving instead of full chain!

Added synctoolv2, claimTrie resolving instead of full chain and some small patches!
This commit is contained in:
Fillerino 2017-09-03 14:47:40 +02:00
parent 1d9e54458f
commit 5c929f343c
6 changed files with 143 additions and 113 deletions
server/utils/importer

View file

@ -1,60 +0,0 @@
'use strict';
const Promise = require('bluebird');
const rp = require('request-promise');
let client;
async function getClaims (height, gclient) {
return new Promise(async (resolve, reject) => {
try {
client = gclient;
let blockHash = await client.getBlockHash(height).then(blockHash => { return blockHash }).catch(err => reject(err));
let block = await client.getBlock(blockHash).then(block => { return block }).catch(err => reject(err));
let claims = await getClaimsForTxes(block.tx, height); // should return an array of claims, decoded if possible.
resolve(claims);
} catch (err) {
return reject(err);
}
});
}
async function getClaimsForTxes (txes, height) {
return new Promise(async (resolve, reject) => {
try {
let claimsArr = [];
for (let tx of txes) {
let claimsTx = await client.getClaimsForTx(tx).then(claims => { return claims }).catch(err => reject(err));
if (claimsTx != null) {
for (let claim of claimsTx) {
claim['height'] = height;
let dClaim = await getValue(tx, claim['nOut']);
if (dClaim !== 'error when decoding claims' && claim['value']) {
claim['value'] = JSON.parse(dClaim);
claimsArr.push(claim);
} else {
claim['value'] = { error: 'non_decodable' };
claimsArr.push(claim);
}
}
}
}
resolve(claimsArr);
} catch (err) {
return reject(err);
}
});
}
async function getValue (tx, i) {
return new Promise(async (resolve, reject) => {
rp(`http://localhost:5000/claim_decode/${tx}/${i}`)
.then(function (htmlString) {
resolve(htmlString);
})
.catch(function (err) {
reject(err);
});
});
}
module.exports = exports = getClaims;

View file

@ -1,13 +1,17 @@
/*
* Importer code, handles all the syncing with the blockchain into elasticsearch.
*/
const bitcoin = require('bitcoin-promise');
const elasticsearch = require('elasticsearch');
const winston = require('winston');
const winstonStream = require('winston-stream');
import bitcoin from 'bitcoin-promise';
import elasticsearch from 'elasticsearch';
import ElasticQueue from 'elastic-queue';
import winston from 'winston';
import winstonStream from 'winston-stream';
import jsonfile from 'jsonfile';
import path from 'path';
import rp from 'request-promise';
import appRoot from 'app-root-path';
const loggerStream = winstonStream(winston, 'info');
const eclient = new elasticsearch.Client({
host: 'http://elastic:changeme@localhost:9200',
log : {
@ -16,7 +20,7 @@ const eclient = new elasticsearch.Client({
stream: loggerStream,
},
});
const queue = new ElasticQueue({elastic: eclient});
const client = new bitcoin.Client({
host : 'localhost',
port : 9245,
@ -24,26 +28,64 @@ const client = new bitcoin.Client({
pass : 'lbry',
timeout: 30000,
});
let claimsSynced = 0;
let status = {};
export async function sync (currentHeight) {
export async function sync () {
try {
let maxHeight = await client.getBlockCount().then(blockHash => { return blockHash }).catch(err => { throw err });
if (currentHeight <= maxHeight) {
let claims = await require('./getClaims')(currentHeight, client);
send(claims);
claimsSynced += claims.length;
// currentHeight / maxHeight / claimsSynced
status.message = `Running,${currentHeight} / ${maxHeight} done, ${claimsSynced} claims imported.`;
sync(currentHeight + 1);
} else {
await sleep(2000);
status.message = `All claims imported, waiting for new blocks at ${maxHeight}`;
sync(currentHeight); // eslint-disable-line no-unreachable
status.info = 'Grabbing the claimTrie...';
let claimTrie = await client.getClaimsInTrie().then(claimtrie => { return claimtrie }).catch(err => { throw err });
let txList = [];
let latestClaimTrie = [];
for (let i in claimTrie) {
for (let o in claimTrie[i].claims) {
txList.push({
txid : claimTrie[i].claims[o].txid,
nOut : claimTrie[i].claims[o].n,
claimId: claimTrie[i].claims[o].claimId,
});
latestClaimTrie.push(claimTrie[i].claims[o].claimId);
}
}
let oldClaimTrie = await getJSON(path.join(appRoot.path, 'claimTrieCache.json')); // get our old claimTrieCache....
let added = await getAddedClaims(oldClaimTrie, latestClaimTrie); // get all new that should be added
let removed = await getRemovedClaims(oldClaimTrie, latestClaimTrie); // get all old that should be removed
status.info = 'Adding/Removing Claims, please wait...';
for (let claimId of added) { // for all new get their tx info and add to database
let tx = txList.find(x => x.claimId === claimId);
if (typeof tx !== 'undefined') {
let value = await getValue(tx.txid, tx.nOut);
if ((value !== 'error when decoding claims')) {
value = JSON.parse(value);
if (value['is controlling'] && value['name'] !== '') {
if (value.name && value.value) {
value.suggest_name = {
input : value.name,
weight: 30,
};
}
pushElastic(value);
}
}
}
}
for (let claimId of removed) { // Call elastic and remove claim by id if it exists.
eclient.delete({
index: 'claims',
type : 'claim',
id : claimId,
}, function (error, response) {
if (error) {
winston.log(error);
}
});
}
// Done adding, update our claimTrie cache to latest and wait a bit...
await saveJSON(path.join(appRoot.path, 'claimTrieCache.json'), latestClaimTrie);
status.info = 'Done updating the claimTrieCache, waiting 5 minutes before doing a recheck..';
await sleep(300000);
sync();
} catch (err) {
console.log(err);
winston.log(err);
status.err = err;
}
}
@ -52,31 +94,64 @@ export function getStats () {
return status;
}
function send (arr) { // Modular change output here :)
arr.forEach(function (claim) {
claim['id'] = claim['claimId'];
// Check if our value is a object, else make it a object...
claim['value'] = (typeof claim.value === 'object' ? claim.value : JSON.parse(claim.value));
// claim['value'] = JSON.stringify(claim['value']);
if (claim.name && claim.value) {
claim.suggest_name = {
input : claim.name,
weight: 30,
};
if (claim.value.claimType === 'streamType' && claim.value.stream.metadata && claim.value.stream.metadata.description) {
claim.suggest_desc = {
input : claim.value.stream.metadata.description.split(' '),
weight: 10,
};
}
}
eclient.create({
function getAddedClaims (oldClaimTrie, newClaimTrie) {
return new Promise((resolve, reject) => {
let a = new Set(oldClaimTrie);
let b = new Set(newClaimTrie);
resolve(new Set([...b].filter(x => !a.has(x))));
});
}
function getRemovedClaims (oldClaimTrie, newClaimTrie) {
return new Promise((resolve, reject) => {
let a = new Set(oldClaimTrie);
let b = new Set(newClaimTrie);
resolve(new Set([...a].filter(x => !b.has(x))));
});
}
function getValue (tx, i) {
return new Promise((resolve, reject) => {
rp(`http://localhost:5000/claim_decode/${tx}/${i}`)
.then(function (htmlString) {
resolve(htmlString);
})
.catch(function (err) {
reject(err);
});
});
}
async function pushElastic (claim) {
return new Promise(async(resolve, reject) => {
queue.push({
index: 'claims',
type : 'claim',
id : claim.claimId,
body : claim,
}, function (error, response) {
if (error) { status.err = error; console.log(error) }
});
});
}
function getJSON (path) {
return new Promise((resolve, reject) => {
jsonfile.readFile(path, function (err, jsoncontent) {
if (err) {
reject(err);
} else {
resolve(jsoncontent);
}
});
});
}
function saveJSON (path, obj) {
return new Promise((resolve, reject) => {
jsonfile.writeFile(path, obj, function (err, jsoncontent) {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}