merged some old stashed code

This commit is contained in:
Lex Berezhny 2020-04-12 11:59:00 -04:00
parent 10c262a095
commit 777c6342f8
4 changed files with 40 additions and 10 deletions

View file

@ -5,7 +5,7 @@ from threading import Thread
from multiprocessing import Queue, Event from multiprocessing import Queue, Event
from concurrent import futures from concurrent import futures
from lbry.wallet.stream import StreamController from lbry.wallet.stream import StreamController, EventQueuePublisher
from lbry.db import Database from lbry.db import Database
from .lbrycrd import Lbrycrd from .lbrycrd import Lbrycrd
@ -64,9 +64,6 @@ class BlockchainSync:
return futures.ThreadPoolExecutor(max_workers=1, **args) return futures.ThreadPoolExecutor(max_workers=1, **args)
return futures.ProcessPoolExecutor(max_workers=max(os.cpu_count()-1, 4), **args) return futures.ProcessPoolExecutor(max_workers=max(os.cpu_count()-1, 4), **args)
def get_progress_monitor(self, state, queue) -> ProgressMonitorThread:
return ProgressMonitorThread(state, queue, self._on_progress_controller)
async def load_blocks(self): async def load_blocks(self):
jobs = [] jobs = []
queue, full_stop = Queue(), Event() queue, full_stop = Queue(), Event()
@ -81,7 +78,7 @@ class BlockchainSync:
'total_blocks': file.blocks, 'total_blocks': file.blocks,
} for file in files } for file in files
} }
progress = self.get_progress_monitor(state, queue) progress = EventQueuePublisher(queue, self._on_progress_controller)
progress.start() progress.start()
def cancel_all_the_things(): def cancel_all_the_things():
@ -91,6 +88,7 @@ class BlockchainSync:
for job in jobs: for job in jobs:
exception = job.exception() exception = job.exception()
if exception is not None: if exception is not None:
log.exception(exception)
raise exception raise exception
try: try:
@ -109,5 +107,5 @@ class BlockchainSync:
raise raise
finally: finally:
progress.shutdown() progress.stop()
executor.shutdown() executor.shutdown()

View file

@ -2,7 +2,7 @@
from sqlalchemy import ( from sqlalchemy import (
MetaData, Table, Column, ForeignKey, MetaData, Table, Column, ForeignKey,
LargeBinary, Text, SmallInteger, Integer, Boolean LargeBinary, Text, SmallInteger, Integer, BigInteger, Boolean
) )
@ -63,7 +63,7 @@ TXO = Table(
Column('txo_hash', LargeBinary, primary_key=True), Column('txo_hash', LargeBinary, primary_key=True),
Column('address', Text), Column('address', Text),
Column('position', Integer), Column('position', Integer),
Column('amount', Integer), Column('amount', BigInteger),
Column('script', LargeBinary), Column('script', LargeBinary),
Column('is_reserved', Boolean, server_default='0'), Column('is_reserved', Boolean, server_default='0'),
Column('txo_type', Integer, server_default='0'), Column('txo_type', Integer, server_default='0'),

View file

@ -1,4 +1,6 @@
import asyncio import asyncio
import threading
import multiprocessing
class BroadcastSubscription: class BroadcastSubscription:
@ -64,7 +66,7 @@ class StreamController:
next_sub = next_sub._next next_sub = next_sub._next
yield subscription yield subscription
def _notify_and_ensure_future(self, notify): def _notify_and_create_task(self, notify):
tasks = [] tasks = []
for subscription in self._iterate_subscriptions: for subscription in self._iterate_subscriptions:
maybe_coroutine = notify(subscription) maybe_coroutine = notify(subscription)
@ -80,7 +82,7 @@ class StreamController:
def add(self, event): def add(self, event):
skip = self._merge_repeated and event == self._last_event skip = self._merge_repeated and event == self._last_event
self._last_event = event self._last_event = event
return self._notify_and_ensure_future( return self._notify_and_create_task(
lambda subscription: None if skip else subscription._add(event) lambda subscription: None if skip else subscription._add(event)
) )
@ -159,3 +161,31 @@ class Stream:
def _cancel_and_error(subscription: BroadcastSubscription, future: asyncio.Future, exception): def _cancel_and_error(subscription: BroadcastSubscription, future: asyncio.Future, exception):
subscription.cancel() subscription.cancel()
future.set_exception(exception) future.set_exception(exception)
class EventQueuePublisher(threading.Thread):
STOP = object()
def __init__(self, queue: multiprocessing.Queue, stream_controller: StreamController):
super().__init__()
self.queue = queue
self.stream_controller = stream_controller
self.loop = asyncio.get_running_loop()
def run(self):
while True:
msg = self.queue.get()
if msg == self.STOP:
return
self.loop.call_soon_threadsafe(self.stream_controller.add, msg)
def stop(self):
self.queue.put(self.STOP)
self.join()
def __enter__(self):
self.start()
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()

View file

@ -2,6 +2,7 @@ import os
import time import time
import asyncio import asyncio
import logging import logging
from unittest import skip
from binascii import unhexlify, hexlify from binascii import unhexlify, hexlify
from random import choice from random import choice
@ -19,6 +20,7 @@ from lbry.wallet.bcd_data_stream import BCDataStream
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@skip
class TestBlockchain(AsyncioTestCase): class TestBlockchain(AsyncioTestCase):
async def asyncSetUp(self): async def asyncSetUp(self):