On building a REST API with thinky posted on 18 April 2015

ReQL is an incredibly powerful beast, and thinky wields most of its power by using the exact same API. While thinky can be used like classic ORMs such as mongoose, it would be a shame to miss some of its really nice features.

This article describes how to build the thinky part of a REST API for Express, and is hopefully the first of a series that will showcase what thinky can do. This article focuses only on thinky; if you want to learn how Express works, there is a plethora of tutorials on the Internet.

I. Create the model

var thinky = require('thinky')();
var type = thinky.type;
var r = thinky.r;

var User = thinky.createModel('User', {
  id: type.string(),
  name: type.string().required(),
  email: type.string().email().required(),
  createdAt: type.date().default(r.now())
});

The model has 4 fields:

  • id: a simple string.
  • name: a required string.
  • email: a required string that should be a valid email.
  • createdAt: the date at which the user was created.

The table will be automatically created under the hood. If you immediately fire queries while the table is not ready, the queries will be queued.

II. Insert a new user

function insert(request, response, next) {
  var user = new User(request.body);

  user.save().then(function(result) {
    // user === result
    response.send(JSON.stringify(result));
  }).error(function(error) {
    // Duplicate primary key, invalid document, network errors etc.
    response.send(500, {error: error.message});
  });
}

There are a few things to note here:

  • The object request.body only needs to provide two fields: name and email.
  • The field id is the primary key and if undefined, will be automatically generated by RethinkDB when the document is inserted.
  • The field createdAt, when undefined, is automatically set to r.now() by thinky and will be replaced in the database by the time at which the query is executed.
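
To make the last two points concrete, here is a small illustration (the printed values are of course made up):

// Only name and email are supplied; id and createdAt are filled in when the document is saved.
var user = new User({name: "Michel", email: "michel@example.com"});

user.save().then(function(saved) {
  console.log(saved.id);        // e.g. "0e4a6f6f-cc0c-4aa5-951a-fcfc480dd05a"
  console.log(saved.createdAt); // the time at which the insert was executed
});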

III. Get one document by primary key

function get(request, response, next) {
  User.get(request.id).run().then(function(user) {
    response.send(JSON.stringify(user));
  }).error(function(error) {
    // Document not found, network errors etc.
    response.send(500, {error: error.message});
  });
}

IV. Update a user given its primary key

function update(request, response, next) {
  User.get(request.id).update(request.body).run().then(function(user) {
    response.send(JSON.stringify(user));
  }).error(function(error) {
    // Document not found, invalid document, network errors etc.
    response.send(500, {error: error.message});
  });
}

If you look at this snippet, a single query is executed, not two. ORMs usually require you to write something like the snippet below, which executes two queries.

// Works with thinky, but you do not have to run two queries.
function update(request, response, next) {
  User.get(request.id).run().then(function(user) {
    user.merge(request.body);
    return user.save();
  }).then(function(user) {
    response.send(JSON.stringify(user));
  }).error(function(error) {
    response.send(500, {error: error.message});
  });
}

So what does thinky do in the first snippet?

  • It first validates all the fields passed to update.
  • It runs the update query in RethinkDB.
  • It validates the whole new document (returned by the update query).

Thinky validates the whole document again because it can also validate across multiple fields (for example, checking that a user is over 21 if they live in the US, and over 18 otherwise).

In the most common case, you just validate the type of each field, so the third step will never fail. If it does, the document is reverted (and only in this case are two queries executed).
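
The age rule mentioned above could be written as a plain function like the one below; how you plug it into thinky (for example through a custom validator or a save hook) depends on your setup, so the function is shown on its own:

// Cross-field rule: the minimum age depends on the country field.
function checkAge(user) {
  var minimumAge = (user.country === "US") ? 21 : 18;
  if (user.age < minimumAge) {
    throw new Error("Users in " + user.country + " must be at least " + minimumAge);
  }
}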

Note: The user may be returned as undefined if the update is a no-op query. This is currently a regression with 2.0 (see rethinkdb/rethinkdb#4068 to track progress).

V. Delete a user given its primary key

function remove(request, response, next) {
  User.get(request.id).delete().execute().then(function(result) {
    response.send(JSON.stringify({status: "ok"}));
  }).error(function(error) {
    // Document not found, network error etc.
    response.send(500, {error: error.message});
  });
}

We use execute here and not run because no document will be returned.
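
If you need to know whether a document was actually removed, the raw result returned by execute contains RethinkDB's write counters (a quick sketch, error handling omitted):

User.get(request.id).delete().execute().then(function(result) {
  console.log(result.deleted, result.skipped); // 1 0 if the document existed
});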

VI. Return all users

function all(request, response, next) {
  User.run().then(function(users) {
    response.send(JSON.stringify(users));
  }).error(function(error) {
    // Network errors etc.
    response.send(500, {error: error.message});
  });
}

VII. Pagination

var perPage = 50;

function range(request, response, next) {
  var start = (request.start) ? request.start: r.minval;
  User.between(start, r.maxval).limit(perPage).run().then(function(users) {
    response.send(JSON.stringify(users));
  }).error(function(error) {
    response.send(500, {error: error.message});
  });
}

Pagination here is done via primary key with between/limit, and not with skip/limit for performance reasons.
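
To fetch the next page, pass the primary key of the last document of the previous page as start and exclude it with leftBound so it is not returned twice (a sketch, assuming thinky forwards the leftBound option to ReQL's between):

function nextPage(lastId) {
  return User.between(lastId, r.maxval, {leftBound: "open"}).limit(perPage).run();
}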

Need the number of users?

function count(request, response, next) {
  User.count().execute().then(function(total) {
    response.send(JSON.stringify(total));
  }).error(function(error) {
    response.send(500, {error: error.message});
  });
}
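
For completeness, here is a rough sketch of how these handlers could be mounted in an Express app. The route paths and the small copyId middleware are illustrative, not part of the article (the handlers above read request.id directly, so something has to copy the route parameter there):

var express = require('express');
var bodyParser = require('body-parser');

var app = express();
app.use(bodyParser.json());

// Copy the :id route parameter to where the handlers expect it
function copyId(request, response, next) {
  request.id = request.params.id;
  next();
}

app.post('/users', insert);
app.get('/users', all);
app.get('/users/:id', copyId, get);
app.put('/users/:id', copyId, update);
app.delete('/users/:id', copyId, remove);

app.listen(3000);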

This is it! Stay tuned for the next article!

Batching operations in Node.js writable streams posted on 15 February 2015

I recently implemented writable and transform streams in rethinkdbdash. Importing a file to a RethinkDB table is now as simple as piping a Readable stream into a Writable one. The constraints for the implementation were:

  • Insert documents in order (first in, first written).
  • Insert documents in batches of at most highWaterMark documents.

I somehow couldn't find anything about this use case, and now that I have implemented these streams, I kind of understand why. Basically the current API is not designed for batching operations - even Transform streams that support buffering (with _flush) behave poorly when it comes to efficiently batching operations.

Let's first look at the API for Writable streams. We have to implement writable._write(value, encoding, done). In rethinkdbdash's case, we want to insert documents, so the streams are created in objectMode; value is one document, encoding is not relevant, and done is to be called when we are done processing the supplied document.

A basic implementation would be:

var util = require('util');
var Writable = require('stream').Writable;
var r = require('rethinkdbdash')();

function WritableStream(table) {
  this._table = table; // The RethinkDB table to insert in
  this._highWaterMark = 16; // The size of the internal buffer

  Writable.call(this, {
      objectMode: true,
      highWaterMark: this._highWaterMark
  });
}
util.inherits(WritableStream, Writable);

WritableStream.prototype._write = function(value, encoding, done) {
  r.table(this._table).insert(value).run().then(function(result) {
    done();
  }).error(done);
}

Let's consider how to batch inserts now. Looking at the implementation of Writable streams in Node.js, a Writable stream has an internal buffer implemented as a linked list (since 0.12 - see the relevant pull request), and you can access the last element of this list via stream._writableState.lastBufferedRequest.

So what we can do is look at the current value we are processing, check if it is the same as the last available value in the internal buffer and from here:

  • just call done to keep buffering if there is more data available
  • flush what we have if there is nothing more in the internal buffer.

So the implementation becomes:

var util = require('util');
var Writable = require('stream').Writable;
var r = require('rethinkdbdash')();

function WritableStream(table) {
  this._table = table; // The RethinkDB table to insert in
  this._cache = []; // Current cached documents that will be saved at the next insertion
  this._highWaterMark = 16; // The size of our cache and of the internal buffer

  Writable.call(this, {
      objectMode: true,
      highWaterMark: this._highWaterMark
  });
}
util.inherits(WritableStream, Writable);

WritableStream.prototype._write = function(value, encoding, done) {
  this._cache.push(value);

  if ((this._writableState.lastBufferedRequest != null) &&
      (this._writableState.lastBufferedRequest.chunk !== value)) {
    // There is more data available
    if (this._cache.length === this._highWaterMark) {
      // Our cache is full, flush documents
      this._insert(done)
    }
    else {
      // We have more data coming, let's buffer more
      done();
    }
  }
  else {
    // There is NO more data available, flush what we have
    this._insert(done);
  }
}

WritableStream.prototype._insert = function(done) {
  var values = this._cache;
  this._cache = [];
  r.table(this._table).insert(values).run().then(function(result) {
    done();
  }).error(done);
}

While this implementation looks fine, it suffers from the fact that when we are inserting, the stream is not re-filling its internal buffer. If we inspect the size of this._cache before each insert, when the Writable stream is fed by a fast Readable stream, we will see roughly a sequence like 1, highWaterMark-1, 1, highWaterMark-1, 1, highWaterMark-1, etc.

From here things become tricky. Trying to call done just after calling this._insert to attempt re-buffering is actually not enough. From an implementation point of view, done should be called immediately after this._insert as long as there is more data in the cache. If there is no more data available and if we are already inserting, we have to keep a reference of done (in this._pendingCallback in the implementation below) and call it once the current insertion is complete.

One problem that surfaces after implementing this solution is that if you have a fast stream piping into your writable stream, the internal buffer of the writable stream will not fill fast enough; you still get that sequence of 1 and highWaterMark-1. The only way I found to get optimal buffering with a fast input is to move the call to _write later in the call stack.

The implementation looks like this one.

var util = require('util');
var Writable = require('stream').Writable;
var r = require('rethinkdbdash')();

function WritableStream(table, options, connection) {
  this._table = table; // The RethinkDB table to insert in
  this._cache = []; // Current cached documents that will be saved at the next insertion
  this._pendingCallback = null; // The callback to call when we are done inserting
  this._inserting = false; // Whether an insertion is happening
  this._delayed = false; // Whether the current call to _next was moved in the call stack or not
  this._highWaterMark = options.highWaterMark || 16; // The size of our cache and of the internal buffer

  Writable.call(this, {
    objectMode: true,
    highWaterMark: this._highWaterMark
  });
};
util.inherits(WritableStream, Writable);

WritableStream.prototype._write = function(value, encoding, done) {
  this._cache.push(value);
  this._next(value, encoding, done);
}

// Every time we are about to insert, we first recurse once with setImmediate
// to give the input stream a chance to push a few more elements
WritableStream.prototype._next = function(value, encoding, done) {
  if ((this._writableState.lastBufferedRequest != null) &&
      (this._writableState.lastBufferedRequest.chunk !== value)) {
    // There's more data to buffer
    if (this._cache.length < this._highWaterMark) {
      this._delayed = false;
      // Call done now, and more data will be put in the cache
      done();
    }
    else {
      if (this._inserting === false) {
        if (this._delayed === true) {
          this._delayed = false;
          // We have to flush
          this._insert();
          // Fill the buffer while we are inserting data
          done();
        }
        else {
          var self = this;
          this._delayed = true;
          setImmediate(function() {
              self._next(value, encoding, done);
          })
        }
      }
      else {
        this._delayed = false;
        // To call when we are done inserting, to keep buffering
        this._pendingCallback = done;
      }
    }
  }
  else { // We just pushed the last element in the internal buffer
    if (this._inserting === false) {
      if (this._delayed === true) {
        this._delayed = false;
        // To call when we are done inserting, to maybe flag the end
        // We cannot call done here as we may be inserting the last batch
        this._pendingCallback = done;
        this._insert();
      }
      else {
        var self = this;
        this._delayed = true;
        setImmediate(function() {
          self._next(value, encoding, done);
        })
      }
    }
    else {
      this._delayed = false;
      // We cannot call done here as we may be inserting the last batch
      this._pendingCallback = done;
    }
  }
}

WritableStream.prototype._insert = function() {
  var self = this;
  self._inserting = true;

  var cache = self._cache;
  self._cache = [];

  r.table(self._table).insert(cache).run().then(function(result) {
    self._inserting = false;
    if (result.errors > 0) {
      self.emit('error', new Error("Failed to insert some documents:"+JSON.stringify(result, null, 2)));
    }
    if (typeof self._pendingCallback === 'function') {
      var pendingCallback = self._pendingCallback;
      self._pendingCallback = null;
      pendingCallback();
    }
  }).error(function(error) {
    self._inserting = false;
    self.emit('error', error);
  });
}

This may be a bit hard to digest, but whenever we are about to flush the cache, we first flip this._delayed to true and call the same function (_next) again with setImmediate. Only if we end up in the same situation do we insert an incomplete batch. Tests show a better flow of data (a sequence of highWaterMark values instead of 1 and highWaterMark-1).
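
Here is a minimal usage sketch; the Readable below is just a stand-in for any objectMode source (a file parser, for example), and the table name follows the string form used in the earlier snippets:

var Readable = require('stream').Readable;

var docs = [{name: 'a'}, {name: 'b'}, {name: 'c'}];
var source = new Readable({objectMode: true});
source._read = function() {
  this.push(docs.shift() || null); // pushing null ends the stream
};

var sink = new WritableStream('users', {highWaterMark: 16});
sink.on('error', console.error);
source.pipe(sink);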

Transform streams can be implemented in a slightly more efficient way thanks to the _flush method; we can always call done after an insert in an attempt to buffer more incoming data, but as far as I can tell, moving _write later in the call stack is also required for a better flow.
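
For reference, here is a sketch of the general batching pattern with _flush; it only illustrates the idea (emitting arrays of at most batchSize documents downstream) and is not rethinkdbdash's implementation:

var Transform = require('stream').Transform;
var util = require('util');

function BatchingTransform(batchSize) {
  this._batchSize = batchSize || 16;
  this._batch = [];
  Transform.call(this, {objectMode: true});
}
util.inherits(BatchingTransform, Transform);

BatchingTransform.prototype._transform = function(value, encoding, done) {
  this._batch.push(value);
  if (this._batch.length >= this._batchSize) {
    this.push(this._batch); // emit a full batch downstream
    this._batch = [];
  }
  done();
};

// _flush runs once the input is exhausted, so the last partial batch is not lost
BatchingTransform.prototype._flush = function(done) {
  if (this._batch.length > 0) {
    this.push(this._batch);
  }
  done();
};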

The API for Node.js streams is still tagged as unstable, so hopefully the API will become more friendly for the use cases similar to the one described here.

Questions? Feedback? Shoot me a mail at [email protected] or ping me on Twitter @neumino

Note: Because the internal buffer's data structure changed, the implementation shown above does not work that well in Node 0.10.

Replicate ReQL API in your own classes posted on 04 February 2015

ReQL is embedded in the host language. If you use JavaScript, you do not have to concatenate and escape SQL strings, or build JSON objects with some special keys; your queries are plain JavaScript.

For example with rethinkdbdash, updating all the users that are at least 18 years old with a field isAdult set to true (in one round trip), can be written like this:

var promise = r.table('users').filter(function(user) {
  return user('age').gt(18)
}).update({ isAdult: true }).run();

RethinkDB ships with a data explorer where you can test your queries. Because ReQL is composed of chainable commands, the data explorer can provide you with suggestions and auto-completion, making learning ReQL easy and fun.

Replicate the API

But this is not the only advantage of having an embedded chainable language; you can import the query language in your own classes by just copying the commands. This is what thinky (a Node.js ORM) does and this has multiple benefits:

  • It is easy to do and requires very little work.
  • Because the API is the same, the learning curve is a Heaviside step.

This article explains how to replicate the ReQL API using rethinkdbdash. The same thing can be done with the official driver or in another flexible language like Python, Ruby, etc. The only thing to know about rethinkdbdash is that all commands return an instance of the class Term, and that Term implements all the methods like filter, get, update, etc.
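
You can quickly check that claim yourself (assuming rethinkdbdash is installed and uses its default connection settings):

var r = require('rethinkdbdash')();
var Term = r.expr(1).__proto__;

console.log(r.table('users').__proto__ === Term);    // true
console.log(typeof Term.filter, typeof Term.update); // 'function' 'function'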

Suppose that you have a class User.

function User(data) {
  this.id = data.id;
  this.name = data.name;
  this.email = data.email;
  this.ecp = data.ecp || null; // id of the emergency contact person
};

You can import all the methods in User by looping over all the keys in Term.

var r = require('rethinkdbdash')();
var Term = r.expr(1).__proto__;

for(var key in Term) {
  (function(key) { // this immediately invoked function is required to capture key
    User.prototype[key] = function() {
      var table = r.table('users');
      return table[key].apply(table, arguments);
    }
  })(key);
}

Now you can write:

// Set the name of the user with id 1
Users.get(1).update({name: "Michel"}).run().then(...).error(...);

// Set all the users as "not verified"
Users.update({verified: false}).run().then(...).error(...);

// Perform a simple join to retrieve the emergency contact person
Users.get(1).merge(function(user) {
  return { ecpPerson: Users.get(user.ecp) }
}).run().then(...).error(...);

So if thinky is too cumbersome for your project (if you do not need relations, hooks etc.) you can easily replicate the same API.

Define your own ReQL methods

You can define methods on User by simply adding them to User.prototype, and you can also create your own methods on the queries by wrapping the ReQL queries:

var r = require('rethinkdbdash')();
var Term = r.expr(1).__proto__;

function Query(query) {
  this._query = query; // an instance of Term
};

for(var key in Term) {
  (function(key) {
    if (key === "run") {
      Query.prototype[key] = function() {
        // We want to return the promise returned by the ReQL query here
        return this._query.run.apply(this._query, arguments);
      }
    } else {
      Query.prototype[key] = function() {
        return new Query(this._query[key].apply(this._query, arguments));
      }
    }

    User.prototype[key] = function() {
      // Wrap the term in a Query so that the custom methods defined below keep working
      var table = r.table('users');
      return new Query(table[key].apply(table, arguments));
    }
  })(key);
}

A few notes about this code:

  • You can copy all the methods to Query.prototype; run is special since for this method you want to return the promise itself rather than a new Query.
  • Each method returns a new Query object to enable forking:
var Adults = Users.filter(function(user) { return user("age").gt(18) });
Adults.update({isAdult: true}).run().then(...).error(...);
Adults.filter({location: "US"}).run().then(...).error(...);

This is it! You can now define your own methods:

Query.prototype.isAdult = function() {
  return new Query(this._query.filter(function(user) {
    return user("age").gt(18)
  }));
};
User.prototype.isAdult = function() {
  var table = r.table('users');
  return new Query(table).isAdult();
};

// Retrieve all the adults in the UK
var promise = Users.filter({location: "UK"}).isAdult().run();

Questions? Feedback? Shoot me a mail at [email protected] or ping me on Twitter @neumino

Rethinkdbdash 1.16: Node.js streams, implicit run and more posted on 31 January 2015

Note: Never heard of RethinkDB? You may want to read about where it's going.

Rethinkdbdash is an experimental yet stable Node.js driver for RethinkDB. It started as a playground to test promises and connection pools, and still strives today to provide a better experience for developers.

A new version just got released today, and besides an update to support the new RethinkDB 1.16 commands, it comes with a few new features inspired by @jonathanong's writing about an ideal database client.

This blog post aims to describe the main differences between rethinkdbdash and the official driver.

Support for Node.js streams

One of the new features in rethinkdbdash is the support for Node.js streams. You can retrieve a stream with the synchronous method toStream.

// Create a transform stream that will convert data to a string
var stream = require('stream')
var stringifier = new stream.Transform();
stringifier._writableState.objectMode = true;
stringifier._transform = function (data, encoding, done) {
  this.push(JSON.stringify(data));
  this.push('\n');
  done();
}

// Create a writable stream
var fs = require('fs');
var file = fs.createWriteStream('result.txt');

var r = require('rethinkdbdash')();
// Pipe the data to the file through stringifier
r.table("data").toStream()
    .pipe(stringifier)
    .pipe(file);

Optional run

ReQL is composed of chainable methods, requiring the developer to call the run command at the end, once the query is complete and ready to be sent to the server. By implementing then and catch as shortcuts for run().then and run().catch, any query will be executed when used with yield, without the need to call run.
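
The underlying idea can be sketched with a toy class (FakeQuery below is purely illustrative and not rethinkdbdash's actual code):

function FakeQuery(result) {
    this._result = result;
}
FakeQuery.prototype.run = function() {
    return Promise.resolve(this._result); // stands in for sending the query to the server
};
FakeQuery.prototype.then = function(onFulfilled, onRejected) {
    return this.run().then(onFulfilled, onRejected);
};
FakeQuery.prototype.catch = function(onRejected) {
    return this.run().catch(onRejected);
};

// Both forms now behave the same way:
new FakeQuery(42).run().then(console.log); // 42
new FakeQuery(42).then(console.log);       // 42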

You can now write queries like this:

var assert = require('assert');
var bluebird = require('bluebird');
var r = require('rethinkdbdash')();
bluebird.coroutine(function*() {
    var result = yield r.table("users").insert({id: 1, name: "Michel", age: 28});
    assert.equal(result.inserted, 1);

    result = yield r.table("users").get(1);
    assert.deepEqual(result, {id: 1, name: "Michel", age: 28});

    result = yield r.table("users").get(1).update({name: "John"});
    assert.equal(result.replaced, 1);

    result = yield r.table("users").get(1);
    assert.deepEqual(result, {id: 1, name: "John", age: 28});

    result = yield r.table("users").get(1).update(function(user) {
        return { isAdult: user("age").gt(18) }
    });
    assert.equal(result.replaced, 1);

    result = yield r.table("users").get(1);
    assert.deepEqual(result, {id: 1, name: "John", age: 28, isAdult: true});
})();

A single point of failure

Rethinkdbdash has been shipped with a connection pool since its first version. This connection pool is managed by the driver itself, and users do not have to create or manage connections themselves. This has two purposes:

  • Remove repetitive code.
  • Provide a unique point of failure. Consider for example this snippet:
var handleError = function(error) {
    res.status(500).json({error: JSON.stringify(error)});
};

r.connect().bind({}).then(function(connection) {
    this._connection = connection;
    this._connection.on('error', handleError);
    return r.table("users").get("[email protected]").run()
}).then(function(user) {
    res.send(JSON.stringify(user));
    return this._connection.close();
}).then(function() {
    this._connection.removeListener('error', handleError);
}).error(handleError);

The problem in this code is that the connection can emit an error at any time, meaning that you have multiple points of failure (the connection and the query itself). In the end, depending on your case, you may have to deal with race conditions. Typically here, handleError may be called twice.

With rethinkdbdash, network errors are caught by the driver and bubbled to the relevant queries. Compare the previous code with:

r.table("users").get("[email protected]").run().then(function(user) {
    res.send(JSON.stringify(user));
}).error(function(error) {
    // Catch errors returned by the driver
    // Catch network errors
    res.status(500).json({error: JSON.stringify(error)});
});

Like what you read?

Give it a spin:

Note about streams: null values are currently silently dropped from streams, because in objectMode a null value means the end of the stream.

Note: The toStream command was added in rethinkdbdash 1.16.4. The argument {stream: true} has been deprecated.