2018-05-13 01:35:56 +02:00
/*
 * Importer code: handles all the syncing from chainquery into Elasticsearch.
 */
import elasticsearch from 'elasticsearch' ;
import ElasticQueue from 'elastic-queue' ;
import winston from 'winston' ;
import winstonStream from 'winston-stream' ;
import jsonfile from 'jsonfile' ;
import path from 'path' ;
import rp from 'request-promise' ;
import appRoot from 'app-root-path' ;
import fs from 'fs' ;
import fileExists from 'file-exists' ;
2019-09-22 00:47:23 +02:00
import * as util from './util' ;
2018-10-28 07:28:24 +01:00
import { logErrorToSlack } from '../../index' ;
2019-07-08 20:54:01 +02:00
import mysql from 'mysql' ;
2018-05-13 01:35:56 +02:00
2018-05-27 04:49:29 +02:00
// Log level shared by the Elasticsearch client and the winston stream it logs to.
const elasticsearchloglevel = 'info';
// Once this many claims have been requested in one sync pass, the pass stops
// and resumes from the stored id on the next run.
const MaxClaimsToProcessPerIteration = 100000;
// Maximum number of claims requested from chainquery per query.
const BatchSize = 5000;
// Stream that forwards Elasticsearch client logging into winston.
const loggerStream = winstonStream(winston, elasticsearchloglevel);
// Elasticsearch client for the local node.
const eclient = new elasticsearch.Client({
  host: 'http://localhost:9200',
  log: {
    level: elasticsearchloglevel,
    type: 'stream',
    stream: loggerStream,
  },
});

// Queue through which all documents are pushed to Elasticsearch.
const queue = new ElasticQueue({ elastic: eclient });
queue.on('drain', () => {
  console.log('elasticsearch queue is drained');
});
2018-05-13 01:35:56 +02:00
2018-05-27 04:56:09 +02:00
// Make sure the syncState file exists before anything tries to read it.
// The check is synchronous on purpose: an asynchronous check could still be
// pending when claimSync() first reads the file, which made the first sync
// pass of a fresh install fail because the file was not there yet.
if (!fs.existsSync(path.join(appRoot.path, 'syncState.json'))) {
  fs.writeFileSync(path.join(appRoot.path, 'syncState.json'), '{}');
}
2019-09-22 00:47:23 +02:00
// Importer status shared across this module: claimSync() updates `info`,
// `syncState` and `err` on it, and getStats() returns this same object.
let status = { info : 'startup successful' } ;
2018-05-13 01:35:56 +02:00
/**
 * Continuously mirrors chainquery claims into Elasticsearch.
 *
 * Runs one sync pass, waits ten minutes, and repeats for as long as the
 * process lives. A failed pass is reported to Slack and stored on
 * `status.err`; the next pass still runs after the wait.
 *
 * @returns {Promise<void>} Does not resolve during normal operation. Rejects
 *   only if reporting a failed pass to Slack itself fails.
 */
export async function claimSync() {
  // Loop instead of calling claimSync() recursively: the recursive form kept
  // the promise of every earlier invocation pending for the process lifetime.
  for (;;) {
    try {
      await runSyncPass();
    } catch (err) {
      await logErrorToSlack(err);
      status.err = err;
    }
    await sleep(600000);
  }
}

/**
 * Runs a single sync pass: loads the persisted state, queues changed claims
 * for Elasticsearch in batches, removes blocked claims and saves the state.
 *
 * @returns {Promise<void>} Resolves once the new state has been saved.
 */
async function runSyncPass() {
  const syncState = await getJSON(path.join(appRoot.path, 'syncState.json')); // get our persisted state
  if (!syncState.LastSyncTime) {
    syncState.LastSyncTime = '0001-01-01 00:00:00';
  }
  if (!syncState.LastID) {
    syncState.LastID = 0;
  }
  // No resume point means a new sync window starts now.
  if (!syncState.StartSyncTime || syncState.LastID === 0) {
    syncState.StartSyncTime = new Date().toISOString().slice(0, 19).replace('T', ' ');
  }
  status.info = 'gettingClaimsToUpdate';

  let lastID = syncState.LastID;
  let batchesProcessed = 0;
  // Track why the loop stops. Re-deriving this from the batch counter after
  // the loop misclassified a pass that caught up on the batch before the limit.
  let caughtUp = false;
  let reachedLimit = false;
  while (!caughtUp && !reachedLimit) {
    const claims = await getClaimsSince(syncState.LastSyncTime, lastID, BatchSize);
    status.info = 'addingClaimsToElastic';
    for (const claim of claims) {
      // Advance the cursor before any skip so skipped claims are not fetched again.
      lastID = claim.id;
      if (claim.value === null) {
        console.log(claim);
        // await logErrorToSlack('Failed to process claim ' + claim.claimId + ' due to missing value');
        console.error('Failed to process claim ' + claim.claimId + ' due to missing value');
        continue;
      }
      claim.value = JSON.parse(claim.value).Claim;
      if (claim.name && claim.value) {
        claim.suggest_name = {
          input: '' + claim.name + '',
          weight: '30',
        };
      }
      if (claim.bid_state === 'Spent' || claim.bid_state === 'Expired') {
        deleteFromElastic(claim.claimId);
      } else {
        pushElastic(claim);
      }
    }
    winston.log('info', '[Importer] Pushed ' + claims.length + ' claims to elastic search [LastID]' + lastID);
    batchesProcessed++;
    // A short batch means chainquery has nothing more for this window.
    caughtUp = claims.length < BatchSize;
    reachedLimit = batchesProcessed * BatchSize >= MaxClaimsToProcessPerIteration;
  }

  await deleteBlockedClaims();

  // If not finished, store last id to run again later where we left off,
  // otherwise update last sync time.
  if (caughtUp) {
    syncState.LastID = 0;
    syncState.LastSyncTime = syncState.StartSyncTime;
  } else {
    syncState.LastID = lastID;
  }
  await saveJSON(path.join(appRoot.path, 'syncState.json'), syncState);
  status.info = 'upToDate';
  status.syncState = syncState;
}
/**
 * Exposes the importer's current status.
 *
 * @returns {Object} The module's live `status` object (not a copy).
 */
export function getStats() {
  return status;
}
2018-05-27 04:56:09 +02:00
/**
 * Fetches the list of blocked outpoints and queues the removal of each
 * corresponding claim from the search index.
 *
 * @returns {Promise<void>} Resolves once every blocked claim has been queued.
 */
async function deleteBlockedClaims() {
  winston.log('info', '[Importer] Removing blocked claims from search!');
  const rawResponse = await getBlockedOutpoints();
  const blocked = JSON.parse(rawResponse);
  for (const blockedOutpoint of blocked.data.outpoints) {
    deleteFromElastic(util.OutpointToClaimId(blockedOutpoint));
  }
  winston.log('info', '[Importer] Done processing blocked claims!');
}
2019-10-02 18:43:12 +02:00
/**
 * Queues an entry with an empty body for the given claim id.
 *
 * @param {string} claimid - Id of the claim to remove from search.
 */
function deleteFromElastic(claimid) {
  const emptyEntry = {
    index: 'claims',
    type: 'claim',
    id: claimid,
    body: {},
  };
  queue.push(emptyEntry);
}
2019-10-02 18:43:12 +02:00
/**
 * Queues a claim for indexing, keyed by its claim id.
 *
 * @param {Object} claim - Claim record; `claim.claimId` is used as the document id.
 */
function pushElastic(claim) {
  const entry = {
    index: 'claims',
    type: 'claim',
    id: claim.claimId,
    body: claim,
  };
  queue.push(entry);
}
/**
 * Reads and parses a JSON file.
 *
 * @param {string} path - Location of the JSON file.
 * @returns {Promise<*>} Resolves with the parsed content. On a read or parse
 *   error the error is sent to Slack and the promise rejects with it.
 */
function getJSON(path) {
  return new Promise((resolve, reject) => {
    jsonfile.readFile(path, (readError, parsed) => {
      if (!readError) {
        resolve(parsed);
        return;
      }
      logErrorToSlack(readError);
      reject(readError);
    });
  });
}
2019-07-08 20:54:01 +02:00
2018-05-13 01:35:56 +02:00
/**
 * Serialises an object to a JSON file.
 *
 * @param {string} path - Destination file.
 * @param {*} obj - Value to write.
 * @returns {Promise<void>} Resolves when the write completes. On failure the
 *   error is sent to Slack and the promise rejects with it.
 */
function saveJSON(path, obj) {
  return new Promise((resolve, reject) => {
    jsonfile.writeFile(path, obj, (writeError) => {
      if (!writeError) {
        resolve();
        return;
      }
      logErrorToSlack(writeError);
      reject(writeError);
    });
  });
}
/**
 * Waits for the given amount of time.
 *
 * @param {number} ms - Delay in milliseconds.
 * @returns {Promise<void>} Resolves once the delay has elapsed.
 */
function sleep(ms) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
/**
 * Downloads the list of blocked outpoints from the LBRY API.
 *
 * @returns {Promise<string>} Resolves with the response body as returned by
 *   the request. On failure the error is sent to Slack and the promise
 *   rejects with it.
 */
async function getBlockedOutpoints() {
  try {
    const responseBody = await rp(`https://api.lbry.com/file/list_blocked`);
    return responseBody;
  } catch (err) {
    logErrorToSlack('[Importer] Error getting blocked outpoints. ' + err);
    throw err;
  }
}
2019-07-08 20:54:01 +02:00
// MySQL connection to chainquery; null until getClaimsSince() creates it.
let connection = null ;
// Connection settings (host, user, password, db) for the chainquery database.
const chainqueryConfig = require ( '../../../chainquery-config.json' ) ;
2019-02-24 22:57:56 +01:00
/**
 * Fetches the next batch of claims from chainquery, ordered by row id.
 *
 * The MySQL connection is created on first use and reused afterwards. If a
 * query fails with a fatal connection error the connection is discarded, so
 * the next call opens a new one.
 *
 * @param {string} time - Only claims with `modified_at` at or after this
 *   timestamp are returned.
 * @param {number} lastID - Only claims whose row id is greater than this are returned.
 * @param {number} MaxClaimsInCall - Maximum number of claims to return.
 * @returns {Promise<Array<Object>>} Resolves with the claims (id, name, channel,
 *   bid_state, effective_amount, certificate_amount, claimId, value). On a
 *   query error the error is logged, sent to Slack, and the promise rejects with it.
 */
function getClaimsSince(time, lastID, MaxClaimsInCall) {
  if (connection === null) {
    connection = mysql.createConnection({
      host: chainqueryConfig.host,
      user: chainqueryConfig.user,
      password: chainqueryConfig.password,
      database: chainqueryConfig.db,
    });
    connection.connect();
  }
  return new Promise((resolve, reject) => {
    // Values are escaped through placeholders rather than spliced into the SQL
    // text. mysql.format still yields the complete statement for logging.
    const query = mysql.format(
      'SELECT c.id, c.name,p.name as channel, p.claim_id as channel_id, c.bid_state,c.effective_amount,' +
        'COALESCE(p.effective_amount,1) as certificate_amount,c.claim_id as claimId,c.value_as_json as value ' +
        'FROM claim c LEFT JOIN claim p on p.claim_id = c.publisher_id ' +
        'WHERE c.id > ? AND c.modified_at >= ? ORDER BY c.id LIMIT ?',
      [lastID, time, MaxClaimsInCall]
    );
    // Outputs full query to console for copy/paste into chainquery (debugging)
    console.log(query);
    connection.query(query, (err, results) => {
      if (err) {
        console.error(err);
        logErrorToSlack('[Importer] Error getting updated claims. ' + err);
        if (err.fatal) {
          // A fatal error leaves the connection unusable; drop it so the next
          // call reconnects instead of failing on the dead connection forever.
          connection = null;
        }
        return reject(err);
      }
      const claims = results.map((row) => ({
        id: row.id,
        name: row.name,
        channel: row.channel,
        bid_state: row.bid_state,
        effective_amount: row.effective_amount,
        certificate_amount: row.certificate_amount,
        claimId: row.claimId,
        value: row.value,
      }));
      resolve(claims);
    });
  });
}