lbry-sdk/lbrynet/dht/encoding.py

139 lines
4.9 KiB
Python

from __future__ import print_function
from .error import DecodeError
import sys
if sys.version_info > (3,):
long = int
raw = ord
else:
raw = lambda x: x
class Encoding:
""" Interface for RPC message encoders/decoders
All encoding implementations used with this library should inherit and
implement this.
"""
def encode(self, data):
""" Encode the specified data
@param data: The data to encode
This method has to support encoding of the following
types: C{str}, C{int} and C{long}
Any additional data types may be supported as long as the
implementing class's C{decode()} method can successfully
decode them.
@return: The encoded data
@rtype: str
"""
def decode(self, data):
""" Decode the specified data string
@param data: The data (byte string) to decode.
@type data: str
@return: The decoded data (in its correct type)
"""
class Bencode(Encoding):
""" Implementation of a Bencode-based algorithm (Bencode is the encoding
algorithm used by Bittorrent).
@note: This algorithm differs from the "official" Bencode algorithm in
that it can encode/decode floating point values in addition to
integers.
"""
def encode(self, data):
""" Encoder implementation of the Bencode algorithm
@param data: The data to encode
@type data: int, long, tuple, list, dict or str
@return: The encoded data
@rtype: str
"""
if isinstance(data, (int, long)):
return b'i%de' % data
elif isinstance(data, bytes):
return b'%d:%s' % (len(data), data)
elif isinstance(data, (list, tuple)):
encodedListItems = b''
for item in data:
encodedListItems += self.encode(item)
return b'l%se' % encodedListItems
elif isinstance(data, dict):
encodedDictItems = b''
keys = data.keys()
for key in sorted(keys):
encodedDictItems += self.encode(key) # TODO: keys should always be bytestrings
encodedDictItems += self.encode(data[key])
return b'd%se' % encodedDictItems
else:
raise TypeError("Cannot bencode '%s' object" % type(data))
def decode(self, data):
""" Decoder implementation of the Bencode algorithm
@param data: The encoded data
@type data: str
@note: This is a convenience wrapper for the recursive decoding
algorithm, C{_decodeRecursive}
@return: The decoded data, as a native Python type
@rtype: int, list, dict or str
"""
assert type(data) == bytes # fixme: _maybe_ remove this after porting
if len(data) == 0:
raise DecodeError('Cannot decode empty string')
try:
return self._decodeRecursive(data)[0]
except ValueError as e:
raise DecodeError(e.message)
@staticmethod
def _decodeRecursive(data, startIndex=0):
""" Actual implementation of the recursive Bencode algorithm
Do not call this; use C{decode()} instead
"""
if data[startIndex] == raw('i'):
endPos = data[startIndex:].find(b'e') + startIndex
return int(data[startIndex + 1:endPos]), endPos + 1
elif data[startIndex] == raw('l'):
startIndex += 1
decodedList = []
while data[startIndex] != raw('e'):
listData, startIndex = Bencode._decodeRecursive(data, startIndex)
decodedList.append(listData)
return decodedList, startIndex + 1
elif data[startIndex] == raw('d'):
startIndex += 1
decodedDict = {}
while data[startIndex] != raw('e'):
key, startIndex = Bencode._decodeRecursive(data, startIndex)
value, startIndex = Bencode._decodeRecursive(data, startIndex)
decodedDict[key] = value
return decodedDict, startIndex
elif data[startIndex] == raw('f'):
# This (float data type) is a non-standard extension to the original Bencode algorithm
endPos = data[startIndex:].find(b'e') + startIndex
return float(data[startIndex + 1:endPos]), endPos + 1
elif data[startIndex] == raw('n'):
# This (None/NULL data type) is a non-standard extension
# to the original Bencode algorithm
return None, startIndex + 1
else:
splitPos = data[startIndex:].find(b':') + startIndex
try:
length = int(data[startIndex:splitPos])
except ValueError:
raise DecodeError()
startIndex = splitPos + 1
endPos = startIndex + length
bytes = data[startIndex:endPos]
return bytes, endPos