stream在nodejs中是EventEmitter的实现,并且有多种使用形式,比如文件流、压缩等

一、流的概念

var Stream = require("stream");
var stream = new Stream();


【实例一】
stream.readable = true; // 重点
stream.on('data', function(inputdata) {
	// foo.txt 字符串
	console.log(inputdata);
});
stream.emit("data", new Buffer("foo.txt"));


【实例二】
var duration = 5 * 1000; 
var end = Date.now() + duration;
var interval;
stream.readable = true;
stream.on('end', function(inputdata) {
	// 输出 Emitting an end event
	console.log(inputdata.toString());
});

stream.on('data',function(data){
	// <Buffer 66 6f 6f 2e 74 78 74>
	console.log(data);
	// foo.txt
	console.log(data.toString());
})

interval = setInterval(function(){
	var now = Date.now();
	console.log("Emitting a data event");
	stream.emit("data", new Buffer("foo.txt"));
	if(now >= end) {
		stream.emit("end", "Emitting an end event");
		clearInterval(interval);
	}
})

二、管道的概念

在命令行中|相当于pipe

B.js stdin(等待)

process.stdin.on("data",function(data){
	console.log(data.toString());
})
process.stdin.resume();

命令行中输入:

echo “rich” node B.js

打印出来就是rich

假如A.js

setInterval(function(){
	console.log("dddd")
},1000);

命令行中输入:

node A.js node B.js
【实例】
var bytes = 0;

// 开关
stream.writable = true;
stream.write = function(buffer) {
	bytes += buffer.length;
};

stream.end = function(buffer) {
	if(buffer){
		stream.write(buffer);
	}
	stream.writable = false;
	stream.emit("finish");
	// 3bytes written
	console.log(bytes + "bytes written");
}
stream.pipe(stream); // 设置可以写入时通过管道传给自己
/*
stream.readable = true;
stream.on('data',function(d){
	console.log(d);
})*/
stream.emit("data", new Buffer("foo"));
stream.emit("end");

三、文件流

流的意义 :细水长流 这样的话内存就不会撑爆 比如用readFile去读取一个上G的文件 就会直接报错 如下面这个例子运行所示:

var colors =  require('colors');
var fs = require("fs");
fs.readFile('/user/DVE.ISO', function(err,data) {
	if(!err){
		console.log(data.toString());
	}else {
		console.log(err);
	}
})

另外如果你要做一个网页服务器,用户在查看你网站的图片的时候,同时有100多个人在访问,若图片比较大(好几个M)的话 内存很容易吃不消 下面就是用的流的方式分段实现

/**************************
 * 可读流对象触发事件
 * data 当读取到来自文件、客户端、服务器端等对象的新数据时触发data事件
 * end  当读取完所有数据时触发
 * error 
 * close
 ********************/
 
var fs = require('fs');
var stream;
stream = fs.createReadStream("QQ.dmg");
stream.on("data", function(data){
	var chunk = data.toString();
	process.stdout.write(chunk);
})

stream.on("end",function(){
	console.log('d');
});

/*************************
 * 可读流对象的方法
 * pipe 用于设置一个数据通道,然后取出所有流数据并将其输出到通道另一端所指向的目标对象中 
 * resume 用于通知对象继续触发data事件
 * pause 用于通知对象停止触发data事件
*****************************/

var fs = require("fs");
var readStream = fs.createReadStream(__dirname+"/foo.txt");
var writeStream = fs.createWriteStream(__dirname + "/bar.txt"); 
readStream.pipe(writeStream);


/*************************
 * 可写流对象的事件
 * drain 当用于写入数据的write方法返回true之后触发,
 * 表示操作系统缓存区中的数据已全部输出到目标对象中,可以继续向操作系统缓存区中写入数据
 * open 需要被写入的文件已经被打开 
 * finish 当end方法被调用且数据被全部写入操作系统缓存区时触发
*****************************/ 
writeStream.on("open", function(fd){
	// 11
	console.log("file descriptor"+ fd); 
});
writeStream.on("finish",function(){
	// 27
	console.log(writeStream.bytesWritten); 
})

四、压缩和解压流(基于文件流)

// 压缩
var zlib = require("zlib");
var gzip = zlib.createGzip();*/
/*
var input = fs.createReadStream("foo.txt");
var output = fs.createWriteStream("input.txt.gz");
input.pipe(gzip).pipe(output);

// 解压
var gunzip = zlib.createGunzip();
var input  = fs.createReadStream("input.txt.gz");
var output = fs.createWriteStream("output.txt");

input.pipe(gunzip).pipe(output);