MediaImporter / extract_gpmf_gps.py
Newer Older
186 lines | 5.712kb
Bogdan Timofte authored a week ago
1
#!/usr/bin/env python3
2
"""
3
Extract first valid GPS fix from a GoPro GPMF binary stream.
4

            
5
Two usage modes:
6
  python3 extract_gpmf_gps.py <gpmf.bin>   -- parse pre-extracted binary
7
  python3 extract_gpmf_gps.py <file.mp4>   -- extract GPMF stream via ffmpeg, then parse
8

            
9
Output (stdout):  lat lon alt
10
  lat/lon in decimal degrees (negative = S/W)
11
  alt in metres above sea level
12

            
13
Exit code: 0 if a valid fix was found, 1 otherwise.
14
"""
15

            
16
import struct
17
import sys
18
import os
19
import subprocess
20
import tempfile
21

            
22

            
23
def _iter_klv(data, offset=0):
24
    """Yield (key, type_char, size, repeat, payload) for each KLV record."""
25
    end = len(data)
26
    while offset + 8 <= end:
27
        key = data[offset:offset + 4]
28
        if all(b == 0 for b in key):
29
            break
30
        type_char = chr(data[offset + 4])
31
        size = data[offset + 5]
32
        repeat = struct.unpack(">H", data[offset + 6:offset + 8])[0]
33
        total = size * repeat
34
        payload = data[offset + 8:offset + 8 + total]
35
        yield key, type_char, size, repeat, payload
36
        padded = (total + 3) & ~3
37
        offset += 8 + padded
38

            
39

            
40
def _parse_scal(type_char, payload):
41
    """Return list of scale factors from a SCAL record."""
42
    if type_char in ("s",):
43
        return [struct.unpack(">h", payload[i:i + 2])[0] for i in range(0, len(payload), 2)]
44
    if type_char in ("S",):
45
        return [struct.unpack(">H", payload[i:i + 2])[0] for i in range(0, len(payload), 2)]
46
    if type_char in ("l",):
47
        return [struct.unpack(">i", payload[i:i + 4])[0] for i in range(0, len(payload), 4)]
48
    if type_char in ("L",):
49
        return [struct.unpack(">I", payload[i:i + 4])[0] for i in range(0, len(payload), 4)]
50
    return [1]
51

            
52

            
53
def _first_gps5_fix(strm_payload):
54
    """
55
    Scan a STRM payload for a valid GPS5 fix.
56
    Returns (lat, lon, alt) or None.
57
    Valid = GPSF >= 2 (2D or 3D fix), GPSP <= 500 (DOP*100), non-zero coords.
58
    """
59
    scal = None
60
    gpsf = 0
61
    gpsp = 9999
62

            
63
    for key, tc, size, repeat, payload in _iter_klv(strm_payload):
64
        if key == b"SCAL":
65
            scal = _parse_scal(tc, payload)
66
        elif key == b"GPSF":
67
            try:
68
                gpsf = struct.unpack(">I", payload[:4])[0]
69
            except struct.error:
70
                pass
71
        elif key == b"GPSP":
72
            try:
73
                gpsp = struct.unpack(">H", payload[:2])[0]
74
            except struct.error:
75
                pass
76
        elif key == b"GPS5":
77
            if scal is None or gpsf < 2 or gpsp > 500 or size < 20:
78
                continue
79
            s_lat = scal[0] if len(scal) > 0 else 1
80
            s_lon = scal[1] if len(scal) > 1 else s_lat
81
            s_alt = scal[2] if len(scal) > 2 else 1
82
            for i in range(repeat):
83
                off = i * size
84
                try:
85
                    lat_r, lon_r, alt_r = struct.unpack(">iii", payload[off:off + 12])
86
                except struct.error:
87
                    break
88
                lat = lat_r / s_lat
89
                lon = lon_r / s_lon
90
                alt = alt_r / s_alt
91
                if abs(lat) > 0.001 and abs(lon) > 0.001:
92
                    return lat, lon, alt
93
    return None
94

            
95

            
96
def parse_gpmf(data):
97
    """
98
    Walk all DEVC blocks in a raw GPMF binary and return the first valid GPS fix.
99
    Returns (lat, lon, alt) or None.
100
    """
101
    for key, tc, size, repeat, devc_payload in _iter_klv(data):
102
        if key != b"DEVC":
103
            continue
104
        for skey, stc, ssize, srepeat, strm_payload in _iter_klv(devc_payload):
105
            if skey != b"STRM":
106
                continue
107
            result = _first_gps5_fix(strm_payload)
108
            if result:
109
                return result
110
    return None
111

            
112

            
113
def extract_gpmf_stream(mp4_path):
114
    """
115
    Use ffprobe to find the GPMF stream index, then extract it with ffmpeg.
116
    Returns the binary data or None.
117
    """
118
    try:
119
        probe = subprocess.run(
120
            ["ffprobe", "-v", "quiet", "-show_streams", "-print_format", "json", mp4_path],
121
            capture_output=True, timeout=30
122
        )
123
        import json
124
        streams = json.loads(probe.stdout).get("streams", [])
125
        gpmd_idx = next(
126
            (s["index"] for s in streams if s.get("codec_tag_string") == "gpmd"),
127
            None
128
        )
129
        if gpmd_idx is None:
130
            return None
131
    except Exception:
132
        return None
133

            
134
    with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp:
135
        tmp_path = tmp.name
136

            
137
    try:
138
        subprocess.run(
139
            ["ffmpeg", "-y", "-v", "quiet", "-i", mp4_path,
140
             "-map", f"0:{gpmd_idx}", "-c", "copy", "-f", "rawvideo", tmp_path],
141
            check=True, timeout=60
142
        )
143
        with open(tmp_path, "rb") as f:
144
            return f.read()
145
    except Exception:
146
        return None
147
    finally:
148
        try:
149
            os.unlink(tmp_path)
150
        except OSError:
151
            pass
152

            
153

            
154
def main():
155
    if len(sys.argv) != 2:
156
        print(f"Usage: {sys.argv[0]} <file.bin|file.mp4>", file=sys.stderr)
157
        sys.exit(1)
158

            
159
    path = sys.argv[1]
160
    ext = os.path.splitext(path)[1].lower()
161

            
162
    if ext in (".mp4", ".lrv", ".mov"):
163
        data = extract_gpmf_stream(path)
164
    else:
165
        try:
166
            with open(path, "rb") as f:
167
                data = f.read()
168
        except OSError as e:
169
            print(f"Error reading {path}: {e}", file=sys.stderr)
170
            sys.exit(1)
171

            
172
    if not data:
173
        print("No GPMF data found", file=sys.stderr)
174
        sys.exit(1)
175

            
176
    result = parse_gpmf(data)
177
    if result is None:
178
        print("No valid GPS fix found", file=sys.stderr)
179
        sys.exit(1)
180

            
181
    lat, lon, alt = result
182
    print(f"{lat:.7f} {lon:.7f} {alt:.3f}")
183

            
184

            
185
if __name__ == "__main__":
186
    main()