Source code for thutils.gfile

# encoding: utf-8

'''
@author: Tsuyoshi Hombashi
'''

from __future__ import with_statement
import os.path
import re
import sys

import six
import path

import thutils.common
from thutils.logger import logger


__INVALID_PATH_CHARS = '\:*?"<>|'


class FileType:
    FILE = 1
    DIRECTORY = 2
    LINK = 3


[docs]class NullPathError(Exception): pass
[docs]class InvalidFilePathError(Exception): pass
[docs]class FileNotFoundError(Exception): pass
[docs]class EmptyFileError(Exception): pass
[docs]class FileTypeChecker:
[docs] class FileType: TEXT = "ASCII text" BINARY = "data"
__re_text = re.compile(FileType.TEXT) @classmethod def get_file_type(cls, file_path): import magic return magic.from_file(file_path) @classmethod def is_text_file(cls, file_path): try: file_type_text = cls.get_file_type(file_path).decode("utf-8") except UnicodeDecodeError: return False return cls.__re_text.search(file_type_text) is not None
[docs]class FileManager: __dry_run = False @classmethod
[docs] def initialize(cls, dry_run): cls.__dry_run = dry_run
@classmethod
[docs] def touch(cls, touch_path): logger.debug("touch file: " + touch_path) if cls.__dry_run: return path_obj = path.Path(touch_path) cls.make_directory(path_obj.dirname()) return path_obj.touch()
@classmethod
[docs] def make_directory(cls, dir_path, force=False): try: check_file_existence(dir_path) except FileNotFoundError: pass else: logger.debug("directory already exists: " + dir_path) return path.Path(dir_path) logger.debug("make directory: " + dir_path) path_obj = path.Path(dir_path) if any([not cls.__dry_run, force]): return path_obj.makedirs_p() return path_obj
@classmethod
[docs] def copy_file(cls, src_path, dst_path): import shutil try: check_file_existence(src_path) except FileNotFoundError: _, e, _ = sys.exc_info() # for python 2.5 compatibility logger.debug(e) return False logger.debug("copy: %s -> %s" % (src_path, dst_path)) if cls.__dry_run: return True try: shutil.copyfile(src_path, dst_path) except (shutil.Error, IOError): _, e, _ = sys.exc_info() # for python 2.5 compatibility logger.debug("skip copy: " + str(e)) return True
@classmethod
[docs] def moveFile(cls, src_path, dst_path): import shutil try: check_file_existence(src_path) except FileNotFoundError: _, e, _ = sys.exc_info() # for python 2.5 compatibility logger.debug(e) return False if os.path.realpath(src_path) == os.path.realpath(dst_path): logger.debug("%s and %s are the same file" % ( src_path, dst_path)) return True logger.info("move: %s -> %s" % (src_path, dst_path)) if not cls.__dry_run: shutil.move(src_path, dst_path) return True
@classmethod
[docs] def chmod(cls, path, permission_text): """ :param str permission_text: "ls -l" style permission string. e.g. -rw-r--r-- """ try: check_file_existence(path) except FileNotFoundError: _, e, _ = sys.exc_info() # for python 2.5 compatibility logger.debug(e) return False logger.debug("chmod %s %s" % (path, permission_text)) os.chmod(path, parseLsPermissionText(permission_text))
@classmethod
[docs] def rename(cls, src_path, dst_path): try: check_file_existence(src_path) except (InvalidFilePathError, FileNotFoundError): _, e, _ = sys.exc_info() # for python 2.5 compatibility logger.exception(e) return False if thutils.common.is_empty_string(dst_path): logger.error("empty destination path") return False if os.path.lexists(dst_path): logger.error("'%s' already exists" % (dst_path)) return False logger.info("rename: %s -> %s" % (src_path, dst_path)) if not cls.__dry_run: os.rename(src_path, dst_path) return True
@classmethod
[docs] def remove_directory(cls, path): try: file_type = check_file_existence(path) except (InvalidFilePathError, FileNotFoundError): return True if file_type not in [FileType.DIRECTORY]: logger.error("not a directory: '%s'" % (path)) return False logger.debug("remove directory: " + path) if cls.__dry_run: return True try: import shutil shutil.rmtree(path, False) except (ImportError, IOError): _, e, _ = sys.exc_info() # for python 2.5 compatibility logger.exception(e) return False return True
@classmethod
[docs] def remove_file(cls, path): try: file_type = check_file_existence(path) except (InvalidFilePathError, FileNotFoundError): return True if file_type not in [FileType.FILE, FileType.LINK]: logger.error("not a file: '%s'" % (path)) return False if cls.__dry_run: return True try: os.remove(path) except Exception: _, e, _ = sys.exc_info() # for python 2.5 compatibility logger.exception(e) return False return True
@classmethod
[docs] def remove_object(cls, path): try: file_type = check_file_existence(path) except (InvalidFilePathError, FileNotFoundError): return True if file_type not in [FileType.FILE, FileType.LINK]: return cls.remove_file(path) if file_type not in [FileType.DIRECTORY]: return cls.remove_directory(path) return False
@classmethod
[docs] def removeMatchFileRecursively(cls, search_dir_path, re_remove_list): logger.debug("remove matched file: search-root=%s, re=%s" % ( search_dir_path, str(re_remove_list))) re_compile_list = [ re.compile(re_pattern) for re_pattern in re_remove_list if thutils.common.is_not_empty_string(remove_fileattern) ] dict_result_pathlist = {} for dir_path, _dir_name_list, filename_list in os.walk(search_dir_path): for filename in filename_list: for re_pattern in re_compile_list: if re_pattern.search(filename): break else: continue remove_path = os.path.join(dir_path, filename) result = cls.remove_object(remove_path) dict_result_pathlist.setdefault(result, []).append(remove_path) thutils.common.debug_dict(dict_result_pathlist, locals()) return dict_result_pathlist
@classmethod
[docs] def removeMatchDirectory(cls, search_dir_path, re_target_list): import shutil logger.debug("search-root=%s, target=%s" % ( search_dir_path, str(re_target_list))) dict_result_pathlist = {} for dir_path, dir_name_list, _filename_list in os.walk(search_dir_path): for dir_name in dir_name_list: for re_text in re_target_list: if re_text is None: logger.debug("null regular expression") continue if re.search(re_text, dir_name): break else: continue remove_path = os.path.join(dir_path, dir_name) result = False logger.debug("remove directory: " + remove_path) if not cls.__dry_run: try: shutil.rmtree(remove_path, False) result = True except (OSError, os.error): # for python 2.5 compatibility _, e, _ = sys.exc_info() logger.exception(e) dict_result_pathlist.setdefault(result, []).append(remove_path) return dict_result_pathlist
[docs]def validate_path(input_path): if thutils.common.is_empty_string(input_path): raise NullPathError() match = re.search( "[%s]" % (re.escape(__INVALID_PATH_CHARS)), os.path.basename(input_path)) if match is not None: raise InvalidFilePathError( "invalid char found in file name: '%s'" % (re.escape(match.group())))
[docs]def check_file_existence(path): """ :return: FileType :rtype: int :raises InvalidFilePathError: :raises FileNotFoundError: :raises RuntimeError: """ validate_path(path) if not os.path.lexists(path): raise FileNotFoundError(path) if os.path.isfile(path): logger.debug("file found: " + path) return FileType.FILE if os.path.isdir(path): logger.debug("directory found: " + path) return FileType.DIRECTORY if os.path.islink(path): logger.debug("link found: " + path) return FileType.LINK raise RuntimeError()
[docs]def findFile(search_root_dir_path, re_pattern_text): result = findFileAll( search_root_dir_path, os.path.isfile, re_pattern_text, find_count=1) if thutils.common.is_empty_list_or_tuple(result): return None return result[0]
[docs]def findFileAll( search_root_dir_path, check_func, re_pattern_text, find_count=six.MAXSIZE):
re_compile = re.compile(re_pattern_text) path_list = [] for dir_path, dir_name_list, filename_list in os.walk(search_root_dir_path): for file_name in dir_name_list + filename_list: path = os.path.join(dir_path, file_name) if not check_func(path): continue if re_compile.search(file_name) is None: continue path_list.append(path) if len(path_list) >= find_count: return path_list logger.debug("find file result: count=%d, files=(%s)" % ( len(path_list), ", ".join(path_list))) return path_list
[docs]def findDirectory(search_root_dir_path, re_pattern, find_count=-1): result = findFileAll( search_root_dir_path, os.path.isdir, re_pattern, find_count=1) if thutils.common.is_empty_list_or_tuple(result): return None return result[0]
[docs]def sanitize_file_name(path, replacement_text=""): path = path.strip() re_replace = re.compile("[%s]" % re.escape(__INVALID_PATH_CHARS)) return re_replace.sub(replacement_text, path)
[docs]def replace_symbol(file_name, replacement_text=""): fname = sanitize_file_name(file_name, replacement_text) if fname is None: return None re_replace = re.compile("[%s]" % re.escape(" ,.%()/")) return re_replace.sub(replacement_text, fname)
[docs]def parsePermission3Char(permission): """ 'rwx' 形式のアクセス権限文字列 permission を8進数形式に変換する :return: :rtype: int """ if len(permission) != 3: raise ValueError(permission) permission_int = 0 if permission[0] == "r": permission_int += 4 if permission[1] == "w": permission_int += 2 if permission[2] == "x": permission_int += 1 return permission_int
[docs]def parseLsPermissionText(permission_text): """ parse "ls -l" style permission text: e.g. -rw-r--r-- """ from six.moves import range match = re.search("[-drwx]+", permission_text) if match is None: raise ValueError( "invalid permission character: " + permission_text) if len(permission_text) != 10: raise ValueError( "invalid permission text length: " + permission_text) permission_text = permission_text[1:] return int( "0" + "".join([ str(parsePermission3Char(permission_text[i:i + 3])) for i in range(0, 9, 3) ]), base=8)