commit f073ebe106596460fb49346802a162c1fa72f99e Author: Gabby Morgan Date: Wed May 27 00:21:58 2026 -0500 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ed8ebf5 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..7ba5f98 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# lxst-relay diff --git a/lxst-relay b/lxst-relay new file mode 100755 index 0000000..3591a65 --- /dev/null +++ b/lxst-relay @@ -0,0 +1,2 @@ +#!/bin/sh +exec python3 "$(dirname "$0")/lxst_relay.py" "$@" diff --git a/lxst_relay.py b/lxst_relay.py new file mode 100644 index 0000000..c209147 --- /dev/null +++ b/lxst_relay.py @@ -0,0 +1,461 @@ +#!/usr/bin/env python3 +""" +lxst-relay - Group voice channel bridge for Reticulum Telephony. + +Allows multiple lxst users to call into a shared voice channel +and talk together in a conference bridge style. Audio from each +participant is mixed and redistributed to all others. +""" + +import RNS +import os +import time +import signal as signal_module +import threading +import numpy as np +import argparse + +from LXST import Pipeline +from LXST.Network import SignallingReceiver, Packetizer, LinkSource +from LXST.Sinks import LocalSink +from LXST.Sources import LocalSource +from LXST.Codecs import Opus +from LXST.Codecs.Codec import Codec, CodecError +from LXST.Primitives.Telephony import Signalling + +APP_NAME = "lxst" +PRIMITIVE_NAME = "telephony" + +FIELD_SIGNALLING = 0x00 +FIELD_FRAMES = 0x01 + + +class FrameCaptureSink(LocalSink): + """Stores the latest decoded float32 audio frame from a participant.""" + + def __init__(self, participant): + self.participant = participant + self.samplerate = 48000 + self.channels = 1 + + def handle_frame(self, frame, source, decoded=True): + with self.participant.frame_lock: + self.participant.latest_frame = frame + + +class ConferenceSource(LocalSource): + """A source that periodically outputs the mixed audio of all channel + participants except the one identified by *exclude_link_id*.""" + + def __init__(self, channel, exclude_link_id, frame_time_ms=60): + self.channel = channel + self.exclude_link_id = exclude_link_id + self.frame_time_ms = frame_time_ms + self.frame_time = frame_time_ms / 1000.0 + self.should_run = False + self._sink = None + self._codec = None + self.samplerate = 48000 + self.channels = 1 + self.mixer_thread = None + + @property + def sink(self): + return self._sink + + @sink.setter + def sink(self, sink): + self._sink = sink + + @property + def codec(self): + return self._codec + + @codec.setter + def codec(self, codec): + if codec is None: + self._codec = None + elif not issubclass(type(codec), Codec): + raise CodecError(f"Invalid codec specified for {self}") + else: + self._codec = codec + + def start(self): + if not self.should_run: + self.should_run = True + self.mixer_thread = threading.Thread(target=self._run, daemon=True) + self.mixer_thread.start() + + def stop(self): + self.should_run = False + + def _run(self): + while self.should_run: + loop_start = time.time() + if self._sink: + mixed = self.channel.get_mixed_frame( + exclude=self.exclude_link_id + ) + if mixed is not None and self._sink.can_receive(): + if self._codec: + encoded = self._codec.encode(mixed) + self._sink.handle_frame(encoded, self) + elapsed = time.time() - loop_start + sleep_time = max(0, self.frame_time - elapsed) + time.sleep(sleep_time) + + +class Participant: + """A remote caller connected to the relay.""" + + def __init__(self, link, identity): + self.link = link + self.identity = identity + self.identity_hash = identity.hash + self.link_source = None + self.packetizer = None + self.conf_source = None + self.conf_pipeline = None + self.latest_frame = None + self.frame_lock = threading.Lock() + self.joined_at = time.time() + self.ended = False + + +class VoiceChannel: + """A group voice channel that mixes audio from all participants. + + Each participant receives a mix of every *other* participant + (their own audio is excluded so they don't hear themselves). + """ + + def __init__(self, name, frame_time_ms=60): + self.name = name + self.participants = {} + self.frame_time_ms = frame_time_ms + self.frame_time = frame_time_ms / 1000.0 + self.lock = threading.Lock() + + def add_participant(self, participant, relay=None): + with self.lock: + link_id = participant.link.link_id + self.participants[link_id] = participant + + capture_sink = FrameCaptureSink(participant) + link_source = LinkSource( + link=participant.link, + signalling_receiver=relay, + sink=capture_sink, + ) + participant.link_source = link_source + + outgoing_codec = Opus(profile=Opus.PROFILE_VOICE_MEDIUM) + packetizer = Packetizer(destination=participant.link) + participant.packetizer = packetizer + + conf_source = ConferenceSource( + channel=self, + exclude_link_id=link_id, + frame_time_ms=self.frame_time_ms, + ) + participant.conf_source = conf_source + + conf_pipeline = Pipeline( + source=conf_source, + codec=outgoing_codec, + sink=packetizer, + ) + participant.conf_pipeline = conf_pipeline + + link_source.start() + conf_pipeline.start() + + RNS.log( + f"Participant {RNS.prettyhexrep(participant.identity_hash)} " + f"joined channel '{self.name}' " + f"({len(self.participants)} participant(s))", + RNS.LOG_NOTICE, + ) + + def remove_participant(self, link_id): + with self.lock: + p = self.participants.pop(link_id, None) + if p is None: + return + p.ended = True + if p.conf_source: + p.conf_source.stop() + if p.link_source: + p.link_source.stop() + if p.link and p.link.status == RNS.Link.ACTIVE: + p.link.teardown() + count = len(self.participants) + RNS.log( + f"Participant left channel '{self.name}' " + f"({count} participant(s) remaining)", + RNS.LOG_NOTICE, + ) + + def get_participant_count(self): + with self.lock: + return len(self.participants) + + def get_mixed_frame(self, exclude=None): + """Sum the latest frame from every participant except *exclude*.""" + with self.lock: + frames = [] + for lid, p in self.participants.items(): + if exclude is not None and lid == exclude: + continue + with p.frame_lock: + if p.latest_frame is not None: + frames.append(p.latest_frame) + + if not frames: + return None + + if len(frames) == 1: + return frames[0] + + min_len = min(f.shape[0] for f in frames) + trimmed = [f[:min_len] for f in frames] + mixed = np.sum(trimmed, axis=0) + mixed = np.clip(mixed, -1.0, 1.0) + return mixed + + +class LXSTRelay(SignallingReceiver): + """Conference bridge for Reticulum Telephony. + + Registers as a standard ``lxst.telephony`` destination so any + lxst client can call in. Callers are placed into a shared + ``VoiceChannel`` and their audio is mixed together. + """ + + def __init__( + self, + configdir=None, + rnsconfigdir=None, + channel_name="default", + verbosity=0, + ): + super().__init__() + self.configdir = configdir + self.channel_name = channel_name + self.should_run = False + + if self.configdir is None: + if os.path.isdir("/etc/lxst-relay"): + self.configdir = "/etc/lxst-relay" + else: + userdir = RNS.Reticulum.userdir + self.configdir = os.path.join(userdir, ".lxst-relay") + + os.makedirs(self.configdir, exist_ok=True) + + identity_path = os.path.join(self.configdir, "identity") + if os.path.isfile(identity_path): + self.identity = RNS.Identity.from_file(identity_path) + if self.identity is None: + RNS.log( + "Could not load identity from file, creating new", + RNS.LOG_ERROR, + ) + self.identity = RNS.Identity() + self.identity.to_file(identity_path) + else: + RNS.log("No identity found, generating new", RNS.LOG_NOTICE) + self.identity = RNS.Identity() + self.identity.to_file(identity_path) + RNS.log( + f"Generated identity: " + f"{RNS.prettyhexrep(self.identity.hash)}", + RNS.LOG_NOTICE, + ) + + self.reticulum = RNS.Reticulum( + configdir=rnsconfigdir, loglevel=3 + verbosity + ) + + self.destination = RNS.Destination( + self.identity, + RNS.Destination.IN, + RNS.Destination.SINGLE, + APP_NAME, + PRIMITIVE_NAME, + ) + self.destination.set_proof_strategy(RNS.Destination.PROVE_NONE) + self.destination.set_link_established_callback( + self._incoming_link_established + ) + + self.channel = VoiceChannel(name=channel_name) + + RNS.log( + f"Relay listening on " + f"{RNS.prettyhexrep(self.destination.hash)}", + RNS.LOG_NOTICE, + ) + + # ------------------------------------------------------------------ + # Link lifecycle + # ------------------------------------------------------------------ + + def _incoming_link_established(self, link): + link.is_incoming = True + link.is_outgoing = False + link.answered = False + link.is_terminating = False + + link.set_remote_identified_callback(self._caller_identified) + link.set_link_closed_callback(self._link_closed) + + self.signal(Signalling.STATUS_AVAILABLE, link) + + RNS.log("Incoming link, awaiting identification", RNS.LOG_DEBUG) + + def _caller_identified(self, link, identity): + RNS.log( + f"Caller identified: {RNS.prettyhexrep(identity.hash)}", + RNS.LOG_NOTICE, + ) + + participant = Participant(link, identity) + + self.signal(Signalling.STATUS_RINGING, link) + time.sleep(0.5) + + self.signal(Signalling.STATUS_CONNECTING, link) + self.channel.add_participant(participant, relay=self) + self.signal(Signalling.STATUS_ESTABLISHED, link) + + RNS.log( + f"Call established with " + f"{RNS.prettyhexrep(identity.hash)}, " + f"{self.channel.get_participant_count()} participant(s) in " + f"'{self.channel_name}'", + RNS.LOG_NOTICE, + ) + + def _link_closed(self, link): + RNS.log(f"Link closed: {link}", RNS.LOG_DEBUG) + self.channel.remove_participant(link.link_id) + + # ------------------------------------------------------------------ + # SignallingReceiver + # ------------------------------------------------------------------ + + def signalling_received(self, signals, source): + for signal in signals: + if signal >= Signalling.PREFERRED_PROFILE: + profile = signal - Signalling.PREFERRED_PROFILE + RNS.log( + f"Received preferred profile: 0x{profile:02x}", + RNS.LOG_DEBUG, + ) + else: + RNS.log( + f"Received signal: 0x{signal:02x}", + RNS.LOG_DEBUG, + ) + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def announce(self): + self.destination.announce() + RNS.log("Relay announced on network", RNS.LOG_NOTICE) + + def start(self): + if self.should_run: + return + signal_module.signal(signal_module.SIGINT, self._sigint_handler) + signal_module.signal(signal_module.SIGTERM, self._sigterm_handler) + + self.should_run = True + self.announce() + + RNS.log( + f"LXST Relay ready on channel '{self.channel_name}'", + RNS.LOG_NOTICE, + ) + RNS.log( + f"Identity: {RNS.prettyhexrep(self.identity.hash)}", + RNS.LOG_NOTICE, + ) + RNS.log("Waiting for incoming calls...", RNS.LOG_NOTICE) + + while self.should_run: + time.sleep(1) + + def stop(self): + self.should_run = False + RNS.log("Relay stopping...", RNS.LOG_NOTICE) + + # ------------------------------------------------------------------ + # Signal handlers + # ------------------------------------------------------------------ + + def _sigint_handler(self, sig, frame): + self.stop() + + def _sigterm_handler(self, sig, frame): + self.stop() + + +def main(): + parser = argparse.ArgumentParser( + description=( + "LXST Relay - Group voice channel bridge for " + "Reticulum Telephony" + ) + ) + + parser.add_argument( + "--config", + "-c", + default=None, + help="path to configuration directory", + type=str, + ) + parser.add_argument( + "--rnsconfig", + default=None, + help="path to alternative Reticulum config directory", + type=str, + ) + parser.add_argument( + "--channel", + "-n", + default="default", + help="voice channel name (default: 'default')", + type=str, + ) + parser.add_argument( + "--version", + action="version", + version="lxst-relay 0.1.0", + ) + parser.add_argument( + "-v", "--verbose", action="count", default=0 + ) + + args = parser.parse_args() + + relay = LXSTRelay( + configdir=args.config, + rnsconfigdir=args.rnsconfig, + channel_name=args.channel, + verbosity=args.verbose, + ) + + try: + relay.start() + except KeyboardInterrupt: + relay.stop() + print("") + + +if __name__ == "__main__": + main()