产品驱动开发指南
这一篇都讲什么
"产品驱动"是 NexIoT 中把设备字节流翻译成平台标准消息的核心环节,过去拆成四篇,现在统一收在这一篇里:
- 各平台驱动方案对比 + 三个核心方法入门;
- 消息全链路(preDecode → decode → encode → 入库 → 推送)与设备生命周期;
- 高级特性:消息回执、回复主题、跨产品联动;
- 内置函数库速查(HEX、Modbus、CRC、影子、JSON…);
- 实战案例 A/B/C/D(DTU、Modbus 灯控、JT808 应答、跨设备联动);
- 调试三板斧 + 常见踩坑 Top 10。
读完这一篇,从零写出一款生产级驱动绰绰有余。
一、各平台驱动方式对比
本项目的产品驱动,主推的执行引擎是 magic,它是基于 ASM 字节码技术,语法介于 Java 和 JavaScript 之间。别慌,上手很简单,使用起来也很爽。
| NexIoT | 阿里云 IoT | 华为云 IoT | 腾讯云 IoT | |
|---|---|---|---|---|
| 功能名词 | 产品驱动(在线 IDE) | 消息解析 | 插件开发 | 云端解析 |
| 支持方式 | Spring Bean 远程驱动(JAR) JS ✅magic | JavaScript Python PHP | 图形化开发 脚本化开发(JS) FunctionGraph | JavaScript |
| 灵活性 | 高(三种方式可选) | 中(多脚本语言) | 高(图形化 + 脚本化) | 中(单一脚本) |
| 适用场景 | 企业级、灵活复杂协议、高性能 | 快速开发、多语言 | 快速开发、可视化 | 简单协议 |
为什么只开放了 magic
- 平台最大亮点,会 JS、Java 等任意一种就能上手;
- Spring Bean 需硬编码、重启服务,调试链路复杂;
- JavaScript 在 JDK 生态太弱(ES5),实现一个 MD5 都得自己写;
- 上传 JAR 仅适用于内部安全环境。
二、必须掌握的三个方法
preDecode—— 提取设备身份标识,平台要先知道"你是谁";decode—— 设备数据 → 平台格式,把乱七八糟的字节转成 JSON;encode—— 平台指令 → 设备格式,把平台 JSON 拼成设备能识别的字符。
下面是一份可以直接抄的骨架代码:
/**
* 预解码:提取设备身份标识
* @param {String} payload 设备原始报文(二进制、JSON、16 进制字符串等)
* @param {String} context 一般是 topic
* @returns {Object} {deviceId, productKey}
*/
var preDecode = (payload, context) => {
var deviceId;
context = StrUtil.str(context);
// topic 格式五花八门:cat4_lock/p2p/GID_lock@@@123456789012345 / nexiot/up/zxabc123456
if (context.contains("/")) {
var parts = context.split("\\/");
deviceId = parts[parts.length - 1];
}
// deviceId 也可能藏在消息体里
else if (JSONUtil.isTypeJSON(payload)) {
payload = toJson(payload);
deviceId = payload.deviceId;
}
if (deviceId == null) {
return null; // 没解出来就直接丢弃
}
return { "deviceId": deviceId };
};
/**
* 编码:平台指令转设备格式
*/
var encode = payload => {
var result = "";
// TODO:实现编码逻辑
return result;
};
/**
* 解码:设备数据转平台格式
* @returns {Array} 平台标准消息数组
*/
var decode = payload => {
var rs = [];
rs.push({
"messageType": "PROPERTIES",
"properties": { "slaveAddress": 9 },
"subDevice": true
});
rs.push({
"messageType": "EVENT",
"event": "",
"data": {}
});
return rs;
};
// 调试示例(正式使用请删除)
// return preDecode("");
// return encode({});
// return decode('{"temperture": 25.3}');2.1 上报一条属性
{
"messageType": "PROPERTIES",
"properties": {
"sourcePayload": "02240000001302031003CA00DE000000000000000000000058",
"humidity": 97,
"temperature": 22.2,
"illumination": 88
}
}2.2 上报一条事件
{
"messageType": "EVENT",
"event": "online",
"data": {}
}2.3 属性 + 事件一起报(推荐)
[
{ "messageType": "EVENT", "event": "online", "data": {} },
{ "messageType": "PROPERTIES", "properties": { "humidity": 97, "temperature": 22.2 } }
]三、一条上行消息的完整旅程
设备发出一条字节流,到平台落库 / 推送,平台内部经历的关键步骤如下:
对应到代码上:
- 异步入口:
AbstractUPService#asyncUP,每条消息一个traceId,便于全链路日志检索; - 设备实例化与上线:
AbstratIoTService#lifeCycleDevInstance; - 入库与影子:
AbstratIoTService#doLogMetadataAndShadow; - 订阅推送:
AbstratIoTService#doEventNameAndSubscribe。
没出现在数据库 / 影子里?先看这两步
preDecode是否成功返回了deviceId(否则整条消息直接丢弃);decode是否返回了带messageType的对象(没有messageType,平台无法分发)。
四、设备自动上线机制
只要平台收到任意一条该设备的消息,都会触发上线检查:
- 通过
preDecode找到设备; - 在 Redis 中检查两个 key:
offline:{productKey}{deviceId}—— 离线标志;online:{productKey}{deviceId}—— 上线锁(15 分钟原子锁)。
- 如果 DB 中状态为离线 或 存在离线标志,就尝试
setIfAbsent(onlineKey, "1"):- 抢到锁 → 触发
IoTDeviceActionAfterService.online(); - 抢不到 → 已有线程在处理,跳过。
- 抢到锁 → 触发
4.1 主动触发离线
在 decode 返回的事件里把 event 设为 offline,平台会自动调用 offline():
{
"messageType": "EVENT",
"event": "offline",
"data": {}
}默认离线时长
未配置时,超过 24 小时未收到任意消息会自动离线;可在 产品详情 → 端云配置 中调整。
五、messageType 全量枚举
平台在分发消息时,只认 messageType。一条 decode 返回的对象必须显式指定它:
| messageType | 用途 | 关键字段 | 触发动作 |
|---|---|---|---|
PROPERTIES | 设备属性上报(最常用) | properties | 写日志 + 更新影子 |
EVENT | 设备事件(上线、报警等) | event / data | 写日志 + 推送订阅;event=offline 触发下线 |
FUNCTIONS | 指令回执(设备对平台下发的应答) | function / data | 关联回执到下行命令 |
未指定 messageType 会被怎么处理
平台兜底当作 PROPERTIES,但强烈建议显式声明,避免事件被错误归类到属性。
六、完整消息字段速查
{
"messageType": "PROPERTIES", // 必填
"properties": { }, // PROPERTIES 时使用
"event": "online", // EVENT 时使用
"data": { }, // 事件数据 / 函数回执数据
"function": "queryLightState", // FUNCTIONS 时回填
"tags": { }, // 自定义标签,随消息透传
"subDevice": { "slaveAddress": 9 },// 子设备识别(见下节)
"ts": "1723200000000", // 设备侧时间戳(毫秒),不填用平台接收时间
"replyPayload": "OK", // 立即下发到设备的回复内容(TCP/MQTT)
"downTopic": "nexiot/down/xxx" // 指定回复主题(仅 MQTT)
}字段最终由 AbstratIoTService#buildCodecNotNullBean 拼装成 BaseUPRequest,保留字段以源码为准。
七、网关与子设备上报
当一条物理报文里包含「网关 + 子设备」时,网关侧驱动只需识别出从设备地址,子设备数据通过 subDevice 透传:
var pts = {
"messageType": "PROPERTIES",
"properties": { "sourcePayload": payload },
"subDevice": { "slaveAddress": 9 } // 子设备 ID 字段,由产品配置决定
};
return [pts];平台处理顺序:
- 网关自身正常入库;
- 根据
subDevice的字段查找已注册子设备; - 用子设备所属产品的驱动再次解析报文(也可在 magic-script 中通过
ICodecService.decode(productKey, payload)主动触发)。
子设备没数据怎么办
- 必须在「子设备管理」中预先注册子设备,并设置识别字段(如
slaveAddress); - 网关驱动里要确保返回
subDevice,且字段名与产品配置一致。
八、调试模式:不污染生产数据
BaseUPRequest 上有 debug 字段,置 true 后:
- 不写设备日志;
- 不更新影子;
- 仍然走完编解码、订阅推送链路。
适合在 IDE 中验证脚本时使用。具体调试方法见 05. IDE 调试器使用。
九、高级特性
对于 JT808、IEC104 等复杂自定义协议,必须有完整交互(类似 TCP 三次握手)。
9.1 收到消息立即回复
在返回对象上加 replyPayload,平台会自动下发,支持 TCP / MQTT:
{
"messageType": "PROPERTIES",
"properties": { "temperature": 22.2 },
"replyPayload": "7E0001000200000123456789ABCD7E"
}数组多条都挂 replyPayload?
理论支持但不建议,会按顺序逐条下发,建议只在最后一条挂。
9.2 回复特定主题(仅 MQTT)
平台支持在 产品详情 → 端云配置 里设置主题模板,内置 #{deviceId} / #{productKey} 等通配符;也可以在驱动里直接写死:
{
"messageType": "PROPERTIES",
"properties": { "temperture": 25.3 },
"replyPayload": "OK",
"downTopic": "nexiot/down/zxabc123456"
}操作步骤
- 进入
所有产品→ 选择对应产品 →产品详情; - 找到
端云配置,点击编辑; - 填入想要的主题模板,保存。

9.3 跨产品 / 跨设备联动
var decode = (payload, context) => {
// 1. 调另一款产品的驱动重新解析(如电表网关 → 子电表)
var subList = ICodecService.decode("HnDdSnj1kiQg", payload, null, UPRequest.class);
// 2. 借助影子读取上一次累计值,算增量
var lastKwh = shadow_getVal(context.shadow, "totalKwh");
var delta = currentKwh - (lastKwh || 0);
return [{
"messageType": "PROPERTIES",
"properties": { "totalKwh": currentKwh, "deltaKwh": delta }
}];
};注意
跨产品调用属于"高级用法",会引入耦合。优先通过子设备机制(见 网关与子设备上报)解决网关-子设备场景。
十、内置函数库速查
所有函数均来自 cn.universal.core.engine.extend 包,由 FunctionRegistrar 自动注册到脚本运行时,无需 import 即可调用。
10.1 一分钟速查表
| 想做什么 | 直接用 |
|---|---|
| HEX ↔ 字符串 | hex_toAscii(hex) / hex_fromAscii(str) |
| HEX 反转 / 补齐 | hex_reverse(hex) / hex_padLeft(hex, byteLen, fill) |
| 字符串 → JSON | toJson(str) / toJsonArray(str) |
| JSON → 字符串 | toJsonString(obj) |
| Base64 | toBase64Encode(s) / toBase64Decode(s) |
| 类型判断 | isJson / isJsonArray / isMap / isList / isEmpty / isNumber |
| Modbus 地址 | modbus_addr("01") |
| Modbus CRC16 | modbus_crc16(hex) / modbus_crc16Verify(hex, crc) |
| Modbus 识别 | isModbusTcp(hex) / isModbusRtu(hex) |
| CS 校验 | cs_check(hex, mod) / cs_verify(hex, cs, mod) |
| DL/T645 | hex_645AddrEncode/Decode / hex_add33 / hex_sub33 / hex_645SimpleParse |
| 设备影子 | shadow_getVal / shadow_getTs / shadow_getAllVals / shadow_getAllTs |
| 信号强度换档 | csq(0~31) / rsrpSnr(rsrp, snr) |
10.2 类型判断(TypeCheckFunctions)
isJson("{\"a\":1}"); // true
isJsonArray("[1,2]"); // true
isJsonObject(payload); // 是否为 JSONObject
isMap(payload); // 是否为 Map
isList(payload); // 是否为集合或数组
isEmpty(str); // null 或空字符串
isNotEmpty(str);
isNumber("123.45"); // 数字字符串易错点
isJson 与 isJsonArray 互斥:isJson("[1,2]") 返回 false。要识别"是不是 JSON 结构"请并集使用。
10.3 HEX 工具(HexFunctions)
// 编/解码
hex_toAscii("313233"); // "123"
hex_fromAscii("ABC"); // "414243"
hex_toAsciiSafe(hex); // 不可见字符自动转义
hex_isAsciiPrintable(hex);
hex_toGBK(hex); // 按 GBK 解码
hex_fromGBK("中文"); // 中文 → HEX(GBK)
hex_stripSpaces(hex); // 去掉空白与 0x 前缀
// 排版
hex_reverse("01020304"); // "04030201"
hex_padLeft("01", 4, "00"); // "00000001"
// DL/T645 专用
hex_add33(hex);
hex_sub33(hex);
hex_645AddrEncode("123456789012");
hex_645AddrDecode(hex);
hex_diReverse(di);
hex_645SimpleParse(frame);
// 校验
crc_cs(hex); // 累加和校验
crc_csVerify(hex, expectCs);易错点
所有 HEX 函数要求长度为偶数。奇数长度返回 null,先用 hex_stripSpaces 清洗。
10.4 Modbus 工具(ModbusFunctions)
modbus_addr("1"); // "01",自动补两位
modbus_crc16("01030070000 1"); // 计算 RTU CRC16
modbus_crc16Verify(frame, crc);
// 直接拼帧
modbus_buildReadHolding("01", "0070", "0001"); // 读保持寄存器
modbus_buildWriteSingle("01", "00C2", "0041"); // 写单寄存器
modbus_parseReadHoldingResp(respHex);
// 报文识别
isModbusTcp(hex);
isModbusRtu(hex);10.5 CS 求和校验(ProtocolFunctions)
cs_check("AABBCC", 256); // 按字节求和取模
cs_verify("AABBCC", "31", 256);10.6 类型转换(ConvertFunctions)
// JSON
toJson("{\"a\":1}");
toJsonArray("[1,2]");
toJsonString(obj);
toJsonStr(obj); // 兼容旧名
// Base64
toBase64Encode("hello");
toBase64Decode("aGVsbG8=");
base64Encode / base64Decode; // 兼容旧名
// 基础类型
toInt("12");
toLong("12");
toString(obj); // null 时返回 "null"
toStringOrEmpty(obj); // null 返回 ""10.7 JSONObject 增强方法(JSONObjectExtension)
直接挂在 JSONObject 实例上,用点号调用:
var obj = toJson(payload);
obj.getStr("name"); // 缺省 ""
obj.getLong("ts"); // 缺省 0
obj.getDouble("temp");
obj.getBool("online");
obj.getOr("level", "INFO"); // 缺省值
obj.getByPath("a.b.c"); // 嵌套路径
obj.toMap(); // 子节点转 Map10.8 业务工具(UnivFunctions)
csq(20); // 信号强度档位
rsrpSnr(-90, 10); // NB-IoT 信号档位
date_format("yyyy-MM-dd HH:mm:ss"); // 当前时间格式化
date_format_long("2025-01-01 00:00:00");
timestampToDate("1735689600000");
randomSign();
isEmpty(value);
reverse("abc"); // "cba"
stringToAscii("AB"); // "6566"
asciiToString("6566"); // "AB"
stringToUTF8(str);
urlDecode(str);10.9 设备影子读取(ShadowFunctions)
shadow_getVal(shadow, "voltage"); // 当前电压
shadow_getTs(shadow, "voltage"); // 上次上报时间戳
shadow_getAllVals(shadow);
shadow_getAllTs(shadow);用途
跨字段计算(如增量功耗)、跨设备联动、补数。
10.10 自定义函数
在 cn.universal.core.engine.extend 包下新建一个类即可:
@Component
public class MyFunctions implements IdeMagicFunction {
@Function
@Comment("自定义业务函数")
public String my_helper(@Comment(name = "x", value = "入参") String x) {
return x.toUpperCase();
}
}FunctionRegistrar 在 Spring 启动时自动注册,重启服务即可在脚本中使用 my_helper(...)。
十一、实战案例
11.1 案例 A:DTU 心跳 + JSON + Modbus 透传
典型场景:4G/有线 DTU 网关把多种设备的报文一并通过 TCP 转发上来,平台需要分流处理。
源码可参考:cn-tcp-protocol/src/main/resources/ms/demo1.ms。
/**
* 预解码:从 IMEI 提取设备号
* payload 是 ASCII 编码的 16 进制串,例如 "383631..." → "861..."
*/
var preDecode = (payload, context) => {
var IMEI = hex_toAscii(payload);
var rs = { "productKey": "lWAUlCmM53UF" };
if (IMEI.length() === 15 && IMEI.startsWith("86")) {
rs.deviceId = IMEI;
return rs;
}
return null; // 无法识别就丢弃
};
/**
* 解码:分流三类报文
* 1) JSON 原文 → 直接落属性
* 2) HEART|... 心跳 → 解析 IMEI/CSQ/GPS 等
* 3) Modbus RTU/TCP → 提取从站地址,转交子设备
*/
var decode = (payload, context) => {
const pts = {
"messageType": "PROPERTIES",
"properties": { "sourcePayload": payload }
};
const hexSource = hex_toAscii(payload);
if (isJson(hexSource)) {
pts.properties = toJson(hexSource);
} else if (hexSource.startsWith("HEART")) {
// HEART|$(IMEI)|$(CSQ)|$(TIMESTAMP)|$(GPS)|$(ICCID)|$(IMSI)
const parts = hexSource.split("\\|");
if (parts.length >= 7) {
pts.properties = {
"imei": parts[1],
"csq": Integer.parseInt(parts[2]),
"timestamp": parts[3],
"gps": parts[4],
"iccid": parts[5],
"imsi": parts[6]
};
const gps = parts[4].split(",");
if (gps.length === 2) {
pts.properties.longitude = gps[0];
pts.properties.latitude = gps[1];
}
}
} else if (isModbusTcp(hexSource)) {
pts.subDevice = { "slaveAddress": Integer.parseInt(hexSource.substring(12, 14)) };
return pts;
} else if (isModbusRtu(hexSource)) {
pts.subDevice = { "slaveAddress": Integer.parseInt(hexSource.substring(0, 2)) };
return pts;
} else {
pts.properties.hexPayload = hexSource;
}
return [pts];
};关键点
productKey写死:DTU 设备 IMEI 全局唯一,可省去查产品的步骤;subDevice.slaveAddress:识别出 Modbus 从站后,平台会自动找到对应子设备并继续解析;- 多类型分流:
isJson / startsWith / isModbusTcp / isModbusRtu串联使用是最常见的写法。
11.2 案例 B:Modbus 灯光控制器(属性 + 指令)
典型场景:485 灯光控制器,平台既要解析"颜色 + 闪烁"组合状态,也要下发控制指令。
源码可参考:cn-tcp-protocol/src/main/resources/ms/demo2.ms。
B.1 解码:从功能码 03/06 解析灯光状态
var preDecode = (payload) => {
const clean = StrUtil.removeAll(payload, ' ');
return {
"deviceId": clean.substring(2, 14), // 第 2-14 位为电表地址
"productKey": "HnDdSnj1kiQg"
};
};
var decode = (payload, context) => {
payload = hex_toAscii(payload);
const payloadHex = payload.substring(12, payload.length());
const result = [];
const props = {
"messageType": "PROPERTIES",
"properties": { "sourcePayload": payload, "protectionLevel": "IPX54" },
"data": {}
};
if (payloadHex.length() < 8) return props;
const funcCode = payloadHex.substring(2, 4);
const dataHex = payloadHex.substring(6);
if (funcCode === "03" || funcCode === "06") {
const dl = Integer.parseInt(dataHex.substring(2, 4), 16);
const colorMap = { 0:"不亮", 1:"红", 2:"绿", 3:"黄", 4:"蓝", 5:"紫", 6:"青", 7:"白" };
const flashMap = { 4:"常亮", 5:"慢闪", 6:"爆闪" };
const color = dl & 0x0F;
const flash = (dl >> 4) & 0x0F;
props.properties.currentLightColor = color;
props.properties.currentFlashMode = flashMap[flash] ? flash : 0;
props.properties.lightStatus = flashMap[flash] || "熄灭";
props.data.replyFunction = funcCode === "03" ? "queryLightState" : "controlLight";
result.push(props);
}
return result;
};B.2 编码:根据 function 名分发指令
var encode = payload => {
const obj = toJson(payload);
const addr = modbus_addr(obj.data.slaveAddress || "01");
const func = obj.function;
const params = obj.data;
let body = "";
if (func === "controlLight") {
body = addr + "0600C200" + (params.flashMode || "4") + (params.lightColor || "0");
} else if (func === "queryLightState") {
body = addr + "0300700001";
} else if (func === "resetDevice") {
body = addr + "06000C0000";
} else if (func === "cmd") {
body = (params.hexCmd || "").substring(0, params.hexCmd.length() - 4);
}
return body ? (body + modbus_crc16(body)).toUpperCase() : "";
};这段代码示范了三件事
- CRC16 自动追加 —— 业务侧只关心数据域,校验交给
modbus_crc16; - function 名分发 —— 物模型里定义的每个功能名对应一段拼帧逻辑;
- 指令透传 ——
cmd功能让运维人员可以直接灌入裸 HEX 指令,不必改驱动。
11.3 案例 C:带应答的 TCP/MQTT(JT808、IEC104 类)
需要"收到一条消息立刻回一条"时,在返回对象上加 replyPayload 即可:
return [{
"messageType": "PROPERTIES",
"properties": { "temperature": 22.2 },
"replyPayload": "7E0001000200000123456789ABCD7E" // 立即下发
}];如果是 MQTT 协议想回到特定主题:
return [{
"messageType": "PROPERTIES",
"properties": { "temp": 25.3 },
"replyPayload": "OK",
"downTopic": "nexiot/down/" + deviceId // 通配符也行
}];注意
- 数组里多条都带
replyPayload会按顺序逐条下发,建议只在最后一条挂; downTopic仅 MQTT 协议生效,TCP/UDP 忽略。
11.4 案例 D:跨产品 / 跨设备联动
var decode = (payload, context) => {
// 1. 调另一款产品的驱动重新解析(如电表网关 → 子电表)
var subList = ICodecService.decode("HnDdSnj1kiQg", payload, null, UPRequest.class);
// 2. 借助影子读取上一次的累计值,算增量
var lastKwh = shadow_getVal(context.shadow, "totalKwh");
var delta = currentKwh - (lastKwh || 0);
return [{
"messageType": "PROPERTIES",
"properties": { "totalKwh": currentKwh, "deltaKwh": delta }
}];
};十二、调试三板斧
12.1 脚本末尾直接 return
任何 .ms 文件结尾追加一行测试:
return decode("303030323030303030303036303930363030433230303431");IDE 即可立刻看到返回值,等同于本地单测。
12.2 使用平台 IDE 调试器
参考 05. IDE 调试器使用,可设置断点、查看变量、单步运行。
12.3 打开调试模式不污染数据
在 IDE 中以 debug=true 触发,编解码链路完整执行,但不会写日志、不更新影子。
十三、常见踩坑 Top 10
自查清单
| # | 现象 | 原因 / 解法 |
|---|---|---|
| 1 | 设备完全没消息 | preDecode 返回 null,整条丢弃 |
| 2 | 上线了但属性空 | decode 没返回 messageType=PROPERTIES |
| 3 | 影子里有,前端没有 | 字段名与物模型不一致 |
| 4 | HEX 函数返回 null | 长度奇数,先 hex_stripSpaces |
| 5 | CRC 不通过 | 大小写不一致,统一 .toUpperCase() |
| 6 | 子设备数据落到网关 | 没注册子设备 / subDevice 字段名错 |
| 7 | 中文乱码 | 用 hex_fromGBK / hex_toGBK 替代 ASCII 系列 |
| 8 | toJson 抛异常 | 先 isJson(s) 再转 |
| 9 | 应答重复下发 | 数组多条都挂了 replyPayload,只留一条 |
| 10 | event=offline 不生效 | messageType 必须是 EVENT,且 event 字段大小写敏感 |