百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

Gmail API配置+Python实现google邮件发送完整指南,出海必备!

moboyou 2025-08-05 03:26 11 浏览

最近在做个海外项目,需要使用邮箱通知功能!最开始时候用的163邮箱发现各种收件延迟,无奈之下决定弃用国内邮箱,改用海外最大用户平台谷歌的gmail进行邮件发送。不过登录最新的Gmail邮箱设置界面,可以看到google已经把IMAP权限关闭了,只保留了POP收邮件的功能,所以无法通过IMAP里用户名密码方式直接发送邮件,经过一番折腾,终于把发件功能调通了,接下来我将详细分享下如何配置gmail以及发邮件的源码

开启Gmail的API邮件发送功能

1、google cloud创建API项目

首先进到谷歌云API控制台,创建一个Email项目:
https://console.cloud.google.com/apis/dashboard

2、配置Gmail API

接下来,进到API控制台:
https://console.cloud.google.com/apis/library/browse,搜索Gmail API

点击启用Gmail API

3、配置OAuth consent screen

上一步开启Gmail API后,Enable按钮会变成Manager,点击Manager按钮进入以下页面,选择OAuth consent screen

点击OAuth consent screen后会进到以下页面,点击Get started按钮

这里输入你的应用名,选择完你的gmail邮箱后点击Next

接下来进入到Audience页面,"Audience"选项用于指定谁可以访问您的应用程序。这有助于控制应用程序的访问范围和发布流程。以下是两个选项的作用:

  1. Internal(内部): 仅限于组织内部的用户访问。 您不需要将应用提交进行验证。 适用于仅供内部员工或团队成员使用的应用。
  2. External(外部): 可供任何拥有 Google 帐号的测试用户访问。 应用程序开始时处于测试模式,仅对您添加到测试用户列表中的用户可用。 一旦准备好推向生产环境,可能需要对应用进行验证。 适用于希望在更广泛的用户群体中进行测试或最终发布给公众的应用。

选择合适的选项可以帮助你根据应用的目标用户群体和发布策略来配置访问权限,这里我选择的是External,大家可以根据自己实际情况选择,然后继续点击Next

填写联系人信息,这里可以写个你自己的常用邮箱,点击Next

选择同意协议,点击创建

4、创建OAuth client

等待5秒左右,底下会弹出OAuth配置创建完成消息,继续点击Create OAuth client

这里输入你的OAuth客户端名称,并选择客户端的类型,我使用的Desktop,这里后面发布APP会要用到,不过我们用的Test模式,这里可以随便选个类型!

点击Create后,就会进入到Client ID的生成页面,这里Client ID可以先复制下来

5、获取OAuth Client的json密钥

点击OK后,会自动进到Clients列表,这里我们点击编辑图标

在client secret右侧点击下载图标,下载client和secret的json配置信息重命名为client_secret.json

进入到 /tmp目录,把重命名后的client_secret.json放在/tmp目录下,这个文件后面会用到

生成代码授权token

接下来,我们就可以开始写代码发送邮件了。因google要求首次调用邮件发送接口必须到页面再去授权一次,生成授权token到本地后再去调用发送邮件就无需再调用了!

1、安装python环境

这里选择python因为它代码可以直接pip安装google-api的包,使用其余语言可以参考我下面代码用AI工具(
https://ai.quanyouhulian.com/)再去转一道,这里推荐使用python3.11,安装教程可以参考:
https://mp.weixin.qq.com/s/Me5H_ESVk4SvzqTrm8RqMg

2、安装谷歌google-api以及google-auth依赖

安装完python3.11后,进入到venv虚拟环境中,直接使用以下命令安装相关的依赖包

pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib

安装如下

3、编写python发送邮件的代码

这里我使用的IDE工具是Cursor,先写一个谷歌邮件发送功能类

# -*- coding: utf-8 -*-
import base64
import logging
import os
import sys
import traceback
import time
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
from types import TracebackType
from typing import Optional

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import webbrowser

logging.basicConfig(level=logging.INFO)


class GoogleEmail:
    def __init__(self, client_secret_path: str = "", token_path: str = "", headless: bool = False) -> None:
        """
        Initialize an instance of the GoogleEmail class.

        Args:
            client_secret_path (str): The path of the client secret json file. Defaults to "".
            token_path (str): The path for saving/loading the token. If empty, uses default path. Defaults to "".
            headless (bool): Whether to use console mode for OAuth (for servers without browser). Defaults to False.

        Returns:
            None
        """
        self._client_secret_path = client_secret_path
        self._token_path = token_path
        self._headless = headless
        self._gmail_service = None
        self._init()

    def _init(self) -> None:
        """
        Initialize the gmail_service.

        Returns:
            None
        """
        if not os.path.exists(self._client_secret_path):
            logging.error(f"客户端密钥文件不存在: {self._client_secret_path}")
            sys.exit(1)
            
        tmp_dir = os.path.join(os.path.dirname(__file__), "tmp")
        os.makedirs(tmp_dir, exist_ok=True)
        
        # 使用用户指定的token路径或默认路径
        google_token_path = self._token_path if self._token_path else os.path.abspath(os.path.join(tmp_dir, "google_email_token.json"))
        
        credentials = None
        if os.path.exists(google_token_path):
            try:
                credentials = Credentials.from_authorized_user_file(
                    filename=google_token_path,
                    scopes=["https://www.googleapis.com/auth/gmail.send"]
                )
            except Exception as e:
                logging.error(f"读取token文件失败: {e}\n{traceback.format_exc()}")
                sys.exit(1)
                
        if not credentials or not credentials.valid:
            if credentials and credentials.expired and credentials.refresh_token:
                try:
                    credentials.refresh(Request())
                except Exception as e:
                    logging.error(f"刷新token失败: {e}\n{traceback.format_exc()}")
                    # 如果刷新失败,删除旧token文件并重新授权
                    if os.path.exists(google_token_path):
                        os.remove(google_token_path)
            else:
                try:
                    flow = InstalledAppFlow.from_client_secrets_file(
                        client_secrets_file=self._client_secret_path,
                        scopes=["https://www.googleapis.com/auth/gmail.send"]
                    )
                    
                    if self._headless:
                        # 无头模式下显示URL并接收输入的验证码
                        logging.info(f"请在浏览器中打开以下URL进行授权:")
                        logging.info(flow.authorization_url()[0])
                        code = input("请输入授权码: ")
                        flow.fetch_token(code=code)
                        credentials = flow.credentials
                    else:
                        try:
                            # 尝试检测是否有可用的浏览器
                            webbrowser.get()
                            credentials = flow.run_local_server(port=0)
                        except webbrowser.Error:
                            logging.warning("未检测到可用的浏览器,切换到手动授权模式")
                            logging.info(f"请在浏览器中打开以下URL进行授权:")
                            logging.info(flow.authorization_url()[0])
                            code = input("请输入授权码: ")
                            flow.fetch_token(code=code)
                            credentials = flow.credentials
                except Exception as e:
                    logging.error(f"OAuth授权失败: {e}\n{traceback.format_exc()}")
                    sys.exit(1)
                    
            with open(google_token_path, "w") as f:
                f.write(credentials.to_json())

        try:
            # 设置超时参数
            self._gmail_service = build("gmail", "v1", credentials=credentials, cache_discovery=False)
        except Exception as e:
            logging.error(f"创建Gmail服务失败: {e}\n{traceback.format_exc()}")
            sys.exit(1)

    def send(self,
             subject: str,
             body: str,
             to_recipients: str,
             cc_recipients: Optional[str] = None,
             bcc_recipients: Optional[str] = None,
             attachment_path: Optional[str] = None,
             sender_name: Optional[str] = None,
             sender_email: Optional[str] = None,
             retry_times: int = 3,
             retry_interval: int = 5,
             timeout: int = 30
             ) -> None:
        """
        Send an email using Gmail API.

        Args:
            subject (str): The email subject.
            body (str): The email body.
            to_recipients (str): Comma-separated email addresses of the primary recipients.
            cc_recipients (str): Comma-separated email addresses of the CC recipients. Default is None.
            bcc_recipients (str): Comma-separated email addresses of the BCC recipients. Default is None.
            attachment_path (str): Path to the file to be attached. Default is None (no attachment).
            sender_name (str): Display name for the sender. Default is None (uses email address).
            sender_email (str): Email address to display with sender name. Default is None (uses "me").
            retry_times (int): Number of retries if sending fails. Default is 3.
            retry_interval (int): Interval between retries in seconds. Default is 5.
            timeout (int): Timeout for HTTP requests in seconds. Default is 30.

        Returns:
            None
        """
        message = MIMEMultipart()
        message["subject"] = subject
        message.attach(MIMEText(body, "plain"))

        message["to"] = to_recipients

        if cc_recipients:
            message["cc"] = cc_recipients

        if bcc_recipients:
            message["bcc"] = bcc_recipients
            
        if sender_name:
            # 使用formataddr格式化发件人信息为"Name <email>"格式
            email = sender_email if sender_email else "me"
            message["from"] = formataddr((sender_name, email))

        if attachment_path:
            if not os.path.exists(attachment_path):
                logging.error(f"附件不存在: {attachment_path}")
                return
                
            attachment_name = os.path.basename(attachment_path)
            attachment = MIMEBase("application", "octet-stream")
            with open(attachment_path, "rb") as f:
                attachment.set_payload(f.read())
            encoders.encode_base64(attachment)
            attachment.add_header("Content-Disposition", f"attachment; filename={attachment_name}")
            message.attach(attachment)

        raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode("utf-8")
        
        if self._gmail_service is None:
            logging.error("Gmail服务未初始化")
            return
        
        for attempt in range(retry_times):
            try:
                logging.info(f"尝试发送邮件 (尝试 {attempt+1}/{retry_times})...")
                request = self._gmail_service.users().messages().send(
                    userId="me", 
                    body={"raw": raw_message}
                )
                # 设置请求超时
                request.http.timeout = timeout
                response = request.execute()
                logging.info(f"邮件发送成功, 响应: {response}")
                return
            except HttpError as e:
                logging.error(f"发送邮件时出现HTTP错误: {e.status_code} - {e.reason}")
                if e.status_code == 429 or e.status_code >= 500:  # 限流或服务器错误
                    if attempt < retry_times - 1:
                        logging.info(f"将在 {retry_interval} 秒后重试...")
                        time.sleep(retry_interval)
                    else:
                        logging.error("达到最大重试次数,放弃发送")
                else:
                    logging.error(f"发送邮件失败: {e}")
                    break
            except Exception as e:
                logging.error(f"发送邮件失败: {e}\n{traceback.format_exc()}")
                if attempt < retry_times - 1:
                    logging.info(f"将在 {retry_interval} 秒后重试...")
                    time.sleep(retry_interval)
                else:
                    logging.error("达到最大重试次数,放弃发送")

    @staticmethod
    def generate_token(client_secret_path: str, token_output_path: str) -> None:
        """
        在本地生成令牌文件,用于上传到无浏览器环境的服务器

        Args:
            client_secret_path (str): 客户端密钥文件路径
            token_output_path (str): 令牌输出路径

        Returns:
            None
        """
        if not os.path.exists(client_secret_path):
            logging.error(f"客户端密钥文件不存在: {client_secret_path}")
            sys.exit(1)
            
        try:
            flow = InstalledAppFlow.from_client_secrets_file(
                client_secrets_file=client_secret_path,
                scopes=["https://www.googleapis.com/auth/gmail.send"]
            )
            credentials = flow.run_local_server(port=0)
            with open(token_output_path, "w") as f:
                f.write(credentials.to_json())
            logging.info(f"令牌已成功生成到: {token_output_path}")
        except Exception as e:
            logging.error(f"生成令牌失败: {e}\n{traceback.format_exc()}")
            sys.exit(1)


if __name__ == "__main__":
    import argparse
    
    parser = argparse.ArgumentParser(description="Google Email Sender")
    parser.add_argument("--generate-token", action="store_true", help="仅生成令牌文件")
    parser.add_argument("--client-secret", type=str, default="/tmp/client_secret.json", help="客户端密钥文件路径")
    parser.add_argument("--token-path", type=str, default="", help="令牌文件路径")
    parser.add_argument("--headless", action="store_true", help="使用无头模式(无浏览器环境)")
    parser.add_argument("--to", type=str, default="", help="收件人邮箱")
    parser.add_argument("--subject", type=str, default="Test Email", help="邮件主题")
    parser.add_argument("--body", type=str, default="This is a test email.", help="邮件内容")
    parser.add_argument("--sender-name", type=str, default="", help="发件人显示名称")
    parser.add_argument("--sender-email", type=str, default="", help="发件人邮箱地址")
    parser.add_argument("--timeout", type=int, default=30, help="请求超时时间(秒)")
    
    args = parser.parse_args()
    
    if args.generate_token:
        token_path = args.token_path if args.token_path else os.path.join(os.path.dirname(__file__), "tmp", "google_email_token.json")
        GoogleEmail.generate_token(args.client_secret, token_path)
    else:
        if not args.to:
            logging.error("请指定收件人邮箱地址")
            sys.exit(1)
            
        # 发送邮件
        GoogleEmail(
            client_secret_path=args.client_secret,
            token_path=args.token_path,
            headless=args.headless
        ).send(
            to_recipients=args.to,
            subject=args.subject,
            body=args.body,
            sender_name=args.sender_name,
            sender_email=args.sender_email,
            timeout=args.timeout
        )

4、生成授权token.json文件

因为我发送邮件功能要部署到云服务器,但云服务器又打不开浏览器完成授权,所以这里我们要分三步走:

  • 4.1、先本地运行python文件,跳到web页面去做google的授权,把授权的token.json文件存储到本地
  • 4.2、再把本地的token.json文件推送到云服务器
  • 4.3、再云服务器启动时候直接读取本地的token.json方式做鉴权,就能实现直接发送了

先在本地计算机运行以下命令,这里通过本地的client_secret.json去生成google的授权token.json后,再把授权后的token.json存储在本地/tmp下

# 在本地计算机上运行
python google_email.py --generate-token --client-secret=/tmp/client_secret.json --token-path=/tmp/token.json

这段代码执行后,会直接拉起浏览器授权页面,如下:

我这里踩了个坑,没有开权限,这里授权不通过,没有做授权

所以需要重新回到API控制台(
https://console.cloud.google.com/apis/dashboard),点击左侧的OAuth consent screen界面

点击左侧Audience,选择Test users,把刚刚gmail邮件用户添加进来

注意:不要点击Publish app,如果你选择了发布app,需要去左侧Branding菜单配置一堆域名等信息,由于Gmail属于用户敏感信息,必须向Google提交申请才能用于生产环境。如果仅仅是个人或小规模企业使用,保持Testing状态即可。

添加完Test users后,再重新执行在本地计算机上运行命令

# 在本地计算机上运行
python google_email.py --generate-token --client-secret=/tmp/client_secret.json --token-path=/tmp/token.json

授权通过后会进入以下页面,点击Continue

继续点击Continue

授权完成,关闭当前页面

回到/tmp目录下,可以看到会生成一个token.json文件

5、上传token.json文件到云服务器

这里使用scp命令把本地文件上传到服务器去,user和server改成你对应的云服务器的用户名和服务器ip地址

 # 然后将生成的token.json上传到云服务器
scp /tmp/token.json user@server:/tmp/   

6、服务器运行时指定token路径

接下来,登录云服务器,执行以下命令运行发送邮件测试下

# 在服务器上运行时指定token路径
python google_email.py --token-path=/tmp/token.json --to=test@163.com

可以看到成功发送了

7、提供对外API调用接口

这里我们写个controller层,使用fastapi提供对外调用的API接口,再封装下入参

import asyncio
import json
import sys
import traceback
import unittest
import uuid
from io import StringIO
from typing import Dict, List
from unittest.mock import MagicMock, patch

import aiohttp
import requests
import uvicorn
from fastapi import APIRouter, FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse

# 添加邮件发送接口
@app.post("/chat/send_gmail", summary="发送Gmail邮件")
async def send_gmail(request: Request) -> Dict:
    # copilot begin
    try:
        # 获取请求数据
        body_bytes = await request.body()
        data = json.loads(body_bytes)
        
        # 从请求中获取必要参数
        subject = data.get("subject", "")
        body = data.get("body", "")
        to_recipients = data.get("to", "")
        cc_recipients = data.get("cc", None)
        bcc_recipients = data.get("bcc", None)
        attachment_path = data.get("attachment", None)
        # 这里修改发件人名称为你自己的
        sender_name = data.get("sender_name", "发件人名称")
        # 这里修改为你自己的gmail邮箱
        sender_email = data.get("sender_email", "你自己的@gmail.com")
        retry_times = data.get("retry_times", 3)
        retry_interval = data.get("retry_interval", 5)
        timeout = data.get("timeout", 30)
        
        # 从配置获取客户端密钥文件路径和token路径
        client_secret_path = get_config("gmail.client_secret_path", "/tmp/client_secret.json")
        token_path = get_config("gmail.token_path", "/tmp/token.json")
        
        # 检查必要参数
        if not subject or not body or not to_recipients:
            return {"code": 1, "msg": "邮件主题、内容和收件人不能为空"}
            
        # 检查token文件是否存在
        import os
        if not os.path.exists(token_path):
            return {"code": 1, "msg": f"Token文件不存在: {token_path}, 请先使用命令行工具生成token"}
        
        # 导入GoogleEmail类
        from ai_shop.base.google_email import GoogleEmail
        
        # 创建GoogleEmail实例并发送邮件,强制使用headless模式避免浏览器弹出
        email_sender = GoogleEmail(
            client_secret_path=client_secret_path,
            token_path=token_path,
            headless=True
        )
        
        email_sender.send(
            subject=subject,
            body=body,
            to_recipients=to_recipients,
            cc_recipients=cc_recipients,
            bcc_recipients=bcc_recipients,
            attachment_path=attachment_path,
            sender_name=sender_name,
            sender_email=sender_email,
            retry_times=retry_times,
            retry_interval=retry_interval,
            timeout=timeout
        )
        
        logger.info(f"邮件发送成功,收件人: {to_recipients}, 主题: {subject}")
        return {"code": 0, "msg": "邮件发送成功"}
    except Exception as e:
        logger.error(f"邮件发送失败: {str(e)}, {traceback.format_exc()}")
        return {"code": 1, "msg": f"邮件发送失败: {str(e)}"}
    # copilot end

# 添加邮件发送接口OPTIONS请求处理
@app.options("/chat/send_gmail")
async def options_send_gmail():
    return Response(status_code=200)

if __name__ == "__main__":
    # 从配置获取服务器设置
    host = get_config("server.host", "0.0.0.0")
    port = get_config("server.port", 8000)

    logger.info(f"正在启动简单AI Shop服务,地址: {host}:{port}")
    logger.info(f"API接口: http://localhost:{port}/chat/completions")
    logger.info(f"API接口: http://localhost:{port}/chat/knowledge/insert")
    logger.info(f"API文档: http://localhost:{port}/docs")
    logger.info("CORS已配置,允许所有来源的跨域请求")
    uvicorn.run(app, host=host, port=port)
# copilot end

测试邮件发送

postman测试发送邮件接口

上面的云服务器工程启动后,使用postman测试下接口,前面填写你自己的云服务器公网ip:端口

收件人邮箱登录验证

进到我的163邮箱收件箱,可以看到成功收到了刚发送的测试邮件

相关推荐

Excel技巧:SHEETSNA函数一键提取所有工作表名称批量生产目录

首先介绍一下此函数:SHEETSNAME函数用于获取工作表的名称,有三个可选参数。语法:=SHEETSNAME([参照区域],[结果方向],[工作表范围])(参照区域,可选。给出参照,只返回参照单元格...

Excel HOUR函数:“小时”提取器_excel+hour函数提取器怎么用

一、函数概述HOUR函数是Excel中用于提取时间值小时部分的日期时间函数,返回0(12:00AM)到23(11:00PM)之间的整数。该函数在时间数据分析、考勤统计、日程安排等场景中应用广泛。语...

Filter+Search信息管理不再难|多条件|模糊查找|Excel函数应用

原创版权所有介绍一个信息管理系统,要求可以实现:多条件、模糊查找,手动输入的内容能去空格。先看效果,如下图动画演示这样的一个效果要怎样实现呢?本文所用函数有Filter和Search。先用filter...

FILTER函数介绍及经典用法12:FILTER+切片器的应用

EXCEL函数技巧:FILTER经典用法12。FILTER+切片器制作筛选按钮。FILTER的函数的经典用法12是用FILTER的函数和切片器制作一个筛选按钮。像左边的原始数据,右边想要制作一...

office办公应用网站推荐_office办公软件大全

以下是针对Office办公应用(Word/Excel/PPT等)的免费学习网站推荐,涵盖官方教程、综合平台及垂直领域资源,适合不同学习需求:一、官方权威资源1.微软Office官方培训...

WPS/Excel职场办公最常用的60个函数大全(含卡片),效率翻倍!

办公最常用的60个函数大全:从入门到精通,效率翻倍!在职场中,WPS/Excel几乎是每个人都离不开的工具,而函数则是其灵魂。掌握常用的函数,不仅能大幅提升工作效率,还能让你在数据处理、报表分析、自动...

收藏|查找神器Xlookup全集|一篇就够|Excel函数|图解教程

原创版权所有全程图解,方便阅读,内容比较多,请先收藏!Xlookup是Vlookup的升级函数,解决了Vlookup的所有缺点,可以完全取代Vlookup,学完本文后你将可以应对所有的查找难题,内容...

批量查询快递总耗时?用Excel这个公式,自动计算揽收到签收天数

批量查询快递总耗时?用Excel这个公式,自动计算揽收到签收天数在电商运营、物流对账等工作中,经常需要统计快递“揽收到签收”的耗时——比如判断某快递公司是否符合“3天内送达”的服务承...

Excel函数公式教程(490个实例详解)

Excel函数公式教程(490个实例详解)管理层的财务人员为什么那么厉害?就是因为他们精通excel技能!财务人员在日常工作中,经常会用到Excel财务函数公式,比如财务报表分析、工资核算、库存管理等...

Excel(WPS表格)Tocol函数应用技巧案例解读,建议收藏备用!

工作中,经常需要从多个单元格区域中提取唯一值,如体育赛事报名信息中提取唯一的参赛者信息等,此时如果复制粘贴然后去重,效率就会很低。如果能合理利用Tocol函数,将会极大地提高工作效率。一、功能及语法结...

Excel中的SCAN函数公式,把计算过程理清,你就会了

Excel新版本里面,除了出现非常好用的xlookup,Filter公式之外,还更新一批自定义函数,可以像写代码一样写公式其中SCAN函数公式,也非常强大,它是一个循环函数,今天来了解这个函数公式的计...

Excel(WPS表格)中多列去重就用Tocol+Unique组合函数,简单高效

在数据的分析和处理中,“去重”一直是绕不开的话题,如果单列去重,可以使用Unique函数完成,如果多列去重,如下图:从数据信息中可以看到,每位参赛者参加了多项运动,如果想知道去重后的参赛者有多少人,该...

Excel(WPS表格)函数Groupby,聚合统计,快速提高效率!

在前期的内容中,我们讲了很多的统计函数,如Sum系列、Average系列、Count系列、Rank系列等等……但如果用一个函数实现类似数据透视表的功能,就必须用Groupby函数,按指定字段进行聚合汇...

Excel新版本,IFS函数公式,太强大了!

我们举一个工作实例,现在需要计算业务员的奖励数据,右边是公司的奖励标准:在新版本的函数公式出来之前,我们需要使用IF函数公式来解决1、IF函数公式IF函数公式由三个参数组成,IF(判断条件,对的时候返...

Excel不用函数公式数据透视表,1秒完成多列项目汇总统计

如何将这里的多组数据进行汇总统计?每组数据当中一列是不同菜品,另一列就是该菜品的销售数量。如何进行汇总统计得到所有的菜品销售数量的求和、技术、平均、最大、最小值等数据?不用函数公式和数据透视表,一秒就...