Minecraft-Overviewer/overviewer_core/files.py

182 lines
7.7 KiB
Python

# This file is part of the Minecraft Overviewer.
#
# Minecraft Overviewer is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Minecraft Overviewer is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the Overviewer. If not, see <http://www.gnu.org/licenses/>.
import os
import os.path
import tempfile
import shutil
import logging
import stat
import errno
# Capabilities used when a caller does not pass its own capabilities dict:
# both chmod and overwriting renames are taken to work.
default_caps = dict(chmod_works=True, rename_works=True)
def get_fs_caps(dir_to_test):
    """Probe ``dir_to_test`` and return its filesystem capabilities.

    The result is a dict with two keys, "chmod_works" and "rename_works",
    holding the results of does_chmod_work() and does_rename_work()
    respectively.
    """
    caps = {}
    # The chmod probe runs first, then the rename probe.
    caps["chmod_works"] = does_chmod_work(dir_to_test)
    caps["rename_works"] = does_rename_work(dir_to_test)
    return caps
def does_chmod_work(dir_to_test):
    """Detect whether chmod works in the given directory.

    A temporary file is created inside ``dir_to_test`` and chmod-ed; an
    OSError raised by that attempt is taken to mean chmod is not supported.

    Returns True if the chmod succeeded, or if ``dir_to_test`` is not an
    existing directory (nothing can be probed, so chmod is assumed to work).
    Returns False if the chmod attempt raised OSError.

    Errors raised while creating the temporary file are propagated.
    """
    # a CIFS mounted FS is the only thing known to reliably not provide chmod
    if not os.path.isdir(dir_to_test):
        return True

    # Use a context manager so the probe file is closed (and therefore
    # deleted) as soon as the test is done, instead of whenever the file
    # object happens to be garbage collected.
    with tempfile.NamedTemporaryFile(dir=dir_to_test) as probe:
        try:
            probe_mode = os.stat(probe.name).st_mode
            os.chmod(probe.name, probe_mode | stat.S_IRUSR)
        except OSError:
            chmod_works = False
            logging.debug("Detected that chmods do NOT work in %r", dir_to_test)
        else:
            chmod_works = True
            logging.debug("Detected that chmods work in %r", dir_to_test)
    return chmod_works
def does_rename_work(dir_to_test):
    """Detect whether os.rename can overwrite an existing file in a directory.

    Two temporary files are created in ``dir_to_test`` and the first is
    renamed over the second. Returns True if that rename succeeded, False if
    it raised OSError or if a FileNotFoundError escaped the probe.
    """
    renameworks = False
    try:
        with tempfile.NamedTemporaryFile(dir=dir_to_test) as source, \
                tempfile.NamedTemporaryFile(dir=dir_to_test) as target:
            try:
                os.rename(source.name, target.name)
            except OSError:
                renameworks = False
                logging.debug("Detected that overwriting renames do NOT work in %r"
                              % dir_to_test)
            else:
                renameworks = True
                logging.debug("Detected that overwriting renames work in %r" % dir_to_test)
                # re-make this file so it can be deleted without error
                open(source.name, 'w').close()
    except FileNotFoundError:
        # Special handling for CIFS, which simply cannot cope with any rename whatsoever
        renameworks = False
    return renameworks
def _mirror_copy_file(src_path, dst_path, chmod_works, force_writable):
    """Copy a single file, using only operations the filesystem supports."""
    if not chmod_works:
        # chmod is unavailable, so copy the contents only (no mode bits).
        shutil.copyfile(src_path, dst_path)
        return
    shutil.copy(src_path, dst_path)
    if force_writable:
        # shutil.copy also copies the mode, so a read-only source yields a
        # read-only copy; add the owner-write bit back.
        dst_mode = os.stat(dst_path).st_mode
        os.chmod(dst_path, dst_mode | stat.S_IWUSR)


def _mirror_add_mode_bits(path, bits):
    """Best-effort: OR ``bits`` into the mode of ``path``, ignoring OSError."""
    try:
        os.chmod(path, os.stat(path).st_mode | bits)
    except OSError:  # we don't care if this fails
        pass


## useful recursive copy, that ignores common OS cruft
def mirror_dir(src, dst, entities=None, capabilities=default_caps, force_writable=False):
    """Recursively copy the entries of ``src`` into ``dst``.

    src            -- directory to copy from.
    dst            -- directory to copy into; created if it does not exist.
    entities       -- optional list of entry names; when given (and
                      non-empty), only those top-level entries of ``src`` are
                      copied. It is not applied to sub-directories.
    capabilities   -- dict of filesystem capabilities; only "chmod_works" is
                      read here. When it is false, file contents are copied
                      without mode bits.
    force_writable -- when true (and chmod works), every copied file gets the
                      owner-write bit set.

    Raises TypeError if ``entities`` is non-empty and not a list. If a copy
    fails with OSError, the permissions of source and destination are
    loosened (best effort) and the copy is retried once; a second failure
    propagates.
    """
    if entities and not isinstance(entities, list):
        raise TypeError("Expected a list, got a %r instead" % type(entities))

    chmod_works = capabilities.get("chmod_works")
    if not os.path.exists(dst):
        os.mkdir(dst)

    # files which are problematic and should not be copied
    # usually, generated by the OS
    skip_files = ('Thumbs.db', '.DS_Store')

    for entry in os.listdir(src):
        if entry in skip_files:
            continue
        if entities and entry not in entities:
            continue

        src_path = os.path.join(src, entry)
        dst_path = os.path.join(dst, entry)

        if os.path.isdir(src_path):
            mirror_dir(src_path, dst_path, capabilities=capabilities,
                       force_writable=force_writable)
        elif os.path.isfile(src_path):
            try:
                _mirror_copy_file(src_path, dst_path, chmod_works, force_writable)
            except OSError:
                # maybe permission problems? Fix each side independently, so
                # that failing to chmod the source (e.g. we do not own it)
                # does not stop us from making the destination writable.
                _mirror_add_mode_bits(src_path, stat.S_IRUSR)
                _mirror_add_mode_bits(dst_path, stat.S_IWUSR)
                # try again; if this still throws an error, let it propagate up
                _mirror_copy_file(src_path, dst_path, chmod_works, force_writable)
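# Define a context manager that either writes to a temporary file and
# atomically renames it over the destination, or "just forgets it" and writes
# straight to the destination file, depending on whether os.rename provides
# atomic overwrites.
# Whether os.rename will overwrite files is detected by does_rename_work();
# FileReplacer reads the answer from its capabilities dict ("rename_works").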
# Docstring text for FileReplacer, kept in a module-level variable and
# assigned to the class's __doc__.
doc = """This class acts as a context manager for files that are to be written
out overwriting an existing file.

The parameter is the destination filename. The value returned into the context
is the filename that should be used. On systems that support an atomic
os.rename(), the filename will actually be a temporary file, and it will be
atomically replaced over the destination file on exit.

On systems that don't support an atomic rename, the filename returned is the
filename given.

If an error is encountered, the file is attempted to be removed, and the error
is propagated.

Example:

with FileReplacer("config") as configname:
    with open(configname, 'w') as configout:
        configout.write(newconfig)
"""
class FileReplacer(object):
    __doc__ = doc

    def __init__(self, destname, capabilities=default_caps):
        """Remember the destination and, if renames work, the temporary name.

        destname     -- path of the file that is to be (over)written.
        capabilities -- dict of filesystem capabilities; the "rename_works"
                        and "chmod_works" entries are read.
        """
        self.caps = capabilities
        self.destname = destname
        if self.caps.get("rename_works"):
            # The temporary file lives next to the destination.
            self.tmpname = destname + ".tmp"

    def __enter__(self):
        """Return the filename the caller should write to."""
        if self.caps.get("rename_works"):
            # rename works here. Return a temporary filename
            return self.tmpname
        return self.destname

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Move the temporary file into place, or discard it on error.

        Always returns None, so an exception raised inside the ``with`` body
        is propagated to the caller.
        """
        if not self.caps.get("rename_works"):
            # The caller wrote straight to destname; nothing left to do.
            return

        if exc_type:
            # error: throw away whatever was written to the temporary file
            try:
                os.remove(self.tmpname)
            except FileNotFoundError:
                # The body failed before the temporary file was created, so
                # there is nothing to clean up and nothing worth warning
                # about.
                pass
            except Exception:
                # Best effort only: the original error is what matters.
                logging.warning("An error was raised, so I was doing "
                                "some cleanup first, but I couldn't remove "
                                "'%s'!", self.tmpname)
            return

        # copy permission bits, if needed
        if self.caps.get("chmod_works") and os.path.exists(self.destname):
            try:
                shutil.copymode(self.destname, self.tmpname)
            except OSError as e:
                # Ignore errno ENOENT: file does not exist. Due to a race
                # condition, two processes could conceivably try and update
                # the same temp file at the same time
                if e.errno != errno.ENOENT:
                    raise

        # atomic rename into place
        try:
            os.rename(self.tmpname, self.destname)
        except OSError as e:
            # Ignore errno ENOENT: file does not exist. Due to a race
            # condition, two processes could conceivably try and update
            # the same temp file at the same time
            if e.errno != errno.ENOENT:
                raise