# This file is part of the Minecraft Overviewer. # # Minecraft Overviewer is free software: you can redistribute it and/or # modify it under the terms of the GNU General Public License as published # by the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # Minecraft Overviewer is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. # # You should have received a copy of the GNU General Public License along # with the Overviewer. If not, see . import os import os.path import tempfile import shutil import logging import stat import errno default_caps = {"chmod_works": True, "rename_works": True} def get_fs_caps(dir_to_test): return {"chmod_works": does_chmod_work(dir_to_test), "rename_works": does_rename_work(dir_to_test) } def does_chmod_work(dir_to_test): "Detects if chmod works in a given directory" # a CIFS mounted FS is the only thing known to reliably not provide chmod if not os.path.isdir(dir_to_test): return True f1 = tempfile.NamedTemporaryFile(dir=dir_to_test) try: f1_stat = os.stat(f1.name) os.chmod(f1.name, f1_stat.st_mode | stat.S_IRUSR) chmod_works = True logging.debug("Detected that chmods work in %r" % dir_to_test) except OSError: chmod_works = False logging.debug("Detected that chmods do NOT work in %r" % dir_to_test) return chmod_works def does_rename_work(dir_to_test): try: with tempfile.NamedTemporaryFile(dir=dir_to_test) as f1: with tempfile.NamedTemporaryFile(dir=dir_to_test) as f2: try: os.rename(f1.name,f2.name) except OSError: renameworks = False logging.debug("Detected that overwriting renames do NOT work in %r" % dir_to_test) else: renameworks = True logging.debug("Detected that overwriting renames work in %r" % dir_to_test) # re-make this file so it can be deleted without error open(f1.name, 'w').close() except FileNotFoundError: # Special handling for CIFS, which simply cannot cope with any rename whatsoever renameworks = False return renameworks ## useful recursive copy, that ignores common OS cruft def mirror_dir(src, dst, entities=None, capabilities=default_caps, force_writable=False): '''copies all of the entities from src to dst''' chmod_works = capabilities.get("chmod_works") if not os.path.exists(dst): os.mkdir(dst) if entities and type(entities) != list: raise Exception("Expected a list, got a %r instead" % type(entities)) # files which are problematic and should not be copied # usually, generated by the OS skip_files = ['Thumbs.db', '.DS_Store'] for entry in os.listdir(src): if entry in skip_files: continue if entities and entry not in entities: continue if os.path.isdir(os.path.join(src,entry)): mirror_dir(os.path.join(src, entry), os.path.join(dst, entry), capabilities=capabilities, force_writable=force_writable) elif os.path.isfile(os.path.join(src,entry)): try: if chmod_works: shutil.copy(os.path.join(src, entry), os.path.join(dst, entry)) if force_writable: # for shitty neckbeard ware dst_stat = os.stat(os.path.join(dst, entry)) os.chmod(os.path.join(dst, entry), dst_stat.st_mode | stat.S_IWUSR) else: shutil.copyfile(os.path.join(src, entry), os.path.join(dst, entry)) except IOError as outer: try: # maybe permission problems? src_stat = os.stat(os.path.join(src, entry)) os.chmod(os.path.join(src, entry), src_stat.st_mode | stat.S_IRUSR) dst_stat = os.stat(os.path.join(dst, entry)) os.chmod(os.path.join(dst, entry), dst_stat.st_mode | stat.S_IWUSR) except OSError: # we don't care if this fails pass # try again; if this stills throws an error, let it propagate up if chmod_works: shutil.copy(os.path.join(src, entry), os.path.join(dst, entry)) if force_writable: dst_stat = os.stat(os.path.join(dst, entry)) os.chmod(os.path.join(dst, entry), dst_stat.st_mode | stat.S_IWUSR) else: shutil.copyfile(os.path.join(src, entry), os.path.join(dst, entry)) # Define a context manager to handle atomic renaming or "just forget it write # straight to the file" depending on whether os.rename provides atomic # overwrites. # Detect whether os.rename will overwrite files doc = """This class acts as a context manager for files that are to be written out overwriting an existing file. The parameter is the destination filename. The value returned into the context is the filename that should be used. On systems that support an atomic os.rename(), the filename will actually be a temporary file, and it will be atomically replaced over the destination file on exit. On systems that don't support an atomic rename, the filename returned is the filename given. If an error is encountered, the file is attempted to be removed, and the error is propagated. Example: with FileReplacer("config") as configname: with open(configout, 'w') as configout: configout.write(newconfig) """ class FileReplacer(object): __doc__ = doc def __init__(self, destname, capabilities=default_caps): self.caps = capabilities self.destname = destname if self.caps.get("rename_works"): self.tmpname = destname + ".tmp" def __enter__(self): if self.caps.get("rename_works"): # rename works here. Return a temporary filename return self.tmpname return self.destname def __exit__(self, exc_type, exc_val, exc_tb): if self.caps.get("rename_works"): if exc_type: # error try: os.remove(self.tmpname) except Exception as e: logging.warning("An error was raised, so I was doing " "some cleanup first, but I couldn't remove " "'%s'!", self.tmpname) else: # copy permission bits, if needed if self.caps.get("chmod_works") and os.path.exists(self.destname): try: shutil.copymode(self.destname, self.tmpname) except OSError as e: # Ignore errno ENOENT: file does not exist. Due to a race # condition, two processes could conceivably try and update # the same temp file at the same time if e.errno != errno.ENOENT: raise # atomic rename into place try: os.rename(self.tmpname, self.destname) except OSError as e: # Ignore errno ENOENT: file does not exist. Due to a race # condition, two processes could conceivably try and update # the same temp file at the same time if e.errno != errno.ENOENT: raise