-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
* minor fixes to upgrade path * Fixes for concurrency handling during upgrade * fix build failure --------- Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
- Loading branch information
1 parent
ec466e1
commit b44e19c
Showing
9 changed files
with
360 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
84 changes: 84 additions & 0 deletions
84
hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/NoopLockProvider.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.hudi.client.transaction.lock; | ||
|
||
import org.apache.hudi.common.config.LockConfiguration; | ||
import org.apache.hudi.common.lock.LockProvider; | ||
import org.apache.hudi.common.util.StringUtils; | ||
import org.apache.hudi.storage.StorageConfiguration; | ||
|
||
import org.jetbrains.annotations.NotNull; | ||
|
||
import java.io.Serializable; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
|
||
/** | ||
* NoopLockProvider as the name suggests, is a no op lock provider. Any caller asking for a lock will be able to get hold of the lock. | ||
* This is not meant to be used a producation grade lock providers. This is meant to be used for Hudi's internal operations. | ||
* For eg: During upgrade, we have nested lock situations and we leverage this {@code NoopLockProvider} for any operations we | ||
* might want to do within the upgradeHandler blocks to avoid re-entrant situations. Not all lock providers might support re-entrancy and during upgrade, | ||
* it is expected to have a single writer to the Hudi table of interest. | ||
*/ | ||
public class NoopLockProvider implements LockProvider<ReentrantReadWriteLock>, Serializable { | ||
|
||
public NoopLockProvider(final LockConfiguration lockConfiguration, final StorageConfiguration<?> conf) { | ||
// no op. | ||
} | ||
|
||
@Override | ||
public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException { | ||
return true; | ||
} | ||
|
||
@Override | ||
public void unlock() { | ||
// no op. | ||
} | ||
|
||
@Override | ||
public void lockInterruptibly() { | ||
// no op. | ||
} | ||
|
||
@Override | ||
public void lock() { | ||
// no op. | ||
} | ||
|
||
@Override | ||
public boolean tryLock() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public ReentrantReadWriteLock getLock() { | ||
return new ReentrantReadWriteLock(); | ||
} | ||
|
||
@Override | ||
public String getCurrentOwnerLockInfo() { | ||
return StringUtils.EMPTY_STRING; | ||
} | ||
|
||
@Override | ||
public void close() { | ||
// no op. | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
145 changes: 145 additions & 0 deletions
145
...op-common/src/test/java/org/apache/hudi/client/transaction/lock/TestNoopLockProvider.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.hudi.client.transaction.lock; | ||
|
||
import org.apache.hudi.common.config.HoodieCommonConfig; | ||
import org.apache.hudi.common.config.LockConfiguration; | ||
import org.apache.hudi.common.config.TypedProperties; | ||
import org.apache.hudi.storage.StorageConfiguration; | ||
|
||
import org.junit.jupiter.api.Assertions; | ||
import org.junit.jupiter.api.Test; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf; | ||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; | ||
|
||
/** | ||
* Tests {@code NoopLockProvider}. | ||
*/ | ||
public class TestNoopLockProvider { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(TestNoopLockProvider.class); | ||
private final StorageConfiguration<?> storageConf = getDefaultStorageConf(); | ||
private final LockConfiguration lockConfiguration1; | ||
private final LockConfiguration lockConfiguration2; | ||
|
||
public TestNoopLockProvider() { | ||
TypedProperties properties = new TypedProperties(); | ||
properties.put(HoodieCommonConfig.BASE_PATH.key(), "table1"); | ||
lockConfiguration1 = new LockConfiguration(properties); | ||
properties.put(HoodieCommonConfig.BASE_PATH.key(), "table2"); | ||
lockConfiguration2 = new LockConfiguration(properties); | ||
} | ||
|
||
@Test | ||
public void testLockAcquisition() { | ||
NoopLockProvider noopLockProvider = new NoopLockProvider(lockConfiguration1, storageConf); | ||
assertDoesNotThrow(() -> { | ||
noopLockProvider.lock(); | ||
}); | ||
assertDoesNotThrow(() -> { | ||
noopLockProvider.unlock(); | ||
}); | ||
} | ||
|
||
@Test | ||
public void testLockReAcquisitionBySameThread() { | ||
NoopLockProvider noopLockProvider = new NoopLockProvider(lockConfiguration1, storageConf); | ||
assertDoesNotThrow(() -> { | ||
noopLockProvider.lock(); | ||
}); | ||
assertDoesNotThrow(() -> { | ||
noopLockProvider.lock(); | ||
}); | ||
assertDoesNotThrow(() -> { | ||
noopLockProvider.unlock(); | ||
}); | ||
assertDoesNotThrow(() -> { | ||
noopLockProvider.lock(); | ||
}); | ||
} | ||
|
||
@Test | ||
public void testLockReAcquisitionBySameThreadWithTwoTables() { | ||
NoopLockProvider noopLockProvider1 = new NoopLockProvider(lockConfiguration1, storageConf); | ||
NoopLockProvider noopLockProvider2 = new NoopLockProvider(lockConfiguration2, storageConf); | ||
|
||
assertDoesNotThrow(() -> { | ||
noopLockProvider1.lock(); | ||
}); | ||
assertDoesNotThrow(() -> { | ||
noopLockProvider2.lock(); | ||
}); | ||
assertDoesNotThrow(() -> { | ||
noopLockProvider1.lock(); | ||
}); | ||
assertDoesNotThrow(() -> { | ||
noopLockProvider1.lock(); | ||
}); | ||
assertDoesNotThrow(() -> { | ||
noopLockProvider1.unlock(); | ||
}); | ||
assertDoesNotThrow(() -> { | ||
noopLockProvider2.unlock(); | ||
}); | ||
} | ||
|
||
@Test | ||
public void testLockReAcquisitionByDifferentThread() { | ||
NoopLockProvider noopLockProvider = new NoopLockProvider(lockConfiguration1, storageConf); | ||
final AtomicBoolean writer2Completed = new AtomicBoolean(false); | ||
|
||
// Main test thread | ||
assertDoesNotThrow(() -> { | ||
noopLockProvider.lock(); | ||
}); | ||
|
||
// Another writer thread in parallel, should be able to acquire the lock instantly | ||
Thread writer2 = new Thread(new Runnable() { | ||
@Override | ||
public void run() { | ||
assertDoesNotThrow(() -> { | ||
noopLockProvider.lock(); | ||
}); | ||
assertDoesNotThrow(() -> { | ||
noopLockProvider.unlock(); | ||
}); | ||
writer2Completed.set(true); | ||
} | ||
}); | ||
writer2.start(); | ||
|
||
assertDoesNotThrow(() -> { | ||
noopLockProvider.unlock(); | ||
}); | ||
|
||
try { | ||
writer2.join(); | ||
} catch (InterruptedException e) { | ||
// | ||
} | ||
Assertions.assertTrue(writer2Completed.get()); | ||
|
||
writer2.interrupt(); | ||
} | ||
} |
Oops, something went wrong.