Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions serialx/platforms/serial_posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import array
import asyncio
import errno
import fcntl
Expand Down Expand Up @@ -320,8 +321,7 @@ def _get_modem_pins(self) -> ModemPins:
"""Get current modem control bits."""
assert self._fileno is not None

# A `bytearray` is critical here: `bytes` will not be mutated
buffer = bytearray((0x00000000).to_bytes(4, "little"))
buffer = array.array("i", [0x00000000])

try:
fcntl.ioctl(self._fileno, termios.TIOCMGET, buffer)
Expand All @@ -330,7 +330,7 @@ def _get_modem_pins(self) -> ModemPins:
LOGGER.debug("Device is not a serial port, cannot get modem pins")
return ModemPins()

n = int.from_bytes(buffer, "little")
n = buffer[0]
return ModemPins(
**{
name: PinState.HIGH if n & bit else PinState.LOW
Expand All @@ -353,21 +353,21 @@ def _set_modem_pins(self, modem_pins: ModemPins) -> None:
if all_pins_set:
value = modem_pins_as_int(modem_pins)
LOGGER.debug("Setting all with TIOCMSET: 0x%08X", value)
fcntl.ioctl(self._fileno, termios.TIOCMSET, value.to_bytes(4, "little"))
fcntl.ioctl(self._fileno, termios.TIOCMSET, array.array("i", [value]))
else:
to_set = modem_pins_mask_of_value(modem_pins, PinState.HIGH)
to_clear = modem_pins_mask_of_value(modem_pins, PinState.LOW)

if to_set:
LOGGER.debug("Setting TIOCMBIS: 0x%08X", to_set)
fcntl.ioctl(
self._fileno, termios.TIOCMBIS, to_set.to_bytes(4, "little")
self._fileno, termios.TIOCMBIS, array.array("i", [to_set])
)

if to_clear:
LOGGER.debug("TIOCMBIC: 0x%08X", to_clear)
fcntl.ioctl(
self._fileno, termios.TIOCMBIC, to_clear.to_bytes(4, "little")
self._fileno, termios.TIOCMBIC, array.array("i", [to_clear])
)
except OSError as exc:
if exc.errno == errno.ENOTTY:
Expand Down Expand Up @@ -461,20 +461,20 @@ def _write(self, data: Buffer, *, timeout: float | None) -> int:
def num_unread_bytes(self) -> int:
"""Return the number of bytes waiting to be read."""
assert self._fileno is not None
buffer = bytearray((0x00000000).to_bytes(4, "little"))
buffer = array.array("i", [0x00000000])

fcntl.ioctl(self._fileno, termios.FIONREAD, buffer)

return int.from_bytes(buffer, "little")
return buffer[0]

def num_unwritten_bytes(self) -> int:
"""Return the number of bytes waiting to be written."""
assert self._fileno is not None
buffer = bytearray((0x00000000).to_bytes(4, "little"))
buffer = array.array("i", [0x00000000])

fcntl.ioctl(self._fileno, termios.TIOCOUTQ, buffer)

return int.from_bytes(buffer, "little")
return buffer[0]

def _reset_read_buffer(self) -> None:
"""Reset the read buffer."""
Expand Down
117 changes: 117 additions & 0 deletions tests/test_serial_posix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""POSIX serial port tests."""

import sys

import pytest

if sys.platform in ("win32", "emscripten"):
pytest.skip("POSIX-only tests", allow_module_level=True)

import array
import errno
import termios
from typing import Any
from unittest.mock import patch

from serialx import ModemPins, PinState
from serialx.platforms.serial_posix import PosixSerial


def test_num_unread_bytes_uses_native_int_ioctl_buffer() -> None:
"""FIONREAD writes a native C int, not a little-endian byte string."""

def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int:
assert request == termios.FIONREAD
assert isinstance(arg, array.array)
assert arg.typecode == "i"
arg[0] = 123
return 0

serial = PosixSerial(fileno=1)
with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl):
assert serial.num_unread_bytes() == 123


def test_num_unwritten_bytes_uses_native_int_ioctl_buffer() -> None:
"""TIOCOUTQ writes a native C int, not a little-endian byte string."""

def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int:
assert request == termios.TIOCOUTQ
assert isinstance(arg, array.array)
assert arg.typecode == "i"
arg[0] = 456
return 0

serial = PosixSerial(fileno=1)
with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl):
assert serial.num_unwritten_bytes() == 456


def test_get_modem_pins_uses_native_int_ioctl_buffer() -> None:
"""TIOCMGET writes a native C int, not a little-endian byte string."""

def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int:
assert request == termios.TIOCMGET
assert isinstance(arg, array.array)
assert arg.typecode == "i"
arg[0] = termios.TIOCM_DTR
return 0

serial = PosixSerial(fileno=1)
with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl):
pins = serial.get_modem_pins()

assert pins.dtr is PinState.HIGH
assert pins.rts is PinState.LOW


def test_set_modem_pins_uses_native_int_ioctl_buffer() -> None:
"""TIOCMSET reads a native C int, not a little-endian byte string."""

def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int:
assert request == termios.TIOCMSET
assert isinstance(arg, array.array)
assert arg.typecode == "i"
assert arg[0] & termios.TIOCM_DTR
assert arg[0] & termios.TIOCM_RTS
return 0

serial = PosixSerial(fileno=1)
with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl):
serial.set_modem_pins(
ModemPins(
le=PinState.LOW,
dtr=PinState.HIGH,
rts=PinState.HIGH,
st=PinState.LOW,
sr=PinState.LOW,
cts=PinState.LOW,
car=PinState.LOW,
rng=PinState.LOW,
dsr=PinState.LOW,
)
)


def test_partial_set_modem_pins_uses_native_int_ioctl_buffer() -> None:
"""TIOCMBIS/TIOCMBIC read native C ints, not little-endian byte strings."""

seen_requests = []

def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int:
assert isinstance(arg, array.array)
assert arg.typecode == "i"
seen_requests.append(request)
if request == termios.TIOCMBIS:
assert arg[0] == termios.TIOCM_DTR
elif request == termios.TIOCMBIC:
assert arg[0] == termios.TIOCM_RTS
else:
raise OSError(errno.ENOTTY, "unexpected ioctl")
return 0

serial = PosixSerial(fileno=1)
with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl):
serial.set_modem_pins(dtr=True, rts=False)

assert seen_requests == [termios.TIOCMBIS, termios.TIOCMBIC]
Loading