canal源码解析系列(2):deployer模块(下)
-
系列文章索引:
3.4 CanalInstance运行状态监控相关代码
由于这段代码比较长且恶心,这里笔者暂时对部分代码进行省略,以便读者看清楚整各脉络
final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port); ServerRunningMonitors.setServerData(serverData); ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() { public ServerRunningMonitor apply(final String destination) { ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData); runningMonitor.setDestination(destination); runningMonitor.setListener(new ServerRunningListener() {....});//省略ServerRunningListener的具体实现 if (zkclientx != null) { runningMonitor.setZkClient(zkclientx); } // 触发创建一下cid节点 runningMonitor.init(); return runningMonitor; } }));
上述代码中,ServerRunningMonitors是ServerRunningMonitor对象的容器,而ServerRunningMonitor用于监控CanalInstance。
canal会为每一个destination创建一个CanalInstance,每个CanalInstance都会由一个ServerRunningMonitor来进行监控。而ServerRunningMonitor统一由ServerRunningMonitors进行管理。 除了CanalInstance需要监控,CanalServer本身也需要监控。因此我们在代码一开始,就看到往ServerRunningMonitors设置了一个ServerRunningData对象,封装了canal server监听的ip和端口等信息。
ServerRunningMonitors源码如下所示:
public class ServerRunningMonitors { private static ServerRunningData serverData; private static Map runningMonitors; // <String,ServerRunningMonitor> public static ServerRunningData getServerData() { return serverData; } public static Map<String, ServerRunningMonitor> getRunningMonitors() { return runningMonitors; } public static ServerRunningMonitor getRunningMonitor(String destination) { return (ServerRunningMonitor) runningMonitors.get(destination); } public static void setServerData(ServerRunningData serverData) { ServerRunningMonitors.serverData = serverData; } public static void setRunningMonitors(Map runningMonitors) { ServerRunningMonitors.runningMonitors = runningMonitors; } }
ServerRunningMonitors的setRunningMonitors方法接收的参数是一个Map,其中Map的key是destination,value是ServerRunningMonitor,也就是说针对每一个destination都有一个ServerRunningMonitor来监控。
上述代码中,在往ServerRunningMonitors设置Map时,是通过MigrateMap.makeComputingMap方法来创建的,其接受一个Function类型的参数,这是guava中定义的接口,其声明了apply抽象方法。其工作原理可以通过下面代码片段进行介绍:
Map<String, User> map = MigrateMap.makeComputingMap(new Function<String, User>() { @Override public User apply(String name) { return new User(name); } }); User user = map.get("tianshouzhi");//第一次获取时会创建 assert user != null; assert user == map.get("tianshouzhi");//之后获取,总是返回之前已经创建的对象
这段代码中,我们利用MigrateMap.makeComputingMap创建了一个Map,其中key为String类型,value为User类型。当我们调用map.get(“tianshouzhi”)方法,最开始这个Map中并没有任何key/value的,于是其就会回调Function的apply方法,利用参数"tianshouzhi"创建一个User对象并返回。之后当我们再以"tianshouzhi"为key从Map中获取User对象时,会直接将前面创建的对象返回。不会回调apply方法,也就是说,只有在第一次尝试获取时,才会回调apply方法。
而在上述代码中,实际上就利用了这个特性,只不过是根据destination获取ServerRunningMonitor对象,如果不存在就创建。
在创建ServerRunningMonitor对象时,首先根据ServerRunningData创建ServerRunningMonitor实例,之后设置了destination和ServerRunningListener对象,接着,判断如果zkClientx字段如果不为空,也设置到ServerRunningMonitor中,最后调用init方法进行初始化。
ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData); runningMonitor.setDestination(destination); runningMonitor.setListener(new ServerRunningListener(){...})//省略ServerRunningListener具体代码 if (zkclientx != null) { runningMonitor.setZkClient(zkclientx); } // 触发创建一下cid节点 runningMonitor.init(); return runningMonitor;
ServerRunningListener的实现如下:
new ServerRunningListener() { /*内部调用了embededCanalServer的start(destination)方法。 此处需要划重点,说明每个destination对应的CanalInstance是通过embededCanalServer的start方法启动的, 这与我们之前分析将instanceGenerator设置到embededCanalServer中可以对应上。 embededCanalServer负责调用instanceGenerator生成CanalInstance实例,并负责其启动。*/ public void processActiveEnter() { try { MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); embededCanalServer.start(destination); } finally { MDC.remove(CanalConstants.MDC_DESTINATION); } } //内部调用embededCanalServer的stop(destination)方法。与上start方法类似,只不过是停止CanalInstance。 public void processActiveExit() { try { MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); embededCanalServer.stop(destination); } finally { MDC.remove(CanalConstants.MDC_DESTINATION); } } /*处理存在zk的情况下,在Canalinstance启动之前,在zk中创建节点。 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。 此方法会在processActiveEnter()之前被调用*/ public void processStart() { try { if (zkclientx != null) { final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port); initCid(path); zkclientx.subscribeStateChanges(new IZkStateListener() { public void handleStateChanged(KeeperState state) throws Exception { } public void handleNewSession() throws Exception { initCid(path); } }); } } finally { MDC.remove(CanalConstants.MDC_DESTINATION); } } //处理存在zk的情况下,在Canalinstance停止前,释放zk节点,路径为/otter/canal/destinations/{0}/cluster/{1}, //其0会被destination替换,1会被ip:port替换。此方法会在processActiveExit()之前被调用 public void processStop() { try { MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); if (zkclientx != null) { final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port); releaseCid(path); } } finally { MDC.remove(CanalConstants.MDC_DESTINATION); } } }
上述代码中,我们可以看到启动一个CanalInstance实际上是在ServerRunningListener的processActiveEnter方法中,通过调用embededCanalServer的start(destination)方法进行的,对于停止也是类似。
那么ServerRunningListener中的相关方法到底是在哪里回调的呢?我们可以在ServerRunningMonitor的start和stop方法中找到答案,这里只列出start方法。
public class ServerRunningMonitor extends AbstractCanalLifeCycle { ... public void start() { super.start(); processStart();//其内部会调用ServerRunningListener的processStart()方法 if (zkClient != null) {//存在zk,以HA方式启动 // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener); initRunning(); } else {//没有zk,直接启动 processActiveEnter(); } } //...stop方法逻辑类似,相关代码省略 }
当ServerRunningMonitor的start方法被调用时,其首先会直接调用processStart方法,这个方法内部直接调了ServerRunningListener的processStart()方法,源码如下所示。通过前面的分析,我们已经知道在存在zkClient!=null的情况,会往zk中创建一个节点。
private void processStart() { if (listener != null) { try { listener.processStart(); } catch (Exception e) { logger.error("processStart failed", e); } } }
HA方式启动:
存在zk,说明canal server可能做了集群,因为canal就是利用zk来做HA的。首先根据destination构造一个zk的节点路径,然后进行监听。
/*构建临时节点的路径:/otter/canal/destinations/{0}/running,其中占位符{0}会被destination替换。 在集群模式下,可能会有多个canal server共同处理同一个destination, 在某一时刻,只能由一个canal server进行处理,处理这个destination的canal server进入running状态,其他canal server进入standby状态。*/ String path = ZookeeperPathUtils.getDestinationServerRunning(destination); /*对destination对应的running节点进行监听,一旦发生了变化,则说明可能其他处理相同destination的canal server可能出现了异常, 此时需要尝试自己进入running状态。*/ zkClient.subscribeDataChanges(path, dataListener);
上述只是监听代码,之后尝试调用initRunning方法通过HA的方式来启动CanalInstance。
private void initRunning() { if (!isStart()) { return; } //构建临时节点的路径:/otter/canal/destinations/{0}/running,其中占位符{0}会被destination替换 String path = ZookeeperPathUtils.getDestinationServerRunning(destination); // 序列化 //构建临时节点的数据,标记当前destination由哪一个canal server处理 byte[] bytes = JsonUtils.marshalToByte(serverData); try { mutex.set(false); //尝试创建临时节点。如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。 //此时会抛出ZkNodeExistsException,进入catch代码块。 zkClient.create(path, bytes, CreateMode.EPHEMERAL); activeData = serverData; processActiveEnter();//如果创建成功,触发一下事件,内部调用ServerRunningListener的processActiveEnter方法 mutex.set(true); } catch (ZkNodeExistsException e) { //创建节点失败,则根据path从zk中获取当前是哪一个canal server创建了当前canal instance的相关信息。 //第二个参数true,表示的是,如果这个path不存在,则返回null。 bytes = zkClient.readData(path, true); if (bytes == null) {// 如果不存在节点,立即尝试一次 initRunning(); } else { //如果的确存在,则将创建该canal instance实例信息存入activeData中。 activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class); } } catch (ZkNoNodeException e) {//如果/otter/canal/destinations/{0}/节点不存在,进行创建其中占位符{0}会被destination替换 zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点 initRunning(); } }
可以看到,initRunning方法内部只有在尝试在zk中创建节点成功后,才会去调用listener的processActiveEnter方法来真正启动destination对应的canal instance,这是canal HA方式启动的核心。canal官方文档中介绍了CanalServer HA机制启动的流程,如下:
事实上,这个说明的前两步,都是在initRunning方法中实现的。从上面的代码中,我们可以看出,在HA机启动的情况下,initRunning方法不一定能走到processActiveEnter()方法,因为创建临时节点可能会出错。此外,根据官方文档说明,如果出错,那么当前canal instance则进入standBy状态。也就是另外一个canal instance出现异常时,当前canal instance顶上去。那么相关源码在什么地方呢?在HA方式启动最开始的2行代码的监听逻辑中:
String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener);
其中dataListener类型是IZkDataListener,这是zkclient客户端提供的接口,定义如下:
public interface IZkDataListener { public void handleDataChange(String dataPath, Object data) throws Exception; public void handleDataDeleted(String dataPath) throws Exception; }
当zk节点中的数据发生变更时,会自动回调这两个方法,很明显,一个是用于处理节点数据发生变化,一个是用于处理节点数据被删除。
而dataListener是在ServerRunningMonitor的构造方法中初始化的,如下:
public ServerRunningMonitor(){ // 创建父节点 dataListener = new IZkDataListener() { //!!!目前看来,好像并没有存在修改running节点数据的代码,为什么这个方法不是空实现? public void handleDataChange(String dataPath, Object data) throws Exception { MDC.put("destination", destination); ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class); if (!isMine(runningData.getAddress())) { mutex.set(false); } if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active release = true; releaseRunning();// 彻底释放mainstem } activeData = (ServerRunningData) runningData; } //当其他canal instance出现异常,临时节点数据被删除时,会自动回调这个方法,此时当前canal instance要顶上去 public void handleDataDeleted(String dataPath) throws Exception { MDC.put("destination", destination); mutex.set(false); if (!release && activeData != null && isMine(activeData.getAddress())) { // 如果上一次active的状态就是本机,则即时触发一下active抢占 initRunning(); } else { // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作 delayExector.schedule(new Runnable() { public void run() { initRunning();//尝试自己进入running状态 } }, delayTime, TimeUnit.SECONDS); } } }; }
那么现在问题来了?ServerRunningMonitor的start方法又是在哪里被调用的, 这个方法被调用了,才能真正的启动canal instance。这部分代码我们放到后面的CanalController中的start方法进行讲解。
下面分析最后一部分代码,autoScan机制相关代码。
3.5 autoScan机制相关代码
关于autoscan,官方文档有以下介绍:
结合autoscan机制的相关源码:
// autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN)); if (autoScan) { defaultAction = new InstanceAction() {//....}; instanceConfigMonitors = //.... }
可以看到,autoScan是否需要自动扫描的开关,只有当autoScan为true时,才会初始化defaultAction字段和instanceConfigMonitors字段。其中:
其中:
defaultAction:其作用是如果配置发生了变更,默认应该采取什么样的操作。其实现了InstanceAction接口定义的三个抽象方法:start、stop和reload。当新增一个destination配置时,需要调用start方法来启动;当移除一个destination配置时,需要调用stop方法来停止;当某个destination配置发生变更时,需要调用reload方法来进行重启。
instanceConfigMonitors:类型为Map<InstanceMode, InstanceConfigMonitor>。defaultAction字段只是定义了配置发生变化默认应该采取的操作,那么总该有一个类来监听配置是否发生了变化,这就是InstanceConfigMonitor的作用。官方文档中,只提到了对canal.conf.dir配置项指定的目录的监听,这指的是通过spring方式加载配置。显然的,通过manager方式加载配置,配置中心的内容也是可能发生变化的,也需要进行监听。此时可以理解为什么instanceConfigMonitors的类型是一个Map,key为InstanceMode,就是为了对这两种方式的配置加载方式都进行监听。defaultAction字段初始化源码如下所示:
defaultAction = new InstanceAction() { public void start(String destination) { InstanceConfig config = instanceConfigs.get(destination); if (config == null) { // 重新读取一下instance config config = parseInstanceConfig(properties, destination); instanceConfigs.put(destination, config); } if (!embededCanalServer.isStart(destination)) { // HA机制启动 ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (!config.getLazy() && !runningMonitor.isStart()) { runningMonitor.start(); } } } public void stop(String destination) { // 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息 InstanceConfig config = instanceConfigs.remove(destination); if (config != null) { embededCanalServer.stop(destination); ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (runningMonitor.isStart()) { runningMonitor.stop(); } } } public void reload(String destination) { // 目前任何配置变化,直接重启,简单处理 stop(destination); start(destination); } };
instanceConfigMonitors字段初始化源码如下所示:
instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() { public InstanceConfigMonitor apply(InstanceMode mode) { int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL)); if (mode.isSpring()) {//如果加载方式是spring,返回SpringInstanceConfigMonitor SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor(); monitor.setScanIntervalInSecond(scanInterval); monitor.setDefaultAction(defaultAction); // 设置conf目录,默认是user.dir + conf目录组成 String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR); if (StringUtils.isEmpty(rootDir)) { rootDir = "../conf"; } if (StringUtils.equals("otter-canal", System.getProperty("appName"))) { monitor.setRootConf(rootDir); } else { // eclipse debug模式 monitor.setRootConf("src/main/resources/"); } return monitor; } else if (mode.isManager()) {//如果加载方式是manager,返回ManagerInstanceConfigMonitor return new ManagerInstanceConfigMonitor(); } else { throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor"); } } });
可以看到instanceConfigMonitors也是根据mode属性,来采取不同的监控实现类SpringInstanceConfigMonitor 或者ManagerInstanceConfigMonitor,二者都实现了InstanceConfigMonitor接口。
public interface InstanceConfigMonitor extends CanalLifeCycle { void register(String destination, InstanceAction action); void unregister(String destination); }
当需要对一个destination进行监听时,调用register方法
当取消对一个destination监听时,调用unregister方法。
事实上,unregister方法在canal 内部并没有有任何地方被调用,也就是说,某个destination如果开启了autoScan=true,那么你是无法在运行时停止对其进行监控的。如果要停止,你可以选择将对应的目录删除。
InstanceConfigMonitor本身并不知道哪些canal instance需要进行监控,因为不同的canal instance,有的可能设置autoScan为true,另外一些可能设置为false。
在CanalConroller的start方法中,对于autoScan为true的destination,会调用InstanceConfigMonitor的register方法进行注册,此时InstanceConfigMonitor才会真正的对这个destination配置进行扫描监听。对于那些autoScan为false的destination,则不会进行监听。
目前SpringInstanceConfigMonitor对这两个方法都进行了实现,而ManagerInstanceConfigMonitor目前对这两个方法实现的都是空,需要开发者自己来实现。
在实现ManagerInstanceConfigMonitor时,可以参考SpringInstanceConfigMonitor。
此处不打算再继续进行分析SpringInstanceConfigMonitor的源码,因为逻辑很简单,感兴趣的读者可以自行查看SpringInstanceConfigMonitor 的scan方法,内部在什么情况下会回调defualtAction的start、stop、reload方法 。
4 CanalController的start方法
而ServerRunningMonitor的start方法,是在CanalController中的start方法中被调用的,CanalController中的start方法是在CanalLauncher中被调用的。
com.alibaba.otter.canal.deployer.CanalController#start
public void start() throws Throwable { logger.info("## start the canal server[{}:{}]", ip, port); // 创建整个canal的工作节点 :/otter/canal/cluster/{0} final String path = ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port); initCid(path); if (zkclientx != null) { this.zkclientx.subscribeStateChanges(new IZkStateListener() { public void handleStateChanged(KeeperState state) throws Exception { } public void handleNewSession() throws Exception { initCid(path); } }); } // 优先启动embeded服务 embededCanalServer.start(); //启动不是lazy模式的CanalInstance,通过迭代instanceConfigs,根据destination获取对应的ServerRunningMonitor,然后逐一启动 for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) { final String destination = entry.getKey(); InstanceConfig config = entry.getValue(); // 如果destination对应的CanalInstance没有启动,则进行启动 if (!embededCanalServer.isStart(destination)) { ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); //如果不是lazy,lazy模式需要等到第一次有客户端请求才会启动 if (!config.getLazy() && !runningMonitor.isStart()) { runningMonitor.start(); } } if (autoScan) { instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction); } } if (autoScan) {//启动配置文件自动检测机制 instanceConfigMonitors.get(globalInstanceConfig.getMode()).start(); for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) { if (!monitor.isStart()) { monitor.start();//启动monitor } } } // 启动网络接口,监听客户端请求 canalServer.start(); }
5 总结
deployer模块的主要作用:
1、读取canal.properties,确定canal instance的配置加载方式
2、确定canal instance的启动方式:独立启动或者集群方式启动
3、监听canal instance的配置的变化,动态停止、启动或新增
4、启动canal server,监听客户端请求转载说明
本文转载自田守枝的Java技术博客,感谢作者的分享