You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
209 lines
6.4 KiB
209 lines
6.4 KiB
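// Write data to it, and it will emit that data in fixed-size blocks
// (512 bytes unless a different size is passed to the constructor).
// If you .end() or .flush(), it emits whatever it has buffered,
// padded with nulls to a whole block unless the `nopad` option is set.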
|
|
|
module.exports = BlockStream |
|
|
|
var Stream = require("stream").Stream |
|
, inherits = require("inherits") |
|
, assert = require("assert").ok |
|
, debug = process.env.DEBUG ? console.error : function () {} |
|
|
|
/**
 * A stream that re-chunks whatever is written to it into fixed-size blocks.
 *
 * @param {number} [size=512] Block size in bytes. Any falsy value falls back
 *   to 512.
 * @param {Object} [opt] Options object; stored as-is on `this._opt`.
 * @param {boolean} [opt.nopad] When truthy, no zero-padding buffer is kept,
 *   so a flush emits the trailing partial block unpadded.
 */
function BlockStream (size, opt) {
  this.writable = this.readable = true
  this._opt = opt || {}
  this._chunkSize = size || 512

  // Number of bytes already consumed from the first queued buffer.
  this._offset = 0
  // Queue of written-but-not-yet-emitted buffers, and their unread byte total.
  this._buffer = []
  this._bufferLength = 0

  // Padding source for flush(). Buffer.alloc() returns zero-filled memory,
  // which replaces the deprecated `new Buffer(size)` + manual fill loop.
  // `false` means "do not pad".
  this._zeroes = this._opt.nopad ? false : Buffer.alloc(this._chunkSize)
}
|
|
|
// Wire BlockStream's prototype chain to Stream (required above from the core
// "stream" module); the methods below rely on the inherited emit().
inherits(BlockStream, Stream)
|
|
|
/**
 * Queue a chunk and emit as many full blocks as are available.
 *
 * @param {Buffer|*} c Data to queue. Truthy non-Buffer values are converted
 *   to a string and then to a Buffer. Falsy values (null, undefined, "", 0)
 *   carry no data and are ignored.
 * @returns {boolean} `false` when at least one full block is buffered but the
 *   stream is paused (a "drain" event will follow once it has been resumed);
 *   `true` otherwise.
 * @throws {Error} If called after end().
 */
BlockStream.prototype.write = function (c) {
  if (this._ended) throw new Error("BlockStream: write after end")

  // Buffer.from() replaces the deprecated `new Buffer(string)` constructor.
  if (c && !Buffer.isBuffer(c)) c = Buffer.from(c + "")

  // Guard on `c` itself: reading `.length` of null/undefined would throw,
  // even though the conversion above deliberately lets falsy values through.
  if (c && c.length) {
    this._buffer.push(c)
    this._bufferLength += c.length
  }

  if (this._bufferLength >= this._chunkSize) {
    if (this._paused) {
      // Remember that the writer was told to back off, so that _emitChunk
      // emits "drain" once the queue has been worked through.
      this._needDrain = true
      return false
    }
    this._emitChunk()
  }
  return true
}
|
|
|
/**
 * Mark the stream as paused. While the flag is set, write() only buffers
 * and _emitChunk() stops emitting (unless it is in the middle of a flush).
 */
BlockStream.prototype.pause = function () {
  this._paused = true
}
|
|
|
/**
 * Clear the paused flag and immediately try to emit any blocks that were
 * buffered in the meantime.
 *
 * @returns {*} Whatever _emitChunk() returns.
 */
BlockStream.prototype.resume = function () {
  this._paused = false
  var emitResult = this._emitChunk()
  return emitResult
}
|
|
|
/**
 * Finish the stream: optionally write one last chunk, mark the stream as
 * ended (further write() calls throw) and flush what is buffered.
 *
 * @param {Buffer|*|Function} [chunk] Final chunk to write, or a callback.
 *   A function is treated as a callback and is invoked once, when the "end"
 *   event is emitted.
 */
BlockStream.prototype.end = function (chunk) {
  // Keep the callback in a local: the previous bare `cb = chunk` assignment
  // created an implicit global (a ReferenceError in strict mode) and the
  // callback was never called.
  var cb = null
  if (typeof chunk === "function") {
    cb = chunk
    chunk = null
  }

  // Subscribe before flushing, since flush() may emit "end" synchronously.
  if (cb) this.once("end", cb)

  if (chunk) this.write(chunk)
  this._ended = true
  this.flush()
}
|
|
|
/**
 * Ask _emitChunk() to emit everything that is buffered, including the
 * trailing partial block (zero-padded unless padding is disabled).
 */
BlockStream.prototype.flush = function () {
  var flushing = true
  this._emitChunk(flushing)
}
|
|
|
/**
 * Work through the queue of written buffers, emitting one "data" event per
 * full block of `this._chunkSize` bytes, then emit "drain" and/or "end" when
 * appropriate.
 *
 * Does nothing beyond the optional padding step when the stream is paused or
 * when an emission is already in progress (re-entrant call from a listener).
 *
 * @param {boolean} [flush] When truthy, also emit the trailing partial block:
 *   zero-padded to a full block if padding is enabled, otherwise as-is.
 *   A flush keeps emitting even if a listener pauses the stream mid-way.
 */
BlockStream.prototype._emitChunk = function (flush) {
  // Padding mode: top the queue up to a whole number of blocks so that the
  // main loop below emits everything.
  if (flush && this._zeroes) {
    var padBytes = (this._bufferLength % this._chunkSize)
    if (padBytes !== 0) padBytes = this._chunkSize - padBytes
    if (padBytes > 0) {
      this._buffer.push(this._zeroes.slice(0, padBytes))
      this._bufferLength += padBytes
    }
  }

  if (this._emitting || this._paused) return
  this._emitting = true

  // Index of the queued buffer currently being consumed; everything before
  // it has been emitted in full and is dropped after the loop.
  var bufferIndex = 0

  // `_paused` is re-checked on every pass because a "data" listener may call
  // pause() while we are emitting.
  while (this._bufferLength >= this._chunkSize &&
         (flush || !this._paused)) {

    var out
      , outOffset = 0
      , outHas = this._chunkSize // bytes still missing from this block

    while (outHas > 0 && (flush || !this._paused)) {
      var cur = this._buffer[bufferIndex]
        , curHas = cur.length - this._offset

      // If the current buffer cannot fill the block on its own, several
      // buffers have to be copied into one. If it can, slice out the wanted
      // part instead, to avoid copying. Once copying has started for this
      // block it has to continue, since buffers cannot be joined lazily.
      if (out || curHas < outHas) {
        // Buffer.alloc() replaces the deprecated `new Buffer(size)`.
        out = out || Buffer.alloc(this._chunkSize)
        cur.copy(out, outOffset,
                 this._offset, this._offset + Math.min(curHas, outHas))
      } else if (cur.length === outHas && this._offset === 0) {
        // Shortcut: the buffer is exactly one block and untouched.
        out = cur
      } else {
        // Slice out the piece of the buffer that is needed.
        out = cur.slice(this._offset, this._offset + outHas)
      }

      if (curHas > outHas) {
        // The current buffer was only partly consumed; remember how far.
        this._offset += outHas
        outHas = 0
      } else {
        // The current buffer was consumed entirely; move on to the next.
        outHas -= curHas
        outOffset += curHas
        bufferIndex ++
        this._offset = 0
      }
    }

    this._bufferLength -= this._chunkSize
    assert(out.length === this._chunkSize)
    this.emit("data", out)
    // Reset so the next block starts from the "no copy yet" state.
    out = null
  }

  // Whatever is left is not enough to fill a block, or we got paused.
  this._buffer = this._buffer.slice(bufferIndex)
  if (this._paused) {
    // NOTE(review): this sets `_needsDrain`, but the flag read further down
    // (and set by write()) is `_needDrain`, so this store has no effect.
    // Left unchanged to avoid emitting additional "drain" events; confirm
    // the intended behaviour before renaming.
    this._needsDrain = true
    this._emitting = false
    return
  }

  // If flushing without null-padding, the remaining buffers still have to be
  // emitted. Together they are shorter than one block (otherwise the loop
  // above would have emitted them), but the first one may have an offset.
  var l = this._buffer.length
  if (flush && !this._zeroes && l) {
    if (l === 1) {
      if (this._offset) {
        this.emit("data", this._buffer[0].slice(this._offset))
      } else {
        this.emit("data", this._buffer[0])
      }
    } else {
      // Join the leftovers into a single buffer of exactly the unread size.
      var rest = Buffer.alloc(this._bufferLength)
        , restOffset = 0
      for (var i = 0; i < l; i ++) {
        var piece = this._buffer[i]
          , pieceHas = piece.length - this._offset
        piece.copy(rest, restOffset, this._offset)
        // Only the first leftover buffer can be partly consumed.
        this._offset = 0
        restOffset += pieceHas
        this._bufferLength -= pieceHas
      }
      this.emit("data", rest)
    }
    // Truncate the queue.
    this._buffer.length = 0
    this._bufferLength = 0
    this._offset = 0
  }

  // Everything that could be emitted so far has been emitted: tell a writer
  // that was refused by write() that it may continue.
  if (this._needDrain) {
    this._needDrain = false
    this.emit("drain")
  }

  if ((this._bufferLength === 0) && this._ended && !this._endEmitted) {
    this._endEmitted = true
    this.emit("end")
  }

  this._emitting = false
}
|
|
|