diff --git a/docs/source/example_multiple_spectrograms_id_public.ipynb b/docs/source/example_multiple_spectrograms_id_public.ipynb
index 8e3e6d0f..68ea7b24 100644
--- a/docs/source/example_multiple_spectrograms_id_public.ipynb
+++ b/docs/source/example_multiple_spectrograms_id_public.ipynb
@@ -175,7 +175,7 @@
    "source": [
     "from pandas import Timedelta\n",
     "\n",
-    "spectro_dataset = dataset.get_analysis_spectrodataset(analysis)\n",
+    "spectro_dataset, _ = dataset.get_analysis_spectrodatasets(analysis)\n",
     "\n",
     "for sd in spectro_dataset.data:\n",
     "    sd.name = next(iter(sd.audio_data.files)).path.stem\n",
diff --git a/docs/source/example_multiple_spectrograms_public.ipynb b/docs/source/example_multiple_spectrograms_public.ipynb
index 47a3477d..ec240ab0 100644
--- a/docs/source/example_multiple_spectrograms_public.ipynb
+++ b/docs/source/example_multiple_spectrograms_public.ipynb
@@ -243,7 +243,7 @@
    "source": [
     "import matplotlib.pyplot as plt\n",
     "\n",
-    "analysis_spectro_dataset = dataset.get_analysis_spectrodataset(\n",
+    "analysis_spectro_dataset, _ = dataset.get_analysis_spectrodatasets(\n",
     "    analysis=analysis,\n",
     "    audio_dataset=audio_dataset,  # So that the filtered SpectroDataset is returned\n",
     ")\n",
diff --git a/src/osekit/core_api/spectro_dataset.py b/src/osekit/core_api/spectro_dataset.py
index 5bd9dbf7..0e979db9 100644
--- a/src/osekit/core_api/spectro_dataset.py
+++ b/src/osekit/core_api/spectro_dataset.py
@@ -384,6 +384,162 @@ def update(first: int, last: int) -> None:
 
         update(first=first, last=last)
 
+    def get_zoomed_spectro_datasets(
+        self,
+        zoom_levels: list[int],
+        zoom_ffts: list[ShortTimeFFT | None] | None,
+    ) -> dict[int, SpectroDataset]:
+        """Return all zoomed SpectroDatasets from the current SpectroDataset.
+
+        For a given zoom level x, each SpectroData from the current SpectroDataset
+        will be split in x parts (that is, a 10s-long SpectroData with x=2 will lead to
+        two 5s-long SpectroDatas).
+        The x1 zoom level is the current SpectroDataset itself: it is skipped and
+        is never part of the output.
+
+        Parameters
+        ----------
+        zoom_levels: list[int]
+            All required zoom levels.
+        zoom_ffts: list[ShortTimeFFT | None] | None
+            FFTs to use for computing the zoomed SpectroDatasets, in the same
+            order as zoom_levels.
+            If None (the whole list or any of its items), the FFT will be defaulted
+            as done in the SpectroDataset._get_zoomed_fft() method.
+
+        Returns
+        -------
+        dict[int, SpectroDataset]
+            Dictionary where the keys are the zoom levels, and the values are the
+            corresponding SpectroDatasets.
+
+        Raises
+        ------
+        ValueError
+            If zoom_levels and zoom_ffts do not have the same length.
+
+        """
+        zoom_ffts = zoom_ffts if zoom_ffts is not None else [None] * len(zoom_levels)
+        return {
+            zoom_level: self.get_zoomed_spectro_dataset(
+                zoom_level=zoom_level,
+                zoom_fft=zoom_fft,
+            )
+            for zoom_level, zoom_fft in zip(zoom_levels, zoom_ffts, strict=True)
+            if zoom_level != 1
+        }
+
+    def get_zoomed_spectro_dataset(
+        self,
+        zoom_level: int,
+        zoom_fft: ShortTimeFFT | None = None,
+    ) -> SpectroDataset:
+        """Return a zoomed SpectroDataset from the current SpectroDataset.
+
+        For a given zoom level x, each SpectroData from the current SpectroDataset
+        will be split in x parts (that is, a 10s-long SpectroData with x=2 will lead to
+        two 5s-long SpectroDatas).
+
+        Parameters
+        ----------
+        zoom_level: int
+            Zoom level of the output SpectroDataset.
+            Each SpectroData from the current SpectroDataset will be split in
+            zoom_level equal-duration parts.
+        zoom_fft: ShortTimeFFT | None
+            FFT to use for computing the zoomed SpectroDataset.
+            If None, will be defaulted as done in
+            the SpectroDataset._get_zoomed_fft() method.
+
+        Returns
+        -------
+        SpectroDataset:
+            Zoomed SpectroDataset from the current SpectroDataset.
+            With zoom_level=1, the current SpectroDataset itself is returned.
+
+        Raises
+        ------
+        ValueError
+            If zoom_level is lower than 1.
+
+        """
+        # Validated here as well: _get_zoomed_fft() (which also checks the level)
+        # is not called when a zoom_fft is provided.
+        if zoom_level < 1:
+            msg = f"Invalid zoom level {zoom_level}."
+            raise ValueError(msg)
+
+        if zoom_level == 1:
+            return self
+
+        if zoom_fft is None:
+            zoom_fft = self._get_zoomed_fft(zoom_level=zoom_level)
+
+        zoomed_sds = SpectroDataset(
+            [zoomed_sd for sd in self.data for zoomed_sd in sd.split(zoom_level)],
+            name=f"{self.name}_x{zoom_level}",
+            suffix=self.suffix,
+            scale=self.scale,
+            v_lim=self.v_lim,
+        )
+        zoomed_sds.fft = zoom_fft
+
+        return zoomed_sds
+
+    def _get_zoomed_fft(
+        self,
+        zoom_level: int,
+    ) -> ShortTimeFFT:
+        """Compute the default FFT to use for computing the zoomed spectra.
+
+        By default, SpectroDatasets with a zoomed factor z will use the
+        same FFT as the z=1 SpectroDataset, but with a hop that is
+        divided by z (rounded down to the nearest integer).
+
+        Parameters
+        ----------
+        zoom_level: int
+            Zoom level used for computing the spectra.
+
+        Returns
+        -------
+        ShortTimeFFT
+            FFT used for computing the zoomed spectra.
+
+        Raises
+        ------
+        ValueError
+            If zoom_level is lower than 1, or if it exceeds the hop of the
+            x1 FFT (the zoomed hop would then be shorter than one sample).
+
+        """
+        if zoom_level < 1:
+            msg = f"Invalid zoom level {zoom_level}."
+            raise ValueError(msg)
+
+        if zoom_level == 1:
+            return self.fft
+
+        hop = self.fft.hop // zoom_level
+        if hop < 1:
+            msg = (
+                f"Zoom level {zoom_level} is too high "
+                f"for a hop of {self.fft.hop} samples."
+            )
+            raise ValueError(msg)
+
+        # Forward the settings of the x1 FFT, so that only the hop differs.
+        # scale_to is deliberately not forwarded: scipy applies the scaling to
+        # the window itself, so self.fft.win already carries it.
+        return ShortTimeFFT(
+            win=self.fft.win,
+            hop=hop,
+            fs=self.fft.fs,
+            fft_mode=self.fft.fft_mode,
+            mfft=self.fft.mfft,
+            phase_shift=self.fft.phase_shift,
+        )
+
     def to_dict(self) -> dict:
         """Serialize a SpectroDataset to a dictionary.
 
diff --git a/src/osekit/public_api/analysis.py b/src/osekit/public_api/analysis.py
index 25344b0b..45947087 100644
--- a/src/osekit/public_api/analysis.py
+++ b/src/osekit/public_api/analysis.py
@@ -3,12 +3,14 @@
 from enum import Flag, auto
 from typing import TYPE_CHECKING, Literal
 
-from osekit.core_api.frequency_scale import Scale
+from scipy.signal import ShortTimeFFT
+
 from osekit.utils.audio_utils import Normalization
 
 if TYPE_CHECKING:
     from pandas import Timedelta, Timestamp
-    from scipy.signal import ShortTimeFFT
+
+    from osekit.core_api.frequency_scale import Scale
 
 
 class AnalysisType(Flag):
@@ -78,6 +80,8 @@ def __init__(
         colormap: str | None = None,
         scale: Scale | None = None,
         nb_ltas_time_bins: int | None = None,
+        zoom_levels: list[int] | None = None,
+        zoom_ffts: list[ShortTimeFFT] | None = None,
     ) -> None:
         """Initialize an Analysis object.
 
@@ -141,6 +145,21 @@ def __init__(
             If None, the spectrogram will be computed regularly.
             If specified, the spectrogram will be computed as LTAS, with the value
             representing the maximum number of averaged time bins.
+        zoom_levels: list[int] | None
+            If specified, additional analysis datasets will be created at the
+            requested zoom levels.
+            e.g. with a data_duration of 10s and zoom_levels = [2,4], 3 SpectroDatasets
+            will be created, with data_duration = 10s, 5s and 2.5s.
+            This will only affect spectral exports, and if AnalysisType.AUDIO is
+            included in the analysis, zoomed SpectroDatasets will be linked to the
+            x1 zoom SpectroData.
+            The x1 zoom level is always included, and duplicated levels are removed.
+        zoom_ffts: list[ShortTimeFFT] | None
+            FFTs to use for computing the zoomed spectra, given in the same order
+            as zoom_levels (one FFT per level, otherwise a ValueError is raised).
+            By default, SpectroDatasets with a zoomed factor z will use the
+            same FFT as the z=1 SpectroDataset, but with a hop that is
+            divided by z.
 
         """
         self.analysis_type = analysis_type
@@ -153,16 +172,33 @@ def __init__(
         self.name = name
         self.normalization = normalization
         self.subtype = subtype
-        self.fft = fft
         self.v_lim = v_lim
         self.colormap = colormap
         self.scale = scale
         self.nb_ltas_time_bins = nb_ltas_time_bins
 
         if self.is_spectro and fft is None:
-            raise ValueError(
-                "FFT parameter should be given if spectra outputs are selected.",
-            )
+            msg = "FFT parameter should be given if spectra outputs are selected."
+            raise ValueError(msg)
+
+        self.fft = fft
+        self.zoom_levels = None
+        self.zoom_ffts = zoom_ffts
+        if zoom_levels:
+            if zoom_ffts is not None and len(zoom_ffts) != len(zoom_levels):
+                msg = "zoom_ffts should contain exactly one FFT per zoom level."
+                raise ValueError(msg)
+            # The x1 level is always part of the analysis; sorting the
+            # deduplicated levels makes their order deterministic.
+            self.zoom_levels = sorted({1, *zoom_levels})
+            if zoom_ffts is not None:
+                # Re-pair each FFT with its level, as deduplicating and sorting
+                # the levels would otherwise break the levels/FFTs alignment.
+                # Levels given without FFT (e.g. the added x1) are mapped to None.
+                fft_by_level = dict(zip(zoom_levels, zoom_ffts, strict=True))
+                self.zoom_ffts = [
+                    fft_by_level.get(level) for level in self.zoom_levels
+                ]
 
     @property
     def is_spectro(self) -> bool:
diff --git a/src/osekit/public_api/dataset.py b/src/osekit/public_api/dataset.py
index e284350d..2b8fbe1d 100644
--- a/src/osekit/public_api/dataset.py
+++ b/src/osekit/public_api/dataset.py
@@ -258,12 +258,18 @@ def get_analysis_audiodataset(self, analysis: Analysis) -> AudioDataset:
 
         return ads
 
-    def get_analysis_spectrodataset(
+    def get_analysis_spectrodatasets(
         self,
         analysis: Analysis,
         audio_dataset: AudioDataset | None = None,
-    ) -> SpectroDataset | LTASDataset:
-        """Return a SpectroDataset (or LTASDataset) created from analysis parameters.
+    ) -> tuple[
+        SpectroDataset | LTASDataset,
+        dict[int, SpectroDataset | LTASDataset],
+    ]:
+        """Return SpectroDatasets (or LTASDatasets) created from analysis parameters.
+
+        The output contains the unzoomed dataset (matching the analysis data_duration) plus
+        the potential zoomed datasets.
 
         Parameters
         ----------
@@ -276,11 +282,14 @@ def get_analysis_spectrodataset(
 
         Returns
         -------
-        SpectroDataset | LTASDataset:
-            The SpectroDataset that match the analysis parameters.
-            This SpectroDataset can be used, for example, to have a peek at the
+        tuple[SpectroDataset | LTASDataset, dict[int, SpectroDataset | LTASDataset]]:
+            SpectroDatasets that match the analysis parameters.
+            The first element of the tuple is the unzoomed analysis dataset.
+            The second element of the tuple is a dict, with the key
+            being the zoom level and the value the corresponding analysis dataset.
+            These SpectroDatasets can be used, for example, to have a peek at the
             analysis output before running it.
-            If Analysis.is_ltas is True, a LTASDataset is returned.
+            If Analysis.nb_ltas_time_bins is not None, a LTASDataset is returned.
 
         """
         if analysis.fft is None:
@@ -308,7 +317,13 @@ def get_analysis_spectrodataset(
                 nb_time_bins=analysis.nb_ltas_time_bins,
             )
 
-        return sds
+        if analysis.zoom_levels is None:
+            return sds, {}
+
+        return sds, sds.get_zoomed_spectro_datasets(
+            zoom_levels=analysis.zoom_levels,
+            zoom_ffts=analysis.zoom_ffts,
+        )
 
     def run_analysis(
         self,
@@ -363,9 +378,10 @@ def run_analysis(
             self._add_audio_dataset(ads=ads, analysis_name=analysis.name)
 
         sds = None
+        zoom_sdses = {}
         if analysis.is_spectro:
-            sds = (
-                self.get_analysis_spectrodataset(
+            sds, zoom_sdses = (
+                self.get_analysis_spectrodatasets(
                     analysis=analysis,
                     audio_dataset=ads,
                 )
@@ -373,16 +389,25 @@ def run_analysis(
-                else spectro_dataset
+                # A provided spectro_dataset comes without any zoomed dataset.
+                else (spectro_dataset, {})
             )
             self._add_spectro_dataset(sds=sds, analysis_name=analysis.name)
+            for zoom_level, zoom_sds in zoom_sdses.items():
+                self._add_spectro_dataset(
+                    sds=zoom_sds,
+                    analysis_name=analysis.name,
+                    zoom_level=zoom_level,
+                    zoom_reference=sds.name,
+                )
 
-        self.export_analysis(
-            analysis_type=analysis.analysis_type,
-            ads=ads,
-            sds=sds,
-            link=True,
-            subtype=analysis.subtype,
-            nb_jobs=nb_jobs,
-            name=analysis.name,
-        )
+        for analysis_sds in [sds, *zoom_sdses.values()]:
+            self.export_analysis(
+                analysis_type=analysis.analysis_type,
+                ads=ads,
+                sds=analysis_sds,
+                link=True,
+                subtype=analysis.subtype,
+                nb_jobs=nb_jobs,
+                name=analysis.name,
+            )
 
         self.write_json()
 
@@ -537,12 +562,16 @@ def _add_spectro_dataset(
         self,
         sds: SpectroDataset | LTASDataset,
         analysis_name: str,
+        zoom_level: int = 1,
+        zoom_reference: str | None = None,
     ) -> None:
         sds.folder = self._get_spectro_dataset_subpath(sds=sds)
         self.datasets[sds.name] = {
             "class": type(sds).__name__,
             "dataset": sds,
             "analysis": analysis_name,
+            "zoom_level": zoom_level,
+            "zoom_reference": zoom_reference,
         }
 
         sds.write_json(sds.folder)
@@ -702,11 +731,7 @@ def to_dict(self) -> dict:
         """
         return {
             "datasets": {
-                name: {
-                    "class": dataset["class"],
-                    "analysis": dataset["analysis"],
-                    "json": str(dataset["dataset"].folder / f"{name}.json"),
-                }
-                for name, dataset in self.datasets.items()
+                name: self.analysis_dataset_to_dict(name=name)
+                for name in self.datasets
             },
             "instrument": (
@@ -718,6 +743,37 @@ def to_dict(self) -> dict:
             "timezone": self.timezone,
         }
 
+    def analysis_dataset_to_dict(self, name: str) -> dict:
+        """Serialize one of the analysis datasets of this Dataset to a dictionary.
+
+        Parameters
+        ----------
+        name: str
+            Name of the analysis dataset, as registered in self.datasets.
+
+        Returns
+        -------
+        dict
+            The class name, analysis name and json path of the dataset.
+            Spectro (and LTAS) datasets also have their zoom_level and
+            zoom_reference serialized.
+
+        """
+        dataset = self.datasets[name]
+        output = {
+            "class": dataset["class"],
+            "analysis": dataset["analysis"],
+            "json": str(dataset["dataset"].folder / f"{name}.json"),
+        }
+        if isinstance(dataset["dataset"], (SpectroDataset, LTASDataset)):
+            # Datasets deserialized from a json that has no zoom info do not
+            # carry these keys: fall back on the _add_spectro_dataset defaults.
+            output |= {
+                "zoom_level": dataset.get("zoom_level", 1),
+                "zoom_reference": dataset.get("zoom_reference"),
+            }
+        return output
+
     @classmethod
     def from_dict(cls, dictionary: dict) -> Dataset:
         """Deserialize a dataset from a dictionary.
@@ -749,6 +805,9 @@ def from_dict(cls, dictionary: dict) -> Dataset:
                 "analysis": dataset["analysis"],
                 "dataset": dataset_class.from_json(Path(dataset["json"])),
             }
+            for zoom_info in ("zoom_level", "zoom_reference"):
+                if zoom_info in dataset:
+                    datasets[name][zoom_info] = dataset[zoom_info]
         return cls(
             folder=Path(),
             instrument=Instrument.from_dict(dictionary["instrument"]),
diff --git a/tests/test_public_api.py b/tests/test_public_api.py
index 1add226e..85e53073 100644
--- a/tests/test_public_api.py
+++ b/tests/test_public_api.py
@@ -1036,7 +1036,7 @@ def test_get_analysis_spectrodataset(
     )
     dataset.build()
 
-    analysis_sds = dataset.get_analysis_spectrodataset(analysis=analysis)
+    analysis_sds, _ = dataset.get_analysis_spectrodatasets(analysis=analysis)
 
     assert all(
         ad.begin == e.begin and ad.end == e.end
diff --git a/tests/test_spectro.py b/tests/test_spectro.py
index 062fb6a6..bd00c432 100644
--- a/tests/test_spectro.py
+++ b/tests/test_spectro.py
@@ -1224,3 +1224,47 @@ def mocked_ad_init(
 
     assert ad.begin == sd.begin
     assert ad.end == sd.end
+
+
+@pytest.mark.parametrize(
+    ("fft", "zoom_level", "expected"),
+    [
+        pytest.param(
+            ShortTimeFFT(hamming(1024), hop=1024, fs=24_000),
+            1,
+            ShortTimeFFT(hamming(1024), hop=1024, fs=24_000),
+            id="x1_zoom_only_equals_no_zoom",
+        ),
+        pytest.param(
+            ShortTimeFFT(hamming(1024), hop=1024, fs=24_000),
+            2,
+            ShortTimeFFT(hamming(1024), hop=512, fs=24_000),
+            id="x2_zoom",
+        ),
+        pytest.param(
+            ShortTimeFFT(hamming(1024), hop=1024, fs=24_000),
+            8,
+            ShortTimeFFT(hamming(1024), hop=128, fs=24_000),
+            id="x8_zoom",
+        ),
+        pytest.param(
+            ShortTimeFFT(hamming(1024), hop=1024, fs=24_000),
+            3,
+            ShortTimeFFT(hamming(1024), hop=341, fs=24_000),
+            id="hop_is_rounded_down",
+        ),
+    ],
+)
+def test_get_zoom_fft(
+    patch_audio_data: pytest.MonkeyPatch,
+    fft: ShortTimeFFT,
+    zoom_level: int,
+    expected: ShortTimeFFT,
+) -> None:
+    sds = SpectroDataset(
+        [SpectroData.from_audio_data(AudioData(mocked_value=[]), fft=fft)],
+    )
+    zoom_fft = sds._get_zoomed_fft(zoom_level)
+    assert np.array_equal(zoom_fft.win, expected.win)
+    assert zoom_fft.hop == expected.hop
+    assert zoom_fft.fs == expected.fs