lbry-sdk/lbry/blockchain/sync.py

112 lines
3.3 KiB
Python
Raw Normal View History

2020-02-14 12:19:55 -05:00
import os
2020-02-27 23:52:18 -05:00
import asyncio
2020-02-14 12:19:55 -05:00
import logging
2020-02-27 23:52:18 -05:00
from threading import Thread
from multiprocessing import Queue, Event
from concurrent import futures
2020-04-12 11:59:00 -04:00
from lbry.wallet.stream import StreamController, EventQueuePublisher
2020-04-11 20:15:04 -04:00
from lbry.db import Database
2020-02-14 12:19:55 -05:00
from .lbrycrd import Lbrycrd
2020-02-27 23:52:18 -05:00
from . import worker
2020-02-14 12:19:55 -05:00
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
2020-02-27 23:52:18 -05:00
class ProgressMonitorThread(Thread):
    """Thread that relays messages from a queue to a stream controller.

    ``run()`` reads messages from ``queue`` one at a time and hands each to
    ``stream_controller.add()`` until the ``STOP`` sentinel is received.

    The class is also a context manager: entering starts the thread and
    returns the thread object, exiting calls ``shutdown()``.
    """

    # Sentinel that shutdown() puts on the queue to make run() return.
    STOP = 'stop'
    # Not referenced inside this class.
    # NOTE(review): looks like a progress-bar format string -- confirm who consumes it.
    FORMAT = '{l_bar}{bar}| {n_fmt:>6}/{total_fmt:>7} [{elapsed}<{remaining:>5}, {rate_fmt:>15}]'

    def __init__(self, state: dict, queue: Queue, stream_controller: StreamController):
        """Store the collaborators and remember the creating thread's event loop.

        Args:
            state: stored on the instance; not read by any method of this class.
            queue: source of messages (and of the ``STOP`` sentinel).
            stream_controller: object whose ``add(msg)`` receives each message.
        """
        super().__init__()
        self.state = state
        self.queue = queue
        self.stream_controller = stream_controller
        # Captured here (in the constructing thread) so run() can install the
        # same loop as the current loop of the worker thread.
        self.loop = asyncio.get_event_loop()

    def run(self):
        """Forward queued messages to the stream controller until ``STOP`` arrives."""
        asyncio.set_event_loop(self.loop)
        # NOTE(review): add() is invoked from this thread, not from the loop's
        # own thread -- confirm StreamController.add() tolerates that.
        while True:
            msg = self.queue.get()
            if msg == self.STOP:
                return
            self.stream_controller.add(msg)

    def shutdown(self):
        """Ask ``run()`` to finish and block until the thread has exited."""
        self.queue.put(self.STOP)
        self.join()

    def __enter__(self):
        """Start the thread and return it so ``with ... as monitor`` is usable."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the thread; exceptions from the ``with`` body are not suppressed."""
        self.shutdown()
2020-04-11 20:15:04 -04:00
class BlockchainSync:
    """Processes the chain's not-yet-synced block files through a worker pool.

    The queue handed to the workers is drained by an ``EventQueuePublisher``
    bound to the controller behind ``on_progress``.
    """

    def __init__(self, chain: Lbrycrd, db: Database, use_process_pool=False):
        """
        Args:
            chain: provides ``data_dir``, ``regtest`` and
                ``get_block_files_not_synced()``.
            db: provides ``db_path``, which is passed to the worker initializer.
            use_process_pool: when true, jobs run in a ``ProcessPoolExecutor``;
                otherwise in a single-thread ``ThreadPoolExecutor``.
        """
        self.chain = chain
        self.db = db
        self.use_process_pool = use_process_pool
        self._on_progress_controller = StreamController()
        self.on_progress = self._on_progress_controller.stream

    def get_worker_pool(self, queue, full_stop) -> futures.Executor:
        """Create the executor whose workers are set up by ``worker.initializer``.

        Args:
            queue: passed to every worker via ``initargs``.
            full_stop: event passed to every worker via ``initargs``.

        Returns:
            A single-thread pool, or a process pool when ``use_process_pool`` is set.
        """
        args = dict(
            initializer=worker.initializer,
            initargs=(self.chain.data_dir, self.chain.regtest, self.db.db_path, queue, full_stop)
        )
        if not self.use_process_pool:
            return futures.ThreadPoolExecutor(max_workers=1, **args)
        # os.cpu_count() returns None when the count cannot be determined.
        cpus = os.cpu_count() or 1
        # NOTE(review): max() yields at least 4 workers even on smaller
        # machines; if a cap was intended this should be min() -- confirm.
        return futures.ProcessPoolExecutor(max_workers=max(cpus - 1, 4), **args)

    @staticmethod
    def _stop_jobs(jobs, full_stop):
        """Cancel every job that has not started yet and set the ``full_stop`` event."""
        for job in jobs:
            job.cancel()
        full_stop.set()

    @staticmethod
    def _first_failure(jobs):
        """Return the first exception raised by a non-cancelled job, else ``None``.

        Cancelled jobs are skipped because ``Future.exception()`` raises
        ``CancelledError`` for them, which would hide a real worker failure.
        For jobs still running, ``Future.exception()`` blocks until they finish.
        """
        for job in jobs:
            if job.cancelled():
                continue
            exception = job.exception()
            if exception is not None:
                return exception
        return None

    @classmethod
    def _raise_first_failure(cls, jobs):
        """Log and re-raise the first worker exception, if there is one."""
        exception = cls._first_failure(jobs)
        if exception is not None:
            # We are not inside an ``except`` block for this exception, so pass
            # it explicitly to get its traceback into the log record.
            log.error("Block file sync job failed: %s", exception, exc_info=exception)
            raise exception

    async def load_blocks(self):
        """Submit one job per unsynced block file and wait for them to finish.

        Waiting stops at the first job that raises. Remaining jobs are then
        cancelled, the ``full_stop`` event is set, and the failure is re-raised.
        The progress publisher and the executor are always shut down on exit.

        Raises:
            Exception: the first exception raised by a worker job.
            asyncio.CancelledError: when this coroutine is cancelled and no
                job has failed.
        """
        jobs = []
        queue, full_stop = Queue(), Event()
        executor = self.get_worker_pool(queue, full_stop)
        try:
            files = list(await self.chain.get_block_files_not_synced())
            progress = EventQueuePublisher(queue, self._on_progress_controller)
            progress.start()
            try:
                for file in files:
                    jobs.append(executor.submit(worker.process_block_file, file.file_number))
                # futures.wait() blocks, so run it in the loop's default executor.
                _, not_done = await asyncio.get_event_loop().run_in_executor(
                    None, futures.wait, jobs, None, futures.FIRST_EXCEPTION
                )
                if not_done:
                    self._stop_jobs(jobs, full_stop)
                # Always inspect the results: with FIRST_EXCEPTION a failure can
                # be reported while ``not_done`` is empty (e.g. a single job, or
                # the failing job being the last one to finish).
                self._raise_first_failure(jobs)
            except asyncio.CancelledError:
                self._stop_jobs(jobs, full_stop)
                # A worker failure takes precedence over the cancellation.
                self._raise_first_failure(jobs)
                raise
            finally:
                progress.stop()
        finally:
            executor.shutdown()