由于运维工作需要,经常需要将一些数据从数据库中导出,发送给运营和需求部门,天天去手动查询,又有点太费时间了,于是研究学习了Python的基本功能,通过Python脚本和Linux 的crontab命令实现了每天自动化的数据查询和邮件发送,今天小编就来说说关于python邮件发送教程?下面更多详细答案一起来看看吧!
python邮件发送教程
背景由于运维工作需要,经常需要将一些数据从数据库中导出,发送给运营和需求部门,天天去手动查询,又有点太费时间了,于是研究学习了Python的基本功能,通过Python脚本和Linux 的crontab命令实现了每天自动化的数据查询和邮件发送。
代码实现定义了以下几个代码模块实现了配置文件读取、日志记录、数据库连接访问查询、导出到xlsx和带附件的邮件发送功能。1、demo.py 示例文件2、config.yml 配置文件3、common_log.py 实现日志记录4、common_db.py 实现数据库连接和访问5、common_xlsx.py 实现数据表格的处理6、common_email.py 实现带附件的邮件发送
1、demo.py 示例
# coding: utf-8
import common_db as mydb
import common_xlsx as my_xlsx
import common_log as mylog
import common_email as my_email
if __name__ == "__main__":
phone = '13********7'
sql = """ select orderno,orderAmount,actualAmount,phone order where phone= '{0}' """.format(phone)
# 查询订单
result = mydb.select_by_parameters(sql)
filename = "测试.xlsx"
sheet_name = '订单查询'
# 邮件接收人
receivers = "zhangsan@aliyun.com"
receivers_cc = "lisi@aliyun.com"
# 邮件抄送人
if len(result) > 0:
# 创建工作表
my_xlsx.create_xlsx(filename)
mylog.logger.info("创建工作表成功" filename)
# 将查询结果放入工作表
my_xlsx.create_sheet_in_xlsx(filename, sheet_name, result,0)
mylog.logger.info("创建工作簿成功" sheet_name)
# 对工作簿求和
my_xlsx.sum_col_for_sheet(filename, sheet_name)
mylog.logger.info("求和汇总成功")
# 删除空工作表
my_xlsx.delete_sheet_from_xlsx(filename,'Sheet')
# 发送邮件
my_email.to_send_email("测试查询结果", receivers, receivers_cc, filename, "订单查询结果.xlsx")
mylog.logger.info("发送邮件成功")
mysql:
host: 192.168.x.x
port: 3306
username: xxxx
password: xxxx
database: xxxx
log:
log_path: D:\log
log_size: 8
log_num: 3
email:
smtp: smtp.126.com
# 发送方邮件地址
from: xxxx@aliyun.com
# 发送方授权码
password: fdsafafdsafsa
import logging.handlers
import logging
import yaml
import os
import sys
# 提供日志功能
class logger:
# 先读取XML文件中的配置数据
# 由于config.xml放置在与当前文件相同的目录下,因此通过 __file__ 来获取XML文件的目录,然后再拼接成绝对路径
# 这里利用了lxml库来解析XML
# root = etree.parse(os.path.join(os.path.dirname(__file__), 'config.xml')).getroot()
# 先读取yml中的配置数据
# 由于
# 读取日志文件保存路径
with open('config.yml', 'r') as f:
result = yaml.load(f, Loader=yaml.FullLoader)
config_log = result["log"]
logpath = config_log['log_path']
# root.find('logpath').text
# 读取日志文件容量,转换为字节
logsize = 1024*1024*int(config_log['log_size'])
# 读取日志文件保存个数
lognum = int(config_log['log_num'])
# 日志文件名:由用例脚本的名称,结合日志保存路径,得到日志文件的绝对路径
logname = os.path.join(logpath, sys.argv[0].split('/')[-1].split('.')[0]) ".log"
# 初始化logger
log = logging.getLogger()
# 日志格式,可以根据需要设置
fmt = logging.Formatter('[%(asctime)s][%(filename)s][line:%(lineno)d][%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')
# 日志输出到文件,这里用到了上面获取的日志名称,大小,保存个数
handle1 = logging.handlers.RotatingFileHandler(logname, maxBytes=logsize, backupCount=lognum)
handle1.setFormatter(fmt)
# 同时输出到屏幕,便于实施观察
handle2 = logging.StreamHandler(stream=sys.stdout)
handle2.setFormatter(fmt)
log.addHandler(handle1)
log.addHandler(handle2)
# 设置日志基本,这里设置为INFO,表示只有INFO级别及以上的会打印
log.setLevel(logging.INFO)
# 日志接口,用户只需调用这里的接口即可,这里只定位了INFO, WARNING, ERROR三个级别的日志,可根据需要定义更多接口
@classmethod
def info(cls, msg):
cls.log.info(msg)
return
@classmethod
def warning(cls, msg):
cls.log.warning(msg)
return
@classmethod
def error(cls, msg):
cls.log.error(msg)
return
import time
import pymysql
from common_log import *
# 连接数据库
def get_connection():
_conn_status = True
_max_retries_count = 10 # 设置最大重试次数
_conn_retries_count = 0 # 初始重试次数
_conn_timeout = 3 # 连接超时时间为3秒
with open('config.yml', 'r') as f:
result = yaml.load(f, Loader=yaml.FullLoader)
config_mysql = result["mysql"]
while _conn_status and _conn_retries_count <= _max_retries_count:
try:
connect = pymysql.connect(host=config_mysql['host'], user=config_mysql['username'],
password=config_mysql['password'], database=config_mysql['database'],
port=config_mysql['port'])
_conn_status = False # 如果conn成功则_status为设置为False则退出循环,返回db连接对象
logger.info("连接数据库成功")
return connect
except Exception as e:
_conn_retries_count = 1
logger.info("第%s次连接数据库失败"%(_conn_retries_count))
logger.error(e)
time.sleep(3)
continue
# 查询函数
def select_by_parameters(sql, params=None):
try:
connect = get_connection()
cursor = connect.cursor(pymysql.cursors.DictCursor)
cursor.execute(sql, params)
result = cursor.fetchall()
return result
except Exception as e:
logger.error("执行查询报错")
logger.info(sql)
logger.error(e)
finally:
try:
cursor.close()
except Exception as e:
logger.error("关闭游标对象报错")
logger.error(e)
try:
connect.close()
except Exception as e:
print(e)
print("数据库链接关闭异常")
import openpyxl
import os
from openpyxl.styles import PatternFill, Border, Side, Alignment, Protection, Font, colors
from openpyxl.utils import get_column_letter
# 定义边框
thin_border = Border(left=Side(style='thin', color='FFFFFF'), right=Side(style='thin', color='FFFFFF'),
top=Side(style='thin', color='FFFFFF'), bottom=Side(style='thin', color='FFFFFF'))
# 居中对齐
alignment_center = Alignment(horizontal='center', vertical='center')
# 右对齐
alignment_right = Alignment(horizontal='center', vertical='center')
# 双行填充
fill_double = PatternFill(fgColor='FFDCE6F1', fill_type='solid')
# 单行填充
fill_single = PatternFill(fgColor='FFB8CCE4', fill_type='solid')
# 表头填充
fill_head = PatternFill(fgColor='FF366092', fill_type='solid')
font_head = Font(bold=True, color='FFFFFFFF')
# 1、创建xlsx脚本——在指定的filepath,创建指定的filename的xlsx文件
def create_xlsx(filename):
wb = openpyxl.Workbook()
wb.save(filename)
# 2、增加工作簿脚本——读取指定路径下的xlsx文件,在工作表第index位置增加一个工作簿,并将mysql查询结果result写入到该工作簿
def create_sheet_in_xlsx(filename, sheet_name, result, index):
# 加载文件
wb = openpyxl.load_workbook(filename)
# 在指定位置创建工作表
wb.create_sheet(sheet_name, index)
# 获取新建的工作表
ws = wb[sheet_name]
j = 1
if len(result) > 0:
# 写表头
for key, value in (result[0].items()):
ws.cell(1, j, format(key)).border = thin_border
# 定义对齐方式
ws.cell(1, j).alignment = alignment_center
# 字体 颜色为白色
ws.cell(1, j).font = font_head
# 填充
ws.cell(1, j).fill = fill_head
# 边框
j = j 1
ws.row_dimensions[1].height = 30
# 写数据
# 根据结果集行数量进行循环
for i in range(len(result)):
# 循环当前行,定义j变量为列使用
j = 1
for key, value in (result[i].items()):
if i % 2 == 0:
ws.cell(i 2, j).fill = fill_double
else:
ws.cell(i 2, j).fill = fill_single
ws.cell(i 2, j, value)
ws.cell(i 2, j).border = thin_border
ws.cell(i 2, j).alignment = alignment_center
ws.row_dimensions[i 2].height = 20
if '时间' in format(key):
ws.cell(i 2, j).alignment = alignment_center
ws.cell(i 2, j).number_format = 'yyyy-mm-dd hh:mm:ss'
j = j 1
continue
# 如果字段名称是数量的话,不保留小数
if '数量' in format(key):
ws.cell(i 2, j).number_format = '0'
j = j 1
continue
# 如果是字段名称包含金额的话,则右对齐
if '金额' in format(key):
ws.cell(i 2, j).alignment = alignment_right
ws.cell(i 2, j).number_format = '#,##0.00'
j = j 1
continue
j = j 1
# 保存工作表
wb.save(filename)
# 3、删除工作簿
def delete_sheet_from_xlsx(filename,sheet_name):
wb = openpyxl.load_workbook(filename)
wb.remove_sheet(wb[sheet_name])
wb.save(filename)
# 4、往工作簿中追加行 将mysql的查询结果追加到工作表sheet_name末尾
def add_result_to_sheet(filename,new_sheet_name,result):
wb = openpyxl.load_workbook(filename)
ws = wb[new_sheet_name]
# 获取最大行
mr = ws.max_row
if len(result) > 0:
for i in range(len(result)):
# 循环当前行,定义j变量为列使用
j = 1
for key, value in (result[i].items()):
if i % 2 == 0:
ws.cell(i mr, j).fill = fill_double
else:
ws.cell(i mr, j).fill = fill_single
ws.cell(i mr, j, value)
ws.cell(i mr, j).border = thin_border
ws.cell(i mr, j).alignment = alignment_center
ws.row_dimensions[i 2].height = 20
if '时间' in format(key):
ws.cell(i mr, j).number_format = 'yyyy-mm-dd hh:mm:ss'
j = j 1
continue
# 如果字段名称是数量的话,不保留小数
if '数量' in format(key):
ws.cell(i mr, j).number_format = '0'
j = j 1
continue
# 如果是字段名称包含金额的话,则右对齐
if '金额' in format(key):
ws.cell(i mr, j).alignment = alignment_right
ws.cell(i mr, j).number_format = '#,##0.00'
j = j 1
continue
j = j 1
wb.save(filename)
# 5、删除工作簿中最大行
def delete_max_row_from_sheet(filename,sheet_name):
wb = openpyxl.load_workbook(filename)
ws = wb[sheet_name]
ws.delete_rows(ws.max_row)
wb.save(filename)
# 6、删除工作簿中指定列
def delete_col_from_sheet(filename,sheet_name,colno):
wb = openpyxl.load_workbook(filename)
ws = wb[sheet_name]
ws.delete_cols(colno)
wb.save(filename)
# 7、对表格中金额和数量字段进行求和
def sum_col_for_sheet(filename,sheet_name):
wb = openpyxl.load_workbook(filename)
ws = wb[sheet_name]
for col in list(ws.columns):
l = [c.value for c in col]
if '金额' in l[0] or '数量' in l[0]:
# 本列行的数量
row_size = len(col) 1
# 本列的列号是
col_no = col[1].column
col_code = get_column_letter(col[1].column)
ws.cell(row_size, col_no, "=sum(" str(col_code) str(2) ":" str(col_code) str(
row_size - 1) ")").border = thin_border
ws.cell(row_size, col_no).font = font_head
# 填充
ws.cell(row_size, col_no).fill = fill_head
# 测试添加样式
if '金额' in l[0]:
ws.cell(row_size, col_no).number_format = '#,##0.00'
ws.cell(row_size, col_no).alignment = alignment_right
if '数量' in l[0]:
ws.cell(row_size, col_no).number_format = '0'
ws.cell(row_size, col_no).alignment = alignment_center
ws.row_dimensions[col_no].height = 20
wb.save(filename)
from email.mime.text import MIMEText
from email.header import Header
from email.mime.multipart import MIMEMultipart
from smtplib import SMTP_SSL
from common_log import *
import yaml
#
# file_Name是路径名称加文件名和扩展名;
# new_file_name是在邮件附件中显示的名称
# receivers是收件人列表,中间逗号隔开
# receivers_cc是抄送人列表
# mail_subject是邮件主题
def to_send_email(mail_subject,receivers,receivers_cc,file_name,new_file_name):
with open('config.yml', 'r') as f:
result = yaml.load(f, Loader=yaml.FullLoader)
config_email = result["email"]
password = config_email['password']
msg = MIMEMultipart('related')
msgAlternative = MIMEMultipart('alternative')
msgAlternative.attach(MIMEText("<h1>见附件</h1> <br />", "html", "utf-8"))
msg.attach(msgAlternative)
# file_name 是指文件路径加名称和扩展名
file1 = MIMEText( open(file_name, 'rb').read(), 'base64', 'utf-8' )
file1["Content-Type"] = 'application/octet-stream'
# new_file_name是指邮件附件中显示的名称
file1.add_header('Content-Disposition', 'attachment',filename=new_file_name)
msg.attach(file1)
msg['Subject'] = Header(mail_subject, 'utf-8').encode()
msg['From'] = config_email['from']
msg['To'] = receivers
msg['Cc'] = receivers_cc
try:
smtp = SMTP_SSL(config_email['smtp'])
smtp.login(msg['From'], password)
smtp.sendmail(msg['From'], msg['To'].split(',') msg['Cc'].split(','), msg.as_string())
logger.info("发送邮件成功,邮件接收人是:%s,邮件抄送人是:%s"%(receivers, receivers_cc))
except Exception as e:
logger.error("发送邮件出现错误,邮件接收人是:%s,邮件抄送人是:%s" % (receivers, receivers_cc))
logger.error(e)
finally:
try:
smtp.quit()
except Exception as e:
logger.error(e)