안녕하세요, 절박한 개발자입니다. 이번 글에서는 카프카(Kafka)를 Spring Boot 프로젝트에서 어떻게 사용하는지 간단히 기록해두려고 합니다. 가독성이 떨어질 수 있으니 양해 부탁드립니다.
이번 글을 통해 카프카를 Spring Boot와 함께 사용하는 방법을 공유하고자 합니다.
카프카 With Spring Boot
Spring Boot에서 카프카를 사용하기 위해선 spring-kafka 의존성을 프로젝트에 추가해야 합니다. build.gradle 또는 pom.xml 파일에 아래와 같이 의존성을 추가해 주세요.
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
카프카를 사용하려면 카프카 서버의 정보를 설정해야 합니다. application.yml 또는 application.properties에 카프카 서버의 정보를 설정합니다. 다음과 같이 카프카 서버와 컨슈머 그룹을 설정할 수 있습니다.
kafka:
bootstrap-servers: j12c209.p.ssafy.io:9092
consumer:
group-id: "used-product"
- bootstrap-severs : 내가 연결한 카프카 서버 주소
- group - id : 해당 컨슈머 그룹 설정
그 다음 토픽을 Java 코드로 생성해봅시다.
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic createPaymentChargedTopic() {
return new NewTopic("used-product-sold", 3, (short) 2);
}
}
NewTopic을 사용하여 생성합니다. 토픽 이름, 파티션 갯수, 레플리카 갯수입니다.
프로듀서 설정
카프카의 메시지를 보내는 역할을 하는 프로듀서를 설정해야 합니다. KafkaTemplate을 이용하여 메시지를 전송할 수 있습니다. 메시지가 성공적으로 전송되면 로그로 기록하고, 실패 시 에러 로그를 기록하도록 설정해봅니다.
package com.awoo.usedproduct.infra.kafka;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
@Component
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void sendKafkaMessage(KafkaTopic topic, Object value) {
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic.getTopicName(), value);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(result);
} else {
handleFailure(ex);
}
});
}
private void handleSuccess(SendResult<String, Object> result) {
log.info("메시지 전송 성공: topic={}, partition={}, offset={}, key={}, value={}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset(),
result.getProducerRecord().key(),
result.getProducerRecord().value()
);
}
private void handleFailure(Throwable ex) {
log.error("메시지 전송 실패: {}", ex.getMessage(), ex);
}
}
이 코드에서 sendKafkaMessage 메서드는 KafkaTemplate을 통해 메시지를 보내고, whenComplete를 사용해 성공/실패에 따라 다른 처리를 합니다. 성공 시 메시지의 topic, partition, offset 등의 정보를 로그로 출력합니다.
컨슈머 설정
카프카에서 메시지를 소비하는 컨슈머 설정입니다. @KafkaListener 어노테이션을 사용하여 특정 토픽을 리슨하고, 메시지를 처리합니다.
package com.awoo.payment.ui.listener;
import com.awoo.payment.application.SafePayService;
import com.awoo.payment.application.command.ConfirmSafeTransactionCommand;
import com.awoo.payment.ui.listener.event.UsedProductSoldOutBySafeEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
private final SafePayService safePayService;
@KafkaListener(topics = "used-product-safe-sold")
public void processUsedProductSafeSold(UsedProductSoldOutBySafeEvent event) {
log.info("Received event: {}", event.toString());
ConfirmSafeTransactionCommand command = ConfirmSafeTransactionCommand.builder()
.usedProductId(event.usedProductId())
.price(event.price())
.buyerId(event.buyerId())
.sellerId(event.sellerId())
.build();
safePayService.confirmSafeTransaction(command);
}
}
카프카를 Spring Boot와 함께 사용할 때, 메시지를 어떻게 전송할지 결정하는 것은 중요한 부분입니다. 카프카에서 메시지를 전송하려면 producer 설정을 통해 어떤 직렬화 방식을 사용할지 선택해야 합니다.
kafka:
bootstrap-servers: j12c209.p.ssafy.io:9092
consumer:
group-id: "used-product"
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
이렇게 초간단 KAFKA 사용법을 알려드렸습니다. 아무래도 저도 처음 카프카를 쓰다 보니 어색한 내용이 많았을꺼라 생각합니다.
제가 더욱 공부하고 새로운 카프카 사용버을 발표해보도록 하겠습니다..
참고 :
https://spring.io/projects/spring-kafka
Spring for Apache Kafka
The Spring for Apache Kafka (spring-kafka) project applies core Spring concepts to the development of Kafka-based messaging solutions. It provides a "template" as a high-level abstraction for sending messages. It also provides support for Message-driven PO
spring.io
'Spring' 카테고리의 다른 글
[Spring] Spring @Configuration 란? (0) | 2025.04.22 |
---|---|
[해결 방안] Spring STOMP content-length 초과 에러 해결하기 (0) | 2025.03.31 |
[해결 방안] Spring Gateway를 통해 Stomp를 설정했을때 헤더가 두개오는 문제 해결방안 (1) | 2025.03.31 |
[Spring] MongoDB와 JPA Repository 충돌 해결 (0) | 2025.02.09 |
[개선사항] Spring Events을 활용하여 결합도 낮추기 (0) | 2024.12.23 |