Newer Older
829 lines | 31.293kb
Bogdan Timofte authored 2 weeks ago
1
module.exports = function(RED) {
2
  function Z2MSNZB05PHomeBusNode(config) {
3
    RED.nodes.createNode(this, config);
4
    var node = this;
5

            
6
    node.adapterId = "z2m-snzb-05p";
7
    node.sourceTopic = "zigbee2mqtt/SNZB-05P/#";
8
    node.subscriptionStarted = false;
9
    node.startTimer = null;
10
    node.site = normalizeToken(config.site || config.mqttSite);
11
    node.legacyRoom = normalizeToken(config.mqttRoom);
12
    node.legacySensor = normalizeToken(config.mqttSensor);
13
    node.publishCache = Object.create(null);
14
    node.retainedCache = Object.create(null);
15
    node.seenOptionalCapabilities = Object.create(null);
16
    node.detectedDevices = Object.create(null);
17
    node.stats = {
18
      processed_inputs: 0,
19
      devices_detected: 0,
20
      home_messages: 0,
21
      last_messages: 0,
22
      meta_messages: 0,
23
      home_availability_messages: 0,
24
      operational_messages: 0,
25
      invalid_messages: 0,
26
      invalid_topics: 0,
27
      invalid_payloads: 0,
28
      unmapped_messages: 0,
29
      adapter_exceptions: 0,
30
      errors: 0,
31
      dlq: 0
32
    };
33
    node.statsPublishEvery = 25;
34
    node.batteryLowThreshold = Number(config.batteryLowThreshold);
35
    if (!Number.isFinite(node.batteryLowThreshold)) node.batteryLowThreshold = 20;
36

            
37
    var CAPABILITY_MAPPINGS = [
38
      {
39
        sourceSystem: "zigbee2mqtt",
40
        sourceTopicMatch: "zigbee2mqtt/SNZB-05P/<site>/<location>/<device_id>",
41
        sourceFields: ["water_leak", "waterLeak", "leak"],
42
        targetBus: "home",
43
        targetCapability: "water_leak",
44
        core: true,
45
        stream: "value",
46
        payloadProfile: "scalar",
47
        dataType: "boolean",
48
        historianMode: "state",
49
        historianEnabled: true,
50
        read: function(payload) {
51
          return readBool(payload, this.sourceFields);
52
        }
53
      },
54
      {
55
        sourceSystem: "zigbee2mqtt",
56
        sourceTopicMatch: "zigbee2mqtt/SNZB-05P/<site>/<location>/<device_id>",
57
        sourceFields: ["battery"],
58
        targetBus: "home",
59
        targetCapability: "battery",
60
        core: true,
61
        stream: "value",
62
        payloadProfile: "scalar",
63
        dataType: "number",
64
        unit: "%",
65
        precision: 1,
66
        historianMode: "sample",
67
        historianEnabled: true,
68
        read: function(payload) {
69
          var value = readNumber(payload, this.sourceFields[0]);
70
          if (value === undefined) return undefined;
71
          return clamp(Math.round(value), 0, 100);
72
        }
73
      },
74
      {
75
        sourceSystem: "zigbee2mqtt",
76
        sourceTopicMatch: "zigbee2mqtt/SNZB-05P/<site>/<location>/<device_id>",
77
        sourceFields: ["battery_low", "batteryLow", "battery"],
78
        targetBus: "home",
79
        targetCapability: "battery_low",
80
        core: true,
81
        stream: "value",
82
        payloadProfile: "scalar",
83
        dataType: "boolean",
84
        historianMode: "state",
85
        historianEnabled: true,
86
        read: function(payload) {
87
          var raw = readBool(payload, this.sourceFields.slice(0, 2));
88
          if (raw !== undefined) return raw;
89
          var battery = readNumber(payload, this.sourceFields[2]);
90
          if (battery === undefined) return undefined;
91
          return battery <= node.batteryLowThreshold;
92
        }
93
      },
94
      {
95
        sourceSystem: "zigbee2mqtt",
96
        sourceTopicMatch: "zigbee2mqtt/SNZB-05P/<site>/<location>/<device_id>",
97
        sourceFields: ["tamper", "tampered", "tamper_alarm", "alarm_tamper"],
98
        targetBus: "home",
99
        targetCapability: "tamper",
100
        core: false,
101
        stream: "value",
102
        payloadProfile: "scalar",
103
        dataType: "boolean",
104
        historianMode: "state",
105
        historianEnabled: true,
106
        read: function(payload) {
107
          return readBool(payload, this.sourceFields);
108
        }
109
      }
110
    ];
111

            
112
    function normalizeToken(value) {
113
      if (value === undefined || value === null) return "";
114
      return String(value).trim();
115
    }
116

            
117
    function transliterate(value) {
118
      var s = normalizeToken(value);
119
      if (!s) return "";
120
      if (typeof s.normalize === "function") {
121
        s = s.normalize("NFKD").replace(/[\u0300-\u036f]/g, "");
122
      }
123
      return s;
124
    }
125

            
126
    function toKebabCase(value, fallback) {
127
      var s = transliterate(value).toLowerCase().replace(/[^a-z0-9]+/g, "-");
128
      s = s.replace(/^-+|-+$/g, "").replace(/-{2,}/g, "-");
129
      return s || fallback || "";
130
    }
131

            
132
    function clamp(n, min, max) {
133
      return Math.max(min, Math.min(max, n));
134
    }
135

            
136
    function topicTokens(topic) {
137
      if (typeof topic !== "string") return [];
138
      return topic.split("/").map(function(token) { return token.trim(); }).filter(function(token) { return !!token; });
139
    }
140

            
141
    function asBool(value) {
142
      if (typeof value === "boolean") return value;
143
      if (typeof value === "number") return value !== 0;
144
      if (typeof value === "string") {
145
        var v = value.trim().toLowerCase();
146
        if (v === "true" || v === "1" || v === "on" || v === "yes" || v === "online") return true;
147
        if (v === "false" || v === "0" || v === "off" || v === "no" || v === "offline") return false;
148
      }
149
      return null;
150
    }
151

            
152
    function pickFirst(obj, keys) {
153
      if (!obj || typeof obj !== "object") return undefined;
154
      for (var i = 0; i < keys.length; i++) {
155
        if (Object.prototype.hasOwnProperty.call(obj, keys[i])) return obj[keys[i]];
156
      }
157
      return undefined;
158
    }
159

            
160
    function pickPath(obj, path) {
161
      if (!obj || typeof obj !== "object") return undefined;
162
      var parts = path.split(".");
163
      var current = obj;
164
      for (var i = 0; i < parts.length; i++) {
165
        if (!current || typeof current !== "object" || !Object.prototype.hasOwnProperty.call(current, parts[i])) return undefined;
166
        current = current[parts[i]];
167
      }
168
      return current;
169
    }
170

            
171
    function pickFirstPath(obj, paths) {
172
      for (var i = 0; i < paths.length; i++) {
173
        var value = pickPath(obj, paths[i]);
174
        if (value !== undefined && value !== null && normalizeToken(value) !== "") return value;
175
      }
176
      return undefined;
177
    }
178

            
179
    function readBool(payload, fields) {
180
      var raw = asBool(pickFirst(payload, fields));
181
      return raw === null ? undefined : raw;
182
    }
183

            
184
    function readNumber(payload, field) {
185
      if (!payload || typeof payload !== "object") return undefined;
186
      var value = payload[field];
187
      if (typeof value !== "number" || !isFinite(value)) return undefined;
188
      return Number(value);
189
    }
190

            
191
    function toIsoTimestamp(value) {
192
      if (value === undefined || value === null) return "";
193
      if (value instanceof Date && !isNaN(value.getTime())) return value.toISOString();
194
      if (typeof value === "number" && isFinite(value)) {
195
        var ms = value < 100000000000 ? value * 1000 : value;
196
        var dateFromNumber = new Date(ms);
197
        return isNaN(dateFromNumber.getTime()) ? "" : dateFromNumber.toISOString();
198
      }
199
      if (typeof value === "string") {
200
        var trimmed = value.trim();
201
        if (!trimmed) return "";
202
        if (/^\d+$/.test(trimmed)) return toIsoTimestamp(Number(trimmed));
203
        var dateFromString = new Date(trimmed);
204
        return isNaN(dateFromString.getTime()) ? "" : dateFromString.toISOString();
205
      }
206
      return "";
207
    }
208

            
209
    function canonicalSourceTopic(topic) {
210
      var t = normalizeToken(topic);
211
      if (!t) return "";
212
      if (/\/availability$/i.test(t)) return t.replace(/\/availability$/i, "");
213
      return t;
214
    }
215

            
216
    function inferFromTopic(topic) {
217
      var tokens = topicTokens(topic);
218
      var result = {
219
        deviceType: "",
220
        site: "",
221
        location: "",
222
        deviceId: "",
223
        friendlyName: "",
224
        isAvailabilityTopic: false
225
      };
226

            
227
      if (tokens.length >= 2 && tokens[0].toLowerCase() === "zigbee2mqtt") {
228
        result.deviceType = tokens[1];
229

            
230
        if (tokens.length >= 6 && tokens[5].toLowerCase() === "availability") {
231
          result.site = tokens[2];
232
          result.location = tokens[3];
233
          result.deviceId = tokens[4];
234
          result.friendlyName = tokens.slice(1, 5).join("/");
235
          result.isAvailabilityTopic = true;
236
          return result;
237
        }
238

            
239
        if (tokens.length >= 5 && tokens[4].toLowerCase() !== "availability") {
240
          result.site = tokens[2];
241
          result.location = tokens[3];
242
          result.deviceId = tokens[4];
243
          result.friendlyName = tokens.slice(1, 5).join("/");
244
          return result;
245
        }
246

            
247
        result.friendlyName = tokens.slice(1).join("/");
248
        result.deviceId = tokens[1];
249
        result.isAvailabilityTopic = tokens.length >= 3 && tokens[tokens.length - 1].toLowerCase() === "availability";
250
        return result;
251
      }
252

            
253
      if (tokens.length >= 5 && tokens[1].toLowerCase() === "home") {
254
        result.site = tokens[0];
255
        result.location = tokens[2];
256
        result.deviceId = tokens[4];
257
        result.isAvailabilityTopic = tokens.length >= 6 && tokens[5].toLowerCase() === "availability";
258
        return result;
259
      }
260

            
261
      if (tokens.length >= 4) {
262
        result.site = tokens[0];
263
        result.location = tokens[2];
264
        result.deviceId = tokens[3];
265
      } else if (tokens.length >= 2) {
266
        result.location = tokens[tokens.length - 2];
267
        result.deviceId = tokens[tokens.length - 1];
268
      } else if (tokens.length === 1) {
269
        result.deviceId = tokens[0];
270
      }
271

            
272
      return result;
273
    }
274

            
275
    function resolveIdentity(msg, payload) {
276
      var safeMsg = (msg && typeof msg === "object") ? msg : {};
277
      var inferred = inferFromTopic(safeMsg.topic);
278
      var siteRaw = inferred.site
279
        || normalizeToken(pickFirst(payload, ["site", "homeSite"]))
280
        || normalizeToken(pickFirst(safeMsg, ["site", "homeSite"]))
281
        || node.site
282
        || "";
283

            
284
      var locationRaw = inferred.location
285
        || normalizeToken(pickFirst(payload, ["location", "room", "homeLocation", "mqttRoom"]))
286
        || normalizeToken(pickFirst(safeMsg, ["location", "room", "homeLocation", "mqttRoom"]))
287
        || node.legacyRoom
288
        || "";
289

            
290
      var deviceRaw = inferred.deviceId
291
        || normalizeToken(pickFirst(payload, ["deviceId", "device_id", "sensor", "mqttSensor", "friendly_name", "friendlyName", "name", "device"]))
292
        || normalizeToken(pickFirst(safeMsg, ["deviceId", "device_id", "sensor", "mqttSensor"]))
293
        || node.legacySensor
294
        || inferred.friendlyName
295
        || inferred.deviceType;
296

            
297
      var displayName = normalizeToken(pickFirst(payload, ["display_name", "displayName", "friendly_name", "friendlyName", "name"]))
298
        || normalizeToken(pickFirst(safeMsg, ["display_name", "displayName", "friendly_name", "friendlyName", "name"]))
299
        || inferred.friendlyName
300
        || deviceRaw
301
        || "SNZB-05P";
302

            
303
      var sourceRef = normalizeToken(pickFirstPath(payload, ["source_ref", "sourceRef", "ieee_address", "ieeeAddr", "device.ieee_address", "device.ieeeAddr"]))
304
        || normalizeToken(pickFirstPath(safeMsg, ["source_ref", "sourceRef", "ieee_address", "ieeeAddr", "device.ieee_address", "device.ieeeAddr"]))
305
        || canonicalSourceTopic(safeMsg.topic)
306
        || inferred.friendlyName
307
        || inferred.deviceType
308
        || deviceRaw
309
        || "z2m-snzb-05p";
310

            
311
      return {
312
        site: toKebabCase(siteRaw, "unknown"),
313
        location: toKebabCase(locationRaw, "unknown"),
314
        deviceId: toKebabCase(deviceRaw, toKebabCase(inferred.deviceType, "snzb-05p")),
315
        displayName: displayName,
316
        sourceRef: sourceRef,
317
        sourceTopic: canonicalSourceTopic(safeMsg.topic),
318
        isAvailabilityTopic: inferred.isAvailabilityTopic
319
      };
320
    }
321

            
322
    function noteDevice(identity) {
323
      if (!identity) return;
324
      var key = identity.site + "/" + identity.location + "/" + identity.deviceId;
325
      if (node.detectedDevices[key]) return;
326
      node.detectedDevices[key] = true;
327
      node.stats.devices_detected += 1;
328
    }
329

            
330
    function validateInboundTopic(topic) {
331
      var rawTopic = normalizeToken(topic);
332
      if (!rawTopic) {
333
        return {
334
          valid: false,
335
          reason: "Topic must be a non-empty string"
336
        };
337
      }
338

            
339
      var tokens = rawTopic.split("/").map(function(token) {
340
        return token.trim();
341
      });
342
      if (tokens.length < 5) {
343
        return {
344
          valid: false,
345
          reason: "Topic must match zigbee2mqtt/SNZB-05P/<site>/<location>/<device_id>[/availability]"
346
        };
347
      }
348
      if (tokens[0].toLowerCase() !== "zigbee2mqtt" || tokens[1].toLowerCase() !== "snzb-05p") {
349
        return {
350
          valid: false,
351
          reason: "Topic must start with zigbee2mqtt/SNZB-05P"
352
        };
353
      }
354
      if (!tokens[2] || !tokens[3] || !tokens[4]) {
355
        return {
356
          valid: false,
357
          reason: "Topic must contain non-empty site, location and device_id segments"
358
        };
359
      }
360
      if (tokens.length === 5) {
361
        return {
362
          valid: true,
363
          isAvailabilityTopic: false
364
        };
365
      }
366
      if (tokens.length === 6 && tokens[5].toLowerCase() === "availability") {
367
        return {
368
          valid: true,
369
          isAvailabilityTopic: true
370
        };
371
      }
372
      return {
373
        valid: false,
374
        reason: "Topic must not contain extra segments beyond an optional /availability suffix"
375
      };
376
    }
377

            
378
    function translatedMessageCount() {
379
      return node.stats.home_messages + node.stats.last_messages + node.stats.meta_messages + node.stats.home_availability_messages;
380
    }
381

            
382
    function updateNodeStatus(fill, shape, suffix) {
383
      var parts = [
384
        "dev " + node.stats.devices_detected,
385
        "in " + node.stats.processed_inputs,
386
        "tr " + translatedMessageCount()
387
      ];
388
      if (node.stats.operational_messages > 0) parts.push("op " + node.stats.operational_messages);
389
      if (node.stats.errors > 0) parts.push("err " + node.stats.errors);
390
      if (node.stats.invalid_topics > 0) parts.push("topic " + node.stats.invalid_topics);
391
      if (node.stats.invalid_messages > 0) parts.push("msg " + node.stats.invalid_messages);
392
      if (node.stats.invalid_payloads > 0) parts.push("payload " + node.stats.invalid_payloads);
393
      if (node.stats.unmapped_messages > 0) parts.push("unmapped " + node.stats.unmapped_messages);
394
      if (node.stats.adapter_exceptions > 0) parts.push("exc " + node.stats.adapter_exceptions);
395
      if (node.stats.dlq > 0) parts.push("dlq " + node.stats.dlq);
396
      if (suffix) parts.push(suffix);
397
      node.status({
398
        fill: fill,
399
        shape: shape,
400
        text: parts.join(" | ")
401
      });
402
    }
403

            
404
    function buildHomeTopic(identity, mapping, stream) {
405
      return identity.site + "/" + mapping.targetBus + "/" + identity.location + "/" + mapping.targetCapability + "/" + identity.deviceId + "/" + stream;
406
    }
407

            
408
    function buildSysTopic(site, stream) {
409
      return site + "/sys/adapter/" + node.adapterId + "/" + stream;
410
    }
411

            
412
    function makePublishMsg(topic, payload, retain) {
413
      return {
414
        topic: topic,
415
        payload: payload,
416
        qos: 1,
417
        retain: !!retain
418
      };
419
    }
420

            
421
    function makeSubscribeMsg(topic) {
422
      return {
423
        action: "subscribe",
424
        topic: [{
425
          topic: topic,
426
          qos: 2,
427
          rh: 0,
428
          rap: true
429
        }]
430
      };
431
    }
432

            
433
    function signature(value) {
434
      return JSON.stringify(value);
435
    }
436

            
437
    function shouldPublishLiveValue(cacheKey, payload) {
438
      var sig = signature(payload);
439
      if (node.publishCache[cacheKey] === sig) return false;
440
      node.publishCache[cacheKey] = sig;
441
      return true;
442
    }
443

            
444
    function shouldPublishRetained(cacheKey, payload) {
445
      var sig = signature(payload);
446
      if (node.retainedCache[cacheKey] === sig) return false;
447
      node.retainedCache[cacheKey] = sig;
448
      return true;
449
    }
450

            
451
    function humanizeCapability(capability) {
452
      return capability.replace(/_/g, " ").replace(/\b[a-z]/g, function(ch) { return ch.toUpperCase(); });
453
    }
454

            
455
    function resolveObservation(msg, payloadObject) {
456
      var safeMsg = (msg && typeof msg === "object") ? msg : {};
457
      var observedAtRaw = pickFirstPath(payloadObject, [
458
        "observed_at",
459
        "observedAt",
460
        "timestamp",
461
        "time",
462
        "ts",
463
        "last_seen",
464
        "lastSeen",
465
        "device.last_seen",
466
        "device.lastSeen"
467
      ]) || pickFirstPath(safeMsg, [
468
        "observed_at",
469
        "observedAt",
470
        "timestamp",
471
        "time",
472
        "ts"
473
      ]);
474
      var observedAt = toIsoTimestamp(observedAtRaw);
475
      if (observedAt) {
476
        return {
477
          observedAt: observedAt,
478
          quality: "good"
479
        };
480
      }
481
      return {
482
        observedAt: new Date().toISOString(),
483
        quality: "estimated"
484
      };
485
    }
486

            
487
    function buildMetaPayload(identity, mapping) {
488
      var payload = {
489
        schema_ref: "mqbus.home.v1",
490
        payload_profile: mapping.payloadProfile,
491
        stream_payload_profiles: {
492
          value: "scalar",
493
          last: "envelope"
494
        },
495
        data_type: mapping.dataType,
496
        adapter_id: node.adapterId,
497
        source: mapping.sourceSystem,
498
        source_ref: identity.sourceRef,
499
        source_topic: identity.sourceTopic,
500
        display_name: identity.displayName + " " + humanizeCapability(mapping.targetCapability),
501
        tags: [mapping.sourceSystem, "snzb-05p", mapping.targetBus],
502
        historian: {
503
          enabled: !!mapping.historianEnabled,
504
          mode: mapping.historianMode
505
        }
506
      };
507

            
508
      if (mapping.unit) payload.unit = mapping.unit;
509
      if (mapping.precision !== undefined) payload.precision = mapping.precision;
510

            
511
      return payload;
512
    }
513

            
514
    function buildLastPayload(value, observation) {
515
      var payload = {
516
        value: value,
517
        observed_at: observation.observedAt
518
      };
519
      if (observation.quality && observation.quality !== "good") payload.quality = observation.quality;
520
      return payload;
521
    }
522

            
523
    function noteMessage(kind) {
524
      if (!Object.prototype.hasOwnProperty.call(node.stats, kind)) return;
525
      node.stats[kind] += 1;
526
    }
527

            
528
    function noteErrorKind(code) {
529
      if (code === "invalid_message") {
530
        noteMessage("invalid_messages");
531
      } else if (code === "invalid_topic") {
532
        noteMessage("invalid_topics");
533
      } else if (code === "payload_not_object" || code === "invalid_availability_payload") {
534
        noteMessage("invalid_payloads");
535
      } else if (code === "no_mapped_fields") {
536
        noteMessage("unmapped_messages");
537
      } else if (code === "adapter_exception") {
538
        noteMessage("adapter_exceptions");
539
      }
540
    }
541

            
542
    function summarizeForLog(value) {
543
      if (value === undefined) return "undefined";
544
      if (value === null) return "null";
545
      if (typeof value === "string") {
546
        return value.length > 180 ? value.slice(0, 177) + "..." : value;
547
      }
548
      try {
549
        var serialized = JSON.stringify(value);
550
        if (serialized.length > 180) return serialized.slice(0, 177) + "...";
551
        return serialized;
552
      } catch (err) {
553
        return String(value);
554
      }
555
    }
556

            
557
    function logIssue(level, code, reason, msg, rawPayload) {
558
      var parts = [
559
        "[" + node.adapterId + "]",
560
        code + ":",
561
        reason
562
      ];
563
      var sourceTopic = msg && typeof msg === "object" ? normalizeToken(msg.topic) : "";
564
      if (sourceTopic) parts.push("topic=" + sourceTopic);
565
      if (rawPayload !== undefined) parts.push("payload=" + summarizeForLog(rawPayload));
566

            
567
      var text = parts.join(" ");
568
      if (level === "error") {
569
        node.error(text, msg);
570
        return;
571
      }
572
      node.warn(text);
573
    }
574

            
575
    function enqueueHomeMeta(messages, identity, mapping) {
576
      var topic = buildHomeTopic(identity, mapping, "meta");
577
      var payload = buildMetaPayload(identity, mapping);
578
      if (!shouldPublishRetained("meta:" + topic, payload)) return;
579
      messages.push(makePublishMsg(topic, payload, true));
580
      noteMessage("meta_messages");
581
    }
582

            
583
    function enqueueHomeAvailability(messages, identity, mapping, online) {
584
      var topic = buildHomeTopic(identity, mapping, "availability");
585
      var payload = online ? "online" : "offline";
586
      if (!shouldPublishRetained("availability:" + topic, payload)) return;
587
      messages.push(makePublishMsg(topic, payload, true));
588
      noteMessage("home_availability_messages");
589
    }
590

            
591
    function enqueueHomeLast(messages, identity, mapping, value, observation) {
592
      var topic = buildHomeTopic(identity, mapping, "last");
593
      var payload = buildLastPayload(value, observation);
594
      if (!shouldPublishRetained("last:" + topic, payload)) return;
595
      messages.push(makePublishMsg(topic, payload, true));
596
      noteMessage("last_messages");
597
    }
598

            
599
    function enqueueHomeValue(messages, identity, mapping, value) {
600
      var topic = buildHomeTopic(identity, mapping, "value");
601
      if (!shouldPublishLiveValue("value:" + topic, value)) return;
602
      messages.push(makePublishMsg(topic, value, false));
603
      noteMessage("home_messages");
604
    }
605

            
606
    function enqueueAdapterAvailability(messages, site, online) {
607
      var topic = buildSysTopic(site, "availability");
608
      var payload = online ? "online" : "offline";
609
      if (!shouldPublishRetained("sys:availability:" + topic, payload)) return;
610
      messages.push(makePublishMsg(topic, payload, true));
611
      noteMessage("operational_messages");
612
    }
613

            
614
    function enqueueAdapterStats(messages, site, force) {
615
      if (!force && node.stats.processed_inputs !== 1 && (node.stats.processed_inputs % node.statsPublishEvery) !== 0) return;
616
      var topic = buildSysTopic(site, "stats");
617
      var payload = {
618
        processed_inputs: node.stats.processed_inputs,
619
        devices_detected: node.stats.devices_detected,
620
        translated_messages: translatedMessageCount(),
621
        home_messages: node.stats.home_messages,
622
        last_messages: node.stats.last_messages,
623
        meta_messages: node.stats.meta_messages,
624
        home_availability_messages: node.stats.home_availability_messages,
625
        operational_messages: node.stats.operational_messages,
626
        invalid_messages: node.stats.invalid_messages,
627
        invalid_topics: node.stats.invalid_topics,
628
        invalid_payloads: node.stats.invalid_payloads,
629
        unmapped_messages: node.stats.unmapped_messages,
630
        adapter_exceptions: node.stats.adapter_exceptions,
631
        errors: node.stats.errors,
632
        dlq: node.stats.dlq
633
      };
634
      messages.push(makePublishMsg(topic, payload, true));
635
      noteMessage("operational_messages");
636
    }
637

            
638
    function enqueueError(messages, site, code, reason, sourceTopic) {
639
      var payload = {
640
        code: code,
641
        reason: reason,
642
        source_topic: normalizeToken(sourceTopic),
643
        adapter_id: node.adapterId
644
      };
645
      messages.push(makePublishMsg(buildSysTopic(site, "error"), payload, false));
646
      noteErrorKind(code);
647
      noteMessage("errors");
648
      noteMessage("operational_messages");
649
    }
650

            
651
    function enqueueDlq(messages, site, code, sourceTopic, rawPayload) {
652
      var payload = {
653
        code: code,
654
        source_topic: normalizeToken(sourceTopic),
655
        payload: rawPayload
656
      };
657
      messages.push(makePublishMsg(buildSysTopic(site, "dlq"), payload, false));
658
      noteMessage("dlq");
659
      noteMessage("operational_messages");
660
    }
661

            
662
    function activeMappings(payloadObject) {
663
      var result = [];
664
      for (var i = 0; i < CAPABILITY_MAPPINGS.length; i++) {
665
        var mapping = CAPABILITY_MAPPINGS[i];
666
        if (typeof mapping.applies === "function" && !mapping.applies(payloadObject)) continue;
667
        var value = payloadObject ? mapping.read(payloadObject) : undefined;
668
        var seen = mapping.core || node.seenOptionalCapabilities[mapping.targetCapability];
669
        if (value !== undefined) node.seenOptionalCapabilities[mapping.targetCapability] = true;
670
        if (mapping.core || seen || value !== undefined) {
671
          result.push({
672
            mapping: mapping,
673
            hasValue: value !== undefined,
674
            value: value
675
          });
676
        }
677
      }
678
      return result;
679
    }
680

            
681
    function resolveOnlineState(payloadObject, availabilityValue) {
682
      if (availabilityValue !== null && availabilityValue !== undefined) return !!availabilityValue;
683
      if (!payloadObject) return true;
684
      var active = true;
685
      if (typeof payloadObject.availability === "string") active = payloadObject.availability.trim().toLowerCase() !== "offline";
686
      if (typeof payloadObject.online === "boolean") active = payloadObject.online;
687
      return active;
688
    }
689

            
690
    function flush(send, done, controlMessages, publishMessages, error) {
691
      send([
692
        publishMessages.length ? publishMessages : null,
693
        controlMessages.length ? controlMessages : null
694
      ]);
695
      if (done) done(error);
696
    }
697

            
698
    function startSubscriptions() {
699
      if (node.subscriptionStarted) return;
700
      node.subscriptionStarted = true;
701
      node.send([null, makeSubscribeMsg(node.sourceTopic)]);
702
      updateNodeStatus("grey", "dot", "subscribed");
703
    }
704

            
705
    node.on("input", function(msg, send, done) {
706
      send = send || function() { node.send.apply(node, arguments); };
707
      node.stats.processed_inputs += 1;
708

            
709
      try {
710
        var controlMessages = [];
711
        if (!msg || typeof msg !== "object") {
712
          var invalidMessages = [];
713
          var invalidSite = resolveIdentity({}, null).site;
714
          enqueueAdapterAvailability(invalidMessages, invalidSite, true);
715
          enqueueError(invalidMessages, invalidSite, "invalid_message", "Input must be an object", "");
716
          enqueueDlq(invalidMessages, invalidSite, "invalid_message", "", null);
717
          enqueueAdapterStats(invalidMessages, invalidSite, true);
718
          logIssue("warn", "invalid_message", "Input must be an object", null, msg);
719
          updateNodeStatus("yellow", "ring", "bad msg");
720
          flush(send, done, controlMessages, invalidMessages);
721
          return;
722
        }
723

            
724
        var payload = msg.payload;
725
        var payloadObject = payload && typeof payload === "object" && !Array.isArray(payload) ? payload : null;
726
        var identity = resolveIdentity(msg, payloadObject);
727
        var observation = resolveObservation(msg, payloadObject);
728
        var messages = [];
729
        var topicValidation = validateInboundTopic(msg.topic);
730
        var availabilityValue = null;
731

            
732
        enqueueAdapterAvailability(messages, identity.site, true);
733

            
734
        if (!topicValidation.valid) {
735
          enqueueError(messages, identity.site, "invalid_topic", topicValidation.reason, msg.topic);
736
          enqueueDlq(messages, identity.site, "invalid_topic", msg.topic, payload);
737
          enqueueAdapterStats(messages, identity.site, true);
738
          logIssue("warn", "invalid_topic", topicValidation.reason, msg, payload);
739
          updateNodeStatus("yellow", "ring", "bad topic");
740
          flush(send, done, controlMessages, messages);
741
          return;
742
        }
743

            
744
        identity.isAvailabilityTopic = topicValidation.isAvailabilityTopic;
745
        noteDevice(identity);
746

            
747
        if (topicValidation.isAvailabilityTopic) {
748
          availabilityValue = asBool(payload);
749
          if (availabilityValue === null) {
750
            enqueueError(messages, identity.site, "invalid_availability_payload", "Availability payload must be online/offline or boolean", msg.topic);
751
            enqueueDlq(messages, identity.site, "invalid_availability_payload", msg.topic, payload);
752
            enqueueAdapterStats(messages, identity.site, true);
753
            logIssue("warn", "invalid_availability_payload", "Availability payload must be online/offline or boolean", msg, payload);
754
            updateNodeStatus("yellow", "ring", "invalid availability");
755
            flush(send, done, controlMessages, messages);
756
            return;
757
          }
758
        } else if (!payloadObject) {
759
          enqueueError(messages, identity.site, "payload_not_object", "Telemetry payload must be an object", msg.topic);
760
          enqueueDlq(messages, identity.site, "payload_not_object", msg.topic, payload);
761
          enqueueAdapterStats(messages, identity.site, true);
762
          logIssue("warn", "payload_not_object", "Telemetry payload must be an object", msg, payload);
763
          updateNodeStatus("yellow", "ring", "payload not object");
764
          flush(send, done, controlMessages, messages);
765
          return;
766
        }
767

            
768
        var online = resolveOnlineState(payloadObject, availabilityValue);
769
        var mappings = activeMappings(payloadObject);
770
        var hasMappedValue = false;
771
        for (var i = 0; i < mappings.length; i++) {
772
          if (mappings[i].hasValue) {
773
            hasMappedValue = true;
774
            break;
775
          }
776
        }
777
        var hasAvailabilityField = topicValidation.isAvailabilityTopic || (payloadObject && (typeof payloadObject.availability === "string" || typeof payloadObject.online === "boolean"));
778

            
779
        if (!hasMappedValue && !hasAvailabilityField) {
780
          enqueueError(messages, identity.site, "no_mapped_fields", "Payload did not contain any supported SNZB-05P fields", msg.topic);
781
          enqueueDlq(messages, identity.site, "no_mapped_fields", msg.topic, payloadObject);
782
          logIssue("warn", "no_mapped_fields", "Payload did not contain any supported SNZB-05P fields", msg, payloadObject);
783
        }
784

            
785
        for (var j = 0; j < mappings.length; j++) {
786
          enqueueHomeMeta(messages, identity, mappings[j].mapping);
787
          enqueueHomeAvailability(messages, identity, mappings[j].mapping, online);
788
          if (mappings[j].hasValue) {
789
            enqueueHomeLast(messages, identity, mappings[j].mapping, mappings[j].value, observation);
790
            enqueueHomeValue(messages, identity, mappings[j].mapping, mappings[j].value);
791
          }
792
        }
793

            
794
        enqueueAdapterStats(messages, identity.site, false);
795

            
796
        if (!hasMappedValue && !hasAvailabilityField) {
797
          updateNodeStatus("yellow", "ring", "unmapped");
798
        } else {
799
          updateNodeStatus(online ? "green" : "yellow", online ? "dot" : "ring", online ? null : "offline");
800
        }
801

            
802
        flush(send, done, controlMessages, messages);
803
      } catch (err) {
804
        var errorPayload = msg && msg.payload && typeof msg.payload === "object" && !Array.isArray(msg.payload) ? msg.payload : null;
805
        var errorIdentity = resolveIdentity(msg, errorPayload);
806
        noteDevice(errorIdentity);
807
        var errorMessages = [];
808
        enqueueAdapterAvailability(errorMessages, errorIdentity.site, true);
809
        enqueueError(errorMessages, errorIdentity.site, "adapter_exception", err.message, msg && msg.topic);
810
        enqueueAdapterStats(errorMessages, errorIdentity.site, true);
811
        logIssue("error", "adapter_exception", err.message, msg, msg && msg.payload);
812
        updateNodeStatus("red", "ring", "error");
813
        flush(send, done, [], errorMessages, err);
814
      }
815
    });
816

            
817
    node.on("close", function() {
818
      if (node.startTimer) {
819
        clearTimeout(node.startTimer);
820
        node.startTimer = null;
821
      }
822
    });
823

            
824
    node.startTimer = setTimeout(startSubscriptions, 250);
825
    updateNodeStatus("grey", "ring", "waiting");
826
  }
827

            
828
  RED.nodes.registerType("z2m-snzb-05p-homebus", Z2MSNZB05PHomeBusNode);
829
};