채팅 서비스 구현하기(NoSQL, WebSocket, Kafka)
미흡한 부분이 있다면 언제든 댓글에 남겨주세요😊 수정보완하겠습니다!
앞으로 진행될 순서는 다음과 같습니다.
- 왜 이 기능을 구현했나요?
- 왜 이 기술을 사용했나요?
- 어떻게 구현했나요?
- 더 알아보기
1. 왜 이 기능을 구현했나요?
저희 프로젝트 주제는 농산물 직거래 서비스였습니다. 직거래를 위해서 채팅 기능이 필수였습니다.
당근마켓처럼 채팅으로 거래하는 방식으로 구현하고자 하였고, 채팅기능은 카카오톡 형식으로 만들었습니다.
2. 왜 이 기술을 사용했나요?
라이브채팅 플랫폼 구현기 1탄 : 개발 언어 및 기반기술 조사
이 글을 참고하여 기술을 선정하였습니다.
(현업에서는 이렇게 꼼꼼하게 기술을 도입하는구나 알게되었다)
NoSQL
RDBMS VS NoSQL
면접 단골 질문인 RDBMS와 NoSQL 차이에 대해 알아보면
- 데이터 모델:
- MySQL: 관계형 데이터베이스 시스템으로, 테이블과 스키마를 사용하여 데이터를 구조화합니다.
- MongoDB: NoSQL 데이터베이스 시스템으로, 컬렉션과 문서를 사용하여 데이터를 저장합니다. 문서는 유연한 JSON 형식의 데이터 구조를 가지고 있습니다.
- 스키마와 유연성:
- MySQL: 정적인 스키마를 가지고 있어 데이터 구조를 미리 정의해야 합니다. 엄격한 데이터 일관성을 제공합니다.
- MongoDB: 동적 스키마를 사용하여 데이터 구조를 런타임에 정의할 수 있습니다. 이는 더 큰 유연성을 제공하며, 데이터 모델의 변경이 용이합니다.
- 쿼리 언어:
- MySQL: SQL(Structured Query Language)을 사용하여 데이터를 조회하고 조작합니다.
- MongoDB: JavaScript 문법을 기반으로 한 Query Language를 사용합니다. JSON 형식으로 데이터를 쿼리하고 조작합니다.
- 성능 및 확장성:
- MySQL: 수직적 확장이 주로 사용되며, 성능은 하드웨어 업그레이드로 개선됩니다.
- MongoDB: 수평적 확장이 가능하며, 여러 서버에 데이터를 분산하여 처리하므로 대규모 확장이 용이합니다.
- 데이터 관계성:
- MySQL: 관계형 데이터베이스이기 때문에 복잡한 관계를 표현하는 데 용이합니다.
- MongoDB: NoSQL 데이터베이스로, 관계를 중시하지 않으며 중복 데이터를 포함하여 읽기 속도를 높일 수 있습니다.
저희는 둘 다 사용했습니다. 채팅방 생성에는 MySQL을 사용했고, 채팅메시지는 MongoDB에 저장하였습니다. 채팅방 생성에 MySQL을 사용한 이유는 관계형 데이터의 효율적인 처리와 트랜잭션 일관성을 확보하기 위해, 채팅 메시지는 MongoDB에 저장하여 유연한 비정형 데이터와 실시간 처리를 지원하기 위해 선택했습니다.
WebSocket
HTTP Polling VS WebSocket
HTTP Polling과 WebSocket은 둘 다 클라이언트와 서버 간의 양방향 통신을 지원하는 웹 애플리케이션에서 사용되는 두 가지 다른 통신 프로토콜입니다. 각각의 장단점을 살펴보고, 왜 WebSocket을 선호하는지에 대한 이유를 알아보겠습니다.
HTTP Polling:
장점
- 호환성: 대부분의 환경에서 HTTP가 지원되므로 HTTP Polling은 거의 모든 클라이언트와 서버에서 사용 가능합니다.
- 간단함: HTTP Polling은 기본적인 HTTP 프로토콜을 사용하므로 구현이 간단하고 이해하기 쉽습니다.
단점:
- 지연: 주기적으로 서버에 요청을 보내야 하므로 실시간 통신에서는 지연이 발생할 수 있습니다.
- 불필요한 트래픽: 주기적인 폴링으로 인해 불필요한 네트워크 트래픽이 발생할 수 있습니다.
WebSocket:
장점:
- 실시간 통신: WebSocket은 양방향 통신을 지원하여 실시간으로 데이터를 전송할 수 있습니다.
- 낮은 지연: Polling과 달리 서버와 지속적으로 연결을 유지하고 있기 때문에 지연이 낮습니다.
- 효율적인 네트워크 사용: 계속해서 연결을 유지하므로 불필요한 트래픽이 감소하고 효율적인 네트워크 사용이 가능합니다.
단점:
- 호환성: 일부 환경에서 WebSocket이 지원되지 않을 수 있습니다. 특히, 방화벽이나 프록시 서버에서 WebSocket 트래픽을 차단하는 경우가 있습니다.
- 복잡성: 구현 및 유지 관리가 Polling보다 더 복잡할 수 있습니다.
WebSocket 선택 이유:
- 실시간 통신 필요성: WebSocket은 실시간 통신이 필요한 상황에서 효과적입니다.
- 낮은 지연: WebSocket은 지속적인 연결로 인해 지연이 낮아 사용자 경험이 향상됩니다.
- 효율적인 네트워크 사용: Polling과 비교하여 불필요한 트래픽이 감소하므로 네트워크 자원을 효율적으로 사용할 수 있습니다.
Kafka
kafka의 경우 매칭 서비스 구현하기(RabbitMQ, Unit Test) 에서 비교했기 때문에 선택한 이유만 작성하였습니다.
- 실시간 처리: 카프카는 실시간 데이터 스트림 처리에 특화되어 있어, 메시지가 실시간으로 전달되고 처리될 수 있습니다. 이는 채팅 시스템에서 사용자 간의 실시간 메시지 전송과 수신을 가능하게 합니다.
- 데이터 유실 방지: 카프카는 메시지를 영구적으로 보관하므로 데이터 유실의 위험을 줄여줍니다. 이는 채팅 메시지나 중요한 정보의 손실을 방지하는 데 큰 장점입니다.
3. 어떻게 구현했나요?
채팅방 생성
TradeCreateRequestDto
@Getter
@NoArgsConstructor
public class TradeCreateRequestDto {
@NotNull
private Long articleId;
@NotNull
private Long buyerId;
@NotNull
private Long sellerId;
public TradeCreateRequestDto(Long articleId, Long buyerId, Long sellerId) {
this.articleId = articleId;
this.buyerId = buyerId;
this.sellerId = sellerId;
}
}
글 아이디, 판매자 아이디, 구매자 아이디를 받아와 채팅방을 생성했습니다.
TradeController
@PostMapping("/trades")
public ResponseEntity<?> createTrade(
@RequestBody @Valid final TradeCreateRequestDto tradeCreateRequestDto) {
Trade trade = tradeService.createTrade(tradeCreateRequestDto);
return HttpResponse.okWithData(HttpStatus.OK, "채팅방 생성에 성공했습니다.", trade.getId());
}
채팅방을 생성하면 채팅방 아이디를 반환하고, 이 아이디를 통해 페이지 이동을 하고 소켓연결을 합니다.
Trade
@ToString
@Entity
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Trade extends BaseEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private LocalDateTime tradedAt;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private Status status;
@Column(nullable = false)
private Long articleId;
@Column(nullable = false)
private Long buyerId;
@Column(nullable = false)
private Long sellerId;
@Column(nullable = false)
private Long checkedMessageIdSeller;
@Column(nullable = false)
private Long checkedMessageIdBuyer;
Trade
entity는 이렇게 작성하였습니다.
채팅(프론트)
useChat
import stomp from "stompjs";
import SockJS from "sockjs-client";
import { useSelector } from "react-redux";
import { useEffect, useState, useRef } from "react";
function useChat({ tradeId }) {
const REACT_APP_SOCKET_URL = process.env.REACT_APP_SOCKET_URL;
const stompCilent = useRef({});
const [message, setMessage] = useState("");
const [curTradeId, setCurTradeId] = useState(null);
const [newMessages, setNewMessages] = useState([]);
const { id } = useSelector((state) => state.MemberReducer);
function getJwtFromCookie() {
const cookies = document.cookie.split(";");
for (const cookie of cookies) {
const [name, value] = cookie.trim().split("=");
if (name === "jwt") {
return value;
}
}
return null; // 'jwt' 이름의 쿠키를 찾지 못한 경우 null 반환
}
// JWT 쿠키 가져오기
const jwtToken = getJwtFromCookie();
const memberId = id;
const headers = {
Authorization: jwtToken,
chatNumber: tradeId,
};
/* ------------------ chat ------------------ */
function onConnected() {
// user 개인 구독
stompCilent.current.subscribe(
`/sub/trade/${tradeId}`,
function (curMessage) {
setNewMessages((newMessages) => [
...newMessages,
JSON.parse(curMessage.body),
]);
}
);
}
function connect() {
const socket = new SockJS(REACT_APP_SOCKET_URL);
stompCilent.current = stomp.over(socket);
stompCilent.current.connect(headers, () => {
setTimeout(function () {
onConnected();
setCurTradeId(tradeId);
}, 500);
});
}
const ChangeMessages = (event) => {
setMessage(event.target.value);
};
const sendMessage = async (e) => {
e.preventDefault();
await stompCilent.current.send(
"/pub/send",
headers,
JSON.stringify({
tradeId: tradeId,
memberId: memberId,
message: message,
})
);
setMessage("");
};
useEffect(() => {
if (curTradeId != null) {
stompCilent.current?.unsubscribe({
Authorization: jwtToken,
chatNumber: curTradeId,
});
stompCilent.current?.disconnect();
}
connect();
setNewMessages([]);
}, [tradeId]);
return { message, sendMessage, newMessages, ChangeMessages };
}
export default useChat;
React 애플리케이션에서 WebSocket 통신을 사용하는 채팅 기능을 다룹니다.
onConnected
함수: WebSocket 연결이 성공했을 때 호출되는 콜백 함수입니다. 지정된 채팅 채널(/sub/trade/${tradeId}
)에 구독하며, 새로운 메시지가 도착하면newMessages
배열을 업데이트합니다.connect
함수: WebSocket 서버에 연결하기 위한 함수입니다.SockJS
와stompjs
를 사용하여 WebSocket 연결을 설정하고, 헤더에 인증 정보를 포함하여 연결을 시도합니다. 연결이 성공하면 일정 시간 후에onConnected
함수를 호출하고 현재tradeId
를 설정합니다.ChangeMessages
함수: 입력 필드 값이 변경될 때 호출되는 함수로, 입력된 메시지를message
상태에 업데이트합니다.sendMessage
함수: 메시지를 서버로 전송하는 함수로, Stomp를 사용하여/pub/send
채널로 메시지를 전송합니다. 전송 후에는message
상태를 초기화하여 입력 필드를 비웁니다.useEffect
훅:tradeId
가 변경될 때 이전 채널의 구독을 해제하고 연결을 종료한 뒤, 새로운tradeId
에 대한 연결을 설정합니다. 또한,newMessages
배열을 초기화하여 이전 메시지를 지웁니다.
처음에 disconnect를 하지 않아 다른 채팅방에 들어갔다가 다시 왔을 경우 중복으로 연결되어 채팅 메시지가 중복으로 보내지는 문제가 있었습니다. 그래서 다른 채팅방에 접속할 때(채팅방 아이디가 바뀔 때) 연결을 끊고 다시 연결하도록 하였습니다.
채팅(백엔드)
WebSocketConfig
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry
.addEndpoint("/trades/trade")
.setAllowedOriginPatterns("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/sub");
registry.setApplicationDestinationPrefixes("/pub");
}
}
registerStompEndpoints
메서드:- "/trades/trade" 엔드포인트에 대한 Stomp 연결을 등록하며, CORS를 허용하고 SockJS를 사용합니다.
configureMessageBroker
메서드:- 메시지 브로커를 구성합니다.
- "/sub" 프리픽스로 시작하는 채널을 활성화하여 클라이언트에게 메시지를 브로드캐스트합니다.
- "/pub" 프리픽스로 시작하는 채널을 통해 클라이언트에서 메시지를 브로커로 전송할 수 있게 합니다.
- 즉, "/sub"은 서버에서 클라이언트로 메시지를 전달하는 데 사용되고, "/pub"은 클라이언트에서 서버로 메시지를 전송하는 데 사용됩니다.
KafkaProducerConfig
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.server}")
private String BOOTSTRAP_SERVER;
@Bean
public ProducerFactory<String, KafkaMessageDto> newProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, KafkaMessageDto> newKafkaTemplate() {
return new KafkaTemplate<>(newProducerFactory());
}
}
Kafka에 KafkaMessageDto
객체를 전송하기 위한 프로듀서와 템플릿을 설정합니다. 메시지의 키는 문자열로, 값은 KafkaMessageDto
객체로 직렬화하여 전송됩니다.
KafkaConsumerConfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.server}")
private String BOOTSTRAP_SERVER;
private static final String GROUP_ID = "group";
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Kafka에서 문자열 형식의 메시지를 수신하는 컨슈머를 설정합니다. 또한, 리스너 컨테이너 팩토리를 통해 여러 스레드에서 동시에 Kafka 메시지를 처리할 수 있도록 설정합니다.
KafkaMessageDto
public class KafkaMessageDto implements Serializable {
@NotBlank(message = "메시지를 입력하세요.")
private String message;
@NotNull
private Long tradeId;
@NotNull
private Long memberId;
}
메시지와 거래 아이디(채팅방 아이디), 보낸 회원의 아이디를 담아 메시지를 보냅니다.
TradeController
@MessageMapping("/send")
public void sendMessage(KafkaMessageDto message) {
log.info(message.getMessage());
messageService.sendMessage(message);
}
@MessageMapping("/send")
어노테이션이 붙은 메서드 **sendMessage**
는 WebSocket 엔드포인트 "/send"로부터 수신된 메시지를 처리합니다.
MessageService
@Service
@RequiredArgsConstructor
public class MessageService {
private final KafkaProducer kafkaProducer;
private final TradeMessageRepository tradeMessageRepository;
public TradeMessage sendMessage(KafkaMessageDto message) {
kafkaProducer.send(message);
TradeMessage tradeMessage = TradeMessage.createTradeMessage(message.getMessage(),
message.getTradeId(),
message.getMemberId(), LocalDateTime.now());
tradeMessageRepository.save(tradeMessage);
return tradeMessage;
}
}
Kafka를 통해 메시지를 전송하고 동시에 MongoDB에 TradeMessage를 저장함으로써, 실시간 메시징 및 데이터베이스 저장 기능을 통합적으로 수행합니다.
TradeMessage
private String id;
private String message;
private Long tradeId;
private Long memberId;
private LocalDateTime createdAt;
TradeMessage
는 id, message, tradeId, memberId, createdAt
으로 구성하였습니다.
KafkaProducer
@Service
@RequiredArgsConstructor
public class KafkaProducer {
private static final String TOPIC_NAME = "trade";
private final KafkaTemplate<String, KafkaMessageDto> kafkaTemplate;
public void send(KafkaMessageDto message) {
ListenableFuture<SendResult<String, KafkaMessageDto>> future = kafkaTemplate.send(TOPIC_NAME,
message);
}
}
KafkaTemplate을 사용하여 trade 토픽에 메시지를 비동기적으로 전송합니다.
KafkaConsumer
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumer {
private static final String TOPIC_NAME = "trade";
private final SimpMessageSendingOperations simpMessageSendingOperations;
private final ObjectMapper objectMapper = new ObjectMapper();
@KafkaListener(topics = TOPIC_NAME)
public void listenMessage(String jsonMessage) {
try {
KafkaMessageDto message = objectMapper.readValue(jsonMessage, KafkaMessageDto.class);
log.info(">>>" + message.getMemberId() + "," + message.getMessage());
simpMessageSendingOperations.convertAndSend("/sub/trade/" + message.getTradeId(), message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
KafkaConsumer
클래스는 Kafka 토픽에서 메시지를 수신하고, 수신된 메시지를 WebSocket을 통해 클라이언트에게 전달하는 역할을 합니다.
4. 더 알아보기
Kafka
Q: 왜 Kafka를 선택하여 채팅 메시지의 비동기 처리에 사용했나요?
A: Kafka는 대용량의 데이터를 안정적이고 효율적으로 처리할 수 있는 분산형 스트리밍 플랫폼으로, 메시지 큐 시스템으로서의 강력한 성능을 제공합니다. 실시간 채팅에서는 높은 처리량이 필요하고, 메시지의 손실 없이 안정적으로 전달되어야 하기 때문에 Kafka를 선택했습니다. 또한, Kafka의 분산 아키텍처는 확장성을 제공하므로 증가하는 트래픽에 대처할 수 있었습니다.
Q: Kafka의 주요 특징 중 채팅 메시지 처리에 어떤 부분을 활용했나요?
A: 우선, Kafka의 Pub-Sub 모델을 활용하여 메시지의 발신자(Producer)와 수신자(Consumer)를 분리하여 메시지를 안전하게 전달했습니다. 또한, Kafka의 영속성을 이용하여 메시지의 지속성을 보장하고, 필요에 따라 메시지를 유지하거나 보관할 수 있었습니다. 이는 채팅 메시지의 중요성과 장기적인 저장 요구에 부합했습니다.
Q: 카프카는 왜 대용량 실시간 데이터 스트림 처리에 적합한가요?
A: 카프카는 대용량 데이터를 안정적으로 처리하는 데 강점을 가지고 있습니다. 그 이유로는 높은 확장성, 내구성 있는 메시지 스토리지, 분산 아키텍처, 멀티-컨슈머 지원 등이 있습니다. 또한, 프로듀서와 컨슈머 간의 디커플링으로 인해 서로 독립적으로 확장이 가능하며, 이는 대규모 데이터 스트림 처리에 이상적입니다.
Q: 카프카의 토픽과 파티션의 역할은 무엇인가요?
A: 토픽은 메시지 스트림을 카테고리화하는데 사용되며, 파티션은 토픽을 물리적으로 분할합니다. 파티션은 메시지의 분산 저장과 병렬 처리를 가능하게 하며, 각 파티션은 독립적으로 컨슈머에게 할당될 수 있어서 처리량을 증가시키고 병목 현상을 방지할 수 있습니다.
Q: 카프카의 데이터 보존 및 내구성을 어떻게 보장하나요?
A: 카프카는 메시지를 영구적으로 보존하고 안전하게 처리하기 위해 여러 복제본을 유지합니다. 각 메시지는 설정된 보존 기간 동안 브로커에 저장되며, 복제본이 다수 존재하기 때문에 노드의 장애에 대비하여 내구성을 제공합니다.
Q: 카프카 컨슈머 그룹이 필요한 이유는 무엇인가요?
A: 컨슈머 그룹은 특정 토픽의 메시지를 병렬로 처리하기 위한 개념입니다. 여러 컨슈머가 동일한 그룹에 속하면 각 컨슈머가 특정 파티션의 메시지를 동시에 처리할 수 있습니다. 이를 통해 메시지의 병렬 처리와 확장성을 극대화할 수 있습니다.
Q: 카프카와 RabbitMQ의 차이점은 무엇인가요?
A: 카프카는 대규모 데이터 스트림 처리에 특화되어 있으며, 메시지는 영구적으로 저장되고 병렬 처리가 가능합니다. 반면, RabbitMQ는 메시지 큐 시스템으로서 실시간 처리보다는 안정적인 메시지 전달에 중점을 둡니다. RabbitMQ는 메시지를 큐에 저장하여 수신 대기하며, 컨슈머가 메시지를 처리하면 큐에서 삭제됩니다.
WebStomp
Q: WebStomp은 무엇인가요?
A: WebStomp은 WebSocket을 기반으로 하는 Stomp 프로토콜의 JavaScript 라이브러리로, 웹 애플리케이션에서 서버와의 양방향 통신을 구현하는 데 사용됩니다. 이를 통해 브라우저에서 서버로 메시지를 실시간으로 전송하고 받을 수 있습니다.
Q: Stomp 프로토콜이란 무엇이며 어떤 상황에서 사용되나요?
A: Stomp(간단한 텍스트 지향 메시징 프로토콜)은 주로 메시징 시스템에서 사용되는 메시징 프로토콜입니다. 텍스트 기반이며 간단하고 가벼워서 다양한 언어 및 플랫폼 간의 통신에 적합합니다. 주로 메시지 큐와 같은 환경에서 사용되어 데이터를 주고받거나 이벤트를 전파하는 데 활용됩니다.
Q: WebStomp의 주요 특징은 무엇인가요?
A: WebStomp은 브라우저와 서버 간의 양방향 통신을 위해 설계된 라이브러리입니다. 브라우저에서 WebSocket을 사용하여 실시간 데이터를 주고받을 수 있게 해주며, 이를 Stomp 프로토콜을 통해 구조화된 메시지로 처리합니다. 이를 통해 웹 애플리케이션에서 실시간 상호작용 및 데이터 전송이 가능해집니다.
'개발 글쓰기' 카테고리의 다른 글
알림 서비스 구현하기(Spring Batch, SSE, Kafka) (0) | 2023.12.31 |
---|---|
매칭 서비스 구현하기(RabbitMQ, Unit Test) (0) | 2023.12.31 |