본문 바로가기
Kafka

Kafka와 MongoDB를 연동하는 자바 애플리케이션 구축 가이드

by 떡쇠 2024. 2. 10.
반응형

이 글에서는 프로듀서 애플리케이션에서 카프카 토픽에 메시지를 전송하고 컨슈머는 메시지를 MongoDB에 적재하는 과정을 실습한다.

 

Docker는 리눅스를 기반으로 하기때문에 윈도우 환경에서 리눅스를 사용할 수 있도록 도와주는 WSL2 ( Windows Subsystem for Linux 2 )를 활성화하고 리눅스 우분투와 도커를 설치 이후 과정을 실습했다.

1. Docker-Compose.yml 작성

도커 컨테이너에 카프카와 주키퍼를 실행하기 위해 Docker-Compose.yml 을 작성한다.

version: '3.8'
services:
  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181" # 주키퍼의 기본 포트 2181
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092" # 카프카의 기본 포트 9092
    environment:  # WSL 2 환경의 IP를 kafka 문자열에 매핑하여 사용했다. 아이피를 직접 입력해도 됨. 
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://your_wsl2_ip:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper

 

  • KAFKA_ADVERTISED_LISTENERS: INSIDE와 OUTSIDE라는 두 리스너를 정의한다. 이 환경 변수는 클라이언트에게 노출할 브로커의 주소를 지정하는데 INSIDE는 도커 내부 네트워크에서 사용되며, OUTSIDE는  외부에서 브로커에 접근할 수 있도록(WSL2 환경이므로 호스트에서 접근) 사용된다.  INSIDE://kafka:9092의 kafka는 로컬 컴퓨터에서 WSL 2에 설치한 카프카 클러스터와 통신할 때 설정한 IP를 사용자 지정 문자로 매핑하여 통신한다. 
    Windows/System32/drivers/etc/hosts 파일에 아래 줄 추가
    your_wsl2_ip kafka
  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 각 리스너의 보안 프로토콜을 정의한다. 리스너는 카프카에서 클라리언트와 브로커 간의 통신을 수신하고 처리하는 역할을 하는데 여기서는 모두 PLAINTEXT로 설정한다.  (카프카 클러스터 내에서 클라이언트와 브로커 간의 통신을 암호화하지 않는 보안 프로토콜)
  • KAFKA_LISTENERS: 카프카가 수신할 주소와 포트를 정의한다. INSIDE 리스너는 카프카 클러스터 내부 통신용이며, OUTSIDE 리스너는 외부(윈도우 호스트 포함)에서의 접근을 위해 다른 포트(9094)를 사용한다.
  • KAFKA_INTER_BROKER_LISTENER_NAME: 카프카 클러스터 내에서 브로커 간 통신을 위한 리스너의 이름을 지정한다.
  • KAFKA_ZOOKEEPER_CONNECT: 카프카 브로커가 주키퍼 서버에 연결하는 데 사용되는 주키퍼 연결 문자열을 지정한다.

리눅스에서 docker-compose.yml 작성


2. Docker-Compose.yml를 통한 컨테이너 생성

도커 컴포즈 파일이 존재하는 디렉토리에서 아래 명령어를 실행하면 파일 내용의 설정을 기반으로 주키퍼와 카프카 컨테이너를 생성 후 실행한다.

docker-compose up -d  // -d 옵션으로 백그라운드에서 실행한다.

카프카, 주키퍼 컨테이너 생성 및 실행


3. 도커 컨테이너 상태 확인

토픽 생성 전 카프카 컨테이너 내부로 접근하기 위해 아래 명령어로 도커 컨테이너의 실행 상태를 확인한다.

docker ps

 

컨테이너 생성 후 실행이 됐다면 STATUS에 Up 으로 출력이 된다

실행중인 컨테이너들의 상태

카프카의 IP가 0.0.0.0:9092인 것은 카프카 브로커가 모든 네트워크 인터페이스에서 클라이언트의 연결을 수신할 수 있음을 의미한다. 해당 서버의 모든 네트워크 인터페이스(예: 로컬 호스트, 외부 IP 등)에서 클라이언트의 연결을 수신할 수 있다는 것이다.


4. 카프카 토픽 생성

카프카 클러스터에 토픽을 생성하기 위해 아래 명령어로 카프카 컨테이너 내부에 접근한다.

docker exec -it <container_id_or_name> 892fe61e8e4a /bin/sh

 

카프카 내부에 접근 후 아래 명령어로 토픽을 생성한다.

 kafka-topics.sh --create --bootstrap-server your_broker_ip:9092 --replication-factor 1 --partitions 1 --topic myTopic
  • kafka-topics.sh: 이 명령어는 카프카 토픽 관리 도구를 실행하는 스크립트이다. 이를 사용하여 카프카 토픽을 생성하거나 관리할 수 있습니다.
  • --create: 이 옵션은 새로운 토픽을 생성한다는 것을 나타낸다.
  • --bootstrap-server: 이 옵션은 토픽을 생성할 때 사용할 카프카 클러스터의 부트스트랩 서버를 지정한다. 브로커의 ip와 포트를 입력한다.
  • --replication-factor: 이 옵션은 토픽의 복제 팩터를 지정한다. 복제 팩터는 토픽의 각 파티션을 복제할 브로커의 수를 의미한다. 이 경우 복제 팩터가 1이므로 각 파티션은 하나의 브로커에만 저장된다.
  • --partitions: 이 옵션은 토픽의 파티션 수를 지정한다. 파티션은 토픽의 데이터를 병렬로 처리하기 위해 사용된다. 이 경우 파티션 수가 1이므로 토픽은 하나의 파티션으로만 구성된다.
  • --topic: 이 옵션은 생성할 토픽의 이름을 지정한다. 여기서 myTopic은 새로 생성될 토픽의 이름이다.

 

토픽 생성 완료

 


5. 카프카 토픽에 메시지 전송

토픽에 메시지를 전송을 하기 위해 프로듀서 애플리케이션을 Rest API를 개발한다.

 

application.yml 

server:
  port: 9090

spring:
  kafka:
    producer:
      bootstrap-servers: kafka:9092 # WSL 2 환경의 IP를 kafka 문자열에 매핑하여 사용했다. 아이피를 직접 입력해도 됨. 
      acks: all 
      key-serializer: org.apache.kafka.common.serialization.StringSerializer 
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer 
      
      
      ## 필수옵션 ## 
      # bootstrap-servers: 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성한다.
      # key-serializer: 레코드의 메시지 키를 직렬화하는 클래스를 지정한다.
      # value-serializer: 레코드의 메시지 값을 직렬화하는 클래스를 지정한다.
      
      ## 선택 옵션 ##
      # acks: 프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인하는데 사용하는 옵션.
      #		  0, 1, -1(all)중 하나로 설정할 수 있다. 설정 값에 따라 데이터의 유실 가능성이 달라진다.
      #       1: 리더 파티션에 데이터가 저장되면 전송 성공으로 판단한다. (기본 값 1)
      #       0: 프로듀서가 전송한 즉시 브로커에 데이터 저장 여부와 상관 없이 성공으로 판단한다.
      #      -1: 토픽의 min.insync.replicas 개수에 해당하는 리더 파티션과 팔로워 파티션에 데이터가 저장되면 성공하는 것으로 판단한다.     
      # buffer.memory: 브로커가 전송할 데이터를 배치로 모으기 위해 설정할 버퍼 메로리 양을 지정한다. (기본 값 32MB)
      # retries: 프로듀서가 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수를 지정한다. (기본 값 2147483647)
      # batch.size: 배치로 전송할 레코드 최대 용량을 지정한다. (기본 값 16384)
      #				너무 작게 설정하면 프로듀서가 브로커로 더 자주보내기 때문에 네트워크 부담이 있고 너무 크면 메모리를 많이 사용하게 된다.
      # linger.ms: 배치를 전송하기 전까지 기다리는 최소 시간이다. (기본 값 0)
      # partitioner.class: 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정한다. (기본 값 org.apache.kafka.clients.producer.internals.DefaultPartitioner)
      # enable.idemotence: 멱등성 프로듀서로 동작할지 여부를 설정한다. (기본 값 false)
      # transactional.id: 프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 여부를 설정한다. (기본 값 null)
      #					  프로듀서의 고유한 트랜잭션 아이디를 설정할 수 있다. 이 값을 설정하면 트랜잭션 프로듀서로 동작한다.

 

Controller

클라이언트로부터 /api/messages/send 엔드포인트로 POST 요청을 받으면 그 요청의 body에 포함된 메시지(MessageDto 객체)를 Kafka로 전송하기 위해 KafkaProducerService의 sendMessage 메서드를 호출한다.

 

package com.theyim.kafka.producer.Controlller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.theyim.kafka.producer.dto.MessageDto;
import com.theyim.kafka.producer.service.KafkaProducerService;

@RestController
@RequestMapping("/api/messages")
public class MessageController {

	@Autowired
	private KafkaProducerService kafkaProducerService;
	
	@PostMapping("/send")
	public void sendMessage(@RequestBody MessageDto message) {
		kafkaProducerService.sendMessage(message);
	}
}

 

 

Service

MessageDto 객체를 Kafka의 지정된 토픽("my-topic")으로 전송하기 위해 KafkaTemplate는 메시지를 비동기로 보낼 수 있는 기능을 제공하는 클래스다. 해당 클래스의 send() 메서드를 사용하여 Kafka의 "my-topic" 토픽으로 MessageDto 객체를 전송한다.

package com.theyim.kafka.producer.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.theyim.kafka.producer.dto.MessageDto;

@Service
public class KafkaProducerService {
	
	private static String TOPIC_NAME = "my-topic";
	
	@Autowired
	private KafkaTemplate<String, Object> kafkaTemplate;

	public void sendMessage(MessageDto messageDto) {	
		this.kafkaTemplate.send("my-topic", messageDto);
	}
}

 

 

Postman에서 요청을 보낸 후 아래 명령어로 토픽에 전송된 메시지를 확인한다.

kafka-console-consumer.sh --bootstrap-server kafka_broker_ip:9092 --topic my-topic --from-beginning

토픽에 전송된 메시지


6. 컨슈머 애플리케이션 개발

application.yml 

프로듀서 애플리케이션의 application.yml 파일에 consumer를 추가한다.

server:
  port: 9090

spring:
  kafka:
    producer: 
      bootstrap-servers: kafka:9092 # WSL 2 환경의 IP를 kafka 문자열에 매핑하여 사용했다. 아이피를 직접 입력해도 됨. 
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # 메시지 키 직렬화 방식
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 메시지 값 직렬화 방식
    
    consumer:
      bootstrap-servers: kafka:9092 
      group-id: my-consumer-group
      auto-offset-reset: earliest
    properties:
      schema.registry.url: http://kafka:8081
  data:
    mongodb:
      uri: mongodb://localhost:27017/mydatabase # 데이터베이스가 없는 경우 자동으로 생성해준다.

 

Model

MongoDB에 저장할 메시지 객체를 정의한다.

package com.theyim.kafka.consumer.model;

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import lombok.Data;

@Document 
@Data
public class Message {
	
	@Id
	private String id;
	private String name;
	private int age;

}

 

Service

@KafkaListener 어노테이션은 스프링 프레임워크의 일부인 Spring Kafka에서 제공하는 어노테이션으로 특정 Kafka 토픽의 메시지를 자동으로 수신하여 처리할 수 있게 해준다. 메시지가 해당 토픽에 도착할 때마다 스프링이 자동으로 해당 메서드를 호출하여 메시지를 처리할 수 있다.

package com.theyim.kafka.consumer.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.theyim.kafka.consumer.model.Message;
import com.theyim.kafka.consumer.repository.KafkaConsumerRepository;
import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class KafkaConsumerService {
	
	@Autowired
	private KafkaConsumerRepository kafkaConsumerRepository;

	private final ObjectMapper objectMapper = new ObjectMapper();
	
    // 카프카 클러스터의 토픽 이름과 application.yml의 consumer에 정의된 group-id 입력
	@KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
	public void lister(String message) {

		try {
			Message msg = objectMapper.readValue(message, Message.class); // JSON 데이터를 Java 객체로 역직렬화
			kafkaConsumerRepository.save(msg);	
			log.info("성공 " + message);
			
		} catch (Exception e) {
			log.error("실패 " + message);
		}
	}

}

 

Repository

MongoDB와 연동하기 위해 MongoRepository를 상속받고 값의 타입과 key의 자료형의 제네릭을 지정한다.

package com.theyim.kafka.consumer.repository;

import org.springframework.data.mongodb.repository.MongoRepository;
import com.theyim.kafka.consumer.model.Message;

public interface KafkaConsumerRepository extends MongoRepository<Message, Object> {

}

 


7. 카프카 토픽과  MongDB 메시지 적재 확인

프로듀서에서 전송한 카프카 토픽(토픽명: my-topic)에 실시간으로 메시지가 적재된다.

 

토픽에 메시지가 적재될 때마다 컨슈머가 자동으로 메서드를 호출하여 메시지를 MongoDB에 적재한다.

 

 

반응형

'Kafka' 카테고리의 다른 글

consumer와 partition 의 비율  (0) 2024.02.17
카프카(Kafka)의 기본 개념  (0) 2024.02.11