Redisは単なるキャッシュサーバーではありません。インメモリデータストアとして、キャッシュ、セッション管理、リアルタイムランキング、メッセージキュー、Pub/Subメッセージングなど、多彩なユースケースをカバーする万能ツールです。筆者が関わったプロジェクトのほぼすべてでRedisが使われており、もはやWebアプリケーション開発に欠かせない存在と言える。
本記事では、Redisの代表的な活用パターンを実際のコード例とともに解説する。特にキャッシュ戦略とPub/Subメッセージングに焦点を当て、実務で即座に使える知識を提供したい。
最も一般的なキャッシュパターンです。アプリケーションがまずキャッシュを確認し、ヒットしなければデータソースから取得してキャッシュに格納する。
import redis
import json
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
def get_user_profile(user_id: str) -> dict:
cache_key = f"user:profile:{user_id}"
# キャッシュから取得を試みる
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# キャッシュミス:DBから取得
profile = fetch_from_database(user_id)
# キャッシュに格納(TTL: 1時間)
r.setex(cache_key, 3600, json.dumps(profile))
return profile
def update_user_profile(user_id: str, data: dict):
# DBを更新
save_to_database(user_id, data)
# キャッシュを無効化
r.delete(f"user:profile:{user_id}")
このパターンのポイントは、データ更新時にキャッシュを無効化する点です。更新後のデータでキャッシュを書き換える「Write-Through」パターンもあるが、Cache-Asideの方がシンプルで整合性の問題が起きにくい。
キャッシュ設計で注意すべき3つの問題があります。
import random
def set_with_jitter(key: str, value: str, base_ttl: int):
"""TTLにジッターを加えてスタンピードを防ぐ"""
jitter = random.randint(0, base_ttl // 10)
r.setex(key, base_ttl + jitter, value)
def get_with_negative_cache(key: str) -> dict | None:
"""ネガティブキャッシュでペネトレーションを防ぐ"""
cached = r.get(key)
if cached == "__NULL__":
return None
if cached:
return json.loads(cached)
result = fetch_from_database(key)
if result is None:
# 存在しないことをキャッシュ(短いTTL)
r.setex(key, 300, "__NULL__")
return None
r.setex(key, 3600, json.dumps(result))
return result
Redisの Sorted Set は、スコア付きの順序集合を扱うデータ構造で、リアルタイムランキングの実装に最適です。
# スコアの追加・更新
r.zadd("ranking:daily", {"player_A": 1500, "player_B": 2300, "player_C": 1800})
# スコアの加算
r.zincrby("ranking:daily", 100, "player_A")
# トップ10を取得(スコア降順)
top10 = r.zrevrange("ranking:daily", 0, 9, withscores=True)
for rank, (player, score) in enumerate(top10, 1):
print(f"{rank}位: {player} - {int(score)}pt")
# 特定プレイヤーの順位を取得(0始まり)
rank = r.zrevrank("ranking:daily", "player_A")
print(f"player_Aの順位: {rank + 1}位")
Sorted Setの操作はほとんどが O(log N) の計算量で、数百万件のデータがあってもミリ秒単位で応答できます。ゲームのランキングだけでなく、ECサイトの人気商品ランキングや、SNSのトレンドトピックなどにも応用できます。
RedisのPub/Sub機能は、リアルタイム通知やイベント駆動アーキテクチャの構築に利用できます。軽量で導入が簡単なため、KafkaやRabbitMQほどの堅牢性は不要だが、リアルタイム性が求められる場面に適しています。
# パブリッシャー側
def publish_notification(channel: str, message: dict):
r.publish(channel, json.dumps(message))
# 使用例:注文ステータスの通知
publish_notification("orders:status", {
"order_id": "ORD-12345",
"status": "shipped",
"timestamp": "2025-01-22T10:30:00Z"
})
# サブスクライバー側
def subscribe_notifications(channels: list):
pubsub = r.pubsub()
pubsub.subscribe(*channels)
for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
process_notification(data)
Redis Pub/Subにはいくつかの制約があります。まず、メッセージの永続化がない。サブスクライバーが接続していない間に発行されたメッセージは失われます。メッセージの信頼性が必要な場合は、Redis Streamsの使用を検討すべきです。
また、Pub/Subのメッセージはクラスタ全体にブロードキャストされるため、大量のチャンネルと大量のメッセージがある環境ではネットワーク帯域に注意が必要です。
Redis 5.0で導入されたStreamsは、Pub/Subの弱点を補完するデータ構造です。メッセージの永続化、コンシューマーグループによる負荷分散、メッセージの再処理が可能で、Kafkaに近い機能を軽量に実現できます。
# メッセージの追加
r.xadd("stream:events", {"type": "page_view", "url": "/products/123", "user": "u001"})
# コンシューマーグループの作成
r.xgroup_create("stream:events", "analytics_group", id="0", mkstream=True)
# コンシューマーグループからの読み取り
messages = r.xreadgroup(
"analytics_group", "consumer_1",
{"stream:events": ">"},
count=10, block=5000
)
本番環境でRedis Streamsを使う場合は、メッセージの確認応答(XACK)と、未処理メッセージの再配信(XCLAIM)の仕組みをきちんと実装することが重要です。
Redisは、その多彩なデータ構造とサブミリ秒の応答速度により、現代のWebアプリケーションに欠かせないインフラとなっています。キャッシュ戦略ではCache-Asideパターンを基本とし、スタンピードやペネトレーションへの対策を忘れないこと。リアルタイムランキングにはSorted Set、軽量な通知にはPub/Sub、堅牢なメッセージングにはStreamsと、ユースケースに応じて機能を使い分けることが成功の鍵です。