更新時(shí)間:2020-10-29 來(lái)源:黑馬程序員 瀏覽量:
1.NIO群聊實(shí)現(xiàn)步驟
·構(gòu)建Selector以及服務(wù)端監(jiān)聽(tīng)通
·道啟動(dòng)監(jiān)聽(tīng)并處理建立連接請(qǐng)求
·處理讀數(shù)據(jù)
·群發(fā)數(shù)據(jù)實(shí)現(xiàn)
·客戶端測(cè)試實(shí)現(xiàn)
2. 服務(wù)端實(shí)現(xiàn)
2.0 服務(wù)端完整代碼服務(wù)端的主要功能如下
(1)開(kāi)放監(jiān)聽(tīng)端口,方法ChatServer構(gòu)造方法
(2)處理鏈接請(qǐng)求,方法listener實(shí)現(xiàn)連接的建立
(2)讀取消息內(nèi)容,方法readData
(4)轉(zhuǎn)發(fā)消息給當(dāng)前所有在線的人,方法sendData2All
package com.hgy.chat;
/**
* 群聊服務(wù)器
*/
public class ChatServer {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
/**
* 初始化服務(wù)端
*/
public ChatServer() {
try {
// 創(chuàng)建Selector以及ServerSocketChannel
selector = Selector.open();
serverSocketChannel = serverSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
//將服務(wù)端監(jiān)聽(tīng)通道注冊(cè)到Selector中
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 監(jiān)聽(tīng)客戶端操作
*/
public void listener() {
while (true) {
try {
if (selector.select(1000) == 0) {
continue;
}
//獲得所有有事件的key
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//如果當(dāng)前key是處理鏈接類型
if (key.isAcceptable()) {
SocketChannel socketChannel =
serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
// 當(dāng)前鏈接是讀數(shù)據(jù)類型
if (key.isReadable()) {
readData(key);
}
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 讀取數(shù)據(jù)并群發(fā)給所有的用戶
* @param key
*/
private void readData(SelectionKey key) {
try {
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
channel.read(byteBuffer);
String s = new String(byteBuffer.array());
// 寫到其他所有客戶端
sendData2All(s);
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 群發(fā)給所有的用戶
* @param msg 需要發(fā)送的消息
*/
private void sendData2All(String msg) {
try {
// 當(dāng)前在selector上注冊(cè)的所有key就是所有用戶
Set<SelectionKey> keys = selector.keys();
for (SelectionKey key : keys) {
// 獲取每個(gè)用戶的通道
SelectableChannel channel = key.channel();
// 實(shí)現(xiàn)數(shù)據(jù)發(fā)送
if (channel instanceof SocketChannel) {
System.out.println(":::" + msg);
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
SocketChannel socketChannel = (SocketChannel) channel;
socketChannel.write(byteBuffer);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ChatServer chatServer = new ChatServer();
chatServer.listener();
}
}
2.1 構(gòu)建Selector以及服務(wù)端監(jiān)聽(tīng)通道
當(dāng)ChatServer對(duì)象被創(chuàng)建時(shí)具體實(shí)現(xiàn)步驟如下
(1)創(chuàng)建serverSocketChannel對(duì)象
(2)設(shè)置處理模式為非阻塞模式
(3)綁定監(jiān)聽(tīng)端口
(4)將channel注冊(cè)到selector中
public class ChatServer {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
/**
* 初始化服務(wù)端
*/
public ChatServer() {
try {
// 創(chuàng)建Selector以及ServerSocketChannel
selector = Selector.open();
serverSocketChannel = serverSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
//將服務(wù)端監(jiān)聽(tīng)通道注冊(cè)到Selector中
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.2 實(shí)現(xiàn)監(jiān)聽(tīng)并處理建立連接請(qǐng)求
連接請(qǐng)求處理實(shí)現(xiàn)步驟
(1)獲得所有有事件的key,通過(guò)key就可以拿到用戶的SocketChannel
(2)循環(huán)遍歷每一個(gè)key,判斷當(dāng)前是讀事件,還是建立連接事件
(3)如果是建立連接事件則直接將該通道注冊(cè)到selector中
(4)如果是讀數(shù)據(jù)事件就交給具體的讀數(shù)據(jù)方法處理數(shù)據(jù)
2.3 處理讀數(shù)據(jù)數(shù)據(jù)
處理的具體實(shí)現(xiàn)步驟
(1)通過(guò)key獲取和用戶連接的通道(相當(dāng)于輸入流)
(2)獲取通道的數(shù)據(jù)并打印
(3)將數(shù)據(jù)轉(zhuǎn)發(fā)給其他在線用戶
public void listener() {
while (true) {
try {
if (selector.select(1000) == 0) {
continue;
}
//獲得所有有事件的key
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//如果當(dāng)前key是處理鏈接類型
if (key.isAcceptable()) {
SocketChannel socketChannel =
serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
// 當(dāng)前鏈接是讀數(shù)據(jù)類型
if (key.isReadable()) {
readData(key);
}
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
2.4 群發(fā)數(shù)據(jù)實(shí)現(xiàn)
數(shù)據(jù)群發(fā)實(shí)現(xiàn)步驟
(1)當(dāng)前在線用戶實(shí)際上就是selector中所有注冊(cè)的key,也就是在線的用戶
(2)通過(guò)key拿到和用戶的鏈接講消息轉(zhuǎn)發(fā)出去
/**
* 監(jiān)聽(tīng)客戶端操作
*/
/**
* 讀取數(shù)據(jù)并群發(fā)給所有的用戶
* @param key
*/
private void readData(SelectionKey key) {
try {
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
channel.read(byteBuffer);
String s = new String(byteBuffer.array());
// 寫到其他所有客戶端
sendData2All(s);
}
} catch (IOException e) {
e.printStackTrace();
}
}
2.5 啟動(dòng)服務(wù)端
public static void main(String[] args) {
ChatServer chatServer = new ChatServer();
chatServer.listener();
}
3. 客戶端實(shí)現(xiàn)
客戶端實(shí)現(xiàn)
(1)首先創(chuàng)建SocketChannel對(duì)象并鏈接到具體的服務(wù)器
(2)將通道注冊(cè)到selector中
(3)開(kāi)啟一個(gè)新的線程監(jiān)聽(tīng)selector中所有key的事件
(4)在主線程中循環(huán)阻塞獲取用戶的輸入
public class ChatClient {
public static void main(String[] args) throws Exception {
// 客戶端代碼, 建立連接
Selector selector = Selector.open();
SocketChannel socketChannel = SocketChannel.open(new
InetSocketAddress("127.0.0.1", 8888));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
// 開(kāi)啟一個(gè)新的線程輪詢當(dāng)前客戶是否有可讀消息
new Thread(() -> {
while (true) {
try {
int select = selector.select(1000);
// 有可讀消息進(jìn)行解析打印
if (select > 0) {
for (SelectionKey key : selector.selectedKeys()) {
if (key.isReadable()) {
SocketChannel channel = (SocketChannel)
key.channel();
ByteBuffer byteBuffer =
ByteBuffer.allocate(1024);
channel.read(byteBuffer);
System.out.println(":==:" + new
String(byteBuffer.array()));
// 寫到其他所有客戶端
System.out.println(new
String(byteBuffer.array()));
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
// 主線程中循環(huán)獲取用戶輸入的聊天消息
while(true) {
Scanner scanner = new Scanner(System.in);
//發(fā)送用戶的消息
socketChannel.write(ByteBuffer.wrap(scanner.nextLine().getBytes()));
}
}
}
猜你喜歡:
Java枚舉(Enum)類型原理詳細(xì)介紹【黑馬程序員】
2020-10-29JVM字符串底層實(shí)現(xiàn)原理介紹【java培訓(xùn) 】
2020-10-28視頻在線點(diǎn)播功能如何實(shí)現(xiàn)?【Java培訓(xùn)】
2020-10-28Java如何發(fā)送響應(yīng)消息體?
2020-10-27Java培訓(xùn)機(jī)構(gòu)培訓(xùn)費(fèi)用多少錢?靠譜培訓(xùn)機(jī)構(gòu)怎么選?
2020-10-27Java培訓(xùn)班培訓(xùn)費(fèi)用大概要多少錢?看完不上當(dāng)
2020-10-27