Merge pull request #3542 from eug3nix/gh_3481_file_type_detection

file type detection now looks inside the file to determine the type, in addition to using the file extension
This commit is contained in:
Lex Berezhny 2022-01-31 10:29:47 -05:00 committed by GitHub
commit a1abd94387
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 83 additions and 1 deletions

View file

@ -1,4 +1,6 @@
import os import os
import filetype
import logging
types_map = { types_map = {
# http://www.iana.org/assignments/media-types # http://www.iana.org/assignments/media-types
@ -166,10 +168,38 @@ types_map = {
'.wmv': ('video/x-ms-wmv', 'video') '.wmv': ('video/x-ms-wmv', 'video')
} }
# maps detected extensions to the possible analogs
# i.e. .cbz file is actually a .zip
synonyms_map = {
'.zip': ['.cbz'],
'.rar': ['.cbr'],
'.ar': ['.a']
}
log = logging.getLogger(__name__)
def guess_media_type(path): def guess_media_type(path):
_, ext = os.path.splitext(path) _, ext = os.path.splitext(path)
extension = ext.strip().lower() extension = ext.strip().lower()
try:
kind = filetype.guess(path)
if kind:
real_extension = f".{kind.extension}"
if extension != real_extension:
if extension:
log.warning(f"file extension does not match it's contents: {path}, identified as {real_extension}")
else:
log.debug(f"file {path} does not have extension, identified by it's contents as {real_extension}")
if extension not in synonyms_map.get(real_extension, []):
extension = real_extension
except OSError as error:
pass
if extension[1:]: if extension[1:]:
if extension in types_map: if extension in types_map:
return types_map[extension] return types_map[extension]

View file

@ -56,7 +56,8 @@ setup(
'attrs==18.2.0', 'attrs==18.2.0',
'pylru==1.1.0', 'pylru==1.1.0',
'elasticsearch==7.10.1', 'elasticsearch==7.10.1',
'grpcio==1.38.0' 'grpcio==1.38.0',
'filetype==1.0.9'
] + PLYVEL, ] + PLYVEL,
extras_require={ extras_require={
'torrent': ['lbry-libtorrent'], 'torrent': ['lbry-libtorrent'],

View file

@ -0,0 +1,51 @@
import unittest
import tempfile
import os
from lbry.schema.mime_types import guess_media_type
class MediaTypeTests(unittest.TestCase):
def test_guess_media_type_from_path_only(self):
kind = guess_media_type('/tmp/test.mkv')
self.assertEqual(kind, ('video/x-matroska', 'video'))
def test_defaults_for_no_extension(self):
kind = guess_media_type('/tmp/test')
self.assertEqual(kind, ('application/octet-stream', 'binary'))
def test_defaults_for_unknown_extension(self):
kind = guess_media_type('/tmp/test.unk')
self.assertEqual(kind, ('application/x-ext-unk', 'binary'))
def test_spoofed_unknown(self):
with tempfile.TemporaryDirectory() as temp_dir:
file = os.path.join(temp_dir, 'spoofed_unknown.txt')
with open(file, 'wb') as fd:
bytes_lz4 = bytearray([0x04,0x22,0x4d,0x18])
fd.write(bytes_lz4)
fd.close()
kind = guess_media_type(file)
self.assertEqual(kind, ('application/x-ext-lz4', 'binary'))
def test_spoofed_known(self):
with tempfile.TemporaryDirectory() as temp_dir:
file = os.path.join(temp_dir, 'spoofed_known.avi')
with open(file, 'wb') as fd:
bytes_zip = bytearray([0x50,0x4b,0x03,0x06])
fd.write(bytes_zip)
fd.close()
kind = guess_media_type(file)
self.assertEqual(kind, ('application/zip', 'binary'))
def test_spoofed_synonym(self):
with tempfile.TemporaryDirectory() as temp_dir:
file = os.path.join(temp_dir, 'spoofed_known.cbz')
with open(file, 'wb') as fd:
bytes_zip = bytearray([0x50,0x4b,0x03,0x06])
fd.write(bytes_zip)
fd.close()
kind = guess_media_type(file)
self.assertEqual(kind, ('application/vnd.comicbook+zip', 'document'))