package com.huawei.iot.amqp.jms;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.transports.TransportOptions;
import org.apache.qpid.jms.transports.TransportSupport;
文档版本 29 (2021-07-22) 版权所有 © 华为技术有限公司 110
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.URI;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class HwIotAmqpJavaClientDemo{
//异步线程池,参数可以根据业务特点作调整,也可以用其他异步方式来处理。
private final static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5000));
public static void main(String[] args) throws Exception{
//连接凭证接入键值。
String accessKey = "${yourAccessKey}";
long timeStamp = System.currentTimeMillis();
//UserName组装方法,请参见文档:AMQP客户端接入说明。
String userName = "accessKey=" + accessKey + "|timestamp=" + timeStamp;
//连接凭证接入码。
String password = "${yourAccessCode}";
//按照qpid-jms的规范,组装连接URL。
String connectionUrl = "amqps://${UUCID}.iot-amqps.cn-north-4.myhuaweicloud.com:5671?
amqp.vhost=default&amqp.idleTimeout=8000&amqp.saslMechanisms=PLAIN";
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.HwConnectionURL", connectionUrl);
//队列名,可以使用默认队列DefaultQueue String queueName = "${yourQueue}";
hashtable.put("queue.HwQueueName", queueName);
hashtable.put(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
JmsConnectionFactory cf = (JmsConnectionFactory) context.lookup("HwConnectionURL");
//同一个链接可创建多个queue,与前面queue.HwQueueName作好配对就行 Destination queue = (Destination) context.lookup("HwQueueName");
//信任服务端
TransportOptions to = new TransportOptions(); to.setTrustAll(true);
cf.setSslContext(TransportSupport.createJdkSslContext(to));
// 创建连接
Connection connection = cf.createConnection(userName, password);
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// 创建 Session
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// 创建 Receiver Link
MessageConsumer consumer = session.createConsumer(queue);
//处理消息有两种方式
// 1,主动拉数据(推荐),参照receiveMessage(consumer)
// 2, 添加监听,参照consumer.setMessageListener(messageListener), 服务端主动推数据给客户 端,但得考虑接受的数据速率是客户能力能够承受住的
receiveMessage(consumer);
// consumer.setMessageListener(messageListener);
}
private static void receiveMessage(MessageConsumer consumer) throws JMSException{
while (true){
e.printStackTrace();
} } }
private static MessageListener messageListener = new MessageListener(){
@Override
private static void processMessage(Message message) { try {
String body = message.getBody(String.class); String content = new String(body);
System.out.println("receive an message, the content is " + content);
} catch (Exception e){
System.out.println("processMessage occurs error: " + e.getMessage());
e.printStackTrace();
} }
private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener(){
/**
* 连接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI){
System.out.println("onConnectionEstablished, remoteUri:" + remoteURI);
} /**
* 尝试过最大重试次数之后,最终连接失败。
*/
@Override
public void onConnectionFailure(Throwable error){
System.out.println("onConnectionFailure, " + error.getMessage());
} /**
* 连接中断。
*/
@Override
public void onConnectionInterrupted(URI remoteURI){
System.out.println("onConnectionInterrupted, remoteUri:" + remoteURI);
} /**
* 连接中断后又自动重连上。
*/
@Override
public void onConnectionRestored(URI remoteURI){
System.out.println("onConnectionRestored, remoteUri:" + remoteURI);
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope){
System.out.println("onInboundMessage, " + envelope);
文档版本 29 (2021-07-22) 版权所有 © 华为技术有限公司 112
}
@Override
public void onSessionClosed(Session session, Throwable cause){
System.out.println("onSessionClosed, session=" + session + ", cause =" + cause);
}
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause){
System.out.println("MessageConsumer, consumer=" + consumer + ", cause =" + cause);
}
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause){
System.out.println("MessageProducer, producer=" + producer + ", cause =" + cause); 码开发请参考Java Demo使用说明。
下发设备消息关键参数说明:
X-Auth-Token 必选 String Header 用户Token。通过调用IAM服务 获取 IAM用户Token接口获取,接口返回 的响应消息头中“X-Subject-Token”就是需要获取的用户 Token。简要的获取方法样例请参见 Token认证。
Instance-Id 可选 String Header 实例ID。物理多租下各实例的唯一 标识,一般华为云租户无需携带该参 数,仅在物理多租场景下从管理面访 问API时需要携带该参数。
project_id 必选 String Path 项目ID。获取方法请参见 获取项目 ID。
device_id 必选 String Path 填写迁移设备的设备ID。
message 必选 String Body 设备执行的消息,字符串,具体格式 需要应用和设备约定。
topic_full_
name 可选 String(
128) Body 迁移设备的topic。
接口样例:
POST https://{Endpoint}/v5/iot/{project_id}/devices/{device_id}/messages Content-Type: application/json
X-Auth-Token: ********
Instance-Id: ********
{ "message": "{\"switch\":\"off\"}",
文档版本 29 (2021-07-22) 版权所有 © 华为技术有限公司 113
"topic_full_name": "/aircondition/cmd"
}
JAVA核心代码样例如下:
public class DeviceMessage {
public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, IOException {
String token = Authentication.getToken();
Map<String, String> headers = new HashMap<String, String>();
headers.put("Content-Type", "application/json");
headers.put("X-Auth-Token", token);
Message message = new Message();
message.setMessage("{\"switch\" :\"off\"}");
message.setTopic_full_name("/aircondition/cmd");
String projectId = "11111";
String deviceId = "5fae45e358115902ce609882_20201113";
String url = "https://iotda.cn-north-4.myhuaweicloud.com/v5/iot/%s/devices/%s/messages";
url = String.format(url, projectId, deviceId);
HttpUtils httpUtils = new HttpUtils();
httpUtils.initClient();
StreamClosedHttpResponse httpResponse = httpUtils.doPost(url, headers, JsonUtils.Obj2String(message));
System.out.println(httpResponse.getContent());
}