arangors_graph_exporter/
aql_graph_loader.rs

1use crate::DatabaseConfiguration;
2use crate::client::auth::handle_auth;
3use crate::client::config::ClientConfig;
4use crate::client::{build_client, make_url};
5use crate::errors::GraphLoaderError;
6use bytes::Bytes;
7use reqwest::StatusCode;
8use reqwest_middleware::ClientWithMiddleware;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::collections::HashMap;
12use tokio::task::JoinSet;
13
14/// Data types supported for graph attributes
15#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
16pub enum DataType {
17    Bool,
18    String,
19    U64,
20    I64,
21    F64,
22    JSON,
23}
24
25/// Description of a data item (attribute) with name and type
26#[derive(Clone, Debug)]
27pub struct DataItem {
28    pub name: String,
29    pub data_type: DataType,
30}
31
32/// A batch of graph data containing vertices and edges
33#[derive(Debug)]
34pub struct GraphBatch {
35    /// Vertex IDs as byte vectors
36    pub vertex_ids: Vec<Vec<u8>>,
37    /// Vertex attributes, parallel to vertex_ids
38    pub vertex_attribute_values: Vec<Vec<Value>>,
39    /// Edge source IDs as byte vectors
40    pub edge_from_ids: Vec<Vec<u8>>,
41    /// Edge target IDs as byte vectors
42    pub edge_to_ids: Vec<Vec<u8>>,
43    /// Edge attributes, parallel to edge_from_ids/edge_to_ids
44    pub edge_attribute_values: Vec<Vec<Value>>,
45    /// Total number of type conversion errors encountered
46    pub type_error_count: usize,
47    /// First few type error messages (up to 10)
48    pub type_error_messages: Vec<String>,
49}
50
51impl DataItem {
52    pub fn new(name: String, data_type: DataType) -> Self {
53        DataItem { name, data_type }
54    }
55}
56
57impl GraphBatch {
58    fn new() -> Self {
59        GraphBatch {
60            vertex_ids: Vec::new(),
61            vertex_attribute_values: Vec::new(),
62            edge_from_ids: Vec::new(),
63            edge_to_ids: Vec::new(),
64            edge_attribute_values: Vec::new(),
65            type_error_count: 0,
66            type_error_messages: Vec::new(),
67        }
68    }
69
70    fn add_type_error(&mut self, message: String) {
71        self.type_error_count += 1;
72        if self.type_error_messages.len() < 10 {
73            self.type_error_messages.push(message);
74        }
75    }
76}
77
78/// Convert and validate a value according to the expected data type
79fn convert_and_validate(
80    value: &Value,
81    expected_type: &DataType,
82    attr_name: &str,
83    entity_id: &str,
84) -> Result<Value, String> {
85    match expected_type {
86        DataType::Bool => {
87            if let Some(b) = value.as_bool() {
88                Ok(Value::Bool(b))
89            } else if let Some(s) = value.as_str() {
90                // Try to parse string as bool
91                match s.to_lowercase().as_str() {
92                    "true" | "1" | "yes" => Ok(Value::Bool(true)),
93                    "false" | "0" | "no" => Ok(Value::Bool(false)),
94                    _ => Err(format!(
95                        "Cannot convert '{}' to bool for attribute '{}' in entity '{}'",
96                        s, attr_name, entity_id
97                    )),
98                }
99            } else if let Some(n) = value.as_i64() {
100                Ok(Value::Bool(n != 0))
101            } else if let Some(n) = value.as_u64() {
102                Ok(Value::Bool(n != 0))
103            } else if let Some(n) = value.as_f64() {
104                Ok(Value::Bool(n != 0.0 && n != -0.0))
105            } else {
106                Err(format!(
107                    "Cannot convert {:?} to bool for attribute '{}' in entity '{}'",
108                    value, attr_name, entity_id
109                ))
110            }
111        }
112        DataType::String => {
113            if let Some(s) = value.as_str() {
114                Ok(Value::String(s.to_string()))
115            } else if value.is_null() {
116                Ok(Value::String(String::new()))
117            } else if let Some(b) = value.as_bool() {
118                Ok(Value::String(b.to_string()))
119            } else if let Some(n) = value.as_i64() {
120                Ok(Value::String(n.to_string()))
121            } else if let Some(n) = value.as_u64() {
122                Ok(Value::String(n.to_string()))
123            } else if let Some(n) = value.as_f64() {
124                Ok(Value::String(n.to_string()))
125            } else {
126                // For objects/arrays, convert to JSON string
127                Ok(Value::String(value.to_string()))
128            }
129        }
130        DataType::U64 => {
131            if let Some(n) = value.as_u64() {
132                Ok(Value::Number(n.into()))
133            } else if let Some(n) = value.as_i64() {
134                if n >= 0 {
135                    Ok(Value::Number((n as u64).into()))
136                } else {
137                    Err(format!(
138                        "Cannot convert negative number {} to u64 for attribute '{}' in entity '{}'",
139                        n, attr_name, entity_id
140                    ))
141                }
142            } else if let Some(f) = value.as_f64() {
143                // Note that floating point values which are way larger than 2^53 are integers
144                // anyway, so every f64 value which is strictly smaller than 2^64 can be cast
145                // faithfully to u64. Since Rust 1.45 the behaviour above that is "saturating",
146                // so that the result of a cast is u64::MAX for f64 values >= 2^64, however,
147                // this is not a faithfull representation of the number.
148                // Since NaN values and infinities are not between 0.0 and 2^64, and since
149                // the `round` call cannot bring us close to 2^64, the following code is correct:
150                if f >= 0.0 && f < 2.0_f64.powi(64) {
151                    Ok(Value::Number((f.round() as u64).into()))
152                } else {
153                    Err(format!(
154                        "Cannot convert {} to u64 for attribute '{}' in entity '{}'",
155                        f, attr_name, entity_id
156                    ))
157                }
158            } else if let Some(s) = value.as_str() {
159                s.parse::<u64>()
160                    .map(|n| Value::Number(n.into()))
161                    .map_err(|_| {
162                        format!(
163                            "Cannot parse '{}' as u64 for attribute '{}' in entity '{}'",
164                            s, attr_name, entity_id
165                        )
166                    })
167            } else {
168                Err(format!(
169                    "Cannot convert {:?} to u64 for attribute '{}' in entity '{}'",
170                    value, attr_name, entity_id
171                ))
172            }
173        }
174        DataType::I64 => {
175            if let Some(n) = value.as_i64() {
176                Ok(Value::Number(n.into()))
177            } else if let Some(n) = value.as_u64() {
178                if n <= i64::MAX as u64 {
179                    Ok(Value::Number((n as i64).into()))
180                } else {
181                    Err(format!(
182                        "Cannot convert {} to i64 (overflow) for attribute '{}' in entity '{}'",
183                        n, attr_name, entity_id
184                    ))
185                }
186            } else if let Some(f) = value.as_f64() {
187                // Note that floating point values whose absolute value
188                // is way larger than 2^53 are integers anyway, so every
189                // f64 value which is greater or equal to 2^63 and
190                // strictly smaller than 2^63 can be cast faithfully
191                // to i64. Since NaN values and infinities are not
192                // between -2^63 and 2^63, and since the `round` call
193                // cannot bring us close to absolutele value 2^63, the
194                // following code is correct:
195                if f >= -(2.0_f64.powi(63)) && f < 2.0_f64.powi(63) {
196                    Ok(Value::Number((f.round() as i64).into()))
197                } else {
198                    Err(format!(
199                        "Cannot convert {} to i64 for attribute '{}' in entity '{}'",
200                        f, attr_name, entity_id
201                    ))
202                }
203            } else if let Some(s) = value.as_str() {
204                s.parse::<i64>()
205                    .map(|n| Value::Number(n.into()))
206                    .map_err(|_| {
207                        format!(
208                            "Cannot parse '{}' as i64 for attribute '{}' in entity '{}'",
209                            s, attr_name, entity_id
210                        )
211                    })
212            } else {
213                Err(format!(
214                    "Cannot convert {:?} to i64 for attribute '{}' in entity '{}'",
215                    value, attr_name, entity_id
216                ))
217            }
218        }
219        DataType::F64 => {
220            let make_number = |raw: f64| {
221                serde_json::Number::from_f64(raw)
222                    .map(Value::Number)
223                    .ok_or_else(|| {
224                        format!(
225                            "Cannot represent '{}' as finite f64 for attribute '{}' in entity '{}'",
226                            raw, attr_name, entity_id
227                        )
228                    })
229            };
230
231            if let Some(f) = value.as_f64() {
232                make_number(f)
233            } else if let Some(n) = value.as_i64() {
234                make_number(n as f64)
235            } else if let Some(n) = value.as_u64() {
236                make_number(n as f64)
237            } else if let Some(s) = value.as_str() {
238                let parsed = s.parse::<f64>().map_err(|_| {
239                    format!(
240                        "Cannot parse '{}' as f64 for attribute '{}' in entity '{}'",
241                        s, attr_name, entity_id
242                    )
243                })?;
244                make_number(parsed)
245            } else {
246                Err(format!(
247                    "Cannot convert {:?} to f64 for attribute '{}' in entity '{}'",
248                    value, attr_name, entity_id
249                ))
250            }
251        }
252        DataType::JSON => {
253            // JSON type accepts anything
254            Ok(value.clone())
255        }
256    }
257}
258
259/// Get default value for a data type
260fn default_value_for_type(data_type: &DataType) -> Value {
261    match data_type {
262        DataType::Bool => Value::Bool(false),
263        DataType::String => Value::String(String::new()),
264        DataType::U64 => Value::Number(0.into()),
265        DataType::I64 => Value::Number(0.into()),
266        DataType::F64 => Value::Number(serde_json::Number::from_f64(0.0).unwrap()),
267        DataType::JSON => Value::Null,
268    }
269}
270
271/// An AQL query with bind variables
272#[derive(Clone, Debug)]
273pub struct AqlQuery {
274    pub query: String,
275    pub bind_vars: HashMap<String, Value>,
276}
277
278impl AqlQuery {
279    pub fn new(query: String, bind_vars: HashMap<String, Value>) -> Self {
280        AqlQuery { query, bind_vars }
281    }
282}
283
284/// AQL-based graph loader
285pub struct AqlGraphLoader {
286    db_config: DatabaseConfiguration,
287    batch_size: u64,
288    vertex_attributes: Vec<DataItem>,
289    edge_attributes: Vec<DataItem>,
290    queries: Vec<Vec<AqlQuery>>,
291}
292
293#[derive(Debug, Serialize, Deserialize)]
294struct CursorOptions {
295    stream: bool,
296}
297
298impl CursorOptions {
299    pub fn new(stream: bool) -> Self {
300        Self { stream }
301    }
302}
303
304#[derive(Debug, Serialize, Deserialize)]
305#[serde(rename_all = "camelCase")]
306struct CreateCursorBody {
307    query: String,
308    options: CursorOptions,
309    #[serde(skip_serializing_if = "Option::is_none")]
310    batch_size: Option<u64>,
311    #[serde(skip_serializing_if = "Option::is_none")]
312    bind_vars: Option<HashMap<String, Value>>,
313}
314
315impl CreateCursorBody {
316    pub fn from_streaming_query_with_size(
317        query: String,
318        batch_size: Option<u64>,
319        bind_vars: Option<HashMap<String, Value>>,
320    ) -> Self {
321        Self {
322            query,
323            batch_size,
324            options: CursorOptions::new(true),
325            bind_vars,
326        }
327    }
328}
329
330#[derive(Debug, Serialize, Deserialize)]
331#[serde(rename_all = "camelCase")]
332struct CursorResponse {
333    has_more: Option<bool>,
334    id: Option<String>,
335}
336
337#[derive(Debug, Serialize, Deserialize)]
338struct GraphData {
339    vertices: Option<Vec<Value>>,
340    edges: Option<Vec<Value>>,
341}
342
343impl AqlGraphLoader {
344    /// Create a new AQL graph loader
345    pub fn new(
346        db_config: DatabaseConfiguration,
347        batch_size: u64,
348        vertex_attributes: Vec<DataItem>,
349        edge_attributes: Vec<DataItem>,
350        queries: Vec<Vec<AqlQuery>>,
351    ) -> Result<Self, GraphLoaderError> {
352        // Validate that we have at least one query
353        if queries.is_empty() || queries.iter().all(|q| q.is_empty()) {
354            return Err(GraphLoaderError::Other(
355                "At least one AQL query must be provided".to_string(),
356            ));
357        }
358
359        Ok(AqlGraphLoader {
360            db_config,
361            batch_size,
362            vertex_attributes,
363            edge_attributes,
364            queries,
365        })
366    }
367
368    /// Execute all AQL queries and call the provided callback with vertices and edges
369    pub async fn do_load<F>(&self, callback: F) -> Result<(), GraphLoaderError>
370    where
371        F: Fn(&mut GraphBatch) -> Result<(), GraphLoaderError> + Send + Sync + Clone + 'static,
372    {
373        // Build the client
374        let use_tls = self.db_config.endpoints[0].starts_with("https://");
375        let client_config = ClientConfig::builder()
376            .n_retries(5)
377            .use_tls(use_tls)
378            .tls_cert_opt(self.db_config.tls_cert.clone())
379            .build();
380        let client = build_client(&client_config)?;
381
382        // Process each group of queries sequentially
383        for query_group in &self.queries {
384            self.execute_query_group(&client, query_group, callback.clone())
385                .await?;
386        }
387
388        Ok(())
389    }
390
391    async fn execute_query_group<F>(
392        &self,
393        client: &ClientWithMiddleware,
394        queries: &[AqlQuery],
395        callback: F,
396    ) -> Result<(), GraphLoaderError>
397    where
398        F: Fn(&mut GraphBatch) -> Result<(), GraphLoaderError> + Send + Sync + Clone + 'static,
399    {
400        // Create a channel for receiving data
401        let (sender, mut receiver) = tokio::sync::mpsc::channel::<Bytes>(10);
402
403        // Spawn consumer thread
404        let callback_clone = callback.clone();
405        let vertex_attributes = self.vertex_attributes.clone();
406        let edge_attributes = self.edge_attributes.clone();
407
408        let consumer = std::thread::spawn(move || -> Result<(), GraphLoaderError> {
409            while let Some(resp) = receiver.blocking_recv() {
410                let body = std::str::from_utf8(resp.as_ref())
411                    .map_err(|e| format!("UTF8 error when parsing body: {:?}", e))?;
412
413                // Parse the cursor response
414                let cursor_result: serde_json::Result<CursorResponse> = serde_json::from_str(body);
415                if cursor_result.is_err() {
416                    return Err(GraphLoaderError::ParseError(format!(
417                        "Failed to parse cursor response: {:?}",
418                        cursor_result.err()
419                    )));
420                }
421
422                // Parse the actual result data
423                let parsed: serde_json::Result<serde_json::Map<String, Value>> =
424                    serde_json::from_str(body);
425                if parsed.is_err() {
426                    return Err(GraphLoaderError::ParseError(format!(
427                        "Failed to parse result data: {:?}",
428                        parsed.err()
429                    )));
430                }
431
432                let data = parsed.unwrap();
433                let result = data.get("result");
434                if result.is_none() {
435                    continue;
436                }
437
438                let result_array = result.unwrap().as_array();
439                if result_array.is_none() {
440                    continue;
441                }
442
443                // Create a single batch for this database response
444                let mut batch = GraphBatch::new();
445
446                // Process each item in the result array and accumulate into the batch
447                for item in result_array.unwrap() {
448                    let graph_data: serde_json::Result<GraphData> =
449                        serde_json::from_value(item.clone());
450                    if let Ok(graph_data) = graph_data {
451                        // Extract and validate vertices
452                        if let Some(vertices) = graph_data.vertices {
453                            for vertex in vertices {
454                                if let Some(id) = vertex.get("_id")
455                                    && let Some(id_str) = id.as_str()
456                                {
457                                    batch.vertex_ids.push(id_str.as_bytes().to_vec());
458
459                                    // Extract and validate attributes only if there are any
460                                    if !vertex_attributes.is_empty() {
461                                        let mut attrs = Vec::new();
462                                        for attr_def in &vertex_attributes {
463                                            let raw_value = vertex
464                                                .get(&attr_def.name)
465                                                .cloned()
466                                                .unwrap_or(Value::Null);
467
468                                            match convert_and_validate(
469                                                &raw_value,
470                                                &attr_def.data_type,
471                                                &attr_def.name,
472                                                id_str,
473                                            ) {
474                                                Ok(converted) => attrs.push(converted),
475                                                Err(err_msg) => {
476                                                    batch.add_type_error(err_msg);
477                                                    attrs.push(default_value_for_type(
478                                                        &attr_def.data_type,
479                                                    ));
480                                                }
481                                            }
482                                        }
483                                        batch.vertex_attribute_values.push(attrs);
484                                    }
485                                }
486                            }
487                        }
488
489                        // Extract and validate edges
490                        if let Some(edges) = graph_data.edges {
491                            for edge in edges {
492                                // Skip null edges (e.g., from traversal start node)
493                                if edge.is_null() {
494                                    continue;
495                                }
496
497                                if let (Some(from), Some(to)) = (edge.get("_from"), edge.get("_to"))
498                                    && let (Some(from_str), Some(to_str)) =
499                                        (from.as_str(), to.as_str())
500                                {
501                                    let edge_id = format!("{}-->{}", from_str, to_str);
502                                    batch.edge_from_ids.push(from_str.as_bytes().to_vec());
503                                    batch.edge_to_ids.push(to_str.as_bytes().to_vec());
504
505                                    // Extract and validate attributes only if there are any
506                                    if !edge_attributes.is_empty() {
507                                        let mut attrs = Vec::new();
508                                        for attr_def in &edge_attributes {
509                                            let raw_value = edge
510                                                .get(&attr_def.name)
511                                                .cloned()
512                                                .unwrap_or(Value::Null);
513
514                                            match convert_and_validate(
515                                                &raw_value,
516                                                &attr_def.data_type,
517                                                &attr_def.name,
518                                                &edge_id,
519                                            ) {
520                                                Ok(converted) => attrs.push(converted),
521                                                Err(err_msg) => {
522                                                    batch.add_type_error(err_msg);
523                                                    attrs.push(default_value_for_type(
524                                                        &attr_def.data_type,
525                                                    ));
526                                                }
527                                            }
528                                        }
529                                        batch.edge_attribute_values.push(attrs);
530                                    }
531                                }
532                            }
533                        }
534                    }
535                }
536
537                // Call the callback once with the accumulated batch from this database response
538                callback_clone(&mut batch)?;
539            }
540            Ok(())
541        });
542
543        // Execute all queries in parallel
544        let mut task_set = JoinSet::new();
545
546        for query in queries {
547            let client_clone = client.clone();
548            let db_config = self.db_config.clone();
549            let query_clone = query.clone();
550            let batch_size = self.batch_size;
551            let sender_clone = sender.clone();
552
553            task_set.spawn(async move {
554                Self::execute_single_query(
555                    &client_clone,
556                    &db_config,
557                    &query_clone,
558                    batch_size,
559                    sender_clone,
560                )
561                .await
562            });
563        }
564
565        // Drop the original sender so the receiver can finish
566        drop(sender);
567
568        // Wait for all tasks to complete
569        let mut errors: Vec<String> = Vec::new();
570        while let Some(res) = task_set.join_next().await {
571            match res {
572                Ok(Ok(())) => {}
573                Ok(Err(e)) => {
574                    errors.push(e.to_string());
575                }
576                Err(e) => {
577                    errors.push(format!("Task join error: {}", e));
578                }
579            }
580        }
581
582        // Wait for consumer to finish
583        let consumer_result = consumer
584            .join()
585            .map_err(|_| GraphLoaderError::Other("Consumer thread panicked".to_string()))?;
586
587        if !errors.is_empty() {
588            return Err(GraphLoaderError::Other(format!(
589                "Errors occurred during query execution: {}",
590                errors.join("; ")
591            )));
592        }
593
594        consumer_result
595    }
596
597    async fn execute_single_query(
598        client: &ClientWithMiddleware,
599        db_config: &DatabaseConfiguration,
600        query: &AqlQuery,
601        batch_size: u64,
602        sender: tokio::sync::mpsc::Sender<Bytes>,
603    ) -> Result<(), GraphLoaderError> {
604        let make_cursor_url = |path: &str| -> String {
605            let suffix = "/_api/cursor".to_owned() + path;
606            make_url(db_config, suffix.as_str())
607        };
608
609        // Create cursor
610        let body = CreateCursorBody::from_streaming_query_with_size(
611            query.query.clone(),
612            Some(batch_size),
613            Some(query.bind_vars.clone()),
614        );
615        let body_v = serde_json::to_vec::<CreateCursorBody>(&body).map_err(|e| {
616            GraphLoaderError::ParseError(format!("Failed to serialize body: {}", e))
617        })?;
618
619        let url = make_cursor_url("");
620        let cursor_create_resp = handle_auth(client.post(url), db_config)
621            .body(body_v)
622            .send()
623            .await;
624
625        let response = cursor_create_resp?;
626        if !response.status().is_success() {
627            let status = response.status();
628            let body = response.text().await.unwrap_or_default();
629            return Err(GraphLoaderError::Other(format!(
630                "Cursor creation failed with status {}: {}",
631                status, body
632            )));
633        }
634
635        let bytes_res = response
636            .bytes()
637            .await
638            .map_err(|e| GraphLoaderError::ParseError(format!("Error reading response: {}", e)))?;
639
640        let response_info = serde_json::from_slice::<CursorResponse>(&bytes_res)
641            .map_err(|e| GraphLoaderError::ParseError(format!("Failed to parse cursor: {}", e)))?;
642
643        // Send first batch
644        sender
645            .send(bytes_res)
646            .await
647            .map_err(|e| GraphLoaderError::Other(format!("Failed to send data: {}", e)))?;
648
649        // Fetch remaining batches if needed
650        if let Some(cursor_id) = response_info.id
651            && response_info.has_more.unwrap_or(false)
652        {
653            loop {
654                let url = make_cursor_url(&format!("/{}", cursor_id));
655                let resp = handle_auth(client.post(url), db_config).send().await;
656
657                let resp =
658                    crate::request::handle_arangodb_response(resp, |c| c == StatusCode::OK).await?;
659
660                let bytes_res = resp.bytes().await.map_err(|e| {
661                    GraphLoaderError::ParseError(format!("Error reading response: {}", e))
662                })?;
663
664                let response_info =
665                    serde_json::from_slice::<CursorResponse>(&bytes_res).map_err(|e| {
666                        GraphLoaderError::ParseError(format!("Failed to parse cursor: {}", e))
667                    })?;
668
669                sender
670                    .send(bytes_res)
671                    .await
672                    .map_err(|e| GraphLoaderError::Other(format!("Failed to send data: {}", e)))?;
673
674                if !response_info.has_more.unwrap_or(false) {
675                    break;
676                }
677            }
678
679            // Clean up cursor, ignore result since cursors are cleaned up automatically
680            let delete_url = make_cursor_url(&format!("/{}", cursor_id));
681            let _ = handle_auth(client.delete(delete_url), db_config)
682                .send()
683                .await;
684        }
685
686        Ok(())
687    }
688}
689
690#[cfg(test)]
691mod tests {
692    use super::*;
693    use serde_json::json;
694
695    // Helper function to create a Value from any JSON input
696    fn val(v: serde_json::Value) -> Value {
697        v
698    }
699
700    #[test]
701    fn test_convert_bool_from_bool() {
702        let result = convert_and_validate(&val(json!(true)), &DataType::Bool, "field", "test_id");
703        assert_eq!(result, Ok(Value::Bool(true)));
704
705        let result = convert_and_validate(&val(json!(false)), &DataType::Bool, "field", "test_id");
706        assert_eq!(result, Ok(Value::Bool(false)));
707    }
708
709    #[test]
710    fn test_convert_bool_from_string() {
711        // Test various true representations
712        for s in &["true", "True", "TRUE", "1", "yes", "Yes", "YES"] {
713            let result = convert_and_validate(&val(json!(s)), &DataType::Bool, "field", "test_id");
714            assert_eq!(result, Ok(Value::Bool(true)), "Failed for string: {}", s);
715        }
716
717        // Test various false representations
718        for s in &["false", "False", "FALSE", "0", "no", "No", "NO"] {
719            let result = convert_and_validate(&val(json!(s)), &DataType::Bool, "field", "test_id");
720            assert_eq!(result, Ok(Value::Bool(false)), "Failed for string: {}", s);
721        }
722
723        // Test invalid string
724        let result =
725            convert_and_validate(&val(json!("maybe")), &DataType::Bool, "field", "test_id");
726        assert!(result.is_err());
727    }
728
729    #[test]
730    fn test_convert_bool_from_numbers() {
731        // From i64
732        let result = convert_and_validate(&val(json!(0)), &DataType::Bool, "field", "test_id");
733        assert_eq!(result, Ok(Value::Bool(false)));
734
735        let result = convert_and_validate(&val(json!(1)), &DataType::Bool, "field", "test_id");
736        assert_eq!(result, Ok(Value::Bool(true)));
737
738        let result = convert_and_validate(&val(json!(-5)), &DataType::Bool, "field", "test_id");
739        assert_eq!(result, Ok(Value::Bool(true)));
740
741        // From u64
742        let result = convert_and_validate(&val(json!(0u64)), &DataType::Bool, "field", "test_id");
743        assert_eq!(result, Ok(Value::Bool(false)));
744
745        let result = convert_and_validate(&val(json!(42u64)), &DataType::Bool, "field", "test_id");
746        assert_eq!(result, Ok(Value::Bool(true)));
747
748        // From f64
749        let result = convert_and_validate(&val(json!(0.0)), &DataType::Bool, "field", "test_id");
750        assert_eq!(result, Ok(Value::Bool(false)));
751
752        let result = convert_and_validate(&val(json!(3.15)), &DataType::Bool, "field", "test_id");
753        assert_eq!(result, Ok(Value::Bool(true)));
754    }
755
756    #[test]
757    fn test_convert_string_from_various_types() {
758        // From string
759        let result =
760            convert_and_validate(&val(json!("hello")), &DataType::String, "field", "test_id");
761        assert_eq!(result, Ok(Value::String("hello".to_string())));
762
763        // From null
764        let result = convert_and_validate(&val(json!(null)), &DataType::String, "field", "test_id");
765        assert_eq!(result, Ok(Value::String(String::new())));
766
767        // From bool
768        let result = convert_and_validate(&val(json!(true)), &DataType::String, "field", "test_id");
769        assert_eq!(result, Ok(Value::String("true".to_string())));
770
771        // From i64
772        let result = convert_and_validate(&val(json!(42)), &DataType::String, "field", "test_id");
773        assert_eq!(result, Ok(Value::String("42".to_string())));
774
775        // From u64
776        let result =
777            convert_and_validate(&val(json!(42u64)), &DataType::String, "field", "test_id");
778        assert_eq!(result, Ok(Value::String("42".to_string())));
779
780        // From f64
781        let result = convert_and_validate(&val(json!(3.15)), &DataType::String, "field", "test_id");
782        assert_eq!(result, Ok(Value::String("3.15".to_string())));
783
784        // From object/array (should convert to JSON string)
785        let result = convert_and_validate(
786            &val(json!({"key": "value"})),
787            &DataType::String,
788            "field",
789            "test_id",
790        );
791        assert!(result.is_ok());
792        if let Ok(Value::String(s)) = result {
793            assert!(s.contains("key"));
794            assert!(s.contains("value"));
795        }
796    }
797
798    #[test]
799    fn test_convert_u64_from_u64() {
800        let result = convert_and_validate(&val(json!(42u64)), &DataType::U64, "field", "test_id");
801        assert_eq!(result, Ok(json!(42)));
802
803        let result = convert_and_validate(&val(json!(0u64)), &DataType::U64, "field", "test_id");
804        assert_eq!(result, Ok(json!(0)));
805    }
806
807    #[test]
808    fn test_convert_u64_from_i64() {
809        // Positive i64 should work
810        let result = convert_and_validate(&val(json!(42)), &DataType::U64, "field", "test_id");
811        assert_eq!(result, Ok(json!(42)));
812
813        let result = convert_and_validate(&val(json!(0)), &DataType::U64, "field", "test_id");
814        assert_eq!(result, Ok(json!(0)));
815
816        // Negative i64 should fail
817        let result = convert_and_validate(&val(json!(-1)), &DataType::U64, "field", "test_id");
818        assert!(result.is_err());
819    }
820
821    #[test]
822    fn test_convert_u64_from_f64() {
823        // Positive float should round
824        let result = convert_and_validate(&val(json!(42.7)), &DataType::U64, "field", "test_id");
825        assert_eq!(result, Ok(json!(43)));
826
827        let result = convert_and_validate(&val(json!(42.3)), &DataType::U64, "field", "test_id");
828        assert_eq!(result, Ok(json!(42)));
829
830        // Negative float should fail
831        let result = convert_and_validate(&val(json!(-1.5)), &DataType::U64, "field", "test_id");
832        assert!(result.is_err());
833    }
834
835    #[test]
836    fn test_convert_u64_from_string() {
837        let result = convert_and_validate(&val(json!("42")), &DataType::U64, "field", "test_id");
838        assert_eq!(result, Ok(json!(42)));
839
840        // Invalid string should fail
841        let result = convert_and_validate(
842            &val(json!("not_a_number")),
843            &DataType::U64,
844            "field",
845            "test_id",
846        );
847        assert!(result.is_err());
848
849        // Negative string should fail
850        let result = convert_and_validate(&val(json!("-1")), &DataType::U64, "field", "test_id");
851        assert!(result.is_err());
852    }
853
854    #[test]
855    fn test_convert_i64_from_i64() {
856        let result = convert_and_validate(&val(json!(42)), &DataType::I64, "field", "test_id");
857        assert_eq!(result, Ok(json!(42)));
858
859        let result = convert_and_validate(&val(json!(-42)), &DataType::I64, "field", "test_id");
860        assert_eq!(result, Ok(json!(-42)));
861
862        let result = convert_and_validate(&val(json!(0)), &DataType::I64, "field", "test_id");
863        assert_eq!(result, Ok(json!(0)));
864    }
865
866    #[test]
867    fn test_convert_i64_from_u64() {
868        // u64 within i64 range should work
869        let result = convert_and_validate(&val(json!(42u64)), &DataType::I64, "field", "test_id");
870        assert_eq!(result, Ok(json!(42)));
871
872        // u64 exceeding i64::MAX should fail
873        let result =
874            convert_and_validate(&val(json!(u64::MAX)), &DataType::I64, "field", "test_id");
875        assert!(result.is_err());
876    }
877
878    #[test]
879    fn test_convert_i64_from_f64() {
880        // Float within range should round
881        let result = convert_and_validate(&val(json!(42.7)), &DataType::I64, "field", "test_id");
882        assert_eq!(result, Ok(json!(43)));
883
884        let result = convert_and_validate(&val(json!(-42.3)), &DataType::I64, "field", "test_id");
885        assert_eq!(result, Ok(json!(-42)));
886    }
887
888    #[test]
889    fn test_convert_i64_from_string() {
890        let result = convert_and_validate(&val(json!("42")), &DataType::I64, "field", "test_id");
891        assert_eq!(result, Ok(json!(42)));
892
893        let result = convert_and_validate(&val(json!("-42")), &DataType::I64, "field", "test_id");
894        assert_eq!(result, Ok(json!(-42)));
895
896        // Invalid string should fail
897        let result = convert_and_validate(
898            &val(json!("not_a_number")),
899            &DataType::I64,
900            "field",
901            "test_id",
902        );
903        assert!(result.is_err());
904    }
905
906    #[test]
907    fn test_convert_f64_from_f64() {
908        let result = convert_and_validate(&val(json!(3.15)), &DataType::F64, "field", "test_id");
909        assert!(result.is_ok());
910        if let Ok(Value::Number(n)) = result {
911            assert_eq!(n.as_f64(), Some(3.15));
912        }
913
914        let result = convert_and_validate(&val(json!(-2.5)), &DataType::F64, "field", "test_id");
915        assert!(result.is_ok());
916    }
917
918    #[test]
919    fn test_convert_f64_from_integers() {
920        // From i64
921        let result = convert_and_validate(&val(json!(42)), &DataType::F64, "field", "test_id");
922        assert!(result.is_ok());
923        if let Ok(Value::Number(n)) = result {
924            assert_eq!(n.as_f64(), Some(42.0));
925        }
926
927        // From u64
928        let result = convert_and_validate(&val(json!(42u64)), &DataType::F64, "field", "test_id");
929        assert!(result.is_ok());
930    }
931
932    #[test]
933    fn test_convert_f64_from_string() {
934        let result = convert_and_validate(&val(json!("3.15")), &DataType::F64, "field", "test_id");
935        assert!(result.is_ok());
936        if let Ok(Value::Number(n)) = result {
937            assert_eq!(n.as_f64(), Some(3.15));
938        }
939
940        let result = convert_and_validate(&val(json!("-2.5")), &DataType::F64, "field", "test_id");
941        assert!(result.is_ok());
942
943        // Invalid string should fail
944        let result = convert_and_validate(
945            &val(json!("not_a_number")),
946            &DataType::F64,
947            "field",
948            "test_id",
949        );
950        assert!(result.is_err());
951    }
952
953    #[test]
954    fn test_convert_json_accepts_anything() {
955        // JSON type should accept any value as-is
956        let test_values = vec![
957            json!(null),
958            json!(true),
959            json!(false),
960            json!(42),
961            json!(-42),
962            json!(3.15),
963            json!("hello"),
964            json!({"key": "value"}),
965            json!([1, 2, 3]),
966        ];
967
968        for value in test_values {
969            let result = convert_and_validate(&value, &DataType::JSON, "field", "test_id");
970            assert_eq!(result, Ok(value.clone()), "Failed for value: {:?}", value);
971        }
972    }
973
974    #[test]
975    fn test_default_values() {
976        assert_eq!(default_value_for_type(&DataType::Bool), Value::Bool(false));
977        assert_eq!(
978            default_value_for_type(&DataType::String),
979            Value::String(String::new())
980        );
981        assert_eq!(default_value_for_type(&DataType::U64), json!(0));
982        assert_eq!(default_value_for_type(&DataType::I64), json!(0));
983        assert!(default_value_for_type(&DataType::F64).is_number());
984        assert_eq!(default_value_for_type(&DataType::JSON), Value::Null);
985    }
986
987    #[test]
988    fn test_error_messages_contain_context() {
989        // Test that error messages include the attribute name and entity ID
990        let result = convert_and_validate(
991            &val(json!("invalid")),
992            &DataType::U64,
993            "my_field",
994            "entity_123",
995        );
996        assert!(result.is_err());
997        if let Err(msg) = result {
998            assert!(msg.contains("my_field"));
999            assert!(msg.contains("entity_123"));
1000        }
1001
1002        let result = convert_and_validate(&val(json!(-1)), &DataType::U64, "age", "user:42");
1003        assert!(result.is_err());
1004        if let Err(msg) = result {
1005            assert!(msg.contains("age"));
1006            assert!(msg.contains("user:42"));
1007        }
1008    }
1009
1010    #[test]
1011    fn test_graph_batch_type_error_tracking() {
1012        let mut batch = GraphBatch::new();
1013
1014        assert_eq!(batch.type_error_count, 0);
1015        assert_eq!(batch.type_error_messages.len(), 0);
1016
1017        // Add first error
1018        batch.add_type_error("Error 1".to_string());
1019        assert_eq!(batch.type_error_count, 1);
1020        assert_eq!(batch.type_error_messages.len(), 1);
1021
1022        // Add more errors
1023        for i in 2..=12 {
1024            batch.add_type_error(format!("Error {}", i));
1025        }
1026
1027        // Count should be 12, but messages capped at 10
1028        assert_eq!(batch.type_error_count, 12);
1029        assert_eq!(batch.type_error_messages.len(), 10);
1030    }
1031
1032    #[test]
1033    fn test_data_item_creation() {
1034        let item = DataItem::new("test_field".to_string(), DataType::String);
1035        assert_eq!(item.name, "test_field");
1036        assert_eq!(item.data_type, DataType::String);
1037    }
1038
1039    #[test]
1040    fn test_aql_query_creation() {
1041        let mut bind_vars = HashMap::new();
1042        bind_vars.insert("param1".to_string(), json!("value1"));
1043
1044        let query = AqlQuery::new("FOR v IN vertices RETURN v".to_string(), bind_vars.clone());
1045
1046        assert_eq!(query.query, "FOR v IN vertices RETURN v");
1047        assert_eq!(query.bind_vars.len(), 1);
1048        assert_eq!(query.bind_vars.get("param1"), Some(&json!("value1")));
1049    }
1050
1051    #[test]
1052    fn test_convert_bool_from_object_should_fail() {
1053        let result = convert_and_validate(
1054            &val(json!({"key": "value"})),
1055            &DataType::Bool,
1056            "field",
1057            "test_id",
1058        );
1059        assert!(result.is_err());
1060    }
1061
1062    #[test]
1063    fn test_convert_u64_from_object_should_fail() {
1064        let result = convert_and_validate(
1065            &val(json!({"key": "value"})),
1066            &DataType::U64,
1067            "field",
1068            "test_id",
1069        );
1070        assert!(result.is_err());
1071    }
1072
1073    #[test]
1074    fn test_convert_i64_from_object_should_fail() {
1075        let result = convert_and_validate(
1076            &val(json!({"key": "value"})),
1077            &DataType::I64,
1078            "field",
1079            "test_id",
1080        );
1081        assert!(result.is_err());
1082    }
1083
1084    #[test]
1085    fn test_convert_f64_from_object_should_fail() {
1086        let result = convert_and_validate(
1087            &val(json!({"key": "value"})),
1088            &DataType::F64,
1089            "field",
1090            "test_id",
1091        );
1092        assert!(result.is_err());
1093    }
1094}