@EmbeddedKafka
@SpringBootTest(properties = "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}")
public class ProductCreatedEventHandlerTest {
@MockBean
ProcessedEventRepository processedEventRepository;
@Autowired
KafkaTemplate<String, Object> kafkaTemplate;
@SpyBean // true object, its methods can be intercepted
ProductCreatedEventHandler productCreatedEventHandler;
@Test
public void testHandle() throws Exception {
// Arrange
ProductCreatedEvent event = new ProductCreatedEvent(UUID.randomUUID().toString(), "iPhone SE", BigDecimal.valueOf(24.5), 12);
ProducerRecord<String, Object> record = new ProducerRecord<>(PRODUCT_CREATED_EVT_TOPIC, event.getProductId(), event);
String messageId = UUID.randomUUID().toString();
record.headers().add("messageId", messageId.getBytes());
record.headers().add(KafkaHeaders.RECEIVED_KEY, event.getProductId().getBytes());
ProcessedEventEntity processedEventEntity = new ProcessedEventEntity();
when(processedEventRepository.findByMessageId(any())).thenReturn(processedEventEntity);
// Act
kafkaTemplate.send(record).get();
// Assert
ArgumentCaptor<String> messageIdCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<String> messageKeyCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<ProductCreatedEvent> eventCaptor = ArgumentCaptor.forClass(ProductCreatedEvent.class);
verify(productCreatedEventHandler, timeout(5000).times(1))
.handle(eventCaptor.capture(), messageIdCaptor.capture(), messageKeyCaptor.capture());
Assertions.assertEquals(messageId, messageIdCaptor.getValue());
Assertions.assertEquals(event.getProductId(), messageKeyCaptor.getValue());
Assertions.assertEquals(event.getProductId(), eventCaptor.getValue().getProductId());
}
}
BoomBleBee
Am deschis acest blog pentru aducere aminte.
31 octombrie 2024
Test de integrare pt Kafka Consumer
30 octombrie 2024
Test de integrare pt Kafka Producer
1. Testarea serviciului care trimite mesaje Kafka
package com.hanul.pis.ProductsMicroservice;
import com.hanul.pis.ProductsMicroservice.rest.NewProductDto;
import com.hanul.pis.ProductsMicroservice.service.ProductService;
import com.hanul.pis.core.event.ProductCreatedEvent;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@DirtiesContext // it will corrupt the application context -> each test will start with a clean state
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // one instance for all test methods, for expensive setup code
@ActiveProfiles("test") // look for application-test.properties
// use embedded Kafka server
@EmbeddedKafka (partitions = 3, count = 3, controlledShutdown = true)
@SpringBootTest(properties = "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}")
public class ProductServiceImplTest {
@Autowired
private ProductService productService;
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
private Environment environment;
private KafkaMessageListenerContainer<String, ProductCreatedEvent> container;
private BlockingQueue<ConsumerRecord<String, ProductCreatedEvent>> records = new LinkedBlockingQueue<>();
@BeforeAll
void setUp() {
Map<String, Object> consumerProperties = getConsumerProperties();
DefaultKafkaConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
ContainerProperties containerProperties = new ContainerProperties(environment.getProperty("product-created-evt-topic-name"));
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
container.setupMessageListener((MessageListener<String, ProductCreatedEvent>) records::add);
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
}
@AfterAll
void tearDown() {
container.stop();
}
@Test
void testCreateProduct_validProduct_success() throws Exception {
// Arrange
NewProductDto newProductDto = new NewProductDto();
newProductDto.setPrice(new BigDecimal(1200));
newProductDto.setQuantity(10);
newProductDto.setTitle("Philips monitor 40\"");
// Act
String productId = productService.createProduct(newProductDto);
// Assert
Assertions.assertNotNull(productId);
ConsumerRecord<String, ProductCreatedEvent> message = records.poll(3, TimeUnit.SECONDS);
Assertions.assertNotNull(message);
Assertions.assertNotNull(message.key());
ProductCreatedEvent event = message.value();
Assertions.assertNotNull(event);
Assertions.assertEquals(newProductDto.getTitle(), event.getTitle());
Assertions.assertEquals(newProductDto.getPrice(), event.getPrice());
Assertions.assertEquals(newProductDto.getQuantity(), event.getQuantity());
}
private Map<String, Object> getConsumerProperties() {
// Brokers' port numbers will dynamically change during executions of this test
// Therefore, they should be dynamically retrieved in configuration
return Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString(),
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class,
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class,
ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("spring.kafka.consumer.group-id"),
JsonDeserializer.TRUSTED_PACKAGES, environment.getProperty("spring.kafka.consumer.properties.spring.json.trusted.packages"),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, environment.getProperty("spring.kafka.consumer.auto-offset-reset")
);
}
}
2. Testarea configurarilor - ex. ca producatorul este idempotent
package com.hanul.pis.ProductsMicroservice;
import com.hanul.pis.core.event.ProductCreatedEvent;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.Map;
@SpringBootTest
public class IdempotentProducerItTest { // it will test based on real configuration
@Autowired
KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;
@Test
void test_enabledIdempotence() {
// Arrange
ProducerFactory<String, ProductCreatedEvent> producerFactory = kafkaTemplate.getProducerFactory();
// Act
Map<String, Object> config = producerFactory.getConfigurationProperties();
// Assert
Assertions.assertEquals("true", config.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
Assertions.assertTrue("all".equalsIgnoreCase((String) config.get(ProducerConfig.ACKS_CONFIG)));
if (config.containsKey(ProducerConfig.RETRIES_CONFIG)) {
Assertions.assertTrue(Integer.parseInt(config.get(ProducerConfig.RETRIES_CONFIG).toString()) > 0);
}
}
}
16 august 2024
Grind75
#57. Insert Interval (link)
class Solution {
public int[][] insert(int[][] intervals, int[] newInterval) {
int start = newInterval[0], stop = newInterval[1];
// case: newInterval contains all intervals
if (intervals.length == 0 || start <= intervals[0][0] && stop >= intervals[intervals.length-1][1]) {
int[][] result = new int[1][2];
result[0] = newInterval;
return result;
}
int[] insertion = new int[2];
// What should insertion[0] be?
insertion[0] = start;
if (start > intervals[0][0]) {
for (int i=0; i<intervals.length; i++) {
if (start >= intervals[i][0] && start <= intervals[i][1]) {
insertion[0] = intervals[i][0];
break;
}
}
}
// What should insertion[1] be?
insertion[1] = stop;
if (stop < intervals[intervals.length-1][1]) {
for (int i=0; i<intervals.length; i++) {
if (stop >= intervals[i][0] && stop <= intervals[i][1]) {
insertion[1] = intervals[i][1];
break;
}
}
}
// Place insertion
boolean added = false;
List<Integer[]> result = new ArrayList<>();
for (int i=0; i<intervals.length; i++) {
if (insertion[0] <= intervals[i][0]) {
addToList(insertion, result);
added = true;
while (i<intervals.length && intervals[i][1] <= insertion[1]) {
i++;
}
copyRest(i, intervals, result);
break;
} else {
addToList(intervals[i], result);
}
}
if (!added) {
// add at the end
addToList(insertion, result);
}
// List to arrays
int[][] array = new int[result.size()][2];
for (int i=0; i<result.size(); i++) {
array[i][0] = result.get(i)[0];
array[i][1] = result.get(i)[1];
}
return array;
}
private void copyRest(int from, int[][] intervals, List<Integer[]> list) {
for (int i=from; i<intervals.length; i++) {
addToList(intervals[i], list);
}
}
private void addToList(int[] interval, List<Integer[]> list) {
Integer[] pair = new Integer[2];
pair[0] = interval[0];
pair[1] = interval[1];
list.add(pair);
}
}
Pt. restul LINK
09 august 2024
Exemple Deadlock & Livelock
Deadlock
Cand doua sau mai multe fire de executie sunt blocate in asteptarea eliberarii unor resurse de care au nevoie. Solutie: how to prevent deadlock in Java.
private static void deadlock() {
final String res1 = "Resource_1";
final String res2 = "Resource_2";
Thread t1 = new Thread(() -> {
synchronized (res1) {
System.out.println("[t1] acquired access to res1");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (res2) {
System.out.println("[t1] Yay! escaped deadlock."); // not happening
}
}
});
Thread t2 = new Thread(() -> {
synchronized (res2) {
System.out.println("[t2] acquired access to res2");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (res1) {
System.out.println("[t2] Yay! escaped deadlock.");
}
}
});
t1.start();
t2.start();
}
Livelock
Doua sau mai multe fire de executie isi cedeaza dreptul de a rula in favoarea celorlalte astfel ajungand sa nu ruleze niciodata, iar aplicatia nu progreseaza. Solutie: schimbarea logicii.
static class Spoon {
Diner owner;
public Spoon (Diner firstOwner) {
this.owner = firstOwner;
}
synchronized void setOwner(Diner owner) {
this.owner = owner;
}
synchronized void use() {
System.out.println(owner.name + " just ate!");
}
}
static class Diner {
String name;
boolean isHungry;
public Diner (String name) {
this.name = name;
this.isHungry = true;
}
public void eatWith (Spoon spoon, Diner spouse) {
while (isHungry) {
if (spoon.owner != this) {
// wait for a while for the spoon to be released
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// after wait, try to give the spoon to the spouse if she's hungry
if (spouse.isHungry) {
System.out.println("[" + name + "] Eat, baby, eat!");
spoon.owner = spouse;
} else {
// finally
spoon.use();
isHungry = false;
System.out.println("[" + name + "] Finally ate!!!"); // never
spoon.owner = spouse;
}
}
}
}
private static void livelock() {
final Diner husband = new Diner("Adnan");
final Diner wife = new Diner("Hannan");
Spoon spoon = new Spoon(wife);
try {
new Thread(() -> husband.eatWith(spoon, wife)).start();
Thread.sleep(1500);
new Thread(() -> wife.eatWith(spoon, husband)).start();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
07 august 2024
Script bash pt build proiecte
./script.sh <nume branch>
branch="master"
if [ -n "$1" ]; then
branch=$1
fi
printf "\ngit pull from "$branch"\n\n"
currentDate=`date +"%Y-%m-%d-%H%M"`
filename="$currentDate.log"
touch $filename
for dir in `ls .`;
do
if [[ -d $dir ]]; then
echo $'\n'$dir
cd $dir
mvn clean &>> ../$filename
git checkout master
git pull
cd ..
echo $'\n' &>> $filename
fi
done
# legacy first
echo $'\n' >> $filename
for dir in `ls .`;
do
if [[ -d $dir && "$dir" == *"legacy"* ]]; then
echo $'\n'$dir
cd $dir
mvn install -DskipTests &>> ../$filename
cd ..
echo $'\n' &>> $filename
fi
done
printf "\n"
for dir in `ls .`;
do
if [[ -d $dir && "$dir" != *"legacy"* ]]; then
echo $'\n'$dir
cd $dir
mvn install -DskipTests &>> ../$filename
cd ..
echo $'\n' &>> $filename
fi
done
Abonați-vă la:
Postări (Atom)