1use crate::aql::get_all_data_aql;
2use crate::client::auth::handle_auth;
3use crate::client::config::ClientConfig;
4use crate::client::{build_client, make_url};
5use crate::errors::GraphLoaderError;
6use crate::request::handle_arangodb_response_with_parsed_body;
7use crate::sharding::{compute_faked_shard_map, compute_shard_map};
8use crate::types::info::{
9 DeploymentInfo, DeploymentType, LoadStrategy, SupportInfo, VersionInformation,
10};
11use crate::{DataLoadConfiguration, DatabaseConfiguration};
12use bytes::Bytes;
13use log::{debug, error, info};
14use reqwest::StatusCode;
15use reqwest_middleware::ClientWithMiddleware;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::cmp::PartialEq;
19use std::collections::{HashMap, HashSet};
20use std::num::ParseIntError;
21use std::thread::JoinHandle;
22use std::time::SystemTime;
23
/// Supported ArangoDB versions as `(major, minimum minor)` pairs.
///
/// A server whose major version is listed here is treated as supporting the
/// dump endpoint when its minor version is at least the paired minimum.
static MIN_SUPPORTED_MINOR_VERSIONS: &[(u8, u8)] = &[(3, 12)];
25
/// Describes a single collection to load together with the attributes requested for it.
#[derive(Clone)]
pub struct CollectionInfo {
    /// Collection name; the loader keys its collection maps by this value.
    pub name: String,
    /// Attribute names requested for the documents of this collection.
    pub fields: Vec<String>,
}
32
/// Response body shape parsed on the AQL load path; only the `result` array is read.
#[derive(Debug, Serialize, Deserialize)]
struct CursorResult {
    /// Documents contained in the response.
    result: Vec<Value>,
}
37
/// Loads the vertices and edges of a graph from an ArangoDB deployment, either through
/// the dump endpoint or through AQL, depending on the detected load strategy.
pub struct GraphLoader {
    /// Connection settings (endpoints, TLS certificate, authentication).
    db_config: DatabaseConfiguration,
    /// Loading options such as parallelism and the "load all attributes" switches.
    load_config: DataLoadConfiguration,
    /// Vertex collections to load, keyed by collection name.
    v_collections: HashMap<String, CollectionInfo>,
    /// Edge collections to load, keyed by collection name.
    e_collections: HashMap<String, CollectionInfo>,
    /// Shard map of the vertex collections; filled during initialization for the dump strategy.
    vertex_map: crate::sharding::ShardMap,
    /// Shard map of the edge collections; filled during initialization for the dump strategy.
    edge_map: crate::sharding::ShardMap,
    /// Strategy used to fetch data; `None` until it has been identified.
    load_strategy: Option<LoadStrategy>,
    /// Deployment information of the server; `None` until it has been identified.
    support_info: Option<SupportInfo>,
    /// Whether projections may be requested from the server; `None` until identified.
    supports_projections: Option<bool>,
}
50
/// Extracts the collection part of a document id of the form `collection/key`.
///
/// Returns everything before the first `/`, or an empty string when `id`
/// contains no `/` at all.
fn collection_name_from_id(id: &str) -> String {
    id.split_once('/')
        .map(|(collection, _key)| collection.to_owned())
        .unwrap_or_default()
}
57
58impl PartialEq<DeploymentType> for &DeploymentType {
59 fn eq(&self, other: &DeploymentType) -> bool {
60 matches!(
61 (self, other),
62 (DeploymentType::Cluster, DeploymentType::Cluster)
63 | (DeploymentType::Single, DeploymentType::Single)
64 )
65 }
66}
67
68impl GraphLoader {
69 pub async fn new(
70 db_config: DatabaseConfiguration,
71 load_config: DataLoadConfiguration,
72 vertex_collections: Vec<CollectionInfo>,
73 edge_collections: Vec<CollectionInfo>,
74 ) -> Result<GraphLoader, GraphLoaderError> {
75 let v_collections = vertex_collections
76 .into_iter()
77 .map(|c| (c.name.clone(), c))
78 .collect();
79 let e_collections = edge_collections
80 .into_iter()
81 .map(|c| (c.name.clone(), c))
82 .collect();
83
84 let mut graph_loader = GraphLoader {
85 db_config,
86 load_config,
87 v_collections,
88 e_collections,
89 vertex_map: HashMap::new(),
90 edge_map: HashMap::new(),
91 load_strategy: None,
92 support_info: None,
93 supports_projections: None,
94 };
95 let init_result = graph_loader.initialize().await;
96 init_result?;
97
98 Ok(graph_loader)
99 }
100
101 async fn does_arangodb_supports_dump_endpoint(
102 &self,
103 client: &ClientWithMiddleware,
104 support_info: &Option<SupportInfo>,
105 ) -> Result<(bool, bool), String> {
106 let is_cluster = support_info
107 .as_ref()
108 .ok_or("Support Info not set".to_string())?
109 .deployment
110 .deployment_type
111 == DeploymentType::Cluster;
112 let server_version_url = self.db_config.endpoints[0].clone() + "/_api/version";
113 let resp = handle_auth(client.get(server_version_url), &self.db_config)
114 .send()
115 .await;
116 let version_info_result =
117 handle_arangodb_response_with_parsed_body::<VersionInformation>(resp, StatusCode::OK)
118 .await;
119 if let Err(e) = version_info_result {
120 return Err(e.to_string());
121 }
122 let version_info = version_info_result.unwrap();
123
124 let version_parts: Vec<&str> = version_info.version.split('.').collect();
125 if version_parts.len() < 3 {
126 return Err(format!(
127 "Unable to parse ArangoDB Version - got {}",
128 version_info.version
129 ));
130 }
131
132 let (supports_dump_endpoint, support_dump_projections) = {
133 let major: u8 = version_parts
134 .first()
135 .ok_or("Unable to parse Major Version".to_string())?
136 .parse()
137 .map_err(|err: ParseIntError| err.to_string())?;
138 let minor: u8 = version_parts
139 .get(1)
140 .ok_or("Unable to parse Minor Version".to_string())?
141 .parse()
142 .map_err(|err: ParseIntError| err.to_string())?;
143 let major_supports = MIN_SUPPORTED_MINOR_VERSIONS
144 .iter()
145 .map(|x| x.0)
146 .any(|x| x == major);
147 if !major_supports {
148 (false, false)
149 } else {
150 let mut supports_dump = MIN_SUPPORTED_MINOR_VERSIONS
151 .iter()
152 .find(|x| x.0 == major)
153 .ok_or("Unable to find supported version".to_string())?
154 .1
155 <= minor;
156
157 if is_cluster && major == 3 && minor == 11 {
159 supports_dump = true;
160 }
161
162 let mut supports_projections = false;
164 if major == 3 && minor >= 12 {
165 supports_projections = true;
166 }
167
168 (supports_dump, supports_projections)
169 }
170 };
171
172 Ok((supports_dump_endpoint, support_dump_projections))
173 }
174
175 async fn get_arangodb_support_information(
176 &self,
177 client: &ClientWithMiddleware,
178 ) -> Result<SupportInfo, String> {
179 let server_information_url = self.db_config.endpoints[0].clone() + "/_admin/support-info";
180 let support_info_res = handle_auth(client.get(server_information_url), &self.db_config)
181 .send()
182 .await;
183 let support_info_result = handle_arangodb_response_with_parsed_body::<SupportInfo>(
184 support_info_res,
185 StatusCode::OK,
186 )
187 .await;
188 if let Err(e) = support_info_result {
189 return Err(e.to_string());
190 }
191 let support_info = support_info_result.unwrap();
192 Ok(support_info)
193 }
194
195 fn identify_arangodb_load_strategy(&self, dump_support_enabled: bool) -> LoadStrategy {
196 let support_info = &self
197 .support_info
198 .as_ref()
199 .unwrap()
200 .deployment
201 .deployment_type;
202 if !dump_support_enabled && support_info == DeploymentType::Single {
203 LoadStrategy::Aql
204 } else {
205 LoadStrategy::Dump
206 }
207 }
208
209 async fn identify_load_strategy(
210 &mut self,
211 client: &ClientWithMiddleware,
212 ) -> Result<(), String> {
213 self.support_info = match self.get_arangodb_support_information(client).await {
214 Ok(info) => Some(info),
215 Err(e) => {
216 print!(
217 "Failed to read ArangoDB environment information: {}. \
218 This can happen if the current user is not allowed to access the '/_admin/support-info' endpoint. \
219 Assuming ArangoDB instance version is at least '3.12'. \
220 We will use the aql load strategy as the loading strategy. \
221 While this works, this will be slower then using the dump endpoint instead. ",
222 e
223 );
224 None
225 }
226 };
227
228 if self.support_info.is_none() {
229 self.support_info = Some(SupportInfo {
231 deployment: DeploymentInfo {
232 deployment_type: DeploymentType::Single,
233 },
234 });
235
236 self.load_strategy = Some(LoadStrategy::Aql);
238 self.supports_projections = Some(true);
240 } else {
241 let (dump_support_enabled, dump_projections_support) = self
242 .does_arangodb_supports_dump_endpoint(client, &self.support_info)
243 .await?;
244 self.supports_projections = Some(dump_projections_support);
245 self.load_strategy = Some(self.identify_arangodb_load_strategy(dump_support_enabled));
246 }
247
248 debug_assert!(self.load_strategy.is_some());
249 debug_assert!(self.supports_projections.is_some());
250
251 Ok(())
252 }
253
254 fn verify_parameters(&self) -> Result<(), GraphLoaderError> {
255 if !self.get_all_vertex_fields_as_list_to_return().is_empty()
256 && self.load_config.load_all_vertex_attributes
257 {
258 return Err(GraphLoaderError::from(
259 "load_all_vertex_attributes is set to true, but vertex collections are not empty."
260 .to_string(),
261 ));
262 }
263 if !self.get_all_edges_fields_as_list_to_return().is_empty()
264 && self.load_config.load_all_edge_attributes
265 {
266 return Err(GraphLoaderError::from(
267 "load_all_edge_attributes is set to true, but edge collections are not empty."
268 .to_string(),
269 ));
270 }
271 Ok(())
272 }
273
274 async fn initialize(&mut self) -> Result<(), GraphLoaderError> {
275 self.verify_parameters()?;
276
277 let use_tls = self.db_config.endpoints[0].starts_with("https://");
278 let client_config = ClientConfig::builder()
279 .n_retries(5)
280 .use_tls(use_tls)
281 .tls_cert_opt(self.db_config.tls_cert.clone())
282 .build();
283 let client = build_client(&client_config)?;
284 self.identify_load_strategy(&client).await?;
285
286 let load_strategy = self.load_strategy.as_ref().unwrap();
287 let deployment_type = &self
288 .support_info
289 .as_ref()
290 .unwrap()
291 .deployment
292 .deployment_type;
293
294 if load_strategy == &LoadStrategy::Dump {
295 if *deployment_type == DeploymentType::Cluster {
296 let url = make_url(&self.db_config, "/_admin/cluster/shardDistribution");
301 let resp = handle_auth(client.get(url), &self.db_config).send().await;
302
303 let shard_dist = handle_arangodb_response_with_parsed_body::<
304 crate::sharding::ShardDistribution,
305 >(resp, StatusCode::OK)
306 .await?;
307 info!("Using dump strategy for loading data.");
308 self.vertex_map =
309 compute_shard_map(&shard_dist, &self.get_vertex_collections_as_list())?;
310 self.edge_map =
311 compute_shard_map(&shard_dist, &self.get_edge_collections_as_list())?;
312 } else {
313 self.vertex_map = compute_faked_shard_map(&self.get_vertex_collections_as_list());
314 self.edge_map = compute_faked_shard_map(&self.get_edge_collections_as_list());
315 }
316 info!(
317 "{:?} Need to fetch data from {} vertex shards and {} edge shards...",
318 std::time::SystemTime::now(),
319 self.vertex_map.values().map(|v| v.len()).sum::<usize>(),
320 self.edge_map.values().map(|v| v.len()).sum::<usize>()
321 );
322 } else {
323 info!("Using AQL strategy for loading data.");
324 }
325
326 Ok(())
327 }
328
329 pub async fn new_named(
330 db_config: DatabaseConfiguration,
331 load_config: DataLoadConfiguration,
332 graph_name: String,
333 vertex_global_fields: Option<Vec<String>>,
334 edge_global_fields: Option<Vec<String>>,
335 ) -> Result<GraphLoader, GraphLoaderError> {
336 let vertex_coll_list;
337 let edge_coll_list;
338
339 match get_graph_collections(&db_config, graph_name).await {
340 Ok((vertex_collections, edge_collections)) => {
341 vertex_coll_list = vertex_collections
342 .iter()
343 .map(|c| CollectionInfo {
344 name: c.clone(),
345 fields: vertex_global_fields.clone().unwrap_or_default(),
346 })
347 .collect();
348
349 edge_coll_list = edge_collections
350 .iter()
351 .map(|c| CollectionInfo {
352 name: c.clone(),
353 fields: edge_global_fields.clone().unwrap_or_default(),
354 })
355 .collect();
356 }
357 Err(err) => {
358 return Err(err);
359 }
360 }
361
362 let graph_loader =
363 GraphLoader::new(db_config, load_config, vertex_coll_list, edge_coll_list).await?;
364 Ok(graph_loader)
365 }
366
367 pub async fn new_custom(
368 db_config: DatabaseConfiguration,
369 load_config: DataLoadConfiguration,
370 vertex_collections: Vec<CollectionInfo>,
371 edge_collections: Vec<CollectionInfo>,
372 ) -> Result<Self, GraphLoaderError> {
373 let graph_loader =
374 GraphLoader::new(db_config, load_config, vertex_collections, edge_collections).await?;
375 Ok(graph_loader)
376 }
377
378 pub async fn do_vertices<F>(&self, vertices_function: F) -> Result<(), GraphLoaderError>
379 where
380 F: Fn(&Vec<Vec<u8>>, &mut Vec<Vec<Value>>, &Vec<String>) -> Result<(), GraphLoaderError>
381 + Send
382 + Sync
383 + Clone
384 + 'static,
385 {
386 {
387 let mut senders: Vec<tokio::sync::mpsc::Sender<Bytes>> = vec![];
389 let mut consumers: Vec<JoinHandle<Result<(), GraphLoaderError>>> = vec![];
390
391 for _i in 0..self.load_config.parallelism {
392 let (sender, mut receiver) = tokio::sync::mpsc::channel::<Bytes>(10);
393 senders.push(sender);
394
395 let vertex_global_fields = self.get_all_vertex_fields_as_list_to_return();
396 let insert_vertex_clone = vertices_function.clone();
397 let strategy_clone = self.load_strategy;
398 let load_config_clone = self.load_config.clone();
399
400 let consumer = std::thread::spawn(move || -> Result<(), GraphLoaderError> {
401 let begin = SystemTime::now();
402 while let Some(resp) = receiver.blocking_recv() {
403 let body_result = std::str::from_utf8(resp.as_ref());
404 let body = match body_result {
405 Ok(body) => body,
406 Err(e) => {
407 return Err(GraphLoaderError::Utf8Error(format!(
408 "UTF8 error when parsing body: {:?}",
409 e
410 )));
411 }
412 };
413 debug!(
414 "{:?} Received post response, body size: {}",
415 SystemTime::now().duration_since(begin),
416 body.len()
417 );
418 let mut vertex_ids: Vec<Vec<u8>> = Vec::with_capacity(400000);
419 let mut vertex_json: Vec<Vec<Value>> = Vec::with_capacity(400000);
420
421 if strategy_clone == Option::from(LoadStrategy::Dump) {
422 for line in body.lines() {
423 let mut vertex: Value = match serde_json::from_str(line) {
424 Err(err) => {
425 return Err(GraphLoaderError::JsonParseError(format!(
426 "Error parsing document for line:\n{}\n{:?}",
427 line, err
428 )));
429 }
430 Ok(val) => val,
431 };
432
433 let id = &vertex["_id"];
434 let idstr: &String = match id {
435 Value::String(i) => {
436 let mut buf = vec![];
437 buf.extend_from_slice(i.as_bytes());
438 vertex_ids.push(buf);
439 i
440 }
441 _ => {
442 return Err(GraphLoaderError::JsonParseError(format!(
443 "JSON is no object with a string _id attribute:\n{}",
444 vertex
445 )));
446 }
447 };
448
449 if load_config_clone.load_all_vertex_attributes {
450 vertex.as_object_mut().unwrap().remove("_id");
451 vertex_json.push(vec![vertex]);
452 } else {
453 let get_value = |v: &Value, field: &str| -> Value {
457 if field == "@collection_name" {
458 Value::String(collection_name_from_id(idstr))
459 } else {
460 v[field].clone()
461 }
462 };
463
464 let mut cols: Vec<Value> =
465 Vec::with_capacity(vertex_global_fields.len());
466 for f in vertex_global_fields.iter() {
467 let j = get_value(&vertex, f);
468 cols.push(j);
469 }
470
471 vertex_json.push(cols);
472 }
473 }
474 } else {
475 let values = match serde_json::from_str::<CursorResult>(body) {
477 Err(err) => {
478 return Err(GraphLoaderError::JsonParseError(format!(
479 "AQL Error parsing document for body:\n{}\n{:?}",
480 body, err
481 )));
482 }
483 Ok(val) => val,
484 };
485
486 for mut vertex in values.result.into_iter() {
487 let id = &vertex["_id"];
488 let idstr: &String = match id {
489 Value::String(i) => {
490 let mut buf = vec![];
491 buf.extend_from_slice(i.as_bytes());
492 vertex_ids.push(buf);
493 i
494 }
495 _ => {
496 return Err(GraphLoaderError::JsonParseError(format!(
497 "JSON is no object with a string _id attribute:\n{}",
498 vertex
499 )));
500 }
501 };
502
503 if load_config_clone.load_all_vertex_attributes {
504 vertex.as_object_mut().unwrap().remove("_id");
505 vertex_json.push(vec![vertex]);
506 } else {
507 let get_value = |v: &Value, field: &str| -> Value {
511 if field == "@collection_name" {
512 Value::String(collection_name_from_id(idstr))
513 } else {
514 v[field].clone()
515 }
516 };
517
518 let mut cols: Vec<Value> =
519 Vec::with_capacity(vertex_global_fields.len());
520 for f in vertex_global_fields.iter() {
521 let j = get_value(&vertex, f);
522 cols.push(j);
523 }
524 vertex_json.push(cols);
525 }
526 }
527 }
528 insert_vertex_clone(&vertex_ids, &mut vertex_json, &vertex_global_fields)?;
529 }
530 Ok(())
531 });
532 consumers.push(consumer);
533 }
534
535 match &self.load_strategy {
536 Some(LoadStrategy::Dump) => {
537 if self.v_collections.is_empty() {
538 error!("No vertex collections given!");
539 return Err(GraphLoaderError::from(
540 "No vertex collections given!".to_string(),
541 ));
542 }
543 if self.vertex_map.is_empty() {
544 error!("No vertex shards found!");
545 return Err(GraphLoaderError::from(
546 "No vertex shards found!".to_string(),
547 ));
548 }
549
550 let potential_vertex_projections = if self.supports_projections.unwrap_or(false)
551 {
552 self.produce_vertex_projections()
553 } else {
554 None
555 };
556
557 let dump_result = crate::sharding::get_all_shard_data(
558 &self.db_config,
559 &self.load_config,
560 &self.vertex_map,
561 senders,
562 &self
563 .support_info
564 .as_ref()
565 .unwrap()
566 .deployment
567 .deployment_type,
568 potential_vertex_projections,
569 )
570 .await;
571 if let Err(e) = dump_result {
572 error!("Error fetching vertex data: {:?}", e);
573 return Err(GraphLoaderError::from(format!(
574 "Error fetching vertex data: {:?}",
575 e
576 )));
577 }
578 }
579 Some(LoadStrategy::Aql) => {
580 let mut v_collection_infos: Vec<CollectionInfo> = vec![];
581 for (_name, info) in self.v_collections.iter() {
582 v_collection_infos.push(info.clone());
583 }
584
585 let aql_result = get_all_data_aql(
586 &self.db_config,
587 &self.load_config,
588 v_collection_infos.as_slice(),
589 senders,
590 false,
591 )
592 .await;
593 if let Err(e) = aql_result {
594 error!("Error fetching edge data: {:?}", e);
595 return Err(GraphLoaderError::from(format!(
596 "Failed to get aql cursor data: {}",
597 e
598 )));
599 }
600 }
601 None => {
602 return Err(GraphLoaderError::from("Load strategy not set".to_string()));
603 }
604 }
605
606 info!("{:?} Got all data, processing...", SystemTime::now());
607 for c in consumers {
608 match c.join() {
609 Ok(Ok(())) => {
610 }
612 Ok(Err(e)) => {
613 eprintln!("Thread returned error: {:?}", e);
615 return Err(e); }
617 Err(e) => {
618 eprintln!("Thread panicked in do_vertices: {:?}", e);
620 return Err(GraphLoaderError::from(
621 "Thread panicked in do_vertices".to_string(),
622 ));
623 }
624 }
625 }
626 }
627 Ok(())
628 }
629
630 pub async fn do_edges<F>(&self, edges_function: F) -> Result<(), GraphLoaderError>
631 where
632 F: Fn(
633 &Vec<Vec<u8>>,
634 &Vec<Vec<u8>>,
635 &mut Vec<Vec<Value>>,
636 &Vec<String>,
637 ) -> Result<(), GraphLoaderError>
638 + Send
639 + Sync
640 + Clone
641 + 'static,
642 {
643 let mut senders: Vec<tokio::sync::mpsc::Sender<Bytes>> = vec![];
644 let mut consumers: Vec<JoinHandle<Result<(), GraphLoaderError>>> = vec![];
645
646 for _i in 0..self.load_config.parallelism {
647 let (sender, mut receiver) = tokio::sync::mpsc::channel::<Bytes>(10);
648 senders.push(sender);
649
650 let edge_global_fields = self.get_all_edges_fields_as_list_to_return();
651 let insert_edge_clone = edges_function.clone();
652 let strategy_clone = self.load_strategy;
653 let load_config_clone = self.load_config.clone();
654
655 let consumer = std::thread::spawn(move || -> Result<(), GraphLoaderError> {
656 while let Some(resp) = receiver.blocking_recv() {
657 let body = std::str::from_utf8(resp.as_ref())
658 .map_err(|e| format!("UTF8 error when parsing body: {:?}", e))?;
659
660 let mut froms: Vec<Vec<u8>> = Vec::with_capacity(1000000);
661 let mut tos: Vec<Vec<u8>> = Vec::with_capacity(1000000);
662 let mut edge_json: Vec<Vec<Value>> = Vec::with_capacity(400000);
663
664 if strategy_clone == Option::from(LoadStrategy::Dump) {
665 for line in body.lines() {
666 let mut edge: Value = match serde_json::from_str(line) {
667 Err(err) => {
668 return Err(GraphLoaderError::from(format!(
669 "Error parsing document for line:\n{}\n{:?}",
670 line, err
671 )));
672 }
673 Ok(val) => val,
674 };
675
676 let from = &edge["_from"];
677 match from {
678 Value::String(i) => {
679 let mut buf = vec![];
680 buf.extend_from_slice(i.as_bytes());
681 froms.push(buf);
682 }
683 _ => {
684 return Err(GraphLoaderError::from(format!(
685 "JSON is no object with a string _from attribute:\n{}",
686 line
687 )));
688 }
689 }
690
691 let to = &edge["_to"];
692 match to {
693 Value::String(i) => {
694 let mut buf = vec![];
695 buf.extend_from_slice(i.as_bytes());
696 tos.push(buf);
697 }
698 _ => {
699 return Err(GraphLoaderError::from(format!(
700 "JSON is no object with a string _from attribute:\n{}",
701 line
702 )));
703 }
704 }
705
706 if load_config_clone.load_all_edge_attributes {
707 edge.as_object_mut().unwrap().remove("_from");
708 edge.as_object_mut().unwrap().remove("_to");
709 edge_json.push(vec![edge]);
710 } else {
711 let id = &edge["_id"];
713 let idstr: Option<&String> = match id {
714 Value::String(i) => Some(i),
715 _ => None,
716 };
717
718 let get_value = |v: &Value, field: &str| -> Value {
722 if field == "@collection_name" {
723 if let Some(id) = idstr {
724 Value::String(collection_name_from_id(id))
725 } else {
726 Value::String("n/A - _id is missing".to_string())
727 }
728 } else {
729 v[field].clone()
730 }
731 };
732
733 let mut cols: Vec<Value> =
734 Vec::with_capacity(edge_global_fields.len());
735 for f in edge_global_fields.iter() {
736 let j = get_value(&edge, f);
737 cols.push(j);
738 }
739
740 edge_json.push(cols);
741 }
742 }
743 } else {
744 let values = match serde_json::from_str::<CursorResult>(body) {
746 Err(err) => {
747 return Err(GraphLoaderError::from(format!(
748 "Error parsing document for body:\n{}\n{:?}",
749 body, err
750 )));
751 }
752 Ok(val) => val,
753 };
754
755 for mut edge in values.result.into_iter() {
756 let from = &edge["_from"];
757 match from {
758 Value::String(i) => {
759 let mut buf = vec![];
760 buf.extend_from_slice(i.as_bytes());
761 froms.push(buf);
762 }
763 _ => {
764 return Err(GraphLoaderError::from(format!(
765 "JSON is no object with a string _from attribute:\n{}",
766 edge
767 )));
768 }
769 }
770 let to = &edge["_to"];
771
772 match to {
773 Value::String(i) => {
774 let mut buf = vec![];
775 buf.extend_from_slice(i.as_bytes());
776 tos.push(buf);
777 }
778 _ => {
779 return Err(GraphLoaderError::from(format!(
780 "JSON is no object with a string _from attribute:\n{}",
781 edge
782 )));
783 }
784 }
785
786 if load_config_clone.load_all_edge_attributes {
787 edge.as_object_mut().unwrap().remove("_from");
788 edge.as_object_mut().unwrap().remove("_to");
789 edge_json.push(vec![edge]);
790 } else {
791 let id = &edge["_id"];
793 let idstr: Option<&String> = match id {
794 Value::String(i) => Some(i),
795 _ => None,
796 };
797
798 let get_value = |v: &Value, field: &str| -> Value {
802 if field == "@collection_name" {
803 if let Some(id) = idstr {
804 Value::String(collection_name_from_id(id))
805 } else {
806 Value::String("n/A - _id is missing".to_string())
807 }
808 } else {
809 v[field].clone()
810 }
811 };
812
813 let mut cols: Vec<Value> =
814 Vec::with_capacity(edge_global_fields.len());
815 for f in edge_global_fields.iter() {
816 let j = get_value(&edge, f);
817 cols.push(j);
818 }
819 edge_json.push(cols);
820 }
821 }
822 }
823 insert_edge_clone(&froms, &tos, &mut edge_json, &edge_global_fields)?;
824 }
825 Ok(())
826 });
827 consumers.push(consumer);
828 }
829
830 match self.load_strategy {
831 Some(LoadStrategy::Dump) => {
832 if self.e_collections.is_empty() {
833 error!("No edge collections given!");
834 return Err(GraphLoaderError::from(
835 "No edge collections given!".to_string(),
836 ));
837 }
838 if self.edge_map.is_empty() {
839 error!("No edge shards found!");
840 return Err(GraphLoaderError::from("No edge shards found!".to_string()));
841 }
842 let potential_edge_projections = if self.supports_projections.unwrap_or(false) {
843 self.produce_edge_projections()
844 } else {
845 None
846 };
847
848 let shard_result = crate::sharding::get_all_shard_data(
849 &self.db_config,
850 &self.load_config,
851 &self.edge_map,
852 senders,
853 &self
854 .support_info
855 .as_ref()
856 .unwrap()
857 .deployment
858 .deployment_type,
859 potential_edge_projections,
860 )
861 .await;
862 if let Err(e) = shard_result {
863 error!("Error fetching edge data: {:?}", e);
864 return Err(e);
865 }
866 }
867 Some(LoadStrategy::Aql) => {
868 let mut e_collection_infos: Vec<CollectionInfo> = vec![];
869 for (_name, info) in self.e_collections.iter() {
870 e_collection_infos.push(info.clone());
871 }
872
873 let aql_result = get_all_data_aql(
874 &self.db_config,
875 &self.load_config,
876 e_collection_infos.as_slice(),
877 senders,
878 true,
879 )
880 .await;
881 if let Err(e) = aql_result {
882 error!("Error fetching edge data: {:?}", e);
883 return Err(GraphLoaderError::from(format!(
884 "Failed to get aql cursor data: {}",
885 e
886 )));
887 }
888 }
889 None => {
890 return Err(GraphLoaderError::from("Load strategy not set".to_string()));
891 }
892 }
893
894 info!(
895 "{:?} Got all edge data, processing...",
896 std::time::SystemTime::now()
897 );
898 for c in consumers {
899 match c.join() {
900 Ok(Ok(())) => {
901 }
903 Ok(Err(e)) => {
904 eprintln!("Thread returned error: {:?}", e);
906 return Err(e); }
908 Err(e) => {
909 eprintln!("Thread panicked in do_edges: {:?}", e);
911 return Err(GraphLoaderError::from(
912 "Thread panicked in do_edges".to_string(),
913 ));
914 }
915 }
916 }
917 Ok(())
918 }
919
920 pub fn get_vertex_collections_as_list(&self) -> Vec<String> {
921 self.v_collections.keys().cloned().collect()
922 }
923
924 pub fn get_edge_collections_as_list(&self) -> Vec<String> {
925 self.e_collections.keys().cloned().collect()
926 }
927
928 pub fn get_all_vertex_fields_as_list_to_return(&self) -> Vec<String> {
929 let mut unique_fields = HashSet::new();
931 for fields in self.v_collections.values().flat_map(|c| c.fields.clone()) {
932 unique_fields.insert(fields);
933 }
934 unique_fields.into_iter().collect()
935 }
936
937 pub fn get_all_vertices_fields_to_fetch_as_list(&self) -> Vec<String> {
938 let mut unique_fields = self.get_all_vertex_fields_as_list_to_return();
942 if !unique_fields.contains(&"_id".to_string()) {
943 unique_fields.insert(0, "_id".to_string());
945 }
946 unique_fields
947 }
948
949 pub fn get_all_edges_fields_as_list_to_return(&self) -> Vec<String> {
950 let mut unique_fields = HashSet::new();
952 for fields in self.e_collections.values().flat_map(|c| c.fields.clone()) {
953 unique_fields.insert(fields);
954 }
955 unique_fields.into_iter().collect()
956 }
957
958 pub fn get_all_edges_fields_to_fetch_as_list(&self) -> Vec<String> {
959 let mut unique_fields = self.get_all_edges_fields_as_list_to_return();
964 if unique_fields.is_empty() && !self.load_config.load_all_edge_attributes {
965 unique_fields.insert(0, "_to".to_string());
966 unique_fields.insert(0, "_from".to_string());
967 }
968
969 if unique_fields.contains(&"@collection_name".to_string())
970 && !unique_fields.contains(&"_id".to_string())
971 {
972 unique_fields.insert(0, "_id".to_string());
974 }
975
976 unique_fields
977 }
978
979 pub fn produce_vertex_projections(&self) -> Option<HashMap<String, Vec<String>>> {
980 assert!(self.supports_projections.unwrap());
981 let mut potential_vertex_projections: Option<HashMap<String, Vec<String>>> = None;
982 let vertex_global_fields = self.get_all_vertices_fields_to_fetch_as_list();
983
984 let client_wants_all_vertex_attributes = self.load_config.load_all_vertex_attributes;
988 if !client_wants_all_vertex_attributes {
989 let mut vertex_projections: HashMap<String, Vec<String>> = HashMap::new();
990
991 for field in vertex_global_fields {
993 vertex_projections.insert(field.to_string(), vec![field.to_string()]);
994 }
995 potential_vertex_projections = Some(vertex_projections);
996 }
997 potential_vertex_projections
998 }
999
1000 pub fn produce_edge_projections(&self) -> Option<HashMap<String, Vec<String>>> {
1001 assert!(self.supports_projections.unwrap());
1002 let mut potential_edge_projections: Option<HashMap<String, Vec<String>>> = None;
1003 let edge_global_fields = self.get_all_edges_fields_to_fetch_as_list();
1004
1005 let client_wants_all_edge_attributes = self.load_config.load_all_edge_attributes;
1009 if !client_wants_all_edge_attributes {
1010 let mut edge_projections: HashMap<String, Vec<String>> = HashMap::new();
1011
1012 if !edge_global_fields.contains(&"_from".to_string()) {
1015 edge_projections.insert("_from".to_string(), vec!["_from".to_string()]);
1016 }
1017 if !edge_global_fields.contains(&"_to".to_string()) {
1018 edge_projections.insert("_to".to_string(), vec!["_to".to_string()]);
1019 }
1020
1021 for field in edge_global_fields {
1022 edge_projections.insert(field.to_string(), vec![field.to_string()]);
1023 }
1024 potential_edge_projections = Some(edge_projections);
1025 }
1026 potential_edge_projections
1027 }
1028}
1029
1030async fn get_graph_collections(
1031 db_config: &DatabaseConfiguration,
1032 graph_name: String,
1033) -> Result<(Vec<String>, Vec<String>), GraphLoaderError> {
1034 let param_url = format!("/_api/gharial/{}", graph_name);
1035 let url = make_url(db_config, ¶m_url);
1036 let graph_name = graph_name.clone();
1037 let (vertex_collections, edge_collections) =
1038 fetch_edge_and_vertex_collections_by_graph(db_config, url).await?;
1039 info!(
1040 "{:?} Got vertex collections: {:?}, edge collections: {:?} from graph definition for: {:?}.",
1041 SystemTime::now(),
1042 vertex_collections,
1043 edge_collections,
1044 graph_name
1045 );
1046
1047 Ok((vertex_collections, edge_collections))
1048}
1049
1050async fn fetch_edge_and_vertex_collections_by_graph(
1051 db_config: &DatabaseConfiguration,
1052 url: String,
1053) -> Result<(Vec<String>, Vec<String>), GraphLoaderError> {
1054 let mut edge_collection_names = vec![];
1055 let mut vertex_collection_names = vec![];
1056
1057 let use_tls = db_config.endpoints[0].starts_with("https://");
1058 let client_config = ClientConfig::builder()
1059 .n_retries(5)
1060 .use_tls(use_tls)
1061 .tls_cert_opt(db_config.tls_cert.clone())
1062 .build();
1063 let client = build_client(&client_config)?;
1064
1065 let resp = handle_auth(client.get(url), db_config).send().await;
1066
1067 let parsed_response =
1068 handle_arangodb_response_with_parsed_body::<serde_json::Value>(resp, StatusCode::OK)
1069 .await?;
1070 let graph = parsed_response["graph"]
1071 .as_object()
1072 .ok_or(GraphLoaderError::GraphNotObject)?;
1073 let edge_definitions = graph
1074 .get("edgeDefinitions")
1075 .ok_or(GraphLoaderError::NoEdgeDefinitions)?
1076 .as_array()
1077 .ok_or(GraphLoaderError::EdgeDefinitionsNotArray)?;
1078
1079 let mut non_unique_vertex_collection_names = vec![];
1080 for edge_definition in edge_definitions {
1081 let edge_collection_name = edge_definition["collection"]
1082 .as_str()
1083 .ok_or(GraphLoaderError::CollectionNotString)?;
1084 edge_collection_names.push(edge_collection_name.to_string());
1085
1086 let from = edge_definition["from"]
1087 .as_array()
1088 .ok_or(GraphLoaderError::FromNotArray)?;
1089 for vertex in from {
1090 let vertex_collection_name = vertex
1091 .as_str()
1092 .ok_or(GraphLoaderError::FromCollectionNotString)?;
1093 non_unique_vertex_collection_names.push(vertex_collection_name.to_string());
1094 }
1095
1096 let to = edge_definition["to"]
1097 .as_array()
1098 .ok_or(GraphLoaderError::ToNotArray)?;
1099 for vertex in to {
1100 let vertex_collection_name = vertex
1101 .as_str()
1102 .ok_or(GraphLoaderError::ToCollectionNotString)?;
1103 non_unique_vertex_collection_names.push(vertex_collection_name.to_string());
1104 }
1105 }
1106
1107 non_unique_vertex_collection_names.sort();
1108 non_unique_vertex_collection_names.dedup();
1109 vertex_collection_names.append(&mut non_unique_vertex_collection_names);
1110
1111 Ok((vertex_collection_names, edge_collection_names))
1112}