Node全解09_02Stream自定义

自定义Stream

Writable Stream

1
2
3
4
5
6
7
8
9
10
11
const { Writable } = require("stream");

const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});

// process.stdin 就是标准输入
process.stdin.pipe(outStream);

Readable Stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const { Readable } = require('stream')

const inStream = new Readable()

inStream.push('ABCDEFGHIJKLM')
inStream.push('NOPQRSTUVWXYZ')

inStream.push(null) // No more data

inStream.on('data', (chunk) => {
process.stdout.write(chunk)
console.log('写数据了')
})

// 把所有数据都 push进去了 然后在 pipe

对方调用 read 我们才提供数据

  • 如果你想创建一个可读的流应该是 等别人 read在推 而不是上面 那样提前push好
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const { Readable } = require("stream");

const inStream = new Readable({
read(size) {
const char = String.fromCharCode(this.currentCharCode++)
this.push(char);
console.log(`推了 ${char}`)
if (this.currentCharCode > 90) { // Z
this.push(null);
}
}
})

inStream.currentCharCode = 65 // A

inStream.pipe(process.stdout)

Duplex 流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const { Duplex } = require("stream");
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});

inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);

Transform

1
2
3
4
5
6
7
8
const { Transform } = require("stream");
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);

内置的 Transform Stream

1
2
3
4
5
6
7
const fs = require("fs");
const zlib = require("zlib");
const file = process.argv[2]; // 文件路径

fs.createReadStream(file) // 创建流 一点一点的读
.pipe(zlib.createGzip()) // 每次读就传给 gzip 压缩
.pipe(fs.createWriteStream(file + ".gz")); // 将压缩后的 gzip流 传给一个可写的流

加密 / 压缩 / 打印进入 / 完成提示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const fs = require("fs");
const zlib = require("zlib");
const file = process.argv[2];
const crypto = require("crypto");


const { Transform } = require("stream");

const reportProgress = new Transform({
transform(chunk, encoding, callback) {
process.stdout.write(".");
callback(null, chunk);
}
});


fs.createReadStream(file)
.pipe(crypto.createCipher("aes192", "123456")) // 加密
.pipe(zlib.createGzip()) // 压缩
.pipe(reportProgress) // 变换流 每次传入数据打一个点
.pipe(fs.createWriteStream(file + ".gz"))
.on("finish", () => console.log("Done")); // 完成后的后续

// 注意调用顺序
// 一定要 先加密 在 gzip 反之无法打开文件

Stream 的用途在 Node.js 里应用非常广泛

Readable Stream

  • HTTP Response
  • HTTP Request
  • fs read stream
  • zlib stream
  • TCP sockets
  • child process stdout & stderr
  • process.stdin
  • 其他

Writable Stream

  • HTTP Response
  • HTTP Request
  • fs write stream
  • zlib stream
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr
  • 其他

数据流的积压问题 Back Pressure

一定要看

highWaterMark 是干什么的?

参考

Node.js Stream 文档

面试题

例子代码仓库