システムから送信したメールの返信内容を各担当者がシステムに反映されているかチェックするプログラムを作成するため、メールをGmailで受信し、メールの解析を行うことにした。 この記事ではGmailAPIのサンプルコードと解説をします。
前提条件
- Mac M1(AppleSilicon)
- Python3.12
- GmailAPI v1
- GmailAPIの認証情報の設定、JSONファイル取得済(疎通確認済)
全体の流れ
- Google認証サーバーに認証情報を送信し、アクセストークンを受取
- GmailAPIサービス起動
- 取得したい条件を設定したクエリを生成し、GmailAPIからメールIDを取得する
- メールIDからメールの詳細情報を取得し、件名など必要情報を取得する
- ターミナルから実行
完成形はこちら GmailAPIメール情報取得 - GitHub
1.Google認証サーバーに認証情報を送信し、アクセストークンを受取
必要パッケージのインストール
1pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib
ダウンロードした認証情報のJSONファイル(credentials.json)をプログラムと同ディレクトリに保存し、認証サーバー送信する 受け取ったアクセストークンを保存し、有効期間内は同トークンを使用し、有効期限切れの場合は更新する
1import os.path2
3from google.auth.transport.requests import Request4from google.oauth2.credentials import Credentials5from google_auth_oauthlib.flow import InstalledAppFlow6
7# 使用するGmailAPI8SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]9
10# GmailAPIトークン取得11def getGmailToken():12 creds = None13
14 if os.path.exists("token.json"):15 creds = Credentials.from_authorized_user_file("token.json", SCOPES)13 collapsed lines
16 if not creds or not creds.valid:17 if creds and creds.expired and creds.refresh_token:18 creds.refresh(Request())19 else:20 flow = InstalledAppFlow.from_client_secrets_file(21 "credentials.json", SCOPES22 )23 creds = flow.run_local_server(port=0)24 # Save the credentials for the next run25 with open("token.json", "w") as token:26 token.write(creds.to_json())27
28 return creds
2.GmailAPIのサービスを起動
GmailAPIのサービスを起動
1from googleapiclient.discovery import build2
3# GmailAPI サービス起動4def buildGmailService(creds):5
6 service = build("gmail", "v1", credentials=creds)7 return service
3.取得したい条件を設定したクエリを生成し、GmailAPIからメールIDを取得する
GmailAPIのメール一覧を取得するAPIから取得したい条件を設定し、メールIDを取得する メールは1回のリクエストで500件まで取得可能 将来的にGoogleCloudFuncitonsで使用する予定なのでエラーが出たときに強制終了「sys.exit()」していますが、必要に応じたプログラムを改変してください。 また、下記のプログラムはメールIDだけでなくスレッドIDも取得しています。
1import sys2
3from googleapiclient.errors import HttpError4
5# メール取得条件(meは認証情報のGmail)6USERID = "me"7# 検索条件8# 「in:anywhere」は迷惑メール、ゴミ箱を含む(全メール)9# Gmailの検索条件「https://support.google.com/mail/answer/7190?hl=ja」は参照10Q = "in:anywhere 「gmailの検索条件を記載」"11# 取得件数12NUM = 50013
14# GmailAPI メッセージリスト取得15# Method: users.messages.list の仕様12 collapsed lines
16# https://developers.google.com/gmail/api/reference/rest/v1/users.messages/list?hl=ja17def getGmailMsgList(service,USERID,Q,NUM):18 try:19 # GmailAPIメール一覧取得20 results = service.users().messages().list(userId=USERID,q=Q,maxResults=NUM).execute()21 messages = results.get("messages", [])22
23 except HttpError as error:24 print(f"An error occurred: {error}")25 sys.exit()26
27 return messages
4.メールIDからメールの詳細情報を取得し、件名など必要情報を取得する
1import os.path2import sys3import base644import re5import datetime6import pytz7
8from googleapiclient.errors import HttpError9
10# GmailAPI メッセージ詳細取得11# Method: users.messages.get の仕様12# https://developers.google.com/gmail/api/reference/rest/v1/users.messages/get?hl=ja13def getGmailMsgDetail(service,msgId):14 try:15 detail = service.users().messages().get(userId=USERID,id=msgId).execute()7 collapsed lines
16 except HttpError as error:17 print(f"An error occurred: {error}")18 sys.exit()19
20 return detail21
22messages = getGmailMsgList(service,USERID,Q,NUM)
件名取得
1# GmailAPI 件名取得2def getGmailSubject(headers):3 subject = ''4 for h in headers:5 if h['name'] == 'Subject':6 subject = h['value']7 return subject8
9def main():10 for message in messages:11 # メール詳細取得12 detail = getGmailMsgDetail(service,message["id"])13 d['subject'] = getGmailSubject(detail['payload']['headers'])
メールアドレス取得
正規置換を利用してメールアドレスのみ取得する
1# GmailAPI メールアドレス取得2def getGmailAddr(headers,label):3 subject = ''4 for h in headers:5 if h['name'] == label:6 mail = getMailAddr(h['value'])7 return mail8
9# メールアドレス抽出10def getMailAddr(str):11 mail = ''12 pattern = r'[\w\.-]+@[\w\.-]+'13 match = re.search(pattern, str)14 if match:15 # 抽出結果があればメールアドレスを出力11 collapsed lines
16 mail = match.group()17 return mail18
19def main():20 for message in messages:21 # メール詳細取得22 detail = getGmailMsgDetail(service,message["id"])23 # From取得24 d['fromMail'] = getGmailAddr(detail['payload']['headers'],"From")25 # To取得26 d['toMail'] = getGmailAddr(detail['payload']['headers'],"To")
送受信日時取得
UNIX時間のミリ秒なので1000で割って秒単位にしてから日付型に変更する
1d['unixTime'] = detail['internalDate']
はUNIX時間そのままを取得
1# GmailAPI 送受信日時(日本時間に変更)2def getGmailInternalDate(unixtime):3 utc_datetime = datetime.datetime.fromtimestamp(int(unixtime)/1000, datetime.timezone.utc)4 jst_timezone = pytz.timezone('Asia/Tokyo')5 internaldate = utc_datetime.replace(tzinfo=pytz.utc).astimezone(jst_timezone)6 return internaldate7
8def main():9 for message in messages:10 # メール詳細取得11 detail = getGmailMsgDetail(service,message["id"])12 # 送受信日時取得13 d['unixTime'] = detail['internalDate']14 d['internalDate'] = getGmailInternalDate(detail['internalDate'])
本文取得
planメールとhtmlメールでは格納先が違うのに両方に対応する またbase64形式をUTF-8形式に変換する
1# GmailAPI 本文取得2def getGamilBody(txt):3 message = ''4 # textメール5 if 'data' in txt['payload']['body']:6 message = txt['payload']['body']['data']7 # htmlメール8 elif 'parts' in txt['payload']:9 if 'parts' in txt['payload']['parts'][0]:10 message = txt['payload']['parts'][0]['parts'][0]['body']['data']11 elif 'body' in txt['payload']['parts'][0]:12 message = txt['payload']['parts'][0]['body']['data']13
14 return base64ToUtf8(message)15
10 collapsed lines
16# base64をUTF-8に変換17def base64ToUtf8(str):18 return base64.urlsafe_b64decode(str).decode()19
20def main():21 for message in messages:22 # メール詳細取得23 detail = getGmailMsgDetail(service,message["id"])24 # 本文取得25 d['body'] = getGamilBody(detail)
5.ターミナルから実行
1python3 main.py
参考
下記の記事を参考にさせていただきました。