// UsersJobsBatchCollector.java
package com.wilzwert.myjobs.core.domain.shared.collector;
import com.wilzwert.myjobs.core.domain.model.job.Job;
import com.wilzwert.myjobs.core.domain.model.user.User;
import com.wilzwert.myjobs.core.domain.model.user.UserId;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
/**
* A collector for batch operations on Users and Jobs, as in "for each User we have a list of Jobs".
* This is why the batchProcessing function expects a Map&lt;User, Set&lt;Job&gt;&gt; parameter
* (the value sets are in fact SortedSets, ordered by the jobs' update date).
* Please note that batchSize counts users, i.e. each full batch holds batchSize users,
* whatever number of Jobs each of them has.
* Warning: the Job stream MUST be sorted by userId BEFORE using this collector.
* This is very important because otherwise the appropriate user -> jobs grouping would not be guaranteed.
* The collector is stateful and only supports sequential streams (see combiner()).
* @author Wilhelm Zwertvaegher
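* <p>A minimal usage sketch; {@code userRepository.findByIds} and {@code sendBatch}
* are hypothetical names, assumed for illustration:</p>
* <pre>{@code
* // jobs is a sequential Stream<Job>, already sorted by userId
* UsersJobsBatchCollector<Integer> collector = new UsersJobsBatchCollector<>(
*         userIds -> userRepository.findByIds(userIds), // resolve UserIds to Users
*         batch -> { sendBatch(batch); return batch.size(); }, // handle one users -> jobs batch
*         100); // each full batch holds 100 users
* List<Integer> processedCounts = jobs.collect(collector);
* }</pre>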
*/
public class UsersJobsBatchCollector<T> implements Collector<Job, Map<UserId, SortedSet<Job>>, List<T>> {
/**
* The function used to find users based on a list of UserId
* This must be passed as a constructor parameter when instantiating this Collector
*/
private final Function<List<UserId>, Map<UserId, User>> findUsersFunction;
/**
* The function used to do the batch processing
*/
private final Function<Map<User, Set<Job>>, T> batchProcessing;
/**
* Used to keep track of the current user while accumulating
*/
private UserId currentUserId;
/**
* Size of the users chunk passed to the batch processing
*/
private final int batchSize;
/**
* The results of the batch processing.
* Kept as instance state, which makes this collector single-use:
* create a new instance for each stream.
*/
private final List<T> results = new ArrayList<>();
public UsersJobsBatchCollector(Function<List<UserId>, Map<UserId, User>> findUsersFunction, Function<Map<User, Set<Job>>, T> batchProcessing, int batchSize) {
this.findUsersFunction = findUsersFunction;
this.batchProcessing = batchProcessing;
this.batchSize = batchSize;
}
@Override
public Supplier<Map<UserId, SortedSet<Job>>> supplier() {
return HashMap::new;
}
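/**
* Resolves the accumulated UserIds to Users and re-keys the jobs map accordingly.
* Jobs whose user is not returned by findUsersFunction are silently dropped.
*/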
private Map<User, Set<Job>> load(Map<UserId, SortedSet<Job>> userIdsToJobs) {
Map<UserId, User> users = findUsersFunction.apply(userIdsToJobs.keySet().stream().toList());
return users
.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getValue, e -> userIdsToJobs.get(e.getKey())));
}
@Override
public BiConsumer<Map<UserId, SortedSet<Job>>, Job> accumulator() {
return (userIdsToJobs, job) -> {
// detecting a change of user in the stream triggers the processing
// (only after at least first job / user has been handled)
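// e.g. with batchSize = 2 and jobs ordered A,A,B,B,C: the flush fires when the
// first C job arrives (the map then holds exactly users A and B), never mid-user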
if(currentUserId != null && !job.getUserId().equals(currentUserId) && userIdsToJobs.size() >= batchSize) {
// pass a copy of the accumulated map to the batchProcessing function
final Map<UserId, SortedSet<Job>> copy = new HashMap<>(userIdsToJobs);
results.add(batchProcessing.apply(load(copy)));
// reset the accumulation map
userIdsToJobs.clear();
}
// note: the TreeSet compares by updatedAt only, so jobs of a same user sharing the
// same updatedAt would collide; this assumes updatedAt is unique within a user's jobs
userIdsToJobs.computeIfAbsent(job.getUserId(), k -> new TreeSet<>(Comparator.comparing(Job::getUpdatedAt))).add(job);
currentUserId = job.getUserId();
};
}
@Override
public BinaryOperator<Map<UserId, SortedSet<Job>>> combiner() {
// this collector keeps per-instance state (currentUserId, results), so it cannot be
// used on a parallel stream; fail fast if a combiner is ever requested
return (map1, map2) -> { throw new UnsupportedOperationException("Parallel processing is not supported"); };
}
@Override
public Function<Map<UserId, SortedSet<Job>>, List<T>> finisher() {
return userIdsToJobs -> {
if(!userIdsToJobs.isEmpty()) {
// the map still holds the last, possibly incomplete, batch: apply the batch processing to the remaining entries
results.add(batchProcessing.apply(load(userIdsToJobs)));
}
return results;
};
}
@Override
public Set<Characteristics> characteristics() {
// deliberately empty: no IDENTITY_FINISH (the finisher does real work), no CONCURRENT,
// and no UNORDERED, since encounter order is what makes the per-user grouping correct
return Set.of();
}
}