fix: forward filesystem change notifications to all relevant connections
This commit is contained in:
parent
a87a4b5160
commit
7ace8f569a
50
backend/server/src/ConnectionManager.ts
Normal file
50
backend/server/src/ConnectionManager.ts
Normal file
@ -0,0 +1,50 @@
|
||||
import { Socket } from "socket.io"
|
||||
|
||||
class Counter {
|
||||
private count: number = 0
|
||||
|
||||
increment() {
|
||||
this.count++
|
||||
}
|
||||
|
||||
decrement() {
|
||||
this.count = Math.max(0, this.count - 1)
|
||||
}
|
||||
|
||||
getValue(): number {
|
||||
return this.count
|
||||
}
|
||||
}
|
||||
|
||||
// Owner Connection Management
|
||||
export class ConnectionManager {
|
||||
private ownerConnections: Record<string, Counter> = {}
|
||||
private sockets: Record<string, Set<Socket>> = {}
|
||||
|
||||
ownerConnected(sandboxId: string) {
|
||||
this.ownerConnections[sandboxId] ??= new Counter()
|
||||
this.ownerConnections[sandboxId].increment()
|
||||
}
|
||||
|
||||
ownerDisconnected(sandboxId: string) {
|
||||
this.ownerConnections[sandboxId]?.decrement()
|
||||
}
|
||||
|
||||
ownerIsConnected(sandboxId: string): boolean {
|
||||
return this.ownerConnections[sandboxId]?.getValue() > 0
|
||||
}
|
||||
|
||||
addConnectionForSandbox(socket: Socket, sandboxId: string) {
|
||||
this.sockets[sandboxId] ??= new Set()
|
||||
this.sockets[sandboxId].add(socket)
|
||||
}
|
||||
|
||||
removeConnectionForSandbox(socket: Socket, sandboxId: string) {
|
||||
this.sockets[sandboxId]?.delete(socket)
|
||||
}
|
||||
|
||||
connectionsForSandbox(sandboxId: string): Set<Socket> {
|
||||
return this.sockets[sandboxId] ?? new Set();
|
||||
}
|
||||
|
||||
}
|
@ -50,13 +50,13 @@ export class FileManager {
|
||||
public fileData: TFileData[]
|
||||
private fileWatchers: WatchHandle[] = []
|
||||
private dirName = "/home/user/project"
|
||||
private refreshFileList: (files: (TFolder | TFile)[]) => void
|
||||
private refreshFileList: ((files: (TFolder | TFile)[]) => void) | null
|
||||
|
||||
// Constructor to initialize the FileManager
|
||||
constructor(
|
||||
sandboxId: string,
|
||||
sandbox: Sandbox,
|
||||
refreshFileList: (files: (TFolder | TFile)[]) => void
|
||||
refreshFileList: ((files: (TFolder | TFile)[]) => void) | null
|
||||
) {
|
||||
this.sandboxId = sandboxId
|
||||
this.sandbox = sandbox
|
||||
@ -314,7 +314,9 @@ export class FileManager {
|
||||
}
|
||||
|
||||
// Tell the client to reload the file list
|
||||
this.refreshFileList(this.files)
|
||||
if (event.type !== "chmod") {
|
||||
this.refreshFileList?.(this.files)
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(
|
||||
`Error handling ${event.type} event for ${event.name}:`,
|
||||
|
@ -1,33 +0,0 @@
|
||||
class Counter {
|
||||
private count: number = 0
|
||||
|
||||
increment() {
|
||||
this.count++
|
||||
}
|
||||
|
||||
decrement() {
|
||||
this.count = Math.max(0, this.count - 1)
|
||||
}
|
||||
|
||||
getValue(): number {
|
||||
return this.count
|
||||
}
|
||||
}
|
||||
|
||||
// Owner Connection Management
|
||||
export class OwnerConnectionManager {
|
||||
private connections: Record<string, Counter> = {}
|
||||
|
||||
ownerConnected(sandboxId: string) {
|
||||
this.connections[sandboxId] ??= new Counter()
|
||||
this.connections[sandboxId].increment()
|
||||
}
|
||||
|
||||
ownerDisconnected(sandboxId: string) {
|
||||
this.connections[sandboxId]?.decrement()
|
||||
}
|
||||
|
||||
ownerIsConnected(sandboxId: string): boolean {
|
||||
return this.connections[sandboxId]?.getValue() > 0
|
||||
}
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
import { Sandbox as E2BSandbox } from "e2b"
|
||||
import { Socket } from 'socket.io'
|
||||
import { Socket } from "socket.io"
|
||||
import { AIWorker } from "./AIWorker"
|
||||
import { CONTAINER_TIMEOUT } from "./constants"
|
||||
import { DokkuClient } from "./DokkuClient"
|
||||
@ -33,36 +33,35 @@ type ServerContext = {
|
||||
aiWorker: AIWorker;
|
||||
dokkuClient: DokkuClient | null;
|
||||
gitClient: SecureGitClient | null;
|
||||
socket: Socket;
|
||||
};
|
||||
|
||||
export class Sandbox {
|
||||
// Sandbox properties:
|
||||
sandboxId: string;
|
||||
fileManager: FileManager | null;
|
||||
terminalManager: TerminalManager | null;
|
||||
container: E2BSandbox | null;
|
||||
// Server context:
|
||||
dokkuClient: DokkuClient | null;
|
||||
gitClient: SecureGitClient | null;
|
||||
aiWorker: AIWorker;
|
||||
socket: Socket;
|
||||
sandboxId: string;
|
||||
userId: string;
|
||||
isOwner: boolean;
|
||||
|
||||
constructor(sandboxId: string, userId: string, isOwner: boolean, { aiWorker, dokkuClient, gitClient, socket }: ServerContext) {
|
||||
constructor(sandboxId: string, { aiWorker, dokkuClient, gitClient }: ServerContext) {
|
||||
// Sandbox properties:
|
||||
this.sandboxId = sandboxId;
|
||||
this.fileManager = null;
|
||||
this.terminalManager = null;
|
||||
this.container = null;
|
||||
this.sandboxId = sandboxId;
|
||||
this.userId = userId;
|
||||
this.isOwner = isOwner;
|
||||
// Server context:
|
||||
this.aiWorker = aiWorker;
|
||||
this.dokkuClient = dokkuClient;
|
||||
this.gitClient = gitClient;
|
||||
this.socket = socket;
|
||||
}
|
||||
|
||||
// Initializes the container for the sandbox environment
|
||||
async initializeContainer() {
|
||||
async initialize(
|
||||
fileWatchCallback: ((files: (TFolder | TFile)[]) => void) | undefined
|
||||
) {
|
||||
// Acquire a lock to ensure exclusive access to the sandbox environment
|
||||
await lockManager.acquireLock(this.sandboxId, async () => {
|
||||
// Check if a container already exists and is running
|
||||
@ -90,14 +89,10 @@ export class Sandbox {
|
||||
this.fileManager = new FileManager(
|
||||
this.sandboxId,
|
||||
this.container,
|
||||
(files: (TFolder | TFile)[]) => {
|
||||
// Emit an event to the socket when files are loaded
|
||||
this.socket.emit("loaded", files)
|
||||
}
|
||||
fileWatchCallback ?? null
|
||||
)
|
||||
// Initialize the file manager and emit the initial files
|
||||
this.fileManager.initialize()
|
||||
this.socket.emit("loaded", this.fileManager.files)
|
||||
await this.fileManager.initialize()
|
||||
}
|
||||
}
|
||||
|
||||
@ -113,12 +108,12 @@ export class Sandbox {
|
||||
this.fileManager = null;
|
||||
}
|
||||
|
||||
handlers() {
|
||||
handlers(connection: { userId: string, isOwner: boolean, socket: Socket }) {
|
||||
|
||||
// Handle heartbeat from a socket connection
|
||||
const handleHeartbeat: SocketHandler = (_: any) => {
|
||||
// Only keep the sandbox alive if the owner is still connected
|
||||
if (this.isOwner) {
|
||||
if (connection.isOwner) {
|
||||
this.container?.setTimeout(CONTAINER_TIMEOUT)
|
||||
}
|
||||
}
|
||||
@ -135,7 +130,7 @@ export class Sandbox {
|
||||
|
||||
// Handle saving a file
|
||||
const handleSaveFile: SocketHandler = async ({ fileId, body }: any) => {
|
||||
await saveFileRL.consume(this.userId, 1);
|
||||
await saveFileRL.consume(connection.userId, 1);
|
||||
return this.fileManager?.saveFile(fileId, body)
|
||||
}
|
||||
|
||||
@ -160,25 +155,25 @@ export class Sandbox {
|
||||
|
||||
// Handle creating a file
|
||||
const handleCreateFile: SocketHandler = async ({ name }: any) => {
|
||||
await createFileRL.consume(this.userId, 1);
|
||||
await createFileRL.consume(connection.userId, 1);
|
||||
return { "success": await this.fileManager?.createFile(name) }
|
||||
}
|
||||
|
||||
// Handle creating a folder
|
||||
const handleCreateFolder: SocketHandler = async ({ name }: any) => {
|
||||
await createFolderRL.consume(this.userId, 1);
|
||||
await createFolderRL.consume(connection.userId, 1);
|
||||
return { "success": await this.fileManager?.createFolder(name) }
|
||||
}
|
||||
|
||||
// Handle renaming a file
|
||||
const handleRenameFile: SocketHandler = async ({ fileId, newName }: any) => {
|
||||
await renameFileRL.consume(this.userId, 1)
|
||||
await renameFileRL.consume(connection.userId, 1)
|
||||
return this.fileManager?.renameFile(fileId, newName)
|
||||
}
|
||||
|
||||
// Handle deleting a file
|
||||
const handleDeleteFile: SocketHandler = async ({ fileId }: any) => {
|
||||
await deleteFileRL.consume(this.userId, 1)
|
||||
await deleteFileRL.consume(connection.userId, 1)
|
||||
return this.fileManager?.deleteFile(fileId)
|
||||
}
|
||||
|
||||
@ -191,10 +186,10 @@ export class Sandbox {
|
||||
const handleCreateTerminal: SocketHandler = async ({ id }: any) => {
|
||||
await lockManager.acquireLock(this.sandboxId, async () => {
|
||||
await this.terminalManager?.createTerminal(id, (responseString: string) => {
|
||||
this.socket.emit("terminalResponse", { id, data: responseString })
|
||||
connection.socket.emit("terminalResponse", { id, data: responseString })
|
||||
const port = extractPortNumber(responseString)
|
||||
if (port) {
|
||||
this.socket.emit(
|
||||
connection.socket.emit(
|
||||
"previewURL",
|
||||
"https://" + this.container?.getHost(port)
|
||||
)
|
||||
@ -220,7 +215,7 @@ export class Sandbox {
|
||||
|
||||
// Handle generating code
|
||||
const handleGenerateCode: SocketHandler = ({ fileName, code, line, instructions }: any) => {
|
||||
return this.aiWorker.generateCode(this.userId, fileName, code, line, instructions)
|
||||
return this.aiWorker.generateCode(connection.userId, fileName, code, line, instructions)
|
||||
}
|
||||
|
||||
return {
|
||||
|
@ -3,17 +3,18 @@ import dotenv from "dotenv"
|
||||
import express, { Express } from "express"
|
||||
import fs from "fs"
|
||||
import { createServer } from "http"
|
||||
import { Server } from "socket.io"
|
||||
import { Server, Socket } from "socket.io"
|
||||
import { AIWorker } from "./AIWorker"
|
||||
|
||||
import { ConnectionManager } from "./ConnectionManager"
|
||||
import { DokkuClient } from "./DokkuClient"
|
||||
import { OwnerConnectionManager as ConnectionManager } from "./OwnerConnectionManager"
|
||||
import { Sandbox } from "./SandboxManager"
|
||||
import { SecureGitClient } from "./SecureGitClient"
|
||||
import { socketAuth } from "./socketAuth"; // Import the new socketAuth middleware
|
||||
import { TFile, TFolder } from "./types"
|
||||
|
||||
// Log errors and send a notification to the client
|
||||
export const handleErrors = (message: string, error: any, socket: any) => {
|
||||
export const handleErrors = (message: string, error: any, socket: Socket) => {
|
||||
console.error(message, error);
|
||||
socket.emit("error", `${message} ${error.message ?? error}`);
|
||||
};
|
||||
@ -106,22 +107,35 @@ io.on("connection", async (socket) => {
|
||||
return
|
||||
}
|
||||
}
|
||||
connections.addConnectionForSandbox(socket, data.sandboxId)
|
||||
|
||||
try {
|
||||
// Create or retrieve the sandbox manager for the given sandbox ID
|
||||
const sandboxManager = sandboxes[data.sandboxId] ?? new Sandbox(
|
||||
data.sandboxId,
|
||||
data.userId,
|
||||
data.isOwner,
|
||||
{ aiWorker, dokkuClient, gitClient, socket }
|
||||
{
|
||||
aiWorker, dokkuClient, gitClient,
|
||||
}
|
||||
)
|
||||
sandboxes[data.sandboxId] = sandboxManager
|
||||
|
||||
const sendFileNotifications = (files: (TFolder | TFile)[]) => {
|
||||
connections.connectionsForSandbox(data.sandboxId).forEach((socket: Socket) => {
|
||||
socket.emit("loaded", files);
|
||||
});
|
||||
};
|
||||
|
||||
// Initialize the sandbox container
|
||||
// The file manager and terminal managers will be set up if they have been closed
|
||||
sandboxManager.initializeContainer()
|
||||
await sandboxManager.initialize(sendFileNotifications)
|
||||
socket.emit("loaded", sandboxManager.fileManager?.files)
|
||||
|
||||
// Register event handlers for the sandbox
|
||||
Object.entries(sandboxManager.handlers()).forEach(([event, handler]) => {
|
||||
Object.entries(sandboxManager.handlers({
|
||||
userId: data.userId,
|
||||
isOwner: data.isOwner,
|
||||
socket
|
||||
})).forEach(([event, handler]) => {
|
||||
socket.on(event, async (options: any, callback?: (response: any) => void) => {
|
||||
try {
|
||||
const result = await handler(options)
|
||||
@ -135,6 +149,8 @@ io.on("connection", async (socket) => {
|
||||
// Handle disconnection event
|
||||
socket.on("disconnect", async () => {
|
||||
try {
|
||||
connections.removeConnectionForSandbox(socket, data.sandboxId)
|
||||
|
||||
if (data.isOwner) {
|
||||
connections.ownerDisconnected(data.sandboxId)
|
||||
// If the owner has disconnected from all sockets, close open terminals and file watchers.o
|
||||
|
Loading…
x
Reference in New Issue
Block a user