zookeeper路径信息变化(节点信息变化)监听实现

项目中用的监听zookeeper节点信息变化(如何变化?,变化以后要如何做?待后续的了解)的方式,通过注册监听器,然后开启监听。 zookeeper简单操作类

package com.lancy.zookeeper;

import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

/**
 * @ClassName: ZKCrudOperation
 * @Description: zookeeper的简单操作
 *
 */
public class ZKCrudOperation {
          
   
    /**
     * 
     * @Title: create
     * @Description: this will create the given ZNode with the given data
     * @param client
     * @param path
     * @param payload
     */
    public static void create(CuratorFramework client, String path, byte[] payload) {
        try {
            client.create().forPath(path, payload);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 
     * @Title: createEphemeral
     * @Description:this will create the given EPHEMERAL ZNode with the given data
     * @param client
     * @param path
     * @param payload
     * 
     */
    public static void createEphemeral(CuratorFramework client, String path, byte[] payload) {
        try {
            client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 
     * @Title: createEphemeralSequential
     * @Description: this will create the given EPHEMERAL-SEQUENTIAL ZNode with
     *               the given data using Curator protection.
     * @param client
     * @param path
     * @param payload
     * @return
     */
    public static String createEphemeralSequential(CuratorFramework client, String path, byte[] payload) {

        try {
            return client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 
     * @Title: setData
     * @Description:set data for the given node
     * @param client
     * @param path
     * @param payload
     */
    public static void setData(CuratorFramework client, String path, byte[] payload) {

        try {
            client.setData().forPath(path, payload);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 
     * @Title: delete
     * @Description:delete the given node
     * @param client
     * @param path
     */
    public static void delete(CuratorFramework client, String path) {

        try {
            client.delete().forPath(path);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 
     * @Title: guaranteedDelete
     * @Description: delete the given node and guarantee that it completes
     * @param client
     * @param path
     */
    public static void guaranteedDelete(CuratorFramework client, String path) {
        try {
            client.delete().guaranteed().forPath(path);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 
     * @Title: watchedGetChildren
     * @Description: Get children and set a watcher on the node. The watcher
     *               notification
     * @param client
     * @param path
     * @return
     */
    public static List<String> watchedGetChildren(CuratorFramework client, String path) {

        try {
            return client.getChildren().watched().forPath(path);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 
     * @Title: getPathChildren
     * @Description: Get children from the path
     * @param client
     * @param path
     * @return
     */
    public static List<String> getPathChildren(CuratorFramework client, String path) {

        try {
            return client.getChildren().forPath(path);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 
     * @Title: getNoneValue
     * @Description: 获取节点的值
     * @param client
     * @param path
     * @return
     */
    public static String getNodeValue(CuratorFramework client, String path) {
        try {
            byte[] value = client.getData().forPath(path);
            return value == null ? null : new String(value);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 判断节点是否存在
     * @param client
     * @param path
     * @return
     */
    public static boolean checkNodeExist(CuratorFramework client, String path) {
        boolean flag = true;
        try {
            Stat stat = client.checkExists().forPath(path);
            // Stat就是对zonde所有属性的一个映射, stat=null表示节点不存在!
            if (stat == null) {
                flag = false;
            }
        } catch (Exception e) {
            e.printStackTrace();
            flag = false;
        }
        return flag;
    }
}

监听器的接口

package com.lancy.zookeeper;

import org.apache.curator.framework.CuratorFramework;

public interface IZKListener {
          
   
    void executor(CuratorFramework client);
}

FTP监听器 监听器接口的实现类

package com.lancy.zookeeper.listener;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.lancy.common.ConfigCommon;
import com.lancy.zookeeper.IZKListener;

/**
 * @ClassName: FtpConfigListener
 * @Description: FTP配置
 *
 */
public class FtpConfigListener implements IZKListener {
          
   
    private static Logger logger = LoggerFactory.getLogger(FtpConfigListener.class);
    private String path;

    public FtpConfigListener(String path) {
        this.path = path;
    }

    @Override
    public void executor(CuratorFramework client) {
        //zookeeper的节点变化监听器PathChildrenCache,只能监听下一级节点
        final PathChildrenCache cached = new PathChildrenCache(client, path, true);
        cached.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                PathChildrenCacheEvent.Type eventType = event.getType();
                switch (eventType) {
                    case CONNECTION_RECONNECTED:
                        cached.rebuild();
                        break;
                    case CONNECTION_SUSPENDED:
                    case CONNECTION_LOST:
                        logger.info("Connection error,waiting...");
                        break;
                    case CHILD_REMOVED:
                        logger.info("Child node removed...");
                        //把map里对应的记录删除
                        ConfigCommon.FTP.remove(event.getData().getPath()
                                .substring(event.getData().getPath().lastIndexOf("/") + 1));
                        break;
                    default:
                        logger.info("Child node new added or updated...");
                        //添加到Map里
                        ConfigCommon.FTP.put(
                                event.getData().getPath().substring(event.getData().getPath().lastIndexOf("/") + 1),
                                new String(event.getData().getData()));
                }
            }
        });
        try {
            cached.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        } catch (Exception e) {
            logger.error("zookeeper listener error: ", e);
        }
    }

}

zookeeper 客户端框架Curator的工厂 实现注册监听器,启动监听、启动CuratorFramework。

package com.lancy.zookeeper;

import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import com.lancy.common.util.PropertiesUtil;

public class ZKClientFactory {
          
   

    //保存需要注册的监听器(监听zookeeper不同节点变化的监听器)
    public static List<IZKListener> listeners;

    /**
     * 获取Curator
     */
    public static CuratorFramework newClient() {
        //PropertiesUtil获取配置文件的配置信息;如ZOOKEEPER_HOST_PORT=192.168.1.81:2181
        String connectionString = PropertiesUtil.get("ZOOKEEPER_HOST_PORT");
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        return CuratorFrameworkFactory.builder().connectString(connectionString).retryPolicy(retryPolicy)
                .connectionTimeoutMs(2000).sessionTimeoutMs(100000).build();

    }

    // 注册需要监听的监听者对像.
    @SuppressWarnings("unused")
    public static void registerListeners(CuratorFramework client) {
        //Curator建议添加实现了ConnectionStateListener接口的监听器,其实现自动重新在zookeeper上注册节点
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                //如果client与zookeeper的状态是
                if (newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED) {
                    for (IZKListener listener : listeners) {
                        listener.executor(client);//注册监听器,启动监听器

                    }
                }
            }
        });

        client.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
            @Override
            public void unhandledError(String message, Throwable e) {
                System.out.println(message);
            }
        });
    }
}

静态参数配置 项目中该类保存了大多数的配置信息,这些配置信息都是从zookeeper中的节点获取的,如何把这些配置信息放到zookeeper中,需要后续的补充。。。。。。

package com.lancy.common;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.lancy.common.util.FileUtil;
import com.lancy.common.util.PropertiesUtil;
import com.lancy.common.util.RedisMappingUtil;
import com.lancy.zookeeper.IZKListener;
import com.lancy.zookeeper.ZKClientFactory;
import com.lancy.zookeeper.ZKCrudOperation;
import com.lancy.zookeeper.listener.FtpConfigListener;

/**
 * 静态参数配置
 */
public class ConfigCommon {
          
   
    // zookeeper
    public static String                ZOOKEEPER_HOST_PORT                     = "";
    public static String                ZOOKEEPER_HOST                          = "";
    public static String                ZOOKEEPER_PORT                          = "";
    // kafka
    public static String                KAFKA_METADATA_BROKER_LIST              = "";
    public static String                KAFKA_ZOOKEEPER_CONNECT                 = "";
    public static String                KAFKA_REQUEST_REQUIRED_ACKS             = "";
    public static String                KAFKA_SERIALIZER_CLASS                  = "";
    public static String                KAFKA_PARTITIONER_CLASS                 = "";
    public static String                KAFKA_LNT_VALID_DATA_TOPIC              = "";
    public static String                KAFKA_YKT_VALID_DATA_TOPIC              = "";
    public static String                KAFKA_LNT_EXCEPTION_TOPIC               = "";
    public static String                KAFKA_LNT_EXCEPTION_REVIEW_TOPIC        = "";
    public static String                KAFKA_LNT_HIS_MAP_LIST                  = "";
    public static String                KAFKA_LNT_VALID_CUSTOMER_ID             = "";
    public static String                KAFKA_LNT_EXCEPTION_CUSTOMER_ID         = "";
    public static String                KAFKA_LNT_EXCEPTION_REVIEW_CUSTOMER_ID  = "";
    public static String                KAFKA_LNT_IMP_LIST                      = "";
    public static CuratorFramework  client  = null;

    // FTP
    public static Map<String, String> FTP = new HashMap<String, String>();

    private static Logger logger = LoggerFactory.getLogger(ConfigCommon.class);

    private ConfigCommon() {
        try {
            // 读取config配置文件中的zookeeper 信息,其它配置來自zookeeper
            ConfigCommon.ZOOKEEPER_HOST_PORT = PropertiesUtil.get("ZOOKEEPER_HOST_PORT");
            ConfigCommon.ZOOKEEPER_HOST = PropertiesUtil.get("ZOOKEEPER_HOST");
            ConfigCommon.ZOOKEEPER_PORT = PropertiesUtil.get("ZOOKEEPER_PORT");

            // 从zookeeper中读取出各种配置
            client = ZKClientFactory.newClient();
            ZKClientFactory.listeners = new ArrayList<IZKListener>();

            // 监听zookeeper 路径信息变化
            ZKClientFactory.listeners.add(new FtpConfigListener("/lnt/ftp"));
            ZKClientFactory.registerListeners(client);
            client.start();

            // kafka配置信息
            this.KAFKA_METADATA_BROKER_LIST = ZKCrudOperation.getNodeValue(client,
                    "/lnt/kafka/kafka_metadata_broker_list");
            this.KAFKA_ZOOKEEPER_CONNECT = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_zookeeper_connect");
            this.KAFKA_REQUEST_REQUIRED_ACKS = ZKCrudOperation.getNodeValue(client,
                    "/lnt/kafka/kafka_request_required_acks");
            this.KAFKA_SERIALIZER_CLASS = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_serializer_class");
            this.KAFKA_PARTITIONER_CLASS = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_partitioner_class");
            this.KAFKA_LNT_VALID_DATA_TOPIC = ZKCrudOperation.getNodeValue(client,
                    "/lnt/kafka/kafka_lnt_valid_data_topic");
            this.KAFKA_YKT_VALID_DATA_TOPIC = ZKCrudOperation.getNodeValue(client,
                    "/lnt/kafka/kafka_ykt_valid_data_topic");
            this.KAFKA_LNT_EXCEPTION_TOPIC = ZKCrudOperation.getNodeValue(client,
                    "/lnt/kafka/kafka_lnt_exception_topic");
            this.KAFKA_LNT_EXCEPTION_REVIEW_TOPIC = ZKCrudOperation.getNodeValue(client,
                    "/lnt/kafka/kafka_lnt_exception_review_topic");
            this.KAFKA_LNT_HIS_MAP_LIST = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_lnt_his_map_list");
            this.KAFKA_LNT_VALID_CUSTOMER_ID = ZKCrudOperation.getNodeValue(client,
                    "/lnt/kafka/kafka_lnt_valid_customer_id");
            this.KAFKA_LNT_EXCEPTION_CUSTOMER_ID = ZKCrudOperation.getNodeValue(client,
                    "/lnt/kafka/kafka_lnt_exception_customer_id");
            this.KAFKA_LNT_EXCEPTION_REVIEW_CUSTOMER_ID = ZKCrudOperation.getNodeValue(client,
                    "/lnt/kafka/kafka_lnt_exception_review_customer_id");
            this.KAFKA_LNT_IMP_LIST = ZKCrudOperation.getNodeValue(client, "/lnt/kafka/kafka_lnt_imp_list");

            //FTP 配置信息
            List<String> nodes = ZKCrudOperation.getPathChildren(client, "/lnt/ftp");
            for (String node : nodes) {
                String json = ZKCrudOperation.getNodeValue(client, "/lnt/ftp/" + node);
                FTP.put(node, json);
            }
        } catch (Exception e) {
            System.out.println("参数初始化数据异常:" + e);
        }
    }

    private static class SingletonConfig {
          
   
        private static ConfigCommon instance    = new ConfigCommon();
    }

    public static ConfigCommon getInstance() {
        return SingletonConfig.instance;
    }

}

参考 Curator节点监听 zookeeper注册节点掉线自动重新注册 Curator学习 zookeeper监听实现

经验分享 程序员 微信小程序 职场和发展