Skip to content
This repository was archived by the owner on Mar 11, 2019. It is now read-only.

Commit 630e8e9

Browse files
committed
feature(disk): Implements a Disk module
This feature adds a disk module (sensor, formula) by using the cgroup facilities (to get more details about disk usages at target level). All methods for interacting with cgroup are written inside the LinuxHelper and it works only on Linux platforms.
1 parent 834771e commit 630e8e9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1593
-261
lines changed

powerapi-cli/src/main/scala/org/powerapi/app/PowerAPI.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import org.powerapi.core.power._
3434
import org.powerapi.core.target._
3535
import org.powerapi.module.cpu.dvfs.CpuDvfsModule
3636
import org.powerapi.module.cpu.simple.{ProcFSCpuSimpleModule, SigarCpuSimpleModule}
37+
import org.powerapi.module.disk.simple.DiskSimpleModule
3738
import org.powerapi.module.extpowermeter.g5komegawatt.G5kOmegaWattModule
3839
import org.powerapi.module.extpowermeter.powerspy.PowerSpyModule
3940
import org.powerapi.module.extpowermeter.rapl.RAPLModule
@@ -48,7 +49,7 @@ import org.powerapi.{PowerDisplay, PowerMeter, PowerMonitoring}
4849
* @author <a href="mailto:l.huertas.pro@gmail.com">Loïc Huertas</a>
4950
*/
5051
object PowerAPI extends App {
51-
val modulesR = """(procfs-cpu-simple|sigar-cpu-simple|cpu-dvfs|libpfm|libpfm-process|libpfm-core|libpfm-core-process|powerspy|g5k-omegawatt|rapl)(,(procfs-cpu-simple|sigar-cpu-simple|cpu-dvfs|libpfm|libpfm-process|libpfm-core|libpfm-core-process|powerspy|g5k-omegawatt|rapl))*""".r
52+
val modulesR = """(procfs-cpu-simple|sigar-cpu-simple|cpu-dvfs|libpfm|libpfm-process|libpfm-core|libpfm-core-process|powerspy|g5k-omegawatt|rapl|disk-simple)(,(procfs-cpu-simple|sigar-cpu-simple|cpu-dvfs|libpfm|libpfm-process|libpfm-core|libpfm-core-process|powerspy|g5k-omegawatt|rapl|disk-simple))*""".r
5253
val aggR = """max|min|geomean|logsum|mean|median|stdev|sum|variance""".r
5354
val durationR = """\d+""".r
5455
val pidsR = """(\d+)(,(\d+))*""".r
@@ -94,7 +95,7 @@ object PowerAPI extends App {
9495
|Different settings can be used per software-defined power meter by using the prefix option.
9596
|Please, refer to the documentation inside the GitHub wiki for further details.
9697
|
97-
|usage: ./powerapi modules procfs-cpu-simple|sigar-cpu-simple|cpu-dvfs|libpfm|libpfm-process|libpfm-core|libpfm-core-process|powerspy|g5k-omegawatt|rapl (1, *) *--prefix [name]*
98+
|usage: ./powerapi modules procfs-cpu-simple|sigar-cpu-simple|cpu-dvfs|libpfm|libpfm-process|libpfm-core|libpfm-core-process|powerspy|g5k-omegawatt|rapl|disk-simple (1, *) *--prefix [name]*
9899
| monitor (1, *)
99100
| --frequency $MILLISECONDS
100101
| --self (0, 1) --pids [pid, ...] (0, *) --apps [app, ...] (0, *) --containers [id, ...] (0, *) | all (0, 1)
@@ -228,6 +229,8 @@ object PowerAPI extends App {
228229
G5kOmegaWattModule(powerMeterConf('prefix).asInstanceOf[Option[String]])
229230
case "rapl" =>
230231
RAPLModule()
232+
case "disk-simple" =>
233+
DiskSimpleModule(powerMeterConf('prefix).asInstanceOf[Option[String]])
231234
}
232235
}).toSeq
233236

powerapi-core/src/main/scala/org/powerapi/PowerMeter.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,15 @@ import java.util.concurrent.TimeUnit
2828
import scala.concurrent.ExecutionContext.Implicits._
2929
import scala.concurrent.duration.{DurationLong, FiniteDuration}
3030
import scala.concurrent.{Await, Future}
31-
3231
import akka.actor._
3332
import akka.pattern.{after, ask, gracefulStop, pipe}
3433
import akka.util.Timeout
35-
3634
import org.powerapi.core.MonitorChannel.{startMonitor, stopAllMonitor}
3735
import org.powerapi.core.power._
3836
import org.powerapi.core.target.Target
3937
import org.powerapi.core.{ActorComponent, Clocks, ConfigValue, Configuration, MessageBus, Monitor, Monitors}
4038
import org.powerapi.module.FormulaChannel.{startFormula, stopAllFormula}
39+
import org.powerapi.module.PowerChannel.AggregatePowerReport
4140
import org.powerapi.module.SensorChannel.{startSensor, stopAllSensor}
4241
import org.powerapi.module.{Formula, Formulas, Sensor, Sensors}
4342
import org.powerapi.reporter.ReporterChannel.stopAllReporter
@@ -245,5 +244,5 @@ trait PowerMonitoring {
245244
*/
246245
trait PowerDisplay {
247246

248-
def display(muid: UUID, timestamp: Long, targets: Set[Target], devices: Set[String], power: Power)
247+
def display(aggregatePowerReport: AggregatePowerReport)
249248
}

powerapi-core/src/main/scala/org/powerapi/core/MonitorActors.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ class MonitorChild(eventBus: MessageBus, muid: UUID, targets: Set[Target]) exten
151151
* Aggregate all RawPowerReport for the current monitoring.
152152
*/
153153
def aggregate(aggR: AggregatePowerReport, powerReport: RawPowerReport, aggregator: Option[Seq[Power] => Power]): Unit = {
154-
if (aggR.size == 0 || aggR.tick == powerReport.tick) {
154+
if (aggR.size == 0 || aggR.ticks.map(_.timestamp).contains(powerReport.tick.timestamp)) {
155155
aggR += powerReport
156156
}
157157
else {

powerapi-core/src/main/scala/org/powerapi/core/OSHelper.scala

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,22 @@ case class TimeInStates(times: Map[Long, Long]) {
6262
*/
6363
case class GlobalCpuTimes(idleTime: Long, activeTime: Long)
6464

65+
/**
66+
* Wrapper class for disk info.
67+
*/
68+
case class Disk(name: String, major: Int, minor: Int, bytesRead: Long = 0, bytesWritten: Long = 0) {
69+
def -(other: Disk): Disk = {
70+
if (name == other.name && major == other.major && minor == other.minor) {
71+
val diffBytesRead = bytesRead - other.bytesRead
72+
val diffBytesWrite = bytesWritten - other.bytesWritten
73+
74+
Disk(name, major, minor, if (diffBytesRead > 0) diffBytesRead else 0, if (diffBytesWrite > 0) diffBytesWrite else 0)
75+
}
76+
77+
else this
78+
}
79+
}
80+
6581
/**
6682
* Base trait use for implementing os specific methods.
6783
*
@@ -111,6 +127,49 @@ trait OSHelper {
111127
case _ => 0L
112128
}
113129
}
130+
131+
/**
132+
* Create a cgroup attached to a given subsystem.
133+
*/
134+
def createCGroup(subsystem: String, name: String): Unit
135+
136+
/**
137+
* Test if a cgroup exists.
138+
*/
139+
def existsCGroup(subsystem: String, name: String): Boolean
140+
141+
/**
142+
* Attach pids (`toAttach` string) to an existing cgroup.
143+
*/
144+
def attachToCGroup(subsystem: String, name: String, toAttach: String): Unit
145+
146+
/**
147+
* Delete a cgroup attached to each given subsystem.
148+
*/
149+
def deleteCGroup(subsystem: String, name: String): Unit
150+
151+
/**
152+
* Get information for selected disks.
153+
*/
154+
def getDiskInfo(names: Seq[String]): Seq[Disk]
155+
156+
/**
157+
* Get the global read/written bytes information for selected disks.
158+
*/
159+
def getGlobalDiskBytes(disks: Seq[Disk]): Seq[Disk]
160+
161+
/**
162+
* Get the target read/written bytes information for a given target on selected disks.
163+
*/
164+
def getTargetDiskBytes(disks: Seq[Disk], target: Target): Seq[Disk]
165+
166+
/**
167+
* Get all directories recursively inside a given path.
168+
*/
169+
def getAllDirectories(dir: File): Seq[File] = {
170+
val current = dir.listFiles.filter(_.isDirectory)
171+
current ++ current.flatMap(getAllDirectories)
172+
}
114173
}
115174

116175
/**
@@ -166,6 +225,24 @@ class LinuxHelper extends Configuration(None) with OSHelper {
166225
case ConfigValue(p) => p
167226
case _ => "/sys/devices/system/cpu/cpu%?index/cpufreq/stats/time_in_state"
168227
}
228+
/**
229+
* Cgroup base directory.
230+
*/
231+
lazy val cgroupSysFSPath = load {
232+
_.getString("powerapi.sysfs.cgroup-sysfs-path")
233+
} match {
234+
case ConfigValue(p) => p
235+
case _ => "/sys/fs/cgroup"
236+
}
237+
/**
238+
* Disk stats filepath.
239+
*/
240+
lazy val diskStatPath = load {
241+
_.getString("powerapi.procfs.disk-stats-path")
242+
} match {
243+
case ConfigValue(p) => p
244+
case _ => "/proc/diskstats"
245+
}
169246
/**
170247
* CPU's topology.
171248
*/
@@ -297,6 +374,91 @@ class LinuxHelper extends Configuration(None) with OSHelper {
297374

298375
TimeInStates(result.toMap[Long, Long])
299376
}
377+
378+
def createCGroup(subsystem: String, name: String): Unit = {
379+
Seq("cgcreate", "-g", s"$subsystem:/$name").!
380+
}
381+
382+
def existsCGroup(subsystem: String, name: String): Boolean = {
383+
new File(s"$cgroupSysFSPath/$subsystem/$name").exists()
384+
}
385+
386+
def attachToCGroup(subsystem: String, name: String, toAttach: String): Unit = {
387+
Seq("cgclassify", "-g", s"$subsystem:/$name", s"$toAttach").!
388+
}
389+
390+
def deleteCGroup(subsystem: String, name: String): Unit = {
391+
Seq("cgdelete", s"$subsystem:/$name").!
392+
}
393+
394+
def getDiskInfo(names: Seq[String]): Seq[Disk] = {
395+
val Regex = s"\\s+([0-9]+)\\s+([0-9]+)\\s+(${names.mkString("|")})\\s+.*".r
396+
397+
using(diskStatPath)(source => {
398+
source.getLines().collect {
399+
case line => line match {
400+
case Regex(major, minor, name) => Some(Disk(name, major.toInt, minor.toInt))
401+
case _ => None
402+
}
403+
}.filter(_.isDefined).map(_.get).toList
404+
})
405+
}
406+
407+
def getGlobalDiskBytes(disks: Seq[Disk]): Seq[Disk] = {
408+
val directory = new File(s"$cgroupSysFSPath/blkio")
409+
val directories = Seq(directory) ++ getAllDirectories(directory)
410+
val bytesRead = collection.mutable.Map[String, Long]()
411+
val bytesWritten = collection.mutable.Map[String, Long]()
412+
413+
for (dir <- directories) {
414+
using(s"${dir.getAbsolutePath}/blkio.throttle.io_service_bytes")(source => {
415+
val lines = source.getLines().toList
416+
417+
for (disk <- disks) {
418+
val ReadRegex = s"(${disk.major}):(${disk.minor})\\s+Read\\s+([0-9]+)".r
419+
val WriteRegex = s"(${disk.major}):(${disk.minor})\\s+Write\\s+([0-9]+)".r
420+
421+
for (line <- lines) {
422+
line match {
423+
case ReadRegex(_, _, bytes) => bytesRead += (disk.name -> (bytesRead.getOrElse(disk.name, 0l) + bytes.toLong))
424+
case WriteRegex(_, _, bytes) => bytesWritten += (disk.name -> (bytesWritten.getOrElse(disk.name, 0l) + bytes.toLong))
425+
case _ =>
426+
}
427+
}
428+
}
429+
})
430+
}
431+
432+
disks.map(disk => Disk(disk.name, disk.major, disk.minor, bytesRead.getOrElse(disk.name, 0l), bytesWritten.getOrElse(disk.name, 0l)))
433+
}
434+
435+
def getTargetDiskBytes(disks: Seq[Disk], target: Target): Seq[Disk] = {
436+
val pids = getProcesses(target).map(_.pid)
437+
val directories = pids.map(pid => new File(s"$cgroupSysFSPath/blkio/powerapi/$pid"))
438+
val bytesRead = collection.mutable.Map[String, Long]()
439+
val bytesWritten = collection.mutable.Map[String, Long]()
440+
441+
for (dir <- directories) {
442+
using(s"${dir.getAbsolutePath}/blkio.throttle.io_service_bytes")(source => {
443+
val lines = source.getLines().toList
444+
445+
for (disk <- disks) {
446+
val ReadRegex = s"(${disk.major}):(${disk.minor})\\s+Read\\s+([0-9]+)".r
447+
val WriteRegex = s"(${disk.major}):(${disk.minor})\\s+Write\\s+([0-9]+)".r
448+
449+
for (line <- lines) {
450+
line match {
451+
case ReadRegex(_, _, bytes) => bytesRead += (disk.name -> (bytesRead.getOrElse(disk.name, 0l) + bytes.toLong))
452+
case WriteRegex(_, _, bytes) => bytesWritten += (disk.name -> (bytesWritten.getOrElse(disk.name, 0l) + bytes.toLong))
453+
case _ =>
454+
}
455+
}
456+
}
457+
})
458+
}
459+
460+
disks.map(disk => Disk(disk.name, disk.major, disk.minor, bytesRead.getOrElse(disk.name, 0l), bytesWritten.getOrElse(disk.name, 0l)))
461+
}
300462
}
301463

302464
trait SigarHelperConfiguration extends Configuration {
@@ -356,4 +518,18 @@ class SigarHelper(sigar: SigarProxy) extends OSHelper {
356518
}
357519

358520
def getTimeInStates: TimeInStates = throw new SigarException("sigar cannot be able to get how many time CPU spent under each frequency")
521+
522+
def createCGroup(subsystem: String, name: String): Unit = {}
523+
524+
def existsCGroup(subsystem: String, name: String): Boolean = false
525+
526+
def attachToCGroup(subsystem: String, name: String, toAttach: String): Unit = {}
527+
528+
def deleteCGroup(subsystem: String, name: String): Unit = {}
529+
530+
def getDiskInfo(names: Seq[String]): Seq[Disk] = Seq()
531+
532+
def getGlobalDiskBytes(disks: Seq[Disk]): Seq[Disk] = Seq()
533+
534+
def getTargetDiskBytes(disks: Seq[Disk], target: Target): Seq[Disk] = Seq()
359535
}

powerapi-core/src/main/scala/org/powerapi/module/PowerChannel.scala

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,7 @@ object PowerChannel extends Channel {
9191
/**
9292
* Base trait for each power report
9393
*/
94-
trait PowerReport extends Message {
95-
def muid: UUID
96-
97-
def tick: Tick
98-
}
94+
trait PowerReport extends Message
9995

10096
/**
10197
* RawPowerReport is represented as a dedicated type of message.
@@ -132,21 +128,27 @@ object PowerChannel extends Channel {
132128

133129
def devices: Set[String] = reports.map(_.device).toSet
134130

135-
def power = aggregator.getOrElse(SUM _)(reports.map(_.power))
131+
def power: Power = aggregator.getOrElse(SUM _)(reports.map(_.power))
132+
133+
def powerPerDevice: Map[String, Power] = {
134+
for ((device, deviceReports) <- reports.groupBy(_.device)) yield {
135+
device -> aggregator.getOrElse(SUM _)(deviceReports.map(_.power))
136+
}
137+
}
138+
139+
def powerPerTarget: Map[Target, Power] = {
140+
for ((target, targetReports) <- reports.groupBy(_.target)) yield {
141+
target -> aggregator.getOrElse(SUM _)(targetReports.map(_.power))
142+
}
143+
}
136144

137145
def aggregator: Option[Seq[Power] => Power] = _aggregator
138146

139147
def aggregator_=(agg: Option[Seq[Power] => Power]): Unit = {
140148
_aggregator = agg
141149
}
142150

143-
def tick: Tick = {
144-
if (reports.map(_.tick).toSet.size == 1) reports.head.tick
145-
else new Tick {
146-
val topic = ""
147-
val timestamp = System.currentTimeMillis()
148-
}
149-
}
151+
def ticks: Set[Tick] = reports.map(_.tick).toSet
150152
}
151153

152154
}

0 commit comments

Comments
 (0)