diff --git a/canopen/objectdictionary/__init__.py b/canopen/objectdictionary/__init__.py index fa694c56..aa3ed6ac 100644 --- a/canopen/objectdictionary/__init__.py +++ b/canopen/objectdictionary/__init__.py @@ -207,6 +207,8 @@ def __init__(self, name: str, index: int): self.name = name #: Storage location of index self.storage_location = None + #: CiA 306 ObjFlags bitfield + self.obj_flags: int = 0 self.subindices = {} self.names = {} @@ -266,6 +268,8 @@ def __init__(self, name: str, index: int): self.name = name #: Storage location of index self.storage_location = None + #: CiA 306 ObjFlags bitfield + self.obj_flags: int = 0 self.subindices = {} self.names = {} @@ -372,6 +376,10 @@ def __init__(self, name: str, index: int, subindex: int = 0): self.bit_definitions: dict[str, list[int]] = {} #: Storage location of index self.storage_location = None + #: CiA 306 ObjFlags bitfield + self.obj_flags: int = 0 + #: CiA 306 Denotation string (DCF only) + self.denotation: str = "" #: Can this variable be mapped to a PDO self.pdo_mappable = False diff --git a/canopen/objectdictionary/eds.py b/canopen/objectdictionary/eds.py index 312874d5..13cab50f 100644 --- a/canopen/objectdictionary/eds.py +++ b/canopen/objectdictionary/eds.py @@ -139,14 +139,17 @@ def import_eds(source, node_id): arr.add_member(last_subindex) arr.add_member(build_variable(eds, section, node_id, index, 1)) arr.storage_location = storage_location + arr.obj_flags = _get_obj_flags(eds, section) od.add_object(arr) elif object_type == objectcodes.ARRAY: arr = ODArray(name, index) arr.storage_location = storage_location + arr.obj_flags = _get_obj_flags(eds, section) od.add_object(arr) elif object_type == objectcodes.RECORD: record = ODRecord(name, index) record.storage_location = storage_location + record.obj_flags = _get_obj_flags(eds, section) od.add_object(record) continue @@ -252,6 +255,15 @@ def _revert_variable(var_type, value): return f"0x{value:02X}" +def _get_obj_flags(eds, section): + if eds.has_option(section, "ObjFlags"): + try: + return int(eds.get(section, "ObjFlags"), 0) + except ValueError: + pass + return 0 + + def build_variable(eds, section, node_id, index, subindex=0): """Creates a object dictionary entry. :param eds: String stream of the eds file @@ -330,6 +342,9 @@ def build_variable(eds, section, node_id, index, subindex=0): var.unit = eds.get(section, "Unit") except ValueError: pass + var.obj_flags = _get_obj_flags(eds, section) + if eds.has_option(section, "Denotation"): + var.denotation = eds.get(section, "Denotation") return var @@ -404,12 +419,19 @@ def export_variable(var, eds): if getattr(var, 'unit', '') != '': eds.set(section, "Unit", var.unit) + if getattr(var, 'obj_flags', 0) != 0: + eds.set(section, "ObjFlags", f"0x{var.obj_flags:X}") + if device_commisioning and getattr(var, 'denotation', '') != '': + eds.set(section, "Denotation", var.denotation) + def export_record(var, eds): section = f"{var.index:04X}" export_common(var, eds, section) eds.set(section, "SubNumber", f"0x{len(var.subindices):X}") ot = objectcodes.RECORD if isinstance(var, ODRecord) else objectcodes.ARRAY eds.set(section, "ObjectType", f"0x{ot:X}") + if getattr(var, 'obj_flags', 0) != 0: + eds.set(section, "ObjFlags", f"0x{var.obj_flags:X}") for i in var: export_variable(var[i], eds) diff --git a/test/sample.eds b/test/sample.eds index 1afe9965..7085d635 100644 --- a/test/sample.eds +++ b/test/sample.eds @@ -1017,3 +1017,32 @@ PDOMapping=0x0 Factor=ERROR Description= Unit= + +[3060] +ParameterName=Object with ObjFlags +ObjectType=0x7 +DataType=0x0007 +AccessType=rw +PDOMapping=0 +ObjFlags=0x1 + +[3063] +ParameterName=Record with ObjFlags +ObjectType=0x9 +ObjFlags=0x3 +SubNumber=0x2 + +[3063sub0] +ParameterName=Highest sub-index supported +ObjectType=0x7 +DataType=0x0005 +AccessType=ro +DefaultValue=0x01 +PDOMapping=0 + +[3063sub1] +ParameterName=Value +ObjectType=0x7 +DataType=0x0007 +AccessType=rw +PDOMapping=0 diff --git a/test/test_eds.py b/test/test_eds.py index 68f5ad3c..15a5a2c9 100644 --- a/test/test_eds.py +++ b/test/test_eds.py @@ -359,6 +359,69 @@ def verify_od(self, source, doctype): self.assertEqual(self.od.comments, exported_od.comments) + def test_reading_obj_flags(self): + var = self.od[0x3060] + self.assertIsInstance(var, canopen.objectdictionary.ODVariable) + self.assertEqual(var.obj_flags, 0x1) + + def test_reading_obj_flags_default(self): + """Standard objects without ObjFlags must have obj_flags == 0.""" + var = self.od[0x1017] # Producer heartbeat time — no ObjFlags in sample.eds + self.assertEqual(var.obj_flags, 0) + + def test_reading_obj_flags_record(self): + record = self.od[0x3063] + self.assertIsInstance(record, canopen.objectdictionary.ODRecord) + self.assertEqual(record.obj_flags, 0x3) + + def test_roundtrip_obj_flags(self): + import io + with io.StringIO() as dest: + canopen.export_od(self.od, dest, 'eds') + dest.name = 'mock.eds' + dest.seek(0) + od2 = canopen.import_od(dest) + self.assertEqual(od2[0x3060].obj_flags, 0x1) + self.assertEqual(od2[0x1017].obj_flags, 0) + + def test_roundtrip_obj_flags_record(self): + import io + with io.StringIO() as dest: + canopen.export_od(self.od, dest, 'eds') + dest.name = 'mock.eds' + dest.seek(0) + od2 = canopen.import_od(dest) + self.assertEqual(od2[0x3063].obj_flags, 0x3) + + def test_invalid_obj_flags_returns_zero(self): + import configparser + from canopen.objectdictionary.eds import _get_obj_flags + eds = configparser.RawConfigParser() + eds.optionxform = str + eds.add_section("3060") + eds.set("3060", "ObjFlags", "not_a_number") + self.assertEqual(_get_obj_flags(eds, "3060"), 0) + + def test_denotation_roundtrip_dcf(self): + import io + self.od[0x3060].denotation = 'FlaggedObject' + with io.StringIO() as dest: + canopen.export_od(self.od, dest, 'dcf') + dest.name = 'mock.dcf' + dest.seek(0) + od2 = canopen.import_od(dest) + self.assertEqual(od2[0x3060].denotation, 'FlaggedObject') + + def test_denotation_not_exported_in_eds_mode(self): + import io + self.od[0x3060].denotation = 'ShouldNotAppear' + with io.StringIO() as dest: + canopen.export_od(self.od, dest, 'eds') + dest.name = 'mock.eds' + dest.seek(0) + od2 = canopen.import_od(dest) + self.assertEqual(od2[0x3060].denotation, '') + if __name__ == "__main__": unittest.main()