Navigation

    • Register
    • Login
    • Search
    • Categories
    • Recent
    • Tags
    • Popular
    • Users
    • Groups

    canal源码解析系列(5):store模块(上)

    技术文章分享
    canal 源码解析
    1
    1
    533
    Loading More Posts
    • Oldest to Newest
    • Newest to Oldest
    • Most Votes
    Reply
    • Reply as topic
    Log in to reply
    This topic has been deleted. Only users with topic management privileges can see it.
    • CloudCanal-万少
      CloudCanal-万少 last edited by CloudCanal-万少

      系列文章索引:

      1. canal源码分析简介
      2. deployer模块(上)
      3. deployer模块(下)
      4. server模块
      5. instance模块
      6. store模块(上)
      7. store模块(下)
      8. filter模块
      9. driver模块

      store模块简介

      store模块用于binlog事件的存储 ,目前开源的版本中仅实现了Memory内存模式。官方文档中提到"后续计划增加本地file存储,mixed混合模式”,这句话大家不必当真,从笔者最开始接触canal到现在已经几年了,依然没有动静,好在Memory内存模式已经可以满足绝大部分场景。

      store模块目录结构如下,该模块的核心接口为 CanalEventStore

      76c413ed-84ce-4f1c-ae3e-5fc37619b6c0-image.png

      以下是相关类图:

      955fa32c-28fb-45a9-a487-0dc64e186b63-image.png
      其中MemoryEventStoreWithBuffer就是内存模式的实现,是我们分析的重点,其实现了CanalEventStore接口,并继承了AbstractCanalStoreScavenge抽象类。需要注意的是,AbstractCanalStoreScavenge这个类中定义的字段和方法在开源版本中并没有任何地方使用到,因此我们不会对其进行分析。

          MemoryEventStoreWithBuffer的实现借鉴了Disruptor的RingBuffer。简而言之,你可以把其当做一个环形队列,如下: 
      

      0296dd0d-f364-457b-b068-5fb9a37cfa5b-image.png
      针对这个环形队列,canal定义了3类操作:Put、Get、Ack,其中:

      • Put 操作:添加数据。event parser模块拉取到binlog后,并经过event sink模块过滤,最终就通过Put操作存储到了队列中。

      • Get操作:获取数据。canal client连接到canal server后,最终获取到的binlog都是从这个队列中取得。

      • Ack操作:确认消费成功。canal client获取到binlog事件消费后,需要进行Ack。你可以认为Ack操作实际上就是将消费成功的事件从队列中删除,如果一直不Ack的话,队列满了之后,Put操作就无法添加新的数据了。

      对应的,我们需要使用3个变量来记录Put、Get、Ack这三个操作的位置,其中:

      • putSequence: 每放入一个数据putSequence +1,可表示存储数据存储的总数量

      • getSequence: 每获取一个数据getSequence +1,可表示数据订阅获取的最后一次提取位置

      • ackSequence: 每确认一个数据ackSequence + 1,可表示数据最后一次消费成功位置

      另外,putSequence、getSequence、ackSequence这3个变量初始值都是-1,且都是递增的,均用long型表示。由于数据只有被Put进来后,才能进行Get;Get之后才能进行Ack。 所以,这三个变量满足以下关系:

      ackSequence <= getSequence <= putSequence
      

      如果将RingBuffer拉直来看,将会变得更加直观:

      defe0de7-1b86-482d-8196-7646ae455730-image.png

      通过对这3个位置进行运算,我们可以得到一些有用的信息,如:

      计算当前可消费的event数量:

      当前可消费的event数量 = putSequence - getSequence
      

      计算当前队列的大小(即队列中还有多少事件等待消费):

      当前队列的大小 = putSequence - ackSequence
      

      在进行Put/Get/Ack操作时,首先都要确定操作到环形队列的哪个位置。环形队列的bufferSize默认大小是16384,而这3个操作的位置变量putSequence、getSequence、ackSequence都是递增的,显然最终都会超过bufferSize。因此必须要对这3个值进行转换。最简单的操作就是使用%进行取余。

      举例来说,putSequence的当前值为16383,这已经是环形队列的最大下标了(从0开始计算),下一个要插入的数据要在第16384个位置上,此时可以使用16384 % bufferSize = 0,因此下一个要插入的数据在0号位置上。可见,当达到队列的最大下标时,再从头开始循环,这也是为什么称之为环形队列的原因。当然在实际操作时,更加复杂,如0号位置上已经有数据了,就不能插入,需要等待这个位置被释放出来,否则出现数据覆盖。

      canal使用的是通过位操作进行取余,这种取余方式与%作用完全相同,只不过因为是位操作,因此更加高效。其计算方式如下:

      操作位置 = sequence & (bufferSize - 1)
      

      需要注意的是,这种方式只对除数是2的N次方幂时才有效,如果对于位运算取余不熟悉,可参考:https://blog.csdn.net/actionzh/article/details/78976082。

      在canal.properties文件中定义了几个MemoryEventStoreWithBuffer的配置参数,主要用于控制环形队列的大小和存储的数据可占用的最大内存,如下:

      canal.instance.memory.buffer.size = 16384
      canal.instance.memory.buffer.memunit = 1024
      canal.instance.memory.batch.mode = MEMSIZE
      

      其中:

      canal.instance.memory.buffer.size:

      表示RingBuffer队列的最大容量,也就是可缓存的binlog事件的最大记录数,其值需要为2的指数(原因如前所述,canal通过位运算进行取余),默认值为2^16=16384。

      canal.instance.memory.buffer.memunit:

      表示RingBuffer使用的内存单元, 默认是1kb。和canal.instance.memory.buffer.size组合决定最终的内存使用大小。需要注意的是,这个配置项仅仅是用于计算占用总内存,并不是限制每个event最大为1kb。

      canal.instance.memory.batch.mode:

      表示canal内存store中数据缓存模式,支持两种方式:

      • ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量。这种方式有一些潜在的问题,举个极端例子,假设每个event有1M,那么16384个这种event占用内存要达到16G左右,基本上肯定会造成内存溢出(超大内存的物理机除外)。

      • MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录占用的总内存大小。指定为这种模式时,意味着默认缓存的event占用的总内存不能超过16384*1024=16M。这个值偏小,但笔者认为也足够了。因为通常我们在一个服务器上会部署多个instance,每个instance的store模块都会占用16M,因此只要instance的数量合适,也就不会浪费内存了。部分读者可能会担心,这是否限制了一个event的最大大小为16M,实际上是没有这个限制的。因为canal在Put一个新的event时,只会判断队列中已有的event占用的内存是否超过16M,如果没有,新的event不论大小是多少,总是可以放入的(canal的内存计算实际上是不精确的),之后的event再要放入时,如果这个超过16M的event没有被消费,则需要进行等待。

      在canal自带的instance.xml文件中,使用了这些配置项来创建MemoryEventStoreWithBuffer实例,如下:

      <bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
         <property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
         <property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
         <property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
         <property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" />
      </bean>
      

      这里我们还看到了一个ddlIsolation属性,其对于Get操作生效,用于设置ddl语句是否单独一个batch返回(比如下游dml/ddl如果做batch内无序并发处理,会导致结构不一致)。其值通过canal.instance.get.ddl.isolation配置项来设置,默认值为false。

      CanalEventStore接口

      通过前面的分析,我们知道了环形队列要支持三种操作:Put、Get、Ack,针对这三种操作,在CanalEventStore中都有相应的方法定义,如下所示:

      com.alibaba.otter.canal.store.CanalEventStore

      /**
      * canel数据存储接口
      */
      public interface CanalEventStore<T> extends CanalLifeCycle, CanalStoreScavenge {
          //==========================Put操作==============================
          /**添加一组数据对象,阻塞等待其操作完成 (比如一次性添加一个事务数据)*/
          void put(List<T> data) throws InterruptedException, CanalStoreException;
          /**添加一组数据对象,阻塞等待其操作完成或者时间超时 (比如一次性添加一个事务数据)*/
          boolean put(List<T> data, long timeout, TimeUnit unit) throws InterruptedException, 
          CanalStoreException;
          /**添加一组数据对象 (比如一次性添加一个事务数据)*/
          boolean tryPut(List<T> data) throws CanalStoreException;
          /**添加一个数据对象,阻塞等待其操作完成*/
          void put(T data) throws InterruptedException, CanalStoreException;
          /**添加一个数据对象,阻塞等待其操作完成或者时间超时*/
          boolean put(T data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException;
          /** 添加一个数据对象*/
          boolean tryPut(T data) throws CanalStoreException;
          
          //==========================GET操作==============================
          /** 获取指定大小的数据,阻塞等待其操作完成*/
          Events<T> get(Position start, int batchSize) throws InterruptedException, CanalStoreException;
          /**获取指定大小的数据,阻塞等待其操作完成或者时间超时*/
          Events<T> get(Position start, int batchSize, long timeout, TimeUnit unit) throws 
          InterruptedException,CanalStoreException;
          /**根据指定位置,获取一个指定大小的数据*/
          Events<T> tryGet(Position start, int batchSize) throws CanalStoreException;
          
          //=========================Ack操作==============================
          /**删除{@linkplain Position}之前的数据*/
          void ack(Position position) throws CanalStoreException;
         //==========================其他操作==============================
          /** 获取最后一条数据的position*/
          Position getLatestPosition() throws CanalStoreException;
          /**获取第一条数据的position,如果没有数据返回为null*/
          Position getFirstPosition() throws CanalStoreException;
          /**出错时执行回滚操作(未提交ack的所有状态信息重新归位,减少出错时数据全部重来的成本)*/
          void rollback() throws CanalStoreException;
      }
      

      可以看到Put/Get/Ack操作都有多种重载形式,各个方法的作用参考方法注释即可,后文在分析MemoryEventStoreWithBuffer时,将会进行详细的介绍。

      这里对 get方法返回的Events对象,进行一下说明:

      com.alibaba.otter.canal.store.model.Events

      public class Events<EVENT> implements Serializable {
          private static final long serialVersionUID = -7337454954300706044L;
          private PositionRange     positionRange    = new PositionRange();
          private List<EVENT>       events           = new ArrayList<EVENT>();
          //setters getters and toString
      }
      

      其中:CanalEntry.Entry和LogIdentity也都是protocol模块中的类:

      • LogIdentity记录这个Event的来源信息mysql地址(sourceAddress)和slaveId。

      • CanalEntry.Entry封装了binlog事件的数据

      MemoryEventStoreWithBuffer

      MemoryEventStoreWithBuffer是目前开源版本中的CanalEventStore接口的唯一实现,基于内存模式。当然你也可以进行扩展,提供一个基于本地文件存储方式的CanalEventStore实现。这样就可以一份数据让多个业务费进行订阅,只要独立维护消费位置元数据即可。然而,我不得不提醒你的是,基于本地文件的存储方式,一定要考虑好数据清理工作,否则会有大坑。

      如果一个库只有一个业务方订阅,其实根本也不用实现本地存储,使用基于内存模式的队列进行缓存即可。如果client消费的快,那么队列中的数据放入后就被取走,队列基本上一直是空的,实现本地存储也没意义;如果client消费的慢,队列基本上一直是满的,只要client来获取,总是能拿到数据,因此也没有必要实现本地存储。

      言归正传,下面对MemoryEventStoreWithBuffer的源码进行分析。

      MemoryEventStoreWithBuffer字段

      首先对MemoryEventStoreWithBuffer中定义的字段进行一下介绍,这是后面分析其他方法的基础,如下:

      public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge implements 
      CanalEventStore<Event>, CanalStoreScavenge {
          private static final long INIT_SQEUENCE = -1;
          private int               bufferSize    = 16 * 1024;
          // memsize的单位,默认为1kb大小
          private int               bufferMemUnit = 1024;     
          private int               indexMask;
          private Event[]           entries;
          // 记录下put/get/ack操作的三个下标,初始值都是-1
          // 代表当前put操作最后一次写操作发生的位置
          private AtomicLong   putSequence   = new AtomicLong(INIT_SQEUENCE);
          // 代表当前get操作读取的最后一条的位置 
          private AtomicLong   getSequence   = new AtomicLong(INIT_SQEUENCE); 
          // 代表当前ack操作的最后一条的位置
          private AtomicLong   ackSequence   = new AtomicLong(INIT_SQEUENCE); 
          // 记录下put/get/ack操作的三个memsize大小
          private AtomicLong   putMemSize    = new AtomicLong(0);
          private AtomicLong   getMemSize    = new AtomicLong(0);
          private AtomicLong   ackMemSize    = new AtomicLong(0);
          // 阻塞put/get操作控制信号
          private ReentrantLock     lock          = new ReentrantLock();
          private Condition    notFull       = lock.newCondition();
          private Condition    notEmpty      = lock.newCondition();
          // 默认为内存大小模式
          private BatchMode    batchMode     = BatchMode.ITEMSIZE;           
          private boolean     ddlIsolation  = false;
          ...
      }
      

      属性说明:

      • bufferSize、bufferMemUnit、batchMode、ddlIsolation、putSequence、getSequence、ackSequence:

        这几个属性前面已经介绍过,这里不再赘述。

      • entries:

        类型为Event[]数组,环形队列底层基于的Event[]数组,队列的大小就是bufferSize。关于如何使用数组来实现环形队列,可参考笔者的另一篇文章http://www.tianshouzhi.com/api/tutorials/basicalgorithm/43。

      • indexMask

        用于对putSequence、getSequence、ackSequence进行取余操作,前面已经介绍过canal通过位操作进行取余,其值为bufferSize-1 ,参见下文的start方法

      • putMemSize、getMemSize、ackMemSize:

      分别用于记录put/get/ack操作的event占用内存的累加值,都是从0开始计算。例如每put一个event,putMemSize就要增加这个event占用的内存大小;get和ack操作也是类似。这三个变量,都是在batchMode指定为MEMSIZE的情况下,才会发生作用。

      因为都是累加值,所以我们需要进行一些运算,才能得有有用的信息,如:

      计算出当前环形队列当前占用的内存大小

      环形队列当前占用的内存大小 = putMemSize - ackMemSize
      

      前面我们提到,batchMode为MEMSIZE时,需要限制环形队列中event占用的总内存,事实上在执行put操作前,就是通过这种方式计算出来当前大小,然后我们限制的bufferSize * bufferMemUnit大小进行比较。

      计算尚未被获取的事件占用的内存大小

      尚未被获取的事件占用的内存大小 = putMemSize - getMemSize
      

      batchMode除了对PUT操作有限制,对Get操作也有影响。Get操作可以指定一个batchSize,用于指定批量获取的大小。当batchMode为MEMSIZE时,其含义就在不再是记录数,而是要获取到总共占用 batchSize * bufferMemUnit 内存大小的事件数量。

      lock、notFull、notEmpty:

      阻塞put/get操作控制信号。notFull用于控制put操作,只有队列没满的情况下才能put。notEmpty控制get操作,只有队列不为空的情况下,才能get。put操作和get操作共用一把锁(lock)。 
      

      启动和停止方法

      MemoryEventStoreWithBuffer实现了CanalLifeCycle接口,因此实现了其定义的start、stop方法

      start启动方法

      start方法主要是初始化MemoryEventStoreWithBuffer内部的环形队列,其实就是初始化一下Event[]数组。 
      
      public void start() throws CanalStoreException {
          super.start();
          if (Integer.bitCount(bufferSize) != 1) {
              throw new IllegalArgumentException("bufferSize must be a power of 2");
          }
          indexMask = bufferSize - 1;//初始化indexMask,前面已经介绍过,用于通过位操作进行取余
          entries = new Event[bufferSize];//创建循环队列基于的底层数组,大小为bufferSize
      }
      

      stop停止方法
      stop方法作用是停止,在停止时会清空所有缓存的数据,将维护的相关状态变量设置为初始值。

      MemoryEventStoreWithBuffer#stop

      public void stop() throws CanalStoreException {
          super.stop();
          //清空所有缓存的数据,将维护的相关状态变量设置为初始值
          cleanAll();
      }
      

      在停止时,通过调用cleanAll方法清空所有缓存的数据。

      cleanAll方法是在CanalStoreScavenge接口中定义的,在MemoryEventStoreWithBuffer中进行了实现, 此外这个接口还定义了另外一个方法cleanUtil,在执行ack操作时会被调用,我们将在介绍ack方法时进行讲解。

      MemoryEventStoreWithBuffer#cleanAll

      public void cleanAll() throws CanalStoreException {
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              //将Put/Get/Ack三个操作的位置都重置为初始状态-1
              putSequence.set(INIT_SQEUENCE);
              getSequence.set(INIT_SQEUENCE);
              ackSequence.set(INIT_SQEUENCE);
              //将Put/Get/Ack三个操作的memSize都重置为0
              putMemSize.set(0);
              getMemSize.set(0);
              ackMemSize.set(0);
              //将底层Event[]数组置为null,相当于清空所有数据
              entries = null;
          } finally {
              lock.unlock();
          }
      }
      

      本文转载自田守枝的Java技术博客,感谢作者的分享

      1 Reply Last reply Reply Quote 0
      • 1 / 1
      • First post
        Last post
      Copyright © 2020 ClouGence, Inc.备案号:浙ICP备20007605号-2