1. Testarea serviciului care trimite mesaje Kafka
package com.hanul.pis.ProductsMicroservice;
import com.hanul.pis.ProductsMicroservice.rest.NewProductDto;
import com.hanul.pis.ProductsMicroservice.service.ProductService;
import com.hanul.pis.core.event.ProductCreatedEvent;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * Integration test for {@link ProductService} against an embedded Kafka cluster.
 *
 * <p>A standalone listener container subscribes to the product-created topic and pushes every
 * received record into a blocking queue; the test then calls the service and checks that a
 * matching record shows up in that queue.
 */
@DirtiesContext // mark the application context as dirty so it is not reused by other test classes
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // one instance for all test methods, so @BeforeAll/@AfterAll can be non-static
@ActiveProfiles("test") // look for application-test.properties
// use embedded Kafka server
@EmbeddedKafka(partitions = 3, count = 3, controlledShutdown = true)
@SpringBootTest(properties = "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}")
public class ProductServiceImplTest {

    /** Property key holding the name of the topic the listener container subscribes to. */
    private static final String TOPIC_NAME_PROPERTY = "product-created-evt-topic-name";

    @Autowired
    private ProductService productService;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private Environment environment;

    /** Test-side consumer container; created in {@link #setUp()} and stopped in {@link #tearDown()}. */
    private KafkaMessageListenerContainer<String, ProductCreatedEvent> container;

    /** Records received by the test-side listener, in arrival order. */
    private final BlockingQueue<ConsumerRecord<String, ProductCreatedEvent>> records = new LinkedBlockingQueue<>();

    /**
     * Starts a listener container on the configured topic and waits until partitions are assigned,
     * so that records produced by the test are not missed.
     *
     * @throws IllegalStateException if a required configuration property is not defined
     */
    @BeforeAll
    void setUp() {
        Map<String, Object> consumerProperties = getConsumerProperties();
        DefaultKafkaConsumerFactory<String, Object> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProperties);

        // getRequiredProperty fails fast with the missing key's name instead of passing null on.
        ContainerProperties containerProperties =
                new ContainerProperties(environment.getRequiredProperty(TOPIC_NAME_PROPERTY));

        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        container.setupMessageListener((MessageListener<String, ProductCreatedEvent>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    /** Stops the listener container, if {@link #setUp()} got far enough to create it. */
    @AfterAll
    void tearDown() {
        // Guard against setUp() having failed early; a second NPE here would mask the real error.
        if (container != null) {
            container.stop();
        }
    }

    /**
     * Creates a product through the service and verifies that a record carrying the same title,
     * price and quantity is received from the topic.
     */
    @Test
    void testCreateProduct_validProduct_success() throws Exception {
        // Arrange
        NewProductDto newProductDto = new NewProductDto();
        newProductDto.setPrice(new BigDecimal(1200));
        newProductDto.setQuantity(10);
        newProductDto.setTitle("Philips monitor 40\"");

        // Act
        String productId = productService.createProduct(newProductDto);

        // Assert
        Assertions.assertNotNull(productId);
        ConsumerRecord<String, ProductCreatedEvent> message = records.poll(3, TimeUnit.SECONDS);
        Assertions.assertNotNull(message, "No record was received from the topic within 3 seconds");
        Assertions.assertNotNull(message.key());
        ProductCreatedEvent event = message.value();
        Assertions.assertNotNull(event);
        Assertions.assertEquals(newProductDto.getTitle(), event.getTitle());
        Assertions.assertEquals(newProductDto.getPrice(), event.getPrice());
        Assertions.assertEquals(newProductDto.getQuantity(), event.getQuantity());
    }

    /**
     * Builds the configuration for the test-side Kafka consumer.
     *
     * @return an immutable map of consumer configuration properties
     * @throws IllegalStateException if one of the required environment properties is not defined
     */
    private Map<String, Object> getConsumerProperties() {
        // Brokers' port numbers will dynamically change during executions of this test
        // Therefore, they should be dynamically retrieved in configuration
        //
        // Map.of rejects null values, so every environment lookup uses getRequiredProperty:
        // a missing key is reported by name rather than as an anonymous NullPointerException.
        return Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString(),
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class,
                ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class,
                ConsumerConfig.GROUP_ID_CONFIG,
                environment.getRequiredProperty("spring.kafka.consumer.group-id"),
                JsonDeserializer.TRUSTED_PACKAGES,
                environment.getRequiredProperty("spring.kafka.consumer.properties.spring.json.trusted.packages"),
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                environment.getRequiredProperty("spring.kafka.consumer.auto-offset-reset")
        );
    }
}
2. Testarea configurărilor - ex. că producătorul este idempotent
package com.hanul.pis.ProductsMicroservice;
import com.hanul.pis.core.event.ProductCreatedEvent;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.Map;
/**
 * Verifies, against the real application configuration, that the Kafka producer is set up
 * for idempotent delivery.
 */
@SpringBootTest
public class IdempotentProducerItTest { // it will test based on real configuration

    @Autowired
    private KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;

    /**
     * Checks the producer configuration exposed by the template's producer factory:
     * idempotence must be enabled, acks must be "all" (or its equivalent "-1"), and,
     * when a retries value is configured, it must be greater than zero.
     */
    @Test
    void test_enabledIdempotence() {
        // Arrange
        ProducerFactory<String, ProductCreatedEvent> producerFactory = kafkaTemplate.getProducerFactory();

        // Act
        Map<String, Object> config = producerFactory.getConfigurationProperties();

        // Assert
        // Values may be stored as String or as typed objects depending on how the producer is
        // configured, so each one is normalised to its string form before comparison
        // (the retries check already did this).
        String idempotence = String.valueOf(config.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
        Assertions.assertEquals("true", idempotence,
                ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG + " must be enabled");

        String acks = String.valueOf(config.get(ProducerConfig.ACKS_CONFIG));
        // Kafka treats acks=-1 as equivalent to acks=all.
        Assertions.assertTrue("all".equalsIgnoreCase(acks) || "-1".equals(acks),
                ProducerConfig.ACKS_CONFIG + " must be 'all' (or '-1') but was '" + acks + "'");

        if (config.containsKey(ProducerConfig.RETRIES_CONFIG)) {
            int retries = Integer.parseInt(config.get(ProducerConfig.RETRIES_CONFIG).toString());
            Assertions.assertTrue(retries > 0,
                    ProducerConfig.RETRIES_CONFIG + " must be greater than 0 but was " + retries);
        }
    }
}
 
Niciun comentariu:
Trimiteți un comentariu