bulk_indexing #
描述 #
bulk_indexing 处理器用来异步消费队列里面的 bulk 请求。
配置示例 #
一个简单的示例如下:
pipeline:
- name: bulk_request_ingest
auto_start: true
keep_running: true
processor:
- bulk_indexing:
bulk_size_in_mb: 1
queue_selector.labels:
type: bulk_reshuffle
level: cluster
参数说明 #
名称 | 类型 | 说明 |
---|---|---|
elasticsearch | string | 默认的 Elasticsearch 集群 ID,如果队列 Labels 里面没有指定 elasticsearch 的话会使用这个参数 |
idle_timeout_in_seconds | int | 消费队列的超时时间,默认 1 , 即 1s |
max_connection_per_node | int | 目标节点允许的最大连接数,默认 1 |
max_worker_size | int | 最大允许同时运行的 worker 大小,默认 10 |
bulk.compress | bool | 是否开启请求压缩 |
bulk.batch_size_in_kb | int | 批次请求的单位大小,单位 KB |
bulk.batch_size_in_mb | int | 批次请求的单位大小,单位 MB ,默认 10 |
bulk.batch_size_in_docs | int | 批次请求的文档个数, 默认 1000 |
bulk.retry_delay_in_seconds | int | 请求重试的等待时间 |
bulk.reject_retry_delay_in_seconds | int | 请求拒绝的等待时间 |
bulk.max_retry_times | int | 最大重试次数 |
bulk.failure_queue | string | 因为后端故障而失败的请求队列 |
bulk.invalid_queue | string | 因为请求不合法的 4xx 请求队列 |
bulk.dead_letter_queue | string | 超过最大重试次数的请求队列 |
bulk.safety_parse | bool | 是否启用安全解析,即不采用 buffer 的方式,占用内存更高一点,默认为 true |
bulk.doc_buffer_size | bool | 单次请求处理的最大文档 buff size,建议设置超过单个文档的最大大小,默认 256*1024 |
queue_selector.labels | map | 根据 Label 来过滤一组需要消费的队列, 同 queues 配置 |
queue_selector.ids | array | 指定要消费的队列的 UUID, 字符数组 |
queue_selector.keys | array | 指定要消费的队列的唯一 Key 路径, 字符数组 |
waiting_after | array | 是否等待指定队列消费完成才开始消费, 队列的 UUID, 字符数组 |
detect_active_queue | bool | 是否自动检测符合条件的新的队列,默认 true |
detect_interval | bool | 自动检测符合条件的新的队列的时间间隔,单位毫秒, 默认 5000 |
num_of_slices | int | 并行消费单个队列的线程, 运行时最大的 slice 大小 |
slices | array | 允许的 slice 编号, int 数组 |
skip_info_missing | bool | 忽略不满足条件的队列,如节点、索引、分片信息不存在时则需等待信息获取后再消费,默认为 false ,否则会随机挑选一个 es 节点来发送请求 |
skip_empty_queue | bool | 是否跳过空队列的消费, 默认 true |
consumer.source | string | 消费者来源 |
consumer.id | string | 消费者唯一标识 |
consumer.name | string | 消费者名称 |
consumer.group | string | 消费者组名称 |
consumer.fetch_min_bytes | int | 拉取消息最小的字节大小, 默认 1 |
consumer.fetch_max_bytes | int | 拉取消息最大的字节大小, 默认 10485760 , 即 10MB |
consumer.fetch_max_messages | int | 拉取最大的消息个数, 默认 1 |
consumer.fetch_max_wait_ms | int | 拉取最大的等待时间, 单位毫秒, 默认 10000 |