1use crate::DatabaseConfiguration;
2use crate::client::auth::handle_auth;
3use crate::client::config::ClientConfig;
4use crate::client::{build_client, make_url};
5use crate::errors::GraphLoaderError;
6use bytes::Bytes;
7use reqwest::StatusCode;
8use reqwest_middleware::ClientWithMiddleware;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::collections::HashMap;
12use tokio::task::JoinSet;
13
/// Target type that an attribute value is coerced to by `convert_and_validate`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum DataType {
    /// Boolean. Booleans, the strings `true`/`1`/`yes`/`false`/`0`/`no` (case-insensitive)
    /// and numbers (non-zero is `true`) are accepted.
    Bool,
    /// String. Every input is accepted; `null` becomes the empty string.
    String,
    /// Unsigned 64-bit integer. Floats in range are rounded; numeric strings are parsed.
    U64,
    /// Signed 64-bit integer. Floats in range are rounded; numeric strings are parsed.
    I64,
    /// Finite 64-bit float. Integers are widened; numeric strings are parsed.
    F64,
    /// Arbitrary JSON; the value is passed through unchanged.
    JSON,
}
24
/// Describes one attribute to extract from a vertex or edge document.
#[derive(Clone, Debug)]
pub struct DataItem {
    /// Key under which the attribute is looked up in the document.
    pub name: String,
    /// Type the raw value is converted to.
    pub data_type: DataType,
}
31
/// Vertices and edges extracted from a single cursor response, as handed to the load callback.
#[derive(Debug)]
pub struct GraphBatch {
    /// `_id` of every accepted vertex, stored as the raw bytes of the id string.
    pub vertex_ids: Vec<Vec<u8>>,
    /// One entry per vertex holding its converted attribute values, in the order of the
    /// configured vertex attributes. Stays empty when no vertex attributes are configured.
    pub vertex_attribute_values: Vec<Vec<Value>>,
    /// `_from` of every accepted edge, stored as the raw bytes of the id string.
    pub edge_from_ids: Vec<Vec<u8>>,
    /// `_to` of every accepted edge, stored as the raw bytes of the id string.
    pub edge_to_ids: Vec<Vec<u8>>,
    /// One entry per edge holding its converted attribute values, in the order of the
    /// configured edge attributes. Stays empty when no edge attributes are configured.
    pub edge_attribute_values: Vec<Vec<Value>>,
    /// Total number of attribute values that failed conversion in this batch.
    pub type_error_count: usize,
    /// Messages of the first conversion failures only (see `add_type_error` for the cap).
    pub type_error_messages: Vec<String>,
}
50
51impl DataItem {
52 pub fn new(name: String, data_type: DataType) -> Self {
53 DataItem { name, data_type }
54 }
55}
56
57impl GraphBatch {
58 fn new() -> Self {
59 GraphBatch {
60 vertex_ids: Vec::new(),
61 vertex_attribute_values: Vec::new(),
62 edge_from_ids: Vec::new(),
63 edge_to_ids: Vec::new(),
64 edge_attribute_values: Vec::new(),
65 type_error_count: 0,
66 type_error_messages: Vec::new(),
67 }
68 }
69
70 fn add_type_error(&mut self, message: String) {
71 self.type_error_count += 1;
72 if self.type_error_messages.len() < 10 {
73 self.type_error_messages.push(message);
74 }
75 }
76}
77
78fn convert_and_validate(
80 value: &Value,
81 expected_type: &DataType,
82 attr_name: &str,
83 entity_id: &str,
84) -> Result<Value, String> {
85 match expected_type {
86 DataType::Bool => {
87 if let Some(b) = value.as_bool() {
88 Ok(Value::Bool(b))
89 } else if let Some(s) = value.as_str() {
90 match s.to_lowercase().as_str() {
92 "true" | "1" | "yes" => Ok(Value::Bool(true)),
93 "false" | "0" | "no" => Ok(Value::Bool(false)),
94 _ => Err(format!(
95 "Cannot convert '{}' to bool for attribute '{}' in entity '{}'",
96 s, attr_name, entity_id
97 )),
98 }
99 } else if let Some(n) = value.as_i64() {
100 Ok(Value::Bool(n != 0))
101 } else if let Some(n) = value.as_u64() {
102 Ok(Value::Bool(n != 0))
103 } else if let Some(n) = value.as_f64() {
104 Ok(Value::Bool(n != 0.0 && n != -0.0))
105 } else {
106 Err(format!(
107 "Cannot convert {:?} to bool for attribute '{}' in entity '{}'",
108 value, attr_name, entity_id
109 ))
110 }
111 }
112 DataType::String => {
113 if let Some(s) = value.as_str() {
114 Ok(Value::String(s.to_string()))
115 } else if value.is_null() {
116 Ok(Value::String(String::new()))
117 } else if let Some(b) = value.as_bool() {
118 Ok(Value::String(b.to_string()))
119 } else if let Some(n) = value.as_i64() {
120 Ok(Value::String(n.to_string()))
121 } else if let Some(n) = value.as_u64() {
122 Ok(Value::String(n.to_string()))
123 } else if let Some(n) = value.as_f64() {
124 Ok(Value::String(n.to_string()))
125 } else {
126 Ok(Value::String(value.to_string()))
128 }
129 }
130 DataType::U64 => {
131 if let Some(n) = value.as_u64() {
132 Ok(Value::Number(n.into()))
133 } else if let Some(n) = value.as_i64() {
134 if n >= 0 {
135 Ok(Value::Number((n as u64).into()))
136 } else {
137 Err(format!(
138 "Cannot convert negative number {} to u64 for attribute '{}' in entity '{}'",
139 n, attr_name, entity_id
140 ))
141 }
142 } else if let Some(f) = value.as_f64() {
143 if f >= 0.0 && f < 2.0_f64.powi(64) {
151 Ok(Value::Number((f.round() as u64).into()))
152 } else {
153 Err(format!(
154 "Cannot convert {} to u64 for attribute '{}' in entity '{}'",
155 f, attr_name, entity_id
156 ))
157 }
158 } else if let Some(s) = value.as_str() {
159 s.parse::<u64>()
160 .map(|n| Value::Number(n.into()))
161 .map_err(|_| {
162 format!(
163 "Cannot parse '{}' as u64 for attribute '{}' in entity '{}'",
164 s, attr_name, entity_id
165 )
166 })
167 } else {
168 Err(format!(
169 "Cannot convert {:?} to u64 for attribute '{}' in entity '{}'",
170 value, attr_name, entity_id
171 ))
172 }
173 }
174 DataType::I64 => {
175 if let Some(n) = value.as_i64() {
176 Ok(Value::Number(n.into()))
177 } else if let Some(n) = value.as_u64() {
178 if n <= i64::MAX as u64 {
179 Ok(Value::Number((n as i64).into()))
180 } else {
181 Err(format!(
182 "Cannot convert {} to i64 (overflow) for attribute '{}' in entity '{}'",
183 n, attr_name, entity_id
184 ))
185 }
186 } else if let Some(f) = value.as_f64() {
187 if f >= -(2.0_f64.powi(63)) && f < 2.0_f64.powi(63) {
196 Ok(Value::Number((f.round() as i64).into()))
197 } else {
198 Err(format!(
199 "Cannot convert {} to i64 for attribute '{}' in entity '{}'",
200 f, attr_name, entity_id
201 ))
202 }
203 } else if let Some(s) = value.as_str() {
204 s.parse::<i64>()
205 .map(|n| Value::Number(n.into()))
206 .map_err(|_| {
207 format!(
208 "Cannot parse '{}' as i64 for attribute '{}' in entity '{}'",
209 s, attr_name, entity_id
210 )
211 })
212 } else {
213 Err(format!(
214 "Cannot convert {:?} to i64 for attribute '{}' in entity '{}'",
215 value, attr_name, entity_id
216 ))
217 }
218 }
219 DataType::F64 => {
220 let make_number = |raw: f64| {
221 serde_json::Number::from_f64(raw)
222 .map(Value::Number)
223 .ok_or_else(|| {
224 format!(
225 "Cannot represent '{}' as finite f64 for attribute '{}' in entity '{}'",
226 raw, attr_name, entity_id
227 )
228 })
229 };
230
231 if let Some(f) = value.as_f64() {
232 make_number(f)
233 } else if let Some(n) = value.as_i64() {
234 make_number(n as f64)
235 } else if let Some(n) = value.as_u64() {
236 make_number(n as f64)
237 } else if let Some(s) = value.as_str() {
238 let parsed = s.parse::<f64>().map_err(|_| {
239 format!(
240 "Cannot parse '{}' as f64 for attribute '{}' in entity '{}'",
241 s, attr_name, entity_id
242 )
243 })?;
244 make_number(parsed)
245 } else {
246 Err(format!(
247 "Cannot convert {:?} to f64 for attribute '{}' in entity '{}'",
248 value, attr_name, entity_id
249 ))
250 }
251 }
252 DataType::JSON => {
253 Ok(value.clone())
255 }
256 }
257}
258
259fn default_value_for_type(data_type: &DataType) -> Value {
261 match data_type {
262 DataType::Bool => Value::Bool(false),
263 DataType::String => Value::String(String::new()),
264 DataType::U64 => Value::Number(0.into()),
265 DataType::I64 => Value::Number(0.into()),
266 DataType::F64 => Value::Number(serde_json::Number::from_f64(0.0).unwrap()),
267 DataType::JSON => Value::Null,
268 }
269}
270
/// An AQL query string together with its bind variables.
#[derive(Clone, Debug)]
pub struct AqlQuery {
    /// Query text, sent as the `query` field when the cursor is created.
    pub query: String,
    /// Bind variables, sent as the `bindVars` field when the cursor is created.
    pub bind_vars: HashMap<String, Value>,
}
277
278impl AqlQuery {
279 pub fn new(query: String, bind_vars: HashMap<String, Value>) -> Self {
280 AqlQuery { query, bind_vars }
281 }
282}
283
/// Loads graph data by running AQL queries and converting the results into `GraphBatch`es.
pub struct AqlGraphLoader {
    /// Connection settings; endpoints and TLS certificate are read from here.
    db_config: DatabaseConfiguration,
    /// Sent as `batchSize` with every cursor creation request.
    batch_size: u64,
    /// Attributes extracted from every vertex document, in this order.
    vertex_attributes: Vec<DataItem>,
    /// Attributes extracted from every edge document, in this order.
    edge_attributes: Vec<DataItem>,
    /// Query groups: groups run one after another, the queries inside a group run concurrently.
    queries: Vec<Vec<AqlQuery>>,
}
292
/// The `options` object of a cursor creation request.
#[derive(Debug, Serialize, Deserialize)]
struct CursorOptions {
    /// Serialized as `options.stream`.
    stream: bool,
}
297
298impl CursorOptions {
299 pub fn new(stream: bool) -> Self {
300 Self { stream }
301 }
302}
303
/// JSON body of a cursor creation request; field names are serialized in camelCase.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct CreateCursorBody {
    /// AQL query text.
    query: String,
    /// Cursor options.
    options: CursorOptions,
    /// Serialized as `batchSize`; left out of the body when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    batch_size: Option<u64>,
    /// Serialized as `bindVars`; left out of the body when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    bind_vars: Option<HashMap<String, Value>>,
}
314
315impl CreateCursorBody {
316 pub fn from_streaming_query_with_size(
317 query: String,
318 batch_size: Option<u64>,
319 bind_vars: Option<HashMap<String, Value>>,
320 ) -> Self {
321 Self {
322 query,
323 batch_size,
324 options: CursorOptions::new(true),
325 bind_vars,
326 }
327 }
328}
329
/// The cursor bookkeeping fields of a cursor response; field names are camelCase on the wire.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct CursorResponse {
    /// `hasMore`: whether another batch must be fetched; a missing value is treated as `false`
    /// by the code that reads it.
    has_more: Option<bool>,
    /// `id`: cursor identifier used to build the follow-up request URLs.
    id: Option<String>,
}
336
/// Expected shape of one element of a cursor response's `result` array.
#[derive(Debug, Serialize, Deserialize)]
struct GraphData {
    /// Vertex documents; each needs a string `_id` to be accepted.
    vertices: Option<Vec<Value>>,
    /// Edge documents; each needs string `_from` and `_to` to be accepted.
    edges: Option<Vec<Value>>,
}
342
343impl AqlGraphLoader {
344 pub fn new(
346 db_config: DatabaseConfiguration,
347 batch_size: u64,
348 vertex_attributes: Vec<DataItem>,
349 edge_attributes: Vec<DataItem>,
350 queries: Vec<Vec<AqlQuery>>,
351 ) -> Result<Self, GraphLoaderError> {
352 if queries.is_empty() || queries.iter().all(|q| q.is_empty()) {
354 return Err(GraphLoaderError::Other(
355 "At least one AQL query must be provided".to_string(),
356 ));
357 }
358
359 Ok(AqlGraphLoader {
360 db_config,
361 batch_size,
362 vertex_attributes,
363 edge_attributes,
364 queries,
365 })
366 }
367
368 pub async fn do_load<F>(&self, callback: F) -> Result<(), GraphLoaderError>
370 where
371 F: Fn(&mut GraphBatch) -> Result<(), GraphLoaderError> + Send + Sync + Clone + 'static,
372 {
373 let use_tls = self.db_config.endpoints[0].starts_with("https://");
375 let client_config = ClientConfig::builder()
376 .n_retries(5)
377 .use_tls(use_tls)
378 .tls_cert_opt(self.db_config.tls_cert.clone())
379 .build();
380 let client = build_client(&client_config)?;
381
382 for query_group in &self.queries {
384 self.execute_query_group(&client, query_group, callback.clone())
385 .await?;
386 }
387
388 Ok(())
389 }
390
391 async fn execute_query_group<F>(
392 &self,
393 client: &ClientWithMiddleware,
394 queries: &[AqlQuery],
395 callback: F,
396 ) -> Result<(), GraphLoaderError>
397 where
398 F: Fn(&mut GraphBatch) -> Result<(), GraphLoaderError> + Send + Sync + Clone + 'static,
399 {
400 let (sender, mut receiver) = tokio::sync::mpsc::channel::<Bytes>(10);
402
403 let callback_clone = callback.clone();
405 let vertex_attributes = self.vertex_attributes.clone();
406 let edge_attributes = self.edge_attributes.clone();
407
408 let consumer = std::thread::spawn(move || -> Result<(), GraphLoaderError> {
409 while let Some(resp) = receiver.blocking_recv() {
410 let body = std::str::from_utf8(resp.as_ref())
411 .map_err(|e| format!("UTF8 error when parsing body: {:?}", e))?;
412
413 let cursor_result: serde_json::Result<CursorResponse> = serde_json::from_str(body);
415 if cursor_result.is_err() {
416 return Err(GraphLoaderError::ParseError(format!(
417 "Failed to parse cursor response: {:?}",
418 cursor_result.err()
419 )));
420 }
421
422 let parsed: serde_json::Result<serde_json::Map<String, Value>> =
424 serde_json::from_str(body);
425 if parsed.is_err() {
426 return Err(GraphLoaderError::ParseError(format!(
427 "Failed to parse result data: {:?}",
428 parsed.err()
429 )));
430 }
431
432 let data = parsed.unwrap();
433 let result = data.get("result");
434 if result.is_none() {
435 continue;
436 }
437
438 let result_array = result.unwrap().as_array();
439 if result_array.is_none() {
440 continue;
441 }
442
443 let mut batch = GraphBatch::new();
445
446 for item in result_array.unwrap() {
448 let graph_data: serde_json::Result<GraphData> =
449 serde_json::from_value(item.clone());
450 if let Ok(graph_data) = graph_data {
451 if let Some(vertices) = graph_data.vertices {
453 for vertex in vertices {
454 if let Some(id) = vertex.get("_id")
455 && let Some(id_str) = id.as_str()
456 {
457 batch.vertex_ids.push(id_str.as_bytes().to_vec());
458
459 if !vertex_attributes.is_empty() {
461 let mut attrs = Vec::new();
462 for attr_def in &vertex_attributes {
463 let raw_value = vertex
464 .get(&attr_def.name)
465 .cloned()
466 .unwrap_or(Value::Null);
467
468 match convert_and_validate(
469 &raw_value,
470 &attr_def.data_type,
471 &attr_def.name,
472 id_str,
473 ) {
474 Ok(converted) => attrs.push(converted),
475 Err(err_msg) => {
476 batch.add_type_error(err_msg);
477 attrs.push(default_value_for_type(
478 &attr_def.data_type,
479 ));
480 }
481 }
482 }
483 batch.vertex_attribute_values.push(attrs);
484 }
485 }
486 }
487 }
488
489 if let Some(edges) = graph_data.edges {
491 for edge in edges {
492 if edge.is_null() {
494 continue;
495 }
496
497 if let (Some(from), Some(to)) = (edge.get("_from"), edge.get("_to"))
498 && let (Some(from_str), Some(to_str)) =
499 (from.as_str(), to.as_str())
500 {
501 let edge_id = format!("{}-->{}", from_str, to_str);
502 batch.edge_from_ids.push(from_str.as_bytes().to_vec());
503 batch.edge_to_ids.push(to_str.as_bytes().to_vec());
504
505 if !edge_attributes.is_empty() {
507 let mut attrs = Vec::new();
508 for attr_def in &edge_attributes {
509 let raw_value = edge
510 .get(&attr_def.name)
511 .cloned()
512 .unwrap_or(Value::Null);
513
514 match convert_and_validate(
515 &raw_value,
516 &attr_def.data_type,
517 &attr_def.name,
518 &edge_id,
519 ) {
520 Ok(converted) => attrs.push(converted),
521 Err(err_msg) => {
522 batch.add_type_error(err_msg);
523 attrs.push(default_value_for_type(
524 &attr_def.data_type,
525 ));
526 }
527 }
528 }
529 batch.edge_attribute_values.push(attrs);
530 }
531 }
532 }
533 }
534 }
535 }
536
537 callback_clone(&mut batch)?;
539 }
540 Ok(())
541 });
542
543 let mut task_set = JoinSet::new();
545
546 for query in queries {
547 let client_clone = client.clone();
548 let db_config = self.db_config.clone();
549 let query_clone = query.clone();
550 let batch_size = self.batch_size;
551 let sender_clone = sender.clone();
552
553 task_set.spawn(async move {
554 Self::execute_single_query(
555 &client_clone,
556 &db_config,
557 &query_clone,
558 batch_size,
559 sender_clone,
560 )
561 .await
562 });
563 }
564
565 drop(sender);
567
568 let mut errors: Vec<String> = Vec::new();
570 while let Some(res) = task_set.join_next().await {
571 match res {
572 Ok(Ok(())) => {}
573 Ok(Err(e)) => {
574 errors.push(e.to_string());
575 }
576 Err(e) => {
577 errors.push(format!("Task join error: {}", e));
578 }
579 }
580 }
581
582 let consumer_result = consumer
584 .join()
585 .map_err(|_| GraphLoaderError::Other("Consumer thread panicked".to_string()))?;
586
587 if !errors.is_empty() {
588 return Err(GraphLoaderError::Other(format!(
589 "Errors occurred during query execution: {}",
590 errors.join("; ")
591 )));
592 }
593
594 consumer_result
595 }
596
597 async fn execute_single_query(
598 client: &ClientWithMiddleware,
599 db_config: &DatabaseConfiguration,
600 query: &AqlQuery,
601 batch_size: u64,
602 sender: tokio::sync::mpsc::Sender<Bytes>,
603 ) -> Result<(), GraphLoaderError> {
604 let make_cursor_url = |path: &str| -> String {
605 let suffix = "/_api/cursor".to_owned() + path;
606 make_url(db_config, suffix.as_str())
607 };
608
609 let body = CreateCursorBody::from_streaming_query_with_size(
611 query.query.clone(),
612 Some(batch_size),
613 Some(query.bind_vars.clone()),
614 );
615 let body_v = serde_json::to_vec::<CreateCursorBody>(&body).map_err(|e| {
616 GraphLoaderError::ParseError(format!("Failed to serialize body: {}", e))
617 })?;
618
619 let url = make_cursor_url("");
620 let cursor_create_resp = handle_auth(client.post(url), db_config)
621 .body(body_v)
622 .send()
623 .await;
624
625 let response = cursor_create_resp?;
626 if !response.status().is_success() {
627 let status = response.status();
628 let body = response.text().await.unwrap_or_default();
629 return Err(GraphLoaderError::Other(format!(
630 "Cursor creation failed with status {}: {}",
631 status, body
632 )));
633 }
634
635 let bytes_res = response
636 .bytes()
637 .await
638 .map_err(|e| GraphLoaderError::ParseError(format!("Error reading response: {}", e)))?;
639
640 let response_info = serde_json::from_slice::<CursorResponse>(&bytes_res)
641 .map_err(|e| GraphLoaderError::ParseError(format!("Failed to parse cursor: {}", e)))?;
642
643 sender
645 .send(bytes_res)
646 .await
647 .map_err(|e| GraphLoaderError::Other(format!("Failed to send data: {}", e)))?;
648
649 if let Some(cursor_id) = response_info.id
651 && response_info.has_more.unwrap_or(false)
652 {
653 loop {
654 let url = make_cursor_url(&format!("/{}", cursor_id));
655 let resp = handle_auth(client.post(url), db_config).send().await;
656
657 let resp =
658 crate::request::handle_arangodb_response(resp, |c| c == StatusCode::OK).await?;
659
660 let bytes_res = resp.bytes().await.map_err(|e| {
661 GraphLoaderError::ParseError(format!("Error reading response: {}", e))
662 })?;
663
664 let response_info =
665 serde_json::from_slice::<CursorResponse>(&bytes_res).map_err(|e| {
666 GraphLoaderError::ParseError(format!("Failed to parse cursor: {}", e))
667 })?;
668
669 sender
670 .send(bytes_res)
671 .await
672 .map_err(|e| GraphLoaderError::Other(format!("Failed to send data: {}", e)))?;
673
674 if !response_info.has_more.unwrap_or(false) {
675 break;
676 }
677 }
678
679 let delete_url = make_cursor_url(&format!("/{}", cursor_id));
681 let _ = handle_auth(client.delete(delete_url), db_config)
682 .send()
683 .await;
684 }
685
686 Ok(())
687 }
688}
689
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Runs `convert_and_validate` with fixed attribute and entity names.
    fn convert(value: Value, data_type: DataType) -> Result<Value, String> {
        convert_and_validate(&value, &data_type, "field", "test_id")
    }

    /// Returns the float payload of a successful numeric conversion and fails the test for
    /// anything else, so that a wrong variant cannot slip through unnoticed.
    fn expect_f64(result: Result<Value, String>) -> f64 {
        match result {
            Ok(Value::Number(n)) => n.as_f64().expect("number should be readable as f64"),
            other => panic!("expected a numeric value, got {:?}", other),
        }
    }

    #[test]
    fn test_convert_bool_from_bool() {
        assert_eq!(convert(json!(true), DataType::Bool), Ok(Value::Bool(true)));
        assert_eq!(convert(json!(false), DataType::Bool), Ok(Value::Bool(false)));
    }

    #[test]
    fn test_convert_bool_from_string() {
        for s in ["true", "True", "TRUE", "1", "yes", "Yes", "YES"] {
            assert_eq!(
                convert(json!(s), DataType::Bool),
                Ok(Value::Bool(true)),
                "Failed for string: {}",
                s
            );
        }

        for s in ["false", "False", "FALSE", "0", "no", "No", "NO"] {
            assert_eq!(
                convert(json!(s), DataType::Bool),
                Ok(Value::Bool(false)),
                "Failed for string: {}",
                s
            );
        }

        assert!(convert(json!("maybe"), DataType::Bool).is_err());
    }

    #[test]
    fn test_convert_bool_from_numbers() {
        // Signed integers.
        assert_eq!(convert(json!(0), DataType::Bool), Ok(Value::Bool(false)));
        assert_eq!(convert(json!(1), DataType::Bool), Ok(Value::Bool(true)));
        assert_eq!(convert(json!(-5), DataType::Bool), Ok(Value::Bool(true)));

        // Unsigned integers.
        assert_eq!(convert(json!(0u64), DataType::Bool), Ok(Value::Bool(false)));
        assert_eq!(convert(json!(42u64), DataType::Bool), Ok(Value::Bool(true)));

        // Floats.
        assert_eq!(convert(json!(0.0), DataType::Bool), Ok(Value::Bool(false)));
        assert_eq!(convert(json!(3.15), DataType::Bool), Ok(Value::Bool(true)));
    }

    #[test]
    fn test_convert_string_from_various_types() {
        assert_eq!(
            convert(json!("hello"), DataType::String),
            Ok(Value::String("hello".to_string()))
        );
        assert_eq!(
            convert(json!(null), DataType::String),
            Ok(Value::String(String::new()))
        );
        assert_eq!(
            convert(json!(true), DataType::String),
            Ok(Value::String("true".to_string()))
        );
        assert_eq!(
            convert(json!(42), DataType::String),
            Ok(Value::String("42".to_string()))
        );
        assert_eq!(
            convert(json!(42u64), DataType::String),
            Ok(Value::String("42".to_string()))
        );
        assert_eq!(
            convert(json!(3.15), DataType::String),
            Ok(Value::String("3.15".to_string()))
        );

        // Objects are rendered as JSON text.
        match convert(json!({"key": "value"}), DataType::String) {
            Ok(Value::String(s)) => {
                assert!(s.contains("key"));
                assert!(s.contains("value"));
            }
            other => panic!("expected a string value, got {:?}", other),
        }
    }

    #[test]
    fn test_convert_u64_from_u64() {
        assert_eq!(convert(json!(42u64), DataType::U64), Ok(json!(42)));
        assert_eq!(convert(json!(0u64), DataType::U64), Ok(json!(0)));
    }

    #[test]
    fn test_convert_u64_from_i64() {
        assert_eq!(convert(json!(42), DataType::U64), Ok(json!(42)));
        assert_eq!(convert(json!(0), DataType::U64), Ok(json!(0)));
        assert!(convert(json!(-1), DataType::U64).is_err());
    }

    #[test]
    fn test_convert_u64_from_f64() {
        // Floats are rounded to the nearest integer.
        assert_eq!(convert(json!(42.7), DataType::U64), Ok(json!(43)));
        assert_eq!(convert(json!(42.3), DataType::U64), Ok(json!(42)));
        assert!(convert(json!(-1.5), DataType::U64).is_err());
    }

    #[test]
    fn test_convert_u64_from_string() {
        assert_eq!(convert(json!("42"), DataType::U64), Ok(json!(42)));
        assert!(convert(json!("not_a_number"), DataType::U64).is_err());
        assert!(convert(json!("-1"), DataType::U64).is_err());
    }

    #[test]
    fn test_convert_i64_from_i64() {
        assert_eq!(convert(json!(42), DataType::I64), Ok(json!(42)));
        assert_eq!(convert(json!(-42), DataType::I64), Ok(json!(-42)));
        assert_eq!(convert(json!(0), DataType::I64), Ok(json!(0)));
    }

    #[test]
    fn test_convert_i64_from_u64() {
        assert_eq!(convert(json!(42u64), DataType::I64), Ok(json!(42)));
        assert!(convert(json!(u64::MAX), DataType::I64).is_err());
    }

    #[test]
    fn test_convert_i64_from_f64() {
        // Floats are rounded to the nearest integer.
        assert_eq!(convert(json!(42.7), DataType::I64), Ok(json!(43)));
        assert_eq!(convert(json!(-42.3), DataType::I64), Ok(json!(-42)));
    }

    #[test]
    fn test_convert_i64_from_string() {
        assert_eq!(convert(json!("42"), DataType::I64), Ok(json!(42)));
        assert_eq!(convert(json!("-42"), DataType::I64), Ok(json!(-42)));
        assert!(convert(json!("not_a_number"), DataType::I64).is_err());
    }

    #[test]
    fn test_convert_f64_from_f64() {
        assert_eq!(expect_f64(convert(json!(3.15), DataType::F64)), 3.15);
        assert!(convert(json!(-2.5), DataType::F64).is_ok());
    }

    #[test]
    fn test_convert_f64_from_integers() {
        assert_eq!(expect_f64(convert(json!(42), DataType::F64)), 42.0);
        assert!(convert(json!(42u64), DataType::F64).is_ok());
    }

    #[test]
    fn test_convert_f64_from_string() {
        assert_eq!(expect_f64(convert(json!("3.15"), DataType::F64)), 3.15);
        assert!(convert(json!("-2.5"), DataType::F64).is_ok());
        assert!(convert(json!("not_a_number"), DataType::F64).is_err());
    }

    #[test]
    fn test_convert_json_accepts_anything() {
        let test_values = vec![
            json!(null),
            json!(true),
            json!(false),
            json!(42),
            json!(-42),
            json!(3.15),
            json!("hello"),
            json!({"key": "value"}),
            json!([1, 2, 3]),
        ];

        for value in test_values {
            let result = convert(value.clone(), DataType::JSON);
            assert_eq!(result, Ok(value.clone()), "Failed for value: {:?}", value);
        }
    }

    #[test]
    fn test_default_values() {
        assert_eq!(default_value_for_type(&DataType::Bool), Value::Bool(false));
        assert_eq!(
            default_value_for_type(&DataType::String),
            Value::String(String::new())
        );
        assert_eq!(default_value_for_type(&DataType::U64), json!(0));
        assert_eq!(default_value_for_type(&DataType::I64), json!(0));
        assert!(default_value_for_type(&DataType::F64).is_number());
        assert_eq!(default_value_for_type(&DataType::JSON), Value::Null);
    }

    #[test]
    fn test_error_messages_contain_context() {
        let msg = convert_and_validate(&json!("invalid"), &DataType::U64, "my_field", "entity_123")
            .expect_err("a non-numeric string must not convert to u64");
        assert!(msg.contains("my_field"));
        assert!(msg.contains("entity_123"));

        let msg = convert_and_validate(&json!(-1), &DataType::U64, "age", "user:42")
            .expect_err("a negative number must not convert to u64");
        assert!(msg.contains("age"));
        assert!(msg.contains("user:42"));
    }

    #[test]
    fn test_graph_batch_type_error_tracking() {
        let mut batch = GraphBatch::new();

        assert_eq!(batch.type_error_count, 0);
        assert_eq!(batch.type_error_messages.len(), 0);

        batch.add_type_error("Error 1".to_string());
        assert_eq!(batch.type_error_count, 1);
        assert_eq!(batch.type_error_messages.len(), 1);

        for i in 2..=12 {
            batch.add_type_error(format!("Error {}", i));
        }

        // Every error is counted, but only the first ten messages are kept.
        assert_eq!(batch.type_error_count, 12);
        assert_eq!(batch.type_error_messages.len(), 10);
    }

    #[test]
    fn test_data_item_creation() {
        let item = DataItem::new("test_field".to_string(), DataType::String);
        assert_eq!(item.name, "test_field");
        assert_eq!(item.data_type, DataType::String);
    }

    #[test]
    fn test_aql_query_creation() {
        let mut bind_vars = HashMap::new();
        bind_vars.insert("param1".to_string(), json!("value1"));

        let query = AqlQuery::new("FOR v IN vertices RETURN v".to_string(), bind_vars.clone());

        assert_eq!(query.query, "FOR v IN vertices RETURN v");
        assert_eq!(query.bind_vars.len(), 1);
        assert_eq!(query.bind_vars.get("param1"), Some(&json!("value1")));
    }

    #[test]
    fn test_convert_bool_from_object_should_fail() {
        assert!(convert(json!({"key": "value"}), DataType::Bool).is_err());
    }

    #[test]
    fn test_convert_u64_from_object_should_fail() {
        assert!(convert(json!({"key": "value"}), DataType::U64).is_err());
    }

    #[test]
    fn test_convert_i64_from_object_should_fail() {
        assert!(convert(json!({"key": "value"}), DataType::I64).is_err());
    }

    #[test]
    fn test_convert_f64_from_object_should_fail() {
        assert!(convert(json!({"key": "value"}), DataType::F64).is_err());
    }
}