package com.wgcloud.controller; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.wgcloud.entity.*; import com.wgcloud.service.*; import com.wgcloud.util.DateUtil; import com.wgcloud.util.FormatUtil; import com.wgcloud.util.ThreadPoolUtil; import com.wgcloud.util.TokenUtils; import com.wgcloud.util.msg.WarnMailUtil; import com.wgcloud.util.staticvar.BatchData; import com.wgcloud.util.staticvar.StaticKeys; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import javax.annotation.Resource; import java.util.*; /** * @version v3.3 * @ClassName:AgentGoController.java * @author: http://www.wgstart.com * @date: 2021年1月16日 * @Description: agent上报数据处理 * @Copyright: 2019-2021 wgcloud. All rights reserved. */ @Controller @RequestMapping("/agentGo") public class AgentGoController { private static final Logger logger = LoggerFactory.getLogger(AgentGoController.class); @Resource private LogInfoService logInfoService; @Resource private SystemInfoService systemInfoService; @Autowired private DockerInfoService dockerInfoService; @Autowired private AppInfoService appInfoService; @Autowired private PortInfoService portInfoService; @Autowired private TokenUtils tokenUtils; /** * agent上报数据处理 * * @param paramBean * @return */ @ResponseBody @RequestMapping("/minTask") public JSONObject minTask(@RequestBody String paramBean) { JSONObject agentJsonObject = (JSONObject) JSONUtil.parse(paramBean); logger.debug("agent上报数据-------------" + agentJsonObject.toString()); JSONObject resultJson = new JSONObject(); if (!tokenUtils.checkAgentToken(agentJsonObject)) { logger.error(StaticKeys.TOKEN_ERROR); resultJson.set("result", StaticKeys.TOKEN_ERROR); return resultJson; } //agent配置的主机ip String bindIp = agentJsonObject.getStr("bindIp"); if (isExists(bindIp)) { logger.error("agent multiple times at the same time:" + bindIp); resultJson.set("result", "agent multiple times at the same time:" + bindIp); return resultJson; } //主机名称 String hostName = agentJsonObject.getStr("hostName"); //主机所属用户账号 String account = agentJsonObject.getStr("account"); //主机累积接收流量 String recvBytes = agentJsonObject.getStr("recvBytes"); //主机累积发送流量 String sentBytes = agentJsonObject.getStr("sentBytes"); //agent版本 String agentVer = agentJsonObject.getStr("agentVer"); //cpu核数 String cpuCoresNum = agentJsonObject.getStr("cpuCoresNum"); //系统信息 JSONObject hostInfoJson = agentJsonObject.getJSONObject("hostInfo"); //docker监控信息 JSONObject dockersJson = agentJsonObject.getJSONObject("dockers"); //端口监控信息 JSONObject portInfosJson = agentJsonObject.getJSONObject("portInfos"); //系统负载信息 JSONObject loadJson = agentJsonObject.getJSONObject("load"); //cpu温度信息 JSONArray temperaturesJsonArray = agentJsonObject.getJSONArray("temperatures"); //上报频率s String submitSeconds = agentJsonObject.getStr("submitSeconds"); //连接数量(包括tcp、udp等) String netConnections = agentJsonObject.getStr("netConnections"); //处理cpuinfo JSONArray cpuInfoJson = initCpuJson(agentJsonObject); //cpu使用百分比 Double cpuPercentJson = agentJsonObject.getDouble("cpuPercent"); //物理内存信息 JSONObject virtualMemoryJson = agentJsonObject.getJSONObject("virtualMemory"); //交换区内存信息 JSONObject swapMemoryStatJson = agentJsonObject.getJSONObject("swapMemoryStat"); //网络流量信息 JSONObject netIOCountersJson = agentJsonObject.getJSONObject("netIOCounters"); //进程监控信息 JSONObject processListJson = agentJsonObject.getJSONObject("processList"); Double cpuPercentVal = 0d;//cpu使用百分比 Double memTotalVal = 0d;//物理内存总大小,G Double memPercentVal = 0d;//物理内存使用百分比 Date nowtime = new Date(); //bindIp为空时候,用计算机名称 if (StringUtils.isEmpty(bindIp)) { bindIp = hostName; logger.error("bindIp is null"); if (StringUtils.isEmpty(bindIp)) { resultJson.set("result", "error:bindIp is null"); return resultJson; } } try { //添加cpu使用率 begin if (cpuPercentJson != null) { cpuPercentVal = FormatUtil.formatDouble(cpuPercentJson, 2); addCpuState(cpuPercentJson, nowtime, bindIp); } //添加cpu使用率 end //添加物理内存使用 begin if (virtualMemoryJson != null) { if (virtualMemoryJson.getLong("total") != null) { memTotalVal = FormatUtil.formatDouble((virtualMemoryJson.getLong("total") / (double) 1024 / 1024 / 1024), 2); } else { memTotalVal = 0d; } if (virtualMemoryJson.getDouble("usedPercent") != null) { memPercentVal = FormatUtil.formatDouble(virtualMemoryJson.getDouble("usedPercent"), 2); } else { memPercentVal = 0d; } addMemState(memPercentVal, nowtime, bindIp); } //添加物理内存使用 end //系统负载 SysLoadState sysLoadState = null; if (loadJson != null) { sysLoadState = addLoadState(loadJson, nowtime, bindIp); } //网络流量 NetIoState netIoState = null; if (netIOCountersJson != null) { netIoState = addNetIoState(netIOCountersJson, nowtime, bindIp, netConnections); } //进程监控 if (processListJson != null) { addAppState(processListJson, nowtime, bindIp); } //docker监控 if (dockersJson != null) { addDockerState(dockersJson, nowtime, bindIp); } //系统信息 if (hostInfoJson != null && cpuInfoJson != null) { addSystemInfo(hostInfoJson, cpuInfoJson.getJSONObject(0), cpuPercentVal, memPercentVal, memTotalVal, nowtime, bindIp, submitSeconds, agentVer , recvBytes, sentBytes, netIoState, cpuCoresNum, hostName, sysLoadState, netConnections, swapMemoryStatJson, account); } //端口信息 if (portInfosJson != null) { addPortInfos(portInfosJson, nowtime, bindIp); } //cpu温度信息 if (temperaturesJsonArray != null) { addCpuTemperatures(temperaturesJsonArray, nowtime, bindIp); } resultJson.set("result", "success"); } catch (Exception e) { logger.error("解析上报数据错误", e); resultJson.set("result", "error:" + e.toString()); } finally { return resultJson; } } /** * 前置处理cpu信息,排查异常错误信息 * * @param agentJsonObject */ private JSONArray initCpuJson(JSONObject agentJsonObject) { JSONArray cpuInfoJson = null; try { //cpu信息 cpuInfoJson = agentJsonObject.getJSONArray("cpuInfo"); if (cpuInfoJson.size() < 1) { JSONObject cpuInfoJsonTmp = new JSONObject(); cpuInfoJsonTmp.set("cores", 0); cpuInfoJsonTmp.set("modelName", "?"); cpuInfoJson = new JSONArray(); cpuInfoJson.add(cpuInfoJsonTmp); } } catch (Exception e) { logger.error("cpuInfoJson is error : ", e); JSONObject cpuInfoJsonTmp = new JSONObject(); cpuInfoJsonTmp.set("cores", 0); cpuInfoJsonTmp.set("modelName", "?"); cpuInfoJson = new JSONArray(); cpuInfoJson.add(cpuInfoJsonTmp); } return cpuInfoJson; } /** * 处理上报的内存使用率数据 * * @param memPercentJson * @param nowtime * @param bindIp */ private void addMemState(Double memPercentJson, Date nowtime, String bindIp) { try { MemState bean = new MemState(); bean.setUsePer(memPercentJson); bean.setCreateTime(nowtime); bean.setHostname(bindIp); BatchData.MEM_STATE_LIST.add(bean); Runnable runnable = () -> { WarnMailUtil.sendWarnInfo(bean); }; ThreadPoolUtil.executor.execute(runnable); } catch (Exception e) { logger.error("解析内存使用率上报数据错误", e); } } /** * 处理上报cpu使用率数据 * * @param cpuPercentJson * @param nowtime * @param bindIp */ private void addCpuState(Double cpuPercentJson, Date nowtime, String bindIp) { try { CpuState bean = new CpuState(); bean.setHostname(bindIp); Double cpuPercentVal = FormatUtil.formatDouble(cpuPercentJson, 2); bean.setSys(cpuPercentVal); bean.setCreateTime(nowtime); BatchData.CPU_STATE_LIST.add(bean); Runnable runnable = () -> { WarnMailUtil.sendCpuWarnInfo(bean); }; ThreadPoolUtil.executor.execute(runnable); } catch (Exception e) { logger.error("解析cpu使用率上报数据错误", e); } } /** * 处理上报的系统负载数据 * * @param loadJson * @param nowtime * @param bindIp * @return */ private SysLoadState addLoadState(JSONObject loadJson, Date nowtime, String bindIp) { SysLoadState bean = new SysLoadState(); try { bean.setHostname(bindIp); bean.setOneLoad(FormatUtil.formatDouble(loadJson.getDouble("load1"), 2)); bean.setFiveLoad(FormatUtil.formatDouble(loadJson.getDouble("load5"), 2)); bean.setFifteenLoad(FormatUtil.formatDouble(loadJson.getDouble("load15"), 2)); bean.setCreateTime(nowtime); BatchData.SYSLOAD_STATE_LIST.add(bean); Runnable runnable = () -> { WarnMailUtil.sendSysLoadWarnInfo(bean); }; ThreadPoolUtil.executor.execute(runnable); } catch (Exception e) { logger.error("解析系统负载上报数据错误", e); } return bean; } /** * 处理上报的网卡流量数据 * * @param netIOCountersJson * @param nowtime * @param bindIp * @param netConnections * @return */ private NetIoState addNetIoState(JSONObject netIOCountersJson, Date nowtime, String bindIp, String netConnections) { NetIoState bean = new NetIoState(); try { bean.setNetConnections(netConnections); if (netIOCountersJson.getDouble("res_recv_bytes") != null) { bean.setRxbyt(FormatUtil.formatDouble(netIOCountersJson.getDouble("res_recv_bytes"), 2) + ""); } else { bean.setRxbyt("0"); } if (netIOCountersJson.getDouble("res_sent_bytes") != null) { bean.setTxbyt(FormatUtil.formatDouble(netIOCountersJson.getDouble("res_sent_bytes"), 2) + ""); } else { bean.setTxbyt("0"); } if (netIOCountersJson.getDouble("res_sent_packets") != null) { bean.setTxpck(FormatUtil.formatDouble(netIOCountersJson.getDouble("res_sent_packets"), 2) + ""); } else { bean.setTxpck("0"); } if (netIOCountersJson.getDouble("res_recv_packets") != null) { bean.setRxpck(FormatUtil.formatDouble(netIOCountersJson.getDouble("res_recv_packets"), 2) + ""); } else { bean.setRxpck("0"); } if (netIOCountersJson.getDouble("res_dropin_packets") != null) { bean.setDropin(FormatUtil.formatDouble(netIOCountersJson.getDouble("res_dropin_packets"), 2) + ""); } else { bean.setDropin("0"); } if (netIOCountersJson.getDouble("res_dropout_packets") != null) { bean.setDropout(FormatUtil.formatDouble(netIOCountersJson.getDouble("res_dropout_packets"), 2) + ""); } else { bean.setDropout("0"); } bean.setCreateTime(nowtime); bean.setHostname(bindIp); BatchData.NETIO_STATE_LIST.add(bean); Runnable runnable = () -> { try { if (bean != null) { WarnMailUtil.sendDownSpeedWarnInfo(bean); WarnMailUtil.sendUpSpeedWarnInfo(bean); } } catch (Exception e) { e.printStackTrace(); } }; ThreadPoolUtil.executor.execute(runnable); } catch (Exception e) { logger.error("解析网络流量上报数据错误", e); } return bean; } /** * 处理上报进程数据 * * @param processListJson * @param nowtime * @param bindIp */ private void addAppState(JSONObject processListJson, Date nowtime, String bindIp) { List keys = new ArrayList<>(processListJson.keySet()); for (String proId : keys) { try { String[] vals = processListJson.getStr(proId).split(","); AppInfo appInfo = new AppInfo(); appInfo.setHostname(bindIp); appInfo.setId(proId); appInfo.setCreateTime(nowtime); appInfo.setState(StaticKeys.ON_STATE); appInfo.setMemPer(FormatUtil.formatDouble(Double.valueOf(vals[1]), 2)); appInfo.setCpuPer(FormatUtil.formatDouble(Double.valueOf(vals[0]), 2)); appInfo.setReadBytes(FormatUtil.formatDouble(Double.valueOf(vals[2]), 2) + ""); appInfo.setWritesBytes(FormatUtil.formatDouble(Double.valueOf(vals[3]), 2) + ""); appInfo.setThreadsNum(vals[4]); appInfo.setGatherPid(vals[5]); String appTimes = DateUtil.millisToDate(vals[6], "yyyy-MM-dd HH:mm:ss"); appInfo.setAppTimes(appTimes); //进程获取出错,判定下线 begin if (appInfo.getCpuPer() < 0) { appInfo.setState(StaticKeys.DOWN_STATE); //下线后,不再更新时间戳 appInfo.setCreateTime(null); appInfo.setAppTimes(null); BatchData.APP_INFO_LIST.add(appInfo); Runnable runnable = () -> { try { AppInfo appInfoW = appInfoService.selectById(proId); if (appInfoW != null) { WarnMailUtil.sendAppDown(appInfoW, true); } } catch (Exception e) { e.printStackTrace(); } }; ThreadPoolUtil.executor.execute(runnable); continue; } //进程获取出错,判定下线 end BatchData.APP_INFO_LIST.add(appInfo); AppState appState = new AppState(); appState.setAppInfoId(proId); appState.setCreateTime(nowtime); appState.setCpuPer(FormatUtil.formatDouble(Double.valueOf(vals[0]), 2)); appState.setMemPer(FormatUtil.formatDouble(Double.valueOf(vals[1]), 2)); appState.setThreadsNum(vals[4]); BatchData.APP_STATE_LIST.add(appState); } catch (Exception e) { logger.error("解析进程上报数据错误", e); } } } /** * 处理上报docker数据 * * @param dockersJson * @param nowtime * @param bindIp */ private void addDockerState(JSONObject dockersJson, Date nowtime, String bindIp) { List keys = new ArrayList<>(dockersJson.keySet()); for (String dockerId : keys) { try { JSONObject valJson = dockersJson.getJSONObject(dockerId); DockerInfo dockerInfo = new DockerInfo(); dockerInfo.setId(dockerId); dockerInfo.setCreateTime(nowtime); dockerInfo.setState(StaticKeys.ON_STATE); dockerInfo.setHostname(bindIp); dockerInfo.setMemPer(FormatUtil.formatDouble(valJson.getDouble("memToM"), 2)); dockerInfo.setDockerCommand(valJson.getStr("command")); dockerInfo.setDockerImage(valJson.getStr("image")); dockerInfo.setDockerCreated(valJson.getStr("created")); dockerInfo.setDockerPort(valJson.getStr("portStr")); dockerInfo.setDockerSize(FormatUtil.formatDouble(valJson.getDouble("sizeRootFs"), 2) + ""); dockerInfo.setDockerStatus(valJson.getStr("status")); dockerInfo.setGatherDockerNames(valJson.getStr("names")); //docker获取出错,判定下线 begin if (dockerInfo.getMemPer() < 0) { dockerInfo.setState(StaticKeys.DOWN_STATE); //下线后,不再更新时间戳 dockerInfo.setCreateTime(null); BatchData.DOCKER_INFO_LIST.add(dockerInfo); Runnable runnable = () -> { try { DockerInfo dockerInfoW = dockerInfoService.selectById(dockerId); if (dockerInfoW != null) { WarnMailUtil.sendDockerDown(dockerInfoW, true); } } catch (Exception e) { e.printStackTrace(); } }; ThreadPoolUtil.executor.execute(runnable); continue; } //docker获取出错,判定下线 end BatchData.DOCKER_INFO_LIST.add(dockerInfo); DockerState dockerState = new DockerState(); dockerState.setDockerInfoId(dockerId); dockerState.setCreateTime(nowtime); dockerState.setMemPer(dockerInfo.getMemPer()); BatchData.DOCKER_STATE_LIST.add(dockerState); } catch (Exception e) { logger.error("解析docker上报数据错误", e); } } } /** * 存贮系统信息 * * @param hostInfoJson 主机信息 * @param cpuInfoJson cpu信息 * @param cpuPercentVal cpu使用率 * @param memPercentVal 内存使用率 * @param memTotalVal 内存总大小 * @param nowtime 上报时间 * @param bindIp 客户端ip * @param submitSeconds 上报频率,秒 * @throws Exception */ private void addSystemInfo(JSONObject hostInfoJson, JSONObject cpuInfoJson, Double cpuPercentVal, Double memPercentVal, Double memTotalVal, Date nowtime, String bindIp, String submitSeconds, String agentVer, String recvBytes, String sentBytes, NetIoState netIoState, String cpuCoresNum, String hostNameExt, SysLoadState sysLoadState, String netConnections, JSONObject swapMemoryStatJson, String account) throws Exception { SystemInfo bean = new SystemInfo(); bean.setSubmitSeconds(submitSeconds); bean.setAgentVer(agentVer); bean.setState(StaticKeys.ON_STATE); bean.setHostname(bindIp); if (!StringUtils.isEmpty(account)) { bean.setAccount(account); } bean.setNetConnections(netConnections); bean.setHostnameExt(hostNameExt); bean.setCreateTime(nowtime); bean.setBootTime(hostInfoJson.getLong("bootTime")); bean.setUptime(hostInfoJson.getLong("uptime")); bean.setCpuPer(cpuPercentVal); bean.setMemPer(memPercentVal); bean.setProcs(hostInfoJson.getStr("procs")); bean.setCpuCoreNum(cpuCoresNum); bean.setCpuXh(cpuInfoJson.getStr("modelName")); bean.setPlatForm(hostInfoJson.getStr("platform")); bean.setPlatformVersion(hostInfoJson.getStr("platformVersion") + ",kernelArch:" + hostInfoJson.getStr("kernelArch")); bean.setTotalMem(memTotalVal + "G"); if (null == netIoState) { netIoState = new NetIoState(); } bean.setRxbyt(netIoState.getRxbyt()); bean.setTxbyt(netIoState.getTxbyt()); if (null == sysLoadState) { sysLoadState = new SysLoadState(); } bean.setFiveLoad(sysLoadState.getFiveLoad()); bean.setFifteenLoad(sysLoadState.getFifteenLoad()); if (!StringUtils.isEmpty(recvBytes)) { bean.setBytesRecv(FormatUtil.formatDouble(Double.valueOf(recvBytes), 2) + ""); } else { bean.setBytesRecv("0"); } if (!StringUtils.isEmpty(sentBytes)) { bean.setBytesSent(FormatUtil.formatDouble(Double.valueOf(sentBytes), 2) + ""); } else { bean.setBytesSent("0"); } //添加交换区内存信息 setSwapMemInfo(bean, swapMemoryStatJson); BatchData.SYSTEM_INFO_LIST.add(bean); } /** * 处理cpu温度 * * @param temperaturesJsonArray * @param nowtime * @param bindIp */ private void addCpuTemperatures(JSONArray temperaturesJsonArray, Date nowtime, String bindIp) { try { logger.debug("addCpuTemperatures--------------" + temperaturesJsonArray.toString()); for (Object temperaturesObj : temperaturesJsonArray) { CpuTemperatures cpuTemperatures = new CpuTemperatures(); cpuTemperatures.setHostname(bindIp); cpuTemperatures.setCreateTime(nowtime); JSONObject temperaturesJson = JSONUtil.parseObj(temperaturesObj); String sensor = temperaturesJson.getStr("sensorKey"); String sensorTemperature = temperaturesJson.getStr("temperature"); String sensorHigh = temperaturesJson.getStr("sensorCritical"); String sensorCritical = temperaturesJson.getStr("sensorCritical"); cpuTemperatures.setCore_index(sensor); cpuTemperatures.setCrit(sensorCritical); cpuTemperatures.setInput(sensorTemperature); cpuTemperatures.setMax(sensorHigh); BatchData.CPU_TEMPERATURES_LIST.add(cpuTemperatures); Runnable runnable = () -> { WarnMailUtil.sendCpuTemperatures(cpuTemperatures); }; ThreadPoolUtil.executor.execute(runnable); } } catch (Exception e) { logger.error("解析cpu温度上报数据错误", e); } } /** * 处理上报端口数据 * * @param portInfoJson * @param nowtime * @param bindIp */ private void addPortInfos(JSONObject portInfoJson, Date nowtime, String bindIp) { List keys = new ArrayList<>(portInfoJson.keySet()); for (String portId : keys) { try { String state = portInfoJson.getStr(portId); PortInfo portInfo = new PortInfo(); portInfo.setId(portId); portInfo.setCreateTime(nowtime); portInfo.setState(state); portInfo.setHostname(bindIp); //端口telnet出错,判定下线 begin if (!StaticKeys.ON_STATE.equals(state)) { portInfo.setState(StaticKeys.DOWN_STATE); //下线后,不再更新时间戳 portInfo.setCreateTime(null); BatchData.PORT_INFO_LIST.add(portInfo); Runnable runnable = () -> { try { PortInfo portInfoW = portInfoService.selectById(portId); if (portInfoW != null) { WarnMailUtil.sendPortDown(portInfoW, true); } } catch (Exception e) { e.printStackTrace(); } }; ThreadPoolUtil.executor.execute(runnable); continue; } //端口telnet出错,判定下线 end BatchData.PORT_INFO_LIST.add(portInfo); } catch (Exception e) { logger.error("解析端口上报数据错误", e); } } } /** * 处理交换区内存信息 * * @param bean * @param swapMemoryStatJson */ private void setSwapMemInfo(SystemInfo bean, JSONObject swapMemoryStatJson) { try { Double memTotalVal = 0d;//内存总大小,G Double memPercentVal = 0d;//内存使用百分比 if (swapMemoryStatJson != null) { if (swapMemoryStatJson.getLong("total") != null) { memTotalVal = FormatUtil.formatDouble((swapMemoryStatJson.getLong("total") / (double) 1024 / 1024 / 1024), 2); } if (swapMemoryStatJson.getDouble("usedPercent") != null) { memPercentVal = FormatUtil.formatDouble(swapMemoryStatJson.getDouble("usedPercent"), 2); } bean.setSwapMemPer(String.valueOf(memPercentVal)); bean.setTotalSwapMem(memTotalVal + "G"); } } catch (Exception e) { logger.error("解析交换区内存信息数据错误", e); } } /** * 查询agent是否在同一上报周期(29s)提交多次,多次不受理 * * @param bindIp * @return true存在,false不存在 */ private boolean isExists(String bindIp) { if (StringUtils.isEmpty(bindIp)) { return true; } for (SystemInfo systemInfo : BatchData.SYSTEM_INFO_LIST) { if (systemInfo.getHostname().equals(bindIp)) { return true; } } return false; } }