forked from LBRYCommunity/lbry-sdk
Clean up
This commit is contained in:
parent
d4d1d87680
commit
2eec26b648
2 changed files with 20 additions and 16 deletions
|
@ -15,16 +15,16 @@ from lbry.error import InvalidStreamDescriptorError
|
|||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_exprs = [
|
||||
'[<>:"/\\\|\?\*]+', # Illegal characters
|
||||
'[\\x00-\\x1F]+', # All characters in range 0-31
|
||||
'[ \t]*(\.)+[ \t]*$', # Dots at the end
|
||||
'(^[ \t]+|[ \t]+$)', # Leading and trailing whitespace
|
||||
'^CON$', '^PRN$', '^AUX$', # Illegal names
|
||||
'^NUL$', '^COM[1-9]$', '^LPT[1-9]$'
|
||||
]
|
||||
|
||||
RES = re.compile('(' + '|'.join((expr for expr in _exprs)) + ')')
|
||||
RE_ILLEGAL_FILENAME_CHARS = re.compile(
|
||||
'('
|
||||
'[<>:"/\\\|\?\*]+|' # Illegal characters
|
||||
'[\\x00-\\x1F]+|' # All characters in range 0-31
|
||||
'[ \t]*(\.)+[ \t]*$|' # Dots at the end
|
||||
'(^[ \t]+|[ \t]+$)|' # Leading and trailing whitespace
|
||||
'^CON$|^PRN$|^AUX$|' # Illegal names
|
||||
'^NUL$|^COM[1-9]$|^LPT[1-9]$' # ...
|
||||
')'
|
||||
)
|
||||
|
||||
|
||||
def format_sd_info(stream_name: str, key: str, suggested_file_name: str, stream_hash: str,
|
||||
|
@ -60,8 +60,8 @@ def file_reader(file_path: str):
|
|||
|
||||
def sanitize_file_name(dirty_name: str):
|
||||
file_name, ext = os.path.splitext(dirty_name)
|
||||
file_name = re.sub(RES, '', file_name)
|
||||
ext = re.sub(RES, '', ext)
|
||||
file_name = re.sub(RE_ILLEGAL_FILENAME_CHARS, '', file_name)
|
||||
ext = re.sub(RE_ILLEGAL_FILENAME_CHARS, '', ext)
|
||||
|
||||
if not file_name:
|
||||
log.warning('Unable to suggest a file name for %s', dirty_name)
|
||||
|
|
|
@ -79,10 +79,14 @@ class TestStreamDescriptor(AsyncioTestCase):
|
|||
await self._test_invalid_sd()
|
||||
|
||||
def test_sanitize_file_name(self):
|
||||
test_cases = [' t/-?t|.g.ext ', 'end_dot .', '.file\0\0', 'test n\16ame.ext', 'COM8', 'LPT2', '']
|
||||
expected = ['t-t.g.ext', 'end_dot', '.file', 'test name.ext', '', '', '']
|
||||
actual = [sanitize_file_name(tc) for tc in test_cases]
|
||||
self.assertListEqual(actual, expected)
|
||||
self.assertEqual(sanitize_file_name(' t/-?t|.g.ext '), 't-t.g.ext')
|
||||
self.assertEqual(sanitize_file_name('end_dot .'), 'end_dot')
|
||||
self.assertEqual(sanitize_file_name('.file\0\0'), '.file')
|
||||
self.assertEqual(sanitize_file_name('test n\16ame.ext'), 'test name.ext')
|
||||
self.assertEqual(sanitize_file_name('COM8'), '')
|
||||
self.assertEqual(sanitize_file_name('LPT2'), '')
|
||||
self.assertEqual(sanitize_file_name(''), '')
|
||||
|
||||
|
||||
class TestRecoverOldStreamDescriptors(AsyncioTestCase):
|
||||
async def test_old_key_sort_sd_blob(self):
|
||||
|
|
Loading…
Add table
Reference in a new issue