IntegrationEventReader.java
package com.wilzwert.myjobs.infrastructure.event;
import com.wilzwert.myjobs.core.domain.shared.event.integration.IntegrationEvent;
import com.wilzwert.myjobs.infrastructure.persistence.mongo.entity.EventStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@Component
@Slf4j
public class IntegrationEventReader implements ItemReader<IntegrationEvent> {
private final IntegrationEventDataManager dataManager;
private Iterator<IntegrationEvent> currentBatchIterator = Collections.emptyIterator();
public IntegrationEventReader(IntegrationEventDataManager eventDataManager) {
this.dataManager = eventDataManager;
}
@Override
public IntegrationEvent read() {
if (!currentBatchIterator.hasNext()) {
log.info("Finding pending integration events");
List<IntegrationEvent> batch = dataManager.findPending();
if (batch.isEmpty()) {
return null; // end batch
}
dataManager.markAllAs(batch, EventStatus.IN_PROGRESS);
currentBatchIterator = batch.iterator();
log.info("Found {} pending integration events", batch.size());
}
return currentBatchIterator.next();
}
}