Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 29 additions & 78 deletions udsoncan/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
_import_isotp_err = e

try:
from udsoncan.j2534 import J2534, TxStatusFlag, Protocol_ID, Error_ID, Ioctl_Flags, Ioctl_ID, SCONFIG_LIST
from udsoncan.j2534 import J2534, Protocol_ID, Error_ID, Ioctl_Flags, Ioctl_ID, SCONFIG_LIST
_import_j2534_err = None
except Exception as e:
_import_j2534_err = e
Expand Down Expand Up @@ -63,8 +63,7 @@ def send(self, data: Union[bytes, Request, Response], timeout: Optional[float] =

:returns: None
"""
if not self.is_open():
raise RuntimeError("Connection is not opened")
self.check_connection_opened()

if isinstance(data, Request) or isinstance(data, Response):
payload = data.get_payload()
Expand All @@ -79,6 +78,10 @@ def send(self, data: Union[bytes, Request, Response], timeout: Optional[float] =
else:
self.specific_send(payload)

def check_connection_opened(self):
if not self.is_open():
raise RuntimeError(self.__class__.__name__ + ' is not opened')

def wait_frame(self, timeout: Optional[float] = None, exception: bool = False) -> Optional[bytes]:
"""Waits for the reception of a frame of data from the underlying transport protocol

Expand All @@ -92,8 +95,7 @@ def wait_frame(self, timeout: Optional[float] = None, exception: bool = False) -
:returns: Received data
:rtype: bytes or None
"""
if not self.is_open():
raise RuntimeError("Connection is not opened")
self.check_connection_opened()

try:
frame = self.specific_wait_frame(timeout=timeout)
Expand Down Expand Up @@ -243,21 +245,13 @@ def specific_send(self, payload: bytes, timeout: Optional[float] = None) -> None
self.sock.send(payload)

def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]:
if not self.opened:
raise RuntimeError("Connection is not open")
self.check_connection_opened()

timedout = False
frame = None
try:
frame = self.rxqueue.get(block=True, timeout=timeout)
return self.rxqueue.get(block=True, timeout=timeout)
except queue.Empty:
timedout = True

if timedout:
raise TimeoutException("Did not received frame in time (timeout=%s sec)" % timeout)

return frame

def empty_rxqueue(self) -> None:
while not self.rxqueue.empty():
self.rxqueue.get()
Expand Down Expand Up @@ -359,22 +353,13 @@ def specific_send(self, payload: bytes, timeout: Optional[float] = None) -> None
self.tpsock.send(payload)

def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]:
if not self.opened:
raise RuntimeError("Connection is not open")
self.check_connection_opened()

timedout = False
frame = None
try:
frame = self.rxqueue.get(block=True, timeout=timeout)

return self.rxqueue.get(block=True, timeout=timeout)
except queue.Empty:
timedout = True

if timedout:
raise TimeoutException("Did not received ISOTP frame in time (timeout=%s sec)" % timeout)

return frame

def empty_rxqueue(self) -> None:
while not self.rxqueue.empty():
self.rxqueue.get()
Expand Down Expand Up @@ -442,17 +427,12 @@ def specific_send(self, payload: bytes, timeout: Optional[float] = None) -> None
self.touserqueue.put(payload, block=True, timeout=timeout)

def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]:
if not self.opened:
raise RuntimeError("Connection is not open")
self.check_connection_opened()

timedout = False
frame = None
try:
frame = self.fromuserqueue.get(block=True, timeout=timeout)
except queue.Empty:
timedout = True

if timedout:
raise TimeoutException("Did not receive frame from user queue in time (timeout=%s sec)" % timeout)

if self.mtu is not None:
Expand Down Expand Up @@ -577,8 +557,7 @@ def specific_send(self, payload: bytes, timeout: Optional[float] = None) -> None
self.isotp_layer.send(payload, send_timeout=timeout)

def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]:
if not self.opened:
raise RuntimeError("Connection is not opened")
self.check_connection_opened()

frame = self.isotp_layer.recv(block=True, timeout=timeout)
if frame is None:
Expand Down Expand Up @@ -649,25 +628,15 @@ def specific_send(self, payload: bytes, timeout: Optional[float] = None):
self.toIsoTPQueue.put(bytearray(payload)) # isotp.protocol.TransportLayer uses byte array. udsoncan is strict on bytes format

def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]:
if not self.opened:
raise RuntimeError("Connection is not open")
self.check_connection_opened()

timedout = False
frame = None
try:
frame = self.fromIsoTPQueue.get(block=True, timeout=timeout)
# isotp.protocol.TransportLayer uses bytearray. udsoncan is strict on bytes format
return bytes(frame)
except queue.Empty:
timedout = True

if timedout:
raise TimeoutException("Did not receive IsoTP frame from the Transport layer in time (timeout=%s sec)" % timeout)

if frame is None:
return None

# isotp.protocol.TransportLayer uses bytearray. udsoncan is strict on bytes format
return bytes(frame)

def empty_rxqueue(self) -> None:
while not self.fromIsoTPQueue.empty():
self.fromIsoTPQueue.get()
Expand Down Expand Up @@ -804,7 +773,7 @@ def __init__(self,

def open(self) -> "J2534Connection":
self.exit_requested = False
self.sem = threading.Semaphore()
self.interfaceSemaphore = threading.Semaphore()
self.rxthread = threading.Thread(target=self.rxthread_task, daemon=True)
self.rxthread.start()
self.opened = True
Expand All @@ -822,15 +791,15 @@ def is_open(self) -> bool:

def rxthread_task(self) -> None:
while not self.exit_requested:
self.sem.acquire()
self.interfaceSemaphore.acquire()
try:
result, data, numMessages = self.interface.PassThruReadMsgs(self.channelID, self.protocol.value, 1, 1)
result, data, numMessages = self.interface.PassThruReadMsgs(self.channelID, self.protocol.value, pNumMsgs=1)
if data is not None:
self.rxqueue.put(data)
except Exception:
self.logger.critical("Exiting J2534 rx thread")
self.exit_requested = True
self.sem.release()
self.interfaceSemaphore.release()
time.sleep(0.001)

def log_last_operation(self, exec_method: str, with_raise = False) -> None:
Expand Down Expand Up @@ -860,27 +829,19 @@ def specific_send(self, payload: bytes, timeout: Optional[float] = None):
timeout = 0 if timeout is None else timeout

# Fix for avoid ERR_CONCURRENT_API_CALL. Stop reading
self.sem.acquire()
self.interfaceSemaphore.acquire()
self.result = self.interface.PassThruWriteMsgs(self.channelID, payload, self.protocol.value, Timeout=int(timeout * 1000))
self.log_last_operation('PassThruWriteMsgs', with_raise=True)
self.sem.release()
self.interfaceSemaphore.release()

def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]:
if not self.opened:
raise RuntimeError("J2534 Connection is not open")
self.check_connection_opened()

timedout = False
frame = None
try:
frame = self.rxqueue.get(block=True, timeout=timeout)
return self.rxqueue.get(block=True, timeout=timeout)
except queue.Empty:
timedout = True

if timedout:
raise TimeoutException("Did not received response from J2534 RxQueue (timeout=%s sec)" % timeout)

return frame

def empty_rxqueue(self) -> None:
while not self.rxqueue.empty():
self.rxqueue.get()
Expand Down Expand Up @@ -942,21 +903,13 @@ def specific_send(self, payload: bytes, timeout: Optional[float] = None):
self.rxqueue.put(self.ResponseData[payload])

def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]:
if not self.opened:
raise RuntimeError("Fake Connection is not open")
self.check_connection_opened()

timedout = False
frame = None
try:
frame = self.rxqueue.get(block=True, timeout=timeout)
return self.rxqueue.get(block=True, timeout=timeout)
except queue.Empty:
timedout = True

if timedout:
raise TimeoutException("Did not received response from J2534 RxQueue (timeout=%s sec)" % timeout)

return frame

def empty_rxqueue(self) -> None:
while not self.rxqueue.empty():
self.rxqueue.get()
Expand Down Expand Up @@ -1002,13 +955,11 @@ def __init__(self, rx_id: int, tx_id: int, name: Optional[str] = None, *args, **
self.opened = False

def specific_send(self, payload: bytes, timeout: Optional[float] = None) -> None:
if self.conn is None or not self.opened:
raise RuntimeError("Connection is not opened")
self.check_connection_opened()
self.conn.send(payload)

def specific_wait_frame(self, timeout: Optional[float] = None) -> Optional[bytes]:
if not self.opened or self.conn is None:
raise RuntimeError("Connection is not open")
self.check_connection_opened()

frame = cast(Optional[bytes], self.conn.recv(timeout))

Expand All @@ -1034,7 +985,7 @@ def empty_rxqueue(self) -> None:
self.conn.empty()

def is_open(self) -> bool:
return self.opened
return self.conn and self.opened

def __enter__(self) -> "SyncAioIsotpConnection":
return self
Expand Down
Loading