-
Notifications
You must be signed in to change notification settings - Fork 68
Expand file tree
/
Copy pathsensitivity_analysis.py
More file actions
499 lines (415 loc) · 17.8 KB
/
sensitivity_analysis.py
File metadata and controls
499 lines (415 loc) · 17.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
# -*- coding: utf-8 -*-
# =============================================================================
# Global Sensitivity Analysis (GSA) functions and class for the Delta
# Moment-Independent measure based on Monte Carlo simulation LCA results.
# see: https://salib.readthedocs.io/en/latest/api.html#delta-moment-independent-measure
# =============================================================================
import os
import traceback
from time import time
from logging import getLogger
import bw2calc as bc
import numpy as np
import pandas as pd
from SALib.analyze import delta
# SALib>=1.5 can call `numpy.trapezoid`, which is only available in NumPy 2.x.
# Keep compatibility with NumPy 1.x environments by aliasing to `trapz`.
# Can be removed when we move to numpy>1
if not hasattr(np, "trapezoid"):
np.trapezoid = np.trapz
from activity_browser.mod import bw2data as bd
from ..settings import ab_settings
from .montecarlo import MonteCarloLCA, perform_MonteCarlo_LCA
try:
# attempt bw25 import
from bw_graph_tools.graph_traversal import NewNodeEachVisitGraphTraversal
except ImportError:
# standard import on failure
from bw2calc import GraphTraversal
log = getLogger(__name__)
def get_lca(fu, method):
    """Run a deterministic (non-stochastic) LCA and return the LCA object.

    The reverse dictionaries are attached to the returned object so that
    matrix indices can later be mapped back to activity/flow keys.
    """
    lca = bc.LCA(fu, method=method)
    lca.lci()
    lca.lcia()
    log.info(f"Non-stochastic LCA score: {lca.score}")
    # attach reverse dictionaries (index -> key) for later lookups
    (
        lca.activity_dict_rev,
        lca.product_dict_rev,
        lca.biosphere_dict_rev,
    ) = lca.reverse_dict()
    return lca
def filter_technosphere_exchanges(lca, cutoff=0.05, max_calc=1000):
    """Use brightway's GraphTraversal to identify the relevant
    technosphere exchanges in a non-stochastic LCA."""
    # NOTE(review): if the module-level `bw_graph_tools` import failed, only
    # the legacy `bw2calc.GraphTraversal` was imported and the name below is
    # undefined (NameError) — confirm whether the legacy path must still work.
    start = time()
    res = NewNodeEachVisitGraphTraversal.calculate(
        lca, cutoff=cutoff, max_calc=int(max_calc)
    )
    # keep every edge except the artificial head node (consumer index -1)
    # that the graph traversal introduces
    technosphere_exchange_indices = [
        (edge.producer_index, edge.consumer_index)
        for edge in res["edges"]
        if edge.consumer_index != -1
    ]
    log.info(
        "TECHNOSPHERE {} filtering resulted in {} of {} exchanges and took {} iterations in {} seconds.".format(
            lca.technosphere_matrix.shape,
            len(technosphere_exchange_indices),
            lca.technosphere_matrix.getnnz(),
            res["calculation_count"],
            np.round(time() - start, 2),
        )
    )
    return technosphere_exchange_indices
def filter_biosphere_exchanges(lca, cutoff=0.005):
    """Reduce biosphere exchanges to those that matter for a given impact
    category in a non-stochastic LCA.

    Keeps the cells of the characterized inventory whose absolute value
    exceeds ``cutoff`` times the absolute total score.

    Parameters
    ----------
    lca : LCA object with ``characterized_inventory`` and ``score`` computed
    cutoff : float
        Relative contribution threshold (fraction of the total score).

    Returns
    -------
    list of (row, col) tuples
        Indices of the retained biosphere exchanges.
    """
    start = time()
    inv = lca.characterized_inventory
    # `abs(lca.score / (1 / cutoff))` simplifies to `abs(lca.score) * cutoff`;
    # the simplified form also avoids ZeroDivisionError when cutoff == 0.
    threshold = abs(lca.score) * cutoff
    finv = inv.multiply(abs(inv) > threshold)
    biosphere_exchange_indices = list(zip(*finv.nonzero()))
    explained_fraction = finv.sum() / lca.score
    log.info(
        "BIOSPHERE {} filtering resulted in {} of {} exchanges ({}% of total impact) and took {} seconds.".format(
            inv.shape,
            finv.nnz,
            inv.nnz,
            np.round(explained_fraction * 100, 2),
            np.round(time() - start, 2),
        )
    )
    return biosphere_exchange_indices
def get_exchanges(lca, indices, biosphere=False, only_uncertain=True):
    """Resolve (row, col) matrix indices to the actual exchange objects.

    By default get only exchanges that have uncertainties.

    Returns
    -------
    exchanges : list
        List of exchange objects
    indices : list of tuples
        List of indices
    """
    # rows refer to biosphere flows or to activities depending on the matrix
    source_lookup = lca.biosphere_dict_rev if biosphere else lca.activity_dict_rev
    exchanges = []
    for row, col in indices:
        from_act = bd.get_activity(source_lookup[row])
        to_act = bd.get_activity(lca.activity_dict_rev[col])
        # several exchanges may link the same pair of activities, so every
        # match is collected rather than stopping at the first one
        exchanges.extend(
            exc for exc in to_act.exchanges() if exc.input == from_act.key
        )
    # sanity check: each index must have produced at least one exchange
    if len(exchanges) < len(indices):
        raise ValueError(
            "Error: mismatch between indices provided ({}) and Exchanges received ({}).".format(
                len(indices), len(exchanges)
            )
        )
    # by default drop exchanges and indices that have no uncertainties
    if only_uncertain:
        exchanges, indices = drop_no_uncertainty_exchanges(exchanges, indices)
    return exchanges, indices
def drop_no_uncertainty_exchanges(excs, indices):
    """Filter out exchanges (and their matrix indices) without uncertainty.

    Parameters
    ----------
    excs : list of exchange objects (dict-like, read via ``.get``)
    indices : list of (row, col) tuples, parallel to ``excs``

    Returns
    -------
    (kept_excs, kept_indices) : tuple of lists
        Only the entries whose uncertainty type describes a real distribution.
    """
    kept_excs = list()
    kept_indices = list()
    for exc, ind in zip(excs, indices):
        utype = exc.get("uncertainty type")
        # stats_arrays codes: 0 = undefined, 1 = no uncertainty. Only types
        # > 1 are stochastic; the previous `>= 1` check wrongly kept type 1,
        # inconsistent with the `uncertainty_type <= 1` filter used for CFs.
        if utype is not None and utype > 1:
            kept_excs.append(exc)
            kept_indices.append(ind)
    log.info(
        "Dropping {} exchanges of {} with no uncertainty. {} remaining.".format(
            len(excs) - len(kept_excs), len(excs), len(kept_excs)
        )
    )
    return kept_excs, kept_indices
def get_exchanges_dataframe(exchanges, indices, biosphere=False):
    """Returns a Dataframe from the exchange data and a bit of additional information."""
    for exc, idx in zip(exchanges, indices):
        source = bd.get_activity(exc.get("input"))
        target = bd.get_activity(exc.get("output"))
        # GSA name (needs to yield unique labels!)
        if biosphere:
            gsa_name = "B: {} // {} ({}) [{}]".format(
                source.get("name", ""),
                target.get("name", ""),
                target.get("reference product", ""),
                target.get("location", ""),
            )
        else:
            gsa_name = "T: {} FROM {} [{}] TO {} ({}) [{}]".format(
                source.get("reference product", ""),
                source.get("name", ""),
                source.get("location", ""),
                target.get("name", ""),
                target.get("reference product", ""),
                target.get("location", ""),
            )
        # annotate the exchange in place with metadata used downstream
        exc.update(
            {
                "index": idx,
                "from name": source.get("name", np.nan),
                "from location": source.get("location", np.nan),
                "to name": target.get("name", np.nan),
                "to location": target.get("location", np.nan),
                "GSA name": gsa_name,
            }
        )
    return pd.DataFrame(exchanges)
def get_CF_dataframe(lca, only_uncertain_CFs=True):
    """Returns a dataframe with the metadata for the characterization factors
    (in the biosphere matrix). Filters non-stochastic CFs if desired (default).

    Parameters
    ----------
    lca : LCA object with ``biosphere_dict_rev`` attached (see ``get_lca``)
    only_uncertain_CFs : bool
        Skip CFs whose uncertainty type is undefined (0) or "no uncertainty" (1).

    Returns
    -------
    pd.DataFrame
        One row per kept CF, indexed by its position in ``lca.cf_params``;
        empty if nothing is kept.
    """
    data = dict()
    for params_index, row in enumerate(lca.cf_params):
        # stats_arrays types 0 (undefined) and 1 (no uncertainty) are not stochastic
        if only_uncertain_CFs and row["uncertainty_type"] <= 1:
            continue
        cf_index = row["row"]  # row of this CF in the biosphere matrix
        bio_act = bd.get_activity(lca.biosphere_dict_rev[cf_index])
        data[params_index] = bio_act.as_dict()
        # copy the structured-array fields (loc, scale, uncertainty_type, ...)
        for name in row.dtype.names:
            data[params_index][name] = row[name]
        data[params_index]["index"] = cf_index
        # GSA name must yield a unique label across all GSA inputs
        data[params_index]["GSA name"] = (
            "CF: " + bio_act["name"] + str(bio_act["categories"])
        )
    log.info(
        # fixed typo in log message: "characteriation" -> "characterization"
        "CHARACTERIZATION FACTORS filtering resulted in including {} of {} characterization factors.".format(
            len(data),
            len(lca.cf_params),
        )
    )
    df = pd.DataFrame(data).T
    df.rename(columns={"uncertainty_type": "uncertainty type"}, inplace=True)
    return df
def get_parameters_DF(mc):
    """Build a metadata DataFrame for the parameters of the MC simulation.

    Returns an empty DataFrame when the simulation includes no parameters.
    """
    # guard clause: empty parameter_data means no parameters were sampled
    if not mc.parameter_data:
        log.info("PARAMETERS: None included.")
        return pd.DataFrame()
    dfp = pd.DataFrame(mc.parameter_data).T
    dfp["GSA name"] = "P: " + dfp["name"]
    log.info(f"PARAMETERS: {len(dfp)}")
    return dfp
def get_exchange_values(matrix, indices):
    """Look up the matrix value for each (row, column) index pair."""
    return list(map(matrix.__getitem__, indices))
def get_X(matrix_list, indices):
    """Assemble the GSA input matrix: one row per Monte Carlo run, one
    column per sampled exchange (identified by its matrix index pair)."""
    X = np.zeros((len(matrix_list), len(indices)))
    for run, matrix in enumerate(matrix_list):
        X[run, :] = [matrix[idx] for idx in indices]
    return X
def get_X_CF(mc, dfcf, method):
    """Get the characterization factors used for each model run. Only those CFs
    that are in the dfcf dataframe will be returned (i.e. by default only the
    CFs that have uncertainties)."""
    # all sampled CFs, shape (runs, n_CFs) — same row count as Xa/Xb
    all_cfs = np.asarray(mc.CF_dict[method])
    # column positions of the CFs kept in the metadata dataframe
    kept = dfcf.index.values
    return all_cfs[:, kept]
def get_X_P(dfp):
    """Transpose the per-parameter value lists into per-run tuples."""
    # each cell of dfp["values"] holds one list of samples per parameter;
    # zip(*) turns them into one tuple of parameter values per model run
    return list(zip(*dfp["values"]))
def get_problem(X, names):
    """Build the SALib problem definition from the sample matrix and labels."""
    lower = np.amin(X, axis=0)
    upper = np.amax(X, axis=0)
    return {
        "num_vars": X.shape[1],
        "names": names,
        "bounds": list(zip(lower, upper)),
    }
class GlobalSensitivityAnalysis(object):
    """Class for Global Sensitivity Analysis.

    For now Delta Moment Independent Measure based on:
    https://salib.readthedocs.io/en/latest/api.html#delta-moment-independent-measure
    Builds on top of Monte Carlo Simulation results.
    """
    def __init__(self, mc: MonteCarloLCA):
        """Store the Monte Carlo simulation and initialize default settings."""
        self.update_mc(mc)
        # Placeholder defaults (0 / 0.0); overwritten by perform_GSA().
        self.act_number = int()
        self.method_number = int()
        self.cutoff_technosphere = float()
        self.cutoff_biosphere = float()
    def update_mc(self, mc: MonteCarloLCA):
        """Update the Monte Carlo Simulation object (and results).

        Raises AssertionError when `mc` is not a MonteCarloLCA instance.
        """
        try:
            # NOTE(review): `assert` is stripped under `python -O`; an explicit
            # isinstance check raising TypeError would be more robust.
            assert isinstance(mc, MonteCarloLCA)
            self.mc = mc
        except AssertionError:
            raise AssertionError(
                "mc should be an instance of MonteCarloLCA, but instead it is a {}.".format(
                    type(mc)
                )
            )
    def perform_GSA(
        self,
        act_number: int = 0,
        method_number: int = 0,
        cutoff_technosphere: float = 0.01,
        cutoff_biosphere: float = 0.01,
    ):
        """Perform GSA for specific reference flow and impact category.

        Builds the input matrix X (sampled technosphere / biosphere / CF /
        parameter values) and output vector Y (MC LCA scores), then runs
        SALib's delta moment-independent analysis. Results are stored on the
        instance (self.Si, self.dfgsa, self.df_final). Returns None and logs
        an error if the reference flow / method lookup fails.
        """
        start = time()
        # set FU and method
        try:
            self.act_number = act_number
            self.method_number = method_number
            self.cutoff_technosphere = cutoff_technosphere
            self.cutoff_biosphere = cutoff_biosphere
            self.fu = self.mc.cs["inv"][act_number]
            self.activity = bd.get_activity(self.mc.rev_activity_index[act_number])
            self.method = self.mc.cs["ia"][method_number]
        except Exception as e:
            traceback.print_exc()
            # todo: QMessageBox.warning(self, 'Could not perform Delta analysis', str(e))
            log.error("Initializing the GSA failed.")
            return None
        log.info(
            f"-- GSA --\n Project: {bd.projects.current} CS: {self.mc.cs_name} "
            f"Activity: {self.activity} Method: {self.method}",
        )
        # get non-stochastic LCA object with reverse dictionaries
        self.lca = get_lca(self.fu, self.method)
        # =============================================================================
        # Filter exchanges and get metadata DataFrames
        # =============================================================================
        dfs = []
        # technosphere: graph traversal keeps only relevant exchanges
        if self.mc.include_technosphere:
            self.t_indices = filter_technosphere_exchanges(
                self.lca, cutoff=cutoff_technosphere, max_calc=1e4
            )
            self.t_exchanges, self.t_indices = get_exchanges(self.lca, self.t_indices)
            self.dft = get_exchanges_dataframe(self.t_exchanges, self.t_indices)
            if not self.dft.empty:
                dfs.append(self.dft)
        # biosphere: contribution cutoff on the characterized inventory
        if self.mc.include_biosphere:
            self.b_indices = filter_biosphere_exchanges(
                self.lca, cutoff=cutoff_biosphere
            )
            self.b_exchanges, self.b_indices = get_exchanges(
                self.lca, self.b_indices, biosphere=True
            )
            self.dfb = get_exchanges_dataframe(
                self.b_exchanges, self.b_indices, biosphere=True
            )
            if not self.dfb.empty:
                dfs.append(self.dfb)
        # characterization factors
        if self.mc.include_cfs:
            self.dfcf = get_CF_dataframe(
                self.lca, only_uncertain_CFs=True
            )  # empty df if no stochastic CFs
            if not self.dfcf.empty:
                dfs.append(self.dfcf)
        # parameters
        # todo: if parameters, include df, but remove exchanges from T and B (skipped for now)
        self.dfp = get_parameters_DF(self.mc)  # Empty df if no parameters
        if not self.dfp.empty:
            dfs.append(self.dfp)
        # Join dataframes to get metadata; "GSA name" must be unique per input
        self.metadata = pd.concat(dfs, axis=0, ignore_index=True, sort=False)
        self.metadata.set_index("GSA name", inplace=True)
        # =============================================================================
        # GSA
        # =============================================================================
        # Get X (Technosphere, Biosphere and CF values), one row per MC run
        X_list = list()
        if self.mc.include_technosphere and self.t_indices:
            self.Xa = get_X(self.mc.A_matrices, self.t_indices)
            X_list.append(self.Xa)
        if self.mc.include_biosphere and self.b_indices:
            self.Xb = get_X(self.mc.B_matrices, self.b_indices)
            X_list.append(self.Xb)
        if self.mc.include_cfs and not self.dfcf.empty:
            self.Xc = get_X_CF(self.mc, self.dfcf, self.method)
            X_list.append(self.Xc)
        if self.mc.include_parameters and not self.dfp.empty:
            self.Xp = get_X_P(self.dfp)
            X_list.append(self.Xp)
        self.X = np.concatenate(X_list, axis=1)
        # print('X', self.X.shape)
        # Get Y (LCA scores)
        self.Y = self.mc.get_results_dataframe(act_key=self.activity.key)[
            self.method
        ].to_numpy()
        # log-transformation. This makes it more robust for very uneven distributions of LCA results (e.g. toxicity related impacts).
        # Can only be applied if all Monte-Carlo LCA scores are either positive or negative.
        # Should not be used when LCA scores overlap zero (sometimes positive and sometimes negative)
        # if np.all(self.Y > 0) if self.Y[0] > 0 else np.all(self.Y < 0): # check if all LCA scores are of the same sign
        # self.Y = np.log(np.abs(self.Y)) # this makes it more robust for very uneven distributions of LCA results
        if np.all(self.Y > 0):  # all positive numbers
            self.Y = np.log(np.abs(self.Y))
            log.info("All positive LCA scores. Log-transformation performed.")
        elif np.all(self.Y < 0):  # all negative numbers
            # negated so the transformed scores keep their ordering
            self.Y = -np.log(np.abs(self.Y))
            log.info("All negative LCA scores. Log-transformation performed.")
        else:  # mixed positive and negative numbers
            log.warning(
                "Log-transformation cannot be applied as LCA scores overlap zero."
            )
        # print('Filtering took {} seconds'.format(np.round(time() - start, 2)))
        # define problem
        self.names = self.metadata.index  # ['GSA name']
        # print('Names:', len(self.names))
        self.problem = get_problem(self.X, self.names)
        # perform delta analysis
        time_delta = time()
        self.Si = delta.analyze(self.problem, self.X, self.Y, print_to_console=False)
        log.info(
            "Delta analysis took {} seconds".format(
                np.round(time() - time_delta, 2),
            )
        )
        # put GSA results in to dataframe, most sensitive inputs first
        self.dfgsa = pd.DataFrame(self.Si, index=self.names).sort_values(
            by="delta", ascending=False
        )
        self.dfgsa.index.names = ["GSA name"]
        # join with metadata
        self.df_final = self.dfgsa.join(self.metadata, on="GSA name")
        self.df_final.reset_index(inplace=True)
        # stringify for Excel export; NOTE(review): assumes a "pedigree"
        # column always exists in the metadata — confirm for all input types
        self.df_final["pedigree"] = [str(x) for x in self.df_final["pedigree"]]
        log.info("GSA took {} seconds".format(np.round(time() - start, 2)))
    def get_save_name(self) -> str:
        """Build an .xlsx file name from calculation setup, iteration count,
        activity and method, stripped of characters unsafe for file names."""
        save_name = (
            self.mc.cs_name
            + "_"
            + str(self.mc.iterations)
            + "_"
            + self.activity["name"]
            + "_"
            + str(self.method)
            + ".xlsx"
        )
        save_name = save_name.replace(",", "").replace("'", "").replace("/", "")
        return save_name
    def export_GSA_output(self):
        """Write the GSA result table (df_final) to Excel in the AB data dir."""
        save_name = "gsa_output_" + self.get_save_name()
        self.df_final.to_excel(os.path.join(ab_settings.data_dir, save_name))
    def export_GSA_input(self):
        """Export the input data to the GSA with a human readable index."""
        X_with_index = pd.DataFrame(self.X.T, index=self.metadata.index)
        save_name = "gsa_input_" + self.get_save_name()
        X_with_index.to_excel(os.path.join(ab_settings.data_dir, save_name))
if __name__ == "__main__":
    # Smoke test: run a small Monte Carlo simulation and a GSA on top of it.
    mc = perform_MonteCarlo_LCA(project="ei34", cs_name="kraft paper", iterations=20)
    g = GlobalSensitivityAnalysis(mc)
    g.perform_GSA(
        act_number=0, method_number=1, cutoff_technosphere=0.01, cutoff_biosphere=0.01
    )
    # Fix: the class has no `export()` method (only export_GSA_output /
    # export_GSA_input), so the previous `g.export()` raised AttributeError.
    g.export_GSA_output()