fix: support filesystem change notifications for multiple connections
This commit is contained in:
parent
eb973e0f83
commit
d2ab544582
50
backend/server/src/ConnectionManager.ts
Normal file
50
backend/server/src/ConnectionManager.ts
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
import { Socket } from "socket.io"
|
||||||
|
|
||||||
|
class Counter {
|
||||||
|
private count: number = 0
|
||||||
|
|
||||||
|
increment() {
|
||||||
|
this.count++
|
||||||
|
}
|
||||||
|
|
||||||
|
decrement() {
|
||||||
|
this.count = Math.max(0, this.count - 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
getValue(): number {
|
||||||
|
return this.count
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Owner Connection Management
|
||||||
|
export class ConnectionManager {
|
||||||
|
private ownerConnections: Record<string, Counter> = {}
|
||||||
|
private sockets: Record<string, Set<Socket>> = {}
|
||||||
|
|
||||||
|
ownerConnected(sandboxId: string) {
|
||||||
|
this.ownerConnections[sandboxId] ??= new Counter()
|
||||||
|
this.ownerConnections[sandboxId].increment()
|
||||||
|
}
|
||||||
|
|
||||||
|
ownerDisconnected(sandboxId: string) {
|
||||||
|
this.ownerConnections[sandboxId]?.decrement()
|
||||||
|
}
|
||||||
|
|
||||||
|
ownerIsConnected(sandboxId: string): boolean {
|
||||||
|
return this.ownerConnections[sandboxId]?.getValue() > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
addConnectionForSandbox(socket: Socket, sandboxId: string) {
|
||||||
|
this.sockets[sandboxId] ??= new Set()
|
||||||
|
this.sockets[sandboxId].add(socket)
|
||||||
|
}
|
||||||
|
|
||||||
|
removeConnectionForSandbox(socket: Socket, sandboxId: string) {
|
||||||
|
this.sockets[sandboxId]?.delete(socket)
|
||||||
|
}
|
||||||
|
|
||||||
|
connectionsForSandbox(sandboxId: string): Set<Socket> {
|
||||||
|
return this.sockets[sandboxId] ?? new Set();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -50,13 +50,13 @@ export class FileManager {
|
|||||||
public fileData: TFileData[]
|
public fileData: TFileData[]
|
||||||
private fileWatchers: WatchHandle[] = []
|
private fileWatchers: WatchHandle[] = []
|
||||||
private dirName = "/home/user/project"
|
private dirName = "/home/user/project"
|
||||||
private refreshFileList: (files: (TFolder | TFile)[]) => void
|
private refreshFileList: ((files: (TFolder | TFile)[]) => void) | null
|
||||||
|
|
||||||
// Constructor to initialize the FileManager
|
// Constructor to initialize the FileManager
|
||||||
constructor(
|
constructor(
|
||||||
sandboxId: string,
|
sandboxId: string,
|
||||||
sandbox: Sandbox,
|
sandbox: Sandbox,
|
||||||
refreshFileList: (files: (TFolder | TFile)[]) => void
|
refreshFileList: ((files: (TFolder | TFile)[]) => void) | null
|
||||||
) {
|
) {
|
||||||
this.sandboxId = sandboxId
|
this.sandboxId = sandboxId
|
||||||
this.sandbox = sandbox
|
this.sandbox = sandbox
|
||||||
@ -314,7 +314,9 @@ export class FileManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Tell the client to reload the file list
|
// Tell the client to reload the file list
|
||||||
this.refreshFileList(this.files)
|
if (event.type !== "chmod") {
|
||||||
|
this.refreshFileList?.(this.files)
|
||||||
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error(
|
console.error(
|
||||||
`Error handling ${event.type} event for ${event.name}:`,
|
`Error handling ${event.type} event for ${event.name}:`,
|
||||||
|
@ -1,33 +0,0 @@
|
|||||||
class Counter {
|
|
||||||
private count: number = 0
|
|
||||||
|
|
||||||
increment() {
|
|
||||||
this.count++
|
|
||||||
}
|
|
||||||
|
|
||||||
decrement() {
|
|
||||||
this.count = Math.max(0, this.count - 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
getValue(): number {
|
|
||||||
return this.count
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Owner Connection Management
|
|
||||||
export class OwnerConnectionManager {
|
|
||||||
private connections: Record<string, Counter> = {}
|
|
||||||
|
|
||||||
ownerConnected(sandboxId: string) {
|
|
||||||
this.connections[sandboxId] ??= new Counter()
|
|
||||||
this.connections[sandboxId].increment()
|
|
||||||
}
|
|
||||||
|
|
||||||
ownerDisconnected(sandboxId: string) {
|
|
||||||
this.connections[sandboxId]?.decrement()
|
|
||||||
}
|
|
||||||
|
|
||||||
ownerIsConnected(sandboxId: string): boolean {
|
|
||||||
return this.connections[sandboxId]?.getValue() > 0
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,5 +1,5 @@
|
|||||||
import { Sandbox as E2BSandbox } from "e2b"
|
import { Sandbox as E2BSandbox } from "e2b"
|
||||||
import { Socket } from 'socket.io'
|
import { Socket } from "socket.io"
|
||||||
import { AIWorker } from "./AIWorker"
|
import { AIWorker } from "./AIWorker"
|
||||||
import { CONTAINER_TIMEOUT } from "./constants"
|
import { CONTAINER_TIMEOUT } from "./constants"
|
||||||
import { DokkuClient } from "./DokkuClient"
|
import { DokkuClient } from "./DokkuClient"
|
||||||
@ -33,36 +33,35 @@ type ServerContext = {
|
|||||||
aiWorker: AIWorker;
|
aiWorker: AIWorker;
|
||||||
dokkuClient: DokkuClient | null;
|
dokkuClient: DokkuClient | null;
|
||||||
gitClient: SecureGitClient | null;
|
gitClient: SecureGitClient | null;
|
||||||
socket: Socket;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
export class Sandbox {
|
export class Sandbox {
|
||||||
|
// Sandbox properties:
|
||||||
|
sandboxId: string;
|
||||||
fileManager: FileManager | null;
|
fileManager: FileManager | null;
|
||||||
terminalManager: TerminalManager | null;
|
terminalManager: TerminalManager | null;
|
||||||
container: E2BSandbox | null;
|
container: E2BSandbox | null;
|
||||||
|
// Server context:
|
||||||
dokkuClient: DokkuClient | null;
|
dokkuClient: DokkuClient | null;
|
||||||
gitClient: SecureGitClient | null;
|
gitClient: SecureGitClient | null;
|
||||||
aiWorker: AIWorker;
|
aiWorker: AIWorker;
|
||||||
socket: Socket;
|
|
||||||
sandboxId: string;
|
|
||||||
userId: string;
|
|
||||||
isOwner: boolean;
|
|
||||||
|
|
||||||
constructor(sandboxId: string, userId: string, isOwner: boolean, { aiWorker, dokkuClient, gitClient, socket }: ServerContext) {
|
constructor(sandboxId: string, { aiWorker, dokkuClient, gitClient }: ServerContext) {
|
||||||
|
// Sandbox properties:
|
||||||
|
this.sandboxId = sandboxId;
|
||||||
this.fileManager = null;
|
this.fileManager = null;
|
||||||
this.terminalManager = null;
|
this.terminalManager = null;
|
||||||
this.container = null;
|
this.container = null;
|
||||||
this.sandboxId = sandboxId;
|
// Server context:
|
||||||
this.userId = userId;
|
|
||||||
this.isOwner = isOwner;
|
|
||||||
this.aiWorker = aiWorker;
|
this.aiWorker = aiWorker;
|
||||||
this.dokkuClient = dokkuClient;
|
this.dokkuClient = dokkuClient;
|
||||||
this.gitClient = gitClient;
|
this.gitClient = gitClient;
|
||||||
this.socket = socket;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initializes the container for the sandbox environment
|
// Initializes the container for the sandbox environment
|
||||||
async initializeContainer() {
|
async initialize(
|
||||||
|
fileWatchCallback: ((files: (TFolder | TFile)[]) => void) | undefined
|
||||||
|
) {
|
||||||
// Acquire a lock to ensure exclusive access to the sandbox environment
|
// Acquire a lock to ensure exclusive access to the sandbox environment
|
||||||
await lockManager.acquireLock(this.sandboxId, async () => {
|
await lockManager.acquireLock(this.sandboxId, async () => {
|
||||||
// Check if a container already exists and is running
|
// Check if a container already exists and is running
|
||||||
@ -90,14 +89,10 @@ export class Sandbox {
|
|||||||
this.fileManager = new FileManager(
|
this.fileManager = new FileManager(
|
||||||
this.sandboxId,
|
this.sandboxId,
|
||||||
this.container,
|
this.container,
|
||||||
(files: (TFolder | TFile)[]) => {
|
fileWatchCallback ?? null
|
||||||
// Emit an event to the socket when files are loaded
|
|
||||||
this.socket.emit("loaded", files)
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
// Initialize the file manager and emit the initial files
|
// Initialize the file manager and emit the initial files
|
||||||
this.fileManager.initialize()
|
await this.fileManager.initialize()
|
||||||
this.socket.emit("loaded", this.fileManager.files)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,12 +108,12 @@ export class Sandbox {
|
|||||||
this.fileManager = null;
|
this.fileManager = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
handlers() {
|
handlers(connection: { userId: string, isOwner: boolean, socket: Socket }) {
|
||||||
|
|
||||||
// Handle heartbeat from a socket connection
|
// Handle heartbeat from a socket connection
|
||||||
const handleHeartbeat: SocketHandler = (_: any) => {
|
const handleHeartbeat: SocketHandler = (_: any) => {
|
||||||
// Only keep the sandbox alive if the owner is still connected
|
// Only keep the sandbox alive if the owner is still connected
|
||||||
if (this.isOwner) {
|
if (connection.isOwner) {
|
||||||
this.container?.setTimeout(CONTAINER_TIMEOUT)
|
this.container?.setTimeout(CONTAINER_TIMEOUT)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -135,7 +130,7 @@ export class Sandbox {
|
|||||||
|
|
||||||
// Handle saving a file
|
// Handle saving a file
|
||||||
const handleSaveFile: SocketHandler = async ({ fileId, body }: any) => {
|
const handleSaveFile: SocketHandler = async ({ fileId, body }: any) => {
|
||||||
await saveFileRL.consume(this.userId, 1);
|
await saveFileRL.consume(connection.userId, 1);
|
||||||
return this.fileManager?.saveFile(fileId, body)
|
return this.fileManager?.saveFile(fileId, body)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -160,25 +155,25 @@ export class Sandbox {
|
|||||||
|
|
||||||
// Handle creating a file
|
// Handle creating a file
|
||||||
const handleCreateFile: SocketHandler = async ({ name }: any) => {
|
const handleCreateFile: SocketHandler = async ({ name }: any) => {
|
||||||
await createFileRL.consume(this.userId, 1);
|
await createFileRL.consume(connection.userId, 1);
|
||||||
return { "success": await this.fileManager?.createFile(name) }
|
return { "success": await this.fileManager?.createFile(name) }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle creating a folder
|
// Handle creating a folder
|
||||||
const handleCreateFolder: SocketHandler = async ({ name }: any) => {
|
const handleCreateFolder: SocketHandler = async ({ name }: any) => {
|
||||||
await createFolderRL.consume(this.userId, 1);
|
await createFolderRL.consume(connection.userId, 1);
|
||||||
return { "success": await this.fileManager?.createFolder(name) }
|
return { "success": await this.fileManager?.createFolder(name) }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle renaming a file
|
// Handle renaming a file
|
||||||
const handleRenameFile: SocketHandler = async ({ fileId, newName }: any) => {
|
const handleRenameFile: SocketHandler = async ({ fileId, newName }: any) => {
|
||||||
await renameFileRL.consume(this.userId, 1)
|
await renameFileRL.consume(connection.userId, 1)
|
||||||
return this.fileManager?.renameFile(fileId, newName)
|
return this.fileManager?.renameFile(fileId, newName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle deleting a file
|
// Handle deleting a file
|
||||||
const handleDeleteFile: SocketHandler = async ({ fileId }: any) => {
|
const handleDeleteFile: SocketHandler = async ({ fileId }: any) => {
|
||||||
await deleteFileRL.consume(this.userId, 1)
|
await deleteFileRL.consume(connection.userId, 1)
|
||||||
return this.fileManager?.deleteFile(fileId)
|
return this.fileManager?.deleteFile(fileId)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -191,10 +186,10 @@ export class Sandbox {
|
|||||||
const handleCreateTerminal: SocketHandler = async ({ id }: any) => {
|
const handleCreateTerminal: SocketHandler = async ({ id }: any) => {
|
||||||
await lockManager.acquireLock(this.sandboxId, async () => {
|
await lockManager.acquireLock(this.sandboxId, async () => {
|
||||||
await this.terminalManager?.createTerminal(id, (responseString: string) => {
|
await this.terminalManager?.createTerminal(id, (responseString: string) => {
|
||||||
this.socket.emit("terminalResponse", { id, data: responseString })
|
connection.socket.emit("terminalResponse", { id, data: responseString })
|
||||||
const port = extractPortNumber(responseString)
|
const port = extractPortNumber(responseString)
|
||||||
if (port) {
|
if (port) {
|
||||||
this.socket.emit(
|
connection.socket.emit(
|
||||||
"previewURL",
|
"previewURL",
|
||||||
"https://" + this.container?.getHost(port)
|
"https://" + this.container?.getHost(port)
|
||||||
)
|
)
|
||||||
@ -220,7 +215,7 @@ export class Sandbox {
|
|||||||
|
|
||||||
// Handle generating code
|
// Handle generating code
|
||||||
const handleGenerateCode: SocketHandler = ({ fileName, code, line, instructions }: any) => {
|
const handleGenerateCode: SocketHandler = ({ fileName, code, line, instructions }: any) => {
|
||||||
return this.aiWorker.generateCode(this.userId, fileName, code, line, instructions)
|
return this.aiWorker.generateCode(connection.userId, fileName, code, line, instructions)
|
||||||
}
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
@ -3,17 +3,18 @@ import dotenv from "dotenv"
|
|||||||
import express, { Express } from "express"
|
import express, { Express } from "express"
|
||||||
import fs from "fs"
|
import fs from "fs"
|
||||||
import { createServer } from "http"
|
import { createServer } from "http"
|
||||||
import { Server } from "socket.io"
|
import { Server, Socket } from "socket.io"
|
||||||
import { AIWorker } from "./AIWorker"
|
import { AIWorker } from "./AIWorker"
|
||||||
|
|
||||||
|
import { ConnectionManager } from "./ConnectionManager"
|
||||||
import { DokkuClient } from "./DokkuClient"
|
import { DokkuClient } from "./DokkuClient"
|
||||||
import { OwnerConnectionManager as ConnectionManager } from "./OwnerConnectionManager"
|
|
||||||
import { Sandbox } from "./SandboxManager"
|
import { Sandbox } from "./SandboxManager"
|
||||||
import { SecureGitClient } from "./SecureGitClient"
|
import { SecureGitClient } from "./SecureGitClient"
|
||||||
import { socketAuth } from "./socketAuth"; // Import the new socketAuth middleware
|
import { socketAuth } from "./socketAuth"; // Import the new socketAuth middleware
|
||||||
|
import { TFile, TFolder } from "./types"
|
||||||
|
|
||||||
// Log errors and send a notification to the client
|
// Log errors and send a notification to the client
|
||||||
export const handleErrors = (message: string, error: any, socket: any) => {
|
export const handleErrors = (message: string, error: any, socket: Socket) => {
|
||||||
console.error(message, error);
|
console.error(message, error);
|
||||||
socket.emit("error", `${message} ${error.message ?? error}`);
|
socket.emit("error", `${message} ${error.message ?? error}`);
|
||||||
};
|
};
|
||||||
@ -106,22 +107,36 @@ io.on("connection", async (socket) => {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
connections.addConnectionForSandbox(socket, data.sandboxId)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Create or retrieve the sandbox manager for the given sandbox ID
|
// Create or retrieve the sandbox manager for the given sandbox ID
|
||||||
const sandboxManager = sandboxes[data.sandboxId] ?? new Sandbox(
|
const sandboxManager = sandboxes[data.sandboxId] ?? new Sandbox(
|
||||||
data.sandboxId,
|
data.sandboxId,
|
||||||
data.userId,
|
{
|
||||||
data.isOwner,
|
aiWorker, dokkuClient, gitClient,
|
||||||
{ aiWorker, dokkuClient, gitClient, socket }
|
}
|
||||||
)
|
)
|
||||||
|
sandboxes[data.sandboxId] = sandboxManager
|
||||||
|
|
||||||
|
const sendFileNotifications = (files: (TFolder | TFile)[]) => {
|
||||||
|
console.log("NOTIFICATION")
|
||||||
|
connections.connectionsForSandbox(data.sandboxId).forEach((socket: Socket) => {
|
||||||
|
socket.emit("loaded", files);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
// Initialize the sandbox container
|
// Initialize the sandbox container
|
||||||
// The file manager and terminal managers will be set up if they have been closed
|
// The file manager and terminal managers will be set up if they have been closed
|
||||||
sandboxManager.initializeContainer()
|
await sandboxManager.initialize(sendFileNotifications)
|
||||||
|
socket.emit("loaded", sandboxManager.fileManager?.files)
|
||||||
|
|
||||||
// Register event handlers for the sandbox
|
// Register event handlers for the sandbox
|
||||||
Object.entries(sandboxManager.handlers()).forEach(([event, handler]) => {
|
Object.entries(sandboxManager.handlers({
|
||||||
|
userId: data.userId,
|
||||||
|
isOwner: data.isOwner,
|
||||||
|
socket
|
||||||
|
})).forEach(([event, handler]) => {
|
||||||
socket.on(event, async (options: any, callback?: (response: any) => void) => {
|
socket.on(event, async (options: any, callback?: (response: any) => void) => {
|
||||||
try {
|
try {
|
||||||
const result = await handler(options)
|
const result = await handler(options)
|
||||||
@ -135,6 +150,8 @@ io.on("connection", async (socket) => {
|
|||||||
// Handle disconnection event
|
// Handle disconnection event
|
||||||
socket.on("disconnect", async () => {
|
socket.on("disconnect", async () => {
|
||||||
try {
|
try {
|
||||||
|
connections.removeConnectionForSandbox(socket, data.sandboxId)
|
||||||
|
|
||||||
if (data.isOwner) {
|
if (data.isOwner) {
|
||||||
connections.ownerDisconnected(data.sandboxId)
|
connections.ownerDisconnected(data.sandboxId)
|
||||||
// If the owner has disconnected from all sockets, close open terminals and file watchers.o
|
// If the owner has disconnected from all sockets, close open terminals and file watchers.o
|
||||||
|
Loading…
x
Reference in New Issue
Block a user