canal部署及源码分析

发布于:2023-03-27 ⋅ 阅读:(217) ⋅ 点赞:(0)

canal配置与部署说明

canal源码分析

https://github.com/alibaba/canal   
版本:1.1.5
打包:mvn clean package -DskipTests=true -Denv=release

参考

http://www.tianshouzhi.com/api/tutorials/canal/380

模块说明

模块 说明
admin admin控制器模块
common 提供了一些公共的工具类和接口
deployer 部署模块,对应canal-deployer.tar.gz,是canal的启动模块
server canal核心服务端
instance 每个server有多个instance
parser 解析binlog,依赖于dbsync、driver
sink parser和store链接器,用于数据过滤、分发
store 数据存储模块
filter 用于对binlog进行过滤
meta 增量订阅和消费信息管理器
client canal客户端
protocol client与server之间的通信协议

deployer

deploy模式是canal server的启动入口,它包含了一些初始化过程和与canal admin交互的逻辑,从startup.sh启动脚本中可以看到启动入口维CanalLauncher这个类。

CanalLauncher中主要包含了以下逻辑:

1. 加载本地配置文件
2. 初始化CanalStarter(托管canal的启动过程)
3. 如果配置了canal admin,创建与canal admin的连接并通过一个定时任务从admin定期拉取
   配置文件,如果配置文件发生变化,就通过CanalStarter进行重启

canal server的启动工作实际由CanalLauncher交给了CanalStarter,但在CanalStarter内部实际也是只做了一些周边工作,最终的启动是有CanalController实现的。在CanalStarter中主要做了以下几件事情:

1. 如果配置了mq producer,则跟spi来动态初始化对应的CanalMQProducer
2. 初始化CanalController并调用start()方法启动Canal Server
3. 如果设置了CanalMQProducer,启动这个producer
4. 设置jvm shutdown hook

CanalController是启动canal server的核心类,主要逻辑如下:

一、初始化
1. 将properties配置转换成Bean配置(InstanceConfig)
2. 初始化embededCanalServer
3. 初始化CanalServer(提供了tcp消费模式)
4. 如果配置的zkServer,就初始化zkClient,并初始化zk上的数据路径
5. 初始化ServerRunningMonitor
    processActiveEnter  (抢占了zk节点,变成active状态时执行)
    processActiveExit   (退出active状态时执行)
    processStart  (启动canal server时执行)
    processStop   (终止canal server时执行)
6. 初始化defaultAction
7. 初始化instanceConfigMonitors

二、启动(CanalController.start())
1. 初始化zk cluster路径:otter/canal/cluster/192.168.233.1:11111
2. 启动embededCanalServer
3. 启动每一个destination上的ServerRunningMonitor 运行监控器
     ServerRunningMonitor.start()    由CanalController.start触发
4. 启动instanceConfigMonitors 配置监控器
     InstanceConfigMonitor.start()   由CanalController.start触发
5. 启动CanalServer(带tpc端口的)

至此,deployer模块的启动工作就做完了,总结起来主要就是做了几件事情:

  1. 初始化配置
  2. 与admin加你连接
  3. 启动embededCanalServer
  4. 初始化对外提供数据的方式(tcp/mq)
  5. 在zookeeper上初始化节点数据

server

canal提供了两种常用server,分别是CanalServerWithNetty和CanalServerWithEmbedded,CanalServerWithNetty实际是上在CanalServerWithEmbedded的基础上增加了一个对外提供服务的NettyServer。

CanalServerWithNetty中向Netty的pipline中初始化了一个SessionHandler,在SessionHandler处理可客户端的请求:

bootstrap.setPipelineFactory(() -> {
            ChannelPipeline pipelines = Channels.pipeline();
            pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
            // support to maintain child socket channel.
            pipelines.addLast(HandshakeInitializationHandler.class.getName(),
                new HandshakeInitializationHandler(childGroups));
            pipelines.addLast(ClientAuthenticationHandler.class.getName(),
                new ClientAuthenticationHandler(embeddedServer));

            SessionHandler sessionHandler = new SessionHandler(embeddedServer);
            pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
            return pipelines;
        });

SessionHandler

 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
   switch (packet.getType()) { 
         case SUBSCRIPTION:   //订阅
              embeddedServer.subscribe(clientIdentity);
         case UNSUBSCRIPTION:  //取消订阅
              embeddedServer.unsubscribe(clientIdentity);
         case GET:  //获取数据
               message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
         case CLIENTACK:  //ack
               embeddedServer.ack(clientIdentity, ack.getBatchId());
         case CLIENTROLLBACK:  //回滚
               embeddedServer.rollback(clientIdentity, rollback.getBatchId());
         default:
    }
}

以上是通过tcp端口向外输出数据的方式,下面看一下通过mq的方式向外输出数据的方式。

在CanalStarter中初始化的CanalMQProducer会set到CanalController中,然后,ServerRunninngMonitor在processActiveEnter中会为每一个destination启动对应MQProducer

public void processActiveEnter() {
    if (canalMQStarter != null) {
        canalMQStarter.startDestination(destination);
    }
}

public synchronized void startDestination(String destination) {
        CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
        if (canalInstance != null) {
            stopDestination(destination);
            //初始化一个用于消费的任务
            CanalMQRunnable canalMQRunnable = new CanalMQRunnable(destination);
            canalMQWorks.put(canalInstance.getDestination(), canalMQRunnable);
            //将任务交给线程池运行
            executorService.execute(canalMQRunnable);
            logger.info("## Start the MQ work of destination:" + destination);
        }
    }

CanalMQRunnable中的work方法是真正处理数据的地方

while (running && destinationRunning.get()) { 
    CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
    //消息订阅
    canalServer.subscribe(clientIdentity);
    while (running && destinationRunning.get()) { 
         //从canal server中获取消息
         message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
         //向mq发送消息
         canalMQProducer.send(canalDestination, message, new Callback() {
            @Override
            public void commit() {
                canalServer.ack(clientIdentity, batchId); // 提交确认
            }

            @Override
            public void rollback() {
                //回滚
                canalServer.rollback(clientIdentity, batchId);
            }
        }
    }
}

实际上内部就是从canal server消费数据然后发送的消息队列中,只不过canal对这个过程进行了规范化的封装。

CanalServerWithEmbedded实现了CanalService接口,改接口定义了订阅、取消订阅、拉取数据、ack确认、回滚等方法,是进行数据处理的核心方法。

订阅:

/**
     * 客户端订阅,重复订阅时会更新对应的filter信息
     *
     * 实际上订阅的动作,就是将客户端信息注册到MetaManager中,
     * 然后从EventStore中获取数据时,根据这些MetaManager来定位数据
     * 由于canal采用的时pull模型,所以拉取数据的动是由客户端触发的
     */
 @Override
    public void subscribe(ClientIdentity clientIdentity) throws CanalServerException { 
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    canalInstance.getMetaManager().subscribe(clientIdentity);
}

取消订阅:

/**
     * 取消订阅
     *
     * 取消订阅实际上就是将当前client
     * 从MetaManager中移除。
     * 因为消费数据的时候,代码会检查当前客户端id是否在MetaManager中
     * 如果不存在就不会返回数据
     */
@Override
    public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        canalInstance.getMetaManager().unsubscribe(clientIdentity); // 执行一下meta订阅
}

获取数据:

@Override
    public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) {
    checkSubscribe(clientIdentity);
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        synchronized (canalInstance) { 
        // 获取到流式数据中的最后一批获取的位置
            PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
            Events<Event> events = null;
            Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
             //从指定的位置开始,从EventStore中获取Event数据
            events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
            //解析event
            entrys = Lists.transform(events.getEvents(), Event::getRawEntry);
            //返回Message
            return new Message(batchId, raw, entrys);
    }
}

Ack操作:

@Override
    public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
        checkSubscribe(clientIdentity);
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        //移除batchId
        positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId);
        //更新cursor
        canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
}

meta

meta模块的主要作用是提供统一的元数据管理接口,元数据包括订阅的客户端、binlog消费的偏移量和拉取数据时的batchId。

MetaManager的实现方式主要有五个:

  • FileMixedMetaManager:内存+定时刷新到本地文件系统
  • MemoryMetaManager:内存
  • MixedMetaManager:内存+实时刷新到zookeeper
  • PeriodMixedMetaManager:内存+定时刷新到zookeeper(默认实现方式)
  • ZookeeperMetaManager:实时刷新到zookeeper
  • 这几种MetaManager中,只有ZookeeperMetaManager能够保证可靠性和数据一致性,但性能比较差(canal Issue中有人提供了RedisMetaManager的实现,但还没有被官方merge)。

以MemoryMetaManager为例看一下数据结构:

//某个destination上注册的client,进行subscribe时,会向这个map注册
protected Map<String, List<ClientIdentity>>              destinations;
//某个client对应的消息batchId,生成新的batchId或ack时会操作这个map
protected Map<ClientIdentity, MemoryClientIdentityBatch> batches;
//某个client消费消息的偏移量
protected Map<ClientIdentity, Position>                  cursors;
  • 订阅

    public synchronized void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
      //获取指定destination的已经注册的客户端
      List<ClientIdentity> clientIdentitys = destinations.get(clientIdentity.getDestination());
      //判断是否存在,如果存在,先移除
      if (clientIdentitys.contains(clientIdentity)) {
          clientIdentitys.remove(clientIdentity);
      }
      //新增
      clientIdentitys.add(clientIdentity);
    }
  • 取消订阅

    /**
     * 订阅之后,Meta中就存在了给定的client
     * 当消费数据的时候,CanalServer首先会根据client到Meta中找对应的消费偏移量
     * 如果Meta中不存在这个client,也就没有办法消费数据了
     * */
    public synchronized void unsubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
      List<ClientIdentity> clientIdentitys = destinations.get(clientIdentity.getDestination());
      if (clientIdentitys != null && clientIdentitys.contains(clientIdentity)) {
          clientIdentitys.remove(clientIdentity);
      }
    }
  • 获取消息偏移量

    /**
     * 获取某个客户端消费数据的偏移量
     * */
    public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
      return cursors.get(clientIdentity);
    }
    
    public static class MemoryClientIdentityBatch {
     
      private ClientIdentity           clientIdentity;
      private Map<Long, PositionRange> batches          = new MapMaker().makeMap();
      //这个就是batchId是生成器,依次递增
      private AtomicLong               atomicMaxBatchId = new AtomicLong(1);
    
      public synchronized Long addPositionRange(PositionRange positionRange) {
          //生成batchid
          Long batchId = atomicMaxBatchId.getAndIncrement();
          //把batchId和偏移量关联起来,表示当前彼此从指定的range内消费数据
          batches.put(batchId, positionRange);
          return batchId;
      }
    
      //ack或rollback后,移除的batchId必须按顺序处理  
    public synchronized PositionRange removePositionRange(Long batchId) {
      if (batches.containsKey(batchId)) {
          Long minBatchId = Collections.min(batches.keySet());
          if (!minBatchId.equals(batchId)) {
              // 检查一下提交的ack/rollback,必须按batchId分出去的顺序提交,否则容易出现丢数据
              throw new CanalMetaManagerException(String.format("batchId:%d is not the firstly:%d",
                  batchId,
                  minBatchId));
          }
          return batches.remove(batchId);
      } else {
          return null;
      }
    }
    }

    消费数据的逻辑实际就是,生成batchId,并将batchId与binlog的偏移量position进行关联,添加到MetaManager中,然后返回数据给客户的,客户端ack/rollback后,将batchId和position的关联关系从MetaManager中移除。

instance

一个destination对应一个instance,核心接口是CanalInstance,在instance内部包含了MetaManager、EventParser、EventSink、EventStore

  • MetaManager 原数据管理器
  • EventParser 解析binlog
  • EventSink 数据到EventStore的一个缓冲器
  • EventStore 存储Event的地方

CanalInstance有两种类型的实现:

  • manager 基于canal admin的实现
  • spring 基于本地spring配置文件的实现

通过看AbstractCanalInstance源码发现,实际上CanalInstance并没有相关的数据处理逻辑,只是对相关组件进行了一个封装,并通过一系列的getter方法将这些组件输出出去。

start()和stop()是CanalInstance的核心功能,组件的启动顺序和停止顺序正好相反,这取决于各个组件之间的依赖关系。

@Override
public void start() {
    super.start();
    if (!metaManager.isStart()) {
        metaManager.start();
    }
 
    if (!alarmHandler.isStart()) {
        alarmHandler.start();
    }
 
    if (!eventStore.isStart()) {
        eventStore.start();
    }
 
    if (!eventSink.isStart()) {
        eventSink.start();
    }
 
    if (!eventParser.isStart()) {
        beforeStartEventParser(eventParser);
        eventParser.start();
        afterStartEventParser(eventParser);
    }
    logger.info("start successful....");
}
  
  
@Override
public void stop() {
    if (eventParser.isStart()) {
        beforeStopEventParser(eventParser);
        eventParser.stop();
        afterStopEventParser(eventParser);
    }
 
    if (eventSink.isStart()) {
        eventSink.stop();
    }
 
    if (eventStore.isStart()) {
        eventStore.stop();
    }
 
    if (metaManager.isStart()) {
        metaManager.stop();
    }
 
    if (alarmHandler.isStart()) {
        alarmHandler.stop();
    }
 
    logger.info("stop successful....");
}

sink

sink是parser和store中间的一个缓冲区,主要功能是利用filter来对event数据进行过滤,将符合用户配的规则的数据存储到store中。

sink模块提供了两种类型的EventSink:

  • EntryEventSink
  • GroupEventSink 针对GroupEventParser多表(分库分表的场景),能够对event进行合并处理
    EventSink的核心逻辑:

    protected boolean doFilter(CanalEntry.Entry entry) {
      if (filter != null && entry.getEntryType() == EntryType.ROWDATA) {
          //获取表名称
          String name = getSchemaNameAndTableName(entry);
          //调用filter,大意可以理解成是匹配正则表达式
          boolean need = filter.filter(name);
          if (!need) {
              logger.debug("filter name[{}] entry : {}:{}",
                  name,
                  entry.getHeader().getLogfileName(),
                  entry.getHeader().getLogfileOffset());
          }
     
          return need;
      } else {
          return true;
      }
    }

    doSink方法是将event放入store的核心逻辑

    protected boolean doSink(List<Event> events) {
      for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
          events = handler.before(events);
      }
      long blockingStart = 0L;
      int fullTimes = 0;
      do {
          //尝试将event放入store中,
          //这里没有调用阻塞的put,而是调用了非阻塞的tryPut
          //如果store满了 导致tryPut失败,自己处理超时等待
          if (eventStore.tryPut(events)) {
              if (fullTimes > 0) {
                  eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart);
              }
              for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
                  events = handler.after(events);
              }
              return true;
          } else {
              if (fullTimes == 0) {
                  blockingStart = System.nanoTime();
              }
              //当tryPut失败后,进行等待
              applyWait(++fullTimes);
              if (fullTimes % 100 == 0) {
                  long nextStart = System.nanoTime();
                  eventsSinkBlockingTime.addAndGet(nextStart - blockingStart);
                  blockingStart = nextStart;
              }
          }
     
          for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) {
              events = handler.retry(events);
          }
     
      } while (running && !Thread.interrupted());
      return false;
    }
    
    private void applyWait(int fullTimes) {
      int newFullTimes = fullTimes > maxFullTimes ? maxFullTimes : fullTimes;
      //前3次不进行挂起,只是尝试让渡cpu执行全,主要是为了提高性能
      if (fullTimes <= 3) { // 3次以内
          Thread.yield();
      } else { // 超过3次,最多只sleep 10ms
          LockSupport.parkNanos(1000 * 1000L * newFullTimes);
      }
    }

connector

connector模块是canal server向外输出数据方式的抽象,包括了kafka/rabbitmq/rocketmq/tcp,根据serverMode,利用spi动态加载

deployer模块的CanalStarter:

ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);