SendJobsRemindersUseCaseImpl.java

package com.wilzwert.myjobs.core.application.usecase;


import com.wilzwert.myjobs.core.domain.model.job.Job;
import com.wilzwert.myjobs.core.domain.model.job.ports.driven.JobDataManager;
import com.wilzwert.myjobs.core.domain.model.user.User;
import com.wilzwert.myjobs.core.domain.model.user.batch.UsersJobsRemindersBulkResult;
import com.wilzwert.myjobs.core.domain.model.user.ports.driven.JobReminderMessageProvider;
import com.wilzwert.myjobs.core.domain.model.user.ports.driven.UserDataManager;
import com.wilzwert.myjobs.core.domain.model.user.ports.driving.SendJobsRemindersUseCase;
import com.wilzwert.myjobs.core.domain.shared.bulk.BulkDataSaveResult;
import com.wilzwert.myjobs.core.domain.shared.collector.UsersJobsBatchCollector;
import com.wilzwert.myjobs.core.domain.shared.specification.DomainSpecification;

import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * @author Wilhelm Zwertvaegher
 */

public class SendJobsRemindersUseCaseImpl implements SendJobsRemindersUseCase {

    private final JobDataManager jobDataManager;

    private final UserDataManager userDataManager;

    private final JobReminderMessageProvider jobReminderMessageProvider;


    public SendJobsRemindersUseCaseImpl(JobDataManager jobDataManager, UserDataManager userDataManager, JobReminderMessageProvider jobReminderMessageProvider) {
        this.jobDataManager = jobDataManager;
        this.userDataManager = userDataManager;
        this.jobReminderMessageProvider = jobReminderMessageProvider;
    }

    /**
     * This is the main logic for sending jobs reminders to a users/jobs chunk :
     * - send a message by user through the JobReminderMessageProvider
     * - save the jobs with their new followUpReminderSentAt through the JobDataManager
     * - save the users with their new jobFollowUpReminderSentAt through the UserDataManager
     * A reference to this method is passed to the batch collector
     * This method could (should ?) be moved to a domain service, but for now it will do
     * @param usersToJobs map of user -> jobs, the relevant users and jobs
     * @return the results for this chunk
     */
    private UsersJobsRemindersBulkResult doSend(Map<User, Set<Job>> usersToJobs) {
        List<String> errors = new ArrayList<>();
        Set<User> usersToSave = new HashSet<>();
        int totalJobs = 0;
        for(Map.Entry<User, Set<Job>> entry : usersToJobs.entrySet()) {

            try {
                jobReminderMessageProvider.send(entry.getKey(), entry.getValue());
                usersToSave.add(entry.getKey());
                // we have to map the jobs Set because the saveFollowUpReminderSentAt method returns a copy of the Job
                Set<Job> jobsToSave = entry.getValue().stream().map(Job::saveFollowUpReminderSentAt).collect(Collectors.toSet());
                jobDataManager.saveAll(jobsToSave);
                totalJobs += jobsToSave.size();
            }
            catch (Exception e) {
                errors.add("An error occurred while sending reminders to "+entry.getKey()+": "+e.getMessage());
            }
        }
        // we have to map the users set because saveJobFollowUpReminderSentAt returns a copy of the User
        usersToSave = usersToSave.stream().map(User::saveJobFollowUpReminderSentAt).collect(Collectors.toSet());
        BulkDataSaveResult serviceResult = null;
        if(!usersToSave.isEmpty()) {
             serviceResult = userDataManager.saveAll(usersToSave);
        }
        int saveErrors = serviceResult != null ? serviceResult.totalCount()-serviceResult.updatedCount() : usersToSave.size();
        return new UsersJobsRemindersBulkResult(usersToJobs.size(), totalJobs, errors, errors.size(), saveErrors);
    }

    /**
     *  The entrypoint to this use case
     *  This method loads all the jobs that need reminders in a Stream, hen uses a custom collector which will iterate through
     *  the stream, chunk it and load the users through the provided callback (userDataManager.findMinimal...)
     *  and finally use a reference to this::doSend to actually send the reminders and collect and return the results
     * @param batchSize the chunk size passed by the infra ; as the infra knows what size can be handled
     * @return a list of results the infra may or may not use
     */
    @Override
    public List<UsersJobsRemindersBulkResult> sendJobsReminders(int batchSize) {
        // load jobs to remind...important : as UsersJobsBatchCollector expects Jobs to be pre-sorted by userId
        // the sort is configured in the DomainSpecification.JobFollowUpToRemind spec
        Stream<Job> jobsToRemind = jobDataManager.stream(DomainSpecification.JobFollowUpToRemind(Instant.now()));
        return jobsToRemind.collect(
                new UsersJobsBatchCollector<>(
                        userIds -> userDataManager.findMinimal(DomainSpecification.in("id", userIds)),
                        this::doSend,
                        batchSize
                )
        );
    }
}