웹 애플리케이션을 개발하다보면 클라이언트의 요청 없이도 서버에서 특정한 데이터를 클라이언트로 보내주어야 하는 상황이 생긴다. 예를 들어, 댓글 알림이나 물건 판매 알림과 같이 실시간 알림 기능을 위해서는 서버에서 해당 이벤트가 발생하는 즉시 클라이언트로 알림을 보내주어야 한다.
개인 프로젝트에서도 해당 기능이 필요한 상황이 생겼다. 어드민에서 사용자가 판매 요청을 진행한 물건을 검토 후 판매 허가를 진행하면, 해당 사용자는 본인의 물건이 판매 허가가 떨어졌다는 실시간 알림을 받아야 하는 기능이 필요했다.
일반적인 HTTP 기반 통신 방식으로는 이 기능을 구현하기에는 어려운 부분이 있다. 서버에서 해당 알림을 보내기 위해서는 클라이언트의 요청이 꼭 필요하기 때문이다. 이렇게 HTTP 기반으로 실시간 통신이 필요한 경우에 고려해볼 수 있는 여러가지 통신 기법들이 있는데, 그 중에서 SSE(Server-Sent Event)를 사용하여 해당 기능을 구현하기로 하였다.
실시간 통신을 위한 여러 기법들에 대해서는 이 글에서 자세하게 설명되어 있다.
SSE를 선택한 이유는, 첫 통신 연결을 제외하면 연결이 된 동안에는 클라이언트의 요청 없이도 서버는 어드민이 물건 판매 허가를 낸 즉시, 클라이언트에게 알림을 바로 전송할 수 있기 때문이다.
이 글에서는 SSE 통신 기능을 Spring에서는 어떻게 구현하는지 알아보고 프로젝트에 적용해볼 것이다.
SseEmitter
SseEmitter 생성
Spring framework 4.2 부터 SSE 통신을 위한 SseEmitter
라는 API를 제공한다. 이를 활용해서 SSE 연결 구독 요청에 대한 응답 및 이벤트 발생에 대한 알림 전송을 진행할 수 있다.
아래와 같이 생성자를 통해 새로운 SseEmitter
객체를 생성할 수 있으며, 생성 시에 파라미터에 SSE 연결 만료시간을 지정할 수 있다. 만약 만료시간을 지정하지 않으면, Spring Boot에서 사용하는 내장 Tomcat 서버에서 지정된 default 값을 따르게 된다. 만료시간이 지나면 브라우저에서는 자동으로 서버에 재연결 요청을 보낸다.
SseEmitter sseWithoutTimeout = new SseEmitter(); // timeout 미지정
SseEmitter sseWithTimeout = new SseEmitter(60 * 1000L); // timeout 지정

SSE 연결 과정
SseEmtter와 이벤트 저장
SseEmitterRepository
위에서 설명한 SseEmitter
객체는 생성한 뒤에 서버에서 저장하고 관리해주어야 한다. 그 이유는, SseEmitter
객체가 생성되고 연결이 유지되는 동안에 이벤트가 발생했을 경우, 해당 객체로 연결된 클라이언트에게 이벤트 알림을 보내야 하기 때문이다.
이를 위해 SseEmitter
를 저장하는 repository를 제작하였다.
public interface SseEmitterRepository {
SseEmitter save(String username, SseEmitter sseEmitter);
Optional<SseEmitter> findByUsername(String username);
void deleteByUsername(String username);
void clear();
}
@Repository
public class SseEmitterRepositoryImpl implements SseEmitterRepository {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
@Override
public SseEmitter save(String username, SseEmitter sseEmitter) {
emitters.put(username, sseEmitter);
return sseEmitter;
}
@Override
public Optional<SseEmitter> findByUsername(String username) {
return Optional.ofNullable(emitters.get(username));
}
@Override
public void deleteByUsername(String username) {
emitters.remove(username);
}
@Override
public void clear() {
emitters.clear();
}
}
- 저장소는 동시성 문제에 대비해 구현체로
ConcurrentHashMap
을 사용하며, key는 username(문자열)로, value는SseEmitter
객체로 지정하여 저장한다.
EventCache
부득이하게 SSE 연결 도중 알림 전송이 누락되었을 경우, 재연결 시 알림이 누락된 이벤트들을 모두 조회한 후 다시 전송해야 한다. 이런 상황에 대비하여 이벤트를 미리 저장해두는 곳이 필요하기 때문에 EventCache
를 만들었다.
public interface EventCache {
Event save(String eventId, Event event);
Optional<Event> findByEventId(String eventId);
Map<String, Event> findAllOmittedEvents(String username);
void deleteByEventId(String eventId);
void clear();
}
@Repository
public class EventCacheImpl implements EventCache {
private final Map<String, Event> eventCache = new ConcurrentHashMap<>();
@Override
public Event save(String eventId, Event event) {
eventCache.put(eventId, event);
return event;
}
@Override
public Optional<Event> findByEventId(String eventId) {
return Optional.ofNullable(eventCache.get(eventId));
}
@Override
public Map<String, Event> findAllOmittedEvents(String username) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(username))
.filter(entry -> entry.getKey().compareTo(EventIdUtils.createEventId(username)) < 0)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public void deleteByEventId(String eventId) {
eventCache.remove(eventId);
}
@Override
public void clear() {
eventCache.clear();
}
}
- 동시성 문제에 대비해 구현체로
ConcurrentHashMap
을 사용하며, key는 eventId(문자열)로, value는 이벤트 이름과 알림 내용이 담긴Event
객체를 지정하여 보관한다.
SSE 연결
NotificationController
클라이언트는 SSE 연결을 위해 서버로 구독 요청을 보내야 한다. 이를 위한 controller 클래스를 만들었다.
@RequiredArgsConstructor
@RequestMapping("/api/notifications")
@RestController
public class NotificationController {
private final NotificationService notificationService;
@GetMapping(value = "/subscribe", produces = TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe(Authentication authentication,
@RequestHeader(value = "Last-Event-Id", required = false) String lastEventId) {
if (authentication == null) {
throw new SseUnavailableException(SUBSCRIBER_NOT_PRESENTED);
}
return notificationService.subscribe(authentication.getName(), lastEventId);
}
}
- 클라이언트가 SSE 연결을 위해
/api/notifications/subscribe
로 요청을 보내면, 서버는 응답의 미디어 타입을text/event-stream
으로 지정하고SseEmitter
객체를 응답으로 전달하며 SSE 연결이 시작된다. - 여기서는 어떤 사용자가 SSE 연결 요청을 보냈는지 알 수 있도록, 요청 시에 JWT access token을 전달하도록 만들었다.
Last-Event-Id
: SSE 구독 요청 시 전달할 수 있는 헤더로, 클라이언트가 가장 마지막으로 알림을 받은 이벤트의 id이다.- 클라이언트가 보낸 요청에 이 헤더가 포함되어 있는 경우, 이전 SSE 연결이 어떠한 사유로 중간에 종료되어 이벤트 알림이 누락되었을 수 있다는 뜻으로, 서버는 이 값을 바탕으로 알림이 누락된 이벤트를 모두 찾아 재전송해야 한다.
NotificationService
NotificationController
에서 연결 요청이 들어오면 SseEmitter
생성 및 연결 과정을 거쳐야 한다. 이를 위한 service 클래스를 하나 만들었다.
@Slf4j
@RequiredArgsConstructor
@Service
public class NotificationService {
private static final long TIMEOUT = Duration.ofHours(1).toMillis();
private final NotificationRepository notificationRepository;
private final SseEmitterRepository sseEmitterRepository;
private final EventCache eventCache;
public SseEmitter subscribe(String username, String lastEventId) {
...
}
public void notifyEvent(String username, EventType eventType, String data) {
...
}
}
subscribe()
이 메서드는 SSE 연결 요청이 들어오면 SseEmitter
를 생성 및 저장하고, SSE 비동기 요청이 완료되거나 타임아웃, 오류 발생 시에 호출되는 콜백을 설정한다. 아래의 코드를 보며 더 자세히 알아보자.
public SseEmitter subscribe(String username, String lastEventId) {
SseEmitter sseEmitter = sseEmitterRepository.save(username, new SseEmitter(TIMEOUT));
setEmitterCallback(sseEmitter, username);
if (StringUtils.hasText(lastEventId)) {
notifyOmittedEvents(sseEmitter, username);
} else {
notifyDummy(sseEmitter, username);
}
return sseEmitter;
}
먼저, 요청이 들어오면 새로운 SseEmitter
객체를 생성하고 SseEmitterRepository
에 저장한다. 이후, 해당 SseEmitter
객체의 콜백을 설정하는 setEmitterCallback()
메서드를 호출한다.
private void setEmitterCallback(SseEmitter sseEmitter, String username) {
sseEmitter.onCompletion(() -> {
log.info("SSE completed: subscriber = {}", username);
sseEmitterRepository.deleteByUsername(username);
});
sseEmitter.onTimeout(() -> {
log.info("SSE timeout: subscriber = {}", username);
sseEmitter.complete();
});
sseEmitter.onError((e) -> {
log.info("SSE error: subscriber = {}", username, e);
sseEmitter.complete();
});
}
onCompletion()
- SSE 메시지 전송이 완료되었거나, 타임아웃, 에러 발생 시 호출되는 콜백이다.- 여기서 중요한 점은, 이 콜백은 SSE가 정상적으로 완료된 상황 뿐만 아니라 만료시간이 지났거나 중간에 에러가 발생했을 때도 호출된다는 것이다.
onTimeout()
- 만료시간이 지난 경우에만 호출되는 콜백이다.onError()
- 중간에 에러가 발생했을 경우에만 호출되는 콜백이다.
콜백 설정을 완료한 후, 매개변수로 전달받은 lastEventId
를 확인한다. 만약, SSE 연결 요청 시에 해당 헤더가 포함되었다면, notifyOmittedEvents()
메서드를 호출해 알림이 누락된 이벤트들을 찾아 모두 알림을 재전송한다.
/**
* 알림 전송이 누락된 이벤트들을 조회한 후, 재전송하기 위해 호출하는 메서드
*/
private void notifyOmittedEvents(SseEmitter sseEmitter, String username) {
eventCache.findAllOmittedEvents(username)
.forEach((eventId, event) -> {
notify(sseEmitter, eventId, event.getEventName(), event.getData());
eventCache.deleteByEventId(eventId);
});
}
Last-Event-Id
값이 없을 경우, 아래와 같이 더미 데이터를 전송해야 한다. 그 이유는, SseEmitter
생성 후 만료시간까지 아무런 데이터도 전송하지 않는다면, 재연결 요청 시에 503 Service Unavailable 에러를 전송하기 때문이다.
/**
* 첫 SSE 연결 후, 더미 데이터를 보내기 위해 호출하는 메서드
*/
private void notifyDummy(SseEmitter sseEmitter, String username) {
String eventId = "";
String eventName = SSE_SUBSCRIPTION.getEventName();
String data = "SSE connected. Connected user = " + username;
notify(sseEmitter, eventId, eventName, data);
}
마지막으로, SseEmitter
객체를 반환하는 것으로 subscribe()
메서드의 역할은 끝난다.
notifyEvent()
이벤트가 발생하는 곳에서 알림을 전송하기 위해 호출하는 메서드이다.
/**
* 이벤트가 발생하는 곳에서 알림을 전송하기 위해 호출하는 메서드
*/
public void notifyEvent(String username, EventType eventType, String data) {
String eventId = EventIdUtils.createEventId(username);
String eventName = eventType.getEventName();
eventCache.save(eventId, new Event(eventName, data));
SseEmitter sseEmitter = sseEmitterRepository.findByUsername(username)
.orElseThrow(() -> {
String errorMessage = "SSE connection is not established. User = " + username;
return new SseUnavailableException(CONNECTION_NOT_ESTABLISHED, errorMessage);
});
notify(sseEmitter, eventId, eventName, data);
sseEmitter.complete();
}
private void notify(SseEmitter sseEmitter, String eventId, String eventName, Object data) {
try {
sseEmitter.send(SseEmitter.event()
.id(eventId)
.name(eventName)
.data(data));
} catch (IOException e) {
log.info("Exception occurred while sending notification.");
String username = EventIdUtils.parseUsername(eventId);
sseEmitterRepository.deleteByUsername(username);
throw new SseNotificationException(NOTIFICATION_ERROR);
}
}
- 먼저, 매개변수로 전달받은
username
을 바탕으로eventId
를 생성한다.eventId
는username + "-" + System.currentTimeMillis()
형식으로 생성되며,EventCache
에 이벤트를 저장할 때 key값으로 사용된다.
- EventId를 생성할 때 시간이 추가되는 이유는, 누락된 알림을 eventId를 통해 확인할 수 있도록 하기 위함이다.
-
EventCache
에 저장을 마치면SseEmitterRepository
에서username
으로SseEmitter
를 조회한 후,notify()
메서드를 호출하여 알림을 전송한다.
주의사항
503 Unavailable Service
위에서 언급하였듯이, 처음 SSE 연결 요청 시에 아무런 이벤트를 전달하지 않으면 재연결 요청 시에 503 Unavailable Service 에러를 반환하게 된다. 따라서, 첫 SSE 연결 요청 시에 더미 데이터를 전달하여 해당 에러가 반환되지 않도록 만들어야 한다.
SseEmitter와 이벤트 저장 시 주의사항
현재 SseEmitter
와 Event
객체는 서버의 메모리에 저장되어 있다. 만약, 서버를 증설해야 하는 상황이라면 이 방식은 제대로 동작하지 않는다. 이 경우, 모든 서버 인스턴스에서 공유할 수 있는 저장소를 도입하는 것이 좋은 방법이다.
JPA 사용 시 주의사항
SSE 통신이 진행되는 동안에는 HTTP connection이 계속해서 열려있게 된다. 이때, JPA를 사용하는 상황이고 OSIV 설정이 open-in-view=true
로 설정되어 있다면, SSE 통신이 진행되는 동안 DB connection 또한 같이 열려있게 된다.
여기서 만약 DB Connection Pool에서 사용할 수 있는 connection의 수 보다 더 많은 SSE 통신 요청이 들어오게 된다면 DB connection이 고갈되는 현상이 발생하게 된다. 따라서,open-in-view
를 반드시 false
로 설정해야 한다.
참고
Spring에서 Server-Sent-Events 구현하기
…
tecoble.techcourse.co.kr
'Spring' 카테고리의 다른 글
[Spring] RestClient에 대하여 (0) | 2024.07.01 |
---|---|
[Spring] @Scheduled를 사용한 메서드 스케줄링 (0) | 2024.06.17 |
[Spring / AWS] Spring Boot 3 + AWS Lambda 사용하기 (0) | 2024.04.26 |
[Spring] Pagination 기본값 설정하기 (0) | 2024.04.03 |
[Spring] MultipartFile Bean Validation (0) | 2024.01.26 |
웹 애플리케이션을 개발하다보면 클라이언트의 요청 없이도 서버에서 특정한 데이터를 클라이언트로 보내주어야 하는 상황이 생긴다. 예를 들어, 댓글 알림이나 물건 판매 알림과 같이 실시간 알림 기능을 위해서는 서버에서 해당 이벤트가 발생하는 즉시 클라이언트로 알림을 보내주어야 한다.
개인 프로젝트에서도 해당 기능이 필요한 상황이 생겼다. 어드민에서 사용자가 판매 요청을 진행한 물건을 검토 후 판매 허가를 진행하면, 해당 사용자는 본인의 물건이 판매 허가가 떨어졌다는 실시간 알림을 받아야 하는 기능이 필요했다.
일반적인 HTTP 기반 통신 방식으로는 이 기능을 구현하기에는 어려운 부분이 있다. 서버에서 해당 알림을 보내기 위해서는 클라이언트의 요청이 꼭 필요하기 때문이다. 이렇게 HTTP 기반으로 실시간 통신이 필요한 경우에 고려해볼 수 있는 여러가지 통신 기법들이 있는데, 그 중에서 SSE(Server-Sent Event)를 사용하여 해당 기능을 구현하기로 하였다.
실시간 통신을 위한 여러 기법들에 대해서는 이 글에서 자세하게 설명되어 있다.
SSE를 선택한 이유는, 첫 통신 연결을 제외하면 연결이 된 동안에는 클라이언트의 요청 없이도 서버는 어드민이 물건 판매 허가를 낸 즉시, 클라이언트에게 알림을 바로 전송할 수 있기 때문이다.
이 글에서는 SSE 통신 기능을 Spring에서는 어떻게 구현하는지 알아보고 프로젝트에 적용해볼 것이다.
SseEmitter
SseEmitter 생성
Spring framework 4.2 부터 SSE 통신을 위한 SseEmitter
라는 API를 제공한다. 이를 활용해서 SSE 연결 구독 요청에 대한 응답 및 이벤트 발생에 대한 알림 전송을 진행할 수 있다.
아래와 같이 생성자를 통해 새로운 SseEmitter
객체를 생성할 수 있으며, 생성 시에 파라미터에 SSE 연결 만료시간을 지정할 수 있다. 만약 만료시간을 지정하지 않으면, Spring Boot에서 사용하는 내장 Tomcat 서버에서 지정된 default 값을 따르게 된다. 만료시간이 지나면 브라우저에서는 자동으로 서버에 재연결 요청을 보낸다.
SseEmitter sseWithoutTimeout = new SseEmitter(); // timeout 미지정
SseEmitter sseWithTimeout = new SseEmitter(60 * 1000L); // timeout 지정

SSE 연결 과정
SseEmtter와 이벤트 저장
SseEmitterRepository
위에서 설명한 SseEmitter
객체는 생성한 뒤에 서버에서 저장하고 관리해주어야 한다. 그 이유는, SseEmitter
객체가 생성되고 연결이 유지되는 동안에 이벤트가 발생했을 경우, 해당 객체로 연결된 클라이언트에게 이벤트 알림을 보내야 하기 때문이다.
이를 위해 SseEmitter
를 저장하는 repository를 제작하였다.
public interface SseEmitterRepository {
SseEmitter save(String username, SseEmitter sseEmitter);
Optional<SseEmitter> findByUsername(String username);
void deleteByUsername(String username);
void clear();
}
@Repository
public class SseEmitterRepositoryImpl implements SseEmitterRepository {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
@Override
public SseEmitter save(String username, SseEmitter sseEmitter) {
emitters.put(username, sseEmitter);
return sseEmitter;
}
@Override
public Optional<SseEmitter> findByUsername(String username) {
return Optional.ofNullable(emitters.get(username));
}
@Override
public void deleteByUsername(String username) {
emitters.remove(username);
}
@Override
public void clear() {
emitters.clear();
}
}
- 저장소는 동시성 문제에 대비해 구현체로
ConcurrentHashMap
을 사용하며, key는 username(문자열)로, value는SseEmitter
객체로 지정하여 저장한다.
EventCache
부득이하게 SSE 연결 도중 알림 전송이 누락되었을 경우, 재연결 시 알림이 누락된 이벤트들을 모두 조회한 후 다시 전송해야 한다. 이런 상황에 대비하여 이벤트를 미리 저장해두는 곳이 필요하기 때문에 EventCache
를 만들었다.
public interface EventCache {
Event save(String eventId, Event event);
Optional<Event> findByEventId(String eventId);
Map<String, Event> findAllOmittedEvents(String username);
void deleteByEventId(String eventId);
void clear();
}
@Repository
public class EventCacheImpl implements EventCache {
private final Map<String, Event> eventCache = new ConcurrentHashMap<>();
@Override
public Event save(String eventId, Event event) {
eventCache.put(eventId, event);
return event;
}
@Override
public Optional<Event> findByEventId(String eventId) {
return Optional.ofNullable(eventCache.get(eventId));
}
@Override
public Map<String, Event> findAllOmittedEvents(String username) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(username))
.filter(entry -> entry.getKey().compareTo(EventIdUtils.createEventId(username)) < 0)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public void deleteByEventId(String eventId) {
eventCache.remove(eventId);
}
@Override
public void clear() {
eventCache.clear();
}
}
- 동시성 문제에 대비해 구현체로
ConcurrentHashMap
을 사용하며, key는 eventId(문자열)로, value는 이벤트 이름과 알림 내용이 담긴Event
객체를 지정하여 보관한다.
SSE 연결
NotificationController
클라이언트는 SSE 연결을 위해 서버로 구독 요청을 보내야 한다. 이를 위한 controller 클래스를 만들었다.
@RequiredArgsConstructor
@RequestMapping("/api/notifications")
@RestController
public class NotificationController {
private final NotificationService notificationService;
@GetMapping(value = "/subscribe", produces = TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe(Authentication authentication,
@RequestHeader(value = "Last-Event-Id", required = false) String lastEventId) {
if (authentication == null) {
throw new SseUnavailableException(SUBSCRIBER_NOT_PRESENTED);
}
return notificationService.subscribe(authentication.getName(), lastEventId);
}
}
- 클라이언트가 SSE 연결을 위해
/api/notifications/subscribe
로 요청을 보내면, 서버는 응답의 미디어 타입을text/event-stream
으로 지정하고SseEmitter
객체를 응답으로 전달하며 SSE 연결이 시작된다. - 여기서는 어떤 사용자가 SSE 연결 요청을 보냈는지 알 수 있도록, 요청 시에 JWT access token을 전달하도록 만들었다.
Last-Event-Id
: SSE 구독 요청 시 전달할 수 있는 헤더로, 클라이언트가 가장 마지막으로 알림을 받은 이벤트의 id이다.- 클라이언트가 보낸 요청에 이 헤더가 포함되어 있는 경우, 이전 SSE 연결이 어떠한 사유로 중간에 종료되어 이벤트 알림이 누락되었을 수 있다는 뜻으로, 서버는 이 값을 바탕으로 알림이 누락된 이벤트를 모두 찾아 재전송해야 한다.
NotificationService
NotificationController
에서 연결 요청이 들어오면 SseEmitter
생성 및 연결 과정을 거쳐야 한다. 이를 위한 service 클래스를 하나 만들었다.
@Slf4j
@RequiredArgsConstructor
@Service
public class NotificationService {
private static final long TIMEOUT = Duration.ofHours(1).toMillis();
private final NotificationRepository notificationRepository;
private final SseEmitterRepository sseEmitterRepository;
private final EventCache eventCache;
public SseEmitter subscribe(String username, String lastEventId) {
...
}
public void notifyEvent(String username, EventType eventType, String data) {
...
}
}
subscribe()
이 메서드는 SSE 연결 요청이 들어오면 SseEmitter
를 생성 및 저장하고, SSE 비동기 요청이 완료되거나 타임아웃, 오류 발생 시에 호출되는 콜백을 설정한다. 아래의 코드를 보며 더 자세히 알아보자.
public SseEmitter subscribe(String username, String lastEventId) {
SseEmitter sseEmitter = sseEmitterRepository.save(username, new SseEmitter(TIMEOUT));
setEmitterCallback(sseEmitter, username);
if (StringUtils.hasText(lastEventId)) {
notifyOmittedEvents(sseEmitter, username);
} else {
notifyDummy(sseEmitter, username);
}
return sseEmitter;
}
먼저, 요청이 들어오면 새로운 SseEmitter
객체를 생성하고 SseEmitterRepository
에 저장한다. 이후, 해당 SseEmitter
객체의 콜백을 설정하는 setEmitterCallback()
메서드를 호출한다.
private void setEmitterCallback(SseEmitter sseEmitter, String username) {
sseEmitter.onCompletion(() -> {
log.info("SSE completed: subscriber = {}", username);
sseEmitterRepository.deleteByUsername(username);
});
sseEmitter.onTimeout(() -> {
log.info("SSE timeout: subscriber = {}", username);
sseEmitter.complete();
});
sseEmitter.onError((e) -> {
log.info("SSE error: subscriber = {}", username, e);
sseEmitter.complete();
});
}
onCompletion()
- SSE 메시지 전송이 완료되었거나, 타임아웃, 에러 발생 시 호출되는 콜백이다.- 여기서 중요한 점은, 이 콜백은 SSE가 정상적으로 완료된 상황 뿐만 아니라 만료시간이 지났거나 중간에 에러가 발생했을 때도 호출된다는 것이다.
onTimeout()
- 만료시간이 지난 경우에만 호출되는 콜백이다.onError()
- 중간에 에러가 발생했을 경우에만 호출되는 콜백이다.
콜백 설정을 완료한 후, 매개변수로 전달받은 lastEventId
를 확인한다. 만약, SSE 연결 요청 시에 해당 헤더가 포함되었다면, notifyOmittedEvents()
메서드를 호출해 알림이 누락된 이벤트들을 찾아 모두 알림을 재전송한다.
/**
* 알림 전송이 누락된 이벤트들을 조회한 후, 재전송하기 위해 호출하는 메서드
*/
private void notifyOmittedEvents(SseEmitter sseEmitter, String username) {
eventCache.findAllOmittedEvents(username)
.forEach((eventId, event) -> {
notify(sseEmitter, eventId, event.getEventName(), event.getData());
eventCache.deleteByEventId(eventId);
});
}
Last-Event-Id
값이 없을 경우, 아래와 같이 더미 데이터를 전송해야 한다. 그 이유는, SseEmitter
생성 후 만료시간까지 아무런 데이터도 전송하지 않는다면, 재연결 요청 시에 503 Service Unavailable 에러를 전송하기 때문이다.
/**
* 첫 SSE 연결 후, 더미 데이터를 보내기 위해 호출하는 메서드
*/
private void notifyDummy(SseEmitter sseEmitter, String username) {
String eventId = "";
String eventName = SSE_SUBSCRIPTION.getEventName();
String data = "SSE connected. Connected user = " + username;
notify(sseEmitter, eventId, eventName, data);
}
마지막으로, SseEmitter
객체를 반환하는 것으로 subscribe()
메서드의 역할은 끝난다.
notifyEvent()
이벤트가 발생하는 곳에서 알림을 전송하기 위해 호출하는 메서드이다.
/**
* 이벤트가 발생하는 곳에서 알림을 전송하기 위해 호출하는 메서드
*/
public void notifyEvent(String username, EventType eventType, String data) {
String eventId = EventIdUtils.createEventId(username);
String eventName = eventType.getEventName();
eventCache.save(eventId, new Event(eventName, data));
SseEmitter sseEmitter = sseEmitterRepository.findByUsername(username)
.orElseThrow(() -> {
String errorMessage = "SSE connection is not established. User = " + username;
return new SseUnavailableException(CONNECTION_NOT_ESTABLISHED, errorMessage);
});
notify(sseEmitter, eventId, eventName, data);
sseEmitter.complete();
}
private void notify(SseEmitter sseEmitter, String eventId, String eventName, Object data) {
try {
sseEmitter.send(SseEmitter.event()
.id(eventId)
.name(eventName)
.data(data));
} catch (IOException e) {
log.info("Exception occurred while sending notification.");
String username = EventIdUtils.parseUsername(eventId);
sseEmitterRepository.deleteByUsername(username);
throw new SseNotificationException(NOTIFICATION_ERROR);
}
}
- 먼저, 매개변수로 전달받은
username
을 바탕으로eventId
를 생성한다.eventId
는username + "-" + System.currentTimeMillis()
형식으로 생성되며,EventCache
에 이벤트를 저장할 때 key값으로 사용된다.
- EventId를 생성할 때 시간이 추가되는 이유는, 누락된 알림을 eventId를 통해 확인할 수 있도록 하기 위함이다.
-
EventCache
에 저장을 마치면SseEmitterRepository
에서username
으로SseEmitter
를 조회한 후,notify()
메서드를 호출하여 알림을 전송한다.
주의사항
503 Unavailable Service
위에서 언급하였듯이, 처음 SSE 연결 요청 시에 아무런 이벤트를 전달하지 않으면 재연결 요청 시에 503 Unavailable Service 에러를 반환하게 된다. 따라서, 첫 SSE 연결 요청 시에 더미 데이터를 전달하여 해당 에러가 반환되지 않도록 만들어야 한다.
SseEmitter와 이벤트 저장 시 주의사항
현재 SseEmitter
와 Event
객체는 서버의 메모리에 저장되어 있다. 만약, 서버를 증설해야 하는 상황이라면 이 방식은 제대로 동작하지 않는다. 이 경우, 모든 서버 인스턴스에서 공유할 수 있는 저장소를 도입하는 것이 좋은 방법이다.
JPA 사용 시 주의사항
SSE 통신이 진행되는 동안에는 HTTP connection이 계속해서 열려있게 된다. 이때, JPA를 사용하는 상황이고 OSIV 설정이 open-in-view=true
로 설정되어 있다면, SSE 통신이 진행되는 동안 DB connection 또한 같이 열려있게 된다.
여기서 만약 DB Connection Pool에서 사용할 수 있는 connection의 수 보다 더 많은 SSE 통신 요청이 들어오게 된다면 DB connection이 고갈되는 현상이 발생하게 된다. 따라서,open-in-view
를 반드시 false
로 설정해야 한다.
참고
Spring에서 Server-Sent-Events 구현하기
…
tecoble.techcourse.co.kr
'Spring' 카테고리의 다른 글
[Spring] RestClient에 대하여 (0) | 2024.07.01 |
---|---|
[Spring] @Scheduled를 사용한 메서드 스케줄링 (0) | 2024.06.17 |
[Spring / AWS] Spring Boot 3 + AWS Lambda 사용하기 (0) | 2024.04.26 |
[Spring] Pagination 기본값 설정하기 (0) | 2024.04.03 |
[Spring] MultipartFile Bean Validation (0) | 2024.01.26 |