// JobDataManagerAdapter.java
package com.wilzwert.myjobs.infrastructure.persistence.mongo.service;
import com.mongodb.bulk.BulkWriteResult;
import com.wilzwert.myjobs.core.domain.model.activity.Activity;
import com.wilzwert.myjobs.core.domain.model.attachment.Attachment;
import com.wilzwert.myjobs.core.domain.model.job.Job;
import com.wilzwert.myjobs.core.domain.model.job.JobId;
import com.wilzwert.myjobs.core.domain.shared.pagination.DomainPage;
import com.wilzwert.myjobs.core.domain.model.user.UserId;
import com.wilzwert.myjobs.core.domain.model.job.ports.driven.JobDataManager;
import com.wilzwert.myjobs.core.domain.shared.bulk.BulkDataSaveResult;
import com.wilzwert.myjobs.core.domain.shared.specification.DomainSpecification;
import com.wilzwert.myjobs.infrastructure.persistence.mongo.entity.MongoJob;
import com.wilzwert.myjobs.infrastructure.persistence.mongo.mapper.JobMapper;
import com.wilzwert.myjobs.infrastructure.persistence.mongo.repository.MongoJobRepository;
import org.springframework.data.mongodb.core.BulkOperations;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.*;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * MongoDB-backed implementation of the {@link JobDataManager} driven port.
 * <p>
 * Single-job reads and writes go through {@link MongoJobRepository}; specification-based reads go
 * through {@link AggregationService}; {@link #saveAll(Set)} uses {@link MongoTemplate} bulk
 * operations. Conversions between {@link Job} and {@link MongoJob} are delegated to
 * {@link JobMapper}.
 *
 * @author Wilhelm Zwertvaegher
 */
@Component
public class JobDataManagerAdapter implements JobDataManager {

    /** Name of the collection targeted by every aggregation issued from this adapter. */
    private static final String JOBS_COLLECTION = "jobs";

    /** Name of the only document field written by {@link #saveAll(Set)}. */
    private static final String FOLLOW_UP_REMINDER_SENT_AT_FIELD = "followUpReminderSentAt";

    private final MongoJobRepository mongoJobRepository;
    private final JobMapper jobMapper;
    private final AggregationService aggregationService;
    private final MongoTemplate mongoTemplate;

    public JobDataManagerAdapter(MongoJobRepository mongoJobRepository, JobMapper jobMapper, AggregationService aggregationService, MongoTemplate mongoTemplate) {
        this.mongoJobRepository = mongoJobRepository;
        this.jobMapper = jobMapper;
        this.aggregationService = aggregationService;
        this.mongoTemplate = mongoTemplate;
    }

    /**
     * Looks a job up by its identifier.
     *
     * @param jobId identifier of the job
     * @return the job mapped to the domain model, or an empty Optional if the repository finds nothing
     */
    @Override
    public Optional<Job> findById(JobId jobId) {
        return mongoJobRepository.findById(jobId.value()).map(jobMapper::toDomain);
    }

    /**
     * Looks a job up by its URL and the identifier of its user.
     *
     * @param url    URL of the job
     * @param userId identifier of the user
     * @return the job mapped to the domain model, or an empty Optional if the repository finds nothing
     */
    @Override
    public Optional<Job> findByUrlAndUserId(String url, UserId userId) {
        return mongoJobRepository.findByUrlAndUserId(url, userId.value()).map(jobMapper::toDomain);
    }

    /**
     * Looks a job up by its identifier and the identifier of its user.
     *
     * @param jobId  identifier of the job
     * @param userId identifier of the user
     * @return the job mapped to the domain model, or an empty Optional if the repository finds nothing
     */
    @Override
    public Optional<Job> findByIdAndUserId(JobId jobId, UserId userId) {
        return mongoJobRepository.findByIdAndUserId(jobId.value(), userId.value()).map(jobMapper::toDomain);
    }

    /**
     * Runs a paginated aggregation built from the given specification.
     * <p>
     * The count query is skipped when the requested page comes back empty; in that case the
     * reported total is 0.
     *
     * @param specifications criteria used to build the aggregation
     * @param page           requested page, forwarded as-is to the aggregation service and the result
     * @param size           page size, forwarded as-is to the aggregation service and the result
     * @return a page holding the matching jobs, the total count, the requested page and the page size
     */
    @Override
    public DomainPage<Job> findPaginated(DomainSpecification specifications, int page, int size) {
        Aggregation aggregation = aggregationService.createAggregationPaginated(specifications, page, size);
        List<MongoJob> jobs = aggregationService.aggregate(aggregation, JOBS_COLLECTION, MongoJob.class);

        // NOTE(review): an empty page is reported with a total of 0 even if the page requested is
        // past the last one while earlier pages hold results - confirm callers never rely on the
        // total in that situation.
        long total = jobs.isEmpty() ? 0L : aggregationService.getAggregationCount(aggregation, JOBS_COLLECTION);

        return DomainPage.builder(jobMapper.toDomain(jobs))
                .totalElementsCount(total)
                .currentPage(page)
                .pageSize(size)
                .build();
    }

    /**
     * Loads every job matching the specification and indexes the result by job id.
     *
     * @param specification criteria used to build the aggregation
     * @return the matching jobs keyed by their id
     * @throws IllegalStateException if two returned jobs share the same id (no merge function is
     *                               given to {@link Collectors#toMap})
     */
    @Override
    public Map<JobId, Job> findMinimal(DomainSpecification specification) {
        Aggregation aggregation = aggregationService.createAggregation(specification);
        List<MongoJob> found = aggregationService.aggregate(aggregation, JOBS_COLLECTION, MongoJob.class);
        return jobMapper.toDomain(found)
                .stream()
                .collect(Collectors.toMap(Job::getId, job -> job));
    }

    /**
     * Streams the jobs matching the specification, mapping each one to the domain model.
     * <p>
     * The returned stream should be closed by the caller so the underlying stream is closed too.
     *
     * @param specification criteria used to build the aggregation
     * @return a stream of matching jobs
     */
    @Override
    public Stream<Job> stream(DomainSpecification specification) {
        Aggregation aggregation = aggregationService.createAggregation(specification);
        Stream<MongoJob> source = aggregationService.stream(aggregation, JOBS_COLLECTION, MongoJob.class);
        // Explicitly tie the source's close to the mapped stream's close.
        return source.map(jobMapper::toDomain).onClose(source::close);
    }

    /**
     * Saves the job through the repository.
     *
     * @param job job to save
     * @return the saved job, mapped back from the entity returned by the repository
     */
    @Override
    public Job save(Job job) {
        return persist(job);
    }

    /**
     * Saves the job through the repository. The {@code activity} argument is not used by this
     * implementation (presumably because activities are nested in the job document, as stated on
     * {@link #delete(Job)}).
     *
     * @param job      job to save
     * @param activity not used here
     * @return the saved job, mapped back from the entity returned by the repository
     */
    @Override
    public Job saveJobAndActivity(Job job, Activity activity) {
        return persist(job);
    }

    /**
     * Saves the job through the repository. The {@code attachment} and {@code activity} arguments
     * are not used by this implementation (presumably because both are nested in the job document,
     * as stated on {@link #delete(Job)}).
     *
     * @param job        job to save
     * @param attachment not used here
     * @param activity   not used here
     * @return the saved job, mapped back from the entity returned by the repository
     */
    @Override
    public Job saveJobAndAttachment(Job job, Attachment attachment, Activity activity) {
        return persist(job);
    }

    /**
     * Deletes the job through the repository.
     *
     * @param job job to delete
     */
    @Override
    public void delete(Job job) {
        // attachments and activities are nested mongo collections, so there is nothing more to do than delete the job
        mongoJobRepository.delete(jobMapper.toEntity(job));
    }

    /**
     * Saves the job through the repository. The {@code attachment} and {@code activity} arguments
     * are not used by this implementation.
     *
     * @param job        job to save
     * @param attachment not used here
     * @param activity   not used here
     * @return the saved job, mapped back from the entity returned by the repository
     */
    @Override
    public Job deleteAttachmentAndSaveJob(Job job, Attachment attachment, Activity activity) {
        // attachments and activities are nested collections, nothing more to do
        return persist(job);
    }

    // TODO: tests (don't forget to test with an empty set)
    /**
     * Queues one update per job in a single ordered bulk operation and executes it.
     * <p>
     * Each update matches a document by {@code _id} and sets only its
     * {@code followUpReminderSentAt} field; no other field of the job is written by this method.
     *
     * @param jobs jobs whose reminder timestamp must be written; must not be empty
     * @return a result built from the number of jobs submitted and the modified, inserted and
     *         deleted counts reported by the bulk write
     * @throws IllegalArgumentException if {@code jobs} is empty
     */
    @Override
    public BulkDataSaveResult saveAll(Set<Job> jobs) {
        // we chose to throw an exception because it seems like something went wrong if someone tries to save an empty set
        if (jobs.isEmpty()) {
            throw new IllegalArgumentException("jobs must not be empty");
        }

        BulkOperations bulkOps = mongoTemplate.bulkOps(BulkOperations.BulkMode.ORDERED, MongoJob.class);
        List<MongoJob> mongoJobs = jobMapper.toEntity(jobs.stream().toList());
        for (MongoJob mongoJob : mongoJobs) {
            Update update = new Update();
            update.set(FOLLOW_UP_REMINDER_SENT_AT_FIELD, mongoJob.getFollowUpReminderSentAt());
            bulkOps.updateOne(Query.query(Criteria.where("_id").is(mongoJob.getId())), update);
        }

        BulkWriteResult result = bulkOps.execute();
        return new BulkDataSaveResult(jobs.size(), result.getModifiedCount(), result.getInsertedCount(), result.getDeletedCount());
    }

    /** Maps the job to its entity, saves it through the repository and maps the saved entity back. */
    private Job persist(Job job) {
        return jobMapper.toDomain(mongoJobRepository.save(jobMapper.toEntity(job)));
    }
}