Kafka Streams - Spring boot example
test topic 데이터를 consume하고, " "(공백)단위로 구분하여 test2 topic으로 produce하는 것을 목표로 하였다.
방법은 크게 두가지로 시도하였다.
1) Component로 선언하여 stream을 start해주는 방법
2) Autowired 어노테이션을 통해 StreamsBuilder을 주입받는 방법 → 참고: https://www.baeldung.com/spring-boot-kafka-streams
(Topology: https://kafka.apache.org/0102/documentation/streams/core-concepts#streams_topology)
핵심은 Topology를 만드는 것인데,
1번의 경우 Topology를 만들고, stream을 start한 반면,
2번의 경우 Spring boot가 Kafka streams와 관련된 Wrapper와 API를 제공하기 때문에 직접 topology를 작성하지 않아도 된다.
(In essence, Spring Boot provides a very thin wrapper around Streams API while managing the lifecycle of our KStream instance.
It creates and configures the required components for the topology and executes our Streams application.
Importantly, this lets us focus on our core business logic while Spring manages the lifecycle.)
==> 그렇다면 2번의 경우 어떻게 streams.start()를 하지 않고도 streams가 동작되는걸까?
이에 대한 포스팅도 함께 해보았다.
1. Component로 선언하여 stream을 start해주는 방법
@Component
public class KafkaStreamsComponent {
@Bean("kafkaStreamComponent")
public void kafkaStream() throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("test");
source.flatMapValues(value -> Arrays.asList(value.split(" ")))
.to("test2");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
}
}
2. Autowired 어노테이션을 통해 StreamsBuilder을 주입받는 방법
@Configuration
@EnableKafkaStreams
public class KafkaConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfig() {
Map<String, Object> props = new HashedMap<String, Object>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.40.111.74:30001");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return new KafkaStreamsConfiguration(props);
}
}
@Component
public class KafkaProcessor {
final Serde<String> STRING_SERDE = Serdes.String();
@Autowired
void buildPipeline(StreamsBuilder streamsBuilder) {
KStream<String, String> messageStream = streamsBuilder
.stream("test", Consumed.with(STRING_SERDE, STRING_SERDE));
messageStream.flatMapValues(value -> Arrays.asList(value.split(" ")))
.to("test2");
}
}
결과보기
- producer
$ kubectl exec --tty -i kafka-demo-client --namespace default -- bash
$ kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test
> This is test
- consumer
$ kubectl exec --tty -i kafka-demo-client --namespace default -- bash
$ kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test2 \
--from-beginning
This
is
test