// Source: lighthouse.js/server/utils/chainquery/index.js (JavaScript, 235 lines, 7 KiB)
/*
 * Importer code: handles all syncing of claims from chainquery into elasticsearch.
 */
import elasticsearch from 'elasticsearch';
import ElasticQueue from 'elastic-queue';
import winston from 'winston';
import winstonStream from 'winston-stream';
import jsonfile from 'jsonfile';
import path from 'path';
import rp from 'request-promise';
import appRoot from 'app-root-path';
import fs from 'fs';
import fileExists from 'file-exists';
import * as util from './util';
2018-10-28 07:28:24 +01:00
import {logErrorToSlack} from '../../index';
2019-07-08 20:54:01 +02:00
import mysql from 'mysql';
2019-10-15 00:04:47 +02:00
import chainqueryConfig from '../../../chainquery-config.json';
2019-10-15 00:04:47 +02:00
// Cached chainquery MySQL connection; created on first use by getChainqueryConnection().
let connection = null;
// Log level handed to the elasticsearch client's logger.
const esLogLevel = 'info';
// Upper bound on the number of claims handled in a single sync pass.
const MaxClaimsToProcessPerIteration = 100000;
// Number of claims requested from chainquery per query.
const BatchSize = 5000;
// Stream that forwards the elasticsearch client's log output to winston.
const loggerStream = winstonStream(winston, esLogLevel);
const eclient = new elasticsearch.Client({
  host: 'http://localhost:9200',
  log : {
    level : esLogLevel,
    type  : 'stream',
    stream: loggerStream,
  },
});
// Queue in front of the elasticsearch client; pushElastic() and deleteFromElastic() push into it.
const queue = new ElasticQueue({elastic: eclient});
queue.on('drain', function () {
  console.log('elasticsearch queue is drained');
});
// Check that our syncState file exists; create an empty one if it does not,
// so that claimSync() always has a JSON file to read.
fileExists(path.join(appRoot.path, 'syncState.json'), (err, exists) => {
  if (err) {
    // Thrown from inside the callback on purpose: nothing catches it here.
    throw err;
  }
  if (!exists) {
    fs.writeFileSync(path.join(appRoot.path, 'syncState.json'), '{}');
  }
});
// Latest importer status; updated by claimSync() and returned by getStats().
let status = {info: 'startup successful'};
/**
 * Runs the chainquery -> elasticsearch claim sync forever.
 *
 * Each pass is followed by a 10 minute pause. A failing pass is reported to
 * Slack and recorded in `status.err`, after which the loop simply carries on.
 * The returned promise therefore never resolves under normal operation.
 *
 * @returns {Promise<void>}
 */
export async function claimSync () {
  const pauseBetweenPassesMs = 600000;
  // Loop instead of calling claimSync() recursively: the recursive form kept
  // one more pending async frame alive for every pass, without bound.
  for (;;) {
    try {
      await syncClaimsOnce();
    } catch (err) {
      await logErrorToSlack(err);
      status.err = err;
    }
    await sleep(pauseBetweenPassesMs);
  }
}

/**
 * Performs one sync pass: reads the persisted sync state, pulls claims from
 * chainquery in batches of `BatchSize`, routes each one to elasticsearch,
 * removes blocked claims and finally persists the updated sync state.
 *
 * @returns {Promise<void>}
 */
async function syncClaimsOnce () {
  const syncStatePath = path.join(appRoot.path, 'syncState.json');
  const syncState = await getJSON(syncStatePath); // get our persisted state
  if (!syncState.LastSyncTime) {
    syncState.LastSyncTime = '0001-01-01 00:00:00';
  }
  if (!syncState.LastID) {
    syncState.LastID = 0;
  }
  // Only stamp a new start time when beginning a fresh pass (not when resuming).
  if (!syncState.StartSyncTime || syncState.LastID === 0) {
    syncState.StartSyncTime = new Date().toISOString().slice(0, 19).replace('T', ' ');
  }
  status.info = 'gettingClaimsToUpdate';

  let lastID = syncState.LastID;
  let batchesDone = 0;
  let reachedEnd = false; // the last batch came back short: nothing left to fetch
  let hitLimit = false; // the per-pass claim budget is used up
  while (!reachedEnd && !hitLimit) {
    const claims = await getClaimsSince(syncState.LastSyncTime, lastID, BatchSize);
    status.info = 'addingClaimsToElastic';
    for (const claim of claims) {
      // Advance the cursor before any skip so an unprocessable claim is not
      // fetched again by the next batch.
      lastID = claim.id;
      routeClaimToElastic(claim);
    }
    winston.log('info', '[Importer] Pushed ' + claims.length + ' claims to elastic search [LastID]' + lastID);
    batchesDone++;
    reachedEnd = claims.length < BatchSize;
    hitLimit = batchesDone * BatchSize >= MaxClaimsToProcessPerIteration;
  }

  await deleteBlockedClaims();
  // If not finished, store last id to run again later where we left off, otherwise update last sync time.
  // Decided from the recorded loop outcome rather than by re-deriving it from
  // the (already incremented) batch counter.
  if (reachedEnd) {
    syncState.LastID = 0;
    syncState.LastSyncTime = syncState.StartSyncTime;
  } else {
    syncState.LastID = lastID;
  }
  await saveJSON(syncStatePath, syncState);
  status.info = 'upToDate';
  status.syncState = syncState;
}

/**
 * Prepares a single chainquery claim row and queues it for elasticsearch:
 * spent/expired claims are queued for removal, all others for indexing.
 * Claims whose value is missing or is not valid JSON are logged and skipped.
 *
 * @param {object} claim - Row as returned by getClaimsSince(); mutated in place.
 */
function routeClaimToElastic (claim) {
  if (claim.value === null) {
    console.log(claim);
    console.error('Failed to process claim ' + claim.claimId + ' due to missing value');
    return;
  }
  try {
    claim.value = JSON.parse(claim.value).Claim;
  } catch (err) {
    // One bad record must not abort the pass: the pass would restart from the
    // same position and fail on the same record every time.
    console.error('Failed to process claim ' + claim.claimId + ' due to invalid value: ' + err.message);
    return;
  }
  if (claim.name && claim.value) {
    claim.suggest_name = {
      input : '' + claim.name + '',
      weight: '30',
    };
  }
  if (claim.bid_state === 'Spent' || claim.bid_state === 'Expired') {
    deleteFromElastic(claim.claimId);
  } else {
    pushElastic(claim);
  }
}
/**
 * Exposes the importer's current status object.
 *
 * @returns {object} The live module-level `status` object (not a copy).
 */
export function getStats () {
  const currentStatus = status;
  return currentStatus;
}
/**
 * Fetches the list of blocked outpoints and queues the matching claim of each
 * one for removal from the search index.
 *
 * @returns {Promise<void>}
 */
async function deleteBlockedClaims () {
  winston.log('info', '[Importer] Removing blocked claims from search!');
  const rawResponse = await getBlockedOutpoints();
  // The response is a JSON string carrying the outpoints under data.outpoints.
  const { outpoints } = JSON.parse(rawResponse).data;
  for (const blockedOutpoint of outpoints) {
    deleteFromElastic(util.OutpointToClaimId(blockedOutpoint));
  }
  winston.log('info', '[Importer] Done processing blocked claims!');
}
2019-10-02 18:43:12 +02:00
/**
 * Queues a request for the given claim id carrying an empty body.
 * NOTE(review): whether the queue treats an empty body as a delete is not
 * visible in this file — confirm against elastic-queue.
 *
 * @param {string} claimid - Id of the claim to remove from the 'claims' index.
 */
function deleteFromElastic (claimid) {
  const emptyDocument = {};
  const request = {
    index: 'claims',
    type : 'claim',
    id   : claimid,
    body : emptyDocument,
  };
  queue.push(request);
}
2019-10-02 18:43:12 +02:00
/**
 * Queues a claim for indexing in the 'claims' index, keyed by its claim id.
 *
 * @param {object} claim - Claim document; `claim.claimId` becomes the document id.
 */
function pushElastic (claim) {
  const { claimId } = claim;
  const request = {
    index: 'claims',
    type : 'claim',
    id   : claimId,
    body : claim,
  };
  queue.push(request);
}
/**
 * Reads and parses a JSON file via jsonfile.
 *
 * @param {string} path - Location of the JSON file (shadows the `path` module inside this function).
 * @returns {Promise<*>} Resolves with the parsed file content; rejects with the
 *   read/parse error, which is also reported to Slack.
 */
function getJSON (path) {
  return new Promise((resolve, reject) => {
    jsonfile.readFile(path, (err, jsoncontent) => {
      if (err) {
        logErrorToSlack(err);
        reject(err);
        return;
      }
      resolve(jsoncontent);
    });
  });
}
2019-07-08 20:54:01 +02:00
/**
 * Serialises an object to a JSON file via jsonfile.
 *
 * @param {string} path - Destination file (shadows the `path` module inside this function).
 * @param {*} obj - Value to write.
 * @returns {Promise<void>} Resolves once written; rejects with the write
 *   error, which is also reported to Slack.
 */
function saveJSON (path, obj) {
  return new Promise((resolve, reject) => {
    jsonfile.writeFile(path, obj, (err) => {
      if (err) {
        logErrorToSlack(err);
        reject(err);
        return;
      }
      resolve();
    });
  });
}
/**
 * Promise-based delay.
 *
 * @param {number} ms - Time to wait, in milliseconds.
 * @returns {Promise<void>} Resolves (with no value) once the timer fires.
 */
function sleep (ms) {
  return new Promise(function (wake) {
    setTimeout(wake, ms);
  });
}
/**
 * Downloads the blocked-outpoint list from the LBRY API.
 *
 * @returns {Promise<string>} Resolves with the raw response body; rejects with
 *   the request error, which is also reported to Slack.
 */
async function getBlockedOutpoints () {
  // Await the request promise directly instead of wrapping it in a second Promise.
  try {
    return await rp('https://api.lbry.com/file/list_blocked');
  } catch (err) {
    logErrorToSlack('[Importer] Error getting blocked outpoints. ' + err);
    throw err;
  }
}
2019-10-15 00:04:47 +02:00
/**
 * Returns the shared chainquery MySQL connection, creating it on first use.
 *
 * The cached connection is dropped when connecting fails or when the
 * connection reports a fatal error, so the next call opens a fresh one
 * instead of handing out a connection that can no longer run queries.
 *
 * @returns {object} The mysql connection.
 */
function getChainqueryConnection () {
  if (connection === null) {
    const fresh = mysql.createConnection({
      host    : chainqueryConfig.host,
      user    : chainqueryConfig.user,
      password: chainqueryConfig.password,
      database: chainqueryConfig.db,
    });
    // Only clear the cache if it still points at this connection.
    const forget = () => {
      if (connection === fresh) {
        connection = null;
      }
    };
    fresh.on('error', (err) => {
      console.error(err);
      if (err && err.fatal) {
        forget();
      }
    });
    fresh.connect((err) => {
      if (err) {
        forget();
      }
    });
    connection = fresh;
  }
  return connection;
}
2019-07-08 20:54:01 +02:00
2019-10-15 00:04:47 +02:00
/**
 * Fetches the next batch of claims from chainquery, ordered by id.
 *
 * @param {string} time - Only claims with `modified_at >= time` are returned.
 * @param {number} lastID - Only claims with `id > lastID` are returned.
 * @param {number} MaxClaimsInCall - Maximum number of rows to return.
 * @returns {Promise<object[]>} Resolves with plain claim objects (id, name,
 *   channel, bid_state, effective_amount, certificate_amount, claimId, value);
 *   rejects with the query error, which is also logged and reported to Slack.
 */
function getClaimsSince (time, lastID, MaxClaimsInCall) {
  return new Promise((resolve, reject) => {
    // Values are bound through placeholders so they are escaped by the driver
    // rather than spliced into the SQL text.
    const query = mysql.format(
      'SELECT c.id, c.name,p.name as channel, p.claim_id as channel_id, c.bid_state,c.effective_amount,COALESCE(p.effective_amount,1) as certificate_amount,c.claim_id as claimId,c.value_as_json as value FROM claim c LEFT JOIN claim p on p.claim_id = c.publisher_id WHERE c.id >? AND c.modified_at >=? ORDER BY c.id LIMIT ?',
      [lastID, time, MaxClaimsInCall]
    );
    // Outputs full query to console for copy/paste into chainquery (debugging)
    console.log(query);
    getChainqueryConnection().query(query, (err, results) => {
      if (err) {
        console.error(err);
        logErrorToSlack('[Importer] Error getting updated claims. ' + err);
        return reject(err);
      }
      // NOTE(review): channel_id is selected by the query but not copied into
      // the result — confirm whether that is intentional.
      resolve(results.map((r) => ({
        id                : r.id,
        name              : r.name,
        channel           : r.channel,
        bid_state         : r.bid_state,
        effective_amount  : r.effective_amount,
        certificate_amount: r.certificate_amount,
        claimId           : r.claimId,
        value             : r.value,
      })));
    });
  });
}