最后我们在 MyIoHandler 中接收这条短信息:
public void messageReceived(IoSession session, Object message) throws Exception {
SmsObject sms = (SmsObject) message;
log.info("The message received is [" + sms.getMessage() + "]");
}
你会看到 Server 端的控制台输出如下信息:
The message received is [你好!Hello World!]
_______________________________________________________________________________
(6 (6 (6
(6----2.2.2.2.))))复杂的解码器复杂的解码器复杂的解码器复杂的解码器::::
下面我们讲解一下如何在解码器中保存状态变量,也就是真正的实现上面所说的 Context。
我们假设这样一种情况,有两条短信:
M sip:wap.fetion.com.cn SIP-C/2.0 S: 1580101xxxx
R: 1889020xxxx L: 21
Hello World!
M sip:wap.fetion.com.cn SIP-C/2.0 S: 1580101xxxx
R: 1889020xxxx L: 21
Hello World!
他们按照上面的颜色标识发送,也就是说红色部分、蓝色部分、绿色部分分别发送(调用三 次 IoSession.write()方法),那么如果你还用上面的 CmccSipcDecoder,将无法工作,因为 第一次数据流(红色部分)发送过取时,数据是不完整的,无法解析出一条短信息,当二次 数据流(蓝色部分)发送过去时,已经可以解析出第一条短信息了,但是第二条短信还是不 完整的,需要等待第三次数据流(绿色部分)的发送。
注意 注意 注意
注意: : : :由于模拟数据发送的规模性问题很麻烦,所以这里采用了这种极端的例子说明问题,
虽不具有典型性,但很能说明问题,这就足够了,所以不要追究这种发送消息是否在真实环 境中存在,更不要追究其合理性。
CmccSispcDecoder 类改为如下的写法:
public class CmccSipcDecoder extends CumulativeProtocolDecoder {
private final Charset charset;
private final AttributeKey CONTEXT = new AttributeKey(getClass(),
"context");
public CmccSipcDecoder(Charset charset) { this.charset = charset;
}
@Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
Context ctx = getContext(session);
CharsetDecoder cd = charset.newDecoder();
int matchCount = ctx.getMatchCount();
int line = ctx.getLine();
IoBuffer buffer = ctx.innerBuffer;
String statusLine = ctx.getStatusLine(), sender = ctx.getSender(),
receiver = ctx.getReceiver(), length = ctx.getLength(), sms = ctx.getSms();
while (in.hasRemaining()) { byte b = in.get();
matchCount++;
buffer.put(b);
if (line < 4 && b == 10) { if (line == 0) {
buffer.flip();
statusLine = buffer.getString(matchCount, cd);
statusLine = statusLine.substring(0, statusLine.length() - 1);
matchCount = 0;
buffer.clear();
ctx.setStatusLine(statusLine);
}
if (line == 1) { buffer.flip();
sender = buffer.getString(matchCount, cd);
sender = sender.substring(0, sender.length() - 1);
matchCount = 0;
buffer.clear();
ctx.setSender(sender);
}
if (line == 2) { buffer.flip();
receiver = buffer.getString(matchCount, cd);
receiver = receiver.substring(0, receiver.length() - 1);
matchCount = 0;
buffer.clear();
ctx.setReceiver(receiver);
}
if (line == 3) { buffer.flip();
length = buffer.getString(matchCount, cd);
length = length.substring(0, length.length() - 1);
matchCount = 0;
buffer.clear();
ctx.setLength(length);
}
line++;
} else if (line == 4) {
if (matchCount == Long.parseLong(length.split(": ")[1])) {
buffer.flip();
sms = buffer.getString(matchCount, cd);
ctx.setSms(sms);
// 由于下面的break,这里需要调用else外面的两行代码 ctx.setMatchCount(matchCount);
ctx.setLine(line);
break;
} }
ctx.setMatchCount(matchCount);
ctx.setLine(line);
}
if (ctx.getLine() == 4
&& Long.parseLong(ctx.getLength().split(": ")[1]) == ctx .getMatchCount()) {
SmsObject smsObject = new SmsObject();
smsObject.setSender(sender.split(": ")[1]);
smsObject.setReceiver(receiver.split(": ")[1]);
smsObject.setMessage(sms);
out.write(smsObject);
ctx.reset();
return true;
} else {
return false;
} }
private Context getContext(IoSession session) {
Context context = (Context) session.getAttribute(CONTEXT);
if (context == null) { context = new Context();
session.setAttribute(CONTEXT, context);
}
return context;
}
private class Context {
private final IoBuffer innerBuffer;
private String statusLine = "";
private String sender = "";
private String receiver = "";
private String length = "";
private String sms = "";
public Context() {
innerBuffer = IoBuffer.allocate(100).setAutoExpand(true);
}
private int matchCount = 0;
private int line = 0;
public int getMatchCount() { return matchCount;
}
public void setMatchCount(int matchCount) { this.matchCount = matchCount;
}
public int getLine() { return line;
}
public void setLine(int line) { this.line = line;
}
public String getStatusLine() { return statusLine;
}
public void setStatusLine(String statusLine) { this.statusLine = statusLine;
}
public String getSender() { return sender;
}
public void setSender(String sender) { this.sender = sender;
}
public String getReceiver() { return receiver;
}
public void setReceiver(String receiver) { this.receiver = receiver;
}
public String getLength() { return length;
}
public void setLength(String length) { this.length = length;
}
public String getSms() { return sms;
}
public void setSms(String sms) { this.sms = sms;
}
public void reset() {
this.innerBuffer.clear();
this.matchCount = 0;
this.line = 0;
this.statusLine = "";
this.sender = "";
this.receiver = "";
this.length = "";
this.sms = "";
}
} }
这里我们做了如下的几步操作:
(1.) 所有记录状态的变量移到了 Context 内部类中,包括记录读到短信协议的哪一行的 line。每一行读取了多少个字节的 matchCount,还有记录解析好的状态行、发送者、
接受者、短信内容、累积数据的 innerBuffer 等。这样就可以在数据不能完全解码,
等待下一次 doDecode()方法的调用时,还能承接上一次调用的数据。
(2.) 在 doDecode()方法中主要的变化是各种状态变量首先是从 Context 中获取,然后操 作之后,将最新的值 setXXX()到 Context 中保存。
(3.) 这里注意 doDecode()方法最后的判断,当认为不够解码为一条短信息时,返回 false,也就是在本次数据流解码中不要再调用 doDecode()方法;当认为已经解码 出一条短信息时,输出短消息,然后重置所有的状态变量,返回 true,也就是如果 本次数据流解码中还有没解码完的数据,继续调用 doDecode()方法。
下面我们对客户端稍加改造,来模拟上面的红、蓝、绿三次发送聊天短信息的情况:
MyClient MyClient MyClient MyClient::::
ConnectFuture future = connector.connect(new InetSocketAddress(
HOSTNAME, PORT));
future.awaitUninterruptibly();
session = future.getSession();
for (int i = 0; i < 3; i++) {
SmsObject sms = new SmsObject();
session.write(sms);
System.out.println("****************" + i);
}
这里我们为了方便演示,不在 IoHandler 中发送消息,而是直接在 MyClient 中发送,你要 注意的是三次发送都要使用同一个 IoSession,否则就不是从同一个通道发送过去的了。
CmccSipcEncoder CmccSipcEncoder CmccSipcEncoder CmccSipcEncoder::::
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { SmsObject sms = (SmsObject) message;
CharsetEncoder ce = charset.newEncoder();
String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0";
String sender = "15801012253";
String receiver = "15866332698";
String smsContent = "你好!Hello World!";
IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
buffer.putString(statusLine + '\n', ce);
buffer.putString("S: " + sender + '\n', ce);
buffer.putString("R: " + receiver + '\n', ce);
buffer.flip();
out.write(buffer);
IoBuffer buffer2 = IoBuffer.allocate(100).setAutoExpand(true);
buffer2.putString("L: " + (smsContent.getBytes(charset).length) + "\n",ce);
buffer2.putString(smsContent, ce);
buffer2.putString(statusLine + '\n', ce);
buffer2.flip();
out.write(buffer2);
IoBuffer buffer3 = IoBuffer.allocate(100).setAutoExpand(true);
buffer3.putString("S: " + sender + '\n', ce);
buffer3.putString("R: " + receiver + '\n', ce);
buffer3.putString("L: " + (smsContent.getBytes(charset).length) + "\n",ce);
buffer3.putString(smsContent, ce);
buffer3.putString(statusLine + '\n', ce);
buffer3.flip();
out.write(buffer3);
}
上面的这段代码要配合MyClient来操作,你需要做的是在MyClient中的红色输出语句处设置 断点,然后第一调用时CmccSipcEncoder中注释掉蓝、绿色的代码,也就是发送两条短信息的 第一部分(红色的代码),依次类推,也就是MyClient的中的三次断点中,分别执行
CmccSipcEncoder中的红、蓝、绿三段代码,也就是模拟两条短信的三段发送。
你会看到Server端的运行结果是:当MyClient第一次到达断点时,没有短信息被读取到,当 MyClient第二次到达断点时,第一条短信息输出,当MyClient第三次到达断点时,第二条短 信息输出。
Mina Mina Mina
Mina 中自带的解码器中自带的解码器中自带的解码器中自带的解码器::::
解码器 说明
CumulativeProtocolDecoder 累积性解码器,上面我们重点说明了这个解 码器的用法。
SynchronizedProtocolDecoder 这个解码器用于将任何一个解码器包装为一 个线程安全的解码器,用于解决上面说的每 次执行 decode()方法时可能线程不是上一次 的线程的问题,但这样会在高并发时,大大 降低系统的性能。
TextLineDecoder 按 照 文 本 的 换 行 符 ( Windows:\r\n 、 Linux:\n、Mac:\r)解码数据。
PrefixedStringDecoder 这个类继承自 CumulativeProtocolDecoder 类,用于读取数据最前端的 1、2、4 个字节 表示后面的数据长度的数据。譬如:一个段 数据的前两个字节表示后面的真实数据的长 度,那么你就可以用这个方法进行解码。
_______________________________________________________________________________
(6 (6 (6
(6----3.3.3.3.))))多路分离的多路分离的多路分离的多路分离的解码器解码器解码器解码器::::
假设一段数据发送过来之后,需要根据某种条件决定使用哪个解码器,而不是像上面的例子,
固定使用一个解码器,那么该如何做呢?
幸 好 Mina 提 供 了 org.apache.mina.filter.codec.demux 包 来 完 成 这 种 多 路 分 离
(Demultiplexes)的解码工作,也就是同时注册多个解码器,然后运行时依据传入的数据 决定到底使用哪个解码器来工作。所谓多路分离就是依据条件分发到指定的解码器,譬如:
上面的短信协议进行扩展,可以依据状态行来判断使用 1.0 版本的短信协议解码器还是 2.0 版本的短信协议解码器。
下面我们使用一个简单的例子,说明这个多路分离的解码器是如何使用的,需求如下所示:
(1.) 客户端传入两个 int 类型的数字,还有一个 char 类型的符号。
(2.) 如果符号是+,服务端就是用 1 号解码器,对两个数字相加,然后把结果返回给客户 端。
(3.) 如果符号是-,服务端就使用 2 号解码器,将两个数字变为相反数,然后相加,把结 果返回给客户端。
Demux 开发编解码器主要有如下几个步骤:
A. 定义 Client 端、Server 端发送、接收的数据对象。
B. 使用 Demux 编写编码器是实现 MessageEncoder<T>接口,T 是你要编码的数据对象,这 个 MessageEncoder 会在 DemuxingProtocolEncoder 中调用。
C. 使用 Demux 编写编码器是实现 MessageDecoder 接口,这个 MessageDecoder 会在
DemuxingProtocolDecoder 中调用。
D. 在 DemuxingProtocolCodecFactory 中调用 addMessageEncoder()、addMessageDecoder() 方法组装编解码器。
MessageEncoder MessageEncoder MessageEncoder
MessageEncoder 的接口如下所示:
public interface MessageEncoder<T> {
void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception;
}
你注意到消息编码器接口与在 ProtocolEncoder 中没什么不同,区别就是 Object message 被泛型具体化了类型,你不需要手动的类型转换了。
MessageDecoder MessageDecoder MessageDecoder
MessageDecoder 的接口如下所示:
public interface MessageDecoder {
static MessageDecoderResult OK = MessageDecoderResult.OK;
static MessageDecoderResult NEED_DATA =
MessageDecoderResult.NEED_DATA;
static MessageDecoderResult NOT_OK = MessageDecoderResult.NOT_OK;
MessageDecoderResult decodable(IoSession session, IoBuffer in);
MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception;
void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception;
} (1.) (1.) (1.)
(1.)decodabledecodabledecodabledecodable()()()()方法有三个返回值方法有三个返回值方法有三个返回值方法有三个返回值,,,,分别表示如下的含义分别表示如下的含义分别表示如下的含义分别表示如下的含义::::
A. MessageDecoderResult.NOT_OK:表示这个解码器不适合解码数据,然后检查其它解码
器,如果都不满足会抛异常;B.
MessageDecoderResult.NEED_DATA:表示当前的读入的数据不够判断是否能够使用这
个解码器解码,然后再次调用 decodable()方法检查其它解码器,如果都是 NEED_DATA, 则等待下次输入;C.
MessageDecoderResult.OK : 表 示 这 个 解 码 器 可 以 解 码 读 入 的 数 据 , 然 后 则 调 用 MessageDecoder 的 decode()方法。
这里注意 decodable()方法对参数 IoBuffer in 的任何操作在方法结束之后,都会复原,也就是 你不必担心在调用 decode()方法时,position 已经不在缓冲区的起始位置。这个方法相当于 是预读取,用于判断是否是可用的解码器。
( ((
(2.2.2.2.))))decode()decode()decode()decode()方法有三个返回值方法有三个返回值方法有三个返回值,方法有三个返回值,,,分别表示如下的含义分别表示如下的含义分别表示如下的含义分别表示如下的含义::::
A. MessageDecoderResult.NOT_OK:表示解码失败,会抛异常;
B.
MessageDecoderResult.NEED_DATA:表示数据不够,需要读到新的数据后,再次调用 decode()方法。
C.
MessageDecoderResult.OK:表示解码成功。
代码演示代码演示 代码演示代码演示::: : (1.)
(1.) (1.)
(1.)客户端发送的数据对象客户端发送的数据对象客户端发送的数据对象客户端发送的数据对象:::: public class SendMessage {
private int i = 0;
private int j = 0;
private char symbol = '+';
public char getSymbol() { return symbol;
}
public void setSymbol(char symbol) { this.symbol = symbol;
}
public int getI() { return i;
}
public void setI(int i) { this.i = i;
}
public int getJ() { return j;
}
public void setJ(int j) { this.j = j;
}
}
(2.) (2.) (2.)
(2.)服务端发送的返回结果对象服务端发送的返回结果对象服务端发送的返回结果对象服务端发送的返回结果对象:::: public class ResultMessage {
private int result = 0;
public int getResult() { return result;
}
public void setResult(int result) { this.result = result;
}
}
(3.)(3.)
(3.)(3.)客户端使用的客户端使用的客户端使用的客户端使用的 SendMessageSendMessageSendMessageSendMessage 的的的的编码器编码器编码器编码器::::
public class SendMessageEncoder implements MessageEncoder<SendMessage>
{
@Override
public void encode(IoSession session, SendMessage message, ProtocolEncoderOutput out) throws Exception {
IoBuffer buffer = IoBuffer.allocate(10);
buffer.putChar(message.getSymbol());
buffer.putInt(message.getI());
buffer.putInt(message.getJ());
buffer.flip();
out.write(buffer);
}
}
这里我们的 SendMessage、ResultMessage 中的字段都是用长度固定的基本数据类型,这样 IoBuffer 就不需要自动扩展了,提高性能。按照一个 char、两个 int 计算,这里的 IoBuffer 只需要 10 个字节的长度就可以了。
(4.)(4.)
(4.)(4.)服务端使用的服务端使用的服务端使用的服务端使用的 SendMessageSendMessageSendMessageSendMessage 的的的的 1111 号解码器号解码器号解码器:号解码器:::
public class SendMessageDecoderPositive implements MessageDecoder {
@Override
public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
if (in.remaining() < 2)
return MessageDecoderResult.NEED_DATA;
else {
char symbol = in.getChar();
if (symbol == '+') {
return MessageDecoderResult.OK;
} else {
return MessageDecoderResult.NOT_OK;
} } }
@Override
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
SendMessage sm = new SendMessage();
sm.setSymbol(in.getChar());
sm.setI(in.getInt());
sm.setJ(in.getInt());
out.write(sm);
return MessageDecoderResult.OK;
}
@Override
public void finishDecode(IoSession session, ProtocolDecoderOutput out)
throws Exception { // undo
}
}
因为客户端发送的 SendMessage 的前两个字节(char)就是符号位,所以我们在 decodable() 方法中对此条件进行了判断,之后读到两个字节,并且这两个字节表示的字符是+时,才认 为这个解码器可用。
(5.) (5.) (5.)
(5.)服务端使用的服务端使用的服务端使用的服务端使用的 SendMessagSendMessagSendMessagSendMessageeee 的的的的 2222 号解码器号解码器号解码器:号解码器:::
public class SendMessageDecoderNegative implements MessageDecoder {
@Override
public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
if (in.remaining() < 2)
return MessageDecoderResult.NEED_DATA;
else {
char symbol = in.getChar();
if (symbol == '-') {
return MessageDecoderResult.OK;
} else {
return MessageDecoderResult.NOT_OK;
} }
}
@Override
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
SendMessage sm = new SendMessage();
sm.setSymbol(in.getChar());
sm.setI(-in.getInt());
sm.setJ(-in.getInt());
out.write(sm);
return MessageDecoderResult.OK;
}
@Override
public void finishDecode(IoSession session, ProtocolDecoderOutput out)
throws Exception { // undo
throws Exception { // undo