使用activeMQ进行android推送
activeMQ下载地址:http://activemq.apache.org/download.html
下载后是一个压缩包:apache-activemq-5.9.0-bin.zip
启动方式:
解压缩,进入apache-activemq-5.9.0-bin\apache-activemq-5.9.0\bin,双击activemq.bat,即可启动activeMQ服务
启动之后:
android客户端推送采用mqtt(paho-mqtt-client-1.0.1.jar),依赖包见附件
但是为了测试,我写了一个swing图形界面,充当手机客户端,依赖的jar包仍然是paho-mqtt-client-1.0.1.jar.界面如下:
使用方法:点击[启动]按钮,开始接收推送消息
对应的主类是:MqttSwing,用于接收推送消息.
我还写了一个发送推送消息的swing图形界面,充当推送后管系统,界面如下:
使用方法:点击[连接]按钮,才可以发送推送消息
对应的主类:PusherApp,用于发送推送消息.
核心代码介绍如下.
客户端连接activeMQ,建立连接(只有建立连接,才能接收到推送消息)
方法名:connect,做了两件事:(1)建立连接;(2)订阅主题(topic)
/*** * 客户端和activeMQ服务器建立连接 * @param BROKER_URL * @param clientId : 用于标识客户端,相当于ios中的device token * @param TOPIC * @param isCleanSession :false--可以接受离线消息; * @return 是否启动成功 */ private boolean connect(String BROKER_URL,String clientId,String TOPIC,boolean isCleanSession){ try { ComponentUtil.appendResult(resultTextPane, "connect time:"+TimeHWUtil.getCurrentMiniuteSecond(), true); mqttClient = new MqttClient(BROKER_URL, clientId, new MemoryPersistence()); MqttConnectOptions options= new MqttConnectOptions(); options.setCleanSession(isCleanSession);//mqtt receive offline message ComponentUtil.appendResult(resultTextPane, "isCleanSession:"+isCleanSession, true); options.setKeepAliveInterval(30); //推送回调类,在此类中处理消息,用于消息监听 mqttClient.setCallback(new MyCallBack(MqttSwing.this)); boolean isSuccess=false; try { mqttClient.connect(options);//CLIENT ID CAN NOT BE SAME isSuccess=true; } catch (Exception e) { if(isPrintException){ e.printStackTrace(); } } if(!isSuccess){ String message="连接失败,请检查client id是否重复了 或者activeMQ是否启动"; ComponentUtil.appendResult(resultTextPane, message, true); GUIUtil23.warningDialog(message); return false; }else{ //Subscribe to topics mqttClient.subscribe(new String[]{TOPIC,clientId}); System.out.println("topic:"+TOPIC+", "+(clientId)); ComponentUtil.appendResult(resultTextPane, "TOPIC:"+TOPIC+", "+(clientId), true); } } catch (MqttException e) { if(isPrintException){ e.printStackTrace();} GUIUtil23.errorDialog(e.getMessage()); return false; } return true; }
推送消息到来时的回调类:MyCallBack
package com.mqtt.hw.callback; import org.apache.commons.lang.StringEscapeUtils; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; import com.mqtt.hw.MqttSwing; import com.time.util.TimeHWUtil; public class MyCallBack implements MqttCallback { private MqttSwing mqttSwing; public MyCallBack(MqttSwing mqttSwing) { super(); this.mqttSwing = mqttSwing; } @Override public void connectionLost(Throwable cause) { } @Override public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception { System.out.println("messageArrived...."+TimeHWUtil.getCurrentMiniuteSecond()); String messageStr=StringEscapeUtils.unescapeHtml(new String(message.getPayload())); System.out.println("message:"+messageStr); this.mqttSwing.receiveMessage(messageStr); //使窗口处于激活状态 } @Override public void deliveryComplete(MqttDeliveryToken token) { } }
推送者与activeMQ建立连接:
/** * 初始化connection和session * * @throws Exception */ private void init(/* String mqIp,boolean transacted */) throws Exception { if (!DialogUtil.verifyTFEmpty(serverIpTextField, "服务器ip")) { return; } String transactedStr = transactedTextField.getText(); boolean transacted = false; if (ValueWidget.isNullOrEmpty(transactedStr)) { transacted = false; } else { transacted = Boolean.parseBoolean(transactedStr); } String message = "transacted:" + transacted; ComponentUtil.appendResult(resultTextArea, message, true); System.out.println(message); String brokerUrl = String.format(BROKER_URL, serverIpTextField.getText()); // 创建链接工厂 TopicConnectionFactory factory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl); ComponentUtil.appendResult(resultTextArea, "url:" + brokerUrl, true); // 通过工厂创建一个连接 connection = factory.createTopicConnection(); // 启动连接 connection.start(); ComponentUtil.appendResult(resultTextArea, "启动connection 成功", true); // 创建一个session会话 transacted session = connection.createTopicSession( transacted /* Boolean.FALSE */, Session.AUTO_ACKNOWLEDGE); }
项目源代码见附件mqtt_swing.zip
手机android客户端(测试推送)见附件android-mqtt-push-master.zip
也可以从 https://github.com/tokudu/AndroidPushNotificationsDemo
下载
详细配置参阅附件mqtt推送详解.zip
依赖的jar包
io0007-find_progess-0.0.8.4-SNAPSHOT.jar
io0007-find_progess-0.0.8.4-SNAPSHOT-sources.jar
---2017年5月11号更新--
最新代码:
GitHub:
https://github.com/liuyu520/mqtt_client_swing.git
https://github.com/liuyu520/io0007
相关推荐
activeMQ推送服务端和客户端完整案例
继上篇的MQTT学习,此篇主要是实现一个安卓客户端,利用activemq服务器,实现安卓客户端之间的推送 首先需要说明客户端的结构 客户端的组成如下:
ActiveMQ MQTT Android客户端Demo
SpringBoot+ActiveMq+MQTT实现消息的发送和接收 后台消费者、生产者、消息发送接口、发送消息业务类等相关配置
ActiveMQ MQTT Android 客户端Demo
java中使用消息中间件ActiveMQ的MQTT协议发布消息使用fusesource,fusesource提供三种方式实现发布消息的方式,分别是阻塞式(BlockingConnection)、回调式(CallbackConnection)和Future样式(FutureConnection)
使用SpringMVC、Spring、ActiveMQ发送和接受Mqtt消息。
Activemq-MQTT-Websocket库Js文件mqttws31.js 前端使用Websocket连接Activemq中间件的Js库文件
mqtt前端连接客户端demo--MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型...
此附件是Apache下面的一个ActiveMq客户端,如果需要在本地调试,不链接远程服务器的MQ可以下载研究。
activeMq 实现mqtt 协议的 client ,mqtt-spy. 用于测试
activemq消息测试工具
基于activemq实现Android推送(服务端+客户端)
mqtt协议时IBM开发的一个协议,facebook的android客户端message即时...这段代码可以实现android的消息推送。还可以在此基础实现即时通讯,在运行这段代码,还需要安装activeMQ服务器。下载地址http://activemq.apache.org/
activemq向android推送消息实例,测试环境activemq5.9.1版,别忘了修改你的主机地址
通过tomcat服务器,向网页发送即时消息。comet的小例子,可以直接运行,用tomcat6.0。
MQTT-Client-Framework已通过大量代理进行了测试: 蚊子帕霍Rabbitmq 蜂巢rsmb 莫斯卡Vernemq Emqtt 莫凯特ActiveMQ 阿波罗CloudMQTT ws hbmqtt(仅限MQTTv311,有限制)用法有关示例应用程序,请参见 创建一个新...
网上其实有的 用的swing界面 演示mqtt 推送和接收消息的全过程