diff --git a/.gitignore b/.gitignore index 8d81817..169fd63 100644 --- a/.gitignore +++ b/.gitignore @@ -2,13 +2,10 @@ .idea/ .idea_modules/ /project/boot/ -/project/target/ -/target/ -/project/plugins/target/ +target/ /project/plugins/project/boot/ -/project/plugins/project/target/ .DS_Store *.swp *.swo *.xml -/project/project/target/ +*~ diff --git a/build.sbt b/build.sbt index 46e7717..c425a9d 100644 --- a/build.sbt +++ b/build.sbt @@ -1,29 +1,20 @@ -organization := "io.mental" - -version := "0.0.1" - name := "palamedes" - -scalaVersion := "2.10.4" +organization := "io.mental" resolvers += "boundary" at "http://maven.boundary.com/artifactory/repo" libraryDependencies ++= Seq( "com.boundary" % "high-scale-lib" % "1.0.3", -"org.scalatest" % "scalatest_2.10" % "2.2.0" % "test" withSources() withJavadoc(), -"org.scalacheck" %% "scalacheck" % "1.10.0" % "test" withSources() withJavadoc(), -"com.rojoma" % "rojoma-json-v3_2.10" % "3.1.2" withSources() withJavadoc(), -"org.apache.commons" % "commons-lang3" % "3.3.2", -"joda-time" % "joda-time" % "2.1", - "org.joda" % "joda-convert" % "1.7", -"org.mockito" % "mockito-core" % "1.9.5" withSources() withJavadoc(), -"com.twitter" % "algebird_2.10" % "0.7.0" withSources() withJavadoc(), -"com.twitter" % "algebird-core_2.10" % "0.7.0" withSources() withJavadoc(), -"com.twitter" % "algebird-util_2.10" % "0.7.0" withSources() withJavadoc() - ) - - + "org.scalacheck" %% "scalacheck" % "1.10.0" % "test" withSources() withJavadoc(), + "com.rojoma" % "rojoma-json-v3_2.10" % "3.1.2" withSources() withJavadoc(), + "org.apache.commons" % "commons-lang3" % "3.3.2", + "joda-time" % "joda-time" % "2.1", + "org.joda" % "joda-convert" % "1.7", + "org.mockito" % "mockito-core" % "1.9.5" withSources() withJavadoc(), + "com.twitter" % "algebird_2.10" % "0.7.0" withSources() withJavadoc(), + "com.twitter" % "algebird-core_2.10" % "0.7.0" withSources() withJavadoc(), + "com.twitter" % "algebird-util_2.10" % "0.7.0" withSources() withJavadoc() +) 
+ +scalacOptions ++= Seq("-optimize") testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-oD") - - -scalacOptions ++= Seq("-optimize","-deprecation","-feature") diff --git a/project/build.properties b/project/build.properties index be6c454..a6e117b 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.13.5 +sbt.version=0.13.8 diff --git a/project/build.sbt b/project/build.sbt new file mode 100644 index 0000000..b63c00d --- /dev/null +++ b/project/build.sbt @@ -0,0 +1,6 @@ +resolvers ++= Seq( + "socrata maven" at "https://repository-socrata-oss.forge.cloudbees.com/release" +) + +addSbtPlugin("com.socrata" % "socrata-sbt-plugins" % "1.5.3") + diff --git a/src/main/scala/io/abacus/counter/RollingCounter.scala b/src/main/scala/io/abacus/counter/RollingCounter.scala index 67f4d9b..f096174 100644 --- a/src/main/scala/io/abacus/counter/RollingCounter.scala +++ b/src/main/scala/io/abacus/counter/RollingCounter.scala @@ -1,99 +1,81 @@ package io.abacus.counter -import org.cliffc.high_scale_lib.NonBlockingHashMap -import org.cliffc.high_scale_lib.Counter +import org.cliffc.high_scale_lib.{Counter, NonBlockingHashMap} -import scala.collection.mutable.HashMap +import scala.collection.mutable.{HashMap => MHashMap} class RollingCounter[T](buckets:Int) { private val data = new NonBlockingHashMap[T,Array[Counter]]() private val currentBucket = new Counter - - def increment(thing:T) = { + def increment(thing: T): Unit = { val index = currentBucket.longValue % buckets incrementWithBucket(thing,index.toInt) } - - def incrementWithBucket(thing:T, bucket:Int) = { + def incrementWithBucket(thing: T, bucket: Int): Unit = { val value = getBuckets(thing) - value(bucket).increment - + value(bucket).increment() } - - def count(thing:T):Long = { - val array = data.get(thing) - if(array == null) 0L - else { - var i = 0; - var sum = 0L; - while( i < buckets) { - sum += array(i).estimate_get - i = i+1 - } - sum + def count(thing: T): Long = 
{ + Option(data.get(thing)) match { + case None => 0L + case Some(array) => + var i = 0 + var sum = 0L + while( i < buckets) { + sum += array(i).estimate_get + i = i + 1 + } + sum } } - def counts:Map[T,Long] = { + def counts: Map[T,Long] = { val keys = data.keySet - val output = new HashMap[T,Long]() + val output = new MHashMap[T,Long] val it = keys.iterator - while(it.hasNext) { + while (it.hasNext) { val thing = it.next output.put(thing,count(thing)) } output.toMap - } - def advanceBucket() { - resetAllCountsForBucket(((currentBucket.get+1L) % buckets).toInt) - currentBucket.increment - - + def advanceBucket(): Unit = { + resetAllCountsForBucket(((currentBucket.get + 1L) % buckets).toInt) + currentBucket.increment() } - - - - - - - def resetAllCountsForBucket(bucket:Int) { + def resetAllCountsForBucket(bucket: Int): Unit = { val keys = data.keySet val it = keys.iterator - while(it.hasNext) { + while (it.hasNext) { val thing = it.next resetCountForBucket(thing,bucket) } - } - def resetCountForBucket(thing:T,bucket:Int) = { + def resetCountForBucket(thing: T, bucket: Int): Unit = { val value = getBuckets(thing) value(bucket) = new Counter } - - - private def getBuckets(thing:T) = { - val array = data.get(thing) - if(array == null) initialCountsMaybe(thing) else array + private def getBuckets(thing: T): Array[Counter] = { + Option(data.get(thing)) match { + case None => initialCountsMaybe(thing) + case Some(array) => array + } } - private def initialCountsMaybe(thing:T):Array[Counter] ={ + private def initialCountsMaybe(thing: T): Array[Counter] ={ val array = Array.fill[Counter](buckets)(new Counter) - val previous = data.putIfAbsent(thing,array) - if(previous == null) + Option(data.putIfAbsent(thing,array)) match { // This created the array, so return reference to array - array - else - previous - + case None => array + case Some(previous) => previous + } } - } diff --git a/src/main/scala/io/abacus/counter/TimeWindowedCounter.scala 
b/src/main/scala/io/abacus/counter/TimeWindowedCounter.scala index e42b7f3..c7ee00d 100644 --- a/src/main/scala/io/abacus/counter/TimeWindowedCounter.scala +++ b/src/main/scala/io/abacus/counter/TimeWindowedCounter.scala @@ -1,30 +1,28 @@ package io.abacus.counter -import java.util.concurrent.{TimeUnit, Executors} +import java.util.concurrent.{Executors, TimeUnit} // window is size of the window in milliseconds and // granularity is the size of each interval in milliseconds // window/granularity should be even, if not, we round up -class TimeWindowedCounter[T](window:Long,granularity:Long) { +class TimeWindowedCounter[T](window: Long, granularity: Long) { private val buckets = window/granularity private val counter = new RollingCounter[T](buckets.toInt) private val scheduledThreadPool = Executors.newScheduledThreadPool(1) - scheduledThreadPool.scheduleWithFixedDelay(new HeartBeat(counter),granularity,granularity,TimeUnit.MILLISECONDS) + scheduledThreadPool.scheduleWithFixedDelay(HeartBeat(counter), granularity, granularity, TimeUnit.MILLISECONDS) - def increment(thing:T) = counter.increment(thing) - def counts = counter.counts + def increment(thing: T): Unit = counter.increment(thing) + def counts: Map[T,Long] = counter.counts - def stop = { - scheduledThreadPool.shutdown - scheduledThreadPool.awaitTermination(granularity,TimeUnit.MILLISECONDS) + def stop: Map[T,Long] = { + scheduledThreadPool.shutdown() + scheduledThreadPool.awaitTermination(granularity, TimeUnit.MILLISECONDS) counts } - class HeartBeat(counter:RollingCounter[T]) extends Runnable { - def run() = { - counter.advanceBucket + case class HeartBeat(counter: RollingCounter[T]) extends Runnable { + def run(): Unit = { + counter.advanceBucket() } } - - } diff --git a/src/main/scala/io/abacus/pipeline/CardinalityEstimationPipeline.scala b/src/main/scala/io/abacus/pipeline/CardinalityEstimationPipeline.scala index 21f1f7f..13af4a1 100644 --- 
a/src/main/scala/io/abacus/pipeline/CardinalityEstimationPipeline.scala +++ b/src/main/scala/io/abacus/pipeline/CardinalityEstimationPipeline.scala @@ -5,12 +5,10 @@ import io.abacus.soroban.elements.Element import scala.{specialized => spec} - - - -class CardinalityEstimationPipeline[@spec (Int) T]()(implicit ev: Element[T]) extends Pipeline[T,T,Long] { - val hll = new HyperLogLogMonoid(12) - var sumHll:HLL = hll.zero +class CardinalityEstimationPipeline[@spec(Int) T]()(implicit ev: Element[T]) extends Pipeline[T,T,Long] { + private val hllBits = 12 + val hll = new HyperLogLogMonoid(hllBits) + var sumHll: HLL = hll.zero override def results: Long = { val approxSize = hll.sizeOf(sumHll) approxSize.estimate diff --git a/src/main/scala/io/abacus/pipeline/DateParsingPipeline.scala b/src/main/scala/io/abacus/pipeline/DateParsingPipeline.scala index d7a4bf1..b5f6279 100644 --- a/src/main/scala/io/abacus/pipeline/DateParsingPipeline.scala +++ b/src/main/scala/io/abacus/pipeline/DateParsingPipeline.scala @@ -6,10 +6,9 @@ import io.abacus.soroban.elements.Element import org.joda.time.DateTime import org.joda.time.format.{DateTimeFormat, ISODateTimeFormat} - -class DateParsingPipeline[T]()(implicit ev:Element[T]) extends Transformer[T,Option[DateTime]] { - import io.abacus.pipeline.DateParsingPipeline._ - override def process(elem:T):Option[DateTime] = { +class DateParsingPipeline[T]()(implicit ev: Element[T]) extends Transformer[T,Option[DateTime]] { + import io.abacus.pipeline.DateParsingPipeline._ // scalastyle:ignore import.grouping + override def process(elem: T): Option[DateTime] = { try { val date = isoFmt.parseDateTime(elem.toString) Some(date) @@ -19,44 +18,40 @@ class DateParsingPipeline[T]()(implicit ev:Element[T]) extends Transformer[T,Opt try { val date = humanFmt.parseLocalDate(elem.toString).toDateTimeAtStartOfDay Some(date) - } - catch { + } catch { case e: Exception => None } } } - } - object DateParsingPipeline { val isoFmt = 
ISODateTimeFormat.dateTimeNoMillis() val humanFmt = DateTimeFormat.forPattern("MM/dd/yyyy") - - } - -case class DateCounts(years:Long, yearMonths:Long, yearMonthDays:Long) +case class DateCounts(years: Long, yearMonths: Long, yearMonthDays: Long) object DateCounts { implicit val codec = AutomaticJsonCodecBuilder[DateCounts] } class DateCardinalityPipeline() extends Pipeline[Option[DateTime], Option[DateTime], DateCounts ] { - import io.abacus.pipeline.DateCardinalityPipeline._ - val hllYear = new HyperLogLogMonoid(12) - val hllYearMonth = new HyperLogLogMonoid(12) - val hllYearMonthDay = new HyperLogLogMonoid(12) + import io.abacus.pipeline.DateCardinalityPipeline._ // scalastyle:ignore import.grouping + private val hllbits = 12 + val hllYear = new HyperLogLogMonoid(hllbits) + val hllYearMonth = new HyperLogLogMonoid(hllbits) + val hllYearMonthDay = new HyperLogLogMonoid(hllbits) var sumHllyear: HLL = hllYear.zero var sumHllyearMonth: HLL = hllYearMonth.zero var sumHllyearMonthDay: HLL = hllYearMonthDay.zero override def results: DateCounts = { - - DateCounts(hllYear.sizeOf(sumHllyear).estimate, hllYearMonth.sizeOf(sumHllyearMonth).estimate,hllYearMonthDay.sizeOf(sumHllyearMonthDay).estimate) + DateCounts(hllYear.sizeOf(sumHllyear).estimate, + hllYearMonth.sizeOf(sumHllyearMonth).estimate, + hllYearMonthDay.sizeOf(sumHllyearMonthDay).estimate) } - override def process(elem: Option[DateTime]) = { + override def process(elem: Option[DateTime]): Option[DateTime] = { elem match { case Some(dt) => val year = yearFmt.print(dt) @@ -69,10 +64,8 @@ class DateCardinalityPipeline() extends Pipeline[Option[DateTime], Option[DateTi sumHllyearMonth = hllYearMonth.plus(sumHllyearMonth, itemYearMonth) sumHllyearMonthDay = hllYearMonthDay.plus(sumHllyearMonthDay, itemYearMonthDay) case None => - } elem - } } @@ -80,4 +73,4 @@ object DateCardinalityPipeline { val yearFmt = ISODateTimeFormat.year() val yearMonthFmt = ISODateTimeFormat.yearMonth() val yearMonthDayFmt = 
ISODateTimeFormat.yearMonthDay() -} \ No newline at end of file +} diff --git a/src/main/scala/io/abacus/pipeline/Pipeline.scala b/src/main/scala/io/abacus/pipeline/Pipeline.scala index cfa3ff2..e8ae6d6 100644 --- a/src/main/scala/io/abacus/pipeline/Pipeline.scala +++ b/src/main/scala/io/abacus/pipeline/Pipeline.scala @@ -1,53 +1,44 @@ package io.abacus.pipeline +// scalastyle:off spaces.after.plus spaces.before.plus trait Pipeline[-I,+O,+R] { self => - def andThen[O2, U >: R](next:Pipeline[O,O2,U]):Pipeline[I,O2,U] = { + def andThen[O2, U >: R](next: Pipeline[O,O2,U]): Pipeline[I,O2,U] = { new Pipeline[I,O2,U] { - override def process(elem:I):O2 = { + override def process(elem: I): O2 = { val o = self.process(elem) next.process(o) } - override def results:U = next.results - + override def results: U = next.results } } - def alongWith[U <:I, T >: O, R2](and:Pipeline[U,T,R2]):Pipeline[U,T,(R,R2)] = { + def alongWith[U <:I, T >: O, R2](and: Pipeline[U,T,R2]): Pipeline[U,T,(R,R2)] = { new Pipeline[U,T,(R,R2)] { - override def process(elem:U):T = { + override def process(elem: U): T = { self.process(elem) and.process(elem) } - override def results:(R,R2) = (self.results, and.results) + override def results: (R,R2) = (self.results, and.results) } } - - def pipeResults[O2,R2](next:Pipeline[R,O2,R2]):Pipeline[I,O2,R2] = { + + def pipeResults[O2,R2](next: Pipeline[R,O2,R2]): Pipeline[I,O2,R2] = { new Pipeline[I,O2,R2] { - override def process(elem:I) = { + override def process(elem: I): O2 = { self.process(elem) next.process(self.results) } - override def results:R2 = next.results + override def results: R2 = next.results } } - - def results:R - def process(elem:I):O + def results: R + def process(elem: I): O } - - - trait Transformer[-I,+O] extends Pipeline[I,O,Nothing] { - override def results:Nothing = ??? + override def results: Nothing = ??? 
} - - - - - diff --git a/src/main/scala/io/abacus/pipeline/TopKPipeline.scala b/src/main/scala/io/abacus/pipeline/TopKPipeline.scala index 1ebc182..2af0175 100644 --- a/src/main/scala/io/abacus/pipeline/TopKPipeline.scala +++ b/src/main/scala/io/abacus/pipeline/TopKPipeline.scala @@ -7,24 +7,23 @@ import io.abacus.soroban.elements.Element import scala.{specialized => spec} - // NOT THREADSAFE BEFORE INITIAL SETUP -class TopKPipeline[@spec (Int) T](k:Int)(implicit ev: Element[T]) extends Pipeline[T,T,Seq[(T,Long)]] { +class TopKPipeline[@spec(Int) T](k: Int)(implicit ev: Element[T]) extends Pipeline[T,T,Seq[(T,Long)]] { + private val summaryCap = 32 val sssm = new SpaceSaverSemigroup[T] - val processedFirst= new AtomicBoolean(false) - var summary:SpaceSaver[T] = _ - def process(elem:T):T = { - if(processedFirst.get()) { - summary = sssm.plus(summary,SpaceSaver(32,elem)) + val processedFirst = new AtomicBoolean(false) + var summary: SpaceSaver[T] = _ + def process(elem: T): T = { + if (processedFirst.get()) { + summary = sssm.plus(summary,SpaceSaver(summaryCap,elem)) } else { - summary = SpaceSaver(32, elem) + summary = SpaceSaver(summaryCap, elem) processedFirst.set(true) } elem } - def results:Seq[(T, Long)] = summary.topK(k).map(v => (v._1, v._2.estimate)) - + def results: Seq[(T, Long)] = summary.topK(k).map(v => (v._1, v._2.estimate)) } diff --git a/src/main/scala/io/abacus/pipeline/YearFinderPipeline.scala b/src/main/scala/io/abacus/pipeline/YearFinderPipeline.scala index cc5fc21..9e6606d 100644 --- a/src/main/scala/io/abacus/pipeline/YearFinderPipeline.scala +++ b/src/main/scala/io/abacus/pipeline/YearFinderPipeline.scala @@ -6,26 +6,22 @@ import io.abacus.soroban.elements.Element import scala.{specialized => spec} - -class YearFinderPipeline[@spec(Int) T](minYear:Int, maxYear:Int, threshold:Double)(implicit ev:Element[T]) extends Pipeline[T,Option[Int],Boolean] { +class YearFinderPipeline[@spec(Int) T](minYear:Int, maxYear:Int, threshold: Double)(implicit 
ev:Element[T]) + extends Pipeline[T,Option[Int],Boolean] { val possibleYearCount = new AtomicLong(0) val totalElements = new AtomicLong() - def process(elem:T) = { + def process(elem:T): Option[Int] = { totalElements.incrementAndGet() val possibleInt = ev.toInt(elem) possibleInt match { - case Some(pi) => - if (minYear <= pi && pi <= maxYear) { - possibleYearCount.incrementAndGet() - } + case Some(pi) => if (minYear <= pi && pi <= maxYear) possibleYearCount.incrementAndGet() case None => } possibleInt } - def results:Boolean = { + def results: Boolean = { possibleYearCount.get().toDouble/totalElements.get >= threshold } - } diff --git a/src/main/scala/io/abacus/pipeline/elements/Elements.scala b/src/main/scala/io/abacus/pipeline/elements/Element.scala similarity index 63% rename from src/main/scala/io/abacus/pipeline/elements/Elements.scala rename to src/main/scala/io/abacus/pipeline/elements/Element.scala index 04e1fee..36c7ae0 100644 --- a/src/main/scala/io/abacus/pipeline/elements/Elements.scala +++ b/src/main/scala/io/abacus/pipeline/elements/Element.scala @@ -3,18 +3,18 @@ package io.abacus.soroban.elements import java.nio.ByteBuffer import java.text.NumberFormat import java.util.Locale + import scala.{specialized => spec} trait Element[@spec(Int) T] { - def toBytes(t: T):Array[Byte] - def toInt(t: T):Option[Int] + def toBytes(t: T): Array[Byte] + def toInt(t: T): Option[Int] } object Element { - implicit object StringElement extends Element[String] { - def toBytes(underlying: String) = underlying.getBytes - def toInt(underlying:String) = { + def toBytes(underlying: String): Array[Byte] = underlying.getBytes + def toInt(underlying:String): Option[Int] = { val nh = NumberFormat.getInstance(Locale.US) try { val int = nh.parse(underlying) @@ -27,15 +27,15 @@ object Element { } implicit object IntElement extends Element[Int] { - def toBytes(underlying: Int) = { - val buf = new Array[Byte](4) + private val bufLen = 4 + def toBytes(underlying: Int): Array[Byte] = { 
+ val buf = new Array[Byte](bufLen) ByteBuffer .wrap(buf) .putInt(underlying) buf } - def toInt(t:Int) = Some(t) + def toInt(t: Int): Option[Int] = Some(t) } - } diff --git a/src/main/scala/io/abacus/pipeline/elements/Labels.scala b/src/main/scala/io/abacus/pipeline/elements/Label.scala similarity index 80% rename from src/main/scala/io/abacus/pipeline/elements/Labels.scala rename to src/main/scala/io/abacus/pipeline/elements/Label.scala index 7a2d86b..abf2a95 100644 --- a/src/main/scala/io/abacus/pipeline/elements/Labels.scala +++ b/src/main/scala/io/abacus/pipeline/elements/Label.scala @@ -3,15 +3,13 @@ package io.abacus.soroban.elements sealed trait Label case object Person extends Label -case class Location(specific:String) extends Label +case class Location(specific: String) extends Label case object Organization extends Label case object Money extends Label case object Date extends Label - - object Label { - def stringToLabel(label:String):Option[Label] = { + def stringToLabel(label: String): Option[Label] = { label match { case "PERSON" => Some(Person) case "ORGANIZATION" => Some(Organization) @@ -21,4 +19,4 @@ object Label { case _ => None } } -} \ No newline at end of file +} diff --git a/src/test/scala/io/abacus/pipeline/DateParsingPipelineSpec.scala b/src/test/scala/io/abacus/pipeline/DateParsingPipelineSpec.scala index f36d6d3..67d755b 100644 --- a/src/test/scala/io/abacus/pipeline/DateParsingPipelineSpec.scala +++ b/src/test/scala/io/abacus/pipeline/DateParsingPipelineSpec.scala @@ -1,6 +1,6 @@ package io.abacus.pipeline -import org.joda.time.{DateTimeZone, DateTime} +import org.joda.time.{DateTime, DateTimeZone} import org.scalatest.{Failed, ShouldMatchers, WordSpec} class DateParsingPipelineSpec extends WordSpec with ShouldMatchers { @@ -27,14 +27,11 @@ class DateParsingPipelineSpec extends WordSpec with ShouldMatchers { val datetime = "05/12/1982" val a = new DateParsingPipeline[String]() val opt = a.process(datetime) - println(opt) opt match { case 
Some(d) => assert(d.isEqual(new DateTime(1982, 5, 12, 0, 0, 0))) case None => Failed } - } - } "The DateCardinalityPipeline" should { @@ -53,8 +50,6 @@ class DateParsingPipelineSpec extends WordSpec with ShouldMatchers { a.process(Some(new DateTime(2014,8,27,11,20,3, DateTimeZone.UTC))) a.process(Some(new DateTime(2014,8,23,11,20,3, DateTimeZone.UTC))) a.process(Some(new DateTime(2014,10,29,11,20,3, DateTimeZone.UTC))) - - a.results should be (DateCounts(1,2,3)) } @@ -65,5 +60,4 @@ class DateParsingPipelineSpec extends WordSpec with ShouldMatchers { a.results should be (DateCounts(1,1,1)) } } - } diff --git a/src/test/scala/io/abacus/pipeline/PipelineSpec.scala b/src/test/scala/io/abacus/pipeline/PipelineSpec.scala index a1a10cd..079424a 100644 --- a/src/test/scala/io/abacus/pipeline/PipelineSpec.scala +++ b/src/test/scala/io/abacus/pipeline/PipelineSpec.scala @@ -1,37 +1,34 @@ package io.abacus.pipeline -import io.abacus.pipeline.Pipeline -import org.scalatest.WordSpec -import org.scalatest.ShouldMatchers -import scala.collection.mutable.{Map => MMap, HashMap} +import org.scalatest.{ShouldMatchers, WordSpec} -class PipelineSpec extends WordSpec with ShouldMatchers { - // Simple WordCounter to test things +import scala.collection.mutable.{HashMap => MHashMap, Map => MMap} +class PipelineSpec extends WordSpec with ShouldMatchers { class StringToLength() extends Pipeline[String,Int,Int] { var count = 0 - def process(elem:String) = { count+=elem.length; elem.length} - def results = count + def process(elem: String): Int = { count += elem.length; elem.length } + def results: Int = count } class WordCounter() extends Pipeline[String,String,MMap[String,Int]] { - val words = HashMap.empty[String,Int].withDefaultValue(0) - def process(elem:String) = { - words.put(elem,words(elem)+1) + val words = MHashMap.empty[String,Int].withDefaultValue(0) + def process(elem: String): String = { + words.put(elem,words(elem) + 1) elem } - def results = words + def results: 
MMap[String,Int] = words } class Summer() extends Pipeline[Int,Int, Int] { var sum = 0 - def process(elem:Int) = {sum+=elem; elem} - def results = sum + def process(elem: Int): Int = { sum += elem; elem } + def results: Int = sum } class WordReduce() extends Pipeline[MMap[String,Int], Int,Int] { - var acc = 0; - def process(elem:MMap[String,Int]) = {val a = elem.map{ case (k,v) => v}.sum; acc+=a; a} - def results = acc + var acc = 0 + def process(elem: MMap[String,Int]): Int = { val a = elem.map{ case (k,v) => v}.sum; acc += a; a} + def results: Int = acc } "A simple smoke test for the Pipeline framework" should { diff --git a/src/test/scala/io/abacus/pipeline/TopKPipelineSpec.scala b/src/test/scala/io/abacus/pipeline/TopKPipelineSpec.scala index d08e328..efa2f27 100644 --- a/src/test/scala/io/abacus/pipeline/TopKPipelineSpec.scala +++ b/src/test/scala/io/abacus/pipeline/TopKPipelineSpec.scala @@ -1,6 +1,5 @@ package io.abacus.pipeline -import io.abacus.pipeline.TopKPipeline import org.scalatest.WordSpec import org.scalatest.ShouldMatchers diff --git a/src/test/scala/io/abacus/pipeline/YearFinderPipelineSpec.scala b/src/test/scala/io/abacus/pipeline/YearFinderPipelineSpec.scala index 47713ad..0569bc0 100644 --- a/src/test/scala/io/abacus/pipeline/YearFinderPipelineSpec.scala +++ b/src/test/scala/io/abacus/pipeline/YearFinderPipelineSpec.scala @@ -1,6 +1,5 @@ package io.abacus.pipeline -import io.abacus.pipeline.YearFinderPipeline import org.scalatest.{ShouldMatchers, WordSpec} class YearFinderPipelineSpec extends WordSpec with ShouldMatchers { diff --git a/version.sbt b/version.sbt new file mode 100644 index 0000000..76ccd7c --- /dev/null +++ b/version.sbt @@ -0,0 +1,2 @@ +version in ThisBuild := "0.0.1" +