分布式java应用学习笔记一

基于消息方式实现系统间的通信

常用的通信协议:

tcp/ip 保证数数据传输的可靠性,会牺牲性能

udp/ip 双方不建立联接,面是发送到网上进行传递,性能较好

系统间通信对数据的处理

同步IO常用的是

(Blocking IO) 当发起读写操作时, 均为阻塞方式,只有当操作完成后才会释放

资源

NIO(Non-Blocking IO) 基于事件驱动的,实际上采用的reactor模式2, 发起读写操作

时是非阻塞方式, linux2.6 以后版本采用epoll3方式来实现NIO

异步操作方式;AIO(基于事件戏动) 采用proactor模式4 直接调用API的read和write

方法

读取:操作系统将可读的流传达入read方法的缓冲区,通知应用

写入:操作系统把write方法写入的流写入完毕时通知应用程序

Window s基于locp实现了AIO linux目前只有基于epoll模拟实现的AIO(只有j dk7才支持AIO)

实现方式:

TCP/IP+BIO

采用socket和serverSocket来实现,

客户端的关键代码

public void client() throws UnknownHostException, IOException {

Socket socket = new Socket("服务器ip/域名", 123);

// 读取服务器返回流

BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

// 向服务器写入的流

PrintWriter out = new PrintWriter(socket.getOutputStream());

// 向服务器发送信息

out.println("hello");

// 阻塞读取服务端的返回信息

in.readLine();

}

服务器端的关键代码

public void server() throws IOException {

// 监听的端口

ServerSocket serverSocket = new ServerSocket(123);

// 设置超时时间

// serverSocket.setSoTimeout(11);

// 接收客户端建立联接的请求

// 也是通过socket.getInputStream() 和socket.getOutputStream();

// 来进行读写操作,该方法会一直阻塞到有发送建立联接的请求

Socket socket = serverSocket.accept();

}

一般是采用联接池的方式来维护socket(容易出现服务器持掉的现象),所以要根据情况设置超时时间, 为了能够同时接收多个连接请求,就要在accept取得socket后,将此socket 放到一个线程中单独处理,通常称为一连接一线程,防止资源耗尽,必须限制线程数量,这就造成了BIO的情况下服务端能支撑的连接数是有限的

TCP/IP+NIO

采用channel和selector来实现

public void client() throws IOException {

SocketChannel socketChannel = SocketChannel.open();

// 设置为非阻塞模式

socketChannel.configureBlocking(false);

// 接着返回false,表示正在建立连接

socketChannel.connect(new SocketAddress() {

});

Selector selector = Selector.open();

// 注册selector和感受性趣的连接事件

socketChannel.register(selector, SelectionKey.OP_CONNECT);

// 阻塞至有感性趣的事件发生, 直到达超时时间

// 如果希望一直等,就调用无参的select方法

int nKeys = selector.select();

// 如果不阻塞直接返回目前是否有感性趣的事件发生

// selector.selectNow();

// nKeys>0 说明有感兴趣的事发生

SelectionKey sKey = null;

if (nKeys > 0) {

Set<SelectionKey> keys = selector.selectedKeys();

for (SelectionKey key : keys) {

//

if (key.isConnectable()) {

SocketChannel sc = (SocketChannel) key.channel();

// 非阻塞式

sc.configureBlocking(false);

// 注册感性趣的IO事件,通常不常注册写事件

// 在缓冲区未满的情况,是一直可写的

sc.register(selector, SelectionKey.OP_READ);

// 完成连接的建立

sc.finishConnect();

}

// 有流可以读取

else if (key.isReadable()) {

ByteBuffer buffer = ByteBuffer.allocate(1024);

SocketChannel sc = (SocketChannel) key.channel();

int readBytes = 0;

try {

int ret = 0;

try {

while ((ret = sc.read(buffer)) > 0) {

readBytes += ret;

}

} finally {

buffer.flip();

}

} finally {

if (null != buffer) {

buffer.clear();

}

}

} else if (key.isWritable()) {

// 取消对OP_WRITE事件的注册

key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));

SocketChannel sc = (SocketChannel) key.channel();

// 阻塞操作,直到写入缓冲区或出现异常,返回的为成功写入的字节数

// 当发送缓冲区已满,返回0

int writtenedSize = sc.write(ByteBuffer.allocate(1024));

// 如果未写入,则继续注册感性趣的事件

if (writtenedSize == 0) {

key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);

}

}

selector.selectedKeys().clear();

}

// 对于要写入的流

int wSize = socketChannel.write(ByteBuffer.allocate(1024));

}

}

public void server() throws IOException {

ServerSocketChannel ssc = ServerSocketChannel.open();

ServerSocket serverSocket= ssc.socket();

//绑定监听的接口

serverSocket.bind(new InetSocketAddress(123));

ssc.configureBlocking(false);

//注册感兴趣的连接建立事件

ssc.register(Selector.open() , SelectionKey.OP_ACCEPT);

// 和客户端同样的方式对selector.select进行轮询,只是添加了一个如下方法

Set<SelectionKey> keys = selector.selectedKeys();

for (SelectionKey key : keys) {

//

if (key.isAcceptable()) {

ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

SocketChannel socketChannel = serverSocketChannel.accept();

if (null == socketChannel) {

continue;

}

socketChannel.configureBlocking(false);

socketChannel.register(selector, SelectionKey.OP_READ);

}

}

}

UDP/IP+BIO

采用socket 由于UDP/IP是无连接的,要进行双向通信,必须两端都成为UDP server

基于datagramSocket和DatagramPacket

客户端和服务器端的代码如下:

// 如果双向通信,必须启动一个监听端口,承担服务器的职责

public void clentOrServer() {

try {

DatagramSocket serverSocket = new DatagramSocket(123);

byte[] buffer = new byte[65507];

DatagramPacket receivePackep = new DatagramPacket(buffer, buffer.length);

DatagramSocket socket = new DatagramSocket();

DatagramPacket packet = new DatagramPacket(datas, datas.length, server, port);

// 阻塞发送pack到指定的服务器和端口,

// 网络io异常抛出ioexception,

// 连不上目标端口 porUnreachableException

socket.send(packet);

// 阻塞并同步读取流信息,如接收到的流信息比packet长度长

// 则删除更长信息,

serverSocket.setSoTimeout(100);// 设置读取流的超时时间

serverSocket.receive(receivePackep);

} catch (SocketException e) {

e.printStackTrace();

}

}

UDP/IP+NIO

采用datagrameChannel和byteBuffer来实现

// 一对一的系统间的通信

public void clent() {

try {

DatagramChannel receiveChannel = DatagramChannel.open();

// 非阻塞模式

receiveChannel.configureBlocking(false);

DatagramSocket socket = receiveChannel.socket();

socket.bind(new InetSocketAddress(123));

Selector selector = Selector.open();

receiveChannel.register(selector, SelectionKey.OP_ACCEPT);

// 可象TCP/IP+NIO中对selector的遍历一样

DatagramChannel sendChannel = DatagramChannel.open();

sendChannel.configureBlocking(false);

SocketAddress target = new InetSocketAddress("127.0.0.1", 123);

sendChannel.write(ByteBuffer.allocate(1024));

} catch (IOException e) {

e.printStackTrace();

}

}

try {

DatagramChannel receiveChannel = DatagramChannel.open();

// 非阻塞模式

receiveChannel.configureBlocking(false);

DatagramSocket socket = receiveChannel.socket();

socket.bind(new InetSocketAddress(123));

Selector selector = Selector.open();

receiveChannel.register(selector, SelectionKey.OP_ACCEPT);

// 可象TCP/IP+NIO中对selector的遍历一样

DatagramChannel sendChannel = DatagramChannel.open();

sendChannel.configureBlocking(false);

SocketAddress target = new InetSocketAddress("127.0.0.1", 123);

sendChannel.write(ByteBuffer.allocate(1024));

} catch (IOException e) {

e.printStackTrace();

}

}

基于开源框架实现消息方式的系统间通信

Mina 是apache的开源项目,基于是java NIO构建

关键类为:

loConnector 配置客户端的消息处理器 io事件处理线程池 消息发送/接收的

Filter chain

loAcctptor 配置服务器端的io事件处理线程池 消息发送/接收的filter Chain

loHandler 作为mina和应用的接口 底层发生事件 mina会通知应用实现的

handler

loSession 类似于socketChannel的封装,可以进行连接的控制及流信息的输出

它采用filter chain的方式封装消息发送和接收

基于远程调用方式实现系统间的通信

这种方式主要用来实现基于RMI和webservice的应用

RMI(remote method invocation) 是java用于实现远程调用的重要机制

Webservice

基于开源框架实现远程调用方式的系统间通信

SpringRMI