feat: Implement broadcaster-hosted TURN functionality over Hyperswarm. Remove the need for Google TURN/STUN servers.

- Modified code to enable the broadcaster to act as a TURN server using Hyperswarm connections.
- Replaced external STUN/TURN servers with custom signaling over Hyperswarm.
- Configured RTCPeerConnection with an empty ICE servers array to prevent reliance on external servers.
- Implemented custom signaling protocol for exchanging offers, answers, and ICE candidates over Hyperswarm connections.
- Updated broadcaster to relay media streams, effectively mimicking TURN server behavior within the Hyperswarm network.
- Adjusted peer connection setup for both broadcaster and listener to use Hyperswarm for signaling and data transport.
- Removed dependency on third-party servers; all communication now occurs within the peer-to-peer network.
- Updated UI elements and controls to reflect the changes.
- Ensured that peer count updates correctly with new connection handling.
This commit is contained in:
Raven Scott 2024-11-24 06:10:31 -05:00
parent e6891e81c3
commit b10b4f390d
3 changed files with 189 additions and 258 deletions

433
app.js
View File

@ -9,7 +9,7 @@ let currentDeviceId = null; // To store the selected audio input device ID
let isBroadcasting = false;
let localStream; // For broadcaster's audio stream
let peerConnections = {}; // Store WebRTC peer connections
let iceCandidateQueues = {}; // Store ICE candidate queues
let dataChannels = {}; // Store data channels for signaling
let conns = []; // Store Hyperswarm connections
document.addEventListener("DOMContentLoaded", () => {
@ -74,7 +74,6 @@ async function populateAudioInputSources() {
}
}
// Updated applyAudioSource function
async function applyAudioSource() {
const selectedDeviceId = document.getElementById('audio-input-select').value;
if (selectedDeviceId !== currentDeviceId) {
@ -82,7 +81,6 @@ async function applyAudioSource() {
if (isBroadcasting) {
console.log("Applying new audio source:", selectedDeviceId);
try {
// Get the new audio stream
const newStream = await navigator.mediaDevices.getUserMedia({
audio: { deviceId: currentDeviceId ? { exact: currentDeviceId } : undefined },
});
@ -118,7 +116,6 @@ async function applyAudioSource() {
}
}
// Update peer count using conns.length
function updatePeerCount() {
const peerCount = conns.length;
const stationInfoElement = document.getElementById('station-info');
@ -149,38 +146,33 @@ async function setupStation(key) {
console.log("New connection established");
conns.push(conn);
updatePeerCount(); // Update peer count when a new connection is established
const remoteKey = conn.remotePublicKey.toString('hex');
console.log("Remote key:", remoteKey);
// Initialize ICE candidate queue
iceCandidateQueues[remoteKey] = [];
// Use the Hyperswarm connection as a data channel for signaling
dataChannels[remoteKey] = conn;
// Handle incoming signaling data
conn.on('data', async (data) => {
console.log("Received data from peer:", data.toString());
const message = JSON.parse(data.toString());
await handleSignalingData(conn, message);
});
// Set up WebRTC peer connection
setupBroadcasterPeerConnection(conn, remoteKey);
conn.on('close', () => {
console.log("Connection closed with peer:", remoteKey);
// Clean up peer connection when connection closes
console.log("Connection closed with peer");
if (peerConnections[remoteKey]) {
peerConnections[remoteKey].close();
delete peerConnections[remoteKey];
}
delete iceCandidateQueues[remoteKey];
delete dataChannels[remoteKey];
conns.splice(conns.indexOf(conn), 1);
updatePeerCount(); // Update peer count when a connection is closed
});
conn.on('error', (err) => {
console.error("Connection error with peer:", remoteKey, err);
console.error("Connection error with peer:", err);
if (peerConnections[remoteKey]) {
peerConnections[remoteKey].close();
delete peerConnections[remoteKey];
}
delete iceCandidateQueues[remoteKey];
delete dataChannels[remoteKey];
conns.splice(conns.indexOf(conn), 1);
updatePeerCount(); // Update peer count on error
});
@ -198,116 +190,189 @@ async function setupStation(key) {
}
}
function stopBroadcast() {
console.log("Broadcast stopped");
// Close all peer connections
Object.values(peerConnections).forEach((pc) => pc.close());
peerConnections = {};
function setupBroadcasterPeerConnection(conn, remoteKey) {
const configuration = {
iceServers: [], // Empty array since we are not using external STUN/TURN servers
};
const peerConnection = new RTCPeerConnection(configuration);
peerConnections[remoteKey] = peerConnection;
// Stop local media tracks
if (localStream) {
localStream.getTracks().forEach((track) => track.stop());
localStream = null;
console.log("Local media tracks stopped");
}
// Add local stream tracks to peer connection
localStream.getTracks().forEach((track) => {
peerConnection.addTrack(track, localStream);
console.log("Added track to peer connection:", track);
});
isBroadcasting = false;
// Handle ICE candidates
peerConnection.onicecandidate = ({ candidate }) => {
if (candidate) {
console.log("Sending ICE candidate to peer");
conn.write(JSON.stringify({ type: 'candidate', candidate }));
}
};
peerConnection.oniceconnectionstatechange = () => {
console.log("Broadcaster ICE connection state changed to:", peerConnection.iceConnectionState);
};
// Handle incoming signaling data
conn.on('data', async (data) => {
const message = JSON.parse(data.toString());
await handleBroadcasterSignalingData(conn, message, remoteKey);
});
}
async function handleSignalingData(conn, message, peerConnection = null) {
try {
const remoteKey = conn.remotePublicKey.toString('hex');
console.log("Handling signaling data:", message.type, "from", remoteKey);
async function handleBroadcasterSignalingData(conn, message, remoteKey) {
const peerConnection = peerConnections[remoteKey];
if (message.type === 'offer') {
console.log("Received offer from peer");
await peerConnection.setRemoteDescription(new RTCSessionDescription(message.offer));
console.log("Set remote description with offer from peer");
if (!peerConnection) {
peerConnection = peerConnections[remoteKey];
const answer = await peerConnection.createAnswer();
await peerConnection.setLocalDescription(answer);
console.log("Created and set local description with answer");
// Send the answer back to the listener
conn.write(JSON.stringify({ type: 'answer', answer }));
console.log("Sent answer to peer");
} else if (message.type === 'candidate') {
console.log("Received ICE candidate from peer");
await peerConnection.addIceCandidate(new RTCIceCandidate(message.candidate));
}
}
async function joinStation() {
try {
const stationId = document.getElementById('station-id').value;
if (!stationId) {
alert("Please enter a station ID.");
return;
}
if (message.type === 'offer') {
// Received an offer from a listener (only for broadcaster)
console.log("Creating new RTCPeerConnection for remote key:", remoteKey);
const configuration = {
iceServers: [{ urls: 'stun:stun.l.google.com:19302' }],
};
peerConnection = new RTCPeerConnection(configuration);
peerConnections[remoteKey] = peerConnection;
console.log("Joining station with ID:", stationId);
const topicBuffer = b4a.from(stationId, 'hex');
swarm = new Hyperswarm();
swarm.join(topicBuffer, { client: true, server: false });
// Add local stream tracks to peer connection
localStream.getTracks().forEach((track) => {
peerConnection.addTrack(track, localStream);
console.log("Added track to peer connection:", track);
swarm.on('connection', (conn) => {
console.log("Connected to broadcaster");
conns.push(conn);
updatePeerCount(); // Update peer count when a new connection is established
const remoteKey = conn.remotePublicKey.toString('hex');
// Use the Hyperswarm connection as a data channel for signaling
dataChannels[remoteKey] = conn;
// Set up WebRTC peer connection
setupListenerPeerConnection(conn, remoteKey);
conn.on('close', () => {
console.log("Connection closed with broadcaster");
if (peerConnections[remoteKey]) {
peerConnections[remoteKey].close();
delete peerConnections[remoteKey];
}
delete dataChannels[remoteKey];
conns.splice(conns.indexOf(conn), 1);
updatePeerCount(); // Update peer count when a connection is closed
});
// Handle ICE candidates
peerConnection.onicecandidate = ({ candidate }) => {
if (candidate) {
console.log("Sending ICE candidate to peer:", candidate);
conn.write(JSON.stringify({ type: 'candidate', candidate }));
} else {
console.log("All ICE candidates have been sent to peer");
conn.on('error', (err) => {
console.error("Connection error with broadcaster:", err);
if (peerConnections[remoteKey]) {
peerConnections[remoteKey].close();
delete peerConnections[remoteKey];
}
};
delete dataChannels[remoteKey];
conns.splice(conns.indexOf(conn), 1);
updatePeerCount(); // Update peer count on error
});
peerConnection.oniceconnectionstatechange = () => {
console.log("Broadcaster ICE connection state changed to:", peerConnection.iceConnectionState);
};
updatePeerCount();
});
// Set remote description
await peerConnection.setRemoteDescription(new RTCSessionDescription(message.offer));
console.log("Set remote description with offer from peer");
document.getElementById('station-info').textContent = `Connected to Station: ${stationId}`;
document.getElementById('setup').classList.add('d-none');
document.getElementById('controls').classList.remove('d-none');
document.getElementById('listener-controls').classList.remove('d-none');
// Process any queued ICE candidates
if (iceCandidateQueues[remoteKey]) {
console.log("Processing queued ICE candidates");
for (const candidate of iceCandidateQueues[remoteKey]) {
await peerConnection.addIceCandidate(candidate);
console.log("Added queued ICE candidate");
}
iceCandidateQueues[remoteKey] = [];
}
const answer = await peerConnection.createAnswer();
await peerConnection.setLocalDescription(answer);
console.log("Created and set local description with answer");
// Send the answer back to the listener
conn.write(JSON.stringify({ type: 'answer', answer }));
console.log("Sent answer to peer");
} else if (message.type === 'candidate') {
// Received an ICE candidate
const candidate = new RTCIceCandidate(message.candidate);
console.log("Received ICE candidate from peer:", candidate);
if (peerConnection && peerConnection.remoteDescription && peerConnection.remoteDescription.type) {
await peerConnection.addIceCandidate(candidate);
console.log("Added ICE candidate to peer connection");
} else {
console.log("Remote description not set yet, queuing ICE candidate");
if (!iceCandidateQueues[remoteKey]) {
iceCandidateQueues[remoteKey] = [];
}
iceCandidateQueues[remoteKey].push(candidate);
}
} else if (message.type === 'answer') {
// Received an answer from the broadcaster (only for listener)
console.log("Received answer from broadcaster");
await peerConnection.setRemoteDescription(new RTCSessionDescription(message.answer));
console.log("Set remote description with answer from broadcaster");
// Process any queued ICE candidates
if (iceCandidateQueues[remoteKey]) {
console.log("Processing queued ICE candidates");
for (const candidate of iceCandidateQueues[remoteKey]) {
await peerConnection.addIceCandidate(candidate);
console.log("Added queued ICE candidate");
}
iceCandidateQueues[remoteKey] = [];
}
}
console.log("Joined station successfully");
} catch (err) {
console.error("Error handling signaling data:", err);
console.error("Error joining station:", err);
alert("Failed to join station. Please try again.");
}
}
function setupListenerPeerConnection(conn, remoteKey) {
const configuration = {
iceServers: [], // Empty array since we are not using external STUN/TURN servers
};
const peerConnection = new RTCPeerConnection(configuration);
peerConnections[remoteKey] = peerConnection;
// Handle incoming tracks (audio streams)
peerConnection.ontrack = (event) => {
console.log("Received remote track");
const [remoteStream] = event.streams;
// Play the remote audio stream
const audioElement = document.createElement('audio');
audioElement.srcObject = remoteStream;
audioElement.autoplay = true;
document.body.appendChild(audioElement);
console.log("Audio element created and playback started");
};
// Handle ICE candidates
peerConnection.onicecandidate = ({ candidate }) => {
if (candidate) {
console.log("Sending ICE candidate to broadcaster");
conn.write(JSON.stringify({ type: 'candidate', candidate }));
}
};
peerConnection.oniceconnectionstatechange = () => {
console.log("Listener ICE connection state changed to:", peerConnection.iceConnectionState);
};
// Handle signaling data from broadcaster
conn.on('data', async (data) => {
const message = JSON.parse(data.toString());
await handleListenerSignalingData(conn, message, remoteKey);
});
initiateOffer(conn, peerConnection);
}
async function handleListenerSignalingData(conn, message, remoteKey) {
const peerConnection = peerConnections[remoteKey];
if (message.type === 'answer') {
console.log("Received answer from broadcaster");
await peerConnection.setRemoteDescription(new RTCSessionDescription(message.answer));
console.log("Set remote description with answer from broadcaster");
} else if (message.type === 'candidate') {
console.log("Received ICE candidate from broadcaster");
await peerConnection.addIceCandidate(new RTCIceCandidate(message.candidate));
}
}
async function initiateOffer(conn, peerConnection) {
try {
console.log("Initiating offer to broadcaster");
// Add transceiver to receive audio
peerConnection.addTransceiver('audio', { direction: 'recvonly' });
const offer = await peerConnection.createOffer();
await peerConnection.setLocalDescription(offer);
console.log("Created and set local description with offer");
// Send offer to broadcaster
conn.write(JSON.stringify({ type: 'offer', offer }));
console.log("Sent offer to broadcaster");
} catch (err) {
console.error("Error initiating offer:", err);
}
}
@ -343,145 +408,3 @@ function leaveStation() {
console.log("Left the station.");
}
async function joinStation() {
try {
const stationId = document.getElementById('station-id').value;
if (!stationId) {
alert("Please enter a station ID.");
return;
}
console.log("Joining station with ID:", stationId);
const topicBuffer = b4a.from(stationId, 'hex');
swarm = new Hyperswarm();
swarm.join(topicBuffer, { client: true, server: false });
swarm.on('connection', (conn) => {
console.log("Connected to broadcaster");
conns.push(conn);
updatePeerCount(); // Update peer count when a new connection is established
const remoteKey = conn.remotePublicKey.toString('hex');
console.log("Remote key:", remoteKey);
// Initialize ICE candidate queue
iceCandidateQueues[remoteKey] = [];
const configuration = {
iceServers: [{ urls: 'stun:stun.l.google.com:19302' }],
};
const peerConnection = new RTCPeerConnection(configuration);
peerConnections[remoteKey] = peerConnection;
// Add transceiver to receive audio
peerConnection.addTransceiver('audio', { direction: 'recvonly' });
// Handle incoming tracks (audio streams)
peerConnection.ontrack = (event) => {
console.log("Received remote track");
const [remoteStream] = event.streams;
// Play the remote audio stream
const audioElement = document.createElement('audio');
audioElement.srcObject = remoteStream;
audioElement.autoplay = true;
document.body.appendChild(audioElement);
console.log("Audio element created and playback started");
};
// Handle ICE candidates
peerConnection.onicecandidate = ({ candidate }) => {
if (candidate) {
console.log("Sending ICE candidate to broadcaster:", candidate);
conn.write(JSON.stringify({ type: 'candidate', candidate }));
} else {
console.log("All ICE candidates have been sent to broadcaster");
}
};
peerConnection.oniceconnectionstatechange = () => {
console.log("Listener ICE connection state changed to:", peerConnection.iceConnectionState);
};
// Handle signaling data from broadcaster
conn.on('data', async (data) => {
console.log("Received data from broadcaster:", data.toString());
const message = JSON.parse(data.toString());
await handleSignalingData(conn, message, peerConnection);
});
conn.on('close', () => {
console.log("Connection closed with broadcaster");
peerConnection.close();
delete peerConnections[remoteKey];
delete iceCandidateQueues[remoteKey];
conns.splice(conns.indexOf(conn), 1);
updatePeerCount(); // Update peer count when a connection is closed
});
conn.on('error', (err) => {
console.error("Connection error with broadcaster:", err);
peerConnection.close();
delete peerConnections[remoteKey];
delete iceCandidateQueues[remoteKey];
conns.splice(conns.indexOf(conn), 1);
updatePeerCount(); // Update peer count on error
});
// Start signaling process
initiateOffer(conn, peerConnection, remoteKey);
updatePeerCount();
});
document.getElementById('station-info').textContent = `Connected to Station: ${stationId}`;
document.getElementById('setup').classList.add('d-none');
document.getElementById('controls').classList.remove('d-none');
document.getElementById('listener-controls').classList.remove('d-none');
console.log("Joined station successfully");
} catch (err) {
console.error("Error joining station:", err);
alert("Failed to join station. Please try again.");
}
}
async function initiateOffer(conn, peerConnection, remoteKey) {
try {
console.log("Initiating offer to broadcaster");
// Handle ICE candidates
peerConnection.onicecandidate = ({ candidate }) => {
if (candidate) {
console.log("Sending ICE candidate to broadcaster:", candidate);
conn.write(JSON.stringify({ type: 'candidate', candidate }));
} else {
console.log("All ICE candidates have been sent to broadcaster");
}
};
peerConnection.oniceconnectionstatechange = () => {
console.log("Listener ICE connection state changed to:", peerConnection.iceConnectionState);
};
// Create offer
const offer = await peerConnection.createOffer();
await peerConnection.setLocalDescription(offer);
console.log("Created and set local description with offer");
// Send offer to broadcaster
conn.write(JSON.stringify({ type: 'offer', offer }));
console.log("Sent offer to broadcaster");
// Process any queued ICE candidates
if (iceCandidateQueues[remoteKey]) {
console.log("Processing queued ICE candidates");
for (const candidate of iceCandidateQueues[remoteKey]) {
await peerConnection.addIceCandidate(candidate);
console.log("Added queued ICE candidate");
}
iceCandidateQueues[remoteKey] = [];
}
} catch (err) {
console.error("Error initiating offer:", err);
}
}

View File

@ -34,7 +34,7 @@
<button id="open-join-modal" class="btn btn-secondary" data-bs-toggle="modal" data-bs-target="#joinModal">Join Station</button>
</div>
<div id="controls" class="d-none mt-4">
<p id="station-info"></p>
<p id="station-info"></p>
<button id="leave-stream" class="btn btn-danger mt-2">Leave Broadcast</button>
<!-- Broadcaster-only Controls -->

View File

@ -8,8 +8,15 @@
"backgroundColor": "#1F2430",
"height": "540",
"width": "720",
"links": ["http://127.0.0.1", "http://localhost", "https://ka-f.fontawesome.com", "https://cdn.jsdelivr.net", "https://cdnjs.cloudflare.com", "https://stackpath.bootstrapcdn.com", "ttps://code.jquery.com"]
"links": [
"http://127.0.0.1",
"http://localhost",
"https://ka-f.fontawesome.com",
"https://cdn.jsdelivr.net",
"https://cdnjs.cloudflare.com",
"https://stackpath.bootstrapcdn.com",
"ttps://code.jquery.com"
]
}
},
"type": "module",
@ -27,6 +34,7 @@
"bootstrap": "^5.3.3",
"hypercore-crypto": "^3.4.2",
"hyperswarm": "^4.8.4",
"node-turn": "^0.0.6",
"pear-stdio": "^1.0.1",
"stream": "^0.0.3"
}