Batching operations in Node.js writable streams
Posted on 15 February 2015

I recently implemented writable and transform streams in rethinkdbdash. Importing a file to a RethinkDB table is now as simple as piping a Readable stream into a Writable one. The constraints for the implementation were:

  • Insert documents in order (first in, first written).
  • Insert documents in batches of at most highWaterMark documents.

I somehow couldn’t find anything about this use case, and now that I have implemented these streams, I kind of understand why. Basically, the current API is not designed for batching operations - even Transform streams, which support buffering (with _flush), behave poorly when it comes to batching operations efficiently.

Let’s first look at the API for Writable streams. We have to implement writable._write(value, encoding, done). In rethinkdbdash’s case, we want to insert documents, so the streams are created in objectMode: value is one document, encoding is not relevant, and done is to be called when we are done processing the supplied document.

A basic implementation would be:

var util = require('util');
var Writable = require('stream').Writable;
var r = require('rethinkdbdash')();

function WritableStream(table) {
  this._table = table; // The RethinkDB table to insert in
  this._highWaterMark = 16; // The size of the internal buffer

  Writable.call(this, {
    objectMode: true,
    highWaterMark: this._highWaterMark
  });
}
util.inherits(WritableStream, Writable);

WritableStream.prototype._write = function(value, encoding, done) {
  // Insert one document per call to _write
  r.table(this._table).insert(value).run().then(function(result) {
    done();
  }).error(done);
}
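To use it, a Readable stream in objectMode can simply be piped into it. Here is a quick sketch with a hypothetical source that generates documents (the table name myTable is also made up):

var Readable = require('stream').Readable;

// Hypothetical source: an object-mode Readable that emits 100 documents
var source = new Readable({objectMode: true});
var count = 0;
source._read = function() {
  if (count < 100) {
    this.push({id: count++});
  }
  else {
    this.push(null); // No more data
  }
};

source.pipe(new WritableStream('myTable'))
  .on('finish', function() {
    console.log('Import done');
  });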

Let’s consider how to batch inserts now. Looking at the implementation of Writable streams in Node.js, a Writable stream has an internal buffer implemented as a linked list (since 0.12 - see the relevant pull request), and you can access the last element of this list via stream._writableState.lastBufferedRequest.

So what we can do is look at the value we are currently processing, check whether it is the same as the last value in the internal buffer and, from there (the check itself is sketched right after this list):

  • just call done to keep buffering if there is more data available
  • flush what we have if there is nothing more in the internal buffer.
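In isolation, and relying on these undocumented internals (so this may change between Node versions), the check for "more data available" inside _write looks roughly like this:

var last = this._writableState.lastBufferedRequest;
// If the last buffered chunk is not the value we are currently processing,
// the internal buffer still holds more data behind it
var moreDataAvailable = (last != null) && (last.chunk !== value);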

So the implementation becomes:

var util = require('util');
var Writable = require('stream').Writable;
var r = require('rethinkdbdash')();

function WritableStream(table) {
  this._table = table; // The RethinkDB table to insert in
  this._cache = []; // Current cached documents that will be saved at the next insertion
  this._highWaterMark = 16; // The size of our cache and of the internal buffer

  Writable.call(this, {
      objectMode: true,
      highWaterMark: this._highWaterMark
  });
}
util.inherits(WritableStream, Writable);

WritableStream.prototype._write = function(value, encoding, done) {
  this._cache.push(value);

  if ((this._writableState.lastBufferedRequest != null) &&
      (this._writableState.lastBufferedRequest.chunk !== value)) {
    // There is more data available
    if (this._cache.length === this._highWaterMark) {
      // Our cache is full, flush documents
      this._insert(done)
    }
    else {
      // We have more data coming, let's buffer more
      done();
    }
  }
  else {
    // There is NO more data available, flush what we have
    this._insert(done);
  }
}

WritableStream.prototype._insert = function(done) {
  var values = this._cache;
  this._cache = [];
  r.table(this._table).insert(values).run().then(function(result) {
    done();
  }).error(done);
}

While this implementation looks fine, it suffers from the fact that while we are inserting, the stream is not re-filling its internal buffer. If we inspect the size of this._cache before each insert when the Writable stream is fed by a fast Readable stream, we will see roughly a sequence like 1, highWaterMark-1, 1, highWaterMark-1, 1, highWaterMark-1, etc.
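For instance, a log line at the top of _insert is enough to observe the pattern when a fast object-mode Readable (like the hypothetical source from the earlier sketch) is piped in:

WritableStream.prototype._insert = function(done) {
  // With the default highWaterMark of 16, this prints roughly 1, 15, 1, 15, ...
  console.log('Inserting a batch of ' + this._cache.length + ' documents');

  var values = this._cache;
  this._cache = [];
  r.table(this._table).insert(values).run().then(function(result) {
    done();
  }).error(done);
}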

From here things become tricky. Just calling done right after this._insert to attempt re-buffering is actually not enough. From an implementation point of view, done should be called immediately after this._insert as long as there is more data available. If there is no more data available and we are already inserting, we have to keep a reference to done (this._pendingCallback in the implementation below) and call it once the current insertion is complete.
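To make that concrete, here is a sketch of that intermediate approach (not the final implementation). It assumes an _insert that takes no argument, sets this._inserting while a query is in flight and calls this._pendingCallback when it completes, like the one in the final implementation further down:

WritableStream.prototype._write = function(value, encoding, done) {
  this._cache.push(value);

  var last = this._writableState.lastBufferedRequest;
  var moreDataAvailable = (last != null) && (last.chunk !== value);

  if (moreDataAvailable && (this._cache.length < this._highWaterMark)) {
    // More data is coming and the cache is not full yet: keep buffering
    return done();
  }

  if (this._inserting === true) {
    // An insertion is already in flight: remember done and let the
    // insertion callback invoke it, so inserts stay ordered
    this._pendingCallback = done;
    return;
  }

  this._insert();
  if (moreDataAvailable) {
    // More data is coming: call done right away so the internal buffer
    // keeps filling while the insertion runs
    done();
  }
  else {
    // This may be the last batch: only signal completion once the
    // insertion has finished
    this._pendingCallback = done;
  }
}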

One problem that surfaces after implementing this solution is that if you have a fast stream piping into your writable stream, the internal buffer of the writable stream does not refill fast enough; you still get that sequence of 1 and highWaterMark-1. The only way I found to get optimal buffering with a fast input is to move the work done in _write later in the call stack (with setImmediate).
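Stripped of the surrounding branching, the trick boils down to this fragment (it relies on the _delayed flag and the _next helper introduced in the full implementation below):

// Inside _next: before flushing, give the source one extra tick to push
// more data into the internal buffer
if (this._delayed === false) {
  var self = this;
  this._delayed = true;
  return setImmediate(function() {
    self._next(value, encoding, done);
  });
}
// We already waited one tick and nothing changed: go ahead and flush
this._delayed = false;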

The full implementation looks like this:

var util = require('util');
var Writable = require('stream').Writable;
var r = require('rethinkdbdash')();

function WritableStream(table, options, connection) {
  this._table = table; // The RethinkDB table to insert in
  this._cache = []; // Current cached documents that will be saved at the next insertion
  this._pendingCallback = null; // The callback to call when we are done inserting
  this._inserting = false; // Whether an insertion is happening
  this._delayed = false; // Whether the current call to _next was moved in the call stack or not
  this._highWaterMark = options.highWaterMark || 16; // The size of our cache and of the internal buffer

  Writable.call(this, {
    objectMode: true,
    highWaterMark: this._highWaterMark
  });
};
util.inherits(WritableStream, Writable);

WritableStream.prototype._write = function(value, encoding, done) {
  this._cache.push(value);
  this._next(value, encoding, done);
}

// Every time we want to insert but do not have a full buffer,
// we recurse with setImmediate to give the input stream a chance
// to push a few more elements
WritableStream.prototype._next = function(value, encoding, done) {
  if ((this._writableState.lastBufferedRequest != null) &&
      (this._writableState.lastBufferedRequest.chunk !== value)) {
    // There's more data to buffer
    if (this._cache.length < this._highWaterMark) {
      this._delayed = false;
      // Call done now, and more data will be put in the cache
      done();
    }
    else {
      if (this._inserting === false) {
        if (this._delayed === true) {
          this._delayed = false;
          // We have to flush
          this._insert();
          // Fill the buffer while we are inserting data
          done();
        }
        else {
          var self = this;
          this._delayed = true;
          setImmediate(function() {
              self._next(value, encoding, done);
          })
        }
      }
      else {
        this._delayed = false;
        // Keep a reference to done; it will be called once the current
        // insertion completes, so we can keep buffering
        this._pendingCallback = done;
      }
    }
  }
  else { // We just pushed the last element in the internal buffer
    if (this._inserting === false) {
      if (this._delayed === true) {
        this._delayed = false;
        // We cannot call done here as we may be inserting the last batch;
        // keep a reference to it and call it once the insertion is complete
        this._pendingCallback = done;
        this._insert();
      }
      else {
        var self = this;
        this._delayed = true;
        setImmediate(function() {
          self._next(value, encoding, done);
        })
      }
    }
    else {
      this._delayed = false;
      // We cannot call done here as we may be inserting the last batch
      this._pendingCallback = done;
    }
  }
}

WritableStream.prototype._insert = function() {
  var self = this;
  self._inserting = true;

  var cache = self._cache;
  self._cache = [];

  r.table(self._table).insert(cache).run().then(function(result) {
    self._inserting = false;
    if (result.errors > 0) {
      self.emit('error', new Error("Failed to insert some documents:"+JSON.stringify(result, null, 2)));
    }
    if (typeof self._pendingCallback === 'function') {
      var pendingCallback = self._pendingCallback;
      self._pendingCallback = null;
      pendingCallback();
    }
  }).error(function(error) {
    self._inserting = false;
    self.emit('error', error);
  });
}

This may be a bit hard to digest, but the idea is that whenever there is nothing more in the internal buffer to add to our cache, we flip this._delayed to true and call the same function (_next) again with setImmediate. Only if we end up in the same situation after that extra tick do we insert an incomplete batch. Tests show a better flow of data (a sequence of highWaterMark values instead of 1 and highWaterMark-1).

Transform streams can be implemented in a slightly more efficient way thanks to the _flush method; we can always call done after an insert in an attempt to buffer more incoming data, but as far as I can tell, moving the write call later in the call stack is also required for a better flow.
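For illustration, here is a minimal sketch of such a batching Transform stream (this is not rethinkdbdash’s actual implementation, and it leaves out the call-stack trick): documents are cached in _transform, and _flush takes care of the last, possibly incomplete, batch once the source has ended.

var Transform = require('stream').Transform;
var util = require('util');
var r = require('rethinkdbdash')();

function TransformStream(table, options) {
  options = options || {};
  this._table = table; // The RethinkDB table to insert in
  this._cache = []; // Documents waiting to be inserted
  this._highWaterMark = options.highWaterMark || 16;

  Transform.call(this, {
    objectMode: true,
    highWaterMark: this._highWaterMark
  });
}
util.inherits(TransformStream, Transform);

TransformStream.prototype._transform = function(value, encoding, done) {
  this._cache.push(value);
  if (this._cache.length < this._highWaterMark) {
    // Keep buffering; calling done lets more data flow in
    return done();
  }
  this._insert(done);
}

// _flush is called once the source has ended: insert the remaining documents
TransformStream.prototype._flush = function(done) {
  if (this._cache.length === 0) {
    return done();
  }
  this._insert(done);
}

TransformStream.prototype._insert = function(done) {
  var self = this;
  var cache = self._cache;
  self._cache = [];
  r.table(self._table).insert(cache).run().then(function(result) {
    self.push(result); // Pass the insertion result downstream
    done();
  }).error(done);
}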

The API for Node.js streams is still tagged as unstable, so hopefully it will become friendlier for use cases similar to the one described here.

Questions? Feedback? Shoot me a mail at [email protected] or ping me on Twitter @neumino

Note: Because the internal buffer used a different data structure before Node 0.12, the implementation shown above does not work that well in Node 0.10.