Source code for tudatpy.data.processTrk234.converters.derivedSraRange

from tudatpy.estimation.observations_setup.ancillary_settings import dsn_n_way_range_ancillary_settings
from tudatpy.estimation.observable_models_setup.links import link_definition, receiver
from tudatpy.estimation.observable_models_setup.model_settings import ObservableType

from tudatpy.estimation.observations import create_single_observation_set, SingleObservationSet

from . import RadioBase
from trk234 import SFDU
from pandas import DataFrame
import numpy as _np
from datetime import datetime, timedelta


class DerivedSraRangeConverter(RadioBase):
    """
    Convert SRA range SFDU records into ``dsn_n_way_range_type`` observation sets.

    ``extract`` flattens the usable records into a DataFrame and ``process``
    turns every row of that frame into its own ``SingleObservationSet``.
    """

    def extract(self, sfdu_list: list[SFDU]) -> DataFrame:
        """
        Collect the usable SRA range records into a table.

        Records whose primary CHDO format code is 7 (SRA range) are decoded,
        and a record is kept only when all of the following hold:

        - it reports itself as decoded;
        - its tracking mode is ``"2W"`` or ``"3W"``;
        - it is not flagged invalid (``meas_rng == -1.0`` together with
          ``rng_type != 0``);
        - its uplink frequency is not zero.

        Parameters
        ----------
        sfdu_list : list[SFDU]
            The SFDU objects to scan.

        Returns
        -------
        DataFrame
            One row per retained record with the columns ``epoch``,
            ``link_ends``, ``band``, ``tracking_mode``, ``link_delays``,
            ``obs``, ``zero_phase_times`` and ``lowest_ranging_component``.
        """
        # Format code 7 identifies SRA range records.
        candidates = [item for item in sfdu_list if item.pri_chdo.format_code == 7]

        # Every candidate is decoded before any of them is inspected.
        for candidate in candidates:
            candidate.decode(
                candidate.binarydata, label=False, agg_chdo=False, pri_chdo=False
            )

        kept = [
            candidate
            for candidate in candidates
            if self._is_usable_range_record(candidate)
        ]

        epochs = [record.timestamp() for record in kept]
        link_ends = [self.get_link_ends(record) for record in kept]
        bands = [self.get_band(record) for record in kept]
        tracking_modes = [self.get_tracking_mode(record) for record in kept]
        link_delays = [self.get_link_delays(record) for record in kept]
        # The observable is the measured range reduced by the range modulo.
        observables = [
            _np.mod(record.trk_chdo.meas_rng, record.trk_chdo.rng_modulo)
            for record in kept
        ]
        zero_phase_times = [self.get_zero_phase_times(record) for record in kept]
        lowest_components = [record.trk_chdo.last_comp_num for record in kept]

        return DataFrame(
            {
                "epoch": epochs,
                "link_ends": link_ends,
                "band": bands,
                "tracking_mode": tracking_modes,
                "link_delays": link_delays,
                "obs": observables,
                "zero_phase_times": zero_phase_times,
                "lowest_ranging_component": lowest_components,
            }
        )

    def process(
        self, range_df: DataFrame, spacecraftName: str | None = None
    ) -> list[SingleObservationSet]:
        """
        Build one observation set per row of an extracted range table.

        Rows are visited grouped by ``link_ends``, then ``band``, then
        ``link_delays``, then ``lowest_ranging_component``, each in order of
        first appearance.

        Parameters
        ----------
        range_df : DataFrame
            Table with at least the columns ``link_ends``, ``band``,
            ``link_delays``, ``lowest_ranging_component``, ``obs`` and
            ``epoch`` (as produced by ``extract``).
        spacecraftName : str | None, optional
            Forwarded to ``build_link_ends_dict`` when the link ends
            dictionary is assembled.

        Returns
        -------
        list[SingleObservationSet]
            One single-observation set for every row of ``range_df``.
        """
        observation_sets = []
        for link_end, by_link in self._split_by_column(range_df, "link_ends"):
            link_def = link_definition(
                self.build_link_ends_dict(link_end, spacecraftName)
            )
            for band, by_band in self._split_by_column(by_link, "band"):
                for delays, by_delay in self._split_by_column(by_band, "link_delays"):
                    for component, by_component in self._split_by_column(
                        by_delay, "lowest_ranging_component"
                    ):
                        # NOTE - an observation set is deliberately built for
                        # every single observable so that the conversion factor
                        # calculated in dsnNWayRangeObservationModel.h can be
                        # stored. A faster approach would be the dataframe
                        # apply method used in the derivedDoppler converter.
                        observation_sets.extend(
                            self._single_observation_set(
                                row, link_end, link_def, band, delays, component
                            )
                            for _, row in by_component.iterrows()
                        )
        return observation_sets

    def get_zero_phase_times(self, sfdu: SFDU) -> tuple[datetime, datetime]:
        """
        Return the transmit and receive zero phase times of an SRA range record.

        Each time is the record timestamp shifted by the matching in-phase
        offset, which is interpreted as a number of seconds.

        Parameters
        ----------
        sfdu : SFDU
            The SFDU object to read the zero phase times from.

        Returns
        -------
        tuple
            ``(transmit_zero_phase_time, receive_zero_phase_time)``.
        """
        transmit_time = sfdu.timestamp() + timedelta(
            seconds=sfdu.trk_chdo.transmit_inphs_time
        )
        receive_time = sfdu.timestamp() + timedelta(
            seconds=sfdu.trk_chdo.rcv_inphs_time
        )
        return transmit_time, receive_time

    def _is_usable_range_record(self, sfdu: SFDU) -> bool:
        """Tell whether a format-code-7 record passes the ``extract`` filters."""
        # The decoded check comes first so that the tracking CHDO is only read
        # on records that were actually decoded.
        if not sfdu.is_decoded:
            return False
        if self.get_tracking_mode(sfdu) not in ("2W", "3W"):
            return False
        tracking = sfdu.trk_chdo
        # A measured range of -1.0 with a non-zero range type marks an invalid
        # observable.
        if tracking.meas_rng == -1.0 and tracking.rng_type != 0:
            return False
        return tracking.ul_freq != 0.0

    @staticmethod
    def _split_by_column(frame: DataFrame, column: str):
        """Yield ``(value, rows)`` for each distinct value of ``column``, in order of first appearance."""
        for value in frame[column].unique():
            yield value, frame[frame[column] == value]

    def _single_observation_set(
        self, row, link_end, link_def, band, link_delays, lowest_component
    ) -> SingleObservationSet:
        """Create the observation set holding the single observable stored in ``row``."""
        # Fresh ancillary settings are created for every observable.
        ancillary = dsn_n_way_range_ancillary_settings(
            [self.frequencyBandsDict[band[index]] for index in (0, 1)],
            lowest_component,
            link_delays,
        )
        # Column vector of shape (1, 1) holding the observable value.
        observable = _np.array([row["obs"]], dtype=float).reshape(-1, 1)
        # The station handed to the time conversion is the third link end of a
        # three-entry link end tuple and the second one otherwise.
        station = link_end[2] if len(link_end) == 3 else link_end[1]
        epoch_tdb = self.from_datetime_UTC_to_TDB(row["epoch"], station)
        return create_single_observation_set(
            ObservableType.dsn_n_way_range_type,
            link_def.link_ends,
            [observable],
            [epoch_tdb],
            receiver,
            ancillary,
        )