Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
Tags
- hibe
- 리사이클러뷰 멀티뷰
- 로또 등수 알고리즘
- 리사이클러뷰
- multiview
- mysql multi-row insert
- oauth 로그인
- 로또 등수 코드
- java
- 스프링 오어스
- android studio
- Androoid Studio
- 안드로이드 스튜디오
- 스프링 환경변수 설정
- spring 채팅방
- springboot
- 스프링 소셜 로그인
- 쿠버네티스
- 쿠버네티스 #fabric8
- 멀티뷰
- 스프링 시큐리티 없이
- 스프링 환경변수
- jpa 최적화
- 채팅방 구현
- jpa bulk insert
- 뷰 페이징
- .env
- 중간 테이블 엔티티 최적화
- 로또 앱 만들기
- jpa dto 매핑
Archives
- Today
- Total
야미의 개발
[Spring/SpringBoot] Kafka docker-compose.yml 설정하기, 스프링에서 카프카 도커로 사용하기 본문
스프링/웹 개발
[Spring/SpringBoot] Kafka docker-compose.yml 설정하기, 스프링에서 카프카 도커로 사용하기
채야미 2024. 12. 17. 22:33
주키퍼 하나에 3개의 카프카 브로커를 두는 설정입니다.
뿐만 아니라 kafka-ui도 함께 설정해서 8989 포트로 모니터링 할 수 있습니다.
docker-compose.yml
version: '3.8'
services:
zookeeper-1:
image: confluentinc/cp-zookeeper:latest
ports:
- '32181:32181'
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
kafka-1:
image: confluentinc/cp-kafka:latest
ports:
- '9092:9092'
depends_on:
- zookeeper-1
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
kafka-2:
image: confluentinc/cp-kafka:latest
ports:
- '9093:9093'
depends_on:
- zookeeper-1
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
kafka-3:
image: confluentinc/cp-kafka:latest
ports:
- '9094:9094'
depends_on:
- zookeeper-1
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "8989:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:29092,kafka-2:29093,kafka-3:29094
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper-1:22181
클래스 설정
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public NewTopic chatTopic() {
return TopicBuilder.name("chatting-topic").partitions(2).replicas(1).build();
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094"); // Kafka 서버 주소
props.put(ConsumerConfig.GROUP_ID_CONFIG, "chat-group"); // 컨슈머 그룹 ID
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 최신 메시지부터 소비하거나 가장 처음부터 소비하는 설정
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(2); // 병렬로 실행될 컨슈머의 수
factory.setBatchListener(false); // 메시지 일괄 처리 비활성화
return factory;
}
}
도커설정에 동작시킨 로컬호스트 포트에 맞춰서 BOOTSTRAP_SERVERS_CONFIG를 설정하면됩니다.
제가 올린 파일은 컨슈머와 프로듀서 config를 따로 두었지만 하나의 KafkaConfig 클래스 안에 두어도 무방합니다.
'스프링 > 웹 개발' 카테고리의 다른 글
[Spring/SpringBoot] SpringSecurity없이 OAuth 구글 소셜로그인 직접 구현하기 (0) | 2024.12.23 |
---|---|
[Spring/SpringBoot] .env 파일 플러그인 없이 읽기 (0) | 2024.12.17 |
[Spring/SpringBoot] STOMP + SpringSecurity 에서 Principal이 null 일때 (WebSocketAuthInterceptor) (1) | 2024.12.17 |
[Spring/Spring Boot] @PathVariable로 Enum 사용하기: String to Enum 변환 및 예외 처리 - 소셜로그인 구현 (0) | 2024.07.06 |
[Spring/SpringBoot] Mock 사용해보기(1) - Mockito란? (0) | 2024.06.12 |
Comments