package com.wgcloud.common; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.jcraft.jsch.ChannelShell; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; import com.wgcloud.util.staticvar.StaticKeys; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.InputStream; import java.io.OutputStream; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; /** * @version v3.3 * @ClassName:NettytHandler.java * @author: http://www.wgstart.com * @date: 2021年4月22日 * @Description: netty处理handler * @Copyright: 2017-2021 wgcloud. All rights reserved. */ public class NettytHandler extends SimpleChannelInboundHandler { private static final Logger logger = LoggerFactory.getLogger(NettytHandler.class); //操作类型 public static final String HANDLE_OPERATE = "handle"; //操作指令 public static final String HANDLE_VALUE = "value"; //连接超时30s public static final Integer CONNECTION_OUT = 30000; //回车 public static final String ENTER_VAL = "\r"; //换行 public static final String LINE_NEXT_VAL = "\n"; //TAB制表符 public static final String TAB_VAL = "\t"; //所有正在连接的channel都会存在这里面,所以也可以间接代表在线的客户端 public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); //客户端执行的命令 public static Map MAP_CMD = Collections.synchronizedMap(new HashMap()); //客户端保存的ssh连接session public static Map MAP_SSH_SESSION = Collections.synchronizedMap(new HashMap()); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { String channelId = ctx.channel().id().toString(); String msgJSonStr = msg.text(); if (StringUtils.isEmpty(msgJSonStr)) { return; } JSONObject msgJson = JSONUtil.parseObj(msgJSonStr); if ("connect".equals(msgJson.getStr(HANDLE_OPERATE))) { //执行连接操作 try { getSSHChannel(channelId, msgJson.getStr("ip"), msgJson.getStr("user"), msgJson.getStr("pwd"), Integer.valueOf(msgJson.getStr("port")), msgJson.getStr("priKeyBasePath")); executeCommand(ctx, ENTER_VAL); } catch (JSchException e) { ctx.writeAndFlush(new TextWebSocketFrame(LINE_NEXT_VAL + e.toString())); logger.error("ssh终端连接错误:", e); } } else if ("cmd".equals(msgJson.getStr(HANDLE_OPERATE))) { //发送指令执行 String cmdStr = msgJson.getStr(HANDLE_VALUE); //判断指令是否包含TAB,若是则不做任何操作,直接返回 if (cmdStr.endsWith(TAB_VAL)) { return; } //普通指令字符使用MAP_CMD存贮该终端的指令 if (StringUtils.isEmpty(MAP_CMD.get(channelId))) { MAP_CMD.put(channelId, cmdStr); } else { MAP_CMD.put(channelId, MAP_CMD.get(channelId) + cmdStr); } //判断指令是否包含回车,若是则发送执行,执行完成清空MAP_CMD已存贮该终端的指令 if (cmdStr.contains(ENTER_VAL)) { if (!StringUtils.isEmpty(MAP_CMD.get(channelId))) { executeCommand(ctx, MAP_CMD.get(channelId)); MAP_CMD.remove(channelId); } } } } //客户端建立连接 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { channelGroup.add(ctx.channel()); logger.info(ctx.channel().remoteAddress() + "ssh终端上线了!"); } //关闭连接 @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { channelGroup.remove(ctx.channel()); logger.info(ctx.channel().remoteAddress() + "ssh终端断开连接"); ChannelShell channelShell = MAP_SSH_SESSION.get(ctx.channel().id().toString()); if (channelShell != null) { Session session = channelShell.getSession(); if (channelShell != null) { channelShell.disconnect(); } if (session != null) { session.disconnect(); } MAP_SSH_SESSION.remove(ctx.channel().id().toString()); MAP_CMD.remove(ctx.channel().id().toString()); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.channel().close(); } /** * ssh2连接服务器,并将channel存到map里 * * @param channelId 终端ID * @param host 主机IP * @param user 用户名 * @param password 密码 * @param port ssh端口 * @param priKeyBasePath 私钥文件 * @return */ private static ChannelShell getSSHChannel(String channelId, String host, String user, String password, Integer port, String priKeyBasePath) throws JSchException { ChannelShell channelShell = MAP_SSH_SESSION.get(channelId); if (channelShell != null) { return channelShell; } JSch jsch = new JSch(); Session session = jsch.getSession(user, host, port); if (!StringUtils.isEmpty(password)) { //密码登录 session.setPassword(password); //忽略第一次连接时候 hostkey 检查 session.setConfig("StrictHostKeyChecking", "no"); } else { //添加私钥登录 logger.debug("priKeyBasePath-----------" + StaticKeys.JAR_PATH + "/" + priKeyBasePath); jsch.addIdentity(StaticKeys.JAR_PATH + "/" + priKeyBasePath); Properties config = new Properties(); config.put("StrictHostKeyChecking", "no"); session.setConfig(config); } session.connect(CONNECTION_OUT); // 设置timeout时间 session.setTimeout(600000); //开启shell,shell 具有上下文交互,执行命令不会马上退出 channelShell = (ChannelShell) session.openChannel("shell"); channelShell.connect(CONNECTION_OUT); channelShell.setPtyType("dumb"); channelShell.setPty(true); MAP_SSH_SESSION.put(channelId, channelShell); return channelShell; } /** * 执行shell脚本指令 * * @param ctx * @param cmds */ private static void executeCommand(ChannelHandlerContext ctx, String cmds) { ctx.writeAndFlush(new TextWebSocketFrame(LINE_NEXT_VAL)); try { ChannelShell channelShell = MAP_SSH_SESSION.get(ctx.channel().id().toString()); if (null == channelShell) { ctx.writeAndFlush(new TextWebSocketFrame("\n\r~$ ")); return; } InputStream inputStream = channelShell.getInputStream(); OutputStream outputStream = channelShell.getOutputStream(); outputStream.write((cmds).getBytes()); outputStream.flush(); if (!StringUtils.isEmpty(cmds)) { logger.info(channelShell.getSession().getHost() + "," + channelShell.getSession().getUserName() + ",执行ssh指令:" + cmds); } String showMsg = ""; Thread.sleep(400); byte[] tmp = new byte[4096]; //读取shell命令执行结果循环次数,最大3,也就是每次读取执行结果等待1500ms,此期间若没有返回数据就会退出 int beat = 0; while (true) { //循环读取shell执行结果 begin while (inputStream.available() > 0) { int i = inputStream.read(tmp); if (i < 0) { break; } showMsg = new String(tmp, 0, i, "utf-8"); ctx.writeAndFlush(new TextWebSocketFrame(showMsg)); Thread.sleep(500); } //循环读取shell执行结果 end Thread.sleep(500); //判断shell通道是否关闭 begin if (channelShell.isClosed()) { //查询是否还有可读取的字节,若有则继续循环读取 if (inputStream.available() > 0) { continue; } ctx.writeAndFlush(new TextWebSocketFrame("exit-status: " + channelShell.getExitStatus() + ",会话通道已超时请重新连接")); break; } //判断shell通道是否关闭 end beat++; //循环3次,读取结束 if (beat > 2) { break; } } } catch (Exception e) { logger.error("shh指令执行错误:", e); } } /** * 清空web ssh终端缓存map */ public static void clearOldData() { MAP_CMD.clear(); MAP_SSH_SESSION.clear(); } }