Source code for lidarSuit.wind_prop_retrieval

import logging

import numpy as np
import xarray as xr
import xrft

from .data_attributes import LoadAttributes
from .data_operator import GetRestructuredData

module_logger = logging.getLogger("lidarSuit.wind_prop_retrieval")
module_logger.debug("loading wind_prop_retrieval")


class FourierTransfWindMethod:
    """FFT wind retrieval method

    It is a fft wind retrieval method. It was proposed
    by Ishwardat (2017). For more details see
    http://resolver.tudelft.nl/uuid:a659654b-e76a-4513-a656-ecad761bdbc8

    Parameters
    ----------
    doppler_obs : xarray.DataArray
        It should be a DataArray of slanted Doppler velocity
        observations as function of the azimuthal angle.
        It must have a coordinate called azm.

    Returns
    -------
    wind_prop : xarray.Dataset
        A Dataset containing the horizontal wind speed
        and direction. It also includes the zonal and
        meridional wind components

    """

    def __init__(self, doppler_obs: xr.DataArray):

        self.logger = logging.getLogger(
            "lidarSuit.wind_prop_retrieval.FourierTransfWindMethod"
        )
        self.logger.info("creating an instance of FourierTransfWindMethod")

        if not isinstance(doppler_obs, xr.DataArray):
            self.logger.error("wrong data type: expecting a xr.DataArray")
            raise TypeError

        self.doppler_obs = doppler_obs

        self.get_comp_amp()
        self.get_phase()
        self.get_rad_wind_speed()
        self.get_hor_wind_speed()
        self.get_wind_dir()
        self.get_wind_comp_u()
        self.get_wind_comp_v()

    def get_comp_amp(self):
        """First harmonic amplitude

        It calculates the complex amplitudes from the
        first harmonic from the observations along the
        azm coordinate
        """

        self.logger.info("calculating the complex amplitude")

        self.comp_amp = xrft.fft(
            self.doppler_obs, dim=["azm"], true_amplitude=False
        ).isel(freq_azm=-2)

        return self

    def get_phase(self):
        """First harmonic phase

        It calculates the phase of the first harmonic from the complex
        amplitude used for retrieving wind direction, i.e. comp_amp.
        """

        self.logger.info("calculating the phase from the complex amplitude")

        self.phase = -np.rad2deg(
            np.arctan2(self.comp_amp.imag, self.comp_amp.real)
        )

        self.phase.attrs = {
            "long_name": "Retrived phase",
            "units": "degree",
            "comments": "Phase derived from comp_amp "
            "variable using the FFT method.",
        }

        return self

    def get_wind_dir(self):
        """Wind direction calculation

        It calculates the wind direction based on the
        phase of the first harmonic
        """

        self.logger.info("retrieving wind direction from the phase")

        self.wind_dir = self.phase + 180
        self.wind_dir.attrs = {
            "long_name": "Wind direction",
            "standard_name": "wind_from_direction",
            "units": "degree",
            "comments": "Wind direction retrived using the FFT method.",
        }

        return self

    def get_rad_wind_speed(self):
        """Wind speed calculation

        It calculates the wind speed using the first harmonic
        """

        self.logger.info(
            "calculating the radial wind speed from the complex amplitude"
        )

        self.rad_wind_speed = (
            2 * np.abs(self.comp_amp) / self.doppler_obs.azm.shape[0]
        )

        self.rad_wind_speed.attrs = {
            "long_name": "wind speed not corrected for the elevation",
            "units": "m s-1",
            "comments": "Radial wind velocity retrived using "
            "the FFT method not corrected for the elevation.",
        }

        return self

    def get_hor_wind_speed(self):
        """Wind speed elevation correction

        It corrects the magnitude of the wind speed using
        the elevation angle
        """

        self.logger.info("retrieving the horizontal wind speed")

        self.hor_wind_speed = self.rad_wind_speed / np.cos(
            np.deg2rad(self.doppler_obs.elv)
        )

        self.hor_wind_speed.attrs = {
            "long_name": "horizontal wind speed",
            "standard_name": "wind_speed",
            "units": "m s-1",
            "comments": "Horizontal wind velocity retrived using "
            "the FFT method corrected for the elevation.",
        }

        return self

    def get_azm_wind(self, azm):
        """Wind speed for a given azimuth

        It retrieves the wind speed for a given azimuthal angle.
        It cam be used to calculate the meridional and zonal wind
        components

        Parameters
        ----------
        azm : float
            an azimuth for retrieving the wind
        """

        self.logger.info("calculating wind speed for a give azimuth")

        azm_hor_wind = self.rad_wind_speed * np.sin(
            np.deg2rad(azm) + np.deg2rad(self.phase.values + 180)
        )
        azm_hor_wind = azm_hor_wind / np.cos(np.deg2rad(self.doppler_obs.elv))

        return azm_hor_wind

    def get_wind_comp_u(self):
        """Zonal wind calculation

        It retrives the zonal wind component
        the -1 multiplication is for comply with the
        conventions used in meteorology
        """

        self.logger.info("retrieving the zonal wind speed component")

        self.comp_u = self.get_azm_wind(0) * -1
        self.comp_u.name = "comp_u"
        self.comp_u.attrs = {
            "long_name": "Zonal wind component",
            "units": "m s-1",
            "comments": "Zonal wind component retrieved using the FFT method.",
        }

        return self

    def get_wind_comp_v(self):
        """Meridional wind calculation

        It retrieves the meridional wind component
        the -1 multiplication is for comply with the
        conventions used in meteorology
        """

        self.logger.info("retrieving the meridional wind speed component")

        self.comp_v = self.get_azm_wind(90) * -1
        self.comp_v.name = "comp_v"
        self.comp_v.attrs = {
            "long_name": "Meridional wind component",
            "units": "m s-1",
            "comments": "Meridional wind component "
            "retrieved using the FFT method.",
        }

        return self

    def wind_prop(self):
        """Wind dataset

        It creates and returnes a dataset containing
        the wind speed, direction, and components
        """

        self.logger.info(
            "creating a xarray dataset from the retrieved wind properties"
        )

        wind_prop = xr.Dataset()
        wind_prop["horizontal_wind_direction"] = self.wind_dir
        wind_prop["horizontal_wind_speed"] = self.hor_wind_speed
        wind_prop["zonal_wind"] = self.comp_u
        wind_prop["meridional_wind"] = self.comp_v

        return wind_prop


[docs]class GetWindProperties5Beam: """DBS wind retrieval This class caculates the wind speeed and direction using the 5bean dataset (DBS files) as input. Parameters ---------- data : xarray.Dataset merged xarray dataset (merged_ds) output from lst.DbsOperations() status_filter : bolean Data filtering based on the wind lidar wind status variable. If True, all data with status not equal to 1 are removed. If False, no filtering is applied. cnr : int, optional Filter based on the carrier to noise ratio. If None, no filtering is applied. If a cnr value is given, all data smaller than the cnr is removed. method : str It can be 'single_dbs' (default) or 'continuous'. If single_dbs: the wind information is retrieved from comple sets of DBS profiles. If continuos: it uses the nearest 4 observations to retrieve the wind information, a tolerance wind has to be specified. tolerance : str It defines the tolerance window that the method will use for identify the nearest profile. Example: '8s' for 8 seconds. Returns ------- object : object This class returns an object containing the derived wind speed (.hor_wind_speed) and direction (.hor_wind_dir). """
[docs] def __init__( self, data: xr.Dataset, status_filter=True, cnr=None, method="single_dbs", tolerance="8s", ): self.logger = logging.getLogger( "lidarSuit.wind_prop_retrieval.GetWindProperties5Beam" ) self.logger.info("creating an instance of GetWindProperties5Beam") if not isinstance(data, xr.Dataset): self.logger.error("wrong data type: expecting a xr.Dataset") raise TypeError if status_filter: data["radial_wind_speed"] = data.radial_wind_speed.where( data.radial_wind_speed_status == 1 ) if cnr is not None: data["radial_wind_speed"] = data.radial_wind_speed.where( data.cnr >= cnr ) elevation = data.elevation.round(1) time90 = elevation.time.where(elevation == 90, drop=True) time_non_90 = elevation.time.where(elevation != 90, drop=True) azimuth_non_90 = data.azimuth.sel( time=time_non_90, method="Nearest" ).round(1) azimuth_non_90[azimuth_non_90 == 360] = 0 self.tolerance = tolerance self.azimuth_non_90 = azimuth_non_90 self.elevation_non_90 = elevation.sel(time=time_non_90) # replace range by measurement_height self.range_val_non_90 = data.measurement_height.sel(time=time_non_90) self.rad_wind_speed_non_90 = data.radial_wind_speed.sel( time=time_non_90 ) self.mean_time_non_90 = data.scan_mean_time.sel(time=time_non_90) self.range_val_90 = data.measurement_height.sel(time=time90) self.ver_wind_speed = data.radial_wind_speed.sel(time=time90) self.correct_vert_wind_comp() if method == "continuous": self.calc_hor_wind_comp_continuous() if method == "single_dbs": self.calc_hor_wind_comp_single_dbs() self.calc_hor_wind_speed() self.calc_hor_wind_dir()
def correct_wind_comp(self, comp): """ This function replaces the gate_index coordinate by the measurement_height. (For any component) Parameters ---------- comp : xarray.DataArray a variable from the original dataset Returns ------- comp : xarray.DataArray updated variable """ self.logger.info( f"replacing the gate_index coordinate by range: {comp.name}" ) comp = comp.rename({"gate_index": "range"}) comp = comp.assign_coords({"range": self.range_val_90.values[0]}) comp.range.attrs = self.range_val_90.attrs return comp def correct_vert_wind_comp(self): """ This function replaces the original coordinate from the vertical wind component by the measurement_height. """ self.logger.info("replacing coordinate from the vertical measurement") self.ver_wind_speed.name = "compW" ver_wind_speed = self.correct_wind_comp(self.ver_wind_speed) self.ver_wind_speed = ver_wind_speed return self def calc_hor_wind_comp_single_dbs(self): """ This method derives v and u components from the WindCube DBS files. The components are caculated from each individual DBS file. The mean time from each scan complete scan is used as identification tag. """ self.logger.info( "calculating the horizontal wind using the SINGLE DBS method" ) comp_wind_speed = self.rad_wind_speed_non_90 / ( 2 * np.cos(np.deg2rad(self.elevation_non_90)) ) comp_vn = comp_wind_speed.where(self.azimuth_non_90 == 0, drop=True) mean_time_vn = self.mean_time_non_90.where( self.azimuth_non_90 == 0, drop=True ) comp_vn = comp_vn.assign_coords({"time": mean_time_vn}) comp_vs = comp_wind_speed.where(self.azimuth_non_90 == 180, drop=True) mean_time_vs = self.mean_time_non_90.where( self.azimuth_non_90 == 180, drop=True ) comp_vs = comp_vs.assign_coords({"time": mean_time_vs}) comp_ue = comp_wind_speed.where(self.azimuth_non_90 == 90, drop=True) mean_time_ue = self.mean_time_non_90.where( self.azimuth_non_90 == 90, drop=True ) comp_ue = comp_ue.assign_coords({"time": mean_time_ue}) comp_uw = comp_wind_speed.where(self.azimuth_non_90 == 270, drop=True) mean_time_uw = self.mean_time_non_90.where( self.azimuth_non_90 == 270, drop=True ) comp_uw = comp_uw.assign_coords({"time": mean_time_uw}) self.comp_v = -(comp_vn - comp_vs) self.comp_u = -(comp_ue - comp_uw) self.comp_v.name = "comp_v" self.comp_u.name = "comp_u" self.comp_v = self.correct_wind_comp(self.comp_v) self.comp_u = self.correct_wind_comp(self.comp_u) def calc_hor_wind_comp_continuous(self): """ Function to derive wind v and u components. It folows the same approach used by the lidar software. """ self.logger.info( "calculating the horizontal wind using the CONTINUOUS DBS method" ) comp_wind_speed = self.rad_wind_speed_non_90 / ( 2 * np.cos(np.deg2rad(self.elevation_non_90)) ) self.comp_vn = comp_wind_speed.where( self.azimuth_non_90 == 0, drop=True ) self.comp_vs = comp_wind_speed.where( self.azimuth_non_90 == 180, drop=True ) comp_vs = self.comp_vs.reindex( time=self.comp_vn.time, method="Nearest", tolerance=self.tolerance ) self.comp_ue = comp_wind_speed.where( self.azimuth_non_90 == 90, drop=True ) self.comp_uw = comp_wind_speed.where( self.azimuth_non_90 == 270, drop=True ) comp_uw = self.comp_uw.reindex( time=self.comp_ue.time, method="Nearest", tolerance=self.tolerance ) self.comp_v = -(self.comp_vn - comp_vs) self.comp_u = -(self.comp_ue - comp_uw) self.comp_v.name = "comp_v" self.comp_u.name = "comp_u" self.comp_v = self.correct_wind_comp(self.comp_v) self.comp_u = self.correct_wind_comp(self.comp_u) self.comp_u = self.comp_u.reindex( time=self.comp_v.time, method="Nearest", tolerance=self.tolerance ) return self def calc_hor_wind_speed(self): """ Function to calculate the wind speed. """ self.logger.info( "calculating the horizontal wind speed using DBS observations" ) hor_wind_speed = np.sqrt(self.comp_v**2.0 + self.comp_u**2.0) hor_wind_speed.name = "hor_wind_speed" hor_wind_speed.attrs["long_name"] = "wind_speed" hor_wind_speed.attrs["units"] = "m/s" self.hor_wind_speed = hor_wind_speed return self def calc_hor_wind_dir(self): """ Function to derive wind direction. If folows the same approach used by the lidar sftware. """ self.logger.info("retrieving the wind direction using DBS observation") wind_dir = 180 + np.rad2deg(np.arctan2(-self.comp_u, -self.comp_v)) wind_dir.name = "hor_wind_dir" wind_dir.attrs["long_name"] = "wind_direction" wind_dir.attrs["units"] = "deg" self.hor_wind_dir = wind_dir return self
[docs]class RetriveWindFFT: """6 beam wind retrieval Wind retrieval based on the FFT method for the 6 beam observations. Parameters ---------- transfd_data : object An instance of the re-structured data, it should be preferentially filtered for artefacts. Returns ------- object : object An object containing an dataset of the retrieved wind speed, direction, wind components (meridional, zonal and vertical) and the relative beta """
[docs] def __init__(self, transfd_data: GetRestructuredData): self.logger = logging.getLogger( "lidarSuit.wind_prop_retrieval.FourierTransfWindMethod" ) self.logger.info("creating an instance of FourierTransfWindMethod") if not isinstance(transfd_data, GetRestructuredData): self.logger.error( "wrong data type: expecting a lst.GetRestructuredData instance" ) raise TypeError self.transfd_data = transfd_data self.ret_hor_wind_data() self.ret_vert_wind_data() self.get_beta() self.load_attrs()
def ret_hor_wind_data(self): """ It applies the FFT based method to retrieve the horizontal wind information """ self.logger.info("retrieving horizontal wind from the 6 beam data") tmp_wind_prop = FourierTransfWindMethod( self.transfd_data.data_transf ).wind_prop() tmp_wind_prop = tmp_wind_prop.squeeze(dim="elv") tmp_wind_prop = tmp_wind_prop.drop(["elv", "freq_azm"]) self.wind_prop = tmp_wind_prop return self def ret_vert_wind_data(self): """ It copies the vertical wind from the observations """ self.logger.info("selecting the vertical wind observations") tmp_wind_w = self.transfd_data.data_transf_90 tmp_wind_w = tmp_wind_w.rename({"time": "time90", "range90": "range"}) self.wind_prop["vertical_wind_speed"] = tmp_wind_w return self def get_beta(self): """ It copies the raw beta from the vertical observations """ self.logger.info("selcting beta from vertical observations") tmp_beta = self.transfd_data.relative_beta90 tmp_beta = tmp_beta.rename({"time": "time90", "range90": "range"}) self.wind_prop["lidar_relative_beta"] = tmp_beta return self def load_attrs(self): """ It loads the attributes from all variables into the dataset """ self.logger.info("loading data attributes") self.wind_prop = LoadAttributes(self.wind_prop).data return self