Skip to content

Commit 4a89c56

Browse files
committed
Refactor createCSVData
I found the old code really hard to understand, so I decided to rewrite it from scratch. It should behave exactly the same (and the tests in test_ModelicaSystem prove that).
1 parent 1c50417 commit 4a89c56

1 file changed

Lines changed: 44 additions & 104 deletions

File tree

OMPython/ModelicaSystem.py

Lines changed: 44 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -801,112 +801,52 @@ def checkValidInputs(self, name):
801801
else:
802802
ModelicaSystemError('Error!!! Value must be in tuple format')
803803

804-
# To create csv file for inputs
805-
def createCSVData(self):
806-
sl = [] # Actual timestamps
807-
skip = False
808-
809-
# check for NONE in input list and replace with proper data (e.g) [(startTime, 0.0), (stopTime, 0.0)]
810-
tmpinputlist = {}
811-
for key, value in self.inputlist.items():
812-
if value is None:
813-
tmpinputlist[key] = [(float(self.simulateOptions["startTime"]), 0.0),
814-
(float(self.simulateOptions["stopTime"]), 0.0)]
804+
def createCSVData(self) -> None:
805+
start_time: float = float(self.simulateOptions["startTime"])
806+
stop_time: float = float(self.simulateOptions["stopTime"])
807+
808+
# Replace None inputs with a default constant zero signal
809+
inputs: dict[str, list[tuple[float, float]]] = {}
810+
for input_name, input_signal in self.inputlist.items():
811+
if input_signal is None:
812+
inputs[input_name] = [(start_time, 0.0), (stop_time, 0.0)]
815813
else:
816-
tmpinputlist[key] = value
817-
818-
inp = list(tmpinputlist.values())
819-
820-
for i in inp:
821-
cl = list()
822-
el = list()
823-
for t, x in i:
824-
cl.append(t)
825-
for i in cl:
826-
if skip is True:
827-
skip = False
828-
continue
829-
if i not in sl:
830-
el.append(i)
831-
else:
832-
elem_no = cl.count(i)
833-
sl_no = sl.count(i)
834-
if elem_no == 2 and sl_no == 1:
835-
el.append(i)
836-
skip = True
837-
sl = sl + el
838-
839-
sl.sort()
840-
for t in sl:
841-
for i in inp:
842-
for ttt in [tt[0] for tt in i]:
843-
if t not in [tt[0] for tt in i]:
844-
i.append((t, '?'))
845-
inpSortedList = list()
846-
sortedList = list()
847-
for i in inp:
848-
sortedList = sorted(i, key=lambda x: x[0])
849-
inpSortedList.append(sortedList)
850-
for i in inpSortedList:
851-
ind = 0
852-
for t, x in i:
853-
if x == '?':
854-
t1 = i[ind - 1][0]
855-
u1 = i[ind - 1][1]
856-
t2 = i[ind + 1][0]
857-
u2 = i[ind + 1][1]
858-
nex = 2
859-
while (u2 == '?'):
860-
u2 = i[ind + nex][1]
861-
t2 = i[ind + nex][0]
862-
nex += 1
863-
x = float(u1 + (u2 - u1) * (t - t1) / (t2 - t1))
864-
i[ind] = (t, x)
865-
ind += 1
866-
slSet = list()
867-
slSet = set(sl)
868-
for i in inpSortedList:
869-
tempTime = list()
870-
for (t, x) in i:
871-
tempTime.append(t)
872-
inSl = None
873-
inI = None
874-
for s in slSet:
875-
inSl = sl.count(s)
876-
inI = tempTime.count(s)
877-
if inSl != inI:
878-
test = list()
879-
test = [(x, y) for x, y in i if x == s]
880-
i.append(test[0])
881-
newInpList = list()
882-
tempSorting = list()
883-
for i in inpSortedList:
884-
# i.sort() => just sorting might not work so need to sort according to 1st element of a tuple
885-
tempSorting = sorted(i, key=lambda x: x[0])
886-
newInpList.append(tempSorting)
887-
888-
interpolated_inputs_all = list()
889-
for i in newInpList:
890-
templist = list()
891-
for (t, x) in i:
892-
templist.append(x)
893-
interpolated_inputs_all.append(templist)
894-
895-
name = ','.join(list(self.inputlist.keys()))
896-
name = f'time,{name},end'
897-
898-
a = ''
899-
l = []
900-
l.append(name)
901-
for i in range(0, len(sl)):
902-
a = f'{float(sl[i])},{",".join(str(float(inppp[i])) for inppp in interpolated_inputs_all)},0'
903-
l.append(a)
904-
905-
self.csvFile = (pathlib.Path(self.tempdir) / f'{self.modelName}.csv').as_posix()
814+
inputs[input_name] = input_signal
815+
816+
# Collect all unique timestamps across all input signals
817+
all_times = np.array(
818+
sorted({t for signal in inputs.values() for t, _ in signal}),
819+
dtype=float
820+
)
821+
822+
# Interpolate missing values
823+
interpolated_inputs: dict[str, np.ndarray] = {}
824+
for signal_name, signal_values in inputs.items():
825+
signal = np.array(signal_values)
826+
interpolated_inputs[signal_name] = np.interp(
827+
all_times,
828+
signal[:, 0], # times
829+
signal[:, 1] # values
830+
)
831+
832+
# Write CSV file
833+
input_names = list(interpolated_inputs.keys())
834+
header = ['time'] + input_names + ['end']
835+
836+
csv_rows = [header]
837+
for i, t in enumerate(all_times):
838+
row = [
839+
t, # time
840+
*(interpolated_inputs[name][i] for name in input_names), # input values
841+
0 # trailing 'end' column
842+
]
843+
csv_rows.append(row)
844+
845+
self.csvFile: str = (pathlib.Path(self.tempdir) / f'{self.modelName}.csv').as_posix()
846+
906847
with open(self.csvFile, "w", newline="") as f:
907-
writer = csv.writer(f, delimiter='\n')
908-
writer.writerow(l)
909-
f.close()
848+
writer = csv.writer(f)
849+
writer.writerows(csv_rows)
910850

911851
# to convert Modelica model to FMU
912852
def convertMo2Fmu(self, version="2.0", fmuType="me_cs", fileNamePrefix="<default>", includeResources=True): # 19

0 commit comments

Comments
 (0)