Java AIO Socket 异步通信示例
服务端实现:
/**
 * Minimal asynchronous (NIO.2) TCP server.
 *
 * <p>For each accepted connection the server performs one read, prints the bytes it received,
 * answers with a fixed acknowledgement and closes the connection. There is no message framing:
 * whatever arrives in the first read (at most {@link #BUFFER_SIZE} bytes) is treated as the
 * whole request.
 *
 * <p>Completion handlers run on a fixed pool of four threads owned by the channel group the
 * listening channel is opened in.
 */
public class AioServer {

    /** Capacity of the per-connection receive buffer, in bytes. */
    private static final int BUFFER_SIZE = 2048;

    /**
     * Text encoding used on the wire. Pinned to UTF-8 so decoding does not depend on the
     * platform default charset. Fully qualified so the class needs no additional import.
     */
    private static final java.nio.charset.Charset CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    /** Group whose thread pool dispatches every completion handler of this server. */
    private final AsynchronousChannelGroup group;

    /** Listening channel; opened in the constructor, bound in {@link #startup()}. */
    private final AsynchronousServerSocketChannel server;

    /** TCP port the server binds to. */
    private final int port;

    /**
     * Creates the channel group and opens (but does not yet bind) the listening channel.
     *
     * @param port TCP port to listen on once {@link #startup()} is called
     * @throws IOException if the channel group or the listening channel cannot be created
     */
    public AioServer(int port) throws IOException {
        this.port = port;
        AsynchronousChannelGroup createdGroup =
                AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
        AsynchronousServerSocketChannel opened;
        try {
            // The channel must be opened in the group, otherwise the pool is never used.
            opened = AsynchronousServerSocketChannel.open(createdGroup);
        } catch (IOException e) {
            // Do not leave the group (and its executor) behind when opening fails.
            createdGroup.shutdown();
            throw e;
        }
        this.group = createdGroup;
        this.server = opened;
    }

    /**
     * Binds the listening channel, starts accepting connections and then blocks the calling
     * thread until the channel group terminates.
     *
     * @throws Exception if binding fails or the calling thread is interrupted while waiting
     */
    public void startup() throws Exception {
        server.bind(new InetSocketAddress(port));
        System.out.println("========服务已启动=========");

        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Void attachment) {
                // Re-arm the accept before touching the new connection so that one client
                // never delays the next one.
                server.accept(null, this);
                handle(client);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
                // Keep listening after a failed accept unless the server channel was closed.
                if (server.isOpen()) {
                    server.accept(null, this);
                }
            }
        });

        // Nothing shuts the group down in this class, so this keeps the caller parked while
        // the handlers do the work.
        group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }

    /**
     * Reads one request from {@code client} into a buffer owned by this connection, prints it
     * and then sends the acknowledgement.
     */
    private void handle(final AsynchronousSocketChannel client) {
        final ByteBuffer received = ByteBuffer.allocate(BUFFER_SIZE);
        client.read(received, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer n, Void attachment) {
                // Decode only the bytes actually read; n is -1 when the peer closed first.
                String message = n > 0 ? new String(received.array(), 0, n, CHARSET) : "";
                System.out.println("size:" + n + ">已捕捉到连接-消息:" + message);
                reply(client);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
                close(client);
            }
        });
    }

    /** Writes the fixed acknowledgement to {@code client} and closes it once fully sent. */
    private void reply(final AsynchronousSocketChannel client) {
        final ByteBuffer response = ByteBuffer.wrap("处理成功".getBytes(CHARSET));
        client.write(response, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer written, Void attachment) {
                if (response.hasRemaining()) {
                    // A single write may be partial; continue until the buffer is drained.
                    client.write(response, null, this);
                } else {
                    close(client);
                }
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
                close(client);
            }
        });
    }

    /** Closes {@code client}, reporting (not propagating) any error raised while closing. */
    private static void close(AsynchronousSocketChannel client) {
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Starts the server on port 8000. */
    public static void main(String[] args) {
        try {
            new AioServer(8000).startup();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
客户端实现:
/**
 * Minimal asynchronous (NIO.2) TCP client: connects to a server on the local machine, sends
 * one text message, performs a single read for the reply and prints it.
 *
 * <p>There is no message framing; whatever the first read returns (at most
 * {@link #BUFFER_SIZE} bytes) is treated as the whole reply.
 */
public class AioClient {

    /** Capacity of the reply buffer, in bytes. */
    private static final int BUFFER_SIZE = 2048;

    /**
     * Text encoding used on the wire. Pinned to UTF-8 so the bytes sent do not depend on the
     * platform default charset. Fully qualified so the class needs no additional import.
     */
    private static final java.nio.charset.Charset CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    /** TCP port of the server to connect to. */
    private final int port;

    /**
     * @param port TCP port of the server on the local machine
     */
    public AioClient(int port) {
        this.port = port;
    }

    /**
     * Sends {@code message} to the server and blocks until the reply has been printed.
     *
     * <p>The asynchronous connect/write/read chain reports its outcome through a
     * {@code CompletableFuture}, so the caller is woken both on success and on any failure
     * instead of polling a flag that is never set when something goes wrong.
     *
     * @param message text to send; encoded as UTF-8, any length
     * @throws IOException if the channel cannot be opened, if connecting, writing or reading
     *     fails, or if the calling thread is interrupted while waiting (the interrupt flag
     *     is restored in that case)
     */
    public void send(String message) throws IOException {
        System.out.println("正在发送...");

        final ByteBuffer outgoing = ByteBuffer.wrap(message.getBytes(CHARSET));
        final ByteBuffer incoming = ByteBuffer.allocate(BUFFER_SIZE);
        final java.util.concurrent.CompletableFuture<Void> finished =
                new java.util.concurrent.CompletableFuture<>();

        // try-with-resources closes the channel on every exit path.
        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            // InetSocketAddress(int) yields the wildcard address, which is a bind address
            // rather than a well-defined connect target; name the loopback explicitly.
            InetSocketAddress target =
                    new InetSocketAddress(java.net.InetAddress.getLoopbackAddress(), port);
            client.connect(target, null, new CompletionHandler<Void, Void>() {
                @Override
                public void completed(Void result, Void attachment) {
                    writeRequest(client, outgoing, incoming, finished);
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    finished.completeExceptionally(exc);
                }
            });
            finished.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for the server reply", e);
        } catch (ExecutionException e) {
            throw new IOException(
                    "Exchange with the server on port " + port + " failed", e.getCause());
        }
    }

    /** Writes all of {@code outgoing}, then moves on to reading the reply. */
    private static void writeRequest(
            final AsynchronousSocketChannel client,
            final ByteBuffer outgoing,
            final ByteBuffer incoming,
            final java.util.concurrent.CompletableFuture<Void> finished) {
        client.write(outgoing, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer written, Void attachment) {
                if (outgoing.hasRemaining()) {
                    // A single write may be partial; continue until the buffer is drained.
                    client.write(outgoing, null, this);
                } else {
                    readReply(client, incoming, finished);
                }
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                finished.completeExceptionally(exc);
            }
        });
    }

    /** Performs one read, prints the reply and signals completion. */
    private static void readReply(
            final AsynchronousSocketChannel client,
            final ByteBuffer incoming,
            final java.util.concurrent.CompletableFuture<Void> finished) {
        client.read(incoming, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer n, Void attachment) {
                // Decode only the bytes actually read; n is -1 when the peer closed first.
                String text = n > 0 ? new String(incoming.array(), 0, n, CHARSET) : "";
                System.out.println("size:" + n + "> 收到结果:" + text);
                finished.complete(null);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                finished.completeExceptionally(exc);
            }
        });
    }

    /** Sends one sample message to a server on port 8000. */
    public static void main(String[] args) {
        try {
            new AioClient(8000).send("客户端的数据包");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}