call update_reflected_stream upon reflector finishing

This commit is contained in:
Jack Robison 2019-01-31 12:30:31 -05:00
parent 2e978c00b2
commit 3589cc9977
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 11 additions and 2 deletions

View file

@ -2061,6 +2061,7 @@ class Daemon(metaclass=JSONRPCServerType):
await self.storage.save_content_claim( await self.storage.save_content_claim(
stream_hash, tx.outputs[0].id stream_hash, tx.outputs[0].id
) )
await self.analytics_manager.send_claim_action('publish') await self.analytics_manager.send_claim_action('publish')
nout = 0 nout = 0
txo = tx.outputs[nout] txo = tx.outputs[nout]
@ -2794,10 +2795,14 @@ class Daemon(metaclass=JSONRPCServerType):
port = int(port) port = int(port)
else: else:
server, port = random.choice(self.conf.reflector_servers) server, port = random.choice(self.conf.reflector_servers)
return await asyncio.gather(*[ reflected = await asyncio.gather(*[
stream.upload_to_reflector(server, port) stream.upload_to_reflector(server, port)
for stream in self.stream_manager.get_filtered_streams(**kwargs) for stream in self.stream_manager.get_filtered_streams(**kwargs)
]) ])
total = []
for reflected_for_stream in reflected:
total.extend(reflected_for_stream)
return total
@requires(DHT_COMPONENT) @requires(DHT_COMPONENT)
async def jsonrpc_peer_ping(self, node_id, address, port): async def jsonrpc_peer_ping(self, node_id, address, port):

View file

@ -3,6 +3,7 @@ import sqlite3
import typing import typing
import asyncio import asyncio
import binascii import binascii
import time
from torba.client.basedatabase import SQLiteMixin from torba.client.basedatabase import SQLiteMixin
from lbrynet.conf import Config from lbrynet.conf import Config
from lbrynet.extras.wallet.dewies import dewies_to_lbc, lbc_to_dewies from lbrynet.extras.wallet.dewies import dewies_to_lbc, lbc_to_dewies
@ -682,7 +683,7 @@ class SQLiteStorage(SQLiteMixin):
if success: if success:
return self.db.execute( return self.db.execute(
"insert or replace into reflected_stream values (?, ?, ?)", "insert or replace into reflected_stream values (?, ?, ?)",
(sd_hash, reflector_address, self.loop.time()) (sd_hash, reflector_address, time.time())
) )
return self.db.execute( return self.db.execute(
"delete from reflected_stream where sd_hash=? and reflector_address=?", "delete from reflected_stream where sd_hash=? and reflector_address=?",

View file

@ -179,6 +179,8 @@ class ManagedStream:
if not sent_sd and not needed: if not sent_sd and not needed:
if not self.fully_reflected.is_set(): if not self.fully_reflected.is_set():
self.fully_reflected.set() self.fully_reflected.set()
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
return []
except (asyncio.CancelledError, asyncio.TimeoutError, ValueError): except (asyncio.CancelledError, asyncio.TimeoutError, ValueError):
if protocol.transport: if protocol.transport:
protocol.transport.close() protocol.transport.close()
@ -195,4 +197,5 @@ class ManagedStream:
protocol.transport.close() protocol.transport.close()
if not self.fully_reflected.is_set(): if not self.fully_reflected.is_set():
self.fully_reflected.set() self.fully_reflected.set()
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
return sent return sent