Search pipelines

This is an experimental feature and is not recommended for use in a production environment. To follow the feature's progress or leave feedback, join the discussion in the OpenSearch forum.

You can use search pipelines to build new or reuse existing result rerankers, query rewriters, and other components that operate on queries or results. Search pipelines make it easier for you to process search queries and search results within OpenSearch. Moving some of your application functionality into an OpenSearch search pipeline reduces the overall complexity of your application. As part of a search pipeline, you specify a list of processors that perform modular tasks. You can then easily add or reorder these processors to customize search results for your application.

Terminology

The following is a list of search pipeline terminology:

  • Search request processor: A component that takes a search request (the query and the metadata passed in the request), performs an operation with or on the search request, and returns a search request.
  • Search response processor: A component that takes a search response and search request (the query, results, and metadata passed in the request), performs an operation with or on the search response, and returns a search response.
  • Processor: Either a search request processor or a search response processor.
  • Search pipeline: An ordered list of processors that is integrated into OpenSearch. The pipeline intercepts a query, performs processing on the query, sends it to OpenSearch, intercepts the results, performs processing on the results, and returns them to the calling application, as shown in the following diagram.

Search processor diagram

Both request and response processing for the pipeline are performed on the coordinator node, so there is no shard-level processing.
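The flow described above can be modeled in a few lines of Python. This is only a conceptual sketch, not OpenSearch code: run_pipeline, limit_size, add_count, and fake_search are hypothetical names introduced for illustration, and the "search" is a stand-in that slices a fixed list.

```python
from typing import Callable, Dict, List

Request = Dict[str, object]
Response = Dict[str, object]
RequestProcessor = Callable[[Request], Request]
ResponseProcessor = Callable[[Response, Request], Response]


def run_pipeline(
    request: Request,
    request_processors: List[RequestProcessor],
    response_processors: List[ResponseProcessor],
    execute_search: Callable[[Request], Response],
) -> Response:
    """Apply request processors in order, run the search, then apply response processors in order."""
    for process in request_processors:
        request = process(request)
    response = execute_search(request)
    for process in response_processors:
        # A response processor sees both the response and the (processed) request.
        response = process(response, request)
    return response


# Hypothetical processors and a fake search backend, for illustration only.
def limit_size(request: Request) -> Request:
    return {**request, "size": 1}


def fake_search(request: Request) -> Response:
    docs = ["doc-a", "doc-b", "doc-c"]
    return {"hits": docs[: int(request.get("size", 10))]}


def add_count(response: Response, request: Request) -> Response:
    return {**response, "count": len(response["hits"])}


result = run_pipeline({"query": "match_all"}, [limit_size], [add_count], fake_search)
print(result)
```

Because the processors are plain ordered lists, reordering or adding a step changes the behavior without touching the calling application, which is the main design point of a pipeline.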

Search request processors

OpenSearch supports the following search request processors:

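  • script: Adds a script that is run on incoming search requests, for example, to modify request parameters before the search is executed.
  • filter_query: Adds a filtering query to the search request so that only documents matching the filter are returned.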

Search response processors

OpenSearch supports the following search response processors:

  • rename_field: Renames an existing field in the search results.

Viewing available processor types

You can use the Nodes Search Pipelines API to view the available processor types:

  GET /_nodes/search_pipelines

The response contains the search_pipelines object that lists the available request and response processors:

Response

  {
    "_nodes" : {
      "total" : 1,
      "successful" : 1,
      "failed" : 0
    },
    "cluster_name" : "runTask",
    "nodes" : {
      "36FHvCwHT6Srbm2ZniEPhA" : {
        "name" : "runTask-0",
        "transport_address" : "127.0.0.1:9300",
        "host" : "127.0.0.1",
        "ip" : "127.0.0.1",
        "version" : "3.0.0",
        "build_type" : "tar",
        "build_hash" : "unknown",
        "roles" : [
          "cluster_manager",
          "data",
          "ingest",
          "remote_cluster_client"
        ],
        "attributes" : {
          "testattr" : "test",
          "shard_indexing_pressure_enabled" : "true"
        },
        "search_pipelines" : {
          "request_processors" : [
            {
              "type" : "filter_query"
            },
            {
              "type" : "script"
            }
          ],
          "response_processors" : [
            {
              "type" : "rename_field"
            }
          ]
        }
      }
    }
  }

In addition to the processors provided by OpenSearch, additional processors may be provided by plugins.
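If you check processor availability programmatically, a small helper can collect the types from a parsed response. The following is a minimal sketch that assumes the response has the shape shown above; available_processor_types is a hypothetical helper name, and the sample dictionary is a trimmed copy of that response.

```python
from typing import Dict, Set


def available_processor_types(nodes_response: dict) -> Dict[str, Set[str]]:
    """Collect the processor types reported by every node, grouped by kind."""
    available: Dict[str, Set[str]] = {
        "request_processors": set(),
        "response_processors": set(),
    }
    for node in nodes_response.get("nodes", {}).values():
        pipelines = node.get("search_pipelines", {})
        for kind in available:
            for processor in pipelines.get(kind, []):
                available[kind].add(processor["type"])
    return available


# Trimmed copy of the example response (only the fields the helper reads).
sample = {
    "nodes": {
        "36FHvCwHT6Srbm2ZniEPhA": {
            "search_pipelines": {
                "request_processors": [{"type": "filter_query"}, {"type": "script"}],
                "response_processors": [{"type": "rename_field"}],
            }
        }
    }
}

types = available_processor_types(sample)
print(sorted(types["request_processors"]), sorted(types["response_processors"]))
```

The helper takes the union across nodes; if plugins are installed on only some nodes, you may prefer an intersection instead.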

Creating a search pipeline

Search pipelines are stored in the cluster state. To create a search pipeline, you must configure an ordered list of processors in your OpenSearch cluster. You can have more than one processor of the same type in the pipeline. You can give each processor a tag identifier that distinguishes it from the others. Tagging a processor is helpful when debugging error messages, especially if you add multiple processors of the same type.

Example request

The following request creates a search pipeline with a filter_query request processor that uses a term query to return only public messages:

  PUT /_search/pipeline/my_pipeline
  {
    "request_processors": [
      {
        "filter_query" : {
          "tag" : "tag1",
          "description" : "This processor is going to restrict to publicly visible documents",
          "query" : {
            "term": {
              "visibility": "public"
            }
          }
        }
      }
    ]
  }
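From application code, the same request can be prepared with Python's standard library. This is a sketch under stated assumptions: the cluster address http://localhost:9200 and the absence of authentication are assumptions, and build_put_pipeline_request is a hypothetical helper. The request is only constructed here, not sent.

```python
import json
import urllib.request

BASE_URL = "http://localhost:9200"  # assumption: local cluster, no authentication


def build_put_pipeline_request(name: str, body: dict) -> urllib.request.Request:
    """Prepare (but do not send) a PUT request that creates or replaces a search pipeline."""
    return urllib.request.Request(
        url=f"{BASE_URL}/_search/pipeline/{name}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


pipeline_body = {
    "request_processors": [
        {
            "filter_query": {
                "tag": "tag1",
                "description": "This processor is going to restrict to publicly visible documents",
                "query": {"term": {"visibility": "public"}},
            }
        }
    ]
}

request = build_put_pipeline_request("my_pipeline", pipeline_body)
print(request.get_method(), request.full_url)
# To send it against a running cluster: urllib.request.urlopen(request)
```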

Retrieving search pipelines

To retrieve the details of an existing search pipeline, use the Search Pipeline API.

To view all search pipelines, use the following request:

  GET /_search/pipeline

The response contains the pipeline that you set up in the previous section:

Response

  {
    "my_pipeline" : {
      "request_processors" : [
        {
          "filter_query" : {
            "tag" : "tag1",
            "description" : "This processor is going to restrict to publicly visible documents",
            "query" : {
              "term" : {
                "visibility" : "public"
              }
            }
          }
        }
      ]
    }
  }

To view a particular pipeline, specify the pipeline name as a path parameter:

  GET /_search/pipeline/my_pipeline

You can also use wildcard patterns to view a subset of pipelines, for example:

  GET /_search/pipeline/my*

Using a search pipeline

To search with a pipeline, specify the pipeline name in the search_pipeline query parameter:

  GET /my_index/_search?search_pipeline=my_pipeline

For a complete example of using a search pipeline with a filter_query processor, see filter_query processor example.
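When building the search URL in client code, it can help to keep the pipeline name as an optional argument. The following is a minimal sketch; search_url is a hypothetical helper and the base URL is an assumed local cluster address.

```python
from typing import Optional
from urllib.parse import urlencode


def search_url(
    index: str,
    pipeline: Optional[str] = None,
    base_url: str = "http://localhost:9200",  # assumption: local cluster
) -> str:
    """Build a _search URL, adding the search_pipeline query parameter when a pipeline is given."""
    url = f"{base_url}/{index}/_search"
    if pipeline is not None:
        url += "?" + urlencode({"search_pipeline": pipeline})
    return url


print(search_url("my_index", "my_pipeline"))
print(search_url("my_index"))
```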

Default search pipeline

For convenience, you can set a default search pipeline for an index. Once your index has a default pipeline, you don’t need to specify the search_pipeline query parameter in every search request.

Setting a default search pipeline for an index

To set a default search pipeline for an index, specify the index.search.default_pipeline in the index’s settings:

  PUT /my_index/_settings
  {
    "index.search.default_pipeline" : "my_pipeline"
  }

After setting the default pipeline for my_index, you can search for all documents without specifying the search_pipeline query parameter:

  GET /my_index/_search

The response contains only the public document, indicating that the pipeline was applied by default:

Response

  {
    "took" : 19,
    "timed_out" : false,
    "_shards" : {
      "total" : 1,
      "successful" : 1,
      "skipped" : 0,
      "failed" : 0
    },
    "hits" : {
      "total" : {
        "value" : 1,
        "relation" : "eq"
      },
      "max_score" : 0.0,
      "hits" : [
        {
          "_index" : "my_index",
          "_id" : "1",
          "_score" : 0.0,
          "_source" : {
            "message" : "This is a public message",
            "visibility" : "public"
          }
        }
      ]
    }
  }

Disabling the default pipeline for a request

If you want to run a search request without applying the default pipeline, you can set the search_pipeline query parameter to _none:

  GET /my_index/_search?search_pipeline=_none

Removing the default pipeline

To remove the default pipeline from an index, set it to null or _none:

  PUT /my_index/_settings
  {
    "index.search.default_pipeline" : null
  }

  PUT /my_index/_settings
  {
    "index.search.default_pipeline" : "_none"
  }
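The rules for default pipelines can be summarized as a small decision function. This is a sketch of the behavior described in this section, not OpenSearch's implementation; effective_pipeline is a hypothetical name, and the rule that an explicitly named pipeline takes precedence over the index default is an assumption.

```python
from typing import Optional

NONE_PIPELINE = "_none"


def effective_pipeline(request_param: Optional[str], index_default: Optional[str]) -> Optional[str]:
    """Return the name of the pipeline that applies to a search, or None if no pipeline runs."""
    if request_param == NONE_PIPELINE:
        # search_pipeline=_none disables the default pipeline for this request.
        return None
    if request_param is not None:
        # Assumption: a pipeline named in the request overrides the index default.
        return request_param
    if index_default is None or index_default == NONE_PIPELINE:
        # The default was never set, or was removed by setting it to null or _none.
        return None
    return index_default


print(effective_pipeline(None, "my_pipeline"))
print(effective_pipeline("_none", "my_pipeline"))
print(effective_pipeline(None, "_none"))
```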

Updating a search pipeline

To update a search pipeline dynamically, replace the search pipeline using the Search Pipeline API.

Example request

The following request replaces the definition of my_pipeline (or creates the pipeline if it does not exist) with one that contains a filter_query request processor and a rename_field response processor:

  PUT /_search/pipeline/my_pipeline
  {
    "request_processors": [
      {
        "filter_query": {
          "tag": "tag1",
          "description": "This processor returns only publicly visible documents",
          "query": {
            "term": {
              "visibility": "public"
            }
          }
        }
      }
    ],
    "response_processors": [
      {
        "rename_field": {
          "field": "message",
          "target_field": "notification"
        }
      }
    ]
  }
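To make the effect of the rename_field response processor concrete, the following Python sketch applies the same renaming to a response held in memory. It is a local simulation, not OpenSearch's implementation: the rename_field function and the trimmed sample response are hypothetical, and the sketch only renames fields inside each hit's _source.

```python
import copy


def rename_field(response: dict, field: str, target_field: str) -> dict:
    """Return a copy of the response in which `field` is renamed to `target_field` in each hit's _source."""
    renamed = copy.deepcopy(response)
    for hit in renamed.get("hits", {}).get("hits", []):
        source = hit.get("_source", {})
        if field in source:
            source[target_field] = source.pop(field)
    return renamed


# Trimmed sample response containing a single public document.
sample_response = {
    "hits": {
        "hits": [
            {
                "_index": "my_index",
                "_id": "1",
                "_source": {"message": "This is a public message", "visibility": "public"},
            }
        ]
    }
}

processed = rename_field(sample_response, "message", "notification")
print(processed["hits"]["hits"][0]["_source"])
```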

Search pipeline versions

When creating your pipeline, you can specify a version for it in the version parameter:

  PUT /_search/pipeline/my_pipeline
  {
    "version": 1234,
    "request_processors": [
      {
        "script": {
          "source": """
            if (ctx._source['size'] > 100) {
              ctx._source['explain'] = false;
            }
          """
        }
      }
    ]
  }

The version is returned in all subsequent responses to requests that retrieve the pipeline:

  GET /_search/pipeline/my_pipeline

The response contains the pipeline version:

Response

  {
    "my_pipeline": {
      "version": 1234,
      "request_processors": [
        {
          "script": {
            "source": """
              if (ctx._source['size'] > 100) {
                ctx._source['explain'] = false;
              }
            """
          }
        }
      ]
    }
  }
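The script in this pipeline turns off explain for requests that ask for more than 100 results. The following Python sketch mirrors that logic on a dictionary so that the effect is easy to see. It is an illustration only: apply_script is a hypothetical name, the real script is run by OpenSearch, and treating a missing size as "not greater than 100" is an assumption.

```python
def apply_script(search_request: dict) -> dict:
    """Mirror the pipeline script: disable explain when more than 100 results are requested."""
    updated = dict(search_request)
    # Assumption: a request without an explicit size is left unchanged.
    if updated.get("size", 0) > 100:
        updated["explain"] = False
    return updated


large = apply_script({"size": 200, "explain": True})
small = apply_script({"size": 10, "explain": True})
print(large)
print(small)
```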