Compare commits

..

1 commit

Author SHA1 Message Date
Mark Beamer Jr
e7c97e8420
Strip @ prefix subquery
Signed-off-by: Mark Beamer Jr <markbeamerjr@gmail.com>
2019-09-22 01:56:36 -04:00
10 changed files with 143 additions and 166 deletions

1
.gitignore vendored
View file

@ -9,4 +9,3 @@ npm-debug.log
claimTrieCache.json claimTrieCache.json
syncState.json syncState.json
yarn-error.log yarn-error.log
chainquery-config.json

View file

@ -1,6 +1,6 @@
The MIT License (MIT) The MIT License (MIT)
Copyright (c) 2017-2020 LBRY Inc Copyright (c) 2017-2018 LBRY Inc
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish,distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish,distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

View file

@ -1,6 +0,0 @@
{
"host": "chainquery.lbry.com",
"user": "lighthouse",
"password": "",
"db": "chainquery"
}

67
decoder/decoder.py Normal file
View file

@ -0,0 +1,67 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
import json, os
from bitcoinrpc.authproxy import AuthServiceProxy
from lbryschema.decode import smart_decode
from flask import Flask, url_for
app = Flask(__name__)
def get_lbrycrdd_connection_details(wallet_conf):
settings = {"username": "lbry",
"password": "lbry",
"rpc_port": 9245}
if wallet_conf and os.path.exists(wallet_conf):
with open(wallet_conf, "r") as conf:
conf_lines = conf.readlines()
for l in conf_lines:
if l.startswith("rpcuser="):
settings["username"] = l[8:].rstrip('\n')
if l.startswith("rpcpassword="):
settings["password"] = l[12:].rstrip('\n')
if l.startswith("rpcport="):
settings["rpc_port"] = int(l[8:].rstrip('\n'))
rpc_user = settings["username"]
rpc_pass = settings["password"]
rpc_port = settings["rpc_port"]
rpc_url = "127.0.0.1"
return "http://%s:%s@%s:%i" % (rpc_user, rpc_pass, rpc_url, rpc_port)
@app.errorhandler(500)
def internal_error(error):
return 'error when decoding claims'
@app.route('/claim_decode/<txid>/<nout>')
def api_decode(txid, nout):
connection_string = get_lbrycrdd_connection_details(os.path.expanduser("~")+"/.lbrycrd/lbrycrd.conf")
rpc = AuthServiceProxy(connection_string)
result = rpc.getclaimsfortx(txid)
claim = None
for claim_out in result:
if claim_out['nOut'] == int(nout):
claim = claim_out
break
if claim:
converted = ''.join([chr(ord(i)) for i in claim['value']])
decoded = smart_decode(converted)
claim['value'] = decoded.claim_dict
return json.dumps(claim)
@app.route('/claim_decodeinv/<claimid>')
def api_decodebyclaim(claimid):
connection_string = get_lbrycrdd_connection_details(os.path.expanduser("~")+"/.lbrycrd/lbrycrd.conf")
rpc = AuthServiceProxy(connection_string)
claim = rpc.getvalueforname(claimid)
if claim:
converted = ''.join([chr(ord(i)) for i in claim['value']])
decoded = smart_decode(converted)
claim['value'] = decoded.claim_dict
return json.dumps(claim)
if __name__ == '__main__':
app.run(host='127.0.0.1')

3
decoder/requirements.txt Normal file
View file

@ -0,0 +1,3 @@
git+https://github.com/lbryio/lbryschema.git#egg=lbryschema
python-bitcoinrpc==0.1
flask

34
package-lock.json generated
View file

@ -2814,11 +2814,6 @@
"resolved": "https://registry.npmjs.org/big.js/-/big.js-5.2.2.tgz", "resolved": "https://registry.npmjs.org/big.js/-/big.js-5.2.2.tgz",
"integrity": "sha512-vyL2OymJxmarO8gxMr0mhChsO9QGwhynfuu4+MHTAW6czfq9humCB7rKpUjDd9YUiDPU4mzpyupFSvOClAwbmQ==" "integrity": "sha512-vyL2OymJxmarO8gxMr0mhChsO9QGwhynfuu4+MHTAW6czfq9humCB7rKpUjDd9YUiDPU4mzpyupFSvOClAwbmQ=="
}, },
"bignumber.js": {
"version": "7.2.1",
"resolved": "https://registry.npmjs.org/bignumber.js/-/bignumber.js-7.2.1.tgz",
"integrity": "sha512-S4XzBk5sMB+Rcb/LNcpzXr57VRTxgAvaAEDAl1AwRx27j00hT84O6OkteE7u8UB3NuaaygCRrEpqox4uDOrbdQ=="
},
"binary-extensions": { "binary-extensions": {
"version": "2.0.0", "version": "2.0.0",
"resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.0.0.tgz", "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.0.0.tgz",
@ -5336,9 +5331,9 @@
"dev": true "dev": true
}, },
"handlebars": { "handlebars": {
"version": "4.5.3", "version": "4.1.2",
"resolved": "https://registry.npmjs.org/handlebars/-/handlebars-4.5.3.tgz", "resolved": "https://registry.npmjs.org/handlebars/-/handlebars-4.1.2.tgz",
"integrity": "sha512-3yPecJoJHK/4c6aZhSvxOyG4vJKDshV36VHp0iVCDVh7o9w2vwi3NSnL2MMPj3YdduqaBcu7cGbggJQM0br9xA==", "integrity": "sha512-nvfrjqvt9xQ8Z/w0ijewdD/vvWDTOweBUm96NTr66Wfvo1mJenBLwcYmPs3TIBP5ruzYGD7Hx/DaM9RmhroGPw==",
"requires": { "requires": {
"neo-async": "^2.6.0", "neo-async": "^2.6.0",
"optimist": "^0.6.1", "optimist": "^0.6.1",
@ -7127,24 +7122,6 @@
"resolved": "https://registry.npmjs.org/mute-stream/-/mute-stream-0.0.6.tgz", "resolved": "https://registry.npmjs.org/mute-stream/-/mute-stream-0.0.6.tgz",
"integrity": "sha1-SJYrGeFp/R38JAs/HnMXYnu8R9s=" "integrity": "sha1-SJYrGeFp/R38JAs/HnMXYnu8R9s="
}, },
"mysql": {
"version": "2.17.1",
"resolved": "https://registry.npmjs.org/mysql/-/mysql-2.17.1.tgz",
"integrity": "sha512-7vMqHQ673SAk5C8fOzTG2LpPcf3bNt0oL3sFpxPEEFp1mdlDcrLK0On7z8ZYKaaHrHwNcQ/MTUz7/oobZ2OyyA==",
"requires": {
"bignumber.js": "7.2.1",
"readable-stream": "2.3.6",
"safe-buffer": "5.1.2",
"sqlstring": "2.3.1"
},
"dependencies": {
"safe-buffer": {
"version": "5.1.2",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
"integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g=="
}
}
},
"nan": { "nan": {
"version": "2.14.0", "version": "2.14.0",
"resolved": "https://registry.npmjs.org/nan/-/nan-2.14.0.tgz", "resolved": "https://registry.npmjs.org/nan/-/nan-2.14.0.tgz",
@ -11007,11 +10984,6 @@
"resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.0.3.tgz", "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.0.3.tgz",
"integrity": "sha1-BOaSb2YolTVPPdAVIDYzuFcpfiw=" "integrity": "sha1-BOaSb2YolTVPPdAVIDYzuFcpfiw="
}, },
"sqlstring": {
"version": "2.3.1",
"resolved": "https://registry.npmjs.org/sqlstring/-/sqlstring-2.3.1.tgz",
"integrity": "sha1-R1OT/56RR5rqYtyvDKPRSYOn+0A="
},
"sshpk": { "sshpk": {
"version": "1.16.1", "version": "1.16.1",
"resolved": "https://registry.npmjs.org/sshpk/-/sshpk-1.16.1.tgz", "resolved": "https://registry.npmjs.org/sshpk/-/sshpk-1.16.1.tgz",

View file

@ -56,7 +56,6 @@
"koa-logger": "^2.0.0", "koa-logger": "^2.0.0",
"koa-router": "^7.0.0", "koa-router": "^7.0.0",
"limited-request-queue": "^3.0.4", "limited-request-queue": "^3.0.4",
"mysql": "^2.17.1",
"node-slack": "^0.0.7", "node-slack": "^0.0.7",
"oas": "^0.8.15", "oas": "^0.8.15",
"ora": "^1.3.0", "ora": "^1.3.0",

View file

@ -40,38 +40,20 @@ function getResults (input) {
let escapedQuery = getEscapedQuery(trimmedQuery); let escapedQuery = getEscapedQuery(trimmedQuery);
let washedQuery = getEscapedQuery(getWashedQuery(trimmedQuery)); let washedQuery = getEscapedQuery(getWashedQuery(trimmedQuery));
let effectiveFactor = '0.00000000001'; let effectiveFactor = '0.00000000001';
const dynamicFilters = () => { // Search is split up into different parts, all search parts goes under this line.
let queries = []; let channelSearch;
// Search is split up into different parts, all search parts goes under this line. if (input.channel !== undefined) { // If we got a channel argument, lets filter out only that channel
if (input.channel_id !== undefined) { channelSearch = {
const channelidSearch = { // If we got a channel_id argument, lets filter out only that channel_id 'bool': {
'bool': { 'must': {
'must': { 'query_string': {
'query_string': { 'fields': ['channel'],
'fields': ['channel_claim_id'], 'query' : getEscapedQuery(input.channel.trim()),
'query' : getEscapedQuery(input.channel_id.trim()),
},
}, },
}, },
}; },
queries.push(channelidSearch); };
} }
if (input.channel !== undefined) { // If we got a channel argument, lets filter out only that channel
const channelSearch = {
'bool': {
'must': {
'query_string': {
'fields': ['channel'],
'query' : getEscapedQuery(input.channel.trim()),
},
},
},
};
queries.push(channelSearch);
}
return queries;
};
const conBoost = { // Controlling claims should get higher placement in search results. const conBoost = { // Controlling claims should get higher placement in search results.
'match': { 'match': {
'bid_state': { 'bid_state': {
@ -93,7 +75,7 @@ function getResults (input) {
'function_score': { 'function_score': {
'field_value_factor': { 'field_value_factor': {
'field' : 'certificate_amount', 'field' : 'certificate_amount',
'factor' : effectiveFactor / 10, 'factor' : effectiveFactor,
'missing': 1, 'missing': 1,
}, },
}, },
@ -282,7 +264,7 @@ function getResults (input) {
channelIdentifier, channelIdentifier,
], ],
'must': [ 'must': [
...dynamicFilters(), channelSearch,
{ {
'bool': { 'bool': {
'should': [ 'should': [
@ -305,7 +287,7 @@ function getResults (input) {
}, },
}, },
}; };
// console.log('QUERY: ', JSON.stringify(esQuery)); // console.log('QUERY: ', esQuery);
return eclient.search(esQuery); return eclient.search(esQuery);
} }

View file

@ -6,14 +6,14 @@ import helmet from 'koa-helmet';
import routing from './routes/'; import routing from './routes/';
import { port } from './config'; import { port } from './config';
import winston from 'winston'; import winston from 'winston';
import Slack from 'node-slack'; import slack from 'node-slack';
require('winston-daily-rotate-file'); require('winston-daily-rotate-file');
// Setup logging // Setup logging
winston.remove(winston.transports.Console); winston.remove(winston.transports.Console);
winston.add(winston.transports.Console, { colorize: true, timestamp: true, prettyPrint: true }); winston.add(winston.transports.Console, { colorize: true, timestamp: true, prettyPrint: true });
const slackAPIKey = process.env.SLACK_HOOK_URL; var slackAPIKey = process.env.SLACK_HOOK_URL;
const mySlack = new Slack(slackAPIKey, {}); var mySlack = new slack(slackAPIKey, {});
// Create Koa Application // Create Koa Application
const app = new Koa(); const app = new Koa();

View file

@ -13,35 +13,25 @@ import fs from 'fs';
import fileExists from 'file-exists'; import fileExists from 'file-exists';
import * as util from './util'; import * as util from './util';
import {logErrorToSlack} from '../../index'; import {logErrorToSlack} from '../../index';
import mysql from 'mysql';
import chainqueryConfig from '../../../chainquery-config.json';
let connection = null; const elasticsearchloglevel = 'info';
const esLogLevel = 'info';
const MaxClaimsToProcessPerIteration = 100000; const MaxClaimsToProcessPerIteration = 100000;
const BatchSize = 5000; const BatchSize = 5000;
const loggerStream = winstonStream(winston, esLogLevel); const loggerStream = winstonStream(winston, elasticsearchloglevel);
const eclient = new elasticsearch.Client({ const eclient = new elasticsearch.Client({
host: 'http://localhost:9200', host: 'http://localhost:9200',
log: { log: {
level : esLogLevel, level : elasticsearchloglevel,
type : 'stream', type : 'stream',
stream: loggerStream, stream: loggerStream,
}, },
}); });
const queue = new ElasticQueue({elastic: eclient}); const queue = new ElasticQueue({elastic: eclient});
queue.on('drain', function () {
console.log('elasticsearch queue is drained');
});
// Check that our syncState file exist. // Check that our syncState file exist.
fileExists(path.join(appRoot.path, 'syncState.json'), (err, exists) => { fileExists(path.join(appRoot.path, 'syncState.json'), (err, exists) => {
if (err) { if (err) { throw err }
throw err;
}
if (!exists) { if (!exists) {
fs.writeFileSync(path.join(appRoot.path, 'syncState.json'), '{}'); fs.writeFileSync(path.join(appRoot.path, 'syncState.json'), '{}');
} }
@ -66,16 +56,16 @@ export async function claimSync () {
let lastID = syncState.LastID; let lastID = syncState.LastID;
let iteration = 0; let iteration = 0;
while (!finished) { while (!finished) {
let claims = await getClaimsSince(syncState.LastSyncTime, lastID, BatchSize); let claimsResponse = await getClaimsSince(syncState.LastSyncTime, lastID, BatchSize);
let claims = JSON.parse(claimsResponse).data;
status.info = 'addingClaimsToElastic'; status.info = 'addingClaimsToElastic';
for (let claim of claims) { for (let claim of claims) {
if (claim.value === null) { if (claim.value === null) {
console.log(claim); console.log(claim);
// await logErrorToSlack('Failed to process claim ' + claim.claimId + ' due to missing value'); await logErrorToSlack('Failed to process claim ' + claim.claimId + ' due to missing value');
console.error('Failed to process claim ' + claim.claimId + ' due to missing value');
continue; continue;
} }
claim.value = claim.value.Claim; claim.value = JSON.parse(claim.value).Claim;
if (claim.name && claim.value) { if (claim.name && claim.value) {
claim.suggest_name = { claim.suggest_name = {
input : '' + claim.name + '', input : '' + claim.name + '',
@ -93,7 +83,7 @@ export async function claimSync () {
finished = claims.length < BatchSize || (iteration * BatchSize + BatchSize >= MaxClaimsToProcessPerIteration); finished = claims.length < BatchSize || (iteration * BatchSize + BatchSize >= MaxClaimsToProcessPerIteration);
iteration++; iteration++;
} }
await deleteBlockedClaims(); deleteBlockedClaims();
// If not finished, store last id to run again later where we left off, otherwise update last sync time. // If not finished, store last id to run again later where we left off, otherwise update last sync time.
if (iteration * BatchSize + BatchSize >= MaxClaimsToProcessPerIteration) { if (iteration * BatchSize + BatchSize >= MaxClaimsToProcessPerIteration) {
syncState.LastID = lastID; syncState.LastID = lastID;
@ -105,12 +95,12 @@ export async function claimSync () {
status.info = 'upToDate'; status.info = 'upToDate';
status.syncState = syncState; status.syncState = syncState;
await sleep(600000); await sleep(600000);
await claimSync(); claimSync();
} catch (err) { } catch (err) {
await logErrorToSlack(err); await logErrorToSlack(err);
status.err = err; status.err = err;
await sleep(600000); await sleep(600000);
await claimSync(); claimSync();
} }
} }
@ -129,21 +119,25 @@ async function deleteBlockedClaims () {
winston.log('info', '[Importer] Done processing blocked claims!'); winston.log('info', '[Importer] Done processing blocked claims!');
} }
function deleteFromElastic (claimid) { async function deleteFromElastic (claimid) {
queue.push({ return new Promise(async (resolve, reject) => {
index: 'claims', queue.push({
type : 'claim', index: 'claims',
id : claimid, type : 'claim',
body : {}, id : claimid,
body : {},
});
}); });
} }
function pushElastic (claim) { async function pushElastic (claim) {
queue.push({ return new Promise(async (resolve, reject) => {
index: 'claims', queue.push({
type : 'claim', index: 'claims',
id : claim.claimId, type : 'claim',
body : claim, id : claim.claimId,
body : claim,
});
}); });
} }
@ -159,7 +153,6 @@ function getJSON (path) {
}); });
}); });
} }
function saveJSON (path, obj) { function saveJSON (path, obj) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
jsonfile.writeFile(path, obj, function (err, jsoncontent) { jsonfile.writeFile(path, obj, function (err, jsoncontent) {
@ -190,66 +183,34 @@ function getBlockedOutpoints () {
}); });
} }
function getChainqueryConnection () {
if (connection === null) {
connection = mysql.createConnection({
host : chainqueryConfig.host,
user : chainqueryConfig.user,
password: chainqueryConfig.password,
database: chainqueryConfig.db,
});
connection.connect();
}
return connection;
}
function getClaimsSince (time, lastID, MaxClaimsInCall) { function getClaimsSince (time, lastID, MaxClaimsInCall) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let query = `SELECT c.id, let query = `` +
c.name, `SELECT ` +
p.name as channel, `c.id, ` +
p.claim_id as channel_id, `c.name,` +
c.bid_state, `p.name as channel,` +
c.effective_amount, `p.claim_id as channel_id,` +
COALESCE(p.effective_amount,1) as certificate_amount, `c.bid_state,` +
c.claim_id as claimId, `c.effective_amount,` +
c.value_as_json as value `COALESCE(p.effective_amount,1) as certificate_amount,` +
FROM claim c LEFT JOIN claim p `c.claim_id as claimId,` +
on p.claim_id = c.publisher_id `c.value_as_json as value ` +
WHERE c.id >${lastID} AND `FROM claim c ` +
c.modified_at >='${time}' `LEFT JOIN claim p on p.claim_id = c.publisher_id ` +
ORDER BY c.id LIMIT ${MaxClaimsInCall}`; `WHERE c.id >` + lastID + ` ` +
`AND c.modified_at >='` + time + `' ` +
`ORDER BY c.id ` +
`LIMIT ` + MaxClaimsInCall;
// Outputs full query to console for copy/paste into chainquery (debugging) // Outputs full query to console for copy/paste into chainquery (debugging)
console.log(query); console.log(query);
getChainqueryConnection().query(query, function (err, results, fields) { rp(`https://chainquery.lbry.com/api/sql?query=` + query)
if (err) { .then(function (htmlString) {
console.error(err); resolve(htmlString);
})
.catch(function (err) {
logErrorToSlack('[Importer] Error getting updated claims. ' + err); logErrorToSlack('[Importer] Error getting updated claims. ' + err);
return reject(err); reject(err);
} });
let claims = [];
for (let i = 0; i < results.length; i++) {
let r = results[i];
let value = null;
try {
value = JSON.parse(r.value);
} catch (e) {
console.error(e);
console.error(r.value);
}
claims.push({
id : r.id,
name : r.name,
channel : r.channel,
channel_claim_id : r.channel_id,
bid_state : r.bid_state,
effective_amount : r.effective_amount,
certificate_amount: r.certificate_amount,
claimId : r.claimId,
value : value,
});
}
resolve(claims);
});
}); });
} }