useFetchEventSource

useFetchEventSource 是一個 React Hook,允許你使用 POST 等 HTTP 方法訂閱 EventSource 並實時接收更新。

服务端代码示范:

SSE Server Implementation
const express = require('express');
const cors = require('cors');
const bodyParser = require('body-parser');

const app = express();
const PORT = 3001;

// Active SSE connections, keyed by channel name. Each value is the Set of
// response objects currently subscribed to that channel.
const clients = new Map();
// Running total of periodic messages sent across all connections
// (reported by the /status endpoint).
let messageCount = 0;

// One CORS policy shared by the global middleware and the explicit preflight
// route, so both answer with the same origin / credentials headers instead of
// the preflight route falling back to the library defaults.
const corsOptions = {
  origin: 'http://localhost:3000',
  methods: ['GET', 'POST', 'OPTIONS'],
  allowedHeaders: ['Content-Type', 'Cache-Control', 'Connection', 'Accept', 'Authorization'],
  exposedHeaders: ['Content-Type'],
  credentials: true,
  maxAge: 86400
};

app.use(cors(corsOptions));

app.options('*', cors(corsOptions));

// POST /events: SSE stream whose settings come from a JSON request body.
// Recognised body fields (both optional):
//   channel  - channel to subscribe to (falls back to 'default')
//   interval - milliseconds between periodic messages (falls back to 3000)
app.post('/events', async (req, res) => {
  // 1. Set the SSE response headers straight away.
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no');
  res.setHeader('Access-Control-Allow-Origin', 'http://localhost:3000');
  res.setHeader('Access-Control-Allow-Credentials', 'true');

  // 2. Read the raw request body by iterating the request stream.
  let body = '';
  try {
    for await (const chunk of req) {
      body += chunk;
    }
  } catch (error) {
    // The stream failed before the body was complete. Without this catch the
    // async handler would reject with nothing handling the rejection.
    console.error('Error reading request body:', error);
    res.end();
    return;
  }

  // 3. Parse the configuration. Malformed JSON, and valid JSON that is not an
  //    object (e.g. `null` or `42`), both fall back to an empty config so the
  //    property reads below cannot throw.
  let config = {};
  try {
    const parsed = body ? JSON.parse(body) : {};
    if (parsed !== null && typeof parsed === 'object') {
      config = parsed;
    }
  } catch (e) {
    config = {};
  }

  const channel = config.channel || 'default';
  // Explicit radix; anything that is not a positive integer (NaN, 0, negative)
  // falls back to the 3000 ms default instead of reaching setInterval.
  const requestedInterval = Number.parseInt(config.interval, 10);
  const interval = requestedInterval > 0 ? requestedInterval : 3000;

  // 4. Disable the socket idle timeout and keep the connection alive.
  req.socket.setTimeout(0);
  req.socket.setNoDelay(true);
  req.socket.setKeepAlive(true);

  console.log(`New client connected to channel: ${channel}`);

  // 5. Writes one SSE event: optional `event:` line, an `id:` line based on
  //    the current timestamp, and the JSON-encoded payload as `data:`.
  const sendEvent = (data, eventType = null) => {
    if (eventType) {
      res.write(`event: ${eventType}\n`);
    }
    res.write(`id: ${Date.now()}\n`);
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  };

  // 6. Cleanup: stops both timers, unregisters this response from its
  //    channel and ends the response. It can be reached from several places
  //    (close/error events, write failures), so it only runs once.
  let intervalId = null;
  let heartbeatId = null;
  let cleanedUp = false;
  const cleanup = () => {
    if (cleanedUp) {
      return;
    }
    cleanedUp = true;

    clearInterval(intervalId);
    clearInterval(heartbeatId);

    const channelClients = clients.get(channel);
    if (channelClients) {
      channelClients.delete(res);
      if (channelClients.size === 0) {
        clients.delete(channel);
      }
    }

    const remainingClients = Array.from(clients.values())
      .reduce((sum, set) => sum + set.size, 0);
    console.log(`Client disconnected from channel ${channel}. Total clients: ${remainingClients}`);

    try {
      res.end();
    } catch (error) {
      console.error('Error ending response:', error);
    }
  };

  // 7. Watch the response only. The request body stream was fully drained in
  //    step 2, so its 'end' has already fired, and on Node >= 16 its 'close'
  //    reports that the request completed rather than that the client left.
  res.on('close', cleanup);
  res.on('error', cleanup);

  try {
    // 8. Register this connection under its channel.
    if (!clients.has(channel)) {
      clients.set(channel, new Set());
    }
    clients.get(channel).add(res);

    const totalClients = Array.from(clients.values())
      .reduce((sum, set) => sum + set.size, 0);
    console.log(`Client connected to channel ${channel}. Total clients: ${totalClients}`);

    // 9. Send the initial "connected" event.
    sendEvent({
      message: 'Connected to SSE stream',
      channel: channel,
      time: new Date().toISOString()
    }, 'connected');

    // 10. Periodic messages. `messageCounter` counts per connection, while
    //     `messageCount` is the server-wide total.
    let messageCounter = 0;
    intervalId = setInterval(() => {
      messageCounter++;
      messageCount++;

      try {
        sendEvent({
          id: messageCount,
          count: messageCounter,
          channel: channel,
          time: new Date().toISOString(),
          message: `Channel ${channel} message ${messageCounter}`
        });
      } catch (error) {
        console.error(`Error sending message to channel ${channel}:`, error);
        cleanup();
      }
    }, interval);

    // 11. Heartbeat: a bare comment line every 15 s.
    heartbeatId = setInterval(() => {
      try {
        res.write(':\n\n');
      } catch (error) {
        console.error(`Heartbeat error on channel ${channel}:`, error);
        cleanup();
      }
    }, 15000);
  } catch (error) {
    console.error(`Error in SSE connection for channel ${channel}:`, error);
    // Full cleanup rather than just ending the response, so a failed
    // connection is not left registered in `clients`.
    cleanup();
  }
});

/**
 * Turns a request/response pair into an SSE stream: sends the SSE headers and
 * an initial `connected` event, registers the response under `channel`, then
 * emits a message every `interval` ms plus a heartbeat comment every 15 s
 * until the connection closes or errors.
 *
 * @param {object} req - Incoming request; only its 'close' event is used.
 * @param {object} res - Response the event stream is written to.
 * @param {string} [channel='default'] - Channel to register the client under.
 * @param {number} [interval=3000] - Milliseconds between periodic messages.
 */
function handleGETConnection(req, res, channel = 'default', interval = 3000) {
  // SSE response headers.
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no',
    'Access-Control-Allow-Origin': 'http://localhost:3000',
    'Access-Control-Allow-Credentials': 'true'
  });

  console.log(`New client connected to channel: ${channel}`);

  // Writes one SSE event: optional `event:` line, an `id:` line based on the
  // current timestamp, and the JSON-encoded payload as `data:`.
  const sendEvent = (data, eventType = null) => {
    if (eventType) {
      res.write(`event: ${eventType}\n`);
    }
    res.write(`id: ${Date.now()}\n`);
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  };

  // Initial "connected" event.
  sendEvent({
    message: 'Connected to SSE stream',
    channel: channel,
    time: new Date().toISOString()
  }, 'connected');

  // Register this connection under its channel.
  if (!clients.has(channel)) {
    clients.set(channel, new Set());
  }
  clients.get(channel).add(res);

  const totalClients = Array.from(clients.values())
    .reduce((sum, set) => sum + set.size, 0);
  console.log(`Client connected to channel ${channel}. Total clients: ${totalClients}`);

  // Periodic messages. `messageCounter` counts per connection, while
  // `messageCount` is the server-wide total.
  let messageCounter = 0;
  const intervalId = setInterval(() => {
    messageCounter++;
    messageCount++;
    sendEvent({
      id: messageCount,
      count: messageCounter,
      channel: channel,
      time: new Date().toISOString(),
      message: `Channel ${channel} message ${messageCounter}`
    });
  }, interval);

  // Heartbeat: a bare comment line every 15 s.
  const heartbeatId = setInterval(() => {
    res.write(':\n\n');
  }, 15000);

  // Cleanup is bound to three events below, and a single disconnect can fire
  // more than one of them. The flag makes every call after the first a no-op
  // so the client is unregistered, logged and ended exactly once.
  let cleanedUp = false;
  const cleanup = () => {
    if (cleanedUp) {
      return;
    }
    cleanedUp = true;

    clearInterval(intervalId);
    clearInterval(heartbeatId);
    const channelClients = clients.get(channel);
    if (channelClients) {
      channelClients.delete(res);
      if (channelClients.size === 0) {
        clients.delete(channel);
      }
    }
    const remainingClients = Array.from(clients.values())
      .reduce((sum, set) => sum + set.size, 0);
    console.log(`Client disconnected from channel ${channel}. Total clients: ${remainingClients}`);
    try {
      res.end();
    } catch (error) {
      console.error('Error ending response:', error);
    }
  };

  // Tear down when the connection closes or the response errors.
  req.on('close', cleanup);
  res.on('close', cleanup);
  res.on('error', cleanup);
}

// GET /events: SSE stream configured through the `channel` and `interval`
// query parameters.
app.get('/events', (req, res) => {
  const channel = req.query.channel || 'default';
  // Explicit radix; anything that is not a positive integer (NaN, 0, negative)
  // falls back to the 3000 ms default instead of reaching setInterval.
  const requestedInterval = Number.parseInt(req.query.interval, 10);
  const interval = requestedInterval > 0 ? requestedInterval : 3000;
  handleGETConnection(req, res, channel, interval);
});

// POST /broadcast: push one event to every client of `channel`, or to every
// connected client when no channel is given.
// JSON body fields: `message`, optional `channel`, optional `eventType`
// (defaults to 'broadcast').
app.post('/broadcast', bodyParser.json(), (req, res) => {
  // Fall back to an empty object so a missing body cannot make the
  // destructuring throw.
  const { message, channel, eventType = 'broadcast' } = req.body || {};

  // The event name is caller-supplied and is written straight into the
  // stream; strip line breaks so it cannot inject extra SSE fields.
  const safeEventType = String(eventType).replace(/[\r\n]+/g, '');

  let targetClients = new Set();
  if (channel) {
    targetClients = clients.get(channel) || new Set();
    console.log(`Broadcasting message to channel ${channel} (${targetClients.size} clients):`, message);
  } else {
    // No channel: flatten every channel's client set into one set.
    targetClients = new Set(
      Array.from(clients.values())
        .flatMap(channelClients => Array.from(channelClients))
    );
    console.log(`Broadcasting message to all channels (${targetClients.size} clients):`, message);
  }

  // The frame is identical for every recipient, so build it once.
  const payload = JSON.stringify({
    message,
    channel: channel || 'all',
    time: new Date().toISOString()
  });
  const frame = `event: ${safeEventType}\nid: ${Date.now()}\ndata: ${payload}\n\n`;

  let successCount = 0;
  for (const client of targetClients) {
    try {
      client.write(frame);
      successCount++;
    } catch (error) {
      // Best effort: one failing client must not stop the broadcast.
      console.error('Error broadcasting to client:', error);
    }
  }

  res.json({
    success: true,
    clientCount: targetClients.size,
    successfulBroadcasts: successCount,
    channel: channel || 'all',
    message: 'Broadcast sent successfully'
  });
});

// GET /status: reports the total number of active connections, the number of
// clients per channel, the server-wide message count and the process uptime.
app.get('/status', (req, res) => {
  const channelStats = {};
  let activeConnections = 0;

  // One pass over the registry fills the per-channel table and the total.
  for (const [channelName, subscribers] of clients) {
    channelStats[channelName] = subscribers.size;
    activeConnections += subscribers.size;
  }

  res.json({
    activeConnections,
    channelStats,
    messageCount,
    uptime: process.uptime()
  });
});

// Error-handling middleware: logs the error and answers with a JSON 500.
// The four-parameter signature is what marks this as an error handler for
// Express.
app.use((err, req, res, next) => {
  console.error('Server error:', err);

  // Once headers are out (always the case for an open SSE stream) the status
  // and body can no longer be set, so hand the error to Express's default
  // handler instead of calling res.status()/res.json().
  if (res.headersSent) {
    return next(err);
  }

  res.status(500).json({
    error: 'Internal Server Error',
    message: err.message
  });
});

// Start the server and print the list of available endpoints once it is
// listening.
app.listen(PORT, () => {
  const endpointHelp = [
    '- GET  /events?channel={channel}&interval={interval} - SSE stream',
    '- POST /events - SSE stream with body params',
    '- POST /broadcast - Broadcast message to all clients or specific channel',
    '- GET  /status - Server status'
  ];

  console.log(`SSE Server running at http://localhost:${PORT}`);
  console.log('Available endpoints:');
  for (const line of endpointHelp) {
    console.log(line);
  }
});

示例

基础用法

Live Editor
function Demo() {
 const { data, status } = useFetchEventSource("https://broad-scene-1112.ploomberapp.io/stream", {
    openWhenHidden: false
  });

  return (
    <div>
      <div>狀態: {status}</div>
      <div>数据: {JSON.stringify(data)}</div>
    </div>
  );
}
Result

POST 请求示例

Live Editor
function Demo() {
  const { status, data, error, close, open } = useFetchEventSource(
    "http://localhost:3001/events",
    {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Accept: "text/event-stream",
        "Cache-Control": "no-cache",
        Connection: "keep-alive",
      },
      immediate: false, // 不立即連接
      body: JSON.stringify({
        channel: "custom-channel", // 訂閱特定频道
        interval: 2000 // 消息间隔(毫秒)
      }),
    }
  );

  return (
    <div>
      <div>狀態: {status}</div>
      <div>数据: {JSON.stringify(data)}</div>
      <div>错误: {error?.message}</div>
      <button onClick={open}>开启</button>
      <button onClick={close}>关闭</button>
    </div>
  );
}
Result

事件處理示例

Live Editor
function Demo() {
  const { data, status } = useFetchEventSource("http://localhost:3001/events", {
    onOpen: () => {
      console.log("連接已建立");
    },
    onMessage: (event) => {
      console.log("收到新消息:", event.data);
    },
    onError: (error) => {
      console.error("連接错误:", error);
      return 5000; // 5秒后重试
    },
    onClose: () => {
      console.log("連接已关闭");
    }
  });

  return (
    <div>
      <div>連接狀態: {status}</div>
      <div>最新消息: {JSON.stringify(data)}</div>
    </div>
  );
}
Result

自动重连示例

Live Editor
function Demo() {
  const { status, data } = useFetchEventSource("http://localhost:3001/events", {
    autoReconnect: {
      retries: 3,     // 最大重试次数
      delay: 1000,    // 重试间隔(毫秒)
      onFailed: () => {
        console.log("三次重试后連接失败");
      }
    }
  });

  return (
    <div>
      <div>連接狀態: {status}</div>
      <div>数据: {JSON.stringify(data)}</div>
    </div>
  );
}
Result

多频道示例

Live Editor
function Demo() {
  const [channel, setChannel] = useState('default');
  
  const { data, close, open } = useFetchEventSource(
    "http://localhost:3001/events",
    {
      method: "POST",
      immediate: false,
      body: JSON.stringify({ channel }),
      onMessage: (event) => {
        console.log(`收到来自 ${channel} 的消息:`, event);
      }
    }
  );

  const switchChannel = (newChannel) => {
    close(); // 关闭当前連接
    setChannel(newChannel);
    open();  // 开启新連接
  };

  return (
    <div>
      <select 
        value={channel} 
        onChange={(e) => switchChannel(e.target.value)}
      >
        <option value="default">默认频道</option>
        <option value="news">新闻频道</option>
        <option value="alerts">提醒频道</option>
      </select>
      <div>当前频道: {channel}</div>
      <div>最新消息: {data?.message}</div>
    </div>
  );
}
Result

API

UseFetchEventSourceStatus

Type

export type UseFetchEventSourceStatus = 'CONNECTING' | 'CONNECTED' | 'DISCONNECTED'

UseFetchEventSourceAutoReconnectOptions

| 參數名 | 描述 | 類型 | 預設值 |
| --- | --- | --- | --- |
| retries | 重试次数,如果是函数,会调用来判断是否重试 | `number \| (() => boolean)` | - |
| delay | 重连前的延迟时间(毫秒) | `number` | - |
| onFailed | 重连失败时的回调 | `() => void` | - |

UseFetchEventSourceOptions

| 參數名 | 描述 | 類型 | 預設值 |
| --- | --- | --- | --- |
| method | HTTP 请求方法 | `string` | - |
| headers | 请求头 | `Record<string, string>` | - |
| body | POST 请求的请求体 | `any` | - |
| withCredentials | 使用凭证 | `boolean` | - |
| immediate | 立即打开连接,默认打开 | `boolean` | - |
| autoReconnect | 连接断开时自动重连 | `UseFetchEventSourceAutoReconnectOptions` | - |
| onOpen | 连接打开时的回调 | `() => void` | - |
| onMessage | 接收到消息时的回调 | `(event: UseFetchEventSourceMessage) => void` | - |
| onError | 发生错误时的回调,返回数字表示多少毫秒后重试 | `(error: Error) => number \| void \| null \| undefined` | - |
| onClose | 连接关闭时的回调 | `() => void` | - |

UseFetchEventSourceMessage

| 參數名 | 描述 | 類型 | 預設值 |
| --- | --- | --- | --- |
| id | 事件 ID | `string \| null` (必填) | - |
| event | 事件类型 | `string \| null` (必填) | - |
| data | 事件数据 | `string` (必填) | - |

UseFetchEventSourceReturn

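| 參數名 | 描述 | 類型 | 預設值 |
| --- | --- | --- | --- |
| data | 接收到的数据 | `string \| null` (必填) | - |
| error | 发生的错误 | `Error \| null` (必填) | - |
| status | 连接的状态 | `UseFetchEventSourceStatus` (必填) | - |
| lastEventId | 最后的事件 ID | `string \| null` (必填) | - |
| event | 事件名 | `string \| null` (必填) | - |
| close | 关闭连接 | `() => void` (必填) | - |
| open | 打开连接 | `() => void` (必填) | - |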

UseFetchEventSource

Returns

UseFetchEventSourceReturn

Arguments

| 參數名 | 描述 | 類型 | 預設值 |
| --- | --- | --- | --- |
| url | 服务器发送事件的 URL | `string \| URL` (必填) | - |
| options | EventSource 选项 | `UseFetchEventSourceOptions \| undefined` | - |