IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    利好 SharedWorker 实现跨页面单例 WebSocket

    Innei发表于 2024-04-08 08:09:36
    love 0
    该渲染由 Shiro API 生成,可能存在排版问题,最佳体验请前往:https://innei.in/posts/tech/using-sharedworker-singleton-websocket-in-nextjs

    在之前的文章中,我详细的介绍了站点的实时人数是如何实现的?,在方案中,我留下了一个悬念,如何使用 SharedWorker 在页面之间共享 Socket 实例,实现跨页面单例 WebSocket。

    动机

    探索这个问题的背景是无意间在知乎看到了 WebSocket 的一个问题,其中有回答提到 WebSocket 连接过于占用服务器资源。当页面重复打开很多个时,每个页面都会建立 WebSocket 连接,确实无意间的增大了服务器压力。虽说这个问题我之前也有考虑过,但是由于个人站点并不会出现大量的用户同时在线,所以并没有深入研究。我们使用的是 Socket.IO 的方案建立 WebSocket 连接,Socket.IO 本身就是对 WebSocket 的封装,所以从开销上来说,Socket.IO 会比原生 WebSocket 多一些开销。关于 Socket.IO 连接对服务器侧的内存开销,文档中有所提及:Memory Usage | Socket.IO。

    虽然这点性能优化不痛不痒也没有任何提升,但是既然有这个问题,那我们这次就试着去解决这个问题。

    找到方案

    我们要解决的问题很明确,就是当一个浏览器打开两个或以上的我站的页面时(多个 Tab),复用同一个 Socket 实例。

    通过搜索关键字,我们了解到一个 API,SharedWorker,SharedWorker 是一个在多个浏览上下文(例如多个窗口、标签或 iframe)之间共享的 Worker。SharedWorker 有一个全局作用域,可以在多个浏览上下文中使用,这样就可以实现我们的目标。

    我们只需要把原本在 Socket 实例的代码放到 SharedWorker 中,然后在页面中与 Worker 中的 Socket 实例通信即可。由于之前的 SocketClient 对 Socket 进行了抽象,在此次重构中并不需要修改太多代码,而是只需要实现通信层即可。

    SharedWorker 基本使用

    先来看看 SharedWorker 是如何使用的。

    这个内容只能在原文中查看哦

    上面的例子中,我们创建了一个 SharedWorker,然后在页面中与 Worker 通信。在 Worker 中,我们监听了 onconnect 事件,当 Worker 连接时,我们监听了 port 的 message 事件,然后在页面中,我们监听了 port 的 message 事件,worker 中的消息在页面中进行进一步处理。

    SharedWorker 同 Worker 一样,消息通过 MessageChannel 进行传递。而 worker.port 本质就是对 MessageChannel 的封装。

    现在我们运行这个基本的例子,看看效果。

    https://codesandbox.io/p/sandbox/ecstatic-jerry-6xsgvs

    当我们同时打开两个 Tab 时,我们可以看到两个 Tab 都会收到 Worker 发送的消息。

    通过 chrome://inspect/#workers 我们可以看到 Worker 的信息。当打开多个 Tab 时,SharedWorker 会在多个 Tab 之间共享;当多个页面都被关闭时,Worker 会被销毁。

    在 Next.js 中使用 SharedWorker

    上面的例子中,我们把 worker 的实现放在了 public 目录下,这样的方式并不适用于工程化的项目中,因为在工程化项目中,我们不应该直接引用 public 目录下的文件,而是应该通过 import 的方式引入。第二,我们或许需要在 Worker 实现中引用外部库,或者使用 TypeScript 编写 Worker 实现。而对于这些需求,直接裸写 js 的方式是无法实现的。

    对此,我们需要对 Next.js 的 webpack 配置进行一些修改,以支持 Worker 的引入。

    首先安装 worker-loader:

    npm i -D worker-loader

    修改 next.config.js:

    /** @type {import('next').NextConfig} */
    
    let nextConfig = {
      webpack: (config, { webpack }) => {
        config.module.rules.unshift({
          test: /\.worker\.ts$/,
          loader: 'worker-loader',
          options: {
            publicPath: '/_next/',
            worker: {
              type: 'SharedWorker',
              // https://v4.webpack.js.org/loaders/worker-loader/#worker
              options: {
                name: 'ws-worker',
              },
            },
          },
        })
    
        return config
      },
    }
    
    export default nextConfig

    这里我们使用了 worker-loader,并且配置了文件后缀 .worker.ts 为 SharedWorker 实现,这样我们就可以在 Next.js 中使用 SharedWorker 了。

    const worker = new SharedWorker("/worker.js"); // ![code --]
    import worker from './worker.worker' // ![code ++]

    实现 SharedWorker Socket

    现在我们在 Worker 中实现 Socket 的逻辑。

    大概梳理一下流程:

    1. 首先在建立 Worker 时候(也就是第一个页面会建立 Worker),传递 Socket 连接配置。(因为 Worker 中无法获得环境变量和主线程变量)
    2. 创建 Socket 实例,然后对 Socket 连接状态和消息事件监听,通过 MessageChannel 传递消息给主线程(页面)。
    3. 在新页面打开时,或者 Socket 连接完成后,传递 Socket 相关信息到主线程,这些数据需要被主进程存储并用于其他组件消费。
    4. 在主进程中,实现和 Worker 的通信,以及对 Socket 实例的操作。比如 emit 方法。
    这个内容只能在原文中查看哦

    具体的代码实现如下:

    这个内容只能在原文中查看哦
    import { io } from 'socket.io-client'
    import type { Socket } from 'socket.io-client'
    
    /// <reference lib="webworker" />
    
    let ws: Socket | null = null
    
    function setupIo(config: { url: string }) {
      if (ws) return
      // 使用 socket.io
      console.log('Connecting to io, url: ', config.url)
    
      ws = io(config.url, {
        timeout: 10000,
        reconnectionDelay: 3000,
        autoConnect: false,
        reconnectionAttempts: 3,
        transports: ['websocket'],
      })
      if (!ws) return
    
      ws.on('disconnect', () => {
        boardcast({
          type: 'disconnect',
        })
      })
    
      /**
       * @param {any} payload
       */
      ws.on('message', (payload) => {
        console.log('ws', payload)
    
        boardcast({
          type: 'message',
          payload,
        })
      })
    
      ws.on('connect', () => {
        console.log('Connected to ws.io server from SharedWorker')
    
        if (waitingEmitQueue.length > 0) {
          waitingEmitQueue.forEach((payload) => {
            if (!ws) return
            ws.emit('message', payload)
          })
          waitingEmitQueue.length = 0
        }
        boardcast({
          type: 'connect',
          // @ts-expect-error
          payload: ws.id,
        })
      })
    
      ws.open()
      boardcast({
        type: 'sid',
        payload: ws.id,
      })
    }
    
    const ports = [] as MessagePort[]
    
    self.addEventListener('connect', (ev: any) => {
      const event = ev as MessageEvent
    
      const port = event.ports[0]
    
      ports.push(port)
    
      port.onmessage = (event) => {
        const { type, payload } = event.data
        console.log('get message from main', event.data)
    
        switch (type) {
          case 'config':
            setupIo(payload)
            break
          case 'emit':
            if (ws) {
              if (ws.connected) ws.emit('message', payload)
              else waitingEmitQueue.push(payload)
            }
            break
          case 'reconnect':
            if (ws) ws.open()
            break
          case 'init':
            port.postMessage({ type: 'ping' })
    
            if (ws) {
              if (ws.connected) port.postMessage({ type: 'connect' })
              port.postMessage({ type: 'sid', payload: ws.id })
            }
            break
          default:
            console.log('Unknown message type:', type)
        }
      }
    
      port.start()
    })
    
    function boardcast(payload: any) {
      console.log('[ws] boardcast', payload)
      ports.forEach((port) => {
        port.postMessage(payload)
      })
    }
    
    const waitingEmitQueue: any[] = []

    Worker 写完了,那么现在就要写主线程和 Worker 通信的代码了。

    先看看流程:

    这个内容只能在原文中查看哦

    具体的代码实现如下:

    这个内容只能在原文中查看哦
    interface WorkerSocket {
      sid: string
    }
    
    class SocketWorker {
      private socket: WorkerSocket | null = null
    
      worker: SharedWorker | null = null
    
      constructor() {
        if (isServerSide) return
        // @ts-expect-error
        import('./io.worker').then(({ default: SharedWorker }) => {
          if (isServerSide) return
          const worker = new SharedWorker()
    
          this.prepare(worker)
          this.worker = worker
        })
      }
    
      async getSid() {
        return this.socket?.sid
      }
    
      private setSid(sid: string) {
        this.socket = {
          ...this.socket,
          sid,
        }
      }
      bindMessageHandler = (worker: SharedWorker) => {
        worker.port.onmessage = (event: MessageEvent) => {
          const { data } = event
          const { type, payload } = data
    
          switch (type) {
            case 'ping': {
              worker?.port.postMessage({
                type: 'pong',
              })
              console.log('[ws worker] pong')
              break
            }
            case 'connect': {
              window.dispatchEvent(new SocketConnectedEvent())
              setSocketIsConnect(true)
    
              const sid = payload
              this.setSid(sid)
              break
            }
            case 'disconnect': {
              window.dispatchEvent(new SocketDisconnectedEvent())
              setSocketIsConnect(false)
              break
            }
            case 'sid': {
              const sid = payload
              this.setSid(sid)
              break
            }
            case 'message': {
              const typedPayload = payload as string | Record<'type' | 'data', any>
              if (typeof typedPayload !== 'string') {
                return this.handleEvent(
                  typedPayload.type,
                  camelcaseKeys(typedPayload.data),
                )
              }
              const { data, type } = JSON.parse(typedPayload) as {
                data: any
                type: EventTypes
              }
              this.handleEvent(type, camelcaseKeys(data))
            }
          }
        }
      }
    
      prepare(worker: SharedWorker) {
        const gatewayUrlWithoutTrailingSlash = GATEWAY_URL.replace(/\/$/, '')
        this.bindMessageHandler(worker)
        worker.port.postMessage({
          type: 'config',
    
          payload: {
            url: `${gatewayUrlWithoutTrailingSlash}/web`,
          },
        })
    
        worker.port.start()
    
        worker.port.postMessage({
          type: 'init',
        })
      }
      handleEvent(type: EventTypes, data: any) {
        // Handle biz event
      }
    
      emit(event: SocketEmitEnum, payload: any) {
        this.worker?.port.postMessage({
          type: 'emit',
          payload: { type: event, payload },
        })
      }
    
      reconnect() {
        this.worker?.port.postMessage({
          type: 'reconnect',
        })
      }
    
      static shared = new SocketWorker()
    }
    
    export const socketWorker = SocketWorker.shared
    export type TSocketClient = SocketWorker

    那么就大功告成了,SocketWorker 基本还是从原有的 SocketClient 中抽象出来的,基本实现了相同的方法,所以在业务中使用没有太大的变化,迁移过程也非常平滑。

    对了,这次的重构位于 Shiro/550abd。可供参考。

    完成的实现位于:

    https://github.com/Innei/Shiro/blob/c399372f7cc1bff55f842ff68342ffb0071b5ae6/src/socket/io.worker.ts

    看完了?说点什么呢



沪ICP备19023445号-2号
友情链接