
node-amqp: Setting up a dead letter exchange Fri 6 Sep, 2013

This post originally appeared on the Media Ingenuity tech blog

We've started to utilize Node.js a lot more over the last year at Media Ingenuity, to the point where a few of the team might even be classed as fanboys (guilty as charged). There's so little friction to getting things done that it's become my go-to technology for trying things out or prototyping new ideas. That's exactly the approach we took when we needed to figure out how to collect and store data about user interactions on our sites. After a few tests and iterations, and fast-forward a couple of months, we're now publishing data from web clients using socket.io. That data then gets pushed to CloudAMQP, where it gets routed to the appropriate queue. From there, we have consumers (also Node processes using node-amqp) doing the data processing. At the moment, most of the data gets pumped into SQL for analysts to play with, but there's also the potential for some cool real-time stuff.
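For context, the publishing side is only a few lines with node-amqp. The sketch below is a hypothetical outline of that piece: the URL, exchange name and payload are made up, and in our case the publish call sits inside a socket.io event handler.

var amqp = require('amqp');

// hypothetical: the URL and exchange name are placeholders, not our real config
var connection = amqp.createConnection({url: 'amqp://user:pass@host.cloudamqp.com/vhost'});

connection.on('ready', function () {
  connection.exchange(
    'TrackingExchange',
    {durable: true, autoDelete: false, type: 'fanout'},
    function (ex) {
      // e.g. called from a socket.io handler for each client interaction
      ex.publish(
        'user.interaction',
        {userId: 42, action: 'click', path: '/apply'},
        {deliveryMode: 2} // persistent, so the broker writes it to disk
      );
    }
  );
});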

One of the challenges with the setup described above was ensuring we don't lose data. Whilst at first we accepted that we might lose some data while we figured out what we were doing, we're now reliant on that data for a number of things, so we can't afford any gaps. We tried a couple of different approaches to address that before we settled on the current (and so far, best) solution: dead letter exchanges in RabbitMQ.

Initially, we had the queue consumers go about their business, and if anything went wrong along the way, we just dumped the data into log files. We took the view that by tailing the logs, and with some simple alerts, we could react to problems and recover data from the logs if needed. In reality, it didn't quite turn out that way. The cost of recovering data from files was ultimately prohibitive, so we ended up just accepting the loss and moving on. Certainly not ideal in the long run.

We then implemented manual message acknowledgements in RabbitMQ. In this scenario, the queue delivers a message but doesn't discard it until the consumer acknowledges that it was handled successfully. Messages that are unsuccessful then need to be dealt with somehow, which is where the dead letter feature comes into play. RabbitMQ allows you to specify an exchange as the dead letter exchange for any queue when the queue is created. When a message is then rejected (without being requeued), it is automatically routed to the dead letter exchange. The consumer process ends up looking something like this:

var amqp = require('amqp');

// config and connOptions are defined elsewhere in the process
var connection = amqp.createConnection({url: config.CLOUD_AMQP}, connOptions);

connection.on('ready', function () {

  // dead letter exchange for rejected messages
  connection.exchange(
    'DeadLetterExchange',
    {
      durable: true,
      autoDelete: false,
      type: 'topic'
    },
    function (ex) {
      // queue that'll receive all dead-lettered messages
      connection.queue('DeadLetterQueue',
        {durable: true, autoDelete: false},
        function (q) {
          q.bind(ex, '#');
        }
      );
    }
  );

  // primary exchange
  connection.exchange(
    config.EXCHANGE_NAME,
    {durable: true, autoDelete: false, type: 'fanout'},
    function (ex) {
      connection.queue(
        config.QUEUE_NAME,
        {
          durable: true,
          autoDelete: false,
          closeChannelOnUnsubscribe: true,
          arguments: {
            'x-dead-letter-exchange': 'DeadLetterExchange'
          }
        },
        function (q) {
          q.bind(ex, '#');

          // define my message handler
          q.subscribe({ack: true}, function (payload, headers, info, msg) {
            // HANDLE THE MESSAGE THEN...
            if (success) {
              msg.acknowledge();
            } else {
              msg.reject(false); // requeue=false, so the message is dead-lettered
            }
          });
        }
      );
    }
  );
});

A couple of points on the setup above