From edc367b31a6fdcf2c33eac4877ba2307b7424d3b Mon Sep 17 00:00:00 2001 From: albertfj114 Date: Sun, 29 Mar 2026 22:02:20 -0400 Subject: [PATCH] chore: initial commit --- .gitignore | 9 ++ choir-mixer/Dockerfile | 10 +++ choir-mixer/main.py | 141 ++++++++++++++++++++++++++++++++ choir-mixer/mixer.py | 61 ++++++++++++++ choir-mixer/requirements.txt | 3 + choir-mixer/tests/conftest.py | 31 +++++++ choir-mixer/tests/test_mixer.py | 66 +++++++++++++++ docker-compose.yml | 24 ++++++ livekit.yaml | 15 ++++ 9 files changed, 360 insertions(+) create mode 100644 .gitignore create mode 100644 choir-mixer/Dockerfile create mode 100644 choir-mixer/main.py create mode 100644 choir-mixer/mixer.py create mode 100644 choir-mixer/requirements.txt create mode 100644 choir-mixer/tests/conftest.py create mode 100644 choir-mixer/tests/test_mixer.py create mode 100644 docker-compose.yml create mode 100644 livekit.yaml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..803f186 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +node_modules/ +__pycache__/ +*.pyc +*.pyo +.env +*.env +.venv/ +venv/ +*.log diff --git a/choir-mixer/Dockerfile b/choir-mixer/Dockerfile new file mode 100644 index 0000000..ee296fa --- /dev/null +++ b/choir-mixer/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +CMD ["python", "main.py", "start"] diff --git a/choir-mixer/main.py b/choir-mixer/main.py new file mode 100644 index 0000000..2d747c7 --- /dev/null +++ b/choir-mixer/main.py @@ -0,0 +1,141 @@ +import asyncio +import logging +import time +import numpy as np +from livekit import agents, rtc +from mixer import normalize_rms, mix_streams, soft_limit + +logger = logging.getLogger("choir-mixer") +logging.basicConfig(level=logging.INFO) + +SAMPLE_RATE = 48000 +NUM_CHANNELS = 1 +FRAME_DURATION_MS = 20 +SAMPLES_PER_FRAME = SAMPLE_RATE * FRAME_DURATION_MS // 1000 # 960 +MAX_STREAMS = 6 +TARGET_DBFS = -20.0 +FRAME_MAX_AGE_S = 0.06 # Only use frames received in the last 60ms + +server = agents.AgentServer() + + +@server.rtc_session(agent_name="choir-mixer") +async def choir_mixer_agent(ctx: agents.JobContext): + """Choir mixer agent: subscribes to participant audio, mixes, and publishes.""" + room = ctx.room + + # Track active audio streams: {track_sid: AudioStream} + audio_streams: dict[str, rtc.AudioStream] = {} + # Latest frame per track: {track_sid: (timestamp, np.ndarray)} + latest_frames: dict[str, tuple[float, np.ndarray]] = {} + lock = asyncio.Lock() + + # Set up audio output + source = rtc.AudioSource(SAMPLE_RATE, NUM_CHANNELS) + track = rtc.LocalAudioTrack.create_audio_track("choir_mix", source) + options = rtc.TrackPublishOptions() + options.source = rtc.TrackSource.SOURCE_MICROPHONE + await ctx.connect(auto_subscribe=agents.AutoSubscribe.AUDIO_ONLY) + await room.local_participant.publish_track(track, options) + logger.info("Choir mixer joined room: %s", room.name) + + async def read_track(track_sid: str, stream: rtc.AudioStream): + """Continuously read frames from one participant's audio stream.""" + try: + async for event in stream: + frame = event.frame + # Convert int16 PCM to float32 [-1.0, 1.0] + pcm = np.frombuffer(frame.data, dtype=np.int16).astype(np.float32) / 32768.0 + async with lock: + latest_frames[track_sid] = (time.monotonic(), pcm) + except Exception as e: + logger.warning("Stream read error for %s: %s", track_sid, e) + finally: + async with lock: + latest_frames.pop(track_sid, None) + audio_streams.pop(track_sid, None) + logger.info("Stream ended: %s (active: %d)", track_sid, len(audio_streams)) + + @room.on("track_subscribed") + def on_track_subscribed( + subscribed_track: rtc.Track, + publication: rtc.RemoteTrackPublication, + participant: rtc.RemoteParticipant, + ): + if subscribed_track.kind != rtc.TrackKind.KIND_AUDIO: + return + if len(audio_streams) >= MAX_STREAMS: + logger.info("At max streams (%d), ignoring track from %s", + MAX_STREAMS, participant.identity) + return + + sid = subscribed_track.sid + stream = rtc.AudioStream( + subscribed_track, + sample_rate=SAMPLE_RATE, + num_channels=NUM_CHANNELS, + ) + audio_streams[sid] = stream + asyncio.create_task(read_track(sid, stream)) + logger.info("Subscribed to %s from %s (active: %d)", + sid, participant.identity, len(audio_streams)) + + @room.on("track_unsubscribed") + def on_track_unsubscribed( + unsubscribed_track: rtc.Track, + publication: rtc.RemoteTrackPublication, + participant: rtc.RemoteParticipant, + ): + sid = unsubscribed_track.sid + stream = audio_streams.get(sid) + if stream: + asyncio.create_task(stream.aclose()) + logger.info("Unsubscribed from %s (%s)", sid, participant.identity) + + # Mixing loop: runs at frame rate (~20ms intervals) + async def mixing_loop(): + while True: + now = time.monotonic() + async with lock: + # Only use frames that arrived recently (discard stale ones) + frames = [ + pcm for ts, pcm in latest_frames.values() + if now - ts < FRAME_MAX_AGE_S + ] + + if frames: + # Normalize each stream, mix, and limit + normalized = [normalize_rms(f, TARGET_DBFS) for f in frames] + mixed = mix_streams(normalized) + limited = soft_limit(mixed) + else: + limited = np.zeros(SAMPLES_PER_FRAME, dtype=np.float32) + + # Convert float32 back to int16 PCM + pcm_int16 = (limited * 32767).astype(np.int16) + audio_frame = rtc.AudioFrame( + data=pcm_int16.tobytes(), + sample_rate=SAMPLE_RATE, + num_channels=NUM_CHANNELS, + samples_per_channel=len(pcm_int16), + ) + await source.capture_frame(audio_frame) + + await asyncio.sleep(FRAME_DURATION_MS / 1000) + + # Start mixing loop + mix_task = asyncio.create_task(mixing_loop()) + + # Keep agent alive until mixing_loop exits (room disconnect cancels it) + try: + await mix_task + except asyncio.CancelledError: + pass + finally: + for stream in list(audio_streams.values()): + await stream.aclose() + logger.info("Choir mixer exiting room: %s", room.name) + + +if __name__ == "__main__": + agents.cli.run_app(server) diff --git a/choir-mixer/mixer.py b/choir-mixer/mixer.py new file mode 100644 index 0000000..d8c31a9 --- /dev/null +++ b/choir-mixer/mixer.py @@ -0,0 +1,61 @@ +import numpy as np + +SAMPLES_PER_FRAME = 960 # 48kHz * 20ms + + +def normalize_rms(samples: np.ndarray, target_dbfs: float = -20.0) -> np.ndarray: + """Scale audio samples so their RMS matches target_dbfs. + + Args: + samples: float32 PCM audio samples + target_dbfs: target loudness in dBFS (e.g. -20.0) + + Returns: + Scaled float32 samples at target loudness. Silence is returned as-is. + """ + rms = np.sqrt(np.mean(samples ** 2)) + if rms < 1e-10: + return samples + + # Noise gate: don't amplify signals below -40 dBFS (background noise, keyboard clicks) + rms_dbfs = 20 * np.log10(rms + 1e-10) + if rms_dbfs < -40.0: + return np.zeros_like(samples) + + target_rms = 10 ** (target_dbfs / 20) + gain = target_rms / rms + return (samples * gain).astype(np.float32) + + +def mix_streams(streams: list[np.ndarray]) -> np.ndarray: + """Sum multiple audio streams into one. + + Args: + streams: list of float32 PCM arrays (same sample rate, possibly different lengths) + + Returns: + Summed float32 array. Length matches the shortest stream. + Empty list returns silence of SAMPLES_PER_FRAME length. + """ + if not streams: + return np.zeros(SAMPLES_PER_FRAME, dtype=np.float32) + + min_len = min(s.shape[0] for s in streams) + trimmed = [s[:min_len] for s in streams] + return np.sum(trimmed, axis=0).astype(np.float32) + + +def soft_limit(samples: np.ndarray) -> np.ndarray: + """Apply tanh-based soft limiter to prevent clipping. + + Gentle saturation: signals below ~0.5 pass nearly unchanged, + signals approaching 1.0 are compressed, signals above 1.0 are + smoothly clamped. + + Args: + samples: float32 PCM audio samples + + Returns: + Limited float32 samples guaranteed within [-1.0, 1.0]. + """ + return np.tanh(samples).astype(np.float32) diff --git a/choir-mixer/requirements.txt b/choir-mixer/requirements.txt new file mode 100644 index 0000000..18bda5c --- /dev/null +++ b/choir-mixer/requirements.txt @@ -0,0 +1,3 @@ +livekit-agents>=1.0.0 +livekit>=1.0.0 +numpy>=1.26.0 diff --git a/choir-mixer/tests/conftest.py b/choir-mixer/tests/conftest.py new file mode 100644 index 0000000..b8392f8 --- /dev/null +++ b/choir-mixer/tests/conftest.py @@ -0,0 +1,31 @@ +import numpy as np +import pytest + +SAMPLE_RATE = 48000 +FRAME_DURATION_MS = 20 +SAMPLES_PER_FRAME = SAMPLE_RATE * FRAME_DURATION_MS // 1000 # 960 + +@pytest.fixture +def silence(): + """960 samples of silence.""" + return np.zeros(SAMPLES_PER_FRAME, dtype=np.float32) + +@pytest.fixture +def quiet_tone(): + """960 samples of a quiet 440Hz sine wave at -40 dBFS.""" + t = np.arange(SAMPLES_PER_FRAME, dtype=np.float32) / SAMPLE_RATE + amplitude = 10 ** (-40 / 20) # ~0.01 + return (amplitude * np.sin(2 * np.pi * 440 * t)).astype(np.float32) + +@pytest.fixture +def loud_tone(): + """960 samples of a loud 440Hz sine wave at -6 dBFS.""" + t = np.arange(SAMPLES_PER_FRAME, dtype=np.float32) / SAMPLE_RATE + amplitude = 10 ** (-6 / 20) # ~0.5 + return (amplitude * np.sin(2 * np.pi * 440 * t)).astype(np.float32) + +@pytest.fixture +def clipping_tone(): + """960 samples of a sine wave that exceeds [-1, 1] range.""" + t = np.arange(SAMPLES_PER_FRAME, dtype=np.float32) / SAMPLE_RATE + return (1.5 * np.sin(2 * np.pi * 440 * t)).astype(np.float32) diff --git a/choir-mixer/tests/test_mixer.py b/choir-mixer/tests/test_mixer.py new file mode 100644 index 0000000..ae7e7fa --- /dev/null +++ b/choir-mixer/tests/test_mixer.py @@ -0,0 +1,66 @@ +import numpy as np +from mixer import normalize_rms, mix_streams, soft_limit + +TARGET_DBFS = -20 + + +class TestNormalizeRms: + def test_silence_stays_silent(self, silence): + result = normalize_rms(silence, TARGET_DBFS) + assert np.allclose(result, 0.0) + + def test_quiet_tone_gets_louder(self, quiet_tone): + original_rms = np.sqrt(np.mean(quiet_tone ** 2)) + result = normalize_rms(quiet_tone, TARGET_DBFS) + result_rms = np.sqrt(np.mean(result ** 2)) + assert result_rms > original_rms + + def test_loud_tone_gets_quieter(self, loud_tone): + original_rms = np.sqrt(np.mean(loud_tone ** 2)) + result = normalize_rms(loud_tone, TARGET_DBFS) + result_rms = np.sqrt(np.mean(result ** 2)) + assert result_rms < original_rms + + def test_normalized_to_target(self, loud_tone): + result = normalize_rms(loud_tone, TARGET_DBFS) + result_rms = np.sqrt(np.mean(result ** 2)) + result_dbfs = 20 * np.log10(result_rms + 1e-10) + assert abs(result_dbfs - TARGET_DBFS) < 1.0 # within 1 dB + + +class TestMixStreams: + def test_single_stream_unchanged(self, loud_tone): + result = mix_streams([loud_tone]) + assert np.allclose(result, loud_tone) + + def test_two_streams_summed(self, loud_tone): + result = mix_streams([loud_tone, loud_tone]) + # Two identical streams summed should be louder + assert np.max(np.abs(result)) > np.max(np.abs(loud_tone)) + + def test_empty_list_returns_silence(self, silence): + result = mix_streams([]) + assert result.shape[0] == 960 + assert np.allclose(result, 0.0) + + def test_different_lengths_uses_shortest(self): + short = np.ones(480, dtype=np.float32) * 0.5 + long = np.ones(960, dtype=np.float32) * 0.5 + result = mix_streams([short, long]) + assert result.shape[0] == 480 + + +class TestSoftLimit: + def test_quiet_signal_unchanged(self, quiet_tone): + result = soft_limit(quiet_tone) + assert np.allclose(result, quiet_tone, atol=0.001) + + def test_clipping_signal_contained(self, clipping_tone): + result = soft_limit(clipping_tone) + assert np.max(np.abs(result)) <= 1.0 + + def test_preserves_sign(self, clipping_tone): + result = soft_limit(clipping_tone) + # Signs should match where input is non-zero + nonzero = np.abs(clipping_tone) > 0.01 + assert np.all(np.sign(result[nonzero]) == np.sign(clipping_tone[nonzero])) diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..95b0469 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,24 @@ +version: "3.9" +services: + livekit: + image: livekit/livekit-server:latest + restart: unless-stopped + ports: + - "7880:7880" + - "7881:7881" + - "50000-50100:50000-50100/udp" + volumes: + - ./livekit.yaml:/etc/livekit.yaml + command: --config /etc/livekit.yaml + + choir-mixer: + build: + context: ./choir-mixer + dockerfile: Dockerfile + environment: + - LIVEKIT_URL=ws://livekit:7880 + - LIVEKIT_API_KEY=${LIVEKIT_API_KEY} + - LIVEKIT_API_SECRET=${LIVEKIT_API_SECRET} + depends_on: + - livekit + restart: unless-stopped diff --git a/livekit.yaml b/livekit.yaml new file mode 100644 index 0000000..36cf386 --- /dev/null +++ b/livekit.yaml @@ -0,0 +1,15 @@ +port: 7880 +rtc: + tcp_port: 7881 + port_range_start: 50000 + port_range_end: 50100 + use_external_ip: false + node_ip: 192.168.0.241 + stun_servers: [] +keys: + 6c0e48f597799858028a4d7c88bdf830: cb43b9cc77ea24378af12cf3c4a0e52e7de267ca0d9d33ac7919354685e56e8d +logging: + level: info +room: + empty_timeout: 300 + max_participants: 50