fix: support filesystem change notifications for multiple connections

This commit is contained in:
James Murdza 2024-10-25 19:03:13 -06:00
parent eb973e0f83
commit 90ea90f610
5 changed files with 102 additions and 72 deletions

View File

@ -0,0 +1,50 @@
import { Socket } from "socket.io"
class Counter {
private count: number = 0
increment() {
this.count++
}
decrement() {
this.count = Math.max(0, this.count - 1)
}
getValue(): number {
return this.count
}
}
// Owner Connection Management
export class ConnectionManager {
private ownerConnections: Record<string, Counter> = {}
private sockets: Record<string, Set<Socket>> = {}
ownerConnected(sandboxId: string) {
this.ownerConnections[sandboxId] ??= new Counter()
this.ownerConnections[sandboxId].increment()
}
ownerDisconnected(sandboxId: string) {
this.ownerConnections[sandboxId]?.decrement()
}
ownerIsConnected(sandboxId: string): boolean {
return this.ownerConnections[sandboxId]?.getValue() > 0
}
addConnectionForSandbox(socket: Socket, sandboxId: string) {
this.sockets[sandboxId] ??= new Set()
this.sockets[sandboxId].add(socket)
}
removeConnectionForSandbox(socket: Socket, sandboxId: string) {
this.sockets[sandboxId]?.delete(socket)
}
connectionsForSandbox(sandboxId: string): Set<Socket> {
return this.sockets[sandboxId] ?? new Set();
}
}

View File

@ -50,13 +50,13 @@ export class FileManager {
public fileData: TFileData[] public fileData: TFileData[]
private fileWatchers: WatchHandle[] = [] private fileWatchers: WatchHandle[] = []
private dirName = "/home/user/project" private dirName = "/home/user/project"
private refreshFileList: (files: (TFolder | TFile)[]) => void private refreshFileList: ((files: (TFolder | TFile)[]) => void) | null
// Constructor to initialize the FileManager // Constructor to initialize the FileManager
constructor( constructor(
sandboxId: string, sandboxId: string,
sandbox: Sandbox, sandbox: Sandbox,
refreshFileList: (files: (TFolder | TFile)[]) => void refreshFileList: ((files: (TFolder | TFile)[]) => void) | null
) { ) {
this.sandboxId = sandboxId this.sandboxId = sandboxId
this.sandbox = sandbox this.sandbox = sandbox
@ -314,7 +314,9 @@ export class FileManager {
} }
// Tell the client to reload the file list // Tell the client to reload the file list
this.refreshFileList(this.files) if (event.type !== "chmod") {
this.refreshFileList?.(this.files)
}
} catch (error) { } catch (error) {
console.error( console.error(
`Error handling ${event.type} event for ${event.name}:`, `Error handling ${event.type} event for ${event.name}:`,

View File

@ -1,33 +0,0 @@
class Counter {
private count: number = 0
increment() {
this.count++
}
decrement() {
this.count = Math.max(0, this.count - 1)
}
getValue(): number {
return this.count
}
}
// Owner Connection Management
export class OwnerConnectionManager {
private connections: Record<string, Counter> = {}
ownerConnected(sandboxId: string) {
this.connections[sandboxId] ??= new Counter()
this.connections[sandboxId].increment()
}
ownerDisconnected(sandboxId: string) {
this.connections[sandboxId]?.decrement()
}
ownerIsConnected(sandboxId: string): boolean {
return this.connections[sandboxId]?.getValue() > 0
}
}

View File

@ -1,5 +1,5 @@
import { Sandbox as E2BSandbox } from "e2b" import { Sandbox as E2BSandbox } from "e2b"
import { Socket } from 'socket.io' import { Socket } from "socket.io"
import { AIWorker } from "./AIWorker" import { AIWorker } from "./AIWorker"
import { CONTAINER_TIMEOUT } from "./constants" import { CONTAINER_TIMEOUT } from "./constants"
import { DokkuClient } from "./DokkuClient" import { DokkuClient } from "./DokkuClient"
@ -33,36 +33,35 @@ type ServerContext = {
aiWorker: AIWorker; aiWorker: AIWorker;
dokkuClient: DokkuClient | null; dokkuClient: DokkuClient | null;
gitClient: SecureGitClient | null; gitClient: SecureGitClient | null;
socket: Socket;
}; };
export class Sandbox { export class Sandbox {
// Sandbox properties:
sandboxId: string;
fileManager: FileManager | null; fileManager: FileManager | null;
terminalManager: TerminalManager | null; terminalManager: TerminalManager | null;
container: E2BSandbox | null; container: E2BSandbox | null;
// Server context:
dokkuClient: DokkuClient | null; dokkuClient: DokkuClient | null;
gitClient: SecureGitClient | null; gitClient: SecureGitClient | null;
aiWorker: AIWorker; aiWorker: AIWorker;
socket: Socket;
sandboxId: string;
userId: string;
isOwner: boolean;
constructor(sandboxId: string, userId: string, isOwner: boolean, { aiWorker, dokkuClient, gitClient, socket }: ServerContext) { constructor(sandboxId: string, { aiWorker, dokkuClient, gitClient }: ServerContext) {
// Sandbox properties:
this.sandboxId = sandboxId;
this.fileManager = null; this.fileManager = null;
this.terminalManager = null; this.terminalManager = null;
this.container = null; this.container = null;
this.sandboxId = sandboxId; // Server context:
this.userId = userId;
this.isOwner = isOwner;
this.aiWorker = aiWorker; this.aiWorker = aiWorker;
this.dokkuClient = dokkuClient; this.dokkuClient = dokkuClient;
this.gitClient = gitClient; this.gitClient = gitClient;
this.socket = socket;
} }
// Initializes the container for the sandbox environment // Initializes the container for the sandbox environment
async initializeContainer() { async initialize(
fileWatchCallback: ((files: (TFolder | TFile)[]) => void) | undefined
) {
// Acquire a lock to ensure exclusive access to the sandbox environment // Acquire a lock to ensure exclusive access to the sandbox environment
await lockManager.acquireLock(this.sandboxId, async () => { await lockManager.acquireLock(this.sandboxId, async () => {
// Check if a container already exists and is running // Check if a container already exists and is running
@ -90,14 +89,10 @@ export class Sandbox {
this.fileManager = new FileManager( this.fileManager = new FileManager(
this.sandboxId, this.sandboxId,
this.container, this.container,
(files: (TFolder | TFile)[]) => { fileWatchCallback ?? null
// Emit an event to the socket when files are loaded
this.socket.emit("loaded", files)
}
) )
// Initialize the file manager and emit the initial files // Initialize the file manager and emit the initial files
this.fileManager.initialize() await this.fileManager.initialize()
this.socket.emit("loaded", this.fileManager.files)
} }
} }
@ -113,12 +108,12 @@ export class Sandbox {
this.fileManager = null; this.fileManager = null;
} }
handlers() { handlers(connection: { userId: string, isOwner: boolean, socket: Socket }) {
// Handle heartbeat from a socket connection // Handle heartbeat from a socket connection
const handleHeartbeat: SocketHandler = (_: any) => { const handleHeartbeat: SocketHandler = (_: any) => {
// Only keep the sandbox alive if the owner is still connected // Only keep the sandbox alive if the owner is still connected
if (this.isOwner) { if (connection.isOwner) {
this.container?.setTimeout(CONTAINER_TIMEOUT) this.container?.setTimeout(CONTAINER_TIMEOUT)
} }
} }
@ -135,7 +130,7 @@ export class Sandbox {
// Handle saving a file // Handle saving a file
const handleSaveFile: SocketHandler = async ({ fileId, body }: any) => { const handleSaveFile: SocketHandler = async ({ fileId, body }: any) => {
await saveFileRL.consume(this.userId, 1); await saveFileRL.consume(connection.userId, 1);
return this.fileManager?.saveFile(fileId, body) return this.fileManager?.saveFile(fileId, body)
} }
@ -160,25 +155,25 @@ export class Sandbox {
// Handle creating a file // Handle creating a file
const handleCreateFile: SocketHandler = async ({ name }: any) => { const handleCreateFile: SocketHandler = async ({ name }: any) => {
await createFileRL.consume(this.userId, 1); await createFileRL.consume(connection.userId, 1);
return { "success": await this.fileManager?.createFile(name) } return { "success": await this.fileManager?.createFile(name) }
} }
// Handle creating a folder // Handle creating a folder
const handleCreateFolder: SocketHandler = async ({ name }: any) => { const handleCreateFolder: SocketHandler = async ({ name }: any) => {
await createFolderRL.consume(this.userId, 1); await createFolderRL.consume(connection.userId, 1);
return { "success": await this.fileManager?.createFolder(name) } return { "success": await this.fileManager?.createFolder(name) }
} }
// Handle renaming a file // Handle renaming a file
const handleRenameFile: SocketHandler = async ({ fileId, newName }: any) => { const handleRenameFile: SocketHandler = async ({ fileId, newName }: any) => {
await renameFileRL.consume(this.userId, 1) await renameFileRL.consume(connection.userId, 1)
return this.fileManager?.renameFile(fileId, newName) return this.fileManager?.renameFile(fileId, newName)
} }
// Handle deleting a file // Handle deleting a file
const handleDeleteFile: SocketHandler = async ({ fileId }: any) => { const handleDeleteFile: SocketHandler = async ({ fileId }: any) => {
await deleteFileRL.consume(this.userId, 1) await deleteFileRL.consume(connection.userId, 1)
return this.fileManager?.deleteFile(fileId) return this.fileManager?.deleteFile(fileId)
} }
@ -191,10 +186,10 @@ export class Sandbox {
const handleCreateTerminal: SocketHandler = async ({ id }: any) => { const handleCreateTerminal: SocketHandler = async ({ id }: any) => {
await lockManager.acquireLock(this.sandboxId, async () => { await lockManager.acquireLock(this.sandboxId, async () => {
await this.terminalManager?.createTerminal(id, (responseString: string) => { await this.terminalManager?.createTerminal(id, (responseString: string) => {
this.socket.emit("terminalResponse", { id, data: responseString }) connection.socket.emit("terminalResponse", { id, data: responseString })
const port = extractPortNumber(responseString) const port = extractPortNumber(responseString)
if (port) { if (port) {
this.socket.emit( connection.socket.emit(
"previewURL", "previewURL",
"https://" + this.container?.getHost(port) "https://" + this.container?.getHost(port)
) )
@ -220,7 +215,7 @@ export class Sandbox {
// Handle generating code // Handle generating code
const handleGenerateCode: SocketHandler = ({ fileName, code, line, instructions }: any) => { const handleGenerateCode: SocketHandler = ({ fileName, code, line, instructions }: any) => {
return this.aiWorker.generateCode(this.userId, fileName, code, line, instructions) return this.aiWorker.generateCode(connection.userId, fileName, code, line, instructions)
} }
return { return {

View File

@ -3,17 +3,18 @@ import dotenv from "dotenv"
import express, { Express } from "express" import express, { Express } from "express"
import fs from "fs" import fs from "fs"
import { createServer } from "http" import { createServer } from "http"
import { Server } from "socket.io" import { Server, Socket } from "socket.io"
import { AIWorker } from "./AIWorker" import { AIWorker } from "./AIWorker"
import { ConnectionManager } from "./ConnectionManager"
import { DokkuClient } from "./DokkuClient" import { DokkuClient } from "./DokkuClient"
import { OwnerConnectionManager as ConnectionManager } from "./OwnerConnectionManager"
import { Sandbox } from "./SandboxManager" import { Sandbox } from "./SandboxManager"
import { SecureGitClient } from "./SecureGitClient" import { SecureGitClient } from "./SecureGitClient"
import { socketAuth } from "./socketAuth"; // Import the new socketAuth middleware import { socketAuth } from "./socketAuth"; // Import the new socketAuth middleware
import { TFile, TFolder } from "./types"
// Log errors and send a notification to the client // Log errors and send a notification to the client
export const handleErrors = (message: string, error: any, socket: any) => { export const handleErrors = (message: string, error: any, socket: Socket) => {
console.error(message, error); console.error(message, error);
socket.emit("error", `${message} ${error.message ?? error}`); socket.emit("error", `${message} ${error.message ?? error}`);
}; };
@ -106,22 +107,35 @@ io.on("connection", async (socket) => {
return return
} }
} }
connections.addConnectionForSandbox(socket, data.sandboxId)
try { try {
// Create or retrieve the sandbox manager for the given sandbox ID // Create or retrieve the sandbox manager for the given sandbox ID
const sandboxManager = sandboxes[data.sandboxId] ?? new Sandbox( const sandboxManager = sandboxes[data.sandboxId] ?? new Sandbox(
data.sandboxId, data.sandboxId,
data.userId, {
data.isOwner, aiWorker, dokkuClient, gitClient,
{ aiWorker, dokkuClient, gitClient, socket } }
) )
sandboxes[data.sandboxId] = sandboxManager
const sendFileNotifications = (files: (TFolder | TFile)[]) => {
connections.connectionsForSandbox(data.sandboxId).forEach((socket: Socket) => {
socket.emit("loaded", files);
});
};
// Initialize the sandbox container // Initialize the sandbox container
// The file manager and terminal managers will be set up if they have been closed // The file manager and terminal managers will be set up if they have been closed
sandboxManager.initializeContainer() await sandboxManager.initialize(sendFileNotifications)
socket.emit("loaded", sandboxManager.fileManager?.files)
// Register event handlers for the sandbox // Register event handlers for the sandbox
Object.entries(sandboxManager.handlers()).forEach(([event, handler]) => { Object.entries(sandboxManager.handlers({
userId: data.userId,
isOwner: data.isOwner,
socket
})).forEach(([event, handler]) => {
socket.on(event, async (options: any, callback?: (response: any) => void) => { socket.on(event, async (options: any, callback?: (response: any) => void) => {
try { try {
const result = await handler(options) const result = await handler(options)
@ -135,6 +149,8 @@ io.on("connection", async (socket) => {
// Handle disconnection event // Handle disconnection event
socket.on("disconnect", async () => { socket.on("disconnect", async () => {
try { try {
connections.removeConnectionForSandbox(socket, data.sandboxId)
if (data.isOwner) { if (data.isOwner) {
connections.ownerDisconnected(data.sandboxId) connections.ownerDisconnected(data.sandboxId)
// If the owner has disconnected from all sockets, close open terminals and file watchers.o // If the owner has disconnected from all sockets, close open terminals and file watchers.o