小型直播系统-java实现(四)
websocket协议介绍
屁话不多说,简单点说,就是一个保持全双工,能够保持长连接的协议,我们用它来进行发送弹幕和发视频,图片进行即时交流的工具。下面进行弹幕的技术实现。
@OnOpen
public void onOpen(Session session)
{session.setMaxTextMessageBufferSize((int) MAX_BIG_LONG);
addOnlineCount(); // 在线数加1--必须先加1---错误的时候会减1
this.session = session;
if (!parseQueryString(session)) // 如果未能取得用户id和type,退出return;// 验证账号,防止伪造
this.chatUser = loginChatServer(chatUserId);
if (this.chatUser == null)
{
closeSession(session);return;
}
addChatUserToHashMap(roomId, chatUserId);
try
{// 发一个应答标记,表示已经成功登陆,没有构造
sendMessage("SUCCESS");
}
catch (IOException e) {
// TODO Auto-generated catch blocke.printStackTrace();
}Constant.ONLINECOUNT = onlineCount.toString();
}
onopen标识客户端与服务器进行通讯连接,在此处可进行业务逻辑的处理,将所需账号提出进行保存。
@Component
@ServerEndpoint("/chatServer")
public class ChatServer {
private static Log logger = LogFactory.getLog(ChatServer.class);
/** AtomicInteger:线程安全的整数对象 */
private static AtomicInteger onlineCount = new AtomicInteger(0);// 线程安全整数对象
private static long MAX_BIG_LONG = 1024 * 4 * 1024;
/** roomId与一个集合的哈希。集合中存储当前房间的所有用户 */
private static ConcurrentHashMap<String, CopyOnWriteArraySet
> roomToChatUserHashMap = new ConcurrentHashMap<String, CopyOnWriteArraySet >(); /** 用户与chatServer实例的哈希。 */
private static ConcurrentHashMap
chatUserToChatServer = new ConcurrentHashMap (); /** token验证 **/
private String token;
/** 房间号 **/
private String roomId;
/** chatUser 主键Id **/
private String chatUserId;
/** chatUserd对象 **/
private ChatUser chatUser;
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,
* 可以使用Map来存放,其中Key可以为用户标识·1
*/
声明两个类级别的ConcurrentHashMap存储房间号和chatUser之间的对应关系,一个房间对应多个chatUser账号,一个chatUser对应一个chatServer实例,发送消息时只需进行遍历这俩个map找到相应的实例对象,调用它的发消息即可成功的发送
/**
* 一个房间对应的一个chatuser列表 发消息时候进行遍历操作
*
* @param chatUserId
* @param chatUserId
* @return
*/
private boolean addChatUserToHashMap(String roomId, String chatUserId) {
try {
CopyOnWriteArraySet
chatUserIdSet = null; if (roomToChatUserHashMap.containsKey(roomId)) {
chatUserIdSet = roomToChatUserHashMap.get(roomId);
} else {
chatUserIdSet = new CopyOnWriteArraySet
(); }
chatUserIdSet.add(chatUserId);
roomToChatUserHashMap.put(roomId, chatUserIdSet);
chatUserToChatServer.put(chatUserId, this);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
这是进入房间时,将自身账号加入到房间聊天群组,用来发广播消息时遍历的。
/**
* 从哈希中移除已经断开的连接
*
* @param chatUserId
* @param terminalUuid
* @return
*/
private boolean removeChatUserFromRoomHashMap(String roomId, String chatUserId) {
try {
CopyOnWriteArraySet
chatUserIdSet = null; if (
roomToChatUserHashMap.containsKey(roomId)) {// 如果存在chatUserIdSet = roomToChatUserHashMap.get(chatUserId);// 取得chatUserId的集合
} else {
return true;
}
chatUserIdSet.remove(chatUserId);// 从集合中移除
if (chatUserIdSet.size() == 0) {// 如果已经没有连接终端
roomToChatUserHashMap.remove(roomId);// 则清除} else {
roomToChatUserHashMap.put(roomId, chatUserIdSet);// 更新哈希
}
ChatServer chatServer = chatUserToChatServer.get(chatUserId);
// 释放资源,清空chatServer
chatServer = null;
chatUserToChatServer.remove(chatUserId);// 将chatServer的实例从哈希中移除。return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
当断开聊天室时,应该讲其从聊天组移除掉。
/**
* 关闭websocket连接。
*
* @param session
* 要关闭的会话
*/
private void closeSession(Session session) {
try {
session.close();
} catch (IOException e) {
e.printStackTrace();
}
return;
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
try {
removeChatUserFromRoomHashMap(this.roomId, this.chatUserId);
subOnlineCount(); // 在线数减1
} catch (Exception e) {
}
}
关闭调用的方法
/**
* 收到客户端消息后调用的方法。 客户端发送消息的方法是,发送到服务器,消息中指明要传送给那个用户,消息的类型。
* 此处只管发送,至于发送的模式(紧急通知,普通通知,订单消息,聊天信息),这里并进行解析处理,由客户端自行处理。
*
*
* @param message
* 客户端发送过来的消息
* @param session
* 可选的参数
*/
@OnMessage
public void onMessage(String message, Session session) {
if (StringUtils.isBlank(message)) // 收到的是空串
return;
if (StringUtils.equals(Constant.SUCCESS_RESPONSE, message)) {
return;
}
Gson gson = null;
try {// 解析json串
gson = new Gson();
ChatMessage chatMessage = gson.fromJson(message, ChatMessage.class);
// 解析json出错
if (chatMessage == null)
return;
/**
* 目的为了以后对每个模块进行拓展,所以分开写
*
*/
// 如果是图片类型的消息体
if (StringUtils.equals(chatMessage.getMessageType(), EnumMessageType.IMAGE.name())) {
try {
boolean dealImageResult = dealBinary(chatMessage);// 处理图片结果
if (!dealImageResult) {
return;// 如果没生成,则返回
}
} catch (Exception e) {
logger.error("处理图片异常" + e.getMessage());
}
}
// 处理小视频
else if ((StringUtils.equals(chatMessage.getMessageType(), EnumMessageType.VIDEO.name()))) {
try {
boolean dealVideoResult = dealBinary(chatMessage);
if (!dealVideoResult) {
return;
}
} catch (Exception e) {
logger.error("处理小视频异常" + e.getMessage());
}
}
// 处理二进制文件
else if ((StringUtils.equals(chatMessage.getMessageType(), EnumMessageType.BINARY.name()))) {
try {
boolean dealBinaryResult = dealBinary(chatMessage);
if (!dealBinaryResult) {
return;
}
} catch (Exception e) {
// TODO: handle exception
logger.error("处理文件异常" + e.getMessage());
}
}
chatMessage.setImageBase64("");// 清空串// 不用做NPE判断,因为chatUser如果为空 则推出closeSession 所以不可能为空
chatMessage.setChatName(this.chatUser.getUsername());
sendMessageToEveryoneInRoom(chatMessage);
} catch (Exception e) {// 发生错误即退出
e.printStackTrace();
}
}
进行通讯的模块。具体代码私信我,