bulk_indexing

bulk_indexing #

描述 #

bulk_indexing 处理器用来异步消费队列里面的 bulk 请求。

配置示例 #

一个简单的示例如下:

pipeline:
- name: bulk_request_ingest
  auto_start: true
  keep_running: true
  processor:
    - bulk_indexing:
        bulk_size_in_mb: 1
        queue_selector.labels:
          type: bulk_reshuffle
          level: cluster

参数说明 #

名称类型说明
elasticsearchstring默认的 Elasticsearch 集群 ID,如果队列 Labels 里面没有指定 elasticsearch 的话会使用这个参数
idle_timeout_in_secondsint消费队列的超时时间,默认 1, 即 1s
max_connection_per_nodeint目标节点允许的最大连接数,默认 1
max_worker_sizeint最大允许同时运行的 worker 大小,默认 10
bulk.compressbool是否开启请求压缩
bulk.batch_size_in_kbint批次请求的单位大小,单位 KB
bulk.batch_size_in_mbint批次请求的单位大小,单位 MB,默认 10
bulk.batch_size_in_docsint批次请求的文档个数, 默认 1000
bulk.retry_delay_in_secondsint请求重试的等待时间
bulk.reject_retry_delay_in_secondsint请求拒绝的等待时间
bulk.max_retry_timesint最大重试次数
bulk.failure_queuestring因为后端故障而失败的请求队列
bulk.invalid_queuestring因为请求不合法的 4xx 请求队列
bulk.dead_letter_queuestring超过最大重试次数的请求队列
bulk.safety_parsebool是否启用安全解析,即不采用 buffer 的方式,占用内存更高一点,默认为 true
bulk.doc_buffer_sizebool单次请求处理的最大文档 buff size,建议设置超过单个文档的最大大小,默认 256*1024
queue_selector.labelsmap根据 Label 来过滤一组需要消费的队列, 同 queues 配置
queue_selector.idsarray指定要消费的队列的 UUID, 字符数组
queue_selector.keysarray指定要消费的队列的唯一 Key 路径, 字符数组
waiting_afterarray是否等待指定队列消费完成才开始消费, 队列的 UUID, 字符数组
detect_active_queuebool是否自动检测符合条件的新的队列,默认 true
detect_intervalbool自动检测符合条件的新的队列的时间间隔,单位毫秒, 默认 5000
num_of_slicesint并行消费单个队列的线程, 运行时最大的 slice 大小
slicesarray允许的 slice 编号, int 数组
skip_info_missingbool忽略不满足条件的队列,如节点、索引、分片信息不存在时则需等待信息获取后再消费,默认为 false,否则会随机挑选一个 es 节点来发送请求
skip_empty_queuebool是否跳过空队列的消费, 默认 true
consumer.sourcestring消费者来源
consumer.idstring消费者唯一标识
consumer.namestring消费者名称
consumer.groupstring消费者组名称
consumer.fetch_min_bytesint拉取消息最小的字节大小, 默认 1
consumer.fetch_max_bytesint拉取消息最大的字节大小, 默认 10485760, 即 10MB
consumer.fetch_max_messagesint拉取最大的消息个数, 默认 1
consumer.fetch_max_wait_msint拉取最大的等待时间, 单位毫秒, 默认 10000