feat: 动态加载配置,测试成功

This commit is contained in:
骑着蜗牛追导弹 2024-12-05 21:51:13 +08:00
parent 2f764fbf0b
commit c8b70718d2
27 changed files with 568 additions and 124 deletions

View File

@ -1,7 +0,0 @@
package cn.odboy.config.constant;
public interface ConfigClientMsgType {
int REGISTER = 1;
int PULL_CONFIG = 2;
int UPDATE_CONFIG = 3;
}

View File

@ -0,0 +1,27 @@
package cn.odboy.config.constant;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 传输的消息类型
*
* @author odboy
* @date 2024-12-05
*/
@Getter
@AllArgsConstructor
public enum TransferMessageType {
/**
* 客户端注册
*/
REGISTER,
/**
* 客户端主动拉取配置文件
*/
PULL_CONFIG,
/**
* 服务端主动推送配置文件
*/
PUSH_CONFIG;
}

View File

@ -1,5 +1,6 @@
package cn.odboy.config.model; package cn.odboy.config.model;
import cn.odboy.config.constant.TransferMessageType;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@ -11,9 +12,9 @@ import java.io.Serializable;
@NoArgsConstructor @NoArgsConstructor
public class SmallMessage implements Serializable { public class SmallMessage implements Serializable {
/** /**
* 消息类型cn.odboy.config.constant.ConfigClientMsgType * 消息类型cn.odboy.config.constant.TransferMessageType
*/ */
private int type; private TransferMessageType type;
private Response resp; private Response resp;
@Data @Data

View File

@ -5,7 +5,7 @@ import lombok.Data;
import java.io.Serializable; import java.io.Serializable;
@Data @Data
public class ClientProp implements Serializable { public class ClientInfo implements Serializable {
private String server; private String server;
private Integer port; private Integer port;
private String env; private String env;

View File

@ -0,0 +1,11 @@
package cn.odboy.config.model.msgtype;
import lombok.Data;
import java.io.Serializable;
@Data
public class ConfigFileInfo implements Serializable {
private String fileName;
private String fileContent;
}

View File

@ -1,15 +0,0 @@
package cn.odboy.config.model.msgtype;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ConfigKv implements Serializable {
private String key;
private Object value;
}

View File

@ -0,0 +1,18 @@
package cn.odboy.config.util;
import cn.odboy.config.model.SmallMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class MessageUtil {
public static ByteBuf toByteBuf(Object data){
return Unpooled.copiedBuffer(ProtostuffUtil.serializer(data));
}
public static SmallMessage getMessage(Object msg) {
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
return ProtostuffUtil.deserializer(bytes, SmallMessage.class);
}
}

View File

@ -1,10 +1,11 @@
package cn.odboy.config.util; package cn.odboy.config.util;
import cn.odboy.config.model.msgtype.ConfigKv; import cn.odboy.config.model.msgtype.ConfigFileInfo;
import com.dyuproject.protostuff.LinkedBuffer; import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil; import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema; import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema; import com.dyuproject.protostuff.runtime.RuntimeSchema;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -66,8 +67,8 @@ public class ProtostuffUtil {
} }
public static void main(String[] args) { public static void main(String[] args) {
byte[] userBytes = ProtostuffUtil.serializer(new ConfigKv("app.config", "zhuge")); byte[] userBytes = ProtostuffUtil.serializer(new ConfigFileInfo());
ConfigKv user = ProtostuffUtil.deserializer(userBytes, ConfigKv.class); ConfigFileInfo user = ProtostuffUtil.deserializer(userBytes, ConfigFileInfo.class);
System.out.println(user); System.out.println(user);
} }
} }

View File

@ -1,7 +1,7 @@
package cn.odboy.config.context; package cn.odboy.config.context;
import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.FileUtil;
import cn.odboy.config.model.msgtype.ClientProp; import cn.odboy.config.model.msgtype.ClientInfo;
import cn.odboy.config.netty.ConfigClient; import cn.odboy.config.netty.ConfigClient;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
@ -10,11 +10,14 @@ import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import java.io.File; import java.io.File;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
/** /**
* 配置加载器 * 配置加载器
@ -50,7 +53,15 @@ public class ClientConfigLoader {
/** /**
* 当前客户端配置 * 当前客户端配置
*/ */
public static ClientProp clientProp = new ClientProp(); public static final ClientInfo clientInfo = new ClientInfo();
/**
* 配置是否加载完毕
*/
public static boolean isConfigLoaded = false;
/**
* 所有的配置信息
*/
public static Map<String, Object> lastConfigs = new HashMap<>();
@Bean @Bean
public BeanFactoryPostProcessor configLoader(ConfigurableEnvironment environment) { public BeanFactoryPostProcessor configLoader(ConfigurableEnvironment environment) {
@ -58,25 +69,35 @@ public class ClientConfigLoader {
@Override @Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
String defaultCacheDir = getDefaultCacheDir(); String defaultCacheDir = getDefaultCacheDir();
clientProp.setServer(environment.getProperty(DEFAULT_CONFIG_NAME_SERVER, String.class, DEFAULT_CONFIG_SERVER)); clientInfo.setServer(environment.getProperty(DEFAULT_CONFIG_NAME_SERVER, String.class, DEFAULT_CONFIG_SERVER));
clientProp.setPort(environment.getProperty(DEFAULT_CONFIG_NAME_PORT, Integer.class, DEFAULT_CONFIG_PORT)); clientInfo.setPort(environment.getProperty(DEFAULT_CONFIG_NAME_PORT, Integer.class, DEFAULT_CONFIG_PORT));
clientProp.setEnv(environment.getProperty(DEFAULT_CONFIG_NAME_ENV, String.class, DEFAULT_CONFIG_ENV)); clientInfo.setEnv(environment.getProperty(DEFAULT_CONFIG_NAME_ENV, String.class, DEFAULT_CONFIG_ENV));
clientProp.setDataId(environment.getProperty(DEFAULT_CONFIG_NAME_DATA_ID, String.class, DEFAULT_CONFIG_DATA_ID)); clientInfo.setDataId(environment.getProperty(DEFAULT_CONFIG_NAME_DATA_ID, String.class, DEFAULT_CONFIG_DATA_ID));
clientProp.setCacheDir(environment.getProperty(DEFAULT_CONFIG_NAME_CACHE_DIR, String.class, defaultCacheDir)); clientInfo.setCacheDir(environment.getProperty(DEFAULT_CONFIG_NAME_CACHE_DIR, String.class, defaultCacheDir));
log.info("客户端属性: {}", clientProp.toString()); log.info("客户端属性: {}", clientInfo);
validateCacheDirPath(defaultCacheDir, clientProp.getCacheDir()); validateCacheDirPath(defaultCacheDir, clientInfo.getCacheDir());
createCacheDir(clientProp.getCacheDir()); createCacheDir(clientInfo.getCacheDir());
try { try {
ConfigClient client = new ConfigClient(); ConfigClient client = new ConfigClient();
client.start(clientProp.getServer(), clientProp.getPort()); client.start(clientInfo.getServer(), clientInfo.getPort());
// 这里加个同步锁等客户端准备就绪后拉取配置完成时
synchronized (clientInfo) {
while (!isConfigLoaded) {
try {
// 等待配置加载完成
clientInfo.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Thread interrupted", e);
}
}
MapPropertySource propertySource = new MapPropertySource("loadFormServer", lastConfigs);
environment.getPropertySources().addFirst(propertySource);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("Netty Client Start Error", e); log.error("Netty Client Start Error", e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// ConfigClient client = new ConfigClient("http://your-config-center-url/config");
// Map<String, Object> configKv = client.fetchConfig();
// MapPropertySource propertySource = new MapPropertySource("configCenter", configKv);
// environment.getPropertySources().addFirst(propertySource);
} }
}; };
} }

View File

@ -43,6 +43,8 @@ public class ConfigClient {
bootstrap = new Bootstrap(); bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup) bootstrap.group(eventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_KEEPALIVE, true)
// 设置接收缓冲区大小为10MB
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024 * 1024 * 10))
.channel(NioSocketChannel.class) .channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() { .handler(new ChannelInitializer<SocketChannel>() {
@Override @Override

View File

@ -1,13 +1,22 @@
package cn.odboy.config.netty; package cn.odboy.config.netty;
import cn.odboy.config.constant.ConfigClientMsgType; import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.StrUtil;
import cn.odboy.config.constant.TransferMessageType;
import cn.odboy.config.context.ClientConfigLoader; import cn.odboy.config.context.ClientConfigLoader;
import cn.odboy.config.model.SmallMessage; import cn.odboy.config.model.SmallMessage;
import cn.odboy.config.util.ProtostuffUtil; import cn.odboy.config.model.msgtype.ConfigFileInfo;
import io.netty.buffer.ByteBuf; import cn.odboy.config.util.MessageUtil;
import io.netty.buffer.Unpooled; import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import org.yaml.snakeyaml.Yaml;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class ConfigClientHandler extends ChannelInboundHandlerAdapter { public class ConfigClientHandler extends ChannelInboundHandlerAdapter {
private final ConfigClient configClient; private final ConfigClient configClient;
@ -30,10 +39,9 @@ public class ConfigClientHandler extends ChannelInboundHandlerAdapter {
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("ConfigClientHandler -> 当Channel处于活动状态已经连接到它的远程节点时被调用, 注册客户端"); System.err.println("ConfigClientHandler -> 当Channel处于活动状态已经连接到它的远程节点时被调用, 注册客户端");
SmallMessage smallMessage = new SmallMessage(); SmallMessage smallMessage = new SmallMessage();
smallMessage.setType(ConfigClientMsgType.REGISTER); smallMessage.setType(TransferMessageType.REGISTER);
smallMessage.setResp(SmallMessage.Response.ok(ClientConfigLoader.clientProp)); smallMessage.setResp(SmallMessage.Response.ok(ClientConfigLoader.clientInfo));
ByteBuf buf = Unpooled.copiedBuffer(ProtostuffUtil.serializer(smallMessage)); ctx.writeAndFlush(MessageUtil.toByteBuf(smallMessage));
ctx.writeAndFlush(buf);
} }
@Override @Override
@ -44,14 +52,60 @@ public class ConfigClientHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.err.println("ConfigClientHandler -> 当从Channel读取数据时被调用, 收到服务器消息"); // System.err.println("ConfigClientHandler -> 当从Channel读取数据时被调用, 收到服务器消息");
ByteBuf buf = (ByteBuf) msg; System.err.println("ConfigClientHandler -> 从服务端读取到Object");
byte[] bytes = new byte[buf.readableBytes()]; SmallMessage smallMessage = MessageUtil.getMessage(msg);
buf.readBytes(bytes); SmallMessage.Response resp = smallMessage.getResp();
System.err.println("ConfigClientHandler -> 从服务端读取到Object" + ProtostuffUtil.deserializer(bytes, SmallMessage.class)); switch (smallMessage.getType()) {
SmallMessage smallMessage = ProtostuffUtil.deserializer(bytes, SmallMessage.class); case REGISTER:
if (smallMessage.getType() == ConfigClientMsgType.REGISTER) { if (!resp.getSuccess()) {
System.err.println("ConfigClientHandler -> 注册失败, " + resp.getErrorMsg());
} else {
System.err.println("ConfigClientHandler -> 注册成功, 给服务端发信号, 表明准备好可以拉取配置了");
// 准备拉取配置给服务端发信号
SmallMessage pullConfigMessage = new SmallMessage();
pullConfigMessage.setType(TransferMessageType.PULL_CONFIG);
pullConfigMessage.setResp(SmallMessage.Response.ok(null, "准备好拉取配置了"));
ctx.writeAndFlush(MessageUtil.toByteBuf(pullConfigMessage));
}
break;
case PUSH_CONFIG:
if (!resp.getSuccess()) {
throw new RuntimeException(resp.getErrorMsg());
}
List<ConfigFileInfo> configFileInfos = (List<ConfigFileInfo>) resp.getData();
System.err.println("ConfigClientHandler -> 收到来自服务端推送的配置信息");
Map<String, Object> configKv = new HashMap<>(1);
try {
for (ConfigFileInfo configFileInfo : configFileInfos) {
String fileName = configFileInfo.getFileName();
String suffix = FileUtil.getSuffix(fileName);
boolean isYml = "yml".equals(suffix) || "yaml".equals(suffix);
if (isYml) {
Yaml yaml = new Yaml();
configKv.putAll(yaml.load(configFileInfo.getFileContent()));
} else {
Properties properties = new Properties();
properties.load(StrUtil.getReader(configFileInfo.getFileContent()));
for (Map.Entry<Object, Object> kv : properties.entrySet()) {
String key = (String) kv.getKey();
configKv.put(key, kv.getValue());
}
}
}
System.err.println("ConfigClientHandler -> 配置文件转map成功");
ClientConfigLoader.lastConfigs = configKv;
ClientConfigLoader.isConfigLoaded = true;
synchronized (ClientConfigLoader.clientInfo) {
// 通知所有等待的线程
ClientConfigLoader.clientInfo.notifyAll();
}
} catch (IOException e) {
System.err.println("ConfigClientHandler -> 配置文件转map失败");
}
break;
default:
break;
} }
} }
} }

View File

@ -13,34 +13,16 @@ import java.util.List;
public class GenCode { public class GenCode {
public static void main(String[] args) { public static void main(String[] args) {
GenCmdHelper generator = new GenCmdHelper(); GenCmdHelper generator = new GenCmdHelper();
generator.setDatabaseUrl(String.format("jdbc:mysql://%s:%s/%s", "kenaito-mysql.odboy.local", 13306, "kenaito_devops")); generator.setDatabaseUrl(String.format("jdbc:mysql://%s:%s/%s", "kenaito-mysql.odboy.local", 13306, "kenaito_config"));
generator.setDatabaseUsername("root"); generator.setDatabaseUsername("root");
generator.setDatabasePassword("root"); generator.setDatabasePassword("root");
genCareer(generator); genCareer(generator);
} }
private static void genCareer(GenCmdHelper generator) { private static void genCareer(GenCmdHelper generator) {
generator.gen("devops_", List.of( generator.gen("", List.of(
// 应用 "config_file",
// "devops_app", "config_version"
// "devops_app_user",
// "devops_product_line",
// 应用迭代
// "devops_app_iteration",
// "devops_app_iteration_change",
// 容器
// "devops_containerd_cluster_config",
// "devops_containerd_cluster_node",
// "devops_ops_config",
// "devops_containerd_spec_config",
// 网络
// "devops_network_service",
// "devops_network_ingress",
// 流水线
"devops_pipeline_template_type",
"devops_pipeline_template_language",
"devops_pipeline_template_language_config",
"devops_pipeline_template_app"
)); ));
} }
} }

View File

@ -0,0 +1,36 @@
package cn.odboy.domain;
import cn.odboy.base.MyObject;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Getter;
import lombok.Setter;
/**
* <p>
* 配置应用
* </p>
*
* @author odboy
* @since 2024-12-05
*/
@Getter
@Setter
@TableName("config_app")
public class ConfigApp extends MyObject {
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 应用名称
*/
@TableField("app_name")
private String appName;
/**
* 应用说明
*/
@TableField("description")
private String description;
}

View File

@ -0,0 +1,45 @@
package cn.odboy.domain;
import cn.odboy.base.MyObject;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
/**
* <p>
* 配置文件
* </p>
*
* @author odboy
* @since 2024-12-05
*/
@Getter
@Setter
@TableName("config_file")
public class ConfigFile extends MyObject {
@TableId(value = "id", type = IdType.AUTO)
private Long id;
@TableField("app_name")
private String appName;
@TableField("env")
private String env;
/**
* 例如: application-daily.properties
*/
@TableField("file_name")
private String fileName;
/**
* 当前配置内容版本
*/
@TableField("version")
private Long version;
}

View File

@ -0,0 +1,28 @@
package cn.odboy.domain;
import cn.odboy.base.MyObject;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Getter;
import lombok.Setter;
/**
* <p>
* 配置内容版本
* </p>
*
* @author odboy
* @since 2024-12-05
*/
@Getter
@Setter
@TableName("config_version")
public class ConfigVersion extends MyObject {
@TableField("file_id")
private Long fileId;
@TableField("file_content")
private byte[] fileContent;
}

View File

@ -1,5 +1,6 @@
package cn.odboy.infra.netty; package cn.odboy.infra.netty;
import cn.odboy.infra.exception.BadRequestException;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId; import io.netty.channel.ChannelId;
@ -14,32 +15,44 @@ import java.util.concurrent.ConcurrentMap;
*/ */
public class ConfigClientManage { public class ConfigClientManage {
/** /**
* 所有的客户端连接: {env}-{dataId} to ctx * 所有的客户端连接: {env}_{dataId} to ctx
*/ */
private static final ConcurrentMap<String, ChannelHandlerContext> clientMap = new ConcurrentHashMap<>(); private static final ConcurrentMap<String, ChannelHandlerContext> CLIENT = new ConcurrentHashMap<>();
/** /**
* 所有的客户端Id: channelId to {env}-{dataId} * 所有的客户端Id: channelId to {env}_{dataId}
*/ */
private static final ConcurrentMap<ChannelId, String> channelMap = new ConcurrentHashMap<>(); private static final ConcurrentMap<ChannelId, String> CHANNEL = new ConcurrentHashMap<>();
public static void register(String env, String dataId, ChannelHandlerContext ctx) { public static void register(String env, String dataId, ChannelHandlerContext ctx) {
String envClientKey = String.format("%s_%s", env, dataId); String envClientKey = String.format("%s_%s", env, dataId);
clientMap.put(envClientKey, ctx); CLIENT.put(envClientKey, ctx);
channelMap.put(ctx.channel().id(), envClientKey); CHANNEL.put(ctx.channel().id(), envClientKey);
System.err.println("ConfigClientManage -> 客户端注册成功"); System.err.println("ConfigClientManage -> 客户端注册成功");
System.err.println("ConfigClientManage -> ctx.channel.id=" + ctx.channel().id()); System.err.println("ConfigClientManage -> ctx.channel.id=" + ctx.channel().id());
} }
public static void unregister(ChannelId channelId) { public static void unregister(ChannelId channelId) {
String envClientKey = channelMap.getOrDefault(channelId, null); String envClientKey = CHANNEL.getOrDefault(channelId, null);
if (envClientKey != null) { if (envClientKey != null) {
channelMap.remove(channelId); CHANNEL.remove(channelId);
ChannelHandlerContext ctx = clientMap.getOrDefault(envClientKey, null); ChannelHandlerContext ctx = CLIENT.getOrDefault(envClientKey, null);
if (ctx != null) { if (ctx != null) {
clientMap.remove(envClientKey); CLIENT.remove(envClientKey);
System.err.println("ConfigClientManage -> 客户端注销成功"); System.err.println("ConfigClientManage -> 客户端注销成功");
System.err.println("ConfigClientManage -> ctx.channel.id=" + channelId); System.err.println("ConfigClientManage -> ctx.channel.id=" + channelId);
} }
} }
} }
public static String[] getEnvDataId(ChannelId channelId) {
String envDataId = CHANNEL.getOrDefault(channelId, null);
if (envDataId == null) {
throw new BadRequestException("获取配置数据ID失败");
}
String[] s = envDataId.split("_");
if (s.length != 2) {
throw new BadRequestException("获取配置数据ID失败");
}
return s;
}
} }

View File

@ -1,6 +1,7 @@
package cn.odboy.infra.netty; package cn.odboy.infra.netty;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import cn.odboy.service.ConfigFileService;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
@ -11,6 +12,7 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -19,6 +21,8 @@ import org.springframework.stereotype.Component;
public class ConfigNettyServer implements InitializingBean { public class ConfigNettyServer implements InitializingBean {
@Value("${kenaito.config-center.port}") @Value("${kenaito.config-center.port}")
private Integer configCenterPort; private Integer configCenterPort;
@Autowired
private ConfigFileService configFileService;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
@ -43,7 +47,7 @@ public class ConfigNettyServer implements InitializingBean {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ConfigServerHandler()); pipeline.addLast(new ConfigServerHandler(configFileService));
} }
}); });
log.info("Netty Server Start..."); log.info("Netty Server Start...");

View File

@ -2,42 +2,69 @@ package cn.odboy.infra.netty;
import cn.hutool.core.exceptions.ExceptionUtil; import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.odboy.config.constant.ConfigClientMsgType; import cn.odboy.config.constant.TransferMessageType;
import cn.odboy.config.model.SmallMessage; import cn.odboy.config.model.SmallMessage;
import cn.odboy.config.model.msgtype.ClientProp; import cn.odboy.config.model.msgtype.ClientInfo;
import cn.odboy.config.util.ProtostuffUtil; import cn.odboy.config.model.msgtype.ConfigFileInfo;
import io.netty.buffer.ByteBuf; import cn.odboy.config.util.MessageUtil;
import cn.odboy.service.ConfigFileService;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.List;
public class ConfigServerHandler extends ChannelInboundHandlerAdapter { public class ConfigServerHandler extends ChannelInboundHandlerAdapter {
private final ConfigFileService configFileService;
public ConfigServerHandler(ConfigFileService configFileService) {
this.configFileService = configFileService;
}
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.err.println("ServerHandler -> 当从Channel读取数据时被调用"); // System.err.println("ServerHandler -> 当从Channel读取数据时被调用");
ByteBuf buf = (ByteBuf) msg; SmallMessage smallMessage = MessageUtil.getMessage(msg);
byte[] bytes = new byte[buf.readableBytes()]; System.err.println("ServerHandler -> 从客户端读取到Object" + smallMessage);
buf.readBytes(bytes);
System.err.println("ServerHandler -> 从客户端读取到Object" + ProtostuffUtil.deserializer(bytes, SmallMessage.class));
SmallMessage smallMessage = ProtostuffUtil.deserializer(bytes, SmallMessage.class);
if (ConfigClientMsgType.REGISTER == smallMessage.getType()) {
SmallMessage.Response resp = smallMessage.getResp(); SmallMessage.Response resp = smallMessage.getResp();
switch (smallMessage.getType()) {
case REGISTER:
if (!resp.getSuccess() || resp.getData() == null) { if (!resp.getSuccess() || resp.getData() == null) {
ctx.channel().writeAndFlush(new SmallMessage(ConfigClientMsgType.REGISTER, SmallMessage.Response.bad("解析客户端属性失败"))); ctx.writeAndFlush(MessageUtil.toByteBuf(new SmallMessage(TransferMessageType.REGISTER, SmallMessage.Response.bad("解析客户端属性失败"))));
return; return;
} }
ClientProp clientProp = (ClientProp) resp.getData(); ClientInfo clientInfo = (ClientInfo) resp.getData();
if (StrUtil.isBlank(clientProp.getEnv())) { if (StrUtil.isBlank(clientInfo.getEnv())) {
ctx.channel().writeAndFlush(new SmallMessage(ConfigClientMsgType.REGISTER, SmallMessage.Response.bad("解析客户端属性失败"))); ctx.writeAndFlush(MessageUtil.toByteBuf(new SmallMessage(TransferMessageType.REGISTER, SmallMessage.Response.bad("解析客户端属性失败"))));
return; return;
} }
if (StrUtil.isBlank(clientProp.getDataId())) { if (StrUtil.isBlank(clientInfo.getDataId())) {
ctx.channel().writeAndFlush(new SmallMessage(ConfigClientMsgType.REGISTER, SmallMessage.Response.bad("解析客户端属性失败"))); ctx.writeAndFlush(MessageUtil.toByteBuf(new SmallMessage(TransferMessageType.REGISTER, SmallMessage.Response.bad("解析客户端属性失败"))));
return; return;
} }
ctx.channel().writeAndFlush(new SmallMessage(ConfigClientMsgType.REGISTER, SmallMessage.Response.ok(null))); ConfigClientManage.register(clientInfo.getEnv(), clientInfo.getDataId(), ctx);
ConfigClientManage.register(clientProp.getEnv(), clientProp.getDataId(), ctx); ctx.writeAndFlush(MessageUtil.toByteBuf(new SmallMessage(TransferMessageType.REGISTER, SmallMessage.Response.ok(null))));
} else if (ConfigClientMsgType.PULL_CONFIG == smallMessage.getType()) { break;
case PULL_CONFIG:
if (!resp.getSuccess()) {
System.err.println("ServerHandler -> 客户端说它没有准备好拉取配置");
return;
}
System.err.println("ServerHandler -> 客户端说它准备好拉取配置了");
String[] envDataId = ConfigClientManage.getEnvDataId(ctx.channel().id());
String env = envDataId[0];
String dataId = envDataId[1];
List<ConfigFileInfo> fileList = configFileService.getFileList(env, dataId);
if (fileList.isEmpty()) {
ctx.writeAndFlush(MessageUtil.toByteBuf(new SmallMessage(TransferMessageType.PUSH_CONFIG, SmallMessage.Response.bad(
String.format("应用 %s 没有环境编码为 %s 的配置", dataId, env)
))));
} else {
System.err.println("ServerHandler -> 推送配置到客户端");
ctx.writeAndFlush(MessageUtil.toByteBuf(new SmallMessage(TransferMessageType.PUSH_CONFIG, SmallMessage.Response.ok(fileList))));
}
break;
default:
break;
} }
} }

View File

@ -0,0 +1,19 @@
package cn.odboy.mapper;
import cn.odboy.domain.ConfigApp;
import cn.odboy.domain.ConfigFile;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* 配置应用 Mapper 接口
* </p>
*
* @author odboy
* @since 2024-12-05
*/
@Mapper
public interface ConfigAppMapper extends BaseMapper<ConfigApp> {
}

View File

@ -0,0 +1,23 @@
package cn.odboy.mapper;
import cn.odboy.config.model.msgtype.ConfigFileInfo;
import cn.odboy.domain.ConfigFile;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* <p>
* 配置文件 Mapper 接口
* </p>
*
* @author odboy
* @since 2024-12-05
*/
@Mapper
public interface ConfigFileMapper extends BaseMapper<ConfigFile> {
List<ConfigFileInfo> selectByEnvAndAppName(@Param("env") String env, @Param("dataId") String dataId);
}

View File

@ -0,0 +1,18 @@
package cn.odboy.rest;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* <p>
* 配置文件 前端控制器
* </p>
*
* @author odboy
* @since 2024-12-05
*/
@RestController
@RequestMapping("/api/configFile")
public class ConfigFileController {
}

View File

@ -0,0 +1,17 @@
package cn.odboy.service;
import cn.odboy.domain.ConfigApp;
import cn.odboy.domain.ConfigFile;
import com.baomidou.mybatisplus.extension.service.IService;
/**
* <p>
* 配置应用 服务类
* </p>
*
* @author odboy
* @since 2024-12-05
*/
public interface ConfigAppService extends IService<ConfigApp> {
}

View File

@ -0,0 +1,22 @@
package cn.odboy.service;
import cn.odboy.config.model.msgtype.ConfigFileInfo;
import java.util.List;
/**
* <p>
* 配置文件 服务类
* </p>
*
* @author odboy
* @since 2024-12-05
*/
public interface ConfigFileService {
/**
* @param env 环境编码
* @param dataId 数据ID这里是应用名称
* @return /
*/
List<ConfigFileInfo> getFileList(String env, String dataId);
}

View File

@ -0,0 +1,23 @@
package cn.odboy.service.impl;
import cn.odboy.domain.ConfigApp;
import cn.odboy.domain.ConfigFile;
import cn.odboy.mapper.ConfigAppMapper;
import cn.odboy.mapper.ConfigFileMapper;
import cn.odboy.service.ConfigAppService;
import cn.odboy.service.ConfigFileService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
/**
* <p>
* 配置应用 服务实现类
* </p>
*
* @author odboy
* @since 2024-12-05
*/
@Service
public class ConfigAppServiceImpl extends ServiceImpl<ConfigAppMapper, ConfigApp> implements ConfigAppService {
}

View File

@ -0,0 +1,28 @@
package cn.odboy.service.impl;
import cn.odboy.config.model.msgtype.ConfigFileInfo;
import cn.odboy.mapper.ConfigFileMapper;
import cn.odboy.service.ConfigFileService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* <p>
* 配置文件 服务实现类
* </p>
*
* @author odboy
* @since 2024-12-05
*/
@Service
@RequiredArgsConstructor
public class ConfigFileServiceImpl implements ConfigFileService {
private final ConfigFileMapper configFileMapper;
@Override
public List<ConfigFileInfo> getFileList(String env, String dataId) {
return configFileMapper.selectByEnvAndAppName(env, dataId);
}
}

View File

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.odboy.mapper.ConfigFileMapper">
<select id="selectByEnvAndAppName" resultType="cn.odboy.config.model.msgtype.ConfigFileInfo">
SELECT t1.file_name,
t2.file_content
FROM config_file t1
LEFT JOIN config_version t2 ON t1.id = t2.file_id
AND t1.version = t2.version
WHERE t1.env = #{env}
AND t1.app_name = #{dataId}
</select>
</mapper>

File diff suppressed because one or more lines are too long