Skip to content
This repository was archived by the owner on Mar 11, 2019. It is now read-only.

Commit 834771e

Browse files
committed
refactoring(reporter: influx): Use an asynchronous API for InfluxDB.
Some crashes (timeouts) occurred while overloading the system with the native API for InfluxDB. We now use an asynchronous API.
1 parent 155d238 commit 834771e

File tree

6 files changed

+53
-33
lines changed

6 files changed

+53
-33
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ We all stand on the shoulders of giants and get by with a little help from our f
5858
* [Sigar](https://support.hyperic.com/display/SIGAR/Home) (version 1.6.5 under [Apache 2 license](http://www.apache.org/licenses/LICENSE-2.0)), for providing a portable interface for gathering system information.
5959
* [spray-json](http://spray.io/) (version 1.3.2 under [Apache 2 license](http://www.apache.org/licenses/LICENSE-2.0)), for (de)serializing JSON.
6060
* [docker-java](https://github.com/docker-java/docker-java) (version 2.1.4 under [Apache 2 license](http://www.apache.org/licenses/LICENSE-2.0)), for using the JAVA Docker API.
61-
* [influxdb-java](https://github.com/influxdata/influxdb-java) (version 2.1 under [MIT license](https://github.com/influxdata/influxdb-java/blob/master/LICENSE)), for using the JAVA InfluxDB API.
61+
* [scala-influxdb-client](https://github.com/paulgoldbaum/scala-influxdb-client) (version 0.4.5 under [MIT license](https://github.com/paulgoldbaum/scala-influxdb-client/blob/master/LICENSE)), for using an asynchronous scala API for InfluxDB.
6262

6363
# License
6464
This software is licensed under the *GNU Affero General Public License*, quoted below.

powerapi-cli/src/main/scala/org/powerapi/app/PowerAPI.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ package org.powerapi.app
2424

2525
import java.lang.management.ManagementFactory
2626

27+
import scala.concurrent.Await
2728
import scala.concurrent.duration.DurationInt
2829
import scala.sys
2930
import scala.sys.process.stringSeqToProcess
@@ -98,7 +99,7 @@ object PowerAPI extends App {
9899
| --frequency $MILLISECONDS
99100
| --self (0, 1) --pids [pid, ...] (0, *) --apps [app, ...] (0, *) --containers [id, ...] (0, *) | all (0, 1)
100101
| --agg max|min|geomean|logsum|mean|median|stdev|sum|variance
101-
| --console (0, 1) --file $FILEPATH (0, *) --chart (0, 1) --influx $HOST $USER $PWD $DB $MEASUREMENT (0, *)
102+
| --console (0, 1) --file $FILEPATH (0, *) --chart (0, 1) --influx $HOST $PORT $USER $PWD $DB $MEASUREMENT (0, *)
102103
| duration [s]
103104
|
104105
|example: ./powerapi modules procfs-cpu-simple monitor --frequency 1000 --apps firefox,chrome --agg max --console \
@@ -175,10 +176,12 @@ object PowerAPI extends App {
175176
cliMonitorsSubcommand(options, currentMonitor + ('displays ->
176177
(currentMonitor.getOrElse('displays, Set[Any]()).asInstanceOf[Set[Any]] + new JFreeChartDisplay)
177178
), tail)
178-
case "--influx" :: host :: user :: pwd :: db :: measurement :: tail =>
179-
cliMonitorsSubcommand(options, currentMonitor + ('displays ->
180-
(currentMonitor.getOrElse('displays, Set[Any]()).asInstanceOf[Set[Any]] + new InfluxDisplay(host, user, pwd, db, measurement))
181-
), tail)
179+
case "--influx" :: host :: port :: user :: pwd :: db :: measurement :: tail =>
180+
cliMonitorsSubcommand(options, currentMonitor + ('displays -> {
181+
val influxDisplay = new InfluxDisplay(host, port.toInt, user, pwd, db, measurement)
182+
Await.result(influxDisplay.database.create(), 30.seconds)
183+
currentMonitor.getOrElse('displays, Set[Any]()).asInstanceOf[Set[Any]] + influxDisplay
184+
}), tail)
182185
case option :: tail =>
183186
println(s"unknown monitor option $option")
184187
sys.exit(1)

powerapi-core/build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ libraryDependencies ++= Seq(
2424
"net.java.dev.jna" % "jna" % "4.2.1",
2525
"io.spray" %% "spray-json" % "1.3.2",
2626
"com.github.docker-java" % "docker-java" % "2.1.4",
27-
"org.influxdb" % "influxdb-java" % "2.1"
27+
"com.paulgoldbaum" %% "scala-influxdb-client" % "0.4.5"
2828
)
2929

3030
// Tests

powerapi-core/src/main/scala/org/powerapi/reporter/InfluxDisplay.scala

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,31 +23,29 @@
2323
package org.powerapi.reporter
2424

2525
import java.util.UUID
26-
import java.util.concurrent.TimeUnit
2726

28-
import collection.JavaConversions._
27+
import com.paulgoldbaum.influxdbclient.Parameter.Precision
28+
import com.paulgoldbaum.influxdbclient.{Point, InfluxDB}
2929

30-
31-
import org.influxdb.InfluxDBFactory
32-
import org.influxdb.dto.Point
3330
import org.powerapi.PowerDisplay
3431
import org.powerapi.core.power.Power
3532
import org.powerapi.core.target.Target
3633

3734
/**
3835
* Write power information inside an InfluxDB database.
3936
*/
40-
class InfluxDisplay(host: String, user: String, pwd: String, dbName: String, measurement: String) extends PowerDisplay {
37+
class InfluxDisplay(host: String, port: Int, user: String, pwd: String, dbName: String, measurement: String) extends PowerDisplay {
4138

42-
val influxdb = InfluxDBFactory.connect(host, user, pwd)
39+
val influxdb = InfluxDB.connect(host, port, user, pwd)
40+
val database = influxdb.selectDatabase(dbName)
4341

44-
def display(muid: UUID, timestamp: Long, targets: Set[Target], devices: Set[String], power: Power) {
45-
val point = Point.measurement(measurement)
46-
.time(timestamp, TimeUnit.MILLISECONDS)
47-
.field("power", power.toMilliWatts)
48-
.tag(Map("muid" -> s"$muid", "targets" -> s"${targets.mkString(",")}", "devices" -> s"${devices.mkString(",")}"))
49-
.build()
42+
def display(muid: UUID, timestamp: Long, targets: Set[Target], devices: Set[String], power: Power): Unit = {
43+
val point = Point(measurement, timestamp)
44+
.addField("power", power.toMilliWatts)
45+
.addTag("muid", s"$muid")
46+
.addTag("targets", s"${targets.mkString(",")}")
47+
.addTag("devices", s"${devices.mkString(",")}")
5048

51-
influxdb.write(dbName, "default", point)
49+
database.write(point, precision = Precision.MILLISECONDS)
5250
}
5351
}

powerapi-core/src/test/scala/org/powerapi/reporter/InfluxDisplaySuite.scala

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,16 @@
2323
package org.powerapi.reporter
2424

2525
import java.util.UUID
26-
import java.util.concurrent.TimeUnit
2726

2827
import scala.concurrent.duration.DurationInt
29-
import collection.JavaConversions._
3028

3129
import akka.util.Timeout
3230

33-
import org.influxdb.dto.Query
31+
import org.joda.time.format.ISODateTimeFormat
3432
import org.powerapi.UnitTest
3533
import org.powerapi.core.power._
3634
import org.powerapi.core.target.{Application, Process, Target}
35+
import org.scalatest.time.{Seconds, Span}
3736

3837
class InfluxDisplaySuite extends UnitTest {
3938

@@ -50,13 +49,30 @@ class InfluxDisplaySuite extends UnitTest {
5049
val devices = Set[String]("cpu", "gpu", "ssd")
5150
val power = 10.W
5251

53-
val influxDisplay = new InfluxDisplay("http://localhost:8086", "powerapi", "powerapi", "test", "event.powerapi")
54-
influxDisplay.influxdb.createDatabase("test")
55-
influxDisplay.display(muid, timestamp, targets, devices, power)
56-
val query = new Query("SELECT * FROM \"event.powerapi\"", "test")
57-
val result = influxDisplay.influxdb.query(query, TimeUnit.MILLISECONDS)
58-
result.getResults.head.getSeries.head.getValues.head should contain theSameElementsAs Seq(timestamp.toDouble, s"$muid", devices.mkString(","), targets.mkString(","), power.toMilliWatts)
59-
influxDisplay.influxdb.deleteDatabase("test")
52+
val influxDisplay = new InfluxDisplay("localhost", 8086, "powerapi", "powerapi", "test", "event.powerapi")
53+
54+
whenReady(influxDisplay.database.create(), timeout(Span(30, Seconds))) {
55+
_ =>
56+
influxDisplay.display(muid, timestamp, targets, devices, power)
57+
58+
awaitCond({
59+
whenReady(influxDisplay.database.query("SELECT * FROM \"event.powerapi\"")) {
60+
result =>
61+
result.series.size == 1 &&
62+
result.series.head.records.size == 1 &&
63+
ISODateTimeFormat.dateTimeParser().parseDateTime(result.series.head.records.head("time").toString).getMillis == timestamp &&
64+
result.series.head.records.head("devices") == devices.mkString(",") &&
65+
result.series.head.records.head("muid") == s"$muid" &&
66+
result.series.head.records.head("power") == power.toMilliWatts &&
67+
result.series.head.records.head("targets") == targets.mkString(",")
68+
}
69+
}, 30.seconds, 1.seconds)
70+
71+
whenReady(influxDisplay.database.drop(), timeout(Span(30, Seconds))) {
72+
_ =>
73+
assert(true)
74+
}
75+
}
6076
}
6177
}
6278

powerapi-daemon/src/main/scala/org/powerapi/daemon/PowerAPId.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
*/
2323
package org.powerapi.daemon
2424

25+
import scala.concurrent.Await
26+
import scala.concurrent.duration.DurationInt
2527
import scala.sys.process.stringSeqToProcess
2628

2729
import org.apache.commons.daemon.{Daemon, DaemonContext}
@@ -131,9 +133,10 @@ class PowerAPId extends Daemon {
131133
val fileDisplay = new FileDisplay(file.split("=>")(1))
132134
monitor.to(fileDisplay)
133135
case influx: String if output.startsWith("influx") =>
134-
// influx=>http://locahost:8086,powerapi,powerapi,test,event.powerapi
136+
// influx=>locahost,8086,powerapi,powerapi,test,event.powerapi
135137
val parameters = influx.split("=>")(1).split(",")
136-
val influxDisplay = new InfluxDisplay(parameters(0), parameters(1), parameters(2), parameters(3), parameters(4))
138+
val influxDisplay = new InfluxDisplay(parameters(0), parameters(1).toInt, parameters(2), parameters(3), parameters(4), parameters(5))
139+
Await.result(influxDisplay.database.create(), 30.seconds)
137140
monitor.to(influxDisplay)
138141
case "chart" =>
139142
val chartDisplay = new JFreeChartDisplay()

0 commit comments

Comments
 (0)