MediaImporter / extract_gpmf_gps.py
1 contributor
186 lines | 5.712kb
#!/usr/bin/env python3
"""
Extract first valid GPS fix from a GoPro GPMF binary stream.

Two usage modes:
  python3 extract_gpmf_gps.py <gpmf.bin>   -- parse pre-extracted binary
  python3 extract_gpmf_gps.py <file.mp4>   -- extract GPMF stream via ffmpeg, then parse
                                              (.lrv and .mov files are handled the same way)

Output (stdout):  lat lon alt
  lat/lon in decimal degrees (negative = S/W)
  alt in metres above sea level

Exit code: 0 if a valid fix was found, 1 otherwise.
"""

import struct
import sys
import os
import subprocess
import tempfile


def _iter_klv(data, offset=0):
    """Yield (key, type_char, size, repeat, payload) for each KLV record."""
    end = len(data)
    while offset + 8 <= end:
        key = data[offset:offset + 4]
        if all(b == 0 for b in key):
            break
        type_char = chr(data[offset + 4])
        size = data[offset + 5]
        repeat = struct.unpack(">H", data[offset + 6:offset + 8])[0]
        total = size * repeat
        payload = data[offset + 8:offset + 8 + total]
        yield key, type_char, size, repeat, payload
        padded = (total + 3) & ~3
        offset += 8 + padded


def _parse_scal(type_char, payload):
    """Return list of scale factors from a SCAL record."""
    if type_char in ("s",):
        return [struct.unpack(">h", payload[i:i + 2])[0] for i in range(0, len(payload), 2)]
    if type_char in ("S",):
        return [struct.unpack(">H", payload[i:i + 2])[0] for i in range(0, len(payload), 2)]
    if type_char in ("l",):
        return [struct.unpack(">i", payload[i:i + 4])[0] for i in range(0, len(payload), 4)]
    if type_char in ("L",):
        return [struct.unpack(">I", payload[i:i + 4])[0] for i in range(0, len(payload), 4)]
    return [1]


def _first_gps5_fix(strm_payload):
    """
    Scan a STRM payload for a valid GPS5 fix.

    Records are read in stream order; SCAL, GPSF and GPSP update the state
    that is applied to any GPS5 record that follows them.

    A GPS5 record is only considered when a SCAL record has been seen,
    GPSF >= 2 (2D or 3D fix), GPSP <= 500 (DOP*100), each sample is at
    least 20 bytes and none of the lat/lon/alt scale factors is zero.
    The first sample whose latitude and longitude both exceed 0.001 in
    magnitude is returned.

    Returns (lat, lon, alt) or None.
    """
    scal = None
    gpsf = 0
    gpsp = 9999

    for key, tc, size, repeat, payload in _iter_klv(strm_payload):
        if key == b"SCAL":
            # Tolerate a malformed SCAL the same way GPSF/GPSP are tolerated
            # below: keep the previous state rather than aborting the scan.
            try:
                scal = _parse_scal(tc, payload)
            except struct.error:
                pass
        elif key == b"GPSF":
            try:
                gpsf = struct.unpack(">I", payload[:4])[0]
            except struct.error:
                pass
        elif key == b"GPSP":
            try:
                gpsp = struct.unpack(">H", payload[:2])[0]
            except struct.error:
                pass
        elif key == b"GPS5":
            if scal is None or gpsf < 2 or gpsp > 500 or size < 20:
                continue
            if len(scal) == 1:
                # A lone SCAL value scales every component of the sample.
                s_lat = s_lon = s_alt = scal[0]
            else:
                s_lat = scal[0] if scal else 1
                s_lon = scal[1] if len(scal) > 1 else s_lat
                s_alt = scal[2] if len(scal) > 2 else 1
            if not (s_lat and s_lon and s_alt):
                # A zero divisor cannot be applied; skip instead of raising
                # ZeroDivisionError.
                continue
            for i in range(repeat):
                try:
                    # Only the first three int32 fields (lat, lon, alt) of
                    # each sample are needed.
                    lat_r, lon_r, alt_r = struct.unpack_from(">iii", payload, i * size)
                except struct.error:
                    # Payload ended early; no further samples are readable.
                    break
                lat = lat_r / s_lat
                lon = lon_r / s_lon
                alt = alt_r / s_alt
                if abs(lat) > 0.001 and abs(lon) > 0.001:
                    return lat, lon, alt
    return None


def parse_gpmf(data):
    """
    Search a raw GPMF binary for its first valid GPS fix.

    Top-level records other than DEVC are skipped; inside each DEVC payload
    every STRM record is handed to ``_first_gps5_fix`` in order, and the
    first result it produces is returned.

    Returns (lat, lon, alt) or None.
    """
    devc_payloads = (rec[4] for rec in _iter_klv(data) if rec[0] == b"DEVC")
    for devc in devc_payloads:
        for inner in _iter_klv(devc):
            if inner[0] == b"STRM":
                fix = _first_gps5_fix(inner[4])
                if fix:
                    return fix
    return None


def extract_gpmf_stream(mp4_path):
    """
    Use ffprobe to find the GPMF stream index, then extract it with ffmpeg.

    ffprobe lists the container's streams as JSON; the first stream whose
    ``codec_tag_string`` is "gpmd" is copied by ffmpeg into a temporary
    file, which is read back and then removed.

    Both child processes get ``/dev/null`` as stdin so they cannot consume
    input intended for the caller (e.g. when this script runs inside a
    shell ``while read`` loop), and ffmpeg's stdout is discarded so it can
    never mix with this script's own output.

    Args:
        mp4_path: path of the video file to inspect.

    Returns:
        The raw stream bytes, or None if there is no gpmd stream or if
        ffprobe/ffmpeg is missing, fails or times out.
    """
    import json  # only needed for the ffprobe report

    try:
        probe = subprocess.run(
            # "-i" keeps a path that starts with "-" from being parsed as an
            # option.
            ["ffprobe", "-v", "quiet", "-show_streams", "-print_format", "json",
             "-i", mp4_path],
            stdin=subprocess.DEVNULL, capture_output=True, timeout=30,
        )
        streams = json.loads(probe.stdout).get("streams", [])
        gpmd_idx = next(
            (s["index"] for s in streams if s.get("codec_tag_string") == "gpmd"),
            None,
        )
    except Exception:
        # Best effort: any probing failure simply means "no telemetry".
        return None
    if gpmd_idx is None:
        return None

    # Reserve a file name for ffmpeg to write to; it is deleted in `finally`.
    with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp:
        tmp_path = tmp.name

    try:
        subprocess.run(
            ["ffmpeg", "-y", "-v", "quiet", "-i", mp4_path,
             "-map", f"0:{gpmd_idx}", "-c", "copy", "-f", "rawvideo", tmp_path],
            stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
            check=True, timeout=60,
        )
        with open(tmp_path, "rb") as f:
            return f.read()
    except Exception:
        # Best effort: extraction failure is reported as "no data".
        return None
    finally:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass


def main():
    """
    Command-line entry point.

    Expects exactly one argument.  Files ending in .mp4, .lrv or .mov
    (case-insensitive) are passed through ``extract_gpmf_stream``; any other
    file is read as a raw GPMF binary.  On success prints "lat lon alt" to
    stdout; on any failure prints a message to stderr and exits with
    status 1.
    """
    def bail(message):
        # Report the problem on stderr and terminate with a failure status.
        print(message, file=sys.stderr)
        sys.exit(1)

    if len(sys.argv) != 2:
        bail(f"Usage: {sys.argv[0]} <file.bin|file.mp4>")

    path = sys.argv[1]
    _, suffix = os.path.splitext(path)

    if suffix.lower() in {".mp4", ".lrv", ".mov"}:
        data = extract_gpmf_stream(path)
    else:
        try:
            with open(path, "rb") as f:
                data = f.read()
        except OSError as e:
            bail(f"Error reading {path}: {e}")

    if not data:
        bail("No GPMF data found")

    result = parse_gpmf(data)
    if result is None:
        bail("No valid GPS fix found")

    lat, lon, alt = result
    print(f"{lat:.7f} {lon:.7f} {alt:.3f}")


# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()