Merge pull request #636 from lbryio/fix-publish-race-conditions

Fix publish race conditions
This commit is contained in:
Jack Robison 2017-05-11 15:21:41 -04:00 committed by GitHub
commit 4bd3d49827
9 changed files with 106 additions and 45 deletions

View file

@ -9,7 +9,7 @@ at anytime.
## [Unreleased]
### Added
*
* Add decorator to support queueing api calls
*
### Changed
@ -21,9 +21,10 @@ at anytime.
* Cache claims in wallet storage for use looking claims up by id or outpoint
* Try to use cached claim info for `file_list`
* Convert wallet storage to inlinecallbacks
* Improve internal name_metadata sqlite table
### Fixed
*
* Fix race condition in publish that resulted in claims being rejected when making many publishes concurrently
*
### Deprecated

View file

@ -7,6 +7,7 @@ import time
from twisted.internet import threads, reactor, defer, task
from twisted.python.failure import Failure
from twisted.enterprise import adbapi
from collections import defaultdict, deque
from zope.interface import implements
from decimal import Decimal
@ -111,9 +112,6 @@ class MetaDataStorage(object):
def load(self):
return defer.succeed(True)
def clean_bad_records(self):
return defer.succeed(True)
def save_name_metadata(self, name, claim_outpoint, sd_hash):
return defer.succeed(True)
@ -226,11 +224,11 @@ class SqliteStorage(MetaDataStorage):
def load(self):
def create_tables(transaction):
transaction.execute("create table if not exists name_metadata (" +
" name text, " +
" txid text, " +
" n integer, " +
" sd_hash text)")
transaction.execute("CREATE TABLE IF NOT EXISTS name_metadata (" +
" name TEXT UNIQUE NOT NULL, " +
" txid TEXT NOT NULL, " +
" n INTEGER NOT NULL, " +
" sd_hash TEXT NOT NULL)")
transaction.execute("create table if not exists claim_ids (" +
" claimId text, " +
" name text, " +
@ -257,22 +255,12 @@ class SqliteStorage(MetaDataStorage):
return self.db.runInteraction(create_tables)
@rerun_if_locked
@defer.inlineCallbacks
def clean_bad_records(self):
yield self.db.runQuery("DELETE FROM name_metadata WHERE LENGTH(txid) > 64 OR txid IS NULL")
defer.returnValue(None)
@rerun_if_locked
@defer.inlineCallbacks
def save_name_metadata(self, name, claim_outpoint, sd_hash):
# TODO: refactor the 'name_metadata' and 'claim_ids' tables to not be terrible
# TODO: refactor the 'claim_ids' table to not be terrible
txid, nout = claim_outpoint['txid'], claim_outpoint['nout']
record_exists = yield self.db.runQuery("SELECT COUNT(*) FROM name_metadata "
"WHERE name=? AND txid=? AND n=?",
(name, txid, nout))
if not record_exists[0][0]:
yield self.db.runOperation("INSERT INTO name_metadata VALUES (?, ?, ?, ?)",
yield self.db.runOperation("INSERT OR REPLACE INTO name_metadata VALUES (?, ?, ?, ?)",
(name, txid, nout, sd_hash))
defer.returnValue(None)
@ -427,14 +415,10 @@ class Wallet(object):
return True
d = self._storage.load()
d.addCallback(lambda _: self._clean_bad_records())
d.addCallback(lambda _: self._start())
d.addCallback(lambda _: start_manage())
return d
def _clean_bad_records(self):
self._storage.clean_bad_records()
def _save_name_metadata(self, name, claim_outpoint, sd_hash):
return self._storage.save_name_metadata(name, claim_outpoint, sd_hash)
@ -1363,7 +1347,7 @@ class LBRYumWallet(Wallet):
@defer.inlineCallbacks
def _broadcast_transaction(self, raw_tx):
txid = yield self._run_cmd_as_defer_to_thread('broadcast', raw_tx)
txid = yield self._run_cmd_as_defer_succeed('broadcast', raw_tx)
log.info("Broadcast tx: %s", txid)
if len(txid) != 64:
raise Exception("Transaction rejected. Raw tx: {}".format(raw_tx))
@ -1391,7 +1375,7 @@ class LBRYumWallet(Wallet):
return self._run_cmd_as_defer_to_thread('getvalueforuri', uri)
def _claim_certificate(self, name, amount):
return self._run_cmd_as_defer_to_thread('claimcertificate', name, amount)
return self._run_cmd_as_defer_succeed('claimcertificate', name, amount)
def _get_certificate_claims(self):
return self._run_cmd_as_defer_succeed('getcertificateclaims')

View file

@ -7,6 +7,9 @@ def migrate_db(db_dir, start, end):
if current == 1:
from lbrynet.db_migrator.migrate1to2 import do_migration
do_migration(db_dir)
elif current == 2:
from lbrynet.db_migrator.migrate2to3 import do_migration
do_migration(db_dir)
else:
raise Exception(
"DB migration of version {} to {} is not available".format(current, current+1))

View file

@ -0,0 +1,42 @@
import sqlite3
import os
import logging
log = logging.getLogger(__name__)
def do_migration(db_dir):
log.info("Doing the migration")
migrate_blockchainname_db(db_dir)
log.info("Migration succeeded")
def migrate_blockchainname_db(db_dir):
blockchainname_db = os.path.join(db_dir, "blockchainname.db")
# skip migration on fresh installs
if not os.path.isfile(blockchainname_db):
return
db_file = sqlite3.connect(blockchainname_db)
file_cursor = db_file.cursor()
tables = file_cursor.execute("SELECT tbl_name FROM sqlite_master "
"WHERE type='table'").fetchall()
if 'tmp_name_metadata_table' in tables and 'name_metadata' not in tables:
file_cursor.execute("ALTER TABLE tmp_name_metadata_table RENAME TO name_metadata")
else:
file_cursor.executescript(
"CREATE TABLE IF NOT EXISTS tmp_name_metadata_table "
" (name TEXT UNIQUE NOT NULL, "
" txid TEXT NOT NULL, "
" n INTEGER NOT NULL, "
" sd_hash TEXT NOT NULL); "
"INSERT OR IGNORE INTO tmp_name_metadata_table "
" (name, txid, n, sd_hash) "
" SELECT name, txid, n, sd_hash FROM name_metadata; "
"DROP TABLE name_metadata; "
"ALTER TABLE tmp_name_metadata_table RENAME TO name_metadata;"
)
db_file.commit()
db_file.close()

View file

@ -240,15 +240,10 @@ class DBEncryptedFileMetadataManager(object):
@rerun_if_locked
def _save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
log.info("Saving sd blob hash %s to stream hash %s", str(sd_blob_hash), str(stream_hash))
d = self.db_conn.runQuery("insert into lbry_file_descriptors values (?, ?)",
(sd_blob_hash, stream_hash))
def ignore_duplicate(err):
err.trap(sqlite3.IntegrityError)
log.info("sd blob hash already known")
d.addErrback(ignore_duplicate)
d = self.db_conn.runOperation("insert or ignore into lbry_file_descriptors values (?, ?)",
(sd_blob_hash, stream_hash))
d.addCallback(lambda _: log.info("Saved sd blob hash %s to stream hash %s",
str(sd_blob_hash), str(stream_hash)))
return d
@rerun_if_locked

View file

@ -108,12 +108,16 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
num_blobs_known, status))
@defer.inlineCallbacks
def load_file_attributes(self):
sd_hash = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
if sd_hash:
self.sd_hash = sd_hash[0]
def load_file_attributes(self, sd_hash=None):
if not sd_hash:
sd_hash = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
if sd_hash:
self.sd_hash = sd_hash[0]
else:
raise NoSuchStreamHash(self.stream_hash)
else:
raise NoSuchStreamHash(self.stream_hash)
self.sd_hash = sd_hash
stream_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash)
if stream_metadata:
name, txid, nout = stream_metadata

View file

@ -207,7 +207,7 @@ class Daemon(AuthJSONRPCServer):
self.platform = None
self.first_run = None
self.log_file = conf.settings.get_log_filename()
self.current_db_revision = 2
self.current_db_revision = 3
self.db_revision_file = conf.settings.get_db_revision_filename()
self.session = None
self.uploaded_temp_files = []
@ -1681,6 +1681,7 @@ class Daemon(AuthJSONRPCServer):
defer.returnValue(cost)
@AuthJSONRPCServer.auth_required
@AuthJSONRPCServer.queued
@defer.inlineCallbacks
def jsonrpc_channel_new(self, channel_name, amount):
"""
@ -1735,6 +1736,7 @@ class Daemon(AuthJSONRPCServer):
defer.returnValue(response)
@AuthJSONRPCServer.auth_required
@AuthJSONRPCServer.queued
@defer.inlineCallbacks
def jsonrpc_publish(self, name, bid, metadata=None, file_path=None, fee=None, title=None,
description=None, author=None, language=None, license=None,

View file

@ -41,7 +41,7 @@ class Publisher(object):
claim_out = yield self.make_claim(name, bid, claim_dict)
self.lbry_file.completed = True
yield self.lbry_file.load_file_attributes()
yield self.lbry_file.load_file_attributes(sd_hash)
yield self.lbry_file.save_status()
defer.returnValue(claim_out)

View file

@ -114,6 +114,8 @@ class AuthorizedBase(object):
def __init__(self):
self.authorized_functions = []
self.callable_methods = {}
self._call_lock = {}
self._queued_methods = []
for methodname in dir(self):
if methodname.startswith("jsonrpc_"):
@ -121,12 +123,19 @@ class AuthorizedBase(object):
self.callable_methods.update({methodname.split("jsonrpc_")[1]: method})
if hasattr(method, '_auth_required'):
self.authorized_functions.append(methodname.split("jsonrpc_")[1])
if hasattr(method, '_queued'):
self._queued_methods.append(methodname.split("jsonrpc_")[1])
@staticmethod
def auth_required(f):
f._auth_required = True
return f
@staticmethod
def queued(f):
f._queued = True
return f
class AuthJSONRPCServer(AuthorizedBase):
"""Authorized JSONRPC server used as the base class for the LBRY API
@ -254,6 +263,7 @@ class AuthJSONRPCServer(AuthorizedBase):
id_ = None
try:
function_name = parsed.get('method')
is_queued = function_name in self._queued_methods
args = parsed.get('params', {})
id_ = parsed.get('id', None)
token = parsed.pop('hmac', None)
@ -324,7 +334,27 @@ class AuthJSONRPCServer(AuthorizedBase):
)
return server.NOT_DONE_YET
d = defer.maybeDeferred(function, **args_dict)
if is_queued:
d_lock = self._call_lock.get(function_name, False)
if not d_lock:
d = defer.maybeDeferred(function, **args_dict)
self._call_lock[function_name] = finished_deferred
def _del_lock(*args):
if function_name in self._call_lock:
del self._call_lock[function_name]
if args:
return args
finished_deferred.addCallback(_del_lock)
else:
log.info("queued %s", function_name)
d = d_lock
d.addBoth(lambda _: log.info("running %s from queue", function_name))
d.addCallback(lambda _: defer.maybeDeferred(function, **args_dict))
else:
d = defer.maybeDeferred(function, **args_dict)
# finished_deferred will callback when the request is finished
# and errback if something went wrong. If the errback is