Merge pull request #72 from lbryio/chainquery

Chainquery Integration
This commit is contained in:
filipnyquist 2018-05-14 19:41:02 +02:00 committed by GitHub
commit 98095cdb64
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 179 additions and 3 deletions

1
.gitignore vendored
View file

@ -5,3 +5,4 @@ npm-debug.log
*.retry
.vscode
claimTrieCache.json
syncState.json

View file

@ -1,10 +1,11 @@
import 'babel-polyfill';
import winston from 'winston';
import winstonStream from 'winston-stream';
import { sync, getStats } from '../utils/importer';
import elasticsearch from 'elasticsearch';
import rp from 'request-promise';
import pretty from 'prettysize';
import {claimSync} from '../utils/chainquery';
import {getStats} from '../utils/importer';
const loggerStream = winstonStream(winston, 'info');
@ -233,7 +234,8 @@ class LighthouseControllers {
// Start syncing blocks...
startSync () {
winston.log('info', '[Importer] Started importer, indexing claims.');
sync();
claimSync();
// sync(); // Old Sync
}
/**
* Search API Endpoint.

View file

@ -0,0 +1,173 @@
/*
* Importer code, handles all the syncing with chainquery into elasticsearch.
*/
import elasticsearch from 'elasticsearch';
import ElasticQueue from 'elastic-queue';
import winston from 'winston';
import winstonStream from 'winston-stream';
import jsonfile from 'jsonfile';
import path from 'path';
import rp from 'request-promise';
import appRoot from 'app-root-path';
import fs from 'fs';
import fileExists from 'file-exists';
const loggerStream = winstonStream(winston, 'info');
const eclient = new elasticsearch.Client({
host: 'http://localhost:9200',
log: {
level : 'info',
type : 'stream',
stream: loggerStream,
},
});
const queue = new ElasticQueue({elastic: eclient});
// Check that our cache file exist.
fileExists(path.join(appRoot.path, 'syncState.json'), (err, exists) => {
if (err) { throw err }
if (!exists) {
fs.writeFileSync(path.join(appRoot.path, 'syncState.json'), '{}');
}
});
let status = {};
export async function claimSync () {
try {
let syncState = await getJSON(path.join(appRoot.path, 'syncState.json')); // get our persisted state
if (!syncState.LastSyncTime) {
syncState.LastSyncTime = '0001-01-01 00:00:00';
}
status.info = 'gettingClaimsToUpdate';
let claimsResponse = await getClaimsSince(syncState.LastSyncTime);
let claims = JSON.parse(claimsResponse).data;
status.info = 'addingClaimsToElastic';
for (let claim of claims) {
claim.value = JSON.parse(claim.value).Claim;
// console.log('NEW: ' + JSON.stringify(claim));
// console.log('--------------------------------------------');
// var imprtr = require('../importer');
// let oldvalue = await imprtr.getValue(claim.transaction_by_hash_id, claim.vout);
// oldvalue = JSON.parse(oldvalue);
// console.log('OLD: ' + JSON.stringify(oldvalue));
// console.log('--------------------------------------------');
if (claim.name && claim.value) {
claim.suggest_name = {
input : claim.name,
weight: 30,
};
}
console.log('Push claim to elastic search: ' + claim);
pushElastic(claim);
}
winston.log('info', '[Importer] Removing blocked claims from search!');
let util = require('./util.js');
let blockedOutputsResponse = await getBlockedOutpoints();
let outpointlist = JSON.parse(blockedOutputsResponse);
for (let outpoint of outpointlist.data.outpoints) {
let claimid = util.OutpointToClaimId(outpoint);
console.log('Deleting ClaimId: ' + claimid);
eclient.delete({
index: 'claims',
type : 'claim',
id : claimid,
}, function (error, response) {
if (error) {
winston.log(error, response);
}
});
}
syncState.LastSyncTime = new Date().toISOString().slice(0, 19).replace('T', ' ');
await saveJSON(path.join(appRoot.path, 'syncState.json'), syncState);
status.info = 'upToDate';
await sleep(600000);
claimSync();
} catch (err) {
winston.log(err);
status.err = err;
await sleep(600000);
claimSync();
}
}
export function getStats () {
return status;
}
async function pushElastic (claim) {
console.log('_id:' + claim.claimId);
return new Promise(async(resolve, reject) => {
queue.push({
index: 'claims',
type : 'claim',
id : claim.claimId,
body : claim,
});
});
}
function getJSON (path) {
return new Promise((resolve, reject) => {
jsonfile.readFile(path, function (err, jsoncontent) {
if (err) {
reject(err);
} else {
resolve(jsoncontent);
}
});
});
}
function saveJSON (path, obj) {
return new Promise((resolve, reject) => {
jsonfile.writeFile(path, obj, function (err, jsoncontent) {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
function sleep (ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
function getBlockedOutpoints () {
return new Promise((resolve, reject) => {
rp(`http://api.lbry.io/file/list_blocked`)
.then(function (htmlString) {
resolve(htmlString);
})
.catch(function (err) {
reject(err);
});
});
}
function getClaimsSince (time) {
return new Promise((resolve, reject) => {
let query = `` +
`SELECT ` +
`name, ` +
`value_as_json as value, ` +
`bid_state, ` +
`effective_amount, ` +
`claim_id as claimId, ` +
// `transaction_by_hash_id, ` + // txhash and vout needed to leverage old format for comparison.
// `vout ` +
`FROM claim ` +
`WHERE modified >='` + time + `'`;
// Outputs full query to console for copy/paste into chainquery (debugging)
// console.log(query);
rp(`https://chainquery.lbry.io/api/sql?query=` + query)
.then(function (htmlString) {
resolve(htmlString);
})
.catch(function (err) {
reject(err);
});
});
}

View file

@ -184,7 +184,7 @@ function getBlockedOutpoints () {
});
}
function getValue (tx, i) {
export function getValue (tx, i) {
return new Promise((resolve, reject) => {
rp(`http://localhost:5000/claim_decode/${tx}/${i}`)
.then(function (htmlString) {