diff --git a/CHANGES.rst b/CHANGES.rst index af6e4071..96a6b855 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -13,15 +13,19 @@ New Features ------------ +sbpy.activity +^^^^^^^^^^^^^ +- New `sbpy.activity.CircularAperture.from_coma_equivalent()` to immediately + create a `CircularAperture` from any other `Aperture` given a nominal coma + surface brightness distribution. [#393] + sbpy.utils ^^^^^^^^^^ - - New `required_packages` and `optional_packages` functions to test for the presence of required and optional packages. sbpy.utils.decorators ^^^^^^^^^^^^^^^^^^^^^ - - New `requires` and `optionally_uses` function decorators to simplify testing for required and optional packages. diff --git a/docs/sbpy/activity/dust.rst b/docs/sbpy/activity/dust.rst index ef651820..b5b47476 100644 --- a/docs/sbpy/activity/dust.rst +++ b/docs/sbpy/activity/dust.rst @@ -83,6 +83,15 @@ The `Afrho` class may be converted to a flux density, and the original value is >>> print(np.log10(f.value)) # doctest: +FLOAT_CMP -13.99 +`Afrho` may also be converted to/from geometric cross sectional area, given geometric albedo, photometric aperture, and observer-comet distance: + + >>> Ap = 0.05 # geometric albedo + >>> G = afrho.to_cross_section(Ap, aper, eph) + >>> print(G) # doctest: +FLOAT_CMP + 25763.15641363505 km2 + >>> print(Afrho.from_cross_section(G, Ap, aper, eph)) + 6029.9024895289485 cm + `Afrho` works seamlessly with `sbpy`'s spectral calibration framework (:ref:`sbpy-calib`) when the `astropy` affiliated package `synphot` is installed. The solar flux density (via `~sbpy.calib.solar_fluxd`) is not required, but instead the spectral wavelengths or the system transmission of the instrument and filter: .. doctest-requires:: synphot; astropy>=5.3 @@ -101,25 +110,31 @@ The `Afrho` class may be converted to a flux density, and the original value is Thermal emission with *εfρ* ^^^^^^^^^^^^^^^^^^^^^^^^^^^ - + The `Efrho` class has the same functionality as the `Afrho` class. The most important difference is that *εfρ* is calculated using a Planck function and temperature. `sbpy` follows common practice and parameterizes the temperature as a constant scale factor of :math:`T_{BB} = 278\,r_h^{1/2}`\ K, the equilibrium temperature of a large blackbody sphere at a distance :math:`r_h` from the Sun. Reproduce the *εfρ* of 246P/NEAT from Kelley et al. (2013). .. doctest-requires:: synphot - >>> wave = [15.8, 22.3] * u.um - >>> fluxd = [25.75, 59.2] * u.mJy + >>> wave = 15.8 * u.um + >>> fluxd = 25.75 * u.mJy >>> aper = 11.1 * u.arcsec >>> eph = Ephem.from_dict({'rh': 4.28 * u.au, 'delta': 3.71 * u.au}) >>> efrho = Efrho.from_fluxd(wave, fluxd, aper, eph) - >>> for i in range(len(wave)): - ... print('{:5.1f} at {:.1f}'.format(efrho[i], wave[i])) # doctest: +FLOAT_CMP - 406.2 cm at 15.8 um - 427.9 cm at 22.3 um + >>> print(efrho) # doctest: +FLOAT_CMP + 396.71290996643665 cm + +Compare to 397.0 cm listed in Kelley et al. (2013). -Compare to 397.0 cm and 424.6 cm listed in Kelley et al. (2013). +`Efrho` may also be converted to/from geometric cross sectional area, given emissivity, photometric aperture, and observer-comet distance: + >>> epsilon = 0.95 # geometric albedo + >>> G = efrho.to_cross_section(epsilon, aper, eph) + >>> print(G) # doctest: +FLOAT_CMP + 391.83188076171695 km2 + >>> print(Efrho.from_cross_section(G, epsilon, aper, eph)) # doctest: +FLOAT_CMP + 396.7129099664366 cm To/from magnitudes ^^^^^^^^^^^^^^^^^^ diff --git a/sbpy/activity/dust.py b/sbpy/activity/dust.py index 62a74a64..d6266cf8 100644 --- a/sbpy/activity/dust.py +++ b/sbpy/activity/dust.py @@ -32,7 +32,7 @@ from .. import data as sbd from .. import units as sbu from ..spectroscopy.sources import SinglePointSpectrumError -from .core import Aperture +from .core import CircularAperture @bib.cite( @@ -337,8 +337,7 @@ def from_fluxd(cls, wfb, fluxd, aper, eph, **kwargs): """ - fluxd1cm = cls(1 * u.cm).to_fluxd(wfb, aper, - eph, unit=fluxd.unit, **kwargs) + fluxd1cm = cls(1 * u.cm).to_fluxd(wfb, aper, eph, unit=fluxd.unit, **kwargs) if isinstance(fluxd1cm, u.Magnitude): coma = cls((fluxd - fluxd1cm).physical * u.cm) @@ -380,12 +379,8 @@ def to_fluxd(self, wfb, aper, eph, unit=None, **kwargs): # rho = effective circular aperture radius at the distance of # the comet. Keep track of array dimensionality as Ephem # objects can needlessly increase the number of dimensions. - if isinstance(aper, Aperture): - rho = aper.coma_equivalent_radius() - ndim = np.ndim(rho) - else: - rho = aper - ndim = np.ndim(rho) + rho = CircularAperture.from_coma_equivalent(aper).dim + ndim = np.ndim(rho) rho = rho.to("km", sbu.projected_size(eph)) ndim = max(ndim, np.ndim(self)) @@ -628,6 +623,120 @@ def to_phase(self, to_phase, from_phase, Phi=None): return self * Phi(to_phase) / Phi(from_phase) + @sbd.dataclass_input(eph=sbd.Ephem) + @sbd.quantity_to_dataclass(eph=(sbd.Ephem, "delta")) + def to_cross_section(self, Ap, aper, eph): + """Convert from Afρ to dust geometric cross-sectional area. + + + Parameters + ---------- + Ap : float + Grain geometric albedo. Note that A in the Afρ quantity is ``4 * + Ap`` (A'Hearn et al. 1984). + + aper : `~astropy.units.Quantity` or `~sbpy.activity.Aperture` + Aperture of the observation as a circular radius (length + or angular units), or as an `~sbpy.activity.Aperture`. + + eph: `~astropy.units.Quantity`, dictionary-like, `~sbpy.data.Ephem` + Observer-comet distance, or ephemeris data object with "delta" + defined. + + + Returns + ------- + G : `~astropy.units.Quantity` + + + Examples + -------- + >>> import astropy.units as u + >>> from sbpy.activity import Afrho + >>> afrho = Afrho(1000 * u.cm) + >>> eph = {"rh": 1 * u.au, "delta": 1 * u.au, "phase": 60 * u.deg} + >>> aper = 1 * u.arcsec + >>> G = afrho.to_cross_section(0.05, aper, eph) # doctest: +FLOAT_CMP + + + To account for phase angle, first use ``to_phase``: + >>> afrho.to_phase(0 * u.deg, eph["phase"]).to_cross_section(0.1, aper, eph) + ... # doctest: +FLOAT_CMP + + + """ + + # G = pi (rh delta)**2 F_comet / (Ap F_sun) + # F_comet / F_sun = G Ap / (pi rh**2 delta**2) + + # Afrho = 4 (rh delta)**2 F_comet / (rho F_sun) + # F_comet / F_sun = Afrho rho / (4 delta**2 rh**2) + + # G Ap / (pi rh**2 delta**2) = Afrho rho / (4 delta**2 rh**2) + # G = Afrho rho pi rh**2 delta**2 / (4 delta**2 rh**2 Ap) + # G = pi Afrho rho / (4 Ap) + + # rho = effective circular aperture radius at the distance of + # the comet. + if isinstance(aper, Aperture): + rho = aper.coma_equivalent_radius() + else: + rho = aper + rho = rho.to("km", sbu.projected_size(eph)) + + G = (self * rho * np.pi / (4 * Ap)).to("km2") + + # Avoid allowing the Ephem object to needlessly return an array. + ndim = np.ndim(self * aper * Ap) + if ndim == 0 and ndim != np.ndim(G) and len(G) == 1: + return G[0] + else: + return G + + @classmethod + @sbd.dataclass_input(eph=sbd.Ephem) + @sbd.quantity_to_dataclass(eph=(sbd.Ephem, "delta")) + def from_cross_section(cls, G, Ap, aper, eph): + """Initialize from dust geometric cross-sectional area. + + + Parameters + ---------- + G : `~astropy.units.Quantity` + Dust geometric cross-sectional area. + + Ap : float + Grain geometric albedo. Note that A in the Afρ quantity is ``4 * + Ap`` (A'Hearn et al. 1984). + + aper : `~astropy.units.Quantity` or `~sbpy.activity.Aperture` + Aperture of the observation as a circular radius (length or angular + units), or as an `~sbpy.activity.Aperture`. + + eph: `~astropy.units.Quantity`, dictionary-like, `~sbpy.data.Ephem` + Observer-comet distance, or ephemeris data object with "delta" + defined. + + + Returns + ------- + G : `~astropy.units.Quantity` + + + Examples + -------- + >>> import astropy.units as u + >>> from sbpy.activity import Afrho + >>> eph = {"rh": 1 * u.au, "delta": 1 * u.au} + >>> aper = 1 * u.arcsec + >>> Afrho.from_cross_section(1 * u.km**2, 0.05, aper, eph) + + """ + + G1 = cls(1 * u.cm).to_cross_section(Ap, aper, eph) + afrho = cls((G / G1).to("") * u.cm) + return afrho + class Efrho(DustComaQuantity): """Coma dust quantity for thermal emission. @@ -642,7 +751,7 @@ class Efrho(DustComaQuantity): The value(s). unit : str, `~astropy.units.Unit`, optional - The unit of the input value. Strings must be parseable by + The unit of the input value. Strings must be parsable by :mod:`~astropy.units` package. dtype : `~numpy.dtype`, optional @@ -760,3 +869,107 @@ def _source_fluxd(self, wfb, eph, unit=None, Tscale=1.1, T=None, B=None): ) return B + + @sbd.dataclass_input(eph=sbd.Ephem) + @sbd.quantity_to_dataclass(eph=(sbd.Ephem, "delta")) + def to_cross_section(self, epsilon, aper, eph): + """Convert from εfρ to dust geometric cross-sectional area. + + + Parameters + ---------- + epsilon : float + Grain emissivity. + + aper : `~astropy.units.Quantity` or `~sbpy.activity.Aperture` + Aperture of the observation as a circular radius (length + or angular units), or as an `~sbpy.activity.Aperture`. + + eph: `~astropy.units.Quantity`, dictionary-like, `~sbpy.data.Ephem` + Observer-comet distance, or ephemeris data object with "delta" + defined. + + + Returns + ------- + G : `~astropy.units.Quantity` + + + Examples + -------- + >>> import astropy.units as u + >>> from sbpy.activity import Efrho + >>> efrho = Efrho(1000 * u.cm) + >>> eph = {"rh": 1 * u.au, "delta": 1 * u.au} + >>> aper = 1 * u.arcsec + >>> efrho.to_cross_section(0.95, aper, eph) # doctest: +FLOAT_CMP + + + """ + + # F_comet = G epsilon B(T) / delta**2 + # + # efrho = F_comet delta**2 / (pi rho B(T)) + # F_comet = efrho pi rho B(T) / delta**2 + # + # G epsilon B(T) / delta**2 = efrho pi rho B(T) / delta**2 + # G epsilon = efrho pi rho + # + # G = efrho pi rho / epsilon + + # rho = effective circular aperture radius at the distance of + # the comet. + if isinstance(aper, Aperture): + rho = aper.coma_equivalent_radius() + else: + rho = aper + rho = rho.to("km", sbu.projected_size(eph)) + + G = (self * np.pi * rho / epsilon).to("km2") + + # Avoid allowing the Ephem object to needlessly return an array. + ndim = np.ndim(self * aper * epsilon) + if ndim == 0 and ndim != np.ndim(G) and len(G) == 1: + return G[0] + else: + return G + + @classmethod + @sbd.dataclass_input(eph=sbd.Ephem) + @sbd.quantity_to_dataclass(eph=(sbd.Ephem, "delta")) + def from_cross_section(cls, G, epsilon, aper, eph): + """Initialize from dust geometric cross-sectional area. + + + Parameters + ---------- + G : `~astropy.units.Quantity` + Dust geometric cross-sectional area. + + epsilon : float + Grain emissivity. + + aper : `~astropy.units.Quantity` or `~sbpy.activity.Aperture` + Aperture of the observation as a circular radius (length + or angular units), or as an `~sbpy.activity.Aperture`. + + eph: `~astropy.units.Quantity`, dictionary-like, `~sbpy.data.Ephem` + Observer-comet distance, or ephemeris data object with "delta" + defined. + + + Returns + ------- + G : `~astropy.units.Quantity` + + + Examples + -------- + >>> eph = {"rh": 1 * u.au, "delta": 1 * u.au} + >>> aper = 1 * u.arcsec + >>> Efrho.from_cross_section(1 * u.km**2, 0.95, aper, eph) + + """ + + G1 = Efrho(1 * u.cm).to_cross_section(epsilon, aper, eph) + return Efrho((G / G1).to("") * u.cm) diff --git a/sbpy/activity/tests/test_dust.py b/sbpy/activity/tests/test_dust.py index f15b1a3d..e03c33f6 100644 --- a/sbpy/activity/tests/test_dust.py +++ b/sbpy/activity/tests/test_dust.py @@ -155,6 +155,68 @@ def test_to_phase(self): afrho = Afrho(10 * u.cm).to_phase(15 * u.deg, 0 * u.deg).to('cm') assert np.isclose(afrho.value, 5.8720) + def test_to_cross_section(self): + """ + + Compare with Kelley et al. 2021, PSJ, 2, 131. + + The original work missed a factor of rh in the cross-sectional area + calculation. Multiply their G by rh to correct it. + + rh, delta, phase, and m have an additional significant digit over Table + 1 of Kelley et al. based on a rerun of their lightcurve analysis script + on 2024 Jan 14. + + Telescope: Lowell 0.8m + Date: 2018-12-04 04:26:25 + rh: 1.062 au (1.0620) + delta: 0.107 au (0.1068) + phase: 42.32 deg (42.324) + filter: R (calibrated to PS1 r) + m: 12.00 +/- 0.02 mag (12.003) + A(theta) f rho: 79.45 cm + G: 15.26 km2 --> 16.21 + + Ap (PS1 r): 4.19% + rho: 5" + + """ + + eph = Ephem.from_dict( + { + "rh": 1.0620 * u.au, + "delta": 0.1068 * u.au, + "phase": 42.324 * u.deg, + } + ) + rho = 5 * u.arcsec + + with solar_fluxd.set({"PS1 r": -26.93 * u.ABmag}): + afrho = Afrho.from_fluxd("PS1 r", 12.003 * u.ABmag, rho, eph) + assert u.isclose(afrho, 79.45 * u.cm, rtol=0.001) + + a0frho = afrho.to_phase(0 * u.deg, eph["phase"][0]) + G = a0frho.to_cross_section(0.0419, rho, eph["delta"]) + assert u.isclose(G, 16.21 * u.km**2, rtol=0.002) + assert np.ndim(G) == 0 + + # get a 1D array + G = Afrho(1, "cm").to_cross_section(0.04, rho, [1, 2] * u.au) + assert np.size(G) == 2 + + def test_from_cross_section(self): + """See test_to_cross_section for notes.""" + a0frho = Afrho.from_cross_section( + 16.21 * u.km**2, 0.0419, 5 * u.arcsec, 0.1068 * u.au + ) + afrho = a0frho.to_phase(42.324 * u.deg, 0 * u.deg) + assert u.isclose(afrho, 79.45 * u.cm, rtol=0.002) + assert np.ndim(afrho) == 0 + + # get a 1D array + afrho = Afrho.from_cross_section(1 * u.km**2, 0.04, 5 * u.arcsec, [1, 2] * u.au) + assert np.size(afrho) == 2 + def test_from_fluxd_PR125(self): """Regression test for PR#125: User requested Phi was ignored.""" afrho = Afrho(100 * u.cm) @@ -271,3 +333,53 @@ def test_source_fluxd_B_error(self): with pytest.raises(ValueError): assert Efrho.from_fluxd(1 * u.um, 1 * u.Jy, 1 * u.arcsec, eph, B=5 * u.m) + + def test_to_cross_section(self): + """ + Based on TestAfrho.test_to_cross_section. + """ + + eph = Ephem.from_dict( + { + "rh": 1.0620 * u.au, + "delta": 0.1068 * u.au, + "phase": 42.324 * u.deg, + } + ) + rho = 5 * u.arcsec + a0frho = Afrho(222.89862496, "cm") + + Ap = 0.0419 + A = 4 * Ap + epsilon = 1 - Ap + ef_af = epsilon / A + + efrho = Efrho(ef_af * a0frho) + G = efrho.to_cross_section(epsilon, rho, eph) + + assert u.isclose(G, 16.21 * u.km**2, rtol=0.002) + assert np.ndim(G) == 0 + + # get a 1D array + G = Efrho(1, "cm").to_cross_section(epsilon, rho, [1, 2] * u.au) + assert np.size(G) == 2 + + def test_from_cross_section(self): + """See test_to_cross_section for notes.""" + + Ap = 0.0419 + A = 4 * Ap + epsilon = 1 - Ap + ef_af = epsilon / A + delta = 0.1068 * u.au + rho = 5 * u.arcsec + + a0frho = Afrho(222.89862496, "cm") + + efrho = Efrho.from_cross_section(16.21 * u.km**2, epsilon, rho, delta) + assert u.isclose(efrho, ef_af * a0frho, rtol=0.002) + assert np.ndim(efrho) == 0 + + # get a 1D array + efrho = Efrho.from_cross_section(1 * u.km**2, epsilon, rho, [1, 2] * u.au) + assert np.size(efrho) == 2 diff --git a/sbpy/spectroscopy/coma.py b/sbpy/spectroscopy/coma.py new file mode 100644 index 00000000..e29ee882 --- /dev/null +++ b/sbpy/spectroscopy/coma.py @@ -0,0 +1,263 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst +"""sbpy Coma Spectroscopy + +Spectrophotometric classes for cometary comae. + +Uses the astropy modeling framework. + +Requires synphot. + +""" + +import numpy as np +import astropy.units as u +from astropy.modeling import Fittable1DModel, Parameter +from .core import SpectralGradient +from .sources import BlackbodySource +from ..activity.dust import Afrho, Efrho +from .. import data as sbd +from .. import units as sbu +from ..calib import Sun + +__all__ = ["Scattered", "Thermal"] + + +class Scattered(Fittable1DModel): + """Light scattered from coma dust. + + Does not directly account for geometric albedo or phase function: + + F_lambda = cross_section * F_sun * R_lambda(S) / pi / rh**2 / delta**2 + + where ``R`` is the spectral reddening function, ``F_sun`` is the flux + density of the Sun at 1 au, ``rh`` is the heliocentric distance scale factor + for the solar flux density, ``delta`` is the observer-coma distance, and + ``cross_section`` is the effective cross sectional area of the dust. + + + Parameters + ---------- + eph: dictionary-like, `~sbpy.data.Ephem` + Ephemerides of the comet. Required fields: 'rh', 'delta'. + + unit : string or `astropy.units.Unit` + Unit of the resultant spectrum (spectral flux density). + + cross_section : float or `~astropy.units.Quantity`, optional + Cross-sectional area of the dust. For float input, km**2 is assumed. + + S : float, `~astropy.units.Quantity`, or + `~sbpy.spectroscopy.SpectralGradient`, optional + Spectral gradient. For float input, %/100 nm is assumed. For float or + quantity input, the wavelength of normalization is 550 nm. + + """ + + n_inputs = 1 + n_outputs = 1 + + input_units = {"x": u.um} + _input_units_allow_dimensionless = True + input_units_equivalencies = {"x": u.spectral()} + + cross_section = Parameter( + description="cross-sectional area of dust", default=1, unit=u.km**2 + ) + S = Parameter( + description="spectral gradient", default=5, unit=u.percent / sbu.hundred_nm + ) + + @sbd.dataclass_input(eph=sbd.Ephem) + def __init__(self, eph, unit, *args, **kwargs): + self.sun = Sun.from_default() + self.eph = eph + self.unit = u.Unit(unit) + if "S" in kwargs and isinstance(kwargs["S"], SpectralGradient): + kwargs["S"] = u.Quantity( + kwargs["S"].renormalize(550 * u.nm), u.percent / sbu.hundred_nm + ) + super().__init__(*args, **kwargs) + + def _parameter_units_for_data_units(self, inputs_unit, outputs_unit): + return { + "cross_section": u.km**2, + "S": u.percent / sbu.hundred_nm, + } + + @property + def rh(self): + return self.eph["rh"][0] + + @property + def delta(self): + return self.eph["delta"][0] + + def evaluate(self, wave, cross_section, S): + _wave = u.Quantity(wave, u.um) + _cross_section = u.Quantity(cross_section, u.km**2) + # Keep S as a scalar value + _S = np.atleast_1d(u.Quantity(S, u.percent / sbu.hundred_nm))[0] + + sun = self.sun.redden(SpectralGradient(_S, wave0=550 * u.nm)) + if np.size(_wave) == 1: + F_sun = sun(_wave, unit=self.unit) + else: + F_sun = sun.observe(_wave, unit=self.unit) + + spec = ( + _cross_section + * F_sun + / np.pi + / self.delta**2 + / self.rh.to_value(u.au) ** 2 + ).to(self.unit) + return spec if hasattr(cross_section, "unit") else spec.value + + # def fit_deriv(self, wave, cross_section, S): + # spec = self.evaluate(wave, cross_section, S) + # dspec_dcross_section = spec / cross_section + # dspec_dS = spec * (wave - 0.55 * u.um) + # return [dspec_dcross_section, dspec_dS] + + @u.quantity_input(wave=u.m) + def afrho(self, wave, aper): + """Convert spectrum to Afρ quantity. + + + Parameters + ---------- + wave : `~astropy.units.Quantity` + Spectral wavelengths. + + aper : `~astropy.units.Quantity` or `~sbpy.activity.Aperture` + Aperture of the observation as a circular radius (length + or angular units), or as an `~sbpy.activity.Aperture`. + + """ + + return Afrho.from_fluxd(wave, self(wave), aper, self.eph) + + +class Thermal(Fittable1DModel): + """Thermal emission from coma dust. + + Simple blackbody model, does not directly account for thermal emissivity. + + F_lambda = cross_section * B_lambda(T) / delta**2 + + where ``B_lambda`` is the Planck function, ``T = Tscale * 278 / sqrt(rh)`` + is the dust temperature, and ``cross_section`` is the effective cross + sectional area. + + + Parameters + ---------- + eph: dictionary-like, `~sbpy.data.Ephem` + Ephemerides of the comet. Required fields: 'rh', 'delta' + + unit : string or `astropy.units.Unit` + Unit of the resultant spectrum (spectral flux density). + + cross_section : float or `~astropy.units.Quantity`, optional + Effective cross-sectional area of the dust. For float input, km**2 is + assumed. + + Tscale : float or `~astropy.units.Quantity`, optional + LTE blackbody sphere temperature scale factor: ``T = Tscale * 278 / + sqrt(rh)``. + + """ + + n_inputs = 1 + n_outputs = 1 + + input_units = {"x": u.um} + _input_units_allow_dimensionless = True + input_units_equivalencies = {"x": u.spectral()} + + cross_section = Parameter( + description="effective emission cross-sectional area of dust", + default=1, + unit=u.km**2, + ) + Tscale = Parameter( + description="LTE blackbody sphere temperature scale factor", + default=1.0, + min=0, + ) + + @sbd.dataclass_input(eph=sbd.Ephem) + def __init__(self, eph, unit, *args, **kwargs): + self.eph = eph + self.unit = unit + super().__init__(*args, **kwargs) + + def _parameter_units_for_data_units(self, inputs_unit, outputs_unit): + return { + "cross_section": u.km**2, + "Tscale": "", + } + + @property + def rh(self): + return self.eph["rh"][0] + + @property + def delta(self): + return self.eph["delta"][0] + + def evaluate(self, wave, cross_section, Tscale): + """Evaluate the model. + + + Parameters + ---------- + x : float, `~numpy.ndarray`, or `~astropy.units.Quantity` + Frequency or wavelength at which to compute the blackbody. If no + units are given, then micrometer is assumed. + + cross_section : float or `~astropy.units.Quantity`, optional + Effective cross-sectional area of the dust. If no units are given, + then km**2 is assumed. + + Tscale : float or `~astropy.units.Quantity`, optional + LTE blackbody sphere temperature scale factor: ``T = Tscale * 278 / + sqrt(rh)``. + + + Returns + ------- + y : number, `~numpy.ndarray`, or `~astropy.units.Quantity` + Spectrum. + + """ + + _wave = u.Quantity(wave, u.um) + _cross_section = u.Quantity(cross_section, u.km**2) + T = Tscale * u.Quantity(278, u.K) / np.sqrt(self.rh.to_value("au")) + + B = BlackbodySource(T) + if np.size(_wave) == 1: + F_bb = B(_wave, unit=self.unit) + else: + F_bb = B.observe(_wave, unit=self.unit) + + spec = (_cross_section * (F_bb / np.pi) / self.delta**2).to(self.unit) + return spec if hasattr(cross_section, "unit") else spec.value + + def efrho(self, epsilon, aper): + """Convert to εfρ quantity. + + + Parameters + ---------- + epsilon : float + Grain emissivity. + + aper : `~astropy.units.Quantity` or `~sbpy.activity.Aperture` + Aperture of the observation as a circular radius (length or angular + units), or as an `~sbpy.activity.Aperture` object. + + """ + + return Efrho.from_cross_section(self.cross_section, epsilon, aper, self.delta) diff --git a/sbpy/spectroscopy/sources.py b/sbpy/spectroscopy/sources.py index 6de151b0..23928539 100644 --- a/sbpy/spectroscopy/sources.py +++ b/sbpy/spectroscopy/sources.py @@ -8,9 +8,7 @@ """ -__all__ = [ - 'BlackbodySource', 'Reddening' -] +__all__ = ["BlackbodySource", "Reddening"] import numpy as np from abc import ABC @@ -30,12 +28,13 @@ class SpectralElement: class BaseUnitlessSpectrum: pass + from ..exceptions import SbpyException from ..utils.decorators import requires __doctest_requires__ = { - 'SpectralSource': ['synphot'], - 'BlackbodySource': ['synphot'], + "SpectralSource": ["synphot"], + "BlackbodySource": ["synphot"], } @@ -99,15 +98,14 @@ def from_array(cls, wave, fluxd, meta=None, **kwargs): """ source = synphot.SourceSpectrum( - synphot.Empirical1D, points=wave, lookup_table=fluxd, - meta=meta) + synphot.Empirical1D, points=wave, lookup_table=fluxd, meta=meta + ) return cls(source, **kwargs) @classmethod @requires("synphot") - def from_file(cls, filename, wave_unit=None, flux_unit=None, - cache=True, **kwargs): + def from_file(cls, filename, wave_unit=None, flux_unit=None, cache=True, **kwargs): """Load the source spectrum from a file. NaNs are dropped. @@ -130,7 +128,7 @@ def from_file(cls, filename, wave_unit=None, flux_unit=None, """ - if filename.lower().endswith(('.fits', '.fit', '.fz')): + if filename.lower().endswith((".fits", ".fit", ".fz")): read_spec = synphot.specio.read_fits_spec else: read_spec = synphot.specio.read_ascii_spec @@ -144,8 +142,11 @@ def from_file(cls, filename, wave_unit=None, flux_unit=None, spec = read_spec(fn, wave_unit=wave_unit, flux_unit=flux_unit) i = np.isfinite(spec[1] * spec[2]) source = synphot.SourceSpectrum( - synphot.Empirical1D, points=spec[1][i], lookup_table=spec[2][i], - meta={'header': spec[0]}) + synphot.Empirical1D, + points=spec[1][i], + lookup_table=spec[2][i], + meta={"header": spec[0]}, + ) return cls(source, **kwargs) @@ -162,7 +163,7 @@ def wave(self): @property def fluxd(self): """The source spectrum.""" - return self.source(self._source.waveset, flux_unit='W / (m2 um)') + return self.source(self._source.waveset, flux_unit="W / (m2 um)") @property def source(self): @@ -200,14 +201,15 @@ def __call__(self, wave_or_freq, unit=None): if unit is not None: unit = u.Unit(unit) - elif wave_or_freq.unit.is_equivalent('m'): - unit = u.Unit('W/(m2 um)') + elif wave_or_freq.unit.is_equivalent("m"): + unit = u.Unit("W/(m2 um)") else: unit = u.Jy if unit.is_equivalent(sbu.VEGA): - fluxd = self.source(wave_or_freq, 'W/(m2 um)').to( - unit, sbu.spectral_density_vega(wave_or_freq)) + fluxd = self.source(wave_or_freq, "W/(m2 um)").to( + unit, sbu.spectral_density_vega(wave_or_freq) + ) else: fluxd = self.source(wave_or_freq, unit) @@ -262,16 +264,14 @@ def observe(self, wfb, unit=None, interpolate=False, **kwargs): """ if isinstance(wfb, (list, tuple, SpectralElement)): - lambda_eff, fluxd = self.observe_bandpass( - wfb, unit=unit, **kwargs) + lambda_eff, fluxd = self.observe_bandpass(wfb, unit=unit, **kwargs) elif isinstance(wfb, u.Quantity): if interpolate: fluxd = self(wfb, unit=unit) else: fluxd = self.observe_spectrum(wfb, unit=unit, **kwargs) else: - raise TypeError('Unsupported type for `wfb` type: {}' - .format(type(wfb))) + raise TypeError("Unsupported type for `wfb` type: {}".format(type(wfb))) return fluxd @@ -314,7 +314,7 @@ def observe_bandpass(self, bp, unit=None, **kwargs): ndim = np.ndim(bp) if unit is None: - unit = u.Unit('W/(m2 um)') + unit = u.Unit("W/(m2 um)") else: unit = u.Unit(unit) @@ -323,7 +323,7 @@ def observe_bandpass(self, bp, unit=None, **kwargs): obs = synphot.Observation(self.source, bp[i], **kwargs) lambda_eff = obs.effective_wavelength() lambda_pivot = obs.bandpass.pivot() - _fluxd = obs.effstim('W/(m2 um)') + _fluxd = obs.effstim("W/(m2 um)") if unit.is_equivalent(sbu.VEGAmag): fluxd[i] = _fluxd.to(unit, sbu.spectral_density_vega(bp[i])) @@ -387,14 +387,14 @@ def observe_spectrum(self, wave_or_freq, unit=None, **kwargs): if np.size(wave_or_freq) == 1: raise SinglePointSpectrumError( - 'Multiple wavelengths or frequencies required for ' - 'observe_spectrum. Instead consider interpolation ' - 'with {}().' - .format(self.__class__.__name__)) + "Multiple wavelengths or frequencies required for " + "observe_spectrum. Instead consider interpolation " + "with {}().".format(self.__class__.__name__) + ) if unit is None: - if wave_or_freq.unit.is_equivalent('m'): - unit = u.Unit('W/(m2 um)') + if wave_or_freq.unit.is_equivalent("m"): + unit = u.Unit("W/(m2 um)") else: unit = u.Jy else: @@ -406,14 +406,14 @@ def observe_spectrum(self, wave_or_freq, unit=None, **kwargs): # standards are not. force='taper' will affect retrieving # flux densities at the edges of the spectrum, but is # preferred to avoid wild extrapolation. - kwargs['force'] = kwargs.get('force', 'taper') + kwargs["force"] = kwargs.get("force", "taper") - obs = synphot.Observation( - self.source, specele, binset=wave_or_freq, **kwargs) + obs = synphot.Observation(self.source, specele, binset=wave_or_freq, **kwargs) if unit.is_equivalent(sbu.VEGAmag): - fluxd = obs.sample_binned(flux_unit='W/(m2 um)').to( - unit, sbu.spectral_density_vega(wave_or_freq)) + fluxd = obs.sample_binned(flux_unit="W/(m2 um)").to( + unit, sbu.spectral_density_vega(wave_or_freq) + ) else: fluxd = obs.sample_binned(flux_unit=unit) @@ -457,8 +457,9 @@ def color_index(self, wfb, unit): w, m[i] = self.observe_bandpass(wfb[i], unit=unit) eff_wave.append(w) else: - raise TypeError('Unsupported type for `wfb` type: {}' - .format(type(wfb[i]))) + raise TypeError( + "Unsupported type for `wfb` type: {}".format(type(wfb[i])) + ) ci = m[0] - m[1] @@ -479,12 +480,14 @@ def redden(self, S): """ from copy import deepcopy + r = Reddening(S) red_spec = deepcopy(self) red_spec._source = red_spec.source * r if red_spec.description is not None: - red_spec._description = '{} reddened by {} at {}'.format( - red_spec.description, S, S.wave0) + red_spec._description = "{} reddened by {} at {}".format( + red_spec.description, S, S.wave0 + ) return red_spec @@ -504,17 +507,19 @@ class BlackbodySource(SpectralSource): @requires("synphot") def __init__(self, T=None): - super().__init__(None, description='πB(T)') + super().__init__(None, description="πB(T)") if T is None: - raise TypeError('T is required.') + raise TypeError("T is required.") self._T = u.Quantity(T, u.K) - self._source = synphot.SourceSpectrum( - synphot.BlackBody1D, temperature=self._T.value) * np.pi + self._source = ( + synphot.SourceSpectrum(synphot.BlackBody1D, temperature=self._T.value) + * np.pi + ) def __repr__(self): - return ''.format(self._T) + return "".format(self._T) @property def T(self): @@ -534,14 +539,11 @@ class Reddening(BaseUnitlessSpectrum): @u.quantity_input(S=u.percent / u.um) @requires("synphot") def __init__(self, S): - - if getattr(S, 'wave0', None) is None: - raise ValueError("Normalization wavelength in `S` (.wave0) is " - "required by not available.") - + if getattr(S, "wave0", None) is None: + raise ValueError("Normalization wavelength in `S` (.wave0) is required.") wv = [1, 2] * S.wave0 - df = (S.wave0 * S).to('').value + df = (S.wave0 * S).to("").value super().__init__( - synphot.Empirical1D, points=wv, lookup_table=[1, 1+df], - fill_value=None) + synphot.Empirical1D, points=wv, lookup_table=[1, 1 + df], fill_value=None + ) diff --git a/sbpy/spectroscopy/tests/test_coma.py b/sbpy/spectroscopy/tests/test_coma.py new file mode 100644 index 00000000..9c9cabd9 --- /dev/null +++ b/sbpy/spectroscopy/tests/test_coma.py @@ -0,0 +1,164 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst + +import pytest +import numpy as np +import astropy.units as u +from astropy.modeling import fitting +from ..sources import BlackbodySource, Reddening +from ..coma import Scattered, Thermal +from ..core import SpectralGradient +from ...activity.dust import Afrho, Efrho +from ...calib import Sun +from ...units import hundred_nm + +pytest.importorskip("synphot") + + +class TestScattered: + def test_init(self): + s = Scattered({"rh": 1 * u.au, "delta": 1 * u.au}, "W/(m2 um)") + assert s.rh == 1 * u.au + assert s.delta == 1 * u.au + assert s.unit == u.Unit("W/(m2 um)") + + def test_evaluate(self): + cross_section = 1 * u.km**2 + S = 0 * u.percent / hundred_nm + + w = [2.0, 2.2] * u.um + eph = {"rh": 1 * u.au, "delta": 1 * u.au} + unit = u.Unit("W/(m2 um)") + + # expected value + sun = Sun.from_default() + expected = ( + cross_section + * sun.observe(w, unit=unit) + / np.pi + / eph["rh"].to_value("au") ** 2 + / eph["delta"] ** 2 + ).to(unit) + + # test multiple wavelengths and no reddening + s = Scattered(eph, unit, cross_section=cross_section, S=S) + assert u.allclose(s(w), expected) + + # test single wavelength and add reddening + w = 2 * u.um + s.S = 1 * u.percent / hundred_nm + R = 1 + ((w - 0.55 * u.um) * s.S).to_value("") + expected = ( + cross_section + * sun(w, unit=unit) + * R + / np.pi + / eph["rh"].to_value("au") ** 2 + / eph["delta"] ** 2 + ).to(unit) + assert u.allclose(s(w), expected) + + def test_afrho(self): + cross_section = 1 * u.km**2 + S = 0 * u.percent / hundred_nm + w = 2.0 * u.um + eph = {"rh": 1 * u.au, "delta": 1 * u.au} + rap = 1 * u.arcsec + unit = u.Unit("W/(m2 um)") + + s = Scattered(eph, unit, cross_section=cross_section, S=S) + a = s.afrho(w, rap) + b = Afrho.from_cross_section(cross_section, 1, rap, eph) + assert u.isclose(a, b) + + def test_fit(self): + wave = np.linspace(0.5, 0.6, 3) * u.um + S = SpectralGradient(2.0 * u.percent / hundred_nm, wave0=0.55 * u.um) + G = 100 * u.km**2 + Ap = 0.05 + rho = 5 * u.arcsec + eph = { + "rh": 1 * u.au, + "delta": 1 * u.au, + "phase": 60 * u.deg, + } + unit = u.Jy + + afrho = Afrho.from_cross_section(G, Ap, rho, eph) + redden = Reddening(S) + spec = afrho.to_fluxd(wave, rho, eph, unit=unit) * redden(wave) + guess = Scattered( + eph, unit, cross_section=10 * u.km**2, S=3 * u.percent / hundred_nm + ) + fitter = fitting.LevMarLSQFitter() + fit = fitter(guess, wave, spec) + + assert u.isclose(fit.cross_section, Ap * G, rtol=1e-4) + assert u.isclose(fit.S, S, rtol=5e-3) + + +class TestThermal: + def test_init(self): + s = Thermal({"rh": 1 * u.au, "delta": 1 * u.au}, "W/(m2 um)") + assert s.rh == 1 * u.au + assert s.delta == 1 * u.au + assert s.unit == u.Unit("W/(m2 um)") + + def test_evaluate(self): + cross_section = 1 * u.km**2 + Tscale = 1.0 + + w = [20.0, 22.0] * u.um + eph = {"rh": 1 * u.au, "delta": 1 * u.au} + unit = u.Unit("W/(m2 um)") + + # expected value + B = BlackbodySource(278 / np.sqrt(eph["rh"].to_value("au"))) + expected = cross_section * B.observe(w, unit=unit) / np.pi / eph["delta"] ** 2 + + t = Thermal(eph, unit, cross_section=cross_section, Tscale=Tscale) + assert u.allclose(t(w), expected) + + # test single wavelength and hotter temperatures + w = 2 * u.um + Tscale = 1.1 + eph = {"rh": 0.8 * u.au, "delta": 1 * u.au} + t = Thermal(eph, unit, cross_section=cross_section, Tscale=Tscale) + B = BlackbodySource(Tscale * 278 / np.sqrt(eph["rh"].to_value("au"))) + expected = cross_section * B(w, unit=unit) / np.pi / eph["delta"] ** 2 + assert u.allclose(t(w), expected) + + def test_efrho(self): + cross_section = 1 * u.km**2 + Tscale = 1.1 + w = 20.0 * u.um + eph = {"rh": 1 * u.au, "delta": 1 * u.au} + rap = 1 * u.arcsec + epsilon = 0.95 + unit = u.Unit("W/(m2 um)") + + t = Thermal(eph, unit, cross_section=cross_section, Tscale=Tscale) + a = t.efrho(epsilon, rap) + b = Efrho.from_cross_section(cross_section, epsilon, rap, eph) + assert u.isclose(a, b) + + def test_fit(self): + wave = np.linspace(8, 12, 3) * u.um + Tscale = 1.1 + G = 100 * u.km**2 + epsilon = 0.95 + rho = 5 * u.arcsec + eph = { + "rh": 1 * u.au, + "delta": 1 * u.au, + "phase": 60 * u.deg, + } + unit = u.Jy + + efrho = Efrho.from_cross_section(G, epsilon, rho, eph) + spec = efrho.to_fluxd(wave, rho, eph, unit=unit, Tscale=Tscale) + guess = Thermal(eph, unit, cross_section=100 * u.km**2, Tscale=1.08) + fitter = fitting.LevMarLSQFitter() + fit = fitter(guess, wave, spec) + + assert u.isclose(fit.cross_section, epsilon * G) + assert u.isclose(fit.Tscale, Tscale)