
Monday, April 18, 2016

Let's Build Something: Elixir, Part 5a - Data Ingest, Consumption, and Validation

Whew! Been a little while, but let's keep cruisin'! This installment will tackle defining a custom data type for our data points, providing a means by which we can queue up ingested data and consume it continuously, and validating it as we go along. Tests to keep our sanity will follow in Part 5b.

First let's define what our data points should look like. We'll keep it simple for now, and can expand later. Open up a new file at lib/stats_yard/data_point.ex:


There are a couple of things happening here that you might be interested in:

  • Line 4: We define a type for our DataPoint struct. Note that __MODULE__ is just a safe way to reference the name of the current module (StatsYard.DataPoint). We can later reference this as StatsYard.DataPoint.t when the need arises.
  • Line 5: Define our basic DataPoint struct. Note that structs always inherit the name of the module in which they are defined. If we wanted it to be referenced as something other than %StatsYard.DataPoint{} we would need to define a module within the outer module, such as StatsYard.DataPoint.Thing. There's no real need for that in this case.
  • Lines 7-10: Set up a validation function that will only match when the argument passed is one of our shiny new structs and the various keys therein pass our guards. Specifically, we want the metric and entity fields to be strings (or binaries, in Elixir/Erlang land), and the value to be some type of number.
  • Lines 11-12: If we end up in this version of the validate function, log a message and return a tuple to indicate success, spitting back out the provided struct.
  • Line 15: Define the "fall-through" validate function that will match on any argument that doesn't pass the above guards. In this case, log a warning and return an :invalid tuple with the provided value included.
This module is intended to wrap up the structure of the data we want to use and the functions that are relevant in inspecting and validating it.
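For reference, a minimal sketch of a module along those lines might look like this. The field defaults and log messages are assumptions, the line numbers above won't match it, and it uses Logger.warning, which needs Elixir 1.11 or newer:

```elixir
defmodule StatsYard.DataPoint do
  require Logger

  # Fields default to nil; real data points are expected to fill in all three
  defstruct metric: nil, entity: nil, value: nil

  @type t :: %__MODULE__{metric: String.t(), entity: String.t(), value: number()}

  # Matches only a DataPoint whose metric and entity are binaries and whose value is a number
  def validate(%__MODULE__{metric: metric, entity: entity, value: value} = data_point)
      when is_binary(metric) and is_binary(entity) and is_number(value) do
    Logger.debug("Valid data point: #{inspect(data_point)}")
    {:ok, data_point}
  end

  # Fall-through clause: anything that didn't match above is rejected
  # (Logger.warning needs Elixir 1.11+; older versions use Logger.warn)
  def validate(other) do
    Logger.warning("Invalid data point: #{inspect(other)}")
    {:invalid, other}
  end
end
```

Because the guard lives on the function head, a struct with a non-numeric value (or anything that isn't a DataPoint at all) simply falls through to the second clause.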

Next up, let's add something to let us queue up incoming data points. I like Joe Kain's BlockingQueue. It's a statically-sized GenServer'd FIFO queue that blocks when full or empty. Super simple, and very effective. NB: Joe also has a super awesome blog called Learning Elixir that you really should check out.

First up we need to add it to our deps list in mix.exs:
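The entry itself is a one-liner in the deps/0 function. A sketch, assuming the package is published on Hex as :blocking_queue; the open-ended version requirement is only a placeholder, so pin an actual release:

```elixir
defp deps do
  [
    # Assumed Hex package name; replace ">= 0.0.0" with a real, pinned version
    {:blocking_queue, ">= 0.0.0"}
  ]
end
```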


Then follow that up with a mix deps.get, and we're ready to roll.

First let's walk through the idea here. I want to have a BlockingQueue GenServer that catches our ingested DataPoints, and a separate consumer GenServer that will pop those DataPoints off the ingest queue and validate them.  The most important part of all this is that I don't want to do things in batches, nor do I want to have to explicitly trigger the consumption of data from the ingest queue. Enter supervision and streams!

BlockingQueue's API gives two functions for popping events off a queue: pop/1 and pop_stream/1. As you might have guessed, pop/1 removes and returns the oldest single value from the queue, while pop_stream/1 returns a stream built with Stream.repeatedly/1. If you're unfamiliar with Streams, they're lazy enumerables: each element is produced only when something downstream asks for it, which lets you perform actions on arbitrarily large (even infinite) data sets without having to pull the entire thing into memory. I'm not the best person to describe these in great detail, but Joe Kain and Elixir's Getting Started guide have some good descriptions and applications.
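To see that laziness in action without pulling in BlockingQueue, here's a small sketch where an Agent holding a list stands in for the queue. It's a hypothetical stand-in: unlike BlockingQueue it doesn't block when empty, so we only take as many items as it holds.

```elixir
# Hypothetical stand-in for the ingest queue: an Agent holding a list
{:ok, queue} = Agent.start_link(fn -> [1, 2, 3, 4, 5] end)

# Pops the oldest item, roughly what pop/1 does (but without blocking)
pop = fn -> Agent.get_and_update(queue, fn [head | tail] -> {head, tail} end) end

# Stream.repeatedly/1 only builds a recipe; pop has not been called yet
stream = Stream.repeatedly(pop)

# Enum.take/2 forces evaluation, calling pop exactly twice
first_two = stream |> Stream.map(&(&1 * 10)) |> Enum.take(2)
IO.inspect(first_two)                # [10, 20]
IO.inspect(Agent.get(queue, & &1))   # [3, 4, 5]
```

Nothing is popped until Enum.take/2 asks for elements, and only the two it needs are consumed; the rest stay in the "queue".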

So the layout of these bits is going to be something like this:

  1. Start up a named and supervised BlockingQueue GenServer
  2. Start up a named and supervised consumer GenServer that can find the above BlockingQueue process
  3. Consumer process grabs hold of the Stream function returned by BlockingQueue.pop_stream/1 and proceeds to validate every DataPoint that gets pushed into the queue

Here's our consumer:


So here's what's going on in this module:

  1. Line 6: This is my nifty way of telling this function "keep an eye out for things popping up in this stream of data, and take appropriate action when you see a new item." We'll test this out shortly in iex.
  2. Line 7: Try to validate, as a DataPoint, every item that comes off the stream
  3. Lines 8-9: If validation succeeds, return the DataPoint; otherwise discard it entirely (remember that our StatsYard.DataPoint.validate/1 function will log a warning when a value fails validation)
  4. Line 18: Note that our public start_link/2 function expects to receive the PID of our ingest queue, which we'll provide in lib/stats_yard.ex when we set up our supervision tree
  5. Lines 23-25: Start up a linked process that will kick off our queue consumption loop in consume_data_points/1
  6. Line 27: Set our GenServer's state to the PID of our ingest queue, and we're done!
Notice that this is a very simple GenServer - so simple, in fact, that it doesn't even have any direct means of interaction. For now, this is more than sufficient - we just want something that we can supervise and organize appropriately, with the option to extend it for more robust behavior in the future. (For the studious, you're right - there's always Elixir's GenEvent, but that's for future posts!)
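Here's a rough, self-contained sketch of that shape. So it can run without BlockingQueue, it's handed any enumerable as its stream (in the app, that would be the result of BlockingQueue.pop_stream/1) plus a hypothetical handler function that receives valid items, and a simplified validate/1 on plain maps stands in for StatsYard.DataPoint.validate/1. The module and function names are illustrative, not the ones from the listing above:

```elixir
defmodule StatsYard.IngestConsumerSketch do
  use GenServer

  # `stream` would be BlockingQueue.pop_stream(queue_pid) in the real app;
  # `handler` is a hypothetical callback that receives each valid item
  def start_link(stream, handler) do
    GenServer.start_link(__MODULE__, {stream, handler})
  end

  @impl true
  def init({stream, handler}) do
    # Consume in a separate linked process so init/1 returns right away
    consumer = spawn_link(fn -> consume(stream, handler) end)
    {:ok, %{consumer: consumer}}
  end

  # Lazily validate every item as it arrives and drop the invalid ones.
  # With an endless stream, Stream.run/1 never returns.
  defp consume(stream, handler) do
    stream
    |> Stream.map(&validate/1)
    |> Stream.filter(&match?({:ok, _}, &1))
    |> Stream.each(fn {:ok, item} -> handler.(item) end)
    |> Stream.run()
  end

  # Simplified stand-in for StatsYard.DataPoint.validate/1
  defp validate(%{metric: m, entity: e, value: v} = item)
       when is_binary(m) and is_binary(e) and is_number(v),
       do: {:ok, item}

  defp validate(other), do: {:invalid, other}
end
```

Feeding it a finite list is handy for poking at it: the linked process finishes normally once the list runs out. Fed an endless stream from the queue, it just keeps consuming.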

Now let's rig all this up in our supervision tree, and then we'll poke around in iex to see if it's all working as expected. Notice that this has been cleaned up a bit to accommodate our TimeStampWriter bits without getting too cluttered:


(I know that's a big chunk of code to dump into a blog post - my apologies. I mostly wanted to be sure to point out that the structure of this stuff changed significantly. Newer changes will be limited to just the diffs. :-) )

Nothing super exciting here, other than a few things to note:
  1. All supervisors are now started up in their own independent functions, which are called from the pared-down start/2 function
  2. Our supervisors are now named appropriately (Lines 20 and 36)
  3. Our start_main_ingest/0 function lists two workers to be started up under the appropriate supervisor, which will start in the order listed (this will be on the quiz)
  4. Atoms used to name our GenServer processes are pulled out and returned from simple functions at the bottom of the file, so as to avoid headaches later
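As a rough illustration of points 2 through 4, here's a sketch of a named supervisor starting two named workers in order. Two Agents stand in for the queue and consumer, the names are hypothetical, and it uses the map-based child specs of newer Elixir rather than the Supervisor.Spec worker/3 helpers common in 2016:

```elixir
defmodule StatsYard.IngestSupervisionSketch do
  # Hypothetical layout: two Agents stand in for the ingest queue and consumer
  def start_main_ingest do
    children = [
      # Started first
      %{
        id: main_ingest_queue(),
        start: {Agent, :start_link, [fn -> [] end, [name: main_ingest_queue()]]}
      },
      # Started second, once the queue is up
      %{
        id: main_ingest_consumer(),
        start: {Agent, :start_link, [fn -> :idle end, [name: main_ingest_consumer()]]}
      }
    ]

    # :one_for_one is an assumed starting strategy
    Supervisor.start_link(children, strategy: :one_for_one, name: main_ingest_supervisor())
  end

  # Names come from functions, so a typo becomes an undefined-function error
  # instead of a silently wrong atom
  def main_ingest_supervisor, do: :main_ingest_supervisor
  def main_ingest_queue, do: :main_ingest_queue
  def main_ingest_consumer, do: :main_ingest_consumer
end
```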
Enough work, let's play with it! Fire up iex and we'll see if our stuff works:


Cool! We're able to push things into our BlockingQueue without having to know much about it, and our IngestConsumer immediately received the pushed values and attempted to validate them, the results of which are spit back out via log messages.

Now for that quiz I mentioned earlier: in what order were our two ingest GenServers started? Yup, the order listed in our supervision tree definition - the queue first, then the consumer. Why does this matter? 

There's a failure case that we need to recognize and accommodate. Specifically, if our ingest queue process dies, it will indeed be restarted by the supervisor... but our consumer process will merrily chug along holding onto a Stream function that references a now-dead process! That sounds like bad news, but let's verify that I'm not making stuff up:


I know that's a bit dense, but the gist (har har) of it is this: we used Supervisor.which_children/1 to see the PIDs of our two GenServers, stopped the main_ingest_queue process (rather rudely, too, a la :kill), and then checked that the expected PID had indeed been updated in the supervisor's state. Then we pushed a value into the main ingest queue, which worked since the queue had been restarted, but our ingest consumer process never knew about it, because it's still waiting for events to flow in from a dead process. That's lame, so let's fix it!
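The root of the problem is that a PID identifies one specific process, not a role. Here's a small sketch (an Agent named :demo_queue stands in for the ingest queue, and the helper module is hypothetical) showing that after a supervised restart the registered name points at the new process, while anything that captured the old PID, like a stream closure, can no longer reach it:

```elixir
defmodule StalePidDemo do
  # Polls until the supervisor reports a new PID for its single child
  # (restarts happen asynchronously)
  def wait_for_restart(sup, old_pid, attempts \\ 50) do
    case Supervisor.which_children(sup) do
      [{_id, pid, _type, _modules}] when is_pid(pid) and pid != old_pid ->
        pid

      _ when attempts > 0 ->
        Process.sleep(10)
        wait_for_restart(sup, old_pid, attempts - 1)
    end
  end
end

children = [%{id: :demo_queue, start: {Agent, :start_link, [fn -> [] end, [name: :demo_queue]]}}]
{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
[{:demo_queue, old_pid, :worker, _}] = Supervisor.which_children(sup)

# Like a pop_stream, this closes over the PID it was built with
stale_stream = Stream.repeatedly(fn -> Agent.get(old_pid, & &1) end)

Process.exit(old_pid, :kill)
new_pid = StalePidDemo.wait_for_restart(sup, old_pid)

# The name follows the restart...
IO.inspect(Process.whereis(:demo_queue) == new_pid)   # true

# ...but the stream still targets the dead process
result =
  try do
    Enum.take(stale_stream, 1)
  catch
    :exit, reason -> reason
  end

IO.inspect(elem(result, 0))   # :noproc
```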

Turns out, this is a super simple one-line fix, but reading the docs is a must in order to understand why this fix is appropriate (head over to the Supervisor docs, then search for "rest_for_one"). In lib/stats_yard.ex:


And now to test it out in iex:


Woohoo! Worked like a charm. What's happening here? First, read the docs. :-) Second, in a nutshell, the rest_for_one strategy makes the supervisor consider the position of any process that dies under its supervision: it terminates every supervised process that was started after the dead one, then restarts the dead process along with those later processes. In our case, the queue process is the first one, so if it dies, everything under our MainIngestSupervisor is restarted. If it were, for example, the 3rd process started by this supervisor, then the 3rd, 4th, 5th, ..., nth processes would be restarted, while the 1st and 2nd processes would be left alone. Super handy stuff here!
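To watch rest_for_one do its thing in isolation, here's a sketch with three hypothetical Agents named :first, :second, and :third. Killing the second one should leave the first untouched and restart the second and third:

```elixir
defmodule RestForOneDemo do
  @names [:first, :second, :third]

  # Three Agents started in order under a :rest_for_one supervisor
  def start do
    children =
      for name <- @names do
        %{id: name, start: {Agent, :start_link, [fn -> name end, [name: name]]}}
      end

    Supervisor.start_link(children, strategy: :rest_for_one)
  end

  # Current PIDs in start order (nil while a child is being restarted)
  def pids, do: Enum.map(@names, &Process.whereis/1)

  # Restarts are asynchronous, so poll until the condition holds
  def wait_until(condition, attempts \\ 50) do
    cond do
      condition.() ->
        :ok

      attempts > 0 ->
        Process.sleep(10)
        wait_until(condition, attempts - 1)

      true ->
        raise "condition not met in time"
    end
  end
end

{:ok, _sup} = RestForOneDemo.start()
[first, second, third] = RestForOneDemo.pids()

Process.exit(second, :kill)

RestForOneDemo.wait_until(fn ->
  [_, s, t] = RestForOneDemo.pids()
  is_pid(s) and is_pid(t) and s != second and t != third
end)

[first_after, second_after, third_after] = RestForOneDemo.pids()
IO.inspect(first_after == first)     # true: started before the dead process
IO.inspect(second_after != second)   # true: the dead process itself
IO.inspect(third_after != third)     # true: started after it
```

Swap :rest_for_one for :one_for_one and the third Agent would keep its original PID, which is exactly the stale-consumer situation from earlier.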

To Be Continued...

So now we're in a good place from a supervision point of view. This post is already pretty lengthy, so I'm going to title it "Part 5a," and we'll continue with some unit tests and documentation in Part 5b.

Til next time!