canal源码解析系列(2):deployer模块(上)
-
系列文章索引:
canal有两种使用方式:1、独立部署 2、内嵌到应用中。 deployer模块主要用于独立部署canal server。关于这两种方式的区别,请参见server模块源码分析。deployer模块源码目录结构如下所示:
在独立部署canal时,需要首先对canal的源码进行打包
mvn clean install -Dmaven.test.skip -Denv=release
以本教程使用1.0.24版本为例,打包后会在target目录生成一个以下两个文件:
其中canal.deployer-1.0.24.tar.gz就是canal的独立部署包。解压缩后,目录如下所示。其中bin目录和conf目录(包括子目录spring)中的所有文件,都来自于deployer模块。canal ├── bin │ ├── startup.bat │ ├── startup.sh │ └── stop.sh ├── conf │ ├── canal.properties │ ├── example │ │ └── instance.properties │ ├── logback.xml │ └── spring │ ├── default-instance.xml │ ├── file-instance.xml │ ├── group-instance.xml │ ├── local-instance.xml │ └── memory-instance.xml ├── lib │ └── ....依赖的各种jar └── logs
deployer模块主要完成以下功能:
1、读取canal,properties配置文件
2、启动canal server,监听canal client的请求
3、启动canal instance,连接mysql数据库,伪装成slave,解析binlog
4、在canal的运行过程中,监听配置文件的变化
1、启动和停止脚本
bin目录中包含了canal的启动和停止脚本startup.sh和stop.sh,当我们要启动canal时,只需要输入以下命令即可
sh bin/startup.sh
在windows环境下,可以直接双击startup.bat。
在startup.sh脚本内,会调用com.alibaba.otter.canal.deployer.CanalLauncher类来进行启动,这是分析Canal源码的入口类,如下图所示:
同时,startup.sh还会在bin目录下生成一个canal.pid文件,用于存储canal的进程id。当停止canal的时候
sh bin/stop.sh
会根据canal.pid文件中记录的进程id,kill掉canal进程,并且删除这个文件。
2、CannalLauncher
CanalLauncher是整个源码分析的入口类,代码相当简单。步骤是:
1、读取canal.properties文件中的配置
2、利用读取的配置构造一个CanalController实例,将所有的启动操作都委派给CanalController进行处理。
3、最后注册一个钩子函数,在JVM停止时同时也停止canal server。com.alibaba.otter.canal.deployer.CanalLauncher
public class CanalLauncher { private static final String CLASSPATH_URL_PREFIX = "classpath:"; private static final Logger logger = LoggerFactory.getLogger(CanalLauncher.class); public static void main(String[] args) throws Throwable { try { //1、读取canal.properties文件中配置,默认读取classpath下的canal.properties String conf = System.getProperty("canal.conf", "classpath:canal.properties"); Properties properties = new Properties(); if (conf.startsWith(CLASSPATH_URL_PREFIX)) { conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX); properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf)); } else { properties.load(new FileInputStream(conf)); } //2、启动canal,首先将properties对象传递给CanalController,然后调用其start方法启动 logger.info("## start the canal server."); final CanalController controller = new CanalController(properties); controller.start(); logger.info("## the canal server is running now ......"); //3、关闭canal,通过添加JVM的钩子,JVM停止前会回调run方法,其内部调用controller.stop()方法进行停止 Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { try { logger.info("## stop the canal server"); controller.stop(); } catch (Throwable e) { logger.warn("##something goes wrong when stopping canal Server:\n{}", ExceptionUtils.getFullStackTrace(e)); } finally { logger.info("## canal server is down."); } } }); } catch (Throwable e) { logger.error("## Something goes wrong when starting up the canal Server:\n{}", ExceptionUtils.getFullStackTrace(e)); System.exit(0); } } }
可以看到,CanalLauncher实际上只是负责读取canal.properties配置文件,然后构造CanalController对象,并通过其start和stop方法来开启和停止canal。因此,如果说CanalLauncher是canal源码分析的入口类,那么CanalController就是canal源码分析的核心类。
3、CanalController
在CanalController的构造方法中,会对配置文件内容解析,初始化相关成员变量,做好canal server的启动前的准备工作,之后在CanalLauncher中调用CanalController.start方法来启动。
CanalController中定义的相关字段和构造方法,如下所示:
public class CanalController { private static final Logger logger = LoggerFactory.getLogger(CanalController.class); private Long cid; private String ip; private int port; // 默认使用spring的方式载入 private Map<String, InstanceConfig> instanceConfigs; private InstanceConfig globalInstanceConfig; private Map<String, CanalConfigClient> managerClients; // 监听instance config的变化 private boolean autoScan = true; private InstanceAction defaultAction; private Map<InstanceMode, InstanceConfigMonitor> instanceConfigMonitors; private CanalServerWithEmbedded embededCanalServer; private CanalServerWithNetty canalServer; private CanalInstanceGenerator instanceGenerator; private ZkClientx zkclientx; public CanalController(){ this(System.getProperties()); } public CanalController(final Properties properties){ managerClients = MigrateMap.makeComputingMap(new Function<String, CanalConfigClient>() { public CanalConfigClient apply(String managerAddress) { return getManagerClient(managerAddress); } }); //1、配置解析 globalInstanceConfig = initGlobalConfig(properties); instanceConfigs = new MapMaker().makeMap(); initInstanceConfig(properties); // 2、准备canal server cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID)); ip = getProperty(properties, CanalConstants.CANAL_IP); port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT)); embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator canalServer = CanalServerWithNetty.instance(); canalServer.setIp(ip); canalServer.setPort(port); //3、初始化zk相关代码 // 处理下ip为空,默认使用hostIp暴露到zk中 if (StringUtils.isEmpty(ip)) { ip = AddressUtils.getHostIp(); } final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS); if (StringUtils.isNotEmpty(zkServers)) { zkclientx = ZkClientx.getZkClient(zkServers); // 初始化系统目录 zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true); zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true); } //4 CanalInstance运行状态监控 final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port); ServerRunningMonitors.setServerData(serverData); ServerRunningMonitors.setRunningMonitors(//...); //5、autoScan机制相关代码 autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN)); if (autoScan) { defaultAction = new InstanceAction() {//....}; instanceConfigMonitors = //.... } } .... }
为了读者能够尽量容易的看出CanalController的构造方法中都做了什么,上面代码片段中省略了部分代码。这样,我们可以很明显的看出来, ,在CanalController构造方法中的代码分划分为了固定的几个处理步骤,下面按照几个步骤的划分,逐一进行讲解,并详细的介绍CanalController中定义的各个字段的作用。
3.1 配置解析相关代码
// 初始化全局参数设置 globalInstanceConfig = initGlobalConfig(properties); instanceConfigs = new MapMaker().makeMap(); // 初始化instance config initInstanceConfig(properties);
3.1.1 globalInstanceConfig字段
表示canal instance的全局配置,类型为InstanceConfig,通过initGlobalConfig方法进行初始化。主要用于解析canal.properties以下几个配置项:
-
canal.instance.global.mode:确定canal instance配置加载方式,取值有manager|spring两种方式
-
canal.instance.global.lazy:确定canal instance是否延迟初始化
-
canal.instance.global.manager.address:配置中心地址。如果canal.instance.global.mode=manager,需要提供此配置项
-
canal.instance.global.spring.xml:spring配置文件路径。如果canal.instance.global.mode=spring,需要提供此配置项
initGlobalConfig源码如下所示:
private InstanceConfig initGlobalConfig(Properties properties) { InstanceConfig globalConfig = new InstanceConfig(); //读取canal.instance.global.mode String modeStr = getProperty(properties, CanalConstants.getInstanceModeKey(CanalConstants.GLOBAL_NAME)); if (StringUtils.isNotEmpty(modeStr)) { //将modelStr转成枚举InstanceMode,这是一个枚举类,只有2个取值,SPRING\MANAGER,对应两种配置方式 globalConfig.setMode(InstanceMode.valueOf(StringUtils.upperCase(modeStr))); } //读取canal.instance.global.lazy String lazyStr = getProperty(properties, CanalConstants.getInstancLazyKey(CanalConstants.GLOBAL_NAME)); if (StringUtils.isNotEmpty(lazyStr)) { globalConfig.setLazy(Boolean.valueOf(lazyStr)); } //读取canal.instance.global.manager.address String managerAddress = getProperty(properties, CanalConstants.getInstanceManagerAddressKey(CanalConstants.GLOBAL_NAME)); if (StringUtils.isNotEmpty(managerAddress)) { globalConfig.setManagerAddress(managerAddress); } //读取canal.instance.global.spring.xml String springXml = getProperty(properties, CanalConstants.getInstancSpringXmlKey(CanalConstants.GLOBAL_NAME)); if (StringUtils.isNotEmpty(springXml)) { globalConfig.setSpringXml(springXml); } instanceGenerator = //...初始化instanceGenerator return globalConfig; }
其中canal.instance.global.mode用于确定canal instance的全局配置加载方式,其取值范围有2个:spring、manager。我们知道一个canal server中可以启动多个canal instance,每个instance都有各自的配置。instance的配置也可以放在本地,也可以放在远程配置中心里。我们可以自定义每个canal instance配置文件存储的位置,如果所有canal instance的配置都在本地或者远程,此时我们就可以通过canal.instance.global.mode这个配置项,来统一的指定配置文件的位置,避免为每个canal instance单独指定。
其中:
spring方式:
表示所有的canal instance的配置文件位于本地。此时,我们必须提供配置项canal.instance.global.spring.xml指定spring配置文件的路径。canal提供了多个spring配置文件:file-instance.xml、default-instance.xml、memory-instance.xml、local-instance.xml、group-instance.xml。这么多配置文件主要是为了支持canal instance不同的工作方式。我们在稍后将会讲解各个配置文件的区别。而在这些配置文件的开头,我们无一例外的可以看到以下配置:
<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false"> <property name="ignoreResourceNotFound" value="true" /> <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 --> <property name="locationNames"> <list> <value>classpath:canal.properties</value> <value>classpath:${canal.instance.destination:}/instance.properties</value> </list> </property> </bean>
这里我们可以看到,所谓通过spring方式加载canal instance配置,无非就是通过spring提供的PropertyPlaceholderConfigurer来加载canal instance的配置文件instance.properties。
这里instance.properties的文件完整路径是${canal.instance.destination:}/instance.properties,其中${canal.instance.destination}是一个变量。这是因为我们可以在一个canal server中配置多个canal instance,每个canal instance配置文件的名称都是instance.properties,因此我们需要通过目录进行区分。例如我们通过配置项canal.destinations指定多个canal instance的名字
canal.destinations= example1,example2
此时我们就要conf目录下,新建两个子目录example1和example2,每个目录下各自放置一个instance.properties。
canal在初始化时就会分别使用example1和example2来替换${canal.instance.destination:},从而分别根据example1/instance.properties和example2/instance.properties创建2个canal instance。
manager方式:
表示所有的canal instance的配置文件位于远程配置中心,此时我们必须提供配置项 canal.instance.global.manager.address来指定远程配置中心的地址。目前alibaba内部配置使用这种方式。开发者可以自己实现CanalConfigClient,连接各自的管理系统,完成接入。
3.1.2 instanceGenerator字段
类型为CanalInstanceGenerator。在initGlobalConfig方法中,除了创建了globalInstanceConfig实例,同时还为字段instanceGenerator字段进行了赋值。
顾名思义,这个字段用于创建CanalInstance实例。这是instance模块中的类,其作用就是为canal.properties文件中canal.destinations配置项列出的每个destination,创建一个CanalInstance实例。CanalInstanceGenerator是一个接口,定义如下所示:
public interface CanalInstanceGenerator { /** * 通过 destination 产生特定的 {@link CanalInstance} */ CanalInstance generate(String destination); }
针对spring和manager两种instance配置的加载方式,CanalInstanceGenerator提供了两个对应的实现类,如下所示:
instanceGenerator字段通过一个匿名内部类进行初始化。其内部会判断配置的各个destination的配置加载方式,spring 或者manager。
instanceGenerator = new CanalInstanceGenerator() { public CanalInstance generate(String destination) { //1、根据destination从instanceConfigs获取对应的InstanceConfig对象 InstanceConfig config = instanceConfigs.get(destination); if (config == null) { throw new CanalServerException("can't find destination:{}"); } //2、如果destination对应的InstanceConfig的mode是manager方式,使用ManagerCanalInstanceGenerator if (config.getMode().isManager()) { ManagerCanalInstanceGenerator instanceGenerator = new ManagerCanalInstanceGenerator(); instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress())); return instanceGenerator.generate(destination); } else if (config.getMode().isSpring()) { //3、如果destination对应的InstanceConfig的mode是spring方式,使用SpringCanalInstanceGenerator SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator(); synchronized (this) { try { // 设置当前正在加载的通道,加载spring查找文件时会用到该变量 System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination); instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml())); return instanceGenerator.generate(destination); } catch (Throwable e) { logger.error("generator instance failed.", e); throw new CanalException(e); } finally { System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, ""); } } } else { throw new UnsupportedOperationException("unknow mode :" + config.getMode()); } } };
上述代码中的第1步比较变态,从instanceConfigs中根据destination作为参数,获得对应的InstanceConfig。而instanceConfigs目前还没有被初始化,这个字段是在稍后将后将要讲解的initInstanceConfig方法初始化的,不过由于这是一个引用类型,当initInstanceConfig方法被执行后,instanceConfigs字段中也就有值了。目前,我们姑且认为, instanceConfigs这个Map<String, InstanceConfig>类型的字段已经被初始化好了。
2、3两步用于确定是instance的配置加载方式是spring还是manager,如果是spring,就使用SpringCanalInstanceGenerator创建CanalInstance实例,如果是manager,就使用ManagerCanalInstanceGenerator创建CanalInstance实例。
由于目前manager方式的源码并未开源,因此,我们只分析SpringCanalInstanceGenerator相关代码。
上述代码中,首先创建了一个SpringCanalInstanceGenerator实例,然后往里面设置了一个BeanFactory。
instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
其中config.getSpringXml()返回的就是我们在canal.properties中通过canal.instance.global.spring.xml配置项指定了spring配置文件路径。getBeanFactory方法源码如下所示:
private BeanFactory getBeanFactory(String springXml) { ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml); return applicationContext; }
往SpringCanalInstanceGenerator设置了BeanFactory之后,就可以通过其的generate方法获得CanalInstance实例。
SpringCanalInstanceGenerator的源码如下所示:
public class SpringCanalInstanceGenerator implements CanalInstanceGenerator, BeanFactoryAware { private String defaultName = "instance"; private BeanFactory beanFactory; public CanalInstance generate(String destination) { String beanName = destination; //首先判断beanFactory是否包含以destination为id的bean if (!beanFactory.containsBean(beanName)) { beanName = defaultName;//如果没有,设置要获取的bean的id为instance。 } //以默认的bean的id值"instance"来获取CanalInstance实例 return (CanalInstance) beanFactory.getBean(beanName); } public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; } }
首先尝试以传入的参数destination来获取CanalInstance实例,如果没有,就以默认的bean的id值"instance"来获取CanalInstance实例。事实上,如果你没有修改spring配置文件,那么默认的名字就是instance。事实上,在canal提供的各个spring配置文件xxx-instance.xml中,都有类似以下配置:
<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring"> <property name="destination" value="${canal.instance.destination}" /> <property name="eventParser"> <ref local="eventParser" /> </property> <property name="eventSink"> <ref local="eventSink" /> </property> <property name="eventStore"> <ref local="eventStore" /> </property> <property name="metaManager"> <ref local="metaManager" /> </property> <property name="alarmHandler"> <ref local="alarmHandler" /> </property> </bean>
上面的代码片段中,我们看到的确有一个bean的名字是instance,其类型是CanalInstanceWithSpring,这是CanalInstance接口的实现类。类似的,我们可以想到在manager配置方式下,获取的CanalInstance实现类是CanalInstanceWithManager。事实上,你想的没错,CanalInstance的类图继承关系如下所示:
需要注意的是,到目前为止,我们只是创建好了CanalInstanceGenerator,而CanalInstance尚未创建。在CanalController的start方法被调用时,CanalInstance才会被真正的创建,相关源码将在稍后分析。3.1.3 instanceConfigs字段
类型为Map<String, InstanceConfig>。前面提到初始化instanceGenerator后,当其generate方法被调用时,会尝试从instanceConfigs根据一个destination获取对应的InstanceConfig,现在分析instanceConfigs的相关初始化代码。
我们知道globalInstanceConfig定义全局的配置加载方式。如果需要把部分CanalInstance配置放于本地,另外一部分CanalIntance配置放于远程配置中心,则只通过全局方式配置,无法达到这个要求。虽然这种情况很少见,但是为了提供最大的灵活性,canal支持每个CanalIntance自己来定义自己的加载方式,来覆盖默认的全局配置加载方式。而每个destination对应的InstanceConfig配置就存放于instanceConfigs字段中。
举例来说:
//当前server上部署的instance列表 canal.destinations=instance1,instance2 //instance配置全局加载方式 canal.instance.global.mode = spring canal.instance.global.lazy = false canal.instance.global.spring.xml = classpath:spring/file-instance.xml //instance1覆盖全局加载方式 canal.instance.instance1.mode = manager canal.instance.instance1.manager.address = 127.0.0.1:1099 canal.instance.instance1.lazy = tue
这段配置中,设置了instance的全局加载方式为spring,instance1覆盖了全局配置,使用manager方式加载配置。而instance2没有覆盖配置,因此默认使用spring加载方式。
instanceConfigs字段通过initInstanceConfig方法进行初始化
instanceConfigs = new MapMaker().makeMap();//这里利用Google Guava框架的MapMaker创建Map实例并赋值给instanceConfigs // 初始化instance config initInstanceConfig(properties);
initInstanceConfig方法源码如下:
private void initInstanceConfig(Properties properties) { //读取配置项canal.destinations String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS); //以","分割canal.destinations,得到一个数组形式的destination String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT); for (String destination : destinations) { //为每一个destination生成一个InstanceConfig实例 InstanceConfig config = parseInstanceConfig(properties, destination); //将destination对应的InstanceConfig放入instanceConfigs中 InstanceConfig oldConfig = instanceConfigs.put(destination, config); if (oldConfig != null) { logger.warn("destination:{} old config:{} has replace by new config:{}", new Object[] { destination, oldConfig, config }); } } }
上面代码片段中,首先解析canal.destinations配置项,可以理解一个destination就对应要初始化一个canal instance。针对每个destination会创建各自的InstanceConfig,最终都会放到instanceConfigs这个Map中。
各个destination对应的InstanceConfig都是通过parseInstanceConfig方法来解析
private InstanceConfig parseInstanceConfig(Properties properties, String destination) { //每个destination对应的InstanceConfig都引用了全局的globalInstanceConfig InstanceConfig config = new InstanceConfig(globalInstanceConfig); //...其他几个配置项与获取globalInstanceConfig类似,不再赘述,唯一注意的的是配置项的key部分中的global变成传递进来的destination return config; }
此时我们可以看一下InstanceConfig类的源码:
public class InstanceConfig { private InstanceConfig globalConfig; private InstanceMode mode; private Boolean lazy; private String managerAddress; private String springXml; public InstanceConfig(){ } public InstanceConfig(InstanceConfig globalConfig){ this.globalConfig = globalConfig; } public static enum InstanceMode { SPRING, MANAGER; public boolean isSpring() { return this == InstanceMode.SPRING; } public boolean isManager() { return this == InstanceMode.MANAGER; } } public Boolean getLazy() { if (lazy == null && globalConfig != null) { return globalConfig.getLazy(); } else { return lazy; } } public void setLazy(Boolean lazy) { this.lazy = lazy; } public InstanceMode getMode() { if (mode == null && globalConfig != null) { return globalConfig.getMode(); } else { return mode; } } public void setMode(InstanceMode mode) { this.mode = mode; } public String getManagerAddress() { if (managerAddress == null && globalConfig != null) { return globalConfig.getManagerAddress(); } else { return managerAddress; } } public void setManagerAddress(String managerAddress) { this.managerAddress = managerAddress; } public String getSpringXml() { if (springXml == null && globalConfig != null) { return globalConfig.getSpringXml(); } else { return springXml; } } public void setSpringXml(String springXml) { this.springXml = springXml; } public String toString() { return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE); } }
可以看到,InstanceConfig类中维护了一个globalConfig字段,其类型也是InstanceConfig。而其相关get方法在执行时,会按照以下逻辑进行判断:如果没有自身没有这个配置,则返回全局配置,如果有,则返回自身的配置。通过这种方式实现对全局配置的覆盖。
3.2 准备canal server相关代码
cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID)); ip = getProperty(properties, CanalConstants.CANAL_IP); port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT)); embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator canalServer = CanalServerWithNetty.instance(); canalServer.setIp(ip); canalServer.setPort(port);
上述代码中,首先解析了cid、ip、port字段,其中:
cid:Long,对应canal.properties文件中的canal.id,目前无实际用途
ip:String,对应canal.properties文件中的canal.ip,canal server监听的ip。
port:int,对应canal.properties文件中的canal.port,canal server监听的端口
之后分别为以下两个字段赋值:
embededCanalServer:类型为CanalServerWithEmbedded
canalServer:类型为CanalServerWithNetty
CanalServerWithEmbedded 和 CanalServerWithNetty都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。
关于这两种类型的实现,canal官方文档有以下描述:
说白了,就是我们可以不必独立部署canal server。在应用直接使用CanalServerWithEmbedded直连mysql数据库。如果觉得自己的技术hold不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。
在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServerWithEmbedded进行处理。因此CanalServerWithNetty就是一个马甲而已。CanalServerWithEmbedded才是核心。
因此,在上述代码中,我们看到,用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,而ip和port被设置到CanalServerWithNetty中。
关于CanalServerWithNetty如何将客户端的请求委派给CanalServerWithEmbedded进行处理,我们将在server模块源码分析中进行讲解。
3.3 初始化zk相关代码
//读取canal.properties中的配置项canal.zkServers,如果没有这个配置,则表示项目不使用zk final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS); if (StringUtils.isNotEmpty(zkServers)) { //创建zk实例 zkclientx = ZkClientx.getZkClient(zkServers); // 初始化系统目录 //destination列表,路径为/otter/canal/destinations zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true); //整个canal server的集群列表,路径为/otter/canal/cluster zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true); }
canal支持利用了zk来完成HA机制、以及将当前消费到到的mysql的binlog位置记录到zk中。ZkClientx是canal对ZkClient进行了一层简单的封装。
显然,当我们没有配置canal.zkServers,那么zkclientx不会被初始化。
关于Canal如何利用ZK做HA,我们将在稍后的代码中进行分。而利用zk记录binlog的消费进度,将在之后的章节进行分析。
转载说明
本文转载自田守枝的Java技术博客,感谢作者的分享
-