Source code for pystratum_mysql.helper.MySqlRoutineLoaderHelper

import re
from typing import Dict, Optional, Any

from mysql import connector
from pystratum_backend.StratumStyle import StratumStyle

from pystratum_common.exception.LoaderException import LoaderException
from pystratum_common.helper.DataTypeHelper import DataTypeHelper
from pystratum_common.helper.RoutineLoaderHelper import RoutineLoaderHelper
from pystratum_mysql.helper.MySqlDataTypeHelper import MySqlDataTypeHelper
from pystratum_mysql.MySqlMetadataDataLayer import MySqlMetadataDataLayer


[docs]class MySqlRoutineLoaderHelper(RoutineLoaderHelper): """ Class for loading a single stored routine into a MySQL instance from a (pseudo) SQL file. """ # ------------------------------------------------------------------------------------------------------------------ def __init__(self, io: StratumStyle, dl: MySqlMetadataDataLayer, routine_filename: str, routine_file_encoding: str, pystratum_old_metadata: Optional[Dict], replace_pairs: Dict[str, Any], rdbms_old_metadata: Optional[Dict], sql_mode: str, character_set: str, collate: str): """ Object constructor. :param PyStratumStyle io: The output decorator. :param MySqlMetadataDataLayer dl: The metadata layer. :param str routine_filename: The filename of the source of the stored routine. :param str routine_file_encoding: The encoding of the source file. :param dict pystratum_old_metadata: The metadata of the stored routine from PyStratum. :param dict[str,str] replace_pairs: A map from placeholders to their actual values. :param dict rdbms_old_metadata: The old metadata of the stored routine from MS SQL Server. :param str sql_mode: The SQL mode under which the stored routine must be loaded and run. :param str character_set: The default character set under which the stored routine must be loaded and run. :param str collate: The default collate under which the stored routine must be loaded and run. """ RoutineLoaderHelper.__init__(self, io, routine_filename, routine_file_encoding, pystratum_old_metadata, replace_pairs, rdbms_old_metadata) self._character_set: str = character_set """ The default character set under which the stored routine will be loaded and run. """ self._collate: str = collate """ The default collate under which the stored routine will be loaded and run. """ self._sql_mode: str = sql_mode """ The SQL-mode under which the stored routine will be loaded and run. """ self._dl: MySqlMetadataDataLayer = dl """ The metadata layer. """ # ------------------------------------------------------------------------------------------------------------------ def _get_bulk_insert_table_columns_info(self) -> None: """ Gets the column names and column types of the current table for bulk insert. """ table_is_non_temporary = self._dl.check_table_exists(self._table_name) if not table_is_non_temporary: self._dl.call_stored_routine(self._routine_name) columns = self._dl.describe_table(self._table_name) tmp_column_types = [] tmp_fields = [] n1 = 0 for column in columns: prog = re.compile('(\\w+)') c_type = prog.findall(column['Type']) tmp_column_types.append(c_type[0]) tmp_fields.append(column['Field']) n1 += 1 n2 = len(self._columns) if not table_is_non_temporary: self._dl.drop_temporary_table(self._table_name) if n1 != n2: raise LoaderException("Number of fields %d and number of columns %d don't match." % (n1, n2)) self._columns_types = tmp_column_types self._fields = tmp_fields # ------------------------------------------------------------------------------------------------------------------ def _get_data_type_helper(self) -> DataTypeHelper: """ Returns a data type helper object for MySQL. :rtype: DataTypeHelper """ return MySqlDataTypeHelper() # ------------------------------------------------------------------------------------------------------------------ def _get_name(self) -> None: """ Extracts the name of the stored routine and the stored routine type (i.e. procedure or function) source. """ prog = re.compile("create\\s+(procedure|function)\\s+([a-zA-Z0-9_]+)") matches = prog.findall(self._routine_source_code) if matches: self._routine_type = matches[0][0].lower() if self._routine_name != matches[0][1]: raise LoaderException('Stored routine name {0} does not match filename in file {1}'. format(matches[0][1], self._source_filename)) if not self._routine_type: raise LoaderException('Unable to find the stored routine name and type in file {0}'. format(self._source_filename)) # ------------------------------------------------------------------------------------------------------------------ def _get_routine_parameters_info(self) -> None: """ Retrieves information about the stored routine parameters from the meta data of MySQL. """ routine_parameters = self._dl.get_routine_parameters(self._routine_name) for routine_parameter in routine_parameters: if routine_parameter['parameter_name']: value = routine_parameter['column_type'] if 'character_set_name' in routine_parameter: if routine_parameter['character_set_name']: value += ' character set %s' % routine_parameter['character_set_name'] if 'collation' in routine_parameter: if routine_parameter['character_set_name']: value += ' collation %s' % routine_parameter['collation'] self._parameters.append({'name': routine_parameter['parameter_name'], 'data_type': routine_parameter['parameter_type'], 'numeric_precision': routine_parameter['numeric_precision'], 'numeric_scale': routine_parameter['numeric_scale'], 'data_type_descriptor': value}) # ------------------------------------------------------------------------------------------------------------------ def _is_start_of_stored_routine(self, line: str) -> bool: """ Returns True if a line is the start of the code of the stored routine. :param str line: The line with source code of the stored routine. :rtype: bool """ return re.match(r'^\s*create\s+(procedure|function)', line, re.IGNORECASE) is not None # ------------------------------------------------------------------------------------------------------------------ def _is_start_of_stored_routine_body(self, line: str) -> bool: """ Returns True if a line is the start of the body of the stored routine. :param str line: The line with source code of the stored routine. :rtype: bool """ return re.match(r'^\s*begin', line, re.IGNORECASE) is not None # ------------------------------------------------------------------------------------------------------------------ def _load_routine_file(self) -> None: """ Loads the stored routine into the MySQL instance. """ self._io.text('Loading {0} <dbo>{1}</dbo>'.format(self._routine_type, self._routine_name)) self._unset_magic_constants() self._drop_routine() self._dl.set_sql_mode(self._sql_mode) self._dl.set_character_set(self._character_set, self._collate) self._dl.execute_none(self._routine_source_code) # ------------------------------------------------------------------------------------------------------------------ def _log_exception(self, exception: Exception) -> None: """ Logs an exception. :param Exception exception: The exception. """ RoutineLoaderHelper._log_exception(self, exception) if isinstance(exception, connector.errors.Error): if exception.errno == 1064: # Exception is caused by an invalid SQL statement. sql = self._dl.last_sql() if sql: sql = sql.strip() # The format of a 1064 message is: %s near '%s' at line %d parts = re.search(r'(\d+)$', exception.msg) if parts: error_line = int(parts.group(1)) else: error_line = 0 self._print_sql_with_error(sql, error_line) # ------------------------------------------------------------------------------------------------------------------ def _must_reload(self) -> bool: """ Returns True if the source file must be load or reloaded. Otherwise returns False. :rtype: bool """ if not self._pystratum_old_metadata: return True if self._pystratum_old_metadata['timestamp'] != self._m_time: return True if self._pystratum_old_metadata['replace']: for key, value in self._pystratum_old_metadata['replace'].items(): if key.lower() not in self._replace_pairs or self._replace_pairs[key.lower()] != value: return True if not self._rdbms_old_metadata: return True if self._rdbms_old_metadata['sql_mode'] != self._sql_mode: return True if self._rdbms_old_metadata['character_set_client'] != self._character_set: return True if self._rdbms_old_metadata['collation_connection'] != self._collate: return True return False # ------------------------------------------------------------------------------------------------------------------ def _drop_routine(self) -> None: """ Drops the stored routine if it exists. """ if self._rdbms_old_metadata: self._dl.drop_stored_routine(self._rdbms_old_metadata['routine_type'], self._routine_name)
# ----------------------------------------------------------------------------------------------------------------------