#!/usr/bin/env python3
"""
Extract first valid GPS fix from a GoPro GPMF binary stream.

Two usage modes:
    python3 extract_gpmf_gps.py <gpmf.bin>   -- parse pre-extracted binary
    python3 extract_gpmf_gps.py <video.mp4>  -- extract GPMF stream via ffmpeg, then parse

Output (stdout): <lat> <lon> <alt>
    lat/lon in decimal degrees (negative = S/W)
    alt in metres above sea level

Exit code: 0 if a valid fix was found, 1 otherwise.
"""

import json
import os
import struct
import subprocess
import sys
import tempfile
from typing import Iterator, List, Optional, Tuple

# KLV header: 4-byte key, 1-byte type, 1-byte element size, big-endian
# 16-bit repeat count.
_KLV_HEADER = struct.Struct(">4sBBH")
_NULL_KEY = b"\x00\x00\x00\x00"

# GPMF numeric type character -> struct format code (all big-endian).
_SCAL_FORMATS = {
    "b": "b",
    "B": "B",
    "s": "h",
    "S": "H",
    "l": "i",
    "L": "I",
    "j": "q",
    "J": "Q",
    "f": "f",
    "d": "d",
}

# Only the first three int32 fields of a GPS5 sample (lat, lon, alt) are used.
_GPS5_LAT_LON_ALT = struct.Struct(">iii")
_GPS5_SAMPLE_SIZE = 20

_MIN_GPSF = 2      # 2 = 2D fix, 3 = 3D fix
_MAX_GPSP = 500    # dilution of precision * 100
_MIN_ABS_COORD = 0.001

# File extensions that are treated as containers needing ffmpeg extraction.
_VIDEO_EXTENSIONS = (".mp4", ".lrv", ".mov")

Fix = Tuple[float, float, float]


def _iter_klv(data: bytes, offset: int = 0) -> Iterator[Tuple[bytes, str, int, int, bytes]]:
    """Yield (key, type_char, size, repeat, payload) for each KLV record.

    Iteration stops at the end of ``data``, when fewer than 8 header bytes
    remain, or at an all-zero key. The payload is ``size * repeat`` bytes
    (shorter if ``data`` is truncated); records are advanced over using the
    payload length rounded up to a multiple of 4.
    """
    end = len(data)
    header_size = _KLV_HEADER.size
    while offset + header_size <= end:
        key, type_code, size, repeat = _KLV_HEADER.unpack_from(data, offset)
        if key == _NULL_KEY:
            break
        total = size * repeat
        start = offset + header_size
        yield key, chr(type_code), size, repeat, data[start:start + total]
        # Payloads are padded to a 32-bit boundary.
        offset = start + ((total + 3) & ~3)


def _parse_scal(type_char: str, payload: bytes) -> List[float]:
    """Return list of scale factors from a SCAL record.

    Numeric GPMF types listed in ``_SCAL_FORMATS`` are decoded; any other
    type yields ``[1]`` (no scaling). Trailing bytes that do not form a whole
    element are ignored rather than raising ``struct.error``.
    """
    code = _SCAL_FORMATS.get(type_char)
    if code is None:
        return [1]
    width = struct.calcsize(">" + code)
    count = len(payload) // width
    return list(struct.unpack_from(f">{count}{code}", payload))


def _first_gps5_fix(strm_payload: bytes) -> Optional[Fix]:
    """
    Scan a STRM payload for a valid GPS5 fix.

    Returns (lat, lon, alt) or None.
    Valid = GPSF >= 2 (2D or 3D fix), GPSP <= 500 (DOP*100), non-zero coords
    that lie within +/-90 (lat) and +/-180 (lon) degrees.

    SCAL, GPSF and GPSP must appear before the GPS5 record they apply to;
    a GPS5 record seen before them is skipped.
    """
    scal = None
    gpsf = 0
    gpsp = 9999
    for key, tc, size, repeat, payload in _iter_klv(strm_payload):
        if key == b"SCAL":
            scal = _parse_scal(tc, payload)
        elif key == b"GPSF":
            # A short payload leaves the previous value untouched.
            if len(payload) >= 4:
                gpsf = struct.unpack_from(">I", payload)[0]
        elif key == b"GPSP":
            if len(payload) >= 2:
                gpsp = struct.unpack_from(">H", payload)[0]
        elif key == b"GPS5":
            if (scal is None or gpsf < _MIN_GPSF or gpsp > _MAX_GPSP
                    or size < _GPS5_SAMPLE_SIZE):
                continue
            s_lat = scal[0] if len(scal) > 0 else 1
            s_lon = scal[1] if len(scal) > 1 else s_lat
            s_alt = scal[2] if len(scal) > 2 else 1
            if not (s_lat and s_lon and s_alt):
                # A zero divisor means the SCAL record is unusable; skip the
                # samples instead of raising ZeroDivisionError.
                continue
            for off in range(0, repeat * size, size):
                if off + _GPS5_LAT_LON_ALT.size > len(payload):
                    break  # truncated record
                lat_r, lon_r, alt_r = _GPS5_LAT_LON_ALT.unpack_from(payload, off)
                lat = lat_r / s_lat
                lon = lon_r / s_lon
                alt = alt_r / s_alt
                # NOTE(review): the near-zero test requires BOTH coordinates
                # to be away from zero, so genuine fixes on the equator or
                # prime meridian are rejected too -- confirm this is intended.
                if (_MIN_ABS_COORD < abs(lat) <= 90
                        and _MIN_ABS_COORD < abs(lon) <= 180):
                    return lat, lon, alt
    return None


def parse_gpmf(data: bytes) -> Optional[Fix]:
    """
    Walk all DEVC blocks in a raw GPMF binary and return the first valid GPS fix.

    Returns (lat, lon, alt) or None.
    """
    for key, _tc, _size, _repeat, devc_payload in _iter_klv(data):
        if key != b"DEVC":
            continue
        for skey, _stc, _ssize, _srepeat, strm_payload in _iter_klv(devc_payload):
            if skey != b"STRM":
                continue
            result = _first_gps5_fix(strm_payload)
            if result is not None:
                return result
    return None


def extract_gpmf_stream(mp4_path: str) -> Optional[bytes]:
    """
    Use ffprobe to find the GPMF stream index, then extract it with ffmpeg.

    The stream is located by its ``codec_tag_string`` of ``"gpmd"`` and copied
    to a temporary file, which is always removed afterwards.

    Returns the binary data or None (tool missing, timeout, no such stream,
    unparsable ffprobe output, or ffmpeg failure).
    """
    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "quiet", "-show_streams",
             "-print_format", "json", mp4_path],
            capture_output=True, timeout=30
        )
        streams = json.loads(probe.stdout).get("streams", [])
        gpmd_idx = next(
            (s["index"] for s in streams
             if s.get("codec_tag_string") == "gpmd"),
            None
        )
    except (OSError, subprocess.SubprocessError, ValueError,
            KeyError, AttributeError, TypeError):
        # ffprobe unavailable / timed out, or its output was not the
        # expected JSON shape.
        return None
    if gpmd_idx is None:
        return None

    # Reserve a path, then close the handle so ffmpeg can write to it.
    with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp:
        tmp_path = tmp.name

    try:
        subprocess.run(
            ["ffmpeg", "-y", "-v", "quiet", "-i", mp4_path,
             "-map", f"0:{gpmd_idx}", "-c", "copy", "-f", "rawvideo", tmp_path],
            check=True, timeout=60
        )
        with open(tmp_path, "rb") as f:
            return f.read()
    except (OSError, subprocess.SubprocessError):
        return None
    finally:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass


def main() -> None:
    """Command-line entry point: print ``lat lon alt`` or exit with status 1."""
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <gpmf.bin | video.mp4>", file=sys.stderr)
        sys.exit(1)

    path = sys.argv[1]
    ext = os.path.splitext(path)[1].lower()

    if ext in _VIDEO_EXTENSIONS:
        data = extract_gpmf_stream(path)
    else:
        try:
            with open(path, "rb") as f:
                data = f.read()
        except OSError as e:
            print(f"Error reading {path}: {e}", file=sys.stderr)
            sys.exit(1)

    if not data:
        print("No GPMF data found", file=sys.stderr)
        sys.exit(1)

    result = parse_gpmf(data)
    if result is None:
        print("No valid GPS fix found", file=sys.stderr)
        sys.exit(1)

    lat, lon, alt = result
    print(f"{lat:.7f} {lon:.7f} {alt:.3f}")


if __name__ == "__main__":
    main()