2 changes: 1 addition & 1 deletion docs/source/example_multiple_spectrograms_id_public.ipynb
@@ -175,7 +175,7 @@
"source": [
"from pandas import Timedelta\n",
"\n",
"spectro_dataset = dataset.get_analysis_spectrodataset(analysis)\n",
"spectro_dataset = dataset.get_analysis_spectrodatasets(analysis)\n",
"\n",
"for sd in spectro_dataset.data:\n",
" sd.name = next(iter(sd.audio_data.files)).path.stem\n",
2 changes: 1 addition & 1 deletion docs/source/example_multiple_spectrograms_public.ipynb
@@ -243,7 +243,7 @@
"source": [
"import matplotlib.pyplot as plt\n",
"\n",
"analysis_spectro_dataset = dataset.get_analysis_spectrodataset(\n",
"analysis_spectro_dataset = dataset.get_analysis_spectrodatasets(\n",
" analysis=analysis,\n",
" audio_dataset=audio_dataset, # So that the filtered SpectroDataset is returned\n",
")\n",
117 changes: 117 additions & 0 deletions src/osekit/core_api/spectro_dataset.py
@@ -384,6 +384,123 @@ def update(first: int, last: int) -> None:

update(first=first, last=last)

def get_zoomed_spectro_datasets(
self,
zoom_levels: list[int],
zoom_ffts: list[ShortTimeFFT] | None,
) -> dict[int, SpectroDataset]:
"""Return all zoomed SpectroDatasets from the current SpectroDataset.

For a given zoom level x, each SpectroData from the current SpectroDataset
will be split into x parts (for example, a 10s-long SpectroData with x=2 will
lead to two 5s-long SpectroDatas).

Parameters
----------
zoom_levels: list[int]
All required zoom levels.
zoom_ffts: list[ShortTimeFFT] | None
FFTs to use for computing the zoomed SpectroDatasets, one per zoom level
(the list must have the same length as zoom_levels).
If None, each FFT defaults to the one returned by the
SpectroDataset._get_zoomed_fft() method.

Returns
-------
dict[int, SpectroDataset]
Dictionary whose keys are the zoom levels and whose values are the
corresponding SpectroDatasets. Zoom level 1 is skipped, since it matches
the current SpectroDataset.

"""
zoom_ffts = zoom_ffts if zoom_ffts is not None else [None] * len(zoom_levels)
output = {}
for zoom_level, zoom_fft in zip(zoom_levels, zoom_ffts, strict=True):
if zoom_level == 1:
continue
zoom_sds = self.get_zoomed_spectro_dataset(
zoom_level=zoom_level,
zoom_fft=zoom_fft,
)
output[zoom_level] = zoom_sds
return output

def get_zoomed_spectro_dataset(
self,
zoom_level: int,
zoom_fft: ShortTimeFFT | None = None,
) -> SpectroDataset:
"""Return a zoomed SpectroDataset from the current SpectroDataset.

For a given zoom level x, each SpectroData from the current SpectroDataset
will be split into x parts (for example, a 10s-long SpectroData with x=2 will
lead to two 5s-long SpectroDatas).

Parameters
----------
zoom_level: int
Zoom level of the output SpectroDataset.
Each SpectroData from the current SpectroDataset will be split in
zoom_level equal-duration parts.
zoom_fft: ShortTimeFFT | None
FFT to use for computing the zoomed SpectroDataset.
If None, defaults to the FFT returned by the
SpectroDataset._get_zoomed_fft() method.

Returns
-------
SpectroDataset
Zoomed SpectroDataset from the current SpectroDataset.

"""
if zoom_level == 1:
return self

zoom_fft = zoom_fft or self._get_zoomed_fft(zoom_level=zoom_level)

zoomed_sds = SpectroDataset(
[zoomed_sd for sd in self.data for zoomed_sd in sd.split(zoom_level)],
name=f"{self.name}_x{zoom_level}",
suffix=self.suffix,
scale=self.scale,
v_lim=self.v_lim,
)
zoomed_sds.fft = zoom_fft

return zoomed_sds

def _get_zoomed_fft(
self,
zoom_level: int,
) -> ShortTimeFFT:
"""Compute the default FFT to use for computing the zoomed spectra.

By default, a SpectroDataset with a zoom factor z uses the
same FFT as the z=1 SpectroDataset, but with a hop that is
divided by z.

Parameters
----------
zoom_level: int
Zoom level used for computing the spectra.

Returns
-------
ShortTimeFFT
FFT used for computing the zoomed spectra.

"""
if zoom_level < 1:
msg = f"Invalid zoom level {zoom_level}."
raise ValueError(msg)

if zoom_level == 1:
return self.fft

return ShortTimeFFT(
win=self.fft.win,
hop=self.fft.hop // zoom_level,
fs=self.fft.fs,
)

def to_dict(self) -> dict:
"""Serialize a SpectroDataset to a dictionary.

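Taken together, the three new methods above behave as in the following minimal sketch, assuming an existing SpectroDataset named sds whose SpectroData items are 10 s long; the 1024-sample Hann window in the custom FFTs is an arbitrary illustrative choice, not something prescribed by this change:

from scipy.signal import ShortTimeFFT
from scipy.signal.windows import hann

# Split every SpectroData of sds into 2 and 4 equal-duration parts.
# With zoom_ffts=None, each zoomed dataset gets _get_zoomed_fft():
# same window as the x1 FFT, hop divided by the zoom level.
zoomed = sds.get_zoomed_spectro_datasets(zoom_levels=[2, 4], zoom_ffts=None)

assert sorted(zoomed) == [2, 4]               # zoom level 1 is skipped
assert zoomed[2].fft.hop == sds.fft.hop // 2  # default zoomed FFT
assert zoomed[4].name == f"{sds.name}_x4"     # naming convention of the zoomed datasets

# Custom FFTs may be supplied instead, one per requested zoom level.
custom_ffts = [
    ShortTimeFFT(win=hann(1024), hop=sds.fft.hop // 2, fs=sds.fft.fs),
    ShortTimeFFT(win=hann(1024), hop=sds.fft.hop // 4, fs=sds.fft.fs),
]
zoomed_custom = sds.get_zoomed_spectro_datasets(zoom_levels=[2, 4], zoom_ffts=custom_ffts)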
31 changes: 25 additions & 6 deletions src/osekit/public_api/analysis.py
@@ -3,12 +3,14 @@
from enum import Flag, auto
from typing import TYPE_CHECKING, Literal

from osekit.core_api.frequency_scale import Scale
from scipy.signal import ShortTimeFFT

from osekit.utils.audio_utils import Normalization

if TYPE_CHECKING:
from pandas import Timedelta, Timestamp
from scipy.signal import ShortTimeFFT

from osekit.core_api.frequency_scale import Scale


class AnalysisType(Flag):
@@ -78,6 +80,8 @@ def __init__(
colormap: str | None = None,
scale: Scale | None = None,
nb_ltas_time_bins: int | None = None,
zoom_levels: list[int] | None = None,
zoom_ffts: list[ShortTimeFFT] | None = None,
) -> None:
"""Initialize an Analysis object.

Expand Down Expand Up @@ -141,6 +145,19 @@ def __init__(
If None, the spectrogram will be computed regularly.
If specified, the spectrogram will be computed as LTAS, with the value
representing the maximum number of averaged time bins.
zoom_levels: list[int] | None
If specified, additional analysis datasets will be created at the requested
zoom levels.
For example, with a data_duration of 10s and zoom_levels = [2, 4], three
SpectroDatasets will be created, with data_duration values of 10s, 5s, and 2.5s.
This only affects spectral exports; if AnalysisType.AUDIO is
included in the analysis, the zoomed SpectroDatasets will be linked to the
x1-zoom SpectroData.
zoom_ffts: list[ShortTimeFFT] | None
FFTs to use for computing the zoomed spectra.
By default, a SpectroDataset with a zoom factor z uses the
same FFT as the z=1 SpectroDataset, but with a hop that is
divided by z.

"""
self.analysis_type = analysis_type
@@ -153,16 +170,18 @@ def __init__(
self.name = name
self.normalization = normalization
self.subtype = subtype
self.fft = fft
self.v_lim = v_lim
self.colormap = colormap
self.scale = scale
self.nb_ltas_time_bins = nb_ltas_time_bins

if self.is_spectro and fft is None:
raise ValueError(
"FFT parameter should be given if spectra outputs are selected.",
)
msg = "FFT parameter should be given if spectra outputs are selected."
raise ValueError(msg)

self.fft = fft
self.zoom_levels = list({1, *zoom_levels}) if zoom_levels else None
self.zoom_ffts = zoom_ffts

@property
def is_spectro(self) -> bool:
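For reference, a minimal sketch of how the new parameters could be passed when building an Analysis. The keyword names come from the diff above; AnalysisType.SPECTROGRAM, the data_duration value, and the FFT settings are illustrative assumptions, not prescribed by this change:

from pandas import Timedelta
from scipy.signal import ShortTimeFFT
from scipy.signal.windows import hamming

from osekit.public_api.analysis import Analysis, AnalysisType

analysis = Analysis(
    analysis_type=AnalysisType.SPECTROGRAM,  # assumed member of the AnalysisType flag
    data_duration=Timedelta(seconds=10),     # assumed parameter name, referenced in the docstring
    fft=ShortTimeFFT(win=hamming(1024), hop=512, fs=48_000),  # illustrative FFT settings
    zoom_levels=[2, 4],   # zoom level 1 is added automatically (stored as [1, 2, 4])
    zoom_ffts=None,       # default: reuse the x1 FFT with hop divided by each zoom level
)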
89 changes: 65 additions & 24 deletions src/osekit/public_api/dataset.py
@@ -258,12 +258,18 @@ def get_analysis_audiodataset(self, analysis: Analysis) -> AudioDataset:

return ads

def get_analysis_spectrodataset(
def get_analysis_spectrodatasets(
self,
analysis: Analysis,
audio_dataset: AudioDataset | None = None,
) -> SpectroDataset | LTASDataset:
"""Return a SpectroDataset (or LTASDataset) created from analysis parameters.
) -> tuple[
SpectroDataset | LTASDataset,
dict[int, SpectroDataset | LTASDataset],
]:
"""Return SpectroDatasets (or LTASDatasets) created from analysis parameters.

The output contains the unzoomed dataset (matching the analysis data_duration)
plus any zoomed datasets requested through the analysis zoom_levels.

Parameters
----------
Expand All @@ -276,11 +282,14 @@ def get_analysis_spectrodataset(

Returns
-------
SpectroDataset | LTASDataset:
The SpectroDataset that match the analysis parameters.
This SpectroDataset can be used, for example, to have a peek at the
tuple[SpectroDataset | LTASDataset, dict[int, SpectroDataset | LTASDataset]]:
SpectroDatasets that match the analysis parameters.
The first element of the tuple is the unzoomed analysis dataset.
The second element is a dict whose keys are the zoom levels and whose
values are the corresponding analysis datasets.
These SpectroDatasets can be used, for example, to have a peek at the
analysis output before running it.
If Analysis.is_ltas is True, a LTASDataset is returned.
If Analysis.nb_ltas_time_bins is not None, a LTASDataset is returned.

"""
if analysis.fft is None:
@@ -308,7 +317,13 @@ def get_analysis_spectrodataset(
nb_time_bins=analysis.nb_ltas_time_bins,
)

return sds
if analysis.zoom_levels is None:
return sds, {}

return sds, sds.get_zoomed_spectro_datasets(
zoom_levels=analysis.zoom_levels,
zoom_ffts=analysis.zoom_ffts,
)

def run_analysis(
self,
@@ -363,26 +378,35 @@ def run_analysis(
self._add_audio_dataset(ads=ads, analysis_name=analysis.name)

sds = None
zoom_sdses = {}
if analysis.is_spectro:
sds = (
self.get_analysis_spectrodataset(
sds, zoom_sdses = (
self.get_analysis_spectrodatasets(
analysis=analysis,
audio_dataset=ads,
)
if spectro_dataset is None
else (spectro_dataset, {})
)
self._add_spectro_dataset(sds=sds, analysis_name=analysis.name)
for zoom_level, zoom_sds in zoom_sdses.items():
self._add_spectro_dataset(
sds=zoom_sds,
analysis_name=analysis.name,
zoom_level=zoom_level,
zoom_reference=sds.name,
)

self.export_analysis(
analysis_type=analysis.analysis_type,
ads=ads,
sds=sds,
link=True,
subtype=analysis.subtype,
nb_jobs=nb_jobs,
name=analysis.name,
)
for analysis_sds in [sds, *list(zoom_sdses.values())]:
self.export_analysis(
analysis_type=analysis.analysis_type,
ads=ads,
sds=analysis_sds,
link=True,
subtype=analysis.subtype,
nb_jobs=nb_jobs,
name=analysis.name,
)

self.write_json()

@@ -537,12 +561,16 @@ def _add_spectro_dataset(
self,
sds: SpectroDataset | LTASDataset,
analysis_name: str,
zoom_level: int = 1,
zoom_reference: str | None = None,
) -> None:
sds.folder = self._get_spectro_dataset_subpath(sds=sds)
self.datasets[sds.name] = {
"class": type(sds).__name__,
"dataset": sds,
"analysis": analysis_name,
"zoom_level": zoom_level,
"zoom_reference": zoom_reference,
}
sds.write_json(sds.folder)

@@ -702,11 +730,7 @@ def to_dict(self) -> dict:
"""
return {
"datasets": {
name: {
"class": dataset["class"],
"analysis": dataset["analysis"],
"json": str(dataset["dataset"].folder / f"{name}.json"),
}
name: self.analysis_dataset_to_dict(name=name)
for name, dataset in self.datasets.items()
},
"instrument": (
Expand All @@ -718,6 +742,20 @@ def to_dict(self) -> dict:
"timezone": self.timezone,
}

def analysis_dataset_to_dict(self, name: str) -> dict:
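"""Serialize the analysis dataset entry stored under name to a dictionary."""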
dataset = self.datasets[name]
output = {
"class": dataset["class"],
"analysis": dataset["analysis"],
"json": str(dataset["dataset"].folder / f"{name}.json"),
}
if type(dataset["dataset"]) in (SpectroDataset, LTASDataset):
output |= {
"zoom_level": dataset["zoom_level"],
"zoom_reference": dataset["zoom_reference"],
}
return output

@classmethod
def from_dict(cls, dictionary: dict) -> Dataset:
"""Deserialize a dataset from a dictionary.
@@ -749,6 +787,9 @@ def from_dict(cls, dictionary: dict) -> Dataset:
"analysis": dataset["analysis"],
"dataset": dataset_class.from_json(Path(dataset["json"])),
}
for zoom_info in ("zoom_level", "zoom_reference"):
if zoom_info in dataset:
datasets[name][zoom_info] = dataset[zoom_info]
return cls(
folder=Path(),
instrument=Instrument.from_dict(dictionary["instrument"]),
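On the public API side, callers of the renamed method now unpack a tuple, as in the test change below. A short sketch, assuming a built public-API Dataset named dataset and the analysis object from the previous sketch, with the other run_analysis arguments left at their defaults:

# Unzoomed dataset plus a dict keyed by zoom level (empty when analysis.zoom_levels is None).
sds, zoomed = dataset.get_analysis_spectrodatasets(analysis=analysis)

for zoom_level, zoom_sds in zoomed.items():
    # By default, each zoomed dataset reuses the x1 window with a reduced hop.
    print(zoom_level, zoom_sds.name, zoom_sds.fft.hop)

# run_analysis registers every zoomed dataset alongside the unzoomed one (each entry
# carrying its zoom_level and a zoom_reference to the x1 dataset, which also end up
# in the serialized JSON) and runs the export once per dataset.
dataset.run_analysis(analysis=analysis)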
2 changes: 1 addition & 1 deletion tests/test_public_api.py
@@ -1036,7 +1036,7 @@ def test_get_analysis_spectrodataset(
)
dataset.build()

analysis_sds = dataset.get_analysis_spectrodataset(analysis=analysis)
analysis_sds, _ = dataset.get_analysis_spectrodatasets(analysis=analysis)

assert all(
ad.begin == e.begin and ad.end == e.end