BoomBleBee
I opened this blog as a memory aid.

04 December 2024
Implementing a Rate Limiter

import org.springframework.core.env.Environment;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RateLimiter {
    private static final int ONE_SECOND = 1000;

    // timestamps (ms) of the requests received from each IP within the last second
    // note: HashMap is not thread-safe; a real deployment would need synchronization
    // or a ConcurrentHashMap
    private final Map<String, List<Long>> requests;
    private boolean enabled = false;
    private int requestsPerSecond;

    public RateLimiter(Environment environment) {
        requests = new HashMap<>();
        if (environment.getProperty("enable.rate.limiter") != null) {
            this.enabled = Boolean.parseBoolean(environment.getProperty("enable.rate.limiter"));
        }
        if (this.enabled) {
            String property = environment.getProperty("max.requests.per.user.per.second");
            Assert.notNull(property, "Max requests per second not defined!");
            this.requestsPerSecond = Integer.parseInt(property);
        }
    }

    public boolean allows(String ipAddress) {
        if (!enabled) {
            return true;
        }
        if (!requests.containsKey(ipAddress)) {
            requests.put(ipAddress, new ArrayList<>());
        }
        long now = System.currentTimeMillis();
        cleanup(ipAddress, now);
        requests.get(ipAddress).add(now);
        return requests.get(ipAddress).size() <= requestsPerSecond;
    }

    // drop the timestamps that fell out of the one-second sliding window
    private void cleanup(String ipAddress, long now) {
        requests.get(ipAddress).removeIf(timestamp -> now - timestamp > ONE_SECOND);
    }
}
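To actually use the limiter in a web app, it can be consulted before each request reaches a controller. A minimal sketch, assuming Spring Boot 3 (jakarta servlet API) and that RateLimiter is registered as a bean; RateLimitInterceptor and the 429 handling are illustrative:

import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;

@Component
public class RateLimitInterceptor implements HandlerInterceptor {

    private final RateLimiter rateLimiter;

    public RateLimitInterceptor(RateLimiter rateLimiter) {
        this.rateLimiter = rateLimiter;
    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        // reject the request with 429 once the client exceeds its per-second budget
        if (!rateLimiter.allows(request.getRemoteAddr())) {
            response.setStatus(429); // 429 Too Many Requests
            return false;
        }
        return true;
    }
}

The interceptor still has to be registered through a WebMvcConfigurer's addInterceptors for Spring MVC to apply it.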
31 October 2024
Integration test for a Kafka Consumer
@EmbeddedKafka
@SpringBootTest(properties = "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}")
public class ProductCreatedEventHandlerTest {

    @MockBean
    ProcessedEventRepository processedEventRepository;

    @Autowired
    KafkaTemplate<String, Object> kafkaTemplate;

    @SpyBean // a real bean - its methods can still be intercepted and verified
    ProductCreatedEventHandler productCreatedEventHandler;

    @Test
    public void testHandle() throws Exception {
        // Arrange
        ProductCreatedEvent event = new ProductCreatedEvent(UUID.randomUUID().toString(), "iPhone SE", BigDecimal.valueOf(24.5), 12);
        ProducerRecord<String, Object> record = new ProducerRecord<>(PRODUCT_CREATED_EVT_TOPIC, event.getProductId(), event);
        String messageId = UUID.randomUUID().toString();
        record.headers().add("messageId", messageId.getBytes());
        record.headers().add(KafkaHeaders.RECEIVED_KEY, event.getProductId().getBytes());

        ProcessedEventEntity processedEventEntity = new ProcessedEventEntity();
        when(processedEventRepository.findByMessageId(any())).thenReturn(processedEventEntity);

        // Act
        kafkaTemplate.send(record).get();

        // Assert
        ArgumentCaptor<String> messageIdCaptor = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor<String> messageKeyCaptor = ArgumentCaptor.forClass(String.class);
        ArgumentCaptor<ProductCreatedEvent> eventCaptor = ArgumentCaptor.forClass(ProductCreatedEvent.class);
        verify(productCreatedEventHandler, timeout(5000).times(1))
                .handle(eventCaptor.capture(), messageIdCaptor.capture(), messageKeyCaptor.capture());
        Assertions.assertEquals(messageId, messageIdCaptor.getValue());
        Assertions.assertEquals(event.getProductId(), messageKeyCaptor.getValue());
        Assertions.assertEquals(event.getProductId(), eventCaptor.getValue().getProductId());
    }
}
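For context, the handler under test would look roughly like this. This is only a sketch: the test above fixes just the class name and the handle(event, messageId, messageKey) signature; the topic property, the idempotency check and everything else are assumptions:

import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@KafkaListener(topics = "${product-created-evt-topic-name}") // assumed topic property
public class ProductCreatedEventHandler {

    private final ProcessedEventRepository processedEventRepository;

    public ProductCreatedEventHandler(ProcessedEventRepository processedEventRepository) {
        this.processedEventRepository = processedEventRepository;
    }

    @KafkaHandler
    public void handle(@Payload ProductCreatedEvent event,
                       @Header("messageId") String messageId,
                       @Header(KafkaHeaders.RECEIVED_KEY) String messageKey) {
        // idempotent consumer: skip a message that was already processed
        if (processedEventRepository.findByMessageId(messageId) != null) {
            return;
        }
        // ... business logic, then save a ProcessedEventEntity for this messageId
    }
}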
30 October 2024
Integration test for a Kafka Producer
1. Testing the service that sends Kafka messages
package com.hanul.pis.ProductsMicroservice;
import com.hanul.pis.ProductsMicroservice.rest.NewProductDto;
import com.hanul.pis.ProductsMicroservice.service.ProductService;
import com.hanul.pis.core.event.ProductCreatedEvent;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@DirtiesContext // the test dirties the application context, so Spring recreates it and each test starts clean
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // one instance for all test methods, for expensive setup code
@ActiveProfiles("test") // look for application-test.properties
// use an embedded Kafka server
@EmbeddedKafka(partitions = 3, count = 3, controlledShutdown = true)
@SpringBootTest(properties = "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}")
public class ProductServiceImplTest {

    @Autowired
    private ProductService productService;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private Environment environment;

    private KafkaMessageListenerContainer<String, ProductCreatedEvent> container;
    private final BlockingQueue<ConsumerRecord<String, ProductCreatedEvent>> records = new LinkedBlockingQueue<>();

    @BeforeAll
    void setUp() {
        Map<String, Object> consumerProperties = getConsumerProperties();
        DefaultKafkaConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
        ContainerProperties containerProperties = new ContainerProperties(environment.getProperty("product-created-evt-topic-name"));
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        // every record the test consumer receives goes into the queue
        container.setupMessageListener((MessageListener<String, ProductCreatedEvent>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterAll
    void tearDown() {
        container.stop();
    }

    @Test
    void testCreateProduct_validProduct_success() throws Exception {
        // Arrange
        NewProductDto newProductDto = new NewProductDto();
        newProductDto.setPrice(new BigDecimal(1200));
        newProductDto.setQuantity(10);
        newProductDto.setTitle("Philips monitor 40\"");

        // Act
        String productId = productService.createProduct(newProductDto);

        // Assert
        Assertions.assertNotNull(productId);
        ConsumerRecord<String, ProductCreatedEvent> message = records.poll(3, TimeUnit.SECONDS);
        Assertions.assertNotNull(message);
        Assertions.assertNotNull(message.key());
        ProductCreatedEvent event = message.value();
        Assertions.assertNotNull(event);
        Assertions.assertEquals(newProductDto.getTitle(), event.getTitle());
        Assertions.assertEquals(newProductDto.getPrice(), event.getPrice());
        Assertions.assertEquals(newProductDto.getQuantity(), event.getQuantity());
    }

    private Map<String, Object> getConsumerProperties() {
        // Brokers' port numbers change dynamically between executions of this test,
        // so they must be retrieved dynamically from the embedded broker.
        return Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString(),
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class,
                ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class,
                ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("spring.kafka.consumer.group-id"),
                JsonDeserializer.TRUSTED_PACKAGES, environment.getProperty("spring.kafka.consumer.properties.spring.json.trusted.packages"),
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, environment.getProperty("spring.kafka.consumer.auto-offset-reset")
        );
    }
}
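For reference, the service under test might look roughly like this. Only createProduct(NewProductDto) is fixed by the test; the topic name, the constructor and the synchronous send are assumptions:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class ProductServiceImpl implements ProductService {

    private final KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;

    public ProductServiceImpl(KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public String createProduct(NewProductDto newProductDto) throws Exception {
        String productId = UUID.randomUUID().toString();
        ProductCreatedEvent event = new ProductCreatedEvent(productId, newProductDto.getTitle(),
                newProductDto.getPrice(), newProductDto.getQuantity());
        // blocking send: a failure surfaces in the caller instead of being lost in a callback
        kafkaTemplate.send("product-created-events-topic", productId, event).get();
        return productId;
    }
}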
2. Testing the configuration - e.g. that the producer is idempotent
package com.hanul.pis.ProductsMicroservice;
import com.hanul.pis.core.event.ProductCreatedEvent;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.Map;
@SpringBootTest
public class IdempotentProducerItTest { // tests against the real configuration

    @Autowired
    KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;

    @Test
    void test_enabledIdempotence() {
        // Arrange
        ProducerFactory<String, ProductCreatedEvent> producerFactory = kafkaTemplate.getProducerFactory();

        // Act
        Map<String, Object> config = producerFactory.getConfigurationProperties();

        // Assert
        Assertions.assertEquals("true", config.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
        Assertions.assertTrue("all".equalsIgnoreCase((String) config.get(ProducerConfig.ACKS_CONFIG)));
        if (config.containsKey(ProducerConfig.RETRIES_CONFIG)) {
            Assertions.assertTrue(Integer.parseInt(config.get(ProducerConfig.RETRIES_CONFIG).toString()) > 0);
        }
    }
}
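The values asserted above would typically come from the producer's configuration, e.g. an application.properties fragment along these lines (an assumed fragment; the exact values are illustrative):

spring.kafka.producer.acks=all
spring.kafka.producer.retries=10
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5

Note that enabling idempotence requires acks=all, retries greater than 0 and at most 5 in-flight requests per connection, which is exactly what the test checks.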
16 August 2024
Grind75
#57. Insert Interval (link)
class Solution {
    public int[][] insert(int[][] intervals, int[] newInterval) {
        int start = newInterval[0], stop = newInterval[1];
        // case: newInterval contains all intervals
        if (intervals.length == 0 || start <= intervals[0][0] && stop >= intervals[intervals.length - 1][1]) {
            int[][] result = new int[1][2];
            result[0] = newInterval;
            return result;
        }
        int[] insertion = new int[2];
        // What should insertion[0] be?
        insertion[0] = start;
        if (start > intervals[0][0]) {
            for (int i = 0; i < intervals.length; i++) {
                if (start >= intervals[i][0] && start <= intervals[i][1]) {
                    insertion[0] = intervals[i][0];
                    break;
                }
            }
        }
        // What should insertion[1] be?
        insertion[1] = stop;
        if (stop < intervals[intervals.length - 1][1]) {
            for (int i = 0; i < intervals.length; i++) {
                if (stop >= intervals[i][0] && stop <= intervals[i][1]) {
                    insertion[1] = intervals[i][1];
                    break;
                }
            }
        }
        // Place insertion
        boolean added = false;
        List<Integer[]> result = new ArrayList<>();
        for (int i = 0; i < intervals.length; i++) {
            if (insertion[0] <= intervals[i][0]) {
                addToList(insertion, result);
                added = true;
                // skip the intervals swallowed by the merged insertion
                while (i < intervals.length && intervals[i][1] <= insertion[1]) {
                    i++;
                }
                copyRest(i, intervals, result);
                break;
            } else {
                addToList(intervals[i], result);
            }
        }
        if (!added) {
            // add at the end
            addToList(insertion, result);
        }
        // List to arrays
        int[][] array = new int[result.size()][2];
        for (int i = 0; i < result.size(); i++) {
            array[i][0] = result.get(i)[0];
            array[i][1] = result.get(i)[1];
        }
        return array;
    }

    private void copyRest(int from, int[][] intervals, List<Integer[]> list) {
        for (int i = from; i < intervals.length; i++) {
            addToList(intervals[i], list);
        }
    }

    private void addToList(int[] interval, List<Integer[]> list) {
        Integer[] pair = new Integer[2];
        pair[0] = interval[0];
        pair[1] = interval[1];
        list.add(pair);
    }
}
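A quick sanity check with the two examples from the problem statement (a throwaway main; Solution is the class above):

import java.util.Arrays;

public class Main {
    public static void main(String[] args) {
        Solution solution = new Solution();
        // [[1,3],[6,9]] + [2,5] -> [[1, 5], [6, 9]]
        System.out.println(Arrays.deepToString(solution.insert(new int[][]{{1, 3}, {6, 9}}, new int[]{2, 5})));
        // [[1,2],[3,5],[6,7],[8,10],[12,16]] + [4,8] -> [[1, 2], [3, 10], [12, 16]]
        System.out.println(Arrays.deepToString(solution.insert(new int[][]{{1, 2}, {3, 5}, {6, 7}, {8, 10}, {12, 16}}, new int[]{4, 8})));
    }
}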
For the rest: LINK
09 August 2024
Deadlock & Livelock examples
Deadlock
Two or more threads are blocked, each waiting for the release of resources the others hold. Solution: how to prevent deadlock in Java.
private static void deadlock() {
    final String res1 = "Resource_1";
    final String res2 = "Resource_2";

    // t1 locks res1 and then waits for res2
    Thread t1 = new Thread(() -> {
        synchronized (res1) {
            System.out.println("[t1] acquired access to res1");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            synchronized (res2) {
                System.out.println("[t1] Yay! escaped deadlock."); // not happening
            }
        }
    });

    // t2 locks the same resources in the opposite order -> circular wait
    Thread t2 = new Thread(() -> {
        synchronized (res2) {
            System.out.println("[t2] acquired access to res2");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            synchronized (res1) {
                System.out.println("[t2] Yay! escaped deadlock."); // not happening
            }
        }
    });

    t1.start();
    t2.start();
}
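The usual prevention is to acquire the locks in the same global order in every thread, so a circular wait cannot form. A minimal sketch of t2 rewritten that way, reusing res1 and res2 from the method above:

// t2 now takes res1 before res2, just like t1 -> no circular wait is possible
Thread t2 = new Thread(() -> {
    synchronized (res1) {
        System.out.println("[t2] acquired access to res1");
        synchronized (res2) {
            System.out.println("[t2] Yay! no deadlock here.");
        }
    }
});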
Livelock
Two or more threads keep yielding the right to run to one another, so none of them ever makes progress and the application stalls. Solution: change the logic.
static class Spoon {
    Diner owner;

    public Spoon(Diner firstOwner) {
        this.owner = firstOwner;
    }

    synchronized void setOwner(Diner owner) {
        this.owner = owner;
    }

    synchronized void use() {
        System.out.println(owner.name + " just ate!");
    }
}

static class Diner {
    String name;
    boolean isHungry;

    public Diner(String name) {
        this.name = name;
        this.isHungry = true;
    }

    public void eatWith(Spoon spoon, Diner spouse) {
        while (isHungry) {
            if (spoon.owner != this) {
                // wait for a while for the spoon to be released
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            // after waiting, try to give the spoon to the spouse if she's hungry
            if (spouse.isHungry) {
                System.out.println("[" + name + "] Eat, baby, eat!");
                spoon.setOwner(spouse);
            } else {
                // finally
                spoon.use();
                isHungry = false;
                System.out.println("[" + name + "] Finally ate!!!"); // never happens
                spoon.setOwner(spouse);
            }
        }
    }
}

private static void livelock() {
    final Diner husband = new Diner("Adnan");
    final Diner wife = new Diner("Hannan");
    Spoon spoon = new Spoon(wife);
    try {
        new Thread(() -> husband.eatWith(spoon, wife)).start();
        Thread.sleep(1500);
        new Thread(() -> wife.eatWith(spoon, husband)).start();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
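One way to change the logic is to make the politeness non-deterministic, so the two diners stop mirroring each other forever. A sketch of a possible loop body for eatWith, assuming the classes above (ThreadLocalRandom comes from java.util.concurrent):

// only the current owner acts, and it yields the spoon just some of the time,
// so eventually one diner keeps it long enough to eat
if (spoon.owner == this) {
    if (spouse.isHungry && ThreadLocalRandom.current().nextBoolean()) {
        System.out.println("[" + name + "] Eat, baby, eat!");
        spoon.setOwner(spouse);
    } else {
        spoon.use();
        isHungry = false;
    }
}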