AWS Lambda で RSS から取得した情報を Twitter API を使ってツイート(Tweet)する

AWS Lambda で RSS から取得した情報を Twitter API を使ってツイート(Tweet)する:


概要

AWS Lambda で RSS から情報を取得します。その取得した情報(タイトルとURL)を Twitter API を利用して、ツイート(Tweet)します。ツイートした URL は S3 上でブラックリスト(rss.list)として保存し、同じ URL をツイートしないように処理します。ブラックリストは S3 上で BUCKET_NAME/rss/rss.list で設定しています。Trigger は CloudWatch の Event 等で設定できます。


実行環境


コード

lambda_function.py
import feedparser 
from requests_oauthlib import OAuth1Session 
import boto3 
 
# 認証情報を設定する 
client_key = "CLIENT_KEY" 
client_secret = "CLIENT_SECRET" 
resource_owner_key = "RESOURCE_OWNER_KEY" 
resource_owner_secret = "RESOURCE_OWNER_SECRET" 
 
# S3のバケット名、ディレクトリ名、ファイル名を設定する 
bucket_name = "BUCKET_NAME" 
dir_name = "rss" 
file_name = "rss.list" 
file_path = dir_name + "/" + file_name 
 
def lambda_handler(event, context): 
    rss = get_rss() 
    url_list = get_url_list() 
    tweet, url = select_tweet(rss, url_list) 
    twitter = oauth() 
    post_tweet(twitter, tweet, url) 
 
# RSS を取得する 
def get_rss(): 
 
    # 取得したい RSS を設定する 
    url = "http://b.hatena.ne.jp/hotentry/it.rss" 
    rss = feedparser.parse(url) 
 
    # ステータスコードを確認する 
    if int(rss.status) != 200: 
        print("ERROR:" + str(dic.status)) 
        exit() 
 
    # RSS を返却する 
    return rss 
 
# 認証する 
def oauth(): 
 
    # 認証する 
    twitter = OAuth1Session(client_key, client_secret, resource_owner_key, resource_owner_secret) 
 
    # 認証結果を返却する 
    return twitter 
 
# ブラックリストを取得する 
def get_url_list(): 
 
    # バケットとオブジェクトを取得する 
    s3 = boto3.client('s3') 
    res = s3.get_object(Bucket=bucket_name, Key=file_path) 
 
    # Body部を抽出し、UTF-8に変換する 
    body = res['Body'].read() 
    bodystr = body.decode('Shift-JIS') 
 
    # 改行コードごとに区別する 
    url_list = bodystr.split("\n") 
 
    # URLリストを返却する 
    return url_list 
 
# ツイートを選択する 
def select_tweet(rss, url_list): 
 
    # RSS 情報を出力する 
    for entry in rss.entries: 
        if entry.link not in url_list: 
            tweet = entry.title 
            url = entry.link 
 
            # Tweet した URL をブラックリストに追加する 
            add_url(url) 
            return tweet, url 
 
    # RSSのURLがすべてTweetされてた場合、終了する 
    print("Waring : all rss already tweet") 
    exit(0) 
 
# URLをブラックリストに追加する 
def add_url(url): 
 
    # 既存ファイルを読み込み 
    s3 = boto3.client('s3') 
    res = s3.get_object(Bucket=bucket_name, Key=file_path) 
    body = res['Body'].read() 
    bodystr = body.decode('Shift-JIS') 
    bodystr += url + "\n" 
 
    # バケット情報を取得する 
    s3 = boto3.resource('s3') 
    bucket = s3.Bucket(bucket_name) 
 
    # ファイルに書き込む 
    ret = bucket.put_object( \ 
        ACL='private', \ 
        Body=bodystr, \ 
        Key=file_path, \ 
        ContentType='text/plain' \ 
    ) 
 
# ツイートする 
def post_tweet(twitter, tweet, url): 
 
    # ツイート内容を設定する 
    text = tweet + " " + url 
 
    # ツイートをする 
    api = 'https://api.twitter.com/1.1/statuses/update.json' 
    params = {'status':text} 
    req = twitter.post(api, params = params) 
 
    # ツイートの結果 
    if req.status_code == 200: 
        print("Success Tweet." + tweet + url) 
    else: 
        print("Error: %d" % req.status_code) 

コメント

このブログの人気の投稿

投稿時間:2021-06-20 02:06:12 RSSフィード2021-06-20 02:00 分まとめ(3871件)

投稿時間:2021-04-30 23:37:32 RSSフィード2021-04-30 23:00 分まとめ(42件)

投稿時間:2023-02-05 02:09:04 RSSフィード2023-02-05 02:00 分まとめ(9件)