If you're trying to make a Kafka Storm topology work, and are getting baffled by your destination topic not receiving any damn thing, here's the secret:
- The default org.apache.storm.kafka.bolt.KafkaBolt implementation expects only a single key field from the upstream (Bolt/Spout).
- If you're tying your KafkaBolt to a KafkaSpout, you've got to use the spout's internal field name: str.
- If, however, you have an upstream Bolt doing some filtering, make sure that you tie the name of your ONLY output field (value) to the KafkaBolt.
Let me break it down a little more, for the greater good.
Consider a very basic Storm topology where we read raw messages from a Kafka topic (say, raw_records), enrich/cleanse them in a Bolt, and publish the enriched/filtered records to another Kafka topic (say, filtered_records).
Given that the final publisher (the one that talks to filtered_records) is a KafkaBolt, it needs a way to find out which field of the incoming tuple holds the value to publish. That field name is what you need to specify, and it must match the output field declared by the upstream Bolt or Spout.
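To see why a name mismatch makes records silently vanish, consider that the bolt's mapper looks the message up in the tuple by field name. Here's a minimal, Storm-free sketch of that lookup (FieldLookupDemo and messageFor are hypothetical stand-ins, not Storm's API):

```java
import java.util.Map;

public class FieldLookupDemo {
    // Hypothetical stand-in for the mapper's by-name lookup. (Storm's real
    // Tuple lookup errors out on an undeclared field; a plain Map just
    // returns null, which is enough to illustrate the mismatch.)
    static String messageFor(Map<String, String> tuple, String messageField) {
        return tuple.get(messageField);
    }

    public static void main(String[] args) {
        // The upstream declared a single field named "output"...
        Map<String, String> tuple = Map.of("output", "hello");
        // ...so asking for "message" finds nothing, and nothing gets published
        System.out.println(messageFor(tuple, "message")); // null
        // Asking for the declared name works
        System.out.println(messageFor(tuple, "output"));  // hello
    }
}
```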
So, the declared output field of the upstream Bolt would be something like:

```java
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("output"));
}
```
Note the declared field name, "output".
Now, in the KafkaBolt, the only thing to take care of is using this field name in the mapper configuration, like so:

```java
KafkaBolt bolt = new KafkaBolt()
        .withProducerProperties(newProps(BROKER_URL, OUTPUT_TOPIC))
        .withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
        // ("key", "output"): the Kafka key comes from a field named "key"
        // (absent upstream here, so the key is null), and the message
        // comes from the field named "output"
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "output"));
```
The default message field name is "message" (and the default key field name is "key"), so you could just as well use the no-arg constructor of FieldNameBasedTupleToKafkaMapper by declaring the upstream output field as "message".
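In other words, the no-arg constructor behaves like new FieldNameBasedTupleToKafkaMapper("key", "message"). A tiny, Storm-free sketch of that default lookup (DefaultNamesDemo and messageFor are hypothetical stand-ins, not Storm's API):

```java
import java.util.Map;

public class DefaultNamesDemo {
    // Hypothetical stand-in for the no-arg mapper: with no names given,
    // it looks for fields called "key" and "message".
    static final String DEFAULT_KEY_FIELD = "key";
    static final String DEFAULT_MESSAGE_FIELD = "message";

    static Object messageFor(Map<String, Object> tuple) {
        return tuple.get(DEFAULT_MESSAGE_FIELD);
    }

    public static void main(String[] args) {
        // An upstream bolt that declared its single output field as
        // "message" works with the no-arg mapper out of the box
        System.out.println(messageFor(Map.of("message", "payload"))); // payload
        // Any other declared name finds nothing
        System.out.println(messageFor(Map.of("output", "payload")));  // null
    }
}
```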
If, however, you have a scenario where you'd want to pass both the key and the value from the upstream, for example:

```java
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("word", "count"));
}
```
Note that we've specified the key field here as "word" and the value field as "count".
Then, obviously, we need to use these (modified) field names downstream, like so:
```java
KafkaBolt bolt = new KafkaBolt()
        .withProducerProperties(newProps(BROKER_URL, OUTPUT_TOPIC))
        .withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
```
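Putting the two-field case together, here's a small, Storm-free sketch of what the mapper effectively does with the word/count tuple (SimpleFieldMapper is a hypothetical stand-in for FieldNameBasedTupleToKafkaMapper, not Storm's class):

```java
import java.util.Map;

// Hypothetical stand-in for FieldNameBasedTupleToKafkaMapper: it pulls
// the Kafka key and message out of a tuple by their field names.
class SimpleFieldMapper {
    private final String keyField;
    private final String messageField;

    SimpleFieldMapper(String keyField, String messageField) {
        this.keyField = keyField;
        this.messageField = messageField;
    }

    Object keyFrom(Map<String, Object> tuple) {
        // Storm's real mapper returns a null key when the field is absent
        return tuple.get(keyField);
    }

    Object messageFrom(Map<String, Object> tuple) {
        return tuple.get(messageField);
    }
}

public class WordCountMapping {
    public static void main(String[] args) {
        SimpleFieldMapper mapper = new SimpleFieldMapper("word", "count");
        Map<String, Object> tuple = Map.of("word", "storm", "count", 42);
        // The record published to Kafka: key "storm", value 42
        System.out.println(mapper.keyFrom(tuple) + " -> " + mapper.messageFrom(tuple));
    }
}
```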
Update (2017-08-23): Added the scenario where a modified key name can be used.