Installing and Configuring Open WebUI on Hugging Face
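This post records how to deploy Open WebUI with automatic backups on Hugging Face, along with the related Open WebUI configuration.
Hugging Face Space configuration:
Reference tutorial: 抱脸部署OpenWebUI教程(Lite版) (Deploying Open WebUI on Hugging Face, Lite edition), LINUX DO forum
Space file layout
space_name/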
Dockerfile
FROM ghcr.io/open-webui/open-webui:main
COPY sync_data.sh sync_data.sh
RUN chmod -R 777 ./data && \
    sed -i "1r sync_data.sh" ./start.sh
README.md
title: Open WebUI
emoji: 📉
colorFrom: indigo
colorTo: yellow
sdk: docker
pinned: false
app_port: 8080
sync_data.sh
mkdir -p ./data

# Generate a checksum file
generate_sum() {
    local file=$1
    local sum_file=$2
    sha256sum "$file" > "$sum_file"
}

# Restore data, preferring WebDAV
if [ ! -z "$WEBDAV_URL" ] && [ ! -z "$WEBDAV_USERNAME" ] && [ ! -z "$WEBDAV_PASSWORD" ]; then
    echo "Trying to restore data from WebDAV..."
    curl -L --fail --user "$WEBDAV_USERNAME:$WEBDAV_PASSWORD" "$WEBDAV_URL/webui.db" -o "./data/webui.db" && {
        echo "Restored data from WebDAV"
    } || {
        if [ ! -z "$G_NAME" ] && [ ! -z "$G_TOKEN" ]; then
            echo "WebDAV restore failed, trying GitHub..."
            REPO_URL="https://${G_TOKEN}@github.com/${G_NAME}.git"
            git clone "$REPO_URL" ./data/temp && {
                if [ -f ./data/temp/webui.db ]; then
                    mv ./data/temp/webui.db ./data/webui.db
                    echo "Restored data from the GitHub repository"
                    rm -rf ./data/temp
                else
                    echo "webui.db not found in the GitHub repository"
                    rm -rf ./data/temp
                fi
            }
        else
            echo "WebDAV restore failed and GitHub is not configured"
        fi
    }
else
    echo "WebDAV is not configured, skipping data restore"
fi

# Sync loop
sync_data() {
    while true; do
        echo "Starting sync..."
        HOUR=$(date +%H)
        if [ -f "./data/webui.db" ]; then
            # Generate a new checksum file
            generate_sum "./data/webui.db" "./data/webui.db.sha256.new"
            # Check whether the file has changed
            if [ ! -f "./data/webui.db.sha256" ] || ! cmp -s "./data/webui.db.sha256.new" "./data/webui.db.sha256"; then
                echo "Change detected, syncing..."
                mv "./data/webui.db.sha256.new" "./data/webui.db.sha256"
                # Sync to WebDAV
                if [ ! -z "$WEBDAV_URL" ] && [ ! -z "$WEBDAV_USERNAME" ] && [ ! -z "$WEBDAV_PASSWORD" ]; then
                    echo "Syncing to WebDAV..."
                    # Upload the data file
                    curl -L -T "./data/webui.db" --user "$WEBDAV_USERNAME:$WEBDAV_PASSWORD" "$WEBDAV_URL/webui.db" && {
                        echo "WebDAV updated"
                        # Daily backup (WebDAV and GitHub), performed during the 00:00 hour
                        if [ "$HOUR" = "00" ]; then
                            echo "Starting daily backup..."
                            # Date of the previous day
                            YESTERDAY=$(date -d "yesterday" '+%Y%m%d')
                            FILENAME_DAILY="webui_${YESTERDAY}.db"
                            # Daily WebDAV backup
                            curl -L -T "./data/webui.db" --user "$WEBDAV_USERNAME:$WEBDAV_PASSWORD" "$WEBDAV_URL/$FILENAME_DAILY" && {
                                echo "WebDAV dated backup succeeded: $FILENAME_DAILY"
                                # Daily GitHub backup
                                if [ ! -z "$G_NAME" ] && [ ! -z "$G_TOKEN" ]; then
                                    echo "Starting daily GitHub backup..."
                                    REPO_URL="https://${G_TOKEN}@github.com/${G_NAME}.git"
                                    git clone "$REPO_URL" ./data/temp || {
                                        echo "GitHub clone failed"
                                        rm -rf ./data/temp
                                    }
                                    if [ -d "./data/temp" ]; then
                                        cd ./data/temp
                                        git config user.name "AutoSync Bot"
                                        git config user.email "autosync@bot.com"
                                        git checkout main || git checkout master
                                        cp ../webui.db ./webui.db
                                        if [[ -n $(git status -s) ]]; then
                                            git add webui.db
                                            git commit -m "Auto sync webui.db for ${YESTERDAY}"
                                            git push origin HEAD && {
                                                echo "GitHub push succeeded"
                                            } || echo "GitHub push failed"
                                        else
                                            echo "GitHub: no changes"
                                        fi
                                        cd ../..
                                        rm -rf ./data/temp
                                    fi
                                fi
                            } || echo "WebDAV dated backup failed"
                        fi
                    } || {
                        echo "WebDAV upload failed, retrying..."
                        sleep 10
                        curl -L -T "./data/webui.db" --user "$WEBDAV_USERNAME:$WEBDAV_PASSWORD" "$WEBDAV_URL/webui.db" || {
                            echo "WebDAV retry failed"
                        }
                    }
                fi
            else
                echo "File unchanged, skipping sync"
                rm -f "./data/webui.db.sha256.new"
            fi
        else
            echo "webui.db not found, skipping sync"
        fi
        echo "Current time: $(date '+%Y-%m-%d %H:%M:%S')"
        echo "Next sync: $(date -d '+5 minutes' '+%Y-%m-%d %H:%M:%S')"
        sleep 300
    done
}

# Start the sync process in the background
sync_data &
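The change check in sync_data.sh comes down to comparing a fresh SHA-256 digest with the one saved after the last upload. Below is a minimal Python sketch of the same idea, not part of the deployment itself. The names `file_digest` and `has_changed` are hypothetical, and the sidecar file here stores only the hex digest, whereas the script stores the full `sha256sum` output.

```python
import hashlib
from pathlib import Path


def file_digest(path: Path) -> str:
    """Return the SHA-256 hex digest of a file, read in chunks."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()


def has_changed(db: Path, sum_file: Path) -> bool:
    """Compare the current digest with the stored one and update the store.

    Returns True when no digest was stored yet or the content differs,
    which is the condition under which the script uploads webui.db.
    """
    current = file_digest(db)
    previous = sum_file.read_text().strip() if sum_file.exists() else None
    if current == previous:
        return False
    sum_file.write_text(current + "\n")
    return True
```

Comparing digests rather than modification times means a rewrite of the database with identical content does not trigger an upload.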
Environment variables
Public variables
UID=1000
GID=1000
ENABLE_AUTH=true
WEBUI_AUTH=true
ENABLE_OLLAMA_API=false
TZ=Asia/Shanghai
ENABLE_REALTIME_CHAT_SAVE=false
Private variables (secrets)
WEBDAV_URL=https://alist.com/dav # WebDAV URL
WEBDAV_USERNAME=user # WebDAV username
WEBDAV_PASSWORD=password # WebDAV password
G_NAME=name/repository # GitHub repository name, optional
G_TOKEN=ghp_xxx # GitHub token with read/write access to the repository, optional
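How these variables gate the script's behavior can be summarized in a few lines. The sketch below is illustrative only. `backup_targets` is a hypothetical helper that mirrors the checks in sync_data.sh: WebDAV needs all three WEBDAV_* values, and GitHub is only reached from the WebDAV branches, so it requires WebDAV plus both G_NAME and G_TOKEN.

```python
from typing import List, Mapping


def backup_targets(env: Mapping[str, str]) -> List[str]:
    """Return the backup targets the sync script would use for this environment."""
    webdav_ok = all(
        env.get(key) for key in ("WEBDAV_URL", "WEBDAV_USERNAME", "WEBDAV_PASSWORD")
    )
    if not webdav_ok:
        # Without complete WebDAV settings the script skips restore and upload.
        return []
    targets = ["webdav"]
    if env.get("G_NAME") and env.get("G_TOKEN"):
        # GitHub is a secondary target, only used alongside WebDAV.
        targets.append("github")
    return targets
```

Calling `backup_targets(os.environ)` inside the Space would show which targets are active; setting only the GitHub variables yields an empty list, which matches the script never touching GitHub in that case.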
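WebDAV backed by Filen
Reference tutorial: 终章:在huggingface上部署filen-webdav (Final chapter: deploying filen-webdav on Hugging Face; the title notes it no longer works there, while serv00 still does), LINUX DO forum
Filen site (registration includes free storage): Filen – Next Generation End-To-End Encrypted Cloud Storage
Hugging Face Space configuration:
space_name/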
Dockerfile
FROM node:lts-alpine
WORKDIR /app
COPY . .
ENV TZ=Asia/Shanghai
ENV PORT=1900
RUN npm install @filen/webdav@latest
ENTRYPOINT ["node", "start.mjs"]
README.md
title: Filen Webdav
emoji: ⚡
colorFrom: pink
colorTo: purple
sdk: docker
pinned: false
app_port: 1900
start.mjs
// Proxy mode, multi user
import { WebDAVServer } from "@filen/webdav"

const hostname = "0.0.0.0"
const port = process.env.PORT || 1900
const https = false
const server = new WebDAVServer({
    hostname,
    port,
    https,
    // Omit the user object
    authMode: "basic" // Only basic auth is supported in proxy mode
})

await server.start()

console.log(
    `WebDAV server started on ${https ? "https" : "http"}://${hostname === "127.0.0.1" ? "local.webdav.filen.io" : hostname}:${port}`
)
The corresponding environment variable when mounting it in the Open WebUI Space:
WEBDAV_URL="https://xxx.hf.space" # Space URL
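With WEBDAV_URL pointing at the Space, sync_data.sh uploads to two kinds of paths: a rolling `webui.db` and, when a change is uploaded during the 00 hour, a dated copy named after the previous day. Below is a small sketch of how those URLs are composed. `backup_urls` is a hypothetical name, and stripping a trailing slash is an addition the script does not perform.

```python
from datetime import date, timedelta
from typing import Tuple


def backup_urls(webdav_url: str, today: date) -> Tuple[str, str]:
    """Return (latest_url, daily_url) following the script's naming scheme."""
    base = webdav_url.rstrip("/")
    yesterday = today - timedelta(days=1)
    # Same pattern as FILENAME_DAILY="webui_${YESTERDAY}.db" in the script.
    daily_name = f"webui_{yesterday:%Y%m%d}.db"
    return f"{base}/webui.db", f"{base}/{daily_name}"
```

For example, `backup_urls("https://xxx.hf.space", date(2025, 1, 1))` yields the rolling URL plus a dated one ending in `webui_20241231.db`.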
Open WebUI configuration
Settings
- Web search: Searxng query URL: https://gitdeem-searxng.hf.space/search?q=<query>
- Audio: speech-to-text engine URL: https://api.siliconflow.cn/v1; speech-to-text model: FunAudioLLM/SenseVoiceSmall; text-to-speech: Web API, voice Microsoft Yunxi Online (Natural) - Chinese (Mainland)
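The `<query>` in the Searxng URL is a placeholder that gets filled with the search terms at request time. The sketch below only illustrates that substitution with URL encoding. It is an assumption about the behavior, not Open WebUI's actual code, and `build_search_url` is a hypothetical helper.

```python
from urllib.parse import quote_plus


def build_search_url(template: str, query: str) -> str:
    """Replace the <query> placeholder with the URL-encoded search terms."""
    if "<query>" not in template:
        raise ValueError("template must contain the <query> placeholder")
    return template.replace("<query>", quote_plus(query))
```

Checking for the placeholder up front catches the common mistake of pasting a URL that ends in `?q=` with nothing after it.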
Prompts
Claude SVG designer
# SVG Visualization Generation Expert
You are an expert SVG visualization generator, specialized in creating detailed, balanced, and informative visual representations. You excel at transforming complex data and concepts into clear, engaging SVG visualizations.
## Role & Capabilities
- Create precise and visually appealing SVG visualizations
- Transform complex data into clear visual representations
- Ensure accessibility and readability in all visualizations
- Maintain consistent visual hierarchy and design principles
- Optimize SVG code for performance and compatibility
## Process Flow
### 1. REQUIREMENT ANALYSIS
Before generating any visualization, analyze the request by considering:
DATA ASPECTS:
- Quantitative values and their ranges
- Categorical information
- Time-series components
- Relationships and hierarchies
- Missing or implied information
CONTEXTUAL ASPECTS:
- Primary purpose of the visualization
- Target audience and their needs
- Required level of detail
- Key insights to highlight
- Context and domain-specific requirements
### 2. VISUALIZATION DESIGN
CHART SELECTION:
- Choose the most appropriate visualization type based on:
* Data characteristics (continuous, discrete, categorical, etc.)
* Relationship types (comparison, distribution, composition, etc.)
* Number of variables and their relationships
* Desired message and insight focus
VISUAL ELEMENTS:
- Layout and composition
* Implement clear visual hierarchy
* Ensure balanced element distribution
* Maintain appropriate whitespace
- Color scheme
* Use accessible color combinations
* Apply consistent color meaning
* Consider color blindness accessibility
- Typography
* Select readable fonts
* Use appropriate text sizes
* Implement clear text hierarchy
### 3. SVG IMPLEMENTATION
TECHNICAL SPECIFICATIONS:
- Viewport and viewBox settings
- Responsive design considerations
- Element positioning and scaling
- Optimization for different screen sizes
ELEMENTS UTILIZATION:
- Basic shapes: rect, circle, ellipse, line
- Advanced paths: path, polyline, polygon
- Text elements: text, tspan
- Groups and transformations: g, transform
- Styling: fill, stroke, opacity
- Reusable components: defs, use
- Custom markers and patterns
### 4. QUALITY ASSURANCE
Verify the following aspects:
TECHNICAL VALIDATION:
- SVG syntax correctness
- Element alignment and positioning
- Responsive behavior
- Browser compatibility
VISUAL VERIFICATION:
- Color contrast and accessibility
- Text readability
- Element spacing and alignment
- Overall visual balance
CONTENT ACCURACY:
- Data representation accuracy
- Label correctness
- Scale accuracy
- Legend completeness
### 5. OUTPUT DELIVERY
Provide the following:
1. Complete SVG code with:
- Clear structure and organization
- Meaningful element IDs and classes
- Appropriate viewBox settings
- Optimized code
2. Implementation notes (if relevant):
- Usage instructions
- Browser compatibility notes
- Scaling considerations
- Interactive features (if any)
## Response Format
Your response should follow this structure:
\
[Detailed analysis of the visualization requirements]
```svg
[Complete SVG code]
```
[Any relevant notes about usage or implementation]
\
Remember to:
- Prioritize clarity and accessibility
- Maintain consistency in design choices
- Consider scalability and responsiveness
- Optimize for different viewing contexts
- Follow SVG best practices
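- Follow the language of the user
ChatGPT thinking guide
**Before answering** or **after reflecting**, output the complete Chain-of-Thought, that is, reach the answer through step-by-step thinking and derivation. Begin the Chain-of-Thought with the Markdown level-3 heading "Chain-of-Thought", and start every line of it with the Markdown quote marker ">".
Once a preliminary answer has been produced, output the complete reflection process. You will play a cold, objective third party who reviews and reflects on the previous answer step by step and considers whether there are other possibilities or room for improvement. Begin the reflection with the Markdown level-3 heading "Rethink", and start every line of it with the Markdown quote marker ">>".
After the reflection ends, start the next round of Chain-of-Thought -> answer -> reflection, and continue until the answer has no remaining problems.
All **citations involving data** must never be answered from memory or made up at random; search online first, confirm, and attach the source.
This prompt was originally written in Chinese for ease of writing. If the user asks in another language, still follow the user's language.
Functions
Flux text-to-image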
"""
title: Flux Text-to-Image
author: linux.do
author_url: https://linux.do/u/Yuccc/summary
funding_url: https://linux.do/t/topic/278703
version: 0.1
"""

import asyncio
import random
import re
import time
from typing import Callable, Awaitable, Any, Optional

import aiohttp
from pydantic import BaseModel, Field


class AIOutput(BaseModel):
    success: bool
    prompt: str
    width: int
    height: int
    reason: Optional[str] = None
    seed: int = Field(default=-1)


class Filter:
    # Regex that locates the JSON object in the model output (Python syntax)
    JSON_REGEX = re.compile(r"\{(?:\\.|[^\\}])*}")

    class Valves(BaseModel):
        priority: int = Field(default=0, description="Priority level for the filter operations.")
        Siliconflow_Base_URL: str = Field(
            default="https://api.siliconflow.cn",
            description="Base URL of the Siliconflow API (e.g. https://api.siliconflow.cn).",
        )
        Siliconflow_API_KEY: str = Field(
            default="",
            description="API key for the Siliconflow API.",
        )
        max_retries: int = Field(
            default=3,
            description="Maximum number of retries for HTTP requests.",
        )
        num_inference_steps: int = Field(
            default=20,
            description="Number of inference steps to run (1-100).",
        )
        model_name: str = Field(
            default="black-forest-labs/FLUX.1-schnell",
            description="Name of the model used to generate images.",
        )
        # Super-resolution toggle
        enable_super_resolution: bool = Field(
            default=True, description="Whether to enable image super-resolution enhancement."
        )
        # Super-resolution API URL
        Super_Resolution_URL: str = Field(
            default="https://api.pearktrue.cn/api/imagedistinct/",
            description="URL of the image super-resolution API.",
        )

    def __init__(self):
        self.valves = self.Valves()

    # Declared static: the original lacked `self` but is called via self.remove_markdown_images(...)
    @staticmethod
    def remove_markdown_images(content: str) -> str:
        # Adjust as needed; make sure the JSON stays intact
        return re.sub(r"!\[.*?\]\([^)]*\)", "", content)

    async def inlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __user__: Optional[dict] = None,
        __model__: Optional[dict] = None,
    ) -> dict:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": "✨ Generating the prompt, please wait...",
                    "done": False,
                },
            }
        )
        for i, msg in enumerate(body["messages"]):
            body["messages"][i]["content"] = self.remove_markdown_images(msg["content"])
        return body

    async def text_to_image(
        self, prompt: str, image_size: str, seed: int, __user__: Optional[dict] = None
    ):
        url = f"{self.valves.Siliconflow_Base_URL}/v1/images/generations"
        payload = {
            "model": self.valves.model_name,  # model name from the valves
            "prompt": prompt,
            "image_size": image_size,
            "seed": seed,
            "num_inference_steps": self.valves.num_inference_steps,  # inference steps from the valves
        }
        headers = {
            # Pick one key at random when several comma-separated keys are configured
            "authorization": f"Bearer {random.choice([key for key in self.valves.Siliconflow_API_KEY.split(',') if key])}",
            "accept": "application/json",
            "content-type": "application/json",
        }
        start_time = time.time()
        async with aiohttp.ClientSession() as session:
            for attempt in range(self.valves.max_retries):
                try:
                    async with session.post(
                        url, json=payload, headers=headers
                    ) as response:
                        response.raise_for_status()
                        response_data = await response.json()
                        generation_time = time.time() - start_time
                        response_data["generation_time"] = generation_time
                        return response_data
                except Exception as e:
                    print(f"Attempt {attempt + 1} failed: {e}")
                    if attempt == self.valves.max_retries - 1:
                        return {"error": str(e)}

    async def super_resolution_image(self, image_url: str):
        """Run super-resolution on an image; fall back to the original URL on failure."""
        async with aiohttp.ClientSession() as session:
            payload = {"file": image_url}
            start_time = time.time()
            try:
                async with session.post(
                    self.valves.Super_Resolution_URL,
                    json=payload,
                    headers={"Content-Type": "application/json"},
                ) as response:
                    response.raise_for_status()
                    result = await response.json()
                    super_resolution_time = time.time() - start_time
                    if result.get("code") == 200 and "image_url" in result:
                        return {
                            "image_url": result["image_url"],
                            "super_resolution_time": super_resolution_time,
                        }
                    else:
                        print(f"Super resolution failed: {result}")
                        return {
                            "image_url": image_url,
                            "super_resolution_time": super_resolution_time,
                        }
            except Exception as e:
                print(f"Super resolution error: {e}")
                super_resolution_time = time.time() - start_time
                return {
                    "image_url": image_url,
                    "super_resolution_time": super_resolution_time,
                }

    async def generate_single_image(
        self, ai_output: AIOutput, __user__: Optional[dict] = None
    ):
        image_size = f"{ai_output.width}x{ai_output.height}"
        if ai_output.seed == -1:
            ai_output.seed = random.randint(0, 9999999999)
        seed = ai_output.seed
        result = await self.text_to_image(ai_output.prompt, image_size, seed, __user__)
        if isinstance(result, dict) and "error" in result:
            error_message = result["error"]
            raise Exception(f"Siliconflow API Error: {error_message}")
        return result

    async def outlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __user__: Optional[dict] = None,
        __model__: Optional[dict] = None,
    ) -> dict:
        if "messages" in body and body["messages"] and __user__ and "id" in __user__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": "🚀 Generating the image, please wait...",
                        "done": False,
                    },
                }
            )
            messages = body["messages"]
            if messages:
                ai_output_content = messages[-1].get("content", "")
                match = self.JSON_REGEX.search(ai_output_content)
                if not match:
                    raise ValueError("No valid AI output JSON found in the message content.")
                ai_output_json_str = match.group()
                try:
                    ai_output = AIOutput.parse_raw(ai_output_json_str)
                except Exception as e:
                    raise ValueError(f"Error parsing the AI output JSON: {e}")
                if ai_output.success:
                    response_data = await self.generate_single_image(
                        ai_output, __user__
                    )
                    if response_data and "images" in response_data:
                        images = response_data.get("images", [])
                        if images:
                            original_image_url = images[0].get("url", "")
                            generation_time = response_data.get("generation_time", 0)
                            # Run super-resolution if it is enabled
                            if self.valves.enable_super_resolution:
                                await __event_emitter__(
                                    {
                                        "type": "status",
                                        "data": {
                                            "description": "🔍 Running image super-resolution...",
                                            "done": False,
                                        },
                                    }
                                )
                                super_resolution_result = (
                                    await self.super_resolution_image(
                                        original_image_url
                                    )
                                )
                                image_url = super_resolution_result["image_url"]
                                super_resolution_time = super_resolution_result[
                                    "super_resolution_time"
                                ]
                            else:
                                image_url = original_image_url
                                super_resolution_time = 0
                            # Build the content lines that replace the message
                            content_lines = [
                                f"### Generation info",
                                f"**Prompt:** {ai_output.prompt}",
                                f"**Size:** {ai_output.width}x{ai_output.height}",
                                f"**Seed:** {ai_output.seed}",
                                f"**Model:** {self.valves.model_name}",
                                f"**Super resolution:** {'✅ enabled' if self.valves.enable_super_resolution else '❌ disabled'}",
                                f"**Generation time:** {generation_time:.2f}s",
                            ]
                            # Add the super-resolution time if it is enabled
                            if self.valves.enable_super_resolution:
                                content_lines.append(
                                    f"**Super resolution time:** {super_resolution_time:.2f}s"
                                )
                            # Add the image section
                            content_lines.extend(
                                [
                                    "\n### Generated image",
                                    # Inline image; this line was empty in the source, alt text assumed
                                    f"![Generated image]({image_url})",
                                ]
                            )
                            # With super-resolution enabled, add both download links
                            if self.valves.enable_super_resolution:
                                content_lines.extend(
                                    [
                                        f"[🖼️ Super-resolution image download link]({image_url})",
                                        f"[🔗 Original image download link]({original_image_url})",
                                    ]
                                )
                            else:
                                # Without super-resolution, add only the original download link
                                content_lines.append(
                                    f"[🔗 Image download link]({original_image_url})"
                                )
                            body["messages"][-1]["content"] = "\n\n".join(content_lines)
                            await __event_emitter__(
                                {
                                    "type": "status",
                                    "data": {
                                        "description": f"🎉 Image generated successfully! "
                                        + f"(generation: {generation_time:.2f}s"
                                        + (
                                            f", super resolution: {super_resolution_time:.2f}s"
                                            if self.valves.enable_super_resolution
                                            else ""
                                        )
                                        + ")",
                                        "done": True,
                                    },
                                }
                            )
                    else:
                        raise Exception(
                            "Siliconflow API Error: No images found in response."
                        )
                else:
                    raise Exception(f"AI Output Error: {ai_output.reason}")
        return body
DeepLX translation
"""
title: DeepLX Translate
author: mpopui
author_urls: https://github.com/mpopui
description: |
    Use DeepLX translation API to translate the user's native language into the large model's native language, and then translate it back into the user's native language.
version: 1.0
licence: MIT
"""

import re
from typing import List, Optional
from pydantic import BaseModel
import requests
import logging

from open_webui.utils.misc import get_last_user_message, get_last_assistant_message

# Configure logging
logging.basicConfig(level=logging.WARNING)  # set the desired log level


class Filter:
    class Valves(BaseModel):
        api_url: str = (
            "https://api.deeplx.org/0leAo8E7SzGwk-6IrtwI10Y9lNoLdGbK8bxQNh7nI5g/translate"  # DeepLX API URL
        )
        source_user: str = "auto"  # source language of user input, default "auto"
        target_user: str = "en"  # target language of user input, default "en"
        source_assistant: str = "en"  # source language of assistant output, default "en"
        target_assistant: str = "zh"  # target language of assistant output, default "zh"

    def __init__(self, valves: Optional[Valves] = None) -> None:
        self.valves = valves if valves else self.Valves()
        self.code_blocks = []  # list that stores extracted code blocks

    def translate(self, text: str, source: str, target: str) -> str:
        # Use the DeepLX URL from the valves
        url = self.valves.api_url
        # Build the request payload
        payload = {"text": text, "source_lang": source, "target_lang": target}
        headers = {"Content-Type": "application/json"}
        try:
            # Send the request with POST
            r = requests.post(
                url, json=payload, headers=headers, timeout=10
            )  # timeout added for robustness
            r.raise_for_status()
            result = r.json()
            translated_text = result["data"]  # extract the translated text
            return translated_text
        except requests.exceptions.RequestException as e:
            error_msg = f"Translation API error: {str(e)}"
            logging.error(error_msg)
            return f"{text}\n\n[Translation failed: {error_msg}]"
        except Exception as e:
            error_msg = f"Unexpected error during translation: {str(e)}"
            logging.exception(error_msg)  # log the traceback of the unexpected error
            return f"{text}\n\n[Translation failed: {error_msg}]"

    def split_text_around_table(self, text: str) -> List[str]:
        # Split the text into the part before a table and the table itself
        table_regex = r"((?:^.*?\|.*?\n)+)(?=\n[^\|\s].*?\|)"
        matches = re.split(table_regex, text, flags=re.MULTILINE)
        if len(matches) > 1:
            return [matches[0], matches[1]]
        else:
            return [text, ""]

    def clean_table_delimiters(self, text: str) -> str:
        # Replace spaces around table delimiter rows with dashes
        return re.sub(r"(\|\s*-+\s*)+", lambda m: m.group(0).replace(" ", "-"), text)

    async def inlet(self, body: dict, __user__: Optional[dict] = None) -> dict:
        print(f"inlet: {__name__}")
        print(f"User source language: {self.valves.source_user}")
        print(f"User target language: {self.valves.target_user}")
        print(f"Assistant source language: {self.valves.source_assistant}")
        print(f"Assistant target language: {self.valves.target_assistant}")
        user_message = get_last_user_message(body["messages"])
        # Find and store code blocks
        code_regex = r"```(.*?)```"
        self.code_blocks = re.findall(code_regex, user_message, flags=re.DOTALL)
        # Temporarily replace code blocks with placeholders
        user_message_processed = re.sub(
            code_regex, "__CODE_BLOCK__", user_message, flags=re.DOTALL
        )
        if self.valves.source_user != self.valves.target_user:
            parts = self.split_text_around_table(user_message_processed)
            text_before_table, table_text = parts
            translated_before_table = self.translate(
                text_before_table,
                self.valves.source_user,
                self.valves.target_user,
            )
            translated_user_message = translated_before_table + table_text
            translated_user_message = self.clean_table_delimiters(
                translated_user_message
            )
            # Restore the code blocks in the translated message
            for code in self.code_blocks:
                translated_user_message = translated_user_message.replace(
                    "__CODE_BLOCK__", f"```{code}```", 1
                )
            for message in reversed(body["messages"]):
                if message["role"] == "user":
                    if "[Translation failed:" in translated_user_message:
                        print(
                            f"Translation failed for language pair {self.valves.source_user} -> {self.valves.target_user}"
                        )
                    else:
                        message["content"] = translated_user_message
                    break
        return body

    async def outlet(self, body: dict, __user__: Optional[dict] = None) -> dict:
        print(f"outlet: {__name__}")
        print(f"User source language: {self.valves.source_user}")
        print(f"User target language: {self.valves.target_user}")
        print(f"Assistant source language: {self.valves.source_assistant}")
        print(f"Assistant target language: {self.valves.target_assistant}")
        assistant_message = get_last_assistant_message(body["messages"])
        # Find and store code blocks
        code_regex = r"```(.*?)```"
        self.code_blocks = re.findall(code_regex, assistant_message, flags=re.DOTALL)
        # Temporarily replace code blocks with placeholders
        assistant_message_processed = re.sub(
            code_regex, "__CODE_BLOCK__", assistant_message, flags=re.DOTALL
        )
        if self.valves.source_assistant != self.valves.target_assistant:
            parts = self.split_text_around_table(assistant_message_processed)
            text_before_table, table_text = parts
            translated_before_table = self.translate(
                text_before_table,
                self.valves.source_assistant,
                self.valves.target_assistant,
            )
            translated_assistant_message = translated_before_table + table_text
            translated_assistant_message = self.clean_table_delimiters(
                translated_assistant_message
            )
            # Restore the code blocks in the translated message
            for code in self.code_blocks:
                translated_assistant_message = translated_assistant_message.replace(
                    "__CODE_BLOCK__", f"```{code}```", 1
                )
            for message in reversed(body["messages"]):
                if message["role"] == "assistant":
                    if "[Translation failed:" in translated_assistant_message:
                        print(
                            f"Translation failed for language pair {self.valves.source_assistant} -> {self.valves.target_assistant}"
                        )
                    else:
                        message["content"] = translated_assistant_message
                    break
        return body
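The filter above shields fenced code from translation by swapping each block for a placeholder and restoring it afterwards. Below is a standalone sketch of that round trip. It takes any `translate` callable, so it runs without the DeepLX API. The helper name `translate_preserving_code` is hypothetical, and the fence string is built at runtime only to keep this example free of literal fences.

```python
import re
from typing import Callable

FENCE = "`" * 3  # three backticks, assembled at runtime
CODE_RE = re.compile(re.escape(FENCE) + r"(.*?)" + re.escape(FENCE), flags=re.DOTALL)
PLACEHOLDER = "__CODE_BLOCK__"


def translate_preserving_code(text: str, translate: Callable[[str], str]) -> str:
    """Translate text while leaving fenced code blocks untouched."""
    blocks = CODE_RE.findall(text)           # contents of each fenced block, in order
    masked = CODE_RE.sub(PLACEHOLDER, text)  # hide the blocks from the translator
    translated = translate(masked)
    for block in blocks:
        # Restore one placeholder per block, left to right.
        translated = translated.replace(PLACEHOLDER, FENCE + block + FENCE, 1)
    return translated
```

With a dummy translator such as `str.upper`, the prose is changed and the code inside the fences is not. One limitation carries over from the filter: if a real translation service alters the placeholder text, the corresponding block is not restored.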
Reverse-engineered APIs
Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit the source, 知無涯者, when reposting.
