indexing_merge

indexing_merge #

描述 #

The indexing_merge processor is used to consume pure JSON documents in the queue, and merge them into bulk requests and save them in the specified queue. It needs to be consumed with the bulk_indexing processor, and batch writes are used instead of single requests to improve write throughput.

Configuration Example #

A simple example is as follows:

pipeline:
  - name: indexing_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "request_logging"
          elasticsearch: "logging-server"
          index_name: "infini_gateway_requests"
          output_queue:
            name: "gateway_requests"
            label:
              tag: "request_logging"
          worker_size: 1
          bulk_size_in_mb: 10
  - name: logging_requests
    auto_start: true
    keep_running: true
    processor:
      - bulk_indexing:
          bulk:
            compress: true
            batch_size_in_mb: 10
            batch_size_in_docs: 5000
          consumer:
            fetch_max_messages: 100
          queues:
            type: indexing_merge
          when:
            cluster_available: [ "logging-server" ]

Parameter Description #

NameTypeDescription
input_queueintName of a subscribed queue
worker_sizeintNumber of threads that concurrently execute consumption tasks, which is set to 1 by default.
idle_timeout_in_secondsintTimeout duration of the consumption queue, in seconds. The default value is 5.
bulk_size_in_kbintSize of a bulk request, in KB.
bulk_size_in_mbintSize of a bulk request, in MB.
elasticsearchstringName of a target cluster, to which requests are saved.
index_namestringName of the index stored to the target cluster.
type_namestringName of the index type stored to the target cluster. It is set based on the cluster version. The value is doc for Elasticsearch versions earlier than v7 and _doc for versions later than v7.
output_queue.namestringThe name of output queue
output_queue.labelmapThe labels assign to the output queue,with label type:indexing_merge builtin.
failure_queuestringThe name of queue to save failure requests
invalid_queuestringThe name of queue to save invalid requests