Support multiple roles with mesos cluster mode.
Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
tnachen authored and Michael Gummelt committed Oct 7, 2015
1 parent 908e37b commit db83ac7
Showing 2 changed files with 35 additions and 28 deletions.
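In outline, the commit stops collapsing each Mesos offer into two scalars (cpu, mem) and instead carries the offer's full resource list through driver scheduling, so that resources reserved under different Mesos roles are matched and consumed individually rather than being flattened away. The sketch below illustrates only the matching half of that idea; it uses a hypothetical SimpleResource stand-in type rather than the Mesos Resource protobufs the real code operates on.

// Hypothetical, simplified model of a scalar Mesos resource (name, role, amount).
case class SimpleResource(name: String, role: String, amount: Double)

object MultiRoleOfferExample {
  // Mirrors the idea behind getResource: a named resource is summed across every
  // role it appears under, so reserved and unreserved entries both count.
  def getResource(resources: Seq[SimpleResource], name: String): Double =
    resources.filter(_.name == name).map(_.amount).sum

  def main(args: Array[String]): Unit = {
    // One offer whose cpus are split between the unreserved role "*" and a reserved role.
    val offer = Seq(
      SimpleResource("cpus", "*", 1.0),
      SimpleResource("cpus", "spark", 2.0),
      SimpleResource("mem", "*", 1024.0))

    val driverCpu = 2.5
    val driverMem = 512.0

    // A 2.5-cpu driver fits only if cpus from both roles are counted together.
    val fits = getResource(offer, "cpus") >= driverCpu &&
      getResource(offer, "mem") >= driverMem
    println(s"driver fits: $fits") // prints: driver fits: true
  }
}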
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.util.concurrent.locks.ReentrantLock
 import java.util.{Collections, Date, List => JList}
 
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -425,9 +426,13 @@ private[spark] class MesosClusterScheduler(
     options
   }
 
-  private class ResourceOffer(val offer: Offer, var cpu: Double, var mem: Double) {
+  private class ResourceOffer(
+      val offerId: OfferID,
+      val slaveId: SlaveID,
+      var resources: JList[Resource],
+      var used: Boolean) {
     override def toString(): String = {
-      s"Offer id: ${offer.getId.getValue}, cpu: $cpu, mem: $mem"
+      s"Offer id: ${offerId}, resources: ${resources}"
     }
   }
 
@@ -439,34 +444,37 @@ private[spark] class MesosClusterScheduler(
   private def scheduleTasks(
       candidates: Seq[MesosDriverDescription],
       afterLaunchCallback: (String) => Boolean,
-      currentOffers: List[ResourceOffer],
-      tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
+      currentOffers: JList[ResourceOffer],
+      tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): JList[ResourceOffer] = {
     for (submission <- candidates) {
       val driverCpu = submission.cores
       val driverMem = submission.mem
       logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
-      val offerOption = currentOffers.find { o =>
-        o.cpu >= driverCpu && o.mem >= driverMem
+      val offerOption = currentOffers.asScala.find { o =>
+        getResource(o.resources, "cpus") >= driverCpu &&
+          getResource(o.resources, "mem") >= driverMem
       }
       if (offerOption.isEmpty) {
         logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
           s"cpu: $driverCpu, mem: $driverMem")
       } else {
         val offer = offerOption.get
-        offer.cpu -= driverCpu
-        offer.mem -= driverMem
+        offer.used = true
         val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
-        val cpuResource = createResource("cpus", driverCpu)
-        val memResource = createResource("mem", driverMem)
+        val (remainingResources, cpuResourcesToUse) =
+          partitionResources(offer.resources, "cpus", driverCpu)
+        val (finalResources, memResourcesToUse) =
+          partitionResources(remainingResources, "mem", driverMem)
         val commandInfo = buildDriverCommand(submission)
         val appName = submission.schedulerProperties("spark.app.name")
         val taskInfo = TaskInfo.newBuilder()
           .setTaskId(taskId)
           .setName(s"Driver for $appName")
-          .setSlaveId(offer.offer.getSlaveId)
+          .setSlaveId(offer.slaveId)
           .setCommand(commandInfo)
-          .addResources(cpuResource)
-          .addResources(memResource)
+          .addAllResources(cpuResourcesToUse)
+          .addAllResources(memResourcesToUse)
+        offer.resources = finalResources
         submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
           val container = taskInfo.getContainerBuilder()
           val volumes = submission.schedulerProperties
@@ -479,43 +487,43 @@ private[spark] class MesosClusterScheduler(
             container, image, volumes = volumes, portmaps = portmaps)
           taskInfo.setContainer(container.build())
         }
-        val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
+        val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
         queuedTasks += taskInfo.build()
-        logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
+        logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
           submission.submissionId)
-        val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,
+        val newState = new MesosClusterSubmissionState(submission, taskId, offer.slaveId,
           None, new Date(), None)
         launchedDrivers(submission.submissionId) = newState
         launchedDriversState.persist(submission.submissionId, newState)
         afterLaunchCallback(submission.submissionId)
       }
     }
+    currentOffers
   }
 
   override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
-    val currentOffers = offers.map { o =>
-      new ResourceOffer(
-        o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem"))
-    }.toList
-    logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}")
+    logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}")
     val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
     val currentTime = new Date()
 
+    var currentOffers = offers.asScala.map {
+      o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList, false)
+    }.toList.asJava
     stateLock.synchronized {
       // We first schedule all the supervised drivers that are ready to retry.
       // This list will be empty if none of the drivers are marked as supervise.
       val driversToRetry = pendingRetryDrivers.filter { d =>
         d.retryState.get.nextRetry.before(currentTime)
       }
 
-      scheduleTasks(
+      currentOffers = scheduleTasks(
        copyBuffer(driversToRetry),
        removeFromPendingRetryDrivers,
        currentOffers,
        tasks)
 
      // Then we walk through the queued drivers and try to schedule them.
-      scheduleTasks(
+      currentOffers = scheduleTasks(
        copyBuffer(queuedDrivers),
        removeFromQueuedDrivers,
        currentOffers,
@@ -524,9 +532,7 @@ private[spark] class MesosClusterScheduler(
     tasks.foreach { case (offerId, tasks) =>
       driver.launchTasks(Collections.singleton(offerId), tasks)
     }
-    offers
-      .filter(o => !tasks.keySet.contains(o.getId))
-      .foreach(o => driver.declineOffer(o.getId))
+    currentOffers.asScala.filter(!_.used).foreach(o => driver.declineOffer(o.offerId))
   }
 
   private def copyBuffer(
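The resourceOffers change above threads one set of ResourceOffer wrappers through both scheduling passes (supervised retries first, then the queue) and declines only the offers whose used flag never got set. A condensed sketch of that flow, with hypothetical stand-in types (OfferState, plain strings for drivers) instead of the real Mesos and Spark classes, and with resource sizing ignored for brevity:

import scala.collection.mutable.ArrayBuffer

// Stand-in for ResourceOffer: the real class also carries slave id and resources.
final class OfferState(val offerId: String, var used: Boolean = false)

object ResourceOffersFlowExample {
  // Mirrors the shape of scheduleTasks: mark the offers it consumes and hand the
  // same collection back so the next pass sees the updated state.
  def scheduleTasks(drivers: Seq[String], offers: ArrayBuffer[OfferState]): ArrayBuffer[OfferState] = {
    drivers.foreach { d =>
      offers.find(!_.used).foreach { o =>
        o.used = true
        println(s"launching $d on offer ${o.offerId}")
      }
    }
    offers
  }

  def main(args: Array[String]): Unit = {
    var offers = ArrayBuffer(new OfferState("o1"), new OfferState("o2"), new OfferState("o3"))
    offers = scheduleTasks(Seq("retry-driver-1"), offers)  // supervised retries first
    offers = scheduleTasks(Seq("queued-driver-1"), offers) // then the queued drivers
    offers.filter(!_.used).foreach(o => println(s"declining offer ${o.offerId}"))
  }
}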
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
 import java.util.{List => JList}
 import java.util.concurrent.CountDownLatch
 
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
@@ -166,7 +167,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   def partitionResources(
       resources: JList[Resource],
       resourceName: String,
-      amountToUse: Double): (List[Resource], List[Resource]) = {
+      amountToUse: Double): (JList[Resource], JList[Resource]) = {
     var remain = amountToUse
     var requestedResources = new ArrayBuffer[Resource]
     val remainingResources = resources.map {
@@ -189,7 +190,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     val filteredResources =
       remainingResources.filter(r => r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0)
 
-    (filteredResources.toList, requestedResources.toList)
+    (filteredResources.toList.asJava, requestedResources.toList.asJava)
   }
 
   /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
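Beyond the Java-list return type, what partitionResources computes is the heart of the multi-role support: it walks the resource list in order, takes what is still needed from each matching entry regardless of the role it is reserved under, and returns both the leftover and the consumed portions. A simplified, self-contained sketch of that consumption logic, again using a hypothetical SimpleResource stand-in rather than the Mesos protobuf Resource:

import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of a scalar Mesos resource (name, role, amount).
case class SimpleResource(name: String, role: String, amount: Double)

object PartitionResourcesExample {
  // Consume up to amountToUse of the named resource across all roles, returning
  // (what is left in the offer, what was taken for the task).
  def partition(
      resources: Seq[SimpleResource],
      name: String,
      amountToUse: Double): (Seq[SimpleResource], Seq[SimpleResource]) = {
    var remain = amountToUse
    val requested = new ArrayBuffer[SimpleResource]
    val remaining = resources.map { r =>
      if (remain > 0 && r.name == name) {
        val usage = math.min(remain, r.amount)
        requested += r.copy(amount = usage)
        remain -= usage
        r.copy(amount = r.amount - usage)
      } else {
        r
      }
    }
    // Like the real method, drop entries that were consumed down to zero.
    (remaining.filter(_.amount > 0.0), requested.toSeq)
  }

  def main(args: Array[String]): Unit = {
    val offer = Seq(
      SimpleResource("cpus", "*", 1.0),
      SimpleResource("cpus", "spark", 2.0),
      SimpleResource("mem", "*", 1024.0))
    val (left, used) = partition(offer, "cpus", 2.5)
    println(s"used: $used") // 1.0 cpus under "*", 1.5 cpus under "spark"
    println(s"left: $left") // 0.5 cpus under "spark", plus the untouched mem entry
  }
}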