小型直播系统-java实现(四)

websocket协议介绍

屁话不多说,简单点说,就是一个保持全双工,能够保持长连接的协议,我们用它来进行发送弹幕和发视频,图片进行即时交流的工具。下面进行弹幕的技术实现。

@OnOpen

public void onOpen(Session session)

{session.setMaxTextMessageBufferSize((int) MAX_BIG_LONG);

addOnlineCount(); // 在线数加1--必须先加1---错误的时候会减1

this.session = session;

if (!parseQueryString(session)) // 如果未能取得用户id和type,退出return;// 验证账号,防止伪造

this.chatUser = loginChatServer(chatUserId);

if (this.chatUser == null)

{

closeSession(session);return;

}

addChatUserToHashMap(roomId, chatUserId);

try

{// 发一个应答标记,表示已经成功登陆,没有构造

sendMessage("SUCCESS");

}

catch (IOException e) {

// TODO Auto-generated catch blocke.printStackTrace();

}Constant.ONLINECOUNT = onlineCount.toString();

}

onopen标识客户端与服务器进行通讯连接,在此处可进行业务逻辑的处理,将所需账号提出进行保存。

@Component

@ServerEndpoint("/chatServer")

public class ChatServer {

private static Log logger = LogFactory.getLog(ChatServer.class);

/** AtomicInteger:线程安全的整数对象 */

private static AtomicInteger onlineCount = new AtomicInteger(0);// 线程安全整数对象

private static long MAX_BIG_LONG = 1024 * 4 * 1024;

/** roomId与一个集合的哈希。集合中存储当前房间的所有用户 */

private static ConcurrentHashMap<String, CopyOnWriteArraySet> roomToChatUserHashMap = new ConcurrentHashMap<String, CopyOnWriteArraySet>();

/** 用户与chatServer实例的哈希。 */

private static ConcurrentHashMap chatUserToChatServer = new ConcurrentHashMap();

/** token验证 **/

private String token;

/** 房间号 **/

private String roomId;

/** chatUser 主键Id **/

private String chatUserId;

/** chatUserd对象 **/

private ChatUser chatUser;

/**

* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,

* 可以使用Map来存放,其中Key可以为用户标识·1

*/



声明两个类级别的ConcurrentHashMap存储房间号和chatUser之间的对应关系,一个房间对应多个chatUser账号,一个chatUser对应一个chatServer实例,发送消息时只需进行遍历这俩个map找到相应的实例对象,调用它的发消息即可成功的发送

/**

* 一个房间对应的一个chatuser列表 发消息时候进行遍历操作

*

* @param chatUserId

* @param chatUserId

* @return

*/

private boolean addChatUserToHashMap(String roomId, String chatUserId) {

try {

CopyOnWriteArraySet chatUserIdSet = null;

if (roomToChatUserHashMap.containsKey(roomId)) {

chatUserIdSet = roomToChatUserHashMap.get(roomId);

} else {

chatUserIdSet = new CopyOnWriteArraySet();

}

chatUserIdSet.add(chatUserId);

roomToChatUserHashMap.put(roomId, chatUserIdSet);

chatUserToChatServer.put(chatUserId, this);

return true;

} catch (Exception e) {

e.printStackTrace();

return false;

}

}

这是进入房间时,将自身账号加入到房间聊天群组,用来发广播消息时遍历的。

/**

* 从哈希中移除已经断开的连接

*

* @param chatUserId

* @param terminalUuid

* @return

*/

private boolean removeChatUserFromRoomHashMap(String roomId, String chatUserId) {

try {

CopyOnWriteArraySet chatUserIdSet = null;

if (
roomToChatUserHashMap.containsKey(roomId)) {// 如果存在

chatUserIdSet = roomToChatUserHashMap.get(chatUserId);// 取得chatUserId的集合

} else {

return true;

}

chatUserIdSet.remove(chatUserId);// 从集合中移除

if (chatUserIdSet.size() == 0) {// 如果已经没有连接终端


roomToChatUserHashMap.remove(roomId);// 则清除

} else {

roomToChatUserHashMap.put(roomId, chatUserIdSet);// 更新哈希

}

ChatServer chatServer = chatUserToChatServer.get(chatUserId);

// 释放资源,清空chatServer

chatServer = null;


chatUserToChatServer.remove(chatUserId);// 将chatServer的实例从哈希中移除。

return true;

} catch (Exception e) {

e.printStackTrace();

return false;

}

}

当断开聊天室时,应该讲其从聊天组移除掉。

/**

* 关闭websocket连接。

*

* @param session

* 要关闭的会话

*/

private void closeSession(Session session) {

try {

session.close();

} catch (IOException e) {

e.printStackTrace();

}

return;

}

/**

* 连接关闭调用的方法

*/

@OnClose

public void onClose(Session session) {

try {

removeChatUserFromRoomHashMap(this.roomId, this.chatUserId);

subOnlineCount(); // 在线数减1

} catch (Exception e) {

}

}

关闭调用的方法

/**

* 收到客户端消息后调用的方法。 客户端发送消息的方法是,发送到服务器,消息中指明要传送给那个用户,消息的类型。

* 此处只管发送,至于发送的模式(紧急通知,普通通知,订单消息,聊天信息),这里并进行解析处理,由客户端自行处理。

*

*

* @param message

* 客户端发送过来的消息

* @param session

* 可选的参数

*/

@OnMessage

public void onMessage(String message, Session session) {

if (StringUtils.isBlank(message)) // 收到的是空串

return;

if (StringUtils.equals(Constant.SUCCESS_RESPONSE, message)) {

return;

}

Gson gson = null;

try {// 解析json串

gson = new Gson();

ChatMessage chatMessage = gson.fromJson(message, ChatMessage.class);

// 解析json出错

if (chatMessage == null)

return;

/**

* 目的为了以后对每个模块进行拓展,所以分开写

*

*/

// 如果是图片类型的消息体

if (StringUtils.equals(chatMessage.getMessageType(), EnumMessageType.IMAGE.name())) {

try {

boolean dealImageResult = dealBinary(chatMessage);// 处理图片结果

if (!dealImageResult) {

return;// 如果没生成,则返回

}

} catch (Exception e) {

logger.error("处理图片异常" + e.getMessage());

}

}

// 处理小视频

else if ((StringUtils.equals(chatMessage.getMessageType(), EnumMessageType.VIDEO.name()))) {

try {

boolean dealVideoResult = dealBinary(chatMessage);

if (!dealVideoResult) {

return;

}

} catch (Exception e) {

logger.error("处理小视频异常" + e.getMessage());

}

}

// 处理二进制文件

else if ((StringUtils.equals(chatMessage.getMessageType(), EnumMessageType.BINARY.name()))) {

try {

boolean dealBinaryResult = dealBinary(chatMessage);

if (!dealBinaryResult) {

return;

}

} catch (Exception e) {

// TODO: handle exception

logger.error("处理文件异常" + e.getMessage());

}

}


chatMessage.setImageBase64("");// 清空串

// 不用做NPE判断,因为chatUser如果为空 则推出closeSession 所以不可能为空

chatMessage.setChatName(this.chatUser.getUsername());

sendMessageToEveryoneInRoom(chatMessage);

} catch (Exception e) {// 发生错误即退出

e.printStackTrace();

}

}

进行通讯的模块。具体代码私信我,

## 不足之处为考虑多个终端登录的情况,比如一个用户双开浏览器,这时候这套方案显然是不可行的,具体的解决方案找我咨询

## 弹幕聊天室的实现

相关文章

去字节跳动面试(Java岗),这1道面试题100%会问到

前言受邀参加过字节跳动面试的小伙伴一般都会收到一封面试邀请的邮件,邮件上面会注明考查的内容,只有两项,其中第一项就是“我们主要考察通用型的业务问题和过往的项目经历”,项目经历很好理解,那么“通用性业务...

直播预告丨如何实现Oracle存储过程到java的一键转化

数据库国产化改造过程中难度最高、人力投入最大的,莫过于对Oracle存储过程的重构。其中,把大量业务逻辑计算,从数据库层往应用层迁移,是数据库国产化改造的核心工作,也是未来实现架构分层分域和应用服务化...

基于PHP(前端是安卓)的开源直播系统

真正的大师,永远都怀着一颗学徒的心!一、项目简介今天说的这个软件是一款基于PHP(前端是安卓)的开源直播系统。二、实现功能用户管理身份确认直播管理财务管理等级管理内容管理三、技术选型PHPjavaht...

炉石:环境一天一个样!炸弹战也能分杯羹,抓了牧师还能克法师

"通灵学院"天梯环境的发展是吮吸万变的,前几天快攻肆意妄为,因此各种专抓快攻的体系站了出来,以术士、牧师突出。这两天当宇宙牧、JK牧强势崛起之后,新一轮的体系制衡又出现了,最最明显的就是炸弹战体系,其...

wxJava获取openid

1. 使用微信api获取openid:在用户同意授权(grant authorization)后,会返回一个code(authorization code)参数。在使用这个code换取access_t...