Skip to content

Commit

Permalink
* thread: replace all synchronized with ReentrantLock/Condition, incl…
Browse files Browse the repository at this point in the history
…uding RateControl/ShutdownHandling/Test

Signed-off-by: neo <1100909+neowu@users.noreply.github.com>
  • Loading branch information
neowu committed Mar 22, 2024
1 parent 300dcdb commit 6f6f0a5
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 16 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

* mysql: updated and patched to 8.3.0, fixed CJException should be wrapped as SQLException
> make sure use "core.framework.mysql:mysql-connector-j:8.3.0-r2"
* rate: RateControl, replace synchronized with ReentrantLock
* thread: replace all synchronized with ReentrantLock/Condition, including RateControl/ShutdownHandling/Test

### 9.0.8 (1/29/2024 - 3/7/2024)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
import de.bwaldvogel.mongo.backend.memory.MemoryBackend;

import java.net.InetSocketAddress;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author neo
*/
public class TestMongoConfig extends MongoConfig {
private static final ReentrantLock LOCK = new ReentrantLock();

// only start one mongo server for testing to reduce resource overhead,
// only breaking case is that multiple mongo() using same collection name, then if one unit test operates both MongoCollection may result in conflict or merged results
// this can be avoided by designing test differently
Expand All @@ -27,13 +30,16 @@ protected void initialize(ModuleContext context, String name) {
}

private void startLocalMongoServer(ModuleContext context) {
synchronized (TestMongoConfig.class) {
LOCK.lock();
try {
// in test env, config is initialized in order and within same thread, so no threading issue
if (localMongoAddress == null) {
var server = new MongoServer(new MemoryBackend());
localMongoAddress = server.bind();
context.shutdownHook.add(ShutdownHook.STAGE_6, timeout -> server.shutdown());
}
} finally {
LOCK.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
import org.elasticsearch.common.logging.internal.LoggerFactoryImpl;
import org.elasticsearch.logging.internal.spi.LoggerFactory;

import java.util.concurrent.locks.ReentrantLock;

/**
* @author neo
*/
public class TestSearchConfig extends SearchConfig {
private static final ReentrantLock LOCK = new ReentrantLock();

// only start one local node for testing to reduce resource overhead,
// only breaking case is that multiple search() using same index name, then if one unit test operates both ElasticSearchType will result in conflict or merged results
// this can be avoided by designing test differently
Expand All @@ -26,13 +30,16 @@ protected void initialize(ModuleContext context, String name) {
}

private void startLocalElasticSearch(ModuleContext context) {
synchronized (TestSearchConfig.class) {
LOCK.lock();
try {
// in test env, config is initialized in order and within same thread, so no threading issue
if (localESHost == null) {
var server = new LocalElasticSearch();
localESHost = server.start();
context.shutdownHook.add(ShutdownHook.STAGE_6, timeout -> server.close());
}
} finally {
LOCK.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import static core.framework.log.Markers.errorCode;

Expand All @@ -46,9 +49,11 @@ class MessageListenerThread extends Thread {
private final Semaphore semaphore;
private final int concurrency;

private final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock();
private final Condition processingCondition = lock.newCondition();
private boolean processing;

private volatile boolean shutdown;
private volatile boolean processing;

MessageListenerThread(String name, Consumer<String, byte[]> consumer, MessageListener listener) {
super(name);
Expand All @@ -67,9 +72,16 @@ public void run() {
process();
} finally {
processing = false;
synchronized (lock) {
lock.notifyAll();
}
notifyProcessingCondition();
}
}

private void notifyProcessingCondition() {
lock.lock();
try {
processingCondition.notifyAll();
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -120,15 +132,18 @@ void shutdown() {

boolean awaitTermination(long timeoutInMs) throws InterruptedException {
long end = System.currentTimeMillis() + timeoutInMs;
synchronized (lock) {
lock.lock();
try {
while (processing) {
long left = end - System.currentTimeMillis();
if (left <= 0) {
return false;
}
lock.wait(left);
processingCondition.await(left, TimeUnit.MILLISECONDS);
}
return true;
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,21 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author neo
*/
public class ShutdownHandler implements ExchangeCompletionListener {
final Counter activeRequests = new Counter();

private final Logger logger = LoggerFactory.getLogger(ShutdownHandler.class);
private final Object lock = new Object();

private final ReentrantLock lock = new ReentrantLock();
private final Condition activeRequestCondition = lock.newCondition();

private volatile boolean shutdown;

boolean handle(HttpServerExchange exchange) {
Expand All @@ -41,15 +48,18 @@ void shutdown() {

boolean awaitTermination(long timeoutInMs) throws InterruptedException {
long end = System.currentTimeMillis() + timeoutInMs;
synchronized (lock) {
lock.lock();
try {
while (activeRequests.get() > 0) {
long left = end - System.currentTimeMillis();
if (left <= 0) {
return false;
}
lock.wait(left);
activeRequestCondition.await(left, TimeUnit.MILLISECONDS);
}
return true;
} finally {
lock.unlock();
}
}

Expand All @@ -58,12 +68,19 @@ public void exchangeEvent(HttpServerExchange exchange, NextListener next) {
try {
int count = activeRequests.decrease();
if (count <= 0 && shutdown) {
synchronized (lock) {
lock.notifyAll();
}
notifyActiveRequestCondition();
}
} finally {
next.proceed();
}
}

private void notifyActiveRequestCondition() {
lock.lock();
try {
activeRequestCondition.signalAll();
} finally {
lock.unlock();
}
}
}

0 comments on commit 6f6f0a5

Please sign in to comment.