How to integrate TagoIO with Milesight UG65 embedded LoRaWAN Network Server via MQTT

How to integrate TagoIO with Milesight UG65 embedded LoRaWAN Network Server via MQTT

@FuJi Kok Fook-Kee

Hi TagoIO team

I would like to integrate TagoIO with Embedded LoRaWAN Network Server of Milesight Gateway via MQTT instead of HTTPs.

I have two issues to resolve:

  1. How to parse two or multi level of RAW JSON to TagoIO JSON?

The snippet example only parse one level of JSON. The RAW payload consists of two level of JSON for “rxInfo” & “txinfo” variables. See below:

[{“applicationID”:“2”,
“applicationName”:“TagoIO_AM319_MQTT”,
“data”:“A2feAARohQUAAAbLAAd90QEIfSAACXNNJwp9AwALfQMADH0DAA==”,
“devEUI”:“FFFFFFFFFFFFFFFF”,
“deviceName”:“AM319_FFFF”,
“fCnt”:256,
“fPort”:85,
"rxInfo[{“altitude”:0,“latitude”:0,“loRaSNR”:13.2,“longitude”:0,“mac”:“FFFFFFFFFFFFFFFF”,“name”:“Local Gateway”,“rssi”:-45,“time”:“2021-12-03T16:57:31.08877Z”}],
“time”:“2021-12-03T16:57:31.08877Z”,
"txInfo":{“adr”:true,“codeRate”:“4/5”,“dataRate”:{“bandwidth”:125,“modulation”:“LORA”,“spreadFactor”:7},“frequency”:923200000},“metadata”:{“mqtt_topic”:“data”}}]

The same issue had been resolved in the “Generic HTTPs Endpoint” Network middleware.
Can Tago share how it was done in the “Generic HTTPs Endpoint” Network middleware.

  1. How to implement the MQTT subscribe for my own MQTT connector?

I would like to add my own MQTT connectors (integrate with TagoIO MQTT broker/network).
But I notice that the MQTT Subscribe coding need to be implemented if I’m using my own MQTT connector instead of the TagoIO Custom MQTT connector.
Can Tago share how it was done in the “Custom MQTT” connector?

Payload parser of my own MQTT connector :

const ignore_vars = [];

function toTagoFormat(object_item, serie, prefix = ‘’) {
const result = [];
for (const key in object_item) {
if (ignore_vars.includes(key)) continue;

if (typeof object_item[key] == 'object') {
  result.push({
    variable: object_item[key].variable || `${prefix}${key}`,
    value: object_item[key].value,
    serie: object_item[key].serie || serie,
    metadata: object_item[key].metadata,
    location: object_item[key].location,
    unit: object_item[key].unit,
  });
} else {
  result.push({
    variable: `${prefix}${key}`,
    value: object_item[key],
    serie,
  });
}

}

return result;
}

if (!payload[0].variable) {
// Get a unique serie for the incoming data.
const serie = payload[0].serie || new Date().getTime();

payload = toTagoFormat(payload[0], serie);
}

const payload_raw = payload.find(x => x.variable === ‘data’);
if (payload_raw) {
try {
// Convert the data from Hex to Javascript Buffer.
// const buffer = Buffer.from(payload_raw.value, ‘hex’);
const buffer = Buffer.from(payload_raw.value, ‘base64’);

var data = [];
const datahex = Buffer.from(payload_raw.value, “base64”).toString(“hex”);
data.push({ variable: ‘data_payload’, value: datahex });

for (var i = 0; i < buffer.length;) {
var channel_id = buffer[i++];
var channel_type = buffer[i++];
// BATTERY
if (channel_id === 0x01 && channel_type === 0x75) {
data.push({ variable: ‘battery’, value: buffer.readUInt8(i), unit: ‘%’ });
i += 1;
}
// TEMPERATURE
else if (channel_id === 0x03 && channel_type === 0x67) {
// °C
data.push({ variable: ‘temperature’, value: buffer.readInt16LE(i) / 10.0, unit: ‘°C’ });
i += 2;
}
// HUMIDITY
else if (channel_id === 0x04 && channel_type === 0x68) {
data.push({ variable: ‘humidity’, value: buffer.readUInt8(i) / 2.0, unit: ‘%’ });
i += 1;
}
// PIR Status
else if (channel_id === 0x05 && channel_type === 0x00) {
data.push({ variable: ‘pir_status’, value: buffer.readUInt8(i) });
i += 1;
}
// LIGHT Level
else if (channel_id === 0x06 && channel_type === 0xcb) {
data.push({ variable: ‘light_level’, value: buffer.readUInt8(i) });
i += 1;
}
// CO2
else if (channel_id === 0x07 && channel_type === 0x7D) {
data.push({ variable: ‘co2’, value: buffer.readUInt16LE(i), unit: ‘ppm’ });
i += 2;
}
// TVOC
else if (channel_id === 0x08 && channel_type === 0x7D) {
data.push({ variable: ‘tvoc’, value: buffer.readUInt16LE(i), unit: ‘ppb’ });
i += 2;
}
// PRESSURE
else if (channel_id === 0x09 && channel_type === 0x73) {
data.push({ variable: ‘pressure’, value: buffer.readUInt16LE(i) / 10, unit: ‘hpa’ });
i += 2;
}
// HCHO
else if (channel_id === 0x0A && channel_type === 0x7D) {
data.push({ variable: ‘hcho’, value: buffer.readUInt16LE(i) / 100, unit: ‘mg/m3’ });
i += 2;
}
// PM2.5
else if (channel_id === 0x0B && channel_type === 0x7D) {
data.push({ variable: ‘pm2_5’, value: buffer.readUInt16LE(i), unit: ‘ug/m3’ });
i += 2;
}
// PM10
else if (channel_id === 0x0C && channel_type === 0x7D) {
data.push({ variable: ‘pm10’, value: buffer.readUInt16LE(i), unit: ‘ug/m3’ });
i += 2;
} else {
break;
}
}

// Remove unwanted variables.
payload = payload.filter(x => !ignore_vars.includes(x.variable));

// This will concat the content sent by your device with the content generated in this payload parser.
// It also add the field "serie" and "time" to it, copying from your sensor data.
payload = payload.concat(data.map(x => ({ ...x, serie: payload_raw.serie, time: payload_raw.time })));

} catch (e) {
// Print the error to the Live Inspector.
console.error(e);

// Return the variable parse_error for debugging.
payload = [{ variable: 'parse_error', value: e.message }];

}
}

Thanks