0%

ETL 第一篇

背景

最近找工作,有些大数据岗位我想投,但是奈何之前的工作内容大数据不是主业,大数据经验不够看,我最早要追溯到15年当时spark+hive,然后17年的storm+hbase,到最近的flink+ck,我觉得我努把力看能不能够一够大数据相关的岗位。

基础环境准备

把我给媳妇儿配的打LOL的电脑,偷偷拿来用一用,当成小型服务器,反正性能对LOL来说,很过剩了,不影响。

我之前鼓捣其它技术的时候就在电脑上装了虚拟机,所以也不折腾了,直接装个ubuntu,然后装个docker+docker compose,就差不多了。

docker镜像源

单独说下,因为docker默认用的国外的镜像源所以安装后几乎是不可用的,这时候需要配置国内的镜像。
要注意验证镜像源,比如通过curl等命令,看是否能正常访问是否能免验证访问,我就是被阿里云的镜像加速器耽搁了小半小时,就是按照官方的配置始终403,最后才发现,原理阿里前几个月更新了协议,大概意思是,不再支持外部直接用加速镜像,而是支持阿里云本身的产品使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 1. 验证镜像源
curl 镜像源
# 2. 添加镜像源
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": [
"https://xxxx"
]
}
EOF

# 3. 使其生效
sudo systemctl daemon-reload
sudo systemctl restart docker

# 4. 查看镜像是否修改成功
docker info

# 5. 拉取镜像验证
docker pull xxx

安装CK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# 1. 获取ck镜像
docker pull clickhouse/clickhouse-server
# 2. 添加ck需要的目录
mkdir -p /data/clickhouse/data /data/clickhouse/config /data/clickhouse/logs

# 3. ck的配置
cat > /data/clickhouse/config/config.xml << EOF
<?xml version="1.0"?>
<yandex>
<logger>
<level>information</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
</logger>

<http_port>8123</http_port>
<tcp_port>9000</tcp_port>
<interserver_http_port>9009</interserver_http_port>

<listen_host>0.0.0.0</listen_host>

<max_connections>4096</max_connections>
<keep_alive_timeout>10</keep_alive_timeout>
<max_concurrent_queries>100</max_concurrent_queries>
<uncompressed_cache_size>8589934592</uncompressed_cache_size>
<mark_cache_size>5368709120</mark_cache_size>

<path>/var/lib/clickhouse/</path>
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>

<user_directories>
<users_xml>
<path>/etc/clickhouse-server/users.xml</path>
</users_xml>
</user_directories>

<timezone>UTC</timezone>
</yandex>
EOF

# 4. ck用户管理
cat > /data/clickhouse/config/users.xml << EOF
<?xml version="1.0"?>
<yandex>
<users>
<default>
<password>yourpassword</password>
<networks>
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>

<profiles>
<default>
<max_memory_usage>10000000000</max_memory_usage>
<use_uncompressed_cache>0</use_uncompressed_cache>
<load_balancing>random</load_balancing>
</default>
</profiles>

<quotas>
<default>
<interval>
<duration>3600</duration>
<queries>0</queries>
<errors>0</errors>
<result_rows>0</result_rows>
<read_rows>0</read_rows>
<execution_time>0</execution_time>
</interval>
</default>
</quotas>
</yandex>
EOF

# 5.运行容器

docker run -d \
--name clickhouse-server \
--ulimit nofile=262144:262144 \
-p 8123:8123 \
-p 9000:9000 \
-p 9009:9009 \
-v /data/clickhouse/data:/var/lib/clickhouse \
-v /data/clickhouse/config/config.xml:/etc/clickhouse-server/config.xml \
-v /data/clickhouse/config/users.xml:/etc/clickhouse-server/users.xml \
-v /data/clickhouse/logs:/var/log/clickhouse-server \
--restart=always \
clickhouse/clickhouse-server:latest

# 6. 测试是否可用(内部)
docker exec -it clickhouse-server clickhouse-client --password yourpassword

# 7.暴露到外部可访问,由于不想每次run都写一长串,也为了后续方便管理其它容器,把docker compose装上
apt update
apt install -y docker-compose
# 8.compose文件编写,别忘了暴露environment
nano /data/clickhouse/docker-compose.yml

version: '3'
services:
clickhouse:
image: clickhouse/clickhouse-server:latest
container_name: clickhouse-server
restart: always
ports:
- "8123:8123"
- "9000:9000"
- "9009:9009"
volumes:
- /data/clickhouse/data:/var/lib/clickhouse
- /data/clickhouse/config/config.xml:/etc/clickhouse-server/config.xml
- /data/clickhouse/config/users.xml:/etc/clickhouse-server/users.xml
- /data/clickhouse/logs:/var/log/clickhouse-server
environment:
- CLICKHOUSE_USER=default
- CLICKHOUSE_PASSWORD=xxxx
ulimits:
nofile:
soft: 262144
hard: 262144
# 删除ck容器后重启
cd /data/clickhouse
docker-compose up -d
# 9. 看是否正常返回
curl "http://xx:8123/?user=default&password=xx&query=SELECT%201"


还有待续….

参考

https://www.coderjia.cn/archives/dba3f94c-a021-468a-8ac6-e840f85867ea
https://hub.docker.com/r/clickhouse/clickhouse-server/

AI 第四篇

引言

最近准备面试嘛,看到好些JD里,特别是关于大模型的JD,里面有个技能要求Prompt Engineering。刚好我也有兴趣,平时也是claude.ai和chatgpt、deepseek的重度用户,美元都花了好些,问题问的多了,慢慢的知道问题描述的准确性与预期的答案相关性确实很大。确实感觉Prompt Engineering(提示词工程)已经成为一项重要技能。无论你是开发者、内容创作者还是普通用户,掌握这项技能都能帮助你更有效地与AI交流,获得更满意的结果。刚好借此JD机会,更深入的学习下怎么才能写好Prompt。

什么是Prompt Engineering?

基本概念解释

Prompt Engineering是指设计和优化输入到AI模型(如ChatGPT、Claude等)的提示词的过程,目的是引导AI生成更准确、更符合预期的输出内容。

简单来说,就像我们与人交流时,清晰表达自己的需求会得到更好的回应一样,与AI的交流也需要”说人话”,而Prompt Engineering就是学习如何更好地”对AI说话”的艺术。

为什么Prompt Engineering很重要?

  • 节省时间:好的提示词能直接获得理想结果,减少反复尝试的时间
  • 提高质量:精心设计的提示词能显著提升AI输出的质量和准确度
  • 解锁潜能:掌握高级技巧后,你可以让AI完成更复杂的任务

Prompt Engineering的基础知识

提示词的基本结构

一个好的提示词通常包含以下几个要素:

  1. 明确的指令:清楚地告诉AI你想要它做什么
  2. 上下文信息:提供必要的背景知识
  3. 输入数据:需要AI处理的具体内容
  4. 输出格式:期望AI如何组织和呈现结果

简单例子对比

不好的提示词

1
写一篇关于AI的科普文章

好的提示词

1
2
请写一篇800字左右的科普文章,主题是"人工智能的发展历程",适合完全不动技术的人阅读,
包含三个主要发展阶段,使用生动的比喻和案例解释专业概念,并在结尾提出对未来的展望。

提升Prompt效果的核心技巧

1. 清晰具体

越具体的提示词越能得到准确的回答。包括具体描述:

  • 所需输出的长度(字数/段落数)
  • 目标受众(专业水平/年龄段)
  • 风格(正式/轻松/创意)
  • 结构(要点/段落/表格)

例子

1
请用简单的语言向我10岁女儿解释光合作用,不超过200字,使用至少2个生活中的比喻,避免使用专业术语。

2. 提供示例(少样本学习)

通过提供几个输入-输出的示例,可以更好地引导AI理解你的期望。

例子

1
2
3
4
5
6
7
8
9
10
请按照以下格式将这些句子翻译成英文:

中文:我喜欢吃苹果。
英文:I like to eat apples.

中文:明天我要去北京旅游。
英文:I will travel to Beijing tomorrow.

中文:这本书很有趣,我想推荐给你。
英文:

3. 角色设定

让AI扮演特定角色,能使回答更符合特定专业或风格需求。

例子

1
请你扮演一位经验丰富的营销专家,分析我的产品定位问题,并提供改进建议。我的产品是一款...

4. 分步骤思考

引导AI一步步思考问题,可以获得更准确的结果,特别是对于复杂问题。

例子

1
2
3
请帮我解决这个数学问题,在回答前,请先分析问题,列出已知条件,然后逐步推导求解过程,最后给出结论。

问题:一个圆柱形水箱,底面积为3平方米,高为2米。现在水箱中有水,深度为1.5米。如果以每分钟0.1立方米的速度向水箱中注水,需要多少分钟才能将水箱装满?

5. 指定输出格式

明确要求特定的输出格式,使结果更易于使用。

例子

1
2
3
4
5
6
7
8
请分析这家公司的优势和劣势,并以下面的JSON格式输出结果:

{
"公司名称": "XX科技",
"优势": ["优势1", "优势2", "优势3"],
"劣势": ["劣势1", "劣势2", "劣势3"],
"改进建议": ["建议1", "建议2", "建议3"]
}

进阶技巧

链式思维(Chain-of-Thought)

引导AI展示其思考过程,对于复杂推理特别有效。

例子

1
问题:小明有12个苹果,他给了小红3个,又给了小李他手中苹果数量的一半,最后他还剩下多少个苹果?请一步一步地思考,解释每一步的计算过程和原因。

思维树(Tree of Thoughts)

引导AI探索多种可能性和解决方案路径。

例子

1
请用思维树的方式分析我创业的三个不同选择(开咖啡店、做在线教育、开发APP),每个选择探索三个可能的发展路径,考虑不同条件下的结果,然后总结最优选择。

自我评估和修正

让AI评估自己的输出并进行改进。

例子

1
请写一篇关于气候变化的短文,然后评估这篇文章的优缺点,并基于评估给出一个改进版本。

常见应用场景实战

内容创作

写作辅助

例子

1
请为我的科技博客生成一篇文章大纲,主题是"5G技术如何改变我们的生活"。大纲应包含引言、3-5个主要部分、每部分2-3个小节,以及结论。每个小节都需要有简短描述。

创意生成

例子

1
我正在设计一个以"海洋保护"为主题的儿童故事书。请创作5个可能的故事情节,每个情节包含主角描述、基本冲突和教育意义。

数据分析与处理

例子

1
2
我有一组销售数据,包含产品名称、月份和销售额。请帮我分析这些数据,找出销售趋势,并提出改进建议。数据如下:
[数据内容]

代码辅助

例子

1
请编写一个Python函数,用于分析文本情感倾向。函数应接受一段文本作为输入,返回积极、消极或中性的评价以及置信度分数。请包含必要的注释和简单的使用示例。

常见问题与解决方法

如何处理AI回答过于笼统或离题?

  • 解决方法:增加具体细节,使用引导性问题,明确输出要求
  • 例子:原提示”谈谈人工智能的未来”可改为”请从就业、教育和伦理三个方面,具体分析人工智能在未来10年可能带来的社会变革。每个方面请提供至少两个具体的预测和可能的应对策略。”

如何避免AI生成的内容过于冗长?

  • 解决方法:明确字数限制,要求简洁回答,指定重点内容
  • 例子:”请用不超过300字,总结量子计算的核心原理,重点解释量子比特和量子纠缠这两个概念。”

如何使AI生成更创新性的内容?

  • 解决方法:明确要求原创思路,设置情景约束,激励思维发散
  • 例子:”请提出5种从未出现过的智能家居产品创意,每种产品都需要融合至少两种现有技术,并解决一个特定的家庭难题。”

免费学习资源

  1. 免费课程与教程

    • OpenAI的Prompt Engineering指南 (官网免费提供)
    • 李宏毅教授的”Large Language Model”课程 (YouTube完整课程)
    • Khan Academy的AI基础知识 (免费教育平台)
    • Hugging Face的NLP教程 (官方文档免费)
    • Coursera上的免费AI课程 (可以免费旁听)
  2. 免费电子书与指南

    • 《Prompt Engineering Guide》by Lilian Weng (在线免费阅读)
    • Dair.ai的Prompt Engineering Guide (GitHub上免费)
    • Github上的awesome-chatgpt-prompts开源仓库
    • OpenAI官方的最佳实践指南
  3. 免费在线社区与资源

    • GitHub上的Prompt Engineering资源库
    • Reddit的r/PromptEngineering社区
    • Discord的公开AI社区讨论组
  4. 免费网站

结语

Prompt Engineering不仅是一项技术技能,更是一门艺术。通过不断实践和调整,你会发现与AI交流的效率和质量都会显著提升。记住,最好的学习方式是实践——从今天开始尝试这些技巧,记录效果,持续改进。

面试 第一篇

源自ByteByteGo

HTTPS工作原理:三个关键步骤解

根据图中内容,HTTPS(Hypertext Transfer Protocol Secure)的工作原理可以分为三个关键步骤:

服务器证书检查(Server Certificate Check)

这是建立安全连接的第一步:

  • 客户端(浏览器)向服务器发送”HELLO”消息
  • 服务器回应”HELLO”
  • 服务器发送自己的证书给客户端
  • 客户端向证书颁发机构(CA)发送请求,确认此证书是否有效
  • CA回应”YES”,表示证书合法

这一步确保了用户正在与合法网站通信,而不是某个冒充者。证书颁发机构作为可信第三方,保证了服务器的身份。

密钥交换(Key Exchange)

验证服务器身份后,需要建立加密通信:

  • 客户端从服务器证书中提取服务器的公钥
  • 客户端创建一个会话密钥
  • 客户端告知服务器它支持的密码套件(”I know A,B,C,D cipher suites”)
  • 服务器选择一个密码套件(”OK, Let’s use C”)
  • 客户端使用服务器的公钥和选定的密码套件加密会话密钥
  • 服务器使用自己的私钥解密,获得会话密钥

此时,服务器也拥有了会话密钥,为后续加密通信做好准备。

加密通信隧道(Encrypted Tunnel)

在完成前两步后:

  • 客户端和服务器双方都拥有相同的会话密钥
  • 客户端发送的数据使用会话密钥加密
  • 服务器接收数据后使用相同的会话密钥解密
  • 服务器发送给客户端的数据也使用相同的会话密钥加密
  • 客户端接收后使用会话密钥解密

这形成了一个安全的加密通信隧道,即使数据在传输过程中被拦截,没有会话密钥的第三方也无法解密内容,保证了数据传输的安全性。

PS:从输入URL到页面显示的完整过程

  1. URL解析:浏览器解析URL,确定协议(HTTP/HTTPS)、域名和路径
  2. DNS解析:浏览器查询DNS服务器,将域名转换为IP地址
    • 首先检查浏览器缓存
    • 然后检查操作系统缓存
    • 接着查询本地DNS服务器
    • 必要时进行递归查询
  3. 建立TCP连接:浏览器与服务器建立TCP连接(三次握手)
    • 客户端发送SYN包
    • 服务器回应SYN-ACK包
    • 客户端发送ACK包
  4. TLS握手(HTTPS):如果是HTTPS,还需进行TLS握手
    • 服务器证书检查
    • 密钥交换
    • 建立加密通信隧道
  5. 发送HTTP请求:浏览器向服务器发送HTTP请求,包含请求方法、路径、头部信息等
  6. 服务器处理请求:服务器接收请求,进行相应处理,生成HTTP响应
  7. 接收HTTP响应:浏览器接收服务器返回的数据,包含状态码、响应头、响应体
  8. 解析HTML:浏览器开始解析HTML,构建DOM树
  9. 处理CSS:解析CSS,构建CSSOM树
  10. 执行JavaScript:加载并执行JavaScript代码
  11. 构建渲染树:将DOM树和CSSOM树结合,形成渲染树
  12. 布局:计算每个元素在屏幕上的精确位置和大小
  13. 绘制:将计算好的像素信息绘制到屏幕上
  14. 合成:将多个图层合成为最终显示的页面
  15. TCP连接关闭:数据传输完成后,断开TCP连接(四次挥手)
    • 客户端发送FIN包,表示客户端不再发送数据
    • 服务器发送ACK包,确认收到客户端的FIN
    • 服务器发送FIN包,表示服务器也不再发送数据
    • 客户端发送ACK包,确认收到服务器的FIN

Java开发工程师、全栈开发工程师

亲爱的招聘团队:

如果软件工程师是一道菜,那我就是那种经过12年慢火熬制的老汤底——看起来平淡无奇,但一尝就知道功夫在里头。

我的技术栈就像一个资深玩家的技能树:主技能点满了Java和React,副技能解锁了Vue、Docker、Python和Go等。在我的职业旅程中,我善于将复杂问题分解为简单模块,轻松应对各种技术挑战。但我最厉害的外挂其实是曾经当过产品助理——这让我不仅能听懂产品经理说的”简单调整”背后隐藏的36个子需求,还能在技术与业务之间自如翻译,堪称”产品语言通”。

在我的12年职业生涯中,我从”这bug在本地没问题啊”进化到了”这需求有啥实际意义”再到”好的,我来搞定!”。带团队的经历让我明白,比起Debug代码,Debug人际关系才是真正的高难度挑战。所幸,我在这两方面都交出了不错的成绩单。

相信我的技术能力、产品思维和团队协作经验能为您的团队带来实质性的贡献。代码之外,我能够搭建开发者与产品、业务之间的桥梁,确保我们不只是在开发功能,而是在创造价值。我们一定能擦出技术的火花——毕竟,一个能理解产品、带过团队、写了12年代码还没秃顶的工程师,不是每天都能遇到的。

代码问候,
[软件开发特种兵]

联系方式:
电话:[18515068121]
邮箱:[gamehu@yeah.net]
Wechat:[GamehuDB]

P.S. 我的GitHub贡献图可能不够绿,但我的生产环境代码从不让服务器变红。

Java Development Engineer/Full Stack Development Engineer

Dear Hiring Team:

If software engineers were dishes, I’d be that slow-simmered stock that’s been cooking for 12 years — looking unassuming, but one taste reveals the expertise within.

My tech stack resembles a veteran player’s skill tree: maxed-out primary skills in Java and React, with unlocked secondary abilities in Vue, Docker, Python, Go, and more. Throughout my professional journey, I’ve honed the ability to break complex problems into simple modules, easily tackling various technical challenges. But my most powerful perk comes from my experience as a product assistant — I can decode the 36 hidden sub-requirements behind a product manager’s “simple adjustment” and fluently translate between technical and business languages, making me a true “product whisperer.”

During my 12-year career, I’ve evolved from “but the bug doesn’t appear on my local machine” to “what’s the actual purpose of this requirement” to “I’ll handle it!” My team leadership experience taught me that debugging human relationships is far more challenging than debugging code. Fortunately, I’ve managed to achieve good results in both areas.

I believe my technical abilities, product mindset, and team collaboration experience will bring substantial value to your team. Beyond coding, I can build bridges between developers, product teams, and business units, ensuring we’re not just developing features but creating value. We’ll definitely create technical sparks together — after all, an engineer who understands products, has led teams, and has written code for 12 years without going bald isn’t someone you meet every day.

Code regards,
[Software Development Special Forces]

Contact Information:
twitter:[Gamehu520]
email:[gamehu@yeah.net]
Wechat:[GamehuDB]

P.S. My GitHub contribution graph might not be very green, but my production code never turns servers red.

AI 第三篇

背景

知道MCP还是源于因为之前验证大模型集成时了解到的。因为当时后续计划做AI应用,增加saas平台的噱头的同时成为一个亮点功能,提升用户体验。

基础概念与架构设计

MCP(Model Context Protocol)是一个开放协议,它标准化了应用程序如何向大语言模型(LLMs)提供上下文。可以将MCP比作AI应用的USB-C接口。正如USB-C提供了一种标准化的方式,将你的设备连接到各种外围设备和配件,MCP 也提供了一种标准化的方式,将AI模型连接到不同的数据源和工具。

一、协议本质解构

MCP是一种客户端-服务器架构的数据访问协议,专为AI应用(如大语言模型)设计,其核心是为AI应用提供一种标准化方式来安全访问多种数据源。

关键组件

  1. MCP 主机:运行AI应用并发起数据请求,如聊天应用或IDE
  2. MCP 客户端:处理与 MCP 服务器的通信
  3. MCP 服务器:连接到各种数据源的轻量级程序
  4. 大型语言模型(LLM):分析问题并选择回答的 AI 模型
  5. 数据源:包括数据库、外部 API 等

MCP采用简洁的JSON格式进行通信,主要支持两类基本操作:

  • 发现操作:客户端识别服务器提供的能力
  • 执行操作:请求服务器执行特定工具来访问数据

二、核心作用剖析

  1. 访问标准化

    • 为不同类型的数据源提供统一的访问接口
    • 客户端无需了解每个数据源的具体访问细节
  2. 安全控制

    • 服务器明确声明其访问能力和权限范围
    • 支持基本的认证和授权机制
    • 客户端可以限制服务器的访问范围
  3. 工具扩展性

    • 服务器可以动态注册和提供各种工具
    • 客户端可以发现并使用这些工具
    • 支持从简单的文件读取到复杂的API调用等多种操作

三、基本工作流程

MCP的典型工作流程如下:内部实现可能会有多次循环

alt text
大体流程如下:

  1. 用户向 MCP 主机(如聊天应用或 IDE)提出问题
  2. 主机将问题发送给大型语言模型(LLM)进行分析
  3. LLM 确定需要使用哪些工具来回答问题
  4. 主机通过 MCP 客户端请求执行相应工具
  5. MCP 客户端向不同的 MCP 服务器发送工具执行请求
  6. MCP 服务器访问相应的数据源(数据库或外部 API)
  7. 数据源返回结果给 MCP 服务器,再传回客户端
  8. MCP 客户端汇总工具执行结果并返回给主机
  9. 主机将工具结果发送给 LLM 生成最终回答
  10. 最终回答显示给用户

四、现实应用场景

MCP适用于以下典型场景:

  1. 增强型AI聊天应用

    • 让聊天机器人能够访问用户本地文件和数据库
    • 使AI可以获取并引用真实、最新的信息
  2. 智能开发工具

    • IDE中的代码助手可以访问项目代码文件
    • 辅助工具可以查询API文档和相关资源
  3. 企业AI集成

    • 让AI应用安全地访问企业内部数据
    • 在保护敏感信息的同时提供个性化服务

五、协议现状与局限

当前MCP协议的特点与局限:

  1. 简洁性优先

    • 协议设计相对简单,专注于解决基本的数据访问问题
    • 尚未包含复杂的加密、动态路由等高级功能
  2. 开发阶段

    • 协议仍在发展中,标准可能会随时间演进
    • 生态系统正在逐步构建
  3. 基础功能聚焦

    • 当前主要聚焦于基础的数据访问能力
    • 缺乏高级的事务处理、分布式一致性等特性

结论

MCP代表了AI工具与数据源之间交互的一个重要标准化尝试。它为构建能够访问和利用各种数据的AI应用提供了基础架构,虽然相对简单,但解决了AI应用难以安全访问多样化数据的关键问题。随着协议的发展,MCP有潜力成为AI应用与数据源之间交互的重要标准,类似于HTTP对于web应用的意义。

但是目前MCP仍处于相对早期阶段,其真正的潜力和影响力将随着更多实现和应用的出现而逐步显现。

参考

https://www.anthropic.com/news/model-context-protocol
https://github.com/modelcontextprotocol
https://modelcontextprotocol.io/introduction
https://www.youtube.com/watch?v=sahuZMMXNpI
https://www.youtube.com/watch?v=eur8dUO9mvE
https://www.youtube.com/watch?v=kQmXtrmQ5Zg&t=2s

离职系列 第N篇
离职前一天,想想简历咋写,弄个排版出来,后续好造着整理下简历。纯属个人意见。我先自己试试,不好用再改。

我的观点

我觉得简历的本质是为了筛选而不是为了深入了解你。所以我认为简历:

  1. 首先得清爽。
  2. 然后得简明扼要。

不用写太多同时又能体现关键信息,就跟咱们做程序一样,设计时重点之一就是数据得便于各场景使用,便于使用很大的一个方面就是数据能各种过滤和组合,通常是现有简明的入口,如果要了解细节就得下钻,可能是一层或多层才能看透数据。那简历就像入口,如果对方有兴趣才会下钻,所以不应该想着一个简历就把自己交代的底裤都没有,一方面是内容太多不容易抓到重点,另一方面是太细了搞得人都没欲望深入探讨,咋约你面试呢?

所以简历得像咱们对待产品需求一样,你得解决需求场景同时兼顾一些扩展性。抽象出来一个模板适配通用场景,然后可根据具体特殊场景,再保证真实的前提下做一些微调,对其JD中的要求。


抽象了一个通用模板如下:

基本信息

  • 求职意向:技术负责人/技术专家
  • 工作年限:8年+
  • 学历:xx
  • 电话:xx
  • 期望薪资:xx

专业技能

技术栈

  • 后端:Java、Spring Boot、Spring Cloud、MySQL、Redis、消息队列
  • 前端:React、TypeScript、Ant Design、Redux、Webpack
  • DevOps:Docker、Jenkins、Git、Jira
  • 架构:微服务架构、前后端分离、分布式系统设计

管理能力

  • 团队管理:带领3-6人团队,完成项目全周期开发
  • 技术规划:制定技术方案,把控技术方向,推动技术创新
  • 敏捷实践:推行敏捷开发,提升团队效能
  • 人才培养:建立技术培训体系,提升团队技术能力

工作经历

XX公司(2021-至今)

职位:技术负责人

负责工作:

  1. 带领5-6人团队完成大型LLM应用平台开发,实现从0到1

    • 设计并实现基于微服务架构的系统框架
    • 优化系统性能,提升用户体验
    • 建立代码规范和技术文档体系
    • 系统月活用户达到10w+,支持高并发访问
  2. 技术架构升级与优化

    • 推动系统微服务化改造,提升系统可扩展性
    • 实现核心模块性能优化,接口响应时间提升50%
    • 建立监控告警体系,提高系统稳定性

XX公司(2019-2021)

职位:Web前端负责人

负责工作:

  1. 带领3-4人前端团队完成企业级SaaS平台开发

    • 基于React技术栈搭建前端框架
    • 实现组件库设计与开发
    • 推动前端工程化建设
    • 平台服务企业客户100+
  2. 技术改进与创新

    • 建立前端性能监控体系
    • 推动前端自动化测试实践
    • 优化构建流程,部署时间缩短60%

项目经验

xx平台(2022-2023)

  • 项目规模:5-6人团队,服务10w+用户
  • 技术架构:Spring Cloud + React + MySQL + Redis
  • 主要职责:
    • 负责整体技术方案设计
    • 核心功能开发与性能优化
    • 带领团队完成开发任务
  • 项目成就:
    • 系统支持高并发访问,峰值QPS 5000+
    • 用户响应时间<500ms
    • 系统可用性达99.9%

xx企业级SaaS平台(2019-2021)

  • 项目规模:前端3-4人团队
  • 技术架构:React + TypeScript + Ant Design
  • 主要职责:
    • 前端架构设计与实现
    • 团队管理与技术指导
    • 核心功能开发
  • 项目成就:
    • 平台月活用户5w+
    • 前端性能提升40%
    • 客户满意度95%+

教育背景

  • XX大学 计算机科学与技术 本科

个人评价

  • 扎实的技术功底,丰富的项目经验
  • 优秀的团队管理能力和沟通协调能力
  • 具备较强的技术视野和架构设计能力
  • 持续学习,保持对新技术的敏感度

离职系列 第十二篇
离职系列,想想这几年在公司的成长,在这做个记录。这篇是关于AI功能方案验证。

背景

落地事业部上一年关于AI的创新奖的方案,更合适的叫法应该是交接,之前的团队几乎停留在理论上和一个demo上,因为久久没有效果,可能感受到了上级的压力,需要看到实际效果,最终找到了我们团队,希望能借助我们团队让其产生价值(官方说法),我理解实际就接这个摊子。好在年前我自己捣鼓过AI知识库AI应用-知识库,所以不怯场。

方案介绍

因为在这之前就是一个demo,具体前面的团队也说不出来个所以然,所以我们直接说说两边对话后的方案走向,希望做一个功能:容量预测,意思是根据服务器的多个指标历史数据,预估服务器未来的负载情况,从而给予客户参考或预案。

我试着画下大体的方案:

架构图:

alt text

时序图:

alt text

大体流程

  1. 调度器触发flink批任务从ClickHouse获取原始数据
  2. Flink进行基本的数据清洗和标准化
  3. 处理后的数据存入CK
  4. python预测模型从CK获取数据
  5. 预测模型生成预测结果
  6. 阈值分析器识别潜在瓶颈
  7. python大模型提供深度解释和建议
  8. 生成预测报告和告警

学习链接

ollama

ollama is an open-source tool that simplifies running large language models locally on your personal computer
https://www.youtube.com/watch?v=GWB9ApTPTv4&t=171s

离职系列 第十一篇
离职系列,想想这几年在公司的成长,在这做个记录。这篇是关于维护的一个旧功能“全部告警跟踪”。

背景

每个租户有自己的告警数据,少则几千多则几十万条数据,云平台提供了一个功能叫“全部告警跟踪”,该功能顾名思义,会展示所有租户的所有告警信息(刷新那一刻是实时的),还能支持过滤、搜索等操作,这功能据说上线没多久就有问题,比如点分页时不时会出现超时。但是因为这功能用的人非常少,且只有管理员才有权限,也就一直放着。
但是新版需求要求解决这个问题,因为现在是我维护这个功能,所以需要我先出个技术方案。

解法设计输出模板

  1. 解法设计的模板很多,但是我感觉稍微有点重,当前产品的节奏,没有那么多的时间和人力给我做那么详细的解法设计,所以简单梳理了一个简化版的解法设计,并与干系人达成了一致。

  2. 模板如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29

    1. 引言
    - 背景说明
    - 问题陈述(现状、目标)
    - 关键术语
    - 参考资料

    2. 需求分析
    - 核心诉求/期望交付的价值
    - 非功要求
    3. 约束条件
    - 依赖项
    - 假设项

    4. 方案设计
    - 可选方案对比(2-3个)
    * 方案描述
    * 优缺点分析
    * 非功表现
    - 推荐方案详细说明
    * 架构设计
    * 核心流程
    * 关键设计点、算法伪代码(如果有必要)

    5. 实施评估(因为团队自己做实施,所以加上这一章)
    - 影响范围
    - 实施成本
    - 后续影响

  3. 要分清楚解法设计和详细设计的核心区别:

    1. 解法设计:回答”用什么方案解决问题”
      • 关注整体思路
      • 多个方案对比选择
      • 架构层面决策
    2. 详细设计:回答”如何具体实现这个方案”
      • 已选定方案的具体技术实现细节
      • 编码层面设计

开始

在这儿我就不原方不动的把整个解法贴出来了,只捡几个重点说。

需求分析

一定要记住,虽然咱们是干技术的,但是做解法的时候,一定先不要直接从技术的角度思考,先从业务的角度,还原业务场景,以及可能的演进需求,做到扩展性。

  1. 年少不懂事的时候,干过一段时间的产品助理,当时就学会做需求分析的几把斧:

    tips:

    1. 搞清楚买单的人和使用的人谁?分别想解决什么问题,特别是买单的人容易被忽视。(使用方再满意,买单的人不满意也是白搭)
    2. 维护好与需求调研对象的关系(人情世故)
    3. 5W1H方法做需求分析和挖掘(找出底层需求,避免浮于表面文字)
    4. KANO方法对需求分级(找出痛点先解决,其它的都是锦上添花)

这儿的原始需求是管理员能对所有租户的告警跟踪查看,关注其下团队成员所负责的租户的处理情况,对工作进度有了解,同时可以随时查看核心客户的数据。
这样几句简单的话,应用5W1H+KANO拆解下:

  1. 5W1H分析:

    1. WHO(谁)

      • 主体:管理员
      • 关注对象:团队成员、租户
    2. WHAT(什么)

      • 查看所有租户的告警跟踪情况
      • 了解团队成员的工作进度
      • 查看核心客户数据
    3. WHEN(什么时候)

      • 随时(需要实时或准实时的数据)
      • 告警发生后的跟踪过程中
    4. WHERE(在哪里)

      • 系统内
    5. WHY(为什么),更深入可以加入5Why方法,探寻源需求。

      • 监督团队工作情况
      • 及时了解核心客户状况
      • 确保告警得到及时处理
  2. HOW(怎么做)

    • 提供告警跟踪查看、筛选功能
    • 展示团队成员负责的租户处理进度
    • 支持核心客户数据快速查看
  3. KANO模型分析:

    1. 基本型需求(Must-be):

      • 查看所有租户的告警记录
      • 查看告警处理状态
    2. 期望型需求(Performance):

      • 团队成员工作进度追踪
      • 核心客户数据查看
    3. 兴奋型需求(Delighter):

      • 数据分析和统计

这里能得到几个关键信息:

  1. 依然需要在活的实时的数据(需求已经明确)
  2. 需要搜索、分页、筛选(大数据量的场景)
  3. 后续很有可能需要统计数据(要考虑数据聚合)
  4. 非功
    1. 1000+租户,每个租户50w的告警,10s内刷出数据。
    2. 经费有限,且重新申请流程慢,额度小。

方案

  1. 方案1:ShardingSphere 自身实现。
    广播表是ShardingSphere中的一个概念,指的是在所有分片中存在的表,每个分片都有完整的副本。当更新广播表时,所有分片都会同步更新。通常用于数据量不大且需要频繁关联查询的表,比如字典表。
    1. 优点:简单,不用引入任何其他组件。
    2. 缺点:
      1. 数据量太大,无法在每个分片都复制全量数据。
  2. 方案2:ClickHouse(开源版)+Flink CDC
    1. 优点:
      1. CK在已在多个产品运用,学习成本较低。
      2. 可以支持复杂的查询、聚合需求。
      3. 适合离线分析。
      4. 单表查询性能极强。
    2. 缺点:
      1. 不支持事务。
      2. 集群部署成本高(官方没有提供Helm Chart。且ClickHouse集群扩展不方便,很多手动处理,不适合弹性扩展,集成k8s较难)。
      3. 删除/更新性能差,更适合批量追加。告警数据会经常变更,可能存在性能问题。
      4. 手动管理分片、分区、MergeTree等,维护成本较高。
  3. 方案3:Doris+Flink CDC
    1. 优点:

      1. 实时性高、支持高并发。
      2. 可以支持复杂的查询需求、聚合需求。
      3. 集群部署成本低(Doris,官方提供了Helm Chart,且适合弹性扩展,运维压力小)。
      4. 自动话程度高(分片、负载均衡、存储管理等)
      5. SQL友好
      6. 存算分离
    2. 缺点:

      1. 引入Doris新组件,可能会增加采购成本。
      2. 复杂的模糊搜索可能无法实现。
  4. 方案4:ES+Flink CDC
    1. 优点:

      1. 近实时,可能有秒级延迟。
      2. 可以支持复杂的查询需求(特别是全文检索)。
      3. 集群部署成本低(官方有Helm Chart和Operator,且适合弹性扩展,可无缝集成k8s,运维压力小)
    2. 缺点:

      1. 不支持事务
      2. 引入ES新组件,可能会增加较大采购成本(ES需要较多内存和SSD磁盘)。
      3. 很多时候需要手动处理,比如分片分步、设计索引、索引优化、GC 调优等,维护成本较高。
      4. 使用DSL,不是标准 SQL,学习成本较高。

推荐方案2

原因:

  1. 在活告警数据量可控,暂不考虑扩展。
  2. 系统已接入了CK,最低成本(学习、部署、购买)。
时序图

alt text

关键验证点

1、2验证点,由于前期已经做过验证,着重验证3、4就行,特别是更新和删除数据。

验证结果

按500个租户,每个租户5000在活告警,没问题,因为主要是验证可行性,没有那么严格的压测,图啥的当时就没留了。这块详设的时候会更具体严格一些。

离职系列 第十篇
离职系列,想想这几年在公司的成长,在这做个记录。这篇是关于线上的bug。

背景

其实说来这个问题,跟之前的遇见连接超时有个遗留项也有一些关系,因为报错的源头,也是是数据库连接关闭,与上一次仅仅是我那块出问题不同的是,这次是大批量的租户多种任务都失败,飞书告警消息都把我弹麻了。

问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

2024-10-23 17:00:10,177 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:631 - 客户:2xx319,告警数据处理异常
org.springframework.jdbc.UncategorizedSQLException:
2024-10-23 17:00:10,176 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:511 - 客户:2xx319,集成平台巡检数据处理异常
024-10-23 17:00:10,175 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:547 - 客户:2xx319,服务状态数据处理异常
org.springframework.jdbc.UncategorizedSQLException:
... 35 common frames omitted
2024-10-23 17:00:10,174 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:582 - 客户:2xx319,心跳数据处理异常
... 82 common frames omitted
org.springframework.jdbc.UncategorizedSQLException:
### Cause: java.sql.SQLException: Connection is closed
; uncategorized SQLException; SQL state [null]; error code [0]; Connection is closed
at org.mybatis.spring.MyBatisExceptionTranslator.translateExceptionIfPossible(MyBatisExceptionTranslator.java:93)
at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:439)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.sql.SQLException: Connection is closed


这次定位很快,具体定位的就不再赘述,出了问题后,我想了想有两个明确的因素:

  1. 上次类似的错误就发现了,连接池设置存在问题。
    1. 再次检查,当前没有慢sql,所以初步判断是连接池问题。
  2. 新上线了策略功能,策略把之前定时默认执行的任务,可更改为每个租户下每种类型单独的执行时间和周期。
    1. 怀疑存在了N个客户N个任务都在同一时间点执行的问题,导致连接池耗尽。

处理

  1. 根据预留的后门,手动把核心任务给生成了,让线上能正常处理。
  2. 因为之前已知了引入ShardingSphere后同时引入了HikariCP连接池,现在只留HikariCP连接池,并对参数进行调优。
    1. 以下是同事调优后的参数:超时时间以及连接池大小都对应阿里云购买的高性能PG做了对应的调整。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      master0:
      dataSourceClassName: com.zaxxer.hikari.HikariDataSource
      driverClassName: org.postgresql.Driver
      jdbcUrl: jdbc:postgresql://xx.aliyuncs.com:xx/xx
      username: xxx
      password: xxx
      connectionTimeout: 60000
      idleTimeout: 600000
      maxLifetime: 3600000
      maximumPoolSize: 200
      minimumIdle: 1
      poolName: business-data-master0

  3. 临时的先让cron表达式有一定的偏移量比如
    1. {% codeblock %}
      
                     return timeList.stream()
                         .map(time -> {
                             String[] timeParts = parseTime(time);
                             // TODO 临时解法:为每个cron添加随机偏移( 0~3分钟)
                             int minuteOffset = ThreadLocalRandom.current().nextInt(4); // 生成 0~3 的随机数
                             int minute = (Integer.parseInt(timeParts[1]) + minuteOffset) % 60; // 防止超出 59 分钟
                             return "0 " + minute + " " + timeParts[0] + " * * ? ";
                         })
                         .collect(Collectors.toList());
                      private static String[] parseTime(String time) {
                          return time.split(":"); // 格式为 "HH:mm"
                      }
                  }
      
         {% endcodeblock %}
      

2、3做完之后,腾出缓冲时间着手长期解了,需要重新做下解法设计,以适配高并发的场景。

解法设计1.0

具体的解法设计咋做,可看下之前的遇见多表查询,这儿就直接给出一些结论:

  1. 任务错峰(随机延迟)
  2. 任务限流(线程池 + 队列)
  3. 任务优先级机制(先执行核心任务)

UML:

alt text

流程图:

alt text

时序图:

alt text

关键伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363

// 任务优先级定义
private enum TaskPriority {
HIGH(0),
MEDIUM(5),
LOW(10);

private final int value;

TaskPriority(int value) {
this.value = value;
}

public int getValue() {
return value;
}
}

/**
* manage-biz Powerjob 调度类
* 优化版本 - 任务削峰与队列管理
*
* @author hht
* @since 2024-09-10
*/
@Component(value = "manageBizPowerjobDispatcher")
@Slf4j
@RequiredArgsConstructor
public class ManageBizPowerjobDispatcher {
private final IXxxScheduleService XxxScheduleService;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/** 平安通告powerjob任务id */
public static final String TASK_SAFETY_NOTICE_ID = "generateXxx";

public static final String SUCCESS = "success";

// 配置参数,可从配置文件注入
@Value("${powerjob.task.max-concurrent:10}")
private int maxConcurrentTasks;

@Value("${powerjob.task.queue-capacity:500}")
private int queueCapacity;

@Value("${powerjob.task.max-delay-minutes:5}")
private int maxDelayMinutes;

@Value("${powerjob.task.worker-threads:20}")
private int workerThreads;



// 延迟任务定义
@Data
private static class DelayedTask implements Delayed {
private final Runnable task;
private final long executeTime;
private final String taskId;
private final String jobParams;

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

}

// 优先级任务定义
@Data
private static class PriorityTask implements Comparable<PriorityTask> {
private final Runnable task;
private final TaskPriority priority;
private final String taskId;
private final String jobParams;
private final long createTime;

@Override
public int compareTo(PriorityTask other) {
// 先按优先级排序,再按创建时间排序
int priorityCompare = Integer.compare(priority.getValue(), other.priority.getValue());
if (priorityCompare != 0) {
return priorityCompare;
}
return Long.compare(createTime, other.createTime);
}
}

/**
* 1.单独的线程,负责从队列中获取任务并分发
* 2.协调延迟队列和优先级队列
* 3.控制任务的并发执行数量
*/
private class TaskDispatcher implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
// 先检查延迟队列
DelayedTask delayedTask = delayedTaskQueue.poll();
if (delayedTask != null) {
// 将任务添加到优先级队列
submitToPriorityQueue(delayedTask.getTask(), TaskPriority.HIGH, delayedTask.getTaskId(), delayedTask.getJobParams());
continue;
}

// 从优先级队列取任务执行
PriorityTask priorityTask = priorityTaskQueue.take();
if (priorityTask != null) {
try {
// 获取信号量,控制并发
taskSemaphore.acquire();

// 记录任务开始执行
activeTaskCount.incrementAndGet();
taskExecutionCount.computeIfAbsent(priorityTask.getTaskId(), k -> new AtomicInteger(0)).incrementAndGet();

// 提交到线程池执行
executorService.submit(() -> {
try {
log.info("执行任务: {}, 参数: {}", priorityTask.getTaskId(), priorityTask.getJobParams());
priorityTask.getTask().run();
} catch (Exception e) {
log.error("任务执行异常: {}", priorityTask.getTaskId(), e);
} finally {
// 释放信号量
taskSemaphore.release();
// 更新计数器
activeTaskCount.decrementAndGet();
AtomicInteger counter = taskExecutionCount.get(priorityTask.getTaskId());
if (counter != null) {
counter.decrementAndGet();
}
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("任务分发器异常", e);
}
}
}
}

// 任务队列和执行器
private DelayQueue<DelayedTask> delayedTaskQueue;
private PriorityBlockingQueue<PriorityTask> priorityTaskQueue;
private ExecutorService executorService;
private ExecutorService dispatcherService;
private Semaphore taskSemaphore;
private Random random;

// 任务执行状态监控
private AtomicLong totalTasksReceived = new AtomicLong(0);
private AtomicLong totalTasksExecuted = new AtomicLong(0);
private AtomicInteger activeTaskCount = new AtomicInteger(0);
private Map<String, AtomicInteger> taskExecutionCount = new ConcurrentHashMap<>();

@PostConstruct
public void init() {
// 初始化任务队列
delayedTaskQueue = new DelayQueue<>();
priorityTaskQueue = new PriorityBlockingQueue<>(queueCapacity);

// 初始化线程池
executorService = Executors.newFixedThreadPool(workerThreads, new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "task-worker-" + counter.getAndIncrement());
thread.setDaemon(true);
return thread;
}
});

// 初始化分发器线程,
dispatcherService = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r, "task-dispatcher");
thread.setDaemon(true);
return thread;
});

// 初始化信号量
taskSemaphore = new Semaphore(maxConcurrentTasks);

// 初始化随机数生成器
random = new Random();

// 启动任务分发线程
dispatcherService.submit(new TaskDispatcher());

log.info("任务调度器初始化完成,最大并发任务数: {}, 队列容量: {}, 最大延迟分钟数: {}, 工作线程数: {}",
maxConcurrentTasks, queueCapacity, maxDelayMinutes, workerThreads);
}

@PreDestroy
public void shutdown() {
// 关闭调度器
if (dispatcherService != null) {
dispatcherService.shutdownNow();
}

// 关闭执行器
if (executorService != null) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}

log.info("任务调度器已关闭,总接收任务数: {}, 总执行任务数: {}",
totalTasksReceived.get(), totalTasksExecuted.get());
}

/**
* 提交任务到延迟队列
*/
private void submitToDelayQueue(Runnable task, String taskId, String jobParams) {
// 随机延迟时间,在0到maxDelayMinutes分钟之间
long delayMs = random.nextInt((int) TimeUnit.MINUTES.toMillis(maxDelayMinutes));
DelayedTask delayedTask = new DelayedTask(task, delayMs, taskId, jobParams);
delayedTaskQueue.offer(delayedTask);
totalTasksReceived.incrementAndGet();

log.info("任务已提交到延迟队列: {}, 延迟: {}ms", taskId, delayMs);
}

/**
* 提交任务到优先级队列
*/
private void submitToPriorityQueue(Runnable task, TaskPriority priority, String taskId, String jobParams) {
PriorityTask priorityTask = new PriorityTask(task, priority, taskId, jobParams);
priorityTaskQueue.offer(priorityTask);

log.info("任务已提交到优先级队列: {}, 优先级: {}", taskId, priority);
}

/**
* 获取任务类型对应的优先级
*/
private TaskPriority getTaskPriority(String taskId) {
switch (taskId) {
case TASK_SAFETY_NOTICE_ID:
case TASK_PUSH_SERVICE_STATUS_ID:
return TaskPriority.HIGH;
case TASK_TENANT_SERVICE_PHASE_ID:
case TASK_WEEKLY_SUMMARY:
return TaskPriority.MEDIUM;
default:
return TaskPriority.LOW;
}
}

/**
* 创建可执行的任务
*/
private Runnable createExecutableTask(String taskId, String jobParams, TaskContext taskContext) {
switch (taskId) {
case TASK_SAFETY_NOTICE_ID:
return () -> generateXxxTask(taskContext);
case TASK_OTHER:
return () -> generateOtherTask(taskContext);
...
default:
throw new IllegalArgumentException("未知的任务类型: " + taskId);
}
}

/**
* 通用任务提交方法
*/
private ProcessResult submitTask(String taskId, TaskContext taskContext) {
try {
totalTasksReceived.incrementAndGet();

// 检查任务执行情况,如果已有大量相同类型任务,加入延迟队列
int activeCount = taskExecutionCount.computeIfAbsent(taskId, k -> new AtomicInteger(0)).get();
if (activeCount > maxConcurrentTasks / 2) {
log.warn("当前任务类型 {} 正在执行的数量较多: {}, 将使用延迟队列分散负载", taskId, activeCount);
submitToDelayQueue(createExecutableTask(taskId, taskContext.getJobParams(), taskContext), taskId, taskContext.getJobParams());
} else {
// 根据任务类型分配优先级
TaskPriority priority = getTaskPriority(taskId);
submitToPriorityQueue(createExecutableTask(taskId, taskContext.getJobParams(), taskContext), priority, taskId, taskContext.getJobParams());
}

return new ProcessResult(true, formatResponse(SUCCESS, taskId));
} catch (Exception e) {
log.error("提交任务异常: {}", taskId, e);
return new ProcessResult(false, formatResponse(e.getMessage(), taskId));
}
}

// ====== 以下是原始的PowerJob任务处理方法,改为使用队列系统 ======

/**
* 告警任务
*/
@PowerJobHandler(name = TASK_OTHER)
public ProcessResult generateOtherTask(TaskContext taskContext) {
log.info("==================== 调度触发(其它任务) ======================");
return submitTask(TASK_OTHER, taskContext);
}

private ProcessResult generateOtherTask(TaskContext taskContext) {
try {
StrategyJobParams jobParams = new StrategyJobParams();
if (StringUtils.hasLength(taskContext.getJobParams())) {
jobParams = OBJECT_MAPPER.readValue(taskContext.getJobParams(), StrategyJobParams.class);
}
strategyScheduleService.generateTask(jobParams);
totalTasksExecuted.incrementAndGet();
return new ProcessResult(true, formatResponse(SUCCESS, TASK_OTHER));
} catch (Exception e) {
log.error("调度执行【其它任务】异常", e);
return new ProcessResult(false, formatResponse(e.getMessage(), TASK_OTHER));
}
}


/**
* 生成平安通告
*/
@PowerJobHandler(name = TASK_SAFETY_NOTICE_ID)
public ProcessResult generateXxx(TaskContext taskContext) {
log.info("==================== 调度触发(平安通告) ======================");
return submitTask(TASK_SAFETY_NOTICE_ID, taskContext);
}

private ProcessResult generateXxxTask(TaskContext taskContext) {
try {
// 获取调度任务的参数
StrategyJobParams jobParams = null;
if (StringUtils.hasLength(taskContext.getJobParams())) {
jobParams = OBJECT_MAPPER.readValue(taskContext.getJobParams(), StrategyJobParams.class);
}
// 生成平安通告
XxxScheduleService.autoGenerateBatch(jobParams);
totalTasksExecuted.incrementAndGet();
return new ProcessResult(true, formatResponse("success", TASK_SAFETY_NOTICE_ID));
} catch (Exception e) {
log.error("调度执行【平安通告】异常", e);
return new ProcessResult(false, formatResponse(e.getMessage(), TASK_SAFETY_NOTICE_ID));
}
}

private String formatResponse(String info, String id) {
return String.format("{\"taskId\": \"%s\", \"info\": \"%s\"}", id, info);
}
}


解法设计2.0

主要是解决一些异常场景,比如:

  1. 服务异常重启,任务丢了?
  2. 信号量获取阻塞,所有任务堆积?
  3. 发生异常,及时感知等

这一步还停留在设计阶段,也可能是我设计并落地,也先做个记录。

解法:

  1. Redis代替内存队列,开启持久话,便于启动后恢复。
  2. 核心业务单独维护信号量
  3. 设置拒绝策略,当队列超过阈值直接异常返回给powerjob
    1. 同时发送告警
  4. 适当的动态调整信号量

离职系列 第九篇
离职系列,想想这几年在公司的成长,在这做个记录。这篇是遇到的一个比较典型的线上问题。

问题现象

写了一个每天执行两次的定时任务,该任务会分批对线上所有几百个租户生成《平安通告》,上线1个多月后突然手机收到告警,某几个用户生成失败。

  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
2024-10-17 17:00:10,125 [safetyNoticeGenerator8] ERROR [com.alibaba.druid.pool.DruidDataSource] DruidDataSource.java:1988 - {conn-110021} discard
org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:395)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:498)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:415)
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:190)
at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:177)
at com.alibaba.druid.pool.DruidPooledPreparedStatement.execute(DruidPooledPreparedStatement.java:483)
at org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement$2.executeSQL(ShardingSpherePreparedStatement.java:439)
at org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement$2.executeSQL(ShardingSpherePreparedStatement.java:435)
at org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback.execute(JDBCExecutorCallback.java:95)
at org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback.execute(JDBCExecutorCallback.java:75)
at org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine.syncExecute(ExecutorEngine.java:135)
at org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine.serialExecute(ExecutorEngine.java:121)
at org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine.execute(ExecutorEngine.java:115)
at org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor.execute(JDBCExecutor.java:65)
at org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor.execute(JDBCExecutor.java:49)
at org.apache.shardingsphere.driver.executor.DriverJDBCExecutor.doExecute(DriverJDBCExecutor.java:156)
at org.apache.shardingsphere.driver.executor.DriverJDBCExecutor.execute(DriverJDBCExecutor.java:145)
at org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement.execute(ShardingSpherePreparedStatement.java:402)
at com.zaxxer.hikari.pool.ProxyPreparedStatement.execute(ProxyPreparedStatement.java:44)
at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.execute(HikariProxyPreparedStatement.java)
at org.apache.ibatis.executor.statement.PreparedStatementHandler.query(PreparedStatementHandler.java:65)
at org.apache.ibatis.executor.statement.RoutingStatementHandler.query(RoutingStatementHandler.java:80)
at jdk.internal.reflect.GeneratedMethodAccessor49.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:61)
at jdk.proxy2/jdk.proxy2.$Proxy207.query(Unknown Source)
at org.apache.ibatis.executor.SimpleExecutor.doQuery(SimpleExecutor.java:65)
at org.apache.ibatis.executor.BaseExecutor.queryFromDatabase(BaseExecutor.java:333)
at org.apache.ibatis.executor.BaseExecutor.query(BaseExecutor.java:158)
at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:110)
at com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor.intercept(MybatisPlusInterceptor.java:81)
at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:59)
at jdk.proxy2/jdk.proxy2.$Proxy206.query(Unknown Source)
at jdk.internal.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.apache.ibatis.plugin.Invocation.proceed(Invocation.java:49)
at com.github.yulichang.interceptor.MPJInterceptor.intercept(MPJInterceptor.java:76)
at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:59)
at jdk.proxy2/jdk.proxy2.$Proxy206.query(Unknown Source)
at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:154)
at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:147)
at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:142)
at jdk.internal.reflect.GeneratedMethodAccessor109.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:425)
at jdk.proxy2/jdk.proxy2.$Proxy189.selectList(Unknown Source)
at org.mybatis.spring.SqlSessionTemplate.selectList(SqlSessionTemplate.java:224)
at com.baomidou.mybatisplus.core.override.MybatisMapperMethod.executeForMany(MybatisMapperMethod.java:166)
at com.baomidou.mybatisplus.core.override.MybatisMapperMethod.execute(MybatisMapperMethod.java:77)
at com.baomidou.mybatisplus.core.override.MybatisMapperProxy$PlainMethodInvoker.invoke(MybatisMapperProxy.java:152)
at com.baomidou.mybatisplus.core.override.MybatisMapperProxy.invoke(MybatisMapperProxy.java:89)
at jdk.proxy2/jdk.proxy2.$Proxy197.findDistinctCiIdsByIdentityId(Unknown Source)
at com.xxx.xxxcloud.manage.service.safety.notice.impl.xxServiceImpl.addAlertData(xxServiceImpl.java:612)
at com.xxx.xxxcloud.manage.service.safety.notice.impl.xxServiceImpl.lambda$setJsonField$5(xxServiceImpl.java:368)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at com.xxx.xxxcloud.manage.service.safety.notice.impl.xxServiceImpl.setJsonField(xxServiceImpl.java:346)
at com.xxx.xxxcloud.manage.service.safety.notice.impl.xxServiceImpl.batchCreate(xxServiceImpl.java:201)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:751)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:117)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:391)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:751)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:703)
at com.xxx.xxxcloud.manage.service.safety.notice.impl.xxServiceImpl$$SpringCGLIB$$0.batchCreate(<generated>)
at com.xxx.xxxcloud.manage.service.safety.notice.impl.SafetyNoticeScheduleServiceImpl.processBatch(SafetyNoticeScheduleServiceImpl.java:194)
at com.xxx.xxxcloud.manage.service.safety.notice.impl.SafetyNoticeScheduleServiceImpl.lambda$processBatchesInThreadPool$0(SafetyNoticeScheduleServiceImpl.java:112)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.net.SocketTimeoutException: Read timed out
at java.base/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:288)
at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:314)
at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:355)
at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:808)
at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
at java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:484)
at java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:478)
at java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
at java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1465)
at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:1069)
at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:161)
at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:128)
at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:113)
at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:73)
at org.postgresql.core.PGStream.receiveChar(PGStream.java:465)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2155)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:368)
... 82 common frames omitted
org.springframework.jdbc.UncategorizedSQLException:
### Error querying database. Cause: java.sql.SQLException: Connection is closed
### The error may exist in class path resource [mapper/TbxxHealthCheckResultMapper.xml]
### The error may involve com.xxx.xxxcloud.manage.mapper.xx.TbxxHealthCheckResultMapper.selectExecuteLatest
### The error occurred while executing a query
### Cause: java.sql.SQLException: Connection is closed
; uncategorized SQLException; SQL state [null]; error code [0]; Connection is closed
at org.mybatis.spring.MyBatisExceptionTranslator.translateExceptionIfPossible(MyBatisExceptionTranslator.java:93)
at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:439)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.sql.SQLException: Connection is closed
at com.zaxxer.hikari.pool.ProxyConnection$ClosedConnection.lambda$getClosedConnection$0(ProxyConnection.java:502)
at jdk.proxy3/jdk.proxy3.$Proxy173.prepareStatement(Unknown Source)
at com.zaxxer.hikari.pool.ProxyConnection.prepareStatement(ProxyConnection.java:327)
at com.zaxxer.hikari.pool.HikariProxyConnection.prepareStatement(HikariProxyConnection.java)

从上面有几个关键信息梳理下异常调用链:

得出几个重要信息:

  1. 异常发生的业务代码是在批量处理”xx报告”时,查询告警处
  2. ShardingSphere下的连接池hikari抛出了异常Connection is closed
  3. Druid连接池也抛出了异常{conn-xxx} discard
  4. 底层SocketTimeoutException,表明是客户端等待数据库服务器的响应超时(初步判断为慢sql)

所以我开始从以下几个方面排查:

  1. 查业务代码的变更,为什么之前好好的跑了一个多月,突然出问题。
  2. 检查数据库连接参数设置
  3. 评估查询的数据量是否过大
  4. 看下HikariCP和Druid是否有啥联系以及为啥会有两个连接池?
  5. 查看数据库负载情况

处理过程

第一步:摸索

怀疑一切,切忌先入为主。

  1. 想办法重现问题
    1. 提前发通知,下午7点以后会操作线上系统。通常6点半以后几乎就没人用系统了。
    2. 因为设计该功能时,留了补偿手动,可手动重新触发报告生成。
    3. 反复重试了几次,问题未复现。
  2. 查看业务代码变更。
    1. 报错处业务代码owner是我,没做任何更改,所以变成重点关注addAlertData方法,也就是与告警相关(重点在数据量)
    2. 业务没变更但是加入了ShardingSphere(异常中也有这块的信息,先存疑)
  3. 检查系统连接池参数
    1. druid,几乎都没有做定制,都是使用的默认值。
      1. 使用默认值其实存在风险,应该根据业务调整一些参数,因为买的阿里的pg所以咨询了他们拿到了一份他们暴露的参数调优参考,后续可针对性修改)
    2. 新增的ShardingSphere的HikariCP也是使用的默认值。(异常中也有这块的信息,先存疑)
  4. 观察显示的数据库负载情况(阿里云的监控看板、pg的pg_stat_activity等视图、数据库日志等)
    1. 从视图发现确实存在执行时间较长的几条sqlsql,虽然有些慢但是不足以触发异常。慢的原因初步判断为数据量过大(报错的租户行数都在50w左右)
    2. 记录下这个sql,拿去控制台执行,EXPLAIN ANALYZE该sql,发现Seq Scan除了时间较长以外还存在索引问题,几乎每次查询都要用到的“告警状态”字段之前没加索引。(可能是原因之一)
    3. 查看数据库日志如下,这表明数据库和客户端的连接中断确实是问题的根源,可能是因为网络问题、数据库负载过高、或者连接超时等因素导致的。

第二步:验证

验证怀疑的所有点,通常控制变量法进行验证。

因为暂时没有复现,不着急先按兵不动。

  1. 前提是要有补偿方案,不能阻断线上使用,特别是这种核心业务。因为当时留了后门可以手动触发某个租户所以没问题。
  2. 虽然是线上故障同时也算严重bug,但是也不要着急,胡乱改一通,可能按下葫芦又起瓢,尽可能的找到根因,哪怕不能一次性修复。

等待了两天发现同样的问题又发生了。

  1. 但是这次发现了上一次遗漏的一个信息,报错的租户从日志看时间,异常都是在10s以后抛出的。(初步判断是SocketTimeout的超时时间,可能是10s)
  2. 拿到同样报错的sql去执行发现虽然跟之前没啥差别,问题还是那些问题,也没超过10s。(怀疑是因为报错时候执行了sql,触发了pg的缓存,所以再去查缓存生效,导致执行时间变短)

验证第一个问题:

因为当前看存在有两个连接池,查HikariCP与Druid的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// HikariCP只有connectionTimeout为30s
static {
CONNECTION_TIMEOUT = TimeUnit.SECONDS.toMillis(30L);
VALIDATION_TIMEOUT = TimeUnit.SECONDS.toMillis(5L);
SOFT_TIMEOUT_FLOOR = Long.getLong("com.zaxxer.hikari.timeoutMs.floor", 250L);
IDLE_TIMEOUT = TimeUnit.MINUTES.toMillis(10L);
MAX_LIFETIME = TimeUnit.MINUTES.toMillis(30L);
unitTest = false;
}

// DruidDataSource初始化,socketTimeout是10s
public void init() throws SQLException {
if (!this.inited) {
DruidDriver.getInstance();
ReentrantLock lock = this.lock;

try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}

boolean init = false;

if (this.connectTimeout == 0) {
this.connectTimeout = 10000;
}

if (this.socketTimeout == 0) {
this.socketTimeout = 10000;
}

先不管HikariCP,至少能确定Druid的10s超时确实存在,因为阿里云上我开起了pg的慢sql监控,根据时间刚好查到了确实存在一条告警sql查询执行时间为10s+。

  1. 所以初步判断SocketTimeoutException是因为sql执行时间超过了连接池的默认超时。
  2. 去查了下该租户下告警的数据总数已经超过了50W,在活5000左右。

第三步:修复方案

方案如下:

  1. 加索引,对常用的字段“告警状态”添加索引。
  2. 对在活告警查询的地方分批
  3. 对在活主告警限制查询的条数而不是查所有。

第四步:方案执行

  1. 对所有配置分表的表,检查索引,添加必要的索引。

    1.   -- ========================================
        -- 描述: 创建告警表的“status”字段索引
        -- 文件名: 001_create_alert_status_index.sql
        -- 作者: hht
        -- 创建日期: 2024-10-31
        -- ========================================
      
        DO $$
        DECLARE
            i INTEGER;
        BEGIN
            -- 遍历 tb_xx_alert_0 ~ tb_xx_alert_15
            FOR i IN 0..15 LOOP
                -- 动态生成 ALTER TABLE 语句,添加字段
                EXECUTE FORMAT('
                    ALTER TABLE public.tb_xx_alert_%s 
                    CREATE INDEX tb_xx_alert_%s_status_index ON tb_xx_alert_%s  (status);
                ', i);
            END LOOP;
        END $$;
      
  2. 告警分批且限制条数

    1. 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      private List<AlertVO> findSubscribedAlerts(AlertSubscribedParam param) {
      try {
      List<List<String>> parts = PartsListUtil.getParts(param.getSubscribedCiIds(), BATCH_SIZE);
      List<AlertVO> all = new ArrayList<>();
      for (List<String> part : parts) {
      // 分批查询
      param.setSubscribedCiIds(part);
      // 主告警不超过100条
      Page<AlertVO> page = new Page<>(1, 100 - all.size());
      IPage<AlertVO> result = tbXxAlertMapper.findSubscribedAlerts(page, param);
      if (result.getTotal() == 0) {
      continue;
      }
      // 聚合告警的子告警数据补充
      getAlertAggChildren(param.getIdentityId(), result.getRecords());
      all.addAll(result.getRecords());
      // 如果已经达到100,退出循环
      if (all.size() == 100) {
      break;
      }
      }
      // 最后统一按createTime降序排序
      all.sort((a1, a2) -> Long.compare(a2.getCreateTime(), a1.getCreateTime()));
      return all;
      } catch (Exception e) {
      String errorMessage =
      String.format("查询关注告警失败: identityId=%s, deal=%s", param.getIdentityId(), param.isDeal());
      log.error(errorMessage, param.getIdentityId(), e);
      throw new SafetyNoticeException(errorMessage, e);
      }
      }

第五步:监控效果

  1. 上线后,连续一周到点蹲守
    1. SELECT pg_stat_reset(); – 重置所有统计信息
    2. 数据库负载看板以及pg的pg_stat_activity。
      1. 之前的sql执行时间没有再超过2s
      2. 看板上慢sql也没有在发现
      3. 数据库日志也没有再出现异常
    3. pg_stat_user_indexes+EXPLAIN ANALYZE 对应sql,查看索引使用情况。
  2. 业务功能正常。

看上去目前超时的问题暂时解决,但是要想更彻底的解,还需要后续对遗留项逐个解决。

遗留项/改进项

  1. 连接池的参数调优,虽然默认的看上去没啥问题,但是迟早肯定会出问题,记一个DFX。
  2. 多了一个HikariCP连接池,而且通过jconsole看了下,两个连接池都会初始化,这块是否有必要有两个连接池,这儿存在隐患,对ShardingSphere需要深入了解下,记一个DFX。
  3. 历史数据的清理,跟PO提出,需要加一个需求不仅仅是告警,可能还有其他数据。
  4. 对于核心且经常更新的表是否需要定时REINDEX