# encoding: utf-8
'''
@author: Tsuyoshi Hombashi
'''
from __future__ import with_statement
import os.path
import re
import sys
import six
from thutils.logger import logger
# Attribute Name ---
AN_GENERAL_KEY = "key"
AN_GENEARAL_VALUE = "value"
KEY_VALUE_HEADER = [AN_GENERAL_KEY, AN_GENEARAL_VALUE]
# Regular Expression ---
RE_SPACE = re.compile("[\s]+")
# class ---
[docs]class NotInstallError(Exception):
pass
[docs]class BaseObject(object):
def __init__(self):
pass
def to_string(self):
return "%s: %s" % (
self.__class__.__name__, dump_dict(self.__dict__))
def debug(self, message=""):
logger.debug("%s: %s %s" % (
self.__class__.__name__, message, dump_dict(self.__dict__)))
[docs]class MinMaxObject(BaseObject):
@property
def min_value(self):
return self.__min_value
@property
def max_value(self):
return self.__max_value
def __init__(self):
self.__min_value = None
self.__max_value = None
def diff(self):
return self.max_value - self.min_value
def average(self):
return (self.max_value + self.min_value) * 0.5
def update(self, value):
if self.__min_value is None:
self.__min_value = value
else:
self.__min_value = min(self.__min_value, value)
if self.__max_value is None:
self.__max_value = value
else:
self.__max_value = max(self.__max_value, value)
# function ---
[docs]def is_integer(value):
if isinstance(value, six.integer_types):
return not isinstance(value, bool)
try:
int(value)
except:
return False
if isinstance(value, float):
return False
#text = str(value).strip()
# if re.search("[.]|e-", text) is not None:
# return False
return True
[docs]def is_hex(value):
try:
int(value, 16)
except (TypeError, ValueError):
return False
return True
[docs]def is_float(value):
if any([isinstance(value, float), value == float("inf")]):
return True
if isinstance(value, bool):
return False
try:
work = float(value)
if work == float("inf"):
return False
except:
return False
return True
[docs]def is_nan(value):
return value != value
[docs]def is_empty_string(value):
try:
return len(value.strip()) == 0
except AttributeError:
return True
[docs]def is_not_empty_string(value):
"""
空白文字(\0, \t, \n)を除いた文字数が0より大きければTrueを返す
"""
try:
return len(value.strip()) > 0
except AttributeError:
return False
def _is_list(value):
return isinstance(value, list)
def _is_tuple(value):
return isinstance(value, tuple)
[docs]def is_list_or_tuple(value):
return any([_is_list(value), _is_tuple(value)])
def is_empty_list_or_tuple(value):
return value is None or (_is_list(value) and len(value) == 0)
[docs]def is_empty_list_or_tuple(value):
return value is None or (is_list_or_tuple(value) and len(value) == 0)
[docs]def is_not_empty_list_or_tuple(value):
return is_list_or_tuple(value) and len(value) > 0
[docs]def safe_division(dividend, divisor):
"""
:return:
nan: invalid arguments
:rtype: float
"""
try:
divisor = float(divisor)
dividend = float(dividend)
except (TypeError, ValueError, AssertionError):
return float("nan")
try:
return dividend / divisor
except (ZeroDivisionError):
return float("nan")
[docs]def get_list_item(input_list, index):
if not is_integer(index):
return None
list_size = len(input_list)
if not (0 <= index < list_size):
message = "out of index: list=%s, size=%d, index=%s" % (
input_list, list_size, str(index))
#raise IndexError(message)
logger.debug(message)
return None
try:
return input_list[index]
except TypeError:
return None
[docs]def get_integer_digit(value):
import math
abs_value = abs(float(value))
if abs_value == 0:
return 1
return max(1, int(math.log10(abs_value) + 1.0))
def _get_decimal_places(value, integer_digits):
import math
from collections import namedtuple
from six.moves import range
float_digit_len = 0
if is_integer(value):
abs_value = abs(int(value))
else:
abs_value = abs(float(value))
text_value = str(abs_value)
float_text = 0
if text_value.find(".") != -1:
float_text = text_value.split(".")[1]
float_digit_len = len(float_text)
elif text_value.find("e-") != -1:
float_text = text_value.split("e-")[1]
float_digit_len = int(float_text) - 1
Threshold = namedtuple("Threshold", "pow digit_len")
upper_threshold = Threshold(pow=-2, digit_len=6)
min_digit_len = 1
treshold_list = [
Threshold(upper_threshold.pow + i, upper_threshold.digit_len - i)
for i, _ in enumerate(range(upper_threshold.digit_len, min_digit_len - 1, -1))
]
abs_digit = min_digit_len
for treshold in treshold_list:
if abs_value < math.pow(10, treshold.pow):
abs_digit = treshold.digit_len
break
return min(abs_digit, float_digit_len)
[docs]def get_number_of_digit(value):
try:
integer_digits = get_integer_digit(value)
except (ValueError, TypeError):
integer_digits = float("nan")
try:
decimal_places = _get_decimal_places(value, integer_digits)
except (ValueError, TypeError):
decimal_places = float("nan")
return (integer_digits, decimal_places)
[docs]def get_text_len(text):
try:
return len(str(text))
except UnicodeEncodeError:
return len(text)
except:
return 0
[docs]def removeItemFromList(item_list, item):
is_remove = False
if item in item_list:
item_list.remove(item)
is_remove = True
return is_remove
[docs]def removeListFromList(input_list, remove_list):
for remove_item in remove_list:
removeItemFromList(input_list, remove_item)
[docs]def convert_value(value):
if is_integer(value):
value = int(value)
elif is_float(value):
value = float(value)
return value
[docs]def diffItemList(item_list, remove_list):
# return list(set(item_list).difference(set(remove_list)))
work_list = list(item_list)
for remove_item in remove_list:
removeItemFromList(work_list, remove_item)
return work_list
def _unit_to_byte(unit, kilo_size):
if kilo_size not in [1000, 1024]:
raise ValueError("invalid kilo size: " + str(kilo_size))
re_exp_pair_list = [
[re.compile("^b$", re.IGNORECASE), 0],
[re.compile("^k$", re.IGNORECASE), 1],
[re.compile("^m$", re.IGNORECASE), 2],
[re.compile("^g$", re.IGNORECASE), 3],
[re.compile("^t$", re.IGNORECASE), 4],
[re.compile("^p$", re.IGNORECASE), 5],
]
for re_exp_pair in re_exp_pair_list:
re_pattern, exp = re_exp_pair
if re_pattern.search(unit):
return kilo_size ** exp
raise ValueError("unknown unit: %s" % (unit))
[docs]def humanreadable_to_byte(readable_size, kilo_size=1024):
"""
:param str: readable_size. human readable size (bytes). e.g. 256 M
:param int: size of kilo. 1024 or 1000
:raises: ValueError
"""
size = readable_size[:-1]
unit = readable_size[-1]
size = float(size)
unit = unit.lower()
if size < 0:
raise ValueError("minus size")
coefficient = _unit_to_byte(unit, kilo_size)
return size * coefficient
[docs]def humanreadable_to_kb(readable_size):
return humanreadable_to_byte(readable_size) / 1024
def _get_unit(byte):
kilo = 1024
unit_pair_list = [
[kilo ** 5, "PB"],
[kilo ** 4, "TB"],
[kilo ** 3, "GB"],
[kilo ** 2, "MB"],
[kilo ** 1, "KB"],
]
for unit_pair in unit_pair_list:
unit_byte, unit_name = unit_pair
if byte >= unit_byte:
return unit_byte, unit_name
return 1, "B"
[docs]def bytes_to_humanreadable(byte):
byte = int(byte)
if byte < 0:
raise ValueError("argument must be greatar than 0")
divisor, unit = _get_unit(byte)
if (byte % divisor) == 0 and byte >= 1:
value = str(int(byte / divisor))
else:
value = str(safe_division(byte, divisor))
return value + " " + unit
[docs]def strtobool_wrapper(value):
"""
try:
import distutils.util
return bool(distutils.util.strtobool(value))
except ImportError:
"""
re_true = re.compile("^true$", re.IGNORECASE)
re_false = re.compile("^false$", re.IGNORECASE)
if re_true.match(value):
return True
if re_false.match(value):
return False
raise ValueError("can not convert '%s' to bool" % (value))
[docs]def split_line_list(
line_list, re_line_separator=re.compile("^$"),
is_include_matched_line=False, is_strip=True):
block_list = []
block = []
for line in line_list:
if is_strip:
line = line.strip()
if re_line_separator.search(line):
if len(block) > 0:
block_list.append(block)
block = []
if is_include_matched_line:
block.append(line)
continue
block.append(line)
if len(block) > 0:
block_list.append(block)
return block_list
[docs]def is_install_command(command):
import subprocess
import platform
if platform.system() != "Linux":
return True
search_command = "type " + command.split()[0]
proc = subprocess.Popen(
search_command, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
is_command_found = proc.wait() == 0
if not is_command_found:
logger.debug("'%s' command not found" % (command))
return is_command_found
[docs]def verify_install_command(command_list):
not_installed_command_list = []
for command in command_list:
if not is_install_command(command):
not_installed_command_list.append(command)
if len(not_installed_command_list) > 0:
message = "command not found: %s" % (
", ".join(not_installed_command_list))
raise NotInstallError(message)
logger.debug("required commands are installed: " + ", ".join(command_list))
[docs]def command_to_filename(command, suffix=""):
import thutils.gfile as gfile
sep_char = "/\\"
command = command.strip()
filename = command.replace(" ", "_")
filename = filename.replace("-", "")
filename = filename.strip(sep_char).lstrip(sep_char)
filename = re.sub("[%s]" % re.escape("/\\"), "-", filename)
filename = gfile.sanitize_file_name(filename)
if is_not_empty_string(suffix):
filename += "_" + suffix
return filename
[docs]def compare_version(lhs_version, rhs_version):
"""
<Major>.<Minor>.<Revision> 形式のバージョン文字列を比較する。
:return:
0<: LHSがRHSより小さい
0: LHS == RHS
0>: LHSがRHSより大きい
:rtype: int
"""
lhs_major, lhs_minor, lhs_revision = [
int(v) for v in lhs_version.split(".")]
rhs_major, rhs_minor, rhs_revision = [
int(v) for v in rhs_version.split(".")]
if lhs_major < rhs_major:
return -1
elif lhs_major > rhs_major:
return 1
if lhs_minor < rhs_minor:
return -1
elif lhs_minor > rhs_minor:
return 1
if lhs_revision < rhs_revision:
return -1
elif lhs_revision > rhs_revision:
return 1
return 0
[docs]def get_execution_command():
def get_arg_text():
arg_list = []
for arg in sys.argv[1:]:
if is_integer(arg):
arg_list.append(arg)
continue
if RE_SPACE.search(arg):
arg = "'%s'" % (arg)
arg_list.append(arg)
return " ".join(arg_list)
return os.path.basename(sys.argv[0]) + " " + get_arg_text()
[docs]def sleep_wrapper(sleep_second, dry_run=False):
import time
if sleep_second == float("inf"):
# Process to maintain consistency between OS
# linux: raise IOError
# windows: raise OverflowError
raise OverflowError("sleep length is too large")
if is_nan(sleep_second):
# Process to maintain consistency between OS
# linux: raise IOError
# windows: not raise exception
raise IOError("Invalid argument")
sleep_second = float(sleep_second)
if sleep_second <= 0:
logger.debug("skip sleep")
return 0
if dry_run:
logger.debug("dry-run: skip sleep")
return 0
logger.debug("sleep %f seconds" % (sleep_second))
time.sleep(sleep_second)
return sleep_second
[docs]def get_var_name(var, symboltable):
for name, v in six.iteritems(symboltable):
if id(v) == id(var):
return name
[docs]def dump_dict(dict_input, indent=4):
"""
辞書型変数を文字列に変換して返す
"""
dict_work = dict(dict_input)
"""
for key, value in six.iteritems(dict_input):
if any([f(value) for f in (is_float, isDict, is_list_or_tuple)]):
dict_work[key] = value
continue
try:
dict_work[key] = str(value)
except:
dict_work[key] = str(type(value))
dict_work[key] = _convert_dump_dict(value)
"""
try:
import json
return json.dumps(dict_work, sort_keys=True, indent=indent)
except ImportError:
pass
try:
import simplejson as json
return json.dumps(dict_work, sort_keys=True, indent=indent)
except ImportError:
pass
logger.error("failed to import json library")
try:
import pprint
return pprint.pformat(dict_work, indent=indent)
except ImportError:
pass
return str(dict_work)
[docs]def debug_dict(dict_input, symbol_table, convert_func=dump_dict):
logger.debug("%s keys=%d %s" % (
get_var_name(dict_input, symbol_table),
len(dict_input), convert_func(dict_input)))