Node全解09_01stream

Stream

第一个 Stream 例子

1
2
3
4
5
6
7
const fs = require('fs')
const stream = fs.createWriteStream('./big_file.txt')
for (let i = 0; i < 1000000; i++) {
stream.write(`这是第${i}行内容,我们需要很多内容,要不停的写文件啊啊啊啊啊啊啊回车\n`)
}
stream.end() // 别忘了关闭 stream
console.log('done')

分析

  • 打开流,多次往里面塞内容,关闭流
  • 看起来就是可以写多次嘛,没什么大不了
  • 最终我们得到一个 100M 左右的文件

释义

  • stream 是流,但默认没有水,
  • stream.write 可以让水流中有水(数据)
  • 每次写的小数据叫做 chunk (块)
  • 产生数据的一段叫做 source (源头)
  • 得到数据的一段叫做 sink (水池) 上面例子的水池就是big_file.txt

第二个 stream 例子

1
2
3
4
5
6
7
8
9
10
11
12
13
const fs = require('fs');
const http = require('http');
const server = http.createServer()
server.on('request', (request, response) => {
fs.readFile('./big_file.txt', (error,
data) => {
if (error) throw error
response.end(data)
console.log('done')
})
})
server.listen(8888)
console.log(8888)

运行这个例子 并在浏览器打开 http://localhost:8888/

打开活动监视器 / 任务管理器 搜索 node 查看内存占用 一下飙升到110M左右

这个例子的结论

  • 如果用户请求过来,由于big_file.txt 的大小是 100M左右,再加上 Node.js 本身占用的几兆,所以Node.js 总体占用的内存是 110兆内存
  • 一个用户占用 100M左右 10个呢? 服务器内存是不是就不够用了

第三个例子

用 stream 改写

1
2
3
4
5
6
7
8
9
const fs = require('fs');
const http = require('http');
const server = http.createServer()
server.on('request', (request, response) => {
const stream =
fs.createReadStream('./big_file.txt')
stream.pipe(response)
})
server.listen(8888)
  • 查看 Node.js 内存占用,基本不会超过 30MB
  • 文件 steam 和 response stream 是通过管道相连的
  • 流 可以使你的内存降的非常低

管道 Pipe

两个流可以用一个管道相连
stream1 的末尾接上 stream2 的开端
只要 stream1 有数据,就会流到 stream2

常用代码

1
stream1.pipe(stream2)

链式调用

1
2
3
4
5
a.pipe(b).pipe(c)

// 等价于
a.pipe(b)
b.pipe(c)

管道可以通过事件实现

1
2
3
4
5
6
7
8
// stream1 一有数据就塞给 stream2
stream1.on('data', (chunk)=>{
stream2.write(chunk)
})
// stream1 停了 ,就停了 stream2
stream1.on('end', ()=>{
stream2.end()
})

Stream 对象的原型链

s = fs.createReadStream(path)

  • 那么它的对象层级为
  • 自身属性(fs.createReadStream 构造)
  • 原型: stream.Readable.prototype
  • 二级原型: stream.Stream.prototype
  • 三级原型: events.EventEmitter.prototype
  • 四级原型: Object.prototype

Stream对象 都继承了 EventEmitter

如何查看它的原型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const fs = require('fs');
const stream = require('stream');
const events = require('events');
const s = fs.createReadStream('./big_file.txt')

console.log(s)

console.log(stream.Readable.prototype);

console.log(events.EventEmitter.prototype);

// 运行技巧
node --inspect-brk xxx.js
// 打开浏览器 点击 step over

支持的事件和方法

Readable Stream

事件

  • data,end
  • error,close,readable

方法

  • pipe()
  • unpipe()
  • wrap()
  • destroy()
  • read()
  • unshift()
  • resume() / pause()
  • isPaused()
  • setEncoding()

Writeable Stream

事件

  • drain 面试爱问

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // 假设 stream1 开始 一秒写 10M数据 后来 stream1 一秒写 1000M数据
    // 这样就会堵车 , 所有有了 drain 不堵车了
    stream1.on('data', (chunk)=>{
    const flag = stream2.write(chunk)
    if(flag === false){
    // done write 堵车了,你别写了
    }
    // 不堵车了
    stream2.on('drain',()=>{
    // go on write
    })

    })
    // stream1 停了 ,就停了 stream2
    stream1.on('end', ()=>{
    stream2.end()
    })
  • finish

  • error,close,pipe,unpipe

方法

  • write(),destroy(),end(),cork(),uncork(),setDefaultcoding()

Stream 分类

  • Readable 可读
  • Writeable 可写
  • Duplex 可读可写(双向) 同时读和写,注意 其实就是两个管道 但是两个管道没有交叉点
  • Transform 可读可写(变化) 例子 webpack 你写的是ES6 读到的是 ES5

Readable Stream

静止态 paused 和 流动态 flowing

  • 默认处于 paused 态
  • 添加 data 事件监听,它就变成 flowing
  • 删掉 data 事件监听, 它就变成 paused
  • paused() 可以让它 变为 paused
  • resume() 可以让它 变为 flowing
1
2
3
4
5
6
7
const fs = require('fs');
const stream = fs.createReadStream('./big_file.txt')
stream.pipe(response)
stream.pause();
setTimeout(()=>{
stream.resume
},3000)

Writeable Stream

drain 流干了事件

  • 表示可以加水了
  • 我们调用 stream.write(chunk)的时候,可能会得到 false
  • false 代表写的太快了,数据积压了
  • 这个时候不能在 write 了,要监听 drain
  • 等 drain 事件触发了,才能继续 write
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
const fs = require('fs')

function writeOneMillionTimes(writer, data) {
let i = 1000000
write()

function write() {
let ok = true
do {
i--
if (i === 0) {
// Last time!
writer.write(data)
} else {
// See if we should continue, or wait.
// Don't pass the callback, because we're not done yet.
ok = writer.write(data)
if (ok === false) {
console.log('不能再写了')
}
}
} while (i > 0 && ok)
if (i > 0) {
// Had to stop early!
// Write some more once it drains.
writer.once('drain', () => {
console.log('干涸了')
write()
})
}
}
}

const writer = fs.createWriteStream('./big_file.txt')
writeOneMillionTimes(writer, 'hello world')

finish 事件

  • 调用 steam.end() 之后 而且
  • 缓冲区数据已经传给底层系统之后
  • 触发 finish 事件