canal源码解析系列(5):store模块(上)
-
系列文章索引:
store模块简介
store模块用于binlog事件的存储 ,目前开源的版本中仅实现了Memory内存模式。官方文档中提到"后续计划增加本地file存储,mixed混合模式”,这句话大家不必当真,从笔者最开始接触canal到现在已经几年了,依然没有动静,好在Memory内存模式已经可以满足绝大部分场景。
store模块目录结构如下,该模块的核心接口为 CanalEventStore
以下是相关类图:
其中MemoryEventStoreWithBuffer就是内存模式的实现,是我们分析的重点,其实现了CanalEventStore接口,并继承了AbstractCanalStoreScavenge抽象类。需要注意的是,AbstractCanalStoreScavenge这个类中定义的字段和方法在开源版本中并没有任何地方使用到,因此我们不会对其进行分析。MemoryEventStoreWithBuffer的实现借鉴了Disruptor的RingBuffer。简而言之,你可以把其当做一个环形队列,如下:
针对这个环形队列,canal定义了3类操作:Put、Get、Ack,其中:-
Put 操作:添加数据。event parser模块拉取到binlog后,并经过event sink模块过滤,最终就通过Put操作存储到了队列中。
-
Get操作:获取数据。canal client连接到canal server后,最终获取到的binlog都是从这个队列中取得。
-
Ack操作:确认消费成功。canal client获取到binlog事件消费后,需要进行Ack。你可以认为Ack操作实际上就是将消费成功的事件从队列中删除,如果一直不Ack的话,队列满了之后,Put操作就无法添加新的数据了。
对应的,我们需要使用3个变量来记录Put、Get、Ack这三个操作的位置,其中:
-
putSequence: 每放入一个数据putSequence +1,可表示存储数据存储的总数量
-
getSequence: 每获取一个数据getSequence +1,可表示数据订阅获取的最后一次提取位置
-
ackSequence: 每确认一个数据ackSequence + 1,可表示数据最后一次消费成功位置
另外,putSequence、getSequence、ackSequence这3个变量初始值都是-1,且都是递增的,均用long型表示。由于数据只有被Put进来后,才能进行Get;Get之后才能进行Ack。 所以,这三个变量满足以下关系:
ackSequence <= getSequence <= putSequence
如果将RingBuffer拉直来看,将会变得更加直观:
通过对这3个位置进行运算,我们可以得到一些有用的信息,如:
计算当前可消费的event数量:
当前可消费的event数量 = putSequence - getSequence
计算当前队列的大小(即队列中还有多少事件等待消费):
当前队列的大小 = putSequence - ackSequence
在进行Put/Get/Ack操作时,首先都要确定操作到环形队列的哪个位置。环形队列的bufferSize默认大小是16384,而这3个操作的位置变量putSequence、getSequence、ackSequence都是递增的,显然最终都会超过bufferSize。因此必须要对这3个值进行转换。最简单的操作就是使用%进行取余。
举例来说,putSequence的当前值为16383,这已经是环形队列的最大下标了(从0开始计算),下一个要插入的数据要在第16384个位置上,此时可以使用16384 % bufferSize = 0,因此下一个要插入的数据在0号位置上。可见,当达到队列的最大下标时,再从头开始循环,这也是为什么称之为环形队列的原因。当然在实际操作时,更加复杂,如0号位置上已经有数据了,就不能插入,需要等待这个位置被释放出来,否则出现数据覆盖。
canal使用的是通过位操作进行取余,这种取余方式与%作用完全相同,只不过因为是位操作,因此更加高效。其计算方式如下:
操作位置 = sequence & (bufferSize - 1)
需要注意的是,这种方式只对除数是2的N次方幂时才有效,如果对于位运算取余不熟悉,可参考:https://blog.csdn.net/actionzh/article/details/78976082。
在canal.properties文件中定义了几个MemoryEventStoreWithBuffer的配置参数,主要用于控制环形队列的大小和存储的数据可占用的最大内存,如下:
canal.instance.memory.buffer.size = 16384 canal.instance.memory.buffer.memunit = 1024 canal.instance.memory.batch.mode = MEMSIZE
其中:
canal.instance.memory.buffer.size:
表示RingBuffer队列的最大容量,也就是可缓存的binlog事件的最大记录数,其值需要为2的指数(原因如前所述,canal通过位运算进行取余),默认值为2^16=16384。
canal.instance.memory.buffer.memunit:
表示RingBuffer使用的内存单元, 默认是1kb。和canal.instance.memory.buffer.size组合决定最终的内存使用大小。需要注意的是,这个配置项仅仅是用于计算占用总内存,并不是限制每个event最大为1kb。
canal.instance.memory.batch.mode:
表示canal内存store中数据缓存模式,支持两种方式:
-
ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量。这种方式有一些潜在的问题,举个极端例子,假设每个event有1M,那么16384个这种event占用内存要达到16G左右,基本上肯定会造成内存溢出(超大内存的物理机除外)。
-
MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录占用的总内存大小。指定为这种模式时,意味着默认缓存的event占用的总内存不能超过16384*1024=16M。这个值偏小,但笔者认为也足够了。因为通常我们在一个服务器上会部署多个instance,每个instance的store模块都会占用16M,因此只要instance的数量合适,也就不会浪费内存了。部分读者可能会担心,这是否限制了一个event的最大大小为16M,实际上是没有这个限制的。因为canal在Put一个新的event时,只会判断队列中已有的event占用的内存是否超过16M,如果没有,新的event不论大小是多少,总是可以放入的(canal的内存计算实际上是不精确的),之后的event再要放入时,如果这个超过16M的event没有被消费,则需要进行等待。
在canal自带的instance.xml文件中,使用了这些配置项来创建MemoryEventStoreWithBuffer实例,如下:
<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer"> <property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" /> <property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" /> <property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" /> <property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" /> </bean>
这里我们还看到了一个ddlIsolation属性,其对于Get操作生效,用于设置ddl语句是否单独一个batch返回(比如下游dml/ddl如果做batch内无序并发处理,会导致结构不一致)。其值通过canal.instance.get.ddl.isolation配置项来设置,默认值为false。
CanalEventStore接口
通过前面的分析,我们知道了环形队列要支持三种操作:Put、Get、Ack,针对这三种操作,在CanalEventStore中都有相应的方法定义,如下所示:
com.alibaba.otter.canal.store.CanalEventStore
/** * canel数据存储接口 */ public interface CanalEventStore<T> extends CanalLifeCycle, CanalStoreScavenge { //==========================Put操作============================== /**添加一组数据对象,阻塞等待其操作完成 (比如一次性添加一个事务数据)*/ void put(List<T> data) throws InterruptedException, CanalStoreException; /**添加一组数据对象,阻塞等待其操作完成或者时间超时 (比如一次性添加一个事务数据)*/ boolean put(List<T> data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException; /**添加一组数据对象 (比如一次性添加一个事务数据)*/ boolean tryPut(List<T> data) throws CanalStoreException; /**添加一个数据对象,阻塞等待其操作完成*/ void put(T data) throws InterruptedException, CanalStoreException; /**添加一个数据对象,阻塞等待其操作完成或者时间超时*/ boolean put(T data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException; /** 添加一个数据对象*/ boolean tryPut(T data) throws CanalStoreException; //==========================GET操作============================== /** 获取指定大小的数据,阻塞等待其操作完成*/ Events<T> get(Position start, int batchSize) throws InterruptedException, CanalStoreException; /**获取指定大小的数据,阻塞等待其操作完成或者时间超时*/ Events<T> get(Position start, int batchSize, long timeout, TimeUnit unit) throws InterruptedException,CanalStoreException; /**根据指定位置,获取一个指定大小的数据*/ Events<T> tryGet(Position start, int batchSize) throws CanalStoreException; //=========================Ack操作============================== /**删除{@linkplain Position}之前的数据*/ void ack(Position position) throws CanalStoreException; //==========================其他操作============================== /** 获取最后一条数据的position*/ Position getLatestPosition() throws CanalStoreException; /**获取第一条数据的position,如果没有数据返回为null*/ Position getFirstPosition() throws CanalStoreException; /**出错时执行回滚操作(未提交ack的所有状态信息重新归位,减少出错时数据全部重来的成本)*/ void rollback() throws CanalStoreException; }
可以看到Put/Get/Ack操作都有多种重载形式,各个方法的作用参考方法注释即可,后文在分析MemoryEventStoreWithBuffer时,将会进行详细的介绍。
这里对 get方法返回的Events对象,进行一下说明:
com.alibaba.otter.canal.store.model.Events
public class Events<EVENT> implements Serializable { private static final long serialVersionUID = -7337454954300706044L; private PositionRange positionRange = new PositionRange(); private List<EVENT> events = new ArrayList<EVENT>(); //setters getters and toString }
其中:CanalEntry.Entry和LogIdentity也都是protocol模块中的类:
-
LogIdentity记录这个Event的来源信息mysql地址(sourceAddress)和slaveId。
-
CanalEntry.Entry封装了binlog事件的数据
MemoryEventStoreWithBuffer
MemoryEventStoreWithBuffer是目前开源版本中的CanalEventStore接口的唯一实现,基于内存模式。当然你也可以进行扩展,提供一个基于本地文件存储方式的CanalEventStore实现。这样就可以一份数据让多个业务费进行订阅,只要独立维护消费位置元数据即可。然而,我不得不提醒你的是,基于本地文件的存储方式,一定要考虑好数据清理工作,否则会有大坑。
如果一个库只有一个业务方订阅,其实根本也不用实现本地存储,使用基于内存模式的队列进行缓存即可。如果client消费的快,那么队列中的数据放入后就被取走,队列基本上一直是空的,实现本地存储也没意义;如果client消费的慢,队列基本上一直是满的,只要client来获取,总是能拿到数据,因此也没有必要实现本地存储。
言归正传,下面对MemoryEventStoreWithBuffer的源码进行分析。
MemoryEventStoreWithBuffer字段
首先对MemoryEventStoreWithBuffer中定义的字段进行一下介绍,这是后面分析其他方法的基础,如下:
public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge implements CanalEventStore<Event>, CanalStoreScavenge { private static final long INIT_SQEUENCE = -1; private int bufferSize = 16 * 1024; // memsize的单位,默认为1kb大小 private int bufferMemUnit = 1024; private int indexMask; private Event[] entries; // 记录下put/get/ack操作的三个下标,初始值都是-1 // 代表当前put操作最后一次写操作发生的位置 private AtomicLong putSequence = new AtomicLong(INIT_SQEUENCE); // 代表当前get操作读取的最后一条的位置 private AtomicLong getSequence = new AtomicLong(INIT_SQEUENCE); // 代表当前ack操作的最后一条的位置 private AtomicLong ackSequence = new AtomicLong(INIT_SQEUENCE); // 记录下put/get/ack操作的三个memsize大小 private AtomicLong putMemSize = new AtomicLong(0); private AtomicLong getMemSize = new AtomicLong(0); private AtomicLong ackMemSize = new AtomicLong(0); // 阻塞put/get操作控制信号 private ReentrantLock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); // 默认为内存大小模式 private BatchMode batchMode = BatchMode.ITEMSIZE; private boolean ddlIsolation = false; ... }
属性说明:
-
bufferSize、bufferMemUnit、batchMode、ddlIsolation、putSequence、getSequence、ackSequence:
这几个属性前面已经介绍过,这里不再赘述。
-
entries:
类型为Event[]数组,环形队列底层基于的Event[]数组,队列的大小就是bufferSize。关于如何使用数组来实现环形队列,可参考笔者的另一篇文章http://www.tianshouzhi.com/api/tutorials/basicalgorithm/43。
-
indexMask
用于对putSequence、getSequence、ackSequence进行取余操作,前面已经介绍过canal通过位操作进行取余,其值为bufferSize-1 ,参见下文的start方法
-
putMemSize、getMemSize、ackMemSize:
分别用于记录put/get/ack操作的event占用内存的累加值,都是从0开始计算。例如每put一个event,putMemSize就要增加这个event占用的内存大小;get和ack操作也是类似。这三个变量,都是在batchMode指定为MEMSIZE的情况下,才会发生作用。
因为都是累加值,所以我们需要进行一些运算,才能得有有用的信息,如:
计算出当前环形队列当前占用的内存大小
环形队列当前占用的内存大小 = putMemSize - ackMemSize
前面我们提到,batchMode为MEMSIZE时,需要限制环形队列中event占用的总内存,事实上在执行put操作前,就是通过这种方式计算出来当前大小,然后我们限制的bufferSize * bufferMemUnit大小进行比较。
计算尚未被获取的事件占用的内存大小
尚未被获取的事件占用的内存大小 = putMemSize - getMemSize
batchMode除了对PUT操作有限制,对Get操作也有影响。Get操作可以指定一个batchSize,用于指定批量获取的大小。当batchMode为MEMSIZE时,其含义就在不再是记录数,而是要获取到总共占用 batchSize * bufferMemUnit 内存大小的事件数量。
lock、notFull、notEmpty:
阻塞put/get操作控制信号。notFull用于控制put操作,只有队列没满的情况下才能put。notEmpty控制get操作,只有队列不为空的情况下,才能get。put操作和get操作共用一把锁(lock)。
启动和停止方法
MemoryEventStoreWithBuffer实现了CanalLifeCycle接口,因此实现了其定义的start、stop方法
start启动方法
start方法主要是初始化MemoryEventStoreWithBuffer内部的环形队列,其实就是初始化一下Event[]数组。
public void start() throws CanalStoreException { super.start(); if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } indexMask = bufferSize - 1;//初始化indexMask,前面已经介绍过,用于通过位操作进行取余 entries = new Event[bufferSize];//创建循环队列基于的底层数组,大小为bufferSize }
stop停止方法
stop方法作用是停止,在停止时会清空所有缓存的数据,将维护的相关状态变量设置为初始值。MemoryEventStoreWithBuffer#stop
public void stop() throws CanalStoreException { super.stop(); //清空所有缓存的数据,将维护的相关状态变量设置为初始值 cleanAll(); }
在停止时,通过调用cleanAll方法清空所有缓存的数据。
cleanAll方法是在CanalStoreScavenge接口中定义的,在MemoryEventStoreWithBuffer中进行了实现, 此外这个接口还定义了另外一个方法cleanUtil,在执行ack操作时会被调用,我们将在介绍ack方法时进行讲解。
MemoryEventStoreWithBuffer#cleanAll
public void cleanAll() throws CanalStoreException { final ReentrantLock lock = this.lock; lock.lock(); try { //将Put/Get/Ack三个操作的位置都重置为初始状态-1 putSequence.set(INIT_SQEUENCE); getSequence.set(INIT_SQEUENCE); ackSequence.set(INIT_SQEUENCE); //将Put/Get/Ack三个操作的memSize都重置为0 putMemSize.set(0); getMemSize.set(0); ackMemSize.set(0); //将底层Event[]数组置为null,相当于清空所有数据 entries = null; } finally { lock.unlock(); } }
本文转载自田守枝的Java技术博客,感谢作者的分享
-