`

activeMQ 推送之mqtt客户端

阅读更多

使用activeMQ进行android推送

activeMQ下载地址:http://activemq.apache.org/download.html

下载后是一个压缩包:apache-activemq-5.9.0-bin.zip

启动方式:

解压缩,进入apache-activemq-5.9.0-bin\apache-activemq-5.9.0\bin,双击activemq.bat,即可启动activeMQ服务

 启动之后:

 android客户端推送采用mqtt(paho-mqtt-client-1.0.1.jar),依赖包见附件

 

但是为了测试,我写了一个swing图形界面,充当手机客户端,依赖的jar包仍然是paho-mqtt-client-1.0.1.jar.界面如下:

使用方法:点击[启动]按钮,开始接收推送消息
 对应的主类是:
MqttSwing,用于接收推送消息.

 

我还写了一个发送推送消息的swing图形界面,充当推送后管系统,界面如下:

使用方法:点击[连接]按钮,才可以发送推送消息
 对应的主类:PusherApp,用于发送推送消息.

核心代码介绍如下.

客户端连接activeMQ,建立连接(只有建立连接,才能接收到推送消息)

方法名:connect,做了两件事:(1)建立连接;(2)订阅主题(topic)

 

/***
	 * 客户端和activeMQ服务器建立连接
	 * @param BROKER_URL
	 * @param clientId : 用于标识客户端,相当于ios中的device token
	 * @param TOPIC
	 * @param isCleanSession :false--可以接受离线消息;
	 * @return 是否启动成功 
	 */
	private boolean connect(String BROKER_URL,String clientId,String TOPIC,boolean isCleanSession){
		try {
			ComponentUtil.appendResult(resultTextPane, "connect time:"+TimeHWUtil.getCurrentMiniuteSecond(), true);
            mqttClient = new MqttClient(BROKER_URL, clientId, new MemoryPersistence());
            MqttConnectOptions options= new MqttConnectOptions();
            options.setCleanSession(isCleanSession);//mqtt receive offline message
            ComponentUtil.appendResult(resultTextPane, "isCleanSession:"+isCleanSession, true);
            options.setKeepAliveInterval(30);
            //推送回调类,在此类中处理消息,用于消息监听
            mqttClient.setCallback(new MyCallBack(MqttSwing.this));
            boolean isSuccess=false;
            try {
				mqttClient.connect(options);//CLIENT ID CAN NOT BE SAME
				isSuccess=true;
			} catch (Exception e) {
				if(isPrintException){
					e.printStackTrace();
				}
			}
            if(!isSuccess){
            	String message="连接失败,请检查client id是否重复了 或者activeMQ是否启动";
            	ComponentUtil.appendResult(resultTextPane, message, true);
            	GUIUtil23.warningDialog(message);
            	return false;
            }else{
            //Subscribe to topics 
	            mqttClient.subscribe(new String[]{TOPIC,clientId});
	           
	            System.out.println("topic:"+TOPIC+",  "+(clientId));
	            ComponentUtil.appendResult(resultTextPane, "TOPIC:"+TOPIC+",  "+(clientId), true);
            }

        } catch (MqttException e) {
        	if(isPrintException){
            e.printStackTrace();}
            GUIUtil23.errorDialog(e.getMessage());
            return false;
        }
		return true;
	}

 

 

推送消息到来时的回调类:MyCallBack

 

package com.mqtt.hw.callback;

import org.apache.commons.lang.StringEscapeUtils;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

import com.mqtt.hw.MqttSwing;
import com.time.util.TimeHWUtil;

public class MyCallBack implements MqttCallback {
	private MqttSwing mqttSwing;
	
	
	
	public MyCallBack(MqttSwing mqttSwing) {
		super();
		this.mqttSwing = mqttSwing;
	}

	@Override
	public void connectionLost(Throwable cause) {
		
	}

	@Override
	public void messageArrived(MqttTopic topic, MqttMessage message)
			throws Exception {
		System.out.println("messageArrived...."+TimeHWUtil.getCurrentMiniuteSecond());
		String messageStr=StringEscapeUtils.unescapeHtml(new String(message.getPayload()));
		System.out.println("message:"+messageStr);
		this.mqttSwing.receiveMessage(messageStr);
		//使窗口处于激活状态

	}

	@Override
	public void deliveryComplete(MqttDeliveryToken token) {
		
	}

}

 

 

推送者与activeMQ建立连接:

 

/**
	 * 初始化connection和session
	 * 
	 * @throws Exception
	 */
	private void init(/* String mqIp,boolean transacted */) throws Exception {
		if (!DialogUtil.verifyTFEmpty(serverIpTextField, "服务器ip")) {
			return;
		}
		String transactedStr = transactedTextField.getText();
		boolean transacted = false;
		if (ValueWidget.isNullOrEmpty(transactedStr)) {
			transacted = false;
		} else {
			transacted = Boolean.parseBoolean(transactedStr);
		}
		String message = "transacted:" + transacted;
		ComponentUtil.appendResult(resultTextArea, message, true);
		System.out.println(message);
		String brokerUrl = String.format(BROKER_URL,
				serverIpTextField.getText());
		// 创建链接工厂
		TopicConnectionFactory factory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl);
		ComponentUtil.appendResult(resultTextArea, "url:" + brokerUrl, true);
		// 通过工厂创建一个连接
		
		connection = factory.createTopicConnection();
		// 启动连接
		connection.start();
		ComponentUtil.appendResult(resultTextArea, "启动connection 成功", true);
		// 创建一个session会话 transacted
		session = connection.createTopicSession(
				transacted /* Boolean.FALSE */, Session.AUTO_ACKNOWLEDGE);

	}

 

 

项目源代码见附件mqtt_swing.zip

手机android客户端(测试推送)见附件android-mqtt-push-master.zip

也可以从 https://github.com/tokudu/AndroidPushNotificationsDemo

 

  下载

详细配置参阅附件mqtt推送详解.zip

 依赖的jar包

io0007-find_progess-0.0.8.4-SNAPSHOT.jar

io0007-find_progess-0.0.8.4-SNAPSHOT-sources.jar

 

---2017年5月11号更新--

最新代码:

GitHub:

https://github.com/liuyu520/mqtt_client_swing.git

https://github.com/liuyu520/io0007

  • 大小: 36.2 KB
  • 大小: 166.9 KB
  • 大小: 35.7 KB
  • 大小: 40.3 KB
2
0
分享到:
评论
19 楼 hw1287789687 2017-05-11  
jackyx 写道
你好,请问有IOS和推送后台(服务器端)的资料吗?

最新代码:https://github.com/liuyu520/mqtt_client_swing.git
18 楼 hw1287789687 2017-05-11  
zhaoke2011 写道
运行你的那个20150621 的,提示错误: 找不到或无法加载主类 com.mqtt.hw.MqttClientSwing,新手 求教  qq  502727023 谢谢

最新代码:https://github.com/liuyu520/mqtt_client_swing.git
17 楼 hw1287789687 2017-05-11  
bbsanwei 写道
如果客户端下线了,上线后,消息还会推送给他吗,这个有设置吗

最新代码:https://github.com/liuyu520/mqtt_client_swing.git
16 楼 Yunba云巴 2016-12-05  
bbsanwei 写道
如果客户端下线了,上线后,消息还会推送给他吗,这个有设置吗


我们也是基于MQTT协议实现的实时通信系统,消息推送只是我们其中的一项产品服务。

实现Android推送方面,客户端在集成我们的 Android SDK后,服务端便可通过 SDK 或使用 RESTful API,向 Android 客户端发消息。

为了保证消息的实时性,我们Android SDK 会启动一个后台的 Service,创建并保持到云巴https://yunba.io/服务器的长连接,从而保证了消息推送的实时性。

同样为了保证消息能够被送达,我们 SDK 支持 离线消息 的功能,可保证消息送达客户端。

也就是说,在推送消息时,如果客户端当前不在线,消息将暂存在云巴服务器上(多达 50 条,长达 15 天)。 当客户端上线并成功连接到云巴的服务器后,服务器会把离线消息推送给该客户端。客户端成功接收后,服务器才会删除保存的离线消息。

iOS推送同理,除此以外,我们的 SDK 集成了 APNs,这样开发者就无需开发与 APNs 对接的模块,也不必自己负责 Device Token 的更新。
15 楼 bbsanwei 2016-08-23  
如果客户端下线了,上线后,消息还会推送给他吗,这个有设置吗
14 楼 a406979945 2016-01-12  
你好,这个swing的消息推送与接收都能连接到服务器  但是消息发送出来 后 并不能接收到,
13 楼 zhaoke2011 2015-08-08  
运行你的那个20150621 的,提示错误: 找不到或无法加载主类 com.mqtt.hw.MqttClientSwing,新手 求教  qq  502727023 谢谢
11 楼 hw1287789687 2015-06-21  
最新的源代码 下载地址:http://pan.baidu.com/s/1sjLYibb
10 楼 hw1287789687 2015-06-21  
jackyx 写道
import com.swing.component.AssistPopupTextPane
没找到这个包呢,能否发一份最新完整的给我

这个类在附件的io0007-find_progess-0.0.8.4-SNAPSHOT.jar
9 楼 c1007857613 2015-06-05  
引用
    
8 楼 hw1287789687 2014-09-30  
jackyx 写道
你好,请问有IOS和推送后台(服务器端)的资料吗?

ios 推送资料下载地址:http://pan.baidu.com/s/1i35kRch
7 楼 hw1287789687 2014-09-30  
import com.swing.component.AssistPopupTextPane 所在的jar包下载地址:
http://pan.baidu.com/s/1mghyjUw ,http://pan.baidu.com/s/1c0y90u0
mqtt推送详解.zip  下载地址:http://pan.baidu.com/s/1o6r31Ui
6 楼 hw1287789687 2014-09-30  
jackyx 写道
import com.swing.component.AssistPopupTextPane
没找到这个包呢,能否发一份最新完整的给我

http://pan.baidu.com/s/1hqlAekW
5 楼 jackyx 2014-09-28  
你的jar不全,能否提供下载?
4 楼 jackyx 2014-09-28  
import com.swing.component.AssistPopupTextPane
没找到这个包呢,能否发一份最新完整的给我
3 楼 jackyx 2014-09-28  
你好,请问有IOS和推送后台(服务器端)的资料吗?
2 楼 lwhhuahua 2014-09-11  
mqtt推送详解.zip  这个附件呢?
1 楼 hw1287789687 2014-07-02  
需要什么资料可以给我留言,我还做过ios 推送后台(服务器端)

相关推荐

Global site tag (gtag.js) - Google Analytics