Source code for thutils.sqlite

# encoding: utf-8

@author: Tsuyoshi Hombashi

import logging
import os
import re
import sys
import sqlite3

import six
from six.moves import range
from six.moves import map

import thutils
import thutils.common as common
import thutils.gfile as gfile
import thutils.gtime as gtime
from thutils.logger import logger


MEMORY_DB_NAME = ":memory:"

[docs]class SqlQuery: __RE_SANITIZE = re.compile("[%s]" % (re.escape("%/()[]<>.:;'!\# -+=\n\r"))) __RE_TABLE_STR = re.compile("[%s]" % (re.escape("%()-+/."))) __RE_TO_ATTR_STR = re.compile("[%s0-9\s#]" % (re.escape("[%()-+/.]"))) __VALID_WHERE_OPERATION_LIST = [ "=", "==", "!=", "<>", ">", ">=", "<", "<="] @classmethod
[docs] def sanitize(cls, query_item): return cls.__RE_SANITIZE.sub("", query_item)
[docs] def to_table_str(cls, name): if return "[%s]" % (name) if return "'%s'" % (name) return name
[docs] def to_attr_str(cls, name, operation_query=""): if sql_name = "[%s]" % (name) elif name == "join": sql_name = "[%s]" % (name) else: sql_name = name if common.is_not_empty_string(operation_query): sql_name = "%s(%s)" % (operation_query, sql_name) return sql_name
[docs] def to_attr_str_list(cls, name_list, operation_query=None): if common.is_empty_string(operation_query): return map(cls.to_attr_str, name_list) return [ "%s(%s)" % (operation_query, cls.to_attr_str(name)) for name in name_list ]
[docs] def to_value_str(cls, value): if value is None: return "NULL" if common.is_integer(value) or common.is_float(value): return str(value) return "'%s'" % (value)
[docs] def to_value_str_list(cls, value_list): return map(cls.to_value_str, value_list)
[docs] def make_select(cls, select, table, where=None, extra=None): """ SQLite query作成補助関数 :return: SQLite query string :rtype: str """ _validate_table_name(table) query_list = [ "SELECT " + select, "FROM " + cls.to_table_str(table), ] if common.is_not_empty_string(where): query_list.append("WHERE " + where) if common.is_not_empty_string(extra): query_list.append(extra) return " ".join(query_list)
[docs] def make_insert(cls, table_name, insert_tuple, is_insert_many=False): _validate_table_name(table_name) table_name = cls.to_table_str(table_name) if common.is_empty_list_or_tuple(insert_tuple): raise ValueError("empty insert list/tuple") if is_insert_many: value_list = ['?' for _i in insert_tuple] else: value_list = [str(value) for value in insert_tuple] return "insert into %s values (%s)" % ( table_name, ",".join(value_list))
[docs] def make_where(cls, key, value, operation="="): if operation not in cls.__VALID_WHERE_OPERATION_LIST: raise ValueError("operation not supported: " + str(operation)) # if value is None: # value = "NULL" return "%s %s %s" % ( cls.to_attr_str(key), operation, cls.to_value_str(value))
[docs] def make_where_in(cls, key, value_list): return "%s IN (%s)" % ( cls.to_attr_str(key), ", ".join(cls.to_value_str_list(value_list)))
[docs] def make_where_not_in(cls, key, value_list): return "%s NOT IN (%s)" % ( cls.to_attr_str(key), ", ".join(cls.to_value_str_list(value_list)))
[docs] def make_datetimerange_where_list( cls, datetime_range, start_attr_name, end_attr_name): import copy if datetime_range is None: return [] where_query_list = [] dtr_work = copy.copy(datetime_range) dtr_work.time_format = DB_DATETIME_FORMAT if gtime.is_datetime(dtr_work.start_datetime): where_query = cls.make_where( start_attr_name, dtr_work.getStartTimeText(), operation=">=") where_query_list.append(where_query) if gtime.is_datetime(datetime_range.end_datetime): where_query = cls.make_where( end_attr_name, dtr_work.getEndTimeText(), operation="<=") where_query_list.append(where_query) return where_query_list
[docs]def getListFromQueryResult(result): """ :argument: Return value from a SQLite query response :rtype: list """ return [record[0] for record in result]
[docs]def copy_table(con_src, con_dst, table_name): _validate_table_name(table_name) if con_src is None: logger.error("null source database") return False if con_dst is None: logger.error("null destination database") return False if not con_src.has_table(table_name): logger.warn( "not found '%s' table in %s" % (table_name, con_src.database_path)) return False logger.debug("copy '%s' table: %s to %s" % ( table_name, con_src.database_path, con_dst.database_path)) result = con_src.execute_select(select="*", table=table_name) if result is None: return False value_matrix = result.fetchall() return con_dst.create_table_with_data( table_name, con_src.get_attribute_name_list(table_name), value_matrix)
[docs]def append_table(con_src, con_dst, table_name): _validate_table_name(table_name) logger.debug("append '%s' table: %s -> %s" % ( table_name, con_src.database_path, con_dst.database_path)) result = con_src.execute_select(select="*", table=table_name) if result is None: return False value_matrix = [value_list for value_list in result.fetchall()] if not con_dst.has_table(table_name): con_dst.create_table_with_data( table_name, con_src.get_attribute_name_list(table_name), value_matrix) else: con_dst.execute_insert_many(table_name, value_matrix) return True
# class ---
[docs]class DatabaseError(Exception): pass
[docs]class NullDatabaseConnectionError(Exception): pass
[docs]class TableNotFoundError(Exception): pass
[docs]class AttributeNotFoundError(Exception): pass
[docs]class MissmatchError(Exception): pass
[docs]class SqliteWrapper(object): ''' wrapper class of sqlite3 ''' # Table Nmae TN_DB_INFO = "database_information" TN_TABLE_CONFIG = "table_configuration" TN_SQL_PROFILE = "sql_profile" MISC_TABLE_LIST = [ TN_TABLE_CONFIG, TN_DB_INFO, TN_SQL_PROFILE, ] # Attribute Name AN_DB_NAME = "database_name" AN_DB_VERSION = "database_version" AN_DB_CREATE_TIME = "database_create_time" @property def database_path(self): return self.__database_path @property def connection(self): return self.__connection @property def mode(self): return self.__mode @property def dry_run(self): return self.__dry_run def __init__(self, dry_run=False, profile=False): self.cursur = None self.auto_commit = True self.sql_debug = False self.is_logging_query = False self.__dry_run = dry_run self.__database_path = None self.__connection = None self.__mode = None self.__is_profile = profile self.__dict_query_count = {} self.__dict_query_totalexectime = {} def __del__(self): self.close() def __exit__(self, exc_type, exc_value, traceback): self.close()
[docs] def is_connected(self): if self.dry_run: return True try: self.check_connection() except NullDatabaseConnectionError: return False return True
[docs] def check_connection(self): """ :raises NullDatabaseConnectionError: if not connected to a SQLite database file """ if self.dry_run: return if self.connection is None: raise NullDatabaseConnectionError("null database connection") if common.is_empty_string(self.database_path): raise NullDatabaseConnectionError("null database file path")
[docs] def check_database_name(self, expected_name): """ :raises TableNotFoundError: table not found in the database :raises AttributeError: attribute not found in the database :raises MissmatchError: database name missmatch found """ table_name = self.TN_DB_INFO self.check_connection() self.verify_table_existence(table_name) database_name = self.get_database_name() if common.is_empty_string(database_name): message = "'%s' attribute not found in '%s' table" % ( self.AN_DB_NAME, table_name) raise AttributeError(message) if database_name != expected_name: dict_msg = { "name": database_name, "expected": expected_name, "database": self.database_path, } message = "mismatch database name: " + common.dump_dict(dict_msg) raise MissmatchError(message) logger.debug("valid database name: %s" % (self.TN_DB_INFO))
[docs] def check_database_version(self, compare_version): """ :raises TableNotFoundError: table not found in the database :raises AttributeError: attribute not found in the database :raises MissmatchError: database version miss match found """ self.check_connection() database_version = self.get_database_version() if common.is_empty_string(database_version): message = "%s attribute not found in %s table" % ( self.AN_DB_VERSION, self.TN_DB_INFO) raise AttributeError(message) if common.compare_version(database_version, compare_version) != 0: message = "mismatch database version: %s and %s" % ( database_version, compare_version) raise MissmatchError(message) logger.debug( "valid %s version: %s" % (self.TN_DB_INFO, database_version))
[docs] def checkAccessPermission(self, valid_permission_list): if self.mode is None: return if self.mode not in valid_permission_list: raise thutils.PermissionError(str(valid_permission_list))
[docs] def connect(self, database_path, mode="w"): """ :param str mode: "r": read only "w": DBファイルを新規作成する。既存ファイルは削除する。 "a": 既存DBファイルに追記する。 :raises TypeError: - database_path が文字列でない - mode が文字列でない :raises FileNotFoundError: database_path not found :raises InvalidFilePathError: database_path is invalid :raises ValueError: - database_path がディレクトリ - mode が未サポートの値 :raises DatabaseError: file is not a valid SQLite binary file """ if database_path != MEMORY_DB_NAME: gfile.validate_path(database_path) self.close() if mode == "r": self.__verify_sqlite_db_file(database_path) elif mode in ["w", "a"]: if database_path != MEMORY_DB_NAME: try: gfile.check_file_existence(database_path) except gfile.FileNotFoundError: pass if mode == "w": gfile.FileManager.remove_file(database_path) else: raise ValueError("unknown connection mode: " + mode) self.__database_path = os.path.realpath(database_path) logger.debug("connect to " + database_path) if self.dry_run: return self.__connection = sqlite3.connect( database_path, #detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, cached_statements=256) self.__mode = mode
#self.__connection.row_factory = sqlite3.Row #sqlite3.dbapi2.converters['DATETIME'] = sqlite3.dbapi2.converters['TIMESTAMP']
[docs] def execute_select(self, select, table, where=None, extra=None): query = SqlQuery.make_select(select, table, where, extra) if self.sql_debug: logger.debug(query) result = self.__execute(query, logging.getLogger().findCaller()) if result is None: message = "null result: query='%s', db=%s" % ( query, self.database_path) logger.debug(message) return None return result
[docs] def execute_insert(self, table_name, insert_record): self.checkAccessPermission(["w", "a"]) query = SqlQuery.make_insert( table_name, insert_record) self.__logging("execute query: %s %s" % (query, insert_record)) if self.dry_run: return True self.__execute(query, logging.getLogger().findCaller()) return True
[docs] def execute_insert_many(self, table_name, insert_record_list): self.checkAccessPermission(["w", "a"]) logger.debug("insert %d records to '%s'" % ( len(insert_record_list), table_name)) self.verify_table_existence(table_name) if common.is_empty_list_or_tuple(insert_record_list): logger.debug("empty record list") return True query = SqlQuery.make_insert( table_name, insert_record_list[0], is_insert_many=True) self.__logging("insert many record: query=%s, size=%d, db=%s" % ( query, len(insert_record_list), self.database_path)) if self.dry_run: return True try: self.connection.executemany(query, insert_record_list) return True except Exception: _, e, _ = sys.exc_info() # for python 2.5 compatibility caller = logging.getLogger().findCaller() file_path, line_no, func_name = caller[:3] logger.error( "%s(%d) %s: failed to execute query:\n" % ( file_path, line_no, func_name) + " query=%s\n" % (query) + " msg='%s'\n" % (str(e)) + " db=%s\n" % (self.database_path) + " records=%s\n" % (insert_record_list[:2]) ) return False
[docs] def get_total_changes(self): if self.dry_run: return 0 self.check_connection() return self.connection.total_changes
[docs] def getValue(self, select, table, where=None, extra=None): query = SqlQuery.make_select(select, table, where, extra) result = self.__execute(query, logging.getLogger().findCaller()) if result is None: return None return common.get_list_item(result.fetchone(), 0)
[docs] def getValueList(self, select, table, where=None, extra=None): query = SqlQuery.make_select(select, table, where, extra) result = self.__execute(query, caller=logging.getLogger().findCaller()) if result is None: return [] return getListFromQueryResult(result.fetchall())
[docs] def get_database_name(self): table_name = self.TN_DB_INFO self.verify_table_existence(table_name) return self.getValue( select=common.AN_GENEARAL_VALUE, table=table_name, where=SqlQuery.make_where(common.AN_GENERAL_KEY, self.AN_DB_NAME))
[docs] def get_database_version(self): table_name = self.TN_DB_INFO self.verify_table_existence(table_name) return self.getValue( select=common.AN_GENEARAL_VALUE, table=table_name, where=SqlQuery.make_where(common.AN_GENERAL_KEY, self.AN_DB_VERSION))
[docs] def get_table_name_list(self): self.check_connection() query = "SELECT name FROM sqlite_master WHERE TYPE='table'" result = self.__execute(query, logging.getLogger().findCaller()) if result is None: return [] return getListFromQueryResult(result.fetchall())
[docs] def get_attribute_name_list(self, table_name): if not self.has_table(table_name): raise TableNotFoundError("'%s' table not found in %s" % ( table_name, self.database_path)) query = "SELECT * FROM '%s'" % (table_name) result = self.__execute(query, logging.getLogger().findCaller()) return getListFromQueryResult(result.description)
[docs] def get_attribute_type_list(self, table_name): if not self.has_table(table_name): raise TableNotFoundError("'%s' table not found in %s" % ( table_name, self.database_path)) attribute_name_list = self.get_attribute_name_list(table_name) query = "SELECT DISTINCT %s FROM '%s'" % ( ",".join([ "TYPEOF(%s)" % (SqlQuery.to_attr_str(attribute)) for attribute in attribute_name_list]), table_name) result = self.__execute(query, logging.getLogger().findCaller()) return result.fetchone()
[docs] def get_profile(self, get_profile_count=50): con_tmp = connect_sqlite_db_mem() value_matrix = [] for query, execute_time in six.iteritems(self.__dict_query_totalexectime): call_count = self.__dict_query_count.get(query, 0) value_list = [query, execute_time, call_count] value_matrix.append(value_list) attribute_name_list = ["query", "execution_time", "count"] con_tmp.create_table_with_data( self.TN_SQL_PROFILE, attribute_name_list, data_matrix=value_matrix) con_tmp.commit() try: result = con_tmp.execute_select( select="%s,SUM(%s),SUM(%s)" % ( "query", "execution_time", "count"), table=self.TN_SQL_PROFILE, extra="GROUP BY %s ORDER BY %s DESC LIMIT %d" % ( "query", "execution_time", get_profile_count)) except sqlite3.OperationalError: return [], [] if result is None: return [], [] return attribute_name_list, result.fetchall()
[docs] def has_table(self, table_name): if self.dry_run: return True if common.is_empty_string(table_name): return False return table_name in self.get_table_name_list()
[docs] def has_attribute(self, table_name, attribute_name): return attribute_name in self.get_attribute_name_list(table_name)
[docs] def has_attribute_list(self, table_name, attribute_name_list): if common.is_empty_list_or_tuple(attribute_name_list): return False not_exist_field_list = [ attribute_name for attribute_name in attribute_name_list if not self.has_attribute(table_name, attribute_name) ] if len(not_exist_field_list) > 0: logger.warn( "attribute not exists: %s" % (", ".join(not_exist_field_list))) return False return True
[docs] def verify_table_existence(self, table_name): """ :raises TypeError: :raises TableNotFoundError: """ _validate_table_name(table_name) found_table = self.has_table(table_name) if found_table: msg_format = "'%s' table found in %s" else: msg_format = "'%s' table not found %s" message = msg_format % (table_name, self.database_path) if found_table: logger.debug(message) else: raise TableNotFoundError(message)
[docs] def verify_attribute_existence(self, table_name, attribute_name): """ :raises sTypeError: :raises TableNotFoundError: :raises AttributeNotFoundError: """ self.verify_table_existence(table_name) found_attr = self.has_attribute(table_name, attribute_name) if found_attr: msg_format = "'%s' attribute found in '%s' table" else: msg_format = "'%s' attribute not found in '%s' table" message = msg_format % (attribute_name, table_name) if found_attr: logger.debug(message) else: raise AttributeNotFoundError(message)
[docs] def drop_table(self, table_name, is_drop_existing_table=True): self.checkAccessPermission(["w", "a"]) if self.has_table(table_name) and is_drop_existing_table: logger.debug("drop table: " + table_name) query = "DROP TABLE IF EXISTS '%s'" % (table_name) self.__execute(query, logging.getLogger().findCaller()) self.commit()
[docs] def create_table(self, table_name, attribute_description_list): self.checkAccessPermission(["w", "a"]) table_name = table_name.strip() if self.has_table(table_name): logger.debug("table already exists: " + table_name) return True query = "CREATE TABLE IF NOT EXISTS '%s' (%s)" % ( table_name, ", ".join(attribute_description_list)) if self.__execute(query, logging.getLogger().findCaller()) is None: if not self.dry_run: return False return True
[docs] def createIndex(self, table_name, attribute_name): self.verify_table_existence(table_name) self.checkAccessPermission(["w", "a"]) index_name = "%s_%s_index" % ( SqlQuery.sanitize(table_name), SqlQuery.sanitize(attribute_name)) logger.debug( "create index: table='%s, attribute='%s', index_name=%s" % ( table_name, attribute_name, index_name)) query = "CREATE INDEX IF NOT EXISTS %s ON %s('%s')" % ( index_name, SqlQuery.to_table_str(table_name), attribute_name) self.__execute(query, logging.getLogger().findCaller())
[docs] def createIndexList(self, table_name, attribute_name_list): self.checkAccessPermission(["w", "a"]) if common.is_empty_list_or_tuple(attribute_name_list): return for attribute in attribute_name_list: self.createIndex(table_name, attribute)
[docs] def create_table_with_data( self, table_name, attribute_name_list, data_matrix, index_attribute_list=()): _validate_table_name(table_name) self.checkAccessPermission(["w", "a"]) strip_index_attribute_list = list( set(attribute_name_list).intersection(set(index_attribute_list))) if common.is_empty_list_or_tuple(data_matrix): msg = "null input data: '%s (%s)'" % ( table_name, ", ".join(attribute_name_list)) logger.warn(msg) return True self.__verify_value_matrix(attribute_name_list, data_matrix) attr_description_list = [] table_config_matrix = [] for col, value_type in sorted( six.iteritems(self.__get_column_valuetype(data_matrix))): attr_name = attribute_name_list[col] attr_description_list.append( "'%s' %s" % (attr_name, value_type)) table_config_matrix.append([ table_name, attr_name, value_type, True if attr_name in strip_index_attribute_list else False, ]) self.__create_table_config(table_config_matrix) self.create_table(table_name, attr_description_list) if not self.execute_insert_many(table_name, data_matrix): raise DatabaseError("failed to insert record to " + table_name) self.createIndexList(table_name, strip_index_attribute_list) self.__auto_commit() return True
[docs] def create_db_info_table(self, db_name, db_version): import datetime import thutils.environment self.checkAccessPermission(["w", "a"]) table_name = self.TN_DB_INFO if self.has_table(table_name): logger.debug("'%s' table already exists" % (table_name)) return True value_matrix = [ [self.AN_DB_NAME, db_name], [self.AN_DB_VERSION, db_version], [ self.AN_DB_CREATE_TIME, ], ] value_matrix.extend( thutils.environment.EnvironmentInfo.getGeneralInfoMatrix()) return self.create_table_with_data( table_name, common.KEY_VALUE_HEADER, value_matrix)
[docs] def rollback(self): try: self.check_connection() except NullDatabaseConnectionError: return True"rollback %s" % (self.database_path)) self.connection.rollback() return True
[docs] def commit(self): if self.dry_run: return True try: self.check_connection() except NullDatabaseConnectionError: return True logger.debug("commit to %s" % (self.database_path)) self.connection.commit() return True
[docs] def close(self): if self.dry_run: return True try: self.check_connection() except NullDatabaseConnectionError: return True self.commit() logger.debug("close database: path=%s, changes=%d" % ( self.database_path, self.get_total_changes())) self.connection.close() self.__init__() return True
def __auto_commit(self): if self.auto_commit: self.commit() @staticmethod def __verify_sqlite_db_file(database_path): if gfile.check_file_existence(database_path) == gfile.FileType.DIRECTORY: raise ValueError( "%s is not a database file, but a directory." % ( database_path)) connection = sqlite3.connect(database_path) try: connection.execute( "SELECT name FROM sqlite_master WHERE TYPE='table'") except: message = "invalid SQLite database: " + database_path raise DatabaseError(message) @staticmethod def __verify_value_matrix(field_list, value_matrix): miss_match_idx_list = [] for list_idx in range(len(value_matrix)): if len(field_list) == len(value_matrix[list_idx]): continue miss_match_idx_list.append(list_idx) if len(miss_match_idx_list) == 0: return sample_miss_match_list = value_matrix[miss_match_idx_list[0]] raise ValueError( "miss match header length and value length:" + " header: %d %s\n" % (len(field_list), str(field_list)) + " # of miss match line: %d ouf of %d\n" % ( len(miss_match_idx_list), len(value_matrix)) + " e.g. value at line=%d, len=%d: %s\n" % ( miss_match_idx_list[0], len(sample_miss_match_list), str(sample_miss_match_list)) ) @staticmethod def __get_value_type(value): if common.is_integer(value): return "INTEGER" if common.is_float(value): return "REAL" if gtime.is_datetime(value): return "DATETIME" return "TEXT" def __get_column_valuetype(self, value_matrix): """ get value type of column :return: { column_number : value_type } :rtype: dictionary """ dict_column_valuetype = {} for col, _ in enumerate(value_matrix[0]): dict_column_valuetype[col] = "INTEGER" col = 0 for col_value_list in zip(*value_matrix): for cursor_value in col_value_list: cursor_value_type = self.__get_value_type(cursor_value) if all([ cursor_value is not None, dict_column_valuetype[col] == "INTEGER", cursor_value_type == "INTEGER" ]): continue dict_column_valuetype[col] = cursor_value_type if cursor_value_type == "TEXT": break col += 1 return dict_column_valuetype def __logging(self, message): if not self.is_logging_query: return logger.debug(self, message) def __execute(self, query, caller=None): import time if caller is None: message = "execute query: " + query else: file_path, line_no, func_name = caller[:3] message = "execute query from %s(%d) %s: %s" % ( os.path.basename(os.path.realpath(file_path)), line_no, func_name, query) message += " (%s)" % (self.database_path) self.__logging(message) if self.dry_run: return None self.check_connection() if common.is_empty_string(query): return None if self.__is_profile: exec_start_time = time.time() try: result = self.connection.execute(query) except Exception: _, e, _ = sys.exc_info() # for python 2.5 compatibility if caller is None: caller = logging.getLogger().findCaller() file_path, line_no, func_name = caller[:3] message_list = [ "failed to execute query at %s(%d) %s" % ( file_path, line_no, func_name), " - query: %s" % (query), " - msg: %s" % (e), " - db: %s" % (self.database_path), ] logger.exception(e, os.linesep.join(message_list)) # return None raise e if self.__is_profile: self.__dict_query_count[ query] = self.__dict_query_count.get(query, 0) + 1 elapse_time = time.time() - exec_start_time self.__dict_query_totalexectime[query] = ( self.__dict_query_totalexectime.get(query, 0) + elapse_time) return result def __create_table_config(self, table_config_matrix): table_config_attr_list = [ "table name", "attribute name", "value type", "create index"] attr_description_list = [] for attr_name in table_config_attr_list: attr_description_list.append("'%s' %s" % (attr_name, "TEXT")) table_name = self.TN_TABLE_CONFIG if not self.has_table(table_name): self.create_table(table_name, attr_description_list) self.execute_insert_many(table_name, table_config_matrix)
def _validate_table_name(name): if common.is_empty_string(name): raise ValueError("table name is empty")
[docs]def connect_sqlite_db_mem(): con_mem = SqliteWrapper() con_mem.connect(MEMORY_DB_NAME) return con_mem
[docs]def connect_sqlite_database(db_path, mode, options=None): """ SQLiteデータベースファイルへのコネクションを返す。 """ dry_run = False profile = False sql_logging = False if options is not None: try: dry_run = options.dry_run except AttributeError: dry_run = False try: profile = options.profile except AttributeError: profile = False try: sql_logging = options.sql_logging except AttributeError: sql_logging = False con = SqliteWrapper(dry_run, profile) con.is_logging_query = sql_logging con.connect(db_path, mode) return con