Source code for tudatpy.data.mpc.mpc

from tudatpy.dynamics import environment_setup
from tudatpy.dynamics import environment
from tudatpy.estimation import observations
from tudatpy.estimation.observable_models_setup import (
    model_settings,
    links,
)
from astropy.table import Table
from tudatpy.dynamics.environment_setup import add_gravity_field_model
from tudatpy.dynamics.environment_setup.gravity_field import central_sbdb
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import datetime
from astroquery.mpc import MPC
from astropy_healpix import HEALPix
from astropy.units import Quantity
import astropy.units as u
import astropy
import copy
import os
import re
from tudatpy.astro import time_representation
from tudatpy.astro.time_representation import DateTime
from tudatpy.data.mpc.parser_80col import parse_80cols_file
from tudatpy.data.mpc.parser_80col import unpackers
# do not remove this line, even if it looks liek an unused import line
from tudatpy.data.mpc.parser_80col.unpackers import OBS_TYPES_TO_DROP

BIAS_LOWRES_FILE = os.path.join(
    os.path.expanduser("~"),
    ".tudat",
    "resource",
    "star_catalog_biases",
    "debias_2018",
    "bias.dat",
)

# Described here:
# https://www.minorplanetcenter.net/iau/info/CatalogueCodes.html
DEFAULT_CATALOG_FLAGS = [
    "a",
    "b",
    "c",
    "d",
    "e",
    "g",
    "i",
    "j",
    "l",
    "m",
    "n",
    "o",
    "p",
    "q",
    "r",
    "t",
    "u",
    "v",
    "w",
    "L",
    "N",
    "Q",
    "R",
    "S",
    "U",
    "Y",
]


[docs] def load_bias_file( filepath: str, Nside: int | None = None, catalog_flags: list = DEFAULT_CATALOG_FLAGS, ) -> tuple[pd.DataFrame, int]: """Loads a healpix star catalog debias file and processes it into a dataframe. Automatically retrieves NSIDE parameter. Parameters ---------- filepath : str Filepath of debias file. Nside : int | None, optional NSIDE value, to be left None in most cases as this is retrieved automatically by the function, by default None catalog_flags : list | None, optional list of catalog flags, should be left default in most cases, by default None Returns ------- tuple[pd.DataFrame, int] Dataframe with biases in multiindex format ((Npix x Ncat) x Nvals), the numpix value Raises ------ ValueError If NSIDE cannot be retrieved automatically. """ # auto retrieve NSIDE if Nside is None: counter = 0 with open(filepath, "r") as file: while counter < 10: line = file.readline() pattern = r"! NSIDE=\s*(\d+)" match = re.search(pattern, line) if match: Nside = int(match.group(1)) break counter += 1 if Nside is None: raise ValueError( "Could not automatically retrieve NSIDE, please provide it as a parameter" ) if catalog_flags is None: catalog_flags = DEFAULT_CATALOG_FLAGS catalog_flags = catalog_flags + ["unknown"] values = ["RA", "DEC", "PMRA", "PMDEC"] # create a multi_index, this effectively creates a df with 3 dimensions. [row, catalog, value] m_index = pd.MultiIndex.from_product( [catalog_flags, values], names=["catalog", "value"], ) bias_dataframe = pd.read_csv( filepath, sep=" ", skiprows=23, skipinitialspace=True, index_col=None, header=None, ).iloc[:, :-1] # we add a set of 'unknown' columns to speed up assignment later len_df = bias_dataframe.shape[0] unknown_columns = np.zeros(shape=(len_df, 4)) bias_dataframe[["aa", "bb", "cc", "dd"]] = unknown_columns # apply the multi_index bias_dataframe.columns = m_index # stack it so it goes from a Npix x (Ncat x Nvals) to (Npix x Ncat) x Nvals shape bias_dataframe = bias_dataframe.stack(level=0, future_stack = True) return bias_dataframe, Nside
[docs] def get_biases_EFCC18( RA: float | np.ndarray | list, DEC: float | np.ndarray | list, epoch_seconds_TDB: float | np.ndarray | list, catalog: str | np.ndarray | list, bias_file: str | None = BIAS_LOWRES_FILE, Nside: int | None = None, catalog_flags: list[str] = DEFAULT_CATALOG_FLAGS, ) -> tuple[np.ndarray, np.ndarray]: """Calculate and return star catalog bias values as described in: "Star catalog position and proper motion corrections in asteroid astrometry II: The Gaia era" by Eggl et al. (2018). Uses the regular bias set by default. A high res version of the bias map can be retrieved from the paper. This can then be selected using the bias_file paramater. Parameters ---------- RA : float | np.ndarray | list Right Ascension value in radians DEC : float | np.ndarray | list Declination value in radians epoch_seconds_TDB : float | np.ndarray | list Time in seconds since J2000 TDB. catalog : str | np.ndarray | list Star Catalog code as described by MPC: https://www.minorplanetcenter.net/iau/info/CatalogueCodes.html bias_file : str | None, optional Optional bias file location, used to load in alternative debias coefficients. By default coefficients are retrieved from Tudat resources, by default None Nside : int | None, optional Optional Nside value, should be left None in most cases, by default None catalog_flags : list[str] | None, optional List of catalog values to use, should be left None in most cases, by default None Returns ------- tuple[np.ndarray, np.ndarray] Right Ascencion Corrections, Declination corrections Raises ------ ValueError If all mandatory inputs are not matching in size. """ if bias_file is None: bias_file = BIAS_LOWRES_FILE # transform input to numpy arrays if not isinstance(RA, np.ndarray): RA = np.array([RA]).flatten() if not isinstance(DEC, np.ndarray): DEC = np.array([DEC]).flatten() if not isinstance(epoch_seconds_TDB, np.ndarray): epoch_seconds_TDB = np.array([epoch_seconds_TDB]).flatten() if not isinstance(catalog, np.ndarray): catalog = np.array([catalog]).flatten() if not (len(RA) == len(DEC) == len(epoch_seconds_TDB) == len(catalog)): raise ValueError("All inputs must have same size") # load bias file # index matches the pixels # this is effectively a 3d table with axes: (pixel, star catalog), value) using pandas multiindex bias_df, nside = load_bias_file( filepath=bias_file, Nside=Nside, catalog_flags=catalog_flags ) # find nearest tile using HEALPix Algorithm and get indices # ideally nside should be retrieved from the load_bias_file function hp_obj = HEALPix(nside=nside) pixels = hp_obj.lonlat_to_healpix( Quantity(RA, unit=u.rad), Quantity(DEC, unit=u.rad) ) # retrieve bias values from bias file using indices # result is N x 4 biases for the correct star catalog all_catalog_ids = bias_df.index.levels[1].to_list() # this changes all ids not present in the bias file to unknown, resulting in zero bias catalog = ["unknown" if (cat not in all_catalog_ids) else cat for cat in catalog] # create combinations of pixel id and catalog then retrieve biases targets = [(pix, cat) for pix, cat in zip(pixels, catalog)] biases = bias_df.loc[targets, ["RA", "DEC", "PMRA", "PMDEC"]].to_numpy() # same as find_orb -> bias.cpp # https://github.com/Bill-Gray/find_orb/blob/master/bias.cpp#L213 epochs_years = [time_representation.seconds_since_epoch_to_julian_years_since_epoch(epoch_tdb) for epoch_tdb in epoch_seconds_TDB] # from the bias file readme.txt: RA_correction = biases[:, 0] + (epochs_years * (biases[:, 2] / 1000)) RA_correction = RA_correction / np.cos(DEC) # DEC here in radians because of cosine DEC_correction = biases[:, 1] + (epochs_years * (biases[:, 3] / 1000)) # convert from arcsec to radians RA_correction = Quantity(RA_correction, unit=u.arcsec).to(u.rad).value DEC_correction = Quantity(DEC_correction, unit=u.arcsec).to(u.rad).value return RA_correction, DEC_correction
[docs] def get_weights_VFCC17( MPC_codes: pd.Series | list | np.ndarray | None = None, epoch: pd.Series | list | np.ndarray | None = None, observation_type: pd.Series | list | np.ndarray | None = None, observatory: pd.Series | list | np.ndarray | None = None, star_catalog: pd.Series | list | np.ndarray | None = None, mpc_table: pd.DataFrame | None = None, return_full_table=False, ) -> np.ndarray | pd.DataFrame: """Retrieves observation weights using the weighting scheme presented in "Statistical analysis of astrometric errors for the most productive asteroid surveys" by Veres et al. (2017). Input may be provided using either a full MPC table (e.g. from BatchMPC) or using the individual variables. Observation types: "x", "X", "V", "v", "W", "w", "R", "r", "Q", "q", "O", are not described by the paper and receive a placeholder weight of 1/100 if provided. Parameters ---------- MPC_codes : pd.Series | list | np.ndarray | None, optional Iterable with the MPC target codes, e.g. 433 for Eros. Size must match other iterables, by default None epoch : pd.Series | list | np.ndarray | None, optional Epoch expressed as Julian Days. Size must match other iterables, by default None observation_type : pd.Series | list | np.ndarray | None, optional Iterable with the observation types in MPC format. See the NOTE2 section of the MPC format description for the exact encoding: https://minorplanetcenter.net/iau/info/OpticalObs.html. Size must match other iterables, by default None observatory : pd.Series | list | np.ndarray | None, optional Iterable with the MPC target codes, e.g. 433 for Eros. Size must match other iterables, by default None star_catalog : pd.Series | list | np.ndarray | None, optional Iterable with the star catalog codes. See the MPC catalog codes page for the exact encoding: https://www.minorplanetcenter.net/iau/info/CatalogueCodes.html. Size must match other iterables, by default None mpc_table : pd.DataFrame | None, optional Table retrieved by calling the mpc.BatchMPC.table property. Set None when using iterable input. Set others None when using table, by default None return_full_table : bool, optional Return the table with all intermediate calculations if True, return a numpy array if False, by default False Returns ------- np.ndarray If `return_full_table` is False, numpy array with weights with same size as input. pd.DataFrame If `return_full_table` is True, pandas table with all intermediate calculations. Raises ------ ValueError MPC_codes, epoch, observation_type, observatory and star_catalog must all be not None and the same size. mpc_table must be None. If table input is used, the remaining input parameters must be done. """ # Input handling if ( (mpc_table is None) and (epoch is not None) and (observation_type is not None) and (observatory is not None) and (star_catalog is not None) ): if not ( len(epoch) == len(observation_type) == len(observatory) == len(star_catalog) ): raise ValueError("All inputs must have same size") table_dict = { "number": MPC_codes, "epoch": epoch, # This is expressed in Julian Days "note2": observation_type, "observatory": observatory, "catalog": star_catalog, } table = pd.DataFrame.from_dict(table_dict) elif ( (mpc_table is not None) and (epoch is None) and (observation_type is None) and (observatory is None) and (star_catalog is None) ): table = mpc_table.copy() else: raise ValueError( "Must provide either parameters: `epoch`, `observation_type`, `observatory` and `star_catalog` OR `mpc_table`." ) table["observatory"] = table["observatory"].astype(str).str.strip().str.zfill(3) table["number"] = table["number"].astype(str).str.strip() # NOTE 1000 is a placeholder. The following observation types are not processed and receive the placeholder value: # first_discoveries = ["x", "X"], roaming = ["V", "v", "W", "w"], radar = ["R", "r", "Q", "q"], offset = ["O"] table = table.assign(inv_w=lambda _: 1000) # get an approximate timezone based on the observatory code's longitude observatories_table = MPC.get_observatory_codes().to_pandas() observatories_table["Code"] = observatories_table["Code"].astype(str).str.strip().str.zfill(3) observatories_table = ( observatories_table.assign( lon_wrapping=lambda x: (x.Longitude + 180) % 360 - 180 ) .assign(approx_tz=lambda x: ((x.lon_wrapping / 180) * 12)) .assign(jd_tz=lambda x: (x.lon_wrapping / 360).fillna(0)) .loc[:, ["Code", "approx_tz", "jd_tz"]] ) # the reset_index + set_index is a cheeky way to keep the original table index # this prevents mismatches when adding weights to the tables in BatchMPC.to_tudat() table = ( pd.merge( how="left", left=table.reset_index(), right=observatories_table, left_on="observatory", right_on="Code", ) .drop("Code", axis=1) .set_index("index") ) # create column that modifies the JD with an approximate timezone. # the time JD.50 will then be the approximate midnight at that timezone. # if we then take the int, we can group by this number to get all observations that night. table = table.assign(epochJD_tz_int=lambda x: np.floor(x.epoch + x.jd_tz)) # epoch is in Julian Days. # table = table.assign(epochJD_tz_int2=lambda x: np.round(x.epochJD + x.jd_tz, 2)) # Below are the weights applied as described per table in: # https://www.sciencedirect.com/science/article/pii/S0019103517301987 # NON-CCD # ################### # TABLE 5: Non-CCD residuals # Conditions for Table 5 # 1890-1-1 and 1950-1-1 pre_1890 = table.epoch <= 2411368.0 # check on Julian Period between_1890_1950 = (table.epoch > 2411368.0) & (table.epoch <= 2433282.0) after_1950 = table.epoch > 2433282.0 photographic = table.note2.isin([np.nan, "P", "A", "N", "Z"]) occultations = table.note2 == "E" hipparcos = table.note2 == "H" transit_circle = table.note2 == "T" encoder = table.note2 == "e" micrometer = table.note2 == "M" satellite = table.note2.isin(["S", "s"]) multinormal_place = table.note2 == "n" # Apply Table 5 table.inv_w = table.inv_w.mask((photographic & pre_1890), 10.0) table.inv_w = table.inv_w.mask((photographic & between_1890_1950), 5.0) table.inv_w = table.inv_w.mask((photographic & after_1950), 2.5) table.inv_w = table.inv_w.mask((occultations), 0.2) table.inv_w = table.inv_w.mask((hipparcos), 0.2) table.inv_w = table.inv_w.mask((transit_circle), 0.5) table.inv_w = table.inv_w.mask((encoder), 0.75) table.inv_w = table.inv_w.mask((micrometer), 2.0) table.inv_w = table.inv_w.mask((satellite), 1.5) table.inv_w = table.inv_w.mask((multinormal_place), 1.0) # ################### # CCD # ################### # TABLE 3: astroid observers # Table 3 conditions: # NOTE for now CCD will receive the same base weighting as CCD. # CMOS sensors for astronomy is a new development. ccd = table.note2.isin(["C", "c", "D"]) cmos = table.note2.isin(["B"]) ccd = ccd | cmos tab3_no_catalog = table.catalog.isin(["unknown", np.nan]) # Apply Table 3: table.inv_w = table.inv_w.mask((ccd & ~tab3_no_catalog), 1.0) table.inv_w = table.inv_w.mask((ccd & tab3_no_catalog), 1.5) table.inv_w = table.inv_w.mask((table.observatory == "704"), 1.0) table.inv_w = table.inv_w.mask((table.observatory == "G96"), 0.5) table.inv_w = table.inv_w.mask((table.observatory == "F51"), 0.2) table.inv_w = table.inv_w.mask((table.observatory == "G45"), 0.6) table.inv_w = table.inv_w.mask((table.observatory == "699"), 0.8) table.inv_w = table.inv_w.mask((table.observatory == "D29"), 0.75) table.inv_w = table.inv_w.mask((table.observatory == "C51"), 1.0) table.inv_w = table.inv_w.mask((table.observatory == "E12"), 0.75) table.inv_w = table.inv_w.mask((table.observatory == "608"), 0.6) table.inv_w = table.inv_w.mask((table.observatory == "J75"), 1.0) # ################### # ################### # TABLE 2: epoch dependant residuals # Table 2 conditions: tab2_703_epoch = table.epoch < 2456658.0 # 2014-1-1 tab2_691_epoch = table.epoch < 2452640.0 # 2003-1-1 tab2_644_epoch = table.epoch < 2452883.0 # 2003-9-1 # Apply Table 2: table.inv_w = table.inv_w.mask((tab2_703_epoch & (table.observatory == "703")), 1.0) table.inv_w = table.inv_w.mask( (~tab2_703_epoch & (table.observatory == "703")), 0.8 ) table.inv_w = table.inv_w.mask((tab2_691_epoch & (table.observatory == "691")), 0.6) table.inv_w = table.inv_w.mask( (~tab2_691_epoch & (table.observatory == "691")), 0.5 ) table.inv_w = table.inv_w.mask((tab2_644_epoch & (table.observatory == "644")), 0.6) table.inv_w = table.inv_w.mask( (~tab2_644_epoch & (table.observatory == "644")), 0.4 ) # ################### # ################### # TABLE 4: Catalog dependant + NEO observers # Table 4 conditions: # NOTE these are the LCO observatories described in the paper LCO_original = [ "K92", "K93", "Q63", "Q64", "V37", "W84", "W85", "W86", "W87", "K91", "E10", "F65", ] # NOTE these are additional LCO observatories, mostly online after publication # Aqawan and Clamshell (0.4m) style observatories have not been included due to their previous omssion in the paper. # Those thus assume the default inv_weight of 1.0 # Since remaining 1m telescopes are duplicates of existing observatories, they are included. # Dates retrieved from: https://sbnmpc.astro.umd.edu/mpecwatch/obs.html # See also: https://lco.global/observatory/sites/mpccodes/ LCO_new = [ # "L09", # online 2018 Aqawan # "Q58", # online 2017 Clamshell # "Q59", # online 2017 Clamshell # "T03", # online 2017 Clamshell # "T04", # online 2016 Clamshell # "V38", # online 2018 Aqawan "V39", # online 2019 1m # "W79", # online 2018 Aqawan # "W89", # online 2015 Aqawan # "Z17", # online 2017 Aqawan # "Z21", # online 2015 Aqawan "Z24", # online 2021 1m "Z31", # online 2021 1m ] LCO_obs = LCO_original + LCO_new MAUNAKEA_obs = ["T09", "T12", "T14"] tab4_LCO_observatories = table.observatory.isin(LCO_obs) # NOTE see catalog codes info: https://www.minorplanetcenter.net/iau/info/CatalogueCodes.html tab4_Catalog_UCAC4 = table.catalog == "q" tab4_Catalog_PPMXL = table.catalog == "t" tab4_Catalog_GAIA = table.catalog.isin(["U", "V", "W", "X", "3", "6"]) tab4_Catalog_USNOB12 = table.catalog.isin(["o", "s"]) tab4_G83_UCAC4_PPMXL = ( (table.observatory == "G83") & (tab4_Catalog_UCAC4 | tab4_Catalog_PPMXL) ) tab4_G83_GAIA = (table.observatory == "G83") & tab4_Catalog_GAIA tab4_Y28_GAIA_PPMXL = ( (table.observatory == "Y28") & (tab4_Catalog_PPMXL & tab4_Catalog_GAIA) ) tab4_568_USNOB = (table.observatory == "568") & tab4_Catalog_USNOB12 tab4_568_GAIA = (table.observatory == "568") & tab4_Catalog_GAIA tab4_568_PPMXL = (table.observatory == "568") & tab4_Catalog_PPMXL tab4_T09_T12_T14_GAIA = (table.observatory.isin(MAUNAKEA_obs)) & tab4_Catalog_GAIA tab4_309_UCAC4_PPMXL = ( (table.observatory == "309") & (tab4_Catalog_UCAC4 | tab4_Catalog_PPMXL) ) tab4_309_GAIA = (table.observatory == "309") & tab4_Catalog_GAIA # Apply Table 4: table.inv_w = table.inv_w.mask(table.observatory == "645", 0.3) table.inv_w = table.inv_w.mask(table.observatory == "673", 0.3) table.inv_w = table.inv_w.mask(table.observatory == "689", 0.5) table.inv_w = table.inv_w.mask(table.observatory == "950", 0.5) table.inv_w = table.inv_w.mask(table.observatory == "H01", 0.3) table.inv_w = table.inv_w.mask(table.observatory == "J04", 0.4) table.inv_w = table.inv_w.mask(tab4_G83_UCAC4_PPMXL, 0.3) table.inv_w = table.inv_w.mask(tab4_G83_GAIA, 0.2) table.inv_w = table.inv_w.mask(tab4_LCO_observatories, 0.4) table.inv_w = table.inv_w.mask(table.observatory == "W84", 0.5) # LCO includes W84? table.inv_w = table.inv_w.mask(tab4_Y28_GAIA_PPMXL, 0.3) table.inv_w = table.inv_w.mask(tab4_568_USNOB, 0.5) table.inv_w = table.inv_w.mask(tab4_568_GAIA, 0.1) table.inv_w = table.inv_w.mask(tab4_568_PPMXL, 0.2) table.inv_w = table.inv_w.mask(tab4_T09_T12_T14_GAIA, 0.1) table.inv_w = table.inv_w.mask(tab4_309_UCAC4_PPMXL, 0.3) table.inv_w = table.inv_w.mask(tab4_309_GAIA, 0.2) # ################### # Transform residual into weight: table = table.assign( weight_pre=lambda x: 1 / np.square(Quantity(x.inv_w, unit=u.arcsec).to(u.rad).value) ) # Reduce weight if there are more than 4 observations that night: # NOTE For satellites, this is done per julian day. # calculate number of observations on one night at one observatory for one target: table = table.assign( observations_on_epoch=lambda x: x.groupby( ["epochJD_tz_int", "observatory", "number"] ).epoch.transform("count") ) # Veres states that, for N > 4, "uncertainties have to be inflated by a factor of sqrt(N)". # This means sigma_new = sigma_old * sqrt(N) # Which also means: weight_new = 1/(sigma_new^2) = W_old/N # Since we want the weight to be 1 when N = 4, we end up dividing by 4. # How this is done in practice: we divide the weight by N if N > 4, else 1. table = table.assign( mult_obs_deweight=lambda x: np.maximum( x.observations_on_epoch / 4, 1.0 ) ) table = table.assign(weight=lambda x: x.weight_pre / x.mult_obs_deweight) if return_full_table: return table else: return table.weight.to_numpy()
[docs] class BatchMPC: """This class provides an interface between observations in the Minor Planet Centre database and Tudat. Notes ---------- Currently, transformations between reference frames are not implemented. As such observations are only returned in the J2000 Frame. Examples ---------- Basic sequence of usage: Initialise and retrieve data: >>> MPCcodes = [1, 4] # Ceres and Vesta >>> batch = BatchMPC() >>> batch.get_observations(MPCcodes) >>> batch.filter(epoch_start=datetime.datetime(2016, 1, 1)) Transform to Tudat format: >>> ... >>> bodies = environment_setup.create_system_of_bodies(body_settings) >>> observation_collection = batch.to_tudat(bodies=bodies, included_satellites=None) """
[docs] def __init__(self) -> None: """Create an empty MPC batch.""" self._table: pd.DataFrame = pd.DataFrame() self._observatories: list[str] = [] self._space_telescopes: list[str] = [] self._bands: list[str] = [] self._MPC_codes: list[str] = [] self._size: int = 0 self._observatory_info: pd.DataFrame | None = None self._MPC_space_telescopes: list[str] = [] self._get_station_info() self._epoch_start: float = 0.0 self._epoch_end: float = 0.0 self._bodies_created = {} self._EFCC18_applied = False self._custom_weights_set = False # for manual additions of table (from_pandas, from_astropy) self._req_cols = ["number", "epoch", "RA", "DEC", "band", "observatory"]
def __copy__(self): new = BatchMPC() new._table = copy.deepcopy(self._table) new._refresh_metadata() return new
[docs] def copy(self) -> "BatchMPC": """Create a copy of the batch, equivalent to copy.copy(batchMPC()) Returns ------- BatchMPC Copy of batch. """ return copy.copy(self)
# getters to make everything read-only @property def table(self) -> pd.DataFrame: """Pandas dataframe with observation data.""" return self._table @property def observatories(self) -> list[str]: """List of observatories in batch.""" return self._observatories @property def space_telescopes(self) -> list[str]: """List of satellite_observatories in batch.""" return self._space_telescopes @property def MPC_objects(self) -> list[str]: """List of MPC objects.""" return self._MPC_codes @property def size(self) -> int: """Number of observations in batch.""" return self._size @property def bands(self) -> list[str]: """List of bands in batch.""" return self._bands @property def epoch_start(self) -> float: """Epoch of oldest observation in batch in seconds since J2000 TDB.""" return self._epoch_start @property def epoch_end(self) -> float: """Epoch of latest observation in batch in seconds since J2000 TDB.""" return self._epoch_end @property def bodies_created(self) -> dict: """Dictionary with the bodies created by to_tudat and details.""" return self._bodies_created def __len__(self): return self._size def __add__(self, other): temp = BatchMPC() temp._table = ( pd.concat([self._table, other._table]) .sort_values("epoch") # this is expressed in Julian Days .drop_duplicates() ) temp._refresh_metadata() return temp # helper functions def _refresh_metadata(self) -> None: """Internal. Update batch metadata.""" self._table.drop_duplicates() self._observatories = list(self._table.observatory.unique()) self._space_telescopes = [ x for x in self._observatories if x in self._MPC_space_telescopes ] if "bands" in self._table.columns: self._bands = list(self._table.band.unique()) # if user gives custom name, set that as body name, else MPC code if "custom_name" in self._table.columns and self._table["custom_name"].notna().any(): self._MPC_codes = list(self._table["custom_name"].unique()) else: self._MPC_codes = list(self._table.number.unique()) self._size = len(self._table) self._epoch_start = self._table.epoch_seconds_TDB.min() self._epoch_end = self._table.epoch_seconds_TDB.max() def _get_station_info(self) -> None: """Internal. Retrieve data on MPC listed observatories.""" try: temp = MPC.get_observatory_codes().to_pandas() temp["Code"] = temp["Code"].astype(str).str.strip().str.zfill(3) # This query checks if Longitude is Nan: non-terrestrial telescopes sats = list(temp.query("Longitude != Longitude").Code.values) self._observatory_info = temp self._MPC_space_telescopes = sats except Exception as e: print("An error occured while retrieving observatory data") print(e) def _add_observatory_positions( self, bodies: environment.SystemOfBodies, earth_name ) -> None: """Internal. Add observatory cartesian postions to station data""" temp = self._observatory_info if temp is None: # This error will probably never occur txt = """Observatory positions can not be assigned. This is likely due to a failure in retrieving the observatories.""" raise ValueError(txt) r_earth = bodies.get(earth_name).shape_model.average_radius # Add geocentric cartesian positions temp = ( temp.assign(X=lambda x: x.cos * r_earth * np.cos(np.radians(x.Longitude))) .assign(Y=lambda x: x.cos * r_earth * np.sin(np.radians(x.Longitude))) .assign(Z=lambda x: x.sin * r_earth) ) self._observatory_info = temp def _standardize_dataframe(self, df: pd.DataFrame) -> pd.DataFrame: """Internal helper to ensure IDs are strings and observatories are zero-padded.""" # Work on a copy to avoid SettingWithCopyWarnings df = df.copy() # Standardize Observatory Codes (e.g., 673 -> "673", 89 -> "089", C51 -> "C51") if "observatory" in df.columns: df["observatory"] = df["observatory"].astype(str).str.strip().str.zfill(3) # Standardize MPC Numbers (e.g., 433 -> "433") if "number" in df.columns: df["number"] = df["number"].astype(str).str.strip() return df def _apply_EFCC18( self, bias_file=None, Nside=None, catalog_flags=None, ): """Internal, applies star catalog biases based on Internal, applies star catalog biases based on 'Star catalog position and proper motion corrections in asteroid astrometry II: The Gaia era' by Eggl et al. (2018) """ if self._EFCC18_applied: pass else: RA_corr, DEC_corr = get_biases_EFCC18( RA=self.table.RA.to_numpy(), DEC=self.table.DEC.to_numpy(), epoch_seconds_TDB=self.table.epoch_seconds_TDB.to_numpy(), catalog=self.table.catalog.to_numpy(), bias_file=bias_file, Nside=Nside, catalog_flags=catalog_flags, ) self._table = self._table.assign(RA_EFCC18=lambda x: x.RA - RA_corr) self._table = self._table.assign(DEC_EFCC18=lambda x: x.DEC - DEC_corr) self._table = self._table.assign(corr_RA_EFCC18=lambda x: RA_corr) self._table = self._table.assign(corr_DEC_EFCC18=lambda x: DEC_corr) self._EFCC18_applied = True def _add_time_columns(self, table: pd.DataFrame) -> pd.DataFrame: """ Internal helper to add standardized time columns to an observation table. This function takes a DataFrame with an 'epoch' column (in Julian Days, UTC) and adds 'epoch_seconds_TDB' and 'epoch_seconds_UTC' columns. Parameters ---------- table : pd.DataFrame The input DataFrame, which must contain an 'epoch' column. Returns ------- pd.DataFrame The DataFrame with the new time columns added. """ # Create DateTime objects from the Julian Day 'epoch' column dt_objects = [DateTime.from_julian_day(jd) for jd in table['epoch']] # Get the default time scale converter time_scale_converter = time_representation.default_time_scale_converter() augmented_table = table.copy() # Add 'epoch_seconds_UTC' column by converting DateTime Objects to epoch augmented_table["epoch_seconds_UTC"] = [dt_obj.epoch() for dt_obj in dt_objects] # Add 'epoch_seconds_TDB' column by converting from UTC to TDB augmented_table["epoch_seconds_TDB"] = [ time_scale_converter.convert_time( input_scale=time_representation.utc_scale, output_scale=time_representation.tdb_scale, input_value=t_utc, ) for t_utc in augmented_table["epoch_seconds_UTC"] ] return augmented_table def _add_custom_name_column(self, table: pd.DataFrame, custom_name) -> pd.DataFrame: augmented_table = table.copy() augmented_table["custom_name"] = [custom_name]*len(augmented_table) return augmented_table # data retrieval options: from_file: allows external observations to be added
[docs] def from_file(self, filename: str, in_degrees: bool = False, frame: str = "J2000", custom_name: str | None = None) -> None: """ Loads observations from a local MPC 80-column text file. This method serves as a high-level convenience function that orchestrates the parsing of a raw 80-column file and loading the data into the batch. It uses the `tudatpy.data.mpc.parser_80col.parse_80cols_file` function internally. The parser returns an Astropy Table with RA/DEC values in radians, so this method subsequently calls `from_astropy` to ingest the data. The `in_degrees` parameter should therefore be `False` (the default). Note ---- If you wish to perform intermediate operations on the parsed data using pandas, you can call the parser directly and then use the `from_pandas` method: .. code-block:: python from tudatpy.data.mpc.parser_80col import parse_80cols_file # 1. Parse the file to an Astropy Table astropy_table = parse_80cols_file("my_obs.txt") # 2. Convert to a pandas DataFrame for manipulation pandas_df = astropy_table.to_pandas() # ... perform custom pandas operations on pandas_df ... # 3. Load the processed DataFrame into the batch batch = BatchMPC() batch.from_pandas(pandas_df, in_degrees=False) Parameters ---------- filename : str The path to the MPC 80-column formatted text file. in_degrees : bool, optional Specifies the unit of RA/DEC in the file. Since the internal parser handles the conversion to radians, this should be left as `False`. Defaults to False. frame : str, optional The reference frame of the observations. Currently, only "J2000" is supported. Defaults to "J2000". """ # Use the dedicated parser submodule to parse the external file. # This function returns an astropy.Table with RA/DEC in radians. astropy_table = parse_80cols_file(filename) # Use the from_astropy method to validate and add the data. # in_degrees is False because the parser has already converted to radians. self.from_astropy(astropy_table, in_degrees=in_degrees, frame=frame, custom_name=custom_name)
# data retrieval options: get_observations: retrieves data from mpc through astroquery.
[docs] def get_observations( self, MPCcodes: list[str | int], id_types: list[str | None] | None = None, drop_misc_observations: bool = True, custom_name: str | None = None ) -> None: """Retrieve all observations for a set of MPC listed objects. This method uses astroquery to retrieve the observations from the MPC. An internet connection is required, observations are cached for faster subsequent retrieval. Removes duplicate and irrelevant observation data by default (see `drop_misc_observations`). Parameters ---------- MPCcodes : list[str | int] List of integer or str MPC object codes for minor planets or comets. id_types : list[str | None] | None, default None A list of identification types ('asteroid_number', 'comet_number', 'comet_designation') corresponding to each MPCcode. If an element is None, the type is considered unknown. If the entire list is None, all types are considered unknown. drop_misc_observations : bool, default True Drops observations made by method: radar and offset (natural satellites). Drops observations made by roaming observers. Drops duplicate listings to denote first observation. """ if not isinstance(MPCcodes, list): raise ValueError("MPCcodes parameter must be a list of integers/strings") # 1. If id_types is not provided, create a list of Nones if id_types is None: id_types = [None] * len(MPCcodes) # 2. Ensure the lists have the same length for a 1-to-1 mapping if len(MPCcodes) != len(id_types): raise ValueError("MPCcodes and id_types must have the same number of elements.") for code, id_type in zip(MPCcodes, id_types): if not (isinstance(code, int) or isinstance(code, str)): raise ValueError( "All codes in the MPCcodes parameter must be integers or strings" ) # 3. Conditionally call the function based on whether id_type exists if id_type is not None: obs = MPC.get_observations(code, id_type=id_type).to_pandas() else: obs = MPC.get_observations(code).to_pandas() obs = self._standardize_dataframe(obs) obs = self._add_custom_name_column(obs, custom_name) # convert JD to J2000 and UTC, convert deg to rad obs = self._add_time_columns(obs) obs = obs.assign( RA=lambda x: (np.radians(x.RA) + np.pi) % (2 * np.pi) - np.pi, DEC=lambda x: np.radians(x.DEC) ) identifier = None if drop_misc_observations: obs = obs.query("note2 not in @OBS_TYPES_TO_DROP") # Check for Comets/Interstellars (Astroquery returns 'comet_type' or 'comettype') # If we have a number and a type, combine them (e.g., 3 + I = 3I) type_col = None if 'comet_type' in obs.columns: type_col = 'comet_type' elif 'comettype' in obs.columns: type_col = 'comettype' if type_col and pd.notna(obs[type_col].iloc[0]): # checks first digit is not NA # It is a comet or interstellar object number_part = str(obs['number'].iloc[0]) type_part = str(obs[type_col].iloc[0]) identifier = f"{number_part}{type_part}" # Result: "3I" elif 'number' in obs.columns: pd.set_option('future.no_silent_downcasting', True) valid_numbers = obs['number'].dropna().astype(str).replace('<NA>', np.nan).dropna() if not valid_numbers.empty: potential_id = valid_numbers.iloc[0] else: # fallback to designation if no number has been assigned yet valid_designations = obs['desig'].dropna().astype(str).replace('<NA>', np.nan).dropna() potential_id = valid_designations.iloc[0] # We allow alphanumeric strings now (to support packed numbers like D4341) # We only pad if it is a short digit string (e.g. '1' -> '00001') # Packed strings are always 5 chars long, so they won't be affected by zfill(5) if len(potential_id) < 5: potential_id = potential_id.zfill(5) try: # Try to unpack it. This handles '00001' and 'D4341'. identifier = unpackers.unpack_permanent_minor_planet(potential_id) except Exception: # If unpacking fails (e.g. it was already unpacked or invalid), # we keep the potential_id as is. identifier = potential_id if identifier is None and 'desig' in obs.columns and pd.notna(obs['desig'].iloc[0]): identifier = str(obs['desig'].iloc[0]) if identifier is None: print(f"Warning: Could not find a valid identifier for object code {code}. Skipping.") continue # Assign the identifier to the 'number' column for the entire DataFrame. obs.loc[:, "number"] = identifier self._table = pd.concat([self._table, obs]) self._refresh_metadata()
def _add_table(self, table: pd.DataFrame, custom_name: str | None, in_degrees: bool = True): """Internal. Formats a manually entered table of observations, used in from_astropy and in from_pandas. """ obs = table obs = self._add_time_columns(obs) obs = self._add_custom_name_column(obs, custom_name) if in_degrees: obs = obs.assign(RA=lambda x: (np.radians(x.RA) + np.pi ) % (2 * np.pi) - np.pi).assign( DEC=lambda x: np.radians(x.DEC) ) # convert object mpc code to string self._table = pd.concat([self._table, obs]) self._refresh_metadata() def _validate_table(self, table: astropy.table.QTable | astropy.table.Table | pd.DataFrame, frame: str) -> None: """Internal helper to validate the frame and required columns of a table. Parameters ---------- table : astropy.table.QTable | astropy.table.Table | pd.DataFrame The table to validate. frame : str The reference frame to check. """ if frame != "J2000": raise NotImplementedError("Only observations in J2000 are supported currently") # Get column names depending on table type if isinstance(table, (astropy.table.QTable, astropy.table.Table)): colnames = table.colnames nrows = len(table) elif isinstance(table, pd.DataFrame): colnames = table.columns nrows = len(table) else: raise TypeError(f"Unsupported table type: {type(table).__name__}") if not set(self._req_cols).issubset(set(colnames)): raise ValueError(f"Table must include a set of mandatory columns: {self._req_cols}") if nrows == 0: raise ValueError("Table contains zero rows: no valid observations were parsed.")
[docs] def from_astropy(self, table: astropy.table.QTable | astropy.table.Table, in_degrees: bool = True, frame: str = "J2000", custom_name: str | None = None) -> None: """Loads observations from an Astropy Table into the BatchMPC object. This method provides a convenient way to import observation data that has been processed or filtered and is stored in an Astropy Table or QTable. It serves as a wrapper that validates the input before converting it to a pandas DataFrame for internal processing. Args: table (astropy.table.Table): The Astropy Table or QTable containing the observation data. It must include the following columns: 'number', 'epoch', 'RA', 'DEC', 'band', 'observatory'. in_degrees (bool, optional): If True, 'RA' and 'DEC' columns are assumed to be in degrees. If False, they are assumed to be in radians. Defaults to True. frame (str, optional): The reference frame of the observations. Currently, only "J2000" is supported. Defaults to "J2000". Raises: ValueError: If the input `table` is not an Astropy Table/QTable or if it is missing any of the required columns. NotImplementedError: If a `frame` other than 'J2000' is provided. """ if not isinstance(table, (astropy.table.QTable, astropy.table.Table)): raise ValueError("Table must be of type astropy.table.QTable or astropy.table.Table") self._validate_table(table, frame) self._add_table(table=table.to_pandas(), in_degrees=in_degrees, custom_name=custom_name)
[docs] def from_pandas(self, table: pd.DataFrame, in_degrees: bool = True, frame: str = "J2000", custom_name: str | None = None) -> None: """ Loads observations from a pandas DataFrame into the BatchMPC object. The DataFrame must contain the following columns: - 'number': The MPC object code (e.g., '433' for Eros). - 'epoch': The observation epoch in Julian Day format. - 'RA': Right Ascension. - 'DEC': Declination. - 'band': The observation band. - 'observatory': The MPC observatory code. Parameters ---------- table : pd.DataFrame The pandas DataFrame containing the observation data. in_degrees : bool, optional If True, RA and DEC columns in the DataFrame are assumed to be in degrees. If False, they are assumed to be in radians. Defaults to True. frame : str, optional The reference frame of the observations. Currently, only "J2000" is supported. Defaults to "J2000". """ if not isinstance(table, pd.DataFrame): raise ValueError("Table must be of type pandas.DataFrame") self._validate_table(table, frame) self._add_table(table=table, in_degrees=in_degrees, custom_name = custom_name)
[docs] def set_weights( self, weights: list | np.ndarray | pd.Series, ): """Manually set weights per observation. Weights are passed to observation collection when `.to_tudat()` is called. Set the `apply_weights_VFCC17` parameter in `.to_tudat()` to `False` to avoid overwriting. The order of the weights should match the order found in the `.table` parameter. Parameters ---------- weights : list | np.ndarray | pd.Series Iterable with weights per observation. Raises ------ ValueError If the size of weights does not match the number of observations in the batch table. """ if len(weights) != len(self.table): raise ValueError( "Weights vector must have same length as number of observations (BatchMPC.table)." ) if isinstance(weights, pd.Series): # this is to avoid errors with indexing in pandas. weights = weights.to_numpy() self._table.loc[:, "weight"] = weights self._custom_weights_set = True
[docs] def filter( self, bands: list[str] | None = None, catalogs: list[str] | None = None, observation_types: list[str] | None = None, observatories: list[str] | None = None, observatories_exclude: list[str] | None = None, epoch_start: float | datetime.datetime | None = None, epoch_end: float | datetime.datetime | None = None, in_place: bool = True, ) -> "None | BatchMPC": """Filter out observations from the batch. Parameters ---------- bands : list[str] | None, optional List of observation bands to keep in the batch, by default None catalogs : list[str] | None, optional List of star catalogs to keep in the batch. See https://www.minorplanetcenter.net/iau/info/CatalogueCodes.html for the exact encodings used by MPC, by default None observation_types : list[str] | None, optional List of observation types to keep in the batch, e.g. CCD, Photographic etc. See the Note2 section of the MPC format description: https://minorplanetcenter.net/iau/info/OpticalObs.html for the exact encoding, by default None observatories : list[str] | None, optional List of observatories to keep in the batch, by default None observatories_exclude : list[str] | None, optional List of observatories to remove from the batch, by default None epoch_start : float | datetime.datetime | None, optional Start date to include observations from, can be in python datetime in utc\ or the more conventional tudat seconds since j2000 in TDB if float,\ by default None epoch_end : float | datetime.datetime | None, optional Final date to include observations from, can be in python datetime in utc\ or the more conventional tudat seconds since j2000 in TDB if float,\ by default None in_place : bool, optional If true, modify the current batch object.\ If false returns a new object that is\ filtered, currect batch remains unmodified, by default True Raises ------ ValueError Is raised if bands, observatories, or observatories_exclude are not list or None. ValueError Is raised if both observations_exclude and observatories are not None. Returns ------- None or BatchMPC Returns a new instance of BatchMPC that is filtered. """ # basic user input handling assert not isinstance( observatories, int ), "stations parameter must be of type 'str' or 'list[str]'" if not (isinstance(observatories, list) or (observatories is None)): raise ValueError("observatories parameter must be list of strings or None") if not (isinstance(catalogs, list) or (catalogs is None)): raise ValueError("catalogs parameter must be list of strings or None") if not (isinstance(observation_types, list) or (observation_types is None)): raise ValueError( "observation_types parameter must be list of strings or None" ) if not ( isinstance(observatories_exclude, list) or (observatories_exclude is None) ): raise ValueError( "observatories_exclude parameter must be list of strings or None" ) if not (isinstance(bands, list) or (bands is None)): raise ValueError("bands parameter must be list of strings or None") if (observatories is not None) and (observatories_exclude is not None): txt = "Include or exclude observatories, not both at the same time." raise ValueError(txt) if in_place: if bands is not None: self._table = self._table.query("band == @bands") if catalogs is not None: self._table = self._table.query("catalog == @catalogs") if observation_types is not None: self._table = self._table.query("note2 == @observation_types") if observatories is not None: self._table = self._table.query("observatory == @observatories") if observatories_exclude is not None: self._table = self._table.query("observatory != @observatories_exclude") timescale_converter_needed = isinstance(epoch_start, datetime.datetime) or \ isinstance(epoch_end, datetime.datetime) if timescale_converter_needed: # This loads necessary kernels/tables time_scale_converter = time_representation.default_time_scale_converter() else: time_scale_converter = None if epoch_start is not None: if isinstance(epoch_start, float) or isinstance(epoch_start, int): self._table = self._table.query( "epoch_seconds_TDB >= @epoch_start" ) elif isinstance(epoch_start, datetime.datetime): epoch_start_iso_string = epoch_start.isoformat(sep=' ') epoch_start_utc = DateTime.from_iso_string(epoch_start_iso_string).to_epoch() epoch_start_tdb = time_scale_converter.convert_time( input_scale=time_representation.utc_scale, output_scale=time_representation.tdb_scale, input_value=epoch_start_utc) self._table = self._table.query("epoch_seconds_TDB >= @epoch_start_tdb") if epoch_end is not None: if isinstance(epoch_end, float) or isinstance(epoch_end, int): self._table = self._table.query( "epoch_seconds_TDB <= @epoch_end" ) elif isinstance(epoch_end, datetime.datetime): epoch_end_iso_string = epoch_end.isoformat(sep=' ') epoch_end_utc = DateTime.from_iso_string(epoch_end_iso_string).to_epoch() epoch_end_tdb = time_scale_converter.convert_time( input_scale=time_representation.utc_scale, output_scale=time_representation.tdb_scale, input_value=epoch_end_utc) self._table = self._table.query("epoch_seconds_TDB <= @epoch_end_tdb") self._refresh_metadata() return None else: new = self.copy() new.filter( bands=bands, catalogs=catalogs, observation_types=observation_types, observatories=observatories, observatories_exclude=observatories_exclude, epoch_start=epoch_start, epoch_end=epoch_end, in_place=True, ) return new
[docs] def create_observations_from_astropy_table( self, table, station_body: str = "Earth", apply_weights_VFCC17: bool = False, apply_star_catalog_debias: bool = False, debias_kwargs: dict = dict(), in_degrees: bool = True, ) -> observations.ObservationCollection: """ Just like to_tudat(), creates a Tudat ObservationCollection from an Astropy table or pandas DataFrame. Unlike to_tudat(), it does not require a SystemOfBodies object as input. Unlike to_tudat(), apply_weights_VFCC17 and apply_star_catalog_debias flags are set to False by default, as users might have minimal tables available, which do not necessary contain all the fields required by the batchMPC class (e.g. note1,note2, catalog, etc...) This method is useful for creating observation collections when the environment is not defined. Note that, when performing a simulation, you must manually ensure that the ground stations (observatory codes) and bodies (MPC numbers) referenced in this collection are added to your simulation environment. Parameters ---------- table : astropy.table.Table | pd.DataFrame The input table containing observations. Must include 'number', 'observatory', 'epoch', 'RA', and 'DEC' columns. station_body : str, optional The name of the body to which the ground stations (observatories) are attached, by default "Earth". Returns ------- observations.ObservationCollection A collection of observation sets grouped by link. """ # 1. Initialize internal table handling # We temporarily use a BatchMPC instance to leverage existing logic temp_batch = BatchMPC() if isinstance(table, Table): temp_batch.from_astropy(table, in_degrees=in_degrees) else: temp_batch.from_pandas(table, in_degrees=in_degrees) # 2. Apply Star Catalog Debias (EFCC18) # Matches logic in to_tudat() if apply_star_catalog_debias: temp_batch._apply_EFCC18(**debias_kwargs) RA_col = "RA_EFCC18" DEC_col = "DEC_EFCC18" else: RA_col = "RA" DEC_col = "DEC" # 3. Apply Weighting (VFCC17) # Matches logic in to_tudat() if apply_weights_VFCC17: weights_table = get_weights_VFCC17( mpc_table=temp_batch.table, return_full_table=True, ) df = weights_table # Work with the table containing weights else: df = temp_batch.table.copy() # 4. Group by unique Link (Target Body + Observatory) unique_links = df.groupby(["number", "observatory"]) observation_set_list = [] for (mpc_code, observatory_code), group in unique_links: # --- A. Define Link Ends --- link_ends = dict() link_ends[links.transmitter] = links.body_origin_link_end_id(str(mpc_code)) link_ends[links.receiver] = links.body_reference_point_link_end_id( station_body, str(observatory_code) ) link_definition = links.link_definition(link_ends) # --- B. Extract Data --- # Use the pre-calculated TDB seconds from the internal processing times = group["epoch_seconds_TDB"].to_numpy() observables = group[[RA_col, DEC_col]].to_numpy() # --- C. Create SingleObservationSet --- observation_set = observations.create_single_observation_set( model_settings.angular_position_type, link_definition.link_ends, observables, times, links.receiver ) # --- D. Apply Weights --- # Extract weights and flatten for Tudat (RA1, DEC1, RA2, DEC2...) if "weight" in group.columns: weights = group["weight"].to_numpy() weights_flat = np.ravel([weights, weights], "F") observation_set.weights_vector = weights_flat observation_set_list.append(observation_set) return observations.ObservationCollection(observation_set_list)
[docs] def to_tudat( self, bodies: environment.SystemOfBodies, included_satellites: dict[str, str] | None, station_body: str = "Earth", add_sbdb_gravity_model: bool = False, apply_weights_VFCC17: bool = True, apply_star_catalog_debias: bool = True, debias_kwargs: dict = dict(), ) -> observations.ObservationCollection: """Converts the observations in the batch into a Tudat compatible format and sets up the relevant Tudat infrastructure to support estimation. This method does the following:\\ 1. (By Default) Applies star catalog debiasing and estimation weight calculation.\\ 2. Creates an empty body for each minor planet with their MPC code as a name.\\ 3. Adds this body to the system of bodies inputted to the method.\\ 4. Retrieves the global position of the terrestrial observatories in the batch and adds these stations to the Tudat environment.\\ 5. Creates link definitions between each unique terrestrial observatory/ minor planet combination in the batch.\\ 6. (Optionally) creates a link definition between each space telescope / minor planet combination in the batch. This requires an addional input.\\ 7. Creates a `SingleObservationSet` object for each unique link that includes all observations for that link.\\ 8. (By Default) Add the relevant weights to the `SingleObservationSet` per observation.\\ 8. Returns the observations Parameters ---------- bodies : environment.SystemOfBodies SystemOfBodies containing at least the earth to allow for the placement of terrestrial telescopes included_satellites : dict[str, str] | None, optional A dictionary that links the name of a space telescope used by the user with the observatory code in MPC. Used when utilising observations from space telescopes like TESS and WISE. The keys should be the MPC observatory codes. The values should be the bodies' in the user's code. The relevant observatory code can be retrieved using the .observatories_table() method, by default None station_body : str, optional Body to attach ground stations to. Does not need to be changed unless the `Earth` body has been renamed, by default "Earth" station_body : bool, optional Adds a central_sbdb gravity model to the object, generated using JPL's small body database. This option is only available for a limited number of bodies and raises an error if unavailable. See tudatpy.dynamics.environment_setup.gravity_field.central_sbdb for more info. Enabled if True, by default False apply_weights_VFCC17 : bool, optional Applies the weighting scheme as described in: "Statistical analysis of astrometric errors for the most productive asteroid surveys" by Veres et al. (2017). Overwrites custom weights set through the `.set_weights()` method if True, by default True apply_star_catalog_debias : bool, optional Applies star catalog debiasing as described in: "Star catalog position and proper motion corrections in asteroid astrometry II: The Gaia era" by Eggl et al. (2018), by default True debias_kwargs : dict, optional Additional options when applying star catalog debiasing. A different debias file can be set here. Options are set as kwargs using a dictionary, see data._biases.get_biases_EFCC18() for more info, by default dict() Returns ------- observations.ObservationCollection ObservationCollection with the observations found in the batch Examples ---------- Using Space Telescopes Create dictionary to link name in tudat with mpc code in batch: >> sats_dict = {"C57":"TESS"} >> obs = batch1.to_tudat(bodies, included_satellites=sats_dict) """ # apply EFCC18 star catalog corrections: if apply_star_catalog_debias: self._apply_EFCC18(**debias_kwargs) RA_col = "RA_EFCC18" DEC_col = "DEC_EFCC18" else: RA_col = "RA" DEC_col = "DEC" # Calculate observation weights and update table: if apply_weights_VFCC17 and not self._custom_weights_set: temp_table: pd.DataFrame = get_weights_VFCC17( mpc_table=self.table, return_full_table=True, ) self._table.loc[:, "weight"] = temp_table.loc[:, "weight"] else: temp_table = self.table.copy() # start user input validation # Ensure that Earth is in the SystemOfBodies object try: bodies.get(station_body) except Exception as e: print( f"Body {station_body} is not in bodies, if you have renamed Earth, " + "set the station_body parameter to the new name." ) raise e # Add positions of the observatories self._add_observatory_positions(bodies, station_body) # get satellites to include and exclude if included_satellites is not None: sat_obs_codes_included = list(included_satellites.keys()) # this appears unused but is used in a pandas query: sat_obs_codes_excluded = list( set(self._space_telescopes) - set(sat_obs_codes_included) ) # Ensure that the satellite is in the SystemOfBodies object for sat in list(included_satellites.values()): try: bodies.get(sat) except Exception as e: print(f"Body {sat} is not in bodies") raise e else: sat_obs_codes_included = [] # this appears unused but is used in a pandas query: sat_obs_codes_excluded = self._space_telescopes # end user input validation # get relevant stations positions tempStations = self._observatory_info.query("Code == @self.observatories").loc[ :, ["Code", "X", "Y", "Z"] ] # add station positions to the observations observations_table = pd.merge( left=temp_table, right=tempStations, left_on="observatory", right_on="Code", how="left", ) # remove the observations from unused satellite observatories observations_table = observations_table.query("Code != @sat_obs_codes_excluded") # add asteroid bodies to SystemOfBodies object for body in self._MPC_codes: if not bodies.does_body_exist(body): bodies.create_empty_body(str(body)) # save the name of the body added self._bodies_created[str(body)] = "empty body" if add_sbdb_gravity_model: add_gravity_field_model(bodies, str(body), central_sbdb(str(body))) self._bodies_created[str(body)] = "empty body + sbdb gravity" # add ground stations to the earth body for idx in range(len(tempStations)): station_name = tempStations.iloc[idx].Code # skip if it is a satellite observatory if station_name in self._space_telescopes: continue ground_station_settings = environment_setup.ground_station.basic_station( station_name=station_name, station_nominal_position=[ tempStations.iloc[idx].X, tempStations.iloc[idx].Y, tempStations.iloc[idx].Z, ], ) # Check if station already exists if station_name not in bodies.get(station_body).ground_station_list: # Add the ground station to the environment environment_setup.add_ground_station( bodies.get_body(station_body), ground_station_settings ) # get unique combinations of mpc bodies and observatories unique_link_combos = ( observations_table.loc[:, ["number", "observatory"]].drop_duplicates() ).values observation_set_list = [] for combo in unique_link_combos: MPC_number = combo[0] station_name = combo[1] # CREATE LINKS link_ends = dict() # observed body link link_ends[links.transmitter] = links.body_origin_link_end_id(MPC_number) if station_name in sat_obs_codes_included: # link for a satellite sat_name = included_satellites[station_name] link_ends[links.receiver] = links.body_origin_link_end_id(sat_name) link_definition = links.link_definition(link_ends) else: # link for a ground station link_ends[links.receiver] = links.body_reference_point_link_end_id( station_body, station_name ) link_definition = links.link_definition(link_ends) # get observations, angles and times for this specific link observations_for_this_link = observations_table.query( "number == @MPC_number" ).query("observatory == @station_name") observation_angles = observations_for_this_link.loc[ :, [RA_col, DEC_col] ].to_numpy() observation_times = observations_for_this_link.loc[ :, ["epoch_seconds_TDB"] ].to_numpy()[:, 0] # create a set of obs for this link observation_set = observations.single_observation_set( model_settings.angular_position_type, link_definition, observation_angles, observation_times, links.receiver, ) # apply weights if apply_weights is True or set_weights() has been used. if apply_weights_VFCC17 or self._custom_weights_set: observation_weights = observations_for_this_link.loc[ :, ["weight"] ].to_numpy()[:, 0] # this is to make sure the order is RA1, DEC1, 2, 2, 3, 3 etc. observation_weights = np.ravel( [observation_weights, observation_weights], "F" ) observation_set.weights_vector = observation_weights observation_set_list.append(observation_set) observation_collection = observations.ObservationCollection( observation_set_list ) return observation_collection
[docs] def plot_observations_temporal( self, objects: list[str] | None = None, figsize: tuple[float] = (9.0, 6.0), ): """Generates a matplotlib figure with the declination and right ascension over time. Parameters ---------- objects : list[str] | None, optional List of specific MPC objects in batch to plot, None to plot all , by default None projection : str, optional projection of the figure options are: 'aitoff', 'hammer', 'lambert' and 'mollweide', by default "aitoff" figsize : tuple[float], optional size of the matplotlib figure, by default (15, 7) Returns ------- Matplotlib figure Matplotlib figure with the observations """ fig, (axRA, axDEC) = plt.subplots(2, 1, figsize=figsize, sharex=True) if objects is None: objs = self.MPC_objects else: objs = [str(o) for o in objects] for obj in objs: tab = self._table.query("number == @obj") axRA.scatter( tab.epoch_seconds_TDB, np.degrees(tab.RA), label="MPC: " + obj, marker=".", # linestyle=None, ) axDEC.scatter( tab.epoch_seconds_TDB, np.degrees(tab.DEC), label="MPC: " + obj, marker=".", # linestyle=None, ) axRA.set_ylabel(r"Right Ascension $[\deg]$") axDEC.set_ylabel(r"Declination $[\deg]$") axDEC.set_xlabel(r"Time [year]") axRA.grid() axDEC.grid() axRA.set_title("Right Ascension") axDEC.set_title("Declination") axRA.legend(bbox_to_anchor=(1.01, 1), loc="upper left") buffer = 10 axRA.set_ylim(0 - buffer, 360 + buffer) axDEC.set_ylim(-90 - buffer, 90 + buffer) fig.set_layout_engine("tight") return fig
[docs] def plot_observations_sky( self, objects: list[str] | None = None, projection: str | None = None, figsize: tuple[float] = (14.0, 7.0), ): """Generates a matplotlib figure with the observations' right ascension and declination over time. NOTE: Only plots observations between 1970 and 3000 to ensure safe datetime conversion across platforms. """ fig, ax = plt.subplots( 1, 1, subplot_kw={"projection": projection}, figsize=figsize ) if objects is None: objs = self.MPC_objects else: objs = [str(o) for o in objects] markers = ["o", "+", "^"] # Query all relevant objects first to check bounds all_obj_data = self._table.query("number == @objs") # Convert TDB Seconds -> UTC seconds time_scale_converter = time_representation.default_time_scale_converter() utc_epochs = [time_scale_converter.convert_time( input_scale=time_representation.tdb_scale, output_scale=time_representation.utc_scale, input_value=t) for t in all_obj_data.epoch_seconds_TDB ] # Prepare Data for Plotting # Apply filter: number IN objs query = "number == @objs" plot_data = self._table.query(query) # Compute global vmin/vmax for consistent colorbar vmin = min(utc_epochs) vmax = max(utc_epochs) plot_handle = None for idx, obj in enumerate(objs): tab = plot_data.query("number == @obj") utc_epochs_object = [time_scale_converter.convert_time( input_scale=time_representation.tdb_scale, output_scale=time_representation.utc_scale, input_value=t) for t in tab.epoch_seconds_TDB.values ] if tab.empty: continue plot_handle = ax.scatter( (tab.RA) - np.pi if projection is not None else np.degrees(tab.RA), (tab.DEC) if projection is not None else np.degrees(tab.DEC), s=30, marker=markers[int(idx % len(markers))], cmap=cm.plasma, label="MPC: " + obj, c=utc_epochs_object, vmin=vmin, vmax=vmax, ) ax.legend(markerscale=2) ax.set_xlabel(r"Right Ascension $[\deg]$") ax.set_ylabel(r"Declination $[\deg]$") if projection is not None: try: # Matplotlib projections can sometimes return non-standard ticks yticks = [ f"{x}°" for x in (np.degrees(np.array(ax.get_yticks().tolist()))).astype(int) ] xticks = [ f"{x}°" for x in (np.degrees(np.array(ax.get_xticks().tolist())) + 180).astype(int) ] if projection in ["aitoff", "hammer", "mollweide"]: ax.set_yticklabels(yticks) ax.set_xticklabels(xticks) elif projection == "lambert": ax.set_xticklabels(xticks) except Exception: pass # Add Colorbar with Date formatting if plot_handle: ticks = np.linspace(vmin, vmax, 7) cbar = plt.colorbar(mappable=plot_handle, ax=ax, label="Time", ticks=ticks) # Convert numeric ticks back to readable date strings using Tudatpy's DateTime cbar.ax.set_yticklabels([DateTime.from_epoch(t).to_iso_string().split(' ')[0] for t in ticks]) start_date_str = DateTime.from_epoch(vmin).to_iso_string() end_date_str = DateTime.from_epoch(vmax).to_iso_string() ax.grid() fig.suptitle(f"{len(plot_data)} observations between {start_date_str} and {end_date_str}") fig.set_layout_engine("tight") return fig
[docs] def summary(self): """Produce a quick summary of the content of the batch.""" print() print(" Batch Summary:") print(f"1. Batch includes {len(self._MPC_codes)} minor planets:") print(" ", self.MPC_objects) satObs = len(self._table.query("observatory == @self.space_telescopes")) print( f"2. Batch includes {self.size} observations, including {satObs} " + "observations from space telescopes" ) print( f"3. The observations range from {self._table.epoch_seconds_TDB.min()} " + f"to {self._table.epoch_seconds_TDB.max()}" ) print(f" In seconds TDB since J2000: {self.epoch_start} to {self.epoch_end}") print( f" In Julian Days: " + f"{self._table.epoch.min()}" + f" to {self._table.epoch.max()}" ) print( f"4. The batch contains observations from {len(self.observatories)} " + f"observatories, including {len(self.space_telescopes)} space telescopes" ) print()
[docs] def observatories_table( self, only_in_batch: bool = True, only_space_telescopes: bool = False, exclude_space_telescopes: bool = False, include_positions: bool = False, ) -> pd.DataFrame: """Returns a pandas DataFrame with information about all MPC observatories, Carthesian positions are only available after running the `to_tudat()` method. Parameters ---------- only_in_batch : bool, optional Filter out observatories that are not in the batch, by default True only_space_telescopes : bool, optional Filter out all observatories except space telescopes, by default False only_space_telescopes : bool, optional Filter out all space telescopes, by default False include_positions : bool, optional Include cartesian positions of the terrestrial telescopes only available after running to_tudat(), by default False Returns ------- pd.DataFrame Dataframe with information about the observatories. """ temp = self._observatory_info.copy() temp2 = self._table count_observations = ( temp2.groupby("observatory") .count() .rename(columns={"number": "count"}) .reset_index(drop=False) .loc[:, ["observatory", "count"]] ) temp = temp.copy() count_observations = count_observations.copy() temp = pd.merge( left=temp, right=count_observations, left_on="Code", right_on="observatory", how="left", ) if only_in_batch: temp = temp.query("Code == @self.observatories") if only_space_telescopes: temp = temp.query("Code == @self._MPC_space_telescopes") if exclude_space_telescopes: temp = temp.query("Code != @self._MPC_space_telescopes") if not include_positions: temp = temp.loc[:, ["Code", "Name", "count"]] return temp