diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index bb13f3416..364cc1f79 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -2900,17 +2900,15 @@ class Daemon(AuthJSONRPCServer): return d @defer.inlineCallbacks - def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None, announce_all=None): + def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None): """ Announce blobs to the DHT Usage: blob_announce [ | --blob_hash=] [ | --stream_hash=] | [ | --sd_hash=] - [--announce_all] Options: - --announce_all : (bool) announce all the blobs possessed by user --blob_hash= : (str) announce a blob, specified by blob_hash --stream_hash= : (str) announce all blobs associated with stream_hash @@ -2921,41 +2919,22 @@ class Daemon(AuthJSONRPCServer): (bool) true if successful """ - if announce_all: - yield self.session.blob_manager.immediate_announce_all_blobs() + blob_hashes = [] + if blob_hash: + blob_hashes.append(blob_hash) + elif stream_hash or sd_hash: + if sd_hash and stream_hash: + raise Exception("either the sd hash or the stream hash should be provided, not both") + if sd_hash: + stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) + blobs = yield self.storage.get_blobs_for_stream(stream_hash, only_completed=True) + blob_hashes.extend(blob.blob_hash for blob in blobs if blob.blob_hash is not None) else: - blob_hashes = [] - if blob_hash: - blob_hashes.append(blob_hash) - elif stream_hash or sd_hash: - if sd_hash and stream_hash: - raise Exception("either the sd hash or the stream hash should be provided, not both") - if sd_hash: - stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) - blobs = yield self.storage.get_blobs_for_stream(stream_hash, only_completed=True) - blob_hashes.extend([blob.blob_hash for blob in blobs if blob.blob_hash is not None]) - else: - raise Exception('single argument must be specified') - yield self.session.blob_manager.immediate_announce(blob_hashes) + raise Exception('single argument must be specified') + yield self.storage.should_single_announce_blobs(blob_hashes, immediate=True) response = yield self._render_response(True) defer.returnValue(response) - @AuthJSONRPCServer.deprecated("blob_announce") - def jsonrpc_blob_announce_all(self): - """ - Announce all blobs to the DHT - - Usage: - blob_announce_all - - Options: - None - - Returns: - (str) Success/fail message - """ - return self.jsonrpc_blob_announce(announce_all=True) - @defer.inlineCallbacks def jsonrpc_file_reflect(self, **kwargs): """ diff --git a/lbrynet/database/migrator/migrate6to7.py b/lbrynet/database/migrator/migrate6to7.py index 536afc256..eff68eca0 100644 --- a/lbrynet/database/migrator/migrate6to7.py +++ b/lbrynet/database/migrator/migrate6to7.py @@ -7,6 +7,7 @@ def do_migration(db_dir): connection = sqlite3.connect(db_path) cursor = connection.cursor() cursor.executescript("alter table blob add last_announced_time integer;") + cursor.executescript("alter table blob add single_announce integer;") cursor.execute("update blob set next_announce_time=0") connection.commit() connection.close() diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py index 61166f148..e3bdd649c 100644 --- a/lbrynet/database/storage.py +++ b/lbrynet/database/storage.py @@ -105,7 +105,8 @@ class SQLiteStorage(object): next_announce_time integer not null, should_announce integer not null default 0, status text not null, - last_announced_time integer + last_announced_time integer, + single_announce integer ); create table if not exists stream ( @@ -233,8 +234,8 @@ class SQLiteStorage(object): status = yield self.get_blob_status(blob_hash) if status is None: status = "pending" - yield self.db.runOperation("insert into blob values (?, ?, ?, ?, ?, ?)", - (blob_hash, length, 0, 0, status, 0)) + yield self.db.runOperation("insert into blob values (?, ?, ?, ?, ?, ?, ?)", + (blob_hash, length, 0, 0, status, 0, 0)) defer.returnValue(status) def should_announce(self, blob_hash): @@ -254,17 +255,33 @@ class SQLiteStorage(object): def update_last_announced_blob(self, blob_hash, last_announced): return self.db.runOperation( - "update blob set next_announce_time=?, last_announced_time=? where blob_hash=?", + "update blob set next_announce_time=?, last_announced_time=?, single_announce=0 where blob_hash=?", (int(last_announced + (dataExpireTimeout / 2)), int(last_announced), blob_hash) ) + def should_single_announce_blobs(self, blob_hashes, immediate=False): + def set_single_announce(transaction): + now = self.clock.seconds() + for blob_hash in blob_hashes: + if immediate: + transaction.execute( + "update blob set single_announce=1, next_announce_time=? " + "where blob_hash=? and status='finished'", (int(now), blob_hash) + ) + else: + transaction.execute( + "update blob set single_announce=1 where blob_hash=? and status='finished'", (blob_hash, ) + ) + return self.db.runInteraction(set_single_announce) + def get_blobs_to_announce(self): def get_and_update(transaction): timestamp = self.clock.seconds() if conf.settings['announce_head_blobs_only']: r = transaction.execute( "select blob_hash from blob " - "where blob_hash is not null and should_announce=1 and next_announce_time