/*
 *  Importer code, handles all the syncing with chainquery into elasticsearch.
 */
import elasticsearch from 'elasticsearch';
import ElasticQueue from 'elastic-queue';
import winston from 'winston';
import winstonStream from 'winston-stream';
import jsonfile from 'jsonfile';
import path from 'path';
import rp from 'request-promise';
import appRoot from 'app-root-path';
import fs from 'fs';
import fileExists from 'file-exists';
import * as util from './util';
import {logErrorToSlack} from '../../index';
import mysql from 'mysql';
import chainqueryConfig from '../../../chainquery-config.json';

let connection = null;

const esLogLevel = 'info';
const MaxClaimsToProcessPerIteration = 100000;
const BatchSize = 5000;
const loggerStream = winstonStream(winston, esLogLevel);
const eclient = new elasticsearch.Client({
  host: 'http://localhost:9200',

  log: {
    level : esLogLevel,
    type  : 'stream',
    stream: loggerStream,
  },
});

const queue = new ElasticQueue({elastic: eclient});
queue.on('drain', function () {
  console.log('elasticsearch queue is drained');
});

// Check that our syncState file exist.
fileExists(path.join(appRoot.path, 'syncState.json'), (err, exists) => {
  if (err) {
    throw err;
  }
  if (!exists) {
    fs.writeFileSync(path.join(appRoot.path, 'syncState.json'), '{}');
  }
});

let status = {info: 'startup successful'};

export async function claimSync () {
  try {
    let syncState = await getJSON(path.join(appRoot.path, 'syncState.json')); // get our persisted state
    if (!syncState.LastSyncTime) {
      syncState.LastSyncTime = '0001-01-01 00:00:00';
    }
    if (!syncState.LastID) {
      syncState.LastID = 0;
    }
    if (!syncState.StartSyncTime || syncState.LastID === 0) {
      syncState.StartSyncTime = new Date().toISOString().slice(0, 19).replace('T', ' ');
    }
    status.info = 'gettingClaimsToUpdate';
    let finished = false;
    let lastID = syncState.LastID;
    let iteration = 0;
    while (!finished) {
      let claims = await getClaimsSince(syncState.LastSyncTime, lastID, BatchSize);
      status.info = 'addingClaimsToElastic';
      for (let claim of claims) {
        if (claim.value === null) {
          console.log(claim);
          // await logErrorToSlack('Failed to process claim ' + claim.claimId + ' due to missing value');
          console.error('Failed to process claim ' + claim.claimId + ' due to missing value');
          continue;
        }
        claim.value = claim.value.Claim;
        if (claim.name && claim.value) {
          claim.suggest_name = {
            input : '' + claim.name + '',
            weight: '30',
          };
        }
        if (claim.bid_state === 'Spent' || claim.bid_state === 'Expired') {
          deleteFromElastic(claim.claimId);
        } else {
          pushElastic(claim);
        }
        lastID = claim.id;
      }
      winston.log('info', '[Importer] Pushed ' + claims.length + ' claims to elastic search [LastID]' + lastID);
      finished = claims.length < BatchSize || (iteration * BatchSize + BatchSize >= MaxClaimsToProcessPerIteration);
      iteration++;
    }
    await deleteBlockedClaims();
    // If not finished, store last id to run again later where we left off, otherwise update last sync time.
    if (iteration * BatchSize + BatchSize >= MaxClaimsToProcessPerIteration) {
      syncState.LastID = lastID;
    } else {
      syncState.LastID = 0;
      syncState.LastSyncTime = syncState.StartSyncTime;
    }
    await saveJSON(path.join(appRoot.path, 'syncState.json'), syncState);
    status.info = 'upToDate';
    status.syncState = syncState;
    await sleep(600000);
    await claimSync();
  } catch (err) {
    await logErrorToSlack(err);
    status.err = err;
    await sleep(600000);
    await claimSync();
  }
}

export function getStats () {
  return status;
}

async function deleteBlockedClaims () {
  winston.log('info', '[Importer] Removing blocked claims from search!');
  let blockedOutputsResponse = await getBlockedOutpoints();
  let outpointlist = JSON.parse(blockedOutputsResponse);
  for (let outpoint of outpointlist.data.outpoints) {
    let claimid = util.OutpointToClaimId(outpoint);
    deleteFromElastic(claimid);
  }
  winston.log('info', '[Importer] Done processing blocked claims!');
}

function deleteFromElastic (claimid) {
  queue.push({
    index: 'claims',
    type : 'claim',
    id   : claimid,
    body : {},
  });
}

function pushElastic (claim) {
  queue.push({
    index: 'claims',
    type : 'claim',
    id   : claim.claimId,
    body : claim,
  });
}

function getJSON (path) {
  return new Promise((resolve, reject) => {
    jsonfile.readFile(path, function (err, jsoncontent) {
      if (err) {
        logErrorToSlack(err);
        reject(err);
      } else {
        resolve(jsoncontent);
      }
    });
  });
}

function saveJSON (path, obj) {
  return new Promise((resolve, reject) => {
    jsonfile.writeFile(path, obj, function (err, jsoncontent) {
      if (err) {
        logErrorToSlack(err);
        reject(err);
      } else {
        resolve();
      }
    });
  });
}

function sleep (ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

function getBlockedOutpoints () {
  return new Promise((resolve, reject) => {
    rp(`https://api.lbry.com/file/list_blocked`)
      .then(function (htmlString) {
        resolve(htmlString);
      })
      .catch(function (err) {
        logErrorToSlack('[Importer] Error getting blocked outpoints. ' + err);
        reject(err);
      });
  });
}

function getChainqueryConnection () {
  if (connection === null) {
    connection = mysql.createConnection({
      host    : chainqueryConfig.host,
      user    : chainqueryConfig.user,
      password: chainqueryConfig.password,
      database: chainqueryConfig.db,
    });
    connection.connect();
  }
  return connection;
}

function getClaimsSince (time, lastID, MaxClaimsInCall) {
  return new Promise((resolve, reject) => {
    let query = `SELECT c.id,
     c.name,
     p.name as channel,
     p.claim_id as channel_id,
     c.bid_state,
     c.effective_amount,
     COALESCE(p.effective_amount,1) as certificate_amount,
     c.claim_id as claimId,
     c.value_as_json as value
     FROM claim c LEFT JOIN claim p
     on p.claim_id = c.publisher_id
     WHERE c.id >${lastID} AND
     c.modified_at >='${time}'
     ORDER BY c.id LIMIT ${MaxClaimsInCall}`;
    // Outputs full query to console for copy/paste into chainquery (debugging)
    console.log(query);
    getChainqueryConnection().query(query, function (err, results, fields) {
      if (err) {
        console.error(err);
        logErrorToSlack('[Importer] Error getting updated claims. ' + err);
        return reject(err);
      }
      let claims = [];
      for (let i = 0; i < results.length; i++) {
        let r = results[i];
        let value = null;
        try {
          value = JSON.parse(r.value);
        } catch (e) {
          console.error(e);
          console.error(r.value);
        }
        claims.push({
          id                : r.id,
          name              : r.name,
          channel           : r.channel,
          channel_claim_id  : r.channel_id,
          bid_state         : r.bid_state,
          effective_amount  : r.effective_amount,
          certificate_amount: r.certificate_amount,
          claimId           : r.claimId,
          value             : value,
        });
      }
      resolve(claims);
    });
  });
}