From: Bartosz Golaszewski <bartosz.golaszewski@xxxxxxxxxx> Certain polling APIs in the standard library - most notably the select() function and the poll class - allow polling any object that implements the fileno() method returning the underlying file descriptor number. Implement fileno() for Chip and LineRequest, which allows users to do: rd, _, _ = select([chip/request], [], [], 1) where rd will contain the actual object passed to select(), which makes reading events afterwards easier. Reviewed-by: Vincent Fazio <vfazio@xxxxxxxxxxx> Signed-off-by: Bartosz Golaszewski <bartosz.golaszewski@xxxxxxxxxx> --- Changes in v2: - Add return value type hints to test functions - Link to v1: https://lore.kernel.org/r/20241210-python-fileno-v1-1-c811cb70e122@xxxxxxxxxx --- bindings/python/gpiod/chip.py | 6 +++++ bindings/python/gpiod/line_request.py | 6 +++++ bindings/python/tests/tests_edge_event.py | 37 +++++++++++++++++++++++++++++++ bindings/python/tests/tests_info_event.py | 18 +++++++++++++++ 4 files changed, 67 insertions(+) diff --git a/bindings/python/gpiod/chip.py b/bindings/python/gpiod/chip.py index 30201b5..ddd07b8 100644 --- a/bindings/python/gpiod/chip.py +++ b/bindings/python/gpiod/chip.py @@ -344,6 +344,12 @@ class Chip: return request + def fileno(self) -> int: + """ + Return the underlying file descriptor. + """ + return self.fd + def __repr__(self) -> str: """ Return a string that can be used to re-create this chip object. diff --git a/bindings/python/gpiod/line_request.py b/bindings/python/gpiod/line_request.py index 9471442..ef53e16 100644 --- a/bindings/python/gpiod/line_request.py +++ b/bindings/python/gpiod/line_request.py @@ -224,6 +224,12 @@ class LineRequest: return cast(_ext.Request, self._req).read_edge_events(max_events) + def fileno(self) -> int: + """ + Return the underlying file descriptor. + """ + return self.fd + def __str__(self) -> str: """ Return a user-friendly, human-readable description of this request. 
diff --git a/bindings/python/tests/tests_edge_event.py b/bindings/python/tests/tests_edge_event.py index d7766ec..bf1685c 100644 --- a/bindings/python/tests/tests_edge_event.py +++ b/bindings/python/tests/tests_edge_event.py @@ -4,6 +4,7 @@ import time from datetime import timedelta from functools import partial +from select import select from threading import Thread from typing import Optional from unittest import TestCase @@ -201,6 +202,42 @@ class ReadingMultipleEdgeEvents(TestCase): self.global_seqno += 1 +class PollLineRequestObject(TestCase): + def setUp(self) -> None: + self.sim = gpiosim.Chip(num_lines=8) + self.request = gpiod.request_lines( + self.sim.dev_path, {2: gpiod.LineSettings(edge_detection=Edge.BOTH)} + ) + self.thread: Optional[Thread] = None + + def tearDown(self) -> None: + if self.thread: + self.thread.join() + del self.thread + self.request.release() + del self.request + del self.sim + + def trigger_rising_edge(self, offset: int) -> None: + time.sleep(0.05) + self.sim.set_pull(offset, Pull.UP) + + def test_select_request_object(self) -> None: + self.thread = Thread(target=partial(self.trigger_rising_edge, 2)) + self.thread.start() + + rd, wr, ex = select([self.request], [], [], 1) + self.assertFalse(wr) + self.assertFalse(ex) + self.assertEqual(rd[0], self.request) + + events = rd[0].read_edge_events() + self.assertEqual(len(events), 1) + + event = events[0] + self.assertEqual(event.line_offset, 2) + + class EdgeEventStringRepresentation(TestCase): def test_edge_event_str(self) -> None: sim = gpiosim.Chip() diff --git a/bindings/python/tests/tests_info_event.py b/bindings/python/tests/tests_info_event.py index e726a54..31dc952 100644 --- a/bindings/python/tests/tests_info_event.py +++ b/bindings/python/tests/tests_info_event.py @@ -7,6 +7,7 @@ import threading import time from dataclasses import FrozenInstanceError from functools import partial +from select import select from typing import Optional from unittest import TestCase @@ -131,6 
+132,23 @@ class WatchingInfoEventWorks(TestCase): self.assertGreater(ts_rel, ts_rec) self.assertGreater(ts_rec, ts_req) + def test_select_chip_object(self) -> None: + info = self.chip.watch_line_info(7) + + self.thread = threading.Thread( + target=partial(request_reconfigure_release_line, self.sim.dev_path, 7) + ) + self.thread.start() + + rd, wr, ex = select([self.chip], [], [], 1) + self.assertFalse(wr) + self.assertFalse(ex) + self.assertEqual(rd[0], self.chip) + + event = rd[0].read_info_event() + self.assertEqual(event.event_type, _EventType.LINE_REQUESTED) + self.assertEqual(event.line_info.offset, 7) + class UnwatchingLineInfo(TestCase): def setUp(self) -> None: --- base-commit: 6d9133a259e64da5e03c7e7784f0f27de7b3e59f change-id: 20241210-python-fileno-1ed9c7bf413a Best regards, -- Bartosz Golaszewski <bartosz.golaszewski@xxxxxxxxxx>