sse.js 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. const { status, header } = require('node-res')
  /**
   * Minimal Server-Sent Events hub.
   *
   * Holds the set of currently open HTTP responses and writes every
   * published event to each of them in the `text/event-stream` format.
   */
  class SSE {
    constructor () {
      // Open response objects, one per subscribed client.
      this.subscriptions = new Set()
      // Incremented once per broadcast and sent as the event `id:` field.
      this.counter = 0
    }

    /**
     * Subscribe a client: set the initial event-stream headers on `res`,
     * remember it, and announce a `ready` event.
     *
     * Note that `ready` goes through `broadcast`, so it is delivered to every
     * current subscriber (including the new one) and bumps the counter.
     *
     * @param {object} req - Incoming request; `req.socket` and `req.httpVersion` are read.
     * @param {object} res - Response that stays open and receives the events.
     */
    subscribe (req, res) {
      // A timeout of 0 turns off the socket idle timeout for this long-lived stream.
      req.socket.setTimeout(0)

      status(res, 200)
      header(res, 'Content-Type', 'text/event-stream')
      header(res, 'Cache-Control', 'no-cache')
      // The Connection header is only sent when the request is not HTTP/2.
      if (req.httpVersion !== '2.0') {
        header(res, 'Connection', 'keep-alive')
      }

      this.subscriptions.add(res)
      // Forget the response as soon as the connection goes away.
      res.on('close', () => this.subscriptions.delete(res))
      this.broadcast('ready', {})
    }

    /**
     * Publish an event and its data to all connected clients.
     *
     * @param {string} event - Event name, embedded in the JSON payload.
     * @param {object} [data] - Extra payload fields merged next to `event`.
     */
    broadcast (event, data) {
      this.counter++
      // Debugging hint: log this.subscriptions.size here to spot leaked subscriptions.
      for (const res of this.subscriptions) {
        this.clientBroadcast(res, event, data)
      }
    }

    /**
     * Publish an event and its data to a single response object.
     *
     * The SSE event type on the wire is always `message`; the logical event
     * name travels inside the JSON payload. Keys of `data` are spread after
     * `event`, so a `data.event` key overrides the event name.
     *
     * @param {object} res - Response to write to.
     * @param {string} event - Event name, embedded in the JSON payload.
     * @param {object} [data] - Extra payload fields merged next to `event`.
     */
    clientBroadcast (res, event, data) {
      // A response that was already ended or destroyed cannot be written to;
      // drop it instead of triggering a write-after-end error.
      if (res.writableEnded || res.destroyed) {
        this.subscriptions.delete(res)
        return
      }

      res.write(`id: ${this.counter}\n`)
      res.write('event: message\n')
      // The blank line (double newline) terminates the event.
      res.write(`data: ${JSON.stringify({ event, ...data })}\n\n`)
    }
  }
  // Expose the SSE class as this module's export (CommonJS).
  module.exports = SSE