浅谈Node异步编程的机制

网友投稿 260 2023-03-25


浅谈Node异步编程的机制

本文介绍了Node异步编程,分享给大家,具体如下:

目前的异步编程主要解决方案有:

事件发布/订阅模式

Promise/Deferred模式

流程控制库

事件发布/订阅模式

Node自身提供了events模块,可以轻松实现事件的发布/订阅

//订阅

emmiter.on("event1",function(message){

console.log(message);

})

//发布

emmiter.emit("event1","I am mesaage!");

侦听器可以很灵活地添加和删除,使得事件和具体处理逻辑之间可以很轻松的关联和解耦

事件发布/订阅模式常常用来解耦业务逻辑,事件发布者无需关注订阅的侦听器如何实现业务逻辑,甚至不用关注有多少个侦听器存在,数据通过消息的方式可以很灵活的进行传递。

下面的HTTP就是典型的应用场景

var req = http.request(options,function(res){

res.on('data',function(chunk){

console.log('Body:'+ chunk);

})

res.on('end',function(){

//TODO

})

})

如果一个事件添加了超过10个侦听器,将会得到一条警告,可以通过调用emmite.setMaxListeners(0)将这个限制去掉

继承events模块

var events = require('events');

function Stream(){

events.EventEmiiter.call(this);

}

util.inherits(Stream,events.EventEmitter);

利用事件队列解决雪崩问题

所谓雪崩问题,就是在高访问量,大并发量的情况下缓存失效的情况,此时大量的请求同时融入数据库中,数据库无法同时承受如此大的查询请求,进而往前影响到网站整体的响应速度

解决方案:

var proxy = new events.EventEmitter();

var status = "ready";

var seletc = function(callback){

proxy.once("selected",callback);//为每次请求订阅这个查询时间,推入事件回调函数队列

if(status === 'ready'){

status = 'pending';//设置状态为进行中以防止引起多次查询操作

db.select("SQL",function(results){

proxy.emit("selected",results); //查询操作完成后发布时间

status = 'ready';//重新定义为已准备状态

})

}

}

多异步之间的协作方案

以上情况事件与侦听器的关系都是一对多的,但在异步编程中,也会出现事件与侦听器多对一的情况。

这里以渲染页面所需要的模板读取、数据读取和本地化资源读取为例简要介绍一下

var count = 0 ;

var results = {};

var done = function(key,value){

result[key] = value;

count++;

if(count === 3){

render(results);

}

}

fs.readFile(template_path,"utf8",function(err,template){

done('template',template)

})

db.query(sql,function(err,data){

done('data',data);

})

l10n.get(function(err,resources){

done('resources',resources)

})

偏函数方案

var after = function(times,callback){

var count = 0, result = {};

return function(key,value){

results[key] = value;

count++;

if(count === times){

callback(results);

}

}

}

var done = after(times,render);

var emitter = new events.Emitter();

emitter.on('done',done); //一个侦听器

emitter.on('done',other); //如果业务增长,可以完成多对多的方案

fs.readFile(template_path,"utf8",function(err,template){

emitter.emit('done','template',template);

})

db.query(sql,function(err,data){

emitter.emit('done','data',data);

})

l10n.get(function(err,resources){

emitter.emit('done','resources',resources)

})

引入EventProxy模块方案

var proxy = new EventProxy();

proxy.all('template','data','resources',function(template,data,resources){

//TODO

})

fs.readFile(template_path,'utf8',function(err,template){

proxy.emit('template',temhttp://plate);

})

db.query(sql,function(err,data){

proxy.emit('data',data);

})

l10n.get(function(err,resources){

proxy.emit('resources',resources);

})

Promise/Deferred模式

以上使用事件的方式时,执行流程都需要被预先设定,这是发布/订阅模式的运行机制所决定的。

$.get('/api',{

success:onSuccess,

err:onError,

complete:onComplete

})

//需要严谨设置目标

那么是否有一种先执行异步调用,延迟传递处理的方式的?接下来要说的就是针对这种情况的方式:Promise/Deferred模式

Promise/A

Promise/A提议对单个异步操作做出了这样的抽象定义:

Promise操作只会处在三种状态的一种:未完成态,完成态和失败态。

Promise的状态只会出现从未完成态向完成态或失败态转化,不能逆反,完成态和失败态不能相互转化

Promise的状态一旦转化,就不能被更改。

一个Promise对象只要具备then()即可

接受完成态、错误态的回调方法

可选地支持progress事件回调作为第三个方法

then()方法只接受function对象,其余对象将被忽略

then()方法继续返回Promise对象,以实现链式调用

通过Node的events模块来模拟一个Promise的实现

var Promise = function(){

EventEmitter.call(this)

}

util.inherits(Promise,EventEmitter);

Promise.prototype.then = function(fulfilledHandler,errHandler,progeressHandler){

if(typeof fulfilledHandler === 'function'){

this.once('success',fulfilledHandler); //实现监听对应事件

}

if(typeof errorHandler === 'function'){

this.once('error',errorHandler)

}

if(typeof progressHandler === 'function'){

this.on('progress',progressHandler);

}

return this;

}

以上通过then()将回调函数存放起来,接下来就是等待success、error、progress事件被触发,实现这个功能的对象称为Deferred对象,即延迟对象。

var Deferred = function(){

this.state = 'unfulfilled';

this.promise = new Promise();

}

Deferred.prototype.resolve = function(obj){ //当异步完成后可将resolve作为回调函数,触发相关事件

this.state = 'fulfilled';

this.promise.emit('success',obj);

}

Deferred.prototype.reject = function(err){

this.state = 'failed';

this.promise.emit('error',err);

}

Deferred.prototype.progress = function(data){

this.promise.emit('progress',data)

}

因此,可以对一个典型的响应对象进行封装

res.setEncoding('utf8');

res.on('data',function(chunk){

console.log("Body:" + chunk);

})

res.on('end',function(){

//done

})

res.on('error',function(err){

//error

}

转换成

res.then(function(){

//done

},function(err){

//error

},function(chunk){

console.log('Body:' + chunk);

})

要完成上面的转换,首先需要对res对象进行封装,对data,end,error等事件进行promisify

var promisify = function(res){

var deferred = new Deferred(); //创建一个延迟对象来在res的异步完成回调中发布相关事件

var result = ''; //用来在progress中持续接收数据

res.on('data',function(chunk){ //res的异步操作,回调中发布事件

result += chunk;

deferred.progress(chunk);

})

res.on('end',function(){

deferred.resolve(result);

})

res.on('error',function(err){

deferred.reject(err);

});

return deferred.promise //返回deferred.promise,让外界不能改变deferred的状态,只能让promise的then()方法去接收外界来侦听相关事件。

}

promisify(res).then(function(){

//done

},function(err){

//error

},function(chunk){

console.log('Body:' + chunk);

})

以上,它将业务中不可变的部分封装在了Deferred中,将可变的部分交给了Promise

Promise中的多异步协作

Deferred.prototype.all = function(promises){

var count = promises.length; //记录传进的promise的个数

var that = this; //保存调用all的对象

var results = [];//存放所有promise完成的结果

promises.forEach(function(promise,i){//对promises逐个进行调用

promise.then(function(data){//每个promise成功之后,存放结果到result中,count--,直到所有promise被处理完了,才出发deferred的resolve方法,发布事件,传递结果出去

count--;

result[i] = data;

if(count === 0){

that.resolve(resultswhBiBf);

}

},function(err){

that.reject(err);

});

});

return this.promise; //返回promise来让外界侦听这个deferred发布的事件。

}

var promise1 = readFile('foo.txt','utf-8');//这里的文件读取已经经过promise化

var promise2 = readFile('bar.txt','utf-8');

var deferred = new Deferred();

deferred.all([promise1,promise2]).thne(function(results){//promise1和promise2的then方法在deferred内部的all方法所调用,用于同步所有的promise

//TODO

},function(err){

//TODO

})

支持序列执行的Promise

尝试改造一下代码以实现链式调用

var Deferred = function(){

this.promise = new Promise()

}

//完成态

Deferred.prototype.resolve = function(obj){

var promise = this.promise;

var handler;

while((handler = promise.queue.shift())){

if(handler && handler.fulfilled){

var ret = handler.fulfilled(obj);

if(ret && ret.isPromise){

ret.queue = promhttp://ise.queue;

this.promise = ret;

return;

}

}

}

}

//失败态

Deferred.prototype.reject = function(err){

var promise = this.promise;

var handler;

while((handler = promise.queue.shift())){

if(handler && handler.error){

var ret = handler.error(err);

if(ret && ret.isPromise){

ret.queue = promise.queue;

this.promise = ret;

return

}

}

}

}

//生成回调函数

Deferred.prototype.callback = function(){

var that = this;

return function(err,file){

if(err){

return that.reject(err);

}

that.resolve(file)

}

}

var Promise = function(){

this.queue = []; //队列用于存储待执行的回到函数

this.isPromise = true;

};

Promise.prototype.then = function(fulfilledHandler,errorHandler,progressHandler){

var handler = {};

if(typeof fulfilledHandler === 'function'){

handler.fulfilled = fulfilledHandler;

}

if(typeof errorHandler === 'function'){

handler.error = errorHandler;

}

this.queue.push(handler);

return this;

}

var readFile1 = function(file,encoding){

var deferred = new Deferred();

fs.readFile(file,encoding,deferred.callback());

return deferred.promise;

}

var readFile2 = function(file,encoding){

var deferred = new Deferred();

fs.readFile(file,encoding,deferred.callback());

return deferred.promise;

}

readFile1('file1.txt','utf8').then(function(file1){

return readFile2(file1.trim(),'utf8')

}).then(function(file2){

console.log(file2)

})

流程控制库另外进行总结

参考《深入浅出node.js》一书,想学学习可以下载电子书,下载地址://jb51.net/books/481114.html


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:JNI语言基本知识
下一篇:基于Vue实现图书管理功能
相关文章

 发表评论

暂时没有评论,来抢沙发吧~