init
This commit is contained in:
@@ -0,0 +1 @@
|
||||
__pycache__
|
||||
Executable
+2
@@ -0,0 +1,2 @@
|
||||
#!/bin/sh
|
||||
exec python3 "$(dirname "$0")/lxst_relay.py" "$@"
|
||||
+461
@@ -0,0 +1,461 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
lxst-relay - Group voice channel bridge for Reticulum Telephony.
|
||||
|
||||
Allows multiple lxst users to call into a shared voice channel
|
||||
and talk together in a conference bridge style. Audio from each
|
||||
participant is mixed and redistributed to all others.
|
||||
"""
|
||||
|
||||
import RNS
|
||||
import os
|
||||
import time
|
||||
import signal as signal_module
|
||||
import threading
|
||||
import numpy as np
|
||||
import argparse
|
||||
|
||||
from LXST import Pipeline
|
||||
from LXST.Network import SignallingReceiver, Packetizer, LinkSource
|
||||
from LXST.Sinks import LocalSink
|
||||
from LXST.Sources import LocalSource
|
||||
from LXST.Codecs import Opus
|
||||
from LXST.Codecs.Codec import Codec, CodecError
|
||||
from LXST.Primitives.Telephony import Signalling
|
||||
|
||||
APP_NAME = "lxst"
|
||||
PRIMITIVE_NAME = "telephony"
|
||||
|
||||
FIELD_SIGNALLING = 0x00
|
||||
FIELD_FRAMES = 0x01
|
||||
|
||||
|
||||
class FrameCaptureSink(LocalSink):
|
||||
"""Stores the latest decoded float32 audio frame from a participant."""
|
||||
|
||||
def __init__(self, participant):
|
||||
self.participant = participant
|
||||
self.samplerate = 48000
|
||||
self.channels = 1
|
||||
|
||||
def handle_frame(self, frame, source, decoded=True):
|
||||
with self.participant.frame_lock:
|
||||
self.participant.latest_frame = frame
|
||||
|
||||
|
||||
class ConferenceSource(LocalSource):
|
||||
"""A source that periodically outputs the mixed audio of all channel
|
||||
participants except the one identified by *exclude_link_id*."""
|
||||
|
||||
def __init__(self, channel, exclude_link_id, frame_time_ms=60):
|
||||
self.channel = channel
|
||||
self.exclude_link_id = exclude_link_id
|
||||
self.frame_time_ms = frame_time_ms
|
||||
self.frame_time = frame_time_ms / 1000.0
|
||||
self.should_run = False
|
||||
self._sink = None
|
||||
self._codec = None
|
||||
self.samplerate = 48000
|
||||
self.channels = 1
|
||||
self.mixer_thread = None
|
||||
|
||||
@property
|
||||
def sink(self):
|
||||
return self._sink
|
||||
|
||||
@sink.setter
|
||||
def sink(self, sink):
|
||||
self._sink = sink
|
||||
|
||||
@property
|
||||
def codec(self):
|
||||
return self._codec
|
||||
|
||||
@codec.setter
|
||||
def codec(self, codec):
|
||||
if codec is None:
|
||||
self._codec = None
|
||||
elif not issubclass(type(codec), Codec):
|
||||
raise CodecError(f"Invalid codec specified for {self}")
|
||||
else:
|
||||
self._codec = codec
|
||||
|
||||
def start(self):
|
||||
if not self.should_run:
|
||||
self.should_run = True
|
||||
self.mixer_thread = threading.Thread(target=self._run, daemon=True)
|
||||
self.mixer_thread.start()
|
||||
|
||||
def stop(self):
|
||||
self.should_run = False
|
||||
|
||||
def _run(self):
|
||||
while self.should_run:
|
||||
loop_start = time.time()
|
||||
if self._sink:
|
||||
mixed = self.channel.get_mixed_frame(
|
||||
exclude=self.exclude_link_id
|
||||
)
|
||||
if mixed is not None and self._sink.can_receive():
|
||||
if self._codec:
|
||||
encoded = self._codec.encode(mixed)
|
||||
self._sink.handle_frame(encoded, self)
|
||||
elapsed = time.time() - loop_start
|
||||
sleep_time = max(0, self.frame_time - elapsed)
|
||||
time.sleep(sleep_time)
|
||||
|
||||
|
||||
class Participant:
|
||||
"""A remote caller connected to the relay."""
|
||||
|
||||
def __init__(self, link, identity):
|
||||
self.link = link
|
||||
self.identity = identity
|
||||
self.identity_hash = identity.hash
|
||||
self.link_source = None
|
||||
self.packetizer = None
|
||||
self.conf_source = None
|
||||
self.conf_pipeline = None
|
||||
self.latest_frame = None
|
||||
self.frame_lock = threading.Lock()
|
||||
self.joined_at = time.time()
|
||||
self.ended = False
|
||||
|
||||
|
||||
class VoiceChannel:
|
||||
"""A group voice channel that mixes audio from all participants.
|
||||
|
||||
Each participant receives a mix of every *other* participant
|
||||
(their own audio is excluded so they don't hear themselves).
|
||||
"""
|
||||
|
||||
def __init__(self, name, frame_time_ms=60):
|
||||
self.name = name
|
||||
self.participants = {}
|
||||
self.frame_time_ms = frame_time_ms
|
||||
self.frame_time = frame_time_ms / 1000.0
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def add_participant(self, participant, relay=None):
|
||||
with self.lock:
|
||||
link_id = participant.link.link_id
|
||||
self.participants[link_id] = participant
|
||||
|
||||
capture_sink = FrameCaptureSink(participant)
|
||||
link_source = LinkSource(
|
||||
link=participant.link,
|
||||
signalling_receiver=relay,
|
||||
sink=capture_sink,
|
||||
)
|
||||
participant.link_source = link_source
|
||||
|
||||
outgoing_codec = Opus(profile=Opus.PROFILE_VOICE_MEDIUM)
|
||||
packetizer = Packetizer(destination=participant.link)
|
||||
participant.packetizer = packetizer
|
||||
|
||||
conf_source = ConferenceSource(
|
||||
channel=self,
|
||||
exclude_link_id=link_id,
|
||||
frame_time_ms=self.frame_time_ms,
|
||||
)
|
||||
participant.conf_source = conf_source
|
||||
|
||||
conf_pipeline = Pipeline(
|
||||
source=conf_source,
|
||||
codec=outgoing_codec,
|
||||
sink=packetizer,
|
||||
)
|
||||
participant.conf_pipeline = conf_pipeline
|
||||
|
||||
link_source.start()
|
||||
conf_pipeline.start()
|
||||
|
||||
RNS.log(
|
||||
f"Participant {RNS.prettyhexrep(participant.identity_hash)} "
|
||||
f"joined channel '{self.name}' "
|
||||
f"({len(self.participants)} participant(s))",
|
||||
RNS.LOG_NOTICE,
|
||||
)
|
||||
|
||||
def remove_participant(self, link_id):
|
||||
with self.lock:
|
||||
p = self.participants.pop(link_id, None)
|
||||
if p is None:
|
||||
return
|
||||
p.ended = True
|
||||
if p.conf_source:
|
||||
p.conf_source.stop()
|
||||
if p.link_source:
|
||||
p.link_source.stop()
|
||||
if p.link and p.link.status == RNS.Link.ACTIVE:
|
||||
p.link.teardown()
|
||||
count = len(self.participants)
|
||||
RNS.log(
|
||||
f"Participant left channel '{self.name}' "
|
||||
f"({count} participant(s) remaining)",
|
||||
RNS.LOG_NOTICE,
|
||||
)
|
||||
|
||||
def get_participant_count(self):
|
||||
with self.lock:
|
||||
return len(self.participants)
|
||||
|
||||
def get_mixed_frame(self, exclude=None):
|
||||
"""Sum the latest frame from every participant except *exclude*."""
|
||||
with self.lock:
|
||||
frames = []
|
||||
for lid, p in self.participants.items():
|
||||
if exclude is not None and lid == exclude:
|
||||
continue
|
||||
with p.frame_lock:
|
||||
if p.latest_frame is not None:
|
||||
frames.append(p.latest_frame)
|
||||
|
||||
if not frames:
|
||||
return None
|
||||
|
||||
if len(frames) == 1:
|
||||
return frames[0]
|
||||
|
||||
min_len = min(f.shape[0] for f in frames)
|
||||
trimmed = [f[:min_len] for f in frames]
|
||||
mixed = np.sum(trimmed, axis=0)
|
||||
mixed = np.clip(mixed, -1.0, 1.0)
|
||||
return mixed
|
||||
|
||||
|
||||
class LXSTRelay(SignallingReceiver):
|
||||
"""Conference bridge for Reticulum Telephony.
|
||||
|
||||
Registers as a standard ``lxst.telephony`` destination so any
|
||||
lxst client can call in. Callers are placed into a shared
|
||||
``VoiceChannel`` and their audio is mixed together.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
configdir=None,
|
||||
rnsconfigdir=None,
|
||||
channel_name="default",
|
||||
verbosity=0,
|
||||
):
|
||||
super().__init__()
|
||||
self.configdir = configdir
|
||||
self.channel_name = channel_name
|
||||
self.should_run = False
|
||||
|
||||
if self.configdir is None:
|
||||
if os.path.isdir("/etc/lxst-relay"):
|
||||
self.configdir = "/etc/lxst-relay"
|
||||
else:
|
||||
userdir = RNS.Reticulum.userdir
|
||||
self.configdir = os.path.join(userdir, ".lxst-relay")
|
||||
|
||||
os.makedirs(self.configdir, exist_ok=True)
|
||||
|
||||
identity_path = os.path.join(self.configdir, "identity")
|
||||
if os.path.isfile(identity_path):
|
||||
self.identity = RNS.Identity.from_file(identity_path)
|
||||
if self.identity is None:
|
||||
RNS.log(
|
||||
"Could not load identity from file, creating new",
|
||||
RNS.LOG_ERROR,
|
||||
)
|
||||
self.identity = RNS.Identity()
|
||||
self.identity.to_file(identity_path)
|
||||
else:
|
||||
RNS.log("No identity found, generating new", RNS.LOG_NOTICE)
|
||||
self.identity = RNS.Identity()
|
||||
self.identity.to_file(identity_path)
|
||||
RNS.log(
|
||||
f"Generated identity: "
|
||||
f"{RNS.prettyhexrep(self.identity.hash)}",
|
||||
RNS.LOG_NOTICE,
|
||||
)
|
||||
|
||||
self.reticulum = RNS.Reticulum(
|
||||
configdir=rnsconfigdir, loglevel=3 + verbosity
|
||||
)
|
||||
|
||||
self.destination = RNS.Destination(
|
||||
self.identity,
|
||||
RNS.Destination.IN,
|
||||
RNS.Destination.SINGLE,
|
||||
APP_NAME,
|
||||
PRIMITIVE_NAME,
|
||||
)
|
||||
self.destination.set_proof_strategy(RNS.Destination.PROVE_NONE)
|
||||
self.destination.set_link_established_callback(
|
||||
self._incoming_link_established
|
||||
)
|
||||
|
||||
self.channel = VoiceChannel(name=channel_name)
|
||||
|
||||
RNS.log(
|
||||
f"Relay listening on "
|
||||
f"{RNS.prettyhexrep(self.destination.hash)}",
|
||||
RNS.LOG_NOTICE,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Link lifecycle
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _incoming_link_established(self, link):
|
||||
link.is_incoming = True
|
||||
link.is_outgoing = False
|
||||
link.answered = False
|
||||
link.is_terminating = False
|
||||
|
||||
link.set_remote_identified_callback(self._caller_identified)
|
||||
link.set_link_closed_callback(self._link_closed)
|
||||
|
||||
self.signal(Signalling.STATUS_AVAILABLE, link)
|
||||
|
||||
RNS.log("Incoming link, awaiting identification", RNS.LOG_DEBUG)
|
||||
|
||||
def _caller_identified(self, link, identity):
|
||||
RNS.log(
|
||||
f"Caller identified: {RNS.prettyhexrep(identity.hash)}",
|
||||
RNS.LOG_NOTICE,
|
||||
)
|
||||
|
||||
participant = Participant(link, identity)
|
||||
|
||||
self.signal(Signalling.STATUS_RINGING, link)
|
||||
time.sleep(0.5)
|
||||
|
||||
self.signal(Signalling.STATUS_CONNECTING, link)
|
||||
self.channel.add_participant(participant, relay=self)
|
||||
self.signal(Signalling.STATUS_ESTABLISHED, link)
|
||||
|
||||
RNS.log(
|
||||
f"Call established with "
|
||||
f"{RNS.prettyhexrep(identity.hash)}, "
|
||||
f"{self.channel.get_participant_count()} participant(s) in "
|
||||
f"'{self.channel_name}'",
|
||||
RNS.LOG_NOTICE,
|
||||
)
|
||||
|
||||
def _link_closed(self, link):
|
||||
RNS.log(f"Link closed: {link}", RNS.LOG_DEBUG)
|
||||
self.channel.remove_participant(link.link_id)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# SignallingReceiver
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def signalling_received(self, signals, source):
|
||||
for signal in signals:
|
||||
if signal >= Signalling.PREFERRED_PROFILE:
|
||||
profile = signal - Signalling.PREFERRED_PROFILE
|
||||
RNS.log(
|
||||
f"Received preferred profile: 0x{profile:02x}",
|
||||
RNS.LOG_DEBUG,
|
||||
)
|
||||
else:
|
||||
RNS.log(
|
||||
f"Received signal: 0x{signal:02x}",
|
||||
RNS.LOG_DEBUG,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public API
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def announce(self):
|
||||
self.destination.announce()
|
||||
RNS.log("Relay announced on network", RNS.LOG_NOTICE)
|
||||
|
||||
def start(self):
|
||||
if self.should_run:
|
||||
return
|
||||
signal_module.signal(signal_module.SIGINT, self._sigint_handler)
|
||||
signal_module.signal(signal_module.SIGTERM, self._sigterm_handler)
|
||||
|
||||
self.should_run = True
|
||||
self.announce()
|
||||
|
||||
RNS.log(
|
||||
f"LXST Relay ready on channel '{self.channel_name}'",
|
||||
RNS.LOG_NOTICE,
|
||||
)
|
||||
RNS.log(
|
||||
f"Identity: {RNS.prettyhexrep(self.identity.hash)}",
|
||||
RNS.LOG_NOTICE,
|
||||
)
|
||||
RNS.log("Waiting for incoming calls...", RNS.LOG_NOTICE)
|
||||
|
||||
while self.should_run:
|
||||
time.sleep(1)
|
||||
|
||||
def stop(self):
|
||||
self.should_run = False
|
||||
RNS.log("Relay stopping...", RNS.LOG_NOTICE)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Signal handlers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _sigint_handler(self, sig, frame):
|
||||
self.stop()
|
||||
|
||||
def _sigterm_handler(self, sig, frame):
|
||||
self.stop()
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description=(
|
||||
"LXST Relay - Group voice channel bridge for "
|
||||
"Reticulum Telephony"
|
||||
)
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--config",
|
||||
"-c",
|
||||
default=None,
|
||||
help="path to configuration directory",
|
||||
type=str,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--rnsconfig",
|
||||
default=None,
|
||||
help="path to alternative Reticulum config directory",
|
||||
type=str,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--channel",
|
||||
"-n",
|
||||
default="default",
|
||||
help="voice channel name (default: 'default')",
|
||||
type=str,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--version",
|
||||
action="version",
|
||||
version="lxst-relay 0.1.0",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-v", "--verbose", action="count", default=0
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
relay = LXSTRelay(
|
||||
configdir=args.config,
|
||||
rnsconfigdir=args.rnsconfig,
|
||||
channel_name=args.channel,
|
||||
verbosity=args.verbose,
|
||||
)
|
||||
|
||||
try:
|
||||
relay.start()
|
||||
except KeyboardInterrupt:
|
||||
relay.stop()
|
||||
print("")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user