Python3 Practical Cases

Master Python3 application skills through practical project cases

1. Practical Cases Overview

This chapter works through several practical project cases to help you consolidate the Python3 knowledge you have learned and see how to apply Python3 in real development. The cases cover web development, data analysis, automation scripts, and other areas, and each one includes a complete code implementation with a detailed explanation.

By studying these practical cases, you will be able to:

  • Understand how Python3 is applied in different domains
  • Master the development workflow and methods of a practical project
  • Learn how to solve problems encountered in real development
  • Improve your coding ability and hands-on project experience

2. Case 1: Simple Web Server

2.1 Project Introduction

Use Python3's built-in modules to create a simple web server that implements basic HTTP request handling and response functions.

2.2 Technology Stack

  • Python3 standard library: http.server and socketserver (http.server builds on socketserver)
  • HTML/CSS basics

2.3 Code Implementation

#!/usr/bin/env python3
"""
Simple web server implementation
"""

import html
import time
from http.server import HTTPServer, BaseHTTPRequestHandler


class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):

    def do_GET(self):
        """Handle GET requests"""
        # Set the response status code
        self.send_response(200)
        # Set the response headers
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()

        # Build the response body
        current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())

        response_html = f"""<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Simple Web Server</title>
</head>
<body>
    <h1>Welcome to the Simple Web Server</h1>
    <p>This is a web server created with Python3 built-in modules.</p>
    <p>Current time: {current_time}</p>
    <p>Request path: {html.escape(self.path)}</p>
    <p>Server version: Python3 HTTP Server</p>
</body>
</html>
"""
        # Send the response body
        self.wfile.write(response_html.encode('utf-8'))

    def do_POST(self):
        """Handle POST requests"""
        # Get the length of the request body (0 if the header is missing)
        content_length = int(self.headers.get('Content-Length', 0))
        # Read the request body
        post_data = self.rfile.read(content_length)

        # Set the response status code
        self.send_response(200)
        # Set the response headers
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()

        # Build the response body; escape the echoed data so it cannot inject HTML
        received = html.escape(post_data.decode('utf-8', errors='replace'))
        response_html = f"""<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>POST Request Handling</title>
</head>
<body>
    <h1>POST request handled successfully</h1>
    <p>The server received the data you sent:</p>
    <pre>{received}</pre>
</body>
</html>
"""
        # Send the response body
        self.wfile.write(response_html.encode('utf-8'))


def run_server():
    """Start the server"""
    host = 'localhost'
    port = 8000

    server = HTTPServer((host, port), SimpleHTTPRequestHandler)
    print(f"Server started, listening at: http://{host}:{port}")
    print("Press Ctrl+C to stop the server")

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nServer is stopping...")
        server.server_close()
        print("Server stopped")


if __name__ == "__main__":
    run_server()

2.4 Run and Test

  1. Save the code above as simple_web_server.py
  2. Run it from the command line: python simple_web_server.py
  3. Open a browser and visit http://localhost:8000
  4. Try sending a POST request to test the server's POST handling
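
A browser only sends GET requests from the address bar, so the POST test in step 4 needs a client. The following is a minimal sketch using only the standard library. To stay self-contained it starts a throwaway echo server on a free local port instead of relying on simple_web_server.py; the names EchoHandler, post_form, and demo are hypothetical and not part of the tutorial's code. Against the tutorial server you would call post_form("http://localhost:8000/", ...) while that server is running.

```python
import threading
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class EchoHandler(BaseHTTPRequestHandler):
    """Hypothetical stand-in handler: replies with the POST body it received."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # suppress per-request logging to keep the demo quiet


def post_form(url, fields):
    """Send `fields` as a URL-encoded POST body and return the response text."""
    data = urllib.parse.urlencode(fields).encode("utf-8")
    request = urllib.request.Request(url, data=data, method="POST")
    # An empty ProxyHandler bypasses any system proxy, since the target is local
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    with opener.open(request, timeout=5) as response:
        return response.read().decode("utf-8")


def demo():
    """Start the echo server on a free port, send one POST, and shut down."""
    server = HTTPServer(("127.0.0.1", 0), EchoHandler)  # port 0 = any free port
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        port = server.server_address[1]
        return post_form(f"http://127.0.0.1:{port}/", {"name": "Alice", "score": "90"})
    finally:
        server.shutdown()
        server.server_close()


if __name__ == "__main__":
    print(demo())
```

The echo server returns the URL-encoded body unchanged, which makes it easy to confirm that the request body and its Content-Length header arrived intact.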

Practice case: Student Information Management System

Write a student information management system that implements the following functions:

  1. Add student information (name, age, gender, score, etc.)
  2. Query student information (by name or student ID)
  3. Modify student information
  4. Delete student information
  5. Save student information to a file
  6. Load student information from a file
#!/usr/bin/env python3
"""
Student information management system
"""

import json
import os

class Student:
    """Student class"""
    def __init__(self, student_id, name, age, gender, score):
        self.id = student_id
        self.name = name
        self.age = age
        self.gender = gender
        self.score = score

    def to_dict(self):
        """Convert to a dictionary"""
        return {
            'id': self.id,
            'name': self.name,
            'age': self.age,
            'gender': self.gender,
            'score': self.score
        }

    @classmethod
    def from_dict(cls, data):
        """Create a student object from a dictionary"""
        return cls(data['id'], data['name'], data['age'], data['gender'], data['score'])

class StudentManager:
    """Student management class"""
    def __init__(self, file_path='students.json'):
        self.file_path = file_path
        self.students = self.load_students()

    def load_students(self):
        """Load student information from the file"""
        if os.path.exists(self.file_path):
            try:
                with open(self.file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                    return [Student.from_dict(s) for s in data]
            except (OSError, json.JSONDecodeError, KeyError, TypeError):
                # An unreadable or malformed file is treated as an empty list
                return []
        return []

    def save_students(self):
        """Save student information to the file"""
        data = [s.to_dict() for s in self.students]
        with open(self.file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

    def add_student(self, student):
        """Add a student"""
        # Check whether the student ID already exists
        for s in self.students:
            if s.id == student.id:
                return False, "Student ID already exists"
        self.students.append(student)
        self.save_students()
        return True, "Student added"

    def get_student(self, student_id):
        """Get a student by student ID"""
        for student in self.students:
            if student.id == student_id:
                return student
        return None

    def get_student_by_name(self, name):
        """Get all students with the given name"""
        result = []
        for student in self.students:
            if student.name == name:
                result.append(student)
        return result

    def update_student(self, student_id, **kwargs):
        """Update student information"""
        student = self.get_student(student_id)
        if not student:
            return False, "Student not found"

        # Only update attributes that the student object already has
        for key, value in kwargs.items():
            if hasattr(student, key):
                setattr(student, key, value)

        self.save_students()
        return True, "Student updated"

    def delete_student(self, student_id):
        """Delete a student"""
        student = self.get_student(student_id)
        if not student:
            return False, "Student not found"

        self.students.remove(student)
        self.save_students()
        return True, "Student deleted"

    def list_students(self):
        """List all students"""
        return self.students

def main():
    """Main function"""
    manager = StudentManager()
    
    while True:
        print("\nStudent Information Management System")
        print("1. Add student")
        print("2. Query student")
        print("3. Modify student")
        print("4. Delete student")
        print("5. List all students")
        print("6. Exit")

        choice = input("Choose an operation: ")

        if choice == '1':
            # Add a student
            student_id = input("Enter student ID: ")
            name = input("Enter name: ")
            try:
                age = int(input("Enter age: "))
                gender = input("Enter gender: ")
                score = float(input("Enter score: "))
            except ValueError:
                print("Age must be an integer and score must be a number")
                continue

            student = Student(student_id, name, age, gender, score)
            success, message = manager.add_student(student)
            print(message)

        elif choice == '2':
            # Query a student
            query_type = input("Choose a query method (1. By student ID 2. By name): ")
            if query_type == '1':
                student_id = input("Enter student ID: ")
                student = manager.get_student(student_id)
                if student:
                    print(f"ID: {student.id}, Name: {student.name}, Age: {student.age}, Gender: {student.gender}, Score: {student.score}")
                else:
                    print("Student not found")
            elif query_type == '2':
                name = input("Enter name: ")
                students = manager.get_student_by_name(name)
                if students:
                    for student in students:
                        print(f"ID: {student.id}, Name: {student.name}, Age: {student.age}, Gender: {student.gender}, Score: {student.score}")
                else:
                    print("Student not found")

        elif choice == '3':
            # Modify a student
            student_id = input("Enter the ID of the student to modify: ")
            student = manager.get_student(student_id)
            if student:
                print(f"Current information: ID: {student.id}, Name: {student.name}, Age: {student.age}, Gender: {student.gender}, Score: {student.score}")
                name = input("Enter new name (press Enter to keep unchanged): ")
                age = input("Enter new age (press Enter to keep unchanged): ")
                gender = input("Enter new gender (press Enter to keep unchanged): ")
                score = input("Enter new score (press Enter to keep unchanged): ")

                # Collect only the fields the user actually filled in
                kwargs = {}
                try:
                    if name:
                        kwargs['name'] = name
                    if age:
                        kwargs['age'] = int(age)
                    if gender:
                        kwargs['gender'] = gender
                    if score:
                        kwargs['score'] = float(score)
                except ValueError:
                    print("Age must be an integer and score must be a number")
                    continue

                success, message = manager.update_student(student_id, **kwargs)
                print(message)
            else:
                print("Student not found")

        elif choice == '4':
            # Delete a student
            student_id = input("Enter the ID of the student to delete: ")
            success, message = manager.delete_student(student_id)
            print(message)

        elif choice == '5':
            # List all students
            students = manager.list_students()
            if students:
                print("\nAll student information:")
                for student in students:
                    print(f"ID: {student.id}, Name: {student.name}, Age: {student.age}, Gender: {student.gender}, Score: {student.score}")
            else:
                print("No student information yet")

        elif choice == '6':
            # Exit the system
            print("Goodbye!")
            break

        else:
            print("Invalid choice, please try again")

if __name__ == "__main__":
    main()
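
The persistence in this case rests on converting objects to dictionaries, writing them as JSON, and rebuilding the objects on load. The sketch below isolates that round trip so it can be checked on its own. It is an illustration under assumptions, not the listing's implementation: it uses a dataclass named StudentRecord (a hypothetical stand-in with the same five fields), and the helpers save_records, load_records, and round_trip_demo are likewise made up for this example.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass


@dataclass
class StudentRecord:
    """Hypothetical stand-in for the Student class, with the same five fields."""
    id: str
    name: str
    age: int
    gender: str
    score: float


def save_records(records, file_path):
    """Write the records to file_path as a JSON list of dictionaries."""
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump([asdict(r) for r in records], f, ensure_ascii=False, indent=2)


def load_records(file_path):
    """Read the JSON list back into StudentRecord objects; a missing file gives []."""
    if not os.path.exists(file_path):
        return []
    with open(file_path, "r", encoding="utf-8") as f:
        return [StudentRecord(**item) for item in json.load(f)]


def round_trip_demo():
    """Save two records to a temporary file and report whether they load back equal."""
    records = [
        StudentRecord("1001", "Alice", 18, "F", 92.5),
        StudentRecord("1002", "Bob", 19, "M", 85.0),
    ]
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "students.json")
        save_records(records, path)
        return load_records(path) == records


if __name__ == "__main__":
    print("Round trip preserved the records:", round_trip_demo())
```

A dataclass generates the constructor and equality comparison automatically, which is why the loaded list can be compared directly with the original one.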

3. Case 2: Data Analysis Tool

3.1 Project Introduction

Use Python3 data analysis libraries (such as NumPy, Pandas, and Matplotlib) to create a data analysis tool for analyzing and visualizing data.

3.2 Technology Stack

  • NumPy: numerical computing
  • Pandas: data processing and analysis
  • Matplotlib: data visualization
  • Seaborn: statistical plots (used here for the correlation heatmap)

3.3 Code Implementation

#!/usr/bin/env python3
"""
Data analysis tool
"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

class DataAnalyzer:
    """Data analysis class"""

    def __init__(self, data=None):
        """Initialize with an optional DataFrame"""
        self.data = data

    def load_data(self, file_path):
        """Load data from a CSV, Excel, or JSON file"""
        try:
            if file_path.endswith('.csv'):
                self.data = pd.read_csv(file_path)
            elif file_path.endswith('.xlsx'):
                self.data = pd.read_excel(file_path)
            elif file_path.endswith('.json'):
                self.data = pd.read_json(file_path)
            else:
                return False, "Unsupported file format"
            return True, "Data loaded successfully"
        except Exception as e:
            return False, f"Loading failed: {str(e)}"

    def get_basic_info(self):
        """Get basic information about the data"""
        if self.data is None:
            return "No data loaded"

        info = """
Basic data information:
-----------------
"""
        info += f"Shape: {self.data.shape}\n"
        info += f"Columns: {list(self.data.columns)}\n"
        info += "\nData types:\n"
        info += str(self.data.dtypes)
        info += "\n\nMissing value counts:\n"
        info += str(self.data.isnull().sum())
        return info

    def get_descriptive_stats(self):
        """Get descriptive statistics"""
        if self.data is None:
            return "No data loaded"
        return self.data.describe().to_string()

    def plot_histogram(self, column, bins=20):
        """Plot a histogram"""
        if self.data is None:
            return False, "No data loaded"

        if column not in self.data.columns:
            return False, "Column does not exist"

        plt.figure(figsize=(10, 6))
        # Drop missing values so they do not interfere with binning
        plt.hist(self.data[column].dropna(), bins=bins, edgecolor='black')
        plt.title(f'Distribution of {column}')
        plt.xlabel(column)
        plt.ylabel('Frequency')
        plt.grid(axis='y', alpha=0.75)
        plt.savefig(f'{column}_histogram.png')
        plt.close()
        return True, f"Histogram saved as {column}_histogram.png"

    def plot_scatter(self, x_column, y_column):
        """Plot a scatter plot"""
        if self.data is None:
            return False, "No data loaded"

        if x_column not in self.data.columns or y_column not in self.data.columns:
            return False, "Column does not exist"

        plt.figure(figsize=(10, 6))
        plt.scatter(self.data[x_column], self.data[y_column], alpha=0.5)
        plt.title(f'{x_column} vs {y_column}')
        plt.xlabel(x_column)
        plt.ylabel(y_column)
        plt.grid()
        plt.savefig(f'{x_column}_vs_{y_column}_scatter.png')
        plt.close()
        return True, f"Scatter plot saved as {x_column}_vs_{y_column}_scatter.png"

    def plot_correlation_heatmap(self):
        """Plot a correlation heatmap"""
        if self.data is None:
            return False, "No data loaded"

        # Compute the correlation matrix from numeric columns only;
        # text columns would otherwise raise an error in recent pandas versions
        corr_matrix = self.data.select_dtypes(include='number').corr()

        plt.figure(figsize=(12, 10))
        sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt='.2f', linewidths=0.5)
        plt.title('Correlation Heatmap')
        plt.tight_layout()
        plt.savefig('correlation_heatmap.png')
        plt.close()
        return True, "Correlation heatmap saved as correlation_heatmap.png"

    def plot_boxplot(self, column):
        """Plot a box plot"""
        if self.data is None:
            return False, "No data loaded"

        if column not in self.data.columns:
            return False, "Column does not exist"

        plt.figure(figsize=(10, 6))
        plt.boxplot(self.data[column].dropna())
        plt.title(f'Box Plot of {column}')
        plt.ylabel(column)
        plt.grid(axis='y', alpha=0.75)
        plt.savefig(f'{column}_boxplot.png')
        plt.close()
        return True, f"Box plot saved as {column}_boxplot.png"

def main():
    """Main function"""
    analyzer = DataAnalyzer()
    
    while True:
        print("\nData Analysis Tool")
        print("1. Load data")
        print("2. View basic data information")
        print("3. View descriptive statistics")
        print("4. Plot histogram")
        print("5. Plot scatter plot")
        print("6. Plot correlation heatmap")
        print("7. Plot box plot")
        print("8. Exit")

        choice = input("Choose an operation: ")

        if choice == '1':
            # Load data
            file_path = input("Enter the file path: ")
            success, message = analyzer.load_data(file_path)
            print(message)

        elif choice == '2':
            # View basic data information
            info = analyzer.get_basic_info()
            print(info)

        elif choice == '3':
            # View descriptive statistics
            stats = analyzer.get_descriptive_stats()
            print(stats)

        elif choice == '4':
            # Plot a histogram
            if analyzer.data is None:
                print("Please load data first")
                continue
            column = input("Enter the column to plot as a histogram: ")
            bins = input("Enter the number of bins (default 20): ").strip()
            # Fall back to the default when the input is empty or not a number
            bins = int(bins) if bins.isdigit() and int(bins) > 0 else 20
            success, message = analyzer.plot_histogram(column, bins)
            print(message)

        elif choice == '5':
            # Plot a scatter plot
            if analyzer.data is None:
                print("Please load data first")
                continue
            x_column = input("Enter the X-axis column: ")
            y_column = input("Enter the Y-axis column: ")
            success, message = analyzer.plot_scatter(x_column, y_column)
            print(message)

        elif choice == '6':
            # Plot a correlation heatmap
            if analyzer.data is None:
                print("Please load data first")
                continue
            success, message = analyzer.plot_correlation_heatmap()
            print(message)

        elif choice == '7':
            # Plot a box plot
            if analyzer.data is None:
                print("Please load data first")
                continue
            column = input("Enter the column to plot as a box plot: ")
            success, message = analyzer.plot_boxplot(column)
            print(message)

        elif choice == '8':
            # Exit
            print("Goodbye!")
            break

        else:
            print("Invalid choice, please try again")

if __name__ == "__main__":
    main()

3.4 Run and Test

  1. Install the required libraries: pip install numpy pandas matplotlib seaborn (reading .xlsx files with Pandas additionally requires openpyxl)
  2. Save the code above as data_analyzer.py
  3. Prepare a data file (CSV, Excel, or JSON format)
  4. Run it from the command line: python data_analyzer.py
  5. Follow the prompts to load the data and analyze it
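
If no data file is at hand for step 3, a small synthetic CSV is enough to exercise every menu option. The following is a minimal sketch using only the standard library. The function name write_sample_csv, the file name sample_data.csv, and the column names (height_cm, weight_kg, age, group) are all made up for illustration, and the values are random numbers rather than real measurements.

```python
import csv
import random


def write_sample_csv(file_path, rows=50, seed=0):
    """Write a small CSV with three numeric columns and one text column."""
    rng = random.Random(seed)  # fixed seed so the file is reproducible
    with open(file_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["height_cm", "weight_kg", "age", "group"])
        for _ in range(rows):
            height = round(rng.gauss(170, 8), 1)
            # Weight loosely tracks height so the scatter plot shows a trend
            weight = round(0.9 * (height - 100) + rng.gauss(0, 5), 1)
            writer.writerow([height, weight, rng.randint(18, 60), rng.choice("AB")])
    return rows


if __name__ == "__main__":
    count = write_sample_csv("sample_data.csv")
    print(f"Wrote {count} rows to sample_data.csv")
```

The text column is included on purpose: it lets you confirm that the correlation heatmap copes with a file that is not purely numeric.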

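4. Case 3: Automation Script

4.1 Project Introduction

Write an automation script for batch file processing, sending email, running tasks on a schedule, and similar operations.

4.2 Technology Stack

  • Python3 standard library: os, shutil, smtplib, etc.
  • Third-party library: schedule (not part of the standard library)

4.3 Code Implementation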

#!/usr/bin/env python3
"""
Automation script tool
"""

import os
import shutil
import smtplib
import schedule
import time
import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication

class AutomationTool:
    """Automation tool class"""

    def batch_rename_files(self, directory, old_pattern, new_pattern):
        """Batch rename files"""
        if not os.path.isdir(directory):
            return False, "Directory does not exist"

        # An empty pattern matches every name and would corrupt the file names
        if not old_pattern:
            return False, "The string to replace must not be empty"

        renamed_count = 0
        for filename in os.listdir(directory):
            if old_pattern in filename:
                new_filename = filename.replace(old_pattern, new_pattern)
                old_path = os.path.join(directory, filename)
                new_path = os.path.join(directory, new_filename)
                os.rename(old_path, new_path)
                renamed_count += 1

        return True, f"Successfully renamed {renamed_count} file(s)"

    def batch_copy_files(self, source_dir, dest_dir, file_extension=None):
        """Batch copy files"""
        if not os.path.isdir(source_dir):
            return False, "Source directory does not exist"

        os.makedirs(dest_dir, exist_ok=True)

        copied_count = 0
        for filename in os.listdir(source_dir):
            # Skip files that do not match the requested extension, if one was given
            if file_extension and not filename.endswith(file_extension):
                continue
            source_path = os.path.join(source_dir, filename)
            dest_path = os.path.join(dest_dir, filename)
            if os.path.isfile(source_path):
                shutil.copy2(source_path, dest_path)
                copied_count += 1

        return True, f"Successfully copied {copied_count} file(s)"

    def send_email(self, smtp_server, smtp_port, sender, password, receivers, subject, body, attachments=None):
        """Send an email"""
        try:
            # Create the email object
            msg = MIMEMultipart()
            msg['From'] = sender
            msg['To'] = ', '.join(receivers)
            msg['Subject'] = subject

            # Add the email body
            msg.attach(MIMEText(body, 'plain', 'utf-8'))

            # Add attachments
            if attachments:
                for attachment in attachments:
                    if os.path.exists(attachment):
                        with open(attachment, 'rb') as f:
                            part = MIMEApplication(f.read())
                            part.add_header('Content-Disposition', 'attachment', filename=os.path.basename(attachment))
                            msg.attach(part)

            # Connect to the SMTP server and send the email;
            # the with block closes the connection even if sending fails
            with smtplib.SMTP(smtp_server, smtp_port) as server:
                server.starttls()
                server.login(sender, password)
                server.send_message(msg)

            return True, "Email sent successfully"
        except Exception as e:
            return False, f"Sending failed: {str(e)}"

    def schedule_task(self, interval, task_function, *args, **kwargs):
        """Schedule a task to run at a fixed interval"""
        # Set up the scheduled task according to the interval type
        if interval == 'minute':
            schedule.every().minute.do(task_function, *args, **kwargs)
        elif interval == 'hour':
            schedule.every().hour.do(task_function, *args, **kwargs)
        elif interval == 'day':
            schedule.every().day.do(task_function, *args, **kwargs)
        elif interval == 'week':
            schedule.every().week.do(task_function, *args, **kwargs)
        else:
            return False, "Unsupported interval type"

        return True, "Scheduled task set"

    def run_scheduled_tasks(self):
        """Run the scheduled tasks"""
        print("Running scheduled tasks, press Ctrl+C to stop")
        while True:
            schedule.run_pending()
            time.sleep(1)

def example_task():
    """Example task"""
    print(f"[{datetime.datetime.now()}] Running example task")

def main():
    """Main function"""
    tool = AutomationTool()

    while True:
        print("\nAutomation Script Tool")
        print("1. Batch rename files")
        print("2. Batch copy files")
        print("3. Send email")
        print("4. Set scheduled task")
        print("5. Exit")

        choice = input("Choose an operation: ")

        if choice == '1':
            # Batch rename files
            directory = input("Enter the directory path: ")
            old_pattern = input("Enter the string to replace: ")
            new_pattern = input("Enter the new string: ")
            success, message = tool.batch_rename_files(directory, old_pattern, new_pattern)
            print(message)

        elif choice == '2':
            # Batch copy files
            source_dir = input("Enter the source directory path: ")
            dest_dir = input("Enter the destination directory path: ")
            file_extension = input("Enter the file extension (leave empty for all files): ").strip()
            success, message = tool.batch_copy_files(source_dir, dest_dir, file_extension or None)
            print(message)

        elif choice == '3':
            # Send email
            smtp_server = input("Enter the SMTP server address: ")
            try:
                smtp_port = int(input("Enter the SMTP server port: "))
            except ValueError:
                print("The port must be an integer")
                continue
            sender = input("Enter the sender's email address: ")
            password = input("Enter the sender's email password: ")
            receivers = input("Enter the recipient addresses (separate multiple addresses with commas): ").split(',')
            receivers = [r.strip() for r in receivers if r.strip()]
            subject = input("Enter the email subject: ")
            body = input("Enter the email body: ")
            attachments = input("Enter the attachment paths (separate multiple paths with commas, leave empty for none): ").split(',')
            attachments = [a.strip() for a in attachments if a.strip()]

            success, message = tool.send_email(smtp_server, smtp_port, sender, password, receivers, subject, body, attachments)
            print(message)

        elif choice == '4':
            # Set a scheduled task
            interval = input("Enter the run interval (minute/hour/day/week): ")
            success, message = tool.schedule_task(interval, example_task)
            print(message)
            if success:
                try:
                    tool.run_scheduled_tasks()
                except KeyboardInterrupt:
                    print("Scheduled tasks stopped")

        elif choice == '5':
            # Exit
            print("Goodbye!")
            break

        else:
            print("Invalid choice, please try again")

if __name__ == "__main__":
    main()

4.4 Run and Test

  1. Save the code above as automation_tool.py
  2. Install the schedule library, which the script imports at startup: pip install schedule
  3. Run it from the command line: python automation_tool.py
  4. Follow the prompts to test the different automation functions
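
Testing the email function normally needs a reachable SMTP server and real credentials. One way to check the message itself beforehand is to build it and inspect it without sending anything. The sketch below does that with the standard library's EmailMessage class, which is a different API from the MIMEMultipart classes used in the listing; the function name build_message and the example.com addresses are hypothetical.

```python
from email.message import EmailMessage


def build_message(sender, receivers, subject, body, attachments=None):
    """Build an email message in memory.

    `attachments` is an optional dict mapping file names to bytes.
    """
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = ", ".join(receivers)
    msg["Subject"] = subject
    msg.set_content(body)  # plain-text body
    for name, data in (attachments or {}).items():
        # Generic binary type; a real tool might guess it with the mimetypes module
        msg.add_attachment(
            data, maintype="application", subtype="octet-stream", filename=name
        )
    return msg


if __name__ == "__main__":
    message = build_message(
        "sender@example.com",
        ["a@example.com", "b@example.com"],
        "Backup report",
        "The nightly backup finished.",
        attachments={"log.txt": b"all files copied"},
    )
    print(message["To"])
    print([part.get_filename() for part in message.iter_attachments()])
```

A message built this way can be passed to smtplib's send_message in the same way as the MIME-based one, so the inspection step does not force a second code path for sending.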

Interactive exercise: create a file backup tool

Write a file backup tool that implements the following functions:

  1. Choose the source directory to back up
  2. Choose the backup destination directory
  3. Optionally compress the backup files
  4. Set the backup frequency (manual, daily, weekly, etc.)
  5. Send a notification email after the backup completes

Hint: use the shutil module for copying and compressing files, the schedule module for scheduled tasks, and the smtplib module for sending email.
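
As a possible starting point for items 1 to 3, the copy and compression steps can be sketched with shutil alone. This is one way to approach the exercise under stated assumptions, not a reference solution: the function name backup_directory and the timestamped naming scheme are made up for illustration, and scheduling and email notification are left out.

```python
import os
import shutil
import time


def backup_directory(source_dir, dest_dir, compress=False):
    """Copy or zip source_dir into dest_dir and return the path of the backup."""
    if not os.path.isdir(source_dir):
        raise FileNotFoundError(f"Source directory not found: {source_dir}")
    os.makedirs(dest_dir, exist_ok=True)

    # Name each backup after the source folder plus a timestamp
    stamp = time.strftime("%Y%m%d_%H%M%S")
    base = os.path.basename(os.path.normpath(source_dir))
    target = os.path.join(dest_dir, f"{base}_{stamp}")

    if compress:
        # make_archive appends ".zip" itself and returns the archive path
        return shutil.make_archive(target, "zip", root_dir=source_dir)
    # copytree creates the target directory and returns its path
    return shutil.copytree(source_dir, target)
```

Calling backup_directory("my_project", "backups", compress=True) would produce a single zip file under backups. Note that the destination should not sit inside the source directory, and two uncompressed backups taken within the same second would collide on the target name.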