forked from LBRYCommunity/lbry-sdk
Merge pull request #2093 from lbryio/fix_announce_on_storage
Refactor announcer + fixups for last db migrator code
This commit is contained in:
commit
84381ff76c
4 changed files with 37 additions and 46 deletions
|
@ -13,55 +13,44 @@ class BlobAnnouncer:
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.node = node
|
self.node = node
|
||||||
self.storage = storage
|
self.storage = storage
|
||||||
self.pending_call: asyncio.Handle = None
|
|
||||||
self.announce_task: asyncio.Task = None
|
self.announce_task: asyncio.Task = None
|
||||||
self.running = False
|
|
||||||
self.announce_queue: typing.List[str] = []
|
self.announce_queue: typing.List[str] = []
|
||||||
|
|
||||||
async def _announce(self, batch_size: typing.Optional[int] = 10):
|
async def _submit_announcement(self, blob_hash):
|
||||||
if not batch_size:
|
try:
|
||||||
return
|
peers = len(await self.node.announce_blob(blob_hash))
|
||||||
if not self.node.joined.is_set():
|
if peers > 4:
|
||||||
await self.node.joined.wait()
|
return blob_hash
|
||||||
blob_hashes = await self.storage.get_blobs_to_announce()
|
else:
|
||||||
if blob_hashes:
|
log.warning("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
|
||||||
self.announce_queue.extend(blob_hashes)
|
except Exception as err:
|
||||||
log.info("%i blobs to announce", len(blob_hashes))
|
if isinstance(err, asyncio.CancelledError):
|
||||||
batch = []
|
raise err
|
||||||
while len(self.announce_queue):
|
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
|
||||||
cnt = 0
|
|
||||||
announced = []
|
|
||||||
while self.announce_queue and cnt < batch_size:
|
|
||||||
blob_hash = self.announce_queue.pop()
|
|
||||||
announced.append(blob_hash)
|
|
||||||
batch.append(self.node.announce_blob(blob_hash))
|
|
||||||
cnt += 1
|
|
||||||
to_await = []
|
|
||||||
while batch:
|
|
||||||
to_await.append(batch.pop())
|
|
||||||
if to_await:
|
|
||||||
await asyncio.gather(*tuple(to_await), loop=self.loop)
|
|
||||||
await self.storage.update_last_announced_blobs(announced)
|
|
||||||
log.info("announced %i blobs", len(announced))
|
|
||||||
if self.running:
|
|
||||||
self.pending_call = self.loop.call_later(60, self.announce, batch_size)
|
|
||||||
|
|
||||||
def announce(self, batch_size: typing.Optional[int] = 10):
|
|
||||||
self.announce_task = self.loop.create_task(self._announce(batch_size))
|
async def _announce(self, batch_size: typing.Optional[int] = 10):
|
||||||
|
while batch_size:
|
||||||
|
if not self.node.joined.is_set():
|
||||||
|
await self.node.joined.wait()
|
||||||
|
self.announce_queue.extend(await self.storage.get_blobs_to_announce())
|
||||||
|
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
|
||||||
|
while len(self.announce_queue):
|
||||||
|
log.info("%i blobs to announce", len(self.announce_queue))
|
||||||
|
announced = await asyncio.gather(*[
|
||||||
|
self._submit_announcement(
|
||||||
|
self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue
|
||||||
|
], loop=self.loop)
|
||||||
|
announced = list(filter(None, announced))
|
||||||
|
if announced:
|
||||||
|
await self.storage.update_last_announced_blobs(announced)
|
||||||
|
log.info("announced %i blobs", len(announced))
|
||||||
|
await asyncio.sleep(60)
|
||||||
|
|
||||||
def start(self, batch_size: typing.Optional[int] = 10):
|
def start(self, batch_size: typing.Optional[int] = 10):
|
||||||
if self.running:
|
assert not self.announce_task or self.announce_task.done(), "already running"
|
||||||
raise Exception("already running")
|
self.announce_task = self.loop.create_task(self._announce(batch_size))
|
||||||
self.running = True
|
|
||||||
self.announce(batch_size)
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self.running = False
|
if self.announce_task and not self.announce_task.done():
|
||||||
if self.pending_call:
|
self.announce_task.cancel()
|
||||||
if not self.pending_call.cancelled():
|
|
||||||
self.pending_call.cancel()
|
|
||||||
self.pending_call = None
|
|
||||||
if self.announce_task:
|
|
||||||
if not (self.announce_task.done() or self.announce_task.cancelled()):
|
|
||||||
self.announce_task.cancel()
|
|
||||||
self.announce_task = None
|
|
||||||
|
|
|
@ -22,6 +22,8 @@ def migrate_db(conf, start, end):
|
||||||
from .migrate7to8 import do_migration
|
from .migrate7to8 import do_migration
|
||||||
elif current == 8:
|
elif current == 8:
|
||||||
from .migrate8to9 import do_migration
|
from .migrate8to9 import do_migration
|
||||||
|
elif current == 9:
|
||||||
|
from .migrate9to10 import do_migration
|
||||||
else:
|
else:
|
||||||
raise Exception("DB migration of version {} to {} is not available".format(current,
|
raise Exception("DB migration of version {} to {} is not available".format(current,
|
||||||
current+1))
|
current+1))
|
||||||
|
|
|
@ -8,7 +8,7 @@ def do_migration(conf):
|
||||||
cursor = connection.cursor()
|
cursor = connection.cursor()
|
||||||
|
|
||||||
query = "select stream_hash, sd_hash from main.stream"
|
query = "select stream_hash, sd_hash from main.stream"
|
||||||
for stream_hash, sd_hash in cursor.execute(query):
|
for stream_hash, sd_hash in cursor.execute(query).fetchall():
|
||||||
head_blob_hash = cursor.execute(
|
head_blob_hash = cursor.execute(
|
||||||
"select blob_hash from stream_blob where position = 0 and stream_hash = ?",
|
"select blob_hash from stream_blob where position = 0 and stream_hash = ?",
|
||||||
(stream_hash,)
|
(stream_hash,)
|
||||||
|
|
|
@ -85,7 +85,7 @@ class TestBlobAnnouncer(AsyncioTestCase):
|
||||||
)
|
)
|
||||||
to_announce = await self.storage.get_blobs_to_announce()
|
to_announce = await self.storage.get_blobs_to_announce()
|
||||||
self.assertEqual(2, len(to_announce))
|
self.assertEqual(2, len(to_announce))
|
||||||
self.blob_announcer.start()
|
self.blob_announcer.start(batch_size=1) # so it covers batching logic
|
||||||
await self.advance(61.0)
|
await self.advance(61.0)
|
||||||
to_announce = await self.storage.get_blobs_to_announce()
|
to_announce = await self.storage.get_blobs_to_announce()
|
||||||
self.assertEqual(0, len(to_announce))
|
self.assertEqual(0, len(to_announce))
|
||||||
|
|
Loading…
Reference in a new issue