Source code for thutils.binary_writer

'''
@author: Tsuyoshi Hombashi
'''

import os
import random

import six
from six.moves import range

import thutils.common as common
from thutils.logger import logger


[docs]class BinaryWriter: __TABLE_SIZE = 16 __DEFAULT_IO_SIZE = 64 * 1024 # [byte] __MAX_IO_SIZE = 1 * 1024 ** 2 # [byte] __BYTE_CONTINUITY = 30 # [%] def __init__( self, io_size_byte=__DEFAULT_IO_SIZE, byte_continuity=__BYTE_CONTINUITY): self.__verify_size( io_size_byte, min_value=1, max_value=self.__MAX_IO_SIZE) self.__io_size_byte = int(io_size_byte) # [byte] self.__byte_continuity = byte_continuity # [%] # get open flag --- self.__open_flag = self.__get_open_flag() # get minimum write data block size --- self.__min_data_block_size = 1024 while True: if self.__min_data_block_size < io_size_byte: break self.__min_data_block_size = int(self.__min_data_block_size / 2) logger.debug("""BinaryWriter initialize: io-size=%d[byte] min-block-size=%d[byte] """ % (io_size_byte, self.__min_data_block_size) ) # make write data --- random.seed() self.__make_binary_data_table()
[docs] def write_binary( self, file_path, write_size_byte, permission="-rw-r--r--"):
return self.__write_binary( file_path, write_size_byte, permission)
[docs] def get_write_binary_data(self, size_byte): remain_byte = int(size_byte) return_data_list = [] work_data_list = None while True: if remain_byte < self.__min_data_block_size: return_data_list.extend( self.__get_raw_byte_data_list(remain_byte)) break data_idx = random.randint(0, self.__TABLE_SIZE - 1) if any([work_data_list is None, not self.__is_continue_byte()]): work_data_list = self.__bin_block_data_list[data_idx] return_data_list.extend(work_data_list) remain_byte -= self.__min_data_block_size return return_data_list
@staticmethod def __get_open_flag(): import platform os_type = platform.system() if os_type == "Linux": return os.O_WRONLY | os.O_CREAT elif os_type == "Windows": return os.O_WRONLY | os.O_CREAT | os.O_BINARY | os.O_SEQUENTIAL else: raise OSError("not supported os: " + os_type) def __is_continue_byte(self): return random.randint(0, 99) < self.__byte_continuity def __get_block_size_byte(self, bytes_to_write): if bytes_to_write <= 0: return 0 block_size_candidate = self.__io_size_byte while True: if bytes_to_write >= block_size_candidate: return int(block_size_candidate) block_size_candidate *= 0.5 if block_size_candidate < 1.0: break return 0 def __get_raw_byte_data_list(self, data_size_byte): import struct bin_data_list = [] bin_data = struct.pack("B", random.randint(1, 255)) for _i in range(data_size_byte): if not self.__is_continue_byte(): bin_data = struct.pack("B", random.randint(1, 255)) bin_data_list.append(bin_data) return bin_data_list def __make_binary_data_table(self): self.__bin_block_data_list = [ self.__get_raw_byte_data_list(self.__min_data_block_size) for _i in range(self.__TABLE_SIZE) ] def __verify_size(self, size, min_value=None, max_value=None): if common.is_float(min_value): if size < min_value: raise ValueError( "invalid size: expected>%d, value=%d" % (min_value, size)) if common.is_float(max_value): if size > max_value: raise ValueError( "invalid size: expected<%d, value=%d" % (max_value, size)) def __write_binary( self, file_path, write_size_byte, permission): import thutils.gfile self.__verify_size(write_size_byte, min_value=1) fd = None write_size_byte_count = 0 try: fd = os.open(file_path, self.__open_flag) bin_data_list = None while True: bytes_to_write = write_size_byte - write_size_byte_count if bytes_to_write <= 0: break io_size_byte = min( int(self.__io_size_byte), self.__get_block_size_byte(bytes_to_write)) self.__verify_size(io_size_byte, min_value=1) if any([bin_data_list is None, not self.__is_continue_byte()]): bin_data_list = self.get_write_binary_data(io_size_byte) actual_write_byte = os.write(fd, six.b("").join(bin_data_list)) if io_size_byte > actual_write_byte: raise IOError( "write failed: write-size=%d[byte] > actual-write=%d[byte]" % ( io_size_byte, actual_write_byte)) write_size_byte_count += actual_write_byte os.close(fd) fd = None thutils.gfile.FileManager.chmod(file_path, permission) return write_size_byte_count finally: if fd is not None: os.close(fd) fd = None