Files
lxst-relay/lxst_relay.py
Gabby Morgan f073ebe106 init
2026-05-27 00:21:58 -05:00

462 lines
14 KiB
Python

#!/usr/bin/env python3
"""
lxst-relay - Group voice channel bridge for Reticulum Telephony.
Allows multiple lxst users to call into a shared voice channel
and talk together in a conference bridge style. Audio from each
participant is mixed and redistributed to all others.
"""
import RNS
import os
import time
import signal as signal_module
import threading
import numpy as np
import argparse
from LXST import Pipeline
from LXST.Network import SignallingReceiver, Packetizer, LinkSource
from LXST.Sinks import LocalSink
from LXST.Sources import LocalSource
from LXST.Codecs import Opus
from LXST.Codecs.Codec import Codec, CodecError
from LXST.Primitives.Telephony import Signalling
# Aspects used when building the relay's RNS destination ("lxst.telephony").
APP_NAME = "lxst"
PRIMITIVE_NAME = "telephony"
# Packet field identifiers; not referenced by the code visible in this file.
# NOTE(review): presumably these mirror the LXST wire-format keys - confirm.
FIELD_SIGNALLING = 0x00
FIELD_FRAMES = 0x01
class FrameCaptureSink(LocalSink):
    """Sink that remembers only the newest frame handed to it.

    Every frame overwrites ``participant.latest_frame``; nothing is queued,
    so a frame that is not read before the next one arrives is replaced.
    """

    def __init__(self, participant):
        """Bind the sink to *participant*, whose ``latest_frame`` it updates."""
        # Audio format attributes exposed by this sink.
        self.channels = 1
        self.samplerate = 48000
        self.participant = participant

    def handle_frame(self, frame, source, decoded=True):
        """Store *frame* on the participant.

        *source* and *decoded* are accepted but not used here.
        """
        owner = self.participant
        with owner.frame_lock:
            owner.latest_frame = frame
class ConferenceSource(LocalSource):
    """Source that emits the channel mix for one listener at a fixed pace.

    A background thread asks *channel* for the mix of every participant
    except *exclude_link_id* once per frame period, encodes it with the
    assigned codec and hands the encoded frame to the assigned sink.
    """

    def __init__(self, channel, exclude_link_id, frame_time_ms=60):
        """Create a stopped source.

        :param channel: object providing ``get_mixed_frame(exclude=...)``.
        :param exclude_link_id: link id left out of the mix (the listener).
        :param frame_time_ms: mixing period in milliseconds.
        """
        self.channel = channel
        self.exclude_link_id = exclude_link_id
        self.frame_time_ms = frame_time_ms
        self.frame_time = frame_time_ms / 1000.0
        self.should_run = False
        self._sink = None
        self._codec = None
        self.samplerate = 48000
        self.channels = 1
        self.mixer_thread = None

    @property
    def sink(self):
        """Sink that receives the encoded mix, or ``None``."""
        return self._sink

    @sink.setter
    def sink(self, sink):
        self._sink = sink

    @property
    def codec(self):
        """Codec used to encode the mix, or ``None``."""
        return self._codec

    @codec.setter
    def codec(self, codec):
        """Assign the codec.

        :raises CodecError: if *codec* is neither ``None`` nor a ``Codec``.
        """
        if codec is not None and not isinstance(codec, Codec):
            raise CodecError(f"Invalid codec specified for {self}")
        self._codec = codec

    def start(self):
        """Start the mixer thread; does nothing if already started."""
        if not self.should_run:
            self.should_run = True
            self.mixer_thread = threading.Thread(target=self._run, daemon=True)
            self.mixer_thread.start()

    def stop(self):
        """Ask the mixer thread to exit after its current iteration."""
        self.should_run = False

    def _emit_frame(self):
        """Fetch, encode and forward one mixed frame, if there is one."""
        # Snapshot sink/codec once so a concurrent setter cannot swap them
        # to None between the check and the call.
        sink = self._sink
        if not sink:
            return
        mixed = self.channel.get_mixed_frame(exclude=self.exclude_link_id)
        if mixed is None or not sink.can_receive():
            return
        codec = self._codec
        if not codec:
            # Only encoded frames are forwarded; without a codec there is
            # nothing valid to hand to the sink.
            return
        sink.handle_frame(codec.encode(mixed), self)

    def _run(self):
        """Mixer loop; runs on ``mixer_thread`` until ``stop()`` is called."""
        # Deadline-based pacing on the monotonic clock: wall-clock changes
        # cannot stall the loop, and sleep overshoot in one iteration is
        # absorbed by the next instead of accumulating as drift.
        next_deadline = time.monotonic()
        last_error = None
        while self.should_run:
            try:
                self._emit_frame()
                last_error = None
            except Exception as e:
                # A single bad frame must not end this listener's audio for
                # good. Identical consecutive errors are logged only once
                # to avoid flooding the log every frame period.
                message = str(e)
                if message != last_error:
                    RNS.log(
                        f"Conference mixer error: {message}",
                        RNS.LOG_ERROR,
                    )
                    last_error = message
            next_deadline += self.frame_time
            now = time.monotonic()
            if next_deadline < now:
                # Fell behind by more than a period: resynchronise rather
                # than sending a burst of catch-up frames.
                next_deadline = now
            time.sleep(next_deadline - now)
class Participant:
    """State the relay keeps for one connected caller.

    The media objects (``link_source``, ``packetizer``, ``conf_source`` and
    ``conf_pipeline``) start out as ``None``; ``VoiceChannel.add_participant``
    fills them in.
    """

    def __init__(self, link, identity):
        """Record *link* and *identity* and initialise empty media state."""
        # Who is calling, and over which link.
        self.link, self.identity = link, identity
        self.identity_hash = identity.hash

        # Media plumbing, unset until the participant joins a channel.
        self.link_source = self.packetizer = None
        self.conf_source = self.conf_pipeline = None

        # Most recently received frame and the lock guarding access to it.
        self.latest_frame = None
        self.frame_lock = threading.Lock()

        # Bookkeeping.
        self.joined_at = time.time()
        self.ended = False
class VoiceChannel:
    """A group voice channel that mixes audio from all participants.

    Each participant receives a mix of every *other* participant
    (their own audio is excluded so they don't hear themselves).
    """

    def __init__(self, name, frame_time_ms=60):
        """
        :param name: channel name, used in log messages.
        :param frame_time_ms: mixing period handed to each participant's
            ``ConferenceSource``.
        """
        self.name = name
        self.participants = {}  # link_id -> Participant
        self.frame_time_ms = frame_time_ms
        self.frame_time = frame_time_ms / 1000.0
        self.lock = threading.Lock()  # guards self.participants

    def add_participant(self, participant, relay=None):
        """Register *participant* and start its inbound and outbound audio.

        Inbound: a ``LinkSource`` on the participant's link feeds a
        ``FrameCaptureSink``. Outbound: a ``ConferenceSource`` excluding the
        participant's own link feeds an Opus ``Pipeline`` into a
        ``Packetizer`` on the same link.

        :param participant: the ``Participant`` to add.
        :param relay: passed to ``LinkSource`` as its signalling receiver.
        """
        with self.lock:
            link_id = participant.link.link_id
            self.participants[link_id] = participant

            capture_sink = FrameCaptureSink(participant)
            link_source = LinkSource(
                link=participant.link,
                signalling_receiver=relay,
                sink=capture_sink,
            )
            participant.link_source = link_source

            outgoing_codec = Opus(profile=Opus.PROFILE_VOICE_MEDIUM)
            packetizer = Packetizer(destination=participant.link)
            participant.packetizer = packetizer

            conf_source = ConferenceSource(
                channel=self,
                exclude_link_id=link_id,
                frame_time_ms=self.frame_time_ms,
            )
            participant.conf_source = conf_source

            conf_pipeline = Pipeline(
                source=conf_source,
                codec=outgoing_codec,
                sink=packetizer,
            )
            participant.conf_pipeline = conf_pipeline

            link_source.start()
            conf_pipeline.start()
            count = len(self.participants)

        RNS.log(
            f"Participant {RNS.prettyhexrep(participant.identity_hash)} "
            f"joined channel '{self.name}' "
            f"({count} participant(s))",
            RNS.LOG_NOTICE,
        )

    def remove_participant(self, link_id):
        """Unregister the participant on *link_id* and stop its audio.

        Unknown link ids are ignored, so the method is safe to call more
        than once for the same link.
        """
        # Only the dictionary update happens under the lock. Stopping
        # sources and tearing down the link may call back into this method
        # (via a link-closed callback), which would deadlock on the
        # non-reentrant lock if it were still held.
        with self.lock:
            p = self.participants.pop(link_id, None)
            count = len(self.participants)
        if p is None:
            return

        p.ended = True
        if p.conf_source:
            p.conf_source.stop()
        if p.link_source:
            p.link_source.stop()
        if p.link and p.link.status == RNS.Link.ACTIVE:
            p.link.teardown()

        RNS.log(
            f"Participant left channel '{self.name}' "
            f"({count} participant(s) remaining)",
            RNS.LOG_NOTICE,
        )

    def get_participant_count(self):
        """Return the number of registered participants."""
        with self.lock:
            return len(self.participants)

    def get_mixed_frame(self, exclude=None):
        """Sum the latest frame from every participant except *exclude*.

        :param exclude: link id to leave out of the mix, or ``None``.
        :returns: ``None`` if no participant has a frame; the single frame
            itself (same object, unclipped) if exactly one has; otherwise
            the element-wise sum clipped to [-1.0, 1.0].
        """
        # Snapshot under the channel lock, then read frames and mix without
        # it so joins and leaves are never blocked by audio work.
        with self.lock:
            others = [
                p
                for lid, p in self.participants.items()
                if exclude is None or lid != exclude
            ]

        # NOTE(review): nothing in this file clears latest_frame, so a
        # participant that stops sending keeps contributing its last frame.
        frames = []
        for p in others:
            with p.frame_lock:
                frame = p.latest_frame
            if frame is not None:
                frames.append(frame)
        return self._mix(frames)

    @staticmethod
    def _mix(frames):
        """Combine *frames* (NumPy arrays) into one clipped frame."""
        if not frames:
            return None
        if len(frames) == 1:
            return frames[0]

        min_len = min(f.shape[0] for f in frames)
        if any(f.ndim > 1 for f in frames):
            # Frames may carry different channel counts. Treat 1-D frames
            # as a single column and keep only the channels every frame
            # has, so the sum is well-defined instead of raising.
            columns = [f if f.ndim > 1 else f.reshape(-1, 1) for f in frames]
            min_ch = min(f.shape[1] for f in columns)
            trimmed = [f[:min_len, :min_ch] for f in columns]
        else:
            trimmed = [f[:min_len] for f in frames]

        return np.clip(np.sum(trimmed, axis=0), -1.0, 1.0)
class LXSTRelay(SignallingReceiver):
    """Conference bridge for Reticulum Telephony.

    Registers as a standard ``lxst.telephony`` destination so any
    lxst client can call in. Callers are placed into a shared
    ``VoiceChannel`` and their audio is mixed together.
    """

    def __init__(
        self,
        configdir=None,
        rnsconfigdir=None,
        channel_name="default",
        verbosity=0,
    ):
        """Load or create the relay identity and set up the destination.

        :param configdir: directory holding the relay's ``identity`` file.
            Defaults to ``/etc/lxst-relay`` if it exists, otherwise
            ``.lxst-relay`` under ``RNS.Reticulum.userdir``.
        :param rnsconfigdir: passed to ``RNS.Reticulum`` as ``configdir``.
        :param channel_name: name of the single voice channel.
        :param verbosity: added to the base log level of 3.
        """
        super().__init__()
        self.configdir = configdir
        self.channel_name = channel_name
        self.should_run = False

        if self.configdir is None:
            if os.path.isdir("/etc/lxst-relay"):
                self.configdir = "/etc/lxst-relay"
            else:
                userdir = RNS.Reticulum.userdir
                self.configdir = os.path.join(userdir, ".lxst-relay")
        # Create the directory for explicitly supplied paths too, so the
        # identity file below can always be written.
        os.makedirs(self.configdir, exist_ok=True)

        self.identity = self._load_or_create_identity(
            os.path.join(self.configdir, "identity")
        )

        self.reticulum = RNS.Reticulum(
            configdir=rnsconfigdir, loglevel=3 + verbosity
        )
        self.destination = RNS.Destination(
            self.identity,
            RNS.Destination.IN,
            RNS.Destination.SINGLE,
            APP_NAME,
            PRIMITIVE_NAME,
        )
        self.destination.set_proof_strategy(RNS.Destination.PROVE_NONE)
        self.destination.set_link_established_callback(
            self._incoming_link_established
        )
        self.channel = VoiceChannel(name=channel_name)
        RNS.log(
            f"Relay listening on "
            f"{RNS.prettyhexrep(self.destination.hash)}",
            RNS.LOG_NOTICE,
        )

    @staticmethod
    def _load_or_create_identity(identity_path):
        """Return the identity stored at *identity_path*, creating it if needed."""
        if os.path.isfile(identity_path):
            identity = RNS.Identity.from_file(identity_path)
            if identity is not None:
                return identity
            RNS.log(
                "Could not load identity from file, creating new",
                RNS.LOG_ERROR,
            )
        else:
            RNS.log("No identity found, generating new", RNS.LOG_NOTICE)

        identity = RNS.Identity()
        identity.to_file(identity_path)
        RNS.log(
            f"Generated identity: "
            f"{RNS.prettyhexrep(identity.hash)}",
            RNS.LOG_NOTICE,
        )
        return identity

    # ------------------------------------------------------------------
    # Link lifecycle
    # ------------------------------------------------------------------
    def _incoming_link_established(self, link):
        """Prepare a new inbound link and wait for the caller to identify."""
        link.is_incoming = True
        link.is_outgoing = False
        link.answered = False
        link.is_terminating = False
        link.set_remote_identified_callback(self._caller_identified)
        link.set_link_closed_callback(self._link_closed)
        self.signal(Signalling.STATUS_AVAILABLE, link)
        RNS.log("Incoming link, awaiting identification", RNS.LOG_DEBUG)

    def _caller_identified(self, link, identity):
        """Answer the call and add the identified caller to the channel."""
        # A link may be identified more than once. Joining again would
        # replace the participant entry and leave its previous mixer thread
        # running with nothing left to stop it.
        if getattr(link, "answered", False):
            RNS.log(
                "Ignoring repeated identification on answered link",
                RNS.LOG_DEBUG,
            )
            return
        link.answered = True

        RNS.log(
            f"Caller identified: {RNS.prettyhexrep(identity.hash)}",
            RNS.LOG_NOTICE,
        )
        participant = Participant(link, identity)
        self.signal(Signalling.STATUS_RINGING, link)
        # NOTE(review): this blocks whichever thread RNS uses to invoke
        # the callback for half a second.
        time.sleep(0.5)

        # The caller may have hung up while ringing; the closed callback
        # has then already run, so a participant added now would never be
        # removed.
        if link.status == RNS.Link.CLOSED:
            RNS.log("Link closed before call was established", RNS.LOG_DEBUG)
            return

        self.signal(Signalling.STATUS_CONNECTING, link)
        self.channel.add_participant(participant, relay=self)
        self.signal(Signalling.STATUS_ESTABLISHED, link)
        RNS.log(
            f"Call established with "
            f"{RNS.prettyhexrep(identity.hash)}, "
            f"{self.channel.get_participant_count()} participant(s) in "
            f"'{self.channel_name}'",
            RNS.LOG_NOTICE,
        )

    def _link_closed(self, link):
        """Remove the participant whose link has closed."""
        RNS.log(f"Link closed: {link}", RNS.LOG_DEBUG)
        self.channel.remove_participant(link.link_id)

    def _hang_up_all(self):
        """Tear down every participant link that is still active."""
        # Snapshot first: teardown leads back into the channel through
        # _link_closed, so the channel lock must not be held here.
        with self.channel.lock:
            links = [p.link for p in self.channel.participants.values()]
        for link in links:
            if link.status == RNS.Link.ACTIVE:
                link.teardown()

    # ------------------------------------------------------------------
    # SignallingReceiver
    # ------------------------------------------------------------------
    def signalling_received(self, signals, source):
        """Log each received signal; the relay does not act on them."""
        for signal in signals:
            if signal >= Signalling.PREFERRED_PROFILE:
                profile = signal - Signalling.PREFERRED_PROFILE
                RNS.log(
                    f"Received preferred profile: 0x{profile:02x}",
                    RNS.LOG_DEBUG,
                )
            else:
                RNS.log(
                    f"Received signal: 0x{signal:02x}",
                    RNS.LOG_DEBUG,
                )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def announce(self):
        """Announce the relay destination on the network."""
        self.destination.announce()
        RNS.log("Relay announced on network", RNS.LOG_NOTICE)

    def start(self):
        """Announce, then block until ``stop()`` is called.

        Installs SIGINT and SIGTERM handlers that call ``stop()``. When the
        loop ends, links of callers still connected are torn down.
        """
        if self.should_run:
            return
        signal_module.signal(signal_module.SIGINT, self._sigint_handler)
        signal_module.signal(signal_module.SIGTERM, self._sigterm_handler)
        self.should_run = True
        self.announce()
        RNS.log(
            f"LXST Relay ready on channel '{self.channel_name}'",
            RNS.LOG_NOTICE,
        )
        RNS.log(
            f"Identity: {RNS.prettyhexrep(self.identity.hash)}",
            RNS.LOG_NOTICE,
        )
        RNS.log("Waiting for incoming calls...", RNS.LOG_NOTICE)
        while self.should_run:
            time.sleep(1)
        # Done here rather than in stop(), which may run inside a signal
        # handler.
        self._hang_up_all()

    def stop(self):
        """Request the ``start()`` loop to exit."""
        self.should_run = False
        RNS.log("Relay stopping...", RNS.LOG_NOTICE)

    # ------------------------------------------------------------------
    # Signal handlers
    # ------------------------------------------------------------------
    def _sigint_handler(self, sig, frame):
        self.stop()

    def _sigterm_handler(self, sig, frame):
        self.stop()
def main(argv=None):
    """Parse command-line options and run the relay until it is stopped.

    :param argv: argument list to parse instead of ``sys.argv[1:]``;
        ``None`` keeps the default command-line behaviour.
    """
    parser = argparse.ArgumentParser(
        description=(
            "LXST Relay - Group voice channel bridge for "
            "Reticulum Telephony"
        )
    )
    parser.add_argument(
        "--config",
        "-c",
        default=None,
        help="path to configuration directory",
        type=str,
    )
    parser.add_argument(
        "--rnsconfig",
        default=None,
        help="path to alternative Reticulum config directory",
        type=str,
    )
    parser.add_argument(
        "--channel",
        "-n",
        default="default",
        help="voice channel name (default: 'default')",
        type=str,
    )
    parser.add_argument(
        "--version",
        action="version",
        version="lxst-relay 0.1.0",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        help="increase log verbosity (may be repeated)",
    )
    args = parser.parse_args(argv)

    relay = LXSTRelay(
        configdir=args.config,
        rnsconfigdir=args.rnsconfig,
        channel_name=args.channel,
        verbosity=args.verbose,
    )
    try:
        relay.start()
    except KeyboardInterrupt:
        # Covers an interrupt arriving before start() has installed its
        # own SIGINT handler.
        relay.stop()
    print("")


# Run the relay only when executed as a script, not when imported.
if __name__ == "__main__":
    main()