From 1a743bb74780ac269b366c8557dbd3f900fa4bc6 Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Tue, 12 Jan 2021 14:51:43 -0800 Subject: [PATCH] Replace secure_write, is_hidden, exists with jupyter_core's --- jupyter_server/base/handlers.py | 3 +- jupyter_server/serverapp.py | 6 +- .../services/contents/filemanager.py | 6 +- .../services/kernels/kernelmanager.py | 3 +- jupyter_server/utils.py | 226 +----------------- tests/test_utils.py | 47 +--- 6 files changed, 14 insertions(+), 277 deletions(-) diff --git a/jupyter_server/base/handlers.py b/jupyter_server/base/handlers.py index 57a37bda83..c230753246 100755 --- a/jupyter_server/base/handlers.py +++ b/jupyter_server/base/handlers.py @@ -28,10 +28,11 @@ from ipython_genutils.path import filefind from ipython_genutils.py3compat import string_types +from jupyter_core.paths import is_hidden import jupyter_server from jupyter_server._tz import utcnow from jupyter_server.i18n import combine_translations -from jupyter_server.utils import ensure_async, is_hidden, url_path_join, url_is_absolute, url_escape +from jupyter_server.utils import ensure_async, url_path_join, url_is_absolute, url_escape from jupyter_server.services.security import csp_report_uri #----------------------------------------------------------------------------- diff --git a/jupyter_server/serverapp.py b/jupyter_server/serverapp.py index db2bdefed3..7602e4569d 100755 --- a/jupyter_server/serverapp.py +++ b/jupyter_server/serverapp.py @@ -7,14 +7,12 @@ from __future__ import absolute_import, print_function import jupyter_server -import asyncio import binascii import datetime import errno import gettext import hashlib import hmac -import importlib import io import ipaddress import json @@ -34,7 +32,6 @@ import webbrowser import urllib -from types import ModuleType from base64 import encodebytes try: import resource @@ -44,8 +41,9 @@ from jinja2 import Environment, FileSystemLoader +from jupyter_core.paths import secure_write from jupyter_server.transutils import trans, _ -from jupyter_server.utils import secure_write, run_sync +from jupyter_server.utils import run_sync # the minimum viable tornado version: needs to be kept in sync with setup.py MIN_TORNADO = (6, 1, 0) diff --git a/jupyter_server/services/contents/filemanager.py b/jupyter_server/services/contents/filemanager.py index e6fc1e53ca..18df0866f2 100644 --- a/jupyter_server/services/contents/filemanager.py +++ b/jupyter_server/services/contents/filemanager.py @@ -19,17 +19,13 @@ from .filecheckpoints import AsyncFileCheckpoints, FileCheckpoints from .fileio import AsyncFileManagerMixin, FileManagerMixin from .manager import AsyncContentsManager, ContentsManager -from ...utils import exists from ipython_genutils.importstring import import_item from traitlets import Any, Unicode, Bool, TraitError, observe, default, validate from ipython_genutils.py3compat import getcwd, string_types +from jupyter_core.paths import exists, is_hidden, is_file_hidden from jupyter_server import _tz as tz -from jupyter_server.utils import ( - is_hidden, is_file_hidden, - to_api_path, -) from jupyter_server.base.handlers import AuthenticatedFileHandler from jupyter_server.transutils import _ diff --git a/jupyter_server/services/kernels/kernelmanager.py b/jupyter_server/services/kernels/kernelmanager.py index 8f0df097de..f6f9cc5064 100644 --- a/jupyter_server/services/kernels/kernelmanager.py +++ b/jupyter_server/services/kernels/kernelmanager.py @@ -18,11 +18,12 @@ from jupyter_client.session import Session from jupyter_client.multikernelmanager import MultiKernelManager, AsyncMultiKernelManager +from jupyter_core.paths import exists from traitlets import (Any, Bool, Dict, List, Unicode, TraitError, Integer, Float, Instance, default, validate ) -from jupyter_server.utils import to_os_path, exists, ensure_async, run_sync +from jupyter_server.utils import to_os_path, ensure_async from jupyter_server._tz import utcnow, isoformat from ipython_genutils.py3compat import getcwd diff --git a/jupyter_server/utils.py b/jupyter_server/utils.py index 42a6ae9278..f083476e6f 100644 --- a/jupyter_server/utils.py +++ b/jupyter_server/utils.py @@ -6,42 +6,17 @@ from __future__ import print_function import asyncio -import concurrent.futures -import ctypes import errno import inspect import os -import stat import sys from distutils.version import LooseVersion -from contextlib import contextmanager - from urllib.parse import quote, unquote, urlparse, urljoin from urllib.request import pathname2url - -# tornado.concurrent.Future is asyncio.Future -# in tornado >=5 with Python 3 -from tornado.concurrent import Future as TornadoFuture -from tornado import gen from ipython_genutils import py3compat -# UF_HIDDEN is a stat flag not defined in the stat module. -# It is used by BSD to indicate hidden files. -UF_HIDDEN = getattr(stat, 'UF_HIDDEN', 32768) - - -def exists(path): - """Replacement for `os.path.exists` which works for host mapped volumes - on Windows containers - """ - try: - os.lstat(path) - except OSError: - return False - return True - def url_path_join(*pieces): """Join components of url into a relative url @@ -58,10 +33,12 @@ def url_path_join(*pieces): if result == '//': result = '/' return result + def url_is_absolute(url): """Determine whether a given URL is absolute""" return urlparse(url).path.startswith("/") + def path2url(path): """Convert a local file path to a URL""" pieces = [ quote(p) for p in path.split(os.sep) ] @@ -71,12 +48,14 @@ def path2url(path): url = url_path_join(*pieces) return url + def url2path(url): """Convert a URL to a local file path""" pieces = [ unquote(p) for p in url.split('/') ] path = os.path.join(*pieces) return path + def url_escape(path): """Escape special characters in a URL path @@ -85,6 +64,7 @@ def url_escape(path): parts = py3compat.unicode_to_str(path, encoding='utf8').split('/') return u'/'.join([quote(p) for p in parts]) + def url_unescape(path): """Unescape special characters in a URL path @@ -96,201 +76,6 @@ def url_unescape(path): ]) -def is_file_hidden_win(abs_path, stat_res=None): - """Is a file hidden? - - This only checks the file itself; it should be called in combination with - checking the directory containing the file. - - Use is_hidden() instead to check the file and its parent directories. - - Parameters - ---------- - abs_path : unicode - The absolute path to check. - stat_res : os.stat_result, optional - Ignored on Windows, exists for compatibility with POSIX version of the - function. - """ - if os.path.basename(abs_path).startswith('.'): - return True - - win32_FILE_ATTRIBUTE_HIDDEN = 0x02 - try: - attrs = ctypes.windll.kernel32.GetFileAttributesW( - py3compat.cast_unicode(abs_path) - ) - except AttributeError: - pass - else: - if attrs > 0 and attrs & win32_FILE_ATTRIBUTE_HIDDEN: - return True - - return False - -def is_file_hidden_posix(abs_path, stat_res=None): - """Is a file hidden? - - This only checks the file itself; it should be called in combination with - checking the directory containing the file. - - Use is_hidden() instead to check the file and its parent directories. - - Parameters - ---------- - abs_path : unicode - The absolute path to check. - stat_res : os.stat_result, optional - The result of calling stat() on abs_path. If not passed, this function - will call stat() internally. - """ - if os.path.basename(abs_path).startswith('.'): - return True - - if stat_res is None or stat.S_ISLNK(stat_res.st_mode): - try: - stat_res = os.stat(abs_path) - except OSError as e: - if e.errno == errno.ENOENT: - return False - raise - - # check that dirs can be listed - if stat.S_ISDIR(stat_res.st_mode): - # use x-access, not actual listing, in case of slow/large listings - if not os.access(abs_path, os.X_OK | os.R_OK): - return True - - # check UF_HIDDEN - if getattr(stat_res, 'st_flags', 0) & UF_HIDDEN: - return True - - return False - -if sys.platform == 'win32': - is_file_hidden = is_file_hidden_win -else: - is_file_hidden = is_file_hidden_posix - -def is_hidden(abs_path, abs_root=''): - """Is a file hidden or contained in a hidden directory? - - This will start with the rightmost path element and work backwards to the - given root to see if a path is hidden or in a hidden directory. Hidden is - determined by either name starting with '.' or the UF_HIDDEN flag as - reported by stat. - - If abs_path is the same directory as abs_root, it will be visible even if - that is a hidden folder. This only checks the visibility of files - and directories *within* abs_root. - - Parameters - ---------- - abs_path : unicode - The absolute path to check for hidden directories. - abs_root : unicode - The absolute path of the root directory in which hidden directories - should be checked for. - """ - if os.path.normpath(abs_path) == os.path.normpath(abs_root): - return False - - if is_file_hidden(abs_path): - return True - - if not abs_root: - abs_root = abs_path.split(os.sep, 1)[0] + os.sep - inside_root = abs_path[len(abs_root):] - if any(part.startswith('.') for part in inside_root.split(os.sep)): - return True - - # check UF_HIDDEN on any location up to root. - # is_file_hidden() already checked the file, so start from its parent dir - path = os.path.dirname(abs_path) - while path and path.startswith(abs_root) and path != abs_root: - if not exists(path): - path = os.path.dirname(path) - continue - try: - # may fail on Windows junctions - st = os.lstat(path) - except OSError: - return True - if getattr(st, 'st_flags', 0) & UF_HIDDEN: - return True - path = os.path.dirname(path) - - return False - -# TODO: Move to jupyter_core -def win32_restrict_file_to_user(fname): - """Secure a windows file to read-only access for the user. - Follows guidance from win32 library creator: - http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html - - This method should be executed against an already generated file which - has no secrets written to it yet. - - Parameters - ---------- - - fname : unicode - The path to the file to secure - """ - import win32api - import win32security - import ntsecuritycon as con - - # everyone, _domain, _type = win32security.LookupAccountName("", "Everyone") - admins = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid) - user, _domain, _type = win32security.LookupAccountName("", win32api.GetUserName()) - - sd = win32security.GetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION) - - dacl = win32security.ACL() - # dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, everyone) - dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE, user) - dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, admins) - - sd.SetSecurityDescriptorDacl(1, dacl, 0) - win32security.SetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION, sd) - -# TODO: Move to jupyter_core -@contextmanager -def secure_write(fname, binary=False): - """Opens a file in the most restricted pattern available for - writing content. This limits the file mode to `600` and yields - the resulting opened filed handle. - - Parameters - ---------- - - fname : unicode - The path to the file to write - """ - mode = 'wb' if binary else 'w' - open_flag = os.O_CREAT | os.O_WRONLY | os.O_TRUNC - try: - os.remove(fname) - except (IOError, OSError): - # Skip any issues with the file not existing - pass - - if os.name == 'nt': - # Python on windows does not respect the group and public bits for chmod, so we need - # to take additional steps to secure the contents. - # Touch file pre-emptively to avoid editing permissions in open files in Windows - fd = os.open(fname, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0o600) - os.close(fd) - open_flag = os.O_WRONLY | os.O_TRUNC - win32_restrict_file_to_user(fname) - - with os.fdopen(os.open(fname, open_flag, 0o600), mode) as f: - if os.name != 'nt': - # Enforce that the file got the requested permissions before writing - assert '0600' == oct(stat.S_IMODE(os.stat(fname).st_mode)).replace('0o', '0') - yield f - def samefile_simple(path, other_path): """ Fill in for os.path.samefile when it is unavailable (Windows+py2). @@ -328,6 +113,7 @@ def to_os_path(path, root=''): path = os.path.join(root, *parts) return path + def to_api_path(os_path, root=''): """Convert a filesystem path to an API path diff --git a/tests/test_utils.py b/tests/test_utils.py index 54ad7ca2db..c047e8307c 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,11 +1,7 @@ -import os -import sys -import ctypes import pytest from traitlets.tests.utils import check_help_all_output -from jupyter_server.utils import url_escape, url_unescape, is_hidden, is_file_hidden, secure_write -from ipython_genutils.py3compat import cast_unicode +from jupyter_server.utils import url_escape, url_unescape def test_help_output(): @@ -41,44 +37,3 @@ def test_url_escaping(unescaped, escaped): # Test unescaping. path = url_unescape(escaped) assert path == unescaped - - -def test_is_hidden(tmp_path): - root = str(tmp_path) - subdir1_path = tmp_path / 'subdir' - subdir1_path.mkdir() - subdir1 = str(subdir1_path) - assert not is_hidden(subdir1, root) - assert not is_file_hidden(subdir1) - - subdir2_path = tmp_path / '.subdir2' - subdir2_path.mkdir() - subdir2 = str(subdir2_path) - assert is_hidden(subdir2, root) - assert is_file_hidden(subdir2) - - subdir34_path = tmp_path / 'subdir3' / '.subdir4' - subdir34_path.mkdir(parents=True) - subdir34 = str(subdir34_path) - assert is_hidden(subdir34, root) - assert is_hidden(subdir34) - - subdir56_path = tmp_path / '.subdir5' / 'subdir6' - subdir56_path.mkdir(parents=True) - subdir56 = str(subdir56_path) - assert is_hidden(subdir56, root) - assert is_hidden(subdir56) - assert not is_file_hidden(subdir56) - assert not is_file_hidden(subdir56, os.stat(subdir56)) - - -@pytest.mark.skipif(sys.platform != "win32", reason="Test is not windows.") -def test_is_hidden_win32(tmp_path): - root = str(tmp_path) - root = cast_unicode(root) - subdir1 = tmp_path / 'subdir' - subdir1.mkdir() - assert not is_hidden(str(subdir1), root) - ctypes.windll.kernel32.SetFileAttributesW(str(subdir1), 0x02) - assert is_hidden(str(subdir1), root) - assert is_file_hidden(str(subdir1))