forked from LBRYCommunity/lbry-sdk
Added support for spec-compliant URL parsing
Legacy URLs remain supported: the parser first tries to match the new URL format and, if that fails, falls back to the legacy format. Tests had to be updated to use the new separators so that each URL is asserted against the correct expected fields.
This commit is contained in:
parent
ab067d1d3a
commit
7637aa2ab6
2 changed files with 37 additions and 20 deletions
|
@ -1,12 +1,12 @@
|
||||||
import re
|
import re
|
||||||
import unicodedata
|
import unicodedata
|
||||||
from typing import NamedTuple, Tuple
|
from typing import Iterable, NamedTuple, Pattern, Tuple
|
||||||
|
|
||||||
|
|
||||||
def _create_url_regex():
|
def _create_url_regex(legacy=False):
|
||||||
# see https://spec.lbry.com/ and test_url.py
|
# see https://spec.lbry.com/ and test_url.py
|
||||||
invalid_names_regex = \
|
invalid_names_regex = \
|
||||||
r"[^=&#:$@%?;\"/\\<>%{}|^~`\[\]" \
|
r"[^=&#:$@%*?;\"/\\<>%{}|^~`\[\]" \
|
||||||
r"\u0000-\u0020\uD800-\uDFFF\uFFFE-\uFFFF]+"
|
r"\u0000-\u0020\uD800-\uDFFF\uFFFE-\uFFFF]+"
|
||||||
|
|
||||||
def _named(name, regex):
|
def _named(name, regex):
|
||||||
|
@ -18,29 +18,42 @@ def _create_url_regex():
|
||||||
def _oneof(*choices):
|
def _oneof(*choices):
|
||||||
return _group('|'.join(choices))
|
return _group('|'.join(choices))
|
||||||
|
|
||||||
|
def _legacy_claim(name, prefix=""):
|
||||||
|
return _group(
|
||||||
|
_named(name + "_name", prefix + invalid_names_regex) +
|
||||||
|
_oneof(
|
||||||
|
_group('#' + _named(name + "_claim_id", "[0-9a-f]{1,40}")),
|
||||||
|
_group(':' + _named(name + "_sequence", '[1-9][0-9]*')),
|
||||||
|
_group(r'\$' + _named(name + "_amount_order", '[1-9][0-9]*'))
|
||||||
|
) + '?'
|
||||||
|
)
|
||||||
|
|
||||||
def _claim(name, prefix=""):
|
def _claim(name, prefix=""):
|
||||||
return _group(
|
return _group(
|
||||||
_named(name+"_name", prefix + invalid_names_regex) +
|
_named(name+"_name", prefix + invalid_names_regex) +
|
||||||
_oneof(
|
_oneof(
|
||||||
_group('#' + _named(name+"_claim_id", "[0-9a-f]{1,40}")),
|
_group(':' + _named(name+"_claim_id", "[0-9a-f]{1,40}")),
|
||||||
_group(':' + _named(name+"_sequence", '[1-9][0-9]*')),
|
_group(r'\*' + _named(name+"_sequence", '[1-9][0-9]*')),
|
||||||
_group(r'\$' + _named(name+"_amount_order", '[1-9][0-9]*'))
|
_group(r'\$' + _named(name+"_amount_order", '[1-9][0-9]*'))
|
||||||
) + '?'
|
) + '?'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
claim = _claim if not legacy else _legacy_claim
|
||||||
|
|
||||||
return (
|
return (
|
||||||
'^' +
|
'^' +
|
||||||
_named("scheme", "lbry://") + '?' +
|
_named("scheme", "lbry://") + '?' +
|
||||||
_oneof(
|
_oneof(
|
||||||
_group(_claim("channel_with_stream", "@") + "/" + _claim("stream_in_channel")),
|
_group(claim("channel_with_stream", "@") + "/" + claim("stream_in_channel")),
|
||||||
_claim("channel", "@"),
|
claim("channel", "@"),
|
||||||
_claim("stream")
|
claim("stream")
|
||||||
) +
|
) +
|
||||||
'$'
|
'$'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
URL_REGEX = _create_url_regex()
|
URL_REGEX = _create_url_regex()
|
||||||
|
URL_REGEX_LEGACY = _create_url_regex(legacy=True)
|
||||||
|
|
||||||
|
|
||||||
def normalize_name(name):
|
def normalize_name(name):
|
||||||
|
@ -69,9 +82,9 @@ class PathSegment(NamedTuple):
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
if self.claim_id is not None:
|
if self.claim_id is not None:
|
||||||
return f"{self.name}#{self.claim_id}"
|
return f"{self.name}:{self.claim_id}"
|
||||||
elif self.sequence is not None:
|
elif self.sequence is not None:
|
||||||
return f"{self.name}:{self.sequence}"
|
return f"{self.name}*{self.sequence}"
|
||||||
elif self.amount_order is not None:
|
elif self.amount_order is not None:
|
||||||
return f"{self.name}${self.amount_order}"
|
return f"{self.name}${self.amount_order}"
|
||||||
return self.name
|
return self.name
|
||||||
|
@ -104,9 +117,13 @@ class URL(NamedTuple):
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return f"lbry://{'/'.join(str(p) for p in self.parts)}"
|
return f"lbry://{'/'.join(str(p) for p in self.parts)}"
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _first_match(x: str, ptns: Iterable[Pattern[str]]):
|
||||||
|
return next(filter(None, (re.match(ptn, x) for ptn in ptns)), None)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def parse(cls, url):
|
def parse(cls, url):
|
||||||
match = re.match(URL_REGEX, url)
|
match = URL._first_match(url, (URL_REGEX, URL_REGEX_LEGACY))
|
||||||
|
|
||||||
if match is None:
|
if match is None:
|
||||||
raise ValueError('Invalid LBRY URL')
|
raise ValueError('Invalid LBRY URL')
|
||||||
|
|
|
@ -42,19 +42,19 @@ class TestURLParsing(unittest.TestCase):
|
||||||
url = self._assert_url
|
url = self._assert_url
|
||||||
# stream
|
# stream
|
||||||
url('test', stream_name='test')
|
url('test', stream_name='test')
|
||||||
url('test:1', stream_name='test', stream_sequence='1')
|
url('test*1', stream_name='test', stream_sequence='1')
|
||||||
url('test$1', stream_name='test', stream_amount_order='1')
|
url('test$1', stream_name='test', stream_amount_order='1')
|
||||||
url(f'test#{claim_id}', stream_name='test', stream_claim_id=claim_id)
|
url(f'test:{claim_id}', stream_name='test', stream_claim_id=claim_id)
|
||||||
# channel
|
# channel
|
||||||
url('@test', channel_name='@test')
|
url('@test', channel_name='@test')
|
||||||
url('@test:1', channel_name='@test', channel_sequence='1')
|
url('@test*1', channel_name='@test', channel_sequence='1')
|
||||||
url('@test$1', channel_name='@test', channel_amount_order='1')
|
url('@test$1', channel_name='@test', channel_amount_order='1')
|
||||||
url(f'@test#{claim_id}', channel_name='@test', channel_claim_id=claim_id)
|
url(f'@test:{claim_id}', channel_name='@test', channel_claim_id=claim_id)
|
||||||
# channel/stream
|
# channel/stream
|
||||||
url('lbry://@test/stuff', channel_name='@test', stream_name='stuff')
|
url('lbry://@test/stuff', channel_name='@test', stream_name='stuff')
|
||||||
url('lbry://@test:1/stuff', channel_name='@test', channel_sequence='1', stream_name='stuff')
|
url('lbry://@test*1/stuff', channel_name='@test', channel_sequence='1', stream_name='stuff')
|
||||||
url('lbry://@test$1/stuff', channel_name='@test', channel_amount_order='1', stream_name='stuff')
|
url('lbry://@test$1/stuff', channel_name='@test', channel_amount_order='1', stream_name='stuff')
|
||||||
url(f'lbry://@test#{claim_id}/stuff', channel_name='@test', channel_claim_id=claim_id, stream_name='stuff')
|
url(f'lbry://@test:{claim_id}/stuff', channel_name='@test', channel_claim_id=claim_id, stream_name='stuff')
|
||||||
# unicode regex edges
|
# unicode regex edges
|
||||||
_url = lambda name: url(name, stream_name=name)
|
_url = lambda name: url(name, stream_name=name)
|
||||||
_url('\uD799')
|
_url('\uD799')
|
||||||
|
@ -104,10 +104,10 @@ class TestURLParsing(unittest.TestCase):
|
||||||
fail("lbry://test@")
|
fail("lbry://test@")
|
||||||
fail("lbry://tes@t")
|
fail("lbry://tes@t")
|
||||||
fail(f"lbry://test:1#{claim_id}")
|
fail(f"lbry://test:1#{claim_id}")
|
||||||
fail("lbry://test:0")
|
fail("lbry://test*0")
|
||||||
fail("lbry://test$0")
|
fail("lbry://test$0")
|
||||||
fail("lbry://test/path")
|
fail("lbry://test/path")
|
||||||
fail("lbry://@test1:1ab/fakepath")
|
fail("lbry://@test1*1ab/fakepath")
|
||||||
fail("lbry://test:1:1:1")
|
fail("lbry://test:1:1:1")
|
||||||
fail("whatever/lbry://test")
|
fail("whatever/lbry://test")
|
||||||
fail("lbry://lbry://test")
|
fail("lbry://lbry://test")
|
||||||
|
@ -115,5 +115,5 @@ class TestURLParsing(unittest.TestCase):
|
||||||
fail("lbry://abc:0x123")
|
fail("lbry://abc:0x123")
|
||||||
fail("lbry://abc:0x123/page")
|
fail("lbry://abc:0x123/page")
|
||||||
fail("lbry://@test1#ABCDEF/fakepath")
|
fail("lbry://@test1#ABCDEF/fakepath")
|
||||||
fail("test:0001")
|
fail("test*0001")
|
||||||
fail("lbry://@test1$1/fakepath?arg1&arg2&arg3")
|
fail("lbry://@test1$1/fakepath?arg1&arg2&arg3")
|
||||||
|
|
Loading…
Reference in a new issue