1 contributor
#!/usr/bin/env perl
#
# mdns_host_seed.pl - Collect mDNS A records into the host source database.
#
use strict;
use warnings;
use Cwd qw(abs_path);
use File::Basename qw(dirname);
use File::Path qw(make_path);
use Getopt::Long qw(GetOptions);
use IO::Socket::INET;
use POSIX qw(strftime);
use Socket qw(IPPROTO_IP IP_ADD_MEMBERSHIP SO_REUSEADDR inet_aton inet_ntoa sockaddr_in);
use DBI;
my $script_dir = dirname(abs_path($0));
my $project_dir = dirname($script_dir);
my %opt = (
db => $ENV{HOST_MANAGER_DB} || "$project_dir/var/host-manager.sqlite",
worker_id => $ENV{HOST_MANAGER_MDNS_WORKER_ID} || 'mdns-listener',
bind => $ENV{HOST_MANAGER_MDNS_BIND} || '0.0.0.0',
group => $ENV{HOST_MANAGER_MDNS_GROUP} || '224.0.0.251',
port => $ENV{HOST_MANAGER_MDNS_PORT} || 5353,
once => 0,
timeout => 0,
dry_run => 0,
verbose => 0,
);
GetOptions(
'db=s' => \$opt{db},
'worker-id=s' => \$opt{worker_id},
'bind=s' => \$opt{bind},
'group=s' => \$opt{group},
'port=i' => \$opt{port},
'once' => \$opt{once},
'timeout=i' => \$opt{timeout},
'dry-run' => \$opt{dry_run},
'verbose' => \$opt{verbose},
'help|h' => sub { usage(); exit 0; },
) or do {
usage();
exit 2;
};
my $socket = IO::Socket::INET->new(
LocalAddr => $opt{bind},
LocalPort => $opt{port},
Proto => 'udp',
ReuseAddr => 1,
) or die "Cannot listen for mDNS on $opt{bind}:$opt{port}: $!\n";
setsockopt($socket, Socket::SOL_SOCKET(), SO_REUSEADDR, pack('i', 1));
setsockopt($socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, inet_aton($opt{group}) . inet_aton($opt{bind}))
or die "Cannot join mDNS multicast group $opt{group}: $!\n";
print "mdns host seed listening on udp://$opt{bind}:$opt{port} group $opt{group}\n" if $opt{verbose};
my $dbh = open_database(\%opt);
print "mDNS SQLite database: $opt{db}\n" if $opt{verbose};
print "mDNS worker id: $opt{worker_id}\n" if $opt{verbose};
my $deadline = $opt{timeout} ? time() + $opt{timeout} : 0;
while (1) {
my $timeout = undef;
if ($deadline) {
$timeout = $deadline - time();
last if $timeout <= 0;
}
my $rin = '';
vec($rin, fileno($socket), 1) = 1;
my $ready = select(my $rout = $rin, undef, undef, $timeout);
last if !defined($ready) || $ready <= 0;
my $peer = recv($socket, my $packet, 9000, 0);
next unless defined $peer && length $packet;
my ($peer_port, $peer_addr) = sockaddr_in($peer);
my $peer_ip = inet_ntoa($peer_addr);
my @records = grep { record_is_usable($_) } parse_mdns_packet($packet);
next unless @records;
my $changed = seed_observations($dbh, \%opt, \@records, $peer_ip);
print "stored " . scalar(@$changed) . " observation change(s)\n" if $opt{verbose} && @$changed;
last if $opt{once} && @$changed;
}
exit 0;
sub usage {
print <<"EOF";
Usage: perl scripts/mdns_host_seed.pl [options]
Options:
--db path SQLite database. Defaults to var/host-manager.sqlite.
--worker-id id data_workers.worker_id. Defaults to mdns-listener.
--bind addr Local bind address. Defaults to 0.0.0.0.
--group addr Multicast group. Defaults to 224.0.0.251.
--port n UDP port. Defaults to 5353.
--once Exit after the first stored observation.
--timeout n Exit after n seconds.
--dry-run Print proposed observation changes without writing.
--verbose Print listener and change details.
Only A records for private/link-local .local names are collected. Observations
are upserted into SQLite table mdns_observations. hosts.yaml is not modified.
EOF
}
sub parse_mdns_packet {
my ($packet) = @_;
return () if length($packet) < 12;
my ($id, $flags, $qd, $an, $ns, $ar) = unpack('n6', substr($packet, 0, 12));
my $offset = 12;
for (1 .. $qd) {
my ($name, $next) = read_dns_name($packet, $offset);
last unless defined $name && $next + 4 <= length($packet);
$offset = $next + 4;
}
my @records;
for (1 .. ($an + $ns + $ar)) {
my ($name, $next) = read_dns_name($packet, $offset);
last unless defined $name && $next + 10 <= length($packet);
my ($type, $class, $ttl, $rdlen) = unpack('nnNn', substr($packet, $next, 10));
my $rdata_offset = $next + 10;
last if $rdata_offset + $rdlen > length($packet);
if ($type == 1 && $rdlen == 4) {
push @records, {
name => lc($name),
ip => inet_ntoa(substr($packet, $rdata_offset, 4)),
ttl => $ttl,
};
}
$offset = $rdata_offset + $rdlen;
}
return @records;
}
sub read_dns_name {
my ($packet, $offset) = @_;
my @labels;
my $pos = $offset;
my $next;
my %seen;
while (1) {
return unless $pos < length($packet);
return if $seen{$pos}++;
my $len = ord(substr($packet, $pos, 1));
if (($len & 0xc0) == 0xc0) {
return unless $pos + 1 < length($packet);
my $ptr = unpack('n', substr($packet, $pos, 2)) & 0x3fff;
$next = $pos + 2 unless defined $next;
$pos = $ptr;
next;
}
if ($len == 0) {
$next = $pos + 1 unless defined $next;
last;
}
return if ($len & 0xc0) || $pos + 1 + $len > length($packet);
push @labels, substr($packet, $pos + 1, $len);
$pos += 1 + $len;
}
return (join('.', @labels), $next);
}
sub record_is_usable {
my ($record) = @_;
return 0 unless ($record->{name} || '') =~ /\A[a-z0-9][a-z0-9_.-]*\.local\z/;
return 0 unless ip_is_observable($record->{ip} || '');
return 1;
}
sub ip_is_observable {
my ($ip) = @_;
return $ip =~ /\A192\.168\.\d+\.\d+\z/
|| $ip =~ /\A10\.\d+\.\d+\.\d+\z/
|| $ip =~ /\A172\.(?:1[6-9]|2\d|3[01])\.\d+\.\d+\z/
|| $ip =~ /\A169\.254\.\d+\.\d+\z/;
}
sub open_database {
my ($opt) = @_;
ensure_parent_dir($opt->{db});
my $dbh = DBI->connect(
"dbi:SQLite:dbname=$opt->{db}",
'',
'',
{
RaiseError => 1,
PrintError => 0,
AutoCommit => 1,
sqlite_unicode => 1,
},
) or die "Cannot open SQLite database $opt->{db}\n";
$dbh->do('PRAGMA journal_mode = WAL');
$dbh->do('PRAGMA foreign_keys = ON');
ensure_runtime_schema_exists($dbh);
upsert_worker($dbh, $opt->{worker_id});
return $dbh;
}
sub ensure_runtime_schema_exists {
my ($dbh) = @_;
for my $table (qw(data_workers mdns_observations)) {
my ($exists) = $dbh->selectrow_array(
"SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = ?",
undef,
$table,
);
die "Missing SQLite table $table; start host-manager once to initialize schema\n" unless $exists;
}
}
sub upsert_worker {
my ($dbh, $worker_id) = @_;
my $now = iso_now();
$dbh->do(
'INSERT INTO data_workers (worker_id, worker_type, name, status, source, last_run_at, notes, created_at, updated_at) '
. "VALUES (?, 'mdns', 'mDNS listener', 'active', 'udp://224.0.0.251:5353', ?, 'mDNS observation collector source.', ?, ?) "
. 'ON CONFLICT(worker_id) DO UPDATE SET worker_type = excluded.worker_type, name = excluded.name, status = excluded.status, '
. 'source = excluded.source, last_run_at = excluded.last_run_at, notes = excluded.notes, updated_at = excluded.updated_at',
undef,
$worker_id,
$now,
$now,
$now,
);
}
sub seed_observations {
my ($dbh, $opt, $records, $peer_ip) = @_;
my @changes;
for my $record (@$records) {
my $change = merge_observation($dbh, $opt, $record, $peer_ip);
push @changes, $change if $change;
}
if (@changes && $opt->{dry_run}) {
print change_line($_) . "\n" for @changes;
}
return \@changes;
}
sub merge_observation {
my ($dbh, $opt, $record, $peer_ip) = @_;
my $now = iso_now();
my $key = "$record->{name}|$record->{ip}";
my $existing = $dbh->selectrow_hashref(
'SELECT observation_key, seen_count FROM mdns_observations WHERE observation_key = ?',
undef,
$key,
);
my $raw = raw_observation($record, $peer_ip);
if (!$existing) {
unless ($opt->{dry_run}) {
$dbh->do(
'INSERT INTO mdns_observations '
. '(observation_key, worker_id, host_fqdn, observed_name, ip_address, rr_type, ttl, first_seen, last_seen, seen_count, last_peer, raw) '
. "VALUES (?, ?, NULL, ?, ?, 'A', ?, ?, ?, 1, ?, ?)",
undef,
$key,
$opt->{worker_id},
$record->{name},
$record->{ip},
int($record->{ttl} || 0),
$now,
$now,
$peer_ip,
$raw,
);
}
return { action => 'created', key => $key, name => $record->{name}, ip => $record->{ip} };
}
unless ($opt->{dry_run}) {
$dbh->do(
'UPDATE mdns_observations SET ttl = ?, last_seen = ?, seen_count = seen_count + 1, last_peer = ?, raw = ? WHERE observation_key = ?',
undef,
int($record->{ttl} || 0),
$now,
$peer_ip,
$raw,
$key,
);
}
return { action => 'updated', key => $key, name => $record->{name}, ip => $record->{ip} };
}
sub raw_observation {
my ($record, $peer_ip) = @_;
return join(' ', (
'source=mdns',
'rr_type=A',
'name=' . ($record->{name} || ''),
'ip=' . ($record->{ip} || ''),
'ttl=' . int($record->{ttl} || 0),
'peer=' . ($peer_ip || ''),
));
}
sub ensure_parent_dir {
my ($path) = @_;
my $dir = dirname($path);
make_path($dir) unless -d $dir;
}
sub change_line {
my ($change) = @_;
return join(' ', map { "$_=$change->{$_}" } sort keys %$change);
}
sub iso_now {
return strftime('%Y-%m-%dT%H:%M:%SZ', gmtime);
}