KafkaIntegrationEventProcessor.java

package com.wilzwert.myjobs.infrastructure.event.kafka;

import com.wilzwert.myjobs.core.domain.shared.event.integration.IntegrationEvent;
import com.wilzwert.myjobs.infrastructure.serialization.IntegrationEventSerializationHandler;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaIntegrationEventProcessor implements com.wilzwert.myjobs.infrastructure.event.IntegrationEventProcessor {

    private final KafkaTemplate<String, KafkaIntegrationEvent> kafkaTemplate;

    private final KafkaIntegrationEventTopicResolver topicResolver;

    private final IntegrationEventSerializationHandler serializationHandler;


    KafkaIntegrationEventProcessor(KafkaTemplate<String, KafkaIntegrationEvent> kafkaTemplate, KafkaIntegrationEventTopicResolver topicResolver, IntegrationEventSerializationHandler serializationHandler) {
        this.kafkaTemplate = kafkaTemplate;
        this.topicResolver = topicResolver;
        this.serializationHandler = serializationHandler;
    }

    @Override
    public IntegrationEvent process(@NonNull IntegrationEvent event) throws Exception {
        log.info("Sending event {}", event.getId().value().toString());
        KafkaIntegrationEvent kafkaEvent = new KafkaIntegrationEvent(event.getClass().getSimpleName(), serializationHandler.serialize(event));
        kafkaTemplate.send(topicResolver.resolve(event), event.getId().value().toString(), kafkaEvent).get();
        return event;
    }
}