Nacos配置中心集群原理及源码分析

网友投稿 284 2022-08-16


Nacos配置中心集群原理及源码分析

目录Nacos集群工作原理配置变更同步入口AsyncNotifyServiceAsyncTask目标节点接收请求NacosDelayTaskExecuteEngineProcessRunnableprocessTasksDumpProcessor.process

Nacos作为配置中心,必然需要保证服务节点的高可用性,那么Nacos是如何实现集群的呢?

下面这个图,表示Nacos集群的部署图。

Nacos集群工作原理

Nacos作为配置中心的集群结构中,是一种无中心化节点的设计,由于没有主从节点,也没有选举机制,所以为了能够实现热备,就需要增加虚拟IP(VIP)。

Nacos的数据存储分为两部分

mysql数据库存储,所有Nacos节点共享同一份数据,数据的副本机制由Mysql本身的主从方案来解决,从而保证数据的可靠性。每个节点的本地磁盘,会保存一份全量数据,具体路径:/data/program/nacos-1/data/config-data/${GROUP}.

在Nacos的设计中,Mysql是一个中心数据仓库,且认为在Mysql中的数据是绝对正确的。 除此之外,Nacos在启动时会把Mysql中的数据写一份到本地磁盘。

这么设计的好处是可以提高性能,当客户端需要请求某个配置项时,服务端会想Ian从磁盘中读取对应文件返回,而磁盘的读取效率要比数据库效率高。

当配置发生变更时:

Nacos会把变更的配置保存到数据库,然后再写入本地文件。接着发送一个HTTP请求,给到集群中的其他节点,其他节点收到事件后,从Mysql中dump刚刚写入的数据到本地文件中。

另外,NacosServer启动后,会同步启动一个定时任务,每隔6小时,会dump一次全量数据到本地文件

配置变更同步入口

当配置发生修改、删除、新增操作时,通过发布一个notifyConfigChange事件。

@PostMapping

@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)

public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,

@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,

@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,

@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,

@RequestParam(value = "appName", required = false) String appName,

@RequestParam(value = "src_user", required = false) String srcUser,

@RequestParam(value = "config_tags", required = false) String configTags,

@RequestParam(value = "desc", required = false) String desc,

@RequestParam(value = "use", required = false) String use,

@RequestParam(value = "effect", required = false) String effect,

@RequestParam(value = "type", required = false) String type,

@RequestParam(value = "schema", required = false) String schema) throws NacosException {

//省略..

if (StringUtils.isBlank(betaIps)) {

if (StringUtils.isBlank(tag)) {

persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);

ConfigChangePublisher

.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));

} else {

persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);

ConfigChangePublisher.notifyConfigChange(

new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));

}

}//省略

return true;

}

AsyncNotifyService

配置数据变更事件,专门有一个监听器AsyncNotifyService,它会处理数据变更后的同步事件。

@Autowired

public AsyncNotifyService(ServerMemberManager memberManager) {

this.memberManager = memberManager;

// Register ConfigDataChangeEvent to NotifyCenter.

NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);

// Register A Subscriber to subscribe ConfigDataChangeEvent.

NotifyCenter.registerSubscriber(new Subscriber() {

@Override

public void onEvent(Event event) {

// Generate ConfigDataChangeEvent concurrently

if (event instanceof ConfigDataChangeEvent) {

ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;

long dumpTs = evt.lastModifiedTs;

String dataId = evt.dataId;

String group = evt.group;

String tenant = evt.tenant;

String tag = evt.tag;

Collection ipList = memberManager.allMembers(); //得到集群中的ip列表

// 构建NotifySingleTask,并添加到队列中。

Queue queue = new LinkedList();

for (Member member : ipList) { //遍历集群中的每个节点

queue.add(new NotifySingleTask(dataId, group, tenant, tNyWcOFag, dumpTs, member.getAddress(),

evt.isBeta));

}

//异步执行任务 AsyncTask

ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));

}

}

@Override

public Class extends Event> subscribeType() {

return ConfigDataChangeEvent.class;

}

});

}

AsyncTask

@Override

public void run() {

executeAsyncInvoke();

}

private void executeAsyncInvoke() {

while (!queue.isEmpty()) {//遍历队列中的数据,直到数据为空

NotifySingleTask task = queue.poll(); //获取task

String targetIp = task.getTargetIP(); //获取目标ip

if (memberManager.hasMember(targetIp)) { //如果集群中的ip列表包含目标ip

// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify

//判断目标ip的健康状态

boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); //

if (unHealthNeedDelay) { //如果目标服务是非健康,则继续添加到队列中,延后再执行。

// target ip is unhealthy, then put it in the notification list

ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,

task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,

0, task.target);

// get delay time and set fail count to the task

asyncTaskExecute(task);

} else {

//构建header

Header header = Header.newInstance();

header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));

header.addParam(NotNyWcOFifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());

if (task.isBeta) {

header.addParam("isBeta", "true");

}

AuthHeaderUtil.addIdentityToHeader(header);

//通过restTemplate发起远程调用,如果调用成功,则执行AsyncNotifyCallBack的回调方法

restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));

}

}

}

}

目标节点接收请求

数据同步的请求地址为,task.url=http://192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP

@GetMapping("/dataChange")

public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,

@RequestParam("group") String group,

@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,

@RequestParam(value = "tag", required = false) String tag) {

dataId = dataId.trim();

group = group.trim();

String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);

long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);

String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);

String isBetaStr = request.getHeader("isBeta");

if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {

dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);

} else {

//

dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);

}

return true;

}

dumpService.dump用来实现配置的更新,代码如下

当前任务会被添加到DumpTaskMgr中管理。

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,

boolean isBeta) {

String groupKey = GroupKey2.getKey(dataId, group, tenant);

String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);

dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));

DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);

}

TaskManager.addTask, 先调用父类去完成任务添加。

@Override

public void addTask(Object key, AbstractDelayTask newTask) {

super.addTask(key, newTask);

MetricsMonitor.getDumpTaskMonitor().set(tasks.size());

}

在这种场景设计中,一般都会采用生产者消费者模式来完成,因此这里不难猜测到,任务会被保存到一个队列中,然后有另外一个线程来执行。

NacosDelayTaskExecuteEngine

TaskManager的父类是NacosDelayTaskExecuteEngine,

这个类中有一个成员属性protected final ConcurrentHashMap tasks;,专门来保存延期执行的任务类型AbstractDelayTask.

在这个类的构造方法中,初始化了一个延期执行的任务,其中具体的任务是ProcessRunnable.

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {

super(logger);

tasks = new ConcurrentHashMap(initCapacity);

processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));

processingExecutor

.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);

}

ProcessRunnable

private class ProcessRunnable implements Runnable {

@Override

public void run() {

try {

processTasks();

} catch (Throwable e) {

getEngineLog().error(e.toString(), e);

}

}

}

processTasks

protected void processTasks() {

//获取所有的任务

Collection keys = getAllTaskKeys();

for (Object taskKey : keys) {

AbstractDelayTask task = removeTask(taskKey);

if (null == task) {

continue;

}

//获取任务处理器,这里返回的是DumpProcessor

NacosTaskProcessor processor = getProcessor(taskKey);

if (null == processor) {

getEngineLog().error("processor not found for task, so discarded. " + task);

continue;

}

try {

// ReAdd task if process failed

//执行具体任务

if (!processor.process(task)) {

retryFailedTask(taskKey, task);

}

} catch (Throwable e) {

getEngineLog().error("Nacos task execute error : " + e.toString(), e);

retryFailedTask(taskKey, task);

}

}

}

DumpProcessor.http://process

读取数据库的最新数据,然后更新本地缓存和磁盘

以上就是Nacos配置中心集群原理及源码分析的详细内容,更多关于Nacos配置中心集群原理的资料请关注我们其它相关文章!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:JavaWeb简单文件上传流程的实战记录
下一篇:mybatis实现特殊字段加密方式
相关文章

 发表评论

暂时没有评论,来抢沙发吧~