Spring

@EnableKafkaStreams 분석 - 어떻게 streams는 start 되는가?

gajy 2022. 5. 17. 16:29
728x90

Kafka streams를 Spring boot기반으로 개발해보면서, 2가지 타입으로 시도해보았다.

 

1) Component로 선언하여 stream을 start해주는 방법

2) Autowired 어노테이션을 통해 StreamsBuilder을 주입받는 방법 

 

 

 

가장 큰 차이는 1번에서는 stream을 직접 start해주었고, 2번은 그럴 필요가 없었다.

코드 부분을 보면 이렇다.

1. Component로 선언하여 stream을 start해주는 방법

@Component
public class KafkaStreamsComponent {

    @Bean("kafkaStreamComponent")
    public void kafkaStream() throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        
        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("test");
        source.flatMapValues(value -> Arrays.asList(value.split(" ")))
            .to("test2");
        
        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
    }

}

2. Autowired 어노테이션을 통해 StreamsBuilder을 주입받는 방법

@Configuration
@EnableKafkaStreams
public class KafkaConfig {
    
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashedMap<String, Object>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return new KafkaStreamsConfiguration(props);
        
    }
}
@Component
public class KafkaProcessor {
    
    final Serde<String> STRING_SERDE = Serdes.String();
    
    @Autowired
    void buildPipeline(StreamsBuilder streamsBuilder) {
        KStream<String, String> messageStream = streamsBuilder
          .stream("test", Consumed.with(STRING_SERDE, STRING_SERDE));
        messageStream.flatMapValues(value -> Arrays.asList(value.split(" ")))
        .to("test2");
    }
}

대략 Autowired로 StreamsBuilder를 주입받기 때문이라고 알 수는 있지만, 자세한 코드를 살펴보았다.

어떻게 자동으로 start 되는가??

참고로 같은 의문을 가진 사람이 있었다.

-> https://stackoverflow.com/questions/71015916/why-to-use-autowired-to-run-kafka-streams-topology-in-a-spring-boot-application

 


우선 두가지 API를 미리 알아야한다.

1. LifeCycle, SmartLifeCycle (https://meteorkor.tistory.com/97)

- LifeCycle: https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/context/Lifecycle.html

=> A common interface defining methods for start/stop lifecycle control. The typical use case for this is to control asynchronous processing. NOTE: This interface does not imply specific auto-startup semantics. Consider implementing SmartLifecycle for that purpose.

lifecycle control을 start/stop하기 위한 인터페이스이며, 자동으로 start해주지는 않기때문에 자동 start를 위해서는 SmartLifecycle을 구현해야한다.

- SmartLifeCycle: https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/context/SmartLifecycle.html

=> An extension of the Lifecycle interface for those objects that require to be started upon ApplicationContext refresh and/or shutdown in a particular order.

ApplicationConext가 refresh/shutdown에 따라 start된다.

 

정리하자면, 서버의 기동/다운과 함께 LifeCycle을 가져간다면 SmartLifeCycle을 고려해야한다.

 

 

 

2. AbstractFactoryBean

https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/beans/factory/config/AbstractFactoryBean.html

=> Simple template superclass for FactoryBean implementations that creates a singleton or a prototype object, depending on a flag.

FactoryBean을 위한 superclass로, singleton 또는 prototype object를 생성할 수 있다.

 

 


이제 순서대로 파고들어가보자.

1. KafkaConfig.java에서 KafkaStreamsDefaultConfiguration를 Bean으로 등록하고있다.

해당 API를 확인해보면 StreamsBuilderFactoryBean을 리턴하여 Bean으로 등록한다.

@Configuration(proxyBeanMethods = false)
public class KafkaStreamsDefaultConfiguration {
...
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
	public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(
		...
			StreamsBuilderFactoryBean fb = new StreamsBuilderFactoryBean(streamsConfig);
...
}

2. StreamsBuilderFactoryBean은 AbstractFactoryBean을 상속받고 있으며, SmartLifecycle을 구현하고 있다.

- AbstractFactoryBean에서 추상메서드로 작성된 createInstance()를 구현하여 StreamsBuilder를 생성한다.

해당 StreamBuilder를 KafkaProcessor.java에서 @Autowired하여 사용하고 있다.

- SmartLifecycle의 start를 구현하여 topology를 생성하고, kafkaStreams를 start하는 것을 볼 수 있다.

public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilder>
		implements SmartLifecycle, BeanNameAware {
    ...
	@Override
	protected synchronized StreamsBuilder createInstance() {
		if (this.autoStartup) {
			Assert.state(this.properties != null,
					"streams configuration properties must not be null");
		}
		StreamsBuilder builder = new StreamsBuilder();
		this.infrastructureCustomizer.configureBuilder(builder);
		return builder;
	}
    ...
	@Override
	public synchronized void start() {
		if (!this.running) {
			try {
				Assert.state(this.properties != null,
						"streams configuration properties must not be null");
				Topology topology = getObject().build(this.properties); // NOSONAR: getObject() cannot return null
				this.infrastructureCustomizer.configureTopology(topology);
				this.topology = topology;
				LOGGER.debug(() -> topology.describe().toString());
				this.kafkaStreams = new KafkaStreams(topology, this.properties, this.clientSupplier);
				this.kafkaStreams.setStateListener(this.stateListener);
				this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener);
				this.kafkaStreams.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
				if (this.kafkaStreamsCustomizer != null) {
					this.kafkaStreamsCustomizer.customize(this.kafkaStreams);
				}
				if (this.cleanupConfig.cleanupOnStart()) {
					this.kafkaStreams.cleanUp();
				}
				this.kafkaStreams.start();
				for (Listener listener : this.listeners) {
					listener.streamsAdded(this.beanName, this.kafkaStreams);
				}
				this.running = true;
			}
			catch (Exception e) {
				throw new KafkaException("Could not start stream: ", e);
			}
		}
	}
    ...
}

 

728x90