今天给各位分享asp网站源码分享授权破解的知识,其中也会对asp的网站进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!
前言
咱们的重点关键字就是两个WebSocket和集群,实现的框架便是基于ASP.NETCore,我也基于golang实现了一套,本文涉及到的相关源码和golang版本的实现都已上传至我的github,具体仓库地址可以转到文末自行跳转到
通过上面的图我们可以看到,我们这里构建集群示例使用的nginx,如果让nginx支持WebSocket的话,需要额外的配置,这个在网上有很多相关的文章介绍,这里就来列一下咱们示例的nginx配置,在配置文件nginx.conf里
//上游服务器地址也就是websocket服务的真实地址\nupstreamwsbackend{\nserver127.0.0.1:5001;\nserver127.0.0.1:5678;\n}\n\nserver{\nlisten5000;\nserver_namelocalhost;\n\nlocation~/chat/{\n//upstream地址\nproxy_passhttp://wsbackend;\nproxy_connect_timeout60s;\nproxy_read_timeout3600s;\nproxy_send_timeout3600s;\n//记得转发避免踩坑\nproxy_set_headerHost$host;\nproxy_http_version1.1;\n//http升级成websocket协议的头标识\nproxy_set_headerUpgrade$http_upgrade;\nproxy_set_headerConnection&34;;\n}\n}\n
这套配置呢,在搜索引擎上能收到很多,不过不妨碍我把使用的粘贴出来。这一套亲测有效,也是我使用的配置,请放心使用。个人认为如果是线上环境采用的负载均衡策略可以选择ip_hash的方式,保证同一个ip的客户端用户可以分发到一台WebSocket实例中去,这样的话能尽量避免使用redis的用户频道做消息传递。好了,接下来准备开始展示具体实现的代码了。
一对一发送34;127.0.0.1:6379&34;/&34;HelloWorld!&34;/chat/user/{id}&34;user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort}join&34;user:&34;{userPrefix}{id}&39;\\0&34;user{id}send:{msgBody.Msg}&34;user{id}send:{msgBody.Msg}&34;{userPrefix}{msgBody.Id}&34;{userPrefix}{id}&34;user{msgBody.FromId}send:{msgBody.Msg}&
上面我们展示了一对一发送的情况,接下来我们来看一下,群组发送的情况。群组发送的话就是只要大家都加入一个群组,只要客户端在群组里发送一条消息,则注册到当前群组内的所有客户端都可以收到消息。相对于一对一的情况就是如果当前WebSocket服务端如果存在用户加入某个群组,则当前当前WebSocket服务端则可以订阅一个group:群组唯一标识的redis频道,集群中的其他WebSocket服务器通过这个redis频道接收群组消息,通过一张图描述一下
群组的实现方式相对于一对一要简单一点
发送端可以不用考虑当前服务中的客户端连接,一股脑的交给redis把消息发布出去如果有WebSocket服务中的用户订阅了当前分组则可以接受消息,获取组内的用户循环发送消息
展示一下代码实现的方式,首先是定义一个action用于表示群组的相关场景
//包含两个标识一个是组别标识一个是注册到组别的用户\n[HttpGet(&34;)]\npublicasyncTaskChatGroup(stringgroupId,stringuserId)\n{\nif(HttpContext.WebSockets.IsWebSocketRequest)\n{\n_logger.LogInformation($&34;);\n\nvarwebSocket=awaitHttpContext.WebSockets.AcceptWebSocketAsync();\n//调用HandleGroup处理群组相关的消息\nawait_socketHandler.HandleGroup(groupId,userId,webSocket);\n}\nelse\n{\nHttpContext.Response.StatusCode=StatusCodes.Status400BadRequest;\n}\n}\n
接下来看一下HandleGroup的相关逻辑,还是在WebSocketHandler类中,看一下代码实现
publicclassWebSocketHandler:IDisposable\n{\nprivatereadonlyUserConnectionUserConnection=new();\nprivatereadonlyGroupUserGroupUser=new();\nprivatereadonlySemaphoreSlim_lock=new(1,1);\nprivatereadonlyConcurrentDictionary<string,IDisposable>_disposables=new();\nprivatereadonlystringgroupPrefix=&34;;\n\nprivatereadonlyILogger<WebSocketHandler>_logger;\nprivatereadonlyRedisClient_redisClient;\n\npublicWebSocketHandler(ILogger<WebSocketHandler>logger,RedisClientredisClient)\n{\n_logger=logger;\n_redisClient=redisClient;\n}\n\npublicasyncTaskHandleGroup(stringgroupId,stringuserId,WebSocketwebSocket)\n{\n//因为群组的集合可能会存在很多用户一起访问所以限制访问数量\nawait_lock.WaitAsync();\n//初始化群组容器群唯一标识为key群员容器为value\nvarcurrentGroup=GroupUser.Groups.GetOrAdd(groupId,newUserConnection{});\n//当前用户加入当前群组\n_=currentGroup.GetOrAdd(userId,webSocket);\n//只有有当前WebSocket服务的第一个加入当前组的时候才去订阅群组频道\n//如果不限制的话则会出现如果当前WebSocket服务有多个用户在一个组内则会重复收到redis消息\nif(currentGroup.Count==1)\n{\n//订阅redis频道\nawaitSubGroupMsg($&34;);\n}\n\n_lock.Release();\n\nvarbuffer=newbyte[1024*4];\n//阻塞接收WebSocket消息\nvarreceiveResult=awaitwebSocket.ReceiveAsync(newArraySegment<byte>(buffer),CancellationToken.None);\n//服务不退出的话则一直等待接收\nwhile(webSocket.State==WebSocketState.Open)\n{\ntry\n{\nstringmsg=Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd(&39;);\n_logger.LogInformation($&34;);\n\n//组装redis频道发布的消息,目标为群组标识\nChannelMsgBodychannelMsgBody=newChannelMsgBody{FromId=userId,ToId=groupId,Msg=msg};\n//通过redis发布消息\n_redisClient.Publish($&34;,JsonConvert.SerializeObject(channelMsgBody));\n\nreceiveResult=awaitwebSocket.ReceiveAsync(newArraySegment<byte>(buffer),CancellationToken.None);\n}\ncatch(Exceptionex)\n{\n_logger.LogError(ex,ex.Message);\nbreak;\n}\n}\n//如果客户端退出则在当前群组集合删除当前用户\n_=currentGroup.TryRemove(userId,out_);\nawaitwebSocket.CloseAsync(receiveResult.CloseStatus.Value,receiveResult.CloseStatusDescription,CancellationToken.None);\n}\n\nprivateasyncTaskSubGroupMsg(stringchannel)\n{\nvarsub=_redisClient.Subscribe(channel,async(channel,data)=>{\nChannelMsgBodymsgBody=JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());\nbyte[]sendByte=Encoding.UTF8.GetBytes($&34;);\n\n//在当前WebSocket服务器找到当前群组里的用户\nGroupUser.Groups.TryGetValue(msgBody.ToId,outvarcurrentGroup);\n//循环当前WebSocket服务器里的用户发送消息\nforeach(varuserincurrentGroup)\n{\n//不用给自己发送了\nif(user.Key==msgBody.FromId)\n{\ncontinue;\n}\n\nif(user.Value.State==WebSocketState.Open)\n{\nawaituser.Value.SendAsync(newArraySegment<byte>(sendByte,0,sendByte.Length),WebSocketMessageType.Text,true,CancellationToken.None);\n}\n}\n});\n//把当前频道加入订阅集合\n_disposables.TryAdd(channel,sub);\n}\n}\n
这里涉及到了GroupUser类,是来存储群组和群组用户的对应关系的,定义如下
publicclassGroupUser\n{\n//key为群组的唯一标识\npublicConcurrentDictionary<string,UserConnection>Groups=newConcurrentDictionary<string,UserConnection>();\n}\n
演示一下把两个用户添加到一个群组内,然后发送接收消息的场景,用户u1发送
用户u2接收
发送所有人34;/chat/all/{id}&34;alluser:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort}join&34;all&39;\\0&34;user{id}send:{msg}&34;user【{msgBody.FromId}】sendall:{msgBody.Msg}&
上面我们分别展示了一对一、群组、所有人的场景,但是实际使用的时候,每个用户只需要注册到WebSocket集群一次也就是保持一个连接即可,而不是一对一一个连接、注册群组一个连接、所有消息的时候一个连接。所以我们需要把上面的演示整合一下,一个用户只需要连接到WebSocket集群一次即可,至于发送给谁,加入什么群组,接收全部消息等都是连接后通过一些标识区分的,而不必每个类型的操作都注册一次,就和微信和QQ一样我只要登录了即可,至于其他操作都是靠数据标识区分的。接下来咱们就整合一下代码达到这个效果,大致的思路是
用户连接到WebSocket集群,把用户和连接保存到当前WebSocket服务器的用户集合中去。一对一发送的时候,只需要在具体的服务器中找到具体的客户端发送消息群组的时候,先把当前用户标识加入群组集合即可,接收消息的时候根据群组集合里的用户标识去用户集合里去拿具体的WebSocket连接发送消息全员消息的时候,直接遍历集群中的每个WebSocket服务里的用户集合里的WebSocket连接训话发送消息
这样的话就保证了每个客户端用户在集群中只会绑定一个连接,首先还是单独定义一个action,用于让客户端用户连接上来,具体实现代码如下所示
publicclassWebSocketChannelController:ControllerBase\n{\nprivatereadonlyILogger<WebSocketController>_logger;\nprivatereadonlyWebSocketChannelHandler_webSocketChannelHandler;\n\npublicWebSocketChannelController(ILogger<WebSocketController>logger,WebSocketChannelHandlerwebSocketChannelHandler)\n{\n_logger=logger;\n_webSocketChannelHandler=webSocketChannelHandler;\n}\n\n//只需要把当前用户连接到服务即可\n[HttpGet(&34;)]\npublicasyncTaskChannel(stringid)\n{\nif(HttpContext.WebSockets.IsWebSocketRequest)\n{\n_logger.LogInformation($&34;);\n\nvarwebSocket=awaitHttpContext.WebSockets.AcceptWebSocketAsync();\nawait_webSocketChannelHandler.HandleChannel(id,webSocket);\n}\nelse\n{\nHttpContext.Response.StatusCode=StatusCodes.Status400BadRequest;\n}\n}\n}\n
接下来看一下WebSocketChannelHandler类的HandleChannel方法实现,用于处理不同的消息,比如一对一、群组、全员消息等不同类型的消息
publicclassWebSocketChannelHandler:IDisposable\n{\n//用于存储当前WebSocket服务器链接上来的所有用户对应关系\nprivatereadonlyUserConnectionUserConnection=new();\n//用于存储群组和用户关系,用户集合采用HashSet保证每个用户只加入一个群组一次\nprivatereadonlyConcurrentDictionary<string,HashSet<string>>GroupUser=newConcurrentDictionary<string,HashSet<string>>();\nprivatereadonlySemaphoreSlim_lock=new(1,1);\n//存放redis订阅实例\nprivatereadonlyConcurrentDictionary<string,IDisposable>_disposables=new();\n\n//一对一redis频道前缀\nprivatereadonlystringuserPrefix=&34;;\n//群组redis频道前缀\nprivatereadonlystringgroupPrefix=&34;;\n//全员redis频道\nprivatereadonlystringall=&34;;\n\nprivatereadonlyILogger<WebSocketHandler>_logger;\nprivatereadonlyRedisClient_redisClient;\n\npublicWebSocketChannelHandler(ILogger<WebSocketHandler>logger,RedisClientredisClient)\n{\n_logger=logger;\n_redisClient=redisClient;\n}\n\npublicasyncTaskHandleChannel(stringid,WebSocketwebSocket)\n{\nawait_lock.WaitAsync();\n\n//每次连接进来就添加到用户集合\n_=UserConnection.GetOrAdd(id,webSocket);\n\n//每个WebSocket服务实例只需要订阅一次全员消息频道\nawaitSubMsg($&34;);\nif(UserConnection.Count==1)\n{\nawaitSubAllMsg(all);\n}\n\n_lock.Release();\nvarbuffer=newbyte[1024*4];\n//接收客户端消息\nvarreceiveResult=awaitwebSocket.ReceiveAsync(newArraySegment<byte>(buffer),CancellationToken.None);\n\nwhile(webSocket.State==WebSocketState.Open)\n{\ntry\n{\nstringmsg=Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd(&39;);\n//读取客户端消息\nChannelDatachannelData=JsonConvert.DeserializeObject<ChannelData>(msg);\n//判断消息类型\nswitch(channelData.Method)\n{\n//一对一\ncase&34;:\nawaitHandleOne(id,channelData.MsgBody,receiveResult);\nbreak;\n//把用户加入群组\ncase&34;:\nawaitAddUserGroup(id,channelData.Group,webSocket);\nbreak;\n//处理群组消息\ncase&34;:\nawaitHandleGroup(channelData.Group,id,webSocket,channelData.MsgBody);\nbreak;\n//处理全员消息\ndefault:\nawaitHandleAll(id,channelData.MsgBody);\nbreak;\n}\n\nreceiveResult=awaitwebSocket.ReceiveAsync(newArraySegment<byte>(buffer),CancellationToken.None);\n}\ncatch(Exceptionex)\n{\n_logger.LogError(ex,ex.Message);\nbreak;\n}\n}\n\nawaitwebSocket.CloseAsync(receiveResult.CloseStatus.Value,receiveResult.CloseStatusDescription,CancellationToken.None);\n\n//在群组中移除当前用户\nforeach(varusersinGroupUser.Values)\n{\nlock(users)\n{\nusers.Remove(id);\n}\n}\n//当前客户端用户退出则移除连接\n_=UserConnection.TryRemove(id,out_);\n//取消用户频道订阅\n_disposables.Remove($&34;,outvarsub);\nsub?.Dispose();\n}\n\npublicvoidDispose()\n{\nforeach(vardisposablein_disposables)\n{\ndisposable.Value.Dispose();\n}\n\n_disposables.Clear();\n}\n}\n
这里涉及到了ChannelData类是用于接收客户端消息的类模板,具体定义如下
publicclassChannelData\n{\n//消息类型比如一对一群组全员\npublicstringMethod{get;set;}\n//群组标识\npublicstringGroup{get;set;}\n//消息体\npublicobjectMsgBody{get;set;}\n}\n
类中并不会包含当前用户信息,因为连接到当前服务的时候已经提供了客户端唯一标识。结合上面的处理代码我们可以看出,客户端用户连接到WebSocket实例之后,先注册当前用户的redis订阅频道并且当前实例仅注册一次全员消息的redis频道,用于处理非当前实例注册客户端的一对一消息处理和全员消息处理,然后等待接收客户端消息,根据客户端消息的消息类型来判断是进行一对一、群组、或者全员的消息类型处理,它的工作流程入下图所示
由代码和上面的流程图可知,它根据不同的标识去处理不同类型的消息,接下来我们可以看下每种消息类型的处理方式。
一对一处理34;user{id}send:{msgBody.Msg}&34;user{id}send:{msgBody.Msg}&34;{userPrefix}{msgBody.Id}&34;user{msgBody.FromId}send:{msgBody.Msg}&34;Method&34;One&34;MsgBody&34;Id&34;2&34;Msg&34;Hello&
接下来看群组处理方式,这个和之前的逻辑是有出入的,首先是用户要先加入到某个群组然后才能接收群组消息或者在群组中发送消息,之前是一个用户对应多个连接,整合了之后集群中每个用户只关联唯一的一个WebSocket连接,首先看用户加入群组的逻辑
privateasyncTaskAddUserGroup(stringuser,stringgroup,WebSocketwebSocket)\n{\n//获取群组信息\nvarcurrentGroup=GroupUser.GetOrAdd(group,newHashSet<string>());\n\nlock(currentGroup)\n{\n//把用户标识加入当前组\n_=currentGroup.Add(user);\n}\n\n//每个组的redis频道,在每台WebSocket服务器实例只注册一次订阅\nif(currentGroup.Count==1)\n{\n//订阅当前组消息\nawaitSubGroupMsg($&34;);\n}\n\nstringaddMsg=$&34;;\nbyte[]sendByte=Encoding.UTF8.GetBytes(addMsg);\nawaitwebSocket.SendAsync(newArraySegment<byte>(sendByte,0,sendByte.Length),WebSocketMessageType.Text,true,CancellationToken.None);\n//如果有用户加入群组,则通知其他群成员\nChannelMsgBodychannelMsgBody=newChannelMsgBody{FromId=user,ToId=group,Msg=addMsg};\n_redisClient.Publish($&34;,JsonConvert.SerializeObject(channelMsgBody));\n}\n
用户想要在群组内发消息,则必须先加入到一个具体的群组内,具体的加入群组的格式如下
{&34;:&34;,&34;:&34;}\n
Method为UserGroup代表着用户加入群组的业务类型,Group代表着你要加入的群组唯一标识。接下来就看下,用户发送群组消息的逻辑了
privateasyncTaskHandleGroup(stringgroupId,stringuserId,WebSocketwebSocket,objectmsgBody)\n{\n//判断群组是否存在\nvarhasValue=GroupUser.TryGetValue(groupId,outvarusers);\nif(!hasValue)\n{\nbyte[]sendByte=Encoding.UTF8.GetBytes($&34;);\nawaitwebSocket.SendAsync(newArraySegment<byte>(sendByte,0,sendByte.Length),WebSocketMessageType.Text,true,CancellationToken.None);\nreturn;\n}\n\n//只有加入到当前群组,才能在群组内发送消息\nif(!users.Contains(userId))\n{\nbyte[]sendByte=Encoding.UTF8.GetBytes($&34;);\nawaitwebSocket.SendAsync(newArraySegment<byte>(sendByte,0,sendByte.Length),WebSocketMessageType.Text,true,CancellationToken.None);\nreturn;\n}\n\n_logger.LogInformation($&34;);\n\n//发送群组消息\nChannelMsgBodychannelMsgBody=newChannelMsgBody{FromId=userId,ToId=groupId,Msg=msgBody.ToString()};\n_redisClient.Publish($&34;,JsonConvert.SerializeObject(channelMsgBody));\n}\n
加入群组之后则可以发送和接收群组内的消息了,给群组发送消息的格式如下
{&34;:&34;,&34;:&34;,&34;:&34;}\n
Method为Group代表着用户加入群组的业务类型,Group则代表你要发送到具体的群组的唯一标识,MsgBody则是发送到群组内的消息。最后再来看下订阅群组内消息的情况,也就是处理群组消息的逻辑
privateasyncTaskSubGroupMsg(stringchannel)\n{\nvarsub=_redisClient.Subscribe(channel,async(channel,data)=>\n{\n//接收群组订阅消息\nChannelMsgBodymsgBody=JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());\nbyte[]sendByte=Encoding.UTF8.GetBytes($&34;);\n\n//获取当前服务器实例中当前群组的所有用户连接\nGroupUser.TryGetValue(msgBody.ToId,outvarcurrentGroup);\nforeach(varuserincurrentGroup)\n{\nif(user==msgBody.FromId)\n{\ncontinue;\n}\n\n//通过群组内的用户标识去用户集合获取用户集合里的用户唯一连接发送消息\nif(UserConnection.TryGetValue(user,outvartargetSocket)&&targetSocket.State==WebSocketState.Open)\n{\nawaittargetSocket.SendAsync(newArraySegment<byte>(sendByte,0,sendByte.Length),WebSocketMessageType.Text,true,CancellationToken.None);\n}\nelse\n{\ncurrentGroup.Remove(user);\n}\n}\n});\n_disposables.TryAdd(channel,sub);\n}\n
全员消息处理34;user{id}send:{msgBody}&34;Method&34;All&34;MsgBody&34;HelloAll&34;user【{msgBody.FromId}】sendall:{msgBody.Msg}&
由于篇幅有限,没办法设计到全部的相关源码,因此在这里贴出来github相关的地址,方便大家查看和运行源码。相关的源码我这里实现了两个版本,一个是基于asp.netcore的版本,一个是基于golang的版本。两份源码的实现思路是一致的,所以这两份代码可以运行在一套集群示例里,配置在一套nginx里,并且连接到同一个redis实例里即可
asp.netcore源码示例https://github.com/softlgl/WebsocketClustergolang源码示例https://github.com/softlgl/websocket-cluster
仓库里还涉及到本人闲暇之余开源的其他仓库,由于本人能力有限难登大雅之堂,就不做广告了,有兴趣的同学可以自行浏览一下。
总结#
本文基于ASP.NETCore框架提供了一个基于WebSocket做集群的示例,由于思想是通用的,所以基于这个思路楼主也实现了golang版本。其实在之前就想自己动手搞一搞关于WebSocket集群方面的设计,本篇文章算是对之前想法的一个落地操作。其核心思路文章已经做了相关介绍,由于这些只是博主关于构思的实现,可能有很多细节尚未体现到,还希望大家多多理解。其核心思路总结一下
首先是,利用可以构建WebSocket服务的框架,在当前服务实例中保存当前客户端用户和WebSocket的连接关系如果消息的目标客户端不在当前服务器,可以利用redis频道、消息队列相关、甚至是数据库类的共享回话发送的消息,由目标服务器获取目标是否属于自己的ws会话本文设计的思路使用的是无状态的方式,即WebSocket服务实例之间不存在直接的消息通信和相互的服务地址存储,当然也可以利用redis等存储在线用户信息等,这个可以参考具体业务自行设计
读万卷书,行万里路。在这个时刻都在变化点的环境里,唯有不断的进化自己,多接触多尝试不用的事物,多扩展自己的认知思维,方能构建自己的底层逻辑。毕竟越底层越抽象,越通用越抽象。面对未知的挑战,自身作为自己坚强的后盾,可能才会让自己更踏实。
好了,文章到此结束,希望可以帮助到大家。