From db83ac7246ed6fd183974eaf412847ac0c26b1ea Mon Sep 17 00:00:00 2001
From: Timothy Chen
Date: Tue, 22 Sep 2015 14:20:29 -0700
Subject: [PATCH] Support multiple roles with mesos cluster mode.

Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
---
 .../cluster/mesos/MesosClusterScheduler.scala | 58 ++++++++++---------
 .../cluster/mesos/MesosSchedulerUtils.scala   |  5 +-
 2 files changed, 35 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1206f184fbc82..3d4b597640cf0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.util.concurrent.locks.ReentrantLock
 import java.util.{Collections, Date, List => JList}
 
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -425,9 +426,13 @@ private[spark] class MesosClusterScheduler(
     options
   }
 
-  private class ResourceOffer(val offer: Offer, var cpu: Double, var mem: Double) {
+  private class ResourceOffer(
+      val offerId: OfferID,
+      val slaveId: SlaveID,
+      var resources: JList[Resource],
+      var used: Boolean) {
     override def toString(): String = {
-      s"Offer id: ${offer.getId.getValue}, cpu: $cpu, mem: $mem"
+      s"Offer id: ${offerId}, resources: ${resources}"
     }
   }
 
@@ -439,34 +444,37 @@ private[spark] class MesosClusterScheduler(
   private def scheduleTasks(
       candidates: Seq[MesosDriverDescription],
       afterLaunchCallback: (String) => Boolean,
-      currentOffers: List[ResourceOffer],
-      tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
+      currentOffers: JList[ResourceOffer],
+      tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): JList[ResourceOffer] = {
     for (submission <- candidates) {
       val driverCpu = submission.cores
       val driverMem = submission.mem
       logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
-      val offerOption = currentOffers.find { o =>
-        o.cpu >= driverCpu && o.mem >= driverMem
+      val offerOption = currentOffers.asScala.find { o =>
+        getResource(o.resources, "cpus") >= driverCpu &&
+          getResource(o.resources, "mem") >= driverMem
       }
       if (offerOption.isEmpty) {
         logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
           s"cpu: $driverCpu, mem: $driverMem")
       } else {
         val offer = offerOption.get
-        offer.cpu -= driverCpu
-        offer.mem -= driverMem
+        offer.used = true
         val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
-        val cpuResource = createResource("cpus", driverCpu)
-        val memResource = createResource("mem", driverMem)
+        val (remainingResources, cpuResourcesToUse) =
+          partitionResources(offer.resources, "cpus", driverCpu)
+        val (finalResources, memResourcesToUse) =
+          partitionResources(remainingResources, "mem", driverMem)
         val commandInfo = buildDriverCommand(submission)
         val appName = submission.schedulerProperties("spark.app.name")
         val taskInfo = TaskInfo.newBuilder()
           .setTaskId(taskId)
           .setName(s"Driver for $appName")
-          .setSlaveId(offer.offer.getSlaveId)
+          .setSlaveId(offer.slaveId)
           .setCommand(commandInfo)
-          .addResources(cpuResource)
-          .addResources(memResource)
+          .addAllResources(cpuResourcesToUse)
+          .addAllResources(memResourcesToUse)
+        offer.resources = finalResources
         submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
           val container = taskInfo.getContainerBuilder()
           val volumes = submission.schedulerProperties
@@ -479,28 +487,28 @@ private[spark] class MesosClusterScheduler(
             container, image, volumes = volumes, portmaps = portmaps)
           taskInfo.setContainer(container.build())
         }
-        val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
+        val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
         queuedTasks += taskInfo.build()
-        logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
+        logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
           submission.submissionId)
-        val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,
+        val newState = new MesosClusterSubmissionState(submission, taskId, offer.slaveId,
           None, new Date(), None)
         launchedDrivers(submission.submissionId) = newState
         launchedDriversState.persist(submission.submissionId, newState)
         afterLaunchCallback(submission.submissionId)
       }
     }
+    currentOffers
   }
 
   override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
-    val currentOffers = offers.map { o =>
-      new ResourceOffer(
-        o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem"))
-    }.toList
-    logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}")
+    logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}")
     val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
     val currentTime = new Date()
+    var currentOffers = offers.asScala.map {
+      o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList, false)
+    }.toList.asJava
 
     stateLock.synchronized {
       // We first schedule all the supervised drivers that are ready to retry.
       // This list will be empty if none of the drivers are marked as supervise.
@@ -508,14 +516,14 @@
         d.retryState.get.nextRetry.before(currentTime)
       }
 
-      scheduleTasks(
+      currentOffers = scheduleTasks(
         copyBuffer(driversToRetry),
         removeFromPendingRetryDrivers,
         currentOffers,
         tasks)
 
       // Then we walk through the queued drivers and try to schedule them.
-      scheduleTasks(
+      currentOffers = scheduleTasks(
         copyBuffer(queuedDrivers),
         removeFromQueuedDrivers,
         currentOffers,
@@ -524,9 +532,7 @@
     tasks.foreach { case (offerId, tasks) =>
       driver.launchTasks(Collections.singleton(offerId), tasks)
     }
-    offers
-      .filter(o => !tasks.keySet.contains(o.getId))
-      .foreach(o => driver.declineOffer(o.getId))
+    currentOffers.asScala.filter(!_.used).foreach(o => driver.declineOffer(o.offerId))
   }
 
   private def copyBuffer(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 5b854aa5c2754..7fed3cd8ea7ee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
 import java.util.{List => JList}
 import java.util.concurrent.CountDownLatch
 
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
@@ -166,7 +167,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   def partitionResources(
       resources: JList[Resource],
       resourceName: String,
-      amountToUse: Double): (List[Resource], List[Resource]) = {
+      amountToUse: Double): (JList[Resource], JList[Resource]) = {
     var remain = amountToUse
     var requestedResources = new ArrayBuffer[Resource]
     val remainingResources = resources.map {
@@ -189,7 +190,7 @@
 
     val filteredResources =
       remainingResources.filter(r => r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0)
-    (filteredResources.toList, requestedResources.toList)
+    (filteredResources.toList.asJava, requestedResources.toList.asJava)
   }
 
   /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
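Note, not part of the patch: the scheduling change above hinges on
partitionResources, which walks an offer's resource list and carves the
requested amount out of however many role reservations the offer contains.
Below is a minimal, self-contained Scala sketch of that behavior; the Res
case class is a hypothetical stand-in for the Mesos Resource protobuf, so
every name in it is illustrative rather than the real Mesos API.

  // Hypothetical stand-in for org.apache.mesos.Protos.Resource.
  case class Res(name: String, role: String, amount: Double)

  // Split `resources` into (remaining, used): consume up to `amountToUse`
  // of `resourceName`, drawing on each reservation in list order. This is
  // what lets one driver take cpus/mem from several roles in one offer.
  def partition(
      resources: List[Res],
      resourceName: String,
      amountToUse: Double): (List[Res], List[Res]) = {
    var remain = amountToUse
    val used = scala.collection.mutable.ArrayBuffer.empty[Res]
    val remaining = resources.map {
      case r if remain > 0 && r.name == resourceName =>
        val taken = math.min(r.amount, remain)
        remain -= taken
        used += r.copy(amount = taken)
        r.copy(amount = r.amount - taken)
      case r => r
    }
    // Drop fully consumed entries, as the real method drops zero-valued
    // scalar resources.
    (remaining.filter(r => r.name != resourceName || r.amount > 0.0), used.toList)
  }

  // Example: 2 cpus reserved for role "spark" plus 2 unreserved cpus; a
  // driver needing 3 cpus takes the whole reservation plus 1 unreserved cpu.
  val offer = List(Res("cpus", "spark", 2.0), Res("cpus", "*", 2.0))
  val (remaining, used) = partition(offer, "cpus", 3.0)
  // used      == List(Res("cpus","spark",2.0), Res("cpus","*",1.0))
  // remaining == List(Res("cpus","*",1.0))

This is also why scheduleTasks now returns the offer list: each launched
driver leaves its post-partition remainder on its ResourceOffer, so the
queued-drivers pass can reuse whatever the retry pass left over, and only
offers that end the round unused are declined.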