Real World Observables

Reactive code examples can be mind-blowing. Powerful, succinct, robust… they seem to handle many concurrency scenarios without breaking a sweat. But let’s be honest, examples from 30-minute conference talks and short blog posts rarely reflect the messy real world™.

In any case, developers get all pumped up about reactive goodness and want to use it in their projects. Alas, they quickly find out that it’s not all reactive unicorns and rainbows, and that thinking of state in traditional programs as a sequence of events is not trivial.

I will attempt to shed some light on how to do that by writing a real-world example in a few lines of RxJS: an FTP client.

An FTP client??

What is this, an article for dinosaurs? Are we also going to write it in COBOL, and use punch cards?? Go ahead, laugh at my choice for a “real world” program. I’ll wait. The venerable FTP protocol should have died an honorable death a long time ago, and it’s admittedly pretty awful by today’s standards. But hey, it’s 2016 and FTP is still here, clinging to life and still used for transferring files all around the world. And it’s an easy enough protocol to understand (disclaimer: I implemented a popular FTP library for Node in the past).

So, in this article we’ll toast to FTP by making a simple client (no passive/active mode, no TLS, for the sake of brevity) that should work with most FTP servers out there.

Let’s get to it, and get off my lawn.

FTP internals crash-course

In the FTP protocol, the client sends a command with 0 or more parameters separated by spaces. The server then sends a response back with a numeric code and a message. We use the code to know the result of executing our command.
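To make that exchange concrete, here is a minimal sketch in plain JavaScript. The helpers formatCommand and parseReplyLine are hypothetical names made up for this illustration and are not part of the client we build below; they only show the shape of what travels over the wire.

```javascript
// Hypothetical helpers, for illustration only.

// Build the line a client sends: the command name, then its
// parameters, all separated by spaces and terminated by CRLF.
function formatCommand(name, ...params) {
  return [name, ...params].join(' ') + '\r\n';
}

// Split a single-line server reply into its numeric code and message.
// Returns null when the line is not of the form "NNN message".
function parseReplyLine(line) {
  const match = /^(\d{3}) (.*)$/.exec(line);
  if (!match) return null;
  return { code: Number(match[1]), message: match[2] };
}

const reply = parseReplyLine('331 Please specify the password.');

console.log(JSON.stringify(formatCommand('USER', 'anonymous')));
// Codes starting with 4 or 5 signal a negative reply.
console.log(reply.code >= 400 ? 'negative reply' : 'positive reply');
```

The code is all the client needs in order to decide what to do next; the message is mostly there for humans.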

This is an example raw FTP session (user commands are in bold):

The server reads one command and returns its response before running the next one. This sequential behavior makes the protocol harder to manage in an asynchronous environment like Node.js, because we need to keep the state of each sent command and match it with the responses coming from the server, to make sure that we don’t mix responses up. Fortunately for us, this is a non-issue when using reactive programming.

Response format

For stuff like retrieving directory listings, the server needs to return multiline responses. The response rules are very clear:

  • The last line of a response always begins with three ASCII digits and a space
  • Any other line is considered part of a multi-line response
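Those two rules boil down to a single check per line. A small sketch, assuming a hypothetical helper called isFinalLine and some made-up server lines:

```javascript
// Hypothetical helper: true when a line terminates an FTP response,
// that is, when it starts with three ASCII digits followed by a space.
function isFinalLine(line) {
  return /^\d{3} /.test(line);
}

// Made-up server output, one entry per line.
const serverLines = [
  '211-Features:',         // "211-" opens a multi-line response
  ' MDTM',
  ' SIZE',
  '211 End',               // "211 " closes it
  '230 Login successful.'  // a single-line response
];

const finalLines = serverLines.filter(isFinalLine);
console.log(finalLines);
```

Counting the final lines tells us how many complete responses a batch of lines contains.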

For example, the following six lines contain two responses:

To parse responses, we create a helper function parseFtpResponses:

This function is the only part of the program that keeps any kind of state, but since it is local state it can’t cause us any trouble. Used in a reduce or a scan operator, parseFtpResponses will transform separate strings into a sequence of FTP responses.
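Here is one way such a reducer could look. This is a sketch, not necessarily the exact helper used in the library: the name parseFtpResponsesSketch, the accumulator shape ({ buffer, response }) and the small scan stand-in (which mimics the Rx operator over a plain array) are all assumptions made for this example.

```javascript
// Sketch of a scan-friendly reducer. The accumulator shape is an assumption.
function parseFtpResponsesSketch(acc, line) {
  const buffer = acc.buffer.concat(line);
  if (/^\d{3} /.test(line)) {
    // Final line: emit everything buffered so far as one response.
    return { buffer: [], response: buffer.join('\n') };
  }
  // Still inside a multi-line response: keep buffering, emit nothing yet.
  return { buffer, response: '' };
}

// Stand-in for the scan operator: collects every intermediate accumulator.
function scan(items, reducer, seed) {
  const steps = [];
  let acc = seed;
  for (const item of items) {
    acc = reducer(acc, item);
    steps.push(acc);
  }
  return steps;
}

// Six made-up lines that contain two responses.
const lines = [
  '211-Features:',
  ' MDTM',
  ' SIZE',
  ' UTF8',
  '211 End',
  '230 Login successful.'
];

const parsed = scan(lines, parseFtpResponsesSketch, { buffer: [], response: '' })
  .filter(step => step.response !== '') // drop steps that emitted nothing
  .map(step => step.response);          // keep only the response text

console.log(parsed);
```

The only state is the buffer carried inside the accumulator, so nothing leaks outside the fold.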

Node streams to Observables

These are the modules we’ll use for this experiment:

Let’s now create the Ftp class:

responses is an Observable that wraps the connection to the FTP server _ftpSocket and emits its data. Easy peasy.
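If “wrapping” a stream sounds like magic, the following hand-rolled sketch shows the idea. The function observeStream is hypothetical and much simpler than a real Observable, and a plain EventEmitter stands in for the FTP socket so that the example runs without a server.

```javascript
const { EventEmitter } = require('events');

// Hypothetical adapter: exposes a stream's events through subscribe().
function observeStream(stream) {
  return {
    subscribe(onNext, onError, onCompleted) {
      stream.on('data', onNext);
      stream.on('error', onError);
      stream.on('end', onCompleted);
      // Hand back a way to detach the listeners again.
      return {
        dispose() {
          stream.removeListener('data', onNext);
          stream.removeListener('error', onError);
          stream.removeListener('end', onCompleted);
        }
      };
    }
  };
}

// An EventEmitter stands in for the FTP socket here.
const fakeSocket = new EventEmitter();
const received = [];

const subscription = observeStream(fakeSocket).subscribe(
  chunk => received.push(chunk),
  err => console.error(err),
  () => received.push('<end>')
);

fakeSocket.emit('data', '220 Welcome\r\n');
fakeSocket.emit('end');
subscription.dispose();
fakeSocket.emit('data', 'ignored after dispose');

console.log(received);
```

Everything downstream only ever sees the values passed to onNext, which is what makes the later transformations possible.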

Of course, _ftpSocket will spew raw data in utf8. It’s our task to make sense of it. But because responses is an Observable, it’s easy to transform its data. We’ll start by making the Observable emit only whole lines of text, delimited by CRLF characters, as specified by the FTP RFC:

The flatMap operator replaces each chunk of data emitted by fromStream with whole lines of text, split on CRLF. We then apply parseFtpResponses to the sequence of lines to obtain whole FTP responses, and finally we discard empty responses with filter and extract the response value with map.
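The one-chunk-in, many-lines-out step can be tried on plain arrays, since Array.prototype.flatMap behaves analogously to the operator. The chunks below are made up, and dropping empty strings right after the split is a simplification chosen for this sketch.

```javascript
// Two raw chunks as a socket might deliver them (made-up data).
const chunks = [
  '220 Welcome\r\n331 Please specify the password.\r\n',
  '230 Login successful.\r\n'
];

const lines = chunks
  // One chunk in, many lines out.
  .flatMap(chunk => chunk.split('\r\n'))
  // Splitting leaves an empty string after each trailing CRLF.
  .filter(line => line.length > 0);

console.log(lines);
```

Note that this sketch assumes every chunk ends on a line boundary; a chunk cut in the middle of a line would need buffering before splitting.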

So, responses now emits nice and groomed server responses. With responses finished we actually have the bulk of our client library already coded. We just need a couple more things.

Writing to the socket

requests is a Subject into which we can push commands, and which emits those same commands back to us (remember that a Subject acts as an Observer and as an Observable at the same time).

this.requests = new Rx.Subject();

And we create the only method in the Ftp class, the command method, which will push commands to the requests Subject:

command(cmd) {
  cmd = cmd.trim() + '\r\n';
  this.requests.onNext(cmd);
}

And whenever a new command is emitted by requests, we write it to the socket.

writeToStream(this.requests, this._ftpSocket, 'utf8');
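To see the push-in, emit-out behavior in isolation, here is a stripped-down sketch. TinySubject and fakeSocket are hypothetical stand-ins invented for this example (a real Subject does much more), and the subscribe call plays roughly the role of piping requests into the socket.

```javascript
// Hypothetical stand-in for a Subject: values pushed with onNext
// are forwarded to every subscriber.
class TinySubject {
  constructor() {
    this.observers = [];
  }
  subscribe(onNext) {
    this.observers.push(onNext);
  }
  onNext(value) {
    this.observers.forEach(observer => observer(value));
  }
}

// Fake socket that records what would be sent to the server.
const fakeSocket = {
  written: [],
  write(data, encoding) {
    this.written.push(data);
  }
};

const requests = new TinySubject();

// Every command emitted by requests ends up written to the socket.
requests.subscribe(cmd => fakeSocket.write(cmd, 'utf8'));

function command(cmd) {
  requests.onNext(cmd.trim() + '\r\n');
}

command('  USER anonymous ');
command('PWD');

console.log(fakeSocket.written);
```

Because requests is also observable, anything else that cares about outgoing commands can subscribe to it too, which is exactly what we exploit next.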

Zip it up!

And here comes my favorite part of this whole bunch of code. Since every command receives one reply (some commands also return “marks”, but we’ll ignore those in this post), we simply pair every request emitted with every response returned:

this.tuples = Rx.Observable.zip(this.requests, res.skip(1));

The zip operator pairs values from 2 or more observables in the order they are emitted. It guarantees that every command is paired with the response that comes afterwards. Notice how we use skip to dismiss the first response from the server. That’s because FTP servers emit a welcome message when a client connects. We’re not interested in that message, and it would mess up our zipping.
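The pairing logic is easy to reason about with a simplified model. The createZip function below is a hypothetical, two-source stand-in for the operator, written only to show the ordering guarantee and the effect of skipping the welcome message; the commands and responses are made up.

```javascript
// Hypothetical, simplified zip for two push-based sources.
function createZip(onPair) {
  const left = [];
  const right = [];
  // Emit a pair as soon as both sides have a value waiting.
  const flush = () => {
    while (left.length > 0 && right.length > 0) {
      onPair([left.shift(), right.shift()]);
    }
  };
  return {
    pushLeft(value) { left.push(value); flush(); },
    pushRight(value) { right.push(value); flush(); }
  };
}

const tuples = [];
const zipped = createZip(pair => tuples.push(pair));

// Ignore the first response (the welcome message), like skip(1).
let seen = 0;
const onResponse = response => {
  if (seen++ >= 1) zipped.pushRight(response);
};

onResponse('220 Welcome');            // skipped
zipped.pushLeft('USER anonymous\r\n');
zipped.pushLeft('PASS guest\r\n');     // sent before any reply arrives
onResponse('331 Please specify the password.');
onResponse('230 Login successful.');

console.log(tuples);
```

Even though both commands were pushed before the first reply arrived, each one ends up paired with its own response, purely because of ordering.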

That’s it! You can see the complete code for the library here.

The REPL

Let’s use the library we’ve just created and code a nice REPL that looks like this:

Pretty slick! Fortunately, Node.js comes with the repl library, which makes creating this kind of program a breeze:

In this code we simply get hold of the Ftp stream of request/response tuples, taking only the responses. We then make a new Subject where we’ll push the callbacks that the REPL eval property uses to signal that a command has been entered. We tell the REPL to use stdin/stdout for input and output, respectively. Every time the user enters a command, the REPL will call Ftp.command with it, and push the callback into the callbacks Subject.

Finally, we use our friend zip again to pair responses from the server with their respective callback, and we subscribe to the Observable generated by zip, where we run each command callback with its corresponding response from the server.
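For readers who have not used a custom eval before, here is a rough sketch of that wiring without any Rx. The names createFtpEval, deliverResponse and startFtpRepl are hypothetical, and a plain array of pending callbacks stands in for the callbacks Subject and the zip; this is an approximation of the idea rather than the code of the actual REPL.

```javascript
const repl = require('repl');

// Hypothetical wiring: sendCommand forwards a line to the server, and
// pendingCallbacks holds the REPL callbacks that still await a response.
function createFtpEval(sendCommand, pendingCallbacks) {
  // Node calls a custom eval with (cmd, context, filename, callback).
  return function ftpEval(cmd, context, filename, callback) {
    sendCommand(cmd);
    pendingCallbacks.push(callback);
  };
}

// Pair the response that just arrived with the oldest pending callback.
function deliverResponse(pendingCallbacks, response) {
  const callback = pendingCallbacks.shift();
  if (callback) callback(null, response);
}

// Not invoked here: starting the REPL would wait for user input.
function startFtpRepl(sendCommand, pendingCallbacks) {
  return repl.start({
    prompt: 'ftp> ',
    input: process.stdin,
    output: process.stdout,
    eval: createFtpEval(sendCommand, pendingCallbacks)
  });
}
```

Calling the callback is what makes the REPL print the result and show the prompt again, so each command appears to “return” the server’s response.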

Conclusion

This article may need a couple of reads until everything makes sense. In short, RxJS programming is all about thinking in streams of data (Observables) that keep as little state as possible, and that can be continuously transformed to achieve the desired output.

If you want to know more about the possibilities of Reactive Programming, I wrote a book about it with everything you need to know to write reactive JavaScript applications 🙂 Go check it out!

Find the complete code for this article here. Discussion of this article in Hacker News.
