Source code for tudatpy.data.processTrk234.converters.derivedDoppler

from tudatpy.estimation.observations_setup.ancillary_settings import dsn_n_way_doppler_ancillary_settings
from tudatpy.estimation.observable_models_setup.links import link_definition, receiver, reflector1
from tudatpy.estimation.observable_models_setup.model_settings import ObservableType
from tudatpy.estimation.observations import create_single_observation_set, SingleObservationSet

from trk234 import SFDU
from . import RadioBase
from pandas import DataFrame


class DerivedDopplerConverter(RadioBase):
    """Turn derived carrier Doppler SFDU records into tudatpy observation sets.

    ``extract`` flattens the relevant SFDU records into a table, and
    ``process`` converts such a table into one ``SingleObservationSet`` per
    distinct configuration found in it.
    """

    def extract(self, sfdu_list: list[SFDU]) -> DataFrame:
        """Collect the derived carrier Doppler records of ``sfdu_list`` in a table.

        Parameters
        ----------
        sfdu_list
            SFDU records to scan. Records whose primary CHDO format code is
            16 are decoded in place via ``sfdu.decode``; all other records
            are left untouched.

        Returns
        -------
        DataFrame
            One row per retained record, with the columns ``epoch``,
            ``link_ends``, ``band``, ``tracking_mode``, ``link_delays``,
            ``count_time`` and ``obs`` (in that order).
        """
        # Format code 16 identifies derived carrier Doppler data.
        candidates = [
            record for record in sfdu_list if record.pri_chdo.format_code == 16
        ]

        # Decode every candidate before any of them is inspected further.
        for record in candidates:
            record.decode(
                record.binarydata, label=False, agg_chdo=False, pri_chdo=False
            )

        # Keep only successfully decoded records whose tracking mode is
        # neither "None" nor "1W" (i.e. the 2W and 3W tracking modes).
        rejected_modes = ("None", "1W")
        retained = []
        for record in candidates:
            if not record.is_decoded:
                continue
            if self.get_tracking_mode(record) in rejected_modes:
                continue
            retained.append(record)

        # Built column by column; the key order fixes the column order.
        columns = {
            "epoch": [record.timestamp() for record in retained],
            "link_ends": [self.get_link_ends(record) for record in retained],
            "band": [self.get_band(record) for record in retained],
            "tracking_mode": [self.get_tracking_mode(record) for record in retained],
            "link_delays": [self.get_link_delays(record) for record in retained],
            "count_time": [record.trk_chdo.obs_cnt_time for record in retained],
            # Only the first entry of rcv_carr_obs is used as the observable.
            "obs": [record.trk_chdo.rcv_carr_obs[0] for record in retained],
        }
        return DataFrame(columns)

    def process(
        self, doppler_df: DataFrame, spacecraftName: str | None = None
    ) -> list[SingleObservationSet]:
        """Build observation sets from a table shaped like ``extract``'s output.

        The rows are partitioned by link ends, then band, then link delays,
        then count time. Each non-empty combination yields one observation
        set, visited in order of first appearance at every nesting level.

        Parameters
        ----------
        doppler_df
            Table providing the columns ``link_ends``, ``band``,
            ``link_delays``, ``count_time``, ``obs`` and ``epoch``.
        spacecraftName
            Forwarded unchanged to ``self.build_link_ends_dict``.

        Returns
        -------
        list[SingleObservationSet]
            One entry per (link ends, band, link delays, count time)
            combination present in ``doppler_df``.
        """
        observation_sets = []
        link_end_column = doppler_df["link_ends"]

        for link_end in link_end_column.unique():
            link_ends_mapping = self.build_link_ends_dict(link_end, spacecraftName)
            link_def = link_definition(link_ends_mapping)
            rows_for_link = doppler_df[link_end_column == link_end]

            for band in rows_for_link["band"].unique():
                rows_for_band = rows_for_link[rows_for_link["band"] == band]

                for delays in rows_for_band["link_delays"].unique():
                    rows_for_delays = rows_for_band[
                        rows_for_band["link_delays"] == delays
                    ]

                    for count_time in rows_for_delays["count_time"].unique():
                        rows = rows_for_delays[
                            rows_for_delays["count_time"] == count_time
                        ]

                        # Both entries of ``band`` are mapped through
                        # frequencyBandsDict; the second one is passed again
                        # as a separate argument.
                        # NOTE(review): presumably the second entry is the
                        # reference band and 0.0 a reference frequency;
                        # confirm against the tudatpy signature.
                        first_band = self.frequencyBandsDict[band[0]]
                        second_band = self.frequencyBandsDict[band[1]]
                        ancillary = dsn_n_way_doppler_ancillary_settings(
                            [first_band, second_band],
                            second_band,
                            0.0,
                            count_time,
                            delays,
                        )

                        # Column vector: one row per observation.
                        observations = rows["obs"].to_numpy(dtype=float)
                        observations = observations.reshape((-1, 1))

                        # Third entry for 3-element link ends, otherwise the
                        # second entry.
                        # NOTE(review): presumably the receiving station;
                        # confirm against get_link_ends.
                        if len(link_end) == 3:
                            station = link_end[2]
                        else:
                            station = link_end[1]

                        converted = rows["epoch"].apply(
                            lambda utc_epoch: self.from_datetime_UTC_to_TDB(
                                utc_epoch, station
                            )
                        )
                        tdb_epochs = converted.tolist()

                        observation_sets.append(
                            create_single_observation_set(
                                ObservableType.dsn_n_way_averaged_doppler_type,
                                link_def.link_ends,
                                observations,
                                tdb_epochs,
                                receiver,
                                ancillary,
                            )
                        )

        return observation_sets