|
6 | 6 | import re |
7 | 7 | from abc import ABC, abstractmethod |
8 | 8 | from dataclasses import dataclass |
| 9 | +from subprocess import run |
9 | 10 | from typing import Dict, Iterable, List, Optional, Tuple |
10 | 11 |
|
11 | 12 | import psutil |
@@ -404,3 +405,99 @@ def from_utils( |
404 | 405 | logger.warning("Could not read AppleSiliconChip model.") |
405 | 406 |
|
406 | 407 | return cls(output_dir=output_dir, model=model, chip_part=chip_part) |
| 408 | + |
| 409 | + |
| 410 | +@dataclass |
| 411 | +class Raspberry(BaseHardware): |
| 412 | + def __init__( |
| 413 | + self, |
| 414 | + output_dir: str, |
| 415 | + model: str, |
| 416 | + chip_part: str = "CPU", |
| 417 | + ): |
| 418 | + if chip_part == "CPU": |
| 419 | + self.WANTED_COMPONENTS = ( |
| 420 | + "3V7_WL_SW", |
| 421 | + "3V3_SYS", |
| 422 | + "1V8_SYS", |
| 423 | + "1V1_SYS", |
| 424 | + "0V8_SW", |
| 425 | + "VDD_CORE", |
| 426 | + "3V3_DAC", |
| 427 | + "3V3_ADC", |
| 428 | + "0V8_AON", |
| 429 | + ) |
| 430 | + elif chip_part == "RAM": |
| 431 | + self.WANTED_COMPONENTS = ("DDR_VDD2", "DDR_VDD2", "DDR_VDDQ") |
| 432 | + else: |
| 433 | + raise Exception("Unknown chip part", chip_part) |
| 434 | + |
| 435 | + self._output_dir = output_dir |
| 436 | + self._model = model |
| 437 | + self.chip_part = chip_part |
| 438 | + |
| 439 | + def __repr__(self) -> str: |
| 440 | + return f"Raspberry ({self._model} > {self.chip_part})" |
| 441 | + |
| 442 | + def _get_power(self) -> Power: |
| 443 | + """ """ |
| 444 | + measure: Dict = self.get_measure() |
| 445 | + return Power.from_watts(measure["power"]) |
| 446 | + |
| 447 | + def _get_energy(self, delay: Time) -> Energy: |
| 448 | + """ |
| 449 | + Get Chip part energy deltas |
| 450 | + Args: |
| 451 | + chip_part (str): Chip part to get power from (Processor, GPU, etc.) |
| 452 | + :return: energy in kWh |
| 453 | + """ |
| 454 | + energy = Energy.from_power_and_time( |
| 455 | + power=self._get_power(), time=Time.from_seconds(delay) |
| 456 | + ) |
| 457 | + return energy |
| 458 | + |
| 459 | + def total_power(self) -> Power: |
| 460 | + return self._get_power() |
| 461 | + |
| 462 | + def start(self): |
| 463 | + # ... |
| 464 | + ... |
| 465 | + |
| 466 | + def get_model(self): |
| 467 | + return self._model |
| 468 | + |
| 469 | + def get_measure(self): |
| 470 | + components = {} |
| 471 | + res = run(["vcgencmd", "pmic_read_adc"], capture_output=True) |
| 472 | + lines = res.stdout.decode("utf-8").splitlines() |
| 473 | + for line in lines: |
| 474 | + res = re.search( |
| 475 | + "([A-Z_0-9]+)_[VA] (current|volt)\(([0-9]+)\)=([0-9.]+)", # noqa: W605 |
| 476 | + line, |
| 477 | + ) |
| 478 | + component_name, measure_type, idx, value = res.groups() |
| 479 | + component = components[component_name] = components.get(component_name, {}) |
| 480 | + component[measure_type] = float(value) |
| 481 | + pi_power = 0 |
| 482 | + |
| 483 | + for component_name, component in components.items(): |
| 484 | + try: |
| 485 | + component["power"] = component["volt"] * component["current"] |
| 486 | + if component_name in self.WANTED_COMPONENTS: |
| 487 | + pi_power += component["power"] |
| 488 | + except Exception: |
| 489 | + ... |
| 490 | + return { |
| 491 | + "power": pi_power, |
| 492 | + } |
| 493 | + |
| 494 | + @classmethod |
| 495 | + def from_utils( |
| 496 | + cls, output_dir: str, model: Optional[str] = None, chip_part: str = "CPU" |
| 497 | + ) -> "Raspberry": |
| 498 | + if model is None: |
| 499 | + model = detect_cpu_model() |
| 500 | + if model is None: |
| 501 | + logger.warning("Could not read Raspberry model.") |
| 502 | + |
| 503 | + return cls(output_dir=output_dir, model=model, chip_part=chip_part) |
0 commit comments