• 沒有找到結果。

package com.huawei.iot.amqp.jms;

import org.apache.qpid.jms.JmsConnection;

import org.apache.qpid.jms.JmsConnectionFactory;

import org.apache.qpid.jms.JmsConnectionListener;

import org.apache.qpid.jms.message.JmsInboundMessageDispatch;

import org.apache.qpid.jms.transports.TransportOptions;

import org.apache.qpid.jms.transports.TransportSupport;

// 文档版本 29 (2021-07-22) 版权所有 © 华为技术有限公司 110

import javax.jms.*;

import javax.naming.Context;

import javax.naming.InitialContext;

import java.net.URI;

import java.util.Hashtable;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

public class HwIotAmqpJavaClientDemo{

// Thread pool for asynchronous message handling. The sizing parameters can be tuned to the
// workload, or a different asynchronous mechanism can be used instead.
private static final ExecutorService executorService = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        Runtime.getRuntime().availableProcessors() * 2,
        60,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(5000));

public static void main(String[] args) throws Exception{

//连接凭证接入键值。

String accessKey = "${yourAccessKey}";

long timeStamp = System.currentTimeMillis();

//UserName组装方法,请参见文档:AMQP客户端接入说明。

String userName = "accessKey=" + accessKey + "|timestamp=" + timeStamp;

//连接凭证接入码。

String password = "${yourAccessCode}";

//按照qpid-jms的规范,组装连接URL。

String connectionUrl = "amqps://${UUCID}.iot-amqps.cn-north-4.myhuaweicloud.com:5671?

amqp.vhost=default&amqp.idleTimeout=8000&amqp.saslMechanisms=PLAIN";

Hashtable<String, String> hashtable = new Hashtable<>();

hashtable.put("connectionfactory.HwConnectionURL", connectionUrl);

//队列名,可以使用默认队列DefaultQueue String queueName = "${yourQueue}";

hashtable.put("queue.HwQueueName", queueName);

hashtable.put(Context.INITIAL_CONTEXT_FACTORY,

"org.apache.qpid.jms.jndi.JmsInitialContextFactory");

Context context = new InitialContext(hashtable);

JmsConnectionFactory cf = (JmsConnectionFactory) context.lookup("HwConnectionURL");

//同一个链接可创建多个queue,与前面queue.HwQueueName作好配对就行 Destination queue = (Destination) context.lookup("HwQueueName");

//信任服务端

TransportOptions to = new TransportOptions(); to.setTrustAll(true);

cf.setSslContext(TransportSupport.createJdkSslContext(to));

// 创建连接

Connection connection = cf.createConnection(userName, password);

((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);

// 创建 Session

// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。

// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

connection.start();

// 创建 Receiver Link

MessageConsumer consumer = session.createConsumer(queue);

//处理消息有两种方式

// 1,主动拉数据(推荐),参照receiveMessage(consumer)

// 2, 添加监听,参照consumer.setMessageListener(messageListener), 服务端主动推数据给客户 端,但得考虑接受的数据速率是客户能力能够承受住的

receiveMessage(consumer);

// consumer.setMessageListener(messageListener);

}

private static void receiveMessage(MessageConsumer consumer) throws JMSException{

while (true){

e.printStackTrace();

} } }

private static MessageListener messageListener = new MessageListener(){

@Override

/**
 * Handles one received message: reads its body as a {@code String} and prints it. Any failure
 * is logged and swallowed so that a single bad message does not stop the caller.
 *
 * @param message the received JMS message
 */
private static void processMessage(Message message) {
    try {
        // Use the body directly: wrapping it in new String(...) only copied the value and
        // threw a NullPointerException when the message carried no body.
        String content = message.getBody(String.class);
        System.out.println("receive an message, the content is " + content);
    } catch (Exception e) {
        System.out.println("processMessage occurs error: " + e.getMessage());
        e.printStackTrace();
    }
}

private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener(){

/**

* 连接成功建立。

*/

@Override

public void onConnectionEstablished(URI remoteURI){

System.out.println("onConnectionEstablished, remoteUri:" + remoteURI);

} /**

* 尝试过最大重试次数之后,最终连接失败。

*/

@Override

public void onConnectionFailure(Throwable error){

System.out.println("onConnectionFailure, " + error.getMessage());

} /**

* 连接中断。

*/

@Override

public void onConnectionInterrupted(URI remoteURI){

System.out.println("onConnectionInterrupted, remoteUri:" + remoteURI);

} /**

* 连接中断后又自动重连上。

*/

@Override

public void onConnectionRestored(URI remoteURI){

System.out.println("onConnectionRestored, remoteUri:" + remoteURI);

}

@Override

public void onInboundMessage(JmsInboundMessageDispatch envelope){

System.out.println("onInboundMessage, " + envelope);

文档版本 29 (2021-07-22) 版权所有 © 华为技术有限公司 112

}

@Override

public void onSessionClosed(Session session, Throwable cause){

System.out.println("onSessionClosed, session=" + session + ", cause =" + cause);

}

@Override

public void onConsumerClosed(MessageConsumer consumer, Throwable cause){

System.out.println("MessageConsumer, consumer=" + consumer + ", cause =" + cause);

}

@Override

public void onProducerClosed(MessageProducer producer, Throwable cause){

System.out.println("MessageProducer, producer=" + producer + ", cause =" + cause); 码开发请参考Java Demo使用说明。

下发设备消息关键参数说明:

X-Auth-Token 必选 String Header 用户Token。通过调用IAM服务“获取IAM用户Token”接口获取,接口返回的响应消息头中“X-Subject-Token”就是需要获取的用户Token。简要的获取方法样例请参见“Token认证”。

Instance-Id 可选 String Header 实例ID。物理多租下各实例的唯一标识,一般华为云租户无需携带该参数,仅在物理多租场景下从管理面访问API时需要携带该参数。

project_id 必选 String Path 项目ID。获取方法请参见“获取项目ID”。

device_id 必选 String Path 填写迁移设备的设备ID。

message 必选 String Body 设备执行的消息,字符串,具体格式需要应用和设备约定。

topic_full_name 可选 String(128) Body 迁移设备的topic。

接口样例:

POST https://{Endpoint}/v5/iot/{project_id}/devices/{device_id}/messages
Content-Type: application/json

X-Auth-Token: ********

Instance-Id: ********

{
  "message": "{\"switch\":\"off\"}",
  "topic_full_name": "/aircondition/cmd"
}

文档版本 29 (2021-07-22) 版权所有 © 华为技术有限公司 113

JAVA核心代码样例如下:

public class DeviceMessage {

/**
 * Sends one device message over HTTP and prints the response content.
 *
 * <p>Steps: obtain a token from {@code Authentication.getToken()}, build the request headers,
 * fill a {@code Message} with the payload and topic, format the endpoint URL with the project
 * and device IDs, POST the JSON-serialized message through {@code HttpUtils}, and print
 * {@code getContent()} of the response.
 *
 * @param args command-line arguments (not used)
 * @throws KeyManagementException declared by this sample; presumably raised by the HTTP client
 *     or token helper (not visible here)
 * @throws NoSuchAlgorithmException declared by this sample; presumably raised by the HTTP
 *     client or token helper (not visible here)
 * @throws IOException declared by this sample; presumably raised by the HTTP call (not visible
 *     here)
 */
public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, IOException {
    // Authentication comes first, exactly as in the request flow described by the sample.
    final String authToken = Authentication.getToken();

    final Map<String, String> requestHeaders = new HashMap<>();
    requestHeaders.put("Content-Type", "application/json");
    requestHeaders.put("X-Auth-Token", authToken);

    // Payload: the message string and the topic it is delivered on.
    final Message payload = new Message();
    payload.setMessage("{\"switch\" :\"off\"}");
    payload.setTopic_full_name("/aircondition/cmd");

    // Fill the project and device IDs into the endpoint template.
    final String projectId = "11111";
    final String deviceId = "5fae45e358115902ce609882_20201113";
    final String endpoint = String.format(
            "https://iotda.cn-north-4.myhuaweicloud.com/v5/iot/%s/devices/%s/messages",
            projectId,
            deviceId);

    final HttpUtils client = new HttpUtils();
    client.initClient();

    final String requestBody = JsonUtils.Obj2String(payload);
    final StreamClosedHttpResponse response = client.doPost(endpoint, requestHeaders, requestBody);
    System.out.println(response.getContent());
}