canal源码解析系列(5):store模块(下)
-
系列文章索引:
Put操作
前面分析CanalEventStore接口中,我们看到总共有6个put方法,可以分为3类:
-
不带timeout超时参数的put方法,会一直进行阻塞,直到有足够的空间可以放入。
-
带timeout参数超时参数的put方法,如果超过指定时间还未put成功,会抛出InterruptedException。
-
tryPut方法每次只是尝试放入数据,立即返回true或者false,不会阻塞。
事实上,这些方法只是超时机制不同,底层都是通过调用doPut方法来完成真正的数据放入。因此在后面的分析中,笔者只选择其中一种进行讲解。
所有的put操作,在放入数据之前,都需要进行一些前置检查工作,主要检查2点:
1、检查是否足够的slot
默认的bufferSize设置大小为16384,即有16384个slot,每个slot可以存储一个event,因此canal默认最多缓存16384个event。从来另一个角度出发,这意味着putSequence最多比ackSequence可以大16384,不能超过这个值。如果超过了,就意味着尚未没有被消费的数据被覆盖了,相当于丢失了数据。因此,如果Put操作满足以下条件时,是不能新加入数据的
(putSequence + need_put_events_size)- ackSequence > bufferSize
"putSequence + need_put_events_size"的结果为添加数据后的putSequence的最终位置值,要把这个作为预判断条件,其减去ackSequence,如果大于bufferSize,则不能插入数据。需要等待有足够的空间,或者抛出异常。2、检测是否超出了内存限制
前面我们已经看到了,为了控制队列中event占用的总内存大小,可以指定batchMode为MEMSIZE。在这种情况下,buffer.size * buffer.memunit(默认为16M)就表示环形队列存储的event总共可以占用的内存大小。因此当出现以下情况下, 不能加入新的event:
(putMemSize - ackMemSize) > buffer.size * buffer.memunit
关于putMemSize和ackMemSize前面已经介绍过,二者的差值,实际上就是"队列当前包含的event占用的总内存”。下面我们选择可以指定timeout超时时间的put方法进行讲解,如下:
public boolean put(List<Event> data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException { //1 如果需要插入的List为空,直接返回true if (data == null || data.isEmpty()) { return true; } //2 获得超时时间,并通过加锁进行put操作 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) {//这是一个死循环,执行到下面任意一个return或者抛出异常是时才会停止 //3 检查是否满足插入条件,如果满足,进入到3.1,否则进入到3.2 if (checkFreeSlotAt(putSequence.get() + data.size())) { //3.1 如果满足条件,调用doPut方法进行真正的插入 doPut(data); return true; } //3.2 判断是否已经超时,如果超时,则不执行插入操作,直接返回false if (nanos <= 0) { return false; } //3.3 如果还没有超时,调用notFull.awaitNanos进行等待,需要其他线程调用notFull.signal()方法唤醒。 //唤醒是在ack操作中进行的,ack操作会删除已经消费成功的event,此时队列有了空间,因此可以唤醒,详见ack方法分析 //当被唤醒后,因为这是一个死循环,所以循环中的代码会重复执行。当插入条件满足时,调用doPut方法插入,然后返回 try { nanos = notFull.awaitNanos(nanos); //3.4 如果一直等待到超时,都没有可用空间可以插入,notFull.awaitNanos会抛出InterruptedException } catch (InterruptedException ie) { notFull.signal(); //3.5 超时之后,唤醒一个其他执行put操作且未被中断的线程(不明白是为了干啥) throw ie; } } } finally { lock.unlock(); } }
上述方法的第3步,通过调用checkFreeSlotAt方法来执行插入数据前的检查工作,所做的事情就是我们前面提到的2点:1、检查是否足够的slot 2、检测是否超出了内存限制,源码如下所示:
MemoryEventStoreWithBuffer#checkFreeSlotAt
/**查询是否有空位*/ private boolean checkFreeSlotAt(final long sequence) { //1、检查是否足够的slot。注意方法参数传入的sequence值是:当前putSequence值 + 新插入的event的记录数。 //按照前面的说明,其减去bufferSize不能大于ack位置,或者换一种说法,减去bufferSize不能大于ack位置。 //1.1 首先用sequence值减去bufferSize final long wrapPoint = sequence - bufferSize; //1.2 获取get位置ack位置的较小值,事实上,ack位置总是应该小于等于get位置,因此这里总是应该返回的是ack位置。 final long minPoint = getMinimumGetOrAck(); //1.3 将1.1 与1.2步得到的值进行比较,如果前者大,说明二者差值已经超过了bufferSize,不能插入数据,返回false if (wrapPoint > minPoint) { // 刚好追上一轮 return false; } else { //2 如果batchMode是MEMSIZE,继续检查是否超出了内存限制。 if (batchMode.isMemSize()) { //2.1 使用putMemSize值减去ackMemSize值,得到当前保存的event事件占用的总内存 final long memsize = putMemSize.get() - ackMemSize.get(); //2.2 如果没有超出bufferSize * bufferMemUnit内存限制,返回true,否则返回false if (memsize < bufferSize * bufferMemUnit) { return true; } else { return false; } } else { //3 如果batchMode不是MEMSIZE,说明只限制记录数,则直接返回true return true; } } }
getMinimumGetOrAck方法用于返回getSequence和ackSequence二者的较小值,源码如下所示:
MemoryEventStoreWithBuffer#getMinimumGetOrAck
private long getMinimumGetOrAck() { long get = getSequence.get(); long ack = ackSequence.get(); return ack <= get ? ack : get; }
如前所述,ackSequence总是应该小于等于getSequence,因此这里判断应该是没有必要的,笔者已经给官方提了issue,也得到了确认,参见:https://github.com/alibaba/canal/issues/966。
当checkFreeSlotAt方法检验通过后,最终调用的是doPut方法进行插入。doPut方法主要有4个步骤:
1、将新插入的event数据赋值到Event[]数组的正确位置上,就算完成了插入
2、当新插入的event记录数累加到putSequence上
3、累加新插入的event的大小到putMemSize上
4、调用notEmpty.signal()方法,通知队列中有数据了,如果之前有client获取数据处于阻塞状态,将会被唤醒
MemoryEventStoreWithBuffer#doPut
/*** 执行具体的put操作*/ private void doPut(List<Event> data) { //1 将新插入的event数据赋值到Event[]数组的正确位置上 //1.1 获得putSequence的当前值current,和插入数据后的putSequence结束值end long current = putSequence.get(); long end = current + data.size(); //1.2 循环需要插入的数据,从current位置开始,到end位置结束 for (long next = current + 1; next <= end; next++) { //1.3 通过getIndex方法对next变量转换成正确的位置,设置到Event[]数组中 //需要转换的原因在于,这里的Event[]数组是环形队列的底层实现,其大小为bufferSize值,默认为16384。 //运行一段时间后,接收到的binlog数量肯定会超过16384,每接受到一个event,putSequence+1,因此最终必然超过这个值。 //而next变量是比当前putSequence值要大的,因此必须进行转换,否则会数组越界,转换工作就是在getIndex方法中进行的。 entries[getIndex(next)] = data.get((int) (next - current - 1)); } //2 直接设置putSequence为end值,相当于完成event记录数的累加 putSequence.set(end); //3 累加新插入的event的大小到putMemSize上 if (batchMode.isMemSize()) { //用于记录本次插入的event记录的大小 long size = 0; //循环每一个event for (Event event : data) { //通过calculateSize方法计算每个event的大小,并累加到size变量上 size += calculateSize(event); } //将size变量的值,添加到当前putMemSize putMemSize.getAndAdd(size); } // 4 调用notEmpty.signal()方法,通知队列中有数据了,如果之前有client获取数据处于阻塞状态,将会被唤醒 notEmpty.signal(); }
上述代码中,通过getIndex方法方法来进行位置转换,其内部通过位运算来快速取余数,不再赘述
MemoryEventStoreWithBuffer#getIndex
private int getIndex(long sequcnce) { return (int) sequcnce & indexMask; //bufferSize-1 }
对于batchMode是MEMSIZE的情况下, 还会通过calculateSize方法计算每个event占用的内存大小,累加到putMemSize上。
MemoryEventStoreWithBuffer#calculateSize
private long calculateSize(Event event) { // 直接返回binlog中的事件大小 return event.getEntry().getHeader().getEventLength(); }
其原理在于,mysql的binlog的event header中,都有一个event_length表示这个event占用的字节数。不熟悉mysql binlog event结构的读者可参考:https://dev.mysql.com/doc/internals/en/event-structure.html
parser模块将二进制形式binlog event解析后,这个event_length字段的值也被解析出来了,转换成Event对象后,在存储到store模块时,就可以根据其值判断占用内存大小。
需要注意的是,这个计算并不精确。原始的event_length表示的是event是二进制字节流时的字节数,在转换成java对象后,基本上都会变大。如何获取java对象的真实大小,可参考这个博客:https://www.cnblogs.com/Kidezyq/p/8030098.html。
Get操作
Put操作是canal parser模块解析binlog事件,并经过sink模块过滤后,放入到store模块中,也就是说Put操作实际上是canal内部调用。 Get操作(以及ack、rollback)则不同,其是由client发起的网络请求,server端通过对请求参数进行解析,最终调用CanalEventStore模块中定义的对应方法。
Get操作用于获取指定batchSize大小的Events。提供了3个方法:
// 尝试获取,如果获取不到立即返回 public Events<Event> tryGet(Position start, int batchSize) throws CanalStoreException // 获取指定大小的数据,阻塞等待其操作完成 public Events<Event> get(Position start, int batchSize) throws InterruptedException, CanalStoreException // 获取指定大小的数据,阻塞等待其操作完成或者超时,如果超时了,有多少,返回多少 public Events<Event> get(Position start, int batchSize, long timeout, TimeUnit unit) throws InterruptedException,CanalStoreException
其中:
-
start参数:其类型为Posisiton,表示从哪个位置开始获取
-
batchSize参数:表示批量获取的大小
-
timeout和uint参数:超时参数配置
与Put操作类似,MemoryEventStoreWithBuffer在实现这三个方法时,真正的获取操作都是在doGet方法中进行的。这里我们依然只选择其中一种进行完整的讲解:
public Events<Event> get(Position start, int batchSize, long timeout, TimeUnit unit) throws InterruptedException,CanalStoreException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if (checkUnGetSlotAt((LogPosition) start, batchSize)) { return doGet(start, batchSize); } if (nanos <= 0) { // 如果时间到了,有多少取多少 return doGet(start, batchSize); } try { nanos = notEmpty.awaitNanos(nanos); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } }
可以看到,get方法的实现逻辑与put方法整体上是类似的,不再赘述。这里我们直接关注checkUnGetSlotAt和doGet方法。
checkUnGetSlotAt方法,用于检查是否有足够的event可供获取,根据batchMode的不同,有着不同的判断逻辑
-
如果batchMode为ITEMSIZE,则表示只要有有满足batchSize数量的记录数即可,即putSequence - getSequence >= batchSize
-
如果batchMode为MEMSIZE,此时batchSize不再表示记录数,而是bufferMemUnit的个数,也就是说,获取到的event列表占用的总内存要达到batchSize * bufferMemUnit,即putMemSize-getMemSize >= batchSize * bufferMemUnit
MemoryEventStoreWithBuffer#checkUnGetSlotAt
private boolean checkUnGetSlotAt(LogPosition startPosition, int batchSize) { //1 如果batchMode为ITEMSIZE if (batchMode.isItemSize()) { long current = getSequence.get(); long maxAbleSequence = putSequence.get(); long next = current; //1.1 第一次订阅之后,需要包含一下start位置,防止丢失第一条记录。 if (startPosition == null || !startPosition.getPostion().isIncluded()) { next = next + 1; } //1.2 理论上只需要满足条件:putSequence - getSequence >= batchSize //1.2.1 先通过current < maxAbleSequence进行一下简单判断,如果不满足,可以直接返回false了 //1.2.2 如果1.2.1满足,再通过putSequence - getSequence >= batchSize判断是否有足够的数据 if (current < maxAbleSequence && next + batchSize - 1 <= maxAbleSequence) { return true; } else { return false; } //2 如果batchMode为MEMSIZE } else { long currentSize = getMemSize.get(); long maxAbleSize = putMemSize.get(); //2.1 需要满足条件 putMemSize-getMemSize >= batchSize * bufferMemUnit if (maxAbleSize - currentSize >= batchSize * bufferMemUnit) { return true; } else { return false; } } }
关于1.1步的描述"第一次订阅之后,需要包含一下start位置,防止丢失第一条记录”,这里进行一下特殊说明。首先要明确checkUnGetSlotAt方法的startPosition参数到底是从哪里传递过来的。
当一个client在获取数据时,CanalServerWithEmbedded的getWithoutAck/或get方法会被调用。其内部首先通过CanalMetaManager查找client的消费位置信息,由于是第一次,肯定没有记录,因此返回null,此时会调用CanalEventStore的getFirstPosition()方法,尝试把第一条数据作为消费的开始。而此时CanalEventStore中可能有数据,也可能没有数据。在没有数据的情况下,依然返回null;在有数据的情况下,把第一个Event的位置作为消费开始位置。那么显然,传入checkUnGetSlotAt方法的startPosition参数可能是null,也可能不是null。所以有了以下处理逻辑:
if (startPosition == null || !startPosition.getPostion().isIncluded()) { next = next + 1; }
如果不是null的情况下,尽管把第一个event当做开始位置,但是因为这个event毕竟还没有消费,所以在消费的时候我们必须也将其包含进去。之所以要+1,因为是第一次获取,getSequence的值肯定还是初始值-1,所以要+1变成0之后才是队列的第一个event位置。关于CanalEventStore的getFirstPosition()方法,我们将在最后分析。
当通过checkUnGetSlotAt的检查条件后,通过doGet方法进行真正的数据获取操作,获取主要分为5个步骤:
1、确定从哪个位置开始获取数据
2、根据batchMode是MEMSIZE还是ITEMSIZE,通过不同的方式来获取数据
3、设置PositionRange,表示获取到的event列表开始和结束位置
4、设置ack点
5、累加getSequence,getMemSize值
MemoryEventStoreWithBuffer#doGet
private Events<Event> doGet(Position start, int batchSize) throws CanalStoreException { LogPosition startPosition = (LogPosition) start; //1 确定从哪个位置开始获取数据 //获得当前的get位置 long current = getSequence.get(); //获得当前的put位置 long maxAbleSequence = putSequence.get(); //要获取的第一个Event的位置,一开始等于当前get位置 long next = current; //要获取的最后一个event的位置,一开始也是当前get位置,每获取一个event,end值加1,最大为current+batchSize //因为可能进行ddl隔离,因此可能没有获取到batchSize个event就返回了,此时end值就会小于current+batchSize long end = current; // 如果startPosition为null,说明是第一次订阅,默认+1处理,因为getSequence的值是从-1开始的 // 如果tartPosition不为null,需要包含一下start位置,防止丢失第一条记录 if (startPosition == null || !startPosition.getPostion().isIncluded()) { next = next + 1; } // 如果没有数据,直接返回一个空列表 if (current >= maxAbleSequence) { return new Events<Event>(); } //2 如果有数据,根据batchMode是ITEMSIZE或MEMSIZE选择不同的处理方式 Events<Event> result = new Events<Event>(); //维护要返回的Event列表 List<Event> entrys = result.getEvents(); long memsize = 0; //2.1 如果batchMode是ITEMSIZE if (batchMode.isItemSize()) { end = (next + batchSize - 1) < maxAbleSequence ? (next + batchSize - 1) : maxAbleSequence; //2.1.1 循环从开始位置(next)到结束位置(end),每次循环next+1 for (; next <= end; next++) { //2.1.2 获取指定位置上的事件 Event event = entries[getIndex(next)]; //2.1.3 果是当前事件是DDL事件,且开启了ddl隔离,本次事件处理完后,即结束循环(if语句最后是一行是break) if (ddlIsolation && isDdl(event.getEntry().getHeader().getEventType())) { // 2.1.4 因为ddl事件需要单独返回,因此需要判断entrys中是否应添加了其他事件 if (entrys.size() == 0) {//如果entrys中尚未添加任何其他event entrys.add(event);//加入当前的DDL事件 end = next; // 更新end为当前值 } else { //如果已经添加了其他事件 如果之前已经有DML事件,直接返回了,因为不包含当前next这记录,需要回退一个位置 end = next - 1; // next-1一定大于current,不需要判断 } break; } else {//如果没有开启DDL隔离,直接将事件加入到entrys中 entrys.add(event); } } //2.2 如果batchMode是MEMSIZE } else { //2.2.1 计算本次要获取的event占用最大字节数 long maxMemSize = batchSize * bufferMemUnit; //2.2.2 memsize从0开始,当memsize小于maxMemSize且next未超过maxAbleSequence时,可以进行循环 for (; memsize <= maxMemSize && next <= maxAbleSequence; next++) { //2.2.3 获取指定位置上的Event Event event = entries[getIndex(next)]; //2.2.4 果是当前事件是DDL事件,且开启了ddl隔离,本次事件处理完后,即结束循环(if语句最后是一行是break) if (ddlIsolation && isDdl(event.getEntry().getHeader().getEventType())) { // 如果是ddl隔离,直接返回 if (entrys.size() == 0) { entrys.add(event);// 如果没有DML事件,加入当前的DDL事件 end = next; // 更新end为当前 } else { // 如果之前已经有DML事件,直接返回了,因为不包含当前next这记录,需要回退一个位置 end = next - 1; // next-1一定大于current,不需要判断 } break; } else { //如果没有开启DDL隔离,直接将事件加入到entrys中 entrys.add(event); //并将当前添加的event占用字节数累加到memsize变量上 memsize += calculateSize(event); end = next;// 记录end位点 } } } //3 构造PositionRange,表示本次获取的Event的开始和结束位置 PositionRange<LogPosition> range = new PositionRange<LogPosition>(); result.setPositionRange(range); //3.1 把entrys列表中的第一个event的位置,当做PositionRange的开始位置 range.setStart(CanalEventUtils.createPosition(entrys.get(0))); //3.2 把entrys列表中的最后一个event的位置,当做PositionRange的结束位置 range.setEnd(CanalEventUtils.createPosition(entrys.get(result.getEvents().size() - 1))); //4 记录一下是否存在可以被ack的点,逆序迭代获取到的Event列表 for (int i = entrys.size() - 1; i >= 0; i--) { Event event = entrys.get(i); //4.1.1 如果是事务开始/事务结束/或者dll事件, if (CanalEntry.EntryType.TRANSACTIONBEGIN == event.getEntry().getEntryType() || CanalEntry.EntryType.TRANSACTIONEND == event.getEntry().getEntryType() || isDdl(event.getEntry().getHeader().getEventType())) { // 4.1.2 将其设置为可被ack的点,并跳出循环 range.setAck(CanalEventUtils.createPosition(event)); break; } //4.1.3 如果没有这三种类型事件,意味着没有可被ack的点 } //5 累加getMemSize值,getMemSize值 //5.1 通过AtomLong的compareAndSet尝试增加getSequence值 if (getSequence.compareAndSet(current, end)) {//如果成功,累加getMemSize getMemSize.addAndGet(memsize); //如果之前有put操作因为队列满了而被阻塞,这里发送信号,通知队列已经有空位置,下面还要进行说明 notFull.signal(); return result; } else {//如果失败,直接返回空事件列表 return new Events<Event>(); } }
补充说明:
1 Get数据时,会通过isDdl方法判断event是否是ddl类型。
MemoryEventStoreWithBuffer#isDdl
private boolean isDdl(EventType type) { return type == EventType.ALTER || type == EventType.CREATE || type == EventType.ERASE || type == EventType.RENAME || type == EventType.TRUNCATE || type == EventType.CINDEX || type == EventType.DINDEX; }
这里的EventType是在protocol模块中定义的,并非mysql binlog event结构中的event type。在原始的mysql binlog event类型中,有一个QueryEvent,里面记录的是执行的sql语句,canal通过对这个sql语句进行正则表达式匹配,判断出这个event是否是DDL语句(参见SimpleDdlParser#parse方法)。
2 获取到event列表之后,会构造一个PostionRange对象。
通过CanalEventUtils.createPosition方法计算出第一、最后一个event的位置,作为PostionRange的开始和结束。
事实上,parser模块解析后,已经将位置信息:binlog文件,position封装到了Event中,createPosition方法只是将这些信息提取出来。
CanalEventUtils#createPosition
public static LogPosition createPosition(Event event) { //=============创建一个EntryPosition实例,提取event中的位置信息============ EntryPosition position = new EntryPosition(); //event所在的binlog文件 position.setJournalName(event.getEntry().getHeader().getLogfileName()); //event锁在binlog文件中的位置 position.setPosition(event.getEntry().getHeader().getLogfileOffset()); //event的创建时间 position.setTimestamp(event.getEntry().getHeader().getExecuteTime()); //event是mysql主从集群哪一个实例上生成的,一般都是主库,如果从库没有配置read-only,那么serverId也可能是从库 position.setServerId(event.getEntry().getHeader().getServerId()); //===========将EntryPosition实例封装到一个LogPosition对象中=============== LogPosition logPosition = new LogPosition(); logPosition.setPostion(position); //LogIdentity中包含了这个event来源的mysql实力的ip地址信息 logPosition.setIdentity(event.getLogIdentity()); return logPosition; }
3 获取到Event列表后,会从中逆序寻找第一个类型为"事务开始/事务结束/DDL"的Event,将其位置作为PostionRange的可ack位置。
mysql原生的binlog事件中,总是以一个内容”BEGIN”的QueryEvent作为事务开始,以XidEvent事件表示事务结束。即使我们没有显式的开启事务,对于单独的一个更新语句(如Insert、update、delete),mysql也会默认开启事务。而canal将其转换成更容易理解的自定义EventType类型:TRANSACTIONBEGIN、TRANSACTIONEND。
而将这些事件作为ack点,主要是为了保证事务的完整性。例如client一次拉取了10个binlog event,前5个构成一个事务,后5个还不足以构成一个完整事务。在ack后,如果这个client停止了,也就是说下一个事务还没有被完整处理完。尽管之前ack的是10条数据,但是client重新启动后,将从第6个event开始消费,而不是从第11个event开始消费,因为第6个event是下一个事务的开始。
具体逻辑在于,canal server在接受到client ack后,CanalServerWithEmbedded#ack方法会执行。其内部首先根据ack的batchId找到对应的PositionRange,再找出其中的ack点,通过CanalMetaManager将这个位置记录下来。之后client重启后,再把这个位置信息取出来,从这个位置开始消费。
也就是说,ack位置实际上提供给CanalMetaManager使用的。而对于MemoryEventStoreWithBuffer本身而言,也需要进行ack,用于将已经消费的数据从队列中清除,从而腾出更多的空间存放新的数据。
ack操作
相对于get操作和put操作,ack操作没有重载,只有一个ack方法,用于清空指定position之前的数据,如下:
MemoryEventStoreWithBuffer#ack
public void ack(Position position) throws CanalStoreException { cleanUntil(position); }
CanalStoreScavenge接口定义了2个方法:cleanAll和cleanUntil。前面我们已经看到了在stop时,cleanAll方法会被执行。而每次ack时,cleanUntil方法会被执行,这个方法实现如下所示:
MemoryEventStoreWithBuffer#cleanUntil
// postion表示要ack的配置 public void cleanUntil(Position position) throws CanalStoreException { final ReentrantLock lock = this.lock; lock.lock(); try { //获得当前ack值 long sequence = ackSequence.get(); //获得当前get值 long maxSequence = getSequence.get(); boolean hasMatch = false; long memsize = 0; //迭代所有未被ack的event,从中找出与需要ack的position相同位置的event,清空这个event之前的所有数据。 //一旦找到这个event,循环结束。 for (long next = sequence + 1; next <= maxSequence; next++) { Event event = entries[getIndex(next)];//获得要ack的event memsize += calculateSize(event);//计算当前要ack的event占用字节数 boolean match = CanalEventUtils.checkPosition(event, (LogPosition) position); if (match) {// 找到对应的position,更新ack seq hasMatch = true; if (batchMode.isMemSize()) {//如果batchMode是MEMSIZE ackMemSize.addAndGet(memsize);//累加ackMemSize // 尝试清空buffer中的内存,将ack之前的内存全部释放掉 for (long index = sequence + 1; index < next; index++) { entries[getIndex(index)] = null;// 设置为null } } //累加ack值 //官方注释说,采用compareAndSet,是为了避免并发ack。我觉得根本不会并发ack,因为都加锁了 if (ackSequence.compareAndSet(sequence, next)) { notFull.signal();//如果之前存在put操作因为队列满了而被阻塞,通知其队列有了新空间 return; } } } if (!hasMatch) {// 找不到对应需要ack的position throw new CanalStoreException("no match ack position" + position.toString()); } } finally { lock.unlock(); } }
在匹配尚未ack的Event,是否有匹配的位置时,调用了CanalEventUtils#checkPosition方法。其内部:
-
首先比较Event的生成时间
-
接着,如果位置信息的binlog文件名或者信息不为空的话(通常不为空),则会进行精确匹配
CanalEventUtils#checkPosition
/** * 判断当前的entry和position是否相同 */ public static boolean checkPosition(Event event, LogPosition logPosition) { EntryPosition position = logPosition.getPostion(); CanalEntry.Entry entry = event.getEntry(); //匹配时间 boolean result = position.getTimestamp().equals(entry.getHeader().getExecuteTime()); //判断是否需要根据:binlog文件+position进行比较 boolean exactely = (StringUtils.isBlank(position.getJournalName()) && position.getPosition() == null); if (!exactely) {// 精确匹配 result &= StringUtils.equals(entry.getHeader().getLogfileName(), position.getJournalName()); result &= position.getPosition().equals(entry.getHeader().getLogfileOffset()); } return result; }
rollback操作
相对于put/get/ack操作,rollback操作简单了很多。所谓rollback,就是client已经get到的数据,没能消费成功,因此需要进行回滚。回滚操作特别简单,只需要将getSequence的位置重置为ackSequence,将getMemSize设置为ackMemSize即可。
public void rollback() throws CanalStoreException { final ReentrantLock lock = this.lock; lock.lock(); try { getSequence.set(ackSequence.get()); getMemSize.set(ackMemSize.get()); } finally { lock.unlock(); } }
其他方法
除了上述提到的所有方法外,MemoryEventStoreWithBuffer还提供了getFirstPosition()和getLatestPosition()方法,分别用于获取当前队列中的第一个和最后一个Event的位置信息。前面已经提到,在CanalServerWithEmbedded中会使用getFirstPosition()方法来获取CanalEventStore中存储的第一个Event的位置,而getLatestPosition()只是在一些单元测试中使用到,因此在这里我们只分析getFirstPosition()方法。
第一条数据通过ackSequence当前值对应的Event来确定,因为更早的Event在ack后都已经被删除了。相关源码如下:
MemoryEventStoreWithBuffer#getFirstPosition
//获取第一条数据的position,如果没有数据返回为null public LogPosition getFirstPosition() throws CanalStoreException { final ReentrantLock lock = this.lock; lock.lock(); try { long firstSeqeuence = ackSequence.get(); //1 没有ack过数据,且队列中有数据 if (firstSeqeuence == INIT_SQEUENCE && firstSeqeuence < putSequence.get()) { //没有ack过数据,那么ack为初始值-1,又因为队列中有数据,因此ack+1,即返回队列中第一条数据的位置 Event event = entries[getIndex(firstSeqeuence + 1)]; return CanalEventUtils.createPosition(event, false); //2 已经ack过数据,但是未追上put操作 } else if (firstSeqeuence > INIT_SQEUENCE && firstSeqeuence < putSequence.get()) { //返回最后一次ack的位置数据 + 1 Event event = entries[getIndex(firstSeqeuence + 1)]; return CanalEventUtils.createPosition(event, true); //3 已经ack过数据,且已经追上put操作,说明队列中所有数据都被消费完了 } else if (firstSeqeuence > INIT_SQEUENCE && firstSeqeuence == putSequence.get()) { // 最后一次ack的位置数据,和last为同一条 Event event = entries[getIndex(firstSeqeuence)]; return CanalEventUtils.createPosition(event, false); //4 没有任何数据,返回null } else { return null; } } finally { lock.unlock(); } }
代码逻辑很简单,唯一需要关注的是,通过CanalEventUtils#createPosition(Event, boolean)方法来计算第一个Event的位置,返回的是一个LogPosition对象。其中boolean参数用LogPosition内部维护的EntryPosition的included属性赋值。在前面get方法源码分析时,我们已经看到,当included值为false时,会把当前get位置+1,然后开始获取Event;当为true时,则直接从当前get位置开始获取数据。
本文转载自田守枝的Java技术博客,感谢作者的分享
-