news 2026/5/1 10:58:26

DorisStreamLoader工具类

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
DorisStreamLoader工具类
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <!-- SpringBoot 父工程 必须带,版本统一管理核心,版本号建议固定2.7.18 稳定版 --> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.18</version> <relativePath/> </parent> <groupId>com.example</groupId> <artifactId>demo-project</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo-project</name> <description>SpringBoot Project</description> <!-- JDK版本指定 1.8 --> <properties> <java.version>1.8</java.version> </properties> <!-- 代码所需全部依赖【都带版本号】 --> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>2.7.18</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.13.5</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.13</version> </dependency> <dependency> <groupId>org.apache.tomcat</groupId> <artifactId>tomcat-util</artifactId> <version>9.0.80</version> </dependency> </dependencies> <!-- SpringBoot打包插件,必须带,打包可执行jar包 --> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.7.18</version> </plugin> </plugins> </build> </project>
package com.example.demo.util; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.http.HttpHeaders; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.DefaultRedirectStrategy; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; import org.apache.tomcat.util.codec.binary.Base64; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.*; /** * @author qushen * @create 2023/6/22 19:41:08 */ @Component public class DorisStreamLoader { private final Logger log = LoggerFactory.getLogger(DorisStreamLoader.class); /** * 用户名 */ @Value("${spring.datasource.doris.username}") private String user ; /** * 密码 */ @Value("${spring.datasource.doris.password}") private String password; /** * doris stream load url */ @Value("${spring.datasource.doris.loadUrl}") private String loadUrl ; private ObjectMapper objectMapper = new ObjectMapper(); { objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); objectMapper.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai")); objectMapper.configure(SerializationFeature.WRITE_ENUMS_USING_INDEX, true); } /** * 构建http客户端 */ final HttpClientBuilder httpClientBuilder = HttpClients .custom() .setRedirectStrategy(new DefaultRedirectStrategy() { @Override protected boolean isRedirectable(String method) { return true; } }); /** * JSON格式的数据导入 * @param content String * @throws Exception Exception */ public void loadJson(String table,List<Map<String,Object>> datas,Map<String,String> defaultValue) throws Exception { try (CloseableHttpClient client = httpClientBuilder.build()) { String fullLoadUrl = String.format(loadUrl,table); HttpPut httpPut = new HttpPut(fullLoadUrl); httpPut.removeHeaders(HttpHeaders.CONTENT_LENGTH); httpPut.removeHeaders(HttpHeaders.TRANSFER_ENCODING); httpPut.setHeader(HttpHeaders.EXPECT, "100-continue"); httpPut.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(user, password)); httpPut.setHeader("Content-Type", "application/json;charset=UTF-8"); // You can set stream load related properties in the Header, here we set label and column_separator. httpPut.setHeader("column_separator", ","); httpPut.setHeader("format", "json"); httpPut.setHeader("strip_outer_array", "true"); httpPut.setHeader("line_delimiter", "\\x02"); httpPut.setHeader("two_phase_commit", "false"); httpPut.setHeader("strict_mode", "true"); // Set up the import file. Here you can also use StringEntity to transfer arbitrary data. ArrayNode arrayNode = objectMapper.createArrayNode(); int[] se = new int[]{0,0}; for ( int i = 0;i < datas.size(); i++) { Map<String, Object> stringObjectMap = datas.get(i); ObjectNode objectNode = objectMapper.createObjectNode(); for (Map.Entry<String, Object> stringObjectEntry : stringObjectMap.entrySet()) { objectNode.putPOJO(stringObjectEntry.getKey().toLowerCase(),stringObjectEntry.getValue()); } objectNode.putPOJO("sjcrsj",new Date()); arrayNode.add(objectNode); if ((i+1)%5000 == 0) { sendData(client, httpPut,objectMapper.writeValueAsString(arrayNode),se); arrayNode.removeAll(); } } if (!arrayNode.isEmpty()) { sendData(client, httpPut,objectMapper.writeValueAsString(arrayNode),se); } log.info("数据总数:{},成功数:{},失败数:{}",datas.size(),se[0],se[1]); } } private void sendData(CloseableHttpClient client, HttpPut httpPut,String data,int[] se) throws IOException { StringEntity entity = new StringEntity(data, "UTF-8"); httpPut.setEntity(entity); httpPut.setHeader("label", UUID.randomUUID().toString()); try (CloseableHttpResponse response = client.execute(httpPut)) { String loadResult = ""; if (response.getEntity() != null) { loadResult = EntityUtils.toString(response.getEntity()); } final int statusCode = response.getStatusLine().getStatusCode(); if (statusCode != 200) { throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult)); } JsonNode jsonNode = objectMapper.readTree(loadResult); String status = jsonNode.get("Status").asText(); if ("Fail".equals(status)) { throw new IOException("导入doris失败:"+loadResult); } else if ("Label Already Exists".equals(status)) { } int numberTotalRows = jsonNode.get("NumberTotalRows").asInt(); int numberLoadedRows = jsonNode.get("NumberLoadedRows").asInt(); int numberFilteredRows = jsonNode.get("NumberFilteredRows").asInt(); se[0] += numberLoadedRows; se[1] += numberFilteredRows; } catch (Exception e) { log.error("写入doris失败",e); } } /** * 封装认证信息 * @param username String * @param password String * @return String */ private static String basicAuthHeader(String username, String password) { final String tobeEncode = username + ":" + password; byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8)); return "Basic " + new String(encoded); } public static void main(String[] args) { System.out.println(basicAuthHeader("root", "W*!#7Z*N@sLXxs")); } public static void main1(String[] args) throws Exception { DorisStreamLoader dorisStreamLoader = new DorisStreamLoader(); dorisStreamLoader.loadUrl = "http://192.168.21.117:8030/api/sywsdb/%s/_stream_load"; dorisStreamLoader.user = "root"; dorisStreamLoader.password = "WYYTT951"; List<Map<String,Object>> maps = new ArrayList<>(); Map<String,Object> map = new HashMap<>(); map.put("clgj", ""); map.put("jyzid", "4433e13b-1bbb-11ee-ae7e-0242c0a80002"); map.put("jyzdqhdm", "652801"); map.put("jyzsl", 44.00); map.put("lx", "0"); map.put("jyzqqhdm", "652823"); map.put("jyzh", "6550351129"); map.put("dlx", "02"); map.put("dqhdm", "652823"); map.put("ddsl", 44.00); map.put("jyzlx", "1"); map.put("sm", ""); map.put("id", "4e5d72f1-a4fa-480c-85e9-40e19b517d0e"); map.put("jgryxm", "xxxxxx"); map.put("dmc", "xxxxxx"); map.put("sjcrsj", new Date(1691940454000L)); map.put("jgryid", "78e7a042-6908-45f8-999a-422ea4689b47"); map.put("rq",new Date(1691940454000L)); Map<String,Object> map1 = new HashMap<>(); map1.put("clgj", ""); map1.put("jyzid", "44335435-1bbb-11ee-ae7e-0242c0a80002"); map1.put("jyzdqhdm", "653121"); map1.put("jyzsl", 1.00); map1.put("lx", "0"); map1.put("jyzqqhdm", "653121"); map1.put("jyzh", "6550348683"); map1.put("dlx", "02"); map1.put("dqhdm", "653121"); map1.put("ddsl", 1.00); map1.put("jyzlx", "1"); map1.put("sm", ""); map1.put("id", "c2bce8d2-43f7-4c3c-9166-e7bba7ac1d2b"); map1.put("jgryxm", "xxxxxx"); map1.put("dmc", "xxxxxx"); map1.put("sjcrsj", new Date(1691940453000L)); map1.put("jgryid", "6059858a-483b-4e2b-a98f-27f423c3d886"); map1.put("rq",new Date(1688212304000L)); Map<String,Object> map2 = new HashMap<>(); map2.put("clgj", ""); map2.put("jyzid", "44340156-1bbb-11ee-ae7e-0242c0a80002"); map2.put("jyzdqhdm", "652926"); map2.put("jyzsl", 10000.00); map2.put("lx", "0"); map2.put("jyzqqhdm", "652926"); map2.put("jyzh", "6550351562"); map2.put("dlx", "02"); map2.put("dqhdm", "652926"); map2.put("ddsl", 10000.00); map2.put("jyzlx", "1"); map2.put("sm", ""); map2.put("id", "6cd3f741-b8c8-4343-a510-78a6be3b7adc"); map2.put("jgryxm", "xxxxxx"); map2.put("dmc", "xxxxxx"); map2.put("sjcrsj", new Date(1691940454000L)); map2.put("jgryid", "a56ffc0b-09d0-4e34-82e2-8457ccb0625b"); map2.put("rq",new Date(1688439693000L)); maps.add(map); maps.add(map1); maps.add(map2); dorisStreamLoader.loadJson("jyz_dd",maps,null); } }
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/1 6:20:53

测试AI驱动的聊天机器人:NLU评估指南

NLU在AI聊天机器人中的核心地位 AI驱动的聊天机器人已成为企业客服、虚拟助手等领域的标配&#xff0c;其核心能力依赖于自然语言理解&#xff08;NLU&#xff09;模块。NLU负责解析用户输入的语义&#xff0c;识别意图、抽取实体并维护对话上下文。对于软件测试从业者而言&am…

作者头像 李华
网站建设 2026/5/1 6:21:19

导师严选2026 TOP9 AI论文写作软件:自考毕业论文全攻略

导师严选2026 TOP9 AI论文写作软件&#xff1a;自考毕业论文全攻略 2026年AI论文写作工具测评&#xff1a;精准适配自考人群的高效选择 随着人工智能技术的不断进步&#xff0c;AI论文写作工具在学术领域的应用愈发广泛。对于自考学生而言&#xff0c;撰写毕业论文不仅是一项挑…

作者头像 李华
网站建设 2026/5/1 6:18:14

前端必看:dhtml.js到底是什么?还有用吗?

对于经常进行网页开发的前端开发者来说&#xff0c;dhtml.js是一个可能会遇到的术语。它并非指代一个单一的、官方的JavaScript库&#xff0c;而是一个历史性的概念&#xff0c;通常泛指用于实现动态HTML效果的客户端脚本代码集合。理解它的具体所指&#xff0c;有助于我们厘清…

作者头像 李华
网站建设 2026/5/1 6:21:07

Spring Boot 4.0 新功能全解析:Java 开发者的又一个大版本来了

Spring Boot 4.0 新功能全解析&#xff1a;Java 开发者的又一个大版本来了 各位 Java 码农小伙伴们&#xff0c;听说了嘛&#xff1f;Spring Boot 4.0 正式发布啦&#xff01;这是个超级大的版本更新&#xff0c;Spring 团队这次也是下了狠功夫&#xff0c;直接把 2024 年发布…

作者头像 李华
网站建设 2026/5/1 6:19:35

扩展运算符 vs Rest 参数:前端新人别再傻傻分不清了!

扩展运算符 vs Rest 参数&#xff1a;前端新人别再傻傻分不清了&#xff01;扩展运算符 vs Rest 参数&#xff1a;前端新人别再傻傻分不清了&#xff01;先甩一句狠话&#xff1a;... 这三个点&#xff0c;就是前端界的“薛定谔的猫”JavaScript 里的 ... 到底在搞什么鬼扩展运…

作者头像 李华