rintStream的用法
马克-to-win:从学java第一天,我们就经常用到System.out.println(),实际上查阅文档可知,System.out就是Sun 编的一个PrintStream的实例对象。PrintStream顾名思义,Sun编它,就是用来打印的,以各种各样的格式,打印各种各样的数据,(boolean,char,double,float)。下面的例子就介绍了println(int x),print(String)和print(char c)的用法。马克- to-win:马克 java社区:防盗版实名手机尾号: 73203。
例:1.2.1
import java.io.*;
public class TestMark_to_win {
public static void main(String args[]) throws Exception {
byte inp[]=new byte[3];
inp[0]=97;inp[1]=98;inp[2]=99;
for (int i=0; i < 3; i++) {
/*there is no such method as println(Byte x), only sun. have the
following public void println(int x) if you want to print out
"a", use System.out.println((char)inp[i]);*/
System.out.println(inp[i]);
}
for (int i=0; i < 3; i++) {
/*public void print(char c)Print a character.*/
System.out.println((char) inp[i]);
}
char c='z';
System.out.println(c);
String s="我们是good123";
System.out.println(s);
double d=3.14;
System.out.println(d);
}
}
结果是:
97
98
99
a
b
c
z
我们是good123
3.14
例:1.2.2
import java.io.*;
public class TestMark_to_win {
public static void main(String args[]) throws Exception {
String m="qi hello bye97我们";
FileOutputStream f2=new FileOutputStream("i:/4.txt");
PrintStream ps=new PrintStream(f2);
/*void println(String x) Print a String and then terminate the line.
*/
篇幅有限更多请见扩展链接:http://www.mark-to-win.com/tutorial/java_8_UsageOfPrintStream.html
文转自 “美团点评技术博客” http://tech.meituan.com/stream-internals.html
上篇(基础篇)主要介绍了Stream的基本概念和用法,本篇将深入剖析背后工作原理,重点是如何实现流式数据处理和back pressure机制。
目录
本篇介绍stream是如何实现流式数据处理的。
数据生产和消耗的媒介
为什么使用流取数据
如何通过流取到数据
read
push方法
end事件
readable事件
doRead
howMuchToRead
数据的流式消耗
数据消耗模式
暂停模式
流动模式
背压反馈机制
pipe
消耗驱动的数据生产
数据生产和消耗的媒介
为什么使用流取数据
下面是一个读取文件内容的例子:
const fs=require('fs')
fs.readFile(file, function (err, body) { console.log(body) console.log(body.toString())
})
但如果文件内容较大,譬如在440M时,执行上述代码的输出为:
<Buffer 64 74 09 75 61 09 63 6f 75 6e 74 0a 0a 64 74 09 75 61 09 63 6f 75 6e 74 0a 32 30 31 35 31 32 30 38 09 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 63 6f 6d ... >
buffer.js:382
throw new Error('toString failed');
^
Error: toString failed
at Buffer.toString (buffer.js:382:11)
报错的原因是body
这个Buffer
对象的长度过大,导致toString
方法失败。
可见,这种一次获取全部内容的做法,不适合操作大文件。
可以考虑使用流来读取文件内容。
const fs=require('fs')
fs.createReadStream(file).pipe(process.stdout)
fs.createReadStream
创建一个可读流,连接了源头(上游,文件)和消耗方(下游,标准输出)。
执行上面代码时,流会逐次调用fs.read
,将文件中的内容分批取出传给下游。
在文件看来,它的内容被分块地连续取走了。
在下游看来,它收到的是一个先后到达的数据序列。
如果不需要一次操作全部内容,它可以处理完一个数据便丢掉。
在流看来,任一时刻它都只存储了文件中的一部分数据,只是内容在变化而已。
这种情况就像是用水管去取池子中的水。
每当用掉一点水,水管便会从池子中再取出一点。
无论水池有多大,都只存储了与水管容积等量的水。
如何通过流取到数据
用Readable
创建对象readable
后,便得到了一个可读流。
如果实现_read
方法,就将流连接到一个底层数据源。
流通过调用_read
向底层请求数据,底层再调用流的push
方法将需要的数据传递过来。
当readable
连接了数据源后,下游便可以调用readable.read(n)
向流请求数据,同时监听readable
的data
事件来接收取到的数据。
这个流程可简述为:
read
read
方法中的逻辑可用下图表示,后面几节将对该图中各环节加以说明。
push方法
消耗方调用read(n)
促使流输出数据,而流通过_read()
使底层调用push
方法将数据传给流。
如果流在流动模式下(state.flowing
为true
)输出数据,数据会自发地通过data
事件输出,不需要消耗方反复调用read(n)
。
如果调用push
方法时缓存为空,则当前数据即为下一个需要的数据。
这个数据可能先添加到缓存中,也可能直接输出。
执行read
方法时,在调用_read
后,如果从缓存中取到了数据,就以data
事件输出。
所以,如果_read
异步调用push
时发现缓存为空,则意味着当前数据是下一个需要的数据,且不会被read
方法输出,应当在push
方法中立即以data
事件输出。
因此,上图中“立即输出”的条件是:
state.flowing && state.length===0 && !state.sync
end事件
由于流是分次向底层请求数据的,需要底层显示地告诉流数据是否取完。
所以,当某次(执行_read()
)取数据时,调用了push(null)
,就意味着底层数据取完。
此时,流会设置state.ended
。
state.length
表示缓存中当前的数据量。
只有当state.length
为0
,且state.ended
为true
,才意味着所有的数据都被消耗了。
一旦在执行read(n)
时检测到这个条件,便会触发end
事件。
当然,这个事件只会触发一次。
readable事件
在调用完_read()
后,read(n)
会试着从缓存中取数据。
如果_read()
是异步调用push
方法的,则此时缓存中的数据量不会增多,容易出现数据量不够的现象。
如果read(n)
的返回值为null
,说明这次未能从缓存中取出所需量的数据。
此时,消耗方需要等待新的数据到达后再次尝试调用read
方法。
在数据到达后,流是通过readable
事件来通知消耗方的。
在此种情况下,push
方法如果立即输出数据,接收方直接监听data
事件即可,否则数据被添加到缓存中,需要触发readable
事件。
消耗方必须监听这个事件,再调用read
方法取得数据。
doRead
流中维护了一个缓存,当缓存中的数据足够多时,调用read()
不会引起_read()
的调用,即不需要向底层请求数据。
用doRead
来表示read(n)
是否需要向底层取数据,其逻辑为:
var doRead=state.needReadableif (state.length===0 || state.length - n < state.highWaterMark) {
doRead=true}if (state.ended || state.reading) {
doRead=false}if (doRead) {
state.reading=true
state.sync=true
if (state.length===0) {
state.needReadable=true
} this._read(state.highWaterMark)
state.sync=false}
state.reading
标志上次从底层取数据的操作是否已完成。
一旦push
方法被调用,就会设置为false
,表示此次_read()
结束。
state.highWaterMark
是给缓存大小设置的一个上限阈值。
如果取走n
个数据后,缓存中保有的数据不足这个量,便会从底层取一次数据。
howMuchToRead
调用read(n)
去取n
个数据时,m=howMuchToRead(n)
是将从缓存中实际获取的数据量。
根据以下几种情况赋值,一旦确定则立即返回:
state.length
为0,state.ended
为true
。
数据源已枯竭,且缓存为空,无数据可取,m
为0.
state.objectMode
为true
。
n
为0,则m
为0;
否则m
为1,将缓存的第一个元素输出。
n
是数字。
若n <=0
,则m
为0;
若n > state.length
,表示缓存中数据量不够。
此时如果还有数据可读(state.ended
为false
),则m
为0,同时设置state.needReadable
,下次执行read()
时doRead
会为true
,将从底层再取数据。
如果已无数据可读(state.ended
为true
),则m
为state.length
,将剩下的数据全部输出。
若0 < n <=state.length
,则缓存中数据够用,m
为n
。
其它情况。
state.flowing
为true
(流动模式),则m
为缓存中第一个元素(Buffer
)的长度,实则还是将第一个元素输出;
否则m
为state.length
,将缓存读空。
上面的规则中:
n
通常是undefined
或0
,即不指定读取的字节数。
read(0)
不会有数据输出,但从前面对doRead
的分析可以看出,是有可能从底层读取数据的。
执行read()
时,由于流动模式下数据会不断输出,所以每次只输出缓存中第一个元素输出,而非流动模式则会将缓存读空。
objectMode
为true
时,m
为0
或1
。此时,一次push()
对应一次data
事件。
综上所述:
可读流是获取底层数据的工具,消耗方通过调用read
方法向流请求数据,流再从缓存中将数据返回,或以data
事件输出。
如果缓存中数据不够,便会调用_read
方法去底层取数据。
该方法在拿到底层数据后,调用push
方法将数据交由流处理(立即输出或存入缓存)。
可以结合readable
事件和read
方法来将数据全部消耗,这是暂停模式的消耗方法。
但更常见的是在流动模式下消耗数据,具体见后面的章节。
数据的流式消耗
所谓“流式数据”,是指按时间先后到达的数据序列。
数据消耗模式
可以在两种模式下消耗可读流中的数据:暂停模式(paused mode)和流动模式(flowing mode)。
流动模式下,数据会源源不断地生产出来,形成“流动”现象。
监听流的data
事件便可进入该模式。
暂停模式下,需要显示地调用read()
,触发data
事件。
可读流对象readable
中有一个维护状态的对象,readable._readableState
,这里简称为state
。
其中有一个标记,state.flowing
, 可用来判别流的模式。
它有三种可能值:
true
。流动模式。
false
。暂停模式。
null
。初始状态。
调用readable.resume()
可使流进入流动模式,state.flowing
被设为true
。
调用readable.pause()
可使流进入暂停模式,state.flowing
被设为false
。
暂停模式
在初始状态下,监听data
事件,会使流进入流动模式。
但如果在暂停模式下,监听data
事件并不会使它进入流动模式。
为了消耗流,需要显示调用read()
方法。
const Readable=require('stream').Readable// 底层数据const dataSource=['a', 'b', 'c']const readable=Readable()
readable._read=function () { if (dataSource.length) { this.push(dataSource.shift())
} else { this.push(null)
}
}// 进入暂停模式readable.pause()
readable.on('data', data=> process.stdout.write('\ndata: ' + data))var data=readable.read()while (data !==null) {
process.stdout.write('\nread: ' + data)
data=readable.read()
}
执行上面的脚本,输出如下:
data: a
read: a
data: b
read: b
data: c
read: c
可见,在暂停模式下,调用一次read
方法便读取一次数据。
执行read()
时,如果缓存中数据不够,会调用_read()
去底层取。
_read
方法中可以同步或异步地调用push(data)
来将底层数据交给流处理。
在上面的例子中,由于是同步调用push
方法,数据会添加到缓存中。
read
方法在执行完_read
方法后,便从缓存中取数据,再返回,且以data
事件输出。
如果改成异步调用push
方法,则由于_read()
执行完后,数据来不及放入缓存,
将出现read()
返回null
的现象。
见下面的示例:
const Readable=require('stream').Readable// 底层数据const dataSource=['a', 'b', 'c']const readable=Readable()
readable._read=function () {
process.nextTick(()=> { if (dataSource.length) { this.push(dataSource.shift())
} else { this.push(null)
}
})
}
readable.pause()
readable.on('data', data=> process.stdout.write('\ndata: ' + data))while (null !==readable.read()) ;
执行上述脚本,可以发现没有任何数据输出。
此时,需要使用readable
事件:
const Readable=require('stream').Readable// 底层数据const dataSource=['a', 'b', 'c']const readable=Readable()
readable._read=function () {
process.nextTick(()=> { if (dataSource.length) { this.push(dataSource.shift())
} else { this.push(null)
}
})
}
readable.pause()
readable.on('data', data=> process.stdout.write('\ndata: ' + data))
readable.on('readable', function () { while (null !==readable.read()) ;;
})
输出:
data: a
data: b
data: c
当read()
返回null
时,意味着当前缓存数据不够,而且底层数据还没加进来(异步调用push()
)。
此种情况下state.needReadable
会被设置为true
。
push
方法被调用时,由于是暂停模式,不会立即输出数据,而是将数据放入缓存,并触发一次readable
事件。
所以,一旦read
被调用,上面的例子中就会形成一个循环:readable
事件导致read
方法调用,read
方法又触发readable
事件。
首次监听readable
事件时,还会触发一次read(0)
的调用,从而引起_read
和push
方法的调用,从而启动循环。
总之,在暂停模式下需要使用readable
事件和read
方法来消耗流。
流动模式
流动模式使用起来更简单一些。
一般创建流后,监听data
事件,或者通过pipe
方法将数据导向另一个可写流,即可进入流动模式开始消耗数据。
尤其是pipe
方法中还提供了back pressure机制,所以使用pipe
进入流动模式的情况非常普遍。
本节解释data
事件如何能触发流动模式。
先看一下Readable
是如何处理data
事件的监听的:
Readable.prototype.on=function (ev, fn) { var res=Stream.prototype.on.call(this, ev, fn) if (ev==='data' && false !==this._readableState.flowing) { this.resume()
} // 处理readable事件的监听
// 省略
return res
}
Stream
继承自EventEmitter
,且是Readable
的父类。
从上面的逻辑可以看出,在将fn
加入事件队列后,如果发现处于非暂停模式,则会调用this.resume()
,开始流动模式。
resume()
方法先将state.flowing
设为true
,
然后会在下一个tick中执行flow
,试图将缓存读空:
if (state.flowing) do { var chunk=stream.read()
} while (null !==chunk && state.flowing)
flow
中每次read()
都可能触发push()
的调用,
而push()
中又可能触发flow()
或read()
的调用,
这样就形成了数据生生不息的流动。
其关系可简述为:
下面再详细看一下push()
的两个分支:
if (state.flowing && state.length===0 && !state.sync) {
stream.emit('data', chunk)
stream.read(0)
} else {
state.length +=state.objectMode ? 1 : chunk.length
state.buffer.push(chunk) if (state.needReadable)
emitReadable(stream)
}
称第一个分支为立即输出。
在立即输出的情况下,输出数据后,执行read(0)
,进一步引起_read()
和push()
的调用,从而使数据源源不断地输出。
在非立即输出的情况下,数据先被添加到缓存中。
此时有两种情况:
state.length
为0。
这时,在调用_read()
前,state.needReadable
就会被设为true
。
因此,一定会调用emitReadable()
。
这个方法会在下一个tick中触发readable
事件,同时再调用flow()
,从而形成流动。
state.length
不为0。
由于流动模式下,每次都是从缓存中取第一个元素,所以这时read()
返回值一定不为null
。
故flow()
中的循环还在继续。
此外,从push()
的两个分支可以看出来,如果state.flowing
设为false
,第一个分支便不会再进去,也就不会再调用read(0)
。
同时第二个分支中引发flow
的调用后,也不会再调用read()
,这就完全暂停了底层数据的读取。
事实上,pause
方法就是这样使流从流动模式转换到暂停模式的。
背压反馈机制
考虑下面的例子:
const fs=require('fs')
fs.createReadStream(file).on('data', doSomething)
监听data
事件后文件中的内容便立即开始源源不断地传给doSomething()
。
如果doSomething
处理数据较慢,就需要缓存来不及处理的数据data
,占用大量内存。
理想的情况是下游消耗一个数据,上游才生产一个新数据,这样整体的内存使用就能保持在一个水平。
Readable
提供pipe
方法,用来实现这个功能。
pipe
用pipe
方法连接上下游:
const fs=require('fs')
fs.createReadStream(file).pipe(writable)
writable
是一个可写流Writable
对象,上游调用其write
方法将数据写入其中。
writable
内部维护了一个写队列,当这个队列长度达到某个阈值(state.highWaterMark
)时,
执行write()
时返回false
,否则返回true
。
于是上游可以根据write()
的返回值在流动模式和暂停模式间切换:
readable.on('data', function (data) { if (false===writable.write(data)) {
readable.pause()
}
})
writable.on('drain', function () {
readable.resume()
})
上面便是pipe
方法的核心逻辑。
当write()
返回false
时,调用readable.pause()
使上游进入暂停模式,不再触发data
事件。
但是当writable
将缓存清空时,会触发一个drain
事件,再调用readable.resume()
使上游进入流动模式,继续触发data
事件。
看一个例子:
const stream=require('stream')var c=0const readable=stream.Readable({
highWaterMark: 2,
read: function () {
process.nextTick(()=> { var data=c < 6 ? String.fromCharCode(c + 65) : null
console.log('push', ++c, data) this.push(data)
})
}
})const writable=stream.Writable({
highWaterMark: 2,
write: function (chunk, enc, next) { console.log('write', chunk)
}
})
readable.pipe(writable)
输出:
push 1 A
write <Buffer 41>
push 2 B
push 3 C
push 4 D
虽然上游一共有6个数据(ABCDEF
)可以生产,但实际只生产了4个(ABCD
)。
这是因为第一个数据(A
)迟迟未能写完(未调用next()
),所以后面通过write
方法添加进来的数据便被缓存起来。
下游的缓存队列到达2时,write
返回false
,上游切换至暂停模式。
此时下游保存了AB
。
由于Readable
总是缓存state.highWaterMark
这么多的数据,所以上游保存了CD
。
从而一共生产出来ABCD
四个数据。
下面使用tick-node将Readable
的debug信息按tick分组:
? NODE_DEBUG=stream tick-node pipe.js
STREAM 18930: pipe count=1 opts=undefined
STREAM 18930: resume
---------- TICK 1 ----------
STREAM 18930: resume read 0
STREAM 18930: read 0
STREAM 18930: need readable false
STREAM 18930: length less than watermark true
STREAM 18930: do read
STREAM 18930: flow true
STREAM 18930: read undefined
STREAM 18930: need readable true
STREAM 18930: length less than watermark true
STREAM 18930: reading or ended false
---------- TICK 2 ----------
push 1 A
STREAM 18930: ondata
write <Buffer 41>
STREAM 18930: read 0
STREAM 18930: need readable true
STREAM 18930: length less than watermark true
STREAM 18930: do read
---------- TICK 3 ----------
push 2 B
STREAM 18930: ondata
STREAM 18930: call pause flowing=true
STREAM 18930: pause
STREAM 18930: read 0
STREAM 18930: need readable true
STREAM 18930: length less than watermark true
STREAM 18930: do read
---------- TICK 4 ----------
push 3 C
STREAM 18930: emitReadable false
STREAM 18930: emit readable
STREAM 18930: flow false
---------- TICK 5 ----------
STREAM 18930: maybeReadMore read 0
STREAM 18930: read 0
STREAM 18930: need readable false
STREAM 18930: length less than watermark true
STREAM 18930: do read
---------- TICK 6 ----------
push 4 D
---------- TICK 7 ----------
TICK 0: readable.resume()
TICK 1: readable
在流动模式下开始从底层读取数据
TICK 2: A
被输出,同时执行readable.read(0)
。
TICK 3: B
被输出,同时执行readable.read(0)
。
writable.write('B')
返回false
。
执行readable.pause()
切换至暂停模式。
TICK 4: TICK 3中read(0)
引起push('C')
的调用,C
被加到readable
缓存中。
此时,writable
中有A
和B
,readable
中有C
。
这时已在暂停模式,但在readable.push('C')
结束前,发现缓存中只有1个数据,小于设定的highWaterMark
(2),故准备在下一个tick再读一次数据。
TICK 5: 调用read(0)
从底层取数据。
TICK 6: push('D')
,D
被加到readable
缓存中。
此时,writable
中有A
和B
,readable
中有C
和D
。
readable
缓存中有2个数据,等于设定的highWaterMark
(2),不再从底层读取数据。
可以认为,随着下游缓存队列的增加,上游写数据时受到的阻力变大。
这种back pressure大到一定程度时上游便停止写,等到back pressure降低时再继续。
消耗驱动的数据生产
使用pipe()
时,数据的生产和消耗形成了一个闭环。
通过负反馈调节上游的数据生产节奏,事实上形成了一种所谓的拉式流(pull stream)。
用喝饮料来说明拉式流和普通流的区别的话,普通流就像是将杯子里的饮料往嘴里倾倒,动力来源于上游,数据是被推往下游的;拉式流则是用吸管去喝饮料,动力实际来源于下游,数据是被拉去下游的。
所以,使用拉式流时,是“按需生产”。
如果下游停止消耗,上游便会停止生产。
所有缓存的数据量便是两者的阈值和。
当使用Transform
作为下游时,尤其需要注意消耗。
const stream=require('stream')var c=0const readable=stream.Readable({
highWaterMark: 2,
read: function () {
process.nextTick(()=> { var data=c < 26 ? String.fromCharCode(c++ + 97) : null
console.log('push', data) this.push(data)
})
}
})const transform=stream.Transform({
highWaterMark: 2,
transform: function (buf, enc, next) { console.log('transform', buf)
next(null, buf)
}
})
readable.pipe(transform)
以上代码执行结果为:
push a
transform <Buffer 61>
push b
transform <Buffer 62>
push c
push d
push e
push f
可见,并没有将26个字母全生产出来。
Transform
中有两个缓存:可写端的缓存和可读端的缓存。
调用transform.write()
时,如果可读端缓存未满,数据会经过变换后加入到可读端的缓存中。
当可读端缓存到达阈值后,再调用transform.write()
则会将写操作缓存到可写端的缓存队列。
当可写端的缓存队列也到达阈值时,transform.write()
返回false
,上游进入暂停模式,不再继续transform.write()
。
所以,上面的transform
中实际存储了4个数据,ab
在可读端(经过了_transform
的处理),cd
在可写端(还未经过_transform
处理)。
此时,由前面一节的分析可知,readable
将缓存ef
,之后便不再生产数据。
这三个缓存加起来的长度恰好为6,所以一共就生产了6个数据。
要想将26个数据全生产出来,有两种做法。
第一种是消耗transform
中可读端的缓存,以拉动上游的生产:
readable.pipe(transform).pipe(process.stdout)
第二种是,不要将数据存入可读端中,这样可读端的缓存便会一直处于数据不足状态,上游便会源源不断地生产数据:
const transform=stream.Transform({
highWaterMark: 2,
transform: function (buf, enc, next) {
next()
}
})
参考文献
GitHub,substack/browserify-handbook
GitHub,zoubin/streamify-your-node-program
阅读更多技术类文章,请关注微信公众号 “美团点评技术团队”
文转自 “美团点评技术团队” http://tech.meituan.com/stream-in-action.html
背景
前面两篇(基础篇和进阶篇)主要介绍流的基本用法和原理,本篇从应用的角度,介绍如何使用管道进行程序设计,主要内容包括:
管道的概念
Browserify的管道设计
Gulp的管道设计
两种管道设计模式比较
实例
Pipeline
所谓“管道”,指的是通过a.pipe(b)
的形式连接起来的多个Stream对象的组合。
假如现在有两个Transform
:bold
和red
,分别可将文本流中某些关键字加粗和飘红。
可以按下面的方式对文本同时加粗和飘红:
// source: 输入流// dest: 输出目的地source.pipe(bold).pipe(red).pipe(dest)
bold.pipe(red)
便可以看作一个管道,输入流先后经过bold
和red
的变换再输出。
但如果这种加粗且飘红的功能的应用场景很广,我们期望的使用方式是:
// source: 输入流// dest: 输出目的地// pipeline: 加粗且飘红source.pipe(pipeline).pipe(dest)
此时,pipeline
封装了bold.pipe(red)
,从逻辑上来讲,也称其为管道。
其实现可简化为:
var pipeline=new Duplex()var streams=pipeline._streams=[bold, red]// 底层写逻辑:将数据写入管道的第一个Stream,即boldpipeline._write=function (buf, enc, next) {
streams[0].write(buf, enc, next)
}// 底层读逻辑:从管道的最后一个Stream(即red)中读取数据pipeline._read=function () { var buf var reads=0
var r=streams[streams.length - 1] // 将缓存读空
while ((buf=r.read()) !==null) {
pipeline.push(buf)
reads++
} if (reads===0) { // 缓存本来为空,则等待新数据的到来
r.once('readable', function () {
pipeline._read()
})
}
}// 将各个Stream组合起来(此处等同于`bold.pipe(red)`)streams.reduce(function (r, next) {
r.pipe(next) return next
})
往pipeline
写数据时,数据直接写入bold
,再流向red
,最后从pipeline
读数据时再从red
中读出。
如果需要在中间新加一个underline
的Stream,可以:
pipeline._streams.splice(1, 0, underline)
bold.unpipe(red)
bold.pipe(underline).pipe(red)
如果要将red
替换成green
,可以:
// 删除redpipeline._streams.pop()
bold.unpipe(red)// 添加greenpipeline._streams.push(green)
bold.pipe(green)
可见,这种管道的各个环节是可以修改的。
stream-splicer对上述逻辑进行了进一步封装,提供splice
、push
、pop
等方法,使得pipeline
可以像数组那样被修改:
var splicer=require('stream-splicer')var pipeline=splicer([bold, red])// 在中间添加underlinepipeline.splice(1, 0, underline)// 删除redpipeline.pop()// 添加greenpipeline.push(green)
labeled-stream-splicer在此基础上又添加了使用名字替代下标进行操作的功能:
var splicer=require('labeled-stream-splicer')var pipeline=splicer([ 'bold', bold, 'red', red,
])// 在`red`前添加underlinepipeline.splice('red', 0, underline)// 删除`bold`pipeline.splice('bold', 1)
由于pipeline
本身与其各个环节一样,也是一个Stream对象,因此可以嵌套:
var splicer=require('labeled-stream-splicer')var pipeline=splicer([ 'style', [ bold, red ], 'insert', [ comma ],
])
pipeline.get('style') // 取得管道:[bold, red]
.splice(1, 0, underline) // 添加underline
Browserify
Browserify的功能介绍可见substack/browserify-handbook,其核心逻辑的实现在于管道的设计:
var splicer=require('labeled-stream-splicer')var pipeline=splicer.obj([ // 记录输入管道的数据,重建管道时直接将记录的数据写入。
// 用于像watch时需要多次打包的情况
'record', [ this._recorder() ], // 依赖解析,预处理
'deps', [ this._mdeps ], // 处理JSON文件
'json', [ this._json() ], // 删除文件前面的BOM
'unbom', [ this._unbom() ], // 删除文件前面的`#!`行
'unshebang', [ this._unshebang() ], // 语法检查
'syntax', [ this._syntax() ], // 排序,以确保打包结果的稳定性
'sort', [ depsSort(dopts) ], // 对拥有同样内容的模块去重
'dedupe', [ this._dedupe() ], // 将id从文件路径转换成数字,避免暴露系统路径信息
'label', [ this._label(opts) ], // 为每个模块触发一次dep事件
'emit-deps', [ this._emitDeps() ], 'debug', [ this._debug(opts) ], // 将模块打包
'pack', [ this._bpack ], // 更多自定义的处理
'wrap', [],
])
每个模块用row
表示,定义如下:
{ // 模块的唯一标识
id: id, // 模块对应的文件路径
file: '/path/to/file', // 模块内容
source: '', // 模块的依赖
deps: { // `require(expr)`
expr: id,
}
}
在wrap
阶段前,所有的阶段都处理这样的对象流,且除pack
外,都输出这样的流。
有的补充row
中的一些信息,有的则对这些信息做一些变换,有的只是读取和输出。
一般row
中的source
、deps
内容都是在deps
阶段解析出来的。
下面提供一个修改Browserify管道的函数。
var Transform=require('stream').Transform// 创建Transform对象function through(write, end) { return Transform({
transform: write,
flush: end,
})
}// `b`为Browserify实例// 该插件可打印出打包时间function log(b) { // watch时需要重新打包,整个pipeline会被重建,所以也要重新修改
b.on('reset', reset) // 修改当前pipeline
reset() function reset () { var time=null
var bytes=0
b.pipeline.get('record').on('end', function () { // 以record阶段结束为起始时刻
time=Date.now()
}) // `wrap`是最后一个阶段,在其后添加记录结束时刻的Transform
b.pipeline.get('wrap').push(through(write, end)) function write (buf, enc, next) { // 累计大小
bytes +=buf.length this.push(buf)
next()
} function end () { // 打包时间
var delta=Date.now() - time
b.emit('time', delta)
b.emit('bytes', bytes)
b.emit('log', bytes + ' bytes written ('
+ (delta / 1000).toFixed(2) + ' seconds)'
) this.push(null)
}
}
}var fs=require('fs')var browserify=require('browserify')var b=browserify(opts)// 应用插件b.plugin(log)
b.bundle().pipe(fs.createWriteStream('bundle.js'))
事实上,这里的b.plugin(log)
就是直接执行了log(b)
。
在插件中,可以修改b.pipeline
中的任何一个环节。
因此,Browserify本身只保留了必要的功能,其它都由插件去实现,如watchify、factor-bundle等。
除了了上述的插件机制外,Browserify还有一套Transform机制,即通过b.transform(transform)
可以新增一些文件内容预处理的Transform。
预处理是发生在deps
阶段的,当模块文件内容被读出来时,会经过这些Transform处理,然后才做依赖解析,如babelify、envify。
Gulp
Gulp的核心逻辑分成两块:任务调度与文件处理。
任务调度是基于orchestrator,而文件处理则是基于vinyl-fs。
类似于Browserify提供的模块定义(用row
表示),vinyl-fs也提供了文件定义(vinyl对象)。
Browserify的管道处理的是row
流,Gulp管道处理vinyl流:
gulp.task('scripts', ['clean'], function() { // Minify and copy all JavaScript (except vendor scripts)
// with sourcemaps all the way down
return gulp.src(paths.scripts)
.pipe(sourcemaps.init())
.pipe(coffee())
.pipe(uglify())
.pipe(concat('all.min.js'))
.pipe(sourcemaps.write())
.pipe(gulp.dest('build/js'));
});
任务中创建的管道起始于gulp.src
,终止于gulp.dest
,中间有若干其它的Transform(插件)。
如果与Browserify的管道对比,可以发现Browserify是确定了一条具有完整功能的管道,而Gulp本身只提供了创建vinyl流和将vinyl流写入磁盘的工具,管道中间经历什么全由用户决定。
这是因为任务中做什么,是没有任何限制的,文件处理也只是常见的情况,并非一定要用gulp.src
与gulp.dest
。
两种模式比较
Browserify与Gulp都借助管道的概念来实现插件机制。
Browserify定义了模块的数据结构,提供了默认的管道以处理这样的数据流,而插件可用来修改管道结构,以定制处理行为。
Gulp虽也定义了文件的数据结构,但只提供产生、消耗这种数据流的接口,完全由用户通过插件去构造处理管道。
当明确具体的处理需求时,可以像Browserify那样,构造一个基本的处理管道,以提供插件机制。
如果需要的是实现任意功能的管道,可以如Gulp那样,只提供数据流的抽象。
实例
本节中实现一个针对Git仓库自动生成changelog的工具,完整代码见ezchangelog。
ezchangelog的输入为git log
生成的文本流,输出默认为markdown格式的文本流,但可以修改为任意的自定义格式。
输入示意:
commit 9c5829ce45567bedccda9beb7f5de17574ea9437
Author: zoubin <zoubin04@gmail.com>
Date: Sat Nov 7 18:42:35 2015 +0800
CHANGELOG
commit 3bf9055b732cc23a9c14f295ff91f48aed5ef31a
Author: zoubin <zoubin04@gmail.com>
Date: Sat Nov 7 18:41:37 2015 +0800
4.0.3
commit 87abe8e12374079f73fc85c432604642059806ae
Author: zoubin <zoubin04@gmail.com>
Date: Sat Nov 7 18:41:32 2015 +0800
fix readme
add more tests
输出示意:
* [[`9c5829c`](https://github.com/zoubin/ezchangelog/commit/9c5829c)] CHANGELOG## [v4.0.3](https://github.com/zoubin/ezchangelog/commit/3bf9055) (2015-11-07)* [[`87abe8e`](https://github.com/zoubin/ezchangelog/commit/87abe8e)] fix readme add more tests
其实需要的是这样一个pipeline
:
source.pipe(pipeline).pipe(dest)
可以分为两个阶段:
parse:从输入文本流中解析出commit信息
format: 将commit流变换为文本流
默认的情况下,要想得到示例中的markdown,需要解析出每个commit的sha1、日期、消息、是否为tag。
定义commit的格式如下:
{
commit: { // commit sha1
long: '3bf9055b732cc23a9c14f295ff91f48aed5ef31a',
short: '3bf9055',
},
committer: { // commit date
date: new Date('Sat Nov 7 18:41:37 2015 +0800'),
}, // raw message lines
messages: ['', ' 4.0.3', ''], // raw headers before the messages
headers: [
['Author', 'zoubin <zoubin04@gmail.com>'],
['Date', 'Sat Nov 7 18:41:37 2015 +0800'],
], // the first non-empty message line
subject: '4.0.3', // other message lines
body: '', // git tag
tag: 'v4.0.3', // link to the commit. opts.baseUrl should be specified.
url: 'https://github.com/zoubin/ezchangelog/commit/3bf9055',
}
于是有:
var splicer=require('labeled-stream-splicer')
pipeline=splicer.obj([ 'parse', [ // 按行分隔
'split', split(), // 生成commit对象,解析出sha1和日期
'commit', commit(), // 解析出tag
'tag', tag(), // 解析出url
'url', url({ baseUrl: opts.baseUrl }),
], 'format', [ // 将commit组合成markdown文本
'markdownify', markdownify(),
],
])
至此,基本功能已经实现。
现在将其封装并提供插件机制。
function Changelog(opts) {
opts=opts || {} this._options=opts // 创建pipeline
this.pipeline=splicer.obj([ 'parse', [ 'split', split(), 'commit', commit(), 'tag', tag(), 'url', url({ baseUrl: opts.baseUrl }),
], 'format', [ 'markdownify', markdownify(),
],
]) // 应用插件
;[].concat(opts.plugin).filter(Boolean).forEach(function (p) { this.plugin(p)
}, this)
}
Changelog.prototype.plugin=function (p, opts) { if (Array.isArray(p)) {
opts=p[1]
p=p[0]
} // 执行插件函数,修改pipeline
p(this, opts) return this}
上面的实现提供了两种方式来应用插件。
一种是通过配置传入,另一种是创建实例后再调用plugin
方法,本质一样。
为了使用方便,还可以简单封装一下。
function changelog(opts) { return new Changelog(opts).pipeline
}
这样,就可以如下方式使用:
source.pipe(changelog()).pipe(dest)
这个已经非常接近我们的预期了。
现在来开发一个插件,修改默认的渲染方式。
var through=require('through2')function customFormatter(c) { // c是`Changelog`实例
// 添加解析author的transform
c.pipeline.get('parse').push(through.obj(function (ci, enc, next) { // parse the author name from: 'zoubin <zoubin04@gmail.com>'
ci.committer.author=ci.headers[0][1].split(/\s+/)[0]
next(null, ci)
})) // 替换原有的渲染
c.pipeline.get('format').splice('markdownify', 1, through.obj(function (ci, enc, next) { var sha1=ci.commit.short
sha1='[`' + sha1 + '`](' + c._options.baseUrl + sha1 + ')'
var date=ci.committer.date.toISOString().slice(0, 10)
next(null, '* ' + sha1 + ' ' + date + ' @' + ci.committer.author + '\n')
}))
}
source
.pipe(changelog({
baseUrl: 'https://github.com/zoubin/ezchangelog/commit/',
plugin: [customFormatter],
}))
.pipe(dest)
同样的输入,输出将会是:
* [`9c5829c`](https://github.com/zoubin/ezchangelog/commit/9c5829c) 2015-11-07 @zoubin* [`3bf9055`](https://github.com/zoubin/ezchangelog/commit/3bf9055) 2015-11-07 @zoubin* [`87abe8e`](https://github.com/zoubin/ezchangelog/commit/87abe8e) 2015-11-07 @zoubin
可以看出,通过创建可修改的管道,ezchangelog保持了本身逻辑的单一性,同时又提供了强大的自定义空间。
参考文献
GitHub,substack/browserify-handbook
GitHub,zoubin/streamify-your-node-program
查看更多技术类文章,请关注微信公众号:美团点评技术团队。
*请认真填写需求信息,我们会在24小时内与您取得联系。