33from qtpy import QtWidgets
44
55import bw2data as bd
6+ import bw_functional as bf
67
78from activity_browser import application
89from activity_browser .bwutils import refresh_node
@@ -18,40 +19,44 @@ class ActivityDuplicateToDB(ABAction):
1819
1920 @classmethod
2021 @exception_dialogs
21- def run (cls , nodes : List [tuple | int | bd .Node ], to_db : str = None ):
22+ def run (cls , nodes : List [tuple | int | bd .Node ], to_db_name : str = None ):
2223 nodes = [refresh_node (node ) for node in nodes ]
23-
2424 dbs = {node .get ("database" ) for node in nodes }
25+ from_db_name = next (iter (dbs ))
26+ from_db_backend = bd .databases [from_db_name ]["backend" ]
27+
2528 if not len (dbs ) == 1 :
2629 raise ValueError ("All selected activities must be from the same database." )
27- from_db = next (iter (dbs ))
2830
29- if to_db and not cls . confirm_db ( to_db ):
30- return
31+ if any ([ isinstance ( node , bf . Product ) for node in nodes ] ):
32+ raise ValueError ( "Products cannot be duplicated to another database. Duplicate the parent process instead." )
3133
32- to_db = to_db or cls .request_db (nodes , backend = bd .databases [from_db ]["backend" ])
34+ if to_db_name :
35+ if not cls .confirm_db (to_db_name ):
36+ return
37+ else :
38+ to_db_name = cls .request_db (from_db_name )
3339
34- if not to_db :
35- return
40+ to_db_backend = bd .databases [to_db_name ]["backend" ]
3641
37- new_nodes = []
38-
39- # otherwise move all supplied nodes to the db by copying them
40- for node in nodes :
41- new_node = node .copy (database = to_db )
42- new_nodes .append (new_node )
42+ if from_db_backend == to_db_backend :
43+ new_nodes = cls .duplicate_simple (nodes , to_db_name )
44+ elif from_db_backend == "sqlite" and to_db_backend == "functional_sqlite" :
45+ new_nodes = cls .duplicate_sqlite_to_functional_sqlite (nodes , to_db_name )
46+ elif from_db_backend == "functional_sqlite" and to_db_backend == "sqlite" :
47+ new_nodes = cls .duplicate_functional_sqlite_to_sqlite (nodes , to_db_name )
48+ else :
49+ raise NotImplementedError (f"Moving from { from_db_backend } to { to_db_backend } is not supported." )
4350
4451 ActivityOpen .run (new_nodes )
4552
4653 @staticmethod
47- def request_db (nodes : list [bd .Node ], backend : str ) -> str | None :
48- # get valid databases (not the original database, locked databases, or databases with a different backend)
49- origin_db = next (iter (nodes )).get ("database" )
54+ def request_db (from_db_name : str ) -> str | None :
55+ # get valid databases (not the original database, or locked databases)
5056 target_dbs = [
51- db for db , meta in bd .databases .items () if
52- db != origin_db
53- and meta .get ("read_only" ) is not True
54- and meta .get ("backend" ) == backend
57+ db_name for db_name , meta in bd .databases .items () if
58+ db_name != from_db_name
59+ and meta .get ("read_only" , True ) is not True
5560 ]
5661
5762 # return if there are no valid databases to duplicate to
@@ -76,10 +81,54 @@ def request_db(nodes: list[bd.Node], backend: str) -> str | None:
7681 return target_db if ok else None
7782
7883 @staticmethod
79- def confirm_db (to_db : str ):
84+ def confirm_db (to_db_name : str ):
8085 user_choice = QtWidgets .QMessageBox .question (
8186 application .main_window ,
8287 "Move to new database" ,
83- f"Move to { to_db } and open as new tab?" ,
88+ f"Move to { to_db_name } and open as new tab?" ,
8489 )
8590 return user_choice == user_choice .Yes
91+
92+ @staticmethod
93+ def duplicate_simple (nodes : list [bd .Node ], to_db_name : str ) -> list [bd .Node ]:
94+ new_nodes = []
95+
96+ # move all supplied nodes to the db by copying them
97+ for node in nodes :
98+ new_node = node .copy (database = to_db_name )
99+ new_nodes .append (new_node )
100+
101+ return new_nodes
102+
103+ @staticmethod
104+ def duplicate_sqlite_to_functional_sqlite (nodes : list [bd .Node ], to_db_name : str ) -> list [bd .Node ]:
105+ from bw_functional .convert import SQLiteToFunctionalSQLite
106+ new_nodes = []
107+
108+ for node in nodes :
109+ dataset = node .as_dict ()
110+
111+ dataset .pop ("id" , None )
112+ dataset .pop ("key" , None )
113+
114+ dataset ["exchanges" ] = [exc .as_dict () for exc in node .exchanges ()]
115+ dataset ["database" ] = to_db_name # because we didn't copy the dict this will also be reflected in node.key
116+
117+ new_datasets = SQLiteToFunctionalSQLite .convert_process (node .key , dataset , False )
118+ new_exchanges = [x for ds in new_datasets .values () for x in ds .pop ("exchanges" , [])]
119+
120+ for key , new_dataset in new_datasets .items ():
121+ new_node = bd .Node (** new_dataset )
122+ new_node .save ()
123+ new_nodes .append (new_node )
124+
125+ for exc in new_exchanges :
126+ exc ["output" ] = (to_db_name , exc ["output" ][1 ]) # relink output to new db
127+ new_exc = bd .Edge (** exc )
128+ new_exc .save ()
129+
130+ return new_nodes
131+
132+ @staticmethod
133+ def duplicate_functional_sqlite_to_sqlite (nodes : list [bd .Node ], to_db_name : str ) -> list [bd .Node ]:
134+ raise NotImplementedError ("Duplicating from functional_sqlite to sqlite is not yet implemented." )
0 commit comments