feat:自定义 @ConfigurationProperties注解的类属性值更新处理方式
This commit is contained in:
parent
b1d810de95
commit
49860000fc
|
@ -29,6 +29,9 @@
|
|||
- 20241206 读取本地配置缓存(连接服务端失败后的兜底操作)
|
||||
- 20241207 动态更新@Value注解的属性
|
||||
- 20241207 动态更新@ConfigurationProperties注解类中的属性
|
||||
- 感想:太艰难了
|
||||
- 感谢:spring-cloud-context 给我的灵感
|
||||
- 耗时:4小时
|
||||
- 202412xx 动态替换配置指令实现 [loading]
|
||||
|
||||
#### 服务端
|
||||
|
|
|
@ -19,10 +19,10 @@
|
|||
<artifactId>kenaito-config-common</artifactId>
|
||||
<version>1.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-context</artifactId>
|
||||
<version>4.2.0</version>
|
||||
</dependency>
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>org.springframework.cloud</groupId>-->
|
||||
<!-- <artifactId>spring-cloud-context</artifactId>-->
|
||||
<!-- <version>4.2.0</version>-->
|
||||
<!-- </dependency>-->
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -5,15 +5,6 @@ import cn.hutool.core.thread.ThreadUtil;
|
|||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.odboy.config.model.msgtype.ClientInfo;
|
||||
import cn.odboy.config.netty.ConfigClient;
|
||||
import java.io.File;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.BeansException;
|
||||
|
@ -25,6 +16,16 @@ import org.springframework.core.env.ConfigurableEnvironment;
|
|||
import org.springframework.core.env.MapPropertySource;
|
||||
import org.yaml.snakeyaml.Yaml;
|
||||
|
||||
import java.io.File;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* 配置加载器
|
||||
*
|
||||
|
@ -33,238 +34,272 @@ import org.yaml.snakeyaml.Yaml;
|
|||
*/
|
||||
@Configuration
|
||||
public class ClientConfigLoader {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ClientConfigLoader.class);
|
||||
private static final Logger logger = LoggerFactory.getLogger(ClientConfigLoader.class);
|
||||
|
||||
/** 默认的配置值 */
|
||||
private static final String OS_TYPE_WIN = "win";
|
||||
/**
|
||||
* 默认的配置值
|
||||
*/
|
||||
private static final String OS_TYPE_WIN = "win";
|
||||
|
||||
private static final String OS_TYPE_MAC = "mac";
|
||||
private static final String DEFAULT_PATH_WIN = "c:\\data";
|
||||
private static final String DEFAULT_PATH_MAC = "/home/admin/data";
|
||||
private static final String DEFAULT_CONFIG_SERVER = "127.0.0.1";
|
||||
private static final Integer DEFAULT_CONFIG_PORT = 28010;
|
||||
private static final String DEFAULT_CONFIG_ENV = "default";
|
||||
private static final String DEFAULT_CONFIG_DATA_ID = "default";
|
||||
private static final String DEFAULT_PATH_WIN_SEP = ":";
|
||||
private static final String OS_TYPE_MAC = "mac";
|
||||
private static final String DEFAULT_PATH_WIN = "c:\\data";
|
||||
private static final String DEFAULT_PATH_MAC = "/home/admin/data";
|
||||
private static final String DEFAULT_CONFIG_SERVER = "127.0.0.1";
|
||||
private static final Integer DEFAULT_CONFIG_PORT = 28010;
|
||||
private static final String DEFAULT_CONFIG_ENV = "default";
|
||||
private static final String DEFAULT_CONFIG_DATA_ID = "default";
|
||||
private static final String DEFAULT_PATH_WIN_SEP = ":";
|
||||
|
||||
/** 默认配置项:配置中心服务ip */
|
||||
private static final String DEFAULT_CONFIG_NAME_SERVER = "kenaito.config-center.server";
|
||||
/**
|
||||
* 默认配置项:配置中心服务ip
|
||||
*/
|
||||
private static final String DEFAULT_CONFIG_NAME_SERVER = "kenaito.config-center.server";
|
||||
|
||||
/** 默认配置项:配置中心服务端口 */
|
||||
private static final String DEFAULT_CONFIG_NAME_PORT = "kenaito.config-center.port";
|
||||
/**
|
||||
* 默认配置项:配置中心服务端口
|
||||
*/
|
||||
private static final String DEFAULT_CONFIG_NAME_PORT = "kenaito.config-center.port";
|
||||
|
||||
/** 默认配置项:将拉取的配置环境 */
|
||||
private static final String DEFAULT_CONFIG_NAME_ENV = "kenaito.config-center.env";
|
||||
/**
|
||||
* 默认配置项:将拉取的配置环境
|
||||
*/
|
||||
private static final String DEFAULT_CONFIG_NAME_ENV = "kenaito.config-center.env";
|
||||
|
||||
/** 默认配置项:将拉取配置的应用的名称 */
|
||||
private static final String DEFAULT_CONFIG_NAME_DATA_ID = "kenaito.config-center.data-id";
|
||||
/**
|
||||
* 默认配置项:将拉取配置的应用的名称
|
||||
*/
|
||||
private static final String DEFAULT_CONFIG_NAME_DATA_ID = "kenaito.config-center.data-id";
|
||||
|
||||
/** 默认配置项:配置缓存目录 */
|
||||
private static final String DEFAULT_CONFIG_NAME_CACHE_DIR = "kenaito.config-center.cache-dir";
|
||||
/**
|
||||
* 默认配置项:配置缓存目录
|
||||
*/
|
||||
private static final String DEFAULT_CONFIG_NAME_CACHE_DIR = "kenaito.config-center.cache-dir";
|
||||
|
||||
/** Win路径分割符 */
|
||||
private static final String DEFAULT_PATH_SEP_WIN = "\\";
|
||||
/**
|
||||
* Win路径分割符
|
||||
*/
|
||||
private static final String DEFAULT_PATH_SEP_WIN = "\\";
|
||||
|
||||
/** Mac路径分割符 */
|
||||
private static final String DEFAULT_PATH_SEP_MAC = "/";
|
||||
/**
|
||||
* Mac路径分割符
|
||||
*/
|
||||
private static final String DEFAULT_PATH_SEP_MAC = "/";
|
||||
|
||||
/** 配置源名称 */
|
||||
public static final String PROPERTY_SOURCE_NAME = "kenaito-dynamic-config";
|
||||
/**
|
||||
* 配置源名称
|
||||
*/
|
||||
public static final String PROPERTY_SOURCE_NAME = "kenaito-dynamic-config";
|
||||
|
||||
/** 当前客户端配置 */
|
||||
public static final ClientInfo clientInfo = new ClientInfo();
|
||||
/**
|
||||
* 当前客户端配置
|
||||
*/
|
||||
public static final ClientInfo clientInfo = new ClientInfo();
|
||||
|
||||
/** 配置是否加载完毕 */
|
||||
public static boolean isConfigLoaded = false;
|
||||
/**
|
||||
* 配置是否加载完毕
|
||||
*/
|
||||
public static boolean isConfigLoaded = false;
|
||||
|
||||
/** 服务器是否离线 */
|
||||
public static boolean isServerOffline = false;
|
||||
/**
|
||||
* 服务器是否离线
|
||||
*/
|
||||
public static boolean isServerOffline = false;
|
||||
|
||||
/** 原有的配置信息:filename -> file content */
|
||||
public static Map<String, String> originConfigs = new HashMap<>();
|
||||
/**
|
||||
* 原有的配置信息:filename -> file content
|
||||
*/
|
||||
public static Map<String, String> originConfigs = new HashMap<>();
|
||||
|
||||
/** 转换后的配置信息:filename -> {configKey: configValue} */
|
||||
public static Map<String, Map<String, Object>> lastConfigs = new HashMap<>();
|
||||
/**
|
||||
* 转换后的配置信息:filename -> {configKey: configValue}
|
||||
*/
|
||||
public static Map<String, Map<String, Object>> lastConfigs = new HashMap<>();
|
||||
|
||||
/** 所有自定义配置项缓存 */
|
||||
public static Map<String, Object> cacheConfigs = new HashMap<>();
|
||||
/**
|
||||
* 所有自定义配置项缓存
|
||||
*/
|
||||
public static Map<String, Object> cacheConfigs = new HashMap<>();
|
||||
|
||||
/** 定时将配置写盘,缓存配置信息 */
|
||||
private final Thread fixedTimeFlushConfigFileThread =
|
||||
ThreadUtil.newThread(
|
||||
() -> {
|
||||
while (true) {
|
||||
// 原来是所有的配置写到了一个文件中
|
||||
// try {
|
||||
// if (!lastConfigs.isEmpty()) {
|
||||
// List<String> fileContent = new ArrayList<>();
|
||||
// for (Map.Entry<String, Object> kve : lastConfigs.entrySet()) {
|
||||
// fileContent.add(kve.getKey() + "=" + kve.getValue());
|
||||
// }
|
||||
// FileUtil.writeLines(
|
||||
// fileContent,
|
||||
// FileUtil.file(clientInfo.getCacheDir(), "config"),
|
||||
// StandardCharsets.UTF_8);
|
||||
// }
|
||||
// Thread.sleep(5 * 1000);
|
||||
// } catch (Exception e) {
|
||||
// // 忽略
|
||||
// }
|
||||
// 改为每个配置文件写到单独的文件中
|
||||
if (!originConfigs.isEmpty()) {
|
||||
try {
|
||||
// 降低频率,减小对三方应用本身的影响
|
||||
Thread.sleep(30 * 1000);
|
||||
} catch (Exception e) {
|
||||
// 忽略
|
||||
}
|
||||
for (Map.Entry<String, String> kve : originConfigs.entrySet()) {
|
||||
try {
|
||||
FileUtil.writeString(
|
||||
kve.getValue(),
|
||||
FileUtil.file(clientInfo.getCacheDir(), kve.getKey()),
|
||||
StandardCharsets.UTF_8);
|
||||
} catch (Exception e) {
|
||||
logger.error(
|
||||
"应用 {},环境 {},缓存配置文件 {} 失败",
|
||||
clientInfo.getDataId(),
|
||||
clientInfo.getEnv(),
|
||||
kve.getKey(),
|
||||
e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"fixedTimeFlushConfigFileThread");
|
||||
/**
|
||||
* 定时将配置写盘,缓存配置信息
|
||||
*/
|
||||
private final Thread fixedTimeFlushConfigFileThread =
|
||||
ThreadUtil.newThread(
|
||||
() -> {
|
||||
while (true) {
|
||||
// 原来是所有的配置写到了一个文件中
|
||||
// try {
|
||||
// if (!lastConfigs.isEmpty()) {
|
||||
// List<String> fileContent = new ArrayList<>();
|
||||
// for (Map.Entry<String, Object> kve : lastConfigs.entrySet()) {
|
||||
// fileContent.add(kve.getKey() + "=" + kve.getValue());
|
||||
// }
|
||||
// FileUtil.writeLines(
|
||||
// fileContent,
|
||||
// FileUtil.file(clientInfo.getCacheDir(), "config"),
|
||||
// StandardCharsets.UTF_8);
|
||||
// }
|
||||
// Thread.sleep(5 * 1000);
|
||||
// } catch (Exception e) {
|
||||
// // 忽略
|
||||
// }
|
||||
// 改为每个配置文件写到单独的文件中
|
||||
if (!originConfigs.isEmpty()) {
|
||||
try {
|
||||
// 降低频率,减小对三方应用本身的影响
|
||||
Thread.sleep(30 * 1000);
|
||||
} catch (Exception e) {
|
||||
// 忽略
|
||||
}
|
||||
for (Map.Entry<String, String> kve : originConfigs.entrySet()) {
|
||||
try {
|
||||
FileUtil.writeString(
|
||||
kve.getValue(),
|
||||
FileUtil.file(clientInfo.getCacheDir(), kve.getKey()),
|
||||
StandardCharsets.UTF_8);
|
||||
} catch (Exception e) {
|
||||
logger.error(
|
||||
"应用 {},环境 {},缓存配置文件 {} 失败",
|
||||
clientInfo.getDataId(),
|
||||
clientInfo.getEnv(),
|
||||
kve.getKey(),
|
||||
e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"fixedTimeFlushConfigFileThread");
|
||||
|
||||
@Bean
|
||||
public BeanFactoryPostProcessor configLoader(ConfigurableEnvironment environment) {
|
||||
return new BeanFactoryPostProcessor() {
|
||||
@Override
|
||||
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
|
||||
throws BeansException {
|
||||
String defaultCacheDir = getDefaultCacheDir();
|
||||
initClientInfo(defaultCacheDir, environment);
|
||||
logger.info("客户端属性: {}", clientInfo);
|
||||
validateCacheDirPath(defaultCacheDir, clientInfo.getCacheDir());
|
||||
createCacheDir(clientInfo.getCacheDir());
|
||||
fixedTimeFlushConfigFileThread.start();
|
||||
ThreadUtil.execAsync(
|
||||
() -> {
|
||||
try {
|
||||
ConfigClient.getInstance().start(clientInfo.getServer(), clientInfo.getPort());
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("Netty客户端启动失败", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
// 这里加个同步锁,等客户端准备就绪后,拉取配置完成时
|
||||
synchronized (clientInfo) {
|
||||
while (!isConfigLoaded) {
|
||||
try {
|
||||
// 等待配置加载完成
|
||||
clientInfo.wait();
|
||||
} catch (InterruptedException e) {
|
||||
Thread currentThread = Thread.currentThread();
|
||||
String currentThreadName = currentThread.getName();
|
||||
currentThread.interrupt();
|
||||
logger.error("中断线程: {}", currentThreadName, e);
|
||||
}
|
||||
}
|
||||
// 判断配置中心服务是否处于离线状态
|
||||
if (isServerOffline) {
|
||||
logger.info("配置中心离线,尝试从本地缓存加载配置文件");
|
||||
String cacheDir = clientInfo.getCacheDir();
|
||||
FileUtil.walkFiles(
|
||||
FileUtil.file(cacheDir),
|
||||
(file -> {
|
||||
try {
|
||||
String fileName = file.getName();
|
||||
originConfigs.clear();
|
||||
// 配置原生内容
|
||||
originConfigs.put(fileName, FileUtil.readString(file, StandardCharsets.UTF_8));
|
||||
// 转换为应用能识别的配置项
|
||||
lastConfigs.clear();
|
||||
String suffix = FileUtil.getSuffix(file);
|
||||
boolean isYml = "yml".equals(suffix) || "yaml".equals(suffix);
|
||||
if (isYml) {
|
||||
Yaml yaml = new Yaml();
|
||||
lastConfigs.put(fileName, yaml.load(originConfigs.get(fileName)));
|
||||
} else {
|
||||
Properties properties = new Properties();
|
||||
properties.load(StrUtil.getReader(originConfigs.get(fileName)));
|
||||
Map<String, Object> tempMap = new HashMap<>(1);
|
||||
for (Map.Entry<Object, Object> kv : properties.entrySet()) {
|
||||
String key = (String) kv.getKey();
|
||||
tempMap.put(key, kv.getValue());
|
||||
}
|
||||
lastConfigs.put(fileName, tempMap);
|
||||
@Bean
|
||||
public BeanFactoryPostProcessor configLoader(ConfigurableEnvironment environment) {
|
||||
return new BeanFactoryPostProcessor() {
|
||||
@Override
|
||||
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
|
||||
throws BeansException {
|
||||
String defaultCacheDir = getDefaultCacheDir();
|
||||
initClientInfo(defaultCacheDir, environment);
|
||||
logger.info("客户端属性: {}", clientInfo);
|
||||
validateCacheDirPath(defaultCacheDir, clientInfo.getCacheDir());
|
||||
createCacheDir(clientInfo.getCacheDir());
|
||||
fixedTimeFlushConfigFileThread.start();
|
||||
ThreadUtil.execAsync(
|
||||
() -> {
|
||||
try {
|
||||
ConfigClient.getInstance().start(clientInfo.getServer(), clientInfo.getPort());
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("Netty客户端启动失败", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
// 这里加个同步锁,等客户端准备就绪后,拉取配置完成时
|
||||
synchronized (clientInfo) {
|
||||
while (!isConfigLoaded) {
|
||||
try {
|
||||
// 等待配置加载完成
|
||||
clientInfo.wait();
|
||||
} catch (InterruptedException e) {
|
||||
Thread currentThread = Thread.currentThread();
|
||||
String currentThreadName = currentThread.getName();
|
||||
currentThread.interrupt();
|
||||
logger.error("中断线程: {}", currentThreadName, e);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.info("配置文件转map失败", e);
|
||||
}
|
||||
}));
|
||||
}
|
||||
// 合并配置项
|
||||
cacheConfigs.clear();
|
||||
Set<Map.Entry<String, Map<String, Object>>> filename2ConfigMap = lastConfigs.entrySet();
|
||||
for (Map.Entry<String, Map<String, Object>> filename2Config : filename2ConfigMap) {
|
||||
cacheConfigs.putAll(filename2Config.getValue());
|
||||
}
|
||||
MapPropertySource propertySource =
|
||||
new MapPropertySource(PROPERTY_SOURCE_NAME, cacheConfigs);
|
||||
environment.getPropertySources().addFirst(propertySource);
|
||||
// 判断配置中心服务是否处于离线状态
|
||||
if (isServerOffline) {
|
||||
logger.info("配置中心离线,尝试从本地缓存加载配置文件");
|
||||
String cacheDir = clientInfo.getCacheDir();
|
||||
FileUtil.walkFiles(
|
||||
FileUtil.file(cacheDir),
|
||||
(file -> {
|
||||
try {
|
||||
String fileName = file.getName();
|
||||
originConfigs.clear();
|
||||
// 配置原生内容
|
||||
originConfigs.put(fileName, FileUtil.readString(file, StandardCharsets.UTF_8));
|
||||
// 转换为应用能识别的配置项
|
||||
lastConfigs.clear();
|
||||
String suffix = FileUtil.getSuffix(file);
|
||||
boolean isYml = "yml".equals(suffix) || "yaml".equals(suffix);
|
||||
if (isYml) {
|
||||
Yaml yaml = new Yaml();
|
||||
lastConfigs.put(fileName, yaml.load(originConfigs.get(fileName)));
|
||||
} else {
|
||||
Properties properties = new Properties();
|
||||
properties.load(StrUtil.getReader(originConfigs.get(fileName)));
|
||||
Map<String, Object> tempMap = new HashMap<>(1);
|
||||
for (Map.Entry<Object, Object> kv : properties.entrySet()) {
|
||||
String key = (String) kv.getKey();
|
||||
tempMap.put(key, kv.getValue());
|
||||
}
|
||||
lastConfigs.put(fileName, tempMap);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.info("配置文件转map失败", e);
|
||||
}
|
||||
}));
|
||||
}
|
||||
// 合并配置项
|
||||
cacheConfigs.clear();
|
||||
Set<Map.Entry<String, Map<String, Object>>> filename2ConfigMap = lastConfigs.entrySet();
|
||||
for (Map.Entry<String, Map<String, Object>> filename2Config : filename2ConfigMap) {
|
||||
cacheConfigs.putAll(filename2Config.getValue());
|
||||
}
|
||||
MapPropertySource propertySource =
|
||||
new MapPropertySource(PROPERTY_SOURCE_NAME, cacheConfigs);
|
||||
environment.getPropertySources().addFirst(propertySource);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static void initClientInfo(String defaultCacheDir, ConfigurableEnvironment environment) {
|
||||
clientInfo.setServer(
|
||||
environment.getProperty(DEFAULT_CONFIG_NAME_SERVER, String.class, DEFAULT_CONFIG_SERVER));
|
||||
clientInfo.setPort(
|
||||
environment.getProperty(DEFAULT_CONFIG_NAME_PORT, Integer.class, DEFAULT_CONFIG_PORT));
|
||||
clientInfo.setEnv(
|
||||
environment.getProperty(DEFAULT_CONFIG_NAME_ENV, String.class, DEFAULT_CONFIG_ENV));
|
||||
clientInfo.setDataId(
|
||||
environment.getProperty(DEFAULT_CONFIG_NAME_DATA_ID, String.class, DEFAULT_CONFIG_DATA_ID));
|
||||
clientInfo.setCacheDir(
|
||||
environment.getProperty(DEFAULT_CONFIG_NAME_CACHE_DIR, String.class, defaultCacheDir));
|
||||
}
|
||||
|
||||
private static String getDefaultCacheDir() {
|
||||
String defaultCacheDir;
|
||||
String os = System.getProperty("os.name");
|
||||
if (os.toLowerCase().startsWith(OS_TYPE_WIN)) {
|
||||
defaultCacheDir = DEFAULT_PATH_WIN;
|
||||
} else if (os.toLowerCase().startsWith(OS_TYPE_MAC)) {
|
||||
defaultCacheDir = DEFAULT_PATH_MAC;
|
||||
} else {
|
||||
defaultCacheDir = DEFAULT_PATH_MAC;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static void initClientInfo(String defaultCacheDir, ConfigurableEnvironment environment) {
|
||||
clientInfo.setServer(
|
||||
environment.getProperty(DEFAULT_CONFIG_NAME_SERVER, String.class, DEFAULT_CONFIG_SERVER));
|
||||
clientInfo.setPort(
|
||||
environment.getProperty(DEFAULT_CONFIG_NAME_PORT, Integer.class, DEFAULT_CONFIG_PORT));
|
||||
clientInfo.setEnv(
|
||||
environment.getProperty(DEFAULT_CONFIG_NAME_ENV, String.class, DEFAULT_CONFIG_ENV));
|
||||
clientInfo.setDataId(
|
||||
environment.getProperty(DEFAULT_CONFIG_NAME_DATA_ID, String.class, DEFAULT_CONFIG_DATA_ID));
|
||||
clientInfo.setCacheDir(
|
||||
environment.getProperty(DEFAULT_CONFIG_NAME_CACHE_DIR, String.class, defaultCacheDir));
|
||||
}
|
||||
|
||||
private static String getDefaultCacheDir() {
|
||||
String defaultCacheDir;
|
||||
String os = System.getProperty("os.name");
|
||||
if (os.toLowerCase().startsWith(OS_TYPE_WIN)) {
|
||||
defaultCacheDir = DEFAULT_PATH_WIN;
|
||||
} else if (os.toLowerCase().startsWith(OS_TYPE_MAC)) {
|
||||
defaultCacheDir = DEFAULT_PATH_MAC;
|
||||
} else {
|
||||
defaultCacheDir = DEFAULT_PATH_MAC;
|
||||
return defaultCacheDir;
|
||||
}
|
||||
return defaultCacheDir;
|
||||
}
|
||||
|
||||
private static void validateCacheDirPath(String defaultCacheDir, String cacheDir) {
|
||||
if (defaultCacheDir.contains(DEFAULT_PATH_WIN_SEP)
|
||||
&& !cacheDir.contains(DEFAULT_PATH_WIN_SEP)) {
|
||||
throw new RuntimeException(DEFAULT_CONFIG_NAME_CACHE_DIR + " 配置的路径不正确");
|
||||
private static void validateCacheDirPath(String defaultCacheDir, String cacheDir) {
|
||||
if (defaultCacheDir.contains(DEFAULT_PATH_WIN_SEP)
|
||||
&& !cacheDir.contains(DEFAULT_PATH_WIN_SEP)) {
|
||||
throw new RuntimeException(DEFAULT_CONFIG_NAME_CACHE_DIR + " 配置的路径不正确");
|
||||
}
|
||||
if (cacheDir.contains(DEFAULT_PATH_WIN_SEP) && !cacheDir.contains(DEFAULT_PATH_SEP_WIN)) {
|
||||
throw new RuntimeException(
|
||||
DEFAULT_CONFIG_NAME_CACHE_DIR + " 配置的路径不正确, 正确的路径示范, " + DEFAULT_PATH_WIN);
|
||||
}
|
||||
}
|
||||
if (cacheDir.contains(DEFAULT_PATH_WIN_SEP) && !cacheDir.contains(DEFAULT_PATH_SEP_WIN)) {
|
||||
throw new RuntimeException(
|
||||
DEFAULT_CONFIG_NAME_CACHE_DIR + " 配置的路径不正确, 正确的路径示范, " + DEFAULT_PATH_WIN);
|
||||
}
|
||||
}
|
||||
|
||||
/** 创建缓存文件夹 */
|
||||
private static void createCacheDir(String cacheDir) {
|
||||
Path path = Paths.get(cacheDir);
|
||||
if (!Files.exists(path)) {
|
||||
File mkdir = FileUtil.mkdir(cacheDir);
|
||||
if (!mkdir.canWrite()) {
|
||||
throw new RuntimeException("缓存文件夹创建失败, 无读写权限");
|
||||
}
|
||||
/**
|
||||
* 创建缓存文件夹
|
||||
*/
|
||||
private static void createCacheDir(String cacheDir) {
|
||||
Path path = Paths.get(cacheDir);
|
||||
if (!Files.exists(path)) {
|
||||
File mkdir = FileUtil.mkdir(cacheDir);
|
||||
if (!mkdir.canWrite()) {
|
||||
throw new RuntimeException("缓存文件夹创建失败, 无读写权限");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,18 +1,20 @@
|
|||
package cn.odboy.config.context;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import java.util.Map;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.cloud.context.refresh.ConfigDataContextRefresher;
|
||||
import org.springframework.cloud.context.refresh.ContextRefresher;
|
||||
//import org.springframework.cloud.context.refresh.ConfigDataContextRefresher;
|
||||
import org.springframework.core.env.ConfigurableEnvironment;
|
||||
import org.springframework.core.env.MapPropertySource;
|
||||
import org.springframework.core.env.MutablePropertySources;
|
||||
import org.springframework.core.env.PropertySource;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 客户端配置 辅助类
|
||||
* <p>
|
||||
* 依赖 spring-cloud-context
|
||||
*
|
||||
* @author odboy
|
||||
* @date 2024-12-07
|
||||
|
@ -20,33 +22,34 @@ import org.springframework.stereotype.Component;
|
|||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class ClientPropertyHelper {
|
||||
private final ConfigurableEnvironment environment;
|
||||
private final ValueAnnotationProcessor valueAnnotationProcessor;
|
||||
private final ConfigDataContextRefresher configDataContextRefresher;
|
||||
private final ConfigurableEnvironment environment;
|
||||
private final ValueAnnotationProcessor valueAnnotationProcessor;
|
||||
// private final ConfigDataContextRefresher configDataContextRefresher;
|
||||
private final ConfigPropertyContextRefresher contextRefresher;
|
||||
|
||||
/**
|
||||
* 动态更新配置值
|
||||
*
|
||||
* @param propertyName 属性路径名
|
||||
* @param value 属性值
|
||||
*/
|
||||
public void updateValue(String propertyName, Object value) {
|
||||
if (StrUtil.isNotBlank(propertyName)) {
|
||||
// 设置属性值
|
||||
MutablePropertySources propertySources = environment.getPropertySources();
|
||||
if (propertySources.contains(ClientConfigLoader.PROPERTY_SOURCE_NAME)) {
|
||||
// 更新属性值
|
||||
PropertySource<?> propertySource =
|
||||
propertySources.get(ClientConfigLoader.PROPERTY_SOURCE_NAME);
|
||||
Map<String, Object> source = ((MapPropertySource) propertySource).getSource();
|
||||
source.put(propertyName, value);
|
||||
}
|
||||
// 单独更新@Value对应的值
|
||||
valueAnnotationProcessor.setValue(propertyName, value);
|
||||
// 刷新上下文(解决 @ConfigurationProperties注解的类属性值更新 问题)
|
||||
// Spring Cloud只会对被@RefreshScope和@ConfigurationProperties标注的bean进行刷新
|
||||
// 这个方法主要做了两件事:刷新配置源,也就是PropertySource,然后刷新了@ConfigurationProperties注解的类
|
||||
configDataContextRefresher.refresh();
|
||||
/**
|
||||
* 动态更新配置值
|
||||
*
|
||||
* @param propertyName 属性路径名
|
||||
* @param value 属性值
|
||||
*/
|
||||
public void updateValue(String propertyName, Object value) {
|
||||
if (StrUtil.isNotBlank(propertyName)) {
|
||||
// 设置属性值
|
||||
MutablePropertySources propertySources = environment.getPropertySources();
|
||||
if (propertySources.contains(ClientConfigLoader.PROPERTY_SOURCE_NAME)) {
|
||||
// 更新属性值
|
||||
PropertySource<?> propertySource = propertySources.get(ClientConfigLoader.PROPERTY_SOURCE_NAME);
|
||||
Map<String, Object> source = ((MapPropertySource) propertySource).getSource();
|
||||
source.put(propertyName, value);
|
||||
}
|
||||
// 单独更新@Value对应的值
|
||||
valueAnnotationProcessor.setValue(propertyName, value);
|
||||
// 刷新上下文(解决 @ConfigurationProperties注解的类属性值更新 问题)
|
||||
// Spring Cloud只会对被@RefreshScope和@ConfigurationProperties标注的bean进行刷新
|
||||
// 这个方法主要做了两件事:刷新配置源,也就是PropertySource,然后刷新了@ConfigurationProperties注解的类
|
||||
// configDataContextRefresher.refresh();
|
||||
contextRefresher.refreshAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
package cn.odboy.config.context;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.boot.context.properties.bind.Bindable;
|
||||
import org.springframework.boot.context.properties.bind.Binder;
|
||||
import org.springframework.core.env.ConfigurableEnvironment;
|
||||
import org.springframework.core.env.MapPropertySource;
|
||||
import org.springframework.core.env.MutablePropertySources;
|
||||
import org.springframework.core.env.PropertySource;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 配置属性上下文 刷新
|
||||
*
|
||||
* @author odboy
|
||||
* @date 2024-12-07
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class ConfigPropertyContextRefresher {
|
||||
private final ConfigurationPropertiesAnnotationProcessor processor;
|
||||
private final ConfigurableEnvironment environment;
|
||||
|
||||
/**
|
||||
* 刷新单个属性
|
||||
*
|
||||
* @param propertyName 属性名表达式
|
||||
* @param value 属性值
|
||||
*/
|
||||
public void refreshSingle(String propertyName, Object value) {
|
||||
MutablePropertySources propertySources = environment.getPropertySources();
|
||||
if (propertySources.contains(ClientConfigLoader.PROPERTY_SOURCE_NAME)) {
|
||||
// 更新属性值
|
||||
PropertySource<?> propertySource = propertySources.get(ClientConfigLoader.PROPERTY_SOURCE_NAME);
|
||||
Map<String, Object> source = ((MapPropertySource) propertySource).getSource();
|
||||
source.put(propertyName, value);
|
||||
} else {
|
||||
// 新增属性值
|
||||
Map<String, Object> propertyMap = new HashMap<>(1);
|
||||
MapPropertySource propertySource = new MapPropertySource(ClientConfigLoader.PROPERTY_SOURCE_NAME, propertyMap);
|
||||
propertySources.addFirst(propertySource);
|
||||
}
|
||||
// 使用 Binder 重新绑定 @ConfigurationProperties
|
||||
Binder binder = Binder.get(environment);
|
||||
for (Map.Entry<String, Object> propertyPrefixBean : processor.getPrefixBeanMap().entrySet()) {
|
||||
binder.bind(propertyPrefixBean.getKey(), Bindable.ofInstance(propertyPrefixBean.getValue()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 刷新所有属性
|
||||
*/
|
||||
public void refreshAll() {
|
||||
MutablePropertySources propertySources = environment.getPropertySources();
|
||||
if (propertySources.contains(ClientConfigLoader.PROPERTY_SOURCE_NAME)) {
|
||||
// 替换属性值
|
||||
MapPropertySource propertySource = new MapPropertySource(ClientConfigLoader.PROPERTY_SOURCE_NAME, ClientConfigLoader.cacheConfigs);
|
||||
propertySources.replace(ClientConfigLoader.PROPERTY_SOURCE_NAME, propertySource);
|
||||
} else {
|
||||
// 新增属性值
|
||||
Map<String, Object> propertyMap = new HashMap<>(1);
|
||||
MapPropertySource propertySource = new MapPropertySource(ClientConfigLoader.PROPERTY_SOURCE_NAME, propertyMap);
|
||||
propertySources.addFirst(propertySource);
|
||||
}
|
||||
// 使用 Binder 重新绑定 @ConfigurationProperties
|
||||
Binder binder = Binder.get(environment);
|
||||
for (Map.Entry<String, Object> propertyPrefixBean : processor.getPrefixBeanMap().entrySet()) {
|
||||
binder.bind(propertyPrefixBean.getKey(), Bindable.ofInstance(propertyPrefixBean.getValue()));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
package cn.odboy.config.context;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 加载并处理@ConfigurationProperties对应的引用
|
||||
*
|
||||
* @author odboy
|
||||
* @date 2024-12-07
|
||||
*/
|
||||
@Component
|
||||
public class ConfigurationPropertiesAnnotationProcessor implements BeanPostProcessor {
|
||||
private final Logger logger =
|
||||
LoggerFactory.getLogger(ConfigurationPropertiesAnnotationProcessor.class);
|
||||
@Getter
|
||||
private final Map<String, Object> prefixBeanMap = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
Class<?> clazz = bean.getClass();
|
||||
while (clazz != null) {
|
||||
if (clazz.isAnnotationPresent(ConfigurationProperties.class)) {
|
||||
// 排除springboot框架的配置
|
||||
if (beanName.contains("springframework")) {
|
||||
clazz = clazz.getSuperclass();
|
||||
continue;
|
||||
}
|
||||
// 排除数据源框架的配置
|
||||
if (beanName.contains("dataSource") || beanName.contains("druid")) {
|
||||
clazz = clazz.getSuperclass();
|
||||
continue;
|
||||
}
|
||||
// 排除ORM框架的配置
|
||||
if (beanName.contains("mybatis")) {
|
||||
clazz = clazz.getSuperclass();
|
||||
continue;
|
||||
}
|
||||
// 排除ip2region的配置
|
||||
if (beanName.contains("ip2region")) {
|
||||
clazz = clazz.getSuperclass();
|
||||
continue;
|
||||
}
|
||||
logger.info("扫描到自定义的@ConfigurationProperties注解类: {}", beanName);
|
||||
this.processConfigBean(clazz, bean);
|
||||
}
|
||||
clazz = clazz.getSuperclass();
|
||||
}
|
||||
return bean;
|
||||
}
|
||||
|
||||
private void processConfigBean(Class<?> clazz, Object bean) {
|
||||
ConfigurationProperties annotation = clazz.getAnnotation(ConfigurationProperties.class);
|
||||
// 比如: kenaito.config-center
|
||||
prefixBeanMap.put(annotation.prefix(), bean);
|
||||
}
|
||||
}
|
|
@ -1,8 +1,5 @@
|
|||
package cn.odboy.config.context;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.BeansException;
|
||||
|
@ -10,6 +7,10 @@ import org.springframework.beans.factory.annotation.Value;
|
|||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 加载并处理@Value对应的引用
|
||||
*
|
||||
|
@ -18,46 +19,46 @@ import org.springframework.stereotype.Component;
|
|||
*/
|
||||
@Component
|
||||
public class ValueAnnotationProcessor implements BeanPostProcessor {
|
||||
private final Logger logger = LoggerFactory.getLogger(ValueAnnotationProcessor.class);
|
||||
private final Map<String, Field> valueFieldMap = new HashMap<>();
|
||||
private final Map<String, Object> valueBeanMap = new HashMap<>();
|
||||
private final Logger logger = LoggerFactory.getLogger(ValueAnnotationProcessor.class);
|
||||
private final Map<String, Field> valueFieldMap = new HashMap<>();
|
||||
private final Map<String, Object> valueBeanMap = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
Class<?> clazz = bean.getClass();
|
||||
while (clazz != null) {
|
||||
// 处理@Value
|
||||
for (Field field : clazz.getDeclaredFields()) {
|
||||
if (field.isAnnotationPresent(Value.class)) {
|
||||
field.setAccessible(true);
|
||||
// 对@Value的处理
|
||||
Value annotation = field.getAnnotation(Value.class);
|
||||
String propertyName = annotation.value();
|
||||
propertyName =
|
||||
propertyName.substring(propertyName.indexOf("${") + 2, propertyName.lastIndexOf("}"));
|
||||
// 处理带有默认值的配置
|
||||
if (propertyName.contains(":")) {
|
||||
int firstSpIndex = propertyName.indexOf(":");
|
||||
propertyName = propertyName.substring(0, firstSpIndex);
|
||||
}
|
||||
valueFieldMap.put(propertyName, field);
|
||||
valueBeanMap.put(propertyName, bean);
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
Class<?> clazz = bean.getClass();
|
||||
while (clazz != null) {
|
||||
// 处理@Value
|
||||
for (Field field : clazz.getDeclaredFields()) {
|
||||
if (field.isAnnotationPresent(Value.class)) {
|
||||
field.setAccessible(true);
|
||||
// 对@Value的处理
|
||||
Value annotation = field.getAnnotation(Value.class);
|
||||
String propertyName = annotation.value();
|
||||
propertyName =
|
||||
propertyName.substring(propertyName.indexOf("${") + 2, propertyName.lastIndexOf("}"));
|
||||
// 处理带有默认值的配置
|
||||
if (propertyName.contains(":")) {
|
||||
int firstSpIndex = propertyName.indexOf(":");
|
||||
propertyName = propertyName.substring(0, firstSpIndex);
|
||||
}
|
||||
valueFieldMap.put(propertyName, field);
|
||||
valueBeanMap.put(propertyName, bean);
|
||||
}
|
||||
}
|
||||
clazz = clazz.getSuperclass();
|
||||
}
|
||||
}
|
||||
clazz = clazz.getSuperclass();
|
||||
return bean;
|
||||
}
|
||||
return bean;
|
||||
}
|
||||
|
||||
public void setValue(String propertyName, Object value) {
|
||||
try {
|
||||
Field field = valueFieldMap.getOrDefault(propertyName, null);
|
||||
if (field != null) {
|
||||
field.setAccessible(true);
|
||||
field.set(valueBeanMap.get(propertyName), value);
|
||||
}
|
||||
} catch (IllegalAccessException e) {
|
||||
logger.error("设置 {} 字段值 {} 失败", propertyName, value, e);
|
||||
public void setValue(String propertyName, Object value) {
|
||||
try {
|
||||
Field field = valueFieldMap.getOrDefault(propertyName, null);
|
||||
if (field != null) {
|
||||
field.setAccessible(true);
|
||||
field.set(valueBeanMap.get(propertyName), value);
|
||||
}
|
||||
} catch (IllegalAccessException e) {
|
||||
logger.error("设置 {} 字段值 {} 失败", propertyName, value, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,11 +6,12 @@ import io.netty.channel.*;
|
|||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import lombok.Data;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 配置中心客户端
|
||||
*
|
||||
|
@ -19,134 +20,161 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
@Data
|
||||
public class ConfigClient {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ConfigClient.class);
|
||||
private static final Logger logger = LoggerFactory.getLogger(ConfigClient.class);
|
||||
|
||||
/** 客户端 */
|
||||
private EventLoopGroup eventLoopGroup;
|
||||
/**
|
||||
* 客户端
|
||||
*/
|
||||
private EventLoopGroup eventLoopGroup;
|
||||
|
||||
/** 存放客户端bootstrap对象 */
|
||||
private Bootstrap bootstrap;
|
||||
/**
|
||||
* 存放客户端bootstrap对象
|
||||
*/
|
||||
private Bootstrap bootstrap;
|
||||
|
||||
/** 存放客户端channel对象 */
|
||||
private Channel channel;
|
||||
/**
|
||||
* 存放客户端channel对象
|
||||
*/
|
||||
private Channel channel;
|
||||
|
||||
/** 重连间隔,单位秒 */
|
||||
private Integer delaySeconds = 5;
|
||||
/**
|
||||
* 重连间隔,单位秒
|
||||
*/
|
||||
private Integer delaySeconds = 5;
|
||||
|
||||
/** 连接属性:服务ip */
|
||||
private String serverIp;
|
||||
/**
|
||||
* 连接属性:服务ip
|
||||
*/
|
||||
private String serverIp;
|
||||
|
||||
/** 连接属性:服务端口 */
|
||||
private Integer serverPort;
|
||||
/**
|
||||
* 连接属性:服务端口
|
||||
*/
|
||||
private Integer serverPort;
|
||||
|
||||
/** 私有静态实例 */
|
||||
private static volatile ConfigClient instance;
|
||||
/**
|
||||
* 私有静态实例
|
||||
*/
|
||||
private static volatile ConfigClient instance;
|
||||
|
||||
/** 最大重试次数 */
|
||||
private static final int MAX_RETRY_COUNT = 2;
|
||||
/**
|
||||
* 最大重试次数
|
||||
*/
|
||||
private static final int MAX_RETRY_COUNT = 2;
|
||||
|
||||
/** 当前重试次数 */
|
||||
private static int retryCount = 0;
|
||||
/**
|
||||
* 当前重试次数
|
||||
*/
|
||||
private static int retryCount = 0;
|
||||
|
||||
/** 私有化构造函数 */
|
||||
private ConfigClient() {}
|
||||
/**
|
||||
* 私有化构造函数
|
||||
*/
|
||||
private ConfigClient() {
|
||||
}
|
||||
|
||||
/** 获取 */
|
||||
public static ConfigClient getInstance() {
|
||||
if (instance == null) {
|
||||
synchronized (ConfigClient.class) {
|
||||
/**
|
||||
* 获取
|
||||
*/
|
||||
public static ConfigClient getInstance() {
|
||||
if (instance == null) {
|
||||
instance = new ConfigClient();
|
||||
}
|
||||
}
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
public void start(String server, Integer port) throws InterruptedException {
|
||||
this.serverIp = server;
|
||||
this.serverPort = port;
|
||||
try {
|
||||
this.eventLoopGroup = new NioEventLoopGroup();
|
||||
this.bootstrap = new Bootstrap();
|
||||
this.bootstrap
|
||||
.group(this.eventLoopGroup)
|
||||
.option(ChannelOption.SO_KEEPALIVE, true)
|
||||
// 设置接收缓冲区大小为10MB
|
||||
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024 * 1024 * 10))
|
||||
.channel(NioSocketChannel.class)
|
||||
.handler(
|
||||
new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) throws Exception {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
pipeline.addLast(new ConfigClientHandler());
|
||||
synchronized (ConfigClient.class) {
|
||||
if (instance == null) {
|
||||
instance = new ConfigClient();
|
||||
}
|
||||
});
|
||||
this.doConnect(server, port);
|
||||
} finally {
|
||||
// eventLoopGroup.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
private void doConnect(String server, Integer port) throws InterruptedException {
|
||||
logger.info("开始连接配置中心服务...");
|
||||
if (channel != null && channel.isActive()) {
|
||||
return;
|
||||
}
|
||||
bootstrap.connect(server, port).addListener(new ConfigClientChannelListener()).sync().channel();
|
||||
/// channel.closeFuture().sync();
|
||||
}
|
||||
|
||||
private class ConfigClientChannelListener implements ChannelFutureListener {
|
||||
/** 该方法会在channelActive之前执行,去判断客户端连接是否成功,并做失败重连的操作 */
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
// 连接成功后保存Channel
|
||||
if (channelFuture.isSuccess()) {
|
||||
channel = channelFuture.channel();
|
||||
// 重置重试次数
|
||||
retryCount = 0;
|
||||
logger.info("连接成功 {}:{}", serverIp, serverPort);
|
||||
} else {
|
||||
// 失败后delaySecond秒(默认是5秒)重连,周期性delaySecond秒的重连
|
||||
retryCount++;
|
||||
logger.info("当前重试次数: {}", retryCount);
|
||||
if (retryCount >= MAX_RETRY_COUNT) {
|
||||
ClientConfigLoader.isConfigLoaded = true;
|
||||
ClientConfigLoader.isServerOffline = true;
|
||||
synchronized (ClientConfigLoader.clientInfo) {
|
||||
// 通知等待的线程
|
||||
ClientConfigLoader.clientInfo.notifyAll();
|
||||
}
|
||||
} else {
|
||||
// 进行重连
|
||||
channelFuture
|
||||
.channel()
|
||||
.eventLoop()
|
||||
.schedule(ConfigClient.this::reConnect, delaySeconds, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
}
|
||||
|
||||
/** 重新连接 */
|
||||
protected void reConnect() {
|
||||
try {
|
||||
logger.info("重连配置中心服务 {}:{}", this.serverIp, this.serverPort);
|
||||
ClientConfigLoader.isConfigLoaded = false;
|
||||
ClientConfigLoader.isServerOffline = false;
|
||||
if (channel != null && channel.isOpen()) {
|
||||
logger.info("Channel已激活, 关闭且重启中...");
|
||||
channel.close();
|
||||
}
|
||||
bootstrap
|
||||
.connect(this.serverIp, this.serverPort)
|
||||
.addListener(new ConfigClientChannelListener())
|
||||
.sync()
|
||||
.channel();
|
||||
} catch (Exception e) {
|
||||
logger.error("重连配置中心服务失败 {}:{}", this.serverIp, this.serverPort, e);
|
||||
public void start(String server, Integer port) throws InterruptedException {
|
||||
this.serverIp = server;
|
||||
this.serverPort = port;
|
||||
try {
|
||||
this.eventLoopGroup = new NioEventLoopGroup();
|
||||
this.bootstrap = new Bootstrap();
|
||||
this.bootstrap
|
||||
.group(this.eventLoopGroup)
|
||||
.option(ChannelOption.SO_KEEPALIVE, true)
|
||||
// 设置接收缓冲区大小为10MB
|
||||
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024 * 1024 * 10))
|
||||
.channel(NioSocketChannel.class)
|
||||
.handler(
|
||||
new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) throws Exception {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
pipeline.addLast(new ConfigClientHandler());
|
||||
}
|
||||
});
|
||||
this.doConnect(server, port);
|
||||
} finally {
|
||||
// eventLoopGroup.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
private void doConnect(String server, Integer port) throws InterruptedException {
|
||||
logger.info("开始连接配置中心服务...");
|
||||
if (channel != null && channel.isActive()) {
|
||||
return;
|
||||
}
|
||||
bootstrap.connect(server, port).addListener(new ConfigClientChannelListener()).sync().channel();
|
||||
/// channel.closeFuture().sync();
|
||||
}
|
||||
|
||||
private class ConfigClientChannelListener implements ChannelFutureListener {
|
||||
/**
|
||||
* 该方法会在channelActive之前执行,去判断客户端连接是否成功,并做失败重连的操作
|
||||
*/
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
// 连接成功后保存Channel
|
||||
if (channelFuture.isSuccess()) {
|
||||
channel = channelFuture.channel();
|
||||
// 重置重试次数
|
||||
retryCount = 0;
|
||||
logger.info("连接成功 {}:{}", serverIp, serverPort);
|
||||
} else {
|
||||
// 失败后delaySecond秒(默认是5秒)重连,周期性delaySecond秒的重连
|
||||
retryCount++;
|
||||
logger.info("当前重试次数: {}", retryCount);
|
||||
if (retryCount >= MAX_RETRY_COUNT) {
|
||||
ClientConfigLoader.isConfigLoaded = true;
|
||||
ClientConfigLoader.isServerOffline = true;
|
||||
synchronized (ClientConfigLoader.clientInfo) {
|
||||
// 通知等待的线程
|
||||
ClientConfigLoader.clientInfo.notifyAll();
|
||||
}
|
||||
} else {
|
||||
// 进行重连
|
||||
channelFuture
|
||||
.channel()
|
||||
.eventLoop()
|
||||
.schedule(ConfigClient.this::reConnect, delaySeconds, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 重新连接
|
||||
*/
|
||||
protected void reConnect() {
|
||||
try {
|
||||
logger.info("重连配置中心服务 {}:{}", this.serverIp, this.serverPort);
|
||||
ClientConfigLoader.isConfigLoaded = false;
|
||||
ClientConfigLoader.isServerOffline = false;
|
||||
if (channel != null && channel.isOpen()) {
|
||||
logger.info("Channel已激活, 关闭且重启中...");
|
||||
channel.close();
|
||||
}
|
||||
bootstrap
|
||||
.connect(this.serverIp, this.serverPort)
|
||||
.addListener(new ConfigClientChannelListener())
|
||||
.sync()
|
||||
.channel();
|
||||
} catch (Exception e) {
|
||||
logger.error("重连配置中心服务失败 {}:{}", this.serverIp, this.serverPort, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,14 +9,15 @@ import cn.odboy.config.model.msgtype.ConfigFileInfo;
|
|||
import cn.odboy.config.util.MessageUtil;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.yaml.snakeyaml.Yaml;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.yaml.snakeyaml.Yaml;
|
||||
|
||||
/**
|
||||
* 配置中心客户端 业务处理
|
||||
|
@ -25,99 +26,99 @@ import org.yaml.snakeyaml.Yaml;
|
|||
* @date 2024-12-06
|
||||
*/
|
||||
public class ConfigClientHandler extends ChannelInboundHandlerAdapter {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ConfigClientHandler.class);
|
||||
private static final Logger logger = LoggerFactory.getLogger(ConfigClientHandler.class);
|
||||
|
||||
@Override
|
||||
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
||||
/// logger.info("当Channel已经注册到它的EventLoop并且能够处理I/O时被调用");
|
||||
super.channelRegistered(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
|
||||
/// logger.info("当Channel从它的EventLoop注销并且无法处理任何I/O时被调用");
|
||||
super.channelUnregistered(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
/// logger.info("当Channel处于活动状态(已经连接到它的远程节点)时被调用, 注册客户端");
|
||||
SmallMessage smallMessage = new SmallMessage();
|
||||
smallMessage.setType(TransferMessageType.REGISTER);
|
||||
smallMessage.setResp(SmallMessage.Response.ok(ClientConfigLoader.clientInfo));
|
||||
ctx.writeAndFlush(MessageUtil.toByteBuf(smallMessage));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
/// logger.info("当Channel离开活动状态并且不再连接到它的远程节点时被调用");
|
||||
/// this.configClient.reConnect();
|
||||
super.channelInactive(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
/// logger.info("当从Channel读取数据时被调用, 收到服务器消息");
|
||||
logger.info("从服务端读取到Object");
|
||||
SmallMessage smallMessage = MessageUtil.getMessage(msg);
|
||||
SmallMessage.Response resp = smallMessage.getResp();
|
||||
switch (smallMessage.getType()) {
|
||||
case REGISTER:
|
||||
if (!resp.getSuccess()) {
|
||||
logger.info("注册失败, {}", resp.getErrorMessage());
|
||||
} else {
|
||||
logger.info("注册成功, 给服务端发信号, 表明准备好可以拉取配置了");
|
||||
// 准备拉取配置,给服务端发信号
|
||||
SmallMessage pullConfigMessage = new SmallMessage();
|
||||
pullConfigMessage.setType(TransferMessageType.PULL_CONFIG);
|
||||
pullConfigMessage.setResp(SmallMessage.Response.ok(null, "准备好拉取配置了"));
|
||||
ctx.writeAndFlush(MessageUtil.toByteBuf(pullConfigMessage));
|
||||
}
|
||||
break;
|
||||
case PUSH_CONFIG:
|
||||
if (!resp.getSuccess()) {
|
||||
throw new RuntimeException(resp.getErrorMessage());
|
||||
}
|
||||
List<ConfigFileInfo> configFileInfos = MessageUtil.toConfigFileInfoList(resp.getData());
|
||||
logger.info("收到来自服务端推送的配置信息");
|
||||
Map<String, String> originConfigs = new HashMap<>(1);
|
||||
Map<String, Map<String, Object>> lastConfigs = new HashMap<>(1);
|
||||
try {
|
||||
for (ConfigFileInfo configFileInfo : configFileInfos) {
|
||||
String fileName = configFileInfo.getFileName();
|
||||
// 配置原生内容
|
||||
originConfigs.put(fileName, configFileInfo.getFileContent());
|
||||
// 转换为应用能识别的配置项
|
||||
String suffix = FileUtil.getSuffix(fileName);
|
||||
boolean isYml = "yml".equals(suffix) || "yaml".equals(suffix);
|
||||
if (isYml) {
|
||||
Yaml yaml = new Yaml();
|
||||
lastConfigs.put(fileName, yaml.load(configFileInfo.getFileContent()));
|
||||
} else {
|
||||
Properties properties = new Properties();
|
||||
properties.load(StrUtil.getReader(configFileInfo.getFileContent()));
|
||||
Map<String, Object> tempMap = new HashMap<>(1);
|
||||
for (Map.Entry<Object, Object> kv : properties.entrySet()) {
|
||||
String key = (String) kv.getKey();
|
||||
tempMap.put(key, kv.getValue());
|
||||
}
|
||||
lastConfigs.put(fileName, tempMap);
|
||||
}
|
||||
}
|
||||
logger.info("配置文件转map成功");
|
||||
ClientConfigLoader.originConfigs = originConfigs;
|
||||
ClientConfigLoader.lastConfigs = lastConfigs;
|
||||
ClientConfigLoader.isConfigLoaded = true;
|
||||
synchronized (ClientConfigLoader.clientInfo) {
|
||||
// 通知所有等待的线程
|
||||
ClientConfigLoader.clientInfo.notifyAll();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.info("配置文件转map失败", e);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
@Override
|
||||
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
||||
/// logger.info("当Channel已经注册到它的EventLoop并且能够处理I/O时被调用");
|
||||
super.channelRegistered(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
|
||||
/// logger.info("当Channel从它的EventLoop注销并且无法处理任何I/O时被调用");
|
||||
super.channelUnregistered(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
/// logger.info("当Channel处于活动状态(已经连接到它的远程节点)时被调用, 注册客户端");
|
||||
SmallMessage smallMessage = new SmallMessage();
|
||||
smallMessage.setType(TransferMessageType.REGISTER);
|
||||
smallMessage.setResp(SmallMessage.Response.ok(ClientConfigLoader.clientInfo));
|
||||
ctx.writeAndFlush(MessageUtil.toByteBuf(smallMessage));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
/// logger.info("当Channel离开活动状态并且不再连接到它的远程节点时被调用");
|
||||
/// this.configClient.reConnect();
|
||||
super.channelInactive(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
/// logger.info("当从Channel读取数据时被调用, 收到服务器消息");
|
||||
logger.info("从服务端读取到Object");
|
||||
SmallMessage smallMessage = MessageUtil.getMessage(msg);
|
||||
SmallMessage.Response resp = smallMessage.getResp();
|
||||
switch (smallMessage.getType()) {
|
||||
case REGISTER:
|
||||
if (!resp.getSuccess()) {
|
||||
logger.info("注册失败, {}", resp.getErrorMessage());
|
||||
} else {
|
||||
logger.info("注册成功, 给服务端发信号, 表明准备好可以拉取配置了");
|
||||
// 准备拉取配置,给服务端发信号
|
||||
SmallMessage pullConfigMessage = new SmallMessage();
|
||||
pullConfigMessage.setType(TransferMessageType.PULL_CONFIG);
|
||||
pullConfigMessage.setResp(SmallMessage.Response.ok(null, "准备好拉取配置了"));
|
||||
ctx.writeAndFlush(MessageUtil.toByteBuf(pullConfigMessage));
|
||||
}
|
||||
break;
|
||||
case PUSH_CONFIG:
|
||||
if (!resp.getSuccess()) {
|
||||
throw new RuntimeException(resp.getErrorMessage());
|
||||
}
|
||||
List<ConfigFileInfo> configFileInfos = MessageUtil.toConfigFileInfoList(resp.getData());
|
||||
logger.info("收到来自服务端推送的配置信息");
|
||||
Map<String, String> originConfigs = new HashMap<>(1);
|
||||
Map<String, Map<String, Object>> lastConfigs = new HashMap<>(1);
|
||||
try {
|
||||
for (ConfigFileInfo configFileInfo : configFileInfos) {
|
||||
String fileName = configFileInfo.getFileName();
|
||||
// 配置原生内容
|
||||
originConfigs.put(fileName, configFileInfo.getFileContent());
|
||||
// 转换为应用能识别的配置项
|
||||
String suffix = FileUtil.getSuffix(fileName);
|
||||
boolean isYml = "yml".equals(suffix) || "yaml".equals(suffix);
|
||||
if (isYml) {
|
||||
Yaml yaml = new Yaml();
|
||||
lastConfigs.put(fileName, yaml.load(configFileInfo.getFileContent()));
|
||||
} else {
|
||||
Properties properties = new Properties();
|
||||
properties.load(StrUtil.getReader(configFileInfo.getFileContent()));
|
||||
Map<String, Object> tempMap = new HashMap<>(1);
|
||||
for (Map.Entry<Object, Object> kv : properties.entrySet()) {
|
||||
String key = (String) kv.getKey();
|
||||
tempMap.put(key, kv.getValue());
|
||||
}
|
||||
lastConfigs.put(fileName, tempMap);
|
||||
}
|
||||
}
|
||||
logger.info("配置文件转map成功");
|
||||
ClientConfigLoader.originConfigs = originConfigs;
|
||||
ClientConfigLoader.lastConfigs = lastConfigs;
|
||||
ClientConfigLoader.isConfigLoaded = true;
|
||||
synchronized (ClientConfigLoader.clientInfo) {
|
||||
// 通知所有等待的线程
|
||||
ClientConfigLoader.clientInfo.notifyAll();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.info("配置文件转map失败", e);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue