This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Support Scala 2.12 #724

Merged
merged 14 commits
Jul 25, 2017
10 changes: 5 additions & 5 deletions .travis.yml
@@ -7,16 +7,16 @@ env:

matrix:
include:
- scala: 2.10.6
jdk: oraclejdk8
script: ./sbt -Dlog4j.configuration=$LOG4J -DsequentialExecution=true ++$TRAVIS_SCALA_VERSION test mimaReportBinaryIssues

- scala: 2.11.8
- scala: 2.11.11
jdk: oraclejdk8
script: ./sbt -Dlog4j.configuration=$LOG4J -DsequentialExecution=true ++$TRAVIS_SCALA_VERSION clean coverage test coverageReport mimaReportBinaryIssues
after_success:
- bash <(curl -s https://codecov.io/bash)

- scala: 2.12.2
jdk: oraclejdk8
script: ./sbt -Dlog4j.configuration=$LOG4J -DsequentialExecution=true ++$TRAVIS_SCALA_VERSION clean test

cache:
directories:
- $HOME/.sbt/0.13/dependency
23 changes: 10 additions & 13 deletions build.sbt
@@ -4,7 +4,6 @@ import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
import sbtassembly.Plugin._

def scalaBinaryVersion(scalaVersion: String) = scalaVersion match {
case version if version startsWith "2.10" => "2.10"
case version if version startsWith "2.11" => "2.11"
case version if version startsWith "2.12" => "2.12"
case _ => sys.error("Unsupported scala version: " + scalaVersion)
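
Note: with the 2.10 case gone, this helper now fails fast on any 2.10.x input. A quick sanity sketch (illustrative, not part of the build):

scalaBinaryVersion("2.12.2")  // "2.12"
scalaBinaryVersion("2.11.11") // "2.11"
scalaBinaryVersion("2.10.6")  // throws: "Unsupported scala version: 2.10.6"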
@@ -15,25 +14,23 @@ def isScala210x(scalaVersion: String) = scalaBinaryVersion(scalaVersion) == "2.1
def sequentialExecution: Boolean =
Option(System.getProperty("sequentialExecution")).map(_.toBoolean).getOrElse(false)

val algebirdVersion = "0.12.0"
val bijectionVersion = "0.9.1"
val chillVersion = "0.8.3"
val commonsHttpClientVersion = "3.1"
val algebirdVersion = "0.13.0"
val bijectionVersion = "0.9.5"
val chillVersion = "0.8.4"
val commonsLangVersion = "2.6"
val finagleVersion = "6.35.0"
val hadoopVersion = "1.2.1"
val junitVersion = "4.11"
val log4jVersion = "1.2.16"
val novocodeJunitVersion = "0.10"
val scalaCheckVersion = "1.13.4"
val scalatestVersion = "3.0.1"
val scaldingVersion = "0.16.1-RC3"
val scaldingVersion = "0.17.2"
val slf4jVersion = "1.6.6"
val storehausVersion = "0.15.0-RC1"
val storehausVersion = "0.15.0"
val stormDep = "org.apache.storm" % "storm-core" % "1.0.2"
val tormentaVersion = "0.12.0"
val utilVersion = "6.34.0"
val chainVersion = "0.1.0"
val utilVersion = "6.43.0"
val chainVersion = "0.2.0"
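
Most of the bumps above pick up releases that also publish Scala 2.12 artifacts: anything pulled in with %% needs a _2.12 build to exist. A minimal sketch of how the binary suffix is chosen (illustrative, not a line from this build):

libraryDependencies += "com.twitter" %% "algebird-core" % algebirdVersion
// resolves to algebird-core_2.11 or algebird-core_2.12, depending on scalaVersion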

val extraSettings = mimaDefaultSettings

@@ -50,8 +47,8 @@ val executionSettings = if (sequentialExecution) {

val sharedSettings = extraSettings ++ executionSettings ++ Seq(
organization := "com.twitter",
scalaVersion := "2.11.7",
crossScalaVersions := Seq("2.10.5", "2.11.7"),
scalaVersion := "2.11.11",
crossScalaVersions := Seq("2.11.11", "2.12.2"),
// To support hadoop 1.x
javacOptions ++= Seq("-source", "1.6", "-target", "1.6"),
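
With both 2.11.11 and 2.12.2 in crossScalaVersions, ./sbt +test runs the build against each version, while the Travis matrix pins one at a time via ++$TRAVIS_SCALA_VERSION. If a version-specific source tree is ever needed, a common companion pattern is (hypothetical, not part of this diff):

unmanagedSourceDirectories in Compile += {
  val base = (sourceDirectory in Compile).value
  // reuse the build's own helper to key the directory off the binary version
  if (scalaBinaryVersion(scalaVersion.value) == "2.12") base / "scala-2.12"
  else base / "scala-2.11"
}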

@@ -188,7 +185,7 @@ def youngestForwardCompatible(subProj: String) =
// Uncomment after release.
// Some(subProj)
// .filterNot(unreleasedModules.contains(_))
Collaborator: should we delete these commented lines?

Collaborator: no, we need to uncomment after each version bump. Deleting seems like a good way to keep making mima mistakes like the scalding one.

// .map { s => "com.twitter" % ("summingbird-" + s + "_2.10") % "0.9.0" }
// .map { s => "com.twitter" % ("summingbird-" + s + "_2.11") % "0.9.0" }

/**
* Empty this each time we publish a new version (and bump the minor number)
@@ -81,7 +81,8 @@ class HDFSStateLaws extends WordSpec {
}
}

def leftClosedRightOpenInterval(low: Timestamp, high: Timestamp) = Interval.leftClosedRightOpen[Timestamp](low, high).right.get
def leftClosedRightOpenInterval(low: Timestamp, high: Timestamp): Interval[Timestamp] =
Interval.leftClosedRightOpen[Timestamp](low, high)

def shouldNotAcceptInterval(state: WaitingState[Interval[Timestamp]], interval: Interval[Timestamp], message: String = "PreparedState accepted a bad Interval!") = {
state.begin.willAccept(interval) match {
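
The dropped .right.get tracks the algebird bump above: Interval.leftClosedRightOpen now yields a value usable as an Interval[Timestamp] directly, rather than an Either. A small sketch of the resulting semantics (illustrative, assuming algebird 0.13 as pinned in build.sbt):

import com.twitter.algebird.Interval
import com.twitter.summingbird.batch.Timestamp

val interval: Interval[Timestamp] = Interval.leftClosedRightOpen(Timestamp(0L), Timestamp(100L))
assert(interval.contains(Timestamp(0L)))    // lower bound is included
assert(!interval.contains(Timestamp(100L))) // upper bound is excluded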
@@ -53,16 +53,15 @@ object Timestamp {
implicit val timestamp2Long: Bijection[Timestamp, Long] =
Bijection.build[Timestamp, Long] { _.milliSinceEpoch } { Timestamp(_) }

implicit val timestampSuccessible: Successible[Timestamp] = new Successible[Timestamp] {
// Workaround for https://github.com/twitter/algebird/issues/635
implicit val timestampSuccessible: Successible[Timestamp] = new Successible[Timestamp] with Serializable {
def next(old: Timestamp) = if (old.milliSinceEpoch != Long.MaxValue) Some(old.next) else None
def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp
def partialOrdering = Timestamp.orderingOnTimestamp
}

implicit val timestampPredecessible: Predecessible[Timestamp] = new Predecessible[Timestamp] {
def prev(old: Timestamp) = if (old.milliSinceEpoch != Long.MinValue) Some(old.prev) else None
def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp
def partialOrdering = Timestamp.orderingOnTimestamp
}

// This is a right semigroup: given any two Timestamps, it just takes the one on the right.
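
On the `with Serializable` mixin: per the linked algebird issue, the anonymous Successible instance can end up captured inside job state that Hadoop/Storm serialize, and a plain anonymous class is not java-serializable. A tiny check of what the mixin buys (hypothetical helper, not from this PR):

import java.io.{ ByteArrayOutputStream, ObjectOutputStream }

def assertJavaSerializable(a: AnyRef): Unit = {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try out.writeObject(a) // throws NotSerializableException if `a` is not Serializable
  finally out.close()
}

assertJavaSerializable(Timestamp.timestampSuccessible) // would fail without `with Serializable`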
@@ -81,7 +81,7 @@ private[scalding] class VersionedState(meta: HDFSMetadata, startDate: Option[Tim
Interval.leftClosedRightOpen(
batcher.earliestTimeOf(beginning),
batcher.earliestTimeOf(end)
).right.get
)
}

def willAccept(available: Interval[Timestamp]) =
@@ -80,9 +80,9 @@ class ClientStoreProps extends Properties("ClientStore") {
val offline = Future.value(Some((b, 0)))
val nextB = BatchID(b.id + offset)
if (offset >= 0) {
Await.result(ClientStore.offlineLTEQBatch(0, nextB, offline)) == offline.get
Await.result(ClientStore.offlineLTEQBatch(0, nextB, offline)) == Await.result(offline)
} else {
Await.ready(ClientStore.offlineLTEQBatch(0, nextB, offline)).isThrow
Await.result(ClientStore.offlineLTEQBatch(0, nextB, offline).liftToTry).isThrow
}
}
}
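
The failure branch now goes through liftToTry, which turns a Future[T] into a Future[Try[T]] that always succeeds, so Await.result never throws and the Try can be inspected. A standalone sketch of the pattern (illustrative, not from this PR):

import com.twitter.util.{ Await, Future, Return, Throw }

val failed: Future[Int] = Future.exception(new RuntimeException("boom"))
Await.result(failed.liftToTry) match {
  case Throw(e)  => println("failed as expected: " + e.getMessage)
  case Return(v) => println("unexpected success: " + v)
}
// `.isThrow`, as used in the test above, is shorthand for matching on Throw.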
@@ -0,0 +1,26 @@
package com.twitter.summingbird

import org.scalacheck.{ Arbitrary, Gen }

/**
* The [[org.scalacheck.GenArities]] and [[org.scalacheck.ArbitraryArities]] classes from scalacheck
* contain too many lambdas in the same file, which triggers a bug with function
* serialization in Scala 2.12: https://issues.scala-lang.org/browse/SI-10232
*
* As a workaround, this object provides overridden implicits for those cases.
* It should be imported wherever [[Arbitrary]] is imported.
*
* The ScalaCheck issue is tracked at https://github.com/rickynils/scalacheck/issues/342.
*/
object ArbitraryWorkaround {
implicit val f1: Arbitrary[Int => Int] = Arbitrary(Gen.const(x => x * 2))
implicit val f2: Arbitrary[Int => List[(Int, Int)]] = Arbitrary(Gen.const(x => List((x, x * 3))))
implicit val f3: Arbitrary[Int => Option[Int]] = Arbitrary(Gen.const(x => {
if (x % 2 == 0) None else Some(x * 4)
}))
implicit val f4: Arbitrary[Int => List[Int]] = Arbitrary(Gen.const(x => List(x * 5)))
implicit val f5: Arbitrary[((Int, (Int, Option[Int]))) => List[(Int, Int)]] = Arbitrary(Gen.const {
case (x, (y, optZ)) => List((x, y), (x, optZ.getOrElse(42)))
})
implicit val f6: Arbitrary[((Int, Int)) => List[(Int, Int)]] = Arbitrary(Gen.const(x => List(x, x)))
}
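
Because explicitly imported implicits are found before those resolved from the Arbitrary companion's implicit scope, importing this object is enough to route function generation through the fixed-arity instances above. A usage sketch (illustrative):

import org.scalacheck.Prop
import com.twitter.summingbird.ArbitraryWorkaround._

val prop = Prop.forAll { (f: Int => Option[Int], xs: List[Int]) =>
  xs.flatMap(f).size <= xs.size // picks up `f3` above instead of scalacheck's lambda-heavy derived instance
}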
@@ -23,6 +23,7 @@ import com.twitter.conversions.time._
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.memcached.KetamaClientBuilder
import com.twitter.finagle.memcached.protocol.text.Memcached
import com.twitter.finagle.transport.Transport
import com.twitter.storehaus.Store
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.storehaus.memcache.{ HashEncoder, MemcacheStore }
@@ -43,12 +44,14 @@ object Memcache {
.tcpConnectTimeout(DEFAULT_TIMEOUT)
.requestTimeout(DEFAULT_TIMEOUT)
.connectTimeout(DEFAULT_TIMEOUT)
.readerIdleTimeout(DEFAULT_TIMEOUT)
.hostConnectionLimit(1)
.codec(Memcached())

val liveness = builder.params[Transport.Liveness].copy(readTimeout = DEFAULT_TIMEOUT)
val liveBuilder = builder.configured(liveness)

KetamaClientBuilder()
.clientBuilder(builder)
.clientBuilder(liveBuilder)
.nodes("localhost:11211")
.build()
}
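
The removed .readerIdleTimeout(...) call is replaced by finagle's Stack-param API, which the two new lines use. An annotated restatement of that pattern (same identifiers as above; a sketch, not additional behavior):

val liveness    = builder.params[Transport.Liveness]           // read the current Liveness param
val withTimeout = liveness.copy(readTimeout = DEFAULT_TIMEOUT) // params are case classes: copy only what changes
val liveBuilder = builder.configured(withTimeout)              // hand the updated param back to the builder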
@@ -24,7 +24,9 @@ import com.twitter.summingbird.SummingbirdRuntimeStats

import com.twitter.scalding.{ Test => TestMode, _ }

import org.scalacheck._
import org.scalacheck.Arbitrary
import com.twitter.summingbird.ArbitraryWorkaround._
import org.scalacheck.Cogen

import org.apache.hadoop.conf.Configuration

@@ -27,7 +27,8 @@ import java.io.File

import com.twitter.scalding._

import org.scalacheck._
import org.scalacheck.Arbitrary
import com.twitter.summingbird.ArbitraryWorkaround._

import org.scalatest.WordSpec

@@ -20,7 +20,8 @@ import com.twitter.summingbird._
import com.twitter.summingbird.batch.Batcher
import com.twitter.summingbird.online.option.LeftJoinGrouping
import org.scalatest.WordSpec
import org.scalacheck._
import org.scalacheck.Arbitrary
import com.twitter.summingbird.ArbitraryWorkaround._

/**
* Tests for Summingbird's Storm planner.
@@ -23,7 +23,7 @@ import com.twitter.summingbird.batch.Batcher
import com.twitter.summingbird.storm.spout.TraversableSpout
import org.scalatest.WordSpec
import org.scalacheck._
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
/**
* Tests for Summingbird's Storm planner.
*/
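
The switch from JavaConversions to JavaConverters is the standard Scala 2.12-era cleanup: conversions become explicit, which is also why the map lookups below change from bolts("Tail") to the Java-side bolts.get("Tail"). A small standalone sketch (illustrative, not from this PR):

import scala.collection.JavaConverters._

val bolts = new java.util.HashMap[String, Int]()
bolts.put("Tail", 7)

bolts.get("Tail")          // plain Java API, no conversion needed
bolts.asScala.get("Tail")  // explicit opt-in: Scala Map view, returns Option[Int]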
@@ -102,9 +102,9 @@ class TopologyTests extends WordSpec {
val bolts = stormTopo.get_bolts
val spouts = stormTopo.get_spouts
assert(bolts.size == 1 && spouts.size == 1)
assert(bolts("Tail").get_common.get_parallelism_hint == 7)
assert(bolts.get("Tail").get_common.get_parallelism_hint == 7)

val spout = spouts.head._2
val spout = spouts.asScala.head._2
assert(spout.get_common.get_parallelism_hint == 10)
}

@@ -132,8 +132,8 @@ class TopologyTests extends WordSpec {
val spouts = stormTopo.get_spouts

assert(stormTopo.get_bolts_size == 1 && stormTopo.get_spouts_size == 1)
assert(spouts.head._2.get_common.get_parallelism_hint == 10)
assert(bolts("Tail").get_common.get_parallelism_hint == 7)
assert(spouts.asScala.head._2.get_common.get_parallelism_hint == 10)
assert(bolts.get("Tail").get_common.get_parallelism_hint == 7)
}

/*
@@ -229,7 +229,7 @@ class TopologyTests extends WordSpec {
val bolts = stormTopo.get_bolts

// Tail will have 1 -, distance from there should be onwards
val TDistMap = bolts.map { case (k, v) => (k.split("-").size - 1, v) }
val TDistMap = bolts.asScala.map { case (k, v) => (k.split("-").size - 1, v) }

assert(TDistMap(1).get_common.get_parallelism_hint == 50)
}
@@ -250,7 +250,7 @@ class TopologyTests extends WordSpec {
val bolts = stormTopo.get_bolts

// Tail will have 1 -, distance from there should be onwards
val TDistMap = bolts.map { case (k, v) => (k.split("-").size - 1, v) }
val TDistMap = bolts.asScala.map { case (k, v) => (k.split("-").size - 1, v) }

assert(TDistMap(1).get_common.get_parallelism_hint == 50)
}
@@ -270,7 +270,7 @@ class TopologyTests extends WordSpec {
val bolts = stormTopo.get_bolts

// Tail will have 1 -, distance from there should be onwards
val TDistMap = bolts.map { case (k, v) => (k.split("-").size - 1, v) }
val TDistMap = bolts.asScala.map { case (k, v) => (k.split("-").size - 1, v) }

assert(TDistMap(1).get_common.get_parallelism_hint == 50)
}
@@ -286,7 +286,7 @@ class TopologyTests extends WordSpec {
val stormTopo = storm.plan(p).topology
// Source producer
val bolts = stormTopo.get_bolts
val spouts = stormTopo.get_spouts
val spouts = stormTopo.get_spouts.asScala
val spout = spouts.head._2

assert(spout.get_common.get_parallelism_hint == 30)
@@ -307,7 +307,7 @@ class TopologyTests extends WordSpec {
val bolts = stormTopo.get_bolts

// Tail will have 1 -, distance from there should be onwards
val TDistMap = bolts.map { case (k, v) => (k.split("-").size - 1, v) }
val TDistMap = bolts.asScala.map { case (k, v) => (k.split("-").size - 1, v) }

assert(TDistMap(0).get_common.get_parallelism_hint == 5)
}