Rust client

The OpenSearch Rust client lets you connect your Rust application with the data in your OpenSearch cluster. For the client’s complete API documentation and additional examples, see the OpenSearch docs.rs documentation.

This getting started guide illustrates how to connect to OpenSearch, index documents, and run queries. For the client source code, see the opensearch-rs repo.

Setup

If you’re starting a new project, add the opensearch crate to Cargo.toml:

  1. [dependencies]
  2. opensearch = "1.0.0"

copy

Additionally, you may want to add the following serde dependencies that help serialize types to JSON and deserialize JSON responses:

  1. serde = "~1"
  2. serde_json = "~1"

copy

The Rust client uses the higher-level reqwest HTTP client library for HTTP requests, and reqwest uses the tokio platform to support asynchronous requests. If you are planning to use asynchronous functions, you need to add the tokio dependency to Cargo.toml:

  1. tokio = { version = "*", features = ["full"] }

copy

See the Sample program section for the complete Cargo.toml file.

To use the Rust client API, import the modules, structs, and enums you need:

  1. use opensearch::OpenSearch;

copy

Connecting to OpenSearch

To connect to the default OpenSearch host, create a default client object that connects to OpenSearch at the address http://localhost:9200:

  1. let client = OpenSearch::default();

copy

To connect to an OpenSearch host that is running at a different address, create a client with the specified address:

  1. let transport = Transport::single_node("http://localhost:9200")?;
  2. let client = OpenSearch::new(transport);

copy

Alternatively, you can customize the URL and use a connection pool by creating a TransportBuilder struct and passing it to OpenSearch::new to create a new instance of the client:

  1. let url = Url::parse("http://localhost:9200")?;
  2. let conn_pool = SingleNodeConnectionPool::new(url);
  3. let transport = TransportBuilder::new(conn_pool).disable_proxy().build()?;
  4. let client = OpenSearch::new(transport);

copy

Connecting to Amazon OpenSearch Service

The following example illustrates connecting to Amazon OpenSearch Service:

  1. let url = Url::parse("https://...");
  2. let service_name = "es";
  3. let conn_pool = SingleNodeConnectionPool::new(url?);
  4. let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
  5. let aws_config = aws_config::from_env().region(region_provider).load().await.clone();
  6. let transport = TransportBuilder::new(conn_pool)
  7. .auth(aws_config.clone().try_into()?)
  8. .service_name(service_name)
  9. .build()?;
  10. let client = OpenSearch::new(transport);

copy

Connecting to Amazon OpenSearch Serverless

The following example illustrates connecting to Amazon OpenSearch Serverless Service:

  1. let url = Url::parse("https://...");
  2. let service_name = "aoss";
  3. let conn_pool = SingleNodeConnectionPool::new(url?);
  4. let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
  5. let aws_config = aws_config::from_env().region(region_provider).load().await.clone();
  6. let transport = TransportBuilder::new(conn_pool)
  7. .auth(aws_config.clone().try_into()?)
  8. .service_name(service_name)
  9. .build()?;
  10. let client = OpenSearch::new(transport);

copy

Creating an index

To create an OpenSearch index, use the create function of the opensearch::indices::Indices struct. You can use the following code to construct a JSON object with custom mappings:

  1. let response = client
  2. .indices()
  3. .create(IndicesCreateParts::Index("movies"))
  4. .body(json!({
  5. "mappings" : {
  6. "properties" : {
  7. "title" : { "type" : "text" }
  8. }
  9. }
  10. }))
  11. .send()
  12. .await?;

copy

Indexing a document

You can index a document into OpenSearch using the client’s index function:

  1. let response = client
  2. .index(IndexParts::IndexId("movies", "1"))
  3. .body(json!({
  4. "id": 1,
  5. "title": "Moneyball",
  6. "director": "Bennett Miller",
  7. "year": "2011"
  8. }))
  9. .send()
  10. .await?;

copy

Performing bulk operations

You can perform several operations at the same time by using the client’s bulk function. First, create the JSON body of a Bulk API call, and then pass it to the bulk function:

  1. let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);
  2. // add the first operation and document
  3. body.push(json!({"index": {"_id": "2"}}).into());
  4. body.push(json!({
  5. "id": 2,
  6. "title": "Interstellar",
  7. "director": "Christopher Nolan",
  8. "year": "2014"
  9. }).into());
  10. // add the second operation and document
  11. body.push(json!({"index": {"_id": "3"}}).into());
  12. body.push(json!({
  13. "id": 3,
  14. "title": "Star Trek Beyond",
  15. "director": "Justin Lin",
  16. "year": "2015"
  17. }).into());
  18. let response = client
  19. .bulk(BulkParts::Index("movies"))
  20. .body(body)
  21. .send()
  22. .await?;

copy

Searching for documents

The easiest way to search for documents is to construct a query string. The following code uses a multi_match query to search for “miller” in the title and director fields. It boosts the documents where “miller” appears in the title field:

  1. response = client
  2. .search(SearchParts::Index(&["movies"]))
  3. .from(0)
  4. .size(10)
  5. .body(json!({
  6. "query": {
  7. "multi_match": {
  8. "query": "miller",
  9. "fields": ["title^2", "director"]
  10. }
  11. }
  12. }))
  13. .send()
  14. .await?;

copy

You can then read the response body as JSON and iterate over the hits array to read all the _source documents:

  1. let response_body = response.json::<Value>().await?;
  2. for hit in response_body["hits"]["hits"].as_array().unwrap() {
  3. // print the source document
  4. println!("{}", serde_json::to_string_pretty(&hit["_source"]).unwrap());
  5. }

copy

Deleting a document

You can delete a document using the client’s delete function:

  1. let response = client
  2. .delete(DeleteParts::IndexId("movies", "2"))
  3. .send()
  4. .await?;

copy

Deleting an index

You can delete an index using the delete function of the opensearch::indices::Indices struct:

  1. let response = client
  2. .indices()
  3. .delete(IndicesDeleteParts::Index(&["movies"]))
  4. .send()
  5. .await?;

copy

Sample program

The sample program uses the following Cargo.toml file with all dependencies described in the Setup section:

  1. [package]
  2. name = "os_rust_project"
  3. version = "0.1.0"
  4. edition = "2021"
  5. # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
  6. [dependencies]
  7. opensearch = "1.0.0"
  8. tokio = { version = "*", features = ["full"] }
  9. serde = "~1"
  10. serde_json = "~1"

copy

The following sample program creates a client, adds an index with non-default mappings, inserts a document, performs bulk operations, searches for the document, deletes the document, and then deletes the index:

  1. use opensearch::{DeleteParts, OpenSearch, IndexParts, http::request::JsonBody, BulkParts, SearchParts};
  2. use opensearch::{indices::{IndicesDeleteParts, IndicesCreateParts}};
  3. use serde_json::{json, Value};
  4. #[tokio::main]
  5. async fn main() -> Result<(), Box<dyn std::error::Error>> {
  6. let client = OpenSearch::default();
  7. // Create an index
  8. let mut response = client
  9. .indices()
  10. .create(IndicesCreateParts::Index("movies"))
  11. .body(json!({
  12. "mappings" : {
  13. "properties" : {
  14. "title" : { "type" : "text" }
  15. }
  16. }
  17. }))
  18. .send()
  19. .await?;
  20. let mut successful = response.status_code().is_success();
  21. if successful {
  22. println!("Successfully created an index");
  23. }
  24. else {
  25. println!("Could not create an index");
  26. }
  27. // Index a single document
  28. println!("Indexing a single document...");
  29. response = client
  30. .index(IndexParts::IndexId("movies", "1"))
  31. .body(json!({
  32. "id": 1,
  33. "title": "Moneyball",
  34. "director": "Bennett Miller",
  35. "year": "2011"
  36. }))
  37. .send()
  38. .await?;
  39. successful = response.status_code().is_success();
  40. if successful {
  41. println!("Successfully indexed a document");
  42. }
  43. else {
  44. println!("Could not index document");
  45. }
  46. // Index multiple documents using the bulk operation
  47. println!("Indexing multiple documents...");
  48. let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);
  49. // add the first operation and document
  50. body.push(json!({"index": {"_id": "2"}}).into());
  51. body.push(json!({
  52. "id": 2,
  53. "title": "Interstellar",
  54. "director": "Christopher Nolan",
  55. "year": "2014"
  56. }).into());
  57. // add the second operation and document
  58. body.push(json!({"index": {"_id": "3"}}).into());
  59. body.push(json!({
  60. "id": 3,
  61. "title": "Star Trek Beyond",
  62. "director": "Justin Lin",
  63. "year": "2015"
  64. }).into());
  65. response = client
  66. .bulk(BulkParts::Index("movies"))
  67. .body(body)
  68. .send()
  69. .await?;
  70. let mut response_body = response.json::<Value>().await?;
  71. successful = response_body["errors"].as_bool().unwrap() == false;
  72. if successful {
  73. println!("Successfully performed bulk operations");
  74. }
  75. else {
  76. println!("Could not perform bulk operations");
  77. }
  78. // Search for a document
  79. println!("Searching for a document...");
  80. response = client
  81. .search(SearchParts::Index(&["movies"]))
  82. .from(0)
  83. .size(10)
  84. .body(json!({
  85. "query": {
  86. "multi_match": {
  87. "query": "miller",
  88. "fields": ["title^2", "director"]
  89. }
  90. }
  91. }))
  92. .send()
  93. .await?;
  94. response_body = response.json::<Value>().await?;
  95. for hit in response_body["hits"]["hits"].as_array().unwrap() {
  96. // print the source document
  97. println!("{}", serde_json::to_string_pretty(&hit["_source"]).unwrap());
  98. }
  99. // Delete a document
  100. response = client
  101. .delete(DeleteParts::IndexId("movies", "2"))
  102. .send()
  103. .await?;
  104. successful = response.status_code().is_success();
  105. if successful {
  106. println!("Successfully deleted a document");
  107. }
  108. else {
  109. println!("Could not delete document");
  110. }
  111. // Delete the index
  112. response = client
  113. .indices()
  114. .delete(IndicesDeleteParts::Index(&["movies"]))
  115. .send()
  116. .await?;
  117. successful = response.status_code().is_success();
  118. if successful {
  119. println!("Successfully deleted the index");
  120. }
  121. else {
  122. println!("Could not delete the index");
  123. }
  124. Ok(())
  125. }

copy