#!/usr/bin/env python3 """ lxst-relay - Group voice channel bridge for Reticulum Telephony. Allows multiple lxst users to call into a shared voice channel and talk together in a conference bridge style. Audio from each participant is mixed and redistributed to all others. """ import RNS import os import time import signal as signal_module import threading import numpy as np import argparse from LXST import Pipeline from LXST.Network import SignallingReceiver, Packetizer, LinkSource from LXST.Sinks import LocalSink from LXST.Sources import LocalSource from LXST.Codecs import Opus from LXST.Codecs.Codec import Codec, CodecError from LXST.Primitives.Telephony import Signalling APP_NAME = "lxst" PRIMITIVE_NAME = "telephony" FIELD_SIGNALLING = 0x00 FIELD_FRAMES = 0x01 class FrameCaptureSink(LocalSink): """Stores the latest decoded float32 audio frame from a participant.""" def __init__(self, participant): self.participant = participant self.samplerate = 48000 self.channels = 1 def handle_frame(self, frame, source, decoded=True): with self.participant.frame_lock: self.participant.latest_frame = frame class ConferenceSource(LocalSource): """A source that periodically outputs the mixed audio of all channel participants except the one identified by *exclude_link_id*.""" def __init__(self, channel, exclude_link_id, frame_time_ms=60): self.channel = channel self.exclude_link_id = exclude_link_id self.frame_time_ms = frame_time_ms self.frame_time = frame_time_ms / 1000.0 self.should_run = False self._sink = None self._codec = None self.samplerate = 48000 self.channels = 1 self.mixer_thread = None @property def sink(self): return self._sink @sink.setter def sink(self, sink): self._sink = sink @property def codec(self): return self._codec @codec.setter def codec(self, codec): if codec is None: self._codec = None elif not issubclass(type(codec), Codec): raise CodecError(f"Invalid codec specified for {self}") else: self._codec = codec def start(self): if not self.should_run: self.should_run = True self.mixer_thread = threading.Thread(target=self._run, daemon=True) self.mixer_thread.start() def stop(self): self.should_run = False def _run(self): while self.should_run: loop_start = time.time() if self._sink: mixed = self.channel.get_mixed_frame( exclude=self.exclude_link_id ) if mixed is not None and self._sink.can_receive(): if self._codec: encoded = self._codec.encode(mixed) self._sink.handle_frame(encoded, self) elapsed = time.time() - loop_start sleep_time = max(0, self.frame_time - elapsed) time.sleep(sleep_time) class Participant: """A remote caller connected to the relay.""" def __init__(self, link, identity): self.link = link self.identity = identity self.identity_hash = identity.hash self.link_source = None self.packetizer = None self.conf_source = None self.conf_pipeline = None self.latest_frame = None self.frame_lock = threading.Lock() self.joined_at = time.time() self.ended = False class VoiceChannel: """A group voice channel that mixes audio from all participants. Each participant receives a mix of every *other* participant (their own audio is excluded so they don't hear themselves). """ def __init__(self, name, frame_time_ms=60): self.name = name self.participants = {} self.frame_time_ms = frame_time_ms self.frame_time = frame_time_ms / 1000.0 self.lock = threading.Lock() def add_participant(self, participant, relay=None): with self.lock: link_id = participant.link.link_id self.participants[link_id] = participant capture_sink = FrameCaptureSink(participant) link_source = LinkSource( link=participant.link, signalling_receiver=relay, sink=capture_sink, ) participant.link_source = link_source outgoing_codec = Opus(profile=Opus.PROFILE_VOICE_MEDIUM) packetizer = Packetizer(destination=participant.link) participant.packetizer = packetizer conf_source = ConferenceSource( channel=self, exclude_link_id=link_id, frame_time_ms=self.frame_time_ms, ) participant.conf_source = conf_source conf_pipeline = Pipeline( source=conf_source, codec=outgoing_codec, sink=packetizer, ) participant.conf_pipeline = conf_pipeline link_source.start() conf_pipeline.start() RNS.log( f"Participant {RNS.prettyhexrep(participant.identity_hash)} " f"joined channel '{self.name}' " f"({len(self.participants)} participant(s))", RNS.LOG_NOTICE, ) def remove_participant(self, link_id): with self.lock: p = self.participants.pop(link_id, None) if p is None: return p.ended = True if p.conf_source: p.conf_source.stop() if p.link_source: p.link_source.stop() if p.link and p.link.status == RNS.Link.ACTIVE: p.link.teardown() count = len(self.participants) RNS.log( f"Participant left channel '{self.name}' " f"({count} participant(s) remaining)", RNS.LOG_NOTICE, ) def get_participant_count(self): with self.lock: return len(self.participants) def get_mixed_frame(self, exclude=None): """Sum the latest frame from every participant except *exclude*.""" with self.lock: frames = [] for lid, p in self.participants.items(): if exclude is not None and lid == exclude: continue with p.frame_lock: if p.latest_frame is not None: frames.append(p.latest_frame) if not frames: return None if len(frames) == 1: return frames[0] min_len = min(f.shape[0] for f in frames) trimmed = [f[:min_len] for f in frames] mixed = np.sum(trimmed, axis=0) mixed = np.clip(mixed, -1.0, 1.0) return mixed class LXSTRelay(SignallingReceiver): """Conference bridge for Reticulum Telephony. Registers as a standard ``lxst.telephony`` destination so any lxst client can call in. Callers are placed into a shared ``VoiceChannel`` and their audio is mixed together. """ def __init__( self, configdir=None, rnsconfigdir=None, channel_name="default", verbosity=0, ): super().__init__() self.configdir = configdir self.channel_name = channel_name self.should_run = False if self.configdir is None: if os.path.isdir("/etc/lxst-relay"): self.configdir = "/etc/lxst-relay" else: userdir = RNS.Reticulum.userdir self.configdir = os.path.join(userdir, ".lxst-relay") os.makedirs(self.configdir, exist_ok=True) identity_path = os.path.join(self.configdir, "identity") if os.path.isfile(identity_path): self.identity = RNS.Identity.from_file(identity_path) if self.identity is None: RNS.log( "Could not load identity from file, creating new", RNS.LOG_ERROR, ) self.identity = RNS.Identity() self.identity.to_file(identity_path) else: RNS.log("No identity found, generating new", RNS.LOG_NOTICE) self.identity = RNS.Identity() self.identity.to_file(identity_path) RNS.log( f"Generated identity: " f"{RNS.prettyhexrep(self.identity.hash)}", RNS.LOG_NOTICE, ) self.reticulum = RNS.Reticulum( configdir=rnsconfigdir, loglevel=3 + verbosity ) self.destination = RNS.Destination( self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, PRIMITIVE_NAME, ) self.destination.set_proof_strategy(RNS.Destination.PROVE_NONE) self.destination.set_link_established_callback( self._incoming_link_established ) self.channel = VoiceChannel(name=channel_name) RNS.log( f"Relay listening on " f"{RNS.prettyhexrep(self.destination.hash)}", RNS.LOG_NOTICE, ) # ------------------------------------------------------------------ # Link lifecycle # ------------------------------------------------------------------ def _incoming_link_established(self, link): link.is_incoming = True link.is_outgoing = False link.answered = False link.is_terminating = False link.set_remote_identified_callback(self._caller_identified) link.set_link_closed_callback(self._link_closed) self.signal(Signalling.STATUS_AVAILABLE, link) RNS.log("Incoming link, awaiting identification", RNS.LOG_DEBUG) def _caller_identified(self, link, identity): RNS.log( f"Caller identified: {RNS.prettyhexrep(identity.hash)}", RNS.LOG_NOTICE, ) participant = Participant(link, identity) self.signal(Signalling.STATUS_RINGING, link) time.sleep(0.5) self.signal(Signalling.STATUS_CONNECTING, link) self.channel.add_participant(participant, relay=self) self.signal(Signalling.STATUS_ESTABLISHED, link) RNS.log( f"Call established with " f"{RNS.prettyhexrep(identity.hash)}, " f"{self.channel.get_participant_count()} participant(s) in " f"'{self.channel_name}'", RNS.LOG_NOTICE, ) def _link_closed(self, link): RNS.log(f"Link closed: {link}", RNS.LOG_DEBUG) self.channel.remove_participant(link.link_id) # ------------------------------------------------------------------ # SignallingReceiver # ------------------------------------------------------------------ def signalling_received(self, signals, source): for signal in signals: if signal >= Signalling.PREFERRED_PROFILE: profile = signal - Signalling.PREFERRED_PROFILE RNS.log( f"Received preferred profile: 0x{profile:02x}", RNS.LOG_DEBUG, ) else: RNS.log( f"Received signal: 0x{signal:02x}", RNS.LOG_DEBUG, ) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def announce(self): self.destination.announce() RNS.log("Relay announced on network", RNS.LOG_NOTICE) def start(self): if self.should_run: return signal_module.signal(signal_module.SIGINT, self._sigint_handler) signal_module.signal(signal_module.SIGTERM, self._sigterm_handler) self.should_run = True self.announce() RNS.log( f"LXST Relay ready on channel '{self.channel_name}'", RNS.LOG_NOTICE, ) RNS.log( f"Identity: {RNS.prettyhexrep(self.identity.hash)}", RNS.LOG_NOTICE, ) RNS.log("Waiting for incoming calls...", RNS.LOG_NOTICE) while self.should_run: time.sleep(1) def stop(self): self.should_run = False RNS.log("Relay stopping...", RNS.LOG_NOTICE) # ------------------------------------------------------------------ # Signal handlers # ------------------------------------------------------------------ def _sigint_handler(self, sig, frame): self.stop() def _sigterm_handler(self, sig, frame): self.stop() def main(): parser = argparse.ArgumentParser( description=( "LXST Relay - Group voice channel bridge for " "Reticulum Telephony" ) ) parser.add_argument( "--config", "-c", default=None, help="path to configuration directory", type=str, ) parser.add_argument( "--rnsconfig", default=None, help="path to alternative Reticulum config directory", type=str, ) parser.add_argument( "--channel", "-n", default="default", help="voice channel name (default: 'default')", type=str, ) parser.add_argument( "--version", action="version", version="lxst-relay 0.1.0", ) parser.add_argument( "-v", "--verbose", action="count", default=0 ) args = parser.parse_args() relay = LXSTRelay( configdir=args.config, rnsconfigdir=args.rnsconfig, channel_name=args.channel, verbosity=args.verbose, ) try: relay.start() except KeyboardInterrupt: relay.stop() print("") if __name__ == "__main__": main()