그리고 kafkaConfig를 만들어 Producer와 Comsumer의 bootstrapServer 정보, group, client ID, serializer와 desirializer를 설정해준다. 아래의 코드는 거의 최소 설정으로 구성한 것인데, 보안이나 timeout, partition 등의 설정은 원하는대로 추가해주면 된다.
큐에 데이터를 발행해줄 api를 하나 만들어준다. 실행한 서버의 이 url에 요청을 보내면 TestPayload 객체가 메세지로 변환되어 test 토픽에 쌓이게 될 것이다.
위의 config에서 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG를 JsonSerializer로 설정해주었기 때문에 객체를 넣으면 Json 데이터로 변환되어 저장된다. Json이기 때문에 inner class가 있는 복잡한 객체도 바로 전달할 수 있다.
단, payload로 전송할 객체는 kafka에 저장될 형태로 역직렬화, 직렬화하는 과정이 필요하기 때문에 Serializable을 상속받거나 (kotlin에선) data class로 정의해줘야한다.
여기서 사용할 test topic은, kafka에서 따로 생성해주어야한다. 하지만 원한다면 config 코드상에서 topic을 Bean으로 정의하는 방법도 있다.
test 토픽에 저장된 메시지들을 소비할 consumer 클래스이다. @KafkaListener 어노테이션을 달아주면 해당 topic의 메시지를 받아 처리한다.
group은 config에서 설정한 groupId를 그대로 적은 것인데, 다중 group을 관리하는 것이 목적이 아니기 때문에 큰 의미는 없다.