bulk_indexing

Description

The bulk_indexing processor asynchronously consumes bulk requests from queues.

Configuration Example

A simple example is as follows:

pipeline:
- name: bulk_request_ingest
  auto_start: true
  keep_running: true
  processor:
    - bulk_indexing:
        bulk_size_in_mb: 1
        queue_selector.labels:
          type: bulk_reshuffle
          level: cluster
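
The simple example can be extended with batching and concurrency settings. The following is a minimal sketch, not a recommended configuration: it assumes the dotted bulk.* parameter names can be written as keys in the same way as queue_selector.labels in the example above, and all values are illustrative.

```yaml
pipeline:
- name: bulk_request_ingest
  auto_start: true
  keep_running: true
  processor:
    - bulk_indexing:
        # Concurrency limits (both values match the documented defaults)
        max_worker_size: 10
        max_connection_per_node: 1
        # Batching: illustrative values, tune for your workload
        bulk.batch_size_in_mb: 10
        bulk.batch_size_in_docs: 1000
        bulk.compress: true
        # Consume the queues that carry these labels
        queue_selector.labels:
          type: bulk_reshuffle
          level: cluster
```

Larger batches generally mean fewer requests to Elasticsearch at the cost of more memory per request, so the batch settings are usually the first ones to adjust.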

Parameter Description

| Name | Type | Description |
| --- | --- | --- |
| elasticsearch | string | Default Elasticsearch cluster ID, used when elasticsearch is not specified in the queue labels. |
| idle_timeout_in_seconds | int | Timeout of the consumption queue, in seconds. The default value is 1. |
| max_connection_per_node | int | Maximum number of connections allowed per target node. The default value is 1. |
| max_worker_size | int | Maximum number of workers allowed to run at the same time. The default value is 10. |
| bulk.batch_size_in_kb | int | Size of a bulk request, in KB. |
| bulk.batch_size_in_mb | int | Size of a bulk request, in MB. |
| bulk.batch_size_in_docs | int | Number of documents in a bulk request. The default value is 1000. |
| bulk.compress | bool | Whether to enable request compression. |
| bulk.retry_delay_in_seconds | int | Waiting time before a failed request is retried, in seconds. |
| bulk.reject_retry_delay_in_seconds | int | Waiting time before a rejected request is retried, in seconds. |
| bulk.max_retry_times | int | Maximum retry count. |
| bulk.failure_queue | string | Queue for storing requests that fail because of a back-end failure. |
| bulk.invalid_queue | string | Queue for storing requests for which 4xx is returned because the request is invalid. |
| bulk.dead_letter_queue | string | Queue for storing requests that exceed the maximum retry count. |
| bulk.safety_parse | bool | Whether to enable secure parsing, that is, no buffer is used and memory usage is higher. The default value is true. |
| bulk.doc_buffer_size | int | Maximum document buffer size for processing a single request. Set it greater than the maximum size of a single document. The default value is 256*1024. |
| queue_selector.labels | map | A group of queues, filtered by label, from which data is consumed. Alias: queues. |
| queue_selector.ids | array | UUIDs of the queues to consume, as a string array. |
| queue_selector.keys | array | Unique key paths of the queues to consume, as a string array. |
| waiting_after | array | UUIDs of the queues whose consumption must finish before this processor starts consuming, as a string array. |
| detect_active_queue | bool | Whether to automatically detect new queues that meet the conditions. The default value is true. |
| detect_interval | int | Interval for automatically detecting new queues that meet the conditions, in milliseconds. The default value is 5000. |
| num_of_slices | int | Number of threads consuming a single queue in parallel, that is, the maximum number of slices at runtime. |
| slices | array | Allowed slice numbers, as an int array. |
| skip_info_missing | bool | Whether to skip consuming queue data while required information, such as node, index, or shard information, is missing, and consume it only after the information is obtained. The default value is false, in which case one Elasticsearch node is selected to send the requests. |
| skip_empty_queue | bool | Whether to skip consumption of empty queues. The default value is true. |
| consumer.source | string | Consumer source. |
| consumer.id | string | Consumer UUID. |
| consumer.name | string | Consumer name. |
| consumer.group | string | Consumer group name. |
| consumer.fetch_min_bytes | int | Minimum number of bytes to pull per fetch. The default value is 1. |
| consumer.fetch_max_bytes | int | Maximum number of bytes to pull per fetch. The default value is 10485760 (10 MB). |
| consumer.fetch_max_messages | int | Maximum number of messages to pull per fetch. The default value is 1. |
| consumer.fetch_max_wait_ms | int | Maximum waiting time for a fetch, in milliseconds. The default value is 10000. |
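
To show how the retry, failure-queue, and consumer parameters above fit together, here is a hedged sketch. The cluster ID (prod), the three queue names, the consumer group name, and all numeric values are hypothetical and chosen only for illustration. It also assumes that dotted parameter names can be used directly as keys, as queue_selector.labels is in the configuration example.

```yaml
pipeline:
- name: bulk_request_ingest
  auto_start: true
  keep_running: true
  processor:
    - bulk_indexing:
        # Fallback cluster when the queue labels do not name one (hypothetical ID)
        elasticsearch: prod
        # Retry behavior (illustrative values)
        bulk.max_retry_times: 3
        bulk.retry_delay_in_seconds: 5
        bulk.reject_retry_delay_in_seconds: 1
        # Where requests that cannot be indexed end up (hypothetical queue names)
        bulk.failure_queue: bulk_failure          # back-end failures
        bulk.invalid_queue: bulk_invalid          # requests answered with 4xx
        bulk.dead_letter_queue: bulk_dead_letter  # maximum retry count exceeded
        # Consumer fetch settings
        consumer.group: bulk_ingest_group         # hypothetical group name
        consumer.fetch_max_messages: 100
        consumer.fetch_max_wait_ms: 10000         # documented default
        # Wait for information to become available instead of sending to a selected node
        skip_info_missing: true
        queue_selector.labels:
          type: bulk_reshuffle
          level: cluster
```

Keeping the failure, invalid, and dead-letter queues separate makes it easier to tell retryable back-end problems apart from requests that will never succeed.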