승이의 기술블로그

https://dev-seunghee.tistory.com/12

 

[SpringBoot] 이벤트 기반 아키텍처를 알아보고 스프링부트의 이벤트를 구현해보자

📝 들어가며 교내의 스포츠 경기 상활을 실시간으로 확인할 수 있는 서비스 '훕치치'에서는 사용자들이 응원하는 팀에 대해 응원 댓글을 남길 수 있는 기능을 제공하고 있다. 1차 릴리즈 이후

dev-seunghee.tistory.com

위의 글에서 이어집니다!


앞선 게시글에서 '이벤트' 자체란 무엇이고, 스프링부트에서 이를 어떻게 구현할 수 있는지에 대해서 알아봤다.

 

프로젝트의 구현 상황에 따르면 이벤트 기반 아키텍처를 사용하는 것이 옳을 것으로 보인다. 이는 이벤트를 사용하기도 하지만, 동시에 도메인 자체의 변화와도 같다. 왜냐하면 사용자가 남긴 채팅은 일일이 CheerTalk 이라는 도메인으로 취급되고 있기 때문이다. 따라서 새로운 채팅이 등록될 때마다 CheerTalk 이라는 도메인이 저장되는 동시에 이벤트가 발행되어야 하는 것이 옳다. 이러한 매커니즘을 도메인 이벤트라고 한다.

👀 도메인 이벤트란

Captures the memory of something interesting which affects the domain

- 마틴 파울러

 

도메인에서 어떤 일이 발생했는지에 대한 것이다. 연관이 없는 도메인의 활동에 대해서는 무시한다.

- 에릭 에반스

 

도메인 이벤트는 doamin 자체의 변화를 다른 곳에 알리기 위해서 event 로 만드는 것이다.

나머지의 특성들은 앞선 게시글에서 설명한 이벤트와 대부분 동일하다.

👀 AbstractAggregateRoot

스프링부트에서는 AbstractAggregateRoot 를 이용해서 손쉽게 도메인 이벤트를 발행할 수 있다.

AbstractAggregateRoot 를 뜯어보자.

public class AbstractAggregateRoot<A extends AbstractAggregateRoot<A>> {
    @Transient
    private final transient List<Object> domainEvents = new ArrayList();

    public AbstractAggregateRoot() {
    }

    protected <T> T registerEvent(T event) {
        Assert.notNull(event, "Domain event must not be null");
        this.domainEvents.add(event);
        return event;
    }

    @AfterDomainEventPublication
    protected void clearDomainEvents() {
        this.domainEvents.clear();
    }

    @DomainEvents
    protected Collection<Object> domainEvents() {
        return Collections.unmodifiableList(this.domainEvents);
    }

    protected final A andEventsFrom(A aggregate) {
        Assert.notNull(aggregate, "Aggregate must not be null");
        this.domainEvents.addAll(aggregate.domainEvents());
        return this;
    }

    protected final A andEvent(Object event) {
        this.registerEvent(event);
        return this;
    }
}

 

@Transient

- 영속성에 저장하지 않아야 하는 것을 나타내는 어노테이션이다. 이를 통해 데이터베이스에 해당 필드를 저장하지 않을 수 있다.

- 이를 붙이면 객체의 생명 주기 동안에만 존재하게 되는 일시적인 필드가 된다.

@AfterDomainEventPublication

- 이벤트 발행 이후에 특정 메서드를 실행하도록 지정하기 위해 사용된다.

registerEvent(T event)

- 도메인 이벤트를 등록하는 메서드이다.

- 넘겨받은 이벤트를 doaminEvents 라는 필드에 저장한다.

clearDomainEvents()

- 이베트를 발행하는 메서드의 실행이 완료된 이후에 호출된다.

- 이벤트 발행, 핸들러 처리 이후 이를 호출해서 이벤트를 지운다.

- 같은 이벤트가 두번 이상 발행되지 않도록 방지해주는 역할을 한다.

doaminEvents()

- 등록된 도메인 이벤트들을 반환한다.

- 수정할 수 없도록 unmodifiableList 를 활용한다.

@DomainEvents

- 도메인 이벤트를 발행하는 메서드를 지정하는 데에 사용된다.

- 해당 메서드가 호출될 때마다 자동으로 이벤트가 발행된다.

andEventsFrom(A aggregate)

- 다른 집합 루트로부터 도메인 이벤트를 추가하는 메서드이다.

- 새로운 도메인 이벤트를 가져와 현재 집합 루트의 리스트에 추가한다.

andEvent(Object event)

- 도메인 이벤트를 추가하는 메서드이다.

- 내부의 registerEvent 메서드를 호출해서 도메인 이벤트를 등록한다.

registerEvent 메서드와 함께 존재하는 이유

이벤트를 등록하는 동시에 현재 객체를 반환하기 때문에 메서드 체이닝이 가능해지기 때문이다.

빌더 패턴을 이용해 보다 간단하게 코드를 작성할 수 있게 된다.

@MappedSuperclass
@Getter
public class BaseEntity<T extends AbstractAggregateRoot<T>> extends AbstractAggregateRoot<T> {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        BaseEntity<?> that = (BaseEntity<?>) o;
        return id.equals(that.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }
}
@Entity
@Getter
@Table(name = "cheer_talks")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class CheerTalk extends BaseEntity<CheerTalk> {

    // .. 중략

    public CheerTalk(final String content, final Long gameTeamId) {
        this.createdAt = LocalDateTime.now();
        this.content = content;
        this.isBlocked = false;
        this.gameTeamId = gameTeamId;
        registerEvent(new CheerTalkCreateEvent(this));
    }
}

위처럼 CheerTalk 엔티티는 BaseEntity 를 상속하도록 했다. 또한, BaseEntity 는 AbstractAggregateRoot 를 상속하도록 했다.

CheerTalk 엔티티의 생성자에서 AbstractAggregateRoot 내부의 registerEvent 메서드를 호출한다.

여기서 파라미터로 이벤트 객체를 넘겨야 한다.

public record CheerTalkCreateEvent(CheerTalk cheerTalk) {
}

CheerTalkCreateEvent 라는 이벤트 객체를 생성해준다.

이 이벤트 객체를 핸들러에서 받아서 소켓으로 메시지의 내용을 전달해줘야 하므로, 필드로 CheerTalk 을 저장하도록 했다.

AbstractAggregateRoot 의 이벤트 발행

@Service
@Transactional
@RequiredArgsConstructor
public class CheerTalkService {
    public void register(final CheerTalkRequest cheerTalkRequest) {
        validateContent(cheerTalkRequest.content());
        CheerTalk cheerTalk = new CheerTalk(cheerTalkRequest.content(), cheerTalkRequest.gameTeamId());
        cheerTalkRepository.save(cheerTalk);
    }
}

AbstractAggregateRoot 를 이용하여 도메인 이벤트를 발행한 경우에는 명시적으로 .save() 가 호출이 되어야 이벤트가 발행된다.

그렇기에 위와 같이 코드를 작성해줬다.

👀AbstractAggregateRoot를 이용한 도메인 이벤트 발행 과정 뜯어보기

CheerTalkService 에서 해당 엔티티를 저장하고자 하면, 즉 영속화를 하고자 하면 그 때 도메인 이벤트가 발행된다. 도메인 이벤트가 발행 될 때는 EventPublishingRepositoryProxyPostProcessor 가 호출된다. 이는 레포지토리에서의 save 나 delete 메서드가 실행되면 이를 감지하여 호출을 가로챈다.

 

우선, 아래는 빈 초기화 단계 중 advice 를 추가하는 작업이 이뤄지는 과정이다.

public class EventPublishingRepositoryProxyPostProcessor implements RepositoryProxyPostProcessor {
    @Override
    public void postProcess(ProxyFactory factory, RepositoryInformation repositoryInformation) {

        EventPublishingMethod method = EventPublishingMethod.of(repositoryInformation.getDomainType());

        if (method == null) {
            return;
        }

        factory.addAdvice(new EventPublishingMethodInterceptor(method, publisher));
    }

    static class EventPublishingMethod {
        @Nullable
        public static EventPublishingMethod of(Class<?> type) {

            Assert.notNull(type, "Type must not be null");

            EventPublishingMethod eventPublishingMethod = cache.get(type);

            if (eventPublishingMethod != null) {
                return eventPublishingMethod.orNull();
            }

            EventPublishingMethod result = from(type, getDetector(type, DomainEvents.class),
                    () -> getDetector(type, AfterDomainEventPublication.class));

            cache.put(type, result);

            return result.orNull();
        }
        
        private static <T extends Annotation> AnnotationDetectionMethodCallback<T> getDetector(Class<?> type,
				Class<T> annotation) {

			AnnotationDetectionMethodCallback<T> callback = new AnnotationDetectionMethodCallback<T>(annotation);
			ReflectionUtils.doWithMethods(type, callback);

			return callback;
		}
    }
}

가장 먼저 EventPublishingRepositoryProxyPostProcessor 의 postProcess 메서드가 실행된다. 이 때, 내부의 정적 클래스인 EventPublishingMethod 의 of 를 호출한다. 이는 도메인의 타입을 이용해 이벤트 발행 관련 메서드를 확인하고 이를 처리할 수 있는 EventPublishingMethod 객체를 만들어 반환해준다. 이미 해당 타입에 대한 객체가 존재한다면, 캐시에서 가져와 반환한다. 존재하지 않는다면 from 메서드를 이용해 이벤트 발행 관련 메서드를 찾고, 이를 처리할 수 있는 EventPublishingMethod 객체를 생성해 캐시에 저장한다.

getDetector 은 AfterDomainEventPublication 이라는 어노테이션이 붙은 메서드를 찾아 반환해 파라미터로 넘길 수 있도록 한다. 이는 아래 코드에서도 볼 수 있듯 from 메서드에서 세번째 파라미터가 clearing, 즉 이벤트를 지우는 메서드를 받고 있기 때문에 해당 어노테이션이 붙은 메서드를 찾아 넣어준다. 이 부분으로 인해 앞서 언급한 @AfterDomainEventPublication 가 필요했던 것이다.

private static EventPublishingMethod from(Class<?> type, AnnotationDetectionMethodCallback<?> publishing,
        Supplier<AnnotationDetectionMethodCallback<?>> clearing) {

    if (!publishing.hasFoundAnnotation()) {
        return EventPublishingMethod.NONE;
    }

    Method eventMethod = publishing.getRequiredMethod();
    ReflectionUtils.makeAccessible(eventMethod);

    return new EventPublishingMethod(type, eventMethod, getClearingMethod(clearing.get()));
}

 

만약, 발행하는 어노테이션을 찾지 못하면 if 문과 같이 NONE 을 반환한다. 그러나 이를 찾는다면 eventMethod 라는 변수에 이를 담는다. 이후 마침내 EventPublishingMethod 를 만들어 반환한다.

public class EventPublishingRepositoryProxyPostProcessor implements RepositoryProxyPostProcessor {
    @Override
    public void postProcess(ProxyFactory factory, RepositoryInformation repositoryInformation) {

        EventPublishingMethod method = EventPublishingMethod.of(repositoryInformation.getDomainType());

        if (method == null) {
            return;
        }

        factory.addAdvice(new EventPublishingMethodInterceptor(method, publisher));
    }
}

다시 앞의 코드로 돌아가면, 앞의 과정을 거쳐 method 가 만들어지고 null 이 아니라면 advice 로 추가된다.

 

@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {

    Object result = invocation.proceed();

    if (!isEventPublishingMethod(invocation.getMethod())) { // 1
        return result;
    }

    Iterable<?> arguments = asCollection(invocation.getArguments()[0], invocation.getMethod());

    eventMethod.publishEventsFrom(arguments, publisher); // 2

    return result;
}

 

이렇게 advice 가 추가되었고, 실제로 save 메서드와 같이 레포지토리 메서드를 호출하게 되면 위 코드가 호출된다. 이 때 파라미터로 들어온 메서드의 이름이 이벤트를 발행해야 하는 메서드인지 아닌지 확인한다. 만약, 이벤트를 발행하지 않아도 되는 메서드라면 그대로 result 를 반환한다.

private static boolean isEventPublishingMethod(Method method) {
    return method.getParameterCount() == 1 //
            && (isSaveMethod(method.getName()) || isDeleteMethod(method.getName()));
}

private static boolean isSaveMethod(String methodName) {
    return methodName.startsWith("save");
}

private static boolean isDeleteMethod(String methodName) {
    return methodName.equals("delete") || methodName.equals("deleteAll") || methodName.equals("deleteInBatch")
            || methodName.equals("deleteAllInBatch");
}

 

이벤트를 발행해야 하는 메서드인지 아닌지의 기준은 다음과 같다.

1. 파라미터의 개수가 하나인가

2. save 로 메서드의 이름이 시작하는가

3. delete 가 포함되어 있는가

public void publishEventsFrom(Iterable<?> aggregates, ApplicationEventPublisher publisher) {
    for (Object aggregateRoot : aggregates) {

        if (!type.isInstance(aggregateRoot)) {
            continue;
        }

        for (Object event : asCollection(ReflectionUtils.invokeMethod(publishingMethod, aggregateRoot), null)) {
            publisher.publishEvent(event);
        }

        if (clearingMethod != null) {
            ReflectionUtils.invokeMethod(clearingMethod, aggregateRoot);
        }
    }
}

이벤트를 발행해야 한다고 판단이 되면, 이벤트를 발행하는 코드를 호출한다. 이는 반복문을 이용하여 이벤트 목록을 가져오고, publishEvent 메서드를 통해 발행을 한다. clearingMethod 가 null 이 아니라면 aggregateRoot 에서 이벤트를 지우는 작업까지 수행하고 종료한다.

이 과정을 통해 우리는 간단히 레포지토리 단의 메서드 (ex. save, delete) 를 호출함으로서 도메인 이벤트를 사용할 수 있게 된 것이다.

👀 EventHandler

@Component
@RequiredArgsConstructor
public class CheerTalkEventHandler {

    private static final String DESTINATION = "/topic/games/";
    private final EntityUtils entityUtils;
    private final SimpMessagingTemplate messagingTemplate;

    @TransactionalEventListener
    @Async("asyncThreadPool")
    public void handle(CheerTalkCreateEvent event) {

        CheerTalk cheerTalk = event.cheerTalk();
        GameTeam gameTeam = entityUtils.getEntity(cheerTalk.getGameTeamId(), GameTeam.class);
        //.. 중략
        );
    }
}

이제는 해당 이벤트가 발행되면 어떤 액션을 취해야 하는지를 정의해보자. 우리 서비스의 경우에는, 웹소켓을 이용해 메시지를 전송했기 때문에 이와 관련된 로직을 작성해줬다. 이에 대해서는 토픽에 벗어나니 설명에서 제외하겠다.

이전 게시글에서 설명한 @TransactionalEventListener 와 @Async 어노테이션을 붙여줬다. 이를 붙이면 기존의 트랜잭션이 커밋이 돼야 해당 이벤트 핸들링 로직이 실행된다. 또한 이는 비동기로서 동작하기 때문에 서로 다른 쓰레드에서 작동하게 된다. 이로 인해서 이벤트를 핸들링하는 도중에 에러가 발생해도 기존의 작업에는 영향을 주지 않는다. 뿐만 아니라 저장을 기다리고 있는 클라이언트는 이벤트 핸들링까지 기다릴 필요가 없다.

📝 참고 자료

https://msolo021015.medium.com/abstractaggregateroot%EC%99%80-jparepository-save-%EC%95%88%ED%8B%B0-%ED%8C%A8%ED%84%B4%EC%97%90-%EB%8C%80%ED%95%B4-ca81a8f2ce22

 

AbstractAggregateRoot와 JPARepository save() 안티 패턴에 대해

편의상 평어체로 쓰는 점 이해 부탁드립니다 :)

msolo021015.medium.com

https://jeong-pro.tistory.com/250

 

AbstractAggregateRoot의 동작 원리(with @PostUpdate로 맞이한 버그)

AbstractAggregationRoot 동작 원리 AbstractAggregateRoot는 DDD(Domain Driven Design)를 구현하기 편리하게 해 주는, 정확히는 도메인 이벤트를 등록하고 가져오기 편리하게 해주는 클래스 정도로 이해하고 있다.

jeong-pro.tistory.com

 

검색 태그