Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@
.idea/
.idea_modules/
/project/boot/
/project/target/
/target/
/project/plugins/target/
target/
/project/plugins/project/boot/
/project/plugins/project/target/
.DS_Store
*.swp
*.swo
*.xml
/project/project/target/
*~
35 changes: 13 additions & 22 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,29 +1,20 @@
organization := "io.mental"

version := "0.0.1"

name := "palamedes"

scalaVersion := "2.10.4"
organization := "io.mental"

resolvers += "boundary" at "http://maven.boundary.com/artifactory/repo"

libraryDependencies ++= Seq(
"com.boundary" % "high-scale-lib" % "1.0.3",
"org.scalatest" % "scalatest_2.10" % "2.2.0" % "test" withSources() withJavadoc(),
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test" withSources() withJavadoc(),
"com.rojoma" % "rojoma-json-v3_2.10" % "3.1.2" withSources() withJavadoc(),
"org.apache.commons" % "commons-lang3" % "3.3.2",
"joda-time" % "joda-time" % "2.1",
"org.joda" % "joda-convert" % "1.7",
"org.mockito" % "mockito-core" % "1.9.5" withSources() withJavadoc(),
"com.twitter" % "algebird_2.10" % "0.7.0" withSources() withJavadoc(),
"com.twitter" % "algebird-core_2.10" % "0.7.0" withSources() withJavadoc(),
"com.twitter" % "algebird-util_2.10" % "0.7.0" withSources() withJavadoc()
)


"org.scalacheck" %% "scalacheck" % "1.10.0" % "test" withSources() withJavadoc(),
"com.rojoma" % "rojoma-json-v3_2.10" % "3.1.2" withSources() withJavadoc(),
"org.apache.commons" % "commons-lang3" % "3.3.2",
"joda-time" % "joda-time" % "2.1",
"org.joda" % "joda-convert" % "1.7",
"org.mockito" % "mockito-core" % "1.9.5" withSources() withJavadoc(),
"com.twitter" % "algebird_2.10" % "0.7.0" withSources() withJavadoc(),
"com.twitter" % "algebird-core_2.10" % "0.7.0" withSources() withJavadoc(),
"com.twitter" % "algebird-util_2.10" % "0.7.0" withSources() withJavadoc()
)

scalacOptions ++= Seq("-optimize")
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-oD")


scalacOptions ++= Seq("-optimize","-deprecation","-feature")
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=0.13.5
sbt.version=0.13.8
6 changes: 6 additions & 0 deletions project/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
resolvers ++= Seq(
"socrata maven" at "https://repository-socrata-oss.forge.cloudbees.com/release"
)

addSbtPlugin("com.socrata" % "socrata-sbt-plugins" % "1.5.3")

88 changes: 35 additions & 53 deletions src/main/scala/io/abacus/counter/RollingCounter.scala
Original file line number Diff line number Diff line change
@@ -1,99 +1,81 @@
package io.abacus.counter

import org.cliffc.high_scale_lib.NonBlockingHashMap
import org.cliffc.high_scale_lib.Counter
import org.cliffc.high_scale_lib.{Counter, NonBlockingHashMap}

import scala.collection.mutable.HashMap
import scala.collection.mutable.{HashMap => MHashMap}

/**
 * Counts occurrences of keys across a fixed ring of `buckets` slots.
 *
 * Every key maps to an array of `buckets` counters. `increment` adds to the
 * slot selected by the current bucket position, `advanceBucket` clears the
 * next slot and then moves the position onto it, and `count` sums all slots
 * of a key.
 *
 * @param buckets number of slots kept per key; must be positive
 * @tparam T type of the things being counted (used as the map key)
 */
class RollingCounter[T](buckets: Int) {
  // A non-positive size would make `% buckets` divide by zero or leave the
  // per-key arrays empty, so reject it up front instead of failing later.
  require(buckets > 0, s"buckets must be positive, got $buckets")

  private val data = new NonBlockingHashMap[T, Array[Counter]]()

  // Number of advances so far; the active slot is this value modulo `buckets`.
  private val currentBucket = new Counter

  /** Adds one to `thing` in the currently active slot. */
  def increment(thing: T): Unit = {
    val index = currentBucket.longValue % buckets
    incrementWithBucket(thing, index.toInt)
  }

  /**
   * Adds one to `thing` in the given slot, creating the key's counters on
   * first use.
   *
   * @param bucket slot index; expected to be in `[0, buckets)`
   */
  def incrementWithBucket(thing: T, bucket: Int): Unit =
    getBuckets(thing)(bucket).increment()

  /**
   * Sums the counters of every slot for `thing`.
   *
   * @return the summed estimate, or 0 when `thing` has never been seen
   */
  def count(thing: T): Long =
    Option(data.get(thing)).fold(0L) { slots =>
      // Plain while loop over the array: avoids boxing/closures on this path.
      var total = 0L
      var i = 0
      while (i < buckets) {
        total += slots(i).estimate_get
        i += 1
      }
      total
    }

  /** Snapshot of `count` for every key currently held, as an immutable map. */
  def counts: Map[T, Long] = {
    val result = Map.newBuilder[T, Long]
    val it = data.keySet.iterator
    while (it.hasNext) {
      val thing = it.next()
      result += thing -> count(thing)
    }
    result.result()
  }

  /**
   * Moves to the next slot. The slot about to become active is cleared for
   * all keys before the position is incremented.
   */
  def advanceBucket(): Unit = {
    resetAllCountsForBucket(((currentBucket.get + 1L) % buckets).toInt)
    currentBucket.increment()
  }

  /** Replaces the counter in slot `bucket` with a fresh one for every key. */
  def resetAllCountsForBucket(bucket: Int): Unit = {
    val it = data.keySet.iterator
    while (it.hasNext) {
      resetCountForBucket(it.next(), bucket)
    }
  }

  /**
   * Replaces the counter in slot `bucket` of `thing` with a fresh one.
   * Creates the key's counters if `thing` has not been seen yet.
   */
  def resetCountForBucket(thing: T, bucket: Int): Unit = {
    val value = getBuckets(thing)
    value(bucket) = new Counter
  }

  /** Returns the counters for `thing`, installing them if absent. */
  private def getBuckets(thing: T): Array[Counter] =
    Option(data.get(thing)).getOrElse(initialCountsMaybe(thing))

  /**
   * Tries to install a fresh counter array for `thing`.
   *
   * @return the array that ends up stored in the map: ours when
   *         `putIfAbsent` reports no previous value, otherwise the one that
   *         was already there
   */
  private def initialCountsMaybe(thing: T): Array[Counter] = {
    val array = Array.fill[Counter](buckets)(new Counter)
    // null from putIfAbsent means this call created the entry.
    Option(data.putIfAbsent(thing, array)).getOrElse(array)
  }
}
24 changes: 11 additions & 13 deletions src/main/scala/io/abacus/counter/TimeWindowedCounter.scala
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
package io.abacus.counter
import java.util.concurrent.{TimeUnit, Executors}
import java.util.concurrent.{Executors, TimeUnit}

/**
 * Counts things over a time window by driving a [[RollingCounter]] from a
 * single-threaded scheduler that advances the bucket every `granularity`
 * milliseconds.
 *
 * The window is split into `window / granularity` intervals. When `window`
 * is not an exact multiple of `granularity` the number of intervals is
 * rounded up, so the covered span is never shorter than `window`.
 *
 * @param window      size of the window in milliseconds; must be positive
 * @param granularity size of each interval in milliseconds; must be positive
 * @tparam T type of the things being counted
 */
class TimeWindowedCounter[T](window: Long, granularity: Long) {
  require(granularity > 0, s"granularity must be positive, got $granularity")
  require(window > 0, s"window must be positive, got $window")

  // Ceiling division: plain `window / granularity` truncates, which
  // contradicts the documented rounding and yields 0 buckets when
  // window < granularity. Written without `window + granularity - 1`
  // to avoid Long overflow.
  private val buckets =
    window / granularity + (if (window % granularity == 0L) 0L else 1L)
  private val counter = new RollingCounter[T](buckets.toInt)
  private val scheduledThreadPool = Executors.newScheduledThreadPool(1)

  scheduledThreadPool.scheduleWithFixedDelay(
    HeartBeat(counter), granularity, granularity, TimeUnit.MILLISECONDS)

  /** Adds one occurrence of `thing` to the current interval. */
  def increment(thing: T): Unit = counter.increment(thing)

  /** Current totals per thing, as reported by the underlying counter. */
  def counts: Map[T, Long] = counter.counts

  /**
   * Shuts the scheduler down, waits up to `granularity` milliseconds for it
   * to terminate, and returns the counts at that point.
   */
  def stop: Map[T, Long] = {
    scheduledThreadPool.shutdown()
    scheduledThreadPool.awaitTermination(granularity, TimeUnit.MILLISECONDS)
    counts
  }

  /** Scheduled task that advances `counter` by one bucket per run. */
  case class HeartBeat(counter: RollingCounter[T]) extends Runnable {
    def run(): Unit = {
      counter.advanceBucket()
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ import io.abacus.soroban.elements.Element

import scala.{specialized => spec}




class CardinalityEstimationPipeline[@spec (Int) T]()(implicit ev: Element[T]) extends Pipeline[T,T,Long] {
val hll = new HyperLogLogMonoid(12)
var sumHll:HLL = hll.zero
class CardinalityEstimationPipeline[@spec(Int) T]()(implicit ev: Element[T]) extends Pipeline[T,T,Long] {
private val hllBits = 12
val hll = new HyperLogLogMonoid(hllBits)
var sumHll: HLL = hll.zero
override def results: Long = {
val approxSize = hll.sizeOf(sumHll)
approxSize.estimate
Expand Down
37 changes: 15 additions & 22 deletions src/main/scala/io/abacus/pipeline/DateParsingPipeline.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import io.abacus.soroban.elements.Element
import org.joda.time.DateTime
import org.joda.time.format.{DateTimeFormat, ISODateTimeFormat}


class DateParsingPipeline[T]()(implicit ev:Element[T]) extends Transformer[T,Option[DateTime]] {
import io.abacus.pipeline.DateParsingPipeline._
override def process(elem:T):Option[DateTime] = {
class DateParsingPipeline[T]()(implicit ev: Element[T]) extends Transformer[T,Option[DateTime]] {
import io.abacus.pipeline.DateParsingPipeline._ // scalastyle:ignore import.grouping
override def process(elem: T): Option[DateTime] = {
try {
val date = isoFmt.parseDateTime(elem.toString)
Some(date)
Expand All @@ -19,44 +18,40 @@ class DateParsingPipeline[T]()(implicit ev:Element[T]) extends Transformer[T,Opt
try {
val date = humanFmt.parseLocalDate(elem.toString).toDateTimeAtStartOfDay
Some(date)
}
catch {
} catch {
case e: Exception => None
}
}
}

}


/** Shared formatters that `DateParsingPipeline.process` parses input with. */
object DateParsingPipeline {
  import org.joda.time.format.DateTimeFormatter

  /** ISO date-time formatter without milliseconds. */
  val isoFmt: DateTimeFormatter = ISODateTimeFormat.dateTimeNoMillis()

  /** Formatter for dates written as `MM/dd/yyyy`. */
  val humanFmt: DateTimeFormatter = DateTimeFormat.forPattern("MM/dd/yyyy")
}


/**
 * Estimated numbers of distinct dates, at three granularities, as produced
 * by `DateCardinalityPipeline.results`.
 *
 * @param years         estimate at year granularity
 * @param yearMonths    estimate at year-month granularity
 * @param yearMonthDays estimate at year-month-day granularity
 */
case class DateCounts(years: Long, yearMonths: Long, yearMonthDays: Long)

object DateCounts {
  // JSON codec generated by AutomaticJsonCodecBuilder for DateCounts.
  implicit val codec = AutomaticJsonCodecBuilder[DateCounts]
}

class DateCardinalityPipeline() extends Pipeline[Option[DateTime], Option[DateTime], DateCounts ] {
import io.abacus.pipeline.DateCardinalityPipeline._
val hllYear = new HyperLogLogMonoid(12)
val hllYearMonth = new HyperLogLogMonoid(12)
val hllYearMonthDay = new HyperLogLogMonoid(12)
import io.abacus.pipeline.DateCardinalityPipeline._ // scalastyle:ignore import.grouping
private val hllbits = 12
val hllYear = new HyperLogLogMonoid(hllbits)
val hllYearMonth = new HyperLogLogMonoid(hllbits)
val hllYearMonthDay = new HyperLogLogMonoid(hllbits)
var sumHllyear: HLL = hllYear.zero
var sumHllyearMonth: HLL = hllYearMonth.zero
var sumHllyearMonthDay: HLL = hllYearMonthDay.zero

override def results: DateCounts = {

DateCounts(hllYear.sizeOf(sumHllyear).estimate, hllYearMonth.sizeOf(sumHllyearMonth).estimate,hllYearMonthDay.sizeOf(sumHllyearMonthDay).estimate)
DateCounts(hllYear.sizeOf(sumHllyear).estimate,
hllYearMonth.sizeOf(sumHllyearMonth).estimate,
hllYearMonthDay.sizeOf(sumHllyearMonthDay).estimate)
}

override def process(elem: Option[DateTime]) = {
override def process(elem: Option[DateTime]): Option[DateTime] = {
elem match {
case Some(dt) =>
val year = yearFmt.print(dt)
Expand All @@ -69,15 +64,13 @@ class DateCardinalityPipeline() extends Pipeline[Option[DateTime], Option[DateTi
sumHllyearMonth = hllYearMonth.plus(sumHllyearMonth, itemYearMonth)
sumHllyearMonthDay = hllYearMonthDay.plus(sumHllyearMonthDay, itemYearMonthDay)
case None =>

}
elem

}
}

/** ISO formatters shared with the `DateCardinalityPipeline` class, one per granularity. */
object DateCardinalityPipeline {
  import org.joda.time.format.DateTimeFormatter

  /** Year formatter. */
  val yearFmt: DateTimeFormatter = ISODateTimeFormat.year()

  /** Year-month formatter. */
  val yearMonthFmt: DateTimeFormatter = ISODateTimeFormat.yearMonth()

  /** Year-month-day formatter. */
  val yearMonthDayFmt: DateTimeFormatter = ISODateTimeFormat.yearMonthDay()
}
}
Loading