@EnableKafkaStreams 분석 - 어떻게 streams는 start 되는가?
Kafka streams를 Spring boot기반으로 개발해보면서, 2가지 타입으로 시도해보았다.
1) Component로 선언하여 stream을 start해주는 방법
2) Autowired 어노테이션을 통해 StreamsBuilder을 주입받는 방법
가장 큰 차이는 1번에서는 stream을 직접 start해주었고, 2번은 그럴 필요가 없었다.
코드 부분을 보면 이렇다.
1. Component로 선언하여 stream을 start해주는 방법
@Component
public class KafkaStreamsComponent {
@Bean("kafkaStreamComponent")
public void kafkaStream() throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("test");
source.flatMapValues(value -> Arrays.asList(value.split(" ")))
.to("test2");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
}
}
2. Autowired 어노테이션을 통해 StreamsBuilder을 주입받는 방법
@Configuration
@EnableKafkaStreams
public class KafkaConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfig() {
Map<String, Object> props = new HashedMap<String, Object>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return new KafkaStreamsConfiguration(props);
}
}
@Component
public class KafkaProcessor {
final Serde<String> STRING_SERDE = Serdes.String();
@Autowired
void buildPipeline(StreamsBuilder streamsBuilder) {
KStream<String, String> messageStream = streamsBuilder
.stream("test", Consumed.with(STRING_SERDE, STRING_SERDE));
messageStream.flatMapValues(value -> Arrays.asList(value.split(" ")))
.to("test2");
}
}
대략 Autowired로 StreamsBuilder를 주입받기 때문이라고 알 수는 있지만, 자세한 코드를 살펴보았다.
어떻게 자동으로 start 되는가??
참고로 같은 의문을 가진 사람이 있었다.
우선 두가지 API를 미리 알아야한다.
1. LifeCycle, SmartLifeCycle (https://meteorkor.tistory.com/97)
- LifeCycle: https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/context/Lifecycle.html
=> A common interface defining methods for start/stop lifecycle control. The typical use case for this is to control asynchronous processing. NOTE: This interface does not imply specific auto-startup semantics. Consider implementing SmartLifecycle for that purpose.
lifecycle control을 start/stop하기 위한 인터페이스이며, 자동으로 start해주지는 않기때문에 자동 start를 위해서는 SmartLifecycle을 구현해야한다.
- SmartLifeCycle: https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/context/SmartLifecycle.html
=> An extension of the Lifecycle interface for those objects that require to be started upon ApplicationContext refresh and/or shutdown in a particular order.
ApplicationConext가 refresh/shutdown에 따라 start된다.
정리하자면, 서버의 기동/다운과 함께 LifeCycle을 가져간다면 SmartLifeCycle을 고려해야한다.
2. AbstractFactoryBean
=> Simple template superclass for FactoryBean implementations that creates a singleton or a prototype object, depending on a flag.
FactoryBean을 위한 superclass로, singleton 또는 prototype object를 생성할 수 있다.
이제 순서대로 파고들어가보자.
1. KafkaConfig.java에서 KafkaStreamsDefaultConfiguration를 Bean으로 등록하고있다.
해당 API를 확인해보면 StreamsBuilderFactoryBean을 리턴하여 Bean으로 등록한다.
@Configuration(proxyBeanMethods = false)
public class KafkaStreamsDefaultConfiguration {
...
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(
...
StreamsBuilderFactoryBean fb = new StreamsBuilderFactoryBean(streamsConfig);
...
}
2. StreamsBuilderFactoryBean은 AbstractFactoryBean을 상속받고 있으며, SmartLifecycle을 구현하고 있다.
- AbstractFactoryBean에서 추상메서드로 작성된 createInstance()를 구현하여 StreamsBuilder를 생성한다.
해당 StreamBuilder를 KafkaProcessor.java에서 @Autowired하여 사용하고 있다.
- SmartLifecycle의 start를 구현하여 topology를 생성하고, kafkaStreams를 start하는 것을 볼 수 있다.
public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilder>
implements SmartLifecycle, BeanNameAware {
...
@Override
protected synchronized StreamsBuilder createInstance() {
if (this.autoStartup) {
Assert.state(this.properties != null,
"streams configuration properties must not be null");
}
StreamsBuilder builder = new StreamsBuilder();
this.infrastructureCustomizer.configureBuilder(builder);
return builder;
}
...
@Override
public synchronized void start() {
if (!this.running) {
try {
Assert.state(this.properties != null,
"streams configuration properties must not be null");
Topology topology = getObject().build(this.properties); // NOSONAR: getObject() cannot return null
this.infrastructureCustomizer.configureTopology(topology);
this.topology = topology;
LOGGER.debug(() -> topology.describe().toString());
this.kafkaStreams = new KafkaStreams(topology, this.properties, this.clientSupplier);
this.kafkaStreams.setStateListener(this.stateListener);
this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener);
this.kafkaStreams.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
if (this.kafkaStreamsCustomizer != null) {
this.kafkaStreamsCustomizer.customize(this.kafkaStreams);
}
if (this.cleanupConfig.cleanupOnStart()) {
this.kafkaStreams.cleanUp();
}
this.kafkaStreams.start();
for (Listener listener : this.listeners) {
listener.streamsAdded(this.beanName, this.kafkaStreams);
}
this.running = true;
}
catch (Exception e) {
throw new KafkaException("Could not start stream: ", e);
}
}
}
...
}