Preface
This article takes a look at configWatchers in nacos
CommunicationController
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/controller/CommunicationController.java
@Controller
@RequestMapping(Constants.COMMUNICATION_CONTROLLER_PATH)
public class CommunicationController {
    private final DumpService dumpService;
    private final LongPollingService longPollingService;
    private String trueStr = "true";
    @Autowired
    public CommunicationController(DumpService dumpService, LongPollingService longPollingService) {
        this.dumpService = dumpService;
        this.longPollingService = longPollingService;
    }
    //......
    /**
     * Get information about the clients on this machine that subscribe to the given configuration
     */
    @RequestMapping(value = "/configWatchers", method = RequestMethod.GET)
    @ResponseBody
    public SampleResult getSubClientConfig(HttpServletRequest request,
                                           HttpServletResponse response,
                                           @RequestParam("dataId") String dataId,
                                           @RequestParam("group") String group,
                                           @RequestParam(value = "tenant", required = false) String tenant,
                                           ModelMap modelMap) {
        group = StringUtils.isBlank(group) ? Constants.DEFAULT_GROUP : group;
        return longPollingService.getCollectSubscribleInfo(dataId, group, tenant);
    }
    //......
}
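- CommunicationController exposes the /configWatchers endpoint (GET). It takes dataId, group and an optional tenant, falls back to Constants.DEFAULT_GROUP when group is blank, and returns the SampleResult produced by longPollingService.getCollectSubscribleInfo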
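To show how a caller might address this endpoint, here is a minimal sketch that only builds the request URI. The base URL in ASSUMED_BASE is an assumption: the excerpt shows the constant Constants.COMMUNICATION_CONTROLLER_PATH but not its value, nor the server's host, port or context path. The class ConfigWatchersRequest and the method buildUri are hypothetical names introduced for illustration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLEncoder;

final class ConfigWatchersRequest {

    // Assumption: host, port, context path and controller path are not shown in the excerpt.
    static final String ASSUMED_BASE = "http://127.0.0.1:8848/nacos/v1/cs/communication";

    /** Builds the GET URI for /configWatchers; tenant is optional, as in the controller. */
    static URI buildUri(String base, String dataId, String group, String tenant) {
        StringBuilder sb = new StringBuilder(base)
            .append("/configWatchers")
            .append("?dataId=").append(encode(dataId))
            .append("&group=").append(encode(group));
        if (tenant != null && !tenant.isEmpty()) {
            sb.append("&tenant=").append(encode(tenant));
        }
        return URI.create(sb.toString());
    }

    /** Form-encodes a query parameter value as UTF-8. */
    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a conforming JVM.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildUri(ASSUMED_BASE, "example.properties", "DEFAULT_GROUP", null));
    }
}
```

The resulting URI can be passed to any HTTP client. The response body is the JSON serialization of SampleResult, whose exact shape the excerpt does not show.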
LongPollingService
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java
@Service
public class LongPollingService extends AbstractEventListener {
    private static final int FIXED_POLLING_INTERVAL_MS = 10000;
    private static final int SAMPLE_PERIOD = 100;
    private static final int SAMPLE_TIMES = 3;
    private static final String TRUE_STR = "true";
    private Map<String, Long> retainIps = new ConcurrentHashMap<String, Long>();
    //......
    public SampleResult getCollectSubscribleInfo(String dataId, String group, String tenant) {
        List<SampleResult> sampleResultLst = new ArrayList<SampleResult>(50);
        for (int i = 0; i < SAMPLE_TIMES; i++) {
            SampleResult sampleTmp = getSubscribleInfo(dataId, group, tenant);
            if (sampleTmp != null) {
                sampleResultLst.add(sampleTmp);
            }
            if (i < SAMPLE_TIMES - 1) {
                try {
                    Thread.sleep(SAMPLE_PERIOD);
                } catch (InterruptedException e) {
                    LogUtil.clientLog.error("sleep wrong", e);
                }
            }
        }
        SampleResult sampleResult = mergeSampleResult(sampleResultLst);
        return sampleResult;
    }
    public SampleResult getSubscribleInfo(String dataId, String group, String tenant) {
        String groupKey = GroupKey.getKeyTenant(dataId, group, tenant);
        SampleResult sampleResult = new SampleResult();
        Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50);
        for (ClientLongPolling clientLongPolling : allSubs) {
            if (clientLongPolling.clientMd5Map.containsKey(groupKey)) {
                lisentersGroupkeyStatus.put(clientLongPolling.ip, clientLongPolling.clientMd5Map.get(groupKey));
            }
        }
        sampleResult.setLisentersGroupkeyStatus(lisentersGroupkeyStatus);
        return sampleResult;
    }
    /**
     * Aggregate the sampled ips and listened-config information from the sample results;
     * letting later samples overwrite earlier ones is an acceptable merge strategy
     *
     * @param sampleResults sample Results
     * @return Results
     */
    public SampleResult mergeSampleResult(List<SampleResult> sampleResults) {
        SampleResult mergeResult = new SampleResult();
        Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50);
        for (SampleResult sampleResult : sampleResults) {
            Map<String, String> lisentersGroupkeyStatusTmp = sampleResult.getLisentersGroupkeyStatus();
            for (Map.Entry<String, String> entry : lisentersGroupkeyStatusTmp.entrySet()) {
                lisentersGroupkeyStatus.put(entry.getKey(), entry.getValue());
            }
        }
        mergeResult.setLisentersGroupkeyStatus(lisentersGroupkeyStatus);
        return mergeResult;
    }
    //......
}
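- LongPollingService's getCollectSubscribleInfo method loops SAMPLE_TIMES (3) times. Each iteration calls getSubscribleInfo and adds the resulting SampleResult to sampleResultLst, sleeping SAMPLE_PERIOD (100) milliseconds between iterations. Finally it merges the samples with mergeSampleResult and returns the merged SampleResult
- The getSubscribleInfo method iterates over every clientLongPolling in allSubs and, for each one whose clientMd5Map contains groupKey, records its ip and md5 in lisentersGroupkeyStatus, which is then set on sampleResult
- The mergeSampleResult method merges the lisentersGroupkeyStatus of every SampleResult in sampleResults into a newly created SampleResult. When the same ip appears in more than one sample, the md5 from the later sample overwrites the earlier one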
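The filter-then-merge logic described above can be reproduced without any nacos classes. The following is a minimal sketch, not the nacos implementation: Subscriber is a hypothetical stand-in for ClientLongPolling, plain maps replace SampleResult, and the key "demo-key" is an arbitrary placeholder rather than a real GroupKey value. Sampling several times presumably exists because a client is only present in allSubs while one of its long-polling requests is pending. The excerpt does not state this, so treat it as an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SubscriberSampling {

    /** Hypothetical stand-in for ClientLongPolling: a client ip plus its groupKey -> md5 map. */
    static final class Subscriber {
        final String ip;
        final Map<String, String> clientMd5Map;

        Subscriber(String ip, Map<String, String> clientMd5Map) {
            this.ip = ip;
            this.clientMd5Map = clientMd5Map;
        }
    }

    /** One sample: ip -> md5 for every subscriber currently listening on groupKey. */
    static Map<String, String> sample(List<Subscriber> allSubs, String groupKey) {
        Map<String, String> status = new HashMap<String, String>();
        for (Subscriber sub : allSubs) {
            if (sub.clientMd5Map.containsKey(groupKey)) {
                status.put(sub.ip, sub.clientMd5Map.get(groupKey));
            }
        }
        return status;
    }

    /** Merges samples in list order; for a repeated ip the later sample's md5 wins. */
    static Map<String, String> merge(List<Map<String, String>> samples) {
        Map<String, String> merged = new HashMap<String, String>();
        for (Map<String, String> s : samples) {
            merged.putAll(s);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> md5s = new HashMap<String, String>();
        md5s.put("demo-key", "md5-a");
        List<Subscriber> allSubs = new ArrayList<Subscriber>();
        allSubs.add(new Subscriber("10.0.0.1", md5s));

        List<Map<String, String>> samples = new ArrayList<Map<String, String>>();
        samples.add(sample(allSubs, "demo-key"));
        // Between samples the set of pending clients may change; here it stays the same.
        samples.add(sample(allSubs, "demo-key"));
        System.out.println(merge(samples));
    }
}
```

Because the merge keys on ip alone, two clients behind the same ip collapse into a single entry. The same holds for the original lisentersGroupkeyStatus map.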
Summary
CommunicationController exposes the /configWatchers endpoint, which returns a SampleResult via longPollingService.getCollectSubscribleInfo
doc
- CommunicationController