A ‘Kafka > Storm > Kafka’ Topology gotcha

If you’re trying to make a Kafka Storm topology work, and are getting baffled by your recipient topic not receiving any damn thing, here’s the secret:

  • The default org.apache.storm.kafka.bolt.KafkaBolt implementation expects only a single named field from the upstream component (Bolt/Spout)
  • If you’re tying your KafkaBolt directly to a KafkaSpout, you’ve got to use the spout’s internal field name: str
  • However, if you have an upstream Bolt doing some filtering, make sure that you tie the name of its ONLY output field (the value) to the KafkaBolt

Let me break it down a little bit more for the larger good.

Consider a very basic Storm topology where we read raw messages from a Kafka Topic (say, raw_records), enrich/cleanse them (in a Bolt), and publish these enriched/filtered records on another Kafka Topic (say, filtered_records).

Given that the final publisher (the guy that talks to filtered_records) is a KafkaBolt, it needs a way to find out which tuple field the values are available from. And the name of that field is what you need to specify/detect from the upstream bolt or spout.

So, the declared output field of the upstream Bolt would be something like:

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields(new String[]{"output"}));
}

Note the field name, “output”.

Now, in KafkaBolt the only thing to take care of is passing this field name to the tuple-to-Kafka mapper in the configuration, like so:

KafkaBolt bolt = new KafkaBolt()
        .withProducerProperties(newProps(BROKER_URL, OUTPUT_TOPIC))
        .withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
        // Arguments: the tuple field to use as the Kafka key, then the field holding the message
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "output"));

The mapper’s default field names are “key” for the key and “message” for the value, so you could just as well use the no-arg constructor of FieldNameBasedTupleToKafkaMapper, provided the upstream component names its output field “message”.
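To see why the names have to line up, here is a minimal, Storm-free sketch of field-name-based mapping. It assumes a tuple can be modelled as a map from field name to value; the FieldMappingSketch and Mapper classes are hypothetical stand-ins written for illustration, not Storm’s own classes, and the real mapper may differ in detail.

```java
import java.util.Map;

// Simplified, Storm-free model of field-name-based mapping. The names used
// here (FieldMappingSketch, Mapper) are hypothetical and only illustrate the
// lookup; they are not Storm's classes.
final class FieldMappingSketch {

    // Stand-in for a mapper configured with a key field and a message field.
    static final class Mapper {
        private final String keyField;
        private final String messageField;

        Mapper(String keyField, String messageField) {
            this.keyField = keyField;
            this.messageField = messageField;
        }

        // In this model, a missing key field simply yields a null key.
        Object key(Map<String, Object> tuple) {
            return tuple.get(keyField);
        }

        // A missing message field is an error: there is nothing to publish.
        Object message(Map<String, Object> tuple) {
            if (!tuple.containsKey(messageField)) {
                throw new IllegalArgumentException(messageField + " does not exist");
            }
            return tuple.get(messageField);
        }
    }

    public static void main(String[] args) {
        // The upstream bolt declared a single output field named "output".
        Map<String, Object> tuple = Map.of("output", "cleaned record");

        // Mapper whose message field matches the upstream field name.
        Mapper matching = new Mapper("key", "output");
        System.out.println(matching.key(tuple));      // null
        System.out.println(matching.message(tuple));  // cleaned record

        // Mapper left on the default names while upstream still emits "output".
        Mapper defaults = new Mapper("key", "message");
        try {
            defaults.message(tuple);
        } catch (IllegalArgumentException e) {
            System.out.println("mismatch: " + e.getMessage()); // mismatch: message does not exist
        }
    }
}
```

In this model the second mapper never finds a value to publish, which is the same shape of mismatch that leaves the recipient topic empty.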

If, however, you have a scenario where you’d want to pass both the key and the value from upstream, for example:

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields(new String[]{"word","count"}));
}

Note that we’ve specified the key field here as “word”, with “count” carrying the value.

Then obviously, we need to use this (modified) key name downstream, like so:

KafkaBolt bolt = new KafkaBolt()
        .withProducerProperties(newProps(BROKER_URL, OUTPUT_TOPIC))
        .withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
        // "word" supplies the Kafka key, "count" supplies the message
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));

Update (2017-08-23): Added the scenario where a modified key name can be used.