Skip to main content

mysql binlog获取

· 2 min read

mysql 报文:

mysql报文分为两部分:headerpayload

有四个字节,其中前三个字节是标识这个包的长度描述payload的长度,也就是payload最长的长度为2^24-1字节,最后一个字节则是类似于tcp的序列号,每次从0开始递增,描述的是第几个包

payload

payload则是具体负载

mysql握手

tcp三次握手之后,整个传输层的连接已经建立了,那么怎么登陆呢? 握手文档 加密的方式 举个例子:加密套件是mysql_native_password,那么第一个包会是由 server发出,附带20字节的随机码, 然后在客户端的用户提交的密码做多次sha1哈希然后回传给mysql

binlog获取分为两步:

1: COM_REGISTER_SLAVE 把slave注册到master里面 2: COM_BINLOG_DUMP 这个包主要是告诉master锁需要的binlog的名字和位点,然后就会返回一堆binlog事件给客户端

mysql 发送binlog流程

int Binlog_sender::get_binlog_end_pos(File_reader *reader, my_off_t *end_pos) {
DBUG_TRACE;
my_off_t read_pos = reader->position();

do {
/*
MYSQL_BIN_LOG::binlog_end_pos is atomic. We should only acquire the
LOCK_binlog_end_pos if we reached the end of the hot log and are going
to wait for updates on the binary log (Binlog_sender::wait_new_event()).
*/
*end_pos = mysql_bin_log.get_binlog_end_pos();

/* If this is a cold binlog file, we are done getting the end pos */
if (unlikely(!mysql_bin_log.is_active(m_linfo.log_file_name))) {
*end_pos = 0;
return 0;
}

DBUG_PRINT("info", ("Reading file %s, seek pos %llu, end_pos is %llu",
m_linfo.log_file_name, read_pos, *end_pos));
DBUG_PRINT("info", ("Active file is %s", mysql_bin_log.get_log_fname()));

if (read_pos < *end_pos) return 0;

/* Some data may be in net buffer, it should be flushed before waiting */
if (!m_wait_new_events || flush_net()) return 1;

if (unlikely(wait_new_events(read_pos))) return 1;
} while (unlikely(!m_thd->killed));

return 1;
}

c字节对齐

· One min read
_attribute_((packed))

你会在redis的sds.h看到这个gcc的扩展属性,这个属性是拿来干嘛呢?其实是拿来压缩字段长度的

This attribute, attached to an enum, struct, or union type definition, specified that the minimum required memory be used to represent the type.
Specifying this attribute for struct and union types is equivalent to specifying the packed attribute on each of the structure or union members. Specifying the -fshort-enums flag on the line is equivalent to specifying the packed attribute on all enum definitions.

You may only specify this attribute after a closing curly brace on an enum definition, not in a typedef declaration, unless that declaration also contains the definition of the enum.

相关阅读

Tags:

mysql主从

· One min read
  DBUG_PRINT("info", ("Creating new slave thread"));
if (mysql_thread_create(thread_key, &th, &connection_attrib, h_func,
(void *)mi)) {
LogErr(ERROR_LEVEL, ER_RPL_CANT_CREATE_SLAVE_THREAD,
mi->get_for_channel_str());
my_error(ER_SLAVE_THREAD, MYF(0));
goto err;
}

slave 线程

/**
Slave SQL thread entry point.

@param arg Pointer to Relay_log_info object that holds information
for the SQL thread.

@return Always 0.
*/
extern "C" void *handle_slave_sql(void *arg) {
THD *thd; /* needs to be first for thread_stack */
bool thd_added = false;
bool main_loop_error = false;
char llbuff[22], llbuff1[22];

c99柔性数组

· One min read

As a special case, the last element of a structure with more than one named member may have an incomplete array type; this is called a flexible array member. With two exceptions, the flexible array member is ignored. First, the size of the structure shall be equal to the offset of the last element of an otherwise identical structure that replaces the flexible array member with an array of unspecified length.106) Second, when a . (or ->) operator has a left operand that is (a pointer to) a structure with a flexible array member and the right operand names that member, it behaves as if that member were replaced with the longest array (with the same element type) that would not make the structure larger than the object being accessed; the offset of the array shall remain that of the flexible array member, even if this would differ from that of the replacement array. If this array would have no elements, it behaves as if it had one element but the behavior is undefined if any attempt is made to access that element or to generate a pointer one past it.

Tags:

20210413反思过去

· 3 min read

突然间不想写这些业务代码了,就是那么突然.我已经快工作四年了,毫无进展,什么都不会

这几年我究竟做了什么?

给PHP修了两个内存泄漏,相对于写业务,我更喜欢修bug

引入了canal,对binlog更加了解了,而且效果真的很好但是canal其实很多坑.

这些其实很好,但是其他对于我来说都是垃圾时间,我其实真的不喜欢写前端,为什么PHP一定要写前端?

我一点都不会写算法题,真的一点都不会,最近才弄懂循环不变式和霍尔逻辑还有一点点数理逻辑

对于db,我最近才分清隔离级别/事务/mvcc的关系,花了三年半太久了,我也是一年的时候才弄懂PHP关闭连接是什么时候

对于redis其实我根本不懂,我只会简单的set aaa bbb 这样用

对于es,我对分词有所了解因为做过nlp相关需求,但是还是没有用过es

对于我来说,PHP会用,还天天写bug,mysql很多不会用,对于网络,我基本没有用过长连接挺失败的这三年半

对于我这三年半还是挺失败的

开心的地方

最开心的地方在于会找相关的论文来看了,虽然对工作一点用都没有

对于我来说很多东西不是黑盒了,虽然对于面试一点用都没有

我唯一知道的东西

只有数学才是有且仅有的可以提供正确性的工具,实践并不是什么真正有用的东西.

canal需要注意的点

· 6 min read

比较坑的点:

1 每次同步的内容会每秒持久化到file或者zk ,binlog一般只保留的几天,如果你持久化到文件/zk的配置的binlog文件在mysql已经不存在了会报错, 报错信息大概如下

java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file

这个时候只能调整配置或者删除mate.dat 文件,然后重启canal , 这个时候他会使用mysql的语句show status去取最新位点

2 重启canal有个非常非常坑的点在于会读information_schema 这个库的内容去读表名和表id等信息 ,而这个往往会很久,不知道是不是测试环境原因,读了挺久的

mate刷新的逻辑

根据配置每秒刷新到mate信息 也就是文件或者zk上,所以重启会有重复消费

找到位点

加载顺序: 1 从mate中获取位点: getLatestIndexBy 也就是从 memeory/zk或者file的mate信息中读取位点 2 根据配置读取:

    protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
MysqlConnection mysqlConnection = (MysqlConnection) connection;
LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
if (logPosition == null) {// 找不到历史成功记录
EntryPosition entryPosition = null;
if (masterInfo != null && mysqlConnection.getConnector().getAddress().equals(masterInfo.getAddress())) {
entryPosition = masterPosition;
} else if (standbyInfo != null
&& mysqlConnection.getConnector().getAddress().equals(standbyInfo.getAddress())) {
entryPosition = standbyPosition;
}

if (entryPosition == null) {
entryPosition = findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费
}

// 判断一下是否需要按时间订阅
if (StringUtils.isEmpty(entryPosition.getJournalName())) {
// 如果没有指定binlogName,尝试按照timestamp进行查找
if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
logger.warn("prepare to find start position {}:{}:{}",
new Object[] { "", "", entryPosition.getTimestamp() });
return findByStartTimeStamp(mysqlConnection, entryPosition.getTimestamp());
} else {
logger.warn("prepare to find start position just show master status");
return findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费
}
} else {
if (entryPosition.getPosition() != null && entryPosition.getPosition() > 0L) {
// 如果指定binlogName + offest,直接返回
entryPosition = findPositionWithMasterIdAndTimestamp(mysqlConnection, entryPosition);
logger.warn("prepare to find start position {}:{}:{}",
new Object[] { entryPosition.getJournalName(), entryPosition.getPosition(),
entryPosition.getTimestamp() });
return entryPosition;
} else {
EntryPosition specificLogFilePosition = null;
if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
// 如果指定binlogName +
// timestamp,但没有指定对应的offest,尝试根据时间找一下offest
EntryPosition endPosition = findEndPosition(mysqlConnection);
if (endPosition != null) {
logger.warn("prepare to find start position {}:{}:{}",
new Object[] { entryPosition.getJournalName(), "", entryPosition.getTimestamp() });
specificLogFilePosition = findAsPerTimestampInSpecificLogFile(mysqlConnection,
entryPosition.getTimestamp(),
endPosition,
entryPosition.getJournalName(),
true);
}
}

if (specificLogFilePosition == null) {
// position不存在,从文件头开始
entryPosition.setPosition(BINLOG_START_OFFEST);
return entryPosition;
} else {
return specificLogFilePosition;
}
}
}
} else {
if (logPosition.getIdentity().getSourceAddress().equals(mysqlConnection.getConnector().getAddress())) {
if (dumpErrorCountThreshold >= 0 && dumpErrorCount > dumpErrorCountThreshold) {
// binlog定位位点失败,可能有两个原因:
// 1. binlog位点被删除
// 2.vip模式的mysql,发生了主备切换,判断一下serverId是否变化,针对这种模式可以发起一次基于时间戳查找合适的binlog位点
boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null)
&& logPosition.getPostion().getServerId() != null
&& !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
if (case2) {
EntryPosition findPosition = fallbackFindByStartTimestamp(logPosition, mysqlConnection);
dumpErrorCount = 0;
return findPosition;
}
// 处理 binlog 位点被删除的情况,提供自动重置到当前位点的功能
// 应用场景: 测试环境不稳定,位点经常被删。强烈不建议在正式环境中开启此控制参数,因为binlog 丢失调到最新位点也即意味着数据丢失
if (isAutoResetLatestPosMode()) {
dumpErrorCount = 0;
return findEndPosition(mysqlConnection);
}
Long timestamp = logPosition.getPostion().getTimestamp();
if (isRdsOssMode() && (timestamp != null && timestamp > 0)) {
// 如果binlog位点不存在,并且属于timestamp不为空,可以返回null走到oss binlog处理
return null;
}
} else if (StringUtils.isBlank(logPosition.getPostion().getJournalName())
&& logPosition.getPostion().getPosition() <= 0
&& logPosition.getPostion().getTimestamp() > 0) {
return fallbackFindByStartTimestamp(logPosition,mysqlConnection);
}
// 其余情况
logger.warn("prepare to find start position just last position\n {}",
JsonUtils.marshalToString(logPosition));
return logPosition.getPostion();
} else {
// 针对切换的情况,考虑回退时间
long newStartTimestamp = logPosition.getPostion().getTimestamp() - fallbackIntervalInSeconds * 1000;
logger.warn("prepare to find start position by switch {}:{}:{}", new Object[] { "", "",
logPosition.getPostion().getTimestamp() });
return findByStartTimeStamp(mysqlConnection, newStartTimestamp);
}
}

事件类型

事件有很多类型,我现在只对update和insert 感兴趣

public enum EventType
implements com.google.protobuf.ProtocolMessageEnum {
/**
* <code>INSERT = 1;</code>
*/
INSERT(0, 1),
/**
* <code>UPDATE = 2;</code>
*/
UPDATE(1, 2),
/**
* <code>DELETE = 3;</code>
*/
DELETE(2, 3),
/**
* <code>CREATE = 4;</code>
*/
CREATE(3, 4),
/**
* <code>ALTER = 5;</code>
*/
ALTER(4, 5),
/**
* <code>ERASE = 6;</code>
*/
ERASE(5, 6),
/**
* <code>QUERY = 7;</code>
*/
QUERY(6, 7),
/**
* <code>TRUNCATE = 8;</code>
*/
TRUNCATE(7, 8),
/**
* <code>RENAME = 9;</code>
*/
RENAME(8, 9),
/**
* <code>CINDEX = 10;</code>
*
* <pre>
**CREATE INDEX*
* </pre>
*/
CINDEX(9, 10),
/**
* <code>DINDEX = 11;</code>
*/
DINDEX(10, 11),
/**
* <code>GTID = 12;</code>
*/
GTID(11, 12),
/**
* <code>XACOMMIT = 13;</code>
*
* <pre>
** XA *
* </pre>
*/
XACOMMIT(12, 13),
/**
* <code>XAROLLBACK = 14;</code>
*/
XAROLLBACK(13, 14),
/**
* <code>MHEARTBEAT = 15;</code>
*
* <pre>
** MASTER HEARTBEAT *
* </pre>
*/
MHEARTBEAT(14, 15),
;
  • 相关阅读

http://www.tianshouzhi.com/api/tutorials/canal

tcp协议

· 3 min read

状态机

                              +---------+ ---------\      active OPEN
| CLOSED | \ -----------
+---------+<---------\ \ create TCB
| ^ \ \ snd SYN
passive OPEN | | CLOSE \ \
------------ | | ---------- \ \
create TCB | | delete TCB \ \
V | \ \
+---------+ CLOSE | \
| LISTEN | ---------- | |
+---------+ delete TCB | |
rcv SYN | | SEND | |
----------- | | ------- | V
+---------+ snd SYN,ACK / \ snd SYN +---------+
| |<----------------- ------------------>| |
| SYN | rcv SYN | SYN |
| RCVD |<-----------------------------------------------| SENT |
| | snd ACK | |
| |------------------ -------------------| |
+---------+ rcv ACK of SYN \ / rcv SYN,ACK +---------+
| -------------- | | -----------
| x | | snd ACK
| V V
| CLOSE +---------+
| ------- | ESTAB |
| snd FIN +---------+
| CLOSE | | rcv FIN
V ------- | | -------
+---------+ snd FIN / \ snd ACK +---------+
| FIN |<----------------- ------------------>| CLOSE |
| WAIT-1 |------------------ | WAIT |
+---------+ rcv FIN \ +---------+
| rcv ACK of FIN ------- | CLOSE |
| -------------- snd ACK | ------- |
V x V snd FIN V
+---------+ +---------+ +---------+
|FINWAIT-2| | CLOSING | | LAST-ACK|
+---------+ +---------+ +---------+
| rcv ACK of FIN | rcv ACK of FIN |
| rcv FIN -------------- | Timeout=2MSL -------------- |
| ------- x V ------------ x V
\ snd ACK +---------+delete TCB +---------+
------------------------>|TIME WAIT|------------------>| CLOSED |
+---------+ +---------+

拥塞控制

1 慢开始和拥塞避免 2 快速重传

慢开始

慢开始为了什么?

快速重传

快速恢复

我的理解

tcp本质是什么?
本质是一个字节流
为什么会有大小端问题? 因为字节流终究是字节流,如果你只要一个字节的的话在不同端的机器一点问题都没有
怎么保证不丢包?
序号+重传,为什么序号可以?
因为序号是和包一一映射的,所以序号和报文是同构的,也就是一一映射的 重传有什么问题吗?
因为序号和包一一对应,也就是幂等的,所以重传没有什么问题

为什么需要状态机? 因为状态机是从一个状态到另外一个状态,这样我们更加明确整个流程

相关阅读