Batched the chainquery calls for lighthouse in case it needs to grab all 700K claims in a redo.
parent b04f2df1e1
commit 81986315cb
1 changed file with 29 additions and 20 deletions
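In outline, the change replaces a single unbounded chainquery request with a keyset-paginated loop: each batch asks for at most groupSize rows whose id is greater than the last id already processed, and the loop stops once a batch comes back short. A minimal sketch of that pattern, using a hypothetical fetchBatch(lastID, limit) helper in place of the project's getClaimsSince:

// Sketch only (assumed helper names, not the committed code).
// fetchBatch(lastID, limit) stands in for getClaimsSince(time, lastID, groupSize): it is
// expected to return up to `limit` claim rows with id > lastID, each carrying an `id` field.
async function syncInBatches (fetchBatch, handleClaim) {
  let finished = false;
  let lastID = 0;
  const groupSize = 5000;                 // same batch size the commit uses
  while (!finished) {
    const claims = await fetchBatch(lastID, groupSize);
    for (const claim of claims) {
      handleClaim(claim);                 // index or delete, as the diff below does per claim
      lastID = claim.id;                  // advance the cursor to the last id processed
    }
    finished = claims.length < groupSize; // a short batch means there is nothing left to fetch
  }
}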
@@ -44,24 +44,32 @@ export async function claimSync () {
       syncState.LastSyncTime = '0001-01-01 00:00:00';
     }
     status.info = 'gettingClaimsToUpdate';
-    let claimsResponse = await getClaimsSince(syncState.LastSyncTime);
-    let claims = JSON.parse(claimsResponse).data;
-    status.info = 'addingClaimsToElastic';
-    for (let claim of claims) {
-      claim.value = JSON.parse(claim.value).Claim;
-      if (claim.name && claim.value) {
-        claim.suggest_name = {
-          input : '' + claim.name + '',
-          weight: '30',
-        };
-      }
-      if (claim.bid_state === 'Spent' || claim.bid_state === 'Expired') {
-        deleteFromElastic(claim.claimId);
-      } else {
-        pushElastic(claim);
-      }
-    }
-    winston.log('info', '[Importer] Pushed ' + claims.length + ' claims to elastic search');
+    let finished = false;
+    let lastID = 0;
+    let groupSize = 5000;
+    while (!finished) {
+      let claimsResponse = await getClaimsSince(syncState.LastSyncTime, lastID, groupSize);
+      let claims = JSON.parse(claimsResponse).data;
+      status.info = 'addingClaimsToElastic';
+      for (let claim of claims) {
+        claim.value = JSON.parse(claim.value).Claim;
+        if (claim.name && claim.value) {
+          claim.suggest_name = {
+            input : '' + claim.name + '',
+            weight: '30',
+          };
+        }
+        if (claim.bid_state === 'Spent' || claim.bid_state === 'Expired') {
+          deleteFromElastic(claim.claimId);
+        } else {
+          pushElastic(claim);
+        }
+        lastID = claim.id;
+      }
+      winston.log('info', '[Importer] Pushed ' + claims.length + ' claims to elastic search [LastID]' + lastID);
+      finished = claims.length < groupSize;
+    }
     deleteBlockedClaims();
     syncState.LastSyncTime = new Date().toISOString().slice(0, 19).replace('T', ' ');
     await saveJSON(path.join(appRoot.path, 'syncState.json'), syncState);
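As a note on the design choice: the loop pages by the claim table's id column (c.id > lastID with a LIMIT) rather than by OFFSET, so each batch resumes from the last id it saw instead of re-scanning earlier rows; if the final batch happens to contain exactly groupSize rows, one extra request returns an empty batch and the loop then exits. LastSyncTime is still only advanced and persisted after the whole sweep completes, so an interrupted run repeats from the previous sync time.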
@@ -155,21 +163,22 @@ function getBlockedOutpoints () {
   });
 }
 
-function getClaimsSince (time) {
+function getClaimsSince (time, lastID, MaxClaimsInCall) {
   return new Promise((resolve, reject) => {
     let query = `` +
       `SELECT ` +
       `c.id, ` +
       `c.name,` +
       `p.name as channel,` +
       `c.bid_state,` +
       `c.effective_amount,` +
       `c.claim_id as claimId,` +
       `c.value_as_json as value ` +
       // `,transaction_by_hash_id, ` + // txhash and vout needed to leverage old format for comparison.
       // `vout ` +
       `FROM claim c ` +
       `LEFT JOIN claim p on p.claim_id = c.publisher_id ` +
-      `WHERE c.modified_at >='` + time + `'`;
+      `WHERE c.id >` + lastID + ` ` +
+      `AND c.modified_at >='` + time + `' ` +
+      `LIMIT ` + MaxClaimsInCall;
     // Outputs full query to console for copy/paste into chainquery (debugging)
     // console.log(query);
     rp(`https://chainquery.lbry.io/api/sql?query=` + query)
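For illustration only (not part of the commit), a hypothetical first-batch call such as getClaimsSince('0001-01-01 00:00:00', 0, 5000) concatenates SQL along these lines; the string below adds line breaks and comma spacing for readability:

// Hypothetical reconstruction of the query built above for
// getClaimsSince('0001-01-01 00:00:00', 0, 5000); formatting normalized for readability.
const exampleQuery =
  "SELECT c.id, c.name, p.name as channel, c.bid_state, c.effective_amount, " +
  "c.claim_id as claimId, c.value_as_json as value " +
  "FROM claim c " +
  "LEFT JOIN claim p on p.claim_id = c.publisher_id " +
  "WHERE c.id > 0 AND c.modified_at >= '0001-01-01 00:00:00' " +
  "LIMIT 5000";

// If the raw query ever needs to be made URL-safe before being appended to
// https://chainquery.lbry.io/api/sql?query=, encodeURIComponent(exampleQuery) is the standard tool.
console.log(exampleQuery);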