(1)业务复杂度介绍
开门见山,假设一个直播间同时500w人在线,那么1秒钟1000条弹幕,那么弹幕系统的推送频率就是: 500w * 1000条/秒=50亿条/秒 ,想想b站2019跨年晚会那次弹幕系统得是多么的nb,况且一个大型网站不可能只有一个直播间!
package mainimport ( "fmt" "net/http")func main() { fmt.println("listen localhost:8080") // 注册一个用于websocket的路由,实际业务中不可能只有一个路由 http.handlefunc("/messages", messagehandler) // 监听8080端口,没有实现服务异常处理器,因此第二个参数是nil http.listenandserve("localhost:8080", nil)}func messagehandler(response http.responsewriter, request *http.request) { // todo: 实现消息处理 response.write([]byte("helloworld"))}然后完善messagehandler函数:
func messagehandler(response http.responsewriter, request *http.request) { var upgrader = websocket.upgrader{ // 允许跨域 checkorigin: func(resquest *http.request) bool { return true }, } // 建立连接 conn, err := upgrader.upgrade(response, request, nil) if err != nil { return } // 收发消息 for { // 读取消息 _, bytes, err := conn.readmessage() if err != nil { _ = conn.close() } // 写入消息 err = conn.writemessage(websocket.textmessage, bytes) if err != nil { _ = conn.close() } }}现在基本上实现了websocket功能,但是websocket的原生api不是线程安全的(close方法是线程安全的,并且是可重入的),并且其他模块无法复用业务逻辑,因此进行封装:
封装connection对象描述一个websocket连接为connection对象提供线程安全的关闭、接收、发送api
// main.gopackage mainimport ( "bluemiaomiao.github.io/websocket-go/service" "fmt" "net/http" "github.com/gorilla/websocket")func main() { fmt.println("listen localhost:8080") http.handlefunc("/messages", messagehandler) _ = http.listenandserve("localhost:8080", nil)}func messagehandler(response http.responsewriter, request *http.request) { var upgrader = websocket.upgrader{ // 允许跨域 checkorigin: func(resquest *http.request) bool { return true }, } // 建立连接 conn, err := upgrader.upgrade(response, request, nil) wsconn, err := service.create(conn) if err != nil { return } // 收发消息 for { // 读取消息 msg, err := wsconn.readone() if err != nil { wsconn.close() } // 写入消息 err = wsconn.writeone(msg) if err != nil { _ = conn.close() } }}// service/messsage_service.gopackage serviceimport ( "errors" "github.com/gorilla/websocket" "sync")// 封装的连接对象// // 由于websocket的close()方法是可重入的,所以可以多次调用,但是关闭channel的close()// 方法不是可重入的,因此通过isclosed进行判断// isclosed可能发生资源竞争,因此通过互斥锁避免// 关闭websocket连接后,也要自动关闭输入输出消息流,因此通过signalcloseloopchan实现type connection struct { conn *websocket.conn // 具体的连接对象 inputstream chan []byte // 输入流,使用channel模拟 outputstream chan []byte // 输出流,使用chaneel模拟 signalcloseloopchan chan byte // 关闭信号 isclosed bool // 是否调用过close()方法 lock sync.mutex // 简单的锁}// 用于初始化一个连接对象func create(conn *websocket.conn) (connection *connection, err error) { connection = &connection{ conn: conn, inputstream: make(chan []byte, 1000), outputstream: make(chan []byte, 1000), signalcloseloopchan: make(chan byte, 1), isclosed: false, } // 启动读写循环 go connection.readloop() go connection.writeloop() return}// 读取一条消息func (c *connection) readone() (msg []byte, err error) { select { case msg = <-(*c).inputstream: case <-(*c).signalcloseloopchan: err = errors.new("connection is closed") } return}// 写入一条消息func (c *connection) writeone(msg []byte) (err error) { select { case (*c).outputstream <- msg: case <-(*c).signalcloseloopchan: err = errors.new("connection is closed") } return}// 关闭连接对象func (c *connection) close() { _ = (*c).conn.close() (*c).lock.lock() if !(*c).isclosed { close((*c).signalcloseloopchan) } (*c).lock.unlock()}// 读取循环func (c *connection) readloop() { // 不停的读取长连接中的消息,只要存在消息就将其放到队列中 for { _, bytes, err := (*c).conn.readmessage() if err != nil { (*c).close() } select { case <-(*
网站SEO优化做移动端的必要性上海ecs云服务器提示错误建站程序如何安装在虚拟主机上dv ssl为什么ZStack可以,可以渠道,也可以阿里云2019阿里云双十一怎样拼团租用云服务器更优惠?我手机收到非法信息监测-云服务器问题内链SEO怎么做才能让网站更具好排名呢?