Source code for lidarSuit.filters

"""Module for keep all filtering functionalities

"""
import logging

import pandas as pd
import numpy as np
import xarray as xr

module_logger = logging.getLogger("lidarSuit.filters")


def filter_status(ds: xr.Dataset):
    """Filter dataset based on WindCube's software

    Parameters
    ----------
    ds: xr.Dataset
        Dataset with LIDAR records

    Returns
    -------

    xr.Dataset:
        A Dataset with records where status was true (1)
    """
    if "radial_wind_speed_status90" not in ds:
        module_logger.error(
            "filter_status() requires radial_wind_speed_status90"
        )
        raise ValueError

    return ds.where(ds.radial_wind_speed_status90 == 1)


def filter_snr(ds: xr.Dataset, snr: float):
    if "cnr90" not in ds:
        module_logger.error("filter_snr() requires cnr90")
        raise ValueError

    return ds.where(ds.cnr90 > snr)


[docs]class Filtering: """SNR and Status filter It uses the carrier to noise ratio (SNR) and status variables available in the WindCube's data. It is similar to the filter described in the manual Parameters ---------- data : xrarray.Dataset Dataset containing the original WindCube's data """
[docs] def __init__(self, data): self.data = data
def get_vertical_obs_comp(self, variable, snr=False, status=True): """Vertical data filter It uses the SNR and status variables to filter out the data from vertical observations Parameters ---------- variable : str name of the variable that will be filtered snr : bool, int, optional if an interger is given it is used to as threshold to filter the data based on the signal to noise ratio status : bool, optional if true it filters the data using the status variable generated by the WindCube's software Returns ------- tmp_data : xarray.DataSet an instance of the variable filtered using SNR or status variable """ tmp_data = self.data[variable] if status: tmp_data = tmp_data.where( self.data.radial_wind_speed_status90 == 1 ) if snr is not False: tmp_data = tmp_data.where(self.data.cnr90 > snr) tmp_data = tmp_data.where(self.data.elevation == 90, drop=True) return tmp_data def get_radial_obs_comp(self, variable, azm, snr=False, status=True): """Slanted data filter It uses the SNR and status variables to filter out the data from slanted observations Parameters ---------- variable : str name of the variable that will be filtered snr : bool, int, optional if an interger is given it is used to as threshold to filter the data based on the signal to noise ratio status : bool, optional if true it filters the data using the status variable generated by the WindCube's software Returns ------- tmpaData : xarray.DataSet an instance of the variable filtered using SNR or status variable """ tmp_data = self.data[variable] if status: tmp_data = tmp_data.where(self.data.radial_wind_speed_status == 1) if snr is not False: tmp_data = tmp_data.where(self.data.cnr > snr) tmp_data = tmp_data.where( (self.data.elevation != 90) & (self.data.azimuth == azm), drop=True ) return tmp_data
# it removes the STE below cloud layer
[docs]class SecondTripEchoFilter: """Boundary layer second trip echoes filter This filter minimises the presence of second trip echoes (STE). This filter is based on the standard deviation of the anomaly of the observaions. It is applicable in regions where there is a contrast between the real data and the STE. Parameters ---------- data : object the object returned from the GetRestructuredData timeCloudMaks : xarray.DataArray it is a time series for indicating the presence of clouds above the maximum WinCube range. 1 indicates cloud and 0 indicates no cloud. (THIS MAKS IS NOT NEEDED NOW) n_prof : int number of profiles used to calculating the anomaly center : bool, optional it defines how the mean value for the anomaly will be calculated min_periods : int minimum number of profiles used for calculating the mean value n_std : int Multiplication factor for defining the size of the window to keep the data. The filter removes any anomaly larger than n_std * std str_h : str starting hour for calculating the anomaly end_h : str end hour for calculating the anomaly Returns ------- object : object an object containing data filtered for STE """
[docs] def __init__( self, data, n_prof=500, center=True, min_periods=30, n_std=2, str_h="09", end_h="16", ): self.lidar = data # self.time_cloud_mask = time_cloud_mask self.n_prof = n_prof self.center = center self.min_periods = min_periods self.n_std = n_std self.get_time_edges(str_h=str_h, end_h=end_h) self.cal_mean_and_anom_slant() self.cal_mean_and_anom_90() self.cleaning() self.cleaning90()
def get_time_edges(self, str_h="09", end_h="16"): """ It creates the time boundaries for the STD anomaly calculation Parameters ---------- str_h : str starting hour for calculating the anomaly end_h : str end hour for calculating the anomaly """ sel_time = pd.to_datetime(self.lidar.data_transf.time.values[0]) sel_time = sel_time.strftime("%Y%m%d") self.start_time = pd.to_datetime(f"{sel_time} {str_h}") self.end_time = pd.to_datetime(f"{sel_time} {end_h}") def cal_mean_and_anom_slant(self): """ It calculates the anomaly from the slanted observations """ # slanted beam tmp_sel_data = self.lidar.data_transf self.data_mean = tmp_sel_data.rolling( time=self.n_prof, center=self.center, min_periods=self.min_periods ).mean() self.data_anom = self.lidar.data_transf - self.data_mean def cal_mean_and_anom_90(self): """ It calculates the anomaly from the vertical observations """ # vertical beam tmp_sel_data_90 = self.lidar.data_transf_90 self.data_mean90 = tmp_sel_data_90.rolling( time=self.n_prof, center=self.center, min_periods=self.min_periods ).mean() self.data_anom_90 = self.lidar.data_transf_90 - self.data_mean90 def cleaning(self): """ It removes the data that is larger than the n_std * anomaly from the slanted observations """ tmp_anom = self.data_anom.where( (self.lidar.data_transf.time > self.start_time) & (self.lidar.data_transf.time < self.end_time) ) anom_std = tmp_anom.std(dim=["time", "range", "elv"]) tmp_clean_data = self.lidar.data_transf.copy() tmp_clean_data = tmp_clean_data.where( np.abs(self.data_anom) < self.n_std * anom_std ) self.lidar.data_transf.values = tmp_clean_data.values def cleaning90(self): """ It removes the data that is larger than the n_std * anomaly from the vertical observations """ tmp_anom = self.data_anom_90.where( (self.lidar.data_transf_90.time > self.start_time) & (self.lidar.data_transf_90.time < self.end_time) ) anom_std = tmp_anom.std(dim=["time", "range90"]) tmp_clean_data = self.lidar.data_transf_90.copy() tmp_clean_data = tmp_clean_data.where( np.abs(self.data_anom_90) < self.n_std * anom_std ) self.lidar.data_transf_90.values = tmp_clean_data.values
# it removes STE and clouds contamination # from above the aerosol loaded region
[docs]class WindCubeCloudRemoval: """Above boundary layer second trip echoes filter This filter reduces the second trip echoes contamination and clouds from regions above the boundary layer. It requires information from the ceilometer. IT IS STILL EXPERIMENTAL Parameters ---------- ceilo : xarray.Dataset A dataset of the CHM15k Nimbus ceilometer observations. lidar : xarray.Dataset An instance of the re-structured WindCube dataset Returns ------- object : object an object containing an instance of the noise height interface and the re-structured dataset filtered for STE and clouds. """
[docs] def __init__(self, ceilo, lidar=None): self.lidar = lidar self.ceilo = ceilo self.get_noise_free_beta() self.get_height_interface() if lidar is not None: self.get_interp_interf_height() self.remove_cloud()
def get_noise_free_beta(self): """ It removes the noise from the backscattered signal """ positive_beta = self.ceilo.beta_raw.where(self.ceilo.beta_raw > 0) positive_beta = positive_beta.rolling( range=10, center=True, min_periods=10 ).mean() positive_beta = positive_beta.rolling( time=15, center=True, min_periods=15 ).mean() self.noise_free_beta = positive_beta return self def get_height_interface(self): """ It identifies the height of the separation between the noise and the non-noise data from the ceilometer backscattered signal """ positive_beta = self.ceilo.beta_raw.where(self.ceilo.beta_raw > 0) tmp_ceilo_hgt = positive_beta.copy() tmp_values = tmp_ceilo_hgt.values tmp_values[np.isfinite(tmp_values)] = 1 tmp_ceilo_hgt.values = tmp_values tmp_ceilo_hgt = tmp_ceilo_hgt * self.ceilo.range lowest_beta = self.noise_free_beta.where(tmp_ceilo_hgt < 4e3) tmp_ceilo_hgt = tmp_ceilo_hgt.where(np.isfinite(lowest_beta)) self.interf_height = tmp_ceilo_hgt.max(dim="range") self.interf_height = self.interf_height.rolling( time=7, center=True, min_periods=5 ).mean() return self def get_interp_interf_height(self): """ It interpolates the noise height interface to the same temporal resolution from the windcube data """ self.interp_interf_height = self.interf_height.interp( time=self.lidar.data_transf.time ) self.interp_interf_height_90 = self.interf_height.interp( time=self.lidar.data_transf_90.time ) return self def remove_cloud(self): """ It removes from the windcube's observation all data above the noise height interface """ tmp_height = self.lidar.data_transf.copy() tmp_values = tmp_height.values tmp_values[np.isfinite(tmp_values)] = 1 tmp_height.values = tmp_values tmp_height = tmp_height * self.lidar.data_transf.range self.lidar.data_transf = self.lidar.data_transf.where( tmp_height < self.interp_interf_height ) tmp_height = self.lidar.data_transf_90.copy() tmp_values = tmp_height.values tmp_values[np.isfinite(tmp_values)] = 1 tmp_height.values = tmp_values tmp_height = tmp_height * self.lidar.data_transf_90.range90 self.lidar.data_transf_90 = self.lidar.data_transf_90.where( tmp_height < self.interp_interf_height_90 ) self.lidar.relative_beta90 = self.lidar.relative_beta90.where( tmp_height < self.interp_interf_height_90 ) return self