news 2026/5/1 6:10:32

Benthos使用示例

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Benthos使用示例

展示一些完整的Benthos使用示例:

1.基础配置示例

YAML配置文件

yaml

# config.yaml
# Minimal pipeline: GCP Pub/Sub -> Bloblang transform -> line-delimited JSON files.
# (Reconstructed from a whitespace-collapsed snippet; YAML is indentation-sensitive,
# so the original one-line form was not loadable.)
input:
  gcp_pubsub:
    project: my-project
    subscription: my-subscription

pipeline:
  processors:
    - bloblang: |
        # Normalize each message into a fixed envelope.
        root = {
          "id": this.id,
          "processed_at": now(),
          "data": this.body.uppercase()
        }

output:
  file:
    # One line-delimited JSON file per flush, named by unix timestamp.
    path: "./output/${! timestamp_unix() }.json"
    codec: lines

2.CSV批量处理示例

yaml

# batch_csv.yaml
# Reads CSV files, parses rows, groups them into batches, enriches each row,
# and bulk-inserts the rows into Postgres.
input:
  file:
    paths:
      - ./data/*.csv
    codec: lines

pipeline:
  processors:
    # 1. Parse each line as CSV using an explicit column schema.
    - parse_csv:
        schema: |
          [{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"value","type":"float"}]
    # 2. Aggregate into batches (target: 100 rows, or flush every 1s).
    # NOTE(review): group_by_value is documented with only a `value` field —
    # confirm `period`/`size` are accepted here, or move them to a batching policy.
    - group_by_value:
        value: ${!batch_index()}
        period: 1s
        size: 100
    # 3. Enrich every row of the batch individually.
    - for_each:
        - bloblang: |
            root = this
            root.batch_id = uuid_v4()
            root.processed_at = now()

output:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    query: |
      INSERT INTO records (id, name, value, batch_id, processed_at)
      VALUES ($1, $2, $3, $4, $5)
    args_mapping: |
      [ this.id, this.name, this.value, this.batch_id, this.processed_at ]

3.Go代码中嵌入Benthos

go

package main import ( "context" "fmt" "github.com/benthosdev/benthos/v4/public/service" _ "github.com/benthosdev/benthos/v4/public/components/all" // 导入所有组件 ) func main() { // 创建流 builder := service.NewStreamBuilder() // 配置输入 err := builder.SetYAML(` input: generate: mapping: | root.id = uuid_v4() root.timestamp = timestamp_unix() interval: 1s count: 100 pipeline: processors: - batch: count: 10 processors: - archive: format: lines - compress: algorithm: gzip output: http_client: url: http://localhost:8080/api/batch verb: POST headers: Content-Type: application/json `) if err != nil { panic(err) } // 添加自定义处理器 builder.AddProcessorFunc(func(ctx context.Context, m *service.Message) ([]*service.Message, error) { // 自定义处理逻辑 body, err := m.AsBytes() if err != nil { return nil, err } fmt.Printf("Processing: %s\n", string(body)) return []*service.Message{m}, nil }) // 运行流 stream, err := builder.Build() if err != nil { panic(err) } ctx := context.Background() if err := stream.Run(ctx); err != nil { panic(err) } }

4.Kafka到Elasticsearch批量处理

yaml

# kafka_to_es.yaml
# Consumes log batches from Kafka, filters and enriches them, then
# bulk-indexes the documents into Elasticsearch.
input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - logs
    consumer_group: batch-processor
    batching:
      count: 1000   # flush at 1000 messages...
      period: 30s   # ...or every 30 seconds

pipeline:
  threads: 4        # process batches on 4 parallel threads
  processors:
    # Parse the raw JSON log line and enrich it.
    - bloblang: |
        root = this.parse_json()
        root.processed = now()
        root.host = this.host.lowercase()
    # Drop DEBUG-level entries.
    # NOTE(review): filter_parts is deprecated in v4; the modern form is a
    # bloblang mapping `root = if this.level == "DEBUG" { deleted() }`.
    - filter_parts:
        bloblang: |
          this.level != "DEBUG"
    # Map each document to its target index/id.
    # NOTE(review): bloblang does NOT expand ${!...} interpolations inside
    # string literals — confirm the intended index name is actually produced.
    - mapping: |
        root.index = "logs-${!timestamp_format(now(),"2006-01-02")}"
        root.id = this.id
        root.type = "_doc"
        root.body = this

output:
  elasticsearch:
    urls:
      - http://localhost:9200
    index: ${!json().index}
    id: ${!json().id}
    action: index
    max_in_flight: 10   # concurrent bulk requests
    batching:
      count: 500        # documents per bulk request...
      period: 10s       # ...or every 10 seconds
      check: |
        root = this.index.suffix("-bulk") != ""

5.HTTP API批处理服务

go

// main.go package main import ( "context" "net/http" "github.com/benthosdev/benthos/v4/public/service" _ "github.com/benthosdev/benthos/v4/public/components/io" _ "github.com/benthosdev/benthos/v4/public/components/prometheus" ) func main() { // 创建HTTP输入和输出的流 httpStream := ` http: address: 0.0.0.0:8080 path: /ingest allowed_verbs: [POST] pipeline: processors: - batch: count: 50 period: 10s processors: - try: - bloblang: | root.items = this.map_each(item -> { "id": item.id, "processed": now(), "status": "pending" }) - catch: - bloblang: | root.error = "batch processing failed" root.items = [] - split: # 拆分为单个项目处理 - bloblang: | root = this root.batch_size = batch_size() output: broker: pattern: fan_out outputs: - redis_streams: url: tcp://localhost:6379 stream: processed_items body_key: "" max_length: 1000 - http_client: url: http://monitor:9090/metrics verb: POST headers: Content-Type: application/json batching: count: 100 - prometheus: metric_mapping: | batch_size: type: counter path: batch_size labels: source: http_ingest ` builder := service.NewStreamBuilder() // 从YAML配置 if err := builder.SetYAML(httpStream); err != nil { panic(err) } // 添加监控端点 builder.SetHTTPMux("/metrics", func(w http.ResponseWriter, r *http.Request) { // 自定义监控 w.WriteHeader(http.StatusOK) w.Write([]byte(`{"status":"healthy"}`)) }) stream, err := builder.Build() if err != nil { panic(err) } ctx := context.Background() if err := stream.Run(ctx); err != nil { panic(err) } }

6.错误处理和重试

yaml

# error_handling.yaml
# Kafka orders -> parse/cache inside try/catch -> Postgres, with a Kafka
# dead-letter topic as fallback output.
input:
  kafka:
    addresses: [localhost:9092]
    topics: [orders]
    batching:
      count: 100

pipeline:
  processors:
    - try:
        # Parse JSON; on failure keep the raw payload instead of erroring.
        - bloblang: |
            root = this.parse_json().catch(this)
        # Cache the raw order payload keyed by order id.
        - cache:
            resource: redis_cache
            operator: set
            key: "${!this.order_id}"
            value: "${!content()}"
    - catch:
        # Annotate failed messages with error details and a retry counter.
        - bloblang: |
            root.error = error()
            root.original = content()
            root.retry_count = meta("retry_count").or(0) + 1
        # NOTE(review): confirm the `retry` processor schema for this Benthos
        # version — `max_retries` may belong under `backoff`.
        - retry:
            max_retries: 3
            backoff:
              initial_interval: 1s
              max_interval: 30s

output:
  broker:
    # `try` sends to the first output and falls through on failure.
    pattern: try
    outputs:
      # Primary: insert into Postgres.
      - sql_raw:
          driver: postgres
          dsn: postgres://user:pass@localhost/db
          query: "INSERT INTO orders VALUES ($1, $2, $3)"
          args_mapping: "[this.id, this.amount, this.timestamp]"
      # Fallback: dead-letter topic. FIX: drop_on is an output wrapper and
      # must enclose its child output; it cannot sit as a scalar sibling.
      - drop_on:
          error: true
          output:
            kafka:
              addresses: [localhost:9092]
              topic: dead_letter_queue
              key: "${!meta(\"kafka_key\")}"

7.动态批处理配置

yaml

# dynamic_batching.yaml
# Batches generated messages and tunes the next batch size from the data.
input:
  generate:
    count: 1000
    interval: 100ms
    mapping: 'root = {"value": random_int()}'

pipeline:
  processors:
    # NOTE(review): confirm that `batch.count` accepts an interpolated value
    # in this Benthos version; it is documented as a static integer.
    - batch:
        count: ${!meta("batch_size").or(50)}
        processors:
          - bloblang: |
              root.items = this
              root.batch_id = uuid_v4()
              root.total = this.sum(item -> item.value)
    # Adjust the next batch size based on the aggregate value.
    - branch:
        processors:
          # FIX: bloblang locals are declared with `let` (referenced as $name)
          # and metadata is written with `meta key = value` — there is no
          # bare assignment or meta_set() function.
          - bloblang: |
              let new_size = if this.total > 1000 {
                20   # small batches when totals are large
              } else {
                100  # large batches when totals are small
              }
              meta batch_size = $new_size
        result_map: 'root = deleted()' # discard branch result; payload unchanged

output:
  http_client:
    url: http://api:8080/process-batch
    verb: POST

8.运行和监控

bash

# 1. Install Benthos
go install github.com/benthosdev/benthos/v4/cmd/benthos@latest

# 2. Run a config file
benthos -c config.yaml

# 3. Reload the config automatically when it changes
#    FIX: the flag is -w/--watch; Benthos has no --reload flag.
benthos -w -c config.yaml

# 4. Validate a config and run its unit tests
benthos lint config.yaml
benthos test config.yaml

# 5. Inspect runtime metrics (default HTTP API port is 4195)
curl http://localhost:4195/stats

9.实用技巧

go

// 自定义批处理插件 func init() { service.RegisterProcessor( "batch_transform", service.NewConfigSpec(). Field(service.NewStringField("prefix")), func(conf *service.ParsedConfig, mgr *service.Resources) ( service.Processor, error, ) { prefix, err := conf.FieldString("prefix") if err != nil { return nil, err } return service.NewBatchProcessorFunc(func(ctx context.Context, batch service.MessageBatch) ([]service.MessageBatch, error) { for _, msg := range batch { msg.SetStructured(map[string]interface{}{ "prefixed": prefix + msg.AsString(), }) } return []service.MessageBatch{batch}, nil }), nil }, ) }

这些示例展示了Benthos在批处理场景下的灵活性和强大功能。你可以根据实际需求组合使用这些组件。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/18 10:31:51

动态声学模型,抑郁预警更准

📝 博客主页:Jax的CSDN主页 动态声学模型:抑郁预警的精准突破目录动态声学模型:抑郁预警的精准突破 引言:抑郁筛查的困境与动态声学的曙光 一、动态声学模型:从静态到动态的范式跃迁 1.1 核心原理,…

作者头像 李华
网站建设 2026/4/17 18:47:25

支持实时录音与多格式导出|FunASR WebUI镜像使用手册

支持实时录音与多格式导出|FunASR WebUI镜像使用手册 1. 快速上手:从启动到首次识别 你是不是也经常遇到这样的场景?会议录音要整理成文字、课程音频需要转写笔记,或者想给一段视频加字幕却苦于手动输入太慢。现在,有…

作者头像 李华
网站建设 2026/4/23 14:46:01

NewBie-image-Exp0.1与Stable Diffusion对比:动漫生成质量实测报告

NewBie-image-Exp0.1与Stable Diffusion对比:动漫生成质量实测报告 1. 引言:一场关于动漫生成能力的直接对话 如果你正在寻找一个能稳定输出高质量动漫图像的AI模型,那么你很可能已经听说过 Stable Diffusion ——这个开源社区中的“老将”…

作者头像 李华
网站建设 2026/4/29 14:28:22

2025终极选择:告别卡顿,这3款终端如何重塑你的开发体验?

2025终极选择:告别卡顿,这3款终端如何重塑你的开发体验? 【免费下载链接】wezterm A GPU-accelerated cross-platform terminal emulator and multiplexer written by wez and implemented in Rust 项目地址: https://gitcode.com/GitHub_T…

作者头像 李华
网站建设 2026/4/24 7:37:57

Amlogic电视盒子变废为宝:从刷机到高性能服务器的终极改造指南

Amlogic电视盒子变废为宝:从刷机到高性能服务器的终极改造指南 【免费下载链接】amlogic-s9xxx-armbian amlogic-s9xxx-armbian: 该项目提供了为Amlogic、Rockchip和Allwinner盒子构建的Armbian系统镜像,支持多种设备,允许用户将安卓TV系统更…

作者头像 李华
网站建设 2026/4/27 0:32:48

升级FSMN VAD后,语音检测效率提升3倍经验总结

升级FSMN VAD后,语音检测效率提升3倍经验总结 1. 背景与升级动因 1.1 语音活动检测的实际挑战 在处理大量音频数据的场景中,比如会议录音转写、电话客服质检、课堂语音分析等,我们常常面临一个核心问题:如何从长时间的音频流中…

作者头像 李华