2018-05-13 01:35:56 +02:00
|
|
|
/*
|
|
|
|
* Importer: syncs claims from chainquery into Elasticsearch.
|
|
|
|
*/
|
|
|
|
import elasticsearch from 'elasticsearch';
|
|
|
|
import ElasticQueue from 'elastic-queue';
|
|
|
|
import winston from 'winston';
|
|
|
|
import winstonStream from 'winston-stream';
|
|
|
|
import jsonfile from 'jsonfile';
|
|
|
|
import path from 'path';
|
|
|
|
import rp from 'request-promise';
|
|
|
|
import appRoot from 'app-root-path';
|
|
|
|
import fs from 'fs';
|
|
|
|
import fileExists from 'file-exists';
|
2019-09-22 00:47:23 +02:00
|
|
|
import * as util from './util';
|
2018-10-28 07:28:24 +01:00
|
|
|
import {logErrorToSlack} from '../../index';
|
2018-05-13 01:35:56 +02:00
|
|
|
|
2018-05-27 04:49:29 +02:00
|
|
|
// Log level for the Elasticsearch client's winston-backed log stream.
const elasticsearchloglevel = 'info';

// Cap on how many claims a single claimSync pass processes before it
// persists LastID and stops.
const MaxClaimsToProcessPerIteration = 100000;

// Number of claims requested from chainquery per query.
const BatchSize = 5000;

// Route Elasticsearch client logging through winston.
const loggerStream = winstonStream(winston, elasticsearchloglevel);

const elasticLogOptions = {
  level : elasticsearchloglevel,
  type  : 'stream',
  stream: loggerStream,
};

const eclient = new elasticsearch.Client({
  host: 'http://localhost:9200',
  log : elasticLogOptions,
});

// Shared queue through which all index/delete requests are sent.
const queue = new ElasticQueue({elastic: eclient});
|
|
|
|
|
2018-05-27 04:56:09 +02:00
|
|
|
// Check that our syncState file exist.
|
2018-05-13 01:35:56 +02:00
|
|
|
fileExists(path.join(appRoot.path, 'syncState.json'), (err, exists) => {
|
|
|
|
if (err) { throw err }
|
|
|
|
if (!exists) {
|
|
|
|
fs.writeFileSync(path.join(appRoot.path, 'syncState.json'), '{}');
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
2019-09-22 00:47:23 +02:00
|
|
|
// Live importer status returned by getStats(); claimSync mutates its fields
// (info, syncState, err) but the binding itself is never reassigned.
const status = {info: 'startup successful'};
|
2018-05-13 01:35:56 +02:00
|
|
|
|
|
|
|
/**
 * Syncs changed claims from chainquery into Elasticsearch, then schedules the
 * next run 10 minutes (600000 ms) later by calling itself again.
 *
 * An error thrown during a pass is stored on `status.err` and reported to
 * Slack. The next run is scheduled in every case, even if reporting to Slack
 * itself fails.
 *
 * @returns {Promise<void>} Resolves once the next run has been started (the
 *   next run is not awaited).
 */
export async function claimSync () {
  try {
    await runClaimSyncPass();
  } catch (err) {
    status.err = err;
    await reportToSlack(err);
  }
  await sleep(600000);
  // Not awaited: each run schedules its successor independently.
  claimSync();
}

/**
 * Performs one sync pass using the state persisted in `syncState.json`.
 *
 * State fields:
 *  - LastSyncTime: only claims with `modified_at >= LastSyncTime` are fetched
 *    (defaults to '0001-01-01 00:00:00').
 *  - LastID: claim id to resume after. It is non-zero only while a pass was
 *    cut short by MaxClaimsToProcessPerIteration.
 *  - StartSyncTime: time at which the current pass began (reset whenever
 *    LastID is 0). It becomes the new LastSyncTime once the pass has drained
 *    every changed claim.
 */
async function runClaimSyncPass () {
  const syncStatePath = path.join(appRoot.path, 'syncState.json');
  const syncState = await getJSON(syncStatePath); // get our persisted state
  if (!syncState.LastSyncTime) {
    syncState.LastSyncTime = '0001-01-01 00:00:00';
  }
  if (!syncState.LastID) {
    syncState.LastID = 0;
  }
  if (!syncState.StartSyncTime || syncState.LastID === 0) {
    // 'YYYY-MM-DD HH:MM:SS' (UTC), the same shape as the LastSyncTime default.
    syncState.StartSyncTime = new Date().toISOString().slice(0, 19).replace('T', ' ');
  }

  status.info = 'gettingClaimsToUpdate';
  let lastID = syncState.LastID;
  let processed = 0;
  // True while the most recent batch was full, i.e. more claims may be waiting.
  let moreRemaining = true;
  while (moreRemaining && processed < MaxClaimsToProcessPerIteration) {
    const claimsResponse = await getClaimsSince(syncState.LastSyncTime, lastID, BatchSize);
    const claims = JSON.parse(claimsResponse).data;
    if (!Array.isArray(claims)) {
      throw new Error('[Importer] Unexpected chainquery response: ' + String(claimsResponse).slice(0, 500));
    }
    status.info = 'addingClaimsToElastic';
    for (const claim of claims) {
      await indexClaim(claim);
      // Advance past every claim, including skipped ones, so a batch of
      // unprocessable claims cannot stall progress.
      lastID = claim.id;
    }
    winston.log('info', '[Importer] Pushed ' + claims.length + ' claims to elastic search [LastID]' + lastID);
    processed += claims.length;
    moreRemaining = claims.length >= BatchSize;
  }

  // Runs alongside the state update below; failures are reported instead of
  // becoming unhandled promise rejections.
  deleteBlockedClaims().catch((err) => reportToSlack('[Importer] Error removing blocked claims. ' + err));

  // If not finished, store last id to run again later where we left off, otherwise update last sync time.
  if (moreRemaining) {
    syncState.LastID = lastID;
  } else {
    syncState.LastID = 0;
    syncState.LastSyncTime = syncState.StartSyncTime;
  }

  await saveJSON(syncStatePath, syncState);
  status.info = 'upToDate';
  status.syncState = syncState;
}

/**
 * Prepares one chainquery claim row and hands it to Elasticsearch.
 *
 * - Claims whose bid_state is 'Spent' or 'Expired' go to deleteFromElastic.
 * - All other claims go to pushElastic.
 * - Rows with a null or unparsable `value` are reported to Slack and skipped.
 *
 * @param {Object} claim - Row from getClaimsSince. `claim.value` holds the raw
 *   JSON string and is replaced in place by its parsed `.Claim` member.
 */
async function indexClaim (claim) {
  if (claim.value === null) {
    console.log(claim);
    await reportToSlack('Failed to process claim ' + claim.claimId + ' due to missing value');
    return;
  }
  try {
    claim.value = JSON.parse(claim.value).Claim;
  } catch (err) {
    await reportToSlack('Failed to process claim ' + claim.claimId + ' due to unparsable value. ' + err);
    return;
  }
  if (claim.name && claim.value) {
    claim.suggest_name = {
      input : String(claim.name),
      weight: '30',
    };
  }
  // Not awaited: these helpers only hand the request to the shared queue.
  if (claim.bid_state === 'Spent' || claim.bid_state === 'Expired') {
    deleteFromElastic(claim.claimId);
  } else {
    pushElastic(claim);
  }
}

/**
 * Reports a message or error via logErrorToSlack.
 *
 * A failure to report is logged through winston instead of propagating, so it
 * cannot abort a pass or stop the sync from being rescheduled.
 *
 * @param {*} message - Error or text to report.
 */
async function reportToSlack (message) {
  try {
    await logErrorToSlack(message);
  } catch (slackErr) {
    winston.log('error', '[Importer] Failed to report to Slack: ' + slackErr);
  }
}
|
|
|
|
|
|
|
|
/**
 * Returns the importer's live status object. This is the same object that
 * claimSync mutates, not a copy.
 *
 * @returns {{info: string, syncState?: Object, err?: *}} Current status.
 */
export function getStats () {
  const currentStatus = status;
  return currentStatus;
}
|
|
|
|
|
2018-05-27 04:56:09 +02:00
|
|
|
/**
 * Removes blocked claims from search.
 *
 * Each outpoint returned by getBlockedOutpoints is converted to a claim id
 * via util.OutpointToClaimId and passed to deleteFromElastic.
 *
 * @returns {Promise<void>} Rejects if the blocked list cannot be fetched or
 *   parsed, or does not contain a `data.outpoints` array.
 */
async function deleteBlockedClaims () {
  winston.log('info', '[Importer] Removing blocked claims from search!');
  const blockedOutpointsResponse = await getBlockedOutpoints();
  const outpointList = JSON.parse(blockedOutpointsResponse);
  const outpoints = outpointList && outpointList.data && outpointList.data.outpoints;
  if (!Array.isArray(outpoints)) {
    // Fail with a descriptive error instead of an opaque TypeError.
    throw new Error('[Importer] Unexpected blocked outpoints response: ' + String(blockedOutpointsResponse).slice(0, 500));
  }
  for (const outpoint of outpoints) {
    // Not awaited: deleteFromElastic only hands the request to the queue.
    deleteFromElastic(util.OutpointToClaimId(outpoint));
  }
  winston.log('info', '[Importer] Done processing blocked claims!');
}
|
|
|
|
|
|
|
|
/**
 * Pushes an item with an empty `body` for document `claimid` (index
 * `claims`, type `claim`) onto the shared ElasticQueue.
 *
 * NOTE(review): whether elastic-queue turns this into a delete or into
 * indexing an empty document is not visible here; confirm against the
 * elastic-queue docs.
 *
 * @param {string} claimid - Claim id used as the Elasticsearch document id.
 * @returns {Promise<void>} Resolves once the item has been pushed onto the
 *   queue (the previous wrapper promise never resolved).
 */
async function deleteFromElastic (claimid) {
  queue.push({
    index: 'claims',
    type : 'claim',
    id   : claimid,
    body : {},
  });
}
|
|
|
|
|
2018-05-13 01:35:56 +02:00
|
|
|
/**
 * Pushes `claim` onto the shared ElasticQueue as the document body for id
 * `claim.claimId` in index `claims`, type `claim`.
 *
 * @param {Object} claim - Prepared claim document; must carry `claimId`.
 * @returns {Promise<void>} Resolves once the item has been pushed onto the
 *   queue (the previous wrapper promise never resolved).
 */
async function pushElastic (claim) {
  queue.push({
    index: 'claims',
    type : 'claim',
    id   : claim.claimId,
    body : claim,
  });
}
|
|
|
|
|
|
|
|
/**
 * Reads and parses a JSON file with jsonfile.
 *
 * @param {string} filePath - File to read.
 * @returns {Promise<*>} Parsed contents. On failure the error is reported to
 *   Slack and the promise rejects with it.
 */
function getJSON (filePath) {
  return new Promise(function (resolve, reject) {
    jsonfile.readFile(filePath, (readErr, parsed) => {
      if (!readErr) {
        resolve(parsed);
        return;
      }
      logErrorToSlack(readErr);
      reject(readErr);
    });
  });
}
|
|
|
|
/**
 * Serializes `obj` to a JSON file with jsonfile.
 *
 * @param {string} filePath - Destination file.
 * @param {*} obj - Value to write.
 * @returns {Promise<void>} Resolves when written. On failure the error is
 *   reported to Slack and the promise rejects with it.
 */
function saveJSON (filePath, obj) {
  return new Promise(function (resolve, reject) {
    jsonfile.writeFile(filePath, obj, (writeErr) => {
      if (!writeErr) {
        resolve();
        return;
      }
      logErrorToSlack(writeErr);
      reject(writeErr);
    });
  });
}
|
|
|
|
|
|
|
|
/**
 * Waits for `ms` milliseconds.
 *
 * @param {number} ms - Delay in milliseconds.
 * @returns {Promise<void>} Resolves with undefined once the timer fires.
 */
function sleep (ms) {
  return new Promise(function (wake) {
    setTimeout(wake, ms);
  });
}
|
|
|
|
|
|
|
|
/**
 * Fetches the blocked-outpoint list from the LBRY API.
 *
 * @returns {Promise<string>} Raw response body (parsed as JSON by the
 *   caller). On request failure the error is reported to Slack and the
 *   promise rejects with it.
 */
async function getBlockedOutpoints () {
  try {
    return await rp(`https://api.lbry.com/file/list_blocked`);
  } catch (err) {
    logErrorToSlack('[Importer] Error getting blocked outpoints. ' + err);
    throw err;
  }
}
|
|
|
|
|
2019-02-24 22:57:56 +01:00
|
|
|
/**
 * Queries chainquery for claims with `id > lastID` and
 * `modified_at >= time`, ordered by id and limited to `MaxClaimsInCall`
 * rows.
 *
 * Each row carries the claim's publisher (channel) name and claim id. It also
 * carries the publisher's effective amount as `certificate_amount`, which is
 * COALESCEd to 1 when there is no publisher.
 *
 * @param {string} time - Lower bound on `modified_at`, e.g. '0001-01-01 00:00:00'.
 * @param {number} lastID - Only claims with a strictly greater `id` are returned.
 * @param {number} MaxClaimsInCall - SQL LIMIT for this call.
 * @returns {Promise<string>} Raw chainquery response body (JSON text whose
 *   `data` field the caller iterates). On request failure the error is
 *   reported to Slack and the promise rejects with it.
 */
async function getClaimsSince (time, lastID, MaxClaimsInCall) {
  const query = `` +
    `SELECT ` +
    `c.id, ` +
    `c.name,` +
    `p.name as channel,` +
    `p.claim_id as channel_id,` +
    `c.bid_state,` +
    `c.effective_amount,` +
    `COALESCE(p.effective_amount,1) as certificate_amount,` +
    `c.claim_id as claimId,` +
    `c.value_as_json as value ` +
    `FROM claim c ` +
    `LEFT JOIN claim p on p.claim_id = c.publisher_id ` +
    `WHERE c.id >${lastID} ` +
    `AND c.modified_at >='${time}' ` +
    `ORDER BY c.id ` +
    `LIMIT ${MaxClaimsInCall}`;
  // Outputs full query to console for copy/paste into chainquery (debugging)
  console.log(query);
  try {
    // Percent-encode the SQL so spaces, quotes, '>' and any '+', '&' or '#'
    // arrive intact as a single `query` parameter.
    return await rp(`https://chainquery.lbry.com/api/sql?query=` + encodeURIComponent(query));
  } catch (err) {
    logErrorToSlack('[Importer] Error getting updated claims. ' + err);
    throw err;
  }
}
|