Implementing a Configuration Center with ZooKeeper

1 How local configuration works

https://docs.spring.io/spring-boot/docs/2.7.2/reference/htmlsingle/#features.external-config

(1) Loading the application.properties file

PropertySourceLoader#load()
->PropertiesPropertySourceLoader#load()
->OriginTrackedMapPropertySource

(2) Inspecting the property sources in the Environment

SpringApplication#run()
->debug: set a breakpoint at configureIgnoreBeanInfo(environment) and inspect the environment's property sources

(3) For example, if you set --hello=hi in Program arguments (option arguments need the leading --), the value shows up in the environment's SimpleCommandLinePropertySource.

(4) How @Value works

1) A BeanPostProcessor scans the bean class for fields annotated with @Value
2) Get the @Value annotation on the field
3) Read the annotation's value attribute, e.g. ${age}, which yields the key age
4) Look up the key age in the environment's property sources, e.g. OriginTrackedMapPropertySource
5) Get the value stored under that key
6) Set the value on the field via reflection

Source code:
AutowiredAnnotationBeanPostProcessor.AutowiredFieldElement#inject()
->resolveFieldValue(field, bean, beanName) # conditional breakpoint: beanName.equals("orderController")
->AutowiredAnnotationBeanPostProcessor.this.beanFactory.resolveDependency(...)
->DefaultListableBeanFactory#resolveDependency
->this.doResolveDependency(descriptor...)
# get the value attribute of @Value, e.g. ${age}
->Object value = getAutowireCandidateResolver().getSuggestedValue(descriptor)
# resolve ${age} against env to find the matching value
-> String strVal = resolveEmbeddedValue((String) value)
# last part of AutowiredFieldElement#inject: assign the target field via reflection
-> field#set(bean,value)
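
The six steps above can be sketched with the JDK alone. This is a simplified model, not Spring's implementation: the MyValue annotation, the OrderController stand-in, and the plain Map used as a property source are all hypothetical names introduced for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;

// Simplified model of the @Value flow; every name here is hypothetical, not a Spring class.
final class ValueInjectionSketch {

    // Stand-in for Spring's @Value annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface MyValue {
        String value();
    }

    // Stand-in for a bean such as OrderController.
    static class OrderController {
        @MyValue("${age}")
        String age;
    }

    // Scans the bean's fields, strips ${...} from the annotation value,
    // looks the key up in the property source, and sets the field by reflection.
    static void inject(Object bean, Map<String, String> propertySource) {
        for (Field field : bean.getClass().getDeclaredFields()) {
            MyValue annotation = field.getAnnotation(MyValue.class);
            if (annotation == null) {
                continue;
            }
            String expression = annotation.value();                         // "${age}"
            String key = expression.substring(2, expression.length() - 1);  // "age"
            String resolved = propertySource.get(key);
            try {
                field.setAccessible(true);
                field.set(bean, resolved);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot set field " + field.getName(), e);
            }
        }
    }

    public static void main(String[] args) {
        OrderController controller = new OrderController();
        inject(controller, Map.of("age", "18"));
        System.out.println(controller.age); // prints 18
    }
}
```

The real flow additionally handles default values, nested placeholders, SpEL, and type conversion, which this sketch leaves out.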

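2 Extension mechanisms in the Spring ecosystem

2.1 Common extension mechanisms

An extension mechanism lets you plug your own code into the startup flow without modifying any Spring source code.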

ApplicationContextInitializer
Event listener mechanism (ApplicationListener)
BeanPostProcessor
BeanFactoryPostProcessor
ApplicationRunner
...

2.2 Example: using ApplicationContextInitializer

(1) Define a custom ApplicationContextInitializer

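import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;

public class ZkConfigApplicationContextInitializer
        implements ApplicationContextInitializer<ConfigurableApplicationContext> {
    @Override
    public void initialize(ConfigurableApplicationContext context) {
        // runs before the context is refreshed; the ZooKeeper logic goes here
    }
}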

(2) Register it in resources/META-INF/spring.factories

SPI: Spring Boot reads the ApplicationContextInitializer entries from every spring.factories file on the classpath, instantiates them, and stores them in a list.

org.springframework.context.ApplicationContextInitializer=\
  com.jack.handwrittenzookeeperconfig.initializer.ZkConfigApplicationContextInitializer
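
To make the SPI idea concrete, here is a rough JDK-only sketch of what "read the entry, instantiate the classes, collect them in a list" amounts to. It is not Spring's SpringFactoriesLoader; the Initializer interface, the ZkInitializer class, and the load method are hypothetical stand-ins.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Simplified model of spring.factories loading; not Spring's SpringFactoriesLoader.
final class FactoriesLoadingSketch {

    // Hypothetical stand-in for ApplicationContextInitializer.
    interface Initializer {
        String initialize();
    }

    // Hypothetical implementation; it needs a no-arg constructor to be instantiated reflectively.
    static class ZkInitializer implements Initializer {
        @Override
        public String initialize() {
            return "zk";
        }
    }

    // Parses "interface=impl1,impl2" text and instantiates every class listed under the key.
    static List<Initializer> load(String factoriesText, String key) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(factoriesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<Initializer> instances = new ArrayList<>();
        for (String className : properties.getProperty(key, "").split(",")) {
            String trimmed = className.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            try {
                Class<?> type = Class.forName(trimmed);
                instances.add((Initializer) type.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + trimmed, e);
            }
        }
        return instances;
    }

    public static void main(String[] args) {
        String key = Initializer.class.getName();
        // The trailing backslash continues the entry on the next line, as in a properties file.
        String text = key + "=\\\n  " + ZkInitializer.class.getName() + "\n";
        System.out.println(load(text, key).get(0).initialize()); // prints zk
    }
}
```

The line continuation in the sample text mirrors the `=\` layout of the spring.factories entry: java.util.Properties joins the two physical lines and drops the leading whitespace of the second one.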

3 Implementing a configuration center with ZooKeeper

3.1 Pulling data from the ZK server when Spring Boot starts

3.1.1 Preparing the data

/jack-config/product-service

{
    "id": "1",
    "product": "mobile",
    "price": "3000"
}

3.1.2 Connecting to ZK with Curator and reading the node data

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.1</version>
</dependency>
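import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class ZkConfigApplicationContextInitializer
        implements ApplicationContextInitializer<ConfigurableApplicationContext> {
    @Override
    public void initialize(ConfigurableApplicationContext context) {
        System.out.println("initialize() was called...");
        String connectStr = "192.168.0.8:2181";
        CuratorFramework curatorFramework = CuratorFrameworkFactory
                .builder()
                .connectionTimeoutMs(20000)
                .connectString(connectStr)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();
        try {
            // same path as the ZNode prepared in 3.1.1
            byte[] bytes = curatorFramework.getData().forPath("/jack-config/product-service");
            // convert the JSON string into a Map
            Map<String, Object> map = new ObjectMapper().readValue(new String(bytes, StandardCharsets.UTF_8), Map.class);
            System.out.println("Value read from the zookeeper server: " + map);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}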

3.2 Saving the map into env as a property source

// wrap the map in a MapPropertySource
MapPropertySource mapPropertySource = new MapPropertySource("product-service-remote-env", map);
ConfigurableEnvironment environment = context.getEnvironment();
// addFirst gives the remote values the highest precedence
environment.getPropertySources().addFirst(mapPropertySource);
System.out.println("Added the MapPropertySource to env.");
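
Why addFirst matters can be shown with a small JDK-only model of an ordered property-source list, where the first source that contains a key wins. This is only a sketch of the lookup order, not Spring's MutablePropertySources; the class and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified model of ordered property-source lookup; not Spring's MutablePropertySources.
final class PropertySourceOrderSketch {

    // A named bag of key/value pairs, loosely playing the role of a MapPropertySource.
    static final class NamedSource {
        final String name;
        final Map<String, Object> values;

        NamedSource(String name, Map<String, Object> values) {
            this.name = name;
            this.values = values;
        }
    }

    private final List<NamedSource> sources = new ArrayList<>();

    // Highest precedence: consulted before every source already registered.
    void addFirst(String name, Map<String, Object> values) {
        sources.add(0, new NamedSource(name, values));
    }

    // Lowest precedence: consulted only if no earlier source has the key.
    void addLast(String name, Map<String, Object> values) {
        sources.add(new NamedSource(name, values));
    }

    // Returns the value from the first source that contains the key, or null if none does.
    Object getProperty(String key) {
        for (NamedSource source : sources) {
            if (source.values.containsKey(key)) {
                return source.values.get(key);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        PropertySourceOrderSketch env = new PropertySourceOrderSketch();
        env.addLast("application.properties", Map.of("price", "1000", "name", "local"));
        env.addFirst("product-service-remote-env", Map.of("price", "3000"));
        System.out.println(env.getProperty("price")); // prints 3000 (remote source wins)
        System.out.println(env.getProperty("name"));  // prints local (falls through)
    }
}
```

With this ordering, a key defined both locally and in ZooKeeper resolves to the ZooKeeper value, while keys that exist only locally still resolve.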

3.3 Adding a listener on the node /jack-config/product-service

// persistent listener
CuratorCache curatorCache = CuratorCache.build(curatorFramework, "/jack-config/product-service", CuratorCache.Options.SINGLE_NODE_CACHE);
CuratorCacheListener listener = CuratorCacheListener.builder().forAll(new CuratorCacheListener() {
    // called back whenever the /jack-config/product-service ZNode changes
    @Override
    public void event(Type type, ChildData oldData, ChildData data) {
        if (type == Type.NODE_CHANGED) {
            System.out.println("ZNode data was updated, event type: " + type);
        }
    }
}).build();
curatorCache.listenable().addListener(listener);
curatorCache.start();

3.4 Reading the updated data and writing it back into environment

CuratorCacheListener listener = CuratorCacheListener.builder().forAll(new CuratorCacheListener() {
    // called back whenever the /jack-config/product-service ZNode changes
    @Override
    public void event(Type type, ChildData oldData, ChildData data) {
        if (type == Type.NODE_CHANGED) {
            System.out.println("ZNode data was updated, event type: " + type);
            try {
                Map<String, Object> updateMap = new ObjectMapper().readValue(new String(data.getData(), StandardCharsets.UTF_8), Map.class);
                System.out.println("Updated data map: " + updateMap);
                // keep the same name so that the next update can find and replace this source again
                environment.getPropertySources().replace("product-service-remote-env", new MapPropertySource("product-service-remote-env", updateMap));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}).build();
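
One detail worth spelling out is that replace looks the source up by name. To my understanding, Spring's MutablePropertySources.replace throws an IllegalArgumentException when no source with that name exists, so the replacement should carry the same name as the source it swaps out; otherwise the second update cannot find it. The JDK-only model below illustrates that behavior; the class and its methods are hypothetical, not Spring's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified model of replace-by-name; not Spring's MutablePropertySources.
final class ReplaceByNameSketch {

    static final class NamedSource {
        final String name;
        final Map<String, Object> values;

        NamedSource(String name, Map<String, Object> values) {
            this.name = name;
            this.values = values;
        }
    }

    private final List<NamedSource> sources = new ArrayList<>();

    void addFirst(NamedSource source) {
        sources.add(0, source);
    }

    // Swaps the source registered under `name`, keeping its position (and so its precedence).
    void replace(String name, NamedSource replacement) {
        for (int i = 0; i < sources.size(); i++) {
            if (sources.get(i).name.equals(name)) {
                sources.set(i, replacement);
                return;
            }
        }
        throw new IllegalArgumentException("No property source named '" + name + "'");
    }

    Object getProperty(String key) {
        for (NamedSource source : sources) {
            if (source.values.containsKey(key)) {
                return source.values.get(key);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String name = "product-service-remote-env";
        ReplaceByNameSketch env = new ReplaceByNameSketch();
        env.addFirst(new NamedSource(name, Map.of("price", "3000")));
        // Two consecutive updates succeed because the replacement keeps the same name.
        env.replace(name, new NamedSource(name, Map.of("price", "3500")));
        env.replace(name, new NamedSource(name, Map.of("price", "4000")));
        System.out.println(env.getProperty("price")); // prints 4000
    }
}
```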

3.5 Updating @Value fields

(1) FieldDetail

import java.lang.reflect.Field;

public class FieldDetail {
    private Field field;     // the field itself
    private Object instance; // the instance the field belongs to

    public FieldDetail(Field field, Object instance) {
        this.field = field;
        this.instance = instance;
    }

    public Field getField() {
        return field;
    }

    public void setField(Field field) {
        this.field = field;
    }

    public Object getInstance() {
        return instance;
    }

    public void setInstance(Object instance) {
        this.instance = instance;
    }
}

(2) Define an annotation that marks the classes whose fields should be tracked

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface JackRefreshScope {
}

(3) Add it to the target class, e.g. ProductController

@RestController
@RequestMapping("/product")
@JackRefreshScope
public class ProductController {
    // @Value fields, e.g. @Value("${id}"), are declared here
}

(4) Use a bean post-processor to detect annotated beans and save their fields

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class ParseJackRefreshScopeBeanPostProcessor implements BeanPostProcessor {
    // written during startup, read later from the Curator listener thread
    private final Map<String, FieldDetail> fieldDetailMap = new ConcurrentHashMap<>();

    public Map<String, FieldDetail> getFieldDetailMap() {
        return fieldDetailMap;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> clazz = bean.getClass();
        if (clazz.isAnnotationPresent(JackRefreshScope.class)) {
            System.out.println(clazz); // class com.jack.controller.ProductController
            for (Field field : clazz.getDeclaredFields()) {
                if (field.isAnnotationPresent(Value.class)) {
                    Value value = field.getAnnotation(Value.class);
                    String val = value.value(); // the placeholder, e.g. ${id}
                    if (!val.startsWith("${") || !val.endsWith("}")) {
                        continue; // not a plain placeholder, e.g. a SpEL expression
                    }
                    val = val.substring(2, val.length() - 1); // ${id} ---> id
                    System.out.println("val: " + val);
                    // save the Field and the bean instance it belongs to under the key val
                    this.fieldDetailMap.put(val, new FieldDetail(field, bean));
                }
            }
            System.out.println("");
        }
        return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
    }
}
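
Two limits of the post-processor are worth keeping in mind: a placeholder may carry a default value such as ${id:0}, and a map keyed by property name holds only one field per key, so a second field bound to the same key overwrites the first. The following JDK-only sketch shows one possible way to handle both. The class, extractKey, register, and fieldsFor are hypothetical names, and fields are represented by plain description strings to keep the example self-contained.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical helper: placeholder parsing plus a registry that keeps several fields per key.
final class PlaceholderRegistrySketch {

    // Extracts "id" from "${id}" or "${id:0}"; returns null for anything that is not a placeholder.
    static String extractKey(String expression) {
        if (expression == null || !expression.startsWith("${") || !expression.endsWith("}")) {
            return null;
        }
        String body = expression.substring(2, expression.length() - 1);
        int colon = body.indexOf(':');
        return colon >= 0 ? body.substring(0, colon) : body;
    }

    // Property key -> every field bound to it (described by a string in this sketch).
    private final Map<String, List<String>> fieldsByKey = new ConcurrentHashMap<>();

    // Records a field under the key of its placeholder; non-placeholder expressions are ignored.
    void register(String expression, String fieldDescription) {
        String key = extractKey(expression);
        if (key != null) {
            fieldsByKey.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(fieldDescription);
        }
    }

    List<String> fieldsFor(String key) {
        return fieldsByKey.getOrDefault(key, Collections.emptyList());
    }

    public static void main(String[] args) {
        PlaceholderRegistrySketch registry = new PlaceholderRegistrySketch();
        registry.register("${id}", "ProductController.id");
        registry.register("${id:0}", "OrderController.productId");
        System.out.println(registry.fieldsFor("id")); // both fields are kept under the key id
    }
}
```

In the real post-processor the list elements would be FieldDetail objects rather than strings.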

(5) Complete the final logic of ZkConfigApplicationContextInitializer

// goes inside the NODE_CHANGED branch, in the same try block, after the property source has been replaced
// find out which fields may need updating
ParseJackRefreshScopeBeanPostProcessor parseJackRefreshScopeBeanPostProcessor = context.getBean("parseJackRefreshScopeBeanPostProcessor", ParseJackRefreshScopeBeanPostProcessor.class);
Map<String, FieldDetail> fieldDetailMap = parseJackRefreshScopeBeanPostProcessor.getFieldDetailMap();
for (Map.Entry<String, FieldDetail> entry : fieldDetailMap.entrySet()) {
    String key = entry.getKey();
    if (updateMap.containsKey(key)) { // only keys present in the data pushed from the remote side
        String value = environment.getProperty(key);
        Field field = entry.getValue().getField();
        field.setAccessible(true);
        // update the field via reflection; value is a String, so this assumes String fields
        field.set(entry.getValue().getInstance(), value);
    }
}
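
The reflective update above passes a String to field.set, which works for String fields but fails with an IllegalArgumentException for a field declared as int or boolean. A minimal JDK-only sketch of converting the raw value to the field's type first is shown below. The Product class, convert, and refresh are hypothetical, and only a handful of types are covered.

```java
import java.lang.reflect.Field;

// Hypothetical helper: convert the raw String to the field's declared type before setting it.
final class FieldRefreshSketch {

    // Stand-in for a bean with @Value-style fields of different types.
    static class Product {
        String product = "mobile";
        int price = 3000;
        boolean onSale;
    }

    // Converts a raw configuration string to a small set of supported target types.
    static Object convert(String raw, Class<?> targetType) {
        if (targetType == String.class) {
            return raw;
        }
        if (targetType == int.class || targetType == Integer.class) {
            return Integer.valueOf(raw);
        }
        if (targetType == long.class || targetType == Long.class) {
            return Long.valueOf(raw);
        }
        if (targetType == boolean.class || targetType == Boolean.class) {
            return Boolean.valueOf(raw);
        }
        if (targetType == double.class || targetType == Double.class) {
            return Double.valueOf(raw);
        }
        throw new IllegalArgumentException("Unsupported field type: " + targetType.getName());
    }

    // Looks up the field by name and assigns the converted value via reflection.
    static void refresh(Object instance, String fieldName, String rawValue) {
        try {
            Field field = instance.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(instance, convert(rawValue, field.getType()));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot refresh field " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        Product product = new Product();
        refresh(product, "price", "3500");
        refresh(product, "onSale", "true");
        System.out.println(product.price + " " + product.onSale); // prints 3500 true
    }
}
```

In a Spring application the conversion would more naturally be delegated to the framework's own conversion facilities; the hand-written convert method is only there to keep the sketch free of dependencies.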

http://www.zivjie.cn/2023/03/25/中间件/zookeeper/Zookeeper实现配置中心/
Author: Francis
Published: March 25, 2023