Consuming DataSift Streams with Node.js

by Tim on February 17, 2011

This Node.js code sample consumes a DataSift stream and invokes a callback with each message object as it arrives.

DataSift streams contain one JSON object per line and may include occasional empty lines that act as a “you are still connected” signal. The sample callback echoes each message to the console.

/*
 * Consuming a DataSift stream using node.js 
 * To use this sample, you will need your DataSift username, API key and a stream identifier.
 * Copyright 2011 Tim Hastings.
 * Released under the MIT License with no warranties given.
 * Have fun!
 * 
 **/

var http = require('http');

/**
 * Connects to a DataSift HTTP stream and invokes `callback` once for every
 * JSON message received.
 *
 * The stream is newline-delimited JSON. Blank (whitespace-only) lines are
 * keep-alive signals and are skipped. Lines that fail to parse, exceptions
 * thrown by `callback`, and connection errors are logged to stderr; none of
 * them stop the stream or crash the process.
 *
 * @param {string} username   DataSift username, sent as the `username` query parameter.
 * @param {string} apikey     DataSift API key, sent as the `api_key` query parameter.
 * @param {string} stream_id  Stream identifier, used verbatim as the request path.
 * @param {function(Object)} callback  Called with each parsed message object.
 */
function consumeDataSiftStream(username, apikey, stream_id, callback) {

	var STREAM_HOST = 'stream.datasift.net';

	// holds the trailing partial line between 'data' events
	var buffer = '';

	// JSON.stringify(error) yields "{}", so log the stack/message explicitly
	function logError(context, err) {
		console.error(context + ': ' + (err && err.stack ? err.stack : String(err)));
	}

	// parse the JSON of a single line and invoke the callback
	function processLine(line) {

		// blank lines are keep-alives, not messages
		if (/^\s*$/.test(line)) {
			return;
		}

		var obj;
		try {
			obj = JSON.parse(line);
		} catch (err) {
			logError('Failed to parse stream line', err);
			return;
		}

		// kept separate from the parse step so the log says which one failed
		try {
			callback(obj);
		} catch (err) {
			logError('Stream callback threw', err);
		}
	}

	// handle the HTTP response and attach handlers
	function handleResponse(resp) {

		// decode as UTF-8 at the stream level so a multi-byte character split
		// across two chunks is not corrupted by per-chunk string conversion
		resp.setEncoding('utf8');

		// data handler
		resp.on('data', function (chunk) {
			buffer += chunk;

			// everything before the last newline is complete; the final
			// element is the (possibly empty) partial line to carry over
			var lines = buffer.split('\n');
			buffer = lines.pop();

			for (var i = 0; i < lines.length; i++) {
				processLine(lines[i]);
			}
		});

		// close handler: whatever is left has no trailing newline
		resp.on('end', function () {
			var remainder = buffer;
			buffer = '';
			processLine(remainder);
		});

		resp.on('error', function (err) {
			logError('DataSift stream response failed', err);
		});
	}

	// encodeURIComponent (unlike the deprecated escape) also encodes
	// '+', '/' and '@', and produces valid UTF-8 percent-escapes
	var path = '/' + stream_id +
		'?api_key=' + encodeURIComponent(apikey) +
		'&username=' + encodeURIComponent(username);

	// create the connection and attach response handler
	var req = http.request({
		host: STREAM_HOST,
		port: 80,
		path: path,
		method: 'GET'
	}, handleResponse);

	// without this listener a connection failure is an unhandled 'error' event
	req.on('error', function (err) {
		logError('DataSift stream request failed', err);
	});

	req.end();

}

// Demo run: substitute real credentials and a stream identifier below.
var username = 'USERNAME';
var apikey = 'APIKEY';
var stream_id = 'STREAM_ID';

consumeDataSiftStream(username, apikey, stream_id, function printMessage(obj) {

	// status messages (e.g. errors) are dumped in full, pretty-printed
	if (obj.status) {
		console.log(JSON.stringify(obj, null, '\t'));
		return;
	}

	// idle tick: just emit a blank line
	if (obj.tick) {
		console.log();
		return;
	}

	// regular interaction: author on one line, content indented beneath
	var interaction = obj.interaction;
	if (interaction && interaction.author) {
		console.log(interaction.author.username+"\n  "+interaction.content);
		return;
	}

	// anything else is echoed as compact JSON
	console.log(JSON.stringify(obj));

});

One comment

[...] This post was mentioned on Twitter by Tim Hastings, DataSift. DataSift said: RT @timhastings: Here is a node.js code snippet to monitor and capture a DataSift real-time stream. http://bit.ly/ijk7Eo #nodejs [...]

by Tweets that mention Consuming DataSift Streams with Node.js « Nuke the site from orbit, it’s the only way to be sure. -- Topsy.com on February 17, 2011 at 4:24 pm. Reply #

Leave your comment

Required.

Required. Not published.

If you have one.