forked from LBRYCommunity/lbry-sdk
rewrote URL parser
This commit is contained in:
parent
e5edaed677
commit
03455310ae
4 changed files with 192 additions and 298 deletions
|
@ -1,185 +0,0 @@
|
|||
import re
|
||||
|
||||
# Scheme prefix; the URI pattern treats it as optional.
PROTOCOL = 'lbry://'

# Single-character markers recognised inside a URI.
CHANNEL_CHAR = '@'
CLAIM_ID_CHAR = '#'
CLAIM_SEQUENCE_CHAR = ':'
BID_POSITION_CHAR = '$'
PATH_CHAR = '/'
QUERY_CHAR = '?'

# Length limits used when the URI pattern is assembled.
CLAIM_ID_MAX_LENGTH = 40
CHANNEL_NAME_MIN_LENGTH = 1
|
||||
|
||||
|
||||
class URIParseError(Exception):
    """Raised when a string cannot be parsed as an LBRY URI."""
|
||||
|
||||
|
||||
class URI(object):
    """Components of a parsed LBRY URI.

    ``name`` holds either a content name or, when it starts with the channel
    marker, a channel name.  At most one of ``claim_sequence``,
    ``bid_position`` and ``claim_id`` may be set, and ``path`` is only
    accepted together with a channel name.
    """

    __slots__ = ['name', 'claim_sequence', 'bid_position', 'claim_id', 'path']

    def __init__(self, name, claim_sequence=None, bid_position=None, claim_id=None, path=None):
        """Store the components after validating them.

        :raises ValueError: if more than one modifier is given, or a path is
            given for a name that is not a channel
        """
        modifiers = (claim_sequence, bid_position, claim_id)
        if sum(value is not None for value in modifiers) > 1:
            raise ValueError(
                "Only one of these may be present at a time: claim_sequence, bid_position, claim_id"
            )

        self.name = name
        self.claim_sequence = claim_sequence
        self.bid_position = bid_position
        self.claim_id = claim_id
        self.path = path

        # The channel check is skipped entirely when there is no path.
        if not (self.path is None or self.contains_channel):
            raise ValueError("Content claims cannot have paths")

    def __str__(self):
        return self.to_uri_string()

    def __eq__(self, other):
        """Equal when every slot matches and both objects share a class."""
        mismatch = any(
            not hasattr(other, field) or getattr(self, field) != getattr(other, field)
            for field in self.__slots__
        )
        if mismatch:
            return False
        return self.__class__ == other.__class__

    @property
    def channel_name(self):
        """The name when it denotes a channel, otherwise ``None``."""
        if self.contains_channel:
            return self.name
        return None

    @property
    def claim_name(self):
        """The path for channel URIs, otherwise the name itself."""
        if self.contains_channel:
            return self.path
        return self.name

    @property
    def contains_channel(self):
        """Whether the name starts with the channel marker."""
        return self.name.startswith(CHANNEL_CHAR)

    @property
    def is_channel(self):
        """Whether this is a channel URI without a path."""
        if not self.contains_channel:
            return False
        return not self.path

    def to_uri_string(self):
        """Serialise the components as ``lbry://name[modifier][/path]``."""
        pieces = [PROTOCOL, "%s" % self.name]

        # Only one modifier is ever written; this order decides which one.
        if self.claim_sequence is not None:
            pieces.append(CLAIM_SEQUENCE_CHAR + "%i" % self.claim_sequence)
        elif self.bid_position is not None:
            pieces.append(BID_POSITION_CHAR + "%i" % self.bid_position)
        elif self.claim_id is not None:
            pieces.append(CLAIM_ID_CHAR + "%s" % self.claim_id)

        if self.path is not None:
            pieces.append(PATH_CHAR + "%s" % self.path)

        return "".join(pieces)

    def to_dict(self):
        """Return the five components as a plain dict keyed by slot name."""
        return dict(
            name=self.name,
            claim_sequence=self.claim_sequence,
            bid_position=self.bid_position,
            claim_id=self.claim_id,
            path=self.path,
        )

    @classmethod
    def from_uri_string(cls, uri_string):
        """
        Parses LBRY uri into its components

        :param uri_string: format - lbry://name:n$rank#id/path
                           optional modifiers:
                           claim_sequence (int): the nth claim to the name
                           bid_position (int): the bid queue position of the claim for the name
                           claim_id (str): the claim id for the claim
                           path (str): claim within a channel
        :return: URI
        :raises URIParseError: if the string does not match the URI pattern,
            or a content name is followed by a path
        """
        match = re.match(get_schema_regex(), uri_string)
        if match is None:
            raise URIParseError('Invalid URI')

        if match.group('content_name') and match.group('path'):
            raise URIParseError('Only channels may have paths')

        def _int_or_none(group_name):
            # Numeric modifiers are captured as text; convert when present.
            text = match.group(group_name)
            return int(text) if text is not None else None

        return cls(
            name=match.group("content_or_channel_name"),
            claim_sequence=_int_or_none("claim_sequence"),
            bid_position=_int_or_none("bid_position"),
            claim_id=match.group("claim_id"),
            path=match.group("path"),
        )

    @classmethod
    def from_dict(cls, uri_dict):
        """
        Creates URI from dict

        :param uri_dict: mapping whose keys are passed as constructor keyword arguments
        :return: URI
        """
        return cls(**uri_dict)
|
||||
|
||||
|
||||
def get_schema_regex():
    """Build the regular expression source that matches a complete LBRY URI.

    The pattern is assembled from the module-level separator constants and
    exposes these named groups: ``uri``, ``protocol``,
    ``content_or_channel_name``, ``content_name``, ``channel_name``,
    ``modifier``, ``claim_id``, ``bid_position``, ``claim_sequence`` and
    ``path``.

    :return: the pattern as a ``str``, anchored with ``^`` and ``$``
    """

    def _named(name, regex):
        # Wrap ``regex`` in a named capturing group.
        return "(?P<" + name + ">" + regex + ")"

    def _group(regex):
        # Wrap ``regex`` in a non-capturing group.
        return "(?:" + regex + ")"

    # TODO: regex should include the fact that content names cannot have paths
    # right now this is only enforced in code, not in the regex

    # Escape constants
    claim_id_char = re.escape(CLAIM_ID_CHAR)
    claim_sequence_char = re.escape(CLAIM_SEQUENCE_CHAR)
    bid_position_char = re.escape(BID_POSITION_CHAR)
    channel_char = re.escape(CHANNEL_CHAR)
    path_char = re.escape(PATH_CHAR)
    protocol = _named("protocol", re.escape(PROTOCOL))

    # Define basic building blocks
    valid_name_char = "[^=&#:$@%?/]"  # from the grammar section of https://spec.lbry.io/
    name_content = valid_name_char + '+'
    name_min_channel_length = valid_name_char + '{' + str(CHANNEL_NAME_MIN_LENGTH) + ',}'

    positive_number = "[1-9][0-9]*"
    # Raw string: "\-" is not a recognised escape in an ordinary string
    # literal and triggers an invalid-escape warning.  The resulting pattern
    # text (an optional minus sign before the number) is unchanged.
    number = r'\-?' + positive_number

    # Define URI components
    content_name = _named("content_name", name_content)
    channel_name = _named("channel_name", channel_char + name_min_channel_length)
    content_or_channel_name = _named("content_or_channel_name", content_name + "|" + channel_name)

    claim_id_piece = _named("claim_id", "[0-9a-f]{1," + str(CLAIM_ID_MAX_LENGTH) + "}")
    claim_id = _group(claim_id_char + claim_id_piece)

    bid_position_piece = _named("bid_position", number)
    bid_position = _group(bid_position_char + bid_position_piece)

    claim_sequence_piece = _named("claim_sequence", number)
    claim_sequence = _group(claim_sequence_char + claim_sequence_piece)

    # At most one modifier can follow the name.
    modifier = _named("modifier", claim_id + "|" + bid_position + "|" + claim_sequence)

    path_piece = _named("path", name_content)
    path = _group(path_char + path_piece)

    # Combine components
    uri = _named("uri", (
        '^' +
        protocol + '?' +
        content_or_channel_name +
        modifier + '?' +
        path + '?' +
        '$'
    ))

    return uri
|
||||
|
||||
|
||||
def parse_lbry_uri(lbry_uri):
    """Parse ``lbry_uri`` into a ``URI``; shorthand for ``URI.from_uri_string``."""
    parsed = URI.from_uri_string(lbry_uri)
    return parsed
|
107
lbrynet/schema/url.py
Normal file
107
lbrynet/schema/url.py
Normal file
|
@ -0,0 +1,107 @@
|
|||
import re
|
||||
from typing import NamedTuple, Tuple
|
||||
|
||||
|
||||
def _create_url_regex():
    """Assemble the pattern source used to recognise LBRY URLs.

    Three shapes are accepted after an optional ``lbry://`` scheme:
    ``@channel/stream``, ``@channel`` alone, or ``stream`` alone.  Each claim
    may carry one modifier: ``#claim_id``, ``:sequence`` or ``$amount_order``.
    Named groups are prefixed with ``channel_with_stream``,
    ``stream_in_channel``, ``channel`` or ``stream`` depending on the shape
    that matched.
    """
    # see https://spec.lbry.io/
    name_chars = "[^=&#:$@%?/]+"
    ordinal = "[1-9][0-9]*"

    def _named(name, regex):
        return f"(?P<{name}>{regex})"

    def _group(regex):
        return f"(?:{regex})"

    def _oneof(*choices):
        return _group('|'.join(choices))

    def _claim(name, prefix=""):
        # A name followed by at most one of the three modifiers.
        modifiers = _oneof(
            _group('#' + _named(f"{name}_claim_id", "[0-9a-f]{1,40}")),
            _group(':' + _named(f"{name}_sequence", ordinal)),
            _group(r'\$' + _named(f"{name}_amount_order", ordinal)),
        )
        return _group(_named(f"{name}_name", prefix + name_chars) + modifiers + '?')

    scheme = _named("scheme", "lbry://") + '?'
    channel_then_stream = _group(
        _claim("channel_with_stream", "@") + "/" + _claim("stream_in_channel")
    )
    body = _oneof(channel_then_stream, _claim("channel", "@"), _claim("stream"))

    return '^' + scheme + body + '$'


URL_REGEX = _create_url_regex()
|
||||
|
||||
|
||||
class PathSegment(NamedTuple):
    """One name in an LBRY URL together with its optional modifiers."""
    name: str
    claim_id: str = None
    sequence: int = None
    amount_order: int = None

    def __str__(self):
        # Render the first modifier that is set, checked in this fixed order;
        # without any modifier the bare name is returned.
        candidates = (
            ('#', self.claim_id),
            (':', self.sequence),
            ('$', self.amount_order),
        )
        for separator, value in candidates:
            if value is not None:
                return f"{self.name}{separator}{value}"
        return self.name
|
||||
|
||||
|
||||
class URL(NamedTuple):
|
||||
stream: PathSegment
|
||||
channel: PathSegment
|
||||
|
||||
@property
|
||||
def has_channel(self):
|
||||
return self.channel is not None
|
||||
|
||||
@property
|
||||
def has_stream(self):
|
||||
return self.stream is not None
|
||||
|
||||
@property
|
||||
def parts(self) -> Tuple:
|
||||
if self.has_channel:
|
||||
if self.has_stream:
|
||||
return self.channel, self.stream
|
||||
return self.channel,
|
||||
return self.stream,
|
||||
|
||||
@property
|
||||
def first(self):
|
||||
return self.parts[0]
|
||||
|
||||
def __str__(self):
|
||||
return f"lbry://{'/'.join(str(p) for p in self.parts)}"
|
||||
|
||||
@classmethod
|
||||
def parse(cls, url):
|
||||
match = re.match(URL_REGEX, url)
|
||||
|
||||
if match is None:
|
||||
raise ValueError('Invalid LBRY URL')
|
||||
|
||||
segments = {}
|
||||
parts = match.groupdict()
|
||||
for segment in ('channel', 'stream', 'channel_with_stream', 'stream_in_channel'):
|
||||
if parts[f'{segment}_name'] is not None:
|
||||
segments[segment] = PathSegment(
|
||||
parts[f'{segment}_name'],
|
||||
parts[f'{segment}_claim_id'],
|
||||
parts[f'{segment}_sequence'],
|
||||
parts[f'{segment}_amount_order']
|
||||
)
|
||||
|
||||
if 'channel_with_stream' in segments:
|
||||
segments['channel'] = segments['channel_with_stream']
|
||||
segments['stream'] = segments['stream_in_channel']
|
||||
|
||||
return cls(segments.get('stream', None), segments.get('channel', None))
|
|
@ -1,113 +0,0 @@
|
|||
import unittest
|
||||
|
||||
from lbrynet.schema.uri import URI, URIParseError
|
||||
|
||||
claim_id_1 = "63f2da17b0d90042c559cc73b6b17f853945c43e"

# Each row: (uri string, expected URI, contains_channel, is_channel, claim_name, channel_name)
parsed_uri_matches = [
    # content names without the scheme
    ("test", URI("test"), False, False, "test", None),
    ("test#" + claim_id_1, URI("test", claim_id=claim_id_1), False, False, "test", None),
    ("test:1", URI("test", claim_sequence=1), False, False, "test", None),
    ("test$1", URI("test", bid_position=1), False, False, "test", None),
    # content names with the scheme
    ("lbry://test", URI("test"), False, False, "test", None),
    ("lbry://test#" + claim_id_1, URI("test", claim_id=claim_id_1), False, False, "test", None),
    ("lbry://test:1", URI("test", claim_sequence=1), False, False, "test", None),
    ("lbry://test$1", URI("test", bid_position=1), False, False, "test", None),
    # channels
    ("@test", URI("@test"), True, True, None, "@test"),
    ("@test#" + claim_id_1, URI("@test", claim_id=claim_id_1), True, True, None, "@test"),
    ("@test:1", URI("@test", claim_sequence=1), True, True, None, "@test"),
    ("@test$1", URI("@test", bid_position=1), True, True, None, "@test"),
    # channels with a path
    (
        "lbry://@test1:1/fakepath",
        URI("@test1", claim_sequence=1, path="fakepath"),
        True, False, "fakepath", "@test1",
    ),
    (
        "lbry://@test1$1/fakepath",
        URI("@test1", bid_position=1, path="fakepath"),
        True, False, "fakepath", "@test1",
    ),
    (
        "lbry://@test1#abcdef/fakepath",
        URI("@test1", claim_id="abcdef", path="fakepath"),
        True, False, "fakepath", "@test1",
    ),
    # short channel names
    ("@z", URI("@z"), True, True, None, "@z"),
    ("@yx", URI("@yx"), True, True, None, "@yx"),
    ("@abc", URI("@abc"), True, True, None, "@abc"),
]
|
||||
|
||||
# Each row: (uri string that must be rejected, exception type expected)
parsed_uri_raises = [
    (invalid_uri, URIParseError)
    for invalid_uri in (
        "lbry://",
        "lbry://test:3$1",
        "lbry://test$1:1",
        "lbry://test#x",
        "lbry://test#x/page",
        "lbry://test$",
        "lbry://test#",
        "lbry://test:",
        "lbry://test$x",
        "lbry://test:x",
        "lbry://@test@",
        "lbry://@test:",
        "lbry://test@",
        "lbry://tes@t",
        "lbry://test:1#" + claim_id_1,
        "lbry://test:0",
        "lbry://test$0",
        "lbry://test/path",
        "lbry://@test1#abcdef/fakepath:1",
        "lbry://@test1:1/fakepath:1",
        "lbry://@test1:1ab/fakepath",
        "lbry://test:1:1:1",
        "whatever/lbry://test",
        "lbry://lbry://test",
        "lbry://@/what",
        "lbry://abc:0x123",
        "lbry://abc:0x123/page",
        "lbry://@test1#ABCDEF/fakepath",
        "test:0001",
        "lbry://@test1$1/fakepath?arg1&arg2&arg3",
    )
]
|
||||
|
||||
|
||||
class TestURIParser(unittest.TestCase):
    """Table-driven checks for ``URI`` parsing, serialisation and rejection.

    Every table row runs inside its own ``subTest`` so that a failure names
    the offending URI and the remaining rows are still exercised.
    """

    maxDiff = 4000
    longMessage = True

    def test_uri_parse(self):
        """Each valid sample parses to the expected URI and survives round trips."""
        for test_string, expected_uri_obj, contains_channel, is_channel, claim_name, channel_name in \
                parsed_uri_matches:
            with self.subTest(uri=test_string):
                parsed = URI.from_uri_string(test_string)
                rebuilt = URI.from_dict(expected_uri_obj.to_dict())

                # string -> URI
                self.assertEqual(parsed, expected_uri_obj, test_string)
                # URI -> dict -> URI
                self.assertEqual(rebuilt, expected_uri_obj, test_string)
                # contains_channel
                self.assertEqual(parsed.contains_channel, contains_channel, test_string)
                # is_channel
                self.assertEqual(parsed.is_channel, is_channel, test_string)
                # claim_name
                self.assertEqual(parsed.claim_name, claim_name, test_string)
                # channel_name
                self.assertEqual(parsed.channel_name, channel_name, test_string)

                # convert-to-string test only works if protocol is present in test_string
                if test_string.startswith('lbry://'):
                    # string -> URI -> string
                    self.assertEqual(parsed.to_uri_string(), test_string, test_string)
                    # string -> URI -> dict -> URI -> string
                    uri_dict = parsed.to_dict()
                    self.assertEqual(URI.from_dict(uri_dict).to_uri_string(), test_string,
                                     test_string)
                    # URI -> dict -> URI -> string
                    self.assertEqual(rebuilt.to_uri_string(), test_string, test_string)

    def test_uri_errors(self):
        """Each invalid sample is rejected with the listed exception type."""
        for test_str, err in parsed_uri_raises:
            with self.subTest(uri=test_str):
                with self.assertRaises(err, msg="Successfully parsed invalid url: " + test_str):
                    URI.from_uri_string(test_str)
|
85
tests/unit/schema/test_url.py
Normal file
85
tests/unit/schema/test_url.py
Normal file
|
@ -0,0 +1,85 @@
|
|||
import unittest
|
||||
|
||||
from lbrynet.schema.url import URL
|
||||
|
||||
|
||||
claim_id = "63f2da17b0d90042c559cc73b6b17f853945c43e"


class TestURLParsing(unittest.TestCase):
    """Checks that ``URL.parse`` accepts valid LBRY URLs and rejects invalid ones."""

    # Attributes of URL that hold a segment, and the fields of each segment.
    segments = 'stream', 'channel'
    fields = 'name', 'claim_id', 'sequence', 'amount_order'

    def _assert_url(self, url_string, **kwargs):
        """Parse ``url_string`` and compare every segment field with ``kwargs``.

        Keyword names have the form ``<segment>_<field>``, for example
        ``channel_sequence``.  A segment with no keyword at all is expected to
        be ``None``; an omitted field of a present segment is expected to be
        ``None``.
        """
        url = URL.parse(url_string)

        # str(URL) always emits the scheme, so a scheme-less input has to be
        # normalised before the round-trip comparison.
        if url_string.startswith('lbry://'):
            expected_string = url_string
        else:
            expected_string = f'lbry://{url_string}'
        self.assertEqual(expected_string, str(url))

        present = {}
        for key in kwargs:
            for segment_name in self.segments:
                if key.startswith(segment_name):
                    present[segment_name] = True
                    break

        for segment_name in self.segments:
            segment = getattr(url, segment_name)
            if segment_name not in present:
                self.assertIsNone(segment)
            else:
                for field in self.fields:
                    self.assertEqual(
                        getattr(segment, field),
                        kwargs.get(f'{segment_name}_{field}', None)
                    )

    def _fail_url(self, url):
        """Assert that parsing ``url`` raises ``ValueError('Invalid LBRY URL')``."""
        with self.assertRaisesRegex(ValueError, 'Invalid LBRY URL'):
            URL.parse(url)

    def test_parser_valid_urls(self):
        url = self._assert_url
        # stream
        url('test', stream_name='test')
        url('test:1', stream_name='test', stream_sequence='1')
        url('test$1', stream_name='test', stream_amount_order='1')
        url(f'test#{claim_id}', stream_name='test', stream_claim_id=claim_id)
        # channel
        url('@test', channel_name='@test')
        url('@test:1', channel_name='@test', channel_sequence='1')
        url('@test$1', channel_name='@test', channel_amount_order='1')
        url(f'@test#{claim_id}', channel_name='@test', channel_claim_id=claim_id)
        # channel/stream
        url('lbry://@test/stuff', channel_name='@test', stream_name='stuff')
        url('lbry://@test:1/stuff', channel_name='@test', channel_sequence='1', stream_name='stuff')
        url('lbry://@test$1/stuff', channel_name='@test', channel_amount_order='1', stream_name='stuff')
        url(f'lbry://@test#{claim_id}/stuff', channel_name='@test', channel_claim_id=claim_id, stream_name='stuff')

    def test_parser_invalid_urls(self):
        fail = self._fail_url
        fail("lbry://")
        fail("lbry://test:3$1")
        fail("lbry://test$1:1")
        fail("lbry://test#x")
        fail("lbry://test#x/page")
        fail("lbry://test$")
        fail("lbry://test#")
        fail("lbry://test:")
        fail("lbry://test$x")
        fail("lbry://test:x")
        fail("lbry://@test@")
        fail("lbry://@test:")
        fail("lbry://test@")
        fail("lbry://tes@t")
        fail(f"lbry://test:1#{claim_id}")
        fail("lbry://test:0")
        fail("lbry://test$0")
        fail("lbry://test/path")
        fail("lbry://@test1:1ab/fakepath")
        fail("lbry://test:1:1:1")
        fail("whatever/lbry://test")
        fail("lbry://lbry://test")
        fail("lbry://@/what")
        fail("lbry://abc:0x123")
        fail("lbry://abc:0x123/page")
        fail("lbry://@test1#ABCDEF/fakepath")
        fail("test:0001")
        fail("lbry://@test1$1/fakepath?arg1&arg2&arg3")
|
Loading…
Reference in a new issue