arangors_graph_exporter/
aql_graph_loader.rs

1use crate::DatabaseConfiguration;
2use crate::client::auth::handle_auth;
3use crate::client::config::ClientConfig;
4use crate::client::{build_client, make_url};
5use crate::errors::GraphLoaderError;
6use bytes::Bytes;
7use reqwest::StatusCode;
8use reqwest_middleware::ClientWithMiddleware;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::collections::HashMap;
12use tokio::task::JoinSet;
13
14/// Data types supported for graph attributes
15#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
16pub enum DataType {
17    Bool,
18    String,
19    U64,
20    I64,
21    F64,
22    JSON,
23}
24
25/// Description of a data item (attribute) with name and type
26#[derive(Clone, Debug)]
27pub struct DataItem {
28    pub name: String,
29    pub data_type: DataType,
30}
31
32/// A batch of graph data containing vertices and edges
33#[derive(Debug)]
34pub struct GraphBatch {
35    /// Vertex IDs as byte vectors
36    pub vertex_ids: Vec<Vec<u8>>,
37    /// Vertex attributes, parallel to vertex_ids
38    pub vertex_attribute_values: Vec<Vec<Value>>,
39    /// Edge source IDs as byte vectors
40    pub edge_from_ids: Vec<Vec<u8>>,
41    /// Edge target IDs as byte vectors
42    pub edge_to_ids: Vec<Vec<u8>>,
43    /// Edge attributes, parallel to edge_from_ids/edge_to_ids
44    pub edge_attribute_values: Vec<Vec<Value>>,
45    /// Total number of type conversion errors encountered
46    pub type_error_count: usize,
47    /// Type error messages (limited based on configuration)
48    pub type_error_messages: Vec<String>,
49    /// Maximum number of type error messages to collect (None = no limit)
50    max_type_errors: Option<u64>,
51}
52
53impl DataItem {
54    pub fn new(name: String, data_type: DataType) -> Self {
55        DataItem { name, data_type }
56    }
57}
58
59impl GraphBatch {
60    fn new(max_type_errors: Option<u64>) -> Self {
61        GraphBatch {
62            vertex_ids: Vec::new(),
63            vertex_attribute_values: Vec::new(),
64            edge_from_ids: Vec::new(),
65            edge_to_ids: Vec::new(),
66            edge_attribute_values: Vec::new(),
67            type_error_count: 0,
68            type_error_messages: Vec::new(),
69            max_type_errors,
70        }
71    }
72
73    fn add_type_error(&mut self, message: String) {
74        self.type_error_count += 1;
75        // Add message only if we haven't reached the limit (or if there's no limit)
76        if let Some(max) = self.max_type_errors {
77            if (self.type_error_messages.len() as u64) < max {
78                self.type_error_messages.push(message);
79            }
80        } else {
81            // No limit, add all messages
82            self.type_error_messages.push(message);
83        }
84    }
85}
86
87/// Convert and validate a value according to the expected data type
88fn convert_and_validate(
89    value: &Value,
90    expected_type: &DataType,
91    attr_name: &str,
92    entity_id: &str,
93) -> Result<Value, String> {
94    match expected_type {
95        DataType::Bool => {
96            if let Some(b) = value.as_bool() {
97                Ok(Value::Bool(b))
98            } else if let Some(s) = value.as_str() {
99                // Try to parse string as bool
100                match s.to_lowercase().as_str() {
101                    "true" | "1" | "yes" => Ok(Value::Bool(true)),
102                    "false" | "0" | "no" => Ok(Value::Bool(false)),
103                    _ => Err(format!(
104                        "Cannot convert '{}' to bool for attribute '{}' in entity '{}'",
105                        s, attr_name, entity_id
106                    )),
107                }
108            } else if let Some(n) = value.as_i64() {
109                Ok(Value::Bool(n != 0))
110            } else if let Some(n) = value.as_u64() {
111                Ok(Value::Bool(n != 0))
112            } else if let Some(n) = value.as_f64() {
113                Ok(Value::Bool(n != 0.0 && n != -0.0))
114            } else {
115                Err(format!(
116                    "Cannot convert {:?} to bool for attribute '{}' in entity '{}'",
117                    value, attr_name, entity_id
118                ))
119            }
120        }
121        DataType::String => {
122            if let Some(s) = value.as_str() {
123                Ok(Value::String(s.to_string()))
124            } else if value.is_null() {
125                Ok(Value::String(String::new()))
126            } else if let Some(b) = value.as_bool() {
127                Ok(Value::String(b.to_string()))
128            } else if let Some(n) = value.as_i64() {
129                Ok(Value::String(n.to_string()))
130            } else if let Some(n) = value.as_u64() {
131                Ok(Value::String(n.to_string()))
132            } else if let Some(n) = value.as_f64() {
133                Ok(Value::String(n.to_string()))
134            } else {
135                // For objects/arrays, convert to JSON string
136                Ok(Value::String(value.to_string()))
137            }
138        }
139        DataType::U64 => {
140            if let Some(n) = value.as_u64() {
141                Ok(Value::Number(n.into()))
142            } else if let Some(n) = value.as_i64() {
143                if n >= 0 {
144                    Ok(Value::Number((n as u64).into()))
145                } else {
146                    Err(format!(
147                        "Cannot convert negative number {} to u64 for attribute '{}' in entity '{}'",
148                        n, attr_name, entity_id
149                    ))
150                }
151            } else if let Some(f) = value.as_f64() {
152                // Note that floating point values which are way larger than 2^53 are integers
153                // anyway, so every f64 value which is strictly smaller than 2^64 can be cast
154                // faithfully to u64. Since Rust 1.45 the behaviour above that is "saturating",
155                // so that the result of a cast is u64::MAX for f64 values >= 2^64, however,
156                // this is not a faithfull representation of the number.
157                // Since NaN values and infinities are not between 0.0 and 2^64, and since
158                // the `round` call cannot bring us close to 2^64, the following code is correct:
159                if f >= 0.0 && f < 2.0_f64.powi(64) {
160                    Ok(Value::Number((f.round() as u64).into()))
161                } else {
162                    Err(format!(
163                        "Cannot convert {} to u64 for attribute '{}' in entity '{}'",
164                        f, attr_name, entity_id
165                    ))
166                }
167            } else if let Some(s) = value.as_str() {
168                s.parse::<u64>()
169                    .map(|n| Value::Number(n.into()))
170                    .map_err(|_| {
171                        format!(
172                            "Cannot parse '{}' as u64 for attribute '{}' in entity '{}'",
173                            s, attr_name, entity_id
174                        )
175                    })
176            } else {
177                Err(format!(
178                    "Cannot convert {:?} to u64 for attribute '{}' in entity '{}'",
179                    value, attr_name, entity_id
180                ))
181            }
182        }
183        DataType::I64 => {
184            if let Some(n) = value.as_i64() {
185                Ok(Value::Number(n.into()))
186            } else if let Some(n) = value.as_u64() {
187                if n <= i64::MAX as u64 {
188                    Ok(Value::Number((n as i64).into()))
189                } else {
190                    Err(format!(
191                        "Cannot convert {} to i64 (overflow) for attribute '{}' in entity '{}'",
192                        n, attr_name, entity_id
193                    ))
194                }
195            } else if let Some(f) = value.as_f64() {
196                // Note that floating point values whose absolute value
197                // is way larger than 2^53 are integers anyway, so every
198                // f64 value which is greater or equal to 2^63 and
199                // strictly smaller than 2^63 can be cast faithfully
200                // to i64. Since NaN values and infinities are not
201                // between -2^63 and 2^63, and since the `round` call
202                // cannot bring us close to absolutele value 2^63, the
203                // following code is correct:
204                if f >= -(2.0_f64.powi(63)) && f < 2.0_f64.powi(63) {
205                    Ok(Value::Number((f.round() as i64).into()))
206                } else {
207                    Err(format!(
208                        "Cannot convert {} to i64 for attribute '{}' in entity '{}'",
209                        f, attr_name, entity_id
210                    ))
211                }
212            } else if let Some(s) = value.as_str() {
213                s.parse::<i64>()
214                    .map(|n| Value::Number(n.into()))
215                    .map_err(|_| {
216                        format!(
217                            "Cannot parse '{}' as i64 for attribute '{}' in entity '{}'",
218                            s, attr_name, entity_id
219                        )
220                    })
221            } else {
222                Err(format!(
223                    "Cannot convert {:?} to i64 for attribute '{}' in entity '{}'",
224                    value, attr_name, entity_id
225                ))
226            }
227        }
228        DataType::F64 => {
229            let make_number = |raw: f64| {
230                serde_json::Number::from_f64(raw)
231                    .map(Value::Number)
232                    .ok_or_else(|| {
233                        format!(
234                            "Cannot represent '{}' as finite f64 for attribute '{}' in entity '{}'",
235                            raw, attr_name, entity_id
236                        )
237                    })
238            };
239
240            if let Some(f) = value.as_f64() {
241                make_number(f)
242            } else if let Some(n) = value.as_i64() {
243                make_number(n as f64)
244            } else if let Some(n) = value.as_u64() {
245                make_number(n as f64)
246            } else if let Some(s) = value.as_str() {
247                let parsed = s.parse::<f64>().map_err(|_| {
248                    format!(
249                        "Cannot parse '{}' as f64 for attribute '{}' in entity '{}'",
250                        s, attr_name, entity_id
251                    )
252                })?;
253                make_number(parsed)
254            } else {
255                Err(format!(
256                    "Cannot convert {:?} to f64 for attribute '{}' in entity '{}'",
257                    value, attr_name, entity_id
258                ))
259            }
260        }
261        DataType::JSON => {
262            // JSON type accepts anything
263            Ok(value.clone())
264        }
265    }
266}
267
268/// Get default value for a data type
269fn default_value_for_type(data_type: &DataType) -> Value {
270    match data_type {
271        DataType::Bool => Value::Bool(false),
272        DataType::String => Value::String(String::new()),
273        DataType::U64 => Value::Number(0.into()),
274        DataType::I64 => Value::Number(0.into()),
275        DataType::F64 => Value::Number(serde_json::Number::from_f64(0.0).unwrap()),
276        DataType::JSON => Value::Null,
277    }
278}
279
280/// An AQL query with bind variables
281#[derive(Clone, Debug)]
282pub struct AqlQuery {
283    pub query: String,
284    pub bind_vars: HashMap<String, Value>,
285}
286
287impl AqlQuery {
288    pub fn new(query: String, bind_vars: HashMap<String, Value>) -> Self {
289        AqlQuery { query, bind_vars }
290    }
291}
292
293/// AQL-based graph loader
294pub struct AqlGraphLoader {
295    db_config: DatabaseConfiguration,
296    batch_size: u64,
297    vertex_attributes: Vec<DataItem>,
298    edge_attributes: Vec<DataItem>,
299    queries: Vec<Vec<AqlQuery>>,
300    /// Maximum number of type error messages to collect per GraphBatch (None = no limit)
301    max_type_errors: Option<u64>,
302}
303
304#[derive(Debug, Serialize, Deserialize)]
305struct CursorOptions {
306    stream: bool,
307}
308
309impl CursorOptions {
310    pub fn new(stream: bool) -> Self {
311        Self { stream }
312    }
313}
314
315#[derive(Debug, Serialize, Deserialize)]
316#[serde(rename_all = "camelCase")]
317struct CreateCursorBody {
318    query: String,
319    options: CursorOptions,
320    #[serde(skip_serializing_if = "Option::is_none")]
321    batch_size: Option<u64>,
322    #[serde(skip_serializing_if = "Option::is_none")]
323    bind_vars: Option<HashMap<String, Value>>,
324}
325
326impl CreateCursorBody {
327    pub fn from_streaming_query_with_size(
328        query: String,
329        batch_size: Option<u64>,
330        bind_vars: Option<HashMap<String, Value>>,
331    ) -> Self {
332        Self {
333            query,
334            batch_size,
335            options: CursorOptions::new(true),
336            bind_vars,
337        }
338    }
339}
340
341#[derive(Debug, Serialize, Deserialize)]
342#[serde(rename_all = "camelCase")]
343struct CursorResponse {
344    has_more: Option<bool>,
345    id: Option<String>,
346}
347
348#[derive(Debug, Serialize, Deserialize)]
349struct GraphData {
350    vertices: Option<Vec<Value>>,
351    edges: Option<Vec<Value>>,
352}
353
354impl AqlGraphLoader {
355    /// Create a new AQL graph loader
356    ///
357    /// # Arguments
358    ///
359    /// * `db_config` - Database configuration
360    /// * `batch_size` - Size of batches to fetch from the database
361    /// * `vertex_attributes` - List of vertex attributes to load with their types
362    /// * `edge_attributes` - List of edge attributes to load with their types
363    /// * `queries` - Nested list of AQL queries (outer list = sequential, inner list = parallel)
364    /// * `max_type_errors` - Maximum number of type error messages to collect per GraphBatch
365    ///   - `None` (recommended default): All type errors are reported
366    ///   - `Some(n)`: At most n type error messages are collected per GraphBatch (can be 0)
367    pub fn new(
368        db_config: DatabaseConfiguration,
369        batch_size: u64,
370        vertex_attributes: Vec<DataItem>,
371        edge_attributes: Vec<DataItem>,
372        queries: Vec<Vec<AqlQuery>>,
373        max_type_errors: Option<u64>,
374    ) -> Result<Self, GraphLoaderError> {
375        // Validate that we have at least one query
376        if queries.is_empty() || queries.iter().all(|q| q.is_empty()) {
377            return Err(GraphLoaderError::Other(
378                "At least one AQL query must be provided".to_string(),
379            ));
380        }
381
382        Ok(AqlGraphLoader {
383            db_config,
384            batch_size,
385            vertex_attributes,
386            edge_attributes,
387            queries,
388            max_type_errors,
389        })
390    }
391
392    /// Execute all AQL queries and call the provided callback with vertices and edges
393    pub async fn do_load<F>(&self, callback: F) -> Result<(), GraphLoaderError>
394    where
395        F: Fn(&mut GraphBatch) -> Result<(), GraphLoaderError> + Send + Sync + Clone + 'static,
396    {
397        // Build the client
398        let use_tls = self.db_config.endpoints[0].starts_with("https://");
399        let client_config = ClientConfig::builder()
400            .n_retries(5)
401            .use_tls(use_tls)
402            .tls_cert_opt(self.db_config.tls_cert.clone())
403            .build();
404        let client = build_client(&client_config)?;
405
406        // Process each group of queries sequentially
407        for query_group in &self.queries {
408            self.execute_query_group(&client, query_group, callback.clone())
409                .await?;
410        }
411
412        Ok(())
413    }
414
415    async fn execute_query_group<F>(
416        &self,
417        client: &ClientWithMiddleware,
418        queries: &[AqlQuery],
419        callback: F,
420    ) -> Result<(), GraphLoaderError>
421    where
422        F: Fn(&mut GraphBatch) -> Result<(), GraphLoaderError> + Send + Sync + Clone + 'static,
423    {
424        // Create a channel for receiving data
425        let (sender, mut receiver) = tokio::sync::mpsc::channel::<Bytes>(10);
426
427        // Spawn consumer thread
428        let callback_clone = callback.clone();
429        let vertex_attributes = self.vertex_attributes.clone();
430        let edge_attributes = self.edge_attributes.clone();
431        let max_type_errors = self.max_type_errors;
432
433        let consumer = std::thread::spawn(move || -> Result<(), GraphLoaderError> {
434            while let Some(resp) = receiver.blocking_recv() {
435                let body = std::str::from_utf8(resp.as_ref())
436                    .map_err(|e| format!("UTF8 error when parsing body: {:?}", e))?;
437
438                // Parse the cursor response
439                let cursor_result: serde_json::Result<CursorResponse> = serde_json::from_str(body);
440                if cursor_result.is_err() {
441                    return Err(GraphLoaderError::ParseError(format!(
442                        "Failed to parse cursor response: {:?}",
443                        cursor_result.err()
444                    )));
445                }
446
447                // Parse the actual result data
448                let parsed: serde_json::Result<serde_json::Map<String, Value>> =
449                    serde_json::from_str(body);
450                if parsed.is_err() {
451                    return Err(GraphLoaderError::ParseError(format!(
452                        "Failed to parse result data: {:?}",
453                        parsed.err()
454                    )));
455                }
456
457                let data = parsed.unwrap();
458                let result = data.get("result");
459                if result.is_none() {
460                    continue;
461                }
462
463                let result_array = result.unwrap().as_array();
464                if result_array.is_none() {
465                    continue;
466                }
467
468                // Create a single batch for this database response
469                let mut batch = GraphBatch::new(max_type_errors);
470
471                // Process each item in the result array and accumulate into the batch
472                for item in result_array.unwrap() {
473                    let graph_data: serde_json::Result<GraphData> =
474                        serde_json::from_value(item.clone());
475                    if let Ok(graph_data) = graph_data {
476                        // Extract and validate vertices
477                        if let Some(vertices) = graph_data.vertices {
478                            for vertex in vertices {
479                                if let Some(id) = vertex.get("_id")
480                                    && let Some(id_str) = id.as_str()
481                                {
482                                    batch.vertex_ids.push(id_str.as_bytes().to_vec());
483
484                                    // Extract and validate attributes only if there are any
485                                    if !vertex_attributes.is_empty() {
486                                        let mut attrs = Vec::new();
487                                        for attr_def in &vertex_attributes {
488                                            let raw_value = vertex
489                                                .get(&attr_def.name)
490                                                .cloned()
491                                                .unwrap_or(Value::Null);
492
493                                            match convert_and_validate(
494                                                &raw_value,
495                                                &attr_def.data_type,
496                                                &attr_def.name,
497                                                id_str,
498                                            ) {
499                                                Ok(converted) => attrs.push(converted),
500                                                Err(err_msg) => {
501                                                    batch.add_type_error(err_msg);
502                                                    attrs.push(default_value_for_type(
503                                                        &attr_def.data_type,
504                                                    ));
505                                                }
506                                            }
507                                        }
508                                        batch.vertex_attribute_values.push(attrs);
509                                    }
510                                }
511                            }
512                        }
513
514                        // Extract and validate edges
515                        if let Some(edges) = graph_data.edges {
516                            for edge in edges {
517                                // Skip null edges (e.g., from traversal start node)
518                                if edge.is_null() {
519                                    continue;
520                                }
521
522                                if let (Some(from), Some(to)) = (edge.get("_from"), edge.get("_to"))
523                                    && let (Some(from_str), Some(to_str)) =
524                                        (from.as_str(), to.as_str())
525                                {
526                                    let edge_id = format!("{}-->{}", from_str, to_str);
527                                    batch.edge_from_ids.push(from_str.as_bytes().to_vec());
528                                    batch.edge_to_ids.push(to_str.as_bytes().to_vec());
529
530                                    // Extract and validate attributes only if there are any
531                                    if !edge_attributes.is_empty() {
532                                        let mut attrs = Vec::new();
533                                        for attr_def in &edge_attributes {
534                                            let raw_value = edge
535                                                .get(&attr_def.name)
536                                                .cloned()
537                                                .unwrap_or(Value::Null);
538
539                                            match convert_and_validate(
540                                                &raw_value,
541                                                &attr_def.data_type,
542                                                &attr_def.name,
543                                                &edge_id,
544                                            ) {
545                                                Ok(converted) => attrs.push(converted),
546                                                Err(err_msg) => {
547                                                    batch.add_type_error(err_msg);
548                                                    attrs.push(default_value_for_type(
549                                                        &attr_def.data_type,
550                                                    ));
551                                                }
552                                            }
553                                        }
554                                        batch.edge_attribute_values.push(attrs);
555                                    }
556                                }
557                            }
558                        }
559                    }
560                }
561
562                // Call the callback once with the accumulated batch from this database response
563                callback_clone(&mut batch)?;
564            }
565            Ok(())
566        });
567
568        // Execute all queries in parallel
569        let mut task_set = JoinSet::new();
570
571        for query in queries {
572            let client_clone = client.clone();
573            let db_config = self.db_config.clone();
574            let query_clone = query.clone();
575            let batch_size = self.batch_size;
576            let sender_clone = sender.clone();
577
578            task_set.spawn(async move {
579                Self::execute_single_query(
580                    &client_clone,
581                    &db_config,
582                    &query_clone,
583                    batch_size,
584                    sender_clone,
585                )
586                .await
587            });
588        }
589
590        // Drop the original sender so the receiver can finish
591        drop(sender);
592
593        // Wait for all tasks to complete
594        let mut errors: Vec<String> = Vec::new();
595        while let Some(res) = task_set.join_next().await {
596            match res {
597                Ok(Ok(())) => {}
598                Ok(Err(e)) => {
599                    errors.push(e.to_string());
600                }
601                Err(e) => {
602                    errors.push(format!("Task join error: {}", e));
603                }
604            }
605        }
606
607        // Wait for consumer to finish
608        let consumer_result = consumer
609            .join()
610            .map_err(|_| GraphLoaderError::Other("Consumer thread panicked".to_string()))?;
611
612        if !errors.is_empty() {
613            return Err(GraphLoaderError::Other(format!(
614                "Errors occurred during query execution: {}",
615                errors.join("; ")
616            )));
617        }
618
619        consumer_result
620    }
621
622    async fn execute_single_query(
623        client: &ClientWithMiddleware,
624        db_config: &DatabaseConfiguration,
625        query: &AqlQuery,
626        batch_size: u64,
627        sender: tokio::sync::mpsc::Sender<Bytes>,
628    ) -> Result<(), GraphLoaderError> {
629        let make_cursor_url = |path: &str| -> String {
630            let suffix = "/_api/cursor".to_owned() + path;
631            make_url(db_config, suffix.as_str())
632        };
633
634        // Create cursor
635        let body = CreateCursorBody::from_streaming_query_with_size(
636            query.query.clone(),
637            Some(batch_size),
638            Some(query.bind_vars.clone()),
639        );
640        let body_v = serde_json::to_vec::<CreateCursorBody>(&body).map_err(|e| {
641            GraphLoaderError::ParseError(format!("Failed to serialize body: {}", e))
642        })?;
643
644        let url = make_cursor_url("");
645        let cursor_create_resp = handle_auth(client.post(url), db_config)
646            .body(body_v)
647            .send()
648            .await;
649
650        let response = cursor_create_resp?;
651        if !response.status().is_success() {
652            let status = response.status();
653            let body = response.text().await.unwrap_or_default();
654            return Err(GraphLoaderError::Other(format!(
655                "Cursor creation failed with status {}: {}",
656                status, body
657            )));
658        }
659
660        let bytes_res = response
661            .bytes()
662            .await
663            .map_err(|e| GraphLoaderError::ParseError(format!("Error reading response: {}", e)))?;
664
665        let response_info = serde_json::from_slice::<CursorResponse>(&bytes_res)
666            .map_err(|e| GraphLoaderError::ParseError(format!("Failed to parse cursor: {}", e)))?;
667
668        // Send first batch
669        sender
670            .send(bytes_res)
671            .await
672            .map_err(|e| GraphLoaderError::Other(format!("Failed to send data: {}", e)))?;
673
674        // Fetch remaining batches if needed
675        if let Some(cursor_id) = response_info.id
676            && response_info.has_more.unwrap_or(false)
677        {
678            loop {
679                let url = make_cursor_url(&format!("/{}", cursor_id));
680                let resp = handle_auth(client.post(url), db_config).send().await;
681
682                let resp =
683                    crate::request::handle_arangodb_response(resp, |c| c == StatusCode::OK).await?;
684
685                let bytes_res = resp.bytes().await.map_err(|e| {
686                    GraphLoaderError::ParseError(format!("Error reading response: {}", e))
687                })?;
688
689                let response_info =
690                    serde_json::from_slice::<CursorResponse>(&bytes_res).map_err(|e| {
691                        GraphLoaderError::ParseError(format!("Failed to parse cursor: {}", e))
692                    })?;
693
694                sender
695                    .send(bytes_res)
696                    .await
697                    .map_err(|e| GraphLoaderError::Other(format!("Failed to send data: {}", e)))?;
698
699                if !response_info.has_more.unwrap_or(false) {
700                    break;
701                }
702            }
703
704            // Clean up cursor, ignore result since cursors are cleaned up automatically
705            let delete_url = make_cursor_url(&format!("/{}", cursor_id));
706            let _ = handle_auth(client.delete(delete_url), db_config)
707                .send()
708                .await;
709        }
710
711        Ok(())
712    }
713}
714
715#[cfg(test)]
716mod tests {
717    use super::*;
718    use serde_json::json;
719
720    // Helper function to create a Value from any JSON input
721    fn val(v: serde_json::Value) -> Value {
722        v
723    }
724
725    #[test]
726    fn test_convert_bool_from_bool() {
727        let result = convert_and_validate(&val(json!(true)), &DataType::Bool, "field", "test_id");
728        assert_eq!(result, Ok(Value::Bool(true)));
729
730        let result = convert_and_validate(&val(json!(false)), &DataType::Bool, "field", "test_id");
731        assert_eq!(result, Ok(Value::Bool(false)));
732    }
733
734    #[test]
735    fn test_convert_bool_from_string() {
736        // Test various true representations
737        for s in &["true", "True", "TRUE", "1", "yes", "Yes", "YES"] {
738            let result = convert_and_validate(&val(json!(s)), &DataType::Bool, "field", "test_id");
739            assert_eq!(result, Ok(Value::Bool(true)), "Failed for string: {}", s);
740        }
741
742        // Test various false representations
743        for s in &["false", "False", "FALSE", "0", "no", "No", "NO"] {
744            let result = convert_and_validate(&val(json!(s)), &DataType::Bool, "field", "test_id");
745            assert_eq!(result, Ok(Value::Bool(false)), "Failed for string: {}", s);
746        }
747
748        // Test invalid string
749        let result =
750            convert_and_validate(&val(json!("maybe")), &DataType::Bool, "field", "test_id");
751        assert!(result.is_err());
752    }
753
754    #[test]
755    fn test_convert_bool_from_numbers() {
756        // From i64
757        let result = convert_and_validate(&val(json!(0)), &DataType::Bool, "field", "test_id");
758        assert_eq!(result, Ok(Value::Bool(false)));
759
760        let result = convert_and_validate(&val(json!(1)), &DataType::Bool, "field", "test_id");
761        assert_eq!(result, Ok(Value::Bool(true)));
762
763        let result = convert_and_validate(&val(json!(-5)), &DataType::Bool, "field", "test_id");
764        assert_eq!(result, Ok(Value::Bool(true)));
765
766        // From u64
767        let result = convert_and_validate(&val(json!(0u64)), &DataType::Bool, "field", "test_id");
768        assert_eq!(result, Ok(Value::Bool(false)));
769
770        let result = convert_and_validate(&val(json!(42u64)), &DataType::Bool, "field", "test_id");
771        assert_eq!(result, Ok(Value::Bool(true)));
772
773        // From f64
774        let result = convert_and_validate(&val(json!(0.0)), &DataType::Bool, "field", "test_id");
775        assert_eq!(result, Ok(Value::Bool(false)));
776
777        let result = convert_and_validate(&val(json!(3.15)), &DataType::Bool, "field", "test_id");
778        assert_eq!(result, Ok(Value::Bool(true)));
779    }
780
781    #[test]
782    fn test_convert_string_from_various_types() {
783        // From string
784        let result =
785            convert_and_validate(&val(json!("hello")), &DataType::String, "field", "test_id");
786        assert_eq!(result, Ok(Value::String("hello".to_string())));
787
788        // From null
789        let result = convert_and_validate(&val(json!(null)), &DataType::String, "field", "test_id");
790        assert_eq!(result, Ok(Value::String(String::new())));
791
792        // From bool
793        let result = convert_and_validate(&val(json!(true)), &DataType::String, "field", "test_id");
794        assert_eq!(result, Ok(Value::String("true".to_string())));
795
796        // From i64
797        let result = convert_and_validate(&val(json!(42)), &DataType::String, "field", "test_id");
798        assert_eq!(result, Ok(Value::String("42".to_string())));
799
800        // From u64
801        let result =
802            convert_and_validate(&val(json!(42u64)), &DataType::String, "field", "test_id");
803        assert_eq!(result, Ok(Value::String("42".to_string())));
804
805        // From f64
806        let result = convert_and_validate(&val(json!(3.15)), &DataType::String, "field", "test_id");
807        assert_eq!(result, Ok(Value::String("3.15".to_string())));
808
809        // From object/array (should convert to JSON string)
810        let result = convert_and_validate(
811            &val(json!({"key": "value"})),
812            &DataType::String,
813            "field",
814            "test_id",
815        );
816        assert!(result.is_ok());
817        if let Ok(Value::String(s)) = result {
818            assert!(s.contains("key"));
819            assert!(s.contains("value"));
820        }
821    }
822
823    #[test]
824    fn test_convert_u64_from_u64() {
825        let result = convert_and_validate(&val(json!(42u64)), &DataType::U64, "field", "test_id");
826        assert_eq!(result, Ok(json!(42)));
827
828        let result = convert_and_validate(&val(json!(0u64)), &DataType::U64, "field", "test_id");
829        assert_eq!(result, Ok(json!(0)));
830    }
831
832    #[test]
833    fn test_convert_u64_from_i64() {
834        // Positive i64 should work
835        let result = convert_and_validate(&val(json!(42)), &DataType::U64, "field", "test_id");
836        assert_eq!(result, Ok(json!(42)));
837
838        let result = convert_and_validate(&val(json!(0)), &DataType::U64, "field", "test_id");
839        assert_eq!(result, Ok(json!(0)));
840
841        // Negative i64 should fail
842        let result = convert_and_validate(&val(json!(-1)), &DataType::U64, "field", "test_id");
843        assert!(result.is_err());
844    }
845
846    #[test]
847    fn test_convert_u64_from_f64() {
848        // Positive float should round
849        let result = convert_and_validate(&val(json!(42.7)), &DataType::U64, "field", "test_id");
850        assert_eq!(result, Ok(json!(43)));
851
852        let result = convert_and_validate(&val(json!(42.3)), &DataType::U64, "field", "test_id");
853        assert_eq!(result, Ok(json!(42)));
854
855        // Negative float should fail
856        let result = convert_and_validate(&val(json!(-1.5)), &DataType::U64, "field", "test_id");
857        assert!(result.is_err());
858    }
859
860    #[test]
861    fn test_convert_u64_from_string() {
862        let result = convert_and_validate(&val(json!("42")), &DataType::U64, "field", "test_id");
863        assert_eq!(result, Ok(json!(42)));
864
865        // Invalid string should fail
866        let result = convert_and_validate(
867            &val(json!("not_a_number")),
868            &DataType::U64,
869            "field",
870            "test_id",
871        );
872        assert!(result.is_err());
873
874        // Negative string should fail
875        let result = convert_and_validate(&val(json!("-1")), &DataType::U64, "field", "test_id");
876        assert!(result.is_err());
877    }
878
879    #[test]
880    fn test_convert_i64_from_i64() {
881        let result = convert_and_validate(&val(json!(42)), &DataType::I64, "field", "test_id");
882        assert_eq!(result, Ok(json!(42)));
883
884        let result = convert_and_validate(&val(json!(-42)), &DataType::I64, "field", "test_id");
885        assert_eq!(result, Ok(json!(-42)));
886
887        let result = convert_and_validate(&val(json!(0)), &DataType::I64, "field", "test_id");
888        assert_eq!(result, Ok(json!(0)));
889    }
890
891    #[test]
892    fn test_convert_i64_from_u64() {
893        // u64 within i64 range should work
894        let result = convert_and_validate(&val(json!(42u64)), &DataType::I64, "field", "test_id");
895        assert_eq!(result, Ok(json!(42)));
896
897        // u64 exceeding i64::MAX should fail
898        let result =
899            convert_and_validate(&val(json!(u64::MAX)), &DataType::I64, "field", "test_id");
900        assert!(result.is_err());
901    }
902
903    #[test]
904    fn test_convert_i64_from_f64() {
905        // Float within range should round
906        let result = convert_and_validate(&val(json!(42.7)), &DataType::I64, "field", "test_id");
907        assert_eq!(result, Ok(json!(43)));
908
909        let result = convert_and_validate(&val(json!(-42.3)), &DataType::I64, "field", "test_id");
910        assert_eq!(result, Ok(json!(-42)));
911    }
912
913    #[test]
914    fn test_convert_i64_from_string() {
915        let result = convert_and_validate(&val(json!("42")), &DataType::I64, "field", "test_id");
916        assert_eq!(result, Ok(json!(42)));
917
918        let result = convert_and_validate(&val(json!("-42")), &DataType::I64, "field", "test_id");
919        assert_eq!(result, Ok(json!(-42)));
920
921        // Invalid string should fail
922        let result = convert_and_validate(
923            &val(json!("not_a_number")),
924            &DataType::I64,
925            "field",
926            "test_id",
927        );
928        assert!(result.is_err());
929    }
930
931    #[test]
932    fn test_convert_f64_from_f64() {
933        let result = convert_and_validate(&val(json!(3.15)), &DataType::F64, "field", "test_id");
934        assert!(result.is_ok());
935        if let Ok(Value::Number(n)) = result {
936            assert_eq!(n.as_f64(), Some(3.15));
937        }
938
939        let result = convert_and_validate(&val(json!(-2.5)), &DataType::F64, "field", "test_id");
940        assert!(result.is_ok());
941    }
942
943    #[test]
944    fn test_convert_f64_from_integers() {
945        // From i64
946        let result = convert_and_validate(&val(json!(42)), &DataType::F64, "field", "test_id");
947        assert!(result.is_ok());
948        if let Ok(Value::Number(n)) = result {
949            assert_eq!(n.as_f64(), Some(42.0));
950        }
951
952        // From u64
953        let result = convert_and_validate(&val(json!(42u64)), &DataType::F64, "field", "test_id");
954        assert!(result.is_ok());
955    }
956
957    #[test]
958    fn test_convert_f64_from_string() {
959        let result = convert_and_validate(&val(json!("3.15")), &DataType::F64, "field", "test_id");
960        assert!(result.is_ok());
961        if let Ok(Value::Number(n)) = result {
962            assert_eq!(n.as_f64(), Some(3.15));
963        }
964
965        let result = convert_and_validate(&val(json!("-2.5")), &DataType::F64, "field", "test_id");
966        assert!(result.is_ok());
967
968        // Invalid string should fail
969        let result = convert_and_validate(
970            &val(json!("not_a_number")),
971            &DataType::F64,
972            "field",
973            "test_id",
974        );
975        assert!(result.is_err());
976    }
977
978    #[test]
979    fn test_convert_json_accepts_anything() {
980        // JSON type should accept any value as-is
981        let test_values = vec![
982            json!(null),
983            json!(true),
984            json!(false),
985            json!(42),
986            json!(-42),
987            json!(3.15),
988            json!("hello"),
989            json!({"key": "value"}),
990            json!([1, 2, 3]),
991        ];
992
993        for value in test_values {
994            let result = convert_and_validate(&value, &DataType::JSON, "field", "test_id");
995            assert_eq!(result, Ok(value.clone()), "Failed for value: {:?}", value);
996        }
997    }
998
999    #[test]
1000    fn test_default_values() {
1001        assert_eq!(default_value_for_type(&DataType::Bool), Value::Bool(false));
1002        assert_eq!(
1003            default_value_for_type(&DataType::String),
1004            Value::String(String::new())
1005        );
1006        assert_eq!(default_value_for_type(&DataType::U64), json!(0));
1007        assert_eq!(default_value_for_type(&DataType::I64), json!(0));
1008        assert!(default_value_for_type(&DataType::F64).is_number());
1009        assert_eq!(default_value_for_type(&DataType::JSON), Value::Null);
1010    }
1011
1012    #[test]
1013    fn test_error_messages_contain_context() {
1014        // Test that error messages include the attribute name and entity ID
1015        let result = convert_and_validate(
1016            &val(json!("invalid")),
1017            &DataType::U64,
1018            "my_field",
1019            "entity_123",
1020        );
1021        assert!(result.is_err());
1022        if let Err(msg) = result {
1023            assert!(msg.contains("my_field"));
1024            assert!(msg.contains("entity_123"));
1025        }
1026
1027        let result = convert_and_validate(&val(json!(-1)), &DataType::U64, "age", "user:42");
1028        assert!(result.is_err());
1029        if let Err(msg) = result {
1030            assert!(msg.contains("age"));
1031            assert!(msg.contains("user:42"));
1032        }
1033    }
1034
1035    #[test]
1036    fn test_graph_batch_type_error_tracking() {
1037        // Test with a limit of 10
1038        let mut batch = GraphBatch::new(Some(10));
1039
1040        assert_eq!(batch.type_error_count, 0);
1041        assert_eq!(batch.type_error_messages.len(), 0);
1042
1043        // Add first error
1044        batch.add_type_error("Error 1".to_string());
1045        assert_eq!(batch.type_error_count, 1);
1046        assert_eq!(batch.type_error_messages.len(), 1);
1047
1048        // Add more errors
1049        for i in 2..=12 {
1050            batch.add_type_error(format!("Error {}", i));
1051        }
1052
1053        // Count should be 12, but messages capped at 10
1054        assert_eq!(batch.type_error_count, 12);
1055        assert_eq!(batch.type_error_messages.len(), 10);
1056    }
1057
1058    #[test]
1059    fn test_graph_batch_type_error_no_limit() {
1060        // Test with no limit
1061        let mut batch = GraphBatch::new(None);
1062
1063        assert_eq!(batch.type_error_count, 0);
1064        assert_eq!(batch.type_error_messages.len(), 0);
1065
1066        // Add errors
1067        for i in 1..=15 {
1068            batch.add_type_error(format!("Error {}", i));
1069        }
1070
1071        // All 15 errors should be tracked
1072        assert_eq!(batch.type_error_count, 15);
1073        assert_eq!(batch.type_error_messages.len(), 15);
1074    }
1075
1076    #[test]
1077    fn test_graph_batch_type_error_zero_limit() {
1078        // Test with zero limit (no messages collected, but count still tracked)
1079        let mut batch = GraphBatch::new(Some(0));
1080
1081        assert_eq!(batch.type_error_count, 0);
1082        assert_eq!(batch.type_error_messages.len(), 0);
1083
1084        // Add errors
1085        for i in 1..=5 {
1086            batch.add_type_error(format!("Error {}", i));
1087        }
1088
1089        // Count should be 5, but no messages collected
1090        assert_eq!(batch.type_error_count, 5);
1091        assert_eq!(batch.type_error_messages.len(), 0);
1092    }
1093
1094    #[test]
1095    fn test_data_item_creation() {
1096        let item = DataItem::new("test_field".to_string(), DataType::String);
1097        assert_eq!(item.name, "test_field");
1098        assert_eq!(item.data_type, DataType::String);
1099    }
1100
1101    #[test]
1102    fn test_aql_query_creation() {
1103        let mut bind_vars = HashMap::new();
1104        bind_vars.insert("param1".to_string(), json!("value1"));
1105
1106        let query = AqlQuery::new("FOR v IN vertices RETURN v".to_string(), bind_vars.clone());
1107
1108        assert_eq!(query.query, "FOR v IN vertices RETURN v");
1109        assert_eq!(query.bind_vars.len(), 1);
1110        assert_eq!(query.bind_vars.get("param1"), Some(&json!("value1")));
1111    }
1112
1113    #[test]
1114    fn test_convert_bool_from_object_should_fail() {
1115        let result = convert_and_validate(
1116            &val(json!({"key": "value"})),
1117            &DataType::Bool,
1118            "field",
1119            "test_id",
1120        );
1121        assert!(result.is_err());
1122    }
1123
1124    #[test]
1125    fn test_convert_u64_from_object_should_fail() {
1126        let result = convert_and_validate(
1127            &val(json!({"key": "value"})),
1128            &DataType::U64,
1129            "field",
1130            "test_id",
1131        );
1132        assert!(result.is_err());
1133    }
1134
1135    #[test]
1136    fn test_convert_i64_from_object_should_fail() {
1137        let result = convert_and_validate(
1138            &val(json!({"key": "value"})),
1139            &DataType::I64,
1140            "field",
1141            "test_id",
1142        );
1143        assert!(result.is_err());
1144    }
1145
1146    #[test]
1147    fn test_convert_f64_from_object_should_fail() {
1148        let result = convert_and_validate(
1149            &val(json!({"key": "value"})),
1150            &DataType::F64,
1151            "field",
1152            "test_id",
1153        );
1154        assert!(result.is_err());
1155    }
1156}