-
Notifications
You must be signed in to change notification settings - Fork 0
/
graph_conversions.py
271 lines (212 loc) · 10.1 KB
/
graph_conversions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
import networkx as nx
import matplotlib.cm as cm
import matplotlib.pyplot as plt
from networkx.readwrite import json_graph
import os.path
import json
import uuid
import random
import copy
import string
import random
def generate_graph_from_graphwalker_json(file_name):
    """Load a GraphWalker JSON file and convert its first model to a graph dict.

    The file is looked up inside the ``json_models`` directory, relative to
    the current working directory. Only the first entry of the file's
    ``"models"`` list is converted.

    Args:
        file_name: Name of the JSON file inside ``json_models``.

    Returns:
        A dict with the keys ``directed`` (False), ``multigraph`` (False),
        ``graph`` (``{"name": ...}``), ``nodes`` (one dict per vertex with
        ``id``, ``name``, ``x``, ``y``) and ``links`` (one dict per edge with
        ``source``, ``target``, ``id``, ``name``).
        NOTE(review): this layout looks like networkx's node-link format;
        confirm against the caller.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If a vertex or edge lacks one of the fields read below.
    """
    file_path = os.path.join("json_models", file_name)
    # Read explicitly as UTF-8: the JSON writers in this module dump with
    # encoding="utf-8" and ensure_ascii=False, so relying on the platform
    # default encoding here can fail on non-ASCII names.
    with open(file_path, encoding="utf-8") as f:
        json_data = json.load(f)

    model_data = json_data.get("models")[0]

    nodes = [
        {
            "id": vertice["id"],
            "name": vertice["name"],
            "x": vertice["properties"]["x"],
            "y": vertice["properties"]["y"],
        }
        for vertice in model_data["vertices"]
    ]
    links = [
        {
            "source": edge["sourceVertexId"],
            "target": edge["targetVertexId"],
            "id": edge["id"],
            "name": edge["name"],
        }
        for edge in model_data["edges"]
    ]

    return {
        "directed": False,
        "multigraph": False,
        "graph": {"name": model_data["name"]},
        "nodes": nodes,
        "links": links,
    }
def generate_mutation_model(main_model, mutation_number):
    """Build a mutant: a deep copy of ``main_model`` with one random deletion.

    The copy's graph name gets the suffix ``-<mutation_number + 1>``. One of
    two mutation operators (link deletion or node deletion) is then picked
    at random and applied to the copy; ``main_model`` itself is not modified.

    Args:
        main_model: The model dict to mutate a copy of.
        mutation_number: Zero-based index of this mutant.

    Returns:
        The mutated copy, with the operator's message stored under
        ``"action"``.
    """
    mutant = copy.deepcopy(main_model)
    graph_meta = mutant["graph"]
    graph_meta["name"] = f"{graph_meta['name']}-{mutation_number + 1}"

    # 1 -> delete a link, 2 -> delete a node.
    coin = random.randint(1, 2)
    if coin == 1:
        mutant["action"] = delete_link(mutant)
    elif coin == 2:
        mutant["action"] = delete_node(mutant)
    else:
        mutant["action"] = ""
    return mutant
def delete_link(model):
    """Delete one random link from ``model`` (the Delete(edge) mutation).

    Links whose source or target is a ``v_Start`` or ``v_Finish`` node are
    never deleted, and neither are links named exactly ``e_Connection``.

    Args:
        model: Model dict with ``"nodes"`` and ``"links"`` lists. Its
            ``"links"`` entry is replaced in place.

    Returns:
        A message naming the id of the deleted link.

    Raises:
        ValueError: If no link is eligible for deletion.
    """
    protected_node_ids = {
        node["id"] for node in model["nodes"] if node["name"] in ("v_Start", "v_Finish")
    }
    # Exact name match. The previous `name in "e_Connection"` was a substring
    # test, so names such as "e" or "" were wrongly treated as connections.
    connection_link_ids = {
        link["id"] for link in model["links"] if link["name"] == "e_Connection"
    }

    candidates = [
        link
        for link in model["links"]
        if link["source"] not in protected_node_ids
        and link["target"] not in protected_node_ids
        and link["id"] not in connection_link_ids
    ]
    # Fail loudly instead of retrying forever when nothing can be deleted.
    if not candidates:
        raise ValueError("The model has no link that is eligible for deletion")

    selected_link_id = random.choice(candidates)["id"]
    model["links"] = [link for link in model["links"] if link["id"] != selected_link_id]

    return f"The link with id: {selected_link_id} is deleted"
def delete_node(model):
    """Pick a random node and delete its links (the Delete(vertex) mutation).

    Nodes named ``v_Start`` or ``v_Finish`` are never picked. Every link
    whose source or target is the picked node is removed.

    NOTE(review): the picked node itself stays in ``model["nodes"]``; only
    its links are removed. Confirm whether the node should be removed too.

    Args:
        model: Model dict with ``"nodes"`` and ``"links"`` lists. Its
            ``"links"`` entry is replaced in place.

    Returns:
        A message naming the id of the picked node.

    Raises:
        ValueError: If there is no node other than the start/finish nodes.
    """
    candidates = [
        node for node in model["nodes"] if node["name"] not in ("v_Start", "v_Finish")
    ]
    # Fail loudly instead of retrying forever when only start/finish nodes
    # exist. An empty node list already raised ValueError before this change.
    if not candidates:
        raise ValueError("The model has no node that is eligible for deletion")

    selected_node_id = random.choice(candidates)["id"]
    model["links"] = [
        link
        for link in model["links"]
        if link["source"] != selected_node_id and link["target"] != selected_node_id
    ]

    return f"The node with id: {selected_node_id} is deleted"
def get_model_vertice(id, main_model):
    """Return the node with the given id as a GraphWalker-style vertex dict.

    Args:
        id: Id of the node to look up. The name shadows the builtin but is
            kept because it is part of the function's signature.
        main_model: Model dict whose ``"nodes"`` list is searched.

    Returns:
        ``{"id": id, "name": ..., "properties": {"x": ..., "y": ...}}`` built
        from the first node in ``main_model["nodes"]`` whose id matches.

    Raises:
        IndexError: If no node has the given id.
    """
    node = next((n for n in main_model["nodes"] if n.get("id") == id), None)
    if node is None:
        # Same exception type as before, but with a message that names the id
        # instead of a bare "list index out of range".
        raise IndexError(f"No node with id {id!r} found in the model")

    return {
        "id": id,
        "name": node["name"],
        "properties": {"x": node["x"], "y": node["y"]},
    }
def get_model_edges(source_id, main_model, community_vertice_ids):
    """Collect the links leaving ``source_id`` that stay inside the community.

    Args:
        source_id: Id of the node whose outgoing links are wanted.
        main_model: Model dict whose ``"links"`` list is scanned.
        community_vertice_ids: Ids a link's target must belong to.

    Returns:
        A list of GraphWalker-style edge dicts (``id``, ``name``,
        ``sourceVertexId``, ``targetVertexId``), in the order the matching
        links appear in ``main_model["links"]``.
    """
    matching_links = []
    for link in main_model["links"]:
        if link.get("source") != source_id:
            continue
        if link.get("target") not in community_vertice_ids:
            continue
        matching_links.append(link)

    converted = []
    for link in matching_links:
        name = link["name"]
        source = link["source"]
        target = link["target"]
        link_id = link["id"]
        converted.append(
            {
                "id": link_id,
                "name": name,
                "sourceVertexId": source,
                "targetVertexId": target,
            }
        )
    return converted
def get_community_last_vertice(community, main_model):
    """Return the vertex that leads into ``v_Finish`` in the main model.

    Finds the first node named ``v_Finish``, then the first link targeting
    it, and returns that link's source as a GraphWalker-style vertex dict.

    Args:
        community: Not used by this function.
        main_model: Model dict with ``"nodes"`` and ``"links"`` lists.

    Returns:
        The vertex dict produced by ``get_model_vertice`` for the source of
        the first link into ``v_Finish``.

    Raises:
        IndexError: If there is no ``v_Finish`` node or no link targets it.
    """
    finish_nodes = [node for node in main_model["nodes"] if node.get("name") == "v_Finish"]
    finish_id = finish_nodes[0]["id"]

    links_into_finish = [link for link in main_model["links"] if link.get("target") == finish_id]
    first_link_into_finish = links_into_finish[0]

    return get_model_vertice(first_link_into_finish.get("source"), main_model)
def find_entrance_to_community_model(community, main_model):
    """Find a link that enters the community from outside it.

    Community members are tried in order. For each one, the model's links
    are scanned in order for a link that targets the member and whose source
    is not in the community.

    Args:
        community: Collection of node ids that make up the community.
        main_model: Model dict whose ``"links"`` list is scanned.

    Returns:
        The first matching link dict, or ``None`` if there is none.
    """
    for member_id in community:
        for link in main_model["links"]:
            targets_member = link.get("target") == member_id
            if targets_member and link.get("source") not in community:
                return link
    return None
def generate_graphwalker_json_from_model(community_number, community, main_model):
    """Write a GraphWalker JSON model for one community of the main model.

    Starts from ``json_models/emptyModel.json``, gives the model and its
    start/finish vertices fresh UUIDs, and then adds:

    * a start edge from the new start vertex into the community,
    * every community vertex, plus its outgoing links that stay inside the
      community or lead to the main model's last vertex,
    * the main model's last vertex (the source of the first link into
      ``v_Finish``), an ``e_Finish`` edge from it to the new finish vertex,
    * an ``e_Pseudo`` edge from the finish vertex back to the start vertex.

    The result is written to ``json_models/community-<community_number>.json``.

    NOTE(review): assumes the template's ``vertices[0]`` is the start vertex
    and ``vertices[1]`` the finish vertex. This is inferred from how the ids
    are assigned below; confirm against ``emptyModel.json``.
    NOTE(review): if the main model's last vertex is itself a community
    member, it is added to ``vertices`` twice with the same id. Left as is;
    confirm whether that is intended.

    Args:
        community_number: Number used in the model name and file name.
        community: Iterable of node ids that make up the community.
        main_model: Model dict with ``"nodes"`` and ``"links"`` lists.

    Returns:
        A tuple ``(file_name, is_middle_community)``. The flag is True when
        no link goes from the main model's ``v_Start`` into the community.

    Raises:
        ValueError: If the community is not linked from ``v_Start`` and no
            link enters it from outside.
        IndexError: If the main model has no ``v_Start`` node.
    """
    empty_model_json_file = os.path.join("json_models", "emptyModel.json")
    # Read as UTF-8 to match how the community file is written below.
    with open(empty_model_json_file, encoding="utf-8") as f_main:
        json_data = json.load(f_main)

    model_data = json_data.get("models")[0]
    model_data["id"] = str(uuid.uuid4())
    model_data["name"] = f"community-{community_number}"

    # Create guid for the start node
    start_element_id = str(uuid.uuid4())
    model_data["startElementId"] = start_element_id
    model_data["vertices"][0]["id"] = start_element_id

    # Create guid for the end node
    finish_element_id = str(uuid.uuid4())
    model_data["vertices"][1]["id"] = finish_element_id
    json_data["selectedElementId"] = finish_element_id

    main_model_start_id = [x for x in main_model["nodes"] if x.get("name") == "v_Start"][0]["id"]
    entry_edge = next(
        (
            link
            for link in main_model["links"]
            if link.get("source") == main_model_start_id and link.get("target") in community
        ),
        None,
    )

    # With no link from v_Start, the community sits in the middle of the main
    # graph, so enter it through any link that comes from outside.
    is_middle_community = entry_edge is None
    if is_middle_community:
        entry_edge = find_entrance_to_community_model(community, main_model)
        if entry_edge is None:
            raise ValueError(
                f"community-{community_number} has no link from v_Start "
                "and no link entering it from outside"
            )

    # Connect the new start vertex to the community's entry vertex, reusing
    # the name of the main-model link that entered it.
    model_data["edges"].append(
        {
            "id": str(uuid.uuid4()),
            "name": entry_edge.get("name"),
            "sourceVertexId": start_element_id,
            "targetVertexId": entry_edge.get("target"),
        }
    )

    community_last_vertice = get_community_last_vertice(community, main_model)
    # Allowed edge targets: the community plus the last vertex. Built once,
    # and without list-only methods so any iterable of ids works.
    community_vertice_ids = [*community, community_last_vertice["id"]]

    for vertice_id in community:
        model_data["vertices"].append(get_model_vertice(vertice_id, main_model))
        model_data["edges"].extend(get_model_edges(vertice_id, main_model, community_vertice_ids))

    model_data["vertices"].append(community_last_vertice)
    model_data["edges"].append(
        {
            "id": str(uuid.uuid4()),
            "name": "e_Finish",
            "sourceVertexId": community_last_vertice.get("id"),
            "targetVertexId": finish_element_id,
        }
    )

    # Pseudo edge from finish to start
    model_data["edges"].append(
        {
            "id": str(uuid.uuid4()),
            "name": "e_Pseudo",
            "sourceVertexId": finish_element_id,
            "targetVertexId": start_element_id,
        }
    )

    community_json_name = f"community-{community_number}.json"
    community_json_file_path = os.path.join("json_models", community_json_name)
    with open(community_json_file_path, "w", encoding="utf-8") as f_community:
        json.dump(json_data, f_community, ensure_ascii=False, indent=4)

    return community_json_name, is_middle_community
def eliminate_same_name_vertexes(model_name):
    """Make vertex names unique in a GraphWalker JSON file, in place.

    Reads ``json_models/<model_name>`` and walks the vertices of its first
    model in order. The first vertex with a given name keeps it; each later
    vertex with an already-seen name gets a random 10-character suffix of
    uppercase letters and digits. The file is then rewritten.

    Args:
        model_name: Name of the JSON file inside ``json_models``.

    Raises:
        OSError: If the file cannot be read or written.
    """
    model_json_file = os.path.join("json_models", model_name)
    # Read as UTF-8 to match how the file is written back below.
    with open(model_json_file, encoding="utf-8") as f_main:
        json_data = json.load(f_main)

    model_data = json_data.get("models")[0]

    suffix_alphabet = string.ascii_uppercase + string.digits
    seen_names = set()
    for vertice in model_data["vertices"]:
        base_name = vertice["name"]
        unique_name = base_name
        # Retry until the name is unused. Previously a renamed vertex that
        # collided again was left as a duplicate.
        while unique_name in seen_names:
            unique_name = base_name + "".join(random.choices(suffix_alphabet, k=10))
        vertice["name"] = unique_name
        seen_names.add(unique_name)

    with open(model_json_file, "w", encoding="utf-8") as json_file:
        json.dump(json_data, json_file, ensure_ascii=False, indent=4)