zookeeper路径信息变化(节点信息变化)监听实现
项目中用的监听zookeeper节点信息变化(如何变化?,变化以后要如何做?待后续的了解)的方式,通过注册监听器,然后开启监听。 zookeeper简单操作类
package com.lancy.zookeeper; import java.util.List; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; /** * @ClassName: ZKCrudOperation * @Description: zookeeper的简单操作 * */ public class ZKCrudOperation { /** * * @Title: create * @Description: this will create the given ZNode with the given data * @param client * @param path * @param payload */ public static void create(CuratorFramework client, String path, byte[] payload) { try { client.create().forPath(path, payload); } catch (Exception e) { e.printStackTrace(); } } /** * * @Title: createEphemeral * @Description:this will create the given EPHEMERAL ZNode with the given data * @param client * @param path * @param payload * */ public static void createEphemeral(CuratorFramework client, String path, byte[] payload) { try { client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); } catch (Exception e) { e.printStackTrace(); } } /** * * @Title: createEphemeralSequential * @Description: this will create the given EPHEMERAL-SEQUENTIAL ZNode with * the given data using Curator protection. * @param client * @param path * @param payload * @return */ public static String createEphemeralSequential(CuratorFramework client, String path, byte[] payload) { try { return client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload); } catch (Exception e) { e.printStackTrace(); return null; } } /** * * @Title: setData * @Description:set data for the given node * @param client * @param path * @param payload */ public static void setData(CuratorFramework client, String path, byte[] payload) { try { client.setData().forPath(path, payload); } catch (Exception e) { e.printStackTrace(); } } /** * * @Title: delete * @Description:delete the given node * @param client * @param path */ public static void delete(CuratorFramework client, String path) { try { client.delete().forPath(path); } catch (Exception e) { e.printStackTrace(); } } /** * * @Title: guaranteedDelete * @Description: delete the given node and guarantee that it completes * @param client * @param path */ public static void guaranteedDelete(CuratorFramework client, String path) { try { client.delete().guaranteed().forPath(path); } catch (Exception e) { e.printStackTrace(); } } /** * * @Title: watchedGetChildren * @Description: Get children and set a watcher on the node. The watcher * notification * @param client * @param path * @return */ public static List<String> watchedGetChildren(CuratorFramework client, String path) { try { return client.getChildren().watched().forPath(path); } catch (Exception e) { e.printStackTrace(); return null; } } /** * * @Title: getPathChildren * @Description: Get children from the path * @param client * @param path * @return */ public static List<String> getPathChildren(CuratorFramework client, String path) { try { return client.getChildren().forPath(path); } catch (Exception e) { e.printStackTrace(); return null; } } /** * * @Title: getNoneValue * @Description: 获取节点的值 * @param client * @param path * @return */ public static String getNodeValue(CuratorFramework client, String path) { try { byte[] value = client.getData().forPath(path); return value == null ? null : new String(value); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 判断节点是否存在 * @param client * @param path * @return */ public static boolean checkNodeExist(CuratorFramework client, String path) { boolean flag = true; try { Stat stat = client.checkExists().forPath(path); // Stat就是对zonde所有属性的一个映射, stat=null表示节点不存在! if (stat == null) { flag = false; } } catch (Exception e) { e.printStackTrace(); flag = false; } return flag; } }
监听器的接口
package com.lancy.zookeeper; import org.apache.curator.framework.CuratorFramework; public interface IZKListener { void executor(CuratorFramework client); }
FTP监听器 监听器接口的实现类
package com.lancy.zookeeper.listener; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.lancy.common.ConfigCommon; import com.lancy.zookeeper.IZKListener; /** * @ClassName: FtpConfigListener * @Description: FTP配置 * */ public class FtpConfigListener implements IZKListener { private static Logger logger = LoggerFactory.getLogger(FtpConfigListener.class); private String path; public FtpConfigListener(String path) { this.path = path; } @Override public void executor(CuratorFramework client) { //zookeeper的节点变化监听器PathChildrenCache,只能监听下一级节点 final PathChildrenCache cached = new PathChildrenCache(client, path, true); cached.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { PathChildrenCacheEvent.Type eventType = event.getType(); switch (eventType) { case CONNECTION_RECONNECTED: cached.rebuild(); break; case CONNECTION_SUSPENDED: case CONNECTION_LOST: logger.info("Connection error,waiting..."); break; case CHILD_REMOVED: logger.info("Child node removed..."); //把map里对应的记录删除 ConfigCommon.FTP.remove(event.getData().getPath() .substring(event.getData().getPath().lastIndexOf("/") + 1)); break; default: logger.info("Child node new added or updated..."); //添加到Map里 ConfigCommon.FTP.put( event.getData().getPath().substring(event.getData().getPath().lastIndexOf("/") + 1), new String(event.getData().getData())); } } }); try { cached.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); } catch (Exception e) { logger.error("zookeeper listener error: ", e); } } }
zookeeper 客户端框架Curator的工厂 实现注册监听器,启动监听、启动CuratorFramework。
package com.lancy.zookeeper; import java.util.List; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; import com.lancy.common.util.PropertiesUtil; public class ZKClientFactory { //保存需要注册的监听器(监听zookeeper不同节点变化的监听器) public static List<IZKListener> listeners; /** * 获取Curator */ public static CuratorFramework newClient() { //PropertiesUtil获取配置文件的配置信息;如ZOOKEEPER_HOST_PORT=192.168.1.81:2181 String connectionString = PropertiesUtil.get("ZOOKEEPER_HOST_PORT"); ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); return CuratorFrameworkFactory.builder().connectString(connectionString).retryPolicy(retryPolicy) .connectionTimeoutMs(2000).sessionTimeoutMs(100000).build(); } // 注册需要监听的监听者对像. @SuppressWarnings("unused") public static void registerListeners(CuratorFramework client) { //Curator建议添加实现了ConnectionStateListener接口的监听器,其实现自动重新在zookeeper上注册节点 client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { //如果client与zookeeper的状态是 if (newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED) { for (IZKListener listener : listeners) { listener.executor(client);//注册监听器,启动监听器 } } } }); client.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() { @Override public void unhandledError(String message, Throwable e) { System.out.println(message); } }); } }
静态参数配置 项目中该类保存了大多数的配置信息,这些配置信息都是从zookeeper中的节点获取的,如何把这些配置信息放到zookeeper中,需要后续的补充。。。。。。
package com.lancy.common; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.curator.framework.CuratorFramework; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.lancy.common.util.FileUtil; import com.lancy.common.util.PropertiesUtil; import com.lancy.common.util.RedisMappingUtil; import com.lancy.zookeeper.IZKListener; import com.lancy.zookeeper.ZKClientFactory; import com.lancy.zookeeper.ZKCrudOperation; import com.lancy.zookeeper.listener.FtpConfigListener; /** * 静态参数配置 */ public class ConfigCommon { // zookeeper public static String ZOOKEEPER_HOST_PORT = ""; public static String ZOOKEEPER_HOST = ""; public static String ZOOKEEPER_PORT = ""; // kafka public static String KAFKA_METADATA_BROKER_LIST = ""; public static String KAFKA_ZOOKEEPER_CONNECT = ""; public static String KAFKA_REQUEST_REQUIRED_ACKS = ""; public static String KAFKA_SERIALIZER_CLASS = ""; public static String KAFKA_PARTITIONER_CLASS = ""; public static String KAFKA_LNT_VALID_DATA_TOPIC = ""; public static String KAFKA_YKT_VALID_DATA_TOPIC = ""; public static String KAFKA_LNT_EXCEPTION_TOPIC = ""; public static String KAFKA_LNT_EXCEPTION_REVIEW_TOPIC = ""; public static String KAFKA_LNT_HIS_MAP_LIST = ""; public static String KAFKA_LNT_VALID_CUSTOMER_ID = ""; public static String KAFKA_LNT_EXCEPTION_CUSTOMER_ID = ""; public static String KAFKA_LNT_EXCEPTION_REVIEW_CUSTOMER_ID = ""; public static String KAFKA_LNT_IMP_LIST = ""; public static CuratorFramework client = null; // FTP public static Map<String, String> FTP = new HashMap<String, String>(); private static Logger logger = LoggerFactory.getLogger(ConfigCommon.class); private ConfigCommon() { try { // 读取config配置文件中的zookeeper 信息,其它配置來自zookeeper ConfigCommon.ZOOKEEPER_HOST_PORT = PropertiesUtil.get("ZOOKEEPER_HOST_PORT"); ConfigCommon.ZOOKEEPER_HOST = PropertiesUtil.get("ZOOKEEPER_HOST"); ConfigCommon.ZOOKEEPER_PORT = PropertiesUtil.get("ZOOKEEPER_PORT"); // 从zookeeper中读取出各种配置 client = ZKClientFactory.newClient(); ZKClientFactory.listeners = new ArrayList<IZKListener>(); // 监听zookeeper 路径信息变化 ZKClientFactory.listeners.add(new FtpConfigListener("/lnt/ftp")); ZKClientFactory.registerListeners(client); client.start(); // kafka配置信息 this.KAFKA_METADATA_BROKER_LIST = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_metadata_broker_list"); this.KAFKA_ZOOKEEPER_CONNECT = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_zookeeper_connect"); this.KAFKA_REQUEST_REQUIRED_ACKS = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_request_required_acks"); this.KAFKA_SERIALIZER_CLASS = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_serializer_class"); this.KAFKA_PARTITIONER_CLASS = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_partitioner_class"); this.KAFKA_LNT_VALID_DATA_TOPIC = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_lnt_valid_data_topic"); this.KAFKA_YKT_VALID_DATA_TOPIC = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_ykt_valid_data_topic"); this.KAFKA_LNT_EXCEPTION_TOPIC = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_lnt_exception_topic"); this.KAFKA_LNT_EXCEPTION_REVIEW_TOPIC = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_lnt_exception_review_topic"); this.KAFKA_LNT_HIS_MAP_LIST = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_lnt_his_map_list"); this.KAFKA_LNT_VALID_CUSTOMER_ID = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_lnt_valid_customer_id"); this.KAFKA_LNT_EXCEPTION_CUSTOMER_ID = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_lnt_exception_customer_id"); this.KAFKA_LNT_EXCEPTION_REVIEW_CUSTOMER_ID = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_lnt_exception_review_customer_id"); this.KAFKA_LNT_IMP_LIST = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_lnt_imp_list"); //FTP 配置信息 List<String> nodes = ZKCrudOperation.getPathChildren(client, "/lnt/ftp"); for (String node : nodes) { String json = ZKCrudOperation.getNodeValue(client, "/lnt/ftp/" + node); FTP.put(node, json); } } catch (Exception e) { System.out.println("参数初始化数据异常:" + e); } } private static class SingletonConfig { private static ConfigCommon instance = new ConfigCommon(); } public static ConfigCommon getInstance() { return SingletonConfig.instance; } }
参考 Curator节点监听 zookeeper注册节点掉线自动重新注册 Curator学习 zookeeper监听实现