DomainSpecificationConverter.java

package com.wilzwert.myjobs.infrastructure.persistence.mongo.service;

import com.wilzwert.myjobs.core.domain.model.job.JobId;
import com.wilzwert.myjobs.core.domain.model.job.JobStatus;
import com.wilzwert.myjobs.core.domain.model.user.UserId;
import com.wilzwert.myjobs.core.domain.shared.specification.DomainSpecification;
import com.wilzwert.myjobs.infrastructure.persistence.mongo.exception.UnsupportedDomainCriterionException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.aggregation.*;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.function.Function;
import java.util.regex.Pattern;

/**
 * Converts {@link DomainSpecification} instances received from the domain into Spring Data MongoDB
 * {@link Criteria} and {@link AggregationOperation} objects that can be used in an aggregation pipeline.
 *
 * @author Wilhelm Zwertvaegher
 */
@Service
@Slf4j
public class DomainSpecificationConverter {

    /**
     * Number of milliseconds in one day; used to turn a delay expressed in days into milliseconds.
     * Kept as an {@code int} so the rendered pipeline carries the same numeric type as before.
     */
    private static final int MILLIS_PER_DAY = 86_400_000;

    /** Matches a lower-case letter immediately followed by a run of upper-case letters (a camelCase boundary). */
    private static final Pattern CAMEL_CASE_BOUNDARY = Pattern.compile("([a-z])([A-Z]+)");

    /** Per value class, how to unwrap the values of a list-valued specification into raw values. */
    private static final Map<Class<?>, Function<DomainSpecification.FieldSpecificationWithValuesList<?>, List<?>>> VALUES_CONVERTERS = Map.of(
            UserId.class, spec -> spec.getValues().stream().map(u -> ((UserId) u).value()).toList(),
            JobId.class, spec -> spec.getValues().stream().map(u -> ((JobId) u).value()).toList()
    );

    /** Per value class, how to unwrap the value of a single-valued specification into a raw value. */
    private static final Map<Class<?>, Function<DomainSpecification.FieldSpecificationWithSingleValue<?>, ?>> VALUE_CONVERTERS = Map.of(
            UserId.class, spec -> ((UserId) spec.getValue()).value(),
            JobId.class, spec -> ((JobId) spec.getValue()).value()
    );

    /**
     * Entry point: converts a domain specification into aggregation operations.
     *
     * @param specifications the specification received from the domain, may be {@code null}
     * @return a mutable list of operations; empty when {@code specifications} is {@code null}
     */
    public List<AggregationOperation> convert(DomainSpecification specifications) {
        // null handling lives in the delegate so both public entry points behave the same
        return domainSpecificationToAggregationOperation(specifications);
    }

    /**
     * Converts a domain field name to the field name used in this converter's queries:
     * {@code "id"} becomes {@code "_id"}, any other name is converted from camelCase to snake_case.
     *
     * @param field the domain field name, must not be {@code null}
     * @return the converted field name
     * @throws NullPointerException if {@code field} is {@code null}
     */
    public String convertField(String field) {
        Objects.requireNonNull(field, "field");
        if ("id".equals(field)) {
            return "_id";
        }
        // Locale.ROOT: field names must not depend on the JVM default locale (e.g. Turkish dotless i)
        return CAMEL_CASE_BOUNDARY.matcher(field).replaceAll("$1_$2").toLowerCase(Locale.ROOT);
    }

    /**
     * Unwraps the value of a single-valued specification when a converter is registered for its
     * value class; otherwise the value is returned untouched.
     */
    private Object convertValue(DomainSpecification.FieldSpecificationWithSingleValue<?> spec) {
        Function<DomainSpecification.FieldSpecificationWithSingleValue<?>, ?> converter = VALUE_CONVERTERS.get(spec.getValueClass());
        return converter == null ? spec.getValue() : converter.apply(spec);
    }

    /**
     * Unwraps the values of a list-valued specification when a converter is registered for its
     * value class; otherwise the values are returned untouched.
     */
    private List<?> convertValues(DomainSpecification.FieldSpecificationWithValuesList<?> spec) {
        Function<DomainSpecification.FieldSpecificationWithValuesList<?>, List<?>> converter = VALUES_CONVERTERS.get(spec.getValueClass());
        return converter == null ? spec.getValues() : converter.apply(spec);
    }

    /**
     * Converts a single domain criterion (Eq, In, Lt, Or, And) to a MongoDb {@link Criteria}.
     * Or / And are converted recursively.
     *
     * @param domainSpecification a query criterion received from the domain
     * @return a MongoDb Criteria, or {@code null} when {@code domainSpecification} is {@code null}
     * @throws UnsupportedDomainCriterionException if the specification type is not one of the supported criteria
     */
    public Criteria domainCriterionToCriteria(DomainSpecification domainSpecification) {
        return switch (domainSpecification) {
            case null -> {
                log.debug("criteria is null because domainSpecification is null");
                yield null;
            }
            case DomainSpecification.Eq<?> eq -> {
                log.debug("criteria is EQ for {}", eq.getField());
                yield Criteria.where(convertField(eq.getField())).is(convertValue(eq));
            }
            case DomainSpecification.In<?> in -> {
                log.debug("criteria is In for {}", in.getField());
                log.debug("values {}", in.getValues());
                yield Criteria.where(convertField(in.getField())).in(convertValues(in));
            }
            case DomainSpecification.Lt<?> lt -> {
                log.debug("criteria is Lt for {}", lt.getField());
                yield Criteria.where(convertField(lt.getField())).lt(convertValue(lt));
            }
            case DomainSpecification.Or or -> {
                log.debug("criteria is Or");
                yield new Criteria().orOperator(or.getSpecifications().stream().map(this::domainCriterionToCriteria).toList());
            }
            case DomainSpecification.And and -> {
                log.debug("criteria is And");
                yield new Criteria().andOperator(and.getSpecifications().stream().map(this::domainCriterionToCriteria).toList());
            }
            default -> throw new UnsupportedDomainCriterionException(domainSpecification.getClass().getName());
        };
    }

    /**
     * Converts a domain sort to a {@code $sort} stage on the converted field name.
     * Any direction other than ASC is treated as DESC.
     */
    private AggregationOperation domainSpecificationSortToAggregationOperation(DomainSpecification.Sort sort) {
        Sort.Direction direction = sort.getSortDirection().equals(DomainSpecification.SortDirection.ASC)
                ? Sort.Direction.ASC
                : Sort.Direction.DESC;
        return Aggregation.sort(direction, convertField(sort.getFieldName()));
    }

    /**
     * Converts any domain specification to the aggregation operations implementing it:
     * a Sort yields a single sort stage, a FullSpecification yields its dedicated pipeline,
     * anything else yields a single match stage; the specification's own sort is appended last.
     *
     * @param domainSpecification a spec received from the domain, may be {@code null}
     * @return a mutable List of AggregationOperation (match, lookup, sort...) to be used in an Aggregation pipeline;
     *         empty when {@code domainSpecification} is {@code null}
     * @throws UnsupportedDomainCriterionException if the specification type is not supported
     */
    public List<AggregationOperation> domainSpecificationToAggregationOperation(DomainSpecification domainSpecification) {
        List<AggregationOperation> result = new ArrayList<>();
        if (domainSpecification == null) {
            return result;
        }

        if (domainSpecification instanceof DomainSpecification.Sort sortSpecification) {
            result.add(domainSpecificationSortToAggregationOperation(sortSpecification));
            return result;
        }

        if (domainSpecification instanceof DomainSpecification.FullSpecification fullSpecification) {
            result.addAll(domainSpecificationToAggregationOperation(fullSpecification));
        } else {
            result.add(Aggregation.match(domainCriterionToCriteria(domainSpecification)));
        }

        result.addAll(domainSpecification.getSort().stream().map(this::domainSpecificationSortToAggregationOperation).toList());
        return result;
    }

    /**
     * Dispatches a FullSpecification to the conversion dedicated to its concrete type.
     *
     * @throws UnsupportedDomainCriterionException if no dedicated conversion exists for the type
     */
    private List<AggregationOperation> domainSpecificationToAggregationOperation(DomainSpecification.FullSpecification fullDomainSpecification) {
        if (fullDomainSpecification instanceof DomainSpecification.UserJobFollowUpReminderThreshold userJobFollowUpReminderThreshold) {
            return domainSpecificationToAggregationOperation(userJobFollowUpReminderThreshold);
        }

        if (fullDomainSpecification instanceof DomainSpecification.JobFollowUpToRemind jobFollowUpToRemind) {
            return domainSpecificationToAggregationOperation(jobFollowUpToRemind);
        }

        throw new UnsupportedDomainCriterionException(fullDomainSpecification.getClass().getName());
    }

    /**
     * Converts a specific Criterion: users who have not received any job follow-up reminder after
     * a threshold computed by Mongo as (reference instant in millis) - (user's reminder delay in days, in millis).
     * A missing delay counts as 0 days; a missing "sent at" date counts as epoch 0.
     *
     * @param domainSpecification the Specification received from the domain
     * @return a mutable List of AggregationOperation to be used in an Aggregation pipeline
     */
    public List<AggregationOperation> domainSpecificationToAggregationOperation(DomainSpecification.UserJobFollowUpReminderThreshold domainSpecification) {
        // NOTE(review): field names are assumed to follow the snake_case convention used everywhere else
        // in this converter, and to live directly on the document (no lookup creates a "user" sub-document
        // in this pipeline). Confirm against the persisted user document.
        String reminderDaysField = "job_follow_up_reminder_days";
        String reminderSentAtField = "job_follow_up_reminder_sent_at";
        String thresholdField = "job_follow_up_reminder_threshold";

        List<AggregationOperation> aggregationOperations = new ArrayList<>();

        // compute the threshold (epoch millis) from the provided instant and the delay in days
        aggregationOperations.add(Aggregation.addFields()
                .addFieldWithValue(
                        thresholdField,
                        ArithmeticOperators.Subtract.valueOf(domainSpecification.getReferenceInstant().toEpochMilli()).subtract(
                                ArithmeticOperators.Multiply.valueOf(
                                        ConditionalOperators.ifNull(reminderDaysField).then(0)
                                )
                                .multiplyBy(MILLIS_PER_DAY)
                        )
                ).build());

        // A plain Criteria.lt("name") compares against the string literal, not against the computed field;
        // comparing two fields requires $expr. The date is converted to millis to match the threshold's type.
        aggregationOperations.add(Aggregation.match(
                Criteria.expr(
                        ComparisonOperators.Lt.valueOf(
                                ConvertOperators.ToLong.toLong(
                                        ConditionalOperators.ifNull(reminderSentAtField).then(0)
                                )
                        ).lessThan(thresholdField))
        ));

        return aggregationOperations;
    }

    /**
     * Converts a specific Criterion: active jobs that have neither been updated nor reminded after
     * a threshold computed by Mongo as (reference instant in millis) - (user's reminder delay in days, in millis).
     *
     * @param domainSpecification the Specification received from the domain
     * @return a mutable List of AggregationOperation to be used in an Aggregation pipeline
     */
    public List<AggregationOperation> domainSpecificationToAggregationOperation(DomainSpecification.JobFollowUpToRemind domainSpecification) {
        String thresholdField = "job_follow_up_reminder_threshold";
        List<AggregationOperation> aggregationOperations = new ArrayList<>();

        // keep active jobs only
        aggregationOperations.add(Aggregation.match(Criteria.where("status").in(JobStatus.activeStatuses())));

        // join the owning user and flatten the joined array into a single "user" sub-document
        aggregationOperations.add(Aggregation.lookup().from("users").localField("user_id").foreignField("_id").as("user"));
        aggregationOperations.add(Aggregation.unwind("user"));

        // keep only jobs whose user has a non-null job_follow_up_reminder_days
        aggregationOperations.add(Aggregation.match(Criteria.where("user.job_follow_up_reminder_days").exists(true).ne(null)));

        // compute the threshold (epoch millis) from the provided instant and the user's delay in days
        aggregationOperations.add(Aggregation.addFields()
                .addFieldWithValue(
                        thresholdField,
                        ArithmeticOperators.Subtract.valueOf(domainSpecification.getReferenceInstant().toEpochMilli()).subtract(
                                ArithmeticOperators.Multiply.valueOf("user.job_follow_up_reminder_days")
                                        .multiplyBy(MILLIS_PER_DAY)
                        )
                ).build());

        // convert the job's updated_at date to a long (in millis) so it can be compared to the threshold
        aggregationOperations.add(Aggregation.addFields()
                .addFieldWithValue("updated_at_millis", ConvertOperators.ToLong.toLong("$updated_at")).build());

        // keep jobs whose updated_at <= threshold
        aggregationOperations.add(Aggregation.match(
                Criteria.expr(
                        ComparisonOperators.Lte.valueOf("updated_at_millis").lessThanEqualTo(thresholdField))
        ));

        // convert the job's follow_up_reminder_sent_at date to a long (in millis); never reminded counts as 0
        aggregationOperations.add(
                Aggregation.addFields()
                        .addFieldWithValue(
                                "job_follow_up_reminder_sent_at_millis",
                                ConvertOperators.ToLong.toLong(
                                        ConditionalOperators.ifNull("follow_up_reminder_sent_at").then(0)
                                )
                        ).build()
        );

        // keep jobs whose last reminder <= threshold, to avoid multiple reminders
        aggregationOperations.add(Aggregation.match(
                Criteria.expr(
                        ComparisonOperators.Lte.valueOf("job_follow_up_reminder_sent_at_millis").lessThanEqualTo(thresholdField))
        ));

        return aggregationOperations;
    }
}