# bulk_indexing

## Description

The bulk_indexing processor asynchronously consumes bulk requests from queues and submits them to the target Elasticsearch cluster.
## Configuration Example

A simple example is as follows:

```yaml
pipeline:
  - name: bulk_request_ingest
    auto_start: true
    keep_running: true
    processor:
      - bulk_indexing:
          bulk_size_in_mb: 1
          queue_selector.labels:
            type: bulk_reshuffle
            level: cluster
```
## Parameter Description

| Name | Type | Description |
| --- | --- | --- |
| elasticsearch | string | Default Elasticsearch cluster ID, used when `elasticsearch` is not specified in the queue labels. |
| idle_timeout_in_seconds | int | Timeout for consuming from the queue, in seconds. Default: `1`. |
| max_connection_per_node | int | Maximum number of connections allowed per target node. Default: `1`. |
| max_worker_size | int | Maximum number of workers allowed to run at the same time. Default: `10`. |
| bulk.batch_size_in_kb | int | Size of a bulk request, in KB. |
| bulk.batch_size_in_mb | int | Size of a bulk request, in MB. |
| bulk.batch_size_in_docs | int | Number of documents in a bulk request. Default: `1000`. |
| bulk.compress | bool | Whether to enable request compression. |
| bulk.retry_delay_in_seconds | int | Time to wait before retrying a failed request. |
| bulk.reject_retry_delay_in_seconds | int | Time to wait before retrying a rejected request. |
| bulk.max_retry_times | int | Maximum number of retries. |
| bulk.failure_queue | string | Queue for storing requests that failed because of a back-end failure. |
| bulk.invalid_queue | string | Queue for storing invalid requests, for which a 4xx status was returned. |
| bulk.dead_letter_queue | string | Queue for storing requests that exceeded the maximum retry count. |
| bulk.safety_parse | bool | Whether to enable safe parsing, which avoids the shared buffer at the cost of higher memory usage. Default: `true`. |
| bulk.doc_buffer_size | int | Maximum document buffer size for processing a single request. It is recommended to set this larger than the maximum size of a single document. Default: `256*1024`. |
| queue_selector.labels | map | Consume from the group of queues matching these labels. Alias: `queues`. |
| queue_selector.ids | array | UUIDs of the queues to consume, as an array of strings. |
| queue_selector.keys | array | Unique key paths of the queues to consume, as an array of strings. |
| waiting_after | array | Wait until the specified queues have finished consumption before starting; queue UUIDs, as an array of strings. |
| detect_active_queue | bool | Whether to automatically detect new queues that meet the conditions. Default: `true`. |
| detect_interval | int | Interval for automatically detecting new queues that meet the conditions, in milliseconds. Default: `5000`. |
| num_of_slices | int | Number of threads consuming a single queue in parallel; the maximum slice size at runtime. |
| slices | array | Allowed slice numbers, as an array of integers. |
| skip_info_missing | bool | Whether to skip consuming queue data when required metadata (node, index, or shard information) is missing, resuming once the information becomes available. Default: `false`, in which case one Elasticsearch node is selected to send requests to. |
| skip_empty_queue | bool | Whether to skip consumption of empty queues. Default: `true`. |
| consumer.source | string | Consumer source. |
| consumer.id | string | Consumer UUID. |
| consumer.name | string | Consumer name. |
| consumer.group | string | Consumer group name. |
| consumer.fetch_min_bytes | int | Minimum number of bytes to pull per fetch. Default: `1`. |
| consumer.fetch_max_bytes | int | Maximum number of bytes to pull per fetch. Default: `10485760` (10 MB). |
| consumer.fetch_max_messages | int | Maximum number of messages to pull per fetch. Default: `1`. |
| consumer.fetch_max_wait_ms | int | Maximum wait time per fetch, in milliseconds. Default: `10000`. |
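Putting several of the parameters above together, a fuller configuration might look like the sketch below. This is illustrative only: the pipeline name and the queue names (`bulk_failure`, `bulk_invalid`, `bulk_dead_letter`) are placeholders, the numeric values are examples rather than recommendations, and the nested `bulk:` and `consumer:` blocks are assumed to be accepted in the same way as the dotted-key form shown in the simple example.

```yaml
pipeline:
  - name: bulk_request_ingest
    auto_start: true
    keep_running: true
    processor:
      - bulk_indexing:
          max_worker_size: 10          # at most 10 workers in parallel
          num_of_slices: 2             # 2 threads per queue
          idle_timeout_in_seconds: 5
          queue_selector.labels:       # consume queues matching these labels
            type: bulk_reshuffle
            level: cluster
          bulk:
            compress: true
            batch_size_in_mb: 10       # cap each bulk request at 10 MB
            batch_size_in_docs: 5000   # or 5000 docs, whichever is hit first
            max_retry_times: 3
            failure_queue: bulk_failure          # back-end failures
            invalid_queue: bulk_invalid          # 4xx responses
            dead_letter_queue: bulk_dead_letter  # retries exhausted
          consumer:
            fetch_max_messages: 100
            fetch_max_wait_ms: 10000
```

Routing failed, invalid, and retry-exhausted requests into separate queues makes it possible to inspect and replay each failure class independently instead of losing the documents.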