HeartbeatHandler.java 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package com.zhentao.information.handler;
  import io.netty.channel.ChannelHandler;
  import io.netty.channel.ChannelHandlerContext;
  import io.netty.channel.ChannelInboundHandlerAdapter;
  import io.netty.handler.timeout.IdleState;
  import io.netty.handler.timeout.IdleStateEvent;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentMap;
  import lombok.extern.slf4j.Slf4j;
  import org.springframework.stereotype.Component;
  9. /**
  10. * 心跳处理器
  11. * 处理客户端的心跳检测
  12. */
  13. @Slf4j
  14. @Component
  15. @ChannelHandler.Sharable
  16. public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
  17. private static final int MAX_MISSED_HEARTBEATS = 3;
  18. private int missedHeartbeats = 0;
  19. /**
  20. * 处理用户事件
  21. * 当触发IdleStateEvent时调用
  22. * @param ctx Channel上下文
  23. * @param evt 事件对象
  24. */
  25. @Override
  26. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  27. if (evt instanceof IdleStateEvent) {
  28. IdleStateEvent event = (IdleStateEvent) evt;
  29. if (event.state() == IdleState.READER_IDLE) {
  30. missedHeartbeats++;
  31. log.warn("读空闲,已错过 {} 次心跳", missedHeartbeats);
  32. if (missedHeartbeats >= MAX_MISSED_HEARTBEATS) {
  33. log.error("读空闲超时,关闭连接:{}", ctx.channel().id().asLongText());
  34. ctx.close();
  35. } else {
  36. // 发送心跳检测消息
  37. ctx.writeAndFlush("ping");
  38. }
  39. } else if (event.state() == IdleState.WRITER_IDLE) {
  40. // 发送心跳检测消息
  41. ctx.writeAndFlush("ping");
  42. }
  43. } else {
  44. super.userEventTriggered(ctx, evt);
  45. }
  46. }
  47. @Override
  48. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  49. if (msg instanceof String && "pong".equals(msg)) {
  50. // 收到心跳响应,重置计数器
  51. missedHeartbeats = 0;
  52. log.debug("收到心跳响应");
  53. } else {
  54. super.channelRead(ctx, msg);
  55. }
  56. }
  57. @Override
  58. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  59. log.info("连接断开:{}", ctx.channel().id().asLongText());
  60. super.channelInactive(ctx);
  61. }
  62. @Override
  63. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  64. log.error("心跳处理器异常", cause);
  65. ctx.close();
  66. }
  67. }