Skip to content

Commit e1f489b

Browse files
committed
Merge branch 'OMCPath' into MP_merge
2 parents 1e3341f + 2dadf3e commit e1f489b

2 files changed

Lines changed: 264 additions & 0 deletions

File tree

OMPython/OMCSession.py

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,169 @@ def getClassNames(self, className=None, recursive=False, qualified=False, sort=F
271271
return self._ask(question='getClassNames', opt=opt)
272272

273273

274+
class OMCPathReal(pathlib.PurePosixPath):
275+
"""
276+
Implementation of a basic Path object which uses OMC as backend. The connection to OMC is provided via a
277+
OMCSessionZMQ session object.
278+
"""
279+
280+
def __init__(self, *path, session: OMCSessionZMQ):
281+
super().__init__(*path)
282+
self._session = session
283+
284+
def with_segments(self, *pathsegments):
285+
"""
286+
Create a new OMCPath object with the given path segments.
287+
288+
The original definition of Path is overridden to ensure session is set.
289+
"""
290+
return type(self)(*pathsegments, session=self._session)
291+
292+
def is_file(self) -> bool:
293+
"""
294+
Check if the path is a regular file.
295+
"""
296+
return self._session.sendExpression(f'regularFileExists("{self.as_posix()}")')
297+
298+
def is_dir(self) -> bool:
299+
"""
300+
Check if the path is a directory.
301+
"""
302+
return self._session.sendExpression(f'directoryExists("{self.as_posix()}")')
303+
304+
def read_text(self, encoding=None, errors=None) -> str:
305+
"""
306+
Read the content of the file represented by this path as text.
307+
308+
The additional arguments `encoding` and `errors` are only defined for compatibility with Path() definitions.
309+
"""
310+
return self._session.sendExpression(f'readFile("{self.as_posix()}")')
311+
312+
def write_text(self, data: str, encoding=None, errors=None, newline=None) -> bool:
313+
"""
314+
Write text data to the file represented by this path.
315+
316+
The additional arguments `encoding`, `errors`, and `newline` are only defined for compatibility with Path()
317+
definitions.
318+
"""
319+
if not isinstance(data, str):
320+
raise TypeError('data must be str, not %s' %
321+
data.__class__.__name__)
322+
323+
return self._session.sendExpression(f'writeFile("{self.as_posix()}", "{data}", false)')
324+
325+
def mkdir(self, mode=0o777, parents=False, exist_ok=False):
326+
"""
327+
Create a directory at the path represented by this OMCPath object.
328+
329+
The additional arguments `mode`, and `parents` are only defined for compatibility with Path() definitions.
330+
"""
331+
if self.is_dir() and not exist_ok:
332+
raise FileExistsError(f"Directory {self.as_posix()} already exists!")
333+
334+
return self._session.sendExpression(f'mkdir("{self.as_posix()}")')
335+
336+
def cwd(self):
337+
"""
338+
Returns the current working directory as an OMCPath object.
339+
"""
340+
cwd_str = self._session.sendExpression('cd()')
341+
return OMCPath(cwd_str, session=self._session)
342+
343+
def unlink(self, missing_ok: bool = False) -> bool:
344+
"""
345+
Unlink (delete) the file or directory represented by this path.
346+
"""
347+
res = self._session.sendExpression(f'deleteFile("{self.as_posix()}")')
348+
if not res and not missing_ok:
349+
raise FileNotFoundError(f"Cannot delete file {self.as_posix()} - it does not exists!")
350+
return res
351+
352+
def resolve(self, strict: bool = False) -> OMCPath:
353+
"""
354+
Resolve the path to an absolute path. This is done based on available OMC functions.
355+
"""
356+
if strict and not (self.is_file() or self.is_dir()):
357+
raise OMCSessionException(f"Path {self.as_posix()} does not exist!")
358+
359+
if self.is_file():
360+
omcpath = self._omc_resolve(self.parent.as_posix()) / self.name
361+
elif self.is_dir():
362+
omcpath = self._omc_resolve(self.as_posix())
363+
else:
364+
raise OMCSessionException(f"Path {self.as_posix()} is neither a file nor a directory!")
365+
366+
return omcpath
367+
368+
def _omc_resolve(self, pathstr: str) -> OMCPath:
369+
"""
370+
Internal function to resolve the path of the OMCPath object using OMC functions *WITHOUT* changing the cwd
371+
within OMC.
372+
"""
373+
expression = ('omcpath_cwd := cd(); '
374+
f'omcpath_check := cd("{pathstr}"); ' # check requested pathstring
375+
'cd(omcpath_cwd)')
376+
377+
try:
378+
result = self._session.sendExpression(command=expression, parsed=False)
379+
result_parts = result.split('\n')
380+
pathstr_resolved = result_parts[1]
381+
pathstr_resolved = pathstr_resolved[1:-1] # remove quotes
382+
383+
omcpath_resolved = self._session.omcpath(pathstr_resolved)
384+
except OMCSessionException as ex:
385+
raise OMCSessionException(f"OMCPath resolve failed for {pathstr}!") from ex
386+
387+
if not omcpath_resolved.is_file() and not omcpath_resolved.is_dir():
388+
raise OMCSessionException(f"OMCPath resolve failed for {pathstr} - path does not exist!")
389+
390+
return omcpath_resolved
391+
392+
def absolute(self) -> OMCPath:
393+
"""
394+
Resolve the path to an absolute path. This is done by calling resolve() as it is the best we can do
395+
using OMC functions.
396+
"""
397+
return self.resolve(strict=True)
398+
399+
def exists(self) -> bool:
400+
"""
401+
Semi replacement for pathlib.Path.exists().
402+
"""
403+
return self.is_file() or self.is_dir()
404+
405+
def size(self) -> int:
406+
"""
407+
Get the size of the file in bytes - this is a extra function and the best we can do using OMC.
408+
"""
409+
if not self.is_file():
410+
raise OMCSessionException(f"Path {self.as_posix()} is not a file!")
411+
412+
res = self._session.sendExpression(f'stat("{self.as_posix()}")')
413+
if res[0]:
414+
return int(res[1])
415+
416+
raise OMCSessionException(f"Error reading file size for path {self.as_posix()}!")
417+
418+
419+
if sys.version_info < (3, 12):
420+
warnings.warn(
421+
message="Python < 3.12 - using a limited compatibility class as OMCPath replacement.",
422+
category=DeprecationWarning,
423+
stacklevel=1,
424+
)
425+
426+
class OMCPathCompatibility(pathlib.PosixPath):
427+
428+
def size(self) -> int:
429+
return self.stat().st_size
430+
431+
OMCPath = OMCPathCompatibility # noqa: F811
432+
433+
else:
434+
OMCPath = OMCPathReal
435+
436+
274437
class OMCSessionZMQ:
275438

276439
def __init__(
@@ -325,6 +488,42 @@ def __del__(self):
325488

326489
self.omc_zmq = None
327490

491+
def omcpath(self, *path) -> OMCPath:
492+
"""
493+
Create an OMCPath object based on the given path segments and the current OMC session.
494+
"""
495+
496+
# fallback solution for Python < 3.12; a modified pathlib.Path object is used as OMCPath replacement
497+
if sys.version_info < (3, 12):
498+
# noinspection PyArgumentList
499+
return OMCPath(*path)
500+
else:
501+
return OMCPath(*path, session=self)
502+
503+
def omcpath_tempdir(self) -> OMCPath:
504+
"""
505+
Get a temporary directory using OMC.
506+
"""
507+
names = [str(uuid.uuid4()) for _ in range(100)]
508+
509+
tempdir_str = self.sendExpression("getTempDirectoryPath()")
510+
tempdir_base = self.omcpath(tempdir_str)
511+
tempdir: Optional[OMCPath] = None
512+
for name in names:
513+
# create a unique temporary directory name
514+
tempdir = tempdir_base / name
515+
516+
if tempdir.exists():
517+
continue
518+
519+
tempdir.mkdir(parents=True, exist_ok=False)
520+
break
521+
522+
if tempdir is None or not tempdir.is_dir():
523+
raise OMCSessionException("Cannot create a temporary directory!")
524+
525+
return tempdir
526+
328527
def execute(self, command: str):
329528
warnings.warn("This function is depreciated and will be removed in future versions; "
330529
"please use sendExpression() instead", DeprecationWarning, stacklevel=2)

tests/test_OMCPath.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import sys
2+
import OMPython
3+
import pytest
4+
5+
skip_on_windows = pytest.mark.skipif(
6+
sys.platform.startswith("win"),
7+
reason="OpenModelica Docker image is Linux-only; skipping on Windows.",
8+
)
9+
10+
skip_python_older_312 = pytest.mark.skipif(
11+
sys.version_info < (3, 12),
12+
reason="OMCPath only working for Python >= 3.12 (definition of pathlib.PurePath).",
13+
)
14+
15+
16+
@skip_on_windows
17+
@skip_python_older_312
18+
def test_OMCPath_docker():
19+
omcp = OMPython.OMCProcessDocker(docker="openmodelica/openmodelica:v1.25.0-minimal")
20+
om = OMPython.OMCSessionZMQ(omc_process=omcp)
21+
assert om.sendExpression("getVersion()") == "OpenModelica 1.25.0"
22+
23+
_run_OMCPath_checks(om)
24+
25+
del omcp
26+
del om
27+
28+
29+
@skip_python_older_312
30+
def test_OMCPath_local():
31+
om = OMPython.OMCSessionZMQ()
32+
33+
_run_OMCPath_checks(om)
34+
35+
del om
36+
37+
38+
@pytest.mark.skip(reason="Not able to run WSL on github")
39+
def test_OMCPath_WSL():
40+
omcp = OMPython.OMCProcessWSL(
41+
wsl_omc='omc',
42+
wsl_user='omc',
43+
timeout=30.0,
44+
)
45+
om = OMPython.OMCSessionZMQ(omc_process=omcp)
46+
47+
_run_OMCPath_checks(om)
48+
49+
del omcp
50+
del om
51+
52+
53+
def _run_OMCPath_checks(om: OMPython.OMCSessionZMQ):
54+
p1 = om.omcpath_tempdir()
55+
p2 = p1 / '..' / p1.name / 'test.txt'
56+
assert p2.is_file() is False
57+
assert p2.write_text('test')
58+
assert p2.is_file()
59+
p2 = p2.resolve().absolute()
60+
assert str(p2) == f"{str(p1)}/test.txt"
61+
assert p2.read_text() == "test"
62+
assert p2.is_file()
63+
assert p2.parent.is_dir()
64+
assert p2.unlink()
65+
assert p2.is_file() is False

0 commit comments

Comments
 (0)