Re: [PATCH BlueZ 3/5] test: Add a script to test ASHA

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hi Arun,

On Wed, May 8, 2024 at 11:48 AM Arun Raghavan <arun@xxxxxxxxxxxxx> wrote:
>
> Plays out an audio file to the device. Depends on GStreamer for media
> file reading and decoding (specifically, gstreamer core,
> gst-plugins-base, gst-ffmpeg, and gst-python, or equivalent packages).
> ---
>  test/simple-asha | 158 +++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 158 insertions(+)
>  create mode 100755 test/simple-asha
>
> diff --git a/test/simple-asha b/test/simple-asha
> new file mode 100755
> index 000000000..feff9d29c
> --- /dev/null
> +++ b/test/simple-asha
> @@ -0,0 +1,158 @@
> +#!/usr/bin/env python3
> +# SPDX-License-Identifier: LGPL-2.1-or-later
> +
> +import os
> +import signal
> +import sys
> +
> +import dbus
> +import dbus.service
> +import dbus.mainloop.glib
> +
> +import gi
> +
> +gi.require_version("Gst", "1.0")
> +gi.require_version("GLib", "2.0")
> +from gi.repository import GLib, Gst
> +
> +import bluezutils
> +
> +mainloop = None
> +pipeline = None
> +seqnum: int = 0
> +
> +
> +def signal_handler(_sig, _frame):
> +    print("Got interrupt")
> +    mainloop.quit()
> +
> +
> +signal.signal(signal.SIGINT, signal_handler)
> +
> +
> +def usage():
> +    print(f"Usage: simple-asha <remote addr> <audio file name> (optional volume 0-127)")
> +
> +
> +def start_playback(fd: int):
> +    global mainloop, pipeline
> +
> +    outdata = bytearray(161)
> +
> +    Gst.init(None)
> +
> +    pipeline = Gst.parse_launch(
> +        f"""
> +          filesrc location="{sys.argv[2]}" ! decodebin !
> +          audioconvert ! audioresample !
> +          audiobuffersplit output-buffer-duration="20/1000" ! avenc_g722 !
> +          appsink name=sink emit-signals=true
> +    """
> +    )
> +
> +    def on_new_sample(sink):
> +        global seqnum
> +
> +        sample = sink.emit("pull-sample")
> +        buf = sample.get_buffer()
> +
> +        with buf.map(Gst.MapFlags.READ) as info:
> +            pos = 0
> +
> +            if info.size != 160:
> +                print("Unexpected buffer size: ", info.size)
> +
> +            outdata[pos] = seqnum % 256
> +            pos += 1
> +
> +            for byte in info.data:
> +                outdata[pos] = byte
> +                pos += 1
> +
> +            try:
> +                n = os.write(fd, outdata)
> +                if n != 161:
> +                    print("Wrote less than expected: ", n)
> +            except:
> +                return Gst.FlowReturn.ERROR
> +
> +        seqnum += 1
> +
> +        return Gst.FlowReturn.OK
> +
> +    sink = pipeline.get_by_name("sink")
> +    sink.connect("new-sample", on_new_sample)
> +
> +    pipeline.set_state(Gst.State.PLAYING)
> +
> +    def bus_message(_bus, message, _data) -> bool:
> +        typ = message.type
> +
> +        if typ == Gst.MessageType.EOS:
> +            print("End of stream")
> +            mainloop.quit()
> +        elif typ == Gst.MessageType.ERROR:
> +            err, debug = message.parse_error()
> +            print(f"Pipeline error: {err} ({debug})")
> +            mainloop.quit()
> +
> +    bus = pipeline.get_bus()
> +    bus.add_watch(GLib.PRIORITY_DEFAULT, bus_message, None)
> +
> +
> +if __name__ == "__main__":
> +    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
> +
> +    mainloop = GLib.MainLoop()
> +    bus = dbus.SystemBus()
> +
> +    if (len(sys.argv) == 3) or (len(sys.argv) == 4):
> +        device = bluezutils.find_device(sys.argv[1])
> +        if device is None:
> +            print("Could not find device: ", sys.argv[1])
> +            exit(255)
> +    else:
> +        usage()
> +        sys.exit(255)
> +
> +    asha = bus.get_object("org.bluez", device.object_path + "/asha")
> +    media = dbus.Interface(
> +        bus.get_object("org.bluez", device.object_path + "/asha"),
> +        "org.bluez.MediaEndpoint1",
> +    )
> +
> +    props = asha.GetAll(
> +        "org.bluez.MediaEndpoint1",
> +        dbus_interface="org.freedesktop.DBus.Properties",
> +    )
> +    path = props["Transport"]
> +
> +    print("Trying to acquire", path)
> +
> +    transport = dbus.Interface(
> +        bus.get_object("org.bluez", path),
> +        "org.bluez.MediaTransport1",
> +    )
> +
> +    # Keep default volume at 25%
> +    volume = 32
> +    if len(sys.argv) == 4:
> +        volume = int(sys.argv[3])
> +        if volume < 0 or volume > 127:
> +            print("Volume must be between 0 (mute) and 127 (max)")
> +
> +    transport.Set(
> +        "org.bluez.MediaTransport1",
> +        "Volume",
> +        dbus.UInt16(volume, variant_level=1),
> +        dbus_interface="org.freedesktop.DBus.Properties",
> +    )
> +
> +    (fd, imtu, omtu) = transport.Acquire()
> +
> +    start_playback(fd.take())
> +
> +    mainloop.run()
> +
> +    pipeline.set_state(Gst.State.NULL)
> +    transport.Release()
> --
> 2.45.0

While I don't mind having a python example I think we are much better
of adding such support in bluetoothctl, most should already work with
transport submenu but perhaps we want to add support for gstreamer
pipeline instead of just a file which would be useful for creating
A2DP sbc and BAP lc3 streams.

>


-- 
Luiz Augusto von Dentz





[Index of Archives]     [Bluez Devel]     [Linux Wireless Networking]     [Linux Wireless Personal Area Networking]     [Linux ATH6KL]     [Linux USB Devel]     [Linux Media Drivers]     [Linux Audio Users]     [Linux Kernel]     [Linux SCSI]     [Big List of Linux Books]

  Powered by Linux