754 use shared EventSource (#859)

* Use SharedWorker based SSE connection sharing in supported browsers

* minor

* Improve SSE cleanup and parsing

---------

Co-authored-by: xiantang <zhujingdi1998@gmail.com>
This commit is contained in:
Halvdan Hoem Grelland
2026-01-19 14:34:45 +01:00
committed by GitHub
parent 122d83caf7
commit 404e2a83c9
3 changed files with 208 additions and 13 deletions
+14 -2
View File
@@ -13,8 +13,13 @@ import (
"time"
)
//go:embed proxy.js
var ProxyScript string
var (
//go:embed proxy.js
ProxyScript string
//go:embed worker.js
WorkerScript string
)
type Streamer interface {
AddSubscriber() *Subscriber
@@ -50,6 +55,7 @@ func NewProxy(cfg *cfgProxy) *Proxy {
func (p *Proxy) Run() {
http.HandleFunc("/", p.proxyHandler)
http.HandleFunc("/__air_internal/sse", p.reloadHandler)
http.HandleFunc("GET /__air_internal/worker.js", p.workerScriptHandler)
if err := p.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal(p.Stop())
}
@@ -229,6 +235,12 @@ func (p *Proxy) reloadHandler(w http.ResponseWriter, r *http.Request) {
}
}
func (p *Proxy) workerScriptHandler(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/javascript")
w.WriteHeader(http.StatusOK)
_, _ = io.WriteString(w, WorkerScript)
}
func (p *Proxy) Stop() error {
p.stream.Stop()
return p.server.Close()
+72 -11
View File
@@ -1,18 +1,79 @@
(() => {
const eventSource = new EventSource("/__air_internal/sse");
let worker = null;
window.addEventListener('beforeunload', function() {
eventSource.close();
})
const disconnectWorker = () => {
if (worker) {
worker.port.postMessage('disconnect');
}
};
eventSource.addEventListener('reload', () => {
location.reload();
});
// Try to use SharedWorker for shared SSE connection across all windows
if (window.SharedWorker) {
try {
worker = new SharedWorker('/__air_internal/worker.js', { name: 'air-sse-worker' });
worker.port.onmessage = (event) => {
const message = event.data;
eventSource.addEventListener('build-failed', (event) => {
const data = JSON.parse(event.data);
showErrorInModal(data);
});
switch (message.type) {
case 'reload':
location.reload();
break;
case 'build-failed':
const data = parseBuildFailed(message.data);
showErrorInModal(data);
break;
}
};
worker.port.start();
// Gracefully disconnect from SharedWorker when the window is closed
window.addEventListener('beforeunload', disconnectWorker);
window.addEventListener('pagehide', disconnectWorker);
} catch (e) {
// Setting up SharedWorker failed, so fall back to per-window EventSource
console.warn('air: SharedWorker setup failed, falling back to EventSource', e);
worker = null;
}
}
// SharedWorker is not available or failed somehow. Use per-window EventSource as fallback
if (!worker) {
const eventSource = new EventSource("/__air_internal/sse");
window.addEventListener('beforeunload', function () {
eventSource.close();
});
window.addEventListener('pagehide', function () {
eventSource.close();
});
eventSource.addEventListener('reload', () => {
location.reload();
});
eventSource.addEventListener('build-failed', (event) => {
const data = parseBuildFailed(event.data);
showErrorInModal(data);
});
}
function parseBuildFailed(raw) {
try {
const parsed = JSON.parse(raw);
return {
error: parsed.error ?? "Build failed",
command: parsed.command ?? "",
output: parsed.output ?? "",
};
} catch (e) {
console.warn("air: failed to parse build-failed payload", e);
return {
error: "Build failed",
command: "",
output: String(raw),
};
}
}
function showErrorInModal(data) {
document.body.insertAdjacentHTML(`beforeend`, `
+122
View File
@@ -0,0 +1,122 @@
(() => {
const ports = new Set();
let sse = null;
let terminationTimer = null;
let reconnectTimer = null;
let reconnectAttempts = 0;
self.onconnect = (event) => {
const port = event.ports[0];
ports.add(port);
if (terminationTimer) { // We're still alive
cancelTermination();
}
// Initialize the EventSource once
if (!sse) {
initSSE();
}
// Handle graceful disconnect message from port
port.onmessage = (e) => {
if (e.data === 'disconnect') {
ports.delete(port);
if (ports.size === 0) {
scheduleTermination();
}
}
};
port.start();
};
function initSSE() {
if (sse) {
return;
}
sse = new EventSource("/__air_internal/sse");
sse.addEventListener('reload', () => {
broadcast({ type: 'reload' });
});
sse.addEventListener('build-failed', (e) => {
broadcast({ type: 'build-failed', data: e.data });
});
sse.onopen = () => {
reconnectAttempts = 0;
};
sse.onerror = () => {
if (sse) {
sse.close();
sse = null;
}
scheduleReconnect();
};
}
function scheduleReconnect() {
if (reconnectTimer || ports.size === 0) {
return;
}
const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 10000);
reconnectAttempts += 1;
reconnectTimer = setTimeout(() => {
reconnectTimer = null;
if (ports.size === 0) {
return;
}
initSSE();
}, delay);
}
function clearReconnect() {
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
reconnectAttempts = 0;
}
function broadcast(data) {
ports.forEach(port => {
try {
port.postMessage(data)
} catch (e) {
// This port is dead so we remove it. If this was the last port, schedule termination.
ports.delete(port);
if (ports.size === 0) {
scheduleTermination();
}
}
});
}
function cancelTermination() {
clearTimeout(terminationTimer);
terminationTimer = null;
}
function scheduleTermination() {
if (terminationTimer) { // Already scheduled
return
}
clearReconnect();
terminationTimer = setTimeout(() => {
if (sse) {
sse.close();
sse = null;
}
clearReconnect();
self.close();
}, 3000);
}
})();