1 contributor
#!/usr/bin/env python3
"""
Extract first valid GPS fix from a GoPro GPMF binary stream.
Two usage modes:
python3 extract_gpmf_gps.py <gpmf.bin> -- parse pre-extracted binary
python3 extract_gpmf_gps.py <file.mp4> -- extract GPMF stream via ffmpeg, then parse
Output (stdout): lat lon alt
lat/lon in decimal degrees (negative = S/W)
alt in metres above sea level
Exit code: 0 if a valid fix was found, 1 otherwise.
"""
import struct
import sys
import os
import subprocess
import tempfile
def _iter_klv(data, offset=0):
"""Yield (key, type_char, size, repeat, payload) for each KLV record."""
end = len(data)
while offset + 8 <= end:
key = data[offset:offset + 4]
if all(b == 0 for b in key):
break
type_char = chr(data[offset + 4])
size = data[offset + 5]
repeat = struct.unpack(">H", data[offset + 6:offset + 8])[0]
total = size * repeat
payload = data[offset + 8:offset + 8 + total]
yield key, type_char, size, repeat, payload
padded = (total + 3) & ~3
offset += 8 + padded
def _parse_scal(type_char, payload):
"""Return list of scale factors from a SCAL record."""
if type_char in ("s",):
return [struct.unpack(">h", payload[i:i + 2])[0] for i in range(0, len(payload), 2)]
if type_char in ("S",):
return [struct.unpack(">H", payload[i:i + 2])[0] for i in range(0, len(payload), 2)]
if type_char in ("l",):
return [struct.unpack(">i", payload[i:i + 4])[0] for i in range(0, len(payload), 4)]
if type_char in ("L",):
return [struct.unpack(">I", payload[i:i + 4])[0] for i in range(0, len(payload), 4)]
return [1]
def _first_gps5_fix(strm_payload):
"""
Scan a STRM payload for a valid GPS5 fix.
Returns (lat, lon, alt) or None.
Valid = GPSF >= 2 (2D or 3D fix), GPSP <= 500 (DOP*100), non-zero coords.
"""
scal = None
gpsf = 0
gpsp = 9999
for key, tc, size, repeat, payload in _iter_klv(strm_payload):
if key == b"SCAL":
scal = _parse_scal(tc, payload)
elif key == b"GPSF":
try:
gpsf = struct.unpack(">I", payload[:4])[0]
except struct.error:
pass
elif key == b"GPSP":
try:
gpsp = struct.unpack(">H", payload[:2])[0]
except struct.error:
pass
elif key == b"GPS5":
if scal is None or gpsf < 2 or gpsp > 500 or size < 20:
continue
s_lat = scal[0] if len(scal) > 0 else 1
s_lon = scal[1] if len(scal) > 1 else s_lat
s_alt = scal[2] if len(scal) > 2 else 1
for i in range(repeat):
off = i * size
try:
lat_r, lon_r, alt_r = struct.unpack(">iii", payload[off:off + 12])
except struct.error:
break
lat = lat_r / s_lat
lon = lon_r / s_lon
alt = alt_r / s_alt
if abs(lat) > 0.001 and abs(lon) > 0.001:
return lat, lon, alt
return None
def parse_gpmf(data):
"""
Walk all DEVC blocks in a raw GPMF binary and return the first valid GPS fix.
Returns (lat, lon, alt) or None.
"""
for key, tc, size, repeat, devc_payload in _iter_klv(data):
if key != b"DEVC":
continue
for skey, stc, ssize, srepeat, strm_payload in _iter_klv(devc_payload):
if skey != b"STRM":
continue
result = _first_gps5_fix(strm_payload)
if result:
return result
return None
def extract_gpmf_stream(mp4_path):
"""
Use ffprobe to find the GPMF stream index, then extract it with ffmpeg.
Returns the binary data or None.
"""
try:
probe = subprocess.run(
["ffprobe", "-v", "quiet", "-show_streams", "-print_format", "json", mp4_path],
capture_output=True, timeout=30
)
import json
streams = json.loads(probe.stdout).get("streams", [])
gpmd_idx = next(
(s["index"] for s in streams if s.get("codec_tag_string") == "gpmd"),
None
)
if gpmd_idx is None:
return None
except Exception:
return None
with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp:
tmp_path = tmp.name
try:
subprocess.run(
["ffmpeg", "-y", "-v", "quiet", "-i", mp4_path,
"-map", f"0:{gpmd_idx}", "-c", "copy", "-f", "rawvideo", tmp_path],
check=True, timeout=60
)
with open(tmp_path, "rb") as f:
return f.read()
except Exception:
return None
finally:
try:
os.unlink(tmp_path)
except OSError:
pass
def main():
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <file.bin|file.mp4>", file=sys.stderr)
sys.exit(1)
path = sys.argv[1]
ext = os.path.splitext(path)[1].lower()
if ext in (".mp4", ".lrv", ".mov"):
data = extract_gpmf_stream(path)
else:
try:
with open(path, "rb") as f:
data = f.read()
except OSError as e:
print(f"Error reading {path}: {e}", file=sys.stderr)
sys.exit(1)
if not data:
print("No GPMF data found", file=sys.stderr)
sys.exit(1)
result = parse_gpmf(data)
if result is None:
print("No valid GPS fix found", file=sys.stderr)
sys.exit(1)
lat, lon, alt = result
print(f"{lat:.7f} {lon:.7f} {alt:.3f}")
if __name__ == "__main__":
main()