canal配置与部署说明
canal源码分析
https://github.com/alibaba/canal
版本:1.1.5
打包:mvn clean package -DskipTests=true -Denv=release
参考
http://www.tianshouzhi.com/api/tutorials/canal/380
模块说明
模块 | 说明 |
---|---|
admin | admin控制器模块 |
common | 提供了一些公共的工具类和接口 |
deployer | 部署模块,对应canal-deployer.tar.gz,是canal的启动模块 |
server | canal核心服务端 |
instance | 每个server有多个instance |
parser | 解析binlog,依赖于dbsync、driver |
sink | parser和store链接器,用于数据过滤、分发 |
store | 数据存储模块 |
filter | 用于对binlog进行过滤 |
meta | 增量订阅和消费信息管理器 |
client | canal客户端 |
protocol | client与server之间的通信协议 |
deployer
deploy模式是canal server的启动入口,它包含了一些初始化过程和与canal admin交互的逻辑,从startup.sh
启动脚本中可以看到启动入口维CanalLauncher
这个类。
CanalLauncher中主要包含了以下逻辑:
1. 加载本地配置文件
2. 初始化CanalStarter(托管canal的启动过程)
3. 如果配置了canal admin,创建与canal admin的连接并通过一个定时任务从admin定期拉取
配置文件,如果配置文件发生变化,就通过CanalStarter进行重启
canal server的启动工作实际由CanalLauncher交给了CanalStarter,但在CanalStarter内部实际也是只做了一些周边工作,最终的启动是有CanalController实现的。在CanalStarter中主要做了以下几件事情:
1. 如果配置了mq producer,则跟spi来动态初始化对应的CanalMQProducer
2. 初始化CanalController并调用start()方法启动Canal Server
3. 如果设置了CanalMQProducer,启动这个producer
4. 设置jvm shutdown hook
CanalController是启动canal server的核心类,主要逻辑如下:
一、初始化
1. 将properties配置转换成Bean配置(InstanceConfig)
2. 初始化embededCanalServer
3. 初始化CanalServer(提供了tcp消费模式)
4. 如果配置的zkServer,就初始化zkClient,并初始化zk上的数据路径
5. 初始化ServerRunningMonitor
processActiveEnter (抢占了zk节点,变成active状态时执行)
processActiveExit (退出active状态时执行)
processStart (启动canal server时执行)
processStop (终止canal server时执行)
6. 初始化defaultAction
7. 初始化instanceConfigMonitors
二、启动(CanalController.start())
1. 初始化zk cluster路径:otter/canal/cluster/192.168.233.1:11111
2. 启动embededCanalServer
3. 启动每一个destination上的ServerRunningMonitor 运行监控器
ServerRunningMonitor.start() 由CanalController.start触发
4. 启动instanceConfigMonitors 配置监控器
InstanceConfigMonitor.start() 由CanalController.start触发
5. 启动CanalServer(带tpc端口的)
至此,deployer模块的启动工作就做完了,总结起来主要就是做了几件事情:
- 初始化配置
- 与admin加你连接
- 启动embededCanalServer
- 初始化对外提供数据的方式(tcp/mq)
- 在zookeeper上初始化节点数据
server
canal提供了两种常用server,分别是CanalServerWithNetty和CanalServerWithEmbedded,CanalServerWithNetty实际是上在CanalServerWithEmbedded的基础上增加了一个对外提供服务的NettyServer。
CanalServerWithNetty中向Netty的pipline中初始化了一个SessionHandler,在SessionHandler处理可客户端的请求:
bootstrap.setPipelineFactory(() -> {
ChannelPipeline pipelines = Channels.pipeline();
pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
// support to maintain child socket channel.
pipelines.addLast(HandshakeInitializationHandler.class.getName(),
new HandshakeInitializationHandler(childGroups));
pipelines.addLast(ClientAuthenticationHandler.class.getName(),
new ClientAuthenticationHandler(embeddedServer));
SessionHandler sessionHandler = new SessionHandler(embeddedServer);
pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
return pipelines;
});
SessionHandler
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
switch (packet.getType()) {
case SUBSCRIPTION: //订阅
embeddedServer.subscribe(clientIdentity);
case UNSUBSCRIPTION: //取消订阅
embeddedServer.unsubscribe(clientIdentity);
case GET: //获取数据
message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
case CLIENTACK: //ack
embeddedServer.ack(clientIdentity, ack.getBatchId());
case CLIENTROLLBACK: //回滚
embeddedServer.rollback(clientIdentity, rollback.getBatchId());
default:
}
}
以上是通过tcp端口向外输出数据的方式,下面看一下通过mq的方式向外输出数据的方式。
在CanalStarter中初始化的CanalMQProducer会set到CanalController中,然后,ServerRunninngMonitor在processActiveEnter中会为每一个destination启动对应MQProducer
public void processActiveEnter() {
if (canalMQStarter != null) {
canalMQStarter.startDestination(destination);
}
}
public synchronized void startDestination(String destination) {
CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
if (canalInstance != null) {
stopDestination(destination);
//初始化一个用于消费的任务
CanalMQRunnable canalMQRunnable = new CanalMQRunnable(destination);
canalMQWorks.put(canalInstance.getDestination(), canalMQRunnable);
//将任务交给线程池运行
executorService.execute(canalMQRunnable);
logger.info("## Start the MQ work of destination:" + destination);
}
}
CanalMQRunnable中的work方法是真正处理数据的地方
while (running && destinationRunning.get()) {
CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
//消息订阅
canalServer.subscribe(clientIdentity);
while (running && destinationRunning.get()) {
//从canal server中获取消息
message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
//向mq发送消息
canalMQProducer.send(canalDestination, message, new Callback() {
@Override
public void commit() {
canalServer.ack(clientIdentity, batchId); // 提交确认
}
@Override
public void rollback() {
//回滚
canalServer.rollback(clientIdentity, batchId);
}
}
}
}
实际上内部就是从canal server消费数据然后发送的消息队列中,只不过canal对这个过程进行了规范化的封装。
CanalServerWithEmbedded实现了CanalService接口,改接口定义了订阅、取消订阅、拉取数据、ack确认、回滚等方法,是进行数据处理的核心方法。
订阅:
/**
* 客户端订阅,重复订阅时会更新对应的filter信息
*
* 实际上订阅的动作,就是将客户端信息注册到MetaManager中,
* 然后从EventStore中获取数据时,根据这些MetaManager来定位数据
* 由于canal采用的时pull模型,所以拉取数据的动是由客户端触发的
*/
@Override
public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
canalInstance.getMetaManager().subscribe(clientIdentity);
}
取消订阅:
/**
* 取消订阅
*
* 取消订阅实际上就是将当前client
* 从MetaManager中移除。
* 因为消费数据的时候,代码会检查当前客户端id是否在MetaManager中
* 如果不存在就不会返回数据
*/
@Override
public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
canalInstance.getMetaManager().unsubscribe(clientIdentity); // 执行一下meta订阅
}
获取数据:
@Override
public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) {
checkSubscribe(clientIdentity);
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
synchronized (canalInstance) {
// 获取到流式数据中的最后一批获取的位置
PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
Events<Event> events = null;
Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
//从指定的位置开始,从EventStore中获取Event数据
events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
//解析event
entrys = Lists.transform(events.getEvents(), Event::getRawEntry);
//返回Message
return new Message(batchId, raw, entrys);
}
}
Ack操作:
@Override
public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
checkSubscribe(clientIdentity);
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
//移除batchId
positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId);
//更新cursor
canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
}
meta
meta模块的主要作用是提供统一的元数据管理接口,元数据包括订阅的客户端、binlog消费的偏移量和拉取数据时的batchId。
MetaManager的实现方式主要有五个:
- FileMixedMetaManager:内存+定时刷新到本地文件系统
- MemoryMetaManager:内存
- MixedMetaManager:内存+实时刷新到zookeeper
- PeriodMixedMetaManager:内存+定时刷新到zookeeper(默认实现方式)
- ZookeeperMetaManager:实时刷新到zookeeper
- 这几种MetaManager中,只有ZookeeperMetaManager能够保证可靠性和数据一致性,但性能比较差(canal Issue中有人提供了RedisMetaManager的实现,但还没有被官方merge)。
以MemoryMetaManager为例看一下数据结构:
//某个destination上注册的client,进行subscribe时,会向这个map注册
protected Map<String, List<ClientIdentity>> destinations;
//某个client对应的消息batchId,生成新的batchId或ack时会操作这个map
protected Map<ClientIdentity, MemoryClientIdentityBatch> batches;
//某个client消费消息的偏移量
protected Map<ClientIdentity, Position> cursors;
订阅
public synchronized void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException { //获取指定destination的已经注册的客户端 List<ClientIdentity> clientIdentitys = destinations.get(clientIdentity.getDestination()); //判断是否存在,如果存在,先移除 if (clientIdentitys.contains(clientIdentity)) { clientIdentitys.remove(clientIdentity); } //新增 clientIdentitys.add(clientIdentity); }
取消订阅
/** * 订阅之后,Meta中就存在了给定的client * 当消费数据的时候,CanalServer首先会根据client到Meta中找对应的消费偏移量 * 如果Meta中不存在这个client,也就没有办法消费数据了 * */ public synchronized void unsubscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException { List<ClientIdentity> clientIdentitys = destinations.get(clientIdentity.getDestination()); if (clientIdentitys != null && clientIdentitys.contains(clientIdentity)) { clientIdentitys.remove(clientIdentity); } }
获取消息偏移量
/** * 获取某个客户端消费数据的偏移量 * */ public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException { return cursors.get(clientIdentity); } public static class MemoryClientIdentityBatch { private ClientIdentity clientIdentity; private Map<Long, PositionRange> batches = new MapMaker().makeMap(); //这个就是batchId是生成器,依次递增 private AtomicLong atomicMaxBatchId = new AtomicLong(1); public synchronized Long addPositionRange(PositionRange positionRange) { //生成batchid Long batchId = atomicMaxBatchId.getAndIncrement(); //把batchId和偏移量关联起来,表示当前彼此从指定的range内消费数据 batches.put(batchId, positionRange); return batchId; } //ack或rollback后,移除的batchId必须按顺序处理 public synchronized PositionRange removePositionRange(Long batchId) { if (batches.containsKey(batchId)) { Long minBatchId = Collections.min(batches.keySet()); if (!minBatchId.equals(batchId)) { // 检查一下提交的ack/rollback,必须按batchId分出去的顺序提交,否则容易出现丢数据 throw new CanalMetaManagerException(String.format("batchId:%d is not the firstly:%d", batchId, minBatchId)); } return batches.remove(batchId); } else { return null; } } }
消费数据的逻辑实际就是,生成batchId,并将batchId与binlog的偏移量position进行关联,添加到MetaManager中,然后返回数据给客户的,客户端ack/rollback后,将batchId和position的关联关系从MetaManager中移除。
instance
一个destination对应一个instance,核心接口是CanalInstance,在instance内部包含了MetaManager、EventParser、EventSink、EventStore
- MetaManager 原数据管理器
- EventParser 解析binlog
- EventSink 数据到EventStore的一个缓冲器
- EventStore 存储Event的地方
CanalInstance有两种类型的实现:
- manager 基于canal admin的实现
- spring 基于本地spring配置文件的实现
通过看AbstractCanalInstance源码发现,实际上CanalInstance并没有相关的数据处理逻辑,只是对相关组件进行了一个封装,并通过一系列的getter方法将这些组件输出出去。
start()和stop()是CanalInstance的核心功能,组件的启动顺序和停止顺序正好相反,这取决于各个组件之间的依赖关系。
@Override
public void start() {
super.start();
if (!metaManager.isStart()) {
metaManager.start();
}
if (!alarmHandler.isStart()) {
alarmHandler.start();
}
if (!eventStore.isStart()) {
eventStore.start();
}
if (!eventSink.isStart()) {
eventSink.start();
}
if (!eventParser.isStart()) {
beforeStartEventParser(eventParser);
eventParser.start();
afterStartEventParser(eventParser);
}
logger.info("start successful....");
}
@Override
public void stop() {
if (eventParser.isStart()) {
beforeStopEventParser(eventParser);
eventParser.stop();
afterStopEventParser(eventParser);
}
if (eventSink.isStart()) {
eventSink.stop();
}
if (eventStore.isStart()) {
eventStore.stop();
}
if (metaManager.isStart()) {
metaManager.stop();
}
if (alarmHandler.isStart()) {
alarmHandler.stop();
}
logger.info("stop successful....");
}
sink
sink是parser和store中间的一个缓冲区,主要功能是利用filter来对event数据进行过滤,将符合用户配的规则的数据存储到store中。
sink模块提供了两种类型的EventSink:
- EntryEventSink
GroupEventSink 针对GroupEventParser多表(分库分表的场景),能够对event进行合并处理
EventSink的核心逻辑:protected boolean doFilter(CanalEntry.Entry entry) { if (filter != null && entry.getEntryType() == EntryType.ROWDATA) { //获取表名称 String name = getSchemaNameAndTableName(entry); //调用filter,大意可以理解成是匹配正则表达式 boolean need = filter.filter(name); if (!need) { logger.debug("filter name[{}] entry : {}:{}", name, entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset()); } return need; } else { return true; } }
doSink方法是将event放入store的核心逻辑
protected boolean doSink(List<Event> events) { for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) { events = handler.before(events); } long blockingStart = 0L; int fullTimes = 0; do { //尝试将event放入store中, //这里没有调用阻塞的put,而是调用了非阻塞的tryPut //如果store满了 导致tryPut失败,自己处理超时等待 if (eventStore.tryPut(events)) { if (fullTimes > 0) { eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart); } for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) { events = handler.after(events); } return true; } else { if (fullTimes == 0) { blockingStart = System.nanoTime(); } //当tryPut失败后,进行等待 applyWait(++fullTimes); if (fullTimes % 100 == 0) { long nextStart = System.nanoTime(); eventsSinkBlockingTime.addAndGet(nextStart - blockingStart); blockingStart = nextStart; } } for (CanalEventDownStreamHandler<List<Event>> handler : getHandlers()) { events = handler.retry(events); } } while (running && !Thread.interrupted()); return false; } private void applyWait(int fullTimes) { int newFullTimes = fullTimes > maxFullTimes ? maxFullTimes : fullTimes; //前3次不进行挂起,只是尝试让渡cpu执行全,主要是为了提高性能 if (fullTimes <= 3) { // 3次以内 Thread.yield(); } else { // 超过3次,最多只sleep 10ms LockSupport.parkNanos(1000 * 1000L * newFullTimes); } }
connector
connector模块是canal server向外输出数据方式的抽象,包括了kafka/rabbitmq/rocketmq/tcp,根据serverMode,利用spi动态加载
deployer模块的CanalStarter:
ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);