This commit is contained in:
Miroslav Kovar 2019-10-08 22:43:11 +02:00 committed by Lex Berezhny
parent 268f6447cb
commit f92b610a03
3 changed files with 18 additions and 18 deletions

View file

@ -48,24 +48,24 @@ def file_reader(file_path: str):
def sanitize_file_name(dirty_name: str):
RE_IL_CHARS = re.compile(r'[<>:"/\\\|\?\*]')
RE_DOTS_AT_END = re.compile(r'[ \t]*(\.)+[ \t]$')
RE_LEAD_TRAIL_SPACE = re.compile(r'(^[ \t]+|[ \t]+$)')
res_il_names = \
[re.compile(regex) for regex in (r'^CON$', r'^PRN$', r'^AUX$', r'^NUL$', r'^COM[1-9]$', r'^LPT[1-9]$')]
exprs = [
'[<>:"/\\\|\?\*]+', # Illegal characters
'[\\x00-\\x1F]+', # All characters in range 0-31
'[ \t]*(\.)+[ \t]*$', # Dots at the end
'(^[ \t]+|[ \t]+$)', # Leading and trailing whitespace
'^CON$', '^PRN$', '^AUX$', # Illegal names
'^NUL$', '^COM[1-9]$', '^LPT[1-9]$']
file_name = re.sub(RE_IL_CHARS, '', dirty_name)
file_name, ext = os.path.splitext(file_name)
file_name = re.sub(RE_LEAD_TRAIL_SPACE, '', file_name)
file_name = re.sub(RE_DOTS_AT_END, '', file_name)
ext = re.sub(RE_LEAD_TRAIL_SPACE, '', ext)
if any((REGEX.match(file_name) for REGEX in res_il_names)):
file_name = ''
elif file_name and len(ext) > 1:
file_name += ext
RES = re.compile('(' + '|'.join((expr for expr in exprs)) + ')')
if len(file_name) == 0:
file_name, ext = os.path.splitext(dirty_name)
file_name = re.sub(RES, '', file_name)
ext = re.sub(RES, '', ext)
if not file_name:
log.warning('Unable to suggest a file name for %s', dirty_name)
elif len(ext) > 1:
file_name += ext
return file_name

View file

@ -46,7 +46,7 @@ class TestManagedStream(BlobExchangeTestBase):
self.stream = ManagedStream(
self.loop, self.client_config, self.client_blob_manager, self.sd_hash, self.client_dir
)
await self._test_transfer_stream(1, skip_setup=True)
await self._test_transfer_stream(10, skip_setup=True)
self.assertTrue(self.stream.completed)
self.assertEqual(self.stream.file_name, 'tt_f')
self.assertTrue(self.stream.output_file_exists)

View file

@ -79,8 +79,8 @@ class TestStreamDescriptor(AsyncioTestCase):
await self._test_invalid_sd()
async def test_sanitize_file_name(self):
test_cases = [' t/-?t|.g.ext ', 'end_me .', '', '.file', 'test name.ext', 'COM8', 'LPT2']
expected = ['t-t.g.ext', 'end_me', '', '.file', 'test name.ext', '', '']
test_cases = [' t/-?t|.g.ext ', 'end_dot .', '.file\0\0', 'test n\16ame.ext', 'COM8', 'LPT2', '']
expected = ['t-t.g.ext', 'end_dot', '.file', 'test name.ext', '', '', '']
actual = [sanitize_file_name(tc) for tc in test_cases]
self.assertListEqual(actual, expected)