The elegance of Builder pattern

Paraphrasing Josh Bloch in Effective Java [Bloch, 2017, Item 2]:

While creating objects, in cases where the number of optional parameters of an object is considerable, say 4 or more, one might think of static factory methods [Bloch, 2017, Item 1] as a solution — but they’re more suitable for a small set of parameters. When there are several optional params, static factories cannot be used as it’s cumbersome to imagine and cater to all possible parameter combinations. Another approach that’s proposed in such cases is using JavaBeans but it has its own shortcomings.

Therefore, we usually go with multiple (telescoping) constructors for such requirements. For example:

public Cake(int oilTbsp, int flourMg){
  this(oilTbsp, flourMg, 0);
}

public Cake(int oilTbsp, int flourMg, int eggCount){
  this(oilTbsp, flourMg, eggCount, 0);
}

public Cake(int oilTbsp, int flourMg, int eggCount, int bakingPowderMg){
  this(oilTbsp, flourMg, eggCount, bakingPowederMg);
}

//...

Such implementations, although purpose-serving, are a bit contrived in that the class client needs to tally the parameters accurately. Consider a large parameter list, and this would be an overkill.

A variation of Builder pattern [Gamma, 1995], is what Bloch suggests, for such cases. In it, a builder class is a static member of the class it builds, for example:

public class Cake{
  //...
  private Cake(Builder builder){
    //...
  }

  public static class Builder{
    //...
  }
}

Since the original constructor is hidden, the client first gets a reference to the Builder class — passing all the required params to its constructor or static factory. The client then calls setters on the returned builder object to set the optional parameters of interest. Finally, the client makes a call to the build() method to generate an immutable object.
Since the builder setter methods return the builder itself, the invocations can be chained, like so:

// Set only the parameters of interest
Cake cake = new Cake.Builder(350, 45).egg(2).sugar(240).cocoa(35)...build();

As is apparent, this is intuitive as well as concise.

A builder can be further enhanced by enabling it to build more than one object, based on parameters. One has to be cautious, however, to disallow building an object of an inconsistent state. This can be ensured by validating the passed parameters as early as possible and throwing a suitable exception.

Builders can also be used to automate certain tasks and fill in the fields. For example, autoincrementing the object Id, etc.

As Josh Bloch advises, we should be using Builders as often as possible, especially in cases where the number of parameters is significant. They’re a simple and elegant alternative to telescoping constructors or JavaBeans.

[Full implementation of the Cake builder example is here.]

Tying snips.ai, Strava & Google Speech Engine

So, this happened a couple months ago, and I had lots of fun doing it (watch the video):

A detailed post would follow. (And yes, as mentioned in the video description, kindly ignore the choice of LED colours :)).

A Spark learning.

About a month back, I’d done something I was not very proud of — a piece of code that I was not very happy about — and I had decided to get back to it when time permits.

The scenario was something close to the typical Word Count problem, in which the task required counting words at specific indices, and then print the unified count per word. Of course, there were many other things to consider in the final solution since a streaming context was being dealt with — but those are outside the purview of what I’m trying to highlight here.

An abridged problem statement could be put as:

Given a comma-separated stream of lines, pick the words at indices j and k from each line, and print the cumulative count of all the words at jth and kth positions.

So the ‘crude’ solution to isolate all the words was:

  1. Implement a PairFunction<String, String, Integer>, to get all the words at a given index
  2. Use the above function to get all occurrences of words at index jand get a Pair (Word -> Count) RDD
  3. Use the same function to get all occurrences of words at index k, and get another Pair (Word -> Count) RDD
  4. Use the RDD union() operator to combine the above two RDDs, and get a unified RDD
  5. Do other operations (reduceByKey, etc. …)

As is apparent, anyone would cringe at this approach, especially due to the two passes (#2, #3) over the entire data set — even though it gets the work done!
So I decided to revisit this piece, with the tool of additional knowledge about what all Spark offers.

One useful tool is the flatMap operation that Spark Java 8 offers. By Spark’s definition:

flatMap is a DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream

Given our requirement, this was exactly what was needed — create two records (one for each jth and kth index word), for each incoming line. This would, of course, benefit us in that we have the final (unified) RDD in just a single pass of the incoming stream of lines.

I went ahead with a flatMapToPair implementation, like so:

JavaPairDStream<String, Integer>  unified = lines.flatMapToPair((s) -> {
        String a[] = s.split(",");
        List<Tuple2<String, Integer>> apFreq = new ArrayList<>();
        apFreq.add(new Tuple2<>(a[Constants.J_INDEX],1));
        apFreq.add(new Tuple2<>(a[Constants.K_INDEX],1));
        return apFreq.iterator();
});

To further validate the benefits, I ran some tests* with datasets ranging from 1M to 100M records and the benefits of flatMap approach were more and more pronounced as data grew bigger.

Following were the observations.

flatmap

As we can see, whilst the difference is ~2s for 1 million records, it becomes almost twice as we reach 10M and more than twice at around 100M mark.
It’s therefore, obvious that production systems (e.g. a real-time analytics solution), where data the volume is much higher, need to be cautious about the choice of each operation (transformation, filtering or any other action), as these govern the inter-stage as well as the overall throughput of a Spark application.

 


* Test conditions:
– Performed on a 3-Node (m4.large) Spark cluster on AWS, using Spark 2.2.0 on Hadoop 2.7
– Considers only the time spent on a particular stage (union or flatMap), available via Spark UI
– Each reading is an average of time taken in 3 separate runs

A ‘Kafka > Storm > Kafka’ Topology gotcha

If you’re trying to make a Kafka Storm topology work, and are getting baffled by your recipient topic not receiving any damn thing, here’s the secret:

  • The defaultorg.apache.storm.kafka.bolt.KafkaBolt implementation expects only a single key field from the upstream (Bolt/Spout)
  • If you’re tying your KafkaBolt to a KafkaSpout, you’ve got to use the internal name:str
  • However, if you have an upstream Bolt, doing some filtering, then make sure that you tie the name of your ONLY output field (value) to the KafkaBolt

Let me break it down a little bit more for the larger good.

Consider a very basic Storm topology where we read raw messages from a Kafka Topic (say, raw_records), enrich/cleanse them (in a Bolt), and publish these enriched/filtered records on another Kaka Topic (say, filtered_records).

Given that the final publisher (the guy that talks to filtered_records) is a KafkaBolt, it needs a way to find out the relevant key that the values are available from. And that key is what you need to specify/detect from the upstream bolt or spout.

So, the declared output field of the upstream Bolt would be something like:

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields(new String[]{"output"}));
}

Note the key field named “output“.

Now, in KafkaBolt the only thing to take care of is using this key field in the configuration, like so:

KafkaBolt bolt = (new KafkaBolt()).withProducerProperties(newProps(BROKER_URL,
        OUTPUT_TOPIC))
        .withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key",
                "output"));

The default key field name is “message“, so you could as well use the no-arg constructor of  FieldNameBasedTupleToKafkaMapper, by specifying the upstream key as “message“.

If however, you have scenario where you’d want to pass both the key and value from the upstream, for example,

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields(new String[]{"word","count"}));
}

Note that we’ve specified the key field here as “word“.

Then obviously, we need to use this (modified) key name downstream, like so:

KafkaBolt bolt = (new KafkaBolt()).withProducerProperties(newProps(BROKER_URL,
        OUTPUT_TOPIC))
        .withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word",
                "count"));

Update (2017-08-23): Added the scenario where a modified key name can be used.

(Good) Introduction to ML

The Machine Learning (ML) bandwagon is on, and I do not want to be left behind. There are plenty of YouTube videos, online courses, books, and people with ready advise on what ML is all about. After have skimmed through several of them, I got this video on YouTube, and honestly, this is the best one I have seen thus far.

Many thanks to Ron Bekkerman, and LinkedIn for putting this on YouTube for the larger good!

The glorious 'threat/reward' model

We have witnessed this since our childhood. Right from our homes, to our schools, colleges, and finally in our jobs. The glorious ‘threat/reward’ model! As the name suggests, it’s an approach where a certain set of actions lead (or are known to lead) to a certain reward (candy, toys, perks, H1B, and of course “that-irresistible-promotion”). On the flipside, non-compliance to a given, pre-defined set of laid-out steps, leads to ‘threats’, or a consequence of those threats (read: no candy, no perks, …and..well, you get the point.)

Continue reading The glorious 'threat/reward' model

Alexa + Raspberry Pi = Fun!

So, after a week a of grappling, trying to make Alexa AVS work on a humble Raspberry Pi 2 Model B, I finally had success! Yayyy! Now, thus far, the responses I have received on declaring this little victory of mine have been more or less like “meh!”, but still, I write about it because for me the experience was far more satiating.

So here goes.

I was fascinated by Amazon Echo, and the fact that Amazon has thrown open the doors to the developer community to build its (what is known as) skills. Since Echo is not widely available in this part of the world yet, plus, getting an Echo and getting it to work doesn’t sound like ‘fun’; I decided to look for Alexa (soft-) implementations for other platforms.

Continue reading Alexa + Raspberry Pi = Fun!

Reliable Distributed Communication with JGroups

We were looking for an alternative to Java Messaging Service (JMS) for a project requirement – I had had my fair share of unpleasant experiences with JMS from the past. With JMS, most of which had to do with housekeeping involved in the event of a broker outage.

Issue at hand

The solution we were working on was a legacy web-based application, which was being enabled for a cluster. Scalability, it seems, was not designed into the system. The solution, therefore, demanded making the system scalable with a quick-turnaround, minimal code changes, and of course, if not positive, then no negative impact on the application throughput.

The main challenge was the presence of a programmatic cache (using core Java data structures), for some specific requirements, which of course, needed to be made cluster-aware. Data consistency is the primary concern in such cases, because the application pages can be serviced by any potentially any node.

In this post, I want to talk about how we achieved the same – since such a scenario of different nodes needing to “talk” can arise in many a applications of the present day – as distributed computing is becoming more and more popular.

Analysis

When we say cluster-aware we imply that the clustered application, might need to talk to its peers at various junctures. It might or might not decide to perform an operation based on those interactions – but the conversation does need to occur. There are of course several options today to facilitate the same – JMS, Hazelcast, JGroups, etc. JMS requires the presence of an active broker process at all times. The housekeeping involved in broker outages, especially when message persistence is enabled (for reliability), becomes a problem. Solutions like Hazelcast required us to modify the existing code so as to use the data structures provided by them. This wasn’t feasible because we wanted to minimize the code changes and rather work out a solution on top of the existing code base.

We settled for JGroups because it seemed to address all the concerns:

  • there is no active process required beforehand, but rather, each node either joins an existing communication session, or else, starts one of its own. This means that there’s no single point of failure (SPOF)
  • message reliability, ordering and flow control are all configurable
  • code changes could be contained, because it involved minor modifications.

JGroups is a reliable IP  toolkit that is based upon a flexible protocol stack. IP , or broadcast, implies sending a message to multiple recipients. Traditionally, multicast communication is UDP (User Datagram Protocol) based, and is unreliable. For instance, think of a streaming video from YouTube – where once the streaming starts, little does it matter that a few frames are missed or jagged. Such are typically unreliable UDP based transmissions.

JGroups addresses the shortcomings of traditional IP multicast by adding aspects like reliability and membership to it.

Flexible protocol stack implies that there are options to tailor JGroups behaviour according to the application requirements. For example, aspects like Failure Detection, Encryption, etc., can be configured.

Design

Conceptually, the design was very simple: the caches on each node needed to “talk” to other nodes, whenever a significant operation occurred. Since we were dealing with user-defined entities, these operations mostly involved some kind of update of these entities.

Since each of the node needed a JGroups handle in order to be able to talk, we invoked a local JGroups session on each. Through the concept of groups and membership, a JGroups session looks for other active sessions when it comes alive. Consequently, members belonging to the same group are aware of all the peers. For example:
Suppose node n1 comes up as the very first node, and initializes a JGroups session, creating a group called ProjectXGroup. Any subsequent node, say, n2, n3, also does the same, but in its case, it find an active ProjectXGroup already in place, and hence joins it, rather than creating a new session. Thus, each node (peer) is aware of all the members in that form the group. JGroups, by design, provides failure detection (FD) – that any node that goes down is removed from the list of active members, thus ensuring that at any given point of time, all the group members are aware of the current state of the group.

Any local update of one or more entities required the change to be propagated to all the peers. We identified that this should be a 3-step process. Given an entity EntityX, the following needed to be done on all the nodes:

  • Lock EntityX
  • Update ExtityX
  • Unlock EntityX

We also needed to ensure that failure to perform any of these operations should result in a rollback, which also needed to be taken care of. For any given operation, we subdivided it into smaller tasks. An example of a task could be LockTask, UpdateTask, UnlockTask, etc. It is these tasks that would be the language of communication. Typically a task would have taskID, information about the entities involved, status flag (SCHEDULED, SUCCESSFUL, FAILED..), etc.

JGroups has options to either broadcast a message or unicast it. Since our implementation required that caches on all the peers be consistent, a sender broadcasts a task to all the peers. Each peer, would then perform the task, and report the outcome back to the sender. Continuing with our earlier example, given a task T1 which the involves the following sub-tasks on an entity known as E1:

  1. Lock E1
  2. Update E1
  3. Unlock E1

Let’s consider the first sub-task — Lock E1:
This would require E1 to be locked on all the nodes, and therefore we invoke the LockTask, with information about E1. Now, suppose n2 is a node which is catering to the user at a given point of time, then n2 would first lock E1 locally, and then broadcast this LockTask to the group.  Since it is a broadcast, all the nodes (including n2) would receive and process the LockTask (and lock E1 locally). We can easily identify the sender and not perform the task there (since the task was issued to other nodes only after the sender had performed it). This means that n2 would ignore the task, but n1, n3..nn would need to perform it.

Once the given task is processed, in order to report the outcome back, we set the appropriate status flags in the original Task object and send it back to the sender. This time, however, we set it as a unicast communication (by specifying the recipient), since we know who is this message directed to. What it boils down to in terms of JGroups is: after performing the task n1, n3,…nn would all get the handle to the JGroups session and can use the same LockTask instance to report back the outcome of a given task. Upon receiving responses from all the peers, and if the responses are positive, the sender can then proceed with other operations (sub-tasks) — Update E1, Unlock E1…etc.

Exceptional Scenarios

There are various junctures where the above conversation could fail, and thus, all the exceptional scenarios need to be taken care of. There were several failure scenarios that we handled. To list a few:

  • No response (or response timeout) from one or more peers
  • Failure of subtask in the sender node
  • Failure of subtask in any peer, etc.

Summary

We discussed how JGroups can be an apt solution in situations where reliable distributed communication is necessary. We also saw an example where JGroups was used for enabling intra-node communication for broadcast and unicast services.