import json
import getpass
import os
import requests
from collections import defaultdict
from urllib.parse import urljoin, urlparse
from tudatpy.dynamics import environment
from datetime import datetime, timedelta
import math
from tudatpy.data import get_resource_path
from tudatpy.astro.element_conversion import mean_to_true_anomaly
[docs]
class SpaceTrackQuery:
"""
Handles queries to the Space-Track.org API for retrieving TLEs and other
space data. Manages authentication, session persistence, and local caching
to minimise API usage.
For working with the retrieved OMM records, see :class:`OMMUtils`.
"""
[docs]
def __init__(
self,
username: str | None = None,
password: str | None = None,
spacetrack_url: str = "https://www.space-track.org",
tle_data_folder: str = get_resource_path() + "/tle_data",
timeout: int = 60,
) -> None:
"""
Parameters
----------
username : str | None, optional
Space-Track.org username. Prompted interactively if not provided.
password : str | None, optional
Space-Track.org password. Prompted interactively if not provided.
spacetrack_url : str, optional
Base URL. Defaults to ``'https://www.space-track.org'``.
tle_data_folder : str, optional
Directory for locally cached TLE files.
"""
if not username:
username = input("space-track username: ")
if not password:
password = getpass.getpass("space-track password: ")
self.username: str = username
self.spacetrack_url: str = spacetrack_url
self.tle_data_folder: str = tle_data_folder
os.makedirs(self.tle_data_folder, exist_ok=True)
self.session: requests.Session = requests.Session()
self.timeout = timeout
self._login(password)
del password # do not keep the plaintext password on the instance
# ------------------------------------------------------------------
# Context manager
# ------------------------------------------------------------------
def __enter__(self) -> "SpaceTrackQuery":
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.logout()
# ------------------------------------------------------------------
# Authentication
# ------------------------------------------------------------------
def _login(self, password: str) -> None:
"""
Logs in to Space-Track.org.
Parameters
----------
password : str
Plaintext password. Never stored on the instance.
Raises
------
requests.exceptions.RequestException
On network errors or bad credentials.
"""
print("Logging into Space-Track...")
try:
response = self.session.post(
urljoin(self.spacetrack_url, "/ajaxauth/login"),
json={"identity": self.username, "password": password}, timeout=self.timeout
)
response.raise_for_status()
print("Login successful.")
except requests.exceptions.RequestException as e:
print(f"Login failed: {e}")
raise
[docs]
def logout(self) -> None:
"""
Logs out and closes the session. Safe to call multiple times.
"""
try:
self.session.get(urljoin(self.spacetrack_url, "/ajaxauth/logout"),
timeout = self.timeout)
print("Logged out of Space-Track.")
except requests.exceptions.RequestException as e:
print(f"Logout request failed (session may already be expired): {e}")
finally:
self.session.close()
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _build_url(self, *path_parts: str) -> str:
"""
Joins path segments onto the base URL, safe on all platforms.
Parameters
----------
*path_parts : str
URL path segments to join.
Returns
-------
str
The full URL.
"""
path = "/".join(part.strip("/") for part in path_parts)
return urljoin(self.spacetrack_url.rstrip("/") + "/", path)
def _fetch_json(self, url: str) -> list:
"""
GET a URL and return the body as a list.
Parameters
----------
url : str
The URL to fetch.
Returns
-------
list
The JSON response body as a list.
"""
response = self.session.get(url, timeout=self.timeout)
response.raise_for_status()
data = response.json()
return data if isinstance(data, list) else [data]
def _get_json_and_save(
self, url: str, json_name: str, merge: bool = False
) -> list | None:
"""
Fetches JSON from the API and persists it to the local cache.
Parameters
----------
url : str
API endpoint.
json_name : str
Cache filename (relative to ``tle_data_folder``).
merge : bool
``True`` → merge into existing file. ``False`` → overwrite.
Returns
-------
list | None
The freshly fetched records, or ``None`` on failure.
"""
filepath = os.path.join(self.tle_data_folder, json_name)
try:
print(f"Fetching data for {json_name}...")
new_data = self._fetch_json(url)
if merge and os.path.exists(filepath):
print(f"Merge requested. Updating {json_name}...")
self._save_unique_sorted(filepath, new_data)
with open(filepath, "r") as f:
final = json.load(f)
return final.get("data", final) if isinstance(final, dict) else final
else:
print(f"Overwrite mode. Saving {json_name}...")
if os.path.exists(filepath):
os.remove(filepath)
self._save_unique_sorted(filepath, new_data)
return new_data
except requests.exceptions.RequestException as e:
print(f"API Error: {e}")
return None
def _save_unique_sorted(self, filepath: str, new_data: list[dict]) -> None:
"""
Merges ``new_data`` into the cache file and updates ``last_api_hit``.
When two records share the same ``NORAD_CAT_ID + EPOCH`` key the one
with the later ``CREATION_DATE`` wins.
File structure on disk::
{"last_api_hit": "<ISO datetime>", "data": [...]}
Parameters
----------
filepath : str
Path to the JSON cache file.
new_data : list[dict]
New OMM records to merge.
Returns
-------
None
"""
existing: list[dict] = []
if os.path.exists(filepath):
try:
with open(filepath, "r") as f:
content = json.load(f)
existing = content.get("data", content) if isinstance(content, dict) else content
except json.JSONDecodeError:
existing = []
unique: dict[str, dict] = {}
for item in existing:
unique[f"{item['NORAD_CAT_ID']}_{item['EPOCH']}"] = item
for item in new_data:
key = f"{item['NORAD_CAT_ID']}_{item['EPOCH']}"
if key not in unique or item.get("CREATION_DATE", "") > unique[key].get("CREATION_DATE", ""):
unique[key] = item
sorted_data = sorted(unique.values(), key=lambda x: (x["EPOCH"], x["NORAD_CAT_ID"]))
with open(filepath, "w") as f:
json.dump({"last_api_hit": datetime.now().isoformat(), "data": sorted_data}, f, indent=4)
# ------------------------------------------------------------------
# Download methods
# ------------------------------------------------------------------
[docs]
def get_tles_for_date_range(
self,
norad_id: int | str,
start_date: str,
end_date: str,
override_last_api_hit: bool = False,
) -> list[dict] | None:
"""
Retrieves TLEs for a single satellite over a date range.
Respects a 1.5-hour API cooldown to avoid hammering the server when
data is absent (e.g. the satellite had not yet launched).
Parameters
----------
norad_id : int | str
NORAD catalogue ID.
start_date : str
Start of range, ``"YYYY-MM-DD"``.
end_date : str
End of range, ``"YYYY-MM-DD"``.
override_last_api_hit : bool
If ``True``, bypass the cooldown and force an API call.
Returns
-------
list[dict] | None
OMM records within the requested window.
"""
norad_id = str(norad_id)
filepath = os.path.join(self.tle_data_folder, f"tle_{norad_id}.json")
req_start = datetime.strptime(start_date, "%Y-%m-%d")
req_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1, microseconds=-1)
local_data: list[dict] = []
last_hit: datetime | None = None
if os.path.exists(filepath):
with open(filepath, "r") as f:
content = json.load(f)
if isinstance(content, dict):
local_data = content.get("data", [])
last_hit = datetime.fromisoformat(content["last_api_hit"]) if "last_api_hit" in content else None
else:
local_data = content # legacy list format
needs_fetch = True
cooldown = timedelta(hours=1.5)
if local_data:
local_min = datetime.fromisoformat(local_data[0]["EPOCH"])
local_max = datetime.fromisoformat(local_data[-1]["EPOCH"])
if req_start >= local_min and req_end <= local_max:
print("Local cache fully covers request. Skipping API.")
needs_fetch = False
if needs_fetch and last_hit:
time_since = datetime.now() - last_hit
if not override_last_api_hit and time_since < cooldown:
print(f"Cache is recent ({time_since} ago). Skipping API despite gaps.")
needs_fetch = False
if needs_fetch:
print(f"Fetching {norad_id} from API ({start_date} -- {end_date})...")
url = self._build_url(
f"basicspacedata/query/class/gp_history/NORAD_CAT_ID/{norad_id}"
f"/EPOCH/{start_date}--{end_date}/orderby/EPOCH asc/format/json"
)
fetched = self._fetch_json(url)
self._save_unique_sorted(filepath, fetched or [])
with open(filepath, "r") as f:
local_data = json.load(f).get("data", [])
return [
omm for omm in local_data
if req_start <= datetime.fromisoformat(omm["EPOCH"]) <= req_end
]
[docs]
def latest_on_orbit(
self, update_existing: bool = False, filename: str | None = None
) -> list | None:
"""
Retrieves the newest propagable element set for all on-orbit payloads.
Parameters
----------
update_existing : bool
``True`` → merge into existing file. ``False`` → overwrite.
filename : str | None
Optional filename override. Defaults to ``'latest_on_orbit.json'``.
Returns
-------
list | None
List of OMM records or None if the request failed.
"""
url = self._build_url(
"basicspacedata/query/class/gp/OBJECT_TYPE/PAYLOAD"
"/decay_date/null-val/epoch/>now-30/orderby/norad_cat_id/format/json"
)
return self._get_json_and_save(url, filename or "latest_on_orbit.json", merge=update_existing)
[docs]
def descending_epoch(
self,
N: int | None = None,
update_existing: bool = False,
filename: str | None = None,
) -> list | None:
"""
Retrieves GP data ordered by epoch (most recent first).
Parameters
----------
N : int | None
Optional result limit.
update_existing : bool
``True`` → merge. ``False`` → overwrite.
filename : str | None
Optional filename override.
Returns
-------
list | None
List of OMM records or None if the request failed.
"""
if filename:
json_name = filename
elif N is not None:
json_name = f"gp_descending_limit_{N}.json"
else:
json_name = "gp_descending.json"
parts = ["basicspacedata/query/class/gp/OBJECT_TYPE/PAYLOAD/orderby/epoch desc"]
if N is not None:
parts.append(f"limit/{N}")
parts.append("format/json")
return self._get_json_and_save(self._build_url("/".join(parts)), json_name, merge=update_existing)
[docs]
def get_tles_by_norad_ids(
self,
norad_ids: int | list[int],
history: bool = False,
orderby: str = "epoch desc",
limit_per_object: int = 1,
update_existing: bool = False,
filename: str | None = None,
) -> list | None:
"""
Retrieves TLEs for one or more specific NORAD IDs.
Parameters
----------
norad_ids : int | list[int]
One or more NORAD IDs.
history : bool
If ``True``, queries ``gp_history`` for historical records.
orderby : str
API ordering string.
limit_per_object : int
Maximum records per object.
update_existing : bool
``True`` → merge. ``False`` → overwrite.
filename : str | None
Force a specific cache filename.
Returns
-------
list | None
List of OMM records or None if the request failed.
"""
if not isinstance(norad_ids, (list, tuple, set)):
norad_ids = [norad_ids]
id_string = ",".join(map(str, norad_ids))
tle_class = "gp"
use_orderby = True
if history or limit_per_object > 1:
tle_class = "gp_history"
if len(norad_ids) > 1:
print("Note: Removing 'orderby' for batch history query.")
use_orderby = False
if filename:
json_name = filename
elif len(norad_ids) == 1:
json_name = f"{tle_class}_{norad_ids[0]}_limit_{limit_per_object}.json"
else:
json_name = f"{tle_class}_batch_{len(norad_ids)}_ids_limit_{limit_per_object}.json"
parts = ["basicspacedata/query", f"class/{tle_class}", f"NORAD_CAT_ID/{id_string}"]
if use_orderby:
parts.append(f"orderby/{orderby}")
parts.extend([f"limit/{limit_per_object}", "format/json"])
json_data = self._get_json_and_save(self._build_url("/".join(parts)), json_name, merge=update_existing)
return json_data
[docs]
def filtered_by_oe_dict(
self,
filter_oe_dict: dict[str, tuple[float | None, float | None]],
limit: int = 100,
output_file: str = "filtered_results.json",
update_existing: bool = False,
) -> list | None:
"""
Retrieves payloads filtered by orbital element bounds.
Parameters
----------
filter_oe_dict : dict
Mapping of orbital element names to ``(min, max)`` bound tuples.
Either bound may be ``None`` for a one-sided filter.
limit : int
Maximum number of results.
output_file : str
Cache filename.
update_existing : bool
``True`` → merge. ``False`` → overwrite.
Returns
-------
list | None
List of OMM records or None if the request failed.
"""
parts = ["basicspacedata/query/class/gp/OBJECT_TYPE/PAYLOAD"]
for oe, bounds in filter_oe_dict.items():
min_val = bounds[0] if len(bounds) > 0 else None
max_val = bounds[1] if len(bounds) > 1 else None
if min_val is not None and max_val is not None:
parts.extend([oe.upper(), f"{min_val}--{max_val}"])
elif min_val is not None:
parts.extend([oe.upper(), f">{min_val}"])
elif max_val is not None:
parts.extend([oe.upper(), f"<{max_val}"])
parts.append(f"orderby/epoch desc/limit/{limit}/format/json")
return self._get_json_and_save(self._build_url("/".join(parts)), output_file, merge=update_existing)
[docs]
def query_from_query_builder_url(
self,
query: str,
output_file: str = "custom_query.json",
update_existing: bool = False,
) -> list | None:
"""
Executes a user-provided Space-Track query URL or query path.
The input may be:
- a full URL copied from the Space-Track 'Query Builder'
- only the query path starting from 'basicspacedata/query/...'
Parameters
----------
query : str
Full Space-Track API URL or query path.
output_file : str
Local cache filename.
update_existing : bool
True → merge with existing cache. False → overwrite.
Returns
-------
list | None
JSON records returned by the API.
"""
query = query.strip()
if query.startswith("http"): # Full URL provided
parsed = urlparse(query)
if "space-track.org" not in parsed.netloc:
raise ValueError("Only space-track.org URLs are allowed.")
# Extract only the path part
path = parsed.path.lstrip("/")
else:
path = query.lstrip("/") # Only path provided
# Ensure correct prefix
if not path.startswith("basicspacedata/query"):
raise ValueError(
"Query must start with 'basicspacedata/query/...'"
)
# Ensure JSON format
if "format/json" not in path:
if not path.endswith("/"):
path += "/"
path += "format/json"
url = self._build_url(path)
return self._get_json_and_save(
url,
output_file,
merge=update_existing
)
[docs]
class OMMUtils:
"""
Stateless utility class for working with OMM (Orbit Mean-Elements Message)
records in dictionary form.
This class has no dependency on Space-Track.org or any particular data
source — it operates on plain OMM dicts regardless of where they came from.
All methods are static; instantiation is not required.
"""
[docs]
@staticmethod
def save_batch_to_individual_files(
json_data: list[dict],
output_folder: str,
) -> list[str] | None:
"""
Splits a batch list of OMM records into one JSON file per NORAD ID.
Parameters
----------
json_data : list[dict]
Batch OMM records, each containing a ``NORAD_CAT_ID`` field.
output_folder : str
Directory in which to write the per-satellite files.
Returns
-------
list[str] | None
List of created filenames (basenames only), or None if no data.
"""
if not json_data:
print("No data to split.")
return None
grouped: dict[str, list] = defaultdict(list)
for record in json_data:
grouped[record["NORAD_CAT_ID"]].append(record)
saved_files: list[str] = []
for norad_id, records in grouped.items():
filename = f"{norad_id}_from_batch.json"
filepath = os.path.join(output_folder, filename)
with open(filepath, "w") as f:
json.dump(records, f, indent=4)
saved_files.append(filename)
print(f"Split batch data into {len(saved_files)} individual files in {output_folder}.")
return saved_files
[docs]
@staticmethod
def get_tles(json_data: list[dict] | dict) -> defaultdict[str, list[tuple[str, str]]]:
"""
Extracts TLE line pairs from a list of OMM records.
Parameters
----------
json_data : list[dict] | dict
One or more OMM records containing ``TLE_LINE1`` and ``TLE_LINE2``.
Returns
-------
dict[str, tuple[str, str]]
Mapping of NORAD ID (str) → (TLE line 1, TLE line 2).
"""
if isinstance(json_data, dict):
json_data = [json_data]
final_dict = defaultdict(list)
for entry in json_data:
final_dict[entry["NORAD_CAT_ID"]].append(
(entry["TLE_LINE1"], entry["TLE_LINE2"])
)
return final_dict
[docs]
@staticmethod
def get_tudat_keplerian_element_set(
omm: dict,
) -> tuple[float, float, float, float, float, float]:
"""
Extracts and converts Keplerian elements from a single OMM record into
SI units suitable for Tudat.
Parameters
----------
omm : dict
A single OMM record (km and degrees, as returned by Space-Track or
any compliant source).
Returns
-------
tuple[float, float, float, float, float, float]
``(a, e, i, omega, RAAN, true_anomaly)`` where distances are in
metres and angles are in radians.
Raises
------
TypeError
If ``omm`` is a list rather than a single dict.
ValueError
If ``omm`` is empty or falsy.
"""
if not omm:
raise ValueError("No OMM record provided.")
if isinstance(omm, list):
raise TypeError(
"omm must be a single dictionary, not a list. "
"Pass one record at a time."
)
a = float(omm["SEMIMAJOR_AXIS"]) * 1e3
e = float(omm["ECCENTRICITY"])
i = float(omm["INCLINATION"]) * math.pi / 180
omega = float(omm["ARG_OF_PERICENTER"]) * math.pi / 180
raan = float(omm["RA_OF_ASC_NODE"]) * math.pi / 180
mo = float(omm["MEAN_ANOMALY"]) * math.pi / 180
return a, e, i, omega, raan, mean_to_true_anomaly(e, mo)
[docs]
@staticmethod
def tle_to_TleEphemeris_object(
tle_line_1: str, tle_line_2: str
) -> environment.Tle:
"""
Converts a TLE line pair into a Tudat ``TleEphemeris`` object.
Parameters
----------
tle_line_1 : str
First line of the TLE.
tle_line_2 : str
Second line of the TLE.
Returns
-------
environment.Tle
Configured TleEphemeris object.
"""
return environment.TleEphemeris(
"Earth", "J2000", environment.Tle(tle_line_1, tle_line_2), False
)
[docs]
@staticmethod
def tle_to_Tle_object(
tle_line_1: str, tle_line_2: str
) -> environment.TleEphemeris:
"""
Converts a TLE line pair into a Tudat ``TleEphemeris`` object.
Parameters
----------
tle_line_1 : str
First line of the TLE.
tle_line_2 : str
Second line of the TLE.
Returns
-------
environment.Tle
Tle object.
"""
return environment.Tle(tle_line_1, tle_line_2)
[docs]
@staticmethod
def clean_file(filepath: str) -> None:
"""
Removes duplicate TLE entries from a cache file in-place.
Supports both legacy (plain list) and current (dict with metadata)
formats. When two records share the same EPOCH, the one with the later
``CREATION_DATE`` is kept.
Parameters
----------
filepath : str
Absolute path to the cache file.
Returns
-------
None
"""
if not os.path.exists(filepath):
print(f"File not found: {filepath}")
return
with open(filepath, "r") as f:
content = json.load(f)
if isinstance(content, list):
data, metadata, is_dict = content, {}, False
elif isinstance(content, dict) and "data" in content:
data = content["data"]
metadata = {k: v for k, v in content.items() if k != "data"}
is_dict = True
else:
print(f"Skipping {filepath}: unknown format.")
return
initial_count = len(data)
unique: dict[str, dict] = {}
for entry in data:
epoch = entry.get("EPOCH")
norad_id = entry.get("NORAD_CAT_ID")
if not epoch or not norad_id:
continue
composite_key = f"{norad_id}_{epoch}"
if composite_key not in unique or entry.get("CREATION_DATE", "") > unique[composite_key].get("CREATION_DATE", ""):
unique[composite_key] = entry
cleaned = sorted(unique.values(), key=lambda x: (x["EPOCH"], x["NORAD_CAT_ID"]))
output: dict | list = ({**metadata, "data": cleaned} if is_dict else cleaned)
with open(filepath, "w") as f:
json.dump(output, f, indent=4)
print(f"Cleaned {filepath}. Removed {initial_count - len(cleaned)} duplicates.")