Brijesh's Git Server — sse-from-child-process @ a71941568ab0ca6905b5bdeaf9097f4a1d83ce23

create new task specific route and handle page refresh
Brijesh Wawdhane ops@brijesh.dev
Fri, 14 Mar 2025 18:00:27 +0530
commit

a71941568ab0ca6905b5bdeaf9097f4a1d83ce23

parent

03cd45024cff42c5fbba573a358a5a28bfc2e07e

A client/app/[taskId]/page.tsx

@@ -0,0 +1,107 @@

+// 'use client'; + +// import { useState, useEffect, use } from 'react' +// import { useRouter } from 'next/navigation' + +// export default function TaskPage({ params }: { params: Promise<{ taskId: string }> }) { +// const [updates, setUpdates] = useState<string[]>([]) +// const router = useRouter() +// const taskId = use(params).taskId + +// useEffect(() => { +// // Clear updates on mount +// setUpdates([]) + +// // Start listening for updates +// const eventSource = new EventSource(`http://localhost:4000/api/task-updates/${taskId}`) + +// eventSource.onmessage = (event) => { +// const update = JSON.parse(event.data) +// // Only add the update if it's not already in the array +// if (!updates.includes(update.message)) { +// setUpdates(prev => [...prev, update.message]) +// } + +// if (update.progress === 100) { +// eventSource.close() +// } +// } + +// eventSource.onerror = () => { +// eventSource.close() +// } + +// return () => { +// eventSource.close() +// } +// }, [taskId]) + +// return ( +// <div style={{ padding: '20px' }}> +// <h1>Task Details</h1> +// <p>Task ID: {taskId}</p> + +// <div style={{ marginTop: '20px' }}> +// <h2>Updates:</h2> +// {updates.map((update, index) => ( +// <p key={index}>{update}</p> +// ))} +// </div> +// </div> +// ) +// } + +'use client'; + +import { useState, useEffect, use } from 'react' +import { useRouter } from 'next/navigation' + +export default function TaskPage({ params }: { params: Promise<{ taskId: string }> }) { + const [updates, setUpdates] = useState<string[]>([]) + const router = useRouter() + const taskId = use(params).taskId + + useEffect(() => { + // Clear updates only when taskId changes, not on every mount + setUpdates([]) + + const eventSource = new EventSource(`http://localhost:4000/api/task-updates/${taskId}`) + + eventSource.onmessage = (event) => { + const update = JSON.parse(event.data) + setUpdates(prev => { + // Check if this message already exists in our updates + if (prev.includes(update.message)) { + return prev + } + return [...prev, update.message] + }) + + if (update.progress === 100) { + eventSource.close() + } + } + + eventSource.onerror = () => { + eventSource.close() + } + + return () => { + eventSource.close() + } + }, [taskId]) // Only taskId in dependencies, not updates + + return ( + <div style={{ padding: '20px' }}> + <h1>Task Details</h1> + <p>Task ID: {taskId}</p> + + <div style={{ marginTop: '20px' }}> + <h2>Updates:</h2> + {updates.map((update, index) => ( + <p key={index}>{update}</p> + ))} + </div> + </div> + ) +}
M client/app/page.tsxclient/app/page.tsx

@@ -1,53 +1,35 @@

'use client'; -import { useState, useEffect } from 'react' +import { useState } from 'react' +import { useRouter } from 'next/navigation' export default function Home() { - const [taskId, setTaskId] = useState<string | null>(null) - const [updates, setUpdates] = useState<string[]>([]) + const [loading, setLoading] = useState(false) + const router = useRouter() const startTask = async () => { try { + setLoading(true) const response = await fetch('http://localhost:4000/api/start-task', { method: 'POST' }) const data = await response.json() - setTaskId(data.taskId) - setUpdates([`Task started with ID: ${data.taskId}`]) - // Start listening for updates - const eventSource = new EventSource(`http://localhost:4000/api/task-updates/${data.taskId}`) - - eventSource.onmessage = (event) => { - const update = JSON.parse(event.data) - setUpdates(prev => [...prev, update.message]) - - if (update.progress === 100) { - eventSource.close() - } - } - - eventSource.onerror = () => { - eventSource.close() - } + // Redirect to the task details page + router.push(`/${data.taskId}`) } catch (error) { console.error('Error starting task:', error) + } finally { + setLoading(false) } } return ( <div style={{ padding: '20px' }}> <h1>SSE Long Task Example</h1> - <button onClick={startTask} disabled={!!taskId}> - Start Long Task + <button onClick={startTask} disabled={loading}> + {loading ? 'Creating task...' : 'Start Long Task'} </button> - - <div style={{ marginTop: '20px' }}> - <h2>Updates:</h2> - {updates.map((update, index) => ( - <p key={index}>{update}</p> - ))} - </div> </div> ) }
M server/dist/index.jsserver/dist/index.js

@@ -12,47 +12,73 @@ app.use((0, cors_1.default)());

app.use(express_1.default.json()); const PORT = 4000; const taskProcesses = {}; -// Endpoint to start a long running task +// Cleanup any stale tasks older than 1 minute +setInterval(() => { + const now = Date.now(); + Object.entries(taskProcesses).forEach(([taskId, task]) => { + if (now - task.startTime > 60000) { // 60 seconds + task.process.kill(); + delete taskProcesses[taskId]; + } + }); +}, 30000); // Check every 30 seconds +app.get("/healthcheck", (req, res) => { + res.status(200).json({ status: "healthy" }); +}); app.post('/api/start-task', (req, res) => { const taskId = Date.now().toString(); - // Create a child process - note the path is relative to the dist directory const childProcess = (0, child_process_1.fork)(path_1.default.join(__dirname, 'worker.js')); taskProcesses[taskId] = { process: childProcess, - clients: [] + clients: [], + startTime: Date.now(), + currentProgress: 0, + isComplete: false }; - // Start the task in child process + // Listen for updates from child process + childProcess.on('message', (message) => { + if (taskProcesses[taskId]) { + taskProcesses[taskId].currentProgress = message.progress; + taskProcesses[taskId].isComplete = message.progress >= 100; + // Broadcast to all clients + taskProcesses[taskId].clients.forEach(client => { + client.write(`data: ${JSON.stringify(message)}\n\n`); + }); + } + }); childProcess.send({ taskId }); res.json({ taskId, message: 'Task started' }); }); -// SSE endpoint to receive updates app.get('/api/task-updates/:taskId', (req, res) => { const { taskId } = req.params; + if (!taskProcesses[taskId]) { + return res.status(404).json({ error: 'Task not found' }); + } // SSE setup res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive' }); - // Keep connection alive const keepAlive = setInterval(() => { res.write(':\n\n'); }, 30000); - // Add this client to the list of clients for this task - if (taskProcesses[taskId]) { - taskProcesses[taskId].clients.push(res); - // Listen for messages from child process - taskProcesses[taskId].process.on('message', (message) => { - res.write(`data: ${JSON.stringify(message)}\n\n`); - }); - } + // Send current progress immediately + const currentProgress = taskProcesses[taskId].currentProgress; + res.write(`data: ${JSON.stringify({ + taskId, + progress: currentProgress, + message: `Task ${taskId} is ${currentProgress}% complete` + })}\n\n`); + // Add this client to the list + taskProcesses[taskId].clients.push(res); // Clean up on client disconnect req.on('close', () => { clearInterval(keepAlive); if (taskProcesses[taskId]) { taskProcesses[taskId].clients = taskProcesses[taskId].clients.filter(client => client !== res); - // If no clients are listening, cleanup the task - if (taskProcesses[taskId].clients.length === 0) { + // Only cleanup task if it's complete and has no clients + if (taskProcesses[taskId].clients.length === 0 && taskProcesses[taskId].isComplete) { taskProcesses[taskId].process.kill(); delete taskProcesses[taskId]; }
M server/dist/worker.jsserver/dist/worker.js

@@ -1,9 +1,8 @@

"use strict"; process.on('message', ({ taskId }) => { let progress = 0; - // Simulate work with progress updates const interval = setInterval(() => { - progress += 2; + progress += 0.1; if (process.send) { process.send({ taskId,

@@ -22,5 +21,5 @@ });

} process.exit(); } - }, 1000); // Send update every 1 seconds + }, 1000); });
M server/src/index.tsserver/src/index.ts

@@ -1,4 +1,4 @@

-import express, { Response } from 'express' +import express, { Response, Request } from 'express' import { fork } from 'child_process' import cors from 'cors' import path from 'path'

@@ -12,33 +12,67 @@

interface TaskProcess { [key: string]: { process: ReturnType<typeof fork> - clients: express.Response<any, Record<string, any>>[] // Updated type + clients: express.Response<any, Record<string, any>>[] + startTime: number + currentProgress: number + isComplete: boolean } } const taskProcesses: TaskProcess = {} -// Endpoint to start a long running task -app.post('/api/start-task', (req, res) => { +// Cleanup any stale tasks older than 1 minute +setInterval(() => { + const now = Date.now() + Object.entries(taskProcesses).forEach(([taskId, task]) => { + if (now - task.startTime > 60000) { // 60 seconds + task.process.kill() + delete taskProcesses[taskId] + } + }) +}, 30000) // Check every 30 seconds + +app.get("/healthcheck", (req: express.Request, res: express.Response) => { + res.status(200).json({ status: "healthy" }) +}) + +app.post('/api/start-task', (req: express.Request, res: express.Response) => { const taskId = Date.now().toString() - // Create a child process - note the path is relative to the dist directory const childProcess = fork(path.join(__dirname, 'worker.js')) taskProcesses[taskId] = { process: childProcess, - clients: [] + clients: [], + startTime: Date.now(), + currentProgress: 0, + isComplete: false } - // Start the task in child process + // Listen for updates from child process + childProcess.on('message', (message: { progress: number, taskId: string }) => { + if (taskProcesses[taskId]) { + taskProcesses[taskId].currentProgress = message.progress + taskProcesses[taskId].isComplete = message.progress >= 100 + + // Broadcast to all clients + taskProcesses[taskId].clients.forEach(client => { + client.write(`data: ${JSON.stringify(message)}\n\n`) + }) + } + }) + childProcess.send({ taskId }) res.json({ taskId, message: 'Task started' }) }) -// SSE endpoint to receive updates -app.get('/api/task-updates/:taskId', (req, res: express.Response) => { // Explicitly type the response +app.get('/api/task-updates/:taskId', (req: any, res: any) => { const { taskId } = req.params + + if (!taskProcesses[taskId]) { + return res.status(404).json({ error: 'Task not found' }) + } // SSE setup res.writeHead(200, {

@@ -47,31 +81,29 @@ 'Cache-Control': 'no-cache',

'Connection': 'keep-alive' }) - // Keep connection alive const keepAlive = setInterval(() => { res.write(':\n\n') }, 30000) - // Add this client to the list of clients for this task - if (taskProcesses[taskId]) { - taskProcesses[taskId].clients.push(res) - - // Listen for messages from child process - taskProcesses[taskId].process.on('message', (message) => { - res.write(`data: ${JSON.stringify(message)}\n\n`) - }) - } + // Send current progress immediately + const currentProgress = taskProcesses[taskId].currentProgress + res.write(`data: ${JSON.stringify({ + taskId, + progress: currentProgress, + message: `Task ${taskId} is ${currentProgress}% complete` + })}\n\n`) + + // Add this client to the list + taskProcesses[taskId].clients.push(res) // Clean up on client disconnect req.on('close', () => { clearInterval(keepAlive) if (taskProcesses[taskId]) { - taskProcesses[taskId].clients = taskProcesses[taskId].clients.filter(client => - client !== res - ) + taskProcesses[taskId].clients = taskProcesses[taskId].clients.filter(client => client !== res) - // If no clients are listening, cleanup the task - if (taskProcesses[taskId].clients.length === 0) { + // Only cleanup task if it's complete and has no clients + if (taskProcesses[taskId].clients.length === 0 && taskProcesses[taskId].isComplete) { taskProcesses[taskId].process.kill() delete taskProcesses[taskId] }

@@ -81,4 +113,4 @@ })

app.listen(PORT, () => { console.log(`Server running on port ${PORT}`) -}) +})
M server/src/worker.tsserver/src/worker.ts

@@ -1,9 +1,12 @@

-process.on('message', ({ taskId }) => { +interface TaskMessage { + taskId: string +} + +process.on('message', ({ taskId }: TaskMessage) => { let progress = 0 - // Simulate work with progress updates const interval = setInterval(() => { - progress += 2 + progress += 0.1 if (process.send) { process.send({

@@ -24,5 +27,5 @@ })

} process.exit() } - }, 1000) // Send update every 1 seconds -}) + }, 1000) +})