arangors_graph_exporter/
graph_loader.rs

1use crate::aql::get_all_data_aql;
2use crate::client::auth::handle_auth;
3use crate::client::config::ClientConfig;
4use crate::client::{build_client, make_url};
5use crate::errors::GraphLoaderError;
6use crate::request::handle_arangodb_response_with_parsed_body;
7use crate::sharding::{compute_faked_shard_map, compute_shard_map};
8use crate::types::info::{
9    DeploymentInfo, DeploymentType, LoadStrategy, SupportInfo, VersionInformation,
10};
11use crate::{DataLoadConfiguration, DatabaseConfiguration};
12use bytes::Bytes;
13use log::{debug, error, info};
14use reqwest::StatusCode;
15use reqwest_middleware::ClientWithMiddleware;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::cmp::PartialEq;
19use std::collections::{HashMap, HashSet};
20use std::num::ParseIntError;
21use std::thread::JoinHandle;
22use std::time::SystemTime;
23
/// Minimum supported server versions as `(major, minimum minor)` pairs.
///
/// `does_arangodb_supports_dump_endpoint` looks up the server's major version
/// in this table: a major version without an entry is reported as supporting
/// neither the dump endpoint nor projections, otherwise the server's minor
/// version is compared against the minimum minor stored here.
static MIN_SUPPORTED_MINOR_VERSIONS: &[(u8, u8)] = &[(3, 12)];
25
/// Describes one collection handled by a `GraphLoader`.
#[derive(Clone)]
pub struct CollectionInfo {
    /// Collection name; `GraphLoader::new` uses it as the key of its
    /// vertex/edge collection maps.
    pub name: String,
    /// Field names attached to this collection. `GraphLoader::new_named` fills
    /// this with the caller-supplied global field list (empty when none is given).
    /// NOTE(review): presumably the attributes to extract per document — confirm
    /// against the field-list helpers, which are not visible in this part of the file.
    pub fields: Vec<String>,
}
32
/// Minimal view of a response body on the AQL load path.
///
/// The consumer threads parse each received body with
/// `serde_json::from_str::<CursorResult>` and only look at `result`.
#[derive(Debug, Serialize, Deserialize)]
struct CursorResult {
    /// Documents contained in this batch, kept as raw JSON values.
    result: Vec<Value>,
}
37
/// Loads the vertices and edges of a graph from an ArangoDB deployment.
///
/// Instances are created through `new`, `new_named` or `new_custom`; all of
/// them run `initialize` before the loader is handed back to the caller.
pub struct GraphLoader {
    /// Connection settings; `endpoints` and `tls_cert` are read during initialization.
    db_config: DatabaseConfiguration,
    /// Loading options such as `parallelism` and the `load_all_*_attributes` flags.
    load_config: DataLoadConfiguration,
    /// Vertex collections to load, keyed by collection name.
    v_collections: HashMap<String, CollectionInfo>,
    /// Edge collections to load, keyed by collection name.
    e_collections: HashMap<String, CollectionInfo>,
    /// Shard map of the vertex collections; filled by `initialize` when the
    /// dump strategy is selected, empty otherwise.
    vertex_map: crate::sharding::ShardMap,
    /// Shard map of the edge collections; filled by `initialize` when the
    /// dump strategy is selected, empty otherwise.
    edge_map: crate::sharding::ShardMap,
    // Internal state: the three fields below start as `None` in `new` and are
    // set by `identify_load_strategy` during initialization. They are not
    // meant to be set from outside.
    load_strategy: Option<LoadStrategy>,
    support_info: Option<SupportInfo>,
    supports_projections: Option<bool>,
}
50
/// Extracts the collection part of a document id of the form `collection/key`.
///
/// Returns everything before the first `/`. When `id` contains no `/` at all,
/// an empty string is returned.
fn collection_name_from_id(id: &str) -> String {
    id.split_once('/')
        .map(|(collection, _key)| collection.to_string())
        .unwrap_or_default()
}
57
58impl PartialEq<DeploymentType> for &DeploymentType {
59    fn eq(&self, other: &DeploymentType) -> bool {
60        matches!(
61            (self, other),
62            (DeploymentType::Cluster, DeploymentType::Cluster)
63                | (DeploymentType::Single, DeploymentType::Single)
64        )
65    }
66}
67
68impl GraphLoader {
69    pub async fn new(
70        db_config: DatabaseConfiguration,
71        load_config: DataLoadConfiguration,
72        vertex_collections: Vec<CollectionInfo>,
73        edge_collections: Vec<CollectionInfo>,
74    ) -> Result<GraphLoader, GraphLoaderError> {
75        let v_collections = vertex_collections
76            .into_iter()
77            .map(|c| (c.name.clone(), c))
78            .collect();
79        let e_collections = edge_collections
80            .into_iter()
81            .map(|c| (c.name.clone(), c))
82            .collect();
83
84        let mut graph_loader = GraphLoader {
85            db_config,
86            load_config,
87            v_collections,
88            e_collections,
89            vertex_map: HashMap::new(),
90            edge_map: HashMap::new(),
91            load_strategy: None,
92            support_info: None,
93            supports_projections: None,
94        };
95        let init_result = graph_loader.initialize().await;
96        init_result?;
97
98        Ok(graph_loader)
99    }
100
101    async fn does_arangodb_supports_dump_endpoint(
102        &self,
103        client: &ClientWithMiddleware,
104        support_info: &Option<SupportInfo>,
105    ) -> Result<(bool, bool), String> {
106        let is_cluster = support_info
107            .as_ref()
108            .ok_or("Support Info not set".to_string())?
109            .deployment
110            .deployment_type
111            == DeploymentType::Cluster;
112        let server_version_url = self.db_config.endpoints[0].clone() + "/_api/version";
113        let resp = handle_auth(client.get(server_version_url), &self.db_config)
114            .send()
115            .await;
116        let version_info_result =
117            handle_arangodb_response_with_parsed_body::<VersionInformation>(resp, StatusCode::OK)
118                .await;
119        if let Err(e) = version_info_result {
120            return Err(e.to_string());
121        }
122        let version_info = version_info_result.unwrap();
123
124        let version_parts: Vec<&str> = version_info.version.split('.').collect();
125        if version_parts.len() < 3 {
126            return Err(format!(
127                "Unable to parse ArangoDB Version - got {}",
128                version_info.version
129            ));
130        }
131
132        let (supports_dump_endpoint, support_dump_projections) = {
133            let major: u8 = version_parts
134                .first()
135                .ok_or("Unable to parse Major Version".to_string())?
136                .parse()
137                .map_err(|err: ParseIntError| err.to_string())?;
138            let minor: u8 = version_parts
139                .get(1)
140                .ok_or("Unable to parse Minor Version".to_string())?
141                .parse()
142                .map_err(|err: ParseIntError| err.to_string())?;
143            let major_supports = MIN_SUPPORTED_MINOR_VERSIONS
144                .iter()
145                .map(|x| x.0)
146                .any(|x| x == major);
147            if !major_supports {
148                (false, false)
149            } else {
150                let mut supports_dump = MIN_SUPPORTED_MINOR_VERSIONS
151                    .iter()
152                    .find(|x| x.0 == major)
153                    .ok_or("Unable to find supported version".to_string())?
154                    .1
155                    <= minor;
156
157                // One special rule, if we are a cluster and 3.11, we support the dump endpoint
158                if is_cluster && major == 3 && minor == 11 {
159                    supports_dump = true;
160                }
161
162                // Rule for projections: Supported from 3.12 onwards
163                let mut supports_projections = false;
164                if major == 3 && minor >= 12 {
165                    supports_projections = true;
166                }
167
168                (supports_dump, supports_projections)
169            }
170        };
171
172        Ok((supports_dump_endpoint, support_dump_projections))
173    }
174
175    async fn get_arangodb_support_information(
176        &self,
177        client: &ClientWithMiddleware,
178    ) -> Result<SupportInfo, String> {
179        let server_information_url = self.db_config.endpoints[0].clone() + "/_admin/support-info";
180        let support_info_res = handle_auth(client.get(server_information_url), &self.db_config)
181            .send()
182            .await;
183        let support_info_result = handle_arangodb_response_with_parsed_body::<SupportInfo>(
184            support_info_res,
185            StatusCode::OK,
186        )
187        .await;
188        if let Err(e) = support_info_result {
189            return Err(e.to_string());
190        }
191        let support_info = support_info_result.unwrap();
192        Ok(support_info)
193    }
194
195    fn identify_arangodb_load_strategy(&self, dump_support_enabled: bool) -> LoadStrategy {
196        let support_info = &self
197            .support_info
198            .as_ref()
199            .unwrap()
200            .deployment
201            .deployment_type;
202        if !dump_support_enabled && support_info == DeploymentType::Single {
203            LoadStrategy::Aql
204        } else {
205            LoadStrategy::Dump
206        }
207    }
208
209    async fn identify_load_strategy(
210        &mut self,
211        client: &ClientWithMiddleware,
212    ) -> Result<(), String> {
213        self.support_info = match self.get_arangodb_support_information(client).await {
214            Ok(info) => Some(info),
215            Err(e) => {
216                print!(
217                    "Failed to read ArangoDB environment information: {}. \
218                    This can happen if the current user is not allowed to access the '/_admin/support-info' endpoint. \
219                    Assuming ArangoDB instance version is at least '3.12'. \
220                    We will use the aql load strategy as the loading strategy. \
221                    While this works, this will be slower then using the dump endpoint instead. ",
222                    e
223                );
224                None
225            }
226        };
227
228        if self.support_info.is_none() {
229            // assume the environment is a cluster
230            self.support_info = Some(SupportInfo {
231                deployment: DeploymentInfo {
232                    deployment_type: DeploymentType::Single,
233                },
234            });
235
236            // assume that dump endpoint is supported
237            self.load_strategy = Some(LoadStrategy::Aql);
238            // assume projections are supported
239            self.supports_projections = Some(true);
240        } else {
241            let (dump_support_enabled, dump_projections_support) = self
242                .does_arangodb_supports_dump_endpoint(client, &self.support_info)
243                .await?;
244            self.supports_projections = Some(dump_projections_support);
245            self.load_strategy = Some(self.identify_arangodb_load_strategy(dump_support_enabled));
246        }
247
248        debug_assert!(self.load_strategy.is_some());
249        debug_assert!(self.supports_projections.is_some());
250
251        Ok(())
252    }
253
254    fn verify_parameters(&self) -> Result<(), GraphLoaderError> {
255        if !self.get_all_vertex_fields_as_list_to_return().is_empty()
256            && self.load_config.load_all_vertex_attributes
257        {
258            return Err(GraphLoaderError::from(
259                "load_all_vertex_attributes is set to true, but vertex collections are not empty."
260                    .to_string(),
261            ));
262        }
263        if !self.get_all_edges_fields_as_list_to_return().is_empty()
264            && self.load_config.load_all_edge_attributes
265        {
266            return Err(GraphLoaderError::from(
267                "load_all_edge_attributes is set to true, but edge collections are not empty."
268                    .to_string(),
269            ));
270        }
271        Ok(())
272    }
273
274    async fn initialize(&mut self) -> Result<(), GraphLoaderError> {
275        self.verify_parameters()?;
276
277        let use_tls = self.db_config.endpoints[0].starts_with("https://");
278        let client_config = ClientConfig::builder()
279            .n_retries(5)
280            .use_tls(use_tls)
281            .tls_cert_opt(self.db_config.tls_cert.clone())
282            .build();
283        let client = build_client(&client_config)?;
284        self.identify_load_strategy(&client).await?;
285
286        let load_strategy = self.load_strategy.as_ref().unwrap();
287        let deployment_type = &self
288            .support_info
289            .as_ref()
290            .unwrap()
291            .deployment
292            .deployment_type;
293
294        if load_strategy == &LoadStrategy::Dump {
295            if *deployment_type == DeploymentType::Cluster {
296                // Compute which shard we must get from which dbserver, we do vertices
297                // and edges right away to be able to error out early:
298
299                // First ask for the shard distribution:
300                let url = make_url(&self.db_config, "/_admin/cluster/shardDistribution");
301                let resp = handle_auth(client.get(url), &self.db_config).send().await;
302
303                let shard_dist = handle_arangodb_response_with_parsed_body::<
304                    crate::sharding::ShardDistribution,
305                >(resp, StatusCode::OK)
306                .await?;
307                info!("Using dump strategy for loading data.");
308                self.vertex_map =
309                    compute_shard_map(&shard_dist, &self.get_vertex_collections_as_list())?;
310                self.edge_map =
311                    compute_shard_map(&shard_dist, &self.get_edge_collections_as_list())?;
312            } else {
313                self.vertex_map = compute_faked_shard_map(&self.get_vertex_collections_as_list());
314                self.edge_map = compute_faked_shard_map(&self.get_edge_collections_as_list());
315            }
316            info!(
317                "{:?} Need to fetch data from {} vertex shards and {} edge shards...",
318                std::time::SystemTime::now(),
319                self.vertex_map.values().map(|v| v.len()).sum::<usize>(),
320                self.edge_map.values().map(|v| v.len()).sum::<usize>()
321            );
322        } else {
323            info!("Using AQL strategy for loading data.");
324        }
325
326        Ok(())
327    }
328
329    pub async fn new_named(
330        db_config: DatabaseConfiguration,
331        load_config: DataLoadConfiguration,
332        graph_name: String,
333        vertex_global_fields: Option<Vec<String>>,
334        edge_global_fields: Option<Vec<String>>,
335    ) -> Result<GraphLoader, GraphLoaderError> {
336        let vertex_coll_list;
337        let edge_coll_list;
338
339        match get_graph_collections(&db_config, graph_name).await {
340            Ok((vertex_collections, edge_collections)) => {
341                vertex_coll_list = vertex_collections
342                    .iter()
343                    .map(|c| CollectionInfo {
344                        name: c.clone(),
345                        fields: vertex_global_fields.clone().unwrap_or_default(),
346                    })
347                    .collect();
348
349                edge_coll_list = edge_collections
350                    .iter()
351                    .map(|c| CollectionInfo {
352                        name: c.clone(),
353                        fields: edge_global_fields.clone().unwrap_or_default(),
354                    })
355                    .collect();
356            }
357            Err(err) => {
358                return Err(err);
359            }
360        }
361
362        let graph_loader =
363            GraphLoader::new(db_config, load_config, vertex_coll_list, edge_coll_list).await?;
364        Ok(graph_loader)
365    }
366
367    pub async fn new_custom(
368        db_config: DatabaseConfiguration,
369        load_config: DataLoadConfiguration,
370        vertex_collections: Vec<CollectionInfo>,
371        edge_collections: Vec<CollectionInfo>,
372    ) -> Result<Self, GraphLoaderError> {
373        let graph_loader =
374            GraphLoader::new(db_config, load_config, vertex_collections, edge_collections).await?;
375        Ok(graph_loader)
376    }
377
378    pub async fn do_vertices<F>(&self, vertices_function: F) -> Result<(), GraphLoaderError>
379    where
380        F: Fn(&Vec<Vec<u8>>, &mut Vec<Vec<Value>>, &Vec<String>) -> Result<(), GraphLoaderError>
381            + Send
382            + Sync
383            + Clone
384            + 'static,
385    {
386        {
387            // We use multiple threads to receive the data in batches:
388            let mut senders: Vec<tokio::sync::mpsc::Sender<Bytes>> = vec![];
389            let mut consumers: Vec<JoinHandle<Result<(), GraphLoaderError>>> = vec![];
390
391            for _i in 0..self.load_config.parallelism {
392                let (sender, mut receiver) = tokio::sync::mpsc::channel::<Bytes>(10);
393                senders.push(sender);
394
395                let vertex_global_fields = self.get_all_vertex_fields_as_list_to_return();
396                let insert_vertex_clone = vertices_function.clone();
397                let strategy_clone = self.load_strategy;
398                let load_config_clone = self.load_config.clone();
399
400                let consumer = std::thread::spawn(move || -> Result<(), GraphLoaderError> {
401                    let begin = SystemTime::now();
402                    while let Some(resp) = receiver.blocking_recv() {
403                        let body_result = std::str::from_utf8(resp.as_ref());
404                        let body = match body_result {
405                            Ok(body) => body,
406                            Err(e) => {
407                                return Err(GraphLoaderError::Utf8Error(format!(
408                                    "UTF8 error when parsing body: {:?}",
409                                    e
410                                )));
411                            }
412                        };
413                        debug!(
414                            "{:?} Received post response, body size: {}",
415                            SystemTime::now().duration_since(begin),
416                            body.len()
417                        );
418                        let mut vertex_ids: Vec<Vec<u8>> = Vec::with_capacity(400000);
419                        let mut vertex_json: Vec<Vec<Value>> = Vec::with_capacity(400000);
420
421                        if strategy_clone == Option::from(LoadStrategy::Dump) {
422                            for line in body.lines() {
423                                let mut vertex: Value = match serde_json::from_str(line) {
424                                    Err(err) => {
425                                        return Err(GraphLoaderError::JsonParseError(format!(
426                                            "Error parsing document for line:\n{}\n{:?}",
427                                            line, err
428                                        )));
429                                    }
430                                    Ok(val) => val,
431                                };
432
433                                let id = &vertex["_id"];
434                                let idstr: &String = match id {
435                                    Value::String(i) => {
436                                        let mut buf = vec![];
437                                        buf.extend_from_slice(i.as_bytes());
438                                        vertex_ids.push(buf);
439                                        i
440                                    }
441                                    _ => {
442                                        return Err(GraphLoaderError::JsonParseError(format!(
443                                            "JSON is no object with a string _id attribute:\n{}",
444                                            vertex
445                                        )));
446                                    }
447                                };
448
449                                if load_config_clone.load_all_vertex_attributes {
450                                    vertex.as_object_mut().unwrap().remove("_id");
451                                    vertex_json.push(vec![vertex]);
452                                } else {
453                                    // If we get here, we have to extract the field
454                                    // values in `fields` from the json and store it
455                                    // to vertex_json:
456                                    let get_value = |v: &Value, field: &str| -> Value {
457                                        if field == "@collection_name" {
458                                            Value::String(collection_name_from_id(idstr))
459                                        } else {
460                                            v[field].clone()
461                                        }
462                                    };
463
464                                    let mut cols: Vec<Value> =
465                                        Vec::with_capacity(vertex_global_fields.len());
466                                    for f in vertex_global_fields.iter() {
467                                        let j = get_value(&vertex, f);
468                                        cols.push(j);
469                                    }
470
471                                    vertex_json.push(cols);
472                                }
473                            }
474                        } else {
475                            // This it the AQL Loading variant
476                            let values = match serde_json::from_str::<CursorResult>(body) {
477                                Err(err) => {
478                                    return Err(GraphLoaderError::JsonParseError(format!(
479                                        "AQL Error parsing document for body:\n{}\n{:?}",
480                                        body, err
481                                    )));
482                                }
483                                Ok(val) => val,
484                            };
485
486                            for mut vertex in values.result.into_iter() {
487                                let id = &vertex["_id"];
488                                let idstr: &String = match id {
489                                    Value::String(i) => {
490                                        let mut buf = vec![];
491                                        buf.extend_from_slice(i.as_bytes());
492                                        vertex_ids.push(buf);
493                                        i
494                                    }
495                                    _ => {
496                                        return Err(GraphLoaderError::JsonParseError(format!(
497                                            "JSON is no object with a string _id attribute:\n{}",
498                                            vertex
499                                        )));
500                                    }
501                                };
502
503                                if load_config_clone.load_all_vertex_attributes {
504                                    vertex.as_object_mut().unwrap().remove("_id");
505                                    vertex_json.push(vec![vertex]);
506                                } else {
507                                    // If we get here, we have to extract the field
508                                    // values in `fields` from the json and store it
509                                    // to vertex_json:
510                                    let get_value = |v: &Value, field: &str| -> Value {
511                                        if field == "@collection_name" {
512                                            Value::String(collection_name_from_id(idstr))
513                                        } else {
514                                            v[field].clone()
515                                        }
516                                    };
517
518                                    let mut cols: Vec<Value> =
519                                        Vec::with_capacity(vertex_global_fields.len());
520                                    for f in vertex_global_fields.iter() {
521                                        let j = get_value(&vertex, f);
522                                        cols.push(j);
523                                    }
524                                    vertex_json.push(cols);
525                                }
526                            }
527                        }
528                        insert_vertex_clone(&vertex_ids, &mut vertex_json, &vertex_global_fields)?;
529                    }
530                    Ok(())
531                });
532                consumers.push(consumer);
533            }
534
535            match &self.load_strategy {
536                Some(LoadStrategy::Dump) => {
537                    if self.v_collections.is_empty() {
538                        error!("No vertex collections given!");
539                        return Err(GraphLoaderError::from(
540                            "No vertex collections given!".to_string(),
541                        ));
542                    }
543                    if self.vertex_map.is_empty() {
544                        error!("No vertex shards found!");
545                        return Err(GraphLoaderError::from(
546                            "No vertex shards found!".to_string(),
547                        ));
548                    }
549
550                    let potential_vertex_projections = if self.supports_projections.unwrap_or(false)
551                    {
552                        self.produce_vertex_projections()
553                    } else {
554                        None
555                    };
556
557                    let dump_result = crate::sharding::get_all_shard_data(
558                        &self.db_config,
559                        &self.load_config,
560                        &self.vertex_map,
561                        senders,
562                        &self
563                            .support_info
564                            .as_ref()
565                            .unwrap()
566                            .deployment
567                            .deployment_type,
568                        potential_vertex_projections,
569                    )
570                    .await;
571                    if let Err(e) = dump_result {
572                        error!("Error fetching vertex data: {:?}", e);
573                        return Err(GraphLoaderError::from(format!(
574                            "Error fetching vertex data: {:?}",
575                            e
576                        )));
577                    }
578                }
579                Some(LoadStrategy::Aql) => {
580                    let mut v_collection_infos: Vec<CollectionInfo> = vec![];
581                    for (_name, info) in self.v_collections.iter() {
582                        v_collection_infos.push(info.clone());
583                    }
584
585                    let aql_result = get_all_data_aql(
586                        &self.db_config,
587                        &self.load_config,
588                        v_collection_infos.as_slice(),
589                        senders,
590                        false,
591                    )
592                    .await;
593                    if let Err(e) = aql_result {
594                        error!("Error fetching edge data: {:?}", e);
595                        return Err(GraphLoaderError::from(format!(
596                            "Failed to get aql cursor data: {}",
597                            e
598                        )));
599                    }
600                }
601                None => {
602                    return Err(GraphLoaderError::from("Load strategy not set".to_string()));
603                }
604            }
605
606            info!("{:?} Got all data, processing...", SystemTime::now());
607            for c in consumers {
608                match c.join() {
609                    Ok(Ok(())) => {
610                        // The thread completed successfully and returned Ok
611                    }
612                    Ok(Err(e)) => {
613                        // The thread completed but returned an error
614                        eprintln!("Thread returned error: {:?}", e);
615                        return Err(e); // Propagate the error
616                    }
617                    Err(e) => {
618                        // The thread panicked
619                        eprintln!("Thread panicked in do_vertices: {:?}", e);
620                        return Err(GraphLoaderError::from(
621                            "Thread panicked in do_vertices".to_string(),
622                        ));
623                    }
624                }
625            }
626        }
627        Ok(())
628    }
629
630    pub async fn do_edges<F>(&self, edges_function: F) -> Result<(), GraphLoaderError>
631    where
632        F: Fn(
633                &Vec<Vec<u8>>,
634                &Vec<Vec<u8>>,
635                &mut Vec<Vec<Value>>,
636                &Vec<String>,
637            ) -> Result<(), GraphLoaderError>
638            + Send
639            + Sync
640            + Clone
641            + 'static,
642    {
643        let mut senders: Vec<tokio::sync::mpsc::Sender<Bytes>> = vec![];
644        let mut consumers: Vec<JoinHandle<Result<(), GraphLoaderError>>> = vec![];
645
646        for _i in 0..self.load_config.parallelism {
647            let (sender, mut receiver) = tokio::sync::mpsc::channel::<Bytes>(10);
648            senders.push(sender);
649
650            let edge_global_fields = self.get_all_edges_fields_as_list_to_return();
651            let insert_edge_clone = edges_function.clone();
652            let strategy_clone = self.load_strategy;
653            let load_config_clone = self.load_config.clone();
654
655            let consumer = std::thread::spawn(move || -> Result<(), GraphLoaderError> {
656                while let Some(resp) = receiver.blocking_recv() {
657                    let body = std::str::from_utf8(resp.as_ref())
658                        .map_err(|e| format!("UTF8 error when parsing body: {:?}", e))?;
659
660                    let mut froms: Vec<Vec<u8>> = Vec::with_capacity(1000000);
661                    let mut tos: Vec<Vec<u8>> = Vec::with_capacity(1000000);
662                    let mut edge_json: Vec<Vec<Value>> = Vec::with_capacity(400000);
663
664                    if strategy_clone == Option::from(LoadStrategy::Dump) {
665                        for line in body.lines() {
666                            let mut edge: Value = match serde_json::from_str(line) {
667                                Err(err) => {
668                                    return Err(GraphLoaderError::from(format!(
669                                        "Error parsing document for line:\n{}\n{:?}",
670                                        line, err
671                                    )));
672                                }
673                                Ok(val) => val,
674                            };
675
676                            let from = &edge["_from"];
677                            match from {
678                                Value::String(i) => {
679                                    let mut buf = vec![];
680                                    buf.extend_from_slice(i.as_bytes());
681                                    froms.push(buf);
682                                }
683                                _ => {
684                                    return Err(GraphLoaderError::from(format!(
685                                        "JSON is no object with a string _from attribute:\n{}",
686                                        line
687                                    )));
688                                }
689                            }
690
691                            let to = &edge["_to"];
692                            match to {
693                                Value::String(i) => {
694                                    let mut buf = vec![];
695                                    buf.extend_from_slice(i.as_bytes());
696                                    tos.push(buf);
697                                }
698                                _ => {
699                                    return Err(GraphLoaderError::from(format!(
700                                        "JSON is no object with a string _from attribute:\n{}",
701                                        line
702                                    )));
703                                }
704                            }
705
706                            if load_config_clone.load_all_edge_attributes {
707                                edge.as_object_mut().unwrap().remove("_from");
708                                edge.as_object_mut().unwrap().remove("_to");
709                                edge_json.push(vec![edge]);
710                            } else {
711                                // it is not guaranteed that the _id field is present
712                                let id = &edge["_id"];
713                                let idstr: Option<&String> = match id {
714                                    Value::String(i) => Some(i),
715                                    _ => None,
716                                };
717
718                                // If we get here, we have to extract the field
719                                // values in `fields` from the json and store it
720                                // to edge_json:
721                                let get_value = |v: &Value, field: &str| -> Value {
722                                    if field == "@collection_name" {
723                                        if let Some(id) = idstr {
724                                            Value::String(collection_name_from_id(id))
725                                        } else {
726                                            Value::String("n/A - _id is missing".to_string())
727                                        }
728                                    } else {
729                                        v[field].clone()
730                                    }
731                                };
732
733                                let mut cols: Vec<Value> =
734                                    Vec::with_capacity(edge_global_fields.len());
735                                for f in edge_global_fields.iter() {
736                                    let j = get_value(&edge, f);
737                                    cols.push(j);
738                                }
739
740                                edge_json.push(cols);
741                            }
742                        }
743                    } else {
744                        // AQL Variant
745                        let values = match serde_json::from_str::<CursorResult>(body) {
746                            Err(err) => {
747                                return Err(GraphLoaderError::from(format!(
748                                    "Error parsing document for body:\n{}\n{:?}",
749                                    body, err
750                                )));
751                            }
752                            Ok(val) => val,
753                        };
754
755                        for mut edge in values.result.into_iter() {
756                            let from = &edge["_from"];
757                            match from {
758                                Value::String(i) => {
759                                    let mut buf = vec![];
760                                    buf.extend_from_slice(i.as_bytes());
761                                    froms.push(buf);
762                                }
763                                _ => {
764                                    return Err(GraphLoaderError::from(format!(
765                                        "JSON is no object with a string _from attribute:\n{}",
766                                        edge
767                                    )));
768                                }
769                            }
770                            let to = &edge["_to"];
771
772                            match to {
773                                Value::String(i) => {
774                                    let mut buf = vec![];
775                                    buf.extend_from_slice(i.as_bytes());
776                                    tos.push(buf);
777                                }
778                                _ => {
779                                    return Err(GraphLoaderError::from(format!(
780                                        "JSON is no object with a string _from attribute:\n{}",
781                                        edge
782                                    )));
783                                }
784                            }
785
786                            if load_config_clone.load_all_edge_attributes {
787                                edge.as_object_mut().unwrap().remove("_from");
788                                edge.as_object_mut().unwrap().remove("_to");
789                                edge_json.push(vec![edge]);
790                            } else {
791                                // it is not guaranteed that the _id field is present
792                                let id = &edge["_id"];
793                                let idstr: Option<&String> = match id {
794                                    Value::String(i) => Some(i),
795                                    _ => None,
796                                };
797
798                                // If we get here, we have to extract the field
799                                // values in `fields` from the json and store it
800                                // to edge_json:
801                                let get_value = |v: &Value, field: &str| -> Value {
802                                    if field == "@collection_name" {
803                                        if let Some(id) = idstr {
804                                            Value::String(collection_name_from_id(id))
805                                        } else {
806                                            Value::String("n/A - _id is missing".to_string())
807                                        }
808                                    } else {
809                                        v[field].clone()
810                                    }
811                                };
812
813                                let mut cols: Vec<Value> =
814                                    Vec::with_capacity(edge_global_fields.len());
815                                for f in edge_global_fields.iter() {
816                                    let j = get_value(&edge, f);
817                                    cols.push(j);
818                                }
819                                edge_json.push(cols);
820                            }
821                        }
822                    }
823                    insert_edge_clone(&froms, &tos, &mut edge_json, &edge_global_fields)?;
824                }
825                Ok(())
826            });
827            consumers.push(consumer);
828        }
829
830        match self.load_strategy {
831            Some(LoadStrategy::Dump) => {
832                if self.e_collections.is_empty() {
833                    error!("No edge collections given!");
834                    return Err(GraphLoaderError::from(
835                        "No edge collections given!".to_string(),
836                    ));
837                }
838                if self.edge_map.is_empty() {
839                    error!("No edge shards found!");
840                    return Err(GraphLoaderError::from("No edge shards found!".to_string()));
841                }
842                let potential_edge_projections = if self.supports_projections.unwrap_or(false) {
843                    self.produce_edge_projections()
844                } else {
845                    None
846                };
847
848                let shard_result = crate::sharding::get_all_shard_data(
849                    &self.db_config,
850                    &self.load_config,
851                    &self.edge_map,
852                    senders,
853                    &self
854                        .support_info
855                        .as_ref()
856                        .unwrap()
857                        .deployment
858                        .deployment_type,
859                    potential_edge_projections,
860                )
861                .await;
862                if let Err(e) = shard_result {
863                    error!("Error fetching edge data: {:?}", e);
864                    return Err(e);
865                }
866            }
867            Some(LoadStrategy::Aql) => {
868                let mut e_collection_infos: Vec<CollectionInfo> = vec![];
869                for (_name, info) in self.e_collections.iter() {
870                    e_collection_infos.push(info.clone());
871                }
872
873                let aql_result = get_all_data_aql(
874                    &self.db_config,
875                    &self.load_config,
876                    e_collection_infos.as_slice(),
877                    senders,
878                    true,
879                )
880                .await;
881                if let Err(e) = aql_result {
882                    error!("Error fetching edge data: {:?}", e);
883                    return Err(GraphLoaderError::from(format!(
884                        "Failed to get aql cursor data: {}",
885                        e
886                    )));
887                }
888            }
889            None => {
890                return Err(GraphLoaderError::from("Load strategy not set".to_string()));
891            }
892        }
893
894        info!(
895            "{:?} Got all edge data, processing...",
896            std::time::SystemTime::now()
897        );
898        for c in consumers {
899            match c.join() {
900                Ok(Ok(())) => {
901                    // The thread completed successfully and returned Ok
902                }
903                Ok(Err(e)) => {
904                    // The thread completed but returned an error
905                    eprintln!("Thread returned error: {:?}", e);
906                    return Err(e); // Propagate the error
907                }
908                Err(e) => {
909                    // The thread panicked
910                    eprintln!("Thread panicked in do_edges: {:?}", e);
911                    return Err(GraphLoaderError::from(
912                        "Thread panicked in do_edges".to_string(),
913                    ));
914                }
915            }
916        }
917        Ok(())
918    }
919
920    pub fn get_vertex_collections_as_list(&self) -> Vec<String> {
921        self.v_collections.keys().cloned().collect()
922    }
923
924    pub fn get_edge_collections_as_list(&self) -> Vec<String> {
925        self.e_collections.keys().cloned().collect()
926    }
927
928    pub fn get_all_vertex_fields_as_list_to_return(&self) -> Vec<String> {
929        // Guaranteed to be unique
930        let mut unique_fields = HashSet::new();
931        for fields in self.v_collections.values().flat_map(|c| c.fields.clone()) {
932            unique_fields.insert(fields);
933        }
934        unique_fields.into_iter().collect()
935    }
936
937    pub fn get_all_vertices_fields_to_fetch_as_list(&self) -> Vec<String> {
938        // Guaranteed to be unique
939        // This method adds required fields if they are not present,
940        // which need to be available to deliver the required resources.
941        let mut unique_fields = self.get_all_vertex_fields_as_list_to_return();
942        if !unique_fields.contains(&"_id".to_string()) {
943            // _id is always required.
944            unique_fields.insert(0, "_id".to_string());
945        }
946        unique_fields
947    }
948
949    pub fn get_all_edges_fields_as_list_to_return(&self) -> Vec<String> {
950        // Guaranteed to be unique
951        let mut unique_fields = HashSet::new();
952        for fields in self.e_collections.values().flat_map(|c| c.fields.clone()) {
953            unique_fields.insert(fields);
954        }
955        unique_fields.into_iter().collect()
956    }
957
958    pub fn get_all_edges_fields_to_fetch_as_list(&self) -> Vec<String> {
959        // Guaranteed to be unique
960        // This method adds required fields if they are not present,
961        // which need to be available to deliver the required resources.
962
963        let mut unique_fields = self.get_all_edges_fields_as_list_to_return();
964        if unique_fields.is_empty() && !self.load_config.load_all_edge_attributes {
965            unique_fields.insert(0, "_to".to_string());
966            unique_fields.insert(0, "_from".to_string());
967        }
968
969        if unique_fields.contains(&"@collection_name".to_string())
970            && !unique_fields.contains(&"_id".to_string())
971        {
972            // Compared to vertices, this is not a mandatory field.
973            unique_fields.insert(0, "_id".to_string());
974        }
975
976        unique_fields
977    }
978
979    pub fn produce_vertex_projections(&self) -> Option<HashMap<String, Vec<String>>> {
980        assert!(self.supports_projections.unwrap());
981        let mut potential_vertex_projections: Option<HashMap<String, Vec<String>>> = None;
982        let vertex_global_fields = self.get_all_vertices_fields_to_fetch_as_list();
983
984        // We can only make use of projections in case:
985        // 1.) The user has not requested all vertex attributes
986        // 2.) ArangoDB supports the dump endpoint, which is Version 3.12 or higher
987        let client_wants_all_vertex_attributes = self.load_config.load_all_vertex_attributes;
988        if !client_wants_all_vertex_attributes {
989            let mut vertex_projections: HashMap<String, Vec<String>> = HashMap::new();
990
991            // now add all user specific fields
992            for field in vertex_global_fields {
993                vertex_projections.insert(field.to_string(), vec![field.to_string()]);
994            }
995            potential_vertex_projections = Some(vertex_projections);
996        }
997        potential_vertex_projections
998    }
999
1000    pub fn produce_edge_projections(&self) -> Option<HashMap<String, Vec<String>>> {
1001        assert!(self.supports_projections.unwrap());
1002        let mut potential_edge_projections: Option<HashMap<String, Vec<String>>> = None;
1003        let edge_global_fields = self.get_all_edges_fields_to_fetch_as_list();
1004
1005        // We can only make use of projections in case:
1006        // 1.) The user has not requested all vertex attributes
1007        // 2.) ArangoDB supports the dump endpoint, which is Version 3.12 or higher
1008        let client_wants_all_edge_attributes = self.load_config.load_all_edge_attributes;
1009        if !client_wants_all_edge_attributes {
1010            let mut edge_projections: HashMap<String, Vec<String>> = HashMap::new();
1011
1012            // if edge_global_fields does not contain "_from" and "_to" we have to add it as it is required.
1013            // if this is not done, those fields will not be returned from the server's dump endpoint
1014            if !edge_global_fields.contains(&"_from".to_string()) {
1015                edge_projections.insert("_from".to_string(), vec!["_from".to_string()]);
1016            }
1017            if !edge_global_fields.contains(&"_to".to_string()) {
1018                edge_projections.insert("_to".to_string(), vec!["_to".to_string()]);
1019            }
1020
1021            for field in edge_global_fields {
1022                edge_projections.insert(field.to_string(), vec![field.to_string()]);
1023            }
1024            potential_edge_projections = Some(edge_projections);
1025        }
1026        potential_edge_projections
1027    }
1028}
1029
1030async fn get_graph_collections(
1031    db_config: &DatabaseConfiguration,
1032    graph_name: String,
1033) -> Result<(Vec<String>, Vec<String>), GraphLoaderError> {
1034    let param_url = format!("/_api/gharial/{}", graph_name);
1035    let url = make_url(db_config, &param_url);
1036    let graph_name = graph_name.clone();
1037    let (vertex_collections, edge_collections) =
1038        fetch_edge_and_vertex_collections_by_graph(db_config, url).await?;
1039    info!(
1040        "{:?} Got vertex collections: {:?}, edge collections: {:?} from graph definition for: {:?}.",
1041        SystemTime::now(),
1042        vertex_collections,
1043        edge_collections,
1044        graph_name
1045    );
1046
1047    Ok((vertex_collections, edge_collections))
1048}
1049
1050async fn fetch_edge_and_vertex_collections_by_graph(
1051    db_config: &DatabaseConfiguration,
1052    url: String,
1053) -> Result<(Vec<String>, Vec<String>), GraphLoaderError> {
1054    let mut edge_collection_names = vec![];
1055    let mut vertex_collection_names = vec![];
1056
1057    let use_tls = db_config.endpoints[0].starts_with("https://");
1058    let client_config = ClientConfig::builder()
1059        .n_retries(5)
1060        .use_tls(use_tls)
1061        .tls_cert_opt(db_config.tls_cert.clone())
1062        .build();
1063    let client = build_client(&client_config)?;
1064
1065    let resp = handle_auth(client.get(url), db_config).send().await;
1066
1067    let parsed_response =
1068        handle_arangodb_response_with_parsed_body::<serde_json::Value>(resp, StatusCode::OK)
1069            .await?;
1070    let graph = parsed_response["graph"]
1071        .as_object()
1072        .ok_or(GraphLoaderError::GraphNotObject)?;
1073    let edge_definitions = graph
1074        .get("edgeDefinitions")
1075        .ok_or(GraphLoaderError::NoEdgeDefinitions)?
1076        .as_array()
1077        .ok_or(GraphLoaderError::EdgeDefinitionsNotArray)?;
1078
1079    let mut non_unique_vertex_collection_names = vec![];
1080    for edge_definition in edge_definitions {
1081        let edge_collection_name = edge_definition["collection"]
1082            .as_str()
1083            .ok_or(GraphLoaderError::CollectionNotString)?;
1084        edge_collection_names.push(edge_collection_name.to_string());
1085
1086        let from = edge_definition["from"]
1087            .as_array()
1088            .ok_or(GraphLoaderError::FromNotArray)?;
1089        for vertex in from {
1090            let vertex_collection_name = vertex
1091                .as_str()
1092                .ok_or(GraphLoaderError::FromCollectionNotString)?;
1093            non_unique_vertex_collection_names.push(vertex_collection_name.to_string());
1094        }
1095
1096        let to = edge_definition["to"]
1097            .as_array()
1098            .ok_or(GraphLoaderError::ToNotArray)?;
1099        for vertex in to {
1100            let vertex_collection_name = vertex
1101                .as_str()
1102                .ok_or(GraphLoaderError::ToCollectionNotString)?;
1103            non_unique_vertex_collection_names.push(vertex_collection_name.to_string());
1104        }
1105    }
1106
1107    non_unique_vertex_collection_names.sort();
1108    non_unique_vertex_collection_names.dedup();
1109    vertex_collection_names.append(&mut non_unique_vertex_collection_names);
1110
1111    Ok((vertex_collection_names, edge_collection_names))
1112}