1use crate::DatabaseConfiguration;
2use crate::client::auth::handle_auth;
3use crate::client::config::ClientConfig;
4use crate::client::{build_client, make_url};
5use crate::errors::GraphLoaderError;
6use bytes::Bytes;
7use reqwest::StatusCode;
8use reqwest_middleware::ClientWithMiddleware;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::collections::HashMap;
12use tokio::task::JoinSet;
13
14#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
16pub enum DataType {
17 Bool,
18 String,
19 U64,
20 I64,
21 F64,
22 JSON,
23}
24
25#[derive(Clone, Debug)]
27pub struct DataItem {
28 pub name: String,
29 pub data_type: DataType,
30}
31
32#[derive(Debug)]
34pub struct GraphBatch {
35 pub vertex_ids: Vec<Vec<u8>>,
37 pub vertex_attribute_values: Vec<Vec<Value>>,
39 pub edge_from_ids: Vec<Vec<u8>>,
41 pub edge_to_ids: Vec<Vec<u8>>,
43 pub edge_attribute_values: Vec<Vec<Value>>,
45 pub type_error_count: usize,
47 pub type_error_messages: Vec<String>,
49 max_type_errors: Option<u64>,
51}
52
53impl DataItem {
54 pub fn new(name: String, data_type: DataType) -> Self {
55 DataItem { name, data_type }
56 }
57}
58
59impl GraphBatch {
60 fn new(max_type_errors: Option<u64>) -> Self {
61 GraphBatch {
62 vertex_ids: Vec::new(),
63 vertex_attribute_values: Vec::new(),
64 edge_from_ids: Vec::new(),
65 edge_to_ids: Vec::new(),
66 edge_attribute_values: Vec::new(),
67 type_error_count: 0,
68 type_error_messages: Vec::new(),
69 max_type_errors,
70 }
71 }
72
73 fn add_type_error(&mut self, message: String) {
74 self.type_error_count += 1;
75 if let Some(max) = self.max_type_errors {
77 if (self.type_error_messages.len() as u64) < max {
78 self.type_error_messages.push(message);
79 }
80 } else {
81 self.type_error_messages.push(message);
83 }
84 }
85}
86
87fn convert_and_validate(
89 value: &Value,
90 expected_type: &DataType,
91 attr_name: &str,
92 entity_id: &str,
93) -> Result<Value, String> {
94 match expected_type {
95 DataType::Bool => {
96 if let Some(b) = value.as_bool() {
97 Ok(Value::Bool(b))
98 } else if let Some(s) = value.as_str() {
99 match s.to_lowercase().as_str() {
101 "true" | "1" | "yes" => Ok(Value::Bool(true)),
102 "false" | "0" | "no" => Ok(Value::Bool(false)),
103 _ => Err(format!(
104 "Cannot convert '{}' to bool for attribute '{}' in entity '{}'",
105 s, attr_name, entity_id
106 )),
107 }
108 } else if let Some(n) = value.as_i64() {
109 Ok(Value::Bool(n != 0))
110 } else if let Some(n) = value.as_u64() {
111 Ok(Value::Bool(n != 0))
112 } else if let Some(n) = value.as_f64() {
113 Ok(Value::Bool(n != 0.0 && n != -0.0))
114 } else {
115 Err(format!(
116 "Cannot convert {:?} to bool for attribute '{}' in entity '{}'",
117 value, attr_name, entity_id
118 ))
119 }
120 }
121 DataType::String => {
122 if let Some(s) = value.as_str() {
123 Ok(Value::String(s.to_string()))
124 } else if value.is_null() {
125 Ok(Value::String(String::new()))
126 } else if let Some(b) = value.as_bool() {
127 Ok(Value::String(b.to_string()))
128 } else if let Some(n) = value.as_i64() {
129 Ok(Value::String(n.to_string()))
130 } else if let Some(n) = value.as_u64() {
131 Ok(Value::String(n.to_string()))
132 } else if let Some(n) = value.as_f64() {
133 Ok(Value::String(n.to_string()))
134 } else {
135 Ok(Value::String(value.to_string()))
137 }
138 }
139 DataType::U64 => {
140 if let Some(n) = value.as_u64() {
141 Ok(Value::Number(n.into()))
142 } else if let Some(n) = value.as_i64() {
143 if n >= 0 {
144 Ok(Value::Number((n as u64).into()))
145 } else {
146 Err(format!(
147 "Cannot convert negative number {} to u64 for attribute '{}' in entity '{}'",
148 n, attr_name, entity_id
149 ))
150 }
151 } else if let Some(f) = value.as_f64() {
152 if f >= 0.0 && f < 2.0_f64.powi(64) {
160 Ok(Value::Number((f.round() as u64).into()))
161 } else {
162 Err(format!(
163 "Cannot convert {} to u64 for attribute '{}' in entity '{}'",
164 f, attr_name, entity_id
165 ))
166 }
167 } else if let Some(s) = value.as_str() {
168 s.parse::<u64>()
169 .map(|n| Value::Number(n.into()))
170 .map_err(|_| {
171 format!(
172 "Cannot parse '{}' as u64 for attribute '{}' in entity '{}'",
173 s, attr_name, entity_id
174 )
175 })
176 } else {
177 Err(format!(
178 "Cannot convert {:?} to u64 for attribute '{}' in entity '{}'",
179 value, attr_name, entity_id
180 ))
181 }
182 }
183 DataType::I64 => {
184 if let Some(n) = value.as_i64() {
185 Ok(Value::Number(n.into()))
186 } else if let Some(n) = value.as_u64() {
187 if n <= i64::MAX as u64 {
188 Ok(Value::Number((n as i64).into()))
189 } else {
190 Err(format!(
191 "Cannot convert {} to i64 (overflow) for attribute '{}' in entity '{}'",
192 n, attr_name, entity_id
193 ))
194 }
195 } else if let Some(f) = value.as_f64() {
196 if f >= -(2.0_f64.powi(63)) && f < 2.0_f64.powi(63) {
205 Ok(Value::Number((f.round() as i64).into()))
206 } else {
207 Err(format!(
208 "Cannot convert {} to i64 for attribute '{}' in entity '{}'",
209 f, attr_name, entity_id
210 ))
211 }
212 } else if let Some(s) = value.as_str() {
213 s.parse::<i64>()
214 .map(|n| Value::Number(n.into()))
215 .map_err(|_| {
216 format!(
217 "Cannot parse '{}' as i64 for attribute '{}' in entity '{}'",
218 s, attr_name, entity_id
219 )
220 })
221 } else {
222 Err(format!(
223 "Cannot convert {:?} to i64 for attribute '{}' in entity '{}'",
224 value, attr_name, entity_id
225 ))
226 }
227 }
228 DataType::F64 => {
229 let make_number = |raw: f64| {
230 serde_json::Number::from_f64(raw)
231 .map(Value::Number)
232 .ok_or_else(|| {
233 format!(
234 "Cannot represent '{}' as finite f64 for attribute '{}' in entity '{}'",
235 raw, attr_name, entity_id
236 )
237 })
238 };
239
240 if let Some(f) = value.as_f64() {
241 make_number(f)
242 } else if let Some(n) = value.as_i64() {
243 make_number(n as f64)
244 } else if let Some(n) = value.as_u64() {
245 make_number(n as f64)
246 } else if let Some(s) = value.as_str() {
247 let parsed = s.parse::<f64>().map_err(|_| {
248 format!(
249 "Cannot parse '{}' as f64 for attribute '{}' in entity '{}'",
250 s, attr_name, entity_id
251 )
252 })?;
253 make_number(parsed)
254 } else {
255 Err(format!(
256 "Cannot convert {:?} to f64 for attribute '{}' in entity '{}'",
257 value, attr_name, entity_id
258 ))
259 }
260 }
261 DataType::JSON => {
262 Ok(value.clone())
264 }
265 }
266}
267
268fn default_value_for_type(data_type: &DataType) -> Value {
270 match data_type {
271 DataType::Bool => Value::Bool(false),
272 DataType::String => Value::String(String::new()),
273 DataType::U64 => Value::Number(0.into()),
274 DataType::I64 => Value::Number(0.into()),
275 DataType::F64 => Value::Number(serde_json::Number::from_f64(0.0).unwrap()),
276 DataType::JSON => Value::Null,
277 }
278}
279
280#[derive(Clone, Debug)]
282pub struct AqlQuery {
283 pub query: String,
284 pub bind_vars: HashMap<String, Value>,
285}
286
287impl AqlQuery {
288 pub fn new(query: String, bind_vars: HashMap<String, Value>) -> Self {
289 AqlQuery { query, bind_vars }
290 }
291}
292
293pub struct AqlGraphLoader {
295 db_config: DatabaseConfiguration,
296 batch_size: u64,
297 vertex_attributes: Vec<DataItem>,
298 edge_attributes: Vec<DataItem>,
299 queries: Vec<Vec<AqlQuery>>,
300 max_type_errors: Option<u64>,
302}
303
304#[derive(Debug, Serialize, Deserialize)]
305struct CursorOptions {
306 stream: bool,
307}
308
309impl CursorOptions {
310 pub fn new(stream: bool) -> Self {
311 Self { stream }
312 }
313}
314
315#[derive(Debug, Serialize, Deserialize)]
316#[serde(rename_all = "camelCase")]
317struct CreateCursorBody {
318 query: String,
319 options: CursorOptions,
320 #[serde(skip_serializing_if = "Option::is_none")]
321 batch_size: Option<u64>,
322 #[serde(skip_serializing_if = "Option::is_none")]
323 bind_vars: Option<HashMap<String, Value>>,
324}
325
326impl CreateCursorBody {
327 pub fn from_streaming_query_with_size(
328 query: String,
329 batch_size: Option<u64>,
330 bind_vars: Option<HashMap<String, Value>>,
331 ) -> Self {
332 Self {
333 query,
334 batch_size,
335 options: CursorOptions::new(true),
336 bind_vars,
337 }
338 }
339}
340
341#[derive(Debug, Serialize, Deserialize)]
342#[serde(rename_all = "camelCase")]
343struct CursorResponse {
344 has_more: Option<bool>,
345 id: Option<String>,
346}
347
348#[derive(Debug, Serialize, Deserialize)]
349struct GraphData {
350 vertices: Option<Vec<Value>>,
351 edges: Option<Vec<Value>>,
352}
353
354impl AqlGraphLoader {
355 pub fn new(
368 db_config: DatabaseConfiguration,
369 batch_size: u64,
370 vertex_attributes: Vec<DataItem>,
371 edge_attributes: Vec<DataItem>,
372 queries: Vec<Vec<AqlQuery>>,
373 max_type_errors: Option<u64>,
374 ) -> Result<Self, GraphLoaderError> {
375 if queries.is_empty() || queries.iter().all(|q| q.is_empty()) {
377 return Err(GraphLoaderError::Other(
378 "At least one AQL query must be provided".to_string(),
379 ));
380 }
381
382 Ok(AqlGraphLoader {
383 db_config,
384 batch_size,
385 vertex_attributes,
386 edge_attributes,
387 queries,
388 max_type_errors,
389 })
390 }
391
392 pub async fn do_load<F>(&self, callback: F) -> Result<(), GraphLoaderError>
394 where
395 F: Fn(&mut GraphBatch) -> Result<(), GraphLoaderError> + Send + Sync + Clone + 'static,
396 {
397 let use_tls = self.db_config.endpoints[0].starts_with("https://");
399 let client_config = ClientConfig::builder()
400 .n_retries(5)
401 .use_tls(use_tls)
402 .tls_cert_opt(self.db_config.tls_cert.clone())
403 .build();
404 let client = build_client(&client_config)?;
405
406 for query_group in &self.queries {
408 self.execute_query_group(&client, query_group, callback.clone())
409 .await?;
410 }
411
412 Ok(())
413 }
414
415 async fn execute_query_group<F>(
416 &self,
417 client: &ClientWithMiddleware,
418 queries: &[AqlQuery],
419 callback: F,
420 ) -> Result<(), GraphLoaderError>
421 where
422 F: Fn(&mut GraphBatch) -> Result<(), GraphLoaderError> + Send + Sync + Clone + 'static,
423 {
424 let (sender, mut receiver) = tokio::sync::mpsc::channel::<Bytes>(10);
426
427 let callback_clone = callback.clone();
429 let vertex_attributes = self.vertex_attributes.clone();
430 let edge_attributes = self.edge_attributes.clone();
431 let max_type_errors = self.max_type_errors;
432
433 let consumer = std::thread::spawn(move || -> Result<(), GraphLoaderError> {
434 while let Some(resp) = receiver.blocking_recv() {
435 let body = std::str::from_utf8(resp.as_ref())
436 .map_err(|e| format!("UTF8 error when parsing body: {:?}", e))?;
437
438 let cursor_result: serde_json::Result<CursorResponse> = serde_json::from_str(body);
440 if cursor_result.is_err() {
441 return Err(GraphLoaderError::ParseError(format!(
442 "Failed to parse cursor response: {:?}",
443 cursor_result.err()
444 )));
445 }
446
447 let parsed: serde_json::Result<serde_json::Map<String, Value>> =
449 serde_json::from_str(body);
450 if parsed.is_err() {
451 return Err(GraphLoaderError::ParseError(format!(
452 "Failed to parse result data: {:?}",
453 parsed.err()
454 )));
455 }
456
457 let data = parsed.unwrap();
458 let result = data.get("result");
459 if result.is_none() {
460 continue;
461 }
462
463 let result_array = result.unwrap().as_array();
464 if result_array.is_none() {
465 continue;
466 }
467
468 let mut batch = GraphBatch::new(max_type_errors);
470
471 for item in result_array.unwrap() {
473 let graph_data: serde_json::Result<GraphData> =
474 serde_json::from_value(item.clone());
475 if let Ok(graph_data) = graph_data {
476 if let Some(vertices) = graph_data.vertices {
478 for vertex in vertices {
479 if let Some(id) = vertex.get("_id")
480 && let Some(id_str) = id.as_str()
481 {
482 batch.vertex_ids.push(id_str.as_bytes().to_vec());
483
484 if !vertex_attributes.is_empty() {
486 let mut attrs = Vec::new();
487 for attr_def in &vertex_attributes {
488 let raw_value = vertex
489 .get(&attr_def.name)
490 .cloned()
491 .unwrap_or(Value::Null);
492
493 match convert_and_validate(
494 &raw_value,
495 &attr_def.data_type,
496 &attr_def.name,
497 id_str,
498 ) {
499 Ok(converted) => attrs.push(converted),
500 Err(err_msg) => {
501 batch.add_type_error(err_msg);
502 attrs.push(default_value_for_type(
503 &attr_def.data_type,
504 ));
505 }
506 }
507 }
508 batch.vertex_attribute_values.push(attrs);
509 }
510 }
511 }
512 }
513
514 if let Some(edges) = graph_data.edges {
516 for edge in edges {
517 if edge.is_null() {
519 continue;
520 }
521
522 if let (Some(from), Some(to)) = (edge.get("_from"), edge.get("_to"))
523 && let (Some(from_str), Some(to_str)) =
524 (from.as_str(), to.as_str())
525 {
526 let edge_id = format!("{}-->{}", from_str, to_str);
527 batch.edge_from_ids.push(from_str.as_bytes().to_vec());
528 batch.edge_to_ids.push(to_str.as_bytes().to_vec());
529
530 if !edge_attributes.is_empty() {
532 let mut attrs = Vec::new();
533 for attr_def in &edge_attributes {
534 let raw_value = edge
535 .get(&attr_def.name)
536 .cloned()
537 .unwrap_or(Value::Null);
538
539 match convert_and_validate(
540 &raw_value,
541 &attr_def.data_type,
542 &attr_def.name,
543 &edge_id,
544 ) {
545 Ok(converted) => attrs.push(converted),
546 Err(err_msg) => {
547 batch.add_type_error(err_msg);
548 attrs.push(default_value_for_type(
549 &attr_def.data_type,
550 ));
551 }
552 }
553 }
554 batch.edge_attribute_values.push(attrs);
555 }
556 }
557 }
558 }
559 }
560 }
561
562 callback_clone(&mut batch)?;
564 }
565 Ok(())
566 });
567
568 let mut task_set = JoinSet::new();
570
571 for query in queries {
572 let client_clone = client.clone();
573 let db_config = self.db_config.clone();
574 let query_clone = query.clone();
575 let batch_size = self.batch_size;
576 let sender_clone = sender.clone();
577
578 task_set.spawn(async move {
579 Self::execute_single_query(
580 &client_clone,
581 &db_config,
582 &query_clone,
583 batch_size,
584 sender_clone,
585 )
586 .await
587 });
588 }
589
590 drop(sender);
592
593 let mut errors: Vec<String> = Vec::new();
595 while let Some(res) = task_set.join_next().await {
596 match res {
597 Ok(Ok(())) => {}
598 Ok(Err(e)) => {
599 errors.push(e.to_string());
600 }
601 Err(e) => {
602 errors.push(format!("Task join error: {}", e));
603 }
604 }
605 }
606
607 let consumer_result = consumer
609 .join()
610 .map_err(|_| GraphLoaderError::Other("Consumer thread panicked".to_string()))?;
611
612 if !errors.is_empty() {
613 return Err(GraphLoaderError::Other(format!(
614 "Errors occurred during query execution: {}",
615 errors.join("; ")
616 )));
617 }
618
619 consumer_result
620 }
621
622 async fn execute_single_query(
623 client: &ClientWithMiddleware,
624 db_config: &DatabaseConfiguration,
625 query: &AqlQuery,
626 batch_size: u64,
627 sender: tokio::sync::mpsc::Sender<Bytes>,
628 ) -> Result<(), GraphLoaderError> {
629 let make_cursor_url = |path: &str| -> String {
630 let suffix = "/_api/cursor".to_owned() + path;
631 make_url(db_config, suffix.as_str())
632 };
633
634 let body = CreateCursorBody::from_streaming_query_with_size(
636 query.query.clone(),
637 Some(batch_size),
638 Some(query.bind_vars.clone()),
639 );
640 let body_v = serde_json::to_vec::<CreateCursorBody>(&body).map_err(|e| {
641 GraphLoaderError::ParseError(format!("Failed to serialize body: {}", e))
642 })?;
643
644 let url = make_cursor_url("");
645 let cursor_create_resp = handle_auth(client.post(url), db_config)
646 .body(body_v)
647 .send()
648 .await;
649
650 let response = cursor_create_resp?;
651 if !response.status().is_success() {
652 let status = response.status();
653 let body = response.text().await.unwrap_or_default();
654 return Err(GraphLoaderError::Other(format!(
655 "Cursor creation failed with status {}: {}",
656 status, body
657 )));
658 }
659
660 let bytes_res = response
661 .bytes()
662 .await
663 .map_err(|e| GraphLoaderError::ParseError(format!("Error reading response: {}", e)))?;
664
665 let response_info = serde_json::from_slice::<CursorResponse>(&bytes_res)
666 .map_err(|e| GraphLoaderError::ParseError(format!("Failed to parse cursor: {}", e)))?;
667
668 sender
670 .send(bytes_res)
671 .await
672 .map_err(|e| GraphLoaderError::Other(format!("Failed to send data: {}", e)))?;
673
674 if let Some(cursor_id) = response_info.id
676 && response_info.has_more.unwrap_or(false)
677 {
678 loop {
679 let url = make_cursor_url(&format!("/{}", cursor_id));
680 let resp = handle_auth(client.post(url), db_config).send().await;
681
682 let resp =
683 crate::request::handle_arangodb_response(resp, |c| c == StatusCode::OK).await?;
684
685 let bytes_res = resp.bytes().await.map_err(|e| {
686 GraphLoaderError::ParseError(format!("Error reading response: {}", e))
687 })?;
688
689 let response_info =
690 serde_json::from_slice::<CursorResponse>(&bytes_res).map_err(|e| {
691 GraphLoaderError::ParseError(format!("Failed to parse cursor: {}", e))
692 })?;
693
694 sender
695 .send(bytes_res)
696 .await
697 .map_err(|e| GraphLoaderError::Other(format!("Failed to send data: {}", e)))?;
698
699 if !response_info.has_more.unwrap_or(false) {
700 break;
701 }
702 }
703
704 let delete_url = make_cursor_url(&format!("/{}", cursor_id));
706 let _ = handle_auth(client.delete(delete_url), db_config)
707 .send()
708 .await;
709 }
710
711 Ok(())
712 }
713}
714
715#[cfg(test)]
716mod tests {
717 use super::*;
718 use serde_json::json;
719
720 fn val(v: serde_json::Value) -> Value {
722 v
723 }
724
725 #[test]
726 fn test_convert_bool_from_bool() {
727 let result = convert_and_validate(&val(json!(true)), &DataType::Bool, "field", "test_id");
728 assert_eq!(result, Ok(Value::Bool(true)));
729
730 let result = convert_and_validate(&val(json!(false)), &DataType::Bool, "field", "test_id");
731 assert_eq!(result, Ok(Value::Bool(false)));
732 }
733
734 #[test]
735 fn test_convert_bool_from_string() {
736 for s in &["true", "True", "TRUE", "1", "yes", "Yes", "YES"] {
738 let result = convert_and_validate(&val(json!(s)), &DataType::Bool, "field", "test_id");
739 assert_eq!(result, Ok(Value::Bool(true)), "Failed for string: {}", s);
740 }
741
742 for s in &["false", "False", "FALSE", "0", "no", "No", "NO"] {
744 let result = convert_and_validate(&val(json!(s)), &DataType::Bool, "field", "test_id");
745 assert_eq!(result, Ok(Value::Bool(false)), "Failed for string: {}", s);
746 }
747
748 let result =
750 convert_and_validate(&val(json!("maybe")), &DataType::Bool, "field", "test_id");
751 assert!(result.is_err());
752 }
753
754 #[test]
755 fn test_convert_bool_from_numbers() {
756 let result = convert_and_validate(&val(json!(0)), &DataType::Bool, "field", "test_id");
758 assert_eq!(result, Ok(Value::Bool(false)));
759
760 let result = convert_and_validate(&val(json!(1)), &DataType::Bool, "field", "test_id");
761 assert_eq!(result, Ok(Value::Bool(true)));
762
763 let result = convert_and_validate(&val(json!(-5)), &DataType::Bool, "field", "test_id");
764 assert_eq!(result, Ok(Value::Bool(true)));
765
766 let result = convert_and_validate(&val(json!(0u64)), &DataType::Bool, "field", "test_id");
768 assert_eq!(result, Ok(Value::Bool(false)));
769
770 let result = convert_and_validate(&val(json!(42u64)), &DataType::Bool, "field", "test_id");
771 assert_eq!(result, Ok(Value::Bool(true)));
772
773 let result = convert_and_validate(&val(json!(0.0)), &DataType::Bool, "field", "test_id");
775 assert_eq!(result, Ok(Value::Bool(false)));
776
777 let result = convert_and_validate(&val(json!(3.15)), &DataType::Bool, "field", "test_id");
778 assert_eq!(result, Ok(Value::Bool(true)));
779 }
780
781 #[test]
782 fn test_convert_string_from_various_types() {
783 let result =
785 convert_and_validate(&val(json!("hello")), &DataType::String, "field", "test_id");
786 assert_eq!(result, Ok(Value::String("hello".to_string())));
787
788 let result = convert_and_validate(&val(json!(null)), &DataType::String, "field", "test_id");
790 assert_eq!(result, Ok(Value::String(String::new())));
791
792 let result = convert_and_validate(&val(json!(true)), &DataType::String, "field", "test_id");
794 assert_eq!(result, Ok(Value::String("true".to_string())));
795
796 let result = convert_and_validate(&val(json!(42)), &DataType::String, "field", "test_id");
798 assert_eq!(result, Ok(Value::String("42".to_string())));
799
800 let result =
802 convert_and_validate(&val(json!(42u64)), &DataType::String, "field", "test_id");
803 assert_eq!(result, Ok(Value::String("42".to_string())));
804
805 let result = convert_and_validate(&val(json!(3.15)), &DataType::String, "field", "test_id");
807 assert_eq!(result, Ok(Value::String("3.15".to_string())));
808
809 let result = convert_and_validate(
811 &val(json!({"key": "value"})),
812 &DataType::String,
813 "field",
814 "test_id",
815 );
816 assert!(result.is_ok());
817 if let Ok(Value::String(s)) = result {
818 assert!(s.contains("key"));
819 assert!(s.contains("value"));
820 }
821 }
822
823 #[test]
824 fn test_convert_u64_from_u64() {
825 let result = convert_and_validate(&val(json!(42u64)), &DataType::U64, "field", "test_id");
826 assert_eq!(result, Ok(json!(42)));
827
828 let result = convert_and_validate(&val(json!(0u64)), &DataType::U64, "field", "test_id");
829 assert_eq!(result, Ok(json!(0)));
830 }
831
832 #[test]
833 fn test_convert_u64_from_i64() {
834 let result = convert_and_validate(&val(json!(42)), &DataType::U64, "field", "test_id");
836 assert_eq!(result, Ok(json!(42)));
837
838 let result = convert_and_validate(&val(json!(0)), &DataType::U64, "field", "test_id");
839 assert_eq!(result, Ok(json!(0)));
840
841 let result = convert_and_validate(&val(json!(-1)), &DataType::U64, "field", "test_id");
843 assert!(result.is_err());
844 }
845
846 #[test]
847 fn test_convert_u64_from_f64() {
848 let result = convert_and_validate(&val(json!(42.7)), &DataType::U64, "field", "test_id");
850 assert_eq!(result, Ok(json!(43)));
851
852 let result = convert_and_validate(&val(json!(42.3)), &DataType::U64, "field", "test_id");
853 assert_eq!(result, Ok(json!(42)));
854
855 let result = convert_and_validate(&val(json!(-1.5)), &DataType::U64, "field", "test_id");
857 assert!(result.is_err());
858 }
859
860 #[test]
861 fn test_convert_u64_from_string() {
862 let result = convert_and_validate(&val(json!("42")), &DataType::U64, "field", "test_id");
863 assert_eq!(result, Ok(json!(42)));
864
865 let result = convert_and_validate(
867 &val(json!("not_a_number")),
868 &DataType::U64,
869 "field",
870 "test_id",
871 );
872 assert!(result.is_err());
873
874 let result = convert_and_validate(&val(json!("-1")), &DataType::U64, "field", "test_id");
876 assert!(result.is_err());
877 }
878
879 #[test]
880 fn test_convert_i64_from_i64() {
881 let result = convert_and_validate(&val(json!(42)), &DataType::I64, "field", "test_id");
882 assert_eq!(result, Ok(json!(42)));
883
884 let result = convert_and_validate(&val(json!(-42)), &DataType::I64, "field", "test_id");
885 assert_eq!(result, Ok(json!(-42)));
886
887 let result = convert_and_validate(&val(json!(0)), &DataType::I64, "field", "test_id");
888 assert_eq!(result, Ok(json!(0)));
889 }
890
891 #[test]
892 fn test_convert_i64_from_u64() {
893 let result = convert_and_validate(&val(json!(42u64)), &DataType::I64, "field", "test_id");
895 assert_eq!(result, Ok(json!(42)));
896
897 let result =
899 convert_and_validate(&val(json!(u64::MAX)), &DataType::I64, "field", "test_id");
900 assert!(result.is_err());
901 }
902
903 #[test]
904 fn test_convert_i64_from_f64() {
905 let result = convert_and_validate(&val(json!(42.7)), &DataType::I64, "field", "test_id");
907 assert_eq!(result, Ok(json!(43)));
908
909 let result = convert_and_validate(&val(json!(-42.3)), &DataType::I64, "field", "test_id");
910 assert_eq!(result, Ok(json!(-42)));
911 }
912
913 #[test]
914 fn test_convert_i64_from_string() {
915 let result = convert_and_validate(&val(json!("42")), &DataType::I64, "field", "test_id");
916 assert_eq!(result, Ok(json!(42)));
917
918 let result = convert_and_validate(&val(json!("-42")), &DataType::I64, "field", "test_id");
919 assert_eq!(result, Ok(json!(-42)));
920
921 let result = convert_and_validate(
923 &val(json!("not_a_number")),
924 &DataType::I64,
925 "field",
926 "test_id",
927 );
928 assert!(result.is_err());
929 }
930
931 #[test]
932 fn test_convert_f64_from_f64() {
933 let result = convert_and_validate(&val(json!(3.15)), &DataType::F64, "field", "test_id");
934 assert!(result.is_ok());
935 if let Ok(Value::Number(n)) = result {
936 assert_eq!(n.as_f64(), Some(3.15));
937 }
938
939 let result = convert_and_validate(&val(json!(-2.5)), &DataType::F64, "field", "test_id");
940 assert!(result.is_ok());
941 }
942
943 #[test]
944 fn test_convert_f64_from_integers() {
945 let result = convert_and_validate(&val(json!(42)), &DataType::F64, "field", "test_id");
947 assert!(result.is_ok());
948 if let Ok(Value::Number(n)) = result {
949 assert_eq!(n.as_f64(), Some(42.0));
950 }
951
952 let result = convert_and_validate(&val(json!(42u64)), &DataType::F64, "field", "test_id");
954 assert!(result.is_ok());
955 }
956
957 #[test]
958 fn test_convert_f64_from_string() {
959 let result = convert_and_validate(&val(json!("3.15")), &DataType::F64, "field", "test_id");
960 assert!(result.is_ok());
961 if let Ok(Value::Number(n)) = result {
962 assert_eq!(n.as_f64(), Some(3.15));
963 }
964
965 let result = convert_and_validate(&val(json!("-2.5")), &DataType::F64, "field", "test_id");
966 assert!(result.is_ok());
967
968 let result = convert_and_validate(
970 &val(json!("not_a_number")),
971 &DataType::F64,
972 "field",
973 "test_id",
974 );
975 assert!(result.is_err());
976 }
977
978 #[test]
979 fn test_convert_json_accepts_anything() {
980 let test_values = vec![
982 json!(null),
983 json!(true),
984 json!(false),
985 json!(42),
986 json!(-42),
987 json!(3.15),
988 json!("hello"),
989 json!({"key": "value"}),
990 json!([1, 2, 3]),
991 ];
992
993 for value in test_values {
994 let result = convert_and_validate(&value, &DataType::JSON, "field", "test_id");
995 assert_eq!(result, Ok(value.clone()), "Failed for value: {:?}", value);
996 }
997 }
998
999 #[test]
1000 fn test_default_values() {
1001 assert_eq!(default_value_for_type(&DataType::Bool), Value::Bool(false));
1002 assert_eq!(
1003 default_value_for_type(&DataType::String),
1004 Value::String(String::new())
1005 );
1006 assert_eq!(default_value_for_type(&DataType::U64), json!(0));
1007 assert_eq!(default_value_for_type(&DataType::I64), json!(0));
1008 assert!(default_value_for_type(&DataType::F64).is_number());
1009 assert_eq!(default_value_for_type(&DataType::JSON), Value::Null);
1010 }
1011
1012 #[test]
1013 fn test_error_messages_contain_context() {
1014 let result = convert_and_validate(
1016 &val(json!("invalid")),
1017 &DataType::U64,
1018 "my_field",
1019 "entity_123",
1020 );
1021 assert!(result.is_err());
1022 if let Err(msg) = result {
1023 assert!(msg.contains("my_field"));
1024 assert!(msg.contains("entity_123"));
1025 }
1026
1027 let result = convert_and_validate(&val(json!(-1)), &DataType::U64, "age", "user:42");
1028 assert!(result.is_err());
1029 if let Err(msg) = result {
1030 assert!(msg.contains("age"));
1031 assert!(msg.contains("user:42"));
1032 }
1033 }
1034
1035 #[test]
1036 fn test_graph_batch_type_error_tracking() {
1037 let mut batch = GraphBatch::new(Some(10));
1039
1040 assert_eq!(batch.type_error_count, 0);
1041 assert_eq!(batch.type_error_messages.len(), 0);
1042
1043 batch.add_type_error("Error 1".to_string());
1045 assert_eq!(batch.type_error_count, 1);
1046 assert_eq!(batch.type_error_messages.len(), 1);
1047
1048 for i in 2..=12 {
1050 batch.add_type_error(format!("Error {}", i));
1051 }
1052
1053 assert_eq!(batch.type_error_count, 12);
1055 assert_eq!(batch.type_error_messages.len(), 10);
1056 }
1057
1058 #[test]
1059 fn test_graph_batch_type_error_no_limit() {
1060 let mut batch = GraphBatch::new(None);
1062
1063 assert_eq!(batch.type_error_count, 0);
1064 assert_eq!(batch.type_error_messages.len(), 0);
1065
1066 for i in 1..=15 {
1068 batch.add_type_error(format!("Error {}", i));
1069 }
1070
1071 assert_eq!(batch.type_error_count, 15);
1073 assert_eq!(batch.type_error_messages.len(), 15);
1074 }
1075
1076 #[test]
1077 fn test_graph_batch_type_error_zero_limit() {
1078 let mut batch = GraphBatch::new(Some(0));
1080
1081 assert_eq!(batch.type_error_count, 0);
1082 assert_eq!(batch.type_error_messages.len(), 0);
1083
1084 for i in 1..=5 {
1086 batch.add_type_error(format!("Error {}", i));
1087 }
1088
1089 assert_eq!(batch.type_error_count, 5);
1091 assert_eq!(batch.type_error_messages.len(), 0);
1092 }
1093
1094 #[test]
1095 fn test_data_item_creation() {
1096 let item = DataItem::new("test_field".to_string(), DataType::String);
1097 assert_eq!(item.name, "test_field");
1098 assert_eq!(item.data_type, DataType::String);
1099 }
1100
1101 #[test]
1102 fn test_aql_query_creation() {
1103 let mut bind_vars = HashMap::new();
1104 bind_vars.insert("param1".to_string(), json!("value1"));
1105
1106 let query = AqlQuery::new("FOR v IN vertices RETURN v".to_string(), bind_vars.clone());
1107
1108 assert_eq!(query.query, "FOR v IN vertices RETURN v");
1109 assert_eq!(query.bind_vars.len(), 1);
1110 assert_eq!(query.bind_vars.get("param1"), Some(&json!("value1")));
1111 }
1112
1113 #[test]
1114 fn test_convert_bool_from_object_should_fail() {
1115 let result = convert_and_validate(
1116 &val(json!({"key": "value"})),
1117 &DataType::Bool,
1118 "field",
1119 "test_id",
1120 );
1121 assert!(result.is_err());
1122 }
1123
1124 #[test]
1125 fn test_convert_u64_from_object_should_fail() {
1126 let result = convert_and_validate(
1127 &val(json!({"key": "value"})),
1128 &DataType::U64,
1129 "field",
1130 "test_id",
1131 );
1132 assert!(result.is_err());
1133 }
1134
1135 #[test]
1136 fn test_convert_i64_from_object_should_fail() {
1137 let result = convert_and_validate(
1138 &val(json!({"key": "value"})),
1139 &DataType::I64,
1140 "field",
1141 "test_id",
1142 );
1143 assert!(result.is_err());
1144 }
1145
1146 #[test]
1147 fn test_convert_f64_from_object_should_fail() {
1148 let result = convert_and_validate(
1149 &val(json!({"key": "value"})),
1150 &DataType::F64,
1151 "field",
1152 "test_id",
1153 );
1154 assert!(result.is_err());
1155 }
1156}