|
Bogdan Timofte
authored
a week ago
|
1
|
#!/usr/bin/env python3
|
|
|
2
|
"""
|
|
|
3
|
Extract first valid GPS fix from a GoPro GPMF binary stream.
|
|
|
4
|
|
|
|
5
|
Two usage modes:
|
|
|
6
|
python3 extract_gpmf_gps.py <gpmf.bin> -- parse pre-extracted binary
|
|
|
7
|
python3 extract_gpmf_gps.py <file.mp4> -- extract GPMF stream via ffmpeg, then parse
|
|
|
8
|
|
|
|
9
|
Output (stdout): lat lon alt
|
|
|
10
|
lat/lon in decimal degrees (negative = S/W)
|
|
|
11
|
alt in metres above sea level
|
|
|
12
|
|
|
|
13
|
Exit code: 0 if a valid fix was found, 1 otherwise.
|
|
|
14
|
"""
|
|
|
15
|
|
|
|
16
|
import struct
|
|
|
17
|
import sys
|
|
|
18
|
import os
|
|
|
19
|
import subprocess
|
|
|
20
|
import tempfile
|
|
|
21
|
|
|
|
22
|
|
|
|
23
|
def _iter_klv(data, offset=0):
|
|
|
24
|
"""Yield (key, type_char, size, repeat, payload) for each KLV record."""
|
|
|
25
|
end = len(data)
|
|
|
26
|
while offset + 8 <= end:
|
|
|
27
|
key = data[offset:offset + 4]
|
|
|
28
|
if all(b == 0 for b in key):
|
|
|
29
|
break
|
|
|
30
|
type_char = chr(data[offset + 4])
|
|
|
31
|
size = data[offset + 5]
|
|
|
32
|
repeat = struct.unpack(">H", data[offset + 6:offset + 8])[0]
|
|
|
33
|
total = size * repeat
|
|
|
34
|
payload = data[offset + 8:offset + 8 + total]
|
|
|
35
|
yield key, type_char, size, repeat, payload
|
|
|
36
|
padded = (total + 3) & ~3
|
|
|
37
|
offset += 8 + padded
|
|
|
38
|
|
|
|
39
|
|
|
|
40
|
def _parse_scal(type_char, payload):
|
|
|
41
|
"""Return list of scale factors from a SCAL record."""
|
|
|
42
|
if type_char in ("s",):
|
|
|
43
|
return [struct.unpack(">h", payload[i:i + 2])[0] for i in range(0, len(payload), 2)]
|
|
|
44
|
if type_char in ("S",):
|
|
|
45
|
return [struct.unpack(">H", payload[i:i + 2])[0] for i in range(0, len(payload), 2)]
|
|
|
46
|
if type_char in ("l",):
|
|
|
47
|
return [struct.unpack(">i", payload[i:i + 4])[0] for i in range(0, len(payload), 4)]
|
|
|
48
|
if type_char in ("L",):
|
|
|
49
|
return [struct.unpack(">I", payload[i:i + 4])[0] for i in range(0, len(payload), 4)]
|
|
|
50
|
return [1]
|
|
|
51
|
|
|
|
52
|
|
|
|
53
|
def _first_gps5_fix(strm_payload):
|
|
|
54
|
"""
|
|
|
55
|
Scan a STRM payload for a valid GPS5 fix.
|
|
|
56
|
Returns (lat, lon, alt) or None.
|
|
|
57
|
Valid = GPSF >= 2 (2D or 3D fix), GPSP <= 500 (DOP*100), non-zero coords.
|
|
|
58
|
"""
|
|
|
59
|
scal = None
|
|
|
60
|
gpsf = 0
|
|
|
61
|
gpsp = 9999
|
|
|
62
|
|
|
|
63
|
for key, tc, size, repeat, payload in _iter_klv(strm_payload):
|
|
|
64
|
if key == b"SCAL":
|
|
|
65
|
scal = _parse_scal(tc, payload)
|
|
|
66
|
elif key == b"GPSF":
|
|
|
67
|
try:
|
|
|
68
|
gpsf = struct.unpack(">I", payload[:4])[0]
|
|
|
69
|
except struct.error:
|
|
|
70
|
pass
|
|
|
71
|
elif key == b"GPSP":
|
|
|
72
|
try:
|
|
|
73
|
gpsp = struct.unpack(">H", payload[:2])[0]
|
|
|
74
|
except struct.error:
|
|
|
75
|
pass
|
|
|
76
|
elif key == b"GPS5":
|
|
|
77
|
if scal is None or gpsf < 2 or gpsp > 500 or size < 20:
|
|
|
78
|
continue
|
|
|
79
|
s_lat = scal[0] if len(scal) > 0 else 1
|
|
|
80
|
s_lon = scal[1] if len(scal) > 1 else s_lat
|
|
|
81
|
s_alt = scal[2] if len(scal) > 2 else 1
|
|
|
82
|
for i in range(repeat):
|
|
|
83
|
off = i * size
|
|
|
84
|
try:
|
|
|
85
|
lat_r, lon_r, alt_r = struct.unpack(">iii", payload[off:off + 12])
|
|
|
86
|
except struct.error:
|
|
|
87
|
break
|
|
|
88
|
lat = lat_r / s_lat
|
|
|
89
|
lon = lon_r / s_lon
|
|
|
90
|
alt = alt_r / s_alt
|
|
|
91
|
if abs(lat) > 0.001 and abs(lon) > 0.001:
|
|
|
92
|
return lat, lon, alt
|
|
|
93
|
return None
|
|
|
94
|
|
|
|
95
|
|
|
|
96
|
def parse_gpmf(data):
|
|
|
97
|
"""
|
|
|
98
|
Walk all DEVC blocks in a raw GPMF binary and return the first valid GPS fix.
|
|
|
99
|
Returns (lat, lon, alt) or None.
|
|
|
100
|
"""
|
|
|
101
|
for key, tc, size, repeat, devc_payload in _iter_klv(data):
|
|
|
102
|
if key != b"DEVC":
|
|
|
103
|
continue
|
|
|
104
|
for skey, stc, ssize, srepeat, strm_payload in _iter_klv(devc_payload):
|
|
|
105
|
if skey != b"STRM":
|
|
|
106
|
continue
|
|
|
107
|
result = _first_gps5_fix(strm_payload)
|
|
|
108
|
if result:
|
|
|
109
|
return result
|
|
|
110
|
return None
|
|
|
111
|
|
|
|
112
|
|
|
|
113
|
def extract_gpmf_stream(mp4_path):
|
|
|
114
|
"""
|
|
|
115
|
Use ffprobe to find the GPMF stream index, then extract it with ffmpeg.
|
|
|
116
|
Returns the binary data or None.
|
|
|
117
|
"""
|
|
|
118
|
try:
|
|
|
119
|
probe = subprocess.run(
|
|
|
120
|
["ffprobe", "-v", "quiet", "-show_streams", "-print_format", "json", mp4_path],
|
|
|
121
|
capture_output=True, timeout=30
|
|
|
122
|
)
|
|
|
123
|
import json
|
|
|
124
|
streams = json.loads(probe.stdout).get("streams", [])
|
|
|
125
|
gpmd_idx = next(
|
|
|
126
|
(s["index"] for s in streams if s.get("codec_tag_string") == "gpmd"),
|
|
|
127
|
None
|
|
|
128
|
)
|
|
|
129
|
if gpmd_idx is None:
|
|
|
130
|
return None
|
|
|
131
|
except Exception:
|
|
|
132
|
return None
|
|
|
133
|
|
|
|
134
|
with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp:
|
|
|
135
|
tmp_path = tmp.name
|
|
|
136
|
|
|
|
137
|
try:
|
|
|
138
|
subprocess.run(
|
|
|
139
|
["ffmpeg", "-y", "-v", "quiet", "-i", mp4_path,
|
|
|
140
|
"-map", f"0:{gpmd_idx}", "-c", "copy", "-f", "rawvideo", tmp_path],
|
|
|
141
|
check=True, timeout=60
|
|
|
142
|
)
|
|
|
143
|
with open(tmp_path, "rb") as f:
|
|
|
144
|
return f.read()
|
|
|
145
|
except Exception:
|
|
|
146
|
return None
|
|
|
147
|
finally:
|
|
|
148
|
try:
|
|
|
149
|
os.unlink(tmp_path)
|
|
|
150
|
except OSError:
|
|
|
151
|
pass
|
|
|
152
|
|
|
|
153
|
|
|
|
154
|
def main():
|
|
|
155
|
if len(sys.argv) != 2:
|
|
|
156
|
print(f"Usage: {sys.argv[0]} <file.bin|file.mp4>", file=sys.stderr)
|
|
|
157
|
sys.exit(1)
|
|
|
158
|
|
|
|
159
|
path = sys.argv[1]
|
|
|
160
|
ext = os.path.splitext(path)[1].lower()
|
|
|
161
|
|
|
|
162
|
if ext in (".mp4", ".lrv", ".mov"):
|
|
|
163
|
data = extract_gpmf_stream(path)
|
|
|
164
|
else:
|
|
|
165
|
try:
|
|
|
166
|
with open(path, "rb") as f:
|
|
|
167
|
data = f.read()
|
|
|
168
|
except OSError as e:
|
|
|
169
|
print(f"Error reading {path}: {e}", file=sys.stderr)
|
|
|
170
|
sys.exit(1)
|
|
|
171
|
|
|
|
172
|
if not data:
|
|
|
173
|
print("No GPMF data found", file=sys.stderr)
|
|
|
174
|
sys.exit(1)
|
|
|
175
|
|
|
|
176
|
result = parse_gpmf(data)
|
|
|
177
|
if result is None:
|
|
|
178
|
print("No valid GPS fix found", file=sys.stderr)
|
|
|
179
|
sys.exit(1)
|
|
|
180
|
|
|
|
181
|
lat, lon, alt = result
|
|
|
182
|
print(f"{lat:.7f} {lon:.7f} {alt:.3f}")
|
|
|
183
|
|
|
|
184
|
|
|
|
185
|
if __name__ == "__main__":
|
|
|
186
|
main()
|