IoT企业物联网平台,从设备端到云端业务系统全链路开发实战( 二 )


IoT企业物联网平台,从设备端到云端业务系统全链路开发实战

文章插图
 
当设备有数据上报后,我们就可以在表格存储的iot_data表中实时看到存储的数据了 。如下图:
IoT企业物联网平台,从设备端到云端业务系统全链路开发实战

文章插图
 
在企业实例的日志服务中,我们可以查看到完整的流转日志,协助我们排查数据链路异常 。如下图:
IoT企业物联网平台,从设备端到云端业务系统全链路开发实战

文章插图
 
业务服务器实时接收数据IoT场景中有些数据需要业务系统实时处理,这时我们可以通过服务端订阅AMQP方式,实时接收设备上报的数据 。
首先,我们要在企业实例的服务端订阅中,创建一个新的消费组,用来接收特定类型的消息 。如下图:
IoT企业物联网平台,从设备端到云端业务系统全链路开发实战

文章插图
 
然后,我们在云产品流转中创建规则引擎,编写数据处理SQL,配置流转目的地为上面创建的服务端订阅消费组 。
IoT企业物联网平台,从设备端到云端业务系统全链路开发实战

文章插图
 
最后,我们在业务服务器编写程序,使用阿里云账号的AccessKey与IoT企业实例建立AMQP连接,参考代码如下:
public static void main(String[] args) throws Exception {//参数说明String accessKey = "子账号accessKey";String accessSecret = "子账号accessSecret";String consumerGroupId = "消费组Id";String iotInstanceId = "企业实例Id";long timeStamp = System.currentTimeMillis();//签名方法:支持hmacmd5,hmacsha1和hmacsha256String signMethod = "hmacsha1";String clientId = "ecs_"+System.currentTimeMillis();String userName = clientId + "|authMode=aksign"+ ",signMethod=" + signMethod+ ",timestamp=" + timeStamp+ ",authId=" + accessKey+ ",iotInstanceId=" + iotInstanceId+ ",consumerGroupId=" + consumerGroupId+ "|";//password组装String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;String password = doSign(signContent,accessSecret, signMethod);//按照qpid-jms的规范,组装连接URL 。String connectionUrl = "failover:(amqps://"+iotInstanceId+".amqp.iothub.aliyuncs.com:5671?amqp.idleTimeout=80000)"+ "?failover.reconnectDelay=30";Hashtable<String, String> hashtable = new Hashtable<>();hashtable.put("connectionfactory.SBCF",connectionUrl);hashtable.put("queue.QUEUE", "default");hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.Apache.qpid.jms.jndi.JmsInitialContextFactory");Context context = new InitialContext(hashtable);ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");Destination queue = (Destination)context.lookup("QUEUE");// Create ConnectionConnection connection = cf.createConnection(userName, password);((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);// Create Session// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();// Create Receiver LinkMessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(messageListener);}在AMQP的回调中处理收到的业务数据,参考代码如下:
private static MessageListener messageListener = new MessageListener() {@Overridepublic void onMessage(Message message) {try {//如果要对收到的消息做耗时的处理,请异步处理,确保这里不要有耗时逻辑 。byte[] body = message.getBody(byte[].class);String content = new String(body);String topic = message.getStringProperty("topic");String messageId = message.getStringProperty("messageId");String tag = message.getStringProperty("tag");logger.info("receive message"+ ",n topic = " + topic+ ",n messageId = " + messageId+ ",n tag = " + tag+ ",n content = " + content+"n");System.out.println();} catch (Exception e) {e.printStackTrace();}}};启动业务服务器后,我们看到不断有设备数据流转过来,如下图:
IoT企业物联网平台,从设备端到云端业务系统全链路开发实战

文章插图
 
在企业实例的控制台,服务端订阅中,我们也可以看到消费组的运行情况,包括消费速率,消息堆积量,消费客户端列表,如下图:
IoT企业物联网平台,从设备端到云端业务系统全链路开发实战

文章插图
 
在企业实例的控制台,日志服务中,我们可以看到完整的消息流转日志,如下图:


推荐阅读