> filename2Config : filename2ConfigMap) {
+ cacheConfigs.putAll(filename2Config.getValue());
+ }
+ MapPropertySource propertySource =
+ new MapPropertySource(PROPERTY_SOURCE_NAME, cacheConfigs);
+ environment.getPropertySources().addFirst(propertySource);
+ }
+ }
+ };
+ }
+
+ private static void initClientInfo(String defaultCacheDir, ConfigurableEnvironment environment) {
+ clientInfo.setServer(
+ environment.getProperty(DEFAULT_CONFIG_NAME_SERVER, String.class, DEFAULT_CONFIG_SERVER));
+ clientInfo.setPort(
+ environment.getProperty(DEFAULT_CONFIG_NAME_PORT, Integer.class, DEFAULT_CONFIG_PORT));
+ clientInfo.setEnv(
+ environment.getProperty(DEFAULT_CONFIG_NAME_ENV, String.class, DEFAULT_CONFIG_ENV));
+ clientInfo.setDataId(
+ environment.getProperty(DEFAULT_CONFIG_NAME_DATA_ID, String.class, DEFAULT_CONFIG_DATA_ID));
+ clientInfo.setCacheDir(
+ environment.getProperty(DEFAULT_CONFIG_NAME_CACHE_DIR, String.class, defaultCacheDir));
+ }
+
+ private static String getDefaultCacheDir() {
+ String defaultCacheDir;
+ String os = System.getProperty("os.name");
+ if (os.toLowerCase().startsWith(OS_TYPE_WIN)) {
+ defaultCacheDir = DEFAULT_PATH_WIN;
+ } else if (os.toLowerCase().startsWith(OS_TYPE_MAC)) {
+ defaultCacheDir = DEFAULT_PATH_MAC;
+ } else {
+ defaultCacheDir = DEFAULT_PATH_MAC;
}
- }
- };
- }
-
- private static void initClientInfo(String defaultCacheDir, ConfigurableEnvironment environment) {
- clientInfo.setServer(
- environment.getProperty(DEFAULT_CONFIG_NAME_SERVER, String.class, DEFAULT_CONFIG_SERVER));
- clientInfo.setPort(
- environment.getProperty(DEFAULT_CONFIG_NAME_PORT, Integer.class, DEFAULT_CONFIG_PORT));
- clientInfo.setEnv(
- environment.getProperty(DEFAULT_CONFIG_NAME_ENV, String.class, DEFAULT_CONFIG_ENV));
- clientInfo.setDataId(
- environment.getProperty(DEFAULT_CONFIG_NAME_DATA_ID, String.class, DEFAULT_CONFIG_DATA_ID));
- clientInfo.setCacheDir(
- environment.getProperty(DEFAULT_CONFIG_NAME_CACHE_DIR, String.class, defaultCacheDir));
- }
-
- private static String getDefaultCacheDir() {
- String defaultCacheDir;
- String os = System.getProperty("os.name");
- if (os.toLowerCase().startsWith(OS_TYPE_WIN)) {
- defaultCacheDir = DEFAULT_PATH_WIN;
- } else if (os.toLowerCase().startsWith(OS_TYPE_MAC)) {
- defaultCacheDir = DEFAULT_PATH_MAC;
- } else {
- defaultCacheDir = DEFAULT_PATH_MAC;
+ return defaultCacheDir;
}
- return defaultCacheDir;
- }
- private static void validateCacheDirPath(String defaultCacheDir, String cacheDir) {
- if (defaultCacheDir.contains(DEFAULT_PATH_WIN_SEP)
- && !cacheDir.contains(DEFAULT_PATH_WIN_SEP)) {
- throw new RuntimeException(DEFAULT_CONFIG_NAME_CACHE_DIR + " 配置的路径不正确");
+ private static void validateCacheDirPath(String defaultCacheDir, String cacheDir) {
+ if (defaultCacheDir.contains(DEFAULT_PATH_WIN_SEP)
+ && !cacheDir.contains(DEFAULT_PATH_WIN_SEP)) {
+ throw new RuntimeException(DEFAULT_CONFIG_NAME_CACHE_DIR + " 配置的路径不正确");
+ }
+ if (cacheDir.contains(DEFAULT_PATH_WIN_SEP) && !cacheDir.contains(DEFAULT_PATH_SEP_WIN)) {
+ throw new RuntimeException(
+ DEFAULT_CONFIG_NAME_CACHE_DIR + " 配置的路径不正确, 正确的路径示范, " + DEFAULT_PATH_WIN);
+ }
}
- if (cacheDir.contains(DEFAULT_PATH_WIN_SEP) && !cacheDir.contains(DEFAULT_PATH_SEP_WIN)) {
- throw new RuntimeException(
- DEFAULT_CONFIG_NAME_CACHE_DIR + " 配置的路径不正确, 正确的路径示范, " + DEFAULT_PATH_WIN);
- }
- }
- /** 创建缓存文件夹 */
- private static void createCacheDir(String cacheDir) {
- Path path = Paths.get(cacheDir);
- if (!Files.exists(path)) {
- File mkdir = FileUtil.mkdir(cacheDir);
- if (!mkdir.canWrite()) {
- throw new RuntimeException("缓存文件夹创建失败, 无读写权限");
- }
+ /**
+ * 创建缓存文件夹
+ */
+ private static void createCacheDir(String cacheDir) {
+ Path path = Paths.get(cacheDir);
+ if (!Files.exists(path)) {
+ File mkdir = FileUtil.mkdir(cacheDir);
+ if (!mkdir.canWrite()) {
+ throw new RuntimeException("缓存文件夹创建失败, 无读写权限");
+ }
+ }
}
- }
}
diff --git a/kenaito-config-core/src/main/java/cn/odboy/config/context/ClientPropertyHelper.java b/kenaito-config-core/src/main/java/cn/odboy/config/context/ClientPropertyHelper.java
index 4976266..57e431d 100644
--- a/kenaito-config-core/src/main/java/cn/odboy/config/context/ClientPropertyHelper.java
+++ b/kenaito-config-core/src/main/java/cn/odboy/config/context/ClientPropertyHelper.java
@@ -1,18 +1,20 @@
package cn.odboy.config.context;
import cn.hutool.core.util.StrUtil;
-import java.util.Map;
import lombok.RequiredArgsConstructor;
-import org.springframework.cloud.context.refresh.ConfigDataContextRefresher;
-import org.springframework.cloud.context.refresh.ContextRefresher;
+//import org.springframework.cloud.context.refresh.ConfigDataContextRefresher;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.MutablePropertySources;
import org.springframework.core.env.PropertySource;
import org.springframework.stereotype.Component;
+import java.util.Map;
+
/**
* 客户端配置 辅助类
+ *
+ * 依赖 spring-cloud-context
*
* @author odboy
* @date 2024-12-07
@@ -20,33 +22,34 @@ import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class ClientPropertyHelper {
- private final ConfigurableEnvironment environment;
- private final ValueAnnotationProcessor valueAnnotationProcessor;
- private final ConfigDataContextRefresher configDataContextRefresher;
+ private final ConfigurableEnvironment environment;
+ private final ValueAnnotationProcessor valueAnnotationProcessor;
+// private final ConfigDataContextRefresher configDataContextRefresher;
+ private final ConfigPropertyContextRefresher contextRefresher;
- /**
- * 动态更新配置值
- *
- * @param propertyName 属性路径名
- * @param value 属性值
- */
- public void updateValue(String propertyName, Object value) {
- if (StrUtil.isNotBlank(propertyName)) {
- // 设置属性值
- MutablePropertySources propertySources = environment.getPropertySources();
- if (propertySources.contains(ClientConfigLoader.PROPERTY_SOURCE_NAME)) {
- // 更新属性值
- PropertySource> propertySource =
- propertySources.get(ClientConfigLoader.PROPERTY_SOURCE_NAME);
- Map source = ((MapPropertySource) propertySource).getSource();
- source.put(propertyName, value);
- }
- // 单独更新@Value对应的值
- valueAnnotationProcessor.setValue(propertyName, value);
- // 刷新上下文(解决 @ConfigurationProperties注解的类属性值更新 问题)
- // Spring Cloud只会对被@RefreshScope和@ConfigurationProperties标注的bean进行刷新
- // 这个方法主要做了两件事:刷新配置源,也就是PropertySource,然后刷新了@ConfigurationProperties注解的类
- configDataContextRefresher.refresh();
+ /**
+ * 动态更新配置值
+ *
+ * @param propertyName 属性路径名
+ * @param value 属性值
+ */
+ public void updateValue(String propertyName, Object value) {
+ if (StrUtil.isNotBlank(propertyName)) {
+ // 设置属性值
+ MutablePropertySources propertySources = environment.getPropertySources();
+ if (propertySources.contains(ClientConfigLoader.PROPERTY_SOURCE_NAME)) {
+ // 更新属性值
+ PropertySource> propertySource = propertySources.get(ClientConfigLoader.PROPERTY_SOURCE_NAME);
+ Map source = ((MapPropertySource) propertySource).getSource();
+ source.put(propertyName, value);
+ }
+ // 单独更新@Value对应的值
+ valueAnnotationProcessor.setValue(propertyName, value);
+ // 刷新上下文(解决 @ConfigurationProperties注解的类属性值更新 问题)
+ // Spring Cloud只会对被@RefreshScope和@ConfigurationProperties标注的bean进行刷新
+ // 这个方法主要做了两件事:刷新配置源,也就是PropertySource,然后刷新了@ConfigurationProperties注解的类
+// configDataContextRefresher.refresh();
+ contextRefresher.refreshAll();
+ }
}
- }
}
diff --git a/kenaito-config-core/src/main/java/cn/odboy/config/context/ConfigPropertyContextRefresher.java b/kenaito-config-core/src/main/java/cn/odboy/config/context/ConfigPropertyContextRefresher.java
new file mode 100644
index 0000000..1b9bfe0
--- /dev/null
+++ b/kenaito-config-core/src/main/java/cn/odboy/config/context/ConfigPropertyContextRefresher.java
@@ -0,0 +1,74 @@
+package cn.odboy.config.context;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.boot.context.properties.bind.Bindable;
+import org.springframework.boot.context.properties.bind.Binder;
+import org.springframework.core.env.ConfigurableEnvironment;
+import org.springframework.core.env.MapPropertySource;
+import org.springframework.core.env.MutablePropertySources;
+import org.springframework.core.env.PropertySource;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 配置属性上下文 刷新
+ *
+ * @author odboy
+ * @date 2024-12-07
+ */
+@Component
+@RequiredArgsConstructor
+public class ConfigPropertyContextRefresher {
+ private final ConfigurationPropertiesAnnotationProcessor processor;
+ private final ConfigurableEnvironment environment;
+
+ /**
+ * 刷新单个属性
+ *
+ * @param propertyName 属性名表达式
+ * @param value 属性值
+ */
+ public void refreshSingle(String propertyName, Object value) {
+ MutablePropertySources propertySources = environment.getPropertySources();
+ if (propertySources.contains(ClientConfigLoader.PROPERTY_SOURCE_NAME)) {
+ // 更新属性值
+ PropertySource> propertySource = propertySources.get(ClientConfigLoader.PROPERTY_SOURCE_NAME);
+ Map source = ((MapPropertySource) propertySource).getSource();
+ source.put(propertyName, value);
+ } else {
+ // 新增属性值
+ Map propertyMap = new HashMap<>(1);
+ MapPropertySource propertySource = new MapPropertySource(ClientConfigLoader.PROPERTY_SOURCE_NAME, propertyMap);
+ propertySources.addFirst(propertySource);
+ }
+ // 使用 Binder 重新绑定 @ConfigurationProperties
+ Binder binder = Binder.get(environment);
+ for (Map.Entry propertyPrefixBean : processor.getPrefixBeanMap().entrySet()) {
+ binder.bind(propertyPrefixBean.getKey(), Bindable.ofInstance(propertyPrefixBean.getValue()));
+ }
+ }
+
+ /**
+ * 刷新所有属性
+ */
+ public void refreshAll() {
+ MutablePropertySources propertySources = environment.getPropertySources();
+ if (propertySources.contains(ClientConfigLoader.PROPERTY_SOURCE_NAME)) {
+ // 替换属性值
+ MapPropertySource propertySource = new MapPropertySource(ClientConfigLoader.PROPERTY_SOURCE_NAME, ClientConfigLoader.cacheConfigs);
+ propertySources.replace(ClientConfigLoader.PROPERTY_SOURCE_NAME, propertySource);
+ } else {
+ // 新增属性值
+ Map propertyMap = new HashMap<>(1);
+ MapPropertySource propertySource = new MapPropertySource(ClientConfigLoader.PROPERTY_SOURCE_NAME, propertyMap);
+ propertySources.addFirst(propertySource);
+ }
+ // 使用 Binder 重新绑定 @ConfigurationProperties
+ Binder binder = Binder.get(environment);
+ for (Map.Entry propertyPrefixBean : processor.getPrefixBeanMap().entrySet()) {
+ binder.bind(propertyPrefixBean.getKey(), Bindable.ofInstance(propertyPrefixBean.getValue()));
+ }
+ }
+}
diff --git a/kenaito-config-core/src/main/java/cn/odboy/config/context/ConfigurationPropertiesAnnotationProcessor.java b/kenaito-config-core/src/main/java/cn/odboy/config/context/ConfigurationPropertiesAnnotationProcessor.java
new file mode 100644
index 0000000..facb842
--- /dev/null
+++ b/kenaito-config-core/src/main/java/cn/odboy/config/context/ConfigurationPropertiesAnnotationProcessor.java
@@ -0,0 +1,65 @@
+package cn.odboy.config.context;
+
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 加载并处理@ConfigurationProperties对应的引用
+ *
+ * @author odboy
+ * @date 2024-12-07
+ */
+@Component
+public class ConfigurationPropertiesAnnotationProcessor implements BeanPostProcessor {
+ private final Logger logger =
+ LoggerFactory.getLogger(ConfigurationPropertiesAnnotationProcessor.class);
+ @Getter
+ private final Map prefixBeanMap = new HashMap<>();
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+ Class> clazz = bean.getClass();
+ while (clazz != null) {
+ if (clazz.isAnnotationPresent(ConfigurationProperties.class)) {
+ // 排除springboot框架的配置
+ if (beanName.contains("springframework")) {
+ clazz = clazz.getSuperclass();
+ continue;
+ }
+ // 排除数据源框架的配置
+ if (beanName.contains("dataSource") || beanName.contains("druid")) {
+ clazz = clazz.getSuperclass();
+ continue;
+ }
+ // 排除ORM框架的配置
+ if (beanName.contains("mybatis")) {
+ clazz = clazz.getSuperclass();
+ continue;
+ }
+ // 排除ip2region的配置
+ if (beanName.contains("ip2region")) {
+ clazz = clazz.getSuperclass();
+ continue;
+ }
+ logger.info("扫描到自定义的@ConfigurationProperties注解类: {}", beanName);
+ this.processConfigBean(clazz, bean);
+ }
+ clazz = clazz.getSuperclass();
+ }
+ return bean;
+ }
+
+ private void processConfigBean(Class> clazz, Object bean) {
+ ConfigurationProperties annotation = clazz.getAnnotation(ConfigurationProperties.class);
+ // 比如: kenaito.config-center
+ prefixBeanMap.put(annotation.prefix(), bean);
+ }
+}
diff --git a/kenaito-config-core/src/main/java/cn/odboy/config/context/ValueAnnotationProcessor.java b/kenaito-config-core/src/main/java/cn/odboy/config/context/ValueAnnotationProcessor.java
index f53fd1c..694f1ff 100644
--- a/kenaito-config-core/src/main/java/cn/odboy/config/context/ValueAnnotationProcessor.java
+++ b/kenaito-config-core/src/main/java/cn/odboy/config/context/ValueAnnotationProcessor.java
@@ -1,8 +1,5 @@
package cn.odboy.config.context;
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
@@ -10,6 +7,10 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* 加载并处理@Value对应的引用
*
@@ -18,46 +19,46 @@ import org.springframework.stereotype.Component;
*/
@Component
public class ValueAnnotationProcessor implements BeanPostProcessor {
- private final Logger logger = LoggerFactory.getLogger(ValueAnnotationProcessor.class);
- private final Map valueFieldMap = new HashMap<>();
- private final Map valueBeanMap = new HashMap<>();
+ private final Logger logger = LoggerFactory.getLogger(ValueAnnotationProcessor.class);
+ private final Map valueFieldMap = new HashMap<>();
+ private final Map valueBeanMap = new HashMap<>();
- @Override
- public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
- Class> clazz = bean.getClass();
- while (clazz != null) {
- // 处理@Value
- for (Field field : clazz.getDeclaredFields()) {
- if (field.isAnnotationPresent(Value.class)) {
- field.setAccessible(true);
- // 对@Value的处理
- Value annotation = field.getAnnotation(Value.class);
- String propertyName = annotation.value();
- propertyName =
- propertyName.substring(propertyName.indexOf("${") + 2, propertyName.lastIndexOf("}"));
- // 处理带有默认值的配置
- if (propertyName.contains(":")) {
- int firstSpIndex = propertyName.indexOf(":");
- propertyName = propertyName.substring(0, firstSpIndex);
- }
- valueFieldMap.put(propertyName, field);
- valueBeanMap.put(propertyName, bean);
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+ Class> clazz = bean.getClass();
+ while (clazz != null) {
+ // 处理@Value
+ for (Field field : clazz.getDeclaredFields()) {
+ if (field.isAnnotationPresent(Value.class)) {
+ field.setAccessible(true);
+ // 对@Value的处理
+ Value annotation = field.getAnnotation(Value.class);
+ String propertyName = annotation.value();
+ propertyName =
+ propertyName.substring(propertyName.indexOf("${") + 2, propertyName.lastIndexOf("}"));
+ // 处理带有默认值的配置
+ if (propertyName.contains(":")) {
+ int firstSpIndex = propertyName.indexOf(":");
+ propertyName = propertyName.substring(0, firstSpIndex);
+ }
+ valueFieldMap.put(propertyName, field);
+ valueBeanMap.put(propertyName, bean);
+ }
+ }
+ clazz = clazz.getSuperclass();
}
- }
- clazz = clazz.getSuperclass();
+ return bean;
}
- return bean;
- }
- public void setValue(String propertyName, Object value) {
- try {
- Field field = valueFieldMap.getOrDefault(propertyName, null);
- if (field != null) {
- field.setAccessible(true);
- field.set(valueBeanMap.get(propertyName), value);
- }
- } catch (IllegalAccessException e) {
- logger.error("设置 {} 字段值 {} 失败", propertyName, value, e);
+ public void setValue(String propertyName, Object value) {
+ try {
+ Field field = valueFieldMap.getOrDefault(propertyName, null);
+ if (field != null) {
+ field.setAccessible(true);
+ field.set(valueBeanMap.get(propertyName), value);
+ }
+ } catch (IllegalAccessException e) {
+ logger.error("设置 {} 字段值 {} 失败", propertyName, value, e);
+ }
}
- }
}
diff --git a/kenaito-config-core/src/main/java/cn/odboy/config/netty/ConfigClient.java b/kenaito-config-core/src/main/java/cn/odboy/config/netty/ConfigClient.java
index 172b55d..9f01763 100644
--- a/kenaito-config-core/src/main/java/cn/odboy/config/netty/ConfigClient.java
+++ b/kenaito-config-core/src/main/java/cn/odboy/config/netty/ConfigClient.java
@@ -6,11 +6,12 @@ import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import java.util.concurrent.TimeUnit;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
+
/**
* 配置中心客户端
*
@@ -19,134 +20,161 @@ import org.slf4j.LoggerFactory;
*/
@Data
public class ConfigClient {
- private static final Logger logger = LoggerFactory.getLogger(ConfigClient.class);
+ private static final Logger logger = LoggerFactory.getLogger(ConfigClient.class);
- /** 客户端 */
- private EventLoopGroup eventLoopGroup;
+ /**
+ * 客户端
+ */
+ private EventLoopGroup eventLoopGroup;
- /** 存放客户端bootstrap对象 */
- private Bootstrap bootstrap;
+ /**
+ * 存放客户端bootstrap对象
+ */
+ private Bootstrap bootstrap;
- /** 存放客户端channel对象 */
- private Channel channel;
+ /**
+ * 存放客户端channel对象
+ */
+ private Channel channel;
- /** 重连间隔,单位秒 */
- private Integer delaySeconds = 5;
+ /**
+ * 重连间隔,单位秒
+ */
+ private Integer delaySeconds = 5;
- /** 连接属性:服务ip */
- private String serverIp;
+ /**
+ * 连接属性:服务ip
+ */
+ private String serverIp;
- /** 连接属性:服务端口 */
- private Integer serverPort;
+ /**
+ * 连接属性:服务端口
+ */
+ private Integer serverPort;
- /** 私有静态实例 */
- private static volatile ConfigClient instance;
+ /**
+ * 私有静态实例
+ */
+ private static volatile ConfigClient instance;
- /** 最大重试次数 */
- private static final int MAX_RETRY_COUNT = 2;
+ /**
+ * 最大重试次数
+ */
+ private static final int MAX_RETRY_COUNT = 2;
- /** 当前重试次数 */
- private static int retryCount = 0;
+ /**
+ * 当前重试次数
+ */
+ private static int retryCount = 0;
- /** 私有化构造函数 */
- private ConfigClient() {}
+ /**
+ * 私有化构造函数
+ */
+ private ConfigClient() {
+ }
- /** 获取 */
- public static ConfigClient getInstance() {
- if (instance == null) {
- synchronized (ConfigClient.class) {
+ /**
+ * 获取
+ */
+ public static ConfigClient getInstance() {
if (instance == null) {
- instance = new ConfigClient();
- }
- }
- }
- return instance;
- }
-
- public void start(String server, Integer port) throws InterruptedException {
- this.serverIp = server;
- this.serverPort = port;
- try {
- this.eventLoopGroup = new NioEventLoopGroup();
- this.bootstrap = new Bootstrap();
- this.bootstrap
- .group(this.eventLoopGroup)
- .option(ChannelOption.SO_KEEPALIVE, true)
- // 设置接收缓冲区大小为10MB
- .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024 * 1024 * 10))
- .channel(NioSocketChannel.class)
- .handler(
- new ChannelInitializer() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast(new ConfigClientHandler());
+ synchronized (ConfigClient.class) {
+ if (instance == null) {
+ instance = new ConfigClient();
}
- });
- this.doConnect(server, port);
- } finally {
- // eventLoopGroup.shutdownGracefully();
- }
- }
-
- private void doConnect(String server, Integer port) throws InterruptedException {
- logger.info("开始连接配置中心服务...");
- if (channel != null && channel.isActive()) {
- return;
- }
- bootstrap.connect(server, port).addListener(new ConfigClientChannelListener()).sync().channel();
- /// channel.closeFuture().sync();
- }
-
- private class ConfigClientChannelListener implements ChannelFutureListener {
- /** 该方法会在channelActive之前执行,去判断客户端连接是否成功,并做失败重连的操作 */
- @Override
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
- // 连接成功后保存Channel
- if (channelFuture.isSuccess()) {
- channel = channelFuture.channel();
- // 重置重试次数
- retryCount = 0;
- logger.info("连接成功 {}:{}", serverIp, serverPort);
- } else {
- // 失败后delaySecond秒(默认是5秒)重连,周期性delaySecond秒的重连
- retryCount++;
- logger.info("当前重试次数: {}", retryCount);
- if (retryCount >= MAX_RETRY_COUNT) {
- ClientConfigLoader.isConfigLoaded = true;
- ClientConfigLoader.isServerOffline = true;
- synchronized (ClientConfigLoader.clientInfo) {
- // 通知等待的线程
- ClientConfigLoader.clientInfo.notifyAll();
- }
- } else {
- // 进行重连
- channelFuture
- .channel()
- .eventLoop()
- .schedule(ConfigClient.this::reConnect, delaySeconds, TimeUnit.SECONDS);
+ }
}
- }
+ return instance;
}
- }
- /** 重新连接 */
- protected void reConnect() {
- try {
- logger.info("重连配置中心服务 {}:{}", this.serverIp, this.serverPort);
- ClientConfigLoader.isConfigLoaded = false;
- ClientConfigLoader.isServerOffline = false;
- if (channel != null && channel.isOpen()) {
- logger.info("Channel已激活, 关闭且重启中...");
- channel.close();
- }
- bootstrap
- .connect(this.serverIp, this.serverPort)
- .addListener(new ConfigClientChannelListener())
- .sync()
- .channel();
- } catch (Exception e) {
- logger.error("重连配置中心服务失败 {}:{}", this.serverIp, this.serverPort, e);
+ public void start(String server, Integer port) throws InterruptedException {
+ this.serverIp = server;
+ this.serverPort = port;
+ try {
+ this.eventLoopGroup = new NioEventLoopGroup();
+ this.bootstrap = new Bootstrap();
+ this.bootstrap
+ .group(this.eventLoopGroup)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ // 设置接收缓冲区大小为10MB
+ .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024 * 1024 * 10))
+ .channel(NioSocketChannel.class)
+ .handler(
+ new ChannelInitializer() {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast(new ConfigClientHandler());
+ }
+ });
+ this.doConnect(server, port);
+ } finally {
+ // eventLoopGroup.shutdownGracefully();
+ }
+ }
+
+ private void doConnect(String server, Integer port) throws InterruptedException {
+ logger.info("开始连接配置中心服务...");
+ if (channel != null && channel.isActive()) {
+ return;
+ }
+ bootstrap.connect(server, port).addListener(new ConfigClientChannelListener()).sync().channel();
+ /// channel.closeFuture().sync();
+ }
+
+ private class ConfigClientChannelListener implements ChannelFutureListener {
+ /**
+ * 该方法会在channelActive之前执行,去判断客户端连接是否成功,并做失败重连的操作
+ */
+ @Override
+ public void operationComplete(ChannelFuture channelFuture) throws Exception {
+ // 连接成功后保存Channel
+ if (channelFuture.isSuccess()) {
+ channel = channelFuture.channel();
+ // 重置重试次数
+ retryCount = 0;
+ logger.info("连接成功 {}:{}", serverIp, serverPort);
+ } else {
+ // 失败后delaySecond秒(默认是5秒)重连,周期性delaySecond秒的重连
+ retryCount++;
+ logger.info("当前重试次数: {}", retryCount);
+ if (retryCount >= MAX_RETRY_COUNT) {
+ ClientConfigLoader.isConfigLoaded = true;
+ ClientConfigLoader.isServerOffline = true;
+ synchronized (ClientConfigLoader.clientInfo) {
+ // 通知等待的线程
+ ClientConfigLoader.clientInfo.notifyAll();
+ }
+ } else {
+ // 进行重连
+ channelFuture
+ .channel()
+ .eventLoop()
+ .schedule(ConfigClient.this::reConnect, delaySeconds, TimeUnit.SECONDS);
+ }
+ }
+ }
+ }
+
+ /**
+ * 重新连接
+ */
+ protected void reConnect() {
+ try {
+ logger.info("重连配置中心服务 {}:{}", this.serverIp, this.serverPort);
+ ClientConfigLoader.isConfigLoaded = false;
+ ClientConfigLoader.isServerOffline = false;
+ if (channel != null && channel.isOpen()) {
+ logger.info("Channel已激活, 关闭且重启中...");
+ channel.close();
+ }
+ bootstrap
+ .connect(this.serverIp, this.serverPort)
+ .addListener(new ConfigClientChannelListener())
+ .sync()
+ .channel();
+ } catch (Exception e) {
+ logger.error("重连配置中心服务失败 {}:{}", this.serverIp, this.serverPort, e);
+ }
}
- }
}
diff --git a/kenaito-config-core/src/main/java/cn/odboy/config/netty/ConfigClientHandler.java b/kenaito-config-core/src/main/java/cn/odboy/config/netty/ConfigClientHandler.java
index 12daea3..df2d8e7 100644
--- a/kenaito-config-core/src/main/java/cn/odboy/config/netty/ConfigClientHandler.java
+++ b/kenaito-config-core/src/main/java/cn/odboy/config/netty/ConfigClientHandler.java
@@ -9,14 +9,15 @@ import cn.odboy.config.model.msgtype.ConfigFileInfo;
import cn.odboy.config.util.MessageUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
/**
* 配置中心客户端 业务处理
@@ -25,99 +26,99 @@ import org.yaml.snakeyaml.Yaml;
* @date 2024-12-06
*/
public class ConfigClientHandler extends ChannelInboundHandlerAdapter {
- private static final Logger logger = LoggerFactory.getLogger(ConfigClientHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(ConfigClientHandler.class);
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
- /// logger.info("当Channel已经注册到它的EventLoop并且能够处理I/O时被调用");
- super.channelRegistered(ctx);
- }
-
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
- /// logger.info("当Channel从它的EventLoop注销并且无法处理任何I/O时被调用");
- super.channelUnregistered(ctx);
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- /// logger.info("当Channel处于活动状态(已经连接到它的远程节点)时被调用, 注册客户端");
- SmallMessage smallMessage = new SmallMessage();
- smallMessage.setType(TransferMessageType.REGISTER);
- smallMessage.setResp(SmallMessage.Response.ok(ClientConfigLoader.clientInfo));
- ctx.writeAndFlush(MessageUtil.toByteBuf(smallMessage));
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- /// logger.info("当Channel离开活动状态并且不再连接到它的远程节点时被调用");
- /// this.configClient.reConnect();
- super.channelInactive(ctx);
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- /// logger.info("当从Channel读取数据时被调用, 收到服务器消息");
- logger.info("从服务端读取到Object");
- SmallMessage smallMessage = MessageUtil.getMessage(msg);
- SmallMessage.Response resp = smallMessage.getResp();
- switch (smallMessage.getType()) {
- case REGISTER:
- if (!resp.getSuccess()) {
- logger.info("注册失败, {}", resp.getErrorMessage());
- } else {
- logger.info("注册成功, 给服务端发信号, 表明准备好可以拉取配置了");
- // 准备拉取配置,给服务端发信号
- SmallMessage pullConfigMessage = new SmallMessage();
- pullConfigMessage.setType(TransferMessageType.PULL_CONFIG);
- pullConfigMessage.setResp(SmallMessage.Response.ok(null, "准备好拉取配置了"));
- ctx.writeAndFlush(MessageUtil.toByteBuf(pullConfigMessage));
- }
- break;
- case PUSH_CONFIG:
- if (!resp.getSuccess()) {
- throw new RuntimeException(resp.getErrorMessage());
- }
- List configFileInfos = MessageUtil.toConfigFileInfoList(resp.getData());
- logger.info("收到来自服务端推送的配置信息");
- Map originConfigs = new HashMap<>(1);
- Map> lastConfigs = new HashMap<>(1);
- try {
- for (ConfigFileInfo configFileInfo : configFileInfos) {
- String fileName = configFileInfo.getFileName();
- // 配置原生内容
- originConfigs.put(fileName, configFileInfo.getFileContent());
- // 转换为应用能识别的配置项
- String suffix = FileUtil.getSuffix(fileName);
- boolean isYml = "yml".equals(suffix) || "yaml".equals(suffix);
- if (isYml) {
- Yaml yaml = new Yaml();
- lastConfigs.put(fileName, yaml.load(configFileInfo.getFileContent()));
- } else {
- Properties properties = new Properties();
- properties.load(StrUtil.getReader(configFileInfo.getFileContent()));
- Map tempMap = new HashMap<>(1);
- for (Map.Entry