forked from LBRYCommunity/lbry-sdk
145 lines
5.4 KiB
Python
145 lines
5.4 KiB
Python
|
#!/usr/bin/env python
|
||
|
#
|
||
|
# This library is free software, distributed under the terms of
|
||
|
# the GNU Lesser General Public License Version 3, or any later version.
|
||
|
# See the COPYING file included in this archive
|
||
|
#
|
||
|
# The docstrings in this module contain epytext markup; API documentation
|
||
|
# may be created by processing this file with epydoc: http://epydoc.sf.net
|
||
|
|
||
|
class DecodeError(Exception):
    """ Error signalling that encoded data could not be decoded

    Implementations of the C{Encoding} interface are expected to raise this
    exception whenever their C{decode()} operation fails.
    """
|
||
|
|
||
|
class Encoding(object):
    """ Common interface for RPC message encoders/decoders

    Every encoding implementation used with this library is expected to
    subclass this interface and override both C{encode()} and C{decode()}.
    The methods defined here have no body besides their documentation, so
    calling them on this base class simply returns C{None}.
    """

    def encode(self, data):
        """ Turn the given data into its encoded representation

        @param data: The data to encode.
                     Implementations must at least be able to encode the
                     types C{str}, C{int} and C{long}. Further data types
                     may be supported, provided that the C{decode()} method
                     of the same implementing class is able to decode them
                     again.

        @return: The encoded data
        @rtype: str
        """

    def decode(self, data):
        """ Turn the given encoded data string back into native data

        @param data: The data (byte string) to decode.
        @type data: str

        @return: The decoded data (in its correct type)
        """
|
||
|
|
||
|
class Bencode(Encoding):
    """ Implementation of a Bencode-based algorithm (Bencode is the encoding
    algorithm used by Bittorrent).

    @note: This algorithm differs from the "official" Bencode algorithm in
           that it can encode/decode floating point values (prefix C{f})
           and C{None} (the single character C{n}) in addition to integers,
           strings, lists and dictionaries.
    """

    # Integer types accepted by the encoder. Python 2 has a separate ``long``
    # type; on Python 3 that name does not exist and ``int`` is unbounded.
    try:
        _INTEGER_TYPES = (int, long)  # noqa: F821 (Python 2 only name)
    except NameError:
        _INTEGER_TYPES = (int,)

    def encode(self, data):
        """ Encoder implementation of the Bencode algorithm

        @param data: The data to encode
        @type data: int, long, float, tuple, list, dict, str or None

        @note: Floats are written with C{%f} formatting, i.e. with six
               decimal places; any further precision is not preserved.

        @return: The encoded data
        @rtype: str

        @raise TypeError: If C{data} (or any item nested inside it) is of a
                          type that cannot be bencoded
        """
        # bool is a subclass of int, but booleans have never been encodable
        # here; keep rejecting them instead of silently writing 0/1.
        if (isinstance(data, self._INTEGER_TYPES)
                and not isinstance(data, bool)):
            return 'i%de' % data
        if isinstance(data, str):
            return '%d:%s' % (len(data), data)
        if isinstance(data, (list, tuple)):
            return 'l%se' % ''.join([self.encode(item) for item in data])
        if isinstance(data, dict):
            encodedItems = []
            # Keys are emitted in sorted order so that equal dictionaries
            # always produce the same encoded string.
            for key in sorted(data):
                encodedItems.append(self.encode(key))
                encodedItems.append(self.encode(data[key]))
            return 'd%se' % ''.join(encodedItems)
        if isinstance(data, float):
            # This (float data type) is a non-standard extension to the
            # original Bencode algorithm
            return 'f%fe' % data
        if data is None:
            # This (None/NULL data type) is a non-standard extension to the
            # original Bencode algorithm
            return 'n'
        raise TypeError("Cannot bencode '%s' object" % type(data))

    def decode(self, data):
        """ Decoder implementation of the Bencode algorithm

        @param data: The encoded data
        @type data: str

        @note: This is a convenience wrapper for the recursive decoding
               algorithm, C{_decodeRecursive}. Only the first encoded value
               in C{data} is decoded; anything following it is ignored.

        @return: The decoded data, as a native Python type
        @rtype: int, float, list, dict, str or None

        @raise DecodeError: If C{data} is empty or malformed
        """
        if len(data) == 0:
            raise DecodeError('Cannot decode empty string')
        try:
            return self._decodeRecursive(data)[0]
        except (ValueError, IndexError) as e:
            # Safety net: any low-level parsing failure is reported through
            # the exception type this interface documents.
            raise DecodeError(e)

    @staticmethod
    def _decodeRecursive(data, startIndex=0):
        """ Actual implementation of the recursive Bencode algorithm

        Do not call this; use C{decode()} instead

        @param data: The complete encoded string
        @type data: str
        @param startIndex: Index in C{data} at which the value to decode
                           starts
        @type startIndex: int

        @return: A tuple C{(value, nextIndex)}, where C{nextIndex} is the
                 index of the first character after the decoded value
        @rtype: tuple

        @raise DecodeError: If the data is truncated or malformed
        """
        if startIndex >= len(data):
            raise DecodeError(
                'Unexpected end of data at index %d' % startIndex)
        marker = data[startIndex]

        if marker == 'i' or marker == 'f':
            # Integers look like "i<digits>e"; floats ("f<number>e") are a
            # non-standard extension to the original Bencode algorithm.
            endPos = data.find('e', startIndex)
            if endPos == -1:
                raise DecodeError(
                    "Missing 'e' terminator for number starting at index %d"
                    % startIndex)
            convert = int if marker == 'i' else float
            try:
                return (convert(data[startIndex + 1:endPos]), endPos + 1)
            except ValueError as e:
                raise DecodeError(e)

        if marker == 'l':
            startIndex += 1
            decodedList = []
            # Slicing (rather than indexing) yields '' at the end of the
            # data, so a missing terminator ends up in the recursive call,
            # which reports it as a DecodeError.
            while data[startIndex:startIndex + 1] != 'e':
                item, startIndex = Bencode._decodeRecursive(data, startIndex)
                decodedList.append(item)
            return (decodedList, startIndex + 1)

        if marker == 'd':
            startIndex += 1
            decodedDict = {}
            while data[startIndex:startIndex + 1] != 'e':
                key, startIndex = Bencode._decodeRecursive(data, startIndex)
                value, startIndex = Bencode._decodeRecursive(data, startIndex)
                try:
                    decodedDict[key] = value
                except TypeError as e:
                    # e.g. a list or dict was used as a dictionary key
                    raise DecodeError(e)
            # Step over the dictionary's own 'e' terminator; otherwise an
            # enclosing list/dict would mistake it for its own terminator
            # and drop every element that follows this dictionary.
            return (decodedDict, startIndex + 1)

        if marker == 'n':
            # This (None/NULL data type) is a non-standard extension to the
            # original Bencode algorithm
            return (None, startIndex + 1)

        # Anything else must be a string: "<length>:<content>"
        splitPos = data.find(':', startIndex)
        if splitPos == -1:
            raise DecodeError(
                "Missing ':' separator for string starting at index %d"
                % startIndex)
        try:
            length = int(data[startIndex:splitPos])
        except ValueError as e:
            raise DecodeError(e)
        if length < 0:
            raise DecodeError('Negative string length at index %d'
                              % startIndex)
        startIndex = splitPos + 1
        endPos = startIndex + length
        if endPos > len(data):
            # Do not silently hand back a shorter string than announced
            raise DecodeError(
                'String at index %d is truncated (expected %d characters)'
                % (startIndex, length))
        return (data[startIndex:endPos], endPos)
|