"""Module for keep all filtering functionalities
"""
import logging
import pandas as pd
import numpy as np
import xarray as xr
module_logger = logging.getLogger("lidarSuit.filters")
def filter_status(ds: xr.Dataset):
"""Filter dataset based on WindCube's software
Parameters
----------
ds: xr.Dataset
Dataset with LIDAR records
Returns
-------
xr.Dataset:
A Dataset with records where status was true (1)
"""
if "radial_wind_speed_status90" not in ds:
module_logger.error(
"filter_status() requires radial_wind_speed_status90"
)
raise ValueError
return ds.where(ds.radial_wind_speed_status90 == 1)
def filter_snr(ds: xr.Dataset, snr: float):
if "cnr90" not in ds:
module_logger.error("filter_snr() requires cnr90")
raise ValueError
return ds.where(ds.cnr90 > snr)
[docs]class Filtering:
"""SNR and Status filter
It uses the carrier to noise ratio (SNR) and status
variables available in the WindCube's data. It is
similar to the filter described in the manual
Parameters
----------
data : xrarray.Dataset
Dataset containing the original WindCube's data
"""
[docs] def __init__(self, data):
self.data = data
def get_vertical_obs_comp(self, variable, snr=False, status=True):
"""Vertical data filter
It uses the SNR and status variables to filter out
the data from vertical observations
Parameters
----------
variable : str
name of the variable that will be filtered
snr : bool, int, optional
if an interger is given it is used to
as threshold to filter the data based on
the signal to noise ratio
status : bool, optional
if true it filters the data using the status
variable generated by the WindCube's software
Returns
-------
tmp_data : xarray.DataSet
an instance of the variable filtered using
SNR or status variable
"""
tmp_data = self.data[variable]
if status:
tmp_data = tmp_data.where(
self.data.radial_wind_speed_status90 == 1
)
if snr is not False:
tmp_data = tmp_data.where(self.data.cnr90 > snr)
tmp_data = tmp_data.where(self.data.elevation == 90, drop=True)
return tmp_data
def get_radial_obs_comp(self, variable, azm, snr=False, status=True):
"""Slanted data filter
It uses the SNR and status variables to filter out
the data from slanted observations
Parameters
----------
variable : str
name of the variable that will be filtered
snr : bool, int, optional
if an interger is given it is used to
as threshold to filter the data based on
the signal to noise ratio
status : bool, optional
if true it filters the data using the status
variable generated by the WindCube's software
Returns
-------
tmpaData : xarray.DataSet
an instance of the variable filtered using
SNR or status variable
"""
tmp_data = self.data[variable]
if status:
tmp_data = tmp_data.where(self.data.radial_wind_speed_status == 1)
if snr is not False:
tmp_data = tmp_data.where(self.data.cnr > snr)
tmp_data = tmp_data.where(
(self.data.elevation != 90) & (self.data.azimuth == azm), drop=True
)
return tmp_data
# it removes the STE below cloud layer
[docs]class SecondTripEchoFilter:
"""Boundary layer second trip echoes filter
This filter minimises the presence of second
trip echoes (STE). This filter is based on the
standard deviation of the anomaly of the observaions.
It is applicable in regions where there is a contrast
between the real data and the STE.
Parameters
----------
data : object
the object returned from the GetRestructuredData
timeCloudMaks : xarray.DataArray
it is a time series for indicating the presence
of clouds above the maximum WinCube range.
1 indicates cloud and 0 indicates no cloud.
(THIS MAKS IS NOT NEEDED NOW)
n_prof : int
number of profiles used to calculating the anomaly
center : bool, optional
it defines how the mean value for the anomaly will
be calculated
min_periods : int
minimum number of profiles used for calculating the
mean value
n_std : int
Multiplication factor for defining the size of the
window to keep the data. The filter removes any
anomaly larger than n_std * std
str_h : str
starting hour for calculating the anomaly
end_h : str
end hour for calculating the anomaly
Returns
-------
object : object
an object containing data filtered for STE
"""
[docs] def __init__(
self,
data,
n_prof=500,
center=True,
min_periods=30,
n_std=2,
str_h="09",
end_h="16",
):
self.lidar = data
# self.time_cloud_mask = time_cloud_mask
self.n_prof = n_prof
self.center = center
self.min_periods = min_periods
self.n_std = n_std
self.get_time_edges(str_h=str_h, end_h=end_h)
self.cal_mean_and_anom_slant()
self.cal_mean_and_anom_90()
self.cleaning()
self.cleaning90()
def get_time_edges(self, str_h="09", end_h="16"):
"""
It creates the time boundaries for the STD anomaly calculation
Parameters
----------
str_h : str
starting hour for calculating the anomaly
end_h : str
end hour for calculating the anomaly
"""
sel_time = pd.to_datetime(self.lidar.data_transf.time.values[0])
sel_time = sel_time.strftime("%Y%m%d")
self.start_time = pd.to_datetime(f"{sel_time} {str_h}")
self.end_time = pd.to_datetime(f"{sel_time} {end_h}")
def cal_mean_and_anom_slant(self):
"""
It calculates the anomaly from the slanted observations
"""
# slanted beam
tmp_sel_data = self.lidar.data_transf
self.data_mean = tmp_sel_data.rolling(
time=self.n_prof, center=self.center, min_periods=self.min_periods
).mean()
self.data_anom = self.lidar.data_transf - self.data_mean
def cal_mean_and_anom_90(self):
"""
It calculates the anomaly from the vertical observations
"""
# vertical beam
tmp_sel_data_90 = self.lidar.data_transf_90
self.data_mean90 = tmp_sel_data_90.rolling(
time=self.n_prof, center=self.center, min_periods=self.min_periods
).mean()
self.data_anom_90 = self.lidar.data_transf_90 - self.data_mean90
def cleaning(self):
"""
It removes the data that is larger than the n_std * anomaly
from the slanted observations
"""
tmp_anom = self.data_anom.where(
(self.lidar.data_transf.time > self.start_time)
& (self.lidar.data_transf.time < self.end_time)
)
anom_std = tmp_anom.std(dim=["time", "range", "elv"])
tmp_clean_data = self.lidar.data_transf.copy()
tmp_clean_data = tmp_clean_data.where(
np.abs(self.data_anom) < self.n_std * anom_std
)
self.lidar.data_transf.values = tmp_clean_data.values
def cleaning90(self):
"""
It removes the data that is larger than the n_std * anomaly
from the vertical observations
"""
tmp_anom = self.data_anom_90.where(
(self.lidar.data_transf_90.time > self.start_time)
& (self.lidar.data_transf_90.time < self.end_time)
)
anom_std = tmp_anom.std(dim=["time", "range90"])
tmp_clean_data = self.lidar.data_transf_90.copy()
tmp_clean_data = tmp_clean_data.where(
np.abs(self.data_anom_90) < self.n_std * anom_std
)
self.lidar.data_transf_90.values = tmp_clean_data.values
# it removes STE and clouds contamination
# from above the aerosol loaded region
[docs]class WindCubeCloudRemoval:
"""Above boundary layer second trip echoes filter
This filter reduces the second trip echoes contamination
and clouds from regions above the boundary layer. It
requires information from the ceilometer.
IT IS STILL EXPERIMENTAL
Parameters
----------
ceilo : xarray.Dataset
A dataset of the CHM15k Nimbus ceilometer observations.
lidar : xarray.Dataset
An instance of the re-structured WindCube dataset
Returns
-------
object : object
an object containing an instance of the noise
height interface and the re-structured dataset
filtered for STE and clouds.
"""
[docs] def __init__(self, ceilo, lidar=None):
self.lidar = lidar
self.ceilo = ceilo
self.get_noise_free_beta()
self.get_height_interface()
if lidar is not None:
self.get_interp_interf_height()
self.remove_cloud()
def get_noise_free_beta(self):
"""
It removes the noise from the backscattered signal
"""
positive_beta = self.ceilo.beta_raw.where(self.ceilo.beta_raw > 0)
positive_beta = positive_beta.rolling(
range=10, center=True, min_periods=10
).mean()
positive_beta = positive_beta.rolling(
time=15, center=True, min_periods=15
).mean()
self.noise_free_beta = positive_beta
return self
def get_height_interface(self):
"""
It identifies the height of the separation between
the noise and the non-noise data from the ceilometer
backscattered signal
"""
positive_beta = self.ceilo.beta_raw.where(self.ceilo.beta_raw > 0)
tmp_ceilo_hgt = positive_beta.copy()
tmp_values = tmp_ceilo_hgt.values
tmp_values[np.isfinite(tmp_values)] = 1
tmp_ceilo_hgt.values = tmp_values
tmp_ceilo_hgt = tmp_ceilo_hgt * self.ceilo.range
lowest_beta = self.noise_free_beta.where(tmp_ceilo_hgt < 4e3)
tmp_ceilo_hgt = tmp_ceilo_hgt.where(np.isfinite(lowest_beta))
self.interf_height = tmp_ceilo_hgt.max(dim="range")
self.interf_height = self.interf_height.rolling(
time=7, center=True, min_periods=5
).mean()
return self
def get_interp_interf_height(self):
"""
It interpolates the noise height interface to the same
temporal resolution from the windcube data
"""
self.interp_interf_height = self.interf_height.interp(
time=self.lidar.data_transf.time
)
self.interp_interf_height_90 = self.interf_height.interp(
time=self.lidar.data_transf_90.time
)
return self
def remove_cloud(self):
"""
It removes from the windcube's observation all
data above the noise height interface
"""
tmp_height = self.lidar.data_transf.copy()
tmp_values = tmp_height.values
tmp_values[np.isfinite(tmp_values)] = 1
tmp_height.values = tmp_values
tmp_height = tmp_height * self.lidar.data_transf.range
self.lidar.data_transf = self.lidar.data_transf.where(
tmp_height < self.interp_interf_height
)
tmp_height = self.lidar.data_transf_90.copy()
tmp_values = tmp_height.values
tmp_values[np.isfinite(tmp_values)] = 1
tmp_height.values = tmp_values
tmp_height = tmp_height * self.lidar.data_transf_90.range90
self.lidar.data_transf_90 = self.lidar.data_transf_90.where(
tmp_height < self.interp_interf_height_90
)
self.lidar.relative_beta90 = self.lidar.relative_beta90.where(
tmp_height < self.interp_interf_height_90
)
return self