作者:王向爱可生 DBA 团队成员,负责公司 DMP 产品的运维和客户 mysql 问题的处理擅长数据库故障处理对数据库技术和 python 有着浓厚的兴趣,我来为大家科普一下关于源码分析mysql?以下内容希望对你有帮助!

源码分析mysql(技术分享mysqlreplicate)

源码分析mysql

作者:王向

爱可生 DBA 团队成员,负责公司 DMP 产品的运维和客户 mysql 问题的处理。擅长数据库故障处理。对数据库技术和 python 有着浓厚的兴趣。

本文来源:原创投稿

* 爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。


目录mysqlreplicate 介绍

his utility permits an administrator to setup and start replication from one server (the master) to another (the slave). The user provides login information for the slave and connection information for connecting to the master. It is also possible to specify a database to be used to test replication.

The utility reports conditions where the storage engines on the master and the slave differ for older versions of the server. It also reports a warning if the InnoDB storage engine type (plugin verus built-in) differs on the master and slave. For InnoDB to be the same, both servers must be running the same "type" of InnoDB (built-in or the InnoDB Plugin), and InnoDB on both servers must have the same major and minor version numbers and enabled state.

By default, the utility issues warnings for mismatches between the sets of storage engines, the default storage engine, and the InnoDB storage engine. To produce errors instead, use the --pedantic option, whichrequires storage engines to be the same on the master and slave.

The -vv option displays any discrepancies between the storage engines and InnoDB values, with or without the--pedantic option.

Replication can be started using one of the following strategies.

就是可以使用一条命令快速创建从库环境

使用方法

最常用的方式:

mysqlreplicate \ --master=root:123@10.186.65.20:3306 \ # user[:passwd]@host[:port][:socket] --slave=root:123@10.186.65.119:3306 \ --rpl-user=rpl:rpl \ # user[:password] -vv \ # 详细输出 --pedantic # Fail if both servers do not have the same set of storage engines, the same default storage engine, and the same InnoDB storage engine.

指定 pos 点:

The following command starts replication from the beginning of a specific master binary log file: shell> mysqlreplicate --master=root@localhost:3306 \ --slave=root@localhost:3307 --rpl-user=rpl:rpl \ --master-log-file=my_log.000003 # master on localhost: ... connected. # slave on localhost: ... connected. # Checking for binary logging on master... # Setting up replication... # ...done. The following command starts replication from specific master binary log coordinates (specific log file and position): shell> mysqlreplicate --master=root@localhost:3306 \ --slave=root@localhost:3307 --rpl-user=rpl:rpl \ --master-log-file=my_log.000001 --master-log-pos=96 # master on localhost: ... connected. # slave on localhost: ... connected. # Checking for binary logging on master... # Setting up replication... # ...done.

[root@wx-super ~]# mysqlreplicate --master=root:123@10.186.65.20:3306 --slave=root:123@10.186.65.119:3306 --rpl-user=rpl:rpl -vv --pedantic WARNING: Using a password on the command line interface can be insecure. # master on 10.186.65.20: ... connected. # slave on 10.186.65.119: ... connected. # master id = 159197357 # slave id = 567719145 # master uuid = fec2b773-1850-11ec-9db1-02000aba4114 # slave uuid = f2743970-1850-11ec-ab78-02000aba4177 # Checking innodb statistics for type and version conflicts. # Checking storage engines... # Checking for binary logging on master... # Setting up replication... # Connecting slave to master... # CHANGE MASTER TO MASTER_HOST = '10.186.65.20', MASTER_USER = 'rpl', MASTER_PASSWORD = 'rpl', MASTER_PORT = 3306, MASTER_AUTO_POSITION=1 # Starting slave from master's last position... # IO status: Waiting for master to send event # IO thread running: Yes # IO error: None # SQL thread running: Yes # SQL error: None # ...done.

mysqlreplicate 源码分析第一步:检查重要参数的唯一性

检查 server_id 唯一性

# Create an instance of the replication object rpl = Replication(master, slave, rpl_options) errors = rpl.check_server_ids() # 检查主服务器和从服务器的id for error in errors: # 1.master_server_id不能等于0 # 2.slave_server_id不能等于0 # 3.master_server_id不能等于slave_server_id print error # Check for server_id uniqueness if verbosity > 0: print "# master id = %s" % master.get_server_id() print "# slave id = %s" % slave.get_server_id() # check_server_ids方法解析 def check_server_ids(self): master_server_id = self.master.get_server_id() # 内部封装其实是执行SHOW VARIABLES LIKE 'server_id'的结果 slave_server_id = self.slave.get_server_id() if master_server_id == 0: raise UtilRplError("Master server_id is set to 0.") if slave_server_id == 0: raise UtilRplError("Slave server_id is set to 0.") # Check for server_id uniqueness if master_server_id == slave_server_id: raise UtilRplError("The slave's server_id is the same as the " "master.") return []

检查 uuid_id 唯一性

errors = rpl.check_server_uuids() for error in errors: # 1.mysql版本不小于5.6.5 # 2.GTID_MODE必须开启 # 3.master_uuid 必须开启 # 4.slave_uuid 必须开启 # 5.master_uuid和slave_uuid不能等于 print error # Check for server_uuid uniqueness if verbosity > 0: print "# master uuid = %s" % master.get_server_uuid() print "# sla # check_server_uuids方法解析 def check_server_uuids(self): master_uuid = self.master.get_uuid() slave_uuid = self.slave.get_uuid() # 内部封装get_uuid() if self.supports_gtid() != "NO": # 检查是否开启了gtid其内部封装其实是执行SELECT @@GLOBAL.GTID_MODE的结果 # ON为开启,OFF为支持但未开启,NO为不支持GITD # 什么情况下为得出NO值?,mysql版本小于5.6.5 #version_ok = self.check_version_compat(5, 6, 5) # 内部封装为执行 SHOW VARIABLES LIKE 'VERSION' #if not version_ok: # return "NO" res = self.show_server_variable("server_uuid") # 内部封装为执行SHOW VARIABLES LIKE 'server_uuid' # Check for both not supporting UUIDs. if master_uuid is None and slave_uuid is None: return [] # Check for unbalanced servers - one with UUID, one without if master_uuid is None or slave_uuid is None: raise UtilRplError("%s does not support UUIDs." % "Master" if master_uuid is None else "Slave") # Check for uuid uniqueness if master_uuid == slave_uuid: raise UtilRplError("The slave's UUID is the same as the " "master.")

第二步:检查 InnoDB 兼容性

# Check InnoDB compatibility if verbosity > 0: print "# Checking InnoDB statistics for type and version conflicts." errors = rpl.check_innodb_compatibility(options) #检查InnoDB兼容性 for error in errors: print error # check_innodb_compatibility方法 def check_innodb_compatibility(self, options): pedantic = options.get("pedantic", False) verbose = options.get("verbosity", 0) > 0 errors = [] master_innodb_stats = self.master.get_innodb_stats() slave_innodb_stats = self.slave.get_innodb_stats() # get_innodb_stats()所做的操作 # 1.是否内置了innodb引擎,能查出数据则为内置了innodb引擎,innodb类型改为builtin # SELECT (support='YES' OR support='DEFAULT' OR support='ENABLED') AS `exists` FROM INFORMATION_SCHEMA.ENGINES WHERE engine = 'innodb' # 2.是否使用了innodb插件,能查出数据则为支持innodb引擎插件,innodb类型改为plugin # SELECT (plugin_library LIKE 'ha_innodb_plugin%') AS `exists` FROM INFORMATION_SCHEMA.PLUGINS WHERE LOWER(plugin_name) = 'innodb' AND LOWER(plugin_status) = 'active' # 3.innodb的版本信息 # SELECT plugin_version, plugin_type_version FROM INFORMATION_SCHEMA.PLUGINS WHERE LOWER(plugin_name) = 'innodb'; # innodb本部号分为两个,plugin_version和plugin_type_version # ---------------- --------------------- # | plugin_version | plugin_type_version | # ---------------- --------------------- # | 5.7 | 50725.0 | # ---------------- --------------------- # 4.检查have_innodb是否开启 # SHOW VARIABLES LIKE 'have_innodb' 目前不知道这一步是用了做什么的,mysql官方文档没有查到have_innodb参数的具体用意。是否与Mariadb有关? # 从代码判断have_innodb的返回值为Yes和No # 最终返回一个:(innoDB类型,plugin_version, plugin_type_version, have_innodb) if master_innodb_stats != slave_innodb_stats: # master和slave的innoDB类型,plugin_version, plugin_type_version, have_innodb参数必须一样 if not pedantic: errors.append("WARNING: Innodb settings differ between master " "and slave.") if verbose or pedantic: cols = ['type', 'plugin_version', 'plugin_type_version', 'have_innodb'] rows = [] rows.append(master_innodb_stats) errors.append("# Master's InnoDB Stats:") errors.extend(_get_list(rows, cols)) rows = [] rows.append(slave_innodb_stats) errors.append("# Slave's InnoDB Stats:") errors.extend(_get_list(rows, cols)) if pedantic: for line in errors: print line raise UtilRplError("Innodb settings differ between master " "and slave.") return errors

第三步:检查存储引擎一致性

# Checking storage engines if verbosity > 0: print "# Checking storage engines..." errors = rpl.check_storage_engines(options) #检查存储引擎 for error in errors: print error # check_storage_engines方法 def check_storage_engines(self, options): pedantic = options.get("pedantic", False) verbose = options.get("verbosity", 0) > 0 errors = [] slave_engines = self.slave.get_storage_engines() results = self.master.check_storage_engines(slave_engines) # get_storage_engines()所做的操作 # 执行 SELECT UPPER(engine), UPPER(support) FROM INFORMATION_SCHEMA.ENGINES ORDER BY engine获取当前数据库所安装或所支持的引擎 # 1.get_storage_engines检查所得到的引擎主从要一致 if results[0] is not None or results[1] is not None: if not pedantic: errors.append("WARNING: The master and slave have differing " "storage engine configurations!") if verbose or pedantic: cols = ['engine', 'support'] if results[0] is not None: errors.append("# Storage engine configuration on Master:") errors.extend(_get_list(results[0], cols)) if results[1] is not None: errors.append("# Storage engine configuration on Slave:") errors.extend(_get_list(results[1], cols)) if pedantic: for line in errors: print line raise UtilRplError("The master and slave have differing " "storage engine configurations!") return errors

第四步:检查 master binary logging

# Check master for binary logging print "# Checking for binary logging on master..." errors = rpl.check_master_binlog() # check_master_binlog内执行SHOW VARIABLES LIKE 'log_bin'获取是否开启了binlog,如果为"OFF"或者"0"则报错 if errors != []: raise UtilError(errors[0])

第五步:创建复制

# Setup replication print "# Setting up replication..." if not rpl.setup(rpl_user, 10): #建立主从复制,等待slave同步的尝试次数 raise UtilError("Cannot setup replication.") # setup方法解析 def setup(self, rpl_user, num_tries): if self.master is None or self.slave is None: print "ERROR: Must connect to master and slave before " \ "calling replicate()" return False result = True # Parse user and password (support login-paths) try: r_user, r_pass = parse_user_password(rpl_user) except FormatError: raise UtilError(USER_PASSWORD_FORMAT.format("--rpl-user")) # Check to see if rpl_user is present, else create her if not self.create_rpl_user(r_user, r_pass)[0]: # create_rpl_user()创建复制用户 # 1.检查是否启用了跳过权限表,就是启动了--skip-grant-tables参数 # 执行SHOW GRANTS FOR 'snuffles'@'host',报错如ERROR 1141 (42000)则为未开启,ERROR 1290 (HY000)为开启了--skip-grant-tables参数 # 2.检查用户是否存在 # 通过user.exists()方法执行:SELECT * FROM mysql.user WHERE user = 'rpl' and host = '10.186.65.119' 进行检查 # 3.不存在则创建用户 # CREATE USER 'rpl'@'10.186.65.119' IDENTIFIED WITH 'mysql_native_password' AS '*624459C87E534A126498ADE1B12E0C66EDA035A3' # 创建用户和密码 # GRANT REPLICATION SLAVE ON *.* TO 'rpl'@'10.186.65.119' # 赋予复制权限 # 4.检查用户权限 # SELECT CURRENT_USER() # SHOW GRANTS FOR 'rpl'@'10.186.65.119'; return False # Read master log file information res = self.master.get_status() # 就是SHOW MASTER STATUS,如果执行错误则报错 if not res: print "ERROR: Cannot retrieve master status." return False # If master log file, pos not specified, read master log file info read_master_info = False if self.master_log_file is None: res = self.master.get_status() # 没有指定位置点则使用SHOW MASTER STATUS获取的 if not res: print "ERROR: Cannot retrieve master status." return False #mysql> SHOW MASTER STATUS; # ------------------ ----------- -------------- ------------------ ------------------------------------------------------------------------------------------------ #| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set | # ------------------ ----------- -------------- ------------------ ------------------------------------------------------------------------------------------------ #| mysql-bin.000023 | 211116031 | | | f7d09cd0-1850-11ec-8d9a-02000aba414d:1-1849979 | # ------------------ ----------- -------------- ------------------ ------------------------------------------------------------------------------------------------ read_master_info = True self.master_log_file = res[0][0] # 对应SHOW MASTER STATUS里的File字段 self.master_log_pos = res[0][1] # 对应SHOW MASTER STATUS里的Position字段 else: #指定了位置点所做的操作 # Check to make sure file is accessible and valid found = False res = self.master.get_binary_logs(self.query_options) for row in res: if row[0] == self.master_log_file: found = True break if not found: raise UtilError("Master binary log file not listed as a " "valid binary log file on the master.") if self.master_log_file is None: raise UtilError("No master log file specified.") # Stop slave first res = self.slave.get_thread_status() # 执行 SHOW SLAVE STATUS,获取(slave_io_state, slave_io_running, slave_sql_running) if res is not None: if res[1] == "Yes" or res[2] == "Yes": # io线程和sql线程都为running时,停止复制 res = self.slave.stop(self.query_options) # self.slave.stop就是执行 STOP SLAVE # Connect slave to master if self.verbosity > 0: print "# Connecting slave to master..." master_values = { 'Master_Host': self.master.host, 'Master_Port': self.master.port, 'Master_User': r_user, 'Master_Password': r_pass, 'Master_Log_File': self.master_log_file, 'Read_Master_Log_Pos': self.master_log_pos, } # Use the options SSL certificates if defined, # else use the master SSL certificates if defined. if self.ssl: master_values['Master_SSL_Allowed'] = 1 if self.ssl_ca: master_values['Master_SSL_CA_File'] = self.ssl_ca if self.ssl_cert: master_values['Master_SSL_Cert'] = self.ssl_cert if self.ssl_key: master_values['Master_SSL_Key'] = self.ssl_key elif self.master.has_ssl: master_values['Master_SSL_Allowed'] = 1 master_values['Master_SSL_CA_File'] = self.master.ssl_ca master_values['Master_SSL_Cert'] = self.master.ssl_cert master_values['Master_SSL_Key'] = self.master.ssl_key change_master = self.slave.make_change_master(self.from_beginning, master_values) #创建复制 # CHANGE MASTER TO MASTER_HOST = '10.186.65.20' MASTER_USER = 'rpl' MASTER_PASSWORD = <secret> MASTER_PORT = 3306 MASTER_AUTO_POSITION=1 res = self.slave.exec_query(change_master, self.query_options) if self.verbosity > 0: print "# %s" % change_master # Start slave if self.verbosity > 0: if not self.from_beginning: if read_master_info: print "# Starting slave from master's last position..." else: msg = "# Starting slave from master log file '%s'" % \ self.master_log_file if self.master_log_pos >= 0: msg = " using position %s" % self.master_log_pos msg = "..." print msg else: print "# Starting slave from the beginning..." res = self.slave.start(self.query_options) # 启动slave 就是执行 START SLAVE # COMMIT提交事务 # Add commit because C/Py are auto_commit=0 by default self.slave.exec_query("COMMIT") # 检查slave的状态 # Check slave status i = 0 while i < num_tries: time.sleep(1) res = self.slave.get_slaves_errors() # SHOW SLAVE STATUS status = res[0] sql_running = res[4] if self.verbosity > 0: io_errorno = res[1] io_error = res[2] io_running = res[3] sql_errorno = res[5] sql_error = res[6] print "# IO status: %s" % status print "# IO thread running: %s" % io_running # if io_errorno = 0 and error = '' -> no error if not io_errorno and not io_error: print "# IO error: None" else: print "# IO error: %s:%s" % (io_errorno, io_error) # if io_errorno = 0 and error = '' -> no error print "# SQL thread running: %s" % sql_running if not sql_errorno and not sql_error: print "# SQL error: None" else: print "# SQL error: %s:%s" % (io_errorno, io_error) if status == "Waiting for master to send event" and sql_running: break elif not sql_running: if self.verbosity > 0: print "# Retry to start the slave SQL thread..." # SQL thread is not running, retry to start it res = self.slave.start_sql_thread(self.query_options) if self.verbosity > 0: print "# Waiting for slave to synchronize with master" i = 1 if i == num_tries: print "ERROR: failed to sync slave with master." result = False if result is True: self.replicating = True return result # 测试复制 # Test the replication setup. if test_db: rpl.test(test_db, 10) # 就是主库执行 CREATE DATABASE TEST1 # 从库执行 SHOW DATABASES检查TEST1是否存在,检查失败重试10次。能检查出来就通过 print "# ...done."

步骤梳理

第一步:检查重要参数的唯一性

第二步:检查InnoDB兼容性

第三步:检查存储引擎一致性

第四步:检查master binary logging

第五步:创建复制

,