`
simple1024
  • 浏览: 73416 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Mina自定义协议-实现数据交互

    博客分类:
  • Mina
阅读更多

 

本文主要现实mina的自定义协议,并且实现服务器和客户端的简单数据交互。

 

"mina协议的自定义"可参考本博Mina相关文章。

 

正题,所需要的基础类:

抽象协议类

请求协议

响应协议

(需要定制自己的协议格式)

 

协议编码解码工厂

协议编码

协议解码

 

客户端

客户端Handler

 

服务器

服务器Handler

 

 

/**
 * 消息协议
 * 
 * @author Simple
 * 
 */
public abstract class JAbsMessageProtocal {

  public abstract byte getTag();// 消息协议类型  请求/响应

  public abstract int getLength();// 消息协议数据长度
}

/**
 * 报头:
 *    short tag:请求/响应
 *    int   length:数据长度
 * 报体:
 *    short   methodCode:功能函数
 *    byte    resultCode:结果码
 *    String  content:数据内容
 */
 

 

 

/**
 * 消息协议-请求
 * 
 * @author Simple
 * 
 */
public class JMessageProtocalReq extends JAbsMessageProtocal {

  private short functionCode;// 功能代码

  private String content;// 请求内容

  @Override
  public int getLength() {
    return 2 + (content == null ? 0 : content.getBytes().length);
  }

  @Override
  public byte getTag() {
    return JConstant.REQ;
  }

  public void setFunctionCode(short functionCode) {
    this.functionCode=functionCode;
  }

  public short getFunctionCode() {
    return functionCode;
  }

  public void setContent(String content) {
    this.content=content;
  }

  public String getContent() {
    return content;
  }

  @Override
  public String toString() {
    return "JMessageProtocalReq [content=" + content + ", functionCode=" + functionCode + ", getLength()=" + getLength()
      + ", getTag()=" + getTag() + "]";
  }
}
 

 

 

/**
 * 消息协议-响应
 * 
 * @author Simple
 * 
 */
public class JMessageProtocalRes extends JAbsMessageProtocal {

  private byte resultCode;// 结果码

  private String content;// 响应内容

  @Override
  public int getLength() {
    return 1 + (getContent() == null ? 0 : getContent().getBytes().length);
  }

  @Override
  public byte getTag() {
    return JConstant.RES;
  }

  public void setResultCode(byte resultCode) {
    this.resultCode=resultCode;
  }

  public byte getResultCode() {
    return resultCode;
  }

  public void setContent(String content) {
    this.content=content;
  }

  public String getContent() {
    return content;
  }

  @Override
  public String toString() {
    return "JMessageProtocalRes [content=" + content + ", resultCode=" + resultCode + ", getLength()=" + getLength()
      + ", getTag()=" + getTag() + "]";
  }
}

 

/**
 * JMessageProtocal解码编码工厂
 * 
 * @author Simple
 * 
 */
public class JMessageProtocalCodecFactory implements ProtocolCodecFactory {

  private final JMessageProtocalDecoder decoder;

  private final JMessageProtocalEncoder encoder;

  public JMessageProtocalCodecFactory(Charset charset) {
    this.decoder=new JMessageProtocalDecoder(charset);
    this.encoder=new JMessageProtocalEncoder(charset);
  }

  public ProtocolDecoder getDecoder(IoSession paramIoSession) throws Exception {
    return decoder;
  }

  public ProtocolEncoder getEncoder(IoSession paramIoSession) throws Exception {
    return encoder;
  }
}

 

/**
 * JMessageProtocal解码
 * @author Simple
 *
 */
public class JMessageProtocalDecoder extends ProtocolDecoderAdapter {

  private Logger log=Logger.getLogger(JMessageProtocalDecoder.class);

  private Charset charset;

  public JMessageProtocalDecoder(Charset charset) {
    this.charset=charset;
  }

  /**
   * 解码
   */
  public void decode(IoSession session, IoBuffer buf, ProtocolDecoderOutput out) throws Exception {
    JAbsMessageProtocal absMP=null;
    // 获取协议tag
    byte tag=buf.get();
    // 获取协议体长度
    int length=buf.getInt();
    // 取出协议体
    byte[] bodyData=new byte[length];
    buf.get(bodyData);
    // 为解析数据做准备
    // 检测协议
    IoBuffer tempBuf=IoBuffer.allocate(100).setAutoExpand(true);
    tempBuf.put(tag);
    tempBuf.putInt(length);
    tempBuf.put(bodyData);
    tempBuf.flip();
    if(!canDecode(tempBuf)) {
      return;
    }
    // 协议体buf
    IoBuffer bodyBuf=IoBuffer.allocate(100).setAutoExpand(true);
    bodyBuf.put(bodyData);
    bodyBuf.flip();
    // 整个协议buf
    IoBuffer allBuf=IoBuffer.allocate(100).setAutoExpand(true);
    allBuf.put(tag);
    allBuf.putInt(length);
    allBuf.put(bodyData);
    allBuf.flip();
    //
    if(tag == JConstant.REQ) {
      JMessageProtocalReq req=new JMessageProtocalReq();
      short functionCode=bodyBuf.getShort();
      String content=bodyBuf.getString(charset.newDecoder());
      req.setFunctionCode(functionCode);
      req.setContent(content);
      absMP=req;
    } else if(tag == JConstant.RES) {
      JMessageProtocalRes res=new JMessageProtocalRes();
      byte resultCode=bodyBuf.get();
      String content=bodyBuf.getString(charset.newDecoder());
      res.setResultCode(resultCode);
      res.setContent(content);
      absMP=res;
    } else {
      log.error("未定义的Tag");
    }
    out.write(absMP);
  }

  // 是否可以解码
  private boolean canDecode(IoBuffer buf) {
    int protocalHeadLength=5;// 协议头长度
    int remaining=buf.remaining();
    if(remaining < protocalHeadLength) {
      log.error("错误,协议不完整,协议头长度小于" + protocalHeadLength);
      return false;
    } else {
      log.debug("协议完整");
      // 获取协议tag
      byte tag=buf.get();
      if(tag == JConstant.REQ || tag == JConstant.RES) {
        log.debug("Tag=" + tag);
      } else {
        log.error("错误,未定义的Tag类型");
        return false;
      }
      // 获取协议体长度
      int length=buf.getInt();
      if(buf.remaining() < length) {
        log.error("错误,真实协议体长度小于消息头中取得的值");
        return false;
      } else {
        log.debug("真实协议体长度:" + buf.remaining() + " = 消息头中取得的值:" + length);
      }
    }
    return true;
  }
}

 

/**
 * JMessageProtocal编码
 * @author Simple
 *
 */
public class JMessageProtocalEncoder extends ProtocolEncoderAdapter {

  private Charset charset;

  public JMessageProtocalEncoder(Charset charset) {
    this.charset=charset;
  }

  /**
   * 编码
   */
  public void encode(IoSession session, Object object, ProtocolEncoderOutput out) throws Exception {
    // new buf
    IoBuffer buf=IoBuffer.allocate(2048).setAutoExpand(true);
    // object --> AbsMP
    JAbsMessageProtocal absMp=(JAbsMessageProtocal)object;
    buf.put(absMp.getTag());
    buf.putInt(absMp.getLength());
    if(object instanceof JMessageProtocalReq) {// 请求协议
      JMessageProtocalReq mpReq=(JMessageProtocalReq)object;
      buf.putShort(mpReq.getFunctionCode());
      buf.putString(mpReq.getContent(), charset.newEncoder());
    } else if(object instanceof JMessageProtocalRes) {// 响应协议
      JMessageProtocalRes mpRes=(JMessageProtocalRes)object;
      buf.put(mpRes.getResultCode());
      buf.putString(mpRes.getContent(), charset.newEncoder());
    }
    buf.flip();
    out.write(buf);
  }
}

 

/**
 * MINA 客户端
 * 
 * @author Simple
 * 
 */
public class MainClient {

  @SuppressWarnings("unused")
  private static Logger log=Logger.getLogger(MainClient.class);

  private static final int PORT=9999;

  public static void main(String[] args) {
    NioSocketConnector connector=new NioSocketConnector();
    DefaultIoFilterChainBuilder chain=connector.getFilterChain();
    chain.addLast("ProtocolCodecFilter", new ProtocolCodecFilter(new JMessageProtocalCodecFactory(Charset.forName("UTF-8"))));
    connector.setHandler(new MinaClientHandler());
    connector.setConnectTimeoutMillis(3000);
    ConnectFuture cf=connector.connect(new InetSocketAddress("localhost", PORT));
    cf.awaitUninterruptibly();// 等待连接创建完成
    JMessageProtocalReq req=new JMessageProtocalReq();
    req.setFunctionCode((short)1);
    req.setContent("hello world!!!");
    cf.getSession().write(req);
    cf.getSession().getCloseFuture().awaitUninterruptibly();// 等待连接断开
    connector.dispose();
  }
}

 

/**
 * MINA 客户端消息处理
 * 
 * @author Simple
 * 
 */
public class MinaClientHandler extends IoHandlerAdapter {

  private Logger log=Logger.getLogger(MinaClientHandler.class);

  @Override
  public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
    log.error(String.format("Client产生异常!"));
  }

  @Override
  public void messageReceived(IoSession session, Object message) throws Exception {
    log.debug(String.format("来自Server[%s]的消息:%s", session.getRemoteAddress(), message.toString()));
  }

  @Override
  public void messageSent(IoSession session, Object message) throws Exception {
    log.debug(String.format("向Server[%s]发送消息:%s", session.getRemoteAddress(), message.toString()));
  }

  @Override
  public void sessionClosed(IoSession session) throws Exception {
    log.debug(String.format("与Server[%s]断开连接!", session.getRemoteAddress()));
  }

  @Override
  public void sessionCreated(IoSession session) throws Exception {
    log.debug(String.format("与Server[%s]建立连接!", session.getRemoteAddress()));
  }

  @Override
  public void sessionOpened(IoSession session) throws Exception {
    log.debug(String.format("与Server[%s]打开连接!", session.getRemoteAddress()));
  }

  @Override
  public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
    log.debug(String.format("Client进入空闲状态!"));
  }
}

 

/**
 * MINA 服务器
 * 
 * @author Simple
 * 
 */
public class MainServer {

  private static Logger log=Logger.getLogger(MainServer.class);

  private static final int PORT=9999;

  public static void main(String[] args) throws Exception {
    SocketAcceptor acceptor=new NioSocketAcceptor();// tcp/ip 接收者
    DefaultIoFilterChainBuilder chain=acceptor.getFilterChain();// 过滤管道
    chain.addLast("ProtocolCodecFilter", new ProtocolCodecFilter(new JMessageProtocalCodecFactory(Charset.forName("UTF-8"))));
    acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);// 读写通道10s内无操作进入空闲状态
    acceptor.setHandler(new MinaServerHandler());// 设置handler
    acceptor.bind(new InetSocketAddress(PORT));// 设置端口
    log.debug(String.format("Server Listing on %s", PORT));
  }
}

 

/**
 * MINA 服务器消息处理
 * 
 * @author Simple
 * 
 */
public class MinaServerHandler extends IoHandlerAdapter {

  private Logger log=Logger.getLogger(MinaServerHandler.class);

  @Override
  public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
    log.error(String.format("Server产生异常!"));
  }

  @Override
  public void messageReceived(IoSession session, Object message) throws Exception {
    log.debug(String.format("来自Client[%s]的消息:%s", session.getRemoteAddress(), message.toString()));
  }

  @Override
  public void messageSent(IoSession session, Object message) throws Exception {
    log.debug(String.format("向Client[%s]发送消息:%s", session.getRemoteAddress(), message.toString()));
  }

  @Override
  public void sessionClosed(IoSession session) throws Exception {
    log.debug(String.format("Client[%s]与Server断开连接!", session.getRemoteAddress()));
  }

  @Override
  public void sessionCreated(IoSession session) throws Exception {
    log.debug(String.format("Client[%s]与Server建立连接!", session.getRemoteAddress()));
  }

  @Override
  public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
    log.debug(String.format("Server进入空闲状态!"));
  }

  @Override
  public void sessionOpened(IoSession session) throws Exception {
    log.debug(String.format("Client[%s]与Server打开连接!", session.getRemoteAddress()));
  }
}

 

 

如果想深入了解mina,我感觉还得去读一下mina的源码,会有很大的收获。

3
2
分享到:
评论
1 楼 fengqingyebai 2015-11-02  
感谢分享谢谢

相关推荐

    java开源包3

    WebSocket4J 是一个用 Java 实现的 WebSocket 协议的类库,可使用 Java 来构建交互式 Web 应用。WebSocket4J 并未实现客户端通讯协议,所以不能用它来连接 WebSocket 服务器。 Struts验证码插件 JCaptcha4Struts2 ...

    JAVA上百实例源码以及开源项目源代码

    例如,容易实现协议的设计。 Java EJB中有、无状态SessionBean的两个例子 两个例子,无状态SessionBean可会话Bean必须实现SessionBean,获取系统属性,初始化JNDI,取得Home对象的引用,创建EJB对象,计算利息等;...

    java开源包4

    WebSocket4J 是一个用 Java 实现的 WebSocket 协议的类库,可使用 Java 来构建交互式 Web 应用。WebSocket4J 并未实现客户端通讯协议,所以不能用它来连接 WebSocket 服务器。 Struts验证码插件 JCaptcha4Struts2 ...

    java开源包1

    WebSocket4J 是一个用 Java 实现的 WebSocket 协议的类库,可使用 Java 来构建交互式 Web 应用。WebSocket4J 并未实现客户端通讯协议,所以不能用它来连接 WebSocket 服务器。 Struts验证码插件 JCaptcha4Struts2 ...

    java开源包10

    WebSocket4J 是一个用 Java 实现的 WebSocket 协议的类库,可使用 Java 来构建交互式 Web 应用。WebSocket4J 并未实现客户端通讯协议,所以不能用它来连接 WebSocket 服务器。 Struts验证码插件 JCaptcha4Struts2 ...

    JAVA上百实例源码以及开源项目

    例如,容易实现协议的设计。 Java EJB中有、无状态SessionBean的两个例子 两个例子,无状态SessionBean可会话Bean必须实现SessionBean,获取系统属性,初始化JNDI,取得Home对象的引用,创建EJB对象,计算利息等;...

    java开源包11

    WebSocket4J 是一个用 Java 实现的 WebSocket 协议的类库,可使用 Java 来构建交互式 Web 应用。WebSocket4J 并未实现客户端通讯协议,所以不能用它来连接 WebSocket 服务器。 Struts验证码插件 JCaptcha4Struts2 ...

    java开源包2

    WebSocket4J 是一个用 Java 实现的 WebSocket 协议的类库,可使用 Java 来构建交互式 Web 应用。WebSocket4J 并未实现客户端通讯协议,所以不能用它来连接 WebSocket 服务器。 Struts验证码插件 JCaptcha4Struts2 ...

    java开源包6

    WebSocket4J 是一个用 Java 实现的 WebSocket 协议的类库,可使用 Java 来构建交互式 Web 应用。WebSocket4J 并未实现客户端通讯协议,所以不能用它来连接 WebSocket 服务器。 Struts验证码插件 JCaptcha4Struts2 ...

    java开源包5

    WebSocket4J 是一个用 Java 实现的 WebSocket 协议的类库,可使用 Java 来构建交互式 Web 应用。WebSocket4J 并未实现客户端通讯协议,所以不能用它来连接 WebSocket 服务器。 Struts验证码插件 JCaptcha4Struts2 ...

    java开源包8

    WebSocket4J 是一个用 Java 实现的 WebSocket 协议的类库,可使用 Java 来构建交互式 Web 应用。WebSocket4J 并未实现客户端通讯协议,所以不能用它来连接 WebSocket 服务器。 Struts验证码插件 JCaptcha4Struts2 ...

    java开源包7

    WebSocket4J 是一个用 Java 实现的 WebSocket 协议的类库,可使用 Java 来构建交互式 Web 应用。WebSocket4J 并未实现客户端通讯协议,所以不能用它来连接 WebSocket 服务器。 Struts验证码插件 JCaptcha4Struts2 ...

    java开源包9

    WebSocket4J 是一个用 Java 实现的 WebSocket 协议的类库,可使用 Java 来构建交互式 Web 应用。WebSocket4J 并未实现客户端通讯协议,所以不能用它来连接 WebSocket 服务器。 Struts验证码插件 JCaptcha4Struts2 ...

    java开源包101

    WebSocket4J 是一个用 Java 实现的 WebSocket 协议的类库,可使用 Java 来构建交互式 Web 应用。WebSocket4J 并未实现客户端通讯协议,所以不能用它来连接 WebSocket 服务器。 Struts验证码插件 JCaptcha4Struts2 ...

    Java资源包01

    WebSocket4J 是一个用 Java 实现的 WebSocket 协议的类库,可使用 Java 来构建交互式 Web 应用。WebSocket4J 并未实现客户端通讯协议,所以不能用它来连接 WebSocket 服务器。 Struts验证码插件 JCaptcha4Struts2 ...

Global site tag (gtag.js) - Google Analytics