from tudatpy.dynamics import environment_setup
from tudatpy.dynamics import environment
from tudatpy.estimation import observations
from tudatpy.estimation.observable_models_setup import (
model_settings,
links,
)
from astropy.table import Table
from tudatpy.dynamics.environment_setup import add_gravity_field_model
from tudatpy.dynamics.environment_setup.gravity_field import central_sbdb
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import datetime
from astroquery.mpc import MPC
from astropy_healpix import HEALPix
from astropy.units import Quantity
import astropy.units as u
import astropy
import copy
import os
import re
from tudatpy.astro import time_representation
from tudatpy.astro.time_representation import DateTime
from tudatpy.data.mpc.parser_80col import parse_80cols_file
from tudatpy.data.mpc.parser_80col import unpackers
# do not remove this line, even if it looks liek an unused import line
from tudatpy.data.mpc.parser_80col.unpackers import OBS_TYPES_TO_DROP
BIAS_LOWRES_FILE = os.path.join(
os.path.expanduser("~"),
".tudat",
"resource",
"star_catalog_biases",
"debias_2018",
"bias.dat",
)
# Described here:
# https://www.minorplanetcenter.net/iau/info/CatalogueCodes.html
DEFAULT_CATALOG_FLAGS = [
"a",
"b",
"c",
"d",
"e",
"g",
"i",
"j",
"l",
"m",
"n",
"o",
"p",
"q",
"r",
"t",
"u",
"v",
"w",
"L",
"N",
"Q",
"R",
"S",
"U",
"Y",
]
[docs]
def load_bias_file(
filepath: str,
Nside: int | None = None,
catalog_flags: list = DEFAULT_CATALOG_FLAGS,
) -> tuple[pd.DataFrame, int]:
"""Loads a healpix star catalog debias file and processes it into a dataframe. Automatically retrieves NSIDE parameter.
Parameters
----------
filepath : str
Filepath of debias file.
Nside : int | None, optional
NSIDE value, to be left None in most cases as this is retrieved automatically by the function, by default None
catalog_flags : list | None, optional
list of catalog flags, should be left default in most cases, by default None
Returns
-------
tuple[pd.DataFrame, int]
Dataframe with biases in multiindex format ((Npix x Ncat) x Nvals), the numpix value
Raises
------
ValueError
If NSIDE cannot be retrieved automatically.
"""
# auto retrieve NSIDE
if Nside is None:
counter = 0
with open(filepath, "r") as file:
while counter < 10:
line = file.readline()
pattern = r"! NSIDE=\s*(\d+)"
match = re.search(pattern, line)
if match:
Nside = int(match.group(1))
break
counter += 1
if Nside is None:
raise ValueError(
"Could not automatically retrieve NSIDE, please provide it as a parameter"
)
if catalog_flags is None:
catalog_flags = DEFAULT_CATALOG_FLAGS
catalog_flags = catalog_flags + ["unknown"]
values = ["RA", "DEC", "PMRA", "PMDEC"]
# create a multi_index, this effectively creates a df with 3 dimensions. [row, catalog, value]
m_index = pd.MultiIndex.from_product(
[catalog_flags, values],
names=["catalog", "value"],
)
bias_dataframe = pd.read_csv(
filepath,
sep=" ",
skiprows=23,
skipinitialspace=True,
index_col=None,
header=None,
).iloc[:, :-1]
# we add a set of 'unknown' columns to speed up assignment later
len_df = bias_dataframe.shape[0]
unknown_columns = np.zeros(shape=(len_df, 4))
bias_dataframe[["aa", "bb", "cc", "dd"]] = unknown_columns
# apply the multi_index
bias_dataframe.columns = m_index
# stack it so it goes from a Npix x (Ncat x Nvals) to (Npix x Ncat) x Nvals shape
bias_dataframe = bias_dataframe.stack(level=0, future_stack = True)
return bias_dataframe, Nside
[docs]
def get_biases_EFCC18(
RA: float | np.ndarray | list,
DEC: float | np.ndarray | list,
epoch_seconds_TDB: float | np.ndarray | list,
catalog: str | np.ndarray | list,
bias_file: str | None = BIAS_LOWRES_FILE,
Nside: int | None = None,
catalog_flags: list[str] = DEFAULT_CATALOG_FLAGS,
) -> tuple[np.ndarray, np.ndarray]:
"""Calculate and return star catalog bias values as described in:
"Star catalog position and proper motion corrections in asteroid astrometry II: The Gaia era" by Eggl et al. (2018).
Uses the regular bias set by default. A high res version of the bias map can be retrieved from the paper.
This can then be selected using the bias_file paramater.
Parameters
----------
RA : float | np.ndarray | list
Right Ascension value in radians
DEC : float | np.ndarray | list
Declination value in radians
epoch_seconds_TDB : float | np.ndarray | list
Time in seconds since J2000 TDB.
catalog : str | np.ndarray | list
Star Catalog code as described by MPC: https://www.minorplanetcenter.net/iau/info/CatalogueCodes.html
bias_file : str | None, optional
Optional bias file location, used to load in alternative debias coefficients. By default coefficients are retrieved from Tudat resources, by default None
Nside : int | None, optional
Optional Nside value, should be left None in most cases, by default None
catalog_flags : list[str] | None, optional
List of catalog values to use, should be left None in most cases, by default None
Returns
-------
tuple[np.ndarray, np.ndarray]
Right Ascencion Corrections, Declination corrections
Raises
------
ValueError
If all mandatory inputs are not matching in size.
"""
if bias_file is None:
bias_file = BIAS_LOWRES_FILE
# transform input to numpy arrays
if not isinstance(RA, np.ndarray):
RA = np.array([RA]).flatten()
if not isinstance(DEC, np.ndarray):
DEC = np.array([DEC]).flatten()
if not isinstance(epoch_seconds_TDB, np.ndarray):
epoch_seconds_TDB = np.array([epoch_seconds_TDB]).flatten()
if not isinstance(catalog, np.ndarray):
catalog = np.array([catalog]).flatten()
if not (len(RA) == len(DEC) == len(epoch_seconds_TDB) == len(catalog)):
raise ValueError("All inputs must have same size")
# load bias file
# index matches the pixels
# this is effectively a 3d table with axes: (pixel, star catalog), value) using pandas multiindex
bias_df, nside = load_bias_file(
filepath=bias_file, Nside=Nside, catalog_flags=catalog_flags
)
# find nearest tile using HEALPix Algorithm and get indices
# ideally nside should be retrieved from the load_bias_file function
hp_obj = HEALPix(nside=nside)
pixels = hp_obj.lonlat_to_healpix(
Quantity(RA, unit=u.rad), Quantity(DEC, unit=u.rad)
)
# retrieve bias values from bias file using indices
# result is N x 4 biases for the correct star catalog
all_catalog_ids = bias_df.index.levels[1].to_list()
# this changes all ids not present in the bias file to unknown, resulting in zero bias
catalog = ["unknown" if (cat not in all_catalog_ids) else cat for cat in catalog]
# create combinations of pixel id and catalog then retrieve biases
targets = [(pix, cat) for pix, cat in zip(pixels, catalog)]
biases = bias_df.loc[targets, ["RA", "DEC", "PMRA", "PMDEC"]].to_numpy()
# same as find_orb -> bias.cpp
# https://github.com/Bill-Gray/find_orb/blob/master/bias.cpp#L213
epochs_years = [time_representation.seconds_since_epoch_to_julian_years_since_epoch(epoch_tdb) for epoch_tdb in epoch_seconds_TDB]
# from the bias file readme.txt:
RA_correction = biases[:, 0] + (epochs_years * (biases[:, 2] / 1000))
RA_correction = RA_correction / np.cos(DEC) # DEC here in radians because of cosine
DEC_correction = biases[:, 1] + (epochs_years * (biases[:, 3] / 1000))
# convert from arcsec to radians
RA_correction = Quantity(RA_correction, unit=u.arcsec).to(u.rad).value
DEC_correction = Quantity(DEC_correction, unit=u.arcsec).to(u.rad).value
return RA_correction, DEC_correction
[docs]
def get_weights_VFCC17(
MPC_codes: pd.Series | list | np.ndarray | None = None,
epoch: pd.Series | list | np.ndarray | None = None,
observation_type: pd.Series | list | np.ndarray | None = None,
observatory: pd.Series | list | np.ndarray | None = None,
star_catalog: pd.Series | list | np.ndarray | None = None,
mpc_table: pd.DataFrame | None = None,
return_full_table=False,
) -> np.ndarray | pd.DataFrame:
"""Retrieves observation weights using the weighting scheme presented in
"Statistical analysis of astrometric errors for the most productive
asteroid surveys" by Veres et al. (2017). Input may be provided using
either a full MPC table (e.g. from BatchMPC) or using the individual
variables.
Observation types: "x", "X", "V", "v", "W", "w", "R", "r", "Q", "q", "O",
are not described by the paper and receive a placeholder weight of 1/100
if provided.
Parameters
----------
MPC_codes : pd.Series | list | np.ndarray | None, optional
Iterable with the MPC target codes, e.g. 433 for Eros. Size must match
other iterables, by default None
epoch : pd.Series | list | np.ndarray | None, optional
Epoch expressed as Julian Days. Size must match other iterables, by default None
observation_type : pd.Series | list | np.ndarray | None, optional
Iterable with the observation types in MPC format.
See the NOTE2 section of the MPC format description for the exact encoding:
https://minorplanetcenter.net/iau/info/OpticalObs.html.
Size must match other iterables, by default None
observatory : pd.Series | list | np.ndarray | None, optional
Iterable with the MPC target codes, e.g. 433 for Eros.
Size must match other iterables, by default None
star_catalog : pd.Series | list | np.ndarray | None, optional
Iterable with the star catalog codes.
See the MPC catalog codes page for the exact encoding:
https://www.minorplanetcenter.net/iau/info/CatalogueCodes.html.
Size must match other iterables, by default None
mpc_table : pd.DataFrame | None, optional
Table retrieved by calling the mpc.BatchMPC.table property.
Set None when using iterable input.
Set others None when using table, by default None
return_full_table : bool, optional
Return the table with all intermediate calculations if True,
return a numpy array if False, by default False
Returns
-------
np.ndarray
If `return_full_table` is False, numpy array with weights with same size as input.
pd.DataFrame
If `return_full_table` is True, pandas table with all intermediate calculations.
Raises
------
ValueError
MPC_codes, epoch, observation_type, observatory and star_catalog must all
be not None and the same size. mpc_table must be None.
If table input is used, the remaining input parameters must be done.
"""
# Input handling
if (
(mpc_table is None)
and (epoch is not None)
and (observation_type is not None)
and (observatory is not None)
and (star_catalog is not None)
):
if not (
len(epoch)
== len(observation_type)
== len(observatory)
== len(star_catalog)
):
raise ValueError("All inputs must have same size")
table_dict = {
"number": MPC_codes,
"epoch": epoch, # This is expressed in Julian Days
"note2": observation_type,
"observatory": observatory,
"catalog": star_catalog,
}
table = pd.DataFrame.from_dict(table_dict)
elif (
(mpc_table is not None)
and (epoch is None)
and (observation_type is None)
and (observatory is None)
and (star_catalog is None)
):
table = mpc_table.copy()
else:
raise ValueError(
"Must provide either parameters: `epoch`, `observation_type`, `observatory` and `star_catalog` OR `mpc_table`."
)
table["observatory"] = table["observatory"].astype(str).str.strip().str.zfill(3)
table["number"] = table["number"].astype(str).str.strip()
# NOTE 1000 is a placeholder. The following observation types are not processed and receive the placeholder value:
# first_discoveries = ["x", "X"], roaming = ["V", "v", "W", "w"], radar = ["R", "r", "Q", "q"], offset = ["O"]
table = table.assign(inv_w=lambda _: 1000)
# get an approximate timezone based on the observatory code's longitude
observatories_table = MPC.get_observatory_codes().to_pandas()
observatories_table["Code"] = observatories_table["Code"].astype(str).str.strip().str.zfill(3)
observatories_table = (
observatories_table.assign(
lon_wrapping=lambda x: (x.Longitude + 180) % 360 - 180
)
.assign(approx_tz=lambda x: ((x.lon_wrapping / 180) * 12))
.assign(jd_tz=lambda x: (x.lon_wrapping / 360).fillna(0))
.loc[:, ["Code", "approx_tz", "jd_tz"]]
)
# the reset_index + set_index is a cheeky way to keep the original table index
# this prevents mismatches when adding weights to the tables in BatchMPC.to_tudat()
table = (
pd.merge(
how="left",
left=table.reset_index(),
right=observatories_table,
left_on="observatory",
right_on="Code",
)
.drop("Code", axis=1)
.set_index("index")
)
# create column that modifies the JD with an approximate timezone.
# the time JD.50 will then be the approximate midnight at that timezone.
# if we then take the int, we can group by this number to get all observations that night.
table = table.assign(epochJD_tz_int=lambda x: np.floor(x.epoch + x.jd_tz)) # epoch is in Julian Days.
# table = table.assign(epochJD_tz_int2=lambda x: np.round(x.epochJD + x.jd_tz, 2))
# Below are the weights applied as described per table in:
# https://www.sciencedirect.com/science/article/pii/S0019103517301987
# NON-CCD
# ###################
# TABLE 5: Non-CCD residuals
# Conditions for Table 5
# 1890-1-1 and 1950-1-1
pre_1890 = table.epoch <= 2411368.0 # check on Julian Period
between_1890_1950 = (table.epoch > 2411368.0) & (table.epoch <= 2433282.0)
after_1950 = table.epoch > 2433282.0
photographic = table.note2.isin([np.nan, "P", "A", "N", "Z"])
occultations = table.note2 == "E"
hipparcos = table.note2 == "H"
transit_circle = table.note2 == "T"
encoder = table.note2 == "e"
micrometer = table.note2 == "M"
satellite = table.note2.isin(["S", "s"])
multinormal_place = table.note2 == "n"
# Apply Table 5
table.inv_w = table.inv_w.mask((photographic & pre_1890), 10.0)
table.inv_w = table.inv_w.mask((photographic & between_1890_1950), 5.0)
table.inv_w = table.inv_w.mask((photographic & after_1950), 2.5)
table.inv_w = table.inv_w.mask((occultations), 0.2)
table.inv_w = table.inv_w.mask((hipparcos), 0.2)
table.inv_w = table.inv_w.mask((transit_circle), 0.5)
table.inv_w = table.inv_w.mask((encoder), 0.75)
table.inv_w = table.inv_w.mask((micrometer), 2.0)
table.inv_w = table.inv_w.mask((satellite), 1.5)
table.inv_w = table.inv_w.mask((multinormal_place), 1.0)
# ###################
# CCD
# ###################
# TABLE 3: astroid observers
# Table 3 conditions:
# NOTE for now CCD will receive the same base weighting as CCD.
# CMOS sensors for astronomy is a new development.
ccd = table.note2.isin(["C", "c", "D"])
cmos = table.note2.isin(["B"])
ccd = ccd | cmos
tab3_no_catalog = table.catalog.isin(["unknown", np.nan])
# Apply Table 3:
table.inv_w = table.inv_w.mask((ccd & ~tab3_no_catalog), 1.0)
table.inv_w = table.inv_w.mask((ccd & tab3_no_catalog), 1.5)
table.inv_w = table.inv_w.mask((table.observatory == "704"), 1.0)
table.inv_w = table.inv_w.mask((table.observatory == "G96"), 0.5)
table.inv_w = table.inv_w.mask((table.observatory == "F51"), 0.2)
table.inv_w = table.inv_w.mask((table.observatory == "G45"), 0.6)
table.inv_w = table.inv_w.mask((table.observatory == "699"), 0.8)
table.inv_w = table.inv_w.mask((table.observatory == "D29"), 0.75)
table.inv_w = table.inv_w.mask((table.observatory == "C51"), 1.0)
table.inv_w = table.inv_w.mask((table.observatory == "E12"), 0.75)
table.inv_w = table.inv_w.mask((table.observatory == "608"), 0.6)
table.inv_w = table.inv_w.mask((table.observatory == "J75"), 1.0)
# ###################
# ###################
# TABLE 2: epoch dependant residuals
# Table 2 conditions:
tab2_703_epoch = table.epoch < 2456658.0 # 2014-1-1
tab2_691_epoch = table.epoch < 2452640.0 # 2003-1-1
tab2_644_epoch = table.epoch < 2452883.0 # 2003-9-1
# Apply Table 2:
table.inv_w = table.inv_w.mask((tab2_703_epoch & (table.observatory == "703")), 1.0)
table.inv_w = table.inv_w.mask(
(~tab2_703_epoch & (table.observatory == "703")), 0.8
)
table.inv_w = table.inv_w.mask((tab2_691_epoch & (table.observatory == "691")), 0.6)
table.inv_w = table.inv_w.mask(
(~tab2_691_epoch & (table.observatory == "691")), 0.5
)
table.inv_w = table.inv_w.mask((tab2_644_epoch & (table.observatory == "644")), 0.6)
table.inv_w = table.inv_w.mask(
(~tab2_644_epoch & (table.observatory == "644")), 0.4
)
# ###################
# ###################
# TABLE 4: Catalog dependant + NEO observers
# Table 4 conditions:
# NOTE these are the LCO observatories described in the paper
LCO_original = [
"K92",
"K93",
"Q63",
"Q64",
"V37",
"W84",
"W85",
"W86",
"W87",
"K91",
"E10",
"F65",
]
# NOTE these are additional LCO observatories, mostly online after publication
# Aqawan and Clamshell (0.4m) style observatories have not been included due to their previous omssion in the paper.
# Those thus assume the default inv_weight of 1.0
# Since remaining 1m telescopes are duplicates of existing observatories, they are included.
# Dates retrieved from: https://sbnmpc.astro.umd.edu/mpecwatch/obs.html
# See also: https://lco.global/observatory/sites/mpccodes/
LCO_new = [
# "L09", # online 2018 Aqawan
# "Q58", # online 2017 Clamshell
# "Q59", # online 2017 Clamshell
# "T03", # online 2017 Clamshell
# "T04", # online 2016 Clamshell
# "V38", # online 2018 Aqawan
"V39", # online 2019 1m
# "W79", # online 2018 Aqawan
# "W89", # online 2015 Aqawan
# "Z17", # online 2017 Aqawan
# "Z21", # online 2015 Aqawan
"Z24", # online 2021 1m
"Z31", # online 2021 1m
]
LCO_obs = LCO_original + LCO_new
MAUNAKEA_obs = ["T09", "T12", "T14"]
tab4_LCO_observatories = table.observatory.isin(LCO_obs)
# NOTE see catalog codes info: https://www.minorplanetcenter.net/iau/info/CatalogueCodes.html
tab4_Catalog_UCAC4 = table.catalog == "q"
tab4_Catalog_PPMXL = table.catalog == "t"
tab4_Catalog_GAIA = table.catalog.isin(["U", "V", "W", "X", "3", "6"])
tab4_Catalog_USNOB12 = table.catalog.isin(["o", "s"])
tab4_G83_UCAC4_PPMXL = (
(table.observatory == "G83") & (tab4_Catalog_UCAC4 | tab4_Catalog_PPMXL)
)
tab4_G83_GAIA = (table.observatory == "G83") & tab4_Catalog_GAIA
tab4_Y28_GAIA_PPMXL = (
(table.observatory == "Y28") & (tab4_Catalog_PPMXL & tab4_Catalog_GAIA)
)
tab4_568_USNOB = (table.observatory == "568") & tab4_Catalog_USNOB12
tab4_568_GAIA = (table.observatory == "568") & tab4_Catalog_GAIA
tab4_568_PPMXL = (table.observatory == "568") & tab4_Catalog_PPMXL
tab4_T09_T12_T14_GAIA = (table.observatory.isin(MAUNAKEA_obs)) & tab4_Catalog_GAIA
tab4_309_UCAC4_PPMXL = (
(table.observatory == "309") & (tab4_Catalog_UCAC4 | tab4_Catalog_PPMXL)
)
tab4_309_GAIA = (table.observatory == "309") & tab4_Catalog_GAIA
# Apply Table 4:
table.inv_w = table.inv_w.mask(table.observatory == "645", 0.3)
table.inv_w = table.inv_w.mask(table.observatory == "673", 0.3)
table.inv_w = table.inv_w.mask(table.observatory == "689", 0.5)
table.inv_w = table.inv_w.mask(table.observatory == "950", 0.5)
table.inv_w = table.inv_w.mask(table.observatory == "H01", 0.3)
table.inv_w = table.inv_w.mask(table.observatory == "J04", 0.4)
table.inv_w = table.inv_w.mask(tab4_G83_UCAC4_PPMXL, 0.3)
table.inv_w = table.inv_w.mask(tab4_G83_GAIA, 0.2)
table.inv_w = table.inv_w.mask(tab4_LCO_observatories, 0.4)
table.inv_w = table.inv_w.mask(table.observatory == "W84", 0.5) # LCO includes W84?
table.inv_w = table.inv_w.mask(tab4_Y28_GAIA_PPMXL, 0.3)
table.inv_w = table.inv_w.mask(tab4_568_USNOB, 0.5)
table.inv_w = table.inv_w.mask(tab4_568_GAIA, 0.1)
table.inv_w = table.inv_w.mask(tab4_568_PPMXL, 0.2)
table.inv_w = table.inv_w.mask(tab4_T09_T12_T14_GAIA, 0.1)
table.inv_w = table.inv_w.mask(tab4_309_UCAC4_PPMXL, 0.3)
table.inv_w = table.inv_w.mask(tab4_309_GAIA, 0.2)
# ###################
# Transform residual into weight:
table = table.assign(
weight_pre=lambda x: 1
/ np.square(Quantity(x.inv_w, unit=u.arcsec).to(u.rad).value)
)
# Reduce weight if there are more than 4 observations that night:
# NOTE For satellites, this is done per julian day.
# calculate number of observations on one night at one observatory for one target:
table = table.assign(
observations_on_epoch=lambda x: x.groupby(
["epochJD_tz_int", "observatory", "number"]
).epoch.transform("count")
)
# Veres states that, for N > 4, "uncertainties have to be inflated by a factor of sqrt(N)".
# This means sigma_new = sigma_old * sqrt(N)
# Which also means: weight_new = 1/(sigma_new^2) = W_old/N
# Since we want the weight to be 1 when N = 4, we end up dividing by 4.
# How this is done in practice: we divide the weight by N if N > 4, else 1.
table = table.assign(
mult_obs_deweight=lambda x: np.maximum(
x.observations_on_epoch / 4, 1.0
)
)
table = table.assign(weight=lambda x: x.weight_pre / x.mult_obs_deweight)
if return_full_table:
return table
else:
return table.weight.to_numpy()
[docs]
class BatchMPC:
"""This class provides an interface between observations
in the Minor Planet Centre database and Tudat.
Notes
----------
Currently, transformations between reference frames are not implemented.
As such observations are only returned in the J2000 Frame.
Examples
----------
Basic sequence of usage:
Initialise and retrieve data:
>>> MPCcodes = [1, 4] # Ceres and Vesta
>>> batch = BatchMPC()
>>> batch.get_observations(MPCcodes)
>>> batch.filter(epoch_start=datetime.datetime(2016, 1, 1))
Transform to Tudat format:
>>> ...
>>> bodies = environment_setup.create_system_of_bodies(body_settings)
>>> observation_collection = batch.to_tudat(bodies=bodies, included_satellites=None)
"""
[docs]
def __init__(self) -> None:
"""Create an empty MPC batch."""
self._table: pd.DataFrame = pd.DataFrame()
self._observatories: list[str] = []
self._space_telescopes: list[str] = []
self._bands: list[str] = []
self._MPC_codes: list[str] = []
self._size: int = 0
self._observatory_info: pd.DataFrame | None = None
self._MPC_space_telescopes: list[str] = []
self._get_station_info()
self._epoch_start: float = 0.0
self._epoch_end: float = 0.0
self._bodies_created = {}
self._EFCC18_applied = False
self._custom_weights_set = False
# for manual additions of table (from_pandas, from_astropy)
self._req_cols = ["number", "epoch", "RA", "DEC", "band", "observatory"]
def __copy__(self):
new = BatchMPC()
new._table = copy.deepcopy(self._table)
new._refresh_metadata()
return new
[docs]
def copy(self) -> "BatchMPC":
"""Create a copy of the batch, equivalent to copy.copy(batchMPC())
Returns
-------
BatchMPC
Copy of batch.
"""
return copy.copy(self)
# getters to make everything read-only
@property
def table(self) -> pd.DataFrame:
"""Pandas dataframe with observation data."""
return self._table
@property
def observatories(self) -> list[str]:
"""List of observatories in batch."""
return self._observatories
@property
def space_telescopes(self) -> list[str]:
"""List of satellite_observatories in batch."""
return self._space_telescopes
@property
def MPC_objects(self) -> list[str]:
"""List of MPC objects."""
return self._MPC_codes
@property
def size(self) -> int:
"""Number of observations in batch."""
return self._size
@property
def bands(self) -> list[str]:
"""List of bands in batch."""
return self._bands
@property
def epoch_start(self) -> float:
"""Epoch of oldest observation in batch in seconds since J2000 TDB."""
return self._epoch_start
@property
def epoch_end(self) -> float:
"""Epoch of latest observation in batch in seconds since J2000 TDB."""
return self._epoch_end
@property
def bodies_created(self) -> dict:
"""Dictionary with the bodies created by to_tudat and details."""
return self._bodies_created
def __len__(self):
return self._size
def __add__(self, other):
temp = BatchMPC()
temp._table = (
pd.concat([self._table, other._table])
.sort_values("epoch") # this is expressed in Julian Days
.drop_duplicates()
)
temp._refresh_metadata()
return temp
# helper functions
def _refresh_metadata(self) -> None:
"""Internal. Update batch metadata."""
self._table.drop_duplicates()
self._observatories = list(self._table.observatory.unique())
self._space_telescopes = [
x for x in self._observatories if x in self._MPC_space_telescopes
]
if "bands" in self._table.columns:
self._bands = list(self._table.band.unique())
# if user gives custom name, set that as body name, else MPC code
if "custom_name" in self._table.columns and self._table["custom_name"].notna().any():
self._MPC_codes = list(self._table["custom_name"].unique())
else:
self._MPC_codes = list(self._table.number.unique())
self._size = len(self._table)
self._epoch_start = self._table.epoch_seconds_TDB.min()
self._epoch_end = self._table.epoch_seconds_TDB.max()
def _get_station_info(self) -> None:
"""Internal. Retrieve data on MPC listed observatories."""
try:
temp = MPC.get_observatory_codes().to_pandas()
temp["Code"] = temp["Code"].astype(str).str.strip().str.zfill(3)
# This query checks if Longitude is Nan: non-terrestrial telescopes
sats = list(temp.query("Longitude != Longitude").Code.values)
self._observatory_info = temp
self._MPC_space_telescopes = sats
except Exception as e:
print("An error occured while retrieving observatory data")
print(e)
def _add_observatory_positions(
self, bodies: environment.SystemOfBodies, earth_name
) -> None:
"""Internal. Add observatory cartesian postions to station data"""
temp = self._observatory_info
if temp is None:
# This error will probably never occur
txt = """Observatory positions can not be assigned.
This is likely due to a failure in retrieving the observatories."""
raise ValueError(txt)
r_earth = bodies.get(earth_name).shape_model.average_radius
# Add geocentric cartesian positions
temp = (
temp.assign(X=lambda x: x.cos * r_earth * np.cos(np.radians(x.Longitude)))
.assign(Y=lambda x: x.cos * r_earth * np.sin(np.radians(x.Longitude)))
.assign(Z=lambda x: x.sin * r_earth)
)
self._observatory_info = temp
def _standardize_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
"""Internal helper to ensure IDs are strings and observatories are zero-padded."""
# Work on a copy to avoid SettingWithCopyWarnings
df = df.copy()
# Standardize Observatory Codes (e.g., 673 -> "673", 89 -> "089", C51 -> "C51")
if "observatory" in df.columns:
df["observatory"] = df["observatory"].astype(str).str.strip().str.zfill(3)
# Standardize MPC Numbers (e.g., 433 -> "433")
if "number" in df.columns:
df["number"] = df["number"].astype(str).str.strip()
return df
def _apply_EFCC18(
self,
bias_file=None,
Nside=None,
catalog_flags=None,
):
"""Internal, applies star catalog biases based on
Internal, applies star catalog biases based on
'Star catalog position and proper motion corrections in asteroid astrometry II: The Gaia era' by Eggl et al. (2018)
"""
if self._EFCC18_applied:
pass
else:
RA_corr, DEC_corr = get_biases_EFCC18(
RA=self.table.RA.to_numpy(),
DEC=self.table.DEC.to_numpy(),
epoch_seconds_TDB=self.table.epoch_seconds_TDB.to_numpy(),
catalog=self.table.catalog.to_numpy(),
bias_file=bias_file,
Nside=Nside,
catalog_flags=catalog_flags,
)
self._table = self._table.assign(RA_EFCC18=lambda x: x.RA - RA_corr)
self._table = self._table.assign(DEC_EFCC18=lambda x: x.DEC - DEC_corr)
self._table = self._table.assign(corr_RA_EFCC18=lambda x: RA_corr)
self._table = self._table.assign(corr_DEC_EFCC18=lambda x: DEC_corr)
self._EFCC18_applied = True
def _add_time_columns(self, table: pd.DataFrame) -> pd.DataFrame:
"""
Internal helper to add standardized time columns to an observation table.
This function takes a DataFrame with an 'epoch' column (in Julian Days, UTC)
and adds 'epoch_seconds_TDB' and 'epoch_seconds_UTC' columns.
Parameters
----------
table : pd.DataFrame
The input DataFrame, which must contain an 'epoch' column.
Returns
-------
pd.DataFrame
The DataFrame with the new time columns added.
"""
# Create DateTime objects from the Julian Day 'epoch' column
dt_objects = [DateTime.from_julian_day(jd) for jd in table['epoch']]
# Get the default time scale converter
time_scale_converter = time_representation.default_time_scale_converter()
augmented_table = table.copy()
# Add 'epoch_seconds_UTC' column by converting DateTime Objects to epoch
augmented_table["epoch_seconds_UTC"] = [dt_obj.epoch() for dt_obj in dt_objects]
# Add 'epoch_seconds_TDB' column by converting from UTC to TDB
augmented_table["epoch_seconds_TDB"] = [
time_scale_converter.convert_time(
input_scale=time_representation.utc_scale,
output_scale=time_representation.tdb_scale,
input_value=t_utc,
)
for t_utc in augmented_table["epoch_seconds_UTC"]
]
return augmented_table
def _add_custom_name_column(self, table: pd.DataFrame, custom_name) -> pd.DataFrame:
augmented_table = table.copy()
augmented_table["custom_name"] = [custom_name]*len(augmented_table)
return augmented_table
# data retrieval options: from_file: allows external observations to be added
[docs]
def from_file(self, filename: str, in_degrees: bool = False, frame: str = "J2000", custom_name: str | None = None) -> None:
"""
Loads observations from a local MPC 80-column text file.
This method serves as a high-level convenience function that orchestrates the
parsing of a raw 80-column file and loading the data into the batch. It uses
the `tudatpy.data.mpc.parser_80col.parse_80cols_file` function internally.
The parser returns an Astropy Table with RA/DEC values in radians, so this
method subsequently calls `from_astropy` to ingest the data. The `in_degrees`
parameter should therefore be `False` (the default).
Note
----
If you wish to perform intermediate operations on the parsed data using
pandas, you can call the parser directly and then use the `from_pandas`
method:
.. code-block:: python
from tudatpy.data.mpc.parser_80col import parse_80cols_file
# 1. Parse the file to an Astropy Table
astropy_table = parse_80cols_file("my_obs.txt")
# 2. Convert to a pandas DataFrame for manipulation
pandas_df = astropy_table.to_pandas()
# ... perform custom pandas operations on pandas_df ...
# 3. Load the processed DataFrame into the batch
batch = BatchMPC()
batch.from_pandas(pandas_df, in_degrees=False)
Parameters
----------
filename : str
The path to the MPC 80-column formatted text file.
in_degrees : bool, optional
Specifies the unit of RA/DEC in the file. Since the internal parser
handles the conversion to radians, this should be left as `False`.
Defaults to False.
frame : str, optional
The reference frame of the observations. Currently, only "J2000" is
supported. Defaults to "J2000".
"""
# Use the dedicated parser submodule to parse the external file.
# This function returns an astropy.Table with RA/DEC in radians.
astropy_table = parse_80cols_file(filename)
# Use the from_astropy method to validate and add the data.
# in_degrees is False because the parser has already converted to radians.
self.from_astropy(astropy_table, in_degrees=in_degrees, frame=frame, custom_name=custom_name)
# data retrieval options: get_observations: retrieves data from mpc through astroquery.
[docs]
def get_observations(
self,
MPCcodes: list[str | int],
id_types: list[str | None] | None = None,
drop_misc_observations: bool = True,
custom_name: str | None = None
) -> None:
"""Retrieve all observations for a set of MPC listed objects.
This method uses astroquery to retrieve the observations from the MPC.
An internet connection is required, observations are cached for faster subsequent retrieval.
Removes duplicate and irrelevant observation data by default (see `drop_misc_observations`).
Parameters
----------
MPCcodes : list[str | int]
List of integer or str MPC object codes for minor planets or comets.
id_types : list[str | None] | None, default None
A list of identification types ('asteroid_number', 'comet_number', 'comet_designation') corresponding to each MPCcode.
If an element is None, the type is considered unknown. If the entire list is None,
all types are considered unknown.
drop_misc_observations : bool, default True
Drops observations made by method: radar and offset (natural satellites).
Drops observations made by roaming observers.
Drops duplicate listings to denote first observation.
"""
if not isinstance(MPCcodes, list):
raise ValueError("MPCcodes parameter must be a list of integers/strings")
# 1. If id_types is not provided, create a list of Nones
if id_types is None:
id_types = [None] * len(MPCcodes)
# 2. Ensure the lists have the same length for a 1-to-1 mapping
if len(MPCcodes) != len(id_types):
raise ValueError("MPCcodes and id_types must have the same number of elements.")
for code, id_type in zip(MPCcodes, id_types):
if not (isinstance(code, int) or isinstance(code, str)):
raise ValueError(
"All codes in the MPCcodes parameter must be integers or strings"
)
# 3. Conditionally call the function based on whether id_type exists
if id_type is not None:
obs = MPC.get_observations(code, id_type=id_type).to_pandas()
else:
obs = MPC.get_observations(code).to_pandas()
obs = self._standardize_dataframe(obs)
obs = self._add_custom_name_column(obs, custom_name)
# convert JD to J2000 and UTC, convert deg to rad
obs = self._add_time_columns(obs)
obs = obs.assign(
RA=lambda x: (np.radians(x.RA) + np.pi) % (2 * np.pi) - np.pi,
DEC=lambda x: np.radians(x.DEC)
)
identifier = None
if drop_misc_observations:
obs = obs.query("note2 not in @OBS_TYPES_TO_DROP")
# Check for Comets/Interstellars (Astroquery returns 'comet_type' or 'comettype')
# If we have a number and a type, combine them (e.g., 3 + I = 3I)
type_col = None
if 'comet_type' in obs.columns: type_col = 'comet_type'
elif 'comettype' in obs.columns: type_col = 'comettype'
if type_col and pd.notna(obs[type_col].iloc[0]): # checks first digit is not NA
# It is a comet or interstellar object
number_part = str(obs['number'].iloc[0])
type_part = str(obs[type_col].iloc[0])
identifier = f"{number_part}{type_part}" # Result: "3I"
elif 'number' in obs.columns:
pd.set_option('future.no_silent_downcasting', True)
valid_numbers = obs['number'].dropna().astype(str).replace('<NA>', np.nan).dropna()
if not valid_numbers.empty:
potential_id = valid_numbers.iloc[0]
else:
# fallback to designation if no number has been assigned yet
valid_designations = obs['desig'].dropna().astype(str).replace('<NA>', np.nan).dropna()
potential_id = valid_designations.iloc[0]
# We allow alphanumeric strings now (to support packed numbers like D4341)
# We only pad if it is a short digit string (e.g. '1' -> '00001')
# Packed strings are always 5 chars long, so they won't be affected by zfill(5)
if len(potential_id) < 5:
potential_id = potential_id.zfill(5)
try:
# Try to unpack it. This handles '00001' and 'D4341'.
identifier = unpackers.unpack_permanent_minor_planet(potential_id)
except Exception:
# If unpacking fails (e.g. it was already unpacked or invalid),
# we keep the potential_id as is.
identifier = potential_id
if identifier is None and 'desig' in obs.columns and pd.notna(obs['desig'].iloc[0]):
identifier = str(obs['desig'].iloc[0])
if identifier is None:
print(f"Warning: Could not find a valid identifier for object code {code}. Skipping.")
continue
# Assign the identifier to the 'number' column for the entire DataFrame.
obs.loc[:, "number"] = identifier
self._table = pd.concat([self._table, obs])
self._refresh_metadata()
def _add_table(self, table: pd.DataFrame, custom_name: str | None, in_degrees: bool = True):
"""Internal. Formats a manually entered table of observations, used in from_astropy and in from_pandas. """
obs = table
obs = self._add_time_columns(obs)
obs = self._add_custom_name_column(obs, custom_name)
if in_degrees:
obs = obs.assign(RA=lambda x: (np.radians(x.RA) + np.pi ) % (2 * np.pi) - np.pi).assign(
DEC=lambda x: np.radians(x.DEC)
)
# convert object mpc code to string
self._table = pd.concat([self._table, obs])
self._refresh_metadata()
def _validate_table(self, table: astropy.table.QTable | astropy.table.Table | pd.DataFrame, frame: str) -> None:
"""Internal helper to validate the frame and required columns of a table.
Parameters
----------
table : astropy.table.QTable | astropy.table.Table | pd.DataFrame
The table to validate.
frame : str
The reference frame to check.
"""
if frame != "J2000":
raise NotImplementedError("Only observations in J2000 are supported currently")
# Get column names depending on table type
if isinstance(table, (astropy.table.QTable, astropy.table.Table)):
colnames = table.colnames
nrows = len(table)
elif isinstance(table, pd.DataFrame):
colnames = table.columns
nrows = len(table)
else:
raise TypeError(f"Unsupported table type: {type(table).__name__}")
if not set(self._req_cols).issubset(set(colnames)):
raise ValueError(f"Table must include a set of mandatory columns: {self._req_cols}")
if nrows == 0:
raise ValueError("Table contains zero rows: no valid observations were parsed.")
[docs]
def from_astropy(self, table: astropy.table.QTable | astropy.table.Table, in_degrees: bool = True, frame: str = "J2000", custom_name: str | None = None) -> None:
"""Loads observations from an Astropy Table into the BatchMPC object.
This method provides a convenient way to import observation data that has been
processed or filtered and is stored in an Astropy Table or QTable. It serves
as a wrapper that validates the input before converting it to a pandas DataFrame
for internal processing.
Args:
table (astropy.table.Table):
The Astropy Table or QTable containing the observation data. It must
include the following columns: 'number', 'epoch', 'RA', 'DEC',
'band', 'observatory'.
in_degrees (bool, optional):
If True, 'RA' and 'DEC' columns are assumed to be in degrees.
If False, they are assumed to be in radians. Defaults to True.
frame (str, optional):
The reference frame of the observations. Currently, only "J2000" is
supported. Defaults to "J2000".
Raises:
ValueError: If the input `table` is not an Astropy Table/QTable or if it
is missing any of the required columns.
NotImplementedError: If a `frame` other than 'J2000' is provided.
"""
if not isinstance(table, (astropy.table.QTable, astropy.table.Table)):
raise ValueError("Table must be of type astropy.table.QTable or astropy.table.Table")
self._validate_table(table, frame)
self._add_table(table=table.to_pandas(), in_degrees=in_degrees, custom_name=custom_name)
[docs]
def from_pandas(self, table: pd.DataFrame, in_degrees: bool = True, frame: str = "J2000", custom_name: str | None = None) -> None:
"""
Loads observations from a pandas DataFrame into the BatchMPC object.
The DataFrame must contain the following columns:
- 'number': The MPC object code (e.g., '433' for Eros).
- 'epoch': The observation epoch in Julian Day format.
- 'RA': Right Ascension.
- 'DEC': Declination.
- 'band': The observation band.
- 'observatory': The MPC observatory code.
Parameters
----------
table : pd.DataFrame
The pandas DataFrame containing the observation data.
in_degrees : bool, optional
If True, RA and DEC columns in the DataFrame are assumed to be in degrees.
If False, they are assumed to be in radians. Defaults to True.
frame : str, optional
The reference frame of the observations. Currently, only "J2000" is supported.
Defaults to "J2000".
"""
if not isinstance(table, pd.DataFrame):
raise ValueError("Table must be of type pandas.DataFrame")
self._validate_table(table, frame)
self._add_table(table=table, in_degrees=in_degrees, custom_name = custom_name)
[docs]
def set_weights(
self,
weights: list | np.ndarray | pd.Series,
):
"""Manually set weights per observation. Weights are passed to
observation collection when `.to_tudat()` is called. Set the
`apply_weights_VFCC17` parameter in `.to_tudat()` to `False` to avoid
overwriting. The order of the weights should match the order found in
the `.table` parameter.
Parameters
----------
weights : list | np.ndarray | pd.Series
Iterable with weights per observation.
Raises
------
ValueError
If the size of weights does not match the number of observations in
the batch table.
"""
if len(weights) != len(self.table):
raise ValueError(
"Weights vector must have same length as number of observations (BatchMPC.table)."
)
if isinstance(weights, pd.Series):
# this is to avoid errors with indexing in pandas.
weights = weights.to_numpy()
self._table.loc[:, "weight"] = weights
self._custom_weights_set = True
[docs]
def filter(
self,
bands: list[str] | None = None,
catalogs: list[str] | None = None,
observation_types: list[str] | None = None,
observatories: list[str] | None = None,
observatories_exclude: list[str] | None = None,
epoch_start: float | datetime.datetime | None = None,
epoch_end: float | datetime.datetime | None = None,
in_place: bool = True,
) -> "None | BatchMPC":
"""Filter out observations from the batch.
Parameters
----------
bands : list[str] | None, optional
List of observation bands to keep in the batch, by default None
catalogs : list[str] | None, optional
List of star catalogs to keep in the batch. See
https://www.minorplanetcenter.net/iau/info/CatalogueCodes.html
for the exact encodings used by MPC, by default None
observation_types : list[str] | None, optional
List of observation types to keep in the batch, e.g. CCD,
Photographic etc. See the Note2 section of the MPC format description:
https://minorplanetcenter.net/iau/info/OpticalObs.html
for the exact encoding, by default None
observatories : list[str] | None, optional
List of observatories to keep in the batch, by default None
observatories_exclude : list[str] | None, optional
List of observatories to remove from the batch, by default None
epoch_start : float | datetime.datetime | None, optional
Start date to include observations from, can be in python datetime in utc\
or the more conventional tudat seconds since j2000 in TDB if float,\
by default None
epoch_end : float | datetime.datetime | None, optional
Final date to include observations from, can be in python datetime in utc\
or the more conventional tudat seconds since j2000 in TDB if float,\
by default None
in_place : bool, optional
If true, modify the current batch object.\
If false returns a new object that is\
filtered, currect batch remains unmodified, by default True
Raises
------
ValueError
Is raised if bands, observatories, or observatories_exclude are not list or
None.
ValueError
Is raised if both observations_exclude and observatories are not None.
Returns
-------
None or BatchMPC
Returns a new instance of BatchMPC that is filtered.
"""
# basic user input handling
assert not isinstance(
observatories, int
), "stations parameter must be of type 'str' or 'list[str]'"
if not (isinstance(observatories, list) or (observatories is None)):
raise ValueError("observatories parameter must be list of strings or None")
if not (isinstance(catalogs, list) or (catalogs is None)):
raise ValueError("catalogs parameter must be list of strings or None")
if not (isinstance(observation_types, list) or (observation_types is None)):
raise ValueError(
"observation_types parameter must be list of strings or None"
)
if not (
isinstance(observatories_exclude, list) or (observatories_exclude is None)
):
raise ValueError(
"observatories_exclude parameter must be list of strings or None"
)
if not (isinstance(bands, list) or (bands is None)):
raise ValueError("bands parameter must be list of strings or None")
if (observatories is not None) and (observatories_exclude is not None):
txt = "Include or exclude observatories, not both at the same time."
raise ValueError(txt)
if in_place:
if bands is not None:
self._table = self._table.query("band == @bands")
if catalogs is not None:
self._table = self._table.query("catalog == @catalogs")
if observation_types is not None:
self._table = self._table.query("note2 == @observation_types")
if observatories is not None:
self._table = self._table.query("observatory == @observatories")
if observatories_exclude is not None:
self._table = self._table.query("observatory != @observatories_exclude")
timescale_converter_needed = isinstance(epoch_start, datetime.datetime) or \
isinstance(epoch_end, datetime.datetime)
if timescale_converter_needed:
# This loads necessary kernels/tables
time_scale_converter = time_representation.default_time_scale_converter()
else:
time_scale_converter = None
if epoch_start is not None:
if isinstance(epoch_start, float) or isinstance(epoch_start, int):
self._table = self._table.query(
"epoch_seconds_TDB >= @epoch_start"
)
elif isinstance(epoch_start, datetime.datetime):
epoch_start_iso_string = epoch_start.isoformat(sep=' ')
epoch_start_utc = DateTime.from_iso_string(epoch_start_iso_string).to_epoch()
epoch_start_tdb = time_scale_converter.convert_time(
input_scale=time_representation.utc_scale,
output_scale=time_representation.tdb_scale,
input_value=epoch_start_utc)
self._table = self._table.query("epoch_seconds_TDB >= @epoch_start_tdb")
if epoch_end is not None:
if isinstance(epoch_end, float) or isinstance(epoch_end, int):
self._table = self._table.query(
"epoch_seconds_TDB <= @epoch_end"
)
elif isinstance(epoch_end, datetime.datetime):
epoch_end_iso_string = epoch_end.isoformat(sep=' ')
epoch_end_utc = DateTime.from_iso_string(epoch_end_iso_string).to_epoch()
epoch_end_tdb = time_scale_converter.convert_time(
input_scale=time_representation.utc_scale,
output_scale=time_representation.tdb_scale,
input_value=epoch_end_utc)
self._table = self._table.query("epoch_seconds_TDB <= @epoch_end_tdb")
self._refresh_metadata()
return None
else:
new = self.copy()
new.filter(
bands=bands,
catalogs=catalogs,
observation_types=observation_types,
observatories=observatories,
observatories_exclude=observatories_exclude,
epoch_start=epoch_start,
epoch_end=epoch_end,
in_place=True,
)
return new
[docs]
def create_observations_from_astropy_table(
self,
table,
station_body: str = "Earth",
apply_weights_VFCC17: bool = False,
apply_star_catalog_debias: bool = False,
debias_kwargs: dict = dict(),
in_degrees: bool = True,
) -> observations.ObservationCollection:
"""
Just like to_tudat(), creates a Tudat ObservationCollection from an Astropy table or pandas DataFrame.
Unlike to_tudat(), it does not require a SystemOfBodies object as input.
Unlike to_tudat(), apply_weights_VFCC17 and apply_star_catalog_debias flags are set to False by default,
as users might have minimal tables available, which do not necessary contain all the fields required
by the batchMPC class (e.g. note1,note2, catalog, etc...)
This method is useful for creating observation collections when the environment
is not defined. Note that, when performing a simulation,
you must manually ensure that the ground stations (observatory codes)
and bodies (MPC numbers) referenced in this
collection are added to your simulation environment.
Parameters
----------
table : astropy.table.Table | pd.DataFrame
The input table containing observations. Must include 'number',
'observatory', 'epoch', 'RA', and 'DEC' columns.
station_body : str, optional
The name of the body to which the ground stations (observatories)
are attached, by default "Earth".
Returns
-------
observations.ObservationCollection
A collection of observation sets grouped by link.
"""
# 1. Initialize internal table handling
# We temporarily use a BatchMPC instance to leverage existing logic
temp_batch = BatchMPC()
if isinstance(table, Table):
temp_batch.from_astropy(table, in_degrees=in_degrees)
else:
temp_batch.from_pandas(table, in_degrees=in_degrees)
# 2. Apply Star Catalog Debias (EFCC18)
# Matches logic in to_tudat()
if apply_star_catalog_debias:
temp_batch._apply_EFCC18(**debias_kwargs)
RA_col = "RA_EFCC18"
DEC_col = "DEC_EFCC18"
else:
RA_col = "RA"
DEC_col = "DEC"
# 3. Apply Weighting (VFCC17)
# Matches logic in to_tudat()
if apply_weights_VFCC17:
weights_table = get_weights_VFCC17(
mpc_table=temp_batch.table,
return_full_table=True,
)
df = weights_table # Work with the table containing weights
else:
df = temp_batch.table.copy()
# 4. Group by unique Link (Target Body + Observatory)
unique_links = df.groupby(["number", "observatory"])
observation_set_list = []
for (mpc_code, observatory_code), group in unique_links:
# --- A. Define Link Ends ---
link_ends = dict()
link_ends[links.transmitter] = links.body_origin_link_end_id(str(mpc_code))
link_ends[links.receiver] = links.body_reference_point_link_end_id(
station_body, str(observatory_code)
)
link_definition = links.link_definition(link_ends)
# --- B. Extract Data ---
# Use the pre-calculated TDB seconds from the internal processing
times = group["epoch_seconds_TDB"].to_numpy()
observables = group[[RA_col, DEC_col]].to_numpy()
# --- C. Create SingleObservationSet ---
observation_set = observations.create_single_observation_set(
model_settings.angular_position_type,
link_definition.link_ends,
observables,
times,
links.receiver
)
# --- D. Apply Weights ---
# Extract weights and flatten for Tudat (RA1, DEC1, RA2, DEC2...)
if "weight" in group.columns:
weights = group["weight"].to_numpy()
weights_flat = np.ravel([weights, weights], "F")
observation_set.weights_vector = weights_flat
observation_set_list.append(observation_set)
return observations.ObservationCollection(observation_set_list)
[docs]
def to_tudat(
self,
bodies: environment.SystemOfBodies,
included_satellites: dict[str, str] | None,
station_body: str = "Earth",
add_sbdb_gravity_model: bool = False,
apply_weights_VFCC17: bool = True,
apply_star_catalog_debias: bool = True,
debias_kwargs: dict = dict(),
) -> observations.ObservationCollection:
"""Converts the observations in the batch into a Tudat compatible format and
sets up the relevant Tudat infrastructure to support estimation.
This method does the following:\\
1. (By Default) Applies star catalog debiasing and estimation
weight calculation.\\
2. Creates an empty body for each minor planet with their MPC code as a
name.\\
3. Adds this body to the system of bodies inputted to the method.\\
4. Retrieves the global position of the terrestrial observatories in
the batch and adds these stations to the Tudat environment.\\
5. Creates link definitions between each unique terrestrial
observatory/ minor planet combination in the batch.\\
6. (Optionally) creates a link definition between each
space telescope / minor planet combination in the batch.
This requires an addional input.\\
7. Creates a `SingleObservationSet` object for each unique link that
includes all observations for that link.\\
8. (By Default) Add the relevant weights to the `SingleObservationSet`
per observation.\\
8. Returns the observations
Parameters
----------
bodies : environment.SystemOfBodies
SystemOfBodies containing at least the earth to allow for the placement of
terrestrial telescopes
included_satellites : dict[str, str] | None, optional
A dictionary that links the name of a space telescope used by the user with
the observatory code in MPC. Used when utilising observations from space
telescopes like TESS and WISE. The keys should be the MPC observatory
codes. The values should be the bodies' in the user's code. The relevant
observatory code can be retrieved using
the .observatories_table() method, by default None
station_body : str, optional
Body to attach ground stations to. Does not need to be changed unless the
`Earth` body has been renamed, by default "Earth"
station_body : bool, optional
Adds a central_sbdb gravity model to the object, generated using JPL's small body database.
This option is only available for a limited number of bodies and raises an error if unavailable.
See tudatpy.dynamics.environment_setup.gravity_field.central_sbdb for more info.
Enabled if True, by default False
apply_weights_VFCC17 : bool, optional
Applies the weighting scheme as described in: "Statistical analysis of astrometric errors
for the most productive asteroid surveys" by Veres et al. (2017). Overwrites custom weights
set through the `.set_weights()` method if True, by default True
apply_star_catalog_debias : bool, optional
Applies star catalog debiasing as described in: "Star catalog position and proper motion corrections
in asteroid astrometry II: The Gaia era" by Eggl et al. (2018), by default True
debias_kwargs : dict, optional
Additional options when applying star catalog debiasing. A different debias file
can be set here. Options are set as kwargs using a dictionary, see data._biases.get_biases_EFCC18()
for more info, by default dict()
Returns
-------
observations.ObservationCollection
ObservationCollection with the observations found in the batch
Examples
----------
Using Space Telescopes
Create dictionary to link name in tudat with mpc code in batch:
>> sats_dict = {"C57":"TESS"}
>> obs = batch1.to_tudat(bodies, included_satellites=sats_dict)
"""
# apply EFCC18 star catalog corrections:
if apply_star_catalog_debias:
self._apply_EFCC18(**debias_kwargs)
RA_col = "RA_EFCC18"
DEC_col = "DEC_EFCC18"
else:
RA_col = "RA"
DEC_col = "DEC"
# Calculate observation weights and update table:
if apply_weights_VFCC17 and not self._custom_weights_set:
temp_table: pd.DataFrame = get_weights_VFCC17(
mpc_table=self.table,
return_full_table=True,
)
self._table.loc[:, "weight"] = temp_table.loc[:, "weight"]
else:
temp_table = self.table.copy()
# start user input validation
# Ensure that Earth is in the SystemOfBodies object
try:
bodies.get(station_body)
except Exception as e:
print(
f"Body {station_body} is not in bodies, if you have renamed Earth, "
+ "set the station_body parameter to the new name."
)
raise e
# Add positions of the observatories
self._add_observatory_positions(bodies, station_body)
# get satellites to include and exclude
if included_satellites is not None:
sat_obs_codes_included = list(included_satellites.keys())
# this appears unused but is used in a pandas query:
sat_obs_codes_excluded = list(
set(self._space_telescopes) - set(sat_obs_codes_included)
)
# Ensure that the satellite is in the SystemOfBodies object
for sat in list(included_satellites.values()):
try:
bodies.get(sat)
except Exception as e:
print(f"Body {sat} is not in bodies")
raise e
else:
sat_obs_codes_included = []
# this appears unused but is used in a pandas query:
sat_obs_codes_excluded = self._space_telescopes
# end user input validation
# get relevant stations positions
tempStations = self._observatory_info.query("Code == @self.observatories").loc[
:, ["Code", "X", "Y", "Z"]
]
# add station positions to the observations
observations_table = pd.merge(
left=temp_table,
right=tempStations,
left_on="observatory",
right_on="Code",
how="left",
)
# remove the observations from unused satellite observatories
observations_table = observations_table.query("Code != @sat_obs_codes_excluded")
# add asteroid bodies to SystemOfBodies object
for body in self._MPC_codes:
if not bodies.does_body_exist(body):
bodies.create_empty_body(str(body))
# save the name of the body added
self._bodies_created[str(body)] = "empty body"
if add_sbdb_gravity_model:
add_gravity_field_model(bodies, str(body), central_sbdb(str(body)))
self._bodies_created[str(body)] = "empty body + sbdb gravity"
# add ground stations to the earth body
for idx in range(len(tempStations)):
station_name = tempStations.iloc[idx].Code
# skip if it is a satellite observatory
if station_name in self._space_telescopes:
continue
ground_station_settings = environment_setup.ground_station.basic_station(
station_name=station_name,
station_nominal_position=[
tempStations.iloc[idx].X,
tempStations.iloc[idx].Y,
tempStations.iloc[idx].Z,
],
)
# Check if station already exists
if station_name not in bodies.get(station_body).ground_station_list:
# Add the ground station to the environment
environment_setup.add_ground_station(
bodies.get_body(station_body), ground_station_settings
)
# get unique combinations of mpc bodies and observatories
unique_link_combos = (
observations_table.loc[:, ["number", "observatory"]].drop_duplicates()
).values
observation_set_list = []
for combo in unique_link_combos:
MPC_number = combo[0]
station_name = combo[1]
# CREATE LINKS
link_ends = dict()
# observed body link
link_ends[links.transmitter] = links.body_origin_link_end_id(MPC_number)
if station_name in sat_obs_codes_included:
# link for a satellite
sat_name = included_satellites[station_name]
link_ends[links.receiver] = links.body_origin_link_end_id(sat_name)
link_definition = links.link_definition(link_ends)
else:
# link for a ground station
link_ends[links.receiver] = links.body_reference_point_link_end_id(
station_body, station_name
)
link_definition = links.link_definition(link_ends)
# get observations, angles and times for this specific link
observations_for_this_link = observations_table.query(
"number == @MPC_number"
).query("observatory == @station_name")
observation_angles = observations_for_this_link.loc[
:, [RA_col, DEC_col]
].to_numpy()
observation_times = observations_for_this_link.loc[
:, ["epoch_seconds_TDB"]
].to_numpy()[:, 0]
# create a set of obs for this link
observation_set = observations.single_observation_set(
model_settings.angular_position_type,
link_definition,
observation_angles,
observation_times,
links.receiver,
)
# apply weights if apply_weights is True or set_weights() has been used.
if apply_weights_VFCC17 or self._custom_weights_set:
observation_weights = observations_for_this_link.loc[
:, ["weight"]
].to_numpy()[:, 0]
# this is to make sure the order is RA1, DEC1, 2, 2, 3, 3 etc.
observation_weights = np.ravel(
[observation_weights, observation_weights], "F"
)
observation_set.weights_vector = observation_weights
observation_set_list.append(observation_set)
observation_collection = observations.ObservationCollection(
observation_set_list
)
return observation_collection
[docs]
def plot_observations_temporal(
self,
objects: list[str] | None = None,
figsize: tuple[float] = (9.0, 6.0),
):
"""Generates a matplotlib figure with the declination and right ascension
over time.
Parameters
----------
objects : list[str] | None, optional
List of specific MPC objects in batch to plot, None to plot all
, by default None
projection : str, optional
projection of the figure options are: 'aitoff', 'hammer',
'lambert' and 'mollweide', by default "aitoff"
figsize : tuple[float], optional
size of the matplotlib figure, by default (15, 7)
Returns
-------
Matplotlib figure
Matplotlib figure with the observations
"""
fig, (axRA, axDEC) = plt.subplots(2, 1, figsize=figsize, sharex=True)
if objects is None:
objs = self.MPC_objects
else:
objs = [str(o) for o in objects]
for obj in objs:
tab = self._table.query("number == @obj")
axRA.scatter(
tab.epoch_seconds_TDB,
np.degrees(tab.RA),
label="MPC: " + obj,
marker=".",
# linestyle=None,
)
axDEC.scatter(
tab.epoch_seconds_TDB,
np.degrees(tab.DEC),
label="MPC: " + obj,
marker=".",
# linestyle=None,
)
axRA.set_ylabel(r"Right Ascension $[\deg]$")
axDEC.set_ylabel(r"Declination $[\deg]$")
axDEC.set_xlabel(r"Time [year]")
axRA.grid()
axDEC.grid()
axRA.set_title("Right Ascension")
axDEC.set_title("Declination")
axRA.legend(bbox_to_anchor=(1.01, 1), loc="upper left")
buffer = 10
axRA.set_ylim(0 - buffer, 360 + buffer)
axDEC.set_ylim(-90 - buffer, 90 + buffer)
fig.set_layout_engine("tight")
return fig
[docs]
def plot_observations_sky(
self,
objects: list[str] | None = None,
projection: str | None = None,
figsize: tuple[float] = (14.0, 7.0),
):
"""Generates a matplotlib figure with the observations'
right ascension and declination over time.
NOTE: Only plots observations between 1970 and 3000 to ensure
safe datetime conversion across platforms.
"""
fig, ax = plt.subplots(
1, 1, subplot_kw={"projection": projection}, figsize=figsize
)
if objects is None:
objs = self.MPC_objects
else:
objs = [str(o) for o in objects]
markers = ["o", "+", "^"]
# Query all relevant objects first to check bounds
all_obj_data = self._table.query("number == @objs")
# Convert TDB Seconds -> UTC seconds
time_scale_converter = time_representation.default_time_scale_converter()
utc_epochs = [time_scale_converter.convert_time(
input_scale=time_representation.tdb_scale,
output_scale=time_representation.utc_scale,
input_value=t) for t in all_obj_data.epoch_seconds_TDB
]
# Prepare Data for Plotting
# Apply filter: number IN objs
query = "number == @objs"
plot_data = self._table.query(query)
# Compute global vmin/vmax for consistent colorbar
vmin = min(utc_epochs)
vmax = max(utc_epochs)
plot_handle = None
for idx, obj in enumerate(objs):
tab = plot_data.query("number == @obj")
utc_epochs_object = [time_scale_converter.convert_time(
input_scale=time_representation.tdb_scale,
output_scale=time_representation.utc_scale,
input_value=t) for t in tab.epoch_seconds_TDB.values
]
if tab.empty:
continue
plot_handle = ax.scatter(
(tab.RA) - np.pi if projection is not None else np.degrees(tab.RA),
(tab.DEC) if projection is not None else np.degrees(tab.DEC),
s=30,
marker=markers[int(idx % len(markers))],
cmap=cm.plasma,
label="MPC: " + obj,
c=utc_epochs_object,
vmin=vmin,
vmax=vmax,
)
ax.legend(markerscale=2)
ax.set_xlabel(r"Right Ascension $[\deg]$")
ax.set_ylabel(r"Declination $[\deg]$")
if projection is not None:
try:
# Matplotlib projections can sometimes return non-standard ticks
yticks = [
f"{x}°"
for x in (np.degrees(np.array(ax.get_yticks().tolist()))).astype(int)
]
xticks = [
f"{x}°"
for x in (np.degrees(np.array(ax.get_xticks().tolist())) + 180).astype(int)
]
if projection in ["aitoff", "hammer", "mollweide"]:
ax.set_yticklabels(yticks)
ax.set_xticklabels(xticks)
elif projection == "lambert":
ax.set_xticklabels(xticks)
except Exception:
pass
# Add Colorbar with Date formatting
if plot_handle:
ticks = np.linspace(vmin, vmax, 7)
cbar = plt.colorbar(mappable=plot_handle, ax=ax, label="Time", ticks=ticks)
# Convert numeric ticks back to readable date strings using Tudatpy's DateTime
cbar.ax.set_yticklabels([DateTime.from_epoch(t).to_iso_string().split(' ')[0] for t in ticks])
start_date_str = DateTime.from_epoch(vmin).to_iso_string()
end_date_str = DateTime.from_epoch(vmax).to_iso_string()
ax.grid()
fig.suptitle(f"{len(plot_data)} observations between {start_date_str} and {end_date_str}")
fig.set_layout_engine("tight")
return fig
[docs]
def summary(self):
"""Produce a quick summary of the content of the batch."""
print()
print(" Batch Summary:")
print(f"1. Batch includes {len(self._MPC_codes)} minor planets:")
print(" ", self.MPC_objects)
satObs = len(self._table.query("observatory == @self.space_telescopes"))
print(
f"2. Batch includes {self.size} observations, including {satObs} "
+ "observations from space telescopes"
)
print(
f"3. The observations range from {self._table.epoch_seconds_TDB.min()} "
+ f"to {self._table.epoch_seconds_TDB.max()}"
)
print(f" In seconds TDB since J2000: {self.epoch_start} to {self.epoch_end}")
print(
f" In Julian Days: "
+ f"{self._table.epoch.min()}"
+ f" to {self._table.epoch.max()}"
)
print(
f"4. The batch contains observations from {len(self.observatories)} "
+ f"observatories, including {len(self.space_telescopes)} space telescopes"
)
print()
[docs]
def observatories_table(
self,
only_in_batch: bool = True,
only_space_telescopes: bool = False,
exclude_space_telescopes: bool = False,
include_positions: bool = False,
) -> pd.DataFrame:
"""Returns a pandas DataFrame with information about all MPC observatories,
Carthesian positions are only available after running the `to_tudat()` method.
Parameters
----------
only_in_batch : bool, optional
Filter out observatories that are not in the batch, by default True
only_space_telescopes : bool, optional
Filter out all observatories except space telescopes, by default False
only_space_telescopes : bool, optional
Filter out all space telescopes, by default False
include_positions : bool, optional
Include cartesian positions of the terrestrial telescopes only available
after running to_tudat(), by default False
Returns
-------
pd.DataFrame
Dataframe with information about the observatories.
"""
temp = self._observatory_info.copy()
temp2 = self._table
count_observations = (
temp2.groupby("observatory")
.count()
.rename(columns={"number": "count"})
.reset_index(drop=False)
.loc[:, ["observatory", "count"]]
)
temp = temp.copy()
count_observations = count_observations.copy()
temp = pd.merge(
left=temp,
right=count_observations,
left_on="Code",
right_on="observatory",
how="left",
)
if only_in_batch:
temp = temp.query("Code == @self.observatories")
if only_space_telescopes:
temp = temp.query("Code == @self._MPC_space_telescopes")
if exclude_space_telescopes:
temp = temp.query("Code != @self._MPC_space_telescopes")
if not include_positions:
temp = temp.loc[:, ["Code", "Name", "count"]]
return temp