普通视图

Received before yesterday

Python的数据结构

2026年1月1日 00:00

未完待续

【新年开篇】让我们拿出跃马扬鞭的勇气,激发万马奔腾的活力,保持马不停蹄的干劲,一起为梦想奋斗、为幸福打拼,把宏伟愿景变成美好现实。

1.概述

在Python中,有四种常见的数据结构

数据结构是否可变是否允许重复是否有序定义符号
列表(List)可变允许有序[]
元组(Tuple)不可变允许有序()
字典(Dict)可变键不允许,值允许有序{}
集合(Set)可变不允许无序{}

2.列表(List)

列表是一种有序的数据结构,元素写在[]中间,用,隔开通过下标访问。

列表创建有三种,直接创建,通过list()方法,以及推导式。

列表的特点:

  • 可以被索引(从左到右和从右到左)和切片(substring)
  • 可以使用+操作符进行拼接
  • 列表中的元素是可变的
  • 元素可以是任意类型
  • 元素允许重复

2.1 创建,索引和切片

#直接创建list1 = [1,2,3,4,5]list2 = ['abc', 2, 1.55]# 索引 => 1print(list1[0])# 第2到第4的元素,不含第4个 => [2, 3]print(list1[1:3])# 从第3个元素开始到末尾 => [3, 4, 5]print(list1[2:])# list1复制成两份拼接一起 => [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]print(list1 * 2)# 拼接 => [1, 2, 3, 4, 5, 'abc', 2, 1.55]print(list1 + list2)

Python列表可以倒序索引,且元素可变

list1[-1] = 100print(list1[-1]) #100

python可以用list()方法创建一个空的集合

empty_list = list()print(empty_list)

list()方法从字符串创建数组

s = 'hello'l = list(s)print(l) #['h', 'e', 'l', 'l', 'o']

3.元组(Tuple)

4.字典(Dict)

5.集合(Set)

LangChain Tools工具使用

2025年12月24日 00:00

未完待续

关于大模型工具使用有关前置知识和原理,已经在下面文章提到:

1.概述

本文介绍基于langchain开发具有工具使用(Function calling)功能的智能体Agent

2.实现

langchain开发Agent,需要安装包

pip install langchain==1.1.2pip install langchain-openaipip install langchain-classic

实现工具方法供大模型调用,并通过函数装饰器@tools修饰工具方法

@tools常用属性

属性类型描述
name_or_callablestr | Callable名称
descriptionstr描述工具的功能,会作为上下文发送给大模型
args_schemaArgsSchema可选择性地指定参数格式
return_directbool是否直接从工具返回

/my_tools.py

from langchain.tools import toolfrom pydantic import BaseModelfrom pydantic import Fieldclass FiledInfo(BaseModel):    """    定义参数信息    """    city: str = Field(description='城市')@tool(args_schema=FiledInfo, description='根据城市名称获取温度')def tp_tool(city: str) -> int:    print('=======tp_tool=======')    if city == '北京':        return 12    elif city == '武汉':        return 23    elif city == '沈阳':        return -10    elif city == '泉州':        return 27    else:        return Noneif __name__ == '__main__':    print( tp_tool.invoke({'city': '沈阳'}) )

使用create_agent创建智能体agent,绑定模型和工具,然后调用invoke()执行

/test_tool2.py

import osfrom langchain.agents import create_agentfrom langchain.chat_models import init_chat_modelfrom my_tool import tp_toolllm = init_chat_model(    model = 'deepseek-chat',    model_provider = 'openai',    api_key = os.getenv('DSKEY'),    base_url = 'https://api.deepseek.com')# 创建 Agent,绑定tp_tool工具agent = create_agent(    llm,    tools=[tp_tool],    system_prompt="""你是一个天气查询助手""")# 执行result = agent.invoke({    "messages": [{"role": "user", "content": "泉州温度多少"}]})for msg in result['messages']:    if hasattr(msg, 'content'):        print(f"{msg.__class__.__name__}: {msg.content}")

输出结果

=======tp_tool=======HumanMessage: 泉州温度多少AIMessage: 我来帮您查询泉州的温度。ToolMessage: 27AIMessage: 根据查询结果,泉州的当前温度是**27°C**。

LangChain开篇

2025年5月24日 00:00

本系列未完待续

关于大语言模型驱动的应用程序有关前置知识,可以移步:

1.概述

LangChain(https://www.langchain.com/)是2022年10月,由哈佛大学的哈里森·蔡斯发起的一个开源框架,采用Python为主要语言编写,用于开发由大语言模型驱动的应用程序,一经推出便获得广泛支持,是最早推出,也是截止成文日期最成熟,支持场景最多的一个大模型应用框架

LangChain顾名思义,Lang指的就是大语言模型,Chain指的就是将大语言模型和各种相关的外部的组件连成一串,这个也是LangChain的核心设计思想。LangChain提供各种支持链式组装的组件,完成高级的特定任务,让复杂的逻辑变得结构化,易于组合和拓展。

LangChain提供整套大模型应用开发的工具集,支持LLM接入,Prompt对话工程构建,记忆管理,工具调用Tools,检索增强生成RAG等多种形态的应用开发。

LangChain类似Spring又分为Spring Framework,Spring Boot, Spring MVC那样,狭义上的LangChain就是LangChain本身,但广义的LangChain除了本身,还包括:LangGraph,LangSmith等组件,LangGraph在的基础上进一步封装,能够协调多个Chain,Tool,Agent完成更复杂的任务和更高级的功能。

本系列将基于Python 3.13.x + LangChain 1.1.x,通过常见形态大模型应用的例子介绍LangChain的使用

2.快速开始

虚拟环境中安装相关包,并设置好python版本

pip install langchain==1.1.2pip install langchain-openai
import sysimport langchainprint(sys.version)print(langchain.__version__)
3.13.11 (tags/v3.13.11:6278944, Dec  5 2025, 16:26:58) [MSC v.1944 64 bit (AMD64)]1.1.2

一个简单的调用大模型例子

from langchain.chat_models import init_chat_modelimport osllm = init_chat_model(    model = 'deepseek-chat',    model_provider = 'openai',    api_key = os.getenv('DSKEY'),    base_url = 'https://api.deepseek.com')print(llm.invoke('你是谁').content)

3. LangChain使用案例

序号文章名概述
1LangChain Prompt提示词工程大模型对话,会话记忆
2LangChain Tools工具使用Tools(Function calling)实现

LangChain Prompt提示词工程

2025年12月12日 00:00

本文未完待续

引言

本文基于Python 1.13.x和LangChain 1.1.2,并采用DeekSeep大模型,介绍LangChain提示词工程的实现。

pip install langchain==1.1.2pip install langchain-openai

类似的其他语言和框架的提示词工程实现案例,可以移步:

1.阻塞式对话

一个简单的对话实现

  • model = 'deepseek-chat' 大模型名称
  • model_provider = 'openai' 采用OpenAI标准
  • api_key = os.getenv('DSKEY') 从环境变量获取API_KEY
  • base_url 接口地址
from langchain.chat_models import init_chat_modelimport osmodel = init_chat_model(    model = 'deepseek-chat',    model_provider = 'openai',    api_key = os.getenv('DSKEY'),    base_url = 'https://api.deepseek.com')print(model.invoke('你是谁').content)

2.流式输出

遍历llm.stream返回的迭代器对象Iterator[AIMessageChunk],得到实时返回的输出,打印追加

from langchain.chat_models import init_chat_modelimport osllm = init_chat_model(    model = 'deepseek-chat',    model_provider = 'openai',    api_key = os.getenv('DSKEY'),    base_url = 'https://api.deepseek.com')for trunk in llm.stream('你是谁'):    print(trunk.content, end='')print('结束')

还可以每次返回和之前的返回拼接在一起

无数trunk对象通过+加在一起,底层是用重写__add__()方法运算符重载实现

from langchain.chat_models import init_chat_modelimport osllm = init_chat_model(    model = 'deepseek-chat',    model_provider = 'openai',    api_key = os.getenv('DSKEY'),    base_url = 'https://api.deepseek.com')full = Nonefor trunk in llm.stream('用一句话介绍自己'):    full = trunk if full is None else full + trunk    print(full.text)    print(full.content_blocks)print('结束')print(full.content_blocks)

运行结果:

[]你好[{'type': 'text', 'text': '你好'}]你好,[{'type': 'text', 'text': '你好,'}]你好,我是[{'type': 'text', 'text': '你好,我是'}]你好,我是Deep[{'type': 'text', 'text': '你好,我是Deep'}]你好,我是DeepSe[{'type': 'text', 'text': '你好,我是DeepSe'}]你好,我是DeepSeek[{'type': 'text', 'text': '你好,我是DeepSeek'}]你好,我是DeepSeek,[{'type': 'text', 'text': '你好,我是DeepSeek,'}]你好,我是DeepSeek,一个[{'type': 'text', 'text': '你好,我是DeepSeek,一个'}]你好,我是DeepSeek,一个由[{'type': 'text', 'text': '你好,我是DeepSeek,一个由'}]你好,我是DeepSeek,一个由深度[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度'}]你好,我是DeepSeek,一个由深度求[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求'}]你好,我是DeepSeek,一个由深度求索[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索'}]你好,我是DeepSeek,一个由深度求索公司[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司'}]你好,我是DeepSeek,一个由深度求索公司创造的[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的'}]你好,我是DeepSeek,一个由深度求索公司创造的AI[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI'}]你好,我是DeepSeek,一个由深度求索公司创造的AI助手[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手'}]你好,我是DeepSeek,一个由深度求索公司创造的AI助手,[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手,'}]你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于'}]你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用'}]你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情'}]你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻'}]你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式'}]你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你'}]你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你提供[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你提供'}]你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你提供帮助[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你提供帮助'}]你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你提供帮助![{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你提供帮助!'}]你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你提供帮助!😊[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你提供帮助!😊'}]你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你提供帮助!😊[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你提供帮助!😊'}]你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你提供帮助!😊[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你提供帮助!😊'}]结束[{'type': 'text', 'text': '你好,我是DeepSeek,一个由深度求索公司创造的AI助手,乐于用热情细腻的方式为你提供帮助!😊'}]

还可以加上一个提示词模板PromptTemplate

要想流式,只要改成chain.stream(...)即可

import osfrom langchain.chat_models import init_chat_modelfrom langchain_core.prompts import PromptTemplatefrom langchain_core.output_parsers import StrOutputParserfrom langchain_core.runnables import RunnableSequenceprompt_template = PromptTemplate(    template='做一个关于{topic}的小诗',    input_variables=['topic'])llm = init_chat_model(    model = 'deepseek-chat',    model_provider = 'openai',    api_key = os.getenv('DSKEY'),    base_url = 'https://api.deepseek.com')parser = StrOutputParser()# 提示词=》大模型=》格式化输出chain = RunnableSequence(prompt_template, llm, parser)resp = chain.invoke({'topic': '霸道总裁爱上做保洁的我'})print(resp)

3.LCEL增强对话功能

要理解LCEL,首先要了解Runable,Runable(langchain_core.runnables.base.Runnable)是langchain中可以调用,批处理,流式输出,转换和组合的工作单元,是实现LCEL的基础,通过重写__or__()方法,实现了|运算符的重载,实现了Runable的类对象之间便可以进行一些类似linux命令中的管道(|)操作。

LCEL,全称LangChain Express Language,即LangChain表达式语言,也是LangChain官方推荐的写法,是一种从Runable而来的声明式方法,用于声明,组合和执行各种组件(模型,提示词,工具等),如果要使用LCEL,对应的组件必须实现Runable,使用LCEL创建的Runable称之为链。

例如,将刚刚提示词模板的例子用LCEL重写

import osfrom langchain.chat_models import init_chat_modelfrom langchain_core.prompts import PromptTemplatefrom langchain_core.output_parsers import StrOutputParserprompt_template = PromptTemplate(    template='做一个关于{topic}的小诗',    input_variables=['topic'])llm = init_chat_model(    model = 'deepseek-chat',    model_provider = 'openai',    api_key = os.getenv('DSKEY'),    base_url = 'https://api.deepseek.com')parser = StrOutputParser()# LCEL重写chain = prompt_template | llm | parserresp = chain.invoke({'topic': '霸道总裁爱上做保洁的我'})print(resp)

还可以自定义一个word_count(text: str) -> int函数,通过langchain的RunnableLambda对象包装,使得函数变为获得链式的执行能力的Runable对象,拼入链中,统计大模型回复的字数

import osfrom langchain.chat_models import init_chat_modelfrom langchain_core.prompts import PromptTemplatefrom langchain_core.output_parsers import StrOutputParserfrom langchain_core.runnables import RunnableLambdaprompt_template = PromptTemplate(    template='做一个关于{topic}的小诗',    input_variables=['topic'])llm = init_chat_model(    model = 'deepseek-chat',    model_provider = 'openai',    api_key = os.getenv('DSKEY'),    base_url = 'https://api.deepseek.com')parser = StrOutputParser()def word_count(text: str) -> int:    print('----------word_count---------')    return len(text)word_counter = RunnableLambda(word_count)# LCEL重写chain = prompt_template | llm | parser | word_counterresp = chain.invoke({'topic': '霸道总裁爱上做保洁的我'})print(resp)

运行:

----------word_count---------232

Python中的模块和包

2025年12月18日 00:00

未完待续

包和模块是Python语言封装功能和组织程序集的解决方案,类似Java中的package和C#中的namespace,将固定功能模块的代码聚合在一起,提高程序的复用性和可维护性。

模块和包在不同环境下位置和作用范围也不一样,本文建议和Python全局环境和虚拟环境(venv)一文搭配食用。

本文基于Python3.13

1.模块

每一个.py文件都是一个模块,每个模块中可以包含变量,函数,类等内容,模块,多用于封装固定功能的代码,每个模块都是一个工具,模块可以提升代码的可维护性和可复用性,还能避免命名冲突。

python中的模块分为三种:标准库模块,自定义模块和第三方模块

  • 标准库模块

    随着Python自带的一些模块,位于Python安装目录的\Lib下(site-packages中的除外),有些是C语言实现的,不能看到源码(Pycharm IDE会为我们准备存根文件,里面仅有注释)也叫内置模块,剩下是python实现,可见源码,叫做非内置模块,例如:copy, os, math, sys, time等都是标准库模块,其中的math, sys, time就是内置模块,copy, os就是非内置模块

    有些模块是用包进行组织的,包的概念后面会有介绍

    python提供了标准库文档用于参考:https://docs.python.org/zh-cn/3.14/py-modindex.html

  • 自定义模块

    是我们为了实现功能自己编写的模块

  • 第三方模块

    通常位于Python安装目录的\Lib\site-packages,引用别人写好的现成的功能,往往使用包来引入,通常使用pip进行管理

1.1 定义模块

模块的命名要符合标识符的命名规则,模块名(文件名)大小写敏感,最重要的是不能与标准库模块重名,否则引入时,会被与之重名的标准库模块顶替(类似Java中的双亲委派)

例如定义两个模块在根路径下,order和pay

order.py

max_amount = 5000_0000def create_order():    print('create_order')def cancel_order():    print('cancel_order')def info():    print('order info')

pay.py

timeout = 300def wechat_pay():    print('wechat_pay')def alipay_pay():    print('alipay_pay')def info():    print('pay info')

1.2 引入模块

在根目录建一个新的mytest模块,引入刚刚建的两个模块,总共有5种常见的引入方式,在不同的场景使用适合的方式进行导入。

1.2.1 import 模块名

引入模块中的全部成员,要使用模块中的成员,需要用模块名.的方式访问

import orderimport payprint(order.max_amount)order.create_order()order.cancel_order()order.info()print(pay.timeout)pay.alipay_pay()pay.wechat_pay()pay.info()

1.2.2 import 模块名 as 别名

可以为引入的模块取一个别名,通过别名.访问,但是别名需要符合标识符的命名规范

import order as oimport pay as pprint(o.max_amount)o.create_order()o.cancel_order()o.info()print(p.timeout)p.alipay_pay()p.wechat_pay()p.info()

1.2.3 from 模块名 import 具体内容1, 具体内容2 …

之前的方式都是将整个模块引入,通过from 模块名 import 具体内容,...可以将模块中的部分成员引入,并可以不需经过模块,直接调用

from order import max_amount, create_order, cancel_order, infofrom pay import timeout, wechat_pay, alipay_pay, infoprint(max_amount)print(timeout)create_order()cancel_order()alipay_pay()wechat_pay()info()

但是有一个问题,两个模块中有重名的成员时,后引入的会覆盖先引入的,例如上面程序运行结果就是:info()执行的是pay模块中的info()

50000000300create_ordercancel_orderalipay_paywechat_paypay info

1.2.4 from 模块名 import 具体内容1 as 别名1, 具体内容2 as 别名2 …

在3的基础上,通过这种方式,将重名成员设置别名,避免冲突

from order import  info as o_infofrom pay import info as p_infoo_info()p_info()

1.2.5 from 模块名 import *

⚠️这是一种不被推荐的用法

引入模块中全部成员,但是和第1种不同的是,访问成员不需要通过模块名或别名,同样会出现重名成员后者覆盖前者的情况,而且和当前模块中声明的成员也可能无形中发生冲突,同样存在按照前后顺序覆盖

timeout = 0from order import *from pay import *max_amount = 5print(timeout)print(max_amount)alipay_pay()wechat_pay()create_order()cancel_order()info()

运行结果:timeout被pay.timeout覆盖,order.max_amount也会被max_amount覆盖,info()调用的是pay模块的info()

3005alipay_paywechat_paycreate_ordercancel_orderpay info

在python中,可以通过__all__来控制from 模块名 import *引入哪些成员,且__all__仅针对from 模块名 import *的方式有效,__all__的值可以是列表或元组

例如将order.py修改成以下,被引入时,只能引入create_order, cancel_order两个函数

列表和元组中每个元素是字符串形式的属性名,不要把函数或变量等直接当成对象直接放进去

max_amount = 5000_0000def create_order():    print('create_order')def cancel_order():    print('cancel_order')def info():    print('order info')__all__ = ['create_order', 'cancel_order']

在mytest.py中再使用未引入的成员将报错

from order import *print(max_amount)create_order()cancel_order()info()
Traceback (most recent call last):  File "D:\python-lang-test\test1\mytest.py", line 60, in <module>    print(max_amount)          ^^^^^^^^^^NameError: name 'max_amount' is not definedProcess finished with exit code 1

1.3 主模块和__name__

一个python项目由诸多模块构成,如果一个模块,是直接在python解释器后直接运行的,则这个模块就是主模块,类似Java中JVM从某个类的public static void main(String[] args)方法开始执行。

比如这样运行某个python项目,mytest.py模块就是主模块

D:\python-lang-test\test1\.venv\Scripts\python.exe D:\python-lang-test\test1\mytest.py

python中有一个特殊的变量:__name__,是一个字符串类型,该变量只有在主模块中出现时,才会被python解释器赋值为一个字符串:”__main__“,如果出现在了非主模块,则会被赋值为当前模块的名

同时,python代码运行时,一旦执行了import语句,被引入的模块代码就会开始自然从上向下执行,类似浏览器中执行js代码一样

例如:

mytest.py(主模块)

import sonprint('主模块执行-开始')print(__name__)son.fun()

son.py

print('son模块执行-开始')def fun():    print(__name__)

运行结果就是上面说的那样:son模块print先执行了,然后主模块print后执行,而且__name__被解释器自动赋上对应的值

son模块执行-开始主模块执行-开始__main__son

这样设计的用途是,可以对某个模块内自己实现的方法进行简单测试,类似Java中如果想在某个类中测试下刚刚写好的方法,就会随手就地写一个main方法然后main中直接启动自己写的方法,对python而言,可以将某个子模块最后加上这样一段if __name__ == '__main__'逻辑

print('son模块执行-开始')def fun():    print('hello world')if __name__ == '__main__':    fun()

只要将当前模块直接启动,就能被当作主模块被解释器直接执行if __name__ == '__main__'下的逻辑实现临时测试,但是上线后当作为子模块被主模块引入,这段if逻辑则会被忽略

如果不加这段if逻辑,同样可以进行测试,但是需要上线前删除或注释测试代码,一旦忘记了,或者少注释了一段,误上线后就可能造成很大的影响,因此if __name__ == '__main__'至少可以使得程序更加安全

2.包

python中,包并不是一个和模块并列的东西,而是模块的进一步升级,一个包含__init__.py文件的文件夹就叫做包。通常将实现某个近似或相关功能的众多模块放在一个包中。

__init__.py是包的初始化文件,可以编写一些初始化逻辑(比如检查下当前环境等),还可以控制包被导入的内容,当包被导入时,__init__.py将被自动调用

模块是对功能的整理,包则是对模块的进一步整理,一个包中可以包含多个模块,也可以包含多个子包,包可以提升代码的可维护性和可复用性,便于管理大型项目。

python的包和模块类似,分为标准库包,自定义包和第三方包,封装标准库模块的自然就是标准库包,第三方包和自定义包同理。

2.1 定义包

定义包和定义模块规则也类似,报名符合标识符命名规范,不能和标准库包的名称冲突,且大小写敏感,一般用小写字母。

例如在项目根路径下,新建一个trade包,新建文件夹,名字和要建的包的包名一致,文件夹里面新建一个空的__init__.py文件,就成功创建了一个包

存在子包时,包名就是父子包用.连接,就像Java那样,例如:org.springframework.boot

project    ├── .venv    └── trade                  └── __init__.py

在Pycharm IDE中,右键新建Python Package可以一气呵成将文件夹和__init__.py同时创建。

包中可以新建自己需要的模块,例如order.py,pay.py

project    ├── .venv    └── trade          ├── order.py          ├── pay.py                    └── __init__.py

2.2 引入包

对于包来说,有五种和引入模块相似的方式,在语法和用法规则都是相同的,唯一改变的是模块名前要加上包名

模块
import 模块名import 包名.模块名
import 模块名 as 别名import 包名.模块名 as 别名
from 模块名 import 具体内容1, 具体内容2 …from 包名.模块名 import 具体内容1, 具体内容2 …
from 模块名 import 具体内容1 as 别名1, 具体内容2 as 别名2 …from 包名.模块名 import 具体内容1 as 别名1, 具体内容2 as 别名2 …
from 模块名 import *from 包名.模块名 import *

除了这五种和引入模块相似的语法,还有包特有的引入方式,新建一个testpg.py模块,测试这些方式

project    ├── .venv    ├── testpg.py    └── trade          ├── order.py          ├── pay.py                    └── __init__.py

2.2.1 from 包名 import 模块名

testpg.py

from trade import payfrom trade import orderprint(pay.timeout)pay.wechat_pay()print(order.max_amount)order.create_order()

2.2.2 from 包名 import 模块名 as 别名

testpg.py

from trade import pay as pfrom trade import order as oprint(p.timeout)p.wechat_pay()print(o.max_amount)o.create_order()

2.2.3 from 包名 import *

包和模块的import *导入逻辑是不一样的,并不是将包下每个模块的所有成员都导入,而是和包的__init__.py文件有关,__init__.py中定义的内容才能被导入

trade/__init__.py

print('trade init')a = 100b = 200

testpg.py

from trade import *print(a)print(b)print(timeout)print(max_amount)

运行结果:导入包时打印trade init,且只有a b能获取到

trade init100200Traceback (most recent call last):  File "D:\python-lang-test\test1\testpg.py", line 29, in <module>    print(timeout)          ^^^^^^^NameError: name 'timeout' is not definedProcess finished with exit code 1

如果要通过包引入模块,可以在__init__.py文件中直接import模块,import也是一种定义

trade/__init__.py

print('trade init')a = 100b = 200import orderimport pay

testpg.py

from trade import *print(a)print(b)print(order.max_amount)pay.wechat_pay()

还可以通过__all__以字符串指定包中的哪些可以被from 包名 import *的语法引入,无需import模块直接写模块名在列表中,例如下面程序,只有order模块和a b能被引入

trade/__init__.py

print('trade init')a = 100b = 200__all__ = ['order', 'a', 'b']

2.2.4 import 包名

直接导入包,通过包名访问成员,导入的包必须在__init__.py中import,通过__all__指定在这种引入方式上不生效

trade/__init__.py

print('trade init')import orderimport paya = 100b = 200

testpg.py

import tradetrade.order.create_order()print(trade.a)

3.pip

pip是python自带的第三方包管理器,在windows下使用管理员权限打开CMD,输入pip回车,就能看到提示,pip实际上对应的是python安装目录的\Scripts\pip.exe文件

第三方包要到pypi的网站查找:https://pypi.org/,就像从maven中央仓库和npm查找Java或js的第三方依赖那样。

通过pip install命令安装第三方包,例如:

pip install numpy

全局环境下,第三方包和模块会被安装在Python安装目录的\Lib\site-packages,一同被安装的还有numpy.libs,numpy-2.3.5.dist-info两个文件夹,numpy.libs是该包依赖的一些底层C实现的东西,numpy-2.3.5.dist-info里面则是描述文件

pip自己也是一个第三方包,在安装python环境时,一般默认安装pip,只要选择了默认安装,就会被安装在Lib\site-packages,Scripts\pip.exe最终就是在运行Lib\site-packages中的pip

pypi的服务器在境外,夜间可能访问不稳定,因此可以使用国内的一些镜像,例如清华大学pypi镜像:https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple

pip安装时临时指定镜像

pip install -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple numpy

永久指定镜像

⚠️如果在虚拟环境下执行,实现每个环境有不同的pip配置,虚拟环境目录下要提前创建好一个pip.ini文件,例如我的是:.venv\pip.ini

pip config set global.index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple

pip还有以下几个常见用法:

命令释义
pip install 包==版本号当前环境中,安装某个版本的某包
pip list当前环境中,已安装的所有第三方包
pip config list当前环境pip配置
pip uninstall ...从当前环境卸载指定的第三方包
pip config unset global.index-url恢复默认的pypi地址
pip install git+https://github.com/**从Git仓库地址安装包

Python全局环境和虚拟环境(venv)

2025年12月15日 00:00

1.概述

在进行python项目开发时,不同项目可能需要依赖的python版本是不同的,有时电脑上需要安装好几个不同版本的python解释器。而且在开发过程中,需要依赖一些第三方的包,不同项目依赖的第三方包及其版本也不相同,需要进行区分。

类似node.js使用npm可以全局安装一些依赖包,也可以配合项目package.json为每个项目单独安装依赖,但是对于python来说,不仅仅依赖包可以分别安装,不同项目甚至可以采用不同python解释器,这个就是通过python的虚拟环境实现的。

环境,就是python解释器 + 依赖包。python虚拟环境的作用就是实现环境的隔离,允许你在不同项目中使用不同的包版本和python版本,隔离项目依赖,避免了包冲突,确保项目的一致性和可移植性。

本文基于 Windows + Python3.13 介绍默认的虚拟环境工具venv

2.全局环境和虚拟环境

2.1 全局环境

python安装到本地后,打开安装目录后可见

其中Lib,Scripts文件夹和python.exe自然形成一个全局环境

  • Lib/下是模块和包,凡是第三方模块和包统一都放在了\Lib\site-packages文件夹内,否则都是标准库模块和标准库包。

  • Scripts/内是一些可执行的文件,例如pip.exe

  • python.exe 即python解释器

设置环境变量后,随便打开一个CMD,执行python,使用的是全局环境的python

2.2 虚拟环境

python3自带了一个虚拟环境工具venv

在python环境变量设置好后,在asas空文件夹下打开CMD,执行python -m venv myenv命令,文件夹内会生成一个myenv文件夹,我们就创建好了一个叫myenv的虚拟环境,在这个asas文件夹下的项目代码可以都依赖这个虚拟环境,所以虚拟环境也是以当前安装的python全局环境为模板复刻而来的

和全局环境一样,虚拟环境也会存在Lib,Scripts文件夹,不同的是,虚拟环境的Lib文件夹下只有site-packages一个文件夹,用于保存项目自身的第三方依赖,也可以理解为虚拟环境下Lib文件夹内只保存该项目本身需要的第三方的依赖。

全局环境的python.exe在虚拟环境下则跑到Scripts文件夹里面去了。

虚拟环境,实际上就是全局环境的一部分文件复制一份分配给了不同的项目而已。

CMD切换到myenv\Scripts目录下,执行activate.bat,会出现一个(myenv) 开头的终端,此时进入的就是该项目的虚拟环境,使用的是虚拟环境的python

执行activate.bat,就是在激活虚拟环境,虚拟环境需要激活才能生效,直接CD到asas文件夹下执行python命令,使用的还是全局环境。

虚拟环境下想要退出,回到全局环境,windows下只要在虚拟环境的DOS下执行deactivate.bat即可。

💡可以用不同的python版本创建不同虚拟环境的项目,就实现了每个项目依赖不同python版本,相互隔离运行

3.不同环境的包依赖规则

  1. 全局环境和虚拟环境可以各自安装第三方依赖包,全局环境项目依赖的第三方包引用自全局环境\Lib\site-packages,虚拟环境项目依赖的第三方包引用自项目自身虚拟环境\Lib\site-packages

  2. python使用pip管理第三方依赖,全局环境执行pip install时,安装在全局环境\Lib\site-packages内,虚拟环境执行pip install时,安装在虚拟环境\Lib\site-packages内

  3. 虚拟环境项目在依赖标准库模块和标准库包时,直接从全局环境\Lib调取,需要第三方依赖包时,只能从自己虚拟环境的\Lib\site-packages内调取,不能调取全局环境\Lib\site-packages中的第三方包

  4. 不同虚拟环境的项目之间,不能引用对方虚拟环境\Lib\site-packages下的第三方依赖包

  5. 全局环境项目也不能调取某个项目虚拟环境\Lib\site-packages下的第三方依赖包

4.在IDE中使用全局环境和虚拟环境

以最好用的Python IDE:PyCharm为例

新建项目时,默认会选中创建虚拟环境下的解释器

虚拟环境下,IDE会自动创建虚拟环境:.venv,终端打开时IDE也会自动为我们切换到虚拟环境的命令行下,安装的包也自动保存到项目虚拟环境下

如只用全局环境,则选择已有的自定义环境,并保证python是安装时的安装目录中的解释器即可

使用python将excel表格转换为SQL INSERT

2024年10月16日 10:00
import pandas as pd# 读取Excel文件file_path = 'D:/1.xlsx'  # 替换为你的文件路径sheet_name = '1'  # 替换为你的工作表名称table_name = 'your_table_name'  # 替换为你的数据库表名称# 使用pandas读取Excel数据df = pd.read_excel(file_path, sheet_name=sheet_name)# 获取表的列名columns = ', '.join(df.columns)# 将每行数据转换为SQL INSERT语句insert_statements = []for index, row in df.iterrows():    # 将每行数据转换为元组格式    values = ', '.join([f"'{str(value)}'" for value in row.values])    sql = f"INSERT INTO {table_name} ({columns}) VALUES ({values});"    insert_statements.append(sql)# 输出SQL INSERT语句for statement in insert_statements:    print(statement)

使用python压缩图片

2024年10月15日 20:30
  1. 首先Linux上面要安装一些软件包
yum install -y libjpeg-develyum install -y zlib-develyum install -y libjpeg libtiff freetype 
  1. 其次需要安装python的依赖
pip3 install tinifypip3 install Pillow
  1. 编写并运行以下代码
import osfrom PIL import Imageimport tinifyfrom concurrent.futures import ThreadPoolExecutor# 配置部分input_folder = '/img'  # 要压缩的图片文件夹whitelist = {'example1.jpg', 'important_image.png'}  # 白名单中的文件quality = 75  # 压缩质量def compress_image(img_path, quality):    """    压缩单张图片并覆盖原文件。        :param img_path: 图片路径    :param quality: 压缩质量    """    try:        img = Image.open(img_path)        img.save(img_path, optimize=True, quality=quality)        print(f"Compressed and saved in place: {img_path}")    except Exception as e:        print(f"Failed to compress {img_path}: {e}")def compress_images_in_place(input_folder, whitelist=None, quality=75):    """    使用 Pillow 压缩图片并覆盖原文件,支持白名单功能。使用多线程加速处理。        :param input_folder: 原始图片文件夹路径    :param whitelist: 白名单列表,包含不希望被压缩的图片文件名(可选)    :param quality: 压缩质量,默认75    """    if whitelist is None:        whitelist = set()    # 创建一个 ThreadPoolExecutor 来处理压缩任务    with ThreadPoolExecutor() as executor:        futures = []        for root, dirs, files in os.walk(input_folder):            for file in files:                if file in whitelist:                    print(f"Skipping (whitelisted): {file}")                    continue                if file.lower().endswith(('.jpg', '.jpeg', '.png')):                    img_path = os.path.join(root, file)                    futures.append(executor.submit(compress_image, img_path, quality))        # 等待所有任务完成        for future in futures:            future.result()# 调用压缩函数compress_images_in_place(input_folder, whitelist, quality)

利用Python实现Hexo站点的持续集成

2024年10月15日 20:00

引言

Hexo博客编写完文章需要从头构建并重新上传生成的静态文件到服务器,不能增量更新十分不便,每次上传的文件也越来越大,于是一个比较好的解决办法是在提交文章、资源和配置后,让服务器自动去从git远程仓库拉取最新的提交到服务器上的本地git仓库中,并部署。使用Jenkins等持续集成工具比较繁琐也消耗资源,于是我选择用python编写一个基于flask的webhook脚本来实现。

具体实现流程是:文章提交到git代码托管平台的远程仓库后,远程仓库发送HTTP回调请求给服务器上的webhook脚本,脚本程序根据回调请求的参数,先调用git拉取远程仓库最新提交内容,再去调用npm构建和部署最新版hexo博客到nginx下。

这里的git代码托管平台我选择Gitee:https://gitee.com/

1. 安装pip和python

首先,确保你已经安装了python。如果没有安装,可以使用以下命令来安装python和pip:

1.1 检查python版本

python3 --version

如果你已经安装了python 3.x版本,可以跳过安装python的步骤。否则,继续安装:

1.2 安装python3

sudo yum install python3 -y  # 适用于 CentOS 或其他 RHEL 系统

1.3 安装pip

安装pip的方法:

sudo yum install python3-pip -y  # CentOS/RHEL 系统

安装完成后,确认pip是否已经成功安装:

pip3 --version

2. 使用pip安装依赖

一旦 pip 安装好,你可以使用以下命令来安装需要的库:

pip3 install flask gitpython

3.编写脚本

vim webhook.py
import osimport subprocessfrom flask import Flask, request, jsonifyimport gitapp = Flask(__name__)# 配置你的本地仓库路径和构建命令REPO_PATH = "/path/to/your/hexo/blog"PUBLIC_PATH = os.path.join(REPO_PATH, 'public')# 拉取代码的函数def pull_code():    try:        repo = git.Repo(REPO_PATH)        origin = repo.remotes.origin        origin.pull()        return True    except Exception as e:        print(f"Failed to pull code: {e}")        return False# 构建 Hexo 站点的函数def build_hexo():    try:        # 执行 Hexo 命令        subprocess.run(["npm", "run", "build"], cwd=REPO_PATH, check=True)                return True    except subprocess.CalledProcessError as e:        print(f"Failed to build Hexo: {e}")        return False@app.route("/webhook", methods=["POST"])def webhook():    # 验证请求是否来自 Gitee    if request.headers.get("X-Gitee-Token") != "": #这里改成你设置的密码        return jsonify({"message": "Unauthorized"}), 401    # 获取事件类型,确保是 push 事件    event = request.headers.get("X-Gitee-Event")    if event != "Push Hook":        return jsonify({"message": "Not a push event"}), 400    # 拉取代码并构建    if pull_code() and build_hexo():        return jsonify({"message": "Hexo build success"}), 200    else:        return jsonify({"message": "Failed to pull or build"}), 500if __name__ == "__main__":    app.run(host="0.0.0.0", port=5000)

代码优化,加入线程控制,防止webhook链接被并发调用后,两个hook任务线程同时执行出现安全问题。

import osimport subprocessfrom flask import Flask, request, jsonifyimport gitimport threadingapp = Flask(__name__)# 配置你的本地仓库路径和构建命令REPO_PATH = "/blog"PUBLIC_PATH = os.path.join(REPO_PATH, 'public')lock = threading.Lock()is_building = False  # 标志位,用于指示是否有任务正在进行# 拉取代码的函数def pull_code():    try:        repo = git.Repo(REPO_PATH)        origin = repo.remotes.origin        origin.pull()        return True    except Exception as e:        print(f"Failed to pull code: {e}")        return False# 构建 Hexo 站点的函数def build_hexo():    try:        # 执行 Hexo 的清理和生成命令        subprocess.run(["npm", "run", "build"], cwd=REPO_PATH, check=True)        #subprocess.run(["hexo", "generate"], cwd=REPO_PATH, check=True)        return True    except subprocess.CalledProcessError as e:        print(f"Failed to build Hexo: {e}")        return False@app.route("/webhook", methods=["POST"])def webhook():    global is_building    # 验证请求是否来自 Gitee    if request.headers.get("X-Gitee-Token") != "":        return jsonify({"message": "Unauthorized"}), 401    # 获取事件类型,确保是 push 事件    event = request.headers.get("X-Gitee-Event")    if event != "Push Hook":        return jsonify({"message": "Not a push event"}), 400    if is_building:        return jsonify({"message": "Build in progress, try again later"}), 429    with lock:        is_building = True  # 设置标志位为 True,表示任务开始        try:            # 拉取代码并构建            if pull_code() and build_hexo():                return jsonify({"message": "Hexo build success"}), 200            else:                return jsonify({"message": "Failed to pull or build"}), 500        finally:            is_building = False  # 重置标志位,表示任务结束            if __name__ == "__main__":    app.run(host="0.0.0.0", port=5000)

代码还可以进一步优化,只对master分支提交推送进行触发构建。

import osimport subprocessfrom flask import Flask, request, jsonifyimport gitimport threadingapp = Flask(__name__)# 配置你的本地仓库路径和构建命令REPO_PATH = "/blog"PUBLIC_PATH = os.path.join(REPO_PATH, 'public')lock = threading.Lock()is_building = False  # 标志位,用于指示是否有任务正在进行# 拉取代码的函数def pull_code():    try:        repo = git.Repo(REPO_PATH)        origin = repo.remotes.origin        origin.pull()        return True    except Exception as e:        print(f"Failed to pull code: {e}")        return False# 构建 Hexo 站点的函数def build_hexo():    try:        # 执行 Hexo 的清理和生成命令        subprocess.run(["npm", "run", "build"], cwd=REPO_PATH, check=True)        #subprocess.run(["hexo", "generate"], cwd=REPO_PATH, check=True)        return True    except subprocess.CalledProcessError as e:        print(f"Failed to build Hexo: {e}")        return False@app.route("/webhook", methods=["POST"])def webhook():    global is_building    # 验证请求是否来自 Gitee    if request.headers.get("X-Gitee-Token") != "":        return jsonify({"message": "Unauthorized"}), 401    # 获取事件类型,确保是 push 事件    event = request.headers.get("X-Gitee-Event")    if event != "Push Hook":        return jsonify({"message": "Not a push event"}), 400    # 解析推送数据    payload = request.get_json()    if not payload:        return jsonify({"message": "Invalid payload"}), 400    # 检查是否是 master 分支的推送    ref = payload.get("ref")    if ref != "refs/heads/master":        return jsonify({"message": "Not a master branch push, ignored"}), 200    if is_building:        return jsonify({"message": "Build in progress, try again later"}), 429    with lock:        is_building = True  # 设置标志位为 True,表示任务开始        try:            # 拉取代码并构建            if pull_code() and build_hexo():                return jsonify({"message": "Hexo build success"}), 200            else:                return jsonify({"message": "Failed to pull or build"}), 500        finally:            is_building = False  # 重置标志位,表示任务结束            if __name__ == "__main__":    app.run(host="0.0.0.0", port=5000)

4.执行脚本

执行前,服务器还需要安装好node、npm、git并配置好环境变量

nohup python3 webhook.py &

5.配置hook到gitee

设置好签名(密码),设置回调地址,勾选两项

除此外,还要将自己gitee账户相匹配的ssh密钥设置在服务器的上用于拉取我们私有仓库的内容。

数独求解工具

作者obaby
2025年8月8日 00:02

对象有一本做数独的书,里面满满的都是题目,然而,看到这前三关直接不淡定了,这尼玛是人能做出来的。

试了半天无过之后,果断还是决定上代码了。

就这么个破玩意儿,愣是跑了十几分钟才出来。

import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import threading
import queue
import time
import json
import copy
import concurrent.futures
import os
import multiprocessing

# --- 数独核心逻辑 ---
def is_valid(board, row, col, num):
    for i in range(9):
        if board[row][i] == num or board[i][col] == num:
            return False
    start_row, start_col = 3 * (row // 3), 3 * (col // 3)
    for i in range(3):
        for j in range(3):
            if board[start_row + i][start_col + j] == num:
                return False
    return True

def solve_sudoku_all_with_progress(board, branch_id, progress_queue):
    solutions = []
    visited = [0]
    def backtrack(bd, pos=0):
        if pos == 81:
            solutions.append(copy.deepcopy(bd))
            return
        row, col = divmod(pos, 9)
        if bd[row][col] != 0:
            backtrack(bd, pos + 1)
        else:
            for num in range(1, 10):
                if is_valid(bd, row, col, num):
                    bd[row][col] = num
                    visited[0] += 1
                    if visited[0] % 1000 == 0:
                        progress_queue.put((branch_id, visited[0]))
                    backtrack(bd, pos + 1)
                    bd[row][col] = 0
    backtrack(board)
    # 结束时推送最终节点数
    progress_queue.put((branch_id, visited[0]))
    return solutions, visited[0]

def worker(new_board, branch_id, progress_queue):
    return solve_sudoku_all_with_progress(new_board, branch_id, progress_queue)

class SudokuGUI:
    def __init__(self, root):
        self.root = root
        self.root.title('多进程数独求解器')
        self.root.geometry('520x650')
        self.root.resizable(False, False)
        self.entries = [[None for _ in range(9)] for _ in range(9)]
        self._build_ui()
        self.progress_queue = queue.Queue()
        self._poll_progress()

    def _build_ui(self):
        self.root.configure(bg='#f7f7fa')
        style = ttk.Style()
        style.theme_use('clam')
        style.configure('TButton', font=('Segoe UI', 12), padding=6)
        style.configure('TLabel', font=('Segoe UI', 11), background='#f7f7fa')
        style.configure('TProgressbar', thickness=18, troughcolor='#e0e0e0', background='#4caf50')
        title = ttk.Label(self.root, text='多进程数独求解器', font=('Segoe UI', 22, 'bold'))
        title.pack(pady=(18, 8))
        frame = tk.Frame(self.root, bg='#f7f7fa')
        frame.pack(pady=8)
        for i in range(9):
            for j in range(9):
                e = tk.Entry(frame, width=2, font=('Consolas', 22), justify='center', bd=2, relief='ridge', bg='#fff', fg='#222')
                e.grid(row=i, column=j, padx=(2 if j % 3 == 0 else 0, 2), pady=(2 if i % 3 == 0 else 0, 2))
                self.entries[i][j] = e
        btn_frame = tk.Frame(self.root, bg='#f7f7fa')
        btn_frame.pack(pady=10)
        ttk.Button(btn_frame, text='求解所有解', command=self.solve).grid(row=0, column=0, padx=8)
        ttk.Button(btn_frame, text='清空', command=self.clear).grid(row=0, column=1, padx=8)
        ttk.Button(btn_frame, text='保存', command=self.save).grid(row=0, column=2, padx=8)
        ttk.Button(btn_frame, text='加载', command=self.load).grid(row=0, column=3, padx=8)
        self.progress_var = tk.DoubleVar()
        self.progress_bar = ttk.Progressbar(self.root, variable=self.progress_var, maximum=1, length=400)
        self.progress_bar.pack(pady=(10, 2))
        self.progress_label = ttk.Label(self.root, text='进度: 0/0  节点: 0  速度: -- 节点/秒')
        self.progress_label.pack()
        self.status_label = ttk.Label(self.root, text='', font=('Segoe UI', 10))
        self.status_label.pack(pady=(8, 0))
        self.thread_label = ttk.Label(self.root, text='线程状态: 空闲', font=('Segoe UI', 10))
        self.thread_label.pack(pady=(2, 0))

    def get_board(self):
        board = []
        for i in range(9):
            row = []
            for j in range(9):
                val = self.entries[i][j].get()
                if val == '':
                    row.append(0)
                else:
                    try:
                        num = int(val)
                        if 1 <= num <= 9:
                            row.append(num)
                        else:
                            raise ValueError
                    except ValueError:
                        messagebox.showerror('输入错误', f'第{i+1}行第{j+1}列输入无效')
                        return None
            board.append(row)
        return board

    def clear(self):
        for i in range(9):
            for j in range(9):
                self.entries[i][j].delete(0, tk.END)
        self.progress_var.set(0)
        self.progress_label.config(text='进度: 0/0  节点: 0  速度: -- 节点/秒')
        self.status_label.config(text='')
        self.thread_label.config(text='线程状态: 空闲')

    def save(self):
        data = [[self.entries[i][j].get() for j in range(9)] for i in range(9)]
        file_path = filedialog.asksaveasfilename(defaultextension='.json', filetypes=[('JSON文件', '*.json')])
        if file_path:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False)
            self.status_label.config(text='已保存')

    def load(self):
        file_path = filedialog.askopenfilename(defaultextension='.json', filetypes=[('JSON文件', '*.json')])
        if file_path:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            for i in range(9):
                for j in range(9):
                    self.entries[i][j].delete(0, tk.END)
                    if data[i][j]:
                        self.entries[i][j].insert(0, data[i][j])
            self.status_label.config(text='已加载')

    def solve(self):
        board = self.get_board()
        if board is None:
            return
        self.progress_var.set(0)
        self.progress_label.config(text='进度: 0/0  节点: 0  速度: -- 节点/秒')
        self.status_label.config(text='正在求解...')
        self.thread_label.config(text='线程状态: 启动中...')
        self.disable_inputs()
        self._parallel_solve(board)

    def disable_inputs(self):
        for i in range(9):
            for j in range(9):
                self.entries[i][j].config(state='disabled')

    def enable_inputs(self):
        for i in range(9):
            for j in range(9):
                self.entries[i][j].config(state='normal')

    def _parallel_solve(self, board):
        # 找到第一个空格
        first_empty = None
        for i in range(9):
            for j in range(9):
                if board[i][j] == 0:
                    first_empty = (i, j)
                    break
            if first_empty:
                break
        if not first_empty:
            self.root.after(0, lambda: self.thread_label.config(text='线程状态: 空闲'))
            self.on_solve_done([copy.deepcopy(board)])
            return
        i, j = first_empty
        candidates = [num for num in range(1, 10) if is_valid(board, i, j, num)]
        total_tasks = len(candidates)
        finished = [0]
        all_solutions = []
        all_visited = [0]
        branch_nodes = {idx: 0 for idx in range(total_tasks)}
        start_time = time.time()
        self.root.after(0, lambda: self.thread_label.config(text='线程状态: 运行中 (多进程)'))
        # 用multiprocessing.Manager().Queue()跨进程通信
        from multiprocessing import Manager
        mp_manager = Manager()
        mp_queue = mp_manager.Queue()
        def manager():
            try:
                with concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
                    futures = []
                    for idx, num in enumerate(candidates):
                        new_board = copy.deepcopy(board)
                        new_board[i][j] = num
                        futures.append(executor.submit(worker, new_board, idx, mp_queue))
                    # 进度监听线程
                    def progress_listener():
                        while finished[0] < total_tasks:
                            try:
                                branch_id, nodes = mp_queue.get(timeout=0.1)
                                branch_nodes[branch_id] = nodes
                                all_visited[0] = sum(branch_nodes.values())
                                elapsed = time.time() - start_time
                                speed = all_visited[0] / elapsed if elapsed > 0 else 0
                                self.progress_queue.put((finished[0], total_tasks, speed, all_visited[0]))
                            except Exception:
                                pass
                    listener_thread = threading.Thread(target=progress_listener, daemon=True)
                    listener_thread.start()
                    for idx, fut in enumerate(futures):
                        result, visited = fut.result()
                        all_solutions.extend(result)
                        finished[0] += 1
                        branch_nodes[idx] = visited
                        all_visited[0] = sum(branch_nodes.values())
                        elapsed = time.time() - start_time
                        speed = all_visited[0] / elapsed if elapsed > 0 else 0
                        self.progress_queue.put((finished[0], total_tasks, speed, all_visited[0]))
                    listener_thread.join(timeout=0.1)
                self.root.after(0, lambda: self.thread_label.config(text='线程状态: 已完成 (多进程)'))
                self.root.after(0, lambda: self.on_solve_done(all_solutions))
            except Exception as e:
                import traceback
                traceback.print_exc()
                self.root.after(0, lambda e=e: messagebox.showerror('主进程异常', str(e)))
        threading.Thread(target=manager, daemon=True).start()

    def _poll_progress(self):
        try:
            while True:
                v, t, s, nodes = self.progress_queue.get_nowait()
                self._update_progress(v, t, s, nodes)
        except queue.Empty:
            pass
        self.root.after(50, self._poll_progress)

    def _update_progress(self, finished, total, speed, nodes):
        progress = finished / total if total > 0 else 0
        speed_str = f"{speed:.1f}" if speed > 0 else "--"
        self.progress_var.set(progress)
        self.progress_label.config(text=f'进度: {finished}/{total}  节点: {nodes}  速度: {speed_str} 节点/秒')

    def on_solve_done(self, solutions):
        self.enable_inputs()
        if not solutions:
            self.status_label.config(text='无解')
            self.thread_label.config(text='线程状态: 空闲')
            messagebox.showinfo('结果', '无解')
        else:
            self.status_label.config(text=f'共找到 {len(solutions)} 个解')
            self.thread_label.config(text='线程状态: 空闲')
            self.show_solution_window(solutions)

    def show_solution_window(self, solutions):
        win = tk.Toplevel(self.root)
        win.title(f'共找到{len(solutions)}个解')
        win.geometry('420x420')
        idx = tk.IntVar(value=0)
        original = [[self.entries[i][j].get() != '' for j in range(9)] for i in range(9)]
        sol_entries = [[tk.Entry(win, width=2, font=('Consolas', 18), justify='center', state='readonly') for _ in range(9)] for _ in range(9)]
        for i in range(9):
            for j in range(9):
                sol_entries[i][j].grid(row=i, column=j, padx=(2 if j % 3 == 0 else 0, 2), pady=(2 if i % 3 == 0 else 0, 2))
        def show(idx_val):
            for i in range(9):
                for j in range(9):
                    sol_entries[i][j].config(state='normal')
                    sol_entries[i][j].delete(0, tk.END)
                    sol_entries[i][j].insert(0, str(solutions[idx_val][i][j]))
                    if original[i][j]:
                        sol_entries[i][j].config(fg='#e53935')
                    else:
                        sol_entries[i][j].config(fg='#388e3c')
                    sol_entries[i][j].config(state='readonly')
        btn_prev = ttk.Button(win, text='上一解', command=lambda: (idx.set((idx.get()-1)%len(solutions)), show(idx.get())))
        btn_next = ttk.Button(win, text='下一解', command=lambda: (idx.set((idx.get()+1)%len(solutions)), show(idx.get())))
        btn_prev.grid(row=9, column=2, columnspan=2, pady=8)
        btn_next.grid(row=9, column=5, columnspan=2, pady=8)
        if len(solutions) == 1:
            btn_prev.config(state='disabled')
            btn_next.config(state='disabled')
        show(0)

if __name__ == "__main__":
    multiprocessing.freeze_support()  # for Windows compatibility
    root = tk.Tk()
    app = SudokuGUI(root)
    root.mainloop()

 

The post 数独求解工具 appeared first on obaby@mars.

❌