Add capability to restart the sync where it left off by storing the sync state. Cap each run at 100,000 claims so larger backlogs resume on the next pass.

Signed-off-by: Mark Beamer Jr <markbeamerjr@gmail.com>
Mark Beamer Jr 2019-09-21 17:25:18 -04:00
parent c67f298ee6
commit 162107239b


@@ -15,6 +15,8 @@ import * as util from '../../utils/importer/util';
 import {logErrorToSlack} from '../../index';
 const elasticsearchloglevel = 'info';
+const MaxClaimsToProcessPerIteration = 100000;
+const BatchSize = 5000;
 const loggerStream = winstonStream(winston, elasticsearchloglevel);
 const eclient = new elasticsearch.Client({
   host: 'http://localhost:9200',
@@ -43,12 +45,15 @@ export async function claimSync () {
   if (!syncState.LastSyncTime) {
     syncState.LastSyncTime = '0001-01-01 00:00:00';
   }
+  if (!syncState.LastID) {
+    syncState.LastID = 0;
+  }
   status.info = 'gettingClaimsToUpdate';
   let finished = false;
-  let lastID = 0;
-  let groupSize = 5000;
+  let lastID = syncState.LastID;
+  let iteration = 0;
   while (!finished) {
-    let claimsResponse = await getClaimsSince(syncState.LastSyncTime, lastID, groupSize);
+    let claimsResponse = await getClaimsSince(syncState.LastSyncTime, lastID, BatchSize);
     let claims = JSON.parse(claimsResponse).data;
     status.info = 'addingClaimsToElastic';
     for (let claim of claims) {
@@ -72,11 +77,16 @@ export async function claimSync () {
       lastID = claim.id;
     }
     winston.log('info', '[Importer] Pushed ' + claims.length + ' claims to elastic search [LastID]' + lastID);
-    finished = claims.length < groupSize;
+    finished = claims.length < BatchSize || (iteration * BatchSize + BatchSize >= MaxClaimsToProcessPerIteration);
+    iteration++;
   }
   deleteBlockedClaims();
+  // If not finished, store last id to run again later where we left off, otherwise update last sync time.
+  if (iteration * BatchSize + BatchSize >= MaxClaimsToProcessPerIteration) {
+    syncState.LastID = lastID;
+  } else {
   syncState.LastSyncTime = new Date().toISOString().slice(0, 19).replace('T', ' ');
+  }
   await saveJSON(path.join(appRoot.path, 'syncState.json'), syncState);
   status.info = 'upToDate';
   await sleep(600000);
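
For reference, a self-contained sketch of the resume flow this diff implements: load syncState.json (falling back to defaults), page through claims in batches of BatchSize until a short batch or the per-run cap MaxClaimsToProcessPerIteration is hit, then persist either LastID (to resume on the next run) or a fresh LastSyncTime. The fetchClaimsBatch and pushClaimsToElastic parameters are hypothetical stand-ins for the importer's getClaimsSince call and its Elasticsearch bulk indexing, and the sketch assumes batches come back ordered by id.

// Standalone sketch of the resume logic added in this commit. fetchClaimsBatch and
// pushClaimsToElastic are placeholders, not real functions from the repo.
const fs = require('fs');
const path = require('path');

const MaxClaimsToProcessPerIteration = 100000;
const BatchSize = 5000;
const statePath = path.join(__dirname, 'syncState.json');

function loadSyncState () {
  // A missing or unreadable state file simply means "start from the beginning".
  try {
    return JSON.parse(fs.readFileSync(statePath, 'utf8'));
  } catch (err) {
    return {};
  }
}

async function syncOnce (fetchClaimsBatch, pushClaimsToElastic) {
  const syncState = loadSyncState();
  if (!syncState.LastSyncTime) { syncState.LastSyncTime = '0001-01-01 00:00:00'; }
  if (!syncState.LastID) { syncState.LastID = 0; }

  let lastID = syncState.LastID;
  let iteration = 0;
  let finished = false;
  while (!finished) {
    // Pull the next batch of claims changed since LastSyncTime, starting after lastID.
    const claims = await fetchClaimsBatch(syncState.LastSyncTime, lastID, BatchSize);
    await pushClaimsToElastic(claims);
    if (claims.length > 0) {
      lastID = claims[claims.length - 1].id;
    }
    // Stop on a short batch (caught up) or when the per-run cap is reached.
    finished = claims.length < BatchSize || (iteration * BatchSize + BatchSize >= MaxClaimsToProcessPerIteration);
    iteration++;
  }

  if (iteration * BatchSize + BatchSize >= MaxClaimsToProcessPerIteration) {
    // Cap reached: remember where we stopped so the next run picks up at LastID.
    syncState.LastID = lastID;
  } else {
    // Backlog drained: advance the timestamp used for the next incremental sync.
    syncState.LastSyncTime = new Date().toISOString().slice(0, 19).replace('T', ' ');
  }
  fs.writeFileSync(statePath, JSON.stringify(syncState, null, 2));
}

module.exports = { syncOnce };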