Merge pull request #339 from lbryio/all-prs

All The PRs
This commit is contained in:
Job Evers‐Meltzer 2016-12-15 19:42:35 -06:00 committed by GitHub
commit 3dde7af576
74 changed files with 923 additions and 1216 deletions

View file

@ -3,7 +3,7 @@ from lbrynet.core import looping_call_manager
from twisted.internet import defer
from twisted.internet import task
from lbrynet.core.Platform import get_platform
from lbrynet.core.system_info import get_platform
from lbrynet.conf import settings
import constants

View file

@ -182,6 +182,7 @@ ENVIRONMENT = Env(
# all of your credits.
API_INTERFACE=(str, "localhost"),
bittrex_feed=(str, "https://bittrex.com/api/v1.1/public/getmarkethistory"),
reflector_reupload=(bool, True),
)

View file

@ -1,4 +1,6 @@
import logging
import random
import time
from twisted.internet import defer
from twisted.internet.task import LoopingCall
@ -27,7 +29,7 @@ class BlobAvailabilityTracker(object):
def start(self):
log.info("Starting %s", self)
self._check_popular.start(30)
self._check_mine.start(120)
self._check_mine.start(600)
def stop(self):
log.info("Stopping %s", self)
@ -76,7 +78,8 @@ class BlobAvailabilityTracker(object):
def _update_most_popular(self):
d = self._get_most_popular()
d.addCallback(lambda _: self._get_mean_peers())
d.addCallback(lambda _: self._set_mean_peers())
def _update_mine(self):
def _get_peers(blobs):
@ -85,11 +88,26 @@ class BlobAvailabilityTracker(object):
dl.append(self._update_peers_for_blob(hash))
return defer.DeferredList(dl)
d = self._blob_manager.get_all_verified_blobs()
d.addCallback(_get_peers)
d.addCallback(lambda _: self._get_mean_peers())
def sample(blobs):
return random.sample(blobs, 100)
def _get_mean_peers(self):
start = time.time()
log.debug('==> Updating the peers for my blobs')
d = self._blob_manager.get_all_verified_blobs()
# as far as I can tell, this only is used to set _last_mean_availability
# which... seems like a very expensive operation for such little payoff.
# so taking a sample should get about the same effect as querying the entire
# list of blobs
d.addCallback(sample)
d.addCallback(_get_peers)
d.addCallback(lambda _: self._set_mean_peers())
d.addCallback(lambda _: log.debug('<== Done updating peers for my blobs. Took %s seconds',
time.time() - start))
# although unused, need to return or else the looping call
# could overrun on a previous call
return d
def _set_mean_peers(self):
num_peers = [len(self.availability[blob]) for blob in self.availability]
mean = Decimal(sum(num_peers)) / Decimal(max(1, len(num_peers)))
self._last_mean_availability = mean

View file

@ -9,10 +9,10 @@ from twisted.enterprise import adbapi
from lbrynet.core.HashBlob import BlobFile, TempBlob, BlobFileCreator, TempBlobCreator
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
from lbrynet.core.utils import is_valid_blobhash
from lbrynet.core.cryptoutils import get_lbry_hash_obj
from lbrynet.core.Error import NoSuchBlobError
from lbrynet.core.sqlite_helpers import rerun_if_locked
log = logging.getLogger(__name__)
@ -52,9 +52,6 @@ class BlobManager(DHTHashSupplier):
def get_blob_length(self, blob_hash):
pass
def check_consistency(self):
pass
def blob_requested(self, blob_hash):
pass
@ -86,6 +83,8 @@ class DiskBlobManager(BlobManager):
self.db_conn = None
self.blob_type = BlobFile
self.blob_creator_type = BlobFileCreator
# TODO: consider using an LRU for blobs as there could potentially
# be thousands of blobs loaded up, many stale
self.blobs = {}
self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)}
self._next_manage_call = None
@ -102,7 +101,6 @@ class DiskBlobManager(BlobManager):
if self._next_manage_call is not None and self._next_manage_call.active():
self._next_manage_call.cancel()
self._next_manage_call = None
#d = self.db_conn.close()
self.db_conn = None
return defer.succeed(True)
@ -120,6 +118,7 @@ class DiskBlobManager(BlobManager):
return self.blob_creator_type(self, self.blob_dir)
def _make_new_blob(self, blob_hash, upload_allowed, length=None):
log.debug('Making a new blob for %s', blob_hash)
blob = self.blob_type(self.blob_dir, blob_hash, upload_allowed, length)
self.blobs[blob_hash] = blob
d = self._completed_blobs([blob_hash])
@ -143,9 +142,11 @@ class DiskBlobManager(BlobManager):
def blob_completed(self, blob, next_announce_time=None):
if next_announce_time is None:
next_announce_time = time.time()
return self._add_completed_blob(blob.blob_hash, blob.length,
next_announce_time = time.time() + self.hash_reannounce_time
d = self._add_completed_blob(blob.blob_hash, blob.length,
time.time(), next_announce_time)
d.addCallback(lambda _: self.hash_announcer.immediate_announce([blob.blob_hash]))
return d
def completed_blobs(self, blobs_to_check):
return self._completed_blobs(blobs_to_check)
@ -186,9 +187,6 @@ class DiskBlobManager(BlobManager):
def get_blob_length(self, blob_hash):
return self._get_blob_length(blob_hash)
def check_consistency(self):
return self._check_consistency()
def get_all_verified_blobs(self):
d = self._get_all_verified_blob_hashes()
d.addCallback(self.completed_blobs)
@ -299,18 +297,27 @@ class DiskBlobManager(BlobManager):
@rerun_if_locked
def _completed_blobs(self, blobs_to_check):
"""Returns of the blobs_to_check, which are valid"""
blobs_to_check = filter(is_valid_blobhash, blobs_to_check)
def get_blobs_in_db(db_transaction):
blobs_in_db = [] # [(blob_hash, last_verified_time)]
for b in blobs_to_check:
def _get_last_verified_time(db_transaction, blob_hash):
result = db_transaction.execute(
"select last_verified_time from blobs where blob_hash = ?",
(b,))
"select last_verified_time from blobs where blob_hash = ?", (blob_hash,))
row = result.fetchone()
if row is not None:
blobs_in_db.append((b, row[0]))
return blobs_in_db
if row:
return row[0]
else:
return None
def _filter_blobs_in_db(db_transaction, blobs_to_check):
for b in blobs_to_check:
verified_time = _get_last_verified_time(db_transaction, b)
if verified_time:
yield (b, verified_time)
def get_blobs_in_db(db_transaction, blob_to_check):
# [(blob_hash, last_verified_time)]
return list(_filter_blobs_in_db(db_transaction, blobs_to_check))
def get_valid_blobs(blobs_in_db):
@ -319,23 +326,31 @@ class DiskBlobManager(BlobManager):
if os.path.isfile(file_path):
if verified_time > os.path.getctime(file_path):
return True
else:
log.debug('Verification time for %s is too old (%s < %s)',
file_path, verified_time, os.path.getctime(file_path))
else:
log.debug('file %s does not exist', file_path)
return False
def return_valid_blobs(results):
valid_blobs = []
for (b, verified_date), (success, result) in zip(blobs_in_db, results):
if success is True and result is True:
valid_blobs.append(b)
def filter_valid_blobs(results):
assert len(blobs_in_db) == len(results)
valid_blobs = [
b for (b, verified_date), (success, result) in zip(blobs_in_db, results)
if success is True and result is True
]
log.debug('Of %s blobs, %s were valid', len(results), len(valid_blobs))
return valid_blobs
ds = []
for b, verified_date in blobs_in_db:
ds.append(threads.deferToThread(check_blob_verified_date, b, verified_date))
ds = [
threads.deferToThread(check_blob_verified_date, b, verified_date)
for b, verified_date in blobs_in_db
]
dl = defer.DeferredList(ds)
dl.addCallback(return_valid_blobs)
dl.addCallback(filter_valid_blobs)
return dl
d = self.db_conn.runInteraction(get_blobs_in_db)
d = self.db_conn.runInteraction(get_blobs_in_db, blobs_to_check)
d.addCallback(get_valid_blobs)
return d
@ -345,8 +360,6 @@ class DiskBlobManager(BlobManager):
d.addCallback(lambda r: r[0][0] if len(r) else Failure(NoSuchBlobError(blob)))
return d
#length, verified_time, next_announce_time = json.loads(self.db.Get(blob))
#return length
@rerun_if_locked
def _update_blob_verified_timestamp(self, blob, timestamp):
@ -382,73 +395,6 @@ class DiskBlobManager(BlobManager):
return self.db_conn.runInteraction(delete_blobs)
@rerun_if_locked
def _check_consistency(self):
ALREADY_VERIFIED = 1
NEWLY_VERIFIED = 2
INVALID = 3
current_time = time.time()
d = self.db_conn.runQuery("select blob_hash, blob_length, last_verified_time from blobs")
def check_blob(blob_hash, blob_length, verified_time):
file_path = os.path.join(self.blob_dir, blob_hash)
if os.path.isfile(file_path):
if verified_time >= os.path.getctime(file_path):
return ALREADY_VERIFIED
else:
h = get_lbry_hash_obj()
len_so_far = 0
f = open(file_path)
while True:
data = f.read(2**12)
if not data:
break
h.update(data)
len_so_far += len(data)
if len_so_far == blob_length and h.hexdigest() == blob_hash:
return NEWLY_VERIFIED
return INVALID
def do_check(blobs):
already_verified = []
newly_verified = []
invalid = []
for blob_hash, blob_length, verified_time in blobs:
status = check_blob(blob_hash, blob_length, verified_time)
if status == ALREADY_VERIFIED:
already_verified.append(blob_hash)
elif status == NEWLY_VERIFIED:
newly_verified.append(blob_hash)
else:
invalid.append(blob_hash)
return already_verified, newly_verified, invalid
def update_newly_verified(transaction, blobs):
for b in blobs:
transaction.execute("update blobs set last_verified_time = ? where blob_hash = ?",
(current_time, b))
def check_blobs(blobs):
@rerun_if_locked
def update_and_return(status_lists):
already_verified, newly_verified, invalid = status_lists
d = self.db_conn.runInteraction(update_newly_verified, newly_verified)
d.addCallback(lambda _: status_lists)
return d
d = threads.deferToThread(do_check, blobs)
d.addCallback(update_and_return)
return d
d.addCallback(check_blobs)
return d
@rerun_if_locked
def _get_all_verified_blob_hashes(self):
d = self.db_conn.runQuery("select blob_hash, last_verified_time from blobs")

View file

@ -212,7 +212,6 @@ class Wallet(object):
once the service has been rendered
"""
rounded_amount = Decimal(str(round(amount, 8)))
#if peer in self.peer_addresses:
if self.wallet_balance >= self.total_reserved_points + rounded_amount:
self.total_reserved_points += rounded_amount
return ReservedPoints(identifier, rounded_amount)
@ -461,11 +460,13 @@ class Wallet(object):
meta_for_return[k] = new_metadata[k]
return defer.succeed(Metadata(meta_for_return))
def claim_name(self, name, bid, m):
def _save_metadata(claim_out, metadata):
if not claim_out['success']:
msg = 'Claim to name {} failed: {}'.format(name, claim_out['reason'])
raise Exception(msg)
claim_out.pop('success')
claim_outpoint = ClaimOutpoint(claim_out['txid'], claim_out['nout'])
log.debug("Saving metadata for claim %s %d" % (claim_outpoint['txid'], claim_outpoint['nout']))
d = self._save_name_metadata(name, claim_outpoint, metadata['sources']['lbry_sd_hash'])
@ -492,11 +493,29 @@ class Wallet(object):
return d
def abandon_claim(self, txid, nout):
def _parse_abandon_claim_out(claim_out):
if not claim_out['success']:
msg = 'Abandon of {}:{} failed: {}'.format(txid, nout, claim_out['resason'])
raise Exception(msg)
claim_out.pop('success')
return defer.succeed(claim_out)
claim_outpoint = ClaimOutpoint(txid, nout)
return self._abandon_claim(claim_outpoint)
d = self._abandon_claim(claim_outpoint)
d.addCallback(lambda claim_out: _parse_abandon_claim_out(claim_out))
return d
def support_claim(self, name, claim_id, amount):
return self._support_claim(name, claim_id, amount)
def _parse_support_claim_out(claim_out):
if not claim_out['success']:
msg = 'Support of {}:{} failed: {}'.format(name, claim_id, claim_out['reason'])
raise Exception(msg)
claim_out.pop('success')
return defer.succeed(claim_out)
d = self._support_claim(name, claim_id, amount)
d.addCallback(lambda claim_out: _parse_support_claim_out(claim_out))
return d
def get_block_info(self, height):
d = self._get_blockhash(height)

View file

@ -209,7 +209,6 @@ class ClientProtocol(Protocol):
log.debug("Asking for another request.")
from twisted.internet import reactor
reactor.callLater(0, self._ask_for_request)
#self._ask_for_request()
else:
log.debug("Not asking for another request.")
self.transport.loseConnection()
@ -230,8 +229,6 @@ class ClientProtocol(Protocol):
# TODO: protocol had such a mechanism.
log.debug("Closing the connection to %s because the download of blob %s was canceled",
str(self.peer), str(self._blob_download_request.blob))
#self.transport.loseConnection()
#return True
return err
######### IRateLimited #########

View file

@ -1,4 +1,5 @@
import datetime
import inspect
import json
import logging
import logging.handlers
@ -14,8 +15,25 @@ import lbrynet
from lbrynet.conf import settings
from lbrynet.core import utils
####
# This code is copied from logging/__init__.py in the python source code
####
#
# _srcfile is used when walking the stack to check when we've got the first
# caller stack frame.
#
if hasattr(sys, 'frozen'): #support for py2exe
_srcfile = "logging%s__init__%s" % (os.sep, __file__[-4:])
elif __file__[-4:].lower() in ['.pyc', '.pyo']:
_srcfile = __file__[:-4] + '.py'
else:
_srcfile = __file__
_srcfile = os.path.normcase(_srcfile)
#####
session = FuturesSession()
TRACE = 5
def bg_cb(sess, resp):
@ -148,6 +166,30 @@ class JsonFormatter(logging.Formatter):
data['exc_info'] = self.formatException(record.exc_info)
return json.dumps(data)
####
# This code is copied from logging/__init__.py in the python source code
####
def findCaller(srcfile=None):
"""Returns the filename, line number and function name of the caller"""
srcfile = srcfile or _srcfile
f = inspect.currentframe()
#On some versions of IronPython, currentframe() returns None if
#IronPython isn't run with -X:Frames.
if f is not None:
f = f.f_back
rv = "(unknown file)", 0, "(unknown function)"
while hasattr(f, "f_code"):
co = f.f_code
filename = os.path.normcase(co.co_filename)
# ignore any function calls that are in this file
if filename == srcfile:
f = f.f_back
continue
rv = (filename, f.f_lineno, co.co_name)
break
return rv
###
def failure(failure, log, msg, *args):
"""Log a failure message from a deferred.
@ -287,3 +329,53 @@ class LogUploader(object):
else:
log_size = 0
return cls(log_name, log_file, log_size)
class Logger(logging.Logger):
"""A logger that has an extra `fail` method useful for handling twisted failures."""
def fail(self, callback=None, *args, **kwargs):
"""Returns a function to log a failure from an errback.
The returned function appends the error message and extracts
the traceback from `err`.
Example usage:
d.addErrback(log.fail(), 'This is an error message')
Although odd, making the method call is necessary to extract
out useful filename and line number information; otherwise the
reported values are from inside twisted's deferred handling
code.
Args:
callback: callable to call after making the log. The first argument
will be the `err` from the deferred
args: extra arguments to pass into `callback`
Returns: a function that takes the following arguments:
err: twisted.python.failure.Failure
msg: the message to log, using normal logging string iterpolation.
msg_args: the values to subtitute into `msg`
msg_kwargs: set `level` to change from the default ERROR severity. Other
keywoards are treated as normal log kwargs.
"""
fn, lno, func = findCaller()
def _fail(err, msg, *msg_args, **msg_kwargs):
level = msg_kwargs.pop('level', logging.ERROR)
msg += ": %s"
msg_args += (err.getErrorMessage(),)
exc_info = (err.type, err.value, err.getTracebackObject())
record = self.makeRecord(
self.name, level, fn, lno, msg, msg_args, exc_info, func, msg_kwargs)
self.handle(record)
if callback:
callback(err, *args, **kwargs)
return _fail
def trace(self, msg, *args, **kwargs):
if self.isEnabledFor(TRACE):
self._log(TRACE, msg, args, **kwargs)
logging.setLoggerClass(Logger)
logging.addLevelName(TRACE, 'TRACE')

View file

@ -1,6 +1,7 @@
import binascii
import collections
import logging
import time
from twisted.internet import defer, reactor
@ -42,6 +43,7 @@ class DHTHashAnnouncer(object):
return defer.succeed(False)
def _announce_available_hashes(self):
log.debug('Announcing available hashes')
ds = []
for supplier in self.suppliers:
d = supplier.hashes_to_announce()
@ -51,7 +53,11 @@ class DHTHashAnnouncer(object):
return dl
def _announce_hashes(self, hashes):
if not hashes:
return
log.debug('Announcing %s hashes', len(hashes))
# TODO: add a timeit decorator
start = time.time()
ds = []
for h in hashes:
@ -62,6 +68,7 @@ class DHTHashAnnouncer(object):
def announce():
if len(self.hash_queue):
h, announce_deferred = self.hash_queue.popleft()
log.debug('Announcing blob %s to dht', h)
d = self.dht_node.announceHaveBlob(binascii.unhexlify(h), self.peer_port)
d.chainDeferred(announce_deferred)
d.addBoth(lambda _: reactor.callLater(0, announce))
@ -72,7 +79,10 @@ class DHTHashAnnouncer(object):
# TODO: maybe make the 5 configurable
self._concurrent_announcers += 1
announce()
return defer.DeferredList(ds)
d = defer.DeferredList(ds)
d.addCallback(lambda _: log.debug('Took %s seconds to announce %s hashes',
time.time() - start, len(hashes)))
return d
class DHTHashSupplier(object):

View file

@ -64,7 +64,7 @@ class ServerProtocol(Protocol):
self.transport.loseConnection()
def write(self, data):
log.debug("Writing %s bytes of data to the transport", str(len(data)))
log.trace("Writing %s bytes of data to the transport", len(data))
self.transport.write(data)
self.factory.rate_limiter.report_ul_bytes(len(data))

View file

@ -52,18 +52,19 @@ class ServerRequestHandler(object):
from twisted.internet import reactor
if self.production_paused is False:
if self.production_paused:
return
chunk = self.response_buff[:self.CHUNK_SIZE]
self.response_buff = self.response_buff[self.CHUNK_SIZE:]
if chunk != '':
log.debug("writing %s bytes to the client", str(len(chunk)))
if chunk == '':
return
log.trace("writing %s bytes to the client", len(chunk))
self.consumer.write(chunk)
reactor.callLater(0, self._produce_more)
#IConsumer stuff
def registerProducer(self, producer, streaming):
#assert self.file_sender == producer
self.producer = producer
assert streaming is False
producer.resumeProducing()
@ -80,7 +81,7 @@ class ServerRequestHandler(object):
def get_more_data():
if self.producer is not None:
log.debug("Requesting more data from the producer")
log.trace("Requesting more data from the producer")
self.producer.resumeProducing()
reactor.callLater(0, get_more_data)

View file

@ -48,7 +48,6 @@ class StreamBlobDecryptor(object):
self.buff += data
self.len_read += len(data)
write_bytes()
#write_func(remove_padding(self.cipher.decrypt(self.buff)))
d = self.blob.read(decrypt_bytes)
d.addCallback(lambda _: finish_decrypt())

View file

@ -1,307 +0,0 @@
# import sqlite3
# import unqlite
# import leveldb
# import shutil
# import os
# import logging
# import json
#
#
# log = logging.getLogger(__name__)
#
#
# known_dbs = ['lbryfile_desc.db', 'lbryfiles.db', 'valuable_blobs.db', 'blobs.db',
# 'lbryfile_blob.db', 'lbryfile_info.db', 'settings.db', 'blind_settings.db',
# 'blind_peers.db', 'blind_info.db', 'lbryfile_info.db', 'lbryfile_manager.db',
# 'live_stream.db', 'stream_info.db', 'stream_blob.db', 'stream_desc.db']
#
#
# def do_move(from_dir, to_dir):
# for known_db in known_dbs:
# known_db_path = os.path.join(from_dir, known_db)
# if os.path.exists(known_db_path):
# log.debug("Moving %s to %s",
# os.path.abspath(known_db_path),
# os.path.abspath(os.path.join(to_dir, known_db)))
# shutil.move(known_db_path, os.path.join(to_dir, known_db))
# else:
# log.debug("Did not find %s", os.path.abspath(known_db_path))
#
#
# def do_migration(db_dir):
# old_dir = os.path.join(db_dir, "_0_to_1_old")
# new_dir = os.path.join(db_dir, "_0_to_1_new")
# try:
# log.info("Moving dbs from the real directory to %s", os.path.abspath(old_dir))
# os.makedirs(old_dir)
# do_move(db_dir, old_dir)
# except:
# log.error("An error occurred moving the old db files.")
# raise
# try:
# log.info("Creating the new directory in %s", os.path.abspath(new_dir))
# os.makedirs(new_dir)
#
# except:
# log.error("An error occurred creating the new directory.")
# raise
# try:
# log.info("Doing the migration")
# migrate_blob_db(old_dir, new_dir)
# migrate_lbryfile_db(old_dir, new_dir)
# migrate_livestream_db(old_dir, new_dir)
# migrate_ptc_db(old_dir, new_dir)
# migrate_lbryfile_manager_db(old_dir, new_dir)
# migrate_settings_db(old_dir, new_dir)
# migrate_repeater_db(old_dir, new_dir)
# log.info("Migration succeeded")
# except:
# log.error("An error occurred during the migration. Restoring.")
# do_move(old_dir, db_dir)
# raise
# try:
# log.info("Moving dbs in the new directory to the real directory")
# do_move(new_dir, db_dir)
# db_revision = open(os.path.join(db_dir, 'db_revision'), mode='w+')
# db_revision.write("1")
# db_revision.close()
# os.rmdir(new_dir)
# except:
# log.error("An error occurred moving the new db files.")
# raise
# return old_dir
#
#
# def migrate_blob_db(old_db_dir, new_db_dir):
# old_blob_db_path = os.path.join(old_db_dir, "blobs.db")
# if not os.path.exists(old_blob_db_path):
# return True
#
# old_db = leveldb.LevelDB(old_blob_db_path)
# new_db_conn = sqlite3.connect(os.path.join(new_db_dir, "blobs.db"))
# c = new_db_conn.cursor()
# c.execute("create table if not exists blobs (" +
# " blob_hash text primary key, " +
# " blob_length integer, " +
# " last_verified_time real, " +
# " next_announce_time real"
# ")")
# new_db_conn.commit()
# c = new_db_conn.cursor()
# for blob_hash, blob_info in old_db.RangeIter():
# blob_length, verified_time, announce_time = json.loads(blob_info)
# c.execute("insert into blobs values (?, ?, ?, ?)",
# (blob_hash, blob_length, verified_time, announce_time))
# new_db_conn.commit()
# new_db_conn.close()
#
#
# def migrate_lbryfile_db(old_db_dir, new_db_dir):
# old_lbryfile_db_path = os.path.join(old_db_dir, "lbryfiles.db")
# if not os.path.exists(old_lbryfile_db_path):
# return True
#
# stream_info_db = leveldb.LevelDB(os.path.join(old_db_dir, "lbryfile_info.db"))
# stream_blob_db = leveldb.LevelDB(os.path.join(old_db_dir, "lbryfile_blob.db"))
# stream_desc_db = leveldb.LevelDB(os.path.join(old_db_dir, "lbryfile_desc.db"))
#
# db_conn = sqlite3.connect(os.path.join(new_db_dir, "lbryfile_info.db"))
# c = db_conn.cursor()
# c.execute("create table if not exists lbry_files (" +
# " stream_hash text primary key, " +
# " key text, " +
# " stream_name text, " +
# " suggested_file_name text" +
# ")")
# c.execute("create table if not exists lbry_file_blobs (" +
# " blob_hash text, " +
# " stream_hash text, " +
# " position integer, " +
# " iv text, " +
# " length integer, " +
# " foreign key(stream_hash) references lbry_files(stream_hash)" +
# ")")
# c.execute("create table if not exists lbry_file_descriptors (" +
# " sd_blob_hash TEXT PRIMARY KEY, " +
# " stream_hash TEXT, " +
# " foreign key(stream_hash) references lbry_files(stream_hash)" +
# ")")
# db_conn.commit()
# c = db_conn.cursor()
# for stream_hash, stream_info in stream_info_db.RangeIter():
# key, name, suggested_file_name = json.loads(stream_info)
# c.execute("insert into lbry_files values (?, ?, ?, ?)",
# (stream_hash, key, name, suggested_file_name))
# db_conn.commit()
# c = db_conn.cursor()
# for blob_hash_stream_hash, blob_info in stream_blob_db.RangeIter():
# b_h, s_h = json.loads(blob_hash_stream_hash)
# position, iv, length = json.loads(blob_info)
# c.execute("insert into lbry_file_blobs values (?, ?, ?, ?, ?)",
# (b_h, s_h, position, iv, length))
# db_conn.commit()
# c = db_conn.cursor()
# for sd_blob_hash, stream_hash in stream_desc_db.RangeIter():
# c.execute("insert into lbry_file_descriptors values (?, ?)",
# (sd_blob_hash, stream_hash))
# db_conn.commit()
# db_conn.close()
#
#
# def migrate_livestream_db(old_db_dir, new_db_dir):
# old_db_path = os.path.join(old_db_dir, "stream_info.db")
# if not os.path.exists(old_db_path):
# return True
# stream_info_db = leveldb.LevelDB(os.path.join(old_db_dir, "stream_info.db"))
# stream_blob_db = leveldb.LevelDB(os.path.join(old_db_dir, "stream_blob.db"))
# stream_desc_db = leveldb.LevelDB(os.path.join(old_db_dir, "stream_desc.db"))
#
# db_conn = sqlite3.connect(os.path.join(new_db_dir, "live_stream.db"))
#
# c = db_conn.cursor()
#
# c.execute("create table if not exists live_streams (" +
# " stream_hash text primary key, " +
# " public_key text, " +
# " key text, " +
# " stream_name text, " +
# " next_announce_time real" +
# ")")
# c.execute("create table if not exists live_stream_blobs (" +
# " blob_hash text, " +
# " stream_hash text, " +
# " position integer, " +
# " revision integer, " +
# " iv text, " +
# " length integer, " +
# " signature text, " +
# " foreign key(stream_hash) references live_streams(stream_hash)" +
# ")")
# c.execute("create table if not exists live_stream_descriptors (" +
# " sd_blob_hash TEXT PRIMARY KEY, " +
# " stream_hash TEXT, " +
# " foreign key(stream_hash) references live_streams(stream_hash)" +
# ")")
#
# db_conn.commit()
#
# c = db_conn.cursor()
# for stream_hash, stream_info in stream_info_db.RangeIter():
# public_key, key, name, next_announce_time = json.loads(stream_info)
# c.execute("insert into live_streams values (?, ?, ?, ?, ?)",
# (stream_hash, public_key, key, name, next_announce_time))
# db_conn.commit()
# c = db_conn.cursor()
# for blob_hash_stream_hash, blob_info in stream_blob_db.RangeIter():
# b_h, s_h = json.loads(blob_hash_stream_hash)
# position, revision, iv, length, signature = json.loads(blob_info)
# c.execute("insert into live_stream_blobs values (?, ?, ?, ?, ?, ?, ?)",
# (b_h, s_h, position, revision, iv, length, signature))
# db_conn.commit()
# c = db_conn.cursor()
# for sd_blob_hash, stream_hash in stream_desc_db.RangeIter():
# c.execute("insert into live_stream_descriptors values (?, ?)",
# (sd_blob_hash, stream_hash))
# db_conn.commit()
# db_conn.close()
#
#
# def migrate_ptc_db(old_db_dir, new_db_dir):
# old_db_path = os.path.join(old_db_dir, "ptcwallet.db")
# if not os.path.exists(old_db_path):
# return True
# old_db = leveldb.LevelDB(old_db_path)
# try:
# p_key = old_db.Get("private_key")
# new_db = unqlite.UnQLite(os.path.join(new_db_dir, "ptcwallet.db"))
# new_db['private_key'] = p_key
# except KeyError:
# pass
#
#
# def migrate_lbryfile_manager_db(old_db_dir, new_db_dir):
# old_db_path = os.path.join(old_db_dir, "lbryfiles.db")
# if not os.path.exists(old_db_path):
# return True
# old_db = leveldb.LevelDB(old_db_path)
# new_db = sqlite3.connect(os.path.join(new_db_dir, "lbryfile_info.db"))
# c = new_db.cursor()
# c.execute("create table if not exists lbry_file_options (" +
# " blob_data_rate real, " +
# " status text," +
# " stream_hash text,"
# " foreign key(stream_hash) references lbry_files(stream_hash)" +
# ")")
# new_db.commit()
# FILE_STATUS = "t"
# FILE_OPTIONS = "o"
# c = new_db.cursor()
# for k, v in old_db.RangeIter():
# key_type, stream_hash = json.loads(k)
# if key_type == FILE_STATUS:
# try:
# rate = json.loads(old_db.Get(json.dumps((FILE_OPTIONS, stream_hash))))[0]
# except KeyError:
# rate = None
# c.execute("insert into lbry_file_options values (?, ?, ?)",
# (rate, v, stream_hash))
# new_db.commit()
# new_db.close()
#
#
# def migrate_settings_db(old_db_dir, new_db_dir):
# old_settings_db_path = os.path.join(old_db_dir, "settings.db")
# if not os.path.exists(old_settings_db_path):
# return True
# old_db = leveldb.LevelDB(old_settings_db_path)
# new_db = unqlite.UnQLite(os.path.join(new_db_dir, "settings.db"))
# for k, v in old_db.RangeIter():
# new_db[k] = v
#
#
# def migrate_repeater_db(old_db_dir, new_db_dir):
# old_repeater_db_path = os.path.join(old_db_dir, "valuable_blobs.db")
# if not os.path.exists(old_repeater_db_path):
# return True
# old_db = leveldb.LevelDB(old_repeater_db_path)
# info_db = sqlite3.connect(os.path.join(new_db_dir, "blind_info.db"))
# peer_db = sqlite3.connect(os.path.join(new_db_dir, "blind_peers.db"))
# unql_db = unqlite.UnQLite(os.path.join(new_db_dir, "blind_settings.db"))
# BLOB_INFO_TYPE = 'b'
# SETTING_TYPE = 's'
# PEER_TYPE = 'p'
# info_c = info_db.cursor()
# info_c.execute("create table if not exists valuable_blobs (" +
# " blob_hash text primary key, " +
# " blob_length integer, " +
# " reference text, " +
# " peer_host text, " +
# " peer_port integer, " +
# " peer_score text" +
# ")")
# info_db.commit()
# peer_c = peer_db.cursor()
# peer_c.execute("create table if not exists approved_peers (" +
# " ip_address text, " +
# " port integer" +
# ")")
# peer_db.commit()
# info_c = info_db.cursor()
# peer_c = peer_db.cursor()
# for k, v in old_db.RangeIter():
# key_type, key_rest = json.loads(k)
# if key_type == PEER_TYPE:
# host, port = key_rest
# peer_c.execute("insert into approved_peers values (?, ?)",
# (host, port))
# elif key_type == SETTING_TYPE:
# unql_db[key_rest] = v
# elif key_type == BLOB_INFO_TYPE:
# blob_hash = key_rest
# length, reference, peer_host, peer_port, peer_score = json.loads(v)
# info_c.execute("insert into valuable_blobs values (?, ?, ?, ?, ?, ?)",
# (blob_hash, length, reference, peer_host, peer_port, peer_score))
# info_db.commit()
# peer_db.commit()
# info_db.close()
# peer_db.close()

View file

@ -6,10 +6,16 @@
#
# The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net
import hashlib, random, struct, time, binascii
import argparse
import binascii
import hashlib
import operator
import random
import struct
import time
from twisted.internet import defer, error
import constants
import routingtable
import datastore
@ -34,6 +40,7 @@ def rpcmethod(func):
func.rpcmethod = True
return func
class Node(object):
""" Local node in the Kademlia network
@ -85,9 +92,6 @@ class Node(object):
self.next_refresh_call = None
self.next_change_token_call = None
# Create k-buckets (for storing contacts)
#self._buckets = []
#for i in range(160):
# self._buckets.append(kbucket.KBucket())
if routingTableClass == None:
self._routingTable = routingtable.OptimizedTreeRoutingTable(self.id)
else:
@ -118,7 +122,6 @@ class Node(object):
self.hash_watcher = HashWatcher()
def __del__(self):
#self._persistState()
if self._listeningPort is not None:
self._listeningPort.stopListening()
@ -165,16 +168,6 @@ class Node(object):
# Initiate the Kademlia joining sequence - perform a search for this node's own ID
self._joinDeferred = self._iterativeFind(self.id, bootstrapContacts)
# #TODO: Refresh all k-buckets further away than this node's closest neighbour
# def getBucketAfterNeighbour(*args):
# for i in range(160):
# if len(self._buckets[i]) > 0:
# return i+1
# return 160
# df.addCallback(getBucketAfterNeighbour)
# df.addCallback(self._refreshKBuckets)
#protocol.reactor.callLater(10, self.printContacts)
#self._joinDeferred.addCallback(self._persistState)
#self._joinDeferred.addCallback(self.printContacts)
# Start refreshing k-buckets periodically, if necessary
self.next_refresh_call = twisted.internet.reactor.callLater(
constants.checkRefreshInterval, self._refreshNode) #IGNORE:E1101
@ -187,7 +180,6 @@ class Node(object):
for contact in self._routingTable._buckets[i]._contacts:
print contact
print '=================================='
#twisted.internet.reactor.callLater(10, self.printContacts)
def getApproximateTotalDHTNodes(self):
# get the deepest bucket and the number of contacts in that bucket and multiply it
@ -218,7 +210,6 @@ class Node(object):
if type(result) == dict:
if blob_hash in result:
for peer in result[blob_hash]:
#print peer
if self.lbryid != peer[6:]:
host = ".".join([str(ord(d)) for d in peer[:4]])
if host == "127.0.0.1":
@ -230,8 +221,6 @@ class Node(object):
return expanded_peers
def find_failed(err):
#print "An exception occurred in the DHT"
#print err.getErrorMessage()
return []
d = self.iterativeFindValue(blob_hash)
@ -246,9 +235,14 @@ class Node(object):
known_nodes = {}
def log_error(err, n):
log.debug("error storing blob_hash %s at %s", binascii.hexlify(blob_hash), str(n))
log.debug(err.getErrorMessage())
log.debug(err.getTraceback())
if err.check(protocol.TimeoutError):
log.debug(
"Timeout while storing blob_hash %s at %s",
binascii.hexlify(blob_hash), n)
else:
log.error(
"Unexpected error while storing blob_hash %s at %s: %s",
binascii.hexlify(blob_hash), n, err.getErrorMessage())
def log_success(res):
log.debug("Response to store request: %s", str(res))
@ -268,28 +262,21 @@ class Node(object):
result = responseMsg.response
if 'token' in result:
#print "Printing result...", result
value['token'] = result['token']
d = n.store(blob_hash, value, self.id, 0)
d.addCallback(log_success)
d.addErrback(log_error, n)
else:
d = defer.succeed(False)
#else:
# print "result:", result
# print "No token where it should be"
return d
def requestPeers(contacts):
if self.externalIP is not None and len(contacts) >= constants.k:
is_closer = (
self._routingTable.distance(blob_hash, self.id) <
self._routingTable.distance(blob_hash, contacts[-1].id))
is_closer = Distance(blob_hash).is_closer(self.id, contacts[-1].id)
if is_closer:
contacts.pop()
self.store(blob_hash, value, self_store=True, originalPublisherID=self.id)
elif self.externalIP is not None:
#print "attempting to self-store"
self.store(blob_hash, value, self_store=True, originalPublisherID=self.id)
ds = []
for contact in contacts:
@ -323,7 +310,6 @@ class Node(object):
h = hashlib.new('sha384')
h.update(self.old_token_secret + compact_ip)
if not token == h.digest():
#print 'invalid token found'
return False
return True
@ -368,24 +354,17 @@ class Node(object):
def checkResult(result):
if type(result) == dict:
# We have found the value; now see who was the closest contact without it...
# if 'closestNodeNoValue' in result:
# ...and store the key/value pair
# contact = result['closestNodeNoValue']
# contact.store(key, result[key])
outerDf.callback(result)
else:
# The value wasn't found, but a list of contacts was returned
# Now, see if we have the value (it might seem wasteful to search on the network
# first, but it ensures that all values are properly propagated through the
# network
#if key in self._dataStore:
if self._dataStore.hasPeersForBlob(key):
# Ok, we have the value locally, so use that
peers = self._dataStore.getPeersForBlob(key)
# Send this value to the closest node without it
#if len(result) > 0:
# contact = result[0]
# contact.store(key, value)
outerDf.callback({key: peers, "from_peer": 'self'})
else:
# Ok, value does not exist in DHT at all
@ -484,19 +463,13 @@ class Node(object):
compact_ip = contact.compact_ip()
elif '_rpcNodeContact' in kwargs:
contact = kwargs['_rpcNodeContact']
#print contact.address
compact_ip = contact.compact_ip()
#print compact_ip
else:
return 'Not OK'
#raise TypeError, 'No contact info available'
if ((self_store is False) and
(not 'token' in value or not self.verify_token(value['token'], compact_ip))):
#if not 'token' in value:
# print "Couldn't find token in value"
#elif not self.verify_token(value['token'], contact.compact_ip()):
# print "Token is invalid"
raise ValueError('Invalid or missing token')
if 'port' in value:
@ -518,11 +491,8 @@ class Node(object):
now = int(time.time())
originallyPublished = now# - age
#print compact_address
self._dataStore.addPeerToBlob(
key, compact_address, now, originallyPublished, originalPublisherID)
#if self_store is True:
# print "looks like it was successful maybe"
return 'OK'
@rpcmethod
@ -631,187 +601,17 @@ class Node(object):
# This is used during the bootstrap process; node ID's are most probably fake
shortlist = startupShortlist
# List of active queries; len() indicates number of active probes
#
# n.b: using lists for these variables, because Python doesn't
# allow binding a new value to a name in an enclosing
# (non-global) scope
activeProbes = []
# List of contact IDs that have already been queried
alreadyContacted = []
# Probes that were active during the previous iteration
# A list of found and known-to-be-active remote nodes
activeContacts = []
# This should only contain one entry; the next scheduled iteration call
pendingIterationCalls = []
prevClosestNode = [None]
findValueResult = {}
slowNodeCount = [0]
def extendShortlist(responseTuple):
""" @type responseMsg: kademlia.msgtypes.ResponseMessage """
# The "raw response" tuple contains the response message,
# and the originating address info
responseMsg = responseTuple[0]
originAddress = responseTuple[1] # tuple: (ip adress, udp port)
# Make sure the responding node is valid, and abort the operation if it isn't
if responseMsg.nodeID in activeContacts or responseMsg.nodeID == self.id:
return responseMsg.nodeID
# Mark this node as active
if responseMsg.nodeID in shortlist:
# Get the contact information from the shortlist...
aContact = shortlist[shortlist.index(responseMsg.nodeID)]
else:
# If it's not in the shortlist; we probably used a fake ID to reach it
# - reconstruct the contact, using the real node ID this time
aContact = Contact(
responseMsg.nodeID, originAddress[0], originAddress[1], self._protocol)
activeContacts.append(aContact)
# This makes sure "bootstrap"-nodes with "fake" IDs don't get queried twice
if responseMsg.nodeID not in alreadyContacted:
alreadyContacted.append(responseMsg.nodeID)
# Now grow extend the (unverified) shortlist with the returned contacts
result = responseMsg.response
#TODO: some validation on the result (for guarding against attacks)
# If we are looking for a value, first see if this result is the value
# we are looking for before treating it as a list of contact triples
if findValue is True and key in result and not 'contacts' in result:
# We have found the value
findValueResult[key] = result[key]
findValueResult['from_peer'] = aContact.address
else:
if findValue is True:
# We are looking for a value, and the remote node didn't have it
# - mark it as the closest "empty" node, if it is
if 'closestNodeNoValue' in findValueResult:
is_closer = (
self._routingTable.distance(key, responseMsg.nodeID) <
self._routingTable.distance(key, activeContacts[0].id))
if is_closer:
findValueResult['closestNodeNoValue'] = aContact
else:
findValueResult['closestNodeNoValue'] = aContact
contactTriples = result['contacts']
else:
contactTriples = result
for contactTriple in contactTriples:
if isinstance(contactTriple, (list, tuple)) and len(contactTriple) == 3:
testContact = Contact(
contactTriple[0], contactTriple[1], contactTriple[2], self._protocol)
if testContact not in shortlist:
shortlist.append(testContact)
return responseMsg.nodeID
def removeFromShortlist(failure):
""" @type failure: twisted.python.failure.Failure """
failure.trap(protocol.TimeoutError)
deadContactID = failure.getErrorMessage()
if deadContactID in shortlist:
shortlist.remove(deadContactID)
return deadContactID
def cancelActiveProbe(contactID):
activeProbes.pop()
if len(activeProbes) <= constants.alpha/2 and len(pendingIterationCalls):
# Force the iteration
pendingIterationCalls[0].cancel()
del pendingIterationCalls[0]
#print 'forcing iteration ================='
searchIteration()
def log_error(err):
log.error(err.getErrorMessage())
# Send parallel, asynchronous FIND_NODE RPCs to the shortlist of contacts
def searchIteration():
#print '==> searchiteration'
slowNodeCount[0] = len(activeProbes)
# TODO: move sort_key to be a method on the class
def sort_key(firstContact, secondContact, targetKey=key):
return cmp(
self._routingTable.distance(firstContact.id, targetKey),
self._routingTable.distance(secondContact.id, targetKey)
)
# Sort the discovered active nodes from closest to furthest
activeContacts.sort(sort_key)
# This makes sure a returning probe doesn't force calling this function by mistake
while len(pendingIterationCalls):
del pendingIterationCalls[0]
# See if should continue the search
if key in findValueResult:
outerDf.callback(findValueResult)
return
elif len(activeContacts) and findValue == False:
is_all_done = (
len(activeContacts) >= constants.k or
(
activeContacts[0] == prevClosestNode[0] and
len(activeProbes) == slowNodeCount[0]
)
)
if is_all_done:
# TODO: Re-send the FIND_NODEs to all of the k closest nodes not already queried
#
# Ok, we're done; either we have accumulated k
# active contacts or no improvement in closestNode
# has been noted
outerDf.callback(activeContacts)
return
# The search continues...
if len(activeContacts):
prevClosestNode[0] = activeContacts[0]
contactedNow = 0
shortlist.sort(sort_key)
# Store the current shortList length before contacting other nodes
prevShortlistLength = len(shortlist)
for contact in shortlist:
if contact.id not in alreadyContacted:
activeProbes.append(contact.id)
rpcMethod = getattr(contact, rpc)
df = rpcMethod(key, rawResponse=True)
df.addCallback(extendShortlist)
df.addErrback(removeFromShortlist)
df.addCallback(cancelActiveProbe)
df.addErrback(log_error)
alreadyContacted.append(contact.id)
contactedNow += 1
if contactedNow == constants.alpha:
break
should_lookup_active_calls = (
len(activeProbes) > slowNodeCount[0] or
(
len(shortlist) < constants.k and
len(activeContacts) < len(shortlist) and
len(activeProbes) > 0
)
)
if should_lookup_active_calls:
# Schedule the next iteration if there are any active
# calls (Kademlia uses loose parallelism)
call = twisted.internet.reactor.callLater(
constants.iterativeLookupDelay, searchIteration) #IGNORE:E1101
pendingIterationCalls.append(call)
# Check for a quick contact response that made an update to the shortList
elif prevShortlistLength < len(shortlist):
# Ensure that the closest contacts are taken from the updated shortList
searchIteration()
else:
#print '++++++++++++++ DONE (logically) +++++++++++++\n\n'
# If no probes were sent, there will not be any improvement, so we're done
outerDf.callback(activeContacts)
outerDf = defer.Deferred()
helper = _IterativeFindHelper(self, outerDf, shortlist, key, findValue, rpc)
# Start the iterations
searchIteration()
helper.searchIteration()
return outerDf
def _refreshNode(self):
""" Periodically called to perform k-bucket refreshes and data
replication/republishing as necessary """
#print 'refreshNode called'
df = self._refreshRoutingTable()
#df.addCallback(self._republishData)
df.addCallback(self._removeExpiredPeers)
df.addCallback(self._scheduleNextNodeRefresh)
@ -830,13 +630,8 @@ class Node(object):
searchForNextNodeID()
return outerDf
#def _republishData(self, *args):
# #print '---republishData() called'
# df = twisted.internet.threads.deferToThread(self._threadedRepublishData)
# return df
def _scheduleNextNodeRefresh(self, *args):
#print '==== sheduling next refresh'
self.next_refresh_call = twisted.internet.reactor.callLater(
constants.checkRefreshInterval, self._refreshNode)
@ -846,6 +641,266 @@ class Node(object):
return df
# This was originally a set of nested methods in _iterativeFind
# but they have been moved into this helper class in-order to
# have better scoping and readability
class _IterativeFindHelper(object):
# TODO: use polymorphism to search for a value or node
# instead of using a find_value flag
def __init__(self, node, outer_d, shortlist, key, find_value, rpc):
self.node = node
self.outer_d = outer_d
self.shortlist = shortlist
self.key = key
self.find_value = find_value
self.rpc = rpc
# all distance operations in this class only care about the distance
# to self.key, so this makes it easier to calculate those
self.distance = Distance(key)
# List of active queries; len() indicates number of active probes
#
# n.b: using lists for these variables, because Python doesn't
# allow binding a new value to a name in an enclosing
# (non-global) scope
self.active_probes = []
# List of contact IDs that have already been queried
self.already_contacted = []
# Probes that were active during the previous iteration
# A list of found and known-to-be-active remote nodes
self.active_contacts = []
# This should only contain one entry; the next scheduled iteration call
self.pending_iteration_calls = []
self.prev_closest_node = [None]
self.find_value_result = {}
self.slow_node_count = [0]
def extendShortlist(self, responseTuple):
""" @type responseMsg: kademlia.msgtypes.ResponseMessage """
# The "raw response" tuple contains the response message,
# and the originating address info
responseMsg = responseTuple[0]
originAddress = responseTuple[1] # tuple: (ip adress, udp port)
# Make sure the responding node is valid, and abort the operation if it isn't
if responseMsg.nodeID in self.active_contacts or responseMsg.nodeID == self.node.id:
return responseMsg.nodeID
# Mark this node as active
aContact = self._getActiveContact(responseMsg, originAddress)
self.active_contacts.append(aContact)
# This makes sure "bootstrap"-nodes with "fake" IDs don't get queried twice
if responseMsg.nodeID not in self.already_contacted:
self.already_contacted.append(responseMsg.nodeID)
# Now grow extend the (unverified) shortlist with the returned contacts
result = responseMsg.response
#TODO: some validation on the result (for guarding against attacks)
# If we are looking for a value, first see if this result is the value
# we are looking for before treating it as a list of contact triples
if self.find_value is True and self.key in result and not 'contacts' in result:
# We have found the value
self.find_value_result[self.key] = result[self.key]
self.find_value_result['from_peer'] = aContact.address
else:
if self.find_value is True:
self._setClosestNodeValue(responseMsg, aContact)
self._keepSearching(result)
return responseMsg.nodeID
def _getActiveContact(self, responseMsg, originAddress):
if responseMsg.nodeID in self.shortlist:
# Get the contact information from the shortlist...
return self.shortlist[self.shortlist.index(responseMsg.nodeID)]
else:
# If it's not in the shortlist; we probably used a fake ID to reach it
# - reconstruct the contact, using the real node ID this time
return Contact(
responseMsg.nodeID, originAddress[0], originAddress[1], self.node._protocol)
def _keepSearching(self, result):
contactTriples = self._getContactTriples(result)
for contactTriple in contactTriples:
self._addIfValid(contactTriple)
def _getContactTriples(self, result):
if self.find_value is True:
return result['contacts']
else:
return result
def _setClosestNodeValue(self, responseMsg, aContact):
# We are looking for a value, and the remote node didn't have it
# - mark it as the closest "empty" node, if it is
if 'closestNodeNoValue' in self.find_value_result:
if self._is_closer(responseMsg):
self.find_value_result['closestNodeNoValue'] = aContact
else:
self.find_value_result['closestNodeNoValue'] = aContact
def _is_closer(self, responseMsg):
return self.distance.is_closer(responseMsg.nodeID, self.active_contacts[0].id)
def _addIfValid(self, contactTriple):
if isinstance(contactTriple, (list, tuple)) and len(contactTriple) == 3:
testContact = Contact(
contactTriple[0], contactTriple[1], contactTriple[2], self.node._protocol)
if testContact not in self.shortlist:
self.shortlist.append(testContact)
def removeFromShortlist(self, failure):
""" @type failure: twisted.python.failure.Failure """
failure.trap(protocol.TimeoutError)
deadContactID = failure.getErrorMessage()
if deadContactID in self.shortlist:
self.shortlist.remove(deadContactID)
return deadContactID
def cancelActiveProbe(self, contactID):
self.active_probes.pop()
if len(self.active_probes) <= constants.alpha/2 and len(self.pending_iteration_calls):
# Force the iteration
self.pending_iteration_calls[0].cancel()
del self.pending_iteration_calls[0]
self.searchIteration()
def sortByDistance(self, contact_list):
"""Sort the list of contacts in order by distance from key"""
ExpensiveSort(contact_list, self.distance.to_contact).sort()
# Send parallel, asynchronous FIND_NODE RPCs to the shortlist of contacts
def searchIteration(self):
self.slow_node_count[0] = len(self.active_probes)
# Sort the discovered active nodes from closest to furthest
self.sortByDistance(self.active_contacts)
# This makes sure a returning probe doesn't force calling this function by mistake
while len(self.pending_iteration_calls):
del self.pending_iteration_calls[0]
# See if should continue the search
if self.key in self.find_value_result:
self.outer_d.callback(self.find_value_result)
return
elif len(self.active_contacts) and self.find_value == False:
if self._is_all_done():
# TODO: Re-send the FIND_NODEs to all of the k closest nodes not already queried
#
# Ok, we're done; either we have accumulated k active
# contacts or no improvement in closestNode has been
# noted
self.outer_d.callback(self.active_contacts)
return
# The search continues...
if len(self.active_contacts):
self.prev_closest_node[0] = self.active_contacts[0]
contactedNow = 0
self.sortByDistance(self.shortlist)
# Store the current shortList length before contacting other nodes
prevShortlistLength = len(self.shortlist)
for contact in self.shortlist:
if contact.id not in self.already_contacted:
self._probeContact(contact)
contactedNow += 1
if contactedNow == constants.alpha:
break
if self._should_lookup_active_calls():
# Schedule the next iteration if there are any active
# calls (Kademlia uses loose parallelism)
call = twisted.internet.reactor.callLater(
constants.iterativeLookupDelay, self.searchIteration) #IGNORE:E1101
self.pending_iteration_calls.append(call)
# Check for a quick contact response that made an update to the shortList
elif prevShortlistLength < len(self.shortlist):
# Ensure that the closest contacts are taken from the updated shortList
self.searchIteration()
else:
# If no probes were sent, there will not be any improvement, so we're done
self.outer_d.callback(self.active_contacts)
def _probeContact(self, contact):
self.active_probes.append(contact.id)
rpcMethod = getattr(contact, self.rpc)
df = rpcMethod(self.key, rawResponse=True)
df.addCallback(self.extendShortlist)
df.addErrback(self.removeFromShortlist)
df.addCallback(self.cancelActiveProbe)
df.addErrback(log.fail(), 'Failed to contact %s', contact)
self.already_contacted.append(contact.id)
def _should_lookup_active_calls(self):
return (
len(self.active_probes) > self.slow_node_count[0] or
(
len(self.shortlist) < constants.k and
len(self.active_contacts) < len(self.shortlist) and
len(self.active_probes) > 0
)
)
def _is_all_done(self):
return (
len(self.active_contacts) >= constants.k or
(
self.active_contacts[0] == self.prev_closest_node[0] and
len(self.active_probes) == self.slow_node_count[0]
)
)
class Distance(object):
"""Calculate the XOR result between two string variables.
Frequently we re-use one of the points so as an optimization
we pre-calculate the long value of that point.
"""
def __init__(self, key):
self.key = key
self.val_key_one = long(key.encode('hex'), 16)
def __call__(self, key_two):
val_key_two = long(key_two.encode('hex'), 16)
return self.val_key_one ^ val_key_two
def is_closer(self, a, b):
"""Returns true is `a` is closer to `key` than `b` is"""
return self(a) < self(b)
def to_contact(self, contact):
"""A convenience function for calculating the distance to a contact"""
return self(contact.id)
class ExpensiveSort(object):
"""Sort a list in place.
The result of `key(item)` is cached for each item in the `to_sort`
list as an optimization. This can be useful when `key` is
expensive.
Attributes:
to_sort: a list of items to sort
key: callable, like `key` in normal python sort
attr: the attribute name used to cache the value on each item.
"""
def __init__(self, to_sort, key, attr='__value'):
self.to_sort = to_sort
self.key = key
self.attr = attr
def sort(self):
self._cacheValues()
self._sortByValue()
self._removeValue()
def _cacheValues(self):
for item in self.to_sort:
setattr(item, self.attr, self.key(item))
def _sortByValue(self):
self.to_sort.sort(key=operator.attrgetter(self.attr))
def _removeValue(self):
for item in self.to_sort:
delattr(item, self.attr)
def main():
parser = argparse.ArgumentParser(description="Launch a dht node")
parser.add_argument("udp_port", help="The UDP port on which the node will listen",

View file

@ -208,7 +208,6 @@ class KademliaProtocol(protocol.DatagramProtocol):
seqNumber = 0
startPos = 0
while seqNumber < totalPackets:
#reactor.iterate() #IGNORE:E1101
packetData = data[startPos:startPos+self.msgSizeLimit]
encSeqNumber = chr(seqNumber >> 8) + chr(seqNumber & 0xff)
txData = '\x00%s%s%s\x00%s' % (encTotalPackets, encSeqNumber, rpcID, packetData)
@ -270,13 +269,8 @@ class KademliaProtocol(protocol.DatagramProtocol):
if callable(func) and hasattr(func, 'rpcmethod'):
# Call the exposed Node method and return the result to the deferred callback chain
try:
##try:
## # Try to pass the sender's node id to the function...
kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact}
result = func(*args, **kwargs)
##except TypeError:
## # ...or simply call it if that fails
## result = func(*args)
except Exception, e:
df.errback(failure.Failure(e))
else:
@ -288,34 +282,41 @@ class KademliaProtocol(protocol.DatagramProtocol):
def _msgTimeout(self, messageID):
""" Called when an RPC request message times out """
# Find the message that timed out
if self._sentMessages.has_key(messageID):
if not self._sentMessages.has_key(messageID):
# This should never be reached
log.error("deferred timed out, but is not present in sent messages list!")
return
remoteContactID, df = self._sentMessages[messageID][0:2]
if self._partialMessages.has_key(messageID):
# We are still receiving this message
# See if any progress has been made; if not, kill the message
if self._partialMessagesProgress.has_key(messageID):
same_length = (
len(self._partialMessagesProgress[messageID]) ==
len(self._partialMessages[messageID]))
if same_length:
# No progress has been made
del self._partialMessagesProgress[messageID]
del self._partialMessages[messageID]
df.errback(failure.Failure(TimeoutError(remoteContactID)))
return
# Reset the RPC timeout timer
timeoutCall = reactor.callLater(
constants.rpcTimeout, self._msgTimeout, messageID) #IGNORE:E1101
self._sentMessages[messageID] = (remoteContactID, df, timeoutCall)
self._msgTimeoutInProgress(messageID, remoteContactID, df)
return
del self._sentMessages[messageID]
# The message's destination node is now considered to be dead;
# raise an (asynchronous) TimeoutError exception and update the host node
self._node.removeContact(remoteContactID)
df.errback(failure.Failure(TimeoutError(remoteContactID)))
def _msgTimeoutInProgress(self, messageID, remoteContactID, df):
# See if any progress has been made; if not, kill the message
if self._hasProgressBeenMade(messageID):
# Reset the RPC timeout timer
timeoutCall = reactor.callLater(constants.rpcTimeout, self._msgTimeout, messageID)
self._sentMessages[messageID] = (remoteContactID, df, timeoutCall)
else:
# This should never be reached
log.error("deferred timed out, but is not present in sent messages list!")
# No progress has been made
del self._partialMessagesProgress[messageID]
del self._partialMessages[messageID]
df.errback(failure.Failure(TimeoutError(remoteContactID)))
def _hasProgressBeenMade(self, messageID):
return (
self._partialMessagesProgress.has_key(messageID) and
(
len(self._partialMessagesProgress[messageID]) !=
len(self._partialMessages[messageID])
)
)
def stopProtocol(self):
""" Called when the transport is disconnected.

View file

@ -31,16 +31,6 @@ class RoutingTable(object):
@type contact: kademlia.contact.Contact
"""
def distance(self, keyOne, keyTwo):
""" Calculate the XOR result between two string variables
@return: XOR result of two long variables
@rtype: long
"""
valKeyOne = long(keyOne.encode('hex'), 16)
valKeyTwo = long(keyTwo.encode('hex'), 16)
return valKeyOne ^ valKeyTwo
def findCloseNodes(self, key, count, _rpcNodeID=None):
""" Finds a number of known nodes closest to the node/value with the
specified key.
@ -208,9 +198,6 @@ class TreeRoutingTable(RoutingTable):
node is returning all of the contacts that it knows of.
@rtype: list
"""
#if key == self.id:
# bucketIndex = 0 #TODO: maybe not allow this to continue?
#else:
bucketIndex = self._kbucketIndex(key)
closestNodes = self._buckets[bucketIndex].getContacts(constants.k, _rpcNodeID)
# This method must return k contacts (even if we have the node
@ -290,7 +277,6 @@ class TreeRoutingTable(RoutingTable):
try:
self._buckets[bucketIndex].removeContact(contactID)
except ValueError:
#print 'removeContact(): Contact not in routing table'
return
def touchKBucket(self, key):
@ -427,7 +413,6 @@ class OptimizedTreeRoutingTable(TreeRoutingTable):
try:
contact = self._buckets[bucketIndex].getContact(contactID)
except ValueError:
#print 'removeContact(): Contact not in routing table'
return
contact.failedRPCs += 1
if contact.failedRPCs >= 5:

View file

@ -25,7 +25,6 @@
import sys, hashlib, random
import twisted.internet.reactor
from lbrynet.dht.node import Node
#from entangled.kademlia.datastore import SQLiteDataStore
# The Entangled DHT node; instantiated in the main() method
node = None
@ -77,7 +76,6 @@ def getValue():
binascii.unhexlify("f7d9dc4de674eaa2c5a022eb95bc0d33ec2e75c6"))
deferredResult = node.iterativeFindValue(
binascii.unhexlify("f7d9dc4de674eaa2c5a022eb95bc0d33ec2e75c6"))
#deferredResult = node.iterativeFindValue(KEY)
# Add a callback to this result; this will be called as soon as the operation has completed
deferredResult.addCallback(getValueCallback)
# As before, add the generic error callback
@ -91,19 +89,8 @@ def getValueCallback(result):
# contacts would be returned instead")
print "Got the value"
print result
#if type(result) == dict:
# for v in result[binascii.unhexlify("5292fa9c426621f02419f5050900392bdff5036c")]:
# print "v:", v
# print "v[6:", v[6:]
# print "lbryid:",lbryid
# print "lbryid == v[6:]:", lbryid == v[6:]
# print 'Value successfully retrieved: %s' % result[KEY]
#else:
# print 'Value not found'
# Either way, schedule a "delete" operation for the key
#print 'Scheduling removal in 2.5 seconds...'
#twisted.internet.reactor.callLater(2.5, deleteValue)
print 'Scheduling shutdown in 2.5 seconds...'
twisted.internet.reactor.callLater(2.5, stop)
@ -151,9 +138,6 @@ if __name__ == '__main__':
print 'Run this script without any arguments for info.\n'
# Set up SQLite-based data store (you could use an in-memory store instead, for example)
#if os.path.isfile('/tmp/dbFile%s.db' % sys.argv[1]):
# os.remove('/tmp/dbFile%s.db' % sys.argv[1])
#dataStore = SQLiteDataStore(dbFile = '/tmp/dbFile%s.db' % sys.argv[1])
#
# Create the Entangled node. It extends the functionality of a
# basic Kademlia node (but is fully backwards-compatible with a
@ -162,14 +146,12 @@ if __name__ == '__main__':
# If you wish to have a pure Kademlia network, use the
# entangled.kademlia.node.Node class instead
print 'Creating Node...'
#node = EntangledNode( udpPort=int(sys.argv[1]), dataStore=dataStore )
node = Node(udpPort=int(sys.argv[1]), lbryid=lbryid)
# Schedule the node to join the Kademlia/Entangled DHT
node.joinNetwork(knownNodes)
# Schedule the "storeValue() call to be invoked after 2.5 seconds,
#using KEY and VALUE as arguments
#twisted.internet.reactor.callLater(2.5, storeValue, KEY, VALUE)
twisted.internet.reactor.callLater(2.5, getValue)
# Start the Twisted reactor - this fires up all networking, and
# allows the scheduled join operation to take place

View file

@ -74,10 +74,8 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
d.addCallbacks(_save_claim_id, lambda err: _notify_bad_claim(name, txid, nout))
return d
reflector_server = random.choice(settings.reflector_servers)
d.addCallback(_save_stream_info)
d.addCallback(lambda _: reupload.check_and_restore_availability(self, reflector_server))
d.addCallback(lambda _: self._reupload())
d.addCallback(lambda _: self.lbry_file_manager.get_lbry_file_status(self))
def restore_status(status):
@ -92,6 +90,12 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
d.addCallback(restore_status)
return d
def _reupload(self):
if not settings.reflector_reupload:
return
reflector_server = random.choice(settings.reflector_servers)
return reupload.check_and_restore_availability(self, reflector_server)
def stop(self, err=None, change_status=True):
def set_saving_status_done():

View file

@ -152,7 +152,6 @@ class StdinStreamProducer(object):
self.finished_deferred = defer.Deferred()
self.consumer.registerProducer(self, True)
#self.reader = process.ProcessReader(reactor, self, 'read', 0)
self.resumeProducing()
return self.finished_deferred

View file

@ -46,7 +46,6 @@ class LiveStreamDownloader(_LiveStreamDownloader):
_LiveStreamDownloader.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager,
stream_info_manager, payment_rate_manager, wallet, upload_allowed)
#self.writer = process.ProcessWriter(reactor, self, 'write', 1)
def _get_metadata_handler(self, download_manager):
return LiveStreamMetadataHandler(self.stream_hash, self.stream_info_manager,
@ -61,7 +60,6 @@ class LiveStreamDownloader(_LiveStreamDownloader):
def _get_write_func(self):
def write_func(data):
if self.stopped is False:
#self.writer.write(data)
pass
return write_func

View file

@ -39,7 +39,8 @@ from lbrynet.lbrynet_daemon.Downloader import GetStream
from lbrynet.lbrynet_daemon.Publisher import Publisher
from lbrynet.lbrynet_daemon.ExchangeRateManager import ExchangeRateManager
from lbrynet.lbrynet_daemon.auth.server import AuthJSONRPCServer
from lbrynet.core import log_support, utils, Platform
from lbrynet.core import log_support, utils
from lbrynet.core import system_info
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
from lbrynet.core.Session import Session
@ -354,7 +355,7 @@ class Daemon(AuthJSONRPCServer):
def _get_platform(self):
if self.platform is None:
self.platform = Platform.get_platform()
self.platform = system_info.get_platform()
self.platform["ui_version"] = self.lbry_ui_manager.loaded_git_version
return self.platform
@ -602,14 +603,14 @@ class Daemon(AuthJSONRPCServer):
d = defer.succeed(None)
d.addCallback(lambda _: self._stop_server())
d.addErrback(log_support.failure, log, 'Failure while shutting down: %s')
d.addErrback(log.fail(), 'Failure while shutting down')
d.addCallback(lambda _: self._stop_reflector())
d.addErrback(log_support.failure, log, 'Failure while shutting down: %s')
d.addErrback(log.fail(), 'Failure while shutting down')
d.addCallback(lambda _: self._stop_file_manager())
d.addErrback(log_support.failure, log, 'Failure while shutting down: %s')
d.addErrback(log.fail(), 'Failure while shutting down')
if self.session is not None:
d.addCallback(lambda _: self.session.shut_down())
d.addErrback(log_support.failure, log, 'Failure while shutting down: %s')
d.addErrback(log.fail(), 'Failure while shutting down')
return d
def _update_settings(self, settings):
@ -1353,7 +1354,7 @@ class Daemon(AuthJSONRPCServer):
"""
if not p:
return self._render_response(self.callable_methods.keys(), OK_CODE)
return self._render_response(sorted(self.callable_methods.keys()), OK_CODE)
elif 'callable_during_start' in p.keys():
return self._render_response(self.allowed_during_startup, OK_CODE)
elif 'function' in p.keys():
@ -1468,9 +1469,20 @@ class Daemon(AuthJSONRPCServer):
return self._render_response(None, BAD_REQUEST)
d = self._resolve_name(name, force_refresh=force)
# TODO: this is the rpc call that returns a server.failure.
# what is up with that?
d.addCallbacks(
lambda info: self._render_response(info, OK_CODE),
errback=handle_failure, errbackArgs=('Failed to resolve name: %s',)
# TODO: Is server.failure a module? It looks like it:
#
# In [1]: import twisted.web.server
# In [2]: twisted.web.server.failure
# Out[2]: <module 'twisted.python.failure' from
# '.../site-packages/twisted/python/failure.pyc'>
#
# If so, maybe we should return something else.
errback=log.fail(lambda: server.failure),
errbackArgs=('Failed to resolve name: %s',)
)
return d
@ -1697,8 +1709,6 @@ class Daemon(AuthJSONRPCServer):
'metadata': metadata dictionary
optional 'fee'
Returns:
'success' : True if claim was succesful , False otherwise
'reason' : if not succesful, give reason
'txid' : txid of resulting transaction if succesful
'nout' : nout of the resulting support claim if succesful
'fee' : fee paid for the claim transaction if succesful
@ -1773,8 +1783,6 @@ class Daemon(AuthJSONRPCServer):
'txid': txid of claim, string
'nout': nout of claim, integer
Return:
success : True if succesful , False otherwise
reason : if not succesful, give reason
txid : txid of resulting transaction if succesful
fee : fee paid for the transaction if succesful
"""
@ -1818,8 +1826,6 @@ class Daemon(AuthJSONRPCServer):
'claim_id': claim id of claim to support
'amount': amount to support by
Return:
success : True if succesful , False otherwise
reason : if not succesful, give reason
txid : txid of resulting transaction if succesful
nout : nout of the resulting support claim if succesful
fee : fee paid for the transaction if succesful
@ -2682,18 +2688,6 @@ def get_lbry_file_search_value(p):
raise NoValidSearch()
def handle_failure(err, msg):
log_support.failure(err, log, msg)
# TODO: Is this a module? It looks like it:
#
# In [1]: import twisted.web.server
# In [2]: twisted.web.server.failure
# Out[2]: <module 'twisted.python.failure' from '.../site-packages/twisted/python/failure.pyc'>
#
# If so, maybe we should return something else.
return server.failure
def run_reflector_factory(factory):
reflector_server = random.choice(conf.settings.reflector_servers)
reflector_address, reflector_port = reflector_server

View file

@ -117,9 +117,8 @@ def update_settings_from_args(args):
settings.update(to_pass)
def log_and_kill(failure, analytics_manager):
def kill(failure, analytics_manager):
analytics_manager.send_server_startup_error(failure.getErrorMessage() + " " + str(failure))
log_support.failure(failure, log, 'Failed to startup: %s')
reactor.callFromThread(reactor.stop)
@ -130,14 +129,13 @@ def start_server_and_listen(launchui, use_auth, analytics_manager):
launchui: set to true to open a browser window
use_auth: set to true to enable http authentication
analytics_manager: to send analytics
kwargs: passed along to `DaemonServer().start()`
"""
daemon_server = DaemonServer(analytics_manager)
d = daemon_server.start(use_auth)
if launchui:
d.addCallback(lambda _: webbrowser.open(settings.UI_ADDRESS))
d.addCallback(lambda _: analytics_manager.send_server_startup_success())
d.addErrback(log_and_kill, analytics_manager)
d.addErrback(log.fail(kill, analytics_manager), 'Failed to startup')
if __name__ == "__main__":

View file

@ -76,7 +76,6 @@ class DaemonRequest(server.Request):
try:
self.content.seek(0, 0)
args.update(self.parse_multipart(self.content, pdict))
#args.update(cgi.parse_multipart(self.content, pdict))
except KeyError as e:
if e.args[0] == b'content-disposition':

View file

@ -5,7 +5,6 @@ import random
from twisted.internet import threads, defer, reactor
from lbrynet.core import log_support
from lbrynet.lbryfilemanager.EncryptedFileCreator import create_lbry_file
from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob
from lbrynet.metadata.Metadata import Metadata
@ -68,7 +67,12 @@ class Publisher(object):
d.addCallback(lambda _: self._claim_name())
d.addCallback(lambda _: self.set_status())
d.addCallback(lambda _: self.start_reflector())
d.addCallbacks(lambda _: _show_result(), self._show_publish_error)
d.addCallbacks(
lambda _: _show_result(),
errback=log.fail(self._throw_publish_error),
errbackArgs=(
"An error occurred publishing %s to %s", self.file_name, self.publish_name)
)
return d
def start_reflector(self):
@ -151,11 +155,13 @@ class Publisher(object):
self.metadata['content_type'] = get_content_type(filename)
self.metadata['ver'] = Metadata.current_version
def _show_publish_error(self, err):
log_support.failure(
err, log, "An error occurred publishing %s to %s. Error: %s.",
self.file_name, self.publish_name)
return defer.fail(Exception("Publish failed"))
def _throw_publish_error(self, err):
# TODO: I'm not a fan of the log and re-throw, especially when
# the new exception is more generic. Look over this to
# see if there is a reason not to remove the errback
# handler and allow the original exception to move up
# the stack.
raise Exception("Publish failed")
def get_content_type(filename):

View file

@ -10,7 +10,6 @@ from twisted.python.failure import Failure
from txjsonrpc import jsonrpclib
from lbrynet.core.Error import InvalidAuthenticationToken, InvalidHeaderError, SubhandlerError
from lbrynet.conf import settings
from lbrynet.core import log_support
from lbrynet.lbrynet_daemon.auth.util import APIKey, get_auth_message
from lbrynet.lbrynet_daemon.auth.client import LBRY_SECRET
@ -117,11 +116,6 @@ class AuthJSONRPCServer(AuthorizedBase):
request.write(fault)
request.finish()
def _log_and_render_error(self, failure, request, message=None, **kwargs):
msg = message or "API Failure: %s"
log_support.failure(Failure(failure), log, msg)
self._render_error(failure, request, **kwargs)
def render(self, request):
notify_finish = request.notifyFinish()
assert self._check_headers(request), InvalidHeaderError
@ -192,7 +186,10 @@ class AuthJSONRPCServer(AuthorizedBase):
# cancel the response if the connection is broken
notify_finish.addErrback(self._response_failed, d)
d.addCallback(self._callback_render, request, version, reply_with_next_secret)
d.addErrback(self._log_and_render_error, request, version=version)
d.addErrback(
log.fail(self._render_error, request, version=version),
'Failed to process %s', function_name
)
return server.NOT_DONE_YET
def _register_user_session(self, session_id):
@ -285,7 +282,6 @@ class AuthJSONRPCServer(AuthorizedBase):
assert api_key.compare_hmac(to_auth, token), InvalidAuthenticationToken
def _update_session_secret(self, session_id):
# log.info("Generating new token for next request")
self.sessions.update({session_id: APIKey.new(name=session_id)})
def _get_jsonrpc_version(self, version=None, id=None):

View file

@ -5,7 +5,6 @@ from twisted.protocols.basic import FileSender
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.internet import defer, error
from lbrynet.core import log_support
from lbrynet.reflector.common import IncompleteResponse
@ -153,8 +152,7 @@ class BlobReflectorClient(Protocol):
'blob_size': self.next_blob_to_send.length
}))
def log_fail_and_disconnect(self, err, blob_hash):
log_support.failure(err, log, "Error reflecting blob %s: %s", blob_hash)
def disconnect(self, err):
self.transport.loseConnection()
def send_next_request(self):
@ -172,7 +170,9 @@ class BlobReflectorClient(Protocol):
# send the server the next blob hash + length
d.addCallbacks(
lambda _: self.send_blob_info(),
lambda err: self.log_fail_and_disconnect(err, blob_hash))
errback=log.fail(self.disconnect),
errbackArgs=("Error reflecting blob %s", blob_hash)
)
return d
else:
# close connection

View file

@ -30,7 +30,6 @@ class ReflectorServer(Protocol):
def dataReceived(self, data):
if self.receiving_blob:
# log.debug('Writing data to blob')
self.blob_write(data)
else:
log.debug('Not yet recieving blob, data needs further processing')

View file

@ -165,7 +165,6 @@ class SysTrayIcon(object):
def show_menu(self):
menu = win32gui.CreatePopupMenu()
self.create_menu(menu, self.menu_options)
# win32gui.SetMenuDefaultItem(menu, 1000, 0)
pos = win32gui.GetCursorPos()
# See http://msdn.microsoft.com/library/default.asp?url=/library/en-us/winui/menus_0hdi.asp

View file

@ -8,4 +8,7 @@ pylint -E --disable=inherit-non-class --disable=no-member \
--enable=unused-import \
--enable=bad-whitespace \
--enable=line-too-long \
--enable=trailing-whitespace \
--enable=missing-final-newline \
--enable=mixed-indentation \
lbrynet $@

View file

@ -51,14 +51,6 @@ requires = [
]
console_scripts = [
# 'lbrynet-stdin-uploader = lbrynet.lbrynet_console.LBRYStdinUploader:launch_stdin_uploader',
# 'lbrynet-stdout-downloader = lbrynet.lbrynet_console.LBRYStdoutDownloader:launch_stdout_downloader',
# 'lbrynet-create-network = lbrynet.create_network:main',
# 'lbrynet-launch-node = lbrynet.dht.node:main',
# 'lbrynet-launch-rpc-node = lbrynet.rpc_node:main',
# 'lbrynet-rpc-node-cli = lbrynet.node_rpc_cli:main',
# 'lbrynet-lookup-hosts-for-hash = lbrynet.dht_scripts:get_hosts_for_hash_in_dht',
# 'lbrynet-announce_hash_to_dht = lbrynet.dht_scripts:announce_hash_to_dht',
'lbrynet-daemon = lbrynet.lbrynet_daemon.DaemonControl:start',
'stop-lbrynet-daemon = lbrynet.lbrynet_daemon.DaemonControl:stop',
'lbrynet-cli = lbrynet.lbrynet_daemon.DaemonCLI:main'

View file

@ -0,0 +1,4 @@
# log_support setups the default Logger class
# and so we need to ensure that it is also
# setup for the tests
from lbrynet.core import log_support

View file

@ -54,12 +54,13 @@ class NodeDataTest(unittest.TestCase):
h.update(str(i))
self.cases.append((h.digest(), 5000+2*i))
self.cases.append((h.digest(), 5001+2*i))
<<<<<<< Updated upstream
#(('a', 'hello there\nthis is a test'),
# ('b', unicode('jasdklfjklsdj;f2352352ljklzsdlkjkasf\ndsjklafsd')),
# ('e', 123),
# ('f', [('this', 'is', 1), {'complex': 'data entry'}]),
# ('aMuchLongerKeyThanAnyOfThePreviousOnes', 'some data'))
=======
>>>>>>> Stashed changes
def testStore(self):
def check_val_in_result(r, peer_info):
@ -105,31 +106,17 @@ class NodeContactTest(unittest.TestCase):
self.failIf(contact in closestNodes, 'Node added itself as a contact')
#class NodeLookupTest(unittest.TestCase):
<<<<<<< Updated upstream
# """ Test case for the Node class's iterative node lookup algorithm """
# def setUp(self):
# import entangled.kademlia.contact
# self.node = entangled.kademlia.node.Node()
# self.remoteNodes = []
# for i in range(10):
# remoteNode = entangled.kademlia.node.Node()
# remoteContact = entangled.kademlia.contact.Contact(remoteNode.id, '127.0.0.1', 91827+i, self.node._protocol)
# self.remoteNodes.append(remoteNode)
# self.node.addContact(remoteContact)
# def testIterativeFindNode(self):
# """ Ugly brute-force test to see if the iterative node lookup algorithm runs without failing """
# import entangled.kademlia.protocol
# entangled.kademlia.protocol.reactor.listenUDP(91826, self.node._protocol)
# for i in range(10):
# entangled.kademlia.protocol.reactor.listenUDP(91827+i, self.remoteNodes[i]._protocol)
# df = self.node.iterativeFindNode(self.node.id)
# df.addBoth(lambda _: entangled.kademlia.protocol.reactor.stop())
# entangled.kademlia.protocol.reactor.run()
=======
>>>>>>> Stashed changes
""" Some scaffolding for the NodeLookupTest class. Allows isolated node testing by simulating remote node responses"""
"""Some scaffolding for the NodeLookupTest class. Allows isolated
node testing by simulating remote node responses"""
from twisted.internet import protocol, defer, selectreactor
from lbrynet.dht.msgtypes import ResponseMessage
@ -149,22 +136,17 @@ class FakeRPCProtocol(protocol.DatagramProtocol):
""" Fake RPC protocol; allows entangled.kademlia.contact.Contact objects to "send" RPCs """
def sendRPC(self, contact, method, args, rawResponse=False):
#print method + " " + str(args)
if method == "findNode":
# get the specific contacts closest contacts
closestContacts = []
#print "contact" + contact.id
for contactTuple in self.network:
#print contactTuple[0].id
if contact == contactTuple[0]:
# get the list of closest contacts for this contact
closestContactsList = contactTuple[1]
#print "contact" + contact.id
# Pack the closest contacts into a ResponseMessage
for closeContact in closestContactsList:
#print closeContact.id
closestContacts.append((closeContact.id, closeContact.address, closeContact.port))
message = ResponseMessage("rpcId", contact.id, closestContacts)
@ -221,9 +203,11 @@ class NodeLookupTest(unittest.TestCase):
self.updPort = 81173
<<<<<<< Updated upstream
# create a dummy reactor
#self._protocol.reactor.listenUDP(self.updPort, self._protocol)
=======
>>>>>>> Stashed changes
self.contactsAmount = 80
# set the node ID manually for testing
self.node.id = '12345678901234567800'
@ -233,7 +217,6 @@ class NodeLookupTest(unittest.TestCase):
# create 160 bit node ID's for test purposes
self.testNodeIDs = []
#idNum = long(self.node.id.encode('hex'), 16)
idNum = int(self.node.id)
for i in range(self.contactsAmount):
# create the testNodeIDs in ascending order, away from the actual node ID, with regards to the distance metric
@ -284,7 +267,6 @@ class NodeLookupTest(unittest.TestCase):
for item in self.contacts[0:6]:
expectedResult.append(item.id)
#print item.id
# Get the result from the deferred
activeContacts = df.result
@ -299,150 +281,6 @@ class NodeLookupTest(unittest.TestCase):
self.failUnlessEqual(activeContacts, expectedResult, \
"Active should only contain the closest possible contacts which were used as input for the boostrap")
# def testFindingCloserNodes(self):
# """ Test discovery of closer contacts"""
#
# # Use input contacts that have knowledge of closer contacts,
# df = self.node._iterativeFind(self.node.id, self.contacts[50:53])
# #set the expected result
# expectedResult = []
# #print "############ Expected Active contacts #################"
# for item in self.contacts[0:9]:
# expectedResult.append(item.id)
# #print item.id
# #print "#######################################################"
#
# # Get the result from the deferred
# activeContacts = df.result
#
# #print "!!!!!!!!!!! Receieved Active contacts !!!!!!!!!!!!!!!"
# #for item in activeContacts:
# # print item.id
# #print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
#
# # Check the length of the active contacts
# self.failUnlessEqual(activeContacts.__len__(), expectedResult.__len__(), \
# "Length of received active contacts not as expected, should be %d" %expectedResult.__len__())
#
#
# # Check that the received active contacts are now closer to this node
# self.failUnlessEqual(activeContacts, expectedResult, \
# "Active contacts should now only contain the closest possible contacts")
# def testIterativeStore(self):
# """ test storing values """
#
# # create the network of contacts in format: (contact, closest contacts)
# contactNetwork = ((self.contacts[0], self.contacts[0:8]),
# (self.contacts[1], self.contacts[0:8]),
# (self.contacts[2], self.contacts[0:8]),
# (self.contacts[3], self.contacts[0:8]),
# (self.contacts[4], self.contacts[0:8]),
# (self.contacts[5], self.contacts[0:8]),
# (self.contacts[6], self.contacts[0:8]),
# (self.contacts[7], self.contacts[0:8]),
# (self.contacts[8], self.contacts[0:8]),
# (self.contacts[40], self.contacts[41:48]),
# (self.contacts[41], self.contacts[41:48]),
# (self.contacts[42], self.contacts[41:48]),
# (self.contacts[43], self.contacts[41:48]),
# (self.contacts[44], self.contacts[41:48]),
# (self.contacts[45], self.contacts[41:48]),
# (self.contacts[46], self.contacts[41:48]),
# (self.contacts[47], self.contacts[41:48]),
# (self.contacts[48], self.contacts[41:48]))
# contacts_with_datastores = []
#
# for contact_tuple in contactNetwork:
# contacts_with_datastores.append((contact_tuple[0], contact_tuple[1], lbrynet.dht.datastore.DictDataStore()))
#
# self._protocol.createNetwork(contacts_with_datastores)
#
#
# #self._protocol.createNetwork(contactNetwork)
#
#
# # Test storing a value that has an hash id close to the known contacts
# # The value should only be stored at those nodes
# value = 'value'
# valueID = self.contacts[40].id
#
# # Manually populate the routing table with contacts that have ID's close to the valueID
# for contact in self.contacts[40:48]:
# self.node.addContact(contact)
#
# # Manually populate the routing table with contacts that have ID's far away from the valueID
# for contact in self.contacts[0:8]:
# self.node.addContact(contact)
#
# # Store the value
# df = self.node.announceHaveBlob(valueID, value)
#
# storageNodes = df.result
#
# storageNodeIDs = []
# for item in storageNodes:
# storageNodeIDs.append(item.id)
# storageNodeIDs.sort()
# #print storageNodeIDs
#
# expectedIDs = []
# for item in self.contacts[40:43]:
# expectedIDs.append(item.id)
# #print expectedIDs
#
# #print '#### storage nodes ####'
# #for node in storageNodes:
# # print node.id
#
#
# # check that the value has been stored at nodes with ID's close to the valueID
# self.failUnlessEqual(storageNodeIDs, expectedIDs, \
# "Value not stored at nodes with ID's close to the valueID")
#
# def testFindValue(self):
# # create test values using the contact ID as the key
# testValues = ({self.contacts[0].id: "some test data"},
# {self.contacts[1].id: "some more test data"},
# {self.contacts[8].id: "and more data"}
# )
#
#
# # create the network of contacts in format: (contact, closest contacts, values)
# contactNetwork = ((self.contacts[0], self.contacts[0:6], testValues[0]),
# (self.contacts[1], self.contacts[0:6], testValues[1]),
# (self.contacts[2], self.contacts[0:6], {'2':'2'}),
# (self.contacts[3], self.contacts[0:6], {'4':'5'}),
# (self.contacts[4], self.contacts[0:6], testValues[2]),
# (self.contacts[5], self.contacts[0:6], {'2':'2'}),
# (self.contacts[6], self.contacts[0:6], {'2':'2'}))
#
# self._protocol.createNetwork(contactNetwork)
#
# # Initialise the routing table with some contacts
# for contact in self.contacts[0:4]:
# self.node.addContact(contact)
#
# # Initialise the node with some known contacts
# #self.node._iterativeFind(self.node.id, self.contacts[0:3])
#
# df = self.node.iterativeFindValue(testValues[1].keys()[0])
#
# resultDict = df.result
# keys = resultDict.keys()
#
# for key in keys:
# if key == 'closestNodeNoValue':
# print "closest contact without data " + " " + resultDict.get(key).id
# else:
# print "data key :" + key + "; " + "data: " + resultDict.get(key)
def suite():
suite = unittest.TestSuite()
@ -452,6 +290,7 @@ def suite():
suite.addTest(unittest.makeSuite(NodeLookupTest))
return suite
if __name__ == '__main__':
# If this module is executed from the commandline, run all its tests
unittest.TextTestRunner().run(suite())

View file

@ -68,16 +68,12 @@ class ClientDatagramProtocol(lbrynet.dht.protocol.KademliaProtocol):
lbrynet.dht.protocol.KademliaProtocol.__init__(self, None)
def startProtocol(self):
#self.transport.connect(self.destination[0], self.destination[1])
self.sendDatagram()
def sendDatagram(self):
if len(self.data):
self._send(self.data, self.msgID, self.destination)
# def datagramReceived(self, datagram, host):
# print 'Datagram received: ', repr(datagram)
# self.sendDatagram()
@ -193,44 +189,6 @@ class KademliaProtocolTest(unittest.TestCase):
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.protocol._sentMessages), 0, 'The protocol is still waiting for a RPC result, but the transaction is already done!')
# def testDatagramLargeMessageReconstruction(self):
# """ Tests if a large amount of data can be successfully re-constructed from multiple UDP datagrams """
# remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol)
# self.node.addContact(remoteContact)
# self.error = None
# #responseData = 8143 * '0' # Threshold for a single packet transmission
# responseData = 300000 * '0'
# def handleError(f):
# if f.check((lbrynet.dht.protocol.TimeoutError)):
# self.error = 'RPC from the following contact timed out: %s' % f.getErrorMessage()
# else:
# self.error = 'An RPC error occurred: %s' % f.getErrorMessage()
# def handleResult(result):
# if result != responseData:
# self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % (responseData, result)
# # Publish the "local" node on the network
# lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol)
# # ...and make it think it is waiting for a result from an RPC
# msgID = 'abcdefghij1234567890'
# df = defer.Deferred()
# timeoutCall = lbrynet.dht.protocol.reactor.callLater(lbrynet.dht.constants.rpcTimeout, self.protocol._msgTimeout, msgID)
# self.protocol._sentMessages[msgID] = (remoteContact.id, df, timeoutCall)
# # Simulate the "reply" transmission
# msg = lbrynet.dht.msgtypes.ResponseMessage(msgID, 'node2', responseData)
# msgPrimitive = self.protocol._translator.toPrimitive(msg)
# encodedMsg = self.protocol._encoder.encode(msgPrimitive)
# udpClient = ClientDatagramProtocol()
# udpClient.data = encodedMsg
# udpClient.msgID = msgID
# lbrynet.dht.protocol.reactor.listenUDP(0, udpClient)
# df.addCallback(handleResult)
# df.addErrback(handleError)
# df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop())
# lbrynet.dht.protocol.reactor.run()
# self.failIf(self.error, self.error)
# # The list of sent RPC messages should be empty at this stage
# #self.failUnlessEqual(len(self.protocol._sentMessages), 0, 'The protocol is still waiting for a RPC result, but the transaction is already done!')
def suite():
suite = unittest.TestSuite()

View file

@ -167,7 +167,7 @@ class BlobAvailabilityTracker(BlobAvailability.BlobAvailabilityTracker):
self._dht_node = None
self._check_popular = None
self._check_mine = None
self._get_mean_peers()
self._set_mean_peers()
def start(self):
pass

View file

@ -0,0 +1,130 @@
from twisted.trial import unittest
from twisted.internet import threads, defer
from lbrynet.core.Wallet import Wallet
test_metadata = {
'license': 'NASA',
'fee': {'USD': {'amount': 0.01, 'address': 'baBYSK7CqGSn5KrEmNmmQwAhBSFgo6v47z'}},
'ver': '0.0.3',
'description': 'test',
'language': 'en',
'author': 'test',
'title': 'test',
'sources': {
'lbry_sd_hash': '8655f713819344980a9a0d67b198344e2c462c90f813e86f0c63789ab0868031f25c54d0bb31af6658e997e2041806eb'},
'nsfw': False,
'content_type': 'video/mp4',
'thumbnail': 'test'
}
class MocLbryumWallet(Wallet):
def __init__(self):
pass
def get_name_claims(self):
return threads.deferToThread(lambda: [])
def _save_name_metadata(self, name, claim_outpoint, sd_hash):
return defer.succeed(True)
class WalletTest(unittest.TestCase):
def test_failed_send_name_claim(self):
def not_enough_funds_send_name_claim(self, name, val, amount):
claim_out = {'success':False, 'reason':'Not enough funds'}
return claim_out
MocLbryumWallet._send_name_claim = not_enough_funds_send_name_claim
wallet = MocLbryumWallet()
d = wallet.claim_name('test', 1, test_metadata)
self.assertFailure(d,Exception)
return d
def test_successful_send_name_claim(self):
expected_claim_out = {
"claimid": "f43dc06256a69988bdbea09a58c80493ba15dcfa",
"fee": "0.00012",
"nout": 0,
"success": True,
"txid": "6f8180002ef4d21f5b09ca7d9648a54d213c666daf8639dc283e2fd47450269e"
}
def check_out(claim_out):
self.assertTrue('success' not in claim_out)
self.assertEqual(expected_claim_out['claimid'], claim_out['claimid'])
self.assertEqual(expected_claim_out['fee'], claim_out['fee'])
self.assertEqual(expected_claim_out['nout'], claim_out['nout'])
self.assertEqual(expected_claim_out['txid'], claim_out['txid'])
def success_send_name_claim(self, name, val, amount):
return expected_claim_out
MocLbryumWallet._send_name_claim = success_send_name_claim
wallet = MocLbryumWallet()
d = wallet.claim_name('test', 1, test_metadata)
d.addCallback(lambda claim_out: check_out(claim_out))
return d
def test_failed_support(self):
def failed_support_claim(self, name, claim_id, amount):
claim_out = {'success':False, 'reason':'Not enough funds'}
return threads.deferToThread(lambda: claim_out)
MocLbryumWallet._support_claim = failed_support_claim
wallet = MocLbryumWallet()
d = wallet.support_claim('test', "f43dc06256a69988bdbea09a58c80493ba15dcfa", 1)
self.assertFailure(d,Exception)
return d
def test_succesful_support(self):
expected_support_out = {
"fee": "0.000129",
"nout": 0,
"success": True,
"txid": "11030a76521e5f552ca87ad70765d0cc52e6ea4c0dc0063335e6cf2a9a85085f"
}
def check_out(claim_out):
self.assertTrue('success' not in claim_out)
self.assertEqual(expected_support_out['fee'], claim_out['fee'])
self.assertEqual(expected_support_out['nout'], claim_out['nout'])
self.assertEqual(expected_support_out['txid'], claim_out['txid'])
def success_support_claim(self, name, val, amount):
return threads.deferToThread(lambda: expected_support_out)
MocLbryumWallet._support_claim = success_support_claim
wallet = MocLbryumWallet()
d = wallet.support_claim('test', "f43dc06256a69988bdbea09a58c80493ba15dcfa", 1)
d.addCallback(lambda claim_out: check_out(claim_out))
return d
def test_failed_abandon(self):
def failed_abandon_claim(self, claim_outpoint):
claim_out = {'success':False, 'reason':'Not enough funds'}
return threads.deferToThread(lambda: claim_out)
MocLbryumWallet._abandon_claim = failed_abandon_claim
wallet = MocLbryumWallet()
d = wallet.abandon_claim("11030a76521e5f552ca87ad70765d0cc52e6ea4c0dc0063335e6cf2a9a85085f", 1)
self.assertFailure(d,Exception)
return d
def test_successful_abandon(self):
expected_abandon_out = {
"fee": "0.000096",
"success": True,
"txid": "0578c161ad8d36a7580c557d7444f967ea7f988e194c20d0e3c42c3cabf110dd"
}
def check_out(claim_out):
self.assertTrue('success' not in claim_out)
self.assertEqual(expected_abandon_out['fee'], claim_out['fee'])
self.assertEqual(expected_abandon_out['txid'], claim_out['txid'])
def success_abandon_claim(self, claim_outpoint):
return threads.deferToThread(lambda: expected_abandon_out)
MocLbryumWallet._abandon_claim = success_abandon_claim
wallet = MocLbryumWallet()
d = wallet.abandon_claim("0578c161ad8d36a7580c557d7444f967ea7f988e194c20d0e3c42c3cabf110dd", 1)
d.addCallback(lambda claim_out: check_out(claim_out))
return d

View file

@ -0,0 +1,48 @@
import StringIO
import logging
import mock
from twisted.internet import defer
from twisted.trial import unittest
from lbrynet.core import log_support
class TestLogger(unittest.TestCase):
def raiseError(self):
raise Exception('terrible things happened')
def triggerErrback(self, callback=None):
d = defer.Deferred()
d.addCallback(lambda _: self.raiseError())
d.addErrback(self.log.fail(callback), 'My message')
d.callback(None)
return d
def setUp(self):
self.log = log_support.Logger('test')
self.stream = StringIO.StringIO()
handler = logging.StreamHandler(self.stream)
handler.setFormatter(logging.Formatter("%(filename)s:%(lineno)d - %(message)s"))
self.log.addHandler(handler)
def test_can_log_failure(self):
def output_lines():
return self.stream.getvalue().split('\n')
# the line number could change if this file gets refactored
expected_first_line = 'test_log_support.py:18 - My message: terrible things happened'
# testing the entirety of the message is futile as the
# traceback will depend on the system the test is being run on
# but hopefully these two tests are good enough
d = self.triggerErrback()
d.addCallback(lambda _: self.assertEquals(expected_first_line, output_lines()[0]))
d.addCallback(lambda _: self.assertEqual(10, len(output_lines())))
return d
def test_can_log_failure_with_callback(self):
callback = mock.Mock()
d = self.triggerErrback(callback)
d.addCallback(lambda _: callback.assert_called_once_with(mock.ANY))
return d

View file

@ -17,7 +17,6 @@ import hashlib
class DictDataStoreTest(unittest.TestCase):
""" Basic tests case for the reference DataStore API and implementation """
def setUp(self):
#if not hasattr(self, 'ds'):
self.ds = lbrynet.dht.datastore.DictDataStore()
h = hashlib.sha1()
h.update('g')
@ -29,12 +28,6 @@ class DictDataStoreTest(unittest.TestCase):
h3.update('Boozoo Bajou - 09 - S.I.P.mp3')
hashKey3 = h3.digest()
#self.cases = (('a', 'hello there\nthis is a test'),
# ('b', unicode('jasdklfjklsdj;f2352352ljklzsdlkjkasf\ndsjklafsd')),
# ('e', 123),
# ('f', [('this', 'is', 1), {'complex': 'data entry'}]),
# ('aMuchLongerKeyThanAnyOfThePreviousOnes', 'some data'),
# (hashKey, 'some data'),
# (hashKey2, 'abcdefghijklmnopqrstuvwxz'),
# (hashKey3, '1 2 3 4 5 6 7 8 9 0'))
self.cases = ((hashKey, 'test1test1test1test1test1t'),
(hashKey, 'test2'),
@ -90,88 +83,37 @@ class DictDataStoreTest(unittest.TestCase):
self.failIf('val3' in self.ds.getPeersForBlob(h2), 'DataStore failed to delete an expired value! Value %s, publish time %s, current time %s' % ('val3', str(now - td2), str(now)))
self.failUnless('val4' in self.ds.getPeersForBlob(h2), 'DataStore deleted an unexpired value! Value %s, publish time %s, current time %s' % ('val4', str(now), str(now)))
# def testReplace(self):
# # First write with fake values
# now = int(time.time())
# for key, value in self.cases:
# try:
# self.ds.setItem(key, 'abc', now, now, 'node1')
# except Exception:
# import traceback
# self.fail('Failed writing the following data: key: "%s", data: "%s"\n The error was: %s:' % (key, value, traceback.format_exc(5)))
#
# # write this stuff a second time, with the real values
# for key, value in self.cases:
# try:
# self.ds.setItem(key, value, now, now, 'node1')
# except Exception:
# import traceback
# self.fail('Failed writing the following data: key: "%s", data: "%s"\n The error was: %s:' % (key, value, traceback.format_exc(5)))
#
# self.failUnlessEqual(len(self.ds.keys()), len(self.cases), 'Values did not get overwritten properly; expected %d keys, got %d' % (len(self.cases), len(self.ds.keys())))
# # Read back the data
# for key, value in self.cases:
# self.failUnlessEqual(self.ds[key], value, 'DataStore returned invalid data! Expected "%s", got "%s"' % (value, self.ds[key]))
# def testDelete(self):
# # First some values
# now = int(time.time())
# for key, value in self.cases:
# try:
# self.ds.setItem(key, 'abc', now, now, 'node1')
# except Exception:
# import traceback
# self.fail('Failed writing the following data: key: "%s", data: "%s"\n The error was: %s:' % (key, value, traceback.format_exc(5)))
#
# self.failUnlessEqual(len(self.ds.keys()), len(self.cases), 'Values did not get stored properly; expected %d keys, got %d' % (len(self.cases), len(self.ds.keys())))
#
# # Delete an item from the data
# key, value == self.cases[0]
# del self.ds[key]
# self.failUnlessEqual(len(self.ds.keys()), len(self.cases)-1, 'Value was not deleted; expected %d keys, got %d' % (len(self.cases)-1, len(self.ds.keys())))
# self.failIf(key in self.ds.keys(), 'Key was not deleted: %s' % key)
# def testMetaData(self):
# now = int(time.time())
# age = random.randint(10,3600)
# originallyPublished = []
# for i in range(len(self.cases)):
# originallyPublished.append(now - age)
# # First some values with metadata
# i = 0
# for key, value in self.cases:
# try:
# self.ds.setItem(key, 'abc', now, originallyPublished[i], 'node%d' % i)
# i += 1
# except Exception:
# import traceback
# self.fail('Failed writing the following data: key: "%s", data: "%s"\n The error was: %s:' % (key, value, traceback.format_exc(5)))
#
# # Read back the meta-data
# i = 0
# for key, value in self.cases:
# dsLastPublished = self.ds.lastPublished(key)
# dsOriginallyPublished = self.ds.originalPublishTime(key)
# dsOriginalPublisherID = self.ds.originalPublisherID(key)
# self.failUnless(type(dsLastPublished) == int, 'DataStore returned invalid type for "last published" time! Expected "int", got %s' % type(dsLastPublished))
# self.failUnless(type(dsOriginallyPublished) == int, 'DataStore returned invalid type for "originally published" time! Expected "int", got %s' % type(dsOriginallyPublished))
# self.failUnless(type(dsOriginalPublisherID) == str, 'DataStore returned invalid type for "original publisher ID"; Expected "str", got %s' % type(dsOriginalPublisherID))
# self.failUnlessEqual(dsLastPublished, now, 'DataStore returned invalid "last published" time! Expected "%d", got "%d"' % (now, dsLastPublished))
# self.failUnlessEqual(dsOriginallyPublished, originallyPublished[i], 'DataStore returned invalid "originally published" time! Expected "%d", got "%d"' % (originallyPublished[i], dsOriginallyPublished))
# self.failUnlessEqual(dsOriginalPublisherID, 'node%d' % i, 'DataStore returned invalid "original publisher ID"; Expected "%s", got "%s"' % ('node%d' % i, dsOriginalPublisherID))
# i += 1
#class SQLiteDataStoreTest(DictDataStoreTest):
# def setUp(self):
# self.ds = entangled.kademlia.datastore.SQLiteDataStore()
# DictDataStoreTest.setUp(self)
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(DictDataStoreTest))
#suite.addTest(unittest.makeSuite(SQLiteDataStoreTest))
return suite