Skip to content

Commit

Permalink
Add unit tests for CollectionHandlePartitionMap.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 346358891
  • Loading branch information
yuangu-google authored and arcs-c3po committed Dec 14, 2020
1 parent c049e13 commit c34ad1c
Show file tree
Hide file tree
Showing 5 changed files with 344 additions and 24 deletions.
16 changes: 8 additions & 8 deletions java/arcs/core/allocator/Allocator.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
package arcs.core.allocator

import arcs.core.allocator.CollectionHandlePartitionMap.Companion.arcId
import arcs.core.common.ArcId
import arcs.core.common.Id
import arcs.core.common.toArcId
Expand All @@ -19,7 +20,7 @@ import arcs.core.entity.HandleSpec
import arcs.core.host.ArcHost
import arcs.core.host.ArcHostException
import arcs.core.host.ArcHostNotFoundException
import arcs.core.host.HandleManagerImpl
import arcs.core.host.HandleManager
import arcs.core.host.HostRegistry
import arcs.core.host.ParticleNotFoundException
import arcs.core.storage.CapabilitiesResolver
Expand Down Expand Up @@ -60,7 +61,7 @@ class Allocator(
* Stores the provided list of [Plan.Parition] for the provided [ArcId]. Existing values
* will be replaced.
*/
suspend fun set(arcId: ArcId, partitions: List<Plan.Partition>)
suspend fun set(partitions: List<Plan.Partition>)

/**
* Return the current list of [Plan.Partition] for the provided [ArcId]. If an Arc with
Expand Down Expand Up @@ -114,7 +115,7 @@ class Allocator(
val partitions = computePartitions(arcId, newPlan)
log.debug { "Computed partitions" }
// Store computed partitions for later
partitionMap.set(arcId, partitions)
partitionMap.set(partitions)
try {
startPlanPartitionsOnHosts(partitions)
return Arc(arcId, partitions, this, scope)
Expand Down Expand Up @@ -179,16 +180,16 @@ class Allocator(
companion object {
/**
* Creates an [Allocator] which serializes Arc/Particle state to the storage system backing
* the provided [handleManagerImpl].
* the provided [handleManager].
*/
fun create(
hostRegistry: HostRegistry,
handleManagerImpl: HandleManagerImpl,
handleManager: HandleManager,
scope: CoroutineScope
): Allocator {
return Allocator(
hostRegistry,
CollectionHandlePartitionMap(handleManagerImpl),
CollectionHandlePartitionMap(handleManager),
scope
)
}
Expand All @@ -210,10 +211,9 @@ class Allocator(
private val partitions = mutableMapOf<ArcId, List<Plan.Partition>>()

override suspend fun set(
arcId: ArcId,
partitions: List<Plan.Partition>
) = mutex.withLock {
this.partitions[arcId] = partitions
this.partitions[partitions.arcId()] = partitions
}

override suspend fun readPartitions(
Expand Down
38 changes: 22 additions & 16 deletions java/arcs/core/allocator/CollectionHandlePartitionMap.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package arcs.core.allocator

import arcs.core.common.ArcId
import arcs.core.common.SuspendableLazy
import arcs.core.common.toArcId
import arcs.core.data.CollectionType
import arcs.core.data.EntityType
import arcs.core.data.FieldType
Expand All @@ -16,7 +17,7 @@ import arcs.core.entity.EntityBaseSpec
import arcs.core.entity.HandleSpec
import arcs.core.entity.ReadWriteCollectionHandle
import arcs.core.entity.awaitReady
import arcs.core.host.HandleManagerImpl
import arcs.core.host.HandleManager
import arcs.core.storage.keys.RamDiskStorageKey
import arcs.core.storage.referencemode.ReferenceModeStorageKey
import arcs.core.util.TaggedLog
Expand All @@ -26,20 +27,20 @@ import kotlinx.coroutines.withContext

/**
* An implementation of [Allocator.PartitionSerialization] that stores partition information in an Arcs
* collection handle, created by the [HandleManagerImpl] provided at construction. The handle
* collection handle, created by the [HandleManager] provided at construction. The handle
* will be created the first time any of the publicly exposed methods is called.
*/
@OptIn(ExperimentalCoroutinesApi::class)
class CollectionHandlePartitionMap(
private val handleManagerImpl: HandleManagerImpl
private val handleManager: HandleManager
) : Allocator.PartitionSerialization {

private val log = TaggedLog { "CollectionHandlePartitionMap" }

@Suppress("UNCHECKED_CAST")
private val collection = SuspendableLazy {
val entitySpec = EntityBaseSpec(SCHEMA)
val handle = handleManagerImpl.createHandle(
val handle = handleManager.createHandle(
HandleSpec(
"partitions",
HandleMode.ReadWrite,
Expand All @@ -52,8 +53,9 @@ class CollectionHandlePartitionMap(
}

/** Persists [ArcId] and associated [Plan.Partition]s */
override suspend fun set(arcId: ArcId, partitions: List<Plan.Partition>) {
log.debug { "writePartitionMap(arcId=$arcId)" }
override suspend fun set(partitions: List<Plan.Partition>) {
val arcId = partitions.arcId()
log.debug { "writePartitionMap(arcId=${partitions.arcId()})" }

val writes = withContext(collection().dispatcher) {
partitions.map { (_, arcHost, particles) ->
Expand All @@ -72,16 +74,6 @@ class CollectionHandlePartitionMap(
writes.joinAll()
}

/** Converts a [RawEntity] to a [Plan.Partition] */
private fun entityToPartition(entity: EntityBase): Plan.Partition =
Plan.Partition(
entity.getSingletonValue("arc") as String,
entity.getSingletonValue("host") as String,
entity.getCollectionValue("particles").map {
Plan.Particle(it as String, "", mapOf())
}
)

/** Reads associated [PlanPartition]s with an [ArcId]. */
override suspend fun readPartitions(arcId: ArcId): List<Plan.Partition> =
entitiesForArc(arcId).map { entityToPartition(it) }
Expand All @@ -99,6 +91,16 @@ class CollectionHandlePartitionMap(
return entities.map { entityToPartition(it) }
}

/** Converts a [RawEntity] to a [Plan.Partition] */
private fun entityToPartition(entity: EntityBase): Plan.Partition =
Plan.Partition(
entity.getSingletonValue("arc") as String,
entity.getSingletonValue("host") as String,
entity.getCollectionValue("particles").map {
Plan.Particle(it as String, "", mapOf())
}
)

/** Looks up [RawEntity]s representing [PlanPartition]s for a given [ArcId] */
private suspend fun entitiesForArc(arcId: ArcId): List<EntityBase> {
return withContext(collection().dispatcher) {
Expand Down Expand Up @@ -126,5 +128,9 @@ class CollectionHandlePartitionMap(
RamDiskStorageKey("partition"),
RamDiskStorageKey("partitions")
)

fun List<Plan.Partition>.arcId(): ArcId {
return this.first().arcId.toArcId()
}
}
}
4 changes: 4 additions & 0 deletions java/arcs/core/data/Plan.kt
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,8 @@ data class Plan(
Plan(particles = f, handles = t.handles, annotations = t.annotations)
}
}

fun List<Partition>.arcId(): String {
return this.first().arcId
}
}
11 changes: 11 additions & 0 deletions javatests/arcs/core/allocator/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,19 @@ arcs_kt_jvm_test_suite(
package = "arcs.core.allocator",
deps = [
":allocator-test-util",
"//java/arcs/core/allocator",
"//java/arcs/core/common",
"//java/arcs/core/data",
"//java/arcs/core/entity",
"//java/arcs/core/host",
"//java/arcs/core/storage",
"//java/arcs/core/util",
"//java/arcs/core/util/testutil",
"//third_party/java/junit:junit-android",
"//third_party/java/truth:truth-android",
"//third_party/kotlin/kotlin:kotlin_test",
"//third_party/kotlin/kotlinx_coroutines",
"//third_party/kotlin/kotlinx_coroutines:kotlinx_coroutines_test",
],
)

Expand Down
Loading

0 comments on commit c34ad1c

Please sign in to comment.