nodeJS之流stream

转换流

【stream.Transform】

  转换流(Transform streams) 是双工 Duplex
流,它的出口是从输入总计得来。 它完结了Readable 和 Writable 接口

 transform.prototype._transform = function (data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data);
};

  Transform 同样是双工流,看起来和 Duplex
重复了,但二者有一个最主要的分别:Duplex
尽管还要持有可读流和可写流,但两者是争持独立的;Transform
的可读流的数据会经过一定的处理进程自动进入可写流。

  就算会从可读流进去可写流,但并不表示那二者的数据量相同,上面说的大势所趋的拍卖逻辑会决定假设tranform 可读流,然后放入可写流,transform 原义即为转变,很适量的叙述了
Transform 流功效。

  大家最广大的缩减、解压缩用的 zlib 即为 Transform
流,压缩、解压前后的数据量显然分裂,而流的机能就是输入一个 zip
包,输入一个解压文件或反过来。我们平日用的绝半数以上双工流都是 Transform。

  转换流(Transform streams) 的例证包罗:

zlib streams
crypto streams

【socket】

  net 模块可以用来成立 socket,socket 在 NodeJS 中是一个出类拔萃的 Duplex

var net = require('net');

//创建客户端
var client = net.connect({port: 1234}, function() {
    console.log('已连接到服务器');
    client.write('Hi!');
});

//data事件监听。收到数据后,断开连接
client.on('data', function(data) {
    console.log(data.toString());
    client.end();
});

//end事件监听,断开连接时会被触发
client.on('end', function() {
    console.log('已与服务器断开连接');
});

  能够看看 client 就是一个
Duplex,可写流用于向服务器发送信息,可读流用于接受服务器音讯,七个流内的数量并不曾直接的涉及

【gulp】

  gulp 格外擅长处理代码本地打造流程

gulp.src('client/templates/*.jade')
  .pipe(jade())
  .pipe(minify())
  .pipe(gulp.dest('build/minified_templates'));

  其中 jada() 和 minify() 就是优异的 Transform,处理流程大约是

.jade 模板文件 -> jade() -> html 文件 -> minify -> 压缩后的 html

  可以看出,jade() 和 minify()
都是对输入数据做了些新鲜处理,然后交由了出口数据。

  在平日采纳的时候,当一个流同时面向生产者和买主服务的时候我们会接纳Duplex,当只是对数码做一些更换工作的时候我们便会挑选采纳Tranform

 

用途

  写程序必要读取某个配置文件 config.json,那时候简单解析一下

数据:config.json 的内容
方向:设备(物理磁盘文件) -> NodeJS 程序

  大家相应采用 readable 流来做此事

const fs = require('fs');
const FILEPATH = '...';
const rs = fs.createReadStream(FILEPATH);

  通过 fs 模块提供的 createReadStream()
方法大家轻松的创设了一个可读的流,那时候 config.json
的始末从设备流向程序。大家并没有一贯行使 Stream 模块,因为 fs
内部已经引用了 Stream 模块,并做了打包。

  有了数量后大家要求处理,比如必要写到某个路径 DEST
,那时候大家遍必要一个 writable 的流,让多少从程序流向设备

const ws = fs.createWriteStream(DEST);

  三种流都有了,也就是三个数据加工器,那么大家怎样通过类似 Unix
的管道符号 | 来链接流呢?在 NodeJS 中管道符号就是 pipe() 方法。

const fs = require('fs');
const FILEPATH = '...';

const rs = fs.createReadStream(FILEPATH);
const ws = fs.createWriteStream(DEST);

rs.pipe(ws);

  那样大家应用流完毕了简短的文件复制效率,有个值得注意的地点是,数据必须是从上游
pipe 到下游,也就是从一个 readable 流 pipe 到 writable 流

  即便有个须要,把地点一个 package.json
文件中的所有字母都改为小写,并保留到同目录下的 package-lower.json 文件下

  那时候我们就需要用到双向的流了,假定大家有一个专门处理字符转小写的流
lower,那么代码写出来大约是这么的

const fs = require('fs');
const rs = fs.createReadStream('./package.json');
const ws = fs.createWriteStream('./package-lower.json');
rs.pipe(lower).pipe(ws);

rs -> lower:lower 在下游,所以 lower 需要是个 writable 流
lower -> ws:相对而言,lower 又在上游,所以 lower 需要是个 readable 流

  当然要是大家还有额外一些拍卖动作,比如字母还索要转成 ASCII 码

rs.pipe(lower).pipe(acsii).pipe(ws);

  同样 ascii 也务必是双向的流。那样处理的逻辑是十鲜明晰的

  有个用户要求在线看视频的现象,假定我们因此 HTTP
请求重返给用户电影内容

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
   fs.readFile(moviePath, (err, data) => {
      res.end(data);
   });
}).listen(8080);

  那样的代码有三个显著的题材

  1、电影文件须求读完事后才能回去给客户,等待时间超长

  2、电影文件必要两回看入内存中,相似动作多了,内存吃不消

  用流可以将影片文件一点点的放入内存中,然后一点点的回到给客户(利用了
HTTP 协议的 Transfer-Encoding: chunked
分段传输特性),用户体验得到优化,同时对内存的开支分明下滑

const http = require('http');
const fs = require('fs');
http.createServer((req, res) => {
   fs.createReadStream(moviePath).pipe(res);
}).listen(8080);

  除了上述好处,代码优雅了成千成万,拓展也比较不难。比如须要对摄像内容收缩,大家可以引入一个特意做此事的流,那些流不用关爱其余一些做了怎么,只如若过渡管道中就足以了

const http = require('http');
const fs = require('fs');
const oppressor = require(oppressor);
http.createServer((req, res) => {
   fs.createReadStream(moviePath)
      .pipe(oppressor)
      .pipe(res);
}).listen(8080);

  可以看出来,使用流后,我们的代码逻辑变得相对独立,可维护性也会有肯定的改良

【文件复制】

  上边以流stream来兑现公文复制

var fs = require('fs');
var readStream = fs.createReadStream('a.txt');
var writeStream = fs.createWriteStream('aa.txt');

//读取数据
readStream.on('data',function(chunk){
    //如果读取的数据还在缓存区,还没有被写入
    if(writeStream.write(chunk) === false){
        //停止读数据
        readStream.pause();
    }
});

//如果数据读取完成
readStream.on('end',function(chunk){
    //停止写入数据
    writeStream.end();
});

//如果缓存区的数据被消耗完
writeStream.on('drain',function(){
    //接着读取数据
    readStream.resume();
});

  使用pipe()方法开展简化

var fs = require('fs');
var readStream = fs.createReadStream('a.txt');
var writeStream = fs.createWriteStream('aa.txt');
readStream.pipe(writeStream);

【远程访问文件】

var http = require('http');
var fs = require('fs');
http.createServer(function(req,res){
    fs.readFile('./a.txt',function(err,data){
        if(err){
            res.end('file not exist!');
        }else{
            res.writeHeader(200,{'Context-Type':'text/html'});
            res.end(data);
        }
    })
}).listen(8000);

  即便应用pipe()方法,则不难很多

var http = require('http');
var fs = require('fs');
http.createServer(function(req,res){
    fs.createReadStream('./a.txt').pipe(res);
}).listen(8000);

  甚至可以加载网上的文本,使用插件request

图片 1

var http = require('http');
var fs = require('fs');
var request = require('request');
http.createServer(function(req,res){
    request('https://www.cnblogs.comlogo_small.gif').pipe(res);
}).listen(8000);

图片 2

【自定义输入输出】

var stream = require('stream');
var Readable = stream.Readable;
var Writable = stream.Writable;

var readStream = new Readable();
var writeStream = new Writable();

readStream.push('I ');
readStream.push('Love ');
readStream.push('NodeJS\n');
readStream.push(null);

writeStream._write = function(chunk,encode,cb){
    console.log(chunk.toString());
    cb();
}
//I 
//Love 
//NodeJS
readStream.pipe(writeStream);

 【使用转换流举行成效定制】

var stream = require('stream');
var util = require('util');

function ReadStream(){
    stream.Readable.call(this);
}
util.inherits(ReadStream,stream.Readable);
ReadStream.prototype._read = function(){
    this.push('I ');
    this.push('Love ');
    this.push('NodeJS\n');
    this.push(null);    
}

function WriteStream(){
    stream.Writable.call(this);
    this._cached = Buffer.from('');
}
util.inherits(WriteStream,stream.Writable);
WriteStream.prototype._write = function(chunk,encode,cb){
    console.log(chunk.toString());
    cb();
}

function TransformStream(){
    stream.Transform.call(this);
}
util.inherits(TransformStream,stream.Transform);
TransformStream.prototype._transform = function(chunk,encode,cb){
    this.push(chunk);
    cb();
}
TransformStream.prototype._flush = function(cb){
    this.push('Oh Yeah!');
    cb();
}

var readStream = new ReadStream();
var writeStream = new WriteStream();
var transformStream = new TransformStream();
//I 
//Love 
//NodeJS
//
//Oh Yeah!
readStream.pipe(transformStream).pipe(writeStream);

 

可写流

  可写流是对数码流向设备的悬空,用来成本上游流过来的数码,通过可写流程序可以把多少写入设备,常见的是本土磁盘文件或者
TCP、HTTP 等互联网响应

process.stdin.pipe(process.stdout);

  process.stdout是一个可写流,程序把可读流 process.stdin
传过来的数量写入的规范输出设备

  Writable(可写流)包括:

HTTP requests, on the client
HTTP responses, on the server
fs write streams
[zlib streams][zlib]
crypto streams
TCP sockets
child process stdin
process.stdout, process.stderr

  [注意]地点的某些例子事实上是 Duplex 流,只是完毕了 Writable 接口

  所有 Writable 流都落到实处了 stream.Writable 类定义的接口。尽管特定的
Writable 流的落成可能略相差很大, 所有的 Writable streams
都可以按一种基本情势进行应用

var myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');

【’close’ 事件】

  ’close’事件将在流或其底层资源(比如一个文本)关闭后触发。’close’事件触发后,该流将不会再触及任何事件

  [注意]不是怀有可写流都会触发 ‘close’ 事件

【’drain’ 事件】

  即使调用 stream.write(chunk) 方法再次来到 false,流将在适当的机遇触发
‘drain’ 事件,那时才足以一连向流中写入数据

// 向可写流中写入数据一百万次。
// 需要注意背压(back-pressure)
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // 最后 一次
        writer.write(data, encoding, callback);
      } else {
        // 检查是否可以继续写入。 
        // 这里不要传递 callback, 因为写入还没有结束! 
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // 这里提前停下了, 
      // 'drain' 事件触发后才可以继续写入  
      writer.once('drain', write);
    }
  }
}

【’error’ 事件】

  ’error’
事件在写入数据出错或者利用管道出错时触发。事件发生时,回调函数仅会吸收到一个
Error 参数

  [注意]’error’ 事件爆发时,流并不会关闭

【’finish’ 事件】

  在调用了 stream.end()
方法,且缓冲区数据都早就传给底层系统(underlying system)之后, ‘finish’
事件将被触发

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
  console.error('All writes are now complete.');
});

【’pipe’ 事件】

src <stream.Readable> 输出到目标可写流(writable)的源流(source stream)

  在可读流(readable stream)上调用 stream.pipe() 方法,并在对象流向
(destinations) 中添加当前可写流 ( writable ) 时,将会在可写流上触发
‘pipe’ 事件

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.error('something is piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);

【’unpipe’ 事件】

src <Readable Stream> unpiped 当前可写流的源流

  在 Readable 上调用 stream.unpipe() 方法,从目的流向中移除当前
Writable 时,将会触发 ‘unpipe’ 事件

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.error('Something has stopped piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);

【writable.cork()】

  调用 writable.cork() 方法将恫吓所有写入数据都内存中的缓冲区里。
直到调用 stream.uncork() 或 stream.end()
方法时,缓冲区里的多少才会被输出

  在向流中写入多量小块数据(small chunks of
data)时,内部缓冲区(internal
buffer)可能失效,从而造成质量下降。writable.cork()
方法首要就是用来防止那种场所。 对于那种场馆, 落成了 writable._writev()
方法的流可以对写入的数码开展缓冲,从而增强写入功效

【writable.end([chunk][, encoding][, callback])】

chunk <string> | <Buffer> | <Uint8Array> | <any> 

chunk <string> | <Buffer> | <Uint8Array> | <any> 可选的,需要写入的数据。对于非对象模式下的流, chunk 必须是字符串、或 Buffer、或 Uint8Array。对于对象模式下的流, chunk 可以是任意的 JavaScript 值,除了 null。
encoding <string> 如果 chunk 是字符串,这里指定字符编码。
callback <Function> 可选的,流结束时的回调函数

  调用 writable.end() 方法标明接下去没有数据要被写入
Writable。通过传播可选的 chunk 和 encoding
参数,可以在关闭流从前再写入一段数据。要是传入了可选的 callback
函数,它将作为 ‘finish’ 事件的回调函数。

  [注意]在调用了 stream.end() 方法之后,再调用 stream.write()
方法将会促成错误

// 写入 'hello, ' ,并用 'world!' 来结束写入
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// 后面不允许再写入数据!

【writable.setDefaultEncoding(encoding)】

encoding <string> 新的默认编码
返回: this

  writable.setDefaultEncoding() 用于为 Writable 设置 encoding

【writable.uncork()】

  writable.uncork() 将出口在 stream.cork()
方法被调用之后缓冲在内存中的所有数据

  假设选拔 writable.cork() 和 writable.uncork()
来管理写入缓存,指出使用 process.nextTick() 来延缓调用 writable.uncork()
方法。通过那种方法,可以对单个 Node.js 事件循环中调用的持有
writable.write() 方法开展批处理

stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());

  要是一个流多次调用了 writable.cork() 方法,那么也非得调用同样次数的
writable.uncork() 方法以出口缓冲区数据

stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  stream.uncork();
  // 之前的数据只有在 uncork() 被二次调用后才会输出
  stream.uncork();
});

【writable.write(chunk[, encoding][, callback])】

chunk <string> | <Buffer> | <Uint8Array> | <any> 要写入的数据。可选的。 For streams not operating in object mode, chunk must be a string, Buffer or Uint8Array. For object mode streams, chunk may be any JavaScript value other than null.
encoding <string> 如果 chunk 是字符串,这里指定字符编码
callback <Function> 缓冲数据输出时的回调函数
返回: <boolean> 如果流需要等待 'drain' 事件触发才能继续写入数据,这里将返回 false ; 否则返回 true。

  writable.write() 方法向流中写入数据,并在多少处理完了后调用 callback
。假使有荒唐爆发,
callback不必然会收取到这几个荒唐当做第二个参数。要确保可信赖地检测到写入错误,应该监听
‘error’ 事件。

  在确认了 chunk 后,借使中间缓冲区的大小小于成立流时设定的
high沃特er马克 阈值,函数将回来 true 。 若是重返值为 false
,应该为止向流中写入数据,直到 ‘drain’ 事件被触发。

  当一个流不处在 drain 的景况, 对 write() 的调用会缓存数据块,
并且再次来到 false。
一旦有所当前颇具缓存的数码块都排空了(被操作系统接受来展开输出), 那么
‘drain’ 事件就会被触发。 大家提议, 一旦 write() 重返 false, 在 ‘drain’
事件触发前, 不可能写入其它数据块。 可是,当流不处在 ‘drain’ 状态时, 调用
write() 是被允许的, Node.js 会缓存所有曾经写入的数据块,
直到达到最大内存占用, 这时它会白白中止。 甚至在它搁浅之前,
高内存占用将会促成差的垃圾回收器的质量和高的连串相对敏感性
(即便内存不在必要,也见惯司空不会被放出回系统)。
假如远程的另一端没有读取数据, TCP sockets 可能永远也不会 drain ,
所以写入到一个不会drain的socket可能会导致远程可选拔的漏洞。

  对于一个 Transform, 写入数据到一个不会drain的流越发成难点, 因为
Transform 流默许被中止, 直到它们被pipe或者被添加了 ‘data’ 或 ‘readable’
event handler。

  如若将要被写入的多寡足以根据必要转变仍旧取得,我们提议将逻辑封装为一个
Readable 流并且采用 stream.pipe()。 不过一旦调用 write() 优先,
那么可以选拔 ‘drain’ 事件来严防回压并且防止内存难点:

function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}

// Wait for cb to be called before doing any other write.
write('hello', () => {
  console.log('write completed, do more writes now');
});

  [注意]目的形式的写入流将忽略 encoding 参数

【writable.destroy([error])】

  销毁流,并释放已传递的荒唐。在那将来,可写的流已经停止了。完结者不应该覆盖此办法,而是完结writable._destroy

 

概述

  流(stream)在Nodejs中是拍卖流数据的肤浅接口。stream模块提供了根基的API
。使用那么些API可以很不难地来创设完结流接口的对象。Nodejs提供了各类流对象。
例如,HTTP请求和process.stdout都是流的实例

  流可以是可读的、可写的,或是可读写的。所有的流都是 伊夫ntEmitter
的实例。

  固然拥有的 Node.js 用户都应当明白流的劳作办法,那点很要紧, 可是stream 模块本身只对于那么些需要创立新的流的实例的开发者最有用处。
对于第一是消费流的开发者来说,他们很少(倘诺局地话)须求一直行使 stream
模块

【类型】

  Node.js 中有七种为主的流类型:

Readable - 可读的流 (例如 fs.createReadStream()).
Writable - 可写的流 (例如 fs.createWriteStream()).
Duplex - 可读写的流 (例如 net.Socket).
Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).

  所有应用 Node.js API 创建的流对象都只可以操作 strings 和 Buffer(或
Uint8Array) 对象。不过,通过一些第三方流的贯彻,依旧可以处理其余类型的
JavaScript 值 (除了 null,它在流处理中有例外意义)。 那一个流被认为是做事在
“对象格局”(object mode)

  在创造流的实例时,可以透过 objectMode
选项使流的实例切换来对象方式。试图将早已存在的流切换来对象情势是不安全的

【缓冲】

  Writable和Readable流都会将数据存储到内部的缓存(buffer)中。这一个缓存可以经过相应的writable._writableState.getBuffer()或readable._readableState.buffer来获取

  缓存的尺寸取决于传递给流构造函数的high沃特er马克选项。
对于一般的流,high沃特er马克选项指定了总共的字节数。对于工作在目的情势的流,highWater马克指定了对象的总数

  当可读流的已毕调用stream.push(chunk)方法时,数据被停放缓存中。如若流的买主没有调用stream.read()方法,
那一个数据会始终存在于其中队列中,直到被消费

  当其中可读缓存的分寸达到highWater马克指定的阈值时,流会暂停从底层资源读取数据,直到当前缓存的多少被消费(也就是说,流会在里边甘休调用readable._read()来填充可读缓存)

  可写流通过反复调用writable.write(chunk)方法将数据放到缓存。当其中可写缓存的总大小小于high沃特er马克指定的阈值时,调用writable.write()将返true。
一旦中间缓存的高低达到或超越high沃特er马克,调用writable.write()将回来false

  stream API 的基本点目的, 越发对于 stream.pipe() 方法,
就是限量缓存数据大小,以完毕可承受的品位。那样,对于读写速度不般配的源头和目的,就不会压倒可用的内存大小。

  Duplex和Transform都是可读写的。 在里头,它们都维护了两
互相独立的缓存用于读和写。
在保险了客观高效的数据流的同时,也使得对于读和写可以独自进行而互不影响。
例如,
net.Socket就是Duplex的实例,它的可读端可以开销从套接字(socket)中接受的数码,
可写端则可以将数据写入到套接字。
由于数量写入到套接字中的速度可能比从套接字接收数据的进度快或者慢,
在读写两端选用独立缓存,并拓展单独操作就显得很首要了

  大约所有的 Node.js 应用,不管多么简单,都在某种程度上使用了流。
下边是在 Node.js 应用中选用流完毕的一个简练的 HTTP 服务器

var http = require('http');
var server = http.createServer((req, res) => {
  // req 是一个 Readable Stream;res 是一个 Writable Stream
  var body = '';
  req.setEncoding('utf8');
  req.on('data', (chunk) => {
    body += chunk;
  });
  req.on('end', () => {
    try {
      var data = JSON.parse(body);
      res.write(typeof data);
      res.end();
    } catch (er) {
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});
server.listen(1337);

  Writable 流 (比如例子中的 res) 暴光了有些主意,比如 write() 和 end()
。那个主意可以将数据写入到流中。当流中的数据足以读取时,Readable 流使用
伊芙ntEmitter API 来打招呼应用。
那几个数量可以运用各样方法从流中读取。Writable 和 Readable 流都应用了
伊夫ntEmitter API ,通过三种主意, 与流的当前情景进行相互。Duplex 和
Transform 都是还要满意 Writable 和 Readable
。对于只是不难写入数据到流和从流中消费数据的应用来说,
不需求直接完毕流接口,平常也不必要调用 require(‘stream’)

 

眼前的话

  当内存中不可能三回装下须求处理的数据时,或者一边读取一边处理越发快速时,大家就要求用到数据流。NodeJS中通过各样Stream来提供对数据流的操作。本文将详细表明NodeJS中的流stream

 

读写流

  读写流又叫双工流,就是同时已毕了 Readable 和 Writable
的流,即可以看成上游生产数据,又可以看作下游消费数量,那样可以处于数据流动管道的中档有些

rs.pipe(rws1).pipe(rws2).pipe(rws3).pipe(ws);

  在 NodeJS 中双工流常用的有三种:Duplex和Transform

【stream.Duplex】

  双工流(Duplex streams)是还要落实了 Readable and Writable 接口

const Duplex = require('stream').Duplex;

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  }
});

  Duplex 实例内同时含有可读流和可写流,在实例化 Duplex
类的时候可以传递多少个参数

readableObjectMode : 可读流是否设置为 ObjectMode,默认 false
writableObjectMode : 可写流是否设置为 ObjectMode,默认 false
allowHalfOpen : 默认 true, 设置成 false 的话,当写入端结束的时,流会自动的结束读取端,反之亦然。

  双工流(Duplex streams) 的例子包涵:

tcp sockets
zlib streams
crypto streams

 

可读流

  可读流(Readable
streams)是对提供数据的源流(source)的画个饼来解除饥饿,是生育数量用来供程序消费的流。大家周边的数量生产方式有读取磁盘文件、读取网络请求内容等

const rs = fs.createReadStream(filePath);

  rs就是一个可读流,其生产数量的格局是读取磁盘的文书,大家常见的控制台process.stdin也是一个可读流

process.stdin.pipe(process.stdout);

  通过简单的一句话能够把控制台的输入打印出来,process.stdin
生产数量的法门是读取用户在控制台的输入

  可读流的例证包罗:

HTTP responses, on the client
HTTP requests, on the server
fs read streams
[zlib streams][zlib]
crypto streams
TCP sockets
child process stdout and stderr
process.stdin

  [注意]不无的 Readable 都达成了 stream.Readable 类定义的接口

【二种方式】

  可读流事实上工作在上边二种模式之一:flowing 和 paused 。

  在flowing格局下,可读流自动从系统底层读取数据,并由此伊芙ntEmitter接口的风波尽快将数据提须求使用

  在paused形式下,必须显式调用 stream.read()
方法来从流中读取数据片段。

  所有伊始工作方式为 paused 的 Readable 流,可以透过下边三种途径切换来flowing 方式:

监听 'data' 事件。
调用 stream.resume() 方法。
调用 stream.pipe() 方法将数据发送到 Writable。

  可读流可以经过下边途径切换来 paused 情势:

如果不存在管道目标(pipe destination),可以通过调用 stream.pause() 方法实现。
如果存在管道目标,可以通过取消 'data' 事件监听,并调用 stream.unpipe() 方法移除所有管道目标来实现。

  可读流必要先为其提供消费或忽视数据的建制,才能开端提供数据。要是消费机制被剥夺或吊销,可读流将尝试截至生成数据。

  为了向后优良,裁撤 ‘data’
事件监听并不会自行将流暂停。同时,假使存在管道目的(pipe
destination),且对象状态成为可以接收数据(drain and ask for more
data),调用了 stream.pause() 方法也并不有限支撑流会一向 保持 暂停状态。

  如若 Readable 切换来 flowing
方式,且从未消费者处理流中的多寡,这个数据将会丢掉。比如,调用了
readable.resume() 方法却绝非监听 ‘data’ 事件,或是裁撤了 ‘data’
事件监听,就有可能出现这种情状

【三种状态】

  可读流的“二种操作方式”是一种简易抽象。它抽象了在可读流完结(Readable
stream implementation)内部发生的复杂的情形管理进程。

  在随心所欲时刻,任意可读流应确切处于上面三种状态之一:

readable._readableState.flowing = null
readable._readableState.flowing = false
readable._readableState.flowing = true

  若 readable._readableState.flowing 为
null,由于不存在数据消费者,可读流将不会发出多少。

  若是监听 ‘data’ 事件,调用 readable.pipe() 方法,或者调用
readable.resume() 方法, readable._readableState.flowing 的值将会成为
true 。那时,随着数据变化,可读流开首屡屡接触事件。

  调用 readable.pause() 方法, readable.unpipe() 方法, 或者接收
“背压”(back pressure), 将招致 readable._readableState.flowing 值变为
false。 这将暂停事件流,但 不会 暂停数据变化。

  当 readable._readableState.flowing 值为 false 时,
数据或者堆积到流的内部缓存中

  可读流 API 的演化贯穿了多少个 Node.js
版本,提供了多样办法来消费流数据。经常开发者应该接纳之中一种来开支数据,而不该在单个流使用多样措施来开销数量

  对于半数以上用户,指出采取readable.pipe()方法来消费流数据,因为它是最不难易行的一种已毕。开发者借使要精细地控制数据传递和暴发的长河,可以应用伊夫ntEmitter
和 readable.pause()/readable.resume() 提供的 API 

【’close’ 事件】

  ’close’事件将在流或其底层资源(比如一个文书)关闭后触发。’close’事件触发后,该流将不会再触及任何事件

  [注意]不是所有 Readable 都会触发 ‘close’ 事件

【’data’ 事件】

chunk <Buffer> | <string> | <any> 数据片段。对于非对象模式的可读流,这是一个字符串或者 Buffer。 对于对象模式的可读流,这可以是除 null 以外的任意类型 JavaScript 值。

  ’data’ 事件会在流将数据传递给顾客时接触。当流转换来 flowing
方式时会触发该事件。调用 readable.pipe(), readable.resume() 方法,或为
‘data’ 事件添加回调可以将流转换来 flowing 方式。 ‘data’ 事件也会在调用
readable.read() 方法并有多少重返时接触。

  在没有明显暂停的流上添加’data’事件监听会将流转换为flowing情势。数据会在可用时超过传递给下个流程

  即使调用 readable.setEncoding()
方法显然为流指定了默许编码,回调函数将选拔到一个字符串,否则接收到的数目将是一个
Buffer 实例

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});

【’end’ 事件】

  ’end’ 事件将在流中再没有数量可供消费时接触。

  [注意]’end’ 事件唯有在数据被全然消费后才会触发 。
可以在数量被统统消费后,通过将流转换来 flowing 方式, 或频繁调用
stream.read() 方法来贯彻那或多或少

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
  console.log('There will be no more data.');
});

【’error’ 事件】

  ’error’ 事件可以在其他时候在可读流完结(Readable
implementation)上接触。
平时,这会在底部系统里头出错从而不可能暴发多少,或当流的兑现试图传递错误数据时发出。

  回调函数将接收到一个 Error 对象

【’readable’ 事件】

  ’readable’ 事件将在流中有数据可供读取时触发。在少数情形下,为
‘readable’ 事件添加回调将会招致有些数额被读取到内部缓存中

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  // 有一些数据可读了
});

  当到达流数据尾部时, ‘readable’ 事件也会触发。触发顺序在 ‘end’
事件从前。

  事实上, ‘readable’
事件注明流有了新的动态:要么是有了新的数量,要么是到了流的尾巴。
对于前者, stream.read() 将再次回到可用的数码。而对此后者, stream.read()
将赶回 null。 例如,上边的例子中的 foo.txt 是一个空文件:  

const fs = require('fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log('readable:', rr.read());
});
rr.on('end', () => {
  console.log('end');
});

  [注意]一般性状态下, 应该使用 readable.pipe() 方法和 ‘data’
事件编制,而不是 ‘readable’ 事件

【readable.isPaused()】

返回: <boolean>

  readable.isPaused() 方法重回可读流的当前操作意况。 该办法首假如在
readable.pipe()
方法的最底层机制中用到。大多数情景下,没有要求直接动用该情势

const readable = new stream.Readable();

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false

【readable.pause()】

  返回: this

  readable.pause() 方法将会使 flowing 格局的流为止触发 ‘data’ 事件,
进而切出 flowing 情势。任何可用的数量都将保存在内部缓存中

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});

【readable.pipe(destination[, options])】

    destination <stream.Writable> 数据写入目标
    options <Object> Pipe 选项
        end <boolean> 在 reader 结束时结束 writer 。默认为 true。

  readable.pipe() 绑定一个 Writable 到 readable 上,
将可写流自动切换来 flowing 格局并将具有数据传给绑定的
Writable。数据流将被活动管理。那样,固然是可读流较快,目标可写流也不会过分(overwhelmed)。

  下边例子将 readable 中的所有数据通过管道传递给名为 file.txt 的文件

const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// readable 中的所有数据都传给了 'file.txt'
readable.pipe(writable);

  可以在单个可读流上绑定多少个可写流。

  readable.pipe()
方法重临目的流的引用,那样就足以对流进行链式地管道操作:

const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

  默许意况下,当源可读流(the source Readable
stream)触发’end’事件时,目的流也会调用stream.end()方法从而停止写入。要禁用这一默许行为,
end选项应该指定为false,那将使目标流保持开拓, 如下所示:

reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
});

  倘使可读流在拍卖时暴发错误,目的可写流不会活动关闭。
如果暴发错误,需求手动关闭所有流以避免内存泄漏。

  [注意]不论是对 process.stderr 和 process.stdout
指定什么选项,它们都是截止 Node.js 进度退出才关闭

【readable.read([size])】

size <number> Optional argument to specify how much data to read.
Return <string> | <Buffer> | <null>

  readable.read()方法从中间缓冲区中抽出并赶回一些数目。
如果没有可读的多寡,重临null。readable.read()方法默许数据将用作“Buffer”对象回来
,除非已经拔取readable.setEncoding()方法设置编码或流运行在目标格局。

  可选的size参数指定要读取的一定数量的字节。假设size字节不可读,将再次回到null除非流已经截至,在这种场所下所有保留在中间缓冲区的数目将被重返(纵然它超过size
字节 )

  要是没有点名size参数,则内部缓冲区包括的兼具数据将回到。

  readable.read()方法只应该在刹网店模特式下的可读流上运行。在流格局下,readable.read()自动调用直到内部缓冲区的多少完全耗尽

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  let chunk;
  while (null !== (chunk = readable.read())) {
    console.log(`Received ${chunk.length} bytes of data.`);
  }
});

  一般的话,防止接纳’readable’事件和readable.read()方法,使用readable.pipe()或’data’事件代表

  无论size参数的值是什么,对象情势中的可读流将始终再次回到调用readable.read(size)的单个项目。

  [注意]倘诺readable.read()方法再次来到一个数据块,那么一个’data’事件也将被发送。在曾经被发生的’end’事件后调用stream.read([size])事件将回到null。不会抛出运行时不当

【readable.resume()】

Returns: this

  readable.resume()方法使一个显式暂停的可读流复苏发出“数据”事件,将流转换为流情势。 

  readable.
resume()方法可用以从流中完全地动用数据,而不须求实际处理任何数据,如以下示例所示:

getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  });

【readable.setEncoding(encoding)】

encoding <string> 要使用的编码
Returns: this

  readble.setEncoding() 方法会为从可读流读入的数码设置字符编码

  By default, no encoding is assigned and stream data will be returned
as Buffer objects.
设置编码会使得该流数据重返指定编码的字符串而不是Buffer对象。例如,调用readable.setEncoding(‘utf-8’)会使得出口数据作为UTF-8数据解析,并作为字符串再次回到。调用readable.setEncoding(‘hex’)使得数据被编码成16进制字符串格式。

  可读流会妥善处理多字节字符,假诺单独直接从流中取出Buffer对象,很可能会招致错误解码

const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});

【readable.unpipe([destination])】

destination <stream.Writable>  可选的特定流到unpipe

  unpipe()方法通过行使stream. pipe()方法来分别此前附加的可写流。

  即使没有点名目的地,则具有管道都是独立的。借使指定了目的地,不过尚未设置管道,则什么都不做

const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(() => {
  console.log('Stop writing to file.txt');
  readable.unpipe(writable);
  console.log('Manually close the file stream');
  writable.end();
}, 1000);

【readable.unshift(chunk)】

chunk <Buffer> | <Uint8Array> | <string> | <any> 将数据块移到读队列上。对于不以对象模式操作的流,块必须是字符串、缓冲区或Uint8Array。对于对象模式流,块可能是除了null之外的任何JavaScript值。

  unshift()方法将数据块返回到中间缓冲区中。那在一些景况下是有效的,因为在好几景况下,流被亟需“不消耗”一些数额的代码所用度,而那个多少是无忧无虑地从源代码中领到出来的,那样数据就可以传递给其它的一方。
  [注意]在“end”事件时有发生或将抛出运行时不当之后,无法调用流。使用stream.
unshift()的开发人士常常应该考虑改用转换流

// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
const StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  const decoder = new StringDecoder('utf8');
  let header = '';
  function onReadable() {
    let chunk;
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // found the header boundary
        const split = str.split(/\n\n/);
        header += split.shift();
        const remaining = split.join('\n\n');
        const buf = Buffer.from(remaining, 'utf8');
        stream.removeListener('error', callback);
        // remove the readable listener before unshifting
        stream.removeListener('readable', onReadable);
        if (buf.length)
          stream.unshift(buf);
        // now the body of the message can be read from the stream.
        callback(null, header, stream);
      } else {
        // still reading the header.
        header += str;
      }
    }
  }
}

【readable.destroy([error])】

  销毁流,并爆发“错误”。调用后,可读流将释放其余内部资源。完毕者不该覆盖此措施,而是完成readable._destroy