From 6b8d4a444bd747a8942c62c9066c797cc2ec36a1 Mon Sep 17 00:00:00 2001 From: FemtosecondLaser <38204088+FemtosecondLaser@users.noreply.github.com> Date: Wed, 20 Oct 2021 15:26:16 +0100 Subject: [PATCH] Modified ensure_directory_exists() to check if the directory is writable by the process. --- lbry/extras/cli.py | 3 +++ tests/unit/test_cli.py | 46 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/lbry/extras/cli.py b/lbry/extras/cli.py index c263a84d9..a33543faf 100644 --- a/lbry/extras/cli.py +++ b/lbry/extras/cli.py @@ -226,6 +226,9 @@ def get_argument_parser(): def ensure_directory_exists(path: str): if not os.path.isdir(path): pathlib.Path(path).mkdir(parents=True, exist_ok=True) + use_effective_ids = os.access in os.supports_effective_ids + if not os.access(path, os.W_OK, effective_ids=use_effective_ids): + raise PermissionError(f"The following directory is not writable: {path}") LOG_MODULES = 'lbry', 'aioupnp' diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index 935364f05..2f9b7219c 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -3,16 +3,18 @@ import tempfile import shutil import contextlib import logging +import pathlib from io import StringIO from unittest import TestCase from unittest.mock import patch +from pyfakefs.fake_filesystem_unittest import TestCase as FakeFSTestCase from types import SimpleNamespace from contextlib import asynccontextmanager import docopt from lbry.testcase import AsyncioTestCase -from lbry.extras.cli import normalize_value, main, setup_logging +from lbry.extras.cli import normalize_value, main, setup_logging, ensure_directory_exists from lbry.extras.system_info import get_platform from lbry.extras.daemon.daemon import Daemon from lbry.conf import Config @@ -202,3 +204,45 @@ class DaemonDocsTests(TestCase): pass if failures: self.fail("\n" + "\n".join(failures)) + + +class DirectoryTests(FakeFSTestCase): + + def setUp(self): + self.setUpPyfakefs() + + def test_when_dir_does_not_exist_then_it_is_created(self): + dir_path = "dir" + ensure_directory_exists(dir_path) + self.assertTrue(os.path.exists(dir_path)) + + def test_when_parent_dir_does_not_exist_then_dir_is_created_with_parent(self): + dir_path = os.path.join("parent_dir", "dir") + ensure_directory_exists(dir_path) + self.assertTrue(os.path.exists(dir_path)) + + def test_when_dir_exists_then_it_still_exists(self): + dir_path = "dir" + pathlib.Path(dir_path).mkdir() + ensure_directory_exists(dir_path) + self.assertTrue(os.path.exists(dir_path)) + + def test_when_non_writable_dir_exists_then_raise(self): + dir_path = "dir" + pathlib.Path(dir_path).mkdir(mode=0o555) # creates a non-writable, readable and executable dir + with self.assertRaises(PermissionError): + ensure_directory_exists(dir_path) + + def test_when_dir_exists_and_writable_then_no_raise(self): + dir_path = "dir" + pathlib.Path(dir_path).mkdir(mode=0o777) # creates a writable, readable and executable dir + try: + ensure_directory_exists(dir_path) + except (FileExistsError, PermissionError) as err: + self.fail(f"{type(err).__name__} was raised") + + def test_when_file_exists_at_path_then_raise(self): + file_path = "file.extension" + self.fs.create_file(file_path) + with self.assertRaises(FileExistsError): + ensure_directory_exists(file_path)