package cn.exrick.xboot.your.util; import cn.exrick.xboot.your.entity.Alarm; import cn.exrick.xboot.your.entity.Car; import cn.exrick.xboot.your.entity.EventLog; import cn.exrick.xboot.your.service.IAlarmService; import cn.exrick.xboot.your.service.ICarService; import cn.exrick.xboot.your.service.IEventLogService; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MQTT { @Autowired private IAlarmService iAlarmService; @Autowired private ICarService iCarService; @Autowired private HaiKangPost haiKangPost; @Autowired private IEventLogService iEventLogService; public static void main(String[] args) { MQTT mqtt = new MQTT(); mqtt.event("28156526","artemis_28156526_AQ4ZLCRT","X34YB59A"); } public void event(String clientid,String userName,String passWord){ String HOST = "tcp://10.10.101.1:1883"; //String HOST = "tcp://111.63.178.115:1883"; String TOPIC = "artemis/event_msa_alarm/5201154049/admin"; int qos = 1; // String clientid = "28156526"; // String userName = "artemis_28156526_PSDARORD"; // String passWord = "T48B7XI6"; try { // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的连接设置 MqttConnectOptions options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置连接的用户名 options.setUserName(userName); // 设置连接的密码 options.setPassword(passWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); // 设置回调函数 client.setCallback(new MqttCallback() { public void connectionLost(Throwable cause) { System.out.println("connectionLost"); String topicInfo = haiKangPost.getTopicInfo(); System.out.println("重连-订阅信息:"+topicInfo); JSONObject jsonObject = JSONUtil.parseObj(topicInfo); if(jsonObject.get("data")!=null){ String data = jsonObject.get("data").toString(); JSONObject jsonObject2 = JSONUtil.parseObj(data); String clientId = jsonObject2.get("clientId").toString(); String userName = jsonObject2.get("userName").toString(); String password = jsonObject2.get("password").toString(); event(clientId,userName,password); } } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------"+ token.isComplete()); } public void messageArrived(String topic, MqttMessage message) throws Exception { // System.out.println("topic:"+topic); // System.out.println("Qos:"+message.getQos()); // System.out.println("message content:"+new String(message.getPayload())); String s = new String(message.getPayload()); if(!s.equals("close")){ JSONObject jsonObject = JSONUtil.parseObj(s); if(jsonObject.get("params")!=null){ String params = jsonObject.get("params").toString(); if(JSONUtil.parseObj(params)!=null){ String events = JSONUtil.parseObj(params).get("events").toString(); JSONArray objects = JSONUtil.parseArray(events); for(int i=0;i wrapper = new QueryWrapper(); wrapper.eq("code",vehicleIndexCode); Car car = iCarService.getOne(wrapper); if(car==null){ return; } alarm.setAlarmId(alarmId); alarm.setCarId(car.getId()); alarm.setCarNo(car.getCarNo()); alarm.setCarUserId(car.getUserId()); alarm.setFollowUserId(car.getFollowUserId()); alarm.setAlarmTime(alarmTime); iAlarmService.saveOrUpdate(alarm); EventLog eventLog = new EventLog(); eventLog.setType(alarm.getType()); eventLog.setRefId(alarmId); eventLog.setCarNo(car.getCarNo()); iEventLogService.saveOrUpdate(eventLog); } } } } } }); client.connect(options); //订阅消息 client.subscribe(TOPIC, qos); } catch (Exception e) { e.printStackTrace(); } } }