IntegrationEventPublisherAdapter.java
package com.wilzwert.myjobs.infrastructure.persistence.mongo.service;
import com.wilzwert.myjobs.core.domain.shared.event.integration.IntegrationEvent;
import com.wilzwert.myjobs.core.domain.shared.event.integration.IntegrationEventId;
import com.wilzwert.myjobs.core.domain.shared.ports.driven.event.IntegrationEventPublisher;
import com.wilzwert.myjobs.infrastructure.event.IntegrationEventDataManager;
import com.wilzwert.myjobs.infrastructure.persistence.mongo.entity.EventStatus;
import com.wilzwert.myjobs.infrastructure.persistence.mongo.entity.MongoIntegrationEvent;
import com.wilzwert.myjobs.infrastructure.persistence.mongo.repository.MongoIntegrationEventRepository;
import com.wilzwert.myjobs.infrastructure.serialization.IntegrationEventSerializationHandler;
import com.wilzwert.myjobs.infrastructure.serialization.exception.SerializationException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
/**
* @author Wilhelm Zwertvaegher
* Date:06/06/2025
* Time:16:18
*/
@Component
@Slf4j
public class IntegrationEventPublisherAdapter implements IntegrationEventPublisher, IntegrationEventDataManager {
private final MongoIntegrationEventRepository eventRepository;
private final IntegrationEventSerializationHandler integrationEventSerializationHandler;
IntegrationEventPublisherAdapter(
MongoIntegrationEventRepository mongoIntegrationEventRepository,
IntegrationEventSerializationHandler integrationEventSerializationHandler) {
this.eventRepository = mongoIntegrationEventRepository;
this.integrationEventSerializationHandler = integrationEventSerializationHandler;
}
@Override
public IntegrationEvent publish(IntegrationEvent event) {
try {
MongoIntegrationEvent mongoEvent = new MongoIntegrationEvent();
mongoEvent.setId(event.getId().value());
mongoEvent.setStatus(EventStatus.PENDING);
mongoEvent.setOccurredAt(event.getOccurredAt());
mongoEvent.setType(integrationEventSerializationHandler.getResolvableType(event));
mongoEvent.setPayload(integrationEventSerializationHandler.serialize(event));
eventRepository.save(mongoEvent);
return event;
}
catch (SerializationException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
@Override
public Optional<IntegrationEvent> findById(IntegrationEventId integrationEventId) {
return eventRepository.findById(integrationEventId.value()).map(e -> integrationEventSerializationHandler.readFromPayload(e.getType(), e.getPayload()));
}
@Override
public List<IntegrationEvent> findPending() {
return eventRepository.findByStatus(EventStatus.PENDING).stream().map(e -> integrationEventSerializationHandler.readFromPayload(e.getType(), e.getPayload())).toList();
}
@Override
public List<? extends IntegrationEvent> markAllAs(List<? extends IntegrationEvent> events, EventStatus status) {
log.info("Marking {} events as {}", events.size(), status);
eventRepository.saveAll(
eventRepository.findAllById(
events.stream().map(e -> e.getId().value()).toList()
)
.stream().map(e -> {e.setStatus(status); return e;})
.toList()
);
return events;
}
}